package service import ( "bufio" "context" "errors" "fmt" "io" "log" "net" "os" "strings" "sync" "syscall" "time" "git.dern.ovh/infrastructure/caddy-opnsense-blocker/internal/caddylog" "git.dern.ovh/infrastructure/caddy-opnsense-blocker/internal/config" "git.dern.ovh/infrastructure/caddy-opnsense-blocker/internal/engine" "git.dern.ovh/infrastructure/caddy-opnsense-blocker/internal/model" "git.dern.ovh/infrastructure/caddy-opnsense-blocker/internal/opnsense" "git.dern.ovh/infrastructure/caddy-opnsense-blocker/internal/store" ) type Service struct { cfg *config.Config store *store.Store evaluator *engine.Evaluator blocker opnsense.AliasClient investigator Investigator logger *log.Logger } type Investigator interface { Investigate(ctx context.Context, ip string, userAgents []string) (model.IPInvestigation, error) } func New(cfg *config.Config, db *store.Store, blocker opnsense.AliasClient, investigator Investigator, logger *log.Logger) *Service { if logger == nil { logger = log.New(io.Discard, "", 0) } return &Service{ cfg: cfg, store: db, evaluator: engine.NewEvaluator(), blocker: blocker, investigator: investigator, logger: logger, } } func (s *Service) Run(ctx context.Context) error { var wg sync.WaitGroup for _, source := range s.cfg.Sources { source := source wg.Add(1) go func() { defer wg.Done() s.runSource(ctx, source) }() } <-ctx.Done() wg.Wait() return nil } func (s *Service) GetOverview(ctx context.Context, limit int) (model.Overview, error) { return s.store.GetOverview(ctx, limit) } func (s *Service) ListEvents(ctx context.Context, limit int) ([]model.Event, error) { return s.store.ListRecentEvents(ctx, limit) } func (s *Service) ListIPs(ctx context.Context, limit int, state string) ([]model.IPState, error) { return s.store.ListIPStates(ctx, limit, state) } func (s *Service) ListRecentIPs(ctx context.Context, since time.Time, limit int) ([]model.RecentIPRow, error) { items, err := s.store.ListRecentIPRows(ctx, since, limit) if err != nil { return nil, err } for index := range items { state := model.IPState{ IP: items[index].IP, State: items[index].State, ManualOverride: items[index].ManualOverride, } backend := s.resolveOPNsenseStatus(ctx, state) items[index].Actions = actionAvailability(state, backend) } return items, nil } func (s *Service) GetIPDetails(ctx context.Context, ip string) (model.IPDetails, error) { normalized, err := normalizeIP(ip) if err != nil { return model.IPDetails{}, err } details, err := s.store.GetIPDetails(ctx, normalized, 0, 100, 100) if err != nil { return model.IPDetails{}, err } return s.decorateDetails(ctx, details) } func (s *Service) InvestigateIP(ctx context.Context, ip string) (model.IPDetails, error) { normalized, err := normalizeIP(ip) if err != nil { return model.IPDetails{}, err } details, err := s.store.GetIPDetails(ctx, normalized, 0, 100, 100) if err != nil { return model.IPDetails{}, err } if s.investigator != nil { investigation, found, err := s.store.GetInvestigation(ctx, normalized) if err != nil { return model.IPDetails{}, err } shouldRefresh := !found || time.Since(investigation.UpdatedAt) >= s.cfg.Investigation.RefreshAfter.Duration if shouldRefresh { fresh, err := s.investigator.Investigate(ctx, normalized, collectUserAgents(details.RecentEvents)) if err != nil { return model.IPDetails{}, err } if err := s.store.SaveInvestigation(ctx, fresh); err != nil { return model.IPDetails{}, err } details.Investigation = &fresh } } return s.decorateDetails(ctx, details) } func (s *Service) ForceBlock(ctx context.Context, ip string, actor string, reason string) error { return s.applyManualOverride(ctx, ip, model.ManualOverrideForceBlock, model.IPStateBlocked, actor, defaultReason(reason, "manual block"), "block") } func (s *Service) ForceAllow(ctx context.Context, ip string, actor string, reason string) error { return s.applyManualOverride(ctx, ip, model.ManualOverrideForceAllow, model.IPStateAllowed, actor, defaultReason(reason, "manual allow"), "unblock") } func (s *Service) ClearOverride(ctx context.Context, ip string, actor string, reason string) error { normalized, err := normalizeIP(ip) if err != nil { return err } reason = defaultReason(reason, "manual override cleared") state, err := s.store.ClearManualOverride(ctx, normalized, reason) if err != nil { return err } return s.store.AddDecision(ctx, &model.DecisionRecord{ EventID: state.LastEventID, IP: normalized, SourceName: state.LastSourceName, Kind: "manual", Action: model.DecisionActionNone, Reason: reason, Actor: defaultActor(actor), Enforced: false, CreatedAt: time.Now().UTC(), }) } func (s *Service) runSource(ctx context.Context, source config.SourceConfig) { s.pollSource(ctx, source) ticker := time.NewTicker(source.PollInterval.Duration) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: s.pollSource(ctx, source) } } } func (s *Service) pollSource(ctx context.Context, source config.SourceConfig) { lines, err := s.readNewLines(ctx, source) if err != nil { s.logger.Printf("source %s: %v", source.Name, err) return } if len(lines) == 0 { return } profile := s.cfg.Profiles[source.Profile] for _, line := range lines { record, err := caddylog.ParseLine(line) if err != nil { if errors.Is(err, caddylog.ErrEmptyLine) { continue } s.logger.Printf("source %s: parse line: %v", source.Name, err) continue } if record.Status < profile.MinStatus || record.Status > profile.MaxStatus { continue } if err := s.processRecord(ctx, source, profile, record); err != nil { s.logger.Printf("source %s: process record: %v", source.Name, err) } } } func (s *Service) processRecord(ctx context.Context, source config.SourceConfig, profile config.ProfileConfig, record model.AccessLogRecord) error { state, found, err := s.store.GetIPState(ctx, record.ClientIP) if err != nil { return err } override := model.ManualOverrideNone if found { override = state.ManualOverride } decision := s.evaluator.Evaluate(record, profile, override) event := model.Event{ SourceName: source.Name, ProfileName: source.Profile, OccurredAt: record.OccurredAt, RemoteIP: record.RemoteIP, ClientIP: record.ClientIP, Host: record.Host, Method: record.Method, URI: record.URI, Path: record.Path, Status: record.Status, UserAgent: record.UserAgent, Decision: decision.Action, DecisionReason: decision.PrimaryReason(), DecisionReasons: append([]string(nil), decision.Reasons...), Enforced: false, RawJSON: record.RawJSON, CreatedAt: time.Now().UTC(), } var backendAction *model.OPNsenseAction if decision.Action == model.DecisionActionBlock && s.blocker != nil { result, blockErr := s.blocker.AddIPIfMissing(ctx, record.ClientIP) backendAction = &model.OPNsenseAction{ IP: record.ClientIP, Action: "block", CreatedAt: time.Now().UTC(), } if blockErr != nil { backendAction.Result = "error" backendAction.Message = blockErr.Error() } else { backendAction.Result = result backendAction.Message = decision.PrimaryReason() event.Enforced = true } } if err := s.store.RecordEvent(ctx, &event); err != nil { return err } if decision.Action != model.DecisionActionNone { if err := s.store.AddDecision(ctx, &model.DecisionRecord{ EventID: event.ID, IP: record.ClientIP, SourceName: source.Name, Kind: "automatic", Action: decision.Action, Reason: strings.Join(decision.Reasons, ", "), Actor: "engine", Enforced: event.Enforced, CreatedAt: time.Now().UTC(), }); err != nil { return err } } if backendAction != nil { if err := s.store.AddBackendAction(ctx, backendAction); err != nil { return err } } return nil } func (s *Service) readNewLines(ctx context.Context, source config.SourceConfig) ([]string, error) { info, err := os.Stat(source.Path) if err != nil { if errors.Is(err, os.ErrNotExist) { return nil, nil } return nil, fmt.Errorf("stat source path %q: %w", source.Path, err) } inode := fileIdentity(info) size := info.Size() offset, found, err := s.store.GetSourceOffset(ctx, source.Name) if err != nil { return nil, err } if !found { start := int64(0) if source.InitialPosition == "end" { start = size } offset = model.SourceOffset{ SourceName: source.Name, Path: source.Path, Inode: inode, Offset: start, UpdatedAt: time.Now().UTC(), } if err := s.store.SaveSourceOffset(ctx, offset); err != nil { return nil, err } if start >= size { return nil, nil } } else if offset.Inode != inode || size < offset.Offset { offset = model.SourceOffset{ SourceName: source.Name, Path: source.Path, Inode: inode, Offset: 0, UpdatedAt: time.Now().UTC(), } } file, err := os.Open(source.Path) if err != nil { return nil, fmt.Errorf("open source path %q: %w", source.Path, err) } defer file.Close() if _, err := file.Seek(offset.Offset, io.SeekStart); err != nil { return nil, fmt.Errorf("seek source path %q: %w", source.Path, err) } reader := bufio.NewReader(file) lines := make([]string, 0, source.BatchSize) currentOffset := offset.Offset for len(lines) < source.BatchSize { line, err := reader.ReadString('\n') if err != nil { if errors.Is(err, io.EOF) { break } return nil, fmt.Errorf("read source path %q: %w", source.Path, err) } currentOffset += int64(len(line)) lines = append(lines, strings.TrimRight(line, "\r\n")) } offset.Path = source.Path offset.Inode = inode offset.Offset = currentOffset offset.UpdatedAt = time.Now().UTC() if err := s.store.SaveSourceOffset(ctx, offset); err != nil { return nil, err } return lines, nil } func (s *Service) applyManualOverride(ctx context.Context, ip string, override model.ManualOverride, state model.IPStateStatus, actor string, reason string, backendAction string) error { normalized, err := normalizeIP(ip) if err != nil { return err } enforced := false var backendRecord *model.OPNsenseAction if s.blocker != nil { backendRecord = &model.OPNsenseAction{ IP: normalized, Action: backendAction, CreatedAt: time.Now().UTC(), } switch override { case model.ManualOverrideForceBlock: result, callErr := s.blocker.AddIPIfMissing(ctx, normalized) if callErr != nil { backendRecord.Result = "error" backendRecord.Message = callErr.Error() } else { backendRecord.Result = result backendRecord.Message = reason enforced = true } case model.ManualOverrideForceAllow: result, callErr := s.blocker.RemoveIPIfPresent(ctx, normalized) if callErr != nil { backendRecord.Result = "error" backendRecord.Message = callErr.Error() } else { backendRecord.Result = result backendRecord.Message = reason enforced = true } } } current, err := s.store.SetManualOverride(ctx, normalized, override, state, reason) if err != nil { return err } if err := s.store.AddDecision(ctx, &model.DecisionRecord{ EventID: current.LastEventID, IP: normalized, SourceName: current.LastSourceName, Kind: "manual", Action: actionForOverride(override), Reason: reason, Actor: defaultActor(actor), Enforced: enforced, CreatedAt: time.Now().UTC(), }); err != nil { return err } if backendRecord != nil { if err := s.store.AddBackendAction(ctx, backendRecord); err != nil { return err } } return nil } func normalizeIP(ip string) (string, error) { parsed := net.ParseIP(strings.TrimSpace(ip)) if parsed == nil { return "", fmt.Errorf("invalid ip address %q", ip) } return parsed.String(), nil } func fileIdentity(info os.FileInfo) string { if stat, ok := info.Sys().(*syscall.Stat_t); ok { return fmt.Sprintf("%d:%d", stat.Dev, stat.Ino) } return fmt.Sprintf("fallback:%d:%d", info.ModTime().UnixNano(), info.Size()) } func actionForOverride(override model.ManualOverride) model.DecisionAction { switch override { case model.ManualOverrideForceBlock: return model.DecisionActionBlock case model.ManualOverrideForceAllow: return model.DecisionActionAllow default: return model.DecisionActionNone } } func defaultActor(actor string) string { if strings.TrimSpace(actor) == "" { return "web-ui" } return strings.TrimSpace(actor) } func defaultReason(reason string, fallback string) string { if strings.TrimSpace(reason) == "" { return fallback } return strings.TrimSpace(reason) } func (s *Service) decorateDetails(ctx context.Context, details model.IPDetails) (model.IPDetails, error) { if details.State.IP != "" && details.Investigation == nil { investigation, found, err := s.store.GetInvestigation(ctx, details.State.IP) if err != nil { return model.IPDetails{}, err } if found { details.Investigation = &investigation } } details.OPNsense = s.resolveOPNsenseStatus(ctx, details.State) details.Actions = actionAvailability(details.State, details.OPNsense) return details, nil } func (s *Service) resolveOPNsenseStatus(ctx context.Context, state model.IPState) model.OPNsenseStatus { status := model.OPNsenseStatus{Configured: s.blocker != nil} if s.blocker == nil || state.IP == "" { return status } status.CheckedAt = time.Now().UTC() present, err := s.blocker.IsIPPresent(ctx, state.IP) if err != nil { status.Error = err.Error() return status } status.Present = present return status } func actionAvailability(state model.IPState, backend model.OPNsenseStatus) model.ActionAvailability { present := false if backend.Configured && backend.Error == "" { present = backend.Present } else { present = state.State == model.IPStateBlocked || state.ManualOverride == model.ManualOverrideForceBlock } return model.ActionAvailability{ CanBlock: !present, CanUnblock: present, CanClearOverride: state.ManualOverride != model.ManualOverrideNone, } } func collectUserAgents(events []model.Event) []string { items := make([]string, 0, len(events)) for _, event := range events { if strings.TrimSpace(event.UserAgent) == "" { continue } items = append(items, event.UserAgent) } return items }