#!/usr/bin/env python3 import sys import re from twisted.internet import defer from twisted.internet import protocol from twisted.internet import reactor from twisted.internet import task from twisted.python import log from twisted.python.logfile import DailyLogFile import pendulum from subprocess import check_output from colorama import Fore, Back, Style from itertools import cycle from deprecated import deprecated from pprint import pformat # This isn't the best configuration, but it's simple # and works. Mostly. try: from config_dev import * except ModuleNotFoundError: from config import * # Extract the version information from git. # The match gives us only tags starting with v[0-9]* Using anything else trips up on double digits. version = check_output( [ "git", "describe", "--abbrev=8", "--long", "--tags", "--dirty", "--always", "--match", "v[0-9]*", ], universal_newlines=True, ).strip() def merge(color_string): """ Given a string of colorama ANSI, merge them if you can. """ return color_string.replace("m\x1b[", ";") # https://en.wikipedia.org/wiki/ANSI_escape_code # Cleans all ANSI cleaner = re.compile(r"\x1b\[[0-9;]*[A-Zmh]") # Looks for ANSI (that should be considered to be a newline) # This needs to see what is send when something enters / leaves # the player's current sector. (That doesn't work/isn't # detected. NNY!) It is "\x1b[K" Erase in Line! makeNL = re.compile(r"\x1b\[[0-9;]*[JK]") def treatAsNL(line): """ Replace any ANSI codes that would be better understood as newlines. """ global makeNL return makeNL.sub("\n", line) def cleanANSI(line): """ Remove all ANSI codes. """ global cleaner return cleaner.sub("", line) # return re.sub(r'\x1b\[([0-9,A-Z]{1,2}(;[0-9]{1,2})?(;[0-9]{3})?)?[m|K]?', '', line) PORT_CLASSES = { 1: 'BBS', 2: 'BSB', 3: 'SBB', 4:'SSB', 5:'SBS', 6:'BSS', 7:'SSS', 8:'BBB'} CLASSES_PORT = { v: k for k,v in PORT_CLASSES.items() } from observer import Observer class PlayerInput(object): def __init__(self, game): # I think game gives us access to everything we need self.game = game self.observer = self.game.observer self.save = None self.deferred = None self.queue_game = game.queue_game # default colors, and useful consts self.c = merge(Style.BRIGHT + Fore.WHITE + Back.BLUE) self.r = Style.RESET_ALL self.nl = "\n\r" self.bsb = "\b \b" self.keepalive = None def color(self, c): self.c = c def alive(self): log.msg("PlayerInput.alive()") self.game.queue_player.put(" ") def prompt(self, prompt, limit, default=''): log.msg("PlayerInput({0}, {1}, {2}".format(prompt, limit, default)) self.prompt = prompt self.limit = limit self.default = default self.input = '' assert(self.save is None) assert(self.keepalive is None) # Note: This clears out the server "keep alive" self.save = self.observer.save() self.observer.connect('player', self.get_input) self.keepalive = task.LoopingCall(self.alive) self.keepalive.start(30) # We need to "hide" the game output. # Otherwise it WITH mess up the user input display. self.to_player = self.game.to_player self.game.to_player = False # Display prompt self.queue_game.put(self.r + self.nl + self.c + prompt) # Set "Background of prompt" self.queue_game.put( " " * limit + "\b" * limit) assert(self.deferred is None) d = defer.Deferred() self.deferred = d log.msg("Return deferred ...", self.deferred) return d def get_input(self, chunk): """ Data from player (in bytes) """ chunk = chunk.decode('utf-8', 'ignore') for ch in chunk: if ch == "\b": if len(self.input) > 0: self.queue_game.put(self.bsb) self.input = self.input[0:-1] else: self.queue_game.put("\a") if ch == "\r": self.queue_game.put(self.r + self.nl) log.msg("Restore observer dispatch", self.save) assert(not self.save is None) self.observer.load(self.save) self.save = None log.msg("Disable keepalive") self.keepalive.stop() self.keepalive = None line = self.input self.input = '' assert(not self.deferred is None) # Ok, use deferred.callback, or reactor.callLater? # self.deferred.callback(line) reactor.callLater(0, self.deferred.callback, line) self.deferred = None if ch.isprintable(): if len(self.input) + 1 <= self.limit: self.input += ch self.queue_game.put(ch) else: self.queue_game.put("\a") def output(self, line): """ A default display of what they just input. """ log.msg("PlayerInput.output({0})".format(line)) self.game.queue_game.put(self.r + self.nl + "[{0}]".format(line) + self.nl) return line class ProxyMenu(object): def __init__(self, game): self.nl = "\n\r" self.c = merge(Style.BRIGHT + Fore.YELLOW + Back.BLUE) self.r = Style.RESET_ALL self.c1 = merge(Style.BRIGHT + Fore.BLUE) self.c2 = merge(Style.NORMAL + Fore.BLUE) self.game = game self.queue_game = game.queue_game self.observer = game.observer # Yes, at this point we would activate self.prompt = game.buffer self.save = self.observer.save() self.observer.connect("player", self.player) # If we want it, it's here. self.defer = None self.keepalive = task.LoopingCall(self.awake) self.keepalive.start(30) self.menu() def __del__(self): log.msg("ProxyMenu {0} RIP".format(self)) def whenDone(self): self.defer = defer.Deferred() # Call this to chain something after we exit. return self.defer def menu(self): self.queue_game.put(self.nl + self.c + "TradeWars Proxy active." + self.r + self.nl) def menu_item(ch, desc): self.queue_game.put(" " + self.c1 + ch + self.c2 + " - " + self.c1 + desc + self.nl) self.queue_game.put(" " + self.c1 + "D" + self.c2 + " - " + self.c1 + "Diagnostics" + self.nl) menu_item("Q", "Quest") menu_item("T", "Display current Time") self.queue_game.put(" " + self.c1 + "P" + self.c2 + " - " + self.c1 + "Port CIM Report" + self.nl) self.queue_game.put(" " + self.c1 + "S" + self.c2 + " - " + self.c1 + "Scripts" + self.nl) self.queue_game.put(" " + self.c1 + "X" + self.c2 + " - " + self.c1 + "eXit" + self.nl) self.queue_game.put(" " + self.c + "-=>" + self.r + " ") def awake(self): log.msg("ProxyMenu.awake()") self.game.queue_player.put(" ") def player(self, chunk): """ Data from player (in bytes). """ chunk = chunk.decode("utf-8", 'ignore') key = chunk.upper() log.msg("ProxyMenu.player({0})".format(key)) # Stop the keepalive if we are activating something else # or leaving... self.keepalive.stop() if key == "T": self.queue_game.put(self.c + key + self.r + self.nl) # perform T option now = pendulum.now() self.queue_game.put(self.nl + self.c1 + "Current time " + now.to_datetime_string() + self.nl) elif key == 'Q': self.queue_game.put(self.c + key + self.r + self.nl) # Ok, keepalive is stop(), and we are leaving this. # So, when the PlayerInput is done, have it call welcome_back, # which reinstates keepalive, and displays the menu. ask = PlayerInput(self.game) d = ask.prompt("What is your quest? ", 20) # Display the user's input d.addCallback(ask.output) # To "return" to the ProxyMenu, call self.welcome_back d.addCallback(self.welcome_back) return elif key == 'X': self.queue_game.put(self.c + key + self.r + self.nl) self.observer.load(self.save) self.save = None # It isn't running (NOW), so don't try to stop it. # self.keepalive.stop() self.keepalive = None self.queue_game.put(self.prompt) self.prompt = None # Were we asked to do something when we were done here? if self.defer: reactor.CallLater(0, self.defer.callback) # self.defer.callback() self.defer = None return self.keepalive.start(30, True) self.menu() def welcome_back(self, _): log.msg("welcome_back") self.keepalive.start(30, True) self.menu() from mcp import MCP class Game(protocol.Protocol): def __init__(self): self.buffer = "" self.game = None self.usergame = (None, None) self.to_player = True self.mcp = MCP(self) def connectionMade(self): log.msg("Connected to Game Server") self.queue_player = self.factory.queue_player self.queue_game = self.factory.queue_game self.observer = self.factory.observer self.factory.game = self self.setPlayerReceived() self.observer.connect("user-game", self.show_game) self.mcp.finishSetup() def show_game(self, game): self.usergame = game log.msg("## User-Game:", game) def setPlayerReceived(self): """ Get deferred from client queue, callback clientDataReceived. """ self.queue_player.get().addCallback(self.playerDataReceived) def playerDataReceived(self, chunk): if chunk is False: self.queue_player = None log.msg("Player: disconnected, close connection to game") # I don't believe I need this if I'm using protocol.Factory self.factory.continueTrying = False self.transport.loseConnection() else: # Pass received data to the server if type(chunk) == str: self.transport.write(chunk.encode()) else: self.transport.write(chunk) self.setPlayerReceived() def lineReceived(self, line): """ line received from the game. """ if LOG_LINES: log.msg(">> [{0}]".format(line)) # if "TWGS v2.20b" in line and "www.eisonline.com" in line: # I would still love to "inject" this into the stream # so it is consistent. # self.queue_game.put( # "TWGS Proxy build " # + version # + " is active. \x1b[1;34m~\x1b[0m to activate.\n\r\n\r", # ) if "TradeWars Game Server" in line and "Copyright (C) EIS" in line: # We are not in a game if not self.game is None: # We were in a game. self.game = None self.observer.emit("user-game", (self.factory.player.user, self.game)) if "Selection (? for menu): " in line: game = line[-1] if game >= "A" and game < "Q": self.game = game log.msg("Game: {0}".format(self.game)) self.observer.emit("user-game", (self.factory.player.user, self.game)) self.observer.emit("game-line", line) def getPrompt(self): """ Return the current prompt, stripped of ANSI. """ return cleanANSI(self.buffer) def dataReceived(self, chunk): """ Data received from the Game. Remove backspaces. Treat some ANSI codes as NewLine. Remove ANSI. Break into lines. Trim out carriage returns. Call lineReceived(). "Optionally" pass data to player. FUTURE: trigger on prompt. [cleanANSI(buffer)] """ # Store the text into the buffer before we inject into it. self.buffer += chunk.decode("utf-8", "ignore") # log.msg("data: [{0}]".format(repr(chunk))) if b'TWGS v2.20b' in chunk and b'www.eisonline.com' in chunk: # Ok, we have a possible target. target = b'www.eisonline.com\n\r' pos = chunk.find(target) if pos != -1: # Found it! Inject! message = "TWGS Proxy build " + version + ". ~ to activate in game.\n\r" chunk = chunk[0:pos + len(target)] + message.encode() + chunk[pos + len(target):] # Sequence error: # If I don't put the chunk(I received) to the player. # anything I display -- lineReceive() put() ... would # be out of order. (I'd be responding -- before it # was displayed to the user.) if self.to_player: self.queue_game.put(chunk) # self.buffer += chunk.decode("utf-8", "ignore") # # Begin processing the buffer # # Process any backspaces while "\b" in self.buffer: part = self.buffer.partition("\b") self.buffer = part[0][:-1] + part[2] # Treat some ANSI codes as a newline self.buffer = treatAsNL(self.buffer) # Break into lines while "\n" in self.buffer: part = self.buffer.partition("\n") line = part[0].replace("\r", "") # Clean ANSI codes from line line = cleanANSI(line) self.lineReceived(line) self.buffer = part[2] self.observer.emit("prompt", self.getPrompt()) def connectionLost(self, why): log.msg("Game connectionLost because: %s" % why) self.observer.emit('close', why) self.queue_game.put(False) self.transport.loseConnection() class GlueFactory(protocol.ClientFactory): # class GlueFactory(protocol.Factory): maxDelay = 10 protocol = Game def __init__(self, player): self.player = player self.queue_player = player.queue_player self.queue_game = player.queue_game self.observer = player.observer self.game = None def closeIt(self): log.msg("closeIt") self.queue_game.put(False) def getUser(self, user): log.msg("getUser( %s )" % user) self.twgs.logUser(user) # This was needed when I replaced ClientFactory with Factory. # def clientConnectionLost(self, connector, why): # log.msg("clientconnectionlost: %s" % why) # self.queue_client.put(False) def clientConnectionFailed(self, connector, why): log.msg("connection to game failed: %s" % why) self.queue_game.put(b"Sorry! I'm Unable to connect to the game server.\r\n") # syncterm gets cranky/locks up if we close this here. # (Because it is still sending rlogin information?) reactor.callLater(2, self.closeIt) class Player(protocol.Protocol): def __init__(self): self.buffer = "" self.user = None self.observer = Observer() self.game = None self.glue = None def connectionMade(self): """ connected, setup queues. queue_player is data from player. queue_game is data to player. (possibly from game) """ self.queue_player = defer.DeferredQueue() self.queue_game = defer.DeferredQueue() self.setGameReceived() # Connect GlueFactory to this Player object. factory = GlueFactory(self) self.glue = factory # Make connection to the game server reactor.connectTCP(HOST, PORT, factory, 5) def setGameReceived(self): """ Get deferred from client queue, callback clientDataReceived. """ self.queue_game.get().addCallback(self.gameDataReceived) def gameDataReceived(self, chunk): """ Data received from the game. """ # If we have received game data, it has to be connected. if self.game is None: self.game = self.glue.game if chunk is False: self.transport.loseConnection() else: if type(chunk) == bytes: self.transport.write(chunk) elif type(chunk) == str: self.transport.write(chunk.encode()) else: log.err("gameDataReceived: type (%s) given!", type(chunk)) self.transport.write(chunk) self.setGameReceived() def dataReceived(self, chunk): if self.user is None: self.buffer += chunk.decode("utf-8", "ignore") parts = self.buffer.split("\x00") if len(parts) >= 5: # rlogin we have the username self.user = parts[1] log.msg("User: {0}".format(self.user)) zpos = self.buffer.rindex("\x00") self.buffer = self.buffer[zpos + 1 :] # but I don't need the buffer anymore, so: self.buffer = "" # Pass user value on to whatever needs it. self.observer.emit("user", self.user) # Unfortunately, the ones interested in this don't exist yet. if not self.observer.emit("player", chunk): # Was not dispatched. Send to game. self.queue_player.put(chunk) else: # There's an observer. Don't continue. return if chunk == b"~": if self.game: prompt = self.game.getPrompt() if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt): self.observer.emit("hotkey", prompt) else: self.observer.emit("notyet", prompt) # Selection (? for menu): (the game server menu) # Enter your choice: (game menu) # Command [TL=00:00:00]:[1800] (?=Help)? : <- YES! # Computer command [TL=00:00:00]:[613] (?=Help)? if chunk == b"|": # how can I tell if this is active or not? # I no longer see a 'player' observer. GOOD! # log.msg(pformat(self.observer.dispatch)) # prompt = self.game.getPrompt() # if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt): menu = ProxyMenu(self.game) def connectionLost(self, why): log.msg("lost connection %s" % why) self.observer.emit('close', why) self.queue_player.put(False) def connectionFailed(self, why): log.msg("connectionFailed: %s" % why) if __name__ == "__main__": if LOGFILE: log.startLogging(DailyLogFile("proxy.log", ".")) else: log.startLogging(sys.stdout) log.msg("This is version: %s" % version) factory = protocol.Factory() factory.protocol = Player reactor.listenTCP(LISTEN_PORT, factory, interface=LISTEN_ON) reactor.run()