• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 21844816144

09 Feb 2026 11:21PM UTC coverage: 83.491%. First build
21844816144

Pull #31

github

web-flow
Merge ae675944f into 025edb883
Pull Request #31: Multi-saga correlation via `message.sagaOrigins`

636 of 986 branches covered (64.5%)

117 of 155 new or added lines in 17 files covered. (75.48%)

1234 of 1478 relevant lines covered (83.49%)

29.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.15
/src/AbstractAggregate.ts
1
import { AggregateCommandHandler } from './AggregateCommandHandler.ts';
24✔
2
import {
24✔
3
        type IAggregate,
4
        type IMutableState,
5
        type ICommand,
6
        type Identifier,
7
        type IEvent,
8
        type IEventSet,
9
        type IAggregateConstructorParams,
10
        type ISnapshotEvent,
11
        type IAggregateConstructor,
12
        type IEventStore,
13
        type ICommandBus,
14
        SNAPSHOT_EVENT_TYPE,
15
        isSnapshotEvent,
16
        isEvent
17
} from './interfaces/index.ts';
18

19
import {
24✔
20
        getClassName,
21
        validateHandlers,
22
        getHandler,
23
        getMessageHandlerNames,
24
        clone,
25
        promiseOrSync
26
} from './utils/index.ts';
27

28
/**
29
 * Base class for Aggregate definition
30
 */
31
export abstract class AbstractAggregate<TState extends IMutableState | object | void = void> implements
24✔
32
        IAggregate {
33

34
        /**
35
         * List of command names handled by the Aggregate.
36
         *
37
         * Can be overridden in the Aggregate implementation to explicitly define supported commands.
38
         * If not overridden, all public methods will be treated as command handlers by default.
39
         *
40
         * @example ['createUser', 'changePassword'];
41
         */
42
        static get handles(): string[] {
43
                return getMessageHandlerNames(this);
8✔
44
        }
45

46
        /**
47
         * Convenience helper to create an `AggregateCommandHandler` for this aggregate type and
48
         * subscribe it to the provided `commandBus`.
49
         */
50
        static register<T extends AbstractAggregate, S extends IMutableState | object | void>(
51
                this: IAggregateConstructor<T, S> & (new (options: IAggregateConstructorParams<S>) => T),
52
                eventStore: IEventStore,
53
                commandBus: ICommandBus
54
        ): AggregateCommandHandler<T> {
NEW
55
                const handler = new AggregateCommandHandler({ aggregateType: this, eventStore });
×
NEW
56
                handler.subscribe(commandBus);
×
NEW
57
                return handler;
×
58
        }
59

60
        #id: Identifier;
61
        #version: number = 0;
102✔
62
        #snapshotVersion: number | undefined;
63

64
        /** List of emitted events */
65
        protected changes: IEvent[] = [];
102✔
66

67
        /** Internal aggregate state */
68
        protected state: TState | undefined;
69

70
        /** Command being handled by aggregate */
71
        protected command?: ICommand;
72

73
        /** Unique aggregate instance identifier */
74
        get id(): Identifier {
75
                return this.#id;
72✔
76
        }
77

78
        /** Aggregate instance version */
79
        get version(): number {
80
                return this.#version;
118✔
81
        }
82

83
        /** Restored snapshot version */
84
        get snapshotVersion(): number | undefined {
85
                return this.#snapshotVersion;
24✔
86
        }
87

88
        /**
89
         * Override to define whether an aggregate state snapshot should be taken
90
         *
91
         * @example
92
         *   // create snapshot every 50 events if new events were emitted
93
         *   return !!this.changes.length
94
         *     && this.version - (this.snapshotVersion ?? 0) > 50;
95
         */
96
        // eslint-disable-next-line class-methods-use-this
97
        protected get shouldTakeSnapshot(): boolean {
98
                return false;
38✔
99
        }
100

101
        constructor(options: IAggregateConstructorParams<TState>) {
102
                const { id, state, events } = options;
102✔
103
                if (!id)
102!
104
                        throw new TypeError('id argument required');
×
105
                if (state && typeof state !== 'object')
102!
106
                        throw new TypeError('state argument, when provided, must be an Object');
×
107
                if (events && !Array.isArray(events))
102✔
108
                        throw new TypeError('events argument, when provided, must be an Array');
×
109

110
                this.#id = id;
102✔
111

112
                validateHandlers(this);
102✔
113

114
                if (state)
100✔
115
                        this.state = state;
94✔
116

117
                if (events)
100✔
118
                        events.forEach(event => this.mutate(event));
6✔
119
        }
120

121
        /** Mutate aggregate state and increment aggregate version */
122
        mutate(event: IEvent) {
123
                if (event.aggregateVersion !== undefined)
102✔
124
                        this.#version = event.aggregateVersion;
84✔
125

126
                if (event.type === SNAPSHOT_EVENT_TYPE) {
102✔
127
                        this.#snapshotVersion = event.aggregateVersion;
16✔
128
                        this.restoreSnapshot(event as ISnapshotEvent<TState>);
16✔
129
                }
130
                else if (this.state) {
86✔
131
                        const handler = 'mutate' in this.state ?
86✔
132
                                this.state.mutate :
133
                                getHandler(this.state, event.type);
134
                        if (handler)
86✔
135
                                handler.call(this.state, event);
44✔
136
                }
137

138
                this.#version += 1;
102✔
139
        }
140

141
        /** Pass command to command handler */
142
        handle(command: ICommand): IEventSet | Promise<IEventSet> {
143
                if (!command)
60!
144
                        throw new TypeError('command argument required');
×
145
                if (!command.type)
60!
146
                        throw new TypeError('command.type argument required');
×
147

148
                const handler = getHandler(this, command.type);
60✔
149
                if (!handler)
60✔
150
                        throw new Error(`'${command.type}' handler is not defined or not a function`);
2✔
151

152
                if (this.command)
58✔
153
                        throw new Error('Another command is being processed');
2✔
154

155
                this.command = command;
56✔
156

157
                const eventsOffset = this.changes.length;
56✔
158

159
                const handlerResult = handler.call(this, command.payload, command.context);
56✔
160

161
                return promiseOrSync(handlerResult,
56✔
162
                        () => this.getUncommittedEvents(eventsOffset),
56✔
163
                        () => {
164
                                this.command = undefined;
56✔
165
                        });
166
        }
167

168
        /**
169
         * Get the events emitted during commands processing.
170
         * If a snapshot should be taken, the snapshot event is added to the end.
171
         */
172
        protected getUncommittedEvents(offset?: number): IEventSet {
173
                if (this.shouldTakeSnapshot)
56✔
174
                        this.takeSnapshot();
10✔
175

176
                return this.changes.slice(offset);
56✔
177
        }
178

179
        /** Format and register aggregate event and mutate aggregate state */
180
        protected emit(type: string): IEvent<void>;
181
        protected emit<TPayload>(type: string, payload: TPayload): IEvent<TPayload>;
182
        protected emit<TPayload>(type: string, payload?: TPayload): IEvent<TPayload> {
183
                if (typeof type !== 'string' || !type.length)
70!
184
                        throw new TypeError('type argument must be a non-empty string');
×
185

186
                const event = this.makeEvent<TPayload>(type, payload as TPayload, this.command);
70✔
187

188
                this.emitRaw(event);
70✔
189

190
                return event;
70✔
191
        }
192

193
        /** Format event based on a current aggregate state and a command being executed */
194
        protected makeEvent<TPayload>(type: string, payload: TPayload, sourceCommand?: ICommand): IEvent<TPayload> {
195
                const event: IEvent<TPayload> = {
70✔
196
                        aggregateId: this.id,
197
                        aggregateVersion: this.version,
198
                        type,
199
                        payload
200
                };
201

202
                if (sourceCommand) {
70✔
203
                        // augment event with command context
204
                        const { context, sagaOrigins } = sourceCommand;
62✔
205
                        if (context !== undefined)
62✔
206
                                event.context = context;
2✔
207
                        if (sagaOrigins !== undefined)
62✔
208
                                event.sagaOrigins = { ...sagaOrigins };
2✔
209
                }
210

211
                return event;
70✔
212
        }
213

214
        /** Register aggregate event and mutate aggregate state */
215
        protected emitRaw<TPayload>(event: IEvent<TPayload>): void {
216
                if (!isEvent(event))
70!
NEW
217
                        throw new TypeError('event argument must be a valid IEvent');
×
218
                if (!event.aggregateId)
70!
219
                        throw new TypeError('event.aggregateId argument required');
×
220
                if (typeof event.aggregateVersion !== 'number')
70!
221
                        throw new TypeError('event.aggregateVersion argument must be a Number');
×
222

223
                this.mutate(event);
70✔
224

225
                this.changes.push(event);
70✔
226
        }
227

228
        /** Create an aggregate state snapshot */
229
        protected makeSnapshot(): any {
230
                if (!this.state)
10!
231
                        throw new Error('state property is empty, either define state or override makeSnapshot method');
×
232

233
                return clone(this.state);
10✔
234
        }
235

236
        /** Add snapshot event to the collection of emitted events */
237
        protected takeSnapshot() {
238
                const snapshotEvent = this.emit(SNAPSHOT_EVENT_TYPE, this.makeSnapshot());
10✔
239
                this.#snapshotVersion = snapshotEvent.aggregateVersion;
10✔
240
        }
241

242
        /** Restore aggregate state from a snapshot */
243
        protected restoreSnapshot(snapshotEvent: ISnapshotEvent<TState>) {
244
                if (!isSnapshotEvent(snapshotEvent))
28✔
245
                        throw new TypeError('snapshotEvent argument must be a valid ISnapshotEvent');
6✔
246
                if (!snapshotEvent.payload)
22✔
247
                        throw new TypeError('snapshotEvent.payload argument required');
2✔
248
                if (!this.state)
20!
249
                        throw new Error('state property is empty, either defined state or override restoreSnapshot method');
×
250

251
                Object.assign(this.state, clone(snapshotEvent.payload));
20✔
252
        }
253

254
        /** Get human-readable aggregate identifier */
255
        toString(): string {
256
                return `${getClassName(this)} ${this.id} (v${this.version})`;
×
257
        }
258
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc