• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 22745360142

06 Mar 2026 01:50AM UTC coverage: 95.287% (+0.9%) from 94.396%
22745360142

Pull #28

github

web-flow
Merge 15ef847b4 into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

428 of 528 branches covered (81.06%)

1043 of 1091 new or added lines in 65 files covered. (95.6%)

3 existing lines in 2 files now uncovered.

1294 of 1358 relevant lines covered (95.29%)

31.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.74
/src/EventStore.ts
1
import {
13✔
2
        type IAggregateSnapshotStorage,
3
        type IEvent,
4
        type IEventStorageReader,
5
        type IEventSet,
6
        type ILogger,
7
        type IMessageHandler,
8
        type IObservable,
9
        type IEventStream,
10
        type IEventStore,
11
        type EventQueryAfter,
12
        type EventQueryBefore,
13
        type Identifier,
14
        type IIdentifierProvider,
15
        type IEventDispatcher,
16
        type IEventBus,
17
        type IContainer,
18
        type AggregateEventsQueryParams,
19
        isIdentifierProvider,
20
        isObservableQueueProvider
21
} from './interfaces/index.ts';
22
import {
13✔
23
        assertArray,
24
        assertDefined,
25
        assertObservable,
26
        assertStringArray,
27
        parseSagaId,
28
        setupOneTimeEmitterSubscription
29
} from './utils/index.ts';
30
import { EventDispatcher } from './EventDispatcher.ts';
13✔
31

32
export class EventStore implements IEventStore {
13✔
33

34
        #identifierProvider: IIdentifierProvider;
35
        #eventStorageReader: IEventStorageReader;
36
        #snapshotStorage: IAggregateSnapshotStorage | undefined;
37
        eventBus: IEventBus;
38
        #eventDispatcher: IEventDispatcher;
39
        #logger?: ILogger;
40

41
        constructor({
42
                eventStorage,
43
                eventStorageReader = eventStorage,
2✔
44
                identifierProvider = isIdentifierProvider(eventStorageReader) ? eventStorageReader : undefined,
4!
45
                snapshotStorage,
46
                eventBus,
47
                eventDispatcher,
48
                eventDispatchPipeline,
49
                eventDispatchPipelines,
50
                logger
51
        }: Pick<IContainer,
52
                'eventStorage' |
53
                'eventStorageReader' |
54
                'identifierProvider' |
55
                'snapshotStorage' |
56
                'eventBus' |
57
                'eventDispatcher' |
58
                'logger' |
59
                'eventDispatchPipeline' |
60
                'eventDispatchPipelines'
61
        >) {
62
                assertDefined(eventStorageReader, 'eventStorageReader or eventStorage');
65✔
63
                assertDefined(identifierProvider, 'identifierProvider');
64✔
64
                assertObservable(eventBus, 'eventBus');
64✔
65

66
                this.#eventStorageReader = eventStorageReader;
64✔
67
                this.#identifierProvider = identifierProvider;
64✔
68
                this.#snapshotStorage = snapshotStorage;
64✔
69
                this.#eventDispatcher = eventDispatcher ?? new EventDispatcher({
64!
70
                        eventBus,
71
                        eventDispatchPipeline,
72
                        eventDispatchPipelines
73
                });
74
                this.eventBus = eventBus ?? this.#eventDispatcher.eventBus;
64!
75
                this.#logger = logger && 'child' in logger ?
64!
76
                        logger.child({ service: new.target.name }) :
77
                        logger;
78
        }
79

80
        /**
81
         * Generates and returns a new unique identifier using the configured identifier provider.
82
         *
83
         * @returns A promise resolving to a unique identifier suitable for aggregates, sagas, and events.
84
         */
85
        async getNewId(): Promise<Identifier> {
86
                return this.#identifierProvider.getNewId();
12✔
87
        }
88

89
        async* getEventsByTypes(eventTypes: Readonly<string[]>, options?: EventQueryAfter): IEventStream {
90
                assertStringArray(eventTypes, 'eventTypes');
3✔
91

92
                this.#logger?.debug(`retrieving ${eventTypes.join(', ')} events...`);
3✔
93

94
                const eventsIterable = await this.#eventStorageReader.getEventsByTypes(eventTypes, options);
3✔
95

96
                yield* eventsIterable;
3✔
97

98
                this.#logger?.debug(`${eventTypes.join(', ')} events retrieved`);
2✔
99
        }
100

101
        /** Retrieve all events of specific Aggregate */
102
        async* getAggregateEvents(aggregateId: Identifier, options?: AggregateEventsQueryParams): IEventStream {
103
                assertDefined(aggregateId, 'aggregateId');
63✔
104

105
                this.#logger?.debug(`retrieving event stream for aggregate ${aggregateId}...`);
63✔
106

107
                // Get snapshot from snapshot storage if not provided in options
108
                let snapshot = options?.snapshot;
63✔
109
                if (!snapshot && this.#snapshotStorage)
63✔
110
                        snapshot = await this.#snapshotStorage.getAggregateSnapshot(aggregateId);
63✔
111

112
                if (snapshot)
63✔
113
                        yield snapshot;
1✔
114

115
                const eventsIterable = await this.#eventStorageReader.getAggregateEvents(aggregateId, {
63✔
116
                        ...options,
117
                        snapshot
118
                });
119

120
                yield* eventsIterable;
63✔
121

122
                this.#logger?.debug(`all events for aggregate ${aggregateId} retrieved`);
63✔
123
        }
124

125
        /** Retrieve events of specific Saga */
126
        async* getSagaEvents(sagaId: Identifier, filter: EventQueryBefore) {
127
                assertDefined(sagaId, 'sagaId');
5✔
128
                assertDefined(filter?.beforeEvent?.id, 'filter.beforeEvent.id');
5✔
129

130
                const { sagaDescriptor, originEventId } = parseSagaId(sagaId);
5✔
131
                if (filter.beforeEvent.sagaOrigins?.[sagaDescriptor] !== originEventId)
5!
NEW
132
                        throw new TypeError('filter.beforeEvent.sagaOrigins does not match sagaId');
×
133

134
                this.#logger?.debug(`retrieving event stream for saga ${sagaId} before event ${filter.beforeEvent.id}...`);
5✔
135

136
                const eventsIterable = await this.#eventStorageReader.getSagaEvents(sagaId, filter);
5✔
137

138
                yield* eventsIterable;
5✔
139

140
                this.#logger?.debug(`all events for saga ${sagaId} retrieved`);
5✔
141
        }
142

143
        /**
144
         * Validate events, commit to storage and publish to messageBus, if needed
145
         *
146
         * @param events - a set of events to commit
147
         * @returns Signed and committed events
148
         */
149
        async dispatch(events: IEventSet, meta?: Record<string, any>): Promise<IEventSet> {
150
                assertArray(events, 'events');
49✔
151

152
                return this.#eventDispatcher.dispatch(events, {
48✔
153
                        origin: 'internal',
154
                        ...meta
155
                });
156
        }
157

158
        on(messageType: string, handler: IMessageHandler) {
159
                this.eventBus.on(messageType, handler);
3✔
160
        }
161

162
        off(messageType: string, handler: IMessageHandler) {
163
                this.eventBus.off(messageType, handler);
1✔
164
        }
165

166
        queue(name: string): IObservable {
167
                if (!isObservableQueueProvider(this.eventBus))
2!
NEW
168
                        throw new Error('Injected eventBus does not support named queues');
×
169

170
                return this.eventBus.queue(name);
2✔
171
        }
172

173
        /** Creates one-time subscription for one or multiple events that match a filter */
174
        once(messageTypes: string | string[], handler?: IMessageHandler, filter?: (e: IEvent) => boolean): Promise<IEvent> {
175
                const subscribeTo = Array.isArray(messageTypes) ? messageTypes : [messageTypes];
2!
176

177
                return setupOneTimeEmitterSubscription(this.eventBus, subscribeTo, filter, handler, this.#logger);
2✔
178
        }
179
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc