You've already forked caddy-opnsense-blocker
1429 lines
41 KiB
Go
1429 lines
41 KiB
Go
package store
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"git.dern.ovh/infrastructure/caddy-opnsense-blocker/internal/model"
|
|
_ "modernc.org/sqlite"
|
|
)
|
|
|
|
const schema = `
|
|
CREATE TABLE IF NOT EXISTS events (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
source_name TEXT NOT NULL,
|
|
profile_name TEXT NOT NULL,
|
|
occurred_at TEXT NOT NULL,
|
|
remote_ip TEXT NOT NULL,
|
|
client_ip TEXT NOT NULL,
|
|
host TEXT NOT NULL,
|
|
method TEXT NOT NULL,
|
|
uri TEXT NOT NULL,
|
|
path TEXT NOT NULL,
|
|
status INTEGER NOT NULL,
|
|
user_agent TEXT NOT NULL,
|
|
decision TEXT NOT NULL,
|
|
decision_reason TEXT NOT NULL,
|
|
decision_reasons_json TEXT NOT NULL,
|
|
enforced INTEGER NOT NULL DEFAULT 0,
|
|
raw_json TEXT NOT NULL,
|
|
created_at TEXT NOT NULL
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_events_occurred_at ON events(occurred_at DESC, id DESC);
|
|
CREATE INDEX IF NOT EXISTS idx_events_client_ip ON events(client_ip, occurred_at DESC, id DESC);
|
|
CREATE INDEX IF NOT EXISTS idx_events_source_name ON events(source_name, occurred_at DESC, id DESC);
|
|
CREATE INDEX IF NOT EXISTS idx_events_decision ON events(decision, occurred_at DESC, id DESC);
|
|
|
|
CREATE TABLE IF NOT EXISTS ip_state (
|
|
ip TEXT PRIMARY KEY,
|
|
first_seen_at TEXT NOT NULL,
|
|
last_seen_at TEXT NOT NULL,
|
|
last_source_name TEXT NOT NULL,
|
|
last_user_agent TEXT NOT NULL,
|
|
latest_status INTEGER NOT NULL,
|
|
total_events INTEGER NOT NULL,
|
|
state TEXT NOT NULL,
|
|
state_reason TEXT NOT NULL,
|
|
manual_override TEXT NOT NULL,
|
|
last_event_id INTEGER NOT NULL,
|
|
updated_at TEXT NOT NULL
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_ip_state_last_seen ON ip_state(last_seen_at DESC, ip ASC);
|
|
CREATE INDEX IF NOT EXISTS idx_ip_state_state ON ip_state(state, last_seen_at DESC, ip ASC);
|
|
|
|
CREATE TABLE IF NOT EXISTS decisions (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
event_id INTEGER NOT NULL,
|
|
ip TEXT NOT NULL,
|
|
source_name TEXT NOT NULL,
|
|
kind TEXT NOT NULL,
|
|
action TEXT NOT NULL,
|
|
reason TEXT NOT NULL,
|
|
actor TEXT NOT NULL,
|
|
enforced INTEGER NOT NULL DEFAULT 0,
|
|
created_at TEXT NOT NULL
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_decisions_ip ON decisions(ip, created_at DESC, id DESC);
|
|
CREATE INDEX IF NOT EXISTS idx_decisions_event_id ON decisions(event_id, created_at DESC, id DESC);
|
|
|
|
CREATE TABLE IF NOT EXISTS backend_actions (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
ip TEXT NOT NULL,
|
|
action TEXT NOT NULL,
|
|
result TEXT NOT NULL,
|
|
message TEXT NOT NULL,
|
|
created_at TEXT NOT NULL
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_backend_actions_ip ON backend_actions(ip, created_at DESC, id DESC);
|
|
|
|
CREATE TABLE IF NOT EXISTS source_offsets (
|
|
source_name TEXT PRIMARY KEY,
|
|
path TEXT NOT NULL,
|
|
inode TEXT NOT NULL,
|
|
offset INTEGER NOT NULL,
|
|
updated_at TEXT NOT NULL
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS ip_investigations (
|
|
ip TEXT PRIMARY KEY,
|
|
payload_json TEXT NOT NULL,
|
|
updated_at TEXT NOT NULL
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_ip_investigations_updated_at ON ip_investigations(updated_at DESC, ip ASC);
|
|
`
|
|
|
|
type Store struct {
|
|
db *sql.DB
|
|
}
|
|
|
|
func Open(path string) (*Store, error) {
|
|
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
|
|
return nil, fmt.Errorf("create storage directory: %w", err)
|
|
}
|
|
|
|
db, err := sql.Open("sqlite", path)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("open sqlite database: %w", err)
|
|
}
|
|
db.SetMaxOpenConns(1)
|
|
db.SetMaxIdleConns(1)
|
|
db.SetConnMaxLifetime(0)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
for _, statement := range []string{
|
|
"PRAGMA journal_mode = WAL;",
|
|
"PRAGMA busy_timeout = 5000;",
|
|
"PRAGMA foreign_keys = ON;",
|
|
} {
|
|
if _, err := db.ExecContext(ctx, statement); err != nil {
|
|
db.Close()
|
|
return nil, fmt.Errorf("apply sqlite pragma %q: %w", statement, err)
|
|
}
|
|
}
|
|
if _, err := db.ExecContext(ctx, schema); err != nil {
|
|
db.Close()
|
|
return nil, fmt.Errorf("apply sqlite schema: %w", err)
|
|
}
|
|
|
|
return &Store{db: db}, nil
|
|
}
|
|
|
|
func (s *Store) Close() error {
|
|
if s == nil || s.db == nil {
|
|
return nil
|
|
}
|
|
return s.db.Close()
|
|
}
|
|
|
|
func (s *Store) RecordEvent(ctx context.Context, event *model.Event) error {
|
|
if event == nil {
|
|
return errors.New("nil event")
|
|
}
|
|
if event.OccurredAt.IsZero() {
|
|
event.OccurredAt = time.Now().UTC()
|
|
}
|
|
if event.CreatedAt.IsZero() {
|
|
event.CreatedAt = time.Now().UTC()
|
|
}
|
|
encodedReasons, err := json.Marshal(event.DecisionReasons)
|
|
if err != nil {
|
|
return fmt.Errorf("encode decision reasons: %w", err)
|
|
}
|
|
|
|
tx, err := s.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("begin transaction: %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
state, found, err := getIPStateTx(tx, event.ClientIP)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
result, err := tx.ExecContext(
|
|
ctx,
|
|
`INSERT INTO events (
|
|
source_name, profile_name, occurred_at, remote_ip, client_ip, host, method, uri, path,
|
|
status, user_agent, decision, decision_reason, decision_reasons_json, enforced, raw_json, created_at
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
event.SourceName,
|
|
event.ProfileName,
|
|
formatTime(event.OccurredAt),
|
|
event.RemoteIP,
|
|
event.ClientIP,
|
|
event.Host,
|
|
event.Method,
|
|
event.URI,
|
|
event.Path,
|
|
event.Status,
|
|
event.UserAgent,
|
|
string(event.Decision),
|
|
event.DecisionReason,
|
|
string(encodedReasons),
|
|
boolToInt(event.Enforced),
|
|
event.RawJSON,
|
|
formatTime(event.CreatedAt),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("insert event: %w", err)
|
|
}
|
|
eventID, err := result.LastInsertId()
|
|
if err != nil {
|
|
return fmt.Errorf("load inserted event id: %w", err)
|
|
}
|
|
event.ID = eventID
|
|
|
|
updatedState := mergeEventIntoState(state, found, *event)
|
|
event.CurrentState = updatedState.State
|
|
event.ManualOverride = updatedState.ManualOverride
|
|
|
|
if err := upsertIPStateTx(ctx, tx, updatedState); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return fmt.Errorf("commit event transaction: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
const responseBytesExpression = `CASE WHEN json_valid(e.raw_json) THEN CAST(COALESCE(json_extract(e.raw_json, '$.size'), 0) AS INTEGER) ELSE 0 END`
|
|
|
|
func overviewFilterQueryParts(options model.OverviewOptions) (joins []string, clauses []string) {
|
|
if !options.ShowAllowed {
|
|
joins = append(joins, `LEFT JOIN ip_state s ON s.ip = e.client_ip`)
|
|
clauses = append(clauses, `COALESCE(s.state, '') <> '`+string(model.IPStateAllowed)+`'`)
|
|
}
|
|
if !options.ShowKnownBots {
|
|
clauses = append(clauses, `NOT EXISTS (
|
|
SELECT 1
|
|
FROM ip_investigations i
|
|
WHERE i.ip = e.client_ip
|
|
AND json_valid(i.payload_json)
|
|
AND json_type(i.payload_json, '$.bot') IS NOT NULL
|
|
)`)
|
|
}
|
|
return joins, clauses
|
|
}
|
|
|
|
func (s *Store) AddDecision(ctx context.Context, decision *model.DecisionRecord) error {
|
|
if decision == nil {
|
|
return errors.New("nil decision record")
|
|
}
|
|
if decision.CreatedAt.IsZero() {
|
|
decision.CreatedAt = time.Now().UTC()
|
|
}
|
|
result, err := s.db.ExecContext(
|
|
ctx,
|
|
`INSERT INTO decisions (event_id, ip, source_name, kind, action, reason, actor, enforced, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
decision.EventID,
|
|
decision.IP,
|
|
decision.SourceName,
|
|
decision.Kind,
|
|
string(decision.Action),
|
|
decision.Reason,
|
|
decision.Actor,
|
|
boolToInt(decision.Enforced),
|
|
formatTime(decision.CreatedAt),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("insert decision record: %w", err)
|
|
}
|
|
decision.ID, err = result.LastInsertId()
|
|
if err != nil {
|
|
return fmt.Errorf("load inserted decision id: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Store) AddBackendAction(ctx context.Context, action *model.OPNsenseAction) error {
|
|
if action == nil {
|
|
return errors.New("nil backend action")
|
|
}
|
|
if action.CreatedAt.IsZero() {
|
|
action.CreatedAt = time.Now().UTC()
|
|
}
|
|
result, err := s.db.ExecContext(
|
|
ctx,
|
|
`INSERT INTO backend_actions (ip, action, result, message, created_at)
|
|
VALUES (?, ?, ?, ?, ?)`,
|
|
action.IP,
|
|
action.Action,
|
|
action.Result,
|
|
action.Message,
|
|
formatTime(action.CreatedAt),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("insert backend action: %w", err)
|
|
}
|
|
action.ID, err = result.LastInsertId()
|
|
if err != nil {
|
|
return fmt.Errorf("load inserted backend action id: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Store) GetIPState(ctx context.Context, ip string) (model.IPState, bool, error) {
|
|
return getIPStateDB(ctx, s.db, ip)
|
|
}
|
|
|
|
func (s *Store) SetManualOverride(ctx context.Context, ip string, override model.ManualOverride, state model.IPStateStatus, reason string) (model.IPState, error) {
|
|
tx, err := s.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return model.IPState{}, fmt.Errorf("begin transaction: %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
current, found, err := getIPStateTx(tx, ip)
|
|
if err != nil {
|
|
return model.IPState{}, err
|
|
}
|
|
now := time.Now().UTC()
|
|
if !found {
|
|
current = model.IPState{
|
|
IP: ip,
|
|
FirstSeenAt: now,
|
|
LastSeenAt: now,
|
|
LastSourceName: "",
|
|
LastUserAgent: "",
|
|
LatestStatus: 0,
|
|
TotalEvents: 0,
|
|
State: state,
|
|
StateReason: strings.TrimSpace(reason),
|
|
ManualOverride: override,
|
|
LastEventID: 0,
|
|
UpdatedAt: now,
|
|
}
|
|
} else {
|
|
current.ManualOverride = override
|
|
current.State = state
|
|
if strings.TrimSpace(reason) != "" {
|
|
current.StateReason = strings.TrimSpace(reason)
|
|
}
|
|
current.UpdatedAt = now
|
|
}
|
|
if err := upsertIPStateTx(ctx, tx, current); err != nil {
|
|
return model.IPState{}, err
|
|
}
|
|
if err := tx.Commit(); err != nil {
|
|
return model.IPState{}, fmt.Errorf("commit transaction: %w", err)
|
|
}
|
|
return current, nil
|
|
}
|
|
|
|
func (s *Store) ClearManualOverride(ctx context.Context, ip string, reason string) (model.IPState, error) {
|
|
tx, err := s.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return model.IPState{}, fmt.Errorf("begin transaction: %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
current, found, err := getIPStateTx(tx, ip)
|
|
if err != nil {
|
|
return model.IPState{}, err
|
|
}
|
|
now := time.Now().UTC()
|
|
if !found {
|
|
current = model.IPState{
|
|
IP: ip,
|
|
FirstSeenAt: now,
|
|
LastSeenAt: now,
|
|
State: model.IPStateObserved,
|
|
StateReason: strings.TrimSpace(reason),
|
|
ManualOverride: model.ManualOverrideNone,
|
|
UpdatedAt: now,
|
|
}
|
|
} else {
|
|
current.ManualOverride = model.ManualOverrideNone
|
|
if current.State == "" {
|
|
current.State = model.IPStateObserved
|
|
}
|
|
if strings.TrimSpace(reason) != "" {
|
|
current.StateReason = strings.TrimSpace(reason)
|
|
}
|
|
current.UpdatedAt = now
|
|
}
|
|
if err := upsertIPStateTx(ctx, tx, current); err != nil {
|
|
return model.IPState{}, err
|
|
}
|
|
if err := tx.Commit(); err != nil {
|
|
return model.IPState{}, fmt.Errorf("commit transaction: %w", err)
|
|
}
|
|
return current, nil
|
|
}
|
|
|
|
func (s *Store) GetOverview(ctx context.Context, since time.Time, limit int, options model.OverviewOptions) (model.Overview, error) {
|
|
if limit <= 0 {
|
|
limit = 50
|
|
}
|
|
var overview model.Overview
|
|
if !since.IsZero() {
|
|
overview.ActivitySince = since.UTC()
|
|
}
|
|
if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM events`).Scan(&overview.TotalEvents); err != nil {
|
|
return model.Overview{}, fmt.Errorf("count events: %w", err)
|
|
}
|
|
if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM ip_state`).Scan(&overview.TotalIPs); err != nil {
|
|
return model.Overview{}, fmt.Errorf("count ip states: %w", err)
|
|
}
|
|
if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM ip_state WHERE state = ?`, string(model.IPStateBlocked)).Scan(&overview.BlockedIPs); err != nil {
|
|
return model.Overview{}, fmt.Errorf("count blocked ip states: %w", err)
|
|
}
|
|
if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM ip_state WHERE state = ?`, string(model.IPStateReview)).Scan(&overview.ReviewIPs); err != nil {
|
|
return model.Overview{}, fmt.Errorf("count review ip states: %w", err)
|
|
}
|
|
if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM ip_state WHERE state = ?`, string(model.IPStateAllowed)).Scan(&overview.AllowedIPs); err != nil {
|
|
return model.Overview{}, fmt.Errorf("count allowed ip states: %w", err)
|
|
}
|
|
if err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM ip_state WHERE state = ?`, string(model.IPStateObserved)).Scan(&overview.ObservedIPs); err != nil {
|
|
return model.Overview{}, fmt.Errorf("count observed ip states: %w", err)
|
|
}
|
|
|
|
recentIPs, err := s.ListIPStates(ctx, limit, "")
|
|
if err != nil {
|
|
return model.Overview{}, err
|
|
}
|
|
recentEvents, err := s.ListRecentEvents(ctx, limit)
|
|
if err != nil {
|
|
return model.Overview{}, err
|
|
}
|
|
topIPsByEvents, err := s.listTopIPRows(ctx, since, limit, "events", options)
|
|
if err != nil {
|
|
return model.Overview{}, err
|
|
}
|
|
topIPsByTraffic, err := s.listTopIPRows(ctx, since, limit, "traffic", options)
|
|
if err != nil {
|
|
return model.Overview{}, err
|
|
}
|
|
topSources, err := s.listTopSourceRows(ctx, since, limit, options)
|
|
if err != nil {
|
|
return model.Overview{}, err
|
|
}
|
|
topURLs, err := s.listTopURLRows(ctx, since, limit, options)
|
|
if err != nil {
|
|
return model.Overview{}, err
|
|
}
|
|
overview.RecentIPs = recentIPs
|
|
overview.RecentEvents = recentEvents
|
|
overview.TopIPsByEvents = topIPsByEvents
|
|
overview.TopIPsByTraffic = topIPsByTraffic
|
|
overview.TopSources = topSources
|
|
overview.TopURLs = topURLs
|
|
return overview, nil
|
|
}
|
|
|
|
func (s *Store) listTopIPRows(ctx context.Context, since time.Time, limit int, orderBy string, options model.OverviewOptions) ([]model.TopIPRow, error) {
|
|
if limit <= 0 {
|
|
limit = 10
|
|
}
|
|
joins, clauses := overviewFilterQueryParts(options)
|
|
query := fmt.Sprintf(`
|
|
SELECT e.client_ip,
|
|
COUNT(*) AS event_count,
|
|
COALESCE(SUM(%s), 0) AS traffic_bytes,
|
|
MAX(e.occurred_at) AS last_seen_at
|
|
FROM events e`, responseBytesExpression)
|
|
if len(joins) > 0 {
|
|
query += ` ` + strings.Join(joins, ` `)
|
|
}
|
|
args := make([]any, 0, 2)
|
|
if !since.IsZero() {
|
|
clauses = append([]string{`e.occurred_at >= ?`}, clauses...)
|
|
args = append(args, formatTime(since))
|
|
}
|
|
if len(clauses) > 0 {
|
|
query += ` WHERE ` + strings.Join(clauses, ` AND `)
|
|
}
|
|
query += ` GROUP BY e.client_ip`
|
|
switch orderBy {
|
|
case "traffic":
|
|
query += ` ORDER BY traffic_bytes DESC, event_count DESC, last_seen_at DESC, e.client_ip ASC`
|
|
default:
|
|
query += ` ORDER BY event_count DESC, traffic_bytes DESC, last_seen_at DESC, e.client_ip ASC`
|
|
}
|
|
query += ` LIMIT ?`
|
|
args = append(args, limit)
|
|
|
|
rows, err := s.db.QueryContext(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list top ip rows by %s: %w", orderBy, err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
items := make([]model.TopIPRow, 0, limit)
|
|
for rows.Next() {
|
|
var item model.TopIPRow
|
|
var lastSeenAt string
|
|
if err := rows.Scan(&item.IP, &item.Events, &item.TrafficBytes, &lastSeenAt); err != nil {
|
|
return nil, fmt.Errorf("scan top ip row: %w", err)
|
|
}
|
|
parsed, err := parseTime(lastSeenAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse top ip row last_seen_at: %w", err)
|
|
}
|
|
item.LastSeenAt = parsed
|
|
items = append(items, item)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("iterate top ip rows by %s: %w", orderBy, err)
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
func (s *Store) listTopSourceRows(ctx context.Context, since time.Time, limit int, options model.OverviewOptions) ([]model.TopSourceRow, error) {
|
|
if limit <= 0 {
|
|
limit = 10
|
|
}
|
|
joins, clauses := overviewFilterQueryParts(options)
|
|
query := fmt.Sprintf(`
|
|
SELECT e.source_name,
|
|
COUNT(*) AS event_count,
|
|
COALESCE(SUM(%s), 0) AS traffic_bytes,
|
|
MAX(e.occurred_at) AS last_seen_at
|
|
FROM events e`, responseBytesExpression)
|
|
if len(joins) > 0 {
|
|
query += ` ` + strings.Join(joins, ` `)
|
|
}
|
|
args := make([]any, 0, 2)
|
|
if !since.IsZero() {
|
|
clauses = append([]string{`e.occurred_at >= ?`}, clauses...)
|
|
args = append(args, formatTime(since))
|
|
}
|
|
if len(clauses) > 0 {
|
|
query += ` WHERE ` + strings.Join(clauses, ` AND `)
|
|
}
|
|
query += ` GROUP BY e.source_name ORDER BY event_count DESC, traffic_bytes DESC, last_seen_at DESC, e.source_name ASC LIMIT ?`
|
|
args = append(args, limit)
|
|
|
|
rows, err := s.db.QueryContext(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list top source rows: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
items := make([]model.TopSourceRow, 0, limit)
|
|
for rows.Next() {
|
|
var item model.TopSourceRow
|
|
var lastSeenAt string
|
|
if err := rows.Scan(&item.SourceName, &item.Events, &item.TrafficBytes, &lastSeenAt); err != nil {
|
|
return nil, fmt.Errorf("scan top source row: %w", err)
|
|
}
|
|
parsed, err := parseTime(lastSeenAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse top source row last_seen_at: %w", err)
|
|
}
|
|
item.LastSeenAt = parsed
|
|
items = append(items, item)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("iterate top source rows: %w", err)
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
func (s *Store) listTopURLRows(ctx context.Context, since time.Time, limit int, options model.OverviewOptions) ([]model.TopURLRow, error) {
|
|
if limit <= 0 {
|
|
limit = 10
|
|
}
|
|
joins, clauses := overviewFilterQueryParts(options)
|
|
query := fmt.Sprintf(`
|
|
SELECT e.host,
|
|
e.uri,
|
|
COUNT(*) AS event_count,
|
|
COALESCE(SUM(%s), 0) AS traffic_bytes,
|
|
MAX(e.occurred_at) AS last_seen_at
|
|
FROM events e`, responseBytesExpression)
|
|
if len(joins) > 0 {
|
|
query += ` ` + strings.Join(joins, ` `)
|
|
}
|
|
args := make([]any, 0, 2)
|
|
if !since.IsZero() {
|
|
clauses = append([]string{`e.occurred_at >= ?`}, clauses...)
|
|
args = append(args, formatTime(since))
|
|
}
|
|
if len(clauses) > 0 {
|
|
query += ` WHERE ` + strings.Join(clauses, ` AND `)
|
|
}
|
|
query += ` GROUP BY e.host, e.uri ORDER BY event_count DESC, traffic_bytes DESC, last_seen_at DESC, e.host ASC, e.uri ASC LIMIT ?`
|
|
args = append(args, limit)
|
|
|
|
rows, err := s.db.QueryContext(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list top url rows: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
items := make([]model.TopURLRow, 0, limit)
|
|
for rows.Next() {
|
|
var item model.TopURLRow
|
|
var lastSeenAt string
|
|
if err := rows.Scan(&item.Host, &item.URI, &item.Events, &item.TrafficBytes, &lastSeenAt); err != nil {
|
|
return nil, fmt.Errorf("scan top url row: %w", err)
|
|
}
|
|
parsed, err := parseTime(lastSeenAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse top url row last_seen_at: %w", err)
|
|
}
|
|
item.LastSeenAt = parsed
|
|
items = append(items, item)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("iterate top url rows: %w", err)
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
func (s *Store) ListRecentEvents(ctx context.Context, limit int) ([]model.Event, error) {
|
|
if limit <= 0 {
|
|
limit = 50
|
|
}
|
|
rows, err := s.db.QueryContext(ctx, `
|
|
SELECT e.id, e.source_name, e.profile_name, e.occurred_at, e.remote_ip, e.client_ip, e.host,
|
|
e.method, e.uri, e.path, e.status, e.user_agent, e.decision, e.decision_reason,
|
|
e.decision_reasons_json, e.enforced, e.raw_json, e.created_at,
|
|
COALESCE(s.state, ''), COALESCE(s.manual_override, '')
|
|
FROM events e
|
|
LEFT JOIN ip_state s ON s.ip = e.client_ip
|
|
ORDER BY e.occurred_at DESC, e.id DESC
|
|
LIMIT ?`,
|
|
limit,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list recent events: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
items := make([]model.Event, 0, limit)
|
|
for rows.Next() {
|
|
item, err := scanEvent(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, item)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("iterate recent events: %w", err)
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
func (s *Store) ListIPStates(ctx context.Context, limit int, stateFilter string) ([]model.IPState, error) {
|
|
if limit <= 0 {
|
|
limit = 50
|
|
}
|
|
query := `SELECT ip, first_seen_at, last_seen_at, last_source_name, last_user_agent, latest_status,
|
|
total_events, state, state_reason, manual_override, last_event_id, updated_at
|
|
FROM ip_state`
|
|
args := []any{}
|
|
if strings.TrimSpace(stateFilter) != "" {
|
|
query += ` WHERE state = ?`
|
|
args = append(args, strings.TrimSpace(stateFilter))
|
|
}
|
|
query += ` ORDER BY last_seen_at DESC, ip ASC LIMIT ?`
|
|
args = append(args, limit)
|
|
|
|
rows, err := s.db.QueryContext(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list ip states: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
items := make([]model.IPState, 0, limit)
|
|
for rows.Next() {
|
|
item, err := scanIPState(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, item)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("iterate ip states: %w", err)
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
func (s *Store) ListRecentIPRows(ctx context.Context, since time.Time, limit int) ([]model.RecentIPRow, error) {
|
|
if limit <= 0 {
|
|
limit = 200
|
|
}
|
|
rows, err := s.db.QueryContext(ctx, `
|
|
WITH recent AS (
|
|
SELECT client_ip, COUNT(*) AS event_count, MAX(occurred_at) AS last_seen_at
|
|
FROM events
|
|
WHERE occurred_at >= ?
|
|
GROUP BY client_ip
|
|
)
|
|
SELECT s.ip,
|
|
COALESCE((
|
|
SELECT e.source_name
|
|
FROM events e
|
|
WHERE e.client_ip = s.ip AND e.occurred_at >= ?
|
|
ORDER BY e.occurred_at DESC, e.id DESC
|
|
LIMIT 1
|
|
), s.last_source_name) AS source_name,
|
|
s.state,
|
|
recent.event_count,
|
|
recent.last_seen_at,
|
|
s.state_reason,
|
|
s.manual_override
|
|
FROM recent
|
|
JOIN ip_state s ON s.ip = recent.client_ip
|
|
ORDER BY recent.event_count DESC, recent.last_seen_at DESC, s.ip ASC
|
|
LIMIT ?`,
|
|
formatTime(since),
|
|
formatTime(since),
|
|
limit,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list recent ip rows: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
items := make([]model.RecentIPRow, 0, limit)
|
|
for rows.Next() {
|
|
var item model.RecentIPRow
|
|
var state string
|
|
var lastSeenAt string
|
|
var manualOverride string
|
|
if err := rows.Scan(
|
|
&item.IP,
|
|
&item.SourceName,
|
|
&state,
|
|
&item.Events,
|
|
&lastSeenAt,
|
|
&item.Reason,
|
|
&manualOverride,
|
|
); err != nil {
|
|
return nil, fmt.Errorf("scan recent ip row: %w", err)
|
|
}
|
|
parsedLastSeenAt, err := parseTime(lastSeenAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse recent ip row last_seen_at: %w", err)
|
|
}
|
|
item.State = model.IPStateStatus(state)
|
|
item.LastSeenAt = parsedLastSeenAt
|
|
item.ManualOverride = model.ManualOverride(manualOverride)
|
|
items = append(items, item)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("iterate recent ip rows: %w", err)
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
func (s *Store) ListIPsWithoutInvestigation(ctx context.Context, since time.Time, limit int) ([]string, error) {
|
|
if limit <= 0 {
|
|
limit = 200
|
|
}
|
|
query := `
|
|
SELECT s.ip
|
|
FROM ip_state s
|
|
LEFT JOIN ip_investigations i ON i.ip = s.ip
|
|
WHERE i.ip IS NULL`
|
|
args := make([]any, 0, 2)
|
|
if !since.IsZero() {
|
|
query += ` AND s.last_seen_at >= ?`
|
|
args = append(args, formatTime(since))
|
|
}
|
|
query += ` ORDER BY s.last_seen_at DESC, s.ip ASC LIMIT ?`
|
|
args = append(args, limit)
|
|
|
|
rows, err := s.db.QueryContext(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list ips without investigation: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
items := make([]string, 0, limit)
|
|
for rows.Next() {
|
|
var ip string
|
|
if err := rows.Scan(&ip); err != nil {
|
|
return nil, fmt.Errorf("scan ip without investigation: %w", err)
|
|
}
|
|
items = append(items, ip)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("iterate ips without investigation: %w", err)
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
func (s *Store) GetIPDetails(ctx context.Context, ip string, eventLimit, decisionLimit, actionLimit int) (model.IPDetails, error) {
|
|
state, _, err := s.GetIPState(ctx, ip)
|
|
if err != nil {
|
|
return model.IPDetails{}, err
|
|
}
|
|
events, err := s.listEventsForIP(ctx, ip, eventLimit)
|
|
if err != nil {
|
|
return model.IPDetails{}, err
|
|
}
|
|
decisions, err := s.listDecisionsForIP(ctx, ip, decisionLimit)
|
|
if err != nil {
|
|
return model.IPDetails{}, err
|
|
}
|
|
actions, err := s.listBackendActionsForIP(ctx, ip, actionLimit)
|
|
if err != nil {
|
|
return model.IPDetails{}, err
|
|
}
|
|
return model.IPDetails{
|
|
State: state,
|
|
RecentEvents: events,
|
|
Decisions: decisions,
|
|
BackendActions: actions,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Store) GetInvestigation(ctx context.Context, ip string) (model.IPInvestigation, bool, error) {
|
|
row := s.db.QueryRowContext(ctx, `SELECT payload_json, updated_at FROM ip_investigations WHERE ip = ?`, ip)
|
|
var payload string
|
|
var updatedAt string
|
|
if err := row.Scan(&payload, &updatedAt); err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return model.IPInvestigation{}, false, nil
|
|
}
|
|
return model.IPInvestigation{}, false, fmt.Errorf("query ip investigation %q: %w", ip, err)
|
|
}
|
|
var item model.IPInvestigation
|
|
if err := json.Unmarshal([]byte(payload), &item); err != nil {
|
|
return model.IPInvestigation{}, false, fmt.Errorf("decode ip investigation %q: %w", ip, err)
|
|
}
|
|
parsed, err := parseTime(updatedAt)
|
|
if err != nil {
|
|
return model.IPInvestigation{}, false, fmt.Errorf("parse ip investigation updated_at: %w", err)
|
|
}
|
|
item.UpdatedAt = parsed
|
|
return item, true, nil
|
|
}
|
|
|
|
func (s *Store) GetInvestigationsForIPs(ctx context.Context, ips []string) (map[string]model.IPInvestigation, error) {
|
|
unique := uniqueNonEmptyStrings(ips)
|
|
if len(unique) == 0 {
|
|
return map[string]model.IPInvestigation{}, nil
|
|
}
|
|
placeholders := make([]string, len(unique))
|
|
args := make([]any, 0, len(unique))
|
|
for index, ip := range unique {
|
|
placeholders[index] = "?"
|
|
args = append(args, ip)
|
|
}
|
|
rows, err := s.db.QueryContext(ctx, fmt.Sprintf(
|
|
`SELECT ip, payload_json, updated_at FROM ip_investigations WHERE ip IN (%s)`,
|
|
strings.Join(placeholders, ", "),
|
|
), args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query ip investigations: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
items := make(map[string]model.IPInvestigation, len(unique))
|
|
for rows.Next() {
|
|
var ip string
|
|
var payload string
|
|
var updatedAt string
|
|
if err := rows.Scan(&ip, &payload, &updatedAt); err != nil {
|
|
return nil, fmt.Errorf("scan ip investigation: %w", err)
|
|
}
|
|
var item model.IPInvestigation
|
|
if err := json.Unmarshal([]byte(payload), &item); err != nil {
|
|
return nil, fmt.Errorf("decode ip investigation %q: %w", ip, err)
|
|
}
|
|
parsed, err := parseTime(updatedAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse ip investigation updated_at for %q: %w", ip, err)
|
|
}
|
|
item.UpdatedAt = parsed
|
|
items[ip] = item
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("iterate ip investigations: %w", err)
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
func (s *Store) SaveInvestigation(ctx context.Context, item model.IPInvestigation) error {
|
|
if item.UpdatedAt.IsZero() {
|
|
item.UpdatedAt = time.Now().UTC()
|
|
}
|
|
payload, err := json.Marshal(item)
|
|
if err != nil {
|
|
return fmt.Errorf("encode ip investigation: %w", err)
|
|
}
|
|
_, err = s.db.ExecContext(
|
|
ctx,
|
|
`INSERT INTO ip_investigations (ip, payload_json, updated_at)
|
|
VALUES (?, ?, ?)
|
|
ON CONFLICT(ip) DO UPDATE SET
|
|
payload_json = excluded.payload_json,
|
|
updated_at = excluded.updated_at`,
|
|
item.IP,
|
|
string(payload),
|
|
formatTime(item.UpdatedAt),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("upsert ip investigation %q: %w", item.IP, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Store) ListRecentUserAgentsForIP(ctx context.Context, ip string, limit int) ([]string, error) {
|
|
if limit <= 0 {
|
|
limit = 10
|
|
}
|
|
rows, err := s.db.QueryContext(ctx, `
|
|
SELECT user_agent
|
|
FROM events
|
|
WHERE client_ip = ? AND TRIM(user_agent) <> ''
|
|
GROUP BY user_agent
|
|
ORDER BY MAX(occurred_at) DESC, user_agent ASC
|
|
LIMIT ?`,
|
|
ip,
|
|
limit,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list recent user agents for ip %q: %w", ip, err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
items := make([]string, 0, limit)
|
|
for rows.Next() {
|
|
var userAgent string
|
|
if err := rows.Scan(&userAgent); err != nil {
|
|
return nil, fmt.Errorf("scan recent user agent for ip %q: %w", ip, err)
|
|
}
|
|
items = append(items, userAgent)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("iterate recent user agents for ip %q: %w", ip, err)
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
func (s *Store) GetSourceOffset(ctx context.Context, sourceName string) (model.SourceOffset, bool, error) {
|
|
row := s.db.QueryRowContext(ctx, `SELECT source_name, path, inode, offset, updated_at FROM source_offsets WHERE source_name = ?`, sourceName)
|
|
var offset model.SourceOffset
|
|
var updatedAt string
|
|
if err := row.Scan(&offset.SourceName, &offset.Path, &offset.Inode, &offset.Offset, &updatedAt); err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return model.SourceOffset{}, false, nil
|
|
}
|
|
return model.SourceOffset{}, false, fmt.Errorf("query source offset %q: %w", sourceName, err)
|
|
}
|
|
parsed, err := parseTime(updatedAt)
|
|
if err != nil {
|
|
return model.SourceOffset{}, false, fmt.Errorf("parse source offset updated_at: %w", err)
|
|
}
|
|
offset.UpdatedAt = parsed
|
|
return offset, true, nil
|
|
}
|
|
|
|
func (s *Store) SaveSourceOffset(ctx context.Context, offset model.SourceOffset) error {
|
|
if offset.UpdatedAt.IsZero() {
|
|
offset.UpdatedAt = time.Now().UTC()
|
|
}
|
|
_, err := s.db.ExecContext(
|
|
ctx,
|
|
`INSERT INTO source_offsets (source_name, path, inode, offset, updated_at)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
ON CONFLICT(source_name) DO UPDATE SET
|
|
path = excluded.path,
|
|
inode = excluded.inode,
|
|
offset = excluded.offset,
|
|
updated_at = excluded.updated_at`,
|
|
offset.SourceName,
|
|
offset.Path,
|
|
offset.Inode,
|
|
offset.Offset,
|
|
formatTime(offset.UpdatedAt),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("upsert source offset: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Store) listEventsForIP(ctx context.Context, ip string, limit int) ([]model.Event, error) {
|
|
query := `
|
|
SELECT e.id, e.source_name, e.profile_name, e.occurred_at, e.remote_ip, e.client_ip, e.host,
|
|
e.method, e.uri, e.path, e.status, e.user_agent, e.decision, e.decision_reason,
|
|
e.decision_reasons_json, e.enforced, e.raw_json, e.created_at,
|
|
COALESCE(s.state, ''), COALESCE(s.manual_override, '')
|
|
FROM events e
|
|
LEFT JOIN ip_state s ON s.ip = e.client_ip
|
|
WHERE e.client_ip = ?
|
|
ORDER BY e.occurred_at DESC, e.id DESC`
|
|
args := []any{ip}
|
|
if limit > 0 {
|
|
query += ` LIMIT ?`
|
|
args = append(args, limit)
|
|
}
|
|
rows, err := s.db.QueryContext(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list events for ip %q: %w", ip, err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
items := make([]model.Event, 0, max(limit, 0))
|
|
for rows.Next() {
|
|
item, err := scanEvent(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, item)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("iterate events for ip %q: %w", ip, err)
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
func (s *Store) listDecisionsForIP(ctx context.Context, ip string, limit int) ([]model.DecisionRecord, error) {
|
|
if limit <= 0 {
|
|
limit = 50
|
|
}
|
|
rows, err := s.db.QueryContext(ctx, `
|
|
SELECT id, event_id, ip, source_name, kind, action, reason, actor, enforced, created_at
|
|
FROM decisions
|
|
WHERE ip = ?
|
|
ORDER BY created_at DESC, id DESC
|
|
LIMIT ?`,
|
|
ip,
|
|
limit,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list decisions for ip %q: %w", ip, err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
items := make([]model.DecisionRecord, 0, limit)
|
|
for rows.Next() {
|
|
var item model.DecisionRecord
|
|
var action string
|
|
var enforced int
|
|
var createdAt string
|
|
if err := rows.Scan(&item.ID, &item.EventID, &item.IP, &item.SourceName, &item.Kind, &action, &item.Reason, &item.Actor, &enforced, &createdAt); err != nil {
|
|
return nil, fmt.Errorf("scan decision record: %w", err)
|
|
}
|
|
parsed, err := parseTime(createdAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse decision created_at: %w", err)
|
|
}
|
|
item.Action = model.DecisionAction(action)
|
|
item.Enforced = enforced != 0
|
|
item.CreatedAt = parsed
|
|
items = append(items, item)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("iterate decisions for ip %q: %w", ip, err)
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
func (s *Store) listBackendActionsForIP(ctx context.Context, ip string, limit int) ([]model.OPNsenseAction, error) {
|
|
if limit <= 0 {
|
|
limit = 50
|
|
}
|
|
rows, err := s.db.QueryContext(ctx, `
|
|
SELECT id, ip, action, result, message, created_at
|
|
FROM backend_actions
|
|
WHERE ip = ?
|
|
ORDER BY created_at DESC, id DESC
|
|
LIMIT ?`,
|
|
ip,
|
|
limit,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list backend actions for ip %q: %w", ip, err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
items := make([]model.OPNsenseAction, 0, limit)
|
|
for rows.Next() {
|
|
var item model.OPNsenseAction
|
|
var createdAt string
|
|
if err := rows.Scan(&item.ID, &item.IP, &item.Action, &item.Result, &item.Message, &createdAt); err != nil {
|
|
return nil, fmt.Errorf("scan backend action: %w", err)
|
|
}
|
|
parsed, err := parseTime(createdAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse backend action created_at: %w", err)
|
|
}
|
|
item.CreatedAt = parsed
|
|
items = append(items, item)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("iterate backend actions for ip %q: %w", ip, err)
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
func max(left int, right int) int {
|
|
if left > right {
|
|
return left
|
|
}
|
|
return right
|
|
}
|
|
|
|
func getIPStateDB(ctx context.Context, db queryer, ip string) (model.IPState, bool, error) {
|
|
row := db.QueryRowContext(ctx, `
|
|
SELECT ip, first_seen_at, last_seen_at, last_source_name, last_user_agent, latest_status,
|
|
total_events, state, state_reason, manual_override, last_event_id, updated_at
|
|
FROM ip_state WHERE ip = ?`, ip)
|
|
|
|
var item model.IPState
|
|
var firstSeenAt string
|
|
var lastSeenAt string
|
|
var updatedAt string
|
|
var state string
|
|
var manualOverride string
|
|
if err := row.Scan(
|
|
&item.IP,
|
|
&firstSeenAt,
|
|
&lastSeenAt,
|
|
&item.LastSourceName,
|
|
&item.LastUserAgent,
|
|
&item.LatestStatus,
|
|
&item.TotalEvents,
|
|
&state,
|
|
&item.StateReason,
|
|
&manualOverride,
|
|
&item.LastEventID,
|
|
&updatedAt,
|
|
); err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return model.IPState{}, false, nil
|
|
}
|
|
return model.IPState{}, false, fmt.Errorf("query ip state %q: %w", ip, err)
|
|
}
|
|
|
|
var err error
|
|
item.FirstSeenAt, err = parseTime(firstSeenAt)
|
|
if err != nil {
|
|
return model.IPState{}, false, fmt.Errorf("parse ip state first_seen_at: %w", err)
|
|
}
|
|
item.LastSeenAt, err = parseTime(lastSeenAt)
|
|
if err != nil {
|
|
return model.IPState{}, false, fmt.Errorf("parse ip state last_seen_at: %w", err)
|
|
}
|
|
item.UpdatedAt, err = parseTime(updatedAt)
|
|
if err != nil {
|
|
return model.IPState{}, false, fmt.Errorf("parse ip state updated_at: %w", err)
|
|
}
|
|
item.State = model.IPStateStatus(state)
|
|
item.ManualOverride = model.ManualOverride(manualOverride)
|
|
return item, true, nil
|
|
}
|
|
|
|
func getIPStateTx(tx *sql.Tx, ip string) (model.IPState, bool, error) {
|
|
return getIPStateDB(context.Background(), tx, ip)
|
|
}
|
|
|
|
func upsertIPStateTx(ctx context.Context, tx *sql.Tx, state model.IPState) error {
|
|
if state.UpdatedAt.IsZero() {
|
|
state.UpdatedAt = time.Now().UTC()
|
|
}
|
|
if state.FirstSeenAt.IsZero() {
|
|
state.FirstSeenAt = state.UpdatedAt
|
|
}
|
|
if state.LastSeenAt.IsZero() {
|
|
state.LastSeenAt = state.UpdatedAt
|
|
}
|
|
if state.State == "" {
|
|
state.State = model.IPStateObserved
|
|
}
|
|
if state.ManualOverride == "" {
|
|
state.ManualOverride = model.ManualOverrideNone
|
|
}
|
|
_, err := tx.ExecContext(
|
|
ctx,
|
|
`INSERT INTO ip_state (
|
|
ip, first_seen_at, last_seen_at, last_source_name, last_user_agent, latest_status,
|
|
total_events, state, state_reason, manual_override, last_event_id, updated_at
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(ip) DO UPDATE SET
|
|
first_seen_at = excluded.first_seen_at,
|
|
last_seen_at = excluded.last_seen_at,
|
|
last_source_name = excluded.last_source_name,
|
|
last_user_agent = excluded.last_user_agent,
|
|
latest_status = excluded.latest_status,
|
|
total_events = excluded.total_events,
|
|
state = excluded.state,
|
|
state_reason = excluded.state_reason,
|
|
manual_override = excluded.manual_override,
|
|
last_event_id = excluded.last_event_id,
|
|
updated_at = excluded.updated_at`,
|
|
state.IP,
|
|
formatTime(state.FirstSeenAt),
|
|
formatTime(state.LastSeenAt),
|
|
state.LastSourceName,
|
|
state.LastUserAgent,
|
|
state.LatestStatus,
|
|
state.TotalEvents,
|
|
string(state.State),
|
|
state.StateReason,
|
|
string(state.ManualOverride),
|
|
state.LastEventID,
|
|
formatTime(state.UpdatedAt),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("upsert ip state %q: %w", state.IP, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func mergeEventIntoState(existing model.IPState, found bool, event model.Event) model.IPState {
|
|
now := time.Now().UTC()
|
|
state := existing
|
|
if !found {
|
|
state = model.IPState{
|
|
IP: event.ClientIP,
|
|
FirstSeenAt: event.OccurredAt,
|
|
LastSeenAt: event.OccurredAt,
|
|
LastSourceName: event.SourceName,
|
|
LastUserAgent: event.UserAgent,
|
|
LatestStatus: event.Status,
|
|
TotalEvents: 0,
|
|
State: model.IPStateObserved,
|
|
StateReason: "",
|
|
ManualOverride: model.ManualOverrideNone,
|
|
LastEventID: 0,
|
|
UpdatedAt: now,
|
|
}
|
|
}
|
|
if state.FirstSeenAt.IsZero() || event.OccurredAt.Before(state.FirstSeenAt) {
|
|
state.FirstSeenAt = event.OccurredAt
|
|
}
|
|
if state.LastSeenAt.IsZero() || event.OccurredAt.After(state.LastSeenAt) {
|
|
state.LastSeenAt = event.OccurredAt
|
|
}
|
|
state.LastSourceName = event.SourceName
|
|
state.LastUserAgent = event.UserAgent
|
|
state.LatestStatus = event.Status
|
|
state.TotalEvents++
|
|
state.LastEventID = event.ID
|
|
state.UpdatedAt = now
|
|
if state.ManualOverride == "" {
|
|
state.ManualOverride = model.ManualOverrideNone
|
|
}
|
|
|
|
switch state.ManualOverride {
|
|
case model.ManualOverrideForceBlock:
|
|
state.State = model.IPStateBlocked
|
|
if event.DecisionReason != "" {
|
|
state.StateReason = event.DecisionReason
|
|
} else if state.StateReason == "" {
|
|
state.StateReason = "manual override: block"
|
|
}
|
|
return state
|
|
case model.ManualOverrideForceAllow:
|
|
state.State = model.IPStateAllowed
|
|
if event.DecisionReason != "" {
|
|
state.StateReason = event.DecisionReason
|
|
} else if state.StateReason == "" {
|
|
state.StateReason = "manual override: allow"
|
|
}
|
|
return state
|
|
}
|
|
|
|
switch event.Decision {
|
|
case model.DecisionActionBlock:
|
|
state.State = model.IPStateBlocked
|
|
state.StateReason = event.DecisionReason
|
|
case model.DecisionActionReview:
|
|
if state.State != model.IPStateBlocked && state.State != model.IPStateAllowed {
|
|
state.State = model.IPStateReview
|
|
state.StateReason = event.DecisionReason
|
|
}
|
|
case model.DecisionActionAllow:
|
|
state.State = model.IPStateAllowed
|
|
state.StateReason = event.DecisionReason
|
|
default:
|
|
if state.State == "" {
|
|
state.State = model.IPStateObserved
|
|
}
|
|
}
|
|
return state
|
|
}
|
|
|
|
func scanEvent(scanner interface{ Scan(dest ...any) error }) (model.Event, error) {
|
|
var item model.Event
|
|
var occurredAt string
|
|
var createdAt string
|
|
var decision string
|
|
var decisionReasonsJSON string
|
|
var enforced int
|
|
var currentState string
|
|
var manualOverride string
|
|
if err := scanner.Scan(
|
|
&item.ID,
|
|
&item.SourceName,
|
|
&item.ProfileName,
|
|
&occurredAt,
|
|
&item.RemoteIP,
|
|
&item.ClientIP,
|
|
&item.Host,
|
|
&item.Method,
|
|
&item.URI,
|
|
&item.Path,
|
|
&item.Status,
|
|
&item.UserAgent,
|
|
&decision,
|
|
&item.DecisionReason,
|
|
&decisionReasonsJSON,
|
|
&enforced,
|
|
&item.RawJSON,
|
|
&createdAt,
|
|
¤tState,
|
|
&manualOverride,
|
|
); err != nil {
|
|
return model.Event{}, fmt.Errorf("scan event: %w", err)
|
|
}
|
|
parsedOccurredAt, err := parseTime(occurredAt)
|
|
if err != nil {
|
|
return model.Event{}, fmt.Errorf("parse event occurred_at: %w", err)
|
|
}
|
|
parsedCreatedAt, err := parseTime(createdAt)
|
|
if err != nil {
|
|
return model.Event{}, fmt.Errorf("parse event created_at: %w", err)
|
|
}
|
|
var reasons []string
|
|
if strings.TrimSpace(decisionReasonsJSON) != "" {
|
|
if err := json.Unmarshal([]byte(decisionReasonsJSON), &reasons); err != nil {
|
|
return model.Event{}, fmt.Errorf("decode event decision_reasons_json: %w", err)
|
|
}
|
|
}
|
|
item.OccurredAt = parsedOccurredAt
|
|
item.CreatedAt = parsedCreatedAt
|
|
item.Decision = model.DecisionAction(decision)
|
|
item.DecisionReasons = reasons
|
|
item.Enforced = enforced != 0
|
|
item.CurrentState = model.IPStateStatus(currentState)
|
|
item.ManualOverride = model.ManualOverride(manualOverride)
|
|
return item, nil
|
|
}
|
|
|
|
func scanIPState(scanner interface{ Scan(dest ...any) error }) (model.IPState, error) {
|
|
var item model.IPState
|
|
var firstSeenAt string
|
|
var lastSeenAt string
|
|
var updatedAt string
|
|
var state string
|
|
var manualOverride string
|
|
if err := scanner.Scan(
|
|
&item.IP,
|
|
&firstSeenAt,
|
|
&lastSeenAt,
|
|
&item.LastSourceName,
|
|
&item.LastUserAgent,
|
|
&item.LatestStatus,
|
|
&item.TotalEvents,
|
|
&state,
|
|
&item.StateReason,
|
|
&manualOverride,
|
|
&item.LastEventID,
|
|
&updatedAt,
|
|
); err != nil {
|
|
return model.IPState{}, fmt.Errorf("scan ip state: %w", err)
|
|
}
|
|
var err error
|
|
item.FirstSeenAt, err = parseTime(firstSeenAt)
|
|
if err != nil {
|
|
return model.IPState{}, fmt.Errorf("parse ip state first_seen_at: %w", err)
|
|
}
|
|
item.LastSeenAt, err = parseTime(lastSeenAt)
|
|
if err != nil {
|
|
return model.IPState{}, fmt.Errorf("parse ip state last_seen_at: %w", err)
|
|
}
|
|
item.UpdatedAt, err = parseTime(updatedAt)
|
|
if err != nil {
|
|
return model.IPState{}, fmt.Errorf("parse ip state updated_at: %w", err)
|
|
}
|
|
item.State = model.IPStateStatus(state)
|
|
item.ManualOverride = model.ManualOverride(manualOverride)
|
|
return item, nil
|
|
}
|
|
|
|
func parseTime(value string) (time.Time, error) {
|
|
trimmed := strings.TrimSpace(value)
|
|
if trimmed == "" {
|
|
return time.Time{}, nil
|
|
}
|
|
parsed, err := time.Parse(time.RFC3339Nano, trimmed)
|
|
if err != nil {
|
|
return time.Time{}, err
|
|
}
|
|
return parsed.UTC(), nil
|
|
}
|
|
|
|
func formatTime(value time.Time) string {
|
|
if value.IsZero() {
|
|
return ""
|
|
}
|
|
return value.UTC().Format(time.RFC3339Nano)
|
|
}
|
|
|
|
func boolToInt(value bool) int {
|
|
if value {
|
|
return 1
|
|
}
|
|
return 0
|
|
}
|
|
|
|
func uniqueNonEmptyStrings(items []string) []string {
|
|
seen := make(map[string]struct{}, len(items))
|
|
result := make([]string, 0, len(items))
|
|
for _, item := range items {
|
|
trimmed := strings.TrimSpace(item)
|
|
if trimmed == "" {
|
|
continue
|
|
}
|
|
if _, ok := seen[trimmed]; ok {
|
|
continue
|
|
}
|
|
seen[trimmed] = struct{}{}
|
|
result = append(result, trimmed)
|
|
}
|
|
sort.Strings(result)
|
|
return result
|
|
}
|
|
|
|
type queryer interface {
|
|
QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
|
|
}
|