• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 14002510880

21 Mar 2025 11:15PM UTC coverage: 82.347% (-12.1%) from 94.436%
14002510880

push

github

snatalenko
1.0.0-rc.6

450 of 731 branches covered (61.56%)

877 of 1065 relevant lines covered (82.35%)

21.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.66
/src/AbstractProjection.ts
1
import { describe } from './Event';
20✔
2
import { InMemoryView } from './infrastructure/memory/InMemoryView';
20✔
3
import {
20✔
4
        IViewLocker,
5
        IEventLocker,
6
        IProjection,
7
        ILogger,
8
        IExtendableLogger,
9
        IEventStore,
10
        IEvent,
11
        isViewLocker,
12
        isEventLocker
13
} from './interfaces';
14

15
import {
20✔
16
        getClassName,
17
        validateHandlers,
18
        getHandler,
19
        subscribe,
20
        getMessageHandlerNames
21
} from './utils';
22

23
export type AbstractProjectionParams<T> = {

        /**
         * Default view the projection works with.
         * The view object may additionally implement IViewLocker and/or IEventLocker.
         */
        view?: T;

        /**
         * Guards view restoration state, so that an inconsistent view is not accessed too early
         * and restoration does not conflict with the same procedure run by other processes.
         */
        viewLocker?: IViewLocker;

        /**
         * Keeps track of event processing state, so that an event is not handled
         * by multiple processes concurrently.
         */
        eventLocker?: IEventLocker;

        /** Logger the projection writes its diagnostic messages to. */
        logger?: ILogger | IExtendableLogger;
}
43

44
/**
 * Base class for Projection definition.
 *
 * Routes events to the handler methods defined on the concrete projection class,
 * restores the view by replaying events from an event store and, when lockers are available,
 * uses them to guard view restoration and per-event processing.
 */
export abstract class AbstractProjection<TView = InMemoryView<any>> implements IProjection<TView> {

        /**
         * List of event types handled by the projection. Can be overridden in the projection implementation.
         * If not overridden, event types will be inferred from handler methods defined on the Projection class.
         */
        static get handles(): string[] {
                return getMessageHandlerNames(this);
        }

        #view?: TView;
        #viewLocker?: IViewLocker;
        #eventLocker?: IEventLocker;
        protected _logger?: ILogger;

        /**
         * The default view associated with the projection.
         * Can optionally implement IViewLocker and/or IEventLocker.
         *
         * When no view was provided, an `InMemoryView` is created on first access and reused afterwards.
         */
        public get view(): TView {
                return this.#view ?? (this.#view = new InMemoryView() as TView);
        }

        protected set view(value: TView) {
                this.#view = value;
        }

        /**
         * Manages view restoration state to prevent early access to an inconsistent view
         * or conflicts from concurrent restoration by other processes.
         *
         * Resolves to the explicitly assigned locker; otherwise to the view itself
         * when `isViewLocker(view)` holds; otherwise `undefined`.
         */
        protected get _viewLocker(): IViewLocker | undefined {
                return this.#viewLocker ?? (isViewLocker(this.view) ? this.view : undefined);
        }

        protected set _viewLocker(value: IViewLocker | undefined) {
                this.#viewLocker = value;
        }

        /**
         * Tracks event processing state to prevent concurrent handling by multiple processes.
         *
         * Resolves to the explicitly assigned locker; otherwise to the view itself
         * when `isEventLocker(view)` holds; otherwise `undefined`.
         */
        protected get _eventLocker(): IEventLocker | undefined {
                return this.#eventLocker ?? (isEventLocker(this.view) ? this.view : undefined);
        }

        protected set _eventLocker(value: IEventLocker | undefined) {
                this.#eventLocker = value;
        }

        /**
         * Runs `validateHandlers` on the instance and stores the provided dependencies.
         *
         * @param params Optional view, lockers and logger. When the logger exposes `child`,
         *   a child logger created with `{ service: <class name> }` is used instead of the logger itself.
         */
        constructor({
                view,
                viewLocker,
                eventLocker,
                logger
        }: AbstractProjectionParams<TView> = {}) {
                validateHandlers(this);

                this.#view = view;
                this.#viewLocker = viewLocker;
                this.#eventLocker = eventLocker;

                this._logger = logger && 'child' in logger ?
                        logger.child({ service: getClassName(this) }) :
                        logger;
        }

        /**
         * Subscribe to event store.
         *
         * Registers `project` as the master handler for incoming events first,
         * then awaits restoring of the view from the same event store.
         */
        async subscribe(eventStore: IEventStore): Promise<void> {
                subscribe(eventStore, this, {
                        masterHandler: (e: IEvent) => this.project(e)
                });

                await this.restore(eventStore);
        }

        /**
         * Pass event to projection event handler.
         *
         * When a view locker is available and reports the view as not ready,
         * waits for its 'ready' notification before projecting.
         */
        async project(event: IEvent): Promise<void> {
                // resolve the locker once, so the readiness check and the wait target the same object
                const viewLocker = this._viewLocker;
                if (viewLocker && !viewLocker.ready) {
                        this._logger?.debug('view is locked, awaiting until it is ready');
                        await viewLocker.once('ready');
                }

                return this._project(event);
        }

        /**
         * Pass event to projection event handler, without awaiting for restore operation to complete.
         *
         * When an event locker is available, the event is marked as projecting before the handler runs
         * (and skipped if the mark could not be obtained), then marked as projected after the handler completes.
         *
         * @throws {Error} when no handler is defined for `event.type`
         */
        protected async _project(event: IEvent): Promise<void> {
                const handler = getHandler(this, event.type);
                if (!handler)
                        throw new Error(`'${event.type}' handler is not defined or not a function`);

                // resolve the locker once, so "projecting" and "projected" marks always go to the same locker,
                // even if the view or the locker gets reassigned while the handler is running
                const eventLocker = this._eventLocker;
                if (eventLocker) {
                        const eventLockObtained = await eventLocker.tryMarkAsProjecting(event);
                        if (!eventLockObtained)
                                return;
                }

                await handler.call(this, event);

                if (eventLocker)
                        await eventLocker.markAsProjected(event);
        }

        /**
         * Restore projection view from event store.
         *
         * The view locker, when available, is locked for the duration of restoring.
         * If restoring fails, the error propagates and the view is intentionally left locked.
         */
        async restore(eventStore: IEventStore): Promise<void> {
                // lock the view to ensure same restoring procedure
                // won't be performed by another projection instance;
                // the locker is resolved once, so that `unlock` is called on the very object that was locked
                const viewLocker = this._viewLocker;
                if (viewLocker)
                        await viewLocker.lock();

                await this._restore(eventStore);

                if (viewLocker)
                        viewLocker.unlock();
        }

        /**
         * Restore projection view from event store.
         *
         * Retrieves events of the types listed in the static `handles` of the concrete class
         * (starting after the last event reported by the event locker, when one is available)
         * and passes them to `_project` one by one.
         *
         * @throws {TypeError} when `eventStore` is missing or has no `getEventsByTypes` function
         */
        protected async _restore(eventStore: IEventStore): Promise<void> {
                if (!eventStore)
                        throw new TypeError('eventStore argument required');
                if (typeof eventStore.getEventsByTypes !== 'function')
                        throw new TypeError('eventStore.getEventsByTypes must be a Function');

                let lastEvent: IEvent | undefined;

                const eventLocker = this._eventLocker;
                if (eventLocker) {
                        this._logger?.debug('retrieving last event projected');
                        lastEvent = await eventLocker.getLastEvent();
                }

                this._logger?.debug(`retrieving ${lastEvent ? `events after ${describe(lastEvent)}` : 'all events'}...`);

                // `handles` is static and may be overridden, so it is read from the concrete class
                const messageTypes = (this.constructor as typeof AbstractProjection).handles;
                const eventsIterable = eventStore.getEventsByTypes(messageTypes, { afterEvent: lastEvent });

                let eventsCount = 0;
                const startTs = Date.now();

                for await (const event of eventsIterable) {
                        try {
                                await this._project(event);
                                eventsCount += 1;
                        }
                        catch (err) {
                                // default implementation rethrows, which aborts restoring
                                this._onRestoringError(err, event);
                        }
                }

                this._logger?.info(`view restored from ${eventsCount} event(s) in ${Date.now() - startTs} ms`);
        }

        /**
         * Handle error on restoring. Logs and throws error by default.
         *
         * @param error Error raised while projecting `event`
         * @param event Event being projected when the error occurred
         */
        protected _onRestoringError(error: Error, event: IEvent) {
                this._logger?.error(`view restoring has failed (view will remain locked): ${error.message}`, {
                        service: getClassName(this),
                        event,
                        stack: error.stack
                });
                throw error;
        }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc