• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 21717407497

05 Feb 2026 03:26PM UTC coverage: 84.53% (-9.9%) from 94.396%
21717407497

Pull #28

github

web-flow
Merge 025edb883 into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

611 of 939 branches covered (65.07%)

819 of 934 new or added lines in 65 files covered. (87.69%)

59 existing lines in 13 files now uncovered.

1213 of 1435 relevant lines covered (84.53%)

28.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.35
/src/workers/AbstractWorkerProjection.ts
1
import { isMainThread, Worker, MessageChannel, parentPort, workerData } from 'node:worker_threads';
2✔
2
import { AbstractProjection, type AbstractProjectionParams } from '../AbstractProjection.ts';
2✔
3
import type { IEvent } from '../interfaces/index.ts';
4
import * as Comlink from 'comlink';
2✔
5
import { nodeEndpoint, createWorker } from './utils/index.ts';
2✔
6
import { extractErrorDetails } from '../utils/index.ts';
2✔
7
import { isWorkerData, type IWorkerData, type WorkerInitMessage } from './protocol.ts';
2✔
8

9
/**
 * Constructor options of `AbstractWorkerProjection`:
 * everything `AbstractProjection` accepts plus the worker-related switches below.
 */
export type AbstractWorkerProjectionParams<TView> = AbstractProjectionParams<TView> & {

        /**
         * Path of the module that defines the derived projection; the worker is spawned from it.
         * Mandatory in the main thread while worker threads are enabled, ignored inside the worker.
         */
        workerModulePath?: string;

        /**
         * Set to `false` to keep both the projection and its view in the current thread
         * (no Worker is spawned and no RPC is involved).
         * Meant for tests and for environments where worker threads are undesirable.
         */
        useWorkerThreads?: boolean;
};
23

24
/**
 * Subset of the projection API that the worker-side instance exposes
 * to the main thread through Comlink (see `createWorkerInstance`).
 */
interface IRemoteProjectionApi {

        /** Forwards the event to `project` of the worker-side projection instance */
        project(event: IEvent): void | Promise<void>;

        /** Forwards the event to `_project` of the worker-side projection instance */
        _project(event: IEvent): void | Promise<void>;

        /** Answers `true` when served by the worker-side instance */
        ping(): true;
}
29

30
/**
 * Shape used by the `assertMainThread()` assertion signature:
 * both remote proxies are present on the instance.
 */
interface IMainThreadProjection<TView> {

        /** Comlink proxy of the projection API exposed by the worker */
        readonly remoteProjection: Comlink.Remote<IRemoteProjectionApi>;

        /** Comlink proxy of the view exposed by the worker */
        readonly remoteView: Comlink.Remote<TView>;
}
34

35
/**
 * Projection base class that can run projection handlers and the associated view in a worker thread
 * to isolate CPU-heavy work and keep the main thread responsive.
 *
 * In the main thread (with `useWorkerThreads` enabled) the constructor spawns a worker and
 * `project`, `_project` and `view` are forwarded to it through Comlink proxies.
 * In any other case the calls fall through to the `AbstractProjection` implementation.
 */
export abstract class AbstractWorkerProjection<TView> extends AbstractProjection<TView> {

        /** Worker handle; assigned only after `_createWorker` resolves (main thread only) */
        #worker?: Worker;

        /** Resolves with the worker once it has been created; set in the main thread only */
        readonly #workerInit?: Promise<Worker>;

        /** Comlink proxy of the projection API exposed by the worker */
        readonly #remoteProjection?: Comlink.Remote<IRemoteProjectionApi>;

        /** Comlink proxy of the view exposed by the worker */
        readonly #remoteView?: Comlink.Remote<TView>;

        /** `false` means projection and view run in the current thread */
        readonly #useWorkerThreads: boolean;

        /**
         * Creates an instance of a class derived from AbstractWorkerProjection in a Worker thread,
         * exposes its projection API and its view on the ports received via `workerData`
         * and notifies the parent thread with a `ready` message.
         *
         * @param factory - Optional factory function to create the projection instance;
         *   when omitted (or when it returns a nullish value) the class is instantiated without arguments
         * @returns The projection instance living in the worker thread
         * @throws Error when called outside of a Worker thread,
         *   or when `workerData` does not pass the `isWorkerData` check
         */
        static createWorkerInstance<V, T extends AbstractWorkerProjection<V>>(
                this: new () => T,
                factory?: () => T
        ): T {
                if (!parentPort)
                        throw new Error('createWorkerInstance can only be called from a Worker thread');
                if (!isWorkerData(workerData))
                        throw new Error('workerData does not contain projectionPort and viewPort');

                const workerProjectionInstance = factory?.() ?? new this();

                // Only this narrow facade is exposed to the main thread, not the whole instance
                const workerProjectionInstanceApi: IRemoteProjectionApi = {
                        project: event => workerProjectionInstance.project(event),
                        _project: event => workerProjectionInstance._project(event),
                        ping: () => workerProjectionInstance._pong()
                };

                Comlink.expose(workerProjectionInstanceApi, nodeEndpoint(workerData.projectionPort));
                Comlink.expose(workerProjectionInstance.view, nodeEndpoint(workerData.viewPort));

                // Endpoints are exposed at this point, let the parent thread know
                parentPort.postMessage({ type: 'ready' } satisfies WorkerInitMessage);

                return workerProjectionInstance;
        }

        /**
         * Convenience wrapper for module-level bootstrapping.
         *
         * In the main thread, does nothing.
         * In a worker thread, creates and exposes the projection singleton (same as createWorkerInstance).
         *
         * @param factory - Optional factory function, passed through to `createWorkerInstance`
         * @returns The worker-side projection instance, or `undefined` in the main thread
         */
        static createInstanceIfWorkerThread<V, T extends AbstractWorkerProjection<V>>(
                this: (new () => T) & { createWorkerInstance: (factory?: () => T) => T },
                factory?: () => T
        ): T | undefined {
                if (isMainThread)
                        return undefined;

                return this.createWorkerInstance(factory);
        }

        /**
         * Projects the event.
         *
         * In the main thread with worker threads enabled, waits for the worker to be created
         * and forwards the call to the remote projection; otherwise runs the base implementation.
         */
        async project(event: IEvent): Promise<void> {
                if (this.#useWorkerThreads && isMainThread) {
                        if (!this.#worker)
                                await this.#workerInit;

                        return this.remoteProjection.project(event);
                }

                return super.project(event);
        }

        /**
         * Proxy to the projection instance in the worker thread
         *
         * @throws Error when `assertMainThread()` fails
         */
        get remoteProjection(): Comlink.Remote<IRemoteProjectionApi> {
                this.assertMainThread();
                return this.#remoteProjection!;
        }

        /**
         * Proxy to the projection instance in the worker thread (awaits worker init)
         */
        get remoteProjectionInitializer(): Promise<Comlink.Remote<IRemoteProjectionApi>> {
                this.assertMainThread();
                return this.ensureWorkerReady().then(() => this.remoteProjection);
        }

        /**
         * Proxy to the view instance in the worker thread
         *
         * @throws Error when `assertMainThread()` fails
         */
        get remoteView(): Comlink.Remote<TView> {
                this.assertMainThread();
                return this.#remoteView!;
        }

        /**
         * View associated with the projection.
         *
         * In the main thread with worker threads enabled this is the Comlink proxy of the worker-side view,
         * typed as `TView`; otherwise it is the view held by the base class.
         */
        get view(): TView {
                if (this.#useWorkerThreads && isMainThread)
                        return this.remoteView as unknown as TView;

                return super.view;
        }

        /**
         * Proxy to the view instance in the worker thread (awaits worker init)
         */
        get remoteViewInitializer(): Promise<Comlink.Remote<TView>> {
                this.assertMainThread();
                return this.ensureWorkerReady().then(() => this.remoteView);
        }

        /**
         * In the main thread (with `useWorkerThreads` enabled) starts the worker creation
         * and wraps the main-thread ends of two message channels into Comlink proxies:
         * one for the projection API, one for the view.
         *
         * @throws TypeError when a worker has to be spawned but `workerModulePath` is missing
         */
        constructor({
                workerModulePath,
                useWorkerThreads = true,
                view,
                viewLocker,
                eventLocker,
                logger
        }: AbstractWorkerProjectionParams<TView> = {}) {
                super({
                        view,
                        viewLocker,
                        eventLocker,
                        logger
                });

                this.#useWorkerThreads = useWorkerThreads;

                if (this.#useWorkerThreads && isMainThread) {
                        if (!workerModulePath)
                                throw new TypeError('workerModulePath parameter is required in the main thread when useWorkerThreads=true');

                        const { port1: projectionPortMain, port2: projectionPort } = new MessageChannel();
                        const { port1: viewPortMain, port2: viewPort } = new MessageChannel();

                        this.#workerInit = this._createWorker(workerModulePath, {
                                projectionPort,
                                viewPort
                        }).then(worker => {
                                this.#worker = worker;
                                worker.once('error', this._onWorkerError);
                                worker.once('exit', this._onWorkerExit);
                                return worker;
                        });

                        // Avoid an unhandled rejection when nobody awaits the init promise;
                        // the failure is still observed by whoever awaits `#workerInit` later
                        this.#workerInit.catch(() => { });

                        this.#remoteProjection = Comlink.wrap<IRemoteProjectionApi>(nodeEndpoint(projectionPortMain));
                        this.#remoteView = Comlink.wrap<TView>(nodeEndpoint(viewPortMain));
                }
        }

        /**
         * Creates the worker; kept as a separate protected method so that derived classes can override it
         *
         * @param workerModulePath - Module path passed to `createWorker`
         * @param data - Ports handed over to the worker
         */
        // eslint-disable-next-line class-methods-use-this
        protected async _createWorker(workerModulePath: string, data: IWorkerData): Promise<Worker> {
                return createWorker(workerModulePath, data);
        }

        /** Worker `error` event handler: logs the error details */
        protected _onWorkerError = (error: unknown) => {
                this._logger?.error('worker error', {
                        error: extractErrorDetails(error)
                });
        };

        /** Worker `exit` event handler: logs an error for any non-zero exit code */
        protected _onWorkerExit = (exitCode: number) => {
                if (exitCode !== 0)
                        this._logger?.error(`worker exited with code ${exitCode}`);
        };

        /**
         * Backs the remote `ping()` call
         *
         * @throws Error when called outside of a Worker thread
         */
        protected _pong(): true {
                this.assertWorkerThread();
                return true;
        }

        /**
         * Ensures the instance is a main-thread instance that has its worker-related fields set
         *
         * @throws Error when called outside of the main thread, when worker threads are disabled,
         *   or when the worker init promise or any of the remote proxies is missing
         */
        protected assertMainThread(): asserts this is this & IMainThreadProjection<TView> {
                if (!isMainThread)
                        throw new Error('This method can only be called from the main thread');
                if (!this.#useWorkerThreads)
                        throw new Error('Worker threads are disabled for this projection instance');
                if (!this.#workerInit)
                        throw new Error('Worker instance is not initialized');
                if (!this.#remoteProjection)
                        throw new Error('Remote projection instance is not initialized');
                if (!this.#remoteView)
                        throw new Error('Remote view instance is not initialized');
        }

        /**
         * @throws Error when there is no `parentPort`, i.e. the code does not run in a Worker thread
         */
        // eslint-disable-next-line class-methods-use-this
        protected assertWorkerThread() {
                if (!parentPort)
                        throw new Error('This method can only be called from a Worker thread');
        }

        /**
         * Waits until the worker has been created.
         * Resolves immediately when worker threads are disabled or when called in a worker thread;
         * rejects when the worker creation failed.
         */
        async ensureWorkerReady(): Promise<void> {
                if (this.#useWorkerThreads && isMainThread)
                        await this.#workerInit;
        }

        /**
         * Same routing as `project`, for the `_project` step:
         * forwarded to the worker from the main thread, base implementation otherwise.
         */
        protected async _project(event: IEvent): Promise<void> {
                if (this.#useWorkerThreads && isMainThread) {
                        if (!this.#worker)
                                await this.#workerInit;

                        return this.remoteProjection._project(event);
                }

                return super._project(event);
        }

        /**
         * Releases the Comlink proxies and terminates the worker.
         * Does nothing when worker threads are disabled or when called in a worker thread.
         */
        dispose() {
                if (this.#useWorkerThreads && isMainThread) {
                        this.#remoteProjection?.[Comlink.releaseProxy]();
                        this.#remoteView?.[Comlink.releaseProxy]();

                        if (this.#worker) {
                                this.#worker.terminate();
                        }
                        else {
                                // The worker is still starting: `#worker` gets assigned only when `#workerInit` resolves,
                                // so terminate it as soon as it becomes available instead of leaking the thread.
                                // A failed init leaves nothing to terminate, hence the no-op rejection handler.
                                this.#workerInit?.then(worker => worker.terminate()).catch(() => { });
                        }
                }
        }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc