• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

umputun / ralphex / 21743130138

06 Feb 2026 07:57AM UTC coverage: 80.111% (-0.8%) from 80.861%
21743130138

push

github

Claude
refactor: address code review findings and improve package structure

- Remove dead code: IsPlanDraft, IsTerminalSignal, NewErrorEvent, NewWarnEvent, Phase aliases
- Fix stale comments, extract magic numbers, remove redundant conditions
- Improve session manager: add error logging, convert standalone functions to methods
- Deduplicate shared code: maxScannerBuffer constant, progress file parsing, review pipeline
- Extract shared types to pkg/status (signals, Phase, Section) from processor/signals
- Merge pkg/render into pkg/input
- Move Logger interface to consumer-side in pkg/web, decoupling web from processor

195 of 205 new or added lines in 13 files covered. (95.12%)

1 existing line in 1 file now uncovered.

4463 of 5571 relevant lines covered (80.11%)

157.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.77
/pkg/web/session_manager.go
1
package web
2

3
import (
4
        "bufio"
5
        "fmt"
6
        "hash/fnv"
7
        "log"
8
        "os"
9
        "path/filepath"
10
        "sort"
11
        "strconv"
12
        "strings"
13
        "sync"
14
        "time"
15

16
        "github.com/umputun/ralphex/pkg/executor"
17
        "github.com/umputun/ralphex/pkg/progress"
18
        "github.com/umputun/ralphex/pkg/status"
19
)
20

21
// MaxCompletedSessions caps how many completed sessions the registry keeps.
// once the cap is exceeded, the oldest completed sessions are evicted so that
// memory use stays bounded. active sessions are never evicted.
const MaxCompletedSessions = 100
25

26
// SessionManager maintains a registry of all discovered sessions.
27
// it handles discovery of progress files, state detection via flock,
28
// and provides access to sessions by ID.
29
// completed sessions are automatically evicted when MaxCompletedSessions is exceeded.
30
type SessionManager struct {
31
        mu       sync.RWMutex
32
        sessions map[string]*Session // keyed by session ID
33
}
34

35
// NewSessionManager creates a new session manager with an empty registry.
36
func NewSessionManager() *SessionManager {
52✔
37
        return &SessionManager{
52✔
38
                sessions: make(map[string]*Session),
52✔
39
        }
52✔
40
}
52✔
41

42
// Discover scans a directory for progress files matching progress-*.txt pattern.
43
// for each file found, it creates or updates a session in the registry.
44
// returns the list of discovered session IDs.
45
func (m *SessionManager) Discover(dir string) ([]string, error) {
27✔
46
        pattern := filepath.Join(dir, "progress-*.txt")
27✔
47
        matches, err := filepath.Glob(pattern)
27✔
48
        if err != nil {
27✔
49
                return nil, fmt.Errorf("glob progress files: %w", err)
×
50
        }
×
51

52
        ids := make([]string, 0, len(matches))
27✔
53
        for _, path := range matches {
165✔
54
                id := sessionIDFromPath(path)
138✔
55
                ids = append(ids, id)
138✔
56

138✔
57
                // check if session already exists
138✔
58
                m.mu.RLock()
138✔
59
                existing := m.sessions[id]
138✔
60
                m.mu.RUnlock()
138✔
61

138✔
62
                if existing != nil {
142✔
63
                        // update existing session state
4✔
64
                        if err := m.updateSession(existing); err != nil {
4✔
NEW
65
                                log.Printf("[WARN] failed to update session %s: %v", id, err)
×
66
                                continue
×
67
                        }
68
                } else {
134✔
69
                        // create new session
134✔
70
                        session := NewSession(id, path)
134✔
71
                        if err := m.updateSession(session); err != nil {
135✔
72
                                log.Printf("[WARN] failed to create session %s: %v", id, err)
1✔
73
                                continue
1✔
74
                        }
75
                        m.mu.Lock()
133✔
76
                        m.sessions[id] = session
133✔
77
                        m.evictOldCompleted()
133✔
78
                        m.mu.Unlock()
133✔
79
                }
80
        }
81

82
        return ids, nil
27✔
83
}
84

85
// DiscoverRecursive walks a directory tree and discovers all progress files.
86
// unlike Discover, this searches subdirectories recursively.
87
// returns the list of all discovered session IDs (deduplicated).
88
func (m *SessionManager) DiscoverRecursive(root string) ([]string, error) {
12✔
89
        seenDirs := make(map[string]bool)
12✔
90
        seenIDs := make(map[string]bool)
12✔
91
        var allIDs []string
12✔
92

12✔
93
        err := filepath.WalkDir(root, func(path string, d os.DirEntry, err error) error {
27✔
94
                if err != nil {
16✔
95
                        // skip directories that can't be accessed
1✔
96
                        if d != nil && d.IsDir() {
1✔
97
                                return filepath.SkipDir
×
98
                        }
×
99
                        return nil
1✔
100
                }
101

102
                // skip hidden directories
103
                if d.IsDir() && strings.HasPrefix(d.Name(), ".") && path != root {
15✔
104
                        return filepath.SkipDir
1✔
105
                }
1✔
106

107
                // skip non-progress files
108
                if d.IsDir() || !isProgressFile(path) {
25✔
109
                        return nil
12✔
110
                }
12✔
111

112
                // only call Discover once per directory
113
                dir := filepath.Dir(path)
1✔
114
                if seenDirs[dir] {
1✔
115
                        return nil
×
116
                }
×
117
                seenDirs[dir] = true
1✔
118

1✔
119
                ids, discoverErr := m.Discover(dir)
1✔
120
                if discoverErr != nil {
1✔
121
                        return nil //nolint:nilerr // best-effort discovery, errors for individual directories are ignored
×
122
                }
×
123

124
                for _, id := range ids {
2✔
125
                        if !seenIDs[id] {
2✔
126
                                seenIDs[id] = true
1✔
127
                                allIDs = append(allIDs, id)
1✔
128
                        }
1✔
129
                }
130

131
                return nil
1✔
132
        })
133

134
        if err != nil {
12✔
135
                return allIDs, fmt.Errorf("walk directory %s: %w", root, err)
×
136
        }
×
137

138
        return allIDs, nil
12✔
139
}
140

141
// updateSession refreshes a session's state and metadata from its progress file.
142
// handles starting/stopping tailing based on state transitions.
143
func (m *SessionManager) updateSession(session *Session) error {
138✔
144
        prevState := session.GetState()
138✔
145

138✔
146
        // check if file is locked (active session)
138✔
147
        active, err := IsActive(session.Path)
138✔
148
        if err != nil {
139✔
149
                return fmt.Errorf("check active state: %w", err)
1✔
150
        }
1✔
151

152
        newState := SessionStateCompleted
137✔
153
        if active {
137✔
154
                newState = SessionStateActive
×
155
        }
×
156
        session.SetState(newState)
137✔
157

137✔
158
        // handle state transitions for tailing
137✔
159
        if prevState != newState {
137✔
160
                if newState == SessionStateActive && !session.IsTailing() {
×
161
                        // session became active, start tailing from beginning to capture existing content
×
162
                        if tailErr := session.StartTailing(true); tailErr != nil {
×
163
                                log.Printf("[WARN] failed to start tailing for session %s: %v", session.ID, tailErr)
×
164
                        }
×
165
                } else if newState == SessionStateCompleted && session.IsTailing() {
×
166
                        // session completed, stop tailing
×
167
                        session.StopTailing()
×
168
                }
×
169
        }
170

171
        // for completed sessions that haven't been loaded yet, load the file content once
172
        // this handles sessions discovered after they finished.
173
        // MarkLoadedIfNot is atomic to prevent double-loading from concurrent goroutines.
174
        if newState == SessionStateCompleted && session.MarkLoadedIfNot() {
270✔
175
                m.loadProgressFileIntoSession(session.Path, session)
133✔
176
        }
133✔
177

178
        // parse metadata from file header
179
        meta, err := ParseProgressHeader(session.Path)
137✔
180
        if err != nil {
137✔
181
                return fmt.Errorf("parse header: %w", err)
×
182
        }
×
183
        session.SetMetadata(meta)
137✔
184

137✔
185
        // update last modified time
137✔
186
        info, err := os.Stat(session.Path)
137✔
187
        if err != nil {
137✔
188
                return fmt.Errorf("stat file: %w", err)
×
189
        }
×
190
        session.SetLastModified(info.ModTime())
137✔
191

137✔
192
        return nil
137✔
193
}
194

195
// Get returns a session by ID, or nil if not found.
196
func (m *SessionManager) Get(id string) *Session {
33✔
197
        m.mu.RLock()
33✔
198
        defer m.mu.RUnlock()
33✔
199
        return m.sessions[id]
33✔
200
}
33✔
201

202
// All returns all sessions in the registry.
203
func (m *SessionManager) All() []*Session {
8✔
204
        m.mu.RLock()
8✔
205
        defer m.mu.RUnlock()
8✔
206

8✔
207
        result := make([]*Session, 0, len(m.sessions))
8✔
208
        for _, s := range m.sessions {
118✔
209
                result = append(result, s)
110✔
210
        }
110✔
211
        return result
8✔
212
}
213

214
// Remove removes a session from the registry and closes its resources.
215
func (m *SessionManager) Remove(id string) {
2✔
216
        m.mu.Lock()
2✔
217
        defer m.mu.Unlock()
2✔
218

2✔
219
        if session, ok := m.sessions[id]; ok {
4✔
220
                session.Close()
2✔
221
                delete(m.sessions, id)
2✔
222
        }
2✔
223
}
224

225
// Register adds an externally-created session to the manager.
226
// This is used when a session is created for live execution (BroadcastLogger)
227
// and needs to be visible in the multi-session dashboard.
228
// The session's ID is derived from its path using sessionIDFromPath.
229
func (m *SessionManager) Register(session *Session) {
6✔
230
        id := sessionIDFromPath(session.Path)
6✔
231
        session.ID = id // ensure ID matches what SessionManager expects
6✔
232

6✔
233
        m.mu.Lock()
6✔
234
        defer m.mu.Unlock()
6✔
235

6✔
236
        // don't overwrite existing session
6✔
237
        if _, exists := m.sessions[id]; exists {
7✔
238
                return
1✔
239
        }
1✔
240

241
        m.sessions[id] = session
5✔
242
}
243

244
// Close closes all sessions and clears the registry.
245
func (m *SessionManager) Close() {
26✔
246
        m.mu.Lock()
26✔
247
        defer m.mu.Unlock()
26✔
248

26✔
249
        for _, session := range m.sessions {
39✔
250
                session.Close()
13✔
251
        }
13✔
252
        m.sessions = make(map[string]*Session)
26✔
253
}
254

255
// evictOldCompleted removes oldest completed sessions when count exceeds MaxCompletedSessions.
256
// active sessions are never evicted. must be called with lock held.
257
func (m *SessionManager) evictOldCompleted() {
133✔
258
        // count completed sessions
133✔
259
        var completed []*Session
133✔
260
        for _, s := range m.sessions {
5,729✔
261
                if s.GetState() == SessionStateCompleted {
11,192✔
262
                        completed = append(completed, s)
5,596✔
263
                }
5,596✔
264
        }
265

266
        if len(completed) <= MaxCompletedSessions {
261✔
267
                return
128✔
268
        }
128✔
269

270
        // sort by start time (oldest first)
271
        sort.Slice(completed, func(i, j int) bool {
3,376✔
272
                ti := completed[i].GetMetadata().StartTime
3,371✔
273
                tj := completed[j].GetMetadata().StartTime
3,371✔
274
                return ti.Before(tj)
3,371✔
275
        })
3,371✔
276

277
        // evict oldest sessions beyond the limit
278
        toEvict := len(completed) - MaxCompletedSessions
5✔
279
        for i := range toEvict {
10✔
280
                session := completed[i]
5✔
281
                session.Close()
5✔
282
                delete(m.sessions, session.ID)
5✔
283
        }
5✔
284
}
285

286
// StartTailingActive starts tailing for all active sessions.
287
// for each active session not already tailing, starts tailing from the beginning
288
// to populate the buffer with existing content.
289
func (m *SessionManager) StartTailingActive() {
12✔
290
        m.mu.RLock()
12✔
291
        sessions := make([]*Session, 0, len(m.sessions))
12✔
292
        for _, s := range m.sessions {
14✔
293
                sessions = append(sessions, s)
2✔
294
        }
2✔
295
        m.mu.RUnlock()
12✔
296

12✔
297
        for _, session := range sessions {
14✔
298
                if session.GetState() == SessionStateActive && !session.IsTailing() {
2✔
299
                        if err := session.StartTailing(true); err != nil { // read from beginning to populate buffer
×
300
                                log.Printf("[WARN] failed to start tailing for session %s: %v", session.ID, err)
×
301
                        }
×
302
                }
303
        }
304
}
305

306
// RefreshStates checks all sessions for state changes (active->completed).
307
// stops tailing for sessions that have completed.
308
func (m *SessionManager) RefreshStates() {
2✔
309
        m.mu.RLock()
2✔
310
        sessions := make([]*Session, 0, len(m.sessions))
2✔
311
        for _, s := range m.sessions {
4✔
312
                sessions = append(sessions, s)
2✔
313
        }
2✔
314
        m.mu.RUnlock()
2✔
315

2✔
316
        for _, session := range sessions {
4✔
317
                // only check sessions that are currently tailing
2✔
318
                if !session.IsTailing() {
3✔
319
                        continue
1✔
320
                }
321

322
                // check if session is still active
323
                active, err := IsActive(session.Path)
1✔
324
                if err != nil {
1✔
325
                        continue
×
326
                }
327

328
                if !active {
2✔
329
                        // session completed, update state and stop tailing
1✔
330
                        session.SetState(SessionStateCompleted)
1✔
331
                        session.StopTailing()
1✔
332
                }
1✔
333
        }
334
}
335

336
// sessionIDFromPath builds a session ID out of a progress file path.
// the ID is the file name stripped of its "progress-" prefix and ".txt" suffix,
// followed by an FNV-64a hash of the canonical path, so that equal plan names in
// different directories do not collide.
//
// format: <plan-name>-<16-char-hex-hash>
// example: "/tmp/progress-my-plan.txt" -> "my-plan-a1b2c3d4e5f67890"
//
// the path is made absolute (when possible) and cleaned before hashing, so
// different spellings of the same location yield the same ID.
func sessionIDFromPath(path string) string {
	name := strings.TrimSuffix(strings.TrimPrefix(filepath.Base(path), "progress-"), ".txt")

	// fall back to the path as given if it can't be made absolute
	resolved, err := filepath.Abs(path)
	if err != nil {
		resolved = path
	}

	sum := fnv.New64a()
	_, _ = sum.Write([]byte(filepath.Clean(resolved)))
	return fmt.Sprintf("%s-%016x", name, sum.Sum64())
}
360

361
// IsActive checks if a progress file is locked by another process or the current one.
362
// returns true if the file is locked (session is running), false otherwise.
363
// uses flock with LOCK_EX|LOCK_NB to test without blocking.
364
func IsActive(path string) (bool, error) {
142✔
365
        if progress.IsPathLockedByCurrentProcess(path) {
143✔
366
                return true, nil
1✔
367
        }
1✔
368

369
        f, err := os.Open(path) //nolint:gosec // path from user-controlled glob pattern, acceptable for session discovery
141✔
370
        if err != nil {
143✔
371
                return false, fmt.Errorf("open file: %w", err)
2✔
372
        }
2✔
373
        defer f.Close()
139✔
374

139✔
375
        // try to acquire exclusive lock non-blocking
139✔
376
        gotLock, err := progress.TryLockFile(f)
139✔
377
        if err != nil {
139✔
378
                return false, fmt.Errorf("flock: %w", err)
×
379
        }
×
380

381
        // if we got the lock, file is not active
382
        // if we didn't get the lock, file is locked by another process (active)
383
        return !gotLock, nil
139✔
384
}
385

386
// ParseProgressHeader reads the header section of a progress file and extracts metadata.
387
// the header format is:
388
//
389
//        # Ralphex Progress Log
390
//        Plan: path/to/plan.md
391
//        Branch: feature-branch
392
//        Mode: full
393
//        Started: 2026-01-22 10:30:00
394
//        ------------------------------------------------------------
395
func ParseProgressHeader(path string) (SessionMetadata, error) {
142✔
396
        f, err := os.Open(path) //nolint:gosec // path from user-controlled glob pattern, acceptable for session discovery
142✔
397
        if err != nil {
143✔
398
                return SessionMetadata{}, fmt.Errorf("open file: %w", err)
1✔
399
        }
1✔
400
        defer f.Close()
141✔
401

141✔
402
        var meta SessionMetadata
141✔
403
        scanner := bufio.NewScanner(f)
141✔
404
        // increase buffer size for large lines (matching executor)
141✔
405
        buf := make([]byte, 0, 64*1024)
141✔
406
        scanner.Buffer(buf, executor.MaxScannerBuffer)
141✔
407

141✔
408
        for scanner.Scan() {
983✔
409
                line := scanner.Text()
842✔
410

842✔
411
                // stop at separator line
842✔
412
                if strings.HasPrefix(line, "---") {
983✔
413
                        break
141✔
414
                }
415

416
                // parse key-value pairs
417
                if val, found := strings.CutPrefix(line, "Plan: "); found {
840✔
418
                        meta.PlanPath = val
139✔
419
                } else if val, found := strings.CutPrefix(line, "Branch: "); found {
842✔
420
                        meta.Branch = val
141✔
421
                } else if val, found := strings.CutPrefix(line, "Mode: "); found {
702✔
422
                        meta.Mode = val
140✔
423
                } else if val, found := strings.CutPrefix(line, "Started: "); found {
561✔
424
                        t, err := time.Parse("2006-01-02 15:04:05", val)
140✔
425
                        if err == nil {
280✔
426
                                meta.StartTime = t
140✔
427
                        }
140✔
428
                }
429
        }
430

431
        if err := scanner.Err(); err != nil {
141✔
432
                return SessionMetadata{}, fmt.Errorf("scan file: %w", err)
×
433
        }
×
434

435
        return meta, nil
141✔
436
}
437

438
// loadProgressFileIntoSession reads a progress file and publishes events to the session's SSE server.
439
// used for completed sessions that were discovered after they finished.
440
// errors are silently ignored since this is best-effort loading.
441
func (m *SessionManager) loadProgressFileIntoSession(path string, session *Session) {
141✔
442
        f, err := os.Open(path) //nolint:gosec // path from user-controlled glob pattern, acceptable for session discovery
141✔
443
        if err != nil {
142✔
444
                return
1✔
445
        }
1✔
446
        defer f.Close()
140✔
447

140✔
448
        scanner := bufio.NewScanner(f)
140✔
449
        // increase buffer size for large lines (matching executor)
140✔
450
        buf := make([]byte, 0, 64*1024)
140✔
451
        scanner.Buffer(buf, executor.MaxScannerBuffer)
140✔
452
        inHeader := true
140✔
453
        phase := status.PhaseTask
140✔
454
        var pendingSection string // section header waiting for first timestamped event
140✔
455

140✔
456
        for scanner.Scan() {
1,041✔
457
                line := scanner.Text()
901✔
458
                if line == "" {
926✔
459
                        continue
25✔
460
                }
461

462
                parsed, newInHeader := parseProgressLine(line, inHeader)
876✔
463
                inHeader = newInHeader
876✔
464

876✔
465
                switch parsed.Type {
876✔
466
                case ParsedLineSkip:
839✔
467
                        continue
839✔
468
                case ParsedLineSection:
14✔
469
                        phase = parsed.Phase
14✔
470
                        // defer emitting section until we see a timestamped event
14✔
471
                        pendingSection = parsed.Section
14✔
472
                case ParsedLineTimestamp:
22✔
473
                        // emit pending section with this event's timestamp (for accurate durations)
22✔
474
                        if pendingSection != "" {
36✔
475
                                m.emitPendingSection(session, pendingSection, phase, parsed.Timestamp)
14✔
476
                                pendingSection = ""
14✔
477
                        }
14✔
478
                        _ = session.Publish(Event{
22✔
479
                                Type:      parsed.EventType,
22✔
480
                                Phase:     phase,
22✔
481
                                Text:      parsed.Text,
22✔
482
                                Timestamp: parsed.Timestamp,
22✔
483
                                Signal:    parsed.Signal,
22✔
484
                        })
22✔
485
                case ParsedLinePlain:
1✔
486
                        _ = session.Publish(Event{
1✔
487
                                Type:      EventTypeOutput,
1✔
488
                                Phase:     phase,
1✔
489
                                Text:      parsed.Text,
1✔
490
                                Timestamp: time.Now(),
1✔
491
                        })
1✔
492
                }
493
        }
494
}
495

496
// phaseFromSection determines the phase from a section name.
497
// checks "codex"/"custom" before "review" because external review sections should be PhaseCodex.
498
func phaseFromSection(name string) status.Phase {
46✔
499
        nameLower := strings.ToLower(name)
46✔
500
        switch {
46✔
501
        case strings.Contains(nameLower, "task"):
18✔
502
                return status.PhaseTask
18✔
503
        case strings.Contains(nameLower, "codex"), strings.Contains(nameLower, "custom"):
10✔
504
                return status.PhaseCodex
10✔
505
        case strings.Contains(nameLower, "review"):
9✔
506
                return status.PhaseReview
9✔
507
        case strings.Contains(nameLower, "claude-eval") || strings.Contains(nameLower, "claude eval"):
8✔
508
                return status.PhaseClaudeEval
8✔
509
        default:
1✔
510
                return status.PhaseTask
1✔
511
        }
512
}
513

514
// emitPendingSection publishes section and task_start events for a pending section.
515
// task_start is emitted before section for task iteration sections.
516
func (m *SessionManager) emitPendingSection(session *Session, sectionName string, phase status.Phase, ts time.Time) {
14✔
517
        // emit task_start event for task iteration sections
14✔
518
        if matches := taskIterationRegex.FindStringSubmatch(sectionName); matches != nil {
20✔
519
                taskNum, err := strconv.Atoi(matches[1])
6✔
520
                if err != nil {
6✔
521
                        // log parse error but continue - section will still be emitted
×
522
                        log.Printf("[WARN] failed to parse task number from section %q: %v", sectionName, err)
×
523
                } else {
6✔
524
                        if err := session.Publish(Event{
6✔
525
                                Type:      EventTypeTaskStart,
6✔
526
                                Phase:     phase,
6✔
527
                                TaskNum:   taskNum,
6✔
528
                                Text:      sectionName,
6✔
529
                                Timestamp: ts,
6✔
530
                        }); err != nil {
6✔
531
                                log.Printf("[WARN] failed to publish task_start event: %v", err)
×
532
                        }
×
533
                }
534
        }
535

536
        if err := session.Publish(Event{
14✔
537
                Type:      EventTypeSection,
14✔
538
                Phase:     phase,
14✔
539
                Section:   sectionName,
14✔
540
                Text:      sectionName,
14✔
541
                Timestamp: ts,
14✔
542
        }); err != nil {
14✔
543
                log.Printf("[WARN] failed to publish section event: %v", err)
×
544
        }
×
545
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc