• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / d23343fc-9088-4892-9091-dc67809053b8

27 May 2025 08:47AM UTC coverage: 88.904% (-0.04%) from 88.94%
d23343fc-9088-4892-9091-dc67809053b8

Pull #15190

circleci

kamilmysliwiec
chore: revert tsconfig changes
Pull Request #15190: feat(microservices): nats v3 upgrade

2712 of 3429 branches covered (79.09%)

17 of 24 new or added lines in 5 files covered. (70.83%)

2 existing lines in 2 files now uncovered.

7179 of 8075 relevant lines covered (88.9%)

16.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.85
/packages/microservices/client/client-nats.ts
1
import { Logger } from '@nestjs/common/services/logger.service';
1✔
2
import { loadPackage } from '@nestjs/common/utils/load-package.util';
1✔
3
import { isObject } from '@nestjs/common/utils/shared.utils';
1✔
4
import { EventEmitter } from 'stream';
1✔
5
import { NATS_DEFAULT_URL } from '../constants';
1✔
6
import { NatsResponseJSONDeserializer } from '../deserializers/nats-response-json.deserializer';
1✔
7
import { EmptyResponseException } from '../errors/empty-response.exception';
1✔
8
import { NatsEvents, NatsEventsMap, NatsStatus } from '../events/nats.events';
9
import { NatsOptions, PacketId, ReadPacket, WritePacket } from '../interfaces';
10
import { NatsRecord } from '../record-builders';
11
import { NatsRecordSerializer } from '../serializers/nats-record.serializer';
1✔
12
import { ClientProxy } from './client-proxy';
1✔
13

14
let natsPackage = {} as any;
1✔
15

16
// To enable type safety for Nats. This cant be uncommented by default
17
// because it would require the user to install the nats package even if they dont use Nats
18
// Otherwise, TypeScript would fail to compile the code.
19
//
20
// type Client = import('@nats-io/transport-node').NatsConnection;
21
// type NatsMsg = import('@nats-io/transport-node').Msg;
22

23
type Client = Record<string, any>;
24
type NatsMsg = Record<string, any>;
25

26
/**
27
 * @publicApi
28
 */
29
export class ClientNats extends ClientProxy<NatsEvents, NatsStatus> {
1✔
30
  protected readonly logger = new Logger(ClientNats.name);
14✔
31

32
  protected natsClient: Client | null = null;
14✔
33
  protected connectionPromise: Promise<Client> | null = null;
14✔
34
  protected statusEventEmitter = new EventEmitter<{
14✔
35
    [key in keyof NatsEvents]: Parameters<NatsEvents[key]>;
36
  }>();
37

38
  constructor(protected readonly options: Required<NatsOptions>['options']) {
14✔
39
    super();
14✔
40
    natsPackage = loadPackage('@nats-io/transport-node', ClientNats.name, () =>
14✔
41
      require('@nats-io/transport-node'),
14✔
42
    );
43

44
    this.initializeSerializer(options);
14✔
45
    this.initializeDeserializer(options);
14✔
46
  }
47

48
  public async close() {
49
    await this.natsClient?.close();
1✔
50
    this.statusEventEmitter.removeAllListeners();
1✔
51

52
    this.natsClient = null;
1✔
53
    this.connectionPromise = null;
1✔
54
  }
55

56
  public async connect(): Promise<any> {
57
    if (this.connectionPromise) {
6✔
58
      return this.connectionPromise;
3✔
59
    }
60
    this.connectionPromise = this.createClient();
3✔
61
    this.natsClient = await this.connectionPromise.catch(err => {
3✔
62
      this.connectionPromise = null;
×
63
      throw err;
×
64
    });
65

66
    this._status$.next(NatsStatus.CONNECTED);
3✔
67
    void this.handleStatusUpdates(this.natsClient);
3✔
68
    return this.natsClient;
3✔
69
  }
70

71
  public createClient(): Promise<Client> {
72
    const options = this.options || ({} as NatsOptions);
×
73
    return natsPackage.connect({
×
74
      servers: NATS_DEFAULT_URL,
75
      ...options,
76
    });
77
  }
78

79
  public async handleStatusUpdates(client: Client) {
80
    for await (const status of client.status()) {
6✔
81
      switch (status.type) {
4✔
82
        case 'error':
4!
83
          this.logger.error(
1✔
84
            `NatsError: type: "${status.type}", error: "${status.error}".`,
85
          );
86
          break;
1✔
87

88
        case 'disconnect':
89
          this.connectionPromise = Promise.reject(
1✔
90
            'Error: Connection lost. Trying to reconnect...',
91
          );
92
          // Prevent unhandled promise rejection
93
          this.connectionPromise.catch(() => {});
1✔
94

95
          this.logger.error(`NatsError: type: "${status.type}".`);
1✔
96

97
          this._status$.next(NatsStatus.DISCONNECTED);
1✔
98
          this.statusEventEmitter.emit(
1✔
99
            NatsEventsMap.DISCONNECT,
100
            status.server as string,
101
          );
102
          break;
1✔
103

104
        case 'reconnecting':
105
          this._status$.next(NatsStatus.RECONNECTING);
×
106
          break;
×
107

108
        case 'reconnect':
109
          this.connectionPromise = Promise.resolve(client);
×
NEW
110
          this.logger.log(`NatsStatus: type: "${status.type}".`);
×
111

112
          this._status$.next(NatsStatus.CONNECTED);
×
113
          this.statusEventEmitter.emit(
×
114
            NatsEventsMap.RECONNECT,
115
            status.server as string,
116
          );
117
          break;
×
118

119
        case 'ping':
120
          if (this.options.debug) {
×
NEW
121
            this.logger.debug!(
×
122
              `NatsStatus: type: "${status.type}", pending pings: "${status.pendingPings}".`,
123
            );
124
          }
125
          break;
×
126

127
        case 'update':
128
          this.logger.log(
×
129
            `NatsStatus: type: "${status.type}", added: "${status.added}", deleted: "${status.deleted}".`,
130
          );
NEW
131
          this.statusEventEmitter.emit(NatsEventsMap.UPDATE, undefined);
×
132
          break;
×
133

134
        default:
135
          const data =
136
            'data' in status && isObject(status.data)
2✔
137
              ? JSON.stringify(status.data)
2✔
138
              : 'data' in status
139
                ? status.data
1!
140
                : '';
141
          this.logger.log(
2✔
142
            `NatsStatus: type: "${status.type}", data: "${data}".`,
143
          );
144
          break;
2✔
145
      }
146
    }
147
  }
148

149
  public on<
150
    EventKey extends keyof NatsEvents = keyof NatsEvents,
151
    EventCallback extends NatsEvents[EventKey] = NatsEvents[EventKey],
152
  >(event: EventKey, callback: EventCallback) {
153
    this.statusEventEmitter.on(event, callback as any);
×
154
  }
155

156
  public unwrap<T>(): T {
157
    if (!this.natsClient) {
×
158
      throw new Error(
×
159
        'Not initialized. Please call the "connect" method first.',
160
      );
161
    }
162
    return this.natsClient as T;
×
163
  }
164

165
  public createSubscriptionHandler(
166
    packet: ReadPacket & PacketId,
167
    callback: (packet: WritePacket) => any,
168
  ) {
169
    return async (error: Error | null, natsMsg: NatsMsg) => {
9✔
170
      if (error) {
3!
171
        return callback({
×
172
          err: error,
173
        });
174
      }
175
      const rawPacket = natsMsg.data;
3✔
176
      if (rawPacket?.length === 0) {
3!
177
        return callback({
×
178
          err: new EmptyResponseException(
179
            this.normalizePattern(packet.pattern),
180
          ),
181
          isDisposed: true,
182
        });
183
      }
184
      const message = await this.deserializer.deserialize(natsMsg);
3✔
185
      if (message.id && message.id !== packet.id) {
2!
UNCOV
186
        return undefined;
×
187
      }
188
      const { err, response, isDisposed } = message;
2✔
189
      if (isDisposed || err) {
2✔
190
        return callback({
1✔
191
          err,
192
          response,
193
          isDisposed: true,
194
        });
195
      }
196
      callback({
1✔
197
        err,
198
        response,
199
      });
200
    };
201
  }
202

203
  protected publish(
204
    partialPacket: ReadPacket,
205
    callback: (packet: WritePacket) => any,
206
  ): () => void {
207
    try {
7✔
208
      const packet = this.assignPacketId(partialPacket);
7✔
209
      const channel = this.normalizePattern(partialPacket.pattern);
6✔
210
      const serializedPacket: NatsRecord = this.serializer.serialize(packet);
6✔
211
      const inbox = natsPackage.createInbox(this.options.inboxPrefix);
6✔
212

213
      const subscriptionHandler = this.createSubscriptionHandler(
6✔
214
        packet,
215
        callback,
216
      );
217

218
      const subscription = this.natsClient!.subscribe(inbox, {
6✔
219
        callback: subscriptionHandler as (
220
          err: Error | null,
221
          msg: NatsMsg,
222
        ) => Promise<never>,
223
      });
224

225
      const headers = this.mergeHeaders(serializedPacket.headers);
6✔
226
      this.natsClient!.publish(channel, serializedPacket.data, {
6✔
227
        reply: inbox,
228
        headers,
229
      });
230

231
      return () => subscription.unsubscribe();
6✔
232
    } catch (err) {
233
      callback({ err });
1✔
234
      return () => {};
1✔
235
    }
236
  }
237

238
  protected dispatchEvent(packet: ReadPacket): Promise<any> {
239
    const pattern = this.normalizePattern(packet.pattern);
6✔
240
    const serializedPacket: NatsRecord = this.serializer.serialize(packet);
6✔
241
    const headers = this.mergeHeaders(serializedPacket.headers);
6✔
242

243
    return new Promise<void>((resolve, reject) => {
6✔
244
      try {
6✔
245
        this.natsClient!.publish(pattern, serializedPacket.data, {
6✔
246
          headers,
247
        });
248
        resolve();
6✔
249
      } catch (err) {
250
        reject(err);
×
251
      }
252
    });
253
  }
254

255
  protected initializeSerializer(options: NatsOptions['options']) {
256
    this.serializer = options?.serializer ?? new NatsRecordSerializer();
14✔
257
  }
258

259
  protected initializeDeserializer(options: NatsOptions['options']) {
260
    this.deserializer =
14✔
261
      options?.deserializer ?? new NatsResponseJSONDeserializer();
28✔
262
  }
263

264
  protected mergeHeaders<THeaders = any>(requestHeaders?: THeaders) {
265
    if (!requestHeaders && !this.options?.headers) {
12✔
266
      return undefined;
6✔
267
    }
268

269
    const headers = requestHeaders ?? natsPackage.headers();
6!
270

271
    for (const [key, value] of Object.entries(this.options?.headers || {})) {
6✔
272
      if (!headers.has(key)) {
4✔
273
        headers.set(key, value);
2✔
274
      }
275
    }
276

277
    return headers;
6✔
278
  }
279
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc