import re
from itertools import cycle
from pprint import pformat

import pendulum
from colorama import Fore, Back, Style
from twisted.internet import reactor
from twisted.internet import task
from twisted.internet import defer
from twisted.python import log

# Matches the game's main command prompt, e.g.
# "Command [TL=00:00:00]:[967] (?=Help)? :"
COMMAND_PROMPT_RE = re.compile(r"Command \[TL=.* \(\?=Help\)\? :")


def merge(color_string):
    """Collapse adjacent ANSI colour sequences into a single sequence.

    Every occurrence of ``m`` immediately followed by ``ESC[`` is replaced
    with ``;`` so that e.g. ``ESC[1mESC[37m`` becomes ``ESC[1;37m``.
    """
    return color_string.replace("m\x1b[", ";")


class PlayerInput(object):
    r"""
    Prompt the player for a line of input and deliver it via a Deferred.

    While a prompt is active the observer's handlers are saved and replaced
    with this object's own "player" handler, game output to the player is
    suppressed (game.to_player = False) and a 30 second keepalive sends a
    space to game.queue_player.  Everything is restored when the player
    presses return.

    Example:

    from flexible import PlayerInput

    ask = PlayerInput(self.game)
    # abort_blank means, if the input field is blank, abort.  Use errback.
    d = ask.prompt("What is your quest?", 40, name="quest", abort_blank=True)

    # Display the user's input / but not needed.
    d.addCallback(ask.output)

    d.addCallback(
        lambda ignore: ask.prompt(
            "What is your favorite color?", 10, name="color"
        )
    )
    d.addCallback(ask.output)

    d.addCallback(
        lambda ignore: ask.prompt(
            "What is your least favorite number?",
            12,
            name="number",
            digits=True,
        )
    )
    d.addCallback(ask.output)

    def show_values(show):
        log.msg(show)
        self.queue_game.put(pformat(show).replace("\n", "\n\r") + self.nl)

    d.addCallback(lambda ignore: show_values(ask.keep))
    d.addCallback(self.welcome_back)

    # On error, just return back
    d.addErrback(self.welcome_back)
    """

    def __init__(self, game):
        # game gives us access to everything we need:
        # observer, queue_game, queue_player and to_player.
        self.game = game
        self.observer = self.game.observer
        self.save = None  # saved observer state while a prompt is active
        self.deferred = None  # Deferred of the prompt in progress
        self.queue_game = game.queue_game
        self.keep = {}  # answers stored by the "name" keyword

        # default colors
        self.c = merge(Style.BRIGHT + Fore.WHITE + Back.BLUE)
        self.cp = merge(Style.BRIGHT + Fore.YELLOW + Back.BLUE)

        # useful consts
        self.r = Style.RESET_ALL
        self.nl = "\n\r"
        self.bsb = "\b \b"

        self.keepalive = None

    def color(self, c):
        """Set the colour used for the prompt text."""
        self.c = c

    def colorp(self, cp):
        """Set the colour used for the input field."""
        self.cp = cp

    def alive(self):
        """Keepalive tick: send a space to the game side."""
        log.msg("PlayerInput.alive()")
        self.game.queue_player.put(" ")

    def prompt(self, user_prompt, limit, **kw):
        """
        Generate prompt for user input.

        Note: This returns a Deferred that fires with the entered text.

        user_prompt = text displayed.
        limit = # of characters allowed.

        keywords:
            abort_blank : If true, errback with Exception("abort_blank")
                          when the input is blank.
            name        : Stores the input in self.keep under this key.
            digits      : If true, only the characters 0-9 are accepted.
        """
        log.msg("PlayerInput({0}, {1}, {2})".format(user_prompt, limit, kw))

        # Check every invariant before touching any state, so a failed
        # assertion does not leave the observer half re-wired.
        assert self.save is None
        assert self.keepalive is None
        assert self.deferred is None

        self.limit = limit
        self.input = ""
        self.kw = kw

        # Note: This clears out the server "keep alive"
        self.save = self.observer.save()
        self.observer.connect("player", self.get_input)

        self.keepalive = task.LoopingCall(self.alive)
        self.keepalive.start(30)

        # We need to "hide" the game output.
        # Otherwise it will mess up the user input display.
        self.to_player = self.game.to_player
        self.game.to_player = False

        # Display prompt
        self.queue_game.put(self.r + self.c + user_prompt + self.r + " " + self.cp)
        # Set "Background of prompt": paint the field, then back up over it.
        self.queue_game.put(" " * limit + "\b" * limit)

        self.deferred = defer.Deferred()
        log.msg("Return deferred ...", self.deferred)
        return self.deferred

    def get_input(self, chunk):
        """
        Handle data from the player (in bytes) while a prompt is active.

        Backspace edits the buffer, return finishes the prompt, printable
        characters are appended up to self.limit.  Rejected keystrokes ring
        the bell.  Anything following a return in the same chunk is dropped.
        """
        text = chunk.decode("utf-8", "ignore")
        digits_only = bool(self.kw.get("digits"))

        for ch in text:
            if ch == "\b":
                if self.input:
                    self.queue_game.put(self.bsb)
                    self.input = self.input[:-1]
                else:
                    self.queue_game.put("\a")
            elif ch == "\r":
                self._finish()
                return
            elif ch.isprintable():
                # Printable, but is it acceptable?
                # ASCII digits only: str.isdigit() also accepts characters
                # such as superscripts, which int() cannot parse.
                if digits_only and ch not in "0123456789":
                    self.queue_game.put("\a")
                    continue
                if len(self.input) < self.limit:
                    self.input += ch
                    self.queue_game.put(ch)
                else:
                    self.queue_game.put("\a")

    def _finish(self):
        """Restore the saved state and fire the Deferred with the input."""
        self.queue_game.put(self.r + self.nl)

        log.msg("Restore observer dispatch", self.save)
        assert self.save is not None
        self.observer.load(self.save)
        self.save = None

        log.msg("Disable keepalive")
        self.keepalive.stop()
        self.keepalive = None

        line = self.input
        self.input = ""
        assert self.deferred is not None
        self.game.to_player = self.to_player

        # If they gave us the keyword name, save the value as that name
        if "name" in self.kw:
            self.keep[self.kw["name"]] = line

        d = self.deferred
        self.deferred = None

        if self.kw.get("abort_blank") and line.strip() == "":
            # Yes, input is blank, abort.
            log.msg("errback, abort_blank")
            reactor.callLater(0, d.errback, Exception("abort_blank"))
            return

        # Fire from the reactor rather than inline, so the callback chain
        # runs after this handler has returned.
        reactor.callLater(0, d.callback, line)

    def output(self, line):
        """A default display of what they just input; returns line unchanged."""
        log.msg("PlayerInput.output({0})".format(line))
        self.game.queue_game.put(self.r + "[{0}]".format(line) + self.nl)
        return line


# Port class number -> Buy/Sell pattern for fuel, organics, equipment.
PORT_CLASSES = {
    1: "BBS",
    2: "BSB",
    3: "SBB",
    4: "SSB",
    5: "SBS",
    6: "BSS",
    7: "SSS",
    8: "BBB",
}

# Reverse lookup: Buy/Sell pattern -> port class number.
CLASSES_PORT = {v: k for k, v in PORT_CLASSES.items()}


class CIMPortReport(object):
    """
    Parse data from CIM Port Report

    On construction this takes over the observer, hides game output from
    the player and sends "^" to activate CIM.  The parsed report is stored
    in game.portdata and passed to the whenDone() Deferred.  Any player
    input while the report is running aborts it.

    Example:

    from flexible import CIMPortReport

    report = CIMPortReport(self.game)
    d = report.whenDone()
    d.addCallback(self.port_report)
    d.addErrback(self.welcome_back)

    def port_report(self, portdata):
        self.portdata = portdata
        self.queue_game.put("Loaded {0} records.".format(len(portdata)) + self.nl)
        self.welcome_back()

    def welcome_back(self,*_):
        ... restore keep alive timers, etc.
    """

    def __init__(self, game):
        self.game = game
        self.queue_game = game.queue_game
        self.queue_player = game.queue_player
        self.observer = game.observer
        # Yes, at this point we would activate
        self.prompt = game.buffer
        self.save = self.observer.save()

        # I actually don't want the player input, but I'll grab it anyway.
        self.observer.connect("player", self.player)
        self.observer.connect("prompt", self.game_prompt)
        self.observer.connect("game-line", self.game_line)

        # If we want it, it's here.
        self.defer = None
        self.to_player = self.game.to_player
        log.msg("to_player (stored)", self.to_player)

        # Hide what's happening from the player
        self.game.to_player = False

        self.queue_player.put("^")  # Activate CIM
        self.state = 1
        self.portdata = {}
        self.portcycle = cycle(["/", "-", "\\", "|"])

    def _restore(self):
        """Give the observer and the to_player flag back to the caller."""
        self.game.to_player = self.to_player
        self.observer.load(self.save)
        self.save = None

    def game_prompt(self, prompt):
        """
        Drive the report state machine from game prompts.

        state 1 + ": " prompt -> send "R" (request port report), state 2
        state 2 + ": " prompt -> send "Q" (quit CIM), state 3
        state 3 + command prompt -> restore, publish portdata, fire Deferred
        """
        if prompt == ": ":
            if self.state == 1:
                # Ok, then we're ready to request the port report
                self.portcycle = cycle(["/", "-", "\\", "|"])
                self.queue_player.put("R")
                self.state = 2
            elif self.state == 2:
                self.queue_player.put("Q")
                self.state = 3
        elif COMMAND_PROMPT_RE.match(prompt) and self.state == 3:
            # Ok, time to exit
            self._restore()
            self.game.portdata = self.portdata
            self.queue_game.put("\b \b\r\n")

            if self.defer is not None:
                self.defer.callback(self.portdata)
                self.defer = None

    @staticmethod
    def _parse_port_line(line):
        """
        Parse one CIM report line into (port, data), or None if unparsable.

        The line has its "%" signs removed and is split on the single
        whitespace character following each digit; exactly 8 fields are
        expected.  Fields 1/3/5 hold the fuel/org/equ units, where a leading
        "-" means the port is Buying and anything else means Selling;
        fields 2/4/6 hold the matching percentages.
        """
        parts = re.split(r"(?<=\d)\s", line.replace("%", ""))
        if len(parts) != 8:
            return None

        def trade(info):
            # The first character is the buy/sell marker, the rest is units.
            sale = "B" if info[:1] == "-" else "S"
            return sale, int(info[1:].strip())

        try:
            port = int(parts[0].strip())
            data = {}
            for product, idx in (("fuel", 1), ("org", 3), ("equ", 5)):
                sale, units = trade(parts[idx])
                data[product] = {
                    "sale": sale,
                    "units": units,
                    "pct": int(parts[idx + 1].strip()),
                }
        except ValueError:
            # Non-numeric field: treat like any other unparsable line
            # instead of raising inside the observer callback.
            return None

        # Store what this port is buying/selling
        data["port"] = (
            data["fuel"]["sale"] + data["org"]["sale"] + data["equ"]["sale"]
        )
        # Convert BBS/SBB to Class number 1-8
        data["class"] = CLASSES_PORT[data["port"]]
        return port, data

    def game_line(self, line):
        """Collect one line of report data into self.portdata."""
        if line in ("", ": ", ": ENDINTERROG"):
            return

        # This should be the CIM Report Data -- parse it.
        # Advance the progress spinner once per 10 stored records.
        if self.portcycle and len(self.portdata) % 10 == 0:
            self.queue_game.put("\b" + next(self.portcycle))

        parsed = self._parse_port_line(line)
        if parsed is None:
            log.msg("CIMPortReport:", line, "???")
            return

        port, data = parsed
        self.portdata[port] = data

    def __del__(self):
        log.msg("CIMPortReport {0} RIP".format(self))

    def whenDone(self):
        """Return a Deferred fired with portdata, or errback on user abort."""
        self.defer = defer.Deferred()
        # Call this to chain something after we exit.
        return self.defer

    def player(self, chunk):
        """
        Data from player (in bytes).

        Any input aborts the report: state is restored and, if someone is
        waiting on whenDone(), its errback fires with "User Abort".
        """
        chunk = chunk.decode("utf-8", "ignore")
        key = chunk.upper()
        log.msg("CIMPortReport.player({0}) : I AM stopping...".format(key))

        self.queue_game.put("\b \b\r\n")

        # Always "exit" out, whether or not anyone is waiting on us.
        self._restore()

        if self.defer is not None:
            self.defer.errback(Exception("User Abort"))
            self.defer = None


class ScriptPort(object):
    """
    Performs the Port script.

    Sends "D" to display the current sector, reads the sector number and
    its warps from the game lines, and checks game.portdata for ports in
    the adjacent sectors.  Fires the whenDone() Deferred with 'done' when
    it deactivates.
    """

    def __init__(self, game):
        self.game = game
        self.queue_game = game.queue_game
        self.queue_player = game.queue_player
        self.observer = game.observer
        self.r = Style.RESET_ALL
        self.nl = "\n\r"

        # Activate
        self.prompt = game.buffer
        self.save = self.observer.save()
        self.observer.connect("player", self.player)
        self.observer.connect("prompt", self.game_prompt)
        self.observer.connect("game-line", self.game_line)

        self.defer = None
        self.queue_game.put(
            self.nl + "Script based on: Port Pair Trading v2.00" + self.r + self.nl
        )

        self.sector_number = None
        self.possible_sectors = None
        # Filled in by game_line(); initialised here so they always exist.
        self.sector1 = None
        self.warps = []
        self.possible = []

        self.state = 1
        self.queue_player.put("D")
        # Original, send 'D' to display current sector.
        # We could get the sector number from the self.prompt string -- HOWEVER:
        # IF! We send 'D', we can also get the sectors around -- we might not even need to
        # prompt for sector to trade with (we could possibly figure it out ourselves).

        # [Command [TL=00:00:00]:[967] (?=Help)? : D]
        # []
        # []
        # [Sector  : 967 in uncharted space.]
        # [Planets : (M) Into the Darkness]
        # [Warps to Sector(s) :  397 - (562) - (639)]
        # []

    def whenDone(self):
        """Return a Deferred that fires with 'done' when the script exits."""
        self.defer = defer.Deferred()
        # Call this to chain something after we exit.
        return self.defer

    def deactivate(self):
        """Restore the saved observer state and fire the whenDone Deferred."""
        log.msg("ScriptPort.deactivate")
        assert self.save is not None
        self.observer.load(self.save)
        self.save = None
        if self.defer:
            self.defer.callback('done')
            self.defer = None

    def player(self, chunk: bytes):
        """Player input is ignored while the script runs."""
        pass

    def game_prompt(self, prompt: str):
        """In state 3, deactivate once the command prompt reappears."""
        log.msg("{0} : {1}".format(self.state, prompt))
        if self.state == 3:
            log.msg("game_prompt: ", prompt)
            if COMMAND_PROMPT_RE.match(prompt):
                self.state = 4
                log.msg("Ok, state 4")
                self.deactivate()

    def game_line(self, line: str):
        """
        Parse the sector display.

        state 1: read "Sector  :" and "Warps to Sector(s) :" lines.
        state 2: on the next blank line, look up adjacent ports (state 3).
        """
        if self.state == 1:
            # First exploration
            if line.startswith("Sector  :"):
                # We have starting sector information
                parts = line.split()
                self.sector_number = int(parts[2])
                # These will be the ones swapped around as we trade back and forth.
                self.sector1 = self.sector_number
            elif line.startswith("Warps to Sector(s) : "):
                # Warps to Sector(s) :  397 - (562) - (639)
                _, _, warps = line.partition(':')
                warps = warps.replace('-', '').replace('(', '').replace(')', '').strip()
                log.msg("Warps: [{0}]".format(warps))
                # str.split() handles runs of whitespace and an empty list.
                self.warps = [int(x) for x in warps.split()]
                log.msg("Warps: [{0}]".format(self.warps))
                self.state = 2
        elif self.state == 2:
            if line == "":
                # Ok, we're done
                self.state = 3
                self._find_ports()

    def _find_ports(self):
        """Work out which adjacent sectors have known ports, or give up."""
        portdata = getattr(self.game, 'portdata', None)
        if portdata is None:
            log.msg("We don't have any portdata!")
            self.queue_game.put(self.r + self.nl + "I have no portdata.  Please run CIM Port Report." + self.nl)
            self.deactivate()
            return

        if self.sector_number not in portdata:
            log.msg("Current sector {0} not in portdata.".format(self.sector_number))
            self.queue_game.put(self.r + self.nl + "I can't find the current sector in the portdata." + self.nl)
            self.deactivate()
            return

        possible = [x for x in self.warps if x in portdata]
        log.msg("Possible:", possible)
        self.possible = possible

        if len(possible) == 0:
            self.queue_game.put(self.r + self.nl + "I don't see any ports in [{0}].".format(self.warps) + self.nl)
            self.deactivate()
            return
        elif len(possible) == 1:
            # Ok!  there's only one!
            # NOTE(review): this overwrites the current sector stored in
            # sector1; possibly meant to be a second sector -- confirm.
            self.sector1 = possible[0]

        # At state 3, we only get a prompt; game_prompt() finishes up.


class ProxyMenu(object):
    r"""
    Display ProxyMenu

    On construction this takes over the observer's "player" handler,
    starts a 30 second keepalive and shows the menu.  The whenDone()
    Deferred fires when the player chooses eXit.

    Example:

    from flexible import ProxyMenu

    if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt):
        menu = ProxyMenu(self.game)
    """

    def __init__(self, game):
        self.nl = "\n\r"
        self.c = merge(Style.BRIGHT + Fore.YELLOW + Back.BLUE)
        self.r = Style.RESET_ALL
        self.c1 = merge(Style.BRIGHT + Fore.BLUE)
        self.c2 = merge(Style.NORMAL + Fore.CYAN)
        self.game = game
        self.queue_game = game.queue_game
        self.observer = game.observer

        # Reuse a previously loaded port report if the game has one.
        if hasattr(self.game, "portdata"):
            self.portdata = self.game.portdata
        else:
            self.portdata = {}

        # Yes, at this point we would activate
        self.prompt = game.buffer
        self.save = self.observer.save()
        self.observer.connect("player", self.player)

        # If we want it, it's here.
        self.defer = None

        self.keepalive = task.LoopingCall(self.awake)
        self.keepalive.start(30)
        self.menu()

    def __del__(self):
        log.msg("ProxyMenu {0} RIP".format(self))

    def whenDone(self):
        """Return a Deferred that fires (with None) when the menu exits."""
        self.defer = defer.Deferred()
        # Call this to chain something after we exit.
        return self.defer

    def _echo(self, key):
        """Echo the accepted key back to the player in the menu colour."""
        self.queue_game.put(self.c + key + self.r + self.nl)

    def _menu_item(self, ch, desc, c1, c2):
        """Queue one "key - description" menu line using the given colours."""
        self.queue_game.put(" " + c1 + ch + c2 + " - " + c1 + desc + self.nl)

    def menu(self):
        """Display the main menu and its input arrow."""
        self.queue_game.put(
            self.nl + self.c + "TradeWars Proxy active." + self.r + self.nl
        )

        for ch, desc in (
            ("D", "Diagnostics"),
            ("Q", "Quest"),
            ("T", "Display current Time"),
            ("P", "Port CIM Report"),
            ("S", "Scripts"),
            ("X", "eXit"),
        ):
            self._menu_item(ch, desc, self.c1, self.c2)

        self.queue_game.put("   " + self.c + "-=>" + self.r + " ")

    def awake(self):
        """Keepalive tick: send a space to the game side."""
        log.msg("ProxyMenu.awake()")
        self.game.queue_player.put(" ")

    def port_report(self, portdata: dict):
        """Callback for CIMPortReport: keep the data and return to the menu."""
        self.portdata = portdata
        self.queue_game.put("Loaded {0} records.".format(len(portdata)) + self.nl)
        self.welcome_back()

    def player(self, chunk: bytes):
        """
        Data from player (in bytes): dispatch a main-menu keypress.

        Options that hand control to another object (P, S, Q) or leave the
        menu (X) return with the keepalive stopped; everything else restarts
        the keepalive and redisplays the menu.
        """
        chunk = chunk.decode("utf-8", "ignore")
        key = chunk.upper()
        log.msg("ProxyMenu.player({0})".format(key))

        # Stop the keepalive if we are activating something else
        # or leaving...
        self.keepalive.stop()

        if key == "T":
            self._echo(key)
            # perform T option
            now = pendulum.now()
            self.queue_game.put(
                self.nl + self.c1 + "Current time " + now.to_datetime_string() + self.nl
            )
        elif key == "P":
            self._echo(key)
            # Activate CIM Port Report
            report = CIMPortReport(self.game)
            d = report.whenDone()
            d.addCallback(self.port_report)
            d.addErrback(self.welcome_back)
            return
        elif key == "S":
            self._echo(key)
            self.activate_scripts_menu()
            return
        elif key == "D":
            self._echo(key)
            self.queue_game.put(pformat(self.portdata).replace("\n", "\n\r") + self.nl)
        elif key == "Q":
            self._echo(key)
            self._quest()
            return
        elif key == "X":
            self._echo(key)
            self.observer.load(self.save)
            self.save = None
            # It isn't running (NOW), so don't try to stop it.
            self.keepalive = None
            self.queue_game.put(self.prompt)
            self.prompt = None

            # Possibly: Send '\r' to re-display the prompt
            # instead of displaying the original one.

            # Were we asked to do something when we were done here?
            if self.defer:
                # Deferred.callback() requires a result argument.
                reactor.callLater(0, self.defer.callback, None)
                self.defer = None
            return

        self.keepalive.start(30, True)
        self.menu()

    def _quest(self):
        """Example of chaining PlayerInput prompt calls."""
        ask = PlayerInput(self.game)
        d = ask.prompt("What is your quest?", 40, name="quest", abort_blank=True)

        # Display the user's input
        d.addCallback(ask.output)

        d.addCallback(
            lambda ignore: ask.prompt(
                "What is your favorite color?", 10, name="color"
            )
        )
        d.addCallback(ask.output)

        d.addCallback(
            lambda ignore: ask.prompt(
                "What is the meaning of the squirrel?",
                12,
                name="squirrel",
                digits=True,
            )
        )
        d.addCallback(ask.output)

        def show_values(show):
            log.msg(show)
            self.queue_game.put(pformat(show).replace("\n", "\n\r") + self.nl)

        d.addCallback(lambda ignore: show_values(ask.keep))
        d.addCallback(self.welcome_back)

        # On error, just return back
        d.addErrback(self.welcome_back)

    def activate_scripts_menu(self):
        """Route player input to the scripts sub-menu and display it."""
        self.observer.disconnect("player", self.player)
        self.observer.connect("player", self.scripts_player)
        self.scripts_menu()

    def deactivate_scripts_menu(self):
        """Route player input back to the main menu and redisplay it."""
        self.observer.disconnect("player", self.scripts_player)
        self.observer.connect("player", self.player)
        self.welcome_back()

    def scripts_menu(self, *_):
        """Display the scripts sub-menu (extra args allow Deferred use)."""
        c1 = merge(Style.BRIGHT + Fore.CYAN)
        c2 = merge(Style.NORMAL + Fore.CYAN)

        for ch, desc in (
            ("1", "Ports (Trades between two sectors)"),
            ("2", "TODO"),
            ("3", "TODO"),
            ("X", "eXit"),
        ):
            self._menu_item(ch, desc, c1, c2)

        self.queue_game.put("   " + c1 + "-=>" + self.r + " ")

    def scripts_player(self, chunk: bytes):
        """Data from player (in bytes): dispatch a scripts-menu keypress."""
        chunk = chunk.decode("utf-8", "ignore")
        key = chunk.upper()

        if key == '1':
            self._echo(key)
            # Activate this magical event here
            ports = ScriptPort(self.game)
            d = ports.whenDone()
            d.addCallback(self.scripts_menu)
            d.addErrback(self.scripts_menu)
            return
        elif key == 'X':
            self._echo(key)
            self.deactivate_scripts_menu()
            return
        else:
            self._echo("?")
            self.scripts_menu()

    def welcome_back(self, *_):
        """Restart the keepalive and redisplay the main menu."""
        log.msg("welcome_back")
        self.keepalive.start(30, True)
        self.menu()