• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 21966362039

12 Feb 2026 10:15PM UTC coverage: 85.328% (-9.1%) from 94.396%
21966362039

Pull #28

github

web-flow
Merge 9adf24177 into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

671 of 1008 branches covered (66.57%)

927 of 1051 new or added lines in 67 files covered. (88.2%)

49 existing lines in 13 files now uncovered.

1262 of 1479 relevant lines covered (85.33%)

33.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.67
/src/AggregateCommandHandler.ts
1
import { getClassName, Lock, MapAssertable } from './utils/index.ts';
28✔
2
import {
28✔
3
        type AggregateEventsQueryParams,
4
        type IAggregate,
5
        type IAggregateConstructor,
6
        type IAggregateFactory,
7
        type ICommand,
8
        type ICommandHandler,
9
        type IContainer,
10
        type Identifier,
11
        type IEventSet,
12
        type IEventStore,
13
        type ILocker,
14
        type ILogger,
15
        type IObservable,
16
        isIObservable
17
} from './interfaces/index.ts';
18

19
/**
20
 * Aggregate command handler.
21
 *
22
 * Subscribes to event store and awaits aggregate commands.
23
 * Upon command receiving creates an instance of aggregate,
24
 * restores its state, passes command and commits emitted events to event store.
25
 */
26
export class AggregateCommandHandler<TAggregate extends IAggregate> implements ICommandHandler {
28✔
27

28
        readonly #eventStore: IEventStore;
29
        readonly #logger?: ILogger;
30
        readonly #aggregateFactory: IAggregateFactory<TAggregate, any>;
31
        readonly #handles: Readonly<string[]>;
32
        readonly #restoresFrom?: Readonly<string[]>;
33

34
        /** Aggregate instances cache for concurrent command handling */
35
        #aggregatesCache: MapAssertable<Identifier, Promise<TAggregate>> = new MapAssertable();
28✔
36

37
        /** Lock for sequential aggregate command execution */
38
        #executionLock: ILocker;
39

40
        constructor({
41
                eventStore,
42
                aggregateType,
43
                aggregateFactory,
44
                handles,
45
                executionLocker = new Lock(),
14✔
46
                restoresFrom,
47
                logger
48
        }: Pick<IContainer, 'eventStore' | 'executionLocker' | 'logger'> & {
49
                aggregateType?: IAggregateConstructor<TAggregate, any>,
50
                aggregateFactory?: IAggregateFactory<TAggregate, any>,
51
                handles?: Readonly<string[]>,
52
                restoresFrom?: Readonly<string[]>
53
        }) {
54
                if (!eventStore)
28!
UNCOV
55
                        throw new TypeError('eventStore argument required');
×
56

57
                this.#eventStore = eventStore;
28✔
58
                this.#executionLock = executionLocker;
28✔
59
                this.#logger = logger && 'child' in logger ?
28!
60
                        logger.child({ service: getClassName(this) }) :
61
                        logger;
62

63
                if (aggregateType) {
28✔
64
                        const AggregateType = aggregateType;
18✔
65
                        this.#aggregateFactory = params => new AggregateType(params);
18✔
66
                        this.#handles = AggregateType.handles;
18✔
67
                        this.#restoresFrom = AggregateType.restoresFrom;
18✔
68
                }
69
                else if (aggregateFactory) {
10!
70
                        if (!Array.isArray(handles) || !handles.length)
10!
UNCOV
71
                                throw new TypeError('handles argument must be an non-empty Array');
×
72

73
                        this.#aggregateFactory = aggregateFactory;
10✔
74
                        this.#handles = handles;
10✔
75
                        this.#restoresFrom = restoresFrom;
10✔
76
                }
77
                else {
78
                        throw new TypeError('either aggregateType or aggregateFactory is required');
×
79
                }
80
        }
81

82
        /** Subscribe to all command types handled by aggregateType */
83
        subscribe(commandBus: IObservable) {
84
                if (!commandBus)
8!
NEW
85
                        throw new TypeError('commandBus argument required');
×
86
                if (!isIObservable(commandBus))
8!
NEW
87
                        throw new TypeError('commandBus argument must implement IObservable interface');
×
88

89
                for (const commandType of this.#handles)
8✔
90
                        commandBus.on(commandType, (cmd: ICommand) => this.execute(cmd));
10✔
91
        }
92

93
        /** Restore aggregate from event store events */
94
        async #restoreAggregate(id: Identifier): Promise<TAggregate> {
95
                if (!id)
18!
UNCOV
96
                        throw new TypeError('id argument required');
×
97

98
                const aggregate = this.#aggregateFactory({ id });
18✔
99

100
                const queryOptions = this.#restoresFrom?.length ?
18✔
101
                        { eventTypes: this.#restoresFrom, tail: 'last' } satisfies AggregateEventsQueryParams :
102
                        undefined;
103

104
                const eventsIterable = this.#eventStore.getAggregateEvents(id, queryOptions);
18✔
105

106
                let eventCount = 0;
18✔
107
                for await (const event of eventsIterable) {
18✔
108
                        aggregate.mutate(event);
14✔
109
                        eventCount += 1;
14✔
110
                }
111

112
                this.#logger?.info(`${aggregate} state restored from ${eventCount} event(s)`);
18✔
113

114
                return aggregate;
18✔
115
        }
116

117
        /** Create new aggregate with new Id generated by event store */
118
        async #createAggregate(): Promise<TAggregate> {
119
                const id = await this.#eventStore.getNewId();
16✔
120
                const aggregate = this.#aggregateFactory({ id });
16✔
121
                this.#logger?.info(`${aggregate} created`);
16✔
122

123
                return aggregate;
16✔
124
        }
125

126
        async #getAggregateInstance(aggregateId?: Identifier) {
127
                if (!aggregateId)
38✔
128
                        return this.#createAggregate();
16✔
129
                else
130
                        return this.#aggregatesCache.assert(aggregateId, () => this.#restoreAggregate(aggregateId));
22✔
131
        }
132

133
        /** Pass a command to corresponding aggregate */
134
        async execute(cmd: ICommand): Promise<IEventSet> {
135
                if (!cmd)
38!
NEW
136
                        throw new TypeError('cmd argument required');
×
137
                if (!cmd.type)
38!
NEW
138
                        throw new TypeError('cmd.type argument required');
×
139

140
                // create new or get cached aggregate instance promise
141
                // multiple concurrent calls to #getAggregateInstance will return the same promise
142
                const aggregate = await this.#getAggregateInstance(cmd.aggregateId);
38✔
143

144
                // multiple concurrent commands to a same aggregateId will execute sequentially
145
                const lease = cmd.aggregateId ?
38✔
146
                        await this.#executionLock.acquire(String(cmd.aggregateId)) :
147
                        undefined;
148

149
                try {
38✔
150
                        // pass command to aggregate instance
151
                        const events = await aggregate.handle(cmd);
38✔
152

153
                        this.#logger?.info(`${aggregate} "${cmd.type}" command processed, ${events.length} event(s) produced`);
38✔
154

155
                        if (events.length)
38✔
156
                                await this.#eventStore.dispatch(events);
34✔
157

158
                        return events;
38✔
159
                }
160
                finally {
161
                        lease?.release();
38✔
162
                        if (cmd.aggregateId)
38✔
163
                                this.#aggregatesCache.release(cmd.aggregateId);
22✔
164
                }
165
        }
166
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc