• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 21717407497

05 Feb 2026 03:26PM UTC coverage: 84.53% (-9.9%) from 94.396%
21717407497

Pull #28

github

web-flow
Merge 025edb883 into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

611 of 939 branches covered (65.07%)

819 of 934 new or added lines in 65 files covered. (87.69%)

59 existing lines in 13 files now uncovered.

1213 of 1435 relevant lines covered (84.53%)

28.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.94
/src/EventStore.ts
1
import {
22✔
2
        type IAggregateSnapshotStorage,
3
        type IEvent,
4
        type IEventStorageReader,
5
        type IEventSet,
6
        type ILogger,
7
        type IMessageHandler,
8
        type IObservable,
9
        type IEventStream,
10
        type IEventStore,
11
        type EventQueryAfter,
12
        type EventQueryBefore,
13
        type Identifier,
14
        type IIdentifierProvider,
15
        type IEventDispatcher,
16
        type IEventBus,
17
        type IContainer,
18
        isIdentifierProvider,
19
        isIEventBus,
20
        isIEventStorageReader,
21
        isEventSet,
22
        isIObservableQueueProvider
23
} from './interfaces/index.ts';
24
import {
22✔
25
        getClassName,
26
        setupOneTimeEmitterSubscription
27
} from './utils/index.ts';
28
import { EventDispatcher } from './EventDispatcher.ts';
22✔
29

30
export class EventStore implements IEventStore {
22✔
31

32
        #identifierProvider: IIdentifierProvider;
33
        #eventStorageReader: IEventStorageReader;
34
        #snapshotStorage: IAggregateSnapshotStorage | undefined;
35
        eventBus: IEventBus;
36
        #eventDispatcher: IEventDispatcher;
37
        #sagaStarters: Set<string> = new Set();
56✔
38
        #logger?: ILogger;
39

40
        constructor({
41
                eventStorageReader,
42
                identifierProvider = isIdentifierProvider(eventStorageReader) ? eventStorageReader : undefined,
×
43
                snapshotStorage,
44
                eventBus,
45
                eventDispatcher,
46
                eventDispatchPipeline,
47
                eventDispatchPipelines,
48
                logger
49
        }: Pick<IContainer,
50
                'identifierProvider' |
51
                'eventStorageReader' |
52
                'snapshotStorage' |
53
                'eventBus' |
54
                'eventDispatcher' |
55
                'logger' |
56
                'eventDispatchPipeline' |
57
                'eventDispatchPipelines'
58
        >) {
59
                if (!eventStorageReader)
56!
NEW
60
                        throw new TypeError('eventStorageReader argument required');
×
61
                if (!identifierProvider)
56!
NEW
62
                        throw new TypeError('identifierProvider argument required');
×
63
                if (!isIEventStorageReader(eventStorageReader))
56!
UNCOV
64
                        throw new TypeError('storage does not implement IEventStorage interface');
×
65
                if (eventBus && !isIEventBus(eventBus))
56!
NEW
66
                        throw new TypeError('eventBus does not implement IMessageBus interface');
×
67

68
                this.#eventStorageReader = eventStorageReader;
56✔
69
                this.#identifierProvider = identifierProvider;
56✔
70
                this.#snapshotStorage = snapshotStorage;
56✔
71
                this.#eventDispatcher = eventDispatcher ?? new EventDispatcher({
56!
72
                        eventBus,
73
                        eventDispatchPipeline,
74
                        eventDispatchPipelines
75
                });
76
                this.eventBus = eventBus ?? this.#eventDispatcher.eventBus;
56!
77
                this.#logger = logger && 'child' in logger ?
56!
78
                        logger.child({ service: getClassName(this) }) :
79
                        logger;
80
        }
81

82
        /**
83
         * Generates and returns a new unique identifier using the configured identifier provider.
84
         *
85
         * @returns A promise resolving to a unique identifier suitable for aggregates, sagas, and events.
86
         */
87
        async getNewId(): Promise<Identifier> {
88
                return this.#identifierProvider.getNewId();
24✔
89
        }
90

91
        async* getEventsByTypes(eventTypes: Readonly<string[]>, options?: EventQueryAfter): IEventStream {
92
                if (!Array.isArray(eventTypes))
4!
NEW
93
                        throw new TypeError('eventTypes argument must be an Array');
×
94

95
                this.#logger?.debug(`retrieving ${eventTypes.join(', ')} events...`);
4✔
96

97
                const eventsIterable = await this.#eventStorageReader.getEventsByTypes(eventTypes, options);
4✔
98

99
                yield* eventsIterable;
4✔
100

101
                this.#logger?.debug(`${eventTypes.join(', ')} events retrieved`);
2✔
102
        }
103

104
        /** Retrieve all events of specific Aggregate */
105
        async* getAggregateEvents(aggregateId: Identifier): IEventStream {
106
                if (!aggregateId)
20!
UNCOV
107
                        throw new TypeError('aggregateId argument required');
×
108

109
                this.#logger?.debug(`retrieving event stream for aggregate ${aggregateId}...`);
20✔
110

111
                const snapshot = this.#snapshotStorage ?
20!
112
                        await this.#snapshotStorage.getAggregateSnapshot(aggregateId) :
113
                        undefined;
114

115
                if (snapshot)
20✔
116
                        yield snapshot;
2✔
117

118
                const eventsIterable = await this.#eventStorageReader.getAggregateEvents(aggregateId, { snapshot });
20✔
119

120
                yield* eventsIterable;
20✔
121

122
                this.#logger?.debug(`all events for aggregate ${aggregateId} retrieved`);
20✔
123
        }
124

125
        /** Retrieve events of specific Saga */
126
        async* getSagaEvents(sagaId: Identifier, filter: EventQueryBefore) {
127
                if (!sagaId)
4!
UNCOV
128
                        throw new TypeError('sagaId argument required');
×
129
                if (!filter)
4!
UNCOV
130
                        throw new TypeError('filter argument required');
×
131
                if (!filter.beforeEvent)
4!
UNCOV
132
                        throw new TypeError('filter.beforeEvent argument required');
×
133
                if (filter.beforeEvent.sagaVersion === undefined)
4!
UNCOV
134
                        throw new TypeError('filter.beforeEvent.sagaVersion argument required');
×
135

136
                this.#logger?.debug(`retrieving event stream for saga ${sagaId}, v${filter.beforeEvent.sagaVersion}...`);
4✔
137

138
                const eventsIterable = await this.#eventStorageReader.getSagaEvents(sagaId, filter);
4✔
139

140
                yield* eventsIterable;
4✔
141

142
                this.#logger?.debug(`all events for saga ${sagaId} retrieved`);
4✔
143
        }
144

145
        /**
146
         * Register event types that start sagas.
147
         * Upon such event commit a new sagaId will be assigned
148
         */
149
        registerSagaStarters(eventTypes: string[] = []) {
3✔
150
                for (const eventType of eventTypes)
10✔
151
                        this.#sagaStarters.add(eventType);
4✔
152
        }
153

154
        /**
155
         * Validate events, commit to storage and publish to messageBus, if needed
156
         *
157
         * @param events - a set of events to commit
158
         * @returns Signed and committed events
159
         */
160
        async dispatch(events: IEventSet): Promise<IEventSet> {
161
                if (!isEventSet(events) || events.length === 0)
44✔
162
                        throw new TypeError('dispatch requires a non-empty array of events');
2✔
163

164
                const augmentedEvents = await this.#attachSagaIdToSagaStarterEvents(events);
42✔
165

166
                return this.#eventDispatcher.dispatch(augmentedEvents, { origin: 'internal' });
42✔
167
        }
168

169
        /**
170
         * Generate and attach sagaId to events that start new sagas
171
         */
172
        async #attachSagaIdToSagaStarterEvents(events: IEventSet): Promise<IEventSet> {
173
                if (!this.#sagaStarters.size)
42✔
174
                        return events;
38✔
175

176
                const augmentedEvents: IEvent[] = [];
4✔
177
                for (const event of events) {
4✔
178
                        if (this.#sagaStarters.has(event.type)) {
4!
179
                                if (event.sagaId)
4!
UNCOV
180
                                        throw new Error(`Event "${event.type}" already contains sagaId. Multiple sagas with same event type are not supported`);
×
181

182
                                (event as IEvent).sagaId = await this.getNewId();
4✔
183
                                (event as IEvent).sagaVersion = 0;
4✔
184

185
                                augmentedEvents.push(event);
4✔
186
                        }
187
                        else {
188
                                augmentedEvents.push(event);
×
189
                        }
190
                }
191
                return augmentedEvents;
4✔
192
        }
193

194
        on(messageType: string, handler: IMessageHandler) {
195
                this.eventBus.on(messageType, handler);
4✔
196
        }
197

198
        off(messageType: string, handler: IMessageHandler) {
199
                this.eventBus.off(messageType, handler);
2✔
200
        }
201

202
        queue(name: string): IObservable {
203
                if (!isIObservableQueueProvider(this.eventBus))
4!
NEW
204
                        throw new Error('Injected eventBus does not support named queues');
×
205

206
                return this.eventBus.queue(name);
4✔
207
        }
208

209
        /** Creates one-time subscription for one or multiple events that match a filter */
210
        once(messageTypes: string | string[], handler?: IMessageHandler, filter?: (e: IEvent) => boolean): Promise<IEvent> {
211
                const subscribeTo = Array.isArray(messageTypes) ? messageTypes : [messageTypes];
4!
212

213
                return setupOneTimeEmitterSubscription(this.eventBus, subscribeTo, filter, handler, this.#logger);
4✔
214
        }
215
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc