• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

umputun / ralphex / 26431227500

26 May 2026 03:44AM UTC coverage: 83.252% (-0.1%) from 83.363%
26431227500

Pull #364

github

umputun
docs: document processor phase architecture

Archive the completed implementation plan and update project guidance with the phase package boundaries.
Pull Request #364: Refactor processor runner into phase engines

1101 of 1215 new or added lines in 15 files covered. (90.62%)

20 existing lines in 4 files now uncovered.

7670 of 9213 relevant lines covered (83.25%)

222.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.11
/pkg/processor/phase/task.go
1
package phase
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "io/fs"
8
        "time"
9

10
        "github.com/umputun/ralphex/pkg/plan"
11
        "github.com/umputun/ralphex/pkg/status"
12
)
13

14
// TaskPhase executes plan tasks until completion.
15
type TaskPhase struct {
16
        cfg            Config
17
        log            TaskLogger
18
        exec           Executor
19
        policy         Policy
20
        prompts        TaskPrompts
21
        locator        Locator
22
        deps           *Deps
23
        breaks         *BreakController
24
        iterationDelay time.Duration
25
        retryCount     int
26
}
27

28
// TaskPhaseOpts contains dependencies for TaskPhase.
29
type TaskPhaseOpts struct {
30
        Cfg            Config
31
        Log            TaskLogger
32
        Exec           Executor
33
        Policy         Policy
34
        Prompts        TaskPrompts
35
        Locator        Locator
36
        Deps           *Deps
37
        Breaks         *BreakController
38
        IterationDelay time.Duration
39
        RetryCount     int
40
}
41

42
// NewTaskPhase creates a task phase engine.
43
func NewTaskPhase(opts TaskPhaseOpts) *TaskPhase {
75✔
44
        breaks := opts.Breaks
75✔
45
        if breaks == nil {
75✔
NEW
46
                breaks = NewBreakController(opts.Deps)
×
NEW
47
        }
×
48
        return &TaskPhase{
75✔
49
                cfg: opts.Cfg, log: opts.Log, exec: opts.Exec, policy: opts.Policy,
75✔
50
                prompts: opts.Prompts, locator: opts.Locator, deps: opts.Deps, breaks: breaks,
75✔
51
                iterationDelay: opts.IterationDelay, retryCount: opts.RetryCount,
75✔
52
        }
75✔
53
}
54

55
// Run executes one plan task per iteration until all actionable task checkboxes are complete.
56
func (p *TaskPhase) Run(ctx context.Context) error {
9✔
57
        prompt := p.prompts.TaskPrompt()
9✔
58
        retryCount := 0
9✔
59

9✔
60
        for i := 1; i <= p.cfg.MaxIterations; i++ {
26✔
61
                select {
17✔
62
                case <-ctx.Done():
1✔
63
                        return fmt.Errorf("task phase: %w", ctx.Err())
1✔
64
                default:
16✔
65
                }
66

67
                taskNum := i
16✔
68
                if pos := p.NextPlanTaskPosition(); pos > 0 {
26✔
69
                        taskNum = pos
10✔
70
                }
10✔
71
                p.log.PrintSection(status.NewTaskIterationSection(taskNum))
16✔
72

16✔
73
                loopCtx, loopCancel := p.breaks.context(ctx)
16✔
74

16✔
75
                execName := p.executorName()
16✔
76
                execResult := p.policy.Run(loopCtx, p.exec.Run, prompt, execName)
16✔
77
                result := execResult.Result
16✔
78

16✔
79
                manualBreak := p.breaks.isBreak(loopCtx, ctx)
16✔
80
                loopCancel()
16✔
81

16✔
82
                if manualBreak {
19✔
83
                        p.log.Print("session interrupted by break signal")
3✔
84
                        p.breaks.drain()
3✔
85
                        if p.deps.PauseHandler == nil || !p.deps.PauseHandler(ctx) {
4✔
86
                                return ErrUserAborted
1✔
87
                        }
1✔
88
                        p.breaks.drain()
2✔
89
                        i--
2✔
90
                        retryCount = 0
2✔
91
                        continue
2✔
92
                }
93

94
                if result.Error != nil {
13✔
NEW
95
                        if err := p.policy.HandlePatternMatchError(result.Error, execName); err != nil {
×
NEW
96
                                return fmt.Errorf("%s pattern handling: %w", execName, err)
×
NEW
97
                        }
×
NEW
98
                        return fmt.Errorf("%s execution: %w", execName, result.Error)
×
99
                }
100

101
                if execResult.TimedOut {
14✔
102
                        p.log.Print("%s session timed out, retrying task iteration...", execName)
1✔
103
                        continue
1✔
104
                }
105

106
                if result.Signal == SignalCompleted {
16✔
107
                        if p.HasUncompletedTasks() {
4✔
NEW
108
                                p.log.Print("warning: completion signal received but plan still has [ ] items, continuing...")
×
NEW
109
                                continue
×
110
                        }
111
                        p.log.PrintRaw("\nall tasks completed, starting code review...\n")
4✔
112
                        return nil
4✔
113
                }
114

115
                if result.Signal == SignalFailed {
13✔
116
                        if retryCount < p.retryCount {
8✔
117
                                p.log.Print("task failed, retrying...")
3✔
118
                                retryCount++
3✔
119
                                if err := p.policy.Sleep(ctx, p.iterationDelay); err != nil {
3✔
NEW
120
                                        return fmt.Errorf("interrupted: %w", err)
×
NEW
121
                                }
×
122
                                continue
3✔
123
                        }
124
                        return errors.New("task execution failed after retry (FAILED signal received)")
2✔
125
                }
126

127
                retryCount = 0
3✔
128
                if err := p.policy.Sleep(ctx, p.iterationDelay); err != nil {
3✔
NEW
129
                        return fmt.Errorf("interrupted: %w", err)
×
NEW
130
                }
×
131
        }
132

133
        return fmt.Errorf("max iterations (%d) reached without completion", p.cfg.MaxIterations)
1✔
134
}
135

136
// ValidatePlanHasTasks rejects plan files without executable task sections.
137
func (p *TaskPhase) ValidatePlanHasTasks() error {
4✔
138
        path := p.locator.Path()
4✔
139
        parsed, err := plan.ParsePlanFile(path)
4✔
140
        if err != nil {
4✔
NEW
141
                return fmt.Errorf("parse plan for validation: %w", err)
×
NEW
142
        }
×
143
        if len(parsed.Tasks) == 0 {
6✔
144
                return fmt.Errorf("plan file %q has no executable task sections (### Task N: or ### Iteration N:); add task sections or pass a different plan file", path)
2✔
145
        }
2✔
146
        return nil
2✔
147
}
148

149
// HasUncompletedTasks reports whether the current plan still has actionable unchecked task work.
150
func (p *TaskPhase) HasUncompletedTasks() bool {
11✔
151
        path := p.locator.Path()
11✔
152
        if path == "" {
11✔
NEW
153
                return false
×
NEW
154
        }
×
155
        parsed, err := plan.ParsePlanFile(path)
11✔
156
        if err != nil {
12✔
157
                if errors.Is(err, fs.ErrNotExist) {
2✔
158
                        return false
1✔
159
                }
1✔
NEW
160
                p.log.Print("[WARN] failed to parse plan file for completion check: %v", err)
×
NEW
161
                return true
×
162
        }
163
        for _, t := range parsed.Tasks {
22✔
164
                if t.HasUncompletedActionableWork() {
15✔
165
                        return true
3✔
166
                }
3✔
167
        }
168
        if len(parsed.Tasks) == 0 {
8✔
169
                has, err := plan.FileHasUncompletedCheckbox(path)
1✔
170
                if err != nil {
1✔
NEW
171
                        return true
×
NEW
172
                }
×
173
                if has {
2✔
174
                        return true
1✔
175
                }
1✔
176
        }
177
        return false
6✔
178
}
179

180
// NextPlanTaskPosition returns the 1-indexed first uncompleted task position, or zero when unavailable.
181
func (p *TaskPhase) NextPlanTaskPosition() int {
23✔
182
        parsed, err := plan.ParsePlanFile(p.locator.Path())
23✔
183
        if err != nil {
25✔
184
                p.log.Print("[WARN] failed to parse plan file for task position: %v", err)
2✔
185
                return 0
2✔
186
        }
2✔
187
        for i, t := range parsed.Tasks {
44✔
188
                if t.HasUncompletedActionableWork() {
35✔
189
                        return i + 1
12✔
190
                }
12✔
191
        }
192
        return 0
9✔
193
}
194

195
func (p *TaskPhase) executorName() string {
16✔
196
        if p.cfg.isCodexExecutor() {
16✔
NEW
197
                return "codex"
×
NEW
198
        }
×
199
        return "claude"
16✔
200
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc