Coveralls logob
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

luin / ioredis / 1984806686

15 Mar 2022 - 5:12 coverage: 91.617% (-0.03%) from 91.645%
1984806686

Pull #1532

github

GitHub
Merge 3bcd4579b into decf10cdd
Pull Request #1532: feat: improve typings for smismember

1087 of 1239 branches covered (87.73%)

Branch coverage included in aggregate %.

2115 of 2256 relevant lines covered (93.75%)

4064.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.0
/lib/cluster/index.ts
1
import { exists, hasFlag } from "@ioredis/commands";
1×
2
import { EventEmitter } from "events";
1×
3
import { AbortError, RedisError } from "redis-errors";
1×
4
import asCallback from "standard-as-callback";
1×
5
import Command from "../Command";
1×
6
import ClusterAllFailedError from "../errors/ClusterAllFailedError";
1×
7
import Pipeline from "../Pipeline";
8
import Redis from "../Redis";
1×
9
import ScanStream from "../ScanStream";
1×
10
import { addTransactionSupport, Transaction } from "../transaction";
1×
11
import { Callback, ScanStreamOptions, WriteableStream } from "../types";
12
import {
1×
13
  CONNECTION_CLOSED_ERROR_MSG,
14
  Debug,
15
  defaults,
16
  noop,
17
  sample,
18
  shuffle,
19
  timeout,
20
  zipMap,
21
} from "../utils";
22
import applyMixin from "../utils/applyMixin";
1×
23
import Commander from "../utils/Commander";
1×
24
import { ClusterOptions, DEFAULT_CLUSTER_OPTIONS } from "./ClusterOptions";
1×
25
import ClusterSubscriber from "./ClusterSubscriber";
1×
26
import ConnectionPool from "./ConnectionPool";
1×
27
import DelayQueue from "./DelayQueue";
1×
28
import {
1×
29
  getConnectionName,
30
  getUniqueHostnamesFromOptions,
31
  groupSrvRecords,
32
  NodeKey,
33
  nodeKeyToRedisOptions,
34
  NodeRole,
35
  normalizeNodeOptions,
36
  RedisOptions,
37
  weightSrvRecords,
38
} from "./util";
39
import Deque = require("denque");
1×
40

41
const debug = Debug("cluster");
1×
42

43
const REJECT_OVERWRITTEN_COMMANDS = new WeakSet<Command>();
1×
44

45
type OfflineQueueItem = {
46
  command: Command;
47
  stream: WriteableStream;
48
  node: unknown;
49
};
50

51
type ClusterStatus =
52
  | "end"
53
  | "close"
54
  | "wait"
55
  | "connecting"
56
  | "connect"
57
  | "ready"
58
  | "reconnecting"
59
  | "disconnecting";
60

61
/**
62
 * Client for the official Redis Cluster
63
 */
64
class Cluster extends Commander {
65
  options: ClusterOptions;
66
  slots: NodeKey[][] = [];
104×
67
  status: ClusterStatus;
68

69
  /**
70
   * @ignore
71
   */
72
  _groupsIds: { [key: string]: number } = {};
104×
73

74
  /**
75
   * @ignore
76
   */
77
  _groupsBySlot: number[] = Array(16384);
104×
78

79
  /**
80
   * @ignore
81
   */
82
  isCluster = true;
104×
83

84
  private startupNodes: (string | number | object)[];
85
  private connectionPool: ConnectionPool;
86
  private manuallyClosing: boolean;
87
  private retryAttempts = 0;
104×
88
  private delayQueue: DelayQueue = new DelayQueue();
104×
89
  private offlineQueue = new Deque<OfflineQueueItem>();
104×
90
  private subscriber: ClusterSubscriber;
91
  private slotsTimer: NodeJS.Timer;
92
  private reconnectTimeout: NodeJS.Timer;
93
  private isRefreshing = false;
104×
94
  private _autoPipelines: Map<string, typeof Pipeline> = new Map();
104×
95
  private _runningAutoPipelines: Set<string> = new Set();
104×
96
  private _readyDelayedCallbacks: Callback[] = [];
104×
97

98
  /**
99
   * Every time Cluster#connect() is called, this value will be
100
   * auto-incrementing. The purpose of this value is used for
101
   * discarding previous connect attempts when creating a new
102
   * connection.
103
   */
104
  private connectionEpoch = 0;
104×
105

106
  /**
107
   * Creates an instance of Cluster.
108
   */
109
  constructor(
110
    startupNodes: (string | number | object)[],
111
    options: ClusterOptions = {}
112
  ) {
113
    super();
104×
114
    EventEmitter.call(this);
104×
115

116
    this.startupNodes = startupNodes;
104×
117
    this.options = defaults({}, options, DEFAULT_CLUSTER_OPTIONS, this.options);
104×
118

119
    if (
104×
120
      this.options.redisOptions &&
121
      this.options.redisOptions.keyPrefix &&
122
      !this.options.keyPrefix
123
    ) {
124
      this.options.keyPrefix = this.options.redisOptions.keyPrefix;
1×
125
    }
126

127
    // validate options
128
    if (
104×
129
      typeof this.options.scaleReads !== "function" &&
130
      ["all", "master", "slave"].indexOf(this.options.scaleReads) === -1
131
    ) {
132
      throw new Error(
1×
133
        'Invalid option scaleReads "' +
134
          this.options.scaleReads +
135
          '". Expected "all", "master", "slave" or a custom function'
136
      );
137
    }
138

139
    this.connectionPool = new ConnectionPool(this.options.redisOptions);
103×
140

141
    this.connectionPool.on("-node", (redis, key) => {
103×
142
      this.emit("-node", redis);
222×
143
    });
144
    this.connectionPool.on("+node", (redis) => {
103×
145
      this.emit("+node", redis);
224×
146
    });
147
    this.connectionPool.on("drain", () => {
103×
148
      this.setStatus("close");
217×
149
    });
150
    this.connectionPool.on("nodeError", (error, key) => {
103×
151
      this.emit("node error", error, key);
!
152
    });
153

154
    this.subscriber = new ClusterSubscriber(this.connectionPool, this);
103×
155

156
    if (this.options.scripts) {
103×
157
      Object.entries(this.options.scripts).forEach(([name, definition]) => {
1×
158
        this.defineCommand(name, definition);
2×
159
      });
160
    }
161

162
    if (this.options.lazyConnect) {
103×
163
      this.setStatus("wait");
9×
164
    } else {
165
      this.connect().catch((err) => {
94×
166
        debug("connecting failed: %s", err);
13×
167
      });
168
    }
169
  }
170

171
  /**
   * Connect to a cluster.
   *
   * Rejects immediately when the status is already "connecting",
   * "connect" or "ready". Otherwise resolves the startup node hostnames,
   * resets the connection pool with them and refreshes the slot cache.
   * The returned promise resolves once the instance reaches "ready" and
   * rejects when a "close" event arrives before the first successful
   * refresh, or when hostname resolution fails.
   */
  connect(): Promise<void> {
    return new Promise((resolve, reject) => {
      const alreadyActive =
        this.status === "connecting" ||
        this.status === "connect" ||
        this.status === "ready";
      if (alreadyActive) {
        reject(new Error("Redis is already connecting/connected"));
        return;
      }

      // Remember which connect() call this is so that a newer call can
      // invalidate it while hostnames are still being resolved.
      const epoch = ++this.connectionEpoch;
      this.setStatus("connecting");

      this.resolveStartupNodeHostnames()
        .then((nodes) => {
          if (this.connectionEpoch !== epoch) {
            debug(
              "discard connecting after resolving startup nodes because epoch not match: %d != %d",
              epoch,
              this.connectionEpoch
            );
            reject(
              new RedisError(
                "Connection is discarded because a new connection is made"
              )
            );
            return;
          }
          if (this.status !== "connecting") {
            debug(
              "discard connecting after resolving startup nodes because the status changed to %s",
              this.status
            );
            reject(new RedisError("Connection is aborted"));
            return;
          }
          this.connectionPool.reset(nodes);

          const markReady = () => {
            this.setStatus("ready");
            this.retryAttempts = 0;
            this.executeOfflineCommands();
            this.resetNodesRefreshInterval();
            resolve();
          };

          // Fires when the pool closes before any refresh succeeded.
          const closeListener = () => {
            const error = new Error("None of startup nodes is available");

            this.removeListener("refresh", refreshListener);
            this.invokeReadyDelayedCallbacks(error);
            reject(error);
          };

          // Fires on the first successful slot refresh.
          const refreshListener = () => {
            this.invokeReadyDelayedCallbacks(undefined);
            this.removeListener("close", closeListener);
            this.manuallyClosing = false;
            this.setStatus("connect");

            if (!this.options.enableReadyCheck) {
              markReady();
              return;
            }

            this.readyCheck((err, fail) => {
              if (!(err || fail)) {
                markReady();
                return;
              }
              debug("Ready check failed (%s). Reconnecting...", err || fail);
              if (this.status === "connect") {
                this.disconnect(true);
              }
            });
          };

          this.once("refresh", refreshListener);
          this.once("close", closeListener);
          this.once("close", this.handleCloseEvent.bind(this));

          this.refreshSlotsCache((err) => {
            if (err && err.message === ClusterAllFailedError.defaultMessage) {
              Redis.prototype.silentEmit.call(this, "error", err);
              this.connectionPool.reset([]);
            }
          });
          this.subscriber.start();
        })
        .catch((err) => {
          this.setStatus("close");
          this.handleCloseEvent(err);
          this.invokeReadyDelayedCallbacks(err);
          reject(err);
        });
    });
  }
274

275
  /**
   * Disconnect from every node in the cluster.
   *
   * @param reconnect When false (the default) the close is marked as
   * manual and any pending reconnect timer is cancelled.
   */
  disconnect(reconnect = false) {
    const previousStatus = this.status;
    this.setStatus("disconnecting");

    if (!reconnect) {
      this.manuallyClosing = true;
      if (this.reconnectTimeout) {
        clearTimeout(this.reconnectTimeout);
        this.reconnectTimeout = null;
        debug("Canceled reconnecting attempts");
      }
    }
    this.clearNodesRefreshInterval();

    this.subscriber.stop();

    if (previousStatus !== "wait") {
      this.connectionPool.reset([]);
      return;
    }

    // Status was "wait": move straight to "close" and run the close handling here.
    this.setStatus("close");
    this.handleCloseEvent();
  }
300

301
  /**
   * Quit the cluster gracefully.
   *
   * Sends QUIT to every known node and resolves with "OK" once all of
   * them have answered. A node failing with the "connection closed"
   * error is treated as success; any other error is propagated.
   *
   * @param callback Optional node-style callback receiving the result.
   */
  quit(callback?: Callback<"OK">): Promise<"OK"> {
    const previousStatus = this.status;
    this.setStatus("disconnecting");

    this.manuallyClosing = true;

    if (this.reconnectTimeout) {
      clearTimeout(this.reconnectTimeout);
      this.reconnectTimeout = null;
    }
    this.clearNodesRefreshInterval();

    this.subscriber.stop();

    if (previousStatus === "wait") {
      const ret = asCallback(Promise.resolve<"OK">("OK"), callback);

      // use setImmediate to make sure "close" event
      // being emitted after quit() is returned
      setImmediate(() => {
        this.setStatus("close");
        this.handleCloseEvent();
      });

      return ret;
    }

    return asCallback(
      Promise.all(
        this.nodes().map((node) =>
          node.quit().catch((err) => {
            // Ignore the error caused by disconnecting since
            // we're disconnecting...
            if (err.message !== CONNECTION_CLOSED_ERROR_MSG) {
              throw err;
            }
            return "OK";
          })
        )
      ).then(() => "OK"),
      callback
    );
  }
348

349
  /**
   * Create a new instance with the same startup nodes and options as the current one.
   *
   * A non-empty `overrideStartupNodes` replaces the startup nodes;
   * otherwise a copy of the current list is used. `overrideOptions` is
   * merged over a copy of the current options.
   *
   * @example
   * ```js
   * var cluster = new Redis.Cluster([{ host: "127.0.0.1", port: "30001" }]);
   * var anotherCluster = cluster.duplicate();
   * ```
   */
  duplicate(overrideStartupNodes = [], overrideOptions = {}) {
    const hasOverrideNodes = overrideStartupNodes.length > 0;
    const startupNodes = hasOverrideNodes
      ? overrideStartupNodes
      : this.startupNodes.slice(0);
    return new Cluster(
      startupNodes,
      Object.assign({}, this.options, overrideOptions)
    );
  }
366

367
  /**
   * Get nodes with the specified role.
   *
   * @param role "all" (default), "master" or "slave".
   * @throws Error when `role` is none of the accepted values.
   */
  nodes(role: NodeRole = "all"): Redis[] {
    switch (role) {
      case "all":
      case "master":
      case "slave":
        return this.connectionPool.getNodes(role);
      default:
        throw new Error(
          'Invalid role "' + role + '". Expected "all", "master" or "slave"'
        );
    }
  }
378

379
  /**
   * Queue a callback that is invoked (with an optional error) the next
   * time the delayed callbacks are flushed.
   *
   * This is needed in order not to install a listener for each auto pipeline
   *
   * @ignore
   */
  delayUntilReady(callback: Callback) {
    const pending = this._readyDelayedCallbacks;
    pending.push(callback);
  }
387

388
  /**
   * Get the number of commands queued in automatic pipelines.
   *
   * This is not available (and returns 0) until the cluster is connected and slots information have been received.
   */
  get autoPipelineQueueSize(): number {
    let total = 0;

    // Sum the length of every registered auto pipeline.
    this._autoPipelines.forEach((pipeline) => {
      total += pipeline.length;
    });

    return total;
  }
402

403
  /**
   * Refresh the slot cache.
   *
   * Tries the known nodes one by one, in random order, until one of them
   * returns the slot information. If a refresh is already running the
   * callback, when given, is scheduled on the next tick and nothing else
   * happens. When every node fails the callback receives a
   * ClusterAllFailedError carrying the last node error.
   *
   * @ignore
   */
  refreshSlotsCache(callback?: Callback<void>): void {
    if (this.isRefreshing) {
      if (callback) {
        process.nextTick(callback);
      }
      return;
    }
    this.isRefreshing = true;

    // Clears the in-progress flag before reporting the outcome.
    const finish = (error?: Error) => {
      this.isRefreshing = false;
      if (callback) {
        callback(error);
      }
    };

    const candidates = shuffle(this.connectionPool.getNodes());

    let lastNodeError = null;

    const tryNode = (index: number) => {
      if (index === candidates.length) {
        // Every candidate has been tried without success.
        const error = new ClusterAllFailedError(
          ClusterAllFailedError.defaultMessage,
          lastNodeError
        );
        return finish(error);
      }

      const node = candidates[index];
      const key = `${node.options.host}:${node.options.port}`;
      debug("getting slot cache from %s", key);

      this.getInfoFromNode(node, (err) => {
        if (this.status === "close" || this.status === "end") {
          return finish(new Error("Cluster is disconnected."));
        }
        if (this.status === "disconnecting") {
          return finish(new Error("Cluster is disconnecting."));
        }

        if (err) {
          this.emit("node error", err, key);
          lastNodeError = err;
          tryNode(index + 1);
          return;
        }

        this.emit("refresh");
        finish();
      });
    };

    tryNode(0);
  }
461

462
  /**
   * Send a command to the appropriate node of the cluster.
   *
   * The target is picked from the command's slot and the `scaleReads`
   * option; commands that are not read-only always go to a master.
   * On first use the command's `reject` is wrapped so that cluster
   * errors (MOVED, ASK, TRYAGAIN, CLUSTERDOWN, connection closed) are
   * passed to `handleError` and retried rather than surfaced
   * immediately. When no connection is available the command is put on
   * the offline queue, or rejected if `enableOfflineQueue` is disabled.
   *
   * @param command The command to send.
   * @param stream Optional stream forwarded to the node's sendCommand.
   * @param node Optional routing hint; its `slot` and `redis` fields are read here and `redis` may be filled in.
   * @returns The command's promise.
   *
   * @ignore
   */
  sendCommand(command: Command, stream?: WriteableStream, node?: any): unknown {
    if (this.status === "wait") {
      this.connect().catch(noop);
    }
    if (this.status === "end") {
      command.reject(new Error(CONNECTION_CLOSED_ERROR_MSG));
      return command.promise;
    }
    let to = this.options.scaleReads;
    if (to !== "master") {
      const isCommandReadOnly =
        command.isReadOnly ||
        (exists(command.name) && hasFlag(command.name, "readonly"));
      if (!isCommandReadOnly) {
        to = "master";
      }
    }

    let targetSlot = node ? node.slot : command.getSlot();
    // Redirection budget shared by every retry of this command.
    const ttl = {};
    const _this = this;
    if (!node && !REJECT_OVERWRITTEN_COMMANDS.has(command)) {
      // Wrap reject only once per command, even if it is re-sent.
      REJECT_OVERWRITTEN_COMMANDS.add(command);

      const reject = command.reject;
      command.reject = function (err) {
        const partialTry = tryConnection.bind(null, true);
        _this.handleError(err, ttl, {
          moved: function (slot, key) {
            debug("command %s is moved to %s", command.name, key);
            targetSlot = Number(slot);
            if (_this.slots[slot]) {
              _this.slots[slot][0] = key;
            } else {
              _this.slots[slot] = [key];
            }
            _this._groupsBySlot[slot] =
              _this._groupsIds[_this.slots[slot].join(";")];
            _this.connectionPool.findOrCreate(_this.natMapper(key));
            tryConnection();
            debug("refreshing slot caches... (triggered by MOVED error)");
            _this.refreshSlotsCache();
          },
          ask: function (slot, key) {
            // `key` is already "host:port", so a single placeholder is enough.
            debug("command %s is required to ask %s", command.name, key);
            const mapped = _this.natMapper(key);
            _this.connectionPool.findOrCreate(mapped);
            tryConnection(false, `${mapped.host}:${mapped.port}`);
          },
          tryagain: partialTry,
          clusterDown: partialTry,
          connectionClosed: partialTry,
          maxRedirections: function (redirectionError) {
            reject.call(command, redirectionError);
          },
          defaults: function () {
            reject.call(command, err);
          },
        });
      };
    }
    tryConnection();

    // Pick a connection and send the command; `random` skips slot-based
    // routing, `asking` forces the given node key and sends ASKING first.
    function tryConnection(random?: boolean, asking?: string) {
      if (_this.status === "end") {
        command.reject(new AbortError("Cluster is ended."));
        return;
      }
      let redis;
      if (_this.status === "ready" || command.name === "cluster") {
        if (node && node.redis) {
          redis = node.redis;
        } else if (
          Command.checkFlag("ENTER_SUBSCRIBER_MODE", command.name) ||
          Command.checkFlag("EXIT_SUBSCRIBER_MODE", command.name)
        ) {
          redis = _this.subscriber.getInstance();
          if (!redis) {
            command.reject(new AbortError("No subscriber for the cluster"));
            return;
          }
        } else {
          if (!random) {
            if (typeof targetSlot === "number" && _this.slots[targetSlot]) {
              const nodeKeys = _this.slots[targetSlot];
              if (typeof to === "function") {
                const nodes = nodeKeys.map(function (key) {
                  return _this.connectionPool.getInstanceByKey(key);
                });
                redis = to(nodes, command);
                if (Array.isArray(redis)) {
                  redis = sample(redis);
                }
                if (!redis) {
                  redis = nodes[0];
                }
              } else {
                let key;
                if (to === "all") {
                  key = sample(nodeKeys);
                } else if (to === "slave" && nodeKeys.length > 1) {
                  key = sample(nodeKeys, 1);
                } else {
                  key = nodeKeys[0];
                }
                redis = _this.connectionPool.getInstanceByKey(key);
              }
            }
            if (asking) {
              redis = _this.connectionPool.getInstanceByKey(asking);
              redis.asking();
            }
          }
          if (!redis) {
            redis =
              (typeof to === "function"
                ? null
                : _this.connectionPool.getSampleInstance(to)) ||
              _this.connectionPool.getSampleInstance("all");
          }
        }
        if (node && !node.redis) {
          node.redis = redis;
        }
      }
      if (redis) {
        redis.sendCommand(command, stream);
      } else if (_this.options.enableOfflineQueue) {
        _this.offlineQueue.push({
          command: command,
          stream: stream,
          node: node,
        });
      } else {
        command.reject(
          new Error(
            "Cluster isn't ready and enableOfflineQueue options is false"
          )
        );
      }
    }
    return command.promise;
  }
608

609
  /** Create a scan stream that iterates `key` with the "sscan" command. */
  sscanStream(key: string, options?: ScanStreamOptions) {
    return this.createScanStream("sscan", { key: key, options: options });
  }

  /** Create a scan stream that iterates `key` with the "sscanBuffer" command. */
  sscanBufferStream(key: string, options?: ScanStreamOptions) {
    return this.createScanStream("sscanBuffer", { key: key, options: options });
  }

  /** Create a scan stream that iterates `key` with the "hscan" command. */
  hscanStream(key: string, options?: ScanStreamOptions) {
    return this.createScanStream("hscan", { key: key, options: options });
  }

  /** Create a scan stream that iterates `key` with the "hscanBuffer" command. */
  hscanBufferStream(key: string, options?: ScanStreamOptions) {
    return this.createScanStream("hscanBuffer", { key: key, options: options });
  }

  /** Create a scan stream that iterates `key` with the "zscan" command. */
  zscanStream(key: string, options?: ScanStreamOptions) {
    return this.createScanStream("zscan", { key: key, options: options });
  }

  /** Create a scan stream that iterates `key` with the "zscanBuffer" command. */
  zscanBufferStream(key: string, options?: ScanStreamOptions) {
    return this.createScanStream("zscanBuffer", { key: key, options: options });
  }
632

633
  /**
   * Dispatch a command error to the matching handler.
   *
   * `ttl.value` is initialised from `maxRedirections` on the first call
   * and decremented on each later call; once it reaches zero or below,
   * `handlers.maxRedirections` is invoked and nothing else is tried.
   * Otherwise the first word of the error message selects the handler
   * (MOVED, ASK, TRYAGAIN, CLUSTERDOWN), a closed-connection error while
   * "ready" is retried after `retryDelayOnFailover`, and everything else
   * goes to `handlers.defaults`.
   *
   * @param error The error the command was rejected with.
   * @param ttl Mutable holder of the remaining redirections.
   * @param handlers Callbacks keyed by error kind.
   *
   * @ignore
   */
  handleError(error: Error, ttl: { value?: any }, handlers) {
    ttl.value =
      typeof ttl.value === "undefined"
        ? this.options.maxRedirections
        : ttl.value - 1;

    if (ttl.value <= 0) {
      handlers.maxRedirections(
        new Error("Too many Cluster redirections. Last error: " + error)
      );
      return;
    }

    const [kind, slot, nodeKey] = error.message.split(" ");

    if (kind === "MOVED") {
      const movedDelay = this.options.retryDelayOnMoved;
      if (movedDelay && typeof movedDelay === "number") {
        this.delayQueue.push("moved", handlers.moved.bind(null, slot, nodeKey), {
          timeout: movedDelay,
        });
      } else {
        handlers.moved(slot, nodeKey);
      }
      return;
    }

    if (kind === "ASK") {
      handlers.ask(slot, nodeKey);
      return;
    }

    if (kind === "TRYAGAIN") {
      this.delayQueue.push("tryagain", handlers.tryagain, {
        timeout: this.options.retryDelayOnTryAgain,
      });
      return;
    }

    if (kind === "CLUSTERDOWN" && this.options.retryDelayOnClusterDown > 0) {
      this.delayQueue.push("clusterdown", handlers.connectionClosed, {
        timeout: this.options.retryDelayOnClusterDown,
        callback: this.refreshSlotsCache.bind(this),
      });
      return;
    }

    if (
      error.message === CONNECTION_CLOSED_ERROR_MSG &&
      this.options.retryDelayOnFailover > 0 &&
      this.status === "ready"
    ) {
      this.delayQueue.push("failover", handlers.connectionClosed, {
        timeout: this.options.retryDelayOnFailover,
        callback: this.refreshSlotsCache.bind(this),
      });
      return;
    }

    handlers.defaults();
  }
687

688
  /** Replace the offline queue with a fresh, empty one. */
  private resetOfflineQueue() {
    this.offlineQueue = new Deque<OfflineQueueItem>();
  }
691

692
  /** Cancel the pending periodic slot refresh timer, if there is one. */
  private clearNodesRefreshInterval() {
    if (!this.slotsTimer) {
      return;
    }
    clearTimeout(this.slotsTimer);
    this.slotsTimer = null;
  }
698

699
  /**
   * Start the periodic slot refresh driven by `slotsRefreshInterval`.
   * Does nothing when a timer is already pending or the option is unset.
   * Each round schedules the next one only after its refresh called back.
   */
  private resetNodesRefreshInterval() {
    const alreadyScheduled = Boolean(this.slotsTimer);
    if (alreadyScheduled || !this.options.slotsRefreshInterval) {
      return;
    }

    const scheduleNext = () => {
      const onTimer = () => {
        debug(
          'refreshing slot caches... (triggered by "slotsRefreshInterval" option)'
        );
        this.refreshSlotsCache(() => {
          scheduleNext();
        });
      };
      this.slotsTimer = setTimeout(onTimer, this.options.slotsRefreshInterval);
    };

    scheduleNext();
  }
716

717
  /**
   * Change cluster instance's status.
   *
   * The new status is stored synchronously; the event named after it is
   * emitted on the next tick.
   */
  private setStatus(status: ClusterStatus): void {
    const previous = this.status || "[empty]";
    debug("status: %s -> %s", previous, status);
    this.status = status;
    process.nextTick(() => {
      this.emit(status);
    });
  }
727

728
  /**
   * Called when closed to check whether a reconnection should be made.
   *
   * Unless the close was manual, `clusterRetryStrategy` is asked for a
   * delay. A numeric delay schedules a reconnect; anything else ends the
   * instance and rejects every command in the offline queue.
   *
   * @param reason Optional error that caused the close.
   */
  private handleCloseEvent(reason?: Error): void {
    if (reason) {
      debug("closed because %s", reason);
    }

    let retryDelay: unknown;
    if (
      !this.manuallyClosing &&
      typeof this.options.clusterRetryStrategy === "function"
    ) {
      retryDelay = this.options.clusterRetryStrategy.call(
        this,
        ++this.retryAttempts,
        reason
      );
    }

    if (typeof retryDelay !== "number") {
      this.setStatus("end");
      this.flushQueue(new Error("None of startup nodes is available"));
      return;
    }

    this.setStatus("reconnecting");
    this.reconnectTimeout = setTimeout(() => {
      this.reconnectTimeout = null;
      debug("Cluster is disconnected. Retrying after %dms", retryDelay);
      this.connect().catch(function (err) {
        debug("Got error %s when reconnecting. Ignoring...", err);
      });
    }, retryDelay);
  }
761

762
  /**
   * Flush offline queue with error.
   *
   * Every queued command is removed and rejected with `error`.
   */
  private flushQueue(error: Error) {
    for (
      let item: OfflineQueueItem = this.offlineQueue.shift();
      item;
      item = this.offlineQueue.shift()
    ) {
      item.command.reject(error);
    }
  }
771

772
  /**
   * Re-send every command waiting in the offline queue.
   *
   * The queue is swapped for a fresh one first, so commands that get
   * queued again while draining land in the new queue.
   */
  private executeOfflineCommands() {
    if (!this.offlineQueue.length) {
      return;
    }
    debug("send %d commands in offline queue", this.offlineQueue.length);

    const pending = this.offlineQueue;
    this.resetOfflineQueue();

    for (
      let item: OfflineQueueItem = pending.shift();
      item;
      item = pending.shift()
    ) {
      this.sendCommand(item.command, item.stream, item.node);
    }
  }
783

784
  /**
   * Translate a node address through the `natMap` option.
   *
   * @param nodeKey Either a node key string or an options object with host and port.
   * @returns A copy of the mapped entry when `natMap` has one for the
   * address; otherwise the options parsed from the key string, or the
   * options object that was passed in.
   */
  private natMapper(nodeKey: NodeKey | RedisOptions): RedisOptions {
    if (this.options.natMap && typeof this.options.natMap === "object") {
      let key: string;
      if (typeof nodeKey === "string") {
        key = nodeKey;
      } else {
        key = `${nodeKey.host}:${nodeKey.port}`;
      }

      const mapped = this.options.natMap[key];
      if (mapped) {
        debug("NAT mapping %s -> %O", key, mapped);
        return Object.assign({}, mapped);
      }
    }

    if (typeof nodeKey === "string") {
      return nodeKeyToRedisOptions(nodeKey);
    }
    return nodeKey;
  }
800

801
  /**
   * Ask one node for CLUSTER SLOTS and rebuild the local slot tables.
   *
   * On success `slots`, `_groupsIds` and `_groupsBySlot` are updated and
   * the connection pool is reset to the nodes found in the reply. The
   * reply is ignored when the cluster is disconnecting, closed or ended
   * by the time it arrives.
   *
   * @param redis The node to query.
   * @param callback Called with an error when the query failed, otherwise without arguments.
   */
  private getInfoFromNode(redis: Redis, callback: Callback<void>) {
    if (!redis) {
      return callback(new Error("Node is disconnected"));
    }

    // Use a duplication of the connection to avoid
    // timeouts when the connection is in the blocking
    // mode (e.g. waiting for BLPOP).
    const refresher = redis.duplicate({
      enableOfflineQueue: true,
      enableReadyCheck: false,
      retryStrategy: null,
      connectionName: getConnectionName(
        "refresher",
        this.options.redisOptions && this.options.redisOptions.connectionName
      ),
    });

    // Ignore error events since we will handle
    // exceptions for the CLUSTER SLOTS command.
    refresher.on("error", noop);

    refresher.cluster(
      "SLOTS",
      timeout((err: Error, result) => {
        refresher.disconnect();
        if (err) {
          return callback(err);
        }

        const isShuttingDown =
          this.status === "disconnecting" ||
          this.status === "close" ||
          this.status === "end";
        if (isShuttingDown) {
          debug(
            "ignore CLUSTER.SLOTS results (count: %d) since cluster status is %s",
            result.length,
            this.status
          );
          callback();
          return;
        }

        const nodes: RedisOptions[] = [];

        debug("cluster slots result count: %d", result.length);

        for (let rangeIndex = 0; rangeIndex < result.length; ++rangeIndex) {
          const items = result[rangeIndex];
          const slotRangeStart = items[0];
          const slotRangeEnd = items[1];

          // Entries from index 2 onwards describe the nodes of this range.
          const keys = [];
          for (let nodeIndex = 2; nodeIndex < items.length; nodeIndex++) {
            if (!items[nodeIndex][0]) {
              continue;
            }
            const node = this.natMapper({
              host: items[nodeIndex][0],
              port: items[nodeIndex][1],
            });
            // Only the first listed node is treated as writable.
            node.readOnly = nodeIndex !== 2;
            nodes.push(node);
            keys.push(node.host + ":" + node.port);
          }

          debug(
            "cluster slots result [%d]: slots %d~%d served by %s",
            rangeIndex,
            slotRangeStart,
            slotRangeEnd,
            keys
          );

          for (let slot = slotRangeStart; slot <= slotRangeEnd; slot++) {
            this.slots[slot] = keys;
          }
        }

        // Assign to each node keys a numeric value to make autopipeline comparison faster.
        this._groupsIds = Object.create(null);
        let lastGroupId = 0;
        for (let slot = 0; slot < 16384; slot++) {
          const target = (this.slots[slot] || []).join(";");

          if (!target.length) {
            this._groupsBySlot[slot] = undefined;
            continue;
          }

          if (!this._groupsIds[target]) {
            this._groupsIds[target] = ++lastGroupId;
          }

          this._groupsBySlot[slot] = this._groupsIds[target];
        }

        this.connectionPool.reset(nodes);
        callback();
      }, this.options.slotsRefreshTimeout)
    );
  }
902

903
  /**
   * Schedule every delayed callback on the next tick, passing `err`,
   * and then empty the list.
   */
  private invokeReadyDelayedCallbacks(err?: Error) {
    this._readyDelayedCallbacks.forEach((delayed) => {
      process.nextTick(delayed, err);
    });

    this._readyDelayedCallbacks = [];
  }
910

911
  /**
   * Check whether Cluster is able to process commands.
   *
   * Calls `this.cluster("INFO", ...)` and scans the reply for the
   * `cluster_state` field.
   *
   * @param callback Receives the error when the command fails, `(null, "fail")`
   * when the reported state is `"fail"`, and no arguments otherwise
   * (including when the reply is not a string).
   */
  private readyCheck(callback: Callback<void | "fail">): void {
    this.cluster("INFO", (err, res) => {
      if (err) {
        return callback(err);
      }
      if (typeof res !== "string") {
        return callback();
      }

      // The reply is a list of "field:value" lines separated by CRLF; only
      // the first `cluster_state` line is considered.
      let state: string;
      for (const line of res.split("\r\n")) {
        const [field, value] = line.split(":");
        if (field === "cluster_state") {
          state = value;
          break;
        }
      }

      if (state !== "fail") {
        callback();
        return;
      }

      debug("cluster state not ok (%s)", state);
      callback(null, state);
    });
  }
941

942
  /**
   * Resolve `hostname` through SRV records into a host/port pair.
   *
   * The records returned by `options.resolveSrv` are grouped with
   * `groupSrvRecords`; group keys are tried in ascending numeric order. For
   * the current group one record is picked with `weightSrvRecords` and its
   * name is resolved with `dnsLookup`. When that lookup fails, the next
   * candidate is tried.
   *
   * @param hostname Name passed to `options.resolveSrv`.
   * @returns A promise resolving to `{ host, port }` for the first record whose
   * name could be resolved.
   * Rejects with the resolver error, with the last lookup error once every
   * candidate has failed, or with a descriptive `Error` when there was no
   * candidate to try at all.
   */
  private resolveSrv(hostname: string): Promise<RedisOptions> {
    return new Promise((resolve, reject) => {
      this.options.resolveSrv(hostname, (err, records) => {
        if (err) {
          return reject(err);
        }

        const groupedRecords = groupSrvRecords(records);
        const sortedKeys = Object.keys(groupedRecords).sort(
          (a, b) => parseInt(a) - parseInt(b)
        );

        const tryFirstOne = (lastError?: Error): void => {
          if (!sortedKeys.length) {
            // Never reject with `undefined`: with no candidates at all there
            // is no lookup error to forward, so build an explicit one.
            return reject(
              lastError ||
                new Error("No SRV records could be resolved for " + hostname)
            );
          }

          const key = sortedKeys[0];
          const group = groupedRecords[key];
          // NOTE(review): presumably removes the chosen record from
          // `group.records` — confirm in ./util.
          const record = weightSrvRecords(group);

          // Drop the group once it has no records left so the next attempt
          // moves on to the following key.
          if (!group.records.length) {
            sortedKeys.shift();
          }

          this.dnsLookup(record.name).then(
            (host) =>
              resolve({
                host,
                port: record.port,
              }),
            tryFirstOne
          );
        };

        tryFirstOne();
      });
    });
  }
982

983
  /**
   * Promise wrapper around the callback-style `options.dnsLookup`.
   *
   * @param hostname Name handed to `options.dnsLookup`.
   * @returns A promise resolving to the address reported by the lookup, or
   * rejecting with the lookup error. Both outcomes are logged through `debug`.
   */
  private dnsLookup(hostname: string): Promise<string> {
    return new Promise((resolve, reject) => {
      this.options.dnsLookup(hostname, (err, address) => {
        if (!err) {
          debug("resolved hostname %s to IP %s", hostname, address);
          resolve(address);
          return;
        }

        debug(
          "failed to resolve hostname %s to IP: %s",
          hostname,
          err.message
        );
        reject(err);
      });
    });
  }
1000

1001
  /**
   * Normalize the startup nodes and resolve their hostnames to IPs.
   *
   * This process happens every time #connect() is called, since
   * #startupNodes and DNS records may change.
   *
   * @returns The normalized startup nodes, with each resolved hostname replaced
   * by its lookup result (an address, or host and port when SRV records are
   * used).
   * @throws Error when `startupNodes` is not a non-empty array.
   */
  private async resolveStartupNodeHostnames(): Promise<RedisOptions[]> {
    const configured = this.startupNodes;
    if (!Array.isArray(configured) || configured.length === 0) {
      throw new Error("`startupNodes` should contain at least one node.");
    }

    const startupNodes = normalizeNodeOptions(this.startupNodes);
    const hostnames = getUniqueHostnamesFromOptions(startupNodes);
    if (hostnames.length === 0) {
      // Nothing to resolve.
      return startupNodes;
    }

    // One lookup per unique hostname, all running concurrently.
    const lookup = this.options.useSRVRecords
      ? this.resolveSrv
      : this.dnsLookup;
    const configs = await Promise.all(hostnames.map(lookup.bind(this)));
    const hostnameToConfig = zipMap(hostnames, configs);

    return startupNodes.map((node) => {
      const resolved = hostnameToConfig.get(node.host);
      if (!resolved) {
        return node;
      }
      // SRV resolution yields an options object to merge in; a plain DNS
      // lookup yields only the address, which replaces `host`.
      const overrides = this.options.useSRVRecords
        ? resolved
        : { host: resolved };
      return Object.assign({}, node, overrides);
    });
  }
1038

1039
  /**
   * Build a `ScanStream` bound to this instance.
   *
   * @param command Command name forwarded to the stream as `command`.
   * @param param1 Optional `key` forwarded to the stream, and extra `options`
   * that are spread last and therefore override the defaults set here.
   * @returns The newly constructed `ScanStream`.
   */
  private createScanStream(
    command: string,
    { key, options = {} }: { key?: string; options?: ScanStreamOptions }
  ) {
    const streamOptions = {
      objectMode: true,
      key,
      redis: this,
      command,
      ...options,
    };
    return new ScanStream(streamOptions);
  }
1051
}
1052

1053
// Declaration merging: adds EventEmitter's members to the Cluster type.
interface Cluster extends EventEmitter {}
1054
// NOTE(review): presumably copies EventEmitter's prototype members onto
// Cluster at runtime — confirm in ../utils/applyMixin.
applyMixin(Cluster, EventEmitter);
1×
1055

1056
// NOTE(review): presumably installs the transaction helpers on the
// prototype — confirm in ../transaction.
addTransactionSupport(Cluster.prototype);
1×
1057
// Declaration merging: adds the Transaction interface's members to the Cluster type.
interface Cluster extends Transaction {}
1058

1059
export default Cluster;
1×
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2023 Coveralls, Inc