feat(provider): subprocess CLI provider for claude, gemini, vibe

Adds internal/provider/subprocess — a provider.Provider that spawns CLI
agents (claude, gemini, vibe) as subprocesses and streams their output.

- FormatParser interface + three parsers for claude-stream-json,
  gemini-stream-json, and vibe-streaming formats; fixtures captured from
  real binaries
- subprocessStream: pull-based stream.Stream over subprocess stdout with
  bounded stderr capture (8KB) and guarded reap() to prevent double-Wait
- DiscoverCLIAgents: parallel PATH scan with 10s timeout, stable ordering
- Provider: only the last user message is passed as --prompt; all other
  request fields (history, tools, system prompt) are intentionally ignored
  (see package doc)
- main.go: discover and register CLI arms at startup; TODO(P0c) for
  tier-based routing to enforce preference order explicitly
This commit is contained in:
2026-05-07 14:29:34 +02:00
parent f9b8c1886b
commit f213d8f9ce
12 changed files with 1232 additions and 0 deletions
+23
View File
@@ -32,6 +32,7 @@ import (
googleprov "somegit.dev/Owlibou/gnoma/internal/provider/google"
oaiprov "somegit.dev/Owlibou/gnoma/internal/provider/openai"
"somegit.dev/Owlibou/gnoma/internal/provider/openaicompat"
subprocprov "somegit.dev/Owlibou/gnoma/internal/provider/subprocess"
"somegit.dev/Owlibou/gnoma/internal/session"
"somegit.dev/Owlibou/gnoma/internal/stream"
"somegit.dev/Owlibou/gnoma/internal/tool"
@@ -288,6 +289,28 @@ func main() {
logger.Debug("local models discovered", "count", len(localModels))
}
// Discover CLI agents (claude, gemini, vibe) and register as arms.
// TODO(P0c): CLI arms have cost=0 and ToolUse=true, so the router currently
// always prefers them over API arms. Tier-based routing (subprocess > local > API)
// needs to be explicit in the selector, not implicit through cost.
cliAgents := subprocprov.DiscoverCLIAgents(context.Background())
for _, agent := range cliAgents {
cliArmID := router.NewArmID("subprocess", agent.Name)
if _, exists := rtr.LookupArm(cliArmID); !exists {
rtr.RegisterArm(&router.Arm{
ID: cliArmID,
Provider: subprocprov.New(agent),
ModelName: agent.Name,
IsLocal: false,
Capabilities: agent.Capabilities,
})
logger.Debug("registered CLI agent", "name", agent.Name, "version", agent.Version)
}
}
if len(cliAgents) > 0 {
logger.Debug("CLI agents discovered", "count", len(cliAgents))
}
// Start background discovery polling (30s interval).
// modelUpdater is set after the session is created so the discovery loop
// can update the displayed model name when it reconciles the forced arm.
+173
View File
@@ -0,0 +1,173 @@
package subprocess
import (
"context"
"os/exec"
"sort"
"sync"
"time"
"somegit.dev/Owlibou/gnoma/internal/provider"
)
// StreamFormat identifies the line-delimited JSON format a CLI agent emits.
// The value is what newParser switches on to pick a FormatParser.
type StreamFormat string

// Stream formats understood by this package, one per entry in knownAgents.
const (
	FormatClaudeStreamJSON StreamFormat = "claude-stream-json"
	FormatGeminiStreamJSON StreamFormat = "gemini-stream-json"
	FormatVibeStreaming StreamFormat = "vibe-streaming"
)
// CLIAgent describes a known CLI agent binary.
type CLIAgent struct {
	// Name is the executable name; DiscoverCLIAgents passes it to exec.LookPath.
	Name string
	// DisplayName is the human-readable name reported in ModelInfo.Name.
	DisplayName string
	ProbeArgs []string // args to fetch version (e.g. ["--version"])
	PromptArgs func(string) []string // build argv for a non-interactive prompt run
	// Format selects the FormatParser used to decode the agent's stdout.
	Format StreamFormat
	// Capabilities is copied verbatim onto the router arm and ModelInfo.
	Capabilities provider.Capabilities
}
// DiscoveredAgent is a CLIAgent confirmed present on PATH with its resolved path.
type DiscoveredAgent struct {
	CLIAgent
	// Path is the location returned by exec.LookPath for the agent's Name.
	Path string
	// Version is the first non-empty line printed by the probe command, or ""
	// when the probe produced no output.
	Version string
}
// knownAgents is the registry of CLI agents Gnoma supports.
// The slice order is significant: DiscoverCLIAgents returns discovered agents
// in this same order.
var knownAgents = []CLIAgent{
	{
		Name: "claude",
		DisplayName: "Claude Code",
		ProbeArgs: []string{"--version"},
		PromptArgs: func(p string) []string {
			return []string{"-p", p, "--output-format", "stream-json", "--verbose"}
		},
		Format: FormatClaudeStreamJSON,
		// ToolUse=true: the claude CLI is a full agent with its own tool loop.
		// This is a routing capability flag, not a provider-layer capability.
		Capabilities: provider.Capabilities{
			ToolUse: true,
			ContextWindow: 200000,
		},
	},
	{
		Name: "gemini",
		DisplayName: "Gemini CLI",
		ProbeArgs: []string{"--version"},
		PromptArgs: func(p string) []string {
			// NOTE(review): --yolo presumably auto-approves the CLI's own tool
			// actions so the run stays non-interactive — confirm against the CLI docs.
			return []string{"-p", p, "--output-format", "stream-json", "--yolo"}
		},
		Format: FormatGeminiStreamJSON,
		Capabilities: provider.Capabilities{
			ToolUse: true,
			ContextWindow: 1048576,
		},
	},
	{
		Name: "vibe",
		DisplayName: "Mistral Vibe",
		ProbeArgs: []string{"--version"},
		PromptArgs: func(p string) []string {
			// NOTE(review): --trust presumably skips an interactive trust prompt —
			// confirm against the CLI docs.
			return []string{"-p", p, "--output", "streaming", "--trust"}
		},
		Format: FormatVibeStreaming,
		Capabilities: provider.Capabilities{
			ToolUse: true,
			ContextWindow: 128000,
		},
	},
}
// newParser builds a fresh FormatParser for format f.
// It returns nil when f is not one of the Format* constants, so callers must
// check the result before using it.
func newParser(f StreamFormat) FormatParser {
	if f == FormatClaudeStreamJSON {
		return newClaudeParser()
	}
	if f == FormatGeminiStreamJSON {
		return newGeminiParser()
	}
	if f == FormatVibeStreaming {
		return newVibeParser()
	}
	return nil
}
// DiscoverCLIAgents looks up every entry of knownAgents on PATH and probes the
// binaries it finds for a version string, running the probes concurrently.
//
// An agent is reported as soon as its binary resolves on PATH; a probe that
// fails or produces no output only leaves Version empty. The whole scan is
// bounded by a 10-second timeout derived from ctx, at most four probes run at
// once, and the result follows the order of knownAgents.
func DiscoverCLIAgents(ctx context.Context) []DiscoveredAgent {
	scanCtx, stop := context.WithTimeout(ctx, 10*time.Second)
	defer stop()

	var (
		lock    sync.Mutex
		results []DiscoveredAgent
		probes  sync.WaitGroup
	)
	slots := make(chan struct{}, 4)

	for _, candidate := range knownAgents {
		resolved, lookErr := exec.LookPath(candidate.Name)
		if lookErr != nil {
			// Binary is not installed; nothing to probe.
			continue
		}
		probes.Add(1)
		slots <- struct{}{}
		go func(a CLIAgent, p string) {
			defer probes.Done()
			defer func() { <-slots }()
			entry := DiscoveredAgent{
				CLIAgent: a,
				Path:     p,
				Version:  probeAgentVersion(scanCtx, p, a.ProbeArgs),
			}
			lock.Lock()
			results = append(results, entry)
			lock.Unlock()
		}(candidate, resolved)
	}
	probes.Wait()

	// Probes finish in arbitrary order; restore the registry ordering.
	rank := make(map[string]int, len(knownAgents))
	for pos, known := range knownAgents {
		rank[known.Name] = pos
	}
	sort.Slice(results, func(i, j int) bool {
		return rank[results[i].Name] < rank[results[j].Name]
	})
	return results
}
// probeAgentVersion runs the binary at path with args and returns the first
// non-empty line it printed on stdout. The run is limited to 3 seconds on top
// of whatever deadline ctx already carries. It returns "" when the command
// fails without producing output, or when the output has no non-empty line.
func probeAgentVersion(ctx context.Context, path string, args []string) string {
	probeCtx, stop := context.WithTimeout(ctx, 3*time.Second)
	defer stop()

	output, runErr := exec.CommandContext(probeCtx, path, args...).Output()
	// A failing command that still printed something is accepted: the printed
	// text is used as the version.
	if len(output) == 0 && runErr != nil {
		return ""
	}
	for _, candidate := range splitNL(output) {
		if len(candidate) != 0 {
			return string(candidate)
		}
	}
	return ""
}
// splitNL splits b into lines on '\n', dropping one trailing '\r' from every
// line so that CRLF input yields clean lines.
//
// A final segment that is not terminated by '\n' is returned as a line too and
// gets the same '\r' trimming. Empty lines between newlines are preserved as
// empty slices. The returned slices alias b; nil or empty input yields nil.
func splitNL(b []byte) [][]byte {
	// trimCR removes a single trailing carriage return, if present.
	trimCR := func(line []byte) []byte {
		if n := len(line); n > 0 && line[n-1] == '\r' {
			return line[:n-1]
		}
		return line
	}

	var lines [][]byte
	start := 0
	for i, c := range b {
		if c != '\n' {
			continue
		}
		lines = append(lines, trimCR(b[start:i]))
		start = i + 1
	}
	// Unterminated last line: keep it, trimmed like every other line.
	if start < len(b) {
		lines = append(lines, trimCR(b[start:]))
	}
	return lines
}
@@ -0,0 +1,74 @@
package subprocess
import (
"testing"
)
// TestKnownAgents_Defined checks that the registry is non-empty and that every
// entry carries a name, display name, format and prompt-args builder.
func TestKnownAgents_Defined(t *testing.T) {
	if len(knownAgents) == 0 {
		t.Fatal("knownAgents must not be empty")
	}
	for idx := range knownAgents {
		agent := knownAgents[idx]
		// Each field is checked independently so one run reports every gap.
		if agent.Name == "" {
			t.Errorf("agent with empty Name: %+v", agent)
		}
		if agent.DisplayName == "" {
			t.Errorf("agent %q has empty DisplayName", agent.Name)
		}
		if agent.Format == "" {
			t.Errorf("agent %q has empty Format", agent.Name)
		}
		if agent.PromptArgs == nil {
			t.Errorf("agent %q has nil PromptArgs", agent.Name)
		}
	}
}
// TestKnownAgents_UniqueNames checks that no two registry entries share a Name.
func TestKnownAgents_UniqueNames(t *testing.T) {
	names := make(map[string]struct{})
	for idx := range knownAgents {
		name := knownAgents[idx].Name
		if _, dup := names[name]; dup {
			t.Errorf("duplicate agent name %q", name)
		}
		names[name] = struct{}{}
	}
}
func TestKnownAgents_ValidFormats(t *testing.T) {
valid := map[StreamFormat]bool{
FormatClaudeStreamJSON: true,
FormatGeminiStreamJSON: true,
FormatVibeStreaming: true,
}
for _, a := range knownAgents {
if !valid[a.Format] {
t.Errorf("agent %q has unknown format %q", a.Name, a.Format)
}
}
}
// TestKnownAgents_PromptArgsIncludePrompt checks that each agent's PromptArgs
// places the prompt verbatim into the generated argv.
func TestKnownAgents_PromptArgsIncludePrompt(t *testing.T) {
	const testPrompt = "TESTPROMPT_UNIQUE_SENTINEL"

	// contains reports whether want appears as a whole element of argv.
	contains := func(argv []string, want string) bool {
		for i := range argv {
			if argv[i] == want {
				return true
			}
		}
		return false
	}

	for _, agent := range knownAgents {
		argv := agent.PromptArgs(testPrompt)
		if contains(argv, testPrompt) {
			continue
		}
		t.Errorf("agent %q PromptArgs(%q) does not include the prompt in args: %v", agent.Name, testPrompt, argv)
	}
}
// TestNewParser_ReturnsParserForKnownFormats checks that newParser yields a
// non-nil parser for each declared format.
func TestNewParser_ReturnsParserForKnownFormats(t *testing.T) {
	formats := []StreamFormat{
		FormatClaudeStreamJSON,
		FormatGeminiStreamJSON,
		FormatVibeStreaming,
	}
	for i := range formats {
		if newParser(formats[i]) == nil {
			t.Errorf("newParser(%q) returned nil", formats[i])
		}
	}
}
+228
View File
@@ -0,0 +1,228 @@
package subprocess
import (
"encoding/json"
"fmt"
"somegit.dev/Owlibou/gnoma/internal/message"
"somegit.dev/Owlibou/gnoma/internal/stream"
)
// FormatParser converts raw stdout lines from a CLI subprocess into stream.Events.
// Each CLI agent emits its own line-delimited JSON format.
//
// Design note: these agents are full agentic loops, not LLM endpoints, and are
// treated as opaque black boxes. The provider.Request fields Tools, Messages
// (history), SystemPrompt, Temperature, Thinking, etc. are NOT honored. Only the
// latest user message is passed as a prompt. Internal tool calls executed by the
// CLI are surfaced as EventTextDelta (opaque text) in v1.
//
// NOTE(review): the parsers in this file forward only text content (and, for
// vibe, reasoning content); lines describing tool activity are dropped. Confirm
// that this is what "surfaced as EventTextDelta" is meant to describe.
type FormatParser interface {
	// ParseLine parses one newline-stripped stdout line. Returns 0 or more events.
	// A non-nil error means the line could not be decoded.
	ParseLine(line []byte) ([]stream.Event, error)
	// Done is called once the subprocess stdout has been read to the end.
	// May emit final events.
	Done() []stream.Event
}
// --- claude-stream-json ---
// Format emitted by: claude -p "..." --output-format stream-json --verbose
//
// Relevant event types:
// type=assistant → message.content[].type=text → EventTextDelta
// type=result → usage.input_tokens/output_tokens, stop_reason → EventUsage; is_error → EventError

// claudeParser decodes claude-stream-json lines. It keeps no state between lines.
type claudeParser struct{}

// newClaudeParser returns a parser for the claude-stream-json format.
func newClaudeParser() FormatParser { return &claudeParser{} }

// claudeEvent is the subset of one stream-json line that the parser reads.
// Fields not listed here are ignored during decoding.
type claudeEvent struct {
	// Type discriminates the line; only "assistant" and "result" are handled.
	Type string `json:"type"`
	// Subtype is used as the error text when a result line has is_error=true.
	Subtype string `json:"subtype"`
	Message *claudeMessage `json:"message,omitempty"`
	IsError bool `json:"is_error,omitempty"`
	// result fields
	StopReason string `json:"stop_reason,omitempty"`
	Usage *claudeUsage `json:"usage,omitempty"`
}

// claudeMessage is the payload of an "assistant" line.
type claudeMessage struct {
	Content []claudeContent `json:"content"`
	// Usage is decoded but not read by ParseLine; token counts are taken from
	// the "result" line instead.
	Usage *claudeUsage `json:"usage,omitempty"`
}

// claudeContent is one content part of an assistant message. Only parts with
// Type "text" produce events.
type claudeContent struct {
	Type string `json:"type"`
	Text string `json:"text,omitempty"`
}

// claudeUsage carries the token counts reported by the CLI.
type claudeUsage struct {
	InputTokens int64 `json:"input_tokens"`
	OutputTokens int64 `json:"output_tokens"`
}
// ParseLine decodes one claude-stream-json line.
//
// "assistant" lines yield one EventTextDelta per non-empty text content part.
// "result" lines yield an EventError when is_error is set, otherwise an
// EventUsage when usage is present. Every other line type (system,
// rate_limit_event, tool, ...) is intentionally ignored. A line that is not
// valid JSON returns an error.
func (p *claudeParser) ParseLine(line []byte) ([]stream.Event, error) {
	var decoded claudeEvent
	if err := json.Unmarshal(line, &decoded); err != nil {
		return nil, fmt.Errorf("claude: parse line: %w", err)
	}

	if decoded.Type == "assistant" {
		if decoded.Message == nil {
			return nil, nil
		}
		var out []stream.Event
		for i := range decoded.Message.Content {
			part := decoded.Message.Content[i]
			if part.Type != "text" || part.Text == "" {
				continue
			}
			out = append(out, stream.Event{Type: stream.EventTextDelta, Text: part.Text})
		}
		return out, nil
	}

	if decoded.Type != "result" {
		return nil, nil
	}

	if decoded.IsError {
		failure := stream.Event{
			Type: stream.EventError,
			Err:  fmt.Errorf("claude CLI: %s", decoded.Subtype),
		}
		return []stream.Event{failure}, nil
	}
	if decoded.Usage == nil {
		return nil, nil
	}
	usage := stream.Event{
		Type: stream.EventUsage,
		Usage: &message.Usage{
			InputTokens:  decoded.Usage.InputTokens,
			OutputTokens: decoded.Usage.OutputTokens,
		},
		StopReason: claudeStopReason(decoded.StopReason),
	}
	return []stream.Event{usage}, nil
}

// Done emits nothing: the claude format reports completion in its "result" line.
func (p *claudeParser) Done() []stream.Event {
	return nil
}
// claudeStopReason maps the CLI's stop_reason string onto message.StopReason.
// "max_tokens" becomes StopMaxTokens; "end_turn" and every other value,
// including the empty string, become StopEndTurn.
func claudeStopReason(s string) message.StopReason {
	if s == "max_tokens" {
		return message.StopMaxTokens
	}
	return message.StopEndTurn
}
// --- gemini-stream-json ---
// Format emitted by: gemini -p "..." --output-format stream-json
//
// Relevant event types:
// type=message, role=assistant, delta=true → EventTextDelta
// type=result, status=success → EventUsage

// geminiParser decodes gemini-stream-json lines. It keeps no state between lines.
type geminiParser struct{}

// newGeminiParser returns a parser for the gemini-stream-json format.
func newGeminiParser() FormatParser { return &geminiParser{} }

// geminiEvent is the subset of one stream-json line that the parser reads.
type geminiEvent struct {
	// Type discriminates the line; only "message" and "result" are handled.
	Type string `json:"type"`
	Role string `json:"role,omitempty"`
	Content string `json:"content,omitempty"`
	// Delta is decoded but not consulted by ParseLine: every non-empty
	// assistant message is forwarded whether or not it is marked as a delta.
	Delta bool `json:"delta,omitempty"`
	Status string `json:"status,omitempty"`
	Stats *geminiStats `json:"stats,omitempty"`
}

// geminiStats carries the token counts of a "result" line.
type geminiStats struct {
	InputTokens int64 `json:"input_tokens"`
	OutputTokens int64 `json:"output_tokens"`
}
// ParseLine decodes one gemini-stream-json line.
//
// "message" lines with role "assistant" and non-empty content yield an
// EventTextDelta; user messages and empty assistant messages are ignored.
// "result" lines yield an EventError whenever status is not "success" —
// regardless of whether stats are attached — and otherwise an EventUsage when
// stats are present. All other line types (init, ...) are ignored. A line that
// is not valid JSON returns an error.
func (p *geminiParser) ParseLine(line []byte) ([]stream.Event, error) {
	var ev geminiEvent
	if err := json.Unmarshal(line, &ev); err != nil {
		return nil, fmt.Errorf("gemini: parse line: %w", err)
	}

	switch ev.Type {
	case "message":
		if ev.Role == "assistant" && ev.Content != "" {
			return []stream.Event{{Type: stream.EventTextDelta, Text: ev.Content}}, nil
		}
		// user messages and empty assistant messages are ignored
		return nil, nil

	case "result":
		// Check the status first: a failed run must surface as an error even
		// when the CLI did not attach any stats to the result line.
		if ev.Status != "success" {
			return []stream.Event{{
				Type: stream.EventError,
				Err:  fmt.Errorf("gemini CLI: result status %q", ev.Status),
			}}, nil
		}
		if ev.Stats == nil {
			return nil, nil
		}
		return []stream.Event{{
			Type: stream.EventUsage,
			Usage: &message.Usage{
				InputTokens:  ev.Stats.InputTokens,
				OutputTokens: ev.Stats.OutputTokens,
			},
			StopReason: message.StopEndTurn,
		}}, nil
	}

	// init, other types — ignored
	return nil, nil
}

// Done emits nothing: the gemini format reports completion in its "result" line.
func (p *geminiParser) Done() []stream.Event { return nil }
// --- vibe-streaming (mistral) ---
// Format emitted by: vibe -p "..." --output streaming --trust
//
// Each line is a JSON message object with a "role" field.
// role=assistant: content → EventTextDelta; reasoning_content → EventThinkingDelta
// role=system, role=user: ignored
// No explicit "done" event — stream ends when process exits.

// vibeParser decodes vibe-streaming lines.
type vibeParser struct {
	// NOTE(review): lastAssistantMsgID is never read or written by the methods
	// in this file — presumably reserved for de-duplicating repeated messages.
	lastAssistantMsgID string
}

// newVibeParser returns a parser for the vibe-streaming format.
func newVibeParser() FormatParser { return &vibeParser{} }

// vibeMessage is the subset of one streamed message object that the parser reads.
type vibeMessage struct {
	Role string `json:"role"`
	Content string `json:"content"`
	// ReasoningContent is a pointer because the CLI emits JSON null when the
	// message carries no reasoning.
	ReasoningContent *string `json:"reasoning_content"`
	// MessageID is decoded but not used by ParseLine.
	MessageID *string `json:"message_id"`
}
// ParseLine decodes one vibe-streaming line.
//
// Only role=assistant messages produce events: non-empty reasoning_content
// yields an EventThinkingDelta, followed by an EventTextDelta for non-empty
// content. Messages with any other role are ignored. A line that is not valid
// JSON returns an error.
func (p *vibeParser) ParseLine(line []byte) ([]stream.Event, error) {
	var decoded vibeMessage
	if err := json.Unmarshal(line, &decoded); err != nil {
		return nil, fmt.Errorf("vibe: parse line: %w", err)
	}
	if decoded.Role != "assistant" {
		return nil, nil
	}

	// Treat a JSON null reasoning_content the same as an empty string.
	reasoning := ""
	if decoded.ReasoningContent != nil {
		reasoning = *decoded.ReasoningContent
	}

	var out []stream.Event
	if reasoning != "" {
		out = append(out, stream.Event{Type: stream.EventThinkingDelta, Text: reasoning})
	}
	if decoded.Content != "" {
		out = append(out, stream.Event{Type: stream.EventTextDelta, Text: decoded.Content})
	}
	return out, nil
}

// Done emits nothing: the vibe format has no completion event.
func (p *vibeParser) Done() []stream.Event {
	return nil
}
+311
View File
@@ -0,0 +1,311 @@
package subprocess
import (
"os"
"strings"
"testing"
"somegit.dev/Owlibou/gnoma/internal/message"
"somegit.dev/Owlibou/gnoma/internal/stream"
)
// loadFixture returns the non-blank lines of testdata/<name>.jsonl, each
// stripped of surrounding whitespace. The test fails immediately when the file
// cannot be read.
func loadFixture(t *testing.T, name string) [][]byte {
	t.Helper()

	raw, readErr := os.ReadFile("testdata/" + name + ".jsonl")
	if readErr != nil {
		t.Fatalf("load fixture %s: %v", name, readErr)
	}

	var kept [][]byte
	for _, candidate := range strings.Split(string(raw), "\n") {
		trimmed := strings.TrimSpace(candidate)
		if trimmed == "" {
			continue
		}
		kept = append(kept, []byte(trimmed))
	}
	return kept
}
// collectEvents feeds every line to p and gathers the events it emits,
// followed by whatever p.Done() returns. A line that fails to parse is
// reported with t.Errorf and skipped, so the remaining lines are still processed.
func collectEvents(t *testing.T, p FormatParser, lines [][]byte) []stream.Event {
	t.Helper()

	var gathered []stream.Event
	for idx := range lines {
		parsed, parseErr := p.ParseLine(lines[idx])
		if parseErr == nil {
			gathered = append(gathered, parsed...)
			continue
		}
		t.Errorf("ParseLine(%s): %v", string(lines[idx]), parseErr)
	}
	return append(gathered, p.Done()...)
}
// --- claude-stream-json ---
// TestClaudeParser_ExtractsTextDelta checks that an assistant line with a text
// part produces an EventTextDelta carrying that text as its first event.
func TestClaudeParser_ExtractsTextDelta(t *testing.T) {
	input := []byte(`{"type":"assistant","message":{"model":"claude-sonnet-4-6","content":[{"type":"text","text":"hello world"}],"usage":{"input_tokens":5,"output_tokens":2}}}`)

	events, err := newClaudeParser().ParseLine(input)
	if err != nil {
		t.Fatal(err)
	}
	if len(events) == 0 {
		t.Fatal("expected at least one event")
	}

	first := events[0]
	if first.Type != stream.EventTextDelta {
		t.Errorf("got type %v, want EventTextDelta", first.Type)
	}
	if first.Text != "hello world" {
		t.Errorf("got text %q, want %q", first.Text, "hello world")
	}
}
// TestClaudeParser_ExtractsUsageFromResult checks that a successful result line
// produces an EventUsage with the reported token counts and stop reason.
func TestClaudeParser_ExtractsUsageFromResult(t *testing.T) {
	input := []byte(`{"type":"result","subtype":"success","is_error":false,"stop_reason":"end_turn","usage":{"input_tokens":10,"output_tokens":5},"result":"hello"}`)

	events, err := newClaudeParser().ParseLine(input)
	if err != nil {
		t.Fatal(err)
	}

	// Pick the last usage event, scanning from the end.
	var usage *stream.Event
	for idx := len(events) - 1; idx >= 0; idx-- {
		if events[idx].Type == stream.EventUsage {
			usage = &events[idx]
			break
		}
	}
	if usage == nil {
		t.Fatal("no EventUsage emitted")
	}
	if usage.Usage == nil {
		t.Fatal("Usage is nil")
	}

	if got := usage.Usage.InputTokens; got != 10 {
		t.Errorf("input_tokens: got %d, want 10", got)
	}
	if got := usage.Usage.OutputTokens; got != 5 {
		t.Errorf("output_tokens: got %d, want 5", got)
	}
	if got := usage.StopReason; got != message.StopEndTurn {
		t.Errorf("stop_reason: got %v, want StopEndTurn", got)
	}
}
// TestClaudeParser_IgnoresSystemAndRateLimit checks that system and
// rate_limit_event lines parse without error and produce no events.
func TestClaudeParser_IgnoresSystemAndRateLimit(t *testing.T) {
	parser := newClaudeParser()
	ignored := [][]byte{
		[]byte(`{"type":"system","subtype":"init","model":"claude-sonnet-4-6"}`),
		[]byte(`{"type":"rate_limit_event","rate_limit_info":{}}`),
	}
	for idx := range ignored {
		line := ignored[idx]
		events, err := parser.ParseLine(line)
		if err != nil {
			t.Errorf("ParseLine(%s): unexpected error: %v", line, err)
		}
		if n := len(events); n != 0 {
			t.Errorf("expected no events for %s, got %d", line, n)
		}
	}
}
// TestClaudeParser_ErrorResult checks that a result line with is_error=true
// produces an EventError.
func TestClaudeParser_ErrorResult(t *testing.T) {
	input := []byte(`{"type":"result","subtype":"error_during_run","is_error":true,"usage":{"input_tokens":5,"output_tokens":0}}`)

	events, err := newClaudeParser().ParseLine(input)
	if err != nil {
		t.Fatal(err)
	}

	sawError := false
	for idx := range events {
		if events[idx].Type == stream.EventError {
			sawError = true
		}
	}
	if !sawError {
		t.Error("expected EventError for is_error=true result")
	}
}
func TestClaudeParser_FixtureFile(t *testing.T) {
lines := loadFixture(t, "claude")
p := newClaudeParser()
evts := collectEvents(t, p, lines)
var textEvts, usageEvts int
for _, e := range evts {
switch e.Type {
case stream.EventTextDelta:
textEvts++
if e.Text == "" {
t.Error("EventTextDelta with empty text")
}
case stream.EventUsage:
usageEvts++
}
}
if textEvts == 0 {
t.Error("no EventTextDelta from claude fixture")
}
if usageEvts == 0 {
t.Error("no EventUsage from claude fixture")
}
}
// --- gemini-stream-json ---
// TestGeminiParser_ExtractsTextDelta checks that an assistant message line
// produces an EventTextDelta carrying its content as the first event.
func TestGeminiParser_ExtractsTextDelta(t *testing.T) {
	input := []byte(`{"type":"message","role":"assistant","content":"hello world","delta":true}`)

	events, err := newGeminiParser().ParseLine(input)
	if err != nil {
		t.Fatal(err)
	}
	if len(events) == 0 {
		t.Fatal("expected at least one event")
	}

	first := events[0]
	if first.Type != stream.EventTextDelta {
		t.Errorf("got type %v, want EventTextDelta", first.Type)
	}
	if first.Text != "hello world" {
		t.Errorf("got text %q, want %q", first.Text, "hello world")
	}
}
// TestGeminiParser_ExtractsUsageFromResult checks that a successful result line
// produces an EventUsage with the token counts taken from its stats object.
func TestGeminiParser_ExtractsUsageFromResult(t *testing.T) {
	p := newGeminiParser()
	line := []byte(`{"type":"result","status":"success","stats":{"input_tokens":100,"output_tokens":20}}`)
	evts, err := p.ParseLine(line)
	if err != nil {
		t.Fatal(err)
	}
	var usageEvt *stream.Event
	for i := range evts {
		if evts[i].Type == stream.EventUsage {
			usageEvt = &evts[i]
		}
	}
	if usageEvt == nil {
		t.Fatal("no EventUsage emitted")
	}
	// Guard the dereferences below so a regression fails the test cleanly
	// instead of panicking (mirrors the claude usage test).
	if usageEvt.Usage == nil {
		t.Fatal("Usage is nil")
	}
	if usageEvt.Usage.InputTokens != 100 {
		t.Errorf("input_tokens: got %d, want 100", usageEvt.Usage.InputTokens)
	}
	if usageEvt.Usage.OutputTokens != 20 {
		t.Errorf("output_tokens: got %d, want 20", usageEvt.Usage.OutputTokens)
	}
}
// TestGeminiParser_IgnoresUserMessages checks that an echoed user message
// produces no events.
func TestGeminiParser_IgnoresUserMessages(t *testing.T) {
	input := []byte(`{"type":"message","role":"user","content":"say hi"}`)

	events, err := newGeminiParser().ParseLine(input)
	if err != nil {
		t.Fatal(err)
	}
	if n := len(events); n != 0 {
		t.Errorf("expected no events for user message, got %d", n)
	}
}
func TestGeminiParser_FixtureFile(t *testing.T) {
lines := loadFixture(t, "gemini")
p := newGeminiParser()
evts := collectEvents(t, p, lines)
var textEvts, usageEvts int
for _, e := range evts {
switch e.Type {
case stream.EventTextDelta:
textEvts++
case stream.EventUsage:
usageEvts++
}
}
if textEvts == 0 {
t.Error("no EventTextDelta from gemini fixture")
}
if usageEvts == 0 {
t.Error("no EventUsage from gemini fixture")
}
}
// --- vibe-streaming (mistral) ---
// TestVibeParser_ExtractsTextDelta checks that an assistant message without
// reasoning produces an EventTextDelta carrying its content as the first event.
func TestVibeParser_ExtractsTextDelta(t *testing.T) {
	input := []byte(`{"role":"assistant","content":"hello world","reasoning_content":null,"tool_calls":null,"message_id":"abc123"}`)

	events, err := newVibeParser().ParseLine(input)
	if err != nil {
		t.Fatal(err)
	}
	if len(events) == 0 {
		t.Fatal("expected at least one event")
	}

	first := events[0]
	if first.Type != stream.EventTextDelta {
		t.Errorf("got type %v, want EventTextDelta", first.Type)
	}
	if first.Text != "hello world" {
		t.Errorf("got text %q, want %q", first.Text, "hello world")
	}
}
// TestVibeParser_ExtractsThinkingDelta checks that reasoning_content on an
// assistant message produces a non-empty EventThinkingDelta.
func TestVibeParser_ExtractsThinkingDelta(t *testing.T) {
	input := []byte(`{"role":"assistant","content":"hello","reasoning_content":"I should say hi","tool_calls":null,"message_id":"abc123"}`)

	events, err := newVibeParser().ParseLine(input)
	if err != nil {
		t.Fatal(err)
	}

	sawThinking := false
	for idx := range events {
		if events[idx].Type != stream.EventThinkingDelta {
			continue
		}
		sawThinking = true
		if events[idx].Text == "" {
			t.Error("EventThinkingDelta with empty text")
		}
	}
	if !sawThinking {
		t.Error("expected EventThinkingDelta when reasoning_content is set")
	}
}
// TestVibeParser_IgnoresSystemAndUser checks that system and user messages
// parse without error and produce no events.
func TestVibeParser_IgnoresSystemAndUser(t *testing.T) {
	parser := newVibeParser()
	ignored := []string{
		`{"role":"system","content":"You are Vibe...","message_id":null}`,
		`{"role":"user","content":"say hi","message_id":"abc"}`,
	}
	for idx := range ignored {
		line := ignored[idx]
		events, err := parser.ParseLine([]byte(line))
		if err != nil {
			t.Errorf("ParseLine(%s): unexpected error: %v", line, err)
		}
		if n := len(events); n != 0 {
			// The first 20 bytes of the line are enough to show its role.
			t.Errorf("expected no events for role=%s, got %d", line[:20], n)
		}
	}
}
func TestVibeParser_FixtureFile(t *testing.T) {
lines := loadFixture(t, "vibe")
p := newVibeParser()
evts := collectEvents(t, p, lines)
var textEvts int
for _, e := range evts {
if e.Type == stream.EventTextDelta {
textEvts++
}
}
if textEvts == 0 {
t.Error("no EventTextDelta from vibe fixture")
}
}
+85
View File
@@ -0,0 +1,85 @@
// Package subprocess provides a provider.Provider that delegates to CLI agents
// (claude, gemini, vibe) by spawning them as subprocesses.
//
// Impedance mismatch: these CLI agents are full agentic loops, not LLM endpoints.
// Only the latest user message is passed as a prompt. The following provider.Request
// fields are intentionally ignored: Tools, SystemPrompt, Messages (history),
// Temperature, TopP, TopK, Thinking, ResponseFormat, ToolChoice, MaxTokens.
// Internal tool calls executed by the CLI are surfaced as EventTextDelta (opaque).
package subprocess
import (
"context"
"fmt"
"os/exec"
"somegit.dev/Owlibou/gnoma/internal/message"
"somegit.dev/Owlibou/gnoma/internal/provider"
"somegit.dev/Owlibou/gnoma/internal/stream"
)
// Provider wraps a single DiscoveredAgent and implements provider.Provider.
type Provider struct {
	// agent is the CLI binary this provider spawns; it is set once by New.
	agent DiscoveredAgent
}

// New creates a Provider for the given discovered CLI agent.
// The agent is stored as given; no validation is performed here.
func New(agent DiscoveredAgent) *Provider {
	return &Provider{agent: agent}
}

// Name returns "subprocess" — all CLI agents share this provider namespace.
func (p *Provider) Name() string { return "subprocess" }

// DefaultModel returns the CLI binary name (e.g., "claude", "gemini", "vibe").
func (p *Provider) DefaultModel() string { return p.agent.Name }
// Models reports exactly one model: the wrapped CLI agent itself. The model ID
// is the agent's binary name, the display name and capabilities come from the
// agent's registry entry, and the provider is always "subprocess". The error
// is always nil.
func (p *Provider) Models(_ context.Context) ([]provider.ModelInfo, error) {
	info := provider.ModelInfo{
		ID:           p.agent.Name,
		Name:         p.agent.DisplayName,
		Provider:     "subprocess",
		Capabilities: p.agent.Capabilities,
	}
	return []provider.ModelInfo{info}, nil
}
// Stream spawns the CLI agent with the latest user message as a prompt and
// returns an event stream. All fields in req except the last user message are
// ignored — see package doc for rationale.
//
// The agent definition is validated before anything is spawned: an agent
// without a PromptArgs builder or with an unknown Format yields an error
// instead of a nil-function panic or a needlessly constructed command.
// Cancelling ctx terminates the subprocess (via exec.CommandContext). The
// caller must Close the returned stream.
func (p *Provider) Stream(ctx context.Context, req provider.Request) (stream.Stream, error) {
	if p.agent.PromptArgs == nil {
		return nil, fmt.Errorf("subprocess: agent %q has no PromptArgs", p.agent.Name)
	}
	parser := newParser(p.agent.Format)
	if parser == nil {
		return nil, fmt.Errorf("subprocess: unknown format %q for agent %q", p.agent.Format, p.agent.Name)
	}

	prompt := extractLastUserMessage(req.Messages)
	cmd := exec.CommandContext(ctx, p.agent.Path, p.agent.PromptArgs(prompt)...)

	s, err := newSubprocessStream(ctx, cmd, parser)
	if err != nil {
		return nil, fmt.Errorf("subprocess %q: %w", p.agent.Name, err)
	}
	return s, nil
}
// extractLastUserMessage walks msgs from newest to oldest and returns the first
// non-empty text part of the most recent user-role message that has one.
// A user message without any non-empty text part is skipped in favour of an
// earlier one. The result is "" when no user message carries text.
func extractLastUserMessage(msgs []message.Message) string {
	for pos := len(msgs); pos > 0; pos-- {
		candidate := msgs[pos-1]
		if candidate.Role == message.RoleUser {
			for _, part := range candidate.Content {
				if part.Type != message.ContentText || part.Text == "" {
					continue
				}
				return part.Text
			}
		}
	}
	return ""
}
@@ -0,0 +1,79 @@
package subprocess
import (
"context"
"testing"
"somegit.dev/Owlibou/gnoma/internal/provider"
)
func TestProvider_NameAndDefaultModel(t *testing.T) {
agent := DiscoveredAgent{
CLIAgent: CLIAgent{
Name: "testcli",
DisplayName: "Test CLI",
Format: FormatVibeStreaming,
Capabilities: provider.Capabilities{ContextWindow: 32000},
},
Path: "/usr/bin/testcli",
Version: "1.0.0",
}
p := New(agent)
if p.Name() != "subprocess" {
t.Errorf("Name() = %q, want %q", p.Name(), "subprocess")
}
if p.DefaultModel() != "testcli" {
t.Errorf("DefaultModel() = %q, want %q", p.DefaultModel(), "testcli")
}
}
func TestProvider_Models(t *testing.T) {
agent := DiscoveredAgent{
CLIAgent: CLIAgent{
Name: "claude",
DisplayName: "Claude Code",
Format: FormatClaudeStreamJSON,
Capabilities: provider.Capabilities{
ContextWindow: 200000,
},
},
Path: "/usr/bin/claude",
Version: "2.1.0",
}
p := New(agent)
models, err := p.Models(context.Background())
if err != nil {
t.Fatal(err)
}
if len(models) != 1 {
t.Fatalf("Models() returned %d models, want 1", len(models))
}
m := models[0]
if m.Provider != "subprocess" {
t.Errorf("model.Provider = %q, want %q", m.Provider, "subprocess")
}
if m.Capabilities.ContextWindow != 200000 {
t.Errorf("ContextWindow = %d, want 200000", m.Capabilities.ContextWindow)
}
}
func TestProvider_Stream_MissingBinary(t *testing.T) {
agent := DiscoveredAgent{
CLIAgent: CLIAgent{
Name: "nonexistentcli",
DisplayName: "Nonexistent",
Format: FormatVibeStreaming,
PromptArgs: func(p string) []string { return []string{p} },
},
Path: "/nonexistent/cli",
}
p := New(agent)
req := provider.Request{}
_, err := p.Stream(context.Background(), req)
if err == nil {
t.Error("expected error for nonexistent binary, got nil")
}
}
+158
View File
@@ -0,0 +1,158 @@
package subprocess
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"os/exec"
"strings"
"somegit.dev/Owlibou/gnoma/internal/stream"
)
// subprocessStream implements stream.Stream by reading line-delimited JSON
// from a subprocess stdout and converting lines via a FormatParser.
type subprocessStream struct {
	// cmd is the already-started subprocess.
	cmd *exec.Cmd
	// stdout is the read end of the subprocess stdout pipe.
	stdout io.ReadCloser
	// stderrBuf holds the captured (size-limited) stderr, used in error messages.
	stderrBuf *bytes.Buffer
	// scanner splits stdout into lines.
	scanner *bufio.Scanner
	parser FormatParser
	// pending holds parsed events not yet handed out by Next.
	pending []stream.Event
	// current is the event returned by Current.
	current stream.Event
	// err is the terminal error reported by Err.
	err error
	// done is set once stdout has been read to the end.
	done bool
	// waited guards reap so that cmd.Wait is called only once.
	waited bool
}
// newSubprocessStream wires cmd's stdout to a line scanner, captures up to 8KB
// of its stderr, starts it, and returns a stream that parses stdout with parser.
// The caller must call Close() to release resources.
//
// ctx is accepted for symmetry only: cancellation is expected to be wired into
// cmd by the caller through exec.CommandContext.
func newSubprocessStream(ctx context.Context, cmd *exec.Cmd, parser FormatParser) (*subprocessStream, error) {
	out, pipeErr := cmd.StdoutPipe()
	if pipeErr != nil {
		return nil, fmt.Errorf("subprocess: stdout pipe: %w", pipeErr)
	}

	// Keep only the first 8KB of stderr; it is used for error messages.
	captured := new(bytes.Buffer)
	cmd.Stderr = &limitedWriter{w: captured, n: 8192}

	// No stdin for the child, so it cannot block waiting for input.
	cmd.Stdin = nil

	if startErr := cmd.Start(); startErr != nil {
		return nil, fmt.Errorf("subprocess: start: %w", startErr)
	}

	// Allow stdout lines of up to 1MB.
	lines := bufio.NewScanner(out)
	lines.Buffer(make([]byte, 1024*1024), 1024*1024)

	st := &subprocessStream{
		cmd:       cmd,
		stdout:    out,
		stderrBuf: captured,
		scanner:   lines,
		parser:    parser,
	}
	return st, nil
}
// Next advances the stream to the next event and reports whether one is
// available through Current. It returns false once the stream is exhausted or
// has failed; Err tells the two apart.
//
// Blank lines and lines the parser cannot decode are skipped. When stdout
// reaches EOF the parser's Done events are queued and the process is reaped.
// If the process exited with an error, the queued final events are discarded
// and Next returns false; otherwise every final event is delivered, one per
// call, before Next returns false.
func (s *subprocessStream) Next() bool {
	for {
		if s.err != nil {
			return false
		}
		// Drain buffered events first. This runs before the done check so that
		// events queued at EOF are all delivered, not just the first one.
		if len(s.pending) > 0 {
			s.current = s.pending[0]
			s.pending = s.pending[1:]
			return true
		}
		if s.done {
			return false
		}

		// Read next line from subprocess stdout.
		if !s.scanner.Scan() {
			if err := s.scanner.Err(); err != nil {
				s.err = err
				return false
			}
			// EOF — process has exited (or pipe closed).
			s.done = true
			s.pending = append(s.pending, s.parser.Done()...)
			// Wait for process to exit and surface any non-zero exit code.
			s.reap()
			if s.err != nil {
				s.pending = nil
				return false
			}
			continue
		}

		line := s.scanner.Bytes()
		if len(line) == 0 {
			continue
		}
		evts, err := s.parser.ParseLine(line)
		if err != nil {
			// Non-fatal parse error: skip the line but continue.
			continue
		}
		s.pending = append(s.pending, evts...)
	}
}
// Current returns the event made available by the most recent call to Next.
func (s *subprocessStream) Current() stream.Event { return s.current }

// Err returns the error that ended the stream, or nil if none was recorded.
func (s *subprocessStream) Err() error { return s.err }
// Close terminates the subprocess if it was started, closes the stdout pipe and
// reaps the process. Errors from killing the process and closing the pipe are
// deliberately ignored (they are expected when the process has already
// exited). Close always returns nil.
func (s *subprocessStream) Close() error {
	if proc := s.cmd.Process; proc != nil {
		_ = proc.Kill()
	}
	_ = s.stdout.Close()
	s.reap()
	return nil
}
// reap calls cmd.Wait at most once over the lifetime of the stream. When Wait
// fails and no stream error has been recorded yet, the failure becomes the
// stream error, with the trimmed captured stderr appended when there is any.
func (s *subprocessStream) reap() {
	if s.waited {
		return
	}
	s.waited = true

	waitErr := s.cmd.Wait()
	if waitErr == nil || s.err != nil {
		// Clean exit, or an earlier error that must not be overwritten.
		return
	}

	if detail := strings.TrimSpace(s.stderrBuf.String()); detail != "" {
		s.err = fmt.Errorf("subprocess: %w: %s", waitErr, detail)
		return
	}
	s.err = fmt.Errorf("subprocess: %w", waitErr)
}
// limitedWriter forwards at most n bytes in total to w and silently discards
// everything beyond that budget.
type limitedWriter struct {
	w io.Writer // destination for the bytes that fit in the budget
	n int       // remaining byte budget
}

// Write forwards as much of p as still fits in the budget and drops the rest.
//
// Discarded bytes are reported as written: io.Writer requires a non-nil error
// whenever fewer than len(p) bytes are reported, and a short count would make
// io.Copy (used by os/exec to drain the child's stderr) stop with
// io.ErrShortWrite. Only a failure of the underlying writer is returned as an
// error, together with the number of bytes it accepted.
func (lw *limitedWriter) Write(p []byte) (int, error) {
	total := len(p)
	if lw.n <= 0 {
		return total, nil // budget exhausted: silently discard
	}
	if len(p) > lw.n {
		p = p[:lw.n]
	}
	n, err := lw.w.Write(p)
	lw.n -= n
	if err != nil {
		return n, err
	}
	if n < len(p) {
		// The underlying writer broke its own contract; surface it.
		return n, io.ErrShortWrite
	}
	return total, nil
}
@@ -0,0 +1,90 @@
package subprocess
import (
"context"
"os/exec"
"runtime"
"testing"
"somegit.dev/Owlibou/gnoma/internal/stream"
)
// TestSubprocessStream_EchoShell runs a real subprocess (printf) and verifies
// the stream delivers the expected events.
func TestSubprocessStream_EchoShell(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("shell printf not available on windows")
	}

	// Feed a vibe-format line through a printf subprocess.
	// We use vibe format because it's the simplest (no "done" event needed).
	// The JSON is embedded directly in the printf format string; it contains no
	// '%' characters, and printf expands the trailing \n itself.
	cmd := exec.CommandContext(context.Background(), "sh", "-c",
		`printf '{"role":"assistant","content":"hello from subprocess","reasoning_content":null,"tool_calls":null,"message_id":"abc"}\n'`)

	s, err := newSubprocessStream(context.Background(), cmd, newVibeParser())
	if err != nil {
		t.Fatal(err)
	}
	defer s.Close()

	var texts []string
	for s.Next() {
		ev := s.Current()
		if ev.Type == stream.EventTextDelta {
			texts = append(texts, ev.Text)
		}
	}
	if err := s.Err(); err != nil {
		t.Fatalf("stream error: %v", err)
	}
	if len(texts) == 0 {
		t.Fatal("no text events received")
	}
	if texts[0] != "hello from subprocess" {
		t.Errorf("got text %q, want %q", texts[0], "hello from subprocess")
	}
}
// TestSubprocessStream_ContextCancel starts a long-running subprocess, cancels
// its context and drains the stream. The test has no explicit assertion: it
// passes when draining returns instead of waiting for the sleep to finish.
func TestSubprocessStream_ContextCancel(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("sleep not available on windows")
	}

	ctx, cancel := context.WithCancel(context.Background())
	sleeper := exec.CommandContext(ctx, "sh", "-c", "sleep 30")

	s, err := newSubprocessStream(ctx, sleeper, newVibeParser())
	if err != nil {
		t.Fatal(err)
	}
	defer s.Close()

	cancel()

	// Drain — should stop quickly due to context cancellation.
	// No error assertion: context cancel may or may not propagate as stream error.
	for {
		if !s.Next() {
			break
		}
	}
}
// TestSubprocessStream_ProcessError checks that a subprocess exiting with a
// non-zero status surfaces as a stream error once the stream is drained.
func TestSubprocessStream_ProcessError(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("sh not available on windows")
	}

	failing := exec.CommandContext(context.Background(), "sh", "-c", "exit 1")
	s, err := newSubprocessStream(context.Background(), failing, newVibeParser())
	if err != nil {
		t.Fatal(err)
	}
	defer s.Close()

	for {
		if !s.Next() {
			break
		}
	}

	// A non-zero exit should surface as a stream error.
	if streamErr := s.Err(); streamErr == nil {
		t.Error("expected stream error for non-zero exit, got nil")
	}
}
+4
View File
@@ -0,0 +1,4 @@
{"type":"system","subtype":"init","cwd":"/home/user/project","session_id":"8ae89dc8","tools":["Bash","Read"],"mcp_servers":[],"model":"claude-sonnet-4-6","permissionMode":"default","apiKeySource":"none","claude_code_version":"2.1.131","uuid":"549e7847"}
{"type":"assistant","message":{"model":"claude-sonnet-4-6","id":"msg_01SRH","type":"message","role":"assistant","content":[{"type":"text","text":"hi"}],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":3,"cache_creation_input_tokens":11062,"cache_read_input_tokens":12372,"output_tokens":1,"service_tier":"standard"},"context_management":null},"parent_tool_use_id":null,"session_id":"8ae89dc8","uuid":"97ab5bd6"}
{"type":"rate_limit_event","rate_limit_info":{"status":"allowed_warning","rateLimitType":"seven_day","utilization":0.76},"uuid":"12249404","session_id":"8ae89dc8"}
{"type":"result","subtype":"success","is_error":false,"duration_ms":1989,"duration_api_ms":1856,"num_turns":1,"result":"hi","stop_reason":"end_turn","session_id":"8ae89dc8","total_cost_usd":0.045,"usage":{"input_tokens":3,"cache_creation_input_tokens":11062,"cache_read_input_tokens":12372,"output_tokens":4,"service_tier":"standard"},"modelUsage":{"claude-sonnet-4-6":{"inputTokens":3,"outputTokens":4}},"terminal_reason":"completed","uuid":"cd4fcb02"}
+4
View File
@@ -0,0 +1,4 @@
{"type":"init","timestamp":"2026-05-07T12:18:49.605Z","session_id":"c2d08136","model":"auto-gemini-3"}
{"type":"message","timestamp":"2026-05-07T12:18:49.606Z","role":"user","content":"say the word hi and nothing else"}
{"type":"message","timestamp":"2026-05-07T12:18:52.063Z","role":"assistant","content":"hi","delta":true}
{"type":"result","timestamp":"2026-05-07T12:18:52.111Z","status":"success","stats":{"total_tokens":10843,"input_tokens":10747,"output_tokens":1,"cached":7862,"input":2885,"duration_ms":2506,"tool_calls":0,"models":{"gemini-3-flash-preview":{"total_tokens":10843,"input_tokens":10747,"output_tokens":1,"cached":7862,"input":2885}}}}
+3
View File
@@ -0,0 +1,3 @@
{"role": "system", "content": "You are Mistral Vibe, a CLI coding agent built by Mistral AI.", "injected": false, "reasoning_content": null, "reasoning_state": null, "tool_calls": null, "name": null, "tool_call_id": null, "message_id": null}
{"role": "user", "content": "say the word hi and nothing else", "injected": false, "reasoning_content": null, "tool_calls": null, "name": null, "tool_call_id": null, "message_id": "a358a1ec"}
{"role": "assistant", "content": "hi", "injected": false, "reasoning_content": "The user wants me to say the word \"hi\" and nothing else.", "reasoning_state": null, "tool_calls": null, "name": null, "tool_call_id": null, "message_id": "c8ecdd49"}