Files
vikingowl aca830e7db feat(engine): consumption-time stream-error failover
When a stream errors out before producing any user-visible content
(text, thinking, or tool calls), the engine now transparently retries
on the next-best arm instead of bubbling the error to the TUI. Covers
the case from the post-SLM screenshot: subprocess CLI agents that
exit non-zero on auth/config failures, network drops mid-stream,
rate-limited arms whose error surfaces after Stream() already returned.

Mechanism: the stream-create + consume blocks are wrapped in a labeled
streamLoop. On s.Err() != nil with empty accumulator, the engine emits
a new EventFailover ("↻ <failed_arm> failed (<reason>) — retrying on
another arm"), excludes the failed arm via task.ExcludedArms, and
re-enters the loop. Cap of 4 failovers per round.

Guards:
- !acc.HasContent() — if text/tool calls already streamed, fail loud
  rather than duplicate visible output on retry.
- isFailoverable(err) — deny-list approach: context.Canceled,
  context.DeadlineExceeded, and HTTP 400/413 are fatal; everything else
  (auth, rate limit, 5xx, subprocess exit, network) is failoverable.
- Router.ForcedArm() == "" — when the user pinned an arm via --provider,
  failover is disabled by design.
- failoverAttempt < maxFailovers — bounded retry budget.

TUI renders EventFailover under the existing "cost" role styling.
shortFailReason strips the subprocess wrapper envelope so the user sees
"Invalid API key. Try again." instead of
"subprocess: exit status 1: Error: Invalid API key. Try again.".

Tests cover the classifier (isFailoverable, shortFailReason), end-to-end
auth-error failover, content-already-streamed guard, and context-cancel
guard. Deterministic across 10x -race runs by giving the failing arm
IsCLIAgent=true to anchor it in tier 0 ahead of the API-tier backup.
2026-05-20 02:20:00 +02:00

172 lines
4.2 KiB
Go

package stream
import (
"strings"
"somegit.dev/Owlibou/gnoma/internal/message"
)
// Accumulator assembles a message.Response from a sequence of Events.
// Provider adapters translate SDK events into stream.Events;
// the Accumulator — shared, tested once — builds the final Response.
//
// Construct one with NewAccumulator, which allocates the tool-call map.
type Accumulator struct {
// Content blocks that have already been flushed, in flush order.
content []message.Content
// Running total; each EventUsage payload is added into it.
usage message.Usage
// Active text block being built (nil when no text block is open)
textBuf *strings.Builder
// Active thinking block being built (nil when no thinking block is open)
thinkBuf *strings.Builder
// Tool calls in progress, keyed by ToolCallID
toolCalls map[string]*toolCallAccum
// Ordered tool call IDs to preserve emission order
toolCallOrder []string
}
// toolCallAccum holds the state of a single tool call while its events are
// still arriving.
type toolCallAccum struct {
// Tool call ID, copied from the event's ToolCallID.
id string
// Tool name, copied from the event's ToolCallName.
name string
// Argument text appended from EventToolCallDelta events.
argsBuf strings.Builder
// Set when the Done event is applied: the event's Args when present,
// otherwise the contents of argsBuf.
args []byte // final complete args (from Done event)
}
// NewAccumulator returns an empty Accumulator whose tool-call map is
// already allocated, so tool-call events can be applied immediately.
func NewAccumulator() *Accumulator {
	acc := new(Accumulator)
	acc.toolCalls = map[string]*toolCallAccum{}
	return acc
}
// Apply processes a single event, updating the accumulator's state.
//
//   - Text and thinking deltas are appended to the active block of their
//     kind. Switching kinds first flushes the other block into content, so
//     text and thinking blocks are recorded in arrival order.
//   - Tool-call events are tracked per ToolCallID. A Start event flushes any
//     open text/thinking block and registers the call; Delta events append
//     argument text; a Done event fixes the final arguments.
//   - Usage events are summed into the running usage total.
//   - Error events (and unrecognised event types) are ignored here.
//
// Apply is safe on a zero-value Accumulator: the tool-call map is allocated
// on first use.
func (a *Accumulator) Apply(e Event) {
	switch e.Type {
	case EventTextDelta:
		a.flushThinking()
		if a.textBuf == nil {
			a.textBuf = &strings.Builder{}
		}
		a.textBuf.WriteString(e.Text)

	case EventThinkingDelta:
		a.flushText()
		if a.thinkBuf == nil {
			a.thinkBuf = &strings.Builder{}
		}
		a.thinkBuf.WriteString(e.Text)

	case EventToolCallStart:
		a.flushText()
		a.flushThinking()
		if a.toolCalls == nil {
			// Writing to a nil map panics; allocate lazily so an Accumulator
			// that was not built by NewAccumulator still works.
			a.toolCalls = make(map[string]*toolCallAccum)
		}
		// Record the ID's position only once. A repeated Start for the same
		// ID restarts that call's state but must not make the call appear
		// twice when tool calls are flushed.
		if _, seen := a.toolCalls[e.ToolCallID]; !seen {
			a.toolCallOrder = append(a.toolCallOrder, e.ToolCallID)
		}
		a.toolCalls[e.ToolCallID] = &toolCallAccum{id: e.ToolCallID, name: e.ToolCallName}

	case EventToolCallDelta:
		// Deltas for an unknown ID are dropped; reading a nil map is safe.
		if tc, ok := a.toolCalls[e.ToolCallID]; ok {
			tc.argsBuf.WriteString(e.ArgDelta)
		}

	case EventToolCallDone:
		tc, ok := a.toolCalls[e.ToolCallID]
		if !ok {
			// Done without prior Start (e.g., Mistral sends complete tool calls in one chunk)
			if a.toolCalls == nil {
				a.toolCalls = make(map[string]*toolCallAccum)
			}
			tc = &toolCallAccum{id: e.ToolCallID, name: e.ToolCallName}
			a.toolCalls[e.ToolCallID] = tc
			a.toolCallOrder = append(a.toolCallOrder, e.ToolCallID)
		}
		if e.Args != nil {
			// Done event carries authoritative complete args
			tc.args = e.Args
		} else {
			// Fall back to accumulated deltas
			tc.args = []byte(tc.argsBuf.String())
		}

	case EventUsage:
		if e.Usage != nil {
			a.usage.Add(*e.Usage)
		}

	case EventError:
		// Errors are handled by the stream consumer, not accumulated
	}
}
// HasContent reports whether anything user-visible has been accumulated so
// far: a non-empty pending text or thinking buffer, at least one flushed
// content block, or at least one registered tool call.
//
// The engine uses this to decide whether a stream error can be failed over
// to another arm: retrying after the user has already seen partial output
// would duplicate it, which is worse than surfacing the error.
//
// Usage events alone do not count as content — they are bookkeeping.
func (a *Accumulator) HasContent() bool {
	pendingText := a.textBuf != nil && a.textBuf.Len() > 0
	pendingThinking := a.thinkBuf != nil && a.thinkBuf.Len() > 0
	return pendingText ||
		pendingThinking ||
		len(a.content) > 0 ||
		len(a.toolCallOrder) > 0
}
// Response flushes any pending text, thinking, and tool-call state into the
// content list and returns the assembled assistant message.Response carrying
// the given stop reason, the model name, and the accumulated usage.
func (a *Accumulator) Response(stopReason message.StopReason, model string) message.Response {
	// Drain everything still buffered; tool calls are appended last.
	a.flushText()
	a.flushThinking()
	a.flushToolCalls()

	assistant := message.Message{
		Role:    message.RoleAssistant,
		Content: a.content,
	}
	return message.Response{
		Message:    assistant,
		StopReason: stopReason,
		Usage:      a.usage,
		Model:      model,
	}
}
// flushText moves the pending text buffer, if it holds anything, into the
// content list as a text block and clears the buffer. An absent or empty
// buffer is left untouched.
func (a *Accumulator) flushText() {
	if a.textBuf == nil || a.textBuf.Len() == 0 {
		return
	}
	block := message.NewTextContent(a.textBuf.String())
	a.content = append(a.content, block)
	a.textBuf = nil
}
// flushThinking moves the pending thinking buffer, if it holds anything,
// into the content list as a thinking block and clears the buffer. An absent
// or empty buffer is left untouched.
func (a *Accumulator) flushThinking() {
	if a.thinkBuf == nil || a.thinkBuf.Len() == 0 {
		return
	}
	thought := message.Thinking{Text: a.thinkBuf.String()}
	a.content = append(a.content, message.NewThinkingContent(thought))
	a.thinkBuf = nil
}
// flushToolCalls appends every tracked tool call to the content list, in the
// order the IDs were first recorded, then resets the tool-call tracking
// state.
func (a *Accumulator) flushToolCalls() {
	for _, callID := range a.toolCallOrder {
		call, found := a.toolCalls[callID]
		if !found {
			continue
		}

		payload := call.args
		if payload == nil {
			// Fallback: use accumulated deltas even if Done never arrived
			payload = []byte(call.argsBuf.String())
		}

		entry := message.ToolCall{
			ID:        call.id,
			Name:      call.name,
			Arguments: payload,
		}
		a.content = append(a.content, message.NewToolCallContent(entry))
	}

	// Start from a clean slate so a later flush cannot re-emit these calls.
	a.toolCallOrder = nil
	a.toolCalls = make(map[string]*toolCallAccum)
}