• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / f7142b24-e394-4028-89c6-717e281431d1

25 Nov 2024 01:12PM UTC coverage: 90.141% (-1.8%) from 91.989%
f7142b24-e394-4028-89c6-717e281431d1

Pull #14177

circleci

web-flow
Merge pull request #14200 from nestjs/feat/allow-queue-per-handler

feat(microservices): support nats queue per handler
Pull Request #14177: release: version 11.0.0

2612 of 3236 branches covered (80.72%)

496 of 704 new or added lines in 48 files covered. (70.45%)

79 existing lines in 11 files now uncovered.

6985 of 7749 relevant lines covered (90.14%)

16.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.91
/packages/microservices/server/server.ts
1
import { Logger, LoggerService } from '@nestjs/common/services/logger.service';
1✔
2
import { loadPackage } from '@nestjs/common/utils/load-package.util';
1✔
3
import {
1✔
4
  connectable,
5
  EMPTY,
6
  from as fromPromise,
7
  isObservable,
8
  Observable,
9
  ObservedValueOf,
10
  of,
11
  ReplaySubject,
12
  Subject,
13
  Subscription,
14
} from 'rxjs';
15
import {
1✔
16
  catchError,
17
  distinctUntilChanged,
18
  finalize,
19
  mergeMap,
20
} from 'rxjs/operators';
21
import { NO_EVENT_HANDLER } from '../constants';
1✔
22
import { BaseRpcContext } from '../ctx-host/base-rpc.context';
23
import { IncomingRequestDeserializer } from '../deserializers/incoming-request.deserializer';
1✔
24
import { Transport } from '../enums';
25
import {
26
  ClientOptions,
27
  KafkaOptions,
28
  MessageHandler,
29
  MicroserviceOptions,
30
  MqttOptions,
31
  MsPattern,
32
  NatsOptions,
33
  ReadPacket,
34
  RedisOptions,
35
  RmqOptions,
36
  TcpOptions,
37
  WritePacket,
38
} from '../interfaces';
39
import { ConsumerDeserializer } from '../interfaces/deserializer.interface';
40
import { ConsumerSerializer } from '../interfaces/serializer.interface';
41
import { IdentitySerializer } from '../serializers/identity.serializer';
1✔
42
import { transformPatternToRoute } from '../utils';
1✔
43

44
/**
45
 * @publicApi
46
 */
47
export abstract class Server<
1✔
48
  EventsMap extends Record<string, Function> = Record<string, Function>,
49
  Status extends string = string,
50
> {
51
  /**
52
   * Unique transport identifier.
53
   */
54
  readonly transportId?: Transport | symbol;
55

56
  protected readonly messageHandlers = new Map<string, MessageHandler>();
232✔
57
  protected readonly logger: LoggerService = new Logger(Server.name);
232✔
58
  protected serializer: ConsumerSerializer;
59
  protected deserializer: ConsumerDeserializer;
60
  protected _status$ = new ReplaySubject<Status>(1);
232✔
61

62
  /**
63
   * Returns an observable that emits status changes.
64
   */
65
  public get status(): Observable<Status> {
NEW
66
    return this._status$.asObservable().pipe(distinctUntilChanged());
×
67
  }
68

69
  /**
70
   * Registers an event listener for the given event.
71
   * @param event Event name
72
   * @param callback Callback to be executed when the event is emitted
73
   */
74
  public abstract on<
75
    EventKey extends keyof EventsMap = keyof EventsMap,
76
    EventCallback extends EventsMap[EventKey] = EventsMap[EventKey],
77
  >(event: EventKey, callback: EventCallback): any;
78
  /**
79
   * Returns an instance of the underlying server/broker instance,
80
   * or a group of servers if there are more than one.
81
   */
82
  public abstract unwrap<T>(): T;
83

84
  /**
85
   * Method called when server is being initialized.
86
   * @param callback Function to be called upon initialization
87
   */
88
  public abstract listen(callback: (...optionalParams: unknown[]) => any): any;
89
  /**
90
   * Method called when server is being terminated.
91
   */
92
  public abstract close(): any;
93

94
  public addHandler(
95
    pattern: any,
96
    callback: MessageHandler,
97
    isEventHandler = false,
1✔
98
    extras: Record<string, any> = {},
2✔
99
  ) {
100
    const normalizedPattern = this.normalizePattern(pattern);
2✔
101
    callback.isEventHandler = isEventHandler;
2✔
102
    callback.extras = extras;
2✔
103

104
    if (this.messageHandlers.has(normalizedPattern) && isEventHandler) {
2✔
105
      const headRef = this.messageHandlers.get(normalizedPattern);
1✔
106
      const getTail = (handler: MessageHandler) =>
1✔
107
        handler?.next ? getTail(handler.next) : handler;
2✔
108

109
      const tailRef = getTail(headRef);
1✔
110
      tailRef.next = callback;
1✔
111
    } else {
112
      this.messageHandlers.set(normalizedPattern, callback);
1✔
113
    }
114
  }
115

116
  public getHandlers(): Map<string, MessageHandler> {
117
    return this.messageHandlers;
1✔
118
  }
119

120
  public getHandlerByPattern(pattern: string): MessageHandler | null {
121
    const route = this.getRouteFromPattern(pattern);
35✔
122
    return this.messageHandlers.has(route)
35✔
123
      ? this.messageHandlers.get(route)
35✔
124
      : null;
125
  }
126

127
  public send(
128
    stream$: Observable<any>,
129
    respond: (data: WritePacket) => unknown | Promise<unknown>,
130
  ): Subscription {
131
    let dataBuffer: WritePacket[] = null;
9✔
132
    const scheduleOnNextTick = (data: WritePacket) => {
9✔
133
      if (!dataBuffer) {
18✔
134
        dataBuffer = [data];
9✔
135
        process.nextTick(async () => {
9✔
136
          for (const item of dataBuffer) {
9✔
137
            await respond(item);
9✔
138
          }
139
          dataBuffer = null;
9✔
140
        });
141
      } else if (!data.isDisposed) {
9!
142
        dataBuffer = dataBuffer.concat(data);
×
143
      } else {
144
        dataBuffer[dataBuffer.length - 1].isDisposed = data.isDisposed;
9✔
145
      }
146
    };
147
    return stream$
9✔
148
      .pipe(
149
        catchError((err: any) => {
150
          scheduleOnNextTick({ err });
1✔
151
          return EMPTY;
1✔
152
        }),
153
        finalize(() => scheduleOnNextTick({ isDisposed: true })),
9✔
154
      )
155
      .subscribe((response: any) => scheduleOnNextTick({ response }));
8✔
156
  }
157

158
  public async handleEvent(
159
    pattern: string,
160
    packet: ReadPacket,
161
    context: BaseRpcContext,
162
  ): Promise<any> {
163
    const handler = this.getHandlerByPattern(pattern);
11✔
164
    if (!handler) {
11✔
165
      return this.logger.error(NO_EVENT_HANDLER`${pattern}`);
6✔
166
    }
167
    const resultOrStream = await handler(packet.data, context);
5✔
168
    if (isObservable(resultOrStream)) {
5!
169
      const connectableSource = connectable(resultOrStream, {
×
170
        connector: () => new Subject(),
×
171
        resetOnDisconnect: false,
172
      });
173
      connectableSource.connect();
×
174
    }
175
  }
176

177
  public transformToObservable<T>(
178
    resultOrDeferred: Observable<T> | Promise<T>,
179
  ): Observable<T>;
180
  public transformToObservable<T>(
181
    resultOrDeferred: T,
182
  ): never extends Observable<ObservedValueOf<T>>
183
    ? Observable<T>
184
    : Observable<ObservedValueOf<T>>;
185
  public transformToObservable(resultOrDeferred: any) {
186
    if (resultOrDeferred instanceof Promise) {
26✔
187
      return fromPromise(resultOrDeferred).pipe(
1✔
188
        mergeMap(val => (isObservable(val) ? val : of(val))),
1!
189
      );
190
    }
191

192
    if (isObservable(resultOrDeferred)) {
25✔
193
      return resultOrDeferred;
10✔
194
    }
195

196
    return of(resultOrDeferred);
15✔
197
  }
198

199
  public getOptionsProp<
200
    T extends MicroserviceOptions['options'],
201
    K extends keyof T,
202
  >(obj: T, prop: K, defaultValue: T[K] = undefined) {
425✔
203
    return obj && prop in obj ? obj[prop] : defaultValue;
501✔
204
  }
205

206
  protected handleError(error: string) {
207
    this.logger.error(error);
×
208
  }
209

210
  protected loadPackage<T = any>(
211
    name: string,
212
    ctx: string,
213
    loader?: Function,
214
  ): T {
215
    return loadPackage(name, ctx, loader);
338✔
216
  }
217

218
  protected initializeSerializer(options: ClientOptions['options']) {
219
    this.serializer =
39✔
220
      (options &&
112✔
221
        (
222
          options as
223
            | RedisOptions['options']
224
            | NatsOptions['options']
225
            | MqttOptions['options']
226
            | TcpOptions['options']
227
            | RmqOptions['options']
228
            | KafkaOptions['options']
229
        ).serializer) ||
230
      new IdentitySerializer();
231
  }
232

233
  protected initializeDeserializer(options: ClientOptions['options']) {
234
    this.deserializer =
86✔
235
      (options &&
251✔
236
        (
237
          options as
238
            | RedisOptions['options']
239
            | NatsOptions['options']
240
            | MqttOptions['options']
241
            | TcpOptions['options']
242
            | RmqOptions['options']
243
            | KafkaOptions['options']
244
        ).deserializer) ||
245
      new IncomingRequestDeserializer();
246
  }
247

248
  /**
249
   * Transforms the server Pattern to valid type and returns a route for him.
250
   *
251
   * @param  {string} pattern - server pattern
252
   * @returns string
253
   */
254
  protected getRouteFromPattern(pattern: string): string {
255
    let validPattern: MsPattern;
256

257
    try {
39✔
258
      validPattern = JSON.parse(pattern);
39✔
259
    } catch (error) {
260
      // Uses a fundamental object (`pattern` variable without any conversion)
261
      validPattern = pattern;
38✔
262
    }
263
    return this.normalizePattern(validPattern);
39✔
264
  }
265

266
  protected normalizePattern(pattern: MsPattern): string {
267
    return transformPatternToRoute(pattern);
37✔
268
  }
269
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc