Coveralls logob
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

luin / ioredis / 2437661402

3 Jun 2022 - 23:36 coverage: 91.667% (-0.08%) from 91.751%
2437661402

Pull #1596

github

GitHub
Merge bdea96ad4 into da93f1850
Pull Request #1596: chore(deps): bump npm from 8.5.4 to 8.12.1

1102 of 1255 branches covered (87.81%)

Branch coverage included in aggregate %.

2143 of 2285 relevant lines covered (93.79%)

4125.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.0
/lib/cluster/index.ts
1
import { exists, hasFlag } from "@ioredis/commands";
1×
2
import { EventEmitter } from "events";
1×
3
import { AbortError, RedisError } from "redis-errors";
1×
4
import asCallback from "standard-as-callback";
1×
5
import Command from "../Command";
1×
6
import ClusterAllFailedError from "../errors/ClusterAllFailedError";
1×
7
import Pipeline from "../Pipeline";
8
import Redis from "../Redis";
1×
9
import ScanStream from "../ScanStream";
1×
10
import { addTransactionSupport, Transaction } from "../transaction";
1×
11
import { Callback, ScanStreamOptions, WriteableStream } from "../types";
12
import {
1×
13
  CONNECTION_CLOSED_ERROR_MSG,
14
  Debug,
15
  defaults,
16
  noop,
17
  sample,
18
  shuffle,
19
  timeout,
20
  zipMap,
21
} from "../utils";
22
import applyMixin from "../utils/applyMixin";
1×
23
import Commander from "../utils/Commander";
1×
24
import { ClusterOptions, DEFAULT_CLUSTER_OPTIONS } from "./ClusterOptions";
1×
25
import ClusterSubscriber from "./ClusterSubscriber";
1×
26
import ConnectionPool from "./ConnectionPool";
1×
27
import DelayQueue from "./DelayQueue";
1×
28
import {
1×
29
  getConnectionName,
30
  getUniqueHostnamesFromOptions,
31
  groupSrvRecords,
32
  NodeKey,
33
  nodeKeyToRedisOptions,
34
  NodeRole,
35
  normalizeNodeOptions,
36
  RedisOptions,
37
  weightSrvRecords,
38
} from "./util";
39
import Deque = require("denque");
1×
40

41
const debug = Debug("cluster");
1×
42

43
const REJECT_OVERWRITTEN_COMMANDS = new WeakSet<Command>();
1×
44

45
type OfflineQueueItem = {
46
  command: Command;
47
  stream: WriteableStream;
48
  node: unknown;
49
};
50

51
export type ClusterNode =
52
  | string
53
  | number
54
  | {
55
      host?: string | undefined;
56
      port?: number | undefined;
57
    };
58

59
type ClusterStatus =
60
  | "end"
61
  | "close"
62
  | "wait"
63
  | "connecting"
64
  | "connect"
65
  | "ready"
66
  | "reconnecting"
67
  | "disconnecting";
68

69
/**
70
 * Client for the official Redis Cluster
71
 */
72
class Cluster extends Commander {
73
  options: ClusterOptions;
74
  slots: NodeKey[][] = [];
104×
75
  status: ClusterStatus;
76

77
  /**
78
   * @ignore
79
   */
80
  _groupsIds: { [key: string]: number } = {};
104×
81

82
  /**
83
   * @ignore
84
   */
85
  _groupsBySlot: number[] = Array(16384);
104×
86

87
  /**
88
   * @ignore
89
   */
90
  isCluster = true;
104×
91

92
  private startupNodes: (string | number | object)[];
93
  private connectionPool: ConnectionPool;
94
  private manuallyClosing: boolean;
95
  private retryAttempts = 0;
104×
96
  private delayQueue: DelayQueue = new DelayQueue();
104×
97
  private offlineQueue = new Deque<OfflineQueueItem>();
104×
98
  private subscriber: ClusterSubscriber;
99
  private slotsTimer: NodeJS.Timer;
100
  private reconnectTimeout: NodeJS.Timer;
101
  private isRefreshing = false;
104×
102
  private _autoPipelines: Map<string, typeof Pipeline> = new Map();
104×
103
  private _runningAutoPipelines: Set<string> = new Set();
104×
104
  private _readyDelayedCallbacks: Callback[] = [];
104×
105

106
  /**
107
   * Every time Cluster#connect() is called, this value will be
108
   * auto-incrementing. The purpose of this value is used for
109
   * discarding previous connect attampts when creating a new
110
   * connection.
111
   */
112
  private connectionEpoch = 0;
104×
113

114
  /**
115
   * Creates an instance of Cluster.
116
   */
117
  constructor(startupNodes: ClusterNode[], options: ClusterOptions = {}) {
118
    super();
104×
119
    EventEmitter.call(this);
104×
120

121
    this.startupNodes = startupNodes;
104×
122
    this.options = defaults({}, options, DEFAULT_CLUSTER_OPTIONS, this.options);
104×
123

124
    if (
104×
125
      this.options.redisOptions &&
126
      this.options.redisOptions.keyPrefix &&
127
      !this.options.keyPrefix
128
    ) {
129
      this.options.keyPrefix = this.options.redisOptions.keyPrefix;
1×
130
    }
131

132
    // validate options
133
    if (
104×
134
      typeof this.options.scaleReads !== "function" &&
135
      ["all", "master", "slave"].indexOf(this.options.scaleReads) === -1
136
    ) {
137
      throw new Error(
1×
138
        'Invalid option scaleReads "' +
139
          this.options.scaleReads +
140
          '". Expected "all", "master", "slave" or a custom function'
141
      );
142
    }
143

144
    this.connectionPool = new ConnectionPool(this.options.redisOptions);
103×
145

146
    this.connectionPool.on("-node", (redis, key) => {
103×
147
      this.emit("-node", redis);
223×
148
    });
149
    this.connectionPool.on("+node", (redis) => {
103×
150
      this.emit("+node", redis);
226×
151
    });
152
    this.connectionPool.on("drain", () => {
103×
153
      this.setStatus("close");
218×
154
    });
155
    this.connectionPool.on("nodeError", (error, key) => {
103×
156
      this.emit("node error", error, key);
!
157
    });
158

159
    this.subscriber = new ClusterSubscriber(this.connectionPool, this);
103×
160

161
    if (this.options.scripts) {
103×
162
      Object.entries(this.options.scripts).forEach(([name, definition]) => {
1×
163
        this.defineCommand(name, definition);
2×
164
      });
165
    }
166

167
    if (this.options.lazyConnect) {
103×
168
      this.setStatus("wait");
9×
169
    } else {
170
      this.connect().catch((err) => {
94×
171
        debug("connecting failed: %s", err);
13×
172
      });
173
    }
174
  }
175

176
  /**
177
   * Connect to a cluster
178
   */
179
  connect(): Promise<void> {
180
    return new Promise((resolve, reject) => {
111×
181
      if (
Branches [[7, 0]] missed. 111×
182
        this.status === "connecting" ||
183
        this.status === "connect" ||
184
        this.status === "ready"
185
      ) {
186
        reject(new Error("Redis is already connecting/connected"));
!
187
        return;
!
188
      }
189

190
      const epoch = ++this.connectionEpoch;
111×
191
      this.setStatus("connecting");
111×
192

193
      this.resolveStartupNodeHostnames()
111×
194
        .then((nodes) => {
195
          if (this.connectionEpoch !== epoch) {
Branches [[9, 0]] missed. 107×
196
            debug(
!
197
              "discard connecting after resolving startup nodes because epoch not match: %d != %d",
198
              epoch,
199
              this.connectionEpoch
200
            );
201
            reject(
!
202
              new RedisError(
203
                "Connection is discarded because a new connection is made"
204
              )
205
            );
206
            return;
!
207
          }
208
          if (this.status !== "connecting") {
Branches [[10, 0]] missed. 107×
209
            debug(
!
210
              "discard connecting after resolving startup nodes because the status changed to %s",
211
              this.status
212
            );
213
            reject(new RedisError("Connection is aborted"));
!
214
            return;
!
215
          }
216
          this.connectionPool.reset(nodes);
107×
217

218
          const readyHandler = () => {
107×
219
            this.setStatus("ready");
74×
220
            this.retryAttempts = 0;
74×
221
            this.executeOfflineCommands();
74×
222
            this.resetNodesRefreshInterval();
74×
223
            resolve();
74×
224
          };
225

226
          let closeListener: () => void = undefined;
107×
227
          const refreshListener = () => {
107×
228
            this.invokeReadyDelayedCallbacks(undefined);
88×
229
            this.removeListener("close", closeListener);
88×
230
            this.manuallyClosing = false;
88×
231
            this.setStatus("connect");
88×
232
            if (this.options.enableReadyCheck) {
88×
233
              this.readyCheck((err, fail) => {
86×
234
                if (err || fail) {
86×
235
                  debug(
14×
236
                    "Ready check failed (%s). Reconnecting...",
237
                    err || fail
238
                  );
239
                  if (this.status === "connect") {
14×
240
                    this.disconnect(true);
6×
241
                  }
242
                } else {
243
                  readyHandler();
72×
244
                }
245
              });
246
            } else {
247
              readyHandler();
2×
248
            }
249
          };
250

251
          closeListener = () => {
107×
252
            const error = new Error("None of startup nodes is available");
19×
253

254
            this.removeListener("refresh", refreshListener);
19×
255
            this.invokeReadyDelayedCallbacks(error);
19×
256
            reject(error);
19×
257
          };
258

259
          this.once("refresh", refreshListener);
107×
260
          this.once("close", closeListener);
107×
261
          this.once("close", this.handleCloseEvent.bind(this));
107×
262

263
          this.refreshSlotsCache((err) => {
107×
264
            if (err && err.message === ClusterAllFailedError.defaultMessage) {
107×
265
              Redis.prototype.silentEmit.call(this, "error", err);
12×
266
              this.connectionPool.reset([]);
12×
267
            }
268
          });
269
          this.subscriber.start();
107×
270
        })
271
        .catch((err) => {
272
          this.setStatus("close");
6×
273
          this.handleCloseEvent(err);
6×
274
          this.invokeReadyDelayedCallbacks(err);
6×
275
          reject(err);
6×
276
        });
277
    });
278
  }
279

280
  /**
281
   * Disconnect from every node in the cluster.
282
   */
283
  disconnect(reconnect = false) {
284
    const status = this.status;
101×
285
    this.setStatus("disconnecting");
101×
286

287
    if (!reconnect) {
101×
288
      this.manuallyClosing = true;
92×
289
    }
290
    if (this.reconnectTimeout && !reconnect) {
101×
291
      clearTimeout(this.reconnectTimeout);
2×
292
      this.reconnectTimeout = null;
2×
293
      debug("Canceled reconnecting attempts");
2×
294
    }
295
    this.clearNodesRefreshInterval();
101×
296

297
    this.subscriber.stop();
101×
298
    if (status === "wait") {
101×
299
      this.setStatus("close");
2×
300
      this.handleCloseEvent();
2×
301
    } else {
302
      this.connectionPool.reset([]);
99×
303
    }
304
  }
305

306
  /**
307
   * Quit the cluster gracefully.
308
   */
309
  quit(callback?: Callback<"OK">): Promise<"OK"> {
310
    const status = this.status;
4×
311
    this.setStatus("disconnecting");
4×
312

313
    this.manuallyClosing = true;
4×
314

315
    if (this.reconnectTimeout) {
Branches [[23, 0]] missed. 4×
316
      clearTimeout(this.reconnectTimeout);
!
317
      this.reconnectTimeout = null;
!
318
    }
319
    this.clearNodesRefreshInterval();
4×
320

321
    this.subscriber.stop();
4×
322

323
    if (status === "wait") {
4×
324
      const ret = asCallback(Promise.resolve<"OK">("OK"), callback);
1×
325

326
      // use setImmediate to make sure "close" event
327
      // being emitted after quit() is returned
328
      setImmediate(
1×
329
        function () {
330
          this.setStatus("close");
1×
331
          this.handleCloseEvent();
1×
332
        }.bind(this)
333
      );
334

335
      return ret;
1×
336
    }
337
    return asCallback(
3×
338
      Promise.all(
339
        this.nodes().map((node) =>
340
          node.quit().catch((err) => {
7×
341
            // Ignore the error caused by disconnecting since
342
            // we're disconnecting...
343
            if (err.message === CONNECTION_CLOSED_ERROR_MSG) {
3×
344
              return "OK";
2×
345
            }
346
            throw err;
1×
347
          })
348
        )
349
      ).then(() => "OK"),
2×
350
      callback
351
    );
352
  }
353

354
  /**
355
   * Create a new instance with the same startup nodes and options as the current one.
356
   *
357
   * @example
358
   * ```js
359
   * var cluster = new Redis.Cluster([{ host: "127.0.0.1", port: "30001" }]);
360
   * var anotherCluster = cluster.duplicate();
361
   * ```
362
   */
363
  duplicate(overrideStartupNodes = [], overrideOptions = {}) {
Branches [[26, 0]] missed.
364
    const startupNodes =
365
      overrideStartupNodes.length > 0
Branches [[28, 1]] missed. 1×
366
        ? overrideStartupNodes
367
        : this.startupNodes.slice(0);
368
    const options = Object.assign({}, this.options, overrideOptions);
1×
369
    return new Cluster(startupNodes, options);
1×
370
  }
371

372
  /**
373
   * Get nodes with the specified role
374
   */
375
  nodes(role: NodeRole = "all"): Redis[] {
376
    if (role !== "all" && role !== "master" && role !== "slave") {
22×
377
      throw new Error(
1×
378
        'Invalid role "' + role + '". Expected "all", "master" or "slave"'
379
      );
380
    }
381
    return this.connectionPool.getNodes(role);
21×
382
  }
383

384
  /**
385
   * This is needed in order not to install a listener for each auto pipeline
386
   *
387
   * @ignore
388
   */
389
  delayUntilReady(callback: Callback) {
390
    this._readyDelayedCallbacks.push(callback);
12×
391
  }
392

393
  /**
394
   * Get the number of commands queued in automatic pipelines.
395
   *
396
   * This is not available (and returns 0) until the cluster is connected and slots information have been received.
397
   */
398
  get autoPipelineQueueSize(): number {
399
    let queued = 0;
38×
400

401
    for (const pipeline of this._autoPipelines.values()) {
38×
402
      queued += pipeline.length;
23×
403
    }
404

405
    return queued;
38×
406
  }
407

408
  /**
409
   * Refresh the slot cache
410
   *
411
   * @ignore
412
   */
413
  refreshSlotsCache(callback?: Callback<void>): void {
414
    if (this.isRefreshing) {
Branches [[32, 0]] missed. 124×
UNCOV
415
      if (callback) {
Branches [[33, 0], [33, 1]] missed. !
416
        process.nextTick(callback);
!
417
      }
UNCOV
418
      return;
!
419
    }
420
    this.isRefreshing = true;
124×
421

422
    const _this = this;
124×
423
    const wrapper = (error?: Error) => {
124×
424
      this.isRefreshing = false;
123×
425
      if (callback) {
123×
426
        callback(error);
117×
427
      }
428
    };
429

430
    const nodes = shuffle(this.connectionPool.getNodes());
124×
431

432
    let lastNodeError = null;
124×
433

434
    function tryNode(index: number) {
435
      if (index === nodes.length) {
145×
436
        const error = new ClusterAllFailedError(
12×
437
          ClusterAllFailedError.defaultMessage,
438
          lastNodeError
439
        );
440
        return wrapper(error);
12×
441
      }
442
      const node = nodes[index];
133×
443
      const key = `${node.options.host}:${node.options.port}`;
133×
444
      debug("getting slot cache from %s", key);
133×
445
      _this.getInfoFromNode(node, function (err) {
133×
446
        switch (_this.status) {
Branches [[36, 0]] missed. 132×
447
          case "close":
448
          case "end":
449
            return wrapper(new Error("Cluster is disconnected."));
7×
450
          case "disconnecting":
451
            return wrapper(new Error("Cluster is disconnecting."));
1×
452
        }
453
        if (err) {
124×
454
          _this.emit("node error", err, key);
21×
455
          lastNodeError = err;
21×
456
          tryNode(index + 1);
21×
457
        } else {
458
          _this.emit("refresh");
103×
459
          wrapper();
103×
460
        }
461
      });
462
    }
463

464
    tryNode(0);
124×
465
  }
466

467
  /**
468
   * @ignore
469
   */
470
  sendCommand(command: Command, stream?: WriteableStream, node?: any): unknown {
471
    if (this.status === "wait") {
Branches [[38, 0]] missed. 305×
472
      this.connect().catch(noop);
!
473
    }
474
    if (this.status === "end") {
Branches [[39, 0]] missed. 305×
475
      command.reject(new Error(CONNECTION_CLOSED_ERROR_MSG));
!
476
      return command.promise;
!
477
    }
478
    let to = this.options.scaleReads;
305×
479
    if (to !== "master") {
305×
480
      const isCommandReadOnly =
481
        command.isReadOnly ||
12×
482
        (exists(command.name) && hasFlag(command.name, "readonly"));
483
      if (!isCommandReadOnly) {
12×
484
        to = "master";
8×
485
      }
486
    }
487

488
    let targetSlot = node ? node.slot : command.getSlot();
305×
489
    const ttl = {};
305×
490
    const _this = this;
305×
491
    if (!node && !REJECT_OVERWRITTEN_COMMANDS.has(command)) {
305×
492
      REJECT_OVERWRITTEN_COMMANDS.add(command);
135×
493

494
      const reject = command.reject;
135×
495
      command.reject = function (err) {
135×
496
        const partialTry = tryConnection.bind(null, true);
35×
497
        _this.handleError(err, ttl, {
35×
498
          moved: function (slot, key) {
499
            debug("command %s is moved to %s", command.name, key);
5×
500
            targetSlot = Number(slot);
5×
501
            if (_this.slots[slot]) {
Branches [[46, 1]] missed. 5×
502
              _this.slots[slot][0] = key;
5×
503
            } else {
504
              _this.slots[slot] = [key];
!
505
            }
506
            _this._groupsBySlot[slot] =
5×
507
              _this._groupsIds[_this.slots[slot].join(";")];
508
            _this.connectionPool.findOrCreate(_this.natMapper(key));
5×
509
            tryConnection();
5×
510
            debug("refreshing slot caches... (triggered by MOVED error)");
5×
511
            _this.refreshSlotsCache();
5×
512
          },
513
          ask: function (slot, key) {
514
            debug("command %s is required to ask %s:%s", command.name, key);
8×
515
            const mapped = _this.natMapper(key);
8×
516
            _this.connectionPool.findOrCreate(mapped);
8×
517
            tryConnection(false, `${mapped.host}:${mapped.port}`);
8×
518
          },
519
          tryagain: partialTry,
520
          clusterDown: partialTry,
521
          connectionClosed: partialTry,
522
          maxRedirections: function (redirectionError) {
523
            reject.call(command, redirectionError);
1×
524
          },
525
          defaults: function () {
526
            reject.call(command, err);
15×
527
          },
528
        });
529
      };
530
    }
531
    tryConnection();
305×
532

533
    function tryConnection(random?: boolean, asking?: string) {
534
      if (_this.status === "end") {
Branches [[47, 0]] missed. 324×
535
        command.reject(new AbortError("Cluster is ended."));
!
536
        return;
!
537
      }
538
      let redis;
539
      if (_this.status === "ready" || command.name === "cluster") {
324×
540
        if (node && node.redis) {
251×
541
          redis = node.redis;
60×
542
        } else if (
191×
543
          Command.checkFlag("ENTER_SUBSCRIBER_MODE", command.name) ||
544
          Command.checkFlag("EXIT_SUBSCRIBER_MODE", command.name)
545
        ) {
546
          redis = _this.subscriber.getInstance();
5×
547
          if (!redis) {
Branches [[54, 0]] missed. 5×
548
            command.reject(new AbortError("No subscriber for the cluster"));
!
549
            return;
!
550
          }
551
        } else {
552
          if (!random) {
186×
553
            if (typeof targetSlot === "number" && _this.slots[targetSlot]) {
180×
554
              const nodeKeys = _this.slots[targetSlot];
94×
555
              if (typeof to === "function") {
94×
556
                const nodes = nodeKeys.map(function (key) {
1×
557
                  return _this.connectionPool.getInstanceByKey(key);
3×
558
                });
559
                redis = to(nodes, command);
1×
560
                if (Array.isArray(redis)) {
Branches [[59, 0]] missed. 1×
561
                  redis = sample(redis);
!
562
                }
563
                if (!redis) {
Branches [[60, 0]] missed. 1×
564
                  redis = nodes[0];
!
565
                }
566
              } else {
567
                let key;
568
                if (to === "all") {
93×
569
                  key = sample(nodeKeys);
1×
570
                } else if (to === "slave" && nodeKeys.length > 1) {
92×
571
                  key = sample(nodeKeys, 1);
2×
572
                } else {
573
                  key = nodeKeys[0];
90×
574
                }
575
                redis = _this.connectionPool.getInstanceByKey(key);
93×
576
              }
577
            }
578
            if (asking) {
180×
579
              redis = _this.connectionPool.getInstanceByKey(asking);
8×
580
              redis.asking();
8×
581
            }
582
          }
583
          if (!redis) {
186×
584
            redis =
93×
585
              (typeof to === "function"
Branches [[67, 0]] missed.
586
                ? null
587
                : _this.connectionPool.getSampleInstance(to)) ||
588
              _this.connectionPool.getSampleInstance("all");
589
          }
590
        }
591
        if (node && !node.redis) {
251×
592
          node.redis = redis;
41×
593
        }
594
      }
595
      if (redis) {
324×
596
        redis.sendCommand(command, stream);
250×
597
      } else if (_this.options.enableOfflineQueue) {
Branches [[71, 1]] missed. 74×
598
        _this.offlineQueue.push({
74×
599
          command: command,
600
          stream: stream,
601
          node: node,
602
        });
603
      } else {
604
        command.reject(
!
605
          new Error(
606
            "Cluster isn't ready and enableOfflineQueue options is false"
607
          )
608
        );
609
      }
610
    }
611
    return command.promise;
305×
612
  }
613

614
  sscanStream(key: string, options?: ScanStreamOptions) {
615
    return this.createScanStream("sscan", { key, options });
1×
616
  }
617

618
  sscanBufferStream(key: string, options?: ScanStreamOptions) {
619
    return this.createScanStream("sscanBuffer", { key, options });
!
620
  }
621

622
  hscanStream(key: string, options?: ScanStreamOptions) {
623
    return this.createScanStream("hscan", { key, options });
!
624
  }
625

626
  hscanBufferStream(key: string, options?: ScanStreamOptions) {
627
    return this.createScanStream("hscanBuffer", { key, options });
!
628
  }
629

630
  zscanStream(key: string, options?: ScanStreamOptions) {
631
    return this.createScanStream("zscan", { key, options });
!
632
  }
633

634
  zscanBufferStream(key: string, options?: ScanStreamOptions) {
635
    return this.createScanStream("zscanBuffer", { key, options });
!
636
  }
637

638
  /**
639
   * @ignore
640
   */
641
  handleError(error: Error, ttl: { value?: any }, handlers) {
642
    if (typeof ttl.value === "undefined") {
45×
643
      ttl.value = this.options.maxRedirections;
33×
644
    } else {
645
      ttl.value -= 1;
12×
646
    }
647
    if (ttl.value <= 0) {
45×
648
      handlers.maxRedirections(
2×
649
        new Error("Too many Cluster redirections. Last error: " + error)
650
      );
651
      return;
2×
652
    }
653
    const errv = error.message.split(" ");
43×
654
    if (errv[0] === "MOVED") {
43×
655
      const timeout = this.options.retryDelayOnMoved;
7×
656
      if (timeout && typeof timeout === "number") {
7×
657
        this.delayQueue.push(
1×
658
          "moved",
659
          handlers.moved.bind(null, errv[1], errv[2]),
660
          { timeout }
661
        );
662
      } else {
663
        handlers.moved(errv[1], errv[2]);
6×
664
      }
665
    } else if (errv[0] === "ASK") {
36×
666
      handlers.ask(errv[1], errv[2]);
10×
667
    } else if (errv[0] === "TRYAGAIN") {
26×
668
      this.delayQueue.push("tryagain", handlers.tryagain, {
2×
669
        timeout: this.options.retryDelayOnTryAgain,
670
      });
671
    } else if (
24×
672
      errv[0] === "CLUSTERDOWN" &&
673
      this.options.retryDelayOnClusterDown > 0
674
    ) {
675
      this.delayQueue.push("clusterdown", handlers.connectionClosed, {
5×
676
        timeout: this.options.retryDelayOnClusterDown,
677
        callback: this.refreshSlotsCache.bind(this),
678
      });
679
    } else if (
19×
680
      error.message === CONNECTION_CLOSED_ERROR_MSG &&
681
      this.options.retryDelayOnFailover > 0 &&
682
      this.status === "ready"
683
    ) {
684
      this.delayQueue.push("failover", handlers.connectionClosed, {
3×
685
        timeout: this.options.retryDelayOnFailover,
686
        callback: this.refreshSlotsCache.bind(this),
687
      });
688
    } else {
689
      handlers.defaults();
16×
690
    }
691
  }
692

693
  private resetOfflineQueue() {
694
    this.offlineQueue = new Deque();
47×
695
  }
696

697
  private clearNodesRefreshInterval() {
698
    if (this.slotsTimer) {
105×
699
      clearTimeout(this.slotsTimer);
1×
700
      this.slotsTimer = null;
1×
701
    }
702
  }
703

704
  private resetNodesRefreshInterval() {
705
    if (this.slotsTimer || !this.options.slotsRefreshInterval) {
74×
706
      return;
73×
707
    }
708
    const nextRound = () => {
1×
709
      this.slotsTimer = setTimeout(() => {
2×
710
        debug(
1×
711
          'refreshing slot caches... (triggered by "slotsRefreshInterval" option)'
712
        );
713
        this.refreshSlotsCache(() => {
1×
714
          nextRound();
1×
715
        });
716
      }, this.options.slotsRefreshInterval);
717
    };
718

719
    nextRound();
1×
720
  }
721

722
  /**
723
   * Change cluster instance's status
724
   */
725
  private setStatus(status: ClusterStatus): void {
726
    debug("status: %s -> %s", this.status || "[empty]", status);
728×
727
    this.status = status;
728×
728
    process.nextTick(() => {
728×
729
      this.emit(status);
728×
730
    });
731
  }
732

733
  /**
734
   * Called when closed to check whether a reconnection should be made
735
   */
736
  private handleCloseEvent(reason?: Error): void {
737
    if (reason) {
114×
738
      debug("closed because %s", reason);
6×
739
    }
740

741
    let retryDelay: unknown;
742
    if (
114×
743
      !this.manuallyClosing &&
744
      typeof this.options.clusterRetryStrategy === "function"
745
    ) {
746
      retryDelay = this.options.clusterRetryStrategy.call(
23×
747
        this,
748
        ++this.retryAttempts,
749
        reason
750
      );
751
    }
752
    if (typeof retryDelay === "number") {
114×
753
      this.setStatus("reconnecting");
20×
754
      this.reconnectTimeout = setTimeout(() => {
20×
755
        this.reconnectTimeout = null;
18×
756
        debug("Cluster is disconnected. Retrying after %dms", retryDelay);
18×
757
        this.connect().catch(function (err) {
18×
758
          debug("Got error %s when reconnecting. Ignoring...", err);
6×
759
        });
760
      }, retryDelay);
761
    } else {
762
      this.setStatus("end");
94×
763
      this.flushQueue(new Error("None of startup nodes is available"));
94×
764
    }
765
  }
766

767
  /**
768
   * Flush offline queue with error.
769
   */
770
  private flushQueue(error: Error) {
771
    let item: OfflineQueueItem;
772
    while ((item = this.offlineQueue.shift())) {
94×
773
      item.command.reject(error);
5×
774
    }
775
  }
776

777
  private executeOfflineCommands() {
778
    if (this.offlineQueue.length) {
74×
779
      debug("send %d commands in offline queue", this.offlineQueue.length);
47×
780
      const offlineQueue = this.offlineQueue;
47×
781
      this.resetOfflineQueue();
47×
782
      let item: OfflineQueueItem;
783
      while ((item = offlineQueue.shift())) {
47×
784
        this.sendCommand(item.command, item.stream, item.node);
69×
785
      }
786
    }
787
  }
788

789
  private natMapper(nodeKey: NodeKey | RedisOptions): RedisOptions {
790
    if (this.options.natMap && typeof this.options.natMap === "object") {
231×
791
      const key =
792
        typeof nodeKey === "string"
10×
793
          ? nodeKey
794
          : `${nodeKey.host}:${nodeKey.port}`;
795
      const mapped = this.options.natMap[key];
10×
796
      if (mapped) {
10×
797
        debug("NAT mapping %s -> %O", key, mapped);
9×
798
        return Object.assign({}, mapped);
9×
799
      }
800
    }
801
    return typeof nodeKey === "string"
222×
802
      ? nodeKeyToRedisOptions(nodeKey)
803
      : nodeKey;
804
  }
805

806
  private getInfoFromNode(redis: Redis, callback: Callback<void>) {
807
    if (!redis) {
Branches [[97, 0]] missed. 133×
808
      return callback(new Error("Node is disconnected"));
!
809
    }
810

811
    // Use a duplication of the connection to avoid
812
    // timeouts when the connection is in the blocking
813
    // mode (e.g. waiting for BLPOP).
814
    const duplicatedConnection = redis.duplicate({
133×
815
      enableOfflineQueue: true,
816
      enableReadyCheck: false,
817
      retryStrategy: null,
818
      connectionName: getConnectionName(
819
        "refresher",
820
        this.options.redisOptions && this.options.redisOptions.connectionName
821
      ),
822
    });
823

824
    // Ignore error events since we will handle
825
    // exceptions for the CLUSTER SLOTS command.
826
    duplicatedConnection.on("error", noop);
133×
827

828
    duplicatedConnection.cluster(
133×
829
      "SLOTS",
830
      timeout((err: Error, result) => {
831
        duplicatedConnection.disconnect();
132×
832
        if (err) {
132×
833
          return callback(err);
21×
834
        }
835
        if (
111×
836
          this.status === "disconnecting" ||
837
          this.status === "close" ||
838
          this.status === "end"
839
        ) {
840
          debug(
8×
841
            "ignore CLUSTER.SLOTS results (count: %d) since cluster status is %s",
842
            result.length,
843
            this.status
844
          );
845
          callback();
8×
846
          return;
8×
847
        }
848
        const nodes: RedisOptions[] = [];
103×
849

850
        debug("cluster slots result count: %d", result.length);
103×
851

852
        for (let i = 0; i < result.length; ++i) {
103×
853
          const items = result[i];
207×
854
          const slotRangeStart = items[0];
207×
855
          const slotRangeEnd = items[1];
207×
856

857
          const keys = [];
207×
858
          for (let j = 2; j < items.length; j++) {
207×
859
            if (!items[j][0]) {
Branches [[102, 0]] missed. 218×
860
              continue;
!
861
            }
862
            const node = this.natMapper({
218×
863
              host: items[j][0],
864
              port: items[j][1],
865
            });
866
            node.readOnly = j !== 2;
218×
867
            nodes.push(node);
218×
868
            keys.push(node.host + ":" + node.port);
218×
869
          }
870

871
          debug(
207×
872
            "cluster slots result [%d]: slots %d~%d served by %s",
873
            i,
874
            slotRangeStart,
875
            slotRangeEnd,
876
            keys
877
          );
878

879
          for (let slot = slotRangeStart; slot <= slotRangeEnd; slot++) {
207×
880
            this.slots[slot] = keys;
1,596,712×
881
          }
882
        }
883

884
        // Assign to each node keys a numeric value to make autopipeline comparison faster.
885
        this._groupsIds = Object.create(null);
103×
886
        let j = 0;
103×
887
        for (let i = 0; i < 16384; i++) {
103×
888
          const target = (this.slots[i] || []).join(";");
1,687,552×
889

890
          if (!target.length) {
1,687,552×
891
            this._groupsBySlot[i] = undefined;
75,457×
892
            continue;
75,457×
893
          }
894

895
          if (!this._groupsIds[target]) {
1,612,095×
896
            this._groupsIds[target] = ++j;
198×
897
          }
898

899
          this._groupsBySlot[i] = this._groupsIds[target];
1,612,095×
900
        }
901

902
        this.connectionPool.reset(nodes);
103×
903
        callback();
103×
904
      }, this.options.slotsRefreshTimeout)
905
    );
906
  }
907

908
  private invokeReadyDelayedCallbacks(err?: Error) {
909
    for (const c of this._readyDelayedCallbacks) {
113×
910
      process.nextTick(c, err);
12×
911
    }
912

913
    this._readyDelayedCallbacks = [];
113×
914
  }
915

916
  /**
917
   * Check whether Cluster is able to process commands
918
   */
919
  private readyCheck(callback: Callback<void | "fail">): void {
920
    this.cluster("INFO", (err, res) => {
86×
921
      if (err) {
86×
922
        return callback(err);
10×
923
      }
924
      if (typeof res !== "string") {
76×
925
        return callback();
10×
926
      }
927

928
      let state: string;
929
      const lines = res.split("\r\n");
66×
930
      for (let i = 0; i < lines.length; ++i) {
66×
931
        const parts = lines[i].split(":");
66×
932
        if (parts[0] === "cluster_state") {
66×
933
          state = parts[1];
6×
934
          break;
6×
935
        }
936
      }
937

938
      if (state === "fail") {
66×
939
        debug("cluster state not ok (%s)", state);
4×
940
        callback(null, state);
4×
941
      } else {
942
        callback();
62×
943
      }
944
    });
945
  }
946

947
  private resolveSrv(hostname: string): Promise<RedisOptions> {
948
    return new Promise((resolve, reject) => {
1×
949
      this.options.resolveSrv(hostname, (err, records) => {
1×
950
        if (err) {
Branches [[110, 0]] missed. 1×
951
          return reject(err);
!
952
        }
953

954
        const self = this,
1×
955
          groupedRecords = groupSrvRecords(records),
1×
956
          sortedKeys = Object.keys(groupedRecords).sort(
1×
957
            (a, b) => parseInt(a) - parseInt(b)
!
958
          );
959

960
        function tryFirstOne(err?) {
961
          if (!sortedKeys.length) {
Branches [[111, 0]] missed. 1×
962
            return reject(err);
!
963
          }
964

965
          const key = sortedKeys[0],
1×
966
            group = groupedRecords[key],
1×
967
            record = weightSrvRecords(group);
1×
968

969
          if (!group.records.length) {
Branches [[112, 1]] missed. 1×
970
            sortedKeys.shift();
1×
971
          }
972

973
          self.dnsLookup(record.name).then(
1×
974
            (host) =>
975
              resolve({
1×
976
                host,
977
                port: record.port,
978
              }),
979
            tryFirstOne
980
          );
981
        }
982

983
        tryFirstOne();
1×
984
      });
985
    });
986
  }
987

988
  private dnsLookup(hostname: string): Promise<string> {
989
    return new Promise((resolve, reject) => {
7×
990
      this.options.dnsLookup(hostname, (err, address) => {
7×
991
        if (err) {
6×
992
          debug(
1×
993
            "failed to resolve hostname %s to IP: %s",
994
            hostname,
995
            err.message
996
          );
997
          reject(err);
1×
998
        } else {
999
          debug("resolved hostname %s to IP %s", hostname, address);
5×
1000
          resolve(address);
5×
1001
        }
1002
      });
1003
    });
1004
  }
1005

1006
  /**
1007
   * Normalize startup nodes, and resolving hostnames to IPs.
1008
   *
1009
   * This process happens every time when #connect() is called since
1010
   * #startupNodes and DNS records may chanage.
1011
   */
1012
  private async resolveStartupNodeHostnames(): Promise<RedisOptions[]> {
1013
    if (!Array.isArray(this.startupNodes) || this.startupNodes.length === 0) {
111×
1014
      throw new Error("`startupNodes` should contain at least one node.");
2×
1015
    }
1016
    const startupNodes = normalizeNodeOptions(this.startupNodes);
109×
1017

1018
    const hostnames = getUniqueHostnamesFromOptions(startupNodes);
109×
1019
    if (hostnames.length === 0) {
109×
1020
      return startupNodes;
102×
1021
    }
1022

1023
    const configs = await Promise.all(
7×
1024
      hostnames.map(
1025
        (this.options.useSRVRecords ? this.resolveSrv : this.dnsLookup).bind(
1026
          this
1027
        )
1028
      )
1029
    );
1030
    const hostnameToConfig = zipMap(hostnames, configs);
5×
1031

1032
    return startupNodes.map((node) => {
5×
1033
      const config = hostnameToConfig.get(node.host);
5×
1034
      if (!config) {
Branches [[118, 0]] missed. 5×
1035
        return node;
!
1036
      }
1037
      if (this.options.useSRVRecords) {
5×
1038
        return Object.assign({}, node, config);
1×
1039
      }
1040
      return Object.assign({}, node, { host: config });
4×
1041
    });
1042
  }
1043

1044
  private createScanStream(
1045
    command: string,
1046
    { key, options = {} }: { key?: string; options?: ScanStreamOptions }
1047
  ) {
1048
    return new ScanStream({
1×
1049
      objectMode: true,
1050
      key: key,
1051
      redis: this,
1052
      command: command,
1053
      ...options,
1054
    });
1055
  }
1056
}
1057

1058
interface Cluster extends EventEmitter {}
1059
applyMixin(Cluster, EventEmitter);
1×
1060

1061
addTransactionSupport(Cluster.prototype);
1×
1062
interface Cluster extends Transaction {}
1063

1064
export default Cluster;
1×
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2023 Coveralls, Inc