• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 22745360142

06 Mar 2026 01:50AM UTC coverage: 95.287% (+0.9%) from 94.396%
22745360142

Pull #28

github

web-flow
Merge 15ef847b4 into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

428 of 528 branches covered (81.06%)

1043 of 1091 new or added lines in 65 files covered. (95.6%)

3 existing lines in 2 files now uncovered.

1294 of 1358 relevant lines covered (95.29%)

31.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.47
/src/AbstractAggregate.ts
1
import { AggregateCommandHandler } from './AggregateCommandHandler.ts';
14✔
2
import {
14✔
3
        type IAggregate,
4
        type IMutableState,
5
        type ICommand,
6
        type Identifier,
7
        type IEvent,
8
        type IEventSet,
9
        type IAggregateConstructorParams,
10
        type ISnapshotEvent,
11
        type IAggregateConstructor,
12
        type IEventStore,
13
        type ICommandBus,
14
        type RetryOnConcurrencyErrorOptions,
15
        SNAPSHOT_EVENT_TYPE
16
} from './interfaces/index.ts';
17

18
import {
14✔
19
        getClassName,
20
        validateHandlers,
21
        getHandler,
22
        getMessageHandlerNames,
23
        clone,
24
        assertDefined,
25
        assertString,
26
        assertMessage,
27
        assertSnapshotEvent,
28
        assertNumber,
29
        assertObject,
30
        assertOptionalArray
31
} from './utils/index.ts';
32

33
/**
 * Base class for Aggregate definition.
 *
 * Keeps the aggregate identifier, version and restored snapshot version in private fields,
 * collects emitted events in `changes`, and dispatches commands and events to handler
 * methods looked up with `getHandler` by message type.
 *
 * @typeParam TState - Type of the internal aggregate state object.
 */
export abstract class AbstractAggregate<TState extends IMutableState | object | void = void> implements
	IAggregate {

	/**
	 * List of command names handled by the Aggregate.
	 *
	 * Can be overridden in the Aggregate implementation to explicitly define supported commands.
	 * If not overridden, all public methods will be treated as command handlers by default.
	 *
	 * @example ['createUser', 'changePassword'];
	 */
	static get handles(): string[] {
		return getMessageHandlerNames(this);
	}

	/**
	 * Optional list of event types that are required to restore the aggregate state.
	 * The base implementation returns `undefined`.
	 *
	 * @see IAggregateConstructor
	 */
	static get restoresFrom(): Readonly<string[]> | undefined {
		return undefined;
	}

	/**
	 * Defines retry behavior when a ConcurrencyError is thrown during event dispatch.
	 * The base implementation returns `undefined`.
	 *
	 * @see IAggregateConstructor
	 */
	static get retryOnConcurrencyError(): RetryOnConcurrencyErrorOptions | undefined {
		return undefined;
	}

	/**
	 * Convenience helper to create an `AggregateCommandHandler` for this aggregate type and
	 * subscribe it to the provided `commandBus`.
	 *
	 * @param eventStore - Event store passed to the created command handler.
	 * @param commandBus - Command bus the created handler is subscribed to.
	 * @returns The created and subscribed command handler.
	 */
	static register<T extends AbstractAggregate, S extends IMutableState | object | void>(
		this: IAggregateConstructor<T, S> & (new (options: IAggregateConstructorParams<S>) => T),
		eventStore: IEventStore,
		commandBus: ICommandBus
	): AggregateCommandHandler<T> {
		const handler = new AggregateCommandHandler({ aggregateType: this, eventStore });
		handler.subscribe(commandBus);
		return handler;
	}

	#id: Identifier;
	#version: number = 0;
	#snapshotVersion: number | undefined;

	/** List of emitted events */
	protected changes: IEvent[] = [];

	/** Internal aggregate state */
	protected state: TState | undefined;

	/** Command being handled by aggregate; set only while `handle` is running */
	protected command?: ICommand;

	/** Unique aggregate instance identifier */
	get id(): Identifier {
		return this.#id;
	}

	/** Aggregate instance version */
	get version(): number {
		return this.#version;
	}

	/** Restored snapshot version */
	get snapshotVersion(): number | undefined {
		return this.#snapshotVersion;
	}

	/**
	 * Override to define whether an aggregate state snapshot should be taken.
	 * The base implementation always returns `false`.
	 *
	 * @example
	 *   // create snapshot every 50 events if new events were emitted
	 *   return !!this.changes.length
	 *     && this.version - (this.snapshotVersion ?? 0) > 50;
	 */
	// eslint-disable-next-line class-methods-use-this
	protected get shouldTakeSnapshot(): boolean {
		return false;
	}

	/**
	 * Create an aggregate instance and optionally restore it from past events.
	 *
	 * @param options - Aggregate parameters:
	 *   `id` is required; `state`, when provided, is validated with `assertObject`;
	 *   `events`, when provided, is validated with `assertOptionalArray` and
	 *   each event is passed to `mutate` in order.
	 */
	constructor(options: IAggregateConstructorParams<TState>) {
		const { id, state, events } = options;
		assertDefined(id, 'id');

		if (state)
			assertObject(state, 'state');
		if (events)
			assertOptionalArray(events, 'events');

		this.#id = id;

		validateHandlers(this);

		if (state)
			this.state = state;

		if (events)
			events.forEach(event => this.mutate(event));
	}

	/**
	 * Mutate aggregate state and increment aggregate version.
	 *
	 * @param event - Event to apply. Snapshot events are routed to `restoreSnapshot`;
	 *   any other event is passed to the state's `mutate` method when the state has one,
	 *   otherwise to the state handler found by `getHandler` for `event.type` (if any).
	 */
	mutate(event: IEvent) {
		// Sync to the version carried by the event before the increment below
		if (event.aggregateVersion !== undefined)
			this.#version = event.aggregateVersion;

		if (event.type === SNAPSHOT_EVENT_TYPE) {
			this.#snapshotVersion = event.aggregateVersion;
			this.restoreSnapshot(event as ISnapshotEvent<TState>);
		}
		else if (this.state) {
			const handler = 'mutate' in this.state ?
				this.state.mutate :
				getHandler(this.state, event.type);
			if (handler)
				handler.call(this.state, event);
		}

		// The version always moves one past the event that was just applied
		this.#version += 1;
	}

	/**
	 * Pass command to command handler.
	 *
	 * @param command - Command to process; its `payload` and `context` are passed to the handler.
	 * @returns Events emitted while the command was processed (see `getUncommittedEvents`).
	 * @throws Error if no handler is found for `command.type`.
	 * @throws Error if another command is still being processed by this instance.
	 */
	async handle(command: ICommand): Promise<IEventSet> {
		assertMessage(command, 'command');

		const handler = getHandler(this, command.type);
		if (!handler)
			throw new Error(`'${command.type}' handler is not defined or not a function`);

		if (this.command)
			throw new Error('Another command is being processed');

		this.command = command;
		// Remember where this command's events start, so earlier changes are not returned again
		const eventsOffset = this.changes.length;

		try {
			await handler.call(this, command.payload, command.context);

			return this.getUncommittedEvents(eventsOffset);
		}
		finally {
			// Release the instance for the next command even if the handler threw
			this.command = undefined;
		}
	}

	/**
	 * Get the events emitted during commands processing.
	 * If a snapshot should be taken, the snapshot event is added to the end.
	 *
	 * @param offset - Index in `changes` to start from; all changes are returned when omitted.
	 * @returns A copy of the `changes` slice starting at `offset`.
	 */
	protected getUncommittedEvents(offset?: number): IEventSet {
		if (this.shouldTakeSnapshot)
			this.takeSnapshot();

		return this.changes.slice(offset);
	}

	/**
	 * Format and register aggregate event and mutate aggregate state.
	 *
	 * @param type - Event type; must pass `assertString`.
	 * @param payload - Optional event payload.
	 * @returns The created event, already applied to the aggregate and added to `changes`.
	 */
	protected emit(type: string): IEvent<void>;
	protected emit<TPayload>(type: string, payload: TPayload): IEvent<TPayload>;
	protected emit<TPayload>(type: string, payload?: TPayload): IEvent<TPayload> {
		assertString(type, 'type');

		const event = this.makeEvent<TPayload>(type, payload as TPayload, this.command);

		this.emitRaw(event);

		return event;
	}

	/**
	 * Format event based on a current aggregate state and a command being executed.
	 *
	 * @param type - Event type.
	 * @param payload - Event payload.
	 * @param sourceCommand - Command whose `context` and `sagaOrigins` are copied to the event
	 *   when they are defined.
	 * @returns Event stamped with the current aggregate `id` and `version`.
	 */
	protected makeEvent<TPayload>(type: string, payload: TPayload, sourceCommand?: ICommand): IEvent<TPayload> {
		const event: IEvent<TPayload> = {
			aggregateId: this.id,
			aggregateVersion: this.version,
			type,
			payload
		};

		if (sourceCommand) {
			// augment event with command context
			const { context, sagaOrigins } = sourceCommand;
			if (context !== undefined)
				event.context = context;
			if (sagaOrigins !== undefined)
				event.sagaOrigins = { ...sagaOrigins }; // shallow copy, not a shared reference
		}

		return event;
	}

	/**
	 * Register aggregate event and mutate aggregate state.
	 *
	 * @param event - Fully formed event; `aggregateId` must be defined and
	 *   `aggregateVersion` must pass `assertNumber`.
	 */
	protected emitRaw<TPayload>(event: IEvent<TPayload>): void {
		assertDefined(event?.aggregateId, 'event.aggregateId');
		assertNumber(event?.aggregateVersion, 'event.aggregateVersion');

		this.mutate(event);

		this.changes.push(event);
	}

	/**
	 * Create an aggregate state snapshot.
	 *
	 * @returns A `clone` of the current state.
	 * @throws Error if the aggregate has no state.
	 */
	protected makeSnapshot(): any {
		if (!this.state)
			throw new Error('state property is empty, either define state or override makeSnapshot method');

		return clone(this.state);
	}

	/** Add snapshot event to the collection of emitted events */
	protected takeSnapshot() {
		const snapshotEvent = this.emit(SNAPSHOT_EVENT_TYPE, this.makeSnapshot());
		// The base `mutate` already records this version for snapshot events;
		// it is set here as well so the value is recorded on this path regardless
		this.#snapshotVersion = snapshotEvent.aggregateVersion;
	}

	/**
	 * Restore aggregate state from a snapshot.
	 *
	 * A `clone` of the snapshot payload is copied onto the existing state object
	 * with `Object.assign`; the state object itself is not replaced.
	 *
	 * @param snapshotEvent - Snapshot event; must pass `assertSnapshotEvent`.
	 * @throws Error if the aggregate has no state.
	 */
	protected restoreSnapshot(snapshotEvent: ISnapshotEvent<TState>) {
		assertSnapshotEvent(snapshotEvent, 'snapshotEvent');

		if (!this.state)
			throw new Error('state property is empty, either define state or override restoreSnapshot method');

		Object.assign(this.state, clone(snapshotEvent.payload));
	}

	/** Get human-readable aggregate identifier */
	toString(): string {
		return `${getClassName(this)} ${this.id} (v${this.version})`;
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc