• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

umputun / ralphex / 26265094477

22 May 2026 02:34AM UTC coverage: 83.341% (+0.4%) from 82.902%
26265094477

Pull #350

github

umputun
feat(codex): inject task-phase directive to suppress conflicting skill workflows

Codex auto-activates skills from the user's ~/.codex/skills/ based on prompt
content. A user's plan-execution skill triggers on the same wording ralphex's
task prompt uses, runs a competing workflow, and floods the progress stream
with recited skill text. prependCodexTaskGuidance prepends a generic directive
telling codex that ralphex's task prompt is authoritative when an
auto-activated skill's workflow conflicts with it. Active only under the codex
executor; wired in runTaskPhase. Soft mitigation — codex 0.133.0 exposes no
per-invocation skill-disable flag.
Pull Request #350: Add first-class codex executor mode

649 of 712 new or added lines in 9 files covered. (91.15%)

14 existing lines in 4 files now uncovered.

7439 of 8926 relevant lines covered (83.34%)

232.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.05
/pkg/executor/codex.go
1
package executor
2

3
import (
4
        "bytes"
5
        "context"
6
        "encoding/json"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "log"
11
        "os"
12
        "os/exec"
13
        "path/filepath"
14
        "regexp"
15
        "strings"
16
        "sync/atomic"
17
        "time"
18
)
19

20
// CodexStreams pairs the two output readers of a codex command:
// Stderr and Stdout.
type CodexStreams struct {
	Stderr, Stdout io.Reader
}
25

26
// CodexRunner abstracts command execution for codex; CodexExecutor falls back to
27
// execCodexRunner when no runner is injected (the runner field is used for testing).
28
type CodexRunner interface {
29
        // Run starts the command name with args. streams carries the child's stderr
        // (streaming progress) and stdout (final response); wait is invoked by the
        // caller after both streams were drained to collect the command's completion
        // error; err reports a failure to start, in which case wait may be nil.
        Run(ctx context.Context, name string, args ...string) (streams CodexStreams, wait func() error, err error)
30
}
31

32
// execCodexRunner is the default command runner using os/exec for codex.
33
// codex outputs streaming progress to stderr, final response to stdout.
34
// when stdin is non-nil, it is connected to the child process's stdin (used to pass
35
// the prompt via pipe instead of a CLI argument to avoid Windows 8191-char cmd limit).
36
// stripAnthropicKey scopes ANTHROPIC_API_KEY filtering to first-class --codex runs;
37
// external codex review in default claude mode keeps the host env intact so custom
38
// codex wrappers proxying through Anthropic (e.g., scripts/codex-as-claude/codex-as-claude.sh) keep
39
// authenticating. CLAUDECODE is always stripped regardless of mode to prevent
40
// nested-session errors when codex is launched from inside a Claude Code session.
41
type execCodexRunner struct {
42
        // stdin, when non-nil, is assigned to the child's Stdin before start.
        stdin             io.Reader
43
        // stripAnthropicKey makes childEnv drop ANTHROPIC_API_KEY in addition to CLAUDECODE.
        stripAnthropicKey bool
44
}
45

46
// childEnv builds the codex child-process env. CLAUDECODE is always stripped to
47
// prevent nested-session errors. ANTHROPIC_API_KEY is stripped only when the
48
// caller requested it (first-class --codex mode); default-claude external codex
49
// review passes the key through so custom Anthropic-proxying wrappers keep working.
50
func (r *execCodexRunner) childEnv(env []string) []string {
8✔
51
        if r.stripAnthropicKey {
11✔
52
                return filterEnv(env, "ANTHROPIC_API_KEY", "CLAUDECODE")
3✔
53
        }
3✔
54
        return filterEnv(env, "CLAUDECODE")
5✔
55
}
56

57
func (r *execCodexRunner) Run(ctx context.Context, name string, args ...string) (CodexStreams, func() error, error) {
3✔
58
        // check context before starting to avoid spawning a process that will be immediately killed
3✔
59
        if err := ctx.Err(); err != nil {
3✔
60
                return CodexStreams{}, nil, fmt.Errorf("context already canceled: %w", err)
×
61
        }
×
62

63
        // use exec.Command (not CommandContext) because we handle cancellation ourselves
64
        // to ensure the entire process group is killed, not just the direct child
65
        cmd := exec.Command(name, args...) //nolint:noctx // intentional: we handle context cancellation via process group kill
3✔
66

3✔
67
        cmd.Env = r.childEnv(os.Environ())
3✔
68

3✔
69
        // pass prompt via stdin when set (avoids Windows 8191-char command-line limit)
3✔
70
        if r.stdin != nil {
4✔
71
                cmd.Stdin = r.stdin
1✔
72
        }
1✔
73

74
        // create new process group so we can kill all descendants on cleanup
75
        setupProcessGroup(cmd)
3✔
76

3✔
77
        stderr, err := cmd.StderrPipe()
3✔
78
        if err != nil {
3✔
79
                return CodexStreams{}, nil, fmt.Errorf("stderr pipe: %w", err)
×
80
        }
×
81

82
        stdout, err := cmd.StdoutPipe()
3✔
83
        if err != nil {
3✔
84
                return CodexStreams{}, nil, fmt.Errorf("stdout pipe: %w", err)
×
85
        }
×
86

87
        if err := cmd.Start(); err != nil {
4✔
88
                return CodexStreams{}, nil, fmt.Errorf("start command: %w", err)
1✔
89
        }
1✔
90

91
        // setup process group cleanup with graceful shutdown on context cancellation
92
        cleanup := newProcessGroupCleanup(cmd, ctx.Done())
2✔
93

2✔
94
        return CodexStreams{Stderr: stderr, Stdout: stdout}, cleanup.Wait, nil
2✔
95
}
96

97
// CodexExecutor runs codex CLI commands and filters output.
// methods use pointer receivers; the struct embeds an atomic.Bool and is meant
// to be shared by pointer across Run() calls.
98
type CodexExecutor struct {
99
        Command         string            // command to execute, defaults to "codex"
100
        Model           string            // model override; empty means inherit from ~/.codex/config.toml (no -c model= flag emitted)
101
        ReasoningEffort string            // reasoning effort override; empty means inherit from ~/.codex/config.toml (no -c model_reasoning_effort= flag emitted)
102
        TimeoutMs       int               // stream idle timeout in ms passed as stream_idle_timeout_ms; values <= 0 default to 3600000
103
        Sandbox         string            // sandbox mode, defaults to "read-only"; forced to "danger-full-access" when RALPHEX_DOCKER=1
104
        ProjectDoc      string            // path to project documentation file; passed as -c project_doc=... when non-empty
105
        OutputHandler   func(text string) // called for each filtered output line in real-time; nil disables forwarding
106
        Debug           bool              // enable debug output
107
        ErrorPatterns   []string          // patterns to detect in output (e.g., rate limit messages); consulted only when the run failed or idle-timed out
108
        LimitPatterns   []string          // patterns to detect rate limits (checked before error patterns)
109
        MultiAgent      bool              // enable codex multi_agent feature + reviewer agent registration; set to true on the review-phase codex instance built by processor.New() for first-class --codex mode
110
        PassClaudeMd    bool              // pass project-level CLAUDE.md to codex via project_doc_fallback_filenames (set by processor.New() only when cfg.AppConfig.Executor == ExecutorCodex)
111
        IdleTimeout     time.Duration     // kill session after this duration of no output, zero = disabled
112
        headerEmitted   atomic.Bool       // tracks first invocation across Run() calls; false until first task/review then suppressed permanently — used to emit codex's resolved model/sandbox/effort once at the top of the run
113
        runner          CodexRunner       // for testing, nil uses default
114
}
115

116
// CodexReviewerAgentName is the agent name registered with codex when
117
// features.multi_agent is enabled (it forms the agents.<name>.description
// override key). shared with pkg/processor so the
118
// spawn_agent(agent=...) call in review prompts stays in sync with the
119
// registration here — if either side drifts, codex silently fails to
120
// resolve the agent and the review phase breaks.
121
const CodexReviewerAgentName = "reviewer"
122

123
// codexReviewerDescription is the description registered for the reviewer
124
// agent when features.multi_agent is enabled. behavior is driven by the task
125
// argument, so the description stays generic and stable.
126
//
127
// MUST stay ASCII without backslashes, control characters, or non-printable bytes:
128
// (*CodexExecutor).configOverrides serializes this via fmt.Sprintf("...=%q", ...) which
129
// emits Go string-literal escapes; only the printable ASCII subset round-trips
130
// safely through TOML basic-string syntax.
131
const codexReviewerDescription = "general code review specialist; behavior driven by the task argument"
132

133
// configOverrides returns the -c key=value arg slice to splice into the codex CLI
134
// invocation based on the executor's MultiAgent and PassClaudeMd flags. All overrides
135
// are additive on top of the user's ~/.codex/config.toml.
136
func (e *CodexExecutor) configOverrides() []string {
57✔
137
        var args []string
57✔
138
        if e.MultiAgent {
61✔
139
                args = append(args,
4✔
140
                        "-c", "features.multi_agent=true",
4✔
141
                        "-c", fmt.Sprintf("agents.%s.description=%q", CodexReviewerAgentName, codexReviewerDescription),
4✔
142
                )
4✔
143
        }
4✔
144
        if e.PassClaudeMd {
61✔
145
                args = append(args, "-c", `project_doc_fallback_filenames=["CLAUDE.md"]`)
4✔
146
        }
4✔
147
        return args
57✔
148
}
149

150
// codexFilterState is the per-stream state shouldDisplay carries between stderr
// lines: header separator count, dedup set, and the first-run flag.
151
type codexFilterState struct {
152
        headerCount int             // number of "--------" separators seen so far; the header block is the region where this equals 1
153
        seen        map[string]bool // track all shown lines for deduplication; allocated lazily on first displayed line
154
        firstRun    bool            // when true, whitelist model/sandbox/effort lines from the header block so the user sees codex's resolved config once at the top of the run
155
}
156

157
// Run executes codex CLI with the given prompt and returns filtered output.
158
// stderr is streamed line-by-line to OutputHandler for progress indication.
159
// stdout is captured entirely as the final response (returned in Result.Output).
160
func (e *CodexExecutor) Run(ctx context.Context, prompt string) Result {
52✔
161
        cmd := e.Command
52✔
162
        if cmd == "" {
103✔
163
                cmd = "codex"
51✔
164
        }
51✔
165

166
        timeoutMs := e.TimeoutMs
52✔
167
        if timeoutMs <= 0 {
103✔
168
                timeoutMs = 3600000
51✔
169
        }
51✔
170

171
        sandbox := e.Sandbox
52✔
172
        if sandbox == "" {
101✔
173
                sandbox = "read-only"
49✔
174
        }
49✔
175
        // disable sandbox in docker (landlock doesn't work in containers)
176
        if os.Getenv("RALPHEX_DOCKER") == "1" {
52✔
177
                sandbox = "danger-full-access"
×
178
        }
×
179

180
        args := []string{"exec"}
52✔
181
        args = append(args, e.configOverrides()...)
52✔
182
        // --dangerously-bypass-approvals-and-sandbox is required for unattended first-class
52✔
183
        // --codex runs (which use danger-full-access by default). External codex review in
52✔
184
        // claude mode worked on master without this flag and adding it would silently change
52✔
185
        // approval semantics for default-claude users (esp. Docker mode where the sandbox is
52✔
186
        // forced to danger-full-access); gate the flag on MultiAgent which is true only in
52✔
187
        // first-class --codex (set by processor.buildCodexExecutor).
52✔
188
        if sandbox == "danger-full-access" && e.MultiAgent {
53✔
189
                args = append(args, "--dangerously-bypass-approvals-and-sandbox")
1✔
190
        }
1✔
191
        args = append(args, "--sandbox", sandbox)
52✔
192
        // model and reasoning effort are emitted only when explicitly set in ralphex config,
52✔
193
        // so the user's ~/.codex/config.toml choice is preserved otherwise (matches the
52✔
194
        // "additive -c overrides" promise documented in CLAUDE.md / llms.txt).
52✔
195
        if e.Model != "" {
53✔
196
                args = append(args, "-c", fmt.Sprintf("model=%q", e.Model))
1✔
197
        }
1✔
198
        if e.ReasoningEffort != "" {
53✔
199
                args = append(args, "-c", "model_reasoning_effort="+e.ReasoningEffort)
1✔
200
        }
1✔
201
        args = append(args, "-c", fmt.Sprintf("stream_idle_timeout_ms=%d", timeoutMs))
52✔
202

52✔
203
        if e.ProjectDoc != "" {
53✔
204
                args = append(args, "-c", fmt.Sprintf("project_doc=%q", e.ProjectDoc))
1✔
205
        }
1✔
206

207
        // pass prompt via stdin to avoid Windows 8191-char command-line limit;
208
        // codex reads from stdin when no positional prompt argument is given.
209
        // MultiAgent signals first-class --codex (set by processor.buildCodexExecutor only;
210
        // external codex review built by buildExternalCodexExecutor leaves it false), so it
211
        // also gates ANTHROPIC_API_KEY stripping — default-claude external codex review
212
        // preserves the host env so wrappers proxying through Anthropic keep working.
213
        stdinReader := strings.NewReader(prompt)
52✔
214
        runner := e.runner
52✔
215
        if runner == nil {
52✔
NEW
216
                runner = &execCodexRunner{stdin: stdinReader, stripAnthropicKey: e.MultiAgent}
×
NEW
217
        }
×
218

219
        // set up idle timeout: derive a cancellable context that fires when no output
220
        // is received for IdleTimeout duration. the touch closure resets the timer on
221
        // each stderr line and on each stdout read; mirrors the ClaudeExecutor pattern.
222
        execCtx := ctx
52✔
223
        idleTouch := func() {} // no-op by default
576✔
224
        if e.IdleTimeout > 0 {
56✔
225
                var idleCancel context.CancelFunc
4✔
226
                execCtx, idleCancel = context.WithCancel(ctx)
4✔
227
                defer idleCancel()
4✔
228
                timer := time.AfterFunc(e.IdleTimeout, idleCancel)
4✔
229
                defer timer.Stop()
4✔
230
                idleTouch = func() { timer.Reset(e.IdleTimeout) }
17✔
231
        }
232

233
        streams, wait, err := runner.Run(execCtx, cmd, args...)
52✔
234
        if err != nil {
53✔
235
                return Result{Error: fmt.Errorf("start codex: %w", err)}
1✔
236
        }
1✔
237

238
        // process stderr for progress display (header block + bold summaries).
239
        // sessionIDCh receives the session id once stderr's header block surfaces
240
        // it; the tail goroutine below uses it to follow the rollout file.
241
        // firstRun is true exactly once across all Run() calls on this executor —
242
        // gives shouldDisplay license to leak codex's resolved model/sandbox/effort
243
        // once at the top of the run instead of repeating the full banner per phase.
244
        firstRun := e.headerEmitted.CompareAndSwap(false, true)
51✔
245
        sessionIDCh := make(chan string, 1)
51✔
246
        stderrDone := make(chan stderrResult, 1)
51✔
247
        go func() {
102✔
248
                stderrDone <- e.processStderr(execCtx, streams.Stderr, stderrStreamOpts{
51✔
249
                        idleTouch:   idleTouch,
51✔
250
                        sessionIDCh: sessionIDCh,
51✔
251
                        firstRun:    firstRun,
51✔
252
                })
51✔
253
        }()
51✔
254

255
        tailCancel, tailDone := e.startRolloutTail(execCtx, sessionIDCh, idleTouch)
51✔
256

51✔
257
        // read stdout entirely as final response; wrap with touch-on-read so reads
51✔
258
        // keep the idle timer alive even while stderr is quiet.
51✔
259
        stdoutReader := streams.Stdout
51✔
260
        if e.IdleTimeout > 0 {
55✔
261
                stdoutReader = &touchReader{r: streams.Stdout, touch: idleTouch}
4✔
262
        }
4✔
263
        stdoutContent, stdoutErr := e.readStdout(stdoutReader)
51✔
264

51✔
265
        // wait for stderr processing to complete
51✔
266
        stderrRes := <-stderrDone
51✔
267

51✔
268
        // wait for command completion; once wait() returns the codex process has
51✔
269
        // fully exited and flushed the last assistant message to its rollout file
51✔
270
        waitErr := wait()
51✔
271

51✔
272
        // codex has exited; signal tailer to do its final drain and stop. done
51✔
273
        // after wait() so the tailer keeps following until the rollout file is
51✔
274
        // guaranteed complete and the final assistant line is not dropped.
51✔
275
        tailCancel()
51✔
276
        <-tailDone
51✔
277

51✔
278
        // detect signal in stdout (the actual response)
51✔
279
        signal := detectSignal(stdoutContent)
51✔
280

51✔
281
        // idle timeout: derived context canceled but parent is alive — not an error.
51✔
282
        // mirrors the ClaudeExecutor idle-timeout completion path so callers see uniform behavior.
51✔
283
        if e.IdleTimeout > 0 && execCtx.Err() != nil && ctx.Err() == nil {
53✔
284
                e.logDroppedIdleErrors(stdoutErr, waitErr)
2✔
285
                return e.idleTimeoutResult(stdoutContent, signal, stderrRes)
2✔
286
        }
2✔
287

288
        finalErr := e.finalError(ctx, stderrRes, stdoutErr, waitErr)
49✔
289

49✔
290
        // only check error/limit patterns when the process failed (non-zero exit or stream error).
49✔
291
        // when codex exits cleanly, pattern matches in output are false positives from findings
49✔
292
        // (e.g., reviewing code that handles rate limits).
49✔
293
        // skip pattern checks on context cancellation — cancellation must propagate as-is.
49✔
294
        if finalErr != nil && ctx.Err() == nil {
71✔
295
                if patternErr := e.checkPatterns(stdoutContent, stderrRes); patternErr != nil {
37✔
296
                        return Result{Output: stdoutContent, Signal: signal, Error: patternErr}
15✔
297
                }
15✔
298
        }
299

300
        // return stdout content as the result (the actual answer from codex)
301
        return Result{Output: stdoutContent, Signal: signal, Error: finalErr}
34✔
302
}
303

304
// finalError reconciles stderr/stdout/wait errors into the single error returned
305
// from Run. stderr and stdout errors win over wait errors so callers see the
306
// root cause rather than the cascade exit code; ctx.Err() short-circuits to
307
// preserve cancellation semantics; non-zero exit with stderr tail produces a
308
// readable diagnostic that includes the last few stderr lines.
309
func (e *CodexExecutor) finalError(ctx context.Context, stderrRes stderrResult, stdoutErr, waitErr error) error {
56✔
310
        switch {
56✔
311
        case stderrRes.err != nil && !errors.Is(stderrRes.err, context.Canceled):
2✔
312
                return stderrRes.err
2✔
313
        case stdoutErr != nil:
2✔
314
                return stdoutErr
2✔
315
        case waitErr != nil:
27✔
316
                if ctx.Err() != nil {
31✔
317
                        return fmt.Errorf("context error: %w", ctx.Err())
4✔
318
                }
4✔
319
                if len(stderrRes.lastLines) > 0 {
34✔
320
                        return fmt.Errorf("codex exited with error: %w\nstderr: %s",
11✔
321
                                waitErr, strings.Join(stderrRes.lastLines, "\n"))
11✔
322
                }
11✔
323
                return fmt.Errorf("codex exited with error: %w", waitErr)
12✔
324
        }
325
        return nil
25✔
326
}
327

328
// touchReader is an io.Reader decorator that calls touch after every Read
// which delivered at least one byte. it keeps the idle-timeout timer alive
// while stdout is being drained.
type touchReader struct {
	r     io.Reader
	touch func()
}

// Read forwards to the wrapped reader and returns its result untouched; touch
// is skipped for empty reads and when no callback is set.
func (t *touchReader) Read(p []byte) (int, error) {
	read, readErr := t.r.Read(p)
	if read <= 0 || t.touch == nil {
		return read, readErr //nolint:wrapcheck // pass-through reader; preserve EOF and original error semantics
	}
	t.touch()
	return read, readErr //nolint:wrapcheck // pass-through reader; preserve EOF and original error semantics
}
342

343
// logDroppedIdleErrors surfaces concurrent stream/wait errors that would otherwise
344
// be discarded by the idle-timeout completion path. operators need this to
345
// distinguish "agent went silent" from "stream broke" before retrying.
346
func (e *CodexExecutor) logDroppedIdleErrors(stdoutErr, waitErr error) {
2✔
347
        if stdoutErr != nil {
2✔
NEW
348
                log.Printf("codex idle timeout fired with concurrent stdout error: %v", stdoutErr)
×
NEW
349
        }
×
350
        if waitErr != nil {
4✔
351
                log.Printf("codex idle timeout fired with concurrent wait error: %v", waitErr)
2✔
352
        }
2✔
353
}
354

355
// idleTimeoutResult builds the Result returned when the idle-timeout timer
356
// canceled the derived execution context (parent ctx still alive). limit and
357
// error patterns are still checked across stdout and stderr so a wait-and-retry
358
// triggered by a real quota diagnostic survives idle-timeout cancellation;
359
// otherwise IdleTimedOut is set and the caller treats this as a soft kill.
360
func (e *CodexExecutor) idleTimeoutResult(stdoutContent, signal string, stderr stderrResult) Result {
2✔
361
        if patternErr := e.checkPatterns(stdoutContent, stderr); patternErr != nil {
3✔
362
                return Result{Output: stdoutContent, Signal: signal, Error: patternErr}
1✔
363
        }
1✔
364
        return Result{Output: stdoutContent, Signal: signal, IdleTimedOut: true}
1✔
365
}
366

367
// checkPatterns scans stdout AND the stderr matches captured live during streaming
368
// for limit/error patterns. codex emits OpenAI/ChatGPT plan-quota errors (e.g.,
369
// "ERROR: You've hit your usage limit") to stderr while stdout is empty on failure;
370
// processStderr matches each line on the fly so detection is not subject to the
371
// 5-line / 256-rune tail truncation used for human-readable error context.
372
//
373
// Priority is limit-first across both sources before any error match: a real
374
// stderr quota diagnostic (already filtered through the CLI-error prefix gate
375
// in processStderr) must not be downgraded to a non-retryable PatternMatchError
376
// just because partial stdout happens to match a configured ErrorPattern. Within
377
// each severity class, stdout wins over stderr so an explicit stdout limit/error
378
// takes precedence when both sources fire.
379
//
380
// Order:
381
//  1. stdout LimitPatterns
382
//  2. stderr.limitMatch (prefix-gated)
383
//  3. stdout ErrorPatterns
384
//  4. stderr.errorMatch (prefix-gated)
385
//
386
// returns LimitPatternError or PatternMatchError when a pattern matches; nil otherwise.
387
func (e *CodexExecutor) checkPatterns(stdoutContent string, stderr stderrResult) error {
24✔
388
        // limit-class first — across both sources
24✔
389
        if pattern := matchPattern(stdoutContent, e.LimitPatterns); pattern != "" {
28✔
390
                return &LimitPatternError{Pattern: pattern, HelpCmd: "codex /status"}
4✔
391
        }
4✔
392
        if stderr.limitMatch != "" {
26✔
393
                return &LimitPatternError{Pattern: stderr.limitMatch, HelpCmd: "codex /status"}
6✔
394
        }
6✔
395

396
        // error-class second
397
        if pattern := matchPattern(stdoutContent, e.ErrorPatterns); pattern != "" {
19✔
398
                return &PatternMatchError{Pattern: pattern, HelpCmd: "codex /status"}
5✔
399
        }
5✔
400
        if stderr.errorMatch != "" {
10✔
401
                return &PatternMatchError{Pattern: stderr.errorMatch, HelpCmd: "codex /status"}
1✔
402
        }
1✔
403

404
        return nil
8✔
405
}
406

407
// stderrResult holds processed stderr output and any error from reading.
408
// limitMatch and errorMatch capture the FIRST limit/error pattern that fires
409
// during streaming, on the untruncated, un-evicted line — so detection is not
410
// subject to the lastLines tail truncation (5 lines, 256 runes per line).
411
type stderrResult struct {
412
        lastLines  []string // last few non-blank lines of stderr for error context (long lines truncated with "...")
413
        limitMatch string   // first matched limit pattern seen on stderr (live scan); empty when none fired
414
        errorMatch string   // first matched error pattern seen on stderr (live scan); empty when none fired
415
        // err is the stderr read failure wrapped as "read stderr: ..."; nil when the stream was read cleanly
        err        error
416
}
417

418
// stderrStreamOpts bundles the per-invocation streaming inputs for processStderr.
419
type stderrStreamOpts struct {
420
        idleTouch   func()        // invoked for every stderr line to reset the idle-timeout timer; pass a no-op when idle timeout is disabled (nil is tolerated and skipped)
421
        sessionIDCh chan<- string // when non-nil, receives the first detected "session id: <uuid>" (non-blocking send, so a buffered channel is expected; sent at most once)
422
        firstRun    bool          // gates the one-time emission of codex's resolved model/sandbox/effort header lines
423
}
424

425
// processStderr reads stderr line-by-line, filters for progress display, and
426
// scans each line for configured limit/error patterns. shows header block
427
// (between first two "--------" separators) and bold summaries. captures last
428
// lines of unfiltered output for error reporting AND records the first
429
// limit/error pattern hit (untruncated, un-evicted) so callers can rely on it
430
// regardless of how much chatter follows. see stderrStreamOpts for the
431
// per-invocation streaming inputs.
432
func (e *CodexExecutor) processStderr(ctx context.Context, r io.Reader, opts stderrStreamOpts) stderrResult {
63✔
433
        const maxTailLines = 5    // keep last N lines for error context
63✔
434
        const maxLineLength = 256 // truncate long lines to avoid oversized error strings
63✔
435

63✔
436
        state := &codexFilterState{firstRun: opts.firstRun}
63✔
437
        var tail []string
63✔
438
        var limitMatch, errorMatch string
63✔
439
        sessionIDSent := false
63✔
440

63✔
441
        err := readLines(ctx, r, func(line string) {
615✔
442
                if opts.idleTouch != nil {
1,084✔
443
                        opts.idleTouch() // reset idle timer on every stderr line
532✔
444
                }
532✔
445
                // scan untruncated line for patterns first; record only the first hit
446
                // per category so detection is eviction- and truncation-resistant.
447
                // restricted to CLI-error-prefixed lines (see scanLineForPatterns).
448
                e.scanLineForPatterns(line, &limitMatch, &errorMatch)
552✔
449

552✔
450
                // surface session id from header block to caller (once) so the rollout
552✔
451
                // file can be tailed in parallel for assistant-message streaming.
552✔
452
                if !sessionIDSent && opts.sessionIDCh != nil {
1,081✔
453
                        if id := e.extractSessionID(line); id != "" {
533✔
454
                                select {
4✔
455
                                case opts.sessionIDCh <- id:
4✔
NEW
456
                                default:
×
457
                                }
458
                                sessionIDSent = true
4✔
459
                        }
460
                }
461

462
                // capture non-empty lines for error context, preserving original formatting
463
                if strings.TrimSpace(line) != "" {
1,104✔
464
                        stored := line
552✔
465
                        if runes := []rune(stored); len(runes) > maxLineLength {
559✔
466
                                stored = string(runes[:maxLineLength]) + "..."
7✔
467
                        }
7✔
468
                        tail = append(tail, stored)
552✔
469
                        if len(tail) > maxTailLines {
992✔
470
                                copy(tail, tail[1:])
440✔
471
                                tail = tail[:maxTailLines]
440✔
472
                        }
440✔
473
                }
474

475
                if show, filtered := e.shouldDisplay(line, state); show {
775✔
476
                        if e.OutputHandler != nil {
240✔
477
                                e.OutputHandler(filtered + "\n")
17✔
478
                        }
17✔
479
                }
480
        })
481

482
        if err != nil {
71✔
483
                return stderrResult{lastLines: tail, limitMatch: limitMatch, errorMatch: errorMatch, err: fmt.Errorf("read stderr: %w", err)}
8✔
484
        }
8✔
485
        return stderrResult{lastLines: tail, limitMatch: limitMatch, errorMatch: errorMatch}
55✔
486
}
487

