• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 21880136811

10 Feb 2026 07:52PM UTC coverage: 85.292% (-9.1%) from 94.396%
21880136811

Pull #28

github

web-flow
Merge 528c3aaa0 into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

673 of 1010 branches covered (66.63%)

935 of 1061 new or added lines in 68 files covered. (88.12%)

49 existing lines in 13 files now uncovered.

1270 of 1489 relevant lines covered (85.29%)

33.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.41
/src/SagaEventHandler.ts
1
import * as Event from './Event.ts';
28✔
2
import type {
3
        ICommandBus,
4
        IContainer,
5
        Identifier,
6
        IEvent,
7
        IEventReceptor,
8
        IEventStore,
9
        ILocker,
10
        ILogger,
11
        IObservable,
12
        ISaga,
13
        ISagaConstructor,
14
        ISagaFactory
15
} from './interfaces/index.ts';
16

17
import {
28✔
18
        subscribe,
19
        getClassName,
20
        Lock,
21
        makeSagaId,
22
        MapAssertable
23
} from './utils/index.ts';
24

25
/**
 * Listens to Saga events,
 * creates new saga or restores it from event store,
 * applies new events
 * and passes command(s) to command bus
 */
export class SagaEventHandler implements IEventReceptor {

        readonly #eventStore: IEventStore;
        readonly #commandBus: ICommandBus;
        readonly #queueName?: string;
        readonly #logger?: ILogger;
        readonly #sagaFactory: (params: any) => ISaga;
        readonly #startsWith?: string[];
        readonly #handles: string[];
        readonly #sagaDescriptor: string;
        readonly #executionLock: ILocker;

        /** Saga instances (as promises) currently in use, keyed by saga ID */
        readonly #sagasCache: MapAssertable<Identifier, Promise<ISaga>> = new MapAssertable();

        /**
         * Creates a handler either from a saga class (`sagaType`) or from a saga factory (`sagaFactory`).
         *
         * With `sagaType`, the `startsWith`, `handles` and descriptor values are read from the class statics
         * (the descriptor falls back to the class name).
         * With `sagaFactory`, `handles` must be passed as an array and the descriptor is taken
         * from `sagaDescriptor`, falling back to `queueName`.
         *
         * @throws {TypeError} when `options`, `options.eventStore` or `options.commandBus` is missing
         * @throws {TypeError} when `sagaFactory` is used and `handles` is not an array
         * @throws {TypeError} when `sagaFactory` is used and no non-empty descriptor can be resolved
         * @throws {Error} when neither `sagaType` nor `sagaFactory` is provided
         */
        constructor(options: Pick<IContainer, 'eventStore' | 'commandBus' | 'executionLocker' | 'logger'> & {
                sagaType?: ISagaConstructor,
                sagaFactory?: ISagaFactory,
                sagaDescriptor?: string,
                queueName?: string,
                startsWith?: string[],
                handles?: string[]
        }) {
                if (!options)
                        throw new TypeError('options argument required');
                if (!options.eventStore)
                        throw new TypeError('options.eventStore argument required');
                if (!options.commandBus)
                        throw new TypeError('options.commandBus argument required');

                this.#eventStore = options.eventStore;
                this.#commandBus = options.commandBus;
                this.#queueName = options.queueName;
                this.#executionLock = options.executionLocker ?? new Lock();

                // use a child logger tagged with this class name when the logger supports it
                this.#logger = options.logger && 'child' in options.logger ?
                        options.logger.child({ service: getClassName(this) }) :
                        options.logger;

                if (options.sagaType) {
                        const SagaType = options.sagaType as ISagaConstructor;

                        this.#sagaFactory = params => new SagaType(params);
                        this.#startsWith = SagaType.startsWith;
                        this.#handles = SagaType.handles;
                        this.#sagaDescriptor = SagaType.sagaDescriptor ?? SagaType.name;
                }
                else if (options.sagaFactory) {
                        if (!Array.isArray(options.handles))
                                throw new TypeError('options.handles argument must be an Array');

                        this.#sagaFactory = options.sagaFactory;
                        this.#startsWith = options.startsWith;
                        this.#handles = options.handles;
                        this.#sagaDescriptor = options.sagaDescriptor ?? options.queueName ?? '';
                        if (!this.#sagaDescriptor)
                                throw new TypeError('options.sagaDescriptor argument required when sagaFactory is provided');
                }
                else {
                        throw new Error('Either sagaType or sagaFactory is required');
                }
        }

        /**
         * Overrides observer subscribe method.
         *
         * Subscribes this handler to all `startsWith` and `handles` event types of the given observable,
         * routing every message to {@link SagaEventHandler.handle}.
         */
        subscribe(eventStore: IObservable) {
                subscribe(eventStore, this, {
                        messageTypes: [...this.#startsWith ?? [], ...this.#handles],
                        // NOTE(review): `handle` is passed unbound; presumably the `subscribe` helper
                        // invokes it with this handler as the receiver - confirm against utils
                        masterHandler: this.handle,
                        queueName: this.#queueName
                });
        }

        /**
         * Handle saga event.
         *
         * An event is treated as a saga starter when its type is listed in `startsWith`;
         * when `startsWith` is not defined, it is a starter if it carries no saga origin
         * for this saga descriptor. A starter event's own ID becomes the saga origin,
         * any other event must bring the origin in `event.sagaOrigins`.
         *
         * Commands produced by the saga inherit the event context (unless they define their own),
         * get the saga origin attached and are sent to the command bus one by one.
         *
         * @throws {TypeError} when `event`, `event.type` or a non-empty string `event.id` is missing
         * @throws {Error} when a starter event already contains a saga origin for this descriptor
         * @throws {Error} when a non-starter event does not contain a saga origin for this descriptor
         */
        async handle(event: IEvent): Promise<void> {
                if (!event)
                        throw new TypeError('event argument required');
                if (!event.type)
                        throw new TypeError('event.type argument required');
                if (typeof event.id !== 'string' || !event.id.length)
                        throw new TypeError('event.id argument required');

                const sagaOriginFromEvent = event.sagaOrigins?.[this.#sagaDescriptor];
                const isStarterEvent = this.#startsWith?.includes(event.type) ?? !sagaOriginFromEvent;
                if (isStarterEvent && sagaOriginFromEvent)
                        throw new Error(`Starter event "${event.type}" already contains saga origin for "${this.#sagaDescriptor}"`);

                const sagaOrigin = isStarterEvent ? event.id : sagaOriginFromEvent;
                if (!sagaOrigin)
                        throw new Error(`Event "${event.type}" does not contain saga origin for "${this.#sagaDescriptor}"`);

                const sagaId = makeSagaId(this.#sagaDescriptor, sagaOrigin);
                const sagaPromise = this.#sagasCache.assert(sagaId, () => (isStarterEvent ?
                        this.#createSaga(sagaId) :
                        this.#restoreSaga(sagaId, event)
                ));

                // Every `assert` above is paired with a `release` below, including the cases
                // when saga creation/restoring or lock acquisition fails; otherwise a rejected
                // saga promise would not be released for this saga ID.
                // NOTE(review): assumes `MapAssertable.release` undoes one prior `assert` - confirm against utils
                try {
                        const saga = await sagaPromise;

                        // multiple events to a same saga ID will execute sequentially on a same saga instance
                        const lease = await this.#executionLock.acquire(sagaId);

                        try {
                                const commands = await saga.handle(event);
                                this.#logger?.debug(`"${Event.describe(event)}" processed, ${commands.map(c => c.type).join(',') || 'no commands'} produced`);

                                for (const command of commands) {
                                        // attach event context to produced command
                                        if (command.context === undefined && event.context !== undefined)
                                                command.context = event.context;

                                        command.sagaOrigins = {
                                                ...event.sagaOrigins,
                                                [this.#sagaDescriptor]: sagaOrigin,

                                                // `sagaOrigins` returned in a command has the highest priority
                                                ...command.sagaOrigins
                                        };

                                        await this.#commandBus.sendRaw(command);
                                }
                        }
                        finally {
                                lease.release();
                        }
                }
                finally {
                        this.#sagasCache.release(sagaId);
                }
        }

        /** Start new saga: invoke the saga factory with the given saga ID */
        async #createSaga(id: Identifier): Promise<ISaga> {
                return this.#sagaFactory.call(null, { id });
        }

        /**
         * Restore saga from event store: create a fresh saga instance and replay
         * the saga events that the event store returns for `{ beforeEvent: event }`
         */
        async #restoreSaga(id: Identifier, event: IEvent): Promise<ISaga> {
                const saga = this.#sagaFactory.call(null, { id });

                const eventsIterable = this.#eventStore.getSagaEvents(id, { beforeEvent: event });
                let eventsCount = 0;
                for await (const oldEvent of eventsIterable) {
                        // `mutate` may be sync or async, wait only when it returns a promise
                        const r = saga.mutate(oldEvent);
                        if (r instanceof Promise)
                                await r;

                        eventsCount += 1;
                }

                this.#logger?.info(`Saga state restored from ${eventsCount} event(s)`);

                return saga;
        }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc