feat: M7 Elfs — sub-agents with router-integrated spawning
internal/elf/: - BackgroundElf: runs on own goroutine with independent engine, history, and provider. No shared mutable state. - Manager: spawns elfs via router.Select() (picks best arm per task type), tracks lifecycle, WaitAll(), CancelAll(), Cleanup(). internal/tool/agent/: - Agent tool: LLM can call 'agent' to spawn sub-agents. Supports task_type hint for routing, wait/background mode. 5-minute timeout, context cancellation propagated. Concurrent tool execution: - Read-only tools (fs.read, fs.grep, fs.glob, etc.) execute in parallel via goroutines. - Write tools (bash, fs.write, fs.edit) execute sequentially. - Partition by tool.IsReadOnly(). TUI: /elf command explains how to use sub-agents. 5 elf tests. Exit criteria: parent spawns 3 background elfs on different providers, collects and synthesizes results.
This commit is contained in:
@@ -29,6 +29,8 @@ import (
|
|||||||
"somegit.dev/Owlibou/gnoma/internal/tui"
|
"somegit.dev/Owlibou/gnoma/internal/tui"
|
||||||
|
|
||||||
tea "charm.land/bubbletea/v2"
|
tea "charm.land/bubbletea/v2"
|
||||||
|
"somegit.dev/Owlibou/gnoma/internal/elf"
|
||||||
|
"somegit.dev/Owlibou/gnoma/internal/tool/agent"
|
||||||
"somegit.dev/Owlibou/gnoma/internal/tool/bash"
|
"somegit.dev/Owlibou/gnoma/internal/tool/bash"
|
||||||
"somegit.dev/Owlibou/gnoma/internal/tool/fs"
|
"somegit.dev/Owlibou/gnoma/internal/tool/fs"
|
||||||
"somegit.dev/Owlibou/gnoma/internal/tool/sysinfo"
|
"somegit.dev/Owlibou/gnoma/internal/tool/sysinfo"
|
||||||
@@ -141,6 +143,9 @@ func main() {
|
|||||||
// Register system_info tool backed by the inventory
|
// Register system_info tool backed by the inventory
|
||||||
reg.Register(sysinfo.New(inventory))
|
reg.Register(sysinfo.New(inventory))
|
||||||
|
|
||||||
|
// Elf manager (created now, agent tool registered after router exists)
|
||||||
|
// We'll register the agent tool after the router is created below
|
||||||
|
|
||||||
// Create router and register the provider as a single arm
|
// Create router and register the provider as a single arm
|
||||||
// (M4 foundation: one provider from CLI. Multi-provider routing comes with config.)
|
// (M4 foundation: one provider from CLI. Multi-provider routing comes with config.)
|
||||||
rtr := router.New(router.Config{Logger: logger})
|
rtr := router.New(router.Config{Logger: logger})
|
||||||
@@ -174,6 +179,14 @@ func main() {
|
|||||||
logger.Debug("local models discovered", "count", len(localModels))
|
logger.Debug("local models discovered", "count", len(localModels))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create elf manager and register agent tool
|
||||||
|
elfMgr := elf.NewManager(elf.ManagerConfig{
|
||||||
|
Router: rtr,
|
||||||
|
Tools: reg,
|
||||||
|
Logger: logger,
|
||||||
|
})
|
||||||
|
reg.Register(agent.New(elfMgr))
|
||||||
|
|
||||||
// Create firewall
|
// Create firewall
|
||||||
fw := security.NewFirewall(security.FirewallConfig{
|
fw := security.NewFirewall(security.FirewallConfig{
|
||||||
ScanOutgoing: true,
|
ScanOutgoing: true,
|
||||||
|
|||||||
153
internal/elf/elf.go
Normal file
153
internal/elf/elf.go
Normal file
@@ -0,0 +1,153 @@
|
|||||||
|
package elf
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"somegit.dev/Owlibou/gnoma/internal/engine"
|
||||||
|
"somegit.dev/Owlibou/gnoma/internal/message"
|
||||||
|
"somegit.dev/Owlibou/gnoma/internal/stream"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Status tracks the lifecycle of an elf.
type Status int

const (
	StatusPending Status = iota
	StatusRunning
	StatusCompleted
	StatusFailed
	StatusCancelled
)

// statusNames maps each Status to its display name; indices mirror the
// iota order of the constants above.
var statusNames = [...]string{
	StatusPending:   "pending",
	StatusRunning:   "running",
	StatusCompleted: "completed",
	StatusFailed:    "failed",
	StatusCancelled: "cancelled",
}

// String returns the lowercase human-readable name of the status, or
// "unknown" for any value outside the declared range.
func (s Status) String() string {
	if s < StatusPending || int(s) >= len(statusNames) {
		return "unknown"
	}
	return statusNames[s]
}
|
||||||
|
|
||||||
|
// Result is the output of a completed elf.
|
||||||
|
type Result struct {
|
||||||
|
ID string
|
||||||
|
Status Status
|
||||||
|
Messages []message.Message
|
||||||
|
Usage message.Usage
|
||||||
|
Output string // final text output
|
||||||
|
Error error
|
||||||
|
Duration time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// Elf is a sub-agent with its own engine and conversation history.
|
||||||
|
type Elf interface {
|
||||||
|
// ID returns the unique elf identifier.
|
||||||
|
ID() string
|
||||||
|
// Status returns the current lifecycle status.
|
||||||
|
Status() Status
|
||||||
|
// Events returns a channel for streaming events (nil for sync elfs).
|
||||||
|
Events() <-chan stream.Event
|
||||||
|
// Wait blocks until the elf completes and returns its result.
|
||||||
|
Wait() Result
|
||||||
|
// Cancel aborts the elf.
|
||||||
|
Cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
// elfCounter hands out process-wide unique ID suffixes; atomic so
// concurrent spawns never collide.
var elfCounter atomic.Int64

// nextID returns "<prefix>-<n>", where n increases by one on every call.
func nextID(prefix string) string {
	return fmt.Sprintf("%s-%d", prefix, elfCounter.Add(1))
}
|
||||||
|
|
||||||
|
// BackgroundElf runs on its own goroutine with an independent engine.
|
||||||
|
type BackgroundElf struct {
|
||||||
|
id string
|
||||||
|
eng *engine.Engine
|
||||||
|
events chan stream.Event
|
||||||
|
result chan Result
|
||||||
|
cancel context.CancelFunc
|
||||||
|
status atomic.Int32
|
||||||
|
startAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// SpawnBackground creates and starts a background elf.
|
||||||
|
func SpawnBackground(eng *engine.Engine, prompt string) *BackgroundElf {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
elf := &BackgroundElf{
|
||||||
|
id: nextID("elf"),
|
||||||
|
eng: eng,
|
||||||
|
events: make(chan stream.Event, 64),
|
||||||
|
result: make(chan Result, 1),
|
||||||
|
cancel: cancel,
|
||||||
|
startAt: time.Now(),
|
||||||
|
}
|
||||||
|
elf.status.Store(int32(StatusRunning))
|
||||||
|
|
||||||
|
go elf.run(ctx, prompt)
|
||||||
|
|
||||||
|
return elf
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *BackgroundElf) run(ctx context.Context, prompt string) {
|
||||||
|
cb := func(evt stream.Event) {
|
||||||
|
select {
|
||||||
|
case e.events <- evt:
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
turn, err := e.eng.Submit(ctx, prompt, cb)
|
||||||
|
|
||||||
|
close(e.events)
|
||||||
|
|
||||||
|
r := Result{
|
||||||
|
ID: e.id,
|
||||||
|
Duration: time.Since(e.startAt),
|
||||||
|
}
|
||||||
|
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
r.Status = StatusCancelled
|
||||||
|
r.Error = ctx.Err()
|
||||||
|
e.status.Store(int32(StatusCancelled))
|
||||||
|
} else if err != nil {
|
||||||
|
r.Status = StatusFailed
|
||||||
|
r.Error = err
|
||||||
|
e.status.Store(int32(StatusFailed))
|
||||||
|
} else {
|
||||||
|
r.Status = StatusCompleted
|
||||||
|
r.Messages = turn.Messages
|
||||||
|
r.Usage = turn.Usage
|
||||||
|
// Extract final text from last assistant message
|
||||||
|
for i := len(turn.Messages) - 1; i >= 0; i-- {
|
||||||
|
if turn.Messages[i].Role == message.RoleAssistant {
|
||||||
|
r.Output = turn.Messages[i].TextContent()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
e.status.Store(int32(StatusCompleted))
|
||||||
|
}
|
||||||
|
|
||||||
|
e.result <- r
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *BackgroundElf) ID() string { return e.id }
|
||||||
|
func (e *BackgroundElf) Status() Status { return Status(e.status.Load()) }
|
||||||
|
func (e *BackgroundElf) Events() <-chan stream.Event { return e.events }
|
||||||
|
func (e *BackgroundElf) Cancel() { e.cancel() }
|
||||||
|
|
||||||
|
func (e *BackgroundElf) Wait() Result {
|
||||||
|
return <-e.result
|
||||||
|
}
|
||||||
239
internal/elf/elf_test.go
Normal file
239
internal/elf/elf_test.go
Normal file
@@ -0,0 +1,239 @@
|
|||||||
|
package elf
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"somegit.dev/Owlibou/gnoma/internal/engine"
|
||||||
|
"somegit.dev/Owlibou/gnoma/internal/message"
|
||||||
|
"somegit.dev/Owlibou/gnoma/internal/provider"
|
||||||
|
"somegit.dev/Owlibou/gnoma/internal/router"
|
||||||
|
"somegit.dev/Owlibou/gnoma/internal/stream"
|
||||||
|
"somegit.dev/Owlibou/gnoma/internal/tool"
|
||||||
|
)
|
||||||
|
|
||||||
|
// --- Mock Provider ---
|
||||||
|
|
||||||
|
type mockProvider struct {
|
||||||
|
name string
|
||||||
|
calls int
|
||||||
|
streams []stream.Stream
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockProvider) Name() string { return m.name }
|
||||||
|
func (m *mockProvider) DefaultModel() string { return "mock" }
|
||||||
|
func (m *mockProvider) Models(_ context.Context) ([]provider.ModelInfo, error) { return nil, nil }
|
||||||
|
func (m *mockProvider) Stream(_ context.Context, _ provider.Request) (stream.Stream, error) {
|
||||||
|
if m.calls >= len(m.streams) {
|
||||||
|
return nil, fmt.Errorf("no more streams")
|
||||||
|
}
|
||||||
|
s := m.streams[m.calls]
|
||||||
|
m.calls++
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type eventStream struct {
|
||||||
|
events []stream.Event
|
||||||
|
idx int
|
||||||
|
}
|
||||||
|
|
||||||
|
func newEventStream(text string) *eventStream {
|
||||||
|
return &eventStream{
|
||||||
|
events: []stream.Event{
|
||||||
|
{Type: stream.EventTextDelta, Text: text},
|
||||||
|
{Type: stream.EventTextDelta, StopReason: message.StopEndTurn},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *eventStream) Next() bool { s.idx++; return s.idx <= len(s.events) }
|
||||||
|
func (s *eventStream) Current() stream.Event { return s.events[s.idx-1] }
|
||||||
|
func (s *eventStream) Err() error { return nil }
|
||||||
|
func (s *eventStream) Close() error { return nil }
|
||||||
|
|
||||||
|
// --- Tests ---
|
||||||
|
|
||||||
|
func TestBackgroundElf_RunsAndCompletes(t *testing.T) {
|
||||||
|
mp := &mockProvider{
|
||||||
|
name: "test",
|
||||||
|
streams: []stream.Stream{newEventStream("Hello from elf!")},
|
||||||
|
}
|
||||||
|
eng, _ := engine.New(engine.Config{Provider: mp, Tools: tool.NewRegistry()})
|
||||||
|
|
||||||
|
elf := SpawnBackground(eng, "say hello")
|
||||||
|
|
||||||
|
if elf.Status() != StatusRunning {
|
||||||
|
t.Errorf("initial status = %s, want running", elf.Status())
|
||||||
|
}
|
||||||
|
|
||||||
|
result := elf.Wait()
|
||||||
|
|
||||||
|
if result.Status != StatusCompleted {
|
||||||
|
t.Errorf("result status = %s, want completed", result.Status)
|
||||||
|
}
|
||||||
|
if result.Output != "Hello from elf!" {
|
||||||
|
t.Errorf("output = %q", result.Output)
|
||||||
|
}
|
||||||
|
if result.Duration <= 0 {
|
||||||
|
t.Error("duration should be positive")
|
||||||
|
}
|
||||||
|
if elf.Status() != StatusCompleted {
|
||||||
|
t.Errorf("final status = %s, want completed", elf.Status())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBackgroundElf_Cancel(t *testing.T) {
|
||||||
|
// Stream that blocks
|
||||||
|
slowStream := &slowEventStream{}
|
||||||
|
mp := &mockProvider{
|
||||||
|
name: "test",
|
||||||
|
streams: []stream.Stream{slowStream},
|
||||||
|
}
|
||||||
|
eng, _ := engine.New(engine.Config{Provider: mp, Tools: tool.NewRegistry()})
|
||||||
|
|
||||||
|
elf := SpawnBackground(eng, "slow task")
|
||||||
|
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
elf.Cancel()
|
||||||
|
|
||||||
|
result := elf.Wait()
|
||||||
|
if result.Status != StatusCancelled && result.Status != StatusFailed {
|
||||||
|
t.Errorf("status = %s, want cancelled or failed", result.Status)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBackgroundElf_CollectEvents(t *testing.T) {
|
||||||
|
mp := &mockProvider{
|
||||||
|
name: "test",
|
||||||
|
streams: []stream.Stream{newEventStream("event test")},
|
||||||
|
}
|
||||||
|
eng, _ := engine.New(engine.Config{Provider: mp, Tools: tool.NewRegistry()})
|
||||||
|
|
||||||
|
elf := SpawnBackground(eng, "generate events")
|
||||||
|
|
||||||
|
var events []stream.Event
|
||||||
|
for evt := range elf.Events() {
|
||||||
|
events = append(events, evt)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(events) == 0 {
|
||||||
|
t.Error("should receive events")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestManager_SpawnAndList(t *testing.T) {
|
||||||
|
mp := &mockProvider{
|
||||||
|
name: "test",
|
||||||
|
streams: []stream.Stream{
|
||||||
|
newEventStream("elf 1"),
|
||||||
|
newEventStream("elf 2"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
rtr := router.New(router.Config{})
|
||||||
|
rtr.RegisterArm(&router.Arm{
|
||||||
|
ID: "test/mock",
|
||||||
|
Provider: mp,
|
||||||
|
ModelName: "mock",
|
||||||
|
Capabilities: provider.Capabilities{ToolUse: true},
|
||||||
|
})
|
||||||
|
|
||||||
|
mgr := NewManager(ManagerConfig{
|
||||||
|
Router: rtr,
|
||||||
|
Tools: tool.NewRegistry(),
|
||||||
|
})
|
||||||
|
|
||||||
|
// Spawn two elfs
|
||||||
|
e1, err := mgr.Spawn(context.Background(), router.TaskGeneration, "task 1", "you are elf 1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Spawn 1: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
e2, err := mgr.Spawn(context.Background(), router.TaskReview, "task 2", "you are elf 2")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Spawn 2: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// List should have 2
|
||||||
|
if len(mgr.List()) != 2 {
|
||||||
|
t.Errorf("List() = %d, want 2", len(mgr.List()))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for both
|
||||||
|
r1 := e1.Wait()
|
||||||
|
r2 := e2.Wait()
|
||||||
|
|
||||||
|
if r1.Status != StatusCompleted {
|
||||||
|
t.Errorf("elf 1 status = %s", r1.Status)
|
||||||
|
}
|
||||||
|
if r2.Status != StatusCompleted {
|
||||||
|
t.Errorf("elf 2 status = %s", r2.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Active should be 0
|
||||||
|
if len(mgr.Active()) != 0 {
|
||||||
|
t.Errorf("Active() = %d, want 0", len(mgr.Active()))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
mgr.Cleanup()
|
||||||
|
if len(mgr.List()) != 0 {
|
||||||
|
t.Errorf("after cleanup, List() = %d", len(mgr.List()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestManager_WaitAll(t *testing.T) {
|
||||||
|
mp := &mockProvider{
|
||||||
|
name: "test",
|
||||||
|
streams: []stream.Stream{
|
||||||
|
newEventStream("result A"),
|
||||||
|
newEventStream("result B"),
|
||||||
|
newEventStream("result C"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
rtr := router.New(router.Config{})
|
||||||
|
rtr.RegisterArm(&router.Arm{
|
||||||
|
ID: "test/mock", Provider: mp, ModelName: "mock",
|
||||||
|
Capabilities: provider.Capabilities{ToolUse: true},
|
||||||
|
})
|
||||||
|
|
||||||
|
mgr := NewManager(ManagerConfig{Router: rtr, Tools: tool.NewRegistry()})
|
||||||
|
|
||||||
|
mgr.Spawn(context.Background(), router.TaskGeneration, "a", "")
|
||||||
|
mgr.Spawn(context.Background(), router.TaskGeneration, "b", "")
|
||||||
|
mgr.Spawn(context.Background(), router.TaskGeneration, "c", "")
|
||||||
|
|
||||||
|
results := mgr.WaitAll()
|
||||||
|
if len(results) != 3 {
|
||||||
|
t.Fatalf("WaitAll() = %d results, want 3", len(results))
|
||||||
|
}
|
||||||
|
|
||||||
|
completed := 0
|
||||||
|
for _, r := range results {
|
||||||
|
if r.Status == StatusCompleted {
|
||||||
|
completed++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if completed != 3 {
|
||||||
|
t.Errorf("%d completed, want 3", completed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// slowEventStream blocks until context cancelled
|
||||||
|
type slowEventStream struct {
|
||||||
|
done bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *slowEventStream) Next() bool {
|
||||||
|
if s.done {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
func (s *slowEventStream) Current() stream.Event { return stream.Event{} }
|
||||||
|
func (s *slowEventStream) Err() error { return context.Canceled }
|
||||||
|
func (s *slowEventStream) Close() error { s.done = true; return nil }
|
||||||
184
internal/elf/manager.go
Normal file
184
internal/elf/manager.go
Normal file
@@ -0,0 +1,184 @@
|
|||||||
|
package elf
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"somegit.dev/Owlibou/gnoma/internal/engine"
|
||||||
|
"somegit.dev/Owlibou/gnoma/internal/provider"
|
||||||
|
"somegit.dev/Owlibou/gnoma/internal/router"
|
||||||
|
"somegit.dev/Owlibou/gnoma/internal/tool"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Manager spawns, tracks, and manages elfs.
|
||||||
|
type Manager struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
elfs map[string]Elf
|
||||||
|
router *router.Router
|
||||||
|
tools *tool.Registry
|
||||||
|
logger *slog.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
type ManagerConfig struct {
|
||||||
|
Router *router.Router
|
||||||
|
Tools *tool.Registry
|
||||||
|
Logger *slog.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewManager(cfg ManagerConfig) *Manager {
|
||||||
|
logger := cfg.Logger
|
||||||
|
if logger == nil {
|
||||||
|
logger = slog.Default()
|
||||||
|
}
|
||||||
|
return &Manager{
|
||||||
|
elfs: make(map[string]Elf),
|
||||||
|
router: cfg.Router,
|
||||||
|
tools: cfg.Tools,
|
||||||
|
logger: logger,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Spawn creates a new background elf with a router-selected provider.
|
||||||
|
// The elf gets its own engine, history, and tools — no shared state.
|
||||||
|
func (m *Manager) Spawn(ctx context.Context, taskType router.TaskType, prompt, systemPrompt string) (Elf, error) {
|
||||||
|
// Ask router for the best arm for this task type
|
||||||
|
task := router.Task{
|
||||||
|
Type: taskType,
|
||||||
|
RequiresTools: true,
|
||||||
|
Priority: router.PriorityNormal,
|
||||||
|
EstimatedTokens: 4000,
|
||||||
|
}
|
||||||
|
|
||||||
|
decision := m.router.Select(task)
|
||||||
|
if decision.Error != nil {
|
||||||
|
return nil, fmt.Errorf("no arm available for elf: %w", decision.Error)
|
||||||
|
}
|
||||||
|
|
||||||
|
arm := decision.Arm
|
||||||
|
m.logger.Info("spawning elf",
|
||||||
|
"arm", arm.ID,
|
||||||
|
"task_type", taskType,
|
||||||
|
"model", arm.ModelName,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Create independent engine for the elf
|
||||||
|
eng, err := engine.New(engine.Config{
|
||||||
|
Provider: arm.Provider,
|
||||||
|
Tools: m.tools,
|
||||||
|
System: systemPrompt,
|
||||||
|
Model: arm.ModelName,
|
||||||
|
MaxTurns: 20,
|
||||||
|
Logger: m.logger,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("create elf engine: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
elf := SpawnBackground(eng, prompt)
|
||||||
|
|
||||||
|
m.mu.Lock()
|
||||||
|
m.elfs[elf.ID()] = elf
|
||||||
|
m.mu.Unlock()
|
||||||
|
|
||||||
|
m.logger.Info("elf spawned", "id", elf.ID(), "arm", arm.ID)
|
||||||
|
return elf, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SpawnWithProvider creates an elf using a specific provider (bypasses router).
|
||||||
|
func (m *Manager) SpawnWithProvider(prov provider.Provider, model, prompt, systemPrompt string) (Elf, error) {
|
||||||
|
eng, err := engine.New(engine.Config{
|
||||||
|
Provider: prov,
|
||||||
|
Tools: m.tools,
|
||||||
|
System: systemPrompt,
|
||||||
|
Model: model,
|
||||||
|
MaxTurns: 20,
|
||||||
|
Logger: m.logger,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("create elf engine: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
elf := SpawnBackground(eng, prompt)
|
||||||
|
|
||||||
|
m.mu.Lock()
|
||||||
|
m.elfs[elf.ID()] = elf
|
||||||
|
m.mu.Unlock()
|
||||||
|
|
||||||
|
m.logger.Info("elf spawned (direct)", "id", elf.ID(), "model", model)
|
||||||
|
return elf, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get returns an elf by ID.
|
||||||
|
func (m *Manager) Get(id string) (Elf, bool) {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
e, ok := m.elfs[id]
|
||||||
|
return e, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
// List returns all tracked elfs.
|
||||||
|
func (m *Manager) List() []Elf {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
elfs := make([]Elf, 0, len(m.elfs))
|
||||||
|
for _, e := range m.elfs {
|
||||||
|
elfs = append(elfs, e)
|
||||||
|
}
|
||||||
|
return elfs
|
||||||
|
}
|
||||||
|
|
||||||
|
// Active returns elfs that are still running.
|
||||||
|
func (m *Manager) Active() []Elf {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
var active []Elf
|
||||||
|
for _, e := range m.elfs {
|
||||||
|
if e.Status() == StatusRunning {
|
||||||
|
active = append(active, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return active
|
||||||
|
}
|
||||||
|
|
||||||
|
// CancelAll cancels all running elfs.
|
||||||
|
func (m *Manager) CancelAll() {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
for _, e := range m.elfs {
|
||||||
|
if e.Status() == StatusRunning {
|
||||||
|
e.Cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WaitAll waits for all elfs to complete and returns their results.
|
||||||
|
func (m *Manager) WaitAll() []Result {
|
||||||
|
elfs := m.List()
|
||||||
|
results := make([]Result, len(elfs))
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
for i, e := range elfs {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(idx int, elf Elf) {
|
||||||
|
defer wg.Done()
|
||||||
|
results[idx] = elf.Wait()
|
||||||
|
}(i, e)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
return results
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cleanup removes completed/failed/cancelled elfs from tracking.
|
||||||
|
func (m *Manager) Cleanup() {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
for id, e := range m.elfs {
|
||||||
|
s := e.Status()
|
||||||
|
if s == StatusCompleted || s == StatusFailed || s == StatusCancelled {
|
||||||
|
delete(m.elfs, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
gnomactx "somegit.dev/Owlibou/gnoma/internal/context"
|
gnomactx "somegit.dev/Owlibou/gnoma/internal/context"
|
||||||
"somegit.dev/Owlibou/gnoma/internal/message"
|
"somegit.dev/Owlibou/gnoma/internal/message"
|
||||||
@@ -11,6 +12,7 @@ import (
|
|||||||
"somegit.dev/Owlibou/gnoma/internal/provider"
|
"somegit.dev/Owlibou/gnoma/internal/provider"
|
||||||
"somegit.dev/Owlibou/gnoma/internal/router"
|
"somegit.dev/Owlibou/gnoma/internal/router"
|
||||||
"somegit.dev/Owlibou/gnoma/internal/stream"
|
"somegit.dev/Owlibou/gnoma/internal/stream"
|
||||||
|
"somegit.dev/Owlibou/gnoma/internal/tool"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Submit sends a user message and runs the agentic loop to completion.
|
// Submit sends a user message and runs the agentic loop to completion.
|
||||||
@@ -198,81 +200,119 @@ func (e *Engine) buildRequest(ctx context.Context) provider.Request {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (e *Engine) executeTools(ctx context.Context, calls []message.ToolCall, cb Callback) ([]message.ToolResult, error) {
|
func (e *Engine) executeTools(ctx context.Context, calls []message.ToolCall, cb Callback) ([]message.ToolResult, error) {
|
||||||
results := make([]message.ToolResult, 0, len(calls))
|
// Partition into read-only (parallel) and write (serial) batches
|
||||||
|
type toolCallWithTool struct {
|
||||||
|
call message.ToolCall
|
||||||
|
tool tool.Tool
|
||||||
|
}
|
||||||
|
|
||||||
|
var readOnly []toolCallWithTool
|
||||||
|
var readWrite []toolCallWithTool
|
||||||
|
var unknownResults []message.ToolResult
|
||||||
|
|
||||||
for _, call := range calls {
|
for _, call := range calls {
|
||||||
t, ok := e.cfg.Tools.Get(call.Name)
|
t, ok := e.cfg.Tools.Get(call.Name)
|
||||||
if !ok {
|
if !ok {
|
||||||
e.logger.Warn("unknown tool", "name", call.Name)
|
e.logger.Warn("unknown tool", "name", call.Name)
|
||||||
results = append(results, message.ToolResult{
|
unknownResults = append(unknownResults, message.ToolResult{
|
||||||
ToolCallID: call.ID,
|
ToolCallID: call.ID,
|
||||||
Content: fmt.Sprintf("unknown tool: %s", call.Name),
|
Content: fmt.Sprintf("unknown tool: %s", call.Name),
|
||||||
IsError: true,
|
IsError: true,
|
||||||
})
|
})
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
tc := toolCallWithTool{call: call, tool: t}
|
||||||
// Permission check
|
if t.IsReadOnly() {
|
||||||
if e.cfg.Permissions != nil {
|
readOnly = append(readOnly, tc)
|
||||||
info := permission.ToolInfo{
|
} else {
|
||||||
Name: call.Name,
|
readWrite = append(readWrite, tc)
|
||||||
IsReadOnly: t.IsReadOnly(),
|
|
||||||
IsDestructive: t.IsDestructive(),
|
|
||||||
}
|
|
||||||
if err := e.cfg.Permissions.Check(ctx, info, call.Arguments); err != nil {
|
|
||||||
e.logger.Info("tool permission denied", "name", call.Name, "error", err)
|
|
||||||
results = append(results, message.ToolResult{
|
|
||||||
ToolCallID: call.ID,
|
|
||||||
Content: fmt.Sprintf("permission denied: %v", err),
|
|
||||||
IsError: true,
|
|
||||||
})
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
e.logger.Debug("executing tool", "name", call.Name, "id", call.ID)
|
results := make([]message.ToolResult, 0, len(calls))
|
||||||
|
results = append(results, unknownResults...)
|
||||||
|
|
||||||
result, err := t.Execute(ctx, call.Arguments)
|
// Execute read-only tools in parallel
|
||||||
if err != nil {
|
if len(readOnly) > 0 {
|
||||||
e.logger.Error("tool execution failed", "name", call.Name, "error", err)
|
e.logger.Debug("executing read-only tools in parallel", "count", len(readOnly))
|
||||||
results = append(results, message.ToolResult{
|
parallelResults := make([]message.ToolResult, len(readOnly))
|
||||||
ToolCallID: call.ID,
|
var wg sync.WaitGroup
|
||||||
Content: err.Error(),
|
for i, tc := range readOnly {
|
||||||
IsError: true,
|
wg.Add(1)
|
||||||
})
|
go func(idx int, tc toolCallWithTool) {
|
||||||
continue
|
defer wg.Done()
|
||||||
|
parallelResults[idx] = e.executeSingleTool(ctx, tc.call, tc.tool, cb)
|
||||||
|
}(i, tc)
|
||||||
}
|
}
|
||||||
|
wg.Wait()
|
||||||
|
results = append(results, parallelResults...)
|
||||||
|
}
|
||||||
|
|
||||||
// Scan tool result through firewall
|
// Execute write tools sequentially
|
||||||
output := result.Output
|
for _, tc := range readWrite {
|
||||||
if e.cfg.Firewall != nil {
|
results = append(results, e.executeSingleTool(ctx, tc.call, tc.tool, cb))
|
||||||
output = e.cfg.Firewall.ScanToolResult(output)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Persist large results to disk
|
|
||||||
if persisted, ok := gnomactx.PersistLargeResult(output, call.ID, ".gnoma/sessions"); ok {
|
|
||||||
e.logger.Debug("tool result persisted to disk", "name", call.Name, "size", len(output))
|
|
||||||
output = persisted
|
|
||||||
}
|
|
||||||
|
|
||||||
// Emit tool result event for the UI
|
|
||||||
if cb != nil {
|
|
||||||
cb(stream.Event{
|
|
||||||
Type: stream.EventToolResult,
|
|
||||||
ToolName: call.Name,
|
|
||||||
ToolOutput: truncate(output, 2000),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
results = append(results, message.ToolResult{
|
|
||||||
ToolCallID: call.ID,
|
|
||||||
Content: output,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return results, nil
|
return results, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *Engine) executeSingleTool(ctx context.Context, call message.ToolCall, t tool.Tool, cb Callback) message.ToolResult {
|
||||||
|
// Permission check
|
||||||
|
if e.cfg.Permissions != nil {
|
||||||
|
info := permission.ToolInfo{
|
||||||
|
Name: call.Name,
|
||||||
|
IsReadOnly: t.IsReadOnly(),
|
||||||
|
IsDestructive: t.IsDestructive(),
|
||||||
|
}
|
||||||
|
if err := e.cfg.Permissions.Check(ctx, info, call.Arguments); err != nil {
|
||||||
|
e.logger.Info("tool permission denied", "name", call.Name, "error", err)
|
||||||
|
return message.ToolResult{
|
||||||
|
ToolCallID: call.ID,
|
||||||
|
Content: fmt.Sprintf("permission denied: %v", err),
|
||||||
|
IsError: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
e.logger.Debug("executing tool", "name", call.Name, "id", call.ID)
|
||||||
|
|
||||||
|
result, err := t.Execute(ctx, call.Arguments)
|
||||||
|
if err != nil {
|
||||||
|
e.logger.Error("tool execution failed", "name", call.Name, "error", err)
|
||||||
|
return message.ToolResult{
|
||||||
|
ToolCallID: call.ID,
|
||||||
|
Content: err.Error(),
|
||||||
|
IsError: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scan tool result through firewall
|
||||||
|
output := result.Output
|
||||||
|
if e.cfg.Firewall != nil {
|
||||||
|
output = e.cfg.Firewall.ScanToolResult(output)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Persist large results to disk
|
||||||
|
if persisted, ok := gnomactx.PersistLargeResult(output, call.ID, ".gnoma/sessions"); ok {
|
||||||
|
e.logger.Debug("tool result persisted to disk", "name", call.Name, "size", len(output))
|
||||||
|
output = persisted
|
||||||
|
}
|
||||||
|
|
||||||
|
// Emit tool result event for the UI
|
||||||
|
if cb != nil {
|
||||||
|
cb(stream.Event{
|
||||||
|
Type: stream.EventToolResult,
|
||||||
|
ToolName: call.Name,
|
||||||
|
ToolOutput: truncate(output, 2000),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return message.ToolResult{
|
||||||
|
ToolCallID: call.ID,
|
||||||
|
Content: output,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func truncate(s string, maxLen int) string {
|
func truncate(s string, maxLen int) string {
|
||||||
if len(s) <= maxLen {
|
if len(s) <= maxLen {
|
||||||
return s
|
return s
|
||||||
|
|||||||
136
internal/tool/agent/agent.go
Normal file
136
internal/tool/agent/agent.go
Normal file
@@ -0,0 +1,136 @@
|
|||||||
|
package agent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"somegit.dev/Owlibou/gnoma/internal/elf"
|
||||||
|
"somegit.dev/Owlibou/gnoma/internal/router"
|
||||||
|
"somegit.dev/Owlibou/gnoma/internal/tool"
|
||||||
|
)
|
||||||
|
|
||||||
|
var paramSchema = json.RawMessage(`{
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"prompt": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "The task prompt for the sub-agent (elf)"
|
||||||
|
},
|
||||||
|
"task_type": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Task type hint for provider routing",
|
||||||
|
"enum": ["generation", "review", "refactor", "debug", "explain", "planning"]
|
||||||
|
},
|
||||||
|
"wait": {
|
||||||
|
"type": "boolean",
|
||||||
|
"description": "Wait for the elf to complete (default true)"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": ["prompt"]
|
||||||
|
}`)
|
||||||
|
|
||||||
|
// Tool allows the LLM to spawn sub-agents (elfs).
|
||||||
|
type Tool struct {
|
||||||
|
manager *elf.Manager
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(mgr *elf.Manager) *Tool {
|
||||||
|
return &Tool{manager: mgr}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Tool) Name() string { return "agent" }
|
||||||
|
func (t *Tool) Description() string { return "Spawn a sub-agent (elf) to handle a task independently. The elf gets its own conversation and tools." }
|
||||||
|
func (t *Tool) Parameters() json.RawMessage { return paramSchema }
|
||||||
|
func (t *Tool) IsReadOnly() bool { return true }
|
||||||
|
func (t *Tool) IsDestructive() bool { return false }
|
||||||
|
|
||||||
|
type agentArgs struct {
|
||||||
|
Prompt string `json:"prompt"`
|
||||||
|
TaskType string `json:"task_type,omitempty"`
|
||||||
|
Wait *bool `json:"wait,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Tool) Execute(ctx context.Context, args json.RawMessage) (tool.Result, error) {
|
||||||
|
var a agentArgs
|
||||||
|
if err := json.Unmarshal(args, &a); err != nil {
|
||||||
|
return tool.Result{}, fmt.Errorf("agent: invalid args: %w", err)
|
||||||
|
}
|
||||||
|
if a.Prompt == "" {
|
||||||
|
return tool.Result{}, fmt.Errorf("agent: prompt required")
|
||||||
|
}
|
||||||
|
|
||||||
|
taskType := parseTaskType(a.TaskType)
|
||||||
|
wait := true
|
||||||
|
if a.Wait != nil {
|
||||||
|
wait = *a.Wait
|
||||||
|
}
|
||||||
|
|
||||||
|
systemPrompt := "You are an elf — a focused sub-agent of gnoma. Complete the given task thoroughly and concisely. Use tools as needed."
|
||||||
|
|
||||||
|
e, err := t.manager.Spawn(ctx, taskType, a.Prompt, systemPrompt)
|
||||||
|
if err != nil {
|
||||||
|
return tool.Result{Output: fmt.Sprintf("Failed to spawn elf: %v", err)}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if !wait {
|
||||||
|
return tool.Result{
|
||||||
|
Output: fmt.Sprintf("Elf %s spawned in background (task: %s)", e.ID(), taskType),
|
||||||
|
Metadata: map[string]any{"elf_id": e.ID(), "background": true},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait with timeout
|
||||||
|
done := make(chan elf.Result, 1)
|
||||||
|
go func() { done <- e.Wait() }()
|
||||||
|
|
||||||
|
var result elf.Result
|
||||||
|
select {
|
||||||
|
case result = <-done:
|
||||||
|
case <-ctx.Done():
|
||||||
|
e.Cancel()
|
||||||
|
return tool.Result{Output: "Elf cancelled"}, nil
|
||||||
|
case <-time.After(5 * time.Minute):
|
||||||
|
e.Cancel()
|
||||||
|
return tool.Result{Output: "Elf timed out after 5 minutes"}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var b strings.Builder
|
||||||
|
fmt.Fprintf(&b, "Elf %s completed (%s, %s)\n\n", result.ID, result.Status, result.Duration.Round(time.Millisecond))
|
||||||
|
if result.Error != nil {
|
||||||
|
fmt.Fprintf(&b, "Error: %v\n", result.Error)
|
||||||
|
}
|
||||||
|
if result.Output != "" {
|
||||||
|
b.WriteString(result.Output)
|
||||||
|
}
|
||||||
|
|
||||||
|
return tool.Result{
|
||||||
|
Output: b.String(),
|
||||||
|
Metadata: map[string]any{
|
||||||
|
"elf_id": result.ID,
|
||||||
|
"status": result.Status.String(),
|
||||||
|
"duration": result.Duration.String(),
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseTaskType(s string) router.TaskType {
|
||||||
|
switch strings.ToLower(s) {
|
||||||
|
case "generation":
|
||||||
|
return router.TaskGeneration
|
||||||
|
case "review":
|
||||||
|
return router.TaskReview
|
||||||
|
case "refactor":
|
||||||
|
return router.TaskRefactor
|
||||||
|
case "debug":
|
||||||
|
return router.TaskDebug
|
||||||
|
case "explain":
|
||||||
|
return router.TaskExplain
|
||||||
|
case "planning":
|
||||||
|
return router.TaskPlanning
|
||||||
|
default:
|
||||||
|
return router.TaskGeneration
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -397,6 +397,13 @@ func (m Model) handleCommand(cmd string) (tea.Model, tea.Cmd) {
|
|||||||
m.messages = append(m.messages, chatMessage{role: "system", content: b.String()})
|
m.messages = append(m.messages, chatMessage{role: "system", content: b.String()})
|
||||||
return m, nil
|
return m, nil
|
||||||
|
|
||||||
|
case "/elf", "/elfs":
|
||||||
|
if args == "" {
|
||||||
|
m.messages = append(m.messages, chatMessage{role: "system",
|
||||||
|
content: "Elfs are spawned by the LLM via the 'agent' tool.\nAsk the model to use sub-agents for parallel tasks.\n\nExample: \"Research these 3 files in parallel using sub-agents\""})
|
||||||
|
}
|
||||||
|
return m, nil
|
||||||
|
|
||||||
case "/shell":
|
case "/shell":
|
||||||
m.messages = append(m.messages, chatMessage{role: "system",
|
m.messages = append(m.messages, chatMessage{role: "system",
|
||||||
content: "interactive shell not yet implemented\nFor now, use ! prefix in your terminal: ! sudo command"})
|
content: "interactive shell not yet implemented\nFor now, use ! prefix in your terminal: ! sudo command"})
|
||||||
|
|||||||
Reference in New Issue
Block a user