• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 14431053423

13 Apr 2025 03:59PM UTC coverage: 83.069% (+0.7%) from 82.347%
14431053423

push

github

snatalenko
1.0.0-rc.8

490 of 782 branches covered (62.66%)

996 of 1199 relevant lines covered (83.07%)

21.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.94
/src/EventStore.ts
1
import {
22✔
2
        IAggregateSnapshotStorage,
3
        IEvent,
4
        IEventStorageReader,
5
        IEventSet,
6
        ILogger,
7
        IMessageHandler,
8
        IObservable,
9
        IEventStream,
10
        IEventStore,
11
        EventQueryAfter,
12
        EventQueryBefore,
13
        Identifier,
14
        IIdentifierProvider,
15
        isIdentifierProvider,
16
        IEventDispatcher,
17
        IEventBus,
18
        isIEventBus,
19
        isIEventStorageReader,
20
        IContainer
21
} from './interfaces';
22
import {
22✔
23
        getClassName,
24
        setupOneTimeEmitterSubscription
25
} from './utils';
26
import { EventDispatcher } from './EventDispatcher';
22✔
27

28
/**
 * Facade that combines an event storage reader, an optional aggregate snapshot storage,
 * an identifier provider and an event dispatcher behind the `IEventStore` interface.
 *
 * It exposes event streams (by type, by aggregate, by saga), assigns saga identifiers
 * to events registered as "saga starters", and forwards subscriptions to the event bus.
 */
export class EventStore implements IEventStore {

        readonly #identifierProvider: IIdentifierProvider;
        readonly #eventStorageReader: IEventStorageReader;
        readonly #snapshotStorage: IAggregateSnapshotStorage | undefined;

        /** Event bus used by `on`, `off`, `queue` and `once` */
        eventBus: IEventBus;

        readonly #eventDispatcher: IEventDispatcher;

        /** Event types that must receive a freshly generated `sagaId` on dispatch */
        readonly #sagaStarters: Set<string> = new Set();
        readonly #logger?: ILogger;

        /**
         * @param container - dependencies of the store:
         *  - `eventStorageReader` - required, source of persisted events;
         *  - `identifierProvider` - optional, defaults to `eventStorageReader`
         *    when the reader itself passes the `isIdentifierProvider` check;
         *  - `snapshotStorage` - optional, consulted by `getAggregateEvents`;
         *  - `eventBus` - optional, defaults to the bus exposed by the event dispatcher;
         *  - `eventDispatcher` - optional, a new `EventDispatcher` is created when omitted;
         *  - `logger` - optional; when it has a `child` method, a child logger is used.
         * @throws {TypeError} when a required dependency is missing or fails its interface check
         */
        constructor({
                eventStorageReader,
                identifierProvider = isIdentifierProvider(eventStorageReader) ? eventStorageReader : undefined,
                snapshotStorage,
                eventBus,
                eventDispatcher,
                logger
        }: Pick<IContainer,
                'identifierProvider' |
                'eventStorageReader' |
                'snapshotStorage' |
                'eventBus' |
                'eventDispatcher' |
                'logger'
        >) {
                if (!eventStorageReader)
                        throw new TypeError('eventStorageReader argument required');
                if (!identifierProvider)
                        throw new TypeError('identifierProvider argument required');
                if (!isIEventStorageReader(eventStorageReader))
                        throw new TypeError('eventStorageReader does not implement IEventStorageReader interface');
                if (eventBus && !isIEventBus(eventBus))
                        throw new TypeError('eventBus does not implement IEventBus interface');

                this.#eventStorageReader = eventStorageReader;
                this.#identifierProvider = identifierProvider;
                this.#snapshotStorage = snapshotStorage;

                // The dispatcher is created around the injected bus (if any),
                // so the store and the dispatcher end up sharing one event bus
                this.#eventDispatcher = eventDispatcher ?? new EventDispatcher({ eventBus });
                this.eventBus = eventBus ?? this.#eventDispatcher.eventBus;

                this.#logger = logger && 'child' in logger ?
                        logger.child({ service: getClassName(this) }) :
                        logger;
        }

        /**
         * Generates and returns a new unique identifier using the configured identifier provider.
         *
         * @returns A promise resolving to a unique identifier suitable for aggregates, sagas, and events.
         */
        async getNewId(): Promise<Identifier> {
                return this.#identifierProvider.getNewId();
        }

        /**
         * Retrieve events of the given types from the event storage reader.
         *
         * @param eventTypes - event types to retrieve
         * @param options - optional query options, passed to the storage reader as is
         * @returns Async stream of matching events
         * @throws {TypeError} when `eventTypes` is not an array (raised on first iteration)
         */
        async* getEventsByTypes(eventTypes: Readonly<string[]>, options?: EventQueryAfter): IEventStream {
                if (!Array.isArray(eventTypes))
                        throw new TypeError('eventTypes argument must be an Array');

                this.#logger?.debug(`retrieving ${eventTypes.join(', ')} events...`);

                const eventsIterable = await this.#eventStorageReader.getEventsByTypes(eventTypes, options);

                yield* eventsIterable;

                // Reached only when the consumer iterates the stream to its end
                this.#logger?.debug(`${eventTypes.join(', ')} events retrieved`);
        }

        /**
         * Retrieve all events of specific Aggregate.
         *
         * When a snapshot storage is configured and returns a snapshot, the snapshot is yielded
         * first and is also passed to the storage reader in the query options.
         *
         * @param aggregateId - identifier of the aggregate
         * @returns Async stream starting with the snapshot (if any), followed by stored events
         * @throws {TypeError} when `aggregateId` is falsy (raised on first iteration)
         */
        async* getAggregateEvents(aggregateId: Identifier): IEventStream {
                if (!aggregateId)
                        throw new TypeError('aggregateId argument required');

                this.#logger?.debug(`retrieving event stream for aggregate ${aggregateId}...`);

                const snapshot = this.#snapshotStorage ?
                        await this.#snapshotStorage.getAggregateSnapshot(aggregateId) :
                        undefined;

                if (snapshot)
                        yield snapshot;

                const eventsIterable = await this.#eventStorageReader.getAggregateEvents(aggregateId, { snapshot });

                yield* eventsIterable;

                this.#logger?.debug(`all events for aggregate ${aggregateId} retrieved`);
        }

        /**
         * Retrieve events of specific Saga.
         *
         * @param sagaId - identifier of the saga
         * @param filter - query filter; `filter.beforeEvent.sagaVersion` must be defined
         * @returns Async stream of saga events as returned by the storage reader
         * @throws {TypeError} when `sagaId`, `filter`, `filter.beforeEvent`
         *  or `filter.beforeEvent.sagaVersion` is missing (raised on first iteration)
         */
        async* getSagaEvents(sagaId: Identifier, filter: EventQueryBefore): IEventStream {
                if (!sagaId)
                        throw new TypeError('sagaId argument required');
                if (!filter)
                        throw new TypeError('filter argument required');
                if (!filter.beforeEvent)
                        throw new TypeError('filter.beforeEvent argument required');
                if (filter.beforeEvent.sagaVersion === undefined)
                        throw new TypeError('filter.beforeEvent.sagaVersion argument required');

                this.#logger?.debug(`retrieving event stream for saga ${sagaId}, v${filter.beforeEvent.sagaVersion}...`);

                const eventsIterable = await this.#eventStorageReader.getSagaEvents(sagaId, filter);

                yield* eventsIterable;

                this.#logger?.debug(`all events for saga ${sagaId} retrieved`);
        }

        /**
         * Register event types that start sagas.
         * Upon dispatch of such event a new sagaId will be assigned
         *
         * @param eventTypes - event types to add to the set of saga starters
         */
        registerSagaStarters(eventTypes: string[] = []) {
                for (const eventType of eventTypes)
                        this.#sagaStarters.add(eventType);
        }

        /**
         * Attach saga identifiers to saga starter events and pass the events
         * to the event dispatcher with `origin: 'internal'`.
         *
         * @param events - a set of events to dispatch
         * @returns Events as returned by the event dispatcher
         * @throws {TypeError} when `events` is not an array
         */
        async dispatch(events: IEventSet): Promise<IEventSet> {
                if (!Array.isArray(events))
                        throw new TypeError('events argument must be an Array');

                const augmentedEvents = await this.#attachSagaIdToSagaStarterEvents(events);

                return this.#eventDispatcher.dispatch(augmentedEvents, { origin: 'internal' });
        }

        /**
         * Generate and attach sagaId to events that start new sagas.
         *
         * NOTE: saga starter events are modified in place (`sagaId` and `sagaVersion` are
         * assigned on the passed objects); the returned array is a new array holding the same objects.
         * When no saga starters are registered, the input is returned untouched.
         *
         * @throws {Error} when a saga starter event already carries a `sagaId`
         */
        async #attachSagaIdToSagaStarterEvents(events: IEventSet): Promise<IEventSet> {
                if (!this.#sagaStarters.size)
                        return events;

                const augmentedEvents: IEvent[] = [];
                for (const event of events) {
                        if (this.#sagaStarters.has(event.type)) {
                                if (event.sagaId)
                                        throw new Error(`Event "${event.type}" already contains sagaId. Multiple sagas with same event type are not supported`);

                                (event as IEvent).sagaId = await this.getNewId();
                                (event as IEvent).sagaVersion = 0;
                        }

                        augmentedEvents.push(event);
                }
                return augmentedEvents;
        }

        /** Subscribe `handler` to messages of `messageType` on the event bus */
        on(messageType: string, handler: IMessageHandler) {
                this.eventBus.on(messageType, handler);
        }

        /** Remove a `handler` subscription for `messageType` from the event bus */
        off(messageType: string, handler: IMessageHandler) {
                this.eventBus.off(messageType, handler);
        }

        /**
         * Get a named queue from the event bus.
         *
         * @throws {Error} when the event bus does not provide a `queue` method
         */
        queue(name: string): IObservable {
                if (!this.eventBus.queue)
                        throw new Error('Injected eventBus does not support named queues');

                return this.eventBus.queue(name);
        }

        /**
         * Creates one-time subscription for one or multiple events that match a filter
         *
         * @param messageTypes - a single event type or a list of event types to wait for
         * @param handler - optional handler, forwarded to the subscription helper
         * @param filter - optional predicate, forwarded to the subscription helper
         * @returns Promise produced by `setupOneTimeEmitterSubscription`
         */
        once(messageTypes: string | string[], handler?: IMessageHandler, filter?: (e: IEvent) => boolean): Promise<IEvent> {
                const subscribeTo = Array.isArray(messageTypes) ? messageTypes : [messageTypes];

                return setupOneTimeEmitterSubscription(this.eventBus, subscribeTo, filter, handler, this.#logger);
        }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc