• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 21645374105

03 Feb 2026 07:52PM UTC coverage: 84.53% (-9.9%) from 94.396%
21645374105

Pull #28

github

web-flow
Merge 2650d0e58 into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

611 of 939 branches covered (65.07%)

817 of 932 new or added lines in 65 files covered. (87.66%)

59 existing lines in 13 files now uncovered.

1213 of 1435 relevant lines covered (84.53%)

28.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.0
/src/AbstractAggregate.ts
1
import {
24✔
2
        type IAggregate,
3
        type IMutableAggregateState,
4
        type ICommand,
5
        type Identifier,
6
        type IEvent,
7
        type IEventSet,
8
        type IAggregateConstructorParams,
9
        SNAPSHOT_EVENT_TYPE
10
} from './interfaces/index.ts';
11

12
import { getClassName, validateHandlers, getHandler, getMessageHandlerNames, clone } from './utils/index.ts';
24✔
13

14
/**
 * Base class for Aggregate definition.
 *
 * An aggregate instance is restored from an optional initial state and a list of
 * past events, handles commands by dispatching them to same-named methods,
 * and collects the events emitted while a command is being handled.
 *
 * @typeParam TState - Type of the internal aggregate state object
 */
export abstract class AbstractAggregate<TState extends IMutableAggregateState | object | void = void> implements
	IAggregate {

	/**
	 * List of command names handled by the Aggregate.
	 *
	 * Can be overridden in the Aggregate implementation to explicitly define supported commands.
	 * If not overridden, all public methods will be treated as command handlers by default.
	 *
	 * @example ['createUser', 'changePassword'];
	 */
	static get handles(): string[] {
		return getMessageHandlerNames(this);
	}

	#id: Identifier;
	#version: number = 0;
	#snapshotVersion: number | undefined;

	/** List of emitted events */
	protected changes: IEvent[] = [];

	/** Internal aggregate state */
	protected state: TState | undefined;

	/** Command being handled by aggregate; `undefined` when no command is in progress */
	protected command?: ICommand;

	/** Unique aggregate instance identifier */
	get id(): Identifier {
		return this.#id;
	}

	/** Aggregate instance version */
	get version(): number {
		return this.#version;
	}

	/** Restored snapshot version */
	get snapshotVersion(): number | undefined {
		return this.#snapshotVersion;
	}

	/**
	 * Override to define whether an aggregate state snapshot should be taken
	 *
	 * @example
	 *   // create snapshot every 50 events if new events were emitted
	 *   return !!this.changes.length
	 *     && this.version - (this.snapshotVersion ?? 0) > 50;
	 */
	// eslint-disable-next-line class-methods-use-this
	protected get shouldTakeSnapshot(): boolean {
		return false;
	}

	/**
	 * Create an aggregate instance and replay past events, when provided.
	 *
	 * @param options - Aggregate `id` (required), optional initial `state` object
	 *   and optional array of past `events` to apply via {@link mutate}
	 * @throws TypeError if `id` is missing, `state` is not an object, or `events` is not an array
	 */
	constructor(options: IAggregateConstructorParams<TState>) {
		const { id, state, events } = options;
		if (!id)
			throw new TypeError('id argument required');
		if (state && typeof state !== 'object')
			throw new TypeError('state argument, when provided, must be an Object');
		if (events && !Array.isArray(events))
			throw new TypeError('events argument, when provided, must be an Array');

		this.#id = id;

		// NOTE(review): presumably verifies that every declared command has a handler method - confirm in utils
		validateHandlers(this);

		if (state)
			this.state = state;

		// state must be assigned before replaying, since mutate() applies events to this.state
		if (events)
			events.forEach(event => this.mutate(event));
	}

	/**
	 * Mutate aggregate state and increment aggregate version.
	 *
	 * Snapshot events are routed to {@link restoreSnapshot}; any other event is passed
	 * to the state's own `mutate` method when the state has one, otherwise to the state
	 * handler resolved for the event type. Events without a matching handler are skipped.
	 *
	 * @param event - Event to apply
	 */
	mutate(event: IEvent) {
		// sync with the version carried by the event; it is incremented below,
		// so the aggregate ends up one version past the applied event
		if (event.aggregateVersion !== undefined)
			this.#version = event.aggregateVersion;

		if (event.type === SNAPSHOT_EVENT_TYPE) {
			this.#snapshotVersion = event.aggregateVersion;
			this.restoreSnapshot(event);
		}
		else if (this.state) {
			const handler = 'mutate' in this.state ?
				this.state.mutate :
				getHandler(this.state, event.type);
			if (handler)
				handler.call(this.state, event);
		}

		this.#version += 1;
	}

	/**
	 * Pass command to command handler.
	 *
	 * The handler is invoked with `command.payload` and `command.context`.
	 * When the handler returns a Promise, a Promise of the emitted events is returned;
	 * otherwise the emitted events are returned synchronously.
	 *
	 * @param command - Command to handle
	 * @returns Events emitted while handling the command (or a Promise of them)
	 * @throws TypeError if `command` or `command.type` is missing
	 * @throws Error if no handler exists for the command type or another command is in progress
	 */
	handle(command: ICommand) {
		if (!command)
			throw new TypeError('command argument required');
		if (!command.type)
			throw new TypeError('command.type argument required');

		const handler = getHandler(this, command.type);
		if (!handler)
			throw new Error(`'${command.type}' handler is not defined or not a function`);

		// this.command doubles as the "busy" flag, it is reset on every exit path below
		if (this.command)
			throw new Error('Another command is being processed');

		try {
			this.command = command;

			// only events emitted from this point on belong to the current command
			const eventsOffset = this.changes.length;

			const handlerResult = handler.call(this, command.payload, command.context);

			if (handlerResult instanceof Promise) {
				return handlerResult
					.then(() => this.getUncommittedEvents(eventsOffset))
					.finally(() => {
						this.command = undefined;
					});
			}
			else { // handle synchronous result
				const events = this.getUncommittedEvents(eventsOffset);
				this.command = undefined;
				return events;
			}
		}
		catch (err) {
			// synchronous failure: release the aggregate so next commands can be handled
			this.command = undefined;
			throw err;
		}
	}

	/**
	 * Get the events emitted during commands processing.
	 * If a snapshot should be taken, the snapshot event is added to the end.
	 *
	 * @param offset - Index in `changes` to start from; all changes are returned when omitted
	 */
	protected getUncommittedEvents(offset?: number): IEventSet {
		if (this.shouldTakeSnapshot)
			this.takeSnapshot();

		return this.changes.slice(offset);
	}

	/**
	 * Format and register aggregate event and mutate aggregate state.
	 *
	 * @param type - Event type, a non-empty string
	 * @param payload - Optional event payload
	 * @returns The registered event
	 * @throws TypeError if `type` is not a non-empty string
	 */
	protected emit<TPayload>(type: string, payload?: TPayload): IEvent<TPayload> {
		if (typeof type !== 'string' || !type.length)
			throw new TypeError('type argument must be a non-empty string');

		const event = this.makeEvent<TPayload>(type, payload, this.command);

		this.emitRaw(event);

		return event;
	}

	/**
	 * Format event based on a current aggregate state and a command being executed.
	 *
	 * @param type - Event type
	 * @param payload - Optional event payload
	 * @param sourceCommand - Command whose `context`, `sagaId` and `sagaVersion`
	 *   are copied to the event when defined
	 */
	protected makeEvent<TPayload>(type: string, payload?: TPayload, sourceCommand?: ICommand): IEvent<TPayload> {
		const event: IEvent<TPayload> = {
			aggregateId: this.id,
			aggregateVersion: this.version,
			type,
			payload
		};

		if (sourceCommand) {
			// augment event with command context
			const { context, sagaId, sagaVersion } = sourceCommand;
			if (context !== undefined)
				event.context = context;
			if (sagaId !== undefined)
				event.sagaId = sagaId;
			if (sagaVersion !== undefined)
				event.sagaVersion = sagaVersion;
		}

		return event;
	}

	/**
	 * Register aggregate event and mutate aggregate state.
	 *
	 * @param event - Fully formatted event
	 * @throws TypeError if the event or any of its `aggregateId`, `aggregateVersion`, `type` fields is invalid
	 */
	protected emitRaw<TPayload>(event: IEvent<TPayload>): void {
		if (!event)
			throw new TypeError('event argument required');
		if (!event.aggregateId)
			throw new TypeError('event.aggregateId argument required');
		if (typeof event.aggregateVersion !== 'number')
			throw new TypeError('event.aggregateVersion argument must be a Number');
		if (typeof event.type !== 'string' || !event.type.length)
			throw new TypeError('event.type argument must be a non-empty String');

		// apply first, so an event that fails to apply is not recorded as a change
		this.mutate(event);

		this.changes.push(event);
	}

	/**
	 * Create an aggregate state snapshot.
	 *
	 * @returns A clone of the current state
	 * @throws Error if the aggregate has no state
	 */
	protected makeSnapshot(): any {
		if (!this.state)
			throw new Error('state property is empty, either define state or override makeSnapshot method');

		return clone(this.state);
	}

	/** Add snapshot event to the collection of emitted events */
	protected takeSnapshot() {
		const snapshotEvent = this.emit(SNAPSHOT_EVENT_TYPE, this.makeSnapshot());
		this.#snapshotVersion = snapshotEvent.aggregateVersion;
	}

	/**
	 * Restore aggregate state from a snapshot.
	 *
	 * A clone of the snapshot payload is copied onto the existing state object.
	 *
	 * @param snapshotEvent - Snapshot event holding the state in its payload
	 * @throws TypeError if the event, its type or its payload is missing
	 * @throws Error if the event is not a snapshot event or the aggregate has no state
	 */
	protected restoreSnapshot(snapshotEvent: IEvent<TState>) {
		if (!snapshotEvent)
			throw new TypeError('snapshotEvent argument required');
		if (!snapshotEvent.type)
			throw new TypeError('snapshotEvent.type argument required');
		if (!snapshotEvent.payload)
			throw new TypeError('snapshotEvent.payload argument required');

		if (snapshotEvent.type !== SNAPSHOT_EVENT_TYPE)
			throw new Error(`${SNAPSHOT_EVENT_TYPE} event type expected`);
		if (!this.state)
			throw new Error('state property is empty, either define state or override restoreSnapshot method');

		Object.assign(this.state, clone(snapshotEvent.payload));
	}

	/** Get human-readable aggregate identifier */
	toString(): string {
		return `${getClassName(this)} ${this.id} (v${this.version})`;
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc