talker.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. #!/home/mystic/mystic/bbs/venv/bin/python
  2. import trio
  3. import sys
  4. from pprint import pprint
  5. import json
  6. from colorama import Fore, Back, Style
  7. import argparse
  8. import string
  9. def merge(color_string):
  10. """ Given a string of colorama ANSI, merge them if you can. """
  11. return color_string.replace("m\x1b[", ";")
  12. parser = argparse.ArgumentParser(description="TradeWars Score Bulletins Puller")
  13. parser.add_argument("--host", type=str, help="Hostname to contact", default="127.0.0.1")
  14. parser.add_argument("--port", type=int, help="Port number", default=2002)
  15. parser.add_argument("--username", type=str, help="RLogin Username", default="phil")
  16. parser.add_argument("--password", type=str, help="RLogin Password", default="phil")
  17. parser.add_argument("--games", type=str, help="TWGS Games to select")
  18. parser.add_argument("--name", type=str, help="Server Name for Report")
  19. parser.add_argument("--report", type=str, help="Space Reports base filename")
  20. parser.add_argument("--debug", action="store_true")
  21. parser.add_argument("--telnet", action="store_true")
  22. parser.add_argument(
  23. "--prompt", type=str, help="TWGS Custom Menu Prompt", default="Quit"
  24. )
  25. # Unfortunately, this can't figure out the custom prison TWGS menus. :(
  26. # I would like this to allow you to select which reports it'll do.
  27. # So I could do the prison report with games A and B.
  28. #
  29. args = parser.parse_args()
  30. HOST = args.host
  31. # PORT=2003
  32. PORT = args.port
  33. state = 1
  34. options = {}
  35. pos = "A"
  36. scoreboard = {}
  37. settings = {}
  38. setup = {}
  39. if args.games is not None:
  40. options = {x: x for x in args.games}
  41. print("We will pull:", ",".join(options.keys()))
  42. # NOTE: --games does not work unless you are also using prompt
  43. if args.debug:
  44. print("ARGS:")
  45. pprint(args)
  46. print("=" * 40)
  47. import re
  48. # Cleans all ANSI
  49. cleaner = re.compile(r"\x1b\[[0-9;]*[A-Zmh]")
  50. def cleanANSI(line):
  51. """ Remove all ANSI codes. """
  52. global cleaner
  53. return cleaner.sub("", line)
  54. def ANSIsplit(line, pos=70):
  55. # Verify that we're not inside an ANSI sequence at this very moment.
  56. save = "\x1b[s"
  57. restore = "\x1b[u"
  58. ansi = line.rindex("\x1b[", 0, pos-1)
  59. start_ansi = ansi
  60. ansi += 2
  61. # to_check = string.digits + string.ascii_letters + ";"
  62. to_check = string.digits + ";"
  63. while line[ansi] in to_check:
  64. ansi += 1
  65. if line[ansi] in string.ascii_letters:
  66. ansi += 1
  67. if ansi > pos:
  68. # Ok, pos is inside an ANSI escape code. break at the start of the
  69. # ANSI code.
  70. ansi = start_ansi
  71. else:
  72. next_ansi = line.index("\x1b[", start_ansi + 1)
  73. if pos < next_ansi:
  74. ansi = pos
  75. else:
  76. ansi = next_ansi
  77. # else:
  78. # next_ansi = line.index("\x1b[", start_ansi + 1)
  79. # if pos < next_ansi:
  80. # ansi = pos
  81. # else:
  82. # ansi = next_ansi
  83. parts = (line[0:ansi] + save, restore + line[ansi:])
  84. return parts
  85. def output(ansi, bbs, line):
  86. cleaned = cleanANSI(line)
  87. if len(line) > 60:
  88. line = re.sub(
  89. r"\s{5,}", lambda spaces: "\x1b[{0}C".format(len(spaces.group(0))), line
  90. )
  91. if len(line) > 65:
  92. lines = ANSIsplit(line, 65)
  93. print(lines[0], file=ansi)
  94. print(lines[1], file=ansi)
  95. else:
  96. print(line, file=ansi)
  97. print(cleaned, file=bbs)
  98. async def space_report():
  99. global args, scoreboard, settings, setup
  100. # Clean up the scoreboard
  101. for k in scoreboard:
  102. l = scoreboard[k][0]
  103. # _, x, l = l.rpartition("\x1b[2J")
  104. _, x, l = l.rpartition("\x1b[H")
  105. scoreboard[k][0] = l
  106. nl = "\r\n"
  107. reset = Style.RESET_ALL
  108. cyan = merge(Fore.CYAN + Style.BRIGHT)
  109. white = merge(Fore.WHITE + Style.BRIGHT)
  110. green = merge(Fore.GREEN + Style.NORMAL)
  111. if args.report:
  112. # Ok, there's a base filename for the report.
  113. ansi = args.report + ".ans"
  114. bbs = args.report + ".bbs"
  115. aFP = open(ansi, "w")
  116. bFP = open(bbs, "w")
  117. output(aFP, bFP, "{0}Space Report for {1}{2}".format(cyan, white, args.name))
  118. output(aFP, bFP, "")
  119. for k in sorted(scoreboard):
  120. name = options[k]
  121. if name == k:
  122. name = ""
  123. output(
  124. aFP, bFP, "{0}Game {1} {2}{3}{4}".format(cyan, k, white, name, reset)
  125. )
  126. # Ok, what settings would I want displayed?
  127. # >> Age of game : 65 days Days since start : 65 days
  128. # >> Delete if idle : 30 days
  129. # >> Time per day : Unlimited Turns per day : 10000
  130. # >> Sectors in game : 25000 Display StarDock : Yes
  131. line = "{0}{1:16} : {2}{3:20}{0}{4:16} : {2}{5}{6}".format(
  132. green,
  133. "Age of game",
  134. cyan,
  135. setup[k]["Age of game"],
  136. "Days since start",
  137. setup[k]["Days since start"],
  138. reset,
  139. )
  140. output(aFP, bFP, line)
  141. line = "{0}{1:16} : {2}{3:20}{0}{4:16} : {2}{5}{6}".format(
  142. green,
  143. "Time per day",
  144. cyan,
  145. setup[k]["Time per day"],
  146. "Turns per day",
  147. setup[k]["Turns per day"],
  148. reset,
  149. )
  150. output(aFP, bFP, line)
  151. line = "{0}{1:16} : {2}{3:20}{0}{4:16} : {2}{5}{6}".format(
  152. green,
  153. "Delete if idle",
  154. cyan,
  155. setup[k]["Delete if idle"],
  156. "Sectors in game",
  157. setup[k]["Sectors in game"],
  158. reset,
  159. )
  160. output(aFP, bFP, line)
  161. # line = "{0}{1:16} : {2}{3}".format(
  162. # green, "Delete if idle", cyan, setup[k]["Delete if idle"], reset
  163. # )
  164. # output(aFP, bFP, line)
  165. output(aFP, bFP, "")
  166. for line in scoreboard[k]:
  167. output(aFP, bFP, line)
  168. aFP.close()
  169. bFP.close()
  170. for k in scoreboard:
  171. print(k, "\n".join(scoreboard[k]))
  172. # pprint(scoreboard)
  173. # for k in settings:
  174. # print(k, "\n".join(settings[k]))
  175. if False:
  176. for k in settings:
  177. print(k)
  178. for l in settings[k]:
  179. # if "[Pause]" in l:
  180. # break
  181. print(l)
  182. pprint(settings)
  183. # pprint(setup)
  184. if args.report:
  185. # Save all the configuration settings to json
  186. filename = args.report + ".json"
  187. print("Saving setup to:", filename)
  188. with open(filename, "w") as fp:
  189. json.dump(setup, fp)
  190. async def send(client, data):
  191. global args
  192. if args.debug:
  193. print("<<", data)
  194. await client.send_all(data.encode("latin-1"))
  195. async def readline(client_stream, line):
  196. global state, pos, settings, args
  197. if state == 1:
  198. if "<" in line:
  199. parts = {x[0]: x[2:].strip() for x in line.split("<") if x != ""}
  200. options.update(parts)
  201. # if "Quit" in line:
  202. if args.prompt in line:
  203. if args.debug:
  204. print("Prompt seen!")
  205. state += 1
  206. if pos in options:
  207. await send(client_stream, pos)
  208. else:
  209. await send(client_stream, "Q\r")
  210. elif state == 3:
  211. if pos not in scoreboard:
  212. scoreboard[pos] = []
  213. if (
  214. not scoreboard[pos]
  215. and (line == "")
  216. or (("Enter your choice" in line) or ("Ranking Traders" in line))
  217. ):
  218. return
  219. scoreboard[pos].append(line)
  220. elif state == 5:
  221. if pos not in settings:
  222. settings[pos] = []
  223. setup[pos] = {}
  224. if not settings[pos] and (line == "" or "Enter your choice" in line):
  225. return
  226. settings[pos].append(line)
  227. if ":" in line:
  228. line = cleanANSI(line)
  229. part2 = line[40:]
  230. if ":" in part2:
  231. # yes, this is a 2 parter
  232. parts = line[0:39].strip().split(":")
  233. else:
  234. parts = line.strip().split(":")
  235. part2 = ""
  236. setup[pos][parts[0].strip()] = parts[1].strip()
  237. if ":" in part2:
  238. parts = part2.strip().split(":")
  239. setup[pos][parts[0].strip()] = parts[1].strip()
  240. elif state == 6:
  241. # if "Quit" in line:
  242. if args.prompt in line:
  243. pos = chr(ord(pos) + 1)
  244. if pos in options:
  245. state = 2
  246. await send(client_stream, pos)
  247. else:
  248. await send(client_stream, "q\r")
  249. # elif state == 2:
  250. # if "[Pause]" in line:
  251. # await client_stream.send_all("\r".encode("latin-1"))
  252. if args.debug:
  253. print(">>", line)
  254. async def prompt(client_stream, line):
  255. global state, pos
  256. if state == 2:
  257. if "[Pause]" in line:
  258. # await client_stream.send_all("\r".encode("latin-1"))
  259. await send(client_stream, "\r")
  260. if "Enter your choice" in line:
  261. state = 3
  262. await send(client_stream, "H\r")
  263. elif state == 3:
  264. if "[Pause]" in line:
  265. state = 4
  266. # await client_stream.send_all("\r".encode("latin-1"))
  267. await send(client_stream, "\r")
  268. # pos = chr(ord(pos) + 1)
  269. elif state == 4:
  270. if "Enter your choice" in line:
  271. state = 5
  272. await send(client_stream, "S\r")
  273. elif state == 5:
  274. if "[Pause]" in line:
  275. await send(client_stream, "\r")
  276. if "Enter your choice" in line:
  277. state = 6
  278. await send(client_stream, "X\r")
  279. if args.debug:
  280. print(">>> {!r}".format(line))
  281. async def receiver(client_stream):
  282. global args
  283. if args.debug:
  284. print("receiver: started")
  285. if args.telnet:
  286. cmd = await client_stream.receive_some(3)
  287. await client_stream.send_all("\xff\xfd\xf6".encode("latin-1"))
  288. login = "{0}\r".format(args.username)
  289. await client_stream.send_all(login.encode("utf-8"))
  290. else:
  291. # I need to send the rlogin connection information ...
  292. rlogin = "\x00{0}\x00{1}\x00ansi-bbs\x009600\x00".format(
  293. args.username, args.password
  294. )
  295. await client_stream.send_all(rlogin.encode("utf-8"))
  296. buffer = ""
  297. async for chunk in client_stream:
  298. buffer += chunk.decode("latin-1", "ignore")
  299. while "\n" in buffer:
  300. part = buffer.partition("\n")
  301. line = part[0].replace("\r", "")
  302. buffer = part[2]
  303. await readline(client_stream, line)
  304. buffer = buffer.replace("\r", "")
  305. if buffer != "":
  306. await prompt(client_stream, buffer)
  307. print("receiver: closed")
  308. # pprint(options)
  309. await space_report()
  310. async def start():
  311. print("connecting...")
  312. client_stream = await trio.open_tcp_stream(HOST, PORT)
  313. async with client_stream:
  314. async with trio.open_nursery() as nursery:
  315. nursery.start_soon(receiver, client_stream)
  316. trio.run(start)