"""TradeWars game-server proxy protocols.

``Player`` is the protocol facing the connecting user, ``Game`` is the
protocol facing the game server, and ``GlueFactory`` wires the two together
through a pair of ``DeferredQueue`` objects owned by the ``Player``.
"""

import re
from twisted.internet import defer
from twisted.internet import protocol
from twisted.internet import reactor
from twisted.internet import task
from twisted.internet.task import coiterate

# NOTE: logging is done with the stdlib ``logging`` module, not
# ``twisted.python.log`` -- so only Logger methods (debug/info/warning/error)
# are available on ``log``.
import logging
import pendulum
from colorama import Fore, Back, Style
from pprint import pformat

from config import config, version

log = logging.getLogger(__name__)


def merge(color_string: str):
    """ Given a string of colorama ANSI, merge them if you can. """
    return color_string.replace("m\x1b[", ";")


# https://en.wikipedia.org/wiki/ANSI_escape_code

# Cleans all ANSI
cleaner = re.compile(r"\x1b\[[0-9;]*[A-Zmh]")

# Looks for ANSI (that should be considered to be a newline)
# This needs to see what is sent when something enters / leaves
# the player's current sector.  (That doesn't work/isn't
# detected.  NNY!)  It is "\x1b[K" Erase in Line!
makeNL = re.compile(r"\x1b\[[0-9;]*[JK]")


def treatAsNL(line: str):
    """ Replace any ANSI codes that would be better understood as newlines. """
    return makeNL.sub("\n", line)


def cleanANSI(line: str):
    """ Remove all ANSI codes. """
    return cleaner.sub("", line)


def _port_buy_sell(info: str):
    """
    Split one CIM port field into a ("B" or "S", units) tuple.

    A leading "-" marks the commodity as being bought ("B"); anything
    else is treated as selling ("S").  The rest of the field is the
    unit count.
    """
    flag = "B" if info[0] == "-" else "S"
    return (flag, int(info[1:].strip()))


class UserAdapter(logging.LoggerAdapter):
    """ LoggerAdapter that prefixes each message with the game's usergame tuple. """

    def process(self, msg, kwargs):
        return "[{0}] {1}".format(self.extra["game"].usergame, msg), kwargs


# These project imports are intentionally left below the helper functions
# above, exactly where the original file had them.
# NOTE(review): presumably this ordering avoids a circular import -- confirm
# before moving them to the top of the file.
from observer import Observer
from flexible import PlayerInput, ProxyMenu
from galaxy import GameData, PORT_CLASSES, CLASSES_PORT


class Game(protocol.Protocol):
    """
    Protocol for the connection to the game server.

    Relays data between the game server and the player queues, and parses
    the game's output line by line to record warps and port information
    into ``self.gamedata``.
    """

    def __init__(self):
        self.buffer = ""
        self.game = None
        self.usergame = (None, None)
        self.gamedata = None
        self.to_player = True
        self.linestate = ""
        # Parser state.  Initialised here so the line handlers never hit
        # an AttributeError when output arrives in an unexpected order.
        self.current_sector = None
        self.lastwarp = 0
        self.sector_state = "normal"
        baselog = logging.getLogger(__name__)
        self.log = UserAdapter(baselog, {"game": self})

    def connectionMade(self):
        """ Connected to the game server: pick up the shared queues/observer. """
        self.log.info("Connected to Game Server")
        self.queue_player = self.factory.queue_player
        self.queue_game = self.factory.queue_game
        self.observer = self.factory.observer
        self.factory.game = self
        self.setPlayerReceived()
        self.observer.connect("user-game", self.show_game)

    def show_game(self, game: tuple):
        """
        Handle a "user-game" event.

        game is a (user, game) tuple.  When the game part is None any
        loaded game data is saved and dropped; otherwise the data for
        that game is loaded.
        """
        self.usergame = game
        self.log.info("## User-Game: {0}".format(game))
        if game[1] is None:
            if self.gamedata is not None:
                # start the save
                coiterate(self.gamedata.save())
            self.gamedata = None
        else:
            # Load the game data (if any)
            self.gamedata = GameData(game)
            coiterate(self.gamedata.load())

    def setPlayerReceived(self):
        """ Get deferred from the player queue, callback playerDataReceived. """
        self.queue_player.get().addCallback(self.playerDataReceived)

    def playerDataReceived(self, chunk):
        """
        Data received from the player: pass it on to the game server.

        A chunk of False means the player has disconnected, so the
        connection to the game is closed.  str chunks are encoded as
        latin-1 before being written.
        """
        if chunk is False:
            self.queue_player = None
            self.log.info("Player: disconnected, close connection to game")
            # I don't believe I need this if I'm using protocol.Factory
            self.factory.continueTrying = False
            self.transport.loseConnection()
        else:
            # Pass received data to the server
            if isinstance(chunk, str):
                self.transport.write(chunk.encode("latin-1"))
                self.log.debug(">> [{0}]".format(chunk))
            else:
                self.transport.write(chunk)
                self.log.debug(">> [{0}]".format(chunk.decode("latin-1", "ignore")))
            self.setPlayerReceived()

    def warpline(self, line: str):
        """
        Parse one line of a plotted course and record each hop.

        The line looks like: 1 > 3 > 5 > 77 > 999
        The last sector seen is kept in self.lastwarp so that a course
        spanning several lines is joined up correctly.
        """
        self.log.debug("warp: " + line)
        # 1 > 3 > 5 > 77 > 999
        last_sector = self.lastwarp
        line = line.replace("(", "").replace(")", "").replace(">", "").strip()
        for s in line.split():
            # Ok, this should be all of the warps.
            sector = int(s)
            if last_sector > 0:
                self.gamedata.warp_to(last_sector, sector)
            last_sector = sector
        # Using last_sector (not the loop variable) keeps this safe when
        # the line held no sector numbers at all.
        self.lastwarp = last_sector

    def cimline(self, line: str):
        """
        Parse one line of CIM output (warp report or port report).

        Lines ending in "%" are treated as port report lines.  Warp lines
        are "sector warp warp ..."; port lines carry buy/sell flag, units
        and percentage for fuel, organics and equipment.
        """
        if line[-1] == "%":
            self.linestate = "portcim"

        if self.linestate == "warpcim":
            # warps
            work = line.strip()
            if work != "":
                parts = re.split(r"(?<=\d)\s", work)
                parts = [int(x) for x in parts]
                sector = parts.pop(0)
                self.gamedata.warp_to(sector, *parts)
        elif self.linestate == "portcim":
            # ports
            work = line.replace("%", "")
            parts = re.split(r"(?<=\d)\s", work)
            if len(parts) == 8:
                sector = int(parts[0].strip())
                data = dict()
                # (key, index of the buy/sell+units field); pct follows it.
                for key, idx in (("fuel", 1), ("org", 3), ("equ", 5)):
                    sale, units = _port_buy_sell(parts[idx])
                    data[key] = {
                        "sale": sale,
                        "units": units,
                        "pct": int(parts[idx + 1].strip()),
                    }

                # Store what this port is buying/selling
                data["port"] = (
                    data["fuel"]["sale"] + data["org"]["sale"] + data["equ"]["sale"]
                )

                # Convert BBS/SBB to Class number 1-8
                data["class"] = CLASSES_PORT[data["port"]]
                self.gamedata.set_port(sector, data)
            else:
                self.linestate = "cim"

    def sectorline(self, line: str):
        """
        Parse one line of a sector display.

        Records port class information and the sector's warps; the other
        sections (planets, traders, ships, ...) only update sector_state.
        The "Warps to Sector(s)" line ends the sector display.
        """
        self.log.debug("sector: {0} : {1}".format(self.current_sector, line))
        if line.startswith("Beacon : "):
            pass  # get beacon text
        elif line.startswith("Ports : "):
            # Ports : Ballista, Class 1 (BBS)
            self.sector_state = "port"
            if "<=-DANGER-=>" in line:
                # Port is destroyed
                if self.current_sector in self.gamedata.ports:
                    del self.gamedata.ports[self.current_sector]
            elif "(StarDock)" not in line:
                _, _, class_port = line.partition(", Class ")
                c, port = class_port.split(" ")
                c = int(c)
                port = port.replace("(", "").replace(")", "")
                data = {"port": port, "class": c}
                self.gamedata.set_port(self.current_sector, data)
        elif line.startswith("Planets : "):
            # Planets : (O) Flipper
            self.sector_state = "planet"
        elif line.startswith("Traders : "):
            self.sector_state = "trader"
        elif line.startswith("Ships : "):
            self.sector_state = "ship"
        elif line.startswith("Fighters: "):
            self.sector_state = "fighter"
        elif line.startswith("NavHaz : "):
            pass
        elif line.startswith("Mines : "):
            self.sector_state = "mine"
        elif line.startswith(" "):
            # Continuation of the current section.  Nothing is parsed
            # from these yet for any sector_state
            # ("mine", "planet", "trader", "ship").
            pass
        elif len(line) > 8 and line[8] == ":":
            self.sector_state = "normal"
        elif line.startswith("Warps to Sector(s) :"):
            # Warps to Sector(s) : 5468
            _, _, work = line.partition(":")
            work = work.strip().replace("(", "").replace(")", "").replace(" - ", " ")
            # split() (no argument) tolerates runs of spaces between sectors.
            parts = [int(x) for x in work.split()]
            self.log.debug("Sectorline warps {0}".format(parts))
            self.gamedata.warp_to(self.current_sector, *parts)
            self.sector_state = "normal"
            self.linestate = ""

    def portline(self, line: str):
        """
        Parse one commodity line of a port report and store it.

        Example: Fuel Ore Buying 2890 100% 0
        Lines without a "%", or that do not start with a known commodity,
        are ignored.  Units are stored as int (commas removed), matching
        what cimline() stores.
        """
        self.log.debug("portline({0}): {1}".format(self.current_sector, line))
        # Map these items to which keys
        mapto = {"Fuel": "fuel", "Organics": "org", "Equipment": "equ"}

        if "%" not in line or self.current_sector is None:
            return

        # Fuel Ore Buying 2890 100% 0
        work = line.replace("Fuel Ore", "Fuel").replace("%", "")
        parts = work.split()
        if len(parts) < 4 or parts[0] not in mapto:
            # Some other line that happens to contain a "%".
            return
        try:
            units = int(parts[2].replace(",", ""))
            pct = int(parts[3])
        except ValueError:
            self.log.debug("portline: unable to parse {0}".format(parts))
            return

        data = {mapto[parts[0]]: {"sale": parts[1][0], "units": units, "pct": pct}}
        self.gamedata.set_port(self.current_sector, data)

    def lineReceived(self, line: str):
        """
        line received from the game.

        Tracks whether the user is in a game (emitting "user-game" when
        that changes), drives the self.linestate parser, and finally
        emits "game-line" with the line.
        """
        if "log_lines" in config and config["log_lines"]:
            self.log.debug("<< [{0}]".format(line))

        if "TradeWars Game Server" in line and "Copyright (C) EIS" in line:
            # We are not in a game
            if self.game is not None:
                # We were in a game.
                self.game = None
                self.observer.emit("user-game", (self.factory.player.user, self.game))
        elif "Selection (? for menu): " in line:
            game = line[-1]
            if "A" <= game < "Q":
                self.game = game
                log.info("Game: {0}".format(self.game))
                self.observer.emit("user-game", (self.factory.player.user, self.game))

        # Process.pas parse line
        # The prompt looks like: Command [TL=00:00:00]:[1800] (?=Help)? :
        # so match on the prefix up to the time-left value.
        if line.startswith("Command [TL="):
            # Ok, get the current sector from this
            _, _, sector = line.partition("]:[")
            sector, _, _ = sector.partition("]")
            if sector.isdigit():
                self.current_sector = int(sector)
                self.log.info("current sector: {0}".format(self.current_sector))

        if line.startswith("The shortest path (") or line.startswith(" TO > "):
            self.linestate = "warpline"
            self.lastwarp = 0
        elif line.startswith(" Items Status Trading % of max OnBoard"):
            self.linestate = "port"
        elif self.linestate == "warpline":
            if line == "":
                self.linestate = ""
            else:
                self.warpline(line)
        elif self.linestate == "portcim" or self.linestate == "warpcim":
            if line == ": ENDINTERROG":
                self.linestate = ""
            elif line == ": ":
                self.linestate = "cim"
            elif line == "":
                self.linestate = ""
            elif len(line) > 2:
                self.cimline(line)
        elif self.linestate == "cim":
            if line == ": ENDINTERROG" or line == "":
                self.linestate = ""
            elif len(line) > 2:
                if line.rstrip()[-1] == "%":
                    self.linestate = "portcim"
                else:
                    self.linestate = "warpcim"
                self.cimline(line)
        elif line.startswith(": "):
            self.linestate = "cim"
        elif line.startswith("Sector : "):
            # Sector : 2565 in uncharted space.
            self.linestate = "sector"
            work = line.strip()
            parts = re.split(r"\s+", work)
            self.current_sector = int(parts[2])
        elif self.linestate == "sector":
            self.sectorline(line)
        elif self.linestate == "port":
            if line == "":
                self.linestate = ""
            else:
                self.portline(line)

        self.observer.emit("game-line", line)

    def getPrompt(self):
        """ Return the current prompt, stripped of ANSI. """
        return cleanANSI(self.buffer)

    def dataReceived(self, chunk):
        """
        Data received from the Game.

        Remove backspaces.
        Treat some ANSI codes as NewLine.
        Remove ANSI.
        Break into lines.
        Trim out carriage returns.
        Call lineReceived().

        "Optionally" pass data to player.
        FUTURE: trigger on prompt. [cleanANSI(buffer)]
        """
        # Store the text into the buffer before we inject into it.
        self.buffer += chunk.decode("latin-1", "ignore")

        if b"TWGS v2.20b" in chunk and b"www.eisonline.com" in chunk:
            # Ok, we have a possible target.
            target = b"www.eisonline.com\n\r"
            pos = chunk.find(target)
            if pos != -1:
                # Found it!  Inject!
                message = (
                    "TWGS Proxy build " + version + ". ~ to activate in game.\n\r"
                )
                cut = pos + len(target)
                chunk = chunk[:cut] + message.encode("latin-1") + chunk[cut:]

        # Sequence error:
        # If I don't put the chunk (I received) to the player,
        # anything I display -- lineReceived() put() ... would
        # be out of order.  (I'd be responding -- before it
        # was displayed to the user.)
        if self.to_player:
            self.queue_game.put(chunk)

        #
        # Begin processing the buffer
        #

        # Process any backspaces
        while "\b" in self.buffer:
            part = self.buffer.partition("\b")
            self.buffer = part[0][:-1] + part[2]

        # Treat some ANSI codes as a newline
        self.buffer = treatAsNL(self.buffer)

        # Break into lines
        while "\n" in self.buffer:
            part = self.buffer.partition("\n")
            line = part[0].replace("\r", "")
            # Clean ANSI codes from line
            line = cleanANSI(line)
            self.lineReceived(line)
            self.buffer = part[2]

        self.observer.emit("prompt", self.getPrompt())

    def connectionLost(self, why):
        """ Game connection closed: notify observers and the player side. """
        self.log.info("Game connectionLost because: %s" % why)
        self.observer.emit("close", why)
        self.queue_game.put(False)
        self.transport.loseConnection()


class Player(protocol.Protocol):
    """
    Protocol for the connecting player.

    Owns the two queues and the Observer, and starts the connection to
    the game server (via GlueFactory) as soon as the player connects.
    """

    def __init__(self):
        self.buffer = ""
        self.user = None
        self.observer = Observer()
        self.game = None
        self.glue = None

    def connectionMade(self):
        """
        connected, setup queues.

        queue_player is data from player.
        queue_game is data to player. (possibly from game)
        """
        self.queue_player = defer.DeferredQueue()
        self.queue_game = defer.DeferredQueue()
        self.setGameReceived()

        # Connect GlueFactory to this Player object.
        factory = GlueFactory(self)
        self.glue = factory

        # Make connection to the game server
        reactor.connectTCP(config["host"], config["port"], factory, 5)

    def setGameReceived(self):
        """ Get deferred from the game queue, callback gameDataReceived. """
        self.queue_game.get().addCallback(self.gameDataReceived)

    def gameDataReceived(self, chunk):
        """
        Data received from the game (or generated by the proxy).

        False closes the player's connection.  bytes are written as-is,
        str is encoded as latin-1.  Any other type is logged and dropped
        so the queue keeps being serviced.
        """
        # If we have received game data, it has to be connected.
        if self.game is None:
            self.game = self.glue.game

        if chunk is False:
            self.transport.loseConnection()
        else:
            if isinstance(chunk, bytes):
                self.transport.write(chunk)
            elif isinstance(chunk, str):
                self.transport.write(chunk.encode("latin-1"))
            else:
                # The transport only accepts bytes; don't attempt the write.
                log.error("gameDataReceived: type ({0}) given!".format(type(chunk)))
            self.setGameReceived()

    def dataReceived(self, chunk):
        """
        Data received from the player.

        Until the user name is known, the input is scanned for the rlogin
        header.  The chunk is offered to "player" observers; if none takes
        it, it is queued for the game.  A lone "~" asks for the proxy
        menu, which is only available at the game's command prompt.
        """
        if self.user is None:
            self.buffer += chunk.decode("latin-1", "ignore")

            parts = self.buffer.split("\x00")
            if len(parts) >= 5:
                # rlogin we have the username
                self.user = parts[1]
                log.info("User: {0}".format(self.user))
                # The rest of the buffer isn't needed anymore.
                self.buffer = ""
                # Pass user value on to whatever needs it.
                self.observer.emit("user", self.user)
                # Unfortunately, the ones interested in this don't exist yet.

        if self.observer.emit("player", chunk):
            # There's an observer.  Don't continue.
            return

        # Was not dispatched.  Send to game.
        self.queue_player.put(chunk)

        if chunk == b"~":
            # self.game is only filled in once game data has arrived;
            # fall back to the factory's protocol if it exists already.
            game = self.game
            if game is None and self.glue is not None:
                game = self.glue.game
            prompt = game.getPrompt() if game is not None else ""

            # Selection (? for menu): (the game server menu)
            # Enter your choice: (game menu)
            # Command [TL=00:00:00]:[1800] (?=Help)? : <- YES!
            # Computer command [TL=00:00:00]:[613] (?=Help)?
            # (and others I've yet to see...)
            if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt):
                menu = ProxyMenu(game)
            else:
                nl = "\n\r"
                r = Style.RESET_ALL
                log.warning("NNY!")
                # Redisplay the raw (ANSI intact) prompt after the notice.
                prompt = game.buffer if game is not None else ""
                self.queue_game.put(
                    r
                    + nl
                    + Style.BRIGHT
                    + "Proxy:"
                    + Style.RESET_ALL
                    + " I can't activate at this time."
                    + nl
                )
                self.queue_game.put(prompt)
                self.queue_player.put("\a")

    def connectionLost(self, why):
        """ Player disconnected: notify observers and the game side. """
        log.info("lost connection %s" % why)
        self.observer.emit("close", why)
        self.queue_player.put(False)

    def connectionFailed(self, why):
        log.error("connectionFailed: %s" % why)


class GlueFactory(protocol.ClientFactory):
    """ ClientFactory that builds the Game protocol for one Player. """

    maxDelay = 10
    protocol = Game

    def __init__(self, player: Player):
        self.player = player
        self.queue_player = player.queue_player
        self.queue_game = player.queue_game
        self.observer = player.observer
        self.game = None

    def closeIt(self):
        """ Tell the player side to close its connection. """
        log.info("closeIt")
        self.queue_game.put(False)

    def getUser(self, user):
        log.info("getUser( %s )" % user)
        # NOTE(review): Game (as defined in this file) has no logUser()
        # method -- confirm whether this call is still needed.
        self.game.logUser(user)

    # This was needed when I replaced ClientFactory with Factory.
    # def clientConnectionLost(self, connector, why):
    #     log.debug("clientconnectionlost: %s" % why)
    #     self.queue_client.put(False)

    def clientConnectionFailed(self, connector, why):
        """ Could not reach the game server: tell the player, then close. """
        log.error("connection to game failed: %s" % why)
        self.queue_game.put(b"Sorry! I'm Unable to connect to the game server.\r\n")

        # syncterm gets cranky/locks up if we close this here.
        # (Because it is still sending rlogin information?)
        reactor.callLater(2, self.closeIt)