• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Rotorsoft / act-root / 16508103113

24 Jul 2025 09:20PM UTC coverage: 96.38%. First build
16508103113

Pull #58

github

web-flow
Merge ace481b8b into 2fff3a3d0
Pull Request #58: A better draining flow to support concurrent reactions and optimized fetches from built sources

350 of 362 branches covered (96.69%)

Branch coverage included in aggregate %.

362 of 403 new or added lines in 7 files covered. (89.83%)

1274 of 1323 relevant lines covered (96.3%)

116.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.91
/libs/act-pg/src/PostgresStore.ts
1
import type {
2
  Committed,
3
  EventMeta,
4
  Lease,
5
  Message,
6
  Query,
7
  Schemas,
8
  Store,
9
} from "@rotorsoft/act";
10
import { ConcurrencyError, SNAP_EVENT, logger } from "@rotorsoft/act";
1✔
11
import pg from "pg";
1✔
12
import { dateReviver } from "./utils.js";
1✔
13

14
// Extract the pooled-client factory and the type-parser registry from the pg default export.
const { Pool, types } = pg;
// Register a global JSONB parser so every JSONB column read through pg is parsed
// with dateReviver (restores Date objects serialized inside event data/meta).
// NOTE: this mutates pg's process-wide parser registry, affecting all pools.
types.setTypeParser(types.builtins.JSONB, (val) =>
  JSON.parse(val, dateReviver)
);
18

19
/**
 * Immutable connection and naming configuration for PostgresStore.
 * Standard pg connection fields plus the schema/table pair used to
 * derive the events table and its companion `<table>_streams` table.
 */
type Config = Readonly<{
  host: string;
  port: number;
  database: string;
  user: string;
  password: string;
  schema: string; // PostgreSQL schema that will hold both tables
  table: string; // events table name; streams table is `<table>_streams`
}>;

// Defaults target a local out-of-the-box PostgreSQL instance
// (postgres/postgres on localhost:5432, public schema, "events" table).
const DEFAULT_CONFIG: Config = {
  host: "localhost",
  port: 5432,
  database: "postgres",
  user: "postgres",
  password: "postgres",
  schema: "public",
  table: "events",
};
38

39
/**
40
 * @category Adapters
41
 * @see Store
42
 *
43
 * PostgresStore is a production-ready event store adapter for Act, using PostgreSQL as the backend.
44
 *
45
 * - Supports event sourcing, leasing, snapshots, and concurrency control.
46
 * - Designed for high-throughput, scalable, and reliable event storage.
47
 * - Implements the Act Store interface.
48
 *
49
 * @example
50
 * import { PostgresStore } from "@act/pg";
51
 * const store = new PostgresStore({ schema: "my_schema", table: "events" });
52
 * await store.seed();
53
 *
54
 * @see https://github.com/rotorsoft/act-root
55
 */
56
export class PostgresStore implements Store {
1✔
57
  private _pool;
19✔
58
  readonly config: Config;
19✔
59
  private _fqt: string;
19✔
60
  private _fqs: string;
19✔
61

62
  /**
63
   * Create a new PostgresStore instance.
64
   * @param config Partial configuration (host, port, user, password, schema, table, etc.)
65
   */
66
  constructor(config: Partial<Config> = {}) {
19✔
67
    this.config = { ...DEFAULT_CONFIG, ...config };
19✔
68
    this._pool = new Pool(this.config);
19✔
69
    this._fqt = `"${this.config.schema}"."${this.config.table}"`;
19✔
70
    this._fqs = `"${this.config.schema}"."${this.config.table}_streams"`;
19✔
71
  }
19✔
72

73
  /**
74
   * Dispose of the store and close all database connections.
75
   * @returns Promise that resolves when all connections are closed
76
   */
77
  async dispose() {
19✔
78
    await this._pool.end();
1✔
79
  }
1✔
80

81
  /**
82
   * Seed the database with required tables, indexes, and schema for event storage.
83
   * @returns Promise that resolves when seeding is complete
84
   * @throws Error if seeding fails
85
   */
86
  async seed() {
19✔
87
    const client = await this._pool.connect();
4✔
88

89
    try {
3✔
90
      await client.query("BEGIN");
3✔
91

92
      // Create schema
93
      await client.query(
3✔
94
        `CREATE SCHEMA IF NOT EXISTS "${this.config.schema}";`
3✔
95
      );
3✔
96

97
      // Events table
98
      await client.query(
2✔
99
        `CREATE TABLE IF NOT EXISTS ${this._fqt} (
2✔
100
          id serial PRIMARY KEY,
101
          name varchar(100) COLLATE pg_catalog."default" NOT NULL,
102
          data jsonb,
103
          stream varchar(100) COLLATE pg_catalog."default" NOT NULL,
104
          version int NOT NULL,
105
          created timestamptz NOT NULL DEFAULT now(),
106
          meta jsonb
107
        ) TABLESPACE pg_default;`
108
      );
2✔
109

110
      // Indexes on events
111
      await client.query(
2✔
112
        `CREATE UNIQUE INDEX IF NOT EXISTS "${this.config.table}_stream_ix" 
2✔
113
        ON ${this._fqt} (stream COLLATE pg_catalog."default", version);`
2✔
114
      );
2✔
115
      await client.query(
2✔
116
        `CREATE INDEX IF NOT EXISTS "${this.config.table}_name_ix" 
2✔
117
        ON ${this._fqt} (name COLLATE pg_catalog."default");`
2✔
118
      );
2✔
119
      await client.query(
2✔
120
        `CREATE INDEX IF NOT EXISTS "${this.config.table}_created_id_ix" 
2✔
121
        ON ${this._fqt} (created, id);`
2✔
122
      );
2✔
123
      await client.query(
2✔
124
        `CREATE INDEX IF NOT EXISTS "${this.config.table}_correlation_ix" 
2✔
125
        ON ${this._fqt} ((meta ->> 'correlation') COLLATE pg_catalog."default");`
2✔
126
      );
2✔
127

128
      // Streams table
129
      await client.query(
2✔
130
        `CREATE TABLE IF NOT EXISTS ${this._fqs} (
2✔
131
          stream varchar(100) COLLATE pg_catalog."default" PRIMARY KEY,
132
          source varchar(100) COLLATE pg_catalog."default",
133
          at int NOT NULL DEFAULT -1,
134
          retry smallint NOT NULL DEFAULT 0,
135
          blocked boolean NOT NULL DEFAULT false,
136
          error text,
137
          leased_at int,
138
          leased_by uuid,
139
          leased_until timestamptz
140
        ) TABLESPACE pg_default;`
141
      );
2✔
142

143
      // Index for fetching streams
144
      await client.query(
2✔
145
        `CREATE INDEX IF NOT EXISTS "${this.config.table}_streams_fetch_ix" 
2✔
146
        ON ${this._fqs} (blocked, at);`
2✔
147
      );
2✔
148

149
      await client.query("COMMIT");
2✔
150
      logger.info(
2✔
151
        `Seeded schema "${this.config.schema}" with table "${this.config.table}"`
2✔
152
      );
2✔
153
    } catch (error) {
4✔
154
      await client.query("ROLLBACK");
1✔
155
      logger.error("Failed to seed store:", error);
1✔
156
      throw error;
1✔
157
    } finally {
4✔
158
      client.release();
3✔
159
    }
3✔
160
  }
4✔
161

162
  /**
163
   * Drop all tables and schema created by the store (for testing or cleanup).
164
   * @returns Promise that resolves when the schema is dropped
165
   */
166
  async drop() {
19✔
167
    await this._pool.query(
2✔
168
      `
2✔
169
      DO $$
170
      BEGIN
171
        IF EXISTS (SELECT 1 FROM information_schema.schemata
172
          WHERE schema_name = '${this.config.schema}'
2✔
173
        ) THEN
174
          EXECUTE 'DROP TABLE IF EXISTS ${this._fqt}';
2✔
175
          EXECUTE 'DROP TABLE IF EXISTS ${this._fqs}';
2✔
176
          IF '${this.config.schema}' <> 'public' THEN
2✔
177
            EXECUTE 'DROP SCHEMA "${this.config.schema}" CASCADE';
2✔
178
          END IF;
179
        END IF;
180
      END
181
      $$;
182
    `
183
    );
2✔
184
  }
2✔
185

186
  /**
187
   * Query events from the store, optionally filtered by stream, event name, time, etc.
188
   *
189
   * @param callback Function called for each event found
190
   * @param query (Optional) Query filter (stream, names, before, after, etc.)
191
   * @returns The number of events found
192
   *
193
   * @example
194
   * await store.query((event) => console.log(event), { stream: "A" });
195
   */
196
  async query<E extends Schemas>(
19✔
197
    callback: (event: Committed<E, keyof E>) => void,
15✔
198
    query?: Query
15✔
199
  ) {
15✔
200
    const {
15✔
201
      stream,
15✔
202
      names,
15✔
203
      before,
15✔
204
      after,
15✔
205
      limit,
15✔
206
      created_before,
15✔
207
      created_after,
15✔
208
      backward,
15✔
209
      correlation,
15✔
210
      with_snaps = false,
15✔
211
    } = query || {};
15✔
212

213
    let sql = `SELECT * FROM ${this._fqt}`;
15✔
214
    const conditions: string[] = [];
15✔
215
    const values: any[] = [];
15✔
216

217
    if (query) {
15✔
218
      if (typeof after !== "undefined") {
13✔
219
        values.push(after);
2✔
220
        conditions.push(`id>$${values.length}`);
2✔
221
      } else {
13✔
222
        conditions.push("id>-1");
11✔
223
      }
11✔
224
      if (stream) {
13✔
225
        values.push(stream);
8✔
226
        conditions.push(`stream ~ $${values.length}`);
8✔
227
      }
8✔
228
      if (names && names.length) {
13✔
229
        values.push(names);
1✔
230
        conditions.push(`name = ANY($${values.length})`);
1✔
231
      }
1✔
232
      if (before) {
13✔
233
        values.push(before);
1✔
234
        conditions.push(`id<$${values.length}`);
1✔
235
      }
1✔
236
      if (created_after) {
13✔
237
        values.push(created_after.toISOString());
1✔
238
        conditions.push(`created>$${values.length}`);
1✔
239
      }
1✔
240
      if (created_before) {
13✔
241
        values.push(created_before.toISOString());
1✔
242
        conditions.push(`created<$${values.length}`);
1✔
243
      }
1✔
244
      if (correlation) {
13✔
245
        values.push(correlation);
1✔
246
        conditions.push(`meta->>'correlation'=$${values.length}`);
1✔
247
      }
1✔
248
      if (!with_snaps) {
13✔
249
        conditions.push(`name <> '${SNAP_EVENT}'`);
11✔
250
      }
11✔
251
    }
13✔
252
    if (conditions.length) {
15✔
253
      sql += " WHERE " + conditions.join(" AND ");
13✔
254
    }
13✔
255
    sql += ` ORDER BY id ${backward ? "DESC" : "ASC"}`;
15✔
256
    if (limit) {
15✔
257
      values.push(limit);
4✔
258
      sql += ` LIMIT $${values.length}`;
4✔
259
    }
4✔
260

261
    const result = await this._pool.query<Committed<E, keyof E>>(sql, values);
15✔
262
    for (const row of result.rows) callback(row);
15✔
263

264
    return result.rowCount ?? 0;
15✔
265
  }
15✔
266

267
  /**
268
   * Commit new events to the store for a given stream, with concurrency control.
269
   *
270
   * @param stream The stream name
271
   * @param msgs Array of messages (event name and data)
272
   * @param meta Event metadata (correlation, causation, etc.)
273
   * @param expectedVersion (Optional) Expected stream version for concurrency control
274
   * @returns Array of committed events
275
   * @throws ConcurrencyError if the expected version does not match
276
   */
277
  async commit<E extends Schemas>(
19✔
278
    stream: string,
20✔
279
    msgs: Message<E, keyof E>[],
20✔
280
    meta: EventMeta,
20✔
281
    expectedVersion?: number
20✔
282
  ) {
20✔
283
    if (msgs.length === 0) return [];
20✔
284
    const client = await this._pool.connect();
18✔
285
    let version = -1;
18✔
286
    try {
18✔
287
      await client.query("BEGIN");
18✔
288

289
      const last = await client.query<Committed<E, keyof E>>(
17✔
290
        `SELECT version
17✔
291
        FROM ${this._fqt}
17✔
292
        WHERE stream=$1 ORDER BY version DESC LIMIT 1`,
293
        [stream]
17✔
294
      );
17✔
295
      version = last.rowCount ? last.rows[0].version : -1;
20✔
296
      if (typeof expectedVersion === "number" && version !== expectedVersion)
20✔
297
        throw new ConcurrencyError(
20✔
298
          stream,
1✔
299
          version,
1✔
300
          msgs as unknown as Message<Schemas, string>[],
1✔
301
          expectedVersion
1✔
302
        );
1✔
303

304
      const committed = await Promise.all(
16✔
305
        msgs.map(async ({ name, data }) => {
16✔
306
          version++;
28✔
307
          const sql = `
28✔
308
          INSERT INTO ${this._fqt}(name, data, stream, version, meta) 
28✔
309
          VALUES($1, $2, $3, $4, $5) RETURNING *`;
310
          const vals = [name, data, stream, version, meta];
28✔
311
          const { rows } = await client.query<Committed<E, keyof E>>(sql, vals);
28✔
312
          return rows.at(0)!;
27✔
313
        })
16✔
314
      );
16✔
315

316
      await client
15✔
317
        .query(
15✔
318
          `
15✔
319
            NOTIFY "${this.config.table}", '${JSON.stringify({
15✔
320
              operation: "INSERT",
15✔
321
              id: committed[0].name,
15✔
322
              position: committed[0].id,
15✔
323
            })}';
15✔
324
            COMMIT;
325
            `
326
        )
15✔
327
        .catch((error) => {
15✔
328
          logger.error(error);
1✔
329
          throw new ConcurrencyError(
1✔
330
            stream,
1✔
331
            version,
1✔
332
            msgs as unknown as Message<Schemas, string>[],
1✔
333
            expectedVersion || -1
1✔
334
          );
1✔
335
        });
15✔
336
      return committed;
14✔
337
    } catch (error) {
20✔
338
      await client.query("ROLLBACK").catch(() => {});
4✔
339
      throw error;
4✔
340
    } finally {
20!
341
      client.release();
18✔
342
    }
18✔
343
  }
20✔
344

345
  /**
346
   * Polls the store for unblocked streams needing processing, ordered by lease watermark ascending.
347
   * @param limit - Maximum number of streams to poll.
348
   * @returns The polled streams.
349
   */
350
  async poll(limit: number) {
19✔
351
    const { rows } = await this._pool.query<{
3✔
352
      stream: string;
353
      at: number;
354
      source: string;
355
    }>(
356
      `
3✔
357
      SELECT stream, at
358
      FROM ${this._fqs}
3✔
359
      WHERE blocked=false AND (leased_by IS NULL OR leased_until <= NOW())
360
      ORDER BY at ASC
361
      LIMIT $1::integer
362
      `,
363
      [limit]
3✔
364
    );
3✔
365
    return rows;
2✔
366
  }
3✔
367

368
  /**
369
   * Lease streams for reaction processing, marking them as in-progress.
370
   *
371
   * @param leases - Lease requests for streams, including end-of-lease watermark, lease holder, and source stream.
372
   * @param millis - Lease duration in milliseconds.
373
   * @returns Array of leased objects with updated lease info
374
   */
375
  async lease(leases: Lease[], millis: number): Promise<Lease[]> {
19✔
376
    const client = await this._pool.connect();
1✔
377
    try {
1✔
378
      await client.query("BEGIN");
1!
379
      // insert new streams
380
      await client.query(
×
381
        `
×
NEW
382
        INSERT INTO ${this._fqs} (stream, source)
×
383
        SELECT lease->>'stream', lease->>'source'
384
        FROM jsonb_array_elements($1::jsonb) AS lease
385
        ON CONFLICT (stream) DO NOTHING
386
        `,
NEW
387
        [JSON.stringify(leases)]
×
388
      );
×
389
      // set leases
390
      const { rows } = await client.query<{
×
391
        stream: string;
392
        source: string | null;
393
        leased_at: number;
394
        leased_by: string;
395
        leased_until: number;
396
        retry: number;
397
      }>(
398
        `
×
399
      WITH input AS (
400
        SELECT * FROM jsonb_to_recordset($1::jsonb)
401
        AS x(stream text, at int, by uuid)
402
      ), free AS (
NEW
403
        SELECT s.stream FROM ${this._fqs} s
×
404
        JOIN input i ON s.stream = i.stream
405
        WHERE s.leased_by IS NULL OR s.leased_until <= NOW()
406
        FOR UPDATE
407
      )
NEW
408
      UPDATE ${this._fqs} s
×
409
      SET
410
        leased_by = i.by,
411
        leased_at = i.at,
412
        leased_until = NOW() + ($2::integer || ' milliseconds')::interval,
413
        retry = CASE WHEN $2::integer > 0 THEN s.retry + 1 ELSE s.retry END
414
      FROM input i, free f
415
      WHERE s.stream = f.stream AND s.stream = i.stream
416
      RETURNING s.stream, s.source, s.leased_at, s.leased_until, s.retry
417
      `,
NEW
418
        [JSON.stringify(leases), millis]
×
419
      );
×
420
      await client.query("COMMIT");
×
421

NEW
422
      return rows.map(
×
NEW
423
        ({ stream, source, leased_at, leased_by, leased_until, retry }) => ({
×
NEW
424
          stream,
×
NEW
425
          source: source ?? undefined,
×
NEW
426
          at: leased_at,
×
NEW
427
          by: leased_by,
×
NEW
428
          until: new Date(leased_until),
×
NEW
429
          retry,
×
NEW
430
        })
×
NEW
431
      );
×
432
    } catch (error) {
1✔
433
      await client.query("ROLLBACK").catch(() => {});
1✔
434
      logger.error(error);
1✔
435
      return [];
1✔
436
    } finally {
1!
437
      client.release();
1✔
438
    }
1✔
439
  }
1✔
440

441
  /**
442
   * Acknowledge and release leases after processing, updating stream positions.
443
   *
444
   * @param leases - Leases to acknowledge, including last processed watermark and lease holder.
445
   * @returns Acked leases.
446
   */
447
  async ack(leases: Lease[]): Promise<Lease[]> {
19✔
448
    const client = await this._pool.connect();
1✔
449
    try {
1✔
450
      await client.query("BEGIN");
1!
NEW
451
      const { rows } = await client.query<{
×
452
        stream: string;
453
        source: string | null;
454
        at: number;
455
        retry: number;
456
      }>(
NEW
457
        `
×
458
      WITH input AS (
459
        SELECT * FROM jsonb_to_recordset($1::jsonb)
460
        AS x(stream text, by uuid, at int)
461
      )
NEW
462
      UPDATE ${this._fqs} AS s
×
463
      SET
464
        at = i.at,
465
        retry = -1,
466
        leased_by = NULL,
467
        leased_at = NULL,
468
        leased_until = NULL
469
      FROM input i
470
      WHERE s.stream = i.stream AND s.leased_by = i.by
471
      RETURNING s.stream, s.source, s.at, s.retry
472
      `,
NEW
473
        [JSON.stringify(leases)]
×
NEW
474
      );
×
NEW
475
      await client.query("COMMIT");
×
476

NEW
477
      return rows.map((row) => ({
×
NEW
478
        stream: row.stream,
×
NEW
479
        source: row.source ?? undefined,
×
NEW
480
        at: row.at,
×
NEW
481
        by: "",
×
NEW
482
        retry: row.retry,
×
NEW
483
      }));
×
484
    } catch (error) {
1✔
485
      await client.query("ROLLBACK").catch(() => {});
1✔
486
      logger.error(error);
1✔
487
      return [];
1✔
488
    } finally {
1!
489
      client.release();
1✔
490
    }
1✔
491
  }
1✔
492

493
  /**
494
   * Block a stream for processing after failing to process and reaching max retries with blocking enabled.
495
   * @param leases - Leases to block, including lease holder and last error message.
496
   * @returns Blocked leases.
497
   */
498
  async block(
19✔
499
    leases: Array<Lease & { error: string }>
1✔
500
  ): Promise<(Lease & { error: string })[]> {
1✔
501
    const client = await this._pool.connect();
1✔
502
    try {
1✔
503
      await client.query("BEGIN");
1!
NEW
504
      const { rows } = await client.query<{
×
505
        stream: string;
506
        source: string | null;
507
        at: number;
508
        by: string;
509
        retry: number;
510
        error: string;
511
      }>(
NEW
512
        `
×
513
      WITH input AS (
514
        SELECT * FROM jsonb_to_recordset($1::jsonb)
515
        AS x(stream text, by uuid, error text)
516
      )
NEW
517
      UPDATE ${this._fqs} AS s
×
518
      SET blocked = true, error = i.error
519
      FROM input i
520
      WHERE s.stream = i.stream AND s.leased_by = i.by AND s.blocked = false
521
      RETURNING s.stream, s.source, s.at, i.by, s.retry, s.error
522
      `,
NEW
523
        [JSON.stringify(leases)]
×
NEW
524
      );
×
525
      await client.query("COMMIT");
×
526

NEW
527
      return rows.map((row) => ({
×
NEW
528
        stream: row.stream,
×
NEW
529
        source: row.source ?? undefined,
×
NEW
530
        at: row.at,
×
NEW
531
        by: row.by,
×
NEW
532
        retry: row.retry,
×
NEW
533
        error: row.error ?? "",
×
NEW
534
      }));
×
535
    } catch (error) {
1✔
536
      await client.query("ROLLBACK").catch(() => {});
1✔
537
      logger.error(error);
1✔
538
      return [];
1✔
539
    } finally {
1!
540
      client.release();
1✔
541
    }
1✔
542
  }
1✔
543
}
19✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc