aca830e7db
When a stream errors out before producing any user-visible content
(text, thinking, or tool calls), the engine now transparently retries
on the next-best arm instead of bubbling the error to the TUI. Covers
the cases from the post-SLM screenshot: subprocess CLI agents that
exit non-zero on auth/config failures, network drops mid-stream,
rate-limited arms whose error surfaces after Stream() already returned.
Mechanism: the stream-create + consume blocks are wrapped in a labeled
streamLoop. When s.Err() != nil and the accumulator is empty, the engine emits
a new EventFailover ("↻ <failed_arm> failed (<reason>) — retrying on
another arm"), excludes the failed arm via task.ExcludedArms, and
re-enters the loop. Cap of 4 failovers per round.
Guards:
- !acc.HasContent() — if text, thinking, or tool calls already streamed,
fail loudly rather than duplicate visible output on retry.
- isFailoverable(err) — deny-list approach: context.Canceled,
context.DeadlineExceeded, and HTTP 400/413 are fatal; everything else
(auth, rate limit, 5xx, subprocess exit, network) is failoverable.
- Router.ForcedArm() == "" — when the user pinned an arm via --provider,
failover is disabled by design.
- failoverAttempt < maxFailovers — bounded retry budget.
The TUI renders EventFailover with the existing "cost" role styling.
shortFailReason strips the subprocess wrapper envelope so the user sees
"Invalid API key. Try again." instead of
"subprocess: exit status 1: Error: Invalid API key. Try again.".
Tests cover the classifier (isFailoverable, shortFailReason), end-to-end
auth-error failover, content-already-streamed guard, and context-cancel
guard. The tests are deterministic across 10 repeated -race runs because
the failing arm gets IsCLIAgent=true, anchoring it in tier 0 ahead of the
API-tier backup.
172 lines
4.2 KiB
Go
172 lines
4.2 KiB
Go
package stream
|
|
|
|
import (
|
|
"strings"
|
|
|
|
"somegit.dev/Owlibou/gnoma/internal/message"
|
|
)
|
|
|
|
// Accumulator assembles a message.Response from a sequence of Events.
// Provider adapters translate SDK events into stream.Events;
// the Accumulator — shared, tested once — builds the final Response.
//
// Construct it with NewAccumulator: the zero value has a nil toolCalls map,
// and Apply assigns into that map on tool-call events, which would panic.
// Feed events through Apply, then call Response once the stream ends.
type Accumulator struct {
	// content holds blocks that have already been finalized, in the order
	// they were flushed.
	content []message.Content
	// usage accumulates every non-nil EventUsage payload via Usage.Add.
	usage message.Usage

	// Active text block being built. Nil until the first text delta after a
	// flush; reset to nil when flushed into content.
	textBuf *strings.Builder

	// Active thinking block being built. Same lifecycle as textBuf.
	thinkBuf *strings.Builder

	// Tool calls in progress, keyed by ToolCallID
	toolCalls map[string]*toolCallAccum
	// Ordered tool call IDs to preserve emission order; flushToolCalls walks
	// this slice, so the map alone does not determine output order.
	toolCallOrder []string
}
|
|
|
|
// toolCallAccum collects the pieces of a single streamed tool call.
type toolCallAccum struct {
	id   string
	name string
	// argsBuf concatenates the argument fragments carried by
	// EventToolCallDelta events.
	argsBuf strings.Builder
	// final complete args (from Done event); when Done carried no args this
	// is a snapshot of argsBuf taken at Done time. Nil if Done never arrived.
	args []byte
}
|
|
|
|
func NewAccumulator() *Accumulator {
|
|
return &Accumulator{
|
|
toolCalls: make(map[string]*toolCallAccum),
|
|
}
|
|
}
|
|
|
|
// Apply processes a single event, updating the accumulator's state.
|
|
func (a *Accumulator) Apply(e Event) {
|
|
switch e.Type {
|
|
case EventTextDelta:
|
|
a.flushThinking()
|
|
if a.textBuf == nil {
|
|
a.textBuf = &strings.Builder{}
|
|
}
|
|
a.textBuf.WriteString(e.Text)
|
|
|
|
case EventThinkingDelta:
|
|
a.flushText()
|
|
if a.thinkBuf == nil {
|
|
a.thinkBuf = &strings.Builder{}
|
|
}
|
|
a.thinkBuf.WriteString(e.Text)
|
|
|
|
case EventToolCallStart:
|
|
a.flushText()
|
|
a.flushThinking()
|
|
tc := &toolCallAccum{id: e.ToolCallID, name: e.ToolCallName}
|
|
a.toolCalls[e.ToolCallID] = tc
|
|
a.toolCallOrder = append(a.toolCallOrder, e.ToolCallID)
|
|
|
|
case EventToolCallDelta:
|
|
if tc, ok := a.toolCalls[e.ToolCallID]; ok {
|
|
tc.argsBuf.WriteString(e.ArgDelta)
|
|
}
|
|
|
|
case EventToolCallDone:
|
|
tc, ok := a.toolCalls[e.ToolCallID]
|
|
if !ok {
|
|
// Done without prior Start (e.g., Mistral sends complete tool calls in one chunk)
|
|
tc = &toolCallAccum{id: e.ToolCallID, name: e.ToolCallName}
|
|
a.toolCalls[e.ToolCallID] = tc
|
|
a.toolCallOrder = append(a.toolCallOrder, e.ToolCallID)
|
|
}
|
|
if e.Args != nil {
|
|
// Done event carries authoritative complete args
|
|
tc.args = e.Args
|
|
} else {
|
|
// Fall back to accumulated deltas
|
|
tc.args = []byte(tc.argsBuf.String())
|
|
}
|
|
|
|
case EventUsage:
|
|
if e.Usage != nil {
|
|
a.usage.Add(*e.Usage)
|
|
}
|
|
|
|
case EventError:
|
|
// Errors are handled by the stream consumer, not accumulated
|
|
}
|
|
}
|
|
|
|
// HasContent reports whether any user-visible content (text, thinking, or
|
|
// tool calls) has been accumulated. Used by the engine to decide whether a
|
|
// stream error is safe to fail over to another arm: a retry that would
|
|
// produce duplicate text after the user already saw partial output is
|
|
// worse than surfacing the error.
|
|
//
|
|
// Usage events alone do not count as content — they are bookkeeping.
|
|
func (a *Accumulator) HasContent() bool {
|
|
if a.textBuf != nil && a.textBuf.Len() > 0 {
|
|
return true
|
|
}
|
|
if a.thinkBuf != nil && a.thinkBuf.Len() > 0 {
|
|
return true
|
|
}
|
|
if len(a.content) > 0 {
|
|
return true
|
|
}
|
|
if len(a.toolCallOrder) > 0 {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// Response builds the final message.Response from all accumulated events.
|
|
func (a *Accumulator) Response(stopReason message.StopReason, model string) message.Response {
|
|
a.flushText()
|
|
a.flushThinking()
|
|
a.flushToolCalls()
|
|
|
|
return message.Response{
|
|
Message: message.Message{
|
|
Role: message.RoleAssistant,
|
|
Content: a.content,
|
|
},
|
|
StopReason: stopReason,
|
|
Usage: a.usage,
|
|
Model: model,
|
|
}
|
|
}
|
|
|
|
func (a *Accumulator) flushText() {
|
|
if a.textBuf != nil && a.textBuf.Len() > 0 {
|
|
a.content = append(a.content, message.NewTextContent(a.textBuf.String()))
|
|
a.textBuf = nil
|
|
}
|
|
}
|
|
|
|
func (a *Accumulator) flushThinking() {
|
|
if a.thinkBuf != nil && a.thinkBuf.Len() > 0 {
|
|
a.content = append(a.content, message.NewThinkingContent(message.Thinking{
|
|
Text: a.thinkBuf.String(),
|
|
}))
|
|
a.thinkBuf = nil
|
|
}
|
|
}
|
|
|
|
func (a *Accumulator) flushToolCalls() {
|
|
for _, id := range a.toolCallOrder {
|
|
tc, ok := a.toolCalls[id]
|
|
if !ok {
|
|
continue
|
|
}
|
|
args := tc.args
|
|
if args == nil {
|
|
// Fallback: use accumulated deltas even if Done never arrived
|
|
args = []byte(tc.argsBuf.String())
|
|
}
|
|
a.content = append(a.content, message.NewToolCallContent(message.ToolCall{
|
|
ID: tc.id,
|
|
Name: tc.name,
|
|
Arguments: args,
|
|
}))
|
|
}
|
|
a.toolCalls = make(map[string]*toolCallAccum)
|
|
a.toolCallOrder = nil
|
|
}
|