import logging
import os
from copy import deepcopy
from pprint import pprint

import jsonlines
import pendulum
from colorama import Back, Fore, Style

import config

# NOTE: `deepcopy` and `Back` are not used below; the imports are kept in
# case other modules import these names from here.

log = logging.getLogger(__name__)

# Keys under which a port record stores per-cargo data, in the same order as
# the letters of the three-letter port string (fuel, organics, equipment).
_CARGO_TYPES = ("fuel", "org", "equ")

# Values of a port's "port" field that are excluded from trading lookups.
_SPECIAL_PORTS = ("Special", "StarDock")


def merge(color_string):
    """
    Given a string of colorama ANSI sequences, merge adjacent ones.

    Every "m" immediately followed by "ESC[" is replaced with ";", which
    joins back-to-back escape sequences into a single one.
    """
    return color_string.replace("m\x1b[", ";")


# Port class number -> three-letter B/S string (fuel, organics, equipment).
PORT_CLASSES = {
    1: "BBS",
    2: "BSB",
    3: "SBB",
    4: "SSB",
    5: "SBS",
    6: "BSS",
    7: "SSS",
    8: "BBB",
}

# Reverse lookup: three-letter B/S string -> port class number.
CLASSES_PORT = {v: k for k, v in PORT_CLASSES.items()}


class GameData(object):
    """
    In-memory store for warps, ports, busted ports and config of one game.

    The data is persisted as jsonlines in the file named by
    storage_filename().
    """

    def __init__(self, usergame: tuple):
        """
        Construct the GameData storage object.

        usergame is a (user, game) pair; it is unpacked that way by
        storage_filename().
        """
        self.usergame = usergame
        # sector (int) -> set of sectors it warps to
        self.warps = {}
        # sector (int) -> dict of port data
        self.ports = {}
        # Added for evilTrade: sector of busted port -> datetime string
        self.busts = {}
        self.config = {}

        # Number of entries written per jsonlines record.
        # 10 = 300 bytes
        self.warp_groups = 10
        # 3 = 560 bytes
        # 5 = 930 bytes
        self.port_groups = 3
        # Not sure, I don't think it will be big.
        self.bust_groups = 5

        # Did we activate the proxy?
        # Not sure if this is the right spot to put it in.
        self.activated = False

    def storage_filename(self):
        """
        Return the storage filename: [base_]username.lower_GAME.upper.json

        The "base_" prefix is only present when config.config has a "base"
        entry.
        """
        user, game = self.usergame
        if "base" in config.config:
            base = config.config["base"] + "_"
        else:
            base = ""
        return "{0}{1}_{2}.json".format(base, user.lower(), game.upper())

    def reset_ports(self):
        """ Forget all known ports. """
        self.ports = {}

    def reset_warps(self):
        """ Forget all known warps. """
        self.warps = {}

    def reset_busts(self):
        """
        Forget all busted ports.

        maint_busts() already filters out expired busts, so this is only
        needed to clear everything unconditionally.
        """
        self.busts = {}

    def special_ports(self):
        """ Return the ports whose "class" is 0 or 9, keyed by sector. """
        return {
            sector: port
            for sector, port in self.ports.items()
            if port.get("class") in (0, 9)
        }

    def display(self):
        """ Pretty-print warps, ports and busts to stdout. """
        pprint(self.warps)
        pprint(self.ports)
        pprint(self.busts)

    @staticmethod
    def _grouped(key, data, group_size, convert=None):
        """
        Yield {key: {...}} records of at most group_size entries of data.

        Entries are emitted in sorted key order.  When convert is given it
        is applied to every value (used to turn warp sets into sorted
        lists, since sets are not JSON serializable).
        """
        chunk = {}
        for entry in sorted(data):
            value = data[entry]
            chunk[entry] = convert(value) if convert is not None else value
            if len(chunk) >= group_size:
                yield {key: chunk}
                chunk = {}
        if chunk:
            yield {key: chunk}

    def _records(self):
        """ Yield every record of the save file, in file order. """
        yield {"config": self.config}
        yield from self._grouped("warp", self.warps, self.warp_groups, sorted)
        yield from self._grouped("port", self.ports, self.port_groups)
        # Added for evil
        yield from self._grouped("busts", self.busts, self.bust_groups)

    def _log_counts(self, action, filename):
        """ Log "<action> <filename> ports/warps/config/busts" counts. """
        log.info(
            "{0} {1} {2}/{3}/{4}/{5}".format(
                action,
                filename,
                len(self.ports),
                len(self.warps),
                len(self.config),
                len(self.busts),
            )
        )

    def save(self, *_):
        """
        Save gamedata as jsonlines, yielding between records.

        This is a generator: it yields (None) after every record written so
        the caller can interleave other work; the file is complete and
        closed once the generator is exhausted.  Use untwisted_save() for a
        plain blocking save.

        sort_keys=True provides stable json output, and entries are written
        in sorted key order to keep records in order.

        Note: There's a "bug" when serializing to json, keys must be
        strings!  (load() converts the keys back to int.)
        """
        filename = self.storage_filename()
        with jsonlines.open(filename, mode="w", sort_keys=True) as writer:
            for record in self._records():
                writer.write(record)
                yield
        self._log_counts("Saved", filename)

    def untwisted_save(self, *_):
        """
        Save gamedata as jsonlines in one blocking call.

        Writes exactly the same records as save(), without yielding.
        """
        filename = self.storage_filename()
        with jsonlines.open(filename, mode="w", sort_keys=True) as writer:
            for record in self._records():
                writer.write(record)
        self._log_counts("Saved", filename)

    def _reset_all(self):
        """ Clear warps, ports, config and busts before loading. """
        self.warps = {}
        self.ports = {}
        self.config = {}
        self.busts = {}

    def _apply_record(self, obj):
        """
        Merge one record read from the save file into this object.

        JSON object keys are strings, so sector keys are converted back to
        int and warp lists back to sets.
        """
        if "config" in obj:
            self.config.update(obj["config"])
        if "warp" in obj:
            for sector, warps in obj["warp"].items():
                self.warps[int(sector)] = set(warps)
        if "port" in obj:
            for sector, port in obj["port"].items():
                self.ports[int(sector)] = port
        if "busts" in obj:
            # evil ports list
            for sector, stamp in obj["busts"].items():
                self.busts[int(sector)] = stamp

    def load(self):
        """
        Load gamedata from the storage file, yielding between records.

        This is a generator: it yields (None) after every record read.  All
        existing data is cleared first; if the file does not exist the
        object is simply left empty.  Expired busts are dropped once the
        file has been read.
        """
        filename = self.storage_filename()
        self._reset_all()
        if os.path.exists(filename):
            with jsonlines.open(filename) as reader:
                for obj in reader:
                    self._apply_record(obj)
                    yield
            # Clean Busts after loading
            self.maint_busts()
            self._log_counts("Loaded", filename)

    def untwisted_load(self):
        """
        Load gamedata from the storage file in one blocking call.

        Same as load() but without yielding.  This is for testing things
        out.
        """
        filename = self.storage_filename()
        self._reset_all()
        if os.path.exists(filename):
            with jsonlines.open(filename) as reader:
                for obj in reader:
                    self._apply_record(obj)
            # Clean Busts after loading
            self.maint_busts()
            self._log_counts("Loaded", filename)

    def get_warps(self, sector: int):
        """ Return the set of warps out of sector, or None if unknown. """
        return self.warps.get(sector)

    def warp_to(self, source: int, *dest):
        """ Connect sector source to each destination sector in dest. """
        log.debug("Warp {0} to {1}".format(source, dest))
        self.warps.setdefault(source, set()).update(dest)

    def get_config(self, key, default=None):
        """
        Return the config value for key.

        When key is missing, default is returned; a default that is not
        None is also stored in the config.
        """
        if key in self.config:
            return self.config[key]
        if default is not None:
            self.config[key] = default
        return default

    def set_config(self, key, value):
        """ Store value in the config under key. """
        self.config[key] = value

    def set_port(self, sector: int, data: dict):
        """
        Merge data into the port record for sector (creating it if needed).

        If the record still has no "port" string but every cargo type has
        a "sale" letter, the port string and its "class" are derived from
        those letters.
        """
        log.debug("Port {0} : {1}".format(sector, data))
        port = self.ports.setdefault(sector, {})
        port.update(data)

        if "port" in port:
            return

        # Incomplete port type - can we "complete" it?
        # The check must look at this sector's record, not at self.ports
        # (which is keyed by sector number).
        if not all(x in port and "sale" in port[x] for x in _CARGO_TYPES):
            return

        layout = "".join(port[x]["sale"] for x in _CARGO_TYPES)
        if layout not in CLASSES_PORT:
            # Not a known B/S combination; leave the record incomplete
            # rather than raising.
            return
        port["port"] = layout
        port["class"] = CLASSES_PORT[layout]
        log.debug("completed {0} : {1}".format(sector, port))

    def set_bust(self, sect: int):
        """
        Record sect as busted now (avoid using this port).

        A sector that is already listed keeps its original timestamp.
        """
        log.debug("Bust {0}".format(sect))
        if sect not in self.busts:
            self.busts[sect] = str(pendulum.now())
            log.debug("Done {0} | {1}".format(sect, self.busts))
        else:
            log.debug("{0} already is in there".format(sect))

    def maint_busts(self, days: int = 7):
        """
        Drop busts that are `days` days old or older (7 by default).

        The stored timestamps are strings; they are parsed back into
        datetimes for the comparison.
        """
        right_now = pendulum.now()
        # add() takes years as its first positional argument, so the unit
        # must be named explicitly.
        kept = {
            sector: stamp
            for sector, stamp in self.busts.items()
            if pendulum.parse(stamp).add(days=days) > right_now
        }
        dropped = len(self.busts) - len(kept)
        self.busts = kept
        log.debug("Dropped {0} sectors from busted list".format(dropped))

    def port_buying(self, sector: int, cargo: str):
        """
        Given a sector, is this port buying this?

        cargo is a char (F/O/E); only its first character is used and case
        is ignored.

        Returns False for unknown sectors, busted ports, special ports,
        ports that sell this cargo, and ports known to have 40 units or
        fewer.  Returns True when the port record has no port string yet or
        when the units are unknown (we guess yes).
        """
        cargo_to_index = {"F": 0, "O": 1, "E": 2}
        cargo = cargo[0].upper()

        if sector not in self.ports:
            log.warning(
                "port_buying( {0}, {1}): sector unknown!".format(sector, cargo)
            )
            return False

        port = self.ports[sector]
        if "port" not in port:
            log.warning(
                "port_buying( {0}, {1}): port unknown!".format(sector, cargo)
            )
            return True

        if sector in self.busts:
            # Abort! This given sector is a busted port!
            log.warning(
                "port_buying({0}, {1}): sector contains a busted port!".format(
                    sector, cargo
                )
            )
            return False

        cargo_index = cargo_to_index[cargo]
        if port["port"] in _SPECIAL_PORTS:
            log.warning(
                "port_buying( {0}, {1}): not buying (is {2})".format(
                    sector, cargo, port["port"]
                )
            )
            return False

        if port["port"][cargo_index] == "S":
            log.warning(
                "port_buying( {0}, {1}): not buying cargo".format(sector, cargo)
            )
            return False

        # ok, they buy it, but *WILL THEY* really buy it?
        cargo_key = _CARGO_TYPES[cargo_index]
        if cargo_key not in port:
            # unknown port values, we guess yes.
            log.warning(
                "port_buying( {0}, {1}): Yes, buying (but values unknown)".format(
                    sector, cargo
                )
            )
            return True

        if int(port[cargo_key]["units"]) > 40:
            log.warning(
                "port_buying( {0}, {1}): Yes, buying {2}".format(
                    sector, cargo, port[cargo_key]["sale"]
                )
            )
            return True

        log.warning(
            "port_buying( {0}, {1}): No, units < 40 {2}".format(
                sector, cargo, port[cargo_key]["sale"]
            )
        )
        return False

    @staticmethod
    def port_burnt(port: dict):
        """
        Is this port burned out?

        True when all three cargo percentages are known and any of them is
        20 or below.  Without full information we hope for the best and
        return False.
        """
        if all(x in port and "pct" in port[x] for x in _CARGO_TYPES):
            return any(port[x]["pct"] <= 20 for x in _CARGO_TYPES)
        return False

    @staticmethod
    def flip(buy_sell):
        """
        Invert B's and S's, to determine if we can trade between ports.

        "W" is used as a temporary placeholder during the swap.
        """
        return buy_sell.replace("S", "W").replace("B", "S").replace("W", "B")

    @staticmethod
    def port_trading(port1, port2):
        """
        Given two port strings (e.g. "BSB"), can we trade between them?

        These are port settings, not sectors, so nothing is checked against
        the busted list here.
        """
        if port1 == port2:
            return False
        if port1 in _SPECIAL_PORTS or port2 in _SPECIAL_PORTS:
            return False

        # Ignore the slots where both ports do the same thing; if what is
        # left is exactly a buy/sell cross-over, the ports can trade.
        differing = [(a, b) for a, b in zip(port1, port2) if a != b]
        left = "".join(a for a, _ in differing)
        right = "".join(b for _, b in differing)
        if (left, right) in (("BS", "SB"), ("SB", "BS")):
            return True

        # Slots where port2 does the opposite of port1.
        opposite = [
            b for a, b in zip(GameData.flip(port1), port2) if a == b
        ]
        if len(opposite) > 1:
            # The opposite slots must contain both a B and an S,
            # so we're not matching SSS/BBB.
            return GameData.flip(opposite[0]) in opposite[1:]
        return False

    @staticmethod
    def color_pct(pct: int):
        """
        Format pct 3 wide: green above 50, bright yellow above 25,
        otherwise uncolored.
        """
        if pct > 50:
            # green
            return "{0}{1:3}{2}".format(Fore.GREEN, pct, Style.RESET_ALL)
        elif pct > 25:
            return "{0}{1:3}{2}".format(
                merge(Fore.YELLOW + Style.BRIGHT), pct, Style.RESET_ALL
            )
        else:
            return "{0:3}".format(pct)

    @staticmethod
    def port_pct(port: dict):
        """
        Return the colored "fuel,org,equ%" percentages of port.

        Returns "---,---,---%" when any of the percentages is not known.
        """
        # Make sure these exist in the port data given.
        if all(x in port and "pct" in port[x] for x in _CARGO_TYPES):
            return "{0},{1},{2}%".format(
                GameData.color_pct(port["fuel"]["pct"]),
                GameData.color_pct(port["org"]["pct"]),
                GameData.color_pct(port["equ"]["pct"]),
            )
        return "---,---,---%"

    @staticmethod
    def port_show_part(sector: int, sector_port: dict):
        """ Return "sector (port) percentages" for one port. """
        return "{0:5} ({1}) {2}".format(
            sector, sector_port["port"], GameData.port_pct(sector_port)
        )

    def port_above(self, port: dict, limit: int) -> bool:
        """
        Are all three cargo percentages of port at or above limit?

        When the percentages are not (all) known, we assume the port is
        above the limit.
        """
        if all(x in port and "pct" in port[x] for x in _CARGO_TYPES):
            return all(port[x]["pct"] >= limit for x in _CARGO_TYPES)
        # Port is unknown, we'll assume it is above the limit.
        return True

    def port_trade_show(self, sector: int, warp: int, limit: int = 90):
        """
        Return a display line for the pair sector/warp, or None.

        None is returned unless both ports are above limit (see
        port_above).  Both sectors must have port records.
        """
        sector_port = self.ports[sector]
        warp_port = self.ports[warp]
        if self.port_above(sector_port, limit) and self.port_above(
            warp_port, limit
        ):
            return "{0} \xae\xcd\xaf {1}".format(
                GameData.port_show_part(sector, sector_port),
                GameData.port_show_part(warp, warp_port),
            )
        return None

    @staticmethod
    def port_show(sector: int, sector_port: dict, warp: int, warp_port: dict):
        """ Return a display line for the two given ports. """
        return "{0} -=- {1}".format(
            GameData.port_show_part(sector, sector_port),
            GameData.port_show_part(warp, warp_port),
        )

    def _next_ring(self, ring, searched):
        """
        Return the sectors one warp away from ring that are not in
        searched.
        """
        reachable = set()
        for sector in ring:
            reachable.update(self.warps.get(sector, ()))
        return reachable - searched

    def _pair_class(self, sector):
        """
        Return the port class of sector if it can be part of a pair.

        Returns None when there is no port record, the class is unknown,
        or the port is special, burnt or busted.  Compare the result with
        `is None`: special_ports() shows 0 is a possible class value.
        """
        port = self.ports.get(sector)
        if port is None or "class" not in port:
            return None
        # .get(): an incomplete record may not have a "port" string yet.
        if port.get("port") in _SPECIAL_PORTS:
            return None
        if self.port_burnt(port) or sector in self.busts:
            return None
        return port["class"]

    def _find_nearest_pair(self, sector, obj, is_pair):
        """
        Breadth-first search from sector for two adjacent, mutually warped
        ports whose classes satisfy is_pair(class1, class2).

        Generator: yields (None) after every ring of sectors searched.  The
        result (sector, or None if nothing was found) is stored in
        obj.target_sector and is also the generator's return value.
        """
        if sector not in self.warps:
            log.warning(":Sector {0} not in warps.".format(sector))
            obj.target_sector = None
            return None
        if sector in self.busts:
            log.warning(":Sector {0} in busted".format(sector))
            obj.target_sector = None
            return None

        searched = set()
        # Start with the current sector
        look = {sector}

        while look:
            log.warning("Searched [{0}]".format(searched))
            log.warning("Checking [{0}]".format(look))
            for s in look:
                sc = self._pair_class(s)
                if sc is None or s not in self.warps:
                    continue
                log.warning("{0} has warps {1}".format(s, self.warps[s]))

                # Ok, check for pairs.
                for w in self.warps[s]:
                    # The warp must lead back, so we can go both ways.
                    if w not in self.warps or s not in self.warps[w]:
                        continue
                    cc = self._pair_class(w)
                    if cc is None:
                        continue
                    log.warning("{0} {1} - {2} {3}".format(s, sc, w, cc))
                    if is_pair(sc, cc):
                        # Good!
                        log.warning("GOOD: {0}".format(s))
                        obj.target_sector = s
                        return s
                    # What about "OK" pairs?

            # Ok, not found here.
            searched.update(look)
            # look only contains warps we haven't searched
            look = self._next_ring(look, searched)
            yield

        obj.target_sector = None
        return None

    def find_nearest_tradepairs(self, sector: int, obj):
        """
        Find the nearest tradepair.

        A pair is a class 1 or 5 port next to a class 2 or 4 port (in
        either order).  Generator; the sector found (or None) is stored in
        obj.target_sector.

        When do we use good?  When do we use ok?
        """

        def is_pair(sc, cc):
            return (sc in (1, 5) and cc in (2, 4)) or (
                sc in (2, 4) and cc in (1, 5)
            )

        return (yield from self._find_nearest_pair(sector, obj, is_pair))

    def find_nearest_evilpairs(self, sector: int, obj):
        """
        Find the nearest evilpair.

        XXB -=- XXB : both ports are of class 2, 3, 4 or 8.  Generator; the
        sector found (or None) is stored in obj.target_sector.

        When do we use good?  When do we use ok?
        """

        def is_pair(sc, cc):
            return sc in (2, 3, 4, 8) and cc in (2, 3, 4, 8)

        return (yield from self._find_nearest_pair(sector, obj, is_pair))

    def find_nearest_selling(
        self, sector: int, selling: str, at_least: int = 100
    ) -> int:
        """
        Find the nearest port that is selling at_least amount of this item.

        selling is 'f', 'o', or 'e' (only the first character is used, case
        is ignored).  A port that sells the item but whose units are
        unknown is accepted as well.

        Returns the sector of the port, or 0 when nothing was found, the
        starting sector has no known warps, or it is a busted port.
        """
        names = {"e": "equ", "o": "org", "f": "fuel"}
        pos = {"f": 0, "o": 1, "e": 2}
        sell = names[selling[0].lower()]
        s_pos = pos[selling[0].lower()]
        log.warning(
            "find_nearest_selling({0}, {1}, {2}): {3}, {4}".format(
                sector, selling, at_least, sell, s_pos
            )
        )

        if sector not in self.warps:
            log.warning(
                "( {0}, {1}): sector not in warps".format(sector, selling)
            )
            return 0
        if sector in self.busts:
            log.warning(
                "({0}, {1}): sector is in busted ports list".format(
                    sector, selling
                )
            )
            return 0

        searched = set()
        # Start with the current sector
        look = {sector}

        while look:
            for s in look:
                sp = self.ports.get(s)
                if sp is None:
                    continue
                # An incomplete record may not have a "port" string yet.
                layout = sp.get("port")
                if layout is None or layout in _SPECIAL_PORTS:
                    continue
                if s in self.busts:
                    # Busted!
                    continue
                if layout[s_pos] != "S":
                    continue

                # Ok, they are selling!
                if sell not in sp:
                    # We know they sell it, but we don't know units
                    log.warning(
                        "find_nearest_selling( {0}, {1}): {2} {3}".format(
                            sector, selling, s, sp["port"]
                        )
                    )
                    return s
                if int(sp[sell]["units"]) >= at_least:
                    log.warning(
                        "find_nearest_selling( {0}, {1}): {2} {3} units".format(
                            sector, selling, s, sp[sell]["units"]
                        )
                    )
                    return s

            # Ok, not found here.  Branch out.
            searched.update(look)
            # look only contains warps we haven't searched
            look = self._next_ring(look, searched)

        # Ok, we have run out of places to search
        log.warning(
            "find_nearest_selling( {0}, {1}) : failed".format(sector, selling)
        )
        return 0