galaxy.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508
  1. import jsonlines
  2. import os
  3. import logging
  4. # from twisted.python import log
  5. from pprint import pprint
  6. from colorama import Fore, Back, Style
# Module-level logger, named after this module; used by GameData for its
# debug/info/warning output.
log = logging.getLogger(__name__)
  8. def merge(color_string):
  9. """ Given a string of colorama ANSI, merge them if you can. """
  10. return color_string.replace("m\x1b[", ";")
  11. PORT_CLASSES = {
  12. 1: "BBS",
  13. 2: "BSB",
  14. 3: "SBB",
  15. 4: "SSB",
  16. 5: "SBS",
  17. 6: "BSS",
  18. 7: "SSS",
  19. 8: "BBB",
  20. }
  21. CLASSES_PORT = {v: k for k, v in PORT_CLASSES.items()}
  22. class GameData(object):
  23. def __init__(self, usergame: tuple):
  24. # Construct the GameData storage object
  25. self.usergame = usergame
  26. self.warps = {}
  27. self.ports = {}
  28. self.config = {}
  29. # 10 = 300 bytes
  30. self.warp_groups = 10
  31. # 3 = 560 bytes
  32. # 5 = 930 bytes
  33. self.port_groups = 3
  34. def storage_filename(self):
  35. """ return filename
  36. username.lower _ game.upper.json
  37. """
  38. user, game = self.usergame
  39. return "{0}_{1}.json".format(user.lower(), game.upper())
  40. def reset_ports(self):
  41. self.ports = {}
  42. def reset_warps(self):
  43. self.warps = {}
  44. def special_ports(self):
  45. """ Save the special class ports 0, 9 """
  46. return {
  47. p: self.ports[p] for p in self.ports if 'class' in self.ports[p] and self.ports[p]["class"] in (0, 9)
  48. }
  49. def display(self):
  50. pprint(self.warps)
  51. pprint(self.ports)
  52. def save(self, *_):
  53. """ save gamedata as jsonlines.
  54. Enable sort_keys=True to provide stable json data output.
  55. We also sorted(.keys()) to keep records in order.
  56. Note: There's a "bug" when serializing to json, keys must be strings!
  57. """
  58. filename = self.storage_filename()
  59. with jsonlines.open(filename, mode="w", sort_keys=True) as writer:
  60. # for warp, sectors in self.warps.items():
  61. c = {"config": self.config}
  62. writer.write(c)
  63. w = {"warp": {}}
  64. for warp in sorted(self.warps.keys()):
  65. sectors = self.warps[warp]
  66. # log.debug("save:", warp, sectors)
  67. sects = sorted(list(sectors)) # make a list
  68. w["warp"][warp] = sects
  69. if len(w["warp"]) >= self.warp_groups:
  70. writer.write(w)
  71. w = {"warp": {}}
  72. yield
  73. # log.debug(w)
  74. # writer.write(w)
  75. # yield
  76. if len(w["warp"]) > 0:
  77. writer.write(w)
  78. # for sector, port in self.ports.items():
  79. p = {"port": {}}
  80. for sector in sorted(self.ports.keys()):
  81. port = self.ports[sector]
  82. p["port"][sector] = port
  83. if len(p["port"]) >= self.port_groups:
  84. writer.write(p)
  85. p = {"port": {}}
  86. yield
  87. if len(p["port"]) > 0:
  88. writer.write(p)
  89. log.info(
  90. "Saved {0} {1}/{2}/{3}".format(
  91. filename, len(self.ports), len(self.warps), len(self.config)
  92. )
  93. )
  94. def untwisted_save(self, *_):
  95. """ save gamedata as jsonlines.
  96. Enable sort_keys=True to provide stable json data output.
  97. We also sorted(.keys()) to keep records in order.
  98. Note: There's a "bug" when serializing to json, keys must be strings!
  99. """
  100. filename = self.storage_filename()
  101. with jsonlines.open(filename, mode="w", sort_keys=True) as writer:
  102. # for warp, sectors in self.warps.items():
  103. c = {"config": self.config}
  104. writer.write(c)
  105. w = {"warp": {}}
  106. for warp in sorted(self.warps.keys()):
  107. sectors = self.warps[warp]
  108. # log.debug("save:", warp, sectors)
  109. sects = sorted(list(sectors)) # make a list
  110. w["warp"][warp] = sects
  111. if len(w["warp"]) >= self.warp_groups:
  112. writer.write(w)
  113. w = {"warp": {}}
  114. # log.debug(w)
  115. # writer.write(w)
  116. # yield
  117. if len(w["warp"]) > 0:
  118. writer.write(w)
  119. # for sector, port in self.ports.items():
  120. p = {"port": {}}
  121. for sector in sorted(self.ports.keys()):
  122. port = self.ports[sector]
  123. p["port"][sector] = port
  124. if len(p["port"]) >= self.port_groups:
  125. writer.write(p)
  126. p = {"port": {}}
  127. if len(p["port"]) > 0:
  128. writer.write(p)
  129. log.info(
  130. "Saved {0} {1}/{2}/{3}".format(
  131. filename, len(self.ports), len(self.warps), len(self.config)
  132. )
  133. )
  134. def load(self):
  135. filename = self.storage_filename()
  136. self.warps = {}
  137. self.ports = {}
  138. self.config = {}
  139. if os.path.exists(filename):
  140. # Load it
  141. with jsonlines.open(filename) as reader:
  142. for obj in reader:
  143. if "config" in obj:
  144. self.config.update(obj["config"])
  145. if "warp" in obj:
  146. for s, w in obj["warp"].items():
  147. # log.debug(s, w)
  148. self.warps[int(s)] = set(w)
  149. # self.warps.update(obj["warp"])
  150. if "port" in obj:
  151. for s, p in obj["port"].items():
  152. self.ports[int(s)] = p
  153. # self.ports.update(obj["port"])
  154. yield
  155. log.info(
  156. "Loaded {0} {1}/{2}/{3}".format(
  157. filename, len(self.ports), len(self.warps), len(self.config)
  158. )
  159. )
  160. def untwisted_load(self):
  161. """ Load file without twisted deferred
  162. This is for testing things out.
  163. """
  164. filename = self.storage_filename()
  165. self.warps = {}
  166. self.ports = {}
  167. self.config = {}
  168. if os.path.exists(filename):
  169. # Load it
  170. with jsonlines.open(filename) as reader:
  171. for obj in reader:
  172. if "config" in obj:
  173. self.config.update(obj["config"])
  174. if "warp" in obj:
  175. for s, w in obj["warp"].items():
  176. # log.debug(s, w)
  177. self.warps[int(s)] = set(w)
  178. # self.warps.update(obj["warp"])
  179. if "port" in obj:
  180. for s, p in obj["port"].items():
  181. self.ports[int(s)] = p
  182. # self.ports.update(obj["port"])
  183. log.info(
  184. "Loaded {0} {1}/{2}/{3}".format(
  185. filename, len(self.ports), len(self.warps), len(self.config)
  186. )
  187. )
  188. def get_warps(self, sector: int):
  189. if sector in self.warps:
  190. return self.warps[sector]
  191. return None
  192. def warp_to(self, source: int, *dest):
  193. """ connect sector source to destination.
  194. """
  195. log.debug("Warp {0} to {1}".format(source, dest))
  196. if source not in self.warps:
  197. self.warps[source] = set()
  198. for d in dest:
  199. if d not in self.warps[source]:
  200. self.warps[source].add(d)
  201. def get_config(self, key, default=None):
  202. if key in self.config:
  203. return self.config[key]
  204. else:
  205. if default is not None:
  206. self.config[key] = default
  207. return default
  208. def set_config(self, key, value):
  209. self.config.update({key: value})
  210. def set_port(self, sector: int, data: dict):
  211. log.debug("Port {0} : {1}".format(sector, data))
  212. if sector not in self.ports:
  213. self.ports[sector] = dict()
  214. self.ports[sector].update(data)
  215. if "port" not in self.ports[sector]:
  216. # incomplete port type - can we "complete" it?
  217. if all(x in self.ports for x in ["fuel", "org", "equ"]):
  218. # We have all of the port types, so:
  219. port = "".join(
  220. [self.ports[sector][x]["sale"] for x in ["fuel", "org", "equ"]]
  221. )
  222. self.ports[sector]["port"] = port
  223. self.ports[sector]["class"] = CLASSES_PORT[port]
  224. log.debug("completed {0} : {1}".format(sector, self.ports[sector]))
  225. def port_buying(self, sector: int, cargo: str):
  226. """ Given a sector, is this port buying this?
  227. cargo is a char (F/O/E)
  228. """
  229. cargo_to_index = {"F": 0, "O": 1, "E": 2}
  230. cargo_types = ("fuel", "org", "equ")
  231. cargo = cargo[0]
  232. if sector not in self.ports:
  233. log.warn("port_buying( {0}, {1}): sector unknown!".format(sector, cargo))
  234. return False
  235. port = self.ports[sector]
  236. if "port" not in port:
  237. log.warn("port_buying( {0}, {1}): port unknown!".format(sector, cargo))
  238. return True
  239. cargo_index = cargo_to_index[cargo]
  240. if port["port"] in ('Special', 'StarDock'):
  241. log.warn("port_buying( {0}, {1}): not buying (is {2})".format(sector, cargo, port['port']))
  242. return False
  243. if port["port"][cargo_index] == "S":
  244. log.warn("port_buying( {0}, {1}): not buying cargo".format(sector, cargo))
  245. return False
  246. # ok, they buy it, but *WILL THEY* really buy it?
  247. cargo_key = cargo_types[cargo_index]
  248. if cargo_key in port:
  249. if int(port[cargo_key]["units"]) > 40:
  250. log.warn(
  251. "port_buying( {0}, {1}): Yes, buying {2}".format(
  252. sector, cargo, port[cargo_key]["sale"]
  253. )
  254. )
  255. return True
  256. else:
  257. log.warn(
  258. "port_buying( {0}, {1}): No, units < 40 {2}".format(
  259. sector, cargo, port[cargo_key]["sale"]
  260. )
  261. )
  262. return False
  263. else:
  264. log.warn(
  265. "port_buying( {0}, {1}): Yes, buying (but values unknown)".format(
  266. sector, cargo
  267. )
  268. )
  269. return True # unknown port, we're guess yes.
  270. return False
  271. @staticmethod
  272. def port_burnt(port: dict):
  273. """ Is this port burned out? """
  274. if all(x in port for x in ["fuel", "org", "equ"]):
  275. if all("pct" in port[x] for x in ["fuel", "org", "equ"]):
  276. if (
  277. port["equ"]["pct"] <= 20
  278. or port["fuel"]["pct"] <= 20
  279. or port["org"]["pct"] <= 20
  280. ):
  281. return True
  282. return False
  283. # Since we don't have any port information, hope for the best, assume it isn't burnt.
  284. return False
  285. @staticmethod
  286. def flip(buy_sell):
  287. # Invert B's and S's to determine if we can trade or not between ports.
  288. return buy_sell.replace("S", "W").replace("B", "S").replace("W", "B")
  289. @staticmethod
  290. def port_trading(port1, port2):
  291. # Given the port settings, can we trade between these?
  292. if port1 == port2:
  293. return False
  294. if port1 in ('Special', 'StarDock') or port2 in ('Special', 'StarDock'):
  295. return False
  296. p1 = [c for c in port1]
  297. p2 = [c for c in port2]
  298. rem = False
  299. for i in range(3):
  300. if p1[i] == p2[i]:
  301. p1[i] = "X"
  302. p2[i] = "X"
  303. rem = True
  304. if rem:
  305. j1 = "".join(p1).replace("X", "")
  306. j2 = "".join(p2).replace("X", "")
  307. if j1 == "BS" and j2 == "SB":
  308. return True
  309. if j1 == "SB" and j2 == "BS":
  310. return True
  311. rport1 = GameData.flip(port1)
  312. c = 0
  313. match = []
  314. for i in range(3):
  315. if rport1[i] == port2[i]:
  316. match.append(port2[i])
  317. c += 1
  318. if c > 1:
  319. # Remove first match, flip it
  320. f = GameData.flip(match.pop(0))
  321. # Verify it is in there.
  322. # so we're not matching SSS/BBB
  323. if f in match:
  324. return True
  325. return False
  326. return False
  327. @staticmethod
  328. def color_pct(pct: int):
  329. if pct > 50:
  330. # green
  331. return "{0}{1:3}{2}".format(Fore.GREEN, pct, Style.RESET_ALL)
  332. elif pct > 25:
  333. return "{0}{1:3}{2}".format(
  334. merge(Fore.YELLOW + Style.BRIGHT), pct, Style.RESET_ALL
  335. )
  336. else:
  337. return "{0:3}".format(pct)
  338. @staticmethod
  339. def port_pct(port: dict):
  340. # Make sure these exist in the port data given.
  341. if all(x in port for x in ["fuel", "org", "equ"]):
  342. return "{0},{1},{2}%".format(
  343. GameData.color_pct(port["fuel"]["pct"]),
  344. GameData.color_pct(port["org"]["pct"]),
  345. GameData.color_pct(port["equ"]["pct"]),
  346. )
  347. else:
  348. return "---,---,---%"
  349. @staticmethod
  350. def port_show_part(sector: int, sector_port: dict):
  351. return "{0:5} ({1}) {2}".format(
  352. sector, sector_port["port"], GameData.port_pct(sector_port)
  353. )
  354. def port_above(self, port: dict, limit: int) -> bool:
  355. if all(x in port for x in ["fuel", "org", "equ"]):
  356. if all(
  357. x in port and port[x]["pct"] >= limit for x in ["fuel", "org", "equ"]
  358. ):
  359. return True
  360. else:
  361. return False
  362. # Port is unknown, we'll assume it is above the limit.
  363. return True
  364. def port_trade_show(self, sector: int, warp: int, limit: int = 90):
  365. sector_port = self.ports[sector]
  366. warp_port = self.ports[warp]
  367. if self.port_above(sector_port, limit) and self.port_above(warp_port, limit):
  368. # sector_pct = GameData.port_pct(sector_port)
  369. # warp_pct = GameData.port_pct(warp_port)
  370. return "{0} \xae\xcd\xaf {1}".format(
  371. GameData.port_show_part(sector, sector_port),
  372. GameData.port_show_part(warp, warp_port),
  373. )
  374. return None
  375. @staticmethod
  376. def port_show(sector: int, sector_port: dict, warp: int, warp_port: dict):
  377. # sector_pct = GameData.port_pct(sector_port)
  378. # warp_pct = GameData.port_pct(warp_port)
  379. return "{0} -=- {1}".format(
  380. GameData.port_show_part(sector, sector_port),
  381. GameData.port_show_part(warp, warp_port),
  382. )
  383. def find_nearest_selling(
  384. self, sector: int, selling: str, at_least: int = 100
  385. ) -> int:
  386. """ find nearest port that is selling at_least amount of this item
  387. selling is 'f', 'o', or 'e'.
  388. """
  389. names = {"e": "equ", "o": "org", "f": "fuel"}
  390. pos = {"f": 0, "o": 1, "e": 2}
  391. sell = names[selling[0].lower()]
  392. s_pos = pos[selling[0].lower()]
  393. log.warn(
  394. "find_nearest_selling({0}, {1}, {2}): {3}, {4}".format(
  395. sector, selling, at_least, sell, s_pos
  396. )
  397. )
  398. searched = set()
  399. if sector not in self.warps:
  400. log.warn("( {0}, {1}): sector not in warps".format(sector, selling))
  401. return 0
  402. # Start with the current sector
  403. look = set((sector,))
  404. while len(look) > 0:
  405. for s in look:
  406. if s in self.ports:
  407. # Ok, possibly?
  408. sp = self.ports[s]
  409. if sp['port'] in ('Special', 'StarDock'):
  410. continue
  411. if sp["port"][s_pos] == "S":
  412. # Ok, they are selling!
  413. if sell in sp:
  414. if int(sp[sell]["units"]) >= at_least:
  415. log.warn(
  416. "find_nearest_selling( {0}, {1}): {2} {3} units".format(
  417. sector, selling, s, sp[sell]["units"]
  418. )
  419. )
  420. return s
  421. else:
  422. # We know they sell it, but we don't know units
  423. log.warn(
  424. "find_nearest_selling( {0}, {1}): {2} {3}".format(
  425. sector, selling, s, sp["port"]
  426. )
  427. )
  428. return s
  429. # Ok, not found here. Branch out.
  430. searched.update(look)
  431. step_from = look
  432. look = set()
  433. for s in step_from:
  434. if s in self.warps:
  435. look.update(self.warps[s])
  436. # look only contains warps we haven't searched
  437. look = look.difference(searched)
  438. # Ok, we have run out of places to search
  439. log.warn("find_nearest_selling( {0}, {1}) : failed".format(sector, selling))
  440. return 0