package cmd import ( "context" "fmt" "log/slog" "net" "os" "os/signal" "path/filepath" "syscall" "time" "github.com/spf13/cobra" "google.golang.org/grpc" "somegit.dev/vikingowl/reddit-reader/internal/config" "somegit.dev/vikingowl/reddit-reader/internal/domain" grpcserver "somegit.dev/vikingowl/reddit-reader/internal/grpc/server" "somegit.dev/vikingowl/reddit-reader/internal/llm" "somegit.dev/vikingowl/reddit-reader/internal/monitor" redditpkg "somegit.dev/vikingowl/reddit-reader/internal/reddit" "somegit.dev/vikingowl/reddit-reader/internal/store" ) var serveCmd = &cobra.Command{ Use: "serve", Short: "Start the monitor daemon and gRPC server", RunE: runServe, } func init() { rootCmd.AddCommand(serveCmd) } func runServe(_ *cobra.Command, _ []string) error { cfg, err := config.LoadFromFile(config.DefaultPath()) if err != nil { return fmt.Errorf("load config (run 'reddit-reader setup' first): %w", err) } cfg.ApplyEnvOverrides() configDir, err := os.UserConfigDir() if err != nil { return fmt.Errorf("resolve config dir: %w", err) } dbPath := filepath.Join(configDir, "reddit-reader", "reddit-reader.db") st, err := store.Open(dbPath) if err != nil { return fmt.Errorf("open store: %w", err) } redditClient, err := redditpkg.NewClient( cfg.Reddit.ClientID, cfg.Reddit.ClientSecret, cfg.Reddit.Username, cfg.Reddit.Password, ) if err != nil { return fmt.Errorf("create reddit client: %w", err) } var llmClient llm.Summarizer if cfg.LLM.Backend == "mistral" { llmClient = llm.NewMistralClient(cfg.LLM.APIKey, cfg.LLM.Model) } else { llmClient = llm.NewOpenAIClient(cfg.LLM.Endpoint, cfg.LLM.Model) } socketPath := cfg.GRPC.Socket if socketPath == "" { socketPath = config.DefaultSocket() } lis, err := net.Listen("unix", socketPath) if err != nil { return fmt.Errorf("listen on socket %s: %w", socketPath, err) } grpcSrv := grpc.NewServer() srv := grpcserver.Register(grpcSrv, st, time.Now()) go func() { slog.Info("gRPC server listening", "socket", socketPath) if serveErr := grpcSrv.Serve(lis); serveErr != nil { slog.Error("gRPC serve error", "err", serveErr) } }() monitorCfg := monitor.Config{ PollInterval: cfg.Monitor.PollInterval.Duration, RelevanceThreshold: cfg.LLM.RelevanceThreshold, MaxPostsPerPoll: cfg.Monitor.MaxPostsPerPoll, Interests: domain.Interests{ Description: cfg.Interests.Description, }, } mon := monitor.New(st, redditClient, llmClient, monitorCfg) ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() slog.Info("monitor starting") runErr := mon.Run(ctx, func(posts []domain.Post) { srv.Notify(posts) }) grpcSrv.GracefulStop() if removeErr := os.Remove(socketPath); removeErr != nil && !os.IsNotExist(removeErr) { slog.Warn("failed to remove socket", "path", socketPath, "err", removeErr) } if runErr != nil && runErr != context.Canceled { return runErr } return nil }