Files
vikingowl 1606d19366 feat(subprocess/codex): account for cached and reasoning tokens
codex 0.133.0 emits two token-accounting fields at top level that
we previously dropped:

  cached_input_tokens   — subset of input_tokens that hit the prompt
                          cache (cheaper, but still counted in
                          input_tokens per OpenAI Responses API
                          semantics)
  reasoning_output_tokens — separately reported billable thinking
                          tokens on reasoning-capable models

Map cached_input_tokens to message.Usage.CacheReadTokens and subtract
it from InputTokens. message.Usage.Add() sums InputTokens and
CacheReadTokens as peers, so the uncached residual goes in
InputTokens — matches the anthropic provider's convention and keeps
cumulative usage tracking arithmetically correct.

Fold reasoning_output_tokens into OutputTokens for accurate cost
tracking. The top-level peer positioning (vs nested in
output_tokens_details) implies a separately counted billable
quantity, not a subset of output_tokens.

Defensive clamp at zero in case a future codex build reports
cached > input due to schema drift. Includes a verbatim regression
guard against the live 2026-05-22 codex 0.133.0 output to catch
schema changes early.
2026-05-22 13:35:57 +02:00

357 lines
10 KiB
Go

package subprocess
import (
"bytes"
"encoding/json"
"fmt"
"log/slog"
"somegit.dev/Owlibou/gnoma/internal/message"
"somegit.dev/Owlibou/gnoma/internal/provider"
"somegit.dev/Owlibou/gnoma/internal/stream"
)
// FormatParser converts raw stdout lines from a CLI subprocess into stream.Events.
// Each CLI agent emits its own line-delimited JSON format.
//
// Design note: These agents are full agentic loops, not LLM endpoints. The provider.Request
// fields Tools, Messages (history), SystemPrompt, Temperature, Thinking, etc. are NOT honored —
// they are opaque black boxes. Only the latest user message is passed as a prompt. Internal
// tool calls executed by the CLI are surfaced as EventTextDelta (opaque text) in v1.
type FormatParser interface {
// ParseLine parses one newline-stripped stdout line. Returns 0 or more events.
ParseLine(line []byte) ([]stream.Event, error)
// Done is called when the process exits cleanly. May emit final events.
Done() []stream.Event
}
// --- claude-stream-json ---
// Format emitted by: claude -p "..." --output-format stream-json --verbose
//
// Relevant event types:
// type=assistant → message.content[].type=text → EventTextDelta
// type=result → usage.input_tokens/output_tokens, stop_reason → EventUsage; is_error → EventError
type claudeParser struct{}
func newClaudeParser() FormatParser { return &claudeParser{} }
type claudeEvent struct {
Type string `json:"type"`
Subtype string `json:"subtype"`
Message *claudeMessage `json:"message,omitempty"`
IsError bool `json:"is_error,omitempty"`
// result fields
StopReason string `json:"stop_reason,omitempty"`
Usage *claudeUsage `json:"usage,omitempty"`
}
type claudeMessage struct {
Content []claudeContent `json:"content"`
Usage *claudeUsage `json:"usage,omitempty"`
}
type claudeContent struct {
Type string `json:"type"`
Text string `json:"text,omitempty"`
}
type claudeUsage struct {
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
}
func (p *claudeParser) ParseLine(line []byte) ([]stream.Event, error) {
var ev claudeEvent
if err := json.Unmarshal(line, &ev); err != nil {
return nil, fmt.Errorf("claude: parse line: %w", err)
}
switch ev.Type {
case "assistant":
if ev.Message == nil {
return nil, nil
}
var evts []stream.Event
for _, c := range ev.Message.Content {
if c.Type == "text" && c.Text != "" {
evts = append(evts, stream.Event{Type: stream.EventTextDelta, Text: c.Text})
}
}
return evts, nil
case "result":
if ev.IsError {
return []stream.Event{{
Type: stream.EventError,
Err: fmt.Errorf("claude CLI: %s", ev.Subtype),
}}, nil
}
var evts []stream.Event
if ev.Usage != nil {
evts = append(evts, stream.Event{
Type: stream.EventUsage,
Usage: &message.Usage{
InputTokens: ev.Usage.InputTokens,
OutputTokens: ev.Usage.OutputTokens,
},
StopReason: claudeStopReason(ev.StopReason),
})
}
return evts, nil
}
// system, rate_limit_event, tool, etc. — intentionally ignored
return nil, nil
}
func (p *claudeParser) Done() []stream.Event { return nil }
func claudeStopReason(s string) message.StopReason {
switch s {
case "end_turn":
return message.StopEndTurn
case "max_tokens":
return message.StopMaxTokens
default:
return message.StopEndTurn
}
}
// --- gemini-stream-json ---
// Format emitted by: gemini -p "..." --output-format stream-json
//
// Relevant event types:
// type=message, role=assistant, delta=true → EventTextDelta
// type=result, status=success → EventUsage
type geminiParser struct{}
func newGeminiParser() FormatParser { return &geminiParser{} }
type geminiEvent struct {
Type string `json:"type"`
Role string `json:"role,omitempty"`
Content string `json:"content,omitempty"`
Delta bool `json:"delta,omitempty"`
Status string `json:"status,omitempty"`
Stats *geminiStats `json:"stats,omitempty"`
}
type geminiStats struct {
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
}
func (p *geminiParser) ParseLine(line []byte) ([]stream.Event, error) {
var ev geminiEvent
if err := json.Unmarshal(line, &ev); err != nil {
return nil, fmt.Errorf("gemini: parse line: %w", err)
}
switch ev.Type {
case "message":
if ev.Role == "assistant" && ev.Content != "" {
return []stream.Event{{Type: stream.EventTextDelta, Text: ev.Content}}, nil
}
// user messages and empty assistant messages are ignored
return nil, nil
case "result":
if ev.Stats == nil {
return nil, nil
}
stopReason := message.StopEndTurn
if ev.Status != "success" {
return []stream.Event{{
Type: stream.EventError,
Err: fmt.Errorf("gemini CLI: result status %q", ev.Status),
}}, nil
}
return []stream.Event{{
Type: stream.EventUsage,
Usage: &message.Usage{
InputTokens: ev.Stats.InputTokens,
OutputTokens: ev.Stats.OutputTokens,
},
StopReason: stopReason,
}}, nil
}
// init, other types — ignored
return nil, nil
}
func (p *geminiParser) Done() []stream.Event { return nil }
// --- vibe-streaming (mistral) ---
// Format emitted by: vibe -p "..." --output streaming --trust
//
// Each line is a JSON message object with a "role" field.
// role=assistant: content → EventTextDelta; reasoning_content → EventThinkingDelta
// role=system, role=user: ignored
// No explicit "done" event — stream ends when process exits.
type vibeParser struct{}
func newVibeParser() FormatParser { return &vibeParser{} }
type vibeMessage struct {
Role string `json:"role"`
Content string `json:"content"`
ReasoningContent *string `json:"reasoning_content"`
MessageID *string `json:"message_id"`
}
func (p *vibeParser) ParseLine(line []byte) ([]stream.Event, error) {
var msg vibeMessage
if err := json.Unmarshal(line, &msg); err != nil {
return nil, fmt.Errorf("vibe: parse line: %w", err)
}
if msg.Role != "assistant" {
return nil, nil
}
var evts []stream.Event
if msg.ReasoningContent != nil && *msg.ReasoningContent != "" {
evts = append(evts, stream.Event{
Type: stream.EventThinkingDelta,
Text: *msg.ReasoningContent,
})
}
if msg.Content != "" {
evts = append(evts, stream.Event{Type: stream.EventTextDelta, Text: msg.Content})
}
return evts, nil
}
func (p *vibeParser) Done() []stream.Event { return nil }
// --- agy-text ---
// Format emitted by: agy -p "..."
//
// agy emits plain text to stdout. Each line is emitted as an EventTextDelta.
// If ResponseFormat is JSON, the prompt was augmented to request JSON;
// we still emit everything as text so the user sees progress.
type agyParser struct {
rf *provider.ResponseFormat
}
func newAgyParser(rf *provider.ResponseFormat) FormatParser {
return &agyParser{rf: rf}
}
func (p *agyParser) ParseLine(line []byte) ([]stream.Event, error) {
return []stream.Event{{
Type: stream.EventTextDelta,
Text: string(line) + "\n",
}}, nil
}
func (p *agyParser) Done() []stream.Event { return nil }
// --- codex-stream-json ---
// Format emitted by: codex exec "..." --json --dangerously-bypass-approvals-and-sandbox
//
// Relevant event types:
// type=item.completed, item.type=agent_message → EventTextDelta (using item.text)
// type=turn.completed → EventUsage (using usage)
type codexParser struct{}
func newCodexParser() FormatParser { return &codexParser{} }
type codexEvent struct {
Type string `json:"type"`
Item *codexItem `json:"item,omitempty"`
Usage *codexUsage `json:"usage,omitempty"`
}
type codexItem struct {
Type string `json:"type"`
Text string `json:"text"`
}
type codexUsage struct {
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
PromptTokens int64 `json:"prompt_tokens"`
CompletionTokens int64 `json:"completion_tokens"`
CachedInputTokens int64 `json:"cached_input_tokens"`
ReasoningOutputTokens int64 `json:"reasoning_output_tokens"`
}
func (p *codexParser) ParseLine(line []byte) ([]stream.Event, error) {
// Codex emits banner/debug lines to stdout interleaved with the JSON
// event stream (version notes, sandbox warnings, "starting turn" log
// lines, etc.). Skip anything that isn't a JSON object so a stray
// banner can't abort the turn — subprocessStream.Next treats a
// parser error as terminal.
trimmed := bytes.TrimSpace(line)
if len(trimmed) == 0 || trimmed[0] != '{' {
return nil, nil
}
var ev codexEvent
if err := json.Unmarshal(trimmed, &ev); err != nil {
// Looks like JSON but won't parse — log and skip rather than
// killing the stream; codex JSON-line output is the only path
// we have to recover from a malformed line.
slog.Debug("codex: skipping unparseable JSON line", "err", err, "line", string(trimmed))
return nil, nil
}
switch ev.Type {
case "item.completed":
if ev.Item != nil && ev.Item.Type == "agent_message" && ev.Item.Text != "" {
return []stream.Event{{Type: stream.EventTextDelta, Text: ev.Item.Text}}, nil
}
case "turn.completed":
if ev.Usage != nil {
// Some codex builds emit input_tokens, others (older) emit
// prompt_tokens; new builds occasionally include both with
// slightly different values. max() prevents silent
// undercounting when both are non-zero.
input := ev.Usage.InputTokens
if ev.Usage.PromptTokens > input {
input = ev.Usage.PromptTokens
}
output := ev.Usage.OutputTokens
if ev.Usage.CompletionTokens > output {
output = ev.Usage.CompletionTokens
}
// codex (OpenAI Responses API semantics) reports input_tokens
// as the TOTAL input including cache hits. message.Usage.Add()
// sums InputTokens and CacheReadTokens as peers, so store the
// uncached residual here and the hit count separately —
// matches the anthropic provider. Clamp at zero in case a
// future codex build reports cached > input due to schema drift.
if ev.Usage.CachedInputTokens > 0 {
input -= ev.Usage.CachedInputTokens
if input < 0 {
input = 0
}
}
// reasoning_output_tokens appears at top level as a peer to
// output_tokens. Treat as a separately billable counter (not a
// nested subset) and fold in for accurate spend.
output += ev.Usage.ReasoningOutputTokens
return []stream.Event{{
Type: stream.EventUsage,
Usage: &message.Usage{
InputTokens: input,
OutputTokens: output,
CacheReadTokens: ev.Usage.CachedInputTokens,
},
StopReason: message.StopEndTurn,
}}, nil
}
}
return nil, nil
}
func (p *codexParser) Done() []stream.Event { return nil }