galaxy.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589
  1. import jsonlines
  2. import os
  3. import logging
  4. # from twisted.python import log
  5. from pprint import pprint
  6. from colorama import Fore, Back, Style
  7. log = logging.getLogger(__name__)
  8. def merge(color_string):
  9. """ Given a string of colorama ANSI, merge them if you can. """
  10. return color_string.replace("m\x1b[", ";")
  11. PORT_CLASSES = {
  12. 1: "BBS",
  13. 2: "BSB",
  14. 3: "SBB",
  15. 4: "SSB",
  16. 5: "SBS",
  17. 6: "BSS",
  18. 7: "SSS",
  19. 8: "BBB",
  20. }
  21. CLASSES_PORT = {v: k for k, v in PORT_CLASSES.items()}
  22. class GameData(object):
  23. def __init__(self, usergame: tuple):
  24. # Construct the GameData storage object
  25. self.usergame = usergame
  26. self.warps = {}
  27. self.ports = {}
  28. self.config = {}
  29. # 10 = 300 bytes
  30. self.warp_groups = 10
  31. # 3 = 560 bytes
  32. # 5 = 930 bytes
  33. self.port_groups = 3
  34. def storage_filename(self):
  35. """ return filename
  36. username.lower _ game.upper.json
  37. """
  38. user, game = self.usergame
  39. return "{0}_{1}.json".format(user.lower(), game.upper())
  40. def reset_ports(self):
  41. self.ports = {}
  42. def reset_warps(self):
  43. self.warps = {}
  44. def special_ports(self):
  45. """ Save the special class ports 0, 9 """
  46. return {
  47. p: self.ports[p]
  48. for p in self.ports
  49. if "class" in self.ports[p] and self.ports[p]["class"] in (0, 9)
  50. }
  51. def display(self):
  52. pprint(self.warps)
  53. pprint(self.ports)
  54. def save(self, *_):
  55. """ save gamedata as jsonlines.
  56. Enable sort_keys=True to provide stable json data output.
  57. We also sorted(.keys()) to keep records in order.
  58. Note: There's a "bug" when serializing to json, keys must be strings!
  59. """
  60. filename = self.storage_filename()
  61. with jsonlines.open(filename, mode="w", sort_keys=True) as writer:
  62. # for warp, sectors in self.warps.items():
  63. c = {"config": self.config}
  64. writer.write(c)
  65. w = {"warp": {}}
  66. for warp in sorted(self.warps.keys()):
  67. sectors = self.warps[warp]
  68. # log.debug("save:", warp, sectors)
  69. sects = sorted(list(sectors)) # make a list
  70. w["warp"][warp] = sects
  71. if len(w["warp"]) >= self.warp_groups:
  72. writer.write(w)
  73. w = {"warp": {}}
  74. yield
  75. # log.debug(w)
  76. # writer.write(w)
  77. # yield
  78. if len(w["warp"]) > 0:
  79. writer.write(w)
  80. # for sector, port in self.ports.items():
  81. p = {"port": {}}
  82. for sector in sorted(self.ports.keys()):
  83. port = self.ports[sector]
  84. p["port"][sector] = port
  85. if len(p["port"]) >= self.port_groups:
  86. writer.write(p)
  87. p = {"port": {}}
  88. yield
  89. if len(p["port"]) > 0:
  90. writer.write(p)
  91. log.info(
  92. "Saved {0} {1}/{2}/{3}".format(
  93. filename, len(self.ports), len(self.warps), len(self.config)
  94. )
  95. )
  96. def untwisted_save(self, *_):
  97. """ save gamedata as jsonlines.
  98. Enable sort_keys=True to provide stable json data output.
  99. We also sorted(.keys()) to keep records in order.
  100. Note: There's a "bug" when serializing to json, keys must be strings!
  101. """
  102. filename = self.storage_filename()
  103. with jsonlines.open(filename, mode="w", sort_keys=True) as writer:
  104. # for warp, sectors in self.warps.items():
  105. c = {"config": self.config}
  106. writer.write(c)
  107. w = {"warp": {}}
  108. for warp in sorted(self.warps.keys()):
  109. sectors = self.warps[warp]
  110. # log.debug("save:", warp, sectors)
  111. sects = sorted(list(sectors)) # make a list
  112. w["warp"][warp] = sects
  113. if len(w["warp"]) >= self.warp_groups:
  114. writer.write(w)
  115. w = {"warp": {}}
  116. # log.debug(w)
  117. # writer.write(w)
  118. # yield
  119. if len(w["warp"]) > 0:
  120. writer.write(w)
  121. # for sector, port in self.ports.items():
  122. p = {"port": {}}
  123. for sector in sorted(self.ports.keys()):
  124. port = self.ports[sector]
  125. p["port"][sector] = port
  126. if len(p["port"]) >= self.port_groups:
  127. writer.write(p)
  128. p = {"port": {}}
  129. if len(p["port"]) > 0:
  130. writer.write(p)
  131. log.info(
  132. "Saved {0} {1}/{2}/{3}".format(
  133. filename, len(self.ports), len(self.warps), len(self.config)
  134. )
  135. )
  136. def load(self):
  137. filename = self.storage_filename()
  138. self.warps = {}
  139. self.ports = {}
  140. self.config = {}
  141. if os.path.exists(filename):
  142. # Load it
  143. with jsonlines.open(filename) as reader:
  144. for obj in reader:
  145. if "config" in obj:
  146. self.config.update(obj["config"])
  147. if "warp" in obj:
  148. for s, w in obj["warp"].items():
  149. # log.debug(s, w)
  150. self.warps[int(s)] = set(w)
  151. # self.warps.update(obj["warp"])
  152. if "port" in obj:
  153. for s, p in obj["port"].items():
  154. self.ports[int(s)] = p
  155. # self.ports.update(obj["port"])
  156. yield
  157. log.info(
  158. "Loaded {0} {1}/{2}/{3}".format(
  159. filename, len(self.ports), len(self.warps), len(self.config)
  160. )
  161. )
  162. def untwisted_load(self):
  163. """ Load file without twisted deferred
  164. This is for testing things out.
  165. """
  166. filename = self.storage_filename()
  167. self.warps = {}
  168. self.ports = {}
  169. self.config = {}
  170. if os.path.exists(filename):
  171. # Load it
  172. with jsonlines.open(filename) as reader:
  173. for obj in reader:
  174. if "config" in obj:
  175. self.config.update(obj["config"])
  176. if "warp" in obj:
  177. for s, w in obj["warp"].items():
  178. # log.debug(s, w)
  179. self.warps[int(s)] = set(w)
  180. # self.warps.update(obj["warp"])
  181. if "port" in obj:
  182. for s, p in obj["port"].items():
  183. self.ports[int(s)] = p
  184. # self.ports.update(obj["port"])
  185. log.info(
  186. "Loaded {0} {1}/{2}/{3}".format(
  187. filename, len(self.ports), len(self.warps), len(self.config)
  188. )
  189. )
  190. def get_warps(self, sector: int):
  191. if sector in self.warps:
  192. return self.warps[sector]
  193. return None
  194. def warp_to(self, source: int, *dest):
  195. """ connect sector source to destination.
  196. """
  197. log.debug("Warp {0} to {1}".format(source, dest))
  198. if source not in self.warps:
  199. self.warps[source] = set()
  200. for d in dest:
  201. if d not in self.warps[source]:
  202. self.warps[source].add(d)
  203. def get_config(self, key, default=None):
  204. if key in self.config:
  205. return self.config[key]
  206. else:
  207. if default is not None:
  208. self.config[key] = default
  209. return default
  210. def set_config(self, key, value):
  211. self.config.update({key: value})
  212. def set_port(self, sector: int, data: dict):
  213. log.debug("Port {0} : {1}".format(sector, data))
  214. if sector not in self.ports:
  215. self.ports[sector] = dict()
  216. self.ports[sector].update(data)
  217. if "port" not in self.ports[sector]:
  218. # incomplete port type - can we "complete" it?
  219. if all(x in self.ports for x in ["fuel", "org", "equ"]):
  220. # We have all of the port types, so:
  221. port = "".join(
  222. [self.ports[sector][x]["sale"] for x in ["fuel", "org", "equ"]]
  223. )
  224. self.ports[sector]["port"] = port
  225. self.ports[sector]["class"] = CLASSES_PORT[port]
  226. log.debug("completed {0} : {1}".format(sector, self.ports[sector]))
  227. def port_buying(self, sector: int, cargo: str):
  228. """ Given a sector, is this port buying this?
  229. cargo is a char (F/O/E)
  230. """
  231. cargo_to_index = {"F": 0, "O": 1, "E": 2}
  232. cargo_types = ("fuel", "org", "equ")
  233. cargo = cargo[0]
  234. if sector not in self.ports:
  235. log.warn("port_buying( {0}, {1}): sector unknown!".format(sector, cargo))
  236. return False
  237. port = self.ports[sector]
  238. if "port" not in port:
  239. log.warn("port_buying( {0}, {1}): port unknown!".format(sector, cargo))
  240. return True
  241. cargo_index = cargo_to_index[cargo]
  242. if port["port"] in ("Special", "StarDock"):
  243. log.warn(
  244. "port_buying( {0}, {1}): not buying (is {2})".format(
  245. sector, cargo, port["port"]
  246. )
  247. )
  248. return False
  249. if port["port"][cargo_index] == "S":
  250. log.warn("port_buying( {0}, {1}): not buying cargo".format(sector, cargo))
  251. return False
  252. # ok, they buy it, but *WILL THEY* really buy it?
  253. cargo_key = cargo_types[cargo_index]
  254. if cargo_key in port:
  255. if int(port[cargo_key]["units"]) > 40:
  256. log.warn(
  257. "port_buying( {0}, {1}): Yes, buying {2}".format(
  258. sector, cargo, port[cargo_key]["sale"]
  259. )
  260. )
  261. return True
  262. else:
  263. log.warn(
  264. "port_buying( {0}, {1}): No, units < 40 {2}".format(
  265. sector, cargo, port[cargo_key]["sale"]
  266. )
  267. )
  268. return False
  269. else:
  270. log.warn(
  271. "port_buying( {0}, {1}): Yes, buying (but values unknown)".format(
  272. sector, cargo
  273. )
  274. )
  275. return True # unknown port, we're guess yes.
  276. return False
  277. @staticmethod
  278. def port_burnt(port: dict):
  279. """ Is this port burned out? """
  280. if all(x in port for x in ["fuel", "org", "equ"]):
  281. if all("pct" in port[x] for x in ["fuel", "org", "equ"]):
  282. if (
  283. port["equ"]["pct"] <= 20
  284. or port["fuel"]["pct"] <= 20
  285. or port["org"]["pct"] <= 20
  286. ):
  287. return True
  288. return False
  289. # Since we don't have any port information, hope for the best, assume it isn't burnt.
  290. return False
  291. @staticmethod
  292. def flip(buy_sell):
  293. # Invert B's and S's to determine if we can trade or not between ports.
  294. return buy_sell.replace("S", "W").replace("B", "S").replace("W", "B")
  295. @staticmethod
  296. def port_trading(port1, port2):
  297. # Given the port settings, can we trade between these?
  298. if port1 == port2:
  299. return False
  300. if port1 in ("Special", "StarDock") or port2 in ("Special", "StarDock"):
  301. return False
  302. p1 = [c for c in port1]
  303. p2 = [c for c in port2]
  304. rem = False
  305. for i in range(3):
  306. if p1[i] == p2[i]:
  307. p1[i] = "X"
  308. p2[i] = "X"
  309. rem = True
  310. if rem:
  311. j1 = "".join(p1).replace("X", "")
  312. j2 = "".join(p2).replace("X", "")
  313. if j1 == "BS" and j2 == "SB":
  314. return True
  315. if j1 == "SB" and j2 == "BS":
  316. return True
  317. rport1 = GameData.flip(port1)
  318. c = 0
  319. match = []
  320. for i in range(3):
  321. if rport1[i] == port2[i]:
  322. match.append(port2[i])
  323. c += 1
  324. if c > 1:
  325. # Remove first match, flip it
  326. f = GameData.flip(match.pop(0))
  327. # Verify it is in there.
  328. # so we're not matching SSS/BBB
  329. if f in match:
  330. return True
  331. return False
  332. return False
  333. @staticmethod
  334. def color_pct(pct: int):
  335. if pct > 50:
  336. # green
  337. return "{0}{1:3}{2}".format(Fore.GREEN, pct, Style.RESET_ALL)
  338. elif pct > 25:
  339. return "{0}{1:3}{2}".format(
  340. merge(Fore.YELLOW + Style.BRIGHT), pct, Style.RESET_ALL
  341. )
  342. else:
  343. return "{0:3}".format(pct)
  344. @staticmethod
  345. def port_pct(port: dict):
  346. # Make sure these exist in the port data given.
  347. if all(x in port for x in ["fuel", "org", "equ"]):
  348. return "{0},{1},{2}%".format(
  349. GameData.color_pct(port["fuel"]["pct"]),
  350. GameData.color_pct(port["org"]["pct"]),
  351. GameData.color_pct(port["equ"]["pct"]),
  352. )
  353. else:
  354. return "---,---,---%"
  355. @staticmethod
  356. def port_show_part(sector: int, sector_port: dict):
  357. return "{0:5} ({1}) {2}".format(
  358. sector, sector_port["port"], GameData.port_pct(sector_port)
  359. )
  360. def port_above(self, port: dict, limit: int) -> bool:
  361. if all(x in port for x in ["fuel", "org", "equ"]):
  362. if all(
  363. x in port and port[x]["pct"] >= limit for x in ["fuel", "org", "equ"]
  364. ):
  365. return True
  366. else:
  367. return False
  368. # Port is unknown, we'll assume it is above the limit.
  369. return True
  370. def port_trade_show(self, sector: int, warp: int, limit: int = 90):
  371. sector_port = self.ports[sector]
  372. warp_port = self.ports[warp]
  373. if self.port_above(sector_port, limit) and self.port_above(warp_port, limit):
  374. # sector_pct = GameData.port_pct(sector_port)
  375. # warp_pct = GameData.port_pct(warp_port)
  376. return "{0} \xae\xcd\xaf {1}".format(
  377. GameData.port_show_part(sector, sector_port),
  378. GameData.port_show_part(warp, warp_port),
  379. )
  380. return None
  381. @staticmethod
  382. def port_show(sector: int, sector_port: dict, warp: int, warp_port: dict):
  383. # sector_pct = GameData.port_pct(sector_port)
  384. # warp_pct = GameData.port_pct(warp_port)
  385. return "{0} -=- {1}".format(
  386. GameData.port_show_part(sector, sector_port),
  387. GameData.port_show_part(warp, warp_port),
  388. )
  389. def find_nearest_tradepairs(self, sector: int, obj):
  390. """ find nearest tradepair
  391. When do we use good? When do we use ok?
  392. """
  393. searched = set()
  394. if sector not in self.warps:
  395. log.warn(":Sector {0} not in warps.".format(sector))
  396. obj.target_sector = None
  397. return None
  398. # Start with the current sector
  399. look = set((sector,))
  400. while len(look) > 0:
  401. log.warn("Searched [{0}]".format(searched))
  402. log.warn("Checking [{0}]".format(look))
  403. for s in look:
  404. if s in self.ports:
  405. # Ok, there's a port at least
  406. sp = self.ports[s]
  407. if sp["port"] in ("Special", "StarDock"):
  408. continue
  409. if self.port_burnt(sp):
  410. continue
  411. if "class" not in sp:
  412. continue
  413. sc = sp["class"]
  414. log.warn("{0} has warps {1}".format(s, self.warps[s]))
  415. # Ok, check for tradepairs.
  416. for w in self.warps[s]:
  417. if not w in self.warps:
  418. continue
  419. if not s in self.warps[w]:
  420. continue
  421. if not w in self.ports:
  422. continue
  423. # Ok, has possible port
  424. cp = self.ports[w]
  425. if cp["port"] in ("Special", "StarDock"):
  426. continue
  427. if self.port_burnt(cp):
  428. continue
  429. if "class" not in cp:
  430. continue
  431. cc = cp["class"]
  432. log.warn("{0} {1} - {2} {3}".format(s, sc, w, cc))
  433. if sc in (1, 5) and cc in (2, 4):
  434. # Good!
  435. log.warn("GOOD: {0}".format(s))
  436. obj.target_sector = s
  437. return s
  438. if sc in (2, 4) and cc in (1, 5):
  439. # Good!
  440. log.warn("GOOD: {0}".format(s))
  441. obj.target_sector = s
  442. return s
  443. # What about "OK" pairs?
  444. # Ok, not found here.
  445. searched.update(look)
  446. step_from = look
  447. look = set()
  448. for s in step_from:
  449. if s in self.warps:
  450. look.update(self.warps[s])
  451. # Look only contains warps we haven't searched
  452. look = look.difference(searched)
  453. yield
  454. obj.target_sector = None
  455. return None
  456. def find_nearest_selling(
  457. self, sector: int, selling: str, at_least: int = 100
  458. ) -> int:
  459. """ find nearest port that is selling at_least amount of this item
  460. selling is 'f', 'o', or 'e'.
  461. """
  462. names = {"e": "equ", "o": "org", "f": "fuel"}
  463. pos = {"f": 0, "o": 1, "e": 2}
  464. sell = names[selling[0].lower()]
  465. s_pos = pos[selling[0].lower()]
  466. log.warn(
  467. "find_nearest_selling({0}, {1}, {2}): {3}, {4}".format(
  468. sector, selling, at_least, sell, s_pos
  469. )
  470. )
  471. searched = set()
  472. if sector not in self.warps:
  473. log.warn("( {0}, {1}): sector not in warps".format(sector, selling))
  474. return 0
  475. # Start with the current sector
  476. look = set((sector,))
  477. while len(look) > 0:
  478. for s in look:
  479. if s in self.ports:
  480. # Ok, possibly?
  481. sp = self.ports[s]
  482. if sp["port"] in ("Special", "StarDock"):
  483. continue
  484. if sp["port"][s_pos] == "S":
  485. # Ok, they are selling!
  486. if sell in sp:
  487. if int(sp[sell]["units"]) >= at_least:
  488. log.warn(
  489. "find_nearest_selling( {0}, {1}): {2} {3} units".format(
  490. sector, selling, s, sp[sell]["units"]
  491. )
  492. )
  493. return s
  494. else:
  495. # We know they sell it, but we don't know units
  496. log.warn(
  497. "find_nearest_selling( {0}, {1}): {2} {3}".format(
  498. sector, selling, s, sp["port"]
  499. )
  500. )
  501. return s
  502. # Ok, not found here. Branch out.
  503. searched.update(look)
  504. step_from = look
  505. look = set()
  506. for s in step_from:
  507. if s in self.warps:
  508. look.update(self.warps[s])
  509. # look only contains warps we haven't searched
  510. look = look.difference(searched)
  511. # Ok, we have run out of places to search
  512. log.warn("find_nearest_selling( {0}, {1}) : failed".format(sector, selling))
  513. return 0