• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

umputun / ralphex / 26063630696

18 May 2026 10:10PM UTC coverage: 83.136% (+0.2%) from 82.902%
26063630696

Pull #350

github

umputun
fix(progress): hold writeMu across each public method's full write sequence

Addresses Copilot review on commit 960bbcf (pkg/progress/progress.go:642):
the previous mutex implementation acquired and released the lock inside each
writeFile / writeStdout call independently. Public methods doing back-to-back
writeFile + writeStdout (Print, PrintRaw, PrintSection, PrintAligned,
LogQuestion, LogDraftReview, Close) could therefore have a concurrent producer
slip between the two sinks — the file gets producer A's line while stdout
shows producer B's, or LogQuestion's QUESTION line gets separated from its
companion OPTIONS line on either sink. The doc comments overstated the
guarantee.

Fix: rename writeFile / writeStdout to writeFileLocked / writeStdoutLocked
(require l.writeMu to be held by the caller; the inner methods just do the
fmt.Fprintf). Each public method that emits a coherent log entry now takes
the mutex once around its entire writeFile + writeStdout sequence so the
two sinks stay in step under concurrent producers (codex stderr processor +
rollout-file tailer both call OutputHandler -> Print on the same Logger).

Methods adjusted:
- writeTimestamped, PrintRaw, PrintSection — 2-write file+stdout pairs
- PrintAligned (per-line loop) — file+stdout pair per displayed line
- LogQuestion — 4-write QUESTION/OPTIONS sequence kept atomic
- LogDraftReview — DRAFT REVIEW + optional FEEDBACK pair
- LogDiffStats — single file write, but still takes the mutex so a producer
  holding it for a pair sequence is not interrupted by this method
- Close — separator + Completed/Failed footer kept atomic

NewLogger setup writes (header) intentionally not wrapped — they run before
any concurrent producer exists. Tests pass under -race; linter clean.
Pull Request #350: Add first-class codex executor mode

599 of 667 new or added lines in 9 files covered. (89.81%)

15 existing lines in 5 files now uncovered.

7385 of 8883 relevant lines covered (83.14%)

231.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.01
/pkg/executor/codex.go
1
package executor
2

3
import (
4
        "bytes"
5
        "context"
6
        "encoding/json"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "log"
11
        "os"
12
        "os/exec"
13
        "path/filepath"
14
        "regexp"
15
        "strings"
16
        "sync/atomic"
17
        "time"
18
)
19

20
// CodexStreams bundles the two output streams of a codex command: stderr
// and stdout.
type CodexStreams struct {
	// Stderr is the reader connected to the command's stderr.
	Stderr io.Reader
	// Stdout is the reader connected to the command's stdout.
	Stdout io.Reader
}
25

26
// CodexRunner abstracts command execution for codex.
27
// Returns both stderr (streaming progress) and stdout (final response).
28
type CodexRunner interface {
29
        Run(ctx context.Context, name string, args ...string) (streams CodexStreams, wait func() error, err error)
30
}
31

32
// execCodexRunner is the default CodexRunner, backed by os/exec.
// codex writes streaming progress to stderr and the final response to stdout.
//
// stdin, when non-nil, is wired to the child process's stdin; it carries the
// prompt through a pipe instead of a CLI argument, which sidesteps the Windows
// 8191-char command-line limit.
//
// stripAnthropicKey limits ANTHROPIC_API_KEY filtering to first-class --codex
// runs. External codex review in default claude mode keeps the host env intact
// so custom codex wrappers that proxy through Anthropic (e.g.,
// scripts/codex-as-claude.sh) keep authenticating. CLAUDECODE is stripped in
// every mode to prevent nested-session errors when codex is launched from
// inside a Claude Code session.
type execCodexRunner struct {
	stdin             io.Reader
	stripAnthropicKey bool
}
45

46
// childEnv builds the codex child-process env. CLAUDECODE is always stripped to
47
// prevent nested-session errors. ANTHROPIC_API_KEY is stripped only when the
48
// caller requested it (first-class --codex mode); default-claude external codex
49
// review passes the key through so custom Anthropic-proxying wrappers keep working.
50
func (r *execCodexRunner) childEnv(env []string) []string {
8✔
51
        if r.stripAnthropicKey {
11✔
52
                return filterEnv(env, "ANTHROPIC_API_KEY", "CLAUDECODE")
3✔
53
        }
3✔
54
        return filterEnv(env, "CLAUDECODE")
5✔
55
}
56

57
func (r *execCodexRunner) Run(ctx context.Context, name string, args ...string) (CodexStreams, func() error, error) {
3✔
58
        // check context before starting to avoid spawning a process that will be immediately killed
3✔
59
        if err := ctx.Err(); err != nil {
3✔
60
                return CodexStreams{}, nil, fmt.Errorf("context already canceled: %w", err)
×
61
        }
×
62

63
        // use exec.Command (not CommandContext) because we handle cancellation ourselves
64
        // to ensure the entire process group is killed, not just the direct child
65
        cmd := exec.Command(name, args...) //nolint:noctx // intentional: we handle context cancellation via process group kill
3✔
66

3✔
67
        cmd.Env = r.childEnv(os.Environ())
3✔
68

3✔
69
        // pass prompt via stdin when set (avoids Windows 8191-char command-line limit)
3✔
70
        if r.stdin != nil {
4✔
71
                cmd.Stdin = r.stdin
1✔
72
        }
1✔
73

74
        // create new process group so we can kill all descendants on cleanup
75
        setupProcessGroup(cmd)
3✔
76

3✔
77
        stderr, err := cmd.StderrPipe()
3✔
78
        if err != nil {
3✔
79
                return CodexStreams{}, nil, fmt.Errorf("stderr pipe: %w", err)
×
80
        }
×
81

82
        stdout, err := cmd.StdoutPipe()
3✔
83
        if err != nil {
3✔
84
                return CodexStreams{}, nil, fmt.Errorf("stdout pipe: %w", err)
×
85
        }
×
86

87
        if err := cmd.Start(); err != nil {
4✔
88
                return CodexStreams{}, nil, fmt.Errorf("start command: %w", err)
1✔
89
        }
1✔
90

91
        // setup process group cleanup with graceful shutdown on context cancellation
92
        cleanup := newProcessGroupCleanup(cmd, ctx.Done())
2✔
93

2✔
94
        return CodexStreams{Stderr: stderr, Stdout: stdout}, cleanup.Wait, nil
2✔
95
}
96

97
// CodexExecutor runs codex CLI commands and filters output.
98
type CodexExecutor struct {
99
        Command         string            // command to execute, defaults to "codex"
100
        Model           string            // model override; empty means inherit from ~/.codex/config.toml (no -c model= flag emitted)
101
        ReasoningEffort string            // reasoning effort override; empty means inherit from ~/.codex/config.toml
102
        TimeoutMs       int               // stream idle timeout in ms, defaults to 3600000
103
        Sandbox         string            // sandbox mode, defaults to "read-only"
104
        ProjectDoc      string            // path to project documentation file
105
        OutputHandler   func(text string) // called for each filtered output line in real-time
106
        Debug           bool              // enable debug output
107
        ErrorPatterns   []string          // patterns to detect in output (e.g., rate limit messages)
108
        LimitPatterns   []string          // patterns to detect rate limits (checked before error patterns)
109
        MultiAgent      bool              // enable codex multi_agent feature + reviewer agent registration; set to true on the review-phase codex instance built by processor.New() for first-class --codex mode
110
        headerEmitted   atomic.Bool       // tracks first invocation across Run() calls; false until first task/review then suppressed permanently — used to emit codex's resolved model/sandbox/effort once at the top of the run
111
        PassClaudeMd    bool              // pass project-level CLAUDE.md to codex via project_doc_fallback_filenames (set by processor.New() only when cfg.AppConfig.Executor == ExecutorCodex)
112
        IdleTimeout     time.Duration     // kill session after this duration of no output, zero = disabled
113
        runner          CodexRunner       // for testing, nil uses default
114
}
115

116
// CodexReviewerAgentName is the name under which the reviewer agent is
// registered with codex when features.multi_agent is enabled. It is shared
// with pkg/processor so the spawn_agent(agent=...) call in review prompts and
// the registration here cannot drift apart; if they do, codex silently fails
// to resolve the agent and the review phase breaks.
const CodexReviewerAgentName = "reviewer"
122

123
// codexReviewerDescription is the description registered for the reviewer
// agent when features.multi_agent is enabled. The agent's behavior comes from
// the task argument, so this text is kept generic and stable.
//
// MUST stay ASCII without backslashes, control characters, or non-printable
// bytes: the value is serialized with a %q verb, which emits Go string-literal
// escapes, and only the printable ASCII subset round-trips safely through
// TOML basic-string syntax.
const codexReviewerDescription = "general code review specialist; behavior driven by the task argument"
132

133
// configOverrides returns the -c key=value arg slice to splice into the codex CLI
134
// invocation based on the executor's MultiAgent and PassClaudeMd flags. All overrides
135
// are additive on top of the user's ~/.codex/config.toml.
136
func (e *CodexExecutor) configOverrides() []string {
57✔
137
        var args []string
57✔
138
        if e.MultiAgent {
61✔
139
                args = append(args,
4✔
140
                        "-c", "features.multi_agent=true",
4✔
141
                        "-c", fmt.Sprintf("agents.%s.description=%q", CodexReviewerAgentName, codexReviewerDescription),
4✔
142
                )
4✔
143
        }
4✔
144
        if e.PassClaudeMd {
61✔
145
                args = append(args, "-c", `project_doc_fallback_filenames=["CLAUDE.md"]`)
4✔
146
        }
4✔
147
        return args
57✔
148
}
149

150
// codexFilterState carries the per-stream state used while filtering codex
// stderr for display.
type codexFilterState struct {
	// headerCount is the number of "--------" separators seen so far (the
	// header block sits between the first two).
	headerCount int
	// seen records every line already shown, for deduplication.
	seen map[string]bool
	// firstRun, when true, whitelists the model/sandbox/effort lines of the
	// header block so the user sees codex's resolved config once at the top of
	// the run.
	firstRun bool
}
156

157
// Run executes codex CLI with the given prompt and returns filtered output.
158
// stderr is streamed line-by-line to OutputHandler for progress indication.
159
// stdout is captured entirely as the final response (returned in Result.Output).
160
func (e *CodexExecutor) Run(ctx context.Context, prompt string) Result {
52✔
161
        cmd := e.Command
52✔
162
        if cmd == "" {
103✔
163
                cmd = "codex"
51✔
164
        }
51✔
165

166
        timeoutMs := e.TimeoutMs
52✔
167
        if timeoutMs <= 0 {
103✔
168
                timeoutMs = 3600000
51✔
169
        }
51✔
170

171
        sandbox := e.Sandbox
52✔
172
        if sandbox == "" {
101✔
173
                sandbox = "read-only"
49✔
174
        }
49✔
175
        // disable sandbox in docker (landlock doesn't work in containers)
176
        if os.Getenv("RALPHEX_DOCKER") == "1" {
52✔
177
                sandbox = "danger-full-access"
×
178
        }
×
179

180
        args := []string{"exec"}
52✔
181
        args = append(args, e.configOverrides()...)
52✔
182
        // --dangerously-bypass-approvals-and-sandbox is required for unattended first-class
52✔
183
        // --codex runs (which use danger-full-access by default). External codex review in
52✔
184
        // claude mode worked on master without this flag and adding it would silently change
52✔
185
        // approval semantics for default-claude users (esp. Docker mode where the sandbox is
52✔
186
        // forced to danger-full-access); gate the flag on MultiAgent which is true only in
52✔
187
        // first-class --codex (set by processor.buildCodexExecutor).
52✔
188
        if sandbox == "danger-full-access" && e.MultiAgent {
53✔
189
                args = append(args, "--dangerously-bypass-approvals-and-sandbox")
1✔
190
        }
1✔
191
        args = append(args, "--sandbox", sandbox)
52✔
192
        // model and reasoning effort are emitted only when explicitly set in ralphex config,
52✔
193
        // so the user's ~/.codex/config.toml choice is preserved otherwise (matches the
52✔
194
        // "additive -c overrides" promise documented in CLAUDE.md / llms.txt).
52✔
195
        if e.Model != "" {
53✔
196
                args = append(args, "-c", fmt.Sprintf("model=%q", e.Model))
1✔
197
        }
1✔
198
        if e.ReasoningEffort != "" {
53✔
199
                args = append(args, "-c", "model_reasoning_effort="+e.ReasoningEffort)
1✔
200
        }
1✔
201
        args = append(args, "-c", fmt.Sprintf("stream_idle_timeout_ms=%d", timeoutMs))
52✔
202

52✔
203
        if e.ProjectDoc != "" {
53✔
204
                args = append(args, "-c", fmt.Sprintf("project_doc=%q", e.ProjectDoc))
1✔
205
        }
1✔
206

207
        // pass prompt via stdin to avoid Windows 8191-char command-line limit;
208
        // codex reads from stdin when no positional prompt argument is given.
209
        // MultiAgent signals first-class --codex (set by processor.buildCodexExecutor only;
210
        // external codex review built by buildExternalCodexExecutor leaves it false), so it
211
        // also gates ANTHROPIC_API_KEY stripping — default-claude external codex review
212
        // preserves the host env so wrappers proxying through Anthropic keep working.
213
        stdinReader := strings.NewReader(prompt)
52✔
214
        runner := e.runner
52✔
215
        if runner == nil {
52✔
NEW
216
                runner = &execCodexRunner{stdin: stdinReader, stripAnthropicKey: e.MultiAgent}
×
NEW
217
        }
×
218

219
        // set up idle timeout: derive a cancellable context that fires when no output
220
        // is received for IdleTimeout duration. the touch closure resets the timer on
221
        // each stderr line and on each stdout read; mirrors the ClaudeExecutor pattern.
222
        execCtx := ctx
52✔
223
        idleTouch := func() {} // no-op by default
576✔
224
        if e.IdleTimeout > 0 {
56✔
225
                var idleCancel context.CancelFunc
4✔
226
                execCtx, idleCancel = context.WithCancel(ctx)
4✔
227
                defer idleCancel()
4✔
228
                timer := time.AfterFunc(e.IdleTimeout, idleCancel)
4✔
229
                defer timer.Stop()
4✔
230
                idleTouch = func() { timer.Reset(e.IdleTimeout) }
17✔
231
        }
232

233
        streams, wait, err := runner.Run(execCtx, cmd, args...)
52✔
234
        if err != nil {
53✔
235
                return Result{Error: fmt.Errorf("start codex: %w", err)}
1✔
236
        }
1✔
237

238
        // process stderr for progress display (header block + bold summaries).
239
        // sessionIDCh receives the session id once stderr's header block surfaces
240
        // it; the tail goroutine below uses it to follow the rollout file.
241
        // firstRun is true exactly once across all Run() calls on this executor —
242
        // gives shouldDisplay license to leak codex's resolved model/sandbox/effort
243
        // once at the top of the run instead of repeating the full banner per phase.
244
        firstRun := e.headerEmitted.CompareAndSwap(false, true)
51✔
245
        sessionIDCh := make(chan string, 1)
51✔
246
        stderrDone := make(chan stderrResult, 1)
51✔
247
        go func() {
102✔
248
                stderrDone <- e.processStderr(execCtx, streams.Stderr, idleTouch, sessionIDCh, firstRun)
51✔
249
        }()
51✔
250

251
        tailCancel, tailDone := e.startRolloutTail(execCtx, sessionIDCh, idleTouch)
51✔
252

51✔
253
        // read stdout entirely as final response; wrap with touch-on-read so reads
51✔
254
        // keep the idle timer alive even while stderr is quiet.
51✔
255
        stdoutReader := streams.Stdout
51✔
256
        if e.IdleTimeout > 0 {
55✔
257
                stdoutReader = &touchReader{r: streams.Stdout, touch: idleTouch}
4✔
258
        }
4✔
259
        stdoutContent, stdoutErr := e.readStdout(stdoutReader)
51✔
260

51✔
261
        // wait for stderr processing to complete
51✔
262
        stderrRes := <-stderrDone
51✔
263

51✔
264
        // codex has exited; signal tailer to drain remaining file content and stop
51✔
265
        tailCancel()
51✔
266
        <-tailDone
51✔
267

51✔
268
        // wait for command completion
51✔
269
        waitErr := wait()
51✔
270

51✔
271
        // detect signal in stdout (the actual response)
51✔
272
        signal := detectSignal(stdoutContent)
51✔
273

51✔
274
        // idle timeout: derived context canceled but parent is alive — not an error.
51✔
275
        // mirrors the ClaudeExecutor idle-timeout completion path so callers see uniform behavior.
51✔
276
        if e.IdleTimeout > 0 && execCtx.Err() != nil && ctx.Err() == nil {
53✔
277
                e.logDroppedIdleErrors(stdoutErr, waitErr)
2✔
278
                return e.idleTimeoutResult(stdoutContent, signal, stderrRes)
2✔
279
        }
2✔
280

281
        finalErr := e.finalError(ctx, stderrRes, stdoutErr, waitErr)
49✔
282

49✔
283
        // only check error/limit patterns when the process failed (non-zero exit or stream error).
49✔
284
        // when codex exits cleanly, pattern matches in output are false positives from findings
49✔
285
        // (e.g., reviewing code that handles rate limits).
49✔
286
        // skip pattern checks on context cancellation — cancellation must propagate as-is.
49✔
287
        if finalErr != nil && ctx.Err() == nil {
71✔
288
                if patternErr := e.checkPatterns(stdoutContent, stderrRes); patternErr != nil {
37✔
289
                        return Result{Output: stdoutContent, Signal: signal, Error: patternErr}
15✔
290
                }
15✔
291
        }
292

293
        // return stdout content as the result (the actual answer from codex)
294
        return Result{Output: stdoutContent, Signal: signal, Error: finalErr}
34✔
295
}
296

297
// finalError reconciles stderr/stdout/wait errors into the single error returned
298
// from Run. stderr and stdout errors win over wait errors so callers see the
299
// root cause rather than the cascade exit code; ctx.Err() short-circuits to
300
// preserve cancellation semantics; non-zero exit with stderr tail produces a
301
// readable diagnostic that includes the last few stderr lines.
302
func (e *CodexExecutor) finalError(ctx context.Context, stderrRes stderrResult, stdoutErr, waitErr error) error {
49✔
303
        switch {
49✔
304
        case stderrRes.err != nil && !errors.Is(stderrRes.err, context.Canceled):
1✔
305
                return stderrRes.err
1✔
NEW
306
        case stdoutErr != nil:
×
NEW
307
                return stdoutErr
×
308
        case waitErr != nil:
24✔
309
                if ctx.Err() != nil {
27✔
310
                        return fmt.Errorf("context error: %w", ctx.Err())
3✔
311
                }
3✔
312
                if len(stderrRes.lastLines) > 0 {
31✔
313
                        return fmt.Errorf("codex exited with error: %w\nstderr: %s",
10✔
314
                                waitErr, strings.Join(stderrRes.lastLines, "\n"))
10✔
315
                }
10✔
316
                return fmt.Errorf("codex exited with error: %w", waitErr)
11✔
317
        }
318
        return nil
24✔
319
}
320

321
// touchReader wraps an io.Reader and invokes touch after every Read that
// returned at least one byte. It is used to keep the idle-timeout timer alive
// while stdout is being drained.
type touchReader struct {
	r     io.Reader // underlying reader; must be non-nil
	touch func()    // callback fired on reads that produced data; nil disables it
}

// compile-time check that *touchReader satisfies io.Reader.
var _ io.Reader = (*touchReader)(nil)

// Read forwards to the wrapped reader and returns its byte count and error
// unchanged (io.EOF included). touch is called only when n > 0, so empty
// reads and pure-error reads do not count as activity.
func (t *touchReader) Read(p []byte) (int, error) {
	n, err := t.r.Read(p)
	if n > 0 && t.touch != nil {
		t.touch()
	}
	return n, err //nolint:wrapcheck // pass-through reader; preserve EOF and original error semantics
}
335

336
// logDroppedIdleErrors surfaces concurrent stream/wait errors that would otherwise
337
// be discarded by the idle-timeout completion path. operators need this to
338
// distinguish "agent went silent" from "stream broke" before retrying.
339
func (e *CodexExecutor) logDroppedIdleErrors(stdoutErr, waitErr error) {
2✔
340
        if stdoutErr != nil {
2✔
NEW
341
                log.Printf("codex idle timeout fired with concurrent stdout error: %v", stdoutErr)
×
NEW
342
        }
×
343
        if waitErr != nil {
4✔
344
                log.Printf("codex idle timeout fired with concurrent wait error: %v", waitErr)
2✔
345
        }
2✔
346
}
347

348
// idleTimeoutResult builds the Result returned when the idle-timeout timer
349
// canceled the derived execution context (parent ctx still alive). limit and
350
// error patterns are still checked across stdout and stderr so a wait-and-retry
351
// triggered by a real quota diagnostic survives idle-timeout cancellation;
352
// otherwise IdleTimedOut is set and the caller treats this as a soft kill.
353
func (e *CodexExecutor) idleTimeoutResult(stdoutContent, signal string, stderr stderrResult) Result {
2✔
354
        if patternErr := e.checkPatterns(stdoutContent, stderr); patternErr != nil {
3✔
355
                return Result{Output: stdoutContent, Signal: signal, Error: patternErr}
1✔
356
        }
1✔
357
        return Result{Output: stdoutContent, Signal: signal, IdleTimedOut: true}
1✔
358
}
359

360
// checkPatterns scans stdout AND the stderr matches captured live during streaming
361
// for limit/error patterns. codex emits OpenAI/ChatGPT plan-quota errors (e.g.,
362
// "ERROR: You've hit your usage limit") to stderr while stdout is empty on failure;
363
// processStderr matches each line on the fly so detection is not subject to the
364
// 5-line / 256-rune tail truncation used for human-readable error context.
365
//
366
// Priority is limit-first across both sources before any error match: a real
367
// stderr quota diagnostic (already filtered through the CLI-error prefix gate
368
// in processStderr) must not be downgraded to a non-retryable PatternMatchError
369
// just because partial stdout happens to match a configured ErrorPattern. Within
370
// each severity class, stdout wins over stderr so an explicit stdout limit/error
371
// takes precedence when both sources fire.
372
//
373
// Order:
374
//  1. stdout LimitPatterns
375
//  2. stderr.limitMatch (prefix-gated)
376
//  3. stdout ErrorPatterns
377
//  4. stderr.errorMatch (prefix-gated)
378
//
379
// returns LimitPatternError or PatternMatchError when a pattern matches; nil otherwise.
380
func (e *CodexExecutor) checkPatterns(stdoutContent string, stderr stderrResult) error {
24✔
381
        // limit-class first — across both sources
24✔
382
        if pattern := matchPattern(stdoutContent, e.LimitPatterns); pattern != "" {
28✔
383
                return &LimitPatternError{Pattern: pattern, HelpCmd: "codex /status"}
4✔
384
        }
4✔
385
        if stderr.limitMatch != "" {
26✔
386
                return &LimitPatternError{Pattern: stderr.limitMatch, HelpCmd: "codex /status"}
6✔
387
        }
6✔
388

389
        // error-class second
390
        if pattern := matchPattern(stdoutContent, e.ErrorPatterns); pattern != "" {
19✔
391
                return &PatternMatchError{Pattern: pattern, HelpCmd: "codex /status"}
5✔
392
        }
5✔
393
        if stderr.errorMatch != "" {
10✔
394
                return &PatternMatchError{Pattern: stderr.errorMatch, HelpCmd: "codex /status"}
1✔
395
        }
1✔
396

397
        return nil
8✔
398
}
399

400
// stderrResult is what stderr processing hands back: a short tail for error
// context, the first limit/error pattern hits, and any read error.
// limitMatch and errorMatch record the FIRST pattern that fired during
// streaming, taken from the untruncated, un-evicted line, so detection does
// not depend on the lastLines tail truncation (5 lines, 256 runes per line).
type stderrResult struct {
	// lastLines holds the last few stderr lines, kept for error context.
	lastLines []string
	// limitMatch is the first limit pattern matched on stderr (live scan).
	limitMatch string
	// errorMatch is the first error pattern matched on stderr (live scan).
	errorMatch string
	// err is the error from reading stderr, if any.
	err error
}
410

411
// processStderr reads stderr line-by-line, filters for progress display, and
412
// scans each line for configured limit/error patterns. shows header block
413
// (between first two "--------" separators) and bold summaries. captures last
414
// lines of unfiltered output for error reporting AND records the first
415
// limit/error pattern hit (untruncated, un-evicted) so callers can rely on it
416
// regardless of how much chatter follows. idleTouch is invoked for every
417
// stderr line so the idle-timeout timer is reset while codex is producing
418
// progress output; pass a no-op when idle timeout is disabled.
419
// when sessionIDCh is non-nil, the first detected "session id: <uuid>" line
420
// in the header block is written to it (non-blocking, buffered channel
421
// expected) so the caller can start tailing the rollout file in parallel.
422
// firstRun gates the one-time emission of codex's resolved model/sandbox/
423
// effort header lines so the user can see what codex actually picked from
424
// ~/.codex/config.toml; on subsequent invocations the header stays hidden.
425
func (e *CodexExecutor) processStderr(ctx context.Context, r io.Reader, idleTouch func(), sessionIDCh chan<- string, firstRun bool) stderrResult {
63✔
426
        const maxTailLines = 5    // keep last N lines for error context
63✔
427
        const maxLineLength = 256 // truncate long lines to avoid oversized error strings
63✔
428

63✔
429
        state := &codexFilterState{firstRun: firstRun}
63✔
430
        var tail []string
63✔
431
        var limitMatch, errorMatch string
63✔
432
        sessionIDSent := false
63✔
433

63✔
434
        err := readLines(ctx, r, func(line string) {
615✔
435
                if idleTouch != nil {
1,084✔
436
                        idleTouch() // reset idle timer on every stderr line
532✔
437
                }
532✔
438
                // scan untruncated line for patterns first; record only the first hit
439
                // per category so detection is eviction- and truncation-resistant.
440
                // restricted to CLI-error-prefixed lines (see scanLineForPatterns).
441
                e.scanLineForPatterns(line, &limitMatch, &errorMatch)
552✔
442

552✔
443
                // surface session id from header block to caller (once) so the rollout
552✔
444
                // file can be tailed in parallel for assistant-message streaming.
552✔
445
                if !sessionIDSent && sessionIDCh != nil {
1,081✔
446
                        if id := e.extractSessionID(line); id != "" {
533✔
447
                                select {
4✔
448
                                case sessionIDCh <- id:
4✔
NEW
449
                                default:
×
450
                                }
451
                                sessionIDSent = true
4✔
452
                        }
453
                }
454

455
                // capture non-empty lines for error context, preserving original formatting
456
                if strings.TrimSpace(line) != "" {
1,104✔
457
                        stored := line
552✔
458
                        if runes := []rune(stored); len(runes) > maxLineLength {
559✔
459
                                stored = string(runes[:maxLineLength]) + "..."
7✔
460
                        }
7✔
461
                        tail = append(tail, stored)
552✔
462
                        if len(tail) > maxTailLines {
992✔
463
                                copy(tail, tail[1:])
440✔
464
                                tail = tail[:maxTailLines]
440✔
465
                        }
440✔
466
                }
467

468
                if show, filtered := e.shouldDisplay(line, state); show {
775✔
469
                        if e.OutputHandler != nil {
240✔
470
                                e.OutputHandler(filtered + "\n")
17✔
471
                        }
17✔
472
                }
473
        })
474

475
        if err != nil {
71✔
476
                return stderrResult{lastLines: tail, limitMatch: limitMatch, errorMatch: errorMatch, err: fmt.Errorf("read stderr: %w", err)}
8✔
477
        }
8✔
478
        return stderrResult{lastLines: tail, limitMatch: limitMatch, errorMatch: errorMatch}
55✔
479
}
480

481
// scanLineForPatterns updates limitMatch / errorMatch with the first matching
482
// limit/error pattern found in line, gated by isCodexErrorLine so progress
483
// chatter cannot trigger false positives. Once each match has been recorded
484
// it sticks for the rest of the run.
485
func (e *CodexExecutor) scanLineForPatterns(line string, limitMatch, errorMatch *string) {
552✔
486
        if !isCodexErrorLine(line) {
1,092✔
487
                return
540✔
488
        }
540✔
489
        if *limitMatch == "" {
24✔
490
                if pattern := matchPattern(line, e.LimitPatterns); pattern != "" {
20✔
491
                        *limitMatch = pattern
8✔
492
                }
8✔
493
        }
494
        if *errorMatch == "" {
24✔
495
                if pattern := matchPattern(line, e.ErrorPatterns); pattern != "" {
17✔
496
                        *errorMatch = pattern
5✔
497
                }
5✔
498
        }
499
}
500

501
// isCodexErrorLine reports whether a stderr line looks like a CLI error
// message, i.e. its first non-whitespace characters are one of the prefixes
// "error:", "fatal:" or "panic:" in any letter case. Limit/error pattern
// matching is gated on this prefix so that progress text on stderr (header
// banners, bold summaries, model chatter that may legitimately mention "rate
// limit" while reviewing code) does not trigger false-positive matches.
//
// codex uses "ERROR:" today; the other prefixes are defensive against
// possible future variants.
//
// Only the leading prefix-sized slice of the line is compared, with
// strings.EqualFold, so no lowercased copy of the whole line is allocated;
// this function runs for every stderr line.
func isCodexErrorLine(line string) bool {
	s := strings.TrimSpace(line)
	for _, prefix := range [...]string{"error:", "fatal:", "panic:"} {
		// the length guard also rejects empty and whitespace-only lines
		if len(s) >= len(prefix) && strings.EqualFold(s[:len(prefix)], prefix) {
			return true
		}
	}
	return false
}
518

519
// readStdout reads the entire stdout content as the final response.
520
func (e *CodexExecutor) readStdout(r io.Reader) (string, error) {
53✔
521
        data, err := io.ReadAll(r)
53✔
522
        if err != nil {
54✔
523
                return "", fmt.Errorf("read stdout: %w", err)
1✔
524
        }
1✔
525
        return string(data), nil
52✔
526
}
527

528
// shouldDisplay implements a simple filter for codex stderr output.
529
// shows: bold reasoning summaries codex emits as live progress; on the very
530
// first codex invocation across this executor's lifetime (state.firstRun)
531
// also shows codex's resolved model/sandbox/effort lines from the header
532
// block so the user sees what codex actually picked from ~/.codex/config.toml.
533
// per-iteration header repetition (workdir/provider/approval/session id) is
534
// always suppressed to match ClaudeExecutor's empty-banner UX. session id
535
// detection in processStderr is independent of display so the rollout tailer
536
// still works whether the line is forwarded or not.
537
// also deduplicates lines to avoid non-consecutive repeats.
538
func (e *CodexExecutor) shouldDisplay(line string, state *codexFilterState) (bool, string) {
589✔
539
        s := strings.TrimSpace(line)
589✔
540
        if s == "" {
593✔
541
                return false, ""
4✔
542
        }
4✔
543

544
        var show bool
585✔
545
        var filtered string
585✔
546

585✔
547
        switch {
585✔
548
        case strings.HasPrefix(s, "--------"):
56✔
549
                // track separators only so subsequent header lines stay suppressed;
56✔
550
                // never displayed.
56✔
551
                state.headerCount++
56✔
552
        case state.headerCount == 1:
49✔
553
                // inside the header block. on the first run let codex's resolved
49✔
554
                // config (model / sandbox / reasoning effort) leak through so the
49✔
555
                // banner reflects what codex actually picked when ralphex did not
49✔
556
                // explicitly override these fields.
49✔
557
                if state.firstRun && e.isHeaderConfigLine(s) {
58✔
558
                        show = true
9✔
559
                        filtered = s
9✔
560
                }
9✔
561
        case strings.HasPrefix(s, "**"):
225✔
562
                // show bold summaries after header (progress indication)
225✔
563
                show = true
225✔
564
                filtered = e.stripBold(s)
225✔
565
        }
566

567
        // deduplicate displayed lines
568
        if show {
819✔
569
                if state.seen == nil {
252✔
570
                        state.seen = make(map[string]bool)
18✔
571
                }
18✔
572
                if state.seen[filtered] {
238✔
573
                        return false, "" // skip duplicate
4✔
574
                }
4✔
575
                state.seen[filtered] = true
230✔
576
        }
577

578
        return show, filtered
581✔
579
}
580

581
// isHeaderConfigLine returns true when line is one of codex's header-block
582
// lines describing the resolved per-session config that ralphex doesn't know
583
// up front (model picked from ~/.codex/config.toml, sandbox, reasoning effort).
584
// other header lines (workdir, provider, approval, reasoning summaries,
585
// session id) are either obvious from context or not useful to the user.
586
func (e *CodexExecutor) isHeaderConfigLine(s string) bool {
33✔
587
        return strings.HasPrefix(s, "model:") ||
33✔
588
                strings.HasPrefix(s, "sandbox:") ||
33✔
589
                strings.HasPrefix(s, "reasoning effort:")
33✔
590
}
33✔
591

592
// stripBold removes markdown bold markers (**text**) from text.
593
func (e *CodexExecutor) stripBold(s string) string {
231✔
594
        // replace **text** with text
231✔
595
        result := s
231✔
596
        for {
692✔
597
                start := strings.Index(result, "**")
461✔
598
                if start == -1 {
691✔
599
                        break
230✔
600
                }
601
                end := strings.Index(result[start+2:], "**")
231✔
602
                if end == -1 {
232✔
603
                        break
1✔
604
                }
605
                // remove both markers
606
                result = result[:start] + result[start+2:start+2+end] + result[start+2+end+2:]
230✔
607
        }
608
        return result
231✔
609
}
610

611
// sessionIDPattern matches the "session id: <uuid>" line codex emits in its
// startup banner. capture group 1 is the session id in 8-4-4-4-12 hex form.
// the (?i) flag makes both the label and the hex digits case-insensitive, and
// the \b anchors keep the match from starting or ending inside a longer word.
var sessionIDPattern = regexp.MustCompile(`(?i)\bsession id:\s*([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})\b`)
614

615
// extractSessionID returns the codex session id from a stderr line that
616
// includes "session id: <uuid>", or "" when the line does not match. used
617
// by processStderr to surface the id to the rollout-tail goroutine.
618
func (e *CodexExecutor) extractSessionID(line string) string {
535✔
619
        m := sessionIDPattern.FindStringSubmatch(line)
535✔
620
        if len(m) < 2 {
1,063✔
621
                return ""
528✔
622
        }
528✔
623
        return m[1]
7✔
624
}
625

626
// startRolloutTail spawns the rollout-tail goroutine and returns a cancel
627
// function plus a done channel. tail goroutine waits for the session id on
628
// sessionIDCh, then follows codex's session rollout file until the returned
629
// cancel is called. caller must invoke tailCancel and wait on tailDone before
630
// returning so the tailer drains remaining file content and exits cleanly.
631
// the goroutine is a no-op when OutputHandler is nil — extracted from Run()
632
// to keep its cyclomatic complexity in check.
633
func (e *CodexExecutor) startRolloutTail(parent context.Context, sessionIDCh <-chan string, idleTouch func()) (context.CancelFunc, <-chan struct{}) {
51✔
634
        tailCtx, tailCancel := context.WithCancel(parent)
51✔
635
        done := make(chan struct{})
51✔
636
        go func() {
102✔
637
                defer close(done)
51✔
638
                select {
51✔
639
                case <-tailCtx.Done():
48✔
640
                        return
48✔
641
                case id := <-sessionIDCh:
3✔
642
                        e.tailRolloutFile(tailCtx, id, idleTouch)
3✔
643
                }
644
        }()
645
        return tailCancel, done
51✔
646
}
647

648
// findRolloutFile resolves the path to codex's session-rollout JSONL file
649
// for the given session id. codex stores the file under
650
// ~/.codex/sessions/<year>/<month>/<day>/rollout-<timestamp>-<session-id>.jsonl
651
// and may take a brief moment to create it after printing the session-id
652
// banner, so we poll up to ~5s. returns "" when the file cannot be located.
653
func (e *CodexExecutor) findRolloutFile(ctx context.Context, sessionID string) string {
7✔
654
        home, err := os.UserHomeDir()
7✔
655
        if err != nil {
7✔
NEW
656
                return ""
×
NEW
657
        }
×
658
        pattern := filepath.Join(home, ".codex", "sessions", "*", "*", "*", "rollout-*-"+sessionID+".jsonl")
7✔
659

7✔
660
        deadline := time.Now().Add(5 * time.Second)
7✔
661
        for {
16✔
662
                matches, _ := filepath.Glob(pattern)
9✔
663
                if len(matches) > 0 {
11✔
664
                        return matches[0]
2✔
665
                }
2✔
666
                if time.Now().After(deadline) {
7✔
NEW
667
                        return ""
×
NEW
668
                }
×
669
                select {
7✔
670
                case <-ctx.Done():
5✔
671
                        return ""
5✔
672
                case <-time.After(100 * time.Millisecond):
2✔
673
                }
674
        }
675
}
676

677
// tailRolloutFile follows codex's session rollout JSONL file like `tail -f`,
678
// parses each event, and emits human-readable progress lines via OutputHandler.
679
// runs until ctx is canceled. on cancellation, drains any remaining buffered
680
// lines before returning so late writes (e.g. codex flushing the final
681
// assistant message just before exit) are not lost.
682
func (e *CodexExecutor) tailRolloutFile(ctx context.Context, sessionID string, idleTouch func()) {
4✔
683
        if e.OutputHandler == nil {
4✔
NEW
684
                return
×
NEW
685
        }
×
686
        path := e.findRolloutFile(ctx, sessionID)
4✔
687
        if path == "" {
7✔
688
                return
3✔
689
        }
3✔
690
        f, err := os.Open(path) //nolint:gosec // path comes from codex's own session id
1✔
691
        if err != nil {
1✔
NEW
692
                return
×
NEW
693
        }
×
694
        defer func() { _ = f.Close() }()
2✔
695

696
        // accumulator holds bytes that may not yet form a complete line, so partial
697
        // reads at EOF do not lose content — the next Read after codex appends more
698
        // will complete the line.
699
        var acc []byte
1✔
700
        chunk := make([]byte, 4096)
1✔
701
        drainOnce := func() {
4✔
702
                for {
8✔
703
                        n, readErr := f.Read(chunk)
5✔
704
                        if n > 0 {
7✔
705
                                acc = append(acc, chunk[:n]...)
2✔
706
                                for {
7✔
707
                                        i := bytes.IndexByte(acc, '\n')
5✔
708
                                        if i < 0 {
7✔
709
                                                break
2✔
710
                                        }
711
                                        if msg := e.formatRolloutEvent(acc[:i]); msg != "" {
5✔
712
                                                e.OutputHandler(msg)
2✔
713
                                                if idleTouch != nil {
2✔
NEW
714
                                                        idleTouch()
×
NEW
715
                                                }
×
716
                                        }
717
                                        acc = acc[i+1:]
3✔
718
                                }
719
                        }
720
                        if readErr == io.EOF || n == 0 {
8✔
721
                                return
3✔
722
                        }
3✔
723
                        if readErr != nil {
2✔
NEW
724
                                return
×
NEW
725
                        }
×
726
                }
727
        }
728

729
        for {
3✔
730
                drainOnce()
2✔
731
                select {
2✔
732
                case <-ctx.Done():
1✔
733
                        // final drain after codex exits — pick up any late-flushed events
1✔
734
                        drainOnce()
1✔
735
                        return
1✔
736
                case <-time.After(200 * time.Millisecond):
1✔
737
                }
738
        }
739
}
740

741
// rolloutEvent is the envelope around every line of codex's session rollout
// JSONL file. only the two fields needed for rendering are decoded: the event
// type, and the payload, which is kept as raw JSON and decoded a second time
// once the type is known.
type rolloutEvent struct {
	// Type is the event discriminator, e.g. "response_item".
	Type string `json:"type"`
	// Payload is the undecoded event body.
	Payload json.RawMessage `json:"payload"`
}
748

749
// rolloutPayload is the subset of a response_item payload that gets rendered:
// assistant messages (payload.type=message, role=assistant) and their content
// parts. function_call and reasoning records are discarded by
// formatRolloutEvent based on Type/Role alone, before any of their own fields
// would be needed, so no fields are declared for them here.
type rolloutPayload struct {
	// Type is the payload kind, e.g. "message".
	Type string `json:"type"`
	// Role is the message author, e.g. "assistant".
	Role string `json:"role"`
	// Content lists the message parts; each part carries its own kind
	// (e.g. "output_text") and text.
	Content []struct {
		Type string `json:"type"`
		Text string `json:"text"`
	} `json:"content"`
}
762

763
// formatRolloutEvent turns one JSONL rollout line into a display string for
764
// OutputHandler, or "" when the event has no user-visible substance. only
765
// assistant message text (the model's actual reply, the codex equivalent of
766
// claude's stream-json text blocks) is forwarded.
767
//
768
// reasoning records are skipped because their summaries are already streamed
769
// live from stderr. all function_call records (exec_command for git/grep/file
770
// reads, spawn_agent for parallel reviewer dispatch) and their outputs are
771
// skipped because they are tool-machinery noise — the assistant message text
772
// already announces what the model is doing narratively (e.g. "I'll launch
773
// the five review agents together"). showing both yields redundant chatter.
774
func (e *CodexExecutor) formatRolloutEvent(line []byte) string {
16✔
775
        if len(bytes.TrimSpace(line)) == 0 {
18✔
776
                return ""
2✔
777
        }
2✔
778
        var ev rolloutEvent
14✔
779
        if err := json.Unmarshal(line, &ev); err != nil {
15✔
780
                return ""
1✔
781
        }
1✔
782
        if ev.Type != "response_item" {
16✔
783
                return ""
3✔
784
        }
3✔
785
        var p rolloutPayload
10✔
786
        if err := json.Unmarshal(ev.Payload, &p); err != nil {
10✔
NEW
787
                return ""
×
NEW
788
        }
×
789
        if p.Type != "message" || p.Role != "assistant" {
16✔
790
                return ""
6✔
791
        }
6✔
792
        var sb strings.Builder
4✔
793
        for _, c := range p.Content {
9✔
794
                if c.Type != "output_text" || c.Text == "" {
5✔
NEW
795
                        continue
×
796
                }
797
                if sb.Len() > 0 {
6✔
798
                        sb.WriteByte('\n')
1✔
799
                }
1✔
800
                sb.WriteString(c.Text)
5✔
801
        }
802
        return sb.String()
4✔
803
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc