galaxy.py 11 KB


import logging
import os
import re
from pprint import pprint

import jsonlines
from colorama import Fore, Back, Style

# from twisted.python import log

log = logging.getLogger(__name__)
  8. def merge(color_string):
  9. """ Given a string of colorama ANSI, merge them if you can. """
  10. return color_string.replace("m\x1b[", ";")
  11. PORT_CLASSES = {
  12. 1: "BBS",
  13. 2: "BSB",
  14. 3: "SBB",
  15. 4: "SSB",
  16. 5: "SBS",
  17. 6: "BSS",
  18. 7: "SSS",
  19. 8: "BBB",
  20. }
  21. CLASSES_PORT = {v: k for k, v in PORT_CLASSES.items()}
  22. class GameData(object):
  23. def __init__(self, usergame: tuple):
  24. # Construct the GameData storage object
  25. self.usergame = usergame
  26. self.warps = {}
  27. self.ports = {}
  28. self.config = {}
  29. # 10 = 300 bytes
  30. self.warp_groups = 10
  31. # 3 = 560 bytes
  32. # 5 = 930 bytes
  33. self.port_groups = 3
  34. def storage_filename(self):
  35. """ return filename
  36. username.lower _ game.upper.json
  37. """
  38. user, game = self.usergame
  39. return "{0}_{1}.json".format(user.lower(), game.upper())
  40. def reset_ports(self):
  41. self.ports = {}
  42. def reset_warps(self):
  43. self.warps = {}
  44. def display(self):
  45. pprint(self.warps)
  46. pprint(self.ports)
  47. def save(self, *_):
  48. """ save gamedata as jsonlines.
  49. Enable sort_keys=True to provide stable json data output.
  50. We also sorted(.keys()) to keep records in order.
  51. Note: There's a "bug" when serializing to json, keys must be strings!
  52. """
  53. filename = self.storage_filename()
  54. with jsonlines.open(filename, mode="w", sort_keys=True) as writer:
  55. # for warp, sectors in self.warps.items():
  56. c = {"config": self.config}
  57. writer.write(c)
  58. w = {"warp": {}}
  59. for warp in sorted(self.warps.keys()):
  60. sectors = self.warps[warp]
  61. # log.debug("save:", warp, sectors)
  62. sects = sorted(list(sectors)) # make a list
  63. w["warp"][warp] = sects
  64. if len(w["warp"]) >= self.warp_groups:
  65. writer.write(w)
  66. w = {"warp": {}}
  67. yield
  68. # log.debug(w)
  69. # writer.write(w)
  70. # yield
  71. if len(w["warp"]) > 1:
  72. writer.write(w)
  73. # for sector, port in self.ports.items():
  74. p = {"port": {}}
  75. for sector in sorted(self.ports.keys()):
  76. port = self.ports[sector]
  77. p["port"][sector] = port
  78. if len(p["port"]) >= self.port_groups:
  79. writer.write(p)
  80. p = {"port": {}}
  81. yield
  82. if len(p["port"]) > 1:
  83. writer.write(p)
  84. log.info(
  85. "Saved {0} {1}/{2}/{3}".format(
  86. filename, len(self.ports), len(self.warps), len(self.config)
  87. )
  88. )
  89. def load(self):
  90. filename = self.storage_filename()
  91. self.warps = {}
  92. self.ports = {}
  93. self.config = {}
  94. if os.path.exists(filename):
  95. # Load it
  96. with jsonlines.open(filename) as reader:
  97. for obj in reader:
  98. if "config" in obj:
  99. self.config.update(obj["config"])
  100. if "warp" in obj:
  101. for s, w in obj["warp"].items():
  102. # log.debug(s, w)
  103. self.warps[int(s)] = set(w)
  104. # self.warps.update(obj["warp"])
  105. if "port" in obj:
  106. for s, p in obj["port"].items():
  107. self.ports[int(s)] = p
  108. # self.ports.update(obj["port"])
  109. yield
  110. log.info(
  111. "Loaded {0} {1}/{2}/{3}".format(
  112. filename, len(self.ports), len(self.warps), len(self.config)
  113. )
  114. )
  115. def untwisted_load(self):
  116. """ Load file without twisted deferred
  117. This is for testing things out.
  118. """
  119. filename = self.storage_filename()
  120. self.warps = {}
  121. self.ports = {}
  122. self.config = {}
  123. if os.path.exists(filename):
  124. # Load it
  125. with jsonlines.open(filename) as reader:
  126. for obj in reader:
  127. if "config" in obj:
  128. self.config.update(obj["config"])
  129. if "warp" in obj:
  130. for s, w in obj["warp"].items():
  131. # log.debug(s, w)
  132. self.warps[int(s)] = set(w)
  133. # self.warps.update(obj["warp"])
  134. if "port" in obj:
  135. for s, p in obj["port"].items():
  136. self.ports[int(s)] = p
  137. # self.ports.update(obj["port"])
  138. log.info(
  139. "Loaded {0} {1}/{2}/{3}".format(
  140. filename, len(self.ports), len(self.warps), len(self.config)
  141. )
  142. )
  143. def get_warps(self, sector: int):
  144. if sector in self.warps:
  145. return self.warps[sector]
  146. return None
  147. def warp_to(self, source: int, *dest):
  148. """ connect sector source to destination.
  149. """
  150. log.debug("Warp {0} to {1}".format(source, dest))
  151. if source not in self.warps:
  152. self.warps[source] = set()
  153. for d in dest:
  154. if d not in self.warps[source]:
  155. self.warps[source].add(d)
  156. def get_config(self, key, default=None):
  157. if key in self.config:
  158. return self.config[key]
  159. return default
  160. def set_config(self, key, value):
  161. self.config.update({key: value})
  162. def set_port(self, sector: int, data: dict):
  163. log.debug("Port {0} : {1}".format(sector, data))
  164. if sector not in self.ports:
  165. self.ports[sector] = dict()
  166. self.ports[sector].update(data)
  167. if "port" not in self.ports[sector]:
  168. # incomplete port type - can we "complete" it?
  169. if all(x in self.ports for x in ["fuel", "org", "equ"]):
  170. # We have all of the port types, so:
  171. port = "".join(
  172. [self.ports[sector][x]["sale"] for x in ["fuel", "org", "equ"]]
  173. )
  174. self.ports[sector]["port"] = port
  175. self.ports[sector]["class"] = CLASSES_PORT[port]
  176. log.debug("completed {0} : {1}".format(sector, self.ports[sector]))
  177. def port_buying(self, sector: int, cargo: str):
  178. """ Given a sector, is this port buying this?
  179. cargo is a char (F/O/E)
  180. """
  181. cargo_to_index = {"F": 0, "O": 1, "E": 2}
  182. cargo_types = ("fuel", "org", "equ")
  183. cargo = cargo[0]
  184. if sector not in self.ports:
  185. log.warn("port_buying( {0}, {1}): sector unknown!".format(sector, cargo))
  186. return False
  187. port = self.ports[sector]
  188. if "port" not in port:
  189. log.warn("port_buying( {0}, {1}): port unknown!".format(sector, cargo))
  190. return True
  191. cargo_index = cargo_to_index[cargo]
  192. if port["port"][cargo_index] == "S":
  193. log.warn("port_buying( {0}, {1}): not buying cargo".format(sector, cargo))
  194. return False
  195. # ok, they buy it, but *WILL THEY* really buy it?
  196. cargo_key = cargo_types[cargo_index]
  197. if cargo_key in port:
  198. if int(port[cargo_key]["units"]) > 40:
  199. log.warn(
  200. "port_buying( {0}, {1}): Yes, buying {2}".format(
  201. sector, cargo, port[cargo_key]["sale"]
  202. )
  203. )
  204. return True
  205. else:
  206. log.warn(
  207. "port_buying( {0}, {1}): Yes, buying (but values unknown)".format(
  208. sector, cargo
  209. )
  210. )
  211. return True # unknown port, we're guess yes.
  212. return False
  213. @staticmethod
  214. def port_burnt(port: dict):
  215. """ Is this port burned out? """
  216. if all(x in port for x in ["fuel", "org", "equ"]):
  217. if all("pct" in port[x] for x in ["fuel", "org", "equ"]):
  218. if (
  219. port["equ"]["pct"] <= 20
  220. or port["fuel"]["pct"] <= 20
  221. or port["org"]["pct"] <= 20
  222. ):
  223. return True
  224. return False
  225. # Since we don't have any port information, hope for the best, assume it isn't burnt.
  226. return False
  227. @staticmethod
  228. def flip(buy_sell):
  229. # Invert B's and S's to determine if we can trade or not between ports.
  230. return buy_sell.replace("S", "W").replace("B", "S").replace("W", "B")
  231. @staticmethod
  232. def port_trading(port1, port2):
  233. # Given the port settings, can we trade between these?
  234. if port1 == port2:
  235. return False
  236. p1 = [c for c in port1]
  237. p2 = [c for c in port2]
  238. rem = False
  239. for i in range(3):
  240. if p1[i] == p2[i]:
  241. p1[i] = "X"
  242. p2[i] = "X"
  243. rem = True
  244. if rem:
  245. j1 = "".join(p1).replace("X", "")
  246. j2 = "".join(p2).replace("X", "")
  247. if j1 == "BS" and j2 == "SB":
  248. return True
  249. if j1 == "SB" and j2 == "BS":
  250. return True
  251. rport1 = GameData.flip(port1)
  252. c = 0
  253. match = []
  254. for i in range(3):
  255. if rport1[i] == port2[i]:
  256. match.append(port2[i])
  257. c += 1
  258. if c > 1:
  259. # Remove first match, flip it
  260. f = GameData.flip(match.pop(0))
  261. # Verify it is in there.
  262. # so we're not matching SSS/BBB
  263. if f in match:
  264. return True
  265. return False
  266. return False
  267. @staticmethod
  268. def color_pct(pct: int):
  269. if pct > 50:
  270. # green
  271. return "{0}{1:3}{2}".format(Fore.GREEN, pct, Style.RESET_ALL)
  272. elif pct > 25:
  273. return "{0}{1:3}{2}".format(
  274. merge(Fore.YELLOW + Style.BRIGHT), pct, Style.RESET_ALL
  275. )
  276. else:
  277. return "{0:3}".format(pct)
  278. @staticmethod
  279. def port_pct(port: dict):
  280. # Make sure these exist in the port data given.
  281. if all(x in port for x in ["fuel", "org", "equ"]):
  282. return "{0},{1},{2}%".format(
  283. GameData.color_pct(port["fuel"]["pct"]),
  284. GameData.color_pct(port["org"]["pct"]),
  285. GameData.color_pct(port["equ"]["pct"]),
  286. )
  287. else:
  288. return "---,---,---%"
  289. @staticmethod
  290. def port_show_part(sector: int, sector_port: dict):
  291. return "{0:5} ({1}) {2}".format(
  292. sector, sector_port["port"], GameData.port_pct(sector_port)
  293. )
  294. def port_trade_show(self, sector: int, warp: int):
  295. sector_port = self.ports[sector]
  296. warp_port = self.ports[warp]
  297. sector_pct = GameData.port_pct(sector_port)
  298. warp_pct = GameData.port_pct(warp_port)
  299. return "{0} \xae\xcd\xaf {1}".format(
  300. GameData.port_show_part(sector, sector_port),
  301. GameData.port_show_part(warp, warp_port),
  302. )
  303. @staticmethod
  304. def port_show(sector: int, sector_port: dict, warp: int, warp_port: dict):
  305. sector_pct = GameData.port_pct(sector_port)
  306. warp_pct = GameData.port_pct(warp_port)
  307. return "{0} -=- {1}".format(
  308. GameData.port_show_part(sector, sector_port),
  309. GameData.port_show_part(warp, warp_port),
  310. )