120 lines
2.9 KiB
Go
120 lines
2.9 KiB
Go
package cmd
|
|
|
|
import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"net"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	"github.com/spf13/cobra"
	"google.golang.org/grpc"

	"somegit.dev/vikingowl/reddit-reader/internal/config"
	"somegit.dev/vikingowl/reddit-reader/internal/domain"
	grpcserver "somegit.dev/vikingowl/reddit-reader/internal/grpc/server"
	"somegit.dev/vikingowl/reddit-reader/internal/llm"
	"somegit.dev/vikingowl/reddit-reader/internal/monitor"
	redditpkg "somegit.dev/vikingowl/reddit-reader/internal/reddit"
	"somegit.dev/vikingowl/reddit-reader/internal/store"
)
|
|
|
|
var serveCmd = &cobra.Command{
|
|
Use: "serve",
|
|
Short: "Start the monitor daemon and gRPC server",
|
|
RunE: runServe,
|
|
}
|
|
|
|
func init() {
|
|
rootCmd.AddCommand(serveCmd)
|
|
}
|
|
|
|
func runServe(_ *cobra.Command, _ []string) error {
|
|
cfg, err := config.LoadFromFile(config.DefaultPath())
|
|
if err != nil {
|
|
return fmt.Errorf("load config (run 'reddit-reader setup' first): %w", err)
|
|
}
|
|
cfg.ApplyEnvOverrides()
|
|
|
|
configDir, err := os.UserConfigDir()
|
|
if err != nil {
|
|
return fmt.Errorf("resolve config dir: %w", err)
|
|
}
|
|
dbPath := filepath.Join(configDir, "reddit-reader", "reddit-reader.db")
|
|
|
|
st, err := store.Open(dbPath)
|
|
if err != nil {
|
|
return fmt.Errorf("open store: %w", err)
|
|
}
|
|
|
|
redditClient, err := redditpkg.NewClient(
|
|
cfg.Reddit.ClientID,
|
|
cfg.Reddit.ClientSecret,
|
|
cfg.Reddit.Username,
|
|
cfg.Reddit.Password,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("create reddit client: %w", err)
|
|
}
|
|
|
|
var llmClient llm.Summarizer
|
|
if cfg.LLM.Backend == "mistral" {
|
|
llmClient = llm.NewMistralClient(cfg.LLM.APIKey, cfg.LLM.Model)
|
|
} else {
|
|
llmClient = llm.NewOpenAIClient(cfg.LLM.Endpoint, cfg.LLM.Model)
|
|
}
|
|
|
|
socketPath := cfg.GRPC.Socket
|
|
if socketPath == "" {
|
|
socketPath = config.DefaultSocket()
|
|
}
|
|
|
|
lis, err := net.Listen("unix", socketPath)
|
|
if err != nil {
|
|
return fmt.Errorf("listen on socket %s: %w", socketPath, err)
|
|
}
|
|
|
|
grpcSrv := grpc.NewServer()
|
|
srv := grpcserver.Register(grpcSrv, st, time.Now())
|
|
|
|
go func() {
|
|
slog.Info("gRPC server listening", "socket", socketPath)
|
|
if serveErr := grpcSrv.Serve(lis); serveErr != nil {
|
|
slog.Error("gRPC serve error", "err", serveErr)
|
|
}
|
|
}()
|
|
|
|
monitorCfg := monitor.Config{
|
|
PollInterval: cfg.Monitor.PollInterval.Duration,
|
|
RelevanceThreshold: cfg.LLM.RelevanceThreshold,
|
|
MaxPostsPerPoll: cfg.Monitor.MaxPostsPerPoll,
|
|
Interests: domain.Interests{
|
|
Description: cfg.Interests.Description,
|
|
},
|
|
}
|
|
|
|
mon := monitor.New(st, redditClient, llmClient, monitorCfg)
|
|
|
|
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
|
defer stop()
|
|
|
|
slog.Info("monitor starting")
|
|
runErr := mon.Run(ctx, func(posts []domain.Post) {
|
|
srv.Notify(posts)
|
|
})
|
|
|
|
grpcSrv.GracefulStop()
|
|
if removeErr := os.Remove(socketPath); removeErr != nil && !os.IsNotExist(removeErr) {
|
|
slog.Warn("failed to remove socket", "path", socketPath, "err", removeErr)
|
|
}
|
|
|
|
if runErr != nil && runErr != context.Canceled {
|
|
return runErr
|
|
}
|
|
return nil
|
|
}
|