• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / d23343fc-9088-4892-9091-dc67809053b8

27 May 2025 08:47AM UTC coverage: 88.904% (-0.04%) from 88.94%
d23343fc-9088-4892-9091-dc67809053b8

Pull #15190

circleci

kamilmysliwiec
chore: revert tsconfig changes
Pull Request #15190: feat(microservices): nats v3 upgrade

2712 of 3429 branches covered (79.09%)

17 of 24 new or added lines in 5 files covered. (70.83%)

2 existing lines in 2 files now uncovered.

7179 of 8075 relevant lines covered (88.9%)

16.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.24
/packages/microservices/server/server-nats.ts
1
import { isObject, isUndefined } from '@nestjs/common/utils/shared.utils';
1✔
2
import { EventEmitter } from 'events';
1✔
3
import {
1✔
4
  NATS_DEFAULT_GRACE_PERIOD,
5
  NATS_DEFAULT_URL,
6
  NO_MESSAGE_HANDLER,
7
} from '../constants';
8
import { NatsContext } from '../ctx-host/nats.context';
1✔
9
import { NatsRequestJSONDeserializer } from '../deserializers/nats-request-json.deserializer';
1✔
10
import { Transport } from '../enums';
1✔
11
import { NatsEvents, NatsEventsMap, NatsStatus } from '../events/nats.events';
12
import {
13
  NatsOptions,
14
  TransportId,
15
} from '../interfaces/microservice-configuration.interface';
16
import { IncomingRequest } from '../interfaces/packet.interface';
17
import { NatsRecord } from '../record-builders';
18
import { NatsRecordSerializer } from '../serializers/nats-record.serializer';
1✔
19
import { Server } from './server';
1✔
20

21
let natsPackage = {} as any;
1✔
22

23
// To enable type safety for Nats. This can't be uncommented by default
24
// because it would require the user to install the nats package even if they don't use Nats
25
// Otherwise, TypeScript would fail to compile the code.
26
//
27
// type Client = import('@nats-io/transport-node').NatsConnection;
28
// type NatsMsg = import('@nats-io/transport-node').Msg;
29
// type Subscription = import('@nats-io/transport-node').Subscription;
30

31
type Client = any;
32
type NatsMsg = any;
33
type Subscription = any;
34

35
/**
36
 * @publicApi
37
 */
38
export class ServerNats<
1✔
39
  E extends NatsEvents = NatsEvents,
40
  S extends NatsStatus = NatsStatus,
41
> extends Server<E, S> {
42
  public transportId: TransportId = Transport.NATS;
23✔
43

44
  private natsClient: Client | null;
45
  protected statusEventEmitter = new EventEmitter<{
23✔
46
    [key in keyof NatsEvents]: Parameters<NatsEvents[key]>;
47
  }>();
48
  private readonly subscriptions: Subscription[] = [];
23✔
49

50
  /**
   * Stores the transport options, resolves the `@nats-io/transport-node`
   * package through `loadPackage` (keeping the result in the module-level
   * `natsPackage` variable), and sets up the serializer and deserializer.
   *
   * @param options NATS transport options.
   */
  constructor(private readonly options: Required<NatsOptions>['options']) {
    super();

    // The package is required lazily so that it is only needed when the
    // NATS transport is actually instantiated.
    const requireNatsTransport = () => require('@nats-io/transport-node');
    natsPackage = this.loadPackage(
      '@nats-io/transport-node',
      ServerNats.name,
      requireNatsTransport,
    );

    this.initializeSerializer(options);
    this.initializeDeserializer(options);
  }
62

63
  /**
   * Creates the NATS client, publishes the `CONNECTED` status, starts the
   * background status monitor and then delegates to `start`.
   *
   * Any error thrown while connecting or starting is passed to `callback`
   * instead of being rethrown.
   *
   * @param callback Invoked by `start` without arguments on success, or here
   *   with the caught error on failure.
   */
  public async listen(
    callback: (err?: unknown, ...optionalParams: unknown[]) => void,
  ) {
    try {
      this.natsClient = await this.createNatsClient();

      this._status$.next(NatsStatus.CONNECTED as S);
      // The status monitor runs for the lifetime of the connection and is
      // deliberately not awaited. A rejection handler is attached so that a
      // failing status iterator is logged rather than surfacing as an
      // unhandled promise rejection.
      void this.handleStatusUpdates(this.natsClient).catch((err: unknown) =>
        this.logger.error(err),
      );
      this.start(callback);
    } catch (err) {
      callback(err);
    }
  }
76

77
  /**
   * Binds the registered message handlers to the current NATS client and
   * then invokes `callback` with no arguments.
   *
   * @param callback Called once the handlers have been bound.
   */
  public start(
    callback: (err?: unknown, ...optionalParams: unknown[]) => void,
  ) {
    // The client is asserted non-null here; `listen` assigns it before
    // calling this method.
    const client = this.natsClient!;
    this.bindEvents(client);
    callback();
  }
83

84
  /**
   * Subscribes the given client to every registered message pattern and
   * records each resulting subscription in `this.subscriptions`.
   *
   * The queue used for a pattern is the handler's own `extras.queue` when
   * present, otherwise the `queue` transport option.
   *
   * @param client NATS client to subscribe on.
   */
  public bindEvents(client: Client) {
    const fallbackQueue = this.getOptionsProp(this.options, 'queue');

    // Iterate over a snapshot of the keys, not the live map.
    for (const pattern of Array.from(this.messageHandlers.keys())) {
      const registeredHandler = this.messageHandlers.get(pattern)!;
      const subscription = client.subscribe(pattern, {
        queue: registeredHandler.extras?.queue ?? fallbackQueue,
        callback: this.getMessageHandler(pattern).bind(this),
      });
      this.subscriptions.push(subscription);
    }
  }
100

101
  /**
   * Resolves after the configured `gracePeriod` option has elapsed, falling
   * back to `NATS_DEFAULT_GRACE_PERIOD` when the option is not set.
   */
  private async waitForGracePeriod() {
    const delay = this.getOptionsProp(
      this.options,
      'gracePeriod',
      NATS_DEFAULT_GRACE_PERIOD,
    );
    await new Promise<void>(resolve => setTimeout(resolve, delay));
  }
113

114
  /**
   * Shuts the server down.
   *
   * Does nothing when no client exists. When the `gracefulShutdown` option
   * is truthy, every tracked subscription is unsubscribed and the grace
   * period is awaited before the client is closed. Afterwards the status
   * listeners are removed, the tracked subscriptions are discarded and the
   * client reference is reset to `null`.
   */
  public async close() {
    if (!this.natsClient) {
      return;
    }
    const graceful = this.getOptionsProp(this.options, 'gracefulShutdown');
    if (graceful) {
      this.subscriptions.forEach(sub => sub.unsubscribe());
      await this.waitForGracePeriod();
    }
    // Optional chaining: the reference may have been cleared while the
    // grace period was being awaited.
    await this.natsClient?.close();
    this.statusEventEmitter.removeAllListeners();
    // Drop the tracked subscriptions so that stale entries do not accumulate
    // (and are not unsubscribed again) if the server is started once more.
    this.subscriptions.length = 0;
    this.natsClient = null;
  }
127

128
  /**
   * Opens a NATS connection through the loaded package's `connect` function.
   *
   * `servers` defaults to `NATS_DEFAULT_URL`; every property of the transport
   * options is spread on top and therefore takes precedence.
   *
   * @returns The value returned by `natsPackage.connect`.
   */
  public createNatsClient(): Promise<Client> {
    const connectionOptions = {
      servers: NATS_DEFAULT_URL,
      ...(this.options || ({} as NatsOptions)),
    };
    return natsPackage.connect(connectionOptions);
  }
135

136
  /**
   * Builds the subscription callback for a channel.
   *
   * The returned function logs the error when one is passed, otherwise it
   * forwards the message to `handleMessage` for the given channel.
   *
   * @param channel Pattern the callback is created for.
   * @returns An async `(error, message)` callback.
   */
  public getMessageHandler(channel: string): Function {
    const onMessage = async (error: object | undefined, message: NatsMsg) =>
      error ? this.logger.error(error) : this.handleMessage(channel, message);
    return onMessage;
  }
144

145
  /**
   * Processes a single incoming NATS message.
   *
   * The message is deserialized; when the resulting packet has no `id` it is
   * dispatched through `handleEvent`. Otherwise the handler registered for
   * the channel is looked up: if none exists, an error packet carrying
   * `NO_MESSAGE_HANDLER` is published, else the handler result is converted
   * to an observable and sent through the publisher.
   *
   * @param channel Pattern the message was received on.
   * @param natsMsg Raw NATS message.
   */
  public async handleMessage(channel: string, natsMsg: NatsMsg) {
    const natsCtx = new NatsContext([natsMsg.subject, natsMsg.headers]);
    const message = await this.deserializer.deserialize(natsMsg, {
      channel,
      replyTo: natsMsg.reply,
    });

    const { id } = message as IncomingRequest;
    if (isUndefined(id)) {
      // No id on the packet: dispatch it as an event.
      return this.handleEvent(channel, message, natsCtx);
    }

    const publish = this.getPublisher(natsMsg, id);
    const handler = this.getHandlerByPattern(channel);
    if (!handler) {
      return publish({ id, status: 'error', err: NO_MESSAGE_HANDLER });
    }

    const response$ = this.transformToObservable(
      await handler(message.data, natsCtx),
    );
    if (response$) {
      this.send(response$, publish);
    }
  }
174

175
  /**
   * Creates the function used to publish a response for a request.
   *
   * When the message has no `reply` subject a no-op function is returned.
   * Otherwise the returned function assigns `id` onto the response object,
   * serializes it and responds on the message with the serialized data and
   * headers.
   *
   * @param natsMsg Incoming NATS message.
   * @param id Identifier assigned to each outgoing response.
   */
  public getPublisher(natsMsg: NatsMsg, id: string) {
    if (!natsMsg.reply) {
      // Nothing to reply to, so publishing does nothing.
      return () => {};
    }

    return (response: any) => {
      Object.assign(response, { id });
      const { data, headers }: NatsRecord = this.serializer.serialize(response);
      natsMsg.respond(data, { headers });
    };
  }
192

193
  /**
   * Consumes the client's status stream and reacts to each status by type:
   *
   * - `error`: logs the error.
   * - `disconnect`: logs, publishes `DISCONNECTED` and emits the
   *   `DISCONNECT` event with `status.server`.
   * - `ping`: logs a debug line when the `debug` option is set.
   * - `reconnecting`: publishes `RECONNECTING`.
   * - `reconnect`: logs, publishes `CONNECTED` and emits the `RECONNECT`
   *   event with `status.server`.
   * - `update`: logs the added/deleted values and emits the `UPDATE` event.
   * - anything else: logs the type together with `status.data`, if present.
   *
   * @param client NATS client whose `status()` stream is iterated.
   */
  public async handleStatusUpdates(client: Client) {
    for await (const status of client.status()) {
      switch (status.type) {
        case 'error':
          this.logger.error(
            `NatsError: type: "${status.type}", error: "${status.error}".`,
          );
          break;

        case 'disconnect':
          this.logger.error(`NatsError: type: "${status.type}".`);

          this._status$.next(NatsStatus.DISCONNECTED as S);
          this.statusEventEmitter.emit(NatsEventsMap.DISCONNECT, status.server);
          break;

        case 'ping':
          if (this.options.debug) {
            // Optional call: a logger without a `debug` method must not
            // throw here and terminate the status loop.
            this.logger.debug?.(
              `NatsStatus: type: "${status.type}", pending pings: "${status.pendingPings}".`,
            );
          }
          break;

        case 'reconnecting':
          this._status$.next(NatsStatus.RECONNECTING as S);
          break;

        case 'reconnect':
          this.logger.log(`NatsStatus: type: "${status.type}".`);

          this._status$.next(NatsStatus.CONNECTED as S);
          this.statusEventEmitter.emit(NatsEventsMap.RECONNECT, status.server);
          break;

        case 'update':
          this.logger.log(
            `NatsStatus: type: "${status.type}", added: "${status.added}", deleted: "${status.deleted}".`,
          );
          this.statusEventEmitter.emit(NatsEventsMap.UPDATE, undefined);
          break;

        default: {
          // Block scope keeps the `const` local to this case.
          const data =
            'data' in status && isObject(status.data)
              ? JSON.stringify(status.data)
              : 'data' in status
                ? status.data
                : '';
          this.logger.log(
            `NatsStatus: type: "${status.type}", data: "${data}".`,
          );
          break;
        }
      }
    }
  }
249

250
  /**
   * Returns the underlying NATS client cast to `T`.
   *
   * @throws Error when no client has been created yet.
   */
  public unwrap<T>(): T {
    if (this.natsClient) {
      return this.natsClient as T;
    }
    throw new Error(
      'Not initialized. Please call the "listen"/"startAllMicroservices" method before accessing the server.',
    );
  }
258

259
  /**
   * Registers `callback` as a listener for `event` on the internal status
   * event emitter.
   *
   * @param event Name of the event to listen for.
   * @param callback Listener to register.
   */
  public on<
    EventKey extends keyof E = keyof E,
    EventCallback extends E[EventKey] = E[EventKey],
  >(event: EventKey, callback: EventCallback) {
    const listener = callback as any;
    this.statusEventEmitter.on(event, listener);
  }
265

266
  /**
   * Sets `this.serializer` to the serializer supplied in the options, or to
   * a new `NatsRecordSerializer` when none is supplied.
   *
   * @param options NATS transport options.
   */
  protected initializeSerializer(options: NatsOptions['options']) {
    const customSerializer = options?.serializer;
    this.serializer = customSerializer ?? new NatsRecordSerializer();
  }
269

270
  /**
   * Sets `this.deserializer` to the deserializer supplied in the options, or
   * to a new `NatsRequestJSONDeserializer` when none is supplied.
   *
   * @param options NATS transport options.
   */
  protected initializeDeserializer(options: NatsOptions['options']) {
    const customDeserializer = options?.deserializer;
    this.deserializer =
      customDeserializer ?? new NatsRequestJSONDeserializer();
  }
274
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc