config.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. #!/usr/bin/env python3
  2. from logging import basicConfig, DEBUG, INFO, WARN, ERROR, CRITICAL, getLogger
  3. from logging.handlers import TimedRotatingFileHandler
  4. from os.path import exists, join, dirname, abspath
  5. from json import loads, dumps
  6. from json.decoder import JSONDecodeError
  7. import pendulum
  8. # Get the full path for this file
  9. currentdir = dirname(abspath(__file__))
  10. # Target log file
  11. TARGET = join("bbs", join("logs", "enigma-bbs.log"))
  12. # Setup logging
  13. # DEBUG, INFO, WARN, ERROR, CRITICAL
  14. basicConfig(
  15. level=INFO,
  16. format="%(asctime)s - %(filename)s (%(lineno)d) - %(name)s - %(levelname)s - %(message)s",
  17. handlers=[
  18. TimedRotatingFileHandler(
  19. filename=join(currentdir, "failUser.log"),
  20. when="midnight",
  21. backupCount=1,
  22. ),
  23. #logging.StreamHandler(stream=sys.stdout),
  24. ],
  25. )
  26. log = getLogger("failUser")
  27. # Config JSON
  28. def save_config(con):
  29. with open("failUser.cfg", "w") as f:
  30. f.write(dumps(con, indent=4, sort_keys=False))
  31. def load_config():
  32. if not exists("failUser.cfg"):
  33. now = pendulum.now().to_datetime_string()
  34. defaults = {
  35. # Target enigma logs
  36. "target": "bbs/logs/enigma-bbs.log",
  37. # block_time in hours
  38. "block_time": 4,
  39. # Last unblock
  40. "last_unblock": now,
  41. }
  42. save_config(defaults)
  43. return defaults
  44. else:
  45. with open("failUser.cfg", "r") as f:
  46. config = loads(f.read())
  47. return config
  48. # blocks in json
  49. def add_block(ip, time):
  50. # first load in all blocks
  51. try:
  52. with open("blocks.json", "r") as f:
  53. blocks = loads(f.read())
  54. except FileNotFoundError:
  55. blocks = {}
  56. pass
  57. # add ip and time
  58. log.debug("Added {0} in blocks.json".format(ip))
  59. blocks[ip] = time
  60. # update blocks
  61. with open("blocks.json", "w") as f:
  62. f.write(dumps(blocks))
  63. def rm_block(ip):
  64. # first load all blocks
  65. try:
  66. with open("blocks.json", "r") as f:
  67. blocks = loads(f.read())
  68. except FileNotFoundError:
  69. return
  70. try:
  71. if blocks[ip]:
  72. log.debug("Removed {0} in blocks.json".format(ip))
  73. del blocks[ip]
  74. # update blocks
  75. with open("blocks.json", "w") as f:
  76. f.write(dumps(blocks))
  77. except KeyError:
  78. log.debug("Unable to unblock '{0}'".format(ip))
  79. def check_blocks():
  80. # return a list of ips exceeding block_time in config
  81. result = []
  82. conf = load_config()
  83. # load in blocks
  84. try:
  85. with open("blocks.json", "r") as f:
  86. blocks = loads(f.read())
  87. except FileNotFoundError:
  88. return
  89. now = pendulum.now()
  90. for ip in blocks:
  91. dt = pendulum.parse(blocks[ip])
  92. if abs(now.diff(dt, False).in_hours()) < conf["block_time"]:
  93. # Oops, this ip needs to be unblocked
  94. result.append(ip)
  95. if result:
  96. return result