|
@@ -0,0 +1,373 @@
|
|
|
+#!/home/mystic/mystic/bbs/venv/bin/python
|
|
|
+
|
|
|
+import trio
|
|
|
+import sys
|
|
|
+from pprint import pprint
|
|
|
+import json
|
|
|
+from colorama import Fore, Back, Style
|
|
|
+import argparse
|
|
|
+import string
|
|
|
+
|
|
|
+def merge(color_string):
|
|
|
+ """ Given a string of colorama ANSI, merge them if you can. """
|
|
|
+ return color_string.replace("m\x1b[", ";")
|
|
|
+
|
|
|
+
|
|
|
+parser = argparse.ArgumentParser(description="TradeWars Score Bulletins Puller")
|
|
|
+parser.add_argument("--host", type=str, help="Hostname to contact", default="127.0.0.1")
|
|
|
+parser.add_argument("--port", type=int, help="Port number", default=2002)
|
|
|
+parser.add_argument("--username", type=str, help="RLogin Username", default="phil")
|
|
|
+parser.add_argument("--password", type=str, help="RLogin Password", default="phil")
|
|
|
+parser.add_argument("--games", type=str, help="TWGS Games to select")
|
|
|
+parser.add_argument("--name", type=str, help="Server Name for Report")
|
|
|
+parser.add_argument("--report", type=str, help="Space Reports base filename")
|
|
|
+parser.add_argument("--debug", action="store_true")
|
|
|
+parser.add_argument(
|
|
|
+ "--prompt", type=str, help="TWGS Custom Menu Prompt", default="Quit"
|
|
|
+)
|
|
|
+
|
|
|
+# Unfortunately, this can't figure out the custom prison TWGS menus. :(
|
|
|
+# I would like this to allow you to select which reports it'll do.
|
|
|
+# So I could do the prison report with games A and B.
|
|
|
+#
|
|
|
+
|
|
|
+args = parser.parse_args()
|
|
|
+HOST = args.host
|
|
|
+# PORT=2003
|
|
|
+PORT = args.port
|
|
|
+
|
|
|
+state = 1
|
|
|
+options = {}
|
|
|
+pos = "A"
|
|
|
+scoreboard = {}
|
|
|
+settings = {}
|
|
|
+setup = {}
|
|
|
+
|
|
|
+if args.games is not None:
|
|
|
+ options = {x: x for x in args.games}
|
|
|
+ print("We will pull:", ",".join(options.keys()))
|
|
|
+
|
|
|
+# NOTE: --games does not work unless you are also using prompt
|
|
|
+
|
|
|
+if args.debug:
|
|
|
+ print("ARGS:")
|
|
|
+ pprint(args)
|
|
|
+ print("=" * 40)
|
|
|
+
|
|
|
+import re
|
|
|
+
|
|
|
+# Cleans all ANSI
|
|
|
+cleaner = re.compile(r"\x1b\[[0-9;]*[A-Zmh]")
|
|
|
+
|
|
|
+
|
|
|
+def cleanANSI(line):
|
|
|
+ """ Remove all ANSI codes. """
|
|
|
+ global cleaner
|
|
|
+ return cleaner.sub("", line)
|
|
|
+
|
|
|
+def ANSIsplit(line, pos=70):
|
|
|
+ # Verify that we're not inside an ANSI sequence at this very moment.
|
|
|
+ save = "\x1b[s"
|
|
|
+ restore = "\x1b[u"
|
|
|
+
|
|
|
+ ansi = line.rindex("\x1b[", 0, pos-1)
|
|
|
+ start_ansi = ansi
|
|
|
+ ansi += 2
|
|
|
+ # to_check = string.digits + string.ascii_letters + ";"
|
|
|
+ to_check = string.digits + ";"
|
|
|
+
|
|
|
+ while line[ansi] in to_check:
|
|
|
+ ansi += 1
|
|
|
+ if line[ansi] in string.ascii_letters:
|
|
|
+ ansi += 1
|
|
|
+
|
|
|
+ if ansi > pos:
|
|
|
+ # Ok, pos is inside an ANSI escape code. break at the start of the
|
|
|
+ # ANSI code.
|
|
|
+ ansi = start_ansi
|
|
|
+ else:
|
|
|
+ next_ansi = line.index("\x1b[", start_ansi + 1)
|
|
|
+ if pos < next_ansi:
|
|
|
+ ansi = pos
|
|
|
+ else:
|
|
|
+ ansi = next_ansi
|
|
|
+
|
|
|
+ # else:
|
|
|
+ # next_ansi = line.index("\x1b[", start_ansi + 1)
|
|
|
+ # if pos < next_ansi:
|
|
|
+ # ansi = pos
|
|
|
+ # else:
|
|
|
+ # ansi = next_ansi
|
|
|
+
|
|
|
+ parts = (line[0:ansi] + save, restore + line[ansi:])
|
|
|
+ return parts
|
|
|
+
|
|
|
+def output(ansi, bbs, line):
|
|
|
+ cleaned = cleanANSI(line)
|
|
|
+ if len(line) > 60:
|
|
|
+ line = re.sub(
|
|
|
+ r"\s{5,}", lambda spaces: "\x1b[{0}C".format(len(spaces.group(0))), line
|
|
|
+ )
|
|
|
+ if len(line) > 65:
|
|
|
+ lines = ANSIsplit(line, 65)
|
|
|
+ print(lines[0], file=ansi)
|
|
|
+ print(lines[1], file=ansi)
|
|
|
+ else:
|
|
|
+ print(line, file=ansi)
|
|
|
+ print(cleaned, file=bbs)
|
|
|
+
|
|
|
+
|
|
|
+async def space_report():
|
|
|
+ global args, scoreboard, settings, setup
|
|
|
+
|
|
|
+ # Clean up the scoreboard
|
|
|
+ for k in scoreboard:
|
|
|
+ l = scoreboard[k][0]
|
|
|
+ # _, x, l = l.rpartition("\x1b[2J")
|
|
|
+ _, x, l = l.rpartition("\x1b[H")
|
|
|
+ scoreboard[k][0] = l
|
|
|
+
|
|
|
+ nl = "\r\n"
|
|
|
+ reset = Style.RESET_ALL
|
|
|
+ cyan = merge(Fore.CYAN + Style.BRIGHT)
|
|
|
+ white = merge(Fore.WHITE + Style.BRIGHT)
|
|
|
+ green = merge(Fore.GREEN + Style.NORMAL)
|
|
|
+
|
|
|
+ if args.report:
|
|
|
+ # Ok, there's a base filename for the report.
|
|
|
+ ansi = args.report + ".ans"
|
|
|
+ bbs = args.report + ".bbs"
|
|
|
+ aFP = open(ansi, "w")
|
|
|
+ bFP = open(bbs, "w")
|
|
|
+
|
|
|
+ output(aFP, bFP, "{0}Space Report for {1}{2}".format(cyan, white, args.name))
|
|
|
+ output(aFP, bFP, "")
|
|
|
+
|
|
|
+ for k in sorted(scoreboard):
|
|
|
+ name = options[k]
|
|
|
+ if name == k:
|
|
|
+ name = ""
|
|
|
+ output(
|
|
|
+ aFP, bFP, "{0}Game {1} {2}{3}{4}".format(cyan, k, white, name, reset)
|
|
|
+ )
|
|
|
+
|
|
|
+ # Ok, what settings would I want displayed?
|
|
|
+ # >> Age of game : 65 days Days since start : 65 days
|
|
|
+ # >> Delete if idle : 30 days
|
|
|
+ # >> Time per day : Unlimited Turns per day : 10000
|
|
|
+ # >> Sectors in game : 25000 Display StarDock : Yes
|
|
|
+ line = "{0}{1:16} : {2}{3:20}{0}{4:16} : {2}{5}{6}".format(
|
|
|
+ green,
|
|
|
+ "Age of game",
|
|
|
+ cyan,
|
|
|
+ setup[k]["Age of game"],
|
|
|
+ "Days since start",
|
|
|
+ setup[k]["Days since start"],
|
|
|
+ reset,
|
|
|
+ )
|
|
|
+ output(aFP, bFP, line)
|
|
|
+
|
|
|
+ line = "{0}{1:16} : {2}{3:20}{0}{4:16} : {2}{5}{6}".format(
|
|
|
+ green,
|
|
|
+ "Time per day",
|
|
|
+ cyan,
|
|
|
+ setup[k]["Time per day"],
|
|
|
+ "Turns per day",
|
|
|
+ setup[k]["Turns per day"],
|
|
|
+ reset,
|
|
|
+ )
|
|
|
+ output(aFP, bFP, line)
|
|
|
+
|
|
|
+ line = "{0}{1:16} : {2}{3:20}{0}{4:16} : {2}{5}{6}".format(
|
|
|
+ green,
|
|
|
+ "Delete if idle",
|
|
|
+ cyan,
|
|
|
+ setup[k]["Delete if idle"],
|
|
|
+ "Sectors in game",
|
|
|
+ setup[k]["Sectors in game"],
|
|
|
+ reset,
|
|
|
+ )
|
|
|
+ output(aFP, bFP, line)
|
|
|
+
|
|
|
+ # line = "{0}{1:16} : {2}{3}".format(
|
|
|
+ # green, "Delete if idle", cyan, setup[k]["Delete if idle"], reset
|
|
|
+ # )
|
|
|
+ # output(aFP, bFP, line)
|
|
|
+ output(aFP, bFP, "")
|
|
|
+
|
|
|
+ for line in scoreboard[k]:
|
|
|
+ output(aFP, bFP, line)
|
|
|
+
|
|
|
+ aFP.close()
|
|
|
+ bFP.close()
|
|
|
+
|
|
|
+ for k in scoreboard:
|
|
|
+ print(k, "\n".join(scoreboard[k]))
|
|
|
+
|
|
|
+ # pprint(scoreboard)
|
|
|
+ # for k in settings:
|
|
|
+ # print(k, "\n".join(settings[k]))
|
|
|
+
|
|
|
+ if False:
|
|
|
+ for k in settings:
|
|
|
+ print(k)
|
|
|
+ for l in settings[k]:
|
|
|
+ # if "[Pause]" in l:
|
|
|
+ # break
|
|
|
+ print(l)
|
|
|
+ pprint(settings)
|
|
|
+
|
|
|
+ # pprint(setup)
|
|
|
+ if args.report:
|
|
|
+ # Save all the configuration settings to json
|
|
|
+ filename = args.report + ".json"
|
|
|
+ print("Saving setup to:", filename)
|
|
|
+ with open(filename, "w") as fp:
|
|
|
+ json.dump(setup, fp)
|
|
|
+
|
|
|
+
|
|
|
+async def send(client, data):
|
|
|
+ global args
|
|
|
+ if args.debug:
|
|
|
+ print("<<", data)
|
|
|
+ await client.send_all(data.encode("latin-1"))
|
|
|
+
|
|
|
+
|
|
|
+async def readline(client_stream, line):
|
|
|
+ global state, pos, settings, args
|
|
|
+
|
|
|
+ if state == 1:
|
|
|
+ if "<" in line:
|
|
|
+ parts = {x[0]: x[2:].strip() for x in line.split("<") if x != ""}
|
|
|
+ options.update(parts)
|
|
|
+
|
|
|
+ # if "Quit" in line:
|
|
|
+ if args.prompt in line:
|
|
|
+ if args.debug:
|
|
|
+ print("Prompt seen!")
|
|
|
+ state += 1
|
|
|
+ if pos in options:
|
|
|
+ await send(client_stream, pos)
|
|
|
+ else:
|
|
|
+ await send(client_stream, "Q\r")
|
|
|
+ elif state == 3:
|
|
|
+ if pos not in scoreboard:
|
|
|
+ scoreboard[pos] = []
|
|
|
+ if (
|
|
|
+ not scoreboard[pos]
|
|
|
+ and (line == "")
|
|
|
+ or (("Enter your choice" in line) or ("Ranking Traders" in line))
|
|
|
+ ):
|
|
|
+ return
|
|
|
+ scoreboard[pos].append(line)
|
|
|
+
|
|
|
+ elif state == 5:
|
|
|
+ if pos not in settings:
|
|
|
+ settings[pos] = []
|
|
|
+ setup[pos] = {}
|
|
|
+ if not settings[pos] and (line == "" or "Enter your choice" in line):
|
|
|
+ return
|
|
|
+ settings[pos].append(line)
|
|
|
+ if ":" in line:
|
|
|
+ line = cleanANSI(line)
|
|
|
+ part2 = line[40:]
|
|
|
+ if ":" in part2:
|
|
|
+ # yes, this is a 2 parter
|
|
|
+ parts = line[0:39].strip().split(":")
|
|
|
+ else:
|
|
|
+ parts = line.strip().split(":")
|
|
|
+ part2 = ""
|
|
|
+
|
|
|
+ setup[pos][parts[0].strip()] = parts[1].strip()
|
|
|
+ if ":" in part2:
|
|
|
+ parts = part2.strip().split(":")
|
|
|
+ setup[pos][parts[0].strip()] = parts[1].strip()
|
|
|
+
|
|
|
+ elif state == 6:
|
|
|
+ # if "Quit" in line:
|
|
|
+ if args.prompt in line:
|
|
|
+ pos = chr(ord(pos) + 1)
|
|
|
+ if pos in options:
|
|
|
+ state = 2
|
|
|
+ await send(client_stream, pos)
|
|
|
+ else:
|
|
|
+ await send(client_stream, "q\r")
|
|
|
+
|
|
|
+ # elif state == 2:
|
|
|
+ # if "[Pause]" in line:
|
|
|
+ # await client_stream.send_all("\r".encode("latin-1"))
|
|
|
+
|
|
|
+ if args.debug:
|
|
|
+ print(">>", line)
|
|
|
+
|
|
|
+
|
|
|
+async def prompt(client_stream, line):
|
|
|
+ global state, pos
|
|
|
+
|
|
|
+ if state == 2:
|
|
|
+ if "[Pause]" in line:
|
|
|
+ # await client_stream.send_all("\r".encode("latin-1"))
|
|
|
+ await send(client_stream, "\r")
|
|
|
+ if "Enter your choice" in line:
|
|
|
+ state = 3
|
|
|
+ await send(client_stream, "H\r")
|
|
|
+ elif state == 3:
|
|
|
+ if "[Pause]" in line:
|
|
|
+ state = 4
|
|
|
+ # await client_stream.send_all("\r".encode("latin-1"))
|
|
|
+ await send(client_stream, "\r")
|
|
|
+ # pos = chr(ord(pos) + 1)
|
|
|
+ elif state == 4:
|
|
|
+ if "Enter your choice" in line:
|
|
|
+ state = 5
|
|
|
+ await send(client_stream, "S\r")
|
|
|
+ elif state == 5:
|
|
|
+ if "[Pause]" in line:
|
|
|
+ await send(client_stream, "\r")
|
|
|
+ if "Enter your choice" in line:
|
|
|
+ state = 6
|
|
|
+ await send(client_stream, "X\r")
|
|
|
+
|
|
|
+ if args.debug:
|
|
|
+ print(">>> {!r}".format(line))
|
|
|
+
|
|
|
+
|
|
|
+async def receiver(client_stream):
|
|
|
+ global args
|
|
|
+
|
|
|
+ if args.debug:
|
|
|
+ print("receiver: started")
|
|
|
+ # I need to send the rlogin connection information ...
|
|
|
+ rlogin = "\x00{0}\x00{1}\x00ansi-bbs\x009600\x00".format(
|
|
|
+ args.username, args.password
|
|
|
+ )
|
|
|
+ await client_stream.send_all(rlogin.encode("utf-8"))
|
|
|
+ buffer = ""
|
|
|
+ async for chunk in client_stream:
|
|
|
+ buffer += chunk.decode("latin-1", "ignore")
|
|
|
+ while "\n" in buffer:
|
|
|
+ part = buffer.partition("\n")
|
|
|
+ line = part[0].replace("\r", "")
|
|
|
+ buffer = part[2]
|
|
|
+ await readline(client_stream, line)
|
|
|
+
|
|
|
+ buffer = buffer.replace("\r", "")
|
|
|
+ if buffer != "":
|
|
|
+ await prompt(client_stream, buffer)
|
|
|
+
|
|
|
+ print("receiver: closed")
|
|
|
+ # pprint(options)
|
|
|
+
|
|
|
+ await space_report()
|
|
|
+
|
|
|
+
|
|
|
+async def start():
|
|
|
+ print("connecting...")
|
|
|
+ client_stream = await trio.open_tcp_stream(HOST, PORT)
|
|
|
+ async with client_stream:
|
|
|
+ async with trio.open_nursery() as nursery:
|
|
|
+ nursery.start_soon(receiver, client_stream)
|
|
|
+
|
|
|
+
|
|
|
+trio.run(start)
|
|
|
+
|