1234567891011121314151617181920212223242526272829303132333435363738394041424344 |
- #!/usr/bin/env python3
- from json import loads, dumps
- from json.decoder import JSONDecodeError
- import pendulum
- from subprocess import run, PIPE
- from os.path import exists, join
- # Logging is set up in a separate module (config); reuse its logger here
- from config import log
# Relative path of the enigma-bbs log file that is scanned for offenders.
TARGET = join("bbs", "logs", "enigma-bbs.log")
def blocker(ip):
    """Print the iptables command that would drop traffic from *ip*.

    The command is only written to stdout; nothing is executed here.

    Args:
        ip: The address to block, as a string.
    """
    # The real invocation is intentionally left disabled:
    # run(["iptables", "-I", "DOCKER-USER", "-i", "eth0", "-s", ip, "-j", "DROP"], stdout=PIPE, check=True)
    command = "iptables -I DOCKER-USER -i eth0 -s {0} -j DROP"
    print(command.format(ip))
def is_bad(line):
    """Parse one log line and report a banned-username login attempt.

    Args:
        line: A single line of the log, expected to hold one JSON object.

    Returns:
        A dict with the keys "ip" and "time" taken from the record when its
        "msg" equals "Attempt to login with banned username"; otherwise
        None. A line that is not valid JSON is logged as an error and also
        yields None.
    """
    # Parse the argument itself rather than relying on a module-level
    # loop variable happening to hold the same text.
    try:
        record = loads(line)
    except JSONDecodeError:
        log.error("Failed to decode line, '{0}'".format(line))
        return None

    # Valid JSON that is not an object (e.g. a bare list) cannot be a match.
    if not isinstance(record, dict):
        return None
    if record.get("msg") != "Attempt to login with banned username":
        return None

    return {
        # Drops the first 7 characters of the address; presumably the
        # "::ffff:" IPv4-mapped prefix -- TODO confirm against real log data.
        "ip": "{0}".format(record["ip"][7:]),
        "time": record["time"],
    }
# Gather offenders from the log, keyed by IP; a later entry for the same
# IP overwrites the stored time of an earlier one.
lusers = {}
with open(TARGET, "r") as log_file:
    for l in log_file:
        hit = is_bad(l)
        if not hit:
            continue
        lusers[hit["ip"]] = hit["time"]

# Iterate over every collected address, block it and record when that happened
for address in lusers:
    print("Blocking {0}".format(address))
    blocker(address)
    stamp = pendulum.now().to_datetime_string()
    log.info("Blocked {0} at {1}".format(address, stamp))
|