• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / e79cdc3c-762e-40cd-acb3-21e17deb1cd9

18 Aug 2024 03:11PM UTC coverage: 92.124% (-0.09%) from 92.213%
e79cdc3c-762e-40cd-acb3-21e17deb1cd9

Pull #13485

circleci

DylanVeldra
fix(core): merge req context with tenant payload in the request instance
Pull Request #13485: fix(core): merge request context with tenant context payload in the request singleton

2559 of 3078 branches covered (83.14%)

1 of 2 new or added lines in 2 files covered. (50.0%)

74 existing lines in 13 files now uncovered.

6737 of 7313 relevant lines covered (92.12%)

17.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.47
/packages/microservices/client/client-nats.ts
1
import { Logger } from '@nestjs/common/services/logger.service';
1✔
2
import { loadPackage } from '@nestjs/common/utils/load-package.util';
1✔
3
import { isObject } from '@nestjs/common/utils/shared.utils';
1✔
4
import { NATS_DEFAULT_URL } from '../constants';
1✔
5
import { NatsResponseJSONDeserializer } from '../deserializers/nats-response-json.deserializer';
1✔
6
import { EmptyResponseException } from '../errors/empty-response.exception';
1✔
7
import { Client, NatsMsg } from '../external/nats-client.interface';
8
import { NatsOptions, PacketId, ReadPacket, WritePacket } from '../interfaces';
9
import { NatsRecord } from '../record-builders';
10
import { NatsRecordSerializer } from '../serializers/nats-record.serializer';
1✔
11
import { ClientProxy } from './client-proxy';
1✔
12

13
// Module-level holder for the optional `nats` dependency. It starts as an empty
// placeholder and is reassigned in the ClientNats constructor with whatever
// loadPackage('nats', ...) returns.
let natsPackage = {} as any;
1✔
14

15
/**
 * Client proxy that sends requests and emits events over NATS.
 *
 * Requests are published with a freshly created reply inbox; the response
 * arriving on that inbox is deserialized and forwarded to the caller's
 * callback. Events are published without a reply subject.
 *
 * @publicApi
 */
export class ClientNats extends ClientProxy {
  protected readonly logger = new Logger(ClientNats.name);
  protected natsClient: Client;
  /**
   * Connection attempt currently in flight, if any. Shared between concurrent
   * `connect()` callers so that only one underlying connection is opened.
   */
  protected connectionPromise: Promise<Client> | null = null;

  /**
   * @param options NATS transport options. May be undefined, in which case
   * defaults are used for the server URL, serializer and deserializer.
   */
  constructor(protected readonly options: NatsOptions['options']) {
    super();
    natsPackage = loadPackage('nats', ClientNats.name, () => require('nats'));

    this.initializeSerializer(options);
    this.initializeDeserializer(options);
  }

  /**
   * Closes the underlying connection (when one exists) and forgets it, so the
   * next `connect()` call opens a new one.
   */
  public async close() {
    await this.natsClient?.close();
    this.natsClient = null;
    this.connectionPromise = null;
  }

  /**
   * Returns the connected client, creating it on first use.
   *
   * Calls made while a connection attempt is still pending share that attempt
   * instead of opening additional connections. A failed attempt is not cached,
   * so a later call retries.
   *
   * @returns the connected NATS client.
   */
  public async connect(): Promise<any> {
    if (this.natsClient) {
      return this.natsClient;
    }
    if (this.connectionPromise) {
      return this.connectionPromise;
    }

    const pending = this.establishConnection();
    this.connectionPromise = pending;
    try {
      return await pending;
    } finally {
      // Only clear the attempt we started; close() may already have reset it.
      if (this.connectionPromise === pending) {
        this.connectionPromise = null;
      }
    }
  }

  /**
   * Opens a connection through the loaded `nats` package.
   * `NATS_DEFAULT_URL` is used for `servers` unless the options override it.
   *
   * @returns a promise for the new client.
   */
  public createClient(): Promise<Client> {
    const options: any = this.options || ({} as NatsOptions);
    return natsPackage.connect({
      servers: NATS_DEFAULT_URL,
      ...options,
    });
  }

  /**
   * Consumes the client's status stream and logs every update:
   * `error` and `disconnect` as errors, `pingTimer` as debug (only when the
   * `debug` option is set), everything else as a regular log entry.
   *
   * The returned promise settles only when the status iterator finishes.
   *
   * @param client client whose `status()` iterator is consumed.
   */
  public async handleStatusUpdates(client: Client) {
    for await (const status of client.status()) {
      // Object payloads are stringified so they are readable in the log line.
      const data =
        status.data && isObject(status.data)
          ? JSON.stringify(status.data)
          : status.data;

      switch (status.type) {
        case 'error':
        case 'disconnect':
          this.logger.error(
            `NatsError: type: "${status.type}", data: "${data}".`,
          );
          break;

        case 'pingTimer':
          // `options` may be undefined, hence the optional chaining.
          if (this.options?.debug) {
            this.logger.debug(
              `NatsStatus: type: "${status.type}", data: "${data}".`,
            );
          }
          break;

        default:
          this.logger.log(
            `NatsStatus: type: "${status.type}", data: "${data}".`,
          );
          break;
      }
    }
  }

  /**
   * Builds the handler invoked for every message received on a request's
   * reply inbox.
   *
   * @param packet the outgoing request packet (its `id` is used to discard
   * responses that belong to a different request).
   * @param callback receives the resulting write packet.
   * @returns an async `(error, natsMsg)` handler suitable for `subscribe`.
   */
  public createSubscriptionHandler(
    packet: ReadPacket & PacketId,
    callback: (packet: WritePacket) => any,
  ) {
    return async (error: unknown | undefined, natsMsg: NatsMsg) => {
      if (error) {
        return callback({
          err: error,
        });
      }
      const rawPacket = natsMsg.data;
      // A zero-length payload is reported as an empty response for the pattern.
      if (rawPacket?.length === 0) {
        return callback({
          err: new EmptyResponseException(
            this.normalizePattern(packet.pattern),
          ),
          isDisposed: true,
        });
      }
      const message = await this.deserializer.deserialize(rawPacket);
      // Ignore responses carrying an id that does not match this request.
      if (message.id && message.id !== packet.id) {
        return undefined;
      }
      const { err, response, isDisposed } = message;
      if (isDisposed || err) {
        return callback({
          err,
          response,
          isDisposed: true,
        });
      }
      callback({
        err,
        response,
      });
    };
  }

  /**
   * Publishes a request and subscribes to a dedicated reply inbox for the
   * response(s).
   *
   * Any failure is reported through `callback({ err })`. If the failure
   * happens after the inbox subscription was created, that subscription is
   * released so it does not leak.
   *
   * @param partialPacket request packet without an id.
   * @param callback receives response/error packets.
   * @returns a teardown function that unsubscribes from the reply inbox
   * (a no-op when publishing failed).
   */
  protected publish(
    partialPacket: ReadPacket,
    callback: (packet: WritePacket) => any,
  ): () => void {
    let subscription: ReturnType<Client['subscribe']> | undefined;
    try {
      const packet = this.assignPacketId(partialPacket);
      const channel = this.normalizePattern(partialPacket.pattern);
      const serializedPacket: NatsRecord = this.serializer.serialize(packet);
      // `options` may be undefined, hence the optional chaining.
      const inbox = natsPackage.createInbox(this.options?.inboxPrefix);

      const subscriptionHandler = this.createSubscriptionHandler(
        packet,
        callback,
      );

      const inboxSubscription = this.natsClient.subscribe(inbox, {
        callback: subscriptionHandler,
      });
      subscription = inboxSubscription;

      const headers = this.mergeHeaders(serializedPacket.headers);
      this.natsClient.publish(channel, serializedPacket.data, {
        reply: inbox,
        headers,
      });

      return () => inboxSubscription.unsubscribe();
    } catch (err) {
      // Release the inbox subscription if it was created before the failure.
      subscription?.unsubscribe();
      callback({ err });
      // Honour the declared return type: nothing is left to tear down.
      return () => undefined;
    }
  }

  /**
   * Publishes an event (no reply expected).
   *
   * Pattern normalization, serialization and header merging run before the
   * promise is created, so errors from those steps are thrown synchronously;
   * errors from the publish call itself reject the returned promise.
   *
   * @param packet event packet to emit.
   * @returns a promise resolved once the publish call has returned.
   */
  protected dispatchEvent(packet: ReadPacket): Promise<any> {
    const pattern = this.normalizePattern(packet.pattern);
    const serializedPacket: NatsRecord = this.serializer.serialize(packet);
    const headers = this.mergeHeaders(serializedPacket.headers);

    return new Promise<void>((resolve, reject) => {
      try {
        this.natsClient.publish(pattern, serializedPacket.data, {
          headers,
        });
        resolve();
      } catch (err) {
        reject(err);
      }
    });
  }

  /**
   * Uses the serializer supplied in the options, or a `NatsRecordSerializer`
   * when none is given.
   */
  protected initializeSerializer(options: NatsOptions['options']) {
    this.serializer = options?.serializer ?? new NatsRecordSerializer();
  }

  /**
   * Uses the deserializer supplied in the options, or a
   * `NatsResponseJSONDeserializer` when none is given.
   */
  protected initializeDeserializer(options: NatsOptions['options']) {
    this.deserializer =
      options?.deserializer ?? new NatsResponseJSONDeserializer();
  }

  /**
   * Combines per-request headers with the headers configured in the options.
   * Request headers win: a configured header is only added when the request
   * does not already define that key. The passed-in headers object is
   * modified in place.
   *
   * @param requestHeaders headers attached to the outgoing record, if any.
   * @returns the merged headers, or `undefined` when neither source has any.
   */
  protected mergeHeaders<THeaders = any>(requestHeaders?: THeaders) {
    if (!requestHeaders && !this.options?.headers) {
      return undefined;
    }

    const headers = requestHeaders ?? natsPackage.headers();

    for (const [key, value] of Object.entries(this.options?.headers || {})) {
      if (!headers.has(key)) {
        headers.set(key, value);
      }
    }

    return headers;
  }

  /**
   * Creates the client, stores it, and starts status logging in the
   * background.
   */
  private async establishConnection(): Promise<Client> {
    const client = await this.createClient();
    this.natsClient = client;
    // Intentionally not awaited: the status stream lives as long as the client.
    void this.handleStatusUpdates(client);
    return client;
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc