• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

umputun / ralphex / 21295900463

23 Jan 2026 05:55PM UTC coverage: 78.878% (-0.5%) from 79.395%
21295900463

Pull #17

github

melonamin
fix: address linting issues and add security hardening

- Fix variable shadowing in main.go (branchErr, gitignoreErr)
- Add path traversal validation for plan file requests
- Track and log dropped SSE events for slow clients
- Add warnings for invalid watch directories
- Update tests to use relative plan paths
Pull Request #17: feat: add web dashboard with real-time streaming and multi-session support

1513 of 1931 new or added lines in 19 files covered. (78.35%)

10 existing lines in 3 files now uncovered.

3066 of 3887 relevant lines covered (78.88%)

193.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.72
/pkg/web/session_manager.go
1
package web
2

3
import (
4
        "bufio"
5
        "errors"
6
        "fmt"
7
        "hash/fnv"
8
        "os"
9
        "path/filepath"
10
        "sort"
11
        "strings"
12
        "sync"
13
        "syscall"
14
        "time"
15

16
        "github.com/umputun/ralphex/pkg/processor"
17
        "github.com/umputun/ralphex/pkg/progress"
18
)
19

20
// MaxCompletedSessions is the maximum number of completed sessions to retain.
21
// active sessions are never evicted. oldest completed sessions are removed
22
// when this limit is exceeded to prevent unbounded memory growth.
23
const MaxCompletedSessions = 100
24

25
// SessionManager maintains a registry of all discovered sessions.
26
// it handles discovery of progress files, state detection via flock,
27
// and provides access to sessions by ID.
28
// completed sessions are automatically evicted when MaxCompletedSessions is exceeded.
29
type SessionManager struct {
30
        mu       sync.RWMutex
31
        sessions map[string]*Session // keyed by session ID
32
}
33

34
// NewSessionManager creates a new session manager with an empty registry.
35
func NewSessionManager() *SessionManager {
31✔
36
        return &SessionManager{
31✔
37
                sessions: make(map[string]*Session),
31✔
38
        }
31✔
39
}
31✔
40

41
// Discover scans a directory for progress files matching progress-*.txt pattern.
42
// for each file found, it creates or updates a session in the registry.
43
// returns the list of discovered session IDs.
44
func (m *SessionManager) Discover(dir string) ([]string, error) {
23✔
45
        pattern := filepath.Join(dir, "progress-*.txt")
23✔
46
        matches, err := filepath.Glob(pattern)
23✔
47
        if err != nil {
23✔
NEW
48
                return nil, fmt.Errorf("glob progress files: %w", err)
×
NEW
49
        }
×
50

51
        ids := make([]string, 0, len(matches))
23✔
52
        for _, path := range matches {
48✔
53
                id := sessionIDFromPath(path)
25✔
54
                ids = append(ids, id)
25✔
55

25✔
56
                // check if session already exists
25✔
57
                m.mu.RLock()
25✔
58
                existing := m.sessions[id]
25✔
59
                m.mu.RUnlock()
25✔
60

25✔
61
                if existing != nil {
28✔
62
                        // update existing session state
3✔
63
                        if err := m.updateSession(existing); err != nil {
3✔
NEW
64
                                // log error but continue with other sessions
×
NEW
65
                                continue
×
66
                        }
67
                } else {
22✔
68
                        // create new session
22✔
69
                        session := NewSession(id, path)
22✔
70
                        if err := m.updateSession(session); err != nil {
22✔
NEW
71
                                continue
×
72
                        }
73
                        m.mu.Lock()
22✔
74
                        m.sessions[id] = session
22✔
75
                        m.evictOldCompleted()
22✔
76
                        m.mu.Unlock()
22✔
77
                }
78
        }
79

80
        return ids, nil
23✔
81
}
82

83
// DiscoverRecursive walks a directory tree and discovers all progress files.
84
// unlike Discover, this searches subdirectories recursively.
85
// returns the list of all discovered session IDs (deduplicated).
86
func (m *SessionManager) DiscoverRecursive(root string) ([]string, error) {
7✔
87
        seenDirs := make(map[string]bool)
7✔
88
        seenIDs := make(map[string]bool)
7✔
89
        var allIDs []string
7✔
90

7✔
91
        err := filepath.WalkDir(root, func(path string, d os.DirEntry, err error) error {
17✔
92
                if err != nil {
10✔
NEW
93
                        // skip directories that can't be accessed
×
NEW
94
                        if d != nil && d.IsDir() {
×
NEW
95
                                return filepath.SkipDir
×
NEW
96
                        }
×
NEW
97
                        return nil
×
98
                }
99

100
                // skip hidden directories
101
                if d.IsDir() && strings.HasPrefix(d.Name(), ".") && path != root {
11✔
102
                        return filepath.SkipDir
1✔
103
                }
1✔
104

105
                // skip non-progress files
106
                if d.IsDir() || !isProgressFile(path) {
17✔
107
                        return nil
8✔
108
                }
8✔
109

110
                // only call Discover once per directory
111
                dir := filepath.Dir(path)
1✔
112
                if seenDirs[dir] {
1✔
NEW
113
                        return nil
×
NEW
114
                }
×
115
                seenDirs[dir] = true
1✔
116

1✔
117
                ids, discoverErr := m.Discover(dir)
1✔
118
                if discoverErr != nil {
1✔
NEW
119
                        return nil //nolint:nilerr // best-effort discovery, errors for individual directories are ignored
×
NEW
120
                }
×
121

122
                for _, id := range ids {
2✔
123
                        if !seenIDs[id] {
2✔
124
                                seenIDs[id] = true
1✔
125
                                allIDs = append(allIDs, id)
1✔
126
                        }
1✔
127
                }
128

129
                return nil
1✔
130
        })
131

132
        if err != nil {
7✔
NEW
133
                return allIDs, fmt.Errorf("walk directory %s: %w", root, err)
×
NEW
134
        }
×
135

136
        return allIDs, nil
7✔
137
}
138

139
// updateSession refreshes a session's state and metadata from its progress file.
140
// handles starting/stopping tailing based on state transitions.
141
func (m *SessionManager) updateSession(session *Session) error {
25✔
142
        prevState := session.GetState()
25✔
143

25✔
144
        // check if file is locked (active session)
25✔
145
        active, err := IsActive(session.Path)
25✔
146
        if err != nil {
25✔
NEW
147
                return fmt.Errorf("check active state: %w", err)
×
NEW
148
        }
×
149

150
        newState := SessionStateCompleted
25✔
151
        if active {
25✔
NEW
152
                newState = SessionStateActive
×
NEW
153
        }
×
154
        session.SetState(newState)
25✔
155

25✔
156
        // handle state transitions for tailing
25✔
157
        if prevState != newState {
25✔
NEW
158
                if newState == SessionStateActive && !session.IsTailing() {
×
NEW
159
                        // session became active, start tailing from beginning to capture existing content
×
NEW
160
                        _ = session.StartTailing(true)
×
NEW
161
                } else if newState == SessionStateCompleted && session.IsTailing() {
×
NEW
162
                        // session completed, stop tailing
×
NEW
163
                        session.StopTailing()
×
NEW
164
                }
×
165
        }
166

167
        // for completed sessions with empty buffer, load the file content once
168
        // this handles sessions discovered after they finished
169
        if newState == SessionStateCompleted && session.Buffer.Count() == 0 {
50✔
170
                loadProgressFileIntoBuffer(session.Path, session.Buffer)
25✔
171
        }
25✔
172

173
        // parse metadata from file header
174
        meta, err := ParseProgressHeader(session.Path)
25✔
175
        if err != nil {
25✔
NEW
176
                return fmt.Errorf("parse header: %w", err)
×
NEW
177
        }
×
178
        session.SetMetadata(meta)
25✔
179

25✔
180
        // update last modified time
25✔
181
        info, err := os.Stat(session.Path)
25✔
182
        if err != nil {
25✔
NEW
183
                return fmt.Errorf("stat file: %w", err)
×
NEW
184
        }
×
185
        session.SetLastModified(info.ModTime())
25✔
186

25✔
187
        return nil
25✔
188
}
189

190
// Get returns a session by ID, or nil if not found.
191
func (m *SessionManager) Get(id string) *Session {
27✔
192
        m.mu.RLock()
27✔
193
        defer m.mu.RUnlock()
27✔
194
        return m.sessions[id]
27✔
195
}
27✔
196

197
// All returns all sessions in the registry.
198
func (m *SessionManager) All() []*Session {
6✔
199
        m.mu.RLock()
6✔
200
        defer m.mu.RUnlock()
6✔
201

6✔
202
        result := make([]*Session, 0, len(m.sessions))
6✔
203
        for _, s := range m.sessions {
11✔
204
                result = append(result, s)
5✔
205
        }
5✔
206
        return result
6✔
207
}
208

209
// Remove removes a session from the registry and closes its resources.
210
func (m *SessionManager) Remove(id string) {
2✔
211
        m.mu.Lock()
2✔
212
        defer m.mu.Unlock()
2✔
213

2✔
214
        if session, ok := m.sessions[id]; ok {
4✔
215
                session.Close()
2✔
216
                delete(m.sessions, id)
2✔
217
        }
2✔
218
}
219

220
// Close closes all sessions and clears the registry.
221
func (m *SessionManager) Close() {
1✔
222
        m.mu.Lock()
1✔
223
        defer m.mu.Unlock()
1✔
224

1✔
225
        for _, session := range m.sessions {
3✔
226
                session.Close()
2✔
227
        }
2✔
228
        m.sessions = make(map[string]*Session)
1✔
229
}
230

231
// evictOldCompleted removes oldest completed sessions when count exceeds MaxCompletedSessions.
232
// active sessions are never evicted. must be called with lock held.
233
func (m *SessionManager) evictOldCompleted() {
22✔
234
        // count completed sessions
22✔
235
        var completed []*Session
22✔
236
        for _, s := range m.sessions {
47✔
237
                if s.GetState() == SessionStateCompleted {
50✔
238
                        completed = append(completed, s)
25✔
239
                }
25✔
240
        }
241

242
        if len(completed) <= MaxCompletedSessions {
44✔
243
                return
22✔
244
        }
22✔
245

246
        // sort by start time (oldest first)
NEW
247
        sort.Slice(completed, func(i, j int) bool {
×
NEW
248
                ti := completed[i].GetMetadata().StartTime
×
NEW
249
                tj := completed[j].GetMetadata().StartTime
×
NEW
250
                return ti.Before(tj)
×
NEW
251
        })
×
252

253
        // evict oldest sessions beyond the limit
NEW
254
        toEvict := len(completed) - MaxCompletedSessions
×
NEW
255
        for i := range toEvict {
×
NEW
256
                session := completed[i]
×
NEW
257
                session.Close()
×
NEW
258
                delete(m.sessions, session.ID)
×
NEW
259
        }
×
260
}
261

262
// StartTailingActive starts tailing for all active sessions.
263
// for each active session not already tailing, starts tailing from the beginning
264
// to populate the buffer with existing content.
265
func (m *SessionManager) StartTailingActive() {
7✔
266
        m.mu.RLock()
7✔
267
        sessions := make([]*Session, 0, len(m.sessions))
7✔
268
        for _, s := range m.sessions {
8✔
269
                sessions = append(sessions, s)
1✔
270
        }
1✔
271
        m.mu.RUnlock()
7✔
272

7✔
273
        for _, session := range sessions {
8✔
274
                if session.GetState() == SessionStateActive && !session.IsTailing() {
1✔
NEW
275
                        _ = session.StartTailing(true) // read from beginning to populate buffer
×
NEW
276
                }
×
277
        }
278
}
279

280
// RefreshStates checks all sessions for state changes (active->completed).
281
// stops tailing for sessions that have completed.
282
func (m *SessionManager) RefreshStates() {
2✔
283
        m.mu.RLock()
2✔
284
        sessions := make([]*Session, 0, len(m.sessions))
2✔
285
        for _, s := range m.sessions {
4✔
286
                sessions = append(sessions, s)
2✔
287
        }
2✔
288
        m.mu.RUnlock()
2✔
289

2✔
290
        for _, session := range sessions {
4✔
291
                // only check sessions that are currently tailing
2✔
292
                if !session.IsTailing() {
3✔
293
                        continue
1✔
294
                }
295

296
                // check if session is still active
297
                active, err := IsActive(session.Path)
1✔
298
                if err != nil {
1✔
NEW
299
                        continue
×
300
                }
301

302
                if !active {
2✔
303
                        // session completed, update state and stop tailing
1✔
304
                        session.SetState(SessionStateCompleted)
1✔
305
                        session.StopTailing()
1✔
306
                }
1✔
307
        }
308
}
309

310
// sessionIDFromPath derives a session ID from the progress file path.
311
// the ID includes the filename (without the "progress-" prefix and ".txt" suffix)
312
// plus an FNV-64a hash of the canonical absolute path to avoid collisions across directories.
313
//
314
// format: <plan-name>-<16-char-hex-hash>
315
// example: "/tmp/progress-my-plan.txt" -> "my-plan-a1b2c3d4e5f67890"
316
//
317
// the hash ensures uniqueness when the same plan name exists in different directories.
318
// the path is canonicalized (absolute + cleaned) before hashing for stability.
319
func sessionIDFromPath(path string) string {
49✔
320
        base := filepath.Base(path)
49✔
321
        id := strings.TrimPrefix(base, "progress-")
49✔
322
        id = strings.TrimSuffix(id, ".txt")
49✔
323

49✔
324
        canonical := path
49✔
325
        if abs, err := filepath.Abs(path); err == nil {
98✔
326
                canonical = abs
49✔
327
        }
49✔
328
        canonical = filepath.Clean(canonical)
49✔
329

49✔
330
        hasher := fnv.New64a()
49✔
331
        _, _ = hasher.Write([]byte(canonical))
49✔
332
        return fmt.Sprintf("%s-%016x", id, hasher.Sum64())
49✔
333
}
334

335
// IsActive checks if a progress file is locked by another process or the current one.
336
// returns true if the file is locked (session is running), false otherwise.
337
// uses flock with LOCK_EX|LOCK_NB to test without blocking.
338
func IsActive(path string) (bool, error) {
29✔
339
        if progress.IsPathLockedByCurrentProcess(path) {
30✔
340
                return true, nil
1✔
341
        }
1✔
342

343
        f, err := os.Open(path) //nolint:gosec // path from user-controlled glob pattern, acceptable for session discovery
28✔
344
        if err != nil {
29✔
345
                return false, fmt.Errorf("open file: %w", err)
1✔
346
        }
1✔
347
        defer f.Close()
27✔
348

27✔
349
        // try to acquire exclusive lock non-blocking
27✔
350
        err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
27✔
351
        if err != nil {
27✔
NEW
352
                // EWOULDBLOCK means file is locked by another process
×
NEW
353
                if errors.Is(err, syscall.EWOULDBLOCK) {
×
NEW
354
                        return true, nil
×
NEW
355
                }
×
NEW
356
                return false, fmt.Errorf("flock: %w", err)
×
357
        }
358

359
        // we got the lock, release it immediately
360
        _ = syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
27✔
361
        return false, nil
27✔
362
}
363

364
// ParseProgressHeader reads the header section of a progress file and extracts metadata.
365
// the header format is:
366
//
367
//        # Ralphex Progress Log
368
//        Plan: path/to/plan.md
369
//        Branch: feature-branch
370
//        Mode: full
371
//        Started: 2026-01-22 10:30:00
372
//        ------------------------------------------------------------
373
func ParseProgressHeader(path string) (SessionMetadata, error) {
29✔
374
        f, err := os.Open(path) //nolint:gosec // path from user-controlled glob pattern, acceptable for session discovery
29✔
375
        if err != nil {
30✔
376
                return SessionMetadata{}, fmt.Errorf("open file: %w", err)
1✔
377
        }
1✔
378
        defer f.Close()
28✔
379

28✔
380
        var meta SessionMetadata
28✔
381
        scanner := bufio.NewScanner(f)
28✔
382

28✔
383
        for scanner.Scan() {
192✔
384
                line := scanner.Text()
164✔
385

164✔
386
                // stop at separator line
164✔
387
                if strings.HasPrefix(line, "---") {
192✔
388
                        break
28✔
389
                }
390

391
                // parse key-value pairs using CutPrefix for efficiency
392
                switch {
136✔
393
                case strings.HasPrefix(line, "Plan: "):
26✔
394
                        if val, found := strings.CutPrefix(line, "Plan: "); found {
52✔
395
                                meta.PlanPath = val
26✔
396
                        }
26✔
397
                case strings.HasPrefix(line, "Branch: "):
28✔
398
                        if val, found := strings.CutPrefix(line, "Branch: "); found {
56✔
399
                                meta.Branch = val
28✔
400
                        }
28✔
401
                case strings.HasPrefix(line, "Mode: "):
27✔
402
                        if val, found := strings.CutPrefix(line, "Mode: "); found {
54✔
403
                                meta.Mode = val
27✔
404
                        }
27✔
405
                case strings.HasPrefix(line, "Started: "):
27✔
406
                        if val, found := strings.CutPrefix(line, "Started: "); found {
54✔
407
                                t, err := time.Parse("2006-01-02 15:04:05", val)
27✔
408
                                if err == nil {
54✔
409
                                        meta.StartTime = t
27✔
410
                                }
27✔
411
                        }
412
                }
413
        }
414

415
        if err := scanner.Err(); err != nil {
28✔
NEW
416
                return SessionMetadata{}, fmt.Errorf("scan file: %w", err)
×
NEW
417
        }
×
418

419
        return meta, nil
28✔
420
}
421

422
// loadProgressFileIntoBuffer reads a progress file and populates the buffer with events.
423
// used for completed sessions that were discovered after they finished.
424
// errors are silently ignored since this is best-effort loading.
425
func loadProgressFileIntoBuffer(path string, buffer *Buffer) {
28✔
426
        f, err := os.Open(path) //nolint:gosec // path from user-controlled glob pattern, acceptable for session discovery
28✔
427
        if err != nil {
29✔
428
                return
1✔
429
        }
1✔
430
        defer f.Close()
27✔
431

27✔
432
        scanner := bufio.NewScanner(f)
27✔
433
        inHeader := true
27✔
434
        phase := processor.PhaseTask
27✔
435

27✔
436
        for scanner.Scan() {
215✔
437
                line := scanner.Text()
188✔
438

188✔
439
                // skip empty lines
188✔
440
                if line == "" {
203✔
441
                        continue
15✔
442
                }
443

444
                // check for header separator (line of dashes without spaces)
445
                if strings.HasPrefix(line, "---") && strings.Count(line, "-") > 20 && !strings.Contains(line, " ") {
200✔
446
                        inHeader = false
27✔
447
                        continue
27✔
448
                }
449

450
                // skip header lines
451
                if inHeader {
280✔
452
                        continue
134✔
453
                }
454

455
                // check for section header (--- section name ---)
456
                if matches := sectionRegex.FindStringSubmatch(line); matches != nil {
15✔
457
                        sectionName := matches[1]
3✔
458
                        phase = phaseFromSection(sectionName)
3✔
459
                        buffer.Add(Event{
3✔
460
                                Type:      EventTypeSection,
3✔
461
                                Phase:     phase,
3✔
462
                                Section:   sectionName,
3✔
463
                                Text:      sectionName,
3✔
464
                                Timestamp: time.Now(),
3✔
465
                        })
3✔
466
                        continue
3✔
467
                }
468

469
                // check for timestamped line
470
                if matches := timestampRegex.FindStringSubmatch(line); matches != nil {
17✔
471
                        text := matches[2]
8✔
472

8✔
473
                        // parse timestamp
8✔
474
                        ts, err := time.Parse("06-01-02 15:04:05", matches[1])
8✔
475
                        if err != nil {
8✔
NEW
476
                                ts = time.Now()
×
NEW
477
                        }
×
478

479
                        eventType := detectEventType(text)
8✔
480
                        event := Event{
8✔
481
                                Type:      eventType,
8✔
482
                                Phase:     phase,
8✔
483
                                Text:      text,
8✔
484
                                Timestamp: ts,
8✔
485
                        }
8✔
486

8✔
487
                        if sig := extractSignalFromText(text); sig != "" {
10✔
488
                                event.Signal = sig
2✔
489
                                event.Type = EventTypeSignal
2✔
490
                        }
2✔
491

492
                        buffer.Add(event)
8✔
493
                        continue
8✔
494
                }
495

496
                // plain line (no timestamp)
497
                buffer.Add(Event{
1✔
498
                        Type:      EventTypeOutput,
1✔
499
                        Phase:     phase,
1✔
500
                        Text:      line,
1✔
501
                        Timestamp: time.Now(),
1✔
502
                })
1✔
503
        }
504
}
505

506
// phaseFromSection determines the phase from a section name.
507
func phaseFromSection(name string) processor.Phase {
10✔
508
        nameLower := strings.ToLower(name)
10✔
509
        switch {
10✔
510
        case strings.Contains(nameLower, "task"):
5✔
511
                return processor.PhaseTask
5✔
512
        case strings.Contains(nameLower, "review"):
2✔
513
                return processor.PhaseReview
2✔
514
        case strings.Contains(nameLower, "codex"):
1✔
515
                return processor.PhaseCodex
1✔
516
        case strings.Contains(nameLower, "claude-eval") || strings.Contains(nameLower, "claude eval"):
2✔
517
                return processor.PhaseClaudeEval
2✔
NEW
518
        default:
×
NEW
519
                return processor.PhaseTask
×
520
        }
521
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc