123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- from twisted.internet import reactor
- from twisted.internet import task
- from twisted.internet import defer
- from colorama import Fore, Back, Style
- from twisted.python import log
- import pendulum
def merge(color_string):
    """Collapse adjacent ANSI colour sequences into a single sequence.

    Wherever one escape sequence ends (``m``) and the next one starts
    immediately (``ESC[``), the boundary is replaced by ``;`` so that
    e.g. ``ESC[1mESC[37m`` becomes ``ESC[1;37m``.
    """
    boundary = "m\x1b["
    pieces = color_string.split(boundary)
    return ";".join(pieces)
class PlayerInput(object):
    """Prompt the player for a line of text and deliver it via a Deferred.

    While a prompt is active the instance takes over the observer's
    "player" signal, echoes accepted characters into ``game.queue_game``
    and periodically pokes ``game.queue_player`` as a keepalive.  When
    the player presses Enter the previous observer state and the
    previous ``game.to_player`` value are restored and the Deferred
    returned by :meth:`prompt` fires.
    """

    def __init__(self, game):
        """Bind to *game*; reads ``game.observer`` and ``game.queue_game``."""
        # I think game gives us access to everything we need
        self.game = game
        self.observer = self.game.observer
        self.save = None
        self.deferred = None
        self.queue_game = game.queue_game
        # Answers stored by name (see the ``name`` keyword of prompt()).
        self.keep = {}

        # default colors
        self.c = merge(Style.BRIGHT + Fore.WHITE + Back.BLUE)
        self.cp = merge(Style.BRIGHT + Fore.YELLOW + Back.BLUE)

        # useful consts
        self.r = Style.RESET_ALL
        self.nl = "\n\r"
        self.bsb = "\b \b"

        self.keepalive = None

    def color(self, c):
        """Set the colour used for the prompt text."""
        self.c = c

    def colorp(self, cp):
        """Set the colour used for the input field."""
        self.cp = cp

    def alive(self):
        """Keepalive tick: put a single space on ``game.queue_player``."""
        log.msg("PlayerInput.alive()")
        self.game.queue_player.put(" ")

    def prompt(self, user_prompt, limit, **kw):
        """Display a prompt and start collecting one line of input.

        user_prompt = text displayed.
        limit = # of characters allowed.

        keywords:
            name        : Also store the entered line in self.keep[name].
            abort_blank : If true, blank input fires the errback with
                          Exception("abort_blank") instead of the callback.
            digits      : If true, only digit characters are accepted.

        Returns a Deferred that fires with the entered line.  Only one
        prompt may be active at a time (enforced by assertions).
        """
        log.msg("PlayerInput({0}, {1}, {2})".format(user_prompt, limit, kw))
        self.limit = limit
        self.input = ""
        self.kw = kw

        assert self.save is None
        assert self.keepalive is None

        # Note: This clears out the server "keep alive"
        self.save = self.observer.save()
        self.observer.connect("player", self.get_input)

        self.keepalive = task.LoopingCall(self.alive)
        self.keepalive.start(30)

        # We need to "hide" the game output.
        # Otherwise it WILL mess up the user input display.
        # The previous value is put back when the line is completed.
        self.to_player = self.game.to_player
        self.game.to_player = False

        # Display prompt
        self.queue_game.put(self.r + self.nl + self.c + user_prompt + " " + self.cp)
        # Set "Background of prompt"
        self.queue_game.put(" " * limit + "\b" * limit)

        assert self.deferred is None
        d = defer.Deferred()
        self.deferred = d
        log.msg("Return deferred ...", self.deferred)
        return d

    def get_input(self, chunk):
        """Handle data from the player (in bytes).

        Backspace removes the last character (bell if there is none),
        Enter completes the line, and printable characters are appended
        while they fit within the limit (bell otherwise).  Anything
        following Enter in the same chunk is discarded.
        """
        chunk = chunk.decode("utf-8", "ignore")

        for ch in chunk:
            if ch == "\b":
                if len(self.input) > 0:
                    self.queue_game.put(self.bsb)
                    self.input = self.input[0:-1]
                else:
                    self.queue_game.put("\a")
            elif ch == "\r":
                self._finish_line()
                return
            elif ch.isprintable():
                # Printable, but is it acceptable?
                # Honour the keyword's value so digits=False means "no filter".
                if self.kw.get("digits") and not ch.isdigit():
                    self.queue_game.put("\a")
                    continue

                if len(self.input) + 1 <= self.limit:
                    self.input += ch
                    self.queue_game.put(ch)
                else:
                    self.queue_game.put("\a")

    def _finish_line(self):
        """Tear down the active prompt and fire its Deferred with the line."""
        self.queue_game.put(self.r + self.nl)

        log.msg("Restore observer dispatch", self.save)
        assert self.save is not None
        self.observer.load(self.save)
        self.save = None

        # Undo the "hide game output" done in prompt().
        self.game.to_player = self.to_player

        log.msg("Disable keepalive")
        self.keepalive.stop()
        self.keepalive = None

        line = self.input
        self.input = ""
        assert self.deferred is not None

        # If they gave us the keyword name, save the value as that name
        if "name" in self.kw:
            self.keep[self.kw["name"]] = line

        # Clear our reference first so a new prompt() can be started from
        # the callback chain; the Deferred itself fires on the next
        # reactor iteration.
        pending = self.deferred
        self.deferred = None

        if self.kw.get("abort_blank") and line.strip() == "":
            # Yes, input is blank, abort.
            log.msg("errback, abort_blank")
            reactor.callLater(0, pending.errback, Exception("abort_blank"))
            return

        reactor.callLater(0, pending.callback, line)

    def output(self, line):
        """A default display of what they just input; returns *line*."""
        log.msg("PlayerInput.output({0})".format(line))
        self.game.queue_game.put(self.r + self.nl + "[{0}]".format(line) + self.nl)
        return line
class ProxyMenu(object):
    """Interactive proxy menu driven by single keypresses from the player.

    Constructing the menu activates it immediately: the observer state
    is saved, the "player" signal is routed to :meth:`player`, a
    30-second keepalive is started and the menu is displayed.  Choosing
    "X" restores the saved observer state and re-displays the prompt
    text captured from ``game.buffer``.
    """

    def __init__(self, game):
        """Activate the menu on *game* and display it."""
        self.nl = "\n\r"
        self.c = merge(Style.BRIGHT + Fore.YELLOW + Back.BLUE)
        self.r = Style.RESET_ALL
        self.c1 = merge(Style.BRIGHT + Fore.BLUE)
        self.c2 = merge(Style.NORMAL + Fore.CYAN)

        self.game = game
        self.queue_game = game.queue_game
        self.observer = game.observer

        # Yes, at this point we would activate
        # Text re-displayed when the menu exits.
        self.prompt = game.buffer
        self.save = self.observer.save()
        self.observer.connect("player", self.player)

        # If we want it, it's here.
        self.defer = None

        self.keepalive = task.LoopingCall(self.awake)
        self.keepalive.start(30)
        self.menu()

    def __del__(self):
        log.msg("ProxyMenu {0} RIP".format(self))

    def whenDone(self):
        """Return a Deferred that fires (with None) after the menu exits.

        Call this to chain something after we exit.
        """
        self.defer = defer.Deferred()
        return self.defer

    def menu(self):
        """Display the menu banner, the options, and the selection prompt."""
        self.queue_game.put(
            self.nl + self.c + "TradeWars Proxy active." + self.r + self.nl
        )

        options = (
            ("D", "Diagnostics"),
            ("Q", "Quest"),
            ("T", "Display current Time"),
            ("P", "Port CIM Report"),
            ("S", "Scripts"),
            ("X", "eXit"),
        )
        for ch, desc in options:
            self.queue_game.put(
                " " + self.c1 + ch + self.c2 + " - " + self.c1 + desc + self.nl
            )

        self.queue_game.put("   " + self.c + "-=>" + self.r + " ")

    def awake(self):
        """Keepalive tick: put a single space on ``game.queue_player``."""
        log.msg("ProxyMenu.awake()")
        self.game.queue_player.put(" ")

    def player(self, chunk):
        """Handle data from the player (in bytes) as a menu selection.

        "T" shows the current time, "Q" runs the example chain of
        PlayerInput prompts, "X" leaves the menu.  Any other input just
        re-displays the menu.
        """
        chunk = chunk.decode("utf-8", "ignore")
        key = chunk.upper()
        log.msg("ProxyMenu.player({0})".format(key))

        # Stop the keepalive if we are activating something else
        # or leaving...
        self.keepalive.stop()

        if key == "T":
            self.queue_game.put(self.c + key + self.r + self.nl)
            # perform T option
            now = pendulum.now()
            self.queue_game.put(
                self.nl + self.c1 + "Current time " + now.to_datetime_string() + self.nl
            )
        elif key == "Q":
            self.queue_game.put(self.c + key + self.r + self.nl)

            # This is an example of chaining PlayerInput prompt calls.
            ask = PlayerInput(self.game)
            d = ask.prompt("What is your quest?", 20, name="quest", abort_blank=True)

            # Display the user's input
            d.addCallback(ask.output)

            d.addCallback(
                lambda ignore: ask.prompt(
                    "What is your favorite color?", 5, name="color"
                )
            )
            d.addCallback(ask.output)

            d.addCallback(
                lambda ignore: ask.prompt(
                    "What is the meaning of the squirrel?",
                    40,
                    name="squirrel",
                    digits=True,
                )
            )
            d.addCallback(ask.output)

            def show_values(show):
                log.msg(show)

            d.addCallback(lambda ignore: show_values(ask.keep))
            d.addCallback(self.welcome_back)

            # On error (e.g. abort_blank), just return back to the menu.
            d.addErrback(self.welcome_back)
            # welcome_back restarts the keepalive and redraws the menu
            # once the chain finishes, so don't do it here.
            return
        elif key == "X":
            self.queue_game.put(self.c + key + self.r + self.nl)
            self.observer.load(self.save)
            self.save = None
            # The keepalive was already stopped above; just drop it.
            self.keepalive = None
            self.queue_game.put(self.prompt)
            self.prompt = None

            # Possibly: Send '\r' to re-display the prompt
            # instead of displaying the original one.

            # Were we asked to do something when we were done here?
            if self.defer is not None:
                # Deferred.callback() requires a result; there is nothing
                # meaningful to hand back, so fire with None.
                reactor.callLater(0, self.defer.callback, None)
                self.defer = None
            return

        self.keepalive.start(30, True)
        self.menu()

    def welcome_back(self, *_):
        """Resume the menu after a sub-activity; arguments are ignored."""
        log.msg("welcome_back")
        self.keepalive.start(30, True)
        self.menu()
|