• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 22745360142

06 Mar 2026 01:50AM UTC coverage: 95.287% (+0.9%) from 94.396%
22745360142

Pull #28

github

web-flow
Merge 15ef847b4 into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

428 of 528 branches covered (81.06%)

1043 of 1091 new or added lines in 65 files covered. (95.6%)

3 existing lines in 2 files now uncovered.

1294 of 1358 relevant lines covered (95.29%)

31.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.36
/src/sqlite/SqliteViewLocker.ts
1
import type { Database, Statement } from 'better-sqlite3';
2
import type { IContainer, ILogger, IViewLocker } from '../interfaces/index.ts';
3
import { assertString, Deferred } from '../utils/index.ts';
5✔
4
import { promisify } from 'util';
5✔
5
import { viewLockTableInit } from './queries/index.ts';
5✔
6
import type { SqliteProjectionDataParams } from './SqliteProjectionDataParams.ts';
7
import { AbstractSqliteAccessor } from './AbstractSqliteAccessor.ts';
5✔
8
// Promise-returning form of setTimeout: `await delay(ms)` resolves once the timer fires.
// Used by SqliteViewLocker.lock() to pause between lock acquisition attempts.
const delay = promisify(setTimeout);
5✔
9

10
/**
 * Options accepted by the `SqliteViewLocker` constructor: the shared projection data
 * parameters plus the optional view-lock settings below.
 */
export type SqliteViewLockerParams = SqliteProjectionDataParams & {

        /**
         * (Optional) How long, in milliseconds, an acquired view lock stays valid
         *
         * @default 120_000
         */
        viewLockTtl?: number;

        /**
         * (Optional) Name of the SQLite table where the lock records are stored
         *
         * @default "tbl_view_lock"
         */
        viewLockTableName?: string;
};
26

27
/**
 * View locker that keeps its lock state in a SQLite table.
 *
 * A lock is a row keyed by (projection_name, schema_version) whose `locked_till` column holds
 * the lease expiry as a millisecond timestamp; `NULL` means "not locked". While the lock is held,
 * the lease is extended every half TTL until `unlock()` is called.
 */
export class SqliteViewLocker extends AbstractSqliteAccessor implements IViewLocker {

        #projectionName: string;
        #schemaVersion: string;

        #viewLockTableName: string;
        #viewLockTtl: number;
        #logger: ILogger | undefined;

        /** Bound parameters: projection name, schema version, new expiry, current time */
        #upsertTableLockQuery!: Statement<[string, string, number, number], void>;
        /** Bound parameters: new expiry, projection name, schema version */
        #updateTableLockQuery!: Statement<[number, string, string], void>;
        /** Bound parameters: projection name, schema version */
        #removeTableLockQuery!: Statement<[string, string], void>;

        /** Set while a lock is being acquired or held; resolved and cleared by `unlock()` */
        #lockMarker: Deferred<void> | undefined;
        #lockProlongationTimeout: NodeJS.Timeout | undefined;

        /**
         * @param o Container dependencies (SQLite db or db factory, optional logger) and locker parameters.
         *   `projectionName` and `schemaVersion` are validated with `assertString`.
         */
        constructor(o: Partial<Pick<IContainer, 'viewModelSqliteDb' | 'viewModelSqliteDbFactory' | 'logger'>>
                & SqliteViewLockerParams) {
                super(o);

                assertString(o?.projectionName, 'o.projectionName');
                assertString(o?.schemaVersion, 'o.schemaVersion');

                this.#projectionName = o.projectionName;
                this.#schemaVersion = o.schemaVersion;

                this.#viewLockTableName = o.viewLockTableName ?? 'tbl_view_lock';
                this.#viewLockTtl = o.viewLockTtl ?? 120_000;

                // Use a child logger tagged with the service name when the logger supports it
                this.#logger = o.logger && 'child' in o.logger ?
                        o.logger.child({ service: this.constructor.name }) :
                        o.logger;
        }

        /**
         * Creates the lock table (via `viewLockTableInit`) and prepares the statements used by this locker.
         */
        protected initialize(db: Database) {
                db.exec(viewLockTableInit(this.#viewLockTableName));

                // The lease may only be taken when it is free (NULL) or already expired.
                // The last placeholder is the caller's current time; comparing the stored expiry
                // with the *new* expiry instead would let any later attempt steal a valid lock.
                this.#upsertTableLockQuery = db.prepare(`
                        INSERT INTO ${this.#viewLockTableName} (projection_name, schema_version, locked_till)
                        VALUES (?, ?, ?)
                        ON CONFLICT (projection_name, schema_version)
                        DO UPDATE SET
                                locked_till = excluded.locked_till
                        WHERE
                                locked_till IS NULL
                                OR locked_till < ?
                `);

                this.#updateTableLockQuery = db.prepare(`
                        UPDATE ${this.#viewLockTableName}
                        SET
                                locked_till = ?
                        WHERE
                                projection_name = ?
                                AND schema_version = ?
                                AND locked_till IS NOT NULL
                `);

                this.#removeTableLockQuery = db.prepare(`
                        UPDATE ${this.#viewLockTableName}
                        SET
                                locked_till = NULL
                        WHERE
                                projection_name = ?
                                AND schema_version = ?
                                AND locked_till IS NOT NULL
                `);
        }

        /** `true` when this instance is neither acquiring nor holding a lock */
        get ready(): boolean {
                return !this.#lockMarker;
        }

        /**
         * Acquires the lock for the projection / schema version, retrying every half TTL
         * until the existing lease is released or expires, then starts periodic lease prolongation.
         *
         * @returns `true` once the lock is obtained
         */
        async lock() {
                this.#lockMarker = new Deferred();

                await this.assertConnection();

                let lockAcquired = false;
                while (!lockAcquired) {
                        const now = Date.now();
                        const lockedTill = now + this.#viewLockTtl;
                        const upsertResult = this.#upsertTableLockQuery.run(
                                this.#projectionName,
                                this.#schemaVersion,
                                lockedTill,
                                now
                        );

                        // A row is inserted/updated only when the lease was free or expired
                        lockAcquired = upsertResult.changes === 1;
                        if (!lockAcquired) {
                                this.#logger?.debug(`"${this.#projectionName}" is locked by another process`);
                                await delay(this.#viewLockTtl / 2);
                        }
                }

                this.#logger?.debug(`"${this.#projectionName}" lock obtained for ${this.#viewLockTtl} ms`);

                this.scheduleLockProlongation();

                return true;
        }

        /**
         * Arms a timer that extends the lease after half the TTL.
         * The timer is unref'ed so it does not keep the process alive.
         */
        private scheduleLockProlongation() {
                const ms = this.#viewLockTtl / 2;

                // Never leave more than one prolongation timer pending
                clearTimeout(this.#lockProlongationTimeout);

                this.#lockProlongationTimeout = setTimeout(() => {
                        void this.prolongLockAndReschedule();
                }, ms);
                this.#lockProlongationTimeout.unref();

                this.#logger?.debug(`"${this.#projectionName}" lock refresh scheduled in ${ms} ms`);
        }

        /**
         * Timer entry point: prolongs the lease and, while the lock is still held, arms the next
         * prolongation. Failures are logged instead of surfacing as unhandled promise rejections.
         */
        private async prolongLockAndReschedule(): Promise<void> {
                try {
                        await this.prolongLock();
                }
                catch (err: unknown) {
                        const reason = err instanceof Error ? err.message : String(err);
                        this.#logger?.warn(`"${this.#projectionName}" lock prolongation failed: ${reason}`);
                        return;
                }

                // unlock() clears the marker; do not re-arm the timer after the lock was released
                if (this.#lockMarker)
                        this.scheduleLockProlongation();
        }

        /** Cancels the pending lease prolongation, if any */
        private cancelLockProlongation() {
                clearTimeout(this.#lockProlongationTimeout);
                this.#lockProlongationTimeout = undefined;
                this.#logger?.debug(`"${this.#projectionName}" lock refresh canceled`);
        }

        /**
         * Extends the lease by one TTL counted from now.
         *
         * @throws {Error} when no locked row was updated (the lock does not exist or was released)
         */
        private async prolongLock() {
                await this.assertConnection();

                const lockedTill = Date.now() + this.#viewLockTtl;
                const r = this.#updateTableLockQuery.run(lockedTill, this.#projectionName, this.#schemaVersion);
                if (r.changes !== 1)
                        throw new Error(`"${this.#projectionName}" lock could not be prolonged`);

                this.#logger?.debug(`"${this.#projectionName}" lock prolonged for ${this.#viewLockTtl} ms`);
        }

        /**
         * Releases the lock: resolves pending `once('ready')` promises, stops lease prolongation
         * and clears `locked_till` in the lock table.
         */
        async unlock() {
                this.#lockMarker?.resolve();
                this.#lockMarker = undefined;

                this.cancelLockProlongation();

                await this.assertConnection();

                const updateResult = this.#removeTableLockQuery.run(this.#projectionName, this.#schemaVersion);
                if (updateResult.changes === 1)
                        this.#logger?.debug(`"${this.#projectionName}" lock released`);
                else
                        this.#logger?.warn(`"${this.#projectionName}" lock didn't exist`);
        }

        /**
         * Waits until the view is unlocked by this instance.
         *
         * @param event Only `'ready'` is supported
         * @returns Promise resolved on `unlock()`, or an already resolved promise when not locked
         * @throws {TypeError} for any other event name
         */
        once(event: 'ready'): Promise<void> {
                if (event !== 'ready')
                        throw new TypeError(`Unexpected event: ${event}`);

                return this.#lockMarker?.promise ?? Promise.resolve();
        }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc