• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rom8726 / floxy / 19539639064

20 Nov 2025 02:08PM UTC coverage: 46.776% (+0.04%) from 46.739%
19539639064

Pull #19

github

rom8726
Fixed Test_executeCompensationStep_HandlerSuccess_RolledBackAndEvent.
Pull Request #19: Stop all parallel branches before rollback.

33 of 46 new or added lines in 1 file covered. (71.74%)

5 existing lines in 3 files now uncovered.

4875 of 10422 relevant lines covered (46.78%)

47.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.37
/sqlite_store.go
1
package floxy
2

3
import (
4
        "context"
5
        "database/sql"
6
        "encoding/json"
7
        "errors"
8
        "fmt"
9
        "math"
10
        "sync"
11
        "time"
12

13
        _ "modernc.org/sqlite"
14
)
15

16
// Ensure interface compliance
17
var _ Store = (*SQLiteStore)(nil)
18

19
const (
20
        // MinAgingRate is the minimum allowed value for aging rate (0.0 = no aging)
21
        MinAgingRate = 0.0
22
        // MaxAgingRate is the maximum allowed value for aging rate to prevent excessive priority boosts
23
        MaxAgingRate = 100.0
24
)
25

26
// SQLiteStore provides a lightweight Store backed by SQLite.
27
// It implements only the subset of capabilities required by the SQLite tests.
28
// Non‑essential methods return a not-implemented error.
29
type SQLiteStore struct {
30
        db           *sql.DB
31
        agingEnabled bool
32
        agingRate    float64
33
        mu           sync.Mutex // serialize critical sections for SQLite
34
}
35

36
// NewSQLiteStore creates a persistent SQLite database stored in a file and initializes schema.
37
// The filepath parameter specifies the path to the SQLite database file.
38
// If the file doesn't exist, it will be created automatically.
39
func NewSQLiteStore(filepath string) (*SQLiteStore, error) {
2✔
40
        if filepath == "" {
2✔
41
                return nil, fmt.Errorf("filepath cannot be empty")
×
42
        }
×
43
        return newSQLiteStore(filepath, false)
2✔
44
}
45

46
// NewSQLiteInMemoryStore creates an in-memory SQLite database and initializes schema.
47
// This is useful for testing. For production use, prefer NewSQLiteStore with a file path.
48
func NewSQLiteInMemoryStore() (*SQLiteStore, error) {
5✔
49
        return newSQLiteStore(":memory:", true)
5✔
50
}
5✔
51

52
// newSQLiteStore is an internal helper function that creates a SQLite store.
53
// isInMemory indicates whether this is an in-memory database.
54
func newSQLiteStore(dsn string, isInMemory bool) (*SQLiteStore, error) {
7✔
55
        db, err := sql.Open("sqlite", dsn)
7✔
56
        if err != nil {
7✔
57
                return nil, fmt.Errorf("open sqlite: %w", err)
×
58
        }
×
59

60
        // Configure SQLite pragmas for better performance and reliability
61
        _, _ = db.Exec("PRAGMA journal_mode=WAL;")
7✔
62
        _, _ = db.Exec("PRAGMA foreign_keys=ON;")
7✔
63
        _, _ = db.Exec("PRAGMA busy_timeout=5000;")
7✔
64

7✔
65
        if isInMemory {
12✔
66
                // Single connection for in-memory databases to avoid locks
5✔
67
                db.SetMaxOpenConns(1)
5✔
68
                db.SetMaxIdleConns(1)
5✔
69
        } else {
7✔
70
                // For persistent databases, allow multiple connections for better concurrency
2✔
71
                db.SetMaxOpenConns(10)
2✔
72
                db.SetMaxIdleConns(5)
2✔
73
        }
2✔
74

75
        store := &SQLiteStore{db: db, agingEnabled: true, agingRate: 0.5}
7✔
76
        if err := RunSQLiteMigrations(context.Background(), db); err != nil {
7✔
77
                _ = db.Close()
×
78
                return nil, err
×
79
        }
×
80
        return store, nil
7✔
81
}
82

83
func (s *SQLiteStore) SetAgingEnabled(enabled bool) { s.agingEnabled = enabled }
1✔
84

85
// SetAgingRate sets the aging rate for queue priority adjustment.
86
// The rate is clamped to [MinAgingRate, MaxAgingRate] to prevent SQL injection
87
// and ensure valid SQL expressions. NaN and Inf values are clamped to 0.0.
88
func (s *SQLiteStore) SetAgingRate(rate float64) {
9✔
89
        s.agingRate = clampAgingRate(rate)
9✔
90
}
9✔
91

92
// clampAgingRate ensures the aging rate is within valid bounds.
93
// Returns 0.0 for NaN/Inf, and clamps to [MinAgingRate, MaxAgingRate].
94
func clampAgingRate(rate float64) float64 {
340✔
95
        // Handle NaN and Inf
340✔
96
        if math.IsNaN(rate) || math.IsInf(rate, 0) {
343✔
97
                return MinAgingRate
3✔
98
        }
3✔
99
        // Clamp to valid range
100
        if rate < MinAgingRate {
338✔
101
                return MinAgingRate
1✔
102
        }
1✔
103
        if rate > MaxAgingRate {
337✔
104
                return MaxAgingRate
1✔
105
        }
1✔
106
        return rate
335✔
107
}
108

109
// Close closes the database connection.
110
// It's recommended to call this method when the store is no longer needed,
111
// especially for persistent SQLite stores.
112
func (s *SQLiteStore) Close() error {
3✔
113
        if s.db != nil {
6✔
114
                return s.db.Close()
3✔
115
        }
3✔
116
        return nil
×
117
}
118

119
// Definitions
120
func (s *SQLiteStore) SaveWorkflowDefinition(ctx context.Context, def *WorkflowDefinition) error {
3✔
121
        definitionJSON, err := json.Marshal(def.Definition)
3✔
122
        if err != nil {
3✔
123
                return err
×
124
        }
×
125
        q := `INSERT INTO workflow_definitions (id, name, version, definition, created_at)
3✔
126
                VALUES(?, ?, ?, ?, ?)
3✔
127
                ON CONFLICT(name, version) DO UPDATE SET definition=excluded.definition`
3✔
128
        if _, err := s.db.ExecContext(ctx, q, def.ID, def.Name, def.Version, definitionJSON, time.Now()); err != nil {
3✔
129
                return err
×
130
        }
×
131
        // We keep provided ID/CreatedAt; in SQLite we don't fetch RETURNING here.
132
        return nil
3✔
133
}
134

135
func (s *SQLiteStore) GetWorkflowDefinition(ctx context.Context, id string) (*WorkflowDefinition, error) {
33✔
136
        q := `SELECT id, name, version, definition, created_at FROM workflow_definitions WHERE id=?`
33✔
137
        row := s.db.QueryRowContext(ctx, q, id)
33✔
138
        var def WorkflowDefinition
33✔
139
        var defJSON []byte
33✔
140
        if err := row.Scan(&def.ID, &def.Name, &def.Version, &defJSON, &def.CreatedAt); err != nil {
33✔
141
                if errors.Is(err, sql.ErrNoRows) {
×
142
                        return nil, ErrEntityNotFound
×
143
                }
×
144
                return nil, err
×
145
        }
146
        if err := json.Unmarshal(defJSON, &def.Definition); err != nil {
33✔
147
                return nil, err
×
148
        }
×
149
        return &def, nil
33✔
150
}
151

152
// Instances
153
func (s *SQLiteStore) CreateInstance(ctx context.Context, workflowID string, input json.RawMessage) (*WorkflowInstance, error) {
3✔
154
        now := time.Now()
3✔
155
        q := `INSERT INTO workflow_instances (workflow_id, status, input, created_at, updated_at)
3✔
156
                VALUES(?, ?, ?, ?, ?)`
3✔
157
        res, err := s.db.ExecContext(ctx, q, workflowID, StatusPending, input, now, now)
3✔
158
        if err != nil {
3✔
159
                return nil, err
×
160
        }
×
161
        id, _ := res.LastInsertId()
3✔
162
        return s.GetInstance(ctx, id)
3✔
163
}
164

165
func (s *SQLiteStore) UpdateInstanceStatus(ctx context.Context, instanceID int64, status WorkflowStatus, output json.RawMessage, errMsg *string) error {
4✔
166
        now := time.Now()
4✔
167
        // Update completed_at when status is completed, failed, or cancelled
4✔
168
        // Update started_at when status is running and started_at is NULL
4✔
169
        q := `UPDATE workflow_instances 
4✔
170
                SET status=?, output=?, error=?, updated_at=?,
4✔
171
                        completed_at = CASE WHEN ? IN ('completed', 'failed', 'cancelled') THEN ? ELSE completed_at END,
4✔
172
                        started_at = CASE WHEN started_at IS NULL AND ? = 'running' THEN ? ELSE started_at END
4✔
173
                WHERE id=?`
4✔
174
        _, err := s.db.ExecContext(ctx, q, status, output, errMsg, now, status, now, status, now, instanceID)
4✔
175
        return err
4✔
176
}
4✔
177

178
func (s *SQLiteStore) GetInstance(ctx context.Context, instanceID int64) (*WorkflowInstance, error) {
24✔
179
        q := `SELECT id, workflow_id, status, input, output, error, started_at, completed_at, created_at, updated_at
24✔
180
                FROM workflow_instances WHERE id=?`
24✔
181
        row := s.db.QueryRowContext(ctx, q, instanceID)
24✔
182
        var inst WorkflowInstance
24✔
183
        var inputBytes, outputBytes []byte
24✔
184
        if err := row.Scan(
24✔
185
                &inst.ID, &inst.WorkflowID, &inst.Status, &inputBytes, &outputBytes, &inst.Error,
24✔
186
                &inst.StartedAt, &inst.CompletedAt, &inst.CreatedAt, &inst.UpdatedAt,
24✔
187
        ); err != nil {
24✔
188
                if errors.Is(err, sql.ErrNoRows) {
×
189
                        return nil, ErrEntityNotFound
×
190
                }
×
191
                return nil, err
×
192
        }
193
        inst.Input = json.RawMessage(inputBytes)
24✔
194
        if outputBytes != nil {
25✔
195
                inst.Output = json.RawMessage(outputBytes)
1✔
196
        } else {
24✔
197
                inst.Output = nil
23✔
198
        }
23✔
199
        return &inst, nil
24✔
200
}
201

202
// Steps
203
func (s *SQLiteStore) CreateStep(ctx context.Context, step *WorkflowStep) error {
5✔
204
        now := time.Now()
5✔
205
        q := `INSERT INTO workflow_steps (
5✔
206
                instance_id, step_name, step_type, status, input, output, error, retry_count,
5✔
207
                max_retries, compensation_retry_count, idempotency_key, started_at, completed_at, created_at)
5✔
208
                VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
5✔
209
        res, err := s.db.ExecContext(ctx, q,
5✔
210
                step.InstanceID, step.StepName, step.StepType, step.Status, step.Input, step.Output, step.Error,
5✔
211
                step.RetryCount, step.MaxRetries, step.CompensationRetryCount, step.IdempotencyKey,
5✔
212
                step.StartedAt, step.CompletedAt, now,
5✔
213
        )
5✔
214
        if err != nil {
5✔
215
                return err
×
216
        }
×
217
        id, _ := res.LastInsertId()
5✔
218
        step.ID = id
5✔
219
        step.CreatedAt = now
5✔
220
        return nil
5✔
221
}
222

223
func (s *SQLiteStore) UpdateStep(ctx context.Context, stepID int64, status StepStatus, output json.RawMessage, errMsg *string) error {
12✔
224
        now := time.Now()
12✔
225
        // Update completed_at when status is completed, failed, skipped, or rolled_back
12✔
226
        // Update started_at when status is running and started_at is NULL
12✔
227
        // Increment retry_count when status is failed
12✔
228
        q := `UPDATE workflow_steps 
12✔
229
                SET status=?, output=?, error=?,
12✔
230
                        completed_at = CASE WHEN ? IN ('completed', 'failed', 'skipped', 'rolled_back') THEN ? ELSE completed_at END,
12✔
231
                        started_at = CASE WHEN started_at IS NULL AND ? = 'running' THEN ? ELSE started_at END,
12✔
232
                        retry_count = CASE WHEN ? = 'failed' THEN retry_count + 1 ELSE retry_count END
12✔
233
                WHERE id=?`
12✔
234
        _, err := s.db.ExecContext(ctx, q, status, output, errMsg, status, now, status, now, status, stepID)
12✔
235
        return err
12✔
236
}
12✔
237

238
func (s *SQLiteStore) GetStepsByInstance(ctx context.Context, instanceID int64) ([]WorkflowStep, error) {
16✔
239
        q := `SELECT id, instance_id, step_name, step_type, status, input, output, error, retry_count,
16✔
240
                max_retries, compensation_retry_count, idempotency_key, started_at, completed_at, created_at
16✔
241
                FROM workflow_steps WHERE instance_id=? ORDER BY id`
16✔
242
        rows, err := s.db.QueryContext(ctx, q, instanceID)
16✔
243
        if err != nil {
16✔
244
                return nil, err
×
245
        }
×
246
        defer rows.Close()
16✔
247
        var res []WorkflowStep
16✔
248
        for rows.Next() {
49✔
249
                var srec WorkflowStep
33✔
250
                var inputBytes, outputBytes []byte
33✔
251
                if err := rows.Scan(&srec.ID, &srec.InstanceID, &srec.StepName, &srec.StepType, &srec.Status,
33✔
252
                        &inputBytes, &outputBytes, &srec.Error, &srec.RetryCount, &srec.MaxRetries,
33✔
253
                        &srec.CompensationRetryCount, &srec.IdempotencyKey, &srec.StartedAt, &srec.CompletedAt, &srec.CreatedAt); err != nil {
33✔
254
                        return nil, err
×
255
                }
×
256
                srec.Input = json.RawMessage(inputBytes)
33✔
257
                if outputBytes != nil {
59✔
258
                        srec.Output = json.RawMessage(outputBytes)
26✔
259
                }
26✔
260
                res = append(res, srec)
33✔
261
        }
262
        return res, nil
16✔
263
}
264

265
// Queue
266
func (s *SQLiteStore) EnqueueStep(ctx context.Context, instanceID int64, stepID *int64, priority Priority, delay time.Duration) error {
8✔
267
        sched := time.Now().Add(delay)
8✔
268
        q := `INSERT INTO queue (instance_id, step_id, scheduled_at, priority) VALUES(?, ?, ?, ?)`
8✔
269
        _, err := s.db.ExecContext(ctx, q, instanceID, stepID, sched, int(priority))
8✔
270
        return err
8✔
271
}
8✔
272

273
func (s *SQLiteStore) UpdateStepCompensationRetry(ctx context.Context, stepID int64, retryCount int, status StepStatus) error {
1✔
274
        q := `UPDATE workflow_steps SET compensation_retry_count=?, status=? WHERE id=?`
1✔
275
        _, err := s.db.ExecContext(ctx, q, retryCount, status, stepID)
1✔
276
        return err
1✔
277
}
1✔
278

279
func (s *SQLiteStore) DequeueStep(ctx context.Context, workerID string) (*QueueItem, error) {
331✔
280
        s.mu.Lock()
331✔
281
        defer s.mu.Unlock()
331✔
282
        // Simple transactional dequeue.
331✔
283
        tx, err := s.db.BeginTx(ctx, &sql.TxOptions{})
331✔
284
        if err != nil {
331✔
UNCOV
285
                return nil, err
×
UNCOV
286
        }
×
287
        defer func() {
662✔
288
                if tx != nil {
654✔
289
                        _ = tx.Rollback()
323✔
290
                }
323✔
291
        }()
292

293
        orderExpr := "priority"
331✔
294
        if s.agingEnabled {
662✔
295
                // Clamp aging rate to ensure valid SQL expression (defense in depth)
331✔
296
                rate := clampAgingRate(s.agingRate)
331✔
297
                // effective_priority = min(100, priority + floor(wait_seconds * rate))
331✔
298
                orderExpr = fmt.Sprintf("MIN(100, priority + CAST(((strftime('%%s','now') - strftime('%%s', scheduled_at)) * %.6f) AS INTEGER))", rate)
331✔
299
        }
331✔
300

301
        row := tx.QueryRowContext(ctx, fmt.Sprintf(`
331✔
302
                SELECT id, instance_id, step_id, scheduled_at, attempted_at, attempted_by, priority
331✔
303
                FROM queue
331✔
304
                WHERE scheduled_at <= ? AND (attempted_by IS NULL)
331✔
305
                ORDER BY %s DESC, scheduled_at ASC, id ASC
331✔
306
                LIMIT 1`, orderExpr), time.Now())
331✔
307
        var qi QueueItem
331✔
308
        if err := row.Scan(&qi.ID, &qi.InstanceID, &qi.StepID, &qi.ScheduledAt, &qi.AttemptedAt, &qi.AttemptedBy, &qi.Priority); err != nil {
654✔
309
                if errors.Is(err, sql.ErrNoRows) {
646✔
310
                        return nil, nil
323✔
311
                }
323✔
312
                return nil, err
×
313
        }
314
        // mark as attempted by worker
315
        now := time.Now()
8✔
316
        res, err := tx.ExecContext(ctx, `UPDATE queue SET attempted_at=?, attempted_by=? WHERE id=? AND attempted_by IS NULL`, now, workerID, qi.ID)
8✔
317
        if err != nil {
8✔
318
                return nil, err
×
319
        }
×
320
        if rows, _ := res.RowsAffected(); rows == 0 {
8✔
321
                // another worker claimed it; let caller retry
×
322
                return nil, nil
×
323
        }
×
324
        if err != nil {
8✔
325
                return nil, err
×
326
        }
×
327
        qi.AttemptedAt = &now
8✔
328
        qi.AttemptedBy = &workerID
8✔
329

8✔
330
        if err := tx.Commit(); err != nil {
8✔
331
                return nil, err
×
332
        }
×
333
        tx = nil
8✔
334
        return &qi, nil
8✔
335
}
336

337
func (s *SQLiteStore) RemoveFromQueue(ctx context.Context, queueID int64) error {
7✔
338
        _, err := s.db.ExecContext(ctx, `DELETE FROM queue WHERE id=?`, queueID)
7✔
339
        return err
7✔
340
}
7✔
341

342
func (s *SQLiteStore) ReleaseQueueItem(ctx context.Context, queueID int64) error {
×
343
        _, err := s.db.ExecContext(ctx, `UPDATE queue SET attempted_at=NULL, attempted_by=NULL WHERE id=?`, queueID)
×
344
        return err
×
345
}
×
346

347
func (s *SQLiteStore) RescheduleAndReleaseQueueItem(ctx context.Context, queueID int64, delay time.Duration) error {
×
348
        sched := time.Now().Add(delay)
×
349
        _, err := s.db.ExecContext(ctx, `UPDATE queue SET scheduled_at=?, attempted_at=NULL, attempted_by=NULL WHERE id=?`, sched, queueID)
×
350
        return err
×
351
}
×
352

353
// Events
354
func (s *SQLiteStore) LogEvent(ctx context.Context, instanceID int64, stepID *int64, eventType string, payload any) error {
16✔
355
        payloadJSON, err := json.Marshal(payload)
16✔
356
        if err != nil {
16✔
357
                return err
×
358
        }
×
359
        _, err = s.db.ExecContext(ctx, `INSERT INTO workflow_events (instance_id, step_id, event_type, payload, created_at) VALUES(?, ?, ?, ?, ?)`, instanceID, stepID, eventType, payloadJSON, time.Now())
16✔
360
        return err
16✔
361
}
362

363
func (s *SQLiteStore) GetWorkflowEvents(ctx context.Context, instanceID int64) ([]WorkflowEvent, error) {
1✔
364
        rows, err := s.db.QueryContext(ctx, `SELECT id, instance_id, step_id, event_type, payload, created_at FROM workflow_events WHERE instance_id=? ORDER BY id`, instanceID)
1✔
365
        if err != nil {
1✔
366
                return nil, err
×
367
        }
×
368
        defer rows.Close()
1✔
369
        var res []WorkflowEvent
1✔
370
        for rows.Next() {
2✔
371
                var ev WorkflowEvent
1✔
372
                if err := rows.Scan(&ev.ID, &ev.InstanceID, &ev.StepID, &ev.EventType, &ev.Payload, &ev.CreatedAt); err != nil {
1✔
373
                        return nil, err
×
374
                }
×
375
                res = append(res, ev)
1✔
376
        }
377
        return res, nil
1✔
378
}
379

380
// Joins and other auxiliary features — minimal or not implemented for now
381
func (s *SQLiteStore) CreateJoinState(ctx context.Context, instanceID int64, joinStepName string, waitingFor []string, strategy JoinStrategy) error {
×
382
        wf, _ := json.Marshal(waitingFor)
×
383
        now := time.Now()
×
384
        _, err := s.db.ExecContext(ctx, `INSERT INTO join_states (instance_id, join_step_name, waiting_for, completed, failed, join_strategy, is_ready, created_at, updated_at) VALUES(?, ?, ?, '[]', '[]', ?, 0, ?, ?)`, instanceID, joinStepName, string(wf), strategy, now, now)
×
385
        return err
×
386
}
×
387
func (s *SQLiteStore) UpdateJoinState(ctx context.Context, instanceID int64, joinStepName, completedStep string, success bool) (bool, error) {
×
388
        row := s.db.QueryRowContext(ctx, `SELECT waiting_for, completed, failed, join_strategy FROM join_states WHERE instance_id=? AND join_step_name=?`, instanceID, joinStepName)
×
389
        var waitingJSON, completedJSON, failedJSON string
×
390
        var strategy JoinStrategy
×
391
        if err := row.Scan(&waitingJSON, &completedJSON, &failedJSON, &strategy); err != nil {
×
392
                if errors.Is(err, sql.ErrNoRows) {
×
393
                        // create default with this single step
×
394
                        _ = s.CreateJoinState(ctx, instanceID, joinStepName, []string{completedStep}, JoinStrategyAll)
×
395
                        waitingJSON = "[\"" + completedStep + "\"]"
×
396
                        completedJSON = "[]"
×
397
                        failedJSON = "[]"
×
398
                        strategy = JoinStrategyAll
×
399
                } else {
×
400
                        return false, err
×
401
                }
×
402
        }
403
        var waitingFor, completed, failed []string
×
404
        _ = json.Unmarshal([]byte(waitingJSON), &waitingFor)
×
405
        _ = json.Unmarshal([]byte(completedJSON), &completed)
×
406
        _ = json.Unmarshal([]byte(failedJSON), &failed)
×
407
        // append to completed or failed if not present
×
408
        target := &completed
×
409
        if !success {
×
410
                target = &failed
×
411
        }
×
412
        found := false
×
413
        for _, v := range *target {
×
414
                if v == completedStep {
×
415
                        found = true
×
416
                        break
×
417
                }
418
        }
419
        if !found {
×
420
                *target = append(*target, completedStep)
×
421
        }
×
422
        // recompute readiness
423
        isReady := false
×
424
        if strategy == JoinStrategyAny {
×
425
                isReady = len(completed) > 0 || len(failed) > 0
×
426
        } else {
×
427
                totalProcessed := len(completed) + len(failed)
×
428
                isReady = totalProcessed >= len(waitingFor)
×
429
        }
×
430
        compJSON, _ := json.Marshal(completed)
×
431
        failJSON, _ := json.Marshal(failed)
×
432
        _, err := s.db.ExecContext(ctx, `UPDATE join_states SET completed=?, failed=?, is_ready=?, updated_at=? WHERE instance_id=? AND join_step_name=?`, string(compJSON), string(failJSON), boolToInt(isReady), time.Now(), instanceID, joinStepName)
×
433
        return isReady, err
×
434
}
435
func (s *SQLiteStore) GetJoinState(ctx context.Context, instanceID int64, joinStepName string) (*JoinState, error) {
×
436
        row := s.db.QueryRowContext(ctx, `SELECT waiting_for, completed, failed, join_strategy, is_ready, created_at, updated_at FROM join_states WHERE instance_id=? AND join_step_name=?`, instanceID, joinStepName)
×
437
        var waitingJSON, completedJSON, failedJSON string
×
438
        var strategy JoinStrategy
×
439
        var isReadyInt int
×
440
        var createdAt, updatedAt time.Time
×
441
        if err := row.Scan(&waitingJSON, &completedJSON, &failedJSON, &strategy, &isReadyInt, &createdAt, &updatedAt); err != nil {
×
442
                if errors.Is(err, sql.ErrNoRows) {
×
443
                        return nil, ErrEntityNotFound
×
444
                }
×
445
                return nil, err
×
446
        }
447
        var waitingFor, completed, failed []string
×
448
        _ = json.Unmarshal([]byte(waitingJSON), &waitingFor)
×
449
        _ = json.Unmarshal([]byte(completedJSON), &completed)
×
450
        _ = json.Unmarshal([]byte(failedJSON), &failed)
×
451
        return &JoinState{InstanceID: instanceID, JoinStepName: joinStepName, WaitingFor: waitingFor, Completed: completed, Failed: failed, JoinStrategy: strategy, IsReady: isReadyInt == 1, CreatedAt: createdAt, UpdatedAt: updatedAt}, nil
×
452
}
453
func (s *SQLiteStore) AddToJoinWaitFor(ctx context.Context, instanceID int64, joinStepName, stepToAdd string) error {
×
454
        row := s.db.QueryRowContext(ctx, `SELECT waiting_for, completed, failed, join_strategy FROM join_states WHERE instance_id=? AND join_step_name=?`, instanceID, joinStepName)
×
455
        var waitingJSON, completedJSON, failedJSON string
×
456
        var strategy JoinStrategy
×
457
        if err := row.Scan(&waitingJSON, &completedJSON, &failedJSON, &strategy); err != nil {
×
458
                if errors.Is(err, sql.ErrNoRows) {
×
459
                        return s.CreateJoinState(ctx, instanceID, joinStepName, []string{stepToAdd}, strategy)
×
460
                }
×
461
                return err
×
462
        }
463
        var waitingFor, completed, failed []string
×
464
        _ = json.Unmarshal([]byte(waitingJSON), &waitingFor)
×
465
        _ = json.Unmarshal([]byte(completedJSON), &completed)
×
466
        _ = json.Unmarshal([]byte(failedJSON), &failed)
×
467
        waitingFor = append(waitingFor, stepToAdd)
×
468
        isReady := checkJoinReady(waitingFor, completed, failed, strategy)
×
469
        wfJSON, _ := json.Marshal(waitingFor)
×
470
        _, err := s.db.ExecContext(ctx, `UPDATE join_states SET waiting_for=?, is_ready=?, updated_at=? WHERE instance_id=? AND join_step_name=?`, string(wfJSON), boolToInt(isReady), time.Now(), instanceID, joinStepName)
×
471
        return err
×
472
}
473
func (s *SQLiteStore) ReplaceInJoinWaitFor(ctx context.Context, instanceID int64, joinStepName, virtualStep, realStep string) error {
×
474
        row := s.db.QueryRowContext(ctx, `SELECT waiting_for, completed, failed, join_strategy FROM join_states WHERE instance_id=? AND join_step_name=?`, instanceID, joinStepName)
×
475
        var waitingJSON, completedJSON, failedJSON string
×
476
        var strategy JoinStrategy
×
477
        if err := row.Scan(&waitingJSON, &completedJSON, &failedJSON, &strategy); err != nil {
×
478
                if errors.Is(err, sql.ErrNoRows) {
×
479
                        return s.CreateJoinState(ctx, instanceID, joinStepName, []string{realStep}, JoinStrategyAll)
×
480
                }
×
481
                return err
×
482
        }
483
        var waitingFor, completed, failed []string
×
484
        _ = json.Unmarshal([]byte(waitingJSON), &waitingFor)
×
485
        _ = json.Unmarshal([]byte(completedJSON), &completed)
×
486
        _ = json.Unmarshal([]byte(failedJSON), &failed)
×
487
        found := false
×
488
        for i, w := range waitingFor {
×
489
                if w == virtualStep {
×
490
                        waitingFor[i] = realStep
×
491
                        found = true
×
492
                        break
×
493
                }
494
        }
495
        if !found {
×
496
                waitingFor = append(waitingFor, realStep)
×
497
        }
×
498
        isReady := checkJoinReady(waitingFor, completed, failed, strategy)
×
499
        wfJSON, _ := json.Marshal(waitingFor)
×
500
        _, err := s.db.ExecContext(ctx, `UPDATE join_states SET waiting_for=?, is_ready=?, updated_at=? WHERE instance_id=? AND join_step_name=?`, string(wfJSON), boolToInt(isReady), time.Now(), instanceID, joinStepName)
×
501
        return err
×
502
}
503

504
func (s *SQLiteStore) GetSummaryStats(ctx context.Context) (*SummaryStats, error) {
×
505
        row := s.db.QueryRowContext(ctx, `SELECT 
×
506
                COUNT(*) as total,
×
507
                SUM(CASE WHEN status='completed' THEN 1 ELSE 0 END),
×
508
                SUM(CASE WHEN status='failed' THEN 1 ELSE 0 END),
×
509
                SUM(CASE WHEN status='running' THEN 1 ELSE 0 END),
×
510
                SUM(CASE WHEN status='pending' THEN 1 ELSE 0 END)
×
511
                FROM workflow_instances`)
×
512
        var stats SummaryStats
×
513
        if err := row.Scan(&stats.TotalWorkflows, &stats.CompletedWorkflows, &stats.FailedWorkflows, &stats.RunningWorkflows, &stats.PendingWorkflows); err != nil {
×
514
                return nil, err
×
515
        }
×
516
        // active = running+pending in this simplified model
517
        stats.ActiveWorkflows = uint(stats.RunningWorkflows + stats.PendingWorkflows)
×
518
        return &stats, nil
×
519
}
520
func (s *SQLiteStore) GetActiveInstances(ctx context.Context) ([]ActiveWorkflowInstance, error) {
×
521
        rows, err := s.db.QueryContext(ctx, `SELECT wi.id, wi.workflow_id, COALESCE(wd.name,''), wi.status, wi.created_at, wi.updated_at,
×
522
                (SELECT step_name FROM workflow_steps WHERE instance_id=wi.id AND status='running' LIMIT 1) as current_step,
×
523
                (SELECT COUNT(*) FROM workflow_steps WHERE instance_id=wi.id) as total_steps,
×
524
                (SELECT COUNT(*) FROM workflow_steps WHERE instance_id=wi.id AND status='completed') as completed_steps,
×
525
                (SELECT COUNT(*) FROM workflow_steps WHERE instance_id=wi.id AND status='rolled_back') as rolled_back_steps
×
526
                FROM workflow_instances wi LEFT JOIN workflow_definitions wd ON wi.workflow_id = wd.id WHERE wi.status IN ('running','pending','dlq') ORDER BY wi.created_at DESC`)
×
527
        if err != nil {
×
528
                return nil, err
×
529
        }
×
530
        defer rows.Close()
×
531
        var res []ActiveWorkflowInstance
×
532
        for rows.Next() {
×
533
                var a ActiveWorkflowInstance
×
534
                var currentStep *string
×
535
                if err := rows.Scan(&a.ID, &a.WorkflowID, &a.WorkflowName, &a.Status, &a.StartedAt, &a.UpdatedAt, &currentStep, &a.TotalSteps, &a.CompletedSteps, &a.RolledBackSteps); err != nil {
×
536
                        return nil, err
×
537
                }
×
538
                if currentStep != nil {
×
539
                        a.CurrentStep = *currentStep
×
540
                }
×
541
                res = append(res, a)
×
542
        }
543
        return res, nil
×
544
}
545
func (s *SQLiteStore) GetWorkflowDefinitions(ctx context.Context) ([]WorkflowDefinition, error) {
×
546
        rows, err := s.db.QueryContext(ctx, `SELECT id, name, version, definition, created_at FROM workflow_definitions ORDER BY created_at DESC`)
×
547
        if err != nil {
×
548
                return nil, err
×
549
        }
×
550
        defer rows.Close()
×
551
        var res []WorkflowDefinition
×
552
        for rows.Next() {
×
553
                var d WorkflowDefinition
×
554
                var defJSON []byte
×
555
                if err := rows.Scan(&d.ID, &d.Name, &d.Version, &defJSON, &d.CreatedAt); err != nil {
×
556
                        return nil, err
×
557
                }
×
558
                _ = json.Unmarshal(defJSON, &d.Definition)
×
559
                res = append(res, d)
×
560
        }
561
        return res, nil
×
562
}
563
func (s *SQLiteStore) GetWorkflowInstances(ctx context.Context, workflowID string) ([]WorkflowInstance, error) {
×
564
        rows, err := s.db.QueryContext(ctx, `SELECT id, workflow_id, status, input, output, error, started_at, completed_at, created_at, updated_at FROM workflow_instances WHERE workflow_id=? ORDER BY id`, workflowID)
×
565
        if err != nil {
×
566
                return nil, err
×
567
        }
×
568
        defer rows.Close()
×
569
        var res []WorkflowInstance
×
570
        for rows.Next() {
×
571
                var inst WorkflowInstance
×
572
                var inb, outb []byte
×
573
                if err := rows.Scan(&inst.ID, &inst.WorkflowID, &inst.Status, &inb, &outb, &inst.Error, &inst.StartedAt, &inst.CompletedAt, &inst.CreatedAt, &inst.UpdatedAt); err != nil {
×
574
                        return nil, err
×
575
                }
×
576
                inst.Input = json.RawMessage(inb)
×
577
                if outb != nil {
×
578
                        inst.Output = json.RawMessage(outb)
×
579
                }
×
580
                res = append(res, inst)
×
581
        }
582
        return res, nil
×
583
}
584
func (s *SQLiteStore) GetAllWorkflowInstances(ctx context.Context) ([]WorkflowInstance, error) {
×
585
        rows, err := s.db.QueryContext(ctx, `SELECT id, workflow_id, status, input, output, error, started_at, completed_at, created_at, updated_at FROM workflow_instances ORDER BY id`)
×
586
        if err != nil {
×
587
                return nil, err
×
588
        }
×
589
        defer rows.Close()
×
590
        var res []WorkflowInstance
×
591
        for rows.Next() {
×
592
                var inst WorkflowInstance
×
593
                var inb, outb []byte
×
594
                if err := rows.Scan(&inst.ID, &inst.WorkflowID, &inst.Status, &inb, &outb, &inst.Error, &inst.StartedAt, &inst.CompletedAt, &inst.CreatedAt, &inst.UpdatedAt); err != nil {
×
595
                        return nil, err
×
596
                }
×
597
                inst.Input = json.RawMessage(inb)
×
598
                if outb != nil {
×
599
                        inst.Output = json.RawMessage(outb)
×
600
                }
×
601
                res = append(res, inst)
×
602
        }
603
        return res, nil
×
604
}
605
func (s *SQLiteStore) GetWorkflowSteps(ctx context.Context, instanceID int64) ([]WorkflowStep, error) {
×
606
        return s.GetStepsByInstance(ctx, instanceID)
×
607
}
×
608
func (s *SQLiteStore) GetActiveStepsForUpdate(ctx context.Context, instanceID int64) ([]WorkflowStep, error) {
×
609
        q := `SELECT id, instance_id, step_name, step_type, status, input, output, error,
×
610
                retry_count, max_retries, compensation_retry_count, idempotency_key,
×
611
                started_at, completed_at, created_at
×
612
                FROM workflow_steps
×
613
                WHERE instance_id=? AND status IN ('pending', 'running', 'waiting_decision')
×
614
                ORDER BY created_at DESC`
×
615
        rows, err := s.db.QueryContext(ctx, q, instanceID)
×
616
        if err != nil {
×
617
                return nil, err
×
618
        }
×
619
        defer rows.Close()
×
620

×
621
        steps := make([]WorkflowStep, 0)
×
622
        for rows.Next() {
×
623
                var step WorkflowStep
×
624
                err := rows.Scan(
×
625
                        &step.ID, &step.InstanceID, &step.StepName, &step.StepType,
×
626
                        &step.Status, &step.Input, &step.Output, &step.Error,
×
627
                        &step.RetryCount, &step.MaxRetries, &step.CompensationRetryCount,
×
628
                        &step.IdempotencyKey, &step.StartedAt, &step.CompletedAt, &step.CreatedAt,
×
629
                )
×
630
                if err != nil {
×
631
                        return nil, err
×
632
                }
×
633
                steps = append(steps, step)
×
634
        }
635
        return steps, rows.Err()
×
636
}
637

638
func (s *SQLiteStore) CreateCancelRequest(ctx context.Context, req *WorkflowCancelRequest) error {
×
639
        _, err := s.db.ExecContext(ctx, `INSERT OR REPLACE INTO cancel_requests (instance_id, requested_by, cancel_type, reason, created_at) VALUES(?, ?, ?, ?, ?)`, req.InstanceID, req.RequestedBy, req.CancelType, req.Reason, time.Now())
×
640
        return err
×
641
}
×
642
func (s *SQLiteStore) GetCancelRequest(ctx context.Context, instanceID int64) (*WorkflowCancelRequest, error) {
5✔
643
        row := s.db.QueryRowContext(ctx, `SELECT instance_id, requested_by, cancel_type, reason, created_at FROM cancel_requests WHERE instance_id=?`, instanceID)
5✔
644
        var req WorkflowCancelRequest
5✔
645
        if err := row.Scan(&req.InstanceID, &req.RequestedBy, &req.CancelType, &req.Reason, &req.CreatedAt); err != nil {
10✔
646
                if errors.Is(err, sql.ErrNoRows) {
10✔
647
                        return nil, ErrEntityNotFound
5✔
648
                }
5✔
649
                return nil, err
×
650
        }
651
        return &req, nil
×
652
}
653
func (s *SQLiteStore) DeleteCancelRequest(ctx context.Context, instanceID int64) error {
×
654
        _, err := s.db.ExecContext(ctx, `DELETE FROM cancel_requests WHERE instance_id=?`, instanceID)
×
655
        return err
×
656
}
×
657

658
func (s *SQLiteStore) CreateHumanDecision(ctx context.Context, decision *HumanDecisionRecord) error {
×
659
        now := time.Now()
×
660
        res, err := s.db.ExecContext(ctx, `INSERT INTO human_decisions (instance_id, step_id, decided_by, decision, comment, decided_at, created_at) VALUES(?, ?, ?, ?, ?, ?, ?)`, decision.InstanceID, decision.StepID, decision.DecidedBy, decision.Decision, decision.Comment, decision.DecidedAt, now)
×
661
        if err != nil {
×
662
                return err
×
663
        }
×
664
        id, _ := res.LastInsertId()
×
665
        decision.ID = id
×
666
        decision.CreatedAt = now
×
667
        return nil
×
668
}
669
func (s *SQLiteStore) GetHumanDecision(ctx context.Context, stepID int64) (*HumanDecisionRecord, error) {
×
670
        row := s.db.QueryRowContext(ctx, `SELECT id, instance_id, step_id, decided_by, decision, comment, decided_at, created_at FROM human_decisions WHERE step_id=?`, stepID)
×
671
        var d HumanDecisionRecord
×
672
        if err := row.Scan(&d.ID, &d.InstanceID, &d.StepID, &d.DecidedBy, &d.Decision, &d.Comment, &d.DecidedAt, &d.CreatedAt); err != nil {
×
673
                if errors.Is(err, sql.ErrNoRows) {
×
674
                        return nil, ErrEntityNotFound
×
675
                }
×
676
                return nil, err
×
677
        }
678
        return &d, nil
×
679
}
680
func (s *SQLiteStore) UpdateStepStatus(ctx context.Context, stepID int64, status StepStatus) error {
×
681
        _, err := s.db.ExecContext(ctx, `UPDATE workflow_steps SET status=? WHERE id=?`, status, stepID)
×
682
        return err
×
683
}
×
684
func (s *SQLiteStore) GetStepByID(ctx context.Context, stepID int64) (*WorkflowStep, error) {
×
685
        row := s.db.QueryRowContext(ctx, `SELECT id, instance_id, step_name, step_type, status, input, output, error, retry_count,
×
686
                max_retries, compensation_retry_count, idempotency_key, started_at, completed_at, created_at FROM workflow_steps WHERE id=?`, stepID)
×
687
        var step WorkflowStep
×
688
        var inputBytes, outputBytes []byte
×
689
        if err := row.Scan(&step.ID, &step.InstanceID, &step.StepName, &step.StepType, &step.Status, &inputBytes, &outputBytes, &step.Error,
×
690
                &step.RetryCount, &step.MaxRetries, &step.CompensationRetryCount, &step.IdempotencyKey, &step.StartedAt, &step.CompletedAt, &step.CreatedAt); err != nil {
×
691
                if errors.Is(err, sql.ErrNoRows) {
×
692
                        return nil, ErrEntityNotFound
×
693
                }
×
694
                return nil, err
×
695
        }
696
        step.Input = json.RawMessage(inputBytes)
×
697
        if outputBytes != nil {
×
698
                step.Output = json.RawMessage(outputBytes)
×
699
        }
×
700
        return &step, nil
×
701
}
702
func (s *SQLiteStore) GetHumanDecisionStepByInstanceID(ctx context.Context, instanceID int64) (*WorkflowStep, error) {
×
703
        row := s.db.QueryRowContext(ctx, `SELECT id, instance_id, step_name, step_type, status, input, output, error, retry_count, max_retries, compensation_retry_count, idempotency_key, started_at, completed_at, created_at FROM workflow_steps WHERE instance_id=? AND step_type='human' ORDER BY created_at DESC LIMIT 1`, instanceID)
×
704
        var step WorkflowStep
×
705
        var inb, outb []byte
×
706
        if err := row.Scan(&step.ID, &step.InstanceID, &step.StepName, &step.StepType, &step.Status, &inb, &outb, &step.Error, &step.RetryCount, &step.MaxRetries, &step.CompensationRetryCount, &step.IdempotencyKey, &step.StartedAt, &step.CompletedAt, &step.CreatedAt); err != nil {
×
707
                if errors.Is(err, sql.ErrNoRows) {
×
708
                        return nil, ErrEntityNotFound
×
709
                }
×
710
                return nil, err
×
711
        }
712
        step.Input = json.RawMessage(inb)
×
713
        if outb != nil {
×
714
                step.Output = json.RawMessage(outb)
×
715
        }
×
716
        return &step, nil
×
717
}
718

719
func (s *SQLiteStore) CreateDeadLetterRecord(ctx context.Context, rec *DeadLetterRecord) error {
×
720
        _, err := s.db.ExecContext(ctx, `INSERT INTO workflow_dlq (instance_id, workflow_id, step_id, step_name, step_type, input, error, reason, created_at) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)`, rec.InstanceID, rec.WorkflowID, rec.StepID, rec.StepName, rec.StepType, rec.Input, rec.Error, rec.Reason, time.Now())
×
721
        return err
×
722
}
×
723
func (s *SQLiteStore) RequeueDeadLetter(ctx context.Context, dlqID int64, newInput *json.RawMessage) error {
×
724
        s.mu.Lock()
×
725
        defer s.mu.Unlock()
×
726
        tx, err := s.db.BeginTx(ctx, &sql.TxOptions{})
×
727
        if err != nil {
×
728
                return err
×
729
        }
×
730
        defer func() {
×
731
                if tx != nil {
×
732
                        _ = tx.Rollback()
×
733
                }
×
734
        }()
735
        var instanceID, stepID int64
×
736
        var input []byte
×
737
        if err := tx.QueryRowContext(ctx, `SELECT instance_id, step_id, input FROM workflow_dlq WHERE id=?`, dlqID).Scan(&instanceID, &stepID, &input); err != nil {
×
738
                return err
×
739
        }
×
740
        var setInput any
×
741
        if newInput != nil {
×
742
                setInput = *newInput
×
743
        } else {
×
744
                setInput = input
×
745
        }
×
746
        if _, err := tx.ExecContext(ctx, `UPDATE workflow_steps SET status='pending', input=?, error=NULL, retry_count=0, compensation_retry_count=0, started_at=NULL, completed_at=NULL WHERE id=?`, setInput, stepID); err != nil {
×
747
                return err
×
748
        }
×
749
        if _, err := tx.ExecContext(ctx, `INSERT INTO queue (instance_id, step_id, scheduled_at, priority) VALUES(?, ?, ?, ?)`, instanceID, stepID, time.Now(), int(PriorityNormal)); err != nil {
×
750
                return err
×
751
        }
×
752
        if _, err := tx.ExecContext(ctx, `UPDATE workflow_instances SET status='running', error=NULL WHERE id=? AND status IN ('failed','dlq')`, instanceID); err != nil {
×
753
                return err
×
754
        }
×
755
        if _, err := tx.ExecContext(ctx, `DELETE FROM workflow_dlq WHERE id=?`, dlqID); err != nil {
×
756
                return err
×
757
        }
×
758
        if err := tx.Commit(); err != nil {
×
759
                return err
×
760
        }
×
761
        tx = nil
×
762
        return nil
×
763
}
764
func (s *SQLiteStore) ListDeadLetters(ctx context.Context, offset int, limit int) ([]DeadLetterRecord, int64, error) {
×
765
        row := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM workflow_dlq`)
×
766
        var total int64
×
767
        if err := row.Scan(&total); err != nil {
×
768
                return nil, 0, err
×
769
        }
×
770
        rows, err := s.db.QueryContext(ctx, `SELECT id, instance_id, workflow_id, step_id, step_name, step_type, input, error, reason, created_at FROM workflow_dlq ORDER BY created_at DESC LIMIT ? OFFSET ?`, limit, offset)
×
771
        if err != nil {
×
772
                return nil, 0, err
×
773
        }
×
774
        defer rows.Close()
×
775
        var res []DeadLetterRecord
×
776
        for rows.Next() {
×
777
                var r DeadLetterRecord
×
778
                if err := rows.Scan(&r.ID, &r.InstanceID, &r.WorkflowID, &r.StepID, &r.StepName, &r.StepType, &r.Input, &r.Error, &r.Reason, &r.CreatedAt); err != nil {
×
779
                        return nil, 0, err
×
780
                }
×
781
                res = append(res, r)
×
782
        }
783
        return res, total, nil
×
784
}
785
func (s *SQLiteStore) GetDeadLetterByID(ctx context.Context, id int64) (*DeadLetterRecord, error) {
×
786
        row := s.db.QueryRowContext(ctx, `SELECT id, instance_id, workflow_id, step_id, step_name, step_type, input, error, reason, created_at FROM workflow_dlq WHERE id=?`, id)
×
787
        var r DeadLetterRecord
×
788
        if err := row.Scan(&r.ID, &r.InstanceID, &r.WorkflowID, &r.StepID, &r.StepName, &r.StepType, &r.Input, &r.Error, &r.Reason, &r.CreatedAt); err != nil {
×
789
                if errors.Is(err, sql.ErrNoRows) {
×
790
                        return nil, ErrEntityNotFound
×
791
                }
×
792
                return nil, err
×
793
        }
794
        return &r, nil
×
795
}
796
func (s *SQLiteStore) PauseActiveStepsAndClearQueue(ctx context.Context, instanceID int64) error {
×
797
        _, err := s.db.ExecContext(ctx, `UPDATE workflow_steps SET status='paused' WHERE instance_id=? AND status IN ('running','pending','compensation')`, instanceID)
×
798
        if err != nil {
×
799
                return err
×
800
        }
×
801
        _, err = s.db.ExecContext(ctx, `DELETE FROM queue WHERE instance_id=?`, instanceID)
×
802
        return err
×
803
}
804

805
func (s *SQLiteStore) CleanupOldWorkflows(ctx context.Context, daysToKeep int) (int64, error) {
×
806
        cutoff := time.Now().AddDate(0, 0, -daysToKeep)
×
807
        // delete related rows first
×
808
        _, _ = s.db.ExecContext(ctx, `DELETE FROM workflow_events WHERE instance_id IN (SELECT id FROM workflow_instances WHERE updated_at < ?)`, cutoff)
×
809
        _, _ = s.db.ExecContext(ctx, `DELETE FROM workflow_steps WHERE instance_id IN (SELECT id FROM workflow_instances WHERE updated_at < ?)`, cutoff)
×
810
        res, err := s.db.ExecContext(ctx, `DELETE FROM workflow_instances WHERE updated_at < ?`, cutoff)
×
811
        if err != nil {
×
812
                return 0, err
×
813
        }
×
814
        rows, _ := res.RowsAffected()
×
815
        return rows, nil
×
816
}
817

818
// Minimal implementation to satisfy interface; not used in SQLite tests
819
func (s *SQLiteStore) GetWorkflowStats(ctx context.Context) ([]WorkflowStats, error) {
×
820
        rows, err := s.db.QueryContext(ctx, `SELECT name, version, (SELECT COUNT(*) FROM workflow_instances WHERE workflow_id=wd.id) as total,
×
821
                (SELECT COUNT(*) FROM workflow_instances WHERE workflow_id=wd.id AND status='completed') as completed,
×
822
                (SELECT COUNT(*) FROM workflow_instances WHERE workflow_id=wd.id AND status='failed') as failed,
×
823
                (SELECT COUNT(*) FROM workflow_instances WHERE workflow_id=wd.id AND status='running') as running
×
824
                FROM workflow_definitions wd`)
×
825
        if err != nil {
×
826
                return nil, err
×
827
        }
×
828
        defer rows.Close()
×
829
        var stats []WorkflowStats
×
830
        for rows.Next() {
×
831
                var sst WorkflowStats
×
832
                var total, completed, failed, running int
×
833
                if err := rows.Scan(&sst.WorkflowName, &sst.Version, &total, &completed, &failed, &running); err != nil {
×
834
                        return nil, err
×
835
                }
×
836
                sst.TotalInstances = total
×
837
                sst.CompletedInstances = completed
×
838
                sst.FailedInstances = failed
×
839
                sst.RunningInstances = running
×
840
                // average duration not tracked; leave zero
×
841
                stats = append(stats, sst)
×
842
        }
843
        return stats, nil
×
844
}
845

846
func boolToInt(b bool) int {
×
847
        if b {
×
848
                return 1
×
849
        }
×
850
        return 0
×
851
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc