• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 22745360142

06 Mar 2026 01:50AM UTC coverage: 95.287% (+0.9%) from 94.396%
22745360142

Pull #28

github

web-flow
Merge 15ef847b4 into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

428 of 528 branches covered (81.06%)

1043 of 1091 new or added lines in 65 files covered. (95.6%)

3 existing lines in 2 files now uncovered.

1294 of 1358 relevant lines covered (95.29%)

31.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.77
/src/SagaEventHandler.ts
1
import * as Event from './Event.ts';
14✔
2
import type {
3
        ICommandBus,
4
        IContainer,
5
        Identifier,
6
        IEvent,
7
        IEventReceptor,
8
        IEventStore,
9
        ILocker,
10
        ILogger,
11
        IObservable,
12
        ISaga,
13
        ISagaConstructor,
14
        ISagaFactory
15
} from './interfaces/index.ts';
16

17
import {
14✔
18
        subscribe,
19
        Lock,
20
        makeSagaId,
21
        MapAssertable,
22
        assertDefined,
23
        assertString,
24
        assertOptionalArray
25
} from './utils/index.ts';
26

27
/**
 * Listens to Saga events,
 * creates new saga or restores it from event store,
 * applies new events
 * and passes command(s) to command bus
 */
export class SagaEventHandler implements IEventReceptor {

        readonly #eventStore: IEventStore;
        readonly #commandBus: ICommandBus;
        readonly #queueName?: string;
        readonly #logger?: ILogger;
        readonly #sagaFactory: (params: any) => ISaga;
        readonly #startsWith?: string[];
        /** May be undefined when a `sagaFactory` is supplied without `handles` */
        readonly #handles?: string[];
        readonly #sagaDescriptor: string;
        readonly #executionLock: ILocker;
        /** Saga instances (as promises) currently in use, keyed by saga ID */
        readonly #sagasCache: MapAssertable<Identifier, Promise<ISaga>> = new MapAssertable();

        /**
         * @param options.eventStore Required; used to read past saga events when restoring a saga
         * @param options.commandBus Required; receives the commands produced by the saga
         * @param options.executionLocker Optional; defaults to a new `Lock`
         * @param options.logger Optional; if it exposes `child`, a child logger tagged with the class name is used
         * @param options.sagaType Saga class; `startsWith`, `handles` and `sagaDescriptor` are read from its statics
         *   (descriptor falls back to the class name)
         * @param options.sagaFactory Alternative to `sagaType`; requires `options.sagaDescriptor`,
         *   `options.startsWith` and `options.handles` are taken from options
         * @throws {Error} if neither `sagaType` nor `sagaFactory` is provided
         */
        constructor(options: Pick<IContainer, 'eventStore' | 'commandBus' | 'executionLocker' | 'logger'> & {
                sagaType?: ISagaConstructor,
                sagaFactory?: ISagaFactory,
                sagaDescriptor?: string,
                queueName?: string,
                startsWith?: string[],
                handles?: string[]
        }) {
                assertDefined(options, 'options');
                assertDefined(options.eventStore, 'options.eventStore');
                assertDefined(options.commandBus, 'options.commandBus');

                this.#eventStore = options.eventStore;
                this.#commandBus = options.commandBus;
                this.#queueName = options.queueName;
                this.#executionLock = options.executionLocker ?? new Lock();
                this.#logger = options.logger && 'child' in options.logger ?
                        options.logger.child({ service: new.target.name }) :
                        options.logger;

                if (options.sagaType) {
                        const SagaType = options.sagaType;

                        this.#sagaFactory = params => new SagaType(params);
                        this.#startsWith = SagaType.startsWith;
                        this.#handles = SagaType.handles;
                        this.#sagaDescriptor = SagaType.sagaDescriptor ?? SagaType.name;
                }
                else if (options.sagaFactory) {
                        assertOptionalArray(options.handles, 'options.handles');
                        assertString(options.sagaDescriptor, 'options.sagaDescriptor');

                        this.#sagaFactory = options.sagaFactory;
                        this.#startsWith = options.startsWith;
                        this.#handles = options.handles;
                        this.#sagaDescriptor = options.sagaDescriptor;
                }
                else {
                        throw new Error('Either sagaType or sagaFactory is required');
                }
        }

        /** Overrides observer subscribe method */
        subscribe(eventStore: IObservable) {
                subscribe(eventStore, this, {
                        // both lists are optional; spreading `undefined` into an array literal would throw
                        messageTypes: [...(this.#startsWith ?? []), ...(this.#handles ?? [])],
                        masterHandler: this.handle,
                        queueName: this.#queueName
                });
        }

        /**
         * Handle saga event: resolve the saga the event belongs to, let the saga process the event
         * and send every produced command to the command bus.
         *
         * An event is treated as a starter event when its type is listed in `startsWith`;
         * when `startsWith` is not defined, an event without a saga origin for this descriptor is a starter.
         *
         * @throws {Error} if a starter event already carries a saga origin for this descriptor
         * @throws {Error} if a non-starter event carries no saga origin for this descriptor
         */
        async handle(event: IEvent): Promise<void> {
                assertDefined(event, 'event');
                assertDefined(event.type, 'event.type');
                assertString(event.id, 'event.id');

                const sagaOriginFromEvent = event.sagaOrigins?.[this.#sagaDescriptor];
                const isStarterEvent = this.#startsWith?.includes(event.type) ?? !sagaOriginFromEvent;
                if (isStarterEvent && sagaOriginFromEvent)
                        throw new Error(`Starter event "${event.type}" already contains saga origin for "${this.#sagaDescriptor}"`);

                // a starter event becomes the origin of the saga it starts
                const sagaOrigin = isStarterEvent ? event.id : sagaOriginFromEvent;
                if (!sagaOrigin)
                        throw new Error(`Event "${event.type}" does not contain saga origin for "${this.#sagaDescriptor}"`);

                const sagaId = makeSagaId(this.#sagaDescriptor, sagaOrigin);
                const sagaPromise = this.#sagasCache.assert(sagaId, () => (isStarterEvent ?
                        this.#createSaga(sagaId) :
                        this.#restoreSaga(sagaId, event)
                ));

                // From here on the cache entry must always be released, including when saga
                // creation/restoration rejects or the lock cannot be acquired.
                // NOTE(review): assumes MapAssertable.release() undoes one assert() - confirm in utils
                try {
                        const saga = await sagaPromise;

                        // multiple events to a same saga ID will execute sequentially on a same saga instance
                        const lease = await this.#executionLock.acquire(sagaId);

                        try {
                                const commands = await saga.handle(event);
                                this.#logger?.debug(`"${Event.describe(event)}" processed, ${commands.map(c => c.type).join(',') || 'no commands'} produced`);

                                for (const command of commands) {
                                        // attach event context to produced command
                                        if (command.context === undefined && event.context !== undefined)
                                                command.context = event.context;

                                        // propagate saga origins so follow-up events can be routed back to this saga
                                        if (command.sagaOrigins === undefined) {
                                                command.sagaOrigins = {
                                                        ...event.sagaOrigins,
                                                        [this.#sagaDescriptor]: sagaOrigin
                                                };
                                        }

                                        await this.#commandBus.sendRaw(command);
                                }
                        }
                        finally {
                                lease.release();
                        }
                }
                finally {
                        this.#sagasCache.release(sagaId);
                }
        }

        /** Start new saga */
        async #createSaga(id: Identifier): Promise<ISaga> {
                return this.#sagaFactory.call(null, { id });
        }

        /**
         * Restore saga from event store:
         * create a fresh instance and apply the saga events stored before the given event
         */
        async #restoreSaga(id: Identifier, event: IEvent): Promise<ISaga> {
                const saga = this.#sagaFactory.call(null, { id });

                const eventsIterable = this.#eventStore.getSagaEvents(id, { beforeEvent: event });
                let eventsCount = 0;
                for await (const oldEvent of eventsIterable) {
                        // mutate() may be sync or async; only await when it actually returned a promise
                        const r = saga.mutate(oldEvent);
                        if (r instanceof Promise)
                                await r;

                        eventsCount += 1;
                }

                this.#logger?.info(`Saga state restored from ${eventsCount} event(s)`);

                return saga;
        }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc