Files
vikingowl aca830e7db feat(engine): consumption-time stream-error failover
When a stream errors out before producing any user-visible content
(text, thinking, or tool calls), the engine now transparently retries
on the next-best arm instead of bubbling the error to the TUI. Covers
the case from the post-SLM screenshot: subprocess CLI agents that
exit non-zero on auth/config failures, network drops mid-stream,
rate-limited arms whose error surfaces after Stream() already returned.

Mechanism: the stream-create + consume blocks are wrapped in a labeled
streamLoop. On s.Err() != nil with empty accumulator, the engine emits
a new EventFailover ("↻ <failed_arm> failed (<reason>) — retrying on
another arm"), excludes the failed arm via task.ExcludedArms, and
re-enters the loop. Cap of 4 failovers per round.

Guards:
- !acc.HasContent() — if text/tool calls already streamed, fail loud
  rather than duplicate visible output on retry.
- isFailoverable(err) — deny-list approach: context.Canceled/DeadlineExceeded
  and HTTP 400/413 are fatal; everything else (auth, rate limit, 5xx,
  subprocess exit, network) is failoverable.
- Router.ForcedArm() == "" — when the user pinned an arm via --provider,
  failover is disabled by design.
- failoverAttempt < maxFailovers — bounded retry budget.

TUI renders EventFailover under the existing "cost" role styling.
shortFailReason strips the subprocess wrapper envelope so the user sees
"Invalid API key. Try again." instead of
"subprocess: exit status 1: Error: Invalid API key. Try again.".

Tests cover the classifier (isFailoverable, shortFailReason), end-to-end
auth-error failover, content-already-streamed guard, and context-cancel
guard. Deterministic across 10x -race runs by giving the failing arm
IsCLIAgent=true to anchor it in tier 0 ahead of the API-tier backup.
2026-05-20 02:20:00 +02:00

127 lines
3.6 KiB
Go

package tui
import (
"fmt"
tea "charm.land/bubbletea/v2"
"somegit.dev/Owlibou/gnoma/internal/message"
"somegit.dev/Owlibou/gnoma/internal/stream"
)
// handleStreamEvent applies one stream event to the chat state and returns the
// updated model together with a command that waits for the next event.
//
// Text and thinking deltas are buffered; a tool-call start flushes both
// buffers into the message list; routing and failover notices are appended
// under the "cost" role; tool results are appended unless an elf tool is
// currently active.
func (m Model) handleStreamEvent(evt stream.Event) (tea.Model, tea.Cmd) {
	switch evt.Type {
	case stream.EventTextDelta:
		if evt.Text == "" {
			break
		}
		// Buffer only what the code-block filter lets through.
		if visible := filterModelCodeBlocks(&m.streamFilterClose, evt.Text); visible != "" {
			m.streamBuf.WriteString(visible)
		}

	case stream.EventThinkingDelta:
		// Reasoning goes to its own buffer until regular text has started;
		// after that, further thinking is appended to the text buffer.
		if m.streamBuf.Len() > 0 {
			m.streamBuf.WriteString(evt.Text)
		} else {
			m.thinkingBuf.WriteString(evt.Text)
		}

	case stream.EventToolCallStart:
		// Commit buffered thinking first, then buffered text, before the
		// tool call is shown.
		if m.thinkingBuf.Len() > 0 {
			thought := chatMessage{role: "thinking", content: m.thinkingBuf.String()}
			m.messages = append(m.messages, thought)
			m.thinkingBuf.Reset()
		}
		if m.streamBuf.Len() > 0 {
			spoken := chatMessage{role: m.currentRole, content: m.streamBuf.String()}
			m.messages = append(m.messages, spoken)
			m.streamBuf.Reset()
		}
		if m.initPending {
			m.initHadToolCalls = true
		}

	case stream.EventToolCallDone:
		switch evt.ToolCallName {
		case "agent", "spawn_elfs":
			// No tool message for these: the elf tree view handles display.
			m.elfToolActive = true
		default:
			// Running tools are tracked transiently, not in the chat history.
			m.runningTools = append(m.runningTools, evt.ToolCallName)
		}

	case stream.EventRouting:
		var classifier string
		if evt.RoutingClassifier != "" {
			classifier = ", by: " + evt.RoutingClassifier
		}
		notice := fmt.Sprintf("routed → %s (task: %s%s)", evt.RoutingModel, evt.RoutingTask, classifier)
		m.messages = append(m.messages, chatMessage{role: "cost", content: notice})

	case stream.EventFailover:
		var reason string
		if evt.FailedReason != "" {
			reason = " (" + evt.FailedReason + ")"
		}
		notice := fmt.Sprintf("↻ %s failed%s — retrying on another arm", evt.FailedArm, reason)
		m.messages = append(m.messages, chatMessage{role: "cost", content: notice})

	case stream.EventToolResult:
		if m.elfToolActive {
			// Raw elf output is suppressed; only the flag is cleared.
			m.elfToolActive = false
			break
		}
		// Drop the oldest running tool (FIFO), then record the result.
		if len(m.runningTools) > 0 {
			m.runningTools = m.runningTools[1:]
		}
		m.messages = append(m.messages, chatMessage{role: "toolresult", content: evt.ToolOutput})
	}

	return m, m.listenForEvents()
}
// listenForEvents returns a command that blocks until one of the session's
// event sources yields a value and converts it into a message:
//   - a stream event becomes a streamEventMsg;
//   - a closed stream channel becomes a turnDoneMsg carrying the turn error
//     and, when a turn result is available, its usage;
//   - a permission request is returned as-is;
//   - elf progress is wrapped in an elfProgressMsg.
//
// The permission and progress channels may be nil. A receive from a nil
// channel never becomes ready inside a select, so one select covers every
// combination of configured channels.
func (m Model) listenForEvents() tea.Cmd {
	events := m.session.Events()
	permReqCh := m.config.PermReqCh
	elfProgressCh := m.config.ElfProgress

	return func() tea.Msg {
		for {
			select {
			case evt, ok := <-events:
				if !ok {
					// Stream channel closed: the turn is over.
					turn, err := m.session.TurnResult()
					var usage message.Usage
					if turn != nil {
						usage = turn.Usage
					}
					return turnDoneMsg{err: err, usage: usage}
				}
				return streamEventMsg{event: evt}

			case req, ok := <-permReqCh:
				if ok {
					return req
				}
				// A closed channel is always ready. Returning nil here would
				// leave nothing to re-arm the listener (NOTE(review): assumes
				// the runtime does not dispatch nil messages to Update —
				// confirm), so disable this case locally and keep waiting.
				permReqCh = nil

			case progress, ok := <-elfProgressCh:
				if ok {
					return elfProgressMsg{progress: progress}
				}
				// Same reasoning as above: stop selecting on the closed
				// channel instead of ending the listen loop.
				elfProgressCh = nil
			}
		}
	}
}