• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 14431053423

13 Apr 2025 03:59PM UTC coverage: 83.069% (+0.7%) from 82.347%
14431053423

push

github

snatalenko
1.0.0-rc.8

490 of 782 branches covered (62.66%)

996 of 1199 relevant lines covered (83.07%)

21.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.36
/src/AggregateCommandHandler.ts
1
import {
2
        IAggregate,
3
        IAggregateConstructor,
4
        IAggregateFactory,
5
        ICommand,
6
        ICommandBus,
7
        ICommandHandler,
8
        IContainer,
9
        Identifier,
10
        IEventSet,
11
        IEventStore,
12
        ILogger
13
} from './interfaces';
14

15
import {
22✔
16
        iteratorToArray,
17
        getClassName,
18
        subscribe
19
} from './utils';
20

21
/**
 * Aggregate command handler.
 *
 * Can be subscribed to a command bus for the command types its aggregate handles.
 * For every command it obtains an aggregate instance (restored from the events
 * the event store returns for `cmd.aggregateId`, or created with an ID generated
 * by the event store when the command carries no `aggregateId`), passes the
 * command to the aggregate and dispatches the emitted events to the event store.
 */
export class AggregateCommandHandler implements ICommandHandler {

        readonly #eventStore: IEventStore;
        readonly #logger?: ILogger;

        /** Produces an aggregate instance from `{ id }` or `{ id, events }` */
        readonly #aggregateFactory: IAggregateFactory<any>;

        /** Command types passed as `messageTypes` when subscribing to a command bus */
        readonly #handles: string[];

        /**
         * @param options.eventStore - Store used to generate IDs, read aggregate events and dispatch new ones.
         * @param options.aggregateType - Aggregate class; instantiated with `new` for every command,
         *   its static `handles` supplies the command types. Takes precedence over `aggregateFactory`.
         * @param options.aggregateFactory - Alternative to `aggregateType`; requires `handles`.
         * @param options.handles - Command types, used only together with `aggregateFactory`.
         * @param options.logger - Optional logger; when it exposes `child`, a child logger is used instead.
         * @throws {TypeError} When `eventStore` is missing, when neither `aggregateType` nor
         *   `aggregateFactory` is provided, or when `aggregateFactory` comes without a non-empty `handles` array.
         */
        constructor({
                eventStore,
                aggregateType,
                aggregateFactory,
                handles,
                logger
        }: Pick<IContainer, 'eventStore' | 'logger'> & {
                aggregateType?: IAggregateConstructor<any>,
                aggregateFactory?: IAggregateFactory<any>,
                handles?: string[]
        }) {
                if (!eventStore)
                        throw new TypeError('eventStore argument required');

                this.#eventStore = eventStore;

                // Loggers that can spawn children get one labelled with this class name
                this.#logger = logger && 'child' in logger ?
                        logger.child({ service: getClassName(this) }) :
                        logger;

                if (aggregateType) {
                        const AggregateType = aggregateType;
                        this.#aggregateFactory = params => new AggregateType(params);
                        // The `handles` argument is intentionally ignored here: the class declares its own
                        this.#handles = AggregateType.handles;
                }
                else if (aggregateFactory) {
                        // A bare factory carries no metadata, so command types must be listed explicitly
                        if (!Array.isArray(handles) || !handles.length)
                                throw new TypeError('handles argument must be an non-empty Array');

                        this.#aggregateFactory = aggregateFactory;
                        this.#handles = handles;
                }
                else {
                        throw new TypeError('either aggregateType or aggregateFactory is required');
                }
        }

        /**
         * Subscribe to all command types handled by the aggregate.
         *
         * Delegates to the `subscribe` helper from `./utils`, passing the handled command
         * types as `messageTypes` and `execute` as the single `masterHandler`.
         */
        subscribe(commandBus: ICommandBus) {
                subscribe(commandBus, this, {
                        messageTypes: this.#handles,
                        masterHandler: (c: ICommand) => this.execute(c)
                });
        }

        /**
         * Restore aggregate from event store events.
         *
         * @throws {TypeError} When `id` is falsy.
         */
        async #restoreAggregate(id: Identifier): Promise<IAggregate> {
                if (!id)
                        throw new TypeError('id argument required');

                // All events are collected up front and handed to the factory in one call
                const eventsIterable = this.#eventStore.getAggregateEvents(id);
                const events = await iteratorToArray(eventsIterable);

                const aggregate = this.#aggregateFactory({ id, events });

                this.#logger?.info(`${aggregate} state restored from ${events.length} event(s)`);

                return aggregate;
        }

        /** Create new aggregate with new Id generated by event store */
        async #createAggregate(): Promise<IAggregate> {
                const id = await this.#eventStore.getNewId();
                const aggregate = this.#aggregateFactory({ id });
                this.#logger?.info(`${aggregate} created`);

                return aggregate;
        }

        /**
         * Pass a command to corresponding aggregate.
         *
         * @param cmd - Command to execute. A truthy `aggregateId` selects an existing aggregate,
         *   otherwise a new aggregate is created.
         * @returns The events read from `aggregate.changes`; when the list is empty
         *   nothing is dispatched to the event store.
         * @throws {TypeError} When `cmd` or `cmd.type` is missing (surfaced as a promise rejection).
         */
        async execute(cmd: ICommand): Promise<IEventSet> {
                if (!cmd)
                        throw new TypeError('cmd argument required');
                if (!cmd.type)
                        throw new TypeError('cmd.type argument required');

                const aggregate = cmd.aggregateId ?
                        await this.#restoreAggregate(cmd.aggregateId) :
                        await this.#createAggregate();

                await aggregate.handle(cmd);

                let events = aggregate.changes;
                this.#logger?.info(`${aggregate} "${cmd.type}" command processed, ${events.length} event(s) produced`);
                if (!events.length)
                        return events;

                if (aggregate.shouldTakeSnapshot) {
                        aggregate.takeSnapshot();
                        // Read `changes` again after the snapshot is taken
                        // NOTE(review): presumably the snapshot is appended to `changes` - confirm in the aggregate
                        events = aggregate.changes;
                }

                await this.#eventStore.dispatch(events);

                return events;
        }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc