From e0055b6ef9cee754c1edb6a54f60983becf4b7d2 Mon Sep 17 00:00:00 2001 From: vikingowl Date: Fri, 3 Apr 2026 11:58:11 +0200 Subject: [PATCH] feat(monitor): polling loop with dedup, keyword filter, LLM scoring --- internal/monitor/monitor.go | 194 +++++++++++++++++++++++++++++++ internal/monitor/monitor_test.go | 112 ++++++++++++++++++ 2 files changed, 306 insertions(+) create mode 100644 internal/monitor/monitor.go create mode 100644 internal/monitor/monitor_test.go diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go new file mode 100644 index 0000000..38a96db --- /dev/null +++ b/internal/monitor/monitor.go @@ -0,0 +1,194 @@ +package monitor + +import ( + "context" + "log/slog" + "time" + + "somegit.dev/vikingowl/reddit-reader/internal/domain" + "somegit.dev/vikingowl/reddit-reader/internal/filter" + "somegit.dev/vikingowl/reddit-reader/internal/llm" + "somegit.dev/vikingowl/reddit-reader/internal/reddit" + "somegit.dev/vikingowl/reddit-reader/internal/store" +) + +// Config holds monitor tuning parameters. +type Config struct { + PollInterval time.Duration + RelevanceThreshold float64 + MaxPostsPerPoll int + Interests domain.Interests +} + +// Monitor polls subreddits, scores, summarizes, and persists new posts. +type Monitor struct { + store *store.Store + fetcher reddit.Fetcher + scorer *filter.Scorer + llm llm.Summarizer + cfg Config +} + +// New creates a Monitor with an internal relevance scorer. +func New(s *store.Store, f reddit.Fetcher, l llm.Summarizer, cfg Config) *Monitor { + return &Monitor{ + store: s, + fetcher: f, + scorer: filter.NewScorer(l, cfg.RelevanceThreshold), + llm: l, + cfg: cfg, + } +} + +// PollOnce runs a single poll cycle across all enabled subreddits. +// It retries unsummarized posts first, then polls each subreddit. +func (m *Monitor) PollOnce(ctx context.Context) ([]domain.Post, error) { + m.retrySummaries(ctx) + + subs, err := m.store.ListSubreddits() + if err != nil { + return nil, err + } + + var all []domain.Post + for _, sub := range subs { + if !sub.Enabled { + continue + } + posts, err := m.pollSubreddit(ctx, sub) + if err != nil { + slog.Warn("pollSubreddit failed", "subreddit", sub.Name, "err", err) + continue + } + all = append(all, posts...) + } + return all, nil +} + +// pollSubreddit fetches, filters, scores, summarizes, and stores posts for one subreddit. +func (m *Monitor) pollSubreddit(ctx context.Context, sub domain.Subreddit) ([]domain.Post, error) { + limit := m.cfg.MaxPostsPerPoll + if limit <= 0 { + limit = 25 + } + + fetched, err := m.fetcher.FetchPosts(ctx, sub.Name, sub.PollSort, limit) + if err != nil { + return nil, err + } + + filters, err := m.store.ListFilters(sub.Name) + if err != nil { + return nil, err + } + + feedback, err := m.store.RecentFeedback(20) + if err != nil { + slog.Warn("RecentFeedback failed", "err", err) + } + interests := domain.Interests{ + Description: m.cfg.Interests.Description, + Examples: feedback, + } + + var newPosts []domain.Post + + for _, post := range fetched { + exists, err := m.store.PostExists(post.ID) + if err != nil { + slog.Warn("PostExists check failed", "id", post.ID, "err", err) + continue + } + if exists { + continue + } + + if !filter.MatchesAny(post, filters) { + continue + } + + score, above, err := m.scorer.ScorePost(ctx, post, interests) + if err != nil { + slog.Warn("ScorePost failed, storing without score", "id", post.ID, "err", err) + post.FetchedAt = time.Now() + if insertErr := m.store.InsertPost(post); insertErr != nil { + slog.Warn("InsertPost failed", "id", post.ID, "err", insertErr) + } + continue + } + + post.Relevance = &score + + if !above { + post.FetchedAt = time.Now() + if insertErr := m.store.InsertPost(post); insertErr != nil { + slog.Warn("InsertPost failed", "id", post.ID, "err", insertErr) + } + continue + } + + summary, err := m.llm.Summarize(ctx, post) + if err != nil { + slog.Warn("Summarize failed", "id", post.ID, "err", err) + } else { + post.Summary = &summary + } + + post.FetchedAt = time.Now() + if insertErr := m.store.InsertPost(post); insertErr != nil { + slog.Warn("InsertPost failed", "id", post.ID, "err", insertErr) + continue + } + + newPosts = append(newPosts, post) + } + + return newPosts, nil +} + +// retrySummaries attempts to summarize posts that were stored without a summary. +func (m *Monitor) retrySummaries(ctx context.Context) { + posts, err := m.store.UnsummarizedPosts() + if err != nil { + slog.Warn("UnsummarizedPosts failed", "err", err) + return + } + + for _, post := range posts { + summary, err := m.llm.Summarize(ctx, post) + if err != nil { + slog.Warn("retrySummaries: Summarize failed", "id", post.ID, "err", err) + continue + } + if err := m.store.UpdatePost(post.ID, store.PostUpdate{Summary: &summary}); err != nil { + slog.Warn("retrySummaries: UpdatePost failed", "id", post.ID, "err", err) + } + } +} + +// Run is a blocking loop that polls on each tick and calls notify with new posts. +func (m *Monitor) Run(ctx context.Context, notify func([]domain.Post)) error { + interval := m.cfg.PollInterval + if interval <= 0 { + interval = 5 * time.Minute + } + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + posts, err := m.PollOnce(ctx) + if err != nil { + slog.Warn("PollOnce failed", "err", err) + continue + } + if len(posts) > 0 && notify != nil { + notify(posts) + } + } + } +} diff --git a/internal/monitor/monitor_test.go b/internal/monitor/monitor_test.go new file mode 100644 index 0000000..01c51cd --- /dev/null +++ b/internal/monitor/monitor_test.go @@ -0,0 +1,112 @@ +package monitor_test + +import ( + "context" + "sync" + "testing" + "time" + + "somegit.dev/vikingowl/reddit-reader/internal/domain" + "somegit.dev/vikingowl/reddit-reader/internal/monitor" + "somegit.dev/vikingowl/reddit-reader/internal/store" +) + +type mockFetcher struct { + posts []domain.Post +} + +func (m *mockFetcher) FetchPosts(_ context.Context, _, _ string, _ int) ([]domain.Post, error) { + return m.posts, nil +} + +type mockSummarizer struct { + mu sync.Mutex + calls int +} + +func (m *mockSummarizer) Score(_ context.Context, _ domain.Post, _ domain.Interests) (float64, error) { + m.mu.Lock() + defer m.mu.Unlock() + m.calls++ + return 0.8, nil +} + +func (m *mockSummarizer) Summarize(_ context.Context, _ domain.Post) (string, error) { + return "- bullet 1\n- bullet 2\n- bullet 3\n- bullet 4\n- bullet 5", nil +} + +func TestMonitorPollCycle(t *testing.T) { + st, err := store.Open(":memory:") + if err != nil { + t.Fatal(err) + } + defer st.Close() + + st.AddSubreddit(domain.Subreddit{Name: "golang", PollSort: "new"}) + st.AddFilter(domain.Filter{Subreddit: "golang", Pattern: "go"}) + + fetcher := &mockFetcher{ + posts: []domain.Post{ + {ID: "t3_new1", Subreddit: "golang", Title: "Go tips", CreatedUTC: time.Now()}, + {ID: "t3_new2", Subreddit: "golang", Title: "Rust tips", CreatedUTC: time.Now()}, + }, + } + summarizer := &mockSummarizer{} + + m := monitor.New(st, fetcher, summarizer, monitor.Config{ + RelevanceThreshold: 0.6, + MaxPostsPerPoll: 25, + Interests: domain.Interests{Description: "Go programming"}, + }) + + newPosts, err := m.PollOnce(context.Background()) + if err != nil { + t.Fatalf("PollOnce: %v", err) + } + + // "Go tips" matches keyword "go", "Rust tips" does not + if len(newPosts) != 1 { + t.Fatalf("newPosts = %d, want 1", len(newPosts)) + } + if newPosts[0].ID != "t3_new1" { + t.Errorf("post ID = %q, want t3_new1", newPosts[0].ID) + } + if newPosts[0].Summary == nil { + t.Error("expected summary to be set") + } + + exists, _ := st.PostExists("t3_new1") + if !exists { + t.Error("post should be in store") + } +} + +func TestMonitorDedup(t *testing.T) { + st, err := store.Open(":memory:") + if err != nil { + t.Fatal(err) + } + defer st.Close() + + st.AddSubreddit(domain.Subreddit{Name: "golang", PollSort: "new"}) + st.InsertPost(domain.Post{ID: "t3_existing", Subreddit: "golang", Title: "Old", CreatedUTC: time.Now()}) + + fetcher := &mockFetcher{ + posts: []domain.Post{ + {ID: "t3_existing", Subreddit: "golang", Title: "Old", CreatedUTC: time.Now()}, + }, + } + + m := monitor.New(st, fetcher, &mockSummarizer{}, monitor.Config{ + RelevanceThreshold: 0.6, MaxPostsPerPoll: 25, + Interests: domain.Interests{Description: "Go"}, + }) + + newPosts, err := m.PollOnce(context.Background()) + if err != nil { + t.Fatalf("PollOnce: %v", err) + } + if len(newPosts) != 0 { + t.Errorf("expected 0 new posts (dedup), got %d", len(newPosts)) + } +}