• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rom8726 / floxy / 19686916629

25 Nov 2025 11:11PM UTC coverage: 46.304% (+0.2%) from 46.116%
19686916629

Pull #24

github

rom8726
Add rollback logic to handle nested Fork/Join workflows with cycle prevention.
Pull Request #24: Add rollback logic to handle nested Fork/Join workflows

69 of 86 new or added lines in 1 file covered. (80.23%)

2 existing lines in 2 files now uncovered.

5375 of 11608 relevant lines covered (46.3%)

79.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.06
/sqlite_store.go
1
package floxy
2

3
import (
4
        "context"
5
        "database/sql"
6
        "encoding/json"
7
        "errors"
8
        "fmt"
9
        "math"
10
        "sync"
11
        "time"
12

13
        _ "modernc.org/sqlite"
14
)
15

16
// Ensure interface compliance
17
var _ Store = (*SQLiteStore)(nil)
18

19
// MinAgingRate is the lower bound accepted for the queue aging rate.
// A rate of 0.0 means waiting time adds nothing to an item's priority.
const MinAgingRate = 0.0

// MaxAgingRate is the upper bound accepted for the queue aging rate; it caps
// how strongly waiting time can boost an item's priority.
const MaxAgingRate = 100.0
25

26
// SQLiteStore is a lightweight Store implementation that keeps its data in a
// SQLite database.
// Only the capabilities exercised by the SQLite tests are implemented;
// non-essential methods return a not-implemented error.
type SQLiteStore struct {
	// db is the underlying database handle used by every method.
	db *sql.DB
	// agingEnabled selects whether DequeueStep orders by an aged priority
	// instead of the raw priority column.
	agingEnabled bool
	// agingRate is the per-second multiplier used when aging is enabled.
	agingRate float64
	// mu serializes critical sections for SQLite (held by DequeueStep).
	mu sync.Mutex
}
35

36
// NewSQLiteStore creates a persistent SQLite database stored in a file and initializes schema.
37
// The filepath parameter specifies the path to the SQLite database file.
38
// If the file doesn't exist, it will be created automatically.
39
func NewSQLiteStore(filepath string) (*SQLiteStore, error) {
2✔
40
        if filepath == "" {
2✔
41
                return nil, fmt.Errorf("filepath cannot be empty")
×
42
        }
×
43
        return newSQLiteStore(filepath, false)
2✔
44
}
45

46
// NewSQLiteInMemoryStore creates an in-memory SQLite database and initializes schema.
47
// This is useful for testing. For production use, prefer NewSQLiteStore with a file path.
48
func NewSQLiteInMemoryStore() (*SQLiteStore, error) {
5✔
49
        return newSQLiteStore(":memory:", true)
5✔
50
}
5✔
51

52
// newSQLiteStore is an internal helper function that creates a SQLite store.
53
// isInMemory indicates whether this is an in-memory database.
54
func newSQLiteStore(dsn string, isInMemory bool) (*SQLiteStore, error) {
7✔
55
        db, err := sql.Open("sqlite", dsn)
7✔
56
        if err != nil {
7✔
57
                return nil, fmt.Errorf("open sqlite: %w", err)
×
58
        }
×
59

60
        // Configure SQLite pragmas for better performance and reliability
61
        _, _ = db.Exec("PRAGMA journal_mode=WAL;")
7✔
62
        _, _ = db.Exec("PRAGMA foreign_keys=ON;")
7✔
63
        _, _ = db.Exec("PRAGMA busy_timeout=5000;")
7✔
64

7✔
65
        if isInMemory {
12✔
66
                // Single connection for in-memory databases to avoid locks
5✔
67
                db.SetMaxOpenConns(1)
5✔
68
                db.SetMaxIdleConns(1)
5✔
69
        } else {
7✔
70
                // For persistent databases, allow multiple connections for better concurrency
2✔
71
                db.SetMaxOpenConns(10)
2✔
72
                db.SetMaxIdleConns(5)
2✔
73
        }
2✔
74

75
        store := &SQLiteStore{db: db, agingEnabled: true, agingRate: 0.5}
7✔
76
        if err := RunSQLiteMigrations(context.Background(), db); err != nil {
7✔
77
                _ = db.Close()
×
78
                return nil, err
×
79
        }
×
80
        return store, nil
7✔
81
}
82

83
// SetAgingEnabled turns priority aging on or off. DequeueStep reads this flag
// to decide whether to order by aged priority or by the raw priority column.
func (s *SQLiteStore) SetAgingEnabled(enabled bool) {
	s.agingEnabled = enabled
}
1✔
84

85
// SetAgingRate sets the aging rate for queue priority adjustment.
86
// The rate is clamped to [MinAgingRate, MaxAgingRate] to prevent SQL injection
87
// and ensure valid SQL expressions. NaN and Inf values are clamped to 0.0.
88
func (s *SQLiteStore) SetAgingRate(rate float64) {
9✔
89
        s.agingRate = clampAgingRate(rate)
9✔
90
}
9✔
91

92
// clampAgingRate ensures the aging rate is within valid bounds.
93
// Returns 0.0 for NaN/Inf, and clamps to [MinAgingRate, MaxAgingRate].
94
func clampAgingRate(rate float64) float64 {
339✔
95
        // Handle NaN and Inf
339✔
96
        if math.IsNaN(rate) || math.IsInf(rate, 0) {
342✔
97
                return MinAgingRate
3✔
98
        }
3✔
99
        // Clamp to valid range
100
        if rate < MinAgingRate {
337✔
101
                return MinAgingRate
1✔
102
        }
1✔
103
        if rate > MaxAgingRate {
336✔
104
                return MaxAgingRate
1✔
105
        }
1✔
106
        return rate
334✔
107
}
108

109
// Close closes the database connection.
110
// It's recommended to call this method when the store is no longer needed,
111
// especially for persistent SQLite stores.
112
func (s *SQLiteStore) Close() error {
3✔
113
        if s.db != nil {
6✔
114
                return s.db.Close()
3✔
115
        }
3✔
116
        return nil
×
117
}
118

119
// Definitions
120
func (s *SQLiteStore) SaveWorkflowDefinition(ctx context.Context, def *WorkflowDefinition) error {
3✔
121
        definitionJSON, err := json.Marshal(def.Definition)
3✔
122
        if err != nil {
3✔
123
                return err
×
124
        }
×
125
        const query = `INSERT INTO workflow_definitions (id, name, version, definition, created_at)
3✔
126
                VALUES(?, ?, ?, ?, ?)
3✔
127
                ON CONFLICT(name, version) DO UPDATE SET definition=excluded.definition`
3✔
128
        if _, err := s.db.ExecContext(
3✔
129
                ctx, query, def.ID, def.Name, def.Version, definitionJSON, time.Now(),
3✔
130
        ); err != nil {
3✔
131
                return err
×
132
        }
×
133
        // We keep provided ID/CreatedAt; in SQLite we don't fetch RETURNING here.
134
        return nil
3✔
135
}
136

137
func (s *SQLiteStore) GetWorkflowDefinition(ctx context.Context, id string) (*WorkflowDefinition, error) {
33✔
138
        const query = `SELECT id, name, version, definition, created_at
33✔
139
                FROM workflow_definitions
33✔
140
                WHERE id=?`
33✔
141
        row := s.db.QueryRowContext(ctx, query, id)
33✔
142
        var def WorkflowDefinition
33✔
143
        var defJSON []byte
33✔
144
        if err := row.Scan(
33✔
145
                &def.ID, &def.Name, &def.Version, &defJSON, &def.CreatedAt,
33✔
146
        ); err != nil {
33✔
147
                if errors.Is(err, sql.ErrNoRows) {
×
148
                        return nil, ErrEntityNotFound
×
149
                }
×
150
                return nil, err
×
151
        }
152
        if err := json.Unmarshal(defJSON, &def.Definition); err != nil {
33✔
153
                return nil, err
×
154
        }
×
155
        return &def, nil
33✔
156
}
157

158
// Instances
159
func (s *SQLiteStore) CreateInstance(ctx context.Context, workflowID string, input json.RawMessage) (*WorkflowInstance, error) {
3✔
160
        now := time.Now()
3✔
161
        const query = `INSERT INTO workflow_instances (workflow_id, status, input, created_at, updated_at)
3✔
162
                VALUES(?, ?, ?, ?, ?)`
3✔
163
        res, err := s.db.ExecContext(ctx, query, workflowID, StatusPending, input, now, now)
3✔
164
        if err != nil {
3✔
165
                return nil, err
×
166
        }
×
167
        id, _ := res.LastInsertId()
3✔
168
        return s.GetInstance(ctx, id)
3✔
169
}
170

171
func (s *SQLiteStore) UpdateInstanceStatus(ctx context.Context, instanceID int64, status WorkflowStatus, output json.RawMessage, errMsg *string) error {
5✔
172
        now := time.Now()
5✔
173
        // Update completed_at when status is completed, failed, or cancelled
5✔
174
        // Update started_at when status is running and started_at is NULL
5✔
175
        const query = `UPDATE workflow_instances 
5✔
176
                SET status=?, output=?, error=?, updated_at=?,
5✔
177
                        completed_at = CASE WHEN ? IN ('completed', 'failed', 'cancelled') THEN ? ELSE completed_at END,
5✔
178
                        started_at = CASE WHEN started_at IS NULL AND ? = 'running' THEN ? ELSE started_at END
5✔
179
                WHERE id=?`
5✔
180
        _, err := s.db.ExecContext(
5✔
181
                ctx, query, status, output, errMsg, now, status, now, status, now, instanceID,
5✔
182
        )
5✔
183
        return err
5✔
184
}
5✔
185

186
func (s *SQLiteStore) GetInstance(ctx context.Context, instanceID int64) (*WorkflowInstance, error) {
24✔
187
        const query = `SELECT id, workflow_id, status, input, output, error,
24✔
188
                        started_at, completed_at, created_at, updated_at
24✔
189
                FROM workflow_instances
24✔
190
                WHERE id=?`
24✔
191
        row := s.db.QueryRowContext(ctx, query, instanceID)
24✔
192
        var inst WorkflowInstance
24✔
193
        var inputBytes, outputBytes []byte
24✔
194
        if err := row.Scan(
24✔
195
                &inst.ID, &inst.WorkflowID, &inst.Status, &inputBytes, &outputBytes, &inst.Error,
24✔
196
                &inst.StartedAt, &inst.CompletedAt, &inst.CreatedAt, &inst.UpdatedAt,
24✔
197
        ); err != nil {
24✔
198
                if errors.Is(err, sql.ErrNoRows) {
×
199
                        return nil, ErrEntityNotFound
×
200
                }
×
201
                return nil, err
×
202
        }
203
        inst.Input = json.RawMessage(inputBytes)
24✔
204
        if outputBytes != nil {
25✔
205
                inst.Output = json.RawMessage(outputBytes)
1✔
206
        } else {
24✔
207
                inst.Output = nil
23✔
208
        }
23✔
209
        return &inst, nil
24✔
210
}
211

212
// Steps
213
func (s *SQLiteStore) CreateStep(ctx context.Context, step *WorkflowStep) error {
5✔
214
        now := time.Now()
5✔
215
        const query = `INSERT INTO workflow_steps (
5✔
216
                instance_id, step_name, step_type, status, input, output, error, retry_count,
5✔
217
                max_retries, compensation_retry_count, idempotency_key, started_at, completed_at, created_at)
5✔
218
                VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
5✔
219
        res, err := s.db.ExecContext(ctx, query,
5✔
220
                step.InstanceID, step.StepName, step.StepType, step.Status, step.Input, step.Output, step.Error,
5✔
221
                step.RetryCount, step.MaxRetries, step.CompensationRetryCount, step.IdempotencyKey,
5✔
222
                step.StartedAt, step.CompletedAt, now,
5✔
223
        )
5✔
224
        if err != nil {
5✔
225
                return err
×
226
        }
×
227
        id, _ := res.LastInsertId()
5✔
228
        step.ID = id
5✔
229
        step.CreatedAt = now
5✔
230
        return nil
5✔
231
}
232

233
func (s *SQLiteStore) UpdateStep(ctx context.Context, stepID int64, status StepStatus, output json.RawMessage, errMsg *string) error {
12✔
234
        now := time.Now()
12✔
235
        // Update completed_at when status is completed, failed, skipped, or rolled_back
12✔
236
        // Update started_at when status is running and started_at is NULL
12✔
237
        // Increment retry_count when status is failed
12✔
238
        const query = `UPDATE workflow_steps 
12✔
239
                SET status=?, output=?, error=?,
12✔
240
                        completed_at = CASE WHEN ? IN ('completed', 'failed', 'skipped', 'rolled_back') THEN ? ELSE completed_at END,
12✔
241
                        started_at = CASE WHEN started_at IS NULL AND ? = 'running' THEN ? ELSE started_at END,
12✔
242
                        retry_count = CASE WHEN ? = 'failed' THEN retry_count + 1 ELSE retry_count END
12✔
243
                WHERE id=?`
12✔
244
        _, err := s.db.ExecContext(ctx, query, status, output, errMsg, status, now, status, now, status, stepID)
12✔
245
        return err
12✔
246
}
12✔
247

248
func (s *SQLiteStore) GetStepsByInstance(ctx context.Context, instanceID int64) ([]WorkflowStep, error) {
14✔
249
        const query = `SELECT id, instance_id, step_name, step_type, status, input, output, error,
14✔
250
                        retry_count, max_retries, compensation_retry_count, idempotency_key,
14✔
251
                        started_at, completed_at, created_at
14✔
252
                FROM workflow_steps
14✔
253
                WHERE instance_id=?
14✔
254
                ORDER BY id`
14✔
255
        rows, err := s.db.QueryContext(ctx, query, instanceID)
14✔
256
        if err != nil {
14✔
257
                return nil, err
×
258
        }
×
259
        defer rows.Close()
14✔
260
        var res []WorkflowStep
14✔
261
        for rows.Next() {
44✔
262
                var srec WorkflowStep
30✔
263
                var inputBytes, outputBytes []byte
30✔
264
                if err := rows.Scan(
30✔
265
                        &srec.ID, &srec.InstanceID, &srec.StepName, &srec.StepType, &srec.Status,
30✔
266
                        &inputBytes, &outputBytes, &srec.Error, &srec.RetryCount, &srec.MaxRetries,
30✔
267
                        &srec.CompensationRetryCount, &srec.IdempotencyKey, &srec.StartedAt,
30✔
268
                        &srec.CompletedAt, &srec.CreatedAt,
30✔
269
                ); err != nil {
30✔
270
                        return nil, err
×
271
                }
×
272
                srec.Input = json.RawMessage(inputBytes)
30✔
273
                if outputBytes != nil {
54✔
274
                        srec.Output = json.RawMessage(outputBytes)
24✔
275
                }
24✔
276
                res = append(res, srec)
30✔
277
        }
278
        return res, nil
14✔
279
}
280

281
// Queue
282
func (s *SQLiteStore) EnqueueStep(ctx context.Context, instanceID int64, stepID *int64, priority Priority, delay time.Duration) error {
8✔
283
        sched := time.Now().Add(delay)
8✔
284
        const query = `INSERT INTO queue (instance_id, step_id, scheduled_at, priority)
8✔
285
                VALUES(?, ?, ?, ?)`
8✔
286
        _, err := s.db.ExecContext(ctx, query, instanceID, stepID, sched, int(priority))
8✔
287
        return err
8✔
288
}
8✔
289

290
func (s *SQLiteStore) UpdateStepCompensationRetry(ctx context.Context, stepID int64, retryCount int, status StepStatus) error {
1✔
291
        const query = `UPDATE workflow_steps
1✔
292
                SET compensation_retry_count=?, status=?
1✔
293
                WHERE id=?`
1✔
294
        _, err := s.db.ExecContext(ctx, query, retryCount, status, stepID)
1✔
295
        return err
1✔
296
}
1✔
297

298
func (s *SQLiteStore) DequeueStep(ctx context.Context, workerID string) (*QueueItem, error) {
331✔
299
        s.mu.Lock()
331✔
300
        defer s.mu.Unlock()
331✔
301
        // Simple transactional dequeue.
331✔
302
        tx, err := s.db.BeginTx(ctx, &sql.TxOptions{})
331✔
303
        if err != nil {
332✔
304
                return nil, err
1✔
305
        }
1✔
306
        defer func() {
660✔
307
                if tx != nil {
652✔
308
                        _ = tx.Rollback()
322✔
309
                }
322✔
310
        }()
311

312
        orderExpr := "priority"
330✔
313
        if s.agingEnabled {
660✔
314
                // Clamp aging rate to ensure valid SQL expression (defense in depth)
330✔
315
                rate := clampAgingRate(s.agingRate)
330✔
316
                // effective_priority = min(100, priority + floor(wait_seconds * rate))
330✔
317
                orderExpr = fmt.Sprintf(
330✔
318
                        "MIN(100, priority + CAST(((strftime('%%s','now') - strftime('%%s', scheduled_at)) * %.6f) AS INTEGER))",
330✔
319
                        rate,
330✔
320
                )
330✔
321
        }
330✔
322

323
        row := tx.QueryRowContext(
330✔
324
                ctx,
330✔
325
                fmt.Sprintf(`
330✔
326
                        SELECT id, instance_id, step_id, scheduled_at, attempted_at, attempted_by, priority
330✔
327
                        FROM queue
330✔
328
                        WHERE scheduled_at <= ? AND (attempted_by IS NULL)
330✔
329
                        ORDER BY %s DESC, scheduled_at ASC, id ASC
330✔
330
                        LIMIT 1`,
330✔
331
                        orderExpr,
330✔
332
                ),
330✔
333
                time.Now(),
330✔
334
        )
330✔
335
        var qi QueueItem
330✔
336
        if err := row.Scan(
330✔
337
                &qi.ID, &qi.InstanceID, &qi.StepID, &qi.ScheduledAt,
330✔
338
                &qi.AttemptedAt, &qi.AttemptedBy, &qi.Priority,
330✔
339
        ); err != nil {
652✔
340
                if errors.Is(err, sql.ErrNoRows) {
644✔
341
                        return nil, nil
322✔
342
                }
322✔
UNCOV
343
                return nil, err
×
344
        }
345
        // mark as attempted by worker
346
        now := time.Now()
8✔
347
        res, err := tx.ExecContext(
8✔
348
                ctx,
8✔
349
                `UPDATE queue
8✔
350
                        SET attempted_at=?, attempted_by=?
8✔
351
                        WHERE id=? AND attempted_by IS NULL`,
8✔
352
                now, workerID, qi.ID,
8✔
353
        )
8✔
354
        if err != nil {
8✔
355
                return nil, err
×
356
        }
×
357
        if rows, _ := res.RowsAffected(); rows == 0 {
8✔
358
                // another worker claimed it; let caller retry
×
359
                return nil, nil
×
360
        }
×
361
        qi.AttemptedAt = &now
8✔
362
        qi.AttemptedBy = &workerID
8✔
363

8✔
364
        if err := tx.Commit(); err != nil {
8✔
365
                return nil, err
×
366
        }
×
367
        tx = nil
8✔
368
        return &qi, nil
8✔
369
}
370

371
func (s *SQLiteStore) RemoveFromQueue(ctx context.Context, queueID int64) error {
7✔
372
        _, err := s.db.ExecContext(
7✔
373
                ctx, `DELETE FROM queue WHERE id=?`, queueID,
7✔
374
        )
7✔
375
        return err
7✔
376
}
7✔
377

378
func (s *SQLiteStore) ReleaseQueueItem(ctx context.Context, queueID int64) error {
×
379
        _, err := s.db.ExecContext(
×
380
                ctx,
×
381
                `UPDATE queue
×
382
                        SET attempted_at=NULL, attempted_by=NULL
×
383
                        WHERE id=?`,
×
384
                queueID,
×
385
        )
×
386
        return err
×
387
}
×
388

389
func (s *SQLiteStore) RescheduleAndReleaseQueueItem(ctx context.Context, queueID int64, delay time.Duration) error {
×
390
        sched := time.Now().Add(delay)
×
391
        _, err := s.db.ExecContext(
×
392
                ctx,
×
393
                `UPDATE queue
×
394
                        SET scheduled_at=?, attempted_at=NULL, attempted_by=NULL
×
395
                        WHERE id=?`,
×
396
                sched, queueID,
×
397
        )
×
398
        return err
×
399
}
×
400

401
func (s *SQLiteStore) LogEvent(ctx context.Context, instanceID int64, stepID *int64, eventType string, payload any) error {
17✔
402
        payloadJSON, err := json.Marshal(payload)
17✔
403
        if err != nil {
17✔
404
                return err
×
405
        }
×
406
        _, err = s.db.ExecContext(
17✔
407
                ctx,
17✔
408
                `INSERT INTO workflow_events (instance_id, step_id, event_type, payload, created_at)
17✔
409
                        VALUES(?, ?, ?, ?, ?)`,
17✔
410
                instanceID, stepID, eventType, payloadJSON, time.Now(),
17✔
411
        )
17✔
412
        return err
17✔
413
}
414

415
func (s *SQLiteStore) GetWorkflowEvents(ctx context.Context, instanceID int64) ([]WorkflowEvent, error) {
1✔
416
        rows, err := s.db.QueryContext(
1✔
417
                ctx,
1✔
418
                `SELECT id, instance_id, step_id, event_type, payload, created_at
1✔
419
                        FROM workflow_events
1✔
420
                        WHERE instance_id=?
1✔
421
                        ORDER BY id`,
1✔
422
                instanceID,
1✔
423
        )
1✔
424
        if err != nil {
1✔
425
                return nil, err
×
426
        }
×
427
        defer rows.Close()
1✔
428
        var res []WorkflowEvent
1✔
429
        for rows.Next() {
2✔
430
                var ev WorkflowEvent
1✔
431
                if err := rows.Scan(&ev.ID, &ev.InstanceID, &ev.StepID, &ev.EventType, &ev.Payload, &ev.CreatedAt); err != nil {
1✔
432
                        return nil, err
×
433
                }
×
434
                res = append(res, ev)
1✔
435
        }
436
        return res, nil
1✔
437
}
438

439
func (s *SQLiteStore) CreateJoinState(ctx context.Context, instanceID int64, joinStepName string, waitingFor []string, strategy JoinStrategy) error {
×
440
        wf, _ := json.Marshal(waitingFor)
×
441
        now := time.Now()
×
442
        _, err := s.db.ExecContext(
×
443
                ctx,
×
444
                `INSERT INTO join_states (
×
445
                        instance_id, join_step_name, waiting_for, completed, failed,
×
446
                        join_strategy, is_ready, created_at, updated_at
×
447
                ) VALUES(?, ?, ?, '[]', '[]', ?, 0, ?, ?)`,
×
448
                instanceID, joinStepName, string(wf), strategy, now, now,
×
449
        )
×
450
        return err
×
451
}
×
452

453
func (s *SQLiteStore) UpdateJoinState(ctx context.Context, instanceID int64, joinStepName, completedStep string, success bool) (bool, error) {
×
454
        row := s.db.QueryRowContext(ctx, `SELECT waiting_for, completed, failed, join_strategy FROM join_states WHERE instance_id=? AND join_step_name=?`, instanceID, joinStepName)
×
455
        var waitingJSON, completedJSON, failedJSON string
×
456
        var strategy JoinStrategy
×
457
        if err := row.Scan(&waitingJSON, &completedJSON, &failedJSON, &strategy); err != nil {
×
458
                if errors.Is(err, sql.ErrNoRows) {
×
459
                        // create default with this single step
×
460
                        _ = s.CreateJoinState(ctx, instanceID, joinStepName, []string{completedStep}, JoinStrategyAll)
×
461
                        waitingJSON = "[\"" + completedStep + "\"]"
×
462
                        completedJSON = "[]"
×
463
                        failedJSON = "[]"
×
464
                        strategy = JoinStrategyAll
×
465
                } else {
×
466
                        return false, err
×
467
                }
×
468
        }
469
        var waitingFor, completed, failed []string
×
470
        _ = json.Unmarshal([]byte(waitingJSON), &waitingFor)
×
471
        _ = json.Unmarshal([]byte(completedJSON), &completed)
×
472
        _ = json.Unmarshal([]byte(failedJSON), &failed)
×
473
        // append to completed or failed if not present
×
474
        target := &completed
×
475
        if !success {
×
476
                target = &failed
×
477
        }
×
478
        found := false
×
479
        for _, v := range *target {
×
480
                if v == completedStep {
×
481
                        found = true
×
482
                        break
×
483
                }
484
        }
485
        if !found {
×
486
                *target = append(*target, completedStep)
×
487
        }
×
488
        // recompute readiness
489
        isReady := false
×
490
        if strategy == JoinStrategyAny {
×
491
                isReady = len(completed) > 0 || len(failed) > 0
×
492
        } else {
×
493
                totalProcessed := len(completed) + len(failed)
×
494
                isReady = totalProcessed >= len(waitingFor)
×
495
        }
×
496
        compJSON, _ := json.Marshal(completed)
×
497
        failJSON, _ := json.Marshal(failed)
×
498
        _, err := s.db.ExecContext(
×
499
                ctx,
×
500
                `UPDATE join_states
×
501
                        SET completed=?, failed=?, is_ready=?, updated_at=?
×
502
                        WHERE instance_id=? AND join_step_name=?`,
×
503
                string(compJSON), string(failJSON), boolToInt(isReady), time.Now(),
×
504
                instanceID, joinStepName,
×
505
        )
×
506
        return isReady, err
×
507
}
508

509
func (s *SQLiteStore) GetJoinState(ctx context.Context, instanceID int64, joinStepName string) (*JoinState, error) {
×
510
        row := s.db.QueryRowContext(
×
511
                ctx,
×
512
                `SELECT waiting_for, completed, failed, join_strategy, is_ready,
×
513
                        created_at, updated_at
×
514
                        FROM join_states
×
515
                        WHERE instance_id=? AND join_step_name=?`,
×
516
                instanceID, joinStepName,
×
517
        )
×
518
        var waitingJSON, completedJSON, failedJSON string
×
519
        var strategy JoinStrategy
×
520
        var isReadyInt int
×
521
        var createdAt, updatedAt time.Time
×
522
        if err := row.Scan(&waitingJSON, &completedJSON, &failedJSON, &strategy, &isReadyInt, &createdAt, &updatedAt); err != nil {
×
523
                if errors.Is(err, sql.ErrNoRows) {
×
524
                        return nil, ErrEntityNotFound
×
525
                }
×
526
                return nil, err
×
527
        }
528
        var waitingFor, completed, failed []string
×
529
        _ = json.Unmarshal([]byte(waitingJSON), &waitingFor)
×
530
        _ = json.Unmarshal([]byte(completedJSON), &completed)
×
531
        _ = json.Unmarshal([]byte(failedJSON), &failed)
×
532
        return &JoinState{
×
533
                InstanceID:   instanceID,
×
534
                JoinStepName: joinStepName,
×
535
                WaitingFor:   waitingFor,
×
536
                Completed:    completed,
×
537
                Failed:       failed,
×
538
                JoinStrategy: strategy,
×
539
                IsReady:      isReadyInt == 1,
×
540
                CreatedAt:    createdAt,
×
541
                UpdatedAt:    updatedAt,
×
542
        }, nil
×
543
}
544

545
func (s *SQLiteStore) AddToJoinWaitFor(ctx context.Context, instanceID int64, joinStepName, stepToAdd string) error {
×
546
        row := s.db.QueryRowContext(ctx, `SELECT waiting_for, completed, failed, join_strategy FROM join_states WHERE instance_id=? AND join_step_name=?`, instanceID, joinStepName)
×
547
        var waitingJSON, completedJSON, failedJSON string
×
548
        var strategy JoinStrategy
×
549
        if err := row.Scan(&waitingJSON, &completedJSON, &failedJSON, &strategy); err != nil {
×
550
                if errors.Is(err, sql.ErrNoRows) {
×
551
                        return s.CreateJoinState(ctx, instanceID, joinStepName, []string{stepToAdd}, strategy)
×
552
                }
×
553
                return err
×
554
        }
555
        var waitingFor, completed, failed []string
×
556
        _ = json.Unmarshal([]byte(waitingJSON), &waitingFor)
×
557
        _ = json.Unmarshal([]byte(completedJSON), &completed)
×
558
        _ = json.Unmarshal([]byte(failedJSON), &failed)
×
559
        waitingFor = append(waitingFor, stepToAdd)
×
560
        isReady := checkJoinReady(waitingFor, completed, failed, strategy)
×
561
        wfJSON, _ := json.Marshal(waitingFor)
×
562
        _, err := s.db.ExecContext(ctx, `UPDATE join_states SET waiting_for=?, is_ready=?, updated_at=? WHERE instance_id=? AND join_step_name=?`, string(wfJSON), boolToInt(isReady), time.Now(), instanceID, joinStepName)
×
563
        return err
×
564
}
565

566
func (s *SQLiteStore) ReplaceInJoinWaitFor(ctx context.Context, instanceID int64, joinStepName, virtualStep, realStep string) error {
×
567
        row := s.db.QueryRowContext(ctx, `SELECT waiting_for, completed, failed, join_strategy FROM join_states WHERE instance_id=? AND join_step_name=?`, instanceID, joinStepName)
×
568
        var waitingJSON, completedJSON, failedJSON string
×
569
        var strategy JoinStrategy
×
570
        if err := row.Scan(&waitingJSON, &completedJSON, &failedJSON, &strategy); err != nil {
×
571
                if errors.Is(err, sql.ErrNoRows) {
×
572
                        return s.CreateJoinState(ctx, instanceID, joinStepName, []string{realStep}, JoinStrategyAll)
×
573
                }
×
574
                return err
×
575
        }
576
        var waitingFor, completed, failed []string
×
577
        _ = json.Unmarshal([]byte(waitingJSON), &waitingFor)
×
578
        _ = json.Unmarshal([]byte(completedJSON), &completed)
×
579
        _ = json.Unmarshal([]byte(failedJSON), &failed)
×
580
        found := false
×
581
        for i, w := range waitingFor {
×
582
                if w == virtualStep {
×
583
                        waitingFor[i] = realStep
×
584
                        found = true
×
585
                        break
×
586
                }
587
        }
588
        if !found {
×
589
                waitingFor = append(waitingFor, realStep)
×
590
        }
×
591
        isReady := checkJoinReady(waitingFor, completed, failed, strategy)
×
592
        wfJSON, _ := json.Marshal(waitingFor)
×
593
        _, err := s.db.ExecContext(ctx, `UPDATE join_states SET waiting_for=?, is_ready=?, updated_at=? WHERE instance_id=? AND join_step_name=?`, string(wfJSON), boolToInt(isReady), time.Now(), instanceID, joinStepName)
×
594
        return err
×
595
}
596

597
func (s *SQLiteStore) GetSummaryStats(ctx context.Context) (*SummaryStats, error) {
×
598
        row := s.db.QueryRowContext(ctx, `SELECT 
×
599
                COUNT(*) as total,
×
600
                SUM(CASE WHEN status='completed' THEN 1 ELSE 0 END),
×
601
                SUM(CASE WHEN status='failed' THEN 1 ELSE 0 END),
×
602
                SUM(CASE WHEN status='running' THEN 1 ELSE 0 END),
×
603
                SUM(CASE WHEN status='pending' THEN 1 ELSE 0 END)
×
604
                FROM workflow_instances`)
×
605
        var stats SummaryStats
×
606
        if err := row.Scan(&stats.TotalWorkflows, &stats.CompletedWorkflows, &stats.FailedWorkflows,
×
607
                &stats.RunningWorkflows, &stats.PendingWorkflows); err != nil {
×
608
                return nil, err
×
609
        }
×
610
        // active = running+pending in this simplified model
611
        stats.ActiveWorkflows = uint(stats.RunningWorkflows + stats.PendingWorkflows)
×
612
        return &stats, nil
×
613
}
614
func (s *SQLiteStore) GetActiveInstances(ctx context.Context) ([]ActiveWorkflowInstance, error) {
×
615
        rows, err := s.db.QueryContext(
×
616
                ctx,
×
617
                `SELECT
×
618
                        wi.id, wi.workflow_id, COALESCE(wd.name,''), wi.status,
×
619
                        wi.created_at, wi.updated_at,
×
620
                        (SELECT step_name FROM workflow_steps WHERE instance_id=wi.id AND status='running' LIMIT 1) as current_step,
×
621
                        (SELECT COUNT(*) FROM workflow_steps WHERE instance_id=wi.id) as total_steps,
×
622
                        (SELECT COUNT(*) FROM workflow_steps WHERE instance_id=wi.id AND status='completed') as completed_steps,
×
623
                        (SELECT COUNT(*) FROM workflow_steps WHERE instance_id=wi.id AND status='rolled_back') as rolled_back_steps
×
624
                FROM workflow_instances wi
×
625
                LEFT JOIN workflow_definitions wd ON wi.workflow_id = wd.id
×
626
                WHERE wi.status IN ('running','pending','dlq')
×
627
                ORDER BY wi.created_at DESC`,
×
628
        )
×
629
        if err != nil {
×
630
                return nil, err
×
631
        }
×
632
        defer rows.Close()
×
633
        var res []ActiveWorkflowInstance
×
634
        for rows.Next() {
×
635
                var a ActiveWorkflowInstance
×
636
                var currentStep *string
×
637
                if err := rows.Scan(
×
638
                        &a.ID, &a.WorkflowID, &a.WorkflowName, &a.Status,
×
639
                        &a.StartedAt, &a.UpdatedAt, &currentStep, &a.TotalSteps,
×
640
                        &a.CompletedSteps, &a.RolledBackSteps,
×
641
                ); err != nil {
×
642
                        return nil, err
×
643
                }
×
644
                if currentStep != nil {
×
645
                        a.CurrentStep = *currentStep
×
646
                }
×
647
                res = append(res, a)
×
648
        }
649
        return res, nil
×
650
}
651
func (s *SQLiteStore) GetWorkflowDefinitions(ctx context.Context) ([]WorkflowDefinition, error) {
×
652
        rows, err := s.db.QueryContext(
×
653
                ctx,
×
654
                `SELECT id, name, version, definition, created_at
×
655
                        FROM workflow_definitions
×
656
                        ORDER BY created_at DESC`,
×
657
        )
×
658
        if err != nil {
×
659
                return nil, err
×
660
        }
×
661
        defer rows.Close()
×
662
        var res []WorkflowDefinition
×
663
        for rows.Next() {
×
664
                var d WorkflowDefinition
×
665
                var defJSON []byte
×
666
                if err := rows.Scan(&d.ID, &d.Name, &d.Version, &defJSON, &d.CreatedAt); err != nil {
×
667
                        return nil, err
×
668
                }
×
669
                _ = json.Unmarshal(defJSON, &d.Definition)
×
670
                res = append(res, d)
×
671
        }
672
        return res, nil
×
673
}
674
func (s *SQLiteStore) GetWorkflowInstances(ctx context.Context, workflowID string) ([]WorkflowInstance, error) {
×
675
        rows, err := s.db.QueryContext(
×
676
                ctx,
×
677
                `SELECT id, workflow_id, status, input, output, error,
×
678
                        started_at, completed_at, created_at, updated_at
×
679
                        FROM workflow_instances
×
680
                        WHERE workflow_id=?
×
681
                        ORDER BY id`,
×
682
                workflowID,
×
683
        )
×
684
        if err != nil {
×
685
                return nil, err
×
686
        }
×
687
        defer rows.Close()
×
688
        var res []WorkflowInstance
×
689
        for rows.Next() {
×
690
                var inst WorkflowInstance
×
691
                var inb, outb []byte
×
692
                if err := rows.Scan(&inst.ID, &inst.WorkflowID, &inst.Status, &inb, &outb, &inst.Error, &inst.StartedAt,
×
693
                        &inst.CompletedAt, &inst.CreatedAt, &inst.UpdatedAt); err != nil {
×
694
                        return nil, err
×
695
                }
×
696
                inst.Input = json.RawMessage(inb)
×
697
                if outb != nil {
×
698
                        inst.Output = json.RawMessage(outb)
×
699
                }
×
700
                res = append(res, inst)
×
701
        }
702
        return res, nil
×
703
}
704
func (s *SQLiteStore) GetAllWorkflowInstances(ctx context.Context) ([]WorkflowInstance, error) {
×
705
        rows, err := s.db.QueryContext(
×
706
                ctx,
×
707
                `SELECT id, workflow_id, status, input, output, error,
×
708
                        started_at, completed_at, created_at, updated_at
×
709
                        FROM workflow_instances
×
710
                        ORDER BY id`,
×
711
        )
×
712
        if err != nil {
×
713
                return nil, err
×
714
        }
×
715
        defer rows.Close()
×
716
        var res []WorkflowInstance
×
717
        for rows.Next() {
×
718
                var inst WorkflowInstance
×
719
                var inb, outb []byte
×
720
                if err := rows.Scan(&inst.ID, &inst.WorkflowID, &inst.Status, &inb, &outb, &inst.Error, &inst.StartedAt,
×
721
                        &inst.CompletedAt, &inst.CreatedAt, &inst.UpdatedAt); err != nil {
×
722
                        return nil, err
×
723
                }
×
724
                inst.Input = json.RawMessage(inb)
×
725
                if outb != nil {
×
726
                        inst.Output = json.RawMessage(outb)
×
727
                }
×
728
                res = append(res, inst)
×
729
        }
730
        return res, nil
×
731
}
732

733
func (s *SQLiteStore) GetWorkflowInstancesPaginated(ctx context.Context, workflowID string, offset int, limit int) ([]WorkflowInstance, int64, error) {
×
734
        row := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM workflow_instances WHERE workflow_id=?`, workflowID)
×
735
        var total int64
×
736
        if err := row.Scan(&total); err != nil {
×
737
                return nil, 0, err
×
738
        }
×
739

740
        rows, err := s.db.QueryContext(
×
741
                ctx,
×
742
                `SELECT id, workflow_id, status, input, output, error,
×
743
                        started_at, completed_at, created_at, updated_at
×
744
                        FROM workflow_instances
×
745
                        WHERE workflow_id=?
×
746
                        ORDER BY created_at DESC
×
747
                        LIMIT ? OFFSET ?`,
×
748
                workflowID, limit, offset,
×
749
        )
×
750
        if err != nil {
×
751
                return nil, 0, err
×
752
        }
×
753
        defer rows.Close()
×
754

×
755
        var res []WorkflowInstance
×
756
        for rows.Next() {
×
757
                var inst WorkflowInstance
×
758
                var inb, outb []byte
×
759
                if err := rows.Scan(&inst.ID, &inst.WorkflowID, &inst.Status, &inb, &outb, &inst.Error, &inst.StartedAt,
×
760
                        &inst.CompletedAt, &inst.CreatedAt, &inst.UpdatedAt); err != nil {
×
761
                        return nil, 0, err
×
762
                }
×
763
                inst.Input = inb
×
764
                if outb != nil {
×
765
                        inst.Output = outb
×
766
                }
×
767
                res = append(res, inst)
×
768
        }
769
        return res, total, nil
×
770
}
771

772
func (s *SQLiteStore) GetAllWorkflowInstancesPaginated(ctx context.Context, offset int, limit int) ([]WorkflowInstance, int64, error) {
×
773
        row := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM workflow_instances`)
×
774
        var total int64
×
775
        if err := row.Scan(&total); err != nil {
×
776
                return nil, 0, err
×
777
        }
×
778

779
        rows, err := s.db.QueryContext(
×
780
                ctx,
×
781
                `SELECT id, workflow_id, status, input, output, error,
×
782
                        started_at, completed_at, created_at, updated_at
×
783
                        FROM workflow_instances
×
784
                        ORDER BY created_at DESC
×
785
                        LIMIT ? OFFSET ?`,
×
786
                limit, offset,
×
787
        )
×
788
        if err != nil {
×
789
                return nil, 0, err
×
790
        }
×
791
        defer rows.Close()
×
792

×
793
        var res []WorkflowInstance
×
794
        for rows.Next() {
×
795
                var inst WorkflowInstance
×
796
                var inb, outb []byte
×
797
                if err := rows.Scan(&inst.ID, &inst.WorkflowID, &inst.Status, &inb, &outb, &inst.Error,
×
798
                        &inst.StartedAt, &inst.CompletedAt, &inst.CreatedAt, &inst.UpdatedAt); err != nil {
×
799
                        return nil, 0, err
×
800
                }
×
801
                inst.Input = inb
×
802
                if outb != nil {
×
803
                        inst.Output = outb
×
804
                }
×
805
                res = append(res, inst)
×
806
        }
807
        return res, total, nil
×
808
}
809

810
func (s *SQLiteStore) GetWorkflowSteps(ctx context.Context, instanceID int64) ([]WorkflowStep, error) {
×
811
        return s.GetStepsByInstance(ctx, instanceID)
×
812
}
×
813

814
func (s *SQLiteStore) GetActiveStepsForUpdate(ctx context.Context, instanceID int64) ([]WorkflowStep, error) {
×
815
        const query = `SELECT id, instance_id, step_name, step_type, status, input, output, error,
×
816
                retry_count, max_retries, compensation_retry_count, idempotency_key,
×
817
                started_at, completed_at, created_at
×
818
                FROM workflow_steps
×
819
                WHERE instance_id=? AND status IN ('pending', 'running', 'waiting_decision')
×
820
                ORDER BY created_at DESC`
×
821
        rows, err := s.db.QueryContext(ctx, query, instanceID)
×
822
        if err != nil {
×
823
                return nil, err
×
824
        }
×
825
        defer rows.Close()
×
826

×
827
        steps := make([]WorkflowStep, 0)
×
828
        for rows.Next() {
×
829
                var step WorkflowStep
×
830
                err := rows.Scan(
×
831
                        &step.ID, &step.InstanceID, &step.StepName, &step.StepType,
×
832
                        &step.Status, &step.Input, &step.Output, &step.Error,
×
833
                        &step.RetryCount, &step.MaxRetries, &step.CompensationRetryCount,
×
834
                        &step.IdempotencyKey, &step.StartedAt, &step.CompletedAt, &step.CreatedAt,
×
835
                )
×
836
                if err != nil {
×
837
                        return nil, err
×
838
                }
×
839
                steps = append(steps, step)
×
840
        }
841
        return steps, rows.Err()
×
842
}
843

844
func (s *SQLiteStore) CreateCancelRequest(ctx context.Context, req *WorkflowCancelRequest) error {
×
845
        _, err := s.db.ExecContext(
×
846
                ctx,
×
847
                `INSERT OR REPLACE INTO cancel_requests (
×
848
                        instance_id, requested_by, cancel_type, reason, created_at
×
849
                ) VALUES(?, ?, ?, ?, ?)`,
×
850
                req.InstanceID, req.RequestedBy, req.CancelType, req.Reason, time.Now(),
×
851
        )
×
852
        return err
×
853
}
×
854

855
func (s *SQLiteStore) GetCancelRequest(ctx context.Context, instanceID int64) (*WorkflowCancelRequest, error) {
5✔
856
        row := s.db.QueryRowContext(
5✔
857
                ctx,
5✔
858
                `SELECT instance_id, requested_by, cancel_type, reason, created_at
5✔
859
                        FROM cancel_requests
5✔
860
                        WHERE instance_id=?`,
5✔
861
                instanceID,
5✔
862
        )
5✔
863
        var req WorkflowCancelRequest
5✔
864
        if err := row.Scan(&req.InstanceID, &req.RequestedBy, &req.CancelType, &req.Reason, &req.CreatedAt); err != nil {
10✔
865
                if errors.Is(err, sql.ErrNoRows) {
10✔
866
                        return nil, ErrEntityNotFound
5✔
867
                }
5✔
868
                return nil, err
×
869
        }
870
        return &req, nil
×
871
}
872

873
func (s *SQLiteStore) DeleteCancelRequest(ctx context.Context, instanceID int64) error {
×
874
        _, err := s.db.ExecContext(ctx, `DELETE FROM cancel_requests WHERE instance_id=?`, instanceID)
×
875
        return err
×
876
}
×
877

878
func (s *SQLiteStore) CreateHumanDecision(ctx context.Context, decision *HumanDecisionRecord) error {
×
879
        now := time.Now()
×
880
        res, err := s.db.ExecContext(
×
881
                ctx,
×
882
                `INSERT INTO human_decisions (
×
883
                        instance_id, step_id, decided_by, decision, comment, decided_at, created_at
×
884
                ) VALUES(?, ?, ?, ?, ?, ?, ?)`,
×
885
                decision.InstanceID, decision.StepID, decision.DecidedBy,
×
886
                decision.Decision, decision.Comment, decision.DecidedAt, now,
×
887
        )
×
888
        if err != nil {
×
889
                return err
×
890
        }
×
891
        id, _ := res.LastInsertId()
×
892
        decision.ID = id
×
893
        decision.CreatedAt = now
×
894
        return nil
×
895
}
896

897
func (s *SQLiteStore) GetHumanDecision(ctx context.Context, stepID int64) (*HumanDecisionRecord, error) {
×
898
        row := s.db.QueryRowContext(
×
899
                ctx,
×
900
                `SELECT id, instance_id, step_id, decided_by, decision, comment,
×
901
                        decided_at, created_at
×
902
                        FROM human_decisions
×
903
                        WHERE step_id=?`,
×
904
                stepID,
×
905
        )
×
906
        var d HumanDecisionRecord
×
907
        if err := row.Scan(&d.ID, &d.InstanceID, &d.StepID, &d.DecidedBy, &d.Decision, &d.Comment, &d.DecidedAt, &d.CreatedAt); err != nil {
×
908
                if errors.Is(err, sql.ErrNoRows) {
×
909
                        return nil, ErrEntityNotFound
×
910
                }
×
911
                return nil, err
×
912
        }
913
        return &d, nil
×
914
}
915

916
func (s *SQLiteStore) UpdateStepStatus(ctx context.Context, stepID int64, status StepStatus) error {
×
917
        _, err := s.db.ExecContext(ctx, `UPDATE workflow_steps SET status=? WHERE id=?`, status, stepID)
×
918
        return err
×
919
}
×
920

921
func (s *SQLiteStore) GetStepByID(ctx context.Context, stepID int64) (*WorkflowStep, error) {
×
922
        row := s.db.QueryRowContext(
×
923
                ctx,
×
924
                `SELECT id, instance_id, step_name, step_type, status, input, output, error,
×
925
                        retry_count, max_retries, compensation_retry_count, idempotency_key,
×
926
                        started_at, completed_at, created_at
×
927
                        FROM workflow_steps
×
928
                        WHERE id=?`,
×
929
                stepID,
×
930
        )
×
931
        var step WorkflowStep
×
932
        var inputBytes, outputBytes []byte
×
933
        if err := row.Scan(
×
934
                &step.ID, &step.InstanceID, &step.StepName, &step.StepType, &step.Status,
×
935
                &inputBytes, &outputBytes, &step.Error, &step.RetryCount, &step.MaxRetries,
×
936
                &step.CompensationRetryCount, &step.IdempotencyKey, &step.StartedAt,
×
937
                &step.CompletedAt, &step.CreatedAt,
×
938
        ); err != nil {
×
939
                if errors.Is(err, sql.ErrNoRows) {
×
940
                        return nil, ErrEntityNotFound
×
941
                }
×
942
                return nil, err
×
943
        }
944
        step.Input = json.RawMessage(inputBytes)
×
945
        if outputBytes != nil {
×
946
                step.Output = json.RawMessage(outputBytes)
×
947
        }
×
948
        return &step, nil
×
949
}
950

951
func (s *SQLiteStore) GetHumanDecisionStepByInstanceID(ctx context.Context, instanceID int64) (*WorkflowStep, error) {
×
952
        row := s.db.QueryRowContext(
×
953
                ctx,
×
954
                `SELECT id, instance_id, step_name, step_type, status, input, output, error,
×
955
                        retry_count, max_retries, compensation_retry_count, idempotency_key,
×
956
                        started_at, completed_at, created_at
×
957
                        FROM workflow_steps
×
958
                        WHERE instance_id=? AND step_type='human'
×
959
                        ORDER BY created_at DESC
×
960
                        LIMIT 1`,
×
961
                instanceID,
×
962
        )
×
963
        var step WorkflowStep
×
964
        var inb, outb []byte
×
965
        if err := row.Scan(
×
966
                &step.ID, &step.InstanceID, &step.StepName, &step.StepType, &step.Status,
×
967
                &inb, &outb, &step.Error, &step.RetryCount, &step.MaxRetries,
×
968
                &step.CompensationRetryCount, &step.IdempotencyKey, &step.StartedAt,
×
969
                &step.CompletedAt, &step.CreatedAt,
×
970
        ); err != nil {
×
971
                if errors.Is(err, sql.ErrNoRows) {
×
972
                        return nil, ErrEntityNotFound
×
973
                }
×
974
                return nil, err
×
975
        }
976
        step.Input = json.RawMessage(inb)
×
977
        if outb != nil {
×
978
                step.Output = json.RawMessage(outb)
×
979
        }
×
980
        return &step, nil
×
981
}
982

983
func (s *SQLiteStore) CreateDeadLetterRecord(ctx context.Context, rec *DeadLetterRecord) error {
×
984
        _, err := s.db.ExecContext(
×
985
                ctx,
×
986
                `INSERT INTO workflow_dlq (
×
987
                        instance_id, workflow_id, step_id, step_name, step_type,
×
988
                        input, error, reason, created_at
×
989
                ) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)`,
×
990
                rec.InstanceID, rec.WorkflowID, rec.StepID, rec.StepName, rec.StepType,
×
991
                rec.Input, rec.Error, rec.Reason, time.Now(),
×
992
        )
×
993
        return err
×
994
}
×
995

996
func (s *SQLiteStore) RequeueDeadLetter(ctx context.Context, dlqID int64, newInput *json.RawMessage) error {
×
997
        s.mu.Lock()
×
998
        defer s.mu.Unlock()
×
999
        tx, err := s.db.BeginTx(ctx, &sql.TxOptions{})
×
1000
        if err != nil {
×
1001
                return err
×
1002
        }
×
1003
        defer func() {
×
1004
                if tx != nil {
×
1005
                        _ = tx.Rollback()
×
1006
                }
×
1007
        }()
1008
        var instanceID, stepID int64
×
1009
        var input []byte
×
1010
        if err := tx.QueryRowContext(
×
1011
                ctx,
×
1012
                `SELECT instance_id, step_id, input
×
1013
                        FROM workflow_dlq
×
1014
                        WHERE id=?`,
×
1015
                dlqID,
×
1016
        ).Scan(&instanceID, &stepID, &input); err != nil {
×
1017
                return err
×
1018
        }
×
1019
        var setInput any
×
1020
        if newInput != nil {
×
1021
                setInput = *newInput
×
1022
        } else {
×
1023
                setInput = input
×
1024
        }
×
1025
        if _, err := tx.ExecContext(
×
1026
                ctx,
×
1027
                `UPDATE workflow_steps
×
1028
                        SET status='pending', input=?, error=NULL, retry_count=0,
×
1029
                                compensation_retry_count=0, started_at=NULL, completed_at=NULL
×
1030
                        WHERE id=?`,
×
1031
                setInput, stepID,
×
1032
        ); err != nil {
×
1033
                return err
×
1034
        }
×
1035
        if _, err := tx.ExecContext(
×
1036
                ctx,
×
1037
                `INSERT INTO queue (instance_id, step_id, scheduled_at, priority)
×
1038
                        VALUES(?, ?, ?, ?)`,
×
1039
                instanceID, stepID, time.Now(), int(PriorityNormal),
×
1040
        ); err != nil {
×
1041
                return err
×
1042
        }
×
1043
        if _, err := tx.ExecContext(
×
1044
                ctx,
×
1045
                `UPDATE workflow_instances
×
1046
                        SET status='running', error=NULL
×
1047
                        WHERE id=? AND status IN ('failed','dlq')`,
×
1048
                instanceID,
×
1049
        ); err != nil {
×
1050
                return err
×
1051
        }
×
1052
        if _, err := tx.ExecContext(ctx, `DELETE FROM workflow_dlq WHERE id=?`, dlqID); err != nil {
×
1053
                return err
×
1054
        }
×
1055
        if err := tx.Commit(); err != nil {
×
1056
                return err
×
1057
        }
×
1058
        tx = nil
×
1059
        return nil
×
1060
}
1061

1062
func (s *SQLiteStore) ListDeadLetters(ctx context.Context, offset int, limit int) ([]DeadLetterRecord, int64, error) {
×
1063
        row := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM workflow_dlq`)
×
1064
        var total int64
×
1065
        if err := row.Scan(&total); err != nil {
×
1066
                return nil, 0, err
×
1067
        }
×
1068
        rows, err := s.db.QueryContext(
×
1069
                ctx,
×
1070
                `SELECT id, instance_id, workflow_id, step_id, step_name, step_type,
×
1071
                        input, error, reason, created_at
×
1072
                        FROM workflow_dlq
×
1073
                        ORDER BY created_at DESC
×
1074
                        LIMIT ? OFFSET ?`,
×
1075
                limit, offset,
×
1076
        )
×
1077
        if err != nil {
×
1078
                return nil, 0, err
×
1079
        }
×
1080
        defer rows.Close()
×
1081
        var res []DeadLetterRecord
×
1082
        for rows.Next() {
×
1083
                var r DeadLetterRecord
×
1084
                if err := rows.Scan(&r.ID, &r.InstanceID, &r.WorkflowID, &r.StepID, &r.StepName, &r.StepType, &r.Input, &r.Error, &r.Reason, &r.CreatedAt); err != nil {
×
1085
                        return nil, 0, err
×
1086
                }
×
1087
                res = append(res, r)
×
1088
        }
1089
        return res, total, nil
×
1090
}
1091

1092
func (s *SQLiteStore) GetDeadLetterByID(ctx context.Context, id int64) (*DeadLetterRecord, error) {
×
1093
        row := s.db.QueryRowContext(
×
1094
                ctx,
×
1095
                `SELECT id, instance_id, workflow_id, step_id, step_name, step_type,
×
1096
                        input, error, reason, created_at
×
1097
                        FROM workflow_dlq
×
1098
                        WHERE id=?`,
×
1099
                id,
×
1100
        )
×
1101
        var r DeadLetterRecord
×
1102
        if err := row.Scan(&r.ID, &r.InstanceID, &r.WorkflowID, &r.StepID, &r.StepName, &r.StepType, &r.Input, &r.Error, &r.Reason, &r.CreatedAt); err != nil {
×
1103
                if errors.Is(err, sql.ErrNoRows) {
×
1104
                        return nil, ErrEntityNotFound
×
1105
                }
×
1106
                return nil, err
×
1107
        }
1108
        return &r, nil
×
1109
}
1110

1111
func (s *SQLiteStore) PauseActiveStepsAndClearQueue(ctx context.Context, instanceID int64) error {
×
1112
        _, err := s.db.ExecContext(
×
1113
                ctx,
×
1114
                `UPDATE workflow_steps
×
1115
                        SET status='paused'
×
1116
                        WHERE instance_id=? AND status IN ('running','pending','compensation')`,
×
1117
                instanceID,
×
1118
        )
×
1119
        if err != nil {
×
1120
                return err
×
1121
        }
×
1122
        _, err = s.db.ExecContext(ctx, `DELETE FROM queue WHERE instance_id=?`, instanceID)
×
1123
        return err
×
1124
}
1125

1126
func (s *SQLiteStore) CleanupOldWorkflows(ctx context.Context, daysToKeep int) (int64, error) {
×
1127
        cutoff := time.Now().AddDate(0, 0, -daysToKeep)
×
1128
        // delete related rows first
×
1129
        _, _ = s.db.ExecContext(
×
1130
                ctx,
×
1131
                `DELETE FROM workflow_events
×
1132
                        WHERE instance_id IN (SELECT id FROM workflow_instances WHERE updated_at < ?)`,
×
1133
                cutoff,
×
1134
        )
×
1135
        _, _ = s.db.ExecContext(
×
1136
                ctx,
×
1137
                `DELETE FROM workflow_steps
×
1138
                        WHERE instance_id IN (SELECT id FROM workflow_instances WHERE updated_at < ?)`,
×
1139
                cutoff,
×
1140
        )
×
1141
        res, err := s.db.ExecContext(ctx, `DELETE FROM workflow_instances WHERE updated_at < ?`, cutoff)
×
1142
        if err != nil {
×
1143
                return 0, err
×
1144
        }
×
1145
        rows, _ := res.RowsAffected()
×
1146
        return rows, nil
×
1147
}
1148

1149
func (s *SQLiteStore) GetWorkflowStats(ctx context.Context) ([]WorkflowStats, error) {
×
1150
        rows, err := s.db.QueryContext(
×
1151
                ctx,
×
1152
                `SELECT name, version,
×
1153
                        (SELECT COUNT(*) FROM workflow_instances WHERE workflow_id=wd.id) as total,
×
1154
                        (SELECT COUNT(*) FROM workflow_instances WHERE workflow_id=wd.id AND status='completed') as completed,
×
1155
                        (SELECT COUNT(*) FROM workflow_instances WHERE workflow_id=wd.id AND status='failed') as failed,
×
1156
                        (SELECT COUNT(*) FROM workflow_instances WHERE workflow_id=wd.id AND status='running') as running
×
1157
                        FROM workflow_definitions wd`,
×
1158
        )
×
1159
        if err != nil {
×
1160
                return nil, err
×
1161
        }
×
1162
        defer rows.Close()
×
1163
        var stats []WorkflowStats
×
1164
        for rows.Next() {
×
1165
                var sst WorkflowStats
×
1166
                var total, completed, failed, running int
×
1167
                if err := rows.Scan(&sst.WorkflowName, &sst.Version, &total, &completed, &failed, &running); err != nil {
×
1168
                        return nil, err
×
1169
                }
×
1170
                sst.TotalInstances = total
×
1171
                sst.CompletedInstances = completed
×
1172
                sst.FailedInstances = failed
×
1173
                sst.RunningInstances = running
×
1174
                // average duration not tracked; leave zero
×
1175
                stats = append(stats, sst)
×
1176
        }
1177
        return stats, nil
×
1178
}
1179

1180
// boolToInt maps true to 1 and false to 0.
func boolToInt(b bool) int {
	n := 0
	if b {
		n = 1
	}
	return n
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc