• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / 481bc0ec-33a8-4162-8bf2-c5199bf1f5fa

15 Jul 2025 12:37PM UTC coverage: 88.918% (-0.02%) from 88.934%
481bc0ec-33a8-4162-8bf2-c5199bf1f5fa

Pull #15385

circleci

mag123c
feat(platform-fastify): implement lazy middleware registration
Pull Request #15385: fix(testing): auto-init fastify adapter for middleware registration

2711 of 3425 branches covered (79.15%)

7197 of 8094 relevant lines covered (88.92%)

16.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.73
/packages/microservices/server/server-mqtt.ts
1
import { isObject, isUndefined } from '@nestjs/common/utils/shared.utils';
1✔
2
import {
1✔
3
  MQTT_DEFAULT_URL,
4
  MQTT_SEPARATOR,
5
  MQTT_WILDCARD_ALL,
6
  MQTT_WILDCARD_SINGLE,
7
  NO_MESSAGE_HANDLER,
8
} from '../constants';
9
import { MqttContext } from '../ctx-host/mqtt.context';
1✔
10
import { Transport } from '../enums';
1✔
11
import { MqttEvents, MqttEventsMap, MqttStatus } from '../events/mqtt.events';
12
import {
13
  IncomingRequest,
14
  MessageHandler,
15
  PacketId,
16
  ReadPacket,
17
} from '../interfaces';
18
import {
19
  MqttOptions,
20
  TransportId,
21
} from '../interfaces/microservice-configuration.interface';
22
import { MqttRecord } from '../record-builders/mqtt.record-builder';
1✔
23
import { MqttRecordSerializer } from '../serializers/mqtt-record.serializer';
1✔
24
import { Server } from './server';
1✔
25

26
let mqttPackage: any = {};
1✔
27

28
// To enable type safety for MQTT. This cant be uncommented by default
29
// because it would require the user to install the mqtt package even if they dont use MQTT
30
// Otherwise, TypeScript would fail to compile the code.
31
//
32
// type MqttClient = import('mqtt').MqttClient;
33
type MqttClient = any;
34

35
/**
36
 * @publicApi
37
 */
38
export class ServerMqtt extends Server<MqttEvents, MqttStatus> {
1✔
39
  public transportId: TransportId = Transport.MQTT;
25✔
40
  protected readonly url: string;
41
  protected mqttClient: MqttClient;
42
  protected pendingEventListeners: Array<{
25✔
43
    event: keyof MqttEvents;
44
    callback: MqttEvents[keyof MqttEvents];
45
  }> = [];
46

47
  constructor(private readonly options: Required<MqttOptions>['options']) {
25✔
48
    super();
25✔
49
    this.url = this.getOptionsProp(options, 'url', MQTT_DEFAULT_URL);
25✔
50

51
    mqttPackage = this.loadPackage('mqtt', ServerMqtt.name, () =>
25✔
52
      require('mqtt'),
25✔
53
    );
54

55
    this.initializeSerializer(options);
25✔
56
    this.initializeDeserializer(options);
25✔
57
  }
58

59
  public async listen(
60
    callback: (err?: unknown, ...optionalParams: unknown[]) => void,
61
  ) {
62
    try {
7✔
63
      this.mqttClient = this.createMqttClient();
7✔
64
      this.start(callback);
7✔
65
    } catch (err) {
66
      callback(err);
1✔
67
    }
68
  }
69

70
  public start(
71
    callback: (err?: unknown, ...optionalParams: unknown[]) => void,
72
  ) {
73
    this.registerErrorListener(this.mqttClient);
6✔
74
    this.registerReconnectListener(this.mqttClient);
6✔
75
    this.registerDisconnectListener(this.mqttClient);
6✔
76
    this.registerCloseListener(this.mqttClient);
6✔
77
    this.registerConnectListener(this.mqttClient);
6✔
78

79
    this.pendingEventListeners.forEach(({ event, callback }) =>
6✔
80
      this.mqttClient.on(event, callback),
×
81
    );
82
    this.pendingEventListeners = [];
6✔
83
    this.bindEvents(this.mqttClient);
6✔
84

85
    this.mqttClient.on(MqttEventsMap.CONNECT, () => callback());
6✔
86
  }
87

88
  public bindEvents(mqttClient: MqttClient) {
89
    mqttClient.on('message', this.getMessageHandler(mqttClient).bind(this));
7✔
90

91
    const registeredPatterns = [...this.messageHandlers.keys()];
7✔
92
    registeredPatterns.forEach(pattern => {
7✔
93
      const { isEventHandler } = this.messageHandlers.get(pattern)!;
1✔
94
      mqttClient.subscribe(
1✔
95
        isEventHandler ? pattern : this.getRequestPattern(pattern),
1!
96
        this.getOptionsProp(this.options, 'subscribeOptions'),
97
      );
98
    });
99
  }
100

101
  public close() {
102
    this.mqttClient && this.mqttClient.end();
1✔
103
    this.pendingEventListeners = [];
1✔
104
  }
105

106
  public createMqttClient(): MqttClient {
107
    return mqttPackage.connect(this.url, this.options as MqttOptions);
×
108
  }
109

110
  public getMessageHandler(pub: MqttClient) {
111
    return async (
9✔
112
      channel: string,
113
      buffer: Buffer,
114
      originalPacket?: Record<string, any>,
115
    ) => this.handleMessage(channel, buffer, pub, originalPacket);
1✔
116
  }
117

118
  public async handleMessage(
119
    channel: string,
120
    buffer: Buffer,
121
    pub: MqttClient,
122
    originalPacket?: Record<string, any>,
123
  ): Promise<any> {
124
    const rawPacket = this.parseMessage(buffer.toString());
3✔
125
    const packet = await this.deserializer.deserialize(rawPacket, { channel });
3✔
126
    const mqttContext = new MqttContext([channel, originalPacket!]);
3✔
127
    if (isUndefined((packet as IncomingRequest).id)) {
3✔
128
      return this.handleEvent(channel, packet, mqttContext);
1✔
129
    }
130
    const publish = this.getPublisher(
2✔
131
      pub,
132
      channel,
133
      (packet as IncomingRequest).id,
134
    );
135
    const handler = this.getHandlerByPattern(channel);
2✔
136

137
    if (!handler) {
2✔
138
      const status = 'error';
1✔
139
      const noHandlerPacket = {
1✔
140
        id: (packet as IncomingRequest).id,
141
        status,
142
        err: NO_MESSAGE_HANDLER,
143
      };
144
      return publish(noHandlerPacket);
1✔
145
    }
146
    const response$ = this.transformToObservable(
1✔
147
      await handler(packet.data, mqttContext),
148
    );
149
    response$ && this.send(response$, publish);
1✔
150
  }
151

152
  public getPublisher(client: MqttClient, pattern: any, id: string): any {
153
    return (response: any) => {
3✔
154
      Object.assign(response, { id });
1✔
155

156
      const options =
157
        isObject(response?.data) && response.data instanceof MqttRecord
1!
158
          ? (response.data as MqttRecord)?.options
1!
159
          : {};
160
      delete response?.data?.options;
1✔
161

162
      const outgoingResponse: string | Buffer =
163
        this.serializer.serialize(response);
1✔
164
      return client.publish(
1✔
165
        this.getReplyPattern(pattern),
166
        outgoingResponse,
167
        options,
168
      );
169
    };
170
  }
171

172
  public parseMessage(content: any): ReadPacket & PacketId {
173
    try {
5✔
174
      return JSON.parse(content);
5✔
175
    } catch (e) {
176
      return content;
2✔
177
    }
178
  }
179

180
  public matchMqttPattern(pattern: string, topic: string) {
181
    const patternSegments = pattern.split(MQTT_SEPARATOR);
8✔
182
    const topicSegments = topic.split(MQTT_SEPARATOR);
8✔
183

184
    const patternSegmentsLength = patternSegments.length;
8✔
185
    const topicSegmentsLength = topicSegments.length;
8✔
186
    const lastIndex = patternSegmentsLength - 1;
8✔
187

188
    for (let i = 0; i < patternSegmentsLength; i++) {
8✔
189
      const currentPattern = patternSegments[i];
21✔
190
      const patternChar = currentPattern[0];
21✔
191
      const currentTopic = topicSegments[i];
21✔
192

193
      if (!currentTopic && !currentPattern) {
21!
194
        continue;
×
195
      }
196
      if (!currentTopic && currentPattern !== MQTT_WILDCARD_ALL) {
21!
197
        return false;
×
198
      }
199
      if (patternChar === MQTT_WILDCARD_ALL) {
21✔
200
        return i === lastIndex;
3✔
201
      }
202
      if (
18✔
203
        patternChar !== MQTT_WILDCARD_SINGLE &&
33✔
204
        currentPattern !== currentTopic
205
      ) {
206
        return false;
3✔
207
      }
208
    }
209
    return patternSegmentsLength === topicSegmentsLength;
2✔
210
  }
211

212
  public getHandlerByPattern(pattern: string): MessageHandler | null {
213
    const route = this.getRouteFromPattern(pattern);
4✔
214
    if (this.messageHandlers.has(route)) {
4✔
215
      return this.messageHandlers.get(route) || null;
2!
216
    }
217

218
    for (const [key, value] of this.messageHandlers) {
2✔
219
      const keyWithoutSharedPrefix = this.removeHandlerKeySharedPrefix(key);
×
220
      if (this.matchMqttPattern(keyWithoutSharedPrefix, route)) {
×
221
        return value;
×
222
      }
223
    }
224
    return null;
2✔
225
  }
226

227
  public removeHandlerKeySharedPrefix(handlerKey: string) {
228
    return handlerKey && handlerKey.startsWith('$share')
×
229
      ? handlerKey.split('/').slice(2).join('/')
×
230
      : handlerKey;
231
  }
232

233
  public getRequestPattern(pattern: string): string {
234
    return pattern;
2✔
235
  }
236

237
  public getReplyPattern(pattern: string): string {
238
    return `${pattern}/reply`;
2✔
239
  }
240

241
  public registerErrorListener(client: MqttClient) {
242
    client.on(MqttEventsMap.ERROR, (err: unknown) => this.logger.error(err));
6✔
243
  }
244

245
  public registerReconnectListener(client: MqttClient) {
246
    client.on(MqttEventsMap.RECONNECT, () => {
6✔
247
      this._status$.next(MqttStatus.RECONNECTING);
×
248

249
      this.logger.log('MQTT connection lost. Trying to reconnect...');
×
250
    });
251
  }
252

253
  public registerDisconnectListener(client: MqttClient) {
254
    client.on(MqttEventsMap.DISCONNECT, () => {
6✔
255
      this._status$.next(MqttStatus.DISCONNECTED);
×
256
    });
257
  }
258

259
  public registerCloseListener(client: MqttClient) {
260
    client.on(MqttEventsMap.CLOSE, () => {
6✔
261
      this._status$.next(MqttStatus.CLOSED);
×
262
    });
263
  }
264

265
  public registerConnectListener(client: MqttClient) {
266
    client.on(MqttEventsMap.CONNECT, () => {
6✔
267
      this._status$.next(MqttStatus.CONNECTED);
×
268
    });
269
  }
270

271
  public unwrap<T>(): T {
272
    if (!this.mqttClient) {
×
273
      throw new Error(
×
274
        'Not initialized. Please call the "listen"/"startAllMicroservices" method before accessing the server.',
275
      );
276
    }
277
    return this.mqttClient as T;
×
278
  }
279

280
  public on<
281
    EventKey extends keyof MqttEvents = keyof MqttEvents,
282
    EventCallback extends MqttEvents[EventKey] = MqttEvents[EventKey],
283
  >(event: EventKey, callback: EventCallback) {
284
    if (this.mqttClient) {
×
285
      this.mqttClient.on(event, callback as any);
×
286
    } else {
287
      this.pendingEventListeners.push({ event, callback });
×
288
    }
289
  }
290

291
  protected initializeSerializer(options: MqttOptions['options']) {
292
    this.serializer = options?.serializer ?? new MqttRecordSerializer();
25✔
293
  }
294
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc