• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 22268237579

22 Feb 2026 01:41AM UTC coverage: 86.867% (-7.5%) from 94.396%
22268237579

Pull #28

github

web-flow
Merge 2c467a1d5 into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

475 of 691 branches covered (68.74%)

882 of 973 new or added lines in 65 files covered. (90.65%)

49 existing lines in 13 files now uncovered.

1217 of 1401 relevant lines covered (86.87%)

21.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.87
/src/AggregateCommandHandler.ts
1
import { getClassName, Lock, MapAssertable } from './utils/index.ts';
14✔
2
import { ConcurrencyError } from './errors/index.ts';
14✔
3
import {
14✔
4
        type AggregateEventsQueryParams,
5
        type IAggregate,
6
        type IAggregateConstructor,
7
        type IAggregateFactory,
8
        type ICommand,
9
        type ICommandHandler,
10
        type IContainer,
11
        type Identifier,
12
        type IEventSet,
13
        type IEventStore,
14
        type ILocker,
15
        type ILogger,
16
        type IObservable,
17
        isObservable
18
} from './interfaces/index.ts';
19

20
/** Upper bound on the attempt index accepted by the default retry policy. */
const DEFAULT_MAX_RETRY_ATTEMPTS = 5;

/**
 * Decides whether a failed command execution should be attempted again.
 *
 * @param err - the error raised by the failed attempt
 * @param attempt - zero-based index of the attempt that has just failed
 * @returns `true` to run the command again, `false` to give up
 */
type RetryResolver = (err: unknown, attempt: number) => boolean;

/**
 * Convert the user-facing retry setting into a uniform resolver function.
 *
 * - function — used as is;
 * - `false` — never retry;
 * - number — retry while the attempt index is below that number
 *   (the error itself is not inspected in this form);
 * - `true`, `undefined` or anything else — retry only on `ConcurrencyError`,
 *   while the attempt index is below `DEFAULT_MAX_RETRY_ATTEMPTS`.
 *
 * @param value - retry setting in any of the supported forms
 * @returns resolver implementing the requested policy
 */
function normalizeRetryResolver(value: boolean | number | RetryResolver | undefined): RetryResolver {
	switch (typeof value) {
		case 'function':
			return value;

		case 'number': {
			const maxAttempts = value;
			return (_err, attempt) => attempt < maxAttempts;
		}

		case 'boolean':
			if (!value)
				return () => false;
			break;

		default:
			break;
	}

	// `true`, `undefined` and unrecognized values share the default policy
	return (err, attempt) => attempt < DEFAULT_MAX_RETRY_ATTEMPTS && err instanceof ConcurrencyError;
}
35

36
/**
 * Aggregate command handler.
 *
 * Subscribes to a command bus and awaits aggregate commands.
 * Upon receiving a command it creates an aggregate instance, restores its state
 * from the event store, passes the command to it and dispatches the emitted
 * events to the event store.
 *
 * Commands addressed to the same `aggregateId` are serialized with the
 * execution lock and share one cached aggregate instance while they overlap.
 */
export class AggregateCommandHandler<TAggregate extends IAggregate> implements ICommandHandler {

	readonly #eventStore: IEventStore;
	readonly #logger?: ILogger;
	readonly #aggregateFactory: IAggregateFactory<TAggregate, any>;
	readonly #handles: Readonly<string[]>;
	readonly #restoresFrom?: Readonly<string[]>;
	readonly #shouldRetry: RetryResolver;

	/** Aggregate instances cache for concurrent command handling */
	#aggregatesCache: MapAssertable<Identifier, Promise<TAggregate>> = new MapAssertable();

	/** Lock for sequential aggregate command execution */
	#executionLock: ILocker;

	/**
	 * @param options.eventStore - source of aggregate events and target for produced events (required)
	 * @param options.aggregateType - aggregate class; supplies `handles`, `restoresFrom`
	 *   and the fallback `retryOnConcurrencyError` setting
	 * @param options.aggregateFactory - alternative to `aggregateType`; requires `handles`
	 * @param options.handles - command types handled (only read together with `aggregateFactory`)
	 * @param options.restoresFrom - event types needed for state restoring (only read together with `aggregateFactory`)
	 * @param options.executionLocker - per-aggregate lock, a new `Lock` by default
	 * @param options.retryOnConcurrencyError - retry policy, see `normalizeRetryResolver`
	 * @param options.logger - optional logger; when it exposes `child`, a child logger is used
	 * @throws TypeError when `eventStore` is missing, when neither `aggregateType`
	 *   nor `aggregateFactory` is given, or when `aggregateFactory` comes without a non-empty `handles`
	 */
	constructor({
		eventStore,
		aggregateType,
		aggregateFactory,
		handles,
		executionLocker = new Lock(),
		restoresFrom,
		retryOnConcurrencyError,
		logger
	}: Pick<IContainer, 'eventStore' | 'executionLocker' | 'logger'> & {
		aggregateType?: IAggregateConstructor<TAggregate, any>,
		aggregateFactory?: IAggregateFactory<TAggregate, any>,
		handles?: Readonly<string[]>,
		restoresFrom?: Readonly<string[]>,
		retryOnConcurrencyError?: boolean | number | RetryResolver
	}) {
		if (!eventStore)
			throw new TypeError('eventStore argument required');

		this.#eventStore = eventStore;
		this.#executionLock = executionLocker;
		this.#logger = logger && 'child' in logger ?
			logger.child({ service: getClassName(this) }) :
			logger;

		if (aggregateType) {
			const AggregateType = aggregateType;
			this.#aggregateFactory = params => new AggregateType(params);
			this.#handles = AggregateType.handles;
			this.#restoresFrom = AggregateType.restoresFrom;

			// an explicit option wins over the static setting of the aggregate class
			this.#shouldRetry = normalizeRetryResolver(retryOnConcurrencyError ??
				AggregateType.retryOnConcurrencyError);
		}
		else if (aggregateFactory) {
			if (!Array.isArray(handles) || !handles.length)
				throw new TypeError('handles argument must be an non-empty Array');

			this.#aggregateFactory = aggregateFactory;
			this.#handles = handles;
			this.#restoresFrom = restoresFrom;
			this.#shouldRetry = normalizeRetryResolver(retryOnConcurrencyError);
		}
		else {
			throw new TypeError('either aggregateType or aggregateFactory is required');
		}
	}

	/**
	 * Subscribe to all command types handled by the aggregate
	 *
	 * @param commandBus - observable that emits commands by type
	 * @throws TypeError when `commandBus` is missing or is not observable
	 */
	subscribe(commandBus: IObservable) {
		if (!commandBus)
			throw new TypeError('commandBus argument required');
		if (!isObservable(commandBus))
			throw new TypeError('commandBus argument must implement IObservable interface');

		for (const commandType of this.#handles)
			commandBus.on(commandType, (cmd: ICommand) => this.execute(cmd));
	}

	/** Restore aggregate from event store events */
	async #restoreAggregate(id: Identifier): Promise<TAggregate> {
		if (!id)
			throw new TypeError('id argument required');

		const aggregate = this.#aggregateFactory({ id });

		// When the aggregate declares the event types it restores from,
		// the query is narrowed to those types with `tail: 'last'`
		const queryOptions = this.#restoresFrom?.length ?
			{ eventTypes: this.#restoresFrom, tail: 'last' } satisfies AggregateEventsQueryParams :
			undefined;

		const eventsIterable = this.#eventStore.getAggregateEvents(id, queryOptions);

		let eventCount = 0;
		for await (const event of eventsIterable) {
			aggregate.mutate(event);
			eventCount += 1;
		}

		this.#logger?.info(`${aggregate} state restored from ${eventCount} event(s)`);

		return aggregate;
	}

	/**
	 * Start aggregate restoring and return the pending promise for caching.
	 *
	 * A cached restoration may end up with nobody awaiting it (the lock could not
	 * be acquired, or the command failed and is not retried). The no-op handler
	 * keeps such an orphaned failure from being reported as an unhandled promise
	 * rejection; callers awaiting the returned promise still receive the rejection.
	 */
	#startRestore(id: Identifier): Promise<TAggregate> {
		const restoring = this.#restoreAggregate(id);
		restoring.catch(() => undefined);
		return restoring;
	}

	/** Create new aggregate with new Id generated by event store */
	async #createAggregate(): Promise<TAggregate> {
		const id = await this.#eventStore.getNewId();
		const aggregate = this.#aggregateFactory({ id });
		this.#logger?.info(`${aggregate} created`);

		return aggregate;
	}

	/**
	 * Pass a command to corresponding aggregate
	 *
	 * @param cmd - command; with `aggregateId` an existing aggregate is restored,
	 *   without it a new aggregate is created with an id generated by the event store
	 * @returns events produced by the aggregate (dispatched to the event store when non-empty)
	 * @throws TypeError when `cmd` or `cmd.type` is missing
	 * @throws the last error of command handling or dispatching once the retry
	 *   resolver declines another attempt; restoring and lock errors are not retried
	 */
	async execute(cmd: ICommand): Promise<IEventSet> {
		if (!cmd)
			throw new TypeError('cmd argument required');
		if (!cmd.type)
			throw new TypeError('cmd.type argument required');

		const { aggregateId } = cmd;

		// Register interest in the cache entry before acquiring the lock, so concurrent
		// callers for the same aggregateId share one restoration promise instead of each
		// triggering a separate event-store read.
		if (aggregateId)
			this.#aggregatesCache.assert(aggregateId, () => this.#startRestore(aggregateId));

		let lease: Awaited<ReturnType<ILocker['acquire']>> | undefined;

		try {
			// Serialize execution per aggregate — commands for the same id queue here.
			// Acquired inside `try`, so a failed acquisition still releases
			// the cache registration made above.
			if (aggregateId)
				lease = await this.#executionLock.acquire(String(aggregateId));

			for (let attempt = 0; ; attempt++) {
				if (attempt > 0)
					this.#logger?.info(`retrying "${cmd.type}" command on aggregate ${aggregateId}, attempt ${attempt + 1}`);

				// Read the current cache entry after acquiring the lock. On the first attempt
				// this is the pre-warmed (possibly shared) instance; on retries it is the
				// fresh instance placed into the cache by the error handler below.
				const aggregate = aggregateId ?
					await this.#aggregatesCache.get(aggregateId)! :
					await this.#createAggregate();

				try {
					const events = await aggregate.handle(cmd);

					this.#logger?.info(`${aggregate} "${cmd.type}" command processed, ${events.length} event(s) produced`);

					if (events.length)
						await this.#eventStore.dispatch(events);

					return events;
				}
				catch (err: unknown) {
					// The aggregate is now dirty (mutated by handle). Replace the cache entry
					// with a fresh restoration promise so both the retry below and any commands
					// queued on the lock start from a clean state.
					if (aggregateId)
						this.#aggregatesCache.set(aggregateId, this.#startRestore(aggregateId));

					if (!this.#shouldRetry(err, attempt))
						throw err;
				}
			}
		}
		finally {
			lease?.release();

			// Decrement the usage counter registered above; deletes the entry when
			// the last concurrent caller for this aggregateId is done.
			if (aggregateId)
				this.#aggregatesCache.release(aggregateId);
		}
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc