• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 21873348409

10 Feb 2026 04:29PM UTC coverage: 85.173%. First build
21873348409

Pull #31

github

web-flow
Merge 18ef6962d into 025edb883
Pull Request #31: Multi-saga correlation via `message.sagaOrigins`

657 of 993 branches covered (66.16%)

143 of 158 new or added lines in 17 files covered. (90.51%)

1258 of 1477 relevant lines covered (85.17%)

33.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.2
/src/in-memory/InMemoryEventStorage.ts
1
import type {
2
        IIdentifierProvider,
3
        IEvent,
4
        IEventSet,
5
        EventQueryAfter,
6
        IEventStorageReader,
7
        IEventStream,
8
        IEventStorageWriter,
9
        Identifier,
10
        IDispatchPipelineProcessor,
11
        DispatchPipelineBatch
12
} from '../interfaces/index.ts';
13
import { parseSagaId } from '../utils/index.ts';
28✔
14
import { nextCycle } from './utils/index.ts';
28✔
15

16
/**
 * A simple in-memory event storage implementation intended for use in tests only.
 * Storage content is kept in process memory and resets on each app restart.
 *
 * Events are kept in a single array in commit order; all queries are answered
 * by scanning that array.
 */
export class InMemoryEventStorage implements
	IEventStorageReader,
	IEventStorageWriter,
	IIdentifierProvider,
	IDispatchPipelineProcessor {

	/** Last identifier handed out by `getNewId` (0 means none issued yet) */
	#nextId: number = 0;

	/** All committed events, in commit order */
	#events: IEventSet = [];

	/**
	 * Generate a new identifier, unique within this storage instance.
	 *
	 * @returns Incrementing counter value converted to string ("1", "2", ...)
	 */
	getNewId(): string {
		this.#nextId += 1;
		return String(this.#nextId);
	}

	/**
	 * Append events to the end of the in-memory log.
	 *
	 * @param events Events to store
	 * @returns The same `events` collection that was passed in
	 */
	async commitEvents(events: IEventSet): Promise<IEventSet> {
		await nextCycle();

		this.#events = this.#events.concat(events);

		await nextCycle();

		return events;
	}

	/**
	 * Stream events that belong to a given aggregate, in commit order.
	 *
	 * @param aggregateId Aggregate identifier to match against `event.aggregateId`
	 * @param options When `options.snapshot.aggregateVersion` is defined,
	 *   only events with a greater `aggregateVersion` are returned
	 */
	async* getAggregateEvents(aggregateId: Identifier, options?: { snapshot: IEvent }): IEventStream {
		await nextCycle();

		const afterVersion = options?.snapshot?.aggregateVersion;

		// Explicit undefined/null check: version 0 is a valid snapshot version
		// and must not be mistaken for "no snapshot" by a truthiness test
		const results = afterVersion === undefined || afterVersion === null ?
			this.#events.filter(e => e.aggregateId === aggregateId) :
			this.#events.filter(e =>
				e.aggregateId === aggregateId &&
				e.aggregateVersion !== undefined &&
				e.aggregateVersion > afterVersion);

		await nextCycle();

		yield* results;
	}

	/**
	 * Stream events that belong to a given saga and were committed
	 * starting from the saga origin event (inclusive) up to `beforeEvent` (exclusive).
	 *
	 * An event is considered to belong to the saga when its
	 * `sagaOrigins[sagaDescriptor]` equals the origin event ID parsed from `sagaId`.
	 *
	 * @param sagaId Saga identifier, parsed with `parseSagaId` into saga descriptor and origin event ID
	 * @param options.beforeEvent Event that marks the (exclusive) end of the requested range
	 * @throws {TypeError} If `beforeEvent.id` is not a non-empty string
	 * @throws {TypeError} If `beforeEvent.sagaOrigins` does not reference the origin event from `sagaId`
	 * @throws {Error} If either the origin event or `beforeEvent` is not found in the storage
	 */
	async* getSagaEvents(sagaId: Identifier, { beforeEvent }: { beforeEvent: IEvent }): IEventStream {
		await nextCycle();

		if (typeof beforeEvent?.id !== 'string' || !beforeEvent.id.length)
			throw new TypeError('beforeEvent.id is required');

		const { sagaDescriptor, originEventId } = parseSagaId(sagaId);
		if (beforeEvent.sagaOrigins?.[sagaDescriptor] !== originEventId)
			throw new TypeError('beforeEvent.sagaOrigins does not match sagaId');

		const originOffset = this.#events.findIndex(e => e.id === originEventId);
		if (originOffset === -1)
			throw new Error(`origin event ${originEventId} not found`);

		const beforeEventOffset = this.#events.findIndex(e => e.id === beforeEvent.id);
		if (beforeEventOffset === -1)
			throw new Error(`beforeEvent ${beforeEvent.id} not found`);

		const results = this.#events
			.slice(originOffset, beforeEventOffset)
			.filter(e => e.sagaOrigins?.[sagaDescriptor] === originEventId);

		await nextCycle();

		yield* results;
	}

	/**
	 * Stream events of the given types, in commit order.
	 *
	 * @param eventTypes Event types to return; when falsy, events of all types are returned
	 * @param options When `options.afterEvent` is provided, only events committed
	 *   after the event with the same `id` are returned. If no stored event has that `id`,
	 *   nothing is yielded.
	 * @throws {TypeError} If `options.afterEvent` is provided without an `id`
	 */
	async* getEventsByTypes(eventTypes: Readonly<string[]>, options?: EventQueryAfter): IEventStream {
		await nextCycle();

		const lastEventId = options?.afterEvent?.id;
		if (options?.afterEvent && !lastEventId)
			throw new TypeError('options.afterEvent.id is required');

		// Without a starting point, every event is eligible from the very beginning
		let offsetFound = !lastEventId;
		for (const event of this.#events) {
			if (!offsetFound)
				offsetFound = event.id === lastEventId;
			else if (!eventTypes || eventTypes.includes(event.type))
				yield event;
		}
	}

	/**
	 * Processes a batch of dispatch pipeline items, extracts the events,
	 * commits them to the in-memory storage, and returns the original batch.
	 *
	 * This method is part of the `IDispatchPipelineProcessor` interface.
	 *
	 * @param batch Pipeline items, each expected to carry an `event`
	 * @returns The same `batch` that was passed in
	 * @throws {Error} If any batch item does not contain an `event`
	 */
	async process(batch: DispatchPipelineBatch): Promise<DispatchPipelineBatch> {
		const events: IEvent[] = [];
		for (const { event } of batch) {
			if (!event)
				throw new Error('Event batch does not contain `event`');

			events.push(event);
		}

		await this.commitEvents(events);

		return batch;
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc