• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nestjs / nest / de00f050-29aa-4a07-94cd-4a5c3beeb9b5

05 Dec 2023 08:46AM UTC coverage: 92.992% (+0.7%) from 92.258%
de00f050-29aa-4a07-94cd-4a5c3beeb9b5

Pull #12880

circleci

이정현B
feat(core): add option to bind global
Pull Request #12880: fix(core): always bind global modules when scan for modules

2798 of 3460 branches covered (0.0%)

7 of 8 new or added lines in 1 file covered. (87.5%)

199 existing lines in 30 files now uncovered.

6555 of 7049 relevant lines covered (92.99%)

17.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.01
/packages/microservices/server/server-redis.ts
1
import { isUndefined } from '@nestjs/common/utils/shared.utils';
1✔
2
import {
1✔
3
  ERROR_EVENT,
4
  MESSAGE_EVENT,
5
  NO_MESSAGE_HANDLER,
6
  REDIS_DEFAULT_HOST,
7
  REDIS_DEFAULT_PORT,
8
} from '../constants';
9
import { RedisContext } from '../ctx-host';
1✔
10
import { Transport } from '../enums';
1✔
11
import {
12
  CustomTransportStrategy,
13
  IncomingRequest,
14
  RedisOptions,
15
} from '../interfaces';
16
import { Server } from './server';
1✔
17

18
// Redis client instances are left untyped (`any`) in this file; the client
// class itself is only obtained at runtime via `loadPackage('ioredis', ...)`.
type Redis = any;
19

20
// Placeholder for the `ioredis` export. It is reassigned in the `ServerRedis`
// constructor and then used as a constructor in `createRedisClient()`.
let redisPackage = {} as any;
1✔
21

22
/**
 * Microservice transport server backed by Redis pub/sub.
 *
 * Two clients are used: `subClient` receives messages on the subscribed
 * channels, while `pubClient` publishes responses on `<channel>.reply`.
 */
export class ServerRedis extends Server implements CustomTransportStrategy {
  public readonly transportId = Transport.REDIS;

  private subClient: Redis;
  private pubClient: Redis;
  /** Set by `close()`; makes `createRetryStrategy()` stop scheduling retries. */
  private isExplicitlyTerminated = false;

  /**
   * @param options Redis transport options; they are forwarded to the Redis
   * client and also used to initialize the serializer and deserializer.
   */
  constructor(private readonly options: RedisOptions['options']) {
    super();

    redisPackage = this.loadPackage('ioredis', ServerRedis.name, () =>
      require('ioredis'),
    );

    this.initializeSerializer(options);
    this.initializeDeserializer(options);
  }

  /**
   * Creates both Redis clients, attaches error logging to them and starts
   * connecting.
   *
   * @param callback Invoked without arguments once the server is ready, or
   * with the error if client creation/connection fails.
   */
  public listen(
    callback: (err?: unknown, ...optionalParams: unknown[]) => void,
  ) {
    try {
      this.subClient = this.createRedisClient();
      this.pubClient = this.createRedisClient();

      this.handleError(this.pubClient);
      this.handleError(this.subClient);

      this.start(callback);
    } catch (err) {
      // Synchronous failures (e.g. while constructing a client) are reported
      // through the callback rather than thrown to the caller.
      callback(err);
    }
  }

  /**
   * Connects both clients and, once connected, binds the message handlers.
   *
   * @param callback Optional. Called with no arguments on success and with
   * the error on failure. When omitted, a failure is logged instead so the
   * rejection is never left unhandled.
   */
  public start(
    callback?: (err?: unknown, ...optionalParams: unknown[]) => void,
  ) {
    Promise.all([this.subClient.connect(), this.pubClient.connect()])
      .then(() => {
        this.bindEvents(this.subClient, this.pubClient);
        // The callback is optional, so it must not be called unconditionally.
        if (callback) {
          callback();
        }
      })
      .catch((err: unknown) => {
        if (callback) {
          callback(err);
        } else {
          this.logger.error(err);
        }
      });
  }

  /**
   * Registers the message listener on `subClient` and subscribes to one
   * channel per registered message handler.
   *
   * @param subClient Client used for receiving messages.
   * @param pubClient Client used by the message handler to publish replies.
   */
  public bindEvents(subClient: Redis, pubClient: Redis) {
    subClient.on(MESSAGE_EVENT, this.getMessageHandler(pubClient).bind(this));
    const subscribePatterns = [...this.messageHandlers.keys()];
    subscribePatterns.forEach(pattern => {
      const { isEventHandler } = this.messageHandlers.get(pattern);
      // Event handlers subscribe to the raw pattern; request handlers go
      // through getRequestPattern() so the channel name can be customized.
      subClient.subscribe(
        isEventHandler ? pattern : this.getRequestPattern(pattern),
      );
    });
  }

  /**
   * Marks the server as explicitly terminated and quits both clients, if
   * they have been created.
   */
  public close() {
    this.isExplicitlyTerminated = true;
    this.pubClient && this.pubClient.quit();
    this.subClient && this.subClient.quit();
  }

  /**
   * Instantiates a Redis client from the loaded `ioredis` package.
   *
   * Default host/port may be overridden by the client options, whereas
   * `lazyConnect: true` is listed last and therefore always applies.
   */
  public createRedisClient(): Redis {
    return new redisPackage({
      port: REDIS_DEFAULT_PORT,
      host: REDIS_DEFAULT_HOST,
      ...this.getClientOptions(),
      lazyConnect: true,
    });
  }

  /**
   * Builds the listener passed to `subClient.on(MESSAGE_EVENT, ...)`.
   *
   * @param pub Client used to publish responses.
   * @returns An async function delegating to `handleMessage`.
   */
  public getMessageHandler(pub: Redis) {
    return async (channel: string, buffer: string | any) =>
      this.handleMessage(channel, buffer, pub);
  }

  /**
   * Processes a single incoming message.
   *
   * Packets without an `id` are dispatched as events. Packets with an `id`
   * are treated as requests: the matching handler is invoked and its result
   * is sent back through the publisher; if no handler matches, an error
   * packet carrying `NO_MESSAGE_HANDLER` is published instead.
   *
   * @param channel Channel the message arrived on.
   * @param buffer Raw message payload.
   * @param pub Client used to publish the response.
   */
  public async handleMessage(
    channel: string,
    buffer: string | any,
    pub: Redis,
  ) {
    const rawMessage = this.parseMessage(buffer);
    const packet = await this.deserializer.deserialize(rawMessage, { channel });
    const redisCtx = new RedisContext([channel]);

    if (isUndefined((packet as IncomingRequest).id)) {
      return this.handleEvent(channel, packet, redisCtx);
    }
    const publish = this.getPublisher(
      pub,
      channel,
      (packet as IncomingRequest).id,
    );
    const handler = this.getHandlerByPattern(channel);

    if (!handler) {
      const status = 'error';
      const noHandlerPacket = {
        id: (packet as IncomingRequest).id,
        status,
        err: NO_MESSAGE_HANDLER,
      };
      return publish(noHandlerPacket);
    }
    const response$ = this.transformToObservable(
      await handler(packet.data, redisCtx),
    );
    response$ && this.send(response$, publish);
  }

  /**
   * Creates a function that publishes a response for a given request.
   *
   * @param pub Client used to publish.
   * @param pattern Channel of the request; the reply goes to its reply pattern.
   * @param id Request id, assigned onto the response before serialization.
   * @returns A function that serializes and publishes the response.
   */
  public getPublisher(pub: Redis, pattern: any, id: string) {
    return (response: any) => {
      // Note: this mutates the passed-in response object.
      Object.assign(response, { id });
      const outgoingResponse = this.serializer.serialize(response);

      return pub.publish(
        this.getReplyPattern(pattern),
        JSON.stringify(outgoingResponse),
      );
    };
  }

  /**
   * Parses a raw message as JSON.
   *
   * @param content Raw message payload.
   * @returns The parsed value, or `content` unchanged if it is not valid JSON.
   */
  public parseMessage(content: any): Record<string, any> {
    try {
      return JSON.parse(content);
    } catch (e) {
      return content;
    }
  }

  /**
   * @param pattern Handler pattern.
   * @returns The channel to subscribe to for requests (the pattern itself).
   */
  public getRequestPattern(pattern: string): string {
    return pattern;
  }

  /**
   * @param pattern Handler pattern.
   * @returns The channel on which replies are published: `<pattern>.reply`.
   */
  public getReplyPattern(pattern: string): string {
    return `${pattern}.reply`;
  }

  /**
   * Logs every `ERROR_EVENT` emitted by the given stream/client.
   *
   * @param stream Emitter to attach the error listener to.
   */
  public handleError(stream: any) {
    stream.on(ERROR_EVENT, (err: any) => this.logger.error(err));
  }

  /**
   * Builds the options passed to the Redis client.
   *
   * @returns The user-supplied options with `retryStrategy` replaced by the
   * server's own strategy (it is listed last, so it always wins).
   */
  public getClientOptions(): Partial<RedisOptions['options']> {
    const retryStrategy = (times: number) => this.createRetryStrategy(times);

    return {
      ...(this.options || {}),
      retryStrategy,
    };
  }

  /**
   * Decides whether and when the client should retry connecting.
   *
   * @param times Number of the current retry attempt.
   * @returns The `retryDelay` option (or `0`) while retries remain;
   * `undefined` when the server was closed explicitly, when no
   * `retryAttempts` option is set, or when `times` exceeds it. Per the
   * ioredis documentation, a non-number result stops reconnection.
   */
  public createRetryStrategy(times: number): undefined | number | void {
    if (this.isExplicitlyTerminated) {
      return undefined;
    }
    if (
      !this.getOptionsProp(this.options, 'retryAttempts') ||
      times > this.getOptionsProp(this.options, 'retryAttempts')
    ) {
      this.logger.error(`Retry time exhausted`);
      return;
    }
    return this.getOptionsProp(this.options, 'retryDelay') || 0;
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc