• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 11152250527

02 Oct 2024 10:01PM UTC coverage: 93.73% (-0.9%) from 94.648%
11152250527

push

github

snatalenko
1.0.0-rc.4

559 of 873 branches covered (64.03%)

2347 of 2504 relevant lines covered (93.73%)

21.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.87
/src/EventStore.ts
1
import {
4✔
2
        IAggregateSnapshotStorage,
4✔
3
        Identifier,
4✔
4
        IEvent,
4✔
5
        IEventQueryFilter,
4✔
6
        IEventStorage,
4✔
7
        IEventSet,
4✔
8
        IExtendableLogger,
4✔
9
        ILogger,
4✔
10
        IMessageBus,
4✔
11
        IMessageHandler,
4✔
12
        IObservable,
4✔
13
        IEventStream,
4✔
14
        IEventStore
4✔
15
} from "./interfaces";
4✔
16
import { getClassName, setupOneTimeEmitterSubscription } from "./utils";
4✔
17
import * as Event from './Event';
4✔
18

4✔
19
const isIEventStorage = (storage: IEventStorage): storage is IEventStorage =>
4✔
20
        storage
60✔
21
        && typeof storage.getNewId === 'function'
60✔
22
        && typeof storage.commitEvents === 'function'
60✔
23
        && typeof storage.getEvents === 'function'
60✔
24
        && typeof storage.getAggregateEvents === 'function'
60✔
25
        && typeof storage.getSagaEvents === 'function';
4✔
26

4✔
27
const isIObservable = (obj: IObservable | any): obj is IObservable =>
4✔
28
        obj
186✔
29
        && 'on' in obj
186✔
30
        && typeof obj.on === 'function'
186✔
31
        && 'off' in obj
186✔
32
        && typeof obj.off === 'function';
4✔
33

4✔
34
const isIMessageBus = (bus: IMessageBus | any): bus is IMessageBus =>
4✔
35
        bus
60✔
36
        && isIObservable(bus)
60✔
37
        && 'send' in bus
60✔
38
        && typeof bus.send === 'function'
60✔
39
        && 'publish' in bus
60✔
40
        && typeof bus.publish === 'function';
4✔
41

4✔
42
const SNAPSHOT_EVENT_TYPE = 'snapshot';
4✔
43

4✔
44
export class EventStore implements IEventStore {
60✔
45

60✔
46
        #publishAsync: boolean;
60✔
47
        #validator: (event: IEvent<any>) => void;
60✔
48
        #logger?: ILogger;
60✔
49
        #storage: IEventStorage;
60✔
50
        #messageBus?: IMessageBus;
60✔
51
        #snapshotStorage: IAggregateSnapshotStorage | undefined;
60✔
52
        #sagaStarters: string[] = [];
60✔
53
        #defaultEventEmitter: IObservable;
60✔
54

60✔
55
        /** Whether storage supports aggregate snapshots */
60✔
56
        get snapshotsSupported(): boolean {
60✔
57
                return Boolean(this.#snapshotStorage);
10✔
58
        }
10✔
59

60✔
60
        constructor({
60✔
61
                storage,
60✔
62
                messageBus,
60✔
63
                snapshotStorage,
60✔
64
                eventValidator = Event.validate,
60✔
65
                eventStoreConfig,
60✔
66
                logger
60✔
67
        }: {
60✔
68
                storage: IEventStorage,
60✔
69
                messageBus?: IMessageBus,
60✔
70
                snapshotStorage?: IAggregateSnapshotStorage,
60✔
71
                eventValidator?: IMessageHandler,
60✔
72
                eventStoreConfig?: {
60✔
73
                        publishAsync?: boolean
60✔
74
                },
60✔
75
                logger?: ILogger | IExtendableLogger
60✔
76
        }) {
60✔
77
                if (!storage)
60✔
78
                        throw new TypeError('storage argument required');
60!
79
                if (!isIEventStorage(storage))
60✔
80
                        throw new TypeError('storage does not implement IEventStorage interface');
60!
81
                if (messageBus && !isIMessageBus(messageBus))
60✔
82
                        throw new TypeError('messageBus does not implement IMessageBus interface');
60!
83
                if (messageBus && isIObservable(storage))
60✔
84
                        throw new TypeError('both storage and messageBus implement IObservable interface, it is not yet supported');
60!
85

60✔
86
                const defaultEventEmitter = isIObservable(storage) ? storage : messageBus;
60!
87
                if (!defaultEventEmitter)
60✔
88
                        throw new TypeError('storage must implement IObservable if messageBus is not injected');
60!
89

60✔
90
                this.#publishAsync = eventStoreConfig?.publishAsync ?? true;
60✔
91
                this.#validator = eventValidator;
60✔
92
                this.#logger = logger && 'child' in logger ?
60!
93
                        logger.child({ service: getClassName(this) }) :
60✔
94
                        logger;
60✔
95
                this.#storage = storage;
60✔
96
                this.#snapshotStorage = snapshotStorage;
60✔
97
                this.#messageBus = messageBus;
60✔
98
                this.#defaultEventEmitter = defaultEventEmitter;
60✔
99
        }
60✔
100

60✔
101
        /** Retrieve new ID from the storage */
60✔
102
        async getNewId(): Promise<Identifier> {
60✔
103
                return this.#storage.getNewId();
12✔
104
        }
12✔
105

60✔
106
        /** Retrieve all events of specific types */
60✔
107
        async* getAllEvents(eventTypes?: string[]): IEventStream {
60✔
108
                if (eventTypes && !Array.isArray(eventTypes))
6✔
109
                        throw new TypeError('eventTypes, if specified, must be an Array');
6!
110

6✔
111
                this.#logger?.debug(`retrieving ${eventTypes ? eventTypes.join(', ') : 'all'} events...`);
6!
112

6✔
113
                const eventsIterable = await this.#storage.getEvents(eventTypes);
6✔
114

6✔
115
                yield* eventsIterable;
6✔
116

6✔
117
                this.#logger?.debug(`${eventTypes ? eventTypes.join(', ') : 'all'} events retrieved`);
6!
118
        }
6✔
119

60✔
120
        /** Retrieve all events of specific Aggregate */
60✔
121
        async getAggregateEvents(aggregateId: Identifier): Promise<IEventSet> {
60✔
122
                if (!aggregateId)
10✔
123
                        throw new TypeError('aggregateId argument required');
10!
124

10✔
125
                this.#logger?.debug(`retrieving event stream for aggregate ${aggregateId}...`);
10!
126

10✔
127
                const snapshot = this.#snapshotStorage ?
10✔
128
                        await this.#snapshotStorage.getAggregateSnapshot(aggregateId) :
10!
129
                        undefined;
10✔
130

10✔
131
                const events: IEvent[] = [];
10✔
132
                if (snapshot)
10✔
133
                        events.push(snapshot);
10✔
134

10✔
135
                const eventsIterable = await this.#storage.getAggregateEvents(aggregateId, { snapshot });
10✔
136
                for await (const event of eventsIterable)
10✔
137
                        events.push(event);
10✔
138

10✔
139
                this.#logger?.debug(`${Event.describeMultiple(events)} retrieved`);
10!
140

10✔
141
                return events;
10✔
142
        }
10✔
143

60✔
144
        /** Retrieve events of specific Saga */
60✔
145
        async getSagaEvents(sagaId: Identifier, filter: Pick<IEventQueryFilter, "beforeEvent">) {
60✔
146
                if (!sagaId)
2✔
147
                        throw new TypeError('sagaId argument required');
2!
148
                if (!filter)
2✔
149
                        throw new TypeError('filter argument required');
2!
150
                if (!filter.beforeEvent)
2✔
151
                        throw new TypeError('filter.beforeEvent argument required');
2!
152
                if (filter.beforeEvent.sagaVersion === undefined)
2✔
153
                        throw new TypeError('filter.beforeEvent.sagaVersion argument required');
2!
154

2✔
155
                this.#logger?.debug(`retrieving event stream for saga ${sagaId}, v${filter.beforeEvent.sagaVersion}...`);
2!
156

2✔
157
                const events: IEvent[] = [];
2✔
158
                const eventsIterable = await this.#storage.getSagaEvents(sagaId, filter);
2✔
159
                for await (const event of eventsIterable)
2✔
160
                        events.push(event);
2✔
161

2✔
162
                this.#logger?.debug(`${Event.describeMultiple(events)} retrieved`);
2!
163

2✔
164
                return events;
2✔
165
        }
2✔
166

60✔
167
        /**
60✔
168
         * Register event types that start sagas.
60✔
169
         * Upon such event commit a new sagaId will be assigned
60✔
170
         */
60✔
171
        registerSagaStarters(eventTypes: string[] = []) {
60✔
172
                const uniqueEventTypes = eventTypes.filter(e => !this.#sagaStarters.includes(e));
8✔
173
                this.#sagaStarters.push(...uniqueEventTypes);
8✔
174
        }
8✔
175

60✔
176
        /**
60✔
177
         * Validate events, commit to storage and publish to messageBus, if needed
60✔
178
         *
60✔
179
         * @param {IEventSet} events - a set of events to commit
60✔
180
         * @returns {Promise<IEventSet>} - resolves to signed and committed events
60✔
181
         */
60✔
182
        async commit(events) {
60✔
183
                if (!Array.isArray(events))
48✔
184
                        throw new TypeError('events argument must be an Array');
48!
185

48✔
186
                const containsSagaStarters = this.#sagaStarters.length && events.some(e => this.#sagaStarters.includes(e.type));
48!
187
                const augmentedEvents = containsSagaStarters ?
48!
188
                        await this.#attachSagaIdToSagaStarterEvents(events) :
48✔
189
                        events;
48✔
190

48✔
191
                const eventStreamWithoutSnapshots = await this.save(augmentedEvents);
48✔
192

42✔
193
                // after events are saved to the persistent storage,
42✔
194
                // publish them to the event bus (i.e. RabbitMq)
42✔
195
                if (this.#messageBus)
42✔
196
                        await this.#publish(eventStreamWithoutSnapshots);
42✔
197

42✔
198
                return eventStreamWithoutSnapshots;
42✔
199
        }
42✔
200

60✔
201
        /** Generate and attach sagaId to events that start new sagas */
60✔
202
        async #attachSagaIdToSagaStarterEvents(events: IEventSet): Promise<IEventSet> {
60✔
203
                const augmentedEvents: IEvent[] = [];
2✔
204
                for (const event of events) {
2✔
205
                        if (this.#sagaStarters.includes(event.type)) {
2✔
206
                                if (event.sagaId)
2✔
207
                                        throw new Error(`Event "${event.type}" already contains sagaId. Multiple sagas with same event type are not supported`);
2!
208

2✔
209
                                (event as IEvent).sagaId = await this.getNewId();
2✔
210
                                (event as IEvent).sagaVersion = 0;
2✔
211

2✔
212
                                augmentedEvents.push(event);
2✔
213
                        }
2✔
214
                        else {
×
215
                                augmentedEvents.push(event);
×
216
                        }
×
217
                }
2✔
218
                return augmentedEvents;
2✔
219
        }
2✔
220

60✔
221
        /** Save events to the persistent storage(s) */
60✔
222
        async save(events: IEventSet): Promise<IEventSet> {
60✔
223
                if (!Array.isArray(events))
48✔
224
                        throw new TypeError('events argument must be an Array');
48!
225

48✔
226
                const snapshotEvents = events.filter(e => e.type === SNAPSHOT_EVENT_TYPE);
48✔
227
                if (snapshotEvents.length > 1)
48✔
228
                        throw new Error(`cannot commit a stream with more than 1 ${SNAPSHOT_EVENT_TYPE} event`);
48!
229
                if (snapshotEvents.length && !this.snapshotsSupported)
48✔
230
                        throw new Error(`${SNAPSHOT_EVENT_TYPE} event type is not supported by the storage`);
48!
231

48✔
232
                const snapshot = snapshotEvents[0];
48✔
233
                const eventsWithoutSnapshot = events.filter(e => e !== snapshot);
48✔
234

48✔
235
                this.#logger?.debug(`validating ${Event.describeMultiple(eventsWithoutSnapshot)}...`);
48!
236
                eventsWithoutSnapshot.forEach(this.#validator);
48✔
237

48✔
238
                this.#logger?.debug(`saving ${Event.describeMultiple(eventsWithoutSnapshot)}...`);
48!
239
                await Promise.all([
48✔
240
                        this.#storage.commitEvents(eventsWithoutSnapshot),
48✔
241
                        snapshot ?
48✔
242
                                this.#snapshotStorage?.saveAggregateSnapshot(snapshot) :
48✔
243
                                undefined
38✔
244
                ]);
48✔
245

42✔
246
                return eventsWithoutSnapshot;
42✔
247
        }
42✔
248

60✔
249
        async #publish(events: IEventSet) {
60✔
250
                if (this.#publishAsync) {
42✔
251
                        this.#logger?.debug(`publishing ${Event.describeMultiple(events)} asynchronously...`);
40!
252
                        setImmediate(() => this.#publishEvents(events));
40✔
253
                }
40✔
254
                else {
2✔
255
                        this.#logger?.debug(`publishing ${Event.describeMultiple(events)} synchronously...`);
2!
256
                        await this.#publishEvents(events);
2✔
257
                }
2✔
258
        }
42✔
259

60✔
260
        async #publishEvents(events: IEventSet) {
60✔
261
                if (!this.#messageBus)
40✔
262
                        return;
40!
263

40✔
264
                try {
40✔
265
                        await Promise.all(events.map(event =>
40✔
266
                                this.#messageBus?.publish(event)));
40✔
267

40✔
268
                        this.#logger?.debug(`${Event.describeMultiple(events)} published`);
40!
269
                }
40✔
270
                catch (error: any) {
40!
271
                        this.#logger?.error(`${Event.describeMultiple(events)} publishing failed: ${error.message}`, {
×
272
                                stack: error.stack
×
273
                        });
×
274
                        throw error;
×
275
                }
×
276
        }
40✔
277

60✔
278
        /** Setup a listener for a specific event type */
60✔
279
        on(messageType: string, handler: IMessageHandler) {
60✔
280
                if (typeof messageType !== 'string' || !messageType.length)
8✔
281
                        throw new TypeError('messageType argument must be a non-empty String');
8!
282
                if (typeof handler !== 'function')
8✔
283
                        throw new TypeError('handler argument must be a Function');
8!
284
                if (arguments.length !== 2)
8✔
285
                        throw new TypeError(`2 arguments are expected, but ${arguments.length} received`);
8!
286

8✔
287
                if (isIObservable(this.#storage))
8✔
288
                        this.#storage.on(messageType, handler);
8!
289

8✔
290
                this.#messageBus?.on(messageType, handler);
8✔
291
        }
8✔
292

60✔
293
        /** Remove previously installed listener */
60✔
294
        off(messageType: string, handler: IMessageHandler) {
60✔
295
                if (isIObservable(this.#storage))
×
296
                        this.#storage.off(messageType, handler);
×
297

×
298
                this.#messageBus?.off(messageType, handler);
×
299
        }
×
300

60✔
301
        /** Get or create a named queue, which delivers events to a single handler only */
60✔
302
        queue(name: string): IObservable {
60✔
303
                if (!this.#defaultEventEmitter.queue)
8✔
304
                        throw new Error('Named queues are not supported by the underlying message bus');
8!
305

8✔
306
                return this.#defaultEventEmitter.queue(name);
8✔
307
        }
8✔
308

60✔
309
        /** Creates one-time subscription for one or multiple events that match a filter */
60✔
310
        once(messageTypes: string | string[], handler: IMessageHandler, filter: (e: IEvent) => boolean): Promise<IEvent> {
60✔
311
                const subscribeTo = Array.isArray(messageTypes) ? messageTypes : [messageTypes];
6!
312

6✔
313
                return setupOneTimeEmitterSubscription(this.#defaultEventEmitter, subscribeTo, filter, handler, this.#logger);
6✔
314
        }
6✔
315
}
60✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc