• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / f7142b24-e394-4028-89c6-717e281431d1

25 Nov 2024 01:12PM UTC coverage: 90.141% (-1.8%) from 91.989%
f7142b24-e394-4028-89c6-717e281431d1

Pull #14177

circleci

web-flow
Merge pull request #14200 from nestjs/feat/allow-queue-per-handler

feat(microservices): support nats queue per handler
Pull Request #14177: release: version 11.0.0

2612 of 3236 branches covered (80.72%)

496 of 704 new or added lines in 48 files covered. (70.45%)

79 existing lines in 11 files now uncovered.

6985 of 7749 relevant lines covered (90.14%)

16.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.95
/packages/microservices/server/server-tcp.ts
1
import { Type } from '@nestjs/common';
2
import { isString, isUndefined } from '@nestjs/common/utils/shared.utils';
1✔
3
import * as net from 'net';
1✔
4
import { Server as NetSocket, Socket } from 'net';
5
import { createServer as tlsCreateServer, TlsOptions } from 'tls';
1✔
6
import {
1✔
7
  EADDRINUSE,
8
  ECONNREFUSED,
9
  NO_MESSAGE_HANDLER,
10
  TCP_DEFAULT_HOST,
11
  TCP_DEFAULT_PORT,
12
} from '../constants';
13
import { TcpContext } from '../ctx-host/tcp.context';
1✔
14
import { Transport } from '../enums';
1✔
15
import { TcpEvents, TcpEventsMap, TcpStatus } from '../events/tcp.events';
16
import { JsonSocket, TcpSocket } from '../helpers';
1✔
17
import {
18
  IncomingRequest,
19
  PacketId,
20
  ReadPacket,
21
  WritePacket,
22
} from '../interfaces';
23
import { TcpOptions } from '../interfaces/microservice-configuration.interface';
24
import { Server } from './server';
1✔
25

26
/**
 * Microservice server for the TCP transport.
 *
 * Wraps a `net` server (or a `tls` server when `tlsOptions` is supplied),
 * wraps every accepted connection in the configured socket class and routes
 * incoming packets to the handlers registered on the base `Server`.
 *
 * @publicApi
 */
export class ServerTCP extends Server<TcpEvents, TcpStatus> {
  public readonly transportId = Transport.TCP;

  /** Underlying server instance; assigned by {@link init}. */
  protected server: NetSocket;
  /** Port passed to `server.listen`; falls back to `TCP_DEFAULT_PORT`. */
  protected readonly port: number;
  /** Host passed to `server.listen`; falls back to `TCP_DEFAULT_HOST`. */
  protected readonly host: string;
  /** Class used to wrap raw sockets; falls back to `JsonSocket`. */
  protected readonly socketClass: Type<TcpSocket>;
  /** Set by {@link close} so that {@link handleClose} does not retry. */
  protected isManuallyTerminated = false;
  /** Number of re-listen attempts already scheduled by {@link handleClose}. */
  protected retryAttemptsCount = 0;
  /** When set, {@link init} creates a TLS server instead of a plain one. */
  protected tlsOptions?: TlsOptions;
  /** Listeners queued by {@link on} while no server instance exists. */
  protected pendingEventListeners: Array<{
    event: keyof TcpEvents;
    callback: TcpEvents[keyof TcpEvents];
  }> = [];

  /**
   * Reads the transport options, creates the underlying server and sets up
   * the serializer and deserializer.
   *
   * @param options TCP transport options (`port`, `host`, `socketClass`,
   * `tlsOptions`, `retryAttempts`, `retryDelay`, ...).
   */
  constructor(private readonly options: TcpOptions['options']) {
    super();
    this.port = this.getOptionsProp(options, 'port', TCP_DEFAULT_PORT);
    this.host = this.getOptionsProp(options, 'host', TCP_DEFAULT_HOST);
    this.socketClass = this.getOptionsProp(options, 'socketClass', JsonSocket);
    this.tlsOptions = this.getOptionsProp(options, 'tlsOptions');

    this.init();
    this.initializeSerializer(options);
    this.initializeDeserializer(options);
  }

  /**
   * Starts listening on the configured host and port.
   *
   * A one-shot error listener is registered first: if the first error event
   * carries the code `EADDRINUSE` or `ECONNREFUSED`, the status stream emits
   * `DISCONNECTED` and `callback` is invoked with that error.
   *
   * @param callback Passed to `server.listen` as the listening listener and
   * also called with the error in the failure case described above.
   */
  public listen(
    callback: (err?: unknown, ...optionalParams: unknown[]) => void,
  ) {
    this.server.once(TcpEventsMap.ERROR, (err: Record<string, unknown>) => {
      if (err?.code === EADDRINUSE || err?.code === ECONNREFUSED) {
        this._status$.next(TcpStatus.DISCONNECTED);

        return callback(err);
      }
    });
    this.server.listen(this.port, this.host, callback as () => void);
  }

  /**
   * Closes the server, flags the shutdown as manual (which disables the retry
   * logic in {@link handleClose}) and drops any queued event listeners.
   */
  public close() {
    this.isManuallyTerminated = true;

    this.server.close();
    this.pendingEventListeners = [];
  }

  /**
   * Connection listener: wraps the raw socket and subscribes to its
   * `message` and error events.
   *
   * @param socket Raw socket of the accepted connection.
   */
  public bindHandler(socket: Socket) {
    const readSocket = this.getSocketInstance(socket);
    readSocket.on('message', (msg: ReadPacket & PacketId) => {
      // The emitter ignores the listener's return value, so a rejected
      // promise from `handleMessage` would otherwise surface as an unhandled
      // rejection. Report it through the same sink as socket errors.
      this.handleMessage(readSocket, msg).catch(err => this.handleError(err));
    });
    readSocket.on(TcpEventsMap.ERROR, this.handleError.bind(this));
  }

  /**
   * Processes a single incoming packet.
   *
   * - Packets without an `id` are dispatched as events via `handleEvent`.
   * - Packets with an `id` but no matching handler are answered with an
   *   error packet carrying `NO_MESSAGE_HANDLER`.
   * - Otherwise the handler result is turned into an observable and every
   *   emitted response is tagged with the request `id`, serialized and
   *   written back to the socket.
   *
   * @param socket Wrapped socket the packet arrived on; responses are sent
   * through it.
   * @param rawMessage Packet as received, before deserialization.
   */
  public async handleMessage(socket: TcpSocket, rawMessage: unknown) {
    const packet = await this.deserializer.deserialize(rawMessage);
    // Non-string patterns are matched by their JSON representation.
    const pattern = isString(packet.pattern)
      ? packet.pattern
      : JSON.stringify(packet.pattern);

    const tcpContext = new TcpContext([socket, pattern]);
    if (isUndefined((packet as IncomingRequest).id)) {
      return this.handleEvent(pattern, packet, tcpContext);
    }

    const handler = this.getHandlerByPattern(pattern);
    if (!handler) {
      const status = 'error';
      const noHandlerPacket = this.serializer.serialize({
        id: (packet as IncomingRequest).id,
        status,
        err: NO_MESSAGE_HANDLER,
      });
      return socket.sendMessage(noHandlerPacket);
    }
    const response$ = this.transformToObservable(
      await handler(packet.data, tcpContext),
    );

    if (response$) {
      this.send(response$, data => {
        Object.assign(data, { id: (packet as IncomingRequest).id });
        const outgoingResponse = this.serializer.serialize(
          data as WritePacket & PacketId,
        );
        socket.sendMessage(outgoingResponse);
      });
    }
  }

  /**
   * Reacts to the server's close event by scheduling a new `listen` call
   * when retries are configured and not yet exhausted.
   *
   * @returns `undefined` when the server was closed through {@link close},
   * when the `retryAttempts` option is missing or falsy, or when the number
   * of attempts already made has reached it; otherwise the handle returned
   * by `setTimeout` for the scheduled re-listen.
   */
  public handleClose(): undefined | number | NodeJS.Timer {
    if (this.isManuallyTerminated) {
      return undefined;
    }
    const retryAttempts = this.getOptionsProp(this.options, 'retryAttempts');
    if (!retryAttempts || this.retryAttemptsCount >= retryAttempts) {
      return undefined;
    }
    ++this.retryAttemptsCount;
    return setTimeout(
      () => this.server.listen(this.port, this.host),
      this.getOptionsProp(this.options, 'retryDelay') || 0,
    );
  }

  /**
   * Returns the underlying server instance cast to the requested type.
   *
   * @throws Error when no server instance is set.
   */
  public unwrap<T>(): T {
    if (!this.server) {
      throw new Error(
        'Not initialized. Please call the "listen"/"startAllMicroservices" method before accessing the server.',
      );
    }
    return this.server as T;
  }

  /**
   * Registers an event listener on the underlying server. If no server
   * instance exists yet, the listener is queued and attached by {@link init}.
   *
   * @param event Name of the event to listen for.
   * @param callback Listener invoked when the event fires.
   */
  public on<
    EventKey extends keyof TcpEvents = keyof TcpEvents,
    EventCallback extends TcpEvents[EventKey] = TcpEvents[EventKey],
  >(event: EventKey, callback: EventCallback) {
    if (this.server) {
      this.server.on(event, callback as any);
    } else {
      this.pendingEventListeners.push({ event, callback });
    }
  }

  /**
   * Creates the underlying server (TLS when `tlsOptions` is set, plain TCP
   * otherwise), attaches the listening/error/close listeners and flushes
   * the listeners queued by {@link on}.
   */
  protected init() {
    if (this.tlsOptions) {
      // TLS enabled, use tls server
      this.server = tlsCreateServer(
        this.tlsOptions,
        this.bindHandler.bind(this),
      );
    } else {
      // TLS disabled, use net server
      this.server = net.createServer(this.bindHandler.bind(this));
    }
    this.registerListeningListener(this.server);
    this.registerErrorListener(this.server);
    this.registerCloseListener(this.server);

    this.pendingEventListeners.forEach(({ event, callback }) =>
      this.server.on(event, callback),
    );
    this.pendingEventListeners = [];
  }

  /** Emits `CONNECTED` on the status stream whenever the server starts listening. */
  protected registerListeningListener(socket: net.Server) {
    socket.on(TcpEventsMap.LISTENING, () => {
      this._status$.next(TcpStatus.CONNECTED);
    });
  }

  /**
   * Forwards every server error to `handleError`; an `ECONNREFUSED` error
   * additionally emits `DISCONNECTED` on the status stream.
   */
  protected registerErrorListener(socket: net.Server) {
    socket.on(TcpEventsMap.ERROR, err => {
      if ('code' in err && err.code === ECONNREFUSED) {
        this._status$.next(TcpStatus.DISCONNECTED);
      }
      this.handleError(err as any);
    });
  }

  /**
   * Emits `DISCONNECTED` on the status stream when the server closes and
   * hands over to {@link handleClose} for the retry decision.
   */
  protected registerCloseListener(socket: net.Server) {
    socket.on(TcpEventsMap.CLOSE, () => {
      this._status$.next(TcpStatus.DISCONNECTED);
      this.handleClose();
    });
  }

  /** Wraps a raw socket in the configured socket class. */
  protected getSocketInstance(socket: Socket): TcpSocket {
    return new this.socketClass(socket);
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc