• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 10223224402

02 Aug 2024 11:19PM UTC coverage: 96.856%. First build
10223224402

Pull #21

github

snatalenko
Delete travis config
Pull Request #21: Migrate to TypeScript

570 of 859 branches covered (66.36%)

2282 of 2360 new or added lines in 28 files covered. (96.69%)

2403 of 2481 relevant lines covered (96.86%)

21.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.27
/src/SagaEventHandler.ts
1
import * as Event from './Event';
4✔
2
import {
4✔
3
        ICommandBus,
4✔
4
        IEvent,
4✔
5
        IEventReceptor,
4✔
6
        IEventStore,
4✔
7
        IExtendableLogger,
4✔
8
        ILogger,
4✔
9
        IObservable,
4✔
10
        ISaga,
4✔
11
        ISagaConstructor,
4✔
12
        ISagaFactory
4✔
13
} from './interfaces';
4✔
14

4✔
15
import {
4✔
16
        subscribe,
4✔
17
        getClassName
4✔
18
} from './utils';
4✔
19

4✔
20
/**
4✔
21
 * Listens to Saga events,
4✔
22
 * creates new saga or restores it from event store,
4✔
23
 * applies new events
4✔
24
 * and passes command(s) to command bus
4✔
25
 */
4✔
26
export class SagaEventHandler implements IEventReceptor {
8✔
27

8✔
28
        #eventStore: IEventStore;
8✔
29
        #commandBus: ICommandBus;
8✔
30
        #queueName?: string;
8✔
31
        #logger?: ILogger;
8✔
32
        #sagaFactory: (params: any) => ISaga;
8✔
33
        #startsWith: string[];
8✔
34
        #handles: string[];
8✔
35

8✔
36
        constructor(options: {
8✔
37
                sagaType?: ISagaConstructor,
8✔
38
                sagaFactory?: ISagaFactory,
8✔
39
                eventStore: IEventStore,
8✔
40
                commandBus: ICommandBus,
8✔
41
                logger?: ILogger | IExtendableLogger,
8✔
42
                queueName?: string,
8✔
43
                startsWith?: string[],
8✔
44
                handles?: string[]
8✔
45
        }) {
8✔
46
                if (!options)
8✔
47
                        throw new TypeError('options argument required');
8!
48
                if (!options.eventStore)
8✔
49
                        throw new TypeError('options.eventStore argument required');
8!
50
                if (!options.commandBus)
8✔
51
                        throw new TypeError('options.commandBus argument required');
8!
52

8✔
53
                this.#eventStore = options.eventStore;
8✔
54
                this.#commandBus = options.commandBus;
8✔
55
                this.#queueName = options.queueName;
8✔
56
                this.#logger = options.logger && 'child' in options.logger ?
8!
57
                        options.logger.child({ service: getClassName(this) }) :
8✔
58
                        options.logger;
8✔
59

8✔
60
                if (options.sagaType) {
8✔
61
                        const SagaType = options.sagaType as ISagaConstructor;
6✔
62

6✔
63
                        this.#sagaFactory = params => new SagaType(params);
6✔
64
                        this.#startsWith = SagaType.startsWith;
6✔
65
                        this.#handles = SagaType.handles;
6✔
66
                }
6✔
67
                else if (options.sagaFactory) {
2✔
68
                        if (!Array.isArray(options.startsWith))
2✔
69
                                throw new TypeError('options.startsWith argument must be an Array');
2!
70
                        if (!Array.isArray(options.handles))
2✔
71
                                throw new TypeError('options.handles argument must be an Array');
2!
72

2✔
73
                        this.#sagaFactory = options.sagaFactory;
2✔
74
                        this.#startsWith = options.startsWith;
2✔
75
                        this.#handles = options.handles;
2✔
76
                }
2✔
NEW
77
                else {
×
NEW
78
                        throw new Error('Either sagaType or sagaFactory is required');
×
NEW
79
                }
×
80

8✔
81
                this.#eventStore.registerSagaStarters(options.startsWith);
8✔
82
        }
8✔
83

8✔
84
        /** Overrides observer subscribe method */
8✔
85
        subscribe(eventStore: IObservable) {
8✔
86
                subscribe(eventStore, this, {
2✔
87
                        messageTypes: [...this.#startsWith, ...this.#handles],
2✔
88
                        masterHandler: e => this.handle(e),
2✔
89
                        queueName: this.#queueName
2✔
90
                });
2✔
91
        }
2✔
92

8✔
93
        /** Handle saga event */
8✔
94
        async handle(event: IEvent): Promise<void> {
8✔
95
                if (!event)
6✔
96
                        throw new TypeError('event argument required');
6!
97
                if (!event.type)
6✔
98
                        throw new TypeError('event.type argument required');
6!
99

6✔
100
                const isSagaStarterEvent = this.#startsWith.includes(event.type);
6✔
101
                const saga = isSagaStarterEvent ?
6✔
102
                        await this.#createSaga() :
4✔
103
                        await this.#restoreSaga(event);
6✔
104

2✔
105
                const r = saga.apply(event);
2✔
106
                if (r instanceof Promise)
2✔
107
                        await r;
4!
108

6✔
109
                await this.#sendCommands(saga, event);
6✔
110

6✔
111
                // additional commands can be added by the saga.onError handler
6✔
112
                if (saga.uncommittedMessages.length)
6✔
113
                        await this.#sendCommands(saga, event);
6✔
114
        }
6✔
115

8✔
116
        async #sendCommands(saga: ISaga, event: IEvent<any>) {
8✔
117
                const commands = saga.uncommittedMessages;
8✔
118
                saga.resetUncommittedMessages();
8✔
119

8✔
120
                this.#logger?.debug(`"${Event.describe(event)}" processed, ${commands.map(c => c.type).join(',') || 'no commands'} produced`);
8!
121

8✔
122
                for (const command of commands) {
8✔
123

8✔
124
                        // attach event context to produced command
8✔
125
                        if (command.context === undefined && event.context !== undefined)
8✔
126
                                command.context = event.context;
8!
127

8✔
128
                        try {
8✔
129
                                await this.#commandBus.sendRaw(command);
8✔
130
                        }
6✔
131
                        catch (err: any) {
8✔
132
                                if (typeof saga.onError === 'function') {
2✔
133
                                        // let saga to handle the error
2✔
134
                                        saga.onError(err, { event, command });
2✔
135
                                }
2✔
NEW
136
                                else {
×
NEW
137
                                        throw err;
×
NEW
138
                                }
×
139
                        }
2✔
140
                }
8✔
141
        }
8✔
142

8✔
143
        /** Start new saga */
8✔
144
        async #createSaga(): Promise<ISaga> {
8✔
145
                const id = await this.#eventStore.getNewId();
4✔
146
                return this.#sagaFactory.call(null, { id });
4✔
147
        }
4✔
148

8✔
149
        /** Restore saga from event store */
8✔
150
        async #restoreSaga(event: IEvent): Promise<ISaga> {
8✔
151
                if (!event.sagaId)
2✔
152
                        throw new TypeError(`${Event.describe(event)} does not contain sagaId`);
2!
153

2✔
154
                const events = await this.#eventStore.getSagaEvents(event.sagaId, { beforeEvent: event });
2✔
155

2✔
156
                const saga = this.#sagaFactory.call(null, { id: event.sagaId, events });
2✔
157
                this.#logger?.info(`Saga state restored from ${events.length} event(s)`);
2!
158

2✔
159
                return saga;
2✔
160
        }
2✔
161
}
8✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc