• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 21875627364

10 Feb 2026 05:34PM UTC coverage: 84.658% (-9.7%) from 94.396%
21875627364

Pull #28

github

web-flow
Merge f3844c4de into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

626 of 953 branches covered (65.69%)

832 of 947 new or added lines in 65 files covered. (87.86%)

59 existing lines in 13 files now uncovered.

1225 of 1447 relevant lines covered (84.66%)

28.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.43
/src/EventStore.ts
1
import {
22✔
2
        type IAggregateSnapshotStorage,
3
        type IEvent,
4
        type IEventStorageReader,
5
        type IEventSet,
6
        type ILogger,
7
        type IMessageHandler,
8
        type IObservable,
9
        type IEventStream,
10
        type IEventStore,
11
        type EventQueryAfter,
12
        type EventQueryBefore,
13
        type Identifier,
14
        type IIdentifierProvider,
15
        type IEventDispatcher,
16
        type IEventBus,
17
        type IContainer,
18
        type AggregateEventsQueryParams,
19
        isIdentifierProvider,
20
        isIEventBus,
21
        isIEventStorageReader,
22
        isEventSet,
23
        isIObservableQueueProvider
24
} from './interfaces/index.ts';
25
import {
22✔
26
        getClassName,
27
        setupOneTimeEmitterSubscription
28
} from './utils/index.ts';
29
import { EventDispatcher } from './EventDispatcher.ts';
22✔
30

31
/**
 * Entry point for reading and dispatching events.
 *
 * Reads are delegated to the injected event storage reader (and, for aggregates,
 * the optional snapshot storage). Writes are delegated to the event dispatcher.
 * Subscription methods (`on`, `off`, `once`, `queue`) are forwarded to `eventBus`.
 */
export class EventStore implements IEventStore {

        #identifierProvider: IIdentifierProvider;
        #eventStorageReader: IEventStorageReader;
        #snapshotStorage: IAggregateSnapshotStorage | undefined;

        /** Bus used for subscriptions; either the injected one or the dispatcher's own */
        eventBus: IEventBus;

        #eventDispatcher: IEventDispatcher;

        /** Event types that must receive a freshly generated sagaId on dispatch */
        #sagaStarters: Set<string> = new Set();

        #logger?: ILogger;

        /**
         * @param options.eventStorageReader - required; source of persisted events
         * @param options.identifierProvider - defaults to `eventStorageReader` when it passes `isIdentifierProvider`
         * @param options.snapshotStorage - optional source of aggregate snapshots
         * @param options.eventBus - optional bus; when omitted the dispatcher's bus is used
         * @param options.eventDispatcher - optional dispatcher; when omitted a new `EventDispatcher` is created
         *   from `eventBus`, `eventDispatchPipeline` and `eventDispatchPipelines`
         * @param options.logger - optional logger; a child logger is derived when `child` is available
         * @throws {TypeError} when a required dependency is missing or fails its interface check
         */
        constructor({
                eventStorageReader,
                identifierProvider = isIdentifierProvider(eventStorageReader) ? eventStorageReader : undefined,
                snapshotStorage,
                eventBus,
                eventDispatcher,
                eventDispatchPipeline,
                eventDispatchPipelines,
                logger
        }: Pick<IContainer,
                'identifierProvider' |
                'eventStorageReader' |
                'snapshotStorage' |
                'eventBus' |
                'eventDispatcher' |
                'logger' |
                'eventDispatchPipeline' |
                'eventDispatchPipelines'
        >) {
                if (!eventStorageReader)
                        throw new TypeError('eventStorageReader argument required');
                if (!identifierProvider)
                        throw new TypeError('identifierProvider argument required');
                if (!isIEventStorageReader(eventStorageReader))
                        throw new TypeError('eventStorageReader does not implement IEventStorageReader interface');
                if (eventBus && !isIEventBus(eventBus))
                        throw new TypeError('eventBus does not implement IEventBus interface');

                this.#eventStorageReader = eventStorageReader;
                this.#identifierProvider = identifierProvider;
                this.#snapshotStorage = snapshotStorage;
                this.#eventDispatcher = eventDispatcher ?? new EventDispatcher({
                        eventBus,
                        eventDispatchPipeline,
                        eventDispatchPipelines
                });

                // Prefer the explicitly injected bus; otherwise expose the one owned by the dispatcher
                this.eventBus = eventBus ?? this.#eventDispatcher.eventBus;

                this.#logger = logger && 'child' in logger ?
                        logger.child({ service: getClassName(this) }) :
                        logger;
        }

        /**
         * Generates and returns a new unique identifier using the configured identifier provider.
         *
         * @returns A promise resolving to the identifier produced by the provider.
         */
        async getNewId(): Promise<Identifier> {
                return this.#identifierProvider.getNewId();
        }

        /**
         * Stream events of the given types from the event storage reader.
         *
         * This is an async generator: argument validation and the storage call happen
         * on the first iteration step, not when the method is called. The trailing debug
         * message is logged only when the stream is consumed to the end.
         *
         * @param eventTypes - event type names to retrieve
         * @param options - passed through to the storage reader unchanged
         * @throws {TypeError} when `eventTypes` is not an array
         */
        async* getEventsByTypes(eventTypes: Readonly<string[]>, options?: EventQueryAfter): IEventStream {
                if (!Array.isArray(eventTypes))
                        throw new TypeError('eventTypes argument must be an Array');

                this.#logger?.debug(`retrieving ${eventTypes.join(', ')} events...`);

                const eventsIterable = await this.#eventStorageReader.getEventsByTypes(eventTypes, options);

                yield* eventsIterable;

                this.#logger?.debug(`${eventTypes.join(', ')} events retrieved`);
        }

        /**
         * Retrieve all events of specific Aggregate.
         *
         * When a snapshot is available (either passed in `options.snapshot` or loaded from
         * the snapshot storage), it is yielded first and also forwarded to the storage reader
         * together with the remaining options.
         *
         * This is an async generator: validation happens on the first iteration step.
         *
         * @param aggregateId - identifier of the aggregate
         * @param options - query parameters; `snapshot` takes precedence over snapshot storage
         * @throws {TypeError} when `aggregateId` is falsy
         */
        async* getAggregateEvents(aggregateId: Identifier, options?: AggregateEventsQueryParams): IEventStream {
                if (!aggregateId)
                        throw new TypeError('aggregateId argument required');

                this.#logger?.debug(`retrieving event stream for aggregate ${aggregateId}...`);

                // Get snapshot from snapshot storage if not provided in options
                let snapshot = options?.snapshot;
                if (!snapshot && this.#snapshotStorage)
                        snapshot = await this.#snapshotStorage.getAggregateSnapshot(aggregateId);

                if (snapshot)
                        yield snapshot;

                const eventsIterable = await this.#eventStorageReader.getAggregateEvents(aggregateId, {
                        ...options,
                        snapshot
                });

                yield* eventsIterable;

                this.#logger?.debug(`all events for aggregate ${aggregateId} retrieved`);
        }

        /**
         * Retrieve events of specific Saga.
         *
         * This is an async generator: validation happens on the first iteration step.
         *
         * @param sagaId - identifier of the saga
         * @param filter - passed to the storage reader; must carry `beforeEvent` with a defined `sagaVersion`
         * @throws {TypeError} when `sagaId`, `filter`, `filter.beforeEvent`
         *   or `filter.beforeEvent.sagaVersion` is missing
         */
        async* getSagaEvents(sagaId: Identifier, filter: EventQueryBefore): IEventStream {
                if (!sagaId)
                        throw new TypeError('sagaId argument required');
                if (!filter)
                        throw new TypeError('filter argument required');
                if (!filter.beforeEvent)
                        throw new TypeError('filter.beforeEvent argument required');
                // Explicit undefined check: a sagaVersion of 0 is valid
                if (filter.beforeEvent.sagaVersion === undefined)
                        throw new TypeError('filter.beforeEvent.sagaVersion argument required');

                this.#logger?.debug(`retrieving event stream for saga ${sagaId}, v${filter.beforeEvent.sagaVersion}...`);

                const eventsIterable = await this.#eventStorageReader.getSagaEvents(sagaId, filter);

                yield* eventsIterable;

                this.#logger?.debug(`all events for saga ${sagaId} retrieved`);
        }

        /**
         * Register event types that start sagas.
         * Upon dispatch of such an event a new sagaId will be assigned to it.
         *
         * @param eventTypes - event type names to register; duplicates are ignored
         */
        registerSagaStarters(eventTypes: string[] = []) {
                for (const eventType of eventTypes)
                        this.#sagaStarters.add(eventType);
        }

        /**
         * Validate events, attach saga identifiers to saga-starter events and hand the
         * set over to the event dispatcher with `origin: 'internal'`.
         *
         * @param events - a non-empty set of events to dispatch
         * @returns The event set returned by the event dispatcher
         * @throws {TypeError} when `events` is not an event set or is empty
         */
        async dispatch(events: IEventSet): Promise<IEventSet> {
                if (!isEventSet(events) || events.length === 0)
                        throw new TypeError('dispatch requires a non-empty array of events');

                const augmentedEvents = await this.#attachSagaIdToSagaStarterEvents(events);

                return this.#eventDispatcher.dispatch(augmentedEvents, { origin: 'internal' });
        }

        /**
         * Generate and attach sagaId to events that start new sagas.
         *
         * When no saga starters are registered the input set is returned as is.
         * Otherwise a new array is returned that holds the same event objects in the
         * same order; saga-starter events are modified in place (`sagaId` set to a newly
         * generated identifier, `sagaVersion` set to 0).
         *
         * @throws {Error} when a saga-starter event already carries a truthy `sagaId`
         */
        async #attachSagaIdToSagaStarterEvents(events: IEventSet): Promise<IEventSet> {
                if (!this.#sagaStarters.size)
                        return events;

                const augmentedEvents: IEvent[] = [];
                for (const event of events) {
                        if (this.#sagaStarters.has(event.type)) {
                                if (event.sagaId)
                                        throw new Error(`Event "${event.type}" already contains sagaId. Multiple sagas with same event type are not supported`);

                                // The caller's event object is updated in place, so references held
                                // elsewhere observe the assigned sagaId as well
                                (event as IEvent).sagaId = await this.getNewId();
                                (event as IEvent).sagaVersion = 0;
                        }

                        augmentedEvents.push(event);
                }
                return augmentedEvents;
        }

        /** Subscribe `handler` to messages of `messageType` on the event bus */
        on(messageType: string, handler: IMessageHandler) {
                this.eventBus.on(messageType, handler);
        }

        /** Remove a subscription of `handler` to `messageType` from the event bus */
        off(messageType: string, handler: IMessageHandler) {
                this.eventBus.off(messageType, handler);
        }

        /**
         * Get a named queue from the event bus.
         *
         * @throws {Error} when the event bus does not pass `isIObservableQueueProvider`
         */
        queue(name: string): IObservable {
                if (!isIObservableQueueProvider(this.eventBus))
                        throw new Error('Injected eventBus does not support named queues');

                return this.eventBus.queue(name);
        }

        /**
         * Creates one-time subscription for one or multiple events that match a filter.
         *
         * @param messageTypes - a single event type or a list of event types
         * @param handler - optional handler forwarded to the subscription helper
         * @param filter - optional predicate forwarded to the subscription helper
         * @returns The promise produced by `setupOneTimeEmitterSubscription`
         */
        once(messageTypes: string | string[], handler?: IMessageHandler, filter?: (e: IEvent) => boolean): Promise<IEvent> {
                const subscribeTo = Array.isArray(messageTypes) ? messageTypes : [messageTypes];

                return setupOneTimeEmitterSubscription(this.eventBus, subscribeTo, filter, handler, this.#logger);
        }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc