• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / cdb8c545-83c0-47d9-b846-7251e4e54cdd

06 May 2025 08:39PM UTC coverage: 88.926% (-0.01%) from 88.936%
cdb8c545-83c0-47d9-b846-7251e4e54cdd

Pull #15056

circleci

maxbronnikov10
fix(core): HTTP adapter error mapping
Pull Request #15056: fix(core): HTTP adapter error mapping

2699 of 3412 branches covered (79.1%)

3 of 3 new or added lines in 1 file covered. (100.0%)

41 existing lines in 2 files now uncovered.

7179 of 8073 relevant lines covered (88.93%)

16.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.84
/packages/microservices/client/client-rmq.ts
1
/* eslint-disable @typescript-eslint/no-redundant-type-constituents */
2
import { Logger } from '@nestjs/common/services/logger.service';
1✔
3
import { loadPackage } from '@nestjs/common/utils/load-package.util';
1✔
4
import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util';
1✔
5
import { isFunction, isString } from '@nestjs/common/utils/shared.utils';
1✔
6
import { EventEmitter } from 'events';
1✔
7
import {
1✔
8
  EmptyError,
9
  firstValueFrom,
10
  fromEvent,
11
  merge,
12
  Observable,
13
  ReplaySubject,
14
} from 'rxjs';
15
import { first, map, retryWhen, scan, skip, switchMap } from 'rxjs/operators';
1✔
16
import {
1✔
17
  DISCONNECTED_RMQ_MESSAGE,
18
  RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
19
  RQM_DEFAULT_NO_ASSERT,
20
  RQM_DEFAULT_NOACK,
21
  RQM_DEFAULT_PERSISTENT,
22
  RQM_DEFAULT_PREFETCH_COUNT,
23
  RQM_DEFAULT_QUEUE,
24
  RQM_DEFAULT_QUEUE_OPTIONS,
25
  RQM_DEFAULT_URL,
26
} from '../constants';
27
import { RmqEvents, RmqEventsMap, RmqStatus } from '../events/rmq.events';
28
import { ReadPacket, RmqOptions, WritePacket } from '../interfaces';
29
import { RmqRecord } from '../record-builders';
30
import { RmqRecordSerializer } from '../serializers/rmq-record.serializer';
1✔
31
import { ClientProxy } from './client-proxy';
1✔
32

33
// To enable type safety for RMQ. This can't be uncommented by default
34
// because it would require the user to install the amqplib package even if they don't use RabbitMQ
35
// Otherwise, TypeScript would fail to compile the code.
36
//
37
// type AmqpConnectionManager =
38
//   import('amqp-connection-manager').AmqpConnectionManager;
39
// type ChannelWrapper = import('amqp-connection-manager').ChannelWrapper;
40
// type Channel = import('amqplib').Channel;
41
// type ConsumeMessage = import('amqplib').ConsumeMessage;
42

43
type Channel = any;
44
type ChannelWrapper = any;
45
type ConsumeMessage = any;
46
type AmqpConnectionManager = any;
47

48
let rmqPackage = {} as any; // typeof import('amqp-connection-manager');
1✔
49

50
const REPLY_QUEUE = 'amq.rabbitmq.reply-to';
1✔
51

52
/**
 * Client proxy that talks to a RabbitMQ broker through the
 * `amqp-connection-manager` package (loaded lazily in the constructor).
 *
 * Request/response messages (`publish`) are sent with a generated correlation
 * id and a `replyTo` queue. Replies consumed from the reply queue are routed
 * back to the pending request through an internal `EventEmitter` whose event
 * names are correlation ids. Events (`dispatchEvent`) are sent without
 * registering a reply listener.
 *
 * @publicApi
 */
export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
  protected readonly logger = new Logger(ClientProxy.name);
  /** Replays the most recent value of the connection stream built in `connect()`. */
  protected connection$: ReplaySubject<any>;
  /** Promise returned by `connect()`; replaced on connect/disconnect events. */
  protected connectionPromise: Promise<void>;
  /** Connection manager instance; `null` until `connect()` and after `close()`. */
  protected client: AmqpConnectionManager | null = null;
  /** Channel wrapper created by `createChannel()`; `null` after `close()`. */
  protected channel: ChannelWrapper | null = null;
  /** Listeners registered through `on()` before a client existed. */
  protected pendingEventListeners: Array<{
    event: keyof RmqEvents;
    callback: RmqEvents[keyof RmqEvents];
  }> = [];
  /** `true` until the first CONNECT event has been handled. */
  protected isInitialConnect = true;
  /** Routes consumed replies to pending requests, keyed by correlation id. */
  protected responseEmitter: EventEmitter;
  protected queue: string;
  protected queueOptions: Record<string, any>;
  protected replyQueue: string;
  protected noAssert: boolean;

  /**
   * Resolves queue-related settings from `options` (falling back to the
   * module defaults), loads the `amqplib` and `amqp-connection-manager`
   * packages and initializes the serializer and deserializer.
   *
   * @param options RMQ transport options.
   */
  constructor(protected readonly options: Required<RmqOptions>['options']) {
    super();
    this.queue = this.getOptionsProp(this.options, 'queue', RQM_DEFAULT_QUEUE);
    this.queueOptions = this.getOptionsProp(
      this.options,
      'queueOptions',
      RQM_DEFAULT_QUEUE_OPTIONS,
    );
    this.replyQueue = this.getOptionsProp(
      this.options,
      'replyQueue',
      REPLY_QUEUE,
    );
    // Precedence: top-level option, then the queue options, then the default.
    this.noAssert =
      this.getOptionsProp(this.options, 'noAssert') ??
      this.queueOptions.noAssert ??
      RQM_DEFAULT_NO_ASSERT;

    loadPackage('amqplib', ClientRMQ.name, () => require('amqplib'));
    rmqPackage = loadPackage('amqp-connection-manager', ClientRMQ.name, () =>
      require('amqp-connection-manager'),
    );

    this.initializeSerializer(options);
    this.initializeDeserializer(options);
  }

  /**
   * Closes the channel and the client (when present), clears both references
   * and drops any listeners that were still waiting for a client.
   */
  public close(): void {
    this.channel?.close();
    this.client?.close();
    this.channel = null;
    this.client = null;
    this.pendingEventListeners = [];
  }

  /**
   * Creates the client and wires up the connection stream. When a client
   * already exists, the current connection promise is returned instead.
   *
   * @returns The promise stored in `connectionPromise`.
   */
  public connect(): Promise<any> {
    if (this.client) {
      return this.connectionPromise;
    }
    this.client = this.createClient();

    this.registerErrorListener(this.client);
    this.registerDisconnectListener(this.client);
    this.registerConnectListener(this.client);

    // Attach listeners that were registered through `on()` before the client existed.
    this.pendingEventListeners.forEach(({ event, callback }) =>
      this.client!.on(event, callback),
    );
    this.pendingEventListeners = [];

    this.responseEmitter = new EventEmitter();
    // One listener is added per in-flight request, so lift the listener limit.
    this.responseEmitter.setMaxListeners(0);

    const connect$ = this.connect$(this.client);
    // First of: connected, disconnected, or connect-failed; then create the channel.
    const withDisconnect$ = this.mergeDisconnectEvent(
      this.client,
      connect$,
    ).pipe(switchMap(() => this.createChannel()));

    // CONNECT events after the first one (the first is skipped here).
    const withReconnect$ = fromEvent(this.client, RmqEventsMap.CONNECT).pipe(
      skip(1),
    );
    const source$ = merge(withDisconnect$, withReconnect$);

    this.connection$ = new ReplaySubject(1);
    source$.subscribe(this.connection$);
    this.connectionPromise = this.convertConnectionToPromise();

    return this.connectionPromise;
  }

  /**
   * Creates a channel on the current client and stores it in `channel`.
   *
   * @returns A promise resolved by `setupChannel` once the channel setup finished.
   */
  public createChannel(): Promise<void> {
    return new Promise(resolve => {
      this.channel = this.client!.createChannel({
        json: false,
        setup: (channel: Channel) => this.setupChannel(channel, resolve),
      });
    });
  }

  /**
   * Creates a connection manager from the `urls` option (or the default URL)
   * and the `socketOptions` option.
   */
  public createClient(): AmqpConnectionManager {
    const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
    const urls = this.getOptionsProp(this.options, 'urls') || [RQM_DEFAULT_URL];
    return rmqPackage.connect(urls, socketOptions);
  }

  /**
   * Merges `source$` with streams that error on DISCONNECT and on
   * `connectFailed` events of `instance`, and takes the first emission.
   *
   * A `connectFailed` error is retried unless the failing URL is the last
   * configured URL (or is not among the configured URLs when none are set),
   * in which case the error is rethrown.
   *
   * @param instance Event source (the connection manager).
   * @param source$ Stream to merge the error streams with.
   */
  public mergeDisconnectEvent<T = any>(
    instance: any,
    source$: Observable<T>,
  ): Observable<T> {
    // Turns every occurrence of the given event into a stream error.
    const eventToError = (eventType: string) =>
      fromEvent(instance, eventType).pipe(
        map((err: unknown) => {
          throw err;
        }),
      );
    const disconnect$ = eventToError(RmqEventsMap.DISCONNECT);

    const urls = this.getOptionsProp(this.options, 'urls', []);
    const connectFailedEventKey = 'connectFailed';
    const connectFailed$ = eventToError(connectFailedEventKey).pipe(
      retryWhen(e =>
        e.pipe(
          scan((errorCount, error: any) => {
            if (urls.indexOf(error.url) >= urls.length - 1) {
              throw error;
            }
            return errorCount + 1;
          }, 0),
        ),
      ),
    );
    // If we ever decide to propagate all disconnect errors & re-emit them through
    // the "connection" stream then comment out "first()" operator.
    return merge(source$, disconnect$, connectFailed$).pipe(first());
  }

  /**
   * Resolves with the first value of `connection$`. An `EmptyError` (the
   * stream completed without a value) resolves with `undefined`; any other
   * error is rethrown.
   */
  public async convertConnectionToPromise() {
    try {
      return await firstValueFrom(this.connection$);
    } catch (err) {
      if (err instanceof EmptyError) {
        return;
      }
      throw err;
    }
  }

  /**
   * Channel setup callback: asserts the queue (unless `noAssert`), binds it
   * when both `exchange` and `routingKey` are set, asserts the exchange when
   * `wildcards` is enabled, applies the prefetch settings, starts consuming
   * replies and finally calls `resolve`.
   *
   * @param channel Channel passed to the setup callback.
   * @param resolve Invoked with no arguments once the setup completed.
   */
  public async setupChannel(channel: Channel, resolve: Function) {
    const prefetchCount =
      this.getOptionsProp(this.options, 'prefetchCount') ||
      RQM_DEFAULT_PREFETCH_COUNT;
    const isGlobalPrefetchCount =
      this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
      RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;

    if (!this.noAssert) {
      await channel.assertQueue(this.queue, this.queueOptions);
    }

    if (this.options.exchange && this.options.routingKey) {
      await channel.bindQueue(
        this.queue,
        this.options.exchange,
        this.options.routingKey,
      );
    }

    if (this.options.wildcards) {
      // Fall back to the resolved queue name (not the raw option) so the
      // asserted exchange is the same one `publish`/`dispatchEvent` send to.
      const exchange = this.getOptionsProp(
        this.options,
        'exchange',
        this.queue,
      );
      const exchangeType = this.getOptionsProp(
        this.options,
        'exchangeType',
        'topic',
      );
      await channel.assertExchange(exchange, exchangeType, {
        durable: true,
      });
    }

    await channel.prefetch(prefetchCount, isGlobalPrefetchCount);
    await this.consumeChannel(channel);
    resolve();
  }

  /**
   * Starts consuming the reply queue and re-emits every message on
   * `responseEmitter` under its correlation id.
   *
   * @param channel Channel to consume from.
   */
  public async consumeChannel(channel: Channel) {
    const noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK);
    await channel.consume(
      this.replyQueue,
      (msg: ConsumeMessage | null) => {
        // amqplib calls the consumer with `null` when the broker cancels it;
        // there is nothing to route in that case.
        if (!msg) {
          this.logger.warn('RMQ reply consumer was cancelled by the broker');
          return;
        }
        this.responseEmitter.emit(msg.properties.correlationId, msg);
      },
      {
        noAck,
      },
    );
  }

  /** Logs every ERROR event emitted by `client`. */
  public registerErrorListener(client: AmqpConnectionManager): void {
    client.addListener(RmqEventsMap.ERROR, (err: any) =>
      this.logger.error(err),
    );
  }

  /**
   * On DISCONNECT: publishes the DISCONNECTED status, logs the error and,
   * unless this happens before the first successful connect, replaces
   * `connectionPromise` with a rejected promise.
   */
  public registerDisconnectListener(client: AmqpConnectionManager): void {
    client.addListener(RmqEventsMap.DISCONNECT, (err: any) => {
      this._status$.next(RmqStatus.DISCONNECTED);

      if (!this.isInitialConnect) {
        this.connectionPromise = Promise.reject(
          'Error: Connection lost. Trying to reconnect...',
        );

        // Prevent unhandled promise rejection
        this.connectionPromise.catch(() => {});
      }

      this.logger.error(DISCONNECTED_RMQ_MESSAGE);
      this.logger.error(err);
    });
  }

  /**
   * On CONNECT: publishes the CONNECTED status and logs it. The first
   * connect creates a channel if none exists yet; later connects reset
   * `connectionPromise` to a resolved promise.
   */
  private registerConnectListener(client: AmqpConnectionManager): void {
    client.addListener(RmqEventsMap.CONNECT, () => {
      this._status$.next(RmqStatus.CONNECTED);
      this.logger.log('Successfully connected to RMQ broker');

      if (this.isInitialConnect) {
        this.isInitialConnect = false;

        if (!this.channel) {
          this.connectionPromise = this.createChannel();
        }
      } else {
        this.connectionPromise = Promise.resolve();
      }
    });
  }

  /**
   * Registers an event listener on the client, or queues it until
   * `connect()` creates one.
   *
   * @param event Event name.
   * @param callback Listener for that event.
   */
  public on<
    EventKey extends keyof RmqEvents = keyof RmqEvents,
    EventCallback extends RmqEvents[EventKey] = RmqEvents[EventKey],
  >(event: EventKey, callback: EventCallback) {
    if (this.client) {
      this.client.addListener(event, callback);
    } else {
      this.pendingEventListeners.push({ event, callback });
    }
  }

  /**
   * Returns the underlying connection manager.
   *
   * @throws Error when no client exists (i.e. `connect()` was not called,
   * or `close()` was called since).
   */
  public unwrap<T>(): T {
    if (!this.client) {
      throw new Error(
        'Not initialized. Please call the "connect" method first.',
      );
    }
    return this.client as T;
  }

  public async handleMessage(
    packet: unknown,
    callback: (packet: WritePacket) => any,
  ): Promise<void>;
  public async handleMessage(
    packet: unknown,
    options: Record<string, unknown>,
    callback: (packet: WritePacket) => any,
  ): Promise<void>;
  /**
   * Deserializes an incoming reply and forwards it to `callback`.
   *
   * The callback is invoked exactly once: with `isDisposed: true` when the
   * deserialized packet is disposed or carries an error, and without that
   * flag otherwise.
   *
   * @param packet Raw reply payload.
   * @param options Options passed to the deserializer, or the callback when
   * the two-argument form is used.
   * @param callback Receives the resulting write packet.
   */
  public async handleMessage(
    packet: unknown,
    options:
      | Record<string, unknown>
      | ((packet: WritePacket) => any)
      | undefined,
    callback?: (packet: WritePacket) => any,
  ): Promise<void> {
    if (isFunction(options)) {
      callback = options as (packet: WritePacket) => any;
      options = undefined;
    }

    const { err, response, isDisposed } = await this.deserializer.deserialize(
      packet,
      options,
    );
    if (isDisposed || err) {
      callback?.({
        err,
        response,
        isDisposed: true,
      });
      // Final packet delivered; do not notify the callback a second time.
      return;
    }
    callback?.({
      err,
      response,
    });
  }

  /**
   * Sends a request and registers a listener for its reply.
   *
   * The message is tagged with a generated correlation id, serialized and
   * sent either to the exchange (when `wildcards` is enabled) or directly to
   * the queue. Failures are reported through `callback({ err })`.
   *
   * @param message Packet to send; its `id` is overwritten with the correlation id.
   * @param callback Receives the reply or an error.
   * @returns A teardown function that removes the reply listener.
   */
  protected publish(
    message: ReadPacket,
    callback: (packet: WritePacket) => any,
  ): () => void {
    // Set once the reply listener is registered so that a failure later in
    // this method can unregister it instead of leaking it.
    let removeReplyListener: (() => void) | undefined;
    try {
      const correlationId = randomStringGenerator();
      const listener = ({
        content,
        options,
      }: {
        content: Buffer;
        options: Record<string, unknown>;
      }) =>
        this.handleMessage(
          this.parseMessageContent(content),
          options,
          callback,
        );

      Object.assign(message, { id: correlationId });
      const serializedPacket: ReadPacket & Partial<RmqRecord> =
        this.serializer.serialize(message);

      // Record options travel as AMQP send options, not in the message body.
      const options = serializedPacket.options;
      delete serializedPacket.options;

      this.responseEmitter.on(correlationId, listener);
      removeReplyListener = () =>
        this.responseEmitter.removeListener(correlationId, listener);

      const content = Buffer.from(JSON.stringify(serializedPacket));
      const sendOptions = {
        replyTo: this.replyQueue,
        persistent: this.getOptionsProp(
          this.options,
          'persistent',
          RQM_DEFAULT_PERSISTENT,
        ),
        ...options,
        headers: this.mergeHeaders(options?.headers),
        correlationId,
      };

      if (this.options.wildcards) {
        const stringifiedPattern = isString(message.pattern)
          ? message.pattern
          : JSON.stringify(message.pattern);

        // The exchange is the same as the queue when wildcards are enabled
        // and the exchange is not explicitly set
        const exchange = this.getOptionsProp(
          this.options,
          'exchange',
          this.queue,
        );

        this.channel!.publish(
          exchange,
          stringifiedPattern,
          content,
          sendOptions,
        ).catch(err => callback({ err }));
      } else {
        this.channel!.sendToQueue(this.queue, content, sendOptions).catch(err =>
          callback({ err }),
        );
      }
      return removeReplyListener;
    } catch (err) {
      removeReplyListener?.();
      callback({ err });
      return () => {};
    }
  }

  /**
   * Sends an event packet without registering a reply listener.
   *
   * @param packet Packet to serialize and send.
   * @returns A promise settled by the send callback: rejected with its error
   * argument when one is given, resolved otherwise.
   */
  protected dispatchEvent(packet: ReadPacket): Promise<any> {
    const serializedPacket: ReadPacket & Partial<RmqRecord> =
      this.serializer.serialize(packet);

    // Record options travel as AMQP send options, not in the message body.
    const options = serializedPacket.options;
    delete serializedPacket.options;

    return new Promise<void>((resolve, reject) => {
      const content = Buffer.from(JSON.stringify(serializedPacket));
      const sendOptions = {
        persistent: this.getOptionsProp(
          this.options,
          'persistent',
          RQM_DEFAULT_PERSISTENT,
        ),
        ...options,
        headers: this.mergeHeaders(options?.headers),
      };
      const errorCallback = (err: unknown) =>
        err ? reject(err as Error) : resolve();

      return this.options.wildcards
        ? this.channel!.publish(
            // The exchange is the same as the queue when wildcards are enabled
            // and the exchange is not explicitly set
            this.getOptionsProp(this.options, 'exchange', this.queue),
            isString(packet.pattern)
              ? packet.pattern
              : JSON.stringify(packet.pattern),
            content,
            sendOptions,
            errorCallback,
          )
        : this.channel!.sendToQueue(
            this.queue,
            content,
            sendOptions,
            errorCallback,
          );
    });
  }

  /** Uses the serializer from `options`, or an `RmqRecordSerializer` by default. */
  protected initializeSerializer(options: RmqOptions['options']) {
    this.serializer = options?.serializer ?? new RmqRecordSerializer();
  }

  /**
   * Combines the client-level headers with per-request headers; request
   * headers win on key conflicts.
   *
   * @param requestHeaders Headers supplied for a single message.
   * @returns The merged headers, or `undefined` when neither source is set.
   */
  protected mergeHeaders(
    requestHeaders?: Record<string, string>,
  ): Record<string, string> | undefined {
    if (!requestHeaders && !this.options?.headers) {
      return undefined;
    }

    return {
      ...this.options?.headers,
      ...requestHeaders,
    };
  }

  /**
   * Decodes a message body: returns the parsed JSON, or the raw string when
   * the content is not valid JSON.
   */
  protected parseMessageContent(content: Buffer) {
    const rawContent = content.toString();
    try {
      return JSON.parse(rawContent);
    } catch {
      return rawContent;
    }
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc