• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 22745360142

06 Mar 2026 01:50AM UTC coverage: 95.287% (+0.9%) from 94.396%
22745360142

Pull #28

github

web-flow
Merge 15ef847b4 into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

428 of 528 branches covered (81.06%)

1043 of 1091 new or added lines in 65 files covered. (95.6%)

3 existing lines in 2 files now uncovered.

1294 of 1358 relevant lines covered (95.29%)

31.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.73
/src/AggregateCommandHandler.ts
1
import {
14✔
2
        assertBoolean, assertDefined, assertMessage, assertNonNegativeInteger, assertObservable, assertStringArray,
3
        Lock, MapAssertable
4
} from './utils/index.ts';
5
import { ConcurrencyError } from './errors/index.ts';
14✔
6
import type {
7
        AggregateEventsQueryParams,
8
        IAggregate,
9
        IAggregateConstructor,
10
        IAggregateFactory,
11
        ICommand,
12
        ICommandHandler,
13
        IContainer,
14
        Identifier,
15
        IEventSet,
16
        IEventStore,
17
        ILocker,
18
        ILogger,
19
        IObservable,
20
        RetryOnConcurrencyErrorDecision,
21
        RetryOnConcurrencyErrorOptions,
22
        RetryOnConcurrencyErrorResolver
23
} from './interfaces/index.ts';
24
import { isObject } from './interfaces/isObject.ts';
14✔
25

26
const DEFAULT_MAX_RETRY_ATTEMPTS = 5;
14✔
27

28
function normalizeRetryResolver(value?: RetryOnConcurrencyErrorOptions): RetryOnConcurrencyErrorResolver {
29
        if (typeof value === 'function')
29✔
30
                return value;
2✔
31
        if (value === false)
27✔
32
                return () => false;
2✔
33
        if (value === 'ignore')
25✔
34
                return () => 'ignore';
2✔
35
        if (typeof value === 'number')
23✔
36
                return (err, events, attempt) => err instanceof ConcurrencyError && attempt < value;
3✔
37

38
        if (isObject(value)) {
22✔
39
                const { maxRetries = DEFAULT_MAX_RETRY_ATTEMPTS, ignoreAfterMaxRetries = false } = value;
4!
40
                assertNonNegativeInteger(maxRetries, 'retryOnConcurrencyError.maxRetries');
4✔
41
                assertBoolean(ignoreAfterMaxRetries, 'retryOnConcurrencyError.ignoreAfterMaxRetries');
3✔
42

43
                return (err, events, attempt): RetryOnConcurrencyErrorDecision => {
3✔
44
                        if (!(err instanceof ConcurrencyError))
6!
NEW
45
                                return false;
×
46

47
                        if (attempt < maxRetries)
6✔
48
                                return true;
3✔
49

50
                        return ignoreAfterMaxRetries ? 'ignore' : false;
3✔
51
                };
52
        }
53

54
        // undefined or true — default behavior
55
        return (err, events, attempt) =>
18✔
56
                err instanceof ConcurrencyError && attempt < DEFAULT_MAX_RETRY_ATTEMPTS;
9✔
57
}
58

59
/**
60
 * Aggregate command handler.
61
 *
62
 * Subscribes to event store and awaits aggregate commands.
63
 * Upon command receiving creates an instance of aggregate,
64
 * restores its state, passes command and commits emitted events to event store.
65
 */
66
export class AggregateCommandHandler<TAggregate extends IAggregate> implements ICommandHandler {
14✔
67

68
        readonly #eventStore: IEventStore;
69
        readonly #logger?: ILogger;
70
        readonly #aggregateFactory: IAggregateFactory<TAggregate, any>;
71
        readonly #handles: Readonly<string[]>;
72
        readonly #restoresFrom?: Readonly<string[]>;
73
        readonly #shouldRetry: RetryOnConcurrencyErrorResolver;
74

75
        /** Aggregate instances cache for concurrent command handling */
76
        #aggregatesCache: MapAssertable<Identifier, Promise<TAggregate>> = new MapAssertable();
29✔
77

78
        /** Lock for sequential aggregate command execution */
79
        #executionLock: ILocker;
80

81
        constructor({
82
                eventStore,
83
                aggregateType,
84
                aggregateFactory,
85
                handles,
86
                executionLocker = new Lock(),
29✔
87
                restoresFrom,
88
                retryOnConcurrencyError,
89
                logger
90
        }: Pick<IContainer, 'eventStore' | 'executionLocker' | 'logger'> & {
91
                aggregateType?: IAggregateConstructor<TAggregate, any>,
92
                aggregateFactory?: IAggregateFactory<TAggregate, any>,
93
                handles?: Readonly<string[]>,
94
                restoresFrom?: Readonly<string[]>,
95
                retryOnConcurrencyError?: RetryOnConcurrencyErrorOptions
96
        }) {
97
                assertDefined(eventStore, 'eventStore');
29✔
98

99
                this.#eventStore = eventStore;
29✔
100
                this.#executionLock = executionLocker;
29✔
101
                this.#logger = logger && 'child' in logger ?
29!
102
                        logger.child({ service: new.target.name }) :
103
                        logger;
104

105
                if (aggregateType) {
29✔
106
                        const AggregateType = aggregateType;
21✔
107
                        this.#aggregateFactory = params => new AggregateType(params);
54✔
108
                        this.#handles = AggregateType.handles;
21✔
109
                        this.#restoresFrom = AggregateType.restoresFrom;
21✔
110
                        this.#shouldRetry = normalizeRetryResolver(retryOnConcurrencyError ??
21✔
111
                                AggregateType.retryOnConcurrencyError);
112
                }
113
                else if (aggregateFactory) {
8!
114
                        assertStringArray(handles, 'handles');
8✔
115

116
                        this.#aggregateFactory = aggregateFactory;
8✔
117
                        this.#handles = handles;
8✔
118
                        this.#restoresFrom = restoresFrom;
8✔
119
                        this.#shouldRetry = normalizeRetryResolver(retryOnConcurrencyError);
8✔
120
                }
121
                else {
122
                        throw new TypeError('either aggregateType or aggregateFactory is required');
×
123
                }
124
        }
125

126
        /** Subscribe to all command types handled by aggregateType */
127
        subscribe(commandBus: IObservable) {
128
                assertObservable(commandBus, 'commandBus');
5✔
129

130
                for (const commandType of this.#handles)
5✔
131
                        commandBus.on(commandType, (cmd: ICommand) => this.execute(cmd));
6✔
132
        }
133

134
        /** Restore aggregate from event store events */
135
        async #restoreAggregate(id: Identifier): Promise<TAggregate> {
136
                assertDefined(id, 'id');
60✔
137

138
                const aggregate = this.#aggregateFactory({ id });
60✔
139

140
                const queryOptions = this.#restoresFrom?.length ?
60✔
141
                        { eventTypes: this.#restoresFrom, tail: 'last' } satisfies AggregateEventsQueryParams :
142
                        undefined;
143

144
                const eventsIterable = this.#eventStore.getAggregateEvents(id, queryOptions);
60✔
145

146
                let eventCount = 0;
60✔
147
                for await (const event of eventsIterable) {
60✔
148
                        aggregate.mutate(event);
46✔
149
                        eventCount += 1;
46✔
150
                }
151

152
                this.#logger?.info(`${aggregate} state restored from ${eventCount} event(s)`);
60✔
153

154
                return aggregate;
60✔
155
        }
156

157
        /** Create new aggregate with new Id generated by event store */
158
        async #createAggregate(): Promise<TAggregate> {
159
                const id = await this.#eventStore.getNewId();
9✔
160
                const aggregate = this.#aggregateFactory({ id });
9✔
161
                this.#logger?.info(`${aggregate} created`);
9✔
162

163
                return aggregate;
9✔
164
        }
165

166
        /** Pass a command to corresponding aggregate */
167
        async execute(cmd: ICommand): Promise<IEventSet> {
168
                assertMessage(cmd, 'cmd');
48✔
169

170
                const { aggregateId } = cmd;
48✔
171

172
                // Register interest in the cache entry before acquiring the lock, so concurrent
173
                // callers for the same aggregateId share one restoration promise instead of each
174
                // triggering a separate event-store read.
175
                if (aggregateId)
48✔
176
                        this.#aggregatesCache.assert(aggregateId, () => this.#restoreAggregate(aggregateId));
39✔
177

178
                // Serialize execution per aggregate — commands for the same id queue here.
179
                const lease = aggregateId ?
48✔
180
                        await this.#executionLock.acquire(String(aggregateId)) :
181
                        undefined;
182

183
                try {
48✔
184
                        for (let attempt = 0; ; attempt++) {
48✔
185
                                if (attempt > 0)
62✔
186
                                        this.#logger?.warn(`retrying "${cmd.type}" command on aggregate ${aggregateId}, attempt ${attempt + 1}`);
14✔
187

188
                                // Read the current cache entry after acquiring the lock. On the first attempt
189
                                // this is the pre-warmed (possibly shared) instance; on retries it is the
190
                                // fresh instance placed into the cache by the error handler below.
191
                                const aggregate = aggregateId ?
62✔
192
                                        await this.#aggregatesCache.get(aggregateId)! :
193
                                        await this.#createAggregate();
194

195
                                let events: IEventSet | undefined;
196
                                try {
62✔
197
                                        events = await aggregate.handle(cmd);
62✔
198

199
                                        this.#logger?.info(`${aggregate} "${cmd.type}" command processed, ${events.length} event(s) produced`);
62✔
200

201
                                        if (events.length)
62✔
202
                                                await this.#eventStore.dispatch(events);
60✔
203

204
                                        return events;
36✔
205
                                }
206
                                catch (err: unknown) {
207
                                        // The aggregate is now dirty (mutated by handle). Replace the cache entry
208
                                        // with a fresh restoration promise so both the retry below and any commands
209
                                        // queued on the lock start from a clean state.
210
                                        if (aggregateId)
26✔
211
                                                this.#aggregatesCache.set(aggregateId, this.#restoreAggregate(aggregateId));
25✔
212

213
                                        const retryDecision = this.#shouldRetry(err, events, attempt);
26✔
214
                                        if (!retryDecision)
26✔
215
                                                throw err;
7✔
216

217
                                        if (retryDecision === 'ignore' && events?.length) {
19✔
218
                                                this.#logger?.warn(`"${cmd.type}" concurrency error ignored after ${attempt + 1} attempt(s), force-dispatching`);
5✔
219
                                                await this.#eventStore.dispatch(events, { ignoreConcurrencyError: true });
5✔
220
                                                return events;
5✔
221
                                        }
222
                                }
223
                        }
224
                }
225
                finally {
226
                        lease?.release();
48✔
227

228
                        // Decrement the usage counter registered above; deletes the entry when
229
                        // the last concurrent caller for this aggregateId is done.
230
                        if (aggregateId)
48✔
231
                                this.#aggregatesCache.release(aggregateId);
39✔
232
                }
233
        }
234
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc