1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084 |
- from twisted.internet import reactor
- from twisted.internet import task
- from twisted.internet import defer
- from colorama import Fore, Back, Style
- from twisted.python import log
- from itertools import cycle
- import pendulum
- from pprint import pformat
- def merge(color_string):
- """ Given a string of colorama ANSI, merge them if you can. """
- return color_string.replace("m\x1b[", ";")
- class PlayerInput(object):
- """ Player Input
- Example:
- from flexible import PlayerInput
- ask = PlayerInput(self.game)
- # abort_blank means, if the input field is blank, abort. Use error_back.
- d = ask.prompt("What is your quest?", 40, name="quest", abort_blank=True)
- # Display the user's input / but not needed.
- d.addCallback(ask.output)
- d.addCallback(
- lambda ignore: ask.prompt(
- "What is your favorite color?", 10, name="color"
- )
- )
- d.addCallback(ask.output)
- d.addCallback(
- lambda ignore: ask.prompt(
- "What is your least favorite number?",
- 12,
- name="number",
- digits=True,
- )
- )
- d.addCallback(ask.output)
- def show_values(show):
- log.msg(show)
- self.queue_game.put(pformat(show).replace("\n", "\n\r") + self.nl)
- d.addCallback(lambda ignore: show_values(ask.keep))
- d.addCallback(self.welcome_back)
- # On error, just return back
- d.addErrback(self.welcome_back)
- """
- def __init__(self, game):
- # I think game gives us access to everything we need
- self.game = game
- self.observer = self.game.observer
- self.save = None
- self.deferred = None
- self.queue_game = game.queue_game
- self.keep = {}
- # default colors
- self.c = merge(Style.BRIGHT + Fore.WHITE + Back.BLUE)
- self.cp = merge(Style.BRIGHT + Fore.YELLOW + Back.BLUE)
- # useful consts
- self.r = Style.RESET_ALL
- self.nl = "\n\r"
- self.bsb = "\b \b"
- self.keepalive = None
- def color(self, c):
- self.c = c
- def colorp(self, cp):
- self.cp = cp
- def alive(self):
- log.msg("PlayerInput.alive()")
- self.game.queue_player.put(" ")
- def prompt(self, user_prompt, limit, **kw):
- """ Generate prompt for user input.
- Note: This returns deferred.
- prompt = text displayed.
- limit = # of characters allowed.
- default = (text to default to)
- keywords:
- abort_blank : Abort if they give us blank text.
- name : Stores the input in self.keep dict.
- digits : Only allow 0-9 to be entered.
- """
- log.msg("PlayerInput({0}, {1}, {2}".format(user_prompt, limit, kw))
- self.limit = limit
- self.input = ""
- self.kw = kw
- assert self.save is None
- assert self.keepalive is None
- # Note: This clears out the server "keep alive"
- self.save = self.observer.save()
- self.observer.connect("player", self.get_input)
- self.keepalive = task.LoopingCall(self.alive)
- self.keepalive.start(30)
- # We need to "hide" the game output.
- # Otherwise it WITH mess up the user input display.
- self.to_player = self.game.to_player
- self.game.to_player = False
- # Display prompt
- # self.queue_game.put(self.r + self.nl + self.c + user_prompt + " " + self.cp)
- self.queue_game.put(self.r + self.c + user_prompt + self.r + " " + self.cp)
- # Set "Background of prompt"
- self.queue_game.put(" " * limit + "\b" * limit)
- assert self.deferred is None
- d = defer.Deferred()
- self.deferred = d
- log.msg("Return deferred ...", self.deferred)
- return d
- def get_input(self, chunk):
- """ Data from player (in bytes) """
- chunk = chunk.decode("utf-8", "ignore")
- for ch in chunk:
- if ch == "\b":
- if len(self.input) > 0:
- self.queue_game.put(self.bsb)
- self.input = self.input[0:-1]
- else:
- self.queue_game.put("\a")
- elif ch == "\r":
- self.queue_game.put(self.r + self.nl)
- log.msg("Restore observer dispatch", self.save)
- assert not self.save is None
- self.observer.load(self.save)
- self.save = None
- log.msg("Disable keepalive")
- self.keepalive.stop()
- self.keepalive = None
- line = self.input
- self.input = ""
- assert not self.deferred is None
- self.game.to_player = self.to_player
- # If they gave us the keyword name, save the value as that name
- if "name" in self.kw:
- self.keep[self.kw["name"]] = line
- if "abort_blank" in self.kw and self.kw["abort_blank"]:
- # Abort on blank input
- if line.strip() == "":
- # Yes, input is blank, abort.
- log.msg("errback, abort_blank")
- reactor.callLater(
- 0, self.deferred.errback, Exception("abort_blank")
- )
- self.deferred = None
- return
- # Ok, use deferred.callback, or reactor.callLater?
- # self.deferred.callback(line)
- reactor.callLater(0, self.deferred.callback, line)
- self.deferred = None
- return
- elif ch.isprintable():
- # Printable, but is it acceptable?
- if "digits" in self.kw:
- if not ch.isdigit():
- self.queue_game.put("\a")
- continue
- if len(self.input) + 1 <= self.limit:
- self.input += ch
- self.queue_game.put(ch)
- else:
- self.queue_game.put("\a")
- def output(self, line):
- """ A default display of what they just input. """
- log.msg("PlayerInput.output({0})".format(line))
- self.game.queue_game.put(self.r + "[{0}]".format(line) + self.nl)
- return line
- PORT_CLASSES = {
- 1: "BBS",
- 2: "BSB",
- 3: "SBB",
- 4: "SSB",
- 5: "SBS",
- 6: "BSS",
- 7: "SSS",
- 8: "BBB",
- }
- CLASSES_PORT = {v: k for k, v in PORT_CLASSES.items()}
- import re
- class CIMWarpReport(object):
- def __init__(self, game):
- self.game = game
- self.queue_game = game.queue_game
- self.queue_player = game.queue_player
- self.observer = game.observer
- # Yes, at this point we would activate
- self.prompt = game.buffer
- self.save = self.observer.save()
- # I actually don't want the player input, but I'll grab it anyway.
- self.observer.connect("player", self.player)
- self.observer.connect("prompt", self.game_prompt)
- self.observer.connect("game-line", self.game_line)
- # If we want it, it's here.
- self.defer = None
- self.to_player = self.game.to_player
- # Hide what's happening from the player
- self.game.to_player = False
- self.queue_player.put("^") # Activate CIM
- self.state = 1
- self.warpdata = {}
- self.warpcycle = cycle(["/", "-", "\\", "|"])
- def game_prompt(self, prompt):
- if prompt == ": ":
- if self.state == 1:
- # Ok, then we're ready to request the port report
- self.warpcycle = cycle(["/", "-", "\\", "|"])
- self.queue_player.put("I")
- self.state = 2
- elif self.state == 2:
- self.queue_player.put("Q")
- self.state = 3
- if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt):
- if self.state == 3:
- # Ok, time to exit
- # exit from this...
- self.game.to_player = self.to_player
- self.observer.load(self.save)
- self.save = None
- self.game.warpdata = self.warpdata
- self.queue_game.put("\b \b\r\n")
- if not self.defer is None:
- self.defer.callback(self.warpdata)
- self.defer = None
- def game_line(self, line):
- if line == "" or line == ": ":
- return
- if line == ": ENDINTERROG":
- return
- # This should be the CIM Report Data -- parse it
- if self.warpcycle:
- if len(self.warpdata) % 10 == 0:
- self.queue_game.put("\b" + next(self.warpcycle))
- work = line.strip()
- parts = re.split(r"(?<=\d)\s", work)
- parts = [int(x) for x in parts]
- sector = parts.pop(0)
- # tuples are nicer on memory, and the warpdata map isn't going to be changing.
- self.warpdata[sector] = tuple(parts)
- def __del__(self):
- log.msg("CIMWarpReport {0} RIP".format(self))
- def whenDone(self):
- self.defer = defer.Deferred()
- # Call this to chain something after we exit.
- return self.defer
- def player(self, chunk):
- """ Data from player (in bytes). """
- chunk = chunk.decode("utf-8", "ignore")
- key = chunk.upper()
- log.msg("CIMWarpReport.player({0}) : I AM stopping...".format(key))
- # Stop the keepalive if we are activating something else
- # or leaving...
- # self.keepalive.stop()
- self.queue_game.put("\b \b\r\n")
- if not self.defer is None:
- # We have something, so:
- self.game.to_player = self.to_player
- self.observer.load(self.save)
- self.save = None
- self.defer.errback(Exception("User Abort"))
- self.defer = None
- else:
- # Still "exit" out.
- self.game.to_player = self.to_player
- self.observer.load(self.save)
- class CIMPortReport(object):
- """ Parse data from CIM Port Report
- Example:
- from flexible import CIMPortReport
- report = CIMPortReport(self.game)
- d = report.whenDone()
- d.addCallback(self.port_report)
- d.addErrback(self.welcome_back)
- def port_report(self, portdata):
- self.portdata = portdata
- self.queue_game.put("Loaded {0} records.".format(len(portdata)) + self.nl)
- self.welcome_back()
- def welcome_back(self,*_):
- ... restore keep alive timers, etc.
- """
- def __init__(self, game):
- self.game = game
- self.queue_game = game.queue_game
- self.queue_player = game.queue_player
- self.observer = game.observer
- # Yes, at this point we would activate
- self.prompt = game.buffer
- self.save = self.observer.save()
- # I actually don't want the player input, but I'll grab it anyway.
- self.observer.connect("player", self.player)
- self.observer.connect("prompt", self.game_prompt)
- self.observer.connect("game-line", self.game_line)
- # If we want it, it's here.
- self.defer = None
- self.to_player = self.game.to_player
- log.msg("to_player (stored)", self.to_player)
- # Hide what's happening from the player
- self.game.to_player = False
- self.queue_player.put("^") # Activate CIM
- self.state = 1
- self.portdata = {}
- self.portcycle = cycle(["/", "-", "\\", "|"])
- def game_prompt(self, prompt):
- if prompt == ": ":
- if self.state == 1:
- # Ok, then we're ready to request the port report
- self.portcycle = cycle(["/", "-", "\\", "|"])
- self.queue_player.put("R")
- self.state = 2
- elif self.state == 2:
- self.queue_player.put("Q")
- self.state = 3
- if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt):
- if self.state == 3:
- # Ok, time to exit
- # exit from this...
- self.game.to_player = self.to_player
- self.observer.load(self.save)
- self.save = None
- self.game.portdata = self.portdata
- self.queue_game.put("\b \b\r\n")
- if not self.defer is None:
- self.defer.callback(self.portdata)
- self.defer = None
- def game_line(self, line):
- if line == "" or line == ": ":
- return
- if line == ": ENDINTERROG":
- return
- # This should be the CIM Report Data -- parse it
- if self.portcycle:
- if len(self.portdata) % 10 == 0:
- self.queue_game.put("\b" + next(self.portcycle))
- work = line.replace("%", "")
- parts = re.split(r"(?<=\d)\s", work)
- if len(parts) == 8:
- port = int(parts[0].strip())
- data = dict()
- def portBS(info):
- if info[0] == "-":
- bs = "B"
- else:
- bs = "S"
- return (bs, int(info[1:].strip()))
- data["fuel"] = dict()
- data["fuel"]["sale"], data["fuel"]["units"] = portBS(parts[1])
- data["fuel"]["pct"] = int(parts[2].strip())
- data["org"] = dict()
- data["org"]["sale"], data["org"]["units"] = portBS(parts[3])
- data["org"]["pct"] = int(parts[4].strip())
- data["equ"] = dict()
- data["equ"]["sale"], data["equ"]["units"] = portBS(parts[5])
- data["equ"]["pct"] = int(parts[6].strip())
- # Store what this port is buying/selling
- data["port"] = (
- data["fuel"]["sale"] + data["org"]["sale"] + data["equ"]["sale"]
- )
- # Convert BBS/SBB to Class number 1-8
- data["class"] = CLASSES_PORT[data["port"]]
- self.portdata[port] = data
- else:
- log.msg("CIMPortReport:", line, "???")
- def __del__(self):
- log.msg("CIMPortReport {0} RIP".format(self))
- def whenDone(self):
- self.defer = defer.Deferred()
- # Call this to chain something after we exit.
- return self.defer
- def player(self, chunk):
- """ Data from player (in bytes). """
- chunk = chunk.decode("utf-8", "ignore")
- key = chunk.upper()
- log.msg("CIMPortReport.player({0}) : I AM stopping...".format(key))
- # Stop the keepalive if we are activating something else
- # or leaving...
- # self.keepalive.stop()
- self.queue_game.put("\b \b\r\n")
- if not self.defer is None:
- # We have something, so:
- self.game.to_player = self.to_player
- self.observer.load(self.save)
- self.save = None
- self.defer.errback(Exception("User Abort"))
- self.defer = None
- else:
- # Still "exit" out.
- self.game.to_player = self.to_player
- self.observer.load(self.save)
- class ScriptPort(object):
- """ Performs the Port script. """
- def __init__(self, game):
- self.game = game
- self.queue_game = game.queue_game
- self.queue_player = game.queue_player
- self.observer = game.observer
- self.r = Style.RESET_ALL
- self.nl = "\n\r"
- self.this_sector = None # Starting sector
- self.sector1 = None # Current Sector
- self.sector2 = None # Next Sector Stop
- self.percent = 5 # Stick with the good default.
- self.credits = 0
- # Activate
- self.prompt = game.buffer
- self.save = self.observer.save()
- self.observer.connect('player', self.player)
- self.observer.connect("prompt", self.game_prompt)
- self.observer.connect("game-line", self.game_line)
- self.defer = None
- self.queue_game.put(
- self.nl + "Script based on: Port Pair Trading v2.00" + self.r + self.nl
- )
- self.possible_sectors = None
- self.state = 1
- self.queue_player.put("D")
- # Original, send 'D' to display current sector.
- # We could get the sector number from the self.prompt string -- HOWEVER:
- # IF! We send 'D', we can also get the sectors around -- we might not even need to
- # prompt for sector to trade with (we could possibly figure it out ourselves).
- # [Command [TL=00:00:00]:[967] (?=Help)? : D]
- # [<Re-Display>]
- # []
- # [Sector : 967 in uncharted space.]
- # [Planets : (M) Into the Darkness]
- # [Warps to Sector(s) : 397 - (562) - (639)]
- # []
- def whenDone(self):
- self.defer = defer.Deferred()
- # Call this to chain something after we exit.
- return self.defer
- def deactivate(self):
- self.state = 0
- log.msg("ScriptPort.deactivate")
- assert(not self.save is None)
- self.observer.load(self.save)
- self.save = None
- if self.defer:
- self.defer.callback('done')
- self.defer = None
- def player(self, chunk: bytes):
- # If we receive anything -- ABORT!
- self.deactivate()
- def game_prompt(self, prompt: str):
- log.msg("{0} : {1}".format(self.state, prompt))
- if self.state == 3:
- log.msg("game_prompt: ", prompt)
- if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt):
- self.state = 4
- log.msg("Ok, state 4")
- if self.sector2 is None:
- # Ok, we need to prompt for this.
- self.queue_game.put(self.r + self.nl + "Which sector to trade with? {0} ({1})".format(self.this_sector, self.game.portdata[self.this_sector]['port']) + self.nl + Fore.CYAN)
- for i, p in enumerate(self.possible):
- self.queue_game.put(" " + str(i + 1) + " : " + str(p) + " (" + self.game.portdata[p]['port'] + ")" + self.nl)
-
- pi = PlayerInput(self.game)
- def got_need1(*_):
- log.msg("Ok, I have:", pi.keep)
- if pi.keep['count'].strip() == '':
- self.deactivate()
- return
- self.times_left = int(pi.keep['count'])
- if pi.keep['choice'].strip() == '':
- self.deactivate()
- return
- c = int(pi.keep['choice']) -1
- if c < 0 or c >= len(self.possible):
- self.deactivate()
- return
- self.sector2 = self.possible[int(pi.keep['choice']) -1]
- # self.queue_game.put(pformat(pi.keep).replace("\n", "\n\r"))
- self.state = 5
- self.trade()
- d = pi.prompt("Choose -=>", 5, name='choice', digits=True)
- d.addCallback(lambda ignore: pi.prompt("Times to execute script:", 5, name='count', digits=True))
- d.addCallback(got_need1)
- else:
- # We already have our target port, so...
- self.queue_game.put(self.r + self.nl + "Trading from {0} ({1}) to default {2} ({3}).".format(
- self.this_sector,
- self.game.portdata[self.this_sector]['port'],
- self.sector2, self.game.portdata[self.sector2]['port']) + self.nl
- )
- pi = PlayerInput(self.game)
- def got_need2(*_):
- if pi.keep['count'].strip() == '':
- self.deactivate()
- return
- self.times_left = int(pi.keep['count'])
- log.msg("Ok, I have:", pi.keep)
- self.queue_game.put(pformat(pi.keep).replace("\n", "\n\r"))
- self.state = 5
- self.trade()
- self.queue_game.put(self.r + self.nl)
- d = pi.prompt("Times to execute script", 5, name='count')
- d.addCallback(got_need2)
-
- elif self.state == 7:
- if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt):
- # Done
- if self.this_sector == self.sector1:
- self.this_sector = self.sector2
- self.queue_player.put("{0}\r".format(self.sector2))
- self.state = 10
- else:
- self.times_left -= 1
- if self.times_left <= 0:
- # Ok, exit out
- self.deactivate()
- return
- self.this_sector = self.sector1
- self.queue_player.put("{0}\r".format(self.sector1))
- self.state = 10
- elif self.state == 8:
- # What are we trading
- # How many holds of Equipment do you want to buy [75]?
- if re.match(r"How many holds of .+ do you want to buy \[\d+\]\?", prompt):
- parts = prompt.split()
- trade_type = parts[4]
- if trade_type == 'Fuel':
- if (self.tpc in (5,7)) and (self.opc in (2,3,4,8)):
- # Can buy equipment - fuel ore is worthless.
- self.queue_player.put("0\r")
- return
- if (self.tpc in(4,7)) and (self.opc in (1,3,5,8)):
- # Can buy organics - fuel ore is worthless.
- self.queue_player.put("0\r")
- return
- if (self.tpc in (4,7,3,5)) and (self.opc in (3,4,5,7)):
- # No point in buying fuel ore if it can't be sold.
- self.queue_player.put("0\r")
- return
- elif trade_type == 'Organics':
- if (self.tpc in (6,7)) and (self.opc in (2,3,4,8)):
- # Can buy equipment - organics is worthless.
- self.queue_player.put("0\r")
- return
- if (self.tpc in (2,4,6,7)) and (self.opc in (2,4,6,7)):
- # No point in buying organics if it can't be sold.
- self.queue_player.put("0\r")
- return
- elif trade_type == 'Equipment':
- if (self.opc in (1,5,6,7)):
- # No point in buying equipment if it can't be sold.
- self.queue_player.put("0\r")
- return
- self.queue_player.put("\r")
- elif re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt):
- # Done
- if self.this_sector == self.sector1:
- self.this_sector = self.sector2
- self.queue_player.put("{0}\r".format(self.sector2))
- self.state = 10
- else:
- self.times_left -= 1
- if self.times_left <= 0:
- # Ok, exit out
- self.deactivate()
- return
- self.this_sector = self.sector1
- self.queue_player.put("{0}\r".format(self.sector1))
- self.state = 10
-
- def trade(self, *_):
- # state 5
- log.msg("trade!")
- self.queue_player.put("pt") # Port Trade
- self.this_port = self.game.portdata[self.this_sector]
- if self.this_sector == self.sector1:
- self.other_port = self.game.portdata[self.sector2]
- else:
- self.other_port = self.game.portdata[self.sector1]
- # Ok, perform some calculations
- self.tpc = self.this_port['class']
- self.opc = self.other_port['class']
- # [ Items Status Trading % of max OnBoard]
- # [ ----- ------ ------- -------- -------]
- # [Fuel Ore Selling 2573 93% 0]
- # [Organics Buying 2960 100% 0]
- # [Equipment Buying 1958 86% 0]
- # []
- # []
- # [You have 1,000 credits and 20 empty cargo holds.]
- # []
- # [We are selling up to 2573. You have 0 in your holds.]
- # [How many holds of Fuel Ore do you want to buy [20]? 0]
- pass
- def game_line(self, line: str):
- if self.state == 1:
- # First exploration
- if line.startswith("Sector :"):
- # We have starting sector information
- parts = re.split("\s+", line)
- self.this_sector = int(parts[2])
- # These will be the ones swapped around as we trade back and forth.
- self.sector1 = self.this_sector
- elif line.startswith("Warps to Sector(s) : "):
- # Warps to Sector(s) : 397 - (562) - (639)
- _, _, warps = line.partition(':')
- warps = warps.replace('-', '').replace('(', '').replace(')', '').strip()
- log.msg("Warps: [{0}]".format(warps))
- self.warps = [ int(x) for x in re.split("\s+", warps)]
- log.msg("Warps: [{0}]".format(self.warps))
- self.state = 2
- elif self.state == 2:
- if line == "":
- # Ok, we're done
- self.state = 3
- # Check to see if we have information on any possible ports
- if hasattr(self.game, 'portdata'):
- if not self.this_sector in self.game.portdata:
- self.state = 0
- log.msg("Current sector {0} not in portdata.".format(self.this_sector))
- self.queue_game.put(self.r + self.nl + "I can't find the current sector in the portdata." + self.nl)
- self.deactivate()
- return
- else:
- # Ok, we are in the portdata
- def port_burnt(port):
- if port['equ']['pct'] <= 20 or port['fuel']['pct'] <= 20 or port['org']['pct'] <= 20:
- return True
- return False
- pd = self.game.portdata[self.this_sector]
- if port_burnt(pd):
- log.msg("Current sector {0} port is burnt (<= 20%).".format(self.this_sector))
- self.queue_game.put(self.r + self.nl + "Current sector port is burnt out. <= 20%." + self.nl)
- self.deactivate()
- return
- possible = [ x for x in self.warps if x in self.game.portdata ]
- log.msg("Ppossible:", possible)
- if len(possible) == 0:
- self.state = 0
- self.queue_game.put(self.r + self.nl + "I don't see any ports in [{0}].".format(self.warps) + self.nl)
- self.deactivate()
- return
- possible = [ x for x in possible if not port_burnt(self.game.portdata[x]) ]
- log.msg("Possible:", possible)
- self.possible = possible
- if len(possible) == 0:
- self.state = 0
- self.queue_game.put(self.r + self.nl + "I don't see any unburnt ports in [{0}].".format(self.warps) + self.nl)
- self.deactivate()
- return
- elif len(possible) == 1:
- # Ok! there's only one!
- self.sector2 = possible[0]
-
- # Display possible ports:
- # spos = [ str(x) for x in possible]
- # self.queue_game.put(self.r + self.nl + self.nl.join(spos) + self.nl)
- # At state 3, we only get a prompt.
- return
- else:
- self.state = 0
- log.msg("We don't have any portdata!")
- self.queue_game.put(self.r + self.nl + "I have no portdata. Please run CIM Port Report." + self.nl)
- self.deactivate()
- return
- elif self.state == 5:
- if "-----" in line:
- self.state = 6
- elif self.state == 6:
- if "We are buying up to" in line:
- # Sell
- self.state = 7
- self.queue_player.put("\r")
- self.sell_perc = 100 + self.percent
- if "We are selling up to" in line:
- # Buy
- self.state = 8
- self.sell_perc = 100 - self.percent
- if "You don't have anything they want" in line:
- # Neither! DRAT!
- self.deactivate()
- return
- if "We're not interested." in line:
- log.msg("Try, try again. :(")
- self.state = 5
- self.trade()
- elif self.state == 7:
- # Haggle Sell
- if "We'll buy them for" in line or "Our final offer" in line:
- if "Our final offer" in line:
- self.sell_perc -= 1
- parts = line.replace(',', '').split()
- start_price = int(parts[4])
- price = start_price * self.sell_perc // 100
- log.msg("start: {0} % {1} price {2}".format(start_price, self.sell_perc, price))
- self.sell_perc -= 1
- self.queue_player.put("{0}\r".format(price))
- if "We are selling up to" in line:
- # Buy
- self.state = 8
- self.sell_perc = 100 - self.percent
- if line.startswith("You have ") and 'credits and' in line:
- parts = line.replace(',', '').split()
- credits = int(parts[2])
- if self.credits == 0:
- self.credits = credits
- else:
- if credits <= self.credits:
- log.msg("We don't appear to be making any money here {0}.".format(credits))
- self.deactivate()
- return
- if "We're not interested." in line:
- log.msg("Try, try again. :(")
- self.state = 5
- self.trade()
- elif self.state == 8:
- # Haggle Buy
- if "We'll sell them for" in line or "Our final offer" in line:
- if "Our final offer" in line:
- self.sell_perc += 1
- parts = line.replace(',', '').split()
- start_price = int(parts[4])
- price = start_price * self.sell_perc // 100
- log.msg("start: {0} % {1} price {2}".format(start_price, self.sell_perc, price))
- self.sell_perc += 1
- self.queue_player.put("{0}\r".format(price))
- if "We're not interested." in line:
- log.msg("Try, try again. :(")
- self.state = 5
- self.trade()
- elif self.state == 10:
- if "Sector : " in line:
- # Trade
- self.state = 5
- reactor.callLater(0, self.trade, 0)
- # self.trade()
- # elif self.state == 3:
- # log.msg("At state 3 [{0}]".format(line))
- # self.queue_game.put("At state 3.")
- # self.deactivate()
- # return
- class ProxyMenu(object):
- """ Display ProxyMenu
-
- Example:
- from flexible import ProxyMenu
- if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt):
- menu = ProxyMenu(self.game)
- """
- def __init__(self, game):
- self.nl = "\n\r"
- self.c = merge(Style.BRIGHT + Fore.YELLOW + Back.BLUE)
- self.r = Style.RESET_ALL
- self.c1 = merge(Style.BRIGHT + Fore.BLUE)
- self.c2 = merge(Style.NORMAL + Fore.CYAN)
- self.portdata = None
- self.game = game
- self.queue_game = game.queue_game
- self.observer = game.observer
- if hasattr(self.game, "portdata"):
- self.portdata = self.game.portdata
- else:
- self.portdata = {}
- # Yes, at this point we would activate
- self.prompt = game.buffer
- self.save = self.observer.save()
- self.observer.connect("player", self.player)
- # If we want it, it's here.
- self.defer = None
- self.keepalive = task.LoopingCall(self.awake)
- self.keepalive.start(30)
- self.menu()
- def __del__(self):
- log.msg("ProxyMenu {0} RIP".format(self))
- def whenDone(self):
- self.defer = defer.Deferred()
- # Call this to chain something after we exit.
- return self.defer
- def menu(self):
- self.queue_game.put(
- self.nl + self.c + "TradeWars Proxy active." + self.r + self.nl
- )
- def menu_item(ch: str, desc: str):
- self.queue_game.put(
- " " + self.c1 + ch + self.c2 + " - " + self.c1 + desc + self.nl
- )
- menu_item("D", "Diagnostics")
- menu_item("Q", "Quest")
- menu_item("T", "Display current Time")
- menu_item("P", "Port CIM Report")
- menu_item("S", "Scripts")
- menu_item("W", "Warp CIM Report")
- menu_item("X", "eXit")
- self.queue_game.put(" " + self.c + "-=>" + self.r + " ")
- def awake(self):
- log.msg("ProxyMenu.awake()")
- self.game.queue_player.put(" ")
- def port_report(self, portdata: dict):
- self.portdata = portdata
- self.queue_game.put("Loaded {0} records.".format(len(portdata)) + self.nl)
- self.welcome_back()
- def warp_report(self, warpdata: dict):
- self.warpdata = warpdata
- self.queue_game.put("Loaded {0} records.".format(len(warpdata)) + self.nl)
- self.welcome_back()
- def player(self, chunk: bytes):
- """ Data from player (in bytes). """
- chunk = chunk.decode("utf-8", "ignore")
- key = chunk.upper()
- log.msg("ProxyMenu.player({0})".format(key))
- # Stop the keepalive if we are activating something else
- # or leaving...
- self.keepalive.stop()
- if key == "T":
- self.queue_game.put(self.c + key + self.r + self.nl)
- # perform T option
- now = pendulum.now()
- self.queue_game.put(
- self.nl + self.c1 + "Current time " + now.to_datetime_string() + self.nl
- )
- elif key == "P":
- self.queue_game.put(self.c + key + self.r + self.nl)
- # Activate CIM Port Report
- report = CIMPortReport(self.game)
- d = report.whenDone()
- d.addCallback(self.port_report)
- d.addErrback(self.welcome_back)
- return
- elif key == "W":
- self.queue_game.put(self.c + key + self.r + self.nl)
- # Activate CIM Port Report
- report = CIMWarpReport(self.game)
- d = report.whenDone()
- d.addCallback(self.warp_report)
- d.addErrback(self.welcome_back)
- return
- elif key == "S":
- self.queue_game.put(self.c + key + self.r + self.nl)
- self.activate_scripts_menu()
- return
- elif key == "D":
- self.queue_game.put(self.c + key + self.r + self.nl)
- self.queue_game.put(pformat(self.portdata).replace("\n", "\n\r") + self.nl)
- self.queue_game.put(pformat(self.warpdata).replace("\n", "\n\r") + self.nl)
- elif key == "Q":
- self.queue_game.put(self.c + key + self.r + self.nl)
- # This is an example of chaining PlayerInput prompt calls.
- ask = PlayerInput(self.game)
- d = ask.prompt("What is your quest?", 40, name="quest", abort_blank=True)
- # Display the user's input
- d.addCallback(ask.output)
- d.addCallback(
- lambda ignore: ask.prompt(
- "What is your favorite color?", 10, name="color"
- )
- )
- d.addCallback(ask.output)
- d.addCallback(
- lambda ignore: ask.prompt(
- "What is the meaning of the squirrel?",
- 12,
- name="squirrel",
- digits=True,
- )
- )
- d.addCallback(ask.output)
- def show_values(show):
- log.msg(show)
- self.queue_game.put(pformat(show).replace("\n", "\n\r") + self.nl)
- d.addCallback(lambda ignore: show_values(ask.keep))
- d.addCallback(self.welcome_back)
- # On error, just return back
- # This doesn't seem to be getting called.
- # d.addErrback(lambda ignore: self.welcome_back)
- d.addErrback(self.welcome_back)
- return
- elif key == "X":
- self.queue_game.put(self.c + key + self.r + self.nl)
- self.queue_game.put("Proxy done." + self.nl)
- self.observer.load(self.save)
- self.save = None
- # It isn't running (NOW), so don't try to stop it.
- # self.keepalive.stop()
- self.keepalive = None
- # Ok, this is a HORRIBLE idea, because the prompt might be
- # outdated.
- # self.queue_game.put(self.prompt)
- self.prompt = None
- # Possibly: Send '\r' to re-display the prompt
- # instead of displaying the original one.
- self.game.queue_player.put("d")
- # Were we asked to do something when we were done here?
- if self.defer:
- reactor.CallLater(0, self.defer.callback)
- # self.defer.callback()
- self.defer = None
- return
- self.keepalive.start(30, True)
- self.menu()
- def activate_scripts_menu(self):
- self.observer.disconnect("player", self.player)
- self.observer.connect("player", self.scripts_player)
- self.scripts_menu()
- def deactivate_scripts_menu(self, *_):
- self.observer.disconnect("player", self.scripts_player)
- self.observer.connect("player", self.player)
- self.welcome_back()
- def scripts_menu(self, *_):
- c1 = merge(Style.BRIGHT + Fore.CYAN)
- c2 = merge(Style.NORMAL + Fore.CYAN)
- def menu_item(ch, desc):
- self.queue_game.put(
- " " + c1 + ch + c2 + " - " + c1 + desc + self.nl
- )
- menu_item("1", "Ports (Trades between two sectors)")
- menu_item("2", "TODO")
- menu_item("3", "TODO")
- menu_item("X", "eXit")
- self.queue_game.put(" " + c1 + "-=>" + self.r + " ")
- def scripts_player(self, chunk: bytes):
- """ Data from player (in bytes). """
- chunk = chunk.decode("utf-8", "ignore")
- key = chunk.upper()
-
- if key == '1':
- self.queue_game.put(self.c + key + self.r + self.nl)
- # Activate this magical event here
- ports = ScriptPort(self.game)
- d = ports.whenDone()
- # d.addCallback(self.scripts_menu)
- # d.addErrback(self.scripts_menu)
- d.addCallback(self.deactivate_scripts_menu)
- d.addErrback(self.deactivate_scripts_menu)
- return
- elif key == 'X':
- self.queue_game.put(self.c + key + self.r + self.nl)
- self.deactivate_scripts_menu()
- return
- else:
- self.queue_game.put(self.c + "?" + self.r + self.nl)
- self.scripts_menu()
- def welcome_back(self, *_):
- log.msg("welcome_back")
- self.keepalive.start(30, True)
- self.menu()
|