#!/home/mystic/mystic/bbs/venv/bin/python import trio import sys from pprint import pprint import json from colorama import Fore, Back, Style import argparse import string def merge(color_string): """ Given a string of colorama ANSI, merge them if you can. """ return color_string.replace("m\x1b[", ";") parser = argparse.ArgumentParser(description="TradeWars Score Bulletins Puller") parser.add_argument("--host", type=str, help="Hostname to contact", default="127.0.0.1") parser.add_argument("--port", type=int, help="Port number", default=2002) parser.add_argument("--username", type=str, help="RLogin Username", default="phil") parser.add_argument("--password", type=str, help="RLogin Password", default="phil") parser.add_argument("--games", type=str, help="TWGS Games to select") parser.add_argument("--name", type=str, help="Server Name for Report") parser.add_argument("--report", type=str, help="Space Reports base filename") parser.add_argument("--debug", action="store_true") parser.add_argument("--telnet", action="store_true") parser.add_argument( "--prompt", type=str, help="TWGS Custom Menu Prompt", default="Quit" ) # Unfortunately, this can't figure out the custom prison TWGS menus. :( # I would like this to allow you to select which reports it'll do. # So I could do the prison report with games A and B. # args = parser.parse_args() HOST = args.host # PORT=2003 PORT = args.port state = 1 options = {} pos = "A" scoreboard = {} settings = {} setup = {} if args.games is not None: options = {x: x for x in args.games} print("We will pull:", ",".join(options.keys())) # NOTE: --games does not work unless you are also using prompt if args.debug: print("ARGS:") pprint(args) print("=" * 40) import re # Cleans all ANSI cleaner = re.compile(r"\x1b\[[0-9;]*[A-Zmh]") def cleanANSI(line): """ Remove all ANSI codes. """ global cleaner return cleaner.sub("", line) def ANSIsplit(line, pos=70): # Verify that we're not inside an ANSI sequence at this very moment. save = "\x1b[s" restore = "\x1b[u" ansi = line.rindex("\x1b[", 0, pos-1) start_ansi = ansi ansi += 2 # to_check = string.digits + string.ascii_letters + ";" to_check = string.digits + ";" while line[ansi] in to_check: ansi += 1 if line[ansi] in string.ascii_letters: ansi += 1 if ansi > pos: # Ok, pos is inside an ANSI escape code. break at the start of the # ANSI code. ansi = start_ansi else: next_ansi = line.index("\x1b[", start_ansi + 1) if pos < next_ansi: ansi = pos else: ansi = next_ansi # else: # next_ansi = line.index("\x1b[", start_ansi + 1) # if pos < next_ansi: # ansi = pos # else: # ansi = next_ansi parts = (line[0:ansi] + save, restore + line[ansi:]) return parts def output(ansi, bbs, line): cleaned = cleanANSI(line) if len(line) > 60: line = re.sub( r"\s{5,}", lambda spaces: "\x1b[{0}C".format(len(spaces.group(0))), line ) if len(line) > 65: lines = ANSIsplit(line, 65) print(lines[0], file=ansi) print(lines[1], file=ansi) else: print(line, file=ansi) print(cleaned, file=bbs) async def space_report(): global args, scoreboard, settings, setup # Clean up the scoreboard for k in scoreboard: l = scoreboard[k][0] # _, x, l = l.rpartition("\x1b[2J") _, x, l = l.rpartition("\x1b[H") scoreboard[k][0] = l nl = "\r\n" reset = Style.RESET_ALL # Being backwards here confuses ansilove. :( # Style first, then Fore, then Background. cyan = merge(Style.BRIGHT + Fore.CYAN) white = merge(Style.BRIGHT + Fore.WHITE) green = merge(Style.NORMAL + Fore.GREEN) if args.report and setup: # Ok, there's a base filename for the report. ansi = args.report + ".ans" bbs = args.report + ".bbs" aFP = open(ansi, "w") bFP = open(bbs, "w") output(aFP, bFP, "{0}Space Report for {1}{2}".format(cyan, white, args.name)) output(aFP, bFP, "") for k in sorted(scoreboard): name = options[k] if name == k: name = "" output( aFP, bFP, "{0}Game {1} {2}{3}{4}".format(cyan, k, white, name, reset) ) # Ok, what settings would I want displayed? # >> Age of game : 65 days Days since start : 65 days # >> Delete if idle : 30 days # >> Time per day : Unlimited Turns per day : 10000 # >> Sectors in game : 25000 Display StarDock : Yes line = "{0}{1:16} : {2}{3:20}{0}{4:16} : {2}{5}{6}".format( green, "Age of game", cyan, setup[k]["Age of game"], "Days since start", setup[k]["Days since start"], reset, ) output(aFP, bFP, line) line = "{0}{1:16} : {2}{3:20}{0}{4:16} : {2}{5}{6}".format( green, "Time per day", cyan, setup[k]["Time per day"], "Turns per day", setup[k]["Turns per day"], reset, ) output(aFP, bFP, line) line = "{0}{1:16} : {2}{3:20}{0}{4:16} : {2}{5}{6}".format( green, "Delete if idle", cyan, setup[k]["Delete if idle"], "Sectors in game", setup[k]["Sectors in game"], reset, ) output(aFP, bFP, line) # line = "{0}{1:16} : {2}{3}".format( # green, "Delete if idle", cyan, setup[k]["Delete if idle"], reset # ) # output(aFP, bFP, line) output(aFP, bFP, "") for line in scoreboard[k]: output(aFP, bFP, line) aFP.close() bFP.close() for k in scoreboard: print(k, "\n".join(scoreboard[k])) # pprint(scoreboard) # for k in settings: # print(k, "\n".join(settings[k])) if False: for k in settings: print(k) for l in settings[k]: # if "[Pause]" in l: # break print(l) pprint(settings) # pprint(setup) if args.report and setup: # Save all the configuration settings to json filename = args.report + ".json" print("Saving setup to:", filename) with open(filename, "w") as fp: json.dump(setup, fp) async def send(client, data): global args if args.debug: print("<<", data) await client.send_all(data.encode("latin-1")) async def readline(client_stream, line): global state, pos, settings, args if state == 1: if "<" in line: parts = {x[0]: x[2:].strip() for x in line.split("<") if x != ""} options.update(parts) # if "Quit" in line: if args.prompt in line: if args.debug: print("Prompt seen!") state += 1 if pos in options: await send(client_stream, pos) else: await send(client_stream, "Q\r") elif state == 3: if pos not in scoreboard: scoreboard[pos] = [] if ( not scoreboard[pos] and (line == "") or (("Enter your choice" in line) or ("Ranking Traders" in line)) ): return scoreboard[pos].append(line) elif state == 5: if pos not in settings: settings[pos] = [] setup[pos] = {} if not settings[pos] and (line == "" or "Enter your choice" in line): return settings[pos].append(line) if ":" in line: line = cleanANSI(line) part2 = line[40:] if ":" in part2: # yes, this is a 2 parter parts = line[0:39].strip().split(":") else: parts = line.strip().split(":") part2 = "" setup[pos][parts[0].strip()] = parts[1].strip() if ":" in part2: parts = part2.strip().split(":") setup[pos][parts[0].strip()] = parts[1].strip() elif state == 6: # if "Quit" in line: if args.prompt in line: pos = chr(ord(pos) + 1) if pos in options: state = 2 await send(client_stream, pos) else: await send(client_stream, "q\r") # elif state == 2: # if "[Pause]" in line: # await client_stream.send_all("\r".encode("latin-1")) if args.debug: print(">>", line) async def prompt(client_stream, line): global state, pos if state == 2: if "[Pause]" in line: # await client_stream.send_all("\r".encode("latin-1")) await send(client_stream, "\r") if "Enter your choice" in line: state = 3 await send(client_stream, "H\r") elif state == 3: if "[Pause]" in line: state = 4 # await client_stream.send_all("\r".encode("latin-1")) await send(client_stream, "\r") # pos = chr(ord(pos) + 1) elif state == 4: if "Enter your choice" in line: state = 5 await send(client_stream, "S\r") elif state == 5: if "[Pause]" in line: await send(client_stream, "\r") if "Enter your choice" in line: state = 6 await send(client_stream, "X\r") if args.debug: print(">>> {!r}".format(line)) async def receiver(client_stream): global args if args.debug: print("receiver: started") if args.telnet: cmd = await client_stream.receive_some(3) await client_stream.send_all("\xff\xfd\xf6".encode("latin-1")) login = "{0}\r".format(args.username) await client_stream.send_all(login.encode("utf-8")) else: # I need to send the rlogin connection information ... rlogin = "\x00{0}\x00{1}\x00ansi-bbs\x009600\x00".format( args.username, args.password ) await client_stream.send_all(rlogin.encode("utf-8")) buffer = "" async for chunk in client_stream: buffer += chunk.decode("latin-1", "ignore") while "\n" in buffer: part = buffer.partition("\n") line = part[0].replace("\r", "") buffer = part[2] await readline(client_stream, line) buffer = buffer.replace("\r", "") if buffer != "": await prompt(client_stream, buffer) print("receiver: closed") # pprint(options) await space_report() async def start(): print("connecting...") client_stream = await trio.open_tcp_stream(HOST, PORT) async with client_stream: async with trio.open_nursery() as nursery: nursery.start_soon(receiver, client_stream) trio.run(start)