• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / f7142b24-e394-4028-89c6-717e281431d1

25 Nov 2024 01:12PM UTC coverage: 90.141% (-1.8%) from 91.989%
f7142b24-e394-4028-89c6-717e281431d1

Pull #14177

circleci

web-flow
Merge pull request #14200 from nestjs/feat/allow-queue-per-handler

feat(microservices): support nats queue per handler
Pull Request #14177: release: version 11.0.0

2612 of 3236 branches covered (80.72%)

496 of 704 new or added lines in 48 files covered. (70.45%)

79 existing lines in 11 files now uncovered.

6985 of 7749 relevant lines covered (90.14%)

16.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.7
/packages/microservices/server/server-redis.ts
1
import { isUndefined } from '@nestjs/common/utils/shared.utils';
1✔
2
import {
1✔
3
  NO_MESSAGE_HANDLER,
4
  REDIS_DEFAULT_HOST,
5
  REDIS_DEFAULT_PORT,
6
} from '../constants';
7
import { RedisContext } from '../ctx-host';
1✔
8
import { Transport } from '../enums';
1✔
9
import {
10
  RedisEvents,
11
  RedisEventsMap,
12
  RedisStatus,
13
} from '../events/redis.events';
14
import { IncomingRequest, RedisOptions } from '../interfaces';
15
import { Server } from './server';
1✔
16

17
// To enable type safety for Redis. This cant be uncommented by default
18
// because it would require the user to install the ioredis package even if they dont use Redis
19
// Otherwise, TypeScript would fail to compile the code.
20
//
21
// type Redis = import('ioredis').Redis;
22
type Redis = any;
23

24
let redisPackage = {} as any;
1✔
25

26
/**
27
 * @publicApi
28
 */
29
export class ServerRedis extends Server<RedisEvents, RedisStatus> {
1✔
30
  public readonly transportId = Transport.REDIS;
26✔
31

32
  protected subClient: Redis;
33
  protected pubClient: Redis;
34
  protected isManuallyClosed = false;
26✔
35
  protected wasInitialConnectionSuccessful = false;
26✔
36
  protected pendingEventListeners: Array<{
26✔
37
    event: keyof RedisEvents;
38
    callback: RedisEvents[keyof RedisEvents];
39
  }> = [];
40

41
  constructor(protected readonly options: RedisOptions['options']) {
26✔
42
    super();
26✔
43

44
    redisPackage = this.loadPackage('ioredis', ServerRedis.name, () =>
26✔
45
      require('ioredis'),
26✔
46
    );
47

48
    this.initializeSerializer(options);
26✔
49
    this.initializeDeserializer(options);
26✔
50
  }
51

52
  public listen(
53
    callback: (err?: unknown, ...optionalParams: unknown[]) => void,
54
  ) {
55
    try {
3✔
56
      this.subClient = this.createRedisClient();
3✔
57
      this.pubClient = this.createRedisClient();
3✔
58

59
      [this.subClient, this.pubClient].forEach((client, index) => {
3✔
60
        const type = index === 0 ? 'pub' : 'sub';
6✔
61
        this.registerErrorListener(client);
6✔
62
        this.registerReconnectListener(client);
6✔
63
        this.registerReadyListener(client);
6✔
64
        this.registerEndListener(client);
6✔
65
        this.pendingEventListeners.forEach(({ event, callback }) =>
6✔
NEW
66
          client.on(event, (...args: [any]) => callback(type, ...args)),
×
67
        );
68
      });
69
      this.pendingEventListeners = [];
3✔
70

71
      this.start(callback);
3✔
72
    } catch (err) {
73
      callback(err);
1✔
74
    }
75
  }
76

77
  public start(callback?: () => void) {
78
    Promise.all([this.subClient.connect(), this.pubClient.connect()])
2✔
79
      .then(() => {
80
        this.bindEvents(this.subClient, this.pubClient);
2✔
81
        callback();
2✔
82
      })
83
      .catch(callback);
84
  }
85

86
  public bindEvents(subClient: Redis, pubClient: Redis) {
87
    subClient.on(
6✔
88
      this.options?.wildcards ? 'pmessage' : 'message',
6✔
89
      this.getMessageHandler(pubClient).bind(this),
90
    );
91
    const subscribePatterns = [...this.messageHandlers.keys()];
6✔
92
    subscribePatterns.forEach(pattern => {
6✔
93
      const { isEventHandler } = this.messageHandlers.get(pattern);
2✔
94

95
      const channel = isEventHandler
2✔
96
        ? pattern
2!
97
        : this.getRequestPattern(pattern);
98

99
      if (this.options?.wildcards) {
2✔
100
        subClient.psubscribe(channel);
1✔
101
      } else {
102
        subClient.subscribe(channel);
1✔
103
      }
104
    });
105
  }
106

107
  public close() {
108
    this.isManuallyClosed = true;
1✔
109
    this.pubClient && this.pubClient.quit();
1✔
110
    this.subClient && this.subClient.quit();
1✔
111
    this.pendingEventListeners = [];
1✔
112
  }
113

114
  public createRedisClient(): Redis {
115
    return new redisPackage({
×
116
      port: REDIS_DEFAULT_PORT,
117
      host: REDIS_DEFAULT_HOST,
118
      ...this.getClientOptions(),
119
      lazyConnect: true,
120
    });
121
  }
122

123
  public getMessageHandler(pub: Redis) {
124
    return this.options?.wildcards
7✔
125
      ? (channel: string, pattern: string, buffer: string | any) =>
7✔
126
          this.handleMessage(channel, buffer, pub, pattern)
×
127
      : (channel: string, buffer: string | any) =>
128
          this.handleMessage(channel, buffer, pub, channel);
×
129
  }
130

131
  public async handleMessage(
132
    channel: string,
133
    buffer: string | any,
134
    pub: Redis,
135
    pattern: string,
136
  ) {
137
    const rawMessage = this.parseMessage(buffer);
3✔
138
    const packet = await this.deserializer.deserialize(rawMessage, { channel });
3✔
139
    const redisCtx = new RedisContext([pattern]);
3✔
140

141
    if (isUndefined((packet as IncomingRequest).id)) {
3✔
142
      return this.handleEvent(channel, packet, redisCtx);
1✔
143
    }
144
    const publish = this.getPublisher(
2✔
145
      pub,
146
      channel,
147
      (packet as IncomingRequest).id,
148
    );
149
    const handler = this.getHandlerByPattern(channel);
2✔
150

151
    if (!handler) {
2✔
152
      const status = 'error';
1✔
153
      const noHandlerPacket = {
1✔
154
        id: (packet as IncomingRequest).id,
155
        status,
156
        err: NO_MESSAGE_HANDLER,
157
      };
158
      return publish(noHandlerPacket);
1✔
159
    }
160
    const response$ = this.transformToObservable(
1✔
161
      await handler(packet.data, redisCtx),
162
    );
163
    response$ && this.send(response$, publish);
1✔
164
  }
165

166
  public getPublisher(pub: Redis, pattern: any, id: string) {
167
    return (response: any) => {
3✔
168
      Object.assign(response, { id });
1✔
169
      const outgoingResponse = this.serializer.serialize(response);
1✔
170

171
      return pub.publish(
1✔
172
        this.getReplyPattern(pattern),
173
        JSON.stringify(outgoingResponse),
174
      );
175
    };
176
  }
177

178
  public parseMessage(content: any): Record<string, any> {
179
    try {
2✔
180
      return JSON.parse(content);
2✔
181
    } catch (e) {
182
      return content;
2✔
183
    }
184
  }
185

186
  public getRequestPattern(pattern: string): string {
187
    return pattern;
3✔
188
  }
189

190
  public getReplyPattern(pattern: string): string {
191
    return `${pattern}.reply`;
2✔
192
  }
193

194
  public registerErrorListener(client: any) {
195
    client.on(RedisEventsMap.ERROR, (err: any) => this.logger.error(err));
6✔
196
  }
197

198
  public registerReconnectListener(client: {
199
    on: (event: string, fn: () => void) => void;
200
  }) {
201
    client.on(RedisEventsMap.RECONNECTING, () => {
6✔
NEW
202
      if (this.isManuallyClosed) {
×
NEW
203
        return;
×
204
      }
NEW
205
      this._status$.next(RedisStatus.RECONNECTING);
×
206

NEW
207
      if (this.wasInitialConnectionSuccessful) {
×
NEW
208
        this.logger.log('Reconnecting to Redis...');
×
209
      }
210
    });
211
  }
212

213
  public registerReadyListener(client: {
214
    on: (event: string, fn: () => void) => void;
215
  }) {
216
    client.on(RedisEventsMap.READY, () => {
6✔
NEW
217
      this._status$.next(RedisStatus.CONNECTED);
×
218

NEW
219
      this.logger.log('Connected to Redis. Subscribing to channels...');
×
220

NEW
221
      if (!this.wasInitialConnectionSuccessful) {
×
NEW
222
        this.wasInitialConnectionSuccessful = true;
×
223
      }
224
    });
225
  }
226

227
  public registerEndListener(client: {
228
    on: (event: string, fn: () => void) => void;
229
  }) {
230
    client.on('end', () => {
6✔
NEW
231
      if (this.isManuallyClosed) {
×
NEW
232
        return;
×
233
      }
NEW
234
      this._status$.next(RedisStatus.DISCONNECTED);
×
235

NEW
236
      this.logger.error(
×
237
        'Disconnected from Redis. No further reconnection attempts will be made.',
238
      );
239
    });
240
  }
241

242
  public getClientOptions(): Partial<RedisOptions['options']> {
243
    const retryStrategy = (times: number) => this.createRetryStrategy(times);
1✔
244

245
    return {
1✔
246
      ...(this.options || {}),
1!
247
      retryStrategy,
248
    };
249
  }
250

251
  public createRetryStrategy(times: number): undefined | number | void {
252
    if (this.isManuallyClosed) {
5✔
253
      return undefined;
1✔
254
    }
255
    if (!this.getOptionsProp(this.options, 'retryAttempts')) {
4✔
256
      this.logger.error(
3✔
257
        'Redis connection closed and retry attempts not specified',
258
      );
259
      return;
3✔
260
    }
261
    if (times > this.getOptionsProp(this.options, 'retryAttempts')) {
1!
UNCOV
262
      this.logger.error(`Retry time exhausted`);
×
UNCOV
263
      return;
×
264
    }
265
    return this.getOptionsProp(this.options, 'retryDelay') ?? 5000;
1!
266
  }
267

268
  public unwrap<T>(): T {
NEW
269
    if (!this.pubClient || !this.subClient) {
×
NEW
270
      throw new Error(
×
271
        'Not initialized. Please call the "listen"/"startAllMicroservices" method before accessing the server.',
272
      );
273
    }
NEW
274
    return [this.pubClient, this.subClient] as T;
×
275
  }
276

277
  public on<
278
    EventKey extends keyof RedisEvents = keyof RedisEvents,
279
    EventCallback extends RedisEvents[EventKey] = RedisEvents[EventKey],
280
  >(event: EventKey, callback: EventCallback) {
NEW
281
    if (this.subClient && this.pubClient) {
×
NEW
282
      this.subClient.on(event, (...args: [any]) => callback('sub', ...args));
×
NEW
283
      this.pubClient.on(event, (...args: [any]) => callback('pub', ...args));
×
284
    } else {
NEW
285
      this.pendingEventListeners.push({ event, callback });
×
286
    }
287
  }
288
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc