• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / f21074a3-8fcf-4ec1-a93c-75675d264f01

22 Oct 2025 12:34AM UTC coverage: 88.726% (-0.2%) from 88.888%
f21074a3-8fcf-4ec1-a93c-75675d264f01

Pull #15815

circleci

mag123c
test(core): add nested transient isolation integration test
Pull Request #15815: fix(core): ensure nested transient provider isolation

2742 of 3477 branches covered (78.86%)

7 of 7 new or added lines in 1 file covered. (100.0%)

52 existing lines in 5 files now uncovered.

7280 of 8205 relevant lines covered (88.73%)

17.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.9
/packages/microservices/server/server-rmq.ts
1
/* eslint-disable @typescript-eslint/no-redundant-type-constituents */
2
import {
1✔
3
  isNil,
4
  isString,
5
  isUndefined,
6
} from '@nestjs/common/utils/shared.utils';
7
import {
1✔
8
  CONNECTION_FAILED_MESSAGE,
9
  DISCONNECTED_RMQ_MESSAGE,
10
  NO_MESSAGE_HANDLER,
11
  RMQ_SEPARATOR,
12
  RMQ_WILDCARD_ALL,
13
  RMQ_WILDCARD_SINGLE,
14
  RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
15
  RQM_DEFAULT_NOACK,
16
  RQM_DEFAULT_NO_ASSERT,
17
  RQM_DEFAULT_PREFETCH_COUNT,
18
  RQM_DEFAULT_QUEUE,
19
  RQM_DEFAULT_QUEUE_OPTIONS,
20
  RQM_DEFAULT_URL,
21
  RQM_NO_EVENT_HANDLER,
22
  RQM_NO_MESSAGE_HANDLER,
23
} from '../constants';
24
import { RmqContext } from '../ctx-host';
1✔
25
import { Transport } from '../enums';
1✔
26
import { RmqEvents, RmqEventsMap, RmqStatus } from '../events/rmq.events';
27
import { RmqUrl } from '../external/rmq-url.interface';
28
import { MessageHandler, RmqOptions, TransportId } from '../interfaces';
29
import {
30
  IncomingRequest,
31
  OutgoingResponse,
32
  ReadPacket,
33
} from '../interfaces/packet.interface';
34
import { RmqRecordSerializer } from '../serializers/rmq-record.serializer';
1✔
35
import { Server } from './server';
1✔
36

37
// To enable type safety for RMQ. This cant be uncommented by default
38
// because it would require the user to install the amqplib package even if they dont use RabbitMQ
39
// Otherwise, TypeScript would fail to compile the code.
40
//
41
// type AmqpConnectionManager =
42
//   import('amqp-connection-manager').AmqpConnectionManager;
43
// type ChannelWrapper = import('amqp-connection-manager').ChannelWrapper;
44
// type Message = import('amqplib').Message;
45
// type Channel = import('amqplib').Channel | import('amqplib').ConfirmChannel;
46

47
type AmqpConnectionManager = any;
48
type ChannelWrapper = any;
49
type Message = any;
50
type Channel = any;
51

52
let rmqPackage = {} as any; // as typeof import('amqp-connection-manager');
1✔
53

54
const INFINITE_CONNECTION_ATTEMPTS = -1;
1✔
55

56
/**
57
 * @publicApi
58
 */
59
export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
1✔
60
  public transportId: TransportId = Transport.RMQ;
37✔
61

62
  protected server: AmqpConnectionManager | null = null;
37✔
63
  protected channel: ChannelWrapper | null = null;
37✔
64
  protected connectionAttempts = 0;
37✔
65
  protected readonly urls: string[] | RmqUrl[];
66
  protected readonly queue: string;
67
  protected readonly noAck: boolean;
68
  protected readonly queueOptions: any;
69
  protected readonly wildcardHandlers = new Map<string, MessageHandler>();
37✔
70
  protected pendingEventListeners: Array<{
37✔
71
    event: keyof RmqEvents;
72
    callback: RmqEvents[keyof RmqEvents];
73
  }> = [];
74

75
  constructor(protected readonly options: Required<RmqOptions>['options']) {
37✔
76
    super();
37✔
77
    this.urls = this.getOptionsProp(this.options, 'urls') || [RQM_DEFAULT_URL];
37✔
78
    this.queue =
37✔
79
      this.getOptionsProp(this.options, 'queue') || RQM_DEFAULT_QUEUE;
74✔
80
    this.noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK);
37✔
81
    this.queueOptions =
37✔
82
      this.getOptionsProp(this.options, 'queueOptions') ||
74✔
83
      RQM_DEFAULT_QUEUE_OPTIONS;
84

85
    this.loadPackage('amqplib', ServerRMQ.name, () => require('amqplib'));
37✔
86
    rmqPackage = this.loadPackage(
37✔
87
      'amqp-connection-manager',
88
      ServerRMQ.name,
89
      () => require('amqp-connection-manager'),
37✔
90
    );
91

92
    this.initializeSerializer(options);
37✔
93
    this.initializeDeserializer(options);
37✔
94
  }
95

96
  public async listen(
97
    callback: (err?: unknown, ...optionalParams: unknown[]) => void,
98
  ): Promise<void> {
99
    try {
5✔
100
      await this.start(callback);
5✔
101
    } catch (err) {
102
      callback(err);
1✔
103
    }
104
  }
105

106
  public async close(): Promise<void> {
107
    this.channel && (await this.channel.close());
2✔
108
    this.server && (await this.server.close());
2✔
109
    this.pendingEventListeners = [];
2✔
110
  }
111

112
  public async start(
113
    callback?: (err?: unknown, ...optionalParams: unknown[]) => void,
114
  ) {
115
    this.server = this.createClient();
4✔
116
    this.server!.once(RmqEventsMap.CONNECT, () => {
4✔
117
      if (this.channel) {
4!
118
        return;
×
119
      }
120
      this._status$.next(RmqStatus.CONNECTED);
4✔
121
      this.channel = this.server!.createChannel({
4✔
122
        json: false,
123
        setup: (channel: Channel) => this.setupChannel(channel, callback!),
4✔
124
      });
125
    });
126

127
    const maxConnectionAttempts = this.getOptionsProp(
4✔
128
      this.options,
129
      'maxConnectionAttempts',
130
      INFINITE_CONNECTION_ATTEMPTS,
131
    );
132

133
    this.registerConnectListener();
4✔
134
    this.registerDisconnectListener();
4✔
135
    this.pendingEventListeners.forEach(({ event, callback }) =>
4✔
136
      this.server!.on(event, callback),
×
137
    );
138
    this.pendingEventListeners = [];
4✔
139

140
    const connectFailedEvent = 'connectFailed';
4✔
141
    this.server!.once(
4✔
142
      connectFailedEvent,
143
      async (error: Record<string, unknown>) => {
144
        this._status$.next(RmqStatus.DISCONNECTED);
×
145

146
        this.logger.error(CONNECTION_FAILED_MESSAGE);
×
147
        if (error?.err) {
×
148
          this.logger.error(error.err);
×
149
        }
150
        const isReconnecting = !!this.channel;
×
151
        if (
×
152
          maxConnectionAttempts === INFINITE_CONNECTION_ATTEMPTS ||
×
153
          isReconnecting
154
        ) {
155
          return;
×
156
        }
157
        if (++this.connectionAttempts === maxConnectionAttempts) {
×
158
          await this.close();
×
159
          callback?.(error.err ?? new Error(CONNECTION_FAILED_MESSAGE));
×
160
        }
161
      },
162
    );
163
  }
164

165
  public createClient<T = any>(): T {
166
    const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
×
167
    return rmqPackage.connect(this.urls, {
×
168
      connectionOptions: socketOptions?.connectionOptions,
169
      heartbeatIntervalInSeconds: socketOptions?.heartbeatIntervalInSeconds,
170
      reconnectTimeInSeconds: socketOptions?.reconnectTimeInSeconds,
171
    });
172
  }
173

174
  private registerConnectListener() {
175
    this.server!.on(RmqEventsMap.CONNECT, (err: any) => {
4✔
176
      this._status$.next(RmqStatus.CONNECTED);
4✔
177
    });
178
  }
179

180
  private registerDisconnectListener() {
181
    this.server!.on(RmqEventsMap.DISCONNECT, (err: any) => {
4✔
182
      this._status$.next(RmqStatus.DISCONNECTED);
×
183
      this.logger.error(DISCONNECTED_RMQ_MESSAGE);
×
184
      this.logger.error(err);
×
185
    });
186
  }
187

188
  public async setupChannel(channel: Channel, callback: Function) {
189
    const noAssert =
190
      this.getOptionsProp(this.options, 'noAssert') ??
5✔
191
      this.queueOptions.noAssert ??
192
      RQM_DEFAULT_NO_ASSERT;
193

194
    if (!noAssert) {
5✔
195
      await channel.assertQueue(this.queue, this.queueOptions);
4✔
196
    }
197

198
    const isGlobalPrefetchCount = this.getOptionsProp(
5✔
199
      this.options,
200
      'isGlobalPrefetchCount',
201
      RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
202
    );
203
    const prefetchCount = this.getOptionsProp(
5✔
204
      this.options,
205
      'prefetchCount',
206
      RQM_DEFAULT_PREFETCH_COUNT,
207
    );
208

209
    if (this.options.exchange || this.options.wildcards) {
5!
210
      // Use queue name as exchange name if exchange is not provided and "wildcards" is set to true
UNCOV
211
      const exchange = this.getOptionsProp(
×
212
        this.options,
213
        'exchange',
214
        this.options.queue,
215
      );
UNCOV
216
      const exchangeType = this.getOptionsProp(
×
217
        this.options,
218
        'exchangeType',
219
        'topic',
220
      );
UNCOV
221
      await channel.assertExchange(exchange, exchangeType, {
×
222
        durable: true,
223
        arguments: this.getOptionsProp(this.options, 'exchangeArguments', {}),
224
      });
225

UNCOV
226
      if (this.options.routingKey) {
×
UNCOV
227
        await channel.bindQueue(this.queue, exchange, this.options.routingKey);
×
228
      }
229

UNCOV
230
      if (this.options.wildcards) {
×
UNCOV
231
        const routingKeys = Array.from(this.getHandlers().keys());
×
UNCOV
232
        await Promise.all(
×
233
          routingKeys.map(routingKey =>
UNCOV
234
            channel.bindQueue(this.queue, exchange, routingKey),
×
235
          ),
236
        );
237

238
        // When "wildcards" is set to true,  we need to initialize wildcard handlers
239
        // otherwise we would not be able to associate the incoming messages with the handlers
UNCOV
240
        this.initializeWildcardHandlersIfExist();
×
241
      }
242
    }
243

244
    await channel.prefetch(prefetchCount, isGlobalPrefetchCount);
5✔
245
    channel.consume(
5✔
246
      this.queue,
UNCOV
247
      (msg: Record<string, any> | null) => this.handleMessage(msg!, channel),
×
248
      {
249
        noAck: this.noAck,
250
        consumerTag: this.getOptionsProp(
251
          this.options,
252
          'consumerTag',
253
          undefined,
254
        ),
255
      },
256
    );
257
    callback();
5✔
258
  }
259

260
  public async handleMessage(
261
    message: Record<string, any>,
262
    channel: any,
263
  ): Promise<void> {
264
    if (isNil(message)) {
6!
UNCOV
265
      return;
×
266
    }
267
    const { content, properties } = message;
6✔
268
    const rawMessage = this.parseMessageContent(content);
6✔
269
    const packet = await this.deserializer.deserialize(rawMessage, properties);
6✔
270
    const pattern = isString(packet.pattern)
6✔
271
      ? packet.pattern
6✔
272
      : JSON.stringify(packet.pattern);
273

274
    const rmqContext = new RmqContext([message, channel, pattern]);
6✔
275
    if (isUndefined((packet as IncomingRequest).id)) {
6✔
276
      return this.handleEvent(pattern, packet, rmqContext);
2✔
277
    }
278
    const handler = this.getHandlerByPattern(pattern);
4✔
279

280
    if (!handler) {
4✔
281
      if (!this.noAck) {
3✔
282
        this.logger.warn(RQM_NO_MESSAGE_HANDLER`${pattern}`);
1✔
283
        this.channel!.nack(rmqContext.getMessage() as Message, false, false);
1✔
284
      }
285
      const status = 'error';
3✔
286
      const noHandlerPacket = {
3✔
287
        id: (packet as IncomingRequest).id,
288
        err: NO_MESSAGE_HANDLER,
289
        status,
290
      };
291
      return this.sendMessage(
3✔
292
        noHandlerPacket,
293
        properties.replyTo,
294
        properties.correlationId,
295
        rmqContext,
296
      );
297
    }
298
    return this.onProcessingStartHook(
1✔
299
      this.transportId,
300
      rmqContext,
301
      async () => {
302
        const response$ = this.transformToObservable(
1✔
303
          await handler(packet.data, rmqContext),
304
        );
305

306
        const publish = <T>(data: T) =>
1✔
307
          this.sendMessage(
1✔
308
            data,
309
            properties.replyTo,
310
            properties.correlationId,
311
            rmqContext,
312
          );
313

314
        response$ && this.send(response$, publish);
1✔
315
      },
316
    );
317
  }
318

319
  public async handleEvent(
320
    pattern: string,
321
    packet: ReadPacket,
322
    context: RmqContext,
323
  ): Promise<any> {
324
    const handler = this.getHandlerByPattern(pattern);
5✔
325
    if (!handler && !this.noAck) {
5✔
326
      this.channel!.nack(context.getMessage() as Message, false, false);
1✔
327
      return this.logger.warn(RQM_NO_EVENT_HANDLER`${pattern}`);
1✔
328
    }
329
    return super.handleEvent(pattern, packet, context);
4✔
330
  }
331

332
  public sendMessage<T = any>(
333
    message: T,
334
    replyTo: any,
335
    correlationId: string,
336
    context: RmqContext,
337
  ): void {
338
    const outgoingResponse = this.serializer.serialize(
1✔
339
      message as unknown as OutgoingResponse,
340
    );
341
    const options = outgoingResponse.options;
1✔
342
    delete outgoingResponse.options;
1✔
343

344
    const buffer = Buffer.from(JSON.stringify(outgoingResponse));
1✔
345
    const sendOptions = { correlationId, ...options };
1✔
346

347
    this.onProcessingEndHook?.(this.transportId, context);
1✔
348
    this.channel!.sendToQueue(replyTo, buffer, sendOptions);
1✔
349
  }
350

351
  public unwrap<T>(): T {
UNCOV
352
    if (!this.server) {
×
UNCOV
353
      throw new Error(
×
354
        'Not initialized. Please call the "listen"/"startAllMicroservices" method before accessing the server.',
355
      );
356
    }
UNCOV
357
    return this.server as T;
×
358
  }
359

360
  public on<
361
    EventKey extends keyof RmqEvents = keyof RmqEvents,
362
    EventCallback extends RmqEvents[EventKey] = RmqEvents[EventKey],
363
  >(event: EventKey, callback: EventCallback) {
364
    if (this.server) {
×
365
      this.server.addListener(event, callback);
×
366
    } else {
UNCOV
367
      this.pendingEventListeners.push({ event, callback });
×
368
    }
369
  }
370

371
  public getHandlerByPattern(pattern: string): MessageHandler | null {
372
    if (!this.options.wildcards) {
13!
373
      return super.getHandlerByPattern(pattern);
13✔
374
    }
375

376
    // Search for non-wildcard handler first
377
    const handler = super.getHandlerByPattern(pattern);
×
UNCOV
378
    if (handler) {
×
379
      return handler;
×
380
    }
381

382
    // Search for wildcard handler
UNCOV
383
    if (this.wildcardHandlers.size === 0) {
×
UNCOV
384
      return null;
×
385
    }
UNCOV
386
    for (const [wildcardPattern, handler] of this.wildcardHandlers) {
×
UNCOV
387
      if (this.matchRmqPattern(wildcardPattern, pattern)) {
×
UNCOV
388
        return handler;
×
389
      }
390
    }
391
    return null;
×
392
  }
393

394
  protected initializeSerializer(options: RmqOptions['options']) {
395
    this.serializer = options?.serializer ?? new RmqRecordSerializer();
37✔
396
  }
397

398
  private parseMessageContent(content: Buffer) {
399
    try {
6✔
400
      return JSON.parse(content.toString());
6✔
401
    } catch {
402
      return content.toString();
1✔
403
    }
404
  }
405

406
  private initializeWildcardHandlersIfExist() {
UNCOV
407
    if (this.wildcardHandlers.size !== 0) {
×
UNCOV
408
      return;
×
409
    }
UNCOV
410
    const handlers = this.getHandlers();
×
411

UNCOV
412
    handlers.forEach((handler, pattern) => {
×
UNCOV
413
      if (
×
414
        pattern.includes(RMQ_WILDCARD_ALL) ||
×
415
        pattern.includes(RMQ_WILDCARD_SINGLE)
416
      ) {
UNCOV
417
        this.wildcardHandlers.set(pattern, handler);
×
418
      }
419
    });
420
  }
421

422
  private matchRmqPattern(pattern: string, routingKey: string): boolean {
423
    if (!routingKey) {
37✔
424
      return pattern === RMQ_WILDCARD_ALL;
3✔
425
    }
426

427
    const patternSegments = pattern.split(RMQ_SEPARATOR);
34✔
428
    const routingKeySegments = routingKey.split(RMQ_SEPARATOR);
34✔
429

430
    const patternSegmentsLength = patternSegments.length;
34✔
431
    const routingKeySegmentsLength = routingKeySegments.length;
34✔
432
    const lastIndex = patternSegmentsLength - 1;
34✔
433

434
    for (const [i, currentPattern] of patternSegments.entries()) {
34✔
435
      const currentRoutingKey = routingKeySegments[i];
76✔
436

437
      if (!currentRoutingKey && !currentPattern) {
76!
UNCOV
438
        continue;
×
439
      }
440
      if (!currentRoutingKey && currentPattern !== RMQ_WILDCARD_ALL) {
76✔
441
        return false;
1✔
442
      }
443
      if (currentPattern === RMQ_WILDCARD_ALL) {
75✔
444
        return i === lastIndex;
13✔
445
      }
446
      if (
62✔
447
        currentPattern !== RMQ_WILDCARD_SINGLE &&
107✔
448
        currentPattern !== currentRoutingKey
449
      ) {
450
        return false;
3✔
451
      }
452
    }
453
    return patternSegmentsLength === routingKeySegmentsLength;
17✔
454
  }
455
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc