flexible.py 23 KB


  1. from twisted.internet import reactor
  2. from twisted.internet import task
  3. from twisted.internet import defer
  4. from colorama import Fore, Back, Style
  5. from twisted.python import log
  6. from itertools import cycle
  7. import pendulum
  8. from pprint import pformat
  9. def merge(color_string):
  10. """ Given a string of colorama ANSI, merge them if you can. """
  11. return color_string.replace("m\x1b[", ";")
  12. class PlayerInput(object):
  13. """ Player Input
  14. Example:
  15. from flexible import PlayerInput
  16. ask = PlayerInput(self.game)
  17. # abort_blank means, if the input field is blank, abort. Use error_back.
  18. d = ask.prompt("What is your quest?", 40, name="quest", abort_blank=True)
  19. # Display the user's input / but not needed.
  20. d.addCallback(ask.output)
  21. d.addCallback(
  22. lambda ignore: ask.prompt(
  23. "What is your favorite color?", 10, name="color"
  24. )
  25. )
  26. d.addCallback(ask.output)
  27. d.addCallback(
  28. lambda ignore: ask.prompt(
  29. "What is your least favorite number?",
  30. 12,
  31. name="number",
  32. digits=True,
  33. )
  34. )
  35. d.addCallback(ask.output)
  36. def show_values(show):
  37. log.msg(show)
  38. self.queue_game.put(pformat(show).replace("\n", "\n\r") + self.nl)
  39. d.addCallback(lambda ignore: show_values(ask.keep))
  40. d.addCallback(self.welcome_back)
  41. # On error, just return back
  42. d.addErrback(self.welcome_back)
  43. """
  44. def __init__(self, game):
  45. # I think game gives us access to everything we need
  46. self.game = game
  47. self.observer = self.game.observer
  48. self.save = None
  49. self.deferred = None
  50. self.queue_game = game.queue_game
  51. self.keep = {}
  52. # default colors
  53. self.c = merge(Style.BRIGHT + Fore.WHITE + Back.BLUE)
  54. self.cp = merge(Style.BRIGHT + Fore.YELLOW + Back.BLUE)
  55. # useful consts
  56. self.r = Style.RESET_ALL
  57. self.nl = "\n\r"
  58. self.bsb = "\b \b"
  59. self.keepalive = None
  60. def color(self, c):
  61. self.c = c
  62. def colorp(self, cp):
  63. self.cp = cp
  64. def alive(self):
  65. log.msg("PlayerInput.alive()")
  66. self.game.queue_player.put(" ")
  67. def prompt(self, user_prompt, limit, **kw):
  68. """ Generate prompt for user input.
  69. Note: This returns deferred.
  70. prompt = text displayed.
  71. limit = # of characters allowed.
  72. default = (text to default to)
  73. keywords:
  74. abort_blank : Abort if they give us blank text.
  75. name : Stores the input in self.keep dict.
  76. """
  77. log.msg("PlayerInput({0}, {1}, {2}".format(user_prompt, limit, kw))
  78. self.limit = limit
  79. self.input = ""
  80. self.kw = kw
  81. assert self.save is None
  82. assert self.keepalive is None
  83. # Note: This clears out the server "keep alive"
  84. self.save = self.observer.save()
  85. self.observer.connect("player", self.get_input)
  86. self.keepalive = task.LoopingCall(self.alive)
  87. self.keepalive.start(30)
  88. # We need to "hide" the game output.
  89. # Otherwise it WITH mess up the user input display.
  90. self.to_player = self.game.to_player
  91. self.game.to_player = False
  92. # Display prompt
  93. # self.queue_game.put(self.r + self.nl + self.c + user_prompt + " " + self.cp)
  94. self.queue_game.put(self.r + self.c + user_prompt + self.r + " " + self.cp)
  95. # Set "Background of prompt"
  96. self.queue_game.put(" " * limit + "\b" * limit)
  97. assert self.deferred is None
  98. d = defer.Deferred()
  99. self.deferred = d
  100. log.msg("Return deferred ...", self.deferred)
  101. return d
  102. def get_input(self, chunk):
  103. """ Data from player (in bytes) """
  104. chunk = chunk.decode("utf-8", "ignore")
  105. for ch in chunk:
  106. if ch == "\b":
  107. if len(self.input) > 0:
  108. self.queue_game.put(self.bsb)
  109. self.input = self.input[0:-1]
  110. else:
  111. self.queue_game.put("\a")
  112. elif ch == "\r":
  113. self.queue_game.put(self.r + self.nl)
  114. log.msg("Restore observer dispatch", self.save)
  115. assert not self.save is None
  116. self.observer.load(self.save)
  117. self.save = None
  118. log.msg("Disable keepalive")
  119. self.keepalive.stop()
  120. self.keepalive = None
  121. line = self.input
  122. self.input = ""
  123. assert not self.deferred is None
  124. self.game.to_player = self.to_player
  125. # If they gave us the keyword name, save the value as that name
  126. if "name" in self.kw:
  127. self.keep[self.kw["name"]] = line
  128. if "abort_blank" in self.kw and self.kw["abort_blank"]:
  129. # Abort on blank input
  130. if line.strip() == "":
  131. # Yes, input is blank, abort.
  132. log.msg("errback, abort_blank")
  133. reactor.callLater(
  134. 0, self.deferred.errback, Exception("abort_blank")
  135. )
  136. self.deferred = None
  137. return
  138. # Ok, use deferred.callback, or reactor.callLater?
  139. # self.deferred.callback(line)
  140. reactor.callLater(0, self.deferred.callback, line)
  141. self.deferred = None
  142. return
  143. elif ch.isprintable():
  144. # Printable, but is it acceptable?
  145. if "digits" in self.kw:
  146. if not ch.isdigit():
  147. self.queue_game.put("\a")
  148. continue
  149. if len(self.input) + 1 <= self.limit:
  150. self.input += ch
  151. self.queue_game.put(ch)
  152. else:
  153. self.queue_game.put("\a")
  154. def output(self, line):
  155. """ A default display of what they just input. """
  156. log.msg("PlayerInput.output({0})".format(line))
  157. self.game.queue_game.put(self.r + "[{0}]".format(line) + self.nl)
  158. return line
  159. PORT_CLASSES = {
  160. 1: "BBS",
  161. 2: "BSB",
  162. 3: "SBB",
  163. 4: "SSB",
  164. 5: "SBS",
  165. 6: "BSS",
  166. 7: "SSS",
  167. 8: "BBB",
  168. }
  169. CLASSES_PORT = {v: k for k, v in PORT_CLASSES.items()}
  170. import re
  171. class CIMPortReport(object):
  172. """ Parse data from CIM Port Report
  173. Example:
  174. from flexible import CIMPortReport
  175. report = CIMPortReport(self.game)
  176. d = report.whenDone()
  177. d.addCallback(self.port_report)
  178. d.addErrback(self.welcome_back)
  179. def port_report(self, portdata):
  180. self.portdata = portdata
  181. self.queue_game.put("Loaded {0} records.".format(len(portdata)) + self.nl)
  182. self.welcome_back()
  183. def welcome_back(self,*_):
  184. ... restore keep alive timers, etc.
  185. """
  186. def __init__(self, game):
  187. self.game = game
  188. self.queue_game = game.queue_game
  189. self.queue_player = game.queue_player
  190. self.observer = game.observer
  191. # Yes, at this point we would activate
  192. self.prompt = game.buffer
  193. self.save = self.observer.save()
  194. # I actually don't want the player input, but I'll grab it anyway.
  195. self.observer.connect("player", self.player)
  196. self.observer.connect("prompt", self.game_prompt)
  197. self.observer.connect("game-line", self.game_line)
  198. # If we want it, it's here.
  199. self.defer = None
  200. self.to_player = self.game.to_player
  201. log.msg("to_player (stored)", self.to_player)
  202. # Hide what's happening from the player
  203. self.game.to_player = False
  204. self.queue_player.put("^") # Activate CIM
  205. self.state = 1
  206. self.portdata = {}
  207. self.portcycle = cycle(["/", "-", "\\", "|"])
  208. def game_prompt(self, prompt):
  209. if prompt == ": ":
  210. if self.state == 1:
  211. # Ok, then we're ready to request the port report
  212. self.portcycle = cycle(["/", "-", "\\", "|"])
  213. self.queue_player.put("R")
  214. self.state = 2
  215. elif self.state == 2:
  216. self.queue_player.put("Q")
  217. self.state = 3
  218. if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt):
  219. if self.state == 3:
  220. # Ok, time to exit
  221. # exit from this...
  222. self.game.to_player = self.to_player
  223. self.observer.load(self.save)
  224. self.save = None
  225. self.game.portdata = self.portdata
  226. self.queue_game.put("\b \b\r\n")
  227. if not self.defer is None:
  228. self.defer.callback(self.portdata)
  229. self.defer = None
  230. def game_line(self, line):
  231. if line == "" or line == ": ":
  232. return
  233. if line == ": ENDINTERROG":
  234. return
  235. # This should be the CIM Report Data -- parse it
  236. if self.portcycle:
  237. if len(self.portdata) % 10 == 0:
  238. self.queue_game.put("\b" + next(self.portcycle))
  239. work = line.replace("%", "")
  240. parts = re.split(r"(?<=\d)\s", work)
  241. if len(parts) == 8:
  242. port = int(parts[0].strip())
  243. data = dict()
  244. def portBS(info):
  245. if info[0] == "-":
  246. bs = "B"
  247. else:
  248. bs = "S"
  249. return (bs, int(info[1:].strip()))
  250. data["fuel"] = dict()
  251. data["fuel"]["sale"], data["fuel"]["units"] = portBS(parts[1])
  252. data["fuel"]["pct"] = int(parts[2].strip())
  253. data["org"] = dict()
  254. data["org"]["sale"], data["org"]["units"] = portBS(parts[3])
  255. data["org"]["pct"] = int(parts[4].strip())
  256. data["equ"] = dict()
  257. data["equ"]["sale"], data["equ"]["units"] = portBS(parts[5])
  258. data["equ"]["pct"] = int(parts[6].strip())
  259. # Store what this port is buying/selling
  260. data["port"] = (
  261. data["fuel"]["sale"] + data["org"]["sale"] + data["equ"]["sale"]
  262. )
  263. # Convert BBS/SBB to Class number 1-8
  264. data["class"] = CLASSES_PORT[data["port"]]
  265. self.portdata[port] = data
  266. else:
  267. log.msg("CIMPortReport:", line, "???")
  268. def __del__(self):
  269. log.msg("CIMPortReport {0} RIP".format(self))
  270. def whenDone(self):
  271. self.defer = defer.Deferred()
  272. # Call this to chain something after we exit.
  273. return self.defer
  274. def player(self, chunk):
  275. """ Data from player (in bytes). """
  276. chunk = chunk.decode("utf-8", "ignore")
  277. key = chunk.upper()
  278. log.msg("CIMPortReport.player({0}) : I AM stopping...".format(key))
  279. # Stop the keepalive if we are activating something else
  280. # or leaving...
  281. # self.keepalive.stop()
  282. self.queue_game.put("\b \b\r\n")
  283. if not self.defer is None:
  284. # We have something, so:
  285. self.game.to_player = self.to_player
  286. self.observer.load(self.save)
  287. self.save = None
  288. self.defer.errback(Exception("User Abort"))
  289. self.defer = None
  290. else:
  291. # Still "exit" out.
  292. self.game.to_player = self.to_player
  293. self.observer.load(self.save)
  294. class ScriptPort(object):
  295. """ Performs the Port script. """
  296. def __init__(self, game):
  297. self.game = game
  298. self.queue_game = game.queue_game
  299. self.queue_player = game.queue_player
  300. self.observer = game.observer
  301. self.r = Style.RESET_ALL
  302. self.nl = "\n\r"
  303. # Activate
  304. self.prompt = game.buffer
  305. self.save = self.observer.save()
  306. self.observer.connect('player', self.player)
  307. self.observer.connect("prompt", self.game_prompt)
  308. self.observer.connect("game-line", self.game_line)
  309. self.defer = None
  310. self.queue_game.put(
  311. self.nl + "Script based on: Port Pair Trading v2.00" + self.r + self.nl
  312. )
  313. self.sector_number = None
  314. self.possible_sectors = None
  315. self.state = 1
  316. self.queue_player.put("D")
  317. # Original, send 'D' to display current sector.
  318. # We could get the sector number from the self.prompt string -- HOWEVER:
  319. # IF! We send 'D', we can also get the sectors around -- we might not even need to
  320. # prompt for sector to trade with (we could possibly figure it out ourselves).
  321. # [Command [TL=00:00:00]:[967] (?=Help)? : D]
  322. # [<Re-Display>]
  323. # []
  324. # [Sector : 967 in uncharted space.]
  325. # [Planets : (M) Into the Darkness]
  326. # [Warps to Sector(s) : 397 - (562) - (639)]
  327. # []
  328. def whenDone(self):
  329. self.defer = defer.Deferred()
  330. # Call this to chain something after we exit.
  331. return self.defer
  332. def deactivate(self):
  333. log.msg("ScriptPort.deactivate")
  334. assert(not self.save is None)
  335. self.observer.load(self.save)
  336. self.save = None
  337. if self.defer:
  338. self.defer.callback('done')
  339. self.defer = None
  340. def player(self, chunk: bytes):
  341. pass
  342. def game_prompt(self, prompt: str):
  343. log.msg("{0} : {1}".format(self.state, prompt))
  344. if self.state == 3:
  345. log.msg("game_prompt: ", prompt)
  346. if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt):
  347. self.state = 4
  348. log.msg("Ok, state 4")
  349. self.deactivate()
  350. def game_line(self, line: str):
  351. if self.state == 1:
  352. # First exploration
  353. if line.startswith("Sector :"):
  354. # We have starting sector information
  355. parts = re.split("\s+", line)
  356. self.sector_number = int(parts[2])
  357. # These will be the ones swapped around as we trade back and forth.
  358. self.sector1 = self.sector_number
  359. elif line.startswith("Warps to Sector(s) : "):
  360. # Warps to Sector(s) : 397 - (562) - (639)
  361. _, _, warps = line.partition(':')
  362. warps = warps.replace('-', '').replace('(', '').replace(')', '').strip()
  363. log.msg("Warps: [{0}]".format(warps))
  364. self.warps = [ int(x) for x in re.split("\s+", warps)]
  365. log.msg("Warps: [{0}]".format(self.warps))
  366. self.state = 2
  367. elif self.state == 2:
  368. if line == "":
  369. # Ok, we're done
  370. self.state = 3
  371. # Check to see if we have information on any possible ports
  372. if hasattr(self.game, 'portdata'):
  373. if not self.sector_number in self.game.portdata:
  374. log.msg("Current sector {0} not in portdata.".format(self.sector_number))
  375. self.queue_game.put(self.r + self.nl + "I can't find the current sector in the portdata." + self.nl)
  376. self.deactivate()
  377. return
  378. possible = [ x for x in self.warps if x in self.game.portdata]
  379. log.msg("Possible:", possible)
  380. self.possible = possible
  381. if len(possible) == 0:
  382. self.queue_game.put(self.r + self.nl + "I don't see any ports in [{0}].".format(self.warps) + self.nl)
  383. self.deactivate()
  384. return
  385. elif len(possible) == 1:
  386. # Ok! there's only one!
  387. self.sector1 = possible[0]
  388. # Display possible ports:
  389. # spos = [ str(x) for x in possible]
  390. # self.queue_game.put(self.r + self.nl + self.nl.join(spos) + self.nl)
  391. # At state 3, we only get a prompt.
  392. return
  393. else:
  394. log.msg("We don't have any portdata!")
  395. self.queue_game.put(self.r + self.nl + "I have no portdata. Please run CIM Port Report." + self.nl)
  396. self.deactivate()
  397. return
  398. # elif self.state == 3:
  399. # log.msg("At state 3 [{0}]".format(line))
  400. # self.queue_game.put("At state 3.")
  401. # self.deactivate()
  402. # return
  403. class ProxyMenu(object):
  404. """ Display ProxyMenu
  405. Example:
  406. from flexible import ProxyMenu
  407. if re.match(r"Command \[TL=.* \(\?=Help\)\? :", prompt):
  408. menu = ProxyMenu(self.game)
  409. """
  410. def __init__(self, game):
  411. self.nl = "\n\r"
  412. self.c = merge(Style.BRIGHT + Fore.YELLOW + Back.BLUE)
  413. self.r = Style.RESET_ALL
  414. self.c1 = merge(Style.BRIGHT + Fore.BLUE)
  415. self.c2 = merge(Style.NORMAL + Fore.CYAN)
  416. self.portdata = None
  417. self.game = game
  418. self.queue_game = game.queue_game
  419. self.observer = game.observer
  420. if hasattr(self.game, "portdata"):
  421. self.portdata = self.game.portdata
  422. else:
  423. self.portdata = {}
  424. # Yes, at this point we would activate
  425. self.prompt = game.buffer
  426. self.save = self.observer.save()
  427. self.observer.connect("player", self.player)
  428. # If we want it, it's here.
  429. self.defer = None
  430. self.keepalive = task.LoopingCall(self.awake)
  431. self.keepalive.start(30)
  432. self.menu()
  433. def __del__(self):
  434. log.msg("ProxyMenu {0} RIP".format(self))
  435. def whenDone(self):
  436. self.defer = defer.Deferred()
  437. # Call this to chain something after we exit.
  438. return self.defer
  439. def menu(self):
  440. self.queue_game.put(
  441. self.nl + self.c + "TradeWars Proxy active." + self.r + self.nl
  442. )
  443. def menu_item(ch: str, desc: str):
  444. self.queue_game.put(
  445. " " + self.c1 + ch + self.c2 + " - " + self.c1 + desc + self.nl
  446. )
  447. menu_item("D", "Diagnostics")
  448. menu_item("Q", "Quest")
  449. menu_item("T", "Display current Time")
  450. menu_item("P", "Port CIM Report")
  451. menu_item("S", "Scripts")
  452. menu_item("X", "eXit")
  453. self.queue_game.put(" " + self.c + "-=>" + self.r + " ")
  454. def awake(self):
  455. log.msg("ProxyMenu.awake()")
  456. self.game.queue_player.put(" ")
  457. def port_report(self, portdata: dict):
  458. self.portdata = portdata
  459. self.queue_game.put("Loaded {0} records.".format(len(portdata)) + self.nl)
  460. self.welcome_back()
  461. def player(self, chunk: bytes):
  462. """ Data from player (in bytes). """
  463. chunk = chunk.decode("utf-8", "ignore")
  464. key = chunk.upper()
  465. log.msg("ProxyMenu.player({0})".format(key))
  466. # Stop the keepalive if we are activating something else
  467. # or leaving...
  468. self.keepalive.stop()
  469. if key == "T":
  470. self.queue_game.put(self.c + key + self.r + self.nl)
  471. # perform T option
  472. now = pendulum.now()
  473. self.queue_game.put(
  474. self.nl + self.c1 + "Current time " + now.to_datetime_string() + self.nl
  475. )
  476. elif key == "P":
  477. self.queue_game.put(self.c + key + self.r + self.nl)
  478. # Activate CIM Port Report
  479. report = CIMPortReport(self.game)
  480. d = report.whenDone()
  481. d.addCallback(self.port_report)
  482. d.addErrback(self.welcome_back)
  483. return
  484. elif key == "S":
  485. self.queue_game.put(self.c + key + self.r + self.nl)
  486. self.activate_scripts_menu()
  487. return
  488. elif key == "D":
  489. self.queue_game.put(self.c + key + self.r + self.nl)
  490. self.queue_game.put(pformat(self.portdata).replace("\n", "\n\r") + self.nl)
  491. elif key == "Q":
  492. self.queue_game.put(self.c + key + self.r + self.nl)
  493. # This is an example of chaining PlayerInput prompt calls.
  494. ask = PlayerInput(self.game)
  495. d = ask.prompt("What is your quest?", 40, name="quest", abort_blank=True)
  496. # Display the user's input
  497. d.addCallback(ask.output)
  498. d.addCallback(
  499. lambda ignore: ask.prompt(
  500. "What is your favorite color?", 10, name="color"
  501. )
  502. )
  503. d.addCallback(ask.output)
  504. d.addCallback(
  505. lambda ignore: ask.prompt(
  506. "What is the meaning of the squirrel?",
  507. 12,
  508. name="squirrel",
  509. digits=True,
  510. )
  511. )
  512. d.addCallback(ask.output)
  513. def show_values(show):
  514. log.msg(show)
  515. self.queue_game.put(pformat(show).replace("\n", "\n\r") + self.nl)
  516. d.addCallback(lambda ignore: show_values(ask.keep))
  517. d.addCallback(self.welcome_back)
  518. # On error, just return back
  519. # This doesn't seem to be getting called.
  520. # d.addErrback(lambda ignore: self.welcome_back)
  521. d.addErrback(self.welcome_back)
  522. return
  523. elif key == "X":
  524. self.queue_game.put(self.c + key + self.r + self.nl)
  525. self.observer.load(self.save)
  526. self.save = None
  527. # It isn't running (NOW), so don't try to stop it.
  528. # self.keepalive.stop()
  529. self.keepalive = None
  530. self.queue_game.put(self.prompt)
  531. self.prompt = None
  532. # Possibly: Send '\r' to re-display the prompt
  533. # instead of displaying the original one.
  534. # Were we asked to do something when we were done here?
  535. if self.defer:
  536. reactor.CallLater(0, self.defer.callback)
  537. # self.defer.callback()
  538. self.defer = None
  539. return
  540. self.keepalive.start(30, True)
  541. self.menu()
  542. def activate_scripts_menu(self):
  543. self.observer.disconnect("player", self.player)
  544. self.observer.connect("player", self.scripts_player)
  545. self.scripts_menu()
  546. def deactivate_scripts_menu(self):
  547. self.observer.disconnect("player", self.scripts_player)
  548. self.observer.connect("player", self.player)
  549. self.welcome_back()
  550. def scripts_menu(self, *_):
  551. c1 = merge(Style.BRIGHT + Fore.CYAN)
  552. c2 = merge(Style.NORMAL + Fore.CYAN)
  553. def menu_item(ch, desc):
  554. self.queue_game.put(
  555. " " + c1 + ch + c2 + " - " + c1 + desc + self.nl
  556. )
  557. menu_item("1", "Ports (Trades between two sectors)")
  558. menu_item("2", "TODO")
  559. menu_item("3", "TODO")
  560. menu_item("X", "eXit")
  561. self.queue_game.put(" " + c1 + "-=>" + self.r + " ")
  562. def scripts_player(self, chunk: bytes):
  563. """ Data from player (in bytes). """
  564. chunk = chunk.decode("utf-8", "ignore")
  565. key = chunk.upper()
  566. if key == '1':
  567. self.queue_game.put(self.c + key + self.r + self.nl)
  568. # Activate this magical event here
  569. ports = ScriptPort(self.game)
  570. d = ports.whenDone()
  571. d.addCallback(self.scripts_menu)
  572. d.addErrback(self.scripts_menu)
  573. return
  574. elif key == 'X':
  575. self.queue_game.put(self.c + key + self.r + self.nl)
  576. self.deactivate_scripts_menu()
  577. return
  578. else:
  579. self.queue_game.put(self.c + "?" + self.r + self.nl)
  580. self.scripts_menu()
  581. def welcome_back(self, *_):
  582. log.msg("welcome_back")
  583. self.keepalive.start(30, True)
  584. self.menu()