• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 21643388185

03 Feb 2026 06:52PM UTC coverage: 75.749% (-18.6%) from 94.396%
21643388185

Pull #28

github

web-flow
Merge 8c7b95a7f into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

522 of 939 branches covered (55.59%)

696 of 932 new or added lines in 65 files covered. (74.68%)

59 existing lines in 13 files now uncovered.

1087 of 1435 relevant lines covered (75.75%)

25.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.57
/src/in-memory/InMemorySnapshotStorage.ts
1
import {
24✔
2
        type DispatchPipelineBatch,
3
        type IAggregateSnapshotStorage,
4
        type IContainer,
5
        type Identifier,
6
        type IDispatchPipelineProcessor,
7
        type IEvent,
8
        type ILogger,
9
        isSnapshotEvent
10
} from '../interfaces/index.ts';
11
import * as Event from '../Event.ts';
24✔
12

13
/**
14
 * In-memory storage for aggregate snapshots.
15
 * Storage content resets on app restart
16
 */
17
export class InMemorySnapshotStorage implements IAggregateSnapshotStorage, IDispatchPipelineProcessor {

        /** Most recently saved snapshot event per aggregate ID; held in process memory only. */
        #snapshots: Map<Identifier, IEvent> = new Map();

        /** Optional logger; when absent, debug output is skipped. */
        #logger: ILogger | undefined;

        /**
         * @param c Optional container slice; only its `logger` member is read.
         *   A logger exposing `child` is replaced by a child logger tagged with
         *   the name of the class being constructed.
         */
        constructor(c?: Partial<Pick<IContainer, 'logger'>>) {
                const logger = c?.logger;

                if (logger && 'child' in logger)
                        this.#logger = logger.child({ service: new.target.name });
                else
                        this.#logger = logger;
        }

        /**
         * Look up the snapshot currently stored for an aggregate.
         *
         * @param aggregateId ID of the aggregate whose snapshot is requested.
         * @returns The stored snapshot event, or `undefined` when none is stored.
         */
        async getAggregateSnapshot(aggregateId: string): Promise<IEvent | undefined> {
                const stored = this.#snapshots.get(aggregateId);
                return stored;
        }

        /**
         * Store a snapshot event, replacing any snapshot already kept for the same aggregate.
         *
         * @param snapshotEvent Snapshot event; its `aggregateId` is used as the storage key.
         * @throws {TypeError} (as a rejected promise) when `aggregateId` is missing or falsy.
         */
        async saveAggregateSnapshot(snapshotEvent: IEvent) {
                const { aggregateId } = snapshotEvent;
                if (!aggregateId)
                        throw new TypeError('event.aggregateId is required');

                this.#logger?.debug(`Persisting ${Event.describe(snapshotEvent)}`);

                this.#snapshots.set(aggregateId, snapshotEvent);
        }

        /**
         * Drop the snapshot stored for the aggregate the given event belongs to.
         * Nothing happens when no snapshot is stored under that ID.
         *
         * @param snapshotEvent Event whose `aggregateId` selects the entry to remove.
         * @throws {TypeError} synchronously, when `aggregateId` is missing or falsy.
         */
        deleteAggregateSnapshot<TState>(snapshotEvent: IEvent<TState>): Promise<void> | void {
                const { aggregateId } = snapshotEvent;
                if (!aggregateId)
                        throw new TypeError('snapshotEvent.aggregateId argument required');

                this.#logger?.debug(`Removing ${Event.describe(snapshotEvent)}`);

                this.#snapshots.delete(aggregateId);
        }

        /**
         * `IDispatchPipelineProcessor` step: stores every snapshot event found in the
         * batch and hands back the remaining, non-snapshot entries.
         *
         * @param batch Batch of dispatch envelopes, each carrying an `event`.
         * @returns A new batch with all snapshot-event entries removed.
         */
        async process(batch: DispatchPipelineBatch): Promise<DispatchPipelineBatch> {
                // Collect the snapshot events up front, before the first `await`.
                const toSave: IEvent[] = [];
                for (const { event } of batch) {
                        if (isSnapshotEvent(event))
                                toSave.push(event);
                }

                // Saved one at a time, in batch order.
                for (const snapshot of toSave)
                        await this.saveAggregateSnapshot(snapshot);

                return batch.filter(({ event }) => !isSnapshotEvent(event));
        }

        /**
         * `IDispatchPipelineProcessor` step: undoes `process` for the given batch by
         * deleting the stored snapshot of every aggregate that has a snapshot event in it.
         *
         * @param batch The batch of events to revert snapshots for.
         */
        async revert(batch: DispatchPipelineBatch): Promise<void> {
                // Collect the snapshot events up front, before the first `await`.
                const toDelete: IEvent[] = [];
                for (const { event } of batch) {
                        if (isSnapshotEvent(event))
                                toDelete.push(event);
                }

                for (const snapshot of toDelete)
                        await this.deleteAggregateSnapshot(snapshot);
        }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc