From f213d8f9cef2e7d1087fee5745edabdf67b78ed0 Mon Sep 17 00:00:00 2001 From: vikingowl Date: Thu, 7 May 2026 14:29:34 +0200 Subject: [PATCH] feat(provider): subprocess CLI provider for claude, gemini, vibe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds internal/provider/subprocess — a provider.Provider that spawns CLI agents (claude, gemini, vibe) as subprocesses and streams their output. - FormatParser interface + three parsers for claude-stream-json, gemini-stream-json, and vibe-streaming formats; fixtures captured from real binaries - subprocessStream: pull-based stream.Stream over subprocess stdout with bounded stderr capture (8KB) and guarded reap() to prevent double-Wait - DiscoverCLIAgents: parallel PATH scan with 10s timeout, stable ordering - Provider: only the last user message is passed as --prompt; all other request fields (history, tools, system prompt) are intentionally ignored (see package doc) - main.go: discover and register CLI arms at startup; TODO(P0c) for tier-based routing to enforce preference order explicitly --- cmd/gnoma/main.go | 23 ++ internal/provider/subprocess/agent.go | 173 ++++++++++ internal/provider/subprocess/agent_test.go | 74 +++++ internal/provider/subprocess/format.go | 228 +++++++++++++ internal/provider/subprocess/format_test.go | 311 ++++++++++++++++++ internal/provider/subprocess/provider.go | 85 +++++ internal/provider/subprocess/provider_test.go | 79 +++++ internal/provider/subprocess/stream.go | 158 +++++++++ internal/provider/subprocess/stream_test.go | 90 +++++ .../provider/subprocess/testdata/claude.jsonl | 4 + .../provider/subprocess/testdata/gemini.jsonl | 4 + .../provider/subprocess/testdata/vibe.jsonl | 3 + 12 files changed, 1232 insertions(+) create mode 100644 internal/provider/subprocess/agent.go create mode 100644 internal/provider/subprocess/agent_test.go create mode 100644 internal/provider/subprocess/format.go create mode 100644 
internal/provider/subprocess/format_test.go create mode 100644 internal/provider/subprocess/provider.go create mode 100644 internal/provider/subprocess/provider_test.go create mode 100644 internal/provider/subprocess/stream.go create mode 100644 internal/provider/subprocess/stream_test.go create mode 100644 internal/provider/subprocess/testdata/claude.jsonl create mode 100644 internal/provider/subprocess/testdata/gemini.jsonl create mode 100644 internal/provider/subprocess/testdata/vibe.jsonl diff --git a/cmd/gnoma/main.go b/cmd/gnoma/main.go index 9981714..08eae8b 100644 --- a/cmd/gnoma/main.go +++ b/cmd/gnoma/main.go @@ -32,6 +32,7 @@ import ( googleprov "somegit.dev/Owlibou/gnoma/internal/provider/google" oaiprov "somegit.dev/Owlibou/gnoma/internal/provider/openai" "somegit.dev/Owlibou/gnoma/internal/provider/openaicompat" + subprocprov "somegit.dev/Owlibou/gnoma/internal/provider/subprocess" "somegit.dev/Owlibou/gnoma/internal/session" "somegit.dev/Owlibou/gnoma/internal/stream" "somegit.dev/Owlibou/gnoma/internal/tool" @@ -288,6 +289,28 @@ func main() { logger.Debug("local models discovered", "count", len(localModels)) } + // Discover CLI agents (claude, gemini, vibe) and register as arms. + // TODO(P0c): CLI arms have cost=0 and ToolUse=true, so the router currently + // always prefers them over API arms. Tier-based routing (subprocess > local > API) + // needs to be explicit in the selector, not implicit through cost. 
+ cliAgents := subprocprov.DiscoverCLIAgents(context.Background()) + for _, agent := range cliAgents { + cliArmID := router.NewArmID("subprocess", agent.Name) + if _, exists := rtr.LookupArm(cliArmID); !exists { + rtr.RegisterArm(&router.Arm{ + ID: cliArmID, + Provider: subprocprov.New(agent), + ModelName: agent.Name, + IsLocal: false, + Capabilities: agent.Capabilities, + }) + logger.Debug("registered CLI agent", "name", agent.Name, "version", agent.Version) + } + } + if len(cliAgents) > 0 { + logger.Debug("CLI agents discovered", "count", len(cliAgents)) + } + // Start background discovery polling (30s interval). // modelUpdater is set after the session is created so the discovery loop // can update the displayed model name when it reconciles the forced arm. diff --git a/internal/provider/subprocess/agent.go b/internal/provider/subprocess/agent.go new file mode 100644 index 0000000..5d89e1a --- /dev/null +++ b/internal/provider/subprocess/agent.go @@ -0,0 +1,173 @@ +package subprocess + +import ( + "context" + "os/exec" + "sort" + "sync" + "time" + + "somegit.dev/Owlibou/gnoma/internal/provider" +) + +// StreamFormat identifies the line-delimited JSON format a CLI agent emits. +type StreamFormat string + +const ( + FormatClaudeStreamJSON StreamFormat = "claude-stream-json" + FormatGeminiStreamJSON StreamFormat = "gemini-stream-json" + FormatVibeStreaming StreamFormat = "vibe-streaming" +) + +// CLIAgent describes a known CLI agent binary. +type CLIAgent struct { + Name string + DisplayName string + ProbeArgs []string // args to fetch version (e.g. ["--version"]) + PromptArgs func(string) []string // build argv for a non-interactive prompt run + Format StreamFormat + Capabilities provider.Capabilities +} + +// DiscoveredAgent is a CLIAgent confirmed present on PATH with its resolved path. +type DiscoveredAgent struct { + CLIAgent + Path string + Version string +} + +// knownAgents is the registry of CLI agents Gnoma supports. 
+var knownAgents = []CLIAgent{
+	// Entry order is significant: DiscoverCLIAgents sorts its results to
+	// match the order of this slice.
+	{
+		Name:        "claude",
+		DisplayName: "Claude Code",
+		ProbeArgs:   []string{"--version"},
+		// The prompt is passed as its own argv element (no shell involved).
+		PromptArgs: func(p string) []string {
+			return []string{"-p", p, "--output-format", "stream-json", "--verbose"}
+		},
+		Format: FormatClaudeStreamJSON,
+		// ToolUse=true: the claude CLI is a full agent with its own tool loop.
+		// This is a routing capability flag, not a provider-layer capability.
+		Capabilities: provider.Capabilities{
+			ToolUse:       true,
+			ContextWindow: 200000,
+		},
+	},
+	{
+		Name:        "gemini",
+		DisplayName: "Gemini CLI",
+		ProbeArgs:   []string{"--version"},
+		// NOTE(review): --yolo presumably auto-approves the CLI's own tool
+		// actions — confirm against the Gemini CLI documentation.
+		PromptArgs: func(p string) []string {
+			return []string{"-p", p, "--output-format", "stream-json", "--yolo"}
+		},
+		Format: FormatGeminiStreamJSON,
+		Capabilities: provider.Capabilities{
+			ToolUse:       true,
+			ContextWindow: 1048576,
+		},
+	},
+	{
+		Name:        "vibe",
+		DisplayName: "Mistral Vibe",
+		ProbeArgs:   []string{"--version"},
+		// NOTE(review): --trust presumably skips the CLI's trust/approval
+		// prompt — confirm against the Vibe CLI documentation.
+		PromptArgs: func(p string) []string {
+			return []string{"-p", p, "--output", "streaming", "--trust"}
+		},
+		Format: FormatVibeStreaming,
+		Capabilities: provider.Capabilities{
+			ToolUse:       true,
+			ContextWindow: 128000,
+		},
+	},
+}
+
+// newParser returns a fresh FormatParser for the given format, or nil when the
+// format is not one of the three known StreamFormat constants. Callers must
+// check for nil.
+func newParser(f StreamFormat) FormatParser {
+	switch f {
+	case FormatClaudeStreamJSON:
+		return newClaudeParser()
+	case FormatGeminiStreamJSON:
+		return newGeminiParser()
+	case FormatVibeStreaming:
+		return newVibeParser()
+	default:
+		return nil
+	}
+}
+
+// DiscoverCLIAgents looks up every known CLI agent on PATH and runs the probe
+// command of each one it finds, in parallel, to obtain a version string.
+// Every agent found on PATH is returned, in knownAgents order; when the probe
+// fails or prints nothing, the agent is still returned with an empty Version.
+func DiscoverCLIAgents(ctx context.Context) []DiscoveredAgent {
+	// One 10s deadline, derived from the caller's context, bounds the whole scan.
+	scanCtx, stopScan := context.WithTimeout(ctx, 10*time.Second)
+	defer stopScan()
+
+	var (
+		guard   sync.Mutex // protects results
+		probes  sync.WaitGroup
+		results []DiscoveredAgent
+	)
+	slots := make(chan struct{}, 4) // at most four probes in flight
+
+	for _, known := range knownAgents {
+		binPath, lookErr := exec.LookPath(known.Name)
+		if lookErr != nil {
+			// LookPath could not resolve the binary: skip this agent.
+			continue
+		}
+		probes.Add(1)
+		slots <- struct{}{}
+		go func(spec CLIAgent, bin string) {
+			defer probes.Done()
+			defer func() { <-slots }()
+			// The agent is recorded whatever the probe returns; a failed
+			// probe only leaves Version empty.
+			hit := DiscoveredAgent{
+				CLIAgent: spec,
+				Path:     bin,
+				Version:  probeAgentVersion(scanCtx, bin, spec.ProbeArgs),
+			}
+			guard.Lock()
+			results = append(results, hit)
+			guard.Unlock()
+		}(known, binPath)
+	}
+	probes.Wait()
+
+	// Goroutines finish in arbitrary order; restore the knownAgents ordering.
+	rank := make(map[string]int, len(knownAgents))
+	for idx := range knownAgents {
+		rank[knownAgents[idx].Name] = idx
+	}
+	sort.Slice(results, func(a, b int) bool {
+		return rank[results[a].Name] < rank[results[b].Name]
+	})
+	return results
+}
+
+// probeAgentVersion runs path with args under a 3s timeout and returns the
+// first non-empty line of its stdout. It returns "" when the command failed
+// without producing output or printed only empty lines.
+func probeAgentVersion(ctx context.Context, path string, args []string) string {
+	probeCtx, stopProbe := context.WithTimeout(ctx, 3*time.Second)
+	defer stopProbe()
+
+	stdout, runErr := exec.CommandContext(probeCtx, path, args...).Output()
+	if len(stdout) == 0 && runErr != nil {
+		return ""
+	}
+	// Output captured before a failure is still scanned for a version line.
+	for _, ln := range splitNL(stdout) {
+		if len(ln) != 0 {
+			return string(ln)
+		}
+	}
+	return ""
+}
+
+// splitNL splits b at '\n' and strips one trailing '\r' from every
+// newline-terminated line; a final unterminated line is returned as-is.
+func splitNL(b []byte) [][]byte { + var lines [][]byte + start := 0 + for i, c := range b { + if c == '\n' { + line := b[start:i] + if len(line) > 0 && line[len(line)-1] == '\r' { + line = line[:len(line)-1] + } + lines = append(lines, line) + start = i + 1 + } + } + if start < len(b) { + lines = append(lines, b[start:]) + } + return lines +} diff --git a/internal/provider/subprocess/agent_test.go b/internal/provider/subprocess/agent_test.go new file mode 100644 index 0000000..a4c0031 --- /dev/null +++ b/internal/provider/subprocess/agent_test.go @@ -0,0 +1,74 @@ +package subprocess + +import ( + "testing" +) + +func TestKnownAgents_Defined(t *testing.T) { + if len(knownAgents) == 0 { + t.Fatal("knownAgents must not be empty") + } + for _, a := range knownAgents { + if a.Name == "" { + t.Errorf("agent with empty Name: %+v", a) + } + if a.DisplayName == "" { + t.Errorf("agent %q has empty DisplayName", a.Name) + } + if a.Format == "" { + t.Errorf("agent %q has empty Format", a.Name) + } + if a.PromptArgs == nil { + t.Errorf("agent %q has nil PromptArgs", a.Name) + } + } +} + +func TestKnownAgents_UniqueNames(t *testing.T) { + seen := make(map[string]bool) + for _, a := range knownAgents { + if seen[a.Name] { + t.Errorf("duplicate agent name %q", a.Name) + } + seen[a.Name] = true + } +} + +func TestKnownAgents_ValidFormats(t *testing.T) { + valid := map[StreamFormat]bool{ + FormatClaudeStreamJSON: true, + FormatGeminiStreamJSON: true, + FormatVibeStreaming: true, + } + for _, a := range knownAgents { + if !valid[a.Format] { + t.Errorf("agent %q has unknown format %q", a.Name, a.Format) + } + } +} + +func TestKnownAgents_PromptArgsIncludePrompt(t *testing.T) { + const testPrompt = "TESTPROMPT_UNIQUE_SENTINEL" + for _, a := range knownAgents { + args := a.PromptArgs(testPrompt) + found := false + for _, arg := range args { + if arg == testPrompt { + found = true + break + } + } + if !found { + t.Errorf("agent %q PromptArgs(%q) does not include the prompt in args: %v", 
a.Name, testPrompt, args) + } + } +} + +func TestNewParser_ReturnsParserForKnownFormats(t *testing.T) { + for _, f := range []StreamFormat{FormatClaudeStreamJSON, FormatGeminiStreamJSON, FormatVibeStreaming} { + p := newParser(f) + if p == nil { + t.Errorf("newParser(%q) returned nil", f) + } + } +} diff --git a/internal/provider/subprocess/format.go b/internal/provider/subprocess/format.go new file mode 100644 index 0000000..18836a3 --- /dev/null +++ b/internal/provider/subprocess/format.go @@ -0,0 +1,228 @@ +package subprocess + +import ( + "encoding/json" + "fmt" + + "somegit.dev/Owlibou/gnoma/internal/message" + "somegit.dev/Owlibou/gnoma/internal/stream" +) + +// FormatParser converts raw stdout lines from a CLI subprocess into stream.Events. +// Each CLI agent emits its own line-delimited JSON format. +// +// Design note: These agents are full agentic loops, not LLM endpoints. The provider.Request +// fields Tools, Messages (history), SystemPrompt, Temperature, Thinking, etc. are NOT honored — +// they are opaque black boxes. Only the latest user message is passed as a prompt. Internal +// tool calls executed by the CLI are surfaced as EventTextDelta (opaque text) in v1. +type FormatParser interface { + // ParseLine parses one newline-stripped stdout line. Returns 0 or more events. + ParseLine(line []byte) ([]stream.Event, error) + // Done is called when the process exits cleanly. May emit final events. + Done() []stream.Event +} + +// --- claude-stream-json --- +// Format emitted by: claude -p "..." 
--output-format stream-json --verbose +// +// Relevant event types: +// type=assistant → message.content[].type=text → EventTextDelta +// type=result → usage.input_tokens/output_tokens, stop_reason → EventUsage; is_error → EventError + +type claudeParser struct{} + +func newClaudeParser() FormatParser { return &claudeParser{} } + +type claudeEvent struct { + Type string `json:"type"` + Subtype string `json:"subtype"` + Message *claudeMessage `json:"message,omitempty"` + IsError bool `json:"is_error,omitempty"` + // result fields + StopReason string `json:"stop_reason,omitempty"` + Usage *claudeUsage `json:"usage,omitempty"` +} + +type claudeMessage struct { + Content []claudeContent `json:"content"` + Usage *claudeUsage `json:"usage,omitempty"` +} + +type claudeContent struct { + Type string `json:"type"` + Text string `json:"text,omitempty"` +} + +type claudeUsage struct { + InputTokens int64 `json:"input_tokens"` + OutputTokens int64 `json:"output_tokens"` +} + +func (p *claudeParser) ParseLine(line []byte) ([]stream.Event, error) { + var ev claudeEvent + if err := json.Unmarshal(line, &ev); err != nil { + return nil, fmt.Errorf("claude: parse line: %w", err) + } + + switch ev.Type { + case "assistant": + if ev.Message == nil { + return nil, nil + } + var evts []stream.Event + for _, c := range ev.Message.Content { + if c.Type == "text" && c.Text != "" { + evts = append(evts, stream.Event{Type: stream.EventTextDelta, Text: c.Text}) + } + } + return evts, nil + + case "result": + if ev.IsError { + return []stream.Event{{ + Type: stream.EventError, + Err: fmt.Errorf("claude CLI: %s", ev.Subtype), + }}, nil + } + var evts []stream.Event + if ev.Usage != nil { + evts = append(evts, stream.Event{ + Type: stream.EventUsage, + Usage: &message.Usage{ + InputTokens: ev.Usage.InputTokens, + OutputTokens: ev.Usage.OutputTokens, + }, + StopReason: claudeStopReason(ev.StopReason), + }) + } + return evts, nil + } + + // system, rate_limit_event, tool, etc. 
— intentionally ignored + return nil, nil +} + +func (p *claudeParser) Done() []stream.Event { return nil } + +func claudeStopReason(s string) message.StopReason { + switch s { + case "end_turn": + return message.StopEndTurn + case "max_tokens": + return message.StopMaxTokens + default: + return message.StopEndTurn + } +} + +// --- gemini-stream-json --- +// Format emitted by: gemini -p "..." --output-format stream-json +// +// Relevant event types: +// type=message, role=assistant, delta=true → EventTextDelta +// type=result, status=success → EventUsage + +type geminiParser struct{} + +func newGeminiParser() FormatParser { return &geminiParser{} } + +type geminiEvent struct { + Type string `json:"type"` + Role string `json:"role,omitempty"` + Content string `json:"content,omitempty"` + Delta bool `json:"delta,omitempty"` + Status string `json:"status,omitempty"` + Stats *geminiStats `json:"stats,omitempty"` +} + +type geminiStats struct { + InputTokens int64 `json:"input_tokens"` + OutputTokens int64 `json:"output_tokens"` +} + +func (p *geminiParser) ParseLine(line []byte) ([]stream.Event, error) { + var ev geminiEvent + if err := json.Unmarshal(line, &ev); err != nil { + return nil, fmt.Errorf("gemini: parse line: %w", err) + } + + switch ev.Type { + case "message": + if ev.Role == "assistant" && ev.Content != "" { + return []stream.Event{{Type: stream.EventTextDelta, Text: ev.Content}}, nil + } + // user messages and empty assistant messages are ignored + return nil, nil + + case "result": + if ev.Stats == nil { + return nil, nil + } + stopReason := message.StopEndTurn + if ev.Status != "success" { + return []stream.Event{{ + Type: stream.EventError, + Err: fmt.Errorf("gemini CLI: result status %q", ev.Status), + }}, nil + } + return []stream.Event{{ + Type: stream.EventUsage, + Usage: &message.Usage{ + InputTokens: ev.Stats.InputTokens, + OutputTokens: ev.Stats.OutputTokens, + }, + StopReason: stopReason, + }}, nil + } + + // init, other types — ignored + 
return nil, nil +} + +func (p *geminiParser) Done() []stream.Event { return nil } + +// --- vibe-streaming (mistral) --- +// Format emitted by: vibe -p "..." --output streaming --trust +// +// Each line is a JSON message object with a "role" field. +// role=assistant: content → EventTextDelta; reasoning_content → EventThinkingDelta +// role=system, role=user: ignored +// No explicit "done" event — stream ends when process exits. + +type vibeParser struct { + lastAssistantMsgID string +} + +func newVibeParser() FormatParser { return &vibeParser{} } + +type vibeMessage struct { + Role string `json:"role"` + Content string `json:"content"` + ReasoningContent *string `json:"reasoning_content"` + MessageID *string `json:"message_id"` +} + +func (p *vibeParser) ParseLine(line []byte) ([]stream.Event, error) { + var msg vibeMessage + if err := json.Unmarshal(line, &msg); err != nil { + return nil, fmt.Errorf("vibe: parse line: %w", err) + } + + if msg.Role != "assistant" { + return nil, nil + } + + var evts []stream.Event + if msg.ReasoningContent != nil && *msg.ReasoningContent != "" { + evts = append(evts, stream.Event{ + Type: stream.EventThinkingDelta, + Text: *msg.ReasoningContent, + }) + } + if msg.Content != "" { + evts = append(evts, stream.Event{Type: stream.EventTextDelta, Text: msg.Content}) + } + return evts, nil +} + +func (p *vibeParser) Done() []stream.Event { return nil } diff --git a/internal/provider/subprocess/format_test.go b/internal/provider/subprocess/format_test.go new file mode 100644 index 0000000..da54350 --- /dev/null +++ b/internal/provider/subprocess/format_test.go @@ -0,0 +1,311 @@ +package subprocess + +import ( + "os" + "strings" + "testing" + + "somegit.dev/Owlibou/gnoma/internal/message" + "somegit.dev/Owlibou/gnoma/internal/stream" +) + +// loadFixture reads testdata/.jsonl and returns non-empty lines. 
+func loadFixture(t *testing.T, name string) [][]byte { + t.Helper() + data, err := os.ReadFile("testdata/" + name + ".jsonl") + if err != nil { + t.Fatalf("load fixture %s: %v", name, err) + } + var lines [][]byte + for _, line := range strings.Split(string(data), "\n") { + line = strings.TrimSpace(line) + if line != "" { + lines = append(lines, []byte(line)) + } + } + return lines +} + +// collectEvents runs all fixture lines through a parser and returns all emitted events. +func collectEvents(t *testing.T, p FormatParser, lines [][]byte) []stream.Event { + t.Helper() + var events []stream.Event + for _, line := range lines { + evts, err := p.ParseLine(line) + if err != nil { + t.Errorf("ParseLine(%s): %v", string(line), err) + continue + } + events = append(events, evts...) + } + events = append(events, p.Done()...) + return events +} + +// --- claude-stream-json --- + +func TestClaudeParser_ExtractsTextDelta(t *testing.T) { + p := newClaudeParser() + line := []byte(`{"type":"assistant","message":{"model":"claude-sonnet-4-6","content":[{"type":"text","text":"hello world"}],"usage":{"input_tokens":5,"output_tokens":2}}}`) + + evts, err := p.ParseLine(line) + if err != nil { + t.Fatal(err) + } + if len(evts) == 0 { + t.Fatal("expected at least one event") + } + if evts[0].Type != stream.EventTextDelta { + t.Errorf("got type %v, want EventTextDelta", evts[0].Type) + } + if evts[0].Text != "hello world" { + t.Errorf("got text %q, want %q", evts[0].Text, "hello world") + } +} + +func TestClaudeParser_ExtractsUsageFromResult(t *testing.T) { + p := newClaudeParser() + line := []byte(`{"type":"result","subtype":"success","is_error":false,"stop_reason":"end_turn","usage":{"input_tokens":10,"output_tokens":5},"result":"hello"}`) + + evts, err := p.ParseLine(line) + if err != nil { + t.Fatal(err) + } + var usageEvt *stream.Event + for i := range evts { + if evts[i].Type == stream.EventUsage { + usageEvt = &evts[i] + } + } + if usageEvt == nil { + t.Fatal("no EventUsage 
emitted") + } + if usageEvt.Usage == nil { + t.Fatal("Usage is nil") + } + if usageEvt.Usage.InputTokens != 10 { + t.Errorf("input_tokens: got %d, want 10", usageEvt.Usage.InputTokens) + } + if usageEvt.Usage.OutputTokens != 5 { + t.Errorf("output_tokens: got %d, want 5", usageEvt.Usage.OutputTokens) + } + if usageEvt.StopReason != message.StopEndTurn { + t.Errorf("stop_reason: got %v, want StopEndTurn", usageEvt.StopReason) + } +} + +func TestClaudeParser_IgnoresSystemAndRateLimit(t *testing.T) { + p := newClaudeParser() + system := []byte(`{"type":"system","subtype":"init","model":"claude-sonnet-4-6"}`) + rateLimit := []byte(`{"type":"rate_limit_event","rate_limit_info":{}}`) + + for _, line := range [][]byte{system, rateLimit} { + evts, err := p.ParseLine(line) + if err != nil { + t.Errorf("ParseLine(%s): unexpected error: %v", line, err) + } + if len(evts) != 0 { + t.Errorf("expected no events for %s, got %d", line, len(evts)) + } + } +} + +func TestClaudeParser_ErrorResult(t *testing.T) { + p := newClaudeParser() + line := []byte(`{"type":"result","subtype":"error_during_run","is_error":true,"usage":{"input_tokens":5,"output_tokens":0}}`) + + evts, err := p.ParseLine(line) + if err != nil { + t.Fatal(err) + } + var hasError bool + for _, e := range evts { + if e.Type == stream.EventError { + hasError = true + } + } + if !hasError { + t.Error("expected EventError for is_error=true result") + } +} + +func TestClaudeParser_FixtureFile(t *testing.T) { + lines := loadFixture(t, "claude") + p := newClaudeParser() + evts := collectEvents(t, p, lines) + + var textEvts, usageEvts int + for _, e := range evts { + switch e.Type { + case stream.EventTextDelta: + textEvts++ + if e.Text == "" { + t.Error("EventTextDelta with empty text") + } + case stream.EventUsage: + usageEvts++ + } + } + if textEvts == 0 { + t.Error("no EventTextDelta from claude fixture") + } + if usageEvts == 0 { + t.Error("no EventUsage from claude fixture") + } +} + +// --- gemini-stream-json --- + 
+func TestGeminiParser_ExtractsTextDelta(t *testing.T) { + p := newGeminiParser() + line := []byte(`{"type":"message","role":"assistant","content":"hello world","delta":true}`) + + evts, err := p.ParseLine(line) + if err != nil { + t.Fatal(err) + } + if len(evts) == 0 { + t.Fatal("expected at least one event") + } + if evts[0].Type != stream.EventTextDelta { + t.Errorf("got type %v, want EventTextDelta", evts[0].Type) + } + if evts[0].Text != "hello world" { + t.Errorf("got text %q, want %q", evts[0].Text, "hello world") + } +} + +func TestGeminiParser_ExtractsUsageFromResult(t *testing.T) { + p := newGeminiParser() + line := []byte(`{"type":"result","status":"success","stats":{"input_tokens":100,"output_tokens":20}}`) + + evts, err := p.ParseLine(line) + if err != nil { + t.Fatal(err) + } + var usageEvt *stream.Event + for i := range evts { + if evts[i].Type == stream.EventUsage { + usageEvt = &evts[i] + } + } + if usageEvt == nil { + t.Fatal("no EventUsage emitted") + } + if usageEvt.Usage.InputTokens != 100 { + t.Errorf("input_tokens: got %d, want 100", usageEvt.Usage.InputTokens) + } + if usageEvt.Usage.OutputTokens != 20 { + t.Errorf("output_tokens: got %d, want 20", usageEvt.Usage.OutputTokens) + } +} + +func TestGeminiParser_IgnoresUserMessages(t *testing.T) { + p := newGeminiParser() + line := []byte(`{"type":"message","role":"user","content":"say hi"}`) + + evts, err := p.ParseLine(line) + if err != nil { + t.Fatal(err) + } + if len(evts) != 0 { + t.Errorf("expected no events for user message, got %d", len(evts)) + } +} + +func TestGeminiParser_FixtureFile(t *testing.T) { + lines := loadFixture(t, "gemini") + p := newGeminiParser() + evts := collectEvents(t, p, lines) + + var textEvts, usageEvts int + for _, e := range evts { + switch e.Type { + case stream.EventTextDelta: + textEvts++ + case stream.EventUsage: + usageEvts++ + } + } + if textEvts == 0 { + t.Error("no EventTextDelta from gemini fixture") + } + if usageEvts == 0 { + t.Error("no EventUsage 
from gemini fixture") + } +} + +// --- vibe-streaming (mistral) --- + +func TestVibeParser_ExtractsTextDelta(t *testing.T) { + p := newVibeParser() + line := []byte(`{"role":"assistant","content":"hello world","reasoning_content":null,"tool_calls":null,"message_id":"abc123"}`) + + evts, err := p.ParseLine(line) + if err != nil { + t.Fatal(err) + } + if len(evts) == 0 { + t.Fatal("expected at least one event") + } + if evts[0].Type != stream.EventTextDelta { + t.Errorf("got type %v, want EventTextDelta", evts[0].Type) + } + if evts[0].Text != "hello world" { + t.Errorf("got text %q, want %q", evts[0].Text, "hello world") + } +} + +func TestVibeParser_ExtractsThinkingDelta(t *testing.T) { + p := newVibeParser() + line := []byte(`{"role":"assistant","content":"hello","reasoning_content":"I should say hi","tool_calls":null,"message_id":"abc123"}`) + + evts, err := p.ParseLine(line) + if err != nil { + t.Fatal(err) + } + var hasThinking bool + for _, e := range evts { + if e.Type == stream.EventThinkingDelta { + hasThinking = true + if e.Text == "" { + t.Error("EventThinkingDelta with empty text") + } + } + } + if !hasThinking { + t.Error("expected EventThinkingDelta when reasoning_content is set") + } +} + +func TestVibeParser_IgnoresSystemAndUser(t *testing.T) { + p := newVibeParser() + for _, line := range []string{ + `{"role":"system","content":"You are Vibe...","message_id":null}`, + `{"role":"user","content":"say hi","message_id":"abc"}`, + } { + evts, err := p.ParseLine([]byte(line)) + if err != nil { + t.Errorf("ParseLine(%s): unexpected error: %v", line, err) + } + if len(evts) != 0 { + t.Errorf("expected no events for role=%s, got %d", line[:20], len(evts)) + } + } +} + +func TestVibeParser_FixtureFile(t *testing.T) { + lines := loadFixture(t, "vibe") + p := newVibeParser() + evts := collectEvents(t, p, lines) + + var textEvts int + for _, e := range evts { + if e.Type == stream.EventTextDelta { + textEvts++ + } + } + if textEvts == 0 { + t.Error("no 
EventTextDelta from vibe fixture") + } +} diff --git a/internal/provider/subprocess/provider.go b/internal/provider/subprocess/provider.go new file mode 100644 index 0000000..3f3087e --- /dev/null +++ b/internal/provider/subprocess/provider.go @@ -0,0 +1,85 @@ +// Package subprocess provides a provider.Provider that delegates to CLI agents +// (claude, gemini, vibe) by spawning them as subprocesses. +// +// Impedance mismatch: these CLI agents are full agentic loops, not LLM endpoints. +// Only the latest user message is passed as a prompt. The following provider.Request +// fields are intentionally ignored: Tools, SystemPrompt, Messages (history), +// Temperature, TopP, TopK, Thinking, ResponseFormat, ToolChoice, MaxTokens. +// Internal tool calls executed by the CLI are surfaced as EventTextDelta (opaque). +package subprocess + +import ( + "context" + "fmt" + "os/exec" + + "somegit.dev/Owlibou/gnoma/internal/message" + "somegit.dev/Owlibou/gnoma/internal/provider" + "somegit.dev/Owlibou/gnoma/internal/stream" +) + +// Provider wraps a single DiscoveredAgent and implements provider.Provider. +type Provider struct { + agent DiscoveredAgent +} + +// New creates a Provider for the given discovered CLI agent. +func New(agent DiscoveredAgent) *Provider { + return &Provider{agent: agent} +} + +// Name returns "subprocess" — all CLI agents share this provider namespace. +func (p *Provider) Name() string { return "subprocess" } + +// DefaultModel returns the CLI binary name (e.g., "claude", "gemini", "vibe"). +func (p *Provider) DefaultModel() string { return p.agent.Name } + +// Models returns a single ModelInfo describing this CLI agent. 
+func (p *Provider) Models(_ context.Context) ([]provider.ModelInfo, error) {
+	// A CLI agent is exposed as exactly one model, named after its binary.
+	info := provider.ModelInfo{
+		ID:           p.agent.Name,
+		Name:         p.agent.DisplayName,
+		Provider:     "subprocess",
+		Capabilities: p.agent.Capabilities,
+	}
+	return []provider.ModelInfo{info}, nil
+}
+
+// Stream runs the CLI agent once, with the latest user message as its prompt,
+// and returns a stream over the events parsed from the process's stdout.
+// Nothing else in req is consulted — see package doc for rationale.
+//
+// An error is returned when the agent's Format has no parser or when the
+// subprocess cannot be started.
+func (p *Provider) Stream(ctx context.Context, req provider.Request) (stream.Stream, error) {
+	a := &p.agent
+
+	// ctx is bound to the command, so cancelling it terminates the subprocess.
+	argv := a.PromptArgs(extractLastUserMessage(req.Messages))
+	child := exec.CommandContext(ctx, a.Path, argv...)
+
+	lineParser := newParser(a.Format)
+	if lineParser == nil {
+		return nil, fmt.Errorf("subprocess: unknown format %q for agent %q", a.Format, a.Name)
+	}
+
+	out, startErr := newSubprocessStream(ctx, child, lineParser)
+	if startErr == nil {
+		return out, nil
+	}
+	return nil, fmt.Errorf("subprocess %q: %w", a.Name, startErr)
+}
+
+// extractLastUserMessage walks msgs from newest to oldest and returns the first
+// non-empty text part of the most recent user-role message that has one; user
+// messages without text are skipped. It returns "" when no such message exists.
+func extractLastUserMessage(msgs []message.Message) string {
+	for i := len(msgs) - 1; i >= 0; i-- { // walk from the newest message backwards
+		m := msgs[i]
+		if m.Role != message.RoleUser {
+			continue
+		}
+		for _, c := range m.Content {
+			if c.Type == message.ContentText && c.Text != "" {
+				return c.Text // first non-empty text part wins
+			}
+		}
+	}
+	return "" // no user message carries non-empty text
+}
diff --git a/internal/provider/subprocess/provider_test.go b/internal/provider/subprocess/provider_test.go
new file mode 100644
index 0000000..cfcb4c9
--- /dev/null
+++ b/internal/provider/subprocess/provider_test.go
@@ -0,0 +1,82 @@
+package subprocess
+
+import (
+	"context"
+	"testing"
+
+	"somegit.dev/Owlibou/gnoma/internal/provider"
+)
+
+// TestProvider_NameAndDefaultModel checks that Name reports "subprocess" and DefaultModel reports the agent name.
+func TestProvider_NameAndDefaultModel(t *testing.T) {
+	agent := DiscoveredAgent{
+		CLIAgent: CLIAgent{
+			Name:         "testcli",
+			DisplayName:  "Test CLI",
+			Format:       FormatVibeStreaming,
+			Capabilities: provider.Capabilities{ContextWindow: 32000},
+		},
+		Path:    "/usr/bin/testcli",
+		Version: "1.0.0",
+	}
+	p := New(agent)
+
+	if p.Name() != "subprocess" {
+		t.Errorf("Name() = %q, want %q", p.Name(), "subprocess")
+	}
+	if p.DefaultModel() != "testcli" {
+		t.Errorf("DefaultModel() = %q, want %q", p.DefaultModel(), "testcli")
+	}
+}
+
+// TestProvider_Models checks that Models returns exactly one entry carrying the provider name and context window.
+func TestProvider_Models(t *testing.T) {
+	agent := DiscoveredAgent{
+		CLIAgent: CLIAgent{
+			Name:        "claude",
+			DisplayName: "Claude Code",
+			Format:      FormatClaudeStreamJSON,
+			Capabilities: provider.Capabilities{
+				ContextWindow: 200000,
+			},
+		},
+		Path:    "/usr/bin/claude",
+		Version: "2.1.0",
+	}
+	p := New(agent)
+
+	models, err := p.Models(context.Background())
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(models) != 1 {
+		t.Fatalf("Models() returned %d models, want 1", len(models))
+	}
+	m := models[0]
+	if m.Provider != "subprocess" {
+		t.Errorf("model.Provider = %q, want %q", m.Provider, "subprocess")
+	}
+	if m.Capabilities.ContextWindow != 200000 {
+		t.Errorf("ContextWindow = %d, want 200000", m.Capabilities.ContextWindow)
+	}
+}
+
+// TestProvider_Stream_MissingBinary checks that Stream returns an error when the agent path does not exist.
+func TestProvider_Stream_MissingBinary(t *testing.T) {
+	agent := DiscoveredAgent{
+		CLIAgent: CLIAgent{
+			Name:        "nonexistentcli",
+			DisplayName: "Nonexistent",
+			Format:      FormatVibeStreaming,
+			PromptArgs:  func(p string) []string { return []string{p} },
+		},
+		Path: "/nonexistent/cli",
+	}
+	p := New(agent)
+
+	req := provider.Request{}
+	_, err := p.Stream(context.Background(), req)
+	if err == nil {
+		t.Error("expected error for nonexistent binary, got nil")
+	}
+}
diff --git a/internal/provider/subprocess/stream.go b/internal/provider/subprocess/stream.go
new file mode 100644
index 0000000..020d60a
--- /dev/null
+++ b/internal/provider/subprocess/stream.go
@@ -0,0 +1,166 @@
+package subprocess
+
+import (
+	"bufio"
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"os/exec"
+	"strings"
+
+	"somegit.dev/Owlibou/gnoma/internal/stream"
+)
+
+// subprocessStream implements stream.Stream by reading line-delimited JSON
+// from a subprocess stdout and converting lines via a FormatParser.
+type subprocessStream struct {
+	cmd       *exec.Cmd      // the started child process
+	stdout    io.ReadCloser  // read end of the child's stdout pipe
+	stderrBuf *bytes.Buffer  // captured stderr, appended to exit errors
+	scanner   *bufio.Scanner // line reader over stdout
+	parser    FormatParser   // turns raw lines into events
+	pending   []stream.Event // parsed events not yet handed out by Next
+	current   stream.Event   // event most recently handed out by Next
+	err       error          // first error recorded; reported by Err
+	done      bool           // stdout reached EOF and the parser was flushed
+	waited    bool           // cmd.Wait has already been called (see reap)
+}
+
+// newSubprocessStream starts cmd with its stdout attached to a pipe and returns
+// a stream over that output. ctx is not used here: cancellation is expected to
+// come from exec.CommandContext in the caller. The caller must call Close() to
+// release resources.
+func newSubprocessStream(ctx context.Context, cmd *exec.Cmd, parser FormatParser) (*subprocessStream, error) {
+	stdout, err := cmd.StdoutPipe()
+	if err != nil {
+		return nil, fmt.Errorf("subprocess: stdout pipe: %w", err)
+	}
+
+	// Capture stderr for error messages; bounded to 8KB.
+	stderrBuf := &bytes.Buffer{}
+	cmd.Stderr = &limitedWriter{w: stderrBuf, n: 8192}
+
+	// A nil Stdin makes os/exec attach the null device, so the subprocesss reads EOF instead of blocking on input.
+	cmd.Stdin = nil
+
+	if err := cmd.Start(); err != nil {
+		return nil, fmt.Errorf("subprocess: start: %w", err)
+	}
+
+	_ = ctx // context cancellation is handled by exec.CommandContext in the caller
+	scanner := bufio.NewScanner(stdout)
+	scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 1MB line buffer
+
+	return &subprocessStream{
+		cmd:       cmd,
+		stdout:    stdout,
+		stderrBuf: stderrBuf,
+		scanner:   scanner,
+		parser:    parser,
+	}, nil
+}
+
+// Next advances to the next event and reports whether one is available via
+// Current. It returns false once the output is exhausted or an error has been
+// recorded; callers should then check Err.
+func (s *subprocessStream) Next() bool {
+	if s.err != nil {
+		return false
+	}
+
+	for {
+		// Drain buffered events first, including those queued at EOF.
+		if len(s.pending) > 0 {
+			s.current = s.pending[0]
+			s.pending = s.pending[1:]
+			return true
+		}
+		if s.done {
+			return false
+		}
+
+		// Read next line from subprocess stdout.
+		if !s.scanner.Scan() {
+			// EOF — process has exited (or pipe closed).
+			if err := s.scanner.Err(); err != nil {
+				s.err = err
+				return false
+			}
+			// Queue every final event from the parser; the loop above hands them out one per call.
+			s.pending = append(s.pending, s.parser.Done()...)
+			// Wait for process to exit and surface any non-zero exit code.
+			s.reap()
+			s.done = true
+			if s.err != nil {
+				return false
+			}
+			continue
+		}
+
+		line := s.scanner.Bytes()
+		if len(line) == 0 {
+			continue
+		}
+
+		evts, err := s.parser.ParseLine(line)
+		if err != nil {
+			// Non-fatal parse error: skip the line but continue.
+			continue
+		}
+		s.pending = append(s.pending, evts...)
+	}
+}
+
+// Current returns the event most recently produced by Next.
+func (s *subprocessStream) Current() stream.Event { return s.current }
+
+// Err returns the first error recorded by the stream, or nil.
+func (s *subprocessStream) Err() error { return s.err }
+
+// Close kills the subprocess if it was started, closes the stdout pipe and
+// reaps the process. It always returns nil.
+func (s *subprocessStream) Close() error {
+	if s.cmd.Process != nil {
+		_ = s.cmd.Process.Kill()
+	}
+	_ = s.stdout.Close()
+	s.reap()
+	return nil
+}
+
+// reap waits for the process exactly once. Non-zero exit is stored as stream error.
+func (s *subprocessStream) reap() {
+	if s.waited {
+		return
+	}
+	s.waited = true
+	if err := s.cmd.Wait(); err != nil && s.err == nil { // never overwrite an earlier error
+		s.err = fmt.Errorf("subprocess: %w", err)
+		if msg := strings.TrimSpace(s.stderrBuf.String()); msg != "" {
+			s.err = fmt.Errorf("subprocess: %w: %s", err, msg) // include captured stderr
+		}
+	}
+}
+
+// limitedWriter forwards at most n bytes to w and silently discards the rest.
+type limitedWriter struct {
+	w io.Writer
+	n int
+}
+
+func (lw *limitedWriter) Write(p []byte) (int, error) {
+	total := len(p) // reported on success so callers such as io.Copy never see a short write
+	if lw.n <= 0 {
+		return total, nil // silently discard
+	}
+	if total > lw.n {
+		p = p[:lw.n] // keep only what still fits under the limit
+	}
+	n, err := lw.w.Write(p)
+	lw.n -= n
+	if err != nil {
+		return n, err
+	}
+	return total, nil
+}
diff --git a/internal/provider/subprocess/stream_test.go b/internal/provider/subprocess/stream_test.go
new file mode 100644
index 0000000..19a84c1
--- /dev/null
+++ b/internal/provider/subprocess/stream_test.go
@@ -0,0 +1,93 @@
+package subprocess
+
+import (
+	"context"
+	"os/exec"
+	"runtime"
+	"testing"
+
+	"somegit.dev/Owlibou/gnoma/internal/stream"
+)
+
+// TestSubprocessStream_EchoShell runs a real subprocess (printf) and verifies
+// the stream delivers the expected events.
+func TestSubprocessStream_EchoShell(t *testing.T) {
+	if runtime.GOOS == "windows" {
+		t.Skip("shell printf not available on windows")
+	}
+
+	// Feed a vibe-format line through a printf subprocess.
+	// We use vibe format because it's the simplest (no "done" event needed).
+	line := `{"role":"assistant","content":"hello from subprocess","reasoning_content":null,"tool_calls":null,"message_id":"abc"}`
+	// Pass the line as "$1" so the shell never has to quote or interpret the JSON.
+	cmd := exec.CommandContext(context.Background(), "sh", "-c",
+		`printf '%s\n' "$1"`, "sh", line)
+
+	s, err := newSubprocessStream(context.Background(), cmd, newVibeParser())
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer s.Close()
+
+	var texts []string
+	for s.Next() {
+		ev := s.Current()
+		if ev.Type == stream.EventTextDelta {
+			texts = append(texts, ev.Text)
+		}
+	}
+	if err := s.Err(); err != nil {
+		t.Fatalf("stream error: %v", err)
+	}
+	if len(texts) == 0 {
+		t.Fatal("no text events received")
+	}
+	if texts[0] != "hello from subprocess" {
+		t.Errorf("got text %q, want %q", texts[0], "hello from subprocess")
+	}
+}
+
+// TestSubprocessStream_ContextCancel starts a long-running subprocess, cancels
+// its context and verifies that draining the stream terminates.
+func TestSubprocessStream_ContextCancel(t *testing.T) {
+	if runtime.GOOS == "windows" {
+		t.Skip("sleep not available on windows")
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	cmd := exec.CommandContext(ctx, "sh", "-c", "sleep 30")
+	s, err := newSubprocessStream(ctx, cmd, newVibeParser())
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer s.Close()
+
+	cancel()
+	// Drain — should stop quickly due to context cancellation.
+	for s.Next() {
+	}
+	// No error assertion: context cancel may or may not propagate as stream error.
+}
+
+// TestSubprocessStream_ProcessError verifies that a subprocess exiting with a
+// non-zero status is reported through Err once the stream is drained.
+func TestSubprocessStream_ProcessError(t *testing.T) {
+	if runtime.GOOS == "windows" {
+		t.Skip("sh not available on windows")
+	}
+
+	cmd := exec.CommandContext(context.Background(), "sh", "-c", "exit 1")
+	s, err := newSubprocessStream(context.Background(), cmd, newVibeParser())
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer s.Close()
+
+	for s.Next() {
+	}
+	// A non-zero exit should surface as a stream error.
+	if s.Err() == nil {
+		t.Error("expected stream error for non-zero exit, got nil")
+	}
+}
diff --git a/internal/provider/subprocess/testdata/claude.jsonl b/internal/provider/subprocess/testdata/claude.jsonl
new file mode 100644
index 0000000..a5d0b29
--- /dev/null
+++ b/internal/provider/subprocess/testdata/claude.jsonl
@@ -0,0 +1,4 @@
+{"type":"system","subtype":"init","cwd":"/home/user/project","session_id":"8ae89dc8","tools":["Bash","Read"],"mcp_servers":[],"model":"claude-sonnet-4-6","permissionMode":"default","apiKeySource":"none","claude_code_version":"2.1.131","uuid":"549e7847"}
+{"type":"assistant","message":{"model":"claude-sonnet-4-6","id":"msg_01SRH","type":"message","role":"assistant","content":[{"type":"text","text":"hi"}],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":3,"cache_creation_input_tokens":11062,"cache_read_input_tokens":12372,"output_tokens":1,"service_tier":"standard"},"context_management":null},"parent_tool_use_id":null,"session_id":"8ae89dc8","uuid":"97ab5bd6"}
+{"type":"rate_limit_event","rate_limit_info":{"status":"allowed_warning","rateLimitType":"seven_day","utilization":0.76},"uuid":"12249404","session_id":"8ae89dc8"}
+{"type":"result","subtype":"success","is_error":false,"duration_ms":1989,"duration_api_ms":1856,"num_turns":1,"result":"hi","stop_reason":"end_turn","session_id":"8ae89dc8","total_cost_usd":0.045,"usage":{"input_tokens":3,"cache_creation_input_tokens":11062,"cache_read_input_tokens":12372,"output_tokens":4,"service_tier":"standard"},"modelUsage":{"claude-sonnet-4-6":{"inputTokens":3,"outputTokens":4}},"terminal_reason":"completed","uuid":"cd4fcb02"}
diff --git a/internal/provider/subprocess/testdata/gemini.jsonl b/internal/provider/subprocess/testdata/gemini.jsonl
new file mode 100644
index 0000000..52f5959
--- /dev/null
+++ b/internal/provider/subprocess/testdata/gemini.jsonl
@@ -0,0 +1,4 @@
+{"type":"init","timestamp":"2026-05-07T12:18:49.605Z","session_id":"c2d08136","model":"auto-gemini-3"}
+{"type":"message","timestamp":"2026-05-07T12:18:49.606Z","role":"user","content":"say the word hi and nothing else"} +{"type":"message","timestamp":"2026-05-07T12:18:52.063Z","role":"assistant","content":"hi","delta":true} +{"type":"result","timestamp":"2026-05-07T12:18:52.111Z","status":"success","stats":{"total_tokens":10843,"input_tokens":10747,"output_tokens":1,"cached":7862,"input":2885,"duration_ms":2506,"tool_calls":0,"models":{"gemini-3-flash-preview":{"total_tokens":10843,"input_tokens":10747,"output_tokens":1,"cached":7862,"input":2885}}}} diff --git a/internal/provider/subprocess/testdata/vibe.jsonl b/internal/provider/subprocess/testdata/vibe.jsonl new file mode 100644 index 0000000..d3acc0b --- /dev/null +++ b/internal/provider/subprocess/testdata/vibe.jsonl @@ -0,0 +1,3 @@ +{"role": "system", "content": "You are Mistral Vibe, a CLI coding agent built by Mistral AI.", "injected": false, "reasoning_content": null, "reasoning_state": null, "tool_calls": null, "name": null, "tool_call_id": null, "message_id": null} +{"role": "user", "content": "say the word hi and nothing else", "injected": false, "reasoning_content": null, "tool_calls": null, "name": null, "tool_call_id": null, "message_id": "a358a1ec"} +{"role": "assistant", "content": "hi", "injected": false, "reasoning_content": "The user wants me to say the word \"hi\" and nothing else.", "reasoning_state": null, "tool_calls": null, "name": null, "tool_call_id": null, "message_id": "c8ecdd49"}