|
- import jsonlines
- import os
- import logging
- # from twisted.python import log
- from pprint import pprint
- from colorama import Fore, Back, Style
- log = logging.getLogger(__name__)
- def merge(color_string):
- """ Given a string of colorama ANSI, merge them if you can. """
- return color_string.replace("m\x1b[", ";")
- PORT_CLASSES = {
- 1: "BBS",
- 2: "BSB",
- 3: "SBB",
- 4: "SSB",
- 5: "SBS",
- 6: "BSS",
- 7: "SSS",
- 8: "BBB",
- }
- CLASSES_PORT = {v: k for k, v in PORT_CLASSES.items()}
- class GameData(object):
- def __init__(self, usergame: tuple):
- # Construct the GameData storage object
- self.usergame = usergame
- self.warps = {}
- self.ports = {}
- self.config = {}
- # 10 = 300 bytes
- self.warp_groups = 10
- # 3 = 560 bytes
- # 5 = 930 bytes
- self.port_groups = 3
- def storage_filename(self):
- """ return filename
- username.lower _ game.upper.json
- """
- user, game = self.usergame
- return "{0}_{1}.json".format(user.lower(), game.upper())
- def reset_ports(self):
- self.ports = {}
- def reset_warps(self):
- self.warps = {}
- def display(self):
- pprint(self.warps)
- pprint(self.ports)
- def save(self, *_):
- """ save gamedata as jsonlines.
- Enable sort_keys=True to provide stable json data output.
- We also sorted(.keys()) to keep records in order.
- Note: There's a "bug" when serializing to json, keys must be strings!
- """
- filename = self.storage_filename()
- with jsonlines.open(filename, mode="w", sort_keys=True) as writer:
- # for warp, sectors in self.warps.items():
- c = {"config": self.config}
- writer.write(c)
- w = {"warp": {}}
- for warp in sorted(self.warps.keys()):
- sectors = self.warps[warp]
- # log.debug("save:", warp, sectors)
- sects = sorted(list(sectors)) # make a list
- w["warp"][warp] = sects
- if len(w["warp"]) >= self.warp_groups:
- writer.write(w)
- w = {"warp": {}}
- yield
- # log.debug(w)
- # writer.write(w)
- # yield
- if len(w["warp"]) > 1:
- writer.write(w)
- # for sector, port in self.ports.items():
- p = {"port": {}}
- for sector in sorted(self.ports.keys()):
- port = self.ports[sector]
- p["port"][sector] = port
- if len(p["port"]) >= self.port_groups:
- writer.write(p)
- p = {"port": {}}
- yield
- if len(p["port"]) > 1:
- writer.write(p)
- log.info(
- "Saved {0} {1}/{2}/{3}".format(
- filename, len(self.ports), len(self.warps), len(self.config)
- )
- )
- def load(self):
- filename = self.storage_filename()
- self.warps = {}
- self.ports = {}
- self.config = {}
- if os.path.exists(filename):
- # Load it
- with jsonlines.open(filename) as reader:
- for obj in reader:
- if "config" in obj:
- self.config.update(obj["config"])
- if "warp" in obj:
- for s, w in obj["warp"].items():
- # log.debug(s, w)
- self.warps[int(s)] = set(w)
- # self.warps.update(obj["warp"])
- if "port" in obj:
- for s, p in obj["port"].items():
- self.ports[int(s)] = p
- # self.ports.update(obj["port"])
- yield
- log.info(
- "Loaded {0} {1}/{2}/{3}".format(
- filename, len(self.ports), len(self.warps), len(self.config)
- )
- )
- def untwisted_load(self):
- """ Load file without twisted deferred
-
- This is for testing things out.
- """
- filename = self.storage_filename()
- self.warps = {}
- self.ports = {}
- self.config = {}
- if os.path.exists(filename):
- # Load it
- with jsonlines.open(filename) as reader:
- for obj in reader:
- if "config" in obj:
- self.config.update(obj["config"])
- if "warp" in obj:
- for s, w in obj["warp"].items():
- # log.debug(s, w)
- self.warps[int(s)] = set(w)
- # self.warps.update(obj["warp"])
- if "port" in obj:
- for s, p in obj["port"].items():
- self.ports[int(s)] = p
- # self.ports.update(obj["port"])
- log.info(
- "Loaded {0} {1}/{2}/{3}".format(
- filename, len(self.ports), len(self.warps), len(self.config)
- )
- )
- def get_warps(self, sector: int):
- if sector in self.warps:
- return self.warps[sector]
- return None
- def warp_to(self, source: int, *dest):
- """ connect sector source to destination.
- """
- log.debug("Warp {0} to {1}".format(source, dest))
- if source not in self.warps:
- self.warps[source] = set()
- for d in dest:
- if d not in self.warps[source]:
- self.warps[source].add(d)
- def get_config(self, key, default=None):
- if key in self.config:
- return self.config[key]
- return default
- def set_config(self, key, value):
- self.config.update({key: value})
- def set_port(self, sector: int, data: dict):
- log.debug("Port {0} : {1}".format(sector, data))
- if sector not in self.ports:
- self.ports[sector] = dict()
- self.ports[sector].update(data)
- if "port" not in self.ports[sector]:
- # incomplete port type - can we "complete" it?
- if all(x in self.ports for x in ["fuel", "org", "equ"]):
- # We have all of the port types, so:
- port = "".join(
- [self.ports[sector][x]["sale"] for x in ["fuel", "org", "equ"]]
- )
- self.ports[sector]["port"] = port
- self.ports[sector]["class"] = CLASSES_PORT[port]
- log.debug("completed {0} : {1}".format(sector, self.ports[sector]))
- def port_buying(self, sector: int, cargo: str):
- """ Given a sector, is this port buying this?
- cargo is a char (F/O/E)
- """
- cargo_to_index = {"F": 0, "O": 1, "E": 2}
- cargo_types = ("fuel", "org", "equ")
- cargo = cargo[0]
- if sector not in self.ports:
- log.warn("port_buying( {0}, {1}): sector unknown!".format(sector, cargo))
- return False
- port = self.ports[sector]
- if "port" not in port:
- log.warn("port_buying( {0}, {1}): port unknown!".format(sector, cargo))
- return True
- cargo_index = cargo_to_index[cargo]
- if port["port"][cargo_index] == "S":
- log.warn("port_buying( {0}, {1}): not buying cargo".format(sector, cargo))
- return False
- # ok, they buy it, but *WILL THEY* really buy it?
- cargo_key = cargo_types[cargo_index]
- if cargo_key in port:
- if int(port[cargo_key]["units"]) > 40:
- log.warn(
- "port_buying( {0}, {1}): Yes, buying {2}".format(
- sector, cargo, port[cargo_key]["sale"]
- )
- )
- return True
- else:
- log.warn(
- "port_buying( {0}, {1}): Yes, buying (but values unknown)".format(
- sector, cargo
- )
- )
- return True # unknown port, we're guess yes.
- return False
- @staticmethod
- def port_burnt(port: dict):
- """ Is this port burned out? """
- if all(x in port for x in ["fuel", "org", "equ"]):
- if all("pct" in port[x] for x in ["fuel", "org", "equ"]):
- if (
- port["equ"]["pct"] <= 20
- or port["fuel"]["pct"] <= 20
- or port["org"]["pct"] <= 20
- ):
- return True
- return False
- # Since we don't have any port information, hope for the best, assume it isn't burnt.
- return False
- @staticmethod
- def flip(buy_sell):
- # Invert B's and S's to determine if we can trade or not between ports.
- return buy_sell.replace("S", "W").replace("B", "S").replace("W", "B")
- @staticmethod
- def port_trading(port1, port2):
- # Given the port settings, can we trade between these?
- if port1 == port2:
- return False
- p1 = [c for c in port1]
- p2 = [c for c in port2]
- rem = False
- for i in range(3):
- if p1[i] == p2[i]:
- p1[i] = "X"
- p2[i] = "X"
- rem = True
- if rem:
- j1 = "".join(p1).replace("X", "")
- j2 = "".join(p2).replace("X", "")
- if j1 == "BS" and j2 == "SB":
- return True
- if j1 == "SB" and j2 == "BS":
- return True
- rport1 = GameData.flip(port1)
- c = 0
- match = []
- for i in range(3):
- if rport1[i] == port2[i]:
- match.append(port2[i])
- c += 1
- if c > 1:
- # Remove first match, flip it
- f = GameData.flip(match.pop(0))
- # Verify it is in there.
- # so we're not matching SSS/BBB
- if f in match:
- return True
- return False
- return False
- @staticmethod
- def color_pct(pct: int):
- if pct > 50:
- # green
- return "{0}{1:3}{2}".format(Fore.GREEN, pct, Style.RESET_ALL)
- elif pct > 25:
- return "{0}{1:3}{2}".format(
- merge(Fore.YELLOW + Style.BRIGHT), pct, Style.RESET_ALL
- )
- else:
- return "{0:3}".format(pct)
- @staticmethod
- def port_pct(port: dict):
- # Make sure these exist in the port data given.
- if all(x in port for x in ["fuel", "org", "equ"]):
- return "{0},{1},{2}%".format(
- GameData.color_pct(port["fuel"]["pct"]),
- GameData.color_pct(port["org"]["pct"]),
- GameData.color_pct(port["equ"]["pct"]),
- )
- else:
- return "---,---,---%"
- @staticmethod
- def port_show_part(sector: int, sector_port: dict):
- return "{0:5} ({1}) {2}".format(
- sector, sector_port["port"], GameData.port_pct(sector_port)
- )
- def port_trade_show(self, sector: int, warp: int):
- sector_port = self.ports[sector]
- warp_port = self.ports[warp]
- sector_pct = GameData.port_pct(sector_port)
- warp_pct = GameData.port_pct(warp_port)
- return "{0} \xae\xcd\xaf {1}".format(
- GameData.port_show_part(sector, sector_port),
- GameData.port_show_part(warp, warp_port),
- )
- @staticmethod
- def port_show(sector: int, sector_port: dict, warp: int, warp_port: dict):
- sector_pct = GameData.port_pct(sector_port)
- warp_pct = GameData.port_pct(warp_port)
- return "{0} -=- {1}".format(
- GameData.port_show_part(sector, sector_port),
- GameData.port_show_part(warp, warp_port),
- )
|