2
Files
caddy-opnsense-blocker/internal/store/store.go

1657 lines
49 KiB
Go

package store
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"time"
"git.dern.ovh/infrastructure/caddy-opnsense-blocker/internal/model"
_ "modernc.org/sqlite"
)
// schema is the SQLite DDL that Open executes after applying its pragmas.
// Every CREATE TABLE / CREATE INDEX statement uses IF NOT EXISTS, so the
// script can be applied to a database that already contains these objects.
// All timestamp columns are declared TEXT and booleans (enforced) INTEGER.
const schema = `
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source_name TEXT NOT NULL,
profile_name TEXT NOT NULL,
occurred_at TEXT NOT NULL,
remote_ip TEXT NOT NULL,
client_ip TEXT NOT NULL,
host TEXT NOT NULL,
method TEXT NOT NULL,
uri TEXT NOT NULL,
path TEXT NOT NULL,
status INTEGER NOT NULL,
user_agent TEXT NOT NULL,
decision TEXT NOT NULL,
decision_reason TEXT NOT NULL,
decision_reasons_json TEXT NOT NULL,
enforced INTEGER NOT NULL DEFAULT 0,
raw_json TEXT NOT NULL,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_occurred_at ON events(occurred_at DESC, id DESC);
CREATE INDEX IF NOT EXISTS idx_events_client_ip ON events(client_ip, occurred_at DESC, id DESC);
CREATE INDEX IF NOT EXISTS idx_events_source_name ON events(source_name, occurred_at DESC, id DESC);
CREATE INDEX IF NOT EXISTS idx_events_decision ON events(decision, occurred_at DESC, id DESC);
CREATE TABLE IF NOT EXISTS ip_state (
ip TEXT PRIMARY KEY,
first_seen_at TEXT NOT NULL,
last_seen_at TEXT NOT NULL,
last_source_name TEXT NOT NULL,
last_user_agent TEXT NOT NULL,
latest_status INTEGER NOT NULL,
total_events INTEGER NOT NULL,
state TEXT NOT NULL,
state_reason TEXT NOT NULL,
manual_override TEXT NOT NULL,
last_event_id INTEGER NOT NULL,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_ip_state_last_seen ON ip_state(last_seen_at DESC, ip ASC);
CREATE INDEX IF NOT EXISTS idx_ip_state_state ON ip_state(state, last_seen_at DESC, ip ASC);
CREATE TABLE IF NOT EXISTS decisions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_id INTEGER NOT NULL,
ip TEXT NOT NULL,
source_name TEXT NOT NULL,
kind TEXT NOT NULL,
action TEXT NOT NULL,
reason TEXT NOT NULL,
actor TEXT NOT NULL,
enforced INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_decisions_ip ON decisions(ip, created_at DESC, id DESC);
CREATE INDEX IF NOT EXISTS idx_decisions_event_id ON decisions(event_id, created_at DESC, id DESC);
CREATE TABLE IF NOT EXISTS backend_actions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ip TEXT NOT NULL,
action TEXT NOT NULL,
result TEXT NOT NULL,
message TEXT NOT NULL,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_backend_actions_ip ON backend_actions(ip, created_at DESC, id DESC);
CREATE TABLE IF NOT EXISTS source_offsets (
source_name TEXT PRIMARY KEY,
path TEXT NOT NULL,
inode TEXT NOT NULL,
offset INTEGER NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS ip_investigations (
ip TEXT PRIMARY KEY,
payload_json TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_ip_investigations_updated_at ON ip_investigations(updated_at DESC, ip ASC);
`
// Store is the persistence handle of this package. It holds the *sql.DB that
// Open configures and that every Store method runs its queries against.
type Store struct {
db *sql.DB
}
// Open prepares the SQLite database stored at path and returns a Store for it.
//
// It creates the parent directory of path when missing, opens the database
// with the "sqlite" driver, limits the pool to one open/idle connection with
// no lifetime cap, then applies the WAL/busy_timeout/foreign_keys pragmas and
// the package schema under a shared 10-second timeout. If a pragma or the
// schema fails, the handle is closed before the wrapped error is returned.
func Open(path string) (*Store, error) {
	parentDir := filepath.Dir(path)
	if mkdirErr := os.MkdirAll(parentDir, 0o755); mkdirErr != nil {
		return nil, fmt.Errorf("create storage directory: %w", mkdirErr)
	}

	db, openErr := sql.Open("sqlite", path)
	if openErr != nil {
		return nil, fmt.Errorf("open sqlite database: %w", openErr)
	}
	db.SetMaxOpenConns(1)
	db.SetMaxIdleConns(1)
	db.SetConnMaxLifetime(0)

	// One deadline covers the pragmas and the schema together.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	pragmas := [...]string{
		"PRAGMA journal_mode = WAL;",
		"PRAGMA busy_timeout = 5000;",
		"PRAGMA foreign_keys = ON;",
	}
	for i := range pragmas {
		if _, pragmaErr := db.ExecContext(ctx, pragmas[i]); pragmaErr != nil {
			db.Close()
			return nil, fmt.Errorf("apply sqlite pragma %q: %w", pragmas[i], pragmaErr)
		}
	}

	if _, schemaErr := db.ExecContext(ctx, schema); schemaErr != nil {
		db.Close()
		return nil, fmt.Errorf("apply sqlite schema: %w", schemaErr)
	}

	store := &Store{db: db}
	return store, nil
}
// Close closes the underlying database handle. It is a no-op returning nil
// when the receiver or its handle is nil.
func (s *Store) Close() error {
	if s != nil && s.db != nil {
		return s.db.Close()
	}
	return nil
}
// RecordEvent inserts event into the events table and merges it into the
// ip_state row of event.ClientIP, both inside one transaction.
//
// A nil event is rejected. Zero OccurredAt / CreatedAt values are replaced by
// the current UTC time. event.ID is set from the inserted row, and
// event.CurrentState / event.ManualOverride from the merged IP state; these
// fields are written before the commit, so they may be populated even when a
// later step fails and the transaction is rolled back.
func (s *Store) RecordEvent(ctx context.Context, event *model.Event) error {
	if event == nil {
		return errors.New("nil event")
	}
	if event.OccurredAt.IsZero() {
		event.OccurredAt = time.Now().UTC()
	}
	if event.CreatedAt.IsZero() {
		event.CreatedAt = time.Now().UTC()
	}

	reasonsJSON, marshalErr := json.Marshal(event.DecisionReasons)
	if marshalErr != nil {
		return fmt.Errorf("encode decision reasons: %w", marshalErr)
	}

	tx, beginErr := s.db.BeginTx(ctx, nil)
	if beginErr != nil {
		return fmt.Errorf("begin transaction: %w", beginErr)
	}
	// Rollback after a successful Commit is a no-op, so the defer is safe.
	defer tx.Rollback()

	previous, known, lookupErr := getIPStateTx(tx, event.ClientIP)
	if lookupErr != nil {
		return lookupErr
	}

	const insertEvent = `INSERT INTO events (
source_name, profile_name, occurred_at, remote_ip, client_ip, host, method, uri, path,
status, user_agent, decision, decision_reason, decision_reasons_json, enforced, raw_json, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
	insertArgs := []any{
		event.SourceName,
		event.ProfileName,
		formatTime(event.OccurredAt),
		event.RemoteIP,
		event.ClientIP,
		event.Host,
		event.Method,
		event.URI,
		event.Path,
		event.Status,
		event.UserAgent,
		string(event.Decision),
		event.DecisionReason,
		string(reasonsJSON),
		boolToInt(event.Enforced),
		event.RawJSON,
		formatTime(event.CreatedAt),
	}
	inserted, insertErr := tx.ExecContext(ctx, insertEvent, insertArgs...)
	if insertErr != nil {
		return fmt.Errorf("insert event: %w", insertErr)
	}

	newID, idErr := inserted.LastInsertId()
	if idErr != nil {
		return fmt.Errorf("load inserted event id: %w", idErr)
	}
	event.ID = newID

	// The merge sees the event with its freshly assigned ID.
	merged := mergeEventIntoState(previous, known, *event)
	event.CurrentState = merged.State
	event.ManualOverride = merged.ManualOverride

	if upsertErr := upsertIPStateTx(ctx, tx, merged); upsertErr != nil {
		return upsertErr
	}
	if commitErr := tx.Commit(); commitErr != nil {
		return fmt.Errorf("commit event transaction: %w", commitErr)
	}
	return nil
}
// responseBytesExpression is a SQL expression over an events row aliased as e.
// It yields the `$.size` field of e.raw_json cast to INTEGER, or 0 when
// raw_json is not valid JSON or has no `$.size` value. The queries in this
// file SUM it to produce their traffic_bytes column.
const responseBytesExpression = `CASE WHEN json_valid(e.raw_json) THEN CAST(COALESCE(json_extract(e.raw_json, '$.size'), 0) AS INTEGER) ELSE 0 END`
// knownBotExistsClause returns a SQL EXISTS (...) predicate that is true when
// ip_investigations holds a row for ipExpression whose payload is valid JSON,
// contains a `$.bot` entry, and has `$.bot.verified` equal to 1.
//
// ipExpression is spliced into the SQL text verbatim, so it must be a trusted
// column expression (for example `e.client_ip`), never user input.
func knownBotExistsClause(ipExpression string) string {
	const head = `EXISTS (
SELECT 1
FROM ip_investigations i
WHERE i.ip = `
	const tail = `
AND json_valid(i.payload_json)
AND json_type(i.payload_json, '$.bot') IS NOT NULL
AND COALESCE(json_extract(i.payload_json, '$.bot.verified'), 0) = 1
)`
	return head + ipExpression + tail
}
// overviewFilterQueryParts translates overview options into SQL fragments for
// a query whose events table is aliased as e.
//
// When allowed IPs are hidden it returns a LEFT JOIN on ip_state (alias s)
// plus a clause excluding rows in the allowed state; when known bots are
// hidden it returns a NOT EXISTS clause built by knownBotExistsClause. Both
// results are nil when neither filter applies.
func overviewFilterQueryParts(options model.OverviewOptions) (joins []string, clauses []string) {
	hideAllowed := !options.ShowAllowed
	hideKnownBots := !options.ShowKnownBots

	if hideAllowed {
		joins = []string{`LEFT JOIN ip_state s ON s.ip = e.client_ip`}
		clauses = []string{`COALESCE(s.state, '') <> '` + string(model.IPStateAllowed) + `'`}
	}
	if hideKnownBots {
		botClause := `NOT ` + knownBotExistsClause(`e.client_ip`)
		clauses = append(clauses, botClause)
	}
	return joins, clauses
}
// eventFilterQueryParts translates event-list options into SQL fragments for
// a query whose events table is aliased as e.
//
// The ip_state LEFT JOIN (alias s) is always returned. Clauses are added, in
// order, to hide allowed IPs, to keep only IPs in the review state, and to
// hide verified known bots.
func eventFilterQueryParts(options model.EventListOptions) (joins []string, clauses []string) {
	const stateColumn = `COALESCE(s.state, '')`

	joins = []string{`LEFT JOIN ip_state s ON s.ip = e.client_ip`}

	if !options.ShowAllowed {
		notAllowed := stateColumn + ` <> '` + string(model.IPStateAllowed) + `'`
		clauses = append(clauses, notAllowed)
	}
	if options.ReviewOnly {
		onlyReview := stateColumn + ` = '` + string(model.IPStateReview) + `'`
		clauses = append(clauses, onlyReview)
	}
	if !options.ShowKnownBots {
		notKnownBot := `NOT ` + knownBotExistsClause(`e.client_ip`)
		clauses = append(clauses, notKnownBot)
	}
	return joins, clauses
}
// AddDecision inserts decision into the decisions table and stores the new
// row id in decision.ID. A zero CreatedAt is replaced with the current UTC
// time. A nil decision is rejected.
func (s *Store) AddDecision(ctx context.Context, decision *model.DecisionRecord) error {
	if decision == nil {
		return errors.New("nil decision record")
	}
	if decision.CreatedAt.IsZero() {
		decision.CreatedAt = time.Now().UTC()
	}

	const insertDecision = `INSERT INTO decisions (event_id, ip, source_name, kind, action, reason, actor, enforced, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`
	values := []any{
		decision.EventID,
		decision.IP,
		decision.SourceName,
		decision.Kind,
		string(decision.Action),
		decision.Reason,
		decision.Actor,
		boolToInt(decision.Enforced),
		formatTime(decision.CreatedAt),
	}

	inserted, execErr := s.db.ExecContext(ctx, insertDecision, values...)
	if execErr != nil {
		return fmt.Errorf("insert decision record: %w", execErr)
	}

	rowID, idErr := inserted.LastInsertId()
	decision.ID = rowID
	if idErr != nil {
		return fmt.Errorf("load inserted decision id: %w", idErr)
	}
	return nil
}
// AddBackendAction inserts action into the backend_actions table and stores
// the new row id in action.ID. A zero CreatedAt is replaced with the current
// UTC time. A nil action is rejected.
func (s *Store) AddBackendAction(ctx context.Context, action *model.OPNsenseAction) error {
	if action == nil {
		return errors.New("nil backend action")
	}
	if action.CreatedAt.IsZero() {
		action.CreatedAt = time.Now().UTC()
	}

	const insertAction = `INSERT INTO backend_actions (ip, action, result, message, created_at)
VALUES (?, ?, ?, ?, ?)`
	values := []any{
		action.IP,
		action.Action,
		action.Result,
		action.Message,
		formatTime(action.CreatedAt),
	}

	inserted, execErr := s.db.ExecContext(ctx, insertAction, values...)
	if execErr != nil {
		return fmt.Errorf("insert backend action: %w", execErr)
	}

	rowID, idErr := inserted.LastInsertId()
	action.ID = rowID
	if idErr != nil {
		return fmt.Errorf("load inserted backend action id: %w", idErr)
	}
	return nil
}
// GetIPState returns the stored state for ip by delegating to getIPStateDB on
// the store's database handle; all three results are passed through unchanged.
// NOTE(review): the boolean presumably reports whether a row exists for ip
// (callers of the sibling getIPStateTx name it "found") — confirm against
// getIPStateDB.
func (s *Store) GetIPState(ctx context.Context, ip string) (model.IPState, bool, error) {
return getIPStateDB(ctx, s.db, ip)
}
// SetManualOverride stores override and state on the ip_state row for ip
// inside one transaction and returns the resulting state.
//
// When no row exists yet, a new one is created with first/last-seen set to
// now, zero counters, and the trimmed reason. For an existing row the reason
// is only overwritten when the trimmed reason is non-empty.
func (s *Store) SetManualOverride(ctx context.Context, ip string, override model.ManualOverride, state model.IPStateStatus, reason string) (model.IPState, error) {
	tx, beginErr := s.db.BeginTx(ctx, nil)
	if beginErr != nil {
		return model.IPState{}, fmt.Errorf("begin transaction: %w", beginErr)
	}
	defer tx.Rollback()

	current, exists, lookupErr := getIPStateTx(tx, ip)
	if lookupErr != nil {
		return model.IPState{}, lookupErr
	}

	now := time.Now().UTC()
	trimmedReason := strings.TrimSpace(reason)
	if exists {
		current.ManualOverride = override
		current.State = state
		if trimmedReason != "" {
			current.StateReason = trimmedReason
		}
		current.UpdatedAt = now
	} else {
		// Fields not listed keep their zero values (empty strings, zero counters).
		current = model.IPState{
			IP:             ip,
			FirstSeenAt:    now,
			LastSeenAt:     now,
			State:          state,
			StateReason:    trimmedReason,
			ManualOverride: override,
			UpdatedAt:      now,
		}
	}

	if upsertErr := upsertIPStateTx(ctx, tx, current); upsertErr != nil {
		return model.IPState{}, upsertErr
	}
	if commitErr := tx.Commit(); commitErr != nil {
		return model.IPState{}, fmt.Errorf("commit transaction: %w", commitErr)
	}
	return current, nil
}
// ClearManualOverride resets the manual override of ip to
// model.ManualOverrideNone inside one transaction and returns the resulting
// state.
//
// When no row exists yet, a new one is created in the observed state. For an
// existing row the state is kept unless it is empty (then it becomes
// observed), and the reason is only overwritten when the trimmed reason is
// non-empty.
func (s *Store) ClearManualOverride(ctx context.Context, ip string, reason string) (model.IPState, error) {
	tx, beginErr := s.db.BeginTx(ctx, nil)
	if beginErr != nil {
		return model.IPState{}, fmt.Errorf("begin transaction: %w", beginErr)
	}
	defer tx.Rollback()

	current, exists, lookupErr := getIPStateTx(tx, ip)
	if lookupErr != nil {
		return model.IPState{}, lookupErr
	}

	now := time.Now().UTC()
	trimmedReason := strings.TrimSpace(reason)
	if exists {
		current.ManualOverride = model.ManualOverrideNone
		if current.State == "" {
			current.State = model.IPStateObserved
		}
		if trimmedReason != "" {
			current.StateReason = trimmedReason
		}
		current.UpdatedAt = now
	} else {
		current = model.IPState{
			IP:             ip,
			FirstSeenAt:    now,
			LastSeenAt:     now,
			State:          model.IPStateObserved,
			StateReason:    trimmedReason,
			ManualOverride: model.ManualOverrideNone,
			UpdatedAt:      now,
		}
	}

	if upsertErr := upsertIPStateTx(ctx, tx, current); upsertErr != nil {
		return model.IPState{}, upsertErr
	}
	if commitErr := tx.Commit(); commitErr != nil {
		return model.IPState{}, fmt.Errorf("commit transaction: %w", commitErr)
	}
	return current, nil
}
// GetOverview assembles the dashboard overview.
//
// The global counters (events, IPs, and IPs per state), RecentIPs and
// RecentEvents are unfiltered. The top-N lists, activity buckets, method
// breakdown and bot breakdown are restricted to events at or after since
// (when non-zero) and to the visibility options. limit caps every list and
// defaults to 50 when not positive. Any failing query aborts the whole call
// and an empty Overview is returned with the error.
func (s *Store) GetOverview(ctx context.Context, since time.Time, limit int, options model.OverviewOptions) (model.Overview, error) {
	if limit <= 0 {
		limit = 50
	}

	var overview model.Overview
	if !since.IsZero() {
		overview.ActivitySince = since.UTC()
	}

	const countByState = `SELECT COUNT(*) FROM ip_state WHERE state = ?`
	counters := []struct {
		what  string // error prefix
		query string
		args  []any
		dest  any // pointer to the overview field receiving the count
	}{
		{"count events", `SELECT COUNT(*) FROM events`, nil, &overview.TotalEvents},
		{"count ip states", `SELECT COUNT(*) FROM ip_state`, nil, &overview.TotalIPs},
		{"count blocked ip states", countByState, []any{string(model.IPStateBlocked)}, &overview.BlockedIPs},
		{"count review ip states", countByState, []any{string(model.IPStateReview)}, &overview.ReviewIPs},
		{"count allowed ip states", countByState, []any{string(model.IPStateAllowed)}, &overview.AllowedIPs},
		{"count observed ip states", countByState, []any{string(model.IPStateObserved)}, &overview.ObservedIPs},
	}
	for _, counter := range counters {
		row := s.db.QueryRowContext(ctx, counter.query, counter.args...)
		if err := row.Scan(counter.dest); err != nil {
			return model.Overview{}, fmt.Errorf("%s: %w", counter.what, err)
		}
	}

	var err error
	if overview.RecentIPs, err = s.ListIPStates(ctx, limit, ""); err != nil {
		return model.Overview{}, err
	}
	if overview.RecentEvents, err = s.ListRecentEvents(ctx, limit); err != nil {
		return model.Overview{}, err
	}
	if overview.TopIPsByEvents, err = s.listTopIPRows(ctx, since, limit, "events", options); err != nil {
		return model.Overview{}, err
	}
	if overview.TopIPsByTraffic, err = s.listTopIPRows(ctx, since, limit, "traffic", options); err != nil {
		return model.Overview{}, err
	}
	if overview.TopSources, err = s.listTopSourceRows(ctx, since, limit, options); err != nil {
		return model.Overview{}, err
	}
	if overview.TopURLs, err = s.listTopURLRows(ctx, since, limit, options); err != nil {
		return model.Overview{}, err
	}
	if overview.ActivityBuckets, err = s.listActivityBuckets(ctx, since, options); err != nil {
		return model.Overview{}, err
	}
	if overview.Methods, err = s.listMethodBreakdown(ctx, since, options); err != nil {
		return model.Overview{}, err
	}
	if overview.Bots, err = s.listBotBreakdown(ctx, since, options); err != nil {
		return model.Overview{}, err
	}
	return overview, nil
}
// listTopIPRows aggregates events per client IP and returns the top limit
// rows (default 10).
//
// Events are restricted to those at or after since (when non-zero) and to the
// visibility options. orderBy "traffic" ranks by summed response bytes first;
// any other value ranks by event count first.
func (s *Store) listTopIPRows(ctx context.Context, since time.Time, limit int, orderBy string, options model.OverviewOptions) ([]model.TopIPRow, error) {
	if limit <= 0 {
		limit = 10
	}
	joins, clauses := overviewFilterQueryParts(options)

	bindings := make([]any, 0, 2)
	if !since.IsZero() {
		clauses = append([]string{`e.occurred_at >= ?`}, clauses...)
		bindings = append(bindings, formatTime(since))
	}
	bindings = append(bindings, limit)

	ordering := ` ORDER BY event_count DESC, traffic_bytes DESC, last_seen_at DESC, e.client_ip ASC`
	if orderBy == "traffic" {
		ordering = ` ORDER BY traffic_bytes DESC, event_count DESC, last_seen_at DESC, e.client_ip ASC`
	}

	var stmt strings.Builder
	fmt.Fprintf(&stmt, `
SELECT e.client_ip,
COUNT(*) AS event_count,
COALESCE(SUM(%s), 0) AS traffic_bytes,
MAX(e.occurred_at) AS last_seen_at
FROM events e`, responseBytesExpression)
	if len(joins) > 0 {
		stmt.WriteString(` `)
		stmt.WriteString(strings.Join(joins, ` `))
	}
	if len(clauses) > 0 {
		stmt.WriteString(` WHERE `)
		stmt.WriteString(strings.Join(clauses, ` AND `))
	}
	stmt.WriteString(` GROUP BY e.client_ip`)
	stmt.WriteString(ordering)
	stmt.WriteString(` LIMIT ?`)

	rows, queryErr := s.db.QueryContext(ctx, stmt.String(), bindings...)
	if queryErr != nil {
		return nil, fmt.Errorf("list top ip rows by %s: %w", orderBy, queryErr)
	}
	defer rows.Close()

	result := make([]model.TopIPRow, 0, limit)
	for rows.Next() {
		var (
			row     model.TopIPRow
			rawSeen string
		)
		if scanErr := rows.Scan(&row.IP, &row.Events, &row.TrafficBytes, &rawSeen); scanErr != nil {
			return nil, fmt.Errorf("scan top ip row: %w", scanErr)
		}
		seenAt, parseErr := parseTime(rawSeen)
		if parseErr != nil {
			return nil, fmt.Errorf("parse top ip row last_seen_at: %w", parseErr)
		}
		row.LastSeenAt = seenAt
		result = append(result, row)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, fmt.Errorf("iterate top ip rows by %s: %w", orderBy, iterErr)
	}
	return result, nil
}
// listTopSourceRows aggregates events per source name and returns the top
// limit rows (default 10), ranked by event count, then traffic, then recency.
//
// Events are restricted to those at or after since (when non-zero) and to the
// visibility options.
func (s *Store) listTopSourceRows(ctx context.Context, since time.Time, limit int, options model.OverviewOptions) ([]model.TopSourceRow, error) {
	if limit <= 0 {
		limit = 10
	}
	joins, clauses := overviewFilterQueryParts(options)

	bindings := make([]any, 0, 2)
	if !since.IsZero() {
		clauses = append([]string{`e.occurred_at >= ?`}, clauses...)
		bindings = append(bindings, formatTime(since))
	}
	bindings = append(bindings, limit)

	var stmt strings.Builder
	fmt.Fprintf(&stmt, `
SELECT e.source_name,
COUNT(*) AS event_count,
COALESCE(SUM(%s), 0) AS traffic_bytes,
MAX(e.occurred_at) AS last_seen_at
FROM events e`, responseBytesExpression)
	if len(joins) > 0 {
		stmt.WriteString(` `)
		stmt.WriteString(strings.Join(joins, ` `))
	}
	if len(clauses) > 0 {
		stmt.WriteString(` WHERE `)
		stmt.WriteString(strings.Join(clauses, ` AND `))
	}
	stmt.WriteString(` GROUP BY e.source_name ORDER BY event_count DESC, traffic_bytes DESC, last_seen_at DESC, e.source_name ASC LIMIT ?`)

	rows, queryErr := s.db.QueryContext(ctx, stmt.String(), bindings...)
	if queryErr != nil {
		return nil, fmt.Errorf("list top source rows: %w", queryErr)
	}
	defer rows.Close()

	result := make([]model.TopSourceRow, 0, limit)
	for rows.Next() {
		var (
			row     model.TopSourceRow
			rawSeen string
		)
		if scanErr := rows.Scan(&row.SourceName, &row.Events, &row.TrafficBytes, &rawSeen); scanErr != nil {
			return nil, fmt.Errorf("scan top source row: %w", scanErr)
		}
		seenAt, parseErr := parseTime(rawSeen)
		if parseErr != nil {
			return nil, fmt.Errorf("parse top source row last_seen_at: %w", parseErr)
		}
		row.LastSeenAt = seenAt
		result = append(result, row)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, fmt.Errorf("iterate top source rows: %w", iterErr)
	}
	return result, nil
}
// listTopURLRows aggregates events per (host, uri) pair and returns the top
// limit rows (default 10), ranked by event count, then traffic, then recency.
//
// Events are restricted to those at or after since (when non-zero) and to the
// visibility options.
func (s *Store) listTopURLRows(ctx context.Context, since time.Time, limit int, options model.OverviewOptions) ([]model.TopURLRow, error) {
	if limit <= 0 {
		limit = 10
	}
	joins, clauses := overviewFilterQueryParts(options)

	bindings := make([]any, 0, 2)
	if !since.IsZero() {
		clauses = append([]string{`e.occurred_at >= ?`}, clauses...)
		bindings = append(bindings, formatTime(since))
	}
	bindings = append(bindings, limit)

	var stmt strings.Builder
	fmt.Fprintf(&stmt, `
SELECT e.host,
e.uri,
COUNT(*) AS event_count,
COALESCE(SUM(%s), 0) AS traffic_bytes,
MAX(e.occurred_at) AS last_seen_at
FROM events e`, responseBytesExpression)
	if len(joins) > 0 {
		stmt.WriteString(` `)
		stmt.WriteString(strings.Join(joins, ` `))
	}
	if len(clauses) > 0 {
		stmt.WriteString(` WHERE `)
		stmt.WriteString(strings.Join(clauses, ` AND `))
	}
	stmt.WriteString(` GROUP BY e.host, e.uri ORDER BY event_count DESC, traffic_bytes DESC, last_seen_at DESC, e.host ASC, e.uri ASC LIMIT ?`)

	rows, queryErr := s.db.QueryContext(ctx, stmt.String(), bindings...)
	if queryErr != nil {
		return nil, fmt.Errorf("list top url rows: %w", queryErr)
	}
	defer rows.Close()

	result := make([]model.TopURLRow, 0, limit)
	for rows.Next() {
		var (
			row     model.TopURLRow
			rawSeen string
		)
		if scanErr := rows.Scan(&row.Host, &row.URI, &row.Events, &row.TrafficBytes, &rawSeen); scanErr != nil {
			return nil, fmt.Errorf("scan top url row: %w", scanErr)
		}
		seenAt, parseErr := parseTime(rawSeen)
		if parseErr != nil {
			return nil, fmt.Errorf("parse top url row last_seen_at: %w", parseErr)
		}
		row.LastSeenAt = seenAt
		result = append(result, row)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, fmt.Errorf("iterate top url rows: %w", iterErr)
	}
	return result, nil
}
// listActivityBuckets returns per-source event counts grouped into 10-minute
// buckets, covering every bucket from the one containing since up to the one
// containing the current time. Buckets without events are included with zero
// counts so the series is contiguous.
//
// Events are restricted to those at or after since and to the visibility
// options. A zero since yields (nil, nil). A since whose bucket lies after the
// current bucket yields an empty, non-nil slice; previously a since more than
// about 20 minutes in the future produced a negative slice capacity and
// panicked.
func (s *Store) listActivityBuckets(ctx context.Context, since time.Time, options model.OverviewOptions) ([]model.ActivityBucket, error) {
	if since.IsZero() {
		return nil, nil
	}

	// Must stay in sync with the 600-second divisor in the SQL below.
	const bucketWidth = 10 * time.Minute

	joins, clauses := overviewFilterQueryParts(options)
	query := `
SELECT (CAST(strftime('%s', e.occurred_at) AS INTEGER) / 600) * 600 AS bucket_unix,
e.source_name,
COUNT(*) AS event_count
FROM events e`
	if len(joins) > 0 {
		query += ` ` + strings.Join(joins, ` `)
	}
	// The time-window clause is always present, so WHERE is unconditional.
	clauses = append([]string{`e.occurred_at >= ?`}, clauses...)
	query += ` WHERE ` + strings.Join(clauses, ` AND `)
	query += ` GROUP BY bucket_unix, e.source_name ORDER BY bucket_unix ASC, event_count DESC, e.source_name ASC`

	rows, err := s.db.QueryContext(ctx, query, formatTime(since))
	if err != nil {
		return nil, fmt.Errorf("list activity buckets: %w", err)
	}
	defer rows.Close()

	// Keyed by bucket start as Unix seconds.
	bucketMap := map[int64]*model.ActivityBucket{}
	for rows.Next() {
		var bucketUnix int64
		var sourceName string
		var events int64
		if err := rows.Scan(&bucketUnix, &sourceName, &events); err != nil {
			return nil, fmt.Errorf("scan activity bucket: %w", err)
		}
		bucket, ok := bucketMap[bucketUnix]
		if !ok {
			bucket = &model.ActivityBucket{BucketStart: time.Unix(bucketUnix, 0).UTC()}
			bucketMap[bucketUnix] = bucket
		}
		bucket.TotalEvents += events
		bucket.Sources = append(bucket.Sources, model.ActivitySourceCount{SourceName: sourceName, Events: events})
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate activity buckets: %w", err)
	}

	start := since.UTC().Truncate(bucketWidth)
	end := time.Now().UTC().Truncate(bucketWidth)
	if end.Before(start) {
		// The window starts after the current bucket: there is nothing to
		// report, and the capacity computed below would be zero or negative.
		return []model.ActivityBucket{}, nil
	}

	buckets := make([]model.ActivityBucket, 0, int(end.Sub(start)/bucketWidth)+1)
	for current := start; !current.After(end); current = current.Add(bucketWidth) {
		bucket, ok := bucketMap[current.Unix()]
		if !ok {
			buckets = append(buckets, model.ActivityBucket{BucketStart: current})
			continue
		}
		// Busiest source first, ties broken by name.
		sort.Slice(bucket.Sources, func(left int, right int) bool {
			if bucket.Sources[left].Events != bucket.Sources[right].Events {
				return bucket.Sources[left].Events > bucket.Sources[right].Events
			}
			return bucket.Sources[left].SourceName < bucket.Sources[right].SourceName
		})
		buckets = append(buckets, *bucket)
	}
	return buckets, nil
}
// listMethodBreakdown counts events per HTTP method. Methods are trimmed and
// upper-cased in SQL, and empty ones are reported as "OTHER". Rows are ordered
// by event count descending, then method name.
//
// Events are restricted to those at or after since (when non-zero) and to the
// visibility options.
func (s *Store) listMethodBreakdown(ctx context.Context, since time.Time, options model.OverviewOptions) ([]model.MethodBreakdownRow, error) {
	joins, clauses := overviewFilterQueryParts(options)

	bindings := make([]any, 0, 2)
	if !since.IsZero() {
		clauses = append([]string{`e.occurred_at >= ?`}, clauses...)
		bindings = append(bindings, formatTime(since))
	}

	var stmt strings.Builder
	stmt.WriteString(`
SELECT COALESCE(NULLIF(UPPER(TRIM(e.method)), ''), 'OTHER') AS method,
COUNT(*) AS event_count
FROM events e`)
	if len(joins) > 0 {
		stmt.WriteString(` `)
		stmt.WriteString(strings.Join(joins, ` `))
	}
	if len(clauses) > 0 {
		stmt.WriteString(` WHERE `)
		stmt.WriteString(strings.Join(clauses, ` AND `))
	}
	stmt.WriteString(` GROUP BY method ORDER BY event_count DESC, method ASC`)

	rows, queryErr := s.db.QueryContext(ctx, stmt.String(), bindings...)
	if queryErr != nil {
		return nil, fmt.Errorf("list method breakdown: %w", queryErr)
	}
	defer rows.Close()

	result := make([]model.MethodBreakdownRow, 0, 8)
	for rows.Next() {
		var row model.MethodBreakdownRow
		if scanErr := rows.Scan(&row.Method, &row.Events); scanErr != nil {
			return nil, fmt.Errorf("scan method breakdown row: %w", scanErr)
		}
		result = append(result, row)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, fmt.Errorf("iterate method breakdown rows: %w", iterErr)
	}
	return result, nil
}
// listBotBreakdown classifies events by the investigation stored for their
// client IP and returns three rows, always in the order known / possible /
// other:
//
//   - known: the payload has a `$.bot` entry with `$.bot.verified` = 1
//   - possible: the payload has a `$.bot` entry that is not verified
//   - other: no investigation, an invalid payload, or no `$.bot` entry
//
// Events are restricted to those at or after since (when non-zero) and to the
// visibility options.
func (s *Store) listBotBreakdown(ctx context.Context, since time.Time, options model.OverviewOptions) ([]model.BotBreakdownRow, error) {
	joins, clauses := overviewFilterQueryParts(options)
	joins = append(joins, `LEFT JOIN ip_investigations i ON i.ip = e.client_ip`)

	bindings := make([]any, 0, 2)
	if !since.IsZero() {
		clauses = append([]string{`e.occurred_at >= ?`}, clauses...)
		bindings = append(bindings, formatTime(since))
	}

	var stmt strings.Builder
	stmt.WriteString(`
SELECT
COALESCE(SUM(CASE WHEN json_valid(i.payload_json)
AND json_type(i.payload_json, '$.bot') IS NOT NULL
AND COALESCE(json_extract(i.payload_json, '$.bot.verified'), 0) = 1
THEN 1 ELSE 0 END), 0) AS known_bots,
COALESCE(SUM(CASE WHEN json_valid(i.payload_json)
AND json_type(i.payload_json, '$.bot') IS NOT NULL
AND COALESCE(json_extract(i.payload_json, '$.bot.verified'), 0) <> 1
THEN 1 ELSE 0 END), 0) AS possible_bots,
COALESCE(SUM(CASE WHEN NOT json_valid(i.payload_json)
OR json_type(i.payload_json, '$.bot') IS NULL
THEN 1 ELSE 0 END), 0) AS other_traffic
FROM events e`)
	if len(joins) > 0 {
		stmt.WriteString(` `)
		stmt.WriteString(strings.Join(joins, ` `))
	}
	if len(clauses) > 0 {
		stmt.WriteString(` WHERE `)
		stmt.WriteString(strings.Join(clauses, ` AND `))
	}

	// counts holds known, possible and other, in that order.
	var counts [3]int64
	row := s.db.QueryRowContext(ctx, stmt.String(), bindings...)
	if scanErr := row.Scan(&counts[0], &counts[1], &counts[2]); scanErr != nil {
		return nil, fmt.Errorf("list bot breakdown: %w", scanErr)
	}

	breakdown := []model.BotBreakdownRow{
		{Key: "known", Label: "Known bots", Events: counts[0]},
		{Key: "possible", Label: "Possible bots", Events: counts[1]},
		{Key: "other", Label: "Other traffic", Events: counts[2]},
	}
	return breakdown, nil
}
// ListEvents returns up to limit events (default 100), newest first, each
// joined with the current state and manual override of its client IP.
//
// Events are restricted to those at or after since (when non-zero) and to the
// filters in options.
func (s *Store) ListEvents(ctx context.Context, since time.Time, limit int, options model.EventListOptions) ([]model.Event, error) {
	if limit <= 0 {
		limit = 100
	}
	joins, clauses := eventFilterQueryParts(options)

	bindings := make([]any, 0, 2)
	if !since.IsZero() {
		clauses = append([]string{`e.occurred_at >= ?`}, clauses...)
		bindings = append(bindings, formatTime(since))
	}
	bindings = append(bindings, limit)

	var stmt strings.Builder
	stmt.WriteString(`
SELECT e.id, e.source_name, e.profile_name, e.occurred_at, e.remote_ip, e.client_ip, e.host,
e.method, e.uri, e.path, e.status, e.user_agent, e.decision, e.decision_reason,
e.decision_reasons_json, e.enforced, e.raw_json, e.created_at,
COALESCE(s.state, ''), COALESCE(s.manual_override, '')
FROM events e`)
	if len(joins) > 0 {
		stmt.WriteString(` `)
		stmt.WriteString(strings.Join(joins, ` `))
	}
	if len(clauses) > 0 {
		stmt.WriteString(` WHERE `)
		stmt.WriteString(strings.Join(clauses, ` AND `))
	}
	stmt.WriteString(` ORDER BY e.occurred_at DESC, e.id DESC LIMIT ?`)

	rows, queryErr := s.db.QueryContext(ctx, stmt.String(), bindings...)
	if queryErr != nil {
		return nil, fmt.Errorf("list events: %w", queryErr)
	}
	defer rows.Close()

	result := make([]model.Event, 0, limit)
	for rows.Next() {
		event, scanErr := scanEvent(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		result = append(result, event)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, fmt.Errorf("iterate events: %w", iterErr)
	}
	return result, nil
}
// ListRecentEvents returns the newest limit events (default 50) without any
// filtering, each joined with the current state and manual override of its
// client IP.
func (s *Store) ListRecentEvents(ctx context.Context, limit int) ([]model.Event, error) {
	if limit <= 0 {
		limit = 50
	}

	const recentEventsQuery = `
SELECT e.id, e.source_name, e.profile_name, e.occurred_at, e.remote_ip, e.client_ip, e.host,
e.method, e.uri, e.path, e.status, e.user_agent, e.decision, e.decision_reason,
e.decision_reasons_json, e.enforced, e.raw_json, e.created_at,
COALESCE(s.state, ''), COALESCE(s.manual_override, '')
FROM events e
LEFT JOIN ip_state s ON s.ip = e.client_ip
ORDER BY e.occurred_at DESC, e.id DESC
LIMIT ?`

	rows, queryErr := s.db.QueryContext(ctx, recentEventsQuery, limit)
	if queryErr != nil {
		return nil, fmt.Errorf("list recent events: %w", queryErr)
	}
	defer rows.Close()

	result := make([]model.Event, 0, limit)
	for rows.Next() {
		event, scanErr := scanEvent(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		result = append(result, event)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, fmt.Errorf("iterate recent events: %w", iterErr)
	}
	return result, nil
}
// ListIPStates returns up to limit ip_state rows (default 50), most recently
// seen first. When stateFilter is non-blank after trimming, only rows in that
// state are returned.
func (s *Store) ListIPStates(ctx context.Context, limit int, stateFilter string) ([]model.IPState, error) {
	if limit <= 0 {
		limit = 50
	}

	var stmt strings.Builder
	stmt.WriteString(`SELECT ip, first_seen_at, last_seen_at, last_source_name, last_user_agent, latest_status,
total_events, state, state_reason, manual_override, last_event_id, updated_at
FROM ip_state`)

	bindings := make([]any, 0, 2)
	if wanted := strings.TrimSpace(stateFilter); wanted != "" {
		stmt.WriteString(` WHERE state = ?`)
		bindings = append(bindings, wanted)
	}
	stmt.WriteString(` ORDER BY last_seen_at DESC, ip ASC LIMIT ?`)
	bindings = append(bindings, limit)

	rows, queryErr := s.db.QueryContext(ctx, stmt.String(), bindings...)
	if queryErr != nil {
		return nil, fmt.Errorf("list ip states: %w", queryErr)
	}
	defer rows.Close()

	result := make([]model.IPState, 0, limit)
	for rows.Next() {
		state, scanErr := scanIPState(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		result = append(result, state)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, fmt.Errorf("iterate ip states: %w", iterErr)
	}
	return result, nil
}
// ListRecentIPRows returns up to limit IPs (default 200) that have events at
// or after since, ordered by event count in that window, then recency.
//
// Only IPs with an ip_state row are returned (inner join). SourceName is the
// source of the IP's newest event in the window, falling back to the state
// row's last_source_name.
func (s *Store) ListRecentIPRows(ctx context.Context, since time.Time, limit int) ([]model.RecentIPRow, error) {
	if limit <= 0 {
		limit = 200
	}

	const recentIPsQuery = `
WITH recent AS (
SELECT client_ip, COUNT(*) AS event_count, MAX(occurred_at) AS last_seen_at
FROM events
WHERE occurred_at >= ?
GROUP BY client_ip
)
SELECT s.ip,
COALESCE((
SELECT e.source_name
FROM events e
WHERE e.client_ip = s.ip AND e.occurred_at >= ?
ORDER BY e.occurred_at DESC, e.id DESC
LIMIT 1
), s.last_source_name) AS source_name,
s.state,
recent.event_count,
recent.last_seen_at,
s.state_reason,
s.manual_override
FROM recent
JOIN ip_state s ON s.ip = recent.client_ip
ORDER BY recent.event_count DESC, recent.last_seen_at DESC, s.ip ASC
LIMIT ?`

	// The window start is bound twice: once in the CTE, once in the subquery.
	windowStart := formatTime(since)
	rows, queryErr := s.db.QueryContext(ctx, recentIPsQuery, windowStart, windowStart, limit)
	if queryErr != nil {
		return nil, fmt.Errorf("list recent ip rows: %w", queryErr)
	}
	defer rows.Close()

	result := make([]model.RecentIPRow, 0, limit)
	for rows.Next() {
		var (
			row         model.RecentIPRow
			rawState    string
			rawSeen     string
			rawOverride string
		)
		scanErr := rows.Scan(
			&row.IP,
			&row.SourceName,
			&rawState,
			&row.Events,
			&rawSeen,
			&row.Reason,
			&rawOverride,
		)
		if scanErr != nil {
			return nil, fmt.Errorf("scan recent ip row: %w", scanErr)
		}
		seenAt, parseErr := parseTime(rawSeen)
		if parseErr != nil {
			return nil, fmt.Errorf("parse recent ip row last_seen_at: %w", parseErr)
		}
		row.State = model.IPStateStatus(rawState)
		row.LastSeenAt = seenAt
		row.ManualOverride = model.ManualOverride(rawOverride)
		result = append(result, row)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, fmt.Errorf("iterate recent ip rows: %w", iterErr)
	}
	return result, nil
}
// ListIPsWithoutInvestigation returns up to limit IPs (default 200) that have
// an ip_state row but no ip_investigations row, most recently seen first.
// When since is non-zero, only IPs last seen at or after it are considered.
func (s *Store) ListIPsWithoutInvestigation(ctx context.Context, since time.Time, limit int) ([]string, error) {
	if limit <= 0 {
		limit = 200
	}

	var stmt strings.Builder
	stmt.WriteString(`
SELECT s.ip
FROM ip_state s
LEFT JOIN ip_investigations i ON i.ip = s.ip
WHERE i.ip IS NULL`)

	bindings := make([]any, 0, 2)
	if !since.IsZero() {
		stmt.WriteString(` AND s.last_seen_at >= ?`)
		bindings = append(bindings, formatTime(since))
	}
	stmt.WriteString(` ORDER BY s.last_seen_at DESC, s.ip ASC LIMIT ?`)
	bindings = append(bindings, limit)

	rows, queryErr := s.db.QueryContext(ctx, stmt.String(), bindings...)
	if queryErr != nil {
		return nil, fmt.Errorf("list ips without investigation: %w", queryErr)
	}
	defer rows.Close()

	pending := make([]string, 0, limit)
	for rows.Next() {
		var address string
		if scanErr := rows.Scan(&address); scanErr != nil {
			return nil, fmt.Errorf("scan ip without investigation: %w", scanErr)
		}
		pending = append(pending, address)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, fmt.Errorf("iterate ips without investigation: %w", iterErr)
	}
	return pending, nil
}
// GetIPDetails collects the stored state of ip together with its recent
// events, decisions and backend actions, each capped by the matching limit.
//
// The "found" flag of the state lookup is discarded, so an IP without a state
// row gets whatever state value GetIPState returns in that case. The first
// failing lookup aborts the call with an empty IPDetails.
func (s *Store) GetIPDetails(ctx context.Context, ip string, eventLimit, decisionLimit, actionLimit int) (model.IPDetails, error) {
	var (
		details model.IPDetails
		err     error
	)
	if details.State, _, err = s.GetIPState(ctx, ip); err != nil {
		return model.IPDetails{}, err
	}
	if details.RecentEvents, err = s.listEventsForIP(ctx, ip, eventLimit); err != nil {
		return model.IPDetails{}, err
	}
	if details.Decisions, err = s.listDecisionsForIP(ctx, ip, decisionLimit); err != nil {
		return model.IPDetails{}, err
	}
	if details.BackendActions, err = s.listBackendActionsForIP(ctx, ip, actionLimit); err != nil {
		return model.IPDetails{}, err
	}
	return details, nil
}
// GetInvestigation loads the stored investigation for ip.
//
// The boolean is false with a nil error when no row exists. The JSON payload
// is decoded into the result and UpdatedAt is then overwritten with the
// parsed updated_at column.
func (s *Store) GetInvestigation(ctx context.Context, ip string) (model.IPInvestigation, bool, error) {
	var (
		rawPayload string
		rawUpdated string
	)
	scanErr := s.db.
		QueryRowContext(ctx, `SELECT payload_json, updated_at FROM ip_investigations WHERE ip = ?`, ip).
		Scan(&rawPayload, &rawUpdated)
	switch {
	case errors.Is(scanErr, sql.ErrNoRows):
		return model.IPInvestigation{}, false, nil
	case scanErr != nil:
		return model.IPInvestigation{}, false, fmt.Errorf("query ip investigation %q: %w", ip, scanErr)
	}

	var investigation model.IPInvestigation
	if decodeErr := json.Unmarshal([]byte(rawPayload), &investigation); decodeErr != nil {
		return model.IPInvestigation{}, false, fmt.Errorf("decode ip investigation %q: %w", ip, decodeErr)
	}

	updatedAt, parseErr := parseTime(rawUpdated)
	if parseErr != nil {
		return model.IPInvestigation{}, false, fmt.Errorf("parse ip investigation updated_at: %w", parseErr)
	}
	investigation.UpdatedAt = updatedAt
	return investigation, true, nil
}
// GetInvestigationsForIPs loads the stored investigations for ips in a single
// IN (...) query and returns them keyed by IP. IPs without a stored
// investigation are simply absent from the map.
//
// The input is first reduced by uniqueNonEmptyStrings; when nothing remains,
// an empty map is returned without querying.
func (s *Store) GetInvestigationsForIPs(ctx context.Context, ips []string) (map[string]model.IPInvestigation, error) {
	wanted := uniqueNonEmptyStrings(ips)
	if len(wanted) == 0 {
		return map[string]model.IPInvestigation{}, nil
	}

	// One "?" per IP, comma-separated: "?, ?, ?".
	placeholders := strings.TrimSuffix(strings.Repeat("?, ", len(wanted)), ", ")
	bindings := make([]any, len(wanted))
	for i := range wanted {
		bindings[i] = wanted[i]
	}

	query := fmt.Sprintf(
		`SELECT ip, payload_json, updated_at FROM ip_investigations WHERE ip IN (%s)`,
		placeholders,
	)
	rows, queryErr := s.db.QueryContext(ctx, query, bindings...)
	if queryErr != nil {
		return nil, fmt.Errorf("query ip investigations: %w", queryErr)
	}
	defer rows.Close()

	byIP := make(map[string]model.IPInvestigation, len(wanted))
	for rows.Next() {
		var (
			address    string
			rawPayload string
			rawUpdated string
		)
		if scanErr := rows.Scan(&address, &rawPayload, &rawUpdated); scanErr != nil {
			return nil, fmt.Errorf("scan ip investigation: %w", scanErr)
		}

		var investigation model.IPInvestigation
		if decodeErr := json.Unmarshal([]byte(rawPayload), &investigation); decodeErr != nil {
			return nil, fmt.Errorf("decode ip investigation %q: %w", address, decodeErr)
		}

		updatedAt, parseErr := parseTime(rawUpdated)
		if parseErr != nil {
			return nil, fmt.Errorf("parse ip investigation updated_at for %q: %w", address, parseErr)
		}
		investigation.UpdatedAt = updatedAt
		byIP[address] = investigation
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, fmt.Errorf("iterate ip investigations: %w", iterErr)
	}
	return byIP, nil
}
// SaveInvestigation inserts the investigation for item.IP, or overwrites the
// stored payload and timestamp when a row for that IP already exists. The
// whole item is serialized as JSON into payload_json. A zero UpdatedAt is
// replaced by the current UTC time before encoding.
func (s *Store) SaveInvestigation(ctx context.Context, item model.IPInvestigation) error {
	if item.UpdatedAt.IsZero() {
		item.UpdatedAt = time.Now().UTC()
	}

	encoded, err := json.Marshal(item)
	if err != nil {
		return fmt.Errorf("encode ip investigation: %w", err)
	}

	const upsert = `INSERT INTO ip_investigations (ip, payload_json, updated_at)
VALUES (?, ?, ?)
ON CONFLICT(ip) DO UPDATE SET
payload_json = excluded.payload_json,
updated_at = excluded.updated_at`
	if _, err := s.db.ExecContext(ctx, upsert, item.IP, string(encoded), formatTime(item.UpdatedAt)); err != nil {
		return fmt.Errorf("upsert ip investigation %q: %w", item.IP, err)
	}
	return nil
}
// ListRecentUserAgentsForIP returns the distinct, non-blank user agents seen
// in events for ip, ordered by the most recent occurrence of each agent
// (ties broken alphabetically). A non-positive limit falls back to 10.
func (s *Store) ListRecentUserAgentsForIP(ctx context.Context, ip string, limit int) ([]string, error) {
	if limit <= 0 {
		limit = 10
	}

	const query = `
SELECT user_agent
FROM events
WHERE client_ip = ? AND TRIM(user_agent) <> ''
GROUP BY user_agent
ORDER BY MAX(occurred_at) DESC, user_agent ASC
LIMIT ?`
	rows, err := s.db.QueryContext(ctx, query, ip, limit)
	if err != nil {
		return nil, fmt.Errorf("list recent user agents for ip %q: %w", ip, err)
	}
	defer rows.Close()

	agents := make([]string, 0, limit)
	for rows.Next() {
		var agent string
		if err := rows.Scan(&agent); err != nil {
			return nil, fmt.Errorf("scan recent user agent for ip %q: %w", ip, err)
		}
		agents = append(agents, agent)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate recent user agents for ip %q: %w", ip, err)
	}
	return agents, nil
}
// GetSourceOffset looks up the stored read position for sourceName. The
// boolean result reports whether a row exists; a missing row is not an error.
func (s *Store) GetSourceOffset(ctx context.Context, sourceName string) (model.SourceOffset, bool, error) {
	var (
		result       model.SourceOffset
		rawUpdatedAt string
	)
	err := s.db.QueryRowContext(
		ctx,
		`SELECT source_name, path, inode, offset, updated_at FROM source_offsets WHERE source_name = ?`,
		sourceName,
	).Scan(&result.SourceName, &result.Path, &result.Inode, &result.Offset, &rawUpdatedAt)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return model.SourceOffset{}, false, nil
	case err != nil:
		return model.SourceOffset{}, false, fmt.Errorf("query source offset %q: %w", sourceName, err)
	}

	stamp, err := parseTime(rawUpdatedAt)
	if err != nil {
		return model.SourceOffset{}, false, fmt.Errorf("parse source offset updated_at: %w", err)
	}
	result.UpdatedAt = stamp
	return result, true, nil
}
// SaveSourceOffset inserts the read position for offset.SourceName, or
// overwrites path, inode, offset and updated_at when that source already has
// a row. A zero UpdatedAt is replaced by the current UTC time.
func (s *Store) SaveSourceOffset(ctx context.Context, offset model.SourceOffset) error {
	if offset.UpdatedAt.IsZero() {
		offset.UpdatedAt = time.Now().UTC()
	}

	const upsert = `INSERT INTO source_offsets (source_name, path, inode, offset, updated_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(source_name) DO UPDATE SET
path = excluded.path,
inode = excluded.inode,
offset = excluded.offset,
updated_at = excluded.updated_at`
	params := []any{
		offset.SourceName,
		offset.Path,
		offset.Inode,
		offset.Offset,
		formatTime(offset.UpdatedAt),
	}
	if _, err := s.db.ExecContext(ctx, upsert, params...); err != nil {
		return fmt.Errorf("upsert source offset: %w", err)
	}
	return nil
}
// listEventsForIP returns the events whose client_ip equals ip, newest first
// (occurred_at, then id, descending). Each event is joined with the IP's
// current state and manual override, which default to empty strings when no
// ip_state row exists. A limit of zero or less means "no limit".
func (s *Store) listEventsForIP(ctx context.Context, ip string, limit int) ([]model.Event, error) {
	sqlText := `
SELECT e.id, e.source_name, e.profile_name, e.occurred_at, e.remote_ip, e.client_ip, e.host,
e.method, e.uri, e.path, e.status, e.user_agent, e.decision, e.decision_reason,
e.decision_reasons_json, e.enforced, e.raw_json, e.created_at,
COALESCE(s.state, ''), COALESCE(s.manual_override, '')
FROM events e
LEFT JOIN ip_state s ON s.ip = e.client_ip
WHERE e.client_ip = ?
ORDER BY e.occurred_at DESC, e.id DESC`
	params := []any{ip}

	// Only a positive limit bounds the query and pre-sizes the result.
	capacity := 0
	if limit > 0 {
		sqlText += ` LIMIT ?`
		params = append(params, limit)
		capacity = limit
	}

	rows, err := s.db.QueryContext(ctx, sqlText, params...)
	if err != nil {
		return nil, fmt.Errorf("list events for ip %q: %w", ip, err)
	}
	defer rows.Close()

	events := make([]model.Event, 0, capacity)
	for rows.Next() {
		event, err := scanEvent(rows)
		if err != nil {
			return nil, err
		}
		events = append(events, event)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate events for ip %q: %w", ip, err)
	}
	return events, nil
}
// listDecisionsForIP returns the decision records stored for ip, newest first
// (created_at, then id, descending). A non-positive limit falls back to 50.
func (s *Store) listDecisionsForIP(ctx context.Context, ip string, limit int) ([]model.DecisionRecord, error) {
	if limit <= 0 {
		limit = 50
	}

	const query = `
SELECT id, event_id, ip, source_name, kind, action, reason, actor, enforced, created_at
FROM decisions
WHERE ip = ?
ORDER BY created_at DESC, id DESC
LIMIT ?`
	rows, err := s.db.QueryContext(ctx, query, ip, limit)
	if err != nil {
		return nil, fmt.Errorf("list decisions for ip %q: %w", ip, err)
	}
	defer rows.Close()

	records := make([]model.DecisionRecord, 0, limit)
	for rows.Next() {
		var (
			record       model.DecisionRecord
			rawAction    string
			rawCreatedAt string
			enforcedFlag int
		)
		if err := rows.Scan(
			&record.ID,
			&record.EventID,
			&record.IP,
			&record.SourceName,
			&record.Kind,
			&rawAction,
			&record.Reason,
			&record.Actor,
			&enforcedFlag,
			&rawCreatedAt,
		); err != nil {
			return nil, fmt.Errorf("scan decision record: %w", err)
		}
		createdAt, err := parseTime(rawCreatedAt)
		if err != nil {
			return nil, fmt.Errorf("parse decision created_at: %w", err)
		}
		record.CreatedAt = createdAt
		record.Action = model.DecisionAction(rawAction)
		// enforced is stored as an integer flag; any non-zero value is true.
		record.Enforced = enforcedFlag != 0
		records = append(records, record)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate decisions for ip %q: %w", ip, err)
	}
	return records, nil
}
// listBackendActionsForIP returns the backend action log entries stored for
// ip, newest first (created_at, then id, descending). A non-positive limit
// falls back to 50.
func (s *Store) listBackendActionsForIP(ctx context.Context, ip string, limit int) ([]model.OPNsenseAction, error) {
	if limit <= 0 {
		limit = 50
	}

	const query = `
SELECT id, ip, action, result, message, created_at
FROM backend_actions
WHERE ip = ?
ORDER BY created_at DESC, id DESC
LIMIT ?`
	rows, err := s.db.QueryContext(ctx, query, ip, limit)
	if err != nil {
		return nil, fmt.Errorf("list backend actions for ip %q: %w", ip, err)
	}
	defer rows.Close()

	actions := make([]model.OPNsenseAction, 0, limit)
	for rows.Next() {
		var (
			entry        model.OPNsenseAction
			rawCreatedAt string
		)
		if err := rows.Scan(&entry.ID, &entry.IP, &entry.Action, &entry.Result, &entry.Message, &rawCreatedAt); err != nil {
			return nil, fmt.Errorf("scan backend action: %w", err)
		}
		createdAt, err := parseTime(rawCreatedAt)
		if err != nil {
			return nil, fmt.Errorf("parse backend action created_at: %w", err)
		}
		entry.CreatedAt = createdAt
		actions = append(actions, entry)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate backend actions for ip %q: %w", ip, err)
	}
	return actions, nil
}
// max returns the larger of two ints.
// NOTE(review): Go 1.21+ ships a builtin max; this package-level helper
// shadows it. Confirm the module's Go version before removing it.
func max(left int, right int) int {
	if right >= left {
		return right
	}
	return left
}
// getIPStateDB reads the ip_state row for ip through any queryer. The boolean
// result reports whether a row exists; a missing row is not an error.
// Timestamp columns are decoded with parseTime.
func getIPStateDB(ctx context.Context, db queryer, ip string) (model.IPState, bool, error) {
	var (
		result                                model.IPState
		rawFirstSeen, rawLastSeen, rawUpdated string
		rawState, rawOverride                 string
	)
	err := db.QueryRowContext(ctx, `
SELECT ip, first_seen_at, last_seen_at, last_source_name, last_user_agent, latest_status,
total_events, state, state_reason, manual_override, last_event_id, updated_at
FROM ip_state WHERE ip = ?`, ip).Scan(
		&result.IP,
		&rawFirstSeen,
		&rawLastSeen,
		&result.LastSourceName,
		&result.LastUserAgent,
		&result.LatestStatus,
		&result.TotalEvents,
		&rawState,
		&result.StateReason,
		&rawOverride,
		&result.LastEventID,
		&rawUpdated,
	)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return model.IPState{}, false, nil
	case err != nil:
		return model.IPState{}, false, fmt.Errorf("query ip state %q: %w", ip, err)
	}

	firstSeen, err := parseTime(rawFirstSeen)
	if err != nil {
		return model.IPState{}, false, fmt.Errorf("parse ip state first_seen_at: %w", err)
	}
	lastSeen, err := parseTime(rawLastSeen)
	if err != nil {
		return model.IPState{}, false, fmt.Errorf("parse ip state last_seen_at: %w", err)
	}
	updated, err := parseTime(rawUpdated)
	if err != nil {
		return model.IPState{}, false, fmt.Errorf("parse ip state updated_at: %w", err)
	}

	result.FirstSeenAt = firstSeen
	result.LastSeenAt = lastSeen
	result.UpdatedAt = updated
	result.State = model.IPStateStatus(rawState)
	result.ManualOverride = model.ManualOverride(rawOverride)
	return result, true, nil
}
// getIPStateTx reads the ip_state row for ip inside the given transaction.
// It runs with context.Background(), so the lookup is not tied to any caller
// context.
func getIPStateTx(tx *sql.Tx, ip string) (model.IPState, bool, error) {
	ctx := context.Background()
	return getIPStateDB(ctx, tx, ip)
}
// upsertIPStateTx writes state into ip_state inside tx, inserting a new row
// or overwriting every column of the existing row for state.IP.
//
// Missing values are defaulted before writing: a zero UpdatedAt becomes the
// current UTC time, zero FirstSeenAt/LastSeenAt take UpdatedAt, an empty
// State becomes model.IPStateObserved and an empty ManualOverride becomes
// model.ManualOverrideNone.
func upsertIPStateTx(ctx context.Context, tx *sql.Tx, state model.IPState) error {
	if state.UpdatedAt.IsZero() {
		state.UpdatedAt = time.Now().UTC()
	}
	// The seen timestamps fall back to UpdatedAt, so it must be settled first.
	if state.FirstSeenAt.IsZero() {
		state.FirstSeenAt = state.UpdatedAt
	}
	if state.LastSeenAt.IsZero() {
		state.LastSeenAt = state.UpdatedAt
	}
	if state.State == "" {
		state.State = model.IPStateObserved
	}
	if state.ManualOverride == "" {
		state.ManualOverride = model.ManualOverrideNone
	}

	const upsert = `INSERT INTO ip_state (
ip, first_seen_at, last_seen_at, last_source_name, last_user_agent, latest_status,
total_events, state, state_reason, manual_override, last_event_id, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(ip) DO UPDATE SET
first_seen_at = excluded.first_seen_at,
last_seen_at = excluded.last_seen_at,
last_source_name = excluded.last_source_name,
last_user_agent = excluded.last_user_agent,
latest_status = excluded.latest_status,
total_events = excluded.total_events,
state = excluded.state,
state_reason = excluded.state_reason,
manual_override = excluded.manual_override,
last_event_id = excluded.last_event_id,
updated_at = excluded.updated_at`
	params := []any{
		state.IP,
		formatTime(state.FirstSeenAt),
		formatTime(state.LastSeenAt),
		state.LastSourceName,
		state.LastUserAgent,
		state.LatestStatus,
		state.TotalEvents,
		string(state.State),
		state.StateReason,
		string(state.ManualOverride),
		state.LastEventID,
		formatTime(state.UpdatedAt),
	}
	if _, err := tx.ExecContext(ctx, upsert, params...); err != nil {
		return fmt.Errorf("upsert ip state %q: %w", state.IP, err)
	}
	return nil
}
// mergeEventIntoState folds one event into the per-IP aggregate and returns
// the updated aggregate. When found is false, existing is ignored and a fresh
// state is started for event.ClientIP.
//
// Bookkeeping fields (last source, user agent, status, event id, counter,
// UpdatedAt) always follow the event being merged. FirstSeenAt only moves
// earlier and LastSeenAt only moves later.
//
// State resolution:
//   - A force-block or force-allow manual override pins the state. The
//     event's decision reason replaces the stored reason when non-empty;
//     otherwise a generic "manual override" reason is filled in only if no
//     reason is stored yet.
//   - Without an override, block and allow decisions set the state directly,
//     a review decision only applies when the IP is not already blocked or
//     allowed, and any other decision leaves the state alone (defaulting an
//     empty state to observed).
func mergeEventIntoState(existing model.IPState, found bool, event model.Event) model.IPState {
	state := existing
	if !found {
		// Seed identity and defaults only; every remaining field is
		// populated from the event by the shared code below.
		state = model.IPState{
			IP:             event.ClientIP,
			State:          model.IPStateObserved,
			ManualOverride: model.ManualOverrideNone,
		}
	}

	// Widen the seen window to include this event.
	if state.FirstSeenAt.IsZero() || event.OccurredAt.Before(state.FirstSeenAt) {
		state.FirstSeenAt = event.OccurredAt
	}
	if state.LastSeenAt.IsZero() || event.OccurredAt.After(state.LastSeenAt) {
		state.LastSeenAt = event.OccurredAt
	}

	state.LastSourceName = event.SourceName
	state.LastUserAgent = event.UserAgent
	state.LatestStatus = event.Status
	state.LastEventID = event.ID
	state.TotalEvents++
	state.UpdatedAt = time.Now().UTC()
	if state.ManualOverride == "" {
		state.ManualOverride = model.ManualOverrideNone
	}

	// pin applies a manual override: the status is forced, and the reason
	// follows the event when it has one, else keeps any stored reason, else
	// falls back to the generic text.
	pin := func(status model.IPStateStatus, fallback string) model.IPState {
		state.State = status
		switch {
		case event.DecisionReason != "":
			state.StateReason = event.DecisionReason
		case state.StateReason == "":
			state.StateReason = fallback
		}
		return state
	}
	switch state.ManualOverride {
	case model.ManualOverrideForceBlock:
		return pin(model.IPStateBlocked, "manual override: block")
	case model.ManualOverrideForceAllow:
		return pin(model.IPStateAllowed, "manual override: allow")
	}

	switch event.Decision {
	case model.DecisionActionBlock:
		state.State, state.StateReason = model.IPStateBlocked, event.DecisionReason
	case model.DecisionActionAllow:
		state.State, state.StateReason = model.IPStateAllowed, event.DecisionReason
	case model.DecisionActionReview:
		// A review never downgrades an IP that is already blocked or allowed.
		settled := state.State == model.IPStateBlocked || state.State == model.IPStateAllowed
		if !settled {
			state.State, state.StateReason = model.IPStateReview, event.DecisionReason
		}
	default:
		if state.State == "" {
			state.State = model.IPStateObserved
		}
	}
	return state
}
// scanEvent decodes one event row from scanner. The row must carry the
// eighteen event columns followed by the IP's current state and manual
// override, in the order the Scan call below lists them.
//
// Timestamps are decoded with parseTime, decision_reasons_json is decoded as
// a JSON string array unless it is blank, and the integer enforced flag is
// mapped to a bool.
func scanEvent(scanner interface{ Scan(dest ...any) error }) (model.Event, error) {
	var (
		event                        model.Event
		rawOccurredAt, rawCreatedAt  string
		rawDecision, rawReasonsJSON  string
		rawCurrentState, rawOverride string
		enforcedFlag                 int
	)
	err := scanner.Scan(
		&event.ID,
		&event.SourceName,
		&event.ProfileName,
		&rawOccurredAt,
		&event.RemoteIP,
		&event.ClientIP,
		&event.Host,
		&event.Method,
		&event.URI,
		&event.Path,
		&event.Status,
		&event.UserAgent,
		&rawDecision,
		&event.DecisionReason,
		&rawReasonsJSON,
		&enforcedFlag,
		&event.RawJSON,
		&rawCreatedAt,
		&rawCurrentState,
		&rawOverride,
	)
	if err != nil {
		return model.Event{}, fmt.Errorf("scan event: %w", err)
	}

	occurredAt, err := parseTime(rawOccurredAt)
	if err != nil {
		return model.Event{}, fmt.Errorf("parse event occurred_at: %w", err)
	}
	createdAt, err := parseTime(rawCreatedAt)
	if err != nil {
		return model.Event{}, fmt.Errorf("parse event created_at: %w", err)
	}

	// A blank reasons column leaves the slice nil instead of failing to decode.
	var reasons []string
	if strings.TrimSpace(rawReasonsJSON) != "" {
		if err := json.Unmarshal([]byte(rawReasonsJSON), &reasons); err != nil {
			return model.Event{}, fmt.Errorf("decode event decision_reasons_json: %w", err)
		}
	}

	event.OccurredAt = occurredAt
	event.CreatedAt = createdAt
	event.Decision = model.DecisionAction(rawDecision)
	event.DecisionReasons = reasons
	event.Enforced = enforcedFlag != 0
	event.CurrentState = model.IPStateStatus(rawCurrentState)
	event.ManualOverride = model.ManualOverride(rawOverride)
	return event, nil
}
// scanIPState decodes one ip_state row from scanner. The row must carry the
// twelve ip_state columns in the order the Scan call below lists them.
// Timestamp columns are decoded with parseTime.
func scanIPState(scanner interface{ Scan(dest ...any) error }) (model.IPState, error) {
	var (
		result                                model.IPState
		rawFirstSeen, rawLastSeen, rawUpdated string
		rawState, rawOverride                 string
	)
	if err := scanner.Scan(
		&result.IP,
		&rawFirstSeen,
		&rawLastSeen,
		&result.LastSourceName,
		&result.LastUserAgent,
		&result.LatestStatus,
		&result.TotalEvents,
		&rawState,
		&result.StateReason,
		&rawOverride,
		&result.LastEventID,
		&rawUpdated,
	); err != nil {
		return model.IPState{}, fmt.Errorf("scan ip state: %w", err)
	}

	firstSeen, err := parseTime(rawFirstSeen)
	if err != nil {
		return model.IPState{}, fmt.Errorf("parse ip state first_seen_at: %w", err)
	}
	lastSeen, err := parseTime(rawLastSeen)
	if err != nil {
		return model.IPState{}, fmt.Errorf("parse ip state last_seen_at: %w", err)
	}
	updated, err := parseTime(rawUpdated)
	if err != nil {
		return model.IPState{}, fmt.Errorf("parse ip state updated_at: %w", err)
	}

	result.FirstSeenAt = firstSeen
	result.LastSeenAt = lastSeen
	result.UpdatedAt = updated
	result.State = model.IPStateStatus(rawState)
	result.ManualOverride = model.ManualOverride(rawOverride)
	return result, nil
}
// parseTime decodes a stored timestamp. Surrounding whitespace is ignored, a
// blank value yields the zero time without error, and anything else must be
// RFC 3339 (fractional seconds optional). The result is normalized to UTC.
func parseTime(value string) (time.Time, error) {
	if text := strings.TrimSpace(value); text != "" {
		stamp, err := time.Parse(time.RFC3339Nano, text)
		if err != nil {
			return time.Time{}, err
		}
		return stamp.UTC(), nil
	}
	return time.Time{}, nil
}
// formatTime encodes a timestamp for storage. The zero time becomes the empty
// string; any other value is converted to UTC and written as RFC 3339 with a
// fixed nine-digit fractional second, e.g. "2024-01-02T03:04:05.500000000Z".
//
// The fixed width is deliberate. time.RFC3339Nano trims trailing zeros from
// the fraction, so its output does not sort chronologically as text
// ("…05.5Z" sorts after "…05.55Z"), while the queries in this file order TEXT
// timestamp columns directly. parseTime reads both forms, because its
// RFC3339Nano layout accepts a fraction of any length.
// NOTE(review): rows written earlier in the trimmed form stay readable but
// can still mis-sort against each other and against new rows — confirm
// whether a one-off rewrite of stored timestamps is wanted.
func formatTime(value time.Time) string {
	if value.IsZero() {
		return ""
	}
	const sortableRFC3339 = "2006-01-02T15:04:05.000000000Z07:00"
	return value.UTC().Format(sortableRFC3339)
}
// boolToInt maps true to 1 and false to 0.
func boolToInt(value bool) int {
	flag := 0
	if value {
		flag = 1
	}
	return flag
}
// uniqueNonEmptyStrings trims every entry, drops the ones that end up empty,
// removes duplicates and returns what is left in ascending order. The result
// is never nil, even for nil or all-blank input.
func uniqueNonEmptyStrings(items []string) []string {
	distinct := make(map[string]struct{}, len(items))
	for _, raw := range items {
		if value := strings.TrimSpace(raw); value != "" {
			distinct[value] = struct{}{}
		}
	}

	// Map iteration order is random; the sort makes the output deterministic.
	out := make([]string, 0, len(items))
	for value := range distinct {
		out = append(out, value)
	}
	sort.Strings(out)
	return out
}
// queryer is the single-row query capability that getIPStateDB needs from its
// database handle. getIPStateTx passes a *sql.Tx through it.
type queryer interface {
QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}