nestjs / nest — build 3cfd9218-59df-4aa0-9b37-47f96a0aeb4f (pending completion)

Pull #11910 · circleci · esahin90

feat(microservices): handle kafka consumer crashes

To be able to react to consumer crashes within kafkajs that are not retriable,
ClientKafka and ServerKafka have to listen to CRASH events and act accordingly:
listen to the CRASH event and throw an exception if the crash will not be
retried.

Documentation: https://kafka.js.org/docs/instrumentation-events#a-name-consumer-a-consumer
Pull Request #11910: feat(microservices): handle kafka consumer crashes
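
As context for the change, here is a minimal standalone sketch (plain kafkajs, not the NestJS transport itself) of the pattern described above: subscribe to the consumer's CRASH instrumentation event and escalate only when kafkajs will not restart the consumer on its own. It assumes kafkajs v2.x; the broker address, client id, group id, and topic are placeholder values.

import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'example-client', // placeholder
  brokers: ['localhost:9092'], // placeholder
});
const consumer = kafka.consumer({ groupId: 'example-group' }); // placeholder

// CRASH is emitted with a payload of { error, groupId, restart };
// when `restart` is false, kafkajs will not recover the consumer by itself.
consumer.on(consumer.events.CRASH, event => {
  const { error, restart } = event.payload;
  if (!restart) {
    // Mirror the approach described above: surface the non-retriable crash
    // to the application instead of letting the consumer die silently.
    throw error;
  }
});

async function bootstrap() {
  await consumer.connect();
  await consumer.subscribe({ topics: ['example-topic'] }); // placeholder topic
  await consumer.run({
    eachMessage: async ({ topic, message }) => {
      console.log(topic, message.value?.toString());
    },
  });
}

bootstrap().catch(err => console.error(err));

In the source file below, the equivalent decision lives in handleConsumerCrash, which start() registers via consumer.on(consumer.events.CRASH, ...).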

2395 of 2867 branches covered (83.54%)

8 of 8 new or added lines in 2 files covered. (100.0%)

6365 of 6860 relevant lines covered (92.78%)

16.48 hits per line

Source File

/packages/microservices/server/server-kafka.ts (90.0% covered)
import { Logger } from '@nestjs/common/services/logger.service';
import { isNil } from '@nestjs/common/utils/shared.utils';
import { isObservable, lastValueFrom, Observable, ReplaySubject } from 'rxjs';

import {
  KAFKA_DEFAULT_BROKER,
  KAFKA_DEFAULT_CLIENT,
  KAFKA_DEFAULT_GROUP,
  NO_EVENT_HANDLER,
  NO_MESSAGE_HANDLER,
} from '../constants';
import { KafkaContext } from '../ctx-host';
import { KafkaRequestDeserializer } from '../deserializers/kafka-request.deserializer';
import { KafkaHeaders, Transport } from '../enums';
import { KafkaRetriableException } from '../exceptions';
import {
  BrokersFunction,
  Consumer,
  ConsumerConfig,
  EachMessagePayload,
  InstrumentationEvent,
  Kafka,
  KafkaConfig,
  KafkaJSError,
  KafkaMessage,
  Message,
  Producer,
  RecordMetadata,
} from '../external/kafka.interface';
import { KafkaLogger, KafkaParser } from '../helpers';
import {
  CustomTransportStrategy,
  KafkaOptions,
  OutgoingResponse,
  ReadPacket,
} from '../interfaces';
import { KafkaRequestSerializer } from '../serializers/kafka-request.serializer';
import { Server } from './server';

let kafkaPackage: any = {};

export class ServerKafka extends Server implements CustomTransportStrategy {
  public readonly transportId = Transport.KAFKA;

  protected logger = new Logger(ServerKafka.name);
  protected client: Kafka = null;
  protected consumer: Consumer = null;
  protected producer: Producer = null;
  protected parser: KafkaParser = null;

  protected brokers: string[] | BrokersFunction;
  protected clientId: string;
  protected groupId: string;

  constructor(protected readonly options: KafkaOptions['options']) {
    super();

    const clientOptions =
      this.getOptionsProp(this.options, 'client') || ({} as KafkaConfig);
    const consumerOptions =
      this.getOptionsProp(this.options, 'consumer') || ({} as ConsumerConfig);
    const postfixId =
      this.getOptionsProp(this.options, 'postfixId') ?? '-server';

    this.brokers = clientOptions.brokers || [KAFKA_DEFAULT_BROKER];

    // append a unique id to the clientId and groupId
    // so they don't collide with a microservices client
    this.clientId =
      (clientOptions.clientId || KAFKA_DEFAULT_CLIENT) + postfixId;
    this.groupId = (consumerOptions.groupId || KAFKA_DEFAULT_GROUP) + postfixId;

    kafkaPackage = this.loadPackage('kafkajs', ServerKafka.name, () =>
      require('kafkajs'),
    );

    this.parser = new KafkaParser((options && options.parser) || undefined);

    this.initializeSerializer(options);
    this.initializeDeserializer(options);
  }

  public async listen(
    callback: (err?: unknown, ...optionalParams: unknown[]) => void,
  ): Promise<void> {
    try {
      this.client = this.createClient();
      await this.start(callback);
    } catch (err) {
      callback(err);
    }
  }

  public async close(): Promise<void> {
    this.consumer && (await this.consumer.disconnect());
    this.producer && (await this.producer.disconnect());
    this.consumer = null;
    this.producer = null;
    this.client = null;
  }

  public async start(callback: () => void): Promise<void> {
    const consumerOptions = Object.assign(this.options.consumer || {}, {
      groupId: this.groupId,
    });
    this.consumer = this.client.consumer(consumerOptions);
    this.producer = this.client.producer(this.options.producer);

    this.consumer.on(this.consumer.events.CRASH, this.handleConsumerCrash);

    await this.consumer.connect();
    await this.producer.connect();
    await this.bindEvents(this.consumer);
    callback();
  }

  public createClient<T = any>(): T {
    return new kafkaPackage.Kafka(
      Object.assign(
        { logCreator: KafkaLogger.bind(null, this.logger) },
        this.options.client,
        { clientId: this.clientId, brokers: this.brokers },
      ) as KafkaConfig,
    );
  }

  public async bindEvents(consumer: Consumer) {
    const registeredPatterns = [...this.messageHandlers.keys()];
    const consumerSubscribeOptions = this.options.subscribe || {};

    if (registeredPatterns.length > 0) {
      await this.consumer.subscribe({
        ...consumerSubscribeOptions,
        topics: registeredPatterns,
      });
    }

    const consumerRunOptions = Object.assign(this.options.run || {}, {
      eachMessage: this.getMessageHandler(),
    });
    await consumer.run(consumerRunOptions);
  }

  public getMessageHandler() {
    return async (payload: EachMessagePayload) => this.handleMessage(payload);
  }

  public getPublisher(
    replyTopic: string,
    replyPartition: string,
    correlationId: string,
  ): (data: any) => Promise<RecordMetadata[]> {
    return (data: any) =>
      this.sendMessage(data, replyTopic, replyPartition, correlationId);
  }

  public async handleMessage(payload: EachMessagePayload) {
    const channel = payload.topic;
    const rawMessage = this.parser.parse<KafkaMessage>(
      Object.assign(payload.message, {
        topic: payload.topic,
        partition: payload.partition,
      }),
    );
    const headers = rawMessage.headers as unknown as Record<string, any>;
    const correlationId = headers[KafkaHeaders.CORRELATION_ID];
    const replyTopic = headers[KafkaHeaders.REPLY_TOPIC];
    const replyPartition = headers[KafkaHeaders.REPLY_PARTITION];

    const packet = await this.deserializer.deserialize(rawMessage, { channel });
    const kafkaContext = new KafkaContext([
      rawMessage,
      payload.partition,
      payload.topic,
      this.consumer,
      payload.heartbeat,
      this.producer,
    ]);
    const handler = this.getHandlerByPattern(packet.pattern);
    // if the correlation id or reply topic is not set
    // then this is an event (events could still have correlation id)
    if (handler?.isEventHandler || !correlationId || !replyTopic) {
      return this.handleEvent(packet.pattern, packet, kafkaContext);
    }

    const publish = this.getPublisher(
      replyTopic,
      replyPartition,
      correlationId,
    );

    if (!handler) {
      return publish({
        id: correlationId,
        err: NO_MESSAGE_HANDLER,
      });
    }

    const response$ = this.transformToObservable(
      handler(packet.data, kafkaContext),
    );

    const replayStream$ = new ReplaySubject();
    await this.combineStreamsAndThrowIfRetriable(response$, replayStream$);

    this.send(replayStream$, publish);
  }

  private combineStreamsAndThrowIfRetriable(
    response$: Observable<any>,
    replayStream$: ReplaySubject<unknown>,
  ) {
    return new Promise<void>((resolve, reject) => {
      let isPromiseResolved = false;
      response$.subscribe({
        next: val => {
          replayStream$.next(val);
          if (!isPromiseResolved) {
            isPromiseResolved = true;
            resolve();
          }
        },
        error: err => {
          if (err instanceof KafkaRetriableException && !isPromiseResolved) {
            isPromiseResolved = true;
            reject(err);
          } else {
            resolve();
          }
          replayStream$.error(err);
        },
        complete: () => replayStream$.complete(),
      });
    });
  }

  public async sendMessage(
    message: OutgoingResponse,
    replyTopic: string,
    replyPartition: string,
    correlationId: string,
  ): Promise<RecordMetadata[]> {
    const outgoingMessage = await this.serializer.serialize(message.response);
    this.assignReplyPartition(replyPartition, outgoingMessage);
    this.assignCorrelationIdHeader(correlationId, outgoingMessage);
    this.assignErrorHeader(message, outgoingMessage);
    this.assignIsDisposedHeader(message, outgoingMessage);

    const replyMessage = Object.assign(
      {
        topic: replyTopic,
        messages: [outgoingMessage],
      },
      this.options.send || {},
    );
    return this.producer.send(replyMessage);
  }

  public assignIsDisposedHeader(
    outgoingResponse: OutgoingResponse,
    outgoingMessage: Message,
  ) {
    if (!outgoingResponse.isDisposed) {
      return;
    }
    outgoingMessage.headers[KafkaHeaders.NEST_IS_DISPOSED] = Buffer.alloc(1);
  }

  public assignErrorHeader(
    outgoingResponse: OutgoingResponse,
    outgoingMessage: Message,
  ) {
    if (!outgoingResponse.err) {
      return;
    }
    const stringifiedError =
      typeof outgoingResponse.err === 'object'
        ? JSON.stringify(outgoingResponse.err)
        : outgoingResponse.err;
    outgoingMessage.headers[KafkaHeaders.NEST_ERR] =
      Buffer.from(stringifiedError);
  }

  public assignCorrelationIdHeader(
    correlationId: string,
    outgoingMessage: Message,
  ) {
    outgoingMessage.headers[KafkaHeaders.CORRELATION_ID] =
      Buffer.from(correlationId);
  }

  public assignReplyPartition(
    replyPartition: string,
    outgoingMessage: Message,
  ) {
    if (isNil(replyPartition)) {
      return;
    }
    outgoingMessage.partition = parseFloat(replyPartition);
  }

  public async handleEvent(
    pattern: string,
    packet: ReadPacket,
    context: KafkaContext,
  ): Promise<any> {
    const handler = this.getHandlerByPattern(pattern);
    if (!handler) {
      return this.logger.error(NO_EVENT_HANDLER`${pattern}`);
    }
    const resultOrStream = await handler(packet.data, context);
    if (isObservable(resultOrStream)) {
      await lastValueFrom(resultOrStream);
    }
  }

  protected initializeSerializer(options: KafkaOptions['options']) {
    this.serializer =
      (options && options.serializer) || new KafkaRequestSerializer();
  }

  protected initializeDeserializer(options: KafkaOptions['options']) {
    this.deserializer = options?.deserializer ?? new KafkaRequestDeserializer();
  }

  private handleConsumerCrash(
    event: InstrumentationEvent<{
      error: KafkaJSError;
      groupId: string;
      restart: boolean;
    }>,
  ) {
    const { error, groupId, restart } = event.payload;

    if (!restart) {
      throw new Error('Consumer crashed');
    }
  }
}