• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 21966362039

12 Feb 2026 10:15PM UTC coverage: 85.328% (-9.1%) from 94.396%
21966362039

Pull #28

github

web-flow
Merge 9adf24177 into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

671 of 1008 branches covered (66.57%)

927 of 1051 new or added lines in 67 files covered. (88.2%)

49 existing lines in 13 files now uncovered.

1262 of 1479 relevant lines covered (85.33%)

33.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.38
/src/in-memory/InMemoryEventStorage.ts
1
import type {
2
        IIdentifierProvider,
3
        IEvent,
4
        IEventSet,
5
        EventQueryAfter,
6
        IEventStorageReader,
7
        IEventStream,
8
        IEventStorageWriter,
9
        Identifier,
10
        IDispatchPipelineProcessor,
11
        DispatchPipelineBatch,
12
        AggregateEventsQueryParams
13
} from '../interfaces/index.ts';
14
import { parseSagaId } from '../utils/index.ts';
28✔
15
import { nextCycle } from './utils/index.ts';
28✔
16

17
/**
18
 * A simple event storage implementation intended to use for tests only.
19
 * Storage content resets on each app restart.
20
 */
21
export class InMemoryEventStorage implements
28✔
22
        IEventStorageReader,
23
        IEventStorageWriter,
24
        IIdentifierProvider,
25
        IDispatchPipelineProcessor {
26

27
        #nextId: number = 0;
88✔
28
        #events: IEventSet = [];
88✔
29

30
        getNewId(): string {
31
                this.#nextId += 1;
20✔
32
                return String(this.#nextId);
20✔
33
        }
34

35
        async commitEvents(events: IEventSet): Promise<IEventSet> {
36
                await nextCycle();
68✔
37

38
                this.#events = this.#events.concat(events);
68✔
39

40
                await nextCycle();
68✔
41

42
                return events;
68✔
43
        }
44

45
        async* getAggregateEvents(aggregateId: Identifier, options?: AggregateEventsQueryParams): IEventStream {
46
                await nextCycle();
24✔
47

48
                const afterVersion = options?.snapshot?.aggregateVersion;
24✔
49
                const allAfterSnapshot = !afterVersion ?
24✔
50
                        this.#events.filter(e => e.aggregateId === aggregateId) :
26✔
51
                        this.#events.filter(e =>
52
                                e.aggregateId === aggregateId &&
4✔
53
                                e.aggregateVersion !== undefined &&
54
                                e.aggregateVersion > afterVersion);
55

56
                const results = options?.eventTypes === undefined ?
24✔
57
                        allAfterSnapshot :
58
                        allAfterSnapshot.filter(e => options.eventTypes!.includes(e.type));
6✔
59

60
                await nextCycle();
24✔
61

62
                yield* results;
24✔
63

64
                if (options?.tail === 'last' && allAfterSnapshot.length) {
24✔
65
                        const tailEvent = allAfterSnapshot[allAfterSnapshot.length - 1];
2✔
66
                        const alreadyYieldedTail = results.length && results[results.length - 1] === tailEvent;
2✔
67
                        if (!alreadyYieldedTail)
2✔
68
                                yield tailEvent;
2✔
69
                }
70
        }
71

72
        async* getSagaEvents(sagaId: Identifier, { beforeEvent }: { beforeEvent: IEvent }): IEventStream {
73
                await nextCycle();
12✔
74

75
                if (typeof beforeEvent?.id !== 'string' || !beforeEvent.id.length)
12✔
NEW
76
                        throw new TypeError('beforeEvent.id is required');
×
77

78
                const { sagaDescriptor, originEventId } = parseSagaId(sagaId);
12✔
79
                if (beforeEvent.sagaOrigins?.[sagaDescriptor] !== originEventId)
12!
NEW
80
                        throw new TypeError('beforeEvent.sagaOrigins does not match sagaId');
×
81

82
                const originOffset = this.#events.findIndex(e => e.id === originEventId);
12✔
83
                if (originOffset === -1)
12!
NEW
84
                        throw new Error(`origin event ${originEventId} not found`);
×
85

86
                const beforeEventOffset = this.#events.findIndex(e => e.id === beforeEvent.id);
30✔
87
                if (beforeEventOffset === -1)
12!
NEW
88
                        throw new Error(`beforeEvent ${beforeEvent.id} not found`);
×
89

90
                const results = this.#events
12✔
91
                        .slice(originOffset, beforeEventOffset)
92
                        .filter(e => e.sagaOrigins?.[sagaDescriptor] === originEventId);
18✔
93

94
                await nextCycle();
12✔
95

96
                yield* results;
12✔
97
        }
98

99
        async* getEventsByTypes(eventTypes: Readonly<string[]>, options?: EventQueryAfter): IEventStream {
100
                await nextCycle();
10✔
101

102
                const lastEventId = options?.afterEvent?.id;
8✔
103
                if (options?.afterEvent && !lastEventId)
8✔
104
                        throw new TypeError('options.afterEvent.id is required');
2✔
105

106
                let offsetFound = !lastEventId;
6✔
107
                for (const event of this.#events) {
6✔
108
                        if (!offsetFound)
12✔
109
                                offsetFound = event.id === lastEventId;
2✔
110
                        else if (!eventTypes || eventTypes.includes(event.type))
10✔
111
                                yield event;
8✔
112
                }
113
        }
114

115
        /**
116
         * Processes a batch of dispatch pipeline items, extracts the events,
117
         * commits them to the in-memory storage, and returns the original batch.
118
         *
119
         * This method is part of the `IDispatchPipelineProcessor` interface.
120
         */
121
        async process(batch: DispatchPipelineBatch): Promise<DispatchPipelineBatch> {
122
                const events: IEvent[] = [];
44✔
123
                for (const { event } of batch) {
44✔
124
                        if (!event)
50!
NEW
125
                                throw new Error('Event batch does not contain `event`');
×
126

127
                        events.push(event);
50✔
128
                }
129

130
                await this.commitEvents(events);
44✔
131

132
                return batch;
44✔
133
        }
134
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc