• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Rotorsoft / act-root / 25968856277

16 May 2026 05:51PM UTC coverage: 99.956%. First build
25968856277

Pull #736

github

web-flow
Merge 0374df897 into 836ae3c9d
Pull Request #736: feat(act): NonRetryableError + app.unblock recovery primitive

2390 of 2391 branches covered (99.96%)

Branch coverage included in aggregate %.

83 of 85 new or added lines in 9 files covered. (97.65%)

4470 of 4472 relevant lines covered (99.96%)

419.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.4
/libs/act-sqlite/src/sqlite-store.ts
1
import { type Client, createClient } from "@libsql/client";
2
import type {
3
  BlockedLease,
4
  Committed,
5
  EventMeta,
6
  Lease,
7
  Message,
8
  PrioritizeFilter,
9
  Query,
10
  QueryStreams,
11
  QueryStreamsResult,
12
  Schemas,
13
  Store,
14
  StreamPosition,
15
} from "@rotorsoft/act";
16

17
/**
18
 * SQLite store configuration
19
 */
20
export interface SqliteConfig {
21
  /** Path to the SQLite database file (default: ":memory:") */
22
  url: string;
23
  /** Auth token for libSQL server connections (optional) */
24
  authToken?: string;
25
}
26

27
const DEFAULT_CONFIG: SqliteConfig = {
5✔
28
  url: "file::memory:",
29
};
30

31
/** Translate a stream filter (regex-shaped or plain substring) into a
32
 *  SQL LIKE pattern. Honors `^` / `$` anchors and converts `.*` → `%`,
33
 *  `.` → `_`. Unanchored input gets `%` wildcards on both sides.
34
 *
35
 *  Examples:
36
 *  - `^abc$`  → `abc`        (exact)
37
 *  - `^abc.*` → `abc%`       (starts-with)
38
 *  - `.*abc$` → `%abc`       (ends-with)
39
 *  - `abc`    → `%abc%`      (contains)
40
 *  - `a.c`    → `%a_c%`      (single-char wildcard, contains)
41
 *
42
 *  @internal exported for testing
43
 */
44
export function streamPatternToLike(input: string): string {
45
  let s = input;
53✔
46
  const start = s.startsWith("^");
53✔
47
  const end = s.endsWith("$");
53✔
48
  if (start) s = s.slice(1);
53✔
49
  if (end) s = s.slice(0, -1);
53✔
50
  s = s.replace(/\.\*/g, "%").replace(/\./g, "_");
53✔
51
  const out = (start ? "" : "%") + s + (end ? "" : "%");
53✔
52
  // Collapse adjacent `%` — e.g. `^a.*` would otherwise yield `a%%`.
53
  // Same matching semantics, cleaner output.
54
  return out.replace(/%+/g, "%");
53✔
55
}
56

57
/**
58
 * SQLite event store adapter for [@rotorsoft/act](https://www.npmjs.com/package/@rotorsoft/act).
59
 *
60
 * Provides persistent event storage using SQLite via `@libsql/client`.
61
 * All write operations use transactions for ACID guarantees.
62
 * Since SQLite serializes writes at the database level, the concurrency
63
 * model is equivalent to PostgreSQL's `FOR UPDATE SKIP LOCKED` for
64
 * single-server deployments.
65
 *
66
 * **`Store.notify` is intentionally not implemented.** The notify hook is
67
 * a cross-process wake-up signal that lets a horizontally-scaled Act
68
 * deployment wake `settle()` immediately on remote commits. SQLite is
69
 * single-node by design — there is no remote writer to be notified of —
70
 * so the {@link Act} orchestrator falls back to the existing
71
 * debounce/poll path, which is correct for this topology.
72
 *
73
 * @example
74
 * ```typescript
75
 * import { store } from "@rotorsoft/act";
76
 * import { SqliteStore } from "@rotorsoft/act-sqlite";
77
 *
78
 * store(new SqliteStore({ url: "file:myapp.db" }));
79
 * await store().seed();
80
 * ```
81
 */
82
export class SqliteStore implements Store {
83
  private client: Client;
84

85
  constructor(config: Partial<SqliteConfig> = {}) {
27✔
86
    const cfg = { ...DEFAULT_CONFIG, ...config };
27✔
87
    this.client = createClient({
27✔
88
      url: cfg.url,
89
      authToken: cfg.authToken,
90
    });
91
  }
92

93
  async seed() {
94
    await this.client.execute("PRAGMA journal_mode=WAL");
16✔
95
    await this.client.execute(`
16✔
96
      CREATE TABLE IF NOT EXISTS events (
97
        id INTEGER PRIMARY KEY AUTOINCREMENT,
98
        stream TEXT NOT NULL,
99
        version INTEGER NOT NULL,
100
        name TEXT NOT NULL,
101
        data TEXT NOT NULL,
102
        meta TEXT NOT NULL,
103
        created TEXT NOT NULL,
104
        UNIQUE(stream, version)
105
      )
106
    `);
107
    await this.client.execute(
16✔
108
      "CREATE INDEX IF NOT EXISTS idx_events_stream ON events(stream)"
109
    );
110
    await this.client.execute(
16✔
111
      "CREATE INDEX IF NOT EXISTS idx_events_name ON events(name)"
112
    );
113
    await this.client.execute(`
16✔
114
      CREATE TABLE IF NOT EXISTS streams (
115
        stream TEXT PRIMARY KEY,
116
        source TEXT,
117
        at INTEGER NOT NULL DEFAULT -1,
118
        retry INTEGER NOT NULL DEFAULT 0,
119
        blocked INTEGER NOT NULL DEFAULT 0,
120
        error TEXT NOT NULL DEFAULT '',
121
        leased_by TEXT,
122
        leased_until TEXT,
123
        priority INTEGER NOT NULL DEFAULT 0
124
      )
125
    `);
126
    // Migration for tables created before priority lanes (ACT-102).
127
    // libSQL surfaces "duplicate column" as an error, hence the
128
    // try/swallow — this mirrors PG's `ADD COLUMN IF NOT EXISTS`.
129
    try {
16✔
130
      await this.client.execute(
16✔
131
        "ALTER TABLE streams ADD COLUMN priority INTEGER NOT NULL DEFAULT 0"
132
      );
133
    } catch {
134
      // already present
135
    }
136
    await this.client.execute(
16✔
137
      "CREATE INDEX IF NOT EXISTS idx_streams_claim ON streams(blocked, priority DESC, at)"
138
    );
139
  }
140

141
  async drop() {
142
    await this.client.execute("DROP TABLE IF EXISTS events");
3✔
143
    await this.client.execute("DROP TABLE IF EXISTS streams");
3✔
144
  }
145

146
  async dispose() {
147
    await Promise.resolve();
15✔
148
    this.client.close();
15✔
149
  }
150

151
  // --- commit: transaction + optimistic concurrency ---
152
  async commit<E extends Schemas>(
153
    stream: string,
154
    msgs: Message<E, keyof E>[],
155
    meta: EventMeta,
156
    expectedVersion?: number
157
  ): Promise<Committed<E, keyof E>[]> {
158
    const tx = await this.client.transaction("write");
63✔
159
    try {
63✔
160
      const versionRow = await tx.execute({
63✔
161
        sql: "SELECT COALESCE(MAX(version), -1) as v FROM events WHERE stream = ?",
162
        args: [stream],
163
      });
164
      const currentVersion = Number(versionRow.rows[0].v);
63✔
165

166
      if (
63✔
167
        typeof expectedVersion === "number" &&
67✔
168
        currentVersion !== expectedVersion
169
      ) {
170
        const { ConcurrencyError } = await import("@rotorsoft/act");
3✔
171
        throw new ConcurrencyError(
3✔
172
          stream,
173
          currentVersion,
174
          msgs as Message<Schemas, keyof Schemas>[],
175
          expectedVersion
176
        );
177
      }
178

179
      const now = new Date().toISOString();
60✔
180
      const committed: Committed<E, keyof E>[] = [];
60✔
181
      let version = currentVersion + 1;
60✔
182

183
      for (const { name, data } of msgs) {
60✔
184
        const result = await tx.execute({
75✔
185
          sql: "INSERT INTO events (stream, version, name, data, meta, created) VALUES (?, ?, ?, ?, ?, ?)",
186
          args: [
187
            stream,
188
            version,
189
            name as string,
190
            JSON.stringify(data),
191
            JSON.stringify(meta),
192
            now,
193
          ],
194
        });
195
        committed.push({
74✔
196
          id: Number(result.lastInsertRowid),
197
          stream,
198
          version,
199
          created: new Date(now),
200
          name,
201
          data,
202
          meta,
203
        });
204
        version++;
74✔
205
      }
206

207
      await tx.commit();
59✔
208
      return committed;
59✔
209
    } catch (e) {
210
      await tx.rollback();
4✔
211
      throw e;
4✔
212
    }
213
  }
214

215
  // --- query: read-only, no transaction needed ---
216
  async query<E extends Schemas>(
217
    callback: (event: Committed<E, keyof E>) => void,
218
    query?: Query
219
  ): Promise<number> {
220
    let sql = "SELECT * FROM events WHERE 1=1";
29✔
221
    const args: unknown[] = [];
29✔
222

223
    if (query?.stream) {
29✔
224
      if (query.stream_exact) {
27✔
225
        sql += " AND stream = ?";
19✔
226
        args.push(query.stream);
19✔
227
      } else {
228
        sql += " AND stream LIKE ?";
8✔
229
        args.push(streamPatternToLike(query.stream));
8✔
230
      }
231
    }
232
    if (query?.names) {
29✔
233
      sql += ` AND name IN (${query.names.map(() => "?").join(",")})`;
1✔
234
      args.push(...query.names);
1✔
235
    }
236
    if ((query as any)?.correlation) {
29✔
237
      sql += " AND json_extract(meta, '$.correlation') = ?";
2✔
238
      args.push((query as any).correlation);
2✔
239
    }
240
    if (query?.after !== undefined) {
29✔
241
      sql += " AND id > ?";
4✔
242
      args.push(query.after);
4✔
243
    }
244
    if (query?.before !== undefined) {
29✔
245
      sql += " AND id < ?";
1✔
246
      args.push(query.before);
1✔
247
    }
248
    if (query?.created_after) {
29✔
249
      sql += " AND created > ?";
3✔
250
      args.push(query.created_after.toISOString());
3✔
251
    }
252
    if (query?.created_before) {
29✔
253
      sql += " AND created < ?";
2✔
254
      args.push(query.created_before.toISOString());
2✔
255
    }
256
    if (!query?.with_snaps) {
29✔
257
      sql += " AND name != '__snapshot__'";
27✔
258
    }
259

260
    sql += query?.backward ? " ORDER BY id DESC" : " ORDER BY id ASC";
29✔
261

262
    if (query?.limit) {
29✔
263
      sql += " LIMIT ?";
1✔
264
      args.push(query.limit);
1✔
265
    }
266

267
    const result = await this.client.execute({ sql, args: args as any[] });
29✔
268
    let count = 0;
29✔
269

270
    for (const row of result.rows) {
29✔
271
      callback({
46✔
272
        id: Number(row.id),
273
        stream: row.stream as string,
274
        version: Number(row.version),
275
        created: new Date(row.created as string),
276
        name: row.name as string,
277
        data: JSON.parse(row.data as string),
278
        meta: JSON.parse(row.meta as string),
279
      });
280
      count++;
46✔
281
    }
282

283
    return count;
29✔
284
  }
285

286
  // --- subscribe: idempotent INSERT OR IGNORE (= PG ON CONFLICT DO NOTHING)
287
  //     plus a UPDATE pass to keep the *max* priority across reactions
288
  //     targeting the same stream (ACT-102). Operator overrides go
289
  //     through `prioritize()` instead.
290
  async subscribe(
291
    streams: Array<{ stream: string; source?: string; priority?: number }>
292
  ) {
293
    const tx = await this.client.transaction("write");
41✔
294
    try {
41✔
295
      let subscribed = 0;
41✔
296
      for (const { stream, source, priority = 0 } of streams) {
41✔
297
        const inserted = await tx.execute({
66✔
298
          sql: "INSERT OR IGNORE INTO streams (stream, source, priority) VALUES (?, ?, ?)",
299
          args: [stream, source ?? null, priority],
119✔
300
        });
301
        if (inserted.rowsAffected > 0) {
65✔
302
          subscribed++;
62✔
303
        } else if (priority > 0) {
3✔
304
          await tx.execute({
2✔
305
            sql: "UPDATE streams SET priority = ? WHERE stream = ? AND priority < ?",
306
            args: [priority, stream, priority],
307
          });
308
        }
309
      }
310
      const wm = await tx.execute(
40✔
311
        "SELECT COALESCE(MAX(at), -1) as w FROM streams"
312
      );
313
      await tx.commit();
40✔
314
      return { subscribed, watermark: Number(wm.rows[0].w) };
40✔
315
    } catch (e) {
316
      await tx.rollback();
1✔
317
      throw e;
1✔
318
    }
319
  }
320

321
  // --- claim: write transaction (SQLite serializes writes = equivalent
322
  //     to PG FOR UPDATE SKIP LOCKED for single-server) ---
323
  async claim(lagging: number, leading: number, by: string, millis: number) {
324
    const tx = await this.client.transaction("write");
28✔
325
    try {
28✔
326
      const now = new Date().toISOString();
28✔
327

328
      const result = await tx.execute({
28✔
329
        sql: `SELECT stream, source, at, priority FROM streams
330
              WHERE blocked = 0 AND (leased_until IS NULL OR leased_until <= ?)
331
              ORDER BY priority DESC, at ASC`,
332
        args: [now],
333
      });
334

335
      const candidates: {
336
        stream: string;
337
        source: string | undefined;
338
        at: number;
339
        priority: number;
340
      }[] = [];
27✔
341
      for (const row of result.rows) {
27✔
342
        const stream = row.stream as string;
49✔
343
        const source = row.source as string | null;
49✔
344
        const at = Number(row.at);
49✔
345

346
        let hasEvents: boolean;
347
        if (source) {
49✔
348
          const check = await tx.execute({
11✔
349
            sql: `SELECT 1 FROM events WHERE id > ? AND name != '__snapshot__' AND stream LIKE ? LIMIT 1`,
350
            args: [at, streamPatternToLike(source)],
351
          });
352
          hasEvents = check.rows.length > 0;
11✔
353
        } else {
354
          const check = await tx.execute({
38✔
355
            sql: `SELECT 1 FROM events WHERE id > ? AND name != '__snapshot__' LIMIT 1`,
356
            args: [at],
357
          });
358
          hasEvents = check.rows.length > 0;
38✔
359
        }
360

361
        if (hasEvents) {
49✔
362
          candidates.push({
46✔
363
            stream,
364
            source: source ?? undefined,
84✔
365
            at,
366
            priority: Number(row.priority),
367
          });
368
        }
369
      }
370

371
      // Dual frontier: lagging (priority DESC, watermark ASC — ACT-102)
372
      // + leading (newest first). The candidates list arrives sorted
373
      // by `priority DESC, at ASC` from the SELECT above, so the
374
      // `slice(0, lagging)` already does the right thing.
375
      const lag = candidates.slice(0, lagging);
27✔
376
      const lead = [...candidates]
27✔
377
        .sort((a, b) => b.at - a.at)
22✔
378
        .slice(0, leading);
379
      const seen = new Set<string>();
27✔
380
      const combined = [...lag, ...lead].filter((p) => {
27✔
381
        if (seen.has(p.stream)) return false;
47✔
382
        seen.add(p.stream);
45✔
383
        return true;
45✔
384
      });
385

386
      const leases: Lease[] = [];
27✔
387
      const until = new Date(Date.now() + millis).toISOString();
27✔
388
      for (const row of combined) {
27✔
389
        await tx.execute({
45✔
390
          sql: "UPDATE streams SET leased_by = ?, leased_until = ?, retry = retry + 1 WHERE stream = ?",
391
          args: [by, until, row.stream],
392
        });
393
        leases.push({
45✔
394
          stream: row.stream,
395
          source: row.source,
396
          at: row.at,
397
          by,
398
          retry: 0,
399
          lagging: row.at < 0,
400
        });
401
      }
402

403
      await tx.commit();
27✔
404
      return leases;
27✔
405
    } catch (e) {
406
      await tx.rollback();
1✔
407
      throw e;
1✔
408
    }
409
  }
410

411
  // --- ack: transaction + ownership check (= PG WHERE leased_by) ---
412
  async ack(leases: Lease[]) {
413
    const tx = await this.client.transaction("write");
14✔
414
    try {
14✔
415
      const result: Lease[] = [];
14✔
416
      for (const l of leases) {
14✔
417
        const r = await tx.execute({
24✔
418
          sql: `UPDATE streams SET at = ?, leased_by = NULL, leased_until = NULL, retry = -1
419
                WHERE stream = ? AND leased_by = ?`,
420
          args: [l.at, l.stream, l.by],
421
        });
422
        if (r.rowsAffected > 0) result.push(l);
23✔
423
      }
424
      await tx.commit();
13✔
425
      return result;
13✔
426
    } catch (e) {
427
      await tx.rollback();
1✔
428
      throw e;
1✔
429
    }
430
  }
431

432
  // --- block: transaction + ownership + idempotent (= PG) ---
433
  async block(leases: BlockedLease[]) {
434
    const tx = await this.client.transaction("write");
9✔
435
    try {
9✔
436
      const result: BlockedLease[] = [];
9✔
437
      for (const l of leases) {
9✔
438
        const r = await tx.execute({
9✔
439
          sql: `UPDATE streams SET blocked = 1, error = ?
440
                WHERE stream = ? AND leased_by = ? AND blocked = 0`,
441
          args: [l.error, l.stream, l.by],
442
        });
443
        if (r.rowsAffected > 0) result.push(l);
8✔
444
      }
445
      await tx.commit();
8✔
446
      return result;
8✔
447
    } catch (e) {
448
      await tx.rollback();
1✔
449
      throw e;
1✔
450
    }
451
  }
452

453
  // --- reset: transactional ---
454
  async reset(streams: string[]) {
455
    const tx = await this.client.transaction("write");
5✔
456
    try {
5✔
457
      let count = 0;
5✔
458
      for (const stream of streams) {
5✔
459
        const r = await tx.execute({
4✔
460
          sql: `UPDATE streams SET at = -1, retry = 0, blocked = 0, error = '',
461
                leased_by = NULL, leased_until = NULL WHERE stream = ?`,
462
          args: [stream],
463
        });
464
        count += r.rowsAffected;
3✔
465
      }
466
      await tx.commit();
4✔
467
      return count;
4✔
468
    } catch (e) {
469
      await tx.rollback();
1✔
470
      throw e;
1✔
471
    }
472
  }
473

474
  // --- unblock: clear blocked + retry + lease without touching watermark ---
475
  // `retry = -1` so claim's post-bump returns retry=0 (first attempt),
476
  // matching the InMemoryStore convention.
477
  async unblock(streams: string[]) {
478
    const tx = await this.client.transaction("write");
5✔
479
    try {
5✔
480
      let count = 0;
5✔
481
      for (const stream of streams) {
5✔
482
        const r = await tx.execute({
5✔
483
          sql: `UPDATE streams SET retry = -1, blocked = 0, error = '',
484
                leased_by = NULL, leased_until = NULL
485
                WHERE stream = ? AND blocked = 1`,
486
          args: [stream],
487
        });
488
        count += r.rowsAffected;
5✔
489
      }
490
      await tx.commit();
5✔
491
      return count;
5✔
492
    } catch (e) {
NEW
493
      await tx.rollback();
×
NEW
494
      throw e;
×
495
    }
496
  }
497

498
  // --- query_streams: read-only introspection with filters ---
499
  async query_streams(
500
    callback: (position: StreamPosition) => void,
501
    query?: QueryStreams
502
  ): Promise<QueryStreamsResult> {
503
    const limit = query?.limit ?? 100;
18✔
504
    let sql =
505
      "SELECT stream, source, at, retry, blocked, error, leased_by, leased_until, priority FROM streams WHERE 1=1";
18✔
506
    const args: unknown[] = [];
18✔
507

508
    if (query?.stream !== undefined) {
18✔
509
      if (query.stream_exact) {
17✔
510
        sql += " AND stream = ?";
2✔
511
        args.push(query.stream);
2✔
512
      } else {
513
        sql += " AND stream LIKE ?";
15✔
514
        args.push(streamPatternToLike(query.stream));
15✔
515
      }
516
    }
517
    if (query?.source !== undefined) {
18✔
518
      sql += " AND source IS NOT NULL";
2✔
519
      if (query.source_exact) {
2✔
520
        sql += " AND source = ?";
1✔
521
        args.push(query.source);
1✔
522
      } else {
523
        sql += " AND source LIKE ?";
1✔
524
        args.push(streamPatternToLike(query.source));
1✔
525
      }
526
    }
527
    if (query?.blocked !== undefined) {
18✔
528
      sql += " AND blocked = ?";
2✔
529
      args.push(query.blocked ? 1 : 0);
2✔
530
    }
531
    if (query?.after !== undefined) {
18✔
532
      sql += " AND stream > ?";
1✔
533
      args.push(query.after);
1✔
534
    }
535
    sql += " ORDER BY stream LIMIT ?";
18✔
536
    args.push(limit);
18✔
537

538
    const [streamsResult, maxResult] = await Promise.all([
18✔
539
      this.client.execute({ sql, args: args as any[] }),
540
      this.client.execute("SELECT COALESCE(MAX(id), -1) AS m FROM events"),
541
    ]);
542

543
    let count = 0;
18✔
544
    for (const row of streamsResult.rows) {
18✔
545
      const leased_until = row.leased_until as string | null;
30✔
546
      callback({
30✔
547
        stream: row.stream as string,
548
        source: (row.source as string | null) ?? undefined,
54✔
549
        at: Number(row.at),
550
        retry: Number(row.retry),
551
        blocked: Number(row.blocked) === 1,
552
        error: row.error as string,
553
        priority: Number(row.priority),
554
        leased_by: (row.leased_by as string | null) ?? undefined,
59✔
555
        leased_until: leased_until ? new Date(leased_until) : undefined,
30✔
556
      });
557
      count++;
30✔
558
    }
559

560
    return { maxEventId: Number(maxResult.rows[0].m), count };
18✔
561
  }
562

563
  // --- prioritize: bulk priority update with filter (ACT-102) ---
564
  async prioritize(
565
    filter: PrioritizeFilter,
566
    priority: number
567
  ): Promise<number> {
568
    // libSQL `?` placeholders are positional and NOT reusable, so we
569
    // bind `priority` twice: once for SET, once for the no-op skip
570
    // in WHERE.
571
    const args: unknown[] = [priority, priority];
10✔
572
    const conditions: string[] = ["priority <> ?"];
10✔
573

574
    if (filter.stream !== undefined) {
10✔
575
      if (filter.stream_exact) {
4✔
576
        conditions.push("stream = ?");
2✔
577
        args.push(filter.stream);
2✔
578
      } else {
579
        conditions.push("stream LIKE ?");
2✔
580
        args.push(streamPatternToLike(filter.stream));
2✔
581
      }
582
    }
583
    if (filter.source !== undefined) {
10✔
584
      conditions.push("source IS NOT NULL");
2✔
585
      if (filter.source_exact) {
2✔
586
        conditions.push("source = ?");
1✔
587
        args.push(filter.source);
1✔
588
      } else {
589
        conditions.push("source LIKE ?");
1✔
590
        args.push(streamPatternToLike(filter.source));
1✔
591
      }
592
    }
593
    if (filter.blocked !== undefined) {
10✔
594
      conditions.push("blocked = ?");
2✔
595
      args.push(filter.blocked ? 1 : 0);
2✔
596
    }
597
    const sql = `UPDATE streams SET priority = ? WHERE ${conditions.join(" AND ")}`;
10✔
598
    const result = await this.client.execute({ sql, args: args as any[] });
10✔
599
    return result.rowsAffected;
10✔
600
  }
601

602
  // --- truncate: transactional delete + seed ---
603
  async truncate(
604
    targets: Array<{
605
      stream: string;
606
      snapshot?: Record<string, unknown>;
607
      meta?: EventMeta;
608
    }>
609
  ) {
610
    const result = new Map<
5✔
611
      string,
612
      { deleted: number; committed: Committed<Schemas, keyof Schemas> }
613
    >();
614

615
    const tx = await this.client.transaction("write");
5✔
616
    try {
5✔
617
      for (const { stream, snapshot, meta } of targets) {
5✔
618
        const countRow = await tx.execute({
4✔
619
          sql: "SELECT COUNT(*) as c FROM events WHERE stream = ?",
620
          args: [stream],
621
        });
622
        const deleted = Number(countRow.rows[0].c);
4✔
623
        await tx.execute({
4✔
624
          sql: "DELETE FROM events WHERE stream = ?",
625
          args: [stream],
626
        });
627
        await tx.execute({
3✔
628
          sql: "DELETE FROM streams WHERE stream = ?",
629
          args: [stream],
630
        });
631

632
        const eventName =
633
          snapshot !== undefined ? "__snapshot__" : "__tombstone__";
3✔
634
        const eventMeta = meta ?? { correlation: "", causation: {} };
4✔
635
        const now = new Date().toISOString();
4✔
636
        const ins = await tx.execute({
4✔
637
          sql: "INSERT INTO events (stream, version, name, data, meta, created) VALUES (?, 0, ?, ?, ?, ?)",
638
          args: [
639
            stream,
640
            eventName,
641
            JSON.stringify(snapshot ?? {}),
6✔
642
            JSON.stringify(eventMeta),
643
            now,
644
          ],
645
        });
646

647
        result.set(stream, {
3✔
648
          deleted,
649
          committed: {
650
            id: Number(ins.lastInsertRowid),
651
            stream,
652
            version: 0,
653
            created: new Date(now),
654
            name: eventName,
655
            data: snapshot ?? {},
5✔
656
            meta: eventMeta,
657
          },
658
        });
659
      }
660
      await tx.commit();
4✔
661
    } catch (e) {
662
      await tx.rollback();
1✔
663
      throw e;
1✔
664
    }
665

666
    return result;
4✔
667
  }
668
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc