• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 21845544207

09 Feb 2026 11:50PM UTC coverage: 85.047%. First build
21845544207

Pull #31

github

web-flow
Merge 788523d43 into 025edb883
Pull Request #31: Multi-saga correlation via `message.sagaOrigins`

650 of 986 branches covered (65.92%)

140 of 155 new or added lines in 17 files covered. (90.32%)

1257 of 1478 relevant lines covered (85.05%)

31.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.14
/src/SagaEventHandler.ts
1
import * as Event from './Event.ts';
28✔
2
import type {
3
        ICommandBus,
4
        IContainer,
5
        Identifier,
6
        IEvent,
7
        IEventReceptor,
8
        IEventStore,
9
        ILocker,
10
        ILogger,
11
        IObservable,
12
        ISaga,
13
        ISagaConstructor,
14
        ISagaFactory
15
} from './interfaces/index.ts';
16

17
import {
28✔
18
        subscribe,
19
        getClassName,
20
        Lock,
21
        makeSagaId,
22
        MapAssertable
23
} from './utils/index.ts';
24

25
/**
 * Listens to Saga events,
 * creates new saga or restores it from event store,
 * applies new events
 * and passes command(s) to command bus.
 *
 * A saga instance is identified by the saga descriptor combined with the ID
 * of the event that started it (the "saga origin"). Commands produced by the saga
 * get the origin attached to `sagaOrigins`, keyed by the saga descriptor.
 */
export class SagaEventHandler implements IEventReceptor {

        readonly #eventStore: IEventStore;
        readonly #commandBus: ICommandBus;
        readonly #queueName?: string;
        readonly #logger?: ILogger;
        readonly #sagaFactory: (params: any) => ISaga;
        readonly #startsWith: string[];
        readonly #handles: string[];
        readonly #sagaDescriptor: string;
        readonly #executionLock: ILocker;

        /** Saga instances currently in use, keyed by saga ID; every `assert` is paired with a `release` in `handle` */
        readonly #sagasCache: MapAssertable<Identifier, Promise<ISaga>> = new MapAssertable();

        /**
         * @param options Dependencies and saga definition. Either `sagaType` or `sagaFactory` must be provided.
         *   With `sagaType`, `startsWith`, `handles` and the descriptor are read from the saga constructor
         *   (descriptor falls back to the constructor name).
         *   With `sagaFactory`, `startsWith` and `handles` must be arrays, and a non-empty descriptor
         *   must be resolvable from `sagaDescriptor` or `queueName`.
         * @throws {TypeError} when `options`, `options.eventStore` or `options.commandBus` is missing,
         *   or when the `sagaFactory` variant lacks `startsWith`/`handles` arrays or a descriptor
         * @throws {Error} when neither `sagaType` nor `sagaFactory` is provided
         */
        constructor(options: Pick<IContainer, 'eventStore' | 'commandBus' | 'executionLocker' | 'logger'> & {
                sagaType?: ISagaConstructor,
                sagaFactory?: ISagaFactory,
                sagaDescriptor?: string,
                queueName?: string,
                startsWith?: string[],
                handles?: string[]
        }) {
                if (!options)
                        throw new TypeError('options argument required');
                if (!options.eventStore)
                        throw new TypeError('options.eventStore argument required');
                if (!options.commandBus)
                        throw new TypeError('options.commandBus argument required');

                this.#eventStore = options.eventStore;
                this.#commandBus = options.commandBus;
                this.#queueName = options.queueName;
                this.#executionLock = options.executionLocker ?? new Lock();

                // use a child logger tagged with the class name when the logger supports it
                this.#logger = options.logger && 'child' in options.logger ?
                        options.logger.child({ service: getClassName(this) }) :
                        options.logger;

                if (options.sagaType) {
                        const SagaType = options.sagaType as ISagaConstructor;

                        this.#sagaFactory = params => new SagaType(params);
                        this.#startsWith = SagaType.startsWith;
                        this.#handles = SagaType.handles;
                        this.#sagaDescriptor = SagaType.sagaDescriptor ?? SagaType.name;
                }
                else if (options.sagaFactory) {
                        if (!Array.isArray(options.startsWith))
                                throw new TypeError('options.startsWith argument must be an Array');
                        if (!Array.isArray(options.handles))
                                throw new TypeError('options.handles argument must be an Array');

                        this.#sagaFactory = options.sagaFactory;
                        this.#startsWith = options.startsWith;
                        this.#handles = options.handles;
                        this.#sagaDescriptor = options.sagaDescriptor ?? options.queueName ?? '';
                        if (!this.#sagaDescriptor)
                                throw new TypeError('options.sagaDescriptor argument required when sagaFactory is provided');
                }
                else {
                        throw new Error('Either sagaType or sagaFactory is required');
                }
        }

        /**
         * Overrides observer subscribe method.
         *
         * Subscribes `handle` to all `startsWith` and `handles` event types of the given observable,
         * passing along the configured queue name (if any).
         */
        subscribe(eventStore: IObservable) {
                subscribe(eventStore, this, {
                        messageTypes: [...this.#startsWith, ...this.#handles],
                        masterHandler: this.handle,
                        queueName: this.#queueName
                });
        }

        /**
         * Handle saga event: resolve the saga instance, let it process the event
         * and send every produced command to the command bus.
         *
         * @throws {TypeError} when `event`, `event.type` or a non-empty string `event.id` is missing
         * @throws {Error} when a starter event already carries a saga origin for this saga descriptor,
         *   or when a non-starter event carries none
         */
        async handle(event: IEvent): Promise<void> {
                if (!event)
                        throw new TypeError('event argument required');
                if (!event.type)
                        throw new TypeError('event.type argument required');
                if (typeof event.id !== 'string' || !event.id.length)
                        throw new TypeError('event.id argument required');

                const isStarterEvent = this.#startsWith.includes(event.type);
                if (isStarterEvent && event.sagaOrigins?.[this.#sagaDescriptor])
                        throw new Error(`Starter event "${event.type}" already contains saga origin for "${this.#sagaDescriptor}"`);

                // a starter event is the origin itself, follow-up events must reference the origin
                const sagaOrigin = isStarterEvent ? event.id : event.sagaOrigins?.[this.#sagaDescriptor];
                if (!sagaOrigin)
                        throw new Error(`Event "${event.type}" does not contain saga origin for "${this.#sagaDescriptor}"`);

                const sagaId = makeSagaId(this.#sagaDescriptor, sagaOrigin);

                // Take the cache entry synchronously and await it inside `try`,
                // so that a failed saga restore or lock acquisition still releases the entry
                // instead of leaving a rejected promise cached for this saga ID.
                const sagaPromise = this.#sagasCache.assert(sagaId, () => (isStarterEvent ?
                        this.#createSaga(sagaId) :
                        this.#restoreSaga(sagaId, event)
                ));

                try {
                        const saga = await sagaPromise;

                        // multiple events to a same saga ID will execute sequentially on a same saga instance
                        const lease = await this.#executionLock.acquire(sagaId);

                        try {
                                const commands = await saga.handle(event);
                                this.#logger?.debug(`"${Event.describe(event)}" processed, ${commands.map(c => c.type).join(',') || 'no commands'} produced`);

                                for (const command of commands) {
                                        // attach event context to produced command
                                        if (command.context === undefined && event.context !== undefined)
                                                command.context = event.context;

                                        command.sagaOrigins = {
                                                ...event.sagaOrigins,
                                                [this.#sagaDescriptor]: sagaOrigin,

                                                // `sagaOrigins` returned in a command has the highest priority
                                                ...command.sagaOrigins
                                        };

                                        await this.#commandBus.sendRaw(command);
                                }
                        }
                        finally {
                                lease.release();
                        }
                }
                finally {
                        this.#sagasCache.release(sagaId);
                }
        }

        /** Start new saga: create a blank instance with the given ID using the saga factory */
        async #createSaga(id: Identifier): Promise<ISaga> {
                return this.#sagaFactory.call(null, { id });
        }

        /**
         * Restore saga from event store: create a blank instance and replay on it
         * the saga events returned by the event store for the given `event` (`beforeEvent` option).
         */
        async #restoreSaga(id: Identifier, event: IEvent): Promise<ISaga> {
                const saga = this.#sagaFactory.call(null, { id });

                const eventsIterable = this.#eventStore.getSagaEvents(id, { beforeEvent: event });
                let eventsCount = 0;
                for await (const oldEvent of eventsIterable) {
                        // `mutate` may be sync or async, only wait when a promise is returned
                        const r = saga.mutate(oldEvent);
                        if (r instanceof Promise)
                                await r;

                        eventsCount += 1;
                }

                this.#logger?.info(`Saga state restored from ${eventsCount} event(s)`);

                return saga;
        }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc