--- title: Une façon créative de bloquer les indésirables date: '2026-03-07 10:26:35' cover: images/cover.png tags: - Administration réseau - Caddy - NixOS - OPNsense - Python - Firewall - Sécurité - RSS weather: temperature: 10.7777777777778 humidity: 73 pressure: 1025.39848029525 illuminance: 32588.4 wind_speed: 4.9889664 wind_direction: 68 precipitations: false source: - influxdb comments_url: https://com.richard-dern.fr/post/480 --- L'architecture de mon réseau parlera peut-être aux plus vieux, mais elle fera probablement sourciller les plus jeunes et les [_cloud-natives_](https://fr.wikipedia.org/wiki/Cloud_natif). Ces derniers disposent d'une infinité d'outils pour mener à bien le genre de tâche à laquelle je me suis attaqué dernièrement. > Ma solution intervient en **complément** d'autres mesures sécuritaires (notamment [IPS/IDS](https://docs.opnsense.org/manual/ips.html)). > Elle est rendue possible notamment par le fait que mon site est statique, sauf pour la page de recherche qui sera traitée plus loin. > J'ai décrit à ChatGPT 5.4 ce que je voulais, c'est lui qui a produit le code nix et python. ## Description de mon architecture réseau Derrière ma [Freebox](https://www.free.fr/freebox/freebox-pop) en mode bridge se trouve mon routeur sous [OPNsense](https://opnsense.org/), sur lequel est installé le [plugin](https://docs.opnsense.org/manual/how-tos/caddy.html) [Caddy](https://caddyserver.com/) en tant que reverse-proxy pour tout service web que je désire rendre accessible depuis Internet. Ces services sont hébergés sur différentes machines physiques de mon réseau, derrière le routeur. Plus spécifiquement, mon site est hébergé sur une machine sobrement appelée `server-main`. Cette machine, sous NixOS, héberge en particulier un autre serveur Caddy, qui permet d'accéder à ce site. ![](images/architecture-reseau.png) ## Objectifs - Produire un flux RSS qui va me tenir informé du comportement suspect des clients de mon site - Ajouter automatiquement les IPs des clients au comportement suspect à un alias de firewall d'OPNsense. ## Contraintes - Je ne veux pas introduire la complexité d'une véritable stack d'analyse de journaux (par exemple [ELK](https://www.elastic.co/elastic-stack), [EFK](https://docs.fluentd.org/how-to-guides/free-alternative-to-splunk-by-fluentd) ou [Loki/Promtail/Grafana](https://grafana.com/oss/loki/)) : je veux accomplir mes objectifs de la façon la plus directe possible. - Je ne veux pas de notification en temps réel. J'ai autre chose à faire que de me creuser des ulcères à chaque fois qu'un bot fait quelque chose qu'il n'est pas censé faire. - Je ne veux pas de solution reposant sur Docker (par principe personnel). ## Principe de fonctionnement L'instance de Caddy sur `server-main` est configurée pour enregistrer les journaux des erreurs spécifiques au blog. Cela va inclure les tentatives d'accès à des fichiers inexistants, mais aussi des choses plus exotiques ou délibérées, notamment la recherche de scripts d'administration. Un script, exécuté périodiquement, va analyser ces logs, et en extraire des informations utiles à la prise de décision : doit-on considérer que telle adresse IP a un comportement suspect ? Si on décide qu'un client a un comportement inapproprié, le script contacte le serveur OPNsense afin d'ajouter à un alias du firewall l'adresse IP incriminée. Enfin, le script génère un flux RSS de ces erreurs, de sorte à me remonter l'information. ## Définition d'un comportement suspect Compte tenu du fait que mon site est statique, toute tentative de lire un fichier `.php` est considérée comme un scan agressif. Pour la même raison, toute requête de type _POST_ est un comportement considéré agressif, sauf s'il survient dans le cadre d'une recherche. Enfin, j'ai constaté plusieurs clients qui scannent mes fichiers à la recherche d'une installation de WordPress, ou d'autres applications populaires, par exemple des outils de gestion de base de données. Comme la structure de WordPress est bien connue, toute tentative d'accès à un de ses dossiers aboutira à un blocage du client. > J'ai constaté, pendant l'écriture de ce système, que la majorité des clients à comportement suspect proviennent des services de cloud offerts par Microsoft qui, décidément, équipe aussi bien les administrations que les bandits... ## FAQ ### Pourquoi ne pas utiliser fail2ban comme tout le monde ? Parce que [fail2ban](https://github.com/fail2ban/fail2ban) répond surtout à un besoin local : lire des journaux et modifier des règles sur la machine concernée. Mon besoin est un peu différent : je veux décider à partir des journaux HTTP de `server-main`, mais appliquer le blocage à l'entrée de mon réseau, sur OPNsense, via son API. ### Pourquoi des flux RSS ? Pour éviter le temps réel, qui me noierait sous les notifications. Parce que j'utilise déjà les [flux RSS](https://fr.wikipedia.org/wiki/RSS). Et parce qu'un flux RSS me permet de convoyer plus d'informations qu'une petite notification. ### Pourquoi pas une application web complète et pratique ? Parce que je n'en ai pas besoin. J'ai juste besoin d'un script, pas d'une usine à gaz, même si l'on pourra objecter que mon script en est déjà une à sa manière. ## Mise en place ### Module nix ```nix { config, lib, pkgs, ... }: let cfg = config.services.caddyErrorsRss; absolutePathType = lib.types.strMatching "^/.*"; feedFileNameType = lib.types.strMatching "^[^/]+$"; formatFeedHost = host: if lib.hasInfix ":" host && !(lib.hasPrefix "[" host && lib.hasSuffix "]" host) then "[${host}]" else host; stateDirectory = "caddy-errors-rss"; stateDirectoryPath = "/var/lib/${stateDirectory}"; defaultFeedDirectory = "/var/lib/${stateDirectory}-feed"; feedFilePath = "${cfg.feedDirectory}/${cfg.feedFileName}"; feedLink = "http://${formatFeedHost cfg.feedAddress}:${toString cfg.listenPort}/${cfg.feedFileName}"; pythonEnv = pkgs.python3.withPackages (ps: with ps; [ dnspython feedgen ipwhois requests ]); rssScript = pkgs.writeTextFile { name = "caddy-errors-rss.py"; executable = true; # C'est là que j'ai placé le script python, mais rien n'empêche de le mettre ailleurs text = builtins.readFile ../scripts/caddy-errors-rss.py; }; runtimeConfig = pkgs.writeText "caddy-errors-rss-config.json" (builtins.toJSON { logFile = cfg.logFile; outputFile = feedFilePath; cacheFile = "${stateDirectoryPath}/ip-cache.json"; feedLink = feedLink; language = cfg.language; maxItems = cfg.analysis.maxItems; maxUrlsPerItem = cfg.analysis.maxUrlsPerItem; cacheTtlSeconds = cfg.analysis.cacheTtlSeconds; tailLines = cfg.analysis.tailLines; networkLookupTimeoutSeconds = cfg.analysis.networkLookupTimeoutSeconds; trustedBlockExcludeNets = cfg.blocking.trustedExcludeNets; allowedPostUris = cfg.blocking.allowedPostUris; suspiciousPathPrefixes = cfg.blocking.suspiciousPathPrefixes; opnsense = lib.optionalAttrs cfg.opnsense.enable { baseUrl = cfg.opnsense.apiBaseUrl; timeoutSeconds = cfg.opnsense.apiTimeoutSeconds; verifyTls = cfg.opnsense.verifyTls; alias = { name = cfg.opnsense.aliasName; type = cfg.opnsense.aliasType; description = cfg.opnsense.aliasDescription; }; apiPaths = { aliasGetUuid = cfg.opnsense.apiPaths.aliasGetUuid; aliasAddItem = cfg.opnsense.apiPaths.aliasAddItem; aliasSetItem = cfg.opnsense.apiPaths.aliasSetItem; aliasReconfigure = cfg.opnsense.apiPaths.aliasReconfigure; aliasUtilList = cfg.opnsense.apiPaths.aliasUtilList; aliasUtilAdd = cfg.opnsense.apiPaths.aliasUtilAdd; }; }; }); in { options.services.caddyErrorsRss = { enable = lib.mkOption { type = lib.types.bool; default = true; description = "Enable generation of the Caddy error RSS feed."; }; user = lib.mkOption { type = lib.types.str; default = "caddy"; description = "User used by the RSS generation service."; }; group = lib.mkOption { type = lib.types.str; default = "caddy"; description = "Group used by the RSS generation service."; }; logFile = lib.mkOption { type = absolutePathType; default = "/var/log/caddy/caddy-errors.json"; description = "Absolute path to the Caddy JSON error log file."; }; feedDirectory = lib.mkOption { type = absolutePathType; default = defaultFeedDirectory; description = "Directory where the generated RSS feed is written."; }; feedFileName = lib.mkOption { type = feedFileNameType; default = "index.xml"; description = "File name used for the generated RSS feed."; }; feedAddress = lib.mkOption { type = lib.types.str; default = "127.0.0.1"; description = "Host name or IP address advertised in the generated feed link."; }; listenPort = lib.mkOption { type = lib.types.port; default = 30083; description = "Local Caddy port used to serve the generated feed."; }; language = lib.mkOption { type = lib.types.str; default = "fr-FR"; description = "Language advertised in the generated RSS feed."; }; onBootDelay = lib.mkOption { type = lib.types.str; default = "2m"; description = "Delay before the timer triggers after boot."; }; refreshInterval = lib.mkOption { type = lib.types.str; default = "15m"; description = "Interval between RSS feed refreshes."; }; analysis = lib.mkOption { type = lib.types.submodule { options = { maxItems = lib.mkOption { type = lib.types.ints.positive; default = 500; description = "Maximum number of distinct RSS items kept in the feed."; }; maxUrlsPerItem = lib.mkOption { type = lib.types.ints.positive; default = 25; description = "Maximum number of URLs shown per IP in each RSS item."; }; cacheTtlSeconds = lib.mkOption { type = lib.types.ints.positive; default = 7 * 24 * 3600; description = "TTL of cached IP intelligence lookups, in seconds."; }; tailLines = lib.mkOption { type = lib.types.ints.positive; default = 50000; description = "Number of log lines read from the end of the Caddy error log."; }; networkLookupTimeoutSeconds = lib.mkOption { type = lib.types.ints.positive; default = 8; description = "Timeout used for DNS, RDAP and OPNsense lookups."; }; }; }; default = { }; description = "Feed generation and log analysis limits."; }; blocking = lib.mkOption { type = lib.types.submodule { options = { trustedExcludeNets = lib.mkOption { type = lib.types.listOf lib.types.str; default = [ ]; description = "Networks that must never be auto-blocked."; }; allowedPostUris = lib.mkOption { type = lib.types.listOf lib.types.str; default = [ ]; description = "POST URIs that should not trigger OPNsense auto-blocking."; }; suspiciousPathPrefixes = lib.mkOption { type = lib.types.listOf lib.types.str; default = [ ]; description = "Common probe path prefixes that should trigger OPNsense auto-blocking."; }; }; }; default = { }; description = "Auto-blocking heuristics and exclusions."; }; opnsense = lib.mkOption { type = lib.types.submodule { options = { enable = lib.mkOption { type = lib.types.bool; default = false; description = "Enable OPNsense alias management for suspicious clients."; }; aliasName = lib.mkOption { type = lib.types.str; default = "suspicious_caddy_clients"; description = "OPNsense alias used for suspicious IP auto-blocking."; }; aliasType = lib.mkOption { type = lib.types.enum [ "host" "network" "external" ]; default = "host"; description = "Alias type created on OPNsense when the alias is missing."; }; aliasDescription = lib.mkOption { type = lib.types.str; default = "Automatically maintained alias for suspicious Caddy clients"; description = "Description used when creating the OPNsense alias."; }; apiBaseUrl = lib.mkOption { type = lib.types.nullOr lib.types.str; default = null; description = "Base URL of the OPNsense API."; }; apiTimeoutSeconds = lib.mkOption { type = lib.types.ints.positive; default = 8; description = "Timeout used for OPNsense API requests."; }; verifyTls = lib.mkOption { type = lib.types.bool; default = false; description = "Whether to verify the OPNsense API TLS certificate."; }; apiKeyFile = lib.mkOption { type = lib.types.nullOr absolutePathType; default = null; description = "Absolute path to the OPNsense API key file."; }; apiSecretFile = lib.mkOption { type = lib.types.nullOr absolutePathType; default = null; description = "Absolute path to the OPNsense API secret file."; }; apiPaths = lib.mkOption { type = lib.types.submodule { options = { aliasGetUuid = lib.mkOption { type = lib.types.str; default = "/api/firewall/alias/get_alias_u_u_i_d/{alias}"; description = "Path template used to retrieve an alias UUID by name."; }; aliasAddItem = lib.mkOption { type = lib.types.str; default = "/api/firewall/alias/add_item"; description = "Path used to create a new alias."; }; aliasSetItem = lib.mkOption { type = lib.types.str; default = "/api/firewall/alias/set_item/{uuid}"; description = "Path template used to update an existing alias."; }; aliasReconfigure = lib.mkOption { type = lib.types.str; default = "/api/firewall/alias/reconfigure"; description = "Path used to apply alias configuration changes."; }; aliasUtilList = lib.mkOption { type = lib.types.str; default = "/api/firewall/alias_util/list/{alias}"; description = "Path template used to list alias table members."; }; aliasUtilAdd = lib.mkOption { type = lib.types.str; default = "/api/firewall/alias_util/add/{alias}"; description = "Path template used to add an address to an alias table."; }; }; }; default = { }; description = "OPNsense API paths used by the service."; }; }; }; default = { }; description = "OPNsense connectivity used for alias updates."; }; }; config = lib.mkIf cfg.enable { assertions = [ { assertion = config.services.caddy.enable; message = "services.caddyErrorsRss requires services.caddy.enable = true."; } { assertion = !cfg.opnsense.enable || cfg.opnsense.apiBaseUrl != null; message = "services.caddyErrorsRss.opnsense.apiBaseUrl must be set when OPNsense support is enabled."; } { assertion = !cfg.opnsense.enable || cfg.opnsense.apiKeyFile != null; message = "services.caddyErrorsRss.opnsense.apiKeyFile must be set when OPNsense support is enabled."; } { assertion = !cfg.opnsense.enable || cfg.opnsense.apiSecretFile != null; message = "services.caddyErrorsRss.opnsense.apiSecretFile must be set when OPNsense support is enabled."; } ]; # Le vhost caddy qui va permettre d'accéder au flux RSS services.caddy.virtualHosts = { ":${toString cfg.listenPort}".extraConfig = '' root * ${cfg.feedDirectory} file_server log { output discard } ''; }; systemd.tmpfiles.rules = [ "d ${cfg.feedDirectory} 0755 ${cfg.user} ${cfg.group} -" ]; systemd.services.caddy-errors-rss = { description = "Génération du flux RSS des erreurs Caddy"; wants = [ "network-online.target" ]; after = [ "network-online.target" "caddy.service" ]; serviceConfig = { Type = "oneshot"; User = cfg.user; Group = cfg.group; UMask = "0022"; WorkingDirectory = stateDirectoryPath; StateDirectory = stateDirectory; ExecStart = "${pythonEnv}/bin/python3 ${rssScript} --config ${runtimeConfig}"; LoadCredential = lib.optionals cfg.opnsense.enable [ "opnsense_api_key:${cfg.opnsense.apiKeyFile}" "opnsense_api_secret:${cfg.opnsense.apiSecretFile}" ]; ReadWritePaths = [ cfg.feedDirectory ]; NoNewPrivileges = true; PrivateTmp = true; ProtectSystem = "strict"; ProtectHome = true; ProtectKernelTunables = true; ProtectKernelModules = true; ProtectControlGroups = true; RestrictSUIDSGID = true; LockPersonality = true; MemoryDenyWriteExecute = true; }; }; systemd.timers.caddy-errors-rss = { description = "Reconstruction périodique du flux RSS des erreurs Caddy"; wantedBy = [ "timers.target" ]; timerConfig = { OnBootSec = cfg.onBootDelay; OnUnitActiveSec = cfg.refreshInterval; Persistent = true; Unit = "caddy-errors-rss.service"; }; }; }; } ``` ### Script ```python from __future__ import annotations import argparse import datetime as dt import hashlib import ipaddress import json import os from collections import deque from dataclasses import dataclass from pathlib import Path from typing import Any from urllib.parse import quote import xml.etree.ElementTree as ET import dns.exception import dns.reversename import dns.resolver import requests from feedgen.feed import FeedGenerator from ipwhois.exceptions import BaseIpwhoisException, IPDefinedError from ipwhois import IPWhois LOCAL_TZ = dt.datetime.now().astimezone().tzinfo or dt.timezone.utc @dataclass(frozen=True) class AliasSpec: """Décrit l'alias OPNsense maintenu par le service.""" name: str type: str description: str @dataclass(frozen=True) class ApiPaths: """Stocke les modèles de chemins de l'API OPNsense utilisés par le client.""" alias_get_uuid: str alias_add_item: str alias_set_item: str alias_reconfigure: str alias_util_list: str alias_util_add: str @dataclass(frozen=True) class OPNsenseConfig: """Regroupe les paramètres OPNsense consommés par le script.""" base_url: str timeout_seconds: int verify_tls: bool alias: AliasSpec api_paths: ApiPaths @dataclass(frozen=True) class Config: """Représente la configuration d'exécution entièrement parsée.""" log_file: Path output_file: Path cache_file: Path feed_link: str language: str max_items: int max_urls_per_item: int cache_ttl_seconds: int tail_lines: int network_lookup_timeout_seconds: int trusted_block_exclude_nets: tuple[ipaddress._BaseNetwork, ...] allowed_post_uris: frozenset[str] suspicious_path_prefixes: tuple[str, ...] opnsense: OPNsenseConfig | None @dataclass(frozen=True) class Record: """Représente une entrée d'erreur Caddy déjà parsée.""" client_ip: str uri: str method: str status: int user_agent: str host: str full_url: str ts_epoch: float timestamp_iso: str def require(condition: bool, message: str) -> None: """Lève une erreur d'exécution lorsqu'une condition attendue n'est pas satisfaite.""" if not condition: raise RuntimeError(message) def normalize_ip(value: str) -> str: """Normalise une IPv4 ou une IPv6 dans sa forme textuelle canonique.""" raw = value.strip() if raw.startswith("[") and raw.endswith("]"): raw = raw[1:-1] if "%" in raw: raw = raw.split("%", 1)[0] return str(ipaddress.ip_address(raw)) def normalize_uri_path(uri: str) -> str: """Supprime la chaîne de requête et le fragment d'une URI.""" return uri.split("#", 1)[0].split("?", 1)[0].strip() def normalize_suspicious_path_prefix(prefix: str) -> str: """Normalise et valide un préfixe de chemin suspect.""" normalized = normalize_uri_path(prefix).strip().lower().rstrip("/") require(normalized != "", "Un préfixe de chemin suspect ne peut pas être vide") require(normalized.startswith("/"), f"Préfixe de chemin suspect invalide: {prefix!r}") require(normalized != "/", "Le préfixe de chemin suspect '/' est trop large") return normalized def path_matches_suspicious_prefix(path: str, prefix: str) -> bool: """Indique si un chemin correspond à un préfixe suspect avec borne raisonnable.""" if path == prefix: return True if not path.startswith(prefix): return False return path[len(prefix)] in "/._-" def uri_targets_suspicious_path(uri: str, suspicious_path_prefixes: tuple[str, ...]) -> bool: """Indique si une URI cible un chemin classiquement scanné.""" path = normalize_uri_path(uri).lower() return any( path_matches_suspicious_prefix(path, prefix) for prefix in suspicious_path_prefixes ) def method_is_post(method: str) -> bool: """Indique si la méthode HTTP est POST.""" return method.strip().upper() == "POST" def uri_ends_with_php(uri: str) -> bool: """Indique si le chemin d'URI normalisé cible une ressource PHP.""" return normalize_uri_path(uri).lower().endswith(".php") def format_iso_local(ts_epoch: float) -> str: """Formate un timestamp Unix en ISO 8601 dans le fuseau local.""" return dt.datetime.fromtimestamp(ts_epoch, tz=LOCAL_TZ).isoformat() def read_json(path: Path) -> Any: """Lit et décode un document JSON depuis le disque.""" with path.open("r", encoding="utf-8") as handle: return json.load(handle) def parse_config(raw: dict[str, Any]) -> Config: """Convertit la charge JSON générée en configuration typée.""" trusted_nets = tuple( ipaddress.ip_network(value, strict=False) for value in raw["trustedBlockExcludeNets"] ) suspicious_path_prefixes = tuple( normalize_suspicious_path_prefix(value) for value in raw["suspiciousPathPrefixes"] ) opnsense_raw = raw.get("opnsense") opnsense = None if opnsense_raw: opnsense = OPNsenseConfig( base_url=opnsense_raw["baseUrl"].rstrip("/"), timeout_seconds=int(opnsense_raw["timeoutSeconds"]), verify_tls=bool(opnsense_raw["verifyTls"]), alias=AliasSpec( name=opnsense_raw["alias"]["name"], type=opnsense_raw["alias"]["type"], description=opnsense_raw["alias"]["description"], ), api_paths=ApiPaths( alias_get_uuid=opnsense_raw["apiPaths"]["aliasGetUuid"], alias_add_item=opnsense_raw["apiPaths"]["aliasAddItem"], alias_set_item=opnsense_raw["apiPaths"]["aliasSetItem"], alias_reconfigure=opnsense_raw["apiPaths"]["aliasReconfigure"], alias_util_list=opnsense_raw["apiPaths"]["aliasUtilList"], alias_util_add=opnsense_raw["apiPaths"]["aliasUtilAdd"], ), ) return Config( log_file=Path(raw["logFile"]), output_file=Path(raw["outputFile"]), cache_file=Path(raw["cacheFile"]), feed_link=raw["feedLink"], language=raw["language"], max_items=int(raw["maxItems"]), max_urls_per_item=int(raw["maxUrlsPerItem"]), cache_ttl_seconds=int(raw["cacheTtlSeconds"]), tail_lines=int(raw["tailLines"]), network_lookup_timeout_seconds=int(raw["networkLookupTimeoutSeconds"]), trusted_block_exclude_nets=trusted_nets, allowed_post_uris=frozenset(raw["allowedPostUris"]), suspicious_path_prefixes=suspicious_path_prefixes, opnsense=opnsense, ) def load_config(path: Path) -> Config: """Charge et parse le fichier de configuration JSON.""" return parse_config(read_json(path)) def load_cache(path: Path) -> dict[str, dict[str, Any]]: """Charge le cache de renseignements IP lorsqu'il existe.""" if not path.exists(): return {} payload = read_json(path) require(isinstance(payload, dict), f"Invalid cache payload in {path}") return payload def save_cache(path: Path, cache: dict[str, dict[str, Any]]) -> None: """Écrit le cache de renseignements IP de manière atomique.""" path.parent.mkdir(parents=True, exist_ok=True) tmp_path = path.with_suffix(path.suffix + ".tmp") with tmp_path.open("w", encoding="utf-8") as handle: json.dump(cache, handle, ensure_ascii=False, sort_keys=True) tmp_path.replace(path) def read_log_tail(path: Path, max_lines: int) -> list[str]: """Lit uniquement la fin du journal d'erreurs Caddy.""" if not path.exists(): raise FileNotFoundError(path) with path.open("r", encoding="utf-8", errors="strict") as handle: return list(deque(handle, maxlen=max_lines)) def extract_user_agent(headers: dict[str, Any]) -> str: """Extrait l'en-tête User-Agent d'une requête Caddy.""" user_agent_value = headers.get("User-Agent") or headers.get("user-agent") or "-" if isinstance(user_agent_value, list): return user_agent_value[0] if user_agent_value else "-" require(isinstance(user_agent_value, str), "Invalid User-Agent header payload") return user_agent_value def parse_timestamp(obj: dict[str, Any]) -> float: """Extrait un timestamp Unix depuis un objet de log Caddy.""" ts_value = obj.get("ts") if isinstance(ts_value, (int, float)): return float(ts_value) if isinstance(ts_value, str): return float(ts_value) time_value = obj.get("time") require(isinstance(time_value, str), "Missing Caddy timestamp in log record") return dt.datetime.fromisoformat(time_value.replace("Z", "+00:00")).timestamp() def parse_records(lines: list[str]) -> list[Record]: """Parse les lignes JSON de Caddy en enregistrements d'erreur triés.""" records: list[Record] = [] for raw_line in lines: line = raw_line.strip() if not line: continue obj = json.loads(line) require(isinstance(obj, dict), "Unexpected Caddy log payload") status = obj.get("status") require(isinstance(status, int), "Missing Caddy status in log record") if status < 400 or status >= 600: continue request = obj.get("request") require(isinstance(request, dict), "Missing Caddy request payload") headers = request.get("headers") or {} require(isinstance(headers, dict), "Invalid Caddy headers payload") raw_client_ip = request.get("client_ip") or request.get("remote_ip") require(isinstance(raw_client_ip, str) and raw_client_ip.strip(), "Missing client IP") client_ip = normalize_ip(raw_client_ip) uri = str(request.get("uri") or "/") method = str(request.get("method") or "-") host = str(request.get("host") or "") ts_epoch = parse_timestamp(obj) records.append( Record( client_ip=client_ip, uri=uri, method=method, status=status, user_agent=extract_user_agent(headers), host=host, full_url=f"https://{host}{uri}" if host else uri, ts_epoch=ts_epoch, timestamp_iso=format_iso_local(ts_epoch), ) ) return sorted(records, key=lambda item: item.ts_epoch, reverse=True) def build_dedup_items(records: list[Record], max_items: int) -> list[Record]: """Conserve au plus un élément de flux par IP cliente et URI.""" seen: set[tuple[str, str]] = set() dedup: list[Record] = [] for record in records: key = (record.client_ip, record.uri) if key in seen: continue seen.add(key) dedup.append(record) if len(dedup) >= max_items: break return dedup def build_ip_stats(records: list[Record]) -> dict[str, dict[str, Any]]: """Agrège les compteurs, dates et fréquences d'URL par IP.""" stats: dict[str, dict[str, Any]] = {} for record in records: current = stats.setdefault( record.client_ip, { "count": 0, "first_seen": record.ts_epoch, "last_seen": record.ts_epoch, "urls": {}, }, ) current["count"] += 1 current["first_seen"] = min(current["first_seen"], record.ts_epoch) current["last_seen"] = max(current["last_seen"], record.ts_epoch) current["urls"][record.uri] = current["urls"].get(record.uri, 0) + 1 for current in stats.values(): urls_sorted = sorted(current["urls"].items(), key=lambda item: (-item[1], item[0])) current["urls_sorted"] = urls_sorted current["distinct_urls"] = len(urls_sorted) return stats def is_excluded_from_blocking(ip: str, excluded_nets: tuple[ipaddress._BaseNetwork, ...]) -> bool: """Indique si une IP appartient à un réseau exclu du blocage.""" parsed_ip = ipaddress.ip_address(ip) return any(parsed_ip in network for network in excluded_nets) def build_blocking_reasons( records: list[Record], excluded_nets: tuple[ipaddress._BaseNetwork, ...], allowed_post_uris: frozenset[str], suspicious_path_prefixes: tuple[str, ...], ) -> dict[str, set[str]]: """Calcule les motifs de blocage détectés pour chaque IP cliente.""" reasons_by_ip: dict[str, set[str]] = {} for record in records: if is_excluded_from_blocking(record.client_ip, excluded_nets): continue reasons = reasons_by_ip.setdefault(record.client_ip, set()) if uri_ends_with_php(record.uri): reasons.add("url_php") if method_is_post(record.method) and normalize_uri_path(record.uri) not in allowed_post_uris: reasons.add("method_post") if uri_targets_suspicious_path(record.uri, suspicious_path_prefixes): reasons.add("path_probe") return reasons_by_ip def format_blocking_reasons(reasons: set[str]) -> str: """Traduit les codes de blocage en libellés lisibles.""" labels = [] if "method_post" in reasons: labels.append("requête POST hors recherche") if "url_php" in reasons: labels.append("URL en .php") if "path_probe" in reasons: labels.append("chemin classiquement scanné (CMS/admin/fichiers sensibles)") return ", ".join(labels) if labels else "aucun" def classify_ip(ip: str) -> dict[str, str]: """Décrit la version et la portée d'une adresse IP.""" addr = ipaddress.ip_address(ip) if addr.is_private: scope = "privée" elif addr.is_loopback: scope = "loopback" elif addr.is_link_local: scope = "link-local" elif addr.is_multicast: scope = "multicast" elif addr.is_reserved: scope = "réservée" elif addr.is_global: scope = "publique routable" else: scope = "indéterminée" return {"version": "IPv6" if addr.version == 6 else "IPv4", "scope": scope} def classify_network_profile(as_name: str, network: str, ptr: str, scope: str) -> str: """Estime si une IP publique semble résidentielle ou hébergée.""" if scope != "publique routable": return "adresse non publique" signal = " ".join([as_name, network, ptr]).lower() hosting_keywords = [ "cloud", "hosting", "datacenter", "server", "vps", "ovh", "amazon", "aws", "google", "azure", "microsoft", "digitalocean", "linode", "hetzner", "oracle", "scaleway", "ionos", ] residential_keywords = [ "residential", "broadband", "dsl", "fiber", "fibre", "pool", "dynamic", "pppoe", ] if any(keyword in signal for keyword in hosting_keywords): return "hébergeur/datacenter probable" if any(keyword in signal for keyword in residential_keywords): return "accès résidentiel probable" return "profil réseau indéterminé" class DNSLookup: """Effectue les requêtes DNS utilisées pour enrichir les IP clientes suspectes.""" def __init__(self, timeout_seconds: int): """Construit un résolveur configuré avec le délai demandé.""" self.resolver = dns.resolver.Resolver() self.resolver.timeout = timeout_seconds self.resolver.lifetime = timeout_seconds def _resolve_text(self, qname: Any, rdtype: str) -> list[str]: """Résout un jeu d'enregistrements et renvoie des réponses textuelles normalisées.""" answers = self.resolver.resolve(qname, rdtype) return [answer.to_text().rstrip(".") for answer in answers] def reverse_ptr(self, ip: str) -> str: """Renvoie le PTR d'une IP, ou n/d s'il est absent.""" try: pointer = dns.reversename.from_address(ip) return self._resolve_text(pointer, "PTR")[0] except dns.exception.DNSException: return "n/d" def forward_matches(self, host: str, ip: str, rrtype: str) -> str: """Vérifie qu'un nom d'hôte résout bien vers l'IP attendue.""" if host == "n/d": return "n/d" try: values = {normalize_ip(value) for value in self._resolve_text(host, rrtype)} except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer): return "non" except dns.exception.DNSException: return "indéterminé" return "oui" if ip in values else "non" def spamhaus(self, ip: str) -> str: """Interroge la DNSBL Spamhaus ZEN pour une IPv4 publique.""" addr = ipaddress.ip_address(ip) if not addr.is_global: return "non applicable (IP non publique)" if addr.version == 6: return "non vérifiée (IPv6)" query = ".".join(reversed(ip.split("."))) + ".zen.spamhaus.org" try: listed = self._resolve_text(query, "A") except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer): return "non listée" except dns.exception.DNSException: return "indisponible" return "listée (" + ", ".join(listed) + ")" def extract_abuse_contact(payload: dict[str, Any]) -> str: """Extrait le contact abuse le plus pertinent d'une réponse RDAP.""" for obj in payload.get("objects", {}).values(): if not isinstance(obj, dict): continue roles = {str(role).lower() for role in obj.get("roles", []) if isinstance(role, str)} contact = obj.get("contact") if not isinstance(contact, dict): continue emails = contact.get("email") or [] if not isinstance(emails, list): continue for entry in emails: if not isinstance(entry, dict): continue value = entry.get("value") if isinstance(value, str) and value: if "abuse" in roles: return value for obj in payload.get("objects", {}).values(): if not isinstance(obj, dict): continue contact = obj.get("contact") if not isinstance(contact, dict): continue emails = contact.get("email") or [] if not isinstance(emails, list): continue for entry in emails: if isinstance(entry, dict) and isinstance(entry.get("value"), str): return entry["value"] return "n/d" def query_ip_intel( ip: str, cache: dict[str, dict[str, Any]], now_ts: int, dns_lookup: DNSLookup, timeout_seconds: int, cache_ttl_seconds: int, ) -> dict[str, Any]: """Renvoie les renseignements DNS et RDAP d'une IP, en cache ou fraîchement récupérés.""" cached = cache.get(ip) if isinstance(cached, dict): updated_at = cached.get("updated_at") if isinstance(updated_at, (int, float)) and (now_ts - int(updated_at) < cache_ttl_seconds): return cached ip_meta = classify_ip(ip) ptr = dns_lookup.reverse_ptr(ip) rr_type = "AAAA" if ip_meta["version"] == "IPv6" else "A" fcrdns = dns_lookup.forward_matches(ptr, ip, rr_type) spamhaus = dns_lookup.spamhaus(ip) intel = { "updated_at": now_ts, "version": ip_meta["version"], "scope": ip_meta["scope"], "ptr": ptr, "fcrdns": fcrdns, "asn": "n/d", "prefix": "n/d", "as_name": "n/d", "network": "n/d", "abuse_contact": "n/d", "spamhaus": spamhaus, "profile": "adresse non publique", } if ipaddress.ip_address(ip).is_global: try: rdap = IPWhois(ip, timeout=timeout_seconds).lookup_rdap(depth=1) except (IPDefinedError, BaseIpwhoisException, OSError): rdap = None if isinstance(rdap, dict): network = rdap.get("network") or {} intel["asn"] = rdap.get("asn") or "n/d" intel["prefix"] = rdap.get("asn_cidr") or network.get("cidr") or "n/d" intel["as_name"] = rdap.get("asn_description") or "n/d" intel["network"] = network.get("name") or network.get("handle") or "n/d" intel["abuse_contact"] = extract_abuse_contact(rdap) intel["profile"] = classify_network_profile( intel["as_name"], intel["network"], ptr, intel["scope"], ) else: intel["profile"] = "profil réseau indéterminé" cache[ip] = intel return intel def build_list(root: ET.Element, title: str, rows: list[tuple[str, str, bool]]) -> None: """Ajoute une section de liste HTML titrée à la description RSS.""" title_paragraph = ET.SubElement(root, "p") ET.SubElement(title_paragraph, "strong").text = title items = ET.SubElement(root, "ul") for label, value, as_code in rows: item = ET.SubElement(items, "li") strong = ET.SubElement(item, "strong") strong.text = label if as_code: strong.tail = " " code = ET.SubElement(item, "code") code.text = value else: strong.tail = value def build_urls_list(root: ET.Element, urls_sorted: list[tuple[str, int]], max_urls: int) -> None: """Ajoute à la description HTML la liste des URL et de leurs fréquences pour l'IP.""" paragraph = ET.SubElement(root, "p") ET.SubElement(paragraph, "strong").text = "Toutes les URL en erreur pour cette IP (avec nombre d'erreurs)" items = ET.SubElement(root, "ul") visible_urls = urls_sorted[:max_urls] omitted_urls_count = max(0, len(urls_sorted) - len(visible_urls)) if not visible_urls: ET.SubElement(items, "li").text = "Aucune URL" return for url, count in visible_urls: item = ET.SubElement(items, "li") code = ET.SubElement(item, "code") code.text = url code.tail = f" ({count})" if omitted_urls_count: item = ET.SubElement(items, "li") emphasis = ET.SubElement(item, "em") emphasis.text = ( f"{omitted_urls_count} URL supplémentaires omises pour limiter la taille du flux" ) def build_description_html( record: Record, stats: dict[str, Any], intel: dict[str, Any], block_reasons: set[str], block_status: str, max_urls_per_item: int, ) -> str: """Construit le contenu HTML embarqué dans chaque élément RSS.""" root = ET.Element("div") build_list( root, "Requête en erreur", [ ("Méthode HTTP : ", record.method, True), ("Code de statut : ", str(record.status), True), ("Hôte demandé : ", record.host or "-", True), ("URL demandée : ", record.full_url, True), ("User-Agent : ", record.user_agent, True), ("IP cliente : ", record.client_ip, True), ("Date de la requête (heure locale) : ", record.timestamp_iso, False), ( "Nombre total de requêtes en erreur (IP, fenêtre analysée) : ", str(stats.get("count", 1)), False, ), ( "Nombre d'URL distinctes en erreur (IP) : ", str(stats.get("distinct_urls", 1)), False, ), ( "Signal de blocage OPNsense observé pour cette IP : ", format_blocking_reasons(block_reasons), False, ), ( "État d'ajout dans l'alias OPNsense : ", block_status or "non tenté", False, ), ], ) build_list( root, "Renseignements réseau", [ ( "Type d'adresse : ", f"{intel.get('version', 'n/d')} - {intel.get('scope', 'n/d')}", False, ), ("PTR (reverse DNS) : ", intel.get("ptr", "n/d"), False), ("FCrDNS (PTR cohérent) : ", intel.get("fcrdns", "n/d"), False), ("ASN : ", intel.get("asn", "n/d"), False), ("Préfixe : ", intel.get("prefix", "n/d"), False), ("Nom AS : ", intel.get("as_name", "n/d"), False), ("Réseau / organisation : ", intel.get("network", "n/d"), False), ("Contact abuse : ", intel.get("abuse_contact", "n/d"), False), ("Spamhaus ZEN : ", intel.get("spamhaus", "inconnu"), False), ("Profil réseau : ", intel.get("profile", "indéterminé"), False), ( "Première erreur observée (heure locale) : ", format_iso_local(stats.get("first_seen", record.ts_epoch)), False, ), ( "Dernière erreur observée (heure locale) : ", format_iso_local(stats.get("last_seen", record.ts_epoch)), False, ), ], ) build_urls_list(root, stats.get("urls_sorted", []), max_urls_per_item) return ET.tostring(root, encoding="unicode", method="html") class OPNsenseAliasClient: """Gère l'alias OPNsense utilisé pour les IP clientes suspectes.""" def __init__(self, config: OPNsenseConfig, credentials_directory: Path): """Initialise la session HTTP authentifiée vers OPNsense.""" self.config = config self.alias_uuid: str | None = None self.known_alias_ips: set[str] | None = None key_path = credentials_directory / "opnsense_api_key" secret_path = credentials_directory / "opnsense_api_secret" api_key = key_path.read_text(encoding="utf-8").strip() api_secret = secret_path.read_text(encoding="utf-8").strip() require(api_key != "", f"Empty OPNsense API key in {key_path}") require(api_secret != "", f"Empty OPNsense API secret in {secret_path}") self.session = requests.Session() self.session.auth = (api_key, api_secret) self.session.verify = config.verify_tls def _build_url(self, template: str, **values: str) -> str: """Rend un modèle de chemin d'API en URL complète.""" replacements = {key: quote(value, safe="") for key, value in values.items()} return self.config.base_url + template.format(**replacements) def _request_json(self, method: str, template: str, **kwargs: Any) -> dict[str, Any]: """Envoie une requête à l'API OPNsense et valide la forme de la réponse JSON.""" response = self.session.request( method=method, url=self._build_url(template, **kwargs.pop("path_values", {})), timeout=self.config.timeout_seconds, **kwargs, ) response.raise_for_status() payload = response.json() require(isinstance(payload, dict), "Unexpected OPNsense API payload") return payload def get_alias_uuid(self) -> str | None: """Recherche l'UUID de l'alias configuré, s'il existe.""" payload = self._request_json( "GET", self.config.api_paths.alias_get_uuid, path_values={"alias": self.config.alias.name}, ) uuid = payload.get("uuid") return uuid if isinstance(uuid, str) and uuid else None def reconfigure(self) -> None: """Demande à OPNsense d'appliquer les changements d'alias en attente.""" payload = self._request_json("POST", self.config.api_paths.alias_reconfigure) require( payload.get("status") in {"ok", "done"}, f"OPNsense alias reconfigure failed: {payload}", ) def ensure_alias_exists(self) -> None: """Crée l'alias configuré lorsqu'il n'existe pas encore.""" if self.alias_uuid is not None: return uuid = self.get_alias_uuid() if uuid is None: self._request_json( "POST", self.config.api_paths.alias_add_item, json={ "alias": { "enabled": "1", "name": self.config.alias.name, "type": self.config.alias.type, "content": "", "description": self.config.alias.description, } }, ) uuid = self.get_alias_uuid() require(uuid is not None, f"Unable to create OPNsense alias {self.config.alias.name!r}") self.alias_uuid = uuid self._request_json( "POST", self.config.api_paths.alias_set_item, path_values={"uuid": uuid}, json={ "alias": { "enabled": "1", "name": self.config.alias.name, "type": self.config.alias.type, "content": "", "description": self.config.alias.description, } }, ) self.reconfigure() else: self.alias_uuid = uuid def ensure_alias_snapshot(self) -> set[str]: """Récupère et met en cache les IP actuellement présentes dans la table d'alias.""" if self.known_alias_ips is not None: return self.known_alias_ips self.ensure_alias_exists() payload = self._request_json( "GET", self.config.api_paths.alias_util_list, path_values={"alias": self.config.alias.name}, ) rows = payload.get("rows") require(isinstance(rows, list), "Unexpected OPNsense alias listing payload") snapshot: set[str] = set() for row in rows: require(isinstance(row, dict), "Unexpected OPNsense alias row payload") candidate = row.get("ip") or row.get("address") or row.get("item") if candidate: snapshot.add(normalize_ip(str(candidate))) self.known_alias_ips = snapshot return snapshot def add_ip_if_missing(self, ip: str) -> str: """Ajoute une IP à la table d'alias si elle n'y est pas déjà.""" snapshot = self.ensure_alias_snapshot() if ip in snapshot: return "already_present" payload = self._request_json( "POST", self.config.api_paths.alias_util_add, path_values={"alias": self.config.alias.name}, json={"address": ip}, ) require(payload.get("status") == "done", f"OPNsense alias add failed: {payload}") snapshot.add(ip) return "added" def build_feed( config: Config, items: list[Record], ip_stats: dict[str, dict[str, Any]], blocking_reasons_by_ip: dict[str, set[str]], cache: dict[str, dict[str, Any]], opnsense_client: OPNsenseAliasClient | None, ) -> bytes: """Construit la charge RSS à partir des enregistrements parsés et des données d'enrichissement.""" fg = FeedGenerator() fg.title("Flux RSS des erreurs Caddy") fg.description("Erreurs HTTP Caddy (déduplication par IP client + URL demandée)") fg.link(href=config.feed_link) fg.language(config.language) fg.generator("caddy-errors-rss") fg.lastBuildDate(dt.datetime.now(tz=LOCAL_TZ)) dns_lookup = DNSLookup(config.network_lookup_timeout_seconds) blocking_state_by_ip: dict[str, str] = {} now_ts = int(dt.datetime.now(tz=dt.timezone.utc).timestamp()) for item in items: block_reasons = blocking_reasons_by_ip.get(item.client_ip, set()) should_try_block = bool(block_reasons) intel = query_ip_intel( item.client_ip, cache, now_ts, dns_lookup, config.network_lookup_timeout_seconds, config.cache_ttl_seconds, ) stats = ip_stats.get( item.client_ip, { "count": 1, "distinct_urls": 1, "urls_sorted": [(item.uri, 1)], "first_seen": item.ts_epoch, "last_seen": item.ts_epoch, }, ) block_status = "" if should_try_block: if opnsense_client is None: block_status = "désactivé" else: if item.client_ip not in blocking_state_by_ip: blocking_state_by_ip[item.client_ip] = opnsense_client.add_ip_if_missing( item.client_ip ) block_status = blocking_state_by_ip[item.client_ip] title_prefix = "[Bloqué] " if block_status in {"added", "already_present"} else "" title = f"{title_prefix}[{item.client_ip}] [{item.method} {item.status}] {item.uri}" guid = hashlib.sha256(f"{item.client_ip}|{item.uri}".encode("utf-8")).hexdigest() entry = fg.add_entry(order="append") entry.title(title) entry.guid(guid, permalink=False) entry.pubDate(dt.datetime.fromtimestamp(item.ts_epoch, tz=LOCAL_TZ)) entry.content( build_description_html( record=item, stats=stats, intel=intel, block_reasons=block_reasons, block_status=block_status, max_urls_per_item=config.max_urls_per_item, ), type="CDATA", ) return fg.rss_str(pretty=True) def parse_args() -> argparse.Namespace: """Parse les arguments de ligne de commande acceptés par le script.""" parser = argparse.ArgumentParser(description="Generate an RSS feed from Caddy error logs.") parser.add_argument("--config", required=True, type=Path, help="Path to the JSON config file.") return parser.parse_args() def main() -> None: """Exécute un cycle complet de génération du flux et écrit les résultats sur disque.""" args = parse_args() config = load_config(args.config) config.output_file.parent.mkdir(parents=True, exist_ok=True) config.cache_file.parent.mkdir(parents=True, exist_ok=True) opnsense_client = None if config.opnsense is not None: credentials_directory = Path(os.environ["CREDENTIALS_DIRECTORY"]) opnsense_client = OPNsenseAliasClient(config.opnsense, credentials_directory) cache = load_cache(config.cache_file) records = parse_records(read_log_tail(config.log_file, config.tail_lines)) items = build_dedup_items(records, config.max_items) ip_stats = build_ip_stats(records) blocking_reasons_by_ip = build_blocking_reasons( records, config.trusted_block_exclude_nets, config.allowed_post_uris, config.suspicious_path_prefixes, ) rss_payload = build_feed( config, items, ip_stats, blocking_reasons_by_ip, cache, opnsense_client, ) tmp_output = config.output_file.with_suffix(config.output_file.suffix + ".tmp") tmp_output.write_bytes(rss_payload) tmp_output.replace(config.output_file) save_cache(config.cache_file, cache) if __name__ == "__main__": main() ``` ### Configuration de mon virtualhost ```nix { config, lib, ... }: { services.caddy = { # [...] # Le vhost de mon blog virtualHosts = { ":30080" = { extraConfig = '' # [...] # On log les erreurs à part log errors { output file ${config.services.caddyErrorsRss.logFile} { roll_size 15MiB roll_keep 14 roll_keep_for 336h } } # [...] handle_errors { log_name errors # On n'oublie pas la gestion spécifiques des 404 @notFound expression {http.error.status_code} == 404 route @notFound { rewrite * /404.html file_server } } ''; }; }; }; } ``` ### Configuration du module nix ```nix { ... }: { imports = [ ./modules/caddy-errors.nix ]; services.caddyErrorsRss = { # Chemin du fichier journal généré par Caddy logFile = "/var/log/caddy/richard-dern.fr-errors.log"; # Adresse du serveur à joindre pour afficher le flux feedAddress = "10.0.3.1"; # Port du vhost permettant d'accéder au flux listenPort = 30083; blocking = { trustedExcludeNets = [ "10.0.0.0/8" # Mes préfixes IPv6 "...:3210::/64" "...:3211::/64" ]; allowedPostUris = [ # C'est ici qu'on évite le blocage sur le formulaire de recherche "/api/search/indexes/blog_posts/search" "/indexes/blog_posts/search" ]; # Et ici les URL caractéristiques. # À ne pas utiliser si votre blog utilise Wordpress... suspiciousPathPrefixes = [ "/wp-admin" "/wp-login.php" "/wp-content" "/wp-includes" "/wp-json" "/wordpress" "/xmlrpc.php" "/wlwmanifest.xml" "/readme.html" "/license.txt" "/admin" "/administrator" "/adminer" "/phpmyadmin" "/pma" "/manager" "/manager/html" "/webadmin" "/server-status" "/phpinfo.php" "/drupal" "/sites/default" "/core/install.php" "/joomla" "/components" "/modules" "/plugins" "/templates" "/vendor/composer" "/.env" "/.git" "/.svn" "/.hg" "/.bzr" "/.DS_Store" "/.aws" "/.ssh" "/vendor/phpunit" "/artisan" "/storage" "/_ignition" "/telescope" "/cgi-bin" "/boaform" "/HNAP1" "/autodiscover" "/owa" "/remote" "/jmx-console" "/solr" "/actuator" "/jenkins" "/hudson" ]; }; opnsense = { enable = true; # Nom de l'alias dans OPNsense aliasName = "comportement_suspect"; # Adresse d'accès à OPNsense apiBaseUrl = "https://..."; apiKeyFile = "/etc/nixos/secrets/services/caddy-errors-rss/opnsense-api-key"; apiSecretFile = "/etc/nixos/secrets/services/caddy-errors-rss/opnsense-api-secret"; }; }; } ``` La clé d'API et le secret sont obtenus depuis _System_, _Access_, _Users_ dans OPNsense (c'est l'un des boutons en regard d'un utilisateur). ## Résultat ![](images/shaming.png) Chaque élément de flux me permet alors de décider si je dois bloquer une IP manuellement. Les éléments heuristiques indiqués dans la configuration permettent un blocage automatique, mais certains peuvent passer au travers : ces informations supplémentaires m'aident à décider si une requête est légitime ou pas. Un effet de bord positif de ce que j'ai mis en place est que je peux voir des requêtes pointant vers des anciens URLs, légitimes mais qui ont changé au cours du temps. Cela me permet d'ajouter à Hugo des aliases pour ces URLs afin d'éviter le _link-rot_. ![](images/aliases.png) On peut voir alors l'alias se remplir, et le nombre de paquets matchés dans OPNsense. C'est efficace ! ## Conclusion L'implémentation de cette solution a nécessité quelques efforts en amont pour un résultat immédiatement satisfaisant : je n'ai plus besoin d'avoir le nez ni dans les logs de Caddy ni dans le firewall d'OPNsense, mais j'ai quand même un visuel clair grâce aux flux RSS. Et j'ai la souplesse intrinsèque à une solution "fait-maison" ou "à l'ancienne". En outre, la solution de blocage intervient là où elle est supposée intervenir : elle bloque des cas concrets et tangibles de "tentatives d'infractions", après l'IPS/IDS d'OPNsense. En d'autres termes, elle respecte la présomption d'innocence : elle ne bloque pas un bot [sous prétexte que c'est une IA](/interets/informatique/2025/04/02/laisser-l-ia-apprendre-une-position-ethique/). Enfin, comme je n'ai jamais adhéré au _cloud_, je voulais une solution _locale_, sur laquelle j'ai le contrôle. Autrement, j'aurais fait comme tout le monde, en passant par [Cloudflare](https://www.cloudflare.com/fr-fr/).