• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 23367282166

20 Mar 2026 11:56PM UTC coverage: 99.931%. First build
23367282166

Pull #37

github

web-flow
Merge ffaf76691 into 0d958cf95
Pull Request #37: New: SqliteEventStorage

503 of 546 branches covered (92.12%)

94 of 95 new or added lines in 4 files covered. (98.95%)

1439 of 1440 relevant lines covered (99.93%)

35.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.91
/src/sqlite/SqliteEventStorage.ts
1
import { randomUUID } from 'node:crypto';
7✔
2
import type { Statement, Database } from 'better-sqlite3';
3
import type {
4
        IIdentifierProvider,
5
        IEvent,
6
        IEventSet,
7
        EventQueryAfter,
8
        IEventStorageReader,
9
        IEventStream,
10
        Identifier,
11
        IDispatchPipelineProcessor,
12
        DispatchPipelineBatch,
13
        DispatchPipelineEnvelope,
14
        AggregateEventsQueryParams
15
} from '../interfaces/index.ts';
16
import { assertString, parseSagaId } from '../utils/index.ts';
7✔
17
import { ConcurrencyError } from '../errors/index.ts';
7✔
18
import { AbstractSqliteAccessor } from './AbstractSqliteAccessor.ts';
7✔
19
import { guid, bufferToGuid } from './utils/index.ts';
7✔
20

21
/**
 * Shape of a row selected from `tbl_events` by the read queries of this module
 * (`id, aggregate_id, aggregate_version, type, data, meta, rowid`).
 */
type EventRow = {
        /** Event identifier, stored as a BLOB */
        id: Buffer;

        /** JSON text of the event body (parsed with `JSON.parse` when the event is rebuilt) */
        data: string;

        /** JSON text of the commit metadata, `null` when nothing was stored */
        meta: string | null;

        /** Event type name */
        type: string;

        /** Aggregate identifier as a BLOB, `null` when the event has no aggregate */
        aggregate_id: Buffer | null;

        /** Aggregate version, `null` when the event has no aggregate version */
        aggregate_version: number | null;

        /** SQLite rowid of the record; the read queries order and slice by it */
        rowid: number;
};

/** Shape of a row selected from `tbl_event_sagas` (`saga_descriptor, origin_id`) */
type SagaRefRow = {
        /** Identifier of the event the saga originated from, stored as a BLOB */
        origin_id: Buffer;

        /** Saga descriptor the event is linked to */
        saga_descriptor: string;
};

/** Result of the "rowid by event id" lookup: only the `rowid` column */
type RowidRow = Pick<EventRow, 'rowid'>;
39

40
/**
 * Collect the envelope fields that should be stored as event metadata:
 * everything on the envelope except `event` and `ignoreConcurrencyError`.
 *
 * @param envelope Pipeline envelope; may be `undefined` (e.g. the first item of an empty batch)
 * @returns The remaining envelope fields, or `null` when the envelope is missing
 *   or carries nothing besides `event` / `ignoreConcurrencyError`
 */
function extractMeta(envelope: DispatchPipelineEnvelope | undefined): Record<string, unknown> | null {
        // Destructuring `undefined` would throw a TypeError; no envelope means no metadata
        if (!envelope)
                return null;

        const { event: _event, ignoreConcurrencyError: _ignore, ...rest } = envelope;

        return Object.keys(rest).length > 0 ? rest : null;
}
47

48
/**
 * Event storage backed by a SQLite database, accessed through better-sqlite3 prepared statements.
 *
 * Events are kept in `tbl_events`. `tbl_event_sagas` links an event to the sagas it belongs to
 * (saga descriptor + id of the event the saga originated from). All read queries return events
 * ordered by `rowid`.
 */
export class SqliteEventStorage extends AbstractSqliteAccessor implements
        IEventStorageReader,
        IIdentifierProvider,
        IDispatchPipelineProcessor {

        // Prepared statements; assigned in `initialize()`
        #insertEventQuery!: Statement<[Buffer, Buffer | null, number | null, string, string, string | null]>;
        #insertSagaRefQuery!: Statement<[string, Buffer, Buffer]>;
        #checkConcurrencyQuery!: Statement<[Buffer, number]>;
        #getRowidQuery!: Statement<[Buffer], RowidRow>;
        #getAggregateEventsQuery!: Statement<[{
                aggregateId: Buffer;
                afterVersion: number | null;
                eventTypes: string | null;
                tail: string | null;
        }], EventRow>;
        #getSagaEventsQuery!: Statement<[{
                sagaDescriptor: string;
                originId: Buffer;
                originRowid: number;
                beforeRowid: number;
        }], EventRow>;
        #getSagaRefsQuery!: Statement<[Buffer], SagaRefRow>;
        #getEventsByTypesQuery!: Statement<[number], EventRow>;

        /**
         * Create the tables and indexes (if they do not exist yet) and prepare all statements.
         *
         * @param db Open better-sqlite3 database
         */
        protected initialize(db: Database) {
                db.pragma('foreign_keys = ON');

                db.exec(`CREATE TABLE IF NOT EXISTS tbl_events (
                        id BLOB PRIMARY KEY,
                        aggregate_id BLOB,
                        aggregate_version INTEGER,
                        type TEXT NOT NULL,
                        data JSON NOT NULL,
                        meta JSON
                )`);

                db.exec(`CREATE TABLE IF NOT EXISTS tbl_event_sagas (
                        saga_descriptor TEXT NOT NULL,
                        origin_id BLOB NOT NULL,
                        event_id BLOB NOT NULL,
                        PRIMARY KEY (saga_descriptor, origin_id, event_id),
                        FOREIGN KEY (event_id) REFERENCES tbl_events(id)
                )`);

                db.exec('CREATE INDEX IF NOT EXISTS idx_events_aggregate_id ON tbl_events (aggregate_id)');
                db.exec('CREATE INDEX IF NOT EXISTS idx_event_sagas_event_id ON tbl_event_sagas (event_id)');

                this.#insertEventQuery = db.prepare(`
                        INSERT INTO tbl_events (id, aggregate_id, aggregate_version, type, data, meta)
                        VALUES (?, ?, ?, ?, ?, ?)
                `);

                this.#insertSagaRefQuery = db.prepare(`
                        INSERT INTO tbl_event_sagas (saga_descriptor, origin_id, event_id)
                        VALUES (?, ?, ?)
                `);

                // Finds an already stored event with the same aggregate id and version
                this.#checkConcurrencyQuery = db.prepare(`
                        SELECT 1 FROM tbl_events
                        WHERE aggregate_id = ? AND aggregate_version = ?
                        LIMIT 1
                `);

                this.#getRowidQuery = db.prepare(`
                        SELECT rowid FROM tbl_events WHERE id = ?
                `);

                // The `tail` CTE picks the last matching event of the aggregate, so that with
                // `@tail = 'last'` it is returned even when its type is not listed in `@eventTypes`
                this.#getAggregateEventsQuery = db.prepare(`
                        WITH tail AS (
                                SELECT id AS tail_id
                                FROM tbl_events
                                WHERE aggregate_id = @aggregateId
                                        AND (@afterVersion IS NULL OR aggregate_version > @afterVersion)
                                ORDER BY rowid DESC
                                LIMIT 1
                        )
                        SELECT e.id, e.aggregate_id, e.aggregate_version, e.type, e.data, e.meta, e.rowid
                        FROM tbl_events e, tail
                        WHERE e.aggregate_id = @aggregateId
                                AND (@afterVersion IS NULL OR e.aggregate_version > @afterVersion)
                                AND (
                                        @eventTypes IS NULL
                                        OR e.type IN (SELECT value FROM json_each(@eventTypes))
                                        OR (@tail = 'last' AND e.id = tail.tail_id)
                                )
                        ORDER BY e.rowid
                `);

                // Events in the rowid range [originRowid, beforeRowid) that are either the saga
                // origin event itself or are linked to the saga in tbl_event_sagas
                this.#getSagaEventsQuery = db.prepare(`
                        SELECT e.id, e.aggregate_id, e.aggregate_version, e.type, e.data, e.meta, e.rowid
                        FROM tbl_events e
                        LEFT JOIN tbl_event_sagas sr
                                ON sr.event_id = e.id
                                AND sr.saga_descriptor = @sagaDescriptor
                                AND sr.origin_id = @originId
                        WHERE e.rowid >= @originRowid AND e.rowid < @beforeRowid
                                AND (e.id = @originId OR sr.event_id IS NOT NULL)
                        ORDER BY e.rowid
                `);

                this.#getSagaRefsQuery = db.prepare(`
                        SELECT saga_descriptor, origin_id
                        FROM tbl_event_sagas
                        WHERE event_id = ?
                `);

                // Selects every event after a given rowid; filtering by type is done by the caller
                this.#getEventsByTypesQuery = db.prepare(`
                        SELECT id, aggregate_id, aggregate_version, type, data, meta, rowid
                        FROM tbl_events
                        WHERE rowid > ?
                        ORDER BY rowid
                `);
        }

        /**
         * Generate a new identifier: a random UUID with the dashes removed.
         */
        // eslint-disable-next-line class-methods-use-this
        getNewId(): string {
                return randomUUID().replaceAll('-', '');
        }

        /**
         * Persist a set of events, together with their saga links, in a single transaction.
         *
         * @param events Events to store
         * @param options.ignoreConcurrencyError When truthy, the duplicate aggregate version check is skipped
         * @param options.meta Metadata stored as JSON with every event of the set
         * @returns The same `events` that were passed in
         * @throws {ConcurrencyError} When an event with the same aggregate id and version is already stored
         */
        async commitEvents(events: IEventSet, options?: {
                ignoreConcurrencyError?: boolean;
                meta?: Record<string, unknown> | null;
        }): Promise<IEventSet> {
                await this.assertConnection();

                type EventWithSagas = IEvent & { sagaOrigins?: Record<string, string> };
                const metaJson = options?.meta ? JSON.stringify(options.meta) : null;
                const checkConcurrency = !options?.ignoreConcurrencyError;

                this.db!.transaction(() => {
                        for (const event of events) {
                                // Converted once, used for both the concurrency check and the insert
                                const aggregateIdBuf = event.aggregateId !== undefined
                                        ? guid(String(event.aggregateId))
                                        : null;

                                if (checkConcurrency
                                        && aggregateIdBuf !== null
                                        && event.aggregateVersion !== undefined) {
                                        const conflict = this.#checkConcurrencyQuery.get(
                                                aggregateIdBuf,
                                                event.aggregateVersion
                                        );
                                        if (conflict)
                                                throw new ConcurrencyError(`Duplicate aggregateVersion ${event.aggregateVersion} for aggregate ${event.aggregateId}`);
                                }

                                // `id` and `sagaOrigins` go to dedicated columns/table, the rest is stored as JSON
                                const { sagaOrigins, id, ...eventData } = event as EventWithSagas;
                                const eventId = guid(String(id));

                                this.#insertEventQuery.run(
                                        eventId,
                                        aggregateIdBuf,
                                        event.aggregateVersion ?? null,
                                        event.type,
                                        JSON.stringify(eventData),
                                        metaJson
                                );

                                if (sagaOrigins) {
                                        for (const [descriptor, originId] of Object.entries(sagaOrigins)) {
                                                this.#insertSagaRefQuery.run(
                                                        descriptor,
                                                        guid(originId),
                                                        eventId
                                                );
                                        }
                                }
                        }
                })();

                return events;
        }

        /**
         * Stream events of one aggregate, ordered by rowid.
         *
         * @param aggregateId Aggregate identifier
         * @param options.snapshot When given, only events with a version greater than `snapshot.aggregateVersion` are returned
         * @param options.eventTypes When given, only events of these types are returned
         * @param options.tail When `'last'`, the last event of the aggregate is returned even if its type is not in `eventTypes`
         */
        async* getAggregateEvents(aggregateId: Identifier, options?: AggregateEventsQueryParams): IEventStream {
                await this.assertConnection();

                const rows = this.#getAggregateEventsQuery.all({
                        aggregateId: guid(String(aggregateId)),
                        afterVersion: options?.snapshot?.aggregateVersion ?? null,
                        // passed as a JSON array and unpacked with json_each() in the query
                        eventTypes: options?.eventTypes
                                ? JSON.stringify(options.eventTypes)
                                : null,
                        tail: options?.tail ?? null
                });

                for (const row of rows)
                        yield this.#reconstructEvent(row);
        }

        /**
         * Stream events of a saga that were stored before `beforeEvent`, starting with the saga origin event.
         *
         * @param sagaId Saga identifier, parsed into saga descriptor and origin event id
         * @param beforeEvent Event that marks the (exclusive) end of the range; its `sagaOrigins` must refer to `sagaId`
         * @throws {TypeError} When `beforeEvent.sagaOrigins` does not match `sagaId`
         * @throws {Error} When the origin event or `beforeEvent` is not found in the storage
         */
        async* getSagaEvents(sagaId: Identifier, { beforeEvent }: { beforeEvent: IEvent }): IEventStream {
                await this.assertConnection();

                assertString(beforeEvent?.id, 'beforeEvent.id');

                const { sagaDescriptor, originEventId } = parseSagaId(sagaId);
                if (beforeEvent.sagaOrigins?.[sagaDescriptor] !== originEventId)
                        throw new TypeError('beforeEvent.sagaOrigins does not match sagaId');

                const originId = guid(originEventId);

                const originRowid = this.#getRowidQuery.get(originId);
                if (!originRowid)
                        throw new Error(`origin event ${originEventId} not found`);

                const beforeRowid = this.#getRowidQuery.get(guid(String(beforeEvent.id)));
                if (!beforeRowid)
                        throw new Error(`beforeEvent ${beforeEvent.id} not found`);

                const rows = this.#getSagaEventsQuery.all({
                        sagaDescriptor,
                        originId,
                        originRowid: originRowid.rowid,
                        beforeRowid: beforeRowid.rowid
                });

                for (const row of rows)
                        yield this.#reconstructEvent(row);
        }

        /**
         * Stream events of the given types, ordered by rowid.
         *
         * @param eventTypes Types of events to return
         * @param options.afterEvent When given, only events stored after it (greater rowid) are returned.
         *   If no record with that id exists, the scan starts from rowid 0.
         */
        async* getEventsByTypes(eventTypes: Readonly<string[]>, options?: EventQueryAfter): IEventStream {
                await this.assertConnection();

                const lastEventId = options?.afterEvent?.id;
                if (options?.afterEvent)
                        assertString(options.afterEvent.id, 'options.afterEvent.id');

                let afterRowid = 0;
                if (lastEventId) {
                        const row = this.#getRowidQuery.get(guid(String(lastEventId)));
                        if (row)
                                afterRowid = row.rowid;
                }

                // The query returns events of all types; the type filter is applied here
                const rows = this.#getEventsByTypesQuery.all(afterRowid);

                for (const row of rows) {
                        if (eventTypes.includes(row.type))
                                yield this.#reconstructEvent(row);
                }
        }

        /**
         * Dispatch pipeline step: store the events of the batch.
         *
         * Metadata and the `ignoreConcurrencyError` flag are taken from the first envelope
         * and applied to the whole batch.
         *
         * @param batch Envelopes; each must carry an `event`
         * @returns The same `batch`
         * @throws {Error} When an envelope has no `event`
         */
        async process(batch: DispatchPipelineBatch): Promise<DispatchPipelineBatch> {
                const events: IEvent[] = [];
                for (const item of batch) {
                        if (!item.event)
                                throw new Error('Event batch does not contain `event`');
                        events.push(item.event);
                }

                // An empty batch has nothing to store; without this guard
                // extractMeta() would be handed `undefined` and fail on destructuring
                const first = batch[0];
                if (!first)
                        return batch;

                await this.commitEvents(events, {
                        ignoreConcurrencyError: first.ignoreConcurrencyError,
                        meta: extractMeta(first)
                });

                return batch;
        }

        /**
         * Load the saga links of an event as a `{ [sagaDescriptor]: originEventId }` map.
         *
         * @param eventIdBuf Event id as stored in the database
         * @returns Map of saga origins; empty object when the event has no saga links
         */
        #getSagaOriginsForEvent(eventIdBuf: Buffer): Record<string, string> {
                const sagaOrigins: Record<string, string> = {};
                for (const ref of this.#getSagaRefsQuery.all(eventIdBuf))
                        sagaOrigins[ref.saga_descriptor] = bufferToGuid(ref.origin_id);

                return sagaOrigins;
        }

        /**
         * Rebuild an event object from a database row: the id comes from the `id` column,
         * the remaining fields from the JSON in `data`, `sagaOrigins` from tbl_event_sagas.
         *
         * @param row Row selected from tbl_events
         * @returns Event; `sagaOrigins` is set only when the event has saga links
         */
        #reconstructEvent(row: EventRow): Readonly<IEvent> {
                const data = JSON.parse(row.data);
                const sagaOrigins = this.#getSagaOriginsForEvent(row.id);

                const event: IEvent = {
                        id: bufferToGuid(row.id),
                        ...data
                };

                if (Object.keys(sagaOrigins).length > 0)
                        event.sagaOrigins = sagaOrigins;

                return event;
        }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc