• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

smallnest / goclaw / 21978860214

13 Feb 2026 07:45AM UTC coverage: 5.772% (+0.008%) from 5.764%
21978860214

push

github

chaoyuepan
improve web fetch

4 of 24 new or added lines in 10 files covered. (16.67%)

221 existing lines in 4 files now uncovered.

1517 of 26284 relevant lines covered (5.77%)

0.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/agent/subagent_registry.go
1
package agent
2

3
import (
4
        "encoding/json"
5
        "fmt"
6
        "os"
7
        "path/filepath"
8
        "strings"
9
        "sync"
10
        "time"
11

12
        "github.com/google/uuid"
13
        "github.com/smallnest/goclaw/internal/logger"
14
        "go.uber.org/zap"
15
)
16

17
// SubagentRunOutcome 分身运行结果
18
type SubagentRunOutcome struct {
19
        Status string `json:"status"` // ok, error, timeout, unknown
20
        Error  string `json:"error,omitempty"`
21
}
22

23
// DeliveryContext 传递上下文
24
type DeliveryContext struct {
25
        Channel   string `json:"channel,omitempty"`
26
        AccountID string `json:"account_id,omitempty"`
27
        To        string `json:"to,omitempty"`
28
        ThreadID  string `json:"thread_id,omitempty"`
29
}
30

31
// SubagentRunRecord 分身运行记录
32
type SubagentRunRecord struct {
33
        RunID               string              `json:"run_id"`
34
        ChildSessionKey     string              `json:"child_session_key"`
35
        RequesterSessionKey string              `json:"requester_session_key"`
36
        RequesterOrigin     *DeliveryContext    `json:"requester_origin,omitempty"`
37
        RequesterDisplayKey string              `json:"requester_display_key"`
38
        Task                string              `json:"task"`
39
        Cleanup             string              `json:"cleanup"` // delete, keep
40
        Label               string              `json:"label,omitempty"`
41
        CreatedAt           int64               `json:"created_at"`
42
        StartedAt           *int64              `json:"started_at,omitempty"`
43
        EndedAt             *int64              `json:"ended_at,omitempty"`
44
        Outcome             *SubagentRunOutcome `json:"outcome,omitempty"`
45
        ArchiveAtMs         *int64              `json:"archive_at_ms,omitempty"`
46
        CleanupCompletedAt  *int64              `json:"cleanup_completed_at,omitempty"`
47
        CleanupHandled      bool                `json:"cleanup_handled"`
48
}
49

50
// SubagentRegistry 分身注册表
51
type SubagentRegistry struct {
52
        runs        map[string]*SubagentRunRecord
53
        mu          sync.RWMutex
54
        dataDir     string
55
        storeFile   string
56
        sweeperStop chan struct{}
57
        sweeperOnce sync.Once
58
        // 事件回调
59
        onRunComplete func(runID string, record *SubagentRunRecord)
60
}
61

62
// NewSubagentRegistry 创建分身注册表
63
func NewSubagentRegistry(dataDir string) *SubagentRegistry {
×
64
        storeFile := filepath.Join(dataDir, "subagent_registry.json")
×
65
        return &SubagentRegistry{
×
66
                runs:      make(map[string]*SubagentRunRecord),
×
67
                dataDir:   dataDir,
×
68
                storeFile: storeFile,
×
69
        }
×
70
}
×
71

72
// RegisterRun 注册分身运行
73
func (r *SubagentRegistry) RegisterRun(params *SubagentRunParams) error {
×
74
        r.mu.Lock()
×
75
        defer r.mu.Unlock()
×
76

×
77
        now := time.Now().UnixMilli()
×
78
        archiveAfterMs := int64(params.ArchiveAfterMinutes) * 60_000
×
79
        var archiveAtMs *int64
×
80
        if archiveAfterMs > 0 {
×
81
                archiveAtMs = new(int64)
×
82
                *archiveAtMs = now + archiveAfterMs
×
83
        }
×
84

85
        record := &SubagentRunRecord{
×
86
                RunID:               params.RunID,
×
87
                ChildSessionKey:     params.ChildSessionKey,
×
88
                RequesterSessionKey: params.RequesterSessionKey,
×
89
                RequesterOrigin:     params.RequesterOrigin,
×
90
                RequesterDisplayKey: params.RequesterDisplayKey,
×
91
                Task:                params.Task,
×
92
                Cleanup:             params.Cleanup,
×
93
                Label:               params.Label,
×
94
                CreatedAt:           now,
×
95
                StartedAt:           &now,
×
96
                ArchiveAtMs:         archiveAtMs,
×
97
                CleanupHandled:      false,
×
98
        }
×
99

×
100
        r.runs[params.RunID] = record
×
101

×
102
        // 启动清理器
×
103
        if archiveAtMs != nil {
×
104
                r.startSweeper()
×
105
        }
×
106

107
        // 保存到磁盘
108
        if err := r.saveToDisk(); err != nil {
×
109
                logger.Error("Failed to save subagent registry", zap.Error(err))
×
110
        }
×
111

112
        logger.Info("Subagent run registered",
×
113
                zap.String("run_id", params.RunID),
×
114
                zap.String("child_session_key", params.ChildSessionKey),
×
115
                zap.String("task", params.Task))
×
116

×
117
        return nil
×
118
}
119

