• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 14431053423

13 Apr 2025 03:59PM UTC coverage: 83.069% (+0.7%) from 82.347%
14431053423

push

github

snatalenko
1.0.0-rc.8

490 of 782 branches covered (62.66%)

996 of 1199 relevant lines covered (83.07%)

21.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.65
/src/SagaEventHandler.ts
1
import * as Event from './Event';
22✔
2
import {
3
        ICommandBus,
4
        IContainer,
5
        IEvent,
6
        IEventReceptor,
7
        IEventStore,
8
        ILogger,
9
        IObservable,
10
        ISaga,
11
        ISagaConstructor,
12
        ISagaFactory
13
} from './interfaces';
14

15
import {
22✔
16
        subscribe,
17
        getClassName,
18
        iteratorToArray
19
} from './utils';
20

21
/**
22
 * Listens to Saga events,
23
 * creates new saga or restores it from event store,
24
 * applies new events
25
 * and passes command(s) to command bus
26
 */
27
export class SagaEventHandler implements IEventReceptor {
22✔
28

29
        #eventStore: IEventStore;
30
        #commandBus: ICommandBus;
31
        #queueName?: string;
32
        #logger?: ILogger;
33
        #sagaFactory: (params: any) => ISaga;
34
        #startsWith: string[];
35
        #handles: string[];
36

37
        constructor(options: Pick<IContainer, 'eventStore' | 'commandBus' | 'logger'> & {
38
                sagaType?: ISagaConstructor,
39
                sagaFactory?: ISagaFactory,
40
                queueName?: string,
41
                startsWith?: string[],
42
                handles?: string[]
43
        }) {
44
                if (!options)
8!
45
                        throw new TypeError('options argument required');
×
46
                if (!options.eventStore)
8!
47
                        throw new TypeError('options.eventStore argument required');
×
48
                if (!options.commandBus)
8!
49
                        throw new TypeError('options.commandBus argument required');
×
50

51
                this.#eventStore = options.eventStore;
8✔
52
                this.#commandBus = options.commandBus;
8✔
53
                this.#queueName = options.queueName;
8✔
54
                this.#logger = options.logger && 'child' in options.logger ?
8!
55
                        options.logger.child({ service: getClassName(this) }) :
56
                        options.logger;
57

58
                if (options.sagaType) {
8✔
59
                        const SagaType = options.sagaType as ISagaConstructor;
6✔
60

61
                        this.#sagaFactory = params => new SagaType(params);
6✔
62
                        this.#startsWith = SagaType.startsWith;
6✔
63
                        this.#handles = SagaType.handles;
6✔
64
                }
65
                else if (options.sagaFactory) {
2!
66
                        if (!Array.isArray(options.startsWith))
2!
67
                                throw new TypeError('options.startsWith argument must be an Array');
×
68
                        if (!Array.isArray(options.handles))
2!
69
                                throw new TypeError('options.handles argument must be an Array');
×
70

71
                        this.#sagaFactory = options.sagaFactory;
2✔
72
                        this.#startsWith = options.startsWith;
2✔
73
                        this.#handles = options.handles;
2✔
74
                }
75
                else {
76
                        throw new Error('Either sagaType or sagaFactory is required');
×
77
                }
78

79
                this.#eventStore.registerSagaStarters(options.startsWith);
8✔
80
        }
81

82
        /** Overrides observer subscribe method */
83
        subscribe(eventStore: IObservable) {
84
                subscribe(eventStore, this, {
2✔
85
                        messageTypes: [...this.#startsWith, ...this.#handles],
86
                        masterHandler: e => this.handle(e),
2✔
87
                        queueName: this.#queueName
88
                });
89
        }
90

91
        /** Handle saga event */
92
        async handle(event: IEvent): Promise<void> {
93
                if (!event)
6!
94
                        throw new TypeError('event argument required');
×
95
                if (!event.type)
6!
96
                        throw new TypeError('event.type argument required');
×
97

98
                const isSagaStarterEvent = this.#startsWith.includes(event.type);
6✔
99
                const saga = isSagaStarterEvent ?
6✔
100
                        await this.#createSaga() :
101
                        await this.#restoreSaga(event);
102

103
                const r = saga.apply(event);
6✔
104
                if (r instanceof Promise)
6!
105
                        await r;
×
106

107
                await this.#sendCommands(saga, event);
6✔
108

109
                // additional commands can be added by the saga.onError handler
110
                if (saga.uncommittedMessages.length)
6✔
111
                        await this.#sendCommands(saga, event);
2✔
112
        }
113

114
        async #sendCommands(saga: ISaga, event: IEvent<any>) {
115
                const commands = saga.uncommittedMessages;
8✔
116
                saga.resetUncommittedMessages();
8✔
117

118
                this.#logger?.debug(`"${Event.describe(event)}" processed, ${commands.map(c => c.type).join(',') || 'no commands'} produced`);
8!
119

120
                for (const command of commands) {
8✔
121

122
                        // attach event context to produced command
123
                        if (command.context === undefined && event.context !== undefined)
8✔
124
                                command.context = event.context;
×
125

126
                        try {
8✔
127
                                await this.#commandBus.sendRaw(command);
8✔
128
                        }
129
                        catch (err: any) {
130
                                if (typeof saga.onError === 'function') {
2!
131
                                        // let saga to handle the error
132
                                        saga.onError(err, { event, command });
2✔
133
                                }
134
                                else {
135
                                        throw err;
×
136
                                }
137
                        }
138
                }
139
        }
140

141
        /** Start new saga */
142
        async #createSaga(): Promise<ISaga> {
143
                const id = await this.#eventStore.getNewId();
4✔
144
                return this.#sagaFactory.call(null, { id });
4✔
145
        }
146

147
        /** Restore saga from event store */
148
        async #restoreSaga(event: IEvent): Promise<ISaga> {
149
                if (!event.sagaId)
2!
150
                        throw new TypeError(`${Event.describe(event)} does not contain sagaId`);
×
151

152
                const eventsIterable = this.#eventStore.getSagaEvents(event.sagaId, { beforeEvent: event });
2✔
153
                const events = await iteratorToArray(eventsIterable);
2✔
154

155
                const saga = this.#sagaFactory.call(null, { id: event.sagaId, events });
2✔
156
                this.#logger?.info(`Saga state restored from ${events.length} event(s)`);
2✔
157

158
                return saga;
2✔
159
        }
160
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc