aca830e7db
When a stream errors out before producing any user-visible content
(text, thinking, or tool calls), the engine now transparently retries
on the next-best arm instead of bubbling the error to the TUI. Covers
the case from the post-SLM screenshot: subprocess CLI agents that
exit non-zero on auth/config failures, network drops mid-stream,
rate-limited arms whose error surfaces after Stream() already returned.
Mechanism: the stream-create + consume blocks are wrapped in a labeled
streamLoop. On s.Err() != nil with empty accumulator, the engine emits
a new EventFailover ("↻ <failed_arm> failed (<reason>) — retrying on
another arm"), excludes the failed arm via task.ExcludedArms, and
re-enters the loop. Cap of 4 failovers per round.
Guards:
- !acc.HasContent() — if text/tool calls already streamed, fail loud
rather than duplicate visible output on retry.
- isFailoverable(err) — deny-list approach: context.Canceled/DeadlineExceeded
and HTTP 400/413 are fatal; everything else (auth, rate limit, 5xx,
subprocess exit, network) is failoverable.
- Router.ForcedArm() == "" — when the user pinned an arm via --provider,
failover is disabled by design.
- failoverAttempt < maxFailovers — bounded retry budget.
TUI renders EventFailover under the existing "cost" role styling.
shortFailReason strips the subprocess wrapper envelope so the user sees
"Invalid API key. Try again." instead of
"subprocess: exit status 1: Error: Invalid API key. Try again.".
Tests cover the classifier (isFailoverable, shortFailReason), end-to-end
auth-error failover, content-already-streamed guard, and context-cancel
guard. Deterministic across 10x -race runs by giving the failing arm
IsCLIAgent=true to anchor it in tier 0 ahead of the API-tier backup.
127 lines
3.6 KiB
Go
127 lines
3.6 KiB
Go
package tui
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
tea "charm.land/bubbletea/v2"
|
|
"somegit.dev/Owlibou/gnoma/internal/message"
|
|
"somegit.dev/Owlibou/gnoma/internal/stream"
|
|
)
|
|
|
|
func (m Model) handleStreamEvent(evt stream.Event) (tea.Model, tea.Cmd) {
|
|
switch evt.Type {
|
|
case stream.EventTextDelta:
|
|
if evt.Text != "" {
|
|
text := filterModelCodeBlocks(&m.streamFilterClose, evt.Text)
|
|
if text != "" {
|
|
m.streamBuf.WriteString(text)
|
|
}
|
|
}
|
|
case stream.EventThinkingDelta:
|
|
// Accumulate reasoning in a separate buffer so it stays frozen/dim
|
|
// while regular text content streams normally below it.
|
|
if m.streamBuf.Len() == 0 {
|
|
m.thinkingBuf.WriteString(evt.Text)
|
|
} else {
|
|
// Text has already started; treat additional thinking as text.
|
|
m.streamBuf.WriteString(evt.Text)
|
|
}
|
|
case stream.EventToolCallStart:
|
|
// Flush both buffers before tool call label
|
|
if m.thinkingBuf.Len() > 0 {
|
|
m.messages = append(m.messages, chatMessage{role: "thinking", content: m.thinkingBuf.String()})
|
|
m.thinkingBuf.Reset()
|
|
}
|
|
if m.streamBuf.Len() > 0 {
|
|
m.messages = append(m.messages, chatMessage{role: m.currentRole, content: m.streamBuf.String()})
|
|
m.streamBuf.Reset()
|
|
}
|
|
if m.initPending {
|
|
m.initHadToolCalls = true
|
|
}
|
|
case stream.EventToolCallDone:
|
|
if evt.ToolCallName == "agent" || evt.ToolCallName == "spawn_elfs" {
|
|
// Suppress tool message — elf tree view handles display
|
|
m.elfToolActive = true
|
|
} else {
|
|
// Track running tools transiently — not in permanent chat history
|
|
m.runningTools = append(m.runningTools, evt.ToolCallName)
|
|
}
|
|
case stream.EventRouting:
|
|
content := fmt.Sprintf("routed → %s (task: %s", evt.RoutingModel, evt.RoutingTask)
|
|
if evt.RoutingClassifier != "" {
|
|
content += ", by: " + evt.RoutingClassifier
|
|
}
|
|
content += ")"
|
|
m.messages = append(m.messages, chatMessage{role: "cost", content: content})
|
|
case stream.EventFailover:
|
|
content := fmt.Sprintf("↻ %s failed", evt.FailedArm)
|
|
if evt.FailedReason != "" {
|
|
content += " (" + evt.FailedReason + ")"
|
|
}
|
|
content += " — retrying on another arm"
|
|
m.messages = append(m.messages, chatMessage{role: "cost", content: content})
|
|
case stream.EventToolResult:
|
|
if m.elfToolActive {
|
|
// Suppress raw elf output — tree shows progress, LLM summarizes
|
|
m.elfToolActive = false
|
|
} else {
|
|
// Pop first running tool (FIFO — results arrive in call order)
|
|
if len(m.runningTools) > 0 {
|
|
m.runningTools = m.runningTools[1:]
|
|
}
|
|
m.messages = append(m.messages, chatMessage{
|
|
role: "toolresult", content: evt.ToolOutput,
|
|
})
|
|
}
|
|
}
|
|
return m, m.listenForEvents()
|
|
}
|
|
|
|
func (m Model) listenForEvents() tea.Cmd {
|
|
ch := m.session.Events()
|
|
permReqCh := m.config.PermReqCh
|
|
|
|
elfProgressCh := m.config.ElfProgress
|
|
|
|
return func() tea.Msg {
|
|
// Listen for stream events, permission requests, and elf progress
|
|
if permReqCh != nil || elfProgressCh != nil {
|
|
// Build select dynamically — always listen on ch
|
|
select {
|
|
case evt, ok := <-ch:
|
|
if !ok {
|
|
turn, err := m.session.TurnResult()
|
|
var usage message.Usage
|
|
if turn != nil {
|
|
usage = turn.Usage
|
|
}
|
|
return turnDoneMsg{err: err, usage: usage}
|
|
}
|
|
return streamEventMsg{event: evt}
|
|
case req, ok := <-permReqCh:
|
|
if ok {
|
|
return req
|
|
}
|
|
return nil
|
|
case progress, ok := <-elfProgressCh:
|
|
if ok {
|
|
return elfProgressMsg{progress: progress}
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
evt, ok := <-ch
|
|
if !ok {
|
|
turn, err := m.session.TurnResult()
|
|
var usage message.Usage
|
|
if turn != nil {
|
|
usage = turn.Usage
|
|
}
|
|
return turnDoneMsg{err: err, usage: usage}
|
|
}
|
|
return streamEventMsg{event: evt}
|
|
}
|
|
}
|