• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Rotorsoft / act-root / 15932246775

27 Jun 2025 05:25PM UTC coverage: 96.662% (+0.8%) from 95.888%
15932246775

push

github

rotorsoft
chore: fix test

238 of 254 branches covered (93.7%)

Branch coverage included in aggregate %.

428 of 435 relevant lines covered (98.39%)

161.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.96
/libs/act-pg/src/PostgresStore.ts
1
import type {
2
  Committed,
3
  EventMeta,
4
  Lease,
5
  Message,
6
  Query,
7
  Schemas,
8
  Store,
9
} from "@rotorsoft/act";
10
import { ConcurrencyError, SNAP_EVENT, logger } from "@rotorsoft/act";
11
import pg from "pg";
12
import { dateReviver } from "./utils.js";
13

14
// `pg` is imported through its default export; pull out the pool constructor
// and the type-parser registry used by this module.
const { Pool, types } = pg;

// Decode JSONB columns with JSON.parse + dateReviver instead of the driver's
// default parser. NOTE(review): dateReviver presumably turns serialized dates
// back into Date instances — confirm in ./utils.js.
types.setTypeParser(types.builtins.JSONB, (val) =>
  JSON.parse(val, dateReviver)
);
18

19
/**
 * Settings accepted by PostgresStore. The whole object is handed to the
 * connection pool, and `schema`, `table` and `leaseMillis` are additionally
 * used when building SQL.
 */
type Config = Readonly<{
  /** Database server host name. */
  host: string;
  /** Database server TCP port. */
  port: number;
  /** Database name to connect to. */
  database: string;
  /** Login role. */
  user: string;
  /** Login password. */
  password: string;
  /** Schema that holds the store's tables. */
  schema: string;
  /** Events table name; the streams table is named `<table>_streams`. */
  table: string;
  /** Duration of a stream lease, in milliseconds. */
  leaseMillis: number;
}>;

/** Values used for any setting the caller does not override. */
const DEFAULT_CONFIG: Config = {
  host: "localhost",
  port: 5432,
  database: "postgres",
  user: "postgres",
  password: "postgres",
  schema: "public",
  table: "events",
  leaseMillis: 30_000,
};
40

41
/**
 * Store implementation backed by PostgreSQL.
 *
 * Events live in `"<schema>"."<table>"`; per-stream watermark and lease state
 * lives in `"<schema>"."<table>_streams"`. All values that originate from
 * callers are sent as bound parameters; only the configured schema/table
 * identifiers are interpolated into SQL text.
 */
export class PostgresStore implements Store {
  private _pool;
  readonly config: Config;

  /**
   * @param config overrides merged on top of DEFAULT_CONFIG; the merged
   *   object is also passed to the connection pool
   */
  constructor(config: Partial<Config> = {}) {
    this.config = { ...DEFAULT_CONFIG, ...config };
    this._pool = new Pool(this.config);
  }

  /** Double-quoted, schema-qualified name of the events table. */
  private get _events(): string {
    return `"${this.config.schema}"."${this.config.table}"`;
  }

  /** Double-quoted, schema-qualified name of the streams table. */
  private get _streams(): string {
    return `"${this.config.schema}"."${this.config.table}_streams"`;
  }

  /** Closes the connection pool. */
  async dispose(): Promise<void> {
    await this._pool.end();
  }

  /**
   * Creates the schema, the events and streams tables, and their indexes
   * inside a single transaction. Every statement uses IF NOT EXISTS.
   *
   * @throws the original failure after attempting a rollback
   */
  async seed(): Promise<void> {
    const { schema, table } = this.config;
    const client = await this._pool.connect();

    try {
      await client.query("BEGIN");

      // Create schema
      await client.query(`CREATE SCHEMA IF NOT EXISTS "${schema}";`);

      // Events table
      await client.query(
        `CREATE TABLE IF NOT EXISTS ${this._events} (
          id serial PRIMARY KEY,
          name varchar(100) COLLATE pg_catalog."default" NOT NULL,
          data jsonb,
          stream varchar(100) COLLATE pg_catalog."default" NOT NULL,
          version int NOT NULL,
          created timestamptz NOT NULL DEFAULT now(),
          meta jsonb
        ) TABLESPACE pg_default;`
      );

      // Indexes on events
      await client.query(
        `CREATE UNIQUE INDEX IF NOT EXISTS "${table}_stream_ix"
        ON ${this._events} (stream COLLATE pg_catalog."default", version);`
      );
      await client.query(
        `CREATE INDEX IF NOT EXISTS "${table}_name_ix"
        ON ${this._events} (name COLLATE pg_catalog."default");`
      );
      await client.query(
        `CREATE INDEX IF NOT EXISTS "${table}_created_id_ix"
        ON ${this._events} (created, id);`
      );
      await client.query(
        `CREATE INDEX IF NOT EXISTS "${table}_correlation_ix"
        ON ${this._events} ((meta ->> 'correlation') COLLATE pg_catalog."default");`
      );

      // Streams table
      await client.query(
        `CREATE TABLE IF NOT EXISTS ${this._streams} (
          stream varchar(100) COLLATE pg_catalog."default" PRIMARY KEY,
          at int NOT NULL DEFAULT -1,
          retry smallint NOT NULL DEFAULT 0,
          blocked boolean NOT NULL DEFAULT false,
          leased_at int,
          leased_by uuid,
          leased_until timestamptz
        ) TABLESPACE pg_default;`
      );

      // Index for fetching streams
      await client.query(
        `CREATE INDEX IF NOT EXISTS "${table}_streams_fetch_ix"
        ON ${this._streams} (blocked, at);`
      );

      await client.query("COMMIT");
      logger.info(`Seeded schema "${schema}" with table "${table}"`);
    } catch (error) {
      // A failing ROLLBACK must not mask the error that brought us here.
      await client.query("ROLLBACK").catch(() => {});
      logger.error("Failed to seed store:", error);
      throw error;
    } finally {
      client.release();
    }
  }

  /**
   * Drops the events and streams tables when the configured schema exists,
   * and drops the schema itself (CASCADE) unless it is `public`.
   */
  async drop(): Promise<void> {
    const { schema } = this.config;
    await this._pool.query(
      `
      DO $$
      BEGIN
        IF EXISTS (SELECT 1 FROM information_schema.schemata
          WHERE schema_name = '${schema}'
        ) THEN
          EXECUTE 'DROP TABLE IF EXISTS ${this._events}';
          EXECUTE 'DROP TABLE IF EXISTS ${this._streams}';
          IF '${schema}' <> 'public' THEN
            EXECUTE 'DROP SCHEMA "${schema}" CASCADE';
          END IF;
        END IF;
      END
      $$;
    `
    );
  }

  /**
   * Reads events ordered by id and passes each row to `callback`.
   *
   * @param callback invoked once per returned row, in result order
   * @param query optional filters; omitting it returns every event
   * @param withSnaps when true, only `query.stream` is used as a filter and
   *   the result starts at that stream's most recent snapshot event (or at
   *   the beginning when it has none); `backward` and `limit` still apply
   * @returns the number of rows reported by the driver (0 when unreported)
   */
  async query<E extends Schemas>(
    callback: (event: Committed<E, keyof E>) => void,
    query?: Query,
    withSnaps = false
  ): Promise<number> {
    const {
      stream,
      names,
      before,
      after,
      limit,
      created_before,
      created_after,
      backward,
      correlation,
    } = query || {};

    const values: unknown[] = [];
    // Registers a bound value and returns its positional placeholder.
    const bind = (value: unknown): string => {
      values.push(value);
      return `$${values.length}`;
    };

    const conditions: string[] = [];
    if (withSnaps) {
      const streamRef = bind(stream);
      const snapRef = bind(SNAP_EVENT);
      conditions.push(
        `id>=COALESCE((SELECT id
            FROM ${this._events}
            WHERE stream=${streamRef} AND name=${snapRef}
            ORDER BY id DESC LIMIT 1), 0)`,
        `stream=${streamRef}`
      );
    } else {
      if (typeof after !== "undefined") conditions.push(`id>${bind(after)}`);
      if (stream) conditions.push(`stream=${bind(stream)}`);
      if (names && names.length) conditions.push(`name = ANY(${bind(names)})`);
      if (before) conditions.push(`id<${bind(before)}`);
      if (created_after)
        conditions.push(`created>${bind(created_after.toISOString())}`);
      if (created_before)
        conditions.push(`created<${bind(created_before.toISOString())}`);
      if (correlation)
        conditions.push(`meta->>'correlation'=${bind(correlation)}`);
    }

    let sql = `SELECT * FROM ${this._events}`;
    // Only emit WHERE when there is something to filter on; a bare WHERE
    // followed by ORDER BY is a syntax error.
    if (conditions.length) sql += ` WHERE ${conditions.join(" AND ")}`;
    sql += ` ORDER BY id ${backward ? "DESC" : "ASC"}`;
    if (limit) sql += ` LIMIT ${bind(limit)}`;

    const result = await this._pool.query<Committed<E, keyof E>>(sql, values);
    for (const row of result.rows) callback(row);

    return result.rowCount ?? 0;
  }

  /**
   * Appends messages to a stream inside one transaction and notifies
   * listeners on the channel named after the events table.
   *
   * @param stream target stream
   * @param msgs messages to append; versions continue from the stream's
   *   current last version
   * @param meta metadata stored with every appended event
   * @param expectedVersion when given (including 0), the stream's last
   *   version must equal it
   * @returns the inserted rows, in message order (empty for empty `msgs`)
   * @throws ConcurrencyError when `expectedVersion` does not match, or when
   *   the notify/commit step fails
   */
  async commit<E extends Schemas>(
    stream: string,
    msgs: Message<E, keyof E>[],
    meta: EventMeta,
    expectedVersion?: number
  ): Promise<Committed<E, keyof E>[]> {
    const client = await this._pool.connect();
    let version = -1;
    try {
      await client.query("BEGIN");

      const last = await client.query<Committed<E, keyof E>>(
        `SELECT version
        FROM ${this._events}
        WHERE stream=$1 ORDER BY version DESC LIMIT 1`,
        [stream]
      );
      version = last.rowCount ? last.rows[0].version : -1;
      // Compare by type, not truthiness: 0 is a legitimate expected version.
      if (typeof expectedVersion === "number" && version !== expectedVersion)
        throw new ConcurrencyError(
          version,
          msgs as unknown as Message<Schemas, string>[],
          expectedVersion
        );

      // A single client runs its queries one at a time, so insert in order.
      const insert = `
          INSERT INTO ${this._events}(name, data, stream, version, meta)
          VALUES($1, $2, $3, $4, $5) RETURNING *`;
      const committed: Committed<E, keyof E>[] = [];
      for (const { name, data } of msgs) {
        version++;
        const { rows } = await client.query<Committed<E, keyof E>>(insert, [
          name,
          data,
          stream,
          version,
          meta,
        ]);
        committed.push(rows[0]);
      }

      try {
        if (committed.length) {
          // pg_notify takes channel and payload as bound values, so quotes in
          // the payload cannot break the statement.
          await client.query("SELECT pg_notify($1, $2)", [
            this.config.table,
            JSON.stringify({
              operation: "INSERT",
              id: committed[0].name,
              position: committed[0].id,
            }),
          ]);
        }
        await client.query("COMMIT");
      } catch (error) {
        logger.error(error);
        throw new ConcurrencyError(
          version,
          msgs as unknown as Message<Schemas, string>[],
          expectedVersion ?? -1
        );
      }
      return committed;
    } catch (error) {
      await client.query("ROLLBACK").catch(() => {});
      throw error;
    } finally {
      client.release();
    }
  }

  /**
   * Reads up to `limit` unblocked streams (lowest `at` first) together with
   * up to `limit` events whose id is greater than the smallest `at` among
   * them (-1 when no stream is returned). Snapshot events are dropped after
   * the limit is applied, so fewer than `limit` events may come back.
   *
   * @param limit maximum number of streams and of events to read
   * @returns the stream names and the non-snapshot events
   */
  async fetch<E extends Schemas>(limit: number) {
    const { rows } = await this._pool.query<{ stream: string; at: number }>(
      `
      SELECT stream, at
      FROM ${this._streams}
      WHERE blocked=false
      ORDER BY at ASC
      LIMIT $1::integer
      `,
      [limit]
    );

    const after = rows.length
      ? rows.reduce((min, r) => Math.min(min, r.at), Number.MAX_SAFE_INTEGER)
      : -1;

    const events: Committed<E, keyof E>[] = [];
    await this.query<E>(
      (e) => {
        if (e.name !== SNAP_EVENT) events.push(e);
      },
      { after, limit }
    );
    return { streams: rows.map(({ stream }) => stream), events };
  }

  /**
   * Registers any unknown streams and leases those that are currently free
   * (never leased, or whose lease has expired) for `config.leaseMillis`.
   * The `by` and `at` of the FIRST lease are applied to every stream.
   *
   * @param leases requested leases; an empty list is a no-op
   * @returns one entry per stream actually leased
   */
  async lease(leases: Lease[]) {
    const first = leases[0];
    if (!first) return [];

    const { by, at } = first;
    const streams = leases.map(({ stream }) => stream);
    const client = await this._pool.connect();

    try {
      await client.query("BEGIN");
      // insert new streams
      await client.query(
        `
        INSERT INTO ${this._streams} (stream)
        SELECT UNNEST($1::text[])
        ON CONFLICT (stream) DO NOTHING
        `,
        [streams]
      );
      // set leases
      const { rows } = await client.query<{
        stream: string;
        leased_at: number;
        retry: number;
      }>(
        `
        WITH free AS (
          SELECT * FROM ${this._streams}
          WHERE stream = ANY($1::text[]) AND (leased_by IS NULL OR leased_until <= NOW())
          FOR UPDATE
        )
        UPDATE ${this._streams} U
        SET
          leased_by = $2::uuid,
          leased_at = $3::integer,
          leased_until = NOW() + ($4::integer || ' milliseconds')::interval
        FROM free
        WHERE U.stream = free.stream
        RETURNING U.stream, U.leased_at, U.retry
        `,
        [streams, by, at, this.config.leaseMillis]
      );
      await client.query("COMMIT");

      return rows.map(({ stream, leased_at, retry }) => ({
        stream,
        by,
        at: leased_at,
        retry,
        block: false,
      }));
    } catch (error) {
      await client.query("ROLLBACK").catch(() => {});
      throw error;
    } finally {
      client.release();
    }
  }

  /**
   * Records the outcome of each lease (new `at`, `retry`, `blocked`) and
   * clears the lease columns, but only for rows still leased by `by`.
   * Best effort: failures are logged and rolled back, never thrown, because
   * an unreleased lease expires on its own through `leased_until`.
   *
   * @param leases leases to acknowledge
   */
  async ack(leases: Lease[]): Promise<void> {
    const client = await this._pool.connect();

    try {
      await client.query("BEGIN");
      for (const { stream, by, at, retry, block } of leases) {
        await client.query(
          `UPDATE ${this._streams}
          SET
            at = $3::integer,
            retry = $4::integer,
            blocked = $5::boolean,
            leased_by = NULL,
            leased_at = NULL,
            leased_until = NULL
          WHERE
            stream = $1::text
            AND leased_by = $2::uuid`,
          [stream, by, at, retry, block]
        );
      }
      await client.query("COMMIT");
    } catch (error) {
      // leased_until fallback: keep this non-fatal, but leave a trace.
      logger.error(error);
      await client.query("ROLLBACK").catch(() => {});
    } finally {
      client.release();
    }
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc