• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 21870697222

10 Feb 2026 03:17PM UTC coverage: 85.173%. First build
21870697222

Pull #31

github

web-flow
Merge 34cc162e0 into 025edb883
Pull Request #31: Multi-saga correlation via `message.sagaOrigins`

657 of 993 branches covered (66.16%)

143 of 158 new or added lines in 17 files covered. (90.51%)

1258 of 1477 relevant lines covered (85.17%)

33.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.66
/src/EventStore.ts
1
import {
26✔
2
        type IAggregateSnapshotStorage,
3
        type IEvent,
4
        type IEventStorageReader,
5
        type IEventSet,
6
        type ILogger,
7
        type IMessageHandler,
8
        type IObservable,
9
        type IEventStream,
10
        type IEventStore,
11
        type EventQueryAfter,
12
        type EventQueryBefore,
13
        type Identifier,
14
        type IIdentifierProvider,
15
        type IEventDispatcher,
16
        type IEventBus,
17
        type IContainer,
18
        isIdentifierProvider,
19
        isIEventBus,
20
        isIEventStorageReader,
21
        isEventSet,
22
        isIObservableQueueProvider
23
} from './interfaces/index.ts';
24
import {
26✔
25
        getClassName,
26
        parseSagaId,
27
        setupOneTimeEmitterSubscription
28
} from './utils/index.ts';
29
import { EventDispatcher } from './EventDispatcher.ts';
26✔
30

31
export class EventStore implements IEventStore {
26✔
32

33
        #identifierProvider: IIdentifierProvider;
34
        #eventStorageReader: IEventStorageReader;
35
        #snapshotStorage: IAggregateSnapshotStorage | undefined;
36
        eventBus: IEventBus;
37
        #eventDispatcher: IEventDispatcher;
38
        #logger?: ILogger;
39

40
        constructor({
41
                eventStorageReader,
42
                identifierProvider = isIdentifierProvider(eventStorageReader) ? eventStorageReader : undefined,
×
43
                snapshotStorage,
44
                eventBus,
45
                eventDispatcher,
46
                eventDispatchPipeline,
47
                eventDispatchPipelines,
48
                logger
49
        }: Pick<IContainer,
50
                'identifierProvider' |
51
                'eventStorageReader' |
52
                'snapshotStorage' |
53
                'eventBus' |
54
                'eventDispatcher' |
55
                'logger' |
56
                'eventDispatchPipeline' |
57
                'eventDispatchPipelines'
58
        >) {
59
                if (!eventStorageReader)
84!
60
                        throw new TypeError('eventStorageReader argument required');
×
61
                if (!identifierProvider)
84!
62
                        throw new TypeError('identifierProvider argument required');
×
63
                if (!isIEventStorageReader(eventStorageReader))
84!
64
                        throw new TypeError('storage does not implement IEventStorage interface');
×
65
                if (eventBus && !isIEventBus(eventBus))
84!
66
                        throw new TypeError('eventBus does not implement IMessageBus interface');
×
67

68
                this.#eventStorageReader = eventStorageReader;
84✔
69
                this.#identifierProvider = identifierProvider;
84✔
70
                this.#snapshotStorage = snapshotStorage;
84✔
71
                this.#eventDispatcher = eventDispatcher ?? new EventDispatcher({
84!
72
                        eventBus,
73
                        eventDispatchPipeline,
74
                        eventDispatchPipelines
75
                });
76
                this.eventBus = eventBus ?? this.#eventDispatcher.eventBus;
84!
77
                this.#logger = logger && 'child' in logger ?
84!
78
                        logger.child({ service: getClassName(this) }) :
79
                        logger;
80
        }
81

82
        /**
83
         * Generates and returns a new unique identifier using the configured identifier provider.
84
         *
85
         * @returns A promise resolving to a unique identifier suitable for aggregates, sagas, and events.
86
         */
87
        async getNewId(): Promise<Identifier> {
88
                return this.#identifierProvider.getNewId();
18✔
89
        }
90

91
        async* getEventsByTypes(eventTypes: Readonly<string[]>, options?: EventQueryAfter): IEventStream {
92
                if (!Array.isArray(eventTypes))
4!
93
                        throw new TypeError('eventTypes argument must be an Array');
×
94

95
                this.#logger?.debug(`retrieving ${eventTypes.join(', ')} events...`);
4✔
96

97
                const eventsIterable = await this.#eventStorageReader.getEventsByTypes(eventTypes, options);
4✔
98

99
                yield* eventsIterable;
4✔
100

101
                this.#logger?.debug(`${eventTypes.join(', ')} events retrieved`);
2✔
102
        }
103

104
        /** Retrieve all events of a specific aggregate, preceded by its snapshot when one exists */
105
        async* getAggregateEvents(aggregateId: Identifier): IEventStream {
106
                if (!aggregateId)
20!
107
                        throw new TypeError('aggregateId argument required');
×
108

109
                this.#logger?.debug(`retrieving event stream for aggregate ${aggregateId}...`);
20✔
110

111
                const snapshot = this.#snapshotStorage ?
20!
112
                        await this.#snapshotStorage.getAggregateSnapshot(aggregateId) :
113
                        undefined;
114

115
                if (snapshot)
20✔
116
                        yield snapshot;
2✔
117

118
                const eventsIterable = await this.#eventStorageReader.getAggregateEvents(aggregateId, { snapshot });
20✔
119

120
                yield* eventsIterable;
20✔
121

122
                this.#logger?.debug(`all events for aggregate ${aggregateId} retrieved`);
20✔
123
        }
124

125
        /** Retrieve events of a specific saga that occurred before the event given in `filter.beforeEvent` */
126
        async* getSagaEvents(sagaId: Identifier, filter: EventQueryBefore) {
127
                if (!sagaId)
10!
128
                        throw new TypeError('sagaId argument required');
×
129
                if (!filter)
10!
130
                        throw new TypeError('filter argument required');
×
131
                if (!filter.beforeEvent)
10!
132
                        throw new TypeError('filter.beforeEvent argument required');
×
133
                if (typeof filter.beforeEvent.id !== 'string' || !filter.beforeEvent.id.length)
10!
NEW
134
                        throw new TypeError('filter.beforeEvent.id argument required');
×
135

136
                const { sagaDescriptor, originEventId } = parseSagaId(sagaId);
10✔
137
                if (filter.beforeEvent.sagaOrigins?.[sagaDescriptor] !== originEventId)
10!
NEW
138
                        throw new TypeError('filter.beforeEvent.sagaOrigins does not match sagaId');
×
139

140
                this.#logger?.debug(`retrieving event stream for saga ${sagaId} before event ${filter.beforeEvent.id}...`);
10✔
141

142
                const eventsIterable = await this.#eventStorageReader.getSagaEvents(sagaId, filter);
10✔
143

144
                yield* eventsIterable;
10✔
145

146
                this.#logger?.debug(`all events for saga ${sagaId} retrieved`);
10✔
147
        }
148

149
        /**
150
         * Validate events, commit them to storage and publish them to the event bus, if needed
151
         *
152
         * @param events - a set of events to commit
153
         * @returns Signed and committed events
154
         */
155
        async dispatch(events: IEventSet): Promise<IEventSet> {
156
                if (!isEventSet(events) || events.length === 0)
50✔
157
                        throw new TypeError('dispatch requires a non-empty array of events');
2✔
158

159
                return this.#eventDispatcher.dispatch(events, { origin: 'internal' });
48✔
160
        }
161

162
        on(messageType: string, handler: IMessageHandler) {
163
                this.eventBus.on(messageType, handler);
6✔
164
        }
165

166
        off(messageType: string, handler: IMessageHandler) {
167
                this.eventBus.off(messageType, handler);
2✔
168
        }
169

170
        queue(name: string): IObservable {
171
                if (!isIObservableQueueProvider(this.eventBus))
4!
172
                        throw new Error('Injected eventBus does not support named queues');
×
173

174
                return this.eventBus.queue(name);
4✔
175
        }
176

177
        /** Creates a one-time subscription for one or more event types, optionally restricted by a filter */
178
        once(messageTypes: string | string[], handler?: IMessageHandler, filter?: (e: IEvent) => boolean): Promise<IEvent> {
179
                const subscribeTo = Array.isArray(messageTypes) ? messageTypes : [messageTypes];
4!
180

181
                return setupOneTimeEmitterSubscription(this.eventBus, subscribeTo, filter, handler, this.#logger);
4✔
182
        }
183
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc