• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 22268237579

22 Feb 2026 01:41AM UTC coverage: 86.867% (-7.5%) from 94.396%
22268237579

Pull #28

github

web-flow
Merge 2c467a1d5 into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

475 of 691 branches covered (68.74%)

882 of 973 new or added lines in 65 files covered. (90.65%)

49 existing lines in 13 files now uncovered.

1217 of 1401 relevant lines covered (86.87%)

21.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.31
/src/in-memory/InMemoryEventStorage.ts
1
import type {
2
        IIdentifierProvider,
3
        IEvent,
4
        IEventSet,
5
        EventQueryAfter,
6
        IEventStorageReader,
7
        IEventStream,
8
        IEventStorageWriter,
9
        Identifier,
10
        IDispatchPipelineProcessor,
11
        DispatchPipelineBatch,
12
        AggregateEventsQueryParams
13
} from '../interfaces/index.ts';
14
import { parseSagaId } from '../utils/index.ts';
14✔
15
import { nextCycle } from './utils/index.ts';
14✔
16
import { ConcurrencyError } from '../errors/index.ts';
14✔
17

18
/**
19
 * A simple event storage implementation intended to use for tests only.
20
 * Storage content resets on each app restart.
21
 */
22
export class InMemoryEventStorage implements
14✔
23
        IEventStorageReader,
24
        IEventStorageWriter,
25
        IIdentifierProvider,
26
        IDispatchPipelineProcessor {
27

28
        #nextId: number = 0;
57✔
29
        #events: IEventSet = [];
57✔
30

31
        getNewId(): string {
32
                this.#nextId += 1;
10✔
33
                return String(this.#nextId);
10✔
34
        }
35

36
        async commitEvents(events: IEventSet): Promise<IEventSet> {
37
                await nextCycle();
48✔
38

39
                for (const event of events) {
48✔
40
                        if (event.aggregateId !== undefined && event.aggregateVersion !== undefined) {
67✔
41
                                const conflict = this.#events.find(e =>
39✔
42
                                        e.aggregateId === event.aggregateId &&
22✔
43
                                        e.aggregateVersion === event.aggregateVersion);
44
                                if (conflict)
39✔
45
                                        throw new ConcurrencyError(`Duplicate aggregateVersion ${event.aggregateVersion} for aggregate ${event.aggregateId}`);
1✔
46
                        }
47
                }
48

49
                this.#events = this.#events.concat(events);
47✔
50

51
                await nextCycle();
47✔
52

53
                return events;
47✔
54
        }
55

56
        async* getAggregateEvents(aggregateId: Identifier, options?: AggregateEventsQueryParams): IEventStream {
57
                await nextCycle();
45✔
58

59
                const afterVersion = options?.snapshot?.aggregateVersion;
44✔
60
                const allAfterSnapshot = !afterVersion ?
44✔
61
                        this.#events.filter(e => e.aggregateId === aggregateId) :
41✔
62
                        this.#events.filter(e =>
63
                                e.aggregateId === aggregateId &&
2✔
64
                                e.aggregateVersion !== undefined &&
65
                                e.aggregateVersion > afterVersion);
66

67
                const results = options?.eventTypes === undefined ?
44✔
68
                        allAfterSnapshot :
69
                        allAfterSnapshot.filter(e => options.eventTypes!.includes(e.type));
3✔
70

71
                await nextCycle();
44✔
72

73
                yield* results;
44✔
74

75
                if (options?.tail === 'last' && allAfterSnapshot.length) {
44✔
76
                        const tailEvent = allAfterSnapshot[allAfterSnapshot.length - 1];
1✔
77
                        const alreadyYieldedTail = results.length && results[results.length - 1] === tailEvent;
1✔
78
                        if (!alreadyYieldedTail)
1✔
79
                                yield tailEvent;
1✔
80
                }
81
        }
82

83
        async* getSagaEvents(sagaId: Identifier, { beforeEvent }: { beforeEvent: IEvent }): IEventStream {
84
                await nextCycle();
6✔
85

86
                if (typeof beforeEvent?.id !== 'string' || !beforeEvent.id.length)
6!
NEW
87
                        throw new TypeError('beforeEvent.id is required');
×
88

89
                const { sagaDescriptor, originEventId } = parseSagaId(sagaId);
6✔
90
                if (beforeEvent.sagaOrigins?.[sagaDescriptor] !== originEventId)
6!
NEW
91
                        throw new TypeError('beforeEvent.sagaOrigins does not match sagaId');
×
92

93
                const originOffset = this.#events.findIndex(e => e.id === originEventId);
6✔
94
                if (originOffset === -1)
6!
NEW
95
                        throw new Error(`origin event ${originEventId} not found`);
×
96

97
                const beforeEventOffset = this.#events.findIndex(e => e.id === beforeEvent.id);
15✔
98
                if (beforeEventOffset === -1)
6!
NEW
99
                        throw new Error(`beforeEvent ${beforeEvent.id} not found`);
×
100

101
                const results = this.#events
6✔
102
                        .slice(originOffset, beforeEventOffset)
103
                        .filter(e => e.sagaOrigins?.[sagaDescriptor] === originEventId);
9✔
104

105
                await nextCycle();
6✔
106

107
                yield* results;
6✔
108
        }
109

110
        async* getEventsByTypes(eventTypes: Readonly<string[]>, options?: EventQueryAfter): IEventStream {
111
                await nextCycle();
5✔
112

113
                const lastEventId = options?.afterEvent?.id;
4✔
114
                if (options?.afterEvent && !lastEventId)
4✔
115
                        throw new TypeError('options.afterEvent.id is required');
1✔
116

117
                let offsetFound = !lastEventId;
3✔
118
                for (const event of this.#events) {
3✔
119
                        if (!offsetFound)
6✔
120
                                offsetFound = event.id === lastEventId;
1✔
121
                        else if (!eventTypes || eventTypes.includes(event.type))
5✔
122
                                yield event;
4✔
123
                }
124
        }
125

126
        /**
127
         * Processes a batch of dispatch pipeline items, extracts the events,
128
         * commits them to the in-memory storage, and returns the original batch.
129
         *
130
         * This method is part of the `IDispatchPipelineProcessor` interface.
131
         */
132
        async process(batch: DispatchPipelineBatch): Promise<DispatchPipelineBatch> {
133
                const events: IEvent[] = [];
34✔
134
                for (const { event } of batch) {
34✔
135
                        if (!event)
37!
NEW
136
                                throw new Error('Event batch does not contain `event`');
×
137

138
                        events.push(event);
37✔
139
                }
140

141
                await this.commitEvents(events);
34✔
142

143
                return batch;
34✔
144
        }
145
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc