12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025 |
- from twisted.internet import reactor
- from twisted.internet import task
- from twisted.internet import defer
- from colorama import Fore, Back, Style
- from twisted.python import log
- from twisted.internet.task import coiterate
- from twisted.internet.defer import gatherResults
- from itertools import cycle
- import pendulum
- from pprint import pformat
- from galaxy import GameData, PORT_CLASSES, CLASSES_PORT
- from boxes import Boxes
class SpinningCursor(object):
    """Text spinner that only advances once per ``every`` clicks.

    Example:
        s = SpinningCursor(5)  # every 5
        for x in range(10):
            if s.click():
                print(s.cycle())
    """

    def __init__(self, every=10):
        self.every = every
        self.reset()

    def reset(self):
        """Zero the click counter and restart the frame sequence at "/"."""
        self.count = 0
        self.itercycle = cycle(("/", "-", "\\", "|"))

    def click(self):
        """Register one click; True when the count is a multiple of ``every``."""
        self.count += 1
        remainder = self.count % self.every
        return remainder == 0

    def cycle(self):
        """Return the next spinner frame."""
        return next(self.itercycle)
def merge(color_string):
    """Fuse adjacent colorama ANSI sequences into a single one.

    Each place where one sequence's closing "m" is immediately followed by
    the next sequence's ESC "[" opener is rewritten as ";".
    """
    junction = "m\x1b["
    pieces = color_string.split(junction)
    return ";".join(pieces)
class PlayerInput(object):
    """Collect one line of input from the player, with local echo and editing.

    While a prompt is active, the observer's "player" signal is routed to
    get_input(), game.to_player is set to False, and a LoopingCall invokes
    alive() every 30 seconds.  prompt() returns a Deferred that fires with
    the entered text once the player presses Enter.

    Example:
        from flexible import PlayerInput

        ask = PlayerInput(self.game)
        # abort_blank means, if the input field is blank, abort.  Use errback.
        d = ask.prompt("What is your quest?", 40, name="quest", abort_blank=True)

        # Display the user's input / but not needed.
        d.addCallback(ask.output)

        d.addCallback(
            lambda ignore: ask.prompt(
                "What is your favorite color?", 10, name="color"
            )
        )
        d.addCallback(ask.output)

        d.addCallback(
            lambda ignore: ask.prompt(
                "What is your least favorite number?",
                12,
                name="number",
                digits=True,
            )
        )
        d.addCallback(ask.output)

        def show_values(show):
            log.msg(show)
            self.queue_game.put(pformat(show).replace("\n", "\n\r") + self.nl)

        d.addCallback(lambda ignore: show_values(ask.keep))
        d.addCallback(self.welcome_back)

        # On error, just return back
        d.addErrback(self.welcome_back)
    """

    def __init__(self, game):
        # game gives us access to everything we need.
        self.game = game
        self.observer = self.game.observer
        self.save = None  # observer state saved while a prompt is active
        self.deferred = None  # Deferred of the prompt in progress
        self.queue_game = game.queue_game

        # Answers of prompts that were given a name= keyword.
        self.keep = {}

        # default colors
        self.c = merge(Style.BRIGHT + Fore.WHITE + Back.BLUE)
        self.cp = merge(Style.BRIGHT + Fore.YELLOW + Back.BLUE)

        # useful consts
        self.r = Style.RESET_ALL
        self.nl = "\n\r"
        self.bsb = "\b \b"  # backspace, blank, backspace: erase one echoed char

        self.keepalive = None

    def color(self, c):
        """Set the color used for the prompt text."""
        self.c = c

    def colorp(self, cp):
        """Set the color used for the input field."""
        self.cp = cp

    def alive(self):
        """Keep-alive tick: send a single space on the player queue."""
        log.msg("PlayerInput.alive()")
        self.game.queue_player.put(" ")

    def prompt(self, user_prompt, limit, **kw):
        """Display a prompt and start collecting input.

        Args:
            user_prompt: text displayed.
            limit: number of characters allowed.
            **kw: optional keywords:
                abort_blank: if true, blank input fires the errback with
                    Exception("abort_blank").
                name: store the input in self.keep under this key.
                digits: if true, only 0-9 may be entered.

        Returns:
            A Deferred fired with the entered line.

        Only one prompt may be active at a time (checked by assertions).
        """
        log.msg("PlayerInput({0}, {1}, {2})".format(user_prompt, limit, kw))
        self.limit = limit
        self.input = ""
        self.kw = kw
        assert self.save is None
        assert self.keepalive is None

        # Note: This clears out the server "keep alive"
        self.save = self.observer.save()
        self.observer.connect("player", self.get_input)

        self.keepalive = task.LoopingCall(self.alive)
        self.keepalive.start(30)

        # We need to "hide" the game output.
        # Otherwise it WILL mess up the user input display.
        self.to_player = self.game.to_player
        self.game.to_player = False

        # Display prompt
        self.queue_game.put(self.r + self.c + user_prompt + self.r + " " + self.cp)
        # Paint the input field background, then move the cursor back to its start.
        self.queue_game.put(" " * limit + "\b" * limit)

        assert self.deferred is None
        d = defer.Deferred()
        self.deferred = d
        log.msg("Return deferred ...", self.deferred)
        return d

    def _finish_line(self):
        """Tear down the active prompt and schedule its Deferred."""
        self.queue_game.put(self.r + self.nl)

        log.msg("Restore observer dispatch", self.save)
        assert self.save is not None
        self.observer.load(self.save)
        self.save = None

        log.msg("Disable keepalive")
        self.keepalive.stop()
        self.keepalive = None

        line = self.input
        self.input = ""
        assert self.deferred is not None
        self.game.to_player = self.to_player

        # If they gave us the keyword name, save the value as that name
        if "name" in self.kw:
            self.keep[self.kw["name"]] = line

        # Clear the slot before firing so a callback may start another prompt.
        d, self.deferred = self.deferred, None

        if self.kw.get("abort_blank") and line.strip() == "":
            # Input is blank and the caller asked us to abort on that.
            log.msg("errback, abort_blank")
            reactor.callLater(0, d.errback, Exception("abort_blank"))
            return

        # Fired through the reactor rather than called directly.
        reactor.callLater(0, d.callback, line)

    def get_input(self, chunk):
        """Handle data from the player (in bytes).

        Backspace removes the last character, Enter completes the prompt
        (any characters after it in the same chunk are discarded), and
        printable characters are appended while under the length limit.
        A bell is sent for anything rejected.
        """
        chunk = chunk.decode("latin-1", "ignore")

        for ch in chunk:
            if ch == "\b":
                if self.input:
                    self.queue_game.put(self.bsb)
                    self.input = self.input[:-1]
                else:
                    self.queue_game.put("\a")
            elif ch == "\r":
                self._finish_line()
                return
            elif ch.isprintable():
                # Printable, but is it acceptable?
                # Use the flag's value, not its mere presence, so that
                # digits=False behaves like omitting the keyword.
                if self.kw.get("digits") and not ch.isdigit():
                    self.queue_game.put("\a")
                    continue
                if len(self.input) < self.limit:
                    self.input += ch
                    self.queue_game.put(ch)
                else:
                    self.queue_game.put("\a")

    def output(self, line):
        """A default display of what they just input; returns line unchanged."""
        log.msg("PlayerInput.output({0})".format(line))
        self.game.queue_game.put(self.r + "[{0}]".format(line) + self.nl)
        return line
- import re
- # The CIMWarpReport -- is only needed if the json file gets damaged in some way.
- # or needs to be reset. The warps should automatically update themselves now.
class CIMWarpReport(object):
    """Run the CIM warp report and feed it into game.gamedata.

    On construction this takes over the observer, hides game output from
    the player and sends "^".  At the first ": " prompt it sends "I", at
    the second "Q"; once the command prompt returns it restores everything
    and fires the whenDone() Deferred with game.gamedata.warps.

    Each report line is parsed as integers: the first is the sector, the
    rest are passed on to game.gamedata.warp_to(sector, *warps).

    Any data from the player aborts the report.
    """

    def __init__(self, game):
        self.game = game
        self.queue_game = game.queue_game
        self.queue_player = game.queue_player
        self.observer = game.observer
        # Yes, at this point we would activate
        self.prompt = game.buffer
        self.save = self.observer.save()

        # I actually don't want the player input, but I'll grab it anyway.
        self.observer.connect("player", self.player)
        self.observer.connect("prompt", self.game_prompt)
        self.observer.connect("game-line", self.game_line)

        # If we want it, it's here.
        self.defer = None
        self.to_player = self.game.to_player

        # Hide what's happening from the player
        self.game.to_player = False

        self.queue_player.put("^")  # Activate CIM
        self.state = 1
        self.warpcycle = SpinningCursor()

    def _restore(self):
        """Give output back to the player and reload the saved observer once."""
        self.game.to_player = self.to_player
        if self.save is not None:
            self.observer.load(self.save)
            self.save = None

    def game_prompt(self, prompt):
        """Drive the state machine from game prompts (1 -> 2 -> 3 -> done)."""
        if prompt == ": ":
            if self.state == 1:
                # Ok, then we're ready to request the warp report
                self.warpcycle.reset()
                self.queue_player.put("I")
                self.state = 2
            elif self.state == 2:
                self.queue_player.put("Q")
                self.state = 3
        if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt):
            if self.state == 3:
                # Ok, time to exit
                self._restore()
                # Erase the spinner character and end the line.
                self.queue_game.put("\b \b\r\n")
                if self.defer is not None:
                    self.defer.callback(self.game.gamedata.warps)
                    self.defer = None

    def game_line(self, line):
        """Parse one line of CIM report data; unparsable lines are logged."""
        if line in ("", ": ", ": ENDINTERROG"):
            return
        if line.startswith("Command [TL="):
            return

        # This should be the CIM Report Data -- parse it
        if self.warpcycle and self.warpcycle.click():
            self.queue_game.put("\b" + self.warpcycle.cycle())

        try:
            # Split on the single whitespace that follows a digit; int()
            # tolerates the padding left in front of each number.
            sector, *warps = [
                int(x) for x in re.split(r"(?<=\d)\s", line.strip())
            ]
        except ValueError:
            # Not report data after all -- don't blow up the dispatcher.
            log.msg("CIMWarpReport:", line, "???")
            return
        self.game.gamedata.warp_to(sector, *warps)

    def __del__(self):
        log.msg("CIMWarpReport {0} RIP".format(self))

    def whenDone(self):
        """Return a Deferred; call this to chain something after we exit."""
        self.defer = defer.Deferred()
        return self.defer

    def player(self, chunk):
        """Data from player (in bytes): abort the report.

        Fires the whenDone() Deferred's errback with Exception("User Abort")
        if one was requested.
        """
        chunk = chunk.decode("latin-1", "ignore")
        key = chunk.upper()
        log.msg("CIMWarpReport.player({0}) : I AM stopping...".format(key))

        self.queue_game.put("\b \b\r\n")

        # Always "exit" out, whether or not anybody is waiting on us.
        self._restore()
        if self.defer is not None:
            self.defer.errback(Exception("User Abort"))
            self.defer = None
- # the CIMPortReport will still be needed.
- # We can't get fresh report data (that changes) any other way.
class CIMPortReport(object):
    """Parse data from CIM Port Report.

    On construction this takes over the observer, hides game output from
    the player and sends "^".  At the first ": " prompt it sends "R", at
    the second "Q"; once the command prompt returns it restores everything
    and fires the whenDone() Deferred with game.gamedata.ports.

    Example:
        from flexible import CIMPortReport

        report = CIMPortReport(self.game)
        d = report.whenDone()
        d.addCallback(self.port_report)
        d.addErrback(self.welcome_back)

        def port_report(self, portdata):
            self.portdata = portdata
            self.queue_game.put("Loaded {0} records.".format(len(portdata)) + self.nl)
            self.welcome_back()

        def welcome_back(self, *_):
            ... restore keep alive timers, etc.
    """

    def __init__(self, game):
        self.game = game
        self.queue_game = game.queue_game
        self.queue_player = game.queue_player
        self.observer = game.observer
        # Yes, at this point we would activate
        self.prompt = game.buffer
        self.save = self.observer.save()

        # I actually don't want the player input, but I'll grab it anyway.
        self.observer.connect("player", self.player)
        self.observer.connect("prompt", self.game_prompt)
        self.observer.connect("game-line", self.game_line)

        # If we want it, it's here.
        self.defer = None
        self.to_player = self.game.to_player
        log.msg("to_player (stored)", self.to_player)

        # Hide what's happening from the player
        self.game.to_player = False

        self.queue_player.put("^")  # Activate CIM
        self.state = 1
        self.portcycle = SpinningCursor()

    def _restore(self):
        """Give output back to the player and reload the saved observer once."""
        self.game.to_player = self.to_player
        if self.save is not None:
            self.observer.load(self.save)
            self.save = None

    @staticmethod
    def _port_bs(info):
        """Split one units field into ("B" or "S", units).

        A leading "-" marks a commodity the port is buying; anything else
        is treated as selling.  The first character is always dropped.
        """
        bs = "B" if info[0] == "-" else "S"
        return (bs, int(info[1:].strip()))

    def game_prompt(self, prompt):
        """Drive the state machine from game prompts (1 -> 2 -> 3 -> done)."""
        if prompt == ": ":
            if self.state == 1:
                # Ok, then we're ready to request the port report
                self.portcycle.reset()
                self.queue_player.put("R")
                self.state = 2
            elif self.state == 2:
                self.queue_player.put("Q")
                self.state = 3
        if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt):
            if self.state == 3:
                # Ok, time to exit
                self._restore()
                # Erase the spinner character and end the line.
                self.queue_game.put("\b \b\r\n")
                if self.defer is not None:
                    self.defer.callback(self.game.gamedata.ports)
                    self.defer = None

    def game_line(self, line: str):
        """Parse one report line and store it via game.gamedata.set_port.

        A line must split into exactly 8 fields (port, then units/percent
        for fuel, organics and equipment, then a trailing remainder);
        anything else is logged and ignored.
        """
        if line in ("", ": ", ": ENDINTERROG"):
            return
        if line.startswith("Command [TL="):
            # The returning command prompt is not report data.
            return

        # This should be the CIM Report Data -- parse it
        if self.portcycle and self.portcycle.click():
            self.queue_game.put("\b" + self.portcycle.cycle())

        parts = re.split(r"(?<=\d)\s", line.replace("%", ""))
        if len(parts) != 8:
            log.msg("CIMPortReport:", line, "???")
            return

        try:
            port = int(parts[0].strip())
            data = dict()
            for offset, name in enumerate(("fuel", "org", "equ")):
                sale, units = self._port_bs(parts[1 + 2 * offset])
                data[name] = {
                    "sale": sale,
                    "units": units,
                    "pct": int(parts[2 + 2 * offset].strip()),
                }
        except (ValueError, IndexError):
            # Eight fields, but not numeric ones -- don't blow up the dispatcher.
            log.msg("CIMPortReport:", line, "???")
            return

        # Store what this port is buying/selling
        data["port"] = (
            data["fuel"]["sale"] + data["org"]["sale"] + data["equ"]["sale"]
        )
        # Convert BBS/SBB to Class number 1-8
        data["class"] = CLASSES_PORT[data["port"]]
        self.game.gamedata.set_port(port, data)

    def __del__(self):
        log.msg("CIMPortReport {0} RIP".format(self))

    def whenDone(self):
        """Return a Deferred; call this to chain something after we exit."""
        self.defer = defer.Deferred()
        return self.defer

    def player(self, chunk):
        """Data from player (in bytes): abort the report.

        Fires the whenDone() Deferred's errback with Exception("User Abort")
        if one was requested.
        """
        chunk = chunk.decode("latin-1", "ignore")
        key = chunk.upper()
        log.msg("CIMPortReport.player({0}) : I AM stopping...".format(key))

        self.queue_game.put("\b \b\r\n")

        # Always "exit" out, whether or not anybody is waiting on us.
        self._restore()
        if self.defer is not None:
            self.defer.errback(Exception("User Abort"))
            self.defer = None
- class ScriptPort(object):
- """ Performs the Port script.
-
- This is close to the original.
- We don't ask for the port to trade with --
- because that information is available to us after "D" (display).
- We look at the adjacent sectors, and see if we know any ports.
- If the ports are burnt (< 20%), we remove them from the list.
- If there's just one, we use it. Otherwise we ask them to choose.
-
- """
def __init__(self, game):
    """Hook into the game's observer and ask for a sector display ("D")."""
    self.game = game
    self.observer = game.observer
    self.queue_player = game.queue_player
    self.queue_game = game.queue_game

    # Output helpers.
    self.nl = "\n\r"
    self.r = Style.RESET_ALL

    # Sector bookkeeping.
    self.this_sector = None  # Starting sector
    self.sector1 = None  # Current Sector
    self.sector2 = None  # Next Sector Stop
    self.possible_sectors = None

    # Trading bookkeeping.
    self.percent = 5  # Stick with the good default.
    self.times_left = 0
    self.credits = 0
    self.last_credits = None

    # Activate: remember the observer's state, then take over its signals.
    self.prompt = game.buffer
    self.save = self.observer.save()
    for signal, handler in (
        ('player', self.player),
        ("prompt", self.game_prompt),
        ("game-line", self.game_line),
    ):
        self.observer.connect(signal, handler)

    self.defer = None

    banner = self.nl + "Script based on: Port Pair Trading v2.00" + self.r + self.nl
    self.queue_game.put(banner)

    self.state = 1
    self.queue_player.put("D")
    # Original, send 'D' to display current sector.
    # The sector number could be read from the self.prompt string, but
    # sending 'D' also lists the adjacent sectors -- so we may not need to
    # ask which sector to trade with (we might work it out ourselves).

    # [Command [TL=00:00:00]:[967] (?=Help)? : D]
    # [<Re-Display>]
    # []
    # [Sector  : 967 in uncharted space.]
    # [Planets : (M) Into the Darkness]
    # [Warps to Sector(s) :  397 - (562) - (639)]
    # []
def whenDone(self):
    """Create, remember and return the Deferred to chain after we exit."""
    pending = defer.Deferred()
    self.defer = pending
    return pending
def deactivate(self):
    """Stop the script: reload the saved observer and fire the Deferred.

    The whenDone() Deferred, if any, is called back with 'done'.
    """
    self.state = 0
    log.msg("ScriptPort.deactivate ({0})".format(self.times_left))

    assert self.save is not None
    self.observer.load(self.save)
    self.save = None

    if not self.defer:
        return
    self.defer.callback('done')
    self.defer = None
def player(self, chunk: bytes):
    """Abort the script on any data from the player; the data is ignored."""
    self.deactivate()
def game_prompt(self, prompt: str):
    """React to a game prompt according to the script's current state.

    States handled here:
        3: sector display finished; at the command prompt ask the player
           for the partner port (if not already decided) and a repeat count.
        6: the trade could not proceed; if fixable, move to the other sector
           and trade there.
        7: selling; at the command prompt finish the leg, or answer a
           "Your offer" prompt when fix_offer is set.
        8: buying; answer "How many holds" prompts, and finish the leg at
           the command prompt.
    """
    log.msg("{0} : {1}".format(self.state, prompt))
    at_command = re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt) is not None

    if self.state == 3:
        log.msg("game_prompt: ", prompt)
        if at_command:
            self.state = 4
            log.msg("Ok, state 4")
            if self.sector2 is None:
                # Ok, we need to prompt for this.
                self._prompt_port_and_count()
            else:
                # We already have our target port, so...
                self._prompt_count_only()
    elif self.state == 6:
        if at_command and self.fixable:
            self.queue_game.put("Ok!  Let's fix this by going to the other sector..." + self.nl)
            log.msg("Fixing...")
            if self.this_sector == self.sector1:
                target = self.sector2
            else:
                target = self.sector1
            self.this_sector = target
            self.queue_player.put("{0}\r".format(target))
            self.state = 5
            self.trade()
    elif self.state == 7:
        if at_command:
            # Done
            self._finish_leg()
        elif self.fix_offer:
            # Commas are accepted inside the brackets and stripped below.
            offer = re.match(r"Your offer \[([\d,]+)\] \?", prompt)
            if offer:
                amount = int(offer.group(1).replace(',', ''))
                # NOTE(review): the percentage is stepped toward 100 both
                # before and after computing the price, as the original did.
                # The original read self.sell_pct, which nothing in view
                # assigns; sell_perc is what the rest of the haggle uses.
                self.sell_perc += -1 if self.sell_perc > 100 else 1
                price = amount * self.sell_perc // 100
                log.msg("start: {0} % {1} price {2}".format(amount, self.sell_perc, price))
                self.sell_perc += -1 if self.sell_perc > 100 else 1
                self.queue_player.put("{0}\r".format(price))
    elif self.state == 8:
        # What are we trading
        # How many holds of Equipment do you want to buy [75]?
        if re.match(r"How many holds of .+ do you want to buy \[\d+\]\?", prompt):
            # Fifth word: "Fuel" (of "Fuel Ore"), "Organics" or "Equipment".
            trade_type = prompt.split()[4]
            if self._skip_purchase(trade_type):
                self.queue_player.put("0\r")
            else:
                # Accept the default amount.
                self.queue_player.put("\r")
        elif at_command:
            # Done
            self._finish_leg()


def _prompt_port_and_count(self):
    """List the candidate ports, then ask which one and how many times."""
    ports = self.game.gamedata.ports
    self.queue_game.put(
        self.r + self.nl
        + "Which sector to trade with? {0}".format(
            GameData.port_show_part(self.this_sector, ports[self.this_sector])
        )
        + self.nl
    )
    for i, p in enumerate(self.possible):
        self.queue_game.put(
            " " + Fore.CYAN + str(i + 1) + " : "
            + GameData.port_show_part(p, ports[p]) + self.nl
        )

    pi = PlayerInput(self.game)

    def got_need1(*_):
        log.msg("Ok, I have:", pi.keep)
        if pi.keep['count'].strip() == '':
            self.deactivate()
            return
        self.times_left = int(pi.keep['count'])
        if pi.keep['choice'].strip() == '':
            self.deactivate()
            return
        c = int(pi.keep['choice']) - 1
        if c < 0 or c >= len(self.possible):
            self.deactivate()
            return
        self.sector2 = self.possible[c]
        self.state = 5
        self.trade()

    d = pi.prompt("Choose -=>", 5, name='choice', digits=True)
    d.addCallback(lambda ignore: pi.prompt("Times to execute script:", 5, name='count', digits=True))
    d.addCallback(got_need1)


def _prompt_count_only(self):
    """Announce the already-chosen port pair and ask how many times to run."""
    ports = self.game.gamedata.ports
    self.queue_game.put(
        self.r + self.nl
        + "Trading from {0} ({1}) to default {2} ({3}).".format(
            self.this_sector,
            ports[self.this_sector]['port'],
            self.sector2,
            ports[self.sector2]['port'],
        )
        + self.nl
    )
    self.queue_game.put(
        self.r + self.nl
        + "Trading {0}".format(
            self.game.gamedata.port_trade_show(self.this_sector, self.sector2)
        )
    )

    # The code is smart enough now, that this can't happen.  :(   Drat!
    if ports[self.this_sector]['port'] == ports[self.sector2]['port']:
        self.queue_game.put("Hey dummy! Look out the window!  These ports are the same class!" + self.nl)
        self.deactivate()
        return

    pi = PlayerInput(self.game)

    def got_need2(*_):
        if pi.keep['count'].strip() == '':
            self.deactivate()
            return
        self.times_left = int(pi.keep['count'])
        log.msg("Ok, I have:", pi.keep)
        self.state = 5
        self.trade()

    self.queue_game.put(self.r + self.nl)
    # digits=True so the int() in got_need2 cannot fail on letters.
    d = pi.prompt("Times to execute script", 5, name='count', digits=True)
    d.addCallback(got_need2)


def _finish_leg(self):
    """After a port visit: head to the other sector, or stop the script.

    Leaving sector1 just moves to sector2.  Leaving sector2 completes a
    round trip: the repeat counter is decremented and the credits check is
    made before moving back to sector1.
    """
    if self.this_sector == self.sector1:
        self.this_sector = self.sector2
        self.queue_player.put("{0}\r".format(self.sector2))
        self.state = 10
        return

    self.times_left -= 1
    if self.times_left <= 0:
        # Ok, exit out
        self.deactivate()
        return

    if self.last_credits is None:
        # NOTE(review): last_credits is recorded once and never refreshed,
        # so later round trips are compared against this first value.
        self.last_credits = self.credits
    elif self.credits <= self.last_credits:
        log.msg("We don't seem to be making any money here...")
        self.queue_game.put(self.r + self.nl + "We don't seem to be making any money here.  I'm stopping!" + self.nl)
        self.deactivate()
        return

    self.this_sector = self.sector1
    self.queue_player.put("{0}\r".format(self.sector1))
    self.state = 10


def _skip_purchase(self, trade_type):
    """Return True when buying trade_type here is pointless for this pair.

    Decided from this port's class (tpc) and the other port's class (opc).
    """
    tpc, opc = self.tpc, self.opc
    if trade_type == 'Fuel':
        # Can buy equipment - fuel ore is worthless.
        if tpc in (5, 7) and opc in (2, 3, 4, 8):
            return True
        # Can buy organics - fuel ore is worthless.
        if tpc in (4, 7) and opc in (1, 3, 5, 8):
            return True
        # No point in buying fuel ore if it can't be sold.
        return tpc in (4, 7, 3, 5) and opc in (3, 4, 5, 7)
    if trade_type == 'Organics':
        # Can buy equipment - organics is worthless.
        if tpc in (6, 7) and opc in (2, 3, 4, 8):
            return True
        # No point in buying organics if it can't be sold.
        return tpc in (2, 4, 6, 7) and opc in (2, 4, 6, 7)
    if trade_type == 'Equipment':
        # No point in buying equipment if it can't be sold.
        return opc in (1, 5, 6, 7)
    return False
-
def trade(self, *_):
    """Send "pt" and cache both ports' data for the coming trade (state 5).

    Extra positional arguments are ignored.
    """
    log.msg("trade!")
    self.queue_player.put("pt")  # Port Trade

    self.this_port = self.game.gamedata.ports[self.this_sector]
    # The "other" port is whichever end of the pair we are not sitting in.
    at_first = self.this_sector == self.sector1
    other_sector = self.sector2 if at_first else self.sector1
    self.other_port = self.game.gamedata.ports[other_sector]

    # Ok, perform some calculations
    self.tpc, self.opc = self.this_port['class'], self.other_port['class']
    self.fixable = self.fix_offer = 0

    # Sample of the output that follows:
    # [ Items     Status  Trading % of max OnBoard]
    # [ -----     ------  ------- -------- -------]
    # [Fuel Ore   Selling    2573     93%       0]
    # [Organics   Buying     2960    100%       0]
    # [Equipment  Buying     1958     86%       0]
    # []
    # []
    # [You have 1,000 credits and 20 empty cargo holds.]
    # []
    # [We are selling up to 2573.  You have 0 in your holds.]
    # [How many holds of Fuel Ore do you want to buy [20]? 0]
- def game_line(self, line: str):
- if line.startswith("You have ") and 'credits and' in line:
- parts = line.replace(',', '').split()
- credits = int(parts[2])
- log.msg("Credits: {0}".format(credits))
- self.credits = credits
- if self.state == 1:
- # First exploration
- if line.startswith("Sector :"):
- # We have starting sector information
- parts = re.split("\s+", line)
- self.this_sector = int(parts[2])
- # These will be the ones swapped around as we trade back and forth.
- self.sector1 = self.this_sector
- elif line.startswith("Warps to Sector(s) : "):
- # Warps to Sector(s) : 397 - (562) - (639)
- _, _, warps = line.partition(':')
- warps = warps.replace('-', '').replace('(', '').replace(')', '').strip()
- log.msg("Warps: [{0}]".format(warps))
- self.warps = [ int(x) for x in re.split("\s+", warps)]
- log.msg("Warps: [{0}]".format(self.warps))
- self.state = 2
- elif self.state == 2:
- if line == "":
- # Ok, we're done
- self.state = 3
- # Check to see if we have information on any possible ports
- # if hasattr(self.game, 'portdata'):
- if True:
- if not self.this_sector in self.game.gamedata.ports:
- self.state = 0
- log.msg("Current sector {0} not in portdata.".format(self.this_sector))
- self.queue_game.put(self.r + self.nl + "I can't find the current sector in the portdata." + self.nl)
- self.deactivate()
- return
- else:
- # Ok, we are in the portdata
- pd = self.game.gamedata.ports[self.this_sector]
- if GameData.port_burnt(pd):
- log.msg("Current sector {0} port is burnt (<= 20%).".format(self.this_sector))
- self.queue_game.put(self.r + self.nl + "Current sector port is burnt out. <= 20%." + self.nl)
- self.deactivate()
- return
- possible = [ x for x in self.warps if x in self.game.gamedata.ports ]
- log.msg("Possible:", possible)
- # BUG: Sometimes links to another sector, don't link back!
- # This causes the game to plot a course / autopilot.
- # if hasattr(self.game, 'warpdata'):
- if True:
- # Great! verify that those warps link back to us!
- possible = [ x for x in possible if x in self.game.gamedata.warps and self.this_sector in self.game.gamedata.warps[x]]
- if len(possible) == 0:
- self.state = 0
- self.queue_game.put(self.r + self.nl + "I don't see any ports in [{0}].".format(self.warps) + self.nl)
- self.deactivate()
- return
- possible = [ x for x in possible if not GameData.port_burnt(self.game.gamedata.ports[x]) ]
- log.msg("Possible:", possible)
- if len(possible) == 0:
- self.state = 0
- self.queue_game.put(self.r + self.nl + "I don't see any unburnt ports in [{0}].".format(self.warps) + self.nl)
- self.deactivate()
- return
- possible = [ x for x in possible if GameData.port_trading(self.game.gamedata.ports[self.this_sector]['port'], self.game.gamedata.ports[x]['port'])]
- self.possible = possible
- if len(possible) == 0:
- self.state = 0
- self.queue_game.put(self.r + self.nl + "I don't see any possible port trades in [{0}].".format(self.warps) + self.nl)
- self.deactivate()
- return
- elif len(possible) == 1:
- # Ok! there's only one!
- self.sector2 = possible[0]
-
- # Display possible ports:
- # spos = [ str(x) for x in possible]
- # self.queue_game.put(self.r + self.nl + self.nl.join(spos) + self.nl)
- # At state 3, we only get a prompt.
- return
- else:
- self.state = 0
- log.msg("We don't have any portdata!")
- self.queue_game.put(self.r + self.nl + "I have no portdata. Please run CIM Port Report." + self.nl)
- self.deactivate()
- return
- elif self.state == 5:
- if "-----" in line:
- self.state = 6
- elif self.state == 6:
- if "We are buying up to" in line:
- # Sell
- self.state = 7
- self.queue_player.put("\r")
- self.sell_perc = 100 + self.percent
- if "We are selling up to" in line:
- # Buy
- self.state = 8
- self.sell_perc = 100 - self.percent
- if line.startswith('Fuel Ore') or line.startswith('Organics') or line.startswith('Equipment'):
- work = line.replace('Fuel Ore', 'Fuel')
- parts = re.split(r"\s+", work)
- # log.msg(parts)
- # Equipment, Selling xxx x% xxx
- if parts[-1] != '0' and parts[2] != '0' and parts[1] != 'Buying':
- log.msg("We have a problem -- they aren't buying what we have in stock!")
- stuff = line[0] # F O or E.
- if stuff == 'F':
- spos = 0
- elif stuff == 'O':
- spos = 1
- else:
- spos = 2
- other = self.other_port['port']
- if other[spos] == 'B':
- self.fixable = 1
- if "You don't have anything they want" in line:
- # Neither! DRAT!
- if not self.fixable:
- self.deactivate()
- return
- if "We're not interested." in line:
- log.msg("Try, try again. :(")
- self.state = 5
- self.trade()
- elif self.state == 7:
- # Haggle Sell
- if "We'll buy them for" in line or "Our final offer" in line:
- if "Our final offer" in line:
- self.sell_perc -= 1
- parts = line.replace(',', '').split()
- start_price = int(parts[4])
- price = start_price * self.sell_perc // 100
- log.msg("start: {0} % {1} price {2}".format(start_price, self.sell_perc, price))
- self.sell_perc -= 1
- self.queue_player.put("{0}\r".format(price))
- if "We are selling up to" in line:
- # Buy
- self.state = 8
- self.sell_perc = 100 - self.percent
- if "We're not interested." in line:
- log.msg("Try, try again. :(")
- self.state = 5
- self.trade()
- if "WHAT?!@!? you must be crazy!" in line:
- log.msg("fix offer")
- self.fix_offer = 1
- if "So, you think I'm as stupid as you look?" in line:
- log.msg("fix offer")
- self.fix_offer = 1
- if "Quit playing around, you're wasting my time!" in line:
- log.msg("fix offer")
- self.fix_offer = 1
- elif self.state == 8:
- # Haggle Buy
- if "We'll sell them for" in line or "Our final offer" in line:
- if "Our final offer" in line:
- self.sell_perc += 1
- parts = line.replace(',', '').split()
- start_price = int(parts[4])
- price = start_price * self.sell_perc // 100
- log.msg("start: {0} % {1} price {2}".format(start_price, self.sell_perc, price))
- self.sell_perc += 1
- self.queue_player.put("{0}\r".format(price))
- if "We're not interested." in line:
- log.msg("Try, try again. :(")
- self.state = 5
- self.trade()
- elif self.state == 10:
- if "Sector : " in line:
- # Trade
- self.state = 5
- reactor.callLater(0, self.trade, 0)
- # self.trade()
- # elif self.state == 3:
- # log.msg("At state 3 [{0}]".format(line))
- # self.queue_game.put("At state 3.")
- # self.deactivate()
- # return
class ScriptExplore(object):
    """ Script Explore v1.00
        By: David Thielemann

        Density-scans around the ship, moves to the clear, previously
        unseen neighbour with the most warps, and repeats.  Other clear
        neighbours are remembered so that, on a dead end, the script can
        autopilot to one of them.  Any input from the player aborts.

        WARNINGS:
        We assume the player has a Holo-Scanner!
        We assume the player has lots of turns, or unlimited turns!
        We assume the player is aware we run infinitely until we can't find new sectors to move to!

        States (self.state):
             0  idle / deactivated
             1  next game line sends "S" (open the scanner)            -> 2
             2  answer the scanner menu with "D", wait for the header  -> 3
             3  collect density rows until a blank line                -> 4
             4  pick the destination                                   -> 5
             5  remember the other clear sectors, send the move        -> 1
            10  dead end: destination sent, wait for the path header   -> 11
            11  this line is the path                                  -> 12
            12  single-step autopilot, "SD" at each stop prompt        -> 13
            13  wait for the density header                            -> 14
            14  read rows; on the blank line decide                    -> 15 / 20
            15  answer the stop prompt with "N"                        -> 12
            20  answer the stop prompt with "Y"                        -> 1
    """

    # Density readings that are considered safe to fly into.  The original
    # author's notes describe them as: empty space, a beacon, a port, or a
    # beacon and a port.
    SAFE_DENSITY = (0, 1, 100, 101)
    STOP_PROMPT = 'Stop in this sector (Y,N,E,I,R,S,D,P,?) (?=Help) [N]'
    ENGAGE_PROMPT = 'Engage the Autopilot? (Y/N/Single step/Express) [Y]'
    COMMAND_PROMPT = r"Command \[TL=.* \(\?=Help\)\? :"

    def __init__(self, game):
        """Hook into the game's observer and ask how many sectors to explore.

        game -- the proxy game object; must provide queue_game,
                queue_player, observer and buffer.
        """
        self.game = game
        self.queue_game = game.queue_game
        self.queue_player = game.queue_player
        self.observer = game.observer
        self.r = Style.RESET_ALL
        self.c = merge(Style.BRIGHT + Fore.YELLOW)
        self.nl = "\n\r"

        # The handlers connected below read these attributes, so they must
        # exist before the player has answered the prompt.
        self.state = 0
        self.travel_path = []
        self.stophere = False
        self.go_on = True

        # Our Stuff, Not our pants!
        self.dense = []             # We did a density, store that info.
        self.clear = []             # Warps that we know are clear.
        self.didDense = False
        self.didHolo = False
        self.highsector = 0         # Selected Sector to move to next!
        self.highwarp = 0           # Selected Sector's Warp Count!
        self.stacksector = set()    # Clear sectors seen but not yet visited.
        self.oneMoveSector = False
        self.times = 0
        self.maxtimes = 0

        # Activate
        self.prompt = game.buffer
        self.save = self.observer.save()
        self.observer.connect('player', self.player)
        self.observer.connect("prompt", self.game_prompt)
        self.observer.connect("game-line", self.game_line)

        self.defer = None
        self.send2player(Boxes.alert("Explorer v1.01", base="green"))

        # How many times we going to go today?
        ask = PlayerInput(self.game)

        def settimes(*_):
            times = ask.keep['times'].strip()
            log.msg("settimes got '{0}'".format(times))
            if times == '':
                # Blank answer: the player changed their mind.
                self.deactivate()
            else:
                times = int(times)
                self.times = times
                self.maxtimes = times
                self.send2game("D")
                self.state = 1

        d = ask.prompt("How many sectors would you like to explorer?", 5, name="times", digits=True)
        d.addCallback(settimes)

    def whenDone(self):
        """Return a Deferred that fires with 'done' when the script deactivates."""
        self.defer = defer.Deferred()
        # Call this to chain something after we exit.
        return self.defer

    def deactivate(self):
        """Restore the saved observer state and fire the whenDone Deferred."""
        self.state = 0
        log.msg("ScriptExplore.deactivate()")
        assert self.save is not None
        self.observer.load(self.save)
        self.save = None
        if self.defer:
            self.defer.callback('done')
            self.defer = None

    def player(self, chunk: bytes):
        """Player typed something -- abort the script."""
        self.deactivate()

    def send2game(self, txt):
        """Queue ``txt`` on queue_player (keystrokes for the game)."""
        self.queue_player.put(txt)

    def send2player(self, txt):
        """Queue ``txt`` on queue_game, wrapped in newlines and a style reset."""
        self.queue_game.put(
            self.nl + txt + self.r + self.nl
        )

    def resetStuff(self):
        """Forget the results of the last scan before starting a new one."""
        self.didDense = False
        self.didHolo = False
        self.dense = []
        self.clear = []
        self.highwarp = 0
        self.highsector = 0
        log.msg("ScriptExplore.resetStuff()")

    def dead_end(self):
        """ We've reached a dead end.

        Either pop a new location to travel to, or give it up.
        """
        self.send2player(Boxes.alert("** DEAD END **", base="blue"))
        if self.stacksector:
            # Ok, there's somewhere to go ...
            self.highsector = self.stacksector.pop()
            # travel state
            self.state = 10
            self.send2game("{0}\r".format(self.highsector))
        else:
            self.send2player(Boxes.alert("I've run out of places to look ({0}/{1}).".format(self.maxtimes - self.times, self.maxtimes)))
            self.deactivate()

    def game_over(self, msg):
        """Tell the player why we stopped (with progress) and deactivate."""
        self.send2player(Boxes.alert("STOP: {0} ({1}/{2}).".format(msg, self.maxtimes - self.times, self.maxtimes)))
        self.deactivate()

    def _pop_path(self):
        """Drop the next hop from travel_path; an empty path is left alone."""
        if self.travel_path:
            self.travel_path.pop(0)

    def game_prompt(self, prompt: str):
        """React to a game prompt according to the current state."""
        if self.state == 2:
            if "Select (H)olo Scan or (D)ensity Scan or (Q)uit" in prompt:
                self.send2game("D")
        elif self.state == 5:
            log.msg("dense is {0} sectors big".format(len(self.dense)))
            self.state = 6
            self.send2game("SH")
        elif self.state == 12:
            # Looking for "Engage the Autopilot?"
            if prompt.startswith(self.ENGAGE_PROMPT):
                self.send2game("S")
                self._pop_path()
            if prompt.startswith(self.STOP_PROMPT):
                # Scan before deciding whether to continue.
                self.send2game("SD")
                self.state = 13
            if re.match(self.COMMAND_PROMPT, prompt):
                self._arrived_at_prompt()
                return
        elif self.state == 15:
            if prompt.startswith(self.STOP_PROMPT):
                self.send2game("N")
                self._pop_path()
                self.state = 12
            if re.match(self.COMMAND_PROMPT, prompt):
                self._arrived_at_prompt()
                return
        elif self.state == 20:
            if prompt.startswith(self.STOP_PROMPT):
                # Stop in this sector / Yes!
                self.send2game("Y")
                # this should re-trigger a scan
                self.state = 1

    def _arrived_at_prompt(self):
        """Back at the command prompt after travelling: open the scanner."""
        log.msg("We made it to where we wanted to go!")
        # can't init state 1, because we're at a prompt, so...
        self.send2game("S")
        self.state = 2

    def _record_density(self, line):
        """Parse one density row and store it if it is a new sector.

        New sectors are the rows that contain parentheses.
        """
        new_sector = '(' in line
        work = line
        for junk in (':', ')', '(', '%', '==>'):
            work = work.replace(junk, '')
        work = re.split(r"\s+", work)
        log.msg(work)
        # 'Sector', '8192', '0', 'Warps', '4', 'NavHaz', '0', 'Anom', 'No'
        if new_sector:
            self.dense.append({
                'sector': int(work[1]),
                'density': int(work[2]),
                'warps': int(work[4]),
                'navhaz': int(work[6]),
                'anom': work[8] == 'Yes',
            })
            log.msg(self.dense)

    def _select_target(self):
        """State 4: choose the next sector from the density results.

        Returns True when a destination was chosen (state is now 5).
        Returns False when there was nothing new (dead_end was called) or
        nothing safe (game_over was called).
        """
        log.msg("state 4: {0}".format(self.dense))
        if not self.dense:
            # Dense contains no new sectors.
            log.msg("No New Sectors Found!")
            self.dead_end()
            return False

        # Is the sector safe to go into?  A single warp is fine: if it's
        # unexplored then explore it.
        self.clear = [
            x for x in self.dense
            if not x['anom'] and not x['navhaz'] and x['density'] in self.SAFE_DENSITY
        ]
        if not self.clear:
            log.msg("No (safe) sectors to move to!")
            self.game_over("No SAFE moves.")
            return False

        log.msg("Clear Sectors: {0}".format(len(self.clear)))
        # Greatest warp count wins; ties go to the higher sector number.
        self.highwarp, self.highsector = max((x['warps'], x['sector']) for x in self.clear)
        log.msg("Sector: {0:5d} Warps: {1}".format(self.highsector, self.highwarp))
        self.state = 5
        return True

    def _begin_move(self):
        """State 5: stack the other clear sectors and move to the chosen one."""
        for c in self.clear:
            sector = c['sector']
            if sector != self.highsector:
                self.stacksector.add(sector)

        # Decide to stop exploring -- before we issue the sector move!
        # Warning! Yes we can and will eat all the turns! :P
        if self.times == 0:
            self.send2player(Boxes.alert("Completed {0}".format(self.maxtimes), base="green"))
            log.msg("Completed {0}".format(self.maxtimes))
            self.deactivate()
            return

        self.times -= 1
        # Ok we know the sector we want to go to now let's move it!
        self.send2game("m{0}\r".format(self.highsector))
        # Reset Variables for fresh data
        self.resetStuff()
        self.state = 1

    def _check_travel_row(self, line):
        """State 14: inspect one density row seen while autopiloting."""
        work = line.replace(' :', '').replace('%', '').replace(')', '').replace('==>', '')
        # Does this contain something new? unseen?
        stophere = '(' in work
        work = work.replace('(', '')
        # Sector XXXX DENS Warps N NavHaz P Anom YN
        parts = re.split(r'\s+', work)

        # YES! Stop, even if there is just one warp!
        if stophere:
            self.stophere = True

        # An exhausted path simply has no next stop to vet.
        next_stop = self.travel_path[0] if self.travel_path else None
        log.msg("next_stop {0} from {1}".format(next_stop, self.travel_path))
        log.msg("parts: {0}".format(parts))
        if parts[1] == next_stop:
            log.msg("next_stop {0} found...".format(next_stop))
            # Ok, this is our next stop. Is it safe to travel to?
            if parts[2] not in ('100', '0', '1', '101'):
                # Ok, it's not safe to go on.
                self.go_on = False
            # TODO: check the rest, navhaz and anom ...

    def game_line(self, line: str):
        """React to one line of game output according to the current state."""
        log.msg("{0} | {1}".format(self.state, line))

        if self.state == 1:
            self.send2game("S")
            self.state = 2
        elif self.state == 2:
            if "Relative Density Scan" in line:
                self.state = 3
            elif "You don't have a long range scanner." in line:
                log.msg("FATAL: No Long Range Scanner Installed!")
                self.send2player(Boxes.alert("You need a Long Range Scanner!"))
                self.deactivate()
                return
        elif self.state == 3:
            # Get the Density Data!
            if line.startswith("Sector"):
                self._record_density(line)
            elif line == "":
                self.state = 4

        # States 4 and 5 need no further input, so they are checked outside
        # the chain above and run on the same line that ended state 3.
        if self.state == 4:
            if not self._select_target():
                return

        if self.state == 5:
            self._begin_move()
        elif self.state == 10:
            if line.startswith("You are already in that sector!"):
                log.msg("Already here. (Whoops!)")
                self.state = 1
                return
            if line.startswith("Sector : {0}".format(self.highsector)):
                log.msg("We're here!")
                # Ok, we're already there! no autopilot needed!
                self.state = 1
                return
            # Warping
            self.go_on = True
            if line.startswith('The shortest path ('):
                # Ok, we've got a path.
                self.state = 11
        elif self.state == 11:
            self.travel_path = line.replace('(', '').replace(')', '').split(' > ')
            log.msg("Travel path: {0}".format(self.travel_path))
            self.state = 12
            self.travel_path.pop(0)     # First sector is one we're in.
            self.stophere = False
            self.go_on = True
        elif self.state == 12:
            # Arriving sector :1691 Autopilot disengaging.
            if 'Autopilot disengaging.' in line:
                log.msg("We made it to where we wanted to go!")
                self.state = 1
                return
        elif self.state == 13:
            if 'Relative Density Scan' in line:
                self.state = 14
        elif self.state == 14:
            if line == "":
                log.msg("PATH: {0}".format(self.travel_path))
                # end of the scan, decision time
                if self.stophere:
                    log.msg("STOPHERE")
                    # Re-save the sector we were trying to get to.
                    # (we didn't make it there)
                    self.stacksector.add(self.highsector)
                    self.state = 20
                elif self.go_on:
                    log.msg("GO ON")
                    self.state = 15
                else:
                    log.msg("Our way is blocked...")
                    self.stacksector.add(self.highsector)
                    self.state = 20
            elif line.strip('-') != '':
                # Skip the row of dashes under the scan header.
                self._check_travel_row(line)
class ScriptSpace(object):
    """ Space Exploration script.

    Send "SD", verify paths are clear.
    Find nearest unknown.  Sector + CR.
    Save "shortest path from to" information.
    At 'Engage the Autopilot', Send "S" (Single Step)
    At '[Stop in this sector', Send "SD", verify path is clear.
    Send "SH" (glean sector/port information along the way.)
    Send "N" (Next)!
    Send "SD" / Verify clear.
    Send "SH"
    Repeat for Next closest.

    Any input from the player aborts the script.
    """

    STOP_PROMPT = 'Stop in this sector (Y,N,E,I,R,S,D,P,?) (?=Help) [N] ?'
    ENGAGE_PROMPT = 'Engage the Autopilot? (Y/N/Single step/Express) [Y]'
    # Density values (after the NavHaz/Anom penalties added in game_line)
    # that are accepted for the next hop.
    SAFE_DENSITY = (0, 100)

    def __init__(self, game):
        """Hook into the game's observer and start with a density scan.

        game -- the proxy game object; must provide queue_game,
                queue_player, observer, buffer and gamedata.
        """
        self.game = game
        self.queue_game = game.queue_game
        self.queue_player = game.queue_player
        self.observer = game.observer
        self.r = Style.RESET_ALL
        self.nl = "\n\r"

        self.this_sector = None     # Starting sector
        self.target_sector = None   # Sector going to
        self.unknown = None         # Result of the last unknown_search
        self.path = []
        self.times_left = 0         # How many times to look for target
        self.density = dict()       # Results of density scan. (Just the numbers)

        # Activate
        self.prompt = game.buffer
        self.save = self.observer.save()
        self.observer.connect('player', self.player)
        self.observer.connect("prompt", self.game_prompt)
        self.observer.connect("game-line", self.game_line)

        self.defer = None
        self.queue_game.put(
            self.nl + "Bugz (like space), is big." + self.r + self.nl
        )

        self.state = 1
        # Get current density scan + also get the current sector.
        # [Command [TL=00:00:00]:[XXXX] (?=Help)? : D]
        self.queue_player.put("SD")

    def whenDone(self):
        """Return a Deferred that fires with 'done' when the script deactivates."""
        self.defer = defer.Deferred()
        # Call this to chain something after we exit.
        return self.defer

    def deactivate(self):
        """Restore the saved observer state and fire the whenDone Deferred."""
        self.state = 0
        log.msg("ScriptSpace.deactivate ({0})".format(self.times_left))
        assert self.save is not None
        self.observer.load(self.save)
        self.save = None
        if self.defer:
            self.defer.callback('done')
            self.defer = None

    def player(self, chunk: bytes):
        """Player typed something -- abort the script."""
        self.deactivate()

    def unknown_search(self, starting_sector):
        """Generator: breadth-first search for a sector with no warp data.

        Expands one level of warps per step and yields between levels so
        it can be driven cooperatively.  A sector counts as unknown when
        ``gamedata.get_warps`` returns None for it; that sector is stored
        in ``self.unknown``.  When every reachable sector is known the
        search ends and ``self.unknown`` is left untouched.
        """
        seen = set()
        possible = {int(starting_sector)}
        done = False

        while not done:
            next_possible = set()

            for s in possible:
                p = self.game.gamedata.get_warps(s)
                if p is None:
                    log.msg("unknown found:", s)
                    self.unknown = s
                    done = True
                    break
                next_possible.update(pos for pos in p if pos not in seen)
                seen.add(s)

            if self.unknown is None:
                if not next_possible:
                    # Frontier exhausted: nothing unknown is reachable.
                    # Without this the loop would spin forever.
                    log.msg("unknown_search: no unknown sectors reachable.")
                    done = True
                    continue
                log.msg("possible:", next_possible)
                possible = next_possible
                yield

    def find_unknown(self, starting_sector):
        """Start unknown_search; return a Deferred firing with the result.

        The Deferred fires with the unknown sector, or None if none was found.
        """
        log.msg("find_unknown( {0})".format(starting_sector))
        d = defer.Deferred()
        # Process things
        self.unknown = None
        c = coiterate(self.unknown_search(starting_sector))
        c.addCallback(lambda unknown: d.callback(self.unknown))
        return d

    def show_unknown(self, sector):
        """Send the sector number to the game, or give up if it is None."""
        if sector is None:
            self.deactivate()
            return
        log.msg("Travel to {0}...".format(sector))
        self.queue_player.put("{0}\r".format(sector))

    def _safe_to_enter(self, sector, missing_fmt):
        """Check ``sector`` against the last density scan.

        Returns True when its density is in SAFE_DENSITY.  Otherwise logs
        the reason (``missing_fmt`` is used when the sector was not in the
        scan), deactivates the script and returns False.
        """
        if sector not in self.density:
            log.msg(missing_fmt.format(sector))
            self.deactivate()
            return False
        if self.density[sector] not in self.SAFE_DENSITY:
            log.msg("FATAL: Density for {0} is {1}".format(sector, self.density[sector]))
            self.deactivate()
            return False
        return True

    def game_prompt(self, prompt: str):
        """React to a game prompt according to the current state."""
        log.msg("{0} : {1}".format(self.state, prompt))
        if self.state == 3:
            if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt):
                # Take the current sector from the prompt:
                # Command [TL=00:00:00]:[XXXX] (?=Help)? :
                self.state = 4
                _, _, sector = prompt.partition(']:[')
                sector, _, _ = sector.partition(']')
                self.this_sector = int(sector)
                # Ok, we're done with Density Scan, and we're back at the command prompt
                log.msg("Go find the nearest unknown...")
                d = self.find_unknown(sector)
                d.addCallback(self.show_unknown)
        elif self.state == 6:
            # Engage the autopilot?
            if prompt.startswith(self.ENGAGE_PROMPT):
                self.state = 7
                sector = self.path.pop(0)
                if self._safe_to_enter(sector, "{0} not in density scan? (how's that possible?)"):
                    # Ok, looks safe!
                    self.queue_player.put("S")
                    self.this_sector = sector
        elif self.state == 7:
            # Ok, we're in a new sector (single stepping through space)
            # update density scan
            if prompt.startswith(self.STOP_PROMPT):
                self.queue_player.put("SD")
                self.state = 8
        elif self.state == 10:
            # Because we're here
            if prompt.startswith(self.STOP_PROMPT):
                self.queue_player.put("SH")
                self.state = 11
        elif self.state == 11:
            if prompt.startswith(self.STOP_PROMPT):
                # Ok, is the density scan clear?
                sector = self.path.pop(0)
                if self._safe_to_enter(sector, "{0} not in density scane? (how's that possible...)"):
                    # Ok, looks safe
                    self.queue_player.put("N")
                    self.state = 7
                    self.this_sector = sector

    def next_unknown(self, sector):
        """Log the sector and deactivate."""
        log.msg("Unknown is :", sector)
        self.deactivate()

    def _track_position(self, line):
        """Update this_sector / path from lines that are useful in any state."""
        if line.startswith('Sector : '):
            parts = re.split(r"\s+", line.strip())
            self.this_sector = int(parts[2])
            log.msg("game_line sector", self.this_sector)
        elif line.startswith("Command [TL="):
            # Ok, get the current sector from this:
            # Command [TL=00:00:00]:[XXXX] (?=Help)? : D
            _, _, sector = line.partition("]:[")
            sector, _, _ = sector.partition("]")
            if sector.isdigit():
                self.this_sector = int(sector)
                log.msg("current sector: {0}".format(self.this_sector))
        elif line.startswith('Warps to Sector(s) :'):
            # Warps to Sector(s) : 5468
            _, _, work = line.partition(':')
            work = work.strip().replace('(', '').replace(')', '').replace(' - ', ' ')
            # NOTE(review): this overwrites self.path, which state 5 also
            # fills with the autopilot route -- confirm that is intended.
            self.path = [int(x) for x in work.split()]

    def _record_density(self, line):
        """Store one density row; NavHaz / Anom push the value out of SAFE_DENSITY."""
        work = line
        for junk in ('(', ')', ':', '%', ','):
            work = work.replace(junk, '')
        parts = re.split(r'\s+', work)
        log.msg("Sector", parts)
        sector = int(parts[1])
        self.density[sector] = int(parts[3])
        if parts[7] != '0':
            log.msg("NavHaz {0} : {1}".format(parts[7], work))
            self.density[sector] += 99
        if parts[9] != 'No':
            log.msg("Anom {0} : {1}".format(parts[9], work))
            self.density[sector] += 990

    def game_line(self, line: str):
        """React to one line of game output according to the current state."""
        log.msg("line {0} : {1}".format(self.state, line))
        self._track_position(line)

        if self.state in (1, 8):
            if 'Relative Density Scan' in line:
                # Start Density Scan
                self.state += 1
                self.density = {}
        elif self.state in (2, 9):
            if line == '':
                # End of Density Scan
                self.state += 1
                log.msg("Density: {0}".format(self.density))
            elif line.startswith('Sector'):
                self._record_density(line)
        elif self.state == 4:
            # Looking for shortest path message / warp info
            # Or possibly, "We're here!"
            if line.startswith('Sector :') and str(self.unknown) in line:
                # Ok, I'd guess that we're already there!
                # Try it again!
                self.queue_player.put("SD")
                self.state = 1
            if line.startswith('The shortest path'):
                self.state = 5
        elif self.state == 5:
            # This is the warps line
            # Can this be multiple lines?
            if line == "":
                self.state = 6
            else:
                work = line.replace("(", "").replace(")", "").replace(">", "").strip()
                self.path = [int(x) for x in work.split()]
                log.msg("Path:", self.path)
                # Verify: the path must start where we think we are.
                current = self.path.pop(0)
                if current != self.this_sector:
                    log.msg("Failed: {0} != {1}".format(current, self.this_sector))
                    self.deactivate()
                    return
        elif self.state == 7:
            if self.unknown == self.this_sector:
                # We have arrived!
                log.msg("We're here!")
                self.deactivate()
- from boxes import Boxes
- class ProxyMenu(object):
- """ Display ProxyMenu
-
- Example:
- from flexible import ProxyMenu
- if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt):
- menu = ProxyMenu(self.game)
- """
def __init__(self, game):
    """Take over player input, start the keepalive and show the menu.

    game -- the proxy game object; must provide queue_game, observer,
            buffer and (optionally) a previously built trade_report.
    """
    # Am I using self or game? (I think I want game, not self.)
    self.game = game
    self.queue_game = game.queue_game
    self.observer = game.observer

    # Display attributes.
    self.nl = "\n\r"
    self.r = Style.RESET_ALL
    self.c = merge(Style.BRIGHT + Fore.YELLOW + Back.BLUE)
    self.c1 = merge(Style.BRIGHT + Fore.WHITE + Back.BLUE)
    self.c2 = merge(Style.NORMAL + Fore.CYAN + Back.BLUE)

    # Reuse the report kept on the game object, if there is one.
    self.trade_report = getattr(self.game, 'trade_report', [])

    # Yes, at this point we would activate
    self.prompt = game.buffer
    self.save = self.observer.save()
    self.observer.connect("player", self.player)

    # If we want it, it's here.
    self.defer = None

    # Call awake() every 30 seconds while the menu is showing.
    keepalive = task.LoopingCall(self.awake)
    self.keepalive = keepalive
    keepalive.start(30)

    self.menu()
def __del__(self):
    """Log that this menu instance is being destroyed."""
    farewell = "ProxyMenu {0} RIP".format(self)
    log.msg(farewell)
def whenDone(self):
    """Create, remember and return the Deferred used to signal completion."""
    # Call this to chain something after we exit.
    finished = defer.Deferred()
    self.defer = finished
    return finished
def menu(self):
    """Draw the main proxy menu box followed by the input prompt."""
    send = self.queue_game.put
    box = Boxes(30, color=self.c)

    def menu_item(ch: str, desc: str):
        # One boxed row: hot key, dash, description.
        row = self.c1 + " {0} {1}- {2}{3:25}".format(ch, self.c2, self.c1, desc)
        send(box.row(row))

    title = "{0:^30}".format("TradeWars Proxy Active")
    send(box.top())
    send(box.row(self.c + title))
    send(box.middle())

    gamedata = self.game.gamedata
    entries = (
        ("D", "Display Report again"),
        ("P", "Port CIM Report ({0})".format(len(gamedata.ports))),
        ("W", "Warp CIM Report ({0})".format(len(gamedata.warps))),
        ("T", "Trading Report"),
        ("S", "Scripts"),
        ("X", "eXit"),
    )
    for hotkey, description in entries:
        menu_item(hotkey, description)

    send(box.bottom())
    send(" " + self.c + "-=>" + self.r + " ")
def awake(self):
    """Keepalive tick: log it and queue a single space on queue_player."""
    log.msg("ProxyMenu.awake()")
    outgoing = self.game.queue_player
    outgoing.put(" ")
def port_report(self, portdata: dict):
    """Report how many ports were loaded, then return to the menu.

    portdata -- mapping whose length is the number of ports loaded.
    """
    summary = "Loaded {0} ports.".format(len(portdata))
    self.queue_game.put(summary + self.nl)
    self.welcome_back()
def warp_report(self, warpdata: dict):
    """Report how many sectors were loaded, then return to the menu.

    warpdata -- mapping whose length is the number of sectors loaded.
    """
    summary = "Loaded {0} sectors.".format(len(warpdata))
    self.queue_game.put(summary + self.nl)
    self.welcome_back()
def make_trade_report(self):
    """Generator that fills self.trade_report with port-pair trades.

    For every unburnt port, each warp that links back and leads to a
    higher-numbered sector with an unburnt port is considered.  Pairs
    whose classes are (1 or 5) against (2 or 4) are listed as best
    trades; other pairs accepted by GameData.port_trading are listed as
    ok trades.  Yields once per port so it can be driven cooperatively.
    """
    log.msg("make_trade_report()")
    best_trades = []
    ok_trades = []

    # Port class legend:
    # 1: "BBS", 2: "BSB", 3: "SBB", 4: "SSB",
    # 5: "SBS", 6: "BSS", 7: "SSS", 8: "BBB"
    group_a = (1, 5)
    group_b = (2, 4)

    for sector in sorted(self.game.gamedata.ports.keys()):
        pd = self.game.gamedata.ports[sector]
        if not GameData.port_burnt(pd):
            pc = pd['class']

            # Ok, let's look into it.
            if sector not in self.game.gamedata.warps:
                continue

            for w in self.game.gamedata.warps[sector]:
                warp_map = self.game.gamedata.warps
                # Verify that we have that warp's info, and that the
                # sector is in it.  (We can get back from it)
                if not (w in warp_map and sector in warp_map[w]):
                    continue

                port_map = self.game.gamedata.ports
                # Only count each pair once (w > sector), and only when
                # the far end has a usable port.
                if not (w > sector and w in port_map and not GameData.port_burnt(port_map[w])):
                    continue

                wc = port_map[w]['class']
                a_to_b = pc in group_a and wc in group_b
                b_to_a = pc in group_b and wc in group_a
                if a_to_b or b_to_a:
                    best_trades.append(self.game.gamedata.port_trade_show(sector, w))
                elif GameData.port_trading(pd['port'], self.game.gamedata.ports[w]['port']):
                    ok_trades.append(self.game.gamedata.port_trade_show(sector, w))
        yield

    report = self.trade_report
    report.append("Best Trades: (org/equ)")
    report.extend(best_trades)
    report.append("Ok Trades:")
    report.extend(ok_trades)
def show_trade_report(self, *_):
    """Keep the report on the game object, print it, and return to the menu.

    Extra positional arguments (e.g. a Deferred result) are ignored.
    """
    report = self.trade_report
    self.game.trade_report = report
    send = self.queue_game.put
    for entry in report:
        send(entry + self.nl)
    self.welcome_back()
def player(self, chunk: bytes):
    """ Data from player (in bytes).

    Handles one menu key press:
      T  build and show the trading report
      P  reset port data and run the CIM port report
      W  reset warp data and run the CIM warp report
      S  switch to the scripts menu
      D  re-display the last trading report
      Q  demo of chained PlayerInput prompts
      X  leave the proxy menu
    Any other key, and D, restart the keepalive and redraw the menu.
    """
    chunk = chunk.decode("latin-1", "ignore")
    key = chunk.upper()
    log.msg("ProxyMenu.player({0})".format(key))

    # Stop the keepalive if we are activating something else
    # or leaving...
    self.keepalive.stop()

    if key in ("T", "P", "W", "S", "D", "Q", "X"):
        # Echo the recognised key back to the player.
        self.queue_game.put(self.c + key + self.r + self.nl)

    if key == "T":
        # Trade Report
        self.trade_report = []
        d = coiterate(self.make_trade_report())
        d.addCallback(self.show_trade_report)
        return
    elif key == "P":
        self.game.gamedata.reset_ports()
        # Activate CIM Port Report
        report = CIMPortReport(self.game)
        d = report.whenDone()
        d.addCallback(self.port_report)
        d.addErrback(self.welcome_back)
        return
    elif key == "W":
        self.game.gamedata.reset_warps()
        # Activate CIM Warp Report
        report = CIMWarpReport(self.game)
        d = report.whenDone()
        d.addCallback(self.warp_report)
        d.addErrback(self.welcome_back)
        return
    elif key == "S":
        # Scripts
        self.activate_scripts_menu()
        return
    elif key == "D":
        # (Re) Display Trade Report
        if self.trade_report:
            for t in self.trade_report:
                self.queue_game.put(t + self.nl)
        else:
            self.queue_game.put("Missing trade_report." + self.nl)
    elif key == "Q":
        # This is an example of chaining PlayerInput prompt calls.
        ask = PlayerInput(self.game)
        d = ask.prompt("What is your quest?", 40, name="quest", abort_blank=True)
        # Display the user's input
        d.addCallback(ask.output)
        d.addCallback(
            lambda ignore: ask.prompt(
                "What is your favorite color?", 10, name="color"
            )
        )
        d.addCallback(ask.output)
        d.addCallback(
            lambda ignore: ask.prompt(
                "What is the meaning of the squirrel?",
                12,
                name="squirrel",
                digits=True,
            )
        )
        d.addCallback(ask.output)

        def show_values(show):
            log.msg(show)
            self.queue_game.put(pformat(show).replace("\n", "\n\r") + self.nl)

        d.addCallback(lambda ignore: show_values(ask.keep))
        d.addCallback(self.welcome_back)
        # On error, just return back
        d.addErrback(self.welcome_back)
        return
    elif key == "X":
        self.queue_game.put(Boxes.alert("Proxy done.", base="green"))
        self.observer.load(self.save)
        self.save = None
        # It isn't running (NOW), so don't try to stop it.
        self.keepalive = None
        # Re-showing the saved prompt is a HORRIBLE idea, because it
        # might be outdated, so drop it ...
        self.prompt = None
        # ... and send a key to the game instead.
        self.game.queue_player.put("d")

        # Were we asked to do something when we were done here?
        if self.defer:
            # reactor.callLater (lower-case c) schedules the call, and
            # Deferred.callback needs a result; 'done' matches what the
            # scripts pass when they deactivate.
            reactor.callLater(0, self.defer.callback, 'done')
            self.defer = None
        return

    self.keepalive.start(30, True)
    self.menu()
def activate_scripts_menu(self):
    """Switch the "player" observer slot to the scripts menu and show it.

    ``self.player`` is disconnected from the "player" slot and
    ``self.scripts_player`` is connected in its place, then the scripts
    menu is drawn.
    """
    observer = self.observer
    observer.disconnect("player", self.player)
    observer.connect("player", self.scripts_player)
    self.scripts_menu()
def deactivate_scripts_menu(self, *_):
    """Switch the "player" observer slot back to ``self.player``.

    ``self.scripts_player`` is disconnected, ``self.player`` is
    reconnected, and ``welcome_back`` is called.

    Any positional arguments are accepted and ignored, which lets this
    method be attached directly as a Deferred callback or errback.
    """
    observer = self.observer
    observer.disconnect("player", self.scripts_player)
    observer.connect("player", self.player)
    self.welcome_back()
def scripts_menu(self, *_):
    """Queue the boxed "Scripts" menu followed by its input prompt.

    Everything is written to ``self.queue_game``. Any positional
    arguments are accepted and ignored.
    """
    bright = merge(Style.BRIGHT + Fore.CYAN)
    normal = merge(Style.NORMAL + Fore.CYAN)
    frame = Boxes(40, color=bright)
    put = self.queue_game.put

    # Header: top border, centred title, separator.
    put(frame.top())
    put(frame.row(bright + "{0:^40}".format("Scripts")))
    put(frame.middle())

    # One boxed row per (hotkey, description) pair, in display order.
    entries = (
        ("1", "Ports (Trades between two sectors)"),
        ("2", "Explore (Strange new sectors)"),
        ("3", "Space... the final frontier..."),
        ("X", "eXit"),
    )
    for hotkey, description in entries:
        line = " {0}{1} {2}-{3} {4:35}".format(
            bright, hotkey, normal, bright, description
        )
        put(frame.row(line))

    # Footer and the prompt the player types at.
    put(frame.bottom())
    put(" " + bright + "-=>" + self.r + " ")
def scripts_player(self, chunk: bytes):
    """Handle player input (bytes) while the scripts menu is active.

    The chunk is decoded as latin-1 and upper-cased, then matched as a
    whole against the menu keys:

    * ``1`` / ``2`` / ``3`` -- echo the key, construct ScriptPort /
      ScriptExplore / ScriptSpace with ``self.game``, and attach
      ``deactivate_scripts_menu`` as both callback and errback of the
      script's ``whenDone()`` Deferred.
    * ``X`` -- echo the key and call ``deactivate_scripts_menu``.
    * anything else -- echo ``?`` and redraw the scripts menu.
    """
    key = chunk.decode("latin-1", "ignore").upper()

    # Wrapped in lambdas so a script class is only looked up and
    # instantiated for the key that was actually pressed.
    launchers = {
        "1": lambda: ScriptPort(self.game),
        "2": lambda: ScriptExplore(self.game),
        "3": lambda: ScriptSpace(self.game),
    }

    if key == "X" or key in launchers:
        # Echo the accepted key before acting on it.
        self.queue_game.put(self.c + key + self.r + self.nl)
        if key == "X":
            self.deactivate_scripts_menu()
            return
        finished = launchers[key]().whenDone()
        # Success and failure both end up back at the main handler.
        finished.addCallback(self.deactivate_scripts_menu)
        finished.addErrback(self.deactivate_scripts_menu)
        return

    # Unrecognised input: flag it and show the menu again.
    self.queue_game.put(self.c + "?" + self.r + self.nl)
    self.scripts_menu()
def welcome_back(self, *_):
    """Log the return, restart ``self.keepalive``, and show the menu.

    Any positional arguments are accepted and ignored, which lets this
    method be attached directly as a Deferred callback or errback.
    """
    log.msg("welcome_back")
    # NOTE(review): presumably a 30-second interval that fires
    # immediately (Twisted LoopingCall.start(interval, now)) -- confirm
    # against where self.keepalive is created.
    self.keepalive.start(30, True)
    self.menu()
|