• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / 4985bf1d-5e80-46f6-bbaf-e681bc066260

03 Apr 2025 08:01AM UTC coverage: 89.274% (-0.03%) from 89.308%
4985bf1d-5e80-46f6-bbaf-e681bc066260

Pull #14900

circleci

kamilmysliwiec
feat(core): call hooks by components hierarchy level
Pull Request #14900: feat(core): call hooks by components hierarchy level (order)

2685 of 3377 branches covered (79.51%)

66 of 72 new or added lines in 9 files covered. (91.67%)

1 existing line in 1 file now uncovered.

7166 of 8027 relevant lines covered (89.27%)

16.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.71
/packages/microservices/client/client-grpc.ts
1
import { Logger } from '@nestjs/common/services/logger.service';
1✔
2
import { loadPackage } from '@nestjs/common/utils/load-package.util';
1✔
3
import { isFunction, isObject } from '@nestjs/common/utils/shared.utils';
1✔
4
import { Observable, Subscription } from 'rxjs';
1✔
5
import { GRPC_DEFAULT_PROTO_LOADER, GRPC_DEFAULT_URL } from '../constants';
1✔
6
import { InvalidGrpcPackageException } from '../errors/invalid-grpc-package.exception';
1✔
7
import { InvalidGrpcServiceException } from '../errors/invalid-grpc-service.exception';
1✔
8
import { InvalidProtoDefinitionException } from '../errors/invalid-proto-definition.exception';
1✔
9
import { ChannelOptions } from '../external/grpc-options.interface';
10
import { getGrpcPackageDefinition } from '../helpers';
1✔
11
import { ClientGrpc, GrpcOptions } from '../interfaces';
12
import { ClientProxy } from './client-proxy';
1✔
13

14
const GRPC_CANCELLED = 'Cancelled';
1✔
15

16
// To enable type safety for gRPC. This cant be uncommented by default
17
// because it would require the user to install the @grpc/grpc-js package even if they dont use gRPC
18
// Otherwise, TypeScript would fail to compile the code.
19
//
20
// type GrpcClient = import('@grpc/grpc-js').Client;
21
// let grpcPackage = {} as typeof import('@grpc/grpc-js');
22
// let grpcProtoLoaderPackage = {} as typeof import('@grpc/proto-loader');
23

24
type GrpcClient = any;
25
let grpcPackage = {} as any;
1✔
26
let grpcProtoLoaderPackage = {} as any;
1✔
27

28
/**
29
 * @publicApi
30
 */
31
export class ClientGrpcProxy
1✔
32
  extends ClientProxy<never, never>
33
  implements ClientGrpc
34
{
35
  protected readonly logger = new Logger(ClientProxy.name);
49✔
36
  protected readonly clients = new Map<string, any>();
49✔
37
  protected readonly url: string;
38
  protected grpcClients: GrpcClient[] = [];
49✔
39

40
  get status(): never {
41
    throw new Error(
×
42
      'The "status" attribute is not supported by the gRPC transport',
43
    );
44
  }
45

46
  constructor(protected readonly options: Required<GrpcOptions>['options']) {
49✔
47
    super();
49✔
48
    this.url = this.getOptionsProp(options, 'url') || GRPC_DEFAULT_URL;
49✔
49

50
    const protoLoader =
51
      this.getOptionsProp(options, 'protoLoader') || GRPC_DEFAULT_PROTO_LOADER;
49✔
52

53
    grpcPackage = loadPackage('@grpc/grpc-js', ClientGrpcProxy.name, () =>
49✔
54
      require('@grpc/grpc-js'),
49✔
55
    );
56

57
    grpcProtoLoaderPackage = loadPackage(
49✔
58
      protoLoader,
59
      ClientGrpcProxy.name,
60
      () =>
61
        protoLoader === GRPC_DEFAULT_PROTO_LOADER
49✔
62
          ? require('@grpc/proto-loader')
49!
63
          : require(protoLoader),
64
    );
65
    this.grpcClients = this.createClients();
49✔
66
  }
67

68
  public getService<T extends object>(name: string): T {
69
    const grpcClient = this.createClientByServiceName(name);
6✔
70
    const clientRef = this.getClient(name);
3✔
71
    if (!clientRef) {
3!
72
      throw new InvalidGrpcServiceException(name);
×
73
    }
74

75
    const protoMethods = Object.keys(clientRef[name].prototype);
3✔
76
    const grpcService = {} as T;
3✔
77

78
    protoMethods.forEach(m => {
3✔
79
      grpcService[m] = this.createServiceMethod(grpcClient, m);
×
80
    });
81
    return grpcService;
3✔
82
  }
83

84
  public getClientByServiceName<T = unknown>(name: string): T {
UNCOV
85
    return this.clients.get(name) || this.createClientByServiceName(name);
×
86
  }
87

88
  public createClientByServiceName(name: string) {
89
    const clientRef = this.getClient(name);
6✔
90
    if (!clientRef) {
6✔
91
      throw new InvalidGrpcServiceException(name);
3✔
92
    }
93

94
    const channelOptions: ChannelOptions =
95
      this.options && this.options.channelOptions
3✔
96
        ? this.options.channelOptions
3!
97
        : {};
98
    if (this.options && this.options.maxSendMessageLength) {
3!
99
      channelOptions['grpc.max_send_message_length'] =
×
100
        this.options.maxSendMessageLength;
101
    }
102
    if (this.options && this.options.maxReceiveMessageLength) {
3!
103
      channelOptions['grpc.max_receive_message_length'] =
×
104
        this.options.maxReceiveMessageLength;
105
    }
106
    if (this.options && this.options.maxMetadataSize) {
3!
107
      channelOptions['grpc.max_metadata_size'] = this.options.maxMetadataSize;
×
108
    }
109

110
    const keepaliveOptions = this.getKeepaliveOptions();
3✔
111
    const options: Record<string, string | number> = {
3✔
112
      ...channelOptions,
113
      ...keepaliveOptions,
114
    };
115

116
    const credentials =
117
      this.options.credentials || grpcPackage.credentials.createInsecure();
3✔
118

119
    const grpcClient = new clientRef[name](this.url, credentials, options);
3✔
120
    this.clients.set(name, grpcClient);
3✔
121
    return grpcClient;
3✔
122
  }
123

124
  public getKeepaliveOptions() {
125
    if (!isObject(this.options.keepalive)) {
3!
126
      return {};
3✔
127
    }
128
    const keepaliveKeys: Record<
129
      keyof GrpcOptions['options']['keepalive'],
130
      string
131
    > = {
×
132
      keepaliveTimeMs: 'grpc.keepalive_time_ms',
133
      keepaliveTimeoutMs: 'grpc.keepalive_timeout_ms',
134
      keepalivePermitWithoutCalls: 'grpc.keepalive_permit_without_calls',
135
      http2MaxPingsWithoutData: 'grpc.http2.max_pings_without_data',
136
      http2MinTimeBetweenPingsMs: 'grpc.http2.min_time_between_pings_ms',
137
      http2MinPingIntervalWithoutDataMs:
138
        'grpc.http2.min_ping_interval_without_data_ms',
139
      http2MaxPingStrikes: 'grpc.http2.max_ping_strikes',
140
    };
141

142
    const keepaliveOptions = {};
×
143
    for (const [optionKey, optionValue] of Object.entries(
×
144
      this.options.keepalive,
145
    )) {
146
      const key = keepaliveKeys[optionKey];
×
147
      if (key === undefined) {
×
148
        continue;
×
149
      }
150
      keepaliveOptions[key] = optionValue;
×
151
    }
152
    return keepaliveOptions;
×
153
  }
154

155
  public createServiceMethod(
156
    client: any,
157
    methodName: string,
158
  ): (...args: unknown[]) => Observable<unknown> {
159
    return client[methodName].responseStream
2✔
160
      ? this.createStreamServiceMethod(client, methodName)
2✔
161
      : this.createUnaryServiceMethod(client, methodName);
162
  }
163

164
  public createStreamServiceMethod(
165
    client: unknown,
166
    methodName: string,
167
  ): (...args: any[]) => Observable<any> {
168
    return (...args: any[]) => {
6✔
169
      const isRequestStream = client![methodName].requestStream;
5✔
170
      const stream = new Observable(observer => {
5✔
171
        let isClientCanceled = false;
4✔
172
        let upstreamSubscription: Subscription | null = null;
4✔
173

174
        const upstreamSubjectOrData = args[0];
4✔
175
        const maybeMetadata = args[1];
4✔
176

177
        const isUpstreamSubject =
178
          upstreamSubjectOrData && isFunction(upstreamSubjectOrData.subscribe);
4✔
179

180
        const call =
181
          isRequestStream && isUpstreamSubject
4✔
182
            ? client![methodName](maybeMetadata)
4✔
183
            : client![methodName](...args);
184

185
        if (isRequestStream && isUpstreamSubject) {
4✔
186
          upstreamSubscription = upstreamSubjectOrData.subscribe(
1✔
187
            (val: unknown) => call.write(val),
1✔
188
            (err: unknown) => call.emit('error', err),
×
189
            () => call.end(),
×
190
          );
191
        }
192
        call.on('data', (data: any) => observer.next(data));
8✔
193
        call.on('error', (error: any) => {
4✔
194
          if (error.details === GRPC_CANCELLED) {
4✔
195
            call.destroy();
1✔
196
            if (isClientCanceled) {
1!
197
              return;
1✔
198
            }
199
          }
200
          observer.error(this.serializeError(error));
1✔
201
        });
202
        call.on('end', () => {
2✔
203
          if (upstreamSubscription) {
1!
204
            upstreamSubscription.unsubscribe();
×
205
            upstreamSubscription = null;
×
206
          }
207
          call.removeAllListeners();
1✔
208
          observer.complete();
1✔
209
        });
210
        return () => {
2✔
211
          if (upstreamSubscription) {
2!
212
            upstreamSubscription.unsubscribe();
×
213
            upstreamSubscription = null;
×
214
          }
215

216
          if (call.finished) {
2✔
217
            return undefined;
1✔
218
          }
219
          isClientCanceled = true;
1✔
220
          call.cancel();
1✔
221
        };
222
      });
223
      return stream;
5✔
224
    };
225
  }
226

227
  public createUnaryServiceMethod(
228
    client: any,
229
    methodName: string,
230
  ): (...args: any[]) => Observable<any> {
231
    return (...args: any[]) => {
6✔
232
      const isRequestStream = client[methodName].requestStream;
5✔
233
      const upstreamSubjectOrData = args[0];
5✔
234
      const isUpstreamSubject =
235
        upstreamSubjectOrData && isFunction(upstreamSubjectOrData.subscribe);
5✔
236

237
      if (isRequestStream && isUpstreamSubject) {
5✔
238
        return new Observable(observer => {
2✔
239
          let isClientCanceled = false;
3✔
240
          const callArgs = [
3✔
241
            (error: any, data: unknown) => {
242
              if (error) {
2!
243
                if (error.details === GRPC_CANCELLED || error.code === 1) {
×
244
                  call.destroy();
×
245
                  if (isClientCanceled) {
×
246
                    return;
×
247
                  }
248
                }
249
                return observer.error(this.serializeError(error));
×
250
              }
251
              observer.next(data);
2✔
252
              observer.complete();
2✔
253
            },
254
          ];
255
          const maybeMetadata = args[1];
3✔
256
          if (maybeMetadata) {
3!
257
            callArgs.unshift(maybeMetadata);
×
258
          }
259
          const call = client[methodName](...callArgs);
3✔
260

261
          const upstreamSubscription: Subscription =
262
            upstreamSubjectOrData.subscribe(
3✔
263
              (val: unknown) => call.write(val),
2✔
264
              (err: unknown) => call.emit('error', err),
×
265
              () => call.end(),
×
266
            );
267

268
          return () => {
3✔
269
            upstreamSubscription.unsubscribe();
2✔
270
            if (!call.finished) {
2!
271
              isClientCanceled = true;
2✔
272
              call.cancel();
2✔
273
            }
274
          };
275
        });
276
      }
277
      return new Observable(observer => {
3✔
278
        const call = client[methodName](...args, (error: any, data: any) => {
2✔
279
          if (error) {
2!
280
            return observer.error(this.serializeError(error));
×
281
          }
282
          observer.next(data);
2✔
283
          observer.complete();
2✔
284
        });
285

286
        return () => {
2✔
287
          if (!call.finished) {
2✔
288
            call.cancel();
1✔
289
          }
290
        };
291
      });
292
    };
293
  }
294

295
  public createClients(): any[] {
296
    const grpcContext = this.loadProto();
50✔
297
    const packageOption = this.getOptionsProp(this.options, 'package');
50✔
298
    const grpcPackages: any[] = [];
50✔
299
    const packageNames = Array.isArray(packageOption)
50✔
300
      ? packageOption
50✔
301
      : [packageOption];
302

303
    for (const packageName of packageNames) {
50✔
304
      const grpcPkg = this.lookupPackage(grpcContext, packageName);
74✔
305

306
      if (!grpcPkg) {
74✔
307
        const invalidPackageError = new InvalidGrpcPackageException(
1✔
308
          packageName,
309
        );
310
        this.logger.error(
1✔
311
          invalidPackageError.message,
312
          invalidPackageError.stack,
313
        );
314
        throw invalidPackageError;
1✔
315
      }
316
      grpcPackages.push(grpcPkg);
73✔
317
    }
318
    return grpcPackages;
49✔
319
  }
320

321
  public loadProto(): any {
322
    try {
51✔
323
      const packageDefinition = getGrpcPackageDefinition(
51✔
324
        this.options,
325
        grpcProtoLoaderPackage,
326
      );
327
      return grpcPackage.loadPackageDefinition(packageDefinition);
50✔
328
    } catch (err) {
329
      const invalidProtoError = new InvalidProtoDefinitionException(err.path);
1✔
330
      const message =
331
        err && err.message ? err.message : invalidProtoError.message;
1!
332

333
      this.logger.error(message, invalidProtoError.stack);
1✔
334
      throw invalidProtoError;
1✔
335
    }
336
  }
337

338
  public lookupPackage(root: any, packageName: string) {
339
    /** Reference: https://github.com/kondi/rxjs-grpc */
340
    let pkg = root;
75✔
341

342
    if (packageName) {
75✔
343
      for (const name of packageName.split('.')) {
73✔
344
        pkg = pkg[name];
73✔
345
      }
346
    }
347

348
    return pkg;
75✔
349
  }
350

351
  public close() {
352
    this.clients.forEach(client => {
1✔
353
      if (client && isFunction(client.close)) {
1!
354
        client.close();
1✔
355
      }
356
    });
357
    this.clients.clear();
1✔
358
    this.grpcClients = [];
1✔
359
  }
360

361
  public async connect(): Promise<any> {
362
    throw new Error('The "connect()" method is not supported in gRPC mode.');
1✔
363
  }
364

365
  public send<TResult = any, TInput = any>(
366
    pattern: any,
367
    data: TInput,
368
  ): Observable<TResult> {
369
    throw new Error(
1✔
370
      'Method is not supported in gRPC mode. Use ClientGrpc instead (learn more in the documentation).',
371
    );
372
  }
373

374
  protected getClient(name: string): any {
375
    return this.grpcClients.find(client =>
9✔
376
      Object.hasOwnProperty.call(client, name),
11✔
377
    );
378
  }
379

380
  protected publish(packet: any, callback: (packet: any) => any): any {
381
    throw new Error(
1✔
382
      'Method is not supported in gRPC mode. Use ClientGrpc instead (learn more in the documentation).',
383
    );
384
  }
385

386
  protected async dispatchEvent(packet: any): Promise<any> {
387
    throw new Error(
1✔
388
      'Method is not supported in gRPC mode. Use ClientGrpc instead (learn more in the documentation).',
389
    );
390
  }
391

392
  public on<EventKey extends never = never, EventCallback = any>(
393
    event: EventKey,
394
    callback: EventCallback,
395
  ) {
396
    throw new Error('Method is not supported in gRPC mode.');
×
397
  }
398

399
  public unwrap<T>(): T {
400
    throw new Error('Method is not supported in gRPC mode.');
×
401
  }
402
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc