1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 |
- #!/usr/bin/env python3
- from json import loads, dumps
- from json.decoder import JSONDecodeError
- import pendulum
- from subprocess import run, PIPE
- from logging import basicConfig, DEBUG, INFO, WARN, ERROR, CRITICAL, getLogger
- from logging.handlers import TimedRotatingFileHandler
- from os.path import exists, join, dirname, abspath
- from os import mkdir
- # Get the full path for this file
- currentdir = dirname(abspath(__file__))
- # Target log file
- TARGET = join("bbs", join("logs", "enigma-bbs.log"))
- # Setup logging
- # DEBUG, INFO, WARN, ERROR, CRITICAL
- basicConfig(
- level=INFO,
- format="%(asctime)s - %(filename)s (%(lineno)d) - %(name)s - %(levelname)s - %(message)s",
- handlers=[
- TimedRotatingFileHandler(
- filename=join(currentdir, "failUser.log"),
- when="midnight",
- backupCount=1,
- ),
- #logging.StreamHandler(stream=sys.stdout),
- ],
- )
- log = getLogger("failUser")
- # Collecting banned users
- lusers = {}
- with open(TARGET, "r") as f:
- for l in f:
- try:
- j = loads(l)
- if j["msg"] == "Attempt to login with banned username":
- lusers["{0}".format(j["ip"][7:])] = j["time"]
- except JSONDecodeError:
- log.error("Failed to decode line, '{0}'".format(l))
- # dt = pendulum.parse(r['221.234.238.64'])
- # dt = dt.in_tz('America/New_York')
- # print(dt)
- # Utility function to block given ip as string
- def blocker(ip):
- call = run(["iptables", "-I", "DOCKER-USER", "-i", "eth0", "-s", ip, "-j", "DROP"], stdout=PIPE, check=True)
- # Itterate over all blocked users
- for u in lusers:
- print("Blocking {0}".format(u))
- blocker(u)
- now = pendulum.now()
- log.info("Blocked {0} at {1}".format(u, now.to_datetime_string()))
|