talker.py 11 KB


  1. #!/home/mystic/mystic/bbs/venv/bin/python
  2. import trio
  3. import sys
  4. from pprint import pprint
  5. import json
  6. from colorama import Fore, Back, Style
  7. import argparse
  8. import string
  9. def merge(color_string):
  10. """ Given a string of colorama ANSI, merge them if you can. """
  11. return color_string.replace("m\x1b[", ";")
# Command-line interface.  Every option has a default or is optional, so the
# script can be run with no arguments at all.
parser = argparse.ArgumentParser(description="TradeWars Score Bulletins Puller")
parser.add_argument("--host", type=str, help="Hostname to contact", default="127.0.0.1")
parser.add_argument("--port", type=int, help="Port number", default=2002)
parser.add_argument("--username", type=str, help="RLogin Username", default="phil")
parser.add_argument("--password", type=str, help="RLogin Password", default="phil")
parser.add_argument("--games", type=str, help="TWGS Games to select")
parser.add_argument("--name", type=str, help="Server Name for Report")
parser.add_argument("--report", type=str, help="Space Reports base filename")
parser.add_argument("--debug", action="store_true")
parser.add_argument("--telnet", action="store_true")
parser.add_argument(
    "--prompt", type=str, help="TWGS Custom Menu Prompt", default="Quit"
)
# Unfortunately, this can't figure out the custom prison TWGS menus. :(
# I would like this to allow you to select which reports it'll do.
# So I could do the prison report with games A and B.
#
# NOTE: arguments are parsed at import time, so importing this module
# consumes sys.argv.
args = parser.parse_args()
HOST = args.host
# PORT=2003
PORT = args.port

# Shared state for the line/prompt handlers (readline() and prompt()).
# `state` is the step of the conversation with the server:
#   1 - waiting for the main menu prompt (args.prompt) while collecting options
#   2 - a game letter was sent; waiting for "Enter your choice" to send "H"
#   3 - collecting scoreboard lines until "[Pause]"
#   4 - waiting for "Enter your choice" to send "S"
#   5 - collecting settings lines; "Enter your choice" sends "X"
#   6 - waiting for the main menu prompt to move on to the next game letter
state = 1
# Menu letter -> menu text; filled from "<" menu lines, or seeded from --games.
options = {}
# Letter of the game currently being pulled.
pos = "A"
# Game letter -> list of raw scoreboard lines.
scoreboard = {}
# Game letter -> list of raw settings lines.
settings = {}
# Game letter -> {setting name: value} parsed from the settings lines.
setup = {}
if args.games is not None:
    # Each character of --games becomes a game letter mapped to itself.
    options = {x: x for x in args.games}
    print("We will pull:", ",".join(options.keys()))
    # NOTE: --games does not work unless you are also using prompt
if args.debug:
    print("ARGS:")
    pprint(args)
    print("=" * 40)
  47. import re
  48. # Cleans all ANSI
  49. cleaner = re.compile(r"\x1b\[[0-9;]*[A-Zmh]")
  50. def cleanANSI(line):
  51. """ Remove all ANSI codes. """
  52. global cleaner
  53. return cleaner.sub("", line)
  54. def ANSIsplit(line, pos=70):
  55. # Verify that we're not inside an ANSI sequence at this very moment.
  56. save = "\x1b[s"
  57. restore = "\x1b[u"
  58. ansi = line.rindex("\x1b[", 0, pos-1)
  59. start_ansi = ansi
  60. ansi += 2
  61. # to_check = string.digits + string.ascii_letters + ";"
  62. to_check = string.digits + ";"
  63. while line[ansi] in to_check:
  64. ansi += 1
  65. if line[ansi] in string.ascii_letters:
  66. ansi += 1
  67. if ansi > pos:
  68. # Ok, pos is inside an ANSI escape code. break at the start of the
  69. # ANSI code.
  70. ansi = start_ansi
  71. else:
  72. next_ansi = line.index("\x1b[", start_ansi + 1)
  73. if pos < next_ansi:
  74. ansi = pos
  75. else:
  76. ansi = next_ansi
  77. # else:
  78. # next_ansi = line.index("\x1b[", start_ansi + 1)
  79. # if pos < next_ansi:
  80. # ansi = pos
  81. # else:
  82. # ansi = next_ansi
  83. parts = (line[0:ansi] + save, restore + line[ansi:])
  84. return parts
  85. def output(ansi, bbs, line):
  86. cleaned = cleanANSI(line)
  87. if len(line) > 60:
  88. line = re.sub(
  89. r"\s{5,}", lambda spaces: "\x1b[{0}C".format(len(spaces.group(0))), line
  90. )
  91. if len(line) > 65:
  92. lines = ANSIsplit(line, 65)
  93. print(lines[0], file=ansi)
  94. print(lines[1], file=ansi)
  95. else:
  96. print(line, file=ansi)
  97. print(cleaned, file=bbs)
  98. async def space_report():
  99. global args, scoreboard, settings, setup
  100. # Clean up the scoreboard
  101. for k in scoreboard:
  102. l = scoreboard[k][0]
  103. # _, x, l = l.rpartition("\x1b[2J")
  104. _, x, l = l.rpartition("\x1b[H")
  105. scoreboard[k][0] = l
  106. nl = "\r\n"
  107. reset = Style.RESET_ALL
  108. # Being backwards here confuses ansilove. :(
  109. # Style first, then Fore, then Background.
  110. cyan = merge(Style.BRIGHT + Fore.CYAN)
  111. white = merge(Style.BRIGHT + Fore.WHITE)
  112. green = merge(Style.NORMAL + Fore.GREEN)
  113. if args.report and setup:
  114. # Ok, there's a base filename for the report.
  115. ansi = args.report + ".ans"
  116. bbs = args.report + ".bbs"
  117. aFP = open(ansi, "w")
  118. bFP = open(bbs, "w")
  119. output(aFP, bFP, "{0}Space Report for {1}{2}".format(cyan, white, args.name))
  120. output(aFP, bFP, "")
  121. for k in sorted(scoreboard):
  122. name = options[k]
  123. if name == k:
  124. name = ""
  125. output(
  126. aFP, bFP, "{0}Game {1} {2}{3}{4}".format(cyan, k, white, name, reset)
  127. )
  128. # Ok, what settings would I want displayed?
  129. # >> Age of game : 65 days Days since start : 65 days
  130. # >> Delete if idle : 30 days
  131. # >> Time per day : Unlimited Turns per day : 10000
  132. # >> Sectors in game : 25000 Display StarDock : Yes
  133. line = "{0}{1:16} : {2}{3:20}{0}{4:16} : {2}{5}{6}".format(
  134. green,
  135. "Age of game",
  136. cyan,
  137. setup[k]["Age of game"],
  138. "Days since start",
  139. setup[k]["Days since start"],
  140. reset,
  141. )
  142. output(aFP, bFP, line)
  143. line = "{0}{1:16} : {2}{3:20}{0}{4:16} : {2}{5}{6}".format(
  144. green,
  145. "Time per day",
  146. cyan,
  147. setup[k]["Time per day"],
  148. "Turns per day",
  149. setup[k]["Turns per day"],
  150. reset,
  151. )
  152. output(aFP, bFP, line)
  153. line = "{0}{1:16} : {2}{3:20}{0}{4:16} : {2}{5}{6}".format(
  154. green,
  155. "Delete if idle",
  156. cyan,
  157. setup[k]["Delete if idle"],
  158. "Sectors in game",
  159. setup[k]["Sectors in game"],
  160. reset,
  161. )
  162. output(aFP, bFP, line)
  163. # line = "{0}{1:16} : {2}{3}".format(
  164. # green, "Delete if idle", cyan, setup[k]["Delete if idle"], reset
  165. # )
  166. # output(aFP, bFP, line)
  167. output(aFP, bFP, "")
  168. for line in scoreboard[k]:
  169. output(aFP, bFP, line)
  170. aFP.close()
  171. bFP.close()
  172. for k in scoreboard:
  173. print(k, "\n".join(scoreboard[k]))
  174. # pprint(scoreboard)
  175. # for k in settings:
  176. # print(k, "\n".join(settings[k]))
  177. if False:
  178. for k in settings:
  179. print(k)
  180. for l in settings[k]:
  181. # if "[Pause]" in l:
  182. # break
  183. print(l)
  184. pprint(settings)
  185. # pprint(setup)
  186. if args.report and setup:
  187. # Save all the configuration settings to json
  188. filename = args.report + ".json"
  189. print("Saving setup to:", filename)
  190. with open(filename, "w") as fp:
  191. json.dump(setup, fp)
  192. async def send(client, data):
  193. global args
  194. if args.debug:
  195. print("<<", data)
  196. await client.send_all(data.encode("latin-1"))
  197. async def readline(client_stream, line):
  198. global state, pos, settings, args
  199. if state == 1:
  200. if "<" in line:
  201. parts = {x[0]: x[2:].strip() for x in line.split("<") if x != ""}
  202. options.update(parts)
  203. # if "Quit" in line:
  204. if args.prompt in line:
  205. if args.debug:
  206. print("Prompt seen!")
  207. state += 1
  208. if pos in options:
  209. await send(client_stream, pos)
  210. else:
  211. await send(client_stream, "Q\r")
  212. elif state == 3:
  213. if pos not in scoreboard:
  214. scoreboard[pos] = []
  215. if (
  216. not scoreboard[pos]
  217. and (line == "")
  218. or (("Enter your choice" in line) or ("Ranking Traders" in line))
  219. ):
  220. return
  221. scoreboard[pos].append(line)
  222. elif state == 5:
  223. if pos not in settings:
  224. settings[pos] = []
  225. setup[pos] = {}
  226. if not settings[pos] and (line == "" or "Enter your choice" in line):
  227. return
  228. settings[pos].append(line)
  229. if ":" in line:
  230. line = cleanANSI(line)
  231. part2 = line[40:]
  232. if ":" in part2:
  233. # yes, this is a 2 parter
  234. parts = line[0:39].strip().split(":")
  235. else:
  236. parts = line.strip().split(":")
  237. part2 = ""
  238. setup[pos][parts[0].strip()] = parts[1].strip()
  239. if ":" in part2:
  240. parts = part2.strip().split(":")
  241. setup[pos][parts[0].strip()] = parts[1].strip()
  242. elif state == 6:
  243. # if "Quit" in line:
  244. if args.prompt in line:
  245. pos = chr(ord(pos) + 1)
  246. if pos in options:
  247. state = 2
  248. await send(client_stream, pos)
  249. else:
  250. await send(client_stream, "q\r")
  251. # elif state == 2:
  252. # if "[Pause]" in line:
  253. # await client_stream.send_all("\r".encode("latin-1"))
  254. if args.debug:
  255. print(">>", line)
  256. async def prompt(client_stream, line):
  257. global state, pos
  258. if state == 2:
  259. if "[Pause]" in line:
  260. # await client_stream.send_all("\r".encode("latin-1"))
  261. await send(client_stream, "\r")
  262. if "Enter your choice" in line:
  263. state = 3
  264. await send(client_stream, "H\r")
  265. elif state == 3:
  266. if "[Pause]" in line:
  267. state = 4
  268. # await client_stream.send_all("\r".encode("latin-1"))
  269. await send(client_stream, "\r")
  270. # pos = chr(ord(pos) + 1)
  271. elif state == 4:
  272. if "Enter your choice" in line:
  273. state = 5
  274. await send(client_stream, "S\r")
  275. elif state == 5:
  276. if "[Pause]" in line:
  277. await send(client_stream, "\r")
  278. if "Enter your choice" in line:
  279. state = 6
  280. await send(client_stream, "X\r")
  281. if args.debug:
  282. print(">>> {!r}".format(line))
  283. async def receiver(client_stream):
  284. global args
  285. if args.debug:
  286. print("receiver: started")
  287. if args.telnet:
  288. cmd = await client_stream.receive_some(3)
  289. await client_stream.send_all("\xff\xfd\xf6".encode("latin-1"))
  290. login = "{0}\r".format(args.username)
  291. await client_stream.send_all(login.encode("utf-8"))
  292. else:
  293. # I need to send the rlogin connection information ...
  294. rlogin = "\x00{0}\x00{1}\x00ansi-bbs\x009600\x00".format(
  295. args.username, args.password
  296. )
  297. await client_stream.send_all(rlogin.encode("utf-8"))
  298. buffer = ""
  299. async for chunk in client_stream:
  300. buffer += chunk.decode("latin-1", "ignore")
  301. while "\n" in buffer:
  302. part = buffer.partition("\n")
  303. line = part[0].replace("\r", "")
  304. buffer = part[2]
  305. await readline(client_stream, line)
  306. buffer = buffer.replace("\r", "")
  307. if buffer != "":
  308. await prompt(client_stream, buffer)
  309. print("receiver: closed")
  310. # pprint(options)
  311. await space_report()
  312. async def start():
  313. print("connecting...")
  314. client_stream = await trio.open_tcp_stream(HOST, PORT)
  315. async with client_stream:
  316. async with trio.open_nursery() as nursery:
  317. nursery.start_soon(receiver, client_stream)
# Script entry point: run the start() coroutine under trio.
trio.run(start)