• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 14434384385

13 Apr 2025 11:29PM UTC coverage: 83.126% (+0.03%) from 83.097%
14434384385

push

github

snatalenko
1.0.0-rc.10

490 of 782 branches covered (62.66%)

1000 of 1203 relevant lines covered (83.13%)

21.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.98
/src/sqlite/SqliteViewLocker.ts
1
import { Database, Statement } from 'better-sqlite3';
2
import { IContainer, ILogger, IViewLocker } from '../interfaces';
3
import { Deferred } from '../utils';
8✔
4
import { promisify } from 'util';
8✔
5
import { viewLockTableInit } from './queries';
8✔
6
import { SqliteProjectionDataParams } from './SqliteProjectionDataParams';
7
import { AbstractSqliteAccessor } from './AbstractSqliteAccessor';
8✔
8
/** Promise-returning variant of `setTimeout`, used to pause between lock acquisition attempts. */
const delay = promisify(setTimeout);
8✔
9

10
/**
 * Construction options of `SqliteViewLocker`:
 * the projection data parameters extended with optional lock-table settings.
 */
export type SqliteViewLockerParams = SqliteProjectionDataParams & {

	/**
	 * Name of the SQLite table that keeps the view lock records. Optional.
	 *
	 * @default "tbl_view_lock"
	 */
	viewLockTableName?: string;

	/**
	 * How long, in milliseconds, an obtained view lock stays valid
	 * unless it is prolonged or released earlier. Optional.
	 *
	 * @default 120_000
	 */
	viewLockTtl?: number;
};
26

27
/**
 * `IViewLocker` implementation that keeps a lock record in a SQLite table.
 *
 * A lock is a row identified by (projection_name, schema_version) whose `locked_till` column
 * holds the expiration timestamp (ms since epoch); `NULL` means "not locked".
 * While the lock is held, its expiration is pushed forward every half of the TTL.
 *
 * NOTE(review): the table has no owner column visible here, so prolongation and release
 * cannot verify that the row is still owned by this instance.
 */
export class SqliteViewLocker extends AbstractSqliteAccessor implements IViewLocker {

	#projectionName: string;
	#schemaVersion: string;

	#viewLockTableName: string;
	#viewLockTtl: number;
	#logger: ILogger | undefined;

	/** Bound parameters: projection name, schema version, new `locked_till`, current time */
	#upsertTableLockQuery!: Statement<[string, string, number, number], void>;
	/** Bound parameters: new `locked_till`, projection name, schema version */
	#updateTableLockQuery!: Statement<[number, string, string], void>;
	/** Bound parameters: projection name, schema version */
	#removeTableLockQuery!: Statement<[string, string], void>;

	/** Set by `lock()` and cleared by `unlock()`; its promise backs `once('ready')` */
	#lockMarker: Deferred<void> | undefined;
	#lockProlongationTimeout: NodeJS.Timeout | undefined;

	/**
	 * @throws {TypeError} when `projectionName` or `schemaVersion` is missing
	 */
	constructor(o: Partial<Pick<IContainer, 'viewModelSqliteDb' | 'viewModelSqliteDbFactory' | 'logger'>>
		& SqliteViewLockerParams) {
		super(o);

		if (!o.projectionName)
			throw new TypeError('projectionName argument required');
		if (!o.schemaVersion)
			throw new TypeError('schemaVersion argument required');

		this.#projectionName = o.projectionName;
		this.#schemaVersion = o.schemaVersion;

		this.#viewLockTableName = o.viewLockTableName ?? 'tbl_view_lock';
		this.#viewLockTtl = o.viewLockTtl ?? 120_000;

		// Use a child logger tagged with the class name when the logger supports it
		this.#logger = o.logger && 'child' in o.logger ?
			o.logger.child({ service: this.constructor.name }) :
			o.logger;
	}

	/**
	 * Create the lock table and prepare the statements used by the locker.
	 */
	protected initialize(db: Database) {
		db.exec(viewLockTableInit(this.#viewLockTableName));

		// The existing row is taken over only when it is unlocked (NULL) or its lock
		// has already expired relative to the current time passed as the 4th parameter.
		// Comparing against the new expiration instead would let any later caller
		// overwrite a lock that is still valid.
		this.#upsertTableLockQuery = db.prepare(`
			INSERT INTO ${this.#viewLockTableName} (projection_name, schema_version, locked_till)
			VALUES (?, ?, ?)
			ON CONFLICT (projection_name, schema_version)
			DO UPDATE SET
				locked_till = excluded.locked_till
			WHERE
				locked_till IS NULL
				OR locked_till < ?
		`);

		this.#updateTableLockQuery = db.prepare(`
			UPDATE ${this.#viewLockTableName}
			SET
				locked_till = ?
			WHERE
				projection_name = ?
				AND schema_version = ?
				AND locked_till IS NOT NULL
		`);

		this.#removeTableLockQuery = db.prepare(`
			UPDATE ${this.#viewLockTableName}
			SET
				locked_till = NULL
			WHERE
				projection_name = ?
				AND schema_version = ?
				AND locked_till IS NOT NULL
		`);
	}

	/** `true` while no lock marker is set, i.e. before `lock()` or after `unlock()` */
	get ready(): boolean {
		return !this.#lockMarker;
	}

	/**
	 * Obtain the lock, retrying every half of the TTL until the lock row can be written,
	 * then start periodic prolongation.
	 *
	 * @returns always `true` once the lock is obtained
	 */
	async lock(): Promise<boolean> {
		this.#lockMarker = new Deferred();

		// A prolongation timer left from a previous lock() call would keep the old lock
		// alive forever and the loop below could never succeed
		clearTimeout(this.#lockProlongationTimeout);
		this.#lockProlongationTimeout = undefined;

		await this.assertConnection();

		let lockAcquired = false;
		while (!lockAcquired) {
			const now = Date.now();
			const lockedTill = now + this.#viewLockTtl;
			const upsertResult = this.#upsertTableLockQuery.run(
				this.#projectionName,
				this.#schemaVersion,
				lockedTill,
				now
			);

			// No changed row means a non-expired lock already exists
			lockAcquired = upsertResult.changes === 1;
			if (!lockAcquired) {
				this.#logger?.debug(`"${this.#projectionName}" is locked by another process`);
				await delay(this.#viewLockTtl / 2);
			}
		}

		this.#logger?.debug(`"${this.#projectionName}" lock obtained for ${this.#viewLockTtl} ms`);

		this.scheduleLockProlongation();

		return true;
	}

	/**
	 * Arm a timer that prolongs the lock after half of the TTL.
	 * The timer is unref'ed so it does not keep the process alive.
	 */
	private scheduleLockProlongation() {
		const ms = this.#viewLockTtl / 2;

		// Never keep more than one pending prolongation timer
		clearTimeout(this.#lockProlongationTimeout);

		this.#lockProlongationTimeout = setTimeout(() => {
			void this.runScheduledProlongation();
		}, ms);
		this.#lockProlongationTimeout.unref();

		this.#logger?.debug(`"${this.#projectionName}" lock refresh scheduled in ${ms} ms`);
	}

	/**
	 * Timer callback: prolong the lock and, while the lock is still held, arm the next refresh.
	 * A failure is logged instead of being left as an unhandled promise rejection;
	 * no further refresh is scheduled after a failure.
	 */
	private async runScheduledProlongation(): Promise<void> {
		try {
			await this.prolongLock();
		}
		catch (err: unknown) {
			const reason = err instanceof Error ? err.message : String(err);
			this.#logger?.warn(`"${this.#projectionName}" lock refresh failed: ${reason}`);
			return;
		}

		// unlock() may have been called while the prolongation was in progress
		if (this.#lockMarker)
			this.scheduleLockProlongation();
	}

	/** Stop the pending prolongation timer, if any */
	private cancelLockProlongation() {
		clearTimeout(this.#lockProlongationTimeout);
		this.#lockProlongationTimeout = undefined;
		this.#logger?.debug(`"${this.#projectionName}" lock refresh canceled`);
	}

	/**
	 * Push the lock expiration to "now + TTL".
	 *
	 * @throws {Error} when no locked row was updated
	 */
	private async prolongLock() {
		await this.assertConnection();

		const lockedTill = Date.now() + this.#viewLockTtl;
		const r = this.#updateTableLockQuery.run(lockedTill, this.#projectionName, this.#schemaVersion);
		if (r.changes !== 1)
			throw new Error(`"${this.#projectionName}" lock could not be prolonged`);

		this.#logger?.debug(`"${this.#projectionName}" lock prolonged for ${this.#viewLockTtl} ms`);
	}

	/**
	 * Release the lock: resolve pending `once('ready')` waiters, stop prolongation
	 * and reset `locked_till` of the lock row to `NULL`.
	 */
	async unlock(): Promise<void> {
		this.#lockMarker?.resolve();
		this.#lockMarker = undefined;

		this.cancelLockProlongation();

		await this.assertConnection();

		const updateResult = this.#removeTableLockQuery.run(this.#projectionName, this.#schemaVersion);
		if (updateResult.changes === 1)
			this.#logger?.debug(`"${this.#projectionName}" lock released`);
		else
			this.#logger?.warn(`"${this.#projectionName}" lock didn't exist`);
	}

	/**
	 * Wait until the view is unlocked.
	 *
	 * @param event only `'ready'` is supported
	 * @returns promise of the current lock marker, or a resolved promise when there is no marker
	 * @throws {TypeError} on any other event name
	 */
	once(event: 'ready'): Promise<void> {
		if (event !== 'ready')
			throw new TypeError(`Unexpected event: ${event}`);

		return this.#lockMarker?.promise ?? Promise.resolve();
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc