feat(engine): consumption-time stream-error failover
When a stream errors out before producing any user-visible content
(text, thinking, or tool calls), the engine now transparently retries
on the next-best arm instead of bubbling the error to the TUI. Covers
the case from the post-SLM screenshot: subprocess CLI agents that
exit non-zero on auth/config failures, network drops mid-stream,
rate-limited arms whose error surfaces after Stream() already returned.
Mechanism: the stream-create + consume blocks are wrapped in a labeled
streamLoop. On s.Err() != nil with empty accumulator, the engine emits
a new EventFailover ("↻ <failed_arm> failed (<reason>) — retrying on
another arm"), excludes the failed arm via task.ExcludedArms, and
re-enters the loop. Cap of 4 failovers per round.
Guards:
- !acc.HasContent() — if text/tool calls already streamed, fail loud
rather than duplicate visible output on retry.
- isFailoverable(err) — deny-list approach: context.Canceled/Deadline
and HTTP 400/413 are fatal; everything else (auth, rate limit, 5xx,
subprocess exit, network) is failoverable.
- Router.ForcedArm() == "" — when the user pinned an arm via --provider,
failover is disabled by design.
- failoverAttempt < maxFailovers — bounded retry budget.
TUI renders EventFailover under the existing "cost" role styling.
shortFailReason strips the subprocess wrapper envelope so the user sees
"Invalid API key. Try again." instead of
"subprocess: exit status 1: Error: Invalid API key. Try again.".
Tests cover the classifier (isFailoverable, shortFailReason), end-to-end
auth-error failover, content-already-streamed guard, and context-cancel
guard. Deterministic across 10x -race runs by giving the failing arm
IsCLIAgent=true to anchor it in tier 0 ahead of the API-tier backup.
This commit is contained in:
@@ -35,7 +35,6 @@ Phases (2026-05-07 roadmap):
|
||||
- **Thinking mode** (disabled / budget / adaptive) — M12 in milestones
|
||||
- **Structured output** with JSON schema validation — M12
|
||||
- **Native agy JSON output** — update subprocess provider to use `--output-format stream-json` once supported by agy CLI, replacing the current prompt-augmentation fallback.
|
||||
- **Stream-error failover** — when an arm's `Stream` returns an error (auth failure, rate limit, subprocess exit, transport error), the router should transparently retry on the next-best arm matching the task type and surface a one-line hint to the user (e.g. `↻ vibe failed (Invalid API key), retried on anthropic/claude-opus-4-7`). Today the error bubbles straight to the TUI and the turn dies. Needs: classify retryable vs. fatal errors, cap retries per turn, ensure no duplicate billing on streams that emitted partial usage.
|
||||
- **SQLite session persistence** + serve mode — M10
|
||||
- **Task learning** (pattern recognition, persistent tasks) — M11
|
||||
- **Web UI** (`gnoma web`) — M15
|
||||
|
||||
@@ -0,0 +1,271 @@
|
||||
package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"somegit.dev/Owlibou/gnoma/internal/message"
|
||||
"somegit.dev/Owlibou/gnoma/internal/provider"
|
||||
"somegit.dev/Owlibou/gnoma/internal/router"
|
||||
"somegit.dev/Owlibou/gnoma/internal/security"
|
||||
"somegit.dev/Owlibou/gnoma/internal/stream"
|
||||
"somegit.dev/Owlibou/gnoma/internal/tool"
|
||||
)
|
||||
|
||||
// --- isFailoverable ---
|
||||
|
||||
func TestIsFailoverable(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
err error
|
||||
want bool
|
||||
}{
|
||||
{"nil", nil, false},
|
||||
{"context canceled", context.Canceled, false},
|
||||
{"context deadline", context.DeadlineExceeded, false},
|
||||
{"wrapped context canceled", fmt.Errorf("wrapped: %w", context.Canceled), false},
|
||||
{"bad request 400", &provider.ProviderError{StatusCode: 400, Message: "schema"}, false},
|
||||
{"too large 413", &provider.ProviderError{StatusCode: 413, Message: "too big"}, false},
|
||||
{"auth 401", &provider.ProviderError{StatusCode: 401, Message: "unauthorized"}, true},
|
||||
{"forbidden 403", &provider.ProviderError{StatusCode: 403, Message: "forbidden"}, true},
|
||||
{"rate limit 429", &provider.ProviderError{StatusCode: 429, Message: "rate"}, true},
|
||||
{"server error 500", &provider.ProviderError{StatusCode: 500, Message: "boom"}, true},
|
||||
{"subprocess auth error", errors.New("subprocess: exit status 1: Error: Invalid API key"), true},
|
||||
{"generic network error", errors.New("connection reset"), true},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
if got := isFailoverable(tc.err); got != tc.want {
|
||||
t.Errorf("isFailoverable(%v) = %v, want %v", tc.err, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// --- shortFailReason ---
|
||||
|
||||
func TestShortFailReason(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
err error
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "nil",
|
||||
err: nil,
|
||||
want: "",
|
||||
},
|
||||
{
|
||||
name: "simple message",
|
||||
err: errors.New("bad thing"),
|
||||
want: "bad thing",
|
||||
},
|
||||
{
|
||||
name: "subprocess wrapper stripped",
|
||||
err: errors.New("subprocess: exit status 1: Error: Invalid API key. Try again."),
|
||||
want: "Invalid API key. Try again.",
|
||||
},
|
||||
{
|
||||
name: "newlines collapsed",
|
||||
err: errors.New("first line\nsecond line"),
|
||||
want: "first line second line",
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
if got := shortFailReason(tc.err); got != tc.want {
|
||||
t.Errorf("shortFailReason() = %q, want %q", got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestShortFailReason_Truncates(t *testing.T) {
|
||||
long := strings.Repeat("x", 500)
|
||||
got := shortFailReason(errors.New(long))
|
||||
if len(got) > 160 {
|
||||
t.Errorf("expected truncation to <= 160 chars, got %d", len(got))
|
||||
}
|
||||
if !strings.HasSuffix(got, "...") {
|
||||
t.Errorf("expected ellipsis suffix, got %q", got[len(got)-10:])
|
||||
}
|
||||
}
|
||||
|
||||
// --- end-to-end failover via router ---
|
||||
|
||||
// erroringStream returns no events and reports err from Err().
|
||||
type erroringStream struct {
|
||||
err error
|
||||
done bool
|
||||
}
|
||||
|
||||
func (s *erroringStream) Next() bool { return false }
|
||||
func (s *erroringStream) Current() stream.Event { return stream.Event{} }
|
||||
func (s *erroringStream) Err() error { return s.err }
|
||||
func (s *erroringStream) Close() error { s.done = true; return nil }
|
||||
|
||||
// makeFailoverArm builds a router.Arm wrapping a mock provider whose Stream
|
||||
// either returns an erroringStream (when failErr is non-nil) or a successful
|
||||
// eventStream with the given text. isCLIAgent puts the arm into the router's
|
||||
// tier-0 bucket so tests can deterministically order which arm gets picked
|
||||
// first (otherwise the quality bandit makes selection flaky).
|
||||
func makeFailoverArm(t *testing.T, id router.ArmID, text string, failErr error, isCLIAgent bool) *router.Arm {
|
||||
t.Helper()
|
||||
mp := &mockProvider{name: id.Provider()}
|
||||
if failErr != nil {
|
||||
mp.streams = []stream.Stream{&erroringStream{err: failErr}}
|
||||
} else {
|
||||
mp.streams = []stream.Stream{
|
||||
newEventStream(message.StopEndTurn, id.Model(),
|
||||
stream.Event{Type: stream.EventTextDelta, Text: text},
|
||||
),
|
||||
}
|
||||
}
|
||||
return &router.Arm{
|
||||
ID: id,
|
||||
Provider: security.WrapProvider(mp, nil),
|
||||
ModelName: id.Model(),
|
||||
IsCLIAgent: isCLIAgent,
|
||||
Capabilities: provider.Capabilities{ToolUse: true, ContextWindow: 32000},
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngine_FailsOver_OnConsumptionTimeAuthError(t *testing.T) {
|
||||
rtr := router.New(router.Config{})
|
||||
// vibe is tier-0 (CLI agent); claude is tier-2 (API). Router picks vibe
|
||||
// first; on exclusion after failover, claude is the only remaining arm.
|
||||
rtr.RegisterArm(makeFailoverArm(t, router.NewArmID("subprocess", "vibe"), "",
|
||||
errors.New("subprocess: exit status 1: Error: Invalid API key"), true))
|
||||
rtr.RegisterArm(makeFailoverArm(t, router.NewArmID("anthropic", "claude"), "Hello world", nil, false))
|
||||
|
||||
e, err := New(Config{
|
||||
Provider: secureMock(&mockProvider{name: "fallback"}),
|
||||
Router: rtr,
|
||||
Tools: tool.NewRegistry(),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("New: %v", err)
|
||||
}
|
||||
|
||||
var events []stream.Event
|
||||
turn, err := e.Submit(context.Background(), "hi", func(evt stream.Event) {
|
||||
events = append(events, evt)
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Submit: %v", err)
|
||||
}
|
||||
|
||||
// Failover event must have fired.
|
||||
var failover *stream.Event
|
||||
for i := range events {
|
||||
if events[i].Type == stream.EventFailover {
|
||||
failover = &events[i]
|
||||
break
|
||||
}
|
||||
}
|
||||
if failover == nil {
|
||||
t.Fatal("expected EventFailover in callback events")
|
||||
}
|
||||
if !strings.Contains(failover.FailedArm, "vibe") {
|
||||
t.Errorf("FailedArm = %q, want one mentioning vibe", failover.FailedArm)
|
||||
}
|
||||
if !strings.Contains(failover.FailedReason, "Invalid API key") {
|
||||
t.Errorf("FailedReason = %q, want one mentioning Invalid API key", failover.FailedReason)
|
||||
}
|
||||
|
||||
// Final response must be from the successful arm.
|
||||
if len(turn.Messages) == 0 || turn.Messages[len(turn.Messages)-1].TextContent() != "Hello world" {
|
||||
t.Errorf("final text = %q, want %q",
|
||||
turn.Messages[len(turn.Messages)-1].TextContent(), "Hello world")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngine_DoesNotFailOver_AfterContentEmitted(t *testing.T) {
|
||||
// One arm. Its stream emits text, then errors. The engine must NOT
|
||||
// attempt failover (would-be-duplicate output) — content is visible
|
||||
// to the user, so the only honest response is to surface the error.
|
||||
// Single-arm setup keeps the test deterministic without needing to
|
||||
// stub the router's bandit selection.
|
||||
rtr := router.New(router.Config{})
|
||||
mp := &mockProvider{
|
||||
name: "subprocess",
|
||||
streams: []stream.Stream{
|
||||
&halfFailStream{
|
||||
preEvents: []stream.Event{{Type: stream.EventTextDelta, Text: "partial..."}},
|
||||
err: errors.New("subprocess: exit status 1: Error: disconnected"),
|
||||
},
|
||||
},
|
||||
}
|
||||
rtr.RegisterArm(&router.Arm{
|
||||
ID: router.NewArmID("subprocess", "vibe"),
|
||||
Provider: security.WrapProvider(mp, nil),
|
||||
ModelName: "vibe",
|
||||
Capabilities: provider.Capabilities{ToolUse: true, ContextWindow: 32000},
|
||||
})
|
||||
|
||||
e, _ := New(Config{
|
||||
Provider: secureMock(&mockProvider{name: "fallback"}),
|
||||
Router: rtr,
|
||||
Tools: tool.NewRegistry(),
|
||||
})
|
||||
|
||||
var events []stream.Event
|
||||
_, err := e.Submit(context.Background(), "hi", func(evt stream.Event) {
|
||||
events = append(events, evt)
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatal("expected error to bubble up since content was already streamed")
|
||||
}
|
||||
for _, ev := range events {
|
||||
if ev.Type == stream.EventFailover {
|
||||
t.Errorf("unexpected EventFailover after content emission")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngine_DoesNotFailOver_OnContextCancel(t *testing.T) {
|
||||
rtr := router.New(router.Config{})
|
||||
rtr.RegisterArm(makeFailoverArm(t, router.NewArmID("subprocess", "vibe"), "",
|
||||
fmt.Errorf("wrapped: %w", context.Canceled), true))
|
||||
rtr.RegisterArm(makeFailoverArm(t, router.NewArmID("anthropic", "claude"), "should-not-run", nil, false))
|
||||
|
||||
e, _ := New(Config{
|
||||
Provider: secureMock(&mockProvider{name: "fallback"}),
|
||||
Router: rtr,
|
||||
Tools: tool.NewRegistry(),
|
||||
})
|
||||
|
||||
var events []stream.Event
|
||||
_, err := e.Submit(context.Background(), "hi", func(evt stream.Event) {
|
||||
events = append(events, evt)
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatal("expected context-canceled error to surface, got nil")
|
||||
}
|
||||
for _, ev := range events {
|
||||
if ev.Type == stream.EventFailover {
|
||||
t.Error("EventFailover must not fire on context.Canceled")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// halfFailStream emits preEvents, then reports err from Err().
|
||||
type halfFailStream struct {
|
||||
preEvents []stream.Event
|
||||
idx int
|
||||
err error
|
||||
}
|
||||
|
||||
func (s *halfFailStream) Next() bool {
|
||||
if s.idx >= len(s.preEvents) {
|
||||
return false
|
||||
}
|
||||
s.idx++
|
||||
return true
|
||||
}
|
||||
func (s *halfFailStream) Current() stream.Event { return s.preEvents[s.idx-1] }
|
||||
func (s *halfFailStream) Err() error { return s.err }
|
||||
func (s *halfFailStream) Close() error { return nil }
|
||||
+122
-11
@@ -109,10 +109,29 @@ func (e *Engine) runLoop(ctx context.Context, cb Callback) (*Turn, error) {
|
||||
// Build provider request (gates tools on model capabilities)
|
||||
req := e.buildRequest(ctx)
|
||||
|
||||
// Route and stream
|
||||
// Route and stream. Both stream-creation errors (existing path) and
|
||||
// stream-consumption errors (new path, end of streamLoop) can trigger
|
||||
// failover to a different arm. failedArms accumulates across the whole
|
||||
// round so the router doesn't re-pick a known-broken arm.
|
||||
var s stream.Stream
|
||||
var err error
|
||||
var decision router.RoutingDecision
|
||||
var failedArms []router.ArmID
|
||||
var acc *stream.Accumulator
|
||||
var stopReason message.StopReason
|
||||
var model string
|
||||
var streamStart, streamEnd time.Time
|
||||
var firstTokenAt time.Time
|
||||
var repetitionTripped bool
|
||||
|
||||
// maxFailovers caps the consumption-time failover budget per round.
|
||||
// Creation-time retries inside retryOnTransient have their own
|
||||
// 4-attempt budget; together they bound total arm attempts per round.
|
||||
const maxFailovers = 4
|
||||
failoverAttempt := 0
|
||||
|
||||
streamLoop:
|
||||
for {
|
||||
|
||||
if e.cfg.Router != nil {
|
||||
prompt := e.latestUserPrompt()
|
||||
@@ -122,11 +141,13 @@ func (e *Engine) runLoop(ctx context.Context, cb Callback) (*Turn, error) {
|
||||
} else {
|
||||
task.EstimatedTokens = int(gnomactx.EstimateTokens(prompt))
|
||||
}
|
||||
task.ExcludedArms = failedArms
|
||||
|
||||
e.logger.Debug("routing request",
|
||||
"task_type", task.Type,
|
||||
"complexity", task.ComplexityScore,
|
||||
"round", turn.Rounds,
|
||||
"failover_attempt", failoverAttempt,
|
||||
)
|
||||
|
||||
s, decision, err = e.cfg.Router.Stream(ctx, task, req)
|
||||
@@ -142,7 +163,7 @@ func (e *Engine) runLoop(ctx context.Context, cb Callback) (*Turn, error) {
|
||||
"tools", len(req.Tools),
|
||||
"round", turn.Rounds,
|
||||
)
|
||||
if turn.Rounds == 1 && cb != nil {
|
||||
if turn.Rounds == 1 && failoverAttempt == 0 && cb != nil {
|
||||
cb(stream.Event{
|
||||
Type: stream.EventRouting,
|
||||
RoutingModel: string(decision.Arm.ID),
|
||||
@@ -163,7 +184,6 @@ func (e *Engine) runLoop(ctx context.Context, cb Callback) (*Turn, error) {
|
||||
s, err = prov.Stream(ctx, req)
|
||||
}
|
||||
if err != nil {
|
||||
var failedArms []router.ArmID
|
||||
if e.cfg.Router != nil && decision.Arm != nil {
|
||||
failedArms = append(failedArms, decision.Arm.ID)
|
||||
}
|
||||
@@ -224,13 +244,12 @@ func (e *Engine) runLoop(ctx context.Context, cb Callback) (*Turn, error) {
|
||||
|
||||
// Consume stream, forwarding events to callback.
|
||||
// Track TTFT and stream duration for arm performance metrics.
|
||||
acc := stream.NewAccumulator()
|
||||
var stopReason message.StopReason
|
||||
var model string
|
||||
|
||||
streamStart := time.Now()
|
||||
var firstTokenAt time.Time
|
||||
repetitionTripped := false
|
||||
acc = stream.NewAccumulator()
|
||||
stopReason = ""
|
||||
model = ""
|
||||
streamStart = time.Now()
|
||||
firstTokenAt = time.Time{}
|
||||
repetitionTripped = false
|
||||
|
||||
for s.Next() {
|
||||
evt := s.Current()
|
||||
@@ -267,15 +286,53 @@ func (e *Engine) runLoop(ctx context.Context, cb Callback) (*Turn, error) {
|
||||
cb(evt)
|
||||
}
|
||||
}
|
||||
streamEnd := time.Now()
|
||||
streamEnd = time.Now()
|
||||
if err := s.Err(); err != nil {
|
||||
e.logger.Debug("stream terminated with error",
|
||||
"error", err,
|
||||
"rounds", turn.Rounds,
|
||||
"failover_attempt", failoverAttempt,
|
||||
"has_content", acc.HasContent(),
|
||||
)
|
||||
if closeErr := s.Close(); closeErr != nil {
|
||||
e.logger.Warn("stream close after error failed", "error", closeErr)
|
||||
}
|
||||
|
||||
// Consumption-time failover: the stream errored before producing
|
||||
// any user-visible content, the error class warrants trying a
|
||||
// different arm, and we have a router that can pick one. Emit a
|
||||
// hint to the TUI, exclude the failed arm, and loop. If any of
|
||||
// these guards fails — content was streamed, error is fatal, the
|
||||
// arm was force-pinned, retry budget exhausted — fall through
|
||||
// to the existing terminal error path so the user sees what
|
||||
// went wrong instead of a silent stall.
|
||||
canFailover := e.cfg.Router != nil &&
|
||||
e.cfg.Router.ForcedArm() == "" &&
|
||||
decision.Arm != nil &&
|
||||
!acc.HasContent() &&
|
||||
isFailoverable(err) &&
|
||||
failoverAttempt < maxFailovers
|
||||
if canFailover {
|
||||
failedArmID := decision.Arm.ID
|
||||
failedArms = append(failedArms, failedArmID)
|
||||
decision.Rollback()
|
||||
if cb != nil {
|
||||
cb(stream.Event{
|
||||
Type: stream.EventFailover,
|
||||
FailedArm: string(failedArmID),
|
||||
FailedReason: shortFailReason(err),
|
||||
Err: err,
|
||||
})
|
||||
}
|
||||
e.logger.Info("stream failover",
|
||||
"failed_arm", failedArmID,
|
||||
"reason", err,
|
||||
"attempt", failoverAttempt+1,
|
||||
)
|
||||
failoverAttempt++
|
||||
continue streamLoop
|
||||
}
|
||||
|
||||
decision.Rollback()
|
||||
streamErr := e.annotateStreamError(err, len(req.Tools))
|
||||
reportOutcome(streamErr)
|
||||
@@ -284,6 +341,8 @@ func (e *Engine) runLoop(ctx context.Context, cb Callback) (*Turn, error) {
|
||||
if err := s.Close(); err != nil {
|
||||
e.logger.Warn("stream close failed", "error", err)
|
||||
}
|
||||
break streamLoop
|
||||
} // end streamLoop
|
||||
|
||||
// Build response
|
||||
resp := acc.Response(stopReason, model)
|
||||
@@ -848,6 +907,58 @@ func (e *Engine) retryOnTransient(ctx context.Context, firstErr error, skipDelay
|
||||
return nil, firstErr
|
||||
}
|
||||
|
||||
// isFailoverable reports whether err warrants asking the router for a
|
||||
// different arm. Broader than retryOnTransient's Retryable check: a
|
||||
// subprocess CLI agent that exits 1 because of bad credentials is not a
|
||||
// provider.ProviderError but still calls for trying a different arm.
|
||||
//
|
||||
// Conservative deny-list — fatal classes that another arm cannot help with:
|
||||
// - context.Canceled / DeadlineExceeded: user-driven abort, propagate.
|
||||
// - HTTP 400 (bad request) / 413 (too large): request-shape problem,
|
||||
// other arms will reject it the same way. (413 has its own dedicated
|
||||
// reactive-compaction path; we don't want to compete with it here.)
|
||||
//
|
||||
// Everything else — auth (401/403), rate limits (429), 5xx, subprocess
|
||||
// errors, network/transport failures — is failoverable.
|
||||
func isFailoverable(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return false
|
||||
}
|
||||
var provErr *provider.ProviderError
|
||||
if errors.As(err, &provErr) {
|
||||
if provErr.StatusCode == 400 || provErr.StatusCode == 413 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// shortFailReason produces a one-line summary of err for TUI display.
|
||||
// Long error chains (a full subprocess exit + stderr dump can run to a
|
||||
// few hundred chars) are truncated to keep the rendered hint readable.
|
||||
func shortFailReason(err error) string {
|
||||
if err == nil {
|
||||
return ""
|
||||
}
|
||||
s := err.Error()
|
||||
// Strip the leading "subprocess: exit status N: " envelope when the
|
||||
// CLI agent has surfaced its own message after the colon; the user
|
||||
// cares about the inner message, not our wrapper.
|
||||
if i := strings.Index(s, "Error: "); i >= 0 && i < 80 {
|
||||
s = s[i+len("Error: "):]
|
||||
}
|
||||
if len(s) > 160 {
|
||||
s = s[:157] + "..."
|
||||
}
|
||||
// Collapse newlines to single spaces — multi-line stderr breaks TUI layout.
|
||||
s = strings.ReplaceAll(s, "\n", " ")
|
||||
s = strings.ReplaceAll(s, "\r", " ")
|
||||
return strings.TrimSpace(s)
|
||||
}
|
||||
|
||||
// annotateStreamError wraps a stream error with diagnostic context when the
|
||||
// failure is a deterministic tool-parse error from a local server. The extra
|
||||
// context is visible in the TUI (slog.Debug goes to a file).
|
||||
|
||||
@@ -93,6 +93,29 @@ func (a *Accumulator) Apply(e Event) {
|
||||
}
|
||||
}
|
||||
|
||||
// HasContent reports whether any user-visible content (text, thinking, or
|
||||
// tool calls) has been accumulated. Used by the engine to decide whether a
|
||||
// stream error is safe to fail over to another arm: a retry that would
|
||||
// produce duplicate text after the user already saw partial output is
|
||||
// worse than surfacing the error.
|
||||
//
|
||||
// Usage events alone do not count as content — they are bookkeeping.
|
||||
func (a *Accumulator) HasContent() bool {
|
||||
if a.textBuf != nil && a.textBuf.Len() > 0 {
|
||||
return true
|
||||
}
|
||||
if a.thinkBuf != nil && a.thinkBuf.Len() > 0 {
|
||||
return true
|
||||
}
|
||||
if len(a.content) > 0 {
|
||||
return true
|
||||
}
|
||||
if len(a.toolCallOrder) > 0 {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Response builds the final message.Response from all accumulated events.
|
||||
func (a *Accumulator) Response(stopReason message.StopReason, model string) message.Response {
|
||||
a.flushText()
|
||||
|
||||
@@ -19,7 +19,8 @@ const (
|
||||
EventToolResult // tool execution output
|
||||
EventPermissionReq // permission prompt needed
|
||||
EventUsage
|
||||
EventRouting // router arm selection
|
||||
EventRouting // router arm selection
|
||||
EventFailover // arm swap after a stream error (no content was emitted)
|
||||
EventError
|
||||
)
|
||||
|
||||
@@ -43,6 +44,8 @@ func (et EventType) String() string {
|
||||
return "usage"
|
||||
case EventRouting:
|
||||
return "routing"
|
||||
case EventFailover:
|
||||
return "failover"
|
||||
case EventError:
|
||||
return "error"
|
||||
default:
|
||||
@@ -80,6 +83,13 @@ type Event struct {
|
||||
RoutingTask string // classified task type
|
||||
RoutingClassifier string // classifier source: heuristic / slm / slm_fallback
|
||||
|
||||
// Failover — set on EventFailover. FailedArm is the arm whose stream
|
||||
// errored out before producing content; FailedReason is a short message
|
||||
// suitable for TUI display (the underlying error is in Err). RoutingModel
|
||||
// holds the arm being tried next.
|
||||
FailedArm string
|
||||
FailedReason string
|
||||
|
||||
// Error
|
||||
Err error
|
||||
|
||||
|
||||
@@ -54,6 +54,13 @@ func (m Model) handleStreamEvent(evt stream.Event) (tea.Model, tea.Cmd) {
|
||||
}
|
||||
content += ")"
|
||||
m.messages = append(m.messages, chatMessage{role: "cost", content: content})
|
||||
case stream.EventFailover:
|
||||
content := fmt.Sprintf("↻ %s failed", evt.FailedArm)
|
||||
if evt.FailedReason != "" {
|
||||
content += " (" + evt.FailedReason + ")"
|
||||
}
|
||||
content += " — retrying on another arm"
|
||||
m.messages = append(m.messages, chatMessage{role: "cost", content: content})
|
||||
case stream.EventToolResult:
|
||||
if m.elfToolActive {
|
||||
// Suppress raw elf output — tree shows progress, LLM summarizes
|
||||
|
||||
Reference in New Issue
Block a user