• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 21966362039

12 Feb 2026 10:15PM UTC coverage: 85.328% (-9.1%) from 94.396%
21966362039

Pull #28

github

web-flow
Merge 9adf24177 into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

671 of 1008 branches covered (66.57%)

927 of 1051 new or added lines in 67 files covered. (88.2%)

49 existing lines in 13 files now uncovered.

1262 of 1479 relevant lines covered (85.33%)

33.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.52
/src/AbstractAggregate.ts
1
import { AggregateCommandHandler } from './AggregateCommandHandler.ts';
28✔
2
import {
28✔
3
        type IAggregate,
4
        type IMutableState,
5
        type ICommand,
6
        type Identifier,
7
        type IEvent,
8
        type IEventSet,
9
        type IAggregateConstructorParams,
10
        type ISnapshotEvent,
11
        type IAggregateConstructor,
12
        type IEventStore,
13
        type ICommandBus,
14
        SNAPSHOT_EVENT_TYPE,
15
        isSnapshotEvent,
16
        isEvent
17
} from './interfaces/index.ts';
18

19
import {
28✔
20
        getClassName,
21
        validateHandlers,
22
        getHandler,
23
        getMessageHandlerNames,
24
        clone
25
} from './utils/index.ts';
26

27
/**
 * Base class for Aggregate definition.
 *
 * Keeps the aggregate identifier, version and restored snapshot version, collects the events
 * emitted while commands are handled, and routes commands and events to handler methods
 * resolved with `getHandler`.
 */
export abstract class AbstractAggregate<TState extends IMutableState | object | void = void> implements
        IAggregate {

        /**
         * List of command names handled by the Aggregate.
         *
         * Can be overridden in the Aggregate implementation to explicitly define supported commands.
         * If not overridden, all public methods will be treated as command handlers by default.
         *
         * @example ['createUser', 'changePassword'];
         */
        static get handles(): string[] {
                return getMessageHandlerNames(this);
        }

        /**
         * Convenience helper to create an `AggregateCommandHandler` for this aggregate type and
         * subscribe it to the provided `commandBus`.
         *
         * @param eventStore Event store passed to the created `AggregateCommandHandler`
         * @param commandBus Command bus the created handler is subscribed to
         * @returns The created and subscribed `AggregateCommandHandler`
         */
        static register<T extends AbstractAggregate, S extends IMutableState | object | void>(
                this: IAggregateConstructor<T, S> & (new (options: IAggregateConstructorParams<S>) => T),
                eventStore: IEventStore,
                commandBus: ICommandBus
        ): AggregateCommandHandler<T> {
                const handler = new AggregateCommandHandler({ aggregateType: this, eventStore });
                handler.subscribe(commandBus);
                return handler;
        }

        /** Aggregate identifier, assigned once in the constructor */
        #id: Identifier;

        /** Aggregate version, updated by `mutate` on every applied event */
        #version: number = 0;

        /** Version of the last snapshot event applied or taken, `undefined` until then */
        #snapshotVersion: number | undefined;

        /** List of emitted events */
        protected changes: IEvent[] = [];

        /** Internal aggregate state */
        protected state: TState | undefined;

        /** Command being handled by aggregate */
        protected command?: ICommand;

        /** Unique aggregate instance identifier */
        get id(): Identifier {
                return this.#id;
        }

        /** Aggregate instance version */
        get version(): number {
                return this.#version;
        }

        /** Restored snapshot version */
        get snapshotVersion(): number | undefined {
                return this.#snapshotVersion;
        }

        /**
         * Override to define whether an aggregate state snapshot should be taken
         *
         * @example
         *   // create snapshot every 50 events if new events were emitted
         *   return !!this.changes.length
         *     && this.version - (this.snapshotVersion ?? 0) > 50;
         */
        // eslint-disable-next-line class-methods-use-this
        protected get shouldTakeSnapshot(): boolean {
                return false;
        }

        /**
         * Validate constructor options, assign the identifier and optional state,
         * and replay the provided events through `mutate`.
         *
         * @param options Aggregate `id`, optional `state` and optional `events` to replay
         * @throws {TypeError} When `id` is falsy, when a truthy `state` is not an object,
         *   or when a truthy `events` is not an array
         */
        constructor(options: IAggregateConstructorParams<TState>) {
                const { id, state, events } = options;
                if (!id)
                        throw new TypeError('id argument required');
                if (state && typeof state !== 'object')
                        throw new TypeError('state argument, when provided, must be an Object');
                if (events && !Array.isArray(events))
                        throw new TypeError('events argument, when provided, must be an Array');

                this.#id = id;

                // Handlers are validated before state is assigned and events are replayed
                validateHandlers(this);

                if (state)
                        this.state = state;

                if (events)
                        events.forEach(event => this.mutate(event));
        }

        /**
         * Mutate aggregate state and increment aggregate version.
         *
         * When `event.aggregateVersion` is defined, the aggregate version is first set to it.
         * Snapshot events are passed to `restoreSnapshot`. Any other event is passed to the
         * state's own `mutate` method when the state has one, otherwise to the state handler
         * resolved by `getHandler` for the event type; the event is skipped when there is no
         * state or no handler. The version is incremented by one in every case.
         *
         * @param event Event to apply
         */
        mutate(event: IEvent): void {
                if (event.aggregateVersion !== undefined)
                        this.#version = event.aggregateVersion;

                if (event.type === SNAPSHOT_EVENT_TYPE) {
                        this.#snapshotVersion = event.aggregateVersion;
                        this.restoreSnapshot(event as ISnapshotEvent<TState>);
                }
                else if (this.state) {
                        const handler = 'mutate' in this.state ?
                                this.state.mutate :
                                getHandler(this.state, event.type);
                        if (handler)
                                handler.call(this.state, event);
                }

                this.#version += 1;
        }

        /**
         * Pass command to command handler.
         *
         * The handler is resolved with `getHandler` by `command.type` and invoked with
         * `command.payload` and `command.context`. While it runs, the command is exposed as
         * `this.command`; the property is cleared afterwards, whether the handler succeeds or fails.
         *
         * @param command Command to handle
         * @returns Events emitted while handling this command, as returned by `getUncommittedEvents`
         * @throws {TypeError} (as a rejection) When `command` or `command.type` is missing
         * @throws {Error} (as a rejection) When no handler is found for `command.type`,
         *   or when another command is already being processed
         */
        async handle(command: ICommand): Promise<IEventSet> {
                if (!command)
                        throw new TypeError('command argument required');
                if (!command.type)
                        throw new TypeError('command.type argument required');

                const handler = getHandler(this, command.type);
                if (!handler)
                        throw new Error(`'${command.type}' handler is not defined or not a function`);

                if (this.command)
                        throw new Error('Another command is being processed');

                this.command = command;
                // Remember where this command's events start so that only they are returned
                const eventsOffset = this.changes.length;

                try {
                        await handler.call(this, command.payload, command.context);

                        return this.getUncommittedEvents(eventsOffset);
                }
                finally {
                        this.command = undefined;
                }
        }

        /**
         * Get the events emitted during commands processing.
         * If a snapshot should be taken, the snapshot event is added to the end.
         *
         * @param offset Index in `changes` to start from; all changes are returned when omitted
         * @returns A copy of the `changes` list starting at `offset`
         */
        protected getUncommittedEvents(offset?: number): IEventSet {
                if (this.shouldTakeSnapshot)
                        this.takeSnapshot();

                return this.changes.slice(offset);
        }

        /**
         * Format and register aggregate event and mutate aggregate state.
         *
         * @param type Non-empty event type
         * @param payload Optional event payload
         * @returns The event built by `makeEvent` and registered with `emitRaw`
         * @throws {TypeError} When `type` is not a non-empty string
         */
        protected emit(type: string): IEvent<void>;
        protected emit<TPayload>(type: string, payload: TPayload): IEvent<TPayload>;
        protected emit<TPayload>(type: string, payload?: TPayload): IEvent<TPayload> {
                if (typeof type !== 'string' || !type.length)
                        throw new TypeError('type argument must be a non-empty string');

                const event = this.makeEvent<TPayload>(type, payload as TPayload, this.command);

                this.emitRaw(event);

                return event;
        }

        /**
         * Format event based on a current aggregate state and a command being executed.
         *
         * The event carries the current aggregate `id` and `version`. When `sourceCommand` is
         * given, its `context` (if defined) and a shallow copy of its `sagaOrigins` (if defined)
         * are added to the event.
         *
         * @param type Event type
         * @param payload Event payload
         * @param sourceCommand Command the event originates from, if any
         * @returns The formatted event; it is not registered or applied here
         */
        protected makeEvent<TPayload>(type: string, payload: TPayload, sourceCommand?: ICommand): IEvent<TPayload> {
                const event: IEvent<TPayload> = {
                        aggregateId: this.id,
                        aggregateVersion: this.version,
                        type,
                        payload
                };

                if (sourceCommand) {
                        // augment event with command context
                        const { context, sagaOrigins } = sourceCommand;
                        if (context !== undefined)
                                event.context = context;
                        if (sagaOrigins !== undefined)
                                event.sagaOrigins = { ...sagaOrigins };
                }

                return event;
        }

        /**
         * Register aggregate event and mutate aggregate state.
         *
         * The event is applied with `mutate` first and then appended to `changes`.
         *
         * @param event Fully formatted event to register
         * @throws {TypeError} When `event` fails the `isEvent` check, has no `aggregateId`,
         *   or has a non-numeric `aggregateVersion`
         */
        protected emitRaw<TPayload>(event: IEvent<TPayload>): void {
                if (!isEvent(event))
                        throw new TypeError('event argument must be a valid IEvent');
                if (!event.aggregateId)
                        throw new TypeError('event.aggregateId argument required');
                if (typeof event.aggregateVersion !== 'number')
                        throw new TypeError('event.aggregateVersion argument must be a Number');

                this.mutate(event);

                this.changes.push(event);
        }

        /**
         * Create an aggregate state snapshot.
         *
         * @returns A copy of the current state produced by `clone`
         * @throws {Error} When `state` is empty
         */
        protected makeSnapshot(): any {
                if (!this.state)
                        throw new Error('state property is empty, either define state or override makeSnapshot method');

                return clone(this.state);
        }

        /**
         * Add snapshot event to the collection of emitted events.
         *
         * The snapshot is emitted through `emit`, so it is also applied with `mutate`.
         */
        protected takeSnapshot(): void {
                const snapshotEvent = this.emit(SNAPSHOT_EVENT_TYPE, this.makeSnapshot());
                this.#snapshotVersion = snapshotEvent.aggregateVersion;
        }

        /**
         * Restore aggregate state from a snapshot.
         *
         * A copy of the snapshot payload (produced by `clone`) is assigned onto the existing
         * state object with `Object.assign`.
         *
         * @param snapshotEvent Snapshot event whose payload holds the state to restore
         * @throws {TypeError} When `snapshotEvent` fails the `isSnapshotEvent` check or has no payload
         * @throws {Error} When `state` is empty
         */
        protected restoreSnapshot(snapshotEvent: ISnapshotEvent<TState>): void {
                if (!isSnapshotEvent(snapshotEvent))
                        throw new TypeError('snapshotEvent argument must be a valid ISnapshotEvent');
                if (!snapshotEvent.payload)
                        throw new TypeError('snapshotEvent.payload argument required');
                if (!this.state)
                        throw new Error('state property is empty, either define state or override restoreSnapshot method');

                Object.assign(this.state, clone(snapshotEvent.payload));
        }

        /** Get human-readable aggregate identifier */
        toString(): string {
                return `${getClassName(this)} ${this.id} (v${this.version})`;
        }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc