• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 10223191268

02 Aug 2024 11:15PM UTC coverage: 96.856%. First build
10223191268

Pull #21

github

snatalenko
Add script badges to readme
Pull Request #21: Migrate to TypeScript

570 of 859 branches covered (66.36%)

2282 of 2360 new or added lines in 28 files covered. (96.69%)

2403 of 2481 relevant lines covered (96.86%)

21.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.67
/src/AggregateCommandHandler.ts
1
import {
4✔
2
        IAggregate,
4✔
3
        IAggregateConstructor,
4✔
4
        IAggregateFactory,
4✔
5
        ICommand,
4✔
6
        ICommandBus,
4✔
7
        ICommandHandler,
4✔
8
        Identifier,
4✔
9
        IEventSet,
4✔
10
        IEventStore,
4✔
11
        IExtendableLogger,
4✔
12
        ILogger
4✔
13
} from "./interfaces";
4✔
14

4✔
15
import {
4✔
16
        getClassName,
4✔
17
        getHandledMessageTypes,
4✔
18
        subscribe
4✔
19
} from './utils';
4✔
20

4✔
21
/**
4✔
22
 * Aggregate command handler.
4✔
23
 *
4✔
24
 * Subscribes to event store and awaits aggregate commands.
4✔
25
 * Upon command receiving creates an instance of aggregate,
4✔
26
 * restores its state, passes command and commits emitted events to event store.
4✔
27
 */
4✔
28
export class AggregateCommandHandler implements ICommandHandler {
20✔
29

20✔
30
        #eventStore: IEventStore;
20✔
31
        #logger?: ILogger;
20✔
32

20✔
33
        #aggregateFactory: IAggregateFactory<any>;
20✔
34
        #handles: string[];
20✔
35

20✔
36
        constructor({
20✔
37
                eventStore,
20✔
38
                aggregateType,
20✔
39
                aggregateFactory,
20✔
40
                handles,
20✔
41
                logger
20✔
42
        }: {
20✔
43
                eventStore: IEventStore,
20✔
44
                aggregateType?: IAggregateConstructor<any>,
20✔
45
                aggregateFactory?: IAggregateFactory<any>,
20✔
46
                handles?: string[],
20✔
47
                logger?: ILogger | IExtendableLogger
20✔
48
        }) {
20✔
49
                if (!eventStore)
20✔
50
                        throw new TypeError('eventStore argument required');
20!
51

20✔
52
                this.#eventStore = eventStore;
20✔
53
                this.#logger = logger && 'child' in logger ?
20!
54
                        logger.child({ service: getClassName(this) }) :
20✔
55
                        logger;
20✔
56

20✔
57
                if (aggregateType) {
20✔
58
                        const AggregateType = aggregateType;
12✔
59
                        this.#aggregateFactory = params => new AggregateType(params);
12✔
60
                        this.#handles = getHandledMessageTypes(AggregateType);
12✔
61
                }
12✔
62
                else if (aggregateFactory) {
8✔
63
                        if (!Array.isArray(handles) || !handles.length)
8✔
64
                                throw new TypeError('handles argument must be an non-empty Array');
8!
65

8✔
66
                        this.#aggregateFactory = aggregateFactory;
8✔
67
                        this.#handles = handles;
8✔
68
                }
8✔
NEW
69
                else {
×
NEW
70
                        throw new TypeError('either aggregateType or aggregateFactory is required');
×
NEW
71
                }
×
72
        }
20✔
73

20✔
74
        /** Subscribe to all command types handled by aggregateType */
20✔
75
        subscribe(commandBus: ICommandBus) {
20✔
76
                subscribe(commandBus, this, {
6✔
77
                        messageTypes: this.#handles,
6✔
78
                        masterHandler: (c: ICommand) => this.execute(c)
6✔
79
                });
6✔
80
        }
6✔
81

20✔
82
        /** Restore aggregate from event store events */
20✔
83
        async #restoreAggregate(id: Identifier): Promise<IAggregate> {
20✔
84
                if (!id)
6✔
85
                        throw new TypeError('id argument required');
6!
86

6✔
87
                const events = await this.#eventStore.getAggregateEvents(id);
6✔
88
                const aggregate = this.#aggregateFactory({ id, events });
6✔
89

6✔
90
                this.#logger?.info(`${aggregate} state restored from ${events.length} event(s)`);
6!
91

6✔
92
                return aggregate;
6✔
93
        }
6✔
94

20✔
95
        /** Create new aggregate with new Id generated by event store */
20✔
96
        async #createAggregate(): Promise<IAggregate> {
20✔
97
                const id = await this.#eventStore.getNewId();
14✔
98
                const aggregate = this.#aggregateFactory({ id });
14✔
99
                this.#logger?.info(`${aggregate} created`);
14!
100

14✔
101
                return aggregate;
14✔
102
        }
14✔
103

20✔
104
        /** Pass a command to corresponding aggregate */
20✔
105
        async execute(cmd: ICommand): Promise<IEventSet> {
20✔
106
                if (!cmd) throw new TypeError('cmd argument required');
20!
107
                if (!cmd.type) throw new TypeError('cmd.type argument required');
20!
108

20✔
109
                const aggregate = cmd.aggregateId ?
20✔
110
                        await this.#restoreAggregate(cmd.aggregateId) :
20✔
111
                        await this.#createAggregate();
20✔
112

14✔
113
                await aggregate.handle(cmd);
14✔
114

20✔
115
                let events = aggregate.changes;
20✔
116
                this.#logger?.info(`${aggregate} "${cmd.type}" command processed, ${events.length} event(s) produced`);
20!
117
                if (!events.length)
20✔
118
                        return events;
20✔
119

16✔
120
                if (aggregate.shouldTakeSnapshot && this.#eventStore.snapshotsSupported) {
20✔
121
                        aggregate.takeSnapshot();
2✔
122
                        events = aggregate.changes;
2✔
123
                }
2✔
124

16✔
125
                await this.#eventStore.commit(events);
16✔
126

16✔
127
                return events;
16✔
128
        }
16✔
129
}
20✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc