#!/usr/bin/env python3
# failUser.py
import ipaddress
from json import loads, dumps
from json.decoder import JSONDecodeError
from logging import basicConfig, DEBUG, INFO, WARN, ERROR, CRITICAL, getLogger
from logging.handlers import TimedRotatingFileHandler
from os import mkdir
from os.path import exists, join, dirname, abspath
from subprocess import run, PIPE, CalledProcessError

import pendulum
  10. # Get the full path for this file
  11. currentdir = dirname(abspath(__file__))
  12. # Target log file
  13. TARGET = join("bbs", join("logs", "enigma-bbs.log"))
  14. # Setup logging
  15. # DEBUG, INFO, WARN, ERROR, CRITICAL
  16. basicConfig(
  17. level=INFO,
  18. format="%(asctime)s - %(filename)s (%(lineno)d) - %(name)s - %(levelname)s - %(message)s",
  19. handlers=[
  20. TimedRotatingFileHandler(
  21. filename=join(currentdir, "failUser.log"),
  22. when="midnight",
  23. backupCount=1,
  24. ),
  25. #logging.StreamHandler(stream=sys.stdout),
  26. ],
  27. )
  28. log = getLogger("failUser")
  29. # Collecting banned users
  30. lusers = {}
  31. with open(TARGET, "r") as f:
  32. for l in f:
  33. try:
  34. j = loads(l)
  35. if j["msg"] == "Attempt to login with banned username":
  36. lusers["{0}".format(j["ip"][7:])] = j["time"]
  37. except JSONDecodeError:
  38. log.error("Failed to decode line, '{0}'".format(l))
  39. # dt = pendulum.parse(r['221.234.238.64'])
  40. # dt = dt.in_tz('America/New_York')
  41. # print(dt)
  42. # Utility function to block given ip as string
  43. def blocker(ip):
  44. call = run(["iptables", "-I", "DOCKER-USER", "-i", "eth0", "-s", ip, "-j", "DROP"], stdout=PIPE, check=True)
  45. # Itterate over all blocked users
  46. for u in lusers:
  47. print("Blocking {0}".format(u))
  48. blocker(u)
  49. now = pendulum.now()
  50. log.info("Blocked {0} at {1}".format(u, now.to_datetime_string()))