• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / 3cfd9218-59df-4aa0-9b37-47f96a0aeb4f

pending completion
3cfd9218-59df-4aa0-9b37-47f96a0aeb4f

Pull #11910

circleci

esahin90
feat(microservices): handle kafka consumer crashes

To be able to react to consumer crashes within kafkajs, which are not
retryable, the ClientKafka and ServerKafka have to listen to CRASH
events and act accordingly. Listen to CRASH events and throw an
exception, if the crash is not retried.

Documentation: https://kafka.js.org/docs/instrumentation-events#a-name-consumer-a-consumer
Pull Request #11910: feat(microservices): handle kafka consumer crashes

2395 of 2867 branches covered (83.54%)

8 of 8 new or added lines in 2 files covered. (100.0%)

6365 of 6860 relevant lines covered (92.78%)

16.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.57
/packages/microservices/client/client-kafka.ts
1
import { Logger } from '@nestjs/common/services/logger.service';
1✔
2
import { loadPackage } from '@nestjs/common/utils/load-package.util';
1✔
3
import { isUndefined } from '@nestjs/common/utils/shared.utils';
1✔
4

5
import {
1✔
6
  KAFKA_DEFAULT_BROKER,
7
  KAFKA_DEFAULT_CLIENT,
8
  KAFKA_DEFAULT_GROUP,
9
} from '../constants';
10
import { KafkaResponseDeserializer } from '../deserializers/kafka-response.deserializer';
1✔
11
import { KafkaHeaders } from '../enums';
1✔
12
import { InvalidKafkaClientTopicException } from '../errors/invalid-kafka-client-topic.exception';
1✔
13
import {
14
  BrokersFunction,
15
  Consumer,
16
  ConsumerConfig,
17
  ConsumerGroupJoinEvent,
18
  EachMessagePayload,
19
  InstrumentationEvent,
20
  Kafka,
21
  KafkaConfig,
22
  KafkaJSError,
23
  KafkaMessage,
24
  Producer,
25
  TopicPartitionOffsetAndMetadata,
26
} from '../external/kafka.interface';
27
import {
1✔
28
  KafkaLogger,
29
  KafkaParser,
30
  KafkaReplyPartitionAssigner,
31
} from '../helpers';
32
import {
33
  KafkaOptions,
34
  OutgoingEvent,
35
  ReadPacket,
36
  WritePacket,
37
} from '../interfaces';
38
import {
1✔
39
  KafkaRequest,
40
  KafkaRequestSerializer,
41
} from '../serializers/kafka-request.serializer';
42
import { ClientProxy } from './client-proxy';
1✔
43

44
let kafkaPackage: any = {};
1✔
45

46
/**
47
 * @publicApi
48
 */
49
export class ClientKafka extends ClientProxy {
1✔
50
  protected logger = new Logger(ClientKafka.name);
37✔
51
  protected client: Kafka | null = null;
37✔
52
  protected consumer: Consumer | null = null;
37✔
53
  protected producer: Producer | null = null;
37✔
54
  protected parser: KafkaParser | null = null;
37✔
55
  protected initialized: Promise<void> | null = null;
37✔
56
  protected responsePatterns: string[] = [];
37✔
57
  protected consumerAssignments: { [key: string]: number } = {};
37✔
58
  protected brokers: string[] | BrokersFunction;
59
  protected clientId: string;
60
  protected groupId: string;
61
  protected producerOnlyMode: boolean;
62

63
  constructor(protected readonly options: KafkaOptions['options']) {
37✔
64
    super();
37✔
65

66
    const clientOptions = this.getOptionsProp(
37✔
67
      this.options,
68
      'client',
69
      {} as KafkaConfig,
70
    );
71
    const consumerOptions = this.getOptionsProp(
37✔
72
      this.options,
73
      'consumer',
74
      {} as ConsumerConfig,
75
    );
76
    const postfixId = this.getOptionsProp(this.options, 'postfixId', '-client');
37✔
77
    this.producerOnlyMode = this.getOptionsProp(
37✔
78
      this.options,
79
      'producerOnlyMode',
80
      false,
81
    );
82

83
    this.brokers = clientOptions.brokers || [KAFKA_DEFAULT_BROKER];
37✔
84

85
    // Append a unique id to the clientId and groupId
86
    // so they don't collide with a microservices client
87
    this.clientId =
37✔
88
      (clientOptions.clientId || KAFKA_DEFAULT_CLIENT) + postfixId;
74✔
89
    this.groupId = (consumerOptions.groupId || KAFKA_DEFAULT_GROUP) + postfixId;
37✔
90

91
    kafkaPackage = loadPackage('kafkajs', ClientKafka.name, () =>
37✔
92
      require('kafkajs'),
37✔
93
    );
94

95
    this.parser = new KafkaParser((options && options.parser) || undefined);
37✔
96

97
    this.initializeSerializer(options);
37✔
98
    this.initializeDeserializer(options);
37✔
99
  }
100

101
  public subscribeToResponseOf(pattern: any): void {
102
    const request = this.normalizePattern(pattern);
1✔
103
    this.responsePatterns.push(this.getResponsePatternName(request));
1✔
104
  }
105

106
  public async close(): Promise<void> {
107
    this.producer && (await this.producer.disconnect());
1✔
108
    this.consumer && (await this.consumer.disconnect());
1✔
109
    this.producer = null;
1✔
110
    this.consumer = null;
1✔
111
    this.initialized = null;
1✔
112
    this.client = null;
1✔
113
  }
114

115
  public async connect(): Promise<Producer> {
116
    if (this.initialized) {
6✔
117
      return this.initialized.then(() => this.producer);
2✔
118
    }
119
    this.initialized = new Promise(async (resolve, reject) => {
4✔
120
      try {
4✔
121
        this.client = this.createClient();
4✔
122

123
        if (!this.producerOnlyMode) {
4✔
124
          const partitionAssigners = [
3✔
125
            (
126
              config: ConstructorParameters<
127
                typeof KafkaReplyPartitionAssigner
128
              >[1],
129
            ) => new KafkaReplyPartitionAssigner(this, config),
×
130
          ];
131

132
          const consumerOptions = Object.assign(
3✔
133
            {
134
              partitionAssigners,
135
            },
136
            this.options.consumer || {},
6✔
137
            {
138
              groupId: this.groupId,
139
            },
140
          );
141

142
          this.consumer = this.client.consumer(consumerOptions);
3✔
143
          // set member assignments on join and rebalance
144
          this.consumer.on(
3✔
145
            this.consumer.events.GROUP_JOIN,
146
            this.setConsumerAssignments.bind(this),
147
          );
148
          this.consumer.on(
3✔
149
            this.consumer.events.CRASH,
150
            this.handleConsumerCrash,
151
          );
152
          await this.consumer.connect();
3✔
153
          await this.bindTopics();
3✔
154
        }
155

156
        this.producer = this.client.producer(this.options.producer || {});
4✔
157
        await this.producer.connect();
4✔
158

159
        resolve();
4✔
160
      } catch (err) {
161
        reject(err);
×
162
      }
163
    });
164
    return this.initialized.then(() => this.producer);
4✔
165
  }
166

167
  public async bindTopics(): Promise<void> {
168
    if (!this.consumer) {
4!
169
      throw Error('No consumer initialized');
×
170
    }
171

172
    const consumerSubscribeOptions = this.options.subscribe || {};
4✔
173

174
    if (this.responsePatterns.length > 0) {
4✔
175
      await this.consumer.subscribe({
2✔
176
        ...consumerSubscribeOptions,
177
        topics: this.responsePatterns,
178
      });
179
    }
180

181
    await this.consumer.run(
4✔
182
      Object.assign(this.options.run || {}, {
8✔
183
        eachMessage: this.createResponseCallback(),
184
      }),
185
    );
186
  }
187

188
  public createClient<T = any>(): T {
189
    const kafkaConfig: KafkaConfig = Object.assign(
1✔
190
      { logCreator: KafkaLogger.bind(null, this.logger) },
191
      this.options.client,
192
      { brokers: this.brokers, clientId: this.clientId },
193
    );
194

195
    return new kafkaPackage.Kafka(kafkaConfig);
1✔
196
  }
197

198
  public createResponseCallback(): (payload: EachMessagePayload) => any {
199
    return async (payload: EachMessagePayload) => {
9✔
200
      const rawMessage = this.parser.parse<KafkaMessage>(
5✔
201
        Object.assign(payload.message, {
202
          topic: payload.topic,
203
          partition: payload.partition,
204
        }),
205
      );
206
      if (isUndefined(rawMessage.headers[KafkaHeaders.CORRELATION_ID])) {
5✔
207
        return;
1✔
208
      }
209
      const { err, response, isDisposed, id } =
210
        await this.deserializer.deserialize(rawMessage);
4✔
211
      const callback = this.routingMap.get(id);
4✔
212
      if (!callback) {
4✔
213
        return;
1✔
214
      }
215
      if (err || isDisposed) {
3✔
216
        return callback({
2✔
217
          err,
218
          response,
219
          isDisposed,
220
        });
221
      }
222
      callback({
1✔
223
        err,
224
        response,
225
      });
226
    };
227
  }
228

229
  public getConsumerAssignments() {
230
    return this.consumerAssignments;
2✔
231
  }
232

233
  protected async dispatchEvent(packet: OutgoingEvent): Promise<any> {
234
    const pattern = this.normalizePattern(packet.pattern);
2✔
235
    const outgoingEvent = await this.serializer.serialize(packet.data, {
2✔
236
      pattern,
237
    });
238
    const message = Object.assign(
2✔
239
      {
240
        topic: pattern,
241
        messages: [outgoingEvent],
242
      },
243
      this.options.send || {},
4✔
244
    );
245

246
    return this.producer.send(message);
2✔
247
  }
248

249
  protected getReplyTopicPartition(topic: string): string {
250
    const minimumPartition = this.consumerAssignments[topic];
11✔
251
    if (isUndefined(minimumPartition)) {
11✔
252
      throw new InvalidKafkaClientTopicException(topic);
2✔
253
    }
254

255
    // get the minimum partition
256
    return minimumPartition.toString();
9✔
257
  }
258

259
  protected publish(
260
    partialPacket: ReadPacket,
261
    callback: (packet: WritePacket) => any,
262
  ): () => void {
263
    const packet = this.assignPacketId(partialPacket);
9✔
264
    this.routingMap.set(packet.id, callback);
9✔
265

266
    const cleanup = () => this.routingMap.delete(packet.id);
9✔
267
    const errorCallback = (err: unknown) => {
9✔
268
      cleanup();
1✔
269
      callback({ err });
1✔
270
    };
271

272
    try {
9✔
273
      const pattern = this.normalizePattern(partialPacket.pattern);
9✔
274
      const replyTopic = this.getResponsePatternName(pattern);
9✔
275
      const replyPartition = this.getReplyTopicPartition(replyTopic);
9✔
276

277
      Promise.resolve(this.serializer.serialize(packet.data, { pattern }))
9✔
278
        .then((serializedPacket: KafkaRequest) => {
279
          serializedPacket.headers[KafkaHeaders.CORRELATION_ID] = packet.id;
9✔
280
          serializedPacket.headers[KafkaHeaders.REPLY_TOPIC] = replyTopic;
9✔
281
          serializedPacket.headers[KafkaHeaders.REPLY_PARTITION] =
9✔
282
            replyPartition;
283

284
          const message = Object.assign(
9✔
285
            {
286
              topic: pattern,
287
              messages: [serializedPacket],
288
            },
289
            this.options.send || {},
18✔
290
          );
291

292
          return this.producer.send(message);
9✔
293
        })
294
        .catch(err => errorCallback(err));
1✔
295

296
      return cleanup;
9✔
297
    } catch (err) {
298
      errorCallback(err);
×
299
    }
300
  }
301

302
  protected getResponsePatternName(pattern: string): string {
303
    return `${pattern}.reply`;
9✔
304
  }
305

306
  protected setConsumerAssignments(data: ConsumerGroupJoinEvent): void {
307
    const consumerAssignments: { [key: string]: number } = {};
2✔
308

309
    // only need to set the minimum
310
    Object.keys(data.payload.memberAssignment).forEach(topic => {
2✔
311
      const memberPartitions = data.payload.memberAssignment[topic];
4✔
312

313
      if (memberPartitions.length) {
4✔
314
        consumerAssignments[topic] = Math.min(...memberPartitions);
3✔
315
      }
316
    });
317

318
    this.consumerAssignments = consumerAssignments;
2✔
319
  }
320

321
  protected initializeSerializer(options: KafkaOptions['options']) {
322
    this.serializer =
37✔
323
      (options && options.serializer) || new KafkaRequestSerializer();
110✔
324
  }
325

326
  protected initializeDeserializer(options: KafkaOptions['options']) {
327
    this.deserializer =
37✔
328
      (options && options.deserializer) || new KafkaResponseDeserializer();
110✔
329
  }
330

331
  public commitOffsets(
332
    topicPartitions: TopicPartitionOffsetAndMetadata[],
333
  ): Promise<void> {
334
    if (this.consumer) {
×
335
      return this.consumer.commitOffsets(topicPartitions);
×
336
    } else {
337
      throw new Error('No consumer initialized');
×
338
    }
339
  }
340

341
  private handleConsumerCrash(
342
    event: InstrumentationEvent<{
343
      error: KafkaJSError;
344
      groupId: string;
345
      restart: boolean;
346
    }>,
347
  ) {
348
    const { error, groupId, restart } = event.payload;
×
349

350
    if (!restart) {
×
351
      throw new Error('Consumer crashed');
×
352
    }
353
  }
354
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc