• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

smallnest / goclaw / 21978860214

13 Feb 2026 07:45AM UTC coverage: 5.772% (+0.008%) from 5.764%
21978860214

push

github

chaoyuepan
improve web fetch

4 of 24 new or added lines in 10 files covered. (16.67%)

221 existing lines in 4 files now uncovered.

1517 of 26284 relevant lines covered (5.77%)

0.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/agent/orchestrator.go
1
package agent
2

3
import (
4
        "context"
5
        "fmt"
6
        "strings"
7
        "time"
8

9
        "github.com/smallnest/goclaw/internal/logger"
10
        "github.com/smallnest/goclaw/providers"
11
        "go.uber.org/zap"
12
)
13

14
// Orchestrator manages the agent execution loop
15
// Based on pi-mono's agent-loop.ts design
16
type Orchestrator struct {
17
        config     *LoopConfig
18
        state      *AgentState
19
        eventChan  chan *Event
20
        cancelFunc context.CancelFunc
21
}
22

23
// NewOrchestrator creates a new agent orchestrator
24
func NewOrchestrator(config *LoopConfig, initialState *AgentState) *Orchestrator {
×
25
        return &Orchestrator{
×
26
                config:    config,
×
27
                state:     initialState,
×
28
                eventChan: make(chan *Event, 100),
×
29
        }
×
30
}
×
31

32
// Run starts the agent loop with initial prompts
33
func (o *Orchestrator) Run(ctx context.Context, prompts []AgentMessage) ([]AgentMessage, error) {
×
34
        logger.Info("=== Orchestrator Run Start ===",
×
35
                zap.Int("prompts_count", len(prompts)))
×
36

×
37
        ctx, cancel := context.WithCancel(ctx)
×
38
        o.cancelFunc = cancel
×
39

×
40
        // Initialize state with prompts
×
41
        newMessages := make([]AgentMessage, len(prompts))
×
42
        copy(newMessages, prompts)
×
43
        currentState := o.state.Clone()
×
44
        currentState.AddMessages(newMessages)
×
45

×
46
        // Emit start event
×
47
        o.emit(NewEvent(EventAgentStart))
×
48

×
49
        // Main loop
×
50
        finalMessages, err := o.runLoop(ctx, currentState)
×
51

×
52
        logger.Info("=== Orchestrator Run End ===",
×
53
                zap.Int("final_messages_count", len(finalMessages)),
×
54
                zap.Error(err))
×
55

×
56
        // Emit end event
×
57
        endEvent := NewEvent(EventAgentEnd)
×
58
        if finalMessages != nil {
×
59
                endEvent = NewEvent(EventAgentEnd).WithFinalMessages(finalMessages)
×
60
        }
×
61
        o.emit(endEvent)
×
62

×
63
        cancel()
×
64
        if err != nil {
×
65
                return nil, fmt.Errorf("agent loop failed: %w", err)
×
66
        }
×
67

68
        return finalMessages, nil
×
69
}
70

71
// runLoop implements the main agent loop logic
72
func (o *Orchestrator) runLoop(ctx context.Context, state *AgentState) ([]AgentMessage, error) {
×
73
        firstTurn := true
×
74

×
75
        // Check for steering messages at start
×
76
        pendingMessages := o.fetchSteeringMessages()
×
77

×
78
        // Outer loop: continues when queued follow-up messages arrive
×
79
        for {
×
80
                hasMoreToolCalls := true
×
81
                steeringAfterTools := false
×
82

×
83
                // Inner loop: process tool calls and steering messages
×
84
                for hasMoreToolCalls || len(pendingMessages) > 0 {
×
85
                        if !firstTurn {
×
86
                                o.emit(NewEvent(EventTurnStart))
×
87
                        } else {
×
88
                                firstTurn = false
×
89
                        }
×
90

91
                        // Process pending messages (inject before next assistant response)
92
                        if len(pendingMessages) > 0 {
×
93
                                for _, msg := range pendingMessages {
×
94
                                        o.emit(NewEvent(EventMessageStart))
×
95
                                        state.AddMessage(msg)
×
96
                                        o.emit(NewEvent(EventMessageEnd))
×
97
                                }
×
98
                                pendingMessages = []AgentMessage{}
×
99
                        }
100

101
                        // Stream assistant response
102
                        assistantMsg, err := o.streamAssistantResponse(ctx, state)
×
103
                        if err != nil {
×
104
                                o.emitErrorEnd(state, err)
×
105
                                return state.Messages, err
×
106
                        }
×
107

108
                        state.AddMessage(assistantMsg)
×
109

×
110
                        // Check for tool calls
×
111
                        toolCalls := extractToolCalls(assistantMsg)
×
112
                        hasMoreToolCalls = len(toolCalls) > 0
×
113

×
114
                        if hasMoreToolCalls {
×
115
                                results, steering := o.executeToolCalls(ctx, toolCalls, state)
×
116
                                steeringAfterTools = len(steering) > 0
×
117

×
118
                                // Add tool result messages
×
119
                                for _, result := range results {
×
120
                                        state.AddMessage(result)
×
121
                                }
×
122

123
                                // If steering messages arrived, skip remaining tools
124
                                if steeringAfterTools {
×
125
                                        pendingMessages = steering
×
126
                                        break
×
127
                                }
128
                        }
129

130
                        o.emit(NewEvent(EventTurnEnd))
×
131

×
132
                        // Get steering messages after turn completes
×
133
                        if !steeringAfterTools && len(pendingMessages) == 0 {
×
134
                                pendingMessages = o.fetchSteeringMessages()
×
135
                        }
×
136
                }
137

138
                // Agent would stop here. Check for follow-up messages.
139
                followUpMessages := o.fetchFollowUpMessages()
×
140
                if len(followUpMessages) > 0 {
×
NEW
141
                        pendingMessages = append(pendingMessages, followUpMessages...)
×
142
                        continue
×
143
                }
144

145
                // No more messages, exit
146
                break
×
147
        }
148

149
        return state.Messages, nil
×
150
}
151

152
// streamAssistantResponse calls the LLM and streams the response
153
func (o *Orchestrator) streamAssistantResponse(ctx context.Context, state *AgentState) (AgentMessage, error) {
×
154
        logger.Info("=== streamAssistantResponse Start ===",
×
155
                zap.Int("message_count", len(state.Messages)),
×
156
                zap.Strings("loaded_skills", state.LoadedSkills))
×
157

×
158
        state.IsStreaming = true
×
159
        defer func() { state.IsStreaming = false }()
×
160

161
        // Apply context transform if configured
162
        messages := state.Messages
×
163
        if o.config.TransformContext != nil {
×
164
                transformed, err := o.config.TransformContext(messages)
×
165
                if err == nil {
×
166
                        messages = transformed
×
167
                } else {
×
168
                        logger.Warn("Context transform failed, using original", zap.Error(err))
×
169
                }
×
170
        }
171

172
        // Convert to provider messages
173
        var providerMsgs []providers.Message
×
174
        if o.config.ConvertToLLM != nil {
×
175
                converted, err := o.config.ConvertToLLM(messages)
×
176
                if err != nil {
×
177
                        return AgentMessage{}, fmt.Errorf("convert to LLM failed: %w", err)
×
178
                }
×
179
                providerMsgs = converted
×
180
        } else {
×
181
                // Default conversion
×
182
                providerMsgs = convertToProviderMessages(messages)
×
183
        }
×
184

185
        // Prepare tool definitions
NEW
186
        toolDefs := convertToToolDefinitions(state.Tools)
×
187

×
188
        // Emit message start
×
189
        o.emit(NewEvent(EventMessageStart))
×
190

×
191
        // Call provider with system prompt as first message
×
192
        fullMessages := []providers.Message{}
×
193

×
194
        // Build system prompt with skills if context builder is available
×
195
        if o.config.ContextBuilder != nil {
×
196
                skillsContent := ""
×
197
                if len(state.LoadedSkills) > 0 {
×
198
                        // Second phase: inject full content of loaded skills
×
199
                        skillsContent = o.config.ContextBuilder.buildSelectedSkills(state.LoadedSkills, o.config.Skills)
×
200
                } else if len(o.config.Skills) > 0 {
×
201
                        // First phase: inject skill summary (available skills list)
×
202
                        skillsContent = o.config.ContextBuilder.buildSkillsPrompt(o.config.Skills, PromptModeFull)
×
203
                }
×
204
                systemPrompt := o.config.ContextBuilder.buildSystemPromptWithSkills(skillsContent, PromptModeFull)
×
205
                fullMessages = append(fullMessages, providers.Message{
×
206
                        Role:    "system",
×
207
                        Content: systemPrompt,
×
208
                })
×
209
        } else if state.SystemPrompt != "" {
×
210
                // Fallback to stored system prompt
×
211
                fullMessages = append(fullMessages, providers.Message{
×
212
                        Role:    "system",
×
213
                        Content: state.SystemPrompt,
×
214
                })
×
215
        }
×
216
        fullMessages = append(fullMessages, providerMsgs...)
×
217

×
218
        logger.Info("=== Calling LLM ===",
×
219
                zap.Int("messages_count", len(fullMessages)),
×
220
                zap.Int("tools_count", len(toolDefs)),
×
221
                zap.Bool("has_loaded_skills", len(state.LoadedSkills) > 0))
×
222

×
223
        response, err := o.config.Provider.Chat(ctx, fullMessages, toolDefs)
×
224
        if err != nil {
×
225
                logger.Error("LLM call failed", zap.Error(err))
×
226
                return AgentMessage{}, fmt.Errorf("LLM call failed: %w", err)
×
227
        }
×
228

229
        logger.Info("=== LLM Response Received ===",
×
230
                zap.Int("content_length", len(response.Content)),
×
231
                zap.Int("tool_calls_count", len(response.ToolCalls)),
×
232
                zap.String("content_preview", truncateString(response.Content, 200)))
×
233

×
234
        // Emit message end
×
235
        o.emit(NewEvent(EventMessageEnd))
×
236

×
237
        // Convert response to agent message
×
238
        assistantMsg := convertFromProviderResponse(response)
×
239

×
240
        logger.Info("=== streamAssistantResponse End ===",
×
241
                zap.Bool("has_tool_calls", len(response.ToolCalls) > 0),
×
242
                zap.Int("tool_calls_count", len(response.ToolCalls)))
×
243

×
244
        return assistantMsg, nil
×
245
}
246

247
// executeToolCalls executes tool calls with interruption support
248
func (o *Orchestrator) executeToolCalls(ctx context.Context, toolCalls []ToolCallContent, state *AgentState) ([]AgentMessage, []AgentMessage) {
×
249
        results := make([]AgentMessage, 0, len(toolCalls))
×
250

×
251
        logger.Info("=== Execute Tool Calls Start ===",
×
252
                zap.Int("count", len(toolCalls)))
×
253
        for _, tc := range toolCalls {
×
254
                logger.Info("Tool call start",
×
255
                        zap.String("tool_id", tc.ID),
×
256
                        zap.String("tool_name", tc.Name),
×
257
                        zap.Any("arguments", tc.Arguments))
×
258

×
259
                // Emit tool execution start
×
260
                o.emit(NewEvent(EventToolExecutionStart).WithToolExecution(tc.ID, tc.Name, tc.Arguments))
×
261

×
262
                // Find tool
×
263
                var tool Tool
×
264
                for _, t := range state.Tools {
×
265
                        if t.Name() == tc.Name {
×
266
                                tool = t
×
267
                                break
×
268
                        }
269
                }
270

271
                var result ToolResult
×
272
                var err error
×
273

×
274
                if tool == nil {
×
275
                        err = fmt.Errorf("tool %s not found", tc.Name)
×
276
                        result = ToolResult{
×
277
                                Content: []ContentBlock{TextContent{Text: fmt.Sprintf("Tool not found: %s", tc.Name)}},
×
278
                                Details: map[string]any{"error": err.Error()},
×
279
                        }
×
280
                        logger.Error("Tool not found",
×
281
                                zap.String("tool_name", tc.Name),
×
282
                                zap.String("tool_id", tc.ID))
×
283
                } else {
×
284
                        state.AddPendingTool(tc.ID)
×
285

×
286
                        // Execute tool with streaming support
×
287
                        result, err = tool.Execute(ctx, tc.Arguments, func(partial ToolResult) {
×
288
                                // Emit update event
×
289
                                o.emit(NewEvent(EventToolExecutionUpdate).
×
290
                                        WithToolExecution(tc.ID, tc.Name, tc.Arguments).
×
291
                                        WithToolResult(&partial, false))
×
292
                        })
×
293

294
                        state.RemovePendingTool(tc.ID)
×
295
                }
296

297
                // Log tool execution result
298
                if err != nil {
×
299
                        logger.Error("Tool execution failed",
×
300
                                zap.String("tool_id", tc.ID),
×
301
                                zap.String("tool_name", tc.Name),
×
302
                                zap.Any("arguments", tc.Arguments),
×
303
                                zap.Error(err))
×
304
                } else {
×
305
                        // Extract content for logging
×
306
                        contentText := extractToolResultContent(result.Content)
×
307
                        logger.Info("Tool execution success",
×
308
                                zap.String("tool_id", tc.ID),
×
309
                                zap.String("tool_name", tc.Name),
×
310
                                zap.Any("arguments", tc.Arguments),
×
311
                                zap.Int("result_length", len(contentText)),
×
312
                                zap.String("result_preview", truncateString(contentText, 200)))
×
313
                }
×
314

315
                // Convert result to message
316
                resultMsg := AgentMessage{
×
317
                        Role:      RoleToolResult,
×
318
                        Content:   result.Content,
×
319
                        Timestamp: time.Now().UnixMilli(),
×
320
                        Metadata:  map[string]any{"tool_call_id": tc.ID, "tool_name": tc.Name},
×
321
                }
×
322

×
323
                if err != nil {
×
324
                        resultMsg.Metadata["error"] = err.Error()
×
325
                        result.Content = []ContentBlock{TextContent{Text: err.Error()}}
×
326
                }
×
327

328
                results = append(results, resultMsg)
×
329

×
330
                // Check for use_skill and update LoadedSkills
×
331
                if tc.Name == "use_skill" && err == nil {
×
332
                        if skillName, ok := tc.Arguments["skill_name"].(string); ok && skillName != "" {
×
333
                                // Add to LoadedSkills if not already present
×
334
                                alreadyLoaded := false
×
335
                                for _, loaded := range state.LoadedSkills {
×
336
                                        if loaded == skillName {
×
337
                                                alreadyLoaded = true
×
338
                                                break
×
339
                                        }
340
                                }
341
                                if !alreadyLoaded {
×
342
                                        state.LoadedSkills = append(state.LoadedSkills, skillName)
×
343
                                        logger.Info("=== Skill Loaded ===",
×
344
                                                zap.String("skill_name", skillName),
×
345
                                                zap.Int("total_loaded", len(state.LoadedSkills)),
×
346
                                                zap.Strings("loaded_skills", state.LoadedSkills))
×
347
                                }
×
348
                        }
349
                }
350

351
                // Emit tool execution end
352
                event := NewEvent(EventToolExecutionEnd).
×
353
                        WithToolExecution(tc.ID, tc.Name, tc.Arguments).
×
354
                        WithToolResult(&result, err != nil)
×
355
                o.emit(event)
×
356

×
357
                // Check for steering messages (interruption)
×
358
                steering := o.fetchSteeringMessages()
×
359
                if len(steering) > 0 {
×
360
                        return results, steering
×
361
                }
×
362
        }
363

364
        logger.Info("=== Execute Tool Calls End ===",
×
365
                zap.Int("count", len(results)))
×
366
        return results, nil
×
367
}
368

369
// emit sends an event to the event channel
370
func (o *Orchestrator) emit(event *Event) {
×
371
        if o.eventChan != nil {
×
372
                o.eventChan <- event
×
373
        }
×
374
}
375

376
// emitErrorEnd emits an error end event
377
func (o *Orchestrator) emitErrorEnd(state *AgentState, err error) {
×
378
        event := NewEvent(EventTurnEnd).WithStopReason(err.Error())
×
379
        o.emit(event)
×
380
}
×
381

382
// fetchSteeringMessages gets steering messages from config
383
func (o *Orchestrator) fetchSteeringMessages() []AgentMessage {
×
384
        if o.config.GetSteeringMessages != nil {
×
385
                msgs, _ := o.config.GetSteeringMessages()
×
386
                return msgs
×
387
        }
×
388
        // Fall back to state queue
389
        return o.state.DequeueSteeringMessages()
×
390
}
391

392
// fetchFollowUpMessages gets follow-up messages from config
393
func (o *Orchestrator) fetchFollowUpMessages() []AgentMessage {
×
394
        if o.config.GetFollowUpMessages != nil {
×
395
                msgs, _ := o.config.GetFollowUpMessages()
×
396
                return msgs
×
397
        }
×
398
        // Fall back to state queue
399
        return o.state.DequeueFollowUpMessages()
×
400
}
401

402
// Stop stops the orchestrator
403
func (o *Orchestrator) Stop() {
×
404
        if o.cancelFunc != nil {
×
405
                o.cancelFunc()
×
406
        }
×
407
        if o.eventChan != nil {
×
408
                close(o.eventChan)
×
409
        }
×
410
}
411

412
// Subscribe returns the event channel
413
func (o *Orchestrator) Subscribe() <-chan *Event {
×
414
        return o.eventChan
×
415
}
×
416

417
// Helper functions
418

419
// convertToProviderMessages converts agent messages to provider messages
420
func convertToProviderMessages(messages []AgentMessage) []providers.Message {
×
421
        result := make([]providers.Message, 0, len(messages))
×
422

×
423
        for _, msg := range messages {
×
424
                // Skip system messages
×
425
                if msg.Role == RoleSystem {
×
426
                        continue
×
427
                }
428

429
                providerMsg := providers.Message{
×
430
                        Role: string(msg.Role),
×
431
                }
×
432

×
433
                // Extract content
×
434
                for _, block := range msg.Content {
×
435
                        switch b := block.(type) {
×
436
                        case TextContent:
×
437
                                if providerMsg.Content != "" {
×
438
                                        providerMsg.Content += "\n" + b.Text
×
439
                                } else {
×
440
                                        providerMsg.Content = b.Text
×
441
                                }
×
442
                        case ImageContent:
×
443
                                if b.Data != "" {
×
444
                                        providerMsg.Images = append(providerMsg.Images, b.Data)
×
445
                                } else if b.URL != "" {
×
446
                                        providerMsg.Images = append(providerMsg.Images, b.URL)
×
447
                                }
×
448
                        }
449
                }
450

451
                // Handle tool calls for assistant messages
452
                if msg.Role == RoleAssistant {
×
453
                        var toolCalls []providers.ToolCall
×
454
                        for _, block := range msg.Content {
×
455
                                if tc, ok := block.(ToolCallContent); ok {
×
456
                                        toolCalls = append(toolCalls, providers.ToolCall{
×
457
                                                ID:     tc.ID,
×
458
                                                Name:   tc.Name,
×
459
                                                Params: convertMapAnyToInterface(tc.Arguments),
×
460
                                        })
×
461
                                }
×
462
                        }
463
                        providerMsg.ToolCalls = toolCalls
×
464
                }
465

466
                // Handle tool_call_id and tool_name for tool result messages
467
                if msg.Role == RoleToolResult {
×
468
                        if toolCallID, ok := msg.Metadata["tool_call_id"].(string); ok {
×
469
                                providerMsg.ToolCallID = toolCallID
×
470
                        }
×
471
                        if toolName, ok := msg.Metadata["tool_name"].(string); ok {
×
472
                                providerMsg.ToolName = toolName
×
473
                        }
×
474
                }
475

476
                result = append(result, providerMsg)
×
477
        }
478

479
        return result
×
480
}
481

482
// convertFromProviderResponse converts provider response to agent message
483
func convertFromProviderResponse(response *providers.Response) AgentMessage {
×
484
        content := []ContentBlock{TextContent{Text: response.Content}}
×
485

×
486
        // Handle tool calls
×
487
        for _, tc := range response.ToolCalls {
×
488
                content = append(content, ToolCallContent{
×
489
                        ID:        tc.ID,
×
490
                        Name:      tc.Name,
×
491
                        Arguments: convertInterfaceToAny(tc.Params),
×
492
                })
×
493
        }
×
494

495
        return AgentMessage{
×
496
                Role:      RoleAssistant,
×
497
                Content:   content,
×
498
                Timestamp: time.Now().UnixMilli(),
×
499
                Metadata:  map[string]any{"stop_reason": response.FinishReason},
×
500
        }
×
501
}
502

503
// convertToToolDefinitions converts agent tools to provider tool definitions
504
func convertToToolDefinitions(tools []Tool) []providers.ToolDefinition {
×
505
        result := make([]providers.ToolDefinition, 0, len(tools))
×
506

×
507
        for _, tool := range tools {
×
508
                result = append(result, providers.ToolDefinition{
×
509
                        Name:        tool.Name(),
×
510
                        Description: tool.Description(),
×
511
                        Parameters:  convertMapAnyToInterface(tool.Parameters()),
×
512
                })
×
513
        }
×
514

515
        return result
×
516
}
517

518
// extractToolCalls extracts tool calls from a message
519
func extractToolCalls(msg AgentMessage) []ToolCallContent {
×
520
        var toolCalls []ToolCallContent
×
521

×
522
        for _, block := range msg.Content {
×
523
                if tc, ok := block.(ToolCallContent); ok {
×
524
                        toolCalls = append(toolCalls, tc)
×
525
                }
×
526
        }
527

528
        return toolCalls
×
529
}
530

531
// convertInterfaceToAny converts map[string]interface{} to map[string]any by
// making a shallow copy. The result is always a non-nil map, even for a nil
// input, and is independent of m at the top level (nested values are shared).
func convertInterfaceToAny(m map[string]interface{}) map[string]any {
	// Pre-size to the known length so the copy never has to grow the map.
	result := make(map[string]any, len(m))
	for k, v := range m {
		result[k] = v
	}
	return result
}
539

540
// extractToolResultContent extracts text content from tool result
541
func extractToolResultContent(content []ContentBlock) string {
×
542
        var result strings.Builder
×
543
        for _, block := range content {
×
544
                if text, ok := block.(TextContent); ok {
×
545
                        if result.Len() > 0 {
×
546
                                result.WriteString("\n")
×
547
                        }
×
548
                        result.WriteString(text.Text)
×
549
                }
550
        }
551
        return result.String()
×
552
}
553

554
// truncateString truncates a string to at most maxLen bytes.
//
// Strings that already fit are returned unchanged. When maxLen is greater
// than 3, the result ends in "..." and the ellipsis counts toward maxLen;
// for smaller limits the string is simply cut. The cut never splits a
// multi-byte UTF-8 sequence: it is moved back to the start of the rune it
// would otherwise land in, so the result may be a few bytes shorter than
// maxLen. A zero or negative maxLen yields the empty string.
func truncateString(s string, maxLen int) string {
	if len(s) <= maxLen {
		return s
	}
	if maxLen <= 0 {
		return ""
	}

	cut, suffix := maxLen, ""
	if maxLen > 3 {
		cut, suffix = maxLen-3, "..."
	}

	// cut < len(s) here, so s[cut] is in range. Bytes of the form 10xxxxxx
	// are UTF-8 continuation bytes; step back until the cut sits on a rune
	// boundary.
	for cut > 0 && s[cut]&0xC0 == 0x80 {
		cut--
	}
	return s[:cut] + suffix
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc