• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Rotorsoft / act-root / 15946344504

28 Jun 2025 05:05PM UTC coverage: 95.791% (-0.9%) from 96.662%
15946344504

push

github

rotorsoft
chore: up lock

233 of 254 branches covered (91.73%)

Branch coverage included in aggregate %.

427 of 435 relevant lines covered (98.16%)

18.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.96
/libs/act-pg/src/PostgresStore.ts
1
import type {
2
  Committed,
3
  EventMeta,
4
  Lease,
5
  Message,
6
  Query,
7
  Schemas,
8
  Store,
9
} from "@rotorsoft/act";
10
import { ConcurrencyError, SNAP_EVENT, logger } from "@rotorsoft/act";
11
import pg from "pg";
12
import { dateReviver } from "./utils.js";
13

14
const { Pool, types } = pg;
4✔
15
types.setTypeParser(types.builtins.JSONB, (val) =>
4✔
16
  JSON.parse(val, dateReviver)
180✔
17
);
18

19
/** Connection and store-layout settings for the Postgres event store. */
type Config = {
  /** Postgres host name. */
  readonly host: string;
  /** Postgres port. */
  readonly port: number;
  /** Database name. */
  readonly database: string;
  /** Database user. */
  readonly user: string;
  /** Database password. */
  readonly password: string;
  /** Schema that holds the event tables. */
  readonly schema: string;
  /** Base table name for events (streams table derives from it). */
  readonly table: string;
  /** Stream lease duration in milliseconds. */
  readonly leaseMillis: number;
};
29

30
const DEFAULT_CONFIG: Config = {
4✔
31
  host: "localhost",
32
  port: 5432,
33
  database: "postgres",
34
  user: "postgres",
35
  password: "postgres",
36
  schema: "public",
37
  table: "events",
38
  leaseMillis: 30_000,
39
};
40

41
/**
42
 * PostgresStore is a production-ready event store adapter for Act, using PostgreSQL as the backend.
43
 *
44
 * - Supports event sourcing, leasing, snapshots, and concurrency control.
45
 * - Designed for high-throughput, scalable, and reliable event storage.
46
 * - Implements the Act Store interface.
47
 *
48
 * @example
49
 * import { PostgresStore } from "@act/pg";
50
 * const store = new PostgresStore({ schema: "my_schema", table: "events" });
51
 * await store.seed();
52
 *
53
 * @see https://github.com/rotorsoft/act-root
54
 */
55
export class PostgresStore implements Store {
56
  private _pool;
57
  readonly config: Config;
58

59
  /**
60
   * Create a new PostgresStore instance.
61
   * @param config Partial configuration (host, port, user, password, schema, table, etc.)
62
   */
63
  constructor(config: Partial<Config> = {}) {
×
64
    this.config = { ...DEFAULT_CONFIG, ...config };
9✔
65
    this._pool = new Pool(this.config);
9✔
66
  }
67

68
  /**
69
   * Dispose of the store and close all database connections.
70
   * @returns Promise that resolves when all connections are closed
71
   */
72
  async dispose() {
73
    await this._pool.end();
1✔
74
  }
75

76
  /**
77
   * Seed the database with required tables, indexes, and schema for event storage.
78
   * @returns Promise that resolves when seeding is complete
79
   * @throws Error if seeding fails
80
   */
81
  async seed() {
82
    const client = await this._pool.connect();
4✔
83

84
    try {
4✔
85
      await client.query("BEGIN");
4✔
86

87
      // Create schema
88
      await client.query(
3✔
89
        `CREATE SCHEMA IF NOT EXISTS "${this.config.schema}";`
90
      );
91

92
      // Events table
93
      await client.query(
3✔
94
        `CREATE TABLE IF NOT EXISTS "${this.config.schema}"."${this.config.table}" (
95
          id serial PRIMARY KEY,
96
          name varchar(100) COLLATE pg_catalog."default" NOT NULL,
97
          data jsonb,
98
          stream varchar(100) COLLATE pg_catalog."default" NOT NULL,
99
          version int NOT NULL,
100
          created timestamptz NOT NULL DEFAULT now(),
101
          meta jsonb
102
        ) TABLESPACE pg_default;`
103
      );
104

105
      // Indexes on events
106
      await client.query(
3✔
107
        `CREATE UNIQUE INDEX IF NOT EXISTS "${this.config.table}_stream_ix" 
108
        ON "${this.config.schema}"."${this.config.table}" (stream COLLATE pg_catalog."default", version);`
109
      );
110
      await client.query(
3✔
111
        `CREATE INDEX IF NOT EXISTS "${this.config.table}_name_ix" 
112
        ON "${this.config.schema}"."${this.config.table}" (name COLLATE pg_catalog."default");`
113
      );
114
      await client.query(
3✔
115
        `CREATE INDEX IF NOT EXISTS "${this.config.table}_created_id_ix" 
116
        ON "${this.config.schema}"."${this.config.table}" (created, id);`
117
      );
118
      await client.query(
3✔
119
        `CREATE INDEX IF NOT EXISTS "${this.config.table}_correlation_ix" 
120
        ON "${this.config.schema}"."${this.config.table}" ((meta ->> 'correlation') COLLATE pg_catalog."default");`
121
      );
122

123
      // Streams table
124
      await client.query(
3✔
125
        `CREATE TABLE IF NOT EXISTS "${this.config.schema}"."${this.config.table}_streams" (
126
          stream varchar(100) COLLATE pg_catalog."default" PRIMARY KEY,
127
          at int NOT NULL DEFAULT -1,
128
          retry smallint NOT NULL DEFAULT 0,
129
          blocked boolean NOT NULL DEFAULT false,
130
          leased_at int,
131
          leased_by uuid,
132
          leased_until timestamptz
133
        ) TABLESPACE pg_default;`
134
      );
135

136
      // Index for fetching streams
137
      await client.query(
3✔
138
        `CREATE INDEX IF NOT EXISTS "${this.config.table}_streams_fetch_ix" 
139
        ON "${this.config.schema}"."${this.config.table}_streams" (blocked, at);`
140
      );
141

142
      await client.query("COMMIT");
3✔
143
      logger.info(
3✔
144
        `Seeded schema "${this.config.schema}" with table "${this.config.table}"`
145
      );
146
    } catch (error) {
147
      await client.query("ROLLBACK");
1✔
148
      logger.error("Failed to seed store:", error);
×
149
      throw error;
×
150
    } finally {
151
      client.release();
4✔
152
    }
153
  }
154

155
  /**
156
   * Drop all tables and schema created by the store (for testing or cleanup).
157
   * @returns Promise that resolves when the schema is dropped
158
   */
159
  async drop() {
160
    await this._pool.query(
3✔
161
      `
162
      DO $$
163
      BEGIN
164
        IF EXISTS (SELECT 1 FROM information_schema.schemata
165
          WHERE schema_name = '${this.config.schema}'
166
        ) THEN
167
          EXECUTE 'DROP TABLE IF EXISTS "${this.config.schema}"."${this.config.table}"';
168
          EXECUTE 'DROP TABLE IF EXISTS "${this.config.schema}"."${this.config.table}_streams"';
169
          IF '${this.config.schema}' <> 'public' THEN
170
            EXECUTE 'DROP SCHEMA "${this.config.schema}" CASCADE';
171
          END IF;
172
        END IF;
173
      END
174
      $$;
175
    `
176
    );
177
  }
178

179
  /**
180
   * Query events from the store, optionally filtered by stream, event name, time, etc.
181
   *
182
   * @param callback Function called for each event found
183
   * @param query (Optional) Query filter (stream, names, before, after, etc.)
184
   * @param withSnaps (Optional) If true, includes only events after the last snapshot
185
   * @returns The number of events found
186
   *
187
   * @example
188
   * await store.query((event) => console.log(event), { stream: "A" });
189
   */
190
  async query<E extends Schemas>(
191
    callback: (event: Committed<E, keyof E>) => void,
192
    query?: Query,
193
    withSnaps = false
13✔
194
  ) {
195
    const {
196
      stream,
197
      names,
198
      before,
199
      after,
200
      limit,
201
      created_before,
202
      created_after,
203
      backward,
204
      correlation,
205
    } = query || {};
15!
206

207
    let sql = `SELECT * FROM "${this.config.schema}"."${this.config.table}" WHERE`;
15✔
208
    const values: any[] = [];
15✔
209

210
    if (withSnaps)
15✔
211
      sql = sql.concat(
2✔
212
        ` id>=COALESCE((SELECT id
213
            FROM "${this.config.schema}"."${this.config.table}"
214
            WHERE stream='${stream}' AND name='${SNAP_EVENT}'
215
            ORDER BY id DESC LIMIT 1), 0)
216
            AND stream='${stream}'`
217
      );
13!
218
    else if (query) {
219
      if (typeof after !== "undefined") {
13✔
220
        values.push(after);
8✔
221
        sql = sql.concat(" id>$1");
8✔
222
      } else sql = sql.concat(" id>-1");
5✔
223
      if (stream) {
13✔
224
        values.push(stream);
2✔
225
        sql = sql.concat(` AND stream=$${values.length}`);
2✔
226
      }
227
      if (names && names.length) {
13✔
228
        values.push(names);
1✔
229
        sql = sql.concat(` AND name = ANY($${values.length})`);
1✔
230
      }
231
      if (before) {
13✔
232
        values.push(before);
1✔
233
        sql = sql.concat(` AND id<$${values.length}`);
1✔
234
      }
235
      if (created_after) {
13✔
236
        values.push(created_after.toISOString());
1✔
237
        sql = sql.concat(` AND created>$${values.length}`);
1✔
238
      }
239
      if (created_before) {
13✔
240
        values.push(created_before.toISOString());
1✔
241
        sql = sql.concat(` AND created<$${values.length}`);
1✔
242
      }
243
      if (correlation) {
13✔
244
        values.push(correlation);
1✔
245
        sql = sql.concat(` AND meta->>'correlation'=$${values.length}`);
1✔
246
      }
247
    }
248
    sql = sql.concat(` ORDER BY id ${backward ? "DESC" : "ASC"}`);
15!
249
    if (limit) {
15✔
250
      values.push(limit);
10✔
251
      sql = sql.concat(` LIMIT $${values.length}`);
10✔
252
    }
253

254
    const result = await this._pool.query<Committed<E, keyof E>>(sql, values);
15✔
255
    for (const row of result.rows) callback(row);
63✔
256

257
    return result.rowCount ?? 0;
15!
258
  }
259

260
  /**
261
   * Commit new events to the store for a given stream, with concurrency control.
262
   *
263
   * @param stream The stream name
264
   * @param msgs Array of messages (event name and data)
265
   * @param meta Event metadata (correlation, causation, etc.)
266
   * @param expectedVersion (Optional) Expected stream version for concurrency control
267
   * @returns Array of committed events
268
   * @throws ConcurrencyError if the expected version does not match
269
   */
270
  async commit<E extends Schemas>(
271
    stream: string,
272
    msgs: Message<E, keyof E>[],
273
    meta: EventMeta,
274
    expectedVersion?: number
275
  ) {
276
    const client = await this._pool.connect();
16✔
277
    let version = -1;
16✔
278
    try {
16✔
279
      await client.query("BEGIN");
16✔
280

281
      const last = await client.query<Committed<E, keyof E>>(
16✔
282
        `SELECT version
283
        FROM "${this.config.schema}"."${this.config.table}"
284
        WHERE stream=$1 ORDER BY version DESC LIMIT 1`,
285
        [stream]
286
      );
287
      version = last.rowCount ? last.rows[0].version : -1;
16✔
288
      if (expectedVersion && version !== expectedVersion)
16✔
289
        throw new ConcurrencyError(
2✔
290
          version,
291
          msgs as unknown as Message<Schemas, string>[],
292
          expectedVersion
293
        );
294

295
      const committed = await Promise.all(
14✔
296
        msgs.map(async ({ name, data }) => {
297
          version++;
28✔
298
          const sql = `
28✔
299
          INSERT INTO "${this.config.schema}"."${this.config.table}"(name, data, stream, version, meta) 
300
          VALUES($1, $2, $3, $4, $5) RETURNING *`;
301
          const vals = [name, data, stream, version, meta];
28✔
302
          const { rows } = await client.query<Committed<E, keyof E>>(sql, vals);
28✔
303
          return rows.at(0)!;
28✔
304
        })
305
      );
306

307
      await client
14✔
308
        .query(
309
          `
310
            NOTIFY "${this.config.table}", '${JSON.stringify({
311
              operation: "INSERT",
312
              id: committed[0].name,
313
              position: committed[0].id,
314
            })}';
315
            COMMIT;
316
            `
317
        )
318
        .catch((error) => {
319
          logger.error(error);
1✔
320
          throw new ConcurrencyError(
1✔
321
            version,
322
            msgs as unknown as Message<Schemas, string>[],
323
            expectedVersion || -1
2✔
324
          );
325
        });
326
      return committed;
13✔
327
    } catch (error) {
328
      await client.query("ROLLBACK").catch(() => {});
3✔
329
      throw error;
3✔
330
    } finally {
331
      client.release();
16✔
332
    }
333
  }
334

335
  /**
336
   * Fetch a batch of events and streams for processing (drain cycle).
337
   *
338
   * @param limit The maximum number of events to fetch
339
   * @returns An object with arrays of streams and events
340
   */
341
  async fetch<E extends Schemas>(limit: number) {
342
    const { rows } = await this._pool.query<{ stream: string; at: number }>(
6✔
343
      `
344
      SELECT stream, at
345
      FROM "${this.config.schema}"."${this.config.table}_streams"
346
      WHERE blocked=false
347
      ORDER BY at ASC
348
      LIMIT $1::integer
349
      `,
350
      [limit]
351
    );
352

353
    const after = rows.length
6✔
354
      ? rows.reduce((min, r) => Math.min(min, r.at), Number.MAX_SAFE_INTEGER)
11✔
355
      : -1;
356

357
    const events: Committed<E, keyof E>[] = [];
6✔
358
    await this.query<E>((e) => e.name !== SNAP_EVENT && events.push(e), {
33✔
359
      after,
360
      limit,
361
    });
362
    return { streams: rows.map(({ stream }) => stream), events };
11✔
363
  }
364

365
  /**
366
   * Lease streams for reaction processing, marking them as in-progress.
367
   *
368
   * @param leases Array of lease objects (stream, at, etc.)
369
   * @returns Array of leased objects with updated lease info
370
   */
371
  async lease(leases: Lease[]) {
372
    const { by, at } = leases.at(0)!;
6✔
373
    const streams = leases.map(({ stream }) => stream);
7✔
374
    const client = await this._pool.connect();
6✔
375

376
    try {
6✔
377
      await client.query("BEGIN");
6✔
378
      // insert new streams
379
      await client.query(
6✔
380
        `
381
        INSERT INTO "${this.config.schema}"."${this.config.table}_streams" (stream)
382
        SELECT UNNEST($1::text[])
383
        ON CONFLICT (stream) DO NOTHING
384
        `,
385
        [streams]
386
      );
387
      // set leases
388
      const { rows } = await client.query<{
6✔
389
        stream: string;
390
        leased_at: number;
391
        retry: number;
392
      }>(
393
        `
394
        WITH free AS (
395
          SELECT * FROM "${this.config.schema}"."${this.config.table}_streams" 
396
          WHERE stream = ANY($1::text[]) AND (leased_by IS NULL OR leased_until <= NOW())
397
          FOR UPDATE
398
        )
399
        UPDATE "${this.config.schema}"."${this.config.table}_streams" U
400
        SET
401
          leased_by = $2::uuid,
402
          leased_at = $3::integer,
403
          leased_until = NOW() + ($4::integer || ' milliseconds')::interval
404
        FROM free
405
        WHERE U.stream = free.stream
406
        RETURNING U.stream, U.leased_at, U.retry
407
        `,
408
        [streams, by, at, this.config.leaseMillis]
409
      );
410
      await client.query("COMMIT");
6✔
411

412
      return rows.map(({ stream, leased_at, retry }) => ({
6✔
413
        stream,
414
        by,
415
        at: leased_at,
416
        retry,
417
        block: false,
418
      }));
419
    } catch (error) {
420
      await client.query("ROLLBACK").catch(() => {});
×
421
      throw error;
×
422
    } finally {
423
      client.release();
6✔
424
    }
425
  }
426

427
  /**
428
   * Acknowledge and release leases after processing, updating stream positions.
429
   *
430
   * @param leases Array of lease objects to acknowledge
431
   * @returns Promise that resolves when leases are acknowledged
432
   */
433
  async ack(leases: Lease[]) {
434
    const client = await this._pool.connect();
8✔
435

436
    try {
8✔
437
      await client.query("BEGIN");
8✔
438
      for (const { stream, by, at, retry, block } of leases) {
4✔
439
        await client.query(
4✔
440
          `UPDATE "${this.config.schema}"."${this.config.table}_streams"
441
          SET
442
            at = $3::integer,
443
            retry = $4::integer,
444
            blocked = $5::boolean,
445
            leased_by = NULL,
446
            leased_at = NULL,
447
            leased_until = NULL
448
          WHERE
449
            stream = $1::text
450
            AND leased_by = $2::uuid`,
451
          [stream, by, at, retry, block]
452
        );
453
      }
454
      await client.query("COMMIT");
4✔
455
    } catch {
456
      // leased_until fallback
457
      await client.query("ROLLBACK").catch(() => {});
4✔
458
    } finally {
459
      client.release();
8✔
460
    }
461
  }
462
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc