|
@@ -3,53 +3,38 @@ from json import loads, dumps
|
|
|
from json.decoder import JSONDecodeError
|
|
|
import pendulum
|
|
|
from subprocess import run, PIPE
|
|
|
+from os.path import exists, join
|
|
|
|
|
|
-from logging import basicConfig, DEBUG, INFO, WARN, ERROR, CRITICAL, getLogger
|
|
|
-from logging.handlers import TimedRotatingFileHandler
|
|
|
-from os.path import exists, join, dirname, abspath
|
|
|
-from os import mkdir
|
|
|
+# Branch off the logging into a seperate file
|
|
|
+from config import log
|
|
|
|
|
|
-# Get the full path for this file
|
|
|
-currentdir = dirname(abspath(__file__))
|
|
|
-
|
|
|
-# Target log file
|
|
|
TARGET = join("bbs", join("logs", "enigma-bbs.log"))
|
|
|
|
|
|
-# Setup logging
|
|
|
-# DEBUG, INFO, WARN, ERROR, CRITICAL
|
|
|
-basicConfig(
|
|
|
- level=INFO,
|
|
|
- format="%(asctime)s - %(filename)s (%(lineno)d) - %(name)s - %(levelname)s - %(message)s",
|
|
|
- handlers=[
|
|
|
- TimedRotatingFileHandler(
|
|
|
- filename=join(currentdir, "failUser.log"),
|
|
|
- when="midnight",
|
|
|
- backupCount=1,
|
|
|
- ),
|
|
|
- #logging.StreamHandler(stream=sys.stdout),
|
|
|
- ],
|
|
|
-)
|
|
|
-
|
|
|
-log = getLogger("failUser")
|
|
|
+def blocker(ip):
|
|
|
+ # Utility function to block given ip as string
|
|
|
+ # call = run(["iptables", "-I", "DOCKER-USER", "-i", "eth0", "-s", ip, "-j", "DROP"], stdout=PIPE, check=True)
|
|
|
+ print("iptables -I DOCKER-USER -i eth0 -s {0} -j DROP".format(ip))
|
|
|
+
|
|
|
+def is_bad(line):
|
|
|
+ # Given line, attempt to parse... then is there a issue with it
|
|
|
+ # Returns a python dict with ip and time in log
|
|
|
+ try:
|
|
|
+ j = loads(l)
|
|
|
+ if j["msg"] == "Attempt to login with banned username":
|
|
|
+ r = {}
|
|
|
+ r["ip"] = "{0}".format(j["ip"][7:])
|
|
|
+ r["time"] = j["time"]
|
|
|
+ return r
|
|
|
+ except JSONDecodeError:
|
|
|
+ log.error("Failed to decode line, '{0}'".format(l))
|
|
|
|
|
|
# Collecting banned users
|
|
|
lusers = {}
|
|
|
with open(TARGET, "r") as f:
|
|
|
for l in f:
|
|
|
- try:
|
|
|
- j = loads(l)
|
|
|
- if j["msg"] == "Attempt to login with banned username":
|
|
|
- lusers["{0}".format(j["ip"][7:])] = j["time"]
|
|
|
- except JSONDecodeError:
|
|
|
- log.error("Failed to decode line, '{0}'".format(l))
|
|
|
-
|
|
|
-# dt = pendulum.parse(r['221.234.238.64'])
|
|
|
-# dt = dt.in_tz('America/New_York')
|
|
|
-# print(dt)
|
|
|
-
|
|
|
-# Utility function to block given ip as string
|
|
|
-def blocker(ip):
|
|
|
- call = run(["iptables", "-I", "DOCKER-USER", "-i", "eth0", "-s", ip, "-j", "DROP"], stdout=PIPE, check=True)
|
|
|
+ user = is_bad(l)
|
|
|
+ if user:
|
|
|
+ lusers[user["ip"]] = user["time"]
|
|
|
|
|
|
# Itterate over all blocked users
|
|
|
for u in lusers:
|