flexible.py 15 KB


  1. from twisted.internet import reactor
  2. from twisted.internet import task
  3. from twisted.internet import defer
  4. from colorama import Fore, Back, Style
  5. from twisted.python import log
  6. from itertools import cycle
  7. import pendulum
  8. from pprint import pformat
  9. def merge(color_string):
  10. """ Given a string of colorama ANSI, merge them if you can. """
  11. return color_string.replace("m\x1b[", ";")
  12. class PlayerInput(object):
  13. def __init__(self, game):
  14. # I think game gives us access to everything we need
  15. self.game = game
  16. self.observer = self.game.observer
  17. self.save = None
  18. self.deferred = None
  19. self.queue_game = game.queue_game
  20. self.keep = {}
  21. # default colors
  22. self.c = merge(Style.BRIGHT + Fore.WHITE + Back.BLUE)
  23. self.cp = merge(Style.BRIGHT + Fore.YELLOW + Back.BLUE)
  24. # useful consts
  25. self.r = Style.RESET_ALL
  26. self.nl = "\n\r"
  27. self.bsb = "\b \b"
  28. self.keepalive = None
  29. def color(self, c):
  30. self.c = c
  31. def colorp(self, cp):
  32. self.cp = cp
  33. def alive(self):
  34. log.msg("PlayerInput.alive()")
  35. self.game.queue_player.put(" ")
  36. def prompt(self, user_prompt, limit, **kw):
  37. """ Generate prompt for user input.
  38. prompt = text displayed.
  39. limit = # of characters allowed.
  40. default = (text to default to)
  41. keywords:
  42. abort_blank : Abort if they give us blank text.
  43. name : Stores the input in self.keep dict.
  44. """
  45. log.msg("PlayerInput({0}, {1}, {2}".format(user_prompt, limit, kw))
  46. self.limit = limit
  47. self.input = ""
  48. self.kw = kw
  49. assert self.save is None
  50. assert self.keepalive is None
  51. # Note: This clears out the server "keep alive"
  52. self.save = self.observer.save()
  53. self.observer.connect("player", self.get_input)
  54. self.keepalive = task.LoopingCall(self.alive)
  55. self.keepalive.start(30)
  56. # We need to "hide" the game output.
  57. # Otherwise it WITH mess up the user input display.
  58. self.to_player = self.game.to_player
  59. self.game.to_player = False
  60. # Display prompt
  61. # self.queue_game.put(self.r + self.nl + self.c + user_prompt + " " + self.cp)
  62. self.queue_game.put(self.r + self.c + user_prompt + self.r + " " + self.cp)
  63. # Set "Background of prompt"
  64. self.queue_game.put(" " * limit + "\b" * limit)
  65. assert self.deferred is None
  66. d = defer.Deferred()
  67. self.deferred = d
  68. log.msg("Return deferred ...", self.deferred)
  69. return d
  70. def get_input(self, chunk):
  71. """ Data from player (in bytes) """
  72. chunk = chunk.decode("utf-8", "ignore")
  73. for ch in chunk:
  74. if ch == "\b":
  75. if len(self.input) > 0:
  76. self.queue_game.put(self.bsb)
  77. self.input = self.input[0:-1]
  78. else:
  79. self.queue_game.put("\a")
  80. elif ch == "\r":
  81. self.queue_game.put(self.r + self.nl)
  82. log.msg("Restore observer dispatch", self.save)
  83. assert not self.save is None
  84. self.observer.load(self.save)
  85. self.save = None
  86. log.msg("Disable keepalive")
  87. self.keepalive.stop()
  88. self.keepalive = None
  89. line = self.input
  90. self.input = ""
  91. assert not self.deferred is None
  92. self.game.to_player = self.to_player
  93. # If they gave us the keyword name, save the value as that name
  94. if "name" in self.kw:
  95. self.keep[self.kw["name"]] = line
  96. if "abort_blank" in self.kw and self.kw["abort_blank"]:
  97. # Abort on blank input
  98. if line.strip() == "":
  99. # Yes, input is blank, abort.
  100. log.msg("errback, abort_blank")
  101. reactor.callLater(
  102. 0, self.deferred.errback, Exception("abort_blank")
  103. )
  104. self.deferred = None
  105. return
  106. # Ok, use deferred.callback, or reactor.callLater?
  107. # self.deferred.callback(line)
  108. reactor.callLater(0, self.deferred.callback, line)
  109. self.deferred = None
  110. return
  111. elif ch.isprintable():
  112. # Printable, but is it acceptable?
  113. if "digits" in self.kw:
  114. if not ch.isdigit():
  115. self.queue_game.put("\a")
  116. continue
  117. if len(self.input) + 1 <= self.limit:
  118. self.input += ch
  119. self.queue_game.put(ch)
  120. else:
  121. self.queue_game.put("\a")
  122. def output(self, line):
  123. """ A default display of what they just input. """
  124. log.msg("PlayerInput.output({0})".format(line))
  125. self.game.queue_game.put(self.r + "[{0}]".format(line) + self.nl)
  126. return line
  127. PORT_CLASSES = {
  128. 1: "BBS",
  129. 2: "BSB",
  130. 3: "SBB",
  131. 4: "SSB",
  132. 5: "SBS",
  133. 6: "BSS",
  134. 7: "SSS",
  135. 8: "BBB",
  136. }
  137. CLASSES_PORT = {v: k for k, v in PORT_CLASSES.items()}
  138. import re
  139. class CIMPortReport(object):
  140. def __init__(self, game):
  141. self.game = game
  142. self.queue_game = game.queue_game
  143. self.queue_player = game.queue_player
  144. self.observer = game.observer
  145. # Yes, at this point we would activate
  146. self.prompt = game.buffer
  147. self.save = self.observer.save()
  148. # I actually don't want the player input, but I'll grab it anyway.
  149. self.observer.connect("player", self.player)
  150. self.observer.connect("prompt", self.game_prompt)
  151. self.observer.connect("game-line", self.game_line)
  152. # If we want it, it's here.
  153. self.defer = None
  154. self.to_player = self.game.to_player
  155. log.msg("to_player (stored)", self.to_player)
  156. # Hide what's happening from the player
  157. self.game.to_player = False
  158. self.queue_player.put("^") # Activate CIM
  159. self.state = 1
  160. self.portdata = {}
  161. self.portcycle = cycle(["/", "-", "\\", "|"])
  162. def game_prompt(self, prompt):
  163. if prompt == ": ":
  164. if self.state == 1:
  165. # Ok, then we're ready to request the port report
  166. self.portcycle = cycle(["/", "-", "\\", "|"])
  167. self.queue_player.put("R")
  168. self.state = 2
  169. if self.state == 2:
  170. self.queue_player.put("Q")
  171. self.state = 3
  172. if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt):
  173. if self.state == 3:
  174. # Ok, time to exit
  175. # exit from this...
  176. self.game.to_player = self.to_player
  177. self.observer.load(self.save)
  178. self.save = None
  179. self.queue_game.put("\b \b\r\n")
  180. if not self.defer is None:
  181. self.defer.callback(self.portdata)
  182. self.defer = None
  183. def game_line(self, line):
  184. if line == "" or line == ": ":
  185. return
  186. if line == ": ENDINTERROG":
  187. return
  188. # This should be the CIM Report Data -- parse it
  189. if self.portcycle:
  190. if len(self.portdata) % 10 == 0:
  191. self.queue_game.put("\b" + next(self.portcycle))
  192. work = line.replace("%", "")
  193. parts = re.split(r"(?<=\d)\s", work)
  194. if len(parts) == 8:
  195. port = int(parts[0].strip())
  196. data = dict()
  197. def portBS(info):
  198. if info[0] == "-":
  199. bs = "B"
  200. else:
  201. bs = "S"
  202. return (bs, int(info[1:].strip()))
  203. data["fuel"] = dict()
  204. data["fuel"]["sale"], data["fuel"]["units"] = portBS(parts[1])
  205. data["fuel"]["pct"] = int(parts[2].strip())
  206. data["org"] = dict()
  207. data["org"]["sale"], data["org"]["units"] = portBS(parts[3])
  208. data["org"]["pct"] = int(parts[4].strip())
  209. data["equ"] = dict()
  210. data["equ"]["sale"], data["equ"]["units"] = portBS(parts[5])
  211. data["equ"]["pct"] = int(parts[6].strip())
  212. # Store what this port is buying/selling
  213. data["port"] = (
  214. data["fuel"]["sale"] + data["org"]["sale"] + data["equ"]["sale"]
  215. )
  216. # Convert BBS/SBB to Class number 1-8
  217. data["class"] = CLASSES_PORT[data["port"]]
  218. self.portdata[port] = data
  219. else:
  220. log.msg("CIMPortReport:", line, "???")
  221. def __del__(self):
  222. log.msg("CIMPortReport {0} RIP".format(self))
  223. def whenDone(self):
  224. self.defer = defer.Deferred()
  225. # Call this to chain something after we exit.
  226. return self.defer
  227. def player(self, chunk):
  228. """ Data from player (in bytes). """
  229. chunk = chunk.decode("utf-8", "ignore")
  230. key = chunk.upper()
  231. log.msg("CIMPortReport.player({0}) : I AM stopping...".format(key))
  232. # Stop the keepalive if we are activating something else
  233. # or leaving...
  234. # self.keepalive.stop()
  235. self.queue_game.put("\b \b\r\n")
  236. if not self.defer is None:
  237. # We have something, so:
  238. self.game.to_player = self.to_player
  239. self.observer.load(self.save)
  240. self.save = None
  241. self.defer.errback(Exception("User Abort"))
  242. self.defer = None
  243. else:
  244. # Still "exit" out.
  245. self.game.to_player = self.to_player
  246. self.observer.load(self.save)
  247. class ProxyMenu(object):
  248. def __init__(self, game):
  249. self.nl = "\n\r"
  250. self.c = merge(Style.BRIGHT + Fore.YELLOW + Back.BLUE)
  251. self.r = Style.RESET_ALL
  252. self.c1 = merge(Style.BRIGHT + Fore.BLUE)
  253. self.c2 = merge(Style.NORMAL + Fore.CYAN)
  254. self.portdata = None
  255. self.game = game
  256. self.queue_game = game.queue_game
  257. self.observer = game.observer
  258. # Yes, at this point we would activate
  259. self.prompt = game.buffer
  260. self.save = self.observer.save()
  261. self.observer.connect("player", self.player)
  262. # If we want it, it's here.
  263. self.defer = None
  264. self.keepalive = task.LoopingCall(self.awake)
  265. self.keepalive.start(30)
  266. self.menu()
  267. def __del__(self):
  268. log.msg("ProxyMenu {0} RIP".format(self))
  269. def whenDone(self):
  270. self.defer = defer.Deferred()
  271. # Call this to chain something after we exit.
  272. return self.defer
  273. def menu(self):
  274. self.queue_game.put(
  275. self.nl + self.c + "TradeWars Proxy active." + self.r + self.nl
  276. )
  277. def menu_item(ch, desc):
  278. self.queue_game.put(
  279. " " + self.c1 + ch + self.c2 + " - " + self.c1 + desc + self.nl
  280. )
  281. menu_item("D", "Diagnostics")
  282. menu_item("Q", "Quest")
  283. menu_item("T", "Display current Time")
  284. menu_item("P", "Port CIM Report")
  285. menu_item("S", "Scripts")
  286. menu_item("X", "eXit")
  287. self.queue_game.put(" " + self.c + "-=>" + self.r + " ")
  288. def awake(self):
  289. log.msg("ProxyMenu.awake()")
  290. self.game.queue_player.put(" ")
  291. def port_report(self, portdata):
  292. self.portdata = portdata
  293. self.queue_game.put("Loaded {0} records.".format(len(portdata)) + self.nl)
  294. self.welcome_back()
  295. def player(self, chunk):
  296. """ Data from player (in bytes). """
  297. chunk = chunk.decode("utf-8", "ignore")
  298. key = chunk.upper()
  299. log.msg("ProxyMenu.player({0})".format(key))
  300. # Stop the keepalive if we are activating something else
  301. # or leaving...
  302. self.keepalive.stop()
  303. if key == "T":
  304. self.queue_game.put(self.c + key + self.r + self.nl)
  305. # perform T option
  306. now = pendulum.now()
  307. self.queue_game.put(
  308. self.nl + self.c1 + "Current time " + now.to_datetime_string() + self.nl
  309. )
  310. elif key == "P":
  311. self.queue_game.put(self.c + key + self.r + self.nl)
  312. # Activate CIM Port Report
  313. report = CIMPortReport(self.game)
  314. d = report.whenDone()
  315. d.addCallback(self.port_report)
  316. d.addErrback(self.welcome_back)
  317. return
  318. elif key == "D":
  319. self.queue_game.put(self.c + key + self.r + self.nl)
  320. self.queue_game.put(pformat(self.portdata).replace("\n", "\n\r") + self.nl)
  321. elif key == "Q":
  322. self.queue_game.put(self.c + key + self.r + self.nl)
  323. # This is an example of chaining PlayerInput prompt calls.
  324. ask = PlayerInput(self.game)
  325. d = ask.prompt("What is your quest?", 40, name="quest", abort_blank=True)
  326. # Display the user's input
  327. d.addCallback(ask.output)
  328. d.addCallback(
  329. lambda ignore: ask.prompt(
  330. "What is your favorite color?", 10, name="color"
  331. )
  332. )
  333. d.addCallback(ask.output)
  334. d.addCallback(
  335. lambda ignore: ask.prompt(
  336. "What is the meaning of the squirrel?",
  337. 12,
  338. name="squirrel",
  339. digits=True,
  340. )
  341. )
  342. d.addCallback(ask.output)
  343. def show_values(show):
  344. log.msg(show)
  345. self.queue_game.put(pformat(show).replace("\n", "\n\r") + self.nl)
  346. d.addCallback(lambda ignore: show_values(ask.keep))
  347. d.addCallback(self.welcome_back)
  348. # On error, just return back
  349. # This doesn't seem to be getting called.
  350. # d.addErrback(lambda ignore: self.welcome_back)
  351. d.addErrback(self.welcome_back)
  352. return
  353. elif key == "X":
  354. self.queue_game.put(self.c + key + self.r + self.nl)
  355. self.observer.load(self.save)
  356. self.save = None
  357. # It isn't running (NOW), so don't try to stop it.
  358. # self.keepalive.stop()
  359. self.keepalive = None
  360. self.queue_game.put(self.prompt)
  361. self.prompt = None
  362. # Possibly: Send '\r' to re-display the prompt
  363. # instead of displaying the original one.
  364. # Were we asked to do something when we were done here?
  365. if self.defer:
  366. reactor.CallLater(0, self.defer.callback)
  367. # self.defer.callback()
  368. self.defer = None
  369. return
  370. self.keepalive.start(30, True)
  371. self.menu()
  372. def welcome_back(self, *_):
  373. log.msg("welcome_back")
  374. self.keepalive.start(30, True)
  375. self.menu()