• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 21717407497

05 Feb 2026 03:26PM UTC coverage: 84.53% (-9.9%) from 94.396%
21717407497

Pull #28

github

web-flow
Merge 025edb883 into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

611 of 939 branches covered (65.07%)

819 of 934 new or added lines in 65 files covered. (87.69%)

59 existing lines in 13 files now uncovered.

1213 of 1435 relevant lines covered (84.53%)

28.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.0
/src/AbstractAggregate.ts
1
import {
24✔
2
        type IAggregate,
3
        type IMutableAggregateState,
4
        type ICommand,
5
        type Identifier,
6
        type IEvent,
7
        type IEventSet,
8
        type IAggregateConstructorParams,
9
        SNAPSHOT_EVENT_TYPE
10
} from './interfaces/index.ts';
11

12
import { getClassName, validateHandlers, getHandler, getMessageHandlerNames, clone } from './utils/index.ts';
24✔
13

14
/**
15
 * Base class for Aggregate definition
16
 */
17
export abstract class AbstractAggregate<TState extends IMutableAggregateState | object | void = void> implements
24✔
18
        IAggregate {
19

20
        /**
21
         * List of command names handled by the Aggregate.
22
         *
23
         * Can be overridden in the Aggregate implementation to explicitly define supported commands.
24
         * If not overridden, all public methods will be treated as command handlers by default.
25
         *
26
         * @example ['createUser', 'changePassword'];
27
         */
28
        static get handles(): string[] {
29
                return getMessageHandlerNames(this);
8✔
30
        }
31

32
        #id: Identifier;
33
        #version: number = 0;
102✔
34
        #snapshotVersion: number | undefined;
35

36
        /** List of emitted events */
37
        protected changes: IEvent[] = [];
102✔
38

39
        /** Internal aggregate state */
40
        protected state: TState | undefined;
41

42
        /** Command being handled by aggregate */
43
        protected command?: ICommand;
44

45
        /** Unique aggregate instance identifier */
46
        get id(): Identifier {
47
                return this.#id;
72✔
48
        }
49

50
        /** Aggregate instance version */
51
        get version(): number {
52
                return this.#version;
118✔
53
        }
54

55
        /** Restored snapshot version */
56
        get snapshotVersion(): number | undefined {
57
                return this.#snapshotVersion;
24✔
58
        }
59

60
        /**
61
         * Override to define whether an aggregate state snapshot should be taken
62
         *
63
         * @example
64
         *   // create snapshot every 50 events if new events were emitted
65
         *   return !!this.changes.length
66
         *     && this.version - (this.snapshotVersion ?? 0) > 50;
67
         */
68
        // eslint-disable-next-line class-methods-use-this
69
        protected get shouldTakeSnapshot(): boolean {
70
                return false;
38✔
71
        }
72

73
        constructor(options: IAggregateConstructorParams<TState>) {
74
                const { id, state, events } = options;
102✔
75
                if (!id)
102!
UNCOV
76
                        throw new TypeError('id argument required');
×
77
                if (state && typeof state !== 'object')
102!
UNCOV
78
                        throw new TypeError('state argument, when provided, must be an Object');
×
79
                if (events && !Array.isArray(events))
102✔
UNCOV
80
                        throw new TypeError('events argument, when provided, must be an Array');
×
81

82
                this.#id = id;
102✔
83

84
                validateHandlers(this);
102✔
85

86
                if (state)
100✔
87
                        this.state = state;
94✔
88

89
                if (events)
100✔
90
                        events.forEach(event => this.mutate(event));
6✔
91
        }
92

93
        /** Mutate aggregate state and increment aggregate version */
94
        mutate(event: IEvent) {
95
                if (event.aggregateVersion !== undefined)
102✔
96
                        this.#version = event.aggregateVersion;
84✔
97

98
                if (event.type === SNAPSHOT_EVENT_TYPE) {
102✔
99
                        this.#snapshotVersion = event.aggregateVersion;
16✔
100
                        this.restoreSnapshot(event);
16✔
101
                }
102
                else if (this.state) {
86✔
103
                        const handler = 'mutate' in this.state ?
86✔
104
                                this.state.mutate :
105
                                getHandler(this.state, event.type);
106
                        if (handler)
86✔
107
                                handler.call(this.state, event);
44✔
108
                }
109

110
                this.#version += 1;
102✔
111
        }
112

113
        /** Pass command to command handler */
114
        handle(command: ICommand) {
115
                if (!command)
60!
NEW
116
                        throw new TypeError('command argument required');
×
117
                if (!command.type)
60!
NEW
118
                        throw new TypeError('command.type argument required');
×
119

120
                const handler = getHandler(this, command.type);
60✔
121
                if (!handler)
60✔
122
                        throw new Error(`'${command.type}' handler is not defined or not a function`);
2✔
123

124
                if (this.command)
58✔
125
                        throw new Error('Another command is being processed');
2✔
126

127
                try {
56✔
128
                        this.command = command;
56✔
129

130
                        const eventsOffset = this.changes.length;
56✔
131

132
                        const handlerResult = handler.call(this, command.payload, command.context);
56✔
133

134
                        if (handlerResult instanceof Promise) {
56✔
135
                                return handlerResult
52✔
136
                                        .then(() => this.getUncommittedEvents(eventsOffset))
52✔
137
                                        .finally(() => {
138
                                                this.command = undefined;
52✔
139
                                        });
140
                        }
141
                        else { // handle synchronous result
142
                                const events = this.getUncommittedEvents(eventsOffset);
4✔
143
                                this.command = undefined;
4✔
144
                                return events;
4✔
145
                        }
146
                }
147
                catch (err) {
NEW
148
                        this.command = undefined;
×
NEW
149
                        throw err;
×
150
                }
151
        }
152

153
        /**
154
         * Get the events emitted during commands processing.
155
         * If a snapshot should be taken, the snapshot event is added to the end.
156
         */
157
        protected getUncommittedEvents(offset?: number): IEventSet {
158
                if (this.shouldTakeSnapshot)
56✔
159
                        this.takeSnapshot();
10✔
160

161
                return this.changes.slice(offset);
56✔
162
        }
163

164
        /** Format and register aggregate event and mutate aggregate state */
165
        protected emit(type: string): IEvent<void>;
166
        protected emit<TPayload>(type: string, payload: TPayload): IEvent<TPayload>;
167
        protected emit<TPayload>(type: string, payload?: TPayload): IEvent<TPayload> {
168
                if (typeof type !== 'string' || !type.length)
70!
UNCOV
169
                        throw new TypeError('type argument must be a non-empty string');
×
170

171
                const event = this.makeEvent<TPayload>(type, payload as TPayload, this.command);
70✔
172

173
                this.emitRaw(event);
70✔
174

175
                return event;
70✔
176
        }
177

178
        /** Format event based on a current aggregate state and a command being executed */
179
        protected makeEvent<TPayload>(type: string, payload: TPayload, sourceCommand?: ICommand): IEvent<TPayload> {
180
                const event: IEvent<TPayload> = {
70✔
181
                        aggregateId: this.id,
182
                        aggregateVersion: this.version,
183
                        type,
184
                        payload
185
                };
186

187
                if (sourceCommand) {
70✔
188
                        // augment event with command context
189
                        const { context, sagaId, sagaVersion } = sourceCommand;
62✔
190
                        if (context !== undefined)
62✔
191
                                event.context = context;
2✔
192
                        if (sagaId !== undefined)
62✔
193
                                event.sagaId = sagaId;
2✔
194
                        if (sagaVersion !== undefined)
62✔
195
                                event.sagaVersion = sagaVersion;
2✔
196
                }
197

198
                return event;
70✔
199
        }
200

201
        /** Register aggregate event and mutate aggregate state */
202
        protected emitRaw<TPayload>(event: IEvent<TPayload>): void {
203
                if (!event)
70!
UNCOV
204
                        throw new TypeError('event argument required');
×
205
                if (!event.aggregateId)
70!
UNCOV
206
                        throw new TypeError('event.aggregateId argument required');
×
207
                if (typeof event.aggregateVersion !== 'number')
70!
UNCOV
208
                        throw new TypeError('event.aggregateVersion argument must be a Number');
×
209
                if (typeof event.type !== 'string' || !event.type.length)
70!
UNCOV
210
                        throw new TypeError('event.type argument must be a non-empty String');
×
211

212
                this.mutate(event);
70✔
213

214
                this.changes.push(event);
70✔
215
        }
216

217
        /** Create an aggregate state snapshot */
218
        protected makeSnapshot(): any {
219
                if (!this.state)
10!
UNCOV
220
                        throw new Error('state property is empty, either define state or override makeSnapshot method');
×
221

222
                return clone(this.state);
10✔
223
        }
224

225
        /** Add snapshot event to the collection of emitted events */
226
        protected takeSnapshot() {
227
                const snapshotEvent = this.emit(SNAPSHOT_EVENT_TYPE, this.makeSnapshot());
10✔
228
                this.#snapshotVersion = snapshotEvent.aggregateVersion;
10✔
229
        }
230

231
        /** Restore aggregate state from a snapshot */
232
        protected restoreSnapshot(snapshotEvent: IEvent<TState>) {
233
                if (!snapshotEvent)
28✔
234
                        throw new TypeError('snapshotEvent argument required');
2✔
235
                if (!snapshotEvent.type)
26✔
236
                        throw new TypeError('snapshotEvent.type argument required');
2✔
237
                if (!snapshotEvent.payload)
24✔
238
                        throw new TypeError('snapshotEvent.payload argument required');
2✔
239

240
                if (snapshotEvent.type !== SNAPSHOT_EVENT_TYPE)
22✔
241
                        throw new Error(`${SNAPSHOT_EVENT_TYPE} event type expected`);
2✔
242
                if (!this.state)
20!
UNCOV
243
                        throw new Error('state property is empty, either defined state or override restoreSnapshot method');
×
244

245
                Object.assign(this.state, clone(snapshotEvent.payload));
20✔
246
        }
247

248
        /** Get human-readable aggregate identifier */
249
        toString(): string {
250
                return `${getClassName(this)} ${this.id} (v${this.version})`;
×
251
        }
252
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc