• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / f7142b24-e394-4028-89c6-717e281431d1

25 Nov 2024 01:12PM UTC coverage: 90.141% (-1.8%) from 91.989%
f7142b24-e394-4028-89c6-717e281431d1

Pull #14177

circleci

web-flow
Merge pull request #14200 from nestjs/feat/allow-queue-per-handler

feat(microservices): support nats queue per handler
Pull Request #14177: release: version 11.0.0

2612 of 3236 branches covered (80.72%)

496 of 704 new or added lines in 48 files covered. (70.45%)

79 existing lines in 11 files now uncovered.

6985 of 7749 relevant lines covered (90.14%)

16.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.83
/packages/microservices/client/client-kafka.ts
1
import { Logger } from '@nestjs/common/services/logger.service';
1✔
2
import { loadPackage } from '@nestjs/common/utils/load-package.util';
1✔
3
import { isUndefined } from '@nestjs/common/utils/shared.utils';
1✔
4
import {
1✔
5
  KAFKA_DEFAULT_BROKER,
6
  KAFKA_DEFAULT_CLIENT,
7
  KAFKA_DEFAULT_GROUP,
8
} from '../constants';
9
import { KafkaResponseDeserializer } from '../deserializers/kafka-response.deserializer';
1✔
10
import { KafkaHeaders } from '../enums';
1✔
11
import { InvalidKafkaClientTopicException } from '../errors/invalid-kafka-client-topic.exception';
1✔
12
import { KafkaStatus } from '../events';
13
import {
14
  BrokersFunction,
15
  Consumer,
16
  ConsumerConfig,
17
  ConsumerGroupJoinEvent,
18
  EachMessagePayload,
19
  Kafka,
20
  KafkaConfig,
21
  KafkaMessage,
22
  Producer,
23
  TopicPartitionOffsetAndMetadata,
24
} from '../external/kafka.interface';
25
import {
1✔
26
  KafkaLogger,
27
  KafkaParser,
28
  KafkaReplyPartitionAssigner,
29
} from '../helpers';
30
import {
31
  ClientKafkaProxy,
32
  KafkaOptions,
33
  MsPattern,
34
  OutgoingEvent,
35
  ReadPacket,
36
  WritePacket,
37
} from '../interfaces';
38
import {
1✔
39
  KafkaRequest,
40
  KafkaRequestSerializer,
41
} from '../serializers/kafka-request.serializer';
42
import { ClientProxy } from './client-proxy';
1✔
43

44
let kafkaPackage: any = {};
1✔
45

46
/**
47
 * @publicApi
48
 */
49
export class ClientKafka
1✔
50
  extends ClientProxy<never, KafkaStatus>
51
  implements ClientKafkaProxy
52
{
53
  protected logger = new Logger(ClientKafka.name);
37✔
54
  protected client: Kafka | null = null;
37✔
55
  protected parser: KafkaParser | null = null;
37✔
56
  protected initialized: Promise<void> | null = null;
37✔
57
  protected responsePatterns: string[] = [];
37✔
58
  protected consumerAssignments: { [key: string]: number } = {};
37✔
59
  protected brokers: string[] | BrokersFunction;
60
  protected clientId: string;
61
  protected groupId: string;
62
  protected producerOnlyMode: boolean;
63
  protected _consumer: Consumer | null = null;
37✔
64
  protected _producer: Producer | null = null;
37✔
65

66
  get consumer(): Consumer {
67
    if (!this._consumer) {
3!
NEW
68
      throw new Error(
×
69
        'No consumer initialized. Please, call the "connect" method first.',
70
      );
71
    }
72
    return this._consumer;
3✔
73
  }
74

75
  get producer(): Producer {
NEW
76
    if (!this._consumer) {
×
NEW
77
      throw new Error(
×
78
        'No producer initialized. Please, call the "connect" method first.',
79
      );
80
    }
NEW
81
    return this._producer;
×
82
  }
83

84
  constructor(protected readonly options: KafkaOptions['options']) {
37✔
85
    super();
37✔
86

87
    const clientOptions = this.getOptionsProp(
37✔
88
      this.options,
89
      'client',
90
      {} as KafkaConfig,
91
    );
92
    const consumerOptions = this.getOptionsProp(
37✔
93
      this.options,
94
      'consumer',
95
      {} as ConsumerConfig,
96
    );
97
    const postfixId = this.getOptionsProp(this.options, 'postfixId', '-client');
37✔
98
    this.producerOnlyMode = this.getOptionsProp(
37✔
99
      this.options,
100
      'producerOnlyMode',
101
      false,
102
    );
103

104
    this.brokers = clientOptions.brokers || [KAFKA_DEFAULT_BROKER];
37✔
105

106
    // Append a unique id to the clientId and groupId
107
    // so they don't collide with a microservices client
108
    this.clientId =
37✔
109
      (clientOptions.clientId || KAFKA_DEFAULT_CLIENT) + postfixId;
74✔
110
    this.groupId = (consumerOptions.groupId || KAFKA_DEFAULT_GROUP) + postfixId;
37✔
111

112
    kafkaPackage = loadPackage('kafkajs', ClientKafka.name, () =>
37✔
113
      require('kafkajs'),
37✔
114
    );
115

116
    this.parser = new KafkaParser((options && options.parser) || undefined);
37✔
117

118
    this.initializeSerializer(options);
37✔
119
    this.initializeDeserializer(options);
37✔
120
  }
121

122
  public subscribeToResponseOf(pattern: unknown): void {
123
    const request = this.normalizePattern(pattern as MsPattern);
1✔
124
    this.responsePatterns.push(this.getResponsePatternName(request));
1✔
125
  }
126

127
  public async close(): Promise<void> {
128
    this._producer && (await this._producer.disconnect());
1✔
129
    this._consumer && (await this._consumer.disconnect());
1✔
130
    this._producer = null;
1✔
131
    this._consumer = null;
1✔
132
    this.initialized = null;
1✔
133
    this.client = null;
1✔
134
  }
135

136
  public async connect(): Promise<Producer> {
137
    if (this.initialized) {
6✔
138
      return this.initialized.then(() => this._producer);
2✔
139
    }
140
    this.initialized = new Promise(async (resolve, reject) => {
4✔
141
      try {
4✔
142
        this.client = this.createClient();
4✔
143
        if (!this.producerOnlyMode) {
4✔
144
          const partitionAssigners = [
3✔
145
            (
146
              config: ConstructorParameters<
147
                typeof KafkaReplyPartitionAssigner
148
              >[1],
UNCOV
149
            ) => new KafkaReplyPartitionAssigner(this, config),
×
150
          ];
151

152
          const consumerOptions = Object.assign(
3✔
153
            {
154
              partitionAssigners,
155
            },
156
            this.options.consumer || {},
6✔
157
            {
158
              groupId: this.groupId,
159
            },
160
          );
161

162
          this._consumer = this.client.consumer(consumerOptions);
3✔
163
          this.registerConsumerEventListeners();
3✔
164

165
          // Set member assignments on join and rebalance
166
          this._consumer.on(
3✔
167
            this._consumer.events.GROUP_JOIN,
168
            this.setConsumerAssignments.bind(this),
169
          );
170
          await this._consumer.connect();
3✔
171
          await this.bindTopics();
3✔
172
        }
173

174
        this._producer = this.client.producer(this.options.producer || {});
4✔
175
        this.registerProducerEventListeners();
4✔
176
        await this._producer.connect();
4✔
177

178
        resolve();
4✔
179
      } catch (err) {
UNCOV
180
        reject(err);
×
181
      }
182
    });
183
    return this.initialized.then(() => this._producer);
4✔
184
  }
185

186
  public async bindTopics(): Promise<void> {
187
    if (!this._consumer) {
4!
UNCOV
188
      throw Error('No consumer initialized');
×
189
    }
190

191
    const consumerSubscribeOptions = this.options.subscribe || {};
4✔
192

193
    if (this.responsePatterns.length > 0) {
4✔
194
      await this._consumer.subscribe({
2✔
195
        ...consumerSubscribeOptions,
196
        topics: this.responsePatterns,
197
      });
198
    }
199

200
    await this._consumer.run(
4✔
201
      Object.assign(this.options.run || {}, {
8✔
202
        eachMessage: this.createResponseCallback(),
203
      }),
204
    );
205
  }
206

207
  public createClient<T = any>(): T {
208
    const kafkaConfig: KafkaConfig = Object.assign(
1✔
209
      { logCreator: KafkaLogger.bind(null, this.logger) },
210
      this.options.client,
211
      { brokers: this.brokers, clientId: this.clientId },
212
    );
213

214
    return new kafkaPackage.Kafka(kafkaConfig);
1✔
215
  }
216

217
  public createResponseCallback(): (payload: EachMessagePayload) => any {
218
    return async (payload: EachMessagePayload) => {
9✔
219
      const rawMessage = this.parser.parse<KafkaMessage>(
5✔
220
        Object.assign(payload.message, {
221
          topic: payload.topic,
222
          partition: payload.partition,
223
        }),
224
      );
225
      if (isUndefined(rawMessage.headers[KafkaHeaders.CORRELATION_ID])) {
5✔
226
        return;
1✔
227
      }
228
      const { err, response, isDisposed, id } =
229
        await this.deserializer.deserialize(rawMessage);
4✔
230
      const callback = this.routingMap.get(id);
4✔
231
      if (!callback) {
4✔
232
        return;
1✔
233
      }
234
      if (err || isDisposed) {
3✔
235
        return callback({
2✔
236
          err,
237
          response,
238
          isDisposed,
239
        });
240
      }
241
      callback({
1✔
242
        err,
243
        response,
244
      });
245
    };
246
  }
247

248
  public getConsumerAssignments() {
249
    return this.consumerAssignments;
2✔
250
  }
251

252
  public commitOffsets(
253
    topicPartitions: TopicPartitionOffsetAndMetadata[],
254
  ): Promise<void> {
NEW
255
    if (this._consumer) {
×
NEW
256
      return this._consumer.commitOffsets(topicPartitions);
×
257
    } else {
NEW
258
      throw new Error('No consumer initialized');
×
259
    }
260
  }
261

262
  public unwrap<T>(): T {
NEW
263
    if (!this.client) {
×
NEW
264
      throw new Error(
×
265
        'Not initialized. Please call the "connect" method first.',
266
      );
267
    }
NEW
268
    return this.client as T;
×
269
  }
270

271
  protected registerConsumerEventListeners() {
272
    this._consumer.on(this._consumer.events.CONNECT, () =>
3✔
NEW
273
      this._status$.next(KafkaStatus.CONNECTED),
×
274
    );
275
    this._consumer.on(this._consumer.events.DISCONNECT, () =>
3✔
NEW
276
      this._status$.next(KafkaStatus.DISCONNECTED),
×
277
    );
278
    this._consumer.on(this._consumer.events.REBALANCING, () =>
3✔
NEW
279
      this._status$.next(KafkaStatus.REBALANCING),
×
280
    );
281
    this._consumer.on(this._consumer.events.STOP, () =>
3✔
NEW
282
      this._status$.next(KafkaStatus.STOPPED),
×
283
    );
284
    this.consumer.on(this._consumer.events.CRASH, () =>
3✔
NEW
285
      this._status$.next(KafkaStatus.CRASHED),
×
286
    );
287
  }
288

289
  protected registerProducerEventListeners() {
290
    this._producer.on(this._producer.events.CONNECT, () =>
4✔
NEW
291
      this._status$.next(KafkaStatus.CONNECTED),
×
292
    );
293
    this._producer.on(this._producer.events.DISCONNECT, () =>
4✔
NEW
294
      this._status$.next(KafkaStatus.DISCONNECTED),
×
295
    );
296
  }
297

298
  protected async dispatchEvent(packet: OutgoingEvent): Promise<any> {
299
    const pattern = this.normalizePattern(packet.pattern);
2✔
300
    const outgoingEvent = await this.serializer.serialize(packet.data, {
2✔
301
      pattern,
302
    });
303
    const message = Object.assign(
2✔
304
      {
305
        topic: pattern,
306
        messages: [outgoingEvent],
307
      },
308
      this.options.send || {},
4✔
309
    );
310

311
    return this._producer.send(message);
2✔
312
  }
313

314
  protected getReplyTopicPartition(topic: string): string {
315
    const minimumPartition = this.consumerAssignments[topic];
11✔
316
    if (isUndefined(minimumPartition)) {
11✔
317
      throw new InvalidKafkaClientTopicException(topic);
2✔
318
    }
319

320
    // Get the minimum partition
321
    return minimumPartition.toString();
9✔
322
  }
323

324
  protected publish(
325
    partialPacket: ReadPacket,
326
    callback: (packet: WritePacket) => any,
327
  ): () => void {
328
    const packet = this.assignPacketId(partialPacket);
9✔
329
    this.routingMap.set(packet.id, callback);
9✔
330

331
    const cleanup = () => this.routingMap.delete(packet.id);
9✔
332
    const errorCallback = (err: unknown) => {
9✔
333
      cleanup();
1✔
334
      callback({ err });
1✔
335
    };
336

337
    try {
9✔
338
      const pattern = this.normalizePattern(partialPacket.pattern);
9✔
339
      const replyTopic = this.getResponsePatternName(pattern);
9✔
340
      const replyPartition = this.getReplyTopicPartition(replyTopic);
9✔
341

342
      Promise.resolve(this.serializer.serialize(packet.data, { pattern }))
9✔
343
        .then((serializedPacket: KafkaRequest) => {
344
          serializedPacket.headers[KafkaHeaders.CORRELATION_ID] = packet.id;
9✔
345
          serializedPacket.headers[KafkaHeaders.REPLY_TOPIC] = replyTopic;
9✔
346
          serializedPacket.headers[KafkaHeaders.REPLY_PARTITION] =
9✔
347
            replyPartition;
348

349
          const message = Object.assign(
9✔
350
            {
351
              topic: pattern,
352
              messages: [serializedPacket],
353
            },
354
            this.options.send || {},
18✔
355
          );
356

357
          return this._producer.send(message);
9✔
358
        })
359
        .catch(err => errorCallback(err));
1✔
360

361
      return cleanup;
9✔
362
    } catch (err) {
UNCOV
363
      errorCallback(err);
×
364
    }
365
  }
366

367
  protected getResponsePatternName(pattern: string): string {
368
    return `${pattern}.reply`;
9✔
369
  }
370

371
  protected setConsumerAssignments(data: ConsumerGroupJoinEvent): void {
372
    const consumerAssignments: { [key: string]: number } = {};
2✔
373

374
    // Only need to set the minimum
375
    Object.keys(data.payload.memberAssignment).forEach(topic => {
2✔
376
      const memberPartitions = data.payload.memberAssignment[topic];
4✔
377

378
      if (memberPartitions.length) {
4✔
379
        consumerAssignments[topic] = Math.min(...memberPartitions);
3✔
380
      }
381
    });
382

383
    this.consumerAssignments = consumerAssignments;
2✔
384
  }
385

386
  protected initializeSerializer(options: KafkaOptions['options']) {
387
    this.serializer =
37✔
388
      (options && options.serializer) || new KafkaRequestSerializer();
111✔
389
  }
390

391
  protected initializeDeserializer(options: KafkaOptions['options']) {
392
    this.deserializer =
37✔
393
      (options && options.deserializer) || new KafkaResponseDeserializer();
111✔
394
  }
395

396
  public on<
397
    EventKey extends string | number | symbol = string | number | symbol,
398
    EventCallback = any,
399
  >(event: EventKey, callback: EventCallback) {
NEW
400
    throw new Error('Method is not supported for Kafka client');
×
401
  }
402
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc