• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

smallnest / goclaw / 21978860214

13 Feb 2026 07:45AM UTC coverage: 5.772% (+0.008%) from 5.764%
21978860214

push

github

chaoyuepan
improve web fetch

4 of 24 new or added lines in 10 files covered. (16.67%)

221 existing lines in 4 files now uncovered.

1517 of 26284 relevant lines covered (5.77%)

0.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/agent/manager.go
1
package agent
2

3
import (
4
        "context"
5
        "fmt"
6
        "strings"
7
        "sync"
8
        "time"
9

10
        "github.com/smallnest/goclaw/agent/tools"
11
        "github.com/smallnest/goclaw/bus"
12
        "github.com/smallnest/goclaw/config"
13
        "github.com/smallnest/goclaw/internal/logger"
14
        "github.com/smallnest/goclaw/providers"
15
        "github.com/smallnest/goclaw/session"
16
        "go.uber.org/zap"
17
)
18

19
// AgentManager 管理多个 Agent 实例
20
type AgentManager struct {
21
        agents         map[string]*Agent        // agentID -> Agent
22
        bindings       map[string]*BindingEntry // channel:accountID -> BindingEntry
23
        defaultAgent   *Agent                   // 默认 Agent
24
        bus            *bus.MessageBus
25
        sessionMgr     *session.Manager
26
        provider       providers.Provider
27
        tools          *ToolRegistry
28
        mu             sync.RWMutex
29
        cfg            *config.Config
30
        contextBuilder *ContextBuilder
31
        skillsLoader   *SkillsLoader
32
        // 分身支持
33
        subagentRegistry  *SubagentRegistry
34
        subagentAnnouncer *SubagentAnnouncer
35
        dataDir           string
36
}
37

38
// BindingEntry Agent 绑定条目
39
type BindingEntry struct {
40
        AgentID   string
41
        Channel   string
42
        AccountID string
43
        Agent     *Agent
44
}
45

46
// NewAgentManagerConfig AgentManager 配置
47
type NewAgentManagerConfig struct {
48
        Bus            *bus.MessageBus
49
        Provider       providers.Provider
50
        SessionMgr     *session.Manager
51
        Tools          *ToolRegistry
52
        DataDir        string          // 数据目录,用于存储分身注册表
53
        ContextBuilder *ContextBuilder // 上下文构建器
54
        SkillsLoader   *SkillsLoader   // 技能加载器
55
}
56

57
// NewAgentManager 创建 Agent 管理器
58
func NewAgentManager(cfg *NewAgentManagerConfig) *AgentManager {
×
59
        // 创建分身注册表
×
60
        subagentRegistry := NewSubagentRegistry(cfg.DataDir)
×
61

×
62
        // 创建分身宣告器
×
63
        subagentAnnouncer := NewSubagentAnnouncer(nil) // 回调在 Start 中设置
×
64

×
65
        return &AgentManager{
×
66
                agents:            make(map[string]*Agent),
×
67
                bindings:          make(map[string]*BindingEntry),
×
68
                bus:               cfg.Bus,
×
69
                sessionMgr:        cfg.SessionMgr,
×
70
                provider:          cfg.Provider,
×
71
                tools:             cfg.Tools,
×
72
                subagentRegistry:  subagentRegistry,
×
73
                subagentAnnouncer: subagentAnnouncer,
×
74
                dataDir:           cfg.DataDir,
×
75
                contextBuilder:    cfg.ContextBuilder,
×
76
                skillsLoader:      cfg.SkillsLoader,
×
77
        }
×
78
}
×
79

80
// handleSubagentCompletion 处理分身完成事件
81
func (m *AgentManager) handleSubagentCompletion(runID string, record *SubagentRunRecord) {
×
82
        logger.Info("Subagent completed",
×
83
                zap.String("run_id", runID),
×
84
                zap.String("task", record.Task))
×
85

×
86
        // 启动宣告流程
×
87
        if record.Outcome != nil {
×
88
                announceParams := &SubagentAnnounceParams{
×
89
                        ChildSessionKey:     record.ChildSessionKey,
×
90
                        ChildRunID:          record.RunID,
×
91
                        RequesterSessionKey: record.RequesterSessionKey,
×
92
                        RequesterOrigin:     record.RequesterOrigin,
×
93
                        RequesterDisplayKey: record.RequesterDisplayKey,
×
94
                        Task:                record.Task,
×
95
                        Label:               record.Label,
×
96
                        StartedAt:           record.StartedAt,
×
97
                        EndedAt:             record.EndedAt,
×
98
                        Outcome:             record.Outcome,
×
99
                        Cleanup:             record.Cleanup,
×
100
                        AnnounceType:        SubagentAnnounceTypeTask,
×
101
                }
×
102

×
103
                if err := m.subagentAnnouncer.RunAnnounceFlow(announceParams); err != nil {
×
104
                        logger.Error("Failed to announce subagent result",
×
105
                                zap.String("run_id", runID),
×
106
                                zap.Error(err))
×
107
                }
×
108

109
                // 标记清理完成
110
                m.subagentRegistry.Cleanup(runID, record.Cleanup, true)
×
111
        }
112
}
113

114
// SetupFromConfig 从配置设置 Agent 和绑定
115
func (m *AgentManager) SetupFromConfig(cfg *config.Config, contextBuilder *ContextBuilder) error {
×
116
        m.mu.Lock()
×
117
        defer m.mu.Unlock()
×
118

×
119
        m.cfg = cfg
×
120
        m.contextBuilder = contextBuilder
×
121

×
122
        logger.Info("Setting up agents from config")
×
123

×
124
        // 1. 创建 Agent 实例
×
125
        for _, agentCfg := range cfg.Agents.List {
×
126
                if err := m.createAgent(agentCfg, contextBuilder, cfg); err != nil {
×
127
                        logger.Error("Failed to create agent",
×
128
                                zap.String("agent_id", agentCfg.ID),
×
129
                                zap.Error(err))
×
130
                        continue
×
131
                }
132
        }
133

134
        // 2. 如果没有配置 Agent,创建默认 Agent
135
        if len(m.agents) == 0 {
×
136
                logger.Info("No agents configured, creating default agent")
×
137
                defaultAgentCfg := config.AgentConfig{
×
138
                        ID:        "default",
×
139
                        Name:      "Default Agent",
×
140
                        Default:   true,
×
141
                        Model:     cfg.Agents.Defaults.Model,
×
142
                        Workspace: cfg.Workspace.Path,
×
143
                }
×
144
                if err := m.createAgent(defaultAgentCfg, contextBuilder, cfg); err != nil {
×
145
                        return fmt.Errorf("failed to create default agent: %w", err)
×
146
                }
×
147
        }
148

149
        // 3. 设置绑定
150
        for _, binding := range cfg.Bindings {
×
151
                if err := m.setupBinding(binding); err != nil {
×
152
                        logger.Error("Failed to setup binding",
×
153
                                zap.String("agent_id", binding.AgentID),
×
154
                                zap.String("channel", binding.Match.Channel),
×
155
                                zap.String("account_id", binding.Match.AccountID),
×
156
                                zap.Error(err))
×
157
                }
×
158
        }
159

160
        // 4. 设置分身支持
161
        m.setupSubagentSupport(cfg, contextBuilder)
×
162

×
163
        logger.Info("Agent manager setup complete",
×
164
                zap.Int("agents", len(m.agents)),
×
165
                zap.Int("bindings", len(m.bindings)))
×
166

×
167
        return nil
×
168
}
169

170
// setupSubagentSupport 设置分身支持
171
func (m *AgentManager) setupSubagentSupport(cfg *config.Config, contextBuilder *ContextBuilder) {
×
172
        // 加载分身注册表
×
173
        if err := m.subagentRegistry.LoadFromDisk(); err != nil {
×
174
                logger.Warn("Failed to load subagent registry", zap.Error(err))
×
175
        }
×
176

177
        // 设置分身运行完成回调
178
        m.subagentRegistry.SetOnRunComplete(func(runID string, record *SubagentRunRecord) {
×
179
                m.handleSubagentCompletion(runID, record)
×
180
        })
×
181

182
        // 更新宣告器回调
183
        m.subagentAnnouncer = NewSubagentAnnouncer(func(sessionKey, message string) error {
×
184
                // 发送宣告消息到指定会话
×
185
                return m.sendToSession(sessionKey, message)
×
186
        })
×
187

188
        // 创建分身注册表适配器
189
        registryAdapter := &subagentRegistryAdapter{registry: m.subagentRegistry}
×
190

×
191
        // 注册 sessions_spawn 工具
×
192
        spawnTool := tools.NewSubagentSpawnTool(registryAdapter)
×
193
        spawnTool.SetAgentConfigGetter(func(agentID string) *config.AgentConfig {
×
194
                for _, agentCfg := range cfg.Agents.List {
×
195
                        if agentCfg.ID == agentID {
×
196
                                return &agentCfg
×
197
                        }
×
198
                }
199
                return nil
×
200
        })
201
        spawnTool.SetDefaultConfigGetter(func() *config.AgentDefaults {
×
202
                return &cfg.Agents.Defaults
×
203
        })
×
204
        spawnTool.SetAgentIDGetter(func(sessionKey string) string {
×
205
                // 从会话密钥中解析 agent ID
×
206
                agentID, _, _ := ParseAgentSessionKey(sessionKey)
×
207
                if agentID == "" {
×
208
                        // 尝试从绑定中查找
×
209
                        for _, entry := range m.bindings {
×
210
                                if entry.Agent != nil {
×
211
                                        return entry.AgentID
×
212
                                }
×
213
                        }
214
                }
215
                return agentID
×
216
        })
217
        spawnTool.SetOnSpawn(func(result *tools.SubagentSpawnResult) error {
×
218
                return m.handleSubagentSpawn(result)
×
219
        })
×
220

221
        // 注册工具
222
        if err := m.tools.RegisterExisting(spawnTool); err != nil {
×
223
                logger.Error("Failed to register sessions_spawn tool", zap.Error(err))
×
224
        }
×
225

226
        logger.Info("Subagent support configured")
×
227
}
228

229
// subagentRegistryAdapter 分身注册表适配器
230
type subagentRegistryAdapter struct {
231
        registry *SubagentRegistry
232
}
233

234
// RegisterRun 注册分身运行
235
func (a *subagentRegistryAdapter) RegisterRun(params *tools.SubagentRunParams) error {
×
236
        // 转换 RequesterOrigin
×
237
        var requesterOrigin *DeliveryContext
×
238
        if params.RequesterOrigin != nil {
×
239
                requesterOrigin = &DeliveryContext{
×
240
                        Channel:   params.RequesterOrigin.Channel,
×
241
                        AccountID: params.RequesterOrigin.AccountID,
×
242
                        To:        params.RequesterOrigin.To,
×
243
                        ThreadID:  params.RequesterOrigin.ThreadID,
×
244
                }
×
245
        }
×
246

247
        return a.registry.RegisterRun(&SubagentRunParams{
×
248
                RunID:               params.RunID,
×
249
                ChildSessionKey:     params.ChildSessionKey,
×
250
                RequesterSessionKey: params.RequesterSessionKey,
×
251
                RequesterOrigin:     requesterOrigin,
×
252
                RequesterDisplayKey: params.RequesterDisplayKey,
×
253
                Task:                params.Task,
×
254
                Cleanup:             params.Cleanup,
×
255
                Label:               params.Label,
×
256
                ArchiveAfterMinutes: params.ArchiveAfterMinutes,
×
257
        })
×
258
}
259

260
// handleSubagentSpawn 处理分身生成
261
func (m *AgentManager) handleSubagentSpawn(result *tools.SubagentSpawnResult) error {
×
262
        // 解析子会话密钥
×
263
        _, subagentID, isSubagent := ParseAgentSessionKey(result.ChildSessionKey)
×
264
        if !isSubagent {
×
265
                return fmt.Errorf("invalid subagent session key: %s", result.ChildSessionKey)
×
266
        }
×
267

268
        // TODO: 启动分身运行
269
        // 这里需要创建新的 Agent 实例来运行分身任务
270
        logger.Info("Subagent spawn handled",
×
271
                zap.String("run_id", result.RunID),
×
272
                zap.String("subagent_id", subagentID),
×
273
                zap.String("child_session_key", result.ChildSessionKey))
×
274

×
275
        return nil
×
276
}
277

278
// sendToSession 发送消息到指定会话
279
func (m *AgentManager) sendToSession(sessionKey, message string) error {
×
280
        // 解析会话密钥获取 agent ID
×
281
        agentID, _, _ := ParseAgentSessionKey(sessionKey)
×
282

×
283
        // 获取对应的 Agent
×
284
        agent, ok := m.GetAgent(agentID)
×
285
        if !ok {
×
286
                // 尝试使用默认 Agent
×
287
                agent = m.defaultAgent
×
288
        }
×
289

290
        if agent == nil {
×
291
                return fmt.Errorf("no agent found for session: %s", sessionKey)
×
292
        }
×
293

294
        // TODO: 实现将消息发送到 Agent 的逻辑
295
        // 这可能需要将消息注入到 Agent 的消息队列中
296

297
        logger.Info("Message sent to session",
×
298
                zap.String("session_key", sessionKey),
×
299
                zap.Int("message_length", len(message)))
×
300

×
301
        return nil
×
302
}
303

304
// createAgent 创建 Agent 实例
305
func (m *AgentManager) createAgent(cfg config.AgentConfig, contextBuilder *ContextBuilder, globalCfg *config.Config) error {
×
306
        // 获取 workspace 路径
×
307
        workspace := cfg.Workspace
×
308
        if workspace == "" {
×
309
                workspace = globalCfg.Workspace.Path
×
310
        }
×
311

312
        // 获取模型
313
        model := cfg.Model
×
314
        if model == "" {
×
315
                model = globalCfg.Agents.Defaults.Model
×
316
        }
×
317

318
        // 获取最大迭代次数
319
        maxIterations := globalCfg.Agents.Defaults.MaxIterations
×
320
        if maxIterations == 0 {
×
321
                maxIterations = 15
×
322
        }
×
323

324
        // 创建 Agent
325
        agent, err := NewAgent(&NewAgentConfig{
×
326
                Bus:          m.bus,
×
327
                Provider:     m.provider,
×
328
                SessionMgr:   m.sessionMgr,
×
329
                Tools:        m.tools,
×
330
                Context:      contextBuilder,
×
331
                Workspace:    workspace,
×
332
                MaxIteration: maxIterations,
×
333
                SkillsLoader: m.skillsLoader,
×
334
        })
×
335
        if err != nil {
×
336
                return fmt.Errorf("failed to create agent %s: %w", cfg.ID, err)
×
337
        }
×
338

339
        // 设置系统提示词
340
        if cfg.SystemPrompt != "" {
×
341
                agent.SetSystemPrompt(cfg.SystemPrompt)
×
342
        }
×
343

344
        // 存储到管理器
345
        m.agents[cfg.ID] = agent
×
346

×
347
        // 如果是默认 Agent,设置默认
×
348
        if cfg.Default {
×
349
                m.defaultAgent = agent
×
350
        }
×
351

352
        logger.Info("Agent created",
×
353
                zap.String("agent_id", cfg.ID),
×
354
                zap.String("name", cfg.Name),
×
355
                zap.String("workspace", workspace),
×
356
                zap.String("model", model),
×
357
                zap.Bool("is_default", cfg.Default))
×
358

×
359
        return nil
×
360
}
361

362
// setupBinding 设置 Agent 绑定
363
func (m *AgentManager) setupBinding(binding config.BindingConfig) error {
×
364
        // 获取 Agent
×
365
        agent, ok := m.agents[binding.AgentID]
×
366
        if !ok {
×
367
                return fmt.Errorf("agent not found: %s", binding.AgentID)
×
368
        }
×
369

370
        // 构建绑定键
371
        bindingKey := fmt.Sprintf("%s:%s", binding.Match.Channel, binding.Match.AccountID)
×
372

×
373
        // 存储绑定
×
374
        m.bindings[bindingKey] = &BindingEntry{
×
375
                AgentID:   binding.AgentID,
×
376
                Channel:   binding.Match.Channel,
×
377
                AccountID: binding.Match.AccountID,
×
378
                Agent:     agent,
×
379
        }
×
380

×
381
        logger.Info("Binding setup",
×
382
                zap.String("binding_key", bindingKey),
×
383
                zap.String("agent_id", binding.AgentID))
×
384

×
385
        return nil
×
386
}
387

388
// RouteInbound 路由入站消息到对应的 Agent
389
func (m *AgentManager) RouteInbound(ctx context.Context, msg *bus.InboundMessage) error {
×
390
        m.mu.RLock()
×
391
        defer m.mu.RUnlock()
×
392

×
393
        // 构建绑定键
×
394
        bindingKey := fmt.Sprintf("%s:%s", msg.Channel, msg.AccountID)
×
395

×
396
        // 查找绑定的 Agent
×
397
        entry, ok := m.bindings[bindingKey]
×
398
        var agent *Agent
×
399
        if ok {
×
400
                agent = entry.Agent
×
401
                logger.Debug("Message routed by binding",
×
402
                        zap.String("binding_key", bindingKey),
×
403
                        zap.String("agent_id", entry.AgentID))
×
404
        } else if m.defaultAgent != nil {
×
405
                // 使用默认 Agent
×
406
                agent = m.defaultAgent
×
407
                logger.Debug("Message routed to default agent",
×
408
                        zap.String("channel", msg.Channel),
×
409
                        zap.String("account_id", msg.AccountID))
×
410
        } else {
×
411
                return fmt.Errorf("no agent found for message: %s", bindingKey)
×
412
        }
×
413

414
        // 处理消息
415
        return m.handleInboundMessage(ctx, msg, agent)
×
416
}
417

418
// handleInboundMessage 处理入站消息
419
func (m *AgentManager) handleInboundMessage(ctx context.Context, msg *bus.InboundMessage, agent *Agent) error {
×
420
        // 调用 Agent 处理消息(内部逻辑和 agent.go 中的 handleInboundMessage 类似)
×
421
        logger.Info("Processing inbound message",
×
422
                zap.String("channel", msg.Channel),
×
423
                zap.String("account_id", msg.AccountID),
×
424
                zap.String("chat_id", msg.ChatID))
×
425

×
426
        // 生成会话键(包含 account_id 以区分不同账号的消息)
×
427
        sessionKey := fmt.Sprintf("%s:%s:%s", msg.Channel, msg.AccountID, msg.ChatID)
×
428
        if msg.ChatID == "default" || msg.ChatID == "" {
×
429
                sessionKey = fmt.Sprintf("%s:%s:%d", msg.Channel, msg.AccountID, msg.Timestamp.Unix())
×
430
                logger.Info("Creating fresh session", zap.String("session_key", sessionKey))
×
431
        }
×
432

433
        // 获取或创建会话
434
        sess, err := m.sessionMgr.GetOrCreate(sessionKey)
×
435
        if err != nil {
×
436
                logger.Error("Failed to get session", zap.Error(err))
×
437
                return err
×
438
        }
×
439

440
        // 转换为 Agent 消息
441
        agentMsg := AgentMessage{
×
442
                Role:      RoleUser,
×
443
                Content:   []ContentBlock{TextContent{Text: msg.Content}},
×
444
                Timestamp: msg.Timestamp.UnixMilli(),
×
445
        }
×
446

×
447
        // 添加媒体内容
×
448
        for _, media := range msg.Media {
×
449
                if media.Type == "image" {
×
450
                        agentMsg.Content = append(agentMsg.Content, ImageContent{
×
451
                                URL:      media.URL,
×
452
                                Data:     media.Base64,
×
453
                                MimeType: media.MimeType,
×
454
                        })
×
455
                }
×
456
        }
457

458
        // 获取 Agent 的 orchestrator
459
        orchestrator := agent.GetOrchestrator()
×
460

×
461
        // 加载历史消息并添加当前消息
×
462
        history := sess.GetHistory(-1) // -1 表示加载所有历史消息
×
463
        historyAgentMsgs := sessionMessagesToAgentMessages(history)
×
464
        allMessages := append(historyAgentMsgs, agentMsg)
×
465

×
466
        logger.Info("About to call orchestrator.Run",
×
467
                zap.String("session_key", sessionKey),
×
468
                zap.Int("history_count", len(history)),
×
469
                zap.Int("all_messages_count", len(allMessages)))
×
470

×
471
        // 执行 Agent
×
472
        finalMessages, err := orchestrator.Run(ctx, allMessages)
×
473
        logger.Info("orchestrator.Run returned",
×
474
                zap.String("session_key", sessionKey),
×
475
                zap.Int("final_messages_count", len(finalMessages)),
×
476
                zap.Error(err))
×
477
        if err != nil {
×
478
                // Check if error is related to tool_call_id mismatch (old session format)
×
479
                errStr := err.Error()
×
480
                if strings.Contains(errStr, "tool_call_id") && strings.Contains(errStr, "mismatch") {
×
481
                        logger.Warn("Detected old session format, clearing session",
×
482
                                zap.String("session_key", sessionKey),
×
483
                                zap.Error(err))
×
484
                        // Clear old session and retry
×
485
                        if delErr := m.sessionMgr.Delete(sessionKey); delErr != nil {
×
486
                                logger.Error("Failed to clear old session", zap.Error(delErr))
×
487
                        } else {
×
488
                                logger.Info("Cleared old session, retrying with fresh session")
×
489
                                // Get fresh session
×
490
                                sess, getErr := m.sessionMgr.GetOrCreate(sessionKey)
×
491
                                if getErr != nil {
×
492
                                        logger.Error("Failed to create fresh session", zap.Error(getErr))
×
493
                                        return getErr
×
494
                                }
×
495
                                // Retry with fresh session (no history)
496
                                finalMessages, retryErr := orchestrator.Run(ctx, []AgentMessage{agentMsg})
×
497
                                if retryErr != nil {
×
498
                                        logger.Error("Agent execution failed on retry", zap.Error(retryErr))
×
499
                                        return retryErr
×
500
                                }
×
501
                                // Update session with new messages
502
                                m.updateSession(sess, finalMessages, 0)
×
503
                                // Publish response
×
504
                                if len(finalMessages) > 0 {
×
505
                                        lastMsg := finalMessages[len(finalMessages)-1]
×
506
                                        if lastMsg.Role == RoleAssistant {
×
507
                                                m.publishToBus(ctx, msg.Channel, msg.ChatID, lastMsg)
×
508
                                        }
×
509
                                }
510
                                return nil
×
511
                        }
512
                }
513
                logger.Error("Agent execution failed", zap.Error(err))
×
514
                return err
×
515
        }
516

517
        // 更新会话(只保存新产生的消息)
518
        m.updateSession(sess, finalMessages, len(history))
×
519

×
520
        // 发布响应
×
521
        if len(finalMessages) > 0 {
×
522
                lastMsg := finalMessages[len(finalMessages)-1]
×
523
                if lastMsg.Role == RoleAssistant {
×
524
                        m.publishToBus(ctx, msg.Channel, msg.ChatID, lastMsg)
×
525
                }
×
526
        }
527

528
        return nil
×
529
}
530

531
// updateSession 更新会话
532
func (m *AgentManager) updateSession(sess *session.Session, messages []AgentMessage, historyLen int) {
×
533
        // 只保存新产生的消息(不包括历史消息)
×
534
        newMessages := messages
×
535
        if historyLen >= 0 && len(messages) > historyLen {
×
536
                newMessages = messages[historyLen:]
×
537
        }
×
538

539
        for _, msg := range newMessages {
×
540
                sessMsg := session.Message{
×
541
                        Role:      string(msg.Role),
×
542
                        Content:   extractTextContent(msg),
×
543
                        Timestamp: time.Unix(msg.Timestamp/1000, 0),
×
544
                }
×
545

×
546
                if msg.Role == RoleAssistant {
×
547
                        for _, block := range msg.Content {
×
548
                                if tc, ok := block.(ToolCallContent); ok {
×
549
                                        sessMsg.ToolCalls = append(sessMsg.ToolCalls, session.ToolCall{
×
550
                                                ID:     tc.ID,
×
551
                                                Name:   tc.Name,
×
552
                                                Params: tc.Arguments,
×
UNCOV
553
                                        })
×
UNCOV
554
                                }
×
555
                        }
556
                }
557

558
                if msg.Role == RoleToolResult {
×
559
                        if id, ok := msg.Metadata["tool_call_id"].(string); ok {
×
UNCOV
560
                                sessMsg.ToolCallID = id
×
UNCOV
561
                        }
×
562
                        // Preserve tool_name in metadata for validation
UNCOV
563
                        if toolName, ok := msg.Metadata["tool_name"].(string); ok {
×
UNCOV
564
                                if sessMsg.Metadata == nil {
×
565
                                        sessMsg.Metadata = make(map[string]interface{})
×
566
                                }
×
567
                                sessMsg.Metadata["tool_name"] = toolName
×
568
                        }
569
                }
570

571
                sess.AddMessage(sessMsg)
×
572
        }
573

574
        if err := m.sessionMgr.Save(sess); err != nil {
×
575
                logger.Error("Failed to save session", zap.Error(err))
×
576
        }
×
577
}
578

579
// publishToBus 发布消息到总线
580
func (m *AgentManager) publishToBus(ctx context.Context, channel, chatID string, msg AgentMessage) {
×
581
        content := extractTextContent(msg)
×
582

×
583
        outbound := &bus.OutboundMessage{
×
UNCOV
584
                Channel:   channel,
×
UNCOV
585
                ChatID:    chatID,
×
UNCOV
586
                Content:   content,
×
587
                Timestamp: time.Unix(msg.Timestamp/1000, 0),
×
588
        }
×
589

×
590
        if err := m.bus.PublishOutbound(ctx, outbound); err != nil {
×
591
                logger.Error("Failed to publish outbound", zap.Error(err))
×
592
        }
×
593
}
594

595
// sessionMessagesToAgentMessages 将 session 消息转换为 Agent 消息
596
func sessionMessagesToAgentMessages(sessMsgs []session.Message) []AgentMessage {
×
597
        result := make([]AgentMessage, 0, len(sessMsgs))
×
598
        for _, sessMsg := range sessMsgs {
×
599
                agentMsg := AgentMessage{
×
600
                        Role:      MessageRole(sessMsg.Role),
×
601
                        Content:   []ContentBlock{TextContent{Text: sessMsg.Content}},
×
602
                        Timestamp: sessMsg.Timestamp.UnixMilli(),
×
603
                }
×
604

×
605
                // Handle tool calls in assistant messages
×
606
                if sessMsg.Role == "assistant" && len(sessMsg.ToolCalls) > 0 {
×
UNCOV
607
                        // Clear the text content if there are tool calls
×
UNCOV
608
                        agentMsg.Content = []ContentBlock{}
×
UNCOV
609
                        for _, tc := range sessMsg.ToolCalls {
×
610
                                agentMsg.Content = append(agentMsg.Content, ToolCallContent{
×
611
                                        ID:        tc.ID,
×
612
                                        Name:      tc.Name,
×
613
                                        Arguments: tc.Params,
×
614
                                })
×
615
                        }
×
616
                }
617

618
                // Handle tool result messages
UNCOV
619
                if sessMsg.Role == "tool" {
×
UNCOV
620
                        agentMsg.Role = RoleToolResult
×
621
                        // Set tool_call_id in metadata
×
UNCOV
622
                        if sessMsg.ToolCallID != "" {
×
623
                                if agentMsg.Metadata == nil {
×
UNCOV
624
                                        agentMsg.Metadata = make(map[string]any)
×
UNCOV
625
                                }
×
UNCOV
626
                                agentMsg.Metadata["tool_call_id"] = sessMsg.ToolCallID
×
627
                        }
628
                }
629

630
                result = append(result, agentMsg)
×
631
        }
632
        return result
×
633
}
634

635
// GetAgent 获取 Agent
636
func (m *AgentManager) GetAgent(agentID string) (*Agent, bool) {
×
637
        m.mu.RLock()
×
638
        defer m.mu.RUnlock()
×
639

×
640
        agent, ok := m.agents[agentID]
×
641
        return agent, ok
×
642
}
×
643

644
// ListAgents 列出所有 Agent ID
UNCOV
645
func (m *AgentManager) ListAgents() []string {
×
UNCOV
646
        m.mu.RLock()
×
UNCOV
647
        defer m.mu.RUnlock()
×
648

×
649
        ids := make([]string, 0, len(m.agents))
×
650
        for id := range m.agents {
×
651
                ids = append(ids, id)
×
652
        }
×
653
        return ids
×
654
}
655

656
// Start 启动所有 Agent
657
func (m *AgentManager) Start(ctx context.Context) error {
×
658
        m.mu.RLock()
×
659
        defer m.mu.RUnlock()
×
UNCOV
660

×
UNCOV
661
        for id, agent := range m.agents {
×
UNCOV
662
                if err := agent.Start(ctx); err != nil {
×
663
                        logger.Error("Failed to start agent",
×
664
                                zap.String("agent_id", id),
×
665
                                zap.Error(err))
×
UNCOV
666
                } else {
×
UNCOV
667
                        logger.Info("Agent started", zap.String("agent_id", id))
×
UNCOV
668
                }
×
669
        }
670

671
        // 启动消息处理器
672
        go m.processMessages(ctx)
×
673

×
674
        return nil
×
675
}
676

677
// Stop 停止所有 Agent
678
func (m *AgentManager) Stop() error {
×
UNCOV
679
        m.mu.RLock()
×
UNCOV
680
        defer m.mu.RUnlock()
×
681

×
UNCOV
682
        for id, agent := range m.agents {
×
UNCOV
683
                if err := agent.Stop(); err != nil {
×
UNCOV
684
                        logger.Error("Failed to stop agent",
×
685
                                zap.String("agent_id", id),
×
686
                                zap.Error(err))
×
687
                }
×
688
        }
689

690
        return nil
×
691
}
692

693
// processMessages 处理入站消息
694
func (m *AgentManager) processMessages(ctx context.Context) {
×
695
        for {
×
UNCOV
696
                select {
×
697
                case <-ctx.Done():
×
698
                        logger.Info("Agent manager message processor stopped")
×
UNCOV
699
                        return
×
UNCOV
700
                default:
×
701
                        msg, err := m.bus.ConsumeInbound(ctx)
×
702
                        if err != nil {
×
703
                                if err == context.DeadlineExceeded || err == context.Canceled {
×
704
                                        continue
×
705
                                }
706
                                logger.Error("Failed to consume inbound", zap.Error(err))
×
UNCOV
707
                                continue
×
708
                        }
709

UNCOV
710
                        if err := m.RouteInbound(ctx, msg); err != nil {
×
UNCOV
711
                                logger.Error("Failed to route message",
×
712
                                        zap.String("channel", msg.Channel),
×
713
                                        zap.String("account_id", msg.AccountID),
×
714
                                        zap.Error(err))
×
715
                        }
×
716
                }
717
        }
718
}
719

720
// GetDefaultAgent 获取默认 Agent
721
func (m *AgentManager) GetDefaultAgent() *Agent {
×
722
        m.mu.RLock()
×
723
        defer m.mu.RUnlock()
×
724
        return m.defaultAgent
×
725
}
×
726

727
// GetToolsInfo 获取工具信息
728
func (m *AgentManager) GetToolsInfo() (map[string]interface{}, error) {
×
729
        m.mu.RLock()
×
730
        defer m.mu.RUnlock()
×
731

×
732
        // 从 tool registry 获取工具列表
×
733
        existingTools := m.tools.ListExisting()
×
UNCOV
734
        result := make(map[string]interface{})
×
735

×
UNCOV
736
        for _, tool := range existingTools {
×
UNCOV
737
                result[tool.Name()] = map[string]interface{}{
×
UNCOV
738
                        "name":        tool.Name(),
×
UNCOV
739
                        "description": tool.Description(),
×
UNCOV
740
                        "parameters":  tool.Parameters(),
×
UNCOV
741
                }
×
UNCOV
742
        }
×
743

UNCOV
744
        return result, nil
×
745
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc