From aca830e7dbc181d6a66733740b618cce6950abf7 Mon Sep 17 00:00:00 2001 From: vikingowl <26+vikingowl@noreply.somegit.dev> Date: Wed, 20 May 2026 02:20:00 +0200 Subject: [PATCH] feat(engine): consumption-time stream-error failover MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a stream errors out before producing any user-visible content (text, thinking, or tool calls), the engine now transparently retries on the next-best arm instead of bubbling the error to the TUI. Covers the case from the post-SLM screenshot: subprocess CLI agents that exit non-zero on auth/config failures, network drops mid-stream, rate-limited arms whose error surfaces after Stream() already returned. Mechanism: the stream-create + consume blocks are wrapped in a labeled streamLoop. On s.Err() != nil with empty accumulator, the engine emits a new EventFailover ("↻ failed () — retrying on another arm"), excludes the failed arm via task.ExcludedArms, and re-enters the loop. Cap of 4 failovers per round. Guards: - !acc.HasContent() — if text/tool calls already streamed, fail loud rather than duplicate visible output on retry. - isFailoverable(err) — deny-list approach: context.Canceled/Deadline and HTTP 400/413 are fatal; everything else (auth, rate limit, 5xx, subprocess exit, network) is failoverable. - Router.ForcedArm() == "" — when the user pinned an arm via --provider, failover is disabled by design. - failoverAttempt < maxFailovers — bounded retry budget. TUI renders EventFailover under the existing "cost" role styling. shortFailReason strips the subprocess wrapper envelope so the user sees "Invalid API key. Try again." instead of "subprocess: exit status 1: Error: Invalid API key. Try again.". Tests cover the classifier (isFailoverable, shortFailReason), end-to-end auth-error failover, content-already-streamed guard, and context-cancel guard. Deterministic across 10x -race runs by giving the failing arm IsCLIAgent=true to anchor it in tier 0 ahead of the API-tier backup. --- TODO.md | 1 - internal/engine/failover_test.go | 271 +++++++++++++++++++++++++++++++ internal/engine/loop.go | 133 +++++++++++++-- internal/stream/accumulator.go | 23 +++ internal/stream/event.go | 12 +- internal/tui/events.go | 7 + 6 files changed, 434 insertions(+), 13 deletions(-) create mode 100644 internal/engine/failover_test.go diff --git a/TODO.md b/TODO.md index de4f23e..c60a624 100644 --- a/TODO.md +++ b/TODO.md @@ -35,7 +35,6 @@ Phases (2026-05-07 roadmap): - **Thinking mode** (disabled / budget / adaptive) — M12 in milestones - **Structured output** with JSON schema validation — M12 - **Native agy JSON output** — update subprocess provider to use `--output-format stream-json` once supported by agy CLI, replacing the current prompt-augmentation fallback. -- **Stream-error failover** — when an arm's `Stream` returns an error (auth failure, rate limit, subprocess exit, transport error), the router should transparently retry on the next-best arm matching the task type and surface a one-line hint to the user (e.g. `↻ vibe failed (Invalid API key), retried on anthropic/claude-opus-4-7`). Today the error bubbles straight to the TUI and the turn dies. Needs: classify retryable vs. fatal errors, cap retries per turn, ensure no duplicate billing on streams that emitted partial usage. - **SQLite session persistence** + serve mode — M10 - **Task learning** (pattern recognition, persistent tasks) — M11 - **Web UI** (`gnoma web`) — M15 diff --git a/internal/engine/failover_test.go b/internal/engine/failover_test.go new file mode 100644 index 0000000..ed62549 --- /dev/null +++ b/internal/engine/failover_test.go @@ -0,0 +1,271 @@ +package engine + +import ( + "context" + "errors" + "fmt" + "strings" + "testing" + + "somegit.dev/Owlibou/gnoma/internal/message" + "somegit.dev/Owlibou/gnoma/internal/provider" + "somegit.dev/Owlibou/gnoma/internal/router" + "somegit.dev/Owlibou/gnoma/internal/security" + "somegit.dev/Owlibou/gnoma/internal/stream" + "somegit.dev/Owlibou/gnoma/internal/tool" +) + +// --- isFailoverable --- + +func TestIsFailoverable(t *testing.T) { + cases := []struct { + name string + err error + want bool + }{ + {"nil", nil, false}, + {"context canceled", context.Canceled, false}, + {"context deadline", context.DeadlineExceeded, false}, + {"wrapped context canceled", fmt.Errorf("wrapped: %w", context.Canceled), false}, + {"bad request 400", &provider.ProviderError{StatusCode: 400, Message: "schema"}, false}, + {"too large 413", &provider.ProviderError{StatusCode: 413, Message: "too big"}, false}, + {"auth 401", &provider.ProviderError{StatusCode: 401, Message: "unauthorized"}, true}, + {"forbidden 403", &provider.ProviderError{StatusCode: 403, Message: "forbidden"}, true}, + {"rate limit 429", &provider.ProviderError{StatusCode: 429, Message: "rate"}, true}, + {"server error 500", &provider.ProviderError{StatusCode: 500, Message: "boom"}, true}, + {"subprocess auth error", errors.New("subprocess: exit status 1: Error: Invalid API key"), true}, + {"generic network error", errors.New("connection reset"), true}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if got := isFailoverable(tc.err); got != tc.want { + t.Errorf("isFailoverable(%v) = %v, want %v", tc.err, got, tc.want) + } + }) + } +} + +// --- shortFailReason --- + +func TestShortFailReason(t *testing.T) { + cases := []struct { + name string + err error + want string + }{ + { + name: "nil", + err: nil, + want: "", + }, + { + name: "simple message", + err: errors.New("bad thing"), + want: "bad thing", + }, + { + name: "subprocess wrapper stripped", + err: errors.New("subprocess: exit status 1: Error: Invalid API key. Try again."), + want: "Invalid API key. Try again.", + }, + { + name: "newlines collapsed", + err: errors.New("first line\nsecond line"), + want: "first line second line", + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if got := shortFailReason(tc.err); got != tc.want { + t.Errorf("shortFailReason() = %q, want %q", got, tc.want) + } + }) + } +} + +func TestShortFailReason_Truncates(t *testing.T) { + long := strings.Repeat("x", 500) + got := shortFailReason(errors.New(long)) + if len(got) > 160 { + t.Errorf("expected truncation to <= 160 chars, got %d", len(got)) + } + if !strings.HasSuffix(got, "...") { + t.Errorf("expected ellipsis suffix, got %q", got[len(got)-10:]) + } +} + +// --- end-to-end failover via router --- + +// erroringStream returns no events and reports err from Err(). +type erroringStream struct { + err error + done bool +} + +func (s *erroringStream) Next() bool { return false } +func (s *erroringStream) Current() stream.Event { return stream.Event{} } +func (s *erroringStream) Err() error { return s.err } +func (s *erroringStream) Close() error { s.done = true; return nil } + +// makeFailoverArm builds a router.Arm wrapping a mock provider whose Stream +// either returns an erroringStream (when failErr is non-nil) or a successful +// eventStream with the given text. isCLIAgent puts the arm into the router's +// tier-0 bucket so tests can deterministically order which arm gets picked +// first (otherwise the quality bandit makes selection flaky). +func makeFailoverArm(t *testing.T, id router.ArmID, text string, failErr error, isCLIAgent bool) *router.Arm { + t.Helper() + mp := &mockProvider{name: id.Provider()} + if failErr != nil { + mp.streams = []stream.Stream{&erroringStream{err: failErr}} + } else { + mp.streams = []stream.Stream{ + newEventStream(message.StopEndTurn, id.Model(), + stream.Event{Type: stream.EventTextDelta, Text: text}, + ), + } + } + return &router.Arm{ + ID: id, + Provider: security.WrapProvider(mp, nil), + ModelName: id.Model(), + IsCLIAgent: isCLIAgent, + Capabilities: provider.Capabilities{ToolUse: true, ContextWindow: 32000}, + } +} + +func TestEngine_FailsOver_OnConsumptionTimeAuthError(t *testing.T) { + rtr := router.New(router.Config{}) + // vibe is tier-0 (CLI agent); claude is tier-2 (API). Router picks vibe + // first; on exclusion after failover, claude is the only remaining arm. + rtr.RegisterArm(makeFailoverArm(t, router.NewArmID("subprocess", "vibe"), "", + errors.New("subprocess: exit status 1: Error: Invalid API key"), true)) + rtr.RegisterArm(makeFailoverArm(t, router.NewArmID("anthropic", "claude"), "Hello world", nil, false)) + + e, err := New(Config{ + Provider: secureMock(&mockProvider{name: "fallback"}), + Router: rtr, + Tools: tool.NewRegistry(), + }) + if err != nil { + t.Fatalf("New: %v", err) + } + + var events []stream.Event + turn, err := e.Submit(context.Background(), "hi", func(evt stream.Event) { + events = append(events, evt) + }) + if err != nil { + t.Fatalf("Submit: %v", err) + } + + // Failover event must have fired. + var failover *stream.Event + for i := range events { + if events[i].Type == stream.EventFailover { + failover = &events[i] + break + } + } + if failover == nil { + t.Fatal("expected EventFailover in callback events") + } + if !strings.Contains(failover.FailedArm, "vibe") { + t.Errorf("FailedArm = %q, want one mentioning vibe", failover.FailedArm) + } + if !strings.Contains(failover.FailedReason, "Invalid API key") { + t.Errorf("FailedReason = %q, want one mentioning Invalid API key", failover.FailedReason) + } + + // Final response must be from the successful arm. + if len(turn.Messages) == 0 || turn.Messages[len(turn.Messages)-1].TextContent() != "Hello world" { + t.Errorf("final text = %q, want %q", + turn.Messages[len(turn.Messages)-1].TextContent(), "Hello world") + } +} + +func TestEngine_DoesNotFailOver_AfterContentEmitted(t *testing.T) { + // One arm. Its stream emits text, then errors. The engine must NOT + // attempt failover (would-be-duplicate output) — content is visible + // to the user, so the only honest response is to surface the error. + // Single-arm setup keeps the test deterministic without needing to + // stub the router's bandit selection. + rtr := router.New(router.Config{}) + mp := &mockProvider{ + name: "subprocess", + streams: []stream.Stream{ + &halfFailStream{ + preEvents: []stream.Event{{Type: stream.EventTextDelta, Text: "partial..."}}, + err: errors.New("subprocess: exit status 1: Error: disconnected"), + }, + }, + } + rtr.RegisterArm(&router.Arm{ + ID: router.NewArmID("subprocess", "vibe"), + Provider: security.WrapProvider(mp, nil), + ModelName: "vibe", + Capabilities: provider.Capabilities{ToolUse: true, ContextWindow: 32000}, + }) + + e, _ := New(Config{ + Provider: secureMock(&mockProvider{name: "fallback"}), + Router: rtr, + Tools: tool.NewRegistry(), + }) + + var events []stream.Event + _, err := e.Submit(context.Background(), "hi", func(evt stream.Event) { + events = append(events, evt) + }) + if err == nil { + t.Fatal("expected error to bubble up since content was already streamed") + } + for _, ev := range events { + if ev.Type == stream.EventFailover { + t.Errorf("unexpected EventFailover after content emission") + } + } +} + +func TestEngine_DoesNotFailOver_OnContextCancel(t *testing.T) { + rtr := router.New(router.Config{}) + rtr.RegisterArm(makeFailoverArm(t, router.NewArmID("subprocess", "vibe"), "", + fmt.Errorf("wrapped: %w", context.Canceled), true)) + rtr.RegisterArm(makeFailoverArm(t, router.NewArmID("anthropic", "claude"), "should-not-run", nil, false)) + + e, _ := New(Config{ + Provider: secureMock(&mockProvider{name: "fallback"}), + Router: rtr, + Tools: tool.NewRegistry(), + }) + + var events []stream.Event + _, err := e.Submit(context.Background(), "hi", func(evt stream.Event) { + events = append(events, evt) + }) + if err == nil { + t.Fatal("expected context-canceled error to surface, got nil") + } + for _, ev := range events { + if ev.Type == stream.EventFailover { + t.Error("EventFailover must not fire on context.Canceled") + } + } +} + +// halfFailStream emits preEvents, then reports err from Err(). +type halfFailStream struct { + preEvents []stream.Event + idx int + err error +} + +func (s *halfFailStream) Next() bool { + if s.idx >= len(s.preEvents) { + return false + } + s.idx++ + return true +} +func (s *halfFailStream) Current() stream.Event { return s.preEvents[s.idx-1] } +func (s *halfFailStream) Err() error { return s.err } +func (s *halfFailStream) Close() error { return nil } diff --git a/internal/engine/loop.go b/internal/engine/loop.go index 75ac11e..fd3adab 100644 --- a/internal/engine/loop.go +++ b/internal/engine/loop.go @@ -109,10 +109,29 @@ func (e *Engine) runLoop(ctx context.Context, cb Callback) (*Turn, error) { // Build provider request (gates tools on model capabilities) req := e.buildRequest(ctx) - // Route and stream + // Route and stream. Both stream-creation errors (existing path) and + // stream-consumption errors (new path, end of streamLoop) can trigger + // failover to a different arm. failedArms accumulates across the whole + // round so the router doesn't re-pick a known-broken arm. var s stream.Stream var err error var decision router.RoutingDecision + var failedArms []router.ArmID + var acc *stream.Accumulator + var stopReason message.StopReason + var model string + var streamStart, streamEnd time.Time + var firstTokenAt time.Time + var repetitionTripped bool + + // maxFailovers caps the consumption-time failover budget per round. + // Creation-time retries inside retryOnTransient have their own + // 4-attempt budget; together they bound total arm attempts per round. + const maxFailovers = 4 + failoverAttempt := 0 + + streamLoop: + for { if e.cfg.Router != nil { prompt := e.latestUserPrompt() @@ -122,11 +141,13 @@ func (e *Engine) runLoop(ctx context.Context, cb Callback) (*Turn, error) { } else { task.EstimatedTokens = int(gnomactx.EstimateTokens(prompt)) } + task.ExcludedArms = failedArms e.logger.Debug("routing request", "task_type", task.Type, "complexity", task.ComplexityScore, "round", turn.Rounds, + "failover_attempt", failoverAttempt, ) s, decision, err = e.cfg.Router.Stream(ctx, task, req) @@ -142,7 +163,7 @@ func (e *Engine) runLoop(ctx context.Context, cb Callback) (*Turn, error) { "tools", len(req.Tools), "round", turn.Rounds, ) - if turn.Rounds == 1 && cb != nil { + if turn.Rounds == 1 && failoverAttempt == 0 && cb != nil { cb(stream.Event{ Type: stream.EventRouting, RoutingModel: string(decision.Arm.ID), @@ -163,7 +184,6 @@ func (e *Engine) runLoop(ctx context.Context, cb Callback) (*Turn, error) { s, err = prov.Stream(ctx, req) } if err != nil { - var failedArms []router.ArmID if e.cfg.Router != nil && decision.Arm != nil { failedArms = append(failedArms, decision.Arm.ID) } @@ -224,13 +244,12 @@ func (e *Engine) runLoop(ctx context.Context, cb Callback) (*Turn, error) { // Consume stream, forwarding events to callback. // Track TTFT and stream duration for arm performance metrics. - acc := stream.NewAccumulator() - var stopReason message.StopReason - var model string - - streamStart := time.Now() - var firstTokenAt time.Time - repetitionTripped := false + acc = stream.NewAccumulator() + stopReason = "" + model = "" + streamStart = time.Now() + firstTokenAt = time.Time{} + repetitionTripped = false for s.Next() { evt := s.Current() @@ -267,15 +286,53 @@ func (e *Engine) runLoop(ctx context.Context, cb Callback) (*Turn, error) { cb(evt) } } - streamEnd := time.Now() + streamEnd = time.Now() if err := s.Err(); err != nil { e.logger.Debug("stream terminated with error", "error", err, "rounds", turn.Rounds, + "failover_attempt", failoverAttempt, + "has_content", acc.HasContent(), ) if closeErr := s.Close(); closeErr != nil { e.logger.Warn("stream close after error failed", "error", closeErr) } + + // Consumption-time failover: the stream errored before producing + // any user-visible content, the error class warrants trying a + // different arm, and we have a router that can pick one. Emit a + // hint to the TUI, exclude the failed arm, and loop. If any of + // these guards fails — content was streamed, error is fatal, the + // arm was force-pinned, retry budget exhausted — fall through + // to the existing terminal error path so the user sees what + // went wrong instead of a silent stall. + canFailover := e.cfg.Router != nil && + e.cfg.Router.ForcedArm() == "" && + decision.Arm != nil && + !acc.HasContent() && + isFailoverable(err) && + failoverAttempt < maxFailovers + if canFailover { + failedArmID := decision.Arm.ID + failedArms = append(failedArms, failedArmID) + decision.Rollback() + if cb != nil { + cb(stream.Event{ + Type: stream.EventFailover, + FailedArm: string(failedArmID), + FailedReason: shortFailReason(err), + Err: err, + }) + } + e.logger.Info("stream failover", + "failed_arm", failedArmID, + "reason", err, + "attempt", failoverAttempt+1, + ) + failoverAttempt++ + continue streamLoop + } + decision.Rollback() streamErr := e.annotateStreamError(err, len(req.Tools)) reportOutcome(streamErr) @@ -284,6 +341,8 @@ func (e *Engine) runLoop(ctx context.Context, cb Callback) (*Turn, error) { if err := s.Close(); err != nil { e.logger.Warn("stream close failed", "error", err) } + break streamLoop + } // end streamLoop // Build response resp := acc.Response(stopReason, model) @@ -848,6 +907,58 @@ func (e *Engine) retryOnTransient(ctx context.Context, firstErr error, skipDelay return nil, firstErr } +// isFailoverable reports whether err warrants asking the router for a +// different arm. Broader than retryOnTransient's Retryable check: a +// subprocess CLI agent that exits 1 because of bad credentials is not a +// provider.ProviderError but still calls for trying a different arm. +// +// Conservative deny-list — fatal classes that another arm cannot help with: +// - context.Canceled / DeadlineExceeded: user-driven abort, propagate. +// - HTTP 400 (bad request) / 413 (too large): request-shape problem, +// other arms will reject it the same way. (413 has its own dedicated +// reactive-compaction path; we don't want to compete with it here.) +// +// Everything else — auth (401/403), rate limits (429), 5xx, subprocess +// errors, network/transport failures — is failoverable. +func isFailoverable(err error) bool { + if err == nil { + return false + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return false + } + var provErr *provider.ProviderError + if errors.As(err, &provErr) { + if provErr.StatusCode == 400 || provErr.StatusCode == 413 { + return false + } + } + return true +} + +// shortFailReason produces a one-line summary of err for TUI display. +// Long error chains (a full subprocess exit + stderr dump can run to a +// few hundred chars) are truncated to keep the rendered hint readable. +func shortFailReason(err error) string { + if err == nil { + return "" + } + s := err.Error() + // Strip the leading "subprocess: exit status N: " envelope when the + // CLI agent has surfaced its own message after the colon; the user + // cares about the inner message, not our wrapper. + if i := strings.Index(s, "Error: "); i >= 0 && i < 80 { + s = s[i+len("Error: "):] + } + if len(s) > 160 { + s = s[:157] + "..." + } + // Collapse newlines to single spaces — multi-line stderr breaks TUI layout. + s = strings.ReplaceAll(s, "\n", " ") + s = strings.ReplaceAll(s, "\r", " ") + return strings.TrimSpace(s) +} + // annotateStreamError wraps a stream error with diagnostic context when the // failure is a deterministic tool-parse error from a local server. The extra // context is visible in the TUI (slog.Debug goes to a file). diff --git a/internal/stream/accumulator.go b/internal/stream/accumulator.go index 404613e..b7fc021 100644 --- a/internal/stream/accumulator.go +++ b/internal/stream/accumulator.go @@ -93,6 +93,29 @@ func (a *Accumulator) Apply(e Event) { } } +// HasContent reports whether any user-visible content (text, thinking, or +// tool calls) has been accumulated. Used by the engine to decide whether a +// stream error is safe to fail over to another arm: a retry that would +// produce duplicate text after the user already saw partial output is +// worse than surfacing the error. +// +// Usage events alone do not count as content — they are bookkeeping. +func (a *Accumulator) HasContent() bool { + if a.textBuf != nil && a.textBuf.Len() > 0 { + return true + } + if a.thinkBuf != nil && a.thinkBuf.Len() > 0 { + return true + } + if len(a.content) > 0 { + return true + } + if len(a.toolCallOrder) > 0 { + return true + } + return false +} + // Response builds the final message.Response from all accumulated events. func (a *Accumulator) Response(stopReason message.StopReason, model string) message.Response { a.flushText() diff --git a/internal/stream/event.go b/internal/stream/event.go index a2b27fa..c588208 100644 --- a/internal/stream/event.go +++ b/internal/stream/event.go @@ -19,7 +19,8 @@ const ( EventToolResult // tool execution output EventPermissionReq // permission prompt needed EventUsage - EventRouting // router arm selection + EventRouting // router arm selection + EventFailover // arm swap after a stream error (no content was emitted) EventError ) @@ -43,6 +44,8 @@ func (et EventType) String() string { return "usage" case EventRouting: return "routing" + case EventFailover: + return "failover" case EventError: return "error" default: @@ -80,6 +83,13 @@ type Event struct { RoutingTask string // classified task type RoutingClassifier string // classifier source: heuristic / slm / slm_fallback + // Failover — set on EventFailover. FailedArm is the arm whose stream + // errored out before producing content; FailedReason is a short message + // suitable for TUI display (the underlying error is in Err). RoutingModel + // holds the arm being tried next. + FailedArm string + FailedReason string + // Error Err error diff --git a/internal/tui/events.go b/internal/tui/events.go index 6521989..5fd9f12 100644 --- a/internal/tui/events.go +++ b/internal/tui/events.go @@ -54,6 +54,13 @@ func (m Model) handleStreamEvent(evt stream.Event) (tea.Model, tea.Cmd) { } content += ")" m.messages = append(m.messages, chatMessage{role: "cost", content: content}) + case stream.EventFailover: + content := fmt.Sprintf("↻ %s failed", evt.FailedArm) + if evt.FailedReason != "" { + content += " (" + evt.FailedReason + ")" + } + content += " — retrying on another arm" + m.messages = append(m.messages, chatMessage{role: "cost", content: content}) case stream.EventToolResult: if m.elfToolActive { // Suppress raw elf output — tree shows progress, LLM summarizes