• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 21875627364

10 Feb 2026 05:34PM UTC coverage: 84.658% (-9.7%) from 94.396%
21875627364

Pull #28

github

web-flow
Merge f3844c4de into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

626 of 953 branches covered (65.69%)

832 of 947 new or added lines in 65 files covered. (87.86%)

59 existing lines in 13 files now uncovered.

1225 of 1447 relevant lines covered (84.66%)

28.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.83
/src/in-memory/InMemoryEventStorage.ts
1
import type {
2
        IIdentifierProvider,
3
        IEvent,
4
        IEventSet,
5
        EventQueryAfter,
6
        IEventStorageReader,
7
        IEventStream,
8
        IEventStorageWriter,
9
        Identifier,
10
        IDispatchPipelineProcessor,
11
        DispatchPipelineBatch,
12
        AggregateEventsQueryParams
13
} from '../interfaces/index.ts';
14
import { nextCycle } from './utils/index.ts';
24✔
15

16
/**
 * A simple in-memory event storage implementation intended for use in tests only.
 * Events are kept in a private array, so storage content resets on each app restart.
 *
 * Besides reading and writing events, the class issues sequential string
 * identifiers (`getNewId`) and can act as a dispatch pipeline step (`process`).
 */
export class InMemoryEventStorage implements
        IEventStorageReader,
        IEventStorageWriter,
        IIdentifierProvider,
        IDispatchPipelineProcessor {

        /** Last identifier issued by `getNewId`; 0 means none has been issued yet */
        #nextId: number = 0;

        /** All committed events, in commit order */
        #events: IEventSet = [];

        /**
         * Generate the next sequential identifier.
         *
         * @returns The incremented counter converted to a string ('1', '2', ...)
         */
        getNewId(): string {
                this.#nextId += 1;
                return String(this.#nextId);
        }

        /**
         * Append events to the in-memory log.
         *
         * @param events Events to store; they are appended after the already stored ones
         * @returns The same `events` value that was passed in
         */
        async commitEvents(events: IEventSet): Promise<IEventSet> {
                // NOTE(review): `nextCycle` is defined outside this file; presumably it defers
                // to a later event loop cycle to imitate asynchronous I/O - confirm
                await nextCycle();

                // `concat` creates a new array, so iterations started over the previous
                // array instance are not affected by this commit
                this.#events = this.#events.concat(events);

                await nextCycle();

                return events;
        }

        /**
         * Stream events of a given aggregate.
         *
         * @param aggregateId Identifier the events' `aggregateId` must be strictly equal to
         * @param options Optional query parameters:
         *  - `snapshot` - when its `aggregateVersion` is defined, only events with a greater
         *    `aggregateVersion` are returned;
         *  - `eventTypes` - when defined, only events of the listed types are returned;
         *  - `tail: 'last'` - additionally yields the last aggregate event (after the snapshot)
         *    if the `eventTypes` filter left it out.
         */
        async* getAggregateEvents(aggregateId: Identifier, options?: AggregateEventsQueryParams): IEventStream {
                await nextCycle();

                const afterVersion = options?.snapshot?.aggregateVersion;
                const eventTypes = options?.eventTypes;

                // The snapshot version is checked for null/undefined explicitly:
                // version 0 is a valid version and must not be treated as "no snapshot"
                const allAfterSnapshot = this.#events.filter(e =>
                        e.aggregateId === aggregateId && (
                                afterVersion == null ||
                                (e.aggregateVersion !== undefined && e.aggregateVersion > afterVersion)
                        ));

                const results = eventTypes === undefined ?
                        allAfterSnapshot :
                        allAfterSnapshot.filter(e => eventTypes.includes(e.type));

                await nextCycle();

                yield* results;

                if (options?.tail === 'last' && allAfterSnapshot.length > 0) {
                        const tailEvent = allAfterSnapshot[allAfterSnapshot.length - 1];

                        // The tail may already be the last yielded event when it passed the type filter
                        const alreadyYieldedTail = results.length > 0 && results[results.length - 1] === tailEvent;
                        if (!alreadyYieldedTail)
                                yield tailEvent;
                }
        }

        /**
         * Stream events of a given saga that precede the given event.
         *
         * @param sagaId Identifier the events' `sagaId` must be strictly equal to
         * @param filter Object with `beforeEvent`; only events with `sagaVersion`
         *  lower than `beforeEvent.sagaVersion` are returned. Nothing is returned
         *  when `beforeEvent.sagaVersion` is undefined.
         */
        async* getSagaEvents(sagaId: Identifier, { beforeEvent }: { beforeEvent: IEvent }): IEventStream {
                await nextCycle();

                const beforeVersion = beforeEvent.sagaVersion;
                const results = beforeVersion === undefined ?
                        [] :
                        this.#events.filter(e =>
                                e.sagaId === sagaId &&
                                e.sagaVersion !== undefined &&
                                e.sagaVersion < beforeVersion);

                await nextCycle();

                yield* results;
        }

        /**
         * Stream events of the given types in commit order.
         *
         * @param eventTypes Event types to return; a falsy value disables the type filter
         * @param options Optional `afterEvent`; when provided, only events committed after
         *  the event with the same `id` are returned (nothing is returned if no stored
         *  event has that `id`)
         * @throws {TypeError} On first iteration, when `options.afterEvent` is passed without an `id`
         */
        async* getEventsByTypes(eventTypes: Readonly<string[]>, options?: EventQueryAfter): IEventStream {
                await nextCycle();

                const lastEventId = options?.afterEvent?.id;
                if (options?.afterEvent && !lastEventId)
                        throw new TypeError('options.afterEvent.id is required');

                // Without an offset event every stored event is a candidate from the start
                let offsetFound = !lastEventId;
                for (const event of this.#events) {
                        if (!offsetFound)
                                offsetFound = event.id === lastEventId; // the offset event itself is skipped
                        else if (!eventTypes || eventTypes.includes(event.type))
                                yield event;
                }
        }

        /**
         * Processes a batch of dispatch pipeline items, extracts the events,
         * commits them to the in-memory storage, and returns the original batch.
         *
         * This method is part of the `IDispatchPipelineProcessor` interface.
         *
         * @param batch Pipeline items; each of them must contain an `event`
         * @returns The same `batch` instance that was passed in
         * @throws {Error} When any batch item has no `event`; nothing is committed in that case
         */
        async process(batch: DispatchPipelineBatch): Promise<DispatchPipelineBatch> {
                const events: IEvent[] = [];
                for (const { event } of batch) {
                        if (!event)
                                throw new Error('Event batch does not contain `event`');

                        events.push(event);
                }

                await this.commitEvents(events);

                return batch;
        }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc