• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / e79cdc3c-762e-40cd-acb3-21e17deb1cd9

18 Aug 2024 03:11PM UTC coverage: 92.124% (-0.09%) from 92.213%
e79cdc3c-762e-40cd-acb3-21e17deb1cd9

Pull #13485

circleci

DylanVeldra
fix(core): merge req context with tenant payload in the request instance
Pull Request #13485: fix(core): merge request context with tenant context payload in the request singleton

2559 of 3078 branches covered (83.14%)

1 of 2 new or added lines in 2 files covered. (50.0%)

74 existing lines in 13 files now uncovered.

6737 of 7313 relevant lines covered (92.12%)

17.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.78
/packages/microservices/server/server-kafka.ts
1
import { Logger } from '@nestjs/common/services/logger.service';
1✔
2
import { isNil } from '@nestjs/common/utils/shared.utils';
1✔
3
import { isObservable, lastValueFrom, Observable, ReplaySubject } from 'rxjs';
1✔
4
import {
1✔
5
  KAFKA_DEFAULT_BROKER,
6
  KAFKA_DEFAULT_CLIENT,
7
  KAFKA_DEFAULT_GROUP,
8
  NO_EVENT_HANDLER,
9
  NO_MESSAGE_HANDLER,
10
} from '../constants';
11
import { KafkaContext } from '../ctx-host';
1✔
12
import { KafkaRequestDeserializer } from '../deserializers/kafka-request.deserializer';
1✔
13
import { KafkaHeaders, Transport } from '../enums';
1✔
14
import { KafkaRetriableException } from '../exceptions';
1✔
15
import {
16
  BrokersFunction,
17
  Consumer,
18
  ConsumerConfig,
19
  EachMessagePayload,
20
  Kafka,
21
  KafkaConfig,
22
  KafkaMessage,
23
  Message,
24
  Producer,
25
  RecordMetadata,
26
} from '../external/kafka.interface';
27
import { KafkaLogger, KafkaParser } from '../helpers';
1✔
28
import {
29
  CustomTransportStrategy,
30
  KafkaOptions,
31
  OutgoingResponse,
32
  ReadPacket,
33
} from '../interfaces';
34
import { KafkaRequestSerializer } from '../serializers/kafka-request.serializer';
1✔
35
import { Server } from './server';
1✔
36

37
let kafkaPackage: any = {};
1✔
38

39
export class ServerKafka extends Server implements CustomTransportStrategy {
1✔
40
  public readonly transportId = Transport.KAFKA;
26✔
41

42
  protected logger = new Logger(ServerKafka.name);
26✔
43
  protected client: Kafka = null;
26✔
44
  protected consumer: Consumer = null;
26✔
45
  protected producer: Producer = null;
26✔
46
  protected parser: KafkaParser = null;
26✔
47

48
  protected brokers: string[] | BrokersFunction;
49
  protected clientId: string;
50
  protected groupId: string;
51

52
  constructor(protected readonly options: KafkaOptions['options']) {
26✔
53
    super();
26✔
54

55
    const clientOptions =
56
      this.getOptionsProp(this.options, 'client') || ({} as KafkaConfig);
26✔
57
    const consumerOptions =
58
      this.getOptionsProp(this.options, 'consumer') || ({} as ConsumerConfig);
26✔
59
    const postfixId =
60
      this.getOptionsProp(this.options, 'postfixId') ?? '-server';
26✔
61

62
    this.brokers = clientOptions.brokers || [KAFKA_DEFAULT_BROKER];
26✔
63

64
    // append a unique id to the clientId and groupId
65
    // so they don't collide with a microservices client
66
    this.clientId =
26✔
67
      (clientOptions.clientId || KAFKA_DEFAULT_CLIENT) + postfixId;
52✔
68
    this.groupId = (consumerOptions.groupId || KAFKA_DEFAULT_GROUP) + postfixId;
26✔
69

70
    kafkaPackage = this.loadPackage('kafkajs', ServerKafka.name, () =>
26✔
71
      require('kafkajs'),
26✔
72
    );
73

74
    this.parser = new KafkaParser((options && options.parser) || undefined);
26✔
75

76
    this.initializeSerializer(options);
26✔
77
    this.initializeDeserializer(options);
26✔
78
  }
79

80
  public async listen(
81
    callback: (err?: unknown, ...optionalParams: unknown[]) => void,
82
  ): Promise<void> {
83
    try {
6✔
84
      this.client = this.createClient();
6✔
85
      await this.start(callback);
6✔
86
    } catch (err) {
87
      callback(err);
1✔
88
    }
89
  }
90

91
  public async close(): Promise<void> {
92
    this.consumer && (await this.consumer.disconnect());
1✔
93
    this.producer && (await this.producer.disconnect());
1✔
94
    this.consumer = null;
1✔
95
    this.producer = null;
1✔
96
    this.client = null;
1✔
97
  }
98

99
  public async start(callback: () => void): Promise<void> {
100
    const consumerOptions = Object.assign(this.options.consumer || {}, {
5✔
101
      groupId: this.groupId,
102
    });
103
    this.consumer = this.client.consumer(consumerOptions);
5✔
104
    this.producer = this.client.producer(this.options.producer);
5✔
105

106
    await this.consumer.connect();
5✔
107
    await this.producer.connect();
5✔
108
    await this.bindEvents(this.consumer);
5✔
109
    callback();
5✔
110
  }
111

112
  public createClient<T = any>(): T {
113
    return new kafkaPackage.Kafka(
1✔
114
      Object.assign(
115
        { logCreator: KafkaLogger.bind(null, this.logger) },
116
        this.options.client,
117
        { clientId: this.clientId, brokers: this.brokers },
118
      ) as KafkaConfig,
119
    );
120
  }
121

122
  public async bindEvents(consumer: Consumer) {
123
    const registeredPatterns = [...this.messageHandlers.keys()];
7✔
124
    const consumerSubscribeOptions = this.options.subscribe || {};
7✔
125

126
    if (registeredPatterns.length > 0) {
7✔
127
      await this.consumer.subscribe({
2✔
128
        ...consumerSubscribeOptions,
129
        topics: registeredPatterns,
130
      });
131
    }
132

133
    const consumerRunOptions = Object.assign(this.options.run || {}, {
7✔
134
      eachMessage: this.getMessageHandler(),
135
    });
136
    await consumer.run(consumerRunOptions);
7✔
137
  }
138

139
  public getMessageHandler() {
140
    return async (payload: EachMessagePayload) => this.handleMessage(payload);
9✔
141
  }
142

143
  public getPublisher(
144
    replyTopic: string,
145
    replyPartition: string,
146
    correlationId: string,
147
  ): (data: any) => Promise<RecordMetadata[]> {
148
    return (data: any) =>
3✔
149
      this.sendMessage(data, replyTopic, replyPartition, correlationId);
1✔
150
  }
151

152
  public async handleMessage(payload: EachMessagePayload) {
153
    const channel = payload.topic;
6✔
154
    const rawMessage = this.parser.parse<KafkaMessage>(
6✔
155
      Object.assign(payload.message, {
156
        topic: payload.topic,
157
        partition: payload.partition,
158
      }),
159
    );
160
    const headers = rawMessage.headers as unknown as Record<string, any>;
6✔
161
    const correlationId = headers[KafkaHeaders.CORRELATION_ID];
6✔
162
    const replyTopic = headers[KafkaHeaders.REPLY_TOPIC];
6✔
163
    const replyPartition = headers[KafkaHeaders.REPLY_PARTITION];
6✔
164

165
    const packet = await this.deserializer.deserialize(rawMessage, { channel });
6✔
166
    const kafkaContext = new KafkaContext([
6✔
167
      rawMessage,
168
      payload.partition,
169
      payload.topic,
170
      this.consumer,
171
      payload.heartbeat,
172
      this.producer,
173
    ]);
174
    const handler = this.getHandlerByPattern(packet.pattern);
6✔
175
    // if the correlation id or reply topic is not set
176
    // then this is an event (events could still have correlation id)
177
    if (handler?.isEventHandler || !correlationId || !replyTopic) {
6✔
178
      return this.handleEvent(packet.pattern, packet, kafkaContext);
3✔
179
    }
180

181
    const publish = this.getPublisher(
3✔
182
      replyTopic,
183
      replyPartition,
184
      correlationId,
185
    );
186

187
    if (!handler) {
3✔
188
      return publish({
1✔
189
        id: correlationId,
190
        err: NO_MESSAGE_HANDLER,
191
      });
192
    }
193

194
    const response$ = this.transformToObservable(
2✔
195
      handler(packet.data, kafkaContext),
196
    );
197

198
    const replayStream$ = new ReplaySubject();
2✔
199
    await this.combineStreamsAndThrowIfRetriable(response$, replayStream$);
2✔
200

201
    this.send(replayStream$, publish);
2✔
202
  }
203

204
  private combineStreamsAndThrowIfRetriable(
205
    response$: Observable<any>,
206
    replayStream$: ReplaySubject<unknown>,
207
  ) {
208
    return new Promise<void>((resolve, reject) => {
2✔
209
      let isPromiseResolved = false;
2✔
210
      response$.subscribe({
2✔
211
        next: val => {
212
          replayStream$.next(val);
2✔
213
          if (!isPromiseResolved) {
2!
214
            isPromiseResolved = true;
2✔
215
            resolve();
2✔
216
          }
217
        },
218
        error: err => {
UNCOV
219
          if (err instanceof KafkaRetriableException && !isPromiseResolved) {
×
UNCOV
220
            isPromiseResolved = true;
×
UNCOV
221
            reject(err);
×
222
          } else {
223
            resolve();
×
224
          }
UNCOV
225
          replayStream$.error(err);
×
226
        },
227
        complete: () => replayStream$.complete(),
2✔
228
      });
229
    });
230
  }
231

232
  public async sendMessage(
233
    message: OutgoingResponse,
234
    replyTopic: string,
235
    replyPartition: string,
236
    correlationId: string,
237
  ): Promise<RecordMetadata[]> {
238
    const outgoingMessage = await this.serializer.serialize(message.response);
4✔
239
    this.assignReplyPartition(replyPartition, outgoingMessage);
4✔
240
    this.assignCorrelationIdHeader(correlationId, outgoingMessage);
4✔
241
    this.assignErrorHeader(message, outgoingMessage);
4✔
242
    this.assignIsDisposedHeader(message, outgoingMessage);
4✔
243

244
    const replyMessage = Object.assign(
4✔
245
      {
246
        topic: replyTopic,
247
        messages: [outgoingMessage],
248
      },
249
      this.options.send || {},
8✔
250
    );
251
    return this.producer.send(replyMessage);
4✔
252
  }
253

254
  public assignIsDisposedHeader(
255
    outgoingResponse: OutgoingResponse,
256
    outgoingMessage: Message,
257
  ) {
258
    if (!outgoingResponse.isDisposed) {
4✔
259
      return;
3✔
260
    }
261
    outgoingMessage.headers[KafkaHeaders.NEST_IS_DISPOSED] = Buffer.alloc(1);
1✔
262
  }
263

264
  public assignErrorHeader(
265
    outgoingResponse: OutgoingResponse,
266
    outgoingMessage: Message,
267
  ) {
268
    if (!outgoingResponse.err) {
4✔
269
      return;
3✔
270
    }
271
    const stringifiedError =
272
      typeof outgoingResponse.err === 'object'
1✔
273
        ? JSON.stringify(outgoingResponse.err)
1!
274
        : outgoingResponse.err;
275
    outgoingMessage.headers[KafkaHeaders.NEST_ERR] =
1✔
276
      Buffer.from(stringifiedError);
277
  }
278

279
  public assignCorrelationIdHeader(
280
    correlationId: string,
281
    outgoingMessage: Message,
282
  ) {
283
    outgoingMessage.headers[KafkaHeaders.CORRELATION_ID] =
4✔
284
      Buffer.from(correlationId);
285
  }
286

287
  public assignReplyPartition(
288
    replyPartition: string,
289
    outgoingMessage: Message,
290
  ) {
291
    if (isNil(replyPartition)) {
4✔
292
      return;
1✔
293
    }
294
    outgoingMessage.partition = parseFloat(replyPartition);
3✔
295
  }
296

297
  public async handleEvent(
298
    pattern: string,
299
    packet: ReadPacket,
300
    context: KafkaContext,
301
  ): Promise<any> {
302
    const handler = this.getHandlerByPattern(pattern);
5✔
303
    if (!handler) {
5✔
304
      return this.logger.error(NO_EVENT_HANDLER`${pattern}`);
2✔
305
    }
306
    const resultOrStream = await handler(packet.data, context);
3✔
307
    if (isObservable(resultOrStream)) {
2!
UNCOV
308
      await lastValueFrom(resultOrStream);
×
309
    }
310
  }
311

312
  protected initializeSerializer(options: KafkaOptions['options']) {
313
    this.serializer =
26✔
314
      (options && options.serializer) || new KafkaRequestSerializer();
77✔
315
  }
316

317
  protected initializeDeserializer(options: KafkaOptions['options']) {
318
    this.deserializer = options?.deserializer ?? new KafkaRequestDeserializer();
26✔
319
  }
320
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc