• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 21717407497

05 Feb 2026 03:26PM UTC coverage: 84.53% (-9.9%) from 94.396%
21717407497

Pull #28

github

web-flow
Merge 025edb883 into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

611 of 939 branches covered (65.07%)

819 of 934 new or added lines in 65 files covered. (87.69%)

59 existing lines in 13 files now uncovered.

1213 of 1435 relevant lines covered (84.53%)

28.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.44
/src/in-memory/InMemoryEventStorage.ts
1
import type {
2
        IIdentifierProvider,
3
        IEvent,
4
        IEventSet,
5
        EventQueryAfter,
6
        IEventStorageReader,
7
        IEventStream,
8
        IEventStorageWriter,
9
        Identifier,
10
        IDispatchPipelineProcessor,
11
        DispatchPipelineBatch
12
} from '../interfaces/index.ts';
13
import { nextCycle } from './utils/index.ts';
24✔
14

15
/**
16
 * A simple event storage implementation intended for use in tests only.
17
 * Storage content resets on each app restart.
18
 */
19
export class InMemoryEventStorage implements
24✔
20
        IEventStorageReader,
21
        IEventStorageWriter,
22
        IIdentifierProvider,
23
        IDispatchPipelineProcessor {
24

25
        #nextId: number = 0;
58✔
26
        #events: IEventSet = [];
58✔
27

28
        getNewId(): string {
29
                this.#nextId += 1;
24✔
30
                return String(this.#nextId);
24✔
31
        }
32

33
        async commitEvents(events: IEventSet): Promise<IEventSet> {
34
                await nextCycle();
54✔
35

36
                this.#events = this.#events.concat(events);
54✔
37

38
                await nextCycle();
54✔
39

40
                return events;
54✔
41
        }
42

43
        async* getAggregateEvents(aggregateId: Identifier, options?: { snapshot: IEvent }): IEventStream {
44
                await nextCycle();
22✔
45

46
                const afterVersion = options?.snapshot?.aggregateVersion;
22✔
47
                const results = !afterVersion ?
22✔
48
                        this.#events.filter(e => e.aggregateId === aggregateId) :
20✔
49
                        this.#events.filter(e =>
50
                                e.aggregateId === aggregateId &&
4✔
51
                                e.aggregateVersion !== undefined &&
52
                                e.aggregateVersion > afterVersion);
53

54
                await nextCycle();
22✔
55

56
                yield* results;
22✔
57
        }
58

59
        async* getSagaEvents(sagaId: Identifier, { beforeEvent }: { beforeEvent: IEvent }): IEventStream {
60
                await nextCycle();
4✔
61

62
                const results = this.#events.filter(e =>
4✔
63
                        e.sagaId === sagaId &&
6✔
64
                        e.sagaVersion !== undefined &&
65
                        beforeEvent.sagaVersion !== undefined &&
66
                        e.sagaVersion < beforeEvent.sagaVersion);
67

68
                await nextCycle();
4✔
69

70
                yield* results;
4✔
71
        }
72

73
        async* getEventsByTypes(eventTypes: Readonly<string[]>, options?: EventQueryAfter): IEventStream {
74
                await nextCycle();
10✔
75

76
                const lastEventId = options?.afterEvent?.id;
8✔
77
                if (options?.afterEvent && !lastEventId)
8✔
78
                        throw new TypeError('options.afterEvent.id is required');
2✔
79

80
                let offsetFound = !lastEventId;
6✔
81
                for (const event of this.#events) {
6✔
82
                        if (!offsetFound)
12✔
83
                                offsetFound = event.id === lastEventId;
2✔
84
                        else if (!eventTypes || eventTypes.includes(event.type))
10✔
85
                                yield event;
8✔
86
                }
87
        }
88

89
        /**
90
         * Processes a batch of dispatch pipeline items, extracts the events,
91
         * commits them to the in-memory storage, and returns the original batch.
92
         *
93
         * This method is part of the `IDispatchPipelineProcessor` interface.
94
         */
95
        async process(batch: DispatchPipelineBatch): Promise<DispatchPipelineBatch> {
96
                const events: IEvent[] = [];
40✔
97
                for (const { event } of batch) {
40✔
98
                        if (!event)
42!
NEW
99
                                throw new Error('Event batch does not contain `event`');
×
100

101
                        events.push(event);
42✔
102
                }
103

104
                await this.commitEvents(events);
40✔
105

106
                return batch;
40✔
107
        }
108
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc