• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / 4afa0176-b936-482c-ad88-b635a9db93b4

14 Jul 2025 11:37AM UTC coverage: 88.866% (-0.02%) from 88.886%
4afa0176-b936-482c-ad88-b635a9db93b4

Pull #15386

circleci

kamilmysliwiec
style: address linter warnings
Pull Request #15386: feat: enhance introspection capabilities

2714 of 3431 branches covered (79.1%)

101 of 118 new or added lines in 15 files covered. (85.59%)

12 existing lines in 1 file now uncovered.

7239 of 8146 relevant lines covered (88.87%)

16.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.84
/packages/microservices/server/server-grpc.ts
1
import {
1✔
2
  isObject,
3
  isString,
4
  isUndefined,
5
} from '@nestjs/common/utils/shared.utils';
6
import {
1✔
7
  EMPTY,
8
  Observable,
9
  ReplaySubject,
10
  Subject,
11
  Subscription,
12
  defaultIfEmpty,
13
  fromEvent,
14
  lastValueFrom,
15
} from 'rxjs';
16
import { catchError, takeUntil } from 'rxjs/operators';
1✔
17
import { GRPC_DEFAULT_PROTO_LOADER, GRPC_DEFAULT_URL } from '../constants';
1✔
18
import { GrpcMethodStreamingType } from '../decorators';
1✔
19
import { Transport } from '../enums';
1✔
20
import { InvalidGrpcPackageException } from '../errors/invalid-grpc-package.exception';
1✔
21
import { InvalidProtoDefinitionException } from '../errors/invalid-proto-definition.exception';
1✔
22
import { ChannelOptions } from '../external/grpc-options.interface';
23
import { getGrpcPackageDefinition } from '../helpers';
1✔
24
import { MessageHandler } from '../interfaces';
25
import {
26
  GrpcOptions,
27
  TransportId,
28
} from '../interfaces/microservice-configuration.interface';
29
import { Server } from './server';
1✔
30

31
/** Name of the event a gRPC call emits when it is cancelled. */
const CANCELLED_EVENT = 'cancelled';

// The commented-out declarations below would give gRPC full type safety.
// They can't be enabled by default because that would require every user to
// install the @grpc/grpc-js package even if they don't use gRPC; without it,
// TypeScript would fail to compile the code.
//
// type GrpcServer = import('@grpc/grpc-js').Server;
// let grpcPackage = {} as typeof import('@grpc/grpc-js');
// let grpcProtoLoaderPackage = {} as typeof import('@grpc/proto-loader');

type GrpcServer = any;
// Both placeholders are reassigned by the ServerGrpc constructor.
let grpcPackage: any = {};
let grpcProtoLoaderPackage: any = {};
1✔
44

45
/**
 * Loosely-typed callable used for the members of GrpcCall. It replaces the
 * bare `Function` type while accepting exactly the same call shapes.
 */
type GrpcCallFn = (...args: any[]) => any;

/**
 * Structural view of the gRPC call object handed to service methods.
 * Only the members this file touches are declared.
 *
 * TRequest is the type of `request` (and of values passed to `write` in this
 * file); TMetadata is the type of `metadata`.
 */
interface GrpcCall<TRequest = any, TMetadata = any> {
  request: TRequest;
  metadata: TMetadata;
  sendMetadata: GrpcCallFn;
  end: GrpcCallFn;
  write: GrpcCallFn;
  on: GrpcCallFn;
  off: GrpcCallFn;
  emit: GrpcCallFn;
}
55

56
/**
57
 * @publicApi
58
 */
59
export class ServerGrpc extends Server<never, never> {
1✔
60
  public transportId: TransportId = Transport.GRPC;
98✔
61
  protected readonly url: string;
62
  protected grpcClient: GrpcServer;
63

64
  /**
   * Not available for the gRPC transport: reading this accessor always throws.
   *
   * @throws Error on every access.
   */
  get status(): never {
    const reason =
      'The "status" attribute is not supported by the gRPC transport';
    throw new Error(reason);
  }
69

70
  /**
   * Resolves the server URL and loads the gRPC runtime and proto-loader
   * packages into the module-level `grpcPackage` / `grpcProtoLoaderPackage`.
   *
   * @param options gRPC transport options. `url` falls back to
   * GRPC_DEFAULT_URL and `protoLoader` to GRPC_DEFAULT_PROTO_LOADER.
   */
  constructor(private readonly options: Readonly<GrpcOptions>['options']) {
    super();

    const configuredUrl = this.getOptionsProp(options, 'url');
    this.url = configuredUrl || GRPC_DEFAULT_URL;

    const protoLoader =
      this.getOptionsProp(options, 'protoLoader') || GRPC_DEFAULT_PROTO_LOADER;

    const requireGrpc = () => require('@grpc/grpc-js');
    // The default loader is required through a string literal; a custom
    // loader name is required dynamically.
    const requireProtoLoader = () => {
      if (protoLoader === GRPC_DEFAULT_PROTO_LOADER) {
        return require('@grpc/proto-loader');
      }
      return require(protoLoader);
    };

    grpcPackage = this.loadPackage(
      '@grpc/grpc-js',
      ServerGrpc.name,
      requireGrpc,
    );
    grpcProtoLoaderPackage = this.loadPackage(
      protoLoader,
      ServerGrpc.name,
      requireProtoLoader,
    );
  }
89

90
  /**
   * Creates the gRPC server, then binds the handlers through `start`.
   *
   * @param callback forwarded to `start` on success; called with the caught
   * error when creating the server or starting fails.
   */
  public async listen(
    callback: (err?: unknown, ...optionalParams: unknown[]) => void,
  ) {
    try {
      const server = await this.createClient();
      this.grpcClient = server;
      await this.start(callback);
    } catch (err) {
      callback(err);
    }
  }
100

101
  /**
   * Binds all gRPC services and then notifies the caller.
   *
   * @param callback optional function invoked once `bindEvents` has resolved.
   */
  public async start(callback?: () => void) {
    await this.bindEvents();
    if (callback != null) {
      callback();
    }
  }
105

106
  /**
   * Loads the proto definition and registers the services of every
   * configured package, one package after another.
   */
  public async bindEvents() {
    const grpcContext = this.loadProto();

    // The `package` option may be a single name or an array of names.
    const configured = this.getOptionsProp(this.options, 'package');
    const packageNames = Array.isArray(configured) ? configured : [configured];

    for (const currentName of packageNames) {
      const resolvedPackage = this.lookupPackage(grpcContext, currentName);
      await this.createServices(resolvedPackage, currentName);
    }
  }
118

119
  /**
   * Returns every service found in `grpcPkg` together with its fully
   * namespaced name.
   * The result is filled by a recursive scan of the package object that
   * starts from an empty name prefix.
   *
   * @param grpcPkg loaded gRPC package object to scan
   * @returns array of `{ name, service }` entries
   */
  public getServiceNames(grpcPkg: any): { name: string; service: any }[] {
    const collected: { name: string; service: any }[] = [];
    this.collectDeepServices('', grpcPkg, collected);
    return collected;
  }
131

132
  /**
   * Builds the service implementation object for a gRPC generated service by
   * mapping each method on its prototype to a handler defined with the
   * @GrpcMethod or @GrpcStreamMethod annotations. Methods without a matching
   * handler are skipped.
   *
   * @param grpcService gRPC generated service whose prototype lists the methods
   * @param name service name used to build the handler lookup pattern
   * @returns object keyed by method name holding the created service methods
   */
  public async createService(grpcService: any, name: string) {
    const service = {};

    for (const methodName in grpcService.prototype) {
      const methodFunction = grpcService.prototype[methodName];
      const findHandler = (type: GrpcMethodStreamingType) =>
        this.getMessageHandler(name, methodName, type, methodFunction);

      let streamingType: GrpcMethodStreamingType;
      let methodHandler: MessageHandler | null;

      if (methodFunction.requestStream) {
        // The RX streaming pattern is preferred when several are defined.
        streamingType = GrpcMethodStreamingType.RX_STREAMING;
        methodHandler = findHandler(streamingType);
        if (!methodHandler) {
          // No RX handler matched, so try the pass-through pattern.
          streamingType = GrpcMethodStreamingType.PT_STREAMING;
          methodHandler = findHandler(streamingType);
        }
      } else {
        // No request stream: only the No-Streaming pattern applies.
        streamingType = GrpcMethodStreamingType.NO_STREAMING;
        methodHandler = findHandler(streamingType);
      }

      if (!methodHandler) {
        continue;
      }

      // The handler's `name` is later read as the operation id.
      Object.defineProperty(methodHandler, 'name', {
        value: methodName,
        writable: false,
      });
      service[methodName] = this.createServiceMethod(
        methodHandler,
        grpcService.prototype[methodName],
        streamingType,
      );
    }
    return service;
  }
196

197
  /**
   * Looks up the registered handler for a service method.
   * The pattern built from `serviceName` is tried first. If nothing is
   * registered under it, the lookup is repeated with the service name taken
   * from the second `/`-separated segment of `grpcMethod.path`.
   *
   * @param serviceName service name used for the first lookup
   * @param methodName name of the rpc method
   * @param streaming streaming type that is part of the pattern
   * @param grpcMethod proto method definition; only `path` is read
   * @returns the matching handler, or whatever the second lookup yields
   * (possibly undefined)
   */
  public getMessageHandler(
    serviceName: string,
    methodName: string,
    streaming: GrpcMethodStreamingType,
    grpcMethod: { path?: string },
  ): MessageHandler {
    const primaryPattern = this.createPattern(
      serviceName,
      methodName,
      streaming,
    );
    const directMatch = this.messageHandlers.get(primaryPattern)!;
    if (directMatch) {
      return directMatch;
    }

    const packageServiceName = grpcMethod.path?.split?.('/')[1];
    const fallbackPattern = this.createPattern(
      packageServiceName!,
      methodName,
      streaming,
    );
    return this.messageHandlers.get(fallbackPattern)!;
  }
212

213
  /**
   * Builds the JSON-serialized pattern string identifying a gRPC handler.
   *
   * @param service name of the service which should be a match to gRPC service definition name
   * @param methodName name of the method which is coming after rpc keyword
   * @param streaming GrpcMethodStreamingType parameter which should correspond to
   * stream keyword in gRPC service request part
   * @returns JSON string with the keys `service`, `rpc` and `streaming`
   */
  public createPattern(
    service: string,
    methodName: string,
    streaming: GrpcMethodStreamingType,
  ): string {
    const descriptor = { service, rpc: methodName, streaming };
    return JSON.stringify(descriptor);
  }
232

233
  /**
   * Picks the function that will serve a gRPC call, either through Rx
   * streams or as a direct call pass-through.
   *
   * @param methodHandler handler resolved for the method
   * @param protoNativeHandler proto method definition; its `requestStream`
   * and `responseStream` flags are read
   * @param streamType streaming pattern the handler was registered with
   * @returns the function that handles the call
   */
  public createServiceMethod(
    methodHandler: Function,
    protoNativeHandler: any,
    streamType: GrpcMethodStreamingType,
  ): Function {
    // A truthy `requestStream` means the requester streams its messages.
    if (protoNativeHandler.requestStream) {
      switch (streamType) {
        // Handler defined with the GrpcStreamMethod annotation: use RX.
        case GrpcMethodStreamingType.RX_STREAMING:
          return this.createRequestStreamMethod(
            methodHandler,
            protoNativeHandler.responseStream,
          );
        // Handler defined with the GrpcStreamCall annotation.
        case GrpcMethodStreamingType.PT_STREAMING:
          return this.createStreamCallMethod(
            methodHandler,
            protoNativeHandler.responseStream,
          );
      }
    }

    if (protoNativeHandler.responseStream) {
      return this.createStreamServiceMethod(methodHandler);
    }
    return this.createUnaryServiceMethod(methodHandler);
  }
268

269
  /**
   * Builds the function serving an RPC that streams neither its request nor
   * its response.
   *
   * The returned function runs the handler inside `onProcessingStartHook`,
   * passes each emitted value to the gRPC callback and each error as the
   * callback's first argument. `onProcessingEndHook` is invoked only when the
   * result observable completes.
   *
   * @param methodHandler handler called with (request, metadata, call)
   */
  public createUnaryServiceMethod(methodHandler: Function): Function {
    return async (call: GrpcCall, callback: Function) => {
      const hookContext = { ...call, operationId: methodHandler.name } as any;

      const invokeHandler = async () => {
        const handler = methodHandler(call.request, call.metadata, call);
        const result$ = this.transformToObservable(await handler);
        result$.subscribe({
          next: async data => callback(null, await data),
          error: (err: any) => callback(err),
          complete: () => {
            this.onProcessingEndHook?.(this.transportId, call.request);
          },
        });
      };

      return this.onProcessingStartHook(
        this.transportId,
        hookContext,
        invokeHandler,
      );
    };
  }
287

288
  /**
   * Builds the function serving an RPC with a streamed response and a
   * non-streamed request.
   *
   * The returned function runs the handler inside `onProcessingStartHook`,
   * writes the handler's result to the call, and invokes
   * `onProcessingEndHook` once writing has finished.
   *
   * @param methodHandler handler called with (request, metadata, call)
   */
  public createStreamServiceMethod(methodHandler: Function): Function {
    return async (call: GrpcCall, callback: Function) => {
      const hookContext = { ...call, operationId: methodHandler.name } as any;

      const invokeHandler = async () => {
        const handler = methodHandler(call.request, call.metadata, call);
        const result$ = this.transformToObservable(await handler);
        await this.writeObservableToGrpc(result$, call);

        this.onProcessingEndHook?.(this.transportId, call.request);
      };

      return this.onProcessingStartHook(
        this.transportId,
        hookContext,
        invokeHandler,
      );
    };
  }
303

304
  /**
   * Not supported for the gRPC transport.
   *
   * @throws Error on every call.
   */
  public unwrap<T>(): T {
    const reason = 'Method is not supported for gRPC transport';
    throw new Error(reason);
  }
307

308
  /**
   * Not supported in gRPC mode; both arguments are ignored.
   *
   * @throws Error on every call.
   */
  public on<
    EventKey extends string | number | symbol = string | number | symbol,
    EventCallback = any,
  >(event: EventKey, callback: EventCallback) {
    const reason = 'Method is not supported in gRPC mode.';
    throw new Error(reason);
  }
314

315
  /**
   * Writes an observable to a GRPC call.
   *
   * Backpressure is respected: once `call.write` returns a falsy value,
   * further values are buffered until the call emits `drain`.
   *
   * @param source The observable we want to write out to the GRPC call.
   * @param call The GRPC call we want to write to.
   * @returns A promise that resolves when we're done writing to the call.
   * It never rejects; source errors are emitted on the call instead.
   */
  private writeObservableToGrpc<T>(
    source: Observable<T>,
    call: GrpcCall<T>,
  ): Promise<void> {
    // Only `resolve` is used: the promise merely signals that
    // writing/draining is over.
    return new Promise(resolve => {
      const pending: T[] = [];
      let canWrite = true;
      let completedWhileBuffering = false;
      let failedWhileBuffering = false;
      let bufferedError: any;

      // Collects every teardown step; they run in registration order.
      const teardown = new Subscription();
      const finish = () => {
        teardown.unsubscribe();
        resolve();
      };

      // A call cancelled by the client still resolves successfully.
      const onCancelled = () => {
        finish();
      };
      call.on(CANCELLED_EVENT, onCancelled);
      teardown.add(() => call.off(CANCELLED_EVENT, onCancelled));

      // The writable stream is always ended on teardown. Errors and writes
      // must have been emitted before this runs.
      teardown.add(() => call.end());

      const onDrain = () => {
        canWrite = true;
        while (pending.length > 0) {
          const value = pending.shift();
          if (!canWrite) {
            continue;
          }
          // The value is written even when `write` returns false, but
          // nothing more may be written until the next drain event.
          canWrite = call.write(value);
          if (!canWrite) {
            return;
          }
        }

        if (completedWhileBuffering) {
          finish();
          return;
        }
        if (failedWhileBuffering) {
          call.emit('error', bufferedError);
          finish();
        }
      };

      call.on('drain', onDrain);
      teardown.add(() => call.off('drain', onDrain));

      const sourceSubscription = source.subscribe({
        next: value => {
          if (!canWrite) {
            // Writing is paused until the drain event, so buffer the value.
            pending.push(value);
            return;
          }
          canWrite = call.write(value);
        },
        error: err => {
          if (pending.length > 0) {
            // Values are still buffered: report the error after draining.
            failedWhileBuffering = true;
            bufferedError = err;
            return;
          }
          call.emit('error', err);
          finish();
        },
        complete: () => {
          if (pending.length > 0) {
            // Values are still buffered: resolve after draining.
            completedWhileBuffering = true;
            return;
          }
          finish();
        },
      });
      teardown.add(sourceSubscription);
    });
  }
422

423
  /**
   * Builds the function serving an RPC whose request is a stream and whose
   * handler receives that stream as an observable.
   *
   * Incoming `data`, `error` and `end` events of the call are forwarded into
   * a buffering subject. When the response is a stream, the handler's result
   * is written to the call; otherwise the last emitted value (if any) is
   * passed to the gRPC callback.
   *
   * @param methodHandler handler called with (request observable, metadata, call)
   * @param isResponseStream whether the response side is streamed
   * @returns async function taking the call and the gRPC callback
   */
  public createRequestStreamMethod(
    methodHandler: Function,
    isResponseStream: boolean,
  ) {
    return async (
      call: GrpcCall,
      callback: (err: unknown, value: unknown) => void,
    ) => {
      return this.onProcessingStartHook(
        this.transportId,
        { ...call, operationId: methodHandler.name } as any,
        async () => {
          // Needs to be a Proxy in order to buffer messages that come before handler is executed
          // This could happen if handler has any async guards or interceptors registered that would delay
          // the execution.
          const { subject, next, error, complete, cleanup } =
            this.bufferUntilDrained();
          call.on('data', (m: any) => next(m));
          call.on('error', (e: any) => {
            // Check if error means that stream ended on other end.
            // `includes` yields a real boolean. The previous `indexOf`
            // result was truthy for -1 (not found) and falsy for 0, so
            // non-cancellation errors were never passed along.
            const isCancelledError = String(e)
              .toLowerCase()
              .includes('cancelled');

            if (isCancelledError) {
              call.end();
              return;
            }
            // If another error then just pass it along
            error(e);
          });
          call.on('end', () => {
            complete();
            cleanup();

            this.onProcessingEndHook?.(this.transportId, call.request);
          });

          const handler = methodHandler(
            subject.asObservable(),
            call.metadata,
            call,
          );
          const res = this.transformToObservable(await handler);
          if (isResponseStream) {
            await this.writeObservableToGrpc(res, call);
          } else {
            // Stop waiting when the call is cancelled. A handler error is
            // reported through the callback and leaves the stream empty.
            const response = await lastValueFrom(
              res.pipe(
                takeUntil(fromEvent(call as any, CANCELLED_EVENT)),
                catchError(err => {
                  callback(err, null);
                  return EMPTY;
                }),
                defaultIfEmpty(undefined),
              ),
            );

            // `undefined` means nothing was emitted, so no response is sent.
            if (!isUndefined(response)) {
              callback(null, response);
            }
          }
        },
      );
    };
  }
489

490
  /**
   * Builds the function serving a pass-through streaming RPC, where the
   * handler works with the raw call object.
   *
   * The handler receives only the call when the response is streamed, and
   * the call plus the gRPC callback otherwise. `onProcessingEndHook` runs
   * once the handler's result stream has settled, whether it succeeded or
   * failed.
   *
   * @param methodHandler handler invoked with the raw call
   * @param isResponseStream whether the response side is streamed
   * @returns async function taking the call and the gRPC callback
   */
  public createStreamCallMethod(
    methodHandler: Function,
    isResponseStream: boolean,
  ) {
    return async (
      call: GrpcCall,
      callback: (err: unknown, value: unknown) => void,
    ) => {
      const hookContext = { ...call, operationId: methodHandler.name } as any;

      const invokeHandler = async () => {
        const handlerResult = isResponseStream
          ? methodHandler(call)
          : methodHandler(call, callback);
        const handlerStream: Observable<any> = this.transformToObservable(
          await handlerResult,
        );

        await lastValueFrom(handlerStream).finally(() => {
          this.onProcessingEndHook?.(this.transportId, call.request);
        });
      };

      return this.onProcessingStartHook(
        this.transportId,
        hookContext,
        invokeHandler,
      );
    };
  }
519

520
  /**
   * Shuts the gRPC server down and clears the reference to it.
   *
   * With the `gracefulShutdown` option set, `tryShutdown` is awaited and its
   * error, if any, rejects the returned promise; otherwise `forceShutdown`
   * is called. Nothing is shut down when no server exists.
   */
  public async close(): Promise<void> {
    const server = this.grpcClient;

    if (server) {
      const graceful = this.getOptionsProp(this.options, 'gracefulShutdown');
      if (!graceful) {
        server.forceShutdown();
      } else {
        await new Promise<void>((resolve, reject) => {
          server.tryShutdown((error: Error) => {
            if (error) {
              reject(error);
              return;
            }
            resolve();
          });
        });
      }
    }

    this.grpcClient = null;
  }
536

537
  /**
   * Parses `obj` as JSON, falling back to the input itself.
   *
   * @param obj value to parse
   * @returns the parsed value, or `obj` unchanged when parsing throws
   */
  public deserialize(obj: any): any {
    let parsed: any;
    try {
      parsed = JSON.parse(obj);
    } catch {
      return obj;
    }
    return parsed;
  }
544

545
  /**
   * Registers a handler under the given pattern.
   *
   * @param pattern string used as-is, or any other value serialized with
   * JSON.stringify, to form the registration key
   * @param callback handler to store; its `isEventHandler` flag is set here
   * @param isEventHandler value assigned to `callback.isEventHandler`
   */
  public addHandler(
    pattern: unknown,
    callback: MessageHandler,
    isEventHandler = false,
  ) {
    let route: string;
    if (isString(pattern)) {
      route = pattern;
    } else {
      route = JSON.stringify(pattern);
    }

    callback.isEventHandler = isEventHandler;
    this.messageHandlers.set(route, callback);
  }
554

555
  /**
   * Creates the gRPC server and binds it to the configured URL.
   *
   * Channel options start from a copy of `options.channelOptions`, so the
   * caller's options object is never mutated. The `maxSendMessageLength`,
   * `maxReceiveMessageLength` and `maxMetadataSize` options, when truthy,
   * are written over the corresponding `grpc.*` keys of that copy.
   *
   * @returns the bound server instance
   * @throws the error reported by `bindAsync` when binding fails
   */
  public async createClient() {
    // Shallow copy: the original code wrote into the user-supplied object.
    const channelOptions: ChannelOptions = {
      ...this.options?.channelOptions,
    };
    if (this.options && this.options.maxSendMessageLength) {
      channelOptions['grpc.max_send_message_length'] =
        this.options.maxSendMessageLength;
    }
    if (this.options && this.options.maxReceiveMessageLength) {
      channelOptions['grpc.max_receive_message_length'] =
        this.options.maxReceiveMessageLength;
    }
    if (this.options && this.options.maxMetadataSize) {
      channelOptions['grpc.max_metadata_size'] = this.options.maxMetadataSize;
    }
    const server = new grpcPackage.Server(channelOptions);
    const credentials = this.getOptionsProp(this.options, 'credentials');

    // `bindAsync` is callback based; wrap it so binding can be awaited.
    await new Promise((resolve, reject) => {
      server.bindAsync(
        this.url,
        // Insecure credentials are used when none are configured.
        credentials || grpcPackage.ServerCredentials.createInsecure(),
        (error: Error | null, port: number) =>
          error ? reject(error) : resolve(port),
      );
    });

    return server;
  }
585

586
  /**
   * Resolves a dot-separated package name against the loaded proto root.
   *
   * Each segment is looked up with optional chaining, so a missing
   * intermediate namespace yields `undefined` instead of a TypeError.
   * `createServices` reports a falsy package as an
   * InvalidGrpcPackageException.
   *
   * @param root object returned by loading the package definition
   * @param packageName dot-separated package name, e.g. `a.b.c`
   * @returns the nested package object, or `undefined` when any segment is missing
   */
  public lookupPackage(root: any, packageName: string) {
    /** Reference: https://github.com/kondi/rxjs-grpc */
    let pkg = root;
    for (const name of packageName.split(/\./)) {
      pkg = pkg?.[name];
    }
    return pkg;
  }
594

595
  /**
   * Loads the proto package definition and turns it into a gRPC object.
   *
   * When the `onLoadPackageDefinition` option is set, it is called with the
   * package definition and the current server before the definition is
   * loaded into gRPC.
   *
   * @returns the result of `grpcPackage.loadPackageDefinition`
   * @throws InvalidProtoDefinitionException when any step above throws; the
   * original error's message (if it has one) is logged first
   */
  public loadProto(): any {
    try {
      const packageDefinition = getGrpcPackageDefinition(
        this.options,
        grpcProtoLoaderPackage,
      );

      if (this.options.onLoadPackageDefinition) {
        this.options.onLoadPackageDefinition(
          packageDefinition,
          this.grpcClient,
        );
      }

      return grpcPackage.loadPackageDefinition(packageDefinition);
    } catch (err) {
      // `err` may be any thrown value, including null/undefined, so both
      // property reads are guarded. The original read `err.path` unguarded.
      const invalidProtoError = new InvalidProtoDefinitionException(err?.path);
      const message = err?.message || invalidProtoError.message;

      this.logger.error(message, invalidProtoError.stack);
      throw invalidProtoError;
    }
  }
619

620
  /**
   * Recursively walks a loaded protobuf descriptor object and collects every
   * service found, named by its dot-syntax full path.
   *
   * Example:
   *  for proto package Bundle.FirstService with service Events { rpc...
   *  will be resolved to object of (while loaded for Bundle package):
   *    {
   *      name: "FirstService.Events",
   *      service: {Object}
   *    }
   *
   * @param name dot-joined path accumulated so far ('' at the top level)
   * @param grpcDefinition object to scan; non-objects are ignored
   * @param accumulator array that receives the `{ name, service }` entries
   */
  private collectDeepServices(
    name: string,
    grpcDefinition: any,
    accumulator: { name: string; service: any }[],
  ) {
    if (!isObject(grpcDefinition)) {
      return;
    }

    // Traverse definitions or namespace extensions
    for (const key of Object.keys(grpcDefinition)) {
      const nameExtended = this.parseDeepServiceName(name, key);
      const deepDefinition = grpcDefinition[key];

      // An entry is a service when it has a `service` property that is
      // neither undefined nor `false`.
      const isService =
        Boolean(deepDefinition) &&
        !isUndefined(deepDefinition.service) &&
        deepDefinition.service !== false;
      if (isService) {
        accumulator.push({
          name: nameExtended,
          service: deepDefinition,
        });
        continue;
      }

      // grpc namespace object does not have 'format' or 'service' properties
      // defined, so an entry with `format` is not a namespace and is skipped.
      const hasFormat =
        Boolean(deepDefinition) && !isUndefined(deepDefinition.format);
      if (hasFormat) {
        continue;
      }

      // Keep recursing into the namespace until a service definition is
      // found or the objects end.
      this.collectDeepServices(nameExtended, deepDefinition, accumulator);
    }
  }
670

671
  /**
   * Appends `key` to a dot-separated service path.
   *
   * @param name path built so far; empty at depth zero
   * @param key next path segment
   * @returns `key` alone when `name` is empty, otherwise `name.key`
   */
  private parseDeepServiceName(name: string, key: string): string {
    return name.length === 0 ? key : `${name}.${key}`;
  }
679

680
  /**
   * Registers every service found in `grpcPkg` on the gRPC server, wiring
   * each one to the method handlers defined in Controllers.
   *
   * @param grpcPkg resolved package object
   * @param packageName package name, used for the error when `grpcPkg` is falsy
   * @throws InvalidGrpcPackageException (after logging it) when `grpcPkg` is falsy
   */
  private async createServices(grpcPkg: any, packageName: string) {
    const packageMissing = !grpcPkg;
    if (packageMissing) {
      const invalidPackageError = new InvalidGrpcPackageException(packageName);
      this.logger.error(invalidPackageError);
      throw invalidPackageError;
    }

    for (const { name, service } of this.getServiceNames(grpcPkg)) {
      this.grpcClient.addService(
        // First parameter requires exact service definition from proto
        service.service,
        // Here full proto definition required along with namespaced pattern name
        await this.createService(service, name),
      );
    }
  }
698

699
  /**
   * Creates a subject whose values are also kept in a replay buffer until
   * `drainBuffer` is called on the observable obtained from it.
   *
   * The returned `subject` is a Proxy: `asObservable()` returns the live
   * subject's observable with a `drainBuffer` property attached, and every
   * other property is read from the replay buffer until draining has
   * started, and from the live subject afterwards.
   *
   * @returns the proxied subject plus `next`, `error`, `complete` and
   * `cleanup` functions that feed and release the buffer
   */
  private bufferUntilDrained<T>() {
    type DrainableSubject<T> = Subject<T> & { drainBuffer: () => void };

    const subject = new Subject<T>();
    let buffer: ReplaySubject<T> | null = new ReplaySubject<T>();
    let drained = false;

    // Must stay a function named `drainBuffer`: its `name` is the property
    // key attached to the observable below.
    function drainBuffer(this: DrainableSubject<T>) {
      if (drained || !buffer) {
        return;
      }
      drained = true;

      // Replay buffered values to the new subscriber
      setImmediate(() => {
        const replay = buffer!.subscribe(subject);
        replay.unsubscribe();
        buffer = null;
      });
    }

    const proxiedSubject = new Proxy<DrainableSubject<T>>(
      subject as DrainableSubject<T>,
      {
        get(target, prop, receiver) {
          if (prop !== 'asObservable') {
            const source = drained ? target : buffer!;
            return Reflect.get(source, prop, receiver);
          }

          return () => {
            const stream = subject.asObservable();

            // "drainBuffer" will be called before the evaluation of the handler
            // but after any enhancers have been applied (e.g., `interceptors`)
            Object.defineProperty(stream, drainBuffer.name, {
              value: drainBuffer,
            });
            return stream;
          };
        },
      },
    );

    const next = (value: T) => {
      if (!drained) {
        buffer!.next(value);
      }
      subject.next(value);
    };

    const error = (err: any) => {
      if (!drained) {
        buffer!.error(err);
      }
      subject.error(err);
    };

    const complete = () => {
      if (drained) {
        subject.complete();
        return;
      }
      // Only the buffer completes here; the subject is left to complete
      // later, after the replay buffer has been drained.
      buffer!.complete();
    };

    const cleanup = () => {
      if (drained) {
        return;
      }
      buffer = null;
    };

    return {
      subject: proxiedSubject,
      next,
      error,
      complete,
      cleanup,
    };
  }
771
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc