12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768 |
- import jsonlines
- import os
- import logging
- # from twisted.python import log
- from pprint import pprint
- from colorama import Fore, Back, Style
# Module-level logger, named after this module; GameData reports through it.
log = logging.getLogger(__name__)
def merge(color_string):
    """Collapse back-to-back ANSI escape sequences into a single sequence.

    Wherever one sequence's terminating ``m`` is immediately followed by
    the next sequence's ``ESC[`` introducer, the two are fused with ``;``
    (e.g. ``ESC[33m`` + ``ESC[1m`` becomes ``ESC[33;1m``).
    """
    pieces = color_string.split("m\x1b[")
    return ";".join(pieces)
# Port class number -> three-letter Buy/Sell pattern.  The letter order
# follows the ("fuel", "org", "equ") ordering used elsewhere in this module.
PORT_CLASSES = dict(
    enumerate(
        ("BBS", "BSB", "SBB", "SSB", "SBS", "BSS", "SSS", "BBB"),
        start=1,
    )
)

# Reverse lookup: Buy/Sell pattern -> port class number.
CLASSES_PORT = dict((pattern, number) for number, pattern in PORT_CLASSES.items())
class GameData(object):
    """Per-user, per-game store of warps, ports, busted ports and config.

    Data is persisted as jsonlines in a file named after the user/game pair
    (see ``storage_filename``).  Several methods are generators that
    ``yield`` between units of work so a caller can interleave other work;
    the ``untwisted_*`` variants do the same job synchronously.
    """

    # Keys under which a port record stores its per-commodity data, in the
    # same order as the letters of the three-letter port type string.
    _CARGO_KEYS = ("fuel", "org", "equ")
    # Port "types" that are not ordinary trading ports.
    _SPECIAL_PORTS = ("Special", "StarDock")

    def __init__(self, usergame: tuple):
        """Create an empty store.

        Args:
            usergame: ``(user, game)`` pair used to derive the storage filename.
        """
        self.usergame = usergame
        # sector -> set of sectors it warps to
        self.warps = {}
        # sector -> dict of port data
        self.ports = {}
        # Added for evilTrade, just contains sector of port
        self.busts = []
        self.config = {}
        # Number of entries written per jsonlines row (author's size notes):
        # 10 = 300 bytes
        self.warp_groups = 10
        # 3 = 560 bytes
        # 5 = 930 bytes
        self.port_groups = 3
        # Not sure, it's going to be small I know that
        self.bust_groups = 10

    def storage_filename(self) -> str:
        """Return the storage filename: ``<user.lower()>_<game.upper()>.json``."""
        user, game = self.usergame
        return "{0}_{1}.json".format(user.lower(), game.upper())

    def reset_ports(self) -> None:
        """Forget all known ports."""
        self.ports = {}

    def reset_warps(self) -> None:
        """Forget all known warps."""
        self.warps = {}

    def reset_busts(self) -> None:
        """Forget all busted ports."""
        self.busts = []

    def special_ports(self) -> dict:
        """Return ``{sector: port}`` for the ports whose class is 0 or 9."""
        return {
            sector: port
            for sector, port in self.ports.items()
            if "class" in port and port["class"] in (0, 9)
        }

    def display(self) -> None:
        """Pretty-print warps, ports and busts to stdout."""
        pprint(self.warps)
        pprint(self.ports)
        pprint(self.busts)

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def _iter_records(self):
        """Yield ``(record, group_full)`` for every jsonlines row to write.

        Rows come out in a stable order: the config row first, then warps,
        ports and busts, each sorted and chunked by the matching
        ``*_groups`` size.  ``group_full`` is True when the row was emitted
        because its chunk reached the group size, and False for the config
        row and for trailing partial chunks.
        """
        yield {"config": self.config}, False

        group = {}
        for warp in sorted(self.warps):
            # Sets are not JSON serializable; store a sorted list instead.
            group[warp] = sorted(self.warps[warp])
            if len(group) >= self.warp_groups:
                yield {"warp": group}, True
                group = {}
        if group:
            yield {"warp": group}, False

        group = {}
        for sector in sorted(self.ports):
            group[sector] = self.ports[sector]
            if len(group) >= self.port_groups:
                yield {"port": group}, True
                group = {}
        if group:
            yield {"port": group}, False

        # Added for evil
        chunk = []
        for bust in sorted(self.busts):
            chunk.append(bust)
            if len(chunk) >= self.bust_groups:
                yield {"busts": chunk}, True
                chunk = []
        if chunk:
            yield {"busts": chunk}, False

    def _log_counts(self, action: str, filename: str) -> None:
        """Log '<action> <filename> ports/warps/config/busts' counts."""
        log.info(
            "%s %s %d/%d/%d/%d",
            action,
            filename,
            len(self.ports),
            len(self.warps),
            len(self.config),
            len(self.busts),
        )

    def save(self, *_):
        """Save gamedata as jsonlines, yielding after each full group.

        This is a generator: it yields (no value) after writing every full
        group of warps/ports/busts so the caller can interleave other work
        (presumably a Twisted cooperative task - confirm against caller).
        Extra positional arguments are accepted and ignored.

        sort_keys=True provides stable json output, and records are written
        in sorted key order.
        Note: when serialized to json, dict keys become strings; ``load``
        converts sector keys back to int.
        """
        filename = self.storage_filename()
        with jsonlines.open(filename, mode="w", sort_keys=True) as writer:
            for record, group_full in self._iter_records():
                writer.write(record)
                if group_full:
                    yield
        self._log_counts("Saved", filename)

    def untwisted_save(self, *_):
        """Save gamedata as jsonlines in one go (non-generator ``save``).

        Writes exactly the same rows as ``save``.  Extra positional
        arguments are accepted and ignored.
        """
        filename = self.storage_filename()
        with jsonlines.open(filename, mode="w", sort_keys=True) as writer:
            for record, _group_full in self._iter_records():
                writer.write(record)
        self._log_counts("Saved", filename)

    def _clear(self) -> None:
        """Drop all in-memory data ahead of a (re)load."""
        self.warps = {}
        self.ports = {}
        self.config = {}
        self.busts = []

    def _apply_record(self, obj: dict) -> None:
        """Merge one decoded jsonlines row into the in-memory data."""
        if "config" in obj:
            self.config.update(obj["config"])
        if "warp" in obj:
            for sector, dests in obj["warp"].items():
                # JSON keys are strings and the destinations were saved as a list.
                self.warps[int(sector)] = set(dests)
        if "port" in obj:
            for sector, port in obj["port"].items():
                self.ports[int(sector)] = port
        if "busts" in obj:  # evil ports list
            self.busts.extend(obj["busts"])

    def load(self):
        """Load gamedata from the storage file, yielding after each row.

        Existing in-memory data is discarded first.  A missing file is not
        an error; the store is simply left empty.
        """
        filename = self.storage_filename()
        self._clear()
        if os.path.exists(filename):
            with jsonlines.open(filename) as reader:
                for obj in reader:
                    self._apply_record(obj)
                    yield
        self._log_counts("Loaded", filename)

    def untwisted_load(self):
        """Load gamedata from the storage file in one go (non-generator ``load``).

        Existing in-memory data, including busts, is discarded first so that
        repeated loads do not accumulate duplicate bust entries.
        """
        filename = self.storage_filename()
        self._clear()
        if os.path.exists(filename):
            with jsonlines.open(filename) as reader:
                for obj in reader:
                    self._apply_record(obj)
        self._log_counts("Loaded", filename)

    # ------------------------------------------------------------------
    # Simple accessors / mutators
    # ------------------------------------------------------------------

    def get_warps(self, sector: int):
        """Return the set of sectors ``sector`` warps to, or None if unknown."""
        return self.warps.get(sector)

    def warp_to(self, source: int, *dest):
        """Record that sector ``source`` warps to each sector in ``dest``."""
        log.debug("Warp {0} to {1}".format(source, dest))
        self.warps.setdefault(source, set()).update(dest)

    def get_config(self, key, default=None):
        """Return the config value for ``key``.

        If the key is missing, ``default`` is returned and - when it is not
        None - also stored under ``key`` for subsequent lookups.
        """
        if key in self.config:
            return self.config[key]
        if default is not None:
            self.config[key] = default
        return default

    def set_config(self, key, value):
        """Set config ``key`` to ``value``."""
        self.config[key] = value

    def set_port(self, sector: int, data: dict):
        """Merge ``data`` into the port record for ``sector``.

        If the record has no "port" type yet but carries a "sale" entry for
        fuel, org and equ, the type string and its class number are derived
        from those entries (assumes each "sale" value is "B" or "S", as the
        CLASSES_PORT lookup implies - TODO confirm against the parser).
        """
        log.debug("Port {0} : {1}".format(sector, data))
        port = self.ports.setdefault(sector, {})
        port.update(data)
        if "port" in port:
            return
        # Incomplete port type - can we "complete" it?  Look at this
        # sector's record (not the sector-keyed self.ports mapping).
        if all(x in port and "sale" in port[x] for x in self._CARGO_KEYS):
            kind = "".join(port[x]["sale"] for x in self._CARGO_KEYS)
            # Only complete the record when the pattern is a known class.
            if kind in CLASSES_PORT:
                port["port"] = kind
                port["class"] = CLASSES_PORT[kind]
                log.debug("completed {0} : {1}".format(sector, port))

    def set_bust(self, sect: int):
        """Add ``sect`` to the busted list (ports to avoid), if not present."""
        log.debug("Bust {0}".format(sect))
        if sect not in self.busts:
            self.busts.append(sect)
            log.debug("completed {0} : {1}".format(sect, self.busts))

    # ------------------------------------------------------------------
    # Port queries
    # ------------------------------------------------------------------

    def port_buying(self, sector: int, cargo: str):
        """Given a sector, is this port buying this cargo?

        Args:
            sector: sector number of the port.
            cargo: cargo name; only its first letter (F/O/E, any case) is used.

        Returns:
            False for unknown sectors, busted ports, Special/StarDock ports,
            ports selling the cargo, or ports with 40 or fewer units.  True
            otherwise, including when the port type or its unit counts are
            unknown (optimistic guess).

        Raises:
            KeyError: if the cargo letter is not F, O or E and the port type
                is known.
        """
        cargo_to_index = {"F": 0, "O": 1, "E": 2}
        cargo = cargo[0].upper()

        if sector not in self.ports:
            log.warning("port_buying( {0}, {1}): sector unknown!".format(sector, cargo))
            return False

        port = self.ports[sector]
        if "port" not in port:
            log.warning("port_buying( {0}, {1}): port unknown!".format(sector, cargo))
            return True

        if sector in self.busts:  # Abort! This given sector is a busted port!
            log.warning(
                "port_buying({0}, {1}): sector contains a busted port!".format(
                    sector, cargo
                )
            )
            return False

        cargo_index = cargo_to_index[cargo]
        if port["port"] in self._SPECIAL_PORTS:
            log.warning(
                "port_buying( {0}, {1}): not buying (is {2})".format(
                    sector, cargo, port["port"]
                )
            )
            return False

        if port["port"][cargo_index] == "S":
            log.warning(
                "port_buying( {0}, {1}): not buying cargo".format(sector, cargo)
            )
            return False

        # ok, they buy it, but *WILL THEY* really buy it?
        cargo_key = self._CARGO_KEYS[cargo_index]
        if cargo_key not in port:
            log.warning(
                "port_buying( {0}, {1}): Yes, buying (but values unknown)".format(
                    sector, cargo
                )
            )
            return True  # unknown port, we're guessing yes.

        if int(port[cargo_key]["units"]) > 40:
            log.warning(
                "port_buying( {0}, {1}): Yes, buying {2}".format(
                    sector, cargo, port[cargo_key]["sale"]
                )
            )
            return True

        log.warning(
            "port_buying( {0}, {1}): No, units < 40 {2}".format(
                sector, cargo, port[cargo_key]["sale"]
            )
        )
        return False

    @staticmethod
    def port_burnt(port: dict):
        """Is this port burned out (any commodity at 20 percent or less)?

        Returns False when any percentage is unknown: without port
        information we hope for the best and assume it isn't burnt.
        """
        keys = GameData._CARGO_KEYS
        if all(x in port and "pct" in port[x] for x in keys):
            return any(port[x]["pct"] <= 20 for x in keys)
        return False

    @staticmethod
    def flip(buy_sell):
        """Invert B's and S's, e.g. "BBS" -> "SSB"."""
        # "W" is a temporary placeholder so the two swaps don't collide.
        return buy_sell.replace("S", "W").replace("B", "S").replace("W", "B")

    @staticmethod
    def port_trading(port1, port2):
        """Given two port type strings, can we trade between these ports?

        Args:
            port1, port2: port types such as "BBS", or "Special"/"StarDock".

        Returns:
            True when the two types offer at least one complementary
            Buy/Sell pair in each direction, False otherwise.

        Busted ports are identified by sector number, which this static
        method does not receive, so callers must filter busted sectors
        themselves.
        """
        if port1 == port2:
            return False
        special = GameData._SPECIAL_PORTS
        if port1 in special or port2 in special:
            return False

        # Blank out the positions where both ports do the same thing; if
        # exactly a BS/SB cross remains, that is a trade pair.
        p1 = list(port1)
        p2 = list(port2)
        removed = False
        for i in range(3):
            if p1[i] == p2[i]:
                p1[i] = p2[i] = "X"
                removed = True
        if removed:
            j1 = "".join(p1).replace("X", "")
            j2 = "".join(p2).replace("X", "")
            if (j1, j2) in (("BS", "SB"), ("SB", "BS")):
                return True

        # Otherwise compare the inverse of port1 with port2 and collect the
        # positions where one side buys what the other sells.
        rport1 = GameData.flip(port1)
        match = [port2[i] for i in range(3) if rport1[i] == port2[i]]
        if len(match) > 1:
            # Remove first match, flip it, and verify the flipped letter is
            # among the rest - so we're not matching SSS/BBB.
            flipped = GameData.flip(match.pop(0))
            return flipped in match
        return False

    @staticmethod
    def color_pct(pct: int):
        """Format ``pct`` 3 wide: green above 50, bright yellow above 25, else plain."""
        if pct > 50:
            return "{0}{1:3}{2}".format(Fore.GREEN, pct, Style.RESET_ALL)
        if pct > 25:
            return "{0}{1:3}{2}".format(
                merge(Fore.YELLOW + Style.BRIGHT), pct, Style.RESET_ALL
            )
        return "{0:3}".format(pct)

    @staticmethod
    def port_pct(port: dict):
        """Return "fuel,org,equ%" with colored percentages.

        Returns the placeholder "---,---,---%" when any percentage is unknown.
        """
        keys = GameData._CARGO_KEYS
        if all(x in port and "pct" in port[x] for x in keys):
            return "{0},{1},{2}%".format(
                *(GameData.color_pct(port[x]["pct"]) for x in keys)
            )
        return "---,---,---%"

    @staticmethod
    def port_show_part(sector: int, sector_port: dict):
        """Return "<sector> (<port type>) <percentages>" for one port."""
        return "{0:5} ({1}) {2}".format(
            sector, sector_port["port"], GameData.port_pct(sector_port)
        )

    def port_above(self, port: dict, limit: int) -> bool:
        """Are all of the port's commodity percentages at or above ``limit``?

        When any percentage is unknown, the port is assumed to be above
        the limit.
        """
        keys = self._CARGO_KEYS
        if all(x in port and "pct" in port[x] for x in keys):
            return all(port[x]["pct"] >= limit for x in keys)
        # Port is unknown, we'll assume it is above the limit.
        return True

    def port_trade_show(self, sector: int, warp: int, limit: int = 90):
        """Return a display line for the pair if both ports are above ``limit``.

        Returns None when either port is below the limit.

        Raises:
            KeyError: if either sector has no port record.
        """
        sector_port = self.ports[sector]
        warp_port = self.ports[warp]
        if self.port_above(sector_port, limit) and self.port_above(warp_port, limit):
            return "{0} \xae\xcd\xaf {1}".format(
                GameData.port_show_part(sector, sector_port),
                GameData.port_show_part(warp, warp_port),
            )
        return None

    @staticmethod
    def port_show(sector: int, sector_port: dict, warp: int, warp_port: dict):
        """Return a "<port> -=- <port>" display line for the two ports."""
        return "{0} -=- {1}".format(
            GameData.port_show_part(sector, sector_port),
            GameData.port_show_part(warp, warp_port),
        )

    # ------------------------------------------------------------------
    # Searches
    # ------------------------------------------------------------------

    def _usable_class(self, sector: int):
        """Return the port class at ``sector`` if it is usable for pairing.

        Returns None when there is no port, or the port is
        Special/StarDock, burnt, of unknown class, or busted.
        """
        port = self.ports.get(sector)
        if port is None:
            return None
        # .get(): a partially known port may have no "port" type yet.
        if port.get("port") in self._SPECIAL_PORTS:
            return None
        if self.port_burnt(port):
            return None
        if "class" not in port:
            return None
        if sector in self.busts:
            return None
        return port["class"]

    def _nearest_pair(self, sector: int, obj, is_pair):
        """Breadth-first search outward from ``sector`` for a port pair.

        A pair is two usable ports in adjacent sectors that warp to each
        other and whose classes satisfy ``is_pair(first_class, second_class)``.
        Yields once per search ring; sets ``obj.target_sector`` and returns
        the sector of the first port found, or None when nothing is found.
        """
        searched = set()

        if sector not in self.warps:
            log.warning(":Sector {0} not in warps.".format(sector))
            obj.target_sector = None
            return None

        if sector in self.busts:
            log.warning(":Sector {0} in busted".format(sector))
            obj.target_sector = None
            return None

        # Start with the current sector
        look = {sector}

        while look:
            log.warning("Searched [{0}]".format(searched))
            log.warning("Checking [{0}]".format(look))

            for s in look:
                sc = self._usable_class(s)
                if sc is None or s not in self.warps:
                    continue

                log.warning("{0} has warps {1}".format(s, self.warps[s]))

                # Ok, check for tradepairs.
                for w in self.warps[s]:
                    # Need a two-way warp between s and w.
                    if w not in self.warps or s not in self.warps[w]:
                        continue
                    cc = self._usable_class(w)
                    if cc is None:
                        continue

                    log.warning("{0} {1} - {2} {3}".format(s, sc, w, cc))
                    if is_pair(sc, cc):
                        log.warning("GOOD: {0}".format(s))
                        obj.target_sector = s
                        return s

            # Ok, not found here.  Step out one ring, keeping only the
            # sectors we haven't searched yet.
            searched.update(look)
            step_from = look
            look = set()
            for s in step_from:
                if s in self.warps:
                    look.update(self.warps[s])
            look = look.difference(searched)
            yield

        obj.target_sector = None
        return None

    @staticmethod
    def _is_tradepair(sc, cc):
        """True when one port is class 1/5 and the other is class 2/4."""
        return (sc in (1, 5) and cc in (2, 4)) or (sc in (2, 4) and cc in (1, 5))

    @staticmethod
    def _is_evilpair(sc, cc):
        """True when both ports are of class 2, 3, 4 or 8 (XXB -=- XXB)."""
        return sc in (2, 3, 4, 8) and cc in (2, 3, 4, 8)

    def find_nearest_tradepairs(self, sector: int, obj):
        """Find the nearest trade pair (class 1/5 next to class 2/4).

        Generator: yields between search rings.  On completion
        ``obj.target_sector`` holds the sector found (also the generator's
        return value), or None if no pair is reachable.
        """
        return (yield from self._nearest_pair(sector, obj, self._is_tradepair))

    def find_nearest_evilpairs(self, sector: int, obj):
        """Find the nearest evil pair: XXB -=- XXB (classes 2, 3, 4, 8).

        Generator: yields between search rings.  On completion
        ``obj.target_sector`` holds the sector found (also the generator's
        return value), or None if no pair is reachable.
        """
        return (yield from self._nearest_pair(sector, obj, self._is_evilpair))

    def find_nearest_selling(
        self, sector: int, selling: str, at_least: int = 100
    ) -> int:
        """Find the nearest port selling ``at_least`` units of an item.

        Args:
            sector: sector to start the breadth-first search from.
            selling: item name; only its first letter ('f', 'o' or 'e',
                any case) is used.
            at_least: minimum units required when the unit count is known.

        Returns:
            The sector of the first matching port, or 0 when the start
            sector is unknown or busted, or when nothing is found.  A port
            known to sell the item but with unknown unit counts is accepted.

        Raises:
            KeyError: if the item letter is not f, o or e.
        """
        names = {"e": "equ", "o": "org", "f": "fuel"}
        pos = {"f": 0, "o": 1, "e": 2}
        letter = selling[0].lower()
        sell = names[letter]
        s_pos = pos[letter]
        log.warning(
            "find_nearest_selling({0}, {1}, {2}): {3}, {4}".format(
                sector, selling, at_least, sell, s_pos
            )
        )

        searched = set()
        if sector not in self.warps:
            log.warning("( {0}, {1}): sector not in warps".format(sector, selling))
            return 0

        if sector in self.busts:
            log.warning(
                "({0}, {1}): sector is in busted ports list".format(sector, selling)
            )
            return 0

        # Start with the current sector
        look = {sector}

        while look:
            for s in look:
                if s not in self.ports:
                    continue
                sp = self.ports[s]
                kind = sp.get("port")
                # Skip ports of unknown type as well as the special ones.
                if kind is None or kind in self._SPECIAL_PORTS:
                    continue
                if s in self.busts:  # Busted!
                    continue
                if kind[s_pos] != "S":
                    continue

                # Ok, they are selling!
                if sell not in sp:
                    # We know they sell it, but we don't know units
                    log.warning(
                        "find_nearest_selling( {0}, {1}): {2} {3}".format(
                            sector, selling, s, kind
                        )
                    )
                    return s
                if int(sp[sell]["units"]) >= at_least:
                    log.warning(
                        "find_nearest_selling( {0}, {1}): {2} {3} units".format(
                            sector, selling, s, sp[sell]["units"]
                        )
                    )
                    return s

            # Ok, not found here. Branch out to warps we haven't searched.
            searched.update(look)
            step_from = look
            look = set()
            for s in step_from:
                if s in self.warps:
                    look.update(self.warps[s])
            look = look.difference(searched)

        # Ok, we have run out of places to search
        log.warning("find_nearest_selling( {0}, {1}) : failed".format(sector, selling))
        return 0
|