120
// SubagentRunParams 注册参数
121
type SubagentRunParams struct {
122
        RunID               string
123
        ChildSessionKey     string
124
        RequesterSessionKey string
125
        RequesterOrigin     *DeliveryContext
126
        RequesterDisplayKey string
127
        Task                string
128
        Cleanup             string
129
        Label               string
130
        ArchiveAfterMinutes int
131
}
132

133
// GetRun 获取运行记录
134
func (r *SubagentRegistry) GetRun(runID string) (*SubagentRunRecord, bool) {
×
135
        r.mu.RLock()
×
136
        defer r.mu.RUnlock()
×
137
        record, ok := r.runs[runID]
×
138
        return record, ok
×
139
}
×
140

141
// ListRunsForRequester 列出请求者的所有分身运行
142
func (r *SubagentRegistry) ListRunsForRequester(requesterSessionKey string) []*SubagentRunRecord {
×
143
        r.mu.RLock()
×
144
        defer r.mu.RUnlock()
×
145

×
146
        var result []*SubagentRunRecord
×
147
        key := requesterSessionKey
×
148
        if key == "" {
×
149
                return result
×
150
        }
×
151

152
        for _, record := range r.runs {
×
153
                if record.RequesterSessionKey == key {
×
154
                        result = append(result, record)
×
155
                }
×
156
        }
157
        return result
×
158
}
159

160
// MarkCompleted 标记分身运行完成
161
func (r *SubagentRegistry) MarkCompleted(runID string, outcome *SubagentRunOutcome, endedAt *int64) error {
×
162
        r.mu.Lock()
×
163
        defer r.mu.Unlock()
×
164

×
165
        record, ok := r.runs[runID]
×
166
        if !ok {
×
167
                return fmt.Errorf("run not found: %s", runID)
×
168
        }
×
169

170
        record.EndedAt = endedAt
×
171
        record.Outcome = outcome
×
172

×
173
        // 保存到磁盘
×
174
        if err := r.saveToDisk(); err != nil {
×
175
                logger.Error("Failed to save subagent registry", zap.Error(err))
×
176
        }
×
177

178
        // 触发回调
179
        if r.onRunComplete != nil && outcome != nil {
×
180
                go r.onRunComplete(runID, record)
×
181
        }
×
182

183
        return nil
×
184
}
185

186
// ReleaseRun 释放运行记录
187
func (r *SubagentRegistry) ReleaseRun(runID string) {
×
188
        r.mu.Lock()
×
189
        defer r.mu.Unlock()
×
190

×
191
        delete(r.runs, runID)
×
192

×
193
        // 如果没有运行记录了,停止清理器
×
194
        if len(r.runs) == 0 && r.sweeperStop != nil {
×
195
                close(r.sweeperStop)
×
196
                r.sweeperStop = nil
×
197
        }
×
198

NEW
199
        _ = r.saveToDisk()
×
200
}
201

202
// DeleteChildSession 删除子会话
203
func (r *SubagentRegistry) DeleteChildSession(sessionKey string) error {
×
204
        // 这里可以集成会话管理器的删除逻辑
×
205
        logger.Info("Deleting child session", zap.String("session_key", sessionKey))
×
206
        return nil
×
207
}
×
208

209
// SetOnRunComplete 设置运行完成回调
210
func (r *SubagentRegistry) SetOnRunComplete(fn func(runID string, record *SubagentRunRecord)) {
×
211
        r.mu.Lock()
×
212
        defer r.mu.Unlock()
×
213
        r.onRunComplete = fn
×
214
}
×
215

216
// LoadFromDisk 从磁盘加载
217
func (r *SubagentRegistry) LoadFromDisk() error {
×
218
        r.mu.Lock()
×
219
        defer r.mu.Unlock()
×
220

×
221
        data, err := os.ReadFile(r.storeFile)
×
222
        if err != nil {
×
223
                if os.IsNotExist(err) {
×
224
                        return nil
×
225
                }
×
226
                return err
×
227
        }
228

229
        var loaded map[string]*SubagentRunRecord
×
230
        if err := json.Unmarshal(data, &loaded); err != nil {
×
231
                return err
×
232
        }
×
233

234
        r.runs = loaded
×
235

×
236
        // 恢复有归档时间的运行记录的清理器
×
237
        for _, record := range r.runs {
×
238
                if record.ArchiveAtMs != nil {
×
239
                        r.startSweeper()
×
240
                        break
×
241
                }
242
        }
243

244
        logger.Info("Subagent registry loaded from disk",
×
245
                zap.Int("runs", len(r.runs)))
×
246
        return nil
×
247
}
248

249
// saveToDisk 保存到磁盘
250
func (r *SubagentRegistry) saveToDisk() error {
×
251
        data, err := json.MarshalIndent(r.runs, "", "  ")
×
252
        if err != nil {
×
253
                return err
×
254
        }
×
255

256
        // 确保目录存在
257
        if err := os.MkdirAll(r.dataDir, 0755); err != nil {
×
258
                return err
×
259
        }
×
260

261
        return os.WriteFile(r.storeFile, data, 0644)
×
262
}
263

264
// startSweeper 启动清理器
265
func (r *SubagentRegistry) startSweeper() {
×
266
        r.sweeperOnce.Do(func() {
×
267
                r.sweeperStop = make(chan struct{})
×
268
                go r.runSweeper()
×
269
        })
×
270
}
271

272
// runSweeper 运行清理器
273
func (r *SubagentRegistry) runSweeper() {
×
274
        ticker := time.NewTicker(60 * time.Second)
×
275
        defer ticker.Stop()
×
276

×
277
        for {
×
278
                select {
×
279
                case <-ticker.C:
×
280
                        r.sweep()
×
281
                case <-r.sweeperStop:
×
282
                        logger.Info("Subagent registry sweeper stopped")
×
283
                        return
×
284
                }
285
        }
286
}
287

288
// sweep 清理过期的运行记录
289
func (r *SubagentRegistry) sweep() {
×
290
        r.mu.Lock()
×
291
        defer r.mu.Unlock()
×
292

×
293
        now := time.Now().UnixMilli()
×
294
        var toDelete []string
×
295

×
296
        for runID, record := range r.runs {
×
297
                if record.ArchiveAtMs != nil && *record.ArchiveAtMs <= now {
×
298
                        toDelete = append(toDelete, runID)
×
299
                }
×
300
        }
301

302
        if len(toDelete) == 0 {
×
303
                return
×
304
        }
×
305

306
        for _, runID := range toDelete {
×
307
                record := r.runs[runID]
×
308
                // 删除子会话
×
309
                if err := r.DeleteChildSession(record.ChildSessionKey); err != nil {
×
310
                        logger.Error("Failed to delete child session",
×
311
                                zap.String("run_id", runID),
×
312
                                zap.Error(err))
×
313
                }
×
314
                delete(r.runs, runID)
×
315
                logger.Info("Subagent run archived and deleted",
×
316
                        zap.String("run_id", runID))
×
317
        }
318

NEW
319
        _ = r.saveToDisk()
×
320

×
321
        // 如果没有运行记录了,停止清理器
×
322
        if len(r.runs) == 0 && r.sweeperStop != nil {
×
323
                close(r.sweeperStop)
×
324
                r.sweeperStop = nil
×
325
        }
×
326
}
327

328
// Cleanup 标记清理已完成
329
func (r *SubagentRegistry) Cleanup(runID string, cleanup string, didAnnounce bool) {
×
330
        r.mu.Lock()
×
331
        defer r.mu.Unlock()
×
332

×
333
        record, ok := r.runs[runID]
×
334
        if !ok {
×
335
                return
×
336
        }
×
337

338
        if !didAnnounce {
×
339
                // 允许重试
×
340
                record.CleanupHandled = false
×
NEW
341
                _ = r.saveToDisk()
×
342
                return
×
343
        }
×
344

345
        if cleanup == "delete" {
×
346
                delete(r.runs, runID)
×
347
                if len(r.runs) == 0 && r.sweeperStop != nil {
×
348
                        close(r.sweeperStop)
×
349
                        r.sweeperStop = nil
×
350
                }
×
351
        } else {
×
352
                now := time.Now().UnixMilli()
×
353
                record.CleanupCompletedAt = &now
×
354
        }
×
355

NEW
356
        _ = r.saveToDisk()
×
357
}
358

359
// BeginCleanup 开始清理流程
360
func (r *SubagentRegistry) BeginCleanup(runID string) bool {
×
361
        r.mu.Lock()
×
362
        defer r.mu.Unlock()
×
363

×
364
        record, ok := r.runs[runID]
×
365
        if !ok {
×
366
                return false
×
367
        }
×
368

369
        if record.CleanupCompletedAt != nil {
×
370
                return false
×
371
        }
×
372

373
        if record.CleanupHandled {
×
374
                return false
×
375
        }
×
376

377
        record.CleanupHandled = true
×
NEW
378
        _ = r.saveToDisk()
×
379
        return true
×
380
}
381

382
// Count 获取运行数量
383
func (r *SubagentRegistry) Count() int {
×
384
        r.mu.RLock()
×
385
        defer r.mu.RUnlock()
×
386
        return len(r.runs)
×
387
}
×
388

389
// GenerateRunID 生成运行ID
390
func GenerateRunID() string {
×
391
        return uuid.New().String()
×
392
}
×
393

394
// IsSubagentSessionKey 判断是否为分身会话密钥
395
func IsSubagentSessionKey(sessionKey string) bool {
×
396
        // 分身会话格式: agent:<agentId>:subagent:<uuid>
×
397
        // 或: subagent:<uuid>
×
398
        if sessionKey == "" {
×
399
                return false
×
400
        }
×
401
        return containsSubagentMarker(sessionKey)
×
402
}
403

404
// containsSubagentMarker 检查是否包含分身标记
405
func containsSubagentMarker(s string) bool {
×
406
        marker := ":subagent:"
×
407
        for i := 0; i <= len(s)-len(marker); i++ {
×
408
                if s[i:i+len(marker)] == marker {
×
409
                        return true
×
410
                }
×
411
        }
412
        return false
×
413
}
414

415
// GenerateChildSessionKey 生成子会话密钥
416
func GenerateChildSessionKey(agentID string) string {
×
417
        u := uuid.New()
×
418
        return fmt.Sprintf("agent:%s:subagent:%s", agentID, u.String())
×
419
}
×
420

421
// ParseAgentSessionKey 解析 Agent 会话密钥
422
func ParseAgentSessionKey(sessionKey string) (agentID string, subagentID string, isSubagent bool) {
×
423
        if sessionKey == "" {
×
424
                return "", "", false
×
425
        }
×
426

427
        // 检查是否为分身会话
428
        if idx := findSubagentMarkerIndex(sessionKey); idx >= 0 {
×
429
                // 格式: agent:<agentId>:subagent:<uuid>
×
430
                parts := splitSessionKey(sessionKey)
×
431
                if len(parts) >= 4 && parts[0] == "agent" && parts[2] == "subagent" {
×
432
                        return parts[1], parts[3], true
×
433
                }
×
434
                // 格式: subagent:<uuid>
435
                if len(parts) >= 2 && parts[0] == "subagent" {
×
436
                        return "", parts[1], true
×
437
                }
×
438
                return "", "", true
×
439
        }
440

441
        // 格式: agent:<agentId>:<sessionKey>
442
        parts := splitSessionKey(sessionKey)
×
443
        if len(parts) >= 2 && parts[0] == "agent" {
×
444
                return parts[1], "", false
×
445
        }
×
446

447
        return "", "", false
×
448
}
449

450
// findSubagentMarkerIndex 查找分身标记位置
451
func findSubagentMarkerIndex(s string) int {
×
452
        marker := ":subagent:"
×
453
        for i := 0; i <= len(s)-len(marker); i++ {
×
454
                if s[i:i+len(marker)] == marker {
×
455
                        return i
×
456
                }
×
457
        }
458
        return -1
×
459
}
460

461
// splitSessionKey 分割会话密钥
462
func splitSessionKey(s string) []string {
×
463
        var parts []string
×
464
        var current strings.Builder
×
465

×
466
        for i := 0; i < len(s); i++ {
×
467
                if s[i] == ':' {
×
468
                        if current.Len() > 0 {
×
469
                                parts = append(parts, current.String())
×
470
                                current.Reset()
×
471
                        }
×
472
                } else {
×
473
                        current.WriteByte(s[i])
×
474
                }
×
475
        }
476
        if current.Len() > 0 {
×
477
                parts = append(parts, current.String())
×
478
        }
×
479
        return parts
×
480
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc