• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 21875627364

10 Feb 2026 05:34PM UTC coverage: 84.658% (-9.7%) from 94.396%
21875627364

Pull #28

github

web-flow
Merge f3844c4de into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

626 of 953 branches covered (65.69%)

832 of 947 new or added lines in 65 files covered. (87.86%)

59 existing lines in 13 files now uncovered.

1225 of 1447 relevant lines covered (84.66%)

28.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.89
/src/AggregateCommandHandler.ts
1
import { getClassName, Lock, MapAssertable } from './utils/index.ts';
22✔
2
import {
22✔
3
        type AggregateEventsQueryParams,
4
        type IAggregate,
5
        type IAggregateConstructor,
6
        type IAggregateFactory,
7
        type ICommand,
8
        type ICommandHandler,
9
        type IContainer,
10
        type Identifier,
11
        type IEventSet,
12
        type IEventStore,
13
        type ILogger,
14
        type IObservable,
15
        isIObservable
16
} from './interfaces/index.ts';
17

18
/**
19
 * Aggregate command handler.
20
 *
21
 * Subscribes to event store and awaits aggregate commands.
22
 * Upon command receiving creates an instance of aggregate,
23
 * restores its state, passes command and commits emitted events to event store.
24
 */
25
export class AggregateCommandHandler<TAggregate extends IAggregate> implements ICommandHandler {
22✔
26

27
        readonly #eventStore: IEventStore;
28
        readonly #logger?: ILogger;
29
        readonly #aggregateFactory: IAggregateFactory<TAggregate, any>;
30
        readonly #handles: Readonly<string[]>;
31
        readonly #restoresFrom?: Readonly<string[]>;
32

33
        /** Aggregate instances cache for concurrent command handling */
34
        #aggregatesCache: MapAssertable<Identifier, Promise<TAggregate>> = new MapAssertable();
26✔
35

36
        /** Lock for sequential aggregate command execution */
37
        #executionLock = new Lock();
26✔
38

39
        constructor({
40
                eventStore,
41
                aggregateType,
42
                aggregateFactory,
43
                handles,
44
                restoresFrom,
45
                logger
46
        }: Pick<IContainer, 'eventStore' | 'logger'> & {
47
                aggregateType?: IAggregateConstructor<TAggregate, any>,
48
                aggregateFactory?: IAggregateFactory<TAggregate, any>,
49
                handles?: Readonly<string[]>,
50
                restoresFrom?: Readonly<string[]>
51
        }) {
52
                if (!eventStore)
26!
UNCOV
53
                        throw new TypeError('eventStore argument required');
×
54

55
                this.#eventStore = eventStore;
26✔
56
                this.#logger = logger && 'child' in logger ?
26!
57
                        logger.child({ service: getClassName(this) }) :
58
                        logger;
59

60
                if (aggregateType) {
26✔
61
                        const AggregateType = aggregateType;
16✔
62
                        this.#aggregateFactory = params => new AggregateType(params);
16✔
63
                        this.#handles = AggregateType.handles;
16✔
64
                        this.#restoresFrom = AggregateType.restoresFrom;
16✔
65
                }
66
                else if (aggregateFactory) {
10!
67
                        if (!Array.isArray(handles) || !handles.length)
10✔
UNCOV
68
                                throw new TypeError('handles argument must be an non-empty Array');
×
69

70
                        this.#aggregateFactory = aggregateFactory;
10✔
71
                        this.#handles = handles;
10✔
72
                        this.#restoresFrom = restoresFrom;
10✔
73
                }
74
                else {
75
                        throw new TypeError('either aggregateType or aggregateFactory is required');
×
76
                }
77
        }
78

79
        /** Subscribe to all command types handled by aggregateType */
80
        subscribe(commandBus: IObservable) {
81
                if (!commandBus)
6!
NEW
82
                        throw new TypeError('commandBus argument required');
×
83
                if (!isIObservable(commandBus))
6!
NEW
84
                        throw new TypeError('commandBus argument must implement IObservable interface');
×
85

86
                for (const commandType of this.#handles)
6✔
87
                        commandBus.on(commandType, (cmd: ICommand) => this.execute(cmd));
8✔
88
        }
89

90
        /** Restore aggregate from event store events */
91
        async #restoreAggregate(id: Identifier): Promise<TAggregate> {
92
                if (!id)
18!
UNCOV
93
                        throw new TypeError('id argument required');
×
94

95
                const aggregate = this.#aggregateFactory({ id });
18✔
96

97
                const queryOptions = this.#restoresFrom?.length ?
18✔
98
                        { eventTypes: this.#restoresFrom, tail: 'last' } satisfies AggregateEventsQueryParams :
99
                        undefined;
100

101
                const eventsIterable = this.#eventStore.getAggregateEvents(id, queryOptions);
18✔
102

103
                let eventCount = 0;
18✔
104
                for await (const event of eventsIterable) {
18✔
105
                        aggregate.mutate(event);
14✔
106
                        eventCount += 1;
14✔
107
                }
108

109
                this.#logger?.info(`${aggregate} state restored from ${eventCount} event(s)`);
18✔
110

111
                return aggregate;
18✔
112
        }
113

114
        /** Create new aggregate with new Id generated by event store */
115
        async #createAggregate(): Promise<TAggregate> {
116
                const id = await this.#eventStore.getNewId();
14✔
117
                const aggregate = this.#aggregateFactory({ id });
14✔
118
                this.#logger?.info(`${aggregate} created`);
14✔
119

120
                return aggregate;
14✔
121
        }
122

123
        async #getAggregateInstance(aggregateId?: Identifier) {
124
                if (!aggregateId)
36✔
125
                        return this.#createAggregate();
14✔
126
                else
127
                        return this.#aggregatesCache.assert(aggregateId, () => this.#restoreAggregate(aggregateId));
22✔
128
        }
129

130
        /** Pass a command to corresponding aggregate */
131
        async execute(cmd: ICommand): Promise<IEventSet> {
132
                if (!cmd)
36!
NEW
133
                        throw new TypeError('cmd argument required');
×
134
                if (!cmd.type)
36!
NEW
135
                        throw new TypeError('cmd.type argument required');
×
136

137
                // create new or get cached aggregate instance promise
138
                // multiple concurrent calls to #getAggregateInstance will return the same promise
139
                const aggregate = await this.#getAggregateInstance(cmd.aggregateId);
36✔
140

141
                try {
36✔
142
                        // multiple concurrent commands to a same aggregateId will execute sequentially
143
                        if (cmd.aggregateId)
36✔
144
                                await this.#executionLock.acquire(String(cmd.aggregateId));
22✔
145

146
                        // pass command to aggregate instance
147
                        const events = await aggregate.handle(cmd);
36✔
148

149
                        this.#logger?.info(`${aggregate} "${cmd.type}" command processed, ${events.length} event(s) produced`);
36✔
150

151
                        if (events.length)
36✔
152
                                await this.#eventStore.dispatch(events);
32✔
153

154
                        return events;
36✔
155
                }
156
                finally {
157
                        if (cmd.aggregateId) {
36✔
158
                                this.#executionLock.release(String(cmd.aggregateId));
22✔
159
                                this.#aggregatesCache.release(cmd.aggregateId);
22✔
160
                        }
161
                }
162
        }
163
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc