• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / e79cdc3c-762e-40cd-acb3-21e17deb1cd9

18 Aug 2024 03:11PM UTC coverage: 92.124% (-0.09%) from 92.213%
e79cdc3c-762e-40cd-acb3-21e17deb1cd9

Pull #13485

circleci

DylanVeldra
fix(core): merge req context with tenant payload in the request instance
Pull Request #13485: fix(core): merge request context with tenant context payload in the request singleton

2559 of 3078 branches covered (83.14%)

1 of 2 new or added lines in 2 files covered. (50.0%)

74 existing lines in 13 files now uncovered.

6737 of 7313 relevant lines covered (92.12%)

17.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.76
/packages/microservices/server/server-mqtt.ts
1
import { isUndefined } from '@nestjs/common/utils/shared.utils';
1✔
2
import {
1✔
3
  CONNECT_EVENT,
4
  ERROR_EVENT,
5
  MESSAGE_EVENT,
6
  MQTT_DEFAULT_URL,
7
  MQTT_SEPARATOR,
8
  MQTT_WILDCARD_ALL,
9
  MQTT_WILDCARD_SINGLE,
10
  NO_MESSAGE_HANDLER,
11
} from '../constants';
12
import { MqttContext } from '../ctx-host/mqtt.context';
1✔
13
import { Transport } from '../enums';
1✔
14
import { MqttClient } from '../external/mqtt-client.interface';
15
import {
16
  CustomTransportStrategy,
17
  IncomingRequest,
18
  MessageHandler,
19
  PacketId,
20
  ReadPacket,
21
} from '../interfaces';
22
import { MqttOptions } from '../interfaces/microservice-configuration.interface';
23
import { MqttRecord } from '../record-builders/mqtt.record-builder';
24
import { MqttRecordSerializer } from '../serializers/mqtt-record.serializer';
1✔
25
import { Server } from './server';
1✔
26

27
// Module-level holder for the `mqtt` package. It starts as an empty object and
// is reassigned in the ServerMqtt constructor with the result of
// `loadPackage('mqtt', ...)`; `createMqttClient` then calls `connect` on it.
let mqttPackage: any = {};
1✔
28

29
/**
 * MQTT transport strategy for Nest microservices.
 *
 * Connects to a broker, subscribes to every registered handler pattern and
 * dispatches incoming messages either as events (packets without an `id`) or
 * as request/response exchanges (packets with an `id`, answered on
 * `<topic>/reply`).
 */
export class ServerMqtt extends Server implements CustomTransportStrategy {
  public readonly transportId = Transport.MQTT;

  protected mqttClient: MqttClient;

  private readonly url: string;

  /**
   * @param options MQTT transport options. `url` falls back to
   * `MQTT_DEFAULT_URL` when it is not provided (or is falsy).
   */
  constructor(private readonly options: MqttOptions['options']) {
    super();
    this.url = this.getOptionsProp(options, 'url') || MQTT_DEFAULT_URL;

    // Resolve the `mqtt` package through the base-class loader and keep it in
    // the module-level holder used by `createMqttClient`.
    mqttPackage = this.loadPackage('mqtt', ServerMqtt.name, () =>
      require('mqtt'),
    );

    this.initializeSerializer(options);
    this.initializeDeserializer(options);
  }

  /**
   * Creates the MQTT client and starts listening.
   *
   * @param callback Invoked without arguments on every broker `connect`
   * event, or with the error if creating/starting the client throws
   * synchronously.
   */
  public async listen(
    callback: (err?: unknown, ...optionalParams: unknown[]) => void,
  ) {
    try {
      this.mqttClient = this.createMqttClient();
      this.start(callback);
    } catch (err) {
      callback(err);
    }
  }

  /**
   * Wires error logging and message handling onto the current client, then
   * registers `callback` for the connect event.
   *
   * @param callback Invoked (with no arguments) each time the client emits
   * `CONNECT_EVENT`.
   */
  public start(
    callback: (err?: unknown, ...optionalParams: unknown[]) => void,
  ) {
    this.handleError(this.mqttClient);
    this.bindEvents(this.mqttClient);

    this.mqttClient.on(CONNECT_EVENT, () => callback());
  }

  /**
   * Attaches the message listener and subscribes to every registered pattern.
   *
   * Event handlers subscribe to the raw pattern; message handlers subscribe to
   * the result of `getRequestPattern`.
   *
   * @param mqttClient Client to bind listeners and subscriptions on.
   */
  public bindEvents(mqttClient: MqttClient) {
    mqttClient.on(MESSAGE_EVENT, this.getMessageHandler(mqttClient).bind(this));

    // Snapshot the keys first so subscribing iterates a stable list.
    const registeredPatterns = [...this.messageHandlers.keys()];
    registeredPatterns.forEach(pattern => {
      const { isEventHandler } = this.messageHandlers.get(pattern);
      mqttClient.subscribe(
        isEventHandler ? pattern : this.getRequestPattern(pattern),
        this.getOptionsProp(this.options, 'subscribeOptions'),
      );
    });
  }

  /**
   * Ends the MQTT client connection if a client has been created.
   */
  public close() {
    this.mqttClient?.end();
  }

  /**
   * Creates a client by calling `connect` on the loaded `mqtt` package with
   * the configured URL and options.
   */
  public createMqttClient(): MqttClient {
    return mqttPackage.connect(this.url, this.options as MqttOptions);
  }

  /**
   * Builds the listener registered for `MESSAGE_EVENT`.
   *
   * @param pub Client used to publish replies.
   * @returns An async function forwarding its arguments to `handleMessage`.
   */
  public getMessageHandler(pub: MqttClient): Function {
    return async (
      channel: string,
      buffer: Buffer,
      originalPacket?: Record<string, any>,
    ) => this.handleMessage(channel, buffer, pub, originalPacket);
  }

  /**
   * Processes a single incoming MQTT message.
   *
   * The payload is parsed, deserialized and then routed:
   * - no `id` on the packet: delegated to `handleEvent`;
   * - `id` present but no matching handler: an error packet carrying
   *   `NO_MESSAGE_HANDLER` is published on the reply topic;
   * - otherwise the handler result is converted to an observable and sent
   *   through the publisher.
   *
   * @param channel Topic the message arrived on.
   * @param buffer Raw message payload.
   * @param pub Client used to publish the reply.
   * @param originalPacket Original packet as passed by the client, exposed
   * to handlers through `MqttContext`.
   */
  public async handleMessage(
    channel: string,
    buffer: Buffer,
    pub: MqttClient,
    originalPacket?: Record<string, any>,
  ): Promise<any> {
    const rawPacket = this.parseMessage(buffer.toString());
    const packet = await this.deserializer.deserialize(rawPacket, { channel });
    const mqttContext = new MqttContext([channel, originalPacket]);
    if (isUndefined((packet as IncomingRequest).id)) {
      return this.handleEvent(channel, packet, mqttContext);
    }
    const publish = this.getPublisher(
      pub,
      channel,
      (packet as IncomingRequest).id,
    );
    const handler = this.getHandlerByPattern(channel);

    if (!handler) {
      const status = 'error';
      const noHandlerPacket = {
        id: (packet as IncomingRequest).id,
        status,
        err: NO_MESSAGE_HANDLER,
      };
      return publish(noHandlerPacket);
    }
    const response$ = this.transformToObservable(
      await handler(packet.data, mqttContext),
    );
    response$ && this.send(response$, publish);
  }

  /**
   * Creates the function used to publish a response for a given request.
   *
   * The returned function stamps `id` onto the response object (mutating it),
   * serializes it, strips `options` from the serialized record and passes them
   * as publish options alongside the JSON-encoded remainder.
   *
   * @param client Client to publish with.
   * @param pattern Request topic; the reply goes to `getReplyPattern(pattern)`.
   * @param id Identifier of the request being answered.
   */
  public getPublisher(client: MqttClient, pattern: any, id: string): any {
    return (response: any) => {
      Object.assign(response, { id });
      const outgoingResponse: Partial<MqttRecord> =
        this.serializer.serialize(response);
      const options = outgoingResponse.options;
      delete outgoingResponse.options;

      return client.publish(
        this.getReplyPattern(pattern),
        JSON.stringify(outgoingResponse),
        options,
      );
    };
  }

  /**
   * Parses a message payload as JSON.
   *
   * @param content Payload to parse.
   * @returns The parsed value, or `content` unchanged when it is not valid
   * JSON (deliberate best-effort: non-JSON payloads are passed through).
   */
  public parseMessage(content: any): ReadPacket & PacketId {
    try {
      return JSON.parse(content);
    } catch (e) {
      return content;
    }
  }

  /**
   * Checks whether `topic` matches the MQTT topic filter `pattern`.
   *
   * Segments are compared level by level. A segment starting with
   * `MQTT_WILDCARD_ALL` matches the rest of the topic (including the parent
   * level) but only when it is the last pattern segment. A segment starting
   * with `MQTT_WILDCARD_SINGLE` matches exactly one level, which may be an
   * empty level (e.g. `sport/+` matches `sport/` but not `sport`).
   *
   * @param pattern Topic filter, possibly containing wildcards.
   * @param topic Concrete topic name.
   * @returns `true` when the topic matches the filter.
   */
  public matchMqttPattern(pattern: string, topic: string) {
    const patternSegments = pattern.split(MQTT_SEPARATOR);
    const topicSegments = topic.split(MQTT_SEPARATOR);

    const patternSegmentsLength = patternSegments.length;
    const topicSegmentsLength = topicSegments.length;
    const lastIndex = patternSegmentsLength - 1;

    for (let i = 0; i < patternSegmentsLength; i++) {
      const currentPattern = patternSegments[i];
      const patternChar = currentPattern[0];
      const currentTopic = topicSegments[i];
      // `undefined` means the topic has no such level at all; `''` is a real,
      // empty level that a single-level wildcard is allowed to match.
      const isTopicLevelMissing = currentTopic === undefined;

      if (!currentTopic && !currentPattern) {
        continue;
      }
      if (
        !currentTopic &&
        currentPattern !== MQTT_WILDCARD_ALL &&
        (isTopicLevelMissing || patternChar !== MQTT_WILDCARD_SINGLE)
      ) {
        return false;
      }
      if (patternChar === MQTT_WILDCARD_ALL) {
        // The multi-level wildcard is only valid as the final segment.
        return i === lastIndex;
      }
      if (
        patternChar !== MQTT_WILDCARD_SINGLE &&
        currentPattern !== currentTopic
      ) {
        return false;
      }
    }
    return patternSegmentsLength === topicSegmentsLength;
  }

  /**
   * Finds the handler registered for a topic.
   *
   * An exact match on the route is tried first; otherwise every registered
   * key (with any shared-subscription prefix removed) is tested as an MQTT
   * topic filter against the route.
   *
   * @param pattern Topic of the incoming message.
   * @returns The matching handler, or `null` when none matches.
   */
  public getHandlerByPattern(pattern: string): MessageHandler | null {
    const route = this.getRouteFromPattern(pattern);
    if (this.messageHandlers.has(route)) {
      return this.messageHandlers.get(route) || null;
    }

    for (const [key, value] of this.messageHandlers) {
      const keyWithoutSharedPrefix = this.removeHandlerKeySharedPrefix(key);
      if (this.matchMqttPattern(keyWithoutSharedPrefix, route)) {
        return value;
      }
    }
    return null;
  }

  /**
   * Strips a shared-subscription prefix (`$share/<group>/`) from a handler
   * key, leaving the underlying topic filter.
   *
   * @param handlerKey Registered handler key.
   * @returns The key without its first two levels when it begins with
   * `$share/`; otherwise the key unchanged.
   */
  public removeHandlerKeySharedPrefix(handlerKey: string) {
    // Match the full `$share/` level so unrelated keys that merely begin with
    // the same characters (e.g. `$shared/...`) are left intact.
    return handlerKey && handlerKey.startsWith('$share/')
      ? handlerKey.split('/').slice(2).join('/')
      : handlerKey;
  }

  /**
   * Returns the topic to subscribe to for a request pattern (the pattern
   * itself).
   */
  public getRequestPattern(pattern: string): string {
    return pattern;
  }

  /**
   * Returns the topic replies are published on: `<pattern>/reply`.
   */
  public getReplyPattern(pattern: string): string {
    return `${pattern}/reply`;
  }

  /**
   * Logs every `ERROR_EVENT` emitted by the given stream.
   *
   * @param stream Emitter to attach the error listener to.
   */
  public handleError(stream: any) {
    stream.on(ERROR_EVENT, (err: any) => this.logger.error(err));
  }

  /**
   * Uses the serializer from `options` when provided, otherwise a new
   * `MqttRecordSerializer`.
   */
  protected initializeSerializer(options: MqttOptions['options']) {
    this.serializer = options?.serializer ?? new MqttRecordSerializer();
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc