Real-Time Anomaly Detection Engine for a Cloud Storage Platform

Timilehin Obalereko

I built a Python daemon that watches incoming HTTP traffic in real time, learns what "normal" looks like, and automatically blocks attackers using Linux's built-in firewall — all without any third-party security tools.

Imagine you run a cloud storage company. Your platform is public — anyone on the internet can send requests to it. Most of those people are legitimate users uploading files. But some of them are attackers — bots hammering your server with thousands of requests per second, trying to crash it, brute-force passwords, or scrape data.

You need something that:

- Watches all incoming traffic in real time
- Learns what normal traffic looks like
- Detects when something looks abnormal
- Automatically blocks the attacker
- Alerts your team on Slack
- Shows you a live dashboard of what is happening

That is exactly what I built for my HNG DevSecOps internship. Let me walk you through every part of it in plain English.

Before we dive into code, here is how all the pieces fit together:

```
[User's Browser / Attacker Bot]
              │
              │ HTTP Request
              ▼
         ┌─────────┐
         │  Nginx  │  ← logs every request as JSON
         └────┬────┘
              │
              ▼
        ┌───────────┐
        │ Nextcloud │  ← the actual cloud storage app
        └───────────┘

   ┌──────────────────────────┐
   │   Shared Docker Volume   │  ← log file lives here
   └──────────────────────────┘
              │ reads logs
              ▼
┌─────────────────────────────────────┐
│           Detector Daemon           │
│                                     │
│  monitor.py    → reads log lines    │
│  baseline.py   → learns normal      │
│  detector.py   → spots anomalies    │
│  blocker.py    → runs iptables      │
│  unbanner.py   → lifts bans later   │
│  notifier.py   → sends Slack alerts │
│  dashboard.py  → web UI             │
└─────────────────────────────────────┘
              │
              ▼
  iptables blocks attacker IPs
  Slack receives alerts
  Dashboard shows live stats
```

The key insight is that the detector runs alongside Nextcloud — not inside it. It reads Nginx's logs from a shared Docker volume and acts as an automated security guard.

The first challenge is reading the Nginx log file as new lines appear — like watching a live news feed. In bash, you do this with `tail -f`. In Python, I built the same thing:

```python
import os
import time

def tail_log(log_path):
    """
    Generator that continuously reads new lines from a log file.
    Yields one parsed log entry at a time.
    """
    # Wait until the log file exists
    while not os.path.exists(log_path):
        time.sleep(2)

    with open(log_path, "r") as f:
        # Jump to the END of the file
        # We only want NEW requests, not old history
        f.seek(0, 2)

        while True:
            line = f.readline()
            if line:
                parsed = parse_line(line.strip())
                if parsed:
                    yield parsed  # send one entry to the caller
            else:
                # No new line yet — wait 50ms and try again
                time.sleep(0.05)
```

A few things worth explaining here:

- `f.seek(0, 2)` — This jumps to the end of the file. Without this, we would process every old log line from before the daemon started, which would give us garbage baseline data.
- `yield parsed` — This makes the function a generator. Instead of returning a whole list of log entries (which would use lots of memory), it sends entries one at a time to the caller. The caller gets each entry the instant it arrives.
- `time.sleep(0.05)` — When there are no new lines, we wait 50 milliseconds before checking again. This is the "tail" behaviour — fast enough to catch requests in real time, not so fast that we burn CPU.
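To make the generator behaviour concrete, here is a minimal sketch of how it might be consumed. The log path is a placeholder, not necessarily the one the real deployment uses:

```python
# Illustrative consumer of tail_log(); the path below is a placeholder.
if __name__ == "__main__":
    for entry in tail_log("/var/log/nginx/access.log"):
        # Each yielded entry is the dict produced by parse_line()
        print(entry["source_ip"], entry["method"], entry["path"], entry["status"])
```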
Nginx writes logs in JSON format, so parsing is simple:

```python
import json

def parse_line(line):
    try:
        data = json.loads(line)

        # Make sure all required fields are present
        required = ["source_ip", "timestamp", "method",
                    "path", "status", "response_size"]
        for field in required:
            if field not in data:
                return None

        return data
    except json.JSONDecodeError:
        return None  # skip malformed lines
```

Now that we can read log lines, we need to calculate how many requests per second a given IP is making.

The naive approach would be: count all requests in the last minute, divide by 60. But this is called a "per-minute counter" and it has a problem — it only updates once per minute, so it misses short bursts.

The right approach is a sliding window. Think of it like this: you are standing on a moving train, looking out a window that shows exactly 60 seconds of track behind you. As the train moves forward, the window always shows the LAST 60 seconds — older track disappears from view automatically.

In code, this uses Python's `collections.deque`:

```python
from collections import deque, defaultdict
import time

class SlidingWindowDetector:
    def __init__(self, config):
        self.window_seconds = 60  # look at last 60 seconds

        # One deque per IP — each entry is a timestamp
        self.ip_windows = defaultdict(deque)

        # One global deque for all traffic combined
        self.global_window = deque()

    def record(self, ip, status):
        """Called for every incoming request."""
        now = time.time()
        self.ip_windows[ip].append(now)   # add timestamp
        self.global_window.append(now)    # add to global too

    def _evict_old(self, dq, now, window):
        """
        Remove timestamps older than `window` seconds.

        Deques are ordered — oldest on the LEFT, newest on the RIGHT.
        We pop from the left until the oldest entry is within our window.
        This is the eviction logic — what makes it a SLIDING window.
        """
        cutoff = now - window  # anything before this is too old
        while dq and dq[0] < cutoff:
            dq.popleft()
```

The per-IP and global rates then fall out naturally: evict the old timestamps, count what is left in the deque, and divide by the window length.

The sliding window tells us the current rate; the baseline engine tells us what rate is normal. It counts requests each second and keeps those per-second samples so the mean and standard deviation can be recalculated periodically. The danger is that an attack would pollute those samples and drag the baseline upward, so the recording step includes a spike guard: a one-second count more than 10x the current effective mean is thrown away instead of being saved.

```python
# Inside the baseline engine's per-second recording step:
if len(self.global_samples) >= 10:  # only guard once we have some history
    if count > 10 * self.effective_mean:
        print(f"[Baseline] Spike guard: {count} req/s discarded")
        self.current_second_count = 0
        return  # do NOT save this to baseline

# Normal traffic — save it
self.global_samples.append((time.time(), count))
self.current_second_count = 0
```

With this in place:

- Attack happens → spike is discarded → baseline stays clean ✅
- Attack stops → baseline is still accurate ✅
- Second attack → detected immediately because baseline is still clean ✅

Now we have:

- The current request rate (from the sliding window)
- The baseline mean and stddev (from the baseline engine)

How do we decide if the rate is "too high"? The z-score measures how many standard deviations above the mean a value is:

```
z-score = (current_rate - mean) / stddev
```

If the mean is 5 req/s and stddev is 1, and we see 8.5 req/s:

```
z-score = (8.5 - 5) / 1 = 3.5
```

In statistics, a value more than 3 standard deviations above the mean occurs only about 0.13% of the time in normally distributed data. In other words: under normal conditions, 99.87% of samples fall below that level, so a z-score above 3.0 almost certainly means something is wrong.

```python
def check_ip_anomaly(self, ip, baseline):
    rate = self.get_ip_rate(ip)
    mean = baseline["mean"]
    stddev = baseline["stddev"]

    # Z-score check
    if stddev > 0:
        zscore = (rate - mean) / stddev
    else:
        zscore = 0

    if zscore > 3.0:
        return True, f"z-score {zscore:.2f} > 3.0", rate

    # Rate multiplier check (backup)
    if rate > 5.0 * mean:
        return True, f"rate {rate:.2f} > 5x mean {mean:.2f}", rate

    return False, "", rate
```

Even if the z-score math produces unexpected results, if someone is sending 5x the normal amount of traffic, that is unambiguously suspicious. This is the backup check that catches edge cases.
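One piece the snippets above reference but do not show is how `baseline["mean"]` and `baseline["stddev"]` get recalculated from the saved samples. Here is a rough sketch of what that could look like; the function name, the one-hour retention window, and the minimum-sample rule are my own assumptions, not code from the repository:

```python
import statistics
import time

def recalculate_baseline(global_samples, max_age=3600):
    """Hypothetical sketch: derive mean/stddev from recent per-second counts.

    `global_samples` is assumed to hold (timestamp, count) tuples, as
    appended by the spike-guarded recording code above. The retention
    window and fallback values are illustrative only.
    """
    cutoff = time.time() - max_age
    counts = [count for ts, count in global_samples if ts >= cutoff]

    if len(counts) < 2:
        # Not enough history yet — return a neutral baseline
        return {"mean": 0.0, "stddev": 0.0}

    return {
        "mean": statistics.mean(counts),
        "stddev": statistics.pstdev(counts),  # population standard deviation
    }
```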
If an IP is causing lots of 404s (page not found) and 401s (unauthorized), it is likely probing for vulnerabilities. We automatically tighten the thresholds for these IPs:

```python
error_rate = self.get_ip_error_rate(ip)

if error_rate >= 3.0 * baseline["error_mean"]:
    # Suspicious error pattern — use tighter thresholds
    zscore_threshold = 2.0  # was 3.0
    rate_threshold = 3.0    # was 5.0
```

When we detect an anomaly, we block the IP using iptables — Linux's built-in packet filter. iptables sits at the network level, before any application (Nginx, Nextcloud, Python) ever sees a packet. When you add a DROP rule, the Linux kernel silently discards all packets from that IP. The attacker's requests just time out — they get no response at all.

```python
import subprocess

def ban_ip(self, ip):
    """Add an iptables DROP rule for this IP."""
    # -I INPUT 1 = insert at position 1 (top priority)
    # -s {ip}    = match packets FROM this IP
    # -j DROP    = silently discard the packet
    subprocess.run([
        "iptables", "-I", "INPUT", "1",
        "-s", ip,
        "-j", "DROP"
    ])
    print(f"[Blocker] BANNED {ip}")

def unban_ip(self, ip):
    """Remove the iptables DROP rule."""
    # -D = delete the matching rule
    subprocess.run([
        "iptables", "-D", "INPUT",
        "-s", ip,
        "-j", "DROP"
    ])
    print(f"[Blocker] UNBANNED {ip}")
```

We use `-I INPUT 1` (insert at position 1) rather than `-A` (append) so the DROP rule has the highest priority — it is checked before any ACCEPT rules.

We do not ban forever immediately. The ban schedule escalates with repeated offenses:

| Offense | Ban Duration |
|---------|--------------|
| 1st     | 10 minutes   |
| 2nd     | 30 minutes   |
| 3rd     | 2 hours      |
| 4th+    | Permanent    |

A background thread checks every 30 seconds and lifts expired bans:

```python
def _check_bans(self):
    now = time.time()

    for ip, ban_info in self.blocker.get_active_bans().items():
        duration = ban_info["duration"]
        banned_at = ban_info["banned_at"]

        if duration == -1:
            continue  # permanent ban, never auto-unban

        if now >= banned_at + duration:
            self.blocker.unban_ip(ip)
            self.detector.banned_ips.discard(ip)
            self.notifier.send_unban(ip=ip, ...)
```

Every ban, unban, and global anomaly sends a message to Slack via an Incoming Webhook — a special URL that posts messages to a channel when you send an HTTP POST request to it.

```python
def send_ban(self, ip, reason, rate, baseline, duration):
    message = (
        f"🚨 *IP BANNED* — `{ip}`\n"
        f"*Condition:* {reason}\n"
        f"*Current Rate:* {rate:.2f} req/s\n"
        f"*Baseline Mean:* {baseline['mean']:.2f} req/s\n"
        f"*Ban Duration:* {duration_str}\n"
        f"*Time:* {timestamp}"
    )

    requests.post(
        self.webhook_url,
        json={"text": message},
        timeout=10
    )
```

The 10-second timeout is important — if Slack's servers are slow, we do not want the Slack call to block our detection loop.

The dashboard is a Flask web app that serves a single HTML page. The page polls a `/api/metrics` endpoint every 3 seconds and updates the display without reloading:

```python
@app.route("/api/metrics")
def metrics():
    return jsonify({
        "global_rate": detector.get_global_rate(),
        "top_ips": detector.get_top_ips(10),
        "banned_ips": blocker.get_active_bans(),
        "cpu_percent": psutil.cpu_percent(),
        "mem_percent": psutil.virtual_memory().percent,
        "baseline_mean": baseline.effective_mean,
        "baseline_stddev": baseline.effective_stddev,
        "uptime": calculate_uptime(),
    })
```

The JavaScript side polls this every 3 seconds:

```javascript
async function refresh() {
    const res = await fetch('/api/metrics');
    const d = await res.json();

    document.getElementById('global-rate').textContent = d.global_rate;
    document.getElementById('baseline-mean').textContent = d.baseline_mean;
    // ... update all other fields
}

setInterval(refresh, 3000);  // run every 3 seconds
```
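The stack list further down mentions Flask + Waitress as the dashboard web server, but the launch code is not shown. As a minimal sketch (the host, port, and entry-point structure are my own assumptions, not taken from the repository), serving the Flask `app` with Waitress could look like this:

```python
from waitress import serve

# Hypothetical entry point for the dashboard. Waitress is a production
# WSGI server, unlike Flask's built-in development server.
# The host and port values below are illustrative choices.
if __name__ == "__main__":
    serve(app, host="0.0.0.0", port=8080)
```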
The `main.py` orchestrator wires everything together in one loop:

```python
for log_entry in tail_log(config["log_path"]):
    ip = log_entry["source_ip"]
    status = log_entry["status"]

    # 1. Record in sliding window for rate detection
    detector.record(ip, status)

    # 2. Feed into baseline ONLY if IP is not banned
    #    (prevents attack traffic from corrupting baseline)
    if not blocker.is_banned(ip):
        baseline.record_request(ip, status)

    # 3. Maybe recalculate baseline every 60 seconds
    if baseline.maybe_recalculate():
        audit_logger.log_baseline_recalc(...)

    # 4. Get current baseline
    b = baseline.get_baseline()

    # 5. Check if this IP is anomalous
    is_anomaly, reason, rate = detector.check_ip_anomaly(ip, b)
    if is_anomaly and not blocker.is_banned(ip):
        duration = blocker.ban_ip(ip)
        notifier.send_ban(ip, reason, rate, b, duration)
        audit_logger.log_ban(ip, reason, rate, b, duration)

    # 6. Check if global traffic is anomalous
    global_anomaly, reason, rate = detector.check_global_anomaly(b)
    if global_anomaly:
        notifier.send_global_alert(reason, rate, b)
```

Every single log line goes through this sequence within milliseconds of Nginx writing it.

When an attack hits the server, here is what happens end to end:

1. Attacker sends roughly 150 requests per second
2. Nginx logs each request as JSON to the shared volume
3. monitor.py detects new lines within 50ms
4. Sliding window calculates rate: 150 req/s
5. Z-score: (150 - 1.5) / 0.8 ≈ 186 — massively above threshold
6. `iptables -I INPUT 1 -s {attacker_ip} -j DROP` fires
7. Slack alert sent within seconds
8. Audit log entry written
9. After 10 minutes, auto-unban fires
10. Slack unban alert sent
11. If the attacker returns — detected again immediately because the baseline stayed clean

The main takeaways:

1. The baseline is everything.
2. Two windows catch two attack types.
3. Z-score beats fixed thresholds.
4. iptables operates at the kernel level.
5. Auto-unban is necessary.

The stack:

- Python 3.11 — main language
- Docker + Docker Compose — containerization
- Nginx — reverse proxy and JSON logging
- Nextcloud — the cloud storage application
- Flask + Waitress — dashboard web server
- iptables — IP blocking at kernel level
- Slack Webhooks — alerting
- psutil — system metrics

The full source code is available at: https://github.com/devops-timi/anomaly-detection-engine

The repository includes:

- All Python source files with detailed comments
- Nginx configuration
- Docker Compose setup
- Architecture diagram
- Full README with setup instructions

Building this taught me that real security tooling is not about fancy AI or expensive software. At its core, it is about:

- Watching what is happening (log tailing)
- Learning what normal looks like (rolling baseline)
- Spotting deviations quickly (z-score detection)
- Responding automatically (iptables + alerts)

The hardest part was not the detection logic — it was making sure the baseline stayed honest. Once the spike guard was in place, everything else clicked into place.

If you are learning DevSecOps or cloud infrastructure, I highly recommend trying to build something like this from scratch. You will learn more about how the internet works in one project than in months of reading.