package store import ( "context" "database/sql" "encoding/json" "errors" "fmt" "os" "path/filepath" "sort" "strings" "time" "git.dern.ovh/infrastructure/caddy-opnsense-blocker/internal/model" _ "modernc.org/sqlite" ) const schema = ` CREATE TABLE IF NOT EXISTS events ( id INTEGER PRIMARY KEY AUTOINCREMENT, source_name TEXT NOT NULL, profile_name TEXT NOT NULL, occurred_at TEXT NOT NULL, remote_ip TEXT NOT NULL, client_ip TEXT NOT NULL, host TEXT NOT NULL, method TEXT NOT NULL, uri TEXT NOT NULL, path TEXT NOT NULL, status INTEGER NOT NULL, user_agent TEXT NOT NULL, decision TEXT NOT NULL, decision_reason TEXT NOT NULL, decision_reasons_json TEXT NOT NULL, enforced INTEGER NOT NULL DEFAULT 0, raw_json TEXT NOT NULL, created_at TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_events_occurred_at ON events(occurred_at DESC, id DESC); CREATE INDEX IF NOT EXISTS idx_events_client_ip ON events(client_ip, occurred_at DESC, id DESC); CREATE INDEX IF NOT EXISTS idx_events_source_name ON events(source_name, occurred_at DESC, id DESC); CREATE INDEX IF NOT EXISTS idx_events_decision ON events(decision, occurred_at DESC, id DESC); CREATE TABLE IF NOT EXISTS ip_state ( ip TEXT PRIMARY KEY, first_seen_at TEXT NOT NULL, last_seen_at TEXT NOT NULL, last_source_name TEXT NOT NULL, last_user_agent TEXT NOT NULL, latest_status INTEGER NOT NULL, total_events INTEGER NOT NULL, state TEXT NOT NULL, state_reason TEXT NOT NULL, manual_override TEXT NOT NULL, last_event_id INTEGER NOT NULL, updated_at TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_ip_state_last_seen ON ip_state(last_seen_at DESC, ip ASC); CREATE INDEX IF NOT EXISTS idx_ip_state_state ON ip_state(state, last_seen_at DESC, ip ASC); CREATE TABLE IF NOT EXISTS decisions ( id INTEGER PRIMARY KEY AUTOINCREMENT, event_id INTEGER NOT NULL, ip TEXT NOT NULL, source_name TEXT NOT NULL, kind TEXT NOT NULL, action TEXT NOT NULL, reason TEXT NOT NULL, actor TEXT NOT NULL, enforced INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_decisions_ip ON decisions(ip, created_at DESC, id DESC); CREATE INDEX IF NOT EXISTS idx_decisions_event_id ON decisions(event_id, created_at DESC, id DESC); CREATE TABLE IF NOT EXISTS backend_actions ( id INTEGER PRIMARY KEY AUTOINCREMENT, ip TEXT NOT NULL, action TEXT NOT NULL, result TEXT NOT NULL, message TEXT NOT NULL, created_at TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_backend_actions_ip ON backend_actions(ip, created_at DESC, id DESC); CREATE TABLE IF NOT EXISTS source_offsets ( source_name TEXT PRIMARY KEY, path TEXT NOT NULL, inode TEXT NOT NULL, offset INTEGER NOT NULL, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS ip_investigations ( ip TEXT PRIMARY KEY, payload_json TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_ip_investigations_updated_at ON ip_investigations(updated_at DESC, ip ASC); ` type Store struct { db *sql.DB } func Open(path string) (*Store, error) { if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { return nil, fmt.Errorf("create storage directory: %w", err) } db, err := sql.Open("sqlite", path) if err != nil { return nil, fmt.Errorf("open sqlite database: %w", err) } db.SetMaxOpenConns(1) db.SetMaxIdleConns(1) db.SetConnMaxLifetime(0) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() for _, statement := range []string{ "PRAGMA journal_mode = WAL;", "PRAGMA busy_timeout = 5000;", "PRAGMA foreign_keys = ON;", } { if _, err := db.ExecContext(ctx, statement); err != nil { db.Close() return nil, fmt.Errorf("apply sqlite pragma %q: %w", statement, err) } } if _, err := db.ExecContext(ctx, schema); err != nil { db.Close() return nil, fmt.Errorf("apply sqlite schema: %w", err) } return &Store{db: db}, nil } func (s *Store) Close() error { if s == nil || s.db == nil { return nil } return s.db.Close() } func (s *Store) RecordEvent(ctx context.Context, event *model.Event) error { if event == nil { return errors.New("nil event") } if event.OccurredAt.IsZero() { event.OccurredAt = time.Now().UTC() } if event.CreatedAt.IsZero() { event.CreatedAt = time.Now().UTC() } encodedReasons, err := json.Marshal(event.DecisionReasons) if err != nil { return fmt.Errorf("encode decision reasons: %w", err) } tx, err := s.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("begin transaction: %w", err) } defer tx.Rollback() state, found, err := getIPStateTx(tx, event.ClientIP) if err != nil { return err } result, err := tx.ExecContext( ctx, `INSERT INTO events ( source_name, profile_name, occurred_at, remote_ip, client_ip, host, method, uri, path, status, user_agent, decision, decision_reason, decision_reasons_json, enforced, raw_json, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, event.SourceName, event.ProfileName, formatTime(event.OccurredAt), event.RemoteIP, event.ClientIP, event.Host, event.Method, event.URI, event.Path, event.Status, event.UserAgent, string(event.Decision), event.DecisionReason, string(encodedReasons), boolToInt(event.Enforced), event.RawJSON, formatTime(event.CreatedAt), ) if err != nil { return fmt.Errorf("insert event: %w", err) } eventID, err := result.LastInsertId() if err != nil { return fmt.Errorf("load inserted event id: %w", err) } event.ID = eventID updatedState := mergeEventIntoState(state, found, *event) event.CurrentState = updatedState.State event.ManualOverride = updatedState.ManualOverride if err := upsertIPStateTx(ctx, tx, updatedState); err != nil { return err } if err := tx.Commit(); err != nil { return fmt.Errorf("commit event transaction: %w", err) } return nil } func (s *Store) AddDecision(ctx context.Context, decision *model.DecisionRecord) error { if decision == nil { return errors.New("nil decision record") } if decision.CreatedAt.IsZero() { decision.CreatedAt = time.Now().UTC() } result, err := s.db.ExecContext( ctx, `INSERT INTO decisions (event_id, ip, source_name, kind, action, reason, actor, enforced, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, decision.EventID, decision.IP, decision.SourceName, decision.Kind, string(decision.Action), decision.Reason, decision.Actor, boolToInt(decision.Enforced), formatTime(decision.CreatedAt), ) if err != nil { return fmt.Errorf("insert decision record: %w", err) } decision.ID, err = result.LastInsertId() if err != nil { return fmt.Errorf("load inserted decision id: %w", err) } return nil } func (s *Store) AddBackendAction(ctx context.Context, action *model.OPNsenseAction) error { if action == nil { return errors.New("nil backend action") } if action.CreatedAt.IsZero() { action.CreatedAt = time.Now().UTC() } result, err := s.db.ExecContext( ctx, `INSERT INTO backend_actions (ip, action, result, message, created_at) VALUES (?, ?, ?, ?, ?)`, action.IP, action.Action, action.Result, action.Message, formatTime(action.CreatedAt), ) if err != nil { return fmt.Errorf("insert backend action: %w", err) } action.ID, err = result.LastInsertId() if err != nil { return fmt.Errorf("load inserted backend action id: %w", err) } return nil } func (s *Store) GetIPState(ctx context.Context, ip string) (model.IPState, bool, error) { return getIPStateDB(ctx, s.db, ip) } func (s *Store) SetManualOverride(ctx context.Context, ip string, override model.ManualOverride, state model.IPStateStatus, reason string) (model.IPState, error) { tx, err := s.db.BeginTx(ctx, nil) if err != nil { return model.IPState{}, fmt.Errorf("begin transaction: %w", err) } defer tx.Rollback() current, found, err := getIPStateTx(tx, ip) if err != nil { return model.IPState{}, err } now := time.Now().UTC() if !found { current = model.IPState{ IP: ip, FirstSeenAt: now, LastSeenAt: now, LastSourceName: "", LastUserAgent: "", LatestStatus: 0, TotalEvents: 0, State: state, StateReason: strings.TrimSpace(reason), ManualOverride: override, LastEventID: 0, UpdatedAt: now, } } else { current.ManualOverride = override current.State = state if strings.TrimSpace(reason) != "" { current.StateReason = strings.TrimSpace(reason) } current.UpdatedAt = now } if err := upsertIPStateTx(ctx, tx, current); err != nil { return model.IPState{}, err } if err := tx.Commit(); err != nil { return model.IPState{}, fmt.Errorf("commit transaction: %w", err) } return current, nil } func (s *Store) ClearManualOverride(ctx context.Context, ip string, reason string) (model.IPState, error) { tx, err := s.db.BeginTx(ctx, nil) if err != nil { return model.IPState{}, fmt.Errorf("begin transaction: %w", err) } defer tx.Rollback() current, found, err := getIPStateTx(tx, ip) if err != nil { return model.IPState{}, err } now := time.Now().UTC() if !found { current = model.IPState{ IP: ip, FirstSeenAt: now, LastSeenAt: now, State: model.IPStateObserved, StateReason: strings.TrimSpace(reason), ManualOverride: model.ManualOverrideNone, UpdatedAt: now, } } else { current.ManualOverride = model.ManualOverrideNone if current.State == "" { current.State = model.IPStateObserved } if strings.TrimSpace(reason) != "" { current.StateReason = strings.TrimSpace(reason) } current.UpdatedAt = now } if err := upsertIPStateTx(ctx, tx, current); err != nil { return model.IPState{}, err } if err := tx.Commit(); err != nil { return model.IPState{}, fmt.Errorf("commit transaction: %w", err) } return current, nil } func (s *Store) GetOverview(ctx context.Context, limit int) (model.Overview, error) { if limit <= 0 { limit = 50 } var overview model.Overview if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM events`).Scan(&overview.TotalEvents); err != nil { return model.Overview{}, fmt.Errorf("count events: %w", err) } if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM ip_state`).Scan(&overview.TotalIPs); err != nil { return model.Overview{}, fmt.Errorf("count ip states: %w", err) } if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM ip_state WHERE state = ?`, string(model.IPStateBlocked)).Scan(&overview.BlockedIPs); err != nil { return model.Overview{}, fmt.Errorf("count blocked ip states: %w", err) } if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM ip_state WHERE state = ?`, string(model.IPStateReview)).Scan(&overview.ReviewIPs); err != nil { return model.Overview{}, fmt.Errorf("count review ip states: %w", err) } if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM ip_state WHERE state = ?`, string(model.IPStateAllowed)).Scan(&overview.AllowedIPs); err != nil { return model.Overview{}, fmt.Errorf("count allowed ip states: %w", err) } if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM ip_state WHERE state = ?`, string(model.IPStateObserved)).Scan(&overview.ObservedIPs); err != nil { return model.Overview{}, fmt.Errorf("count observed ip states: %w", err) } recentIPs, err := s.ListIPStates(ctx, limit, "") if err != nil { return model.Overview{}, err } recentEvents, err := s.ListRecentEvents(ctx, limit) if err != nil { return model.Overview{}, err } overview.RecentIPs = recentIPs overview.RecentEvents = recentEvents return overview, nil } func (s *Store) ListRecentEvents(ctx context.Context, limit int) ([]model.Event, error) { if limit <= 0 { limit = 50 } rows, err := s.db.QueryContext(ctx, ` SELECT e.id, e.source_name, e.profile_name, e.occurred_at, e.remote_ip, e.client_ip, e.host, e.method, e.uri, e.path, e.status, e.user_agent, e.decision, e.decision_reason, e.decision_reasons_json, e.enforced, e.raw_json, e.created_at, COALESCE(s.state, ''), COALESCE(s.manual_override, '') FROM events e LEFT JOIN ip_state s ON s.ip = e.client_ip ORDER BY e.occurred_at DESC, e.id DESC LIMIT ?`, limit, ) if err != nil { return nil, fmt.Errorf("list recent events: %w", err) } defer rows.Close() items := make([]model.Event, 0, limit) for rows.Next() { item, err := scanEvent(rows) if err != nil { return nil, err } items = append(items, item) } if err := rows.Err(); err != nil { return nil, fmt.Errorf("iterate recent events: %w", err) } return items, nil } func (s *Store) ListIPStates(ctx context.Context, limit int, stateFilter string) ([]model.IPState, error) { if limit <= 0 { limit = 50 } query := `SELECT ip, first_seen_at, last_seen_at, last_source_name, last_user_agent, latest_status, total_events, state, state_reason, manual_override, last_event_id, updated_at FROM ip_state` args := []any{} if strings.TrimSpace(stateFilter) != "" { query += ` WHERE state = ?` args = append(args, strings.TrimSpace(stateFilter)) } query += ` ORDER BY last_seen_at DESC, ip ASC LIMIT ?` args = append(args, limit) rows, err := s.db.QueryContext(ctx, query, args...) if err != nil { return nil, fmt.Errorf("list ip states: %w", err) } defer rows.Close() items := make([]model.IPState, 0, limit) for rows.Next() { item, err := scanIPState(rows) if err != nil { return nil, err } items = append(items, item) } if err := rows.Err(); err != nil { return nil, fmt.Errorf("iterate ip states: %w", err) } return items, nil } func (s *Store) ListRecentIPRows(ctx context.Context, since time.Time, limit int) ([]model.RecentIPRow, error) { if limit <= 0 { limit = 200 } rows, err := s.db.QueryContext(ctx, ` WITH recent AS ( SELECT client_ip, COUNT(*) AS event_count, MAX(occurred_at) AS last_seen_at FROM events WHERE occurred_at >= ? GROUP BY client_ip ) SELECT s.ip, COALESCE(( SELECT e.source_name FROM events e WHERE e.client_ip = s.ip AND e.occurred_at >= ? ORDER BY e.occurred_at DESC, e.id DESC LIMIT 1 ), s.last_source_name) AS source_name, s.state, recent.event_count, recent.last_seen_at, s.state_reason, s.manual_override FROM recent JOIN ip_state s ON s.ip = recent.client_ip ORDER BY recent.event_count DESC, recent.last_seen_at DESC, s.ip ASC LIMIT ?`, formatTime(since), formatTime(since), limit, ) if err != nil { return nil, fmt.Errorf("list recent ip rows: %w", err) } defer rows.Close() items := make([]model.RecentIPRow, 0, limit) for rows.Next() { var item model.RecentIPRow var state string var lastSeenAt string var manualOverride string if err := rows.Scan( &item.IP, &item.SourceName, &state, &item.Events, &lastSeenAt, &item.Reason, &manualOverride, ); err != nil { return nil, fmt.Errorf("scan recent ip row: %w", err) } parsedLastSeenAt, err := parseTime(lastSeenAt) if err != nil { return nil, fmt.Errorf("parse recent ip row last_seen_at: %w", err) } item.State = model.IPStateStatus(state) item.LastSeenAt = parsedLastSeenAt item.ManualOverride = model.ManualOverride(manualOverride) items = append(items, item) } if err := rows.Err(); err != nil { return nil, fmt.Errorf("iterate recent ip rows: %w", err) } return items, nil } func (s *Store) ListIPsWithoutInvestigation(ctx context.Context, since time.Time, limit int) ([]string, error) { if limit <= 0 { limit = 200 } query := ` SELECT s.ip FROM ip_state s LEFT JOIN ip_investigations i ON i.ip = s.ip WHERE i.ip IS NULL` args := make([]any, 0, 2) if !since.IsZero() { query += ` AND s.last_seen_at >= ?` args = append(args, formatTime(since)) } query += ` ORDER BY s.last_seen_at DESC, s.ip ASC LIMIT ?` args = append(args, limit) rows, err := s.db.QueryContext(ctx, query, args...) if err != nil { return nil, fmt.Errorf("list ips without investigation: %w", err) } defer rows.Close() items := make([]string, 0, limit) for rows.Next() { var ip string if err := rows.Scan(&ip); err != nil { return nil, fmt.Errorf("scan ip without investigation: %w", err) } items = append(items, ip) } if err := rows.Err(); err != nil { return nil, fmt.Errorf("iterate ips without investigation: %w", err) } return items, nil } func (s *Store) GetIPDetails(ctx context.Context, ip string, eventLimit, decisionLimit, actionLimit int) (model.IPDetails, error) { state, _, err := s.GetIPState(ctx, ip) if err != nil { return model.IPDetails{}, err } events, err := s.listEventsForIP(ctx, ip, eventLimit) if err != nil { return model.IPDetails{}, err } decisions, err := s.listDecisionsForIP(ctx, ip, decisionLimit) if err != nil { return model.IPDetails{}, err } actions, err := s.listBackendActionsForIP(ctx, ip, actionLimit) if err != nil { return model.IPDetails{}, err } return model.IPDetails{ State: state, RecentEvents: events, Decisions: decisions, BackendActions: actions, }, nil } func (s *Store) GetInvestigation(ctx context.Context, ip string) (model.IPInvestigation, bool, error) { row := s.db.QueryRowContext(ctx, `SELECT payload_json, updated_at FROM ip_investigations WHERE ip = ?`, ip) var payload string var updatedAt string if err := row.Scan(&payload, &updatedAt); err != nil { if errors.Is(err, sql.ErrNoRows) { return model.IPInvestigation{}, false, nil } return model.IPInvestigation{}, false, fmt.Errorf("query ip investigation %q: %w", ip, err) } var item model.IPInvestigation if err := json.Unmarshal([]byte(payload), &item); err != nil { return model.IPInvestigation{}, false, fmt.Errorf("decode ip investigation %q: %w", ip, err) } parsed, err := parseTime(updatedAt) if err != nil { return model.IPInvestigation{}, false, fmt.Errorf("parse ip investigation updated_at: %w", err) } item.UpdatedAt = parsed return item, true, nil } func (s *Store) GetInvestigationsForIPs(ctx context.Context, ips []string) (map[string]model.IPInvestigation, error) { unique := uniqueNonEmptyStrings(ips) if len(unique) == 0 { return map[string]model.IPInvestigation{}, nil } placeholders := make([]string, len(unique)) args := make([]any, 0, len(unique)) for index, ip := range unique { placeholders[index] = "?" args = append(args, ip) } rows, err := s.db.QueryContext(ctx, fmt.Sprintf( `SELECT ip, payload_json, updated_at FROM ip_investigations WHERE ip IN (%s)`, strings.Join(placeholders, ", "), ), args...) if err != nil { return nil, fmt.Errorf("query ip investigations: %w", err) } defer rows.Close() items := make(map[string]model.IPInvestigation, len(unique)) for rows.Next() { var ip string var payload string var updatedAt string if err := rows.Scan(&ip, &payload, &updatedAt); err != nil { return nil, fmt.Errorf("scan ip investigation: %w", err) } var item model.IPInvestigation if err := json.Unmarshal([]byte(payload), &item); err != nil { return nil, fmt.Errorf("decode ip investigation %q: %w", ip, err) } parsed, err := parseTime(updatedAt) if err != nil { return nil, fmt.Errorf("parse ip investigation updated_at for %q: %w", ip, err) } item.UpdatedAt = parsed items[ip] = item } if err := rows.Err(); err != nil { return nil, fmt.Errorf("iterate ip investigations: %w", err) } return items, nil } func (s *Store) SaveInvestigation(ctx context.Context, item model.IPInvestigation) error { if item.UpdatedAt.IsZero() { item.UpdatedAt = time.Now().UTC() } payload, err := json.Marshal(item) if err != nil { return fmt.Errorf("encode ip investigation: %w", err) } _, err = s.db.ExecContext( ctx, `INSERT INTO ip_investigations (ip, payload_json, updated_at) VALUES (?, ?, ?) ON CONFLICT(ip) DO UPDATE SET payload_json = excluded.payload_json, updated_at = excluded.updated_at`, item.IP, string(payload), formatTime(item.UpdatedAt), ) if err != nil { return fmt.Errorf("upsert ip investigation %q: %w", item.IP, err) } return nil } func (s *Store) ListRecentUserAgentsForIP(ctx context.Context, ip string, limit int) ([]string, error) { if limit <= 0 { limit = 10 } rows, err := s.db.QueryContext(ctx, ` SELECT user_agent FROM events WHERE client_ip = ? AND TRIM(user_agent) <> '' GROUP BY user_agent ORDER BY MAX(occurred_at) DESC, user_agent ASC LIMIT ?`, ip, limit, ) if err != nil { return nil, fmt.Errorf("list recent user agents for ip %q: %w", ip, err) } defer rows.Close() items := make([]string, 0, limit) for rows.Next() { var userAgent string if err := rows.Scan(&userAgent); err != nil { return nil, fmt.Errorf("scan recent user agent for ip %q: %w", ip, err) } items = append(items, userAgent) } if err := rows.Err(); err != nil { return nil, fmt.Errorf("iterate recent user agents for ip %q: %w", ip, err) } return items, nil } func (s *Store) GetSourceOffset(ctx context.Context, sourceName string) (model.SourceOffset, bool, error) { row := s.db.QueryRowContext(ctx, `SELECT source_name, path, inode, offset, updated_at FROM source_offsets WHERE source_name = ?`, sourceName) var offset model.SourceOffset var updatedAt string if err := row.Scan(&offset.SourceName, &offset.Path, &offset.Inode, &offset.Offset, &updatedAt); err != nil { if errors.Is(err, sql.ErrNoRows) { return model.SourceOffset{}, false, nil } return model.SourceOffset{}, false, fmt.Errorf("query source offset %q: %w", sourceName, err) } parsed, err := parseTime(updatedAt) if err != nil { return model.SourceOffset{}, false, fmt.Errorf("parse source offset updated_at: %w", err) } offset.UpdatedAt = parsed return offset, true, nil } func (s *Store) SaveSourceOffset(ctx context.Context, offset model.SourceOffset) error { if offset.UpdatedAt.IsZero() { offset.UpdatedAt = time.Now().UTC() } _, err := s.db.ExecContext( ctx, `INSERT INTO source_offsets (source_name, path, inode, offset, updated_at) VALUES (?, ?, ?, ?, ?) ON CONFLICT(source_name) DO UPDATE SET path = excluded.path, inode = excluded.inode, offset = excluded.offset, updated_at = excluded.updated_at`, offset.SourceName, offset.Path, offset.Inode, offset.Offset, formatTime(offset.UpdatedAt), ) if err != nil { return fmt.Errorf("upsert source offset: %w", err) } return nil } func (s *Store) listEventsForIP(ctx context.Context, ip string, limit int) ([]model.Event, error) { query := ` SELECT e.id, e.source_name, e.profile_name, e.occurred_at, e.remote_ip, e.client_ip, e.host, e.method, e.uri, e.path, e.status, e.user_agent, e.decision, e.decision_reason, e.decision_reasons_json, e.enforced, e.raw_json, e.created_at, COALESCE(s.state, ''), COALESCE(s.manual_override, '') FROM events e LEFT JOIN ip_state s ON s.ip = e.client_ip WHERE e.client_ip = ? ORDER BY e.occurred_at DESC, e.id DESC` args := []any{ip} if limit > 0 { query += ` LIMIT ?` args = append(args, limit) } rows, err := s.db.QueryContext(ctx, query, args...) if err != nil { return nil, fmt.Errorf("list events for ip %q: %w", ip, err) } defer rows.Close() items := make([]model.Event, 0, max(limit, 0)) for rows.Next() { item, err := scanEvent(rows) if err != nil { return nil, err } items = append(items, item) } if err := rows.Err(); err != nil { return nil, fmt.Errorf("iterate events for ip %q: %w", ip, err) } return items, nil } func (s *Store) listDecisionsForIP(ctx context.Context, ip string, limit int) ([]model.DecisionRecord, error) { if limit <= 0 { limit = 50 } rows, err := s.db.QueryContext(ctx, ` SELECT id, event_id, ip, source_name, kind, action, reason, actor, enforced, created_at FROM decisions WHERE ip = ? ORDER BY created_at DESC, id DESC LIMIT ?`, ip, limit, ) if err != nil { return nil, fmt.Errorf("list decisions for ip %q: %w", ip, err) } defer rows.Close() items := make([]model.DecisionRecord, 0, limit) for rows.Next() { var item model.DecisionRecord var action string var enforced int var createdAt string if err := rows.Scan(&item.ID, &item.EventID, &item.IP, &item.SourceName, &item.Kind, &action, &item.Reason, &item.Actor, &enforced, &createdAt); err != nil { return nil, fmt.Errorf("scan decision record: %w", err) } parsed, err := parseTime(createdAt) if err != nil { return nil, fmt.Errorf("parse decision created_at: %w", err) } item.Action = model.DecisionAction(action) item.Enforced = enforced != 0 item.CreatedAt = parsed items = append(items, item) } if err := rows.Err(); err != nil { return nil, fmt.Errorf("iterate decisions for ip %q: %w", ip, err) } return items, nil } func (s *Store) listBackendActionsForIP(ctx context.Context, ip string, limit int) ([]model.OPNsenseAction, error) { if limit <= 0 { limit = 50 } rows, err := s.db.QueryContext(ctx, ` SELECT id, ip, action, result, message, created_at FROM backend_actions WHERE ip = ? ORDER BY created_at DESC, id DESC LIMIT ?`, ip, limit, ) if err != nil { return nil, fmt.Errorf("list backend actions for ip %q: %w", ip, err) } defer rows.Close() items := make([]model.OPNsenseAction, 0, limit) for rows.Next() { var item model.OPNsenseAction var createdAt string if err := rows.Scan(&item.ID, &item.IP, &item.Action, &item.Result, &item.Message, &createdAt); err != nil { return nil, fmt.Errorf("scan backend action: %w", err) } parsed, err := parseTime(createdAt) if err != nil { return nil, fmt.Errorf("parse backend action created_at: %w", err) } item.CreatedAt = parsed items = append(items, item) } if err := rows.Err(); err != nil { return nil, fmt.Errorf("iterate backend actions for ip %q: %w", ip, err) } return items, nil } func max(left int, right int) int { if left > right { return left } return right } func getIPStateDB(ctx context.Context, db queryer, ip string) (model.IPState, bool, error) { row := db.QueryRowContext(ctx, ` SELECT ip, first_seen_at, last_seen_at, last_source_name, last_user_agent, latest_status, total_events, state, state_reason, manual_override, last_event_id, updated_at FROM ip_state WHERE ip = ?`, ip) var item model.IPState var firstSeenAt string var lastSeenAt string var updatedAt string var state string var manualOverride string if err := row.Scan( &item.IP, &firstSeenAt, &lastSeenAt, &item.LastSourceName, &item.LastUserAgent, &item.LatestStatus, &item.TotalEvents, &state, &item.StateReason, &manualOverride, &item.LastEventID, &updatedAt, ); err != nil { if errors.Is(err, sql.ErrNoRows) { return model.IPState{}, false, nil } return model.IPState{}, false, fmt.Errorf("query ip state %q: %w", ip, err) } var err error item.FirstSeenAt, err = parseTime(firstSeenAt) if err != nil { return model.IPState{}, false, fmt.Errorf("parse ip state first_seen_at: %w", err) } item.LastSeenAt, err = parseTime(lastSeenAt) if err != nil { return model.IPState{}, false, fmt.Errorf("parse ip state last_seen_at: %w", err) } item.UpdatedAt, err = parseTime(updatedAt) if err != nil { return model.IPState{}, false, fmt.Errorf("parse ip state updated_at: %w", err) } item.State = model.IPStateStatus(state) item.ManualOverride = model.ManualOverride(manualOverride) return item, true, nil } func getIPStateTx(tx *sql.Tx, ip string) (model.IPState, bool, error) { return getIPStateDB(context.Background(), tx, ip) } func upsertIPStateTx(ctx context.Context, tx *sql.Tx, state model.IPState) error { if state.UpdatedAt.IsZero() { state.UpdatedAt = time.Now().UTC() } if state.FirstSeenAt.IsZero() { state.FirstSeenAt = state.UpdatedAt } if state.LastSeenAt.IsZero() { state.LastSeenAt = state.UpdatedAt } if state.State == "" { state.State = model.IPStateObserved } if state.ManualOverride == "" { state.ManualOverride = model.ManualOverrideNone } _, err := tx.ExecContext( ctx, `INSERT INTO ip_state ( ip, first_seen_at, last_seen_at, last_source_name, last_user_agent, latest_status, total_events, state, state_reason, manual_override, last_event_id, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(ip) DO UPDATE SET first_seen_at = excluded.first_seen_at, last_seen_at = excluded.last_seen_at, last_source_name = excluded.last_source_name, last_user_agent = excluded.last_user_agent, latest_status = excluded.latest_status, total_events = excluded.total_events, state = excluded.state, state_reason = excluded.state_reason, manual_override = excluded.manual_override, last_event_id = excluded.last_event_id, updated_at = excluded.updated_at`, state.IP, formatTime(state.FirstSeenAt), formatTime(state.LastSeenAt), state.LastSourceName, state.LastUserAgent, state.LatestStatus, state.TotalEvents, string(state.State), state.StateReason, string(state.ManualOverride), state.LastEventID, formatTime(state.UpdatedAt), ) if err != nil { return fmt.Errorf("upsert ip state %q: %w", state.IP, err) } return nil } func mergeEventIntoState(existing model.IPState, found bool, event model.Event) model.IPState { now := time.Now().UTC() state := existing if !found { state = model.IPState{ IP: event.ClientIP, FirstSeenAt: event.OccurredAt, LastSeenAt: event.OccurredAt, LastSourceName: event.SourceName, LastUserAgent: event.UserAgent, LatestStatus: event.Status, TotalEvents: 0, State: model.IPStateObserved, StateReason: "", ManualOverride: model.ManualOverrideNone, LastEventID: 0, UpdatedAt: now, } } if state.FirstSeenAt.IsZero() || event.OccurredAt.Before(state.FirstSeenAt) { state.FirstSeenAt = event.OccurredAt } if state.LastSeenAt.IsZero() || event.OccurredAt.After(state.LastSeenAt) { state.LastSeenAt = event.OccurredAt } state.LastSourceName = event.SourceName state.LastUserAgent = event.UserAgent state.LatestStatus = event.Status state.TotalEvents++ state.LastEventID = event.ID state.UpdatedAt = now if state.ManualOverride == "" { state.ManualOverride = model.ManualOverrideNone } switch state.ManualOverride { case model.ManualOverrideForceBlock: state.State = model.IPStateBlocked if event.DecisionReason != "" { state.StateReason = event.DecisionReason } else if state.StateReason == "" { state.StateReason = "manual override: block" } return state case model.ManualOverrideForceAllow: state.State = model.IPStateAllowed if event.DecisionReason != "" { state.StateReason = event.DecisionReason } else if state.StateReason == "" { state.StateReason = "manual override: allow" } return state } switch event.Decision { case model.DecisionActionBlock: state.State = model.IPStateBlocked state.StateReason = event.DecisionReason case model.DecisionActionReview: if state.State != model.IPStateBlocked && state.State != model.IPStateAllowed { state.State = model.IPStateReview state.StateReason = event.DecisionReason } case model.DecisionActionAllow: state.State = model.IPStateAllowed state.StateReason = event.DecisionReason default: if state.State == "" { state.State = model.IPStateObserved } } return state } func scanEvent(scanner interface{ Scan(dest ...any) error }) (model.Event, error) { var item model.Event var occurredAt string var createdAt string var decision string var decisionReasonsJSON string var enforced int var currentState string var manualOverride string if err := scanner.Scan( &item.ID, &item.SourceName, &item.ProfileName, &occurredAt, &item.RemoteIP, &item.ClientIP, &item.Host, &item.Method, &item.URI, &item.Path, &item.Status, &item.UserAgent, &decision, &item.DecisionReason, &decisionReasonsJSON, &enforced, &item.RawJSON, &createdAt, ¤tState, &manualOverride, ); err != nil { return model.Event{}, fmt.Errorf("scan event: %w", err) } parsedOccurredAt, err := parseTime(occurredAt) if err != nil { return model.Event{}, fmt.Errorf("parse event occurred_at: %w", err) } parsedCreatedAt, err := parseTime(createdAt) if err != nil { return model.Event{}, fmt.Errorf("parse event created_at: %w", err) } var reasons []string if strings.TrimSpace(decisionReasonsJSON) != "" { if err := json.Unmarshal([]byte(decisionReasonsJSON), &reasons); err != nil { return model.Event{}, fmt.Errorf("decode event decision_reasons_json: %w", err) } } item.OccurredAt = parsedOccurredAt item.CreatedAt = parsedCreatedAt item.Decision = model.DecisionAction(decision) item.DecisionReasons = reasons item.Enforced = enforced != 0 item.CurrentState = model.IPStateStatus(currentState) item.ManualOverride = model.ManualOverride(manualOverride) return item, nil } func scanIPState(scanner interface{ Scan(dest ...any) error }) (model.IPState, error) { var item model.IPState var firstSeenAt string var lastSeenAt string var updatedAt string var state string var manualOverride string if err := scanner.Scan( &item.IP, &firstSeenAt, &lastSeenAt, &item.LastSourceName, &item.LastUserAgent, &item.LatestStatus, &item.TotalEvents, &state, &item.StateReason, &manualOverride, &item.LastEventID, &updatedAt, ); err != nil { return model.IPState{}, fmt.Errorf("scan ip state: %w", err) } var err error item.FirstSeenAt, err = parseTime(firstSeenAt) if err != nil { return model.IPState{}, fmt.Errorf("parse ip state first_seen_at: %w", err) } item.LastSeenAt, err = parseTime(lastSeenAt) if err != nil { return model.IPState{}, fmt.Errorf("parse ip state last_seen_at: %w", err) } item.UpdatedAt, err = parseTime(updatedAt) if err != nil { return model.IPState{}, fmt.Errorf("parse ip state updated_at: %w", err) } item.State = model.IPStateStatus(state) item.ManualOverride = model.ManualOverride(manualOverride) return item, nil } func parseTime(value string) (time.Time, error) { trimmed := strings.TrimSpace(value) if trimmed == "" { return time.Time{}, nil } parsed, err := time.Parse(time.RFC3339Nano, trimmed) if err != nil { return time.Time{}, err } return parsed.UTC(), nil } func formatTime(value time.Time) string { if value.IsZero() { return "" } return value.UTC().Format(time.RFC3339Nano) } func boolToInt(value bool) int { if value { return 1 } return 0 } func uniqueNonEmptyStrings(items []string) []string { seen := make(map[string]struct{}, len(items)) result := make([]string, 0, len(items)) for _, item := range items { trimmed := strings.TrimSpace(item) if trimmed == "" { continue } if _, ok := seen[trimmed]; ok { continue } seen[trimmed] = struct{}{} result = append(result, trimmed) } sort.Strings(result) return result } type queryer interface { QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row }