• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / de00f050-29aa-4a07-94cd-4a5c3beeb9b5

05 Dec 2023 08:46AM UTC coverage: 92.992% (+0.7%) from 92.258%
de00f050-29aa-4a07-94cd-4a5c3beeb9b5

Pull #12880

circleci

이정현B
feat(core): add option to bind global
Pull Request #12880: fix(core): always bind global modules when scan for modules

2798 of 3460 branches covered (0.0%)

7 of 8 new or added lines in 1 file covered. (87.5%)

199 existing lines in 30 files now uncovered.

6555 of 7049 relevant lines covered (92.99%)

17.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.42
/packages/microservices/client/client-rmq.ts
1
import { Logger } from '@nestjs/common/services/logger.service';
1✔
2
import { loadPackage } from '@nestjs/common/utils/load-package.util';
1✔
3
import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util';
1✔
4
import { isFunction } from '@nestjs/common/utils/shared.utils';
1✔
5
import { EventEmitter } from 'events';
1✔
6
import {
1✔
7
  EmptyError,
8
  firstValueFrom,
9
  fromEvent,
10
  merge,
11
  Observable,
12
  ReplaySubject,
13
} from 'rxjs';
14
import { first, map, retryWhen, scan, skip, switchMap } from 'rxjs/operators';
1✔
15
import {
1✔
16
  CONNECT_EVENT,
17
  CONNECT_FAILED_EVENT,
18
  DISCONNECTED_RMQ_MESSAGE,
19
  DISCONNECT_EVENT,
20
  ERROR_EVENT,
21
  RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
22
  RQM_DEFAULT_NOACK,
23
  RQM_DEFAULT_NO_ASSERT,
24
  RQM_DEFAULT_PERSISTENT,
25
  RQM_DEFAULT_PREFETCH_COUNT,
26
  RQM_DEFAULT_QUEUE,
27
  RQM_DEFAULT_QUEUE_OPTIONS,
28
  RQM_DEFAULT_URL,
29
} from '../constants';
30
import { RmqUrl } from '../external/rmq-url.interface';
31
import { ReadPacket, RmqOptions, WritePacket } from '../interfaces';
32
import { RmqRecord } from '../record-builders';
33
import { RmqRecordSerializer } from '../serializers/rmq-record.serializer';
1✔
34
import { ClientProxy } from './client-proxy';
1✔
35

36
// import type {
37
//   AmqpConnectionManager,
38
//   ChannelWrapper,
39
// } from 'amqp-connection-manager';
40
// import type { Channel, ConsumeMessage } from 'amqplib';
41

42
type Channel = any;
43
type ChannelWrapper = any;
44
type ConsumeMessage = any;
45
type AmqpConnectionManager = any;
46

47
let rqmPackage: any = {};
1✔
48

49
const REPLY_QUEUE = 'amq.rabbitmq.reply-to';
1✔
50

51
/**
52
 * @publicApi
53
 */
54
export class ClientRMQ extends ClientProxy {
1✔
55
  protected readonly logger = new Logger(ClientProxy.name);
20✔
56
  protected connection$: ReplaySubject<any>;
57
  protected connection: Promise<any>;
58
  protected client: AmqpConnectionManager = null;
20✔
59
  protected channel: ChannelWrapper = null;
20✔
60
  protected urls: string[] | RmqUrl[];
61
  protected queue: string;
62
  protected queueOptions: Record<string, any>;
63
  protected responseEmitter: EventEmitter;
64
  protected replyQueue: string;
65
  protected persistent: boolean;
66
  protected noAssert: boolean;
67

68
  constructor(protected readonly options: RmqOptions['options']) {
20✔
69
    super();
20✔
70
    this.urls = this.getOptionsProp(this.options, 'urls') || [RQM_DEFAULT_URL];
20✔
71
    this.queue =
20✔
72
      this.getOptionsProp(this.options, 'queue') || RQM_DEFAULT_QUEUE;
40✔
73
    this.queueOptions =
20✔
74
      this.getOptionsProp(this.options, 'queueOptions') ||
40✔
75
      RQM_DEFAULT_QUEUE_OPTIONS;
76
    this.replyQueue =
20✔
77
      this.getOptionsProp(this.options, 'replyQueue') || REPLY_QUEUE;
40✔
78
    this.persistent =
20✔
79
      this.getOptionsProp(this.options, 'persistent') || RQM_DEFAULT_PERSISTENT;
40✔
80
    this.noAssert =
20✔
81
      this.getOptionsProp(this.options, 'noAssert') || RQM_DEFAULT_NO_ASSERT;
40✔
82

83
    loadPackage('amqplib', ClientRMQ.name, () => require('amqplib'));
20✔
84
    rqmPackage = loadPackage('amqp-connection-manager', ClientRMQ.name, () =>
20✔
85
      require('amqp-connection-manager'),
20✔
86
    );
87

88
    this.initializeSerializer(options);
20✔
89
    this.initializeDeserializer(options);
20✔
90
  }
91

92
  public close(): void {
93
    this.channel && this.channel.close();
2✔
94
    this.client && this.client.close();
2✔
95
    this.channel = null;
2✔
96
    this.client = null;
2✔
97
  }
98

99
  public connect(): Promise<any> {
100
    if (this.client) {
3!
101
      return this.convertConnectionToPromise();
×
102
    }
103
    this.client = this.createClient();
3✔
104
    this.handleError(this.client);
3✔
105
    this.handleDisconnectError(this.client);
3✔
106

107
    this.responseEmitter = new EventEmitter();
3✔
108
    this.responseEmitter.setMaxListeners(0);
3✔
109

110
    const connect$ = this.connect$(this.client);
3✔
111
    const withDisconnect$ = this.mergeDisconnectEvent(
3✔
112
      this.client,
113
      connect$,
114
    ).pipe(switchMap(() => this.createChannel()));
×
115

116
    const withReconnect$ = fromEvent(this.client, CONNECT_EVENT).pipe(skip(1));
3✔
117
    const source$ = merge(withDisconnect$, withReconnect$);
3✔
118

119
    this.connection$ = new ReplaySubject(1);
3✔
120
    source$.subscribe(this.connection$);
3✔
121

122
    return this.convertConnectionToPromise();
3✔
123
  }
124

125
  public createChannel(): Promise<void> {
126
    return new Promise(resolve => {
2✔
127
      this.channel = this.client.createChannel({
2✔
128
        json: false,
129
        setup: (channel: Channel) => this.setupChannel(channel, resolve),
2✔
130
      });
131
    });
132
  }
133

134
  public createClient(): AmqpConnectionManager {
135
    const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
×
136
    return rqmPackage.connect(this.urls, {
×
137
      connectionOptions: socketOptions,
138
    });
139
  }
140

141
  public mergeDisconnectEvent<T = any>(
142
    instance: any,
143
    source$: Observable<T>,
144
  ): Observable<T> {
145
    const eventToError = (eventType: string) =>
×
146
      fromEvent(instance, eventType).pipe(
×
147
        map((err: unknown) => {
148
          throw err;
×
149
        }),
150
      );
151
    const disconnect$ = eventToError(DISCONNECT_EVENT);
×
152

153
    const urls = this.getOptionsProp(this.options, 'urls', []);
×
154
    const connectFailed$ = eventToError(CONNECT_FAILED_EVENT).pipe(
×
155
      retryWhen(e =>
156
        e.pipe(
×
157
          scan((errorCount, error: any) => {
158
            if (urls.indexOf(error.url) >= urls.length - 1) {
×
159
              throw error;
×
160
            }
161
            return errorCount + 1;
×
162
          }, 0),
163
        ),
164
      ),
165
    );
166
    // If we ever decide to propagate all disconnect errors & re-emit them through
167
    // the "connection" stream then comment out "first()" operator.
168
    return merge(source$, disconnect$, connectFailed$).pipe(first());
×
169
  }
170

171
  public async convertConnectionToPromise() {
172
    try {
3✔
173
      return await firstValueFrom(this.connection$);
3✔
174
    } catch (err) {
175
      if (err instanceof EmptyError) {
3!
176
        return;
×
177
      }
178
      throw err;
3✔
179
    }
180
  }
181

182
  public async setupChannel(channel: Channel, resolve: Function) {
183
    const prefetchCount =
184
      this.getOptionsProp(this.options, 'prefetchCount') ||
4!
185
      RQM_DEFAULT_PREFETCH_COUNT;
186
    const isGlobalPrefetchCount =
187
      this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
4!
188
      RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
189

190
    if (!this.queueOptions.noAssert) {
4!
191
      await channel.assertQueue(this.queue, this.queueOptions);
4✔
192
    }
193
    await channel.prefetch(prefetchCount, isGlobalPrefetchCount);
4✔
194
    await this.consumeChannel(channel);
4✔
195
    resolve();
4✔
196
  }
197

198
  public async consumeChannel(channel: Channel) {
199
    const noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK);
1✔
200
    await channel.consume(
1✔
201
      this.replyQueue,
202
      (msg: ConsumeMessage) =>
203
        this.responseEmitter.emit(msg.properties.correlationId, msg),
1✔
204
      {
205
        noAck,
206
      },
207
    );
208
  }
209

210
  public handleError(client: AmqpConnectionManager): void {
211
    client.addListener(ERROR_EVENT, (err: any) => this.logger.error(err));
3✔
212
  }
213

214
  public handleDisconnectError(client: AmqpConnectionManager): void {
215
    client.addListener(DISCONNECT_EVENT, (err: any) => {
3✔
216
      this.logger.error(DISCONNECTED_RMQ_MESSAGE);
×
217
      this.logger.error(err);
×
218
    });
219
  }
220

221
  public async handleMessage(
222
    packet: unknown,
223
    callback: (packet: WritePacket) => any,
224
  );
225
  public async handleMessage(
226
    packet: unknown,
227
    options: Record<string, unknown>,
228
    callback: (packet: WritePacket) => any,
229
  );
230
  public async handleMessage(
231
    packet: unknown,
232
    options: Record<string, unknown> | ((packet: WritePacket) => any),
233
    callback?: (packet: WritePacket) => any,
234
  ) {
235
    if (isFunction(options)) {
3!
236
      callback = options as (packet: WritePacket) => any;
3✔
237
      options = undefined;
3✔
238
    }
239

240
    const { err, response, isDisposed } = await this.deserializer.deserialize(
3✔
241
      packet,
242
      options,
243
    );
244
    if (isDisposed || err) {
3✔
245
      callback({
2✔
246
        err,
247
        response,
248
        isDisposed: true,
249
      });
250
    }
251
    callback({
3✔
252
      err,
253
      response,
254
    });
255
  }
256

257
  protected publish(
258
    message: ReadPacket,
259
    callback: (packet: WritePacket) => any,
260
  ): () => void {
261
    try {
7✔
262
      const correlationId = randomStringGenerator();
7✔
263
      const listener = ({
7✔
264
        content,
265
        options,
266
      }: {
267
        content: Buffer;
268
        options: Record<string, unknown>;
269
      }) =>
270
        this.handleMessage(JSON.parse(content.toString()), options, callback);
×
271

272
      Object.assign(message, { id: correlationId });
7✔
273
      const serializedPacket: ReadPacket & Partial<RmqRecord> =
274
        this.serializer.serialize(message);
7✔
275

276
      const options = serializedPacket.options;
7✔
277
      delete serializedPacket.options;
7✔
278

279
      this.responseEmitter.on(correlationId, listener);
7✔
280
      this.channel
7✔
281
        .sendToQueue(
282
          this.queue,
283
          Buffer.from(JSON.stringify(serializedPacket)),
284
          {
285
            replyTo: this.replyQueue,
286
            persistent: this.persistent,
287
            ...options,
288
            headers: this.mergeHeaders(options?.headers),
21✔
289
            correlationId,
290
          },
291
        )
UNCOV
292
        .catch(err => callback({ err }));
×
293
      return () => this.responseEmitter.removeListener(correlationId, listener);
7✔
294
    } catch (err) {
UNCOV
295
      callback({ err });
×
296
    }
297
  }
298

299
  protected dispatchEvent(packet: ReadPacket): Promise<any> {
300
    const serializedPacket: ReadPacket & Partial<RmqRecord> =
301
      this.serializer.serialize(packet);
6✔
302

303
    const options = serializedPacket.options;
6✔
304
    delete serializedPacket.options;
6✔
305

306
    return new Promise<void>((resolve, reject) =>
6✔
307
      this.channel.sendToQueue(
6✔
308
        this.queue,
309
        Buffer.from(JSON.stringify(serializedPacket)),
310
        {
311
          persistent: this.persistent,
312
          ...options,
313
          headers: this.mergeHeaders(options?.headers),
18✔
314
        },
315
        (err: unknown) => (err ? reject(err) : resolve()),
6✔
316
      ),
317
    );
318
  }
319

320
  protected initializeSerializer(options: RmqOptions['options']) {
321
    this.serializer = options?.serializer ?? new RmqRecordSerializer();
20!
322
  }
323

324
  protected mergeHeaders(
325
    requestHeaders?: Record<string, string>,
326
  ): Record<string, string> | undefined {
327
    if (!requestHeaders && !this.options?.headers) {
13!
328
      return undefined;
7✔
329
    }
330

331
    return {
6✔
332
      ...this.options?.headers,
18!
333
      ...requestHeaders,
334
    };
335
  }
336
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc