• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / cdb8c545-83c0-47d9-b846-7251e4e54cdd

06 May 2025 08:39PM UTC coverage: 88.926% (-0.01%) from 88.936%
cdb8c545-83c0-47d9-b846-7251e4e54cdd

Pull #15056

circleci

maxbronnikov10
fix(core): HTTP adapter error mapping
Pull Request #15056: fix(core): HTTP adapter error mapping

2699 of 3412 branches covered (79.1%)

3 of 3 new or added lines in 1 file covered. (100.0%)

41 existing lines in 2 files now uncovered.

7179 of 8073 relevant lines covered (88.93%)

16.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.25
/packages/microservices/server/server-rmq.ts
1
/* eslint-disable @typescript-eslint/no-redundant-type-constituents */
2
import {
1✔
3
  isNil,
4
  isString,
5
  isUndefined,
6
} from '@nestjs/common/utils/shared.utils';
7
import {
1✔
8
  CONNECTION_FAILED_MESSAGE,
9
  DISCONNECTED_RMQ_MESSAGE,
10
  NO_MESSAGE_HANDLER,
11
  RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
12
  RQM_DEFAULT_NOACK,
13
  RQM_DEFAULT_NO_ASSERT,
14
  RQM_DEFAULT_PREFETCH_COUNT,
15
  RQM_DEFAULT_QUEUE,
16
  RQM_DEFAULT_QUEUE_OPTIONS,
17
  RQM_DEFAULT_URL,
18
  RQM_NO_EVENT_HANDLER,
19
  RQM_NO_MESSAGE_HANDLER,
20
} from '../constants';
21
import { RmqContext } from '../ctx-host';
1✔
22
import { Transport } from '../enums';
1✔
23
import { RmqEvents, RmqEventsMap, RmqStatus } from '../events/rmq.events';
24
import { RmqUrl } from '../external/rmq-url.interface';
25
import { MessageHandler, RmqOptions } from '../interfaces';
26
import {
27
  IncomingRequest,
28
  OutgoingResponse,
29
  ReadPacket,
30
} from '../interfaces/packet.interface';
31
import { RmqRecordSerializer } from '../serializers/rmq-record.serializer';
1✔
32
import { Server } from './server';
1✔
33

34
// Type safety for RMQ is opt-in: the real type imports below cannot be enabled
// by default, because doing so would require every user to install the amqplib
// package even if they do not use RabbitMQ. Otherwise, TypeScript would fail
// to compile the code.
//
// type AmqpConnectionManager =
//   import('amqp-connection-manager').AmqpConnectionManager;
// type ChannelWrapper = import('amqp-connection-manager').ChannelWrapper;
// type Message = import('amqplib').Message;
// type Channel = import('amqplib').Channel | import('amqplib').ConfirmChannel;

// Loose stand-ins for the driver types listed above.
type AmqpConnectionManager = any;
type ChannelWrapper = any;
type Message = any;
type Channel = any;

// Sentinel meaning "never stop retrying the initial connection".
const INFINITE_CONNECTION_ATTEMPTS = -1;

// Replaced with the lazily required driver module when a server is constructed.
let rmqPackage = {} as any; // as typeof import('amqp-connection-manager');
1✔
52

53
/**
 * Microservice transport server for RabbitMQ (`Transport.RMQ`).
 *
 * The `amqplib` and `amqp-connection-manager` packages are loaded lazily in
 * the constructor. Once connected, a channel is created, the configured queue
 * (and optionally an exchange with routing-key bindings) is set up, and every
 * consumed message is dispatched to the matching registered handler.
 *
 * @publicApi
 */
export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
  public readonly transportId = Transport.RMQ;

  /** Connection manager returned by `createClient()`; `null` until `start()` runs. */
  protected server: AmqpConnectionManager | null = null;
  /** Channel wrapper created on the first successful connection. */
  protected channel: ChannelWrapper | null = null;
  /** Number of failed initial connection attempts counted so far. */
  protected connectionAttempts = 0;
  protected readonly urls: string[] | RmqUrl[];
  protected readonly queue: string;
  protected readonly noAck: boolean;
  protected readonly queueOptions: any;
  /** Handlers whose pattern contains `*` or `#`, keyed by their compiled matcher. */
  protected readonly wildcardHandlers = new Map<RegExp, MessageHandler>();
  /** Listeners registered through `on()` before the connection manager existed. */
  protected pendingEventListeners: Array<{
    event: keyof RmqEvents;
    callback: RmqEvents[keyof RmqEvents];
  }> = [];

  /**
   * Resolves connection settings (falling back to the `RQM_DEFAULT_*`
   * constants), loads the RabbitMQ driver packages and initializes the
   * serializer and deserializer.
   *
   * @param options RMQ transport options.
   */
  constructor(protected readonly options: Required<RmqOptions>['options']) {
    super();
    this.urls = this.getOptionsProp(this.options, 'urls') || [RQM_DEFAULT_URL];
    this.queue =
      this.getOptionsProp(this.options, 'queue') || RQM_DEFAULT_QUEUE;
    this.noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK);
    this.queueOptions =
      this.getOptionsProp(this.options, 'queueOptions') ||
      RQM_DEFAULT_QUEUE_OPTIONS;

    this.loadPackage('amqplib', ServerRMQ.name, () => require('amqplib'));
    rmqPackage = this.loadPackage(
      'amqp-connection-manager',
      ServerRMQ.name,
      () => require('amqp-connection-manager'),
    );

    this.initializeSerializer(options);
    this.initializeDeserializer(options);
  }

  /**
   * Starts the server. Any error thrown while starting is forwarded to
   * `callback` instead of being rethrown.
   *
   * @param callback Invoked once the channel is set up, or with an error.
   */
  public async listen(
    callback: (err?: unknown, ...optionalParams: unknown[]) => void,
  ): Promise<void> {
    try {
      await this.start(callback);
    } catch (err) {
      callback(err);
    }
  }

  /**
   * Closes the channel and the connection manager (when present) and drops
   * any listeners that were still waiting to be attached.
   */
  public async close(): Promise<void> {
    this.channel && (await this.channel.close());
    this.server && (await this.server.close());
    this.pendingEventListeners = [];
  }

  /**
   * Creates the connection manager and wires up its event listeners.
   *
   * On the first `connect` event a channel is created and configured through
   * `setupChannel`. Failed connection attempts are logged; when the
   * `maxConnectionAttempts` option is set and that many initial attempts have
   * failed, the server is closed and `callback` receives the error.
   *
   * @param callback Optional; invoked once the channel is set up, or with the
   * connection error after the attempt limit is reached.
   */
  public async start(
    callback?: (err?: unknown, ...optionalParams: unknown[]) => void,
  ) {
    this.server = this.createClient();
    const server = this.server;

    this.server!.once(RmqEventsMap.CONNECT, () => {
      if (this.channel) {
        return;
      }
      this._status$.next(RmqStatus.CONNECTED);
      this.channel = this.server!.createChannel({
        json: false,
        // `callback` is optional here but `setupChannel` always invokes it,
        // so fall back to a no-op instead of crashing the channel setup.
        setup: (channel: Channel) =>
          this.setupChannel(channel, callback ?? (() => undefined)),
      });
    });

    const maxConnectionAttempts = this.getOptionsProp(
      this.options,
      'maxConnectionAttempts',
      INFINITE_CONNECTION_ATTEMPTS,
    );

    this.registerConnectListener();
    this.registerDisconnectListener();
    // Attach listeners that were registered via `on()` before the client existed.
    this.pendingEventListeners.forEach(({ event, callback }) =>
      this.server!.on(event, callback),
    );
    this.pendingEventListeners = [];

    const connectFailedEvent = 'connectFailed';
    const onConnectFailed = async (error: Record<string, unknown>) => {
      this._status$.next(RmqStatus.DISCONNECTED);

      this.logger.error(CONNECTION_FAILED_MESSAGE);
      if (error?.err) {
        this.logger.error(error.err);
      }
      const isReconnecting = !!this.channel;
      if (
        maxConnectionAttempts === INFINITE_CONNECTION_ATTEMPTS ||
        isReconnecting
      ) {
        // Nothing to count in these cases: report the failure once and detach.
        server.removeListener(connectFailedEvent, onConnectFailed);
        return;
      }
      if (++this.connectionAttempts === maxConnectionAttempts) {
        server.removeListener(connectFailedEvent, onConnectFailed);
        await this.close();
        callback?.(error.err ?? new Error(CONNECTION_FAILED_MESSAGE));
      }
    };
    // Subscribed with `on` rather than `once`: every failed attempt has to
    // reach the handler, otherwise the counter could never get past 1 and a
    // `maxConnectionAttempts` greater than 1 would never take effect.
    this.server!.on(connectFailedEvent, onConnectFailed);
  }

  /**
   * Connects to the configured URLs through `amqp-connection-manager`,
   * forwarding the supported `socketOptions` fields.
   */
  public createClient<T = any>(): T {
    const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
    return rmqPackage.connect(this.urls, {
      connectionOptions: socketOptions?.connectionOptions,
      heartbeatIntervalInSeconds: socketOptions?.heartbeatIntervalInSeconds,
      reconnectTimeInSeconds: socketOptions?.reconnectTimeInSeconds,
    });
  }

  /** Publishes `CONNECTED` on the status stream for every `connect` event. */
  private registerConnectListener() {
    this.server!.on(RmqEventsMap.CONNECT, () => {
      this._status$.next(RmqStatus.CONNECTED);
    });
  }

  /** Publishes `DISCONNECTED` and logs the error for every `disconnect` event. */
  private registerDisconnectListener() {
    this.server!.on(RmqEventsMap.DISCONNECT, (err: any) => {
      this._status$.next(RmqStatus.DISCONNECTED);
      this.logger.error(DISCONNECTED_RMQ_MESSAGE);
      this.logger.error(err);
    });
  }

  /**
   * Configures a freshly created channel: asserts the queue (unless
   * `noAssert` is set), optionally asserts an exchange and binds routing
   * keys, applies the prefetch settings and starts consuming.
   *
   * @param channel Raw channel handed over by the connection manager.
   * @param callback Invoked with no arguments once the consumer is registered.
   */
  public async setupChannel(channel: Channel, callback: Function) {
    const noAssert =
      this.getOptionsProp(this.options, 'noAssert') ??
      this.queueOptions.noAssert ??
      RQM_DEFAULT_NO_ASSERT;

    if (!noAssert) {
      await channel.assertQueue(this.queue, this.queueOptions);
    }

    const isGlobalPrefetchCount = this.getOptionsProp(
      this.options,
      'isGlobalPrefetchCount',
      RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
    );
    const prefetchCount = this.getOptionsProp(
      this.options,
      'prefetchCount',
      RQM_DEFAULT_PREFETCH_COUNT,
    );

    if (this.options.exchange || this.options.wildcards) {
      // Use queue name as exchange name if exchange is not provided and "wildcards" is set to true
      const exchange = this.getOptionsProp(
        this.options,
        'exchange',
        this.options.queue,
      );
      const exchangeType = this.getOptionsProp(
        this.options,
        'exchangeType',
        'topic',
      );
      await channel.assertExchange(exchange, exchangeType, {
        durable: true,
      });

      if (this.options.routingKey) {
        await channel.bindQueue(this.queue, exchange, this.options.routingKey);
      }

      if (this.options.wildcards) {
        // Every registered handler pattern doubles as a binding key.
        const routingKeys = Array.from(this.getHandlers().keys());
        await Promise.all(
          routingKeys.map(routingKey =>
            channel.bindQueue(this.queue, exchange, routingKey),
          ),
        );

        // When "wildcards" is set to true, we need to initialize wildcard handlers
        // otherwise we would not be able to associate the incoming messages with the handlers
        this.initializeWildcardHandlersIfExist();
      }
    }

    await channel.prefetch(prefetchCount, isGlobalPrefetchCount);
    // Awaited so that a failed subscription rejects the setup instead of
    // becoming an unhandled rejection while `callback` reports readiness.
    await channel.consume(
      this.queue,
      (msg: Record<string, any> | null) => this.handleMessage(msg!, channel),
      {
        noAck: this.noAck,
        consumerTag: this.getOptionsProp(
          this.options,
          'consumerTag',
          undefined,
        ),
      },
    );
    callback();
  }

  /**
   * Handles one consumed message.
   *
   * The content is parsed and deserialized into a packet. Packets without an
   * `id` are dispatched as events. For requests without a matching handler
   * the message is nacked (unless `noAck` is enabled) and an error packet is
   * sent to the `replyTo` queue; otherwise the handler result is published
   * back to `replyTo` with the original correlation id.
   *
   * @param message Raw message; nil values are ignored.
   * @param channel Channel the message was consumed on.
   */
  public async handleMessage(
    message: Record<string, any>,
    channel: any,
  ): Promise<void> {
    if (isNil(message)) {
      return;
    }
    const { content, properties } = message;
    const rawMessage = this.parseMessageContent(content);
    const packet = await this.deserializer.deserialize(rawMessage, properties);
    const pattern = isString(packet.pattern)
      ? packet.pattern
      : JSON.stringify(packet.pattern);

    const rmqContext = new RmqContext([message, channel, pattern]);
    if (isUndefined((packet as IncomingRequest).id)) {
      return this.handleEvent(pattern, packet, rmqContext);
    }
    const handler = this.getHandlerByPattern(pattern);

    if (!handler) {
      if (!this.noAck) {
        this.logger.warn(RQM_NO_MESSAGE_HANDLER`${pattern}`);
        this.channel!.nack(rmqContext.getMessage() as Message, false, false);
      }
      const status = 'error';
      const noHandlerPacket = {
        id: (packet as IncomingRequest).id,
        err: NO_MESSAGE_HANDLER,
        status,
      };
      return this.sendMessage(
        noHandlerPacket,
        properties.replyTo,
        properties.correlationId,
      );
    }
    const response$ = this.transformToObservable(
      await handler(packet.data, rmqContext),
    );

    const publish = <T>(data: T) =>
      this.sendMessage(data, properties.replyTo, properties.correlationId);

    response$ && this.send(response$, publish);
  }

  /**
   * Dispatches an event packet. When no handler matches and `noAck` is
   * disabled, the message is nacked and a warning is logged; in every other
   * case the base-class implementation takes over.
   */
  public async handleEvent(
    pattern: string,
    packet: ReadPacket,
    context: RmqContext,
  ): Promise<any> {
    const handler = this.getHandlerByPattern(pattern);
    if (!handler && !this.noAck) {
      this.channel!.nack(context.getMessage() as Message, false, false);
      return this.logger.warn(RQM_NO_EVENT_HANDLER`${pattern}`);
    }
    return super.handleEvent(pattern, packet, context);
  }

  /**
   * Serializes `message` and sends it to the `replyTo` queue.
   *
   * The serialized response's `options` are removed from the payload and
   * merged into the send options, where they take precedence over
   * `correlationId`.
   */
  public sendMessage<T = any>(
    message: T,
    replyTo: any,
    correlationId: string,
  ): void {
    const outgoingResponse = this.serializer.serialize(
      message as unknown as OutgoingResponse,
    );
    const options = outgoingResponse.options;
    delete outgoingResponse.options;

    const buffer = Buffer.from(JSON.stringify(outgoingResponse));
    const sendOptions = { correlationId, ...options };
    this.channel!.sendToQueue(replyTo, buffer, sendOptions);
  }

  /**
   * Returns the underlying connection manager.
   *
   * @throws Error when the server has not been started yet.
   */
  public unwrap<T>(): T {
    if (!this.server) {
      throw new Error(
        'Not initialized. Please call the "listen"/"startAllMicroservices" method before accessing the server.',
      );
    }
    return this.server as T;
  }

  /**
   * Registers an event listener on the connection manager, or queues it
   * until `start()` creates the connection manager.
   */
  public on<
    EventKey extends keyof RmqEvents = keyof RmqEvents,
    EventCallback extends RmqEvents[EventKey] = RmqEvents[EventKey],
  >(event: EventKey, callback: EventCallback) {
    if (this.server) {
      this.server.addListener(event, callback);
    } else {
      this.pendingEventListeners.push({ event, callback });
    }
  }

  /**
   * Finds the handler for `pattern`. With the `wildcards` option enabled, an
   * exact match wins; otherwise the first wildcard matcher that accepts the
   * pattern is used.
   *
   * @returns The handler, or `null` when no wildcard handler matches.
   */
  public getHandlerByPattern(pattern: string): MessageHandler | null {
    // Search for non-wildcard handler first
    const exactHandler = super.getHandlerByPattern(pattern);
    if (!this.options.wildcards || exactHandler) {
      return exactHandler;
    }

    // Search for wildcard handler
    for (const [regex, handler] of this.wildcardHandlers) {
      if (regex.test(pattern)) {
        return handler;
      }
    }
    return null;
  }

  /** Uses the serializer from the options, defaulting to `RmqRecordSerializer`. */
  protected initializeSerializer(options: RmqOptions['options']) {
    this.serializer = options?.serializer ?? new RmqRecordSerializer();
  }

  /** Parses the content as JSON, falling back to the raw string when parsing fails. */
  private parseMessageContent(content: Buffer) {
    try {
      return JSON.parse(content.toString());
    } catch {
      return content.toString();
    }
  }

  /**
   * Compiles every registered handler pattern that contains a wildcard into
   * a matcher. Does nothing when matchers have already been built.
   */
  private initializeWildcardHandlersIfExist() {
    if (this.wildcardHandlers.size !== 0) {
      return;
    }
    const handlers = this.getHandlers();

    handlers.forEach((handler, pattern) => {
      const regex = this.convertRoutingKeyToRegex(pattern);
      if (regex) {
        this.wildcardHandlers.set(regex, handler);
      }
    });
  }

  /**
   * Converts a routing key containing `*` or `#` into an anchored regular
   * expression: `*` becomes "one or more characters other than a dot" and
   * `#` becomes "any characters". All other regex metacharacters in the key
   * are escaped so that they match literally.
   *
   * @returns The matcher, or `undefined` when the key has no wildcard.
   */
  private convertRoutingKeyToRegex(routingKey: string): RegExp | undefined {
    if (!routingKey.includes('#') && !routingKey.includes('*')) {
      return;
    }
    // Escape every metacharacter except `*`, which is translated below.
    const escapedKey = routingKey.replace(/[\\^$.+?()[\]{}|]/g, '\\$&');
    // NOTE(review): `#` becomes `.*`, so a key such as `a.#` does not match
    // the bare routing key `a` - confirm whether zero-word matches are needed.
    const regexPattern = escapedKey
      .replace(/\*/g, '[^.]+')
      .replace(/#/g, '.*');
    return new RegExp(`^${regexPattern}$`);
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc