• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / f7142b24-e394-4028-89c6-717e281431d1

25 Nov 2024 01:12PM UTC coverage: 90.141% (-1.8%) from 91.989%
f7142b24-e394-4028-89c6-717e281431d1

Pull #14177

circleci

web-flow
Merge pull request #14200 from nestjs/feat/allow-queue-per-handler

feat(microservices): support nats queue per handler
Pull Request #14177: release: version 11.0.0

2612 of 3236 branches covered (80.72%)

496 of 704 new or added lines in 48 files covered. (70.45%)

79 existing lines in 11 files now uncovered.

6985 of 7749 relevant lines covered (90.14%)

16.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.42
/packages/microservices/server/server-grpc.ts
1
import {
1✔
2
  isObject,
3
  isString,
4
  isUndefined,
5
} from '@nestjs/common/utils/shared.utils';
6
import {
1✔
7
  EMPTY,
8
  Observable,
9
  Subject,
10
  Subscription,
11
  defaultIfEmpty,
12
  fromEvent,
13
  lastValueFrom,
14
} from 'rxjs';
15
import { catchError, takeUntil } from 'rxjs/operators';
1✔
16
import { GRPC_DEFAULT_PROTO_LOADER, GRPC_DEFAULT_URL } from '../constants';
1✔
17
import { GrpcMethodStreamingType } from '../decorators';
1✔
18
import { Transport } from '../enums';
1✔
19
import { InvalidGrpcPackageException } from '../errors/invalid-grpc-package.exception';
1✔
20
import { InvalidProtoDefinitionException } from '../errors/invalid-proto-definition.exception';
1✔
21
import { ChannelOptions } from '../external/grpc-options.interface';
22
import { getGrpcPackageDefinition } from '../helpers';
1✔
23
import { MessageHandler } from '../interfaces';
24
import { GrpcOptions } from '../interfaces/microservice-configuration.interface';
25
import { Server } from './server';
1✔
26

27
// Name of the event listened for on a gRPC call to detect that the call was cancelled.
const CANCELLED_EVENT = 'cancelled';
1✔
28

29
// To enable type safety for gRPC. This can't be uncommented by default
30
// because it would require the user to install the @grpc/grpc-js package even if they don't use gRPC.
31
// Otherwise, TypeScript would fail to compile the code.
32
//
33
// type GrpcServer = import('@grpc/grpc-js').Server;
34
// let grpcPackage = {} as typeof import('@grpc/grpc-js');
35
// let grpcProtoLoaderPackage = {} as typeof import('@grpc/proto-loader');
36

37
// Loosely typed stand-ins for the optional gRPC packages; the package holders
// start out as empty objects and are reassigned once the packages are loaded.
type GrpcServer = any;
let grpcPackage: any = {};
let grpcProtoLoaderPackage: any = {};
1✔
40

41
/**
 * Minimal structural view of a gRPC server call as it is used in this file.
 *
 * Members are typed as loose call signatures instead of the unsafe `Function`
 * type, so they stay callable with any arguments while no longer accepting
 * arbitrary "function-like" values.
 */
interface GrpcCall<TRequest = any, TMetadata = any> {
  /** Request payload handed to the message handler. */
  request: TRequest;
  /** Call metadata handed to the message handler. */
  metadata: TMetadata;
  /** Not invoked in this file; presumably sends response metadata — TODO confirm. */
  sendMetadata: (...args: any[]) => any;
  /** Ends the call's writable side. */
  end: (...args: any[]) => any;
  /** Writes a value; the result is used as a "can keep writing" flag. */
  write: (...args: any[]) => any;
  /** Registers an event listener (e.g. 'cancelled', 'drain', 'data', 'error', 'end'). */
  on: (...args: any[]) => any;
  /** Removes a previously registered event listener. */
  off: (...args: any[]) => any;
  /** Emits an event on the call (used here to surface 'error'). */
  emit: (...args: any[]) => any;
}
51

52
/**
 * Microservice server for the gRPC transport.
 *
 * Loads the configured proto definitions, maps every gRPC service method to a
 * registered message handler and registers the resulting services on a
 * `@grpc/grpc-js` server instance.
 *
 * @publicApi
 */
export class ServerGrpc extends Server<never, never> {
  public readonly transportId = Transport.GRPC;
  protected readonly url: string;
  protected grpcClient: GrpcServer;

  /**
   * Not supported by this transport.
   *
   * @throws Error always
   */
  get status(): never {
    throw new Error(
      'The "status" attribute is not supported by the gRPC transport',
    );
  }

  /**
   * Resolves the bind URL and loads the gRPC and proto-loader packages.
   *
   * @param options gRPC transport options
   */
  constructor(private readonly options: GrpcOptions['options']) {
    super();
    this.url = this.getOptionsProp(options, 'url') || GRPC_DEFAULT_URL;

    const protoLoader =
      this.getOptionsProp(options, 'protoLoader') || GRPC_DEFAULT_PROTO_LOADER;

    grpcPackage = this.loadPackage('@grpc/grpc-js', ServerGrpc.name, () =>
      require('@grpc/grpc-js'),
    );
    grpcProtoLoaderPackage = this.loadPackage(
      protoLoader,
      ServerGrpc.name,
      () =>
        protoLoader === GRPC_DEFAULT_PROTO_LOADER
          ? require('@grpc/proto-loader')
          : require(protoLoader),
    );
  }

  /**
   * Creates the underlying gRPC server and starts it.
   *
   * @param callback invoked without arguments on success, or with the error
   * when creating or starting the server fails
   */
  public async listen(
    callback: (err?: unknown, ...optionalParams: unknown[]) => void,
  ) {
    try {
      this.grpcClient = await this.createClient();
      await this.start(callback);
    } catch (err) {
      callback(err);
    }
  }

  /**
   * Binds all services and then notifies the caller.
   *
   * @param callback optional function invoked once the services are bound
   */
  public async start(callback?: () => void) {
    await this.bindEvents();
    // The parameter is optional, so only invoke it when it was provided.
    callback?.();
  }

  /**
   * Loads the proto definitions and registers the services of every
   * configured package (the `package` option may be a single name or an array).
   */
  public async bindEvents() {
    const grpcContext = this.loadProto();
    const packageOption = this.getOptionsProp(this.options, 'package');
    const packageNames = Array.isArray(packageOption)
      ? packageOption
      : [packageOption];

    for (const packageName of packageNames) {
      const grpcPkg = this.lookupPackage(grpcContext, packageName);
      await this.createServices(grpcPkg, packageName);
    }
  }

  /**
   * Will return all of the services along with their fully namespaced
   * names as an array of objects.
   * This method initiates recursive scan of grpcPkg object
   *
   * @param grpcPkg loaded package (namespace) object to scan
   * @returns collected `{ name, service }` entries
   */
  public getServiceNames(grpcPkg: any): { name: string; service: any }[] {
    // Define accumulator to collect all of the services available to load
    const services: { name: string; service: any }[] = [];
    // Initiate recursive services collector starting with empty name
    this.collectDeepServices('', grpcPkg, services);
    return services;
  }

  /**
   * Will create service mapping from gRPC generated Object to handlers
   * defined with @GrpcMethod or @GrpcStreamMethod annotations
   *
   * @param grpcService generated service whose prototype lists the rpc methods
   * @param name namespaced service name used to look up handlers
   * @returns object mapping method names to gRPC-compatible handler functions;
   * methods without a registered handler are omitted
   */
  public async createService(grpcService: any, name: string) {
    const service = {};

    for (const methodName in grpcService.prototype) {
      let methodHandler = null;
      let streamingType = GrpcMethodStreamingType.NO_STREAMING;

      const methodFunction = grpcService.prototype[methodName];
      const methodReqStreaming = methodFunction.requestStream;

      if (!isUndefined(methodReqStreaming) && methodReqStreaming) {
        // Try first pattern to be presented, RX streaming pattern would be
        // a preferable pattern to select among a few defined
        methodHandler = this.getMessageHandler(
          name,
          methodName,
          GrpcMethodStreamingType.RX_STREAMING,
          methodFunction,
        );
        streamingType = GrpcMethodStreamingType.RX_STREAMING;
        // If first pattern didn't match to any of handlers then try
        // pass-through handler to be presented
        if (!methodHandler) {
          methodHandler = this.getMessageHandler(
            name,
            methodName,
            GrpcMethodStreamingType.PT_STREAMING,
            methodFunction,
          );
          streamingType = GrpcMethodStreamingType.PT_STREAMING;
        }
      } else {
        // Select handler if any presented for No-Streaming pattern
        methodHandler = this.getMessageHandler(
          name,
          methodName,
          GrpcMethodStreamingType.NO_STREAMING,
          methodFunction,
        );
        streamingType = GrpcMethodStreamingType.NO_STREAMING;
      }
      if (!methodHandler) {
        continue;
      }
      service[methodName] = await this.createServiceMethod(
        methodHandler,
        grpcService.prototype[methodName],
        streamingType,
      );
    }
    return service;
  }

  /**
   * Looks up the handler registered for a service method.
   *
   * The pattern built from `serviceName` is tried first; when nothing is
   * registered for it, the lookup is repeated with the service name taken
   * from the second `/`-separated segment of `grpcMethod.path`.
   *
   * @param serviceName namespaced service name
   * @param methodName rpc method name
   * @param streaming streaming type the handler was registered with
   * @param grpcMethod method definition, optionally carrying its `path`
   * @returns the matching handler, or `undefined` when none is registered
   */
  public getMessageHandler(
    serviceName: string,
    methodName: string,
    streaming: GrpcMethodStreamingType,
    grpcMethod: { path?: string },
  ) {
    let pattern = this.createPattern(serviceName, methodName, streaming);
    let methodHandler = this.messageHandlers.get(pattern);
    if (!methodHandler) {
      const packageServiceName = grpcMethod.path?.split?.('/')[1];
      pattern = this.createPattern(packageServiceName, methodName, streaming);
      methodHandler = this.messageHandlers.get(pattern);
    }
    return methodHandler;
  }

  /**
   * Will create a string of a JSON serialized format
   *
   * @param service name of the service which should be a match to gRPC service definition name
   * @param methodName name of the method which is coming after rpc keyword
   * @param streaming GrpcMethodStreamingType parameter which should correspond to
   * stream keyword in gRPC service request part
   * @returns JSON string of `{ service, rpc, streaming }`
   */
  public createPattern(
    service: string,
    methodName: string,
    streaming: GrpcMethodStreamingType,
  ): string {
    return JSON.stringify({
      service,
      rpc: methodName,
      streaming,
    });
  }

  /**
   * Will return async function which will handle gRPC call
   * with Rx streams or as a direct call passthrough
   *
   * @param methodHandler registered message handler
   * @param protoNativeHandler method definition exposing `requestStream` / `responseStream`
   * @param streamType streaming type the handler was resolved with
   * @returns function to register as the gRPC method implementation
   */
  public createServiceMethod(
    methodHandler: Function,
    protoNativeHandler: any,
    streamType: GrpcMethodStreamingType,
  ): Function {
    // If proto handler has request stream as "true" then we expect it to have
    // streaming from the side of requester
    if (protoNativeHandler.requestStream) {
      // If any handlers were defined with GrpcStreamMethod annotation use RX
      if (streamType === GrpcMethodStreamingType.RX_STREAMING) {
        return this.createRequestStreamMethod(
          methodHandler,
          protoNativeHandler.responseStream,
        );
      }
      // If any handlers were defined with GrpcStreamCall annotation
      else if (streamType === GrpcMethodStreamingType.PT_STREAMING) {
        return this.createStreamCallMethod(
          methodHandler,
          protoNativeHandler.responseStream,
        );
      }
    }
    return protoNativeHandler.responseStream
      ? this.createStreamServiceMethod(methodHandler)
      : this.createUnaryServiceMethod(methodHandler);
  }

  /**
   * Wraps a handler for a method with neither request nor response streaming.
   *
   * Every value emitted by the handler's result is passed to the gRPC
   * callback; an error is forwarded as the callback's first argument.
   *
   * @param methodHandler registered message handler
   */
  public createUnaryServiceMethod(methodHandler: Function): Function {
    return async (call: GrpcCall, callback: Function) => {
      const handler = methodHandler(call.request, call.metadata, call);
      this.transformToObservable(await handler).subscribe({
        next: async data => callback(null, await data),
        error: (err: any) => callback(err),
      });
    };
  }

  /**
   * Wraps a handler for a method with a streamed response: the handler's
   * result is converted to an observable and written to the call.
   *
   * @param methodHandler registered message handler
   */
  public createStreamServiceMethod(methodHandler: Function): Function {
    return async (call: GrpcCall, callback: Function) => {
      const handler = methodHandler(call.request, call.metadata, call);
      const result$ = this.transformToObservable(await handler);
      await this.writeObservableToGrpc(result$, call);
    };
  }

  /**
   * Not supported by this transport.
   *
   * @throws Error always
   */
  public unwrap<T>(): T {
    throw new Error('Method is not supported for gRPC transport');
  }

  /**
   * Not supported by this transport.
   *
   * @throws Error always
   */
  public on<
    EventKey extends string | number | symbol = string | number | symbol,
    EventCallback = any,
  >(event: EventKey, callback: EventCallback) {
    throw new Error('Method is not supported in gRPC mode.');
  }

  /**
   * Writes an observable to a GRPC call.
   *
   * This function will ensure that backpressure is managed while writing values
   * that come from an observable to a GRPC call.
   *
   * @param source The observable we want to write out to the GRPC call.
   * @param call The GRPC call we want to write to.
   * @returns A promise that resolves when we're done writing to the call.
   */
  private writeObservableToGrpc<T>(
    source: Observable<T>,
    call: GrpcCall<T>,
  ): Promise<void> {
    // this promise should **not** reject, as we're handling errors in the observable for the Call
    // the promise is only needed to signal when writing/draining has been completed
    return new Promise((resolve, _doNotUse) => {
      const valuesWaitingToBeDrained: T[] = [];
      let shouldErrorAfterDraining = false;
      let error: any;
      let shouldResolveAfterDraining = false;
      let writing = true;

      // Used to manage finalization
      const subscription = new Subscription();

      // If the call is cancelled, unsubscribe from the source
      const cancelHandler = () => {
        subscription.unsubscribe();
        // Calls that are cancelled by the client should be successfully resolved here
        resolve();
      };
      call.on(CANCELLED_EVENT, cancelHandler);
      subscription.add(() => call.off(CANCELLED_EVENT, cancelHandler));

      // In all cases, when we finalize, end the writable stream
      // being careful that errors and writes must be emitted _before_ this call is ended
      subscription.add(() => call.end());

      const drain = () => {
        writing = true;
        while (valuesWaitingToBeDrained.length > 0) {
          const value = valuesWaitingToBeDrained.shift()!;
          if (writing) {
            // The first time `call.write` returns false, we need to stop.
            // It wrote the value, but it won't write anything else.
            writing = call.write(value);
            if (!writing) {
              // We can't write anymore so we need to wait for the drain event
              return;
            }
          }
        }

        if (shouldResolveAfterDraining) {
          subscription.unsubscribe();
          resolve();
        } else if (shouldErrorAfterDraining) {
          call.emit('error', error);
          subscription.unsubscribe();
          resolve();
        }
      };

      call.on('drain', drain);
      subscription.add(() => call.off('drain', drain));

      subscription.add(
        source.subscribe({
          next(value) {
            if (writing) {
              writing = call.write(value);
            } else {
              // If we can't write, that's because we need to
              // wait for the drain event before we can write again
              // buffer the value and wait for the drain event
              valuesWaitingToBeDrained.push(value);
            }
          },
          error(err) {
            if (valuesWaitingToBeDrained.length === 0) {
              // We're not waiting for a drain event, so we can just
              // reject and teardown.
              call.emit('error', err);
              subscription.unsubscribe();
              resolve();
            } else {
              // We're waiting for a drain event, record the
              // error so it can be handled after everything is drained.
              shouldErrorAfterDraining = true;
              error = err;
            }
          },
          complete() {
            if (valuesWaitingToBeDrained.length === 0) {
              // We're not waiting for a drain event, so we can just
              // resolve and teardown.
              subscription.unsubscribe();
              resolve();
            } else {
              shouldResolveAfterDraining = true;
            }
          },
        }),
      );
    });
  }

  /**
   * Wraps a handler for a request-streaming method using Rx: incoming
   * messages are pushed into a subject whose observable is handed to the
   * handler.
   *
   * @param methodHandler registered message handler
   * @param isResponseStream whether the response is streamed as well; when
   * false, the last emitted value is passed to the gRPC callback
   */
  public createRequestStreamMethod(
    methodHandler: Function,
    isResponseStream: boolean,
  ) {
    return async (
      call: GrpcCall,
      callback: (err: unknown, value: unknown) => void,
    ) => {
      const req = new Subject<any>();
      call.on('data', (m: any) => req.next(m));
      call.on('error', (e: any) => {
        // Check if error means that stream ended on other end.
        // `includes` is required here: `indexOf` yields -1 (truthy) when the
        // text is absent, so it cannot be used directly as a boolean.
        const isCancelledError = String(e).toLowerCase().includes('cancelled');

        if (isCancelledError) {
          call.end();
          return;
        }
        // If another error then just pass it along
        req.error(e);
      });
      call.on('end', () => req.complete());

      const handler = methodHandler(req.asObservable(), call.metadata, call);
      const res = this.transformToObservable(await handler);
      if (isResponseStream) {
        await this.writeObservableToGrpc(res, call);
      } else {
        const response = await lastValueFrom(
          res.pipe(
            takeUntil(fromEvent(call as any, CANCELLED_EVENT)),
            catchError(err => {
              callback(err, null);
              return EMPTY;
            }),
            defaultIfEmpty(undefined),
          ),
        );

        if (!isUndefined(response)) {
          callback(null, response);
        }
      }
    };
  }

  /**
   * Wraps a pass-through handler for a request-streaming method: the raw call
   * is handed to the handler, together with the gRPC callback when the
   * response is not streamed.
   *
   * @param methodHandler registered message handler
   * @param isResponseStream whether the response is streamed
   */
  public createStreamCallMethod(
    methodHandler: Function,
    isResponseStream: boolean,
  ) {
    return async (
      call: GrpcCall,
      callback: (err: unknown, value: unknown) => void,
    ) => {
      if (isResponseStream) {
        methodHandler(call);
      } else {
        methodHandler(call, callback);
      }
    };
  }

  /**
   * Shuts the server down (if one was created) and clears the reference.
   *
   * With the `gracefulShutdown` option set, `tryShutdown` is used and its
   * error, if any, rejects the returned promise; otherwise `forceShutdown`
   * is called.
   */
  public async close(): Promise<void> {
    if (this.grpcClient) {
      const graceful = this.getOptionsProp(this.options, 'gracefulShutdown');
      if (graceful) {
        await new Promise<void>((resolve, reject) => {
          this.grpcClient.tryShutdown((error: Error) => {
            if (error) reject(error);
            else resolve();
          });
        });
      } else {
        this.grpcClient.forceShutdown();
      }
    }
    this.grpcClient = null;
  }

  /**
   * Parses the given value as JSON.
   *
   * @param obj value to parse
   * @returns the parsed value, or `obj` unchanged when parsing fails
   */
  public deserialize(obj: any): any {
    try {
      return JSON.parse(obj);
    } catch (e) {
      return obj;
    }
  }

  /**
   * Registers a message handler under the given pattern.
   *
   * @param pattern route key; non-string patterns are JSON-serialized
   * @param callback handler to register
   * @param isEventHandler flag stored on the handler
   */
  public addHandler(
    pattern: unknown,
    callback: MessageHandler,
    isEventHandler = false,
  ) {
    const route = isString(pattern) ? pattern : JSON.stringify(pattern);
    callback.isEventHandler = isEventHandler;
    this.messageHandlers.set(route, callback);
  }

  /**
   * Creates a gRPC server instance and binds it to the configured URL.
   *
   * @returns the bound server
   * @throws the error reported by `bindAsync`, if binding fails
   */
  public async createClient() {
    // Work on a shallow copy so the caller-supplied channel options object
    // is not mutated by the overrides below.
    const channelOptions: ChannelOptions = {
      ...(this.options && this.options.channelOptions
        ? this.options.channelOptions
        : {}),
    };
    if (this.options && this.options.maxSendMessageLength) {
      channelOptions['grpc.max_send_message_length'] =
        this.options.maxSendMessageLength;
    }
    if (this.options && this.options.maxReceiveMessageLength) {
      channelOptions['grpc.max_receive_message_length'] =
        this.options.maxReceiveMessageLength;
    }
    if (this.options && this.options.maxMetadataSize) {
      channelOptions['grpc.max_metadata_size'] = this.options.maxMetadataSize;
    }
    const server = new grpcPackage.Server(channelOptions);
    const credentials = this.getOptionsProp(this.options, 'credentials');

    await new Promise((resolve, reject) => {
      server.bindAsync(
        this.url,
        credentials || grpcPackage.ServerCredentials.createInsecure(),
        (error: Error | null, port: number) =>
          error ? reject(error) : resolve(port),
      );
    });

    return server;
  }

  /**
   * Resolves a dot-separated package name inside the loaded proto root.
   *
   * @param root loaded proto definition object
   * @param packageName dot-separated package name
   * @returns the package object, or `undefined` when any segment is missing
   */
  public lookupPackage(root: any, packageName: string) {
    /** Reference: https://github.com/kondi/rxjs-grpc */
    let pkg = root;
    for (const name of packageName.split(/\./)) {
      // Optional access keeps a missing intermediate namespace from throwing
      // a TypeError; the caller reports the unresolved package instead.
      pkg = pkg?.[name];
    }
    return pkg;
  }

  /**
   * Loads the package definition described by the options and turns it into
   * a gRPC object; the `onLoadPackageDefinition` hook, when configured, is
   * called with the definition and the current server instance first.
   *
   * @throws InvalidProtoDefinitionException when loading fails (the original
   * message is logged)
   */
  public loadProto(): any {
    try {
      const packageDefinition = getGrpcPackageDefinition(
        this.options,
        grpcProtoLoaderPackage,
      );

      if (this.options.onLoadPackageDefinition) {
        this.options.onLoadPackageDefinition(
          packageDefinition,
          this.grpcClient,
        );
      }

      return grpcPackage.loadPackageDefinition(packageDefinition);
    } catch (err) {
      // `err` may be any thrown value (including null/undefined), so read
      // `path` defensively, consistent with the guard on `message` below.
      const invalidProtoError = new InvalidProtoDefinitionException(err?.path);
      const message =
        err && err.message ? err.message : invalidProtoError.message;

      this.logger.error(message, invalidProtoError.stack);
      throw invalidProtoError;
    }
  }

  /**
   * Recursively fetch all of the service methods available on loaded
   * protobuf descriptor object, and collect those as an objects with
   * dot-syntax full-path names.
   *
   * Example:
   *  for proto package Bundle.FirstService with service Events { rpc...
   *  will be resolved to object of (while loaded for Bundle package):
   *    {
   *      name: "FirstService.Events",
   *      service: {Object}
   *    }
   *
   * @param name dot-separated name accumulated so far ('' at the root)
   * @param grpcDefinition current node of the loaded definition
   * @param accumulator array receiving the discovered services
   */
  private collectDeepServices(
    name: string,
    grpcDefinition: any,
    accumulator: { name: string; service: any }[],
  ) {
    if (!isObject(grpcDefinition)) {
      return;
    }
    const keysToTraverse = Object.keys(grpcDefinition);
    // Traverse definitions or namespace extensions
    for (const key of keysToTraverse) {
      const nameExtended = this.parseDeepServiceName(name, key);
      const deepDefinition = grpcDefinition[key];

      const isServiceDefined =
        deepDefinition && !isUndefined(deepDefinition.service);
      const isServiceBoolean = isServiceDefined
        ? deepDefinition.service !== false
        : false;

      // grpc namespace object does not have 'format' or 'service' properties defined
      const isFormatDefined =
        deepDefinition && !isUndefined(deepDefinition.format);

      if (isServiceDefined && isServiceBoolean) {
        accumulator.push({
          name: nameExtended,
          service: deepDefinition,
        });
      } else if (isFormatDefined) {
        // Do nothing
      } else {
        // Continue recursion for namespace object until objects end or service definition found
        this.collectDeepServices(nameExtended, deepDefinition, accumulator);
      }
    }
  }

  /**
   * Appends `key` to `name` using dot syntax.
   *
   * @returns `key` alone when `name` is empty, otherwise `name.key`
   */
  private parseDeepServiceName(name: string, key: string): string {
    // If depth is zero then just return key
    if (name.length === 0) {
      return key;
    }
    // Otherwise add next through dot syntax
    return name + '.' + key;
  }

  /**
   * Registers every service found in the given package on the server.
   *
   * @param grpcPkg package object resolved by `lookupPackage`
   * @param packageName package name, used for error reporting
   * @throws InvalidGrpcPackageException when `grpcPkg` is falsy
   */
  private async createServices(grpcPkg: any, packageName: string) {
    if (!grpcPkg) {
      const invalidPackageError = new InvalidGrpcPackageException(packageName);
      this.logger.error(invalidPackageError);
      throw invalidPackageError;
    }

    // Take all of the services defined in grpcPkg and assign them to
    // method handlers defined in Controllers
    for (const definition of this.getServiceNames(grpcPkg)) {
      this.grpcClient.addService(
        // First parameter requires exact service definition from proto
        definition.service.service,
        // Here full proto definition required along with namespaced pattern name
        await this.createService(definition.service, definition.name),
      );
    }
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc