• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / 4afa0176-b936-482c-ad88-b635a9db93b4

14 Jul 2025 11:37AM UTC coverage: 88.866% (-0.02%) from 88.886%
4afa0176-b936-482c-ad88-b635a9db93b4

Pull #15386

circleci

kamilmysliwiec
style: address linter warnings
Pull Request #15386: feat: enhance introspection capabilities

2714 of 3431 branches covered (79.1%)

101 of 118 new or added lines in 15 files covered. (85.59%)

12 existing lines in 1 file now uncovered.

7239 of 8146 relevant lines covered (88.87%)

16.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.01
/packages/microservices/server/server-kafka.ts
1
import { Logger } from '@nestjs/common/services/logger.service';
1✔
2
import { isNil } from '@nestjs/common/utils/shared.utils';
1✔
3
import { isObservable, lastValueFrom, Observable, ReplaySubject } from 'rxjs';
1✔
4
import {
1✔
5
  KAFKA_DEFAULT_BROKER,
6
  KAFKA_DEFAULT_CLIENT,
7
  KAFKA_DEFAULT_GROUP,
8
  NO_EVENT_HANDLER,
9
  NO_MESSAGE_HANDLER,
10
} from '../constants';
11
import { KafkaContext } from '../ctx-host';
1✔
12
import { KafkaRequestDeserializer } from '../deserializers/kafka-request.deserializer';
1✔
13
import { KafkaHeaders, Transport } from '../enums';
1✔
14
import { KafkaStatus } from '../events';
15
import { KafkaRetriableException } from '../exceptions';
1✔
16
import {
17
  BrokersFunction,
18
  Consumer,
19
  ConsumerConfig,
20
  EachMessagePayload,
21
  Kafka,
22
  KafkaConfig,
23
  KafkaMessage,
24
  Message,
25
  Producer,
26
  RecordMetadata,
27
} from '../external/kafka.interface';
28
import { KafkaLogger, KafkaParser } from '../helpers';
1✔
29
import {
30
  KafkaOptions,
31
  OutgoingResponse,
32
  ReadPacket,
33
  TransportId,
34
} from '../interfaces';
35
import { KafkaRequestSerializer } from '../serializers/kafka-request.serializer';
1✔
36
import { Server } from './server';
1✔
37

38
let kafkaPackage: any = {};
1✔
39

40
/**
41
 * @publicApi
42
 */
43
export class ServerKafka extends Server<never, KafkaStatus> {
1✔
44
  public transportId: TransportId = Transport.KAFKA;
27✔
45

46
  protected logger = new Logger(ServerKafka.name);
27✔
47
  protected client: Kafka | null = null;
27✔
48
  protected consumer: Consumer | null = null;
27✔
49
  protected producer: Producer | null = null;
27✔
50
  protected parser: KafkaParser | null = null;
27✔
51
  protected brokers: string[] | BrokersFunction;
52
  protected clientId: string;
53
  protected groupId: string;
54

55
  constructor(protected readonly options: Required<KafkaOptions>['options']) {
27✔
56
    super();
27✔
57

58
    const clientOptions = this.getOptionsProp(
27✔
59
      this.options,
60
      'client',
61
      {} as KafkaConfig,
62
    );
63
    const consumerOptions = this.getOptionsProp(
27✔
64
      this.options,
65
      'consumer',
66
      {} as ConsumerConfig,
67
    );
68
    const postfixId = this.getOptionsProp(this.options, 'postfixId', '-server');
27✔
69

70
    this.brokers = clientOptions.brokers || [KAFKA_DEFAULT_BROKER];
27✔
71

72
    // Append a unique id to the clientId and groupId
73
    // so they don't collide with a microservices client
74
    this.clientId =
27✔
75
      (clientOptions.clientId || KAFKA_DEFAULT_CLIENT) + postfixId;
54✔
76
    this.groupId = (consumerOptions.groupId || KAFKA_DEFAULT_GROUP) + postfixId;
27✔
77

78
    kafkaPackage = this.loadPackage('kafkajs', ServerKafka.name, () =>
27✔
79
      require('kafkajs'),
27✔
80
    );
81

82
    this.parser = new KafkaParser((options && options.parser) || undefined);
27✔
83

84
    this.initializeSerializer(options);
27✔
85
    this.initializeDeserializer(options);
27✔
86
  }
87

88
  public async listen(
89
    callback: (err?: unknown, ...optionalParams: unknown[]) => void,
90
  ): Promise<void> {
91
    try {
6✔
92
      this.client = this.createClient();
6✔
93
      await this.start(callback);
6✔
94
    } catch (err) {
95
      callback(err);
1✔
96
    }
97
  }
98

99
  public async close(): Promise<void> {
100
    this.consumer && (await this.consumer.disconnect());
1✔
101
    this.producer && (await this.producer.disconnect());
1✔
102
    this.consumer = null;
1✔
103
    this.producer = null;
1✔
104
    this.client = null;
1✔
105
  }
106

107
  public async start(callback: () => void): Promise<void> {
108
    const consumerOptions = Object.assign(this.options.consumer || {}, {
5✔
109
      groupId: this.groupId,
110
    });
111
    this.consumer = this.client!.consumer(consumerOptions);
5✔
112
    this.producer = this.client!.producer(this.options.producer);
5✔
113
    this.registerConsumerEventListeners();
5✔
114
    this.registerProducerEventListeners();
5✔
115

116
    await this.consumer.connect();
5✔
117
    await this.producer.connect();
5✔
118
    await this.bindEvents(this.consumer);
5✔
119
    callback();
5✔
120
  }
121

122
  protected registerConsumerEventListeners() {
123
    if (!this.consumer) {
5!
124
      return;
×
125
    }
126
    this.consumer.on(this.consumer.events.CONNECT, () =>
5✔
127
      this._status$.next(KafkaStatus.CONNECTED),
×
128
    );
129
    this.consumer.on(this.consumer.events.DISCONNECT, () =>
5✔
130
      this._status$.next(KafkaStatus.DISCONNECTED),
×
131
    );
132
    this.consumer.on(this.consumer.events.REBALANCING, () =>
5✔
133
      this._status$.next(KafkaStatus.REBALANCING),
×
134
    );
135
    this.consumer.on(this.consumer.events.STOP, () =>
5✔
136
      this._status$.next(KafkaStatus.STOPPED),
×
137
    );
138
    this.consumer.on(this.consumer.events.CRASH, () =>
5✔
139
      this._status$.next(KafkaStatus.CRASHED),
×
140
    );
141
  }
142

143
  protected registerProducerEventListeners() {
144
    if (!this.producer) {
5!
145
      return;
×
146
    }
147
    this.producer.on(this.producer.events.CONNECT, () =>
5✔
148
      this._status$.next(KafkaStatus.CONNECTED),
×
149
    );
150
    this.producer.on(this.producer.events.DISCONNECT, () =>
5✔
151
      this._status$.next(KafkaStatus.DISCONNECTED),
×
152
    );
153
  }
154

155
  public createClient<T = any>(): T {
156
    return new kafkaPackage.Kafka(
1✔
157
      Object.assign(
158
        { logCreator: KafkaLogger.bind(null, this.logger) },
159
        this.options.client,
160
        { clientId: this.clientId, brokers: this.brokers },
161
      ) as KafkaConfig,
162
    );
163
  }
164

165
  public async bindEvents(consumer: Consumer) {
166
    const registeredPatterns = [...this.messageHandlers.keys()];
7✔
167
    const consumerSubscribeOptions = this.options.subscribe || {};
7✔
168

169
    if (registeredPatterns.length > 0) {
7✔
170
      await this.consumer!.subscribe({
2✔
171
        ...consumerSubscribeOptions,
172
        topics: registeredPatterns,
173
      });
174
    }
175

176
    const consumerRunOptions = Object.assign(this.options.run || {}, {
7✔
177
      eachMessage: this.getMessageHandler(),
178
    });
179
    await consumer.run(consumerRunOptions);
7✔
180
  }
181

182
  public getMessageHandler() {
183
    return async (payload: EachMessagePayload) => this.handleMessage(payload);
9✔
184
  }
185

186
  public getPublisher(
187
    replyTopic: string,
188
    replyPartition: string,
189
    correlationId: string,
190
    context: KafkaContext,
191
  ): (data: any) => Promise<RecordMetadata[]> {
192
    return (data: any) =>
3✔
193
      this.sendMessage(
1✔
194
        data,
195
        replyTopic,
196
        replyPartition,
197
        correlationId,
198
        context,
199
      );
200
  }
201

202
  public async handleMessage(payload: EachMessagePayload) {
203
    const channel = payload.topic;
6✔
204
    const rawMessage = this.parser!.parse<KafkaMessage>(
6✔
205
      Object.assign(payload.message, {
206
        topic: payload.topic,
207
        partition: payload.partition,
208
      }),
209
    );
210
    const headers = rawMessage.headers as unknown as Record<string, any>;
6✔
211
    const correlationId = headers[KafkaHeaders.CORRELATION_ID];
6✔
212
    const replyTopic = headers[KafkaHeaders.REPLY_TOPIC];
6✔
213
    const replyPartition = headers[KafkaHeaders.REPLY_PARTITION];
6✔
214

215
    const packet = await this.deserializer.deserialize(rawMessage, { channel });
6✔
216
    const kafkaContext = new KafkaContext([
6✔
217
      rawMessage,
218
      payload.partition,
219
      payload.topic,
220
      this.consumer!,
221
      payload.heartbeat,
222
      this.producer!,
223
    ]);
224
    const handler = this.getHandlerByPattern(packet.pattern);
6✔
225
    // if the correlation id or reply topic is not set
226
    // then this is an event (events could still have correlation id)
227
    if (handler?.isEventHandler || !correlationId || !replyTopic) {
6✔
228
      return this.handleEvent(packet.pattern, packet, kafkaContext);
3✔
229
    }
230

231
    const publish = this.getPublisher(
3✔
232
      replyTopic,
233
      replyPartition,
234
      correlationId,
235
      kafkaContext,
236
    );
237

238
    if (!handler) {
3✔
239
      return publish({
1✔
240
        id: correlationId,
241
        err: NO_MESSAGE_HANDLER,
242
      });
243
    }
244
    return this.onProcessingStartHook(
2✔
245
      this.transportId,
246
      kafkaContext,
247
      async () => {
248
        const response$ = this.transformToObservable(
2✔
249
          handler(packet.data, kafkaContext),
250
        );
251

252
        const replayStream$ = new ReplaySubject();
2✔
253
        await this.combineStreamsAndThrowIfRetriable(response$, replayStream$);
2✔
254

255
        this.send(replayStream$, publish);
2✔
256
      },
257
    );
258
  }
259

260
  public unwrap<T>(): T {
261
    if (!this.client) {
×
262
      throw new Error(
×
263
        'Not initialized. Please call the "listen"/"startAllMicroservices" method before accessing the server.',
264
      );
265
    }
266
    return [this.client, this.consumer, this.producer] as T;
×
267
  }
268

269
  public on<
270
    EventKey extends string | number | symbol = string | number | symbol,
271
    EventCallback = any,
272
  >(event: EventKey, callback: EventCallback) {
273
    throw new Error('Method is not supported for Kafka server');
×
274
  }
275

276
  private combineStreamsAndThrowIfRetriable(
277
    response$: Observable<any>,
278
    replayStream$: ReplaySubject<unknown>,
279
  ) {
280
    return new Promise<void>((resolve, reject) => {
2✔
281
      let isPromiseResolved = false;
2✔
282
      response$.subscribe({
2✔
283
        next: val => {
284
          replayStream$.next(val);
2✔
285
          if (!isPromiseResolved) {
2!
286
            isPromiseResolved = true;
2✔
287
            resolve();
2✔
288
          }
289
        },
290
        error: err => {
291
          if (err instanceof KafkaRetriableException && !isPromiseResolved) {
×
292
            isPromiseResolved = true;
×
293
            reject(err);
×
294
          } else {
295
            resolve();
×
296
          }
297
          replayStream$.error(err);
×
298
        },
299
        complete: () => replayStream$.complete(),
2✔
300
      });
301
    });
302
  }
303

304
  public async sendMessage(
305
    message: OutgoingResponse,
306
    replyTopic: string,
307
    replyPartition: string | undefined | null,
308
    correlationId: string,
309
    context: KafkaContext,
310
  ): Promise<RecordMetadata[]> {
311
    const outgoingMessage = await this.serializer.serialize(message.response);
4✔
312
    this.assignReplyPartition(replyPartition, outgoingMessage);
4✔
313
    this.assignCorrelationIdHeader(correlationId, outgoingMessage);
4✔
314
    this.assignErrorHeader(message, outgoingMessage);
4✔
315
    this.assignIsDisposedHeader(message, outgoingMessage);
4✔
316

317
    const replyMessage = Object.assign(
4✔
318
      {
319
        topic: replyTopic,
320
        messages: [outgoingMessage],
321
      },
322
      this.options.send || {},
8✔
323
    );
324
    return this.producer!.send(replyMessage).finally(() => {
4✔
325
      this.onProcessingEndHook?.(this.transportId, context);
4✔
326
    });
327
  }
328

329
  public assignIsDisposedHeader(
330
    outgoingResponse: OutgoingResponse,
331
    outgoingMessage: Message,
332
  ) {
333
    if (!outgoingResponse.isDisposed) {
4✔
334
      return;
3✔
335
    }
336
    outgoingMessage.headers![KafkaHeaders.NEST_IS_DISPOSED] = Buffer.alloc(1);
1✔
337
  }
338

339
  public assignErrorHeader(
340
    outgoingResponse: OutgoingResponse,
341
    outgoingMessage: Message,
342
  ) {
343
    if (!outgoingResponse.err) {
4✔
344
      return;
3✔
345
    }
346
    const stringifiedError =
347
      typeof outgoingResponse.err === 'object'
1✔
348
        ? JSON.stringify(outgoingResponse.err)
1!
349
        : outgoingResponse.err;
350
    outgoingMessage.headers![KafkaHeaders.NEST_ERR] =
1✔
351
      Buffer.from(stringifiedError);
352
  }
353

354
  public assignCorrelationIdHeader(
355
    correlationId: string,
356
    outgoingMessage: Message,
357
  ) {
358
    outgoingMessage.headers![KafkaHeaders.CORRELATION_ID] =
4✔
359
      Buffer.from(correlationId);
360
  }
361

362
  public assignReplyPartition(
363
    replyPartition: string | null | undefined,
364
    outgoingMessage: Message,
365
  ) {
366
    if (isNil(replyPartition)) {
4✔
367
      return;
1✔
368
    }
369
    outgoingMessage.partition = parseFloat(replyPartition);
3✔
370
  }
371

372
  public async handleEvent(
373
    pattern: string,
374
    packet: ReadPacket,
375
    context: KafkaContext,
376
  ): Promise<any> {
377
    const handler = this.getHandlerByPattern(pattern);
5✔
378
    if (!handler) {
5✔
379
      return this.logger.error(NO_EVENT_HANDLER`${pattern}`);
2✔
380
    }
381

382
    return this.onProcessingStartHook(this.transportId, context, async () => {
3✔
383
      const resultOrStream = await handler(packet.data, context);
3✔
384
      if (isObservable(resultOrStream)) {
2!
NEW
385
        await lastValueFrom(resultOrStream);
×
NEW
386
        this.onProcessingEndHook?.(this.transportId, context);
×
387
      }
388
    });
389
  }
390

391
  protected initializeSerializer(options: KafkaOptions['options']) {
392
    this.serializer =
27✔
393
      (options && options.serializer) || new KafkaRequestSerializer();
79✔
394
  }
395

396
  protected initializeDeserializer(options: KafkaOptions['options']) {
397
    this.deserializer = options?.deserializer ?? new KafkaRequestDeserializer();
27✔
398
  }
399
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc