|
- from twisted.internet import reactor
- from twisted.internet import task
- from twisted.internet import defer
- from colorama import Fore, Back, Style
- from twisted.python import log
- """ This entire object will likely be replaced with something
- smaller, simpler, and much more reliable.
- """
class MCP(object):
    """Interactive proxy menu layered over a TradeWars game session.

    When the proxy hotkey fires, this object captures the current game
    prompt, takes over the "player" signal, and shows a small menu
    (diagnostics, time, CIM port report, scripts, exit).  While the menu
    is active a periodic keepalive is sent to the game server.

    The ``game`` object supplies the collaborators used here, all of which
    are defined outside this class: ``queue_game`` (text shown to the
    player), ``queue_player`` (text sent to the game server), ``observer``
    (signal connect/disconnect/save/load), ``buffer`` (the raw current
    prompt) and ``to_player`` (a flag toggled around the port report;
    presumably controls whether game output is forwarded -- confirm
    against the game class).
    """

    # Line terminator used for everything written to the player.
    _NL = "\n\r"

    # Seconds between keepalive messages sent to the game server.
    KEEPALIVE_SECONDS = 30

    # (hotkey, label) pairs of the main menu, in display order.
    _MENU = (
        ("D", "Diagnostics"),
        ("T", "Display current Time"),
        ("P", "Port CIM Report"),
        ("S", "Scripts"),
        ("X", "eXit"),
    )

    def __init__(self, game):
        """Remember ``game``; wiring to its queues happens in finishSetup()."""
        self.game = game
        self.queue_game = None
        # we don't have this .. yet!
        self.prompt = None
        self.observer = None
        self.keepalive = None
        # Port Data
        self.portdata = None
        self.portcycle = None
        # Script settings collected from the player.
        self.script = dict()
        # State of the line editor driven by playerInput()/niceInput().
        self.player_input = None
        self.input_limit = 0
        self.input_input = ""
        self.save = None

    def finishSetup(self):
        """Bind to the game's queue and observer and register our signals."""
        self.queue_game = self.game.queue_game
        self.observer = self.game.observer
        self.observer.connect("hotkey", self.activate)
        self.observer.connect("notyet", self.notyet)
        self.observer.connect("close", self.close)

    def close(self, _):
        """Handle the "close" signal by stopping the keepalive, if any."""
        self._stopAwake()

    def notyet(self, _):
        """Tell the player the proxy can't activate now, then redraw the prompt."""
        nl = self._NL
        r = Style.RESET_ALL
        log.msg("NNY!")
        prompt = self.game.buffer
        self.queue_game.put(
            r
            + nl
            + Style.BRIGHT
            + "Proxy:"
            + Style.RESET_ALL
            + " I can't activate at this time."
            + nl
        )
        self.queue_game.put(prompt)

    def stayAwake(self):
        """Send a space to the game to keep it alive/don't timeout."""
        log.msg("Gameserver, stay awake.")
        self.game.queue_player.put(" ")

    def startAwake(self):
        """Start the task that keeps the game server alive.

        This is idempotent: if a keepalive is already running it is left
        alone.  Previously each call created a fresh LoopingCall and
        overwrote ``self.keepalive`` without stopping the old one, so the
        orphaned loops kept firing (keepalives closer together than the
        interval, and still being sent after the menu was left).
        """
        if self.keepalive is not None and self.keepalive.running:
            return
        self.keepalive = task.LoopingCall(self.stayAwake)
        self.keepalive.start(self.KEEPALIVE_SECONDS)

    def _stopAwake(self):
        """Stop the keepalive task if it is running and forget it."""
        # LoopingCall.stop() must only be called on a running loop.
        if self.keepalive is not None and self.keepalive.running:
            self.keepalive.stop()
        self.keepalive = None

    def activate(self, _):
        """Handle the "hotkey" signal: show the menu and capture player input."""
        log.msg("MCP menu called.")
        # We want the raw one, not the ANSI cleaned getPrompt.
        prompt = self.game.buffer
        if self.prompt is not None:
            # silly, we're already active
            log.msg("I think we're already active. Ignoring request.")
            return
        # Or will the caller setup/restore the prompt?
        self.prompt = prompt
        # queue_game = to player
        self.displayMenu()
        self.observer.connect("player", self.fromPlayer)
        # Background keepalive so the game doesn't time out on us.
        self.startAwake()

    def displayMenu(self):
        """Write the main menu and its input prompt to the player."""
        nl = self._NL
        c = merge(Style.BRIGHT + Fore.YELLOW + Back.BLUE)
        r = Style.RESET_ALL
        c1 = merge(Style.BRIGHT + Fore.BLUE)
        c2 = merge(Style.NORMAL + Fore.BLUE)
        self.queue_game.put(nl + c + "TradeWars Proxy active." + r + nl)
        for hotkey, label in self._MENU:
            self.queue_game.put(" " + c1 + hotkey + c2 + " - " + c1 + label + nl)
        self.queue_game.put(" " + c + "-=>" + r + " ")

    def fromPlayer(self, chunk):
        """Dispatch a main-menu keypress.

        ``chunk`` is data from the player (in bytes); it is decoded as
        UTF-8 (undecodable bytes ignored) and compared case-insensitively.
        """
        chunk = chunk.decode("utf-8", "ignore")
        nl = self._NL
        c = merge(Style.BRIGHT + Fore.YELLOW + Back.BLUE)
        r = Style.RESET_ALL
        c1 = merge(Style.BRIGHT + Fore.BLUE)
        c2 = merge(Style.NORMAL + Fore.BLUE)
        key = chunk.upper()

        if key == "T":
            self.queue_game.put(c + key + r + nl)
            now = pendulum.now()
            log.msg("Time")
            self.queue_game.put(
                nl + c1 + "It is currently " + now.to_datetime_string() + "." + nl
            )
            self.displayMenu()
        elif key == "P":
            log.msg("Port")
            self.queue_game.put(c + key + r + nl)
            self.portReport()
        elif key == "S":
            log.msg("Scripts")
            self.queue_game.put(c + key + r + nl)
            self.scripts()
        elif key == "D":
            self.queue_game.put(nl + "Diagnostics" + nl + "portdata:" + nl)
            line = pformat(self.portdata).replace("\n", "\n\r")
            self.queue_game.put(line + nl)
            self.displayMenu()
        elif key == "X":
            log.msg('"Quit, return to "normal". (Whatever that means!)')
            self.queue_game.put(c + key + r + nl)
            self.observer.disconnect("player", self.fromPlayer)
            self.queue_game.put(nl + c1 + "Returning to game" + c2 + "..." + r + nl)
            # Redraw the prompt that was on screen when the menu opened.
            self.queue_game.put(self.prompt)
            self.prompt = None
            # Guarded stop: the keepalive may be missing or already stopped.
            self._stopAwake()
            self.game.to_player = True
        elif key.isprintable():
            # Unprintable input (control characters and the like) is
            # ignored without redrawing the menu.
            self.queue_game.put(r + nl)
            self.queue_game.put("Excuse me? I don't understand '" + key + "'." + nl)
            self.displayMenu()

    def portReport(self):
        """Activate CIM and request Port Report."""
        self.game.to_player = False
        self.portdata = None
        self.observer.connect("prompt", self.portPrompt)
        self.observer.connect("game-line", self.portParse)
        self.game.queue_player.put("^")

    def portPrompt(self, prompt):
        """Drive the CIM session: request the report once, then quit."""
        if prompt != ": ":
            return
        log.msg("CIM Prompt")
        if self.portdata is None:
            # First CIM prompt: ask for the report and start the spinner.
            log.msg("R - Port Report")
            self.portdata = dict()
            self.game.queue_player.put("R")
            self.portcycle = cycle(["/", "-", "\\", "|"])
            self.queue_game.put(" ")
        else:
            # Prompt after the report has been requested: leave CIM.
            log.msg("Q - Quit")
            self.game.queue_player.put("Q")
            self.portcycle = None

    def portBS(self, info):
        """Split one quantity column into a (buy/sell flag, units) tuple.

        A leading "-" marks the commodity as bought by the port ("B");
        otherwise it is sold ("S").  Surrounding whitespace is ignored, so
        both padded columns (" 2870", "  - 316") and bare numbers work.

        Raises:
            ValueError: if no integer remains after removing the marker.
        """
        text = info.strip()
        if text.startswith("-"):
            return ("B", int(text[1:].strip()))
        return ("S", int(text))

    def _portRecord(self, parts):
        """Build (port number, data dict) from the 8 split fields of a line.

        Raises ValueError when a numeric field cannot be parsed.
        """
        port = int(parts[0].strip())
        data = dict()
        for slot, product in enumerate(("fuel", "org", "equ")):
            # Each product occupies two fields: quantity, then percentage.
            sale, units = self.portBS(parts[1 + slot * 2])
            data[product] = {
                "sale": sale,
                "units": units,
                "pct": int(parts[2 + slot * 2].strip()),
            }
        # Store what this port is buying/selling
        data["port"] = data["fuel"]["sale"] + data["org"]["sale"] + data["equ"]["sale"]
        # Convert BBS/SBB to Class number 1-8
        data["class"] = CLASSES_PORT[data["port"]]
        return port, data

    def portParse(self, line):
        """Consume one game line while the CIM port report is running.

        Report lines look like (one port per line)::

            436   2870 100% - 1520 100% - 2820 100%
              2   1950 100% - 1050 100%   2780 100%
           4516 - 1231  60% - 1839  75%      7   0%

        Parsed ports are stored in ``self.portdata``; ": ENDINTERROG"
        ends the report and returns to the menu.  Lines that cannot be
        parsed are logged and shown to the player as "?".
        """
        if line == "" or line == ": ":
            return
        log.msg("parse line:", line)
        if line.startswith("Command [TL="):
            return
        if line == ": ENDINTERROG":
            log.msg("CIM Done")
            log.msg(pformat(self.portdata))
            # Erase the spinner character.
            self.queue_game.put("\b \b" + "\n\r")
            self.observer.disconnect("prompt", self.portPrompt)
            self.observer.disconnect("game-line", self.portParse)
            self.game.to_player = True
            # No-op while the keepalive is still running.
            self.startAwake()
            self.displayMenu()
            return

        # Give some sort of feedback to the user.
        if self.portcycle:
            if len(self.portdata) % 10 == 0:
                self.queue_game.put("\b" + next(self.portcycle))

        # Drop the percent signs, then split on the single whitespace
        # character that follows each number.
        work = line.replace("%", "")
        parts = re.split(r"(?<=\d)\s", work)
        record = None
        if len(parts) == 8:
            try:
                record = self._portRecord(parts)
            except ValueError:
                # Right field count but non-numeric content.
                record = None
        if record is None:
            self.queue_game.put("?")
            log.msg("Line in question is: [{0}].".format(line))
            log.msg(repr(parts))
            return
        port, data = record
        self.portdata[port] = data

    def scripts(self):
        """Switch player input to the scripts menu and display it."""
        self.script = dict()
        self.observer.disconnect("player", self.fromPlayer)
        self.observer.connect("player", self.scriptFromPlayer)
        self.observer.connect("game-line", self.scriptLine)
        nl = self._NL
        c = merge(Style.BRIGHT + Fore.WHITE + Back.BLUE)
        r = Style.RESET_ALL
        c1 = merge(Style.BRIGHT + Fore.CYAN)
        c2 = merge(Style.NORMAL + Fore.CYAN)
        self.queue_game.put(nl + c + "TradeWars Proxy Script(s)" + r + nl)
        self.queue_game.put(" " + c1 + "P" + c2 + " - " + c1 + "Port Trading Pair" + nl)
        self.queue_game.put(" " + c + "-=>" + r + " ")

    def unscript(self):
        """Leave the scripts menu: restore main-menu input and redraw it."""
        self.observer.connect("player", self.fromPlayer)
        self.observer.disconnect("player", self.scriptFromPlayer)
        self.observer.disconnect("game-line", self.scriptLine)
        self.displayMenu()

    def scriptLine(self, line):
        """Receive game lines while the scripts menu is active (unused)."""
        pass

    def scriptFromPlayer(self, chunk):
        """Dispatch a scripts-menu keypress.

        ``chunk`` is data from the player (in bytes).  "Q" returns to the
        main menu; "P" starts collecting the port trading pair settings.
        Any other key is ignored.
        """
        chunk = chunk.decode("utf-8", "ignore")
        nl = self._NL
        c = merge(Style.BRIGHT + Fore.WHITE + Back.BLUE)
        r = Style.RESET_ALL
        key = chunk.upper()
        if key == "Q":
            self.queue_game.put(c + key + r + nl)
            # This branch used to also connect portParse to "game-line"
            # and never disconnect it, leaving the CIM parser running on
            # every later game line.
            self.unscript()
        elif key == "P":
            self.queue_game.put(c + key + r + nl)
            d = self.playerInput("Enter sector to trade to: ", 6)
            d.addCallback(self.save_sector)

    @staticmethod
    def _toInt(text):
        """Return ``text`` as an int, or None when blank or not a number."""
        try:
            return int(text.strip())
        except ValueError:
            return None

    def _abortScript(self):
        """Tell the player the script was aborted and leave the scripts menu."""
        self.queue_game.put("Script Aborted.")
        self.unscript()

    def save_sector(self, sector):
        """Store the target sector, then ask for the loop count.

        Blank or non-numeric input aborts the script.
        """
        log.msg("save_sector {0}".format(sector))
        value = self._toInt(sector)
        if value is None:
            self._abortScript()
            return
        self.script["sector"] = value
        d = self.playerInput("Enter times to execute script: ", 6)
        d.addCallback(self.save_loop)

    def save_loop(self, loop):
        """Store the loop count, then ask for the markup percentage.

        Blank or non-numeric input aborts the script.
        """
        log.msg("save_loop {0}".format(loop))
        value = self._toInt(loop)
        if value is None:
            self._abortScript()
            return
        self.script["loop"] = value
        d = self.playerInput("Enter markup/markdown percentage: ", 3)
        d.addCallback(self.save_mark)

    def save_mark(self, mark):
        """Store the markup percentage (blank means 5) and show the settings.

        Non-numeric input aborts the script.
        """
        log.msg("save_mark {0}".format(mark))
        if mark.strip() == "":
            self.script["mark"] = 5
        else:
            value = self._toInt(mark)
            if value is None:
                self._abortScript()
                return
            self.script["mark"] = value
        # Ok, we have the values we need to run the Port trade script
        self.queue_game.put(pformat(self.script).replace("\n", "\n\r"))
        self.unscript()

    def playerInput(self, prompt, limit):
        """Prompt the player for a line of at most ``limit`` characters.

        This displays the prompt, and sets up the proper observers,
        while preserving the "current" ones.

        Returns a Deferred that fires with the entered text, so the
        result can be chained.
        """
        log.msg("playerInput({0}, {1})".format(prompt, limit))
        nl = self._NL
        c = merge(Style.BRIGHT + Fore.WHITE + Back.BLUE)
        r = Style.RESET_ALL
        self.queue_game.put(r + nl + c + prompt)
        # This should set the background and show the size of the entry area.
        self.queue_game.put(" " * limit + "\b" * limit)
        d = defer.Deferred()
        self.player_input = d
        self.input_limit = limit
        self.input_input = ""
        # Snapshot the observer so niceInput() can restore it on Enter.
        self.save = self.observer.save()
        self.observer.connect("player", self.niceInput)
        return d

    def niceInput(self, chunk):
        """Line editor for playerInput(); ``chunk`` is player data (in bytes).

        Printable characters are echoed and appended up to the input
        limit, backspace (BS or DEL) removes the last character, and
        carriage return finishes the line.  A bell is sent when a key
        cannot be applied.
        """
        chunk = chunk.decode("utf-8", "ignore")
        r = Style.RESET_ALL
        for c in chunk:
            if c in ("\b", "\x7f"):
                # Backspace
                if self.input_input:
                    self.queue_game.put("\b \b")
                    self.input_input = self.input_input[:-1]
                else:
                    # Can't
                    self.queue_game.put("\a")
            elif c == "\r":
                # Ok, completed!
                self.queue_game.put(r + "\n\r")
                self.observer.load(self.save)
                self.save = None
                line = self.input_input
                log.msg("finishing niceInput {0}".format(line))
                self.input_input = ""
                # Fire the Deferred on the next reactor turn rather than
                # from inside this observer callback.
                reactor.callLater(0, self.player_input.callback, line)
                self.player_input = None
                # The line is finished and the observers are restored;
                # anything left in this chunk must not reach the editor
                # (a second "\r" would otherwise hit the cleared state).
                return
            elif c.isprintable():
                if len(self.input_input) + 1 <= self.input_limit:
                    self.input_input += c
                    self.queue_game.put(c)
                else:
                    # Limit reached
                    self.queue_game.put("\a")
|