• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rom8726 / floxy / 19547218997

20 Nov 2025 06:23PM UTC coverage: 46.889% (+0.2%) from 46.739%
19547218997

push

github

web-flow
Prevent parallel branch steps from completing during rollback. (#20)

38 of 51 new or added lines in 1 file covered. (74.51%)

1 existing line in 1 file now uncovered.

4943 of 10542 relevant lines covered (46.89%)

47.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.91
/engine.go
1
package floxy
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "errors"
7
        "fmt"
8
        "log/slog"
9
        "math/rand"
10
        "sort"
11
        "sync"
12
        "time"
13

14
        "github.com/jackc/pgx/v5/pgxpool"
15
)
16

17
// Engine-wide fallback durations.
const (
	// defaultCancelWorkerInterval is the polling period NewEngine assigns to
	// Engine.cancelWorkerInterval; cancelRequestsWorker ticks at this rate.
	defaultCancelWorkerInterval = 100 * time.Millisecond

	// defaultShutdownTimeout bounds how long Shutdown waits for in-flight
	// steps when the caller passes no timeout (or a zero one).
	defaultShutdownTimeout = 5 * time.Second
)
21

22
type Engine struct {
23
        txManager            TxManager
24
        store                Store
25
        handlers             map[string]StepHandler
26
        mu                   sync.RWMutex
27
        cancelContexts       map[int64]map[int64]context.CancelFunc // instanceID -> stepID -> cancel function
28
        cancelMu             sync.RWMutex
29
        cancelWorkerInterval time.Duration
30

31
        // Shutdown logic controls
32
        shutdownCh     chan struct{}
33
        shutdownOnce   sync.Once
34
        shutdownCtx    context.Context
35
        shutdownCancel context.CancelFunc
36
        activeSteps    sync.WaitGroup
37
        isShuttingDown bool
38
        shutdownMu     sync.RWMutex
39

40
        humanDecisionWaitingOnce   sync.Once
41
        humanDecisionWaitingEvents chan HumanDecisionWaitingEvent
42

43
        pluginManager *PluginManager
44

45
        // Missing-handler behavior controls
46
        missingHandlerCooldown    time.Duration
47
        missingHandlerLogThrottle time.Duration
48
        missingHandlerJitterPct   float64
49
        skipLogMu                 sync.Mutex
50
        skipLogNextAllowed        map[string]time.Time
51
}
52

53
func NewEngine(pool *pgxpool.Pool, opts ...EngineOption) *Engine {
144✔
54
        shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
144✔
55

144✔
56
        engine := &Engine{
144✔
57
                txManager:            NewTxManager(pool),
144✔
58
                store:                NewStore(pool),
144✔
59
                handlers:             make(map[string]StepHandler),
144✔
60
                cancelContexts:       make(map[int64]map[int64]context.CancelFunc),
144✔
61
                shutdownCh:           make(chan struct{}),
144✔
62
                cancelWorkerInterval: defaultCancelWorkerInterval,
144✔
63
                shutdownCtx:          shutdownCtx,
144✔
64
                shutdownCancel:       shutdownCancel,
144✔
65
                // defaults for missing-handler behavior
144✔
66
                missingHandlerCooldown:    time.Second,
144✔
67
                missingHandlerLogThrottle: 5 * time.Second,
144✔
68
                missingHandlerJitterPct:   0.2,
144✔
69
                skipLogNextAllowed:        make(map[string]time.Time),
144✔
70
        }
144✔
71

144✔
72
        for _, opt := range opts {
467✔
73
                opt(engine)
323✔
74
        }
323✔
75

76
        go engine.cancelRequestsWorker()
144✔
77

144✔
78
        return engine
144✔
79
}
80

81
// jitteredCooldown returns cooldown with +/-jitter percentage applied.
82
func (engine *Engine) jitteredCooldown() time.Duration {
32✔
83
        base := engine.missingHandlerCooldown
32✔
84
        if base <= 0 {
37✔
85
                return 0
5✔
86
        }
5✔
87
        pct := engine.missingHandlerJitterPct
27✔
88
        if pct <= 0 {
34✔
89
                return base
7✔
90
        }
7✔
91
        // random in [-pct, +pct]
92
        delta := (rand.Float64()*2 - 1) * pct
20✔
93
        adj := float64(base) * (1 + delta)
20✔
94
        if adj < float64(time.Millisecond) {
20✔
95
                adj = float64(time.Millisecond)
×
96
        }
×
97

98
        return time.Duration(adj)
20✔
99
}
100

101
// shouldLogSkip returns true if we should emit a skip event for the given key now.
102
func (engine *Engine) shouldLogSkip(key string) bool {
12✔
103
        if engine.missingHandlerLogThrottle <= 0 {
15✔
104
                return true
3✔
105
        }
3✔
106

107
        now := time.Now()
9✔
108

9✔
109
        engine.skipLogMu.Lock()
9✔
110
        defer engine.skipLogMu.Unlock()
9✔
111

9✔
112
        if next, ok := engine.skipLogNextAllowed[key]; ok && now.Before(next) {
11✔
113
                return false
2✔
114
        }
2✔
115

116
        engine.skipLogNextAllowed[key] = now.Add(engine.missingHandlerLogThrottle)
7✔
117

7✔
118
        return true
7✔
119
}
120

121
func (engine *Engine) RegisterHandler(handler StepHandler) {
125✔
122
        engine.mu.Lock()
125✔
123
        defer engine.mu.Unlock()
125✔
124
        engine.handlers[handler.Name()] = wrapProcessPanicHandler(handler)
125✔
125
}
125✔
126

127
func (engine *Engine) RegisterPlugin(plugin Plugin) {
1✔
128
        engine.mu.Lock()
1✔
129
        defer engine.mu.Unlock()
1✔
130

1✔
131
        if engine.pluginManager == nil {
2✔
132
                engine.pluginManager = NewPluginManager()
1✔
133
        }
1✔
134

135
        engine.pluginManager.Register(plugin)
1✔
136
}
137

138
func (engine *Engine) RegisterWorkflow(ctx context.Context, def *WorkflowDefinition) error {
59✔
139
        if err := engine.validateDefinition(def); err != nil {
66✔
140
                return fmt.Errorf("invalid workflow definition: %w", err)
7✔
141
        }
7✔
142

143
        return engine.store.SaveWorkflowDefinition(ctx, def)
52✔
144
}
145

146
// RequeueFromDLQ extracts a record from the DLQ and re-enqueues its step.
147
// If newInput is non-nil, it will be used as the step input before enqueueing.
148
func (engine *Engine) RequeueFromDLQ(ctx context.Context, dlqID int64, newInput *json.RawMessage) error {
3✔
149
        return engine.txManager.ReadCommitted(ctx, func(ctx context.Context) error {
6✔
150
                if err := engine.store.RequeueDeadLetter(ctx, dlqID, newInput); err != nil {
3✔
151
                        return fmt.Errorf("requeue dead letter: %w", err)
×
152
                }
×
153

154
                return nil
3✔
155
        })
156
}
157

158
func (engine *Engine) Start(ctx context.Context, workflowID string, input json.RawMessage) (int64, error) {
58✔
159
        var instanceID int64
58✔
160

58✔
161
        err := engine.txManager.ReadCommitted(ctx, func(ctx context.Context) error {
116✔
162
                def, err := engine.store.GetWorkflowDefinition(ctx, workflowID)
58✔
163
                if err != nil {
59✔
164
                        return fmt.Errorf("get workflow definition: %w", err)
1✔
165
                }
1✔
166

167
                instance, err := engine.store.CreateInstance(ctx, workflowID, input)
57✔
168
                if err != nil {
58✔
169
                        return fmt.Errorf("create instance: %w", err)
1✔
170
                }
1✔
171

172
                // PLUGIN HOOK: OnWorkflowStart
173
                if engine.pluginManager != nil {
56✔
174
                        if err := engine.pluginManager.ExecuteWorkflowStart(ctx, instance); err != nil {
×
175
                                return fmt.Errorf("plugin hook failed: %w", err)
×
176
                        }
×
177
                }
178

179
                _ = engine.store.LogEvent(ctx, instance.ID, nil, EventWorkflowStarted, map[string]any{
56✔
180
                        KeyWorkflowID: workflowID,
56✔
181
                })
56✔
182

56✔
183
                if err := engine.store.UpdateInstanceStatus(ctx, instance.ID, StatusRunning, nil, nil); err != nil {
56✔
184
                        return fmt.Errorf("update status: %w", err)
×
185
                }
×
186

187
                startStep := def.Definition.Start
56✔
188
                if startStep == "" {
57✔
189
                        return errors.New("no start step defined")
1✔
190
                }
1✔
191

192
                if err := engine.enqueueNextSteps(ctx, instance.ID, []string{startStep}, input); err != nil {
55✔
193
                        return fmt.Errorf("enqueue start step: %w", err)
×
194
                }
×
195

196
                instanceID = instance.ID
55✔
197

55✔
198
                return nil
55✔
199
        })
200
        if err != nil {
61✔
201
                return 0, err
3✔
202
        }
3✔
203

204
        return instanceID, nil
55✔
205
}
206

207
func (engine *Engine) Shutdown(timeoutOpt ...time.Duration) error {
144✔
208
        var shutdownErr error
144✔
209

144✔
210
        engine.shutdownOnce.Do(func() {
288✔
211
                slog.Info("[floxy] starting graceful shutdown")
144✔
212

144✔
213
                engine.shutdownMu.Lock()
144✔
214
                engine.isShuttingDown = true
144✔
215
                engine.shutdownMu.Unlock()
144✔
216

144✔
217
                close(engine.shutdownCh)
144✔
218
                engine.shutdownCancel()
144✔
219

144✔
220
                var timeout time.Duration
144✔
221
                if len(timeoutOpt) > 0 {
146✔
222
                        timeout = timeoutOpt[0]
2✔
223
                }
2✔
224

225
                if timeout == 0 {
286✔
226
                        timeout = defaultShutdownTimeout
142✔
227
                }
142✔
228

229
                done := make(chan struct{})
144✔
230
                go func() {
288✔
231
                        engine.activeSteps.Wait()
144✔
232
                        close(done)
144✔
233
                }()
144✔
234

235
                select {
144✔
236
                case <-done:
144✔
237
                        slog.Info("[floxy] graceful shutdown completed")
144✔
238
                case <-time.After(timeout):
×
239
                        shutdownErr = fmt.Errorf("shutdown timeout after %v", timeout)
×
240
                        slog.Warn("[floxy] shutdown timeout", "timeout", timeout)
×
241
                }
242
        })
243

244
        return shutdownErr
144✔
245
}
246

247
func (engine *Engine) ExecuteNext(ctx context.Context, workerID string) (empty bool, err error) {
937✔
248
        if engine.isShutdown() {
937✔
249
                return true, nil
×
250
        }
×
251

252
        err = engine.txManager.ReadCommitted(ctx, func(ctx context.Context) error {
1,874✔
253
                if engine.isShutdown() {
937✔
254
                        empty = true
×
255

×
256
                        return nil
×
257
                }
×
258

259
                item, err := engine.store.DequeueStep(ctx, workerID)
937✔
260
                if err != nil {
940✔
261
                        return fmt.Errorf("dequeue step: %w", err)
3✔
262
                }
3✔
263

264
                if item == nil {
1,549✔
265
                        empty = true
615✔
266

615✔
267
                        return nil
615✔
268
                }
615✔
269

270
                if engine.isShutdown() {
319✔
271
                        _ = engine.store.ReleaseQueueItem(ctx, item.ID)
×
272
                        empty = true
×
273

×
274
                        return nil
×
275
                }
×
276

277
                engine.activeSteps.Add(1)
319✔
278
                defer engine.activeSteps.Done()
319✔
279

319✔
280
                removeFromQueue := true
319✔
281
                defer func() {
638✔
282
                        if removeFromQueue {
632✔
283
                                _ = engine.store.RemoveFromQueue(ctx, item.ID)
313✔
284
                        }
313✔
285
                }()
286

287
                instance, err := engine.store.GetInstance(ctx, item.InstanceID)
319✔
288
                if err != nil {
319✔
289
                        return fmt.Errorf("get instance: %w", err)
×
290
                }
×
291

292
                // If workflow is in DLQ state, skip execution to prevent progress until operator intervention
293
                if instance.Status == StatusDLQ {
319✔
294
                        _ = engine.store.LogEvent(ctx, instance.ID, nil, EventStepFailed, map[string]any{
×
295
                                KeyMessage: "instance in DLQ state, skipping queued item",
×
296
                        })
×
297
                        return nil
×
298
                }
×
299

300
                var step *WorkflowStep
319✔
301
                if item.StepID == nil {
319✔
302
                        step, err = engine.createFirstStep(ctx, instance)
×
303
                        if err != nil {
×
304
                                return fmt.Errorf("create first step: %w", err)
×
305
                        }
×
306
                } else {
319✔
307
                        steps, err := engine.store.GetStepsByInstance(ctx, instance.ID)
319✔
308
                        if err != nil {
319✔
309
                                return fmt.Errorf("get steps: %w", err)
×
310
                        }
×
311

312
                        for _, currStep := range steps {
1,587✔
313
                                currStep := currStep
1,268✔
314
                                if currStep.ID == *item.StepID {
1,586✔
315
                                        step = &currStep
318✔
316

318✔
317
                                        break
318✔
318
                                }
319
                        }
320

321
                        if step == nil {
320✔
322
                                return fmt.Errorf("step not found: %d", *item.StepID)
1✔
323
                        }
1✔
324
                }
325

326
                // Check if this is a compensation
327
                if step.Status == StepStatusCompensation {
332✔
328
                        // For distributed setup: if local engine doesn't have the compensation handler,
14✔
329
                        // release the queue item so another service can execute the compensation.
14✔
330
                        def, err := engine.store.GetWorkflowDefinition(ctx, instance.WorkflowID)
14✔
331
                        if err != nil {
14✔
332
                                return fmt.Errorf("get workflow definition: %w", err)
×
333
                        }
×
334
                        stepDef, ok := def.Definition.Steps[step.StepName]
14✔
335
                        if !ok {
14✔
336
                                return fmt.Errorf("step definition not found: %s", step.StepName)
×
337
                        }
×
338
                        if stepDef.OnFailure != "" {
28✔
339
                                if onFailDef, ok := def.Definition.Steps[stepDef.OnFailure]; ok {
28✔
340
                                        engine.mu.RLock()
14✔
341
                                        _, has := engine.handlers[onFailDef.Handler]
14✔
342
                                        engine.mu.RUnlock()
14✔
343
                                        if !has {
16✔
344
                                                // Reschedule with cooldown and release so another service can pick it up later
2✔
345
                                                delay := engine.jitteredCooldown()
2✔
346
                                                if delay > 0 {
3✔
347
                                                        if err := engine.store.RescheduleAndReleaseQueueItem(ctx, item.ID, delay); err != nil {
1✔
348
                                                                return fmt.Errorf("reschedule queue item: %w", err)
×
349
                                                        }
×
350
                                                } else {
1✔
351
                                                        if err := engine.store.ReleaseQueueItem(ctx, item.ID); err != nil {
1✔
352
                                                                return fmt.Errorf("release queue item: %w", err)
×
353
                                                        }
×
354
                                                }
355

356
                                                removeFromQueue = false
2✔
357
                                                // Throttle skip logs to avoid flooding
2✔
358
                                                logKey := fmt.Sprintf("comp-skip:%d:%s", instance.ID, step.StepName)
2✔
359
                                                if engine.shouldLogSkip(logKey) {
4✔
360
                                                        _ = engine.store.LogEvent(ctx, instance.ID, nil, EventStepSkippedMissingHandler, map[string]any{
2✔
361
                                                                KeyStepName: step.StepName,
2✔
362
                                                                KeyMessage:  "no local compensation handler registered; rescheduled",
2✔
363
                                                        })
2✔
364
                                                }
2✔
365

366
                                                return nil
2✔
367
                                        }
368
                                }
369
                        }
370

371
                        return engine.executeCompensationStep(ctx, instance, step)
12✔
372
                } else if step.Status == StepStatusRolledBack {
308✔
373
                        return nil
4✔
374
                }
4✔
375

376
                // Distributed handlers: if this is a task step and no local handler is registered,
377
                // release the queue item so another service can pick it up, without failing the step.
378
                def, err := engine.store.GetWorkflowDefinition(ctx, instance.WorkflowID)
300✔
379
                if err != nil {
300✔
380
                        return fmt.Errorf("get workflow definition: %w", err)
×
381
                }
×
382
                stepDef, ok := def.Definition.Steps[step.StepName]
300✔
383
                if !ok {
300✔
384
                        return fmt.Errorf("step definition not found: %s", step.StepName)
×
385
                }
×
386
                if step.StepType == StepTypeTask {
510✔
387
                        engine.mu.RLock()
210✔
388
                        _, has := engine.handlers[stepDef.Handler]
210✔
389
                        engine.mu.RUnlock()
210✔
390
                        if !has {
214✔
391
                                // Reschedule with cooldown and release so another service can pick it up later
4✔
392
                                delay := engine.jitteredCooldown()
4✔
393
                                if delay > 0 {
5✔
394
                                        if err := engine.store.RescheduleAndReleaseQueueItem(ctx, item.ID, delay); err != nil {
1✔
395
                                                return fmt.Errorf("reschedule queue item: %w", err)
×
396
                                        }
×
397
                                } else {
3✔
398
                                        if err := engine.store.ReleaseQueueItem(ctx, item.ID); err != nil {
3✔
399
                                                return fmt.Errorf("release queue item: %w", err)
×
400
                                        }
×
401
                                }
402
                                removeFromQueue = false
4✔
403
                                // Throttle skip logs to avoid flooding
4✔
404
                                logKey := fmt.Sprintf("task-skip:%d:%s", instance.ID, step.StepName)
4✔
405
                                if engine.shouldLogSkip(logKey) {
7✔
406
                                        _ = engine.store.LogEvent(ctx, instance.ID, nil, EventStepSkippedMissingHandler, map[string]any{
3✔
407
                                                KeyStepName: step.StepName,
3✔
408
                                                KeyMessage:  "no local handler registered; rescheduled",
3✔
409
                                        })
3✔
410
                                }
3✔
411
                                return nil
4✔
412
                        }
413
                }
414

415
                return engine.executeStep(ctx, instance, step)
296✔
416
        })
417
        if err != nil {
941✔
418
                return empty, err
4✔
419
        }
4✔
420

421
        return empty, nil
933✔
422
}
423

424
func (engine *Engine) MakeHumanDecision(
425
        ctx context.Context,
426
        stepID int64,
427
        decidedBy string,
428
        decision HumanDecision,
429
        comment *string,
430
) error {
14✔
431
        return engine.txManager.ReadCommitted(ctx, func(ctx context.Context) error {
28✔
432
                step, err := engine.store.GetStepByID(ctx, stepID)
14✔
433
                if err != nil {
15✔
434
                        return fmt.Errorf("get step: %w", err)
1✔
435
                }
1✔
436

437
                if step.StepType != StepTypeHuman {
14✔
438
                        return fmt.Errorf("step %d is not a human step", stepID)
1✔
439
                }
1✔
440

441
                if step.Status != StepStatusWaitingDecision {
13✔
442
                        return fmt.Errorf("step %d is not waiting for decision (current status: %s)", stepID, step.Status)
1✔
443
                }
1✔
444

445
                decisionRecord := &HumanDecisionRecord{
11✔
446
                        InstanceID: step.InstanceID,
11✔
447
                        StepID:     stepID,
11✔
448
                        DecidedBy:  decidedBy,
11✔
449
                        Decision:   decision,
11✔
450
                        Comment:    comment,
11✔
451
                        DecidedAt:  time.Now(),
11✔
452
                }
11✔
453

11✔
454
                if err := engine.store.CreateHumanDecision(ctx, decisionRecord); err != nil {
12✔
455
                        return fmt.Errorf("create human decision: %w", err)
1✔
456
                }
1✔
457

458
                var newStatus StepStatus
10✔
459
                switch decision {
10✔
460
                case HumanDecisionConfirmed:
7✔
461
                        newStatus = StepStatusConfirmed
7✔
462
                case HumanDecisionRejected:
3✔
463
                        newStatus = StepStatusRejected
3✔
464
                }
465

466
                if err := engine.store.UpdateStepStatus(ctx, stepID, newStatus); err != nil {
11✔
467
                        return fmt.Errorf("update step status: %w", err)
1✔
468
                }
1✔
469

470
                _ = engine.store.LogEvent(ctx, step.InstanceID, &stepID, EventStepCompleted, map[string]any{
9✔
471
                        KeyStepName:  step.StepName,
9✔
472
                        KeyDecision:  decision,
9✔
473
                        KeyDecidedBy: decidedBy,
9✔
474
                })
9✔
475

9✔
476
                // If the decision is confirmed, continue workflow execution
9✔
477
                if decision == HumanDecisionConfirmed {
15✔
478
                        // Set the workflow to running status if it was paused
6✔
479
                        instance, err := engine.store.GetInstance(ctx, step.InstanceID)
6✔
480
                        if err != nil {
7✔
481
                                return fmt.Errorf("get instance: %w", err)
1✔
482
                        }
1✔
483

484
                        if instance.Status == StatusPending {
7✔
485
                                if err := engine.store.UpdateInstanceStatus(ctx, step.InstanceID, StatusRunning, nil, nil); err != nil {
3✔
486
                                        return fmt.Errorf("update instance status: %w", err)
1✔
487
                                }
1✔
488
                        }
489

490
                        // Continue execution of next steps
491
                        return engine.continueWorkflowAfterHumanDecision(ctx, instance, step)
4✔
492
                } else {
3✔
493
                        // If the decision is rejected, stop the workflow
3✔
494
                        return engine.store.UpdateInstanceStatus(ctx, step.InstanceID, StatusAborted, nil, nil)
3✔
495
                }
3✔
496
        })
497
}
498

499
func (engine *Engine) CancelWorkflow(ctx context.Context, instanceID int64, requestedBy, reason string) error {
8✔
500
        return engine.txManager.ReadCommitted(ctx, func(ctx context.Context) error {
16✔
501
                instance, err := engine.store.GetInstance(ctx, instanceID)
8✔
502
                if err != nil {
9✔
503
                        return fmt.Errorf("get instance: %w", err)
1✔
504
                }
1✔
505

506
                if instance.Status == StatusCompleted ||
7✔
507
                        instance.Status == StatusFailed ||
7✔
508
                        instance.Status == StatusCancelled ||
7✔
509
                        instance.Status == StatusAborted {
9✔
510
                        return fmt.Errorf("workflow %d is already in terminal state: %s", instanceID, instance.Status)
2✔
511
                }
2✔
512

513
                req := &WorkflowCancelRequest{
5✔
514
                        InstanceID:  instanceID,
5✔
515
                        RequestedBy: requestedBy,
5✔
516
                        CancelType:  CancelTypeCancel,
5✔
517
                        Reason:      &reason,
5✔
518
                }
5✔
519

5✔
520
                if err := engine.store.CreateCancelRequest(ctx, req); err != nil {
6✔
521
                        return fmt.Errorf("create cancel request: %w", err)
1✔
522
                }
1✔
523

524
                _ = engine.store.LogEvent(ctx, instanceID, nil, EventCancellationStarted, map[string]any{
4✔
525
                        KeyRequestedBy: requestedBy,
4✔
526
                        KeyReason:      reason,
4✔
527
                        KeyCancelType:  CancelTypeCancel,
4✔
528
                })
4✔
529

4✔
530
                return nil
4✔
531
        })
532
}
533

534
func (engine *Engine) AbortWorkflow(ctx context.Context, instanceID int64, requestedBy, reason string) error {
7✔
535
        return engine.txManager.ReadCommitted(ctx, func(ctx context.Context) error {
14✔
536
                instance, err := engine.store.GetInstance(ctx, instanceID)
7✔
537
                if err != nil {
8✔
538
                        return fmt.Errorf("get instance: %w", err)
1✔
539
                }
1✔
540

541
                if instance.Status == StatusCompleted ||
6✔
542
                        instance.Status == StatusFailed ||
6✔
543
                        instance.Status == StatusCancelled ||
6✔
544
                        instance.Status == StatusAborted {
8✔
545
                        return fmt.Errorf("workflow %d is already in terminal state: %s", instanceID, instance.Status)
2✔
546
                }
2✔
547

548
                req := &WorkflowCancelRequest{
4✔
549
                        InstanceID:  instanceID,
4✔
550
                        RequestedBy: requestedBy,
4✔
551
                        CancelType:  CancelTypeAbort,
4✔
552
                        Reason:      &reason,
4✔
553
                }
4✔
554

4✔
555
                if err := engine.store.CreateCancelRequest(ctx, req); err != nil {
5✔
556
                        return fmt.Errorf("create cancel request: %w", err)
1✔
557
                }
1✔
558

559
                _ = engine.store.LogEvent(ctx, instanceID, nil, EventAbortStarted, map[string]any{
3✔
560
                        KeyRequestedBy: requestedBy,
3✔
561
                        KeyReason:      reason,
3✔
562
                        KeyCancelType:  CancelTypeAbort,
3✔
563
                })
3✔
564

3✔
565
                return nil
3✔
566
        })
567
}
568

569
func (engine *Engine) isShutdown() bool {
2,193✔
570
        engine.shutdownMu.RLock()
2,193✔
571
        defer engine.shutdownMu.RUnlock()
2,193✔
572

2,193✔
573
        return engine.isShuttingDown
2,193✔
574
}
2,193✔
575

576
func (engine *Engine) cancelRequestsWorker() {
144✔
577
        ticker := time.NewTicker(engine.cancelWorkerInterval)
144✔
578
        defer ticker.Stop()
144✔
579

144✔
580
        for {
717✔
581
                select {
573✔
582
                case <-engine.shutdownCh:
144✔
583
                        return
144✔
584
                case <-ticker.C:
429✔
585
                        engine.processCancelRequests()
429✔
586
                }
587
        }
588
}
589

590
func (engine *Engine) processCancelRequests() {
429✔
591
        engine.cancelMu.RLock()
429✔
592
        instanceIDs := make([]int64, 0, len(engine.cancelContexts))
429✔
593
        for instanceID := range engine.cancelContexts {
564✔
594
                instanceIDs = append(instanceIDs, instanceID)
135✔
595
        }
135✔
596
        engine.cancelMu.RUnlock()
429✔
597

429✔
598
        for _, instanceID := range instanceIDs {
564✔
599
                ctx := context.Background()
135✔
600
                _, err := engine.store.GetCancelRequest(ctx, instanceID)
135✔
601
                if err != nil {
270✔
602
                        if !errors.Is(err, ErrEntityNotFound) {
135✔
603
                                slog.Error("[floxy] get cancel request for instance failed",
×
604
                                        "instance_id", instanceID, "error", err)
×
605
                        }
×
606

607
                        continue
135✔
608
                }
609

610
                engine.cancelMu.Lock()
×
611
                if stepContexts, exists := engine.cancelContexts[instanceID]; exists {
×
612
                        for stepID, cancelFunc := range stepContexts {
×
613
                                cancelFunc()
×
614
                                delete(stepContexts, stepID)
×
615
                        }
×
616

617
                        delete(engine.cancelContexts, instanceID)
×
618
                }
619
                engine.cancelMu.Unlock()
×
620
        }
621
}
622

623
func (engine *Engine) registerInstanceContext(instanceID int64, stepID int64, cancel context.CancelFunc) {
297✔
624
        engine.cancelMu.Lock()
297✔
625
        defer engine.cancelMu.Unlock()
297✔
626

297✔
627
        if engine.cancelContexts[instanceID] == nil {
591✔
628
                engine.cancelContexts[instanceID] = make(map[int64]context.CancelFunc)
294✔
629
        }
294✔
630
        engine.cancelContexts[instanceID][stepID] = cancel
297✔
631
}
632

633
func (engine *Engine) unregisterInstanceContext(instanceID int64, stepID int64) {
297✔
634
        engine.cancelMu.Lock()
297✔
635
        defer engine.cancelMu.Unlock()
297✔
636

297✔
637
        if stepContexts, exists := engine.cancelContexts[instanceID]; exists {
594✔
638
                delete(stepContexts, stepID)
297✔
639
                if len(stepContexts) == 0 {
591✔
640
                        delete(engine.cancelContexts, instanceID)
294✔
641
                }
294✔
642
        }
643
}
644

645
func (engine *Engine) stopActiveSteps(ctx context.Context, instanceID int64) error {
8✔
646
        activeSteps, err := engine.store.GetActiveStepsForUpdate(ctx, instanceID)
8✔
647
        if err != nil {
8✔
648
                return fmt.Errorf("get active steps: %w", err)
×
649
        }
×
650

651
        for _, step := range activeSteps {
13✔
652
                skipMsg := "Stopped due to workflow cancellation/abort"
5✔
653
                if err := engine.store.UpdateStep(ctx, step.ID, StepStatusSkipped, nil, &skipMsg); err != nil {
5✔
654
                        return fmt.Errorf("update step %d status to skipped: %w", step.ID, err)
×
655
                }
×
656

657
                _ = engine.store.LogEvent(ctx, instanceID, &step.ID, EventStepFailed, map[string]any{
5✔
658
                        KeyStepName: step.StepName,
5✔
659
                        KeyReason:   "workflow_stopped",
5✔
660
                })
5✔
661
        }
662

663
        return nil
8✔
664
}
665

666
func (engine *Engine) executeStep(ctx context.Context, instance *WorkflowInstance, step *WorkflowStep) error {
297✔
667
        // If workflow is in DLQ state, do not execute any steps until operator requeues
297✔
668
        if instance.Status == StatusDLQ {
297✔
669
                return nil
×
670
        }
×
671

672
        def, err := engine.store.GetWorkflowDefinition(ctx, instance.WorkflowID)
297✔
673
        if err != nil {
297✔
674
                return fmt.Errorf("get workflow definition: %w", err)
×
675
        }
×
676

677
        stepDef, ok := def.Definition.Steps[step.StepName]
297✔
678
        if !ok {
297✔
679
                return fmt.Errorf("step definition not found: %s", step.StepName)
×
680
        }
×
681

682
        // PLUGIN HOOK: OnStepStart
683
        if engine.pluginManager != nil {
297✔
684
                if err := engine.pluginManager.ExecuteStepStart(ctx, instance, step); err != nil {
×
685
                        return fmt.Errorf("plugin hook OnStepStart failed: %w", err)
×
686
                }
×
687
        }
688

689
        handlerCtx, cancel := context.WithCancel(ctx)
297✔
690
        defer cancel()
297✔
691

297✔
692
        engine.registerInstanceContext(instance.ID, step.ID, cancel)
297✔
693
        defer engine.unregisterInstanceContext(instance.ID, step.ID)
297✔
694

297✔
695
        cancelReq, err := engine.store.GetCancelRequest(ctx, instance.ID)
297✔
696
        if err == nil && cancelReq != nil {
302✔
697
                return engine.handleCancellation(ctx, instance, step, cancelReq)
5✔
698
        }
5✔
699

700
        if stepDef.Timeout != 0 {
292✔
701
                var timeoutCancel context.CancelFunc
×
702
                handlerCtx, timeoutCancel = context.WithTimeout(handlerCtx, stepDef.Timeout)
×
703
                defer timeoutCancel()
×
704
        }
×
705

706
        if err := engine.store.UpdateStep(ctx, step.ID, StepStatusRunning, nil, nil); err != nil {
292✔
707
                return fmt.Errorf("update step status: %w", err)
×
708
        }
×
709

710
        _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventStepStarted, map[string]any{
292✔
711
                KeyStepName: step.StepName,
292✔
712
                KeyStepType: stepDef.Type,
292✔
713
        })
292✔
714

292✔
715
        var output json.RawMessage
292✔
716
        var stepErr error
292✔
717
        next := true
292✔
718

292✔
719
        switch stepDef.Type {
292✔
720
        case StepTypeTask:
203✔
721
                output, stepErr = engine.executeTask(handlerCtx, instance, step, stepDef)
203✔
722
        case StepTypeFork:
22✔
723
                output, stepErr = engine.executeFork(handlerCtx, instance, step, stepDef)
22✔
724
        case StepTypeJoin:
17✔
725
                output, stepErr = engine.executeJoin(handlerCtx, instance, step, stepDef)
17✔
726
        case StepTypeParallel:
×
727
                output, stepErr = engine.executeFork(handlerCtx, instance, step, stepDef)
×
728
        case StepTypeSavePoint:
5✔
729
                output = step.Input
5✔
730
        case StepTypeCondition:
41✔
731
                output, next, stepErr = engine.executeCondition(handlerCtx, instance, step, stepDef)
41✔
732
        case StepTypeHuman:
4✔
733
                var aborted bool
4✔
734
                output, aborted, stepErr = engine.executeHuman(handlerCtx, instance, step, stepDef)
4✔
735
                if stepErr == nil && aborted {
5✔
736
                        return nil
1✔
737
                }
1✔
738
        default:
×
739
                stepErr = fmt.Errorf("unsupported step type: %s", stepDef.Type)
×
740
        }
741

742
        if errors.Is(handlerCtx.Err(), context.Canceled) {
291✔
743
                cancelReq, err = engine.store.GetCancelRequest(ctx, instance.ID)
×
744
                if err == nil && cancelReq != nil {
×
745
                        return engine.handleCancellation(ctx, instance, step, cancelReq)
×
746
                }
×
747
        }
748

749
        if stepErr != nil {
344✔
750
                // PLUGIN HOOK: OnStepFailed
53✔
751
                if engine.pluginManager != nil {
53✔
752
                        if errPlugin := engine.pluginManager.ExecuteStepFailed(ctx, instance, step, stepErr); errPlugin != nil {
×
753
                                slog.Warn("[floxy] plugin hook OnStepFailed failed", "error", err)
×
754
                        }
×
755
                }
756

757
                return engine.handleStepFailure(ctx, instance, step, stepDef, stepErr)
53✔
758
        }
759

760
        // PLUGIN HOOK: OnStepComplete
761
        if engine.pluginManager != nil {
238✔
762
                if err := engine.pluginManager.ExecuteStepComplete(ctx, instance, step); err != nil {
×
763
                        slog.Warn("[floxy] plugin hook OnStepComplete failed", "error", err)
×
764
                }
×
765
        }
766

767
        return engine.handleStepSuccess(ctx, instance, step, stepDef, output, next)
238✔
768
}
769

770
func (engine *Engine) handleCancellation(
771
        ctx context.Context,
772
        instance *WorkflowInstance,
773
        step *WorkflowStep,
774
        req *WorkflowCancelRequest,
775
) error {
8✔
776
        if err := engine.stopActiveSteps(ctx, instance.ID); err != nil {
8✔
777
                return fmt.Errorf("stop active steps: %w", err)
×
778
        }
×
779

780
        reason := ""
8✔
781
        if req.Reason != nil {
16✔
782
                reason = *req.Reason
8✔
783
        }
8✔
784

785
        if req.CancelType == CancelTypeCancel {
13✔
786
                err := engine.store.UpdateInstanceStatus(ctx, instance.ID, StatusCancelling, nil, nil)
5✔
787
                if err != nil {
5✔
788
                        return fmt.Errorf("update instance status to cancelling: %w", err)
×
789
                }
×
790

791
                def, err := engine.store.GetWorkflowDefinition(ctx, instance.WorkflowID)
5✔
792
                if err != nil {
5✔
793
                        return fmt.Errorf("get workflow definition: %w", err)
×
794
                }
×
795

796
                // Get fresh steps after stopActiveSteps to ensure we have current statuses
797
                steps, err := engine.store.GetStepsByInstance(ctx, instance.ID)
5✔
798
                if err != nil {
5✔
799
                        return fmt.Errorf("get steps: %w", err)
×
800
                }
×
801

802
                // Create stepMap with pointers to actual step values, not copies
803
                stepMap := make(map[string]*WorkflowStep)
5✔
804
                for i := range steps {
18✔
805
                        stepMap[steps[i].StepName] = &steps[i]
13✔
806
                }
13✔
807

808
                var lastCompletedStep *WorkflowStep
5✔
809
                for i := len(steps) - 1; i >= 0; i-- {
14✔
810
                        if steps[i].Status == StepStatusCompleted {
13✔
811
                                lastCompletedStep = &steps[i]
4✔
812

4✔
813
                                break
4✔
814
                        }
815
                }
816

817
                if lastCompletedStep != nil {
9✔
818
                        err := engine.rollbackStepChain(ctx, instance.ID, lastCompletedStep.StepName, rootStepName, def, stepMap, false, 0)
4✔
819
                        if err != nil {
5✔
820
                                _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventWorkflowCancelled, map[string]any{
1✔
821
                                        KeyRequestedBy: req.RequestedBy,
1✔
822
                                        KeyReason:      reason,
1✔
823
                                        KeyError:       fmt.Sprintf("rollback failed: %v", err),
1✔
824
                                })
1✔
825

1✔
826
                                errMsg := fmt.Sprintf("cancellation rollback failed: %v", err)
1✔
827
                                _ = engine.store.UpdateInstanceStatus(ctx, instance.ID, StatusFailed, nil, &errMsg)
1✔
828
                                _ = engine.store.DeleteCancelRequest(ctx, instance.ID)
1✔
829

1✔
830
                                return fmt.Errorf("rollback failed: %w", err)
1✔
831
                        }
1✔
832
                }
833

834
                cancelMsg := fmt.Sprintf("Cancelled by %s: %s", req.RequestedBy, reason)
4✔
835
                err = engine.store.UpdateInstanceStatus(ctx, instance.ID, StatusCancelled, nil, &cancelMsg)
4✔
836
                if err != nil {
4✔
837
                        return fmt.Errorf("update instance status to cancelled: %w", err)
×
838
                }
×
839

840
                _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventWorkflowCancelled, map[string]any{
4✔
841
                        KeyRequestedBy: req.RequestedBy,
4✔
842
                        KeyReason:      reason,
4✔
843
                })
4✔
844
        } else {
3✔
845
                abortMsg := fmt.Sprintf("Aborted by %s: %s", req.RequestedBy, reason)
3✔
846
                if err := engine.store.UpdateInstanceStatus(ctx, instance.ID, StatusAborted, nil, &abortMsg); err != nil {
3✔
847
                        return fmt.Errorf("update instance status to aborted: %w", err)
×
848
                }
×
849

850
                _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventWorkflowAborted, map[string]any{
3✔
851
                        KeyRequestedBy: req.RequestedBy,
3✔
852
                        KeyReason:      reason,
3✔
853
                })
3✔
854
        }
855

856
        _ = engine.store.DeleteCancelRequest(ctx, instance.ID)
7✔
857

7✔
858
        return nil
7✔
859
}
860

861
func (engine *Engine) executeCompensationStep(ctx context.Context, instance *WorkflowInstance, step *WorkflowStep) error {
17✔
862
        def, err := engine.store.GetWorkflowDefinition(ctx, instance.WorkflowID)
17✔
863
        if err != nil {
17✔
864
                return fmt.Errorf("get workflow definition: %w", err)
×
865
        }
×
866

867
        stepDef, ok := def.Definition.Steps[step.StepName]
17✔
868
        if !ok {
17✔
869
                return fmt.Errorf("step definition not found: %s", step.StepName)
×
870
        }
×
871

872
        if stepDef.Timeout != 0 {
17✔
873
                var cancel context.CancelFunc
×
874
                ctx, cancel = context.WithTimeout(ctx, stepDef.Timeout)
×
875
                defer cancel()
×
876
        }
×
877

878
        onFailureStep, ok := def.Definition.Steps[stepDef.OnFailure]
17✔
879
        if !ok {
18✔
880
                // No compensation handler, mark as rolled back
1✔
881
                if err := engine.store.UpdateStep(ctx, step.ID, StepStatusRolledBack, step.Input, nil); err != nil {
1✔
882
                        return fmt.Errorf("update step status: %w", err)
×
883
                }
×
884
                return nil
1✔
885
        }
886

887
        handler, exists := engine.handlers[onFailureStep.Handler]
16✔
888
        if !exists {
17✔
889
                // Handler not found, mark as rolled back
1✔
890
                if err := engine.store.UpdateStep(ctx, step.ID, StepStatusRolledBack, step.Input, nil); err != nil {
1✔
891
                        return fmt.Errorf("update step status: %w", err)
×
892
                }
×
893
                return nil
1✔
894
        }
895

896
        variables := make(map[string]any, len(onFailureStep.Metadata)+1)
15✔
897
        for k, v := range onFailureStep.Metadata {
28✔
898
                variables[k] = v
13✔
899
        }
13✔
900
        variables["reason"] = "compensation"
15✔
901

15✔
902
        stepCtx := executionContext{
15✔
903
                instanceID:     step.InstanceID,
15✔
904
                stepName:       step.StepName,
15✔
905
                idempotencyKey: step.IdempotencyKey,
15✔
906
                retryCount:     step.CompensationRetryCount,
15✔
907
                variables:      variables,
15✔
908
        }
15✔
909

15✔
910
        // Execute the compensation handler
15✔
911
        _, compensationErr := handler.Execute(ctx, &stepCtx, step.Input)
15✔
912
        if compensationErr != nil {
17✔
913
                // Compensation failed, check if we can retry
2✔
914
                if step.CompensationRetryCount < onFailureStep.MaxRetries {
3✔
915
                        // Retry compensation
1✔
916
                        newRetryCount := step.CompensationRetryCount + 1
1✔
917
                        if err := engine.store.UpdateStepCompensationRetry(ctx, step.ID, newRetryCount, StepStatusCompensation); err != nil {
1✔
918
                                return fmt.Errorf("update compensation retry: %w", err)
×
919
                        }
×
920

921
                        // Re-enqueue for retry
922
                        if err := engine.store.EnqueueStep(ctx, step.InstanceID, &step.ID, PriorityHigh, stepDef.Delay); err != nil {
1✔
923
                                return fmt.Errorf("enqueue compensation retry: %w", err)
×
924
                        }
×
925

926
                        _ = engine.store.LogEvent(ctx, step.InstanceID, &step.ID, EventStepFailed, map[string]any{
1✔
927
                                KeyStepName:   step.StepName,
1✔
928
                                KeyStepType:   step.StepType,
1✔
929
                                KeyError:      compensationErr.Error(),
1✔
930
                                KeyRetryCount: newRetryCount,
1✔
931
                                KeyReason:     "compensation_retry",
1✔
932
                        })
1✔
933

1✔
934
                        return nil
1✔
935
                } else {
1✔
936
                        // Max retries exceeded, mark as failed
1✔
937
                        errorMsg := compensationErr.Error()
1✔
938
                        if err := engine.store.UpdateStep(ctx, step.ID, StepStatusFailed, step.Input, &errorMsg); err != nil {
1✔
939
                                return fmt.Errorf("update step status: %w", err)
×
940
                        }
×
941

942
                        _ = engine.store.LogEvent(ctx, step.InstanceID, &step.ID, EventStepFailed, map[string]any{
1✔
943
                                KeyStepName: step.StepName,
1✔
944
                                KeyStepType: step.StepType,
1✔
945
                                KeyError:    "compensation max retries exceeded",
1✔
946
                        })
1✔
947

1✔
948
                        return nil
1✔
949
                }
950
        }
951

952
        // Compensation successful, mark as rolled back
953
        if err := engine.store.UpdateStep(ctx, step.ID, StepStatusRolledBack, step.Input, nil); err != nil {
13✔
954
                return fmt.Errorf("update step status: %w", err)
×
955
        }
×
956

957
        _ = engine.store.LogEvent(ctx, step.InstanceID, &step.ID, EventStepCompleted, map[string]any{
13✔
958
                KeyStepName:   step.StepName,
13✔
959
                KeyStepType:   step.StepType,
13✔
960
                KeyRetryCount: step.CompensationRetryCount,
13✔
961
                KeyReason:     "compensation_success",
13✔
962
        })
13✔
963

13✔
964
        // Check if all compensation steps are finished
13✔
965
        // If no more unfinished steps and no compensation steps, finalize workflow status
13✔
966
        if !engine.hasUnfinishedSteps(ctx, step.InstanceID) && !engine.hasStepsInCompensation(ctx, step.InstanceID) {
19✔
967
                // All compensation done, now check final status
6✔
968
                if engine.hasFailedOrRolledBackSteps(ctx, step.InstanceID) {
11✔
969
                        // Mark workflow as failed
5✔
970
                        errMsg := "workflow has failed or rolled back steps"
5✔
971
                        if err := engine.store.UpdateInstanceStatus(ctx, step.InstanceID, StatusFailed, nil, &errMsg); err != nil {
5✔
972
                                return fmt.Errorf("update instance status to failed: %w", err)
×
973
                        }
×
974

975
                        _ = engine.store.LogEvent(ctx, step.InstanceID, nil, EventWorkflowFailed, map[string]any{
5✔
976
                                KeyWorkflowID: instance.WorkflowID,
5✔
977
                                KeyReason:     errMsg,
5✔
978
                        })
5✔
979

5✔
980
                        // PLUGIN HOOK: OnWorkflowFailed
5✔
981
                        if engine.pluginManager != nil {
5✔
982
                                finalInstance, _ := engine.store.GetInstance(ctx, step.InstanceID)
×
983
                                if finalInstance != nil {
×
984
                                        if errPlugin := engine.pluginManager.ExecuteWorkflowFailed(ctx, finalInstance); errPlugin != nil {
×
985
                                                slog.Warn("[floxy] plugin hook OnWorkflowFailed failed", "error", errPlugin)
×
986
                                        }
×
987
                                }
988
                        }
989
                }
990
        }
991

992
        return nil
13✔
993
}
994

995
func (engine *Engine) executeTask(
996
        ctx context.Context,
997
        instance *WorkflowInstance,
998
        step *WorkflowStep,
999
        stepDef *StepDefinition,
1000
) (json.RawMessage, error) {
205✔
1001
        engine.mu.RLock()
205✔
1002
        handler, ok := engine.handlers[stepDef.Handler]
205✔
1003
        engine.mu.RUnlock()
205✔
1004

205✔
1005
        if !ok {
206✔
1006
                return nil, fmt.Errorf("handler not found: %s", stepDef.Handler)
1✔
1007
        }
1✔
1008

1009
        execCtx := &executionContext{
204✔
1010
                instanceID:     instance.ID,
204✔
1011
                stepName:       step.StepName,
204✔
1012
                idempotencyKey: step.IdempotencyKey,
204✔
1013
                retryCount:     step.RetryCount,
204✔
1014
                variables:      stepDef.Metadata,
204✔
1015
        }
204✔
1016

204✔
1017
        return handler.Execute(ctx, execCtx, step.Input)
204✔
1018
}
1019

1020
func (engine *Engine) executeFork(
1021
        ctx context.Context,
1022
        instance *WorkflowInstance,
1023
        step *WorkflowStep,
1024
        stepDef *StepDefinition,
1025
) (json.RawMessage, error) {
23✔
1026
        _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventForkStarted, map[string]any{
23✔
1027
                KeyParallelSteps: stepDef.Parallel,
23✔
1028
        })
23✔
1029

23✔
1030
        def, err := engine.store.GetWorkflowDefinition(ctx, instance.WorkflowID)
23✔
1031
        if err != nil {
23✔
1032
                return nil, fmt.Errorf("get workflow definition: %w", err)
×
1033
        }
×
1034

1035
        for _, parallelStepName := range stepDef.Parallel {
72✔
1036
                parallelStepDef, ok := def.Definition.Steps[parallelStepName]
49✔
1037
                if !ok {
49✔
1038
                        return nil, fmt.Errorf("parallel step definition not found: %s", parallelStepName)
×
1039
                }
×
1040

1041
                parallelStep := &WorkflowStep{
49✔
1042
                        InstanceID: instance.ID,
49✔
1043
                        StepName:   parallelStepName,
49✔
1044
                        StepType:   parallelStepDef.Type,
49✔
1045
                        Status:     StepStatusPending,
49✔
1046
                        Input:      step.Input,
49✔
1047
                        MaxRetries: parallelStepDef.MaxRetries,
49✔
1048
                }
49✔
1049

49✔
1050
                if err := engine.store.CreateStep(ctx, parallelStep); err != nil {
49✔
1051
                        return nil, fmt.Errorf("create fork step %s: %w", parallelStepName, err)
×
1052
                }
×
1053

1054
                if err := engine.store.EnqueueStep(ctx, instance.ID, &parallelStep.ID, PriorityNormal, parallelStepDef.Delay); err != nil {
49✔
1055
                        return nil, fmt.Errorf("enqueue fork step %s: %w", parallelStepName, err)
×
1056
                }
×
1057
        }
1058

1059
        for _, nextStepName := range stepDef.Next {
46✔
1060
                nextStepDef, ok := def.Definition.Steps[nextStepName]
23✔
1061

23✔
1062
                if ok && nextStepDef.Type == StepTypeJoin {
46✔
1063
                        strategy := nextStepDef.JoinStrategy
23✔
1064
                        if strategy == "" {
23✔
1065
                                strategy = JoinStrategyAll
×
1066
                        }
×
1067

1068
                        waitFor := nextStepDef.WaitFor
23✔
1069
                        if len(waitFor) == 0 {
23✔
1070
                                waitFor = stepDef.Parallel
×
1071
                        }
×
1072

1073
                        err := engine.store.CreateJoinState(ctx, instance.ID, nextStepName, waitFor, strategy)
23✔
1074
                        if err != nil {
23✔
1075
                                return nil, fmt.Errorf("create join state: %w", err)
×
1076
                        }
×
1077

1078
                        _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventJoinStateCreated, map[string]any{
23✔
1079
                                KeyJoinStep:   nextStepName,
23✔
1080
                                KeyWaitingFor: waitFor,
23✔
1081
                                KeyStrategy:   strategy,
23✔
1082
                        })
23✔
1083
                }
1084
        }
1085

1086
        return json.Marshal(map[string]any{
23✔
1087
                KeyStatus:        "forked",
23✔
1088
                KeyParallelSteps: stepDef.Parallel,
23✔
1089
        })
23✔
1090
}
1091

1092
func (engine *Engine) executeJoin(
1093
        ctx context.Context,
1094
        instance *WorkflowInstance,
1095
        step *WorkflowStep,
1096
        _ *StepDefinition,
1097
) (json.RawMessage, error) {
20✔
1098
        joinState, err := engine.store.GetJoinState(ctx, instance.ID, step.StepName)
20✔
1099
        if err != nil {
20✔
1100
                return nil, fmt.Errorf("get join state: %w", err)
×
1101
        }
×
1102

1103
        _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventJoinCheck, map[string]any{
20✔
1104
                KeyWaitingFor: joinState.WaitingFor,
20✔
1105
                KeyCompleted:  joinState.Completed,
20✔
1106
                KeyFailed:     joinState.Failed,
20✔
1107
                KeyIsReady:    joinState.IsReady,
20✔
1108
        })
20✔
1109

20✔
1110
        if !joinState.IsReady {
21✔
1111
                return nil, fmt.Errorf("join not ready: waiting for %v", joinState.WaitingFor)
1✔
1112
        }
1✔
1113

1114
        results := make(map[string]any)
19✔
1115
        results[KeyCompleted] = joinState.Completed
19✔
1116
        results[KeyFailed] = joinState.Failed
19✔
1117
        results[KeyStrategy] = joinState.JoinStrategy
19✔
1118

19✔
1119
        steps, err := engine.store.GetStepsByInstance(ctx, instance.ID)
19✔
1120
        if err != nil {
19✔
1121
                return nil, fmt.Errorf("get steps: %w", err)
×
1122
        }
×
1123

1124
        outputs := make(map[string]json.RawMessage)
19✔
1125
        for _, s := range steps {
170✔
1126
                for _, waitFor := range joinState.WaitingFor {
500✔
1127
                        if s.StepName == waitFor && s.Status == StepStatusCompleted {
372✔
1128
                                outputs[s.StepName] = s.Output
23✔
1129
                        }
23✔
1130
                }
1131
        }
1132
        results[KeyOutputs] = outputs
19✔
1133

19✔
1134
        if len(joinState.Failed) > 0 && joinState.JoinStrategy == JoinStrategyAll {
29✔
1135
                results[KeyStatus] = "failed"
10✔
1136
                failedData, _ := json.Marshal(results)
10✔
1137

10✔
1138
                return failedData, fmt.Errorf("join failed: %d steps failed", len(joinState.Failed))
10✔
1139
        }
10✔
1140

1141
        results[KeyStatus] = "success"
9✔
1142

9✔
1143
        _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventJoinCompleted, results)
9✔
1144

9✔
1145
        return json.Marshal(results)
9✔
1146
}
1147

1148
func (engine *Engine) executeCondition(
1149
        ctx context.Context,
1150
        instance *WorkflowInstance,
1151
        step *WorkflowStep,
1152
        stepDef *StepDefinition,
1153
) (json.RawMessage, bool, error) {
41✔
1154
        var inputData map[string]any
41✔
1155
        _ = json.Unmarshal(step.Input, &inputData)
41✔
1156

41✔
1157
        stepCtx := executionContext{
41✔
1158
                instanceID:     step.InstanceID,
41✔
1159
                stepName:       step.StepName,
41✔
1160
                idempotencyKey: step.IdempotencyKey,
41✔
1161
                variables:      inputData,
41✔
1162
        }
41✔
1163

41✔
1164
        result, err := evaluateCondition(stepDef.Condition, &stepCtx)
41✔
1165
        if err != nil {
41✔
1166
                _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventConditionCheck, map[string]any{
×
1167
                        KeyStepName: step.StepName,
×
1168
                        KeyError:    err.Error(),
×
1169
                })
×
1170

×
1171
                return nil, false, fmt.Errorf("evaluate condition: %w", err)
×
1172
        }
×
1173

1174
        _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventConditionCheck, map[string]any{
41✔
1175
                KeyStepName: step.StepName,
41✔
1176
                KeyResult:   result,
41✔
1177
        })
41✔
1178

41✔
1179
        return step.Input, result, nil
41✔
1180
}
1181

1182
func (engine *Engine) executeHuman(
1183
        ctx context.Context,
1184
        instance *WorkflowInstance,
1185
        step *WorkflowStep,
1186
        stepDef *StepDefinition,
1187
) (json.RawMessage, bool, error) {
4✔
1188
        // Check if there's already a decision for this step
4✔
1189
        decision, err := engine.store.GetHumanDecision(ctx, step.ID)
4✔
1190
        if err != nil && !errors.Is(err, ErrEntityNotFound) {
4✔
1191
                return nil, false, fmt.Errorf("get human decision: %w", err)
×
1192
        }
×
1193

1194
        if decision != nil {
6✔
1195
                // Decision already made, process it
2✔
1196
                return engine.processHumanDecision(ctx, instance, step, decision)
2✔
1197
        }
2✔
1198

1199
        // No decision yet, set step to waiting state
1200
        step.Status = StepStatusWaitingDecision
2✔
1201
        if err := engine.store.UpdateStepStatus(ctx, step.ID, StepStatusWaitingDecision); err != nil {
2✔
1202
                return nil, false, fmt.Errorf("update step status to waiting_decision: %w", err)
×
1203
        }
×
1204

1205
        if err := engine.store.EnqueueStep(ctx, instance.ID, &step.ID, PriorityHigher, stepDef.Delay); err != nil {
2✔
1206
                return nil, false, fmt.Errorf("enqueue step: %w", err)
×
1207
        }
×
1208

1209
        _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventStepStarted, map[string]any{
2✔
1210
                KeyStepName: step.StepName,
2✔
1211
                KeyStepType: stepDef.Type,
2✔
1212
                KeyReason:   "waiting_for_human_decision",
2✔
1213
        })
2✔
1214

2✔
1215
        // Parse input data and add waiting status
2✔
1216
        var inputData map[string]any
2✔
1217
        if err := json.Unmarshal(step.Input, &inputData); err != nil {
2✔
1218
                // If input is not valid JSON, create empty map
×
1219
                inputData = make(map[string]any)
×
1220
        }
×
1221

1222
        // Add waiting status to the data
1223
        inputData["status"] = "waiting_decision"
2✔
1224
        inputData["message"] = "Step is waiting for human decision"
2✔
1225

2✔
1226
        // Encode back to JSON
2✔
1227
        output, err := json.Marshal(inputData)
2✔
1228
        if err != nil {
2✔
1229
                return nil, false, fmt.Errorf("marshal output: %w", err)
×
1230
        }
×
1231

1232
        event := HumanDecisionWaitingEvent{
2✔
1233
                InstanceID: instance.ID,
2✔
1234
                OutputData: output,
2✔
1235
        }
2✔
1236

2✔
1237
        if engine.humanDecisionWaitingEvents != nil {
2✔
1238
                engine.humanDecisionWaitingEvents <- event
×
1239
        }
×
1240

1241
        return output, false, nil
2✔
1242
}
1243

1244
func (engine *Engine) processHumanDecision(
1245
        ctx context.Context,
1246
        instance *WorkflowInstance,
1247
        step *WorkflowStep,
1248
        decision *HumanDecisionRecord,
1249
) (json.RawMessage, bool, error) {
2✔
1250
        // Update step status based on decision
2✔
1251
        var newStatus StepStatus
2✔
1252

2✔
1253
        switch decision.Decision {
2✔
1254
        case HumanDecisionConfirmed:
1✔
1255
                newStatus = StepStatusConfirmed
1✔
1256

1✔
1257
                _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventStepCompleted, map[string]any{
1✔
1258
                        KeyStepName:  step.StepName,
1✔
1259
                        KeyDecision:  decision.Decision,
1✔
1260
                        KeyDecidedBy: decision.DecidedBy,
1✔
1261
                })
1✔
1262

1263
        case HumanDecisionRejected:
1✔
1264
                newStatus = StepStatusRejected
1✔
1265

1✔
1266
                errMsg := "Step was rejected by human"
1✔
1267
                if err := engine.store.UpdateInstanceStatus(ctx, instance.ID, StatusAborted, nil, &errMsg); err != nil {
1✔
1268
                        return nil, false, fmt.Errorf("update instance status: %w", err)
×
1269
                }
×
1270

1271
                _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventStepFailed, map[string]any{
1✔
1272
                        KeyStepName:  step.StepName,
1✔
1273
                        KeyDecision:  decision.Decision,
1✔
1274
                        KeyDecidedBy: decision.DecidedBy,
1✔
1275
                })
1✔
1276
        }
1277

1278
        // Update step status
1279
        if err := engine.store.UpdateStepStatus(ctx, step.ID, newStatus); err != nil {
2✔
1280
                return nil, false, fmt.Errorf("update step status: %w", err)
×
1281
        }
×
1282

1283
        // Parse input data and add decision information
1284
        var inputData map[string]any
2✔
1285
        if err := json.Unmarshal(step.Input, &inputData); err != nil {
2✔
1286
                // If input is not valid JSON, create empty map
×
1287
                inputData = make(map[string]any)
×
1288
        }
×
1289

1290
        // Add decision information to the data
1291
        inputData["status"] = string(decision.Decision)
2✔
1292
        inputData["decided_by"] = decision.DecidedBy
2✔
1293
        if decision.Comment != nil {
4✔
1294
                inputData["comment"] = *decision.Comment
2✔
1295
        }
2✔
1296
        inputData["decided_at"] = decision.DecidedAt
2✔
1297

2✔
1298
        // Encode back to JSON
2✔
1299
        output, err := json.Marshal(inputData)
2✔
1300
        if err != nil {
2✔
1301
                return nil, false, fmt.Errorf("marshal output: %w", err)
×
1302
        }
×
1303

1304
        return output, newStatus == StepStatusRejected, nil
2✔
1305
}
1306

1307
func (engine *Engine) continueWorkflowAfterHumanDecision(
1308
        ctx context.Context,
1309
        instance *WorkflowInstance,
1310
        step *WorkflowStep,
1311
) error {
4✔
1312
        // Get workflow definition
4✔
1313
        def, err := engine.store.GetWorkflowDefinition(ctx, instance.WorkflowID)
4✔
1314
        if err != nil {
5✔
1315
                return fmt.Errorf("get workflow definition: %w", err)
1✔
1316
        }
1✔
1317

1318
        stepDef, ok := def.Definition.Steps[step.StepName]
3✔
1319
        if !ok {
3✔
1320
                return fmt.Errorf("step definition not found: %s", step.StepName)
×
1321
        }
×
1322

1323
        // Continue execution of next steps
1324
        output := json.RawMessage(`{"status": "confirmed"}`)
3✔
1325
        return engine.handleStepSuccess(ctx, instance, step, stepDef, output, true)
3✔
1326
}
1327

1328
func (engine *Engine) handleStepSuccess(
1329
        ctx context.Context,
1330
        instance *WorkflowInstance,
1331
        step *WorkflowStep,
1332
        stepDef *StepDefinition,
1333
        output json.RawMessage,
1334
        next bool,
1335
) error {
244✔
1336
        // For human steps waiting for decision, don't update status
244✔
1337
        if stepDef.Type == StepTypeHuman && step.Status == StepStatusWaitingDecision {
250✔
1338
                // Don't continue execution, wait for human decision
6✔
1339
                return nil
6✔
1340
        }
6✔
1341

1342
        if err := engine.store.UpdateStep(ctx, step.ID, StepStatusCompleted, output, nil); err != nil {
238✔
1343
                return fmt.Errorf("update step: %w", err)
×
1344
        }
×
1345

1346
        _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventStepCompleted, map[string]any{
238✔
1347
                KeyStepName: step.StepName,
238✔
1348
        })
238✔
1349

238✔
1350
        // Check if this is a terminal step in a fork branch
238✔
1351
        def, err := engine.store.GetWorkflowDefinition(ctx, instance.WorkflowID)
238✔
1352
        if err == nil {
476✔
1353
                // First, check if we're in a Condition branch and need to replace virtual step
238✔
1354
                conditionStepName := engine.findConditionStepInBranch(stepDef, def)
238✔
1355
                if conditionStepName != "" {
269✔
1356
                        // Find Join step for this fork branch
31✔
1357
                        joinStepName, err := engine.findJoinStepForForkBranch(ctx, instance.ID, step.StepName, def)
31✔
1358
                        if err == nil && joinStepName != "" {
49✔
1359
                                virtualStep := fmt.Sprintf("cond#%s", conditionStepName)
18✔
1360
                                // Replace virtual step with real terminal step
18✔
1361
                                if err := engine.store.ReplaceInJoinWaitFor(ctx, instance.ID, joinStepName, virtualStep, step.StepName); err != nil {
18✔
1362
                                        _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventStepCompleted, map[string]any{
×
1363
                                                KeyStepName: step.StepName,
×
1364
                                                KeyError:    fmt.Sprintf("Failed to replace virtual step in join waitFor: %v", err),
×
1365
                                        })
×
1366
                                } else {
18✔
1367
                                        _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventStepCompleted, map[string]any{
18✔
1368
                                                KeyStepName: step.StepName,
18✔
1369
                                                KeyMessage:  fmt.Sprintf("Replaced virtual step %s with %s in join %s", virtualStep, step.StepName, joinStepName),
18✔
1370
                                        })
18✔
1371
                                        // After replacing virtual step with real step, notify Join about the completion
18✔
1372
                                        // This ensures Join is aware that the real step has completed
18✔
1373
                                        if err := engine.notifyJoinStepsForStep(ctx, instance.ID, joinStepName, step.StepName, true); err != nil {
18✔
1374
                                                _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventStepCompleted, map[string]any{
×
1375
                                                        KeyStepName: step.StepName,
×
1376
                                                        KeyError:    fmt.Sprintf("Failed to notify join after virtual step replacement: %v", err),
×
1377
                                                })
×
1378
                                        }
×
1379
                                }
1380
                        }
1381
                } else if engine.isTerminalStepInForkBranch(ctx, instance.ID, step.StepName, def) {
231✔
1382
                        // Not in Condition branch, use dynamic detection
24✔
1383
                        joinStepName, err := engine.findJoinStepForForkBranch(ctx, instance.ID, step.StepName, def)
24✔
1384
                        if err == nil && joinStepName != "" {
48✔
1385
                                // Check if this step is not already in the WaitFor list
24✔
1386
                                joinState, err := engine.store.GetJoinState(ctx, instance.ID, joinStepName)
24✔
1387
                                if err == nil && joinState != nil {
48✔
1388
                                        isAlreadyWaiting := false
24✔
1389
                                        isVirtual := false
24✔
1390
                                        for _, waitFor := range joinState.WaitingFor {
71✔
1391
                                                if waitFor == step.StepName {
65✔
1392
                                                        isAlreadyWaiting = true
18✔
1393
                                                        break
18✔
1394
                                                }
1395
                                                // Check if this is a virtual step (shouldn't happen here, but just in case)
1396
                                                if len(waitFor) > 5 && waitFor[:5] == "cond#" {
33✔
1397
                                                        isVirtual = true
4✔
1398
                                                }
4✔
1399
                                        }
1400
                                        if !isAlreadyWaiting && !isVirtual {
30✔
1401
                                                // Add this terminal step to the Join step's WaitFor list
6✔
1402
                                                if err := engine.store.AddToJoinWaitFor(ctx, instance.ID, joinStepName, step.StepName); err != nil {
6✔
1403
                                                        // Log error but don't fail the step
×
1404
                                                        _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventStepCompleted, map[string]any{
×
1405
                                                                KeyStepName: step.StepName,
×
1406
                                                                KeyError:    fmt.Sprintf("Failed to add step to join waitFor: %v", err),
×
1407
                                                        })
×
1408
                                                } else {
6✔
1409
                                                        _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventStepCompleted, map[string]any{
6✔
1410
                                                                KeyStepName: step.StepName,
6✔
1411
                                                                KeyMessage:  fmt.Sprintf("Added terminal step to join %s waitFor", joinStepName),
6✔
1412
                                                        })
6✔
1413
                                                }
6✔
1414
                                        }
1415
                                } else {
×
1416
                                        // JoinState doesn't exist yet, create it with this terminal step
×
1417
                                        joinStepDef, ok := def.Definition.Steps[joinStepName]
×
1418
                                        if ok {
×
1419
                                                strategy := joinStepDef.JoinStrategy
×
1420
                                                if strategy == "" {
×
1421
                                                        strategy = JoinStrategyAll
×
1422
                                                }
×
1423
                                                if err := engine.store.CreateJoinState(ctx, instance.ID, joinStepName, []string{step.StepName}, strategy); err != nil {
×
1424
                                                        _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventStepCompleted, map[string]any{
×
1425
                                                                KeyStepName: step.StepName,
×
1426
                                                                KeyError:    fmt.Sprintf("Failed to create join state: %v", err),
×
1427
                                                        })
×
1428
                                                }
×
1429
                                        }
1430
                                }
1431
                        }
1432
                }
1433
        }
1434

1435
        if err := engine.notifyJoinSteps(ctx, instance.ID, step.StepName, true); err != nil {
238✔
1436
                return fmt.Errorf("notify join steps: %w", err)
×
1437
        }
×
1438

1439
        if (next && len(stepDef.Next) == 0) || (!next && stepDef.Else == "") {
302✔
1440
                if !engine.hasUnfinishedSteps(ctx, instance.ID) {
96✔
1441
                        return engine.completeWorkflow(ctx, instance, output)
32✔
1442
                }
32✔
1443

1444
                return nil
32✔
1445
        }
1446

1447
        // Get workflow definition if we haven't already
1448
        if def == nil {
174✔
1449
                var err error
×
1450
                def, err = engine.store.GetWorkflowDefinition(ctx, instance.WorkflowID)
×
1451
                if err != nil {
×
1452
                        return fmt.Errorf("get workflow definition: %w", err)
×
1453
                }
×
1454
        }
1455

1456
        if next {
329✔
1457
                for _, nextStepName := range stepDef.Next {
310✔
1458
                        nextStepDef, ok := def.Definition.Steps[nextStepName]
155✔
1459
                        if !ok {
155✔
1460
                                return fmt.Errorf("next step definition not found: %s", nextStepName)
×
1461
                        }
×
1462

1463
                        if nextStepDef.Type == StepTypeJoin {
177✔
1464
                                continue
22✔
1465
                        }
1466

1467
                        if err := engine.enqueueNextSteps(ctx, instance.ID, []string{nextStepName}, output); err != nil {
133✔
1468
                                return err
×
1469
                        }
×
1470
                }
1471
        } else {
19✔
1472
                if err := engine.enqueueNextSteps(ctx, instance.ID, []string{stepDef.Else}, output); err != nil {
19✔
1473
                        return err
×
1474
                }
×
1475
        }
1476

1477
        return nil
174✔
1478
}
1479

1480
func (engine *Engine) handleStepFailure(
1481
        ctx context.Context,
1482
        instance *WorkflowInstance,
1483
        step *WorkflowStep,
1484
        stepDef *StepDefinition,
1485
        stepErr error,
1486
) error {
58✔
1487
        errMsg := stepErr.Error()
58✔
1488

58✔
1489
        if (step.RetryCount == 0 && stepDef.MaxRetries > 0) ||
58✔
1490
                (step.RetryCount > 0 && step.RetryCount < step.MaxRetries && !stepDef.NoIdempotent) {
83✔
1491

25✔
1492
                if err := engine.store.UpdateStep(ctx, step.ID, StepStatusFailed, nil, &errMsg); err != nil {
25✔
1493
                        return fmt.Errorf("update step: %w", err)
×
1494
                }
×
1495

1496
                _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventStepRetry, map[string]any{
25✔
1497
                        KeyStepName:   step.StepName,
25✔
1498
                        KeyRetryCount: step.RetryCount + 1,
25✔
1499
                        KeyError:      errMsg,
25✔
1500
                })
25✔
1501

25✔
1502
                return engine.store.EnqueueStep(ctx, instance.ID, &step.ID, PriorityHigh, stepDef.Delay)
25✔
1503
        }
1504

1505
        // If DLQ mode is enabled, pause instead of failing and skip rollback
1506
        if def, defErr := engine.store.GetWorkflowDefinition(ctx, instance.WorkflowID); defErr == nil && def.Definition.DLQEnabled {
37✔
1507
                // Mark step as paused with error
4✔
1508
                if err := engine.store.UpdateStep(ctx, step.ID, StepStatusPaused, nil, &errMsg); err != nil {
4✔
1509
                        return fmt.Errorf("update step (paused): %w", err)
×
1510
                }
×
1511

1512
                _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventStepFailed, map[string]any{
4✔
1513
                        KeyStepName: step.StepName,
4✔
1514
                        KeyError:    errMsg,
4✔
1515
                        KeyReason:   "dlq",
4✔
1516
                })
4✔
1517

4✔
1518
                // Notify join steps about failure in this branch
4✔
1519
                if err := engine.notifyJoinSteps(ctx, instance.ID, step.StepName, false); err != nil {
4✔
1520
                        return fmt.Errorf("notify join steps: %w", err)
×
1521
                }
×
1522

1523
                // Create DLQ record
1524
                reason := "dlq enabled: rollback/compensation skipped"
4✔
1525
                rec := &DeadLetterRecord{
4✔
1526
                        InstanceID: step.InstanceID,
4✔
1527
                        WorkflowID: def.ID,
4✔
1528
                        StepID:     step.ID,
4✔
1529
                        StepName:   step.StepName,
4✔
1530
                        StepType:   string(step.StepType),
4✔
1531
                        Input:      step.Input,
4✔
1532
                        Error:      &errMsg,
4✔
1533
                        Reason:     reason,
4✔
1534
                }
4✔
1535
                if err := engine.store.CreateDeadLetterRecord(ctx, rec); err != nil {
4✔
1536
                        return fmt.Errorf("create dead letter record: %w", err)
×
1537
                }
×
1538

1539
                // Freeze execution: pause active running steps and clear the instance queue
1540
                if err := engine.store.PauseActiveStepsAndClearQueue(ctx, instance.ID); err != nil {
4✔
1541
                        return fmt.Errorf("freeze instance for dlq: %w", err)
×
1542
                }
×
1543

1544
                // Set workflow instance to DLQ state
1545
                if err := engine.store.UpdateInstanceStatus(ctx, instance.ID, StatusDLQ, nil, &errMsg); err != nil {
4✔
1546
                        return fmt.Errorf("update instance status to dlq: %w", err)
×
1547
                }
×
1548

1549
                return nil
4✔
1550
        }
1551

1552
        if err := engine.store.UpdateStep(ctx, step.ID, StepStatusFailed, nil, &errMsg); err != nil {
29✔
1553
                return fmt.Errorf("update step: %w", err)
×
1554
        }
×
1555

1556
        _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventStepFailed, map[string]any{
29✔
1557
                KeyStepName: step.StepName,
29✔
1558
                KeyError:    errMsg,
29✔
1559
        })
29✔
1560

29✔
1561
        // Check if this is a terminal step in a fork branch with Condition
29✔
1562
        // If so, replace virtual step with real step before notifying Join
29✔
1563
        def, defErr := engine.store.GetWorkflowDefinition(ctx, instance.WorkflowID)
29✔
1564
        if defErr == nil {
58✔
1565
                // Check if we're in a Condition branch and need to replace virtual step
29✔
1566
                conditionStepName := engine.findConditionStepInBranch(stepDef, def)
29✔
1567
                if conditionStepName != "" {
39✔
1568
                        // Find Join step for this fork branch
10✔
1569
                        joinStepName, err := engine.findJoinStepForForkBranch(ctx, instance.ID, step.StepName, def)
10✔
1570
                        if err == nil && joinStepName != "" {
18✔
1571
                                virtualStep := fmt.Sprintf("cond#%s", conditionStepName)
8✔
1572
                                // Replace virtual step with real terminal step (even though it failed)
8✔
1573
                                if err := engine.store.ReplaceInJoinWaitFor(ctx, instance.ID, joinStepName, virtualStep, step.StepName); err != nil {
8✔
1574
                                        _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventStepFailed, map[string]any{
×
1575
                                                KeyStepName: step.StepName,
×
1576
                                                KeyError:    fmt.Sprintf("Failed to replace virtual step in join waitFor: %v", err),
×
1577
                                        })
×
1578
                                } else {
8✔
1579
                                        _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventStepFailed, map[string]any{
8✔
1580
                                                KeyStepName: step.StepName,
8✔
1581
                                                KeyMessage:  fmt.Sprintf("Replaced virtual step %s with %s in join %s (failed)", virtualStep, step.StepName, joinStepName),
8✔
1582
                                        })
8✔
1583
                                        // After replacing virtual step with real step, notify Join about the failure
8✔
1584
                                        // This ensures Join is aware that the real step has failed
8✔
1585
                                        if err := engine.notifyJoinStepsForStep(ctx, instance.ID, joinStepName, step.StepName, false); err != nil {
8✔
1586
                                                _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventStepFailed, map[string]any{
×
1587
                                                        KeyStepName: step.StepName,
×
1588
                                                        KeyError:    fmt.Sprintf("Failed to notify join after virtual step replacement: %v", err),
×
1589
                                                })
×
1590
                                        }
×
1591
                                }
1592
                        }
1593
                }
1594
        }
1595

1596
        if err := engine.notifyJoinSteps(ctx, instance.ID, step.StepName, false); err != nil {
29✔
1597
                return fmt.Errorf("notify join steps: %w", err)
×
1598
        }
×
1599

1600
        // PREVENTIVE FIX: Stop parallel branches before rollback
1601
        // This is the first line of defense - try to prevent parallel steps from completing
1602
        // However, this doesn't guarantee success (step might already be executing in worker)
1603
        if def == nil {
29✔
1604
                var defErr error
×
1605
                def, defErr = engine.store.GetWorkflowDefinition(ctx, instance.WorkflowID)
×
NEW
1606
                if defErr != nil {
×
NEW
1607
                        def = nil
×
NEW
1608
                }
×
1609
        }
1610

1611
        if def != nil {
58✔
1612
                // Check if this step is part of a fork branch
29✔
1613
                if engine.isStepInForkBranch(ctx, instance.ID, step.StepName, def) {
30✔
1614
                        // Stop all active steps in the same fork branch (parallel siblings)
1✔
1615
                        if err := engine.stopParallelBranchesInFork(ctx, instance.ID, step.StepName, def); err != nil {
1✔
NEW
1616
                                slog.Warn("[floxy] failed to stop parallel branches", "error", err)
×
UNCOV
1617
                        }
×
1618
                }
1619
        }
1620

1621
        // Try to rollback to save point before handling failure
1622
        if def != nil {
58✔
1623
                if rollbackErr := engine.rollbackToSavePointOrRoot(ctx, instance.ID, step, def); rollbackErr != nil {
29✔
1624
                        // Log rollback error but continue with failure handling
×
1625
                        _ = engine.store.LogEvent(ctx, instance.ID, &step.ID, EventStepFailed, map[string]any{
×
1626
                                KeyStepName: step.StepName,
×
1627
                                KeyError:    fmt.Sprintf("rollback failed: %v", rollbackErr),
×
1628
                        })
×
1629
                }
×
1630
        }
1631

1632
        if err := engine.store.UpdateInstanceStatus(ctx, instance.ID, StatusFailed, nil, &errMsg); err != nil {
29✔
1633
                return fmt.Errorf("update instance status: %w", err)
×
1634
        }
×
1635

1636
        // PLUGIN HOOK: OnWorkflowFailed
1637
        if engine.pluginManager != nil {
29✔
1638
                finalInstance, _ := engine.store.GetInstance(ctx, instance.ID)
×
1639
                if finalInstance != nil {
×
1640
                        if errPlugin := engine.pluginManager.ExecuteWorkflowFailed(ctx, finalInstance); errPlugin != nil {
×
1641
                                slog.Warn("[floxy] plugin hook OnWorkflowFailed failed", "error", errPlugin)
×
1642
                        }
×
1643
                }
1644
        }
1645

1646
        return nil
29✔
1647
}
1648

1649
// notifyJoinStepsForStep notifies a specific Join step about a specific step completion.
1650
// This is used after replacing virtual steps to ensure Join is aware of real step completion.
1651
func (engine *Engine) notifyJoinStepsForStep(
1652
        ctx context.Context,
1653
        instanceID int64,
1654
        joinStepName, completedStepName string,
1655
        success bool,
1656
) error {
28✔
1657
        instance, err := engine.store.GetInstance(ctx, instanceID)
28✔
1658
        if err != nil {
28✔
1659
                return err
×
1660
        }
×
1661

1662
        steps, err := engine.store.GetStepsByInstance(ctx, instanceID)
28✔
1663
        if err != nil {
28✔
1664
                return err
×
1665
        }
×
1666

1667
        // Get Join step definition to check strategy
1668
        def, err := engine.store.GetWorkflowDefinition(ctx, instance.WorkflowID)
28✔
1669
        if err != nil {
28✔
1670
                return err
×
1671
        }
×
1672

1673
        stepDef, ok := def.Definition.Steps[joinStepName]
28✔
1674
        if !ok || stepDef.Type != StepTypeJoin {
28✔
1675
                return fmt.Errorf("join step %s not found or not a join step", joinStepName)
×
1676
        }
×
1677

1678
        // Update join state for this specific step
1679
        isReady, err := engine.store.UpdateJoinState(ctx, instanceID, joinStepName, completedStepName, success)
28✔
1680
        if err != nil {
28✔
1681
                return fmt.Errorf("update join state for %s: %w", joinStepName, err)
×
1682
        }
×
1683

1684
        // Additional check: don't consider join ready if there are still pending/running steps
1685
        if isReady {
45✔
1686
                hasPendingSteps := engine.hasPendingStepsInParallelBranches(ctx, instanceID, stepDef, steps)
17✔
1687
                if hasPendingSteps {
17✔
1688
                        isReady = false
×
1689
                        _, _ = engine.store.UpdateJoinState(ctx, instanceID, joinStepName, completedStepName, success)
×
1690
                }
×
1691
        }
1692

1693
        _ = engine.store.LogEvent(ctx, instanceID, nil, EventJoinUpdated, map[string]any{
28✔
1694
                KeyJoinStep:      joinStepName,
28✔
1695
                KeyCompletedStep: completedStepName,
28✔
1696
                KeySuccess:       success,
28✔
1697
                KeyIsReady:       isReady,
28✔
1698
        })
28✔
1699

28✔
1700
        if isReady {
45✔
1701
                joinStepExists := false
17✔
1702
                for _, s := range steps {
157✔
1703
                        if s.StepName == joinStepName {
143✔
1704
                                joinStepExists = true
3✔
1705

3✔
1706
                                break
3✔
1707
                        }
1708
                }
1709

1710
                if !joinStepExists {
31✔
1711
                        var joinInput json.RawMessage
14✔
1712
                        for _, s := range steps {
111✔
1713
                                if s.StepName == completedStepName {
111✔
1714
                                        joinInput = s.Input
14✔
1715

14✔
1716
                                        break
14✔
1717
                                }
1718
                        }
1719

1720
                        joinStep := &WorkflowStep{
14✔
1721
                                InstanceID: instanceID,
14✔
1722
                                StepName:   joinStepName,
14✔
1723
                                StepType:   StepTypeJoin,
14✔
1724
                                Status:     StepStatusPending,
14✔
1725
                                Input:      joinInput,
14✔
1726
                                MaxRetries: 0,
14✔
1727
                        }
14✔
1728

14✔
1729
                        if instance.Status == StatusDLQ {
15✔
1730
                                joinStep.Status = StepStatusPaused
1✔
1731
                                if err := engine.store.CreateStep(ctx, joinStep); err != nil {
1✔
1732
                                        return fmt.Errorf("create join step: %w", err)
×
1733
                                }
×
1734
                                _ = engine.store.LogEvent(ctx, instanceID, &joinStep.ID, EventJoinReady, map[string]any{
1✔
1735
                                        KeyJoinStep: joinStepName,
1✔
1736
                                })
1✔
1737
                        } else {
13✔
1738
                                if err := engine.store.CreateStep(ctx, joinStep); err != nil {
13✔
1739
                                        return fmt.Errorf("create join step: %w", err)
×
1740
                                }
×
1741
                                if err := engine.store.EnqueueStep(ctx, instanceID, &joinStep.ID, PriorityNormal, 0); err != nil {
13✔
1742
                                        return fmt.Errorf("enqueue join step: %w", err)
×
1743
                                }
×
1744
                                _ = engine.store.LogEvent(ctx, instanceID, &joinStep.ID, EventJoinReady, map[string]any{
13✔
1745
                                        KeyJoinStep: joinStepName,
13✔
1746
                                })
13✔
1747
                        }
1748
                }
1749
        }
1750

1751
        return nil
28✔
1752
}
1753

1754
func (engine *Engine) notifyJoinSteps(
1755
        ctx context.Context,
1756
        instanceID int64,
1757
        completedStepName string,
1758
        success bool,
1759
) error {
273✔
1760
        instance, err := engine.store.GetInstance(ctx, instanceID)
273✔
1761
        if err != nil {
273✔
1762
                return err
×
1763
        }
×
1764

1765
        def, err := engine.store.GetWorkflowDefinition(ctx, instance.WorkflowID)
273✔
1766
        if err != nil {
273✔
1767
                return err
×
1768
        }
×
1769

1770
        steps, err := engine.store.GetStepsByInstance(ctx, instanceID)
273✔
1771
        if err != nil {
273✔
1772
                return err
×
1773
        }
×
1774

1775
        for stepName, stepDef := range def.Definition.Steps {
2,772✔
1776
                if stepDef.Type != StepTypeJoin {
4,813✔
1777
                        continue
2,314✔
1778
                }
1779

1780
                // Check if Join is waiting for this step by checking JoinState (which has actual waitFor after replacements)
1781
                // Also check stepDef.WaitFor for backwards compatibility and for virtual steps that haven't been replaced yet
1782
                waitingForThis := false
185✔
1783

185✔
1784
                // First check JoinState (actual state after virtual step replacements)
185✔
1785
                joinState, err := engine.store.GetJoinState(ctx, instanceID, stepName)
185✔
1786
                if err == nil && joinState != nil {
338✔
1787
                        for _, waitFor := range joinState.WaitingFor {
481✔
1788
                                if waitFor == completedStepName {
380✔
1789
                                        waitingForThis = true
52✔
1790

52✔
1791
                                        break
52✔
1792
                                }
1793
                        }
1794
                }
1795

1796
                // Also check stepDef.WaitFor for virtual steps and initial setup
1797
                if !waitingForThis {
318✔
1798
                        for _, waitFor := range stepDef.WaitFor {
414✔
1799
                                if waitFor == completedStepName {
283✔
1800
                                        waitingForThis = true
2✔
1801

2✔
1802
                                        break
2✔
1803
                                }
1804
                        }
1805
                }
1806

1807
                if !waitingForThis {
316✔
1808
                        continue
131✔
1809
                }
1810

1811
                isReady, err := engine.store.UpdateJoinState(ctx, instanceID, stepName, completedStepName, success)
54✔
1812
                if err != nil {
54✔
1813
                        return fmt.Errorf("update join state for %s: %w", stepName, err)
×
1814
                }
×
1815

1816
                // Additional check: don't consider join ready if there are still pending/running steps
1817
                // in parallel branches that could affect the join result
1818
                if isReady {
84✔
1819
                        hasPendingSteps := engine.hasPendingStepsInParallelBranches(ctx, instanceID, stepDef, steps)
30✔
1820
                        if hasPendingSteps {
31✔
1821
                                isReady = false
1✔
1822
                                // Update the join state to reflect that it's not ready
1✔
1823
                                _, _ = engine.store.UpdateJoinState(ctx, instanceID, stepName, completedStepName, success)
1✔
1824
                        }
1✔
1825
                }
1826

1827
                _ = engine.store.LogEvent(ctx, instanceID, nil, EventJoinUpdated, map[string]any{
54✔
1828
                        KeyJoinStep:      stepName,
54✔
1829
                        KeyCompletedStep: completedStepName,
54✔
1830
                        KeySuccess:       success,
54✔
1831
                        KeyIsReady:       isReady,
54✔
1832
                })
54✔
1833

54✔
1834
                if isReady {
83✔
1835
                        joinStepExists := false
29✔
1836
                        for _, s := range steps {
270✔
1837
                                if s.StepName == stepName {
262✔
1838
                                        joinStepExists = true
21✔
1839

21✔
1840
                                        break
21✔
1841
                                }
1842
                        }
1843

1844
                        if !joinStepExists {
37✔
1845
                                var joinInput json.RawMessage
8✔
1846
                                for _, s := range steps {
49✔
1847
                                        if s.StepName == completedStepName {
49✔
1848
                                                joinInput = s.Input
8✔
1849

8✔
1850
                                                break
8✔
1851
                                        }
1852
                                }
1853

1854
                                joinStep := &WorkflowStep{
8✔
1855
                                        InstanceID: instanceID,
8✔
1856
                                        StepName:   stepName,
8✔
1857
                                        StepType:   StepTypeJoin,
8✔
1858
                                        Status:     StepStatusPending,
8✔
1859
                                        Input:      joinInput,
8✔
1860
                                        MaxRetries: 0,
8✔
1861
                                }
8✔
1862

8✔
1863
                                // If the instance is in DLQ, create the join step as paused and do not enqueue it
8✔
1864
                                if instance.Status == StatusDLQ {
8✔
1865
                                        joinStep.Status = StepStatusPaused
×
1866
                                        if err := engine.store.CreateStep(ctx, joinStep); err != nil {
×
1867
                                                return fmt.Errorf("create join step: %w", err)
×
1868
                                        }
×
1869
                                        _ = engine.store.LogEvent(ctx, instanceID, &joinStep.ID, EventJoinReady, map[string]any{
×
1870
                                                KeyJoinStep: stepName,
×
1871
                                        })
×
1872
                                } else {
8✔
1873
                                        if err := engine.store.CreateStep(ctx, joinStep); err != nil {
8✔
1874
                                                return fmt.Errorf("create join step: %w", err)
×
1875
                                        }
×
1876
                                        if err := engine.store.EnqueueStep(ctx, instanceID, &joinStep.ID, PriorityNormal, 0); err != nil {
8✔
1877
                                                return fmt.Errorf("enqueue join step: %w", err)
×
1878
                                        }
×
1879
                                        _ = engine.store.LogEvent(ctx, instanceID, &joinStep.ID, EventJoinReady, map[string]any{
8✔
1880
                                                KeyJoinStep: stepName,
8✔
1881
                                        })
8✔
1882
                                }
1883
                        }
1884
                }
1885
        }
1886

1887
        return nil
273✔
1888
}
1889

1890
func (engine *Engine) hasUnfinishedSteps(ctx context.Context, instanceID int64) bool {
77✔
1891
        steps, err := engine.store.GetStepsByInstance(ctx, instanceID)
77✔
1892
        if err != nil {
77✔
1893
                return false
×
1894
        }
×
1895

1896
        for _, step := range steps {
514✔
1897
                if step.Status == StepStatusPending || step.Status == StepStatusRunning || step.Status == StepStatusPaused {
473✔
1898
                        return true
36✔
1899
                }
36✔
1900
        }
1901

1902
        return false
41✔
1903
}
1904

1905
func (engine *Engine) hasFailedOrRolledBackSteps(ctx context.Context, instanceID int64) bool {
38✔
1906
        steps, err := engine.store.GetStepsByInstance(ctx, instanceID)
38✔
1907
        if err != nil {
38✔
1908
                return false
×
1909
        }
×
1910

1911
        for _, step := range steps {
193✔
1912
                if step.Status == StepStatusFailed || step.Status == StepStatusRolledBack {
161✔
1913
                        return true
6✔
1914
                }
6✔
1915
        }
1916

1917
        return false
32✔
1918
}
1919

1920
func (engine *Engine) hasStepsInCompensation(ctx context.Context, instanceID int64) bool {
10✔
1921
        steps, err := engine.store.GetStepsByInstance(ctx, instanceID)
10✔
1922
        if err != nil {
10✔
1923
                return false
×
1924
        }
×
1925

1926
        for _, step := range steps {
34✔
1927
                if step.Status == StepStatusCompensation {
28✔
1928
                        return true
4✔
1929
                }
4✔
1930
        }
1931

1932
        return false
6✔
1933
}
1934

1935
// enqueueCompletedStepsForRollback finds completed steps after savepoint and enqueues them for compensation.
1936
// This follows the saga pattern: rollback happens step-by-step through the queue, not in one transaction.
1937
func (engine *Engine) enqueueCompletedStepsForRollback(ctx context.Context, instanceID int64) error {
1✔
1938
        steps, err := engine.store.GetStepsByInstance(ctx, instanceID)
1✔
1939
        if err != nil {
1✔
1940
                return fmt.Errorf("get steps by instance: %w", err)
×
1941
        }
×
1942

1943
        // Find the last savepoint by created_at
1944
        var lastSavePoint *WorkflowStep
1✔
1945
        for i := range steps {
5✔
1946
                step := &steps[i]
4✔
1947
                if step.StepType == StepTypeSavePoint {
4✔
1948
                        if lastSavePoint == nil || step.CreatedAt.After(lastSavePoint.CreatedAt) {
×
1949
                                lastSavePoint = step
×
1950
                        }
×
1951
                }
1952
        }
1953

1954
        // Get workflow definition
1955
        instance, err := engine.store.GetInstance(ctx, instanceID)
1✔
1956
        if err != nil {
1✔
1957
                return fmt.Errorf("get instance: %w", err)
×
1958
        }
×
1959

1960
        def, err := engine.store.GetWorkflowDefinition(ctx, instance.WorkflowID)
1✔
1961
        if err != nil {
1✔
1962
                return fmt.Errorf("get workflow definition: %w", err)
×
1963
        }
×
1964

1965
        // Find all completed steps created after the last savepoint
1966
        var stepsToRollback []*WorkflowStep
1✔
1967
        for i := range steps {
5✔
1968
                step := &steps[i]
4✔
1969
                // Only process completed steps that need rollback
4✔
1970
                if step.Status != StepStatusCompleted {
7✔
1971
                        continue
3✔
1972
                }
1973
                // If there's a savepoint, only rollback steps created after it
1974
                if lastSavePoint != nil && step.CreatedAt.After(lastSavePoint.CreatedAt) {
1✔
1975
                        // Check if step has compensation handler defined
×
1976
                        stepDef, ok := def.Definition.Steps[step.StepName]
×
1977
                        if ok && stepDef.OnFailure != "" {
×
1978
                                stepsToRollback = append(stepsToRollback, step)
×
1979
                        } else {
×
1980
                                // No compensation handler, just mark as rolled_back
×
1981
                                if err := engine.store.UpdateStep(ctx, step.ID, StepStatusRolledBack, step.Input, nil); err != nil {
×
1982
                                        slog.Warn("[floxy] failed to mark step as rolled_back", "step_id", step.ID, "error", err)
×
1983
                                }
×
1984
                                _ = engine.store.LogEvent(ctx, instanceID, &step.ID, EventStepCompleted, map[string]any{
×
1985
                                        KeyStepName: step.StepName,
×
1986
                                        KeyReason:   "rolled_back_no_compensation",
×
1987
                                })
×
1988
                        }
1989
                } else if lastSavePoint == nil {
2✔
1990
                        // If no savepoint, rollback all completed steps
1✔
1991
                        stepDef, ok := def.Definition.Steps[step.StepName]
1✔
1992
                        if ok && stepDef.OnFailure != "" {
2✔
1993
                                stepsToRollback = append(stepsToRollback, step)
1✔
1994
                        } else {
1✔
1995
                                if err := engine.store.UpdateStep(ctx, step.ID, StepStatusRolledBack, step.Input, nil); err != nil {
×
1996
                                        slog.Warn("[floxy] failed to mark step as rolled_back", "step_id", step.ID, "error", err)
×
1997
                                }
×
1998
                                _ = engine.store.LogEvent(ctx, instanceID, &step.ID, EventStepCompleted, map[string]any{
×
1999
                                        KeyStepName: step.StepName,
×
2000
                                        KeyReason:   "rolled_back_no_compensation",
×
2001
                                })
×
2002
                        }
2003
                }
2004
        }
2005

2006
        sort.Slice(stepsToRollback, func(i, j int) bool {
1✔
2007
                return stepsToRollback[i].CreatedAt.After(stepsToRollback[j].CreatedAt)
×
2008
        })
×
2009

2010
        // Mark each step as requiring compensation and enqueue for processing
2011
        for idx, step := range stepsToRollback {
2✔
2012
                // Update step status to compensation with retry count = 0
1✔
2013
                if err := engine.store.UpdateStepCompensationRetry(ctx, step.ID, 1, StepStatusCompensation); err != nil {
1✔
2014
                        slog.Warn("[floxy] failed to mark step for compensation", "step_id", step.ID, "error", err)
×
2015
                        continue
×
2016
                }
2017

2018
                // Enqueue step for compensation processing
2019
                delay := time.Millisecond * time.Duration(idx*10)
1✔
2020
                if err := engine.store.EnqueueStep(ctx, instanceID, &step.ID, PriorityHigh, delay); err != nil {
1✔
2021
                        slog.Warn("[floxy] failed to enqueue step for compensation", "step_id", step.ID, "error", err)
×
2022
                        continue
×
2023
                }
2024

2025
                _ = engine.store.LogEvent(ctx, instanceID, &step.ID, EventStepStarted, map[string]any{
1✔
2026
                        KeyStepName: step.StepName,
1✔
2027
                        KeyReason:   "enqueued_for_rollback_after_failure",
1✔
2028
                })
1✔
2029
        }
2030

2031
        return nil
1✔
2032
}
2033

2034
func (engine *Engine) enqueueNextSteps(
2035
        ctx context.Context,
2036
        instanceID int64,
2037
        nextSteps []string,
2038
        input json.RawMessage,
2039
) error {
207✔
2040
        instance, err := engine.store.GetInstance(ctx, instanceID)
207✔
2041
        if err != nil {
207✔
2042
                return fmt.Errorf("get instance: %w", err)
×
2043
        }
×
2044

2045
        // Do not enqueue new steps while the instance is in DLQ state
2046
        if instance.Status == StatusDLQ {
207✔
2047
                return nil
×
2048
        }
×
2049

2050
        def, err := engine.store.GetWorkflowDefinition(ctx, instance.WorkflowID)
207✔
2051
        if err != nil {
207✔
2052
                return fmt.Errorf("get workflow definition: %w", err)
×
2053
        }
×
2054

2055
        for _, nextStepName := range nextSteps {
414✔
2056
                stepDef, ok := def.Definition.Steps[nextStepName]
207✔
2057
                if !ok {
207✔
2058
                        return fmt.Errorf("step definition not found: %s", nextStepName)
×
2059
                }
×
2060

2061
                step := &WorkflowStep{
207✔
2062
                        InstanceID: instanceID,
207✔
2063
                        StepName:   nextStepName,
207✔
2064
                        StepType:   stepDef.Type,
207✔
2065
                        Status:     StepStatusPending,
207✔
2066
                        Input:      input,
207✔
2067
                        MaxRetries: stepDef.MaxRetries,
207✔
2068
                }
207✔
2069

207✔
2070
                if err := engine.store.CreateStep(ctx, step); err != nil {
207✔
2071
                        return fmt.Errorf("create step: %w", err)
×
2072
                }
×
2073

2074
                if err := engine.store.EnqueueStep(ctx, instanceID, &step.ID, PriorityNormal, stepDef.Delay); err != nil {
207✔
2075
                        return fmt.Errorf("enqueue step: %w", err)
×
2076
                }
×
2077
        }
2078

2079
        return nil
207✔
2080
}
2081

2082
func (engine *Engine) createFirstStep(ctx context.Context, instance *WorkflowInstance) (*WorkflowStep, error) {
×
2083
        def, err := engine.store.GetWorkflowDefinition(ctx, instance.WorkflowID)
×
2084
        if err != nil {
×
2085
                return nil, err
×
2086
        }
×
2087

2088
        startStepDef, ok := def.Definition.Steps[def.Definition.Start]
×
2089
        if !ok {
×
2090
                return nil, fmt.Errorf("start step definition not found: %s", def.Definition.Start)
×
2091
        }
×
2092

2093
        step := &WorkflowStep{
×
2094
                InstanceID: instance.ID,
×
2095
                StepName:   def.Definition.Start,
×
2096
                StepType:   startStepDef.Type,
×
2097
                Status:     StepStatusPending,
×
2098
                Input:      instance.Input,
×
2099
                MaxRetries: startStepDef.MaxRetries,
×
2100
        }
×
2101

×
2102
        if err := engine.store.CreateStep(ctx, step); err != nil {
×
2103
                return nil, err
×
2104
        }
×
2105

2106
        return step, nil
×
2107
}
2108

2109
func (engine *Engine) completeWorkflow(ctx context.Context, instance *WorkflowInstance, output json.RawMessage) error {
32✔
2110
        // Check if there are any failed or rolled_back steps
32✔
2111
        // If so, the workflow should be marked as failed, not completed
32✔
2112
        if engine.hasFailedOrRolledBackSteps(ctx, instance.ID) {
33✔
2113
                // REACTIVE FIX: Enqueue completed steps after savepoint for compensation
1✔
2114
                // This is the second line of defense - handles steps that completed despite
1✔
2115
                // preventive measures (stopParallelBranchesInFork).
1✔
2116
                // Race condition scenario: worker already picked up step from queue before we marked it as skipped.
1✔
2117
                // DEFENSE IN DEPTH: Preventive (stop pending/running) + Reactive (rollback completed)
1✔
2118
                if err := engine.enqueueCompletedStepsForRollback(ctx, instance.ID); err != nil {
1✔
2119
                        slog.Warn("[floxy] failed to enqueue completed steps for rollback", "error", err)
×
2120
                        // Continue with marking workflow as failed even if enqueue fails
×
2121
                }
×
2122

2123
                // Check if there are any steps enqueued for compensation
2124
                // If yes, don't mark workflow as failed yet - let compensation finish first
2125
                hasCompensationSteps := engine.hasStepsInCompensation(ctx, instance.ID)
1✔
2126
                if hasCompensationSteps {
2✔
2127
                        // Compensation is in progress, don't mark as failed yet
1✔
2128
                        _ = engine.store.LogEvent(ctx, instance.ID, nil, EventWorkflowFailed, map[string]any{
1✔
2129
                                KeyWorkflowID: instance.WorkflowID,
1✔
2130
                                KeyReason:     "compensation_in_progress",
1✔
2131
                        })
1✔
2132
                        return nil
1✔
2133
                }
1✔
2134

2135
                errMsg := "workflow has failed or rolled back steps"
×
2136
                if err := engine.store.UpdateInstanceStatus(ctx, instance.ID, StatusFailed, nil, &errMsg); err != nil {
×
2137
                        return fmt.Errorf("update instance status to failed: %w", err)
×
2138
                }
×
2139

2140
                _ = engine.store.LogEvent(ctx, instance.ID, nil, EventWorkflowFailed, map[string]any{
×
2141
                        KeyWorkflowID: instance.WorkflowID,
×
2142
                        KeyReason:     errMsg,
×
2143
                })
×
2144

×
2145
                // PLUGIN HOOK: OnWorkflowFailed
×
2146
                if engine.pluginManager != nil {
×
2147
                        finalInstance, _ := engine.store.GetInstance(ctx, instance.ID)
×
2148
                        if finalInstance != nil {
×
2149
                                if errPlugin := engine.pluginManager.ExecuteWorkflowFailed(ctx, finalInstance); errPlugin != nil {
×
2150
                                        slog.Warn("[floxy] plugin hook OnWorkflowFailed failed", "error", errPlugin)
×
2151
                                }
×
2152
                        }
2153
                }
2154

2155
                return nil
×
2156
        }
2157

2158
        if err := engine.store.UpdateInstanceStatus(ctx, instance.ID, StatusCompleted, output, nil); err != nil {
31✔
2159
                return fmt.Errorf("update instance status: %w", err)
×
2160
        }
×
2161

2162
        _ = engine.store.LogEvent(ctx, instance.ID, nil, EventWorkflowCompleted, map[string]any{
31✔
2163
                KeyWorkflowID: instance.WorkflowID,
31✔
2164
        })
31✔
2165

31✔
2166
        // PLUGIN HOOK: OnWorkflowComplete
31✔
2167
        if engine.pluginManager != nil {
31✔
2168
                // Reload instance to get the final state
×
2169
                finalInstance, _ := engine.store.GetInstance(ctx, instance.ID)
×
2170
                if finalInstance != nil {
×
2171
                        if errPlugin := engine.pluginManager.ExecuteWorkflowComplete(ctx, finalInstance); errPlugin != nil {
×
2172
                                slog.Warn("[floxy] plugin hook OnWorkflowComplete failed", "error", errPlugin)
×
2173
                        }
×
2174
                }
2175
        }
2176

2177
        return nil
31✔
2178
}
2179

2180
func (engine *Engine) GetStatus(ctx context.Context, instanceID int64) (WorkflowStatus, error) {
40✔
2181
        instance, err := engine.store.GetInstance(ctx, instanceID)
40✔
2182
        if err != nil {
40✔
2183
                return "", fmt.Errorf("get instance: %w", err)
×
2184
        }
×
2185

2186
        return instance.Status, nil
40✔
2187
}
2188

2189
func (engine *Engine) GetSteps(ctx context.Context, instanceID int64) ([]WorkflowStep, error) {
47✔
2190
        return engine.store.GetStepsByInstance(ctx, instanceID)
47✔
2191
}
47✔
2192

2193
func (engine *Engine) HumanDecisionWaitingEvents() <-chan HumanDecisionWaitingEvent {
×
2194
        engine.humanDecisionWaitingOnce.Do(func() {
×
2195
                engine.humanDecisionWaitingEvents = make(chan HumanDecisionWaitingEvent)
×
2196
        })
×
2197

2198
        return engine.humanDecisionWaitingEvents
×
2199
}
2200

2201
func (engine *Engine) validateDefinition(def *WorkflowDefinition) error {
69✔
2202
        if def.Name == "" {
71✔
2203
                return fmt.Errorf("workflow name is required")
2✔
2204
        }
2✔
2205

2206
        if def.Definition.Start == "" {
69✔
2207
                return fmt.Errorf("start step is required")
2✔
2208
        }
2✔
2209

2210
        if len(def.Definition.Steps) == 0 {
67✔
2211
                return fmt.Errorf("at least one step is required")
2✔
2212
        }
2✔
2213

2214
        if _, ok := def.Definition.Steps[def.Definition.Start]; !ok {
65✔
2215
                return fmt.Errorf("start step not found: %s", def.Definition.Start)
2✔
2216
        }
2✔
2217

2218
        for stepName, stepDef := range def.Definition.Steps {
412✔
2219
                for _, nextStep := range stepDef.Next {
537✔
2220
                        if _, ok := def.Definition.Steps[nextStep]; !ok {
188✔
2221
                                return fmt.Errorf("step %s references unknown step: %s", stepName, nextStep)
2✔
2222
                        }
2✔
2223
                }
2224

2225
                if stepDef.OnFailure != "" {
372✔
2226
                        if _, ok := def.Definition.Steps[stepDef.OnFailure]; !ok {
25✔
2227
                                return fmt.Errorf("step %s references unknown compensation step: %s",
2✔
2228
                                        stepName, stepDef.OnFailure)
2✔
2229
                        }
2✔
2230
                }
2231

2232
                for _, parallelStep := range stepDef.Parallel {
394✔
2233
                        if _, ok := def.Definition.Steps[parallelStep]; !ok {
49✔
2234
                                return fmt.Errorf("step %s references unknown parallel step: %s", stepName, parallelStep)
2✔
2235
                        }
2✔
2236
                }
2237
        }
2238

2239
        return nil
55✔
2240
}
2241

2242
func (engine *Engine) rollbackToSavePointOrRoot(
2243
        ctx context.Context,
2244
        instanceID int64,
2245
        failedStep *WorkflowStep,
2246
        def *WorkflowDefinition,
2247
) error {
29✔
2248
        savePointName := engine.findNearestSavePoint(failedStep.StepName, def) // nearest save point or empty string (root)
29✔
2249

29✔
2250
        return engine.rollbackStepsToSavePoint(ctx, instanceID, failedStep, savePointName, def)
29✔
2251
}
29✔
2252

2253
// findNearestSavePoint walks backwards from stepName along the Prev links of
// the workflow definition and returns the name of the first step on that path
// whose definition has type StepTypeSavePoint, starting with stepName itself.
//
// The walk stops and rootStepName is returned when:
//   - a step name is not defined;
//   - a name repeats (cycle guard);
//   - the next name to visit is empty.
//
// NOTE(review): when the current step has an empty Prev, the inner loop
// replaces stepDef with the first entry of def.Definition.Steps that has a
// non-empty Prev. This is questionable for two reasons:
//   - Go map iteration order is randomized, so the chosen step is
//     nondeterministic.
//   - If the chosen step is a savepoint, the function returns the *current*
//     stepName, not the savepoint's name; otherwise the walk continues from the
//     chosen step's Prev.
// Confirm the intent of this fallback before relying on it.
func (engine *Engine) findNearestSavePoint(stepName string, def *WorkflowDefinition) string {
	visited := make(map[string]bool)

	for stepName != "" {
		if visited[stepName] {
			break // Prevent infinite loops
		}
		visited[stepName] = true

		stepDef, ok := def.Definition.Steps[stepName]
		if !ok {
			break
		}
		if stepDef.Prev == "" {
			// Fallback over the Steps map: iteration order is random (see NOTE(review) above).
			for _, stepDefCurr := range def.Definition.Steps {
				if stepDefCurr.Prev != "" {
					stepDef = stepDefCurr

					break
				}
			}
		}

		if stepDef.Type == StepTypeSavePoint {
			return stepName
		}

		stepName = stepDef.Prev
	}

	return rootStepName
}
2285

2286
func (engine *Engine) rollbackStepsToSavePoint(
2287
        ctx context.Context,
2288
        instanceID int64,
2289
        failedStep *WorkflowStep,
2290
        savePointName string,
2291
        def *WorkflowDefinition,
2292
) error {
29✔
2293
        steps, err := engine.store.GetStepsByInstance(ctx, instanceID)
29✔
2294
        if err != nil {
29✔
2295
                return fmt.Errorf("get steps by instance: %w", err)
×
2296
        }
×
2297

2298
        stepMap := make(map[string]*WorkflowStep)
29✔
2299
        for _, step := range steps {
223✔
2300
                step := step
194✔
2301
                stepMap[step.StepName] = &step
194✔
2302
        }
194✔
2303
        if _, exists := stepMap[failedStep.StepName]; !exists {
29✔
2304
                stepMap[failedStep.StepName] = failedStep
×
2305
        }
×
2306

2307
        return engine.rollbackStepChain(ctx, instanceID, failedStep.StepName, savePointName, def, stepMap, false, 0)
29✔
2308
}
2309

2310
// rollbackStepChain recursively rolls back the chain of steps ending at
// currentStep, stopping when it reaches savePointName.
//
// Order of work for one call:
//  1. Run the OnRollbackStepChain plugin hook (a failure is only logged).
//  2. Return nil immediately if currentStep is the savepoint.
//  3. For fork/parallel steps, recurse into every branch in stepDef.Parallel
//     with isParallel=true.
//  4. Inside a parallel branch (isParallel), recurse forward first:
//     - for a completed condition step, only into the branch that
//       determineExecutedBranch reports as executed;
//     - for any other step, into every step in stepDef.Next.
//  5. Roll back currentStep itself via rollbackStep, if stepMap holds a
//     recorded step for it whose status is completed or failed.
//  6. Outside parallel branches, continue backwards to stepDef.Prev.
//
// Because forward recursion (steps 3-4) happens before step 5, later steps of a
// branch are rolled back before earlier ones.
//
// depth is passed to the plugin hook and incremented on every recursive call.
// An unknown step name is an error. Errors from nested calls are returned
// unchanged, and rollbackStep errors are wrapped with the step name.
func (engine *Engine) rollbackStepChain(
	ctx context.Context,
	instanceID int64,
	currentStep, savePointName string,
	def *WorkflowDefinition,
	stepMap map[string]*WorkflowStep,
	isParallel bool,
	depth int,
) error {
	// PLUGIN HOOK: OnRollbackStepChain
	if engine.pluginManager != nil {
		if err := engine.pluginManager.ExecuteRollbackStepChain(ctx, instanceID, currentStep, depth); err != nil {
			slog.Warn("[floxy] plugin hook OnRollbackStepChain failed", "error", err)
		}
	}

	if currentStep == savePointName {
		return nil // Reached save point
	}

	stepDef, ok := def.Definition.Steps[currentStep]
	if !ok {
		return fmt.Errorf("step definition not found: %s", currentStep)
	}

	// First, traverse to the end of the chain (depth-first)
	// Handle parallel steps (fork branches)
	if stepDef.Type == StepTypeFork || stepDef.Type == StepTypeParallel {
		for _, parallelStepName := range stepDef.Parallel {
			if err := engine.rollbackStepChain(ctx, instanceID, parallelStepName, savePointName, def, stepMap, true, depth+1); err != nil {
				return err
			}
		}
	}

	// For parallel branches, traverse all subsequent steps in the chain
	if isParallel {
		// For condition steps, we need to determine which branch was executed
		if stepDef.Type == StepTypeCondition {
			// Only a completed condition step has a taken branch to roll back
			if step, exists := stepMap[currentStep]; exists && step.Status == StepStatusCompleted {
				// The executed branch is the one whose first step is recorded as
				// completed, failed or in compensation
				executedBranch := engine.determineExecutedBranch(stepDef, stepMap)

				if executedBranch == "next" {
					// Rollback the Next branch
					for _, nextStepName := range stepDef.Next {
						if err := engine.rollbackStepChain(ctx, instanceID, nextStepName, savePointName, def, stepMap, true, depth+1); err != nil {
							return err
						}
					}
				} else if executedBranch == "else" && stepDef.Else != "" {
					// Rollback the Else branch
					if err := engine.rollbackStepChain(ctx, instanceID, stepDef.Else, savePointName, def, stepMap, true, depth+1); err != nil {
						return err
					}
				}
				// If no branch was executed, nothing downstream is rolled back;
				// the condition step itself is still handled below
			}
		} else {
			// For non-condition steps, traverse all next steps
			for _, nextStepName := range stepDef.Next {
				if err := engine.rollbackStepChain(ctx, instanceID, nextStepName, savePointName, def, stepMap, true, depth+1); err != nil {
					return err
				}
			}
		}
	}

	// Do the actual rollback BEFORE traversing to previous steps (reverse order)
	if step, exists := stepMap[currentStep]; exists &&
		(step.Status == StepStatusCompleted || step.Status == StepStatusFailed) {
		if err := engine.rollbackStep(ctx, step, def); err != nil {
			return fmt.Errorf("rollback step %s: %w", currentStep, err)
		}
	}

	// Continue with a previous step (traverse backwards); parallel branches
	// never walk Prev, so isParallel is always false here
	if stepDef.Prev != "" && !isParallel {
		if err := engine.rollbackStepChain(ctx, instanceID, stepDef.Prev, savePointName, def, stepMap, isParallel, depth+1); err != nil {
			return err
		}
	}

	return nil
}
2396

2397
func (engine *Engine) rollbackStep(ctx context.Context, step *WorkflowStep, def *WorkflowDefinition) error {
138✔
2398
        stepDef, ok := def.Definition.Steps[step.StepName]
138✔
2399
        if !ok {
138✔
2400
                return fmt.Errorf("step definition not found: %s", step.StepName)
×
2401
        }
×
2402

2403
        onFailureStep, ok := def.Definition.Steps[stepDef.OnFailure]
138✔
2404
        if !ok {
257✔
2405
                // No compensation handler, mark as rolled back directly
119✔
2406
                if err := engine.store.UpdateStep(ctx, step.ID, StepStatusRolledBack, step.Input, nil); err != nil {
119✔
2407
                        return fmt.Errorf("update step status: %w", err)
×
2408
                }
×
2409

2410
                return nil
119✔
2411
        }
2412

2413
        // Check if we need to retry compensation
2414
        if step.CompensationRetryCount >= onFailureStep.MaxRetries {
22✔
2415
                // Max compensation retries exceeded, mark as failed
3✔
2416
                if err := engine.store.UpdateStep(ctx, step.ID, StepStatusFailed, step.Input, nil); err != nil {
3✔
2417
                        return fmt.Errorf("update step status: %w", err)
×
2418
                }
×
2419
                reason := "compensation max retries exceeded"
3✔
2420
                _ = engine.store.LogEvent(ctx, step.InstanceID, &step.ID, EventStepFailed, map[string]any{
3✔
2421
                        KeyStepName: step.StepName,
3✔
2422
                        KeyStepType: step.StepType,
3✔
2423
                        KeyError:    reason,
3✔
2424
                })
3✔
2425

3✔
2426
                // Send to Dead Letter Queue
3✔
2427
                rec := &DeadLetterRecord{
3✔
2428
                        InstanceID: step.InstanceID,
3✔
2429
                        WorkflowID: def.ID,
3✔
2430
                        StepID:     step.ID,
3✔
2431
                        StepName:   step.StepName,
3✔
2432
                        StepType:   string(step.StepType),
3✔
2433
                        Input:      step.Input,
3✔
2434
                        Error:      step.Error,
3✔
2435
                        Reason:     reason,
3✔
2436
                }
3✔
2437
                if err := engine.store.CreateDeadLetterRecord(ctx, rec); err != nil {
3✔
2438
                        return fmt.Errorf("create dead letter record: %w", err)
×
2439
                }
×
2440

2441
                return nil
3✔
2442
        }
2443

2444
        // Increment compensation retry count and update status to compensation
2445
        newRetryCount := step.CompensationRetryCount + 1
16✔
2446
        if err := engine.store.UpdateStepCompensationRetry(ctx, step.ID, newRetryCount, StepStatusCompensation); err != nil {
16✔
2447
                return fmt.Errorf("update step compensation retry: %w", err)
×
2448
        }
×
2449

2450
        // Enqueue compensation step for execution
2451
        retryDelay := CalculateRetryDelay(stepDef.RetryStrategy, stepDef.RetryDelay, newRetryCount)
16✔
2452
        if err := engine.store.EnqueueStep(ctx, step.InstanceID, &step.ID, PriorityHigh, retryDelay); err != nil {
16✔
2453
                return fmt.Errorf("enqueue compensation step: %w", err)
×
2454
        }
×
2455

2456
        _ = engine.store.LogEvent(ctx, step.InstanceID, &step.ID, EventStepStarted, map[string]any{
16✔
2457
                KeyStepName:   step.StepName,
16✔
2458
                KeyStepType:   step.StepType,
16✔
2459
                KeyRetryCount: newRetryCount,
16✔
2460
                KeyReason:     "compensation",
16✔
2461
        })
16✔
2462

16✔
2463
        return nil
16✔
2464
}
2465

2466
func (engine *Engine) determineExecutedBranch(
2467
        stepDef *StepDefinition,
2468
        stepMap map[string]*WorkflowStep,
2469
) string {
23✔
2470
        // Check if any steps from the Next branch were executed
23✔
2471
        for _, nextStepName := range stepDef.Next {
46✔
2472
                if step, exists := stepMap[nextStepName]; exists &&
23✔
2473
                        (step.Status == StepStatusCompleted ||
23✔
2474
                                step.Status == StepStatusFailed ||
23✔
2475
                                step.Status == StepStatusCompensation) {
33✔
2476
                        return "next"
10✔
2477
                }
10✔
2478
        }
2479

2480
        // Check if any steps from the Else branch were executed
2481
        if stepDef.Else != "" {
26✔
2482
                if step, exists := stepMap[stepDef.Else]; exists &&
13✔
2483
                        (step.Status == StepStatusCompleted ||
13✔
2484
                                step.Status == StepStatusFailed ||
13✔
2485
                                step.Status == StepStatusCompensation) {
24✔
2486
                        return "else"
11✔
2487
                }
11✔
2488
        }
2489

2490
        // If no branch was executed, return an empty string
2491
        return ""
2✔
2492
}
2493

2494
// hasPendingStepsInParallelBranches checks if there are any pending/running steps
2495
// in parallel branches that could affect the join result
2496
func (engine *Engine) hasPendingStepsInParallelBranches(
2497
        ctx context.Context,
2498
        instanceID int64,
2499
        joinStepDef *StepDefinition,
2500
        allSteps []WorkflowStep,
2501
) bool {
51✔
2502
        // Get the fork step that created the parallel branches
51✔
2503
        forkStepName := ""
51✔
2504
        for _, waitFor := range joinStepDef.WaitFor {
149✔
2505
                // Find the fork step that created this parallel branch
98✔
2506
                // by looking for steps that have this step in their Parallel array
98✔
2507
                for _, step := range allSteps {
931✔
2508
                        if step.StepName == waitFor {
872✔
2509
                                // This is a step from a parallel branch
39✔
2510
                                // We need to find the fork step that created this branch
39✔
2511
                                forkStepName = engine.findForkStepForParallelStep(ctx, instanceID, waitFor)
39✔
2512

39✔
2513
                                break
39✔
2514
                        }
2515
                }
2516
                if forkStepName != "" {
120✔
2517
                        break
22✔
2518
                }
2519
        }
2520

2521
        if forkStepName == "" {
80✔
2522
                return false
29✔
2523
        }
29✔
2524

2525
        // Get workflow definition to find the fork step
2526
        instance, err := engine.store.GetInstance(ctx, instanceID)
22✔
2527
        if err != nil {
22✔
2528
                return false
×
2529
        }
×
2530

2531
        def, err := engine.store.GetWorkflowDefinition(ctx, instance.WorkflowID)
22✔
2532
        if err != nil {
22✔
2533
                return false
×
2534
        }
×
2535

2536
        forkStepDef, ok := def.Definition.Steps[forkStepName]
22✔
2537
        if !ok || forkStepDef.Type != StepTypeFork {
22✔
2538
                return false
×
2539
        }
×
2540

2541
        // Check if there are any pending/running steps in the parallel branches
2542
        // that are not in the WaitFor list (i.e., dynamically created steps)
2543
        for _, step := range allSteps {
143✔
2544
                if step.Status == StepStatusPending || step.Status == StepStatusRunning {
128✔
2545
                        // Check if this step belongs to one of the parallel branches
7✔
2546
                        if engine.isStepInParallelBranch(step.StepName, forkStepDef, def) {
11✔
2547
                                // Check if this step is not in the WaitFor list
4✔
2548
                                isInWaitFor := false
4✔
2549
                                for _, waitFor := range joinStepDef.WaitFor {
12✔
2550
                                        if step.StepName == waitFor {
9✔
2551
                                                isInWaitFor = true
1✔
2552

1✔
2553
                                                break
1✔
2554
                                        }
2555
                                }
2556
                                if !isInWaitFor {
7✔
2557
                                        return true
3✔
2558
                                }
3✔
2559
                        }
2560
                }
2561
        }
2562

2563
        return false
19✔
2564
}
2565

2566
// findForkStepForParallelStep finds the fork step that created the given parallel step
2567
func (engine *Engine) findForkStepForParallelStep(ctx context.Context, instanceID int64, parallelStepName string) string {
174✔
2568
        instance, err := engine.store.GetInstance(ctx, instanceID)
174✔
2569
        if err != nil {
174✔
2570
                return ""
×
2571
        }
×
2572

2573
        def, err := engine.store.GetWorkflowDefinition(ctx, instance.WorkflowID)
174✔
2574
        if err != nil {
174✔
2575
                return ""
×
2576
        }
×
2577

2578
        // Look for fork steps that have this step in their Parallel array
2579
        for stepName, stepDef := range def.Definition.Steps {
1,572✔
2580
                if stepDef.Type == StepTypeFork {
1,539✔
2581
                        for _, parallelStep := range stepDef.Parallel {
411✔
2582
                                if parallelStep == parallelStepName {
325✔
2583
                                        return stepName
55✔
2584
                                }
55✔
2585
                        }
2586
                }
2587
        }
2588

2589
        return ""
119✔
2590
}
2591

2592
// isStepInForkBranch checks if a step is part of any fork/parallel branch.
2593
func (engine *Engine) isStepInForkBranch(
2594
        ctx context.Context,
2595
        instanceID int64,
2596
        stepName string,
2597
        def *WorkflowDefinition,
2598
) bool {
29✔
2599
        return engine.findForkStepForParallelStep(ctx, instanceID, stepName) != ""
29✔
2600
}
29✔
2601

2602
// stopParallelBranchesInFork stops all active steps in parallel branches of the same fork.
2603
// PREVENTIVE MEASURE: Try to prevent parallel steps from completing during rollback.
2604
// Note: This doesn't guarantee success - steps might already be executing in workers.
2605
// The REACTIVE measure (enqueueCompletedStepsForRollback) handles steps that complete anyway.
2606
func (engine *Engine) stopParallelBranchesInFork(
2607
        ctx context.Context,
2608
        instanceID int64,
2609
        failedStepName string,
2610
        def *WorkflowDefinition,
2611
) error {
1✔
2612
        // Find the fork step that contains this failed step
1✔
2613
        forkStepName := engine.findForkStepForParallelStep(ctx, instanceID, failedStepName)
1✔
2614
        if forkStepName == "" {
1✔
NEW
2615
                // Not in a fork, nothing to stop
×
NEW
2616
                return nil
×
NEW
2617
        }
×
2618

2619
        forkStepDef, ok := def.Definition.Steps[forkStepName]
1✔
2620
        if !ok || forkStepDef.Type != StepTypeFork {
1✔
NEW
2621
                return nil
×
NEW
2622
        }
×
2623

2624
        // Get all steps in this instance
2625
        steps, err := engine.store.GetStepsByInstance(ctx, instanceID)
1✔
2626
        if err != nil {
1✔
NEW
2627
                return fmt.Errorf("get steps: %w", err)
×
NEW
2628
        }
×
2629

2630
        // Find all active steps (pending/running) in parallel branches
2631
        for _, step := range steps {
5✔
2632
                // Skip the failed step itself
4✔
2633
                if step.StepName == failedStepName {
5✔
2634
                        continue
1✔
2635
                }
2636

2637
                // Check if this step is in a parallel branch
2638
                if !engine.isStepInParallelBranch(step.StepName, forkStepDef, def) {
4✔
2639
                        continue
1✔
2640
                }
2641

2642
                // Only stop active steps (pending/running)
2643
                if step.Status != StepStatusPending && step.Status != StepStatusRunning {
3✔
2644
                        continue
1✔
2645
                }
2646

2647
                // Mark step as skipped
2648
                skipMsg := "Skipped due to parallel branch failure"
1✔
2649
                if err := engine.store.UpdateStep(ctx, step.ID, StepStatusSkipped, nil, &skipMsg); err != nil {
1✔
NEW
2650
                        slog.Warn("[floxy] failed to skip step in parallel branch", "step_id", step.ID, "error", err)
×
NEW
2651
                        continue
×
2652
                }
2653

2654
                _ = engine.store.LogEvent(ctx, instanceID, &step.ID, EventStepFailed, map[string]any{
1✔
2655
                        KeyStepName: step.StepName,
1✔
2656
                        KeyReason:   "parallel_branch_failure",
1✔
2657
                        KeyMessage:  skipMsg,
1✔
2658
                })
1✔
2659
        }
2660

2661
        return nil
1✔
2662
}
2663

2664
// isStepInParallelBranch checks if a step belongs to one of the parallel branches
2665
func (engine *Engine) isStepInParallelBranch(stepName string, forkStepDef *StepDefinition, def *WorkflowDefinition) bool {
16✔
2666
        // Check if this step is a direct parallel step
16✔
2667
        for _, parallelStep := range forkStepDef.Parallel {
45✔
2668
                if stepName == parallelStep {
33✔
2669
                        return true
4✔
2670
                }
4✔
2671
        }
2672

2673
        // Check if this step is a descendant of any parallel step
2674
        for _, parallelStep := range forkStepDef.Parallel {
30✔
2675
                if engine.isStepDescendantOf(stepName, parallelStep, def) {
24✔
2676
                        return true
6✔
2677
                }
6✔
2678
        }
2679

2680
        return false
6✔
2681
}
2682

2683
// isStepDescendantOf checks if a step is a descendant of another step
2684
func (engine *Engine) isStepDescendantOf(stepName, ancestorStepName string, def *WorkflowDefinition) bool {
26✔
2685
        visited := make(map[string]bool)
26✔
2686

26✔
2687
        for stepName != "" {
101✔
2688
                if visited[stepName] {
76✔
2689
                        break // Prevent infinite loops
1✔
2690
                }
2691
                visited[stepName] = true
74✔
2692

74✔
2693
                stepDef, ok := def.Definition.Steps[stepName]
74✔
2694
                if !ok {
89✔
2695
                        break
15✔
2696
                }
2697

2698
                if stepName == ancestorStepName {
69✔
2699
                        return true
10✔
2700
                }
10✔
2701

2702
                // Check if this step is a descendant through Next or Else
2703
                if stepDef.Prev != "" {
98✔
2704
                        stepName = stepDef.Prev
49✔
2705
                } else {
49✔
2706
                        break
×
2707
                }
2708
        }
2709

2710
        return false
16✔
2711
}
2712

2713
// isTerminalStepInForkBranch checks if a step is a terminal step in a fork branch.
2714
// A step is terminal if it has no Next steps, is not a Join step, and is in a fork branch.
2715
func (engine *Engine) isTerminalStepInForkBranch(
2716
        ctx context.Context,
2717
        instanceID int64,
2718
        stepName string,
2719
        def *WorkflowDefinition,
2720
) bool {
210✔
2721
        stepDef, ok := def.Definition.Steps[stepName]
210✔
2722
        if !ok {
210✔
2723
                return false
×
2724
        }
×
2725

2726
        // Join steps are not terminal (they are the end of fork branches)
2727
        if stepDef.Type == StepTypeJoin {
219✔
2728
                return false
9✔
2729
        }
9✔
2730

2731
        // A step is terminal if it has no Next steps
2732
        if len(stepDef.Next) > 0 {
366✔
2733
                return false
165✔
2734
        }
165✔
2735

2736
        // Check if this step is actually in a fork branch by finding the fork step
2737
        forkStepName := engine.findForkStepForParallelStep(ctx, instanceID, stepName)
36✔
2738
        if forkStepName != "" {
51✔
2739
                return true
15✔
2740
        }
15✔
2741

2742
        // Try to find fork by traversing backwards through Prev
2743
        visited := make(map[string]bool)
21✔
2744
        current := stepDef.Prev
21✔
2745
        for current != "" && current != rootStepName {
61✔
2746
                if visited[current] {
40✔
2747
                        break
×
2748
                }
2749
                visited[current] = true
40✔
2750

40✔
2751
                currentDef, ok := def.Definition.Steps[current]
40✔
2752
                if !ok {
40✔
2753
                        break
×
2754
                }
2755

2756
                if currentDef.Type == StepTypeFork {
46✔
2757
                        return true
6✔
2758
                }
6✔
2759

2760
                // Check if this step is in the Parallel array of a fork
2761
                for _, defStep := range def.Definition.Steps {
237✔
2762
                        if defStep.Type == StepTypeFork {
217✔
2763
                                for _, parallelStep := range defStep.Parallel {
44✔
2764
                                        if parallelStep == current {
34✔
2765
                                                return true
4✔
2766
                                        }
4✔
2767
                                }
2768
                        }
2769
                }
2770

2771
                current = currentDef.Prev
30✔
2772
        }
2773

2774
        return false
11✔
2775
}
2776

2777
// findJoinStepForForkBranch finds the Join step that should wait for steps in the given fork branch.
2778
// It looks for a Join step that follows the fork step which created the branch containing stepName.
2779
func (engine *Engine) findJoinStepForForkBranch(
2780
        ctx context.Context,
2781
        instanceID int64,
2782
        stepName string,
2783
        def *WorkflowDefinition,
2784
) (string, error) {
67✔
2785
        // Find the fork step that created the branch containing this step
67✔
2786
        forkStepName := engine.findForkStepForParallelStep(ctx, instanceID, stepName)
67✔
2787
        if forkStepName == "" {
119✔
2788
                // Try to find fork by traversing backwards through Prev
52✔
2789
                stepDef, ok := def.Definition.Steps[stepName]
52✔
2790
                if !ok {
52✔
2791
                        return "", nil
×
2792
                }
×
2793

2794
                // Traverse backwards to find a fork step
2795
                visited := make(map[string]bool)
52✔
2796
                current := stepDef.Prev
52✔
2797
                for current != "" && current != rootStepName {
158✔
2798
                        if visited[current] {
106✔
2799
                                break
×
2800
                        }
2801
                        visited[current] = true
106✔
2802

106✔
2803
                        currentDef, ok := def.Definition.Steps[current]
106✔
2804
                        if !ok {
106✔
2805
                                break
×
2806
                        }
2807

2808
                        if currentDef.Type == StepTypeFork {
112✔
2809
                                forkStepName = current
6✔
2810
                                break
6✔
2811
                        }
2812

2813
                        // Check if this step is in the Parallel array of a fork
2814
                        for name, defStep := range def.Definition.Steps {
979✔
2815
                                if defStep.Type == StepTypeFork {
948✔
2816
                                        for _, parallelStep := range defStep.Parallel {
206✔
2817
                                                if parallelStep == current {
167✔
2818
                                                        forkStepName = name
30✔
2819
                                                        break
30✔
2820
                                                }
2821
                                        }
2822
                                        if forkStepName != "" {
99✔
2823
                                                break
30✔
2824
                                        }
2825
                                }
2826
                        }
2827

2828
                        if forkStepName != "" {
130✔
2829
                                break
30✔
2830
                        }
2831

2832
                        current = currentDef.Prev
70✔
2833
                }
2834
        }
2835

2836
        if forkStepName == "" {
83✔
2837
                return "", nil // Not in a fork branch
16✔
2838
        }
16✔
2839

2840
        // Find Join step that follows this fork step
2841
        forkStepDef, ok := def.Definition.Steps[forkStepName]
51✔
2842
        if !ok {
51✔
2843
                return "", nil
×
2844
        }
×
2845

2846
        // Look for Join step in Next steps of the fork
2847
        for _, nextStepName := range forkStepDef.Next {
102✔
2848
                nextStepDef, ok := def.Definition.Steps[nextStepName]
51✔
2849
                if ok && nextStepDef.Type == StepTypeJoin {
102✔
2850
                        return nextStepName, nil
51✔
2851
                }
51✔
2852
        }
2853

2854
        return "", nil
×
2855
}
2856

2857
// findConditionStepInBranch finds the Condition step in the branch that contains stepName.
2858
// It traverses backwards from stepName through Prev links to find the first Condition step.
2859
// Returns empty string if no Condition step is found.
2860
func (engine *Engine) findConditionStepInBranch(
2861
        stepDef *StepDefinition,
2862
        def *WorkflowDefinition,
2863
) string {
269✔
2864
        visited := make(map[string]bool)
269✔
2865
        current := stepDef.Prev
269✔
2866

269✔
2867
        for current != "" && current != rootStepName {
682✔
2868
                if visited[current] {
413✔
2869
                        break // Prevent cycles
×
2870
                }
2871
                visited[current] = true
413✔
2872

413✔
2873
                currentDef, ok := def.Definition.Steps[current]
413✔
2874
                if !ok {
413✔
2875
                        break
×
2876
                }
2877

2878
                // Check if this is a Condition step
2879
                if currentDef.Type == StepTypeCondition {
455✔
2880
                        return current
42✔
2881
                }
42✔
2882

2883
                // Continue traversing backwards
2884
                current = currentDef.Prev
371✔
2885
        }
2886

2887
        return ""
227✔
2888
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc