• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Rotorsoft / act-root / 15800362180

21 Jun 2025 10:45PM UTC coverage: 95.007% (+3.4%) from 91.629%
15800362180

push

github

rotorsoft
chore: reduce branch coverage

233 of 260 branches covered (89.62%)

Branch coverage included in aggregate %.

414 of 421 relevant lines covered (98.34%)

16.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.74
/libs/act-pg/src/PostgresStore.ts
1
import type {
2
  Committed,
3
  EventMeta,
4
  Lease,
5
  Message,
6
  Query,
7
  Schemas,
8
  Store,
9
} from "@rotorsoft/act";
10
import { ConcurrencyError, SNAP_EVENT, logger } from "@rotorsoft/act";
11
import pg from "pg";
12
import { config } from "./config.js";
13
import { seed_store } from "./seed.js";
14
import { dateReviver } from "./utils.js";
15

16
const { Pool, types } = pg;
3✔
17
types.setTypeParser(types.builtins.JSONB, (val) =>
3✔
18
  JSON.parse(val, dateReviver)
180✔
19
);
20

21
export class PostgresStore implements Store {
22
  private _pool = new Pool(config.pg);
3✔
23

24
  constructor(
25
    readonly table: string,
3✔
26
    readonly leaseMillis = 30_000
3✔
27
  ) {}
28
  async dispose() {
29
    await this._pool.end();
1✔
30
  }
31

32
  async seed() {
33
    const seed = seed_store(this.table);
2✔
34
    await this._pool.query(seed);
2✔
35
  }
36

37
  async drop() {
38
    await this._pool.query(`DROP TABLE IF EXISTS "${this.table}"`);
2✔
39
    await this._pool.query(`DROP TABLE IF EXISTS "${this.table}_streams"`);
2✔
40
  }
41

42
  async query<E extends Schemas>(
43
    callback: (event: Committed<E, keyof E>) => void,
44
    query?: Query,
45
    withSnaps = false
13✔
46
  ) {
47
    const {
48
      stream,
49
      names,
50
      before,
51
      after,
52
      limit,
53
      created_before,
54
      created_after,
55
      backward,
56
      correlation,
57
    } = query || {};
15!
58

59
    let sql = `SELECT * FROM "${this.table}" WHERE`;
15✔
60
    const values: any[] = [];
15✔
61

62
    if (withSnaps)
15✔
63
      sql = sql.concat(
2✔
64
        ` id>=COALESCE((SELECT id
65
            FROM "${this.table}"
66
            WHERE stream='${stream}' AND name='${SNAP_EVENT}'
67
            ORDER BY id DESC LIMIT 1), 0)
68
            AND stream='${stream}'`
69
      );
13!
70
    else if (query) {
71
      if (typeof after !== "undefined") {
13✔
72
        values.push(after);
8✔
73
        sql = sql.concat(" id>$1");
8✔
74
      } else sql = sql.concat(" id>-1");
5✔
75
      if (stream) {
13✔
76
        values.push(stream);
2✔
77
        sql = sql.concat(` AND stream=$${values.length}`);
2✔
78
      }
79
      if (names && names.length) {
13✔
80
        values.push(names);
1✔
81
        sql = sql.concat(` AND name = ANY($${values.length})`);
1✔
82
      }
83
      if (before) {
13✔
84
        values.push(before);
1✔
85
        sql = sql.concat(` AND id<$${values.length}`);
1✔
86
      }
87
      if (created_after) {
13✔
88
        values.push(created_after.toISOString());
1✔
89
        sql = sql.concat(` AND created>$${values.length}`);
1✔
90
      }
91
      if (created_before) {
13✔
92
        values.push(created_before.toISOString());
1✔
93
        sql = sql.concat(` AND created<$${values.length}`);
1✔
94
      }
95
      if (correlation) {
13✔
96
        values.push(correlation);
1✔
97
        sql = sql.concat(` AND meta->>'correlation'=$${values.length}`);
1✔
98
      }
99
    }
100
    sql = sql.concat(` ORDER BY id ${backward ? "DESC" : "ASC"}`);
15!
101
    if (limit) {
15✔
102
      values.push(limit);
10✔
103
      sql = sql.concat(` LIMIT $${values.length}`);
10✔
104
    }
105

106
    const result = await this._pool.query<Committed<E, keyof E>>(sql, values);
15✔
107
    for (const row of result.rows) callback(row);
63✔
108

109
    return result.rowCount ?? 0;
15!
110
  }
111

112
  async commit<E extends Schemas>(
113
    stream: string,
114
    msgs: Message<E, keyof E>[],
115
    meta: EventMeta,
116
    expectedVersion?: number
117
  ) {
118
    const client = await this._pool.connect();
15✔
119
    let version = -1;
15✔
120
    try {
15✔
121
      await client.query("BEGIN");
15✔
122

123
      const last = await client.query<Committed<E, keyof E>>(
15✔
124
        `SELECT version FROM "${this.table}" WHERE stream=$1 ORDER BY version DESC LIMIT 1`,
125
        [stream]
126
      );
127
      version = last.rowCount ? last.rows[0].version : -1;
15✔
128
      if (expectedVersion && version !== expectedVersion)
15✔
129
        throw new ConcurrencyError(
1✔
130
          version,
131
          msgs as unknown as Message<Schemas, string>[],
132
          expectedVersion
133
        );
134

135
      const committed = await Promise.all(
14✔
136
        msgs.map(async ({ name, data }) => {
137
          version++;
28✔
138
          const sql = `
28✔
139
          INSERT INTO "${this.table}"(name, data, stream, version, meta) 
140
          VALUES($1, $2, $3, $4, $5) RETURNING *`;
141
          const vals = [name, data, stream, version, meta];
28✔
142
          const { rows } = await client.query<Committed<E, keyof E>>(sql, vals);
28✔
143
          return rows.at(0)!;
28✔
144
        })
145
      );
146

147
      await client
14✔
148
        .query(
149
          `
150
            NOTIFY "${this.table}", '${JSON.stringify({
151
              operation: "INSERT",
152
              id: committed[0].name,
153
              position: committed[0].id,
154
            })}';
155
            COMMIT;
156
            `
157
        )
158
        .catch((error) => {
159
          logger.error(error);
1✔
160
          throw new ConcurrencyError(
1✔
161
            version,
162
            msgs as unknown as Message<Schemas, string>[],
163
            expectedVersion || -1
2✔
164
          );
165
        });
166
      return committed;
13✔
167
    } catch (error) {
168
      await client.query("ROLLBACK").catch(() => {});
2✔
169
      throw error;
2✔
170
    } finally {
171
      client.release();
15✔
172
    }
173
  }
174

175
  async fetch<E extends Schemas>(limit: number) {
176
    const { rows } = await this._pool.query<{ stream: string; at: number }>(
6✔
177
      `
178
      SELECT stream, at
179
      FROM "${this.table}_streams"
180
      WHERE blocked=false
181
      ORDER BY at ASC
182
      LIMIT $1::integer
183
      `,
184
      [limit]
185
    );
186

187
    const after = rows.length
6✔
188
      ? rows.reduce((min, r) => Math.min(min, r.at), Number.MAX_SAFE_INTEGER)
11✔
189
      : -1;
190

191
    const events: Committed<E, keyof E>[] = [];
6✔
192
    await this.query<E>((e) => events.push(e), { after, limit });
33✔
193
    return { streams: rows.map(({ stream }) => stream), events };
11✔
194
  }
195

196
  async lease(leases: Lease[]) {
197
    const { by, at } = leases.at(0)!;
6✔
198
    const streams = leases.map(({ stream }) => stream);
7✔
199
    const client = await this._pool.connect();
6✔
200

201
    try {
6✔
202
      await client.query("BEGIN");
6✔
203
      // insert new streams
204
      await client.query(
6✔
205
        `
206
        INSERT INTO "${this.table}_streams" (stream)
207
        SELECT UNNEST($1::text[])
208
        ON CONFLICT (stream) DO NOTHING
209
        `,
210
        [streams]
211
      );
212
      // set leases
213
      const { rows } = await client.query<{
6✔
214
        stream: string;
215
        leased_at: number;
216
        retry: number;
217
      }>(
218
        `
219
        WITH free AS (
220
          SELECT * FROM "${this.table}_streams" 
221
          WHERE stream = ANY($1::text[]) AND (leased_by IS NULL OR leased_until <= NOW())
222
          FOR UPDATE
223
        )
224
        UPDATE "${this.table}_streams" U
225
        SET
226
          leased_by = $2::uuid,
227
          leased_at = $3::integer,
228
          leased_until = NOW() + ($4::integer || ' milliseconds')::interval
229
        FROM free
230
        WHERE U.stream = free.stream
231
        RETURNING U.stream, U.leased_at, U.retry
232
        `,
233
        [streams, by, at, this.leaseMillis]
234
      );
235
      await client.query("COMMIT");
6✔
236

237
      return rows.map(({ stream, leased_at, retry }) => ({
6✔
238
        stream,
239
        by,
240
        at: leased_at,
241
        retry,
242
        block: false,
243
      }));
244
    } catch (error) {
245
      await client.query("ROLLBACK").catch(() => {});
×
246
      throw error;
×
247
    } finally {
248
      client.release();
6✔
249
    }
250
  }
251

252
  async ack(leases: Lease[]) {
253
    const client = await this._pool.connect();
4✔
254

255
    try {
4✔
256
      await client.query("BEGIN");
4✔
257
      for (const { stream, by, at, retry, block } of leases) {
4✔
258
        await client.query(
4✔
259
          `UPDATE "${this.table}_streams"
260
          SET
261
            at = $3::integer,
262
            retry = $4::integer,
263
            blocked = $5::boolean,
264
            leased_by = NULL,
265
            leased_at = NULL,
266
            leased_until = NULL
267
          WHERE
268
            stream = $1::text
269
            AND leased_by = $2::uuid`,
270
          [stream, by, at, retry, block]
271
        );
272
      }
273
      await client.query("COMMIT");
4✔
274
    } catch {
275
      // leased_until fallback
276
      await client.query("ROLLBACK").catch(() => {});
×
277
    } finally {
278
      client.release();
4✔
279
    }
280
  }
281
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc