• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snatalenko / node-cqrs / 22745197368

06 Mar 2026 01:43AM UTC coverage: 95.287% (+0.9%) from 94.396%
22745197368

Pull #28

github

web-flow
Merge dd3952cc7 into 828e39903
Pull Request #28: TypeScript and event dispatching pipeline refactoring

428 of 528 branches covered (81.06%)

1043 of 1091 new or added lines in 65 files covered. (95.6%)

3 existing lines in 2 files now uncovered.

1294 of 1358 relevant lines covered (95.29%)

31.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.5
/src/in-memory/InMemoryMessageBus.ts
1
import type {
2
        ICommand,
3
        IEvent,
4
        IMessageBus,
5
        IMessageHandler,
6
        IObservable,
7
        IObservableQueueProvider
8
} from '../interfaces/index.ts';
9
import {
14✔
10
        assertEvent,
11
        assertFunction,
12
        assertMessage,
13
        assertString
14
} from '../utils/assert.ts';
15

16
/**
 * Default implementation of the message bus.
 * Keeps all subscriptions and messages in memory.
 *
 * Besides its own subscriptions, a bus can own any number of named queues
 * (see {@link InMemoryMessageBus.queue}). Every event published to the bus
 * is also forwarded to each of its named queues.
 */
export class InMemoryMessageBus implements IMessageBus, IObservableQueueProvider {

	/** Registered handlers, grouped by message type */
	protected handlers: Map<string, Set<IMessageHandler>> = new Map();

	/** When `true`, no more than one handler can be registered per message type */
	protected uniqueEventHandlers: boolean;

	/** Name of the queue this bus represents; `undefined` for an unnamed bus */
	protected queueName: string | undefined;

	/** Named queues created through `queue()`, keyed by queue name */
	protected queues: Map<string, IMessageBus> = new Map();

	/**
	 * @param options.queueName - optional queue name, used in error messages
	 * @param options.uniqueEventHandlers - restrict each message type to a single handler;
	 *   defaults to `true` when `queueName` is provided and to `false` otherwise
	 */
	constructor({ queueName, uniqueEventHandlers = !!queueName }: {
		queueName?: string,
		uniqueEventHandlers?: boolean
	} = {}) {
		this.queueName = queueName;
		this.uniqueEventHandlers = uniqueEventHandlers;
	}

	/**
	 * Subscribe to message type
	 *
	 * @throws if `uniqueEventHandlers` is enabled and a handler is already registered for `messageType`
	 */
	on(messageType: string, handler: IMessageHandler) {
		assertString(messageType, 'messageType');
		assertFunction(handler, 'handler');

		let handlers = this.handlers.get(messageType);
		if (!handlers) {
			handlers = new Set();
			this.handlers.set(messageType, handlers);
		}
		else if (this.uniqueEventHandlers && handlers.size) {
			// Events published to a named queue must be consumed only once.
			// For example, for sending a welcome email, NotificationReceptor will subscribe to
			// "notifications:userCreated". Since we use an in-memory bus, there is no need to track
			// message handling by multiple distributed subscribers, and we only need to make sure
			// that no more than 1 such subscriber will be created.
			//
			// The check looks at the set size rather than at the presence of the map entry:
			// `off()` leaves an empty set behind, and an empty set must not block re-subscription.
			throw new Error(`"${messageType}" handler is already set up on the "${this.queueName}" queue`);
		}

		handlers.add(handler);
	}

	/**
	 * Get or create a named queue.
	 * Named queues support only one handler per event type.
	 *
	 * Repeated calls with the same name return the same queue instance.
	 */
	queue(queueName: string): IObservable {
		let queue = this.queues.get(queueName);
		if (!queue) {
			queue = new InMemoryMessageBus({ queueName, uniqueEventHandlers: true });
			this.queues.set(queueName, queue);
		}

		return queue;
	}

	/**
	 * Remove subscription
	 *
	 * @throws if nothing has ever been subscribed to `messageType` on this bus
	 */
	off(messageType: string, handler: IMessageHandler) {
		assertString(messageType, 'messageType');
		assertFunction(handler, 'handler');

		const handlers = this.handlers.get(messageType);
		if (!handlers)
			throw new Error(`No ${messageType} subscribers found`);

		// The (possibly empty) set stays in the map, so a repeated `off` does not throw
		handlers.delete(handler);
	}

	/**
	 * Send command to exactly 1 command handler
	 *
	 * @returns whatever the command handler returns
	 * @throws if there is no handler, or more than one handler, for `command.type`
	 */
	async send(command: ICommand): Promise<any> {
		assertMessage(command, 'command');

		const handlers = this.handlers.get(command.type);
		if (!handlers || !handlers.size)
			throw new Error(`No '${command.type}' subscribers found`);
		if (handlers.size > 1)
			throw new Error(`More than one '${command.type}' subscriber found`);

		// Exactly one handler is registered at this point
		const commandHandler = handlers.values().next().value;

		return commandHandler!(command);
	}

	/**
	 * Publish event to all subscribers (if any)
	 *
	 * The event is passed to every handler registered for `event.type` on this bus
	 * and is then forwarded to every named queue created with `queue()`.
	 *
	 * @returns results of all handlers followed by results of all named queues
	 */
	async publish(event: IEvent, meta?: Record<string, any>): Promise<unknown[]> {
		assertEvent(event, 'event');

		const promises: (unknown | Promise<unknown>)[] = [];

		for (const handler of this.handlers.get(event.type) ?? []) {
			try {
				promises.push(handler(event, meta));
			}
			catch (err) {
				// A synchronous failure must not prevent the remaining handlers from being invoked;
				// it is turned into a rejection that surfaces through Promise.all below
				promises.push(Promise.reject(err));
			}
		}

		for (const namedQueue of this.queues.values())
			promises.push(namedQueue.publish(event, meta));

		return Promise.all(promises);
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc