• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

umputun / ralphex / 26409766960

25 May 2026 04:15PM UTC coverage: 83.319% (-0.01%) from 83.33%
26409766960

Pull #362

github

umputun
fix(config): match Claude Code "session limit" wording in default patterns

Claude Code emits "You've hit your session limit · resets …" which does not
substring-match the existing "You've hit your limit" default — "session" sits
between "your" and "limit", so the limit detection misses it. Result: without
--wait, ralphex exited with a raw runner error; with --wait, the retry loop
never engaged.

Add "You've hit your session limit" to both claude_error_patterns and
claude_limit_patterns defaults so --wait retries through the reset window
and users without --wait still get a graceful exit (follows the #317
precedent for new Anthropic-wording additions).

Related to #361
Pull Request #362: fix(config): detect Claude session-limit message

7497 of 8998 relevant lines covered (83.32%)

231.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.75
/pkg/web/session_manager.go
1
package web
2

3
import (
4
        "fmt"
5
        "hash/fnv"
6
        "log"
7
        "os"
8
        "path/filepath"
9
        "sort"
10
        "strings"
11
        "sync"
12

13
        "github.com/umputun/ralphex/pkg/progress"
14
)
15

16
// MaxCompletedSessions is the maximum number of completed sessions to retain.
17
// active sessions are never evicted. oldest completed sessions are removed
18
// when this limit is exceeded to prevent unbounded memory growth.
19
const MaxCompletedSessions = 100
20

21
// SessionManager maintains a registry of all discovered sessions.
22
// it handles discovery of progress files, state detection via flock,
23
// and provides access to sessions by ID.
24
// completed sessions are automatically evicted when MaxCompletedSessions is exceeded.
25
type SessionManager struct {
26
        mu       sync.RWMutex
27
        sessions map[string]*Session // keyed by session ID
28
}
29

30
// NewSessionManager creates a new session manager with an empty registry.
31
func NewSessionManager() *SessionManager {
78✔
32
        return &SessionManager{
78✔
33
                sessions: make(map[string]*Session),
78✔
34
        }
78✔
35
}
78✔
36

37
// Discover scans a directory for progress files matching progress-*.txt pattern.
38
// for each file found, it creates or updates a session in the registry.
39
// returns the list of discovered session IDs.
40
func (m *SessionManager) Discover(dir string) ([]string, error) {
40✔
41
        pattern := filepath.Join(dir, "progress-*.txt")
40✔
42
        matches, err := filepath.Glob(pattern)
40✔
43
        if err != nil {
40✔
44
                return nil, fmt.Errorf("glob progress files: %w", err)
×
45
        }
×
46

47
        ids := make([]string, 0, len(matches))
40✔
48
        for _, path := range matches {
194✔
49
                id := sessionIDFromPath(path)
154✔
50
                ids = append(ids, id)
154✔
51

154✔
52
                // check if session already exists
154✔
53
                m.mu.RLock()
154✔
54
                existing := m.sessions[id]
154✔
55
                m.mu.RUnlock()
154✔
56

154✔
57
                if existing != nil {
165✔
58
                        // update existing session state
11✔
59
                        if err := m.updateSession(existing); err != nil {
11✔
60
                                log.Printf("[WARN] failed to update session %s: %v", id, err)
×
61
                                continue
×
62
                        }
63
                } else {
143✔
64
                        // create new session
143✔
65
                        session := NewSession(id, path)
143✔
66
                        if err := m.updateSession(session); err != nil {
144✔
67
                                log.Printf("[WARN] failed to create session %s: %v", id, err)
1✔
68
                                continue
1✔
69
                        }
70
                        m.mu.Lock()
142✔
71
                        m.sessions[id] = session
142✔
72
                        m.evictOldCompleted()
142✔
73
                        m.mu.Unlock()
142✔
74
                }
75
        }
76

77
        return ids, nil
40✔
78
}
79

80
// DiscoverRecursive walks a directory tree and discovers all progress files.
81
// unlike Discover, this searches subdirectories recursively.
82
// returns the list of all discovered session IDs (deduplicated).
83
func (m *SessionManager) DiscoverRecursive(root string) ([]string, error) {
24✔
84
        seenDirs := make(map[string]bool)
24✔
85
        seenIDs := make(map[string]bool)
24✔
86
        var allIDs []string
24✔
87

24✔
88
        err := filepath.WalkDir(root, func(path string, d os.DirEntry, err error) error {
71✔
89
                if err != nil {
48✔
90
                        // skip directories that can't be accessed
1✔
91
                        if d != nil && d.IsDir() {
1✔
92
                                return filepath.SkipDir
×
93
                        }
×
94
                        return nil
1✔
95
                }
96

97
                // skip directories that typically contain many subdirs and no progress files
98
                if d.IsDir() && skipDirs[d.Name()] && path != root {
52✔
99
                        return filepath.SkipDir
6✔
100
                }
6✔
101

102
                // skip non-progress files
103
                if d.IsDir() || !isProgressFile(path) {
70✔
104
                        return nil
30✔
105
                }
30✔
106

107
                // only call Discover once per directory
108
                dir := filepath.Dir(path)
10✔
109
                if seenDirs[dir] {
12✔
110
                        return nil
2✔
111
                }
2✔
112
                seenDirs[dir] = true
8✔
113

8✔
114
                ids, discoverErr := m.Discover(dir)
8✔
115
                if discoverErr != nil {
8✔
116
                        return nil //nolint:nilerr // best-effort discovery, errors for individual directories are ignored
×
117
                }
×
118

119
                for _, id := range ids {
18✔
120
                        if !seenIDs[id] {
20✔
121
                                seenIDs[id] = true
10✔
122
                                allIDs = append(allIDs, id)
10✔
123
                        }
10✔
124
                }
125

126
                return nil
8✔
127
        })
128

129
        if err != nil {
24✔
130
                return allIDs, fmt.Errorf("walk directory %s: %w", root, err)
×
131
        }
×
132

133
        return allIDs, nil
24✔
134
}
135

136
// updateSession refreshes a session's state and metadata from its progress file.
137
// handles starting/stopping tailing based on state transitions.
138
//
139
// the header's Started: timestamp is compared against the previously stored
140
// metadata to detect a new ralphex run that reused the progress file (truncate
141
// + rewrite). when a restart is detected, per-run state (lastOffset, loaded,
142
// phase, pending section, diff stats) is reset before the state-transition
143
// path runs, so the subsequent loader / tailer reads the fresh file from byte
144
// 0 rather than seeking to the stale offset from the previous run. this
145
// closes the truncation race where StartFromOffset's `offset > fileSize` check
146
// misses the case where the new run has already grown past the old offset.
147
//
148
// the restart check and stored-metadata update are both gated on the header
149
// being complete (terminating separator observed). a mid-write read can return
150
// incomplete metadata with a zero StartTime; overwriting the stored value with
151
// that zero would erase the previous run's StartTime and defeat the restart
152
// detection on a later event when the full header is finally visible.
153
func (m *SessionManager) updateSession(session *Session) error {
165✔
154
        // parse header first so we can detect a new ralphex run that reused this
165✔
155
        // progress file. a changed "Started:" timestamp means the file was
165✔
156
        // truncated and re-initialized: the stored lastOffset is from the
165✔
157
        // previous run and no longer corresponds to current content.
165✔
158
        meta, headerComplete, err := ParseProgressHeader(session.Path)
165✔
159
        if err != nil {
166✔
160
                return fmt.Errorf("parse header: %w", err)
1✔
161
        }
1✔
162
        if headerComplete {
326✔
163
                oldMeta := session.GetMetadata()
162✔
164
                if !oldMeta.StartTime.IsZero() && !meta.StartTime.IsZero() &&
162✔
165
                        !oldMeta.StartTime.Equal(meta.StartTime) {
164✔
166
                        // new run on same file: stop any ongoing tailer (captures no useful
2✔
167
                        // offset since the file was replaced), then reset per-run state so
2✔
168
                        // handleStateTransition / MarkLoadedIfNot pick up the fresh content
2✔
169
                        // from byte 0 regardless of how far the new run has grown.
2✔
170
                        if session.IsTailing() {
2✔
171
                                session.StopTailing()
×
172
                        }
×
173
                        session.resetForNewRun()
2✔
174
                }
175
                session.SetMetadata(meta)
162✔
176
        }
177

178
        prevState := session.GetState()
164✔
179

164✔
180
        // check if file is locked (active session)
164✔
181
        active, err := IsActive(session.Path)
164✔
182
        if err != nil {
164✔
183
                return fmt.Errorf("check active state: %w", err)
×
184
        }
×
185

186
        newState := SessionStateCompleted
164✔
187
        if active {
167✔
188
                newState = SessionStateActive
3✔
189
        }
3✔
190
        session.SetState(newState)
164✔
191

164✔
192
        // handle state transitions for tailing
164✔
193
        m.handleStateTransition(session, prevState, newState)
164✔
194

164✔
195
        // for completed sessions that haven't been loaded yet, load the file content once.
164✔
196
        // this handles sessions discovered after they finished.
164✔
197
        // MarkLoadedIfNot is atomic to prevent double-loading from concurrent goroutines.
164✔
198
        // the lastOffset==0 guard prevents re-reading content a previous tailer already
164✔
199
        // ingested: an active session tailed from the start can be marked completed by a
164✔
200
        // RefreshStates flock race that captures the tailer's offset into lastOffset; if
164✔
201
        // IsActive races false a second time here, newState stays completed and the
164✔
202
        // loader would otherwise re-emit events the tailer has already published. we
164✔
203
        // still mark the session loaded in that case so the watcher's IsLoaded gate
164✔
204
        // allows Reactivate to resume tailing from the captured offset.
164✔
205
        //
164✔
206
        // gated on headerComplete so that a mid-write discovery (e.g. Windows, where
164✔
207
        // IsActive is always false, or a rare Unix race where IsActive momentarily
164✔
208
        // returns false) does not mark the session loaded and record a lastOffset
164✔
209
        // pointing inside an unfinished header. without this gate, a later Reactivate
164✔
210
        // would resume from mid-header and emit the remaining header lines as output
164✔
211
        // events. the loader will run on a later updateSession call once the header
164✔
212
        // separator is visible.
164✔
213
        if newState == SessionStateCompleted && headerComplete && session.MarkLoadedIfNot() {
313✔
214
                if session.getLastOffset() == 0 {
297✔
215
                        m.loadProgressFileIntoSession(session.Path, session)
148✔
216
                }
148✔
217
        }
218

219
        // update last modified time
220
        info, err := os.Stat(session.Path)
164✔
221
        if err != nil {
164✔
222
                return fmt.Errorf("stat file: %w", err)
×
223
        }
×
224
        session.SetLastModified(info.ModTime())
164✔
225

164✔
226
        return nil
164✔
227
}
228

229
// handleStateTransition starts or stops tailing for a session whose state just
230
// changed. when a session becomes active with content already ingested
231
// (lastOffset > 0), resumption goes through Reactivate so the SSE replay buffer
232
// is not filled with duplicates; fresh-active sessions read from the beginning.
233
// completed→no-op and active→active transitions do nothing.
234
func (m *SessionManager) handleStateTransition(session *Session, prevState, newState SessionState) {
164✔
235
        if prevState == newState {
323✔
236
                return
159✔
237
        }
159✔
238
        switch {
5✔
239
        case newState == SessionStateActive && !session.IsTailing():
1✔
240
                m.activateSession(session)
1✔
241
        case newState == SessionStateCompleted && session.IsTailing():
4✔
242
                session.StopTailing()
4✔
243
        }
244
}
245

246
// activateSession starts tailing for a session that just became active.
247
// chooses between Reactivate (resume from stored offset) and StartTailing(true)
248
// (read from the beginning) based on whether content has already been ingested.
249
func (m *SessionManager) activateSession(session *Session) {
2✔
250
        if session.getLastOffset() > 0 {
4✔
251
                // content already in SSE replay (loader ran previously, or a
2✔
252
                // previous tailer captured an offset before StopTailing). resume
2✔
253
                // from the stored offset to avoid re-emitting events. this covers
2✔
254
                // the flock-race recovery path: RefreshStates falsely marks a live
2✔
255
                // session completed and captures the tailer offset; a later Write
2✔
256
                // event triggers this transition back to active.
2✔
257
                if err := session.Reactivate(); err != nil {
2✔
258
                        log.Printf("[WARN] failed to reactivate session %s: %v", session.ID, err)
×
259
                }
×
260
                return
2✔
261
        }
262
        // fresh discovery of an active session, read from the beginning
263
        if err := session.StartTailing(true); err != nil {
×
264
                log.Printf("[WARN] failed to start tailing for session %s: %v", session.ID, err)
×
265
        }
×
266
}
267

268
// Get returns a session by ID, or nil if not found.
269
func (m *SessionManager) Get(id string) *Session {
75✔
270
        m.mu.RLock()
75✔
271
        defer m.mu.RUnlock()
75✔
272
        return m.sessions[id]
75✔
273
}
75✔
274

275
// All returns all sessions in the registry.
276
func (m *SessionManager) All() []*Session {
8✔
277
        m.mu.RLock()
8✔
278
        defer m.mu.RUnlock()
8✔
279

8✔
280
        result := make([]*Session, 0, len(m.sessions))
8✔
281
        for _, s := range m.sessions {
118✔
282
                result = append(result, s)
110✔
283
        }
110✔
284
        return result
8✔
285
}
286

287
// Remove removes a session from the registry and closes its resources.
288
func (m *SessionManager) Remove(id string) {
2✔
289
        m.mu.Lock()
2✔
290
        defer m.mu.Unlock()
2✔
291

2✔
292
        if session, ok := m.sessions[id]; ok {
4✔
293
                session.Close()
2✔
294
                delete(m.sessions, id)
2✔
295
        }
2✔
296
}
297

298
// Register adds an externally-created session to the manager.
299
// This is used when a session is created for live execution (BroadcastLogger)
300
// and needs to be visible in the multi-session dashboard.
301
// The session's ID is derived from its path using sessionIDFromPath.
302
func (m *SessionManager) Register(session *Session) {
14✔
303
        id := sessionIDFromPath(session.Path)
14✔
304
        session.ID = id // ensure ID matches what SessionManager expects
14✔
305

14✔
306
        m.mu.Lock()
14✔
307
        defer m.mu.Unlock()
14✔
308

14✔
309
        // don't overwrite existing session
14✔
310
        if _, exists := m.sessions[id]; exists {
15✔
311
                return
1✔
312
        }
1✔
313

314
        m.sessions[id] = session
13✔
315
}
316

317
// Close closes all sessions and clears the registry.
318
func (m *SessionManager) Close() {
39✔
319
        m.mu.Lock()
39✔
320
        defer m.mu.Unlock()
39✔
321

39✔
322
        for _, session := range m.sessions {
61✔
323
                session.Close()
22✔
324
        }
22✔
325
        m.sessions = make(map[string]*Session)
39✔
326
}
327

328
// evictOldCompleted removes oldest completed sessions when count exceeds MaxCompletedSessions.
329
// active sessions are never evicted. must be called with lock held.
330
func (m *SessionManager) evictOldCompleted() {
142✔
331
        // count completed sessions
142✔
332
        var completed []*Session
142✔
333
        for _, s := range m.sessions {
5,750✔
334
                if s.GetState() == SessionStateCompleted {
11,216✔
335
                        completed = append(completed, s)
5,608✔
336
                }
5,608✔
337
        }
338

339
        if len(completed) <= MaxCompletedSessions {
279✔
340
                return
137✔
341
        }
137✔
342

343
        // sort by start time (oldest first)
344
        sort.Slice(completed, func(i, j int) bool {
3,306✔
345
                ti := completed[i].GetMetadata().StartTime
3,301✔
346
                tj := completed[j].GetMetadata().StartTime
3,301✔
347
                return ti.Before(tj)
3,301✔
348
        })
3,301✔
349

350
        // evict oldest sessions beyond the limit
351
        toEvict := len(completed) - MaxCompletedSessions
5✔
352
        for i := range toEvict {
10✔
353
                session := completed[i]
5✔
354
                session.Close()
5✔
355
                delete(m.sessions, session.ID)
5✔
356
        }
5✔
357
}
358

359
// StartTailingActive starts tailing for all active sessions.
360
// for each active session not already tailing, starts tailing from the beginning
361
// to populate the buffer with existing content.
362
func (m *SessionManager) StartTailingActive() {
22✔
363
        m.mu.RLock()
22✔
364
        sessions := make([]*Session, 0, len(m.sessions))
22✔
365
        for _, s := range m.sessions {
29✔
366
                sessions = append(sessions, s)
7✔
367
        }
7✔
368
        m.mu.RUnlock()
22✔
369

22✔
370
        for _, session := range sessions {
29✔
371
                if session.GetState() == SessionStateActive && !session.IsTailing() {
7✔
372
                        if err := session.StartTailing(true); err != nil { // read from beginning to populate buffer
×
373
                                log.Printf("[WARN] failed to start tailing for session %s: %v", session.ID, err)
×
374
                        }
×
375
                }
376
        }
377
}
378

379
// RefreshStates checks all sessions for state changes (active->completed).
380
// stops tailing for sessions that have completed.
381
func (m *SessionManager) RefreshStates() {
2✔
382
        m.mu.RLock()
2✔
383
        sessions := make([]*Session, 0, len(m.sessions))
2✔
384
        for _, s := range m.sessions {
4✔
385
                sessions = append(sessions, s)
2✔
386
        }
2✔
387
        m.mu.RUnlock()
2✔
388

2✔
389
        for _, session := range sessions {
4✔
390
                // only check sessions that are currently tailing
2✔
391
                if !session.IsTailing() {
3✔
392
                        continue
1✔
393
                }
394

395
                // check if session is still active
396
                active, err := IsActive(session.Path)
1✔
397
                if err != nil {
1✔
398
                        continue
×
399
                }
400

401
                if !active {
2✔
402
                        // session completed, update state and stop tailing
1✔
403
                        session.SetState(SessionStateCompleted)
1✔
404
                        session.StopTailing()
1✔
405
                }
1✔
406
        }
407
}
408

409
// sessionIDFromPath derives a session ID from the progress file path.
410
// the ID includes the filename (without the "progress-" prefix and ".txt" suffix)
411
// plus an FNV-64a hash of the canonical absolute path to avoid collisions across directories.
412
//
413
// format: <plan-name>-<16-char-hex-hash>
414
// example: "/tmp/progress-my-plan.txt" -> "my-plan-a1b2c3d4e5f67890"
415
//
416
// the hash ensures uniqueness when the same plan name exists in different directories.
417
// the path is canonicalized (absolute + cleaned) before hashing for stability.
418
func sessionIDFromPath(path string) string {
233✔
419
        base := filepath.Base(path)
233✔
420
        id := strings.TrimPrefix(base, "progress-")
233✔
421
        id = strings.TrimSuffix(id, ".txt")
233✔
422

233✔
423
        canonical := path
233✔
424
        if abs, err := filepath.Abs(path); err == nil {
466✔
425
                canonical = abs
233✔
426
        }
233✔
427
        canonical = filepath.Clean(canonical)
233✔
428

233✔
429
        hasher := fnv.New64a()
233✔
430
        _, _ = hasher.Write([]byte(canonical))
233✔
431
        return fmt.Sprintf("%s-%016x", id, hasher.Sum64())
233✔
432
}
433

434
// IsActive checks if a progress file is locked by another process or the current one.
435
// returns true if the file is locked (session is running), false otherwise.
436
// uses flock with LOCK_EX|LOCK_NB to test without blocking.
437
func IsActive(path string) (bool, error) {
170✔
438
        if progress.IsPathLockedByCurrentProcess(path) {
173✔
439
                return true, nil
3✔
440
        }
3✔
441

442
        f, err := os.Open(path) //nolint:gosec // path from user-controlled glob pattern, acceptable for session discovery
167✔
443
        if err != nil {
168✔
444
                return false, fmt.Errorf("open file: %w", err)
1✔
445
        }
1✔
446
        defer f.Close()
166✔
447

166✔
448
        // try to acquire exclusive lock non-blocking
166✔
449
        gotLock, err := progress.TryLockFile(f)
166✔
450
        if err != nil {
166✔
451
                return false, fmt.Errorf("flock: %w", err)
×
452
        }
×
453

454
        // if we got the lock, file is not active
455
        // if we didn't get the lock, file is locked by another process (active)
456
        return !gotLock, nil
166✔
457
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc