• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 21875775917

10 Feb 2026 05:38PM UTC coverage: 85.292%. First build
21875775917

Pull #31

github

web-flow
Merge 4cde8f205 into f3844c4de
Pull Request #31: Multi-saga correlation via `message.sagaOrigins`

673 of 1010 branches covered (66.63%)

143 of 158 new or added lines in 17 files covered. (90.51%)

1270 of 1489 relevant lines covered (85.29%)

33.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.33
/src/EventStore.ts
1
import {
26✔
2
        type IAggregateSnapshotStorage,
3
        type IEvent,
4
        type IEventStorageReader,
5
        type IEventSet,
6
        type ILogger,
7
        type IMessageHandler,
8
        type IObservable,
9
        type IEventStream,
10
        type IEventStore,
11
        type EventQueryAfter,
12
        type EventQueryBefore,
13
        type Identifier,
14
        type IIdentifierProvider,
15
        type IEventDispatcher,
16
        type IEventBus,
17
        type IContainer,
18
        type AggregateEventsQueryParams,
19
        isIdentifierProvider,
20
        isIEventBus,
21
        isIEventStorageReader,
22
        isEventSet,
23
        isIObservableQueueProvider
24
} from './interfaces/index.ts';
25
import {
26✔
26
        getClassName,
27
        parseSagaId,
28
        setupOneTimeEmitterSubscription
29
} from './utils/index.ts';
30
import { EventDispatcher } from './EventDispatcher.ts';
26✔
31

32
/**
 * Facade over event storage, snapshot storage and the event dispatcher.
 *
 * Reads are delegated to the injected `eventStorageReader` (and `snapshotStorage`, when provided),
 * writes are delegated to the event dispatcher, and subscriptions are delegated to the event bus.
 */
export class EventStore implements IEventStore {

        readonly #identifierProvider: IIdentifierProvider;
        readonly #eventStorageReader: IEventStorageReader;
        readonly #snapshotStorage: IAggregateSnapshotStorage | undefined;

        /** Event bus used by `on`, `off`, `queue` and `once` */
        eventBus: IEventBus;

        readonly #eventDispatcher: IEventDispatcher;
        readonly #logger?: ILogger;

        /**
         * @param dependencies - container slice with the services used by the store:
         *  - `eventStorageReader` - required, must pass `isIEventStorageReader`;
         *  - `identifierProvider` - defaults to `eventStorageReader` when it passes `isIdentifierProvider`;
         *  - `snapshotStorage` - optional, consulted by `getAggregateEvents`;
         *  - `eventBus` - optional, must pass `isIEventBus` when provided;
         *  - `eventDispatcher` - optional, a new `EventDispatcher` is created from `eventBus`,
         *    `eventDispatchPipeline` and `eventDispatchPipelines` when omitted;
         *  - `logger` - optional, a child logger is requested when the logger exposes `child`.
         * @throws {TypeError} when a required dependency is missing or does not implement the expected interface
         */
        constructor({
                eventStorageReader,
                identifierProvider = isIdentifierProvider(eventStorageReader) ? eventStorageReader : undefined,
                snapshotStorage,
                eventBus,
                eventDispatcher,
                eventDispatchPipeline,
                eventDispatchPipelines,
                logger
        }: Pick<IContainer,
                'identifierProvider' |
                'eventStorageReader' |
                'snapshotStorage' |
                'eventBus' |
                'eventDispatcher' |
                'logger' |
                'eventDispatchPipeline' |
                'eventDispatchPipelines'
        >) {
                if (!eventStorageReader)
                        throw new TypeError('eventStorageReader argument required');
                if (!identifierProvider)
                        throw new TypeError('identifierProvider argument required');
                if (!isIEventStorageReader(eventStorageReader))
                        throw new TypeError('eventStorageReader does not implement IEventStorageReader interface');
                if (eventBus && !isIEventBus(eventBus))
                        throw new TypeError('eventBus does not implement IEventBus interface');

                this.#eventStorageReader = eventStorageReader;
                this.#identifierProvider = identifierProvider;
                this.#snapshotStorage = snapshotStorage;
                this.#eventDispatcher = eventDispatcher ?? new EventDispatcher({
                        eventBus,
                        eventDispatchPipeline,
                        eventDispatchPipelines
                });

                // When no bus is injected, expose the one owned by the dispatcher
                this.eventBus = eventBus ?? this.#eventDispatcher.eventBus;

                this.#logger = logger && 'child' in logger ?
                        logger.child({ service: getClassName(this) }) :
                        logger;
        }

        /**
         * Generates and returns a new unique identifier using the configured identifier provider.
         *
         * @returns A promise resolving to a unique identifier suitable for aggregates, sagas, and events.
         */
        async getNewId(): Promise<Identifier> {
                return this.#identifierProvider.getNewId();
        }

        /**
         * Retrieve events of the given types from the event storage reader
         *
         * @param eventTypes - event types to retrieve
         * @param options - optional query parameters, passed to the storage reader as is
         * @throws {TypeError} when `eventTypes` is not an array
         */
        async* getEventsByTypes(eventTypes: Readonly<string[]>, options?: EventQueryAfter): IEventStream {
                if (!Array.isArray(eventTypes))
                        throw new TypeError('eventTypes argument must be an Array');

                this.#logger?.debug(`retrieving ${eventTypes.join(', ')} events...`);

                const eventsIterable = await this.#eventStorageReader.getEventsByTypes(eventTypes, options);

                yield* eventsIterable;

                this.#logger?.debug(`${eventTypes.join(', ')} events retrieved`);
        }

        /**
         * Retrieve all events of specific Aggregate
         *
         * When a snapshot is available (passed in `options.snapshot` or found in the snapshot storage),
         * it is yielded first and passed on to the storage reader together with the remaining options.
         *
         * @param aggregateId - aggregate identifier
         * @param options - optional query parameters, may carry a known `snapshot`
         * @throws {TypeError} when `aggregateId` is missing
         */
        async* getAggregateEvents(aggregateId: Identifier, options?: AggregateEventsQueryParams): IEventStream {
                if (!aggregateId)
                        throw new TypeError('aggregateId argument required');

                this.#logger?.debug(`retrieving event stream for aggregate ${aggregateId}...`);

                // Get snapshot from snapshot storage if not provided in options
                let snapshot = options?.snapshot;
                if (!snapshot && this.#snapshotStorage)
                        snapshot = await this.#snapshotStorage.getAggregateSnapshot(aggregateId);

                if (snapshot)
                        yield snapshot;

                const eventsIterable = await this.#eventStorageReader.getAggregateEvents(aggregateId, {
                        ...options,
                        snapshot
                });

                yield* eventsIterable;

                this.#logger?.debug(`all events for aggregate ${aggregateId} retrieved`);
        }

        /**
         * Retrieve events of specific Saga
         *
         * @param sagaId - saga identifier, parsed with `parseSagaId` into a saga descriptor and an origin event ID
         * @param filter - query filter; `filter.beforeEvent` must have a non-empty string `id` and
         *   a `sagaOrigins` entry for the saga descriptor that equals the origin event ID
         * @throws {TypeError} when an argument is missing or `filter.beforeEvent.sagaOrigins` does not match `sagaId`
         */
        async* getSagaEvents(sagaId: Identifier, filter: EventQueryBefore): IEventStream {
                if (!sagaId)
                        throw new TypeError('sagaId argument required');
                if (!filter)
                        throw new TypeError('filter argument required');
                if (!filter.beforeEvent)
                        throw new TypeError('filter.beforeEvent argument required');
                if (typeof filter.beforeEvent.id !== 'string' || !filter.beforeEvent.id.length)
                        throw new TypeError('filter.beforeEvent.id argument required');

                // The event that bounds the query must be correlated with the requested saga
                const { sagaDescriptor, originEventId } = parseSagaId(sagaId);
                if (filter.beforeEvent.sagaOrigins?.[sagaDescriptor] !== originEventId)
                        throw new TypeError('filter.beforeEvent.sagaOrigins does not match sagaId');

                this.#logger?.debug(`retrieving event stream for saga ${sagaId} before event ${filter.beforeEvent.id}...`);

                const eventsIterable = await this.#eventStorageReader.getSagaEvents(sagaId, filter);

                yield* eventsIterable;

                this.#logger?.debug(`all events for saga ${sagaId} retrieved`);
        }

        /**
         * Validate that a non-empty event set is passed and hand it over to the event dispatcher
         * with the `internal` origin
         *
         * @param events - a set of events to commit
         * @returns The event set returned by the event dispatcher
         * @throws {TypeError} when `events` is not an event set or is empty
         */
        async dispatch(events: IEventSet): Promise<IEventSet> {
                if (!isEventSet(events) || events.length === 0)
                        throw new TypeError('dispatch requires a non-empty array of events');

                return this.#eventDispatcher.dispatch(events, { origin: 'internal' });
        }

        /** Subscribe a handler to messages of the given type on the event bus */
        on(messageType: string, handler: IMessageHandler) {
                this.eventBus.on(messageType, handler);
        }

        /** Remove a handler subscription for the given message type from the event bus */
        off(messageType: string, handler: IMessageHandler) {
                this.eventBus.off(messageType, handler);
        }

        /**
         * Get a named queue from the event bus
         *
         * @param name - queue name
         * @throws {Error} when the event bus does not pass `isIObservableQueueProvider`
         */
        queue(name: string): IObservable {
                if (!isIObservableQueueProvider(this.eventBus))
                        throw new Error('Injected eventBus does not support named queues');

                return this.eventBus.queue(name);
        }

        /** Creates one-time subscription for one or multiple events that match a filter */
        once(messageTypes: string | string[], handler?: IMessageHandler, filter?: (e: IEvent) => boolean): Promise<IEvent> {
                const subscribeTo = Array.isArray(messageTypes) ? messageTypes : [messageTypes];

                return setupOneTimeEmitterSubscription(this.eventBus, subscribeTo, filter, handler, this.#logger);
        }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc