• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / f7142b24-e394-4028-89c6-717e281431d1

25 Nov 2024 01:12PM UTC coverage: 90.141% (-1.8%) from 91.989%
f7142b24-e394-4028-89c6-717e281431d1

Pull #14177

circleci

web-flow
Merge pull request #14200 from nestjs/feat/allow-queue-per-handler

feat(microservices): support nats queue per handler
Pull Request #14177: release: version 11.0.0

2612 of 3236 branches covered (80.72%)

496 of 704 new or added lines in 48 files covered. (70.45%)

79 existing lines in 11 files now uncovered.

6985 of 7749 relevant lines covered (90.14%)

16.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.81
/packages/microservices/client/client-rmq.ts
1
import { Logger } from '@nestjs/common/services/logger.service';
1✔
2
import { loadPackage } from '@nestjs/common/utils/load-package.util';
1✔
3
import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util';
1✔
4
import { isFunction } from '@nestjs/common/utils/shared.utils';
1✔
5
import { EventEmitter } from 'events';
1✔
6
import {
1✔
7
  EmptyError,
8
  firstValueFrom,
9
  fromEvent,
10
  merge,
11
  Observable,
12
  ReplaySubject,
13
} from 'rxjs';
14
import { first, map, retryWhen, scan, skip, switchMap } from 'rxjs/operators';
1✔
15
import {
1✔
16
  DISCONNECTED_RMQ_MESSAGE,
17
  RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
18
  RQM_DEFAULT_NO_ASSERT,
19
  RQM_DEFAULT_NOACK,
20
  RQM_DEFAULT_PERSISTENT,
21
  RQM_DEFAULT_PREFETCH_COUNT,
22
  RQM_DEFAULT_QUEUE,
23
  RQM_DEFAULT_QUEUE_OPTIONS,
24
  RQM_DEFAULT_URL,
25
} from '../constants';
26
import { RmqEvents, RmqEventsMap, RmqStatus } from '../events/rmq.events';
27
import { ReadPacket, RmqOptions, WritePacket } from '../interfaces';
28
import { RmqRecord } from '../record-builders';
29
import { RmqRecordSerializer } from '../serializers/rmq-record.serializer';
1✔
30
import { ClientProxy } from './client-proxy';
1✔
31

32
// To enable type safety for RMQ. This can't be uncommented by default
33
// because it would require the user to install the amqplib package even if they don't use RabbitMQ
34
// Otherwise, TypeScript would fail to compile the code.
35
//
36
// type AmqpConnectionManager =
37
//   import('amqp-connection-manager').AmqpConnectionManager;
38
// type ChannelWrapper = import('amqp-connection-manager').ChannelWrapper;
39
// type Channel = import('amqplib').Channel;
40
// type ConsumeMessage = import('amqplib').ConsumeMessage;
41

42
// Loose placeholders standing in for the amqplib / amqp-connection-manager
// types, so the file compiles without those packages installed.
type AmqpConnectionManager = any;
type ChannelWrapper = any;
type Channel = any;
type ConsumeMessage = any;

// Reassigned with the loaded module inside the ClientRMQ constructor.
let rmqPackage: any = {}; // typeof import('amqp-connection-manager');

// Reply queue used when no "replyQueue" option is supplied.
const REPLY_QUEUE = 'amq.rabbitmq.reply-to';
1✔
50

51
/**
52
 * @publicApi
53
 */
54
export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
1✔
55
  protected readonly logger = new Logger(ClientProxy.name);
20✔
56
  protected connection$: ReplaySubject<any>;
57
  protected connectionPromise: Promise<void>;
58
  protected client: AmqpConnectionManager = null;
20✔
59
  protected channel: ChannelWrapper = null;
20✔
60
  protected pendingEventListeners: Array<{
20✔
61
    event: keyof RmqEvents;
62
    callback: RmqEvents[keyof RmqEvents];
63
  }> = [];
64
  protected isInitialConnect = true;
20✔
65
  protected responseEmitter: EventEmitter;
66
  protected queue: string;
67
  protected queueOptions: Record<string, any>;
68
  protected replyQueue: string;
69
  protected noAssert: boolean;
70

71
  /**
   * Resolves the queue-related settings from the supplied options (falling
   * back to the RMQ defaults), loads the AMQP packages and initializes the
   * serializer and deserializer.
   *
   * @param options RMQ transport options for this client.
   */
  constructor(protected readonly options: RmqOptions['options']) {
    super();

    const configuredQueue = this.getOptionsProp(this.options, 'queue');
    this.queue = configuredQueue || RQM_DEFAULT_QUEUE;

    const configuredQueueOptions = this.getOptionsProp(
      this.options,
      'queueOptions',
    );
    this.queueOptions = configuredQueueOptions || RQM_DEFAULT_QUEUE_OPTIONS;

    const configuredReplyQueue = this.getOptionsProp(
      this.options,
      'replyQueue',
    );
    this.replyQueue = configuredReplyQueue || REPLY_QUEUE;

    // A top-level "noAssert" takes precedence over the one nested in
    // "queueOptions"; the default applies only when both are nullish.
    const explicitNoAssert = this.getOptionsProp(this.options, 'noAssert');
    this.noAssert =
      explicitNoAssert ?? this.queueOptions.noAssert ?? RQM_DEFAULT_NO_ASSERT;

    // The result of loading "amqplib" is discarded; only the connection
    // manager module is kept for later use.
    loadPackage('amqplib', ClientRMQ.name, () => require('amqplib'));
    const loadConnectionManager = () => require('amqp-connection-manager');
    rmqPackage = loadPackage(
      'amqp-connection-manager',
      ClientRMQ.name,
      loadConnectionManager,
    );

    this.initializeSerializer(options);
    this.initializeDeserializer(options);
  }
93

94
  /**
   * Closes the channel and the client (when present), then clears both
   * references together with any event listeners still waiting to be attached.
   */
  public close(): void {
    if (this.channel) {
      this.channel.close();
    }
    if (this.client) {
      this.client.close();
    }

    this.channel = null;
    this.client = null;
    this.pendingEventListeners = [];
  }
101

102
  /**
   * Establishes the connection on first use and returns the promise tracking
   * it. When a client already exists, the stored connection promise is
   * returned without creating a new client.
   *
   * @returns The current connection promise.
   */
  public connect(): Promise<any> {
    if (this.client) {
      return this.connectionPromise;
    }

    const client = this.createClient();
    this.client = client;

    this.registerErrorListener(client);
    this.registerDisconnectListener(client);
    this.registerConnectListener(client);

    // Attach the listeners that were registered through "on()" before a
    // client existed, then empty the backlog.
    for (const { event, callback } of this.pendingEventListeners) {
      this.client.on(event, callback);
    }
    this.pendingEventListeners = [];

    const emitter = new EventEmitter();
    emitter.setMaxListeners(0);
    this.responseEmitter = emitter;

    // First connection: create the channel once connected, unless a
    // disconnect or connect failure errors the stream first.
    const initialConnection$ = this.mergeDisconnectEvent(
      this.client,
      this.connect$(this.client),
    ).pipe(switchMap(() => this.createChannel()));

    // Later CONNECT events only; the first one is skipped.
    const reconnection$ = fromEvent(this.client, RmqEventsMap.CONNECT).pipe(
      skip(1),
    );

    this.connection$ = new ReplaySubject(1);
    merge(initialConnection$, reconnection$).subscribe(this.connection$);

    this.connectionPromise = this.convertConnectionToPromise();
    return this.connectionPromise;
  }
136

137
  /**
   * Creates a channel on the current client and stores it on the instance.
   *
   * @returns A promise resolved by "setupChannel" once the channel setup
   * has completed.
   */
  public createChannel(): Promise<void> {
    return new Promise<void>(resolve => {
      const setup = (channel: Channel) => this.setupChannel(channel, resolve);
      this.channel = this.client.createChannel({ json: false, setup });
    });
  }
145

146
  /**
   * Opens a connection through the loaded connection-manager package.
   *
   * @returns The value returned by the package's "connect" call, made with
   * the configured URLs (or the default URL) and socket options.
   */
  public createClient(): AmqpConnectionManager {
    const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
    const configuredUrls = this.getOptionsProp(this.options, 'urls');
    const urls = configuredUrls || [RQM_DEFAULT_URL];

    return rmqPackage.connect(urls, socketOptions);
  }
151

152
  /**
   * Races the given source against the client's failure events and keeps
   * only the first outcome.
   *
   * @param instance Event emitter to listen on for "disconnect" and
   * "connectFailed" events.
   * @param source$ Stream whose first value should be forwarded.
   * @returns A stream that emits the first value of `source$`, or errors
   * with the event payload when a failure event wins the race.
   */
  public mergeDisconnectEvent<T = any>(
    instance: any,
    source$: Observable<T>,
  ): Observable<T> {
    // Turns every occurrence of the named event into a stream error carrying
    // the event payload.
    const throwOnEvent = (eventName: string) => {
      const event$ = fromEvent(instance, eventName);
      return event$.pipe(
        map((payload: unknown) => {
          throw payload;
        }),
      );
    };
    const disconnect$ = throwOnEvent(RmqEventsMap.DISCONNECT);

    const urls = this.getOptionsProp(this.options, 'urls', []);
    const connectFailedEventKey = 'connectFailed';

    // Counts failures while more URLs remain; rethrows once the failing URL
    // is the last configured one (always the case when no URLs are set).
    const countUntilLastUrl = (errorCount: number, error: any) => {
      const isLastUrl = urls.indexOf(error.url) >= urls.length - 1;
      if (isLastUrl) {
        throw error;
      }
      return errorCount + 1;
    };
    const connectFailed$ = throwOnEvent(connectFailedEventKey).pipe(
      retryWhen(errors$ => errors$.pipe(scan(countUntilLastUrl, 0))),
    );

    // If we ever decide to propagate all disconnect errors & re-emit them through
    // the "connection" stream then comment out "first()" operator.
    const raced$ = merge(source$, disconnect$, connectFailed$);
    return raced$.pipe(first());
  }
182

183
  /**
   * Awaits the first value emitted by the connection stream.
   *
   * @returns The first emitted value, or `undefined` when the stream
   * completes without emitting (an `EmptyError`).
   * @throws Any other error raised while waiting for the first value.
   */
  public async convertConnectionToPromise() {
    try {
      const firstConnection = await firstValueFrom(this.connection$);
      return firstConnection;
    } catch (error) {
      const isEmptyStream = error instanceof EmptyError;
      if (!isEmptyStream) {
        throw error;
      }
      return;
    }
  }
193

194
  /**
   * Prepares a freshly created channel: optionally asserts the queue,
   * optionally binds it to the configured exchange, applies the prefetch
   * settings and starts consuming replies.
   *
   * @param channel Channel to configure.
   * @param resolve Called with no arguments after all setup steps finished.
   */
  public async setupChannel(channel: Channel, resolve: Function) {
    const configuredPrefetch = this.getOptionsProp(
      this.options,
      'prefetchCount',
    );
    const prefetchCount = configuredPrefetch || RQM_DEFAULT_PREFETCH_COUNT;

    const configuredGlobalPrefetch = this.getOptionsProp(
      this.options,
      'isGlobalPrefetchCount',
    );
    const isGlobalPrefetchCount =
      configuredGlobalPrefetch || RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;

    const shouldAssertQueue = !this.noAssert;
    if (shouldAssertQueue) {
      await channel.assertQueue(this.queue, this.queueOptions);
    }

    // Binding happens only when both an exchange and a routing key are set.
    const transportOptions = this.options;
    if (transportOptions.exchange && transportOptions.routingKey) {
      await channel.bindQueue(
        this.queue,
        transportOptions.exchange,
        transportOptions.routingKey,
      );
    }

    await channel.prefetch(prefetchCount, isGlobalPrefetchCount);
    await this.consumeChannel(channel);
    resolve();
  }
218

219
  /**
   * Starts consuming the reply queue and re-emits every received message on
   * the response emitter, using the message's correlation id as the event
   * name.
   *
   * @param channel Channel on which the reply consumer is registered.
   */
  public async consumeChannel(channel: Channel) {
    const noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK);
    await channel.consume(
      this.replyQueue,
      (msg: ConsumeMessage | null) => {
        // amqplib invokes the consumer callback with `null` when the broker
        // cancels the consumer; there is no message to route in that case,
        // and reading `msg.properties` would throw.
        if (!msg) {
          this.logger.error('RMQ reply consumer was cancelled by the broker.');
          return;
        }
        this.responseEmitter.emit(msg.properties.correlationId, msg);
      },
      {
        noAck,
      },
    );
  }
230

231
  /**
   * Logs every error event emitted by the given client.
   *
   * @param client Connection manager to listen on.
   */
  public registerErrorListener(client: AmqpConnectionManager): void {
    const logError = (err: any) => this.logger.error(err);
    client.addListener(RmqEventsMap.ERROR, logError);
  }
236

237
  /**
   * Reacts to disconnect events: publishes the DISCONNECTED status, swaps the
   * connection promise for a rejected one (unless the initial connection has
   * not been made yet) and logs the failure.
   *
   * @param client Connection manager to listen on.
   */
  public registerDisconnectListener(client: AmqpConnectionManager): void {
    const onDisconnect = (err: any) => {
      this._status$.next(RmqStatus.DISCONNECTED);

      const hasConnectedBefore = !this.isInitialConnect;
      if (hasConnectedBefore) {
        const lostConnection: Promise<void> = Promise.reject(
          'Error: Connection lost. Trying to reconnect...',
        );
        this.connectionPromise = lostConnection;

        // Prevent unhandled promise rejection
        lostConnection.catch(() => {});
      }

      this.logger.error(DISCONNECTED_RMQ_MESSAGE);
      this.logger.error(err);
    };

    client.addListener(RmqEventsMap.DISCONNECT, onDisconnect);
  }
254

255
  /**
   * Reacts to connect events: publishes the CONNECTED status, logs it, and
   * refreshes the connection promise. On the first connect, a channel is
   * created if none exists; on later connects, the promise is replaced with a
   * resolved one.
   *
   * @param client Connection manager to listen on.
   */
  private registerConnectListener(client: AmqpConnectionManager): void {
    const onConnect = () => {
      this._status$.next(RmqStatus.CONNECTED);
      this.logger.log('Successfully connected to RMQ broker');

      if (!this.isInitialConnect) {
        // Reconnect: expose an already-resolved promise.
        this.connectionPromise = Promise.resolve();
        return;
      }

      this.isInitialConnect = false;
      if (!this.channel) {
        this.connectionPromise = this.createChannel();
      }
    };

    client.addListener(RmqEventsMap.CONNECT, onConnect);
  }
271

272
  /**
   * Registers an event listener on the client. When no client exists yet,
   * the listener is queued in "pendingEventListeners" instead.
   *
   * @param event Name of the event to listen for.
   * @param callback Listener to invoke for that event.
   */
  public on<
    EventKey extends keyof RmqEvents = keyof RmqEvents,
    EventCallback extends RmqEvents[EventKey] = RmqEvents[EventKey],
  >(event: EventKey, callback: EventCallback) {
    if (!this.client) {
      this.pendingEventListeners.push({ event, callback });
      return;
    }
    this.client.addListener(event, callback);
  }
282

283
  /**
   * Exposes the underlying client instance.
   *
   * @returns The current client, cast to `T`.
   * @throws Error when no client has been created yet.
   */
  public unwrap<T>(): T {
    const client = this.client;
    if (client) {
      return client as T;
    }
    throw new Error('Not initialized. Please call the "connect" method first.');
  }
291

292
  /**
   * Deserializes an incoming reply and forwards the result to the callback.
   *
   * The callback is invoked exactly once per message: with
   * `isDisposed: true` when the deserialized packet carries an error or is
   * marked as disposed, and with just `err`/`response` otherwise.
   *
   * @param packet Raw reply payload to deserialize.
   * @param options Optional options handed to the deserializer; may be
   * omitted, in which case the second argument is the callback.
   * @param callback Receiver of the deserialized packet.
   */
  public async handleMessage(
    packet: unknown,
    callback: (packet: WritePacket) => any,
  ): Promise<void>;
  public async handleMessage(
    packet: unknown,
    options: Record<string, unknown>,
    callback: (packet: WritePacket) => any,
  ): Promise<void>;
  public async handleMessage(
    packet: unknown,
    options: Record<string, unknown> | ((packet: WritePacket) => any),
    callback?: (packet: WritePacket) => any,
  ): Promise<void> {
    // Two-argument form: the second argument is the callback.
    if (isFunction(options)) {
      callback = options as (packet: WritePacket) => any;
      options = undefined;
    }

    const { err, response, isDisposed } = await this.deserializer.deserialize(
      packet,
      options,
    );
    if (isDisposed || err) {
      callback({
        err,
        response,
        isDisposed: true,
      });
      // Stop here so the same packet is not delivered a second time.
      return;
    }
    callback({
      err,
      response,
    });
  }
327

328
  /**
   * Sends a request message to the queue and attaches a response listener
   * keyed by a freshly generated correlation id.
   *
   * @param message Packet to send; it is mutated to carry the correlation id
   * as `id`.
   * @param callback Invoked with each handled reply, or with `{ err }` when
   * sending fails.
   * @returns Teardown function that detaches the response listener. If
   * publishing throws synchronously, an already attached listener is detached
   * right away and the returned teardown remains safe to call.
   */
  protected publish(
    message: ReadPacket,
    callback: (packet: WritePacket) => any,
  ): () => void {
    // Stays a no-op until the response listener has actually been attached,
    // so the failure path can always call and return it.
    let teardown: () => void = () => undefined;

    try {
      const correlationId = randomStringGenerator();
      const listener = ({
        content,
        options,
      }: {
        content: Buffer;
        options: Record<string, unknown>;
      }) =>
        this.handleMessage(
          this.parseMessageContent(content),
          options,
          callback,
        );

      Object.assign(message, { id: correlationId });
      const serializedPacket: ReadPacket & Partial<RmqRecord> =
        this.serializer.serialize(message);

      // Record options travel as AMQP publish options, not in the body.
      const options = serializedPacket.options;
      delete serializedPacket.options;

      this.responseEmitter.on(correlationId, listener);
      teardown = () => {
        this.responseEmitter.removeListener(correlationId, listener);
      };

      this.channel
        .sendToQueue(
          this.queue,
          Buffer.from(JSON.stringify(serializedPacket)),
          {
            replyTo: this.replyQueue,
            persistent: this.getOptionsProp(
              this.options,
              'persistent',
              RQM_DEFAULT_PERSISTENT,
            ),
            ...options,
            headers: this.mergeHeaders(options?.headers),
            correlationId,
          },
        )
        .catch(err => callback({ err }));
      return teardown;
    } catch (err) {
      // Do not leave a listener behind for a request that was never sent.
      teardown();
      callback({ err });
      return teardown;
    }
  }
377

378
  /**
   * Sends an event packet to the queue without expecting a reply.
   *
   * @param packet Event packet to serialize and send.
   * @returns A promise that resolves when the send callback reports no
   * error and rejects with the reported error otherwise.
   */
  protected dispatchEvent(packet: ReadPacket): Promise<any> {
    const outgoing: ReadPacket & Partial<RmqRecord> =
      this.serializer.serialize(packet);

    // Record options are passed as publish options rather than in the body.
    const recordOptions = outgoing.options;
    delete outgoing.options;

    return new Promise<void>((resolve, reject) => {
      const onSent = (err: unknown) => {
        if (err) {
          reject(err);
        } else {
          resolve();
        }
      };

      return this.channel.sendToQueue(
        this.queue,
        Buffer.from(JSON.stringify(outgoing)),
        {
          persistent: this.getOptionsProp(
            this.options,
            'persistent',
            RQM_DEFAULT_PERSISTENT,
          ),
          ...recordOptions,
          headers: this.mergeHeaders(recordOptions?.headers),
        },
        onSent,
      );
    });
  }
402

403
  /**
   * Picks the serializer for outgoing packets: the one supplied in the
   * options, or a new `RmqRecordSerializer` when none is given.
   *
   * @param options RMQ transport options.
   */
  protected initializeSerializer(options: RmqOptions['options']) {
    const customSerializer = options?.serializer;
    this.serializer = customSerializer ?? new RmqRecordSerializer();
  }
406

407
  /**
   * Combines the headers configured on the client with per-request headers.
   *
   * @param requestHeaders Headers supplied for a single message.
   * @returns The merged headers, with per-request values overriding the
   * configured ones, or `undefined` when neither source provides headers.
   */
  protected mergeHeaders(
    requestHeaders?: Record<string, string>,
  ): Record<string, string> | undefined {
    const configuredHeaders = this.options?.headers;

    if (requestHeaders || configuredHeaders) {
      return { ...configuredHeaders, ...requestHeaders };
    }
    return undefined;
  }
419

420
  /**
   * Decodes a message body.
   *
   * @param content Raw message body.
   * @returns The JSON-parsed value, or the plain string when the body is not
   * valid JSON.
   */
  protected parseMessageContent(content: Buffer) {
    const text = content.toString();
    let parsed: any;
    try {
      parsed = JSON.parse(text);
    } catch {
      return text;
    }
    return parsed;
  }
428
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc