• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 11544751153

27 Oct 2024 11:37PM UTC coverage: 95.151% (+1.4%) from 93.73%
11544751153

push

github

snatalenko
1.0.0-rc.5

563 of 866 branches covered (65.01%)

2394 of 2516 relevant lines covered (95.15%)

21.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.96
/src/SagaEventHandler.ts
1
import * as Event from './Event';
4✔
2
import {
4✔
3
        ICommandBus,
4✔
4
        IEvent,
4✔
5
        IEventReceptor,
4✔
6
        IEventStore,
4✔
7
        IExtendableLogger,
4✔
8
        ILogger,
4✔
9
        IObservable,
4✔
10
        ISaga,
4✔
11
        ISagaConstructor,
4✔
12
        ISagaFactory
4✔
13
} from './interfaces';
4✔
14

4✔
15
import {
4✔
16
        subscribe,
4✔
17
        getClassName
4✔
18
} from './utils';
4✔
19

4✔
20
/**
 * Listens to Saga events,
 * creates new saga or restores it from event store,
 * applies new events
 * and passes command(s) to command bus
 */
export class SagaEventHandler implements IEventReceptor {

        #eventStore: IEventStore;
        #commandBus: ICommandBus;
        #queueName?: string;
        #logger?: ILogger;
        #sagaFactory: (params: any) => ISaga;
        #startsWith: string[];
        #handles: string[];

        /**
         * @param options.sagaType Saga class; its static `startsWith` and `handles` lists are used
         * @param options.sagaFactory Alternative to `sagaType`; requires explicit `startsWith` and `handles`
         * @param options.eventStore Store used to obtain new saga ids and to load past saga events
         * @param options.commandBus Bus that receives the commands produced by the saga
         * @param options.logger Optional logger; when it exposes `child`, a child logger is created
         * @param options.queueName Optional queue name forwarded to the subscription
         * @param options.startsWith Event types that start a new saga (used with `sagaFactory`)
         * @param options.handles Event types handled by an existing saga (used with `sagaFactory`)
         * @throws {TypeError} when `options`, `eventStore` or `commandBus` is missing,
         *   or when `sagaFactory` is passed without array `startsWith` / `handles`
         * @throws {Error} when neither `sagaType` nor `sagaFactory` is provided
         */
        constructor(options: {
                sagaType?: ISagaConstructor,
                sagaFactory?: ISagaFactory,
                eventStore: IEventStore,
                commandBus: ICommandBus,
                logger?: ILogger | IExtendableLogger,
                queueName?: string,
                startsWith?: string[],
                handles?: string[]
        }) {
                if (!options)
                        throw new TypeError('options argument required');
                if (!options.eventStore)
                        throw new TypeError('options.eventStore argument required');
                if (!options.commandBus)
                        throw new TypeError('options.commandBus argument required');

                this.#eventStore = options.eventStore;
                this.#commandBus = options.commandBus;
                this.#queueName = options.queueName;
                this.#logger = options.logger && 'child' in options.logger ?
                        options.logger.child({ service: getClassName(this) }) :
                        options.logger;

                if (options.sagaType) {
                        const SagaType = options.sagaType as ISagaConstructor;

                        this.#sagaFactory = params => new SagaType(params);
                        this.#startsWith = SagaType.startsWith;
                        this.#handles = SagaType.handles;
                }
                else if (options.sagaFactory) {
                        if (!Array.isArray(options.startsWith))
                                throw new TypeError('options.startsWith argument must be an Array');
                        if (!Array.isArray(options.handles))
                                throw new TypeError('options.handles argument must be an Array');

                        this.#sagaFactory = options.sagaFactory;
                        this.#startsWith = options.startsWith;
                        this.#handles = options.handles;
                }
                else {
                        throw new Error('Either sagaType or sagaFactory is required');
                }

                // Register the resolved starter list: with `sagaType` the starters come from
                // the saga class, and `options.startsWith` is typically not provided
                this.#eventStore.registerSagaStarters(this.#startsWith);
        }

        /** Overrides observer subscribe method */
        subscribe(eventStore: IObservable) {
                subscribe(eventStore, this, {
                        messageTypes: [...this.#startsWith, ...this.#handles],
                        masterHandler: e => this.handle(e),
                        queueName: this.#queueName
                });
        }

        /**
         * Handle saga event: create the saga (for starter events) or restore it,
         * apply the event and send the commands the saga produced
         *
         * @throws {TypeError} when `event` or `event.type` is missing
         */
        async handle(event: IEvent): Promise<void> {
                if (!event)
                        throw new TypeError('event argument required');
                if (!event.type)
                        throw new TypeError('event.type argument required');

                const isSagaStarterEvent = this.#startsWith.includes(event.type);
                const saga = isSagaStarterEvent ?
                        await this.#createSaga() :
                        await this.#restoreSaga(event);

                const r = saga.apply(event);
                if (r instanceof Promise)
                        await r;

                await this.#sendCommands(saga, event);

                // additional commands can be added by the saga.onError handler
                if (saga.uncommittedMessages.length)
                        await this.#sendCommands(saga, event);
        }

        /**
         * Send all uncommitted saga messages to the command bus, one by one.
         * A failed send is passed to `saga.onError` when the saga defines it, otherwise rethrown
         */
        async #sendCommands(saga: ISaga, event: IEvent<any>) {
                // take the pending commands first, then clear them on the saga,
                // so that commands enqueued while sending end up in a separate batch
                const commands = saga.uncommittedMessages;
                saga.resetUncommittedMessages();

                this.#logger?.debug(`"${Event.describe(event)}" processed, ${commands.map(c => c.type).join(',') || 'no commands'} produced`);

                for (const command of commands) {

                        // attach event context to produced command
                        if (command.context === undefined && event.context !== undefined)
                                command.context = event.context;

                        try {
                                await this.#commandBus.sendRaw(command);
                        }
                        catch (err: any) {
                                if (typeof saga.onError === 'function') {
                                        // let saga to handle the error;
                                        // awaited so an async handler finishes (and its rejection surfaces)
                                        // before the caller checks for newly enqueued commands
                                        await saga.onError(err, { event, command });
                                }
                                else {
                                        throw err;
                                }
                        }
                }
        }

        /** Start new saga with an id obtained from the event store */
        async #createSaga(): Promise<ISaga> {
                const id = await this.#eventStore.getNewId();
                return this.#sagaFactory.call(null, { id });
        }

        /**
         * Restore saga from event store
         *
         * @throws {TypeError} when the event does not carry a `sagaId`
         */
        async #restoreSaga(event: IEvent): Promise<ISaga> {
                if (!event.sagaId)
                        throw new TypeError(`${Event.describe(event)} does not contain sagaId`);

                const events = await this.#eventStore.getSagaEvents(event.sagaId, { beforeEvent: event });

                const saga = this.#sagaFactory.call(null, { id: event.sagaId, events });
                this.#logger?.info(`Saga state restored from ${events.length} event(s)`);

                return saga;
        }
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc