failUser.py 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. #!/usr/bin/env python3
  2. from json import loads, dumps
  3. from json.decoder import JSONDecodeError
  4. import pendulum
  5. from subprocess import run, PIPE
  6. from os.path import exists, join
# Branch off the logging into a separate file
  8. from config import log
  9. TARGET = join("bbs", join("logs", "enigma-bbs.log"))
  10. def blocker(ip):
  11. # Utility function to block given ip as string
  12. # call = run(["iptables", "-I", "DOCKER-USER", "-i", "eth0", "-s", ip, "-j", "DROP"], stdout=PIPE, check=True)
  13. print("iptables -I DOCKER-USER -i eth0 -s {0} -j DROP".format(ip))
  14. def is_bad(line):
  15. # Given line, attempt to parse... then is there a issue with it
  16. # Returns a python dict with ip and time in log
  17. try:
  18. j = loads(l)
  19. if j["msg"] == "Attempt to login with banned username":
  20. r = {}
  21. r["ip"] = "{0}".format(j["ip"][7:])
  22. r["time"] = j["time"]
  23. return r
  24. except JSONDecodeError:
  25. log.error("Failed to decode line, '{0}'".format(l))
  26. # Collecting banned users
  27. lusers = {}
  28. with open(TARGET, "r") as f:
  29. for l in f:
  30. user = is_bad(l)
  31. if user:
  32. lusers[user["ip"]] = user["time"]
  33. # Itterate over all blocked users
  34. for u in lusers:
  35. print("Blocking {0}".format(u))
  36. blocker(u)
  37. now = pendulum.now()
  38. log.info("Blocked {0} at {1}".format(u, now.to_datetime_string()))