• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 23323662297

20 Mar 2026 12:31AM UTC coverage: 99.931%. First build
23323662297

Pull #37

github

web-flow
Merge 23fcf97b2 into a33b40157
Pull Request #37: New: SqliteEventStorage

503 of 546 branches covered (92.12%)

94 of 95 new or added lines in 4 files covered. (98.95%)

1439 of 1440 relevant lines covered (99.93%)

35.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.91
/src/sqlite/SqliteEventStorage.ts
1
import { randomUUID } from 'node:crypto';
7✔
2
import type { Statement, Database } from 'better-sqlite3';
3
import type {
4
        IIdentifierProvider,
5
        IEvent,
6
        IEventSet,
7
        EventQueryAfter,
8
        IEventStorageReader,
9
        IEventStream,
10
        Identifier,
11
        IDispatchPipelineProcessor,
12
        DispatchPipelineBatch,
13
        DispatchPipelineEnvelope,
14
        AggregateEventsQueryParams
15
} from '../interfaces/index.ts';
16
import { assertString, parseSagaId } from '../utils/index.ts';
7✔
17
import { ConcurrencyError } from '../errors/index.ts';
7✔
18
import { AbstractSqliteAccessor } from './AbstractSqliteAccessor.ts';
7✔
19
import { guid, bufferToGuid } from './utils/index.ts';
7✔
20

21
/**
 * Raw row read from `tbl_events`, together with the implicit SQLite `rowid`
 * that the storage uses to order event streams.
 */
interface EventRow {

        /** Event identifier (BLOB primary key) */
        id: Buffer;

        /** Identifier of the owning aggregate, NULL when the event has no aggregate */
        aggregate_id: Buffer | null;

        /** Aggregate version assigned to the event, NULL when not provided */
        aggregate_version: number | null;

        /** Event type name */
        type: string;

        /** JSON text of the event body */
        data: string;

        /** JSON text of the commit metadata, NULL when none was supplied */
        meta: string | null;

        /** SQLite row identifier of the record */
        rowid: number;
}
30

31
/** Raw row read from `tbl_event_sagas`: one link between an event and a saga. */
interface SagaRefRow {

        /** Descriptor of the saga the event belongs to */
        saga_descriptor: string;

        /** Identifier of the event the saga originated from (stored as BLOB) */
        origin_id: Buffer;
}
35

36
/** Result of a `SELECT rowid ...` lookup against `tbl_events`. */
interface RowidRow {

        /** SQLite row identifier of the matched event */
        rowid: number;
}
39

40
/**
 * Collects the envelope fields that should be stored as event metadata:
 * everything except `event` and `ignoreConcurrencyError`.
 *
 * @param envelope Dispatch pipeline envelope; `undefined` is tolerated so that
 *   callers may pass the first element of a possibly empty batch
 * @returns Remaining envelope fields, or `null` when the envelope is missing
 *   or carries no additional fields
 */
function extractMeta(envelope: DispatchPipelineEnvelope | undefined): Record<string, unknown> | null {
        // Destructuring `undefined` would throw, which happens when the batch is empty
        if (!envelope)
                return null;

        const { event: _event, ignoreConcurrencyError: _ignore, ...meta } = envelope;

        return Object.keys(meta).length > 0 ? meta : null;
}
47

48
/**
 * Event storage that persists events in a SQLite database through `better-sqlite3`.
 *
 * Events live in `tbl_events`; saga membership is tracked in `tbl_event_sagas`
 * as (saga descriptor, origin event id, event id) triples. String identifiers are
 * converted to `Buffer`s with `guid()` before being bound to a statement and
 * restored with `bufferToGuid()` when rows are read back. All event streams are
 * ordered by the SQLite `rowid` of the event rows.
 */
export class SqliteEventStorage extends AbstractSqliteAccessor implements
        IEventStorageReader,
        IIdentifierProvider,
        IDispatchPipelineProcessor {

        /** Inserts a single row into `tbl_events` */
        #insertEventQuery!: Statement;

        /** Inserts a single event-to-saga link into `tbl_event_sagas` */
        #insertSagaRefQuery!: Statement;

        /** Finds an existing event with the given aggregate id and aggregate version */
        #checkConcurrencyQuery!: Statement<[Buffer, number]>;

        /** Resolves an event id to its `rowid` */
        #getRowidQuery!: Statement<[Buffer]>;

        /** Selects events of one aggregate (see the query text for filtering rules) */
        #getAggregateEventsQuery!: Statement;

        /** Selects events linked to a saga within a half-open `rowid` interval */
        #getSagaEventsQuery!: Statement;

        /** Selects all saga links of one event */
        #getSagaRefsQuery!: Statement;

        /** Selects events of the given types stored after a given `rowid` */
        #getEventsByTypesQuery!: Statement;

        /**
         * Creates the tables and indexes when they do not exist yet and prepares
         * every statement used by this class.
         *
         * NOTE(review): presumably invoked by `AbstractSqliteAccessor` once the
         * connection is available - confirm against the base class.
         *
         * @param db Open database connection
         */
        protected initialize(db: Database) {
                db.pragma('foreign_keys = ON');

                db.exec(`CREATE TABLE IF NOT EXISTS tbl_events (
                        id BLOB PRIMARY KEY,
                        aggregate_id BLOB,
                        aggregate_version INTEGER,
                        type TEXT NOT NULL,
                        data JSON NOT NULL,
                        meta JSON
                )`);

                db.exec(`CREATE TABLE IF NOT EXISTS tbl_event_sagas (
                        saga_descriptor TEXT NOT NULL,
                        origin_id BLOB NOT NULL,
                        event_id BLOB NOT NULL,
                        PRIMARY KEY (saga_descriptor, origin_id, event_id),
                        FOREIGN KEY (event_id) REFERENCES tbl_events(id)
                )`);

                db.exec('CREATE INDEX IF NOT EXISTS idx_events_aggregate_id ON tbl_events (aggregate_id)');
                db.exec('CREATE INDEX IF NOT EXISTS idx_event_sagas_event_id ON tbl_event_sagas (event_id)');

                this.#insertEventQuery = db.prepare(`
                        INSERT INTO tbl_events (id, aggregate_id, aggregate_version, type, data, meta)
                        VALUES (?, ?, ?, ?, ?, ?)
                `);

                this.#insertSagaRefQuery = db.prepare(`
                        INSERT INTO tbl_event_sagas (saga_descriptor, origin_id, event_id)
                        VALUES (?, ?, ?)
                `);

                this.#checkConcurrencyQuery = db.prepare(`
                        SELECT 1 FROM tbl_events
                        WHERE aggregate_id = ? AND aggregate_version = ?
                        LIMIT 1
                `);

                this.#getRowidQuery = db.prepare(`
                        SELECT rowid FROM tbl_events WHERE id = ?
                `);

                // `tail` holds the most recently stored event of the aggregate (after the
                // snapshot version, if any). The main query returns events whose type is
                // listed in @eventTypes (or all events when @eventTypes is NULL) and, when
                // @tail = 'last', also the tail event regardless of its type.
                // The join with `tail` yields nothing only when the aggregate has no
                // events in range, in which case the main query would be empty anyway.
                this.#getAggregateEventsQuery = db.prepare(`
                        WITH tail AS (
                                SELECT id AS tail_id
                                FROM tbl_events
                                WHERE aggregate_id = @aggregateId
                                        AND (@afterVersion IS NULL OR aggregate_version > @afterVersion)
                                ORDER BY rowid DESC
                                LIMIT 1
                        )
                        SELECT e.id, e.aggregate_id, e.aggregate_version, e.type, e.data, e.meta, e.rowid
                        FROM tbl_events e, tail
                        WHERE e.aggregate_id = @aggregateId
                                AND (@afterVersion IS NULL OR e.aggregate_version > @afterVersion)
                                AND (
                                        @eventTypes IS NULL
                                        OR e.type IN (SELECT value FROM json_each(@eventTypes))
                                        OR (@tail = 'last' AND e.id = tail.tail_id)
                                )
                        ORDER BY e.rowid
                `);

                this.#getSagaEventsQuery = db.prepare(`
                        SELECT e.id, e.aggregate_id, e.aggregate_version, e.type, e.data, e.meta, e.rowid
                        FROM tbl_events e
                        JOIN tbl_event_sagas sr ON sr.event_id = e.id
                        WHERE sr.saga_descriptor = ? AND sr.origin_id = ?
                                AND e.rowid >= ? AND e.rowid < ?
                        ORDER BY e.rowid
                `);

                this.#getSagaRefsQuery = db.prepare(`
                        SELECT saga_descriptor, origin_id
                        FROM tbl_event_sagas
                        WHERE event_id = ?
                `);

                // Types are passed as a JSON array and matched in SQL, so rows of
                // other types are never loaded into memory
                this.#getEventsByTypesQuery = db.prepare(`
                        SELECT id, aggregate_id, aggregate_version, type, data, meta, rowid
                        FROM tbl_events
                        WHERE rowid > ?
                                AND type IN (SELECT value FROM json_each(?))
                        ORDER BY rowid
                `);
        }

        /**
         * Generates a new identifier: a random UUID with the dashes removed.
         */
        // eslint-disable-next-line class-methods-use-this
        getNewId(): string {
                return randomUUID().replaceAll('-', '');
        }

        /**
         * Stores the events, together with their saga links, in a single transaction.
         *
         * Each event is serialized to the `data` column without its `id` and
         * `sagaOrigins` fields; those are kept in dedicated columns / rows.
         * `options.meta` is serialized once and written to every event of the set.
         *
         * @param events Events to store
         * @param options.ignoreConcurrencyError Skips the duplicate aggregate version check
         * @param options.meta Metadata stored next to every event
         * @returns The same `events` object that was passed in
         * @throws {ConcurrencyError} When an event with the same aggregate id and
         *   aggregate version is already stored (unless the check is disabled)
         */
        async commitEvents(events: IEventSet, options?: {
                ignoreConcurrencyError?: boolean;
                meta?: Record<string, unknown> | null;
        }): Promise<IEventSet> {
                await this.assertConnection();

                type EventWithSagas = IEvent & { sagaOrigins?: Record<string, string> };
                const metaJson = options?.meta ? JSON.stringify(options.meta) : null;
                const checkConcurrency = !options?.ignoreConcurrencyError;

                // An error thrown inside the callback makes better-sqlite3 roll the
                // transaction back, so the set is stored completely or not at all
                this.db!.transaction(() => {
                        for (const event of events) {
                                // Converted once, used by both the concurrency check and the insert
                                const aggregateId = event.aggregateId !== undefined ?
                                        guid(String(event.aggregateId)) :
                                        null;

                                if (checkConcurrency
                                        && aggregateId !== null
                                        && event.aggregateVersion !== undefined) {
                                        const conflict = this.#checkConcurrencyQuery.get(
                                                aggregateId,
                                                event.aggregateVersion
                                        );
                                        if (conflict)
                                                throw new ConcurrencyError(`Duplicate aggregateVersion ${event.aggregateVersion} for aggregate ${event.aggregateId}`);
                                }

                                const eventId = guid(String(event.id));
                                const { sagaOrigins, id: _id, ...eventData } = event as EventWithSagas;

                                this.#insertEventQuery.run(
                                        eventId,
                                        aggregateId,
                                        event.aggregateVersion ?? null,
                                        event.type,
                                        JSON.stringify(eventData),
                                        metaJson
                                );

                                if (sagaOrigins) {
                                        for (const [descriptor, originId] of Object.entries(sagaOrigins)) {
                                                this.#insertSagaRefQuery.run(
                                                        descriptor,
                                                        guid(originId),
                                                        eventId
                                                );
                                        }
                                }
                        }
                })();

                return events;
        }

        /**
         * Streams the events of one aggregate in the order they were stored.
         *
         * @param aggregateId Aggregate identifier
         * @param options.snapshot When given, only events with an aggregate version
         *   greater than `snapshot.aggregateVersion` are returned
         * @param options.eventTypes When given, only events of these types are returned
         * @param options.tail When `'last'`, the most recently stored event of the
         *   aggregate is returned even if its type is not listed in `eventTypes`
         */
        async* getAggregateEvents(aggregateId: Identifier, options?: AggregateEventsQueryParams): IEventStream {
                await this.assertConnection();

                const rows = this.#getAggregateEventsQuery.all({
                        aggregateId: guid(String(aggregateId)),
                        afterVersion: options?.snapshot?.aggregateVersion ?? null,
                        eventTypes: options?.eventTypes
                                ? JSON.stringify(options.eventTypes)
                                : null,
                        tail: options?.tail ?? null
                }) as EventRow[];

                for (const row of rows)
                        yield this.#reconstructEvent(row);
        }

        /**
         * Streams the events linked to a saga, starting with the saga origin event
         * and ending right before `beforeEvent` (which is not included).
         *
         * @param sagaId Saga identifier, parsed into a descriptor and an origin event id
         * @param beforeEvent Stored event that marks the exclusive end of the stream;
         *   its `sagaOrigins` must reference the same saga
         * @throws {TypeError} When `beforeEvent.sagaOrigins` does not match `sagaId`
         * @throws {Error} When the origin event or `beforeEvent` is not found in the storage
         */
        async* getSagaEvents(sagaId: Identifier, { beforeEvent }: { beforeEvent: IEvent }): IEventStream {
                await this.assertConnection();

                assertString(beforeEvent?.id, 'beforeEvent.id');

                const { sagaDescriptor, originEventId } = parseSagaId(sagaId);
                if (beforeEvent.sagaOrigins?.[sagaDescriptor] !== originEventId)
                        throw new TypeError('beforeEvent.sagaOrigins does not match sagaId');

                const originRowid = this.#getRowidQuery.get(guid(originEventId)) as RowidRow | undefined;
                if (!originRowid)
                        throw new Error(`origin event ${originEventId} not found`);

                const beforeRowid = this.#getRowidQuery.get(guid(String(beforeEvent.id))) as RowidRow | undefined;
                if (!beforeRowid)
                        throw new Error(`beforeEvent ${beforeEvent.id} not found`);

                // Half-open rowid interval: [origin event, beforeEvent)
                const rows = this.#getSagaEventsQuery.all(
                        sagaDescriptor,
                        guid(originEventId),
                        originRowid.rowid,
                        beforeRowid.rowid
                ) as EventRow[];

                for (const row of rows)
                        yield this.#reconstructEvent(row);
        }

        /**
         * Streams events of the given types in the order they were stored.
         *
         * @param eventTypes Event types to return
         * @param options.afterEvent When given, only events stored after this event are returned
         */
        async* getEventsByTypes(eventTypes: Readonly<string[]>, options?: EventQueryAfter): IEventStream {
                await this.assertConnection();

                const lastEventId = options?.afterEvent?.id;
                if (options?.afterEvent)
                        assertString(options.afterEvent.id, 'options.afterEvent.id');

                // NOTE(review): when `afterEvent` is not found in the storage, the offset
                // stays 0 and the stream starts from the very first event - confirm
                // this is intended rather than an error condition
                let afterRowid = 0;
                if (lastEventId) {
                        const row = this.#getRowidQuery.get(guid(String(lastEventId))) as RowidRow | undefined;
                        if (row)
                                afterRowid = row.rowid;
                }

                const rows = this.#getEventsByTypesQuery.all(
                        afterRowid,
                        JSON.stringify(eventTypes)
                ) as EventRow[];

                for (const row of rows)
                        yield this.#reconstructEvent(row);
        }

        /**
         * Dispatch pipeline step: stores the events of the batch.
         *
         * `ignoreConcurrencyError` and the metadata are taken from the first
         * envelope and applied to the whole batch. An empty batch is returned
         * untouched.
         *
         * @param batch Envelopes to process; every envelope must carry an `event`
         * @returns The same `batch` object that was passed in
         * @throws {Error} When an envelope does not contain an `event`
         */
        async process(batch: DispatchPipelineBatch): Promise<DispatchPipelineBatch> {
                const events: IEvent[] = [];
                for (const item of batch) {
                        if (!item.event)
                                throw new Error('Event batch does not contain `event`');
                        events.push(item.event);
                }

                // Nothing to store, and no first envelope to read the settings from
                if (events.length === 0)
                        return batch;

                const [first] = batch;
                const meta = extractMeta(first);
                const ignoreConcurrencyError = first.ignoreConcurrencyError;

                await this.commitEvents(events, { ignoreConcurrencyError, meta });

                return batch;
        }

        /**
         * Loads the saga links of an event.
         *
         * @param eventIdBuf Event identifier in its stored (Buffer) form
         * @returns Map of saga descriptor to origin event id; empty when the event has no links
         */
        #getSagaOriginsForEvent(eventIdBuf: Buffer): Record<string, string> {
                const refs = this.#getSagaRefsQuery.all(eventIdBuf) as SagaRefRow[];

                const sagaOrigins: Record<string, string> = {};
                for (const ref of refs)
                        sagaOrigins[ref.saga_descriptor] = bufferToGuid(ref.origin_id);

                return sagaOrigins;
        }

        /**
         * Rebuilds an event object from a stored row: parses the JSON body, restores
         * the `id` from the row key and attaches `sagaOrigins` when the event has any.
         *
         * @param row Row read from `tbl_events`
         */
        #reconstructEvent(row: EventRow): Readonly<IEvent> {
                const data = JSON.parse(row.data);
                const sagaOrigins = this.#getSagaOriginsForEvent(row.id);

                const event: IEvent = {
                        id: bufferToGuid(row.id),
                        ...data
                };

                // The field is omitted, not set to an empty object, when there are no links
                if (Object.keys(sagaOrigins).length > 0)
                        event.sagaOrigins = sagaOrigins;

                return event;
        }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc