• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

umputun / ralphex / 21959106240

12 Feb 2026 06:24PM UTC coverage: 80.884% (-0.04%) from 80.928%
21959106240

push

github

umputun
docs: update changelog for v0.10.5

5471 of 6764 relevant lines covered (80.88%)

171.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.33
/pkg/web/session_manager.go
1
package web
2

3
import (
4
        "bufio"
5
        "fmt"
6
        "hash/fnv"
7
        "log"
8
        "os"
9
        "path/filepath"
10
        "sort"
11
        "strconv"
12
        "strings"
13
        "sync"
14
        "time"
15

16
        "github.com/umputun/ralphex/pkg/executor"
17
        "github.com/umputun/ralphex/pkg/progress"
18
        "github.com/umputun/ralphex/pkg/status"
19
)
20

21
// MaxCompletedSessions caps how many completed sessions the registry keeps.
// once the cap is exceeded, the oldest completed sessions are dropped so that
// memory use stays bounded. active sessions are never evicted.
const MaxCompletedSessions = 100
25

26
// SessionManager maintains a registry of all discovered sessions.
27
// it handles discovery of progress files, state detection via flock,
28
// and provides access to sessions by ID.
29
// completed sessions are automatically evicted when MaxCompletedSessions is exceeded.
30
type SessionManager struct {
31
        mu       sync.RWMutex
32
        sessions map[string]*Session // keyed by session ID
33
}
34

35
// NewSessionManager creates a new session manager with an empty registry.
36
func NewSessionManager() *SessionManager {
59✔
37
        return &SessionManager{
59✔
38
                sessions: make(map[string]*Session),
59✔
39
        }
59✔
40
}
59✔
41

42
// Discover scans a directory for progress files matching progress-*.txt pattern.
43
// for each file found, it creates or updates a session in the registry.
44
// returns the list of discovered session IDs.
45
func (m *SessionManager) Discover(dir string) ([]string, error) {
29✔
46
        pattern := filepath.Join(dir, "progress-*.txt")
29✔
47
        matches, err := filepath.Glob(pattern)
29✔
48
        if err != nil {
29✔
49
                return nil, fmt.Errorf("glob progress files: %w", err)
×
50
        }
×
51

52
        ids := make([]string, 0, len(matches))
29✔
53
        for _, path := range matches {
169✔
54
                id := sessionIDFromPath(path)
140✔
55
                ids = append(ids, id)
140✔
56

140✔
57
                // check if session already exists
140✔
58
                m.mu.RLock()
140✔
59
                existing := m.sessions[id]
140✔
60
                m.mu.RUnlock()
140✔
61

140✔
62
                if existing != nil {
145✔
63
                        // update existing session state
5✔
64
                        if err := m.updateSession(existing); err != nil {
5✔
65
                                log.Printf("[WARN] failed to update session %s: %v", id, err)
×
66
                                continue
×
67
                        }
68
                } else {
135✔
69
                        // create new session
135✔
70
                        session := NewSession(id, path)
135✔
71
                        if err := m.updateSession(session); err != nil {
136✔
72
                                log.Printf("[WARN] failed to create session %s: %v", id, err)
1✔
73
                                continue
1✔
74
                        }
75
                        m.mu.Lock()
134✔
76
                        m.sessions[id] = session
134✔
77
                        m.evictOldCompleted()
134✔
78
                        m.mu.Unlock()
134✔
79
                }
80
        }
81

82
        return ids, nil
29✔
83
}
84

85
// DiscoverRecursive walks a directory tree and discovers all progress files.
86
// unlike Discover, this searches subdirectories recursively.
87
// returns the list of all discovered session IDs (deduplicated).
88
func (m *SessionManager) DiscoverRecursive(root string) ([]string, error) {
18✔
89
        seenDirs := make(map[string]bool)
18✔
90
        seenIDs := make(map[string]bool)
18✔
91
        var allIDs []string
18✔
92

18✔
93
        err := filepath.WalkDir(root, func(path string, d os.DirEntry, err error) error {
46✔
94
                if err != nil {
29✔
95
                        // skip directories that can't be accessed
1✔
96
                        if d != nil && d.IsDir() {
2✔
97
                                return filepath.SkipDir
1✔
98
                        }
1✔
99
                        return nil
×
100
                }
101

102
                // skip directories that typically contain many subdirs and no progress files
103
                if d.IsDir() && skipDirs[d.Name()] && path != root {
33✔
104
                        return filepath.SkipDir
6✔
105
                }
6✔
106

107
                // skip non-progress files
108
                if d.IsDir() || !isProgressFile(path) {
41✔
109
                        return nil
20✔
110
                }
20✔
111

112
                // only call Discover once per directory
113
                dir := filepath.Dir(path)
1✔
114
                if seenDirs[dir] {
1✔
115
                        return nil
×
116
                }
×
117
                seenDirs[dir] = true
1✔
118

1✔
119
                ids, discoverErr := m.Discover(dir)
1✔
120
                if discoverErr != nil {
1✔
121
                        return nil //nolint:nilerr // best-effort discovery, errors for individual directories are ignored
×
122
                }
×
123

124
                for _, id := range ids {
2✔
125
                        if !seenIDs[id] {
2✔
126
                                seenIDs[id] = true
1✔
127
                                allIDs = append(allIDs, id)
1✔
128
                        }
1✔
129
                }
130

131
                return nil
1✔
132
        })
133

134
        if err != nil {
18✔
135
                return allIDs, fmt.Errorf("walk directory %s: %w", root, err)
×
136
        }
×
137

138
        return allIDs, nil
18✔
139
}
140

141
// updateSession refreshes a session's state and metadata from its progress file.
142
// handles starting/stopping tailing based on state transitions.
143
func (m *SessionManager) updateSession(session *Session) error {
140✔
144
        prevState := session.GetState()
140✔
145

140✔
146
        // check if file is locked (active session)
140✔
147
        active, err := IsActive(session.Path)
140✔
148
        if err != nil {
141✔
149
                return fmt.Errorf("check active state: %w", err)
1✔
150
        }
1✔
151

152
        newState := SessionStateCompleted
139✔
153
        if active {
139✔
154
                newState = SessionStateActive
×
155
        }
×
156
        session.SetState(newState)
139✔
157

139✔
158
        // handle state transitions for tailing
139✔
159
        if prevState != newState {
139✔
160
                if newState == SessionStateActive && !session.IsTailing() {
×
161
                        // session became active, start tailing from beginning to capture existing content
×
162
                        if tailErr := session.StartTailing(true); tailErr != nil {
×
163
                                log.Printf("[WARN] failed to start tailing for session %s: %v", session.ID, tailErr)
×
164
                        }
×
165
                } else if newState == SessionStateCompleted && session.IsTailing() {
×
166
                        // session completed, stop tailing
×
167
                        session.StopTailing()
×
168
                }
×
169
        }
170

171
        // for completed sessions that haven't been loaded yet, load the file content once
172
        // this handles sessions discovered after they finished.
173
        // MarkLoadedIfNot is atomic to prevent double-loading from concurrent goroutines.
174
        if newState == SessionStateCompleted && session.MarkLoadedIfNot() {
273✔
175
                m.loadProgressFileIntoSession(session.Path, session)
134✔
176
        }
134✔
177

178
        // parse metadata from file header
179
        meta, err := ParseProgressHeader(session.Path)
139✔
180
        if err != nil {
139✔
181
                return fmt.Errorf("parse header: %w", err)
×
182
        }
×
183
        session.SetMetadata(meta)
139✔
184

139✔
185
        // update last modified time
139✔
186
        info, err := os.Stat(session.Path)
139✔
187
        if err != nil {
139✔
188
                return fmt.Errorf("stat file: %w", err)
×
189
        }
×
190
        session.SetLastModified(info.ModTime())
139✔
191

139✔
192
        return nil
139✔
193
}
194

195
// Get returns a session by ID, or nil if not found.
196
func (m *SessionManager) Get(id string) *Session {
41✔
197
        m.mu.RLock()
41✔
198
        defer m.mu.RUnlock()
41✔
199
        return m.sessions[id]
41✔
200
}
41✔
201

202
// All returns all sessions in the registry.
203
func (m *SessionManager) All() []*Session {
8✔
204
        m.mu.RLock()
8✔
205
        defer m.mu.RUnlock()
8✔
206

8✔
207
        result := make([]*Session, 0, len(m.sessions))
8✔
208
        for _, s := range m.sessions {
118✔
209
                result = append(result, s)
110✔
210
        }
110✔
211
        return result
8✔
212
}
213

214
// Remove removes a session from the registry and closes its resources.
215
func (m *SessionManager) Remove(id string) {
2✔
216
        m.mu.Lock()
2✔
217
        defer m.mu.Unlock()
2✔
218

2✔
219
        if session, ok := m.sessions[id]; ok {
4✔
220
                session.Close()
2✔
221
                delete(m.sessions, id)
2✔
222
        }
2✔
223
}
224

225
// Register adds an externally-created session to the manager.
226
// This is used when a session is created for live execution (BroadcastLogger)
227
// and needs to be visible in the multi-session dashboard.
228
// The session's ID is derived from its path using sessionIDFromPath.
229
func (m *SessionManager) Register(session *Session) {
6✔
230
        id := sessionIDFromPath(session.Path)
6✔
231
        session.ID = id // ensure ID matches what SessionManager expects
6✔
232

6✔
233
        m.mu.Lock()
6✔
234
        defer m.mu.Unlock()
6✔
235

6✔
236
        // don't overwrite existing session
6✔
237
        if _, exists := m.sessions[id]; exists {
7✔
238
                return
1✔
239
        }
1✔
240

241
        m.sessions[id] = session
5✔
242
}
243

244
// Close closes all sessions and clears the registry.
245
func (m *SessionManager) Close() {
26✔
246
        m.mu.Lock()
26✔
247
        defer m.mu.Unlock()
26✔
248

26✔
249
        for _, session := range m.sessions {
39✔
250
                session.Close()
13✔
251
        }
13✔
252
        m.sessions = make(map[string]*Session)
26✔
253
}
254

255
// evictOldCompleted removes oldest completed sessions when count exceeds MaxCompletedSessions.
256
// active sessions are never evicted. must be called with lock held.
257
func (m *SessionManager) evictOldCompleted() {
134✔
258
        // count completed sessions
134✔
259
        var completed []*Session
134✔
260
        for _, s := range m.sessions {
5,731✔
261
                if s.GetState() == SessionStateCompleted {
11,194✔
262
                        completed = append(completed, s)
5,597✔
263
                }
5,597✔
264
        }
265

266
        if len(completed) <= MaxCompletedSessions {
263✔
267
                return
129✔
268
        }
129✔
269

270
        // sort by start time (oldest first)
271
        sort.Slice(completed, func(i, j int) bool {
3,419✔
272
                ti := completed[i].GetMetadata().StartTime
3,414✔
273
                tj := completed[j].GetMetadata().StartTime
3,414✔
274
                return ti.Before(tj)
3,414✔
275
        })
3,414✔
276

277
        // evict oldest sessions beyond the limit
278
        toEvict := len(completed) - MaxCompletedSessions
5✔
279
        for i := range toEvict {
10✔
280
                session := completed[i]
5✔
281
                session.Close()
5✔
282
                delete(m.sessions, session.ID)
5✔
283
        }
5✔
284
}
285

286
// StartTailingActive starts tailing for all active sessions.
287
// for each active session not already tailing, starts tailing from the beginning
288
// to populate the buffer with existing content.
289
func (m *SessionManager) StartTailingActive() {
18✔
290
        m.mu.RLock()
18✔
291
        sessions := make([]*Session, 0, len(m.sessions))
18✔
292
        for _, s := range m.sessions {
20✔
293
                sessions = append(sessions, s)
2✔
294
        }
2✔
295
        m.mu.RUnlock()
18✔
296

18✔
297
        for _, session := range sessions {
20✔
298
                if session.GetState() == SessionStateActive && !session.IsTailing() {
2✔
299
                        if err := session.StartTailing(true); err != nil { // read from beginning to populate buffer
×
300
                                log.Printf("[WARN] failed to start tailing for session %s: %v", session.ID, err)
×
301
                        }
×
302
                }
303
        }
304
}
305

306
// RefreshStates checks all sessions for state changes (active->completed).
307
// stops tailing for sessions that have completed.
308
func (m *SessionManager) RefreshStates() {
2✔
309
        m.mu.RLock()
2✔
310
        sessions := make([]*Session, 0, len(m.sessions))
2✔
311
        for _, s := range m.sessions {
4✔
312
                sessions = append(sessions, s)
2✔
313
        }
2✔
314
        m.mu.RUnlock()
2✔
315

2✔
316
        for _, session := range sessions {
4✔
317
                // only check sessions that are currently tailing
2✔
318
                if !session.IsTailing() {
3✔
319
                        continue
1✔
320
                }
321

322
                // check if session is still active
323
                active, err := IsActive(session.Path)
1✔
324
                if err != nil {
1✔
325
                        continue
×
326
                }
327

328
                if !active {
2✔
329
                        // session completed, update state and stop tailing
1✔
330
                        session.SetState(SessionStateCompleted)
1✔
331
                        session.StopTailing()
1✔
332
                }
1✔
333
        }
334
}
335

336
// sessionIDFromPath builds a session ID from a progress file path.
// the ID is the file name with the "progress-" prefix and ".txt" suffix removed,
// followed by an FNV-64a hash of the canonical path, so the same plan name in
// different directories yields different IDs.
//
// format: <plan-name>-<16-char-hex-hash>
// example: "/tmp/progress-my-plan.txt" -> "my-plan-a1b2c3d4e5f67890"
//
// the path is made absolute (when that succeeds) and cleaned before hashing,
// so equivalent spellings of one path produce the same ID.
func sessionIDFromPath(path string) string {
	name := strings.TrimSuffix(strings.TrimPrefix(filepath.Base(path), "progress-"), ".txt")

	// fall back to the path as given when it cannot be made absolute
	target := path
	if abs, err := filepath.Abs(path); err == nil {
		target = abs
	}
	target = filepath.Clean(target)

	digest := fnv.New64a()
	_, _ = digest.Write([]byte(target))
	return fmt.Sprintf("%s-%016x", name, digest.Sum64())
}
360

361
// IsActive checks if a progress file is locked by another process or the current one.
362
// returns true if the file is locked (session is running), false otherwise.
363
// uses flock with LOCK_EX|LOCK_NB to test without blocking.
364
func IsActive(path string) (bool, error) {
144✔
365
        if progress.IsPathLockedByCurrentProcess(path) {
145✔
366
                return true, nil
1✔
367
        }
1✔
368

369
        f, err := os.Open(path) //nolint:gosec // path from user-controlled glob pattern, acceptable for session discovery
143✔
370
        if err != nil {
145✔
371
                return false, fmt.Errorf("open file: %w", err)
2✔
372
        }
2✔
373
        defer f.Close()
141✔
374

141✔
375
        // try to acquire exclusive lock non-blocking
141✔
376
        gotLock, err := progress.TryLockFile(f)
141✔
377
        if err != nil {
141✔
378
                return false, fmt.Errorf("flock: %w", err)
×
379
        }
×
380

381
        // if we got the lock, file is not active
382
        // if we didn't get the lock, file is locked by another process (active)
383
        return !gotLock, nil
141✔
384
}
385

386
// ParseProgressHeader reads the header section of a progress file and extracts metadata.
387
// the header format is:
388
//
389
//        # Ralphex Progress Log
390
//        Plan: path/to/plan.md
391
//        Branch: feature-branch
392
//        Mode: full
393
//        Started: 2026-01-22 10:30:00
394
//        ------------------------------------------------------------
395
func ParseProgressHeader(path string) (SessionMetadata, error) {
144✔
396
        f, err := os.Open(path) //nolint:gosec // path from user-controlled glob pattern, acceptable for session discovery
144✔
397
        if err != nil {
145✔
398
                return SessionMetadata{}, fmt.Errorf("open file: %w", err)
1✔
399
        }
1✔
400
        defer f.Close()
143✔
401

143✔
402
        var meta SessionMetadata
143✔
403
        scanner := bufio.NewScanner(f)
143✔
404
        // increase buffer size for large lines (matching executor)
143✔
405
        buf := make([]byte, 0, 64*1024)
143✔
406
        scanner.Buffer(buf, executor.MaxScannerBuffer)
143✔
407

143✔
408
        for scanner.Scan() {
997✔
409
                line := scanner.Text()
854✔
410

854✔
411
                // stop at separator line
854✔
412
                if strings.HasPrefix(line, "---") {
997✔
413
                        break
143✔
414
                }
415

416
                // parse key-value pairs
417
                if val, found := strings.CutPrefix(line, "Plan: "); found {
852✔
418
                        meta.PlanPath = val
141✔
419
                } else if val, found := strings.CutPrefix(line, "Branch: "); found {
854✔
420
                        meta.Branch = val
143✔
421
                } else if val, found := strings.CutPrefix(line, "Mode: "); found {
712✔
422
                        meta.Mode = val
142✔
423
                } else if val, found := strings.CutPrefix(line, "Started: "); found {
569✔
424
                        // header timestamps are written in local time without a zone offset
142✔
425
                        t, err := time.ParseInLocation("2006-01-02 15:04:05", val, time.Local)
142✔
426
                        if err == nil {
284✔
427
                                meta.StartTime = t
142✔
428
                        }
142✔
429
                }
430
        }
431

432
        if err := scanner.Err(); err != nil {
143✔
433
                return SessionMetadata{}, fmt.Errorf("scan file: %w", err)
×
434
        }
×
435

436
        return meta, nil
143✔
437
}
438

439
// loadProgressFileIntoSession reads a progress file and publishes events to the session's SSE server.
440
// used for completed sessions that were discovered after they finished.
441
// errors are silently ignored since this is best-effort loading.
442
func (m *SessionManager) loadProgressFileIntoSession(path string, session *Session) {
143✔
443
        f, err := os.Open(path) //nolint:gosec // path from user-controlled glob pattern, acceptable for session discovery
143✔
444
        if err != nil {
144✔
445
                return
1✔
446
        }
1✔
447
        defer f.Close()
142✔
448

142✔
449
        scanner := bufio.NewScanner(f)
142✔
450
        // increase buffer size for large lines (matching executor)
142✔
451
        buf := make([]byte, 0, 64*1024)
142✔
452
        scanner.Buffer(buf, executor.MaxScannerBuffer)
142✔
453
        inHeader := true
142✔
454
        phase := status.PhaseTask
142✔
455
        var pendingSection string // section header waiting for first timestamped event
142✔
456

142✔
457
        for scanner.Scan() {
1,058✔
458
                line := scanner.Text()
916✔
459
                if line == "" {
942✔
460
                        continue
26✔
461
                }
462

463
                parsed, newInHeader := parseProgressLine(line, inHeader)
890✔
464
                inHeader = newInHeader
890✔
465

890✔
466
                switch parsed.Type {
890✔
467
                case ParsedLineSkip:
851✔
468
                        continue
851✔
469
                case ParsedLineSection:
14✔
470
                        if pendingSection != "" {
14✔
471
                                m.emitPendingSection(session, pendingSection, phase, time.Now())
×
472
                        }
×
473
                        phase = parsed.Phase
14✔
474
                        // defer emitting section until we see a timestamped event
14✔
475
                        pendingSection = parsed.Section
14✔
476
                case ParsedLineTimestamp:
24✔
477
                        // emit pending section with this event's timestamp (for accurate durations)
24✔
478
                        if pendingSection != "" {
38✔
479
                                m.emitPendingSection(session, pendingSection, phase, parsed.Timestamp)
14✔
480
                                pendingSection = ""
14✔
481
                        }
14✔
482
                        event := Event{
24✔
483
                                Type:      parsed.EventType,
24✔
484
                                Phase:     phase,
24✔
485
                                Text:      parsed.Text,
24✔
486
                                Timestamp: parsed.Timestamp,
24✔
487
                                Signal:    parsed.Signal,
24✔
488
                        }
24✔
489
                        if event.Type == EventTypeOutput {
46✔
490
                                if stats, ok := parseDiffStats(event.Text); ok {
23✔
491
                                        session.SetDiffStats(stats)
1✔
492
                                }
1✔
493
                        }
494
                        _ = session.Publish(event)
24✔
495
                case ParsedLinePlain:
1✔
496
                        _ = session.Publish(Event{
1✔
497
                                Type:      EventTypeOutput,
1✔
498
                                Phase:     phase,
1✔
499
                                Text:      parsed.Text,
1✔
500
                                Timestamp: time.Now(),
1✔
501
                        })
1✔
502
                }
503
        }
504

505
        if pendingSection != "" {
142✔
506
                m.emitPendingSection(session, pendingSection, phase, time.Now())
×
507
        }
×
508
}
509

510
// phaseFromSection determines the phase from a section name.
511
// checks "codex"/"custom" before "review" because external review sections should be PhaseCodex.
512
func phaseFromSection(name string) status.Phase {
51✔
513
        nameLower := strings.ToLower(name)
51✔
514
        switch {
51✔
515
        case strings.Contains(nameLower, "task"):
21✔
516
                return status.PhaseTask
21✔
517
        case strings.Contains(nameLower, "codex"), strings.Contains(nameLower, "custom"):
11✔
518
                return status.PhaseCodex
11✔
519
        case strings.Contains(nameLower, "review"):
10✔
520
                return status.PhaseReview
10✔
521
        case strings.Contains(nameLower, "claude-eval") || strings.Contains(nameLower, "claude eval"):
8✔
522
                return status.PhaseClaudeEval
8✔
523
        default:
1✔
524
                return status.PhaseTask
1✔
525
        }
526
}
527

528
// emitPendingSection publishes section and task_start events for a pending section.
529
// task_start is emitted before section for task iteration sections.
530
func (m *SessionManager) emitPendingSection(session *Session, sectionName string, phase status.Phase, ts time.Time) {
14✔
531
        // emit task_start event for task iteration sections
14✔
532
        if matches := taskIterationRegex.FindStringSubmatch(sectionName); matches != nil {
20✔
533
                taskNum, err := strconv.Atoi(matches[1])
6✔
534
                if err != nil {
6✔
535
                        // log parse error but continue - section will still be emitted
×
536
                        log.Printf("[WARN] failed to parse task number from section %q: %v", sectionName, err)
×
537
                } else {
6✔
538
                        if err := session.Publish(Event{
6✔
539
                                Type:      EventTypeTaskStart,
6✔
540
                                Phase:     phase,
6✔
541
                                TaskNum:   taskNum,
6✔
542
                                Text:      sectionName,
6✔
543
                                Timestamp: ts,
6✔
544
                        }); err != nil {
6✔
545
                                log.Printf("[WARN] failed to publish task_start event: %v", err)
×
546
                        }
×
547
                }
548
        }
549

550
        if err := session.Publish(Event{
14✔
551
                Type:      EventTypeSection,
14✔
552
                Phase:     phase,
14✔
553
                Section:   sectionName,
14✔
554
                Text:      sectionName,
14✔
555
                Timestamp: ts,
14✔
556
        }); err != nil {
14✔
557
                log.Printf("[WARN] failed to publish section event: %v", err)
×
558
        }
×
559
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc