• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / 4afa0176-b936-482c-ad88-b635a9db93b4

14 Jul 2025 11:37AM UTC coverage: 88.866% (-0.02%) from 88.886%
4afa0176-b936-482c-ad88-b635a9db93b4

Pull #15386

circleci

kamilmysliwiec
style: address linter warnings
Pull Request #15386: feat: enhance introspection capabilities

2714 of 3431 branches covered (79.1%)

101 of 118 new or added lines in 15 files covered. (85.59%)

12 existing lines in 1 file now uncovered.

7239 of 8146 relevant lines covered (88.87%)

16.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.61
/packages/microservices/server/server.ts
1
import { Logger, LoggerService } from '@nestjs/common/services/logger.service';
1✔
2
import { loadPackage } from '@nestjs/common/utils/load-package.util';
1✔
3
import {
1✔
4
  connectable,
5
  EMPTY,
6
  from as fromPromise,
7
  isObservable,
8
  Observable,
9
  ObservedValueOf,
10
  of,
11
  ReplaySubject,
12
  Subject,
13
  Subscription,
14
} from 'rxjs';
15
import {
1✔
16
  catchError,
17
  distinctUntilChanged,
18
  finalize,
19
  mergeMap,
20
} from 'rxjs/operators';
21
import { NO_EVENT_HANDLER } from '../constants';
1✔
22
import { BaseRpcContext } from '../ctx-host/base-rpc.context';
23
import { IncomingRequestDeserializer } from '../deserializers/incoming-request.deserializer';
1✔
24
import { Transport } from '../enums';
25
import {
26
  ClientOptions,
27
  KafkaOptions,
28
  MessageHandler,
29
  MicroserviceOptions,
30
  MqttOptions,
31
  MsPattern,
32
  NatsOptions,
33
  ReadPacket,
34
  RedisOptions,
35
  RmqOptions,
36
  TcpOptions,
37
  WritePacket,
38
} from '../interfaces';
39
import { ConsumerDeserializer } from '../interfaces/deserializer.interface';
40
import { ConsumerSerializer } from '../interfaces/serializer.interface';
41
import { IdentitySerializer } from '../serializers/identity.serializer';
1✔
42
import { transformPatternToRoute } from '../utils';
1✔
43

44
/**
45
 * @publicApi
46
 */
47
export abstract class Server<
1✔
48
  EventsMap extends Record<string, Function> = Record<string, Function>,
49
  Status extends string = string,
50
> {
51
  /**
52
   * Unique transport identifier.
53
   */
54
  public transportId?: Transport | symbol;
55

56
  protected readonly messageHandlers = new Map<string, MessageHandler>();
246✔
57
  protected readonly logger: LoggerService = new Logger(Server.name);
246✔
58
  protected serializer: ConsumerSerializer;
59
  protected deserializer: ConsumerDeserializer;
60
  protected onProcessingStartHook: (
246✔
61
    transportId: Transport | symbol,
62
    context: BaseRpcContext,
63
    done: () => Promise<any>,
64
  ) => void = (
65
    transportId: Transport | symbol,
66
    context: BaseRpcContext,
67
    done: () => Promise<any>,
68
  ) => done();
31✔
69
  protected onProcessingEndHook: (
70
    transportId: Transport | symbol,
71
    context: BaseRpcContext,
72
  ) => void;
73
  protected _status$ = new ReplaySubject<Status>(1);
246✔
74

75
  /**
76
   * Returns an observable that emits status changes.
77
   */
78
  public get status(): Observable<Status> {
79
    return this._status$.asObservable().pipe(distinctUntilChanged());
×
80
  }
81

82
  /**
83
   * Registers an event listener for the given event.
84
   * @param event Event name
85
   * @param callback Callback to be executed when the event is emitted
86
   */
87
  public abstract on<
88
    EventKey extends keyof EventsMap = keyof EventsMap,
89
    EventCallback extends EventsMap[EventKey] = EventsMap[EventKey],
90
  >(event: EventKey, callback: EventCallback): any;
91

92
  /**
93
   * Returns an instance of the underlying server/broker instance,
94
   * or a group of servers if there are more than one.
95
   */
96
  public abstract unwrap<T>(): T;
97

98
  /**
99
   * Method called when server is being initialized.
100
   * @param callback Function to be called upon initialization
101
   */
102
  public abstract listen(callback: (...optionalParams: unknown[]) => any): any;
103

104
  /**
105
   * Method called when server is being terminated.
106
   */
107
  public abstract close(): any;
108

109
  /**
110
   * Sets the transport identifier.
111
   * @param transportId Unique transport identifier.
112
   */
113
  public setTransportId(transportId: Transport | symbol): void {
114
    this.transportId = transportId;
6✔
115
  }
116

117
  /**
118
   * Sets a hook that will be called when processing starts.
119
   */
120
  public setOnProcessingStartHook(
121
    hook: (
122
      transportId: Transport | symbol,
123
      context: unknown,
124
      done: () => Promise<any>,
125
    ) => void,
126
  ): void {
NEW
127
    this.onProcessingStartHook = hook;
×
128
  }
129

130
  /**
131
   * Sets a hook that will be called when processing ends.
132
   */
133
  public setOnProcessingEndHook(
134
    hook: (transportId: Transport | symbol, context: unknown) => void,
135
  ): void {
NEW
136
    this.onProcessingEndHook = hook;
×
137
  }
138

139
  public addHandler(
140
    pattern: any,
141
    callback: MessageHandler,
142
    isEventHandler = false,
1✔
143
    extras: Record<string, any> = {},
2✔
144
  ) {
145
    const normalizedPattern = this.normalizePattern(pattern);
2✔
146
    callback.isEventHandler = isEventHandler;
2✔
147
    callback.extras = extras;
2✔
148

149
    if (this.messageHandlers.has(normalizedPattern) && isEventHandler) {
2✔
150
      const headRef = this.messageHandlers.get(normalizedPattern)!;
1✔
151
      const getTail = (handler: MessageHandler) =>
1✔
152
        handler?.next ? getTail(handler.next) : handler;
2✔
153

154
      const tailRef = getTail(headRef);
1✔
155
      tailRef.next = callback;
1✔
156
    } else {
157
      this.messageHandlers.set(normalizedPattern, callback);
1✔
158
    }
159
  }
160

161
  public getHandlers(): Map<string, MessageHandler> {
162
    return this.messageHandlers;
1✔
163
  }
164

165
  public getHandlerByPattern(pattern: string): MessageHandler | null {
166
    const route = this.getRouteFromPattern(pattern);
35✔
167
    return this.messageHandlers.has(route)
35✔
168
      ? this.messageHandlers.get(route)!
35✔
169
      : null;
170
  }
171

172
  public send(
173
    stream$: Observable<any>,
174
    respond: (data: WritePacket) => Promise<unknown> | void,
175
  ): Subscription {
176
    const dataQueue: WritePacket[] = [];
9✔
177
    let isProcessing = false;
9✔
178
    const scheduleOnNextTick = (data: WritePacket) => {
9✔
179
      if (data.isDisposed && dataQueue.length > 0) {
18✔
180
        dataQueue[dataQueue.length - 1].isDisposed = true;
9✔
181
      } else {
182
        dataQueue.push(data);
9✔
183
      }
184
      if (!isProcessing) {
18✔
185
        isProcessing = true;
9✔
186
        process.nextTick(async () => {
9✔
187
          while (dataQueue.length > 0) {
9✔
188
            const packet = dataQueue.shift();
9✔
189
            if (packet) {
9!
190
              await respond(packet);
9✔
191
            }
192
          }
193
          isProcessing = false;
9✔
194
        });
195
      }
196
    };
197
    return stream$
9✔
198
      .pipe(
199
        catchError((err: any) => {
200
          scheduleOnNextTick({ err });
1✔
201
          return EMPTY;
1✔
202
        }),
203
        finalize(() => scheduleOnNextTick({ isDisposed: true })),
9✔
204
      )
205
      .subscribe((response: any) => scheduleOnNextTick({ response }));
8✔
206
  }
207

208
  public async handleEvent(
209
    pattern: string,
210
    packet: ReadPacket,
211
    context: BaseRpcContext,
212
  ): Promise<any> {
213
    const handler = this.getHandlerByPattern(pattern);
11✔
214
    if (!handler) {
11✔
215
      return this.logger.error(NO_EVENT_HANDLER`${pattern}`);
6✔
216
    }
217
    return this.onProcessingStartHook(this.transportId!, context, async () => {
5✔
218
      const resultOrStream = await handler(packet.data, context);
5✔
219
      if (isObservable(resultOrStream)) {
5!
NEW
220
        const connectableSource = connectable(
×
221
          resultOrStream.pipe(
222
            finalize(() =>
NEW
223
              this.onProcessingEndHook?.(this.transportId!, context),
×
224
            ),
225
          ),
226
          {
NEW
227
            connector: () => new Subject(),
×
228
            resetOnDisconnect: false,
229
          },
230
        );
NEW
231
        connectableSource.connect();
×
232
      } else {
233
        this.onProcessingEndHook?.(this.transportId!, context);
5✔
234
      }
235
    });
236
  }
237

238
  public transformToObservable<T>(
239
    resultOrDeferred: Observable<T> | Promise<T>,
240
  ): Observable<T>;
241
  public transformToObservable<T>(
242
    resultOrDeferred: T,
243
  ): never extends Observable<ObservedValueOf<T>>
244
    ? Observable<T>
245
    : Observable<ObservedValueOf<T>>;
246
  public transformToObservable(resultOrDeferred: any) {
247
    if (resultOrDeferred instanceof Promise) {
27✔
248
      return fromPromise(resultOrDeferred).pipe(
1✔
249
        mergeMap(val => (isObservable(val) ? val : of(val))),
1!
250
      );
251
    }
252

253
    if (isObservable(resultOrDeferred)) {
26✔
254
      return resultOrDeferred;
10✔
255
    }
256

257
    return of(resultOrDeferred);
16✔
258
  }
259

260
  public getOptionsProp<
261
    Options extends MicroserviceOptions['options'],
262
    Attribute extends keyof Options,
263
  >(obj: Options, prop: Attribute): Options[Attribute];
264
  public getOptionsProp<
265
    Options extends MicroserviceOptions['options'],
266
    Attribute extends keyof Options,
267
    DefaultValue extends Options[Attribute] = Options[Attribute],
268
  >(
269
    obj: Options,
270
    prop: Attribute,
271
    defaultValue: DefaultValue,
272
  ): Required<Options>[Attribute];
273
  public getOptionsProp<
274
    Options extends MicroserviceOptions['options'],
275
    Attribute extends keyof Options,
276
    DefaultValue extends Options[Attribute] = Options[Attribute],
277
  >(
278
    obj: Options,
279
    prop: Attribute,
280
    defaultValue: DefaultValue = undefined as DefaultValue,
326✔
281
  ) {
282
    return obj && prop in obj ? (obj as any)[prop] : defaultValue;
523✔
283
  }
284

285
  protected handleError(error: string) {
286
    this.logger.error(error);
×
287
  }
288

289
  protected loadPackage<T = any>(
290
    name: string,
291
    ctx: string,
292
    loader?: Function,
293
  ): T {
294
    return loadPackage(name, ctx, loader);
346✔
295
  }
296

297
  protected initializeSerializer(options: ClientOptions['options']) {
298
    this.serializer =
43✔
299
      (options &&
120✔
300
        (options as
301
          | RedisOptions['options']
302
          | NatsOptions['options']
303
          | MqttOptions['options']
304
          | TcpOptions['options']
305
          | RmqOptions['options']
306
          | KafkaOptions['options'])!.serializer) ||
307
      new IdentitySerializer();
308
  }
309

310
  protected initializeDeserializer(options: ClientOptions['options']) {
311
    this.deserializer =
92✔
312
      (options! &&
263✔
313
        (options as
314
          | RedisOptions['options']
315
          | NatsOptions['options']
316
          | MqttOptions['options']
317
          | TcpOptions['options']
318
          | RmqOptions['options']
319
          | KafkaOptions['options'])!.deserializer) ||
320
      new IncomingRequestDeserializer();
321
  }
322

323
  /**
324
   * Transforms the server Pattern to valid type and returns a route for him.
325
   *
326
   * @param  {string} pattern - server pattern
327
   * @returns string
328
   */
329
  protected getRouteFromPattern(pattern: string): string {
330
    let validPattern: MsPattern;
331

332
    try {
39✔
333
      validPattern = JSON.parse(pattern);
39✔
334
    } catch (error) {
335
      // Uses a fundamental object (`pattern` variable without any conversion)
336
      validPattern = pattern;
38✔
337
    }
338
    return this.normalizePattern(validPattern);
39✔
339
  }
340

341
  protected normalizePattern(pattern: MsPattern): string {
342
    return transformPatternToRoute(pattern);
37✔
343
  }
344
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc