• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 14938331406

09 May 2025 09:51PM UTC coverage: 83.454% (+0.3%) from 83.126%
14938331406

push

github

snatalenko
1.0.0-rc.11

509 of 809 branches covered (62.92%)

1039 of 1245 relevant lines covered (83.45%)

25.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.71
/src/AbstractAggregate.ts
1
import {
2
        IAggregate,
3
        IMutableAggregateState,
4
        ICommand,
5
        Identifier,
6
        IEvent,
7
        IEventSet,
8
        IAggregateConstructorParams
9
} from './interfaces';
10

11
import { getClassName, validateHandlers, getHandler, getMessageHandlerNames } from './utils';
24✔
12

13
const SNAPSHOT_EVENT_TYPE = 'snapshot';
24✔
14

15
/**
16
 * Base class for Aggregate definition
17
 */
18
export abstract class AbstractAggregate<TState extends IMutableAggregateState | object | void> implements IAggregate {
24✔
19

20
        /**
21
         * List of command names handled by the Aggregate.
22
         *
23
         * Can be overridden in the Aggregate implementation to explicitly define supported commands.
24
         * If not overridden, all public methods will be treated as command handlers by default.
25
         *
26
         * @example ['createUser', 'changePassword'];
27
         */
28
        static get handles(): string[] {
29
                return getMessageHandlerNames(this);
8✔
30
        }
31

32
        #id: Identifier;
33
        #changes: IEvent[] = [];
92✔
34
        #version: number = 0;
92✔
35
        #snapshotVersion: number | undefined;
36

37
        /** Internal aggregate state */
38
        protected state: TState | undefined;
39

40
        /** Command being handled by aggregate */
41
        protected command?: ICommand;
42

43
        /** Unique aggregate instance identifier */
44
        get id(): Identifier {
45
                return this.#id;
52✔
46
        }
47

48
        /** Aggregate instance version */
49
        get version(): number {
50
                return this.#version;
78✔
51
        }
52

53
        /** Restored snapshot version */
54
        get snapshotVersion(): number | undefined {
55
                return this.#snapshotVersion;
4✔
56
        }
57

58
        /**
59
         * Override to define whether an aggregate state snapshot should be taken
60
         *
61
         * @example
62
         *         // create snapshot every 50 events
63
         *         return this.version % 50 === 0;
64
         */
65
        // eslint-disable-next-line class-methods-use-this
66
        protected get shouldTakeSnapshot(): boolean {
67
                return false;
40✔
68
        }
69

70
        constructor(options: IAggregateConstructorParams<TState>) {
71
                const { id, state, events } = options;
92✔
72
                if (!id)
92!
73
                        throw new TypeError('id argument required');
×
74
                if (state && typeof state !== 'object')
92!
75
                        throw new TypeError('state argument, when provided, must be an Object');
×
76
                if (events && !Array.isArray(events))
92✔
77
                        throw new TypeError('events argument, when provided, must be an Array');
×
78

79
                this.#id = id;
92✔
80

81
                validateHandlers(this);
92✔
82

83
                if (state)
90✔
84
                        this.state = state;
84✔
85

86
                if (events)
90✔
87
                        events.forEach(event => this.mutate(event));
6✔
88
        }
89

90
        /** Mutate aggregate state and increment aggregate version */
91
        mutate(event: IEvent) {
92
                if (event.aggregateVersion !== undefined)
82✔
93
                        this.#version = event.aggregateVersion;
64✔
94

95
                if (event.type === SNAPSHOT_EVENT_TYPE) {
82✔
96
                        this.#snapshotVersion = event.aggregateVersion;
10✔
97
                        this.restoreSnapshot(event);
10✔
98
                }
99
                else if (this.state) {
72✔
100
                        const handler = 'mutate' in this.state ?
72✔
101
                                this.state.mutate :
102
                                getHandler(this.state, event.type);
103
                        if (handler)
72✔
104
                                handler.call(this.state, event);
30✔
105
                }
106

107
                this.#version += 1;
82✔
108
        }
109

110
        /** Pass command to command handler */
111
        async handle(command: ICommand) {
112
                if (!command)
44!
113
                        throw new TypeError('command argument required');
×
114
                if (!command.type)
44!
115
                        throw new TypeError('command.type argument required');
×
116

117
                const handler = getHandler(this, command.type);
44✔
118
                if (!handler)
44✔
119
                        throw new Error(`'${command.type}' handler is not defined or not a function`);
2✔
120

121
                this.command = command;
42✔
122

123
                await handler.call(this, command.payload, command.context);
42✔
124

125
                return this.popChanges();
42✔
126
        }
127

128
        /** Get events emitted during command(s) handling and reset the `changes` collection */
129
        protected popChanges(): IEventSet {
130
                if (this.shouldTakeSnapshot)
46✔
131
                        this.emit(SNAPSHOT_EVENT_TYPE, this.makeSnapshot());
4✔
132

133
                return this.#changes.splice(0);
46✔
134
        }
135

136
        /** Format and register aggregate event and mutate aggregate state */
137
        protected emit<TPayload>(type: string, payload?: TPayload) {
138
                if (typeof type !== 'string' || !type.length)
50!
139
                        throw new TypeError('type argument must be a non-empty string');
×
140

141
                const event = this.makeEvent<TPayload>(type, payload, this.command);
50✔
142

143
                this.emitRaw(event);
50✔
144
        }
145

146
        /** Format event based on a current aggregate state and a command being executed */
147
        protected makeEvent<TPayload>(type: string, payload?: TPayload, sourceCommand?: ICommand): IEvent<TPayload> {
148
                const event: IEvent<TPayload> = {
50✔
149
                        aggregateId: this.id,
150
                        aggregateVersion: this.version,
151
                        type,
152
                        payload
153
                };
154

155
                if (sourceCommand) {
50✔
156
                        // augment event with command context
157
                        const { context, sagaId, sagaVersion } = sourceCommand;
42✔
158
                        if (context !== undefined)
42✔
159
                                event.context = context;
2✔
160
                        if (sagaId !== undefined)
42✔
161
                                event.sagaId = sagaId;
2✔
162
                        if (sagaVersion !== undefined)
42✔
163
                                event.sagaVersion = sagaVersion;
2✔
164
                }
165

166
                return event;
50✔
167
        }
168

169
        /** Register aggregate event and mutate aggregate state */
170
        protected emitRaw<TPayload>(event: IEvent<TPayload>): void {
171
                if (!event)
50!
172
                        throw new TypeError('event argument required');
×
173
                if (!event.aggregateId)
50!
174
                        throw new TypeError('event.aggregateId argument required');
×
175
                if (typeof event.aggregateVersion !== 'number')
50!
176
                        throw new TypeError('event.aggregateVersion argument must be a Number');
×
177
                if (typeof event.type !== 'string' || !event.type.length)
50!
178
                        throw new TypeError('event.type argument must be a non-empty String');
×
179

180
                this.mutate(event);
50✔
181

182
                this.#changes.push(event);
50✔
183
        }
184

185
        /** Create an aggregate state snapshot */
186
        protected makeSnapshot(): any {
187
                if (!this.state)
4!
188
                        throw new Error('state property is empty, either define state or override makeSnapshot method');
×
189

190
                return structuredClone(this.state);
4✔
191
        }
192

193
        /** Restore aggregate state from a snapshot */
194
        protected restoreSnapshot(snapshotEvent: IEvent<TState>) {
195
                if (!snapshotEvent)
22✔
196
                        throw new TypeError('snapshotEvent argument required');
2✔
197
                if (!snapshotEvent.type)
20✔
198
                        throw new TypeError('snapshotEvent.type argument required');
2✔
199
                if (!snapshotEvent.payload)
18✔
200
                        throw new TypeError('snapshotEvent.payload argument required');
2✔
201

202
                if (snapshotEvent.type !== SNAPSHOT_EVENT_TYPE)
16✔
203
                        throw new Error(`${SNAPSHOT_EVENT_TYPE} event type expected`);
2✔
204
                if (!this.state)
14!
205
                        throw new Error('state property is empty, either defined state or override restoreSnapshot method');
×
206

207
                Object.assign(this.state, structuredClone(snapshotEvent.payload));
14✔
208
        }
209

210
        /** Get human-readable aggregate identifier */
211
        toString(): string {
212
                return `${getClassName(this)} ${this.id} (v${this.version})`;
×
213
        }
214
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc