• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / 88e8b676-a087-4894-ad55-c3b4e77ab62f

pending completion
88e8b676-a087-4894-ad55-c3b4e77ab62f

Pull #10390

circleci

GitHub
Update packages/core/middleware/route-info-path-extractor.ts
Pull Request #10390: fix(core): let the middleware can get the params in the global prefix

2719 of 3355 branches covered (81.04%)

30 of 30 new or added lines in 5 files covered. (100.0%)

6401 of 6883 relevant lines covered (93.0%)

15.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.89
/packages/microservices/server/server-kafka.ts
1
import { Logger } from '@nestjs/common/services/logger.service';
1✔
2
import { isNil } from '@nestjs/common/utils/shared.utils';
1✔
3
import { isObservable, lastValueFrom, Observable, ReplaySubject } from 'rxjs';
1✔
4
import {
1✔
5
  KAFKA_DEFAULT_BROKER,
6
  KAFKA_DEFAULT_CLIENT,
7
  KAFKA_DEFAULT_GROUP,
8
  NO_EVENT_HANDLER,
9
  NO_MESSAGE_HANDLER,
10
} from '../constants';
11
import { KafkaContext } from '../ctx-host';
1✔
12
import { KafkaRequestDeserializer } from '../deserializers/kafka-request.deserializer';
1✔
13
import { KafkaHeaders, Transport } from '../enums';
1✔
14
import { KafkaRetriableException } from '../exceptions';
1✔
15
import {
16
  BrokersFunction,
17
  Consumer,
18
  ConsumerConfig,
19
  EachMessagePayload,
20
  Kafka,
21
  KafkaConfig,
22
  KafkaMessage,
23
  Message,
24
  Producer,
25
  RecordMetadata,
26
} from '../external/kafka.interface';
27
import { KafkaLogger, KafkaParser } from '../helpers';
1✔
28
import {
29
  CustomTransportStrategy,
30
  KafkaOptions,
31
  OutgoingResponse,
32
  ReadPacket,
33
} from '../interfaces';
34
import { KafkaRequestSerializer } from '../serializers/kafka-request.serializer';
1✔
35
import { Server } from './server';
1✔
36

37
// Holder for the `kafkajs` module. It is assigned in the ServerKafka
// constructor (via `loadPackage('kafkajs', ...)`) and read by
// `createClient()` to reach the `Kafka` constructor.
let kafkaPackage: any = {};
1✔
38

39
/**
 * Kafka-based transport strategy.
 *
 * Creates a Kafka client, a consumer and a producer, subscribes the consumer
 * to every registered message pattern, and dispatches each incoming message
 * either to an event handler or to a request handler whose response is
 * published back to the reply topic named in the message headers.
 */
export class ServerKafka extends Server implements CustomTransportStrategy {
  public readonly transportId = Transport.KAFKA;

  protected logger = new Logger(ServerKafka.name);
  protected client: Kafka = null;
  protected consumer: Consumer = null;
  protected producer: Producer = null;
  protected parser: KafkaParser = null;

  protected brokers: string[] | BrokersFunction;
  protected clientId: string;
  protected groupId: string;

  /**
   * Resolves brokers, client id and group id from the given options, loads
   * the `kafkajs` package, and sets up the parser, serializer and
   * deserializer.
   *
   * @param options Kafka transport options (client, consumer, producer,
   * subscribe, run, send, parser, serializer, deserializer, postfixId).
   */
  constructor(protected readonly options: KafkaOptions['options']) {
    super();

    const clientOptions =
      this.getOptionsProp(this.options, 'client') || ({} as KafkaConfig);
    const consumerOptions =
      this.getOptionsProp(this.options, 'consumer') || ({} as ConsumerConfig);
    const postfixId =
      this.getOptionsProp(this.options, 'postfixId') ?? '-server';

    this.brokers = clientOptions.brokers || [KAFKA_DEFAULT_BROKER];

    // append a unique id to the clientId and groupId
    // so they don't collide with a microservices client
    this.clientId =
      (clientOptions.clientId || KAFKA_DEFAULT_CLIENT) + postfixId;
    this.groupId = (consumerOptions.groupId || KAFKA_DEFAULT_GROUP) + postfixId;

    kafkaPackage = this.loadPackage('kafkajs', ServerKafka.name, () =>
      require('kafkajs'),
    );

    this.parser = new KafkaParser((options && options.parser) || undefined);

    this.initializeSerializer(options);
    this.initializeDeserializer(options);
  }

  /**
   * Creates the Kafka client and starts the consumer and producer.
   *
   * @param callback Invoked with no arguments once started, or with the
   * error if client creation or startup throws.
   */
  public async listen(
    callback: (err?: unknown, ...optionalParams: unknown[]) => void,
  ): Promise<void> {
    try {
      this.client = this.createClient();
      await this.start(callback);
    } catch (err) {
      callback(err);
    }
  }

  /**
   * Disconnects the consumer and the producer (when present) and clears the
   * consumer, producer and client references.
   */
  public async close(): Promise<void> {
    this.consumer && (await this.consumer.disconnect());
    this.producer && (await this.producer.disconnect());
    this.consumer = null;
    this.producer = null;
    this.client = null;
  }

  /**
   * Creates the consumer and producer from the current client, connects
   * both, binds the message handlers, and finally invokes `callback`.
   *
   * @param callback Invoked once the consumer is subscribed and running.
   */
  public async start(callback: () => void): Promise<void> {
    // Merge into a fresh object: the resolved group id already carries the
    // postfix, so writing it back into the caller's `consumer` options would
    // get the postfix appended a second time if the same options object is
    // used to construct another server.
    const consumerOptions = Object.assign({}, this.options.consumer, {
      groupId: this.groupId,
    });
    this.consumer = this.client.consumer(consumerOptions);
    this.producer = this.client.producer(this.options.producer);

    await this.consumer.connect();
    await this.producer.connect();
    await this.bindEvents(this.consumer);
    callback();
  }

  /**
   * Instantiates the `Kafka` class of the loaded `kafkajs` package.
   *
   * Configuration precedence (lowest to highest): the default `logCreator`,
   * the user-provided `client` options, then the resolved `clientId` and
   * `brokers`.
   */
  public createClient<T = any>(): T {
    return new kafkaPackage.Kafka(
      Object.assign(
        { logCreator: KafkaLogger.bind(null, this.logger) },
        this.options.client,
        { clientId: this.clientId, brokers: this.brokers },
      ) as KafkaConfig,
    );
  }

  /**
   * Subscribes the consumer to every registered message pattern (used as the
   * topic name) and starts consuming with this server's message handler.
   *
   * @param consumer Consumer to subscribe and run.
   */
  public async bindEvents(consumer: Consumer) {
    const registeredPatterns = [...this.messageHandlers.keys()];
    const consumerSubscribeOptions = this.options.subscribe || {};
    const subscribeToPattern = async (pattern: string) =>
      consumer.subscribe({
        topic: pattern,
        ...consumerSubscribeOptions,
      });
    await Promise.all(registeredPatterns.map(subscribeToPattern));

    // Merge into a fresh object so the caller's `run` options are not
    // mutated with this server's `eachMessage` handler.
    const consumerRunOptions = Object.assign({}, this.options.run, {
      eachMessage: this.getMessageHandler(),
    });
    await consumer.run(consumerRunOptions);
  }

  /**
   * Returns the per-message callback passed to the consumer; it delegates to
   * `handleMessage`.
   */
  public getMessageHandler() {
    return async (payload: EachMessagePayload) => this.handleMessage(payload);
  }

  /**
   * Builds a function that publishes a response to the given reply topic.
   *
   * @param replyTopic Topic the response is sent to.
   * @param replyPartition Partition of the reply topic (may be nil).
   * @param correlationId Correlation id copied into the response headers.
   * @returns A function forwarding its argument to `sendMessage`.
   */
  public getPublisher(
    replyTopic: string,
    replyPartition: string,
    correlationId: string,
  ): (data: any) => Promise<RecordMetadata[]> {
    return (data: any) =>
      this.sendMessage(data, replyTopic, replyPartition, correlationId);
  }

  /**
   * Parses and deserializes an incoming message and dispatches it.
   *
   * The message is treated as an event when the matching handler is an event
   * handler, or when the correlation id or the reply topic header is missing.
   * Otherwise it is a request: the handler's result is converted to an
   * observable and its emissions are published to the reply topic. When no
   * handler matches a request, a `NO_MESSAGE_HANDLER` error is published.
   *
   * @param payload Message payload handed over by the consumer.
   */
  public async handleMessage(payload: EachMessagePayload) {
    const channel = payload.topic;
    const rawMessage = this.parser.parse<KafkaMessage>(
      Object.assign(payload.message, {
        topic: payload.topic,
        partition: payload.partition,
      }),
    );
    const headers = rawMessage.headers as unknown as Record<string, any>;
    const correlationId = headers[KafkaHeaders.CORRELATION_ID];
    const replyTopic = headers[KafkaHeaders.REPLY_TOPIC];
    const replyPartition = headers[KafkaHeaders.REPLY_PARTITION];

    const packet = await this.deserializer.deserialize(rawMessage, { channel });
    const kafkaContext = new KafkaContext([
      rawMessage,
      payload.partition,
      payload.topic,
      this.consumer,
      payload.heartbeat,
      this.producer,
    ]);
    const handler = this.getHandlerByPattern(packet.pattern);
    // if the correlation id or reply topic is not set
    // then this is an event (events could still have correlation id)
    if (handler?.isEventHandler || !correlationId || !replyTopic) {
      return this.handleEvent(packet.pattern, packet, kafkaContext);
    }

    const publish = this.getPublisher(
      replyTopic,
      replyPartition,
      correlationId,
    );

    if (!handler) {
      return publish({
        id: correlationId,
        err: NO_MESSAGE_HANDLER,
      });
    }

    const response$ = this.transformToObservable(
      await handler(packet.data, kafkaContext),
    );

    const replayStream$ = new ReplaySubject();
    await this.combineStreamsAndThrowIfRetriable(response$, replayStream$);

    this.send(replayStream$, publish);
  }

  /**
   * Mirrors `response$` into `replayStream$` and returns a promise that
   * settles as soon as the outcome of the first notification is known.
   *
   * - first `next`: resolves;
   * - `error` with a `KafkaRetriableException` before anything was emitted:
   *   rejects with that error; any other error resolves;
   * - `complete` without a prior emission: resolves.
   *
   * Every notification is forwarded to `replayStream$` regardless.
   *
   * @param response$ Stream produced by the message handler.
   * @param replayStream$ Subject receiving a copy of every notification.
   */
  private combineStreamsAndThrowIfRetriable(
    response$: Observable<any>,
    replayStream$: ReplaySubject<unknown>,
  ) {
    return new Promise<void>((resolve, reject) => {
      let isPromiseResolved = false;
      response$.subscribe({
        next: val => {
          replayStream$.next(val);
          if (!isPromiseResolved) {
            isPromiseResolved = true;
            resolve();
          }
        },
        error: err => {
          if (err instanceof KafkaRetriableException && !isPromiseResolved) {
            isPromiseResolved = true;
            reject(err);
          } else {
            resolve();
          }
          replayStream$.error(err);
        },
        complete: () => {
          replayStream$.complete();
          // A stream that completes without emitting anything would otherwise
          // leave this promise pending forever, stalling `handleMessage`,
          // which awaits it before sending the response.
          if (!isPromiseResolved) {
            isPromiseResolved = true;
            resolve();
          }
        },
      });
    });
  }

  /**
   * Serializes the response, decorates it with the reply partition and the
   * correlation-id / error / is-disposed headers, and sends it through the
   * producer.
   *
   * Values from `options.send` are merged last and therefore take precedence
   * over the default `topic` and `messages` fields.
   *
   * @param message Outgoing response wrapper.
   * @param replyTopic Topic the response is sent to.
   * @param replyPartition Partition of the reply topic (may be nil).
   * @param correlationId Correlation id copied into the response headers.
   * @returns The result of `producer.send`.
   */
  public async sendMessage(
    message: OutgoingResponse,
    replyTopic: string,
    replyPartition: string,
    correlationId: string,
  ): Promise<RecordMetadata[]> {
    const outgoingMessage = await this.serializer.serialize(message.response);
    this.assignReplyPartition(replyPartition, outgoingMessage);
    this.assignCorrelationIdHeader(correlationId, outgoingMessage);
    this.assignErrorHeader(message, outgoingMessage);
    this.assignIsDisposedHeader(message, outgoingMessage);

    const replyMessage = Object.assign(
      {
        topic: replyTopic,
        messages: [outgoingMessage],
      },
      this.options.send || {},
    );
    return this.producer.send(replyMessage);
  }

  /**
   * Sets the `NEST_IS_DISPOSED` header (a one-byte buffer) when the response
   * is flagged as disposed; otherwise leaves the message untouched.
   */
  public assignIsDisposedHeader(
    outgoingResponse: OutgoingResponse,
    outgoingMessage: Message,
  ) {
    if (!outgoingResponse.isDisposed) {
      return;
    }
    outgoingMessage.headers[KafkaHeaders.NEST_IS_DISPOSED] = Buffer.alloc(1);
  }

  /**
   * Sets the `NEST_ERR` header when the response carries an error. Object
   * errors are JSON-stringified; other values are passed to `Buffer.from`
   * as they are.
   */
  public assignErrorHeader(
    outgoingResponse: OutgoingResponse,
    outgoingMessage: Message,
  ) {
    if (!outgoingResponse.err) {
      return;
    }
    const stringifiedError =
      typeof outgoingResponse.err === 'object'
        ? JSON.stringify(outgoingResponse.err)
        : outgoingResponse.err;
    outgoingMessage.headers[KafkaHeaders.NEST_ERR] =
      Buffer.from(stringifiedError);
  }

  /**
   * Copies the correlation id into the `CORRELATION_ID` header.
   */
  public assignCorrelationIdHeader(
    correlationId: string,
    outgoingMessage: Message,
  ) {
    outgoingMessage.headers[KafkaHeaders.CORRELATION_ID] =
      Buffer.from(correlationId);
  }

  /**
   * Sets the target partition of the outgoing message from the reply
   * partition (parsed as a number), unless the reply partition is nil.
   */
  public assignReplyPartition(
    replyPartition: string,
    outgoingMessage: Message,
  ) {
    if (isNil(replyPartition)) {
      return;
    }
    outgoingMessage.partition = parseFloat(replyPartition);
  }

  /**
   * Invokes the handler registered for `pattern` with the packet data and
   * context. When the handler returns an observable, waits for its last
   * value. Logs `NO_EVENT_HANDLER` when no handler is registered.
   *
   * @param pattern Pattern used to look up the handler.
   * @param packet Deserialized packet whose `data` is passed to the handler.
   * @param context Kafka context passed to the handler.
   */
  public async handleEvent(
    pattern: string,
    packet: ReadPacket,
    context: KafkaContext,
  ): Promise<any> {
    const handler = this.getHandlerByPattern(pattern);
    if (!handler) {
      return this.logger.error(NO_EVENT_HANDLER`${pattern}`);
    }
    const resultOrStream = await handler(packet.data, context);
    if (isObservable(resultOrStream)) {
      await lastValueFrom(resultOrStream);
    }
  }

  /**
   * Uses the serializer from the options, falling back to
   * `KafkaRequestSerializer`.
   */
  protected initializeSerializer(options: KafkaOptions['options']) {
    this.serializer =
      (options && options.serializer) || new KafkaRequestSerializer();
  }

  /**
   * Uses the deserializer from the options, falling back to
   * `KafkaRequestDeserializer`.
   */
  protected initializeDeserializer(options: KafkaOptions['options']) {
    this.deserializer = options?.deserializer ?? new KafkaRequestDeserializer();
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc