config.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. #!/usr/bin/env python3
  2. from logging import basicConfig, DEBUG, INFO, WARN, ERROR, CRITICAL, getLogger
  3. from logging.handlers import TimedRotatingFileHandler
  4. from os.path import exists, join, dirname, abspath
  5. from json import loads, dumps
  6. from json.decoder import JSONDecodeError
  7. import pendulum
  8. # Get the full path for this file
  9. currentdir = dirname(abspath(__file__))
  10. # Target log file
  11. TARGET = join("bbs", join("logs", "enigma-bbs.log"))
  12. # Setup logging
  13. # DEBUG, INFO, WARN, ERROR, CRITICAL
  14. basicConfig(
  15. level=INFO,
  16. format="%(asctime)s - %(filename)s (%(lineno)d) - %(name)s - %(levelname)s - %(message)s",
  17. handlers=[
  18. TimedRotatingFileHandler(
  19. filename=join(currentdir, "failUser.log"),
  20. when="midnight",
  21. backupCount=1,
  22. ),
  23. #logging.StreamHandler(stream=sys.stdout),
  24. ],
  25. )
  26. log = getLogger("failUser")
  27. # Config JSON
  28. def save_config(con):
  29. with open("failUser.cfg", "w") as f:
  30. f.write(dumps(con, indent=4, sort_keys=False))
  31. def load_config():
  32. if not exists("failUser.cfg"):
  33. now = pendulum.now().to_datetime_string()
  34. defaults = {
  35. # Target enigma logs
  36. "target": "bbs/logs/enigma-bbs.log",
  37. # block_time in hours
  38. "block_time": 4,
  39. # Last unblock
  40. "last_unblock": now,
  41. }
  42. save_config(defaults)
  43. return defaults
  44. else:
  45. with open("failUser.cfg", "r") as f:
  46. config = loads(f.read())
  47. return config
  48. # blocks in json
  49. def add_block(ip, time):
  50. # first load in all blocks
  51. try:
  52. with open("blocks.json", "r") as f:
  53. blocks = loads(f.read())
  54. except FileNotFoundError:
  55. blocks = {}
  56. pass
  57. # add ip and time
  58. blocks[ip] = time
  59. # update blocks
  60. with open("blocks.json", "w") as f:
  61. f.write(dumps(blocks))
  62. def rm_block(ip):
  63. # first load all blocks
  64. try:
  65. with open("blocks.json", "r") as f:
  66. blocks = loads(f.read())
  67. except FileNotFoundError:
  68. return
  69. try:
  70. if blocks[ip]:
  71. del blocks[ip]
  72. # update blocks
  73. with open("blocks.json", "w") as f:
  74. f.write(dumps(blocks))
  75. except KeyError:
  76. log.debug("Unable to unblock '{0}'".format(ip))
  77. def check_blocks():
  78. # return a list of ips exceeding block_time in config
  79. result = []
  80. conf = load_config()
  81. # load in blocks
  82. try:
  83. with open("blocks.json", "r") as f:
  84. blocks = loads(f.read())
  85. except FileNotFoundError:
  86. return
  87. now = pendulum.now()
  88. for ip in blocks:
  89. dt = pendulum.parse(blocks[ip])
  90. if abs(now.diff(dt, False).in_hours()) < conf["block_time"]:
  91. # Oops, this ip needs to be unblocked
  92. result.append(ip)
  93. if result:
  94. return result