• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 21844816144

09 Feb 2026 11:21PM UTC coverage: 83.491%. First build
21844816144

Pull #31

github

web-flow
Merge ae675944f into 025edb883
Pull Request #31: Multi-saga correlation via `message.sagaOrigins`

636 of 986 branches covered (64.5%)

117 of 155 new or added lines in 17 files covered. (75.48%)

1234 of 1478 relevant lines covered (83.49%)

29.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.86
/src/SagaEventHandler.ts
1
import * as Event from './Event.ts';
24✔
2
import type {
3
        ICommandBus,
4
        IContainer,
5
        Identifier,
6
        IEvent,
7
        IEventReceptor,
8
        IEventStore,
9
        ILocker,
10
        ILogger,
11
        IObservable,
12
        ISaga,
13
        ISagaConstructor,
14
        ISagaFactory
15
} from './interfaces/index.ts';
16

17
import {
24✔
18
        subscribe,
19
        getClassName,
20
        Lock,
21
        makeSagaId,
22
        MapAssertable
23
} from './utils/index.ts';
24

25
/**
 * Event receptor that routes events to saga instances.
 *
 * For each handled event it:
 *  1. resolves the saga origin (the starter event's own ID, or the value stored
 *     under this handler's descriptor in `event.sagaOrigins`),
 *  2. obtains the saga instance - a fresh one for starter events, otherwise one
 *     rebuilt by replaying earlier saga events from the event store,
 *  3. lets the saga handle the event,
 *  4. sends the commands produced by the saga to the command bus,
 *     stamping each of them with `sagaOrigins`.
 */
export class SagaEventHandler implements IEventReceptor {

        readonly #eventStore: IEventStore;
        readonly #commandBus: ICommandBus;
        readonly #queueName?: string;
        readonly #logger?: ILogger;
        readonly #sagaFactory: (params: any) => ISaga;

        /** Event types that start a new saga */
        readonly #startsWith: string[];

        /** Event types delivered to an already started saga */
        readonly #handles: string[];

        /** Key used to look up this saga's origin in `sagaOrigins` and to build saga IDs */
        readonly #sagaDescriptor: string;

        readonly #executionLock: ILocker;

        /** Saga instances (as promises) that are currently in use, keyed by saga ID */
        readonly #sagasCache: MapAssertable<Identifier, Promise<ISaga>> = new MapAssertable();

        /**
         * @param options - dependencies and saga definition; either `sagaType` or `sagaFactory` must be provided
         * @param options.sagaType - saga class; `startsWith`, `handles` and the descriptor are read from its static members
         * @param options.sagaFactory - alternative to `sagaType`; requires `startsWith` and `handles` arrays
         *   and a non-empty `sagaDescriptor` (or `queueName`, which is used as a fallback descriptor)
         * @param options.queueName - passed through to the subscription helper in `subscribe()`
         * @throws {TypeError} when `options`, `options.eventStore` or `options.commandBus` is missing,
         *   or when the `sagaFactory` variant lacks `startsWith`, `handles` or a descriptor
         * @throws {Error} when neither `sagaType` nor `sagaFactory` is provided
         */
        constructor(options: Pick<IContainer, 'eventStore' | 'commandBus' | 'executionLocker' | 'logger'> & {
                sagaType?: ISagaConstructor,
                sagaFactory?: ISagaFactory,
                sagaDescriptor?: string,
                queueName?: string,
                startsWith?: string[],
                handles?: string[]
        }) {
                if (!options)
                        throw new TypeError('options argument required');
                if (!options.eventStore)
                        throw new TypeError('options.eventStore argument required');
                if (!options.commandBus)
                        throw new TypeError('options.commandBus argument required');

                this.#eventStore = options.eventStore;
                this.#commandBus = options.commandBus;
                this.#queueName = options.queueName;
                this.#executionLock = options.executionLocker ?? new Lock();

                // use a child logger tagged with the class name when the logger supports it
                this.#logger = options.logger && 'child' in options.logger ?
                        options.logger.child({ service: getClassName(this) }) :
                        options.logger;

                if (options.sagaType) {
                        const SagaType = options.sagaType as ISagaConstructor;

                        this.#sagaFactory = params => new SagaType(params);
                        this.#startsWith = SagaType.startsWith;
                        this.#handles = SagaType.handles;
                        this.#sagaDescriptor = SagaType.sagaDescriptor ?? SagaType.name;
                }
                else if (options.sagaFactory) {
                        if (!Array.isArray(options.startsWith))
                                throw new TypeError('options.startsWith argument must be an Array');
                        if (!Array.isArray(options.handles))
                                throw new TypeError('options.handles argument must be an Array');

                        this.#sagaFactory = options.sagaFactory;
                        this.#startsWith = options.startsWith;
                        this.#handles = options.handles;

                        // a factory carries no class name, so the descriptor must come from the options
                        this.#sagaDescriptor = options.sagaDescriptor ?? options.queueName ?? '';
                        if (!this.#sagaDescriptor)
                                throw new TypeError('options.sagaDescriptor argument required when sagaFactory is provided');
                }
                else {
                        throw new Error('Either sagaType or sagaFactory is required');
                }
        }

        /**
         * Overrides observer subscribe method.
         *
         * Registers `handle` as the single handler for all `startsWith` and `handles` event types.
         *
         * @param eventStore - observable to subscribe to
         */
        subscribe(eventStore: IObservable) {
                subscribe(eventStore, this, {
                        messageTypes: [...this.#startsWith, ...this.#handles],
                        // NOTE(review): `handle` is passed unbound; presumably the `subscribe` helper
                        // invokes it with this receptor as `this` - confirm against utils/subscribe
                        masterHandler: this.handle,
                        queueName: this.#queueName
                });
        }

        /**
         * Handle saga event: pass it to the corresponding saga and send the produced commands.
         *
         * @param event - event to process; must have a `type` and a non-empty string `id`
         * @throws {TypeError} when `event`, `event.type` or `event.id` is missing or invalid
         * @throws {Error} when a starter event already carries an origin for this saga descriptor,
         *   or when a non-starter event carries none
         */
        async handle(event: IEvent): Promise<void> {
                if (!event)
                        throw new TypeError('event argument required');
                if (!event.type)
                        throw new TypeError('event.type argument required');
                if (typeof event.id !== 'string' || !event.id.length)
                        throw new TypeError('event.id argument required');

                const isStarterEvent = this.#startsWith.includes(event.type);
                if (isStarterEvent && event.sagaOrigins?.[this.#sagaDescriptor])
                        throw new Error(`Starter event "${event.type}" already contains saga origin for "${this.#sagaDescriptor}"`);

                // a starter event is its own origin; follow-up events must reference the origin explicitly
                const sagaOrigin = isStarterEvent ? event.id : event.sagaOrigins?.[this.#sagaDescriptor];
                if (!sagaOrigin)
                        throw new Error(`Event "${event.type}" does not contain saga origin for "${this.#sagaDescriptor}"`);

                const sagaId = makeSagaId(this.#sagaDescriptor, sagaOrigin);

                // NOTE(review): assumes `assert()` registers a usage of the cache entry that only
                // `release()` undoes - confirm against MapAssertable. Everything that can reject after
                // this point runs inside the try block, so a failed restore or a failed lock acquisition
                // does not leave a stale (possibly rejected) promise in the cache.
                const sagaPromise = this.#sagasCache.assert(sagaId, () => (isStarterEvent ?
                        this.#createSaga(sagaId) :
                        this.#restoreSaga(sagaId, event)
                ));

                try {
                        const saga = await sagaPromise;

                        // multiple events to a same saga ID will execute sequentially on a same saga instance
                        const lease = await this.#executionLock.acquire(sagaId);

                        try {
                                const commands = await saga.handle(event);
                                this.#logger?.debug(`"${Event.describe(event)}" processed, ${commands.map(c => c.type).join(',') || 'no commands'} produced`);

                                for (const command of commands) {
                                        // attach event context to produced command
                                        if (command.context === undefined && event.context !== undefined)
                                                command.context = event.context;

                                        command.sagaOrigins = {
                                                ...event.sagaOrigins,
                                                [this.#sagaDescriptor]: sagaOrigin,

                                                // `sagaOrigins` returned in a command has the highest priority
                                                ...command.sagaOrigins
                                        };

                                        await this.#commandBus.sendRaw(command);
                                }
                        }
                        finally {
                                lease.release();
                        }
                }
                finally {
                        this.#sagasCache.release(sagaId);
                }
        }

        /**
         * Start new saga
         *
         * @param id - saga ID passed to the saga factory
         * @returns saga instance produced by the factory, with no events applied
         */
        async #createSaga(id: Identifier): Promise<ISaga> {
                return this.#sagaFactory.call(null, { id });
        }

        /**
         * Restore saga from event store
         *
         * @param id - saga ID passed to the saga factory and used to query saga events
         * @param event - event being handled; passed to the store as the `beforeEvent` boundary
         * @returns saga instance with the events returned by the store applied via `mutate`
         */
        async #restoreSaga(id: Identifier, event: IEvent): Promise<ISaga> {
                const saga = this.#sagaFactory.call(null, { id });

                const eventsIterable = this.#eventStore.getSagaEvents(id, { beforeEvent: event });
                let eventsCount = 0;
                for await (const oldEvent of eventsIterable) {
                        // `mutate` may be sync or async; wait only when it returned a promise
                        const r = saga.mutate(oldEvent);
                        if (r instanceof Promise)
                                await r;

                        eventsCount += 1;
                }

                this.#logger?.info(`Saga state restored from ${eventsCount} event(s)`);

                return saga;
        }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc