|
- #!/home/mystic/mystic/bbs/venv/bin/python
- import trio
- import sys
- from pprint import pprint
- import json
- from colorama import Fore, Back, Style
- import argparse
- import string
def merge(color_string):
    """Fuse back-to-back colorama ANSI codes into one sequence where possible.

    Wherever the closing ``m`` of one code is immediately followed by the
    ``ESC[`` that opens the next, that junction is replaced with ``;`` so the
    two codes become a single combined code.
    """
    junction = "m\x1b["
    return ";".join(color_string.split(junction))
# ---------------------------------------------------------------------------
# Command line handling
# ---------------------------------------------------------------------------
parser = argparse.ArgumentParser(description="TradeWars Score Bulletins Puller")
parser.add_argument("--host", default="127.0.0.1", help="Hostname to contact", type=str)
parser.add_argument("--port", default=2002, help="Port number", type=int)
parser.add_argument("--username", default="phil", help="RLogin Username", type=str)
parser.add_argument("--password", default="phil", help="RLogin Password", type=str)
parser.add_argument("--games", help="TWGS Games to select", type=str)
parser.add_argument("--name", help="Server Name for Report", type=str)
parser.add_argument("--report", help="Space Reports base filename", type=str)
parser.add_argument("--debug", action="store_true")
parser.add_argument(
    "--prompt", default="Quit", help="TWGS Custom Menu Prompt", type=str
)

# Unfortunately, this can't figure out the custom prison TWGS menus.  :(
# I would like this to allow you to select which reports it'll do.
# So I could do the prison report with games A and B.

args = parser.parse_args()

# Where to connect.
HOST = args.host
# PORT=2003
PORT = args.port

# ---------------------------------------------------------------------------
# Shared state used by the line/prompt handlers further down
# ---------------------------------------------------------------------------
state = 1  # current step of the menu-walking state machine
options = {}  # menu letter -> menu text
pos = "A"  # letter of the game currently being pulled
scoreboard = {}  # game letter -> captured scoreboard lines
settings = {}  # game letter -> captured settings lines
setup = {}  # game letter -> parsed "key : value" settings

if args.games is not None:
    # Each character given to --games is a game letter mapped to itself.
    options = dict(zip(args.games, args.games))
    print("We will pull:", ",".join(options))
    # NOTE: --games does not work unless you are also using prompt

if args.debug:
    print("ARGS:")
    pprint(args)
    print("=" * 40)
- import re
# Matches the ANSI sequences that get stripped for the plain-text output:
# ESC [ , optional digits/semicolons, then an uppercase letter, "m" or "h".
cleaner = re.compile(r"\x1b\[[0-9;]*[A-Zmh]")


def cleanANSI(line):
    """Return ``line`` with every sequence matched by ``cleaner`` removed."""
    return re.sub(cleaner, "", line)
def ANSIsplit(line, pos=70):
    """Split ``line`` in two near index ``pos`` without cutting an ANSI code.

    The first half gets a "save cursor" code appended and the second half gets
    a "restore cursor" code prepended.

    The cut point is chosen as follows:

    * If the last ``ESC[`` sequence that opens before ``pos`` extends past
      ``pos``, cut at the start of that sequence.
    * Otherwise cut at ``pos``, or at the start of the next ``ESC[`` sequence
      when that comes first.
    * If the line has no sequence before ``pos`` and/or none after it, cut at
      ``pos`` (the original raised ``ValueError`` here).
    * A sequence with no final letter is cut right after its parameter bytes;
      one that runs to the end of the string no longer raises ``IndexError``.

    Args:
        line: Text, possibly containing ANSI escape sequences.
        pos: Index at which the split is wanted.

    Returns:
        A ``(first, second)`` tuple of strings.
    """
    save = "\x1b[s"
    restore = "\x1b[u"
    escape = "\x1b["
    # to_check = string.digits + string.ascii_letters + ";"
    to_check = string.digits + ";"
    length = len(line)

    cut = None
    # Last escape sequence that opens before the split column (-1 if none).
    start_ansi = line.rfind(escape, 0, pos - 1)
    if start_ansi != -1:
        # Walk over the parameter bytes, never running off the end of the line.
        end = start_ansi + 2
        while end < length and line[end] in to_check:
            end += 1
        if end < length and line[end] in string.ascii_letters:
            end += 1
            if end > pos:
                # pos is inside this escape code: break at the start of it.
                cut = start_ansi
        else:
            # No final letter (malformed or truncated): break right after the
            # parameter bytes.
            cut = end

    if cut is None:
        # pos is in plain text; don't run past the start of the next sequence.
        next_ansi = line.find(escape, start_ansi + 1)
        if next_ansi == -1 or pos < next_ansi:
            cut = pos
        else:
            cut = next_ansi

    return (line[0:cut] + save, restore + line[cut:])
def output(ansi, bbs, line):
    """Write one report line to the ANSI file and its plain twin to the BBS file.

    The plain version is ``line`` with ANSI codes removed, taken before any
    rewriting. For the ANSI version, a line longer than 60 characters has each
    run of five or more whitespace characters replaced by a cursor-forward
    code of the same width. If it is still longer than 65 characters it is
    split in two with ``ANSIsplit`` and written as two lines.

    Args:
        ansi: Open text file receiving the ANSI version.
        bbs: Open text file receiving the plain version.
        line: The line to write, possibly containing ANSI codes.
    """
    plain = cleanANSI(line)

    def cursor_forward(match):
        # Replace a run of whitespace with "move right N columns".
        return "\x1b[{0}C".format(len(match.group(0)))

    if len(line) > 60:
        line = re.sub(r"\s{5,}", cursor_forward, line)

    if len(line) <= 65:
        print(line, file=ansi)
    else:
        first, second = ANSIsplit(line, 65)
        print(first, file=ansi)
        print(second, file=ansi)

    print(plain, file=bbs)
def _settings_row(game_setup, left, right, label_color, value_color, reset):
    """Format one two-column settings row; a setting that is missing is blank."""
    return "{0}{1:16} : {2}{3:20}{0}{4:16} : {2}{5}{6}".format(
        label_color,
        left,
        value_color,
        game_setup.get(left, ""),
        right,
        game_setup.get(right, ""),
        reset,
    )


async def space_report():
    """Write out everything collected from the server.

    Steps, in order:

    1. Trim the first scoreboard line of every game to the text after its last
       cursor-home code (``ESC[H``). Games with no captured lines are skipped.
    2. If ``--report`` was given, write ``<report>.ans`` (ANSI) and
       ``<report>.bbs`` (plain) via ``output``: a title, then for each game a
       heading, three rows of selected settings and the scoreboard lines.
    3. Print every scoreboard, then pretty-print the raw settings, to stdout.
    4. If ``--report`` was given, dump the parsed settings to
       ``<report>.json``.

    The report files are opened with ``with`` so they are closed even when
    writing a line fails. A game whose settings were not captured gets blank
    values instead of raising ``KeyError``.
    """
    # Clean up the scoreboard
    for rows in scoreboard.values():
        if rows:
            # _, x, l = l.rpartition("\x1b[2J")
            rows[0] = rows[0].rpartition("\x1b[H")[2]

    reset = Style.RESET_ALL
    cyan = merge(Fore.CYAN + Style.BRIGHT)
    white = merge(Fore.WHITE + Style.BRIGHT)
    green = merge(Fore.GREEN + Style.NORMAL)

    # Settings shown under each game heading, as (left column, right column).
    # >>  Age of game      : 65 days              Days since start : 65 days
    # >>  Time per day     : Unlimited            Turns per day    : 10000
    # >>  Delete if idle   : 30 days              Sectors in game  : 25000
    shown_settings = (
        ("Age of game", "Days since start"),
        ("Time per day", "Turns per day"),
        ("Delete if idle", "Sectors in game"),
    )

    if args.report:
        # Ok, there's a base filename for the report.
        ansi = args.report + ".ans"
        bbs = args.report + ".bbs"
        with open(ansi, "w") as aFP, open(bbs, "w") as bFP:
            output(
                aFP, bFP, "{0}Space Report for {1}{2}".format(cyan, white, args.name)
            )
            output(aFP, bFP, "")

            for k in sorted(scoreboard):
                # A game whose menu text is just its own letter has no name.
                name = options.get(k, k)
                if name == k:
                    name = ""
                output(
                    aFP,
                    bFP,
                    "{0}Game {1} {2}{3}{4}".format(cyan, k, white, name, reset),
                )

                game_setup = setup.get(k, {})
                for left, right in shown_settings:
                    output(
                        aFP,
                        bFP,
                        _settings_row(game_setup, left, right, green, cyan, reset),
                    )

                output(aFP, bFP, "")
                for line in scoreboard[k]:
                    output(aFP, bFP, line)

    for k in scoreboard:
        print(k, "\n".join(scoreboard[k]))

    pprint(settings)
    # pprint(setup)

    if args.report:
        # Save all the configuration settings to json
        filename = args.report + ".json"
        print("Saving setup to:", filename)
        with open(filename, "w") as fp:
            json.dump(setup, fp)
async def send(client, data):
    """Send the string ``data`` to ``client`` encoded as latin-1.

    When ``--debug`` is on, the outgoing text is echoed to stdout first,
    prefixed with ``<<``.
    """
    if args.debug:
        print("<<", data)
    encoded = data.encode("latin-1")
    await client.send_all(encoded)
def _split_setting(fragment):
    """Return ``(key, value)`` from a ``key : value`` fragment, or None if it has no colon."""
    parts = fragment.strip().split(":")
    if len(parts) < 2:
        return None
    return parts[0].strip(), parts[1].strip()


async def readline(client_stream, line):
    """Handle one complete line received from the server.

    What happens depends on the module-level ``state``:

    * 1: collect ``<X> text`` menu entries into ``options``; when the
      ``--prompt`` text appears, move to the next state and send the current
      game letter ``pos`` (or ``Q`` if that letter is not on offer).
    * 3: append the line to ``scoreboard[pos]``, skipping leading blank lines
      and any line containing "Enter your choice" or "Ranking Traders".
    * 5: append the line to ``settings[pos]`` (skipping leading blank or
      "Enter your choice" lines) and parse ``key : value`` pairs into
      ``setup[pos]``. A line with a colon past column 40 is treated as two
      columns. A column with no colon is ignored rather than raising
      ``IndexError``.
    * 6: when the ``--prompt`` text appears, advance ``pos`` to the next letter
      and either select that game (state 2) or send ``q`` to quit.

    With ``--debug`` the line is echoed, unless it was skipped above. In
    state 5 the echoed text is the ANSI-stripped version.

    Args:
        client_stream: Stream passed to ``send`` for any reply.
        line: The received line without its line ending.
    """
    global state, pos

    if state == 1:
        if "<" in line:
            parts = {x[0]: x[2:].strip() for x in line.split("<") if x != ""}
            options.update(parts)
        # if "Quit" in line:
        if args.prompt in line:
            if args.debug:
                print("Prompt seen!")
            state += 1
            if pos in options:
                await send(client_stream, pos)
            else:
                await send(client_stream, "Q\r")
    elif state == 3:
        if pos not in scoreboard:
            scoreboard[pos] = []
        # Same truth table as the original "not A and B or (C or D)".
        leading_blank = (not scoreboard[pos]) and line == ""
        if leading_blank or "Enter your choice" in line or "Ranking Traders" in line:
            return
        scoreboard[pos].append(line)
    elif state == 5:
        if pos not in settings:
            settings[pos] = []
            setup[pos] = {}
        if not settings[pos] and (line == "" or "Enter your choice" in line):
            return
        settings[pos].append(line)
        if ":" in line:
            line = cleanANSI(line)
            part2 = line[40:]
            if ":" in part2:
                # yes, this is a 2 parter
                fragments = (line[0:39], part2)
            else:
                fragments = (line,)
            for fragment in fragments:
                pair = _split_setting(fragment)
                if pair is not None:
                    setup[pos][pair[0]] = pair[1]
    elif state == 6:
        # if "Quit" in line:
        if args.prompt in line:
            pos = chr(ord(pos) + 1)
            if pos in options:
                state = 2
                await send(client_stream, pos)
            else:
                await send(client_stream, "q\r")

    if args.debug:
        print(">>", line)
async def prompt(client_stream, line):
    """React to a partial line (a prompt with no line ending yet).

    Depending on the module-level ``state``, certain prompt texts trigger a
    reply and possibly a state change:

    * 2: "[Pause]" -> send Enter; "Enter your choice" -> state 3, send ``H``.
    * 3: "[Pause]" -> state 4, send Enter.
    * 4: "Enter your choice" -> state 5, send ``S``.
    * 5: "[Pause]" -> send Enter; "Enter your choice" -> state 6, send ``X``.

    With ``--debug`` the text is echoed with ``repr`` after any reply.

    Args:
        client_stream: Stream passed to ``send`` for any reply.
        line: The text received so far with no line ending.
    """
    global state

    # state -> ordered (text to look for, new state or None, keys to send).
    # The entries are picked by the state on entry, so a state change made by
    # one entry does not pull in another state's entries.
    reactions = {
        2: (("[Pause]", None, "\r"), ("Enter your choice", 3, "H\r")),
        3: (("[Pause]", 4, "\r"),),
        4: (("Enter your choice", 5, "S\r"),),
        5: (("[Pause]", None, "\r"), ("Enter your choice", 6, "X\r")),
    }

    for needle, next_state, keys in reactions.get(state, ()):
        if needle in line:
            if next_state is not None:
                state = next_state
            await send(client_stream, keys)

    if args.debug:
        print(">>> {!r}".format(line))
async def receiver(client_stream):
    """Log in, then feed everything the server sends to the line/prompt handlers.

    Sends the rlogin connection string built from ``--username`` and
    ``--password``, then reads chunks until the stream ends. Each complete
    line (carriage returns removed) goes to ``readline``. Whatever is left
    without a line ending is kept, stripped of carriage returns, and passed to
    ``prompt`` when non-empty. Once the stream closes, ``space_report`` is run.

    Args:
        client_stream: The connected stream to read from and write to.
    """
    if args.debug:
        print("receiver: started")

    # I need to send the rlogin connection information ...
    handshake = "\x00{0}\x00{1}\x00ansi-bbs\x009600\x00".format(
        args.username, args.password
    )
    await client_stream.send_all(handshake.encode("utf-8"))

    pending = ""
    async for chunk in client_stream:
        pending += chunk.decode("latin-1", "ignore")

        # Everything before the last "\n" is complete lines; the tail is kept.
        *finished, pending = pending.split("\n")
        for raw in finished:
            await readline(client_stream, raw.replace("\r", ""))

        pending = pending.replace("\r", "")
        if pending:
            await prompt(client_stream, pending)

    print("receiver: closed")
    # pprint(options)
    await space_report()
async def start():
    """Open a TCP connection to HOST:PORT and run ``receiver`` on it.

    The stream and the nursery are both closed when ``receiver`` finishes.
    """
    print("connecting...")
    stream = await trio.open_tcp_stream(HOST, PORT)
    async with stream, trio.open_nursery() as nursery:
        nursery.start_soon(receiver, stream)


trio.run(start)
|