• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

smallnest / goclaw / 21978860214

13 Feb 2026 07:45AM UTC coverage: 5.772% (+0.008%) from 5.764%
21978860214

push

github

chaoyuepan
improve web fetch

4 of 24 new or added lines in 10 files covered. (16.67%)

221 existing lines in 4 files now uncovered.

1517 of 26284 relevant lines covered (5.77%)

0.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/agent/agent.go
1
package agent
2

3
import (
4
        "context"
5
        "fmt"
6
        "sync"
7
        "time"
8

9
        "github.com/smallnest/goclaw/bus"
10
        "github.com/smallnest/goclaw/internal/logger"
11
        "github.com/smallnest/goclaw/providers"
12
        "github.com/smallnest/goclaw/session"
13
        "go.uber.org/zap"
14
)
15

16
// Agent represents the main AI agent
17
// New implementation inspired by pi-mono architecture
18
type Agent struct {
19
        orchestrator *Orchestrator
20
        bus          *bus.MessageBus
21
        provider     providers.Provider
22
        sessionMgr   *session.Manager
23
        tools        *ToolRegistry
24
        context      *ContextBuilder
25
        workspace    string
26
        skillsLoader *SkillsLoader
27

28
        mu        sync.RWMutex
29
        state     *AgentState
30
        eventSubs []chan *Event
31
        running   bool
32
}
33

34
// NewAgentConfig configures the agent
35
type NewAgentConfig struct {
36
        Bus          *bus.MessageBus
37
        Provider     providers.Provider
38
        SessionMgr   *session.Manager
39
        Tools        *ToolRegistry
40
        Context      *ContextBuilder
41
        Workspace    string
42
        MaxIteration int
43
        SkillsLoader *SkillsLoader
44
}
45

46
// NewAgent creates a new agent
47
func NewAgent(cfg *NewAgentConfig) (*Agent, error) {
×
48
        if cfg == nil {
×
49
                return nil, fmt.Errorf("config cannot be nil")
×
50
        }
×
51

52
        if cfg.MaxIteration <= 0 {
×
53
                cfg.MaxIteration = 15
×
54
        }
×
55

56
        state := NewAgentState()
×
57
        state.SystemPrompt = cfg.Context.BuildSystemPrompt(nil)
×
58
        state.Model = getModelName(cfg.Provider)
×
59
        state.Provider = "provider"
×
60
        state.SessionKey = "main"
×
61
        state.Tools = ToAgentTools(cfg.Tools.ListExisting())
×
62
        state.LoadedSkills = []string{} // Initialize with empty loaded skills
×
63

×
64
        // Load skills list
×
65
        var skills []*Skill
×
66
        if cfg.SkillsLoader != nil {
×
67
                if err := cfg.SkillsLoader.Discover(); err == nil {
×
68
                        skills = cfg.SkillsLoader.List()
×
69
                        logger.Info("Skills discovered for agent",
×
70
                                zap.Int("count", len(skills)))
×
71
                } else {
×
72
                        logger.Warn("Failed to discover skills", zap.Error(err))
×
73
                }
×
74
        }
75

76
        loopConfig := &LoopConfig{
×
77
                Model:            state.Model,
×
78
                Provider:         cfg.Provider,
×
79
                SessionMgr:       cfg.SessionMgr,
×
80
                MaxIterations:    cfg.MaxIteration,
×
81
                ConvertToLLM:     defaultConvertToLLM,
×
82
                TransformContext: nil,
×
83
                Skills:           skills,
×
84
                LoadedSkills:     state.LoadedSkills,
×
85
                ContextBuilder:   cfg.Context,
×
86
                GetSteeringMessages: func() ([]AgentMessage, error) {
×
87
                        state := state // Capture state
×
88
                        return state.DequeueSteeringMessages(), nil
×
89
                },
×
90
                GetFollowUpMessages: func() ([]AgentMessage, error) {
×
91
                        state := state // Capture state
×
92
                        return state.DequeueFollowUpMessages(), nil
×
93
                },
×
94
        }
95

96
        orchestrator := NewOrchestrator(loopConfig, state)
×
97

×
98
        return &Agent{
×
99
                orchestrator: orchestrator,
×
100
                bus:          cfg.Bus,
×
101
                provider:     cfg.Provider,
×
102
                sessionMgr:   cfg.SessionMgr,
×
103
                tools:        cfg.Tools,
×
104
                context:      cfg.Context,
×
105
                workspace:    cfg.Workspace,
×
106
                skillsLoader: cfg.SkillsLoader,
×
107
                state:        state,
×
108
                eventSubs:    make([]chan *Event, 0),
×
109
                running:      false,
×
110
        }, nil
×
111
}
112

113
// Start starts the agent loop
114
func (a *Agent) Start(ctx context.Context) error {
×
115
        a.mu.Lock()
×
116
        if a.running {
×
117
                a.mu.Unlock()
×
118
                return fmt.Errorf("agent already running")
×
119
        }
×
120
        a.running = true
×
121
        a.mu.Unlock()
×
122

×
123
        logger.Info("Starting agent loop")
×
124

×
125
        // Start event dispatcher
×
126
        go a.dispatchEvents(ctx)
×
127

×
128
        // Start message processor
×
129
        go a.processMessages(ctx)
×
130

×
131
        return nil
×
132
}
133

134
// Stop stops the agent
135
func (a *Agent) Stop() error {
×
136
        a.mu.Lock()
×
137
        a.running = false
×
138
        a.mu.Unlock()
×
139

×
140
        logger.Info("Stopping agent")
×
141
        a.orchestrator.Stop()
×
142
        return nil
×
143
}
×
144

145
// Prompt sends a user message to the agent
146
func (a *Agent) Prompt(ctx context.Context, content string) error {
×
147
        a.mu.Lock()
×
148
        defer a.mu.Unlock()
×
149

×
150
        msg := AgentMessage{
×
151
                Role:      RoleUser,
×
152
                Content:   []ContentBlock{TextContent{Text: content}},
×
153
                Timestamp: time.Now().UnixMilli(),
×
154
        }
×
155

×
156
        // Run orchestrator
×
157
        finalMessages, err := a.orchestrator.Run(ctx, []AgentMessage{msg})
×
158
        if err != nil {
×
159
                logger.Error("Agent execution failed", zap.Error(err))
×
160
                return err
×
161
        }
×
162

163
        // Update state
164
        a.mu.Lock()
×
165
        a.state.Messages = finalMessages
×
166
        a.mu.Unlock()
×
167

×
168
        // Publish final response
×
169
        if len(finalMessages) > 0 {
×
170
                lastMsg := finalMessages[len(finalMessages)-1]
×
171
                if lastMsg.Role == RoleAssistant {
×
172
                        a.publishResponse(ctx, lastMsg)
×
173
                }
×
174
        }
175

176
        return nil
×
177
}
178

179
// processMessages processes inbound messages from the bus
180
func (a *Agent) processMessages(ctx context.Context) {
×
181
        for a.isRunning() {
×
182
                select {
×
183
                case <-ctx.Done():
×
184
                        logger.Info("Message processor stopped")
×
185
                        return
×
186

187
                default:
×
188
                        msg, err := a.bus.ConsumeInbound(ctx)
×
189
                        if err != nil {
×
190
                                if err == context.DeadlineExceeded || err == context.Canceled {
×
191
                                        continue
×
192
                                }
193
                                logger.Error("Failed to consume inbound", zap.Error(err))
×
194
                                continue
×
195
                        }
196

197
                        a.handleInboundMessage(ctx, msg)
×
198
                }
199
        }
200
}
201

202
// handleInboundMessage processes a single inbound message
203
func (a *Agent) handleInboundMessage(ctx context.Context, msg *bus.InboundMessage) {
×
204
        logger.Info("Processing inbound message",
×
205
                zap.String("channel", msg.Channel),
×
206
                zap.String("chat_id", msg.ChatID),
×
207
        )
×
208

×
209
        // Generate fresh session key with timestamp for new sessions
×
210
        sessionKey := msg.SessionKey()
×
211
        if msg.ChatID == "default" || msg.ChatID == "" {
×
212
                // For CLI/default chat, always create a fresh session with timestamp
×
213
                sessionKey = fmt.Sprintf("%s:%d", msg.Channel, time.Now().Unix())
×
214
                logger.Info("Creating fresh session", zap.String("session_key", sessionKey))
×
215
        }
×
216

217
        // Get or create session
218
        sess, err := a.sessionMgr.GetOrCreate(sessionKey)
×
219
        if err != nil {
×
220
                logger.Error("Failed to get session", zap.Error(err))
×
221
                return
×
222
        }
×
223

224
        // Convert to agent message
225
        agentMsg := AgentMessage{
×
226
                Role:      RoleUser,
×
227
                Content:   []ContentBlock{TextContent{Text: msg.Content}},
×
228
                Timestamp: msg.Timestamp.UnixMilli(),
×
229
        }
×
230

×
231
        // Add media as image content
×
232
        for _, m := range msg.Media {
×
233
                if m.Type == "image" {
×
234
                        imgContent := ImageContent{
×
235
                                URL:      m.URL,
×
236
                                Data:     m.Base64,
×
237
                                MimeType: m.MimeType,
×
238
                        }
×
239
                        agentMsg.Content = append(agentMsg.Content, imgContent)
×
240
                }
×
241
        }
242

243
        // Run agent
244
        finalMessages, err := a.orchestrator.Run(ctx, []AgentMessage{agentMsg})
×
245
        if err != nil {
×
246
                logger.Error("Agent execution failed", zap.Error(err))
×
247

×
248
                // Send error response
×
249
                a.publishError(ctx, msg.Channel, msg.ChatID, err)
×
250
                return
×
251
        }
×
252

253
        // Update session
254
        a.updateSession(sess, finalMessages)
×
255

×
256
        // Publish response
×
257
        if len(finalMessages) > 0 {
×
258
                lastMsg := finalMessages[len(finalMessages)-1]
×
259
                if lastMsg.Role == RoleAssistant {
×
260
                        a.publishToBus(ctx, msg.Channel, msg.ChatID, lastMsg)
×
261
                }
×
262
        }
263
}
264

265
// updateSession updates the session with new messages
266
func (a *Agent) updateSession(sess *session.Session, messages []AgentMessage) {
×
267
        for _, msg := range messages {
×
268
                sessMsg := session.Message{
×
269
                        Role:      string(msg.Role),
×
270
                        Content:   extractTextContent(msg),
×
271
                        Timestamp: time.Unix(extractTimestamp(msg)/1000, 0),
×
272
                }
×
273

×
274
                // Handle tool calls
×
275
                if msg.Role == RoleAssistant {
×
276
                        for _, block := range msg.Content {
×
277
                                if tc, ok := block.(ToolCallContent); ok {
×
278
                                        sessMsg.ToolCalls = append(sessMsg.ToolCalls, session.ToolCall{
×
279
                                                ID:     tc.ID,
×
280
                                                Name:   tc.Name,
×
281
                                                Params: tc.Arguments,
×
282
                                        })
×
283
                                }
×
284
                        }
285
                }
286

287
                // Handle tool results
UNCOV
288
                if msg.Role == RoleToolResult {
×
UNCOV
289
                        if id, ok := msg.Metadata["tool_call_id"].(string); ok {
×
290
                                sessMsg.ToolCallID = id
×
291
                        }
×
292
                        // Preserve tool_name in metadata for validation
293
                        if toolName, ok := msg.Metadata["tool_name"].(string); ok {
×
UNCOV
294
                                if sessMsg.Metadata == nil {
×
UNCOV
295
                                        sessMsg.Metadata = make(map[string]interface{})
×
296
                                }
×
UNCOV
297
                                sessMsg.Metadata["tool_name"] = toolName
×
298
                        }
299
                }
300

301
                sess.AddMessage(sessMsg)
×
302
        }
303

UNCOV
304
        if err := a.sessionMgr.Save(sess); err != nil {
×
305
                logger.Error("Failed to save session", zap.Error(err))
×
306
        }
×
307
}
308

309
// publishResponse publishes the agent response to the bus
310
func (a *Agent) publishResponse(ctx context.Context, msg AgentMessage) {
×
311
        content := extractTextContent(msg)
×
312

×
313
        outbound := &bus.OutboundMessage{
×
314
                Channel:   a.GetCurrentChannel(),
×
315
                ChatID:    a.GetCurrentChatID(),
×
316
                Content:   content,
×
317
                Timestamp: time.Now(),
×
UNCOV
318
        }
×
UNCOV
319

×
UNCOV
320
        if err := a.bus.PublishOutbound(ctx, outbound); err != nil {
×
321
                logger.Error("Failed to publish outbound", zap.Error(err))
×
322
        }
×
323
}
324

325
// publishError publishes an error message
326
func (a *Agent) publishError(ctx context.Context, channel, chatID string, err error) {
×
327
        errorMsg := fmt.Sprintf("An error occurred: %v", err)
×
328

×
329
        outbound := &bus.OutboundMessage{
×
330
                Channel:   channel,
×
331
                ChatID:    chatID,
×
332
                Content:   errorMsg,
×
UNCOV
333
                Timestamp: time.Now(),
×
UNCOV
334
        }
×
335

×
336
        _ = a.bus.PublishOutbound(ctx, outbound)
×
337
}
×
338

339
// publishToBus publishes a message to the bus
340
func (a *Agent) publishToBus(ctx context.Context, channel, chatID string, msg AgentMessage) {
×
341
        content := extractTextContent(msg)
×
342

×
343
        outbound := &bus.OutboundMessage{
×
344
                Channel:   channel,
×
345
                ChatID:    chatID,
×
346
                Content:   content,
×
347
                Timestamp: time.Now(),
×
UNCOV
348
        }
×
UNCOV
349

×
UNCOV
350
        if err := a.bus.PublishOutbound(ctx, outbound); err != nil {
×
351
                logger.Error("Failed to publish outbound", zap.Error(err))
×
352
        }
×
353
}
354

355
// Subscribe subscribes to agent events
356
func (a *Agent) Subscribe() <-chan *Event {
×
357
        ch := make(chan *Event, 10)
×
358

×
359
        a.mu.Lock()
×
UNCOV
360
        a.eventSubs = append(a.eventSubs, ch)
×
UNCOV
361
        a.mu.Unlock()
×
362

×
363
        return ch
×
364
}
×
365

366
// Unsubscribe removes an event subscription
367
func (a *Agent) Unsubscribe(ch <-chan *Event) {
×
368
        a.mu.Lock()
×
369
        defer a.mu.Unlock()
×
370

×
UNCOV
371
        for i, sub := range a.eventSubs {
×
UNCOV
372
                if sub == ch {
×
UNCOV
373
                        a.eventSubs = append(a.eventSubs[:i], a.eventSubs[i+1:]...)
×
UNCOV
374
                        // Don't close receive-only channel
×
UNCOV
375
                        break
×
376
                }
377
        }
378
}
379

380
// dispatchEvents sends events to all subscribers
381
func (a *Agent) dispatchEvents(ctx context.Context) {
×
382
        eventChan := a.orchestrator.Subscribe()
×
383

×
384
        for {
×
385
                select {
×
386
                case <-ctx.Done():
×
UNCOV
387
                        return
×
388
                case event, ok := <-eventChan:
×
389
                        if !ok {
×
390
                                return
×
391
                        }
×
392

393
                        a.mu.RLock()
×
394
                        subs := make([]chan *Event, len(a.eventSubs))
×
395
                        copy(subs, a.eventSubs)
×
396
                        a.mu.RUnlock()
×
UNCOV
397

×
UNCOV
398
                        for _, ch := range subs {
×
UNCOV
399
                                select {
×
UNCOV
400
                                case ch <- event:
×
UNCOV
401
                                default:
×
402
                                        // Channel full, skip
403
                                }
404
                        }
405
                }
406
        }
407
}
408

409
// isRunning checks if agent is running
UNCOV
410
func (a *Agent) isRunning() bool {
×
UNCOV
411
        a.mu.RLock()
×
412
        defer a.mu.RUnlock()
×
413
        return a.running
×
414
}
×
415

416
// GetState returns a copy of the current agent state
UNCOV
417
func (a *Agent) GetState() *AgentState {
×
UNCOV
418
        a.mu.RLock()
×
419
        defer a.mu.RUnlock()
×
420
        return a.state.Clone()
×
421
}
×
422

423
// SetSystemPrompt updates the system prompt
424
func (a *Agent) SetSystemPrompt(prompt string) {
×
UNCOV
425
        a.mu.Lock()
×
UNCOV
426
        defer a.mu.Unlock()
×
427

×
428
        a.state.SystemPrompt = prompt
×
429
}
×
430

431
// SetTools updates the available tools
432
func (a *Agent) SetTools(tools []Tool) {
×
UNCOV
433
        a.mu.Lock()
×
UNCOV
434
        defer a.mu.Unlock()
×
435

×
436
        a.state.Tools = tools
×
437
}
×
438

439
// GetCurrentChannel returns the current output channel
440
func (a *Agent) GetCurrentChannel() string {
×
441
        return "cli"
×
442
}
×
443

444
// GetCurrentChatID returns the current chat ID
445
func (a *Agent) GetCurrentChatID() string {
×
446
        return "main"
×
447
}
×
448

449
// GetOrchestrator 获取 orchestrator(供 AgentManager 使用)
UNCOV
450
func (a *Agent) GetOrchestrator() *Orchestrator {
×
UNCOV
451
        return a.orchestrator
×
452
}
×
453

454
// Helper functions
455

456
// getModelName extracts model name from provider
UNCOV
457
func getModelName(p providers.Provider) string {
×
458
        // This is a placeholder - actual implementation would depend on provider type
×
459
        return "default"
×
460
}
×
461

462
// defaultConvertToLLM converts agent messages to provider messages
463
func defaultConvertToLLM(messages []AgentMessage) ([]providers.Message, error) {
×
464
        result := make([]providers.Message, 0, len(messages))
×
UNCOV
465

×
UNCOV
466
        for _, msg := range messages {
×
467
                // Skip system messages
×
468
                if msg.Role == RoleSystem {
×
469
                        continue
×
470
                }
471

472
                providerMsg := providers.Message{
×
473
                        Role: string(msg.Role),
×
474
                }
×
475

×
476
                // Extract content
×
477
                for _, block := range msg.Content {
×
478
                        switch b := block.(type) {
×
479
                        case TextContent:
×
480
                                if providerMsg.Content != "" {
×
481
                                        providerMsg.Content += "\n" + b.Text
×
482
                                } else {
×
483
                                        providerMsg.Content = b.Text
×
484
                                }
×
485
                        case ImageContent:
×
UNCOV
486
                                if b.Data != "" {
×
UNCOV
487
                                        providerMsg.Images = append(providerMsg.Images, b.Data)
×
UNCOV
488
                                } else if b.URL != "" {
×
UNCOV
489
                                        providerMsg.Images = append(providerMsg.Images, b.URL)
×
490
                                }
×
491
                        }
492
                }
493

494
                // Handle tool calls
495
                if msg.Role == RoleAssistant {
×
496
                        var toolCalls []providers.ToolCall
×
497
                        for _, block := range msg.Content {
×
498
                                if tc, ok := block.(ToolCallContent); ok {
×
499
                                        toolCalls = append(toolCalls, providers.ToolCall{
×
UNCOV
500
                                                ID:     tc.ID,
×
501
                                                Name:   tc.Name,
×
UNCOV
502
                                                Params: convertMapAnyToInterface(tc.Arguments),
×
UNCOV
503
                                        })
×
UNCOV
504
                                }
×
505
                        }
506
                        providerMsg.ToolCalls = toolCalls
×
507
                }
508

509
                // Handle tool_call_id and tool_name for tool result messages
510
                if msg.Role == RoleToolResult {
×
511
                        if toolCallID, ok := msg.Metadata["tool_call_id"].(string); ok {
×
UNCOV
512
                                providerMsg.ToolCallID = toolCallID
×
UNCOV
513
                        }
×
514
                        if toolName, ok := msg.Metadata["tool_name"].(string); ok {
×
UNCOV
515
                                providerMsg.ToolName = toolName
×
UNCOV
516
                        }
×
517
                }
518

UNCOV
519
                result = append(result, providerMsg)
×
520
        }
521

522
        return result, nil
×
523
}
524

525
// convertMapAnyToInterface converts map[string]any to map[string]interface{}
526
func convertMapAnyToInterface(m map[string]any) map[string]interface{} {
×
UNCOV
527
        result := make(map[string]interface{})
×
UNCOV
528
        for k, v := range m {
×
UNCOV
529
                result[k] = v
×
530
        }
×
531
        return result
×
532
}
533

534
// extractTextContent extracts text from content blocks
UNCOV
535
func extractTextContent(msg AgentMessage) string {
×
536
        for _, block := range msg.Content {
×
UNCOV
537
                if text, ok := block.(TextContent); ok {
×
UNCOV
538
                        return text.Text
×
UNCOV
539
                }
×
540
        }
541
        return ""
×
542
}
543

544
// extractTimestamp extracts timestamp from message
UNCOV
545
func extractTimestamp(msg AgentMessage) int64 {
×
UNCOV
546
        if msg.Timestamp > 0 {
×
UNCOV
547
                return msg.Timestamp
×
UNCOV
548
        }
×
UNCOV
549
        return time.Now().UnixMilli()
×
550
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc