• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 14002510880

21 Mar 2025 11:15PM UTC coverage: 82.347% (-12.1%) from 94.436%
14002510880

push

github

snatalenko
1.0.0-rc.6

450 of 731 branches covered (61.56%)

877 of 1065 relevant lines covered (82.35%)

21.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.52
/src/AbstractAggregate.ts
1
import {
2
        IAggregate,
3
        IMutableAggregateState,
4
        ICommand,
5
        Identifier,
6
        IEvent,
7
        IEventSet,
8
        IAggregateConstructorParams
9
} from "./interfaces";
10

11
import { getClassName, validateHandlers, getHandler, getMessageHandlerNames } from './utils';
22✔
12

13
/**
14
 * Deep-clone simple JS object
15
 */
16
function clone<T>(obj: T): T {
17
        return JSON.parse(JSON.stringify(obj));
18✔
18
}
19

20
const SNAPSHOT_EVENT_TYPE = 'snapshot';
22✔
21

22
/**
23
 * Base class for Aggregate definition
24
 */
25
export abstract class AbstractAggregate<TState extends IMutableAggregateState | object | void> implements IAggregate {
22✔
26

27
        /**
28
         * List of command names handled by the Aggregate.
29
         *
30
         * Can be overridden in the Aggregate implementation to explicitly define supported commands.
31
         * If not overridden, all public methods will be treated as command handlers by default.
32
         *
33
         * @example ['createUser', 'changePassword'];
34
         */
35
        static get handles(): string[] {
36
                return getMessageHandlerNames(this);
8✔
37
        }
38

39
        #id: Identifier;
40
        #changes: IEvent[] = [];
82✔
41
        #version: number = 0;
82✔
42
        #snapshotVersion: number | undefined;
43

44
        /** Internal aggregate state */
45
        protected state: TState;
46

47
        /** Command being handled by aggregate */
48
        protected command?: ICommand;
49

50
        /** Unique aggregate instance identifier */
51
        get id(): Identifier {
52
                return this.#id;
38✔
53
        }
54

55
        /** Aggregate instance version */
56
        get version(): number {
57
                return this.#version;
64✔
58
        }
59

60
        /** Restored snapshot version */
61
        get snapshotVersion(): number | undefined {
62
                return this.#snapshotVersion;
4✔
63
        }
64

65
        /** Events emitted by Aggregate */
66
        get changes(): IEventSet {
67
                return [...this.#changes];
48✔
68
        }
69

70
        /**
71
         * Override to define whether an aggregate state snapshot should be taken
72
         *
73
         * @example
74
         *         // create snapshot every 50 events
75
         *         return this.version % 50 === 0;
76
         */
77
        get shouldTakeSnapshot(): boolean {
78
                return false;
12✔
79
        }
80

81
        constructor(options: IAggregateConstructorParams<TState>) {
82
                const { id, state, events } = options;
82✔
83
                if (!id)
82!
84
                        throw new TypeError('id argument required');
×
85
                if (state && typeof state !== 'object')
82!
86
                        throw new TypeError('state argument, when provided, must be an Object');
×
87
                if (events && !Array.isArray(events))
82✔
88
                        throw new TypeError('events argument, when provided, must be an Array');
×
89

90
                this.#id = id;
82✔
91

92
                validateHandlers(this);
82✔
93

94
                if (state)
80✔
95
                        this.state = state;
74✔
96

97
                if (events)
80✔
98
                        events.forEach(event => this.mutate(event));
8✔
99
        }
100

101
        /** Pass command to command handler */
102
        handle(command: ICommand) {
103
                if (!command)
28!
104
                        throw new TypeError('command argument required');
×
105
                if (!command.type)
28!
106
                        throw new TypeError('command.type argument required');
×
107

108
                const handler = getHandler(this, command.type);
28✔
109
                if (!handler)
28✔
110
                        throw new Error(`'${command.type}' handler is not defined or not a function`);
2✔
111

112
                this.command = command;
26✔
113

114
                return handler.call(this, command.payload, command.context);
26✔
115
        }
116

117
        /** Mutate aggregate state and increment aggregate version */
118
        mutate(event) {
119
                if (event.aggregateVersion !== undefined)
58✔
120
                        this.#version = event.aggregateVersion;
40✔
121

122
                if (event.type === SNAPSHOT_EVENT_TYPE) {
58✔
123
                        this.#snapshotVersion = event.aggregateVersion;
10✔
124
                        this.restoreSnapshot(event);
10✔
125
                }
126
                else if (this.state) {
48✔
127
                        const handler = 'mutate' in this.state ?
48✔
128
                                this.state.mutate :
129
                                getHandler(this.state, event.type);
130
                        if (handler)
48✔
131
                                handler.call(this.state, event);
30✔
132
                }
133

134
                this.#version += 1;
58✔
135
        }
136

137
        /** Format and register aggregate event and mutate aggregate state */
138
        protected emit<TPayload>(type: string, payload?: TPayload) {
139
                if (typeof type !== 'string' || !type.length)
36!
140
                        throw new TypeError('type argument must be a non-empty string');
×
141

142
                const event = this.makeEvent<TPayload>(type, payload, this.command);
36✔
143

144
                this.emitRaw(event);
36✔
145
        }
146

147
        /** Format event based on a current aggregate state and a command being executed */
148
        protected makeEvent<TPayload>(type: string, payload?: TPayload, sourceCommand?: ICommand): IEvent<TPayload> {
149
                const event: IEvent<TPayload> = {
36✔
150
                        aggregateId: this.id,
151
                        aggregateVersion: this.version,
152
                        type,
153
                        payload
154
                };
155

156
                if (sourceCommand) {
36✔
157
                        // augment event with command context
158
                        const { context, sagaId, sagaVersion } = sourceCommand;
26✔
159
                        if (context !== undefined)
26✔
160
                                event.context = context;
2✔
161
                        if (sagaId !== undefined)
26✔
162
                                event.sagaId = sagaId;
2✔
163
                        if (sagaVersion !== undefined)
26✔
164
                                event.sagaVersion = sagaVersion;
2✔
165
                }
166

167
                return event;
36✔
168
        }
169

170
        /** Register aggregate event and mutate aggregate state */
171
        protected emitRaw<TPayload>(event: IEvent<TPayload>): void {
172
                if (!event)
36!
173
                        throw new TypeError('event argument required');
×
174
                if (!event.aggregateId)
36!
175
                        throw new TypeError('event.aggregateId argument required');
×
176
                if (typeof event.aggregateVersion !== 'number')
36!
177
                        throw new TypeError('event.aggregateVersion argument must be a Number');
×
178
                if (typeof event.type !== 'string' || !event.type.length)
36!
179
                        throw new TypeError('event.type argument must be a non-empty String');
×
180

181
                this.mutate(event);
36✔
182

183
                this.#changes.push(event);
36✔
184
        }
185

186
        /**
187
         * Take an aggregate state snapshot and add it to the changes queue
188
         */
189
        takeSnapshot() {
190
                this.emit(SNAPSHOT_EVENT_TYPE, this.makeSnapshot());
4✔
191
        }
192

193
        /** Create an aggregate state snapshot */
194
        makeSnapshot(): TState {
195
                if (!this.state)
4!
196
                        throw new Error('state property is empty, either define state or override makeSnapshot method');
×
197

198
                return clone(this.state);
4✔
199
        }
200

201
        /** Restore aggregate state from a snapshot */
202
        protected restoreSnapshot(snapshotEvent: IEvent<TState>) {
203
                if (!snapshotEvent)
22✔
204
                        throw new TypeError('snapshotEvent argument required');
2✔
205
                if (!snapshotEvent.type)
20✔
206
                        throw new TypeError('snapshotEvent.type argument required');
2✔
207
                if (!snapshotEvent.payload)
18✔
208
                        throw new TypeError('snapshotEvent.payload argument required');
2✔
209

210
                if (snapshotEvent.type !== SNAPSHOT_EVENT_TYPE)
16✔
211
                        throw new Error(`${SNAPSHOT_EVENT_TYPE} event type expected`);
2✔
212
                if (!this.state)
14!
213
                        throw new Error('state property is empty, either defined state or override restoreSnapshot method');
×
214

215
                Object.assign(this.state, clone(snapshotEvent.payload));
14✔
216
        }
217

218
        /** Get human-readable aggregate identifier */
219
        toString(): string {
220
                return `${getClassName(this)} ${this.id} (v${this.version})`;
×
221
        }
222
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc