• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 22745360142

06 Mar 2026 01:50AM UTC coverage: 95.287% (+0.9%) from 94.396%
22745360142

Pull #28

github

web-flow
Merge 15ef847b4 into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

428 of 528 branches covered (81.06%)

1043 of 1091 new or added lines in 65 files covered. (95.6%)

3 existing lines in 2 files now uncovered.

1294 of 1358 relevant lines covered (95.29%)

31.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.03
/src/in-memory/InMemoryEventStorage.ts
1
import type {
2
        IIdentifierProvider,
3
        IEvent,
4
        IEventSet,
5
        EventQueryAfter,
6
        IEventStorageReader,
7
        IEventStream,
8
        Identifier,
9
        IDispatchPipelineProcessor,
10
        DispatchPipelineBatch,
11
        AggregateEventsQueryParams
12
} from '../interfaces/index.ts';
13
import { assertString, parseSagaId } from '../utils/index.ts';
14✔
14
import { nextCycle } from './utils/index.ts';
14✔
15
import { ConcurrencyError } from '../errors/index.ts';
14✔
16

17
/**
18
 * A simple event storage implementation intended to use for tests only.
19
 * Storage content resets on each app restart.
20
 */
21
export class InMemoryEventStorage implements
14✔
22
        IEventStorageReader,
23
        IIdentifierProvider,
24
        IDispatchPipelineProcessor {
25

26
        #nextId: number = 0;
65✔
27
        #events: IEventSet = [];
65✔
28

29
        getNewId(): string {
30
                this.#nextId += 1;
11✔
31
                return String(this.#nextId);
11✔
32
        }
33

34
        async commitEvents(events: IEventSet, options?: { ignoreConcurrencyError?: boolean }): Promise<IEventSet> {
35
                await nextCycle();
59✔
36

37
                if (!options?.ignoreConcurrencyError) {
59✔
38
                        for (const event of events) {
54✔
39
                                if (event.aggregateId !== undefined && event.aggregateVersion !== undefined) {
73✔
40
                                        const conflict = this.#events.find(e =>
45✔
41
                                                e.aggregateId === event.aggregateId &&
22✔
42
                                                e.aggregateVersion === event.aggregateVersion);
43
                                        if (conflict)
45✔
44
                                                throw new ConcurrencyError(`Duplicate aggregateVersion ${event.aggregateVersion} for aggregate ${event.aggregateId}`);
1✔
45
                                }
46
                        }
47
                }
48

49
                this.#events = this.#events.concat(events);
58✔
50

51
                await nextCycle();
58✔
52

53
                return events;
58✔
54
        }
55

56
        async* getAggregateEvents(aggregateId: Identifier, options?: AggregateEventsQueryParams): IEventStream {
57
                await nextCycle();
64✔
58

59
                const afterVersion = options?.snapshot?.aggregateVersion;
64✔
60
                const allAfterSnapshot = !afterVersion ?
64✔
61
                        this.#events.filter(e => e.aggregateId === aggregateId) :
56✔
62
                        this.#events.filter(e =>
63
                                e.aggregateId === aggregateId &&
2✔
64
                                e.aggregateVersion !== undefined &&
65
                                e.aggregateVersion > afterVersion);
66

67
                const results = options?.eventTypes === undefined ?
64✔
68
                        allAfterSnapshot :
69
                        allAfterSnapshot.filter(e => options.eventTypes!.includes(e.type));
3✔
70

71
                await nextCycle();
64✔
72

73
                yield* results;
64✔
74

75
                if (options?.tail === 'last' && allAfterSnapshot.length) {
64✔
76
                        const tailEvent = allAfterSnapshot[allAfterSnapshot.length - 1];
1✔
77
                        const alreadyYieldedTail = results.length && results[results.length - 1] === tailEvent;
1✔
78
                        if (!alreadyYieldedTail)
1✔
79
                                yield tailEvent;
1✔
80
                }
81
        }
82

83
        async* getSagaEvents(sagaId: Identifier, { beforeEvent }: { beforeEvent: IEvent }): IEventStream {
84
                await nextCycle();
6✔
85

86
                assertString(beforeEvent?.id, 'beforeEvent.id');
6✔
87

88
                const { sagaDescriptor, originEventId } = parseSagaId(sagaId);
6✔
89
                if (beforeEvent.sagaOrigins?.[sagaDescriptor] !== originEventId)
6!
NEW
90
                        throw new TypeError('beforeEvent.sagaOrigins does not match sagaId');
×
91

92
                const originOffset = this.#events.findIndex(e => e.id === originEventId);
6✔
93
                if (originOffset === -1)
6!
NEW
94
                        throw new Error(`origin event ${originEventId} not found`);
×
95

96
                const beforeEventOffset = this.#events.findIndex(e => e.id === beforeEvent.id);
15✔
97
                if (beforeEventOffset === -1)
6!
NEW
98
                        throw new Error(`beforeEvent ${beforeEvent.id} not found`);
×
99

100
                const results = this.#events
6✔
101
                        .slice(originOffset, beforeEventOffset)
102
                        .filter(e => e.sagaOrigins?.[sagaDescriptor] === originEventId);
9✔
103

104
                await nextCycle();
6✔
105

106
                yield* results;
6✔
107
        }
108

109
        async* getEventsByTypes(eventTypes: Readonly<string[]>, options?: EventQueryAfter): IEventStream {
110
                await nextCycle();
5✔
111

112
                const lastEventId = options?.afterEvent?.id;
4✔
113
                if (options?.afterEvent)
4✔
114
                        assertString(options.afterEvent.id, 'options.afterEvent.id');
2✔
115

116
                let offsetFound = !lastEventId;
3✔
117
                for (const event of this.#events) {
3✔
118
                        if (!offsetFound)
6✔
119
                                offsetFound = event.id === lastEventId;
1✔
120
                        else if (!eventTypes || eventTypes.includes(event.type))
5✔
121
                                yield event;
4✔
122
                }
123
        }
124

125
        /**
126
         * Processes a batch of dispatch pipeline items, extracts the events,
127
         * commits them to the in-memory storage, and returns the original batch.
128
         *
129
         * This method is part of the `IDispatchPipelineProcessor` interface.
130
         */
131
        async process(batch: DispatchPipelineBatch): Promise<DispatchPipelineBatch> {
132
                const events: IEvent[] = [];
43✔
133
                for (const { event } of batch) {
43✔
134
                        if (!event)
46!
NEW
135
                                throw new Error('Event batch does not contain `event`');
×
136

137
                        events.push(event);
46✔
138
                }
139

140
                if (batch.at(0)?.ignoreConcurrencyError)
43✔
141
                        await this.commitEvents(events, { ignoreConcurrencyError: true });
4✔
142
                else
143
                        await this.commitEvents(events);
39✔
144

145
                return batch;
43✔
146
        }
147
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc