• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / f7142b24-e394-4028-89c6-717e281431d1

25 Nov 2024 01:12PM UTC coverage: 90.141% (-1.8%) from 91.989%
f7142b24-e394-4028-89c6-717e281431d1

Pull #14177

circleci

web-flow
Merge pull request #14200 from nestjs/feat/allow-queue-per-handler

feat(microservices): support nats queue per handler
Pull Request #14177: release: version 11.0.0

2612 of 3236 branches covered (80.72%)

496 of 704 new or added lines in 48 files covered. (70.45%)

79 existing lines in 11 files now uncovered.

6985 of 7749 relevant lines covered (90.14%)

16.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.05
/packages/microservices/client/client-redis.ts
1
import { Logger } from '@nestjs/common/services/logger.service';
1✔
2
import { loadPackage } from '@nestjs/common/utils/load-package.util';
1✔
3
import { REDIS_DEFAULT_HOST, REDIS_DEFAULT_PORT } from '../constants';
1✔
4
import {
5
  RedisEvents,
6
  RedisEventsMap,
7
  RedisStatus,
8
} from '../events/redis.events';
9
import { ReadPacket, RedisOptions, WritePacket } from '../interfaces';
10
import { ClientProxy } from './client-proxy';
1✔
11

12
// To enable type safety for Redis. This cant be uncommented by default
13
// because it would require the user to install the ioredis package even if they dont use Redis
14
// Otherwise, TypeScript would fail to compile the code.
15
//
16
// type Redis = import('ioredis').Redis;
17
type Redis = any;
18

19
let redisPackage = {} as any;
1✔
20

21
/**
22
 * @publicApi
23
 */
24
export class ClientRedis extends ClientProxy<RedisEvents, RedisStatus> {
1✔
25
  protected readonly logger = new Logger(ClientProxy.name);
2✔
26
  protected readonly subscriptionsCount = new Map<string, number>();
2✔
27
  protected pubClient: Redis;
28
  protected subClient: Redis;
29
  protected connectionPromise: Promise<any>;
30
  protected isManuallyClosed = false;
2✔
31
  protected wasInitialConnectionSuccessful = false;
2✔
32
  protected pendingEventListeners: Array<{
2✔
33
    event: keyof RedisEvents;
34
    callback: RedisEvents[keyof RedisEvents];
35
  }> = [];
36

37
  constructor(protected readonly options: RedisOptions['options']) {
2✔
38
    super();
2✔
39

40
    redisPackage = loadPackage('ioredis', ClientRedis.name, () =>
2✔
41
      require('ioredis'),
2✔
42
    );
43

44
    this.initializeSerializer(options);
2✔
45
    this.initializeDeserializer(options);
2✔
46
  }
47

48
  public getRequestPattern(pattern: string): string {
49
    return pattern;
5✔
50
  }
51

52
  public getReplyPattern(pattern: string): string {
53
    return `${pattern}.reply`;
3✔
54
  }
55

56
  public close() {
57
    this.pubClient && this.pubClient.quit();
4✔
58
    this.subClient && this.subClient.quit();
4✔
59
    this.pubClient = this.subClient = null;
4✔
60
    this.isManuallyClosed = true;
4✔
61
    this.pendingEventListeners = [];
4✔
62
  }
63

64
  public async connect(): Promise<any> {
65
    if (this.pubClient && this.subClient) {
2!
NEW
66
      return this.connectionPromise;
×
67
    }
68
    this.pubClient = this.createClient();
2✔
69
    this.subClient = this.createClient();
2✔
70

71
    [this.pubClient, this.subClient].forEach((client, index) => {
2✔
72
      const type = index === 0 ? 'pub' : 'sub';
4✔
73
      this.registerErrorListener(client);
4✔
74
      this.registerReconnectListener(client);
4✔
75
      this.registerReadyListener(client);
4✔
76
      this.registerEndListener(client);
4✔
77
      this.pendingEventListeners.forEach(({ event, callback }) =>
4✔
NEW
78
        client.on(event, (...args: [any]) => callback(type, ...args)),
×
79
      );
80
    });
81
    this.pendingEventListeners = [];
2✔
82

83
    this.connectionPromise = Promise.all([
2✔
84
      this.subClient.connect(),
85
      this.pubClient.connect(),
86
    ]);
NEW
87
    await this.connectionPromise;
×
NEW
88
    return this.connectionPromise;
×
89
  }
90

91
  public createClient(): Redis {
92
    return new redisPackage({
×
93
      host: REDIS_DEFAULT_HOST,
94
      port: REDIS_DEFAULT_PORT,
95
      ...this.getClientOptions(),
96
      lazyConnect: true,
97
    });
98
  }
99

100
  public registerErrorListener(client: Redis) {
101
    client.addListener(RedisEventsMap.ERROR, (err: any) =>
5✔
102
      this.logger.error(err),
1✔
103
    );
104
  }
105

106
  public registerReconnectListener(client: {
107
    on: (event: string, fn: () => void) => void;
108
  }) {
109
    client.on(RedisEventsMap.RECONNECTING, () => {
5✔
110
      if (this.isManuallyClosed) {
1!
111
        return;
1✔
112
      }
113

NEW
114
      this.connectionPromise = Promise.reject(
×
115
        'Error: Connection lost. Trying to reconnect...',
116
      );
117

118
      // Prevent unhandled rejections
NEW
119
      this.connectionPromise.catch(() => {});
×
120

NEW
121
      this._status$.next(RedisStatus.RECONNECTING);
×
122

NEW
123
      if (this.wasInitialConnectionSuccessful) {
×
NEW
124
        this.logger.log('Reconnecting to Redis...');
×
125
      }
126
    });
127
  }
128

129
  public registerReadyListener(client: {
130
    on: (event: string, fn: () => void) => void;
131
  }) {
132
    client.on(RedisEventsMap.READY, () => {
5✔
133
      this.connectionPromise = Promise.resolve();
1✔
134
      this._status$.next(RedisStatus.CONNECTED);
1✔
135

136
      this.logger.log('Connected to Redis. Subscribing to channels...');
1✔
137

138
      if (!this.wasInitialConnectionSuccessful) {
1!
139
        this.wasInitialConnectionSuccessful = true;
1✔
140
        this.subClient.on('message', this.createResponseCallback());
1✔
141
      }
142
    });
143
  }
144

145
  public registerEndListener(client: {
146
    on: (event: string, fn: () => void) => void;
147
  }) {
148
    client.on('end', () => {
5✔
149
      if (this.isManuallyClosed) {
1!
150
        return;
1✔
151
      }
NEW
152
      this._status$.next(RedisStatus.DISCONNECTED);
×
153

NEW
154
      if (this.getOptionsProp(this.options, 'retryAttempts') === undefined) {
×
155
        // When retryAttempts is not specified, the connection will not be re-established
NEW
156
        this.logger.error('Disconnected from Redis.');
×
157

158
        // Clean up client instances and just recreate them when connect is called
NEW
159
        this.pubClient = this.subClient = null;
×
160
      } else {
NEW
161
        this.logger.error('Disconnected from Redis.');
×
NEW
162
        this.connectionPromise = Promise.reject(
×
163
          'Error: Connection lost. Trying to reconnect...',
164
        );
165

166
        // Prevent unhandled rejections
NEW
167
        this.connectionPromise.catch(() => {});
×
168
      }
169
    });
170
  }
171

172
  public getClientOptions(): Partial<RedisOptions['options']> {
173
    const retryStrategy = (times: number) => this.createRetryStrategy(times);
1✔
174

175
    return {
1✔
176
      ...(this.options || {}),
1!
177
      retryStrategy,
178
    };
179
  }
180

181
  public on<
182
    EventKey extends keyof RedisEvents = keyof RedisEvents,
183
    EventCallback extends RedisEvents[EventKey] = RedisEvents[EventKey],
184
  >(event: EventKey, callback: EventCallback) {
NEW
185
    if (this.subClient && this.pubClient) {
×
NEW
186
      this.subClient.on(event, (...args: [any]) => callback('sub', ...args));
×
NEW
187
      this.pubClient.on(event, (...args: [any]) => callback('pub', ...args));
×
188
    } else {
NEW
189
      this.pendingEventListeners.push({ event, callback });
×
190
    }
191
  }
192

193
  public unwrap<T>(): T {
NEW
194
    if (!this.pubClient || !this.subClient) {
×
NEW
195
      throw new Error(
×
196
        'Not initialized. Please call the "connect" method first.',
197
      );
198
    }
NEW
199
    return [this.pubClient, this.subClient] as T;
×
200
  }
201

202
  public createRetryStrategy(times: number): undefined | number {
203
    if (this.isManuallyClosed) {
5✔
204
      return undefined;
2✔
205
    }
206
    if (!this.getOptionsProp(this.options, 'retryAttempts')) {
3✔
207
      this.logger.error(
2✔
208
        'Redis connection closed and retry attempts not specified',
209
      );
210
      return;
2✔
211
    }
212
    if (times > this.getOptionsProp(this.options, 'retryAttempts')) {
1!
UNCOV
213
      this.logger.error('Retry time exhausted');
×
UNCOV
214
      return;
×
215
    }
216
    return this.getOptionsProp(this.options, 'retryDelay') ?? 5000;
1!
217
  }
218

219
  public createResponseCallback(): (
220
    channel: string,
221
    buffer: string,
222
  ) => Promise<void> {
223
    return async (channel: string, buffer: string) => {
4✔
224
      const packet = JSON.parse(buffer);
3✔
225
      const { err, response, isDisposed, id } =
226
        await this.deserializer.deserialize(packet);
3✔
227

228
      const callback = this.routingMap.get(id);
3✔
229
      if (!callback) {
3!
230
        return;
×
231
      }
232
      if (isDisposed || err) {
3✔
233
        return callback({
1✔
234
          err,
235
          response,
236
          isDisposed: true,
237
        });
238
      }
239
      callback({
2✔
240
        err,
241
        response,
242
      });
243
    };
244
  }
245

246
  protected publish(
247
    partialPacket: ReadPacket,
248
    callback: (packet: WritePacket) => any,
249
  ): () => void {
250
    try {
5✔
251
      const packet = this.assignPacketId(partialPacket);
5✔
252
      const pattern = this.normalizePattern(partialPacket.pattern);
4✔
253
      const serializedPacket = this.serializer.serialize(packet);
4✔
254
      const responseChannel = this.getReplyPattern(pattern);
4✔
255
      let subscriptionsCount =
256
        this.subscriptionsCount.get(responseChannel) || 0;
4✔
257

258
      const publishPacket = () => {
4✔
259
        subscriptionsCount = this.subscriptionsCount.get(responseChannel) || 0;
4✔
260
        this.subscriptionsCount.set(responseChannel, subscriptionsCount + 1);
4✔
261
        this.routingMap.set(packet.id, callback);
4✔
262
        this.pubClient.publish(
4✔
263
          this.getRequestPattern(pattern),
264
          JSON.stringify(serializedPacket),
265
        );
266
      };
267

268
      if (subscriptionsCount <= 0) {
4✔
269
        this.subClient.subscribe(
3✔
270
          responseChannel,
271
          (err: any) => !err && publishPacket(),
3✔
272
        );
273
      } else {
274
        publishPacket();
1✔
275
      }
276

277
      return () => {
4✔
278
        this.unsubscribeFromChannel(responseChannel);
2✔
279
        this.routingMap.delete(packet.id);
2✔
280
      };
281
    } catch (err) {
282
      callback({ err });
1✔
283
    }
284
  }
285

286
  protected dispatchEvent(packet: ReadPacket): Promise<any> {
287
    const pattern = this.normalizePattern(packet.pattern);
2✔
288
    const serializedPacket = this.serializer.serialize(packet);
2✔
289

290
    return new Promise<void>((resolve, reject) =>
2✔
291
      this.pubClient.publish(pattern, JSON.stringify(serializedPacket), err =>
2✔
292
        err ? reject(err) : resolve(),
2✔
293
      ),
294
    );
295
  }
296

297
  protected unsubscribeFromChannel(channel: string) {
298
    const subscriptionCount = this.subscriptionsCount.get(channel);
2✔
299
    this.subscriptionsCount.set(channel, subscriptionCount - 1);
2✔
300

301
    if (subscriptionCount - 1 <= 0) {
2!
302
      this.subClient.unsubscribe(channel);
2✔
303
    }
304
  }
305
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc