• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / f21074a3-8fcf-4ec1-a93c-75675d264f01

22 Oct 2025 12:34AM UTC coverage: 88.726% (-0.2%) from 88.888%
f21074a3-8fcf-4ec1-a93c-75675d264f01

Pull #15815

circleci

mag123c
test(core): add nested transient isolation integration test
Pull Request #15815: fix(core): ensure nested transient provider isolation

2742 of 3477 branches covered (78.86%)

7 of 7 new or added lines in 1 file covered. (100.0%)

52 existing lines in 5 files now uncovered.

7280 of 8205 relevant lines covered (88.73%)

17.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.84
/packages/microservices/client/client-rmq.ts
1
/* eslint-disable @typescript-eslint/no-redundant-type-constituents */
2
import { Logger } from '@nestjs/common/services/logger.service';
1✔
3
import { loadPackage } from '@nestjs/common/utils/load-package.util';
1✔
4
import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util';
1✔
5
import { isFunction, isString } from '@nestjs/common/utils/shared.utils';
1✔
6
import { EventEmitter } from 'events';
1✔
7
import {
1✔
8
  EmptyError,
9
  firstValueFrom,
10
  fromEvent,
11
  merge,
12
  Observable,
13
  ReplaySubject,
14
} from 'rxjs';
15
import { first, map, retryWhen, scan, skip, switchMap } from 'rxjs/operators';
1✔
16
import {
1✔
17
  DISCONNECTED_RMQ_MESSAGE,
18
  RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
19
  RQM_DEFAULT_NO_ASSERT,
20
  RQM_DEFAULT_NOACK,
21
  RQM_DEFAULT_PERSISTENT,
22
  RQM_DEFAULT_PREFETCH_COUNT,
23
  RQM_DEFAULT_QUEUE,
24
  RQM_DEFAULT_QUEUE_OPTIONS,
25
  RQM_DEFAULT_URL,
26
} from '../constants';
27
import { RmqEvents, RmqEventsMap, RmqStatus } from '../events/rmq.events';
28
import { ReadPacket, RmqOptions, WritePacket } from '../interfaces';
29
import { RmqRecord } from '../record-builders';
30
import { RmqRecordSerializer } from '../serializers/rmq-record.serializer';
1✔
31
import { ClientProxy } from './client-proxy';
1✔
32

33
// To enable type safety for RMQ. This cant be uncommented by default
34
// because it would require the user to install the amqplib package even if they dont use RabbitMQ
35
// Otherwise, TypeScript would fail to compile the code.
36
//
37
// type AmqpConnectionManager =
38
//   import('amqp-connection-manager').AmqpConnectionManager;
39
// type ChannelWrapper = import('amqp-connection-manager').ChannelWrapper;
40
// type Channel = import('amqplib').Channel;
41
// type ConsumeMessage = import('amqplib').ConsumeMessage;
42

43
type Channel = any;
44
type ChannelWrapper = any;
45
type ConsumeMessage = any;
46
type AmqpConnectionManager = any;
47

48
let rmqPackage = {} as any; // typeof import('amqp-connection-manager');
1✔
49

50
const REPLY_QUEUE = 'amq.rabbitmq.reply-to';
1✔
51

52
/**
53
 * @publicApi
54
 */
55
export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
1✔
56
  protected readonly logger = new Logger(ClientProxy.name);
20✔
57
  protected connection$: ReplaySubject<any>;
58
  protected connectionPromise: Promise<void>;
59
  protected client: AmqpConnectionManager | null = null;
20✔
60
  protected channel: ChannelWrapper | null = null;
20✔
61
  protected pendingEventListeners: Array<{
20✔
62
    event: keyof RmqEvents;
63
    callback: RmqEvents[keyof RmqEvents];
64
  }> = [];
65
  protected isInitialConnect = true;
20✔
66
  protected responseEmitter: EventEmitter;
67
  protected queue: string;
68
  protected queueOptions: Record<string, any>;
69
  protected replyQueue: string;
70
  protected noAssert: boolean;
71

72
  constructor(protected readonly options: Required<RmqOptions>['options']) {
20✔
73
    super();
20✔
74
    this.queue = this.getOptionsProp(this.options, 'queue', RQM_DEFAULT_QUEUE);
20✔
75
    this.queueOptions = this.getOptionsProp(
20✔
76
      this.options,
77
      'queueOptions',
78
      RQM_DEFAULT_QUEUE_OPTIONS,
79
    );
80
    this.replyQueue = this.getOptionsProp(
20✔
81
      this.options,
82
      'replyQueue',
83
      REPLY_QUEUE,
84
    );
85
    this.noAssert =
20✔
86
      this.getOptionsProp(this.options, 'noAssert') ??
60✔
87
      this.queueOptions.noAssert ??
88
      RQM_DEFAULT_NO_ASSERT;
89

90
    loadPackage('amqplib', ClientRMQ.name, () => require('amqplib'));
20✔
91
    rmqPackage = loadPackage('amqp-connection-manager', ClientRMQ.name, () =>
20✔
92
      require('amqp-connection-manager'),
20✔
93
    );
94

95
    this.initializeSerializer(options);
20✔
96
    this.initializeDeserializer(options);
20✔
97
  }
98

99
  public async close(): Promise<void> {
100
    this.channel && (await this.channel.close());
2✔
101
    this.client && (await this.client.close());
2✔
102
    this.channel = null;
2✔
103
    this.client = null;
2✔
104
    this.pendingEventListeners = [];
2✔
105
  }
106

107
  public connect(): Promise<any> {
108
    if (this.client) {
3!
109
      return this.connectionPromise;
×
110
    }
111
    this.client = this.createClient();
3✔
112

113
    this.registerErrorListener(this.client);
3✔
114
    this.registerDisconnectListener(this.client);
3✔
115
    this.registerConnectListener(this.client);
3✔
116
    this.pendingEventListeners.forEach(({ event, callback }) =>
3✔
117
      this.client!.on(event, callback),
×
118
    );
119
    this.pendingEventListeners = [];
3✔
120

121
    this.responseEmitter = new EventEmitter();
3✔
122
    this.responseEmitter.setMaxListeners(0);
3✔
123

124
    const connect$ = this.connect$(this.client);
3✔
125
    const withDisconnect$ = this.mergeDisconnectEvent(
3✔
126
      this.client,
127
      connect$,
128
    ).pipe(switchMap(() => this.createChannel()));
×
129

130
    const withReconnect$ = fromEvent(this.client, RmqEventsMap.CONNECT).pipe(
3✔
131
      skip(1),
132
    );
133
    const source$ = merge(withDisconnect$, withReconnect$);
3✔
134

135
    this.connection$ = new ReplaySubject(1);
3✔
136
    source$.subscribe(this.connection$);
3✔
137
    this.connectionPromise = this.convertConnectionToPromise();
3✔
138

139
    return this.connectionPromise;
3✔
140
  }
141

142
  public createChannel(): Promise<void> {
143
    return new Promise(resolve => {
2✔
144
      this.channel = this.client!.createChannel({
2✔
145
        json: false,
146
        setup: (channel: Channel) => this.setupChannel(channel, resolve),
2✔
147
      });
148
    });
149
  }
150

151
  public createClient(): AmqpConnectionManager {
152
    const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
×
153
    const urls = this.getOptionsProp(this.options, 'urls') || [RQM_DEFAULT_URL];
×
154
    return rmqPackage.connect(urls, socketOptions);
×
155
  }
156

157
  public mergeDisconnectEvent<T = any>(
158
    instance: any,
159
    source$: Observable<T>,
160
  ): Observable<T> {
161
    const eventToError = (eventType: string) =>
×
162
      fromEvent(instance, eventType).pipe(
×
163
        map((err: unknown) => {
164
          throw err;
×
165
        }),
166
      );
167
    const disconnect$ = eventToError(RmqEventsMap.DISCONNECT);
×
168

169
    const urls = this.getOptionsProp(this.options, 'urls', []);
×
170
    const connectFailedEventKey = 'connectFailed';
×
171
    const connectFailed$ = eventToError(connectFailedEventKey).pipe(
×
172
      retryWhen(e =>
173
        e.pipe(
×
174
          scan((errorCount, error: any) => {
175
            if (urls.indexOf(error.url) >= urls.length - 1) {
×
176
              throw error;
×
177
            }
178
            return errorCount + 1;
×
179
          }, 0),
180
        ),
181
      ),
182
    );
183
    // If we ever decide to propagate all disconnect errors & re-emit them through
184
    // the "connection" stream then comment out "first()" operator.
185
    return merge(source$, disconnect$, connectFailed$).pipe(first());
×
186
  }
187

188
  public async convertConnectionToPromise() {
189
    try {
3✔
190
      return await firstValueFrom(this.connection$);
3✔
191
    } catch (err) {
192
      if (err instanceof EmptyError) {
3!
193
        return;
×
194
      }
195
      throw err;
3✔
196
    }
197
  }
198

199
  public async setupChannel(channel: Channel, resolve: Function) {
200
    const prefetchCount =
201
      this.getOptionsProp(this.options, 'prefetchCount') ||
5!
202
      RQM_DEFAULT_PREFETCH_COUNT;
203
    const isGlobalPrefetchCount =
204
      this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
5!
205
      RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
206

207
    if (!this.noAssert) {
5✔
208
      await channel.assertQueue(this.queue, this.queueOptions);
1✔
209
    }
210

211
    if (this.options.exchange && this.options.routingKey) {
5!
UNCOV
212
      await channel.bindQueue(
×
213
        this.queue,
214
        this.options.exchange,
215
        this.options.routingKey,
216
      );
217
    }
218

219
    if (this.options.wildcards) {
5!
UNCOV
220
      const exchange = this.getOptionsProp(
×
221
        this.options,
222
        'exchange',
223
        this.options.queue,
224
      );
UNCOV
225
      const exchangeType = this.getOptionsProp(
×
226
        this.options,
227
        'exchangeType',
228
        'topic',
229
      );
UNCOV
230
      await channel.assertExchange(exchange, exchangeType, {
×
231
        durable: true,
232
        arguments: this.getOptionsProp(this.options, 'exchangeArguments', {}),
233
      });
234
    }
235

236
    await channel.prefetch(prefetchCount, isGlobalPrefetchCount);
5✔
237
    await this.consumeChannel(channel);
5✔
238
    resolve();
5✔
239
  }
240

241
  public async consumeChannel(channel: Channel) {
242
    const noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK);
1✔
243
    await channel.consume(
1✔
244
      this.replyQueue,
245
      (msg: ConsumeMessage | null) =>
246
        this.responseEmitter.emit(msg!.properties.correlationId, msg),
1✔
247
      {
248
        noAck,
249
      },
250
    );
251
  }
252

253
  public registerErrorListener(client: AmqpConnectionManager): void {
254
    client.addListener(RmqEventsMap.ERROR, (err: any) =>
3✔
255
      this.logger.error(err),
×
256
    );
257
  }
258

259
  public registerDisconnectListener(client: AmqpConnectionManager): void {
260
    client.addListener(RmqEventsMap.DISCONNECT, (err: any) => {
3✔
261
      this._status$.next(RmqStatus.DISCONNECTED);
×
262

263
      if (!this.isInitialConnect) {
×
264
        this.connectionPromise = Promise.reject(
×
265
          'Error: Connection lost. Trying to reconnect...',
266
        );
267

268
        // Prevent unhandled promise rejection
269
        this.connectionPromise.catch(() => {});
×
270
      }
271

272
      this.logger.error(DISCONNECTED_RMQ_MESSAGE);
×
273
      this.logger.error(err);
×
274
    });
275
  }
276

277
  private registerConnectListener(client: AmqpConnectionManager): void {
278
    client.addListener(RmqEventsMap.CONNECT, () => {
3✔
279
      this._status$.next(RmqStatus.CONNECTED);
×
280
      this.logger.log('Successfully connected to RMQ broker');
×
281

282
      if (this.isInitialConnect) {
×
283
        this.isInitialConnect = false;
×
284

285
        if (!this.channel) {
×
286
          this.connectionPromise = this.createChannel();
×
287
        }
288
      } else {
289
        this.connectionPromise = Promise.resolve();
×
290
      }
291
    });
292
  }
293

294
  public on<
295
    EventKey extends keyof RmqEvents = keyof RmqEvents,
296
    EventCallback extends RmqEvents[EventKey] = RmqEvents[EventKey],
297
  >(event: EventKey, callback: EventCallback) {
298
    if (this.client) {
×
299
      this.client.addListener(event, callback);
×
300
    } else {
301
      this.pendingEventListeners.push({ event, callback });
×
302
    }
303
  }
304

305
  public unwrap<T>(): T {
306
    if (!this.client) {
×
307
      throw new Error(
×
308
        'Not initialized. Please call the "connect" method first.',
309
      );
310
    }
311
    return this.client as T;
×
312
  }
313

314
  public async handleMessage(
315
    packet: unknown,
316
    callback: (packet: WritePacket) => any,
317
  ): Promise<void>;
318
  public async handleMessage(
319
    packet: unknown,
320
    options: Record<string, unknown>,
321
    callback: (packet: WritePacket) => any,
322
  ): Promise<void>;
323
  public async handleMessage(
324
    packet: unknown,
325
    options:
326
      | Record<string, unknown>
327
      | ((packet: WritePacket) => any)
328
      | undefined,
329
    callback?: (packet: WritePacket) => any,
330
  ): Promise<void> {
331
    if (isFunction(options)) {
3!
332
      callback = options as (packet: WritePacket) => any;
3✔
333
      options = undefined;
3✔
334
    }
335

336
    const { err, response, isDisposed } = await this.deserializer.deserialize(
3✔
337
      packet,
338
      options,
339
    );
340
    if (isDisposed || err) {
3✔
341
      callback?.({
2✔
342
        err,
343
        response,
344
        isDisposed: true,
345
      });
346
    }
347
    callback?.({
3✔
348
      err,
349
      response,
350
    });
351
  }
352

353
  protected publish(
354
    message: ReadPacket,
355
    callback: (packet: WritePacket) => any,
356
  ): () => void {
357
    try {
7✔
358
      const correlationId = randomStringGenerator();
7✔
359
      const listener = ({
7✔
360
        content,
361
        options,
362
      }: {
363
        content: Buffer;
364
        options: Record<string, unknown>;
365
      }) =>
366
        this.handleMessage(
×
367
          this.parseMessageContent(content),
368
          options,
369
          callback,
370
        );
371

372
      Object.assign(message, { id: correlationId });
7✔
373
      const serializedPacket: ReadPacket & Partial<RmqRecord> =
374
        this.serializer.serialize(message);
7✔
375

376
      const options = serializedPacket.options;
7✔
377
      delete serializedPacket.options;
7✔
378

379
      this.responseEmitter.on(correlationId, listener);
7✔
380

381
      const content = Buffer.from(JSON.stringify(serializedPacket));
7✔
382
      const sendOptions = {
7✔
383
        replyTo: this.replyQueue,
384
        persistent: this.getOptionsProp(
385
          this.options,
386
          'persistent',
387
          RQM_DEFAULT_PERSISTENT,
388
        ),
389
        ...options,
390
        headers: this.mergeHeaders(options?.headers),
391
        correlationId,
392
      };
393

394
      if (this.options.wildcards) {
7!
UNCOV
395
        const stringifiedPattern = isString(message.pattern)
×
396
          ? message.pattern
×
397
          : JSON.stringify(message.pattern);
398

399
        // The exchange is the same as the queue when wildcards are enabled
400
        // and the exchange is not explicitly set
UNCOV
401
        const exchange = this.getOptionsProp(
×
402
          this.options,
403
          'exchange',
404
          this.queue,
405
        );
406

UNCOV
407
        this.channel!.publish(
×
408
          exchange,
409
          stringifiedPattern,
410
          content,
411
          sendOptions,
412
        ).catch(err => callback({ err }));
×
413
      } else {
414
        this.channel!.sendToQueue(this.queue, content, sendOptions).catch(err =>
7✔
415
          callback({ err }),
×
416
        );
417
      }
418
      return () => this.responseEmitter.removeListener(correlationId, listener);
7✔
419
    } catch (err) {
420
      callback({ err });
×
421
      return () => {};
×
422
    }
423
  }
424

425
  protected dispatchEvent(packet: ReadPacket): Promise<any> {
426
    const serializedPacket: ReadPacket & Partial<RmqRecord> =
427
      this.serializer.serialize(packet);
6✔
428

429
    const options = serializedPacket.options;
6✔
430
    delete serializedPacket.options;
6✔
431

432
    return new Promise<void>((resolve, reject) => {
6✔
433
      const content = Buffer.from(JSON.stringify(serializedPacket));
6✔
434
      const sendOptions = {
6✔
435
        persistent: this.getOptionsProp(
436
          this.options,
437
          'persistent',
438
          RQM_DEFAULT_PERSISTENT,
439
        ),
440
        ...options,
441
        headers: this.mergeHeaders(options?.headers),
442
      };
443
      const errorCallback = (err: unknown) =>
6✔
444
        err ? reject(err as Error) : resolve();
6✔
445

446
      return this.options.wildcards
6✔
447
        ? this.channel!.publish(
6!
448
            // The exchange is the same as the queue when wildcards are enabled
449
            // and the exchange is not explicitly set
450
            this.getOptionsProp(this.options, 'exchange', this.queue),
451
            isString(packet.pattern)
452
              ? packet.pattern
×
453
              : JSON.stringify(packet.pattern),
454
            content,
455
            sendOptions,
456
            errorCallback,
457
          )
458
        : this.channel!.sendToQueue(
459
            this.queue,
460
            content,
461
            sendOptions,
462
            errorCallback,
463
          );
464
    });
465
  }
466

467
  protected initializeSerializer(options: RmqOptions['options']) {
468
    this.serializer = options?.serializer ?? new RmqRecordSerializer();
20✔
469
  }
470

471
  protected mergeHeaders(
472
    requestHeaders?: Record<string, string>,
473
  ): Record<string, string> | undefined {
474
    if (!requestHeaders && !this.options?.headers) {
13✔
475
      return undefined;
7✔
476
    }
477

478
    return {
6✔
479
      ...this.options?.headers,
480
      ...requestHeaders,
481
    };
482
  }
483

484
  protected parseMessageContent(content: Buffer) {
485
    const rawContent = content.toString();
×
486
    try {
×
487
      return JSON.parse(rawContent);
×
488
    } catch {
489
      return rawContent;
×
490
    }
491
  }
492
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc