from twisted.internet import reactor from twisted.internet import task from twisted.internet import defer from colorama import Fore, Back, Style # from twisted.python import log from twisted.internet.task import coiterate from twisted.internet.defer import gatherResults from itertools import cycle import pendulum from pprint import pformat from galaxy import GameData, PORT_CLASSES, CLASSES_PORT from boxes import Boxes import logging log = logging.getLogger(__name__) class SpinningCursor(object): """ Spinner class, that handles every so many clicks s = SpinningCursor(5) # every 5 for x in range(10): if s.click(): print(s.cycle()) """ def __init__(self, every=10): self.itercycle = cycle(["/", "-", "\\", "|"]) self.count = 0 self.every = every def reset(self): self.itercycle = cycle(["/", "-", "\\", "|"]) self.count = 0 def click(self): self.count += 1 return self.count % self.every == 0 def cycle(self): return next(self.itercycle) def merge(color_string): """ Given a string of colorama ANSI, merge them if you can. """ return color_string.replace("m\x1b[", ";") class PlayerInput(object): """ Player Input Example: from flexible import PlayerInput ask = PlayerInput(self.game) # abort_blank means, if the input field is blank, abort. Use error_back. d = ask.prompt("What is your quest?", 40, name="quest", abort_blank=True) # Display the user's input / but not needed. d.addCallback(ask.output) d.addCallback( lambda ignore: ask.prompt( "What is your favorite color?", 10, name="color" ) ) d.addCallback(ask.output) d.addCallback( lambda ignore: ask.prompt( "What is your least favorite number?", 12, name="number", digits=True, ) ) d.addCallback(ask.output) def show_values(show): log.debug(show) self.queue_game.put(pformat(show).replace("\n", "\n\r") + self.nl) d.addCallback(lambda ignore: show_values(ask.keep)) d.addCallback(self.welcome_back) # On error, just return back d.addErrback(self.welcome_back) """ def __init__(self, game): # I think game gives us access to everything we need self.game = game self.observer = self.game.observer self.save = None self.deferred = None self.queue_game = game.queue_game self.keep = {} # default colors self.c = merge(Style.BRIGHT + Fore.WHITE + Back.BLUE) self.cp = merge(Style.BRIGHT + Fore.YELLOW + Back.BLUE) # useful constants self.r = Style.RESET_ALL self.nl = "\n\r" self.bsb = "\b \b" self.keepalive = None def color(self, c): self.c = c def colorp(self, cp): self.cp = cp def alive(self): log.debug("PlayerInput.alive()") self.game.queue_player.put(" ") def prompt(self, user_prompt, limit, **kw): """ Generate prompt for user input. Note: This returns deferred. prompt = text displayed. limit = # of characters allowed. default = (text to default to) keywords: abort_blank : Abort if they give us blank text. name : Stores the input in self.keep dict. digits : Only allow 0-9 to be entered. """ log.debug("PlayerInput({0}, {1}, {2}".format(user_prompt, limit, kw)) self.limit = limit self.input = "" self.kw = kw assert self.save is None assert self.keepalive is None # Note: This clears out the server "keep alive" self.save = self.observer.save() self.observer.connect("player", self.get_input) self.keepalive = task.LoopingCall(self.alive) self.keepalive.start(30) # We need to "hide" the game output. # Otherwise it WITH mess up the user input display. self.to_player = self.game.to_player self.game.to_player = False # Display prompt # self.queue_game.put(self.r + self.nl + self.c + user_prompt + " " + self.cp) self.queue_game.put(self.r + self.c + user_prompt + self.r + " " + self.cp) # Set "Background of prompt" self.queue_game.put(" " * limit + "\b" * limit) assert self.deferred is None d = defer.Deferred() self.deferred = d log.debug("Return deferred ...") return d def get_input(self, chunk): """ Data from player (in bytes) """ chunk = chunk.decode("latin-1", "ignore") for ch in chunk: if ch == "\b": if len(self.input) > 0: self.queue_game.put(self.bsb) self.input = self.input[0:-1] else: self.queue_game.put("\a") elif ch == "\r": self.queue_game.put(self.r + self.nl) log.debug("Restore observer dispatch", self.save) assert not self.save is None self.observer.load(self.save) self.save = None log.debug("Disable keepalive") self.keepalive.stop() self.keepalive = None line = self.input self.input = "" assert not self.deferred is None self.game.to_player = self.to_player # If they gave us the keyword name, save the value as that name if "name" in self.kw: self.keep[self.kw["name"]] = line if "abort_blank" in self.kw and self.kw["abort_blank"]: # Abort on blank input if line.strip() == "": # Yes, input is blank, abort. log.info("errback, abort_blank") reactor.callLater( 0, self.deferred.errback, Exception("abort_blank") ) self.deferred = None return # Ok, use deferred.callback, or reactor.callLater? # self.deferred.callback(line) reactor.callLater(0, self.deferred.callback, line) self.deferred = None return elif ch.isprintable(): # Printable, but is it acceptable? if "digits" in self.kw: if not ch.isdigit(): self.queue_game.put("\a") continue if len(self.input) + 1 <= self.limit: self.input += ch self.queue_game.put(ch) else: self.queue_game.put("\a") def output(self, line): """ A default display of what they just input. """ log.debug("PlayerInput.output({0})".format(line)) self.game.queue_game.put(self.r + "[{0}]".format(line) + self.nl) return line import re # The CIMWarpReport -- is only needed if the json file gets damaged in some way. # or needs to be reset. The warps should automatically update themselves now. class CIMWarpReport(object): def __init__(self, game): self.game = game self.queue_game = game.queue_game self.queue_player = game.queue_player self.observer = game.observer # Yes, at this point we would activate self.prompt = game.buffer self.save = self.observer.save() # I actually don't want the player input, but I'll grab it anyway. self.observer.connect("player", self.player) self.observer.connect("prompt", self.game_prompt) self.observer.connect("game-line", self.game_line) # If we want it, it's here. self.defer = None self.to_player = self.game.to_player # Hide what's happening from the player self.game.to_player = False self.queue_player.put("^") # Activate CIM self.state = 1 # self.warpdata = {} self.warpcycle = SpinningCursor() def game_prompt(self, prompt): if prompt == ": ": if self.state == 1: # Ok, then we're ready to request the port report self.warpcycle.reset() self.queue_player.put("I") self.state = 2 elif self.state == 2: self.queue_player.put("Q") self.state = 3 if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt): if self.state == 3: # Ok, time to exit # exit from this... self.game.to_player = self.to_player self.observer.load(self.save) self.save = None # self.game.warpdata = self.warpdata self.queue_game.put("\b \b\r\n") if not self.defer is None: self.defer.callback(self.game.gamedata.warps) self.defer = None def game_line(self, line): if line == "" or line == ": ": return if line == ": ENDINTERROG": return if line.startswith('Command [TL='): return # This should be the CIM Report Data -- parse it if self.warpcycle: if self.warpcycle.click(): self.queue_game.put("\b" + self.warpcycle.cycle()) work = line.strip() parts = re.split(r"(?<=\d)\s", work) parts = [int(x) for x in parts] sector = parts.pop(0) # tuples are nicer on memory, and the warpdata map isn't going to be changing. # self.warpdata[sector] = tuple(parts) self.game.gamedata.warp_to(sector, *parts) def __del__(self): log.debug("CIMWarpReport {0} RIP".format(self)) def whenDone(self): self.defer = defer.Deferred() # Call this to chain something after we exit. return self.defer def player(self, chunk): """ Data from player (in bytes). """ chunk = chunk.decode("latin-1", "ignore") key = chunk.upper() log.warn("CIMWarpReport.player({0}) : I AM stopping...".format(key)) # Stop the keepalive if we are activating something else # or leaving... # self.keepalive.stop() self.queue_game.put("\b \b\r\n") if not self.defer is None: # We have something, so: self.game.to_player = self.to_player self.observer.load(self.save) self.save = None self.defer.errback(Exception("User Abort")) self.defer = None else: # Still "exit" out. self.game.to_player = self.to_player self.observer.load(self.save) # the CIMPortReport will still be needed. # We can't get fresh report data (that changes) any other way. class CIMPortReport(object): """ Parse data from CIM Port Report Example: from flexible import CIMPortReport report = CIMPortReport(self.game) d = report.whenDone() d.addCallback(self.port_report) d.addErrback(self.welcome_back) def port_report(self, portdata): self.portdata = portdata self.queue_game.put("Loaded {0} records.".format(len(portdata)) + self.nl) self.welcome_back() def welcome_back(self,*_): ... restore keep alive timers, etc. """ def __init__(self, game): self.game = game self.queue_game = game.queue_game self.queue_player = game.queue_player self.observer = game.observer # Yes, at this point we would activate self.prompt = game.buffer self.save = self.observer.save() # I actually don't want the player input, but I'll grab it anyway. self.observer.connect("player", self.player) self.observer.connect("prompt", self.game_prompt) self.observer.connect("game-line", self.game_line) # If we want it, it's here. self.defer = None self.to_player = self.game.to_player log.debug("to_player (stored)", self.to_player) # Hide what's happening from the player self.game.to_player = False self.queue_player.put("^") # Activate CIM self.state = 1 # self.portdata = {} self.portcycle = SpinningCursor() def game_prompt(self, prompt): if prompt == ": ": if self.state == 1: # Ok, then we're ready to request the port report self.portcycle.reset() self.queue_player.put("R") self.state = 2 elif self.state == 2: self.queue_player.put("Q") self.state = 3 if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt): if self.state == 3: # Ok, time to exit # exit from this... self.game.to_player = self.to_player self.observer.load(self.save) self.save = None # self.game.portdata = self.portdata self.queue_game.put("\b \b\r\n") if not self.defer is None: self.defer.callback(self.game.gamedata.ports) self.defer = None def game_line(self, line: str): if line == "" or line == ": ": return if line == ": ENDINTERROG": return # This should be the CIM Report Data -- parse it if self.portcycle: if self.portcycle.click(): self.queue_game.put("\b" + self.portcycle.cycle()) work = line.replace("%", "") parts = re.split(r"(?<=\d)\s", work) if len(parts) == 8: port = int(parts[0].strip()) data = dict() def portBS(info): if info[0] == "-": bs = "B" else: bs = "S" return (bs, int(info[1:].strip())) data["fuel"] = dict() data["fuel"]["sale"], data["fuel"]["units"] = portBS(parts[1]) data["fuel"]["pct"] = int(parts[2].strip()) data["org"] = dict() data["org"]["sale"], data["org"]["units"] = portBS(parts[3]) data["org"]["pct"] = int(parts[4].strip()) data["equ"] = dict() data["equ"]["sale"], data["equ"]["units"] = portBS(parts[5]) data["equ"]["pct"] = int(parts[6].strip()) # Store what this port is buying/selling data["port"] = ( data["fuel"]["sale"] + data["org"]["sale"] + data["equ"]["sale"] ) # Convert BBS/SBB to Class number 1-8 data["class"] = CLASSES_PORT[data["port"]] self.game.gamedata.set_port(port, data) # self.portdata[port] = data else: log.error("CIMPortReport:", line, "???") def __del__(self): log.debug("CIMPortReport {0} RIP".format(self)) def whenDone(self): self.defer = defer.Deferred() # Call this to chain something after we exit. return self.defer def player(self, chunk): """ Data from player (in bytes). """ chunk = chunk.decode("latin-1", "ignore") key = chunk.upper() log.warn("CIMPortReport.player({0}) : I AM stopping...".format(key)) # Stop the keepalive if we are activating something else # or leaving... # self.keepalive.stop() self.queue_game.put("\b \b\r\n") if not self.defer is None: # We have something, so: self.game.to_player = self.to_player self.observer.load(self.save) self.save = None self.defer.errback(Exception("User Abort")) self.defer = None else: # Still "exit" out. self.game.to_player = self.to_player self.observer.load(self.save) class ScriptPort(object): """ Performs the Port script. This is close to the original. We don't ask for the port to trade with -- because that information is available to us after "D" (display). We look at the adjacent sectors, and see if we know any ports. If the ports are burnt (< 20%), we remove them from the list. If there's just one, we use it. Otherwise we ask them to choose. """ def __init__(self, game): self.game = game self.queue_game = game.queue_game self.queue_player = game.queue_player self.observer = game.observer self.r = Style.RESET_ALL self.nl = "\n\r" self.this_sector = None # Starting sector self.sector1 = None # Current Sector self.sector2 = None # Next Sector Stop self.percent = 5 # Stick with the good default. self.credits = 0 self.last_credits = None self.times_left = 0 # Activate self.prompt = game.buffer self.save = self.observer.save() self.observer.connect('player', self.player) self.observer.connect("prompt", self.game_prompt) self.observer.connect("game-line", self.game_line) self.defer = None self.queue_game.put( self.nl + "Script based on: Port Pair Trading v2.00" + self.r + self.nl ) self.possible_sectors = None self.state = 1 self.queue_player.put("D") # Original, send 'D' to display current sector. # We could get the sector number from the self.prompt string -- HOWEVER: # IF! We send 'D', we can also get the sectors around -- we might not even need to # prompt for sector to trade with (we could possibly figure it out ourselves). # [Command [TL=00:00:00]:[967] (?=Help)? : D] # [] # [] # [Sector : 967 in uncharted space.] # [Planets : (M) Into the Darkness] # [Warps to Sector(s) : 397 - (562) - (639)] # [] def whenDone(self): self.defer = defer.Deferred() # Call this to chain something after we exit. return self.defer def deactivate(self): self.state = 0 log.debug("ScriptPort.deactivate ({0})".format(self.times_left)) self.queue_game.put(self.nl + Boxes.alert("Trading Script deactivating...", width=50)) assert(not self.save is None) self.observer.load(self.save) self.save = None if self.defer: self.defer.callback('done') self.defer = None def player(self, chunk: bytes): # If we receive anything -- ABORT! self.deactivate() def game_prompt(self, prompt: str): log.debug("{0} : {1}".format(self.state, prompt)) if self.state == 3: # log.("game_prompt: ", prompt) if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt): self.state = 4 log.debug("Ok, state 4") if self.sector2 is None: # Ok, we need to prompt for this. self.queue_game.put(self.r + self.nl + "Which sector to trade with? {0}".format(GameData.port_show_part(self.this_sector, self.game.gamedata.ports[self.this_sector])) + self.nl) for i, p in enumerate(self.possible): self.queue_game.put(" " + Fore.CYAN + str(i + 1) + " : " + GameData.port_show_part(p, self.game.gamedata.ports[p]) + self.nl) pi = PlayerInput(self.game) def got_need1(*_): log.debug("Ok, I have:", pi.keep) if pi.keep['count'].strip() == '': self.deactivate() return self.times_left = int(pi.keep['count']) if pi.keep['choice'].strip() == '': self.deactivate() return c = int(pi.keep['choice']) -1 if c < 0 or c >= len(self.possible): self.deactivate() return self.sector2 = self.possible[int(pi.keep['choice']) -1] # self.queue_game.put(pformat(pi.keep).replace("\n", "\n\r")) self.state = 5 self.trade() d = pi.prompt("Choose -=>", 5, name='choice', digits=True) d.addCallback(lambda ignore: pi.prompt("Times to execute script:", 5, name='count', digits=True)) d.addCallback(got_need1) else: # We already have our target port, so... self.queue_game.put(self.r + self.nl + "Trading from {0} ({1}) to default {2} ({3}).".format( self.this_sector, self.game.gamedata.ports[self.this_sector]['port'], self.sector2, self.game.gamedata.ports[self.sector2]['port']) + self.nl ) self.queue_game.put(self.r + self.nl + "Trading {0}".format( self.game.gamedata.port_trade_show(self.this_sector, self.sector2) )) pi = PlayerInput(self.game) def got_need2(*_): if pi.keep['count'].strip() == '': self.deactivate() return self.times_left = int(pi.keep['count']) log.debug("Ok, I have:", pi.keep) # self.queue_game.put(pformat(pi.keep).replace("\n", "\n\r")) self.state = 5 self.trade() self.queue_game.put(self.r + self.nl) d = pi.prompt("Times to execute script", 5, name='count') d.addCallback(got_need2) elif self.state == 6: if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt): if self.fixable: # self.queue_game.put("Ok! Let's fix this by going to the other sector..." + self.nl) self.queue_game.put(self.nl + Boxes.alert("Ok, FINE. We'll trade with the other port.", base="green", style=0)) log.debug("Fixing...") # Swap this and other sector self.this_sector, self.other_sector = (self.other_sector, self.this_sector) self.queue_player.put("{0}\r".format(self.sector2)) self.state = 5 self.trade() elif self.state == 7: if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt): # Done # Swap this and other sector self.this_sector, self.other_sector = (self.other_sector, self.this_sector) if self.this_sector == self.sector2: self.times_left -= 1 if self.times_left <= 0: # Ok, exit out self.deactivate() return if self.last_credits is None: self.last_credits = self.credits else: if self.credits <= self.last_credits: log.warn("We don't seem to be making any money here...") self.queue_game.put(self.r + self.nl + "We don't seem to be making any money here. I'm stopping!" + self.nl) self.deactivate() return self.queue_player.put("{0}\r".format(self.this_sector)) self.state = 10 elif re.match(r'Your offer \[\d+\] \?', prompt): if self.fix_offer: # Make real offer / WHAT?@?! work = prompt.replace(',', '') parts = re.split(r"\s+", work) amount = parts[2] # Ok, we have the amount, now to figure pct... if self.sell_pct > 100: self.sell_pct -= 1 else: self.sell_pct += 1 price = amount * self.sell_pct // 100 log.debug("start: {0} % {1} price {2}".format(amount, self.sell_perc, price)) if self.sell_pct > 100: self.sell_pct -= 1 else: self.sell_pct += 1 self.queue_player.put("{0}\r".format(price)) elif self.state == 8: # What are we trading # How many holds of Equipment do you want to buy [75]? if re.match(r"How many holds of .+ do you want to buy \[\d+\]\?", prompt): parts = prompt.split() trade_type = parts[4] if trade_type == 'Fuel': if (self.tpc in (5,7)) and (self.opc in (2,3,4,8)): # Can buy equipment - fuel ore is worthless. self.queue_player.put("0\r") return if (self.tpc in(4,7)) and (self.opc in (1,3,5,8)): # Can buy organics - fuel ore is worthless. self.queue_player.put("0\r") return if (self.tpc in (4,7,3,5)) and (self.opc in (3,4,5,7)): # No point in buying fuel ore if it can't be sold. self.queue_player.put("0\r") return elif trade_type == 'Organics': if (self.tpc in (6,7)) and (self.opc in (2,3,4,8)): # Can buy equipment - organics is worthless. self.queue_player.put("0\r") return if (self.tpc in (2,4,6,7)) and (self.opc in (2,4,6,7)): # No point in buying organics if it can't be sold. self.queue_player.put("0\r") return elif trade_type == 'Equipment': if (self.opc in (1,5,6,7)): # No point in buying equipment if it can't be sold. self.queue_player.put("0\r") return self.queue_player.put("\r") elif re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt): # Done # Swap this and other sector self.this_sector, self.other_sector = (self.other_sector, self.this_sector) if self.this_sector == self.sector2: self.times_left -= 1 if self.times_left <= 0: # Ok, exit out self.deactivate() return if self.last_credits is None: self.last_credits = self.credits else: if self.credits <= self.last_credits: log.warn("We don't seem to be making any money here...") self.queue_game.put(self.r + self.nl + "We don't seem to be making any money here. I'm stopping!" + self.nl) self.deactivate() return self.queue_player.put("{0}\r".format(self.this_sector)) self.state = 10 elif self.state == 99: # This is a good place to deactivate at. if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt): if hasattr(self, 'message'): if self.message is not None: self.queue_game.put(self.message) self.message = None self.deactivate() def trade(self, *_): # state 5 log.debug("trade!") self.queue_player.put("pt") # Port Trade self.this_port = self.game.gamedata.ports[self.this_sector] # I think other_sector will alway be correct, but leaving this # for now. FUTURE: TODO: REMOVE if self.this_sector == self.sector1: self.other_sector = self.sector2 else: self.other_sector = self.sector1 self.other_port = self.game.gamedata.ports[self.other_sector] # Ok, perform some calculations self.tpc = self.this_port['class'] self.opc = self.other_port['class'] self.fixable = 0 self.fix_offer = 0 # [ Items Status Trading % of max OnBoard] # [ ----- ------ ------- -------- -------] # [Fuel Ore Selling 2573 93% 0] # [Organics Buying 2960 100% 0] # [Equipment Buying 1958 86% 0] # [] # [] # [You have 1,000 credits and 20 empty cargo holds.] # [] # [We are selling up to 2573. You have 0 in your holds.] # [How many holds of Fuel Ore do you want to buy [20]? 0] def game_line(self, line: str): if line.startswith("You have ") and 'credits and' in line: parts = line.replace(',', '').split() credits = int(parts[2]) log.debug("Credits: {0}".format(credits)) self.credits = credits if self.state == 1: # First exploration if line.startswith("Sector :"): # We have starting sector information parts = re.split("\s+", line) self.this_sector = int(parts[2]) # These will be the ones swapped around as we trade back and forth. self.sector1 = self.this_sector elif line.startswith("Warps to Sector(s) : "): # Warps to Sector(s) : 397 - (562) - (639) _, _, warps = line.partition(':') warps = warps.replace('-', '').replace('(', '').replace(')', '').strip() log.debug("Warps: [{0}]".format(warps)) self.warps = [ int(x) for x in re.split("\s+", warps)] log.debug("Warps: [{0}]".format(self.warps)) self.state = 2 elif self.state == 2: if line == "": # Ok, we're done self.state = 3 # Check to see if we have information on any possible ports # if hasattr(self.game, 'portdata'): if True: if not self.this_sector in self.game.gamedata.ports: self.state = 0 log.debug("Current sector {0} not in portdata.".format(self.this_sector)) self.queue_game.put(self.r + self.nl + "I can't find the current sector in the portdata." + self.nl) self.deactivate() return else: # Ok, we are in the portdata pd = self.game.gamedata.ports[self.this_sector] if GameData.port_burnt(pd): log.debug("Current sector {0} port is burnt (<= 20%).".format(self.this_sector)) self.queue_game.put(self.r + self.nl + "Current sector port is burnt out. <= 20%." + self.nl) self.deactivate() return possible = [ x for x in self.warps if x in self.game.gamedata.ports ] log.debug("Possible: {0}".format(possible)) # BUG: Sometimes links to another sector, don't link back! # This causes the game to plot a course / autopilot. # if hasattr(self.game, 'warpdata'): if True: # Great! verify that those warps link back to us! possible = [ x for x in possible if x in self.game.gamedata.warps and self.this_sector in self.game.gamedata.warps[x]] if len(possible) == 0: self.state = 0 self.queue_game.put(self.r + self.nl + "I don't see any ports in [{0}].".format(self.warps) + self.nl) self.deactivate() return possible = [ x for x in possible if not GameData.port_burnt(self.game.gamedata.ports[x]) ] log.debug("Possible: {0}".format(possible)) if len(possible) == 0: self.state = 0 self.queue_game.put(self.r + self.nl + "I don't see any unburnt ports in [{0}].".format(self.warps) + self.nl) self.deactivate() return possible = [ x for x in possible if GameData.port_trading(self.game.gamedata.ports[self.this_sector]['port'], self.game.gamedata.ports[x]['port'])] self.possible = possible if len(possible) == 0: self.state = 0 self.queue_game.put(self.r + self.nl + "I don't see any possible port trades in [{0}].".format(self.warps) + self.nl) self.deactivate() return elif len(possible) == 1: # Ok! there's only one! self.sector2 = possible[0] # Display possible ports: # spos = [ str(x) for x in possible] # self.queue_game.put(self.r + self.nl + self.nl.join(spos) + self.nl) # At state 3, we only get a prompt. return else: self.state = 0 log.warn("We don't have any portdata!") self.queue_game.put(self.r + self.nl + "I have no portdata. Please run CIM Port Report." + self.nl) self.deactivate() return elif self.state == 5: if "-----" in line: self.state = 6 elif self.state == 6: if "We are buying up to" in line: # Sell self.state = 7 self.queue_player.put("\r") self.sell_perc = 100 + self.percent if "We are selling up to" in line: # Buy self.state = 8 self.sell_perc = 100 - self.percent if line.startswith('Fuel Ore') or line.startswith('Organics') or line.startswith('Equipment'): work = line.replace('Fuel Ore', 'Fuel') parts = re.split(r"\s+", work) # log.debug(parts) # Equipment, Selling xxx x% xxx if parts[-1] != '0' and parts[2] != '0' and parts[1] != 'Buying': log.warn("We have a problem -- they aren't buying what we have in stock!") stuff = line[0] # F O or E. if self.game.gamedata.port_buying(self.other_sector, stuff): self.fixable = True if "You don't have anything they want" in line: # Neither! DRAT! if not self.fixable: self.state = 99 return if "We're not interested." in line: log.warn("Try, try again. :(") self.state = 5 self.trade() elif self.state == 7: # Haggle Sell if "We'll buy them for" in line or "Our final offer" in line: if "Our final offer" in line: self.sell_perc -= 1 parts = line.replace(',', '').split() start_price = int(parts[4]) price = start_price * self.sell_perc // 100 log.debug("start: {0} % {1} price {2}".format(start_price, self.sell_perc, price)) self.sell_perc -= 1 self.queue_player.put("{0}\r".format(price)) if "We are selling up to" in line: # Buy self.state = 8 self.sell_perc = 100 - self.percent if "We're not interested." in line: log.info("Try, try again. :(") self.state = 5 self.trade() if "WHAT?!@!? you must be crazy!" in line: log.warn("fix offer") self.fix_offer = 1 if "So, you think I'm as stupid as you look?" in line: log.warn("fix offer") self.fix_offer = 1 if "Quit playing around, you're wasting my time!" in line: log.warn("fix offer") self.fix_offer = 1 elif self.state == 8: # Haggle Buy if "We'll sell them for" in line or "Our final offer" in line: if "Our final offer" in line: self.sell_perc += 1 parts = line.replace(',', '').split() start_price = int(parts[4]) price = start_price * self.sell_perc // 100 log.debug("start: {0} % {1} price {2}".format(start_price, self.sell_perc, price)) self.sell_perc += 1 self.queue_player.put("{0}\r".format(price)) if "We're not interested." in line: log.info("Try, try again. :(") self.state = 5 self.trade() elif self.state == 10: if "Sector : " in line: # Trade self.state = 5 reactor.callLater(0, self.trade, 0) # self.trade() # elif self.state == 3: # log.debug("At state 3 [{0}]".format(line)) # self.queue_game.put("At state 3.") # self.deactivate() # return class ScriptExplore(object): """ Script Explore v1.00 By: David Thielemann WARNINGS: We assume the player has a Holo-Scanner! We assume the player has lots o turns, or unlimited turns! We assume the player is aware we run infinitely until we can't find new sectors to move to! """ def __init__(self, game): self.game = game self.queue_game = game.queue_game self.queue_player = game.queue_player self.observer = game.observer self.r = Style.RESET_ALL self.c = merge(Style.BRIGHT + Fore.YELLOW) self.nl = "\n\r" # Our Stuff, Not our pants! self.dense = [] # We did a density, store that info. self.clear = [] # Warps that we know are clear. self.didDense = False self.didHolo = False self.highsector = 0 # Selected Sector to move to next! self.highwarp = 0 # Selected Sector's Warp Count! self.stacksector = set() # Set of sectors that we have not picked but are unexplored... even though we did a holo! self.oneMoveSector = False self.times = 0 self.maxtimes = 0 # Activate self.prompt = game.buffer self.save = self.observer.save() self.observer.connect('player', self.player) self.observer.connect("prompt", self.game_prompt) self.observer.connect("game-line", self.game_line) self.defer = None self.send2player(Boxes.alert("Explorer v1.01", base="green")) # How many times we going to go today? ask = PlayerInput(self.game) def settimes(*_): times = ask.keep['times'].strip() log.debug("settimes got '{0}'".format(times)) if times == '': self.deactivate() else: times = int(times) self.times = times self.maxtimes = times self.send2game("D") self.state = 1 d = ask.prompt("How many sectors would you like to explorer?", 5, name="times", digits=True) #d.addCallback(ask.output) #d.addCallback(lambda ignore: self.settimes(ask.keep)) d.addCallback(settimes) def whenDone(self): self.defer = defer.Deferred() # Call this to chain something after we exit. return self.defer def deactivate(self): self.state = 0 log.debug("ScriptExplore.deactivate()") assert(not self.save is None) self.observer.load(self.save) self.save = None if self.defer: self.defer.callback('done') self.defer = None def player(self, chunk: bytes): # If we receive anything -- ABORT! self.deactivate() def send2game(self, txt): self.queue_player.put(txt) def send2player(self, txt): self.queue_game.put( self.nl + txt + self.r + self.nl ) def resetStuff(self): self.didDense = False self.didHolo = False self.dense = [] self.clear = [] self.highwarp = 0 self.highsector = 0 log.debug("ScriptExplore.resetStuff()") def dead_end(self): """ We've reached a dead end. Either pop a new location to travel to, or give it up. """ self.send2player(Boxes.alert("** DEAD END **", base="blue")) if self.stacksector: # Ok, there's somewhere to go ... self.highsector = self.stacksector.pop() # travel state self.state = 10 self.send2game("{0}\r".format(self.highsector)) else: self.send2player(Boxes.alert("I've run out of places to look ({0}/{1}).".format(self.maxtimes - self.times, self.maxtimes))) self.deactivate() def game_over(self, msg): self.send2player(Boxes.alert("STOP: {0} ({1}/{2}).".format(msg, self.maxtimes - self.times, self.maxtimes))) self.deactivate() def game_prompt(self, prompt: str): log.debug("{0} : {1}".format(self.state, prompt)) if self.state == 2: if "Select (H)olo Scan or (D)ensity Scan or (Q)uit" in prompt: self.send2game("D") # self.state += 1 elif self.state == 5: log.debug("dense is {0} sectors big".format(len(self.dense))) self.state += 1 self.send2game("SH") elif self.state == 12: # Looking for "Engage the Autopilot?" if prompt.startswith('Engage the Autopilot? (Y/N/Single step/Express) [Y]'): self.send2game("S") self.travel_path.pop(0) if prompt.startswith('Stop in this sector (Y,N,E,I,R,S,D,P,?) (?=Help) [N]'): self.send2game("SD") self.state += 1 # Arriving sector :1691 Autopilot disengaging. if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt): log.info("We made it to where we wanted to go!") # can't init state 1, because we're at a prompt, so... self.send2game("S") self.state = 2 return elif self.state == 15: if prompt.startswith('Stop in this sector (Y,N,E,I,R,S,D,P,?) (?=Help) [N]'): self.send2game("N") self.travel_path.pop(0) self.state = 12 if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt): log.info("We made it to where we wanted to go!") # can't init state 1, because we're at a prompt, so... self.send2game("S") self.state = 2 return elif self.state == 20: if prompt.startswith('Stop in this sector (Y,N,E,I,R,S,D,P,?) (?=Help) [N]'): # Stop in this sector / Yes! self.send2game("Y") self.state = 1 # this should re-trigger a scan def game_line(self, line: str): log.debug("{0} | {1}".format(self.state, line)) #if "Mine Control" in line: # If we don't have a Holo-Scanner and we attempted to do a Holo-scan, abort # self.deactivate() if self.state == 1: self.send2game("S") self.state += 1 elif self.state == 2: if "Relative Density Scan" in line: self.state = 3 elif "You don't have a long range scanner." in line: log.warn("FATAL: No Long Range Scanner Installed!") self.send2player(Boxes.alert("You need a Long Range Scanner!")) self.deactivate() return # elif "Long Range Scan" in line: # self.state += 1 elif self.state == 3: # Get the Density Data! if line.startswith("Sector"): new_sector = '(' in line work = line.replace(':', '').replace(')', '').replace('(', '').replace('%', '').replace('==>', '') work = re.split(r"\s+", work) log.debug(work) # 'Sector', '8192', '0', 'Warps', '4', 'NavHaz', '0', 'Anom', 'No' # 'Sector', '(', '8192)', '0', 'Warps', '4', 'NavHaz', '0', 'Anom', 'No' # New Sector? if new_sector: # Switch Anom into bool state # if(work[8] == 'No'): # temp = False # else: # temp = True #self.dense.append( {'sector': int(work[1]), 'density': int(work[2]), 'warps': int(work[4]), 'navhaz': int(work[6]), 'anom': temp} ) self.dense.append( {'sector': int(work[1]), 'density': int(work[2]), 'warps': int(work[4]), 'navhaz': int(work[6]), 'anom': work[8] == 'Yes'} ) log.debug(self.dense) # {'sector': 8192, 'density': 0, 'warps': 4, 'navhaz': 0, 'anom': False} elif line == "": self.state += 1 # yeah, this would be better in the above line... # leaving it for now. # Which is why I broke the elif chain. ... if self.state == 4: # Begin Processing our data we got from density scan and find highest warp count in what sectors # Remove sectors with one warp log.debug("state 4: {0}".format(self.dense)) # Do we have a new place to go? (That is also worth going to) if not self.dense: # Dense contains no new sectors, abort log.info("No New Sectors Found!") self.dead_end() return # self.send2player(Boxes.alert("Find a new area for me to search in!")) # Attempt to resolve no new sectors! # if self.stacksector: # Do we have anything on the stack? (If so we set highsector with one of the randomly selected sectors) # self.highsector = self.stacksector.pop() # self.deactivate() # elif self.dense: # Dense does contain at least 1 new sector, continue on # ugh, actually, we don't care about making a list # we only want to know if there's any that have warps > 1. :( # see any: # list comprehension # t = [d for d in self.dense if d['warps'] > 1] # t = [] # Pre-Test to check if there are just a bunch of 1 warp sectors # for d in self.dense: # if d['warps'] > 1: # t.append(d['sector']) # I don't care if there's only one warp. if it's un-explorered then # explore it! # if not any( w['warps'] > 1 for w in self.dense): # # If there are no sectors with more that 1 warp, abort # log.debug("No Sectors Found except one move sector!") # self.dead_end() # return # self.send2player(Boxes.alert("Find a new area for me to look at!")) # Attempt to resolve no new sectors with more than 1 warp! # if self.stacksector: # Do we have anything on the stack? (If so we set highsector with one of the randomly selected sectors) # self.highsector = self.stacksector.pop() # self.deactivate() # Is the sector safe to go into? # self.clear = [ x['sector'] for x in self.dense if not x['anom'] and not x['navhaz'] and x['density'] in (0,1,100,101) and x['warps'] > 1 ] self.clear = [ x for x in self.dense if not x['anom'] and not x['navhaz'] and x['density'] in (0,1,100,101) ] # for d in self.dense: # if not d['anom']: # # Sector does not contain a Anomoly # if not d['navhaz']: # # Sector does not contain Hazards # if d['density'] in (0, 1, 100, 101): # # Sector does contain empty space / a beacon / a port / or a beacon and port # if d['warps'] > 1: # # If Sector is worth checking out? # self.clear.append(d['sector']) if self.clear: # We have sector(s) we can move to! log.debug("Clear Sectors: {0}".format(len(self.clear))) # This was state 5 but why can't we reduce number of states? ( Yeah let's kick California and New York out of the US, oh wrong states :P ) # Sort to find greatest warp count self.highwarp, self.highsector = max( (x['warps'], x['sector']) for x in self.clear) # for c in self.clear: # for d in self.dense: # if d['sector'] == c: # if d['warps'] > self.highwarp: # self.highwarp = d['warps'] # self.highsector = d['sector'] # elif d['warps'] == self.highwarp: # if d['sector'] > self.highsector: # self.highsector = d['sector'] # if self.highwarp and self.highsector: log.info("Sector: {0:5d} Warps: {1}".format(self.highsector, self.highwarp)) self.state += 1 else: log.warn("No (safe) sectors to move to!") # Let's try this?! # self.dead_end() # NO! self.game_over("No SAFE moves.") # Another NOP state. This also could be merged into above. # break the elif chain. if self.state == 5: # Add the dense scan of unknown sectors onto the stack of sectors, only save the ones we think are clear... for now. for c in self.clear: sector = c['sector'] if sector != self.highsector: self.stacksector.add(sector) # Or simply not add it in the first place ... # Remove the sector we are just about to go to, we use discard so if the sector does not exist we don't throw a error! # self.stacksector.discard(self.highsector) # Ok, we need to decide to stop exploring -- before we # issue the sector move! :P # # Warning! Yes we can and will eat all the turns! :P if self.times == 0: self.send2player(Boxes.alert("Completed {0}".format(self.maxtimes), base="green")) log.info("Completed {0}".format(self.maxtimes)) self.deactivate() return self.times -= 1 # Ok we know the sector we want to go to now let's move it! self.send2game("m{0}\r".format(self.highsector)) # Reset Variables for fresh data self.resetStuff() self.state = 1 elif self.state == 10: if line.startswith("You are already in that sector!"): log.info("Already here. (Whoops!)") self.state = 1 return if line.startswith("Sector : {0}".format(self.highsector)): log.info("We're here!") # Ok, we're already there! no autopilot needed! self.state = 1 return # Warping self.go_on = True if line.startswith('The shortest path ('): # Ok, we've got a path. self.state += 1 self.travel_path = [] elif self.state == 11: if line == '': # The end of the (possibly) multiline warp. self.state += 1 self.travel_path.pop(0) # First sector is one we're in. self.stophere = False self.go_on = True else: self.travel_path.extend(line.replace('(', '').replace(')', '').split(' > ') ) log.debug("Travel path: {0}".format(self.travel_path)) elif self.state == 12: # Arriving sector :1691 Autopilot disengaging. if 'Autopilot disengaging.' in line: log.info("We made it to where we wanted to go!") self.state = 1 return elif self.state == 13: if 'Relative Density Scan' in line: self.state += 1 elif self.state == 14: if line == "": log.debug("PATH: {0}".format(self.travel_path)) # end of the scan, decision time if self.stophere: log.info("STOPHERE") # Ok, let's stop here! # Re-save the sector we were trying to get to. (we didn't make it there) self.stacksector.add(self.highsector) self.state = 20 else: if self.go_on: log.info("GO ON") # Ok, carry on! self.state = 15 else: log.warn("Our way is blocked...") self.stacksector.add(self.highsector) self.state = 20 else: if line.strip('-') != '': work = line.replace(' :', '').replace('%', '').replace(')', '').replace('==>', '') # Does this contain something new? unseen? stophere = '(' in work work = work.replace('(','') #Sector XXXX DENS Warps N NavHaz P Anom YN parts = re.split(r'\s+', work) # Don't bother stopping if there's only one warp # YES! Stop, even if there is just one warp! # if stophere and parts[4] == '1': # stophere = False if stophere: self.stophere = True next_stop = self.travel_path[0] log.debug("next_stop {0} from {1}".format(next_stop, self.travel_path)) log.debug("parts: {0}".format(parts)) if parts[1] == next_stop: log.info("next_stop {0} found...".format(next_stop)) # Ok, this is our next stop. Is it safe to travel to? if parts[2] not in ('100', '0', '1', '101'): # Ok, it's not safe to go on. self.go_on = False # Check the rest navhav and anom ... class ScriptSpace(object): """ Space Exploration script. Send "SD", verify paths are clear. Find nearest unknown. Sector + CR. Save "shortest path from to" information. At 'Engage the Autopilot', Send "S" (Single Step) At '[Stop in this sector', Send "SD", verify path is clear. Send "SH" (glean sector/port information along the way.) Send "N" (Next)! Send "SD" / Verify clear. Send "SH" Repeat for Next closest. """ def __init__(self, game): self.game = game self.queue_game = game.queue_game self.queue_player = game.queue_player self.observer = game.observer self.r = Style.RESET_ALL self.nl = "\n\r" self.this_sector = None # Starting sector self.target_sector = None # Sector going to self.path = [] self.times_left = 0 # How many times to look for target self.density = dict() # Results of density scan. (Just the numbers) # Activate self.prompt = game.buffer self.save = self.observer.save() self.observer.connect('player', self.player) self.observer.connect("prompt", self.game_prompt) self.observer.connect("game-line", self.game_line) self.defer = None self.queue_game.put( self.nl + "Bugz (like space), is big." + self.r + self.nl ) self.state = 1 self.queue_player.put("SD") # Get current density scan + also get the current sector. # [Command [TL=00:00:00]:[XXXX] (?=Help)? : D] def whenDone(self): self.defer = defer.Deferred() # Call this to chain something after we exit. return self.defer def deactivate(self): self.state = 0 log.debug("ScriptPort.deactivate ({0})".format(self.times_left)) assert(not self.save is None) self.observer.load(self.save) self.save = None if self.defer: self.defer.callback('done') self.defer = None def player(self, chunk: bytes): # If we receive anything -- ABORT! self.deactivate() def unknown_search(self, starting_sector): seen = set() possible = set() possible.add(int(starting_sector)) done = False while not done: next_possible = set() for s in possible: p = self.game.gamedata.get_warps(s) if p is not None: for pos in p: if pos not in seen: next_possible.add(pos) else: log.debug("unknown found:", s) self.unknown = s done = True break seen.add(s) if self.unknown is None: log.debug("possible:", next_possible) possible = next_possible yield def find_unknown(self, starting_sector): log.debug("find_unknown( {0})".format(starting_sector)) d = defer.Deferred() # Process things self.unknown = None c = coiterate(self.unknown_search(starting_sector)) c.addCallback(lambda unknown: d.callback(self.unknown)) return d def show_unknown(self, sector): if sector is None: self.deactivate() return log.debug("Travel to {0}...".format(sector)) self.queue_player.put("{0}\r".format(sector)) def game_prompt(self, prompt: str): log.debug("{0} : {1}".format(self.state, prompt)) if self.state == 3: if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt): # this_sector code isn't working -- so! Get sector from prompt self.state = 4 _, _, sector = prompt.partition(']:[') sector, _, _ = sector.partition(']') self.this_sector = int(sector) # Ok, we're done with Density Scan, and we're back at the command prompt log.debug("Go find the nearest unknown...") d = self.find_unknown(sector) d.addCallback(self.show_unknown) elif self.state == 6: # Engage the autopilot? if prompt.startswith('Engage the Autopilot? (Y/N/Single step/Express) [Y]'): self.state = 7 sector = self.path.pop(0) if sector in self.density: if self.density[sector] in (0, 100): # Ok, looks safe! self.queue_player.put("S") self.this_sector = sector else: log.warn("FATAL: Density for {0} is {1}".format(sector, self.density[sector])) self.deactivate() return else: log.error("{0} not in density scan? (how's that possible?)".format(sector)) self.deactivate() return elif self.state == 7: # Ok, we're in a new sector (single stepping through space) # update density scan if prompt.startswith('Stop in this sector (Y,N,E,I,R,S,D,P,?) (?=Help) [N] ?'): self.queue_player.put("SD") self.state = 8 elif self.state == 10: # Because we're here if prompt.startswith('Stop in this sector (Y,N,E,I,R,S,D,P,?) (?=Help) [N] ?'): self.queue_player.put("SH") self.state = 11 elif self.state == 11: if prompt.startswith('Stop in this sector (Y,N,E,I,R,S,D,P,?) (?=Help) [N] ?'): # Ok, is the density scan clear? sector = self.path.pop(0) if sector in self.density: if self.density[sector] in (0, 100): # Ok, looks safe self.queue_player.put("N") self.state = 7 self.this_sector = sector else: log.warn("FATAL: Density for {0} is {1}".format(sector, self.density[sector])) self.deactivate() return else: log.error("{0} not in density scane? (how's that possible...)".format(sector)) self.deactivate() return def next_unknown(self, sector): log.info("Unknown is :", sector) self.deactivate() def game_line(self, line: str): log.debug("line {0} : {1}".format(self.state, line)) if line.startswith('Sector : '): work = line.strip() parts = re.split(r"\s+", work) self.this_sector = int(parts[2]) log.debug("game_line sector", self.this_sector) elif line.startswith("Command [TL=]"): # Ok, get the current sector from this _, _, sector = line.partition("]:[") sector, _, _ = sector.partition("]") self.this_sector = int(sector) log.debug("current sector: {0}".format(self.this_sector)) elif line.startswith('Warps to Sector(s) :'): # Warps to Sector(s) : 5468 _, _, work = line.partition(':') work = work.strip().replace('(', '').replace(')', '').replace(' - ', ' ') parts = [ int(x) for x in work.split(' ')] self.path = list(parts) if self.state in (1, 8): if 'Relative Density Scan' in line: # Start Density Scan self.state += 1 self.density = {} elif self.state in (2, 9): if line == '': # End of Density Scan self.state += 1 log.debug("Density: {0}".format(self.density)) # self.deactivate() elif line.startswith('Sector'): # Parse Density Scan values work = line.replace('(', '').replace(')', '').replace(':', '').replace('%', '').replace(',', '') parts = re.split(r'\s+', work) log.debug("Sector", parts) sector = int(parts[1]) self.density[sector] = int(parts[3]) if parts[7] != '0': log.warn("NavHaz {0} : {1}".format(parts[7], work)) self.density[sector] += 99 if parts[9] != 'No': log.warn("Anom {0} : {1}".format(parts[9], work)) self.density[sector] += 990 elif self.state == 4: # Looking for shortest path message / warp info # Or possibly, "We're here!" if line.startswith('Sector :') and str(self.unknown) in line: # Ok, I'd guess that we're already there! # Try it again! self.queue_player.put("SD") self.state = 1 if line.startswith('The shortest path'): self.state = 5 elif self.state == 5: # This is the warps line # Can this be multiple lines? if line == "": self.state = 6 else: work = line.replace("(", "").replace(")", "").replace(">", "").strip() self.path = [int(x) for x in work.split()] log.debug("Path:", self.path) # Verify current = self.path.pop(0) if current != self.this_sector: log.warn("Failed: {0} != {1}".format(current, self.this_sector)) self.deactivate() return elif self.state == 7: if self.unknown == self.this_sector: # We have arrived! log.info("We're here!") self.deactivate() from boxes import Boxes class ProxyMenu(object): """ Display ProxyMenu Example: from flexible import ProxyMenu if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt): menu = ProxyMenu(self.game) """ def __init__(self, game): self.nl = "\n\r" self.c = merge(Style.BRIGHT + Fore.YELLOW + Back.BLUE) self.r = Style.RESET_ALL self.c1 = merge(Style.BRIGHT + Fore.WHITE + Back.BLUE) self.c2 = merge(Style.NORMAL + Fore.CYAN + Back.BLUE) # self.portdata = None self.game = game self.queue_game = game.queue_game self.observer = game.observer # Am I using self or game? (I think I want game, not self.) # if hasattr(self.game, "portdata"): # self.portdata = self.game.portdata # else: # self.portdata = {} # if hasattr(self.game, 'warpdata'): # self.warpdata = self.game.warpdata # else: # self.warpdata = {} if hasattr(self.game, 'trade_report'): self.trade_report = self.game.trade_report else: self.trade_report = [] # Yes, at this point we would activate self.prompt = game.buffer self.save = self.observer.save() self.observer.connect("player", self.player) # If we want it, it's here. self.defer = None self.keepalive = task.LoopingCall(self.awake) self.keepalive.start(30) self.menu() def __del__(self): log.debug("ProxyMenu {0} RIP".format(self)) def whenDone(self): self.defer = defer.Deferred() # Call this to chain something after we exit. return self.defer def menu(self): box = Boxes(30, color=self.c) self.queue_game.put(box.top()) self.queue_game.put( box.row(self.c + "{0:^30}".format("TradeWars Proxy Active"))) self.queue_game.put(box.middle()) def menu_item(ch: str, desc: str): row = self.c1 + " {0} {1}- {2}{3:25}".format(ch, self.c2, self.c1, desc) self.queue_game.put(box.row(row)) # self.queue_game.put( # " " + self.c1 + ch + self.c2 + " - " + self.c1 + desc + self.nl # ) menu_item("D", "Display Report again") # menu_item("Q", "Quest") # if hasattr(self.game, 'portdata'): # ports = len(self.game.portdata) # else: # ports = '?' menu_item("P", "Port CIM Report ({0})".format(len(self.game.gamedata.ports))) # if hasattr(self.game, 'warpdata'): # warps = len(self.game.warpdata) # else: # warps = '?' menu_item("W", "Warp CIM Report ({0})".format(len(self.game.gamedata.warps))) menu_item("T", "Trading Report") menu_item("S", "Scripts") menu_item("X", "eXit") self.queue_game.put(box.bottom()) self.queue_game.put(" " + self.c + "-=>" + self.r + " ") def awake(self): log.info("ProxyMenu.awake()") self.game.queue_player.put(" ") def port_report(self, portdata: dict): # self.portdata = portdata # self.game.portdata = portdata self.queue_game.put("Loaded {0} ports.".format(len(portdata)) + self.nl) self.welcome_back() def warp_report(self, warpdata: dict): # self.warpdata = warpdata # self.game.warpdata = warpdata self.queue_game.put("Loaded {0} sectors.".format(len(warpdata)) + self.nl) self.welcome_back() def make_trade_report(self): log.debug("make_trade_report()") ok_trades = [] best_trades = [] # for sector, pd in self.game.gamedata.ports.items(): for sector in sorted(self.game.gamedata.ports.keys()): pd = self.game.gamedata.ports[sector] if not GameData.port_burnt(pd): pc = pd['class'] # Ok, let's look into it. if not sector in self.game.gamedata.warps: continue warps = self.game.gamedata.warps[sector] for w in warps: # Verify that we have that warp's info, and that the sector is in it. # (We can get back from it) if w in self.game.gamedata.warps and sector in self.game.gamedata.warps[w]: # Ok, we can get there -- and get back! if w > sector and w in self.game.gamedata.ports and not GameData.port_burnt(self.game.gamedata.ports[w]): # it is > and has a port. wd = self.game.gamedata.ports[w] wc = wd['class'] # 1: "BBS", # 2: "BSB", # 3: "SBB", # 4: "SSB", # 5: "SBS", # 6: "BSS", # 7: "SSS", # 8: "BBB", if pc in (1,5) and wc in (2,4): best_trades.append(self.game.gamedata.port_trade_show(sector, w)) # best_trades.append( "{0:5} -=- {1:5}".format(sector, w)) elif pc in (2,4) and wc in (1,5): best_trades.append(self.game.gamedata.port_trade_show(sector, w)) # best_trades.append( "{0:5} -=- {1:5}".format(sector, w)) elif GameData.port_trading(pd['port'], self.game.gamedata.ports[w]['port']): # ok_trades.append( "{0:5} -=- {1:5}".format(sector,w)) ok_trades.append(self.game.gamedata.port_trade_show(sector, w)) yield self.trade_report.append("Best Trades: (org/equ)") self.trade_report.extend(best_trades) self.trade_report.append("Ok Trades:") self.trade_report.extend(ok_trades) # self.queue_game.put("BEST TRADES:" + self.nl + self.nl.join(best_trades) + self.nl) # self.queue_game.put("OK TRADES:" + self.nl + self.nl.join(ok_trades) + self.nl) def show_trade_report(self, *_): self.game.trade_report = self.trade_report for t in self.trade_report: self.queue_game.put(t + self.nl) self.welcome_back() def player(self, chunk: bytes): """ Data from player (in bytes). """ chunk = chunk.decode("latin-1", "ignore") key = chunk.upper() log.debug("ProxyMenu.player({0})".format(key)) # Stop the keepalive if we are activating something else # or leaving... self.keepalive.stop() if key == "T": self.queue_game.put(self.c + key + self.r + self.nl) # Trade Report # do we have enough information to do this? # if not hasattr(self.game, 'portdata') and not hasattr(self.game, 'warpdata'): # self.queue_game.put("Missing portdata and warpdata." + self.nl) # elif not hasattr(self.game, 'portdata'): # self.queue_game.put("Missing portdata." + self.nl) # elif not hasattr(self.game, 'warpdata'): # self.queue_game.put("Missing warpdata." + self.nl) # else: if True: # Yes, so let's start! self.trade_report = [] d = coiterate(self.make_trade_report()) d.addCallback(self.show_trade_report) return elif key == "P": self.queue_game.put(self.c + key + self.r + self.nl) self.game.gamedata.reset_ports() # Activate CIM Port Report report = CIMPortReport(self.game) d = report.whenDone() d.addCallback(self.port_report) d.addErrback(self.welcome_back) return elif key == "W": self.queue_game.put(self.c + key + self.r + self.nl) self.game.gamedata.reset_warps() # Activate CIM Warp Report report = CIMWarpReport(self.game) d = report.whenDone() d.addCallback(self.warp_report) d.addErrback(self.welcome_back) return elif key == "S": self.queue_game.put(self.c + key + self.r + self.nl) # Scripts self.activate_scripts_menu() return elif key == "D": self.queue_game.put(self.c + key + self.r + self.nl) # (Re) Display Trade Report if self.trade_report: for t in self.trade_report: self.queue_game.put(t + self.nl) else: self.queue_game.put("Missing trade_report." + self.nl) # self.queue_game.put(pformat(self.portdata).replace("\n", "\n\r") + self.nl) # self.queue_game.put(pformat(self.warpdata).replace("\n", "\n\r") + self.nl) elif key == "Q": self.queue_game.put(self.c + key + self.r + self.nl) # This is an example of chaining PlayerInput prompt calls. ask = PlayerInput(self.game) d = ask.prompt("What is your quest?", 40, name="quest", abort_blank=True) # Display the user's input d.addCallback(ask.output) d.addCallback( lambda ignore: ask.prompt( "What is your favorite color?", 10, name="color" ) ) d.addCallback(ask.output) d.addCallback( lambda ignore: ask.prompt( "What is the meaning of the squirrel?", 12, name="squirrel", digits=True, ) ) d.addCallback(ask.output) def show_values(show): log.debug(show) self.queue_game.put(pformat(show).replace("\n", "\n\r") + self.nl) d.addCallback(lambda ignore: show_values(ask.keep)) d.addCallback(self.welcome_back) # On error, just return back # This doesn't seem to be getting called. # d.addErrback(lambda ignore: self.welcome_back) d.addErrback(self.welcome_back) return elif key == "X": self.queue_game.put(self.c + key + self.r + self.nl) self.queue_game.put(Boxes.alert("Proxy done.", base="green")) self.observer.load(self.save) self.save = None # It isn't running (NOW), so don't try to stop it. # self.keepalive.stop() self.keepalive = None # Ok, this is a HORRIBLE idea, because the prompt might be # outdated. # self.queue_game.put(self.prompt) self.prompt = None # Possibly: Send '\r' to re-display the prompt # instead of displaying the original one. self.game.queue_player.put("d") # Were we asked to do something when we were done here? if self.defer: reactor.CallLater(0, self.defer.callback) # self.defer.callback() self.defer = None return self.keepalive.start(30, True) self.menu() def activate_scripts_menu(self): self.observer.disconnect("player", self.player) self.observer.connect("player", self.scripts_player) self.scripts_menu() def deactivate_scripts_menu(self, *_): self.observer.disconnect("player", self.scripts_player) self.observer.connect("player", self.player) self.welcome_back() def scripts_menu(self, *_): c1 = merge(Style.BRIGHT + Fore.CYAN) c2 = merge(Style.NORMAL + Fore.CYAN) box = Boxes(40, color=c1) self.queue_game.put(box.top()) self.queue_game.put(box.row(c1 + "{0:^40}".format("Scripts"))) self.queue_game.put(box.middle()) def menu_item(ch, desc): row = " {0}{1} {2}-{3} {4:35}".format(c1, ch, c2, c1, desc) # self.queue_game.put( # " " + c1 + ch + c2 + " - " + c1 + desc + self.nl # ) self.queue_game.put(box.row(row)) menu_item("1", "Ports (Trades between two sectors)") menu_item("2", "Explore (Strange new sectors)") menu_item("3", "Space... the final frontier...") menu_item("X", "eXit") self.queue_game.put(box.bottom()) self.queue_game.put(" " + c1 + "-=>" + self.r + " ") def scripts_player(self, chunk: bytes): """ Data from player (in bytes). """ chunk = chunk.decode("latin-1", "ignore") key = chunk.upper() if key == '1': self.queue_game.put(self.c + key + self.r + self.nl) # Activate this magical event here ports = ScriptPort(self.game) d = ports.whenDone() # d.addCallback(self.scripts_menu) # d.addErrback(self.scripts_menu) d.addCallback(self.deactivate_scripts_menu) d.addErrback(self.deactivate_scripts_menu) return elif key == '2': self.queue_game.put(self.c + key + self.r + self.nl) explore = ScriptExplore(self.game) d = explore.whenDone() d.addCallback(self.deactivate_scripts_menu) d.addErrback(self.deactivate_scripts_menu) return elif key == '3': self.queue_game.put(self.c + key + self.r + self.nl) space = ScriptSpace(self.game) d = space.whenDone() d.addCallback(self.deactivate_scripts_menu) d.addErrback(self.deactivate_scripts_menu) return elif key == 'X': self.queue_game.put(self.c + key + self.r + self.nl) self.deactivate_scripts_menu() return else: self.queue_game.put(self.c + "?" + self.r + self.nl) self.scripts_menu() def welcome_back(self, *_): log.debug("welcome_back") self.keepalive.start(30, True) self.menu()