488
// scanLineForPatterns updates limitMatch / errorMatch with the first matching
489
// limit/error pattern found in line, gated by isCodexErrorLine so progress
490
// chatter cannot trigger false positives. Once each match has been recorded
491
// it sticks for the rest of the run.
492
func (e *CodexExecutor) scanLineForPatterns(line string, limitMatch, errorMatch *string) {
552✔
493
        if !isCodexErrorLine(line) {
1,092✔
494
                return
540✔
495
        }
540✔
496
        if *limitMatch == "" {
24✔
497
                if pattern := matchPattern(line, e.LimitPatterns); pattern != "" {
20✔
498
                        *limitMatch = pattern
8✔
499
                }
8✔
500
        }
501
        if *errorMatch == "" {
24✔
502
                if pattern := matchPattern(line, e.ErrorPatterns); pattern != "" {
17✔
503
                        *errorMatch = pattern
5✔
504
                }
5✔
505
        }
506
}
507

508
// isCodexErrorLine tells whether a stderr line carries one of the prefixes
// codex reliably puts on CLI diagnostics. limit/error pattern matching is
// gated on this check so that progress text on stderr (header banners, bold
// summaries, model chatter that may legitimately mention "rate limit" while
// reviewing code) cannot trigger false-positive matches. surrounding
// whitespace and letter case are ignored; blank lines never match.
func isCodexErrorLine(line string) bool {
	lowered := strings.ToLower(strings.TrimSpace(line))
	// codex uses "ERROR:" today, the others are defensive against possible
	// future variants.
	for _, prefix := range [...]string{"error:", "fatal:", "panic:"} {
		if strings.HasPrefix(lowered, prefix) {
			return true
		}
	}
	return false
}
525

526
// readStdout reads the entire stdout content as the final response.
527
func (e *CodexExecutor) readStdout(r io.Reader) (string, error) {
53✔
528
        data, err := io.ReadAll(r)
53✔
529
        if err != nil {
54✔
530
                return "", fmt.Errorf("read stdout: %w", err)
1✔
531
        }
1✔
532
        return string(data), nil
52✔
533
}
534

535
// shouldDisplay implements a simple filter for codex stderr output.
536
// shows: bold reasoning summaries codex emits as live progress; on the very
537
// first codex invocation across this executor's lifetime (state.firstRun)
538
// also shows codex's resolved model/sandbox/effort lines from the header
539
// block so the user sees what codex actually picked from ~/.codex/config.toml.
540
// per-iteration header repetition (workdir/provider/approval/session id) is
541
// always suppressed to match ClaudeExecutor's empty-banner UX. session id
542
// detection in processStderr is independent of display so the rollout tailer
543
// still works whether the line is forwarded or not.
544
// also deduplicates lines to avoid non-consecutive repeats.
545
func (e *CodexExecutor) shouldDisplay(line string, state *codexFilterState) (bool, string) {
589✔
546
        s := strings.TrimSpace(line)
589✔
547
        if s == "" {
593✔
548
                return false, ""
4✔
549
        }
4✔
550

551
        var show bool
585✔
552
        var filtered string
585✔
553

585✔
554
        switch {
585✔
555
        case strings.HasPrefix(s, "--------"):
56✔
556
                // track separators only so subsequent header lines stay suppressed;
56✔
557
                // never displayed.
56✔
558
                state.headerCount++
56✔
559
        case state.headerCount == 1:
49✔
560
                // inside the header block. on the first run let codex's resolved
49✔
561
                // config (model / sandbox / reasoning effort) leak through so the
49✔
562
                // banner reflects what codex actually picked when ralphex did not
49✔
563
                // explicitly override these fields.
49✔
564
                if state.firstRun && e.isHeaderConfigLine(s) {
58✔
565
                        show = true
9✔
566
                        filtered = s
9✔
567
                }
9✔
568
        case strings.HasPrefix(s, "**"):
225✔
569
                // show bold summaries after header (progress indication)
225✔
570
                show = true
225✔
571
                filtered = e.stripBold(s)
225✔
572
        }
573

574
        // deduplicate displayed lines
575
        if show {
819✔
576
                if state.seen == nil {
252✔
577
                        state.seen = make(map[string]bool)
18✔
578
                }
18✔
579
                if state.seen[filtered] {
238✔
580
                        return false, "" // skip duplicate
4✔
581
                }
4✔
582
                state.seen[filtered] = true
230✔
583
        }
584

585
        return show, filtered
581✔
586
}
587

588
// isHeaderConfigLine returns true when line is one of codex's header-block
589
// lines describing the resolved per-session config that ralphex doesn't know
590
// up front (model picked from ~/.codex/config.toml, sandbox, reasoning effort).
591
// other header lines (workdir, provider, approval, reasoning summaries,
592
// session id) are either obvious from context or not useful to the user.
593
func (e *CodexExecutor) isHeaderConfigLine(s string) bool {
33✔
594
        return strings.HasPrefix(s, "model:") ||
33✔
595
                strings.HasPrefix(s, "sandbox:") ||
33✔
596
                strings.HasPrefix(s, "reasoning effort:")
33✔
597
}
33✔
598

599
// stripBold removes markdown bold markers (**text**) from text.
600
func (e *CodexExecutor) stripBold(s string) string {
231✔
601
        // replace **text** with text
231✔
602
        result := s
231✔
603
        for {
692✔
604
                start := strings.Index(result, "**")
461✔
605
                if start == -1 {
691✔
606
                        break
230✔
607
                }
608
                end := strings.Index(result[start+2:], "**")
231✔
609
                if end == -1 {
232✔
610
                        break
1✔
611
                }
612
                // remove both markers
613
                result = result[:start] + result[start+2:start+2+end] + result[start+2+end+2:]
230✔
614
        }
615
        return result
231✔
616
}
617

618
// sessionIDPattern matches the "session id: <uuid>" line codex emits in its
619
// startup banner. capture group 1 is the session id (lowercase hex + dashes).
620
var sessionIDPattern = regexp.MustCompile(`(?i)\bsession id:\s*([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})\b`)
621

622
// extractSessionID returns the codex session id from a stderr line that
623
// includes "session id: <uuid>", or "" when the line does not match. used
624
// by processStderr to surface the id to the rollout-tail goroutine.
625
func (e *CodexExecutor) extractSessionID(line string) string {
535✔
626
        m := sessionIDPattern.FindStringSubmatch(line)
535✔
627
        if len(m) < 2 {
1,063✔
628
                return ""
528✔
629
        }
528✔
630
        return m[1]
7✔
631
}
632

633
// startRolloutTail spawns the rollout-tail goroutine and returns a cancel
634
// function plus a done channel. tail goroutine waits for the session id on
635
// sessionIDCh, then follows codex's session rollout file until the returned
636
// cancel is called. caller must invoke tailCancel and wait on tailDone before
637
// returning so the tailer drains remaining file content and exits cleanly.
638
// the goroutine is a no-op when OutputHandler is nil — extracted from Run()
639
// to keep its cyclomatic complexity in check.
640
func (e *CodexExecutor) startRolloutTail(parent context.Context, sessionIDCh <-chan string, idleTouch func()) (context.CancelFunc, <-chan struct{}) {
51✔
641
        tailCtx, tailCancel := context.WithCancel(parent)
51✔
642
        done := make(chan struct{})
51✔
643
        go func() {
102✔
644
                defer close(done)
51✔
645
                select {
51✔
646
                case <-tailCtx.Done():
48✔
647
                        return
48✔
648
                case id := <-sessionIDCh:
3✔
649
                        e.tailRolloutFile(tailCtx, id, idleTouch)
3✔
650
                }
651
        }()
652
        return tailCancel, done
51✔
653
}
654

655
// findRolloutFile resolves the path to codex's session-rollout JSONL file
656
// for the given session id. codex stores the file under
657
// ~/.codex/sessions/<year>/<month>/<day>/rollout-<timestamp>-<session-id>.jsonl
658
// and may take a brief moment to create it after printing the session-id
659
// banner, so we poll up to ~5s. returns "" when the file cannot be located.
660
func (e *CodexExecutor) findRolloutFile(ctx context.Context, sessionID string) string {
7✔
661
        home, err := os.UserHomeDir()
7✔
662
        if err != nil {
7✔
NEW
663
                return ""
×
NEW
664
        }
×
665
        pattern := filepath.Join(home, ".codex", "sessions", "*", "*", "*", "rollout-*-"+sessionID+".jsonl")
7✔
666

7✔
667
        deadline := time.Now().Add(5 * time.Second)
7✔
668
        for {
16✔
669
                matches, _ := filepath.Glob(pattern)
9✔
670
                if len(matches) > 0 {
11✔
671
                        return matches[0]
2✔
672
                }
2✔
673
                if time.Now().After(deadline) {
7✔
NEW
674
                        return ""
×
NEW
675
                }
×
676
                select {
7✔
677
                case <-ctx.Done():
5✔
678
                        return ""
5✔
679
                case <-time.After(100 * time.Millisecond):
2✔
680
                }
681
        }
682
}
683

684
// tailRolloutFile follows codex's session rollout JSONL file like `tail -f`,
685
// parses each event, and emits human-readable progress lines via OutputHandler.
686
// runs until ctx is canceled. on cancellation, drains any remaining buffered
687
// lines before returning so late writes (e.g. codex flushing the final
688
// assistant message just before exit) are not lost.
689
func (e *CodexExecutor) tailRolloutFile(ctx context.Context, sessionID string, idleTouch func()) {
4✔
690
        if e.OutputHandler == nil {
4✔
NEW
691
                return
×
NEW
692
        }
×
693
        path := e.findRolloutFile(ctx, sessionID)
4✔
694
        if path == "" {
7✔
695
                // suppress the diagnostic when the session was canceled — findRolloutFile
3✔
696
                // also returns "" on ctx.Done(), and that is not a failure worth logging.
3✔
697
                if ctx.Err() == nil {
3✔
NEW
698
                        log.Printf("codex rollout file not found for session %s; assistant output streaming disabled for this session", sessionID)
×
NEW
699
                }
×
700
                return
3✔
701
        }
702
        f, err := os.Open(path) //nolint:gosec // path comes from codex's own session id
1✔
703
        if err != nil {
1✔
NEW
704
                log.Printf("codex rollout file open failed (%s): %v; assistant output streaming disabled for this session", path, err)
×
NEW
705
                return
×
NEW
706
        }
×
707
        defer func() { _ = f.Close() }()
2✔
708

709
        // accumulator holds bytes that may not yet form a complete line, so partial
710
        // reads at EOF do not lose content — the next Read after codex appends more
711
        // will complete the line.
712
        var acc []byte
1✔
713
        chunk := make([]byte, 4096)
1✔
714
        drainOnce := func() {
4✔
715
                for {
8✔
716
                        n, readErr := f.Read(chunk)
5✔
717
                        if n > 0 {
7✔
718
                                // any rollout bytes count as liveness — reset the idle timer
2✔
719
                                // before display filtering so a session actively dispatching
2✔
720
                                // tool calls (function_call records that formatRolloutEvent
2✔
721
                                // drops) is not killed as idle while still making progress.
2✔
722
                                if idleTouch != nil {
2✔
NEW
723
                                        idleTouch()
×
NEW
724
                                }
×
725
                                acc = append(acc, chunk[:n]...)
2✔
726
                                for {
7✔
727
                                        i := bytes.IndexByte(acc, '\n')
5✔
728
                                        if i < 0 {
7✔
729
                                                break
2✔
730
                                        }
731
                                        if msg := e.formatRolloutEvent(acc[:i]); msg != "" {
5✔
732
                                                e.OutputHandler(msg)
2✔
733
                                        }
2✔
734
                                        acc = acc[i+1:]
3✔
735
                                }
736
                        }
737
                        if readErr == io.EOF || n == 0 {
8✔
738
                                return
3✔
739
                        }
3✔
740
                        if readErr != nil {
2✔
NEW
741
                                return
×
NEW
742
                        }
×
743
                }
744
        }
745

746
        for {
3✔
747
                drainOnce()
2✔
748
                select {
2✔
749
                case <-ctx.Done():
1✔
750
                        // final drain after codex exits — pick up any late-flushed events
1✔
751
                        drainOnce()
1✔
752
                        return
1✔
753
                case <-time.After(200 * time.Millisecond):
1✔
754
                }
755
        }
756
}
757

758
// rolloutEvent is the outer wrapper for each line in codex's session rollout
// JSONL file. only `type` and `payload` are needed; payload is kept as raw
// JSON and re-parsed based on the type.
type rolloutEvent struct {
	Type    string          `json:"type"`    // event kind, e.g. "response_item"
	Payload json.RawMessage `json:"payload"` // undecoded event body
}
765

766
// rolloutPayload covers the response_item payload shape we render: assistant
// messages (payload.type=message, role=assistant). function_call records and
// reasoning records are dropped by formatRolloutEvent before any of their
// own fields would be read, so the struct only carries the subset we
// actually consume.
type rolloutPayload struct {
	Type    string `json:"type"` // payload kind; only "message" is rendered
	Role    string `json:"role"` // message author; only "assistant" is rendered
	Content []struct {
		Type string `json:"type"` // content part kind; only "output_text" is rendered
		Text string `json:"text"` // text of the content part
	} `json:"content"`
}
779

780
// formatRolloutEvent turns one JSONL rollout line into a display string for
781
// OutputHandler, or "" when the event has no user-visible substance. only
782
// assistant message text (the model's actual reply, the codex equivalent of
783
// claude's stream-json text blocks) is forwarded.
784
//
785
// reasoning records are skipped because their summaries are already streamed
786
// live from stderr. all function_call records (exec_command for git/grep/file
787
// reads, spawn_agent for parallel reviewer dispatch) and their outputs are
788
// skipped because they are tool-machinery noise — the assistant message text
789
// already announces what the model is doing narratively (e.g. "I'll launch
790
// the five review agents together"). showing both yields redundant chatter.
791
func (e *CodexExecutor) formatRolloutEvent(line []byte) string {
16✔
792
        if len(bytes.TrimSpace(line)) == 0 {
18✔
793
                return ""
2✔
794
        }
2✔
795
        var ev rolloutEvent
14✔
796
        if err := json.Unmarshal(line, &ev); err != nil {
15✔
797
                return ""
1✔
798
        }
1✔
799
        if ev.Type != "response_item" {
16✔
800
                return ""
3✔
801
        }
3✔
802
        var p rolloutPayload
10✔
803
        if err := json.Unmarshal(ev.Payload, &p); err != nil {
10✔
NEW
804
                return ""
×
NEW
805
        }
×
806
        if p.Type != "message" || p.Role != "assistant" {
16✔
807
                return ""
6✔
808
        }
6✔
809
        var sb strings.Builder
4✔
810
        for _, c := range p.Content {
9✔
811
                if c.Type != "output_text" || c.Text == "" {
5✔
NEW
812
                        continue
×
813
                }
814
                if sb.Len() > 0 {
6✔
815
                        sb.WriteByte('\n')
1✔
816
                }
1✔
817
                sb.WriteString(c.Text)
5✔
818
        }
819
        return sb.String()
4✔
820
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc