• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / 481bc0ec-33a8-4162-8bf2-c5199bf1f5fa

15 Jul 2025 12:37PM UTC coverage: 88.918% (-0.02%) from 88.934%
481bc0ec-33a8-4162-8bf2-c5199bf1f5fa

Pull #15385

circleci

mag123c
feat(platform-fastify): implement lazy middleware registration
Pull Request #15385: fix(testing): auto-init fastify adapter for middleware registration

2711 of 3425 branches covered (79.15%)

7197 of 8094 relevant lines covered (88.92%)

16.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.33
/packages/microservices/server/server-kafka.ts
1
import { Logger } from '@nestjs/common/services/logger.service';
1✔
2
import { isNil } from '@nestjs/common/utils/shared.utils';
1✔
3
import { isObservable, lastValueFrom, Observable, ReplaySubject } from 'rxjs';
1✔
4
import {
1✔
5
  KAFKA_DEFAULT_BROKER,
6
  KAFKA_DEFAULT_CLIENT,
7
  KAFKA_DEFAULT_GROUP,
8
  NO_EVENT_HANDLER,
9
  NO_MESSAGE_HANDLER,
10
} from '../constants';
11
import { KafkaContext } from '../ctx-host';
1✔
12
import { KafkaRequestDeserializer } from '../deserializers/kafka-request.deserializer';
1✔
13
import { KafkaHeaders, Transport } from '../enums';
1✔
14
import { KafkaStatus } from '../events';
15
import { KafkaRetriableException } from '../exceptions';
1✔
16
import {
17
  BrokersFunction,
18
  Consumer,
19
  ConsumerConfig,
20
  EachMessagePayload,
21
  Kafka,
22
  KafkaConfig,
23
  KafkaMessage,
24
  Message,
25
  Producer,
26
  RecordMetadata,
27
} from '../external/kafka.interface';
28
import { KafkaLogger, KafkaParser } from '../helpers';
1✔
29
import {
30
  KafkaOptions,
31
  OutgoingResponse,
32
  ReadPacket,
33
  TransportId,
34
} from '../interfaces';
35
import { KafkaRequestSerializer } from '../serializers/kafka-request.serializer';
1✔
36
import { Server } from './server';
1✔
37

38
let kafkaPackage: any = {};
1✔
39

40
/**
41
 * @publicApi
42
 */
43
export class ServerKafka extends Server<never, KafkaStatus> {
1✔
44
  public transportId: TransportId = Transport.KAFKA;
27✔
45

46
  protected logger = new Logger(ServerKafka.name);
27✔
47
  protected client: Kafka | null = null;
27✔
48
  protected consumer: Consumer | null = null;
27✔
49
  protected producer: Producer | null = null;
27✔
50
  protected parser: KafkaParser | null = null;
27✔
51
  protected brokers: string[] | BrokersFunction;
52
  protected clientId: string;
53
  protected groupId: string;
54

55
  constructor(protected readonly options: Required<KafkaOptions>['options']) {
27✔
56
    super();
27✔
57

58
    const clientOptions = this.getOptionsProp(
27✔
59
      this.options,
60
      'client',
61
      {} as KafkaConfig,
62
    );
63
    const consumerOptions = this.getOptionsProp(
27✔
64
      this.options,
65
      'consumer',
66
      {} as ConsumerConfig,
67
    );
68
    const postfixId = this.getOptionsProp(this.options, 'postfixId', '-server');
27✔
69

70
    this.brokers = clientOptions.brokers || [KAFKA_DEFAULT_BROKER];
27✔
71

72
    // Append a unique id to the clientId and groupId
73
    // so they don't collide with a microservices client
74
    this.clientId =
27✔
75
      (clientOptions.clientId || KAFKA_DEFAULT_CLIENT) + postfixId;
54✔
76
    this.groupId = (consumerOptions.groupId || KAFKA_DEFAULT_GROUP) + postfixId;
27✔
77

78
    kafkaPackage = this.loadPackage('kafkajs', ServerKafka.name, () =>
27✔
79
      require('kafkajs'),
27✔
80
    );
81

82
    this.parser = new KafkaParser((options && options.parser) || undefined);
27✔
83

84
    this.initializeSerializer(options);
27✔
85
    this.initializeDeserializer(options);
27✔
86
  }
87

88
  public async listen(
89
    callback: (err?: unknown, ...optionalParams: unknown[]) => void,
90
  ): Promise<void> {
91
    try {
6✔
92
      this.client = this.createClient();
6✔
93
      await this.start(callback);
6✔
94
    } catch (err) {
95
      callback(err);
1✔
96
    }
97
  }
98

99
  public async close(): Promise<void> {
100
    this.consumer && (await this.consumer.disconnect());
1✔
101
    this.producer && (await this.producer.disconnect());
1✔
102
    this.consumer = null;
1✔
103
    this.producer = null;
1✔
104
    this.client = null;
1✔
105
  }
106

107
  public async start(callback: () => void): Promise<void> {
108
    const consumerOptions = Object.assign(this.options.consumer || {}, {
5✔
109
      groupId: this.groupId,
110
    });
111
    this.consumer = this.client!.consumer(consumerOptions);
5✔
112
    this.producer = this.client!.producer(this.options.producer);
5✔
113
    this.registerConsumerEventListeners();
5✔
114
    this.registerProducerEventListeners();
5✔
115

116
    await this.consumer.connect();
5✔
117
    await this.producer.connect();
5✔
118
    await this.bindEvents(this.consumer);
5✔
119
    callback();
5✔
120
  }
121

122
  protected registerConsumerEventListeners() {
123
    if (!this.consumer) {
5!
124
      return;
×
125
    }
126
    this.consumer.on(this.consumer.events.CONNECT, () =>
5✔
127
      this._status$.next(KafkaStatus.CONNECTED),
×
128
    );
129
    this.consumer.on(this.consumer.events.DISCONNECT, () =>
5✔
130
      this._status$.next(KafkaStatus.DISCONNECTED),
×
131
    );
132
    this.consumer.on(this.consumer.events.REBALANCING, () =>
5✔
133
      this._status$.next(KafkaStatus.REBALANCING),
×
134
    );
135
    this.consumer.on(this.consumer.events.STOP, () =>
5✔
136
      this._status$.next(KafkaStatus.STOPPED),
×
137
    );
138
    this.consumer.on(this.consumer.events.CRASH, () =>
5✔
139
      this._status$.next(KafkaStatus.CRASHED),
×
140
    );
141
  }
142

143
  protected registerProducerEventListeners() {
144
    if (!this.producer) {
5!
145
      return;
×
146
    }
147
    this.producer.on(this.producer.events.CONNECT, () =>
5✔
148
      this._status$.next(KafkaStatus.CONNECTED),
×
149
    );
150
    this.producer.on(this.producer.events.DISCONNECT, () =>
5✔
151
      this._status$.next(KafkaStatus.DISCONNECTED),
×
152
    );
153
  }
154

155
  public createClient<T = any>(): T {
156
    return new kafkaPackage.Kafka(
1✔
157
      Object.assign(
158
        { logCreator: KafkaLogger.bind(null, this.logger) },
159
        this.options.client,
160
        { clientId: this.clientId, brokers: this.brokers },
161
      ) as KafkaConfig,
162
    );
163
  }
164

165
  public async bindEvents(consumer: Consumer) {
166
    const registeredPatterns = [...this.messageHandlers.keys()];
7✔
167
    const consumerSubscribeOptions = this.options.subscribe || {};
7✔
168

169
    if (registeredPatterns.length > 0) {
7✔
170
      await this.consumer!.subscribe({
2✔
171
        ...consumerSubscribeOptions,
172
        topics: registeredPatterns,
173
      });
174
    }
175

176
    const consumerRunOptions = Object.assign(this.options.run || {}, {
7✔
177
      eachMessage: this.getMessageHandler(),
178
    });
179
    await consumer.run(consumerRunOptions);
7✔
180
  }
181

182
  public getMessageHandler() {
183
    return async (payload: EachMessagePayload) => this.handleMessage(payload);
9✔
184
  }
185

186
  public getPublisher(
187
    replyTopic: string,
188
    replyPartition: string,
189
    correlationId: string,
190
  ): (data: any) => Promise<RecordMetadata[]> {
191
    return (data: any) =>
3✔
192
      this.sendMessage(data, replyTopic, replyPartition, correlationId);
1✔
193
  }
194

195
  public async handleMessage(payload: EachMessagePayload) {
196
    const channel = payload.topic;
6✔
197
    const rawMessage = this.parser!.parse<KafkaMessage>(
6✔
198
      Object.assign(payload.message, {
199
        topic: payload.topic,
200
        partition: payload.partition,
201
      }),
202
    );
203
    const headers = rawMessage.headers as unknown as Record<string, any>;
6✔
204
    const correlationId = headers[KafkaHeaders.CORRELATION_ID];
6✔
205
    const replyTopic = headers[KafkaHeaders.REPLY_TOPIC];
6✔
206
    const replyPartition = headers[KafkaHeaders.REPLY_PARTITION];
6✔
207

208
    const packet = await this.deserializer.deserialize(rawMessage, { channel });
6✔
209
    const kafkaContext = new KafkaContext([
6✔
210
      rawMessage,
211
      payload.partition,
212
      payload.topic,
213
      this.consumer!,
214
      payload.heartbeat,
215
      this.producer!,
216
    ]);
217
    const handler = this.getHandlerByPattern(packet.pattern);
6✔
218
    // if the correlation id or reply topic is not set
219
    // then this is an event (events could still have correlation id)
220
    if (handler?.isEventHandler || !correlationId || !replyTopic) {
6✔
221
      return this.handleEvent(packet.pattern, packet, kafkaContext);
3✔
222
    }
223

224
    const publish = this.getPublisher(
3✔
225
      replyTopic,
226
      replyPartition,
227
      correlationId,
228
    );
229

230
    if (!handler) {
3✔
231
      return publish({
1✔
232
        id: correlationId,
233
        err: NO_MESSAGE_HANDLER,
234
      });
235
    }
236

237
    const response$ = this.transformToObservable(
2✔
238
      handler(packet.data, kafkaContext),
239
    );
240

241
    const replayStream$ = new ReplaySubject();
2✔
242
    await this.combineStreamsAndThrowIfRetriable(response$, replayStream$);
2✔
243

244
    this.send(replayStream$, publish);
2✔
245
  }
246

247
  public unwrap<T>(): T {
248
    if (!this.client) {
×
249
      throw new Error(
×
250
        'Not initialized. Please call the "listen"/"startAllMicroservices" method before accessing the server.',
251
      );
252
    }
253
    return [this.client, this.consumer, this.producer] as T;
×
254
  }
255

256
  public on<
257
    EventKey extends string | number | symbol = string | number | symbol,
258
    EventCallback = any,
259
  >(event: EventKey, callback: EventCallback) {
260
    throw new Error('Method is not supported for Kafka server');
×
261
  }
262

263
  private combineStreamsAndThrowIfRetriable(
264
    response$: Observable<any>,
265
    replayStream$: ReplaySubject<unknown>,
266
  ) {
267
    return new Promise<void>((resolve, reject) => {
2✔
268
      let isPromiseResolved = false;
2✔
269
      response$.subscribe({
2✔
270
        next: val => {
271
          replayStream$.next(val);
2✔
272
          if (!isPromiseResolved) {
2!
273
            isPromiseResolved = true;
2✔
274
            resolve();
2✔
275
          }
276
        },
277
        error: err => {
278
          if (err instanceof KafkaRetriableException && !isPromiseResolved) {
×
279
            isPromiseResolved = true;
×
280
            reject(err);
×
281
          } else {
282
            resolve();
×
283
          }
284
          replayStream$.error(err);
×
285
        },
286
        complete: () => replayStream$.complete(),
2✔
287
      });
288
    });
289
  }
290

291
  public async sendMessage(
292
    message: OutgoingResponse,
293
    replyTopic: string,
294
    replyPartition: string | undefined | null,
295
    correlationId: string,
296
  ): Promise<RecordMetadata[]> {
297
    const outgoingMessage = await this.serializer.serialize(message.response);
4✔
298
    this.assignReplyPartition(replyPartition, outgoingMessage);
4✔
299
    this.assignCorrelationIdHeader(correlationId, outgoingMessage);
4✔
300
    this.assignErrorHeader(message, outgoingMessage);
4✔
301
    this.assignIsDisposedHeader(message, outgoingMessage);
4✔
302

303
    const replyMessage = Object.assign(
4✔
304
      {
305
        topic: replyTopic,
306
        messages: [outgoingMessage],
307
      },
308
      this.options.send || {},
8✔
309
    );
310
    return this.producer!.send(replyMessage);
4✔
311
  }
312

313
  public assignIsDisposedHeader(
314
    outgoingResponse: OutgoingResponse,
315
    outgoingMessage: Message,
316
  ) {
317
    if (!outgoingResponse.isDisposed) {
4✔
318
      return;
3✔
319
    }
320
    outgoingMessage.headers![KafkaHeaders.NEST_IS_DISPOSED] = Buffer.alloc(1);
1✔
321
  }
322

323
  public assignErrorHeader(
324
    outgoingResponse: OutgoingResponse,
325
    outgoingMessage: Message,
326
  ) {
327
    if (!outgoingResponse.err) {
4✔
328
      return;
3✔
329
    }
330
    const stringifiedError =
331
      typeof outgoingResponse.err === 'object'
1✔
332
        ? JSON.stringify(outgoingResponse.err)
1!
333
        : outgoingResponse.err;
334
    outgoingMessage.headers![KafkaHeaders.NEST_ERR] =
1✔
335
      Buffer.from(stringifiedError);
336
  }
337

338
  public assignCorrelationIdHeader(
339
    correlationId: string,
340
    outgoingMessage: Message,
341
  ) {
342
    outgoingMessage.headers![KafkaHeaders.CORRELATION_ID] =
4✔
343
      Buffer.from(correlationId);
344
  }
345

346
  public assignReplyPartition(
347
    replyPartition: string | null | undefined,
348
    outgoingMessage: Message,
349
  ) {
350
    if (isNil(replyPartition)) {
4✔
351
      return;
1✔
352
    }
353
    outgoingMessage.partition = parseFloat(replyPartition);
3✔
354
  }
355

356
  public async handleEvent(
357
    pattern: string,
358
    packet: ReadPacket,
359
    context: KafkaContext,
360
  ): Promise<any> {
361
    const handler = this.getHandlerByPattern(pattern);
5✔
362
    if (!handler) {
5✔
363
      return this.logger.error(NO_EVENT_HANDLER`${pattern}`);
2✔
364
    }
365
    const resultOrStream = await handler(packet.data, context);
3✔
366
    if (isObservable(resultOrStream)) {
2!
367
      await lastValueFrom(resultOrStream);
×
368
    }
369
  }
370

371
  protected initializeSerializer(options: KafkaOptions['options']) {
372
    this.serializer =
27✔
373
      (options && options.serializer) || new KafkaRequestSerializer();
79✔
374
  }
375

376
  protected initializeDeserializer(options: KafkaOptions['options']) {
377
    this.deserializer = options?.deserializer ?? new KafkaRequestDeserializer();
27✔
378
  }
379
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc