• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / f7142b24-e394-4028-89c6-717e281431d1

25 Nov 2024 01:12PM UTC coverage: 90.141% (-1.8%) from 91.989%
f7142b24-e394-4028-89c6-717e281431d1

Pull #14177

circleci

web-flow
Merge pull request #14200 from nestjs/feat/allow-queue-per-handler

feat(microservices): support nats queue per handler
Pull Request #14177: release: version 11.0.0

2612 of 3236 branches covered (80.72%)

496 of 704 new or added lines in 48 files covered. (70.45%)

79 existing lines in 11 files now uncovered.

6985 of 7749 relevant lines covered (90.14%)

16.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.33
/packages/microservices/client/client-proxy.ts
1
import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util';
1✔
2
import { isNil } from '@nestjs/common/utils/shared.utils';
1✔
3
import {
1✔
4
  throwError as _throw,
5
  connectable,
6
  defer,
7
  fromEvent,
8
  merge,
9
  Observable,
10
  Observer,
11
  ReplaySubject,
12
  Subject,
13
} from 'rxjs';
14
import { distinctUntilChanged, map, mergeMap, take } from 'rxjs/operators';
1✔
15
import { IncomingResponseDeserializer } from '../deserializers/incoming-response.deserializer';
1✔
16
import { InvalidMessageException } from '../errors/invalid-message.exception';
1✔
17
import {
18
  ClientOptions,
19
  KafkaOptions,
20
  MqttOptions,
21
  MsPattern,
22
  NatsOptions,
23
  PacketId,
24
  ReadPacket,
25
  RedisOptions,
26
  RmqOptions,
27
  TcpClientOptions,
28
  WritePacket,
29
} from '../interfaces';
30
import { ProducerDeserializer } from '../interfaces/deserializer.interface';
31
import { ProducerSerializer } from '../interfaces/serializer.interface';
32
import { IdentitySerializer } from '../serializers/identity.serializer';
1✔
33
import { transformPatternToRoute } from '../utils';
1✔
34

35
/**
36
 * @publicApi
37
 */
38
export abstract class ClientProxy<
1✔
39
  EventsMap extends Record<never, Function> = Record<never, Function>,
40
  Status extends string = string,
41
> {
42
  protected routingMap = new Map<string, Function>();
187✔
43
  protected serializer: ProducerSerializer;
44
  protected deserializer: ProducerDeserializer;
45
  protected _status$ = new ReplaySubject<Status>(1);
187✔
46

47
  /**
48
   * Returns an observable that emits status changes.
49
   */
50
  public get status(): Observable<Status> {
NEW
51
    // Hand out a read-only view of the subject; distinctUntilChanged() drops
    // consecutive repeats of the same status value.
    return this._status$.asObservable().pipe(distinctUntilChanged());
×
52
  }
53

54
  /**
55
   * Establishes the connection to the underlying server/broker.
56
   */
57
  public abstract connect(): Promise<any>;
58
  /**
59
   * Closes the underlying connection to the server/broker.
60
   */
61
  public abstract close(): any;
62
  /**
63
   * Registers an event listener for the given event.
64
   * @param event Event name
65
   * @param callback Callback to be executed when the event is emitted
66
   */
67
  public on<
68
    EventKey extends keyof EventsMap = keyof EventsMap,
69
    EventCallback extends EventsMap[EventKey] = EventsMap[EventKey],
70
  >(event: EventKey, callback: EventCallback) {
NEW
71
    // The base class has no event source, so this default always throws.
    // NOTE(review): presumably transport-specific clients override it - confirm.
    throw new Error('Method not implemented.');
×
72
  }
73
  /**
74
   * Returns an instance of the underlying server/broker instance,
75
   * or a group of servers if there are more than one.
76
   */
77
  public abstract unwrap<T>(): T;
78

79
  /**
80
   * Send a message to the server/broker.
81
   * Used for message-driven communication style between microservices.
82
   * @param pattern Pattern to identify the message
83
   * @param data Data to be sent
84
   * @returns Observable with the result
85
   */
86
  public send<TResult = any, TInput = any>(
87
    pattern: any,
88
    data: TInput,
89
  ): Observable<TResult> {
90
    // A pattern or payload flagged by isNil() is reported through the returned
    // observable (as an InvalidMessageException) rather than thrown synchronously.
    if (isNil(pattern) || isNil(data)) {
5✔
91
      return _throw(() => new InvalidMessageException());
1✔
92
    }
93
    // defer() postpones connect() until the returned observable is subscribed to.
    return defer(async () => this.connect()).pipe(
4✔
94
      mergeMap(
95
        () =>
96
          new Observable((observer: Observer<TResult>) => {
2✔
97
            const callback = this.createObserver(observer);
2✔
98
            // The function returned by publish() becomes this observable's
            // teardown logic, so it runs on unsubscribe/completion.
            return this.publish({ pattern, data }, callback);
2✔
99
          }),
100
      ),
101
    );
102
  }
103

104
  /**
105
   * Emits an event to the server/broker.
106
   * Used for event-driven communication style between microservices.
107
   * @param pattern Pattern to identify the event
108
   * @param data Data to be sent
109
   * @returns Observable that completes when the event is successfully emitted
110
   */
111
  public emit<TResult = any, TInput = any>(
112
    pattern: any,
113
    data: TInput,
114
  ): Observable<TResult> {
115
    // A pattern or payload flagged by isNil() is reported through the returned
    // observable (as an InvalidMessageException) rather than thrown synchronously.
    if (isNil(pattern) || isNil(data)) {
5✔
116
      return _throw(() => new InvalidMessageException());
1✔
117
    }
118
    // Connect first, then dispatch the event; mergeMap flattens the promise
    // returned by dispatchEvent() into the stream.
    const source = defer(async () => this.connect()).pipe(
4✔
119
      mergeMap(() => this.dispatchEvent({ pattern, data })),
3✔
120
    );
121
    const connectableSource = connectable(source, {
4✔
122
      connector: () => new Subject(),
4✔
123
      resetOnDisconnect: false,
124
    });
125
    // Connect right away so the event is dispatched even if the caller never
    // subscribes to the returned observable. Because a plain Subject is used as
    // the connector, a late subscriber may miss the already-emitted result.
    connectableSource.connect();
4✔
126
    return connectableSource;
4✔
127
  }
128

129
  /**
   * Transport-specific hook invoked by `send()` once `connect()` has resolved.
   * @param packet Packet holding the `pattern` and `data` passed to `send()`
   * @param callback Function to call with each `WritePacket`; `send()` supplies
   * the callback built by `createObserver()`
   * @returns Function that `send()` uses as the teardown logic of its observable
   */
  protected abstract publish(
130
    packet: ReadPacket,
131
    callback: (packet: WritePacket) => void,
132
  ): () => void;
133

134
  /**
   * Transport-specific hook invoked by `emit()` once `connect()` has resolved.
   * @param packet Packet holding the `pattern` and `data` passed to `emit()`
   * @returns Promise whose resolved value `emit()` forwards to its observable
   */
  protected abstract dispatchEvent<T = any>(packet: ReadPacket): Promise<T>;
135

136
  /**
   * Builds the packet callback that forwards transport responses to an observer.
   * `send()` passes the result to `publish()`.
   *
   * For each packet the callback receives:
   * - truthy `err`: the observer errors with `serializeError(err)`;
   * - truthy `isDisposed` and a defined `response`: the serialized response is
   *   emitted, then the observer completes;
   * - truthy `isDisposed` without a response: the observer completes without emitting;
   * - otherwise: `serializeResponse(response)` is emitted and the stream stays open.
   *
   * @param observer Observer that receives the mapped notifications
   * @returns Callback accepting a single `WritePacket`
   */
  protected createObserver<T>(
137
    observer: Observer<T>,
138
  ): (packet: WritePacket) => void {
139
    return ({ err, response, isDisposed }: WritePacket) => {
5✔
140
      if (err) {
3✔
141
        return observer.error(this.serializeError(err));
1✔
142
      } else if (response !== undefined && isDisposed) {
2!
143
        // Final packet that still carries a value: emit it before completing.
        observer.next(this.serializeResponse(response));
×
144
        return observer.complete();
×
145
      } else if (isDisposed) {
2✔
146
        return observer.complete();
1✔
147
      }
148
      // Intermediate packet: emit and keep the stream open. Note that the value
      // is emitted even when `response` is undefined.
      observer.next(this.serializeResponse(response));
1✔
149
    };
150
  }
151

152
  /**
   * Hook applied to an error before `createObserver()` passes it to the observer.
   * This base implementation returns the error unchanged.
   * @param err Error taken from the incoming packet
   * @returns The value delivered to `observer.error`
   */
  protected serializeError(err: any): any {
153
    return err;
2✔
154
  }
155

156
  /**
   * Hook applied to a response before `createObserver()` emits it to the observer.
   * This base implementation returns the response unchanged.
   * @param response Response taken from the incoming packet
   * @returns The value delivered to `observer.next`
   */
  protected serializeResponse(response: any): any {
157
    return response;
1✔
158
  }
159

160
  /**
   * Attaches a freshly generated `id` to the given packet.
   * @param packet Packet to tag; it is mutated in place
   * @returns The same packet object, now carrying the `id` property
   */
  protected assignPacketId(packet: ReadPacket): ReadPacket & PacketId {
161
    const id = randomStringGenerator();
9✔
162
    // Object.assign writes `id` onto the passed-in object and returns that same
    // object; no copy is made.
    return Object.assign(packet, { id });
9✔
163
  }
164

165
  /**
   * Creates an observable that settles on whichever of two events `instance`
   * fires first: it emits once and completes when `connectEvent` fires, or
   * errors with the event payload when `errorEvent` fires.
   * @param instance Event source handed to `fromEvent`
   * @param errorEvent Name of the event treated as a failure (defaults to 'error')
   * @param connectEvent Name of the event treated as success (defaults to 'connect')
   * @returns Observable limited to the first notification via `take(1)`
   */
  protected connect$(
166
    instance: any,
167
    errorEvent = 'error',
×
168
    connectEvent = 'connect',
×
169
  ): Observable<any> {
170
    // Re-throw the event payload inside map() so an error event surfaces as an
    // error notification rather than as a regular value.
    const error$ = fromEvent(instance, errorEvent).pipe(
×
171
      map((err: any) => {
172
        throw err;
×
173
      }),
174
    );
175
    const connect$ = fromEvent(instance, connectEvent);
×
176
    return merge(error$, connect$).pipe(take(1));
×
177
  }
178

179
  /**
   * Reads a property from an options object, falling back to a default.
   * @param obj Options object; may be falsy, in which case the default is returned
   * @param prop Key to look up
   * @param defaultValue Value returned when `obj` is falsy or lacks `prop`
   * (defaults to `undefined`)
   * @returns `obj[prop]` when the key is present, otherwise `defaultValue`
   */
  protected getOptionsProp<
180
    T extends ClientOptions['options'],
181
    K extends keyof T,
182
  >(obj: T, prop: K, defaultValue: T[K] = undefined) {
385✔
183
    // Presence is tested with `in`, so a key that exists but holds `undefined`
    // yields `undefined`, not `defaultValue`.
    return obj && prop in obj ? obj[prop] : defaultValue;
547✔
184
  }
185

186
  /**
   * Converts a message pattern into its route string by delegating to
   * `transformPatternToRoute`.
   * @param pattern Pattern to convert
   * @returns Route string produced by `transformPatternToRoute`
   */
  protected normalizePattern(pattern: MsPattern): string {
187
    return transformPatternToRoute(pattern);
46✔
188
  }
189

190
  /**
   * Sets `this.serializer` from the client options.
   * Uses `options.serializer` when `options` is truthy and that property is
   * truthy; otherwise falls back to a new `IdentitySerializer`.
   * @param options Client options that may carry a `serializer`
   */
  protected initializeSerializer(options: ClientOptions['options']) {
191
    this.serializer =
33✔
192
      (options &&
99✔
193
        (
194
          options as
195
            | RedisOptions['options']
196
            | NatsOptions['options']
197
            | MqttOptions['options']
198
            | TcpClientOptions['options']
199
            | RmqOptions['options']
200
            | KafkaOptions['options']
201
        ).serializer) ||
202
      new IdentitySerializer();
203
  }
204

205
  /**
   * Sets `this.deserializer` from the client options.
   * Uses `options.deserializer` when `options` is truthy and that property is
   * truthy; otherwise falls back to a new `IncomingResponseDeserializer`.
   * @param options Client options that may carry a `deserializer`
   */
  protected initializeDeserializer(options: ClientOptions['options']) {
206
    this.deserializer =
71✔
207
      (options &&
213✔
208
        (
209
          options as
210
            | RedisOptions['options']
211
            | NatsOptions['options']
212
            | MqttOptions['options']
213
            | TcpClientOptions['options']
214
            | RmqOptions['options']
215
            | KafkaOptions['options']
216
        ).deserializer) ||
217
      new IncomingResponseDeserializer();
218
  }
219
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc