import jsonlines import os import logging # from twisted.python import log from pprint import pprint from colorama import Fore, Back, Style log = logging.getLogger(__name__) def merge(color_string): """ Given a string of colorama ANSI, merge them if you can. """ return color_string.replace("m\x1b[", ";") PORT_CLASSES = { 1: "BBS", 2: "BSB", 3: "SBB", 4: "SSB", 5: "SBS", 6: "BSS", 7: "SSS", 8: "BBB", } CLASSES_PORT = {v: k for k, v in PORT_CLASSES.items()} class GameData(object): def __init__(self, usergame: tuple): # Construct the GameData storage object self.usergame = usergame self.warps = {} self.ports = {} self.config = {} # 10 = 300 bytes self.warp_groups = 10 # 3 = 560 bytes # 5 = 930 bytes self.port_groups = 3 def storage_filename(self): """ return filename username.lower _ game.upper.json """ user, game = self.usergame return "{0}_{1}.json".format(user.lower(), game.upper()) def reset_ports(self): self.ports = {} def reset_warps(self): self.warps = {} def display(self): pprint(self.warps) pprint(self.ports) def save(self, *_): """ save gamedata as jsonlines. Enable sort_keys=True to provide stable json data output. We also sorted(.keys()) to keep records in order. Note: There's a "bug" when serializing to json, keys must be strings! """ filename = self.storage_filename() with jsonlines.open(filename, mode="w", sort_keys=True) as writer: # for warp, sectors in self.warps.items(): c = {"config": self.config} writer.write(c) w = {"warp": {}} for warp in sorted(self.warps.keys()): sectors = self.warps[warp] # log.debug("save:", warp, sectors) sects = sorted(list(sectors)) # make a list w["warp"][warp] = sects if len(w["warp"]) >= self.warp_groups: writer.write(w) w = {"warp": {}} yield # log.debug(w) # writer.write(w) # yield if len(w["warp"]) > 1: writer.write(w) # for sector, port in self.ports.items(): p = {"port": {}} for sector in sorted(self.ports.keys()): port = self.ports[sector] p["port"][sector] = port if len(p["port"]) >= self.port_groups: writer.write(p) p = {"port": {}} yield if len(p["port"]) > 1: writer.write(p) log.info( "Saved {0} {1}/{2}/{3}".format( filename, len(self.ports), len(self.warps), len(self.config) ) ) def load(self): filename = self.storage_filename() self.warps = {} self.ports = {} self.config = {} if os.path.exists(filename): # Load it with jsonlines.open(filename) as reader: for obj in reader: if "config" in obj: self.config.update(obj["config"]) if "warp" in obj: for s, w in obj["warp"].items(): # log.debug(s, w) self.warps[int(s)] = set(w) # self.warps.update(obj["warp"]) if "port" in obj: for s, p in obj["port"].items(): self.ports[int(s)] = p # self.ports.update(obj["port"]) yield log.info( "Loaded {0} {1}/{2}/{3}".format( filename, len(self.ports), len(self.warps), len(self.config) ) ) def untwisted_load(self): """ Load file without twisted deferred This is for testing things out. """ filename = self.storage_filename() self.warps = {} self.ports = {} self.config = {} if os.path.exists(filename): # Load it with jsonlines.open(filename) as reader: for obj in reader: if "config" in obj: self.config.update(obj["config"]) if "warp" in obj: for s, w in obj["warp"].items(): # log.debug(s, w) self.warps[int(s)] = set(w) # self.warps.update(obj["warp"]) if "port" in obj: for s, p in obj["port"].items(): self.ports[int(s)] = p # self.ports.update(obj["port"]) log.info( "Loaded {0} {1}/{2}/{3}".format( filename, len(self.ports), len(self.warps), len(self.config) ) ) def get_warps(self, sector: int): if sector in self.warps: return self.warps[sector] return None def warp_to(self, source: int, *dest): """ connect sector source to destination. """ log.debug("Warp {0} to {1}".format(source, dest)) if source not in self.warps: self.warps[source] = set() for d in dest: if d not in self.warps[source]: self.warps[source].add(d) def get_config(self, key, default=None): if key in self.config: return self.config[key] return default def set_config(self, key, value): self.config.update({key: value}) def set_port(self, sector: int, data: dict): log.debug("Port {0} : {1}".format(sector, data)) if sector not in self.ports: self.ports[sector] = dict() self.ports[sector].update(data) if "port" not in self.ports[sector]: # incomplete port type - can we "complete" it? if all(x in self.ports for x in ["fuel", "org", "equ"]): # We have all of the port types, so: port = "".join( [self.ports[sector][x]["sale"] for x in ["fuel", "org", "equ"]] ) self.ports[sector]["port"] = port self.ports[sector]["class"] = CLASSES_PORT[port] log.debug("completed {0} : {1}".format(sector, self.ports[sector])) def port_buying(self, sector: int, cargo: str): """ Given a sector, is this port buying this? cargo is a char (F/O/E) """ cargo_to_index = {"F": 0, "O": 1, "E": 2} cargo_types = ("fuel", "org", "equ") cargo = cargo[0] if sector not in self.ports: log.warn("port_buying( {0}, {1}): sector unknown!".format(sector, cargo)) return False port = self.ports[sector] if "port" not in port: log.warn("port_buying( {0}, {1}): port unknown!".format(sector, cargo)) return True cargo_index = cargo_to_index[cargo] if port["port"][cargo_index] == "S": log.warn("port_buying( {0}, {1}): not buying cargo".format(sector, cargo)) return False # ok, they buy it, but *WILL THEY* really buy it? cargo_key = cargo_types[cargo_index] if cargo_key in port: if int(port[cargo_key]["units"]) > 40: log.warn( "port_buying( {0}, {1}): Yes, buying {2}".format( sector, cargo, port[cargo_key]["sale"] ) ) return True else: log.warn( "port_buying( {0}, {1}): Yes, buying (but values unknown)".format( sector, cargo ) ) return True # unknown port, we're guess yes. return False @staticmethod def port_burnt(port: dict): """ Is this port burned out? """ if all(x in port for x in ["fuel", "org", "equ"]): if all("pct" in port[x] for x in ["fuel", "org", "equ"]): if ( port["equ"]["pct"] <= 20 or port["fuel"]["pct"] <= 20 or port["org"]["pct"] <= 20 ): return True return False # Since we don't have any port information, hope for the best, assume it isn't burnt. return False @staticmethod def flip(buy_sell): # Invert B's and S's to determine if we can trade or not between ports. return buy_sell.replace("S", "W").replace("B", "S").replace("W", "B") @staticmethod def port_trading(port1, port2): # Given the port settings, can we trade between these? if port1 == port2: return False p1 = [c for c in port1] p2 = [c for c in port2] rem = False for i in range(3): if p1[i] == p2[i]: p1[i] = "X" p2[i] = "X" rem = True if rem: j1 = "".join(p1).replace("X", "") j2 = "".join(p2).replace("X", "") if j1 == "BS" and j2 == "SB": return True if j1 == "SB" and j2 == "BS": return True rport1 = GameData.flip(port1) c = 0 match = [] for i in range(3): if rport1[i] == port2[i]: match.append(port2[i]) c += 1 if c > 1: # Remove first match, flip it f = GameData.flip(match.pop(0)) # Verify it is in there. # so we're not matching SSS/BBB if f in match: return True return False return False @staticmethod def color_pct(pct: int): if pct > 50: # green return "{0}{1:3}{2}".format(Fore.GREEN, pct, Style.RESET_ALL) elif pct > 25: return "{0}{1:3}{2}".format( merge(Fore.YELLOW + Style.BRIGHT), pct, Style.RESET_ALL ) else: return "{0:3}".format(pct) @staticmethod def port_pct(port: dict): # Make sure these exist in the port data given. if all(x in port for x in ["fuel", "org", "equ"]): return "{0},{1},{2}%".format( GameData.color_pct(port["fuel"]["pct"]), GameData.color_pct(port["org"]["pct"]), GameData.color_pct(port["equ"]["pct"]), ) else: return "---,---,---%" @staticmethod def port_show_part(sector: int, sector_port: dict): return "{0:5} ({1}) {2}".format( sector, sector_port["port"], GameData.port_pct(sector_port) ) def port_trade_show(self, sector: int, warp: int): sector_port = self.ports[sector] warp_port = self.ports[warp] sector_pct = GameData.port_pct(sector_port) warp_pct = GameData.port_pct(warp_port) return "{0} \xae\xcd\xaf {1}".format( GameData.port_show_part(sector, sector_port), GameData.port_show_part(warp, warp_port), ) @staticmethod def port_show(sector: int, sector_port: dict, warp: int, warp_port: dict): sector_pct = GameData.port_pct(sector_port) warp_pct = GameData.port_pct(warp_port) return "{0} -=- {1}".format( GameData.port_show_part(sector, sector_port), GameData.port_show_part(warp, warp_port), )