provider/openai: - Fix doubled tool call args (argsComplete flag): Ollama sends complete args in the first streaming chunk then repeats them as delta, causing doubled JSON and 400 errors in elfs - Handle fs: prefix (gemma4 uses fs:grep instead of fs.grep) - Add Reasoning field support for Ollama thinking output cmd/gnoma: - Early TTY detection so logger is created with correct destination before any component gets a reference to it (fixes slog WARN bleed into TUI textarea) permission: - Exempt spawn_elfs and agent tools from safety scanner: elf prompt text may legitimately mention .env/.ssh/credentials patterns and should not be blocked tui/app: - /init retry chain: no-tool-calls → spawn_elfs nudge → write nudge (ask for plain text output) → TUI fallback write from streamBuf - looksLikeAgentsMD + extractMarkdownDoc: validate and clean fallback content before writing (reject refusals, strip narrative preambles) - Collapse thinking output to 3 lines; ctrl+o to expand (live stream and committed messages) - Stream-level filter for model pseudo-tool-call blocks: suppresses <<tool_code>>...</tool_code>> and <<function_call>>...<tool_call|> from entering streamBuf across chunk boundaries - sanitizeAssistantText regex covers both block formats - Reset streamFilterClose at every turn start
58 lines
1.4 KiB
Go
package provider
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"somegit.dev/Owlibou/gnoma/internal/stream"
|
|
)
|
|
|
|
// ConcurrentProvider wraps a Provider with a shared semaphore that limits the
// number of in-flight Stream calls. All engines sharing the same
// ConcurrentProvider instance share the same concurrency budget.
//
// The semaphore is token-based: sem is pre-filled by WithConcurrency with one
// token per allowed slot; Stream receives a token to acquire a slot and sends
// it back to release.
type ConcurrentProvider struct {
	Provider // embedded inner provider; every method except Stream passes through unchanged

	// sem holds the available concurrency tokens; its capacity equals the
	// configured maximum number of concurrent Stream calls.
	sem chan struct{}
}
|
|
|
|
// WithConcurrency wraps p so that at most max Stream calls can be in-flight
|
|
// simultaneously. If max <= 0, p is returned unwrapped.
|
|
func WithConcurrency(p Provider, max int) Provider {
|
|
if max <= 0 {
|
|
return p
|
|
}
|
|
sem := make(chan struct{}, max)
|
|
for range max {
|
|
sem <- struct{}{}
|
|
}
|
|
return &ConcurrentProvider{Provider: p, sem: sem}
|
|
}
|
|
|
|
// Stream acquires a concurrency slot, calls the inner provider, and returns a
|
|
// stream that releases the slot when Close is called.
|
|
func (cp *ConcurrentProvider) Stream(ctx context.Context, req Request) (stream.Stream, error) {
|
|
select {
|
|
case <-cp.sem:
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
s, err := cp.Provider.Stream(ctx, req)
|
|
if err != nil {
|
|
cp.sem <- struct{}{}
|
|
return nil, err
|
|
}
|
|
return &semStream{Stream: s, release: func() { cp.sem <- struct{}{} }}, nil
|
|
}
|
|
|
|
// semStream wraps a stream.Stream to release a semaphore slot on Close.
type semStream struct {
	stream.Stream // inner stream; every method except Close passes through unchanged

	// release returns the concurrency token to the owning ConcurrentProvider.
	release func()
	// once guards release so that repeated Close calls free the slot exactly once.
	once sync.Once
}
|
|
|
|
// Close releases the concurrency slot (exactly once, even if Close is called
// multiple times) and then closes the wrapped stream. The slot is freed
// before the inner Close so a waiting caller can proceed without waiting on
// the inner stream's teardown.
func (s *semStream) Close() error {
	s.once.Do(s.release)
	return s.Stream.Close()
}
|