• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 21717407497

05 Feb 2026 03:26PM UTC coverage: 84.53% (-9.9%) from 94.396%
21717407497

Pull #28

github

web-flow
Merge 025edb883 into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

611 of 939 branches covered (65.07%)

819 of 934 new or added lines in 65 files covered. (87.69%)

59 existing lines in 13 files now uncovered.

1213 of 1435 relevant lines covered (84.53%)

28.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.59
/src/sqlite/SqliteEventLocker.ts
1
import type { Database, Statement } from 'better-sqlite3';
2
import type { IContainer, IEvent, IEventLocker } from '../interfaces/index.ts';
3
import { getEventId } from './utils/index.ts';
8✔
4
import { viewLockTableInit, eventLockTableInit } from './queries/index.ts';
8✔
5
import type { SqliteViewLockerParams } from './SqliteViewLocker.ts';
6
import type { SqliteProjectionDataParams } from './SqliteProjectionDataParams.ts';
7
import { AbstractSqliteAccessor } from './AbstractSqliteAccessor.ts';
8✔
8

9
export type SqliteEventLockerParams =
10
        SqliteProjectionDataParams
11
        & Pick<SqliteViewLockerParams, 'viewLockTableName'>
12
        & {
13

14
                /**
15
                 * (Optional) SQLite table name where event locks are stored
16
                 *
17
                 * @default "tbl_event_lock"
18
                 */
19
                eventLockTableName?: string;
20

21
                /**
22
                 * (Optional) Time-to-live (TTL) duration in milliseconds
23
                 * for which an event remains in the "processing" state until released.
24
                 *
25
                 * @default 15_000
26
                 */
27
                eventLockTtl?: number;
28
        };
29

30
export class SqliteEventLocker extends AbstractSqliteAccessor implements IEventLocker {
8✔
31

32
        #projectionName: string;
33
        #schemaVersion: string;
34
        #viewLockTableName: string;
35
        #eventLockTableName: string;
36
        #eventLockTtl: number;
37

38
        #upsertLastEventQuery!: Statement<[string, string, string], void>;
39
        #getLastEventQuery!: Statement<[string, string], { last_event: string }>;
40
        #lockEventQuery!: Statement<[string, string, Buffer], void>;
41
        #finalizeEventLockQuery!: Statement<[string, string, Buffer], void>;
42

43
        constructor(o: Pick<IContainer, 'viewModelSqliteDb' | 'viewModelSqliteDbFactory'> & SqliteEventLockerParams) {
44
                super(o);
24✔
45

46
                if (!o.projectionName)
24!
NEW
47
                        throw new TypeError('projectionName argument required');
×
48
                if (!o.schemaVersion)
24!
NEW
49
                        throw new TypeError('schemaVersion argument required');
×
50

51
                this.#projectionName = o.projectionName;
24✔
52
                this.#schemaVersion = o.schemaVersion;
24✔
53
                this.#viewLockTableName = o.viewLockTableName ?? 'tbl_view_lock';
24✔
54
                this.#eventLockTableName = o.eventLockTableName ?? 'tbl_event_lock';
24✔
55
                this.#eventLockTtl = o.eventLockTtl ?? 15_000;
24✔
56
        }
57

58
        protected initialize(db: Database) {
59
                db.exec(viewLockTableInit(this.#viewLockTableName));
16✔
60
                db.exec(eventLockTableInit(this.#eventLockTableName));
16✔
61

62
                this.#upsertLastEventQuery = db.prepare(`
16✔
63
                        INSERT INTO ${this.#viewLockTableName} (projection_name, schema_version, last_event)
64
                        VALUES (?, ?, ?)
65
                        ON CONFLICT (projection_name, schema_version)
66
                        DO UPDATE SET
67
                                last_event = excluded.last_event
68
                `);
69

70
                this.#getLastEventQuery = db.prepare(`
16✔
71
                        SELECT
72
                                last_event
73
                        FROM ${this.#viewLockTableName}
74
                        WHERE
75
                                projection_name = ?
76
                                AND schema_version =?
77
                `);
78

79
                this.#lockEventQuery = db.prepare(`
16✔
80
                        INSERT INTO ${this.#eventLockTableName} (projection_name, schema_version, event_id)
81
                        VALUES (?, ?, ?)
82
                        ON CONFLICT (projection_name, schema_version, event_id)
83
                        DO UPDATE SET
84
                                processing_at = cast(unixepoch('subsec') * 1000 as INTEGER)
85
                        WHERE
86
                                processed_at IS NULL
87
                                AND processing_at <= cast(unixepoch('subsec') * 1000 as INTEGER) - ${this.#eventLockTtl}
88
                `);
89

90
                this.#finalizeEventLockQuery = db.prepare(`
16✔
91
                        UPDATE ${this.#eventLockTableName}
92
                        SET
93
                                processed_at = cast(unixepoch('subsec') * 1000 as INTEGER)
94
                        WHERE
95
                                projection_name = ?
96
                                AND schema_version = ?
97
                                AND event_id = ?
98
                                AND processed_at IS NULL
99
                `);
100
        }
101

102
        async tryMarkAsProjecting(event: IEvent<any>) {
103
                await this.assertConnection();
16✔
104

105
                const eventId = getEventId(event);
16✔
106

107
                const r = this.#lockEventQuery.run(this.#projectionName, this.#schemaVersion, eventId);
16✔
108

109
                return r.changes !== 0;
16✔
110
        }
111

112
        async markAsProjected(event: IEvent<any>) {
113
                await this.assertConnection();
8✔
114

115
                const eventId = getEventId(event);
8✔
116

117
                const transaction = this.db!.transaction(() => {
8✔
118
                        const updateResult = this.#finalizeEventLockQuery.run(this.#projectionName, this.#schemaVersion, eventId);
8✔
119
                        if (updateResult.changes === 0)
8✔
120
                                throw new Error(`Event ${event.id} could not be marked as processed`);
4✔
121

122
                        this.#upsertLastEventQuery.run(this.#projectionName, this.#schemaVersion, JSON.stringify(event));
4✔
123
                });
124

125
                transaction();
8✔
126
        }
127

128
        async getLastEvent(): Promise<IEvent<any> | undefined> {
129
                await this.assertConnection();
4✔
130

131
                const viewInfoRecord = this.#getLastEventQuery.get(this.#projectionName, this.#schemaVersion);
4✔
132
                if (!viewInfoRecord?.last_event)
4✔
133
                        return undefined;
2✔
134

135
                return JSON.parse(viewInfoRecord.last_event);
2✔
136
        }
137
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc