Files
reddit-reader/cmd/serve.go

120 lines
2.9 KiB
Go

package cmd
import (
"context"
"fmt"
"log/slog"
"net"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"somegit.dev/vikingowl/reddit-reader/internal/config"
"somegit.dev/vikingowl/reddit-reader/internal/domain"
grpcserver "somegit.dev/vikingowl/reddit-reader/internal/grpc/server"
"somegit.dev/vikingowl/reddit-reader/internal/llm"
"somegit.dev/vikingowl/reddit-reader/internal/monitor"
redditpkg "somegit.dev/vikingowl/reddit-reader/internal/reddit"
"somegit.dev/vikingowl/reddit-reader/internal/store"
)
var serveCmd = &cobra.Command{
Use: "serve",
Short: "Start the monitor daemon and gRPC server",
RunE: runServe,
}
func init() {
rootCmd.AddCommand(serveCmd)
}
func runServe(_ *cobra.Command, _ []string) error {
cfg, err := config.LoadFromFile(config.DefaultPath())
if err != nil {
return fmt.Errorf("load config (run 'reddit-reader setup' first): %w", err)
}
cfg.ApplyEnvOverrides()
configDir, err := os.UserConfigDir()
if err != nil {
return fmt.Errorf("resolve config dir: %w", err)
}
dbPath := filepath.Join(configDir, "reddit-reader", "reddit-reader.db")
st, err := store.Open(dbPath)
if err != nil {
return fmt.Errorf("open store: %w", err)
}
redditClient, err := redditpkg.NewClient(
cfg.Reddit.ClientID,
cfg.Reddit.ClientSecret,
cfg.Reddit.Username,
cfg.Reddit.Password,
)
if err != nil {
return fmt.Errorf("create reddit client: %w", err)
}
var llmClient llm.Summarizer
if cfg.LLM.Backend == "mistral" {
llmClient = llm.NewMistralClient(cfg.LLM.APIKey, cfg.LLM.Model)
} else {
llmClient = llm.NewOpenAIClient(cfg.LLM.Endpoint, cfg.LLM.Model)
}
socketPath := cfg.GRPC.Socket
if socketPath == "" {
socketPath = config.DefaultSocket()
}
lis, err := net.Listen("unix", socketPath)
if err != nil {
return fmt.Errorf("listen on socket %s: %w", socketPath, err)
}
grpcSrv := grpc.NewServer()
srv := grpcserver.Register(grpcSrv, st, time.Now())
go func() {
slog.Info("gRPC server listening", "socket", socketPath)
if serveErr := grpcSrv.Serve(lis); serveErr != nil {
slog.Error("gRPC serve error", "err", serveErr)
}
}()
monitorCfg := monitor.Config{
PollInterval: cfg.Monitor.PollInterval.Duration,
RelevanceThreshold: cfg.LLM.RelevanceThreshold,
MaxPostsPerPoll: cfg.Monitor.MaxPostsPerPoll,
Interests: domain.Interests{
Description: cfg.Interests.Description,
},
}
mon := monitor.New(st, redditClient, llmClient, monitorCfg)
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
slog.Info("monitor starting")
runErr := mon.Run(ctx, func(posts []domain.Post) {
srv.Notify(posts)
})
grpcSrv.GracefulStop()
if removeErr := os.Remove(socketPath); removeErr != nil && !os.IsNotExist(removeErr) {
slog.Warn("failed to remove socket", "path", socketPath, "err", removeErr)
}
if runErr != nil && runErr != context.Canceled {
return runErr
}
return nil
}