2

Cache IP intelligence and add allowed filter

This commit is contained in:
2026-03-12 11:48:22 +01:00
parent 61c34699cb
commit 8b744d31f3
10 changed files with 194 additions and 33 deletions

View File

@@ -112,7 +112,6 @@ func (s *Service) ListRecentIPs(ctx context.Context, since time.Time, limit int)
if err != nil {
return nil, err
}
staleSince := time.Now().UTC().Add(-s.cfg.Investigation.RefreshAfter.Duration)
for index := range items {
state := model.IPState{
IP: items[index].IP,
@@ -121,9 +120,6 @@ func (s *Service) ListRecentIPs(ctx context.Context, since time.Time, limit int)
}
if investigation, ok := investigations[items[index].IP]; ok {
items[index].Bot = investigation.Bot
if investigation.UpdatedAt.Before(staleSince) {
s.enqueueInvestigation(items[index].IP)
}
} else {
s.enqueueInvestigation(items[index].IP)
}
@@ -316,7 +312,7 @@ func (s *Service) processRecord(ctx context.Context, source config.SourceConfig,
}
func (s *Service) runInvestigationScheduler(ctx context.Context) {
s.enqueueRecentInvestigations(ctx)
s.enqueueMissingInvestigations(ctx)
ticker := time.NewTicker(s.cfg.Investigation.BackgroundPollInterval.Duration)
defer ticker.Stop()
for {
@@ -324,7 +320,7 @@ func (s *Service) runInvestigationScheduler(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
s.enqueueRecentInvestigations(ctx)
s.enqueueMissingInvestigations(ctx)
}
}
}
@@ -348,27 +344,21 @@ func (s *Service) runInvestigationWorker(ctx context.Context) {
}
}
func (s *Service) enqueueRecentInvestigations(ctx context.Context) {
func (s *Service) enqueueMissingInvestigations(ctx context.Context) {
if s.investigationQueue == nil {
return
}
since := time.Now().UTC().Add(-s.cfg.Investigation.BackgroundLookback.Duration)
items, err := s.store.ListRecentIPRows(ctx, since, s.cfg.Investigation.BackgroundBatchSize)
since := time.Time{}
if s.cfg.Investigation.BackgroundLookback.Duration > 0 {
since = time.Now().UTC().Add(-s.cfg.Investigation.BackgroundLookback.Duration)
}
items, err := s.store.ListIPsWithoutInvestigation(ctx, since, s.cfg.Investigation.BackgroundBatchSize)
if err != nil {
s.logger.Printf("list recent IPs for investigation: %v", err)
s.logger.Printf("list IPs without investigation: %v", err)
return
}
investigations, err := s.store.GetInvestigationsForIPs(ctx, recentRowIPs(items))
if err != nil {
s.logger.Printf("list investigations for recent IPs: %v", err)
return
}
staleSince := time.Now().UTC().Add(-s.cfg.Investigation.RefreshAfter.Duration)
for _, item := range items {
investigation, found := investigations[item.IP]
if !found || investigation.UpdatedAt.Before(staleSince) {
s.enqueueInvestigation(item.IP)
}
s.enqueueInvestigation(item)
}
}
@@ -412,7 +402,7 @@ func (s *Service) refreshInvestigation(ctx context.Context, ip string, force boo
if err != nil {
return nil, err
}
shouldRefresh := force || !found || time.Since(investigation.UpdatedAt) >= s.cfg.Investigation.RefreshAfter.Duration
shouldRefresh := force || !found
if !shouldRefresh {
return &investigation, nil
}

View File

@@ -240,6 +240,87 @@ sources:
}
}
func TestServiceDoesNotRefreshCachedInvestigationAutomatically(t *testing.T) {
t.Parallel()
tempDir := t.TempDir()
configPath := filepath.Join(tempDir, "config.yaml")
payload := fmt.Sprintf(`storage:
path: %s/blocker.db
investigation:
enabled: true
refresh_after: 1ns
timeout: 500ms
background_workers: 1
background_poll_interval: 50ms
profiles:
main:
auto_block: false
sources:
- name: main
path: %s/access.log
profile: main
`, tempDir, tempDir)
if err := os.WriteFile(configPath, []byte(payload), 0o600); err != nil {
t.Fatalf("write config: %v", err)
}
cfg, err := config.Load(configPath)
if err != nil {
t.Fatalf("load config: %v", err)
}
database, err := store.Open(cfg.Storage.Path)
if err != nil {
t.Fatalf("open store: %v", err)
}
defer database.Close()
investigator := &fakeInvestigator{}
svc := New(cfg, database, nil, investigator, log.New(os.Stderr, "", 0))
ctx := context.Background()
event := &model.Event{
SourceName: "main",
ProfileName: "main",
OccurredAt: time.Now().UTC(),
RemoteIP: "198.51.100.10",
ClientIP: "203.0.113.44",
Host: "example.test",
Method: "GET",
URI: "/cached",
Path: "/cached",
Status: 404,
UserAgent: "test-agent/1.0",
Decision: model.DecisionActionReview,
DecisionReason: "review",
DecisionReasons: []string{"review"},
RawJSON: `{"status":404}`,
}
if err := database.RecordEvent(ctx, event); err != nil {
t.Fatalf("record event: %v", err)
}
if err := database.SaveInvestigation(ctx, model.IPInvestigation{
IP: "203.0.113.44",
UpdatedAt: time.Now().UTC().Add(-48 * time.Hour),
Bot: &model.BotMatch{
Name: "CachedBot",
Verified: true,
},
}); err != nil {
t.Fatalf("save investigation: %v", err)
}
loaded, err := svc.refreshInvestigation(ctx, "203.0.113.44", false)
if err != nil {
t.Fatalf("refresh investigation: %v", err)
}
if loaded == nil || loaded.Bot == nil || loaded.Bot.Name != "CachedBot" {
t.Fatalf("expected cached investigation, got %+v", loaded)
}
if investigator.callCount() != 0 {
t.Fatalf("expected cached investigation to skip external refresh, got %d calls", investigator.callCount())
}
}
type fakeOPNsenseServer struct {
*httptest.Server
mu sync.Mutex