• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 16884941303

11 Aug 2025 03:42PM UTC coverage: 83.547% (+0.01%) from 83.534%
16884941303

push

github

snatalenko
Update dependencies

507 of 809 branches covered (62.67%)

1041 of 1246 relevant lines covered (83.55%)

25.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.0
/src/EventDispatcher.ts
1
import {
24✔
2
        DispatchPipelineBatch,
3
        IEvent,
4
        IEventDispatcher,
5
        IDispatchPipelineProcessor,
6
        IEventSet,
7
        IEventBus,
8
        isEventSet,
9
        IContainer,
10
        isDispatchPipelineProcessor
11
} from './interfaces';
12
import { parallelPipe } from 'async-parallel-pipe';
24✔
13
import { AsyncIterableBuffer } from 'async-iterable-buffer';
24✔
14
import { getClassName, notEmpty } from './utils';
24✔
15
import { InMemoryMessageBus } from './in-memory';
24✔
16

17
/**
 * One dispatched batch travelling through the pipeline, bundled with the
 * callbacks that settle the promise handed back to the `dispatch` caller.
 */
type EventBatchEnvelope = {

        /** Batch records; each record may carry an `event` alongside its metadata */
        data: DispatchPipelineBatch<{ event?: IEvent }>;

        /** Fulfils the caller's promise with the events that were published */
        resolve: (events: IEvent[]) => void;

        /** Rejects the caller's promise */
        reject: (reason: Error) => void;

        /** Set when a pipeline processor threw while handling this batch */
        error?: Error;
};
23

24
/**
 * Runs dispatched event batches through a chain of pipeline processors and
 * then publishes the resulting events to the event bus.
 *
 * Every `dispatch` call produces one batch. The promise returned by `dispatch`
 * is settled by the single consumer loop that is started lazily on the first
 * dispatch.
 */
export class EventDispatcher implements IEventDispatcher {

        /** Pipeline entry point; `dispatch` pushes batch envelopes here */
        #pipelineInput = new AsyncIterableBuffer<EventBatchEnvelope>();

        /** Registered processors in registration order, kept for `#revert` */
        #processors: Array<IDispatchPipelineProcessor> = [];

        /** Current pipeline tail; every added processor wraps the previous tail */
        #pipeline: AsyncIterableIterator<EventBatchEnvelope> | IterableIterator<EventBatchEnvelope> = this.#pipelineInput;

        /** Becomes `true` once the consumer loop is started; processors cannot be added afterwards */
        #pipelineProcessing = false;

        /**
         * Event bus where dispatched messages are delivered after processing.
         *
         * If not provided in the constructor, defaults to an instance of `InMemoryMessageBus`.
         */
        eventBus: IEventBus;

        /**
         * Maximum number of event batches that each pipeline processor can handle in parallel.
         *
         * The value is read when a processor is added, so changing it later only
         * affects processors added afterwards.
         */
        concurrentLimit: number;

        /**
         * @param o Optional dependencies: the event bus, the initial list of pipeline
         *   processors, and `eventDispatcherConfig.concurrentLimit` (defaults to 100).
         */
        constructor(o?: Pick<IContainer, 'eventBus' | 'eventDispatchPipeline'> & {
                eventDispatcherConfig?: {
                        concurrentLimit?: number
                }
        }) {
                this.eventBus = o?.eventBus ?? new InMemoryMessageBus();
                this.concurrentLimit = o?.eventDispatcherConfig?.concurrentLimit ?? 100;

                if (o?.eventDispatchPipeline)
                        this.addPipelineProcessors(o.eventDispatchPipeline);
        }

        /**
         * Adds several processors to the pipeline, in array order. Falsy entries are skipped.
         *
         * @throws {TypeError} if the argument is not an array
         */
        addPipelineProcessors(eventDispatchPipeline: IDispatchPipelineProcessor[]) {
                if (!Array.isArray(eventDispatchPipeline))
                        throw new TypeError('eventDispatchPipeline argument must be an Array');

                for (const processor of eventDispatchPipeline) {
                        if (processor)
                                this.addPipelineProcessor(processor);
                }
        }

        /**
         * Adds a preprocessor to the event dispatch pipeline.
         *
         * Preprocessors run in order they are added but process separate batches in parallel, maintaining FIFO order.
         *
         * @throws {TypeError} if the argument does not implement `IDispatchPipelineProcessor`
         * @throws {Error} if the pipeline has already started consuming batches
         */
        addPipelineProcessor(preprocessor: IDispatchPipelineProcessor) {
                if (!isDispatchPipelineProcessor(preprocessor))
                        throw new TypeError(`preprocessor ${getClassName(preprocessor)} does not implement IDispatchPipelineProcessor`);
                if (this.#pipelineProcessing)
                        throw new Error('pipeline processing already started');

                this.#processors.push(preprocessor);

                // Build a processing pipeline that runs preprocessors concurrently, preserving FIFO ordering
                this.#pipeline = parallelPipe(this.#pipeline, this.concurrentLimit, async envelope => {
                        // A batch that already failed upstream is passed through untouched
                        if (envelope.error)
                                return envelope;

                        try {
                                return {
                                        ...envelope,
                                        data: await preprocessor.process(envelope.data)
                                };
                        }
                        catch (error: any) {
                                // Carry the failure along with the envelope so the consumer loop can
                                // revert and reject this batch without breaking the pipeline
                                return {
                                        ...envelope,
                                        error
                                };
                        }
                });
        }

        /**
         * Consume the pipeline, publish events, and resolve/reject each batch.
         *
         * Every batch is settled exactly once and a failing batch never terminates the
         * loop. If publishing throws, the batch is rejected; events of that batch that
         * were already published are not reverted.
         */
        async #startPipelineProcessing() {
                if (this.#pipelineProcessing) // should never happen
                        throw new Error('pipeline processing already started');

                this.#pipelineProcessing = true;

                for await (const { error, reject, data, resolve } of this.#pipeline) {
                        if (error) { // some of the preprocessors failed
                                try {
                                        await this.#revert(data);
                                }
                                catch {
                                        // A failing revert must not leave the caller's promise pending or
                                        // stop the consumer loop; the batch is rejected below with the
                                        // processing error that caused the revert in the first place
                                }
                                reject(error);
                                continue;
                        }

                        try {
                                const events: IEvent[] = [];

                                for (const { event, ...meta } of data) {
                                        if (!event)
                                                continue;

                                        await this.eventBus.publish(event, meta);
                                        events.push(event);
                                }

                                resolve(events);
                        }
                        catch (publishError: any) {
                                reject(publishError);
                        }
                }
        }

        /**
         * Revert side effects made by pipeline processors in case of a batch processing failure.
         *
         * Every processor gets a chance to revert even when an earlier one fails;
         * the first revert failure is re-thrown after all processors were attempted.
         */
        async #revert(batch: DispatchPipelineBatch) {
                let hasFailure = false;
                let firstFailure: unknown;

                for (const processor of this.#processors) {
                        try {
                                await processor.revert?.(batch);
                        }
                        catch (revertError: unknown) {
                                if (!hasFailure) {
                                        hasFailure = true;
                                        firstFailure = revertError;
                                }
                        }
                }

                if (hasFailure)
                        throw firstFailure;
        }

        /**
         * Dispatch a set of events through the processing pipeline.
         *
         * Returns a promise that resolves after all events are processed and published.
         *
         * @param events Non-empty set of events forming one batch
         * @param meta Optional metadata copied onto every batch record; an `event`
         *   key in `meta` is ignored in favour of the dispatched event
         * @returns The events that were published, in batch order
         * @throws {Error} (as a rejection) if `events` is not a non-empty event set
         */
        async dispatch(events: IEventSet, meta?: Record<string, any>): Promise<IEventSet> {
                if (!isEventSet(events) || events.length === 0)
                        throw new Error('dispatch requires a non-empty array of events');

                // const { promise, resolve, reject } = Promise.withResolvers<IEventSet>();
                let resolve!: (value: IEventSet | PromiseLike<IEventSet>) => void;
                let reject!: (reason?: any) => void;
                const promise = new Promise<IEventSet>((res, rej) => {
                        resolve = res;
                        reject = rej;
                });

                const envelope: EventBatchEnvelope = {
                        // `event` goes last so that metadata can never replace the dispatched event
                        data: events.map(event => ({
                                ...meta,
                                event
                        })),
                        resolve,
                        reject
                };

                // The consumer loop runs for the lifetime of the dispatcher; batches are
                // settled through their own resolve/reject callbacks, not through this promise
                if (!this.#pipelineProcessing)
                        void this.#startPipelineProcessing();

                this.#pipelineInput.push(envelope);

                return promise;
        }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc