• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 14938331406

09 May 2025 09:51PM UTC coverage: 83.454% (+0.3%) from 83.126%
14938331406

push

github

snatalenko
1.0.0-rc.11

509 of 809 branches covered (62.92%)

1039 of 1245 relevant lines covered (83.45%)

25.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.21
/src/AggregateCommandHandler.ts
1
import { getClassName, Lock, MapAssertable } from './utils';
22✔
2
import {
22✔
3
        IAggregate,
4
        IAggregateConstructor,
5
        IAggregateFactory,
6
        ICommand,
7
        ICommandHandler,
8
        IContainer,
9
        Identifier,
10
        IEventSet,
11
        IEventStore,
12
        ILogger,
13
        IObservable,
14
        isIObservable
15
} from './interfaces';
16

17
/**
18
 * Aggregate command handler.
19
 *
20
 * Subscribes to event store and awaits aggregate commands.
21
 * Upon command receiving creates an instance of aggregate,
22
 * restores its state, passes command and commits emitted events to event store.
23
 */
24
export class AggregateCommandHandler<TAggregate extends IAggregate> implements ICommandHandler {
22✔
25

26
        #eventStore: IEventStore;
27
        #logger?: ILogger;
28
        #aggregateFactory: IAggregateFactory<TAggregate, any>;
29
        #handles: string[];
30

31
        /** Aggregate instances cache for concurrent command handling */
32
        #aggregatesCache: MapAssertable<Identifier, Promise<TAggregate>> = new MapAssertable();
24✔
33

34
        /** Lock for sequential aggregate command execution */
35
        #executionLock = new Lock();
24✔
36

37
        constructor({
38
                eventStore,
39
                aggregateType,
40
                aggregateFactory,
41
                handles,
42
                logger
43
        }: Pick<IContainer, 'eventStore' | 'logger'> & {
44
                aggregateType?: IAggregateConstructor<TAggregate, any>,
45
                aggregateFactory?: IAggregateFactory<TAggregate, any>,
46
                handles?: string[]
47
        }) {
48
                if (!eventStore)
24!
49
                        throw new TypeError('eventStore argument required');
×
50

51
                this.#eventStore = eventStore;
24✔
52
                this.#logger = logger && 'child' in logger ?
24!
53
                        logger.child({ service: getClassName(this) }) :
54
                        logger;
55

56
                if (aggregateType) {
24✔
57
                        const AggregateType = aggregateType;
14✔
58
                        this.#aggregateFactory = params => new AggregateType(params);
14✔
59
                        this.#handles = AggregateType.handles;
14✔
60
                }
61
                else if (aggregateFactory) {
10!
62
                        if (!Array.isArray(handles) || !handles.length)
10✔
63
                                throw new TypeError('handles argument must be an non-empty Array');
×
64

65
                        this.#aggregateFactory = aggregateFactory;
10✔
66
                        this.#handles = handles;
10✔
67
                }
68
                else {
69
                        throw new TypeError('either aggregateType or aggregateFactory is required');
×
70
                }
71
        }
72

73
        /** Subscribe to all command types handled by aggregateType */
74
        subscribe(commandBus: IObservable) {
75
                if (!commandBus)
6!
76
                        throw new TypeError('commandBus argument required');
×
77
                if (!isIObservable(commandBus))
6!
78
                        throw new TypeError('commandBus argument must implement IObservable interface');
×
79

80
                for (const commandType of this.#handles)
6✔
81
                        commandBus.on(commandType, (cmd: ICommand) => this.execute(cmd));
8✔
82
        }
83

84
        /** Restore aggregate from event store events */
85
        async #restoreAggregate(id: Identifier): Promise<TAggregate> {
86
                if (!id)
16!
87
                        throw new TypeError('id argument required');
×
88

89
                const eventsIterable = this.#eventStore.getAggregateEvents(id);
16✔
90
                const aggregate = this.#aggregateFactory({ id });
16✔
91

92
                let eventCount = 0;
16✔
93
                for await (const event of eventsIterable) {
16✔
94
                        aggregate.mutate(event);
10✔
95
                        eventCount += 1;
10✔
96
                }
97

98
                this.#logger?.info(`${aggregate} state restored from ${eventCount} event(s)`);
16✔
99

100
                return aggregate;
16✔
101
        }
102

103
        /** Create new aggregate with new Id generated by event store */
104
        async #createAggregate(): Promise<TAggregate> {
105
                const id = await this.#eventStore.getNewId();
14✔
106
                const aggregate = this.#aggregateFactory({ id });
14✔
107
                this.#logger?.info(`${aggregate} created`);
14✔
108

109
                return aggregate;
14✔
110
        }
111

112
        async #getAggregateInstance(aggregateId?: Identifier) {
113
                if (!aggregateId)
34✔
114
                        return this.#createAggregate();
14✔
115
                else
116
                        return this.#aggregatesCache.assert(aggregateId, () => this.#restoreAggregate(aggregateId));
20✔
117
        }
118

119
        /** Pass a command to corresponding aggregate */
120
        async execute(cmd: ICommand): Promise<IEventSet> {
121
                if (!cmd)
34!
122
                        throw new TypeError('cmd argument required');
×
123
                if (!cmd.type)
34!
124
                        throw new TypeError('cmd.type argument required');
×
125

126
                // create new or get cached aggregate instance promise
127
                // multiple concurrent calls to #getAggregateInstance will return the same promise
128
                const aggregate = await this.#getAggregateInstance(cmd.aggregateId);
34✔
129

130
                try {
34✔
131
                        // multiple concurrent commands to a same aggregateId will execute sequentially
132
                        if (cmd.aggregateId)
34✔
133
                                this.#executionLock.acquire(String(cmd.aggregateId));
20✔
134

135
                        // pass command to aggregate instance
136
                        const events = await aggregate.handle(cmd);
34✔
137

138
                        this.#logger?.info(`${aggregate} "${cmd.type}" command processed, ${events.length} event(s) produced`);
34✔
139

140
                        if (events.length)
34✔
141
                                await this.#eventStore.dispatch(events);
30✔
142

143
                        return events;
34✔
144
                }
145
                finally {
146
                        if (cmd.aggregateId) {
34✔
147
                                this.#executionLock.release(String(cmd.aggregateId));
20✔
148
                                this.#aggregatesCache.release(cmd.aggregateId);
20✔
149
                        }
150
                }
151
        }
152
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc