• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9963

30 Aug 2023 06:16AM UTC coverage: 47.759% (+0.001%) from 47.758%
#9963

push

travis_ci

web-flow
[IOTDB-6061] Fix the instability failure caused by initServer in IoTConsensus UT not binding to the corresponding port  (#10991)

80380 of 168305 relevant lines covered (47.76%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.01
/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.consensus.ratis;
21

22
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
23
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
24
import org.apache.iotdb.common.rpc.thrift.TSStatus;
25
import org.apache.iotdb.commons.client.ClientManager;
26
import org.apache.iotdb.commons.client.ClientManagerMetrics;
27
import org.apache.iotdb.commons.client.IClientManager;
28
import org.apache.iotdb.commons.client.IClientPoolFactory;
29
import org.apache.iotdb.commons.client.exception.ClientManagerException;
30
import org.apache.iotdb.commons.client.property.ClientPoolProperty;
31
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
32
import org.apache.iotdb.commons.concurrent.ThreadName;
33
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
34
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
35
import org.apache.iotdb.commons.service.metric.MetricService;
36
import org.apache.iotdb.commons.utils.StatusUtils;
37
import org.apache.iotdb.commons.utils.TestOnly;
38
import org.apache.iotdb.consensus.IConsensus;
39
import org.apache.iotdb.consensus.IStateMachine;
40
import org.apache.iotdb.consensus.common.DataSet;
41
import org.apache.iotdb.consensus.common.Peer;
42
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
43
import org.apache.iotdb.consensus.config.ConsensusConfig;
44
import org.apache.iotdb.consensus.config.RatisConfig;
45
import org.apache.iotdb.consensus.exception.ConsensusException;
46
import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
47
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
48
import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
49
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
50
import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
51
import org.apache.iotdb.consensus.exception.RatisUnderRecoveryException;
52
import org.apache.iotdb.consensus.ratis.metrics.RatisMetricSet;
53
import org.apache.iotdb.consensus.ratis.metrics.RatisMetricsManager;
54
import org.apache.iotdb.consensus.ratis.utils.RatisLogMonitor;
55
import org.apache.iotdb.consensus.ratis.utils.Utils;
56
import org.apache.iotdb.rpc.TSStatusCode;
57

58
import org.apache.commons.pool2.KeyedObjectPool;
59
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
60
import org.apache.ratis.client.RaftClientRpc;
61
import org.apache.ratis.conf.Parameters;
62
import org.apache.ratis.conf.RaftProperties;
63
import org.apache.ratis.grpc.GrpcConfigKeys;
64
import org.apache.ratis.grpc.GrpcFactory;
65
import org.apache.ratis.protocol.ClientId;
66
import org.apache.ratis.protocol.GroupManagementRequest;
67
import org.apache.ratis.protocol.Message;
68
import org.apache.ratis.protocol.RaftClientReply;
69
import org.apache.ratis.protocol.RaftClientRequest;
70
import org.apache.ratis.protocol.RaftGroup;
71
import org.apache.ratis.protocol.RaftGroupId;
72
import org.apache.ratis.protocol.RaftPeer;
73
import org.apache.ratis.protocol.RaftPeerId;
74
import org.apache.ratis.protocol.SnapshotManagementRequest;
75
import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
76
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
77
import org.apache.ratis.protocol.exceptions.NotLeaderException;
78
import org.apache.ratis.protocol.exceptions.RaftException;
79
import org.apache.ratis.protocol.exceptions.ReadException;
80
import org.apache.ratis.protocol.exceptions.ReadIndexException;
81
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
82
import org.apache.ratis.server.DivisionInfo;
83
import org.apache.ratis.server.RaftServer;
84
import org.apache.ratis.server.RaftServerConfigKeys;
85
import org.apache.ratis.util.function.CheckedSupplier;
86
import org.slf4j.Logger;
87
import org.slf4j.LoggerFactory;
88

89
import java.io.File;
90
import java.io.IOException;
91
import java.util.ArrayList;
92
import java.util.Collections;
93
import java.util.List;
94
import java.util.Map;
95
import java.util.Optional;
96
import java.util.concurrent.ConcurrentHashMap;
97
import java.util.concurrent.ScheduledExecutorService;
98
import java.util.concurrent.TimeUnit;
99
import java.util.concurrent.atomic.AtomicBoolean;
100
import java.util.concurrent.atomic.AtomicLong;
101
import java.util.stream.Collectors;
102

103
/** A multi-raft consensus implementation based on Apache Ratis. */
104
class RatisConsensus implements IConsensus {
105

106
  private static final Logger logger = LoggerFactory.getLogger(RatisConsensus.class);
1✔
107

108
  /** the unique net communication endpoint */
109
  private final RaftPeer myself;
110

111
  private final RaftServer server;
112

113
  private final RaftProperties properties = new RaftProperties();
1✔
114
  private final RaftClientRpc clientRpc;
115

116
  private final IClientManager<RaftGroup, RatisClient> clientManager;
117

118
  private final Map<RaftGroupId, RaftGroup> lastSeen = new ConcurrentHashMap<>();
1✔
119

120
  private final ClientId localFakeId = ClientId.randomId();
1✔
121
  private final AtomicLong localFakeCallId = new AtomicLong(0);
1✔
122

123
  private static final int DEFAULT_PRIORITY = 0;
124
  private static final int LEADER_PRIORITY = 1;
125

126
  private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20);
1✔
127

128
  private final ScheduledExecutorService diskGuardian;
129
  private final long triggerSnapshotThreshold;
130

131
  private final RatisConfig config;
132

133
  private final RatisLogMonitor monitor = new RatisLogMonitor();
1✔
134

135
  private final RatisMetricSet ratisMetricSet;
136
  private final TConsensusGroupType consensusGroupType;
137

138
  private final ConcurrentHashMap<ConsensusGroupId, AtomicBoolean> canServeStaleRead =
1✔
139
      new ConcurrentHashMap<>();
140

141
  public RatisConsensus(ConsensusConfig config, IStateMachine.Registry registry)
142
      throws IOException {
1✔
143
    myself =
1✔
144
        Utils.fromNodeInfoAndPriorityToRaftPeer(
1✔
145
            config.getThisNodeId(), config.getThisNodeEndPoint(), DEFAULT_PRIORITY);
1✔
146

147
    RaftServerConfigKeys.setStorageDir(
1✔
148
        properties, Collections.singletonList(new File(config.getStorageDir())));
1✔
149
    GrpcConfigKeys.Server.setPort(properties, config.getThisNodeEndPoint().getPort());
1✔
150

151
    Utils.initRatisConfig(properties, config.getRatisConfig());
1✔
152
    this.config = config.getRatisConfig();
1✔
153
    this.consensusGroupType = config.getConsensusGroupType();
1✔
154
    this.ratisMetricSet = new RatisMetricSet();
1✔
155

156
    this.triggerSnapshotThreshold = this.config.getImpl().getTriggerSnapshotFileSize();
1✔
157
    diskGuardian =
1✔
158
        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
1✔
159
            ThreadName.RATIS_BG_DISK_GUARDIAN.getName());
1✔
160

161
    clientManager =
1✔
162
        new IClientManager.Factory<RaftGroup, RatisClient>()
163
            .createClientManager(new RatisClientPoolFactory());
1✔
164

165
    clientRpc = new GrpcFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), properties);
1✔
166

167
    server =
1✔
168
        RaftServer.newBuilder()
1✔
169
            .setServerId(myself.getId())
1✔
170
            .setProperties(properties)
1✔
171
            .setStateMachineRegistry(
1✔
172
                raftGroupId ->
173
                    new ApplicationStateMachineProxy(
1✔
174
                        registry.apply(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId)),
1✔
175
                        raftGroupId,
176
                        canServeStaleRead))
177
            .build();
1✔
178
  }
1✔
179

180
  @Override
181
  public synchronized void start() throws IOException {
182
    MetricService.getInstance().addMetricSet(this.ratisMetricSet);
1✔
183
    server.start();
1✔
184
    startSnapshotGuardian();
1✔
185
  }
1✔
186

187
  @Override
188
  public synchronized void stop() throws IOException {
189
    diskGuardian.shutdown();
1✔
190
    try {
191
      diskGuardian.awaitTermination(5, TimeUnit.SECONDS);
1✔
192
    } catch (InterruptedException e) {
×
193
      logger.warn("{}: interrupted when shutting down add Executor with exception {}", this, e);
×
194
      Thread.currentThread().interrupt();
×
195
    } finally {
196
      clientManager.close();
1✔
197
      server.close();
1✔
198
      MetricService.getInstance().removeMetricSet(this.ratisMetricSet);
1✔
199
    }
200
  }
1✔
201

202
  private boolean shouldRetry(RaftClientReply reply) {
203
    // currently, we only retry when ResourceUnavailableException is caught
204
    return !reply.isSuccess() && (reply.getException() instanceof ResourceUnavailableException);
1✔
205
  }
206

207
  /** launch a consensus write with retry mechanism */
208
  private RaftClientReply writeWithRetry(CheckedSupplier<RaftClientReply, IOException> caller)
209
      throws IOException {
210

211
    final int maxRetryTimes = config.getImpl().getRetryTimesMax();
1✔
212
    final long waitMillis = config.getImpl().getRetryWaitMillis();
1✔
213

214
    int retry = 0;
1✔
215
    RaftClientReply reply = null;
1✔
216
    while (retry < maxRetryTimes) {
1✔
217
      retry++;
1✔
218

219
      reply = caller.get();
1✔
220
      if (!shouldRetry(reply)) {
1✔
221
        return reply;
1✔
222
      }
223
      logger.debug("{} sending write request with retry = {} and reply = {}", this, retry, reply);
×
224

225
      try {
226
        Thread.sleep(waitMillis);
×
227
      } catch (InterruptedException e) {
×
228
        logger.warn("{} retry write sleep is interrupted: {}", this, e);
×
229
        Thread.currentThread().interrupt();
×
230
      }
×
231
    }
232
    if (reply == null) {
×
233
      return RaftClientReply.newBuilder()
×
234
          .setSuccess(false)
×
235
          .setException(
×
236
              new RaftException("null reply received in writeWithRetry for request " + caller))
237
          .build();
×
238
    }
239
    return reply;
×
240
  }
241

242
  private RaftClientReply writeLocallyWithRetry(RaftClientRequest request) throws IOException {
243
    return writeWithRetry(() -> server.submitClientRequest(request));
1✔
244
  }
245

246
  private RaftClientReply writeRemotelyWithRetry(RatisClient client, Message message)
247
      throws IOException {
248
    return writeWithRetry(() -> client.getRaftClient().io().send(message));
1✔
249
  }
250

251
  /**
252
   * write will first send request to local server using local method call. If local server is not
253
   * leader, it will use RaftClient to send RPC to read leader
254
   */
255
  @Override
256
  public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
257
      throws ConsensusException {
258
    // pre-condition: group exists and myself server serves this group
259
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
260
    RaftGroup raftGroup = getGroupInfo(raftGroupId);
1✔
261
    if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
1✔
262
      throw new ConsensusGroupNotExistException(groupId);
×
263
    }
264

265
    // current Peer is group leader and in ReadOnly State
266
    if (isLeader(groupId) && Utils.rejectWrite()) {
1✔
267
      try {
268
        forceStepDownLeader(raftGroup);
×
269
      } catch (Exception e) {
×
270
        logger.warn("leader {} read only, force step down failed due to {}", myself, e);
×
271
      }
×
272
      return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY);
×
273
    }
274

275
    // serialize request into Message
276
    Message message = new RequestMessage(request);
1✔
277

278
    // 1. first try the local server
279
    RaftClientRequest clientRequest =
1✔
280
        buildRawRequest(raftGroupId, message, RaftClientRequest.writeRequestType());
1✔
281

282
    RaftPeer suggestedLeader = null;
1✔
283
    if (isLeader(groupId) && waitUntilLeaderReady(raftGroupId)) {
1✔
284
      try (AutoCloseable ignored =
285
          RatisMetricsManager.getInstance().startWriteLocallyTimer(consensusGroupType)) {
1✔
286
        RaftClientReply localServerReply = writeLocallyWithRetry(clientRequest);
1✔
287
        if (localServerReply.isSuccess()) {
1✔
288
          ResponseMessage responseMessage = (ResponseMessage) localServerReply.getMessage();
1✔
289
          return (TSStatus) responseMessage.getContentHolder();
1✔
290
        }
291
        NotLeaderException ex = localServerReply.getNotLeaderException();
×
292
        if (ex != null) {
×
293
          suggestedLeader = ex.getSuggestedLeader();
×
294
        }
295
      } catch (Exception e) {
1✔
296
        throw new RatisRequestFailedException(e);
×
297
      }
×
298
    }
299

300
    // 2. try raft client
301
    TSStatus writeResult;
302
    try (AutoCloseable ignored =
303
            RatisMetricsManager.getInstance().startWriteRemotelyTimer(consensusGroupType);
1✔
304
        RatisClient client = getRaftClient(raftGroup)) {
1✔
305
      RaftClientReply reply = writeRemotelyWithRetry(client, message);
1✔
306
      if (!reply.isSuccess()) {
1✔
307
        throw new RatisRequestFailedException(reply.getException());
×
308
      }
309
      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
1✔
310
    } catch (Exception e) {
×
311
      throw new RatisRequestFailedException(e);
×
312
    }
1✔
313

314
    if (suggestedLeader != null) {
1✔
315
      TEndPoint leaderEndPoint = Utils.fromRaftPeerAddressToTEndPoint(suggestedLeader.getAddress());
×
316
      writeResult.setRedirectNode(new TEndPoint(leaderEndPoint.getIp(), leaderEndPoint.getPort()));
×
317
    }
318
    return writeResult;
1✔
319
  }
320

321
  /**
322
   * Read directly from LOCAL COPY notice, although we do some optimizations to try to ensure
323
   * linearizable (such as enforcing linearizable reads when the leader transfers), linearizable can
324
   * be violated in some extreme cases.
325
   */
326
  @Override
327
  public DataSet read(ConsensusGroupId groupId, IConsensusRequest request)
328
      throws ConsensusException {
329
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
330
    RaftGroup group = getGroupInfo(raftGroupId);
1✔
331
    if (group == null || !group.getPeers().contains(myself)) {
1✔
332
      throw new ConsensusGroupNotExistException(groupId);
×
333
    }
334

335
    final boolean isLinearizableRead =
1✔
336
        !canServeStaleRead.computeIfAbsent(groupId, id -> new AtomicBoolean(false)).get();
1✔
337

338
    RaftClientReply reply;
339
    try {
340
      reply = doRead(raftGroupId, request, isLinearizableRead);
1✔
341
      // allow stale read if current linearizable read returns successfully
342
      if (isLinearizableRead) {
1✔
343
        canServeStaleRead.get(groupId).set(true);
1✔
344
      }
345
    } catch (ReadException | ReadIndexException e) {
1✔
346
      if (isLinearizableRead) {
1✔
347
        // linearizable read failed. the RaftServer is recovering from Raft Log and cannot serve
348
        // read requests.
349
        throw new RatisUnderRecoveryException(e);
1✔
350
      } else {
351
        throw new RatisRequestFailedException(e);
×
352
      }
353
    } catch (Exception e) {
×
354
      throw new RatisRequestFailedException(e);
×
355
    }
1✔
356
    Message ret = reply.getMessage();
1✔
357
    ResponseMessage readResponseMessage = (ResponseMessage) ret;
1✔
358
    return (DataSet) readResponseMessage.getContentHolder();
1✔
359
  }
360

361
  /** return a success raft client reply or throw an Exception */
362
  private RaftClientReply doRead(
363
      RaftGroupId gid, IConsensusRequest readRequest, boolean linearizable) throws Exception {
364
    final RaftClientRequest.Type readType =
365
        linearizable
1✔
366
            ? RaftClientRequest.readRequestType()
1✔
367
            : RaftClientRequest.staleReadRequestType(-1);
1✔
368
    final RequestMessage requestMessage = new RequestMessage(readRequest);
1✔
369
    final RaftClientRequest request = buildRawRequest(gid, requestMessage, readType);
1✔
370

371
    RaftClientReply reply;
372
    try (AutoCloseable ignored =
373
        RatisMetricsManager.getInstance().startReadTimer(consensusGroupType)) {
1✔
374
      reply = server.submitClientRequest(request);
1✔
375
    }
376

377
    // rethrow the exception if the reply is not successful
378
    if (!reply.isSuccess()) {
1✔
379
      throw reply.getException();
1✔
380
    }
381

382
    return reply;
1✔
383
  }
384

385
  /**
386
   * Add this IConsensus Peer into ConsensusGroup(groupId, peers) Caller's responsibility to call
387
   * addConsensusGroup to every peer of this group and ensure the group is all up
388
   *
389
   * <p>underlying Ratis will 1. initialize a RaftServer instance 2. call GroupManagementApi to
390
   * register self to the RaftGroup
391
   */
392
  @Override
393
  public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
394
      throws ConsensusException {
395
    RaftGroup group = buildRaftGroup(groupId, peers);
1✔
396
    RaftGroup clientGroup =
397
        group.getPeers().isEmpty() ? RaftGroup.valueOf(group.getGroupId(), myself) : group;
1✔
398
    try (RatisClient client = getRaftClient(clientGroup)) {
1✔
399
      RaftClientReply reply =
1✔
400
          client.getRaftClient().getGroupManagementApi(myself.getId()).add(group);
1✔
401
      if (!reply.isSuccess()) {
1✔
402
        throw new RatisRequestFailedException(reply.getException());
×
403
      }
404
    } catch (AlreadyExistsException e) {
1✔
405
      throw new ConsensusGroupAlreadyExistException(groupId);
1✔
406
    } catch (Exception e) {
×
407
      throw new RatisRequestFailedException(e);
×
408
    }
1✔
409
  }
1✔
410

411
  /**
412
   * Remove this IConsensus Peer out of ConsensusGroup(groupId, peers) Caller's responsibility to
413
   * call removeConsensusGroup to every peer of this group and ensure the group is fully removed
414
   *
415
   * <p>underlying Ratis will 1. call GroupManagementApi to unregister self off the RaftGroup 2.
416
   * clean up
417
   */
418
  @Override
419
  public void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException {
420
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
421

422
    // send remove group to myself
423
    RaftClientReply reply;
424
    try {
425
      reply =
1✔
426
          server.groupManagement(
1✔
427
              GroupManagementRequest.newRemove(
1✔
428
                  localFakeId,
429
                  myself.getId(),
1✔
430
                  localFakeCallId.incrementAndGet(),
1✔
431
                  raftGroupId,
432
                  true,
433
                  false));
434
      if (!reply.isSuccess()) {
1✔
435
        throw new RatisRequestFailedException(reply.getException());
×
436
      }
437
    } catch (GroupMismatchException e) {
1✔
438
      throw new ConsensusGroupNotExistException(groupId);
1✔
439
    } catch (IOException e) {
×
440
      throw new RatisRequestFailedException(e);
×
441
    }
1✔
442
  }
1✔
443

444
  /**
445
   * Add a new IConsensus Peer into ConsensusGroup with groupId
446
   *
447
   * <p>underlying Ratis will 1. call the AdminApi to notify group leader of this configuration
448
   * change
449
   */
450
  @Override
451
  public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException {
452
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
453

454
    RaftGroup group = getGroupInfo(raftGroupId);
1✔
455
    // pre-conditions: group exists and myself in this group
456
    if (group == null || !group.getPeers().contains(myself)) {
1✔
457
      throw new ConsensusGroupNotExistException(groupId);
1✔
458
    }
459

460
    RaftPeer peerToAdd = Utils.fromPeerAndPriorityToRaftPeer(peer, DEFAULT_PRIORITY);
1✔
461
    // pre-condition: peer not in this group
462
    if (group.getPeers().contains(peerToAdd)) {
1✔
463
      throw new PeerAlreadyInConsensusGroupException(groupId, peer);
1✔
464
    }
465

466
    List<RaftPeer> newConfig = new ArrayList<>(group.getPeers());
1✔
467
    newConfig.add(peerToAdd);
1✔
468

469
    sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig));
1✔
470
  }
1✔
471

472
  /**
473
   * Remove IConsensus Peer from ConsensusGroup with groupId
474
   *
475
   * <p>underlying Ratis will 1. call the AdminApi to notify group leader of this configuration
476
   * change
477
   */
478
  @Override
479
  public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException {
480
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
481
    RaftGroup group = getGroupInfo(raftGroupId);
1✔
482
    RaftPeer peerToRemove = Utils.fromPeerAndPriorityToRaftPeer(peer, DEFAULT_PRIORITY);
1✔
483

484
    // pre-conditions: group exists and myself in this group
485
    if (group == null || !group.getPeers().contains(myself)) {
1✔
486
      throw new ConsensusGroupNotExistException(groupId);
×
487
    }
488
    // pre-condition: peer is a member of groupId
489
    if (!group.getPeers().contains(peerToRemove)) {
1✔
490
      throw new PeerNotInConsensusGroupException(groupId, myself.getAddress());
1✔
491
    }
492

493
    // update group peer information
494
    List<RaftPeer> newConfig =
1✔
495
        group.getPeers().stream()
1✔
496
            .filter(raftPeer -> !raftPeer.equals(peerToRemove))
1✔
497
            .collect(Collectors.toList());
1✔
498

499
    sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig));
1✔
500
  }
1✔
501

502
  /**
503
   * NOTICE: transferLeader *does not guarantee* the leader be transferred to newLeader.
504
   * transferLeader is implemented by 1. modify peer priority 2. ask current leader to step down
505
   *
506
   * <p>1. call setConfiguration to upgrade newLeader's priority to 1 and degrade all follower peers
507
   * to 0. By default, Ratis gives every Raft Peer same priority 0. Ratis does not allow a peer with
508
   * priority <= currentLeader.priority to becomes the leader, so we have to upgrade leader's
509
   * priority to 1
510
   *
511
   * <p>2. call transferLeadership to force current leader to step down and raise a new round of
512
   * election. In this election, the newLeader peer with priority 1 is guaranteed to be elected.
513
   */
514
  @Override
515
  public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws ConsensusException {
516

517
    // first fetch the newest information
518
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
519
    RaftGroup raftGroup =
1✔
520
        Optional.ofNullable(getGroupInfo(raftGroupId))
1✔
521
            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
1✔
522

523
    RaftPeer newRaftLeader = Utils.fromPeerAndPriorityToRaftPeer(newLeader, LEADER_PRIORITY);
1✔
524

525
    ArrayList<RaftPeer> newConfiguration = new ArrayList<>();
1✔
526
    for (RaftPeer raftPeer : raftGroup.getPeers()) {
1✔
527
      if (raftPeer.getId().equals(newRaftLeader.getId())) {
1✔
528
        newConfiguration.add(newRaftLeader);
1✔
529
      } else {
530
        // degrade every other peer to default priority
531
        newConfiguration.add(
1✔
532
            Utils.fromNodeInfoAndPriorityToRaftPeer(
1✔
533
                Utils.fromRaftPeerIdToNodeId(raftPeer.getId()),
1✔
534
                Utils.fromRaftPeerAddressToTEndPoint(raftPeer.getAddress()),
1✔
535
                DEFAULT_PRIORITY));
536
      }
537
    }
1✔
538

539
    RaftClientReply reply;
540
    try (RatisClient client = getRaftClient(raftGroup)) {
1✔
541
      RaftClientReply configChangeReply =
1✔
542
          client.getRaftClient().admin().setConfiguration(newConfiguration);
1✔
543
      if (!configChangeReply.isSuccess()) {
1✔
544
        throw new RatisRequestFailedException(configChangeReply.getException());
×
545
      }
546

547
      reply = transferLeader(raftGroup, newRaftLeader);
1✔
548
      if (!reply.isSuccess()) {
1✔
549
        throw new RatisRequestFailedException(reply.getException());
×
550
      }
551
    } catch (Exception e) {
×
552
      throw new RatisRequestFailedException(e);
×
553
    }
1✔
554
  }
1✔
555

556
  private void forceStepDownLeader(RaftGroup group) throws Exception {
557
    // when newLeaderPeerId == null, ratis forces current leader to step down and raise new
558
    // election
559
    transferLeader(group, null);
×
560
  }
×
561

562
  private RaftClientReply transferLeader(RaftGroup group, RaftPeer newLeader) throws Exception {
563
    try (RatisClient client = getRaftClient(group)) {
1✔
564
      return client
1✔
565
          .getRaftClient()
1✔
566
          .admin()
1✔
567
          .transferLeadership(newLeader != null ? newLeader.getId() : null, 10000);
1✔
568
    }
569
  }
570

571
  @Override
572
  public boolean isLeader(ConsensusGroupId groupId) {
573
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
574
    try {
575
      return server.getDivision(raftGroupId).getInfo().isLeader();
1✔
576
    } catch (IOException exception) {
×
577
      // if the read fails, simply return not leader
578
      logger.info("isLeader request failed with exception: ", exception);
×
579
      return false;
×
580
    }
581
  }
582

583
  @Override
584
  public boolean isLeaderReady(ConsensusGroupId groupId) {
585
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
×
586
    try {
587
      return server.getDivision(raftGroupId).getInfo().isLeaderReady();
×
588
    } catch (IOException exception) {
×
589
      // if the read fails, simply return not ready
590
      logger.info("isLeaderReady request failed with exception: ", exception);
×
591
      return false;
×
592
    }
593
  }
594

595
  private boolean waitUntilLeaderReady(RaftGroupId groupId) {
596
    DivisionInfo divisionInfo;
597
    try {
598
      divisionInfo = server.getDivision(groupId).getInfo();
1✔
599
    } catch (IOException e) {
×
600
      // if the read fails, simply return not leader
601
      logger.info("isLeaderReady checking failed with exception: ", e);
×
602
      return false;
×
603
    }
1✔
604
    long startTime = System.currentTimeMillis();
1✔
605
    try {
606
      while (divisionInfo.isLeader() && !divisionInfo.isLeaderReady()) {
1✔
607
        Thread.sleep(10);
×
608
        long consumedTime = System.currentTimeMillis() - startTime;
×
609
        if (consumedTime >= DEFAULT_WAIT_LEADER_READY_TIMEOUT) {
×
610
          logger.warn("{}: leader is still not ready after {}ms", groupId, consumedTime);
×
611
          return false;
×
612
        }
613
      }
×
614
    } catch (InterruptedException e) {
×
615
      Thread.currentThread().interrupt();
×
616
      logger.warn("Unexpected interruption when waitUntilLeaderReady", e);
×
617
      return false;
×
618
    }
1✔
619
    return divisionInfo.isLeader();
1✔
620
  }
621

622
  /**
623
   * returns the known leader to the given group. NOTICE: if the local peer isn't a member of given
624
   * group, getLeader will return null.
625
   *
626
   * @return null if local peer isn't in group, otherwise group leader.
627
   */
628
  @Override
629
  public Peer getLeader(ConsensusGroupId groupId) {
630
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
631
    RaftPeerId leaderId;
632

633
    try {
634
      leaderId = server.getDivision(raftGroupId).getInfo().getLeaderId();
1✔
635
    } catch (IOException e) {
×
636
      logger.warn("fetch division info for group " + groupId + " failed due to: ", e);
×
637
      return null;
×
638
    }
1✔
639
    if (leaderId == null) {
1✔
640
      return null;
1✔
641
    }
642
    int nodeId = Utils.fromRaftPeerIdToNodeId(leaderId);
1✔
643
    return new Peer(groupId, nodeId, null);
1✔
644
  }
645

646
  @Override
647
  public List<ConsensusGroupId> getAllConsensusGroupIds() {
648
    List<ConsensusGroupId> ids = new ArrayList<>();
×
649
    server
×
650
        .getGroupIds()
×
651
        .forEach(groupId -> ids.add(Utils.fromRaftGroupIdToConsensusGroupId(groupId)));
×
652
    return ids;
×
653
  }
654

655
  @Override
656
  public void triggerSnapshot(ConsensusGroupId groupId) throws ConsensusException {
657
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
658
    RaftGroup groupInfo = getGroupInfo(raftGroupId);
1✔
659
    if (groupInfo == null || !groupInfo.getPeers().contains(myself)) {
1✔
660
      throw new ConsensusGroupNotExistException(groupId);
×
661
    }
662

663
    // TODO tuning snapshot create timeout
664
    SnapshotManagementRequest request =
1✔
665
        SnapshotManagementRequest.newCreate(
1✔
666
            localFakeId, myself.getId(), raftGroupId, localFakeCallId.incrementAndGet(), 30000);
1✔
667

668
    RaftClientReply reply;
669
    try {
670
      reply = server.snapshotManagement(request);
1✔
671
      if (!reply.isSuccess()) {
1✔
672
        throw new RatisRequestFailedException(reply.getException());
×
673
      }
674
    } catch (IOException ioException) {
×
675
      throw new RatisRequestFailedException(ioException);
×
676
    }
1✔
677
  }
1✔
678

679
  private void triggerSnapshotByCustomize() {
680

681
    for (RaftGroupId raftGroupId : server.getGroupIds()) {
1✔
682
      File currentDir;
683

684
      try {
685
        currentDir =
1✔
686
            server.getDivision(raftGroupId).getRaftStorage().getStorageDir().getCurrentDir();
1✔
687
      } catch (IOException e) {
×
688
        logger.warn("{}: get division {} failed: ", this, raftGroupId, e);
×
689
        continue;
×
690
      }
1✔
691

692
      final long currentDirLength = monitor.updateAndGetDirectorySize(currentDir);
1✔
693

694
      if (currentDirLength >= triggerSnapshotThreshold) {
1✔
695
        final int filesCount = monitor.getFilesUnder(currentDir).size();
1✔
696
        logger.info(
1✔
697
            "{}: take snapshot for region {}, current dir size {}, {} files to be purged",
698
            this,
699
            raftGroupId,
700
            currentDirLength,
1✔
701
            filesCount);
1✔
702

703
        try {
704
          triggerSnapshot(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId));
1✔
705
          logger.info("Raft group {} took snapshot successfully", raftGroupId);
1✔
706
        } catch (ConsensusException e) {
×
707
          logger.warn("Raft group {} failed to take snapshot due to", raftGroupId, e);
×
708
        }
1✔
709
      }
710
    }
1✔
711
  }
1✔
712

713
  private void startSnapshotGuardian() {
714
    final long delay = config.getImpl().getTriggerSnapshotTime();
1✔
715
    ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
1✔
716
        diskGuardian, this::triggerSnapshotByCustomize, 0, delay, TimeUnit.SECONDS);
717
  }
1✔
718

719
  private RaftClientRequest buildRawRequest(
720
      RaftGroupId groupId, Message message, RaftClientRequest.Type type) {
721
    return RaftClientRequest.newBuilder()
1✔
722
        .setServerId(server.getId())
1✔
723
        .setClientId(localFakeId)
1✔
724
        .setCallId(localFakeCallId.incrementAndGet())
1✔
725
        .setGroupId(groupId)
1✔
726
        .setType(type)
1✔
727
        .setMessage(message)
1✔
728
        .build();
1✔
729
  }
730

731
  private RaftGroup getGroupInfo(RaftGroupId raftGroupId) {
732
    RaftGroup raftGroup = null;
1✔
733
    try {
734
      raftGroup = server.getDivision(raftGroupId).getGroup();
1✔
735
      RaftGroup lastSeenGroup = lastSeen.getOrDefault(raftGroupId, null);
1✔
736
      if (lastSeenGroup != null && !lastSeenGroup.equals(raftGroup)) {
1✔
737
        // delete the pooled raft-client of the out-dated group and cache the latest
738
        clientManager.clear(lastSeenGroup);
×
739
        lastSeen.put(raftGroupId, raftGroup);
×
740
      }
741
    } catch (IOException e) {
1✔
742
      logger.debug("get group {} failed ", raftGroupId, e);
1✔
743
    }
1✔
744
    return raftGroup;
1✔
745
  }
746

747
  private RaftGroup buildRaftGroup(ConsensusGroupId groupId, List<Peer> peers) {
748
    return RaftGroup.valueOf(
1✔
749
        Utils.fromConsensusGroupIdToRaftGroupId(groupId),
1✔
750
        Utils.fromPeersAndPriorityToRaftPeers(peers, DEFAULT_PRIORITY));
1✔
751
  }
752

753
  private RatisClient getRaftClient(RaftGroup group) throws ClientManagerException {
754
    try {
755
      return clientManager.borrowClient(group);
1✔
756
    } catch (ClientManagerException e) {
×
757
      logger.error(String.format("Borrow client from pool for group %s failed.", group), e);
×
758
      // rethrow the exception
759
      throw e;
×
760
    }
761
  }
762

763
  private RaftClientReply sendReconfiguration(RaftGroup newGroupConf)
764
      throws RatisRequestFailedException {
765
    // notify the group leader of configuration change
766
    RaftClientReply reply;
767
    try (RatisClient client = getRaftClient(newGroupConf)) {
1✔
768
      reply =
1✔
769
          client.getRaftClient().admin().setConfiguration(new ArrayList<>(newGroupConf.getPeers()));
1✔
770
      if (!reply.isSuccess()) {
1✔
771
        throw new RatisRequestFailedException(reply.getException());
×
772
      }
773
    } catch (Exception e) {
×
774
      throw new RatisRequestFailedException(e);
×
775
    }
1✔
776
    return reply;
1✔
777
  }
778

779
  @TestOnly
780
  public RaftServer getServer() {
781
    return server;
×
782
  }
783

784
  @TestOnly
785
  public void allowStaleRead(ConsensusGroupId consensusGroupId) {
786
    canServeStaleRead.computeIfAbsent(consensusGroupId, id -> new AtomicBoolean(false)).set(true);
1✔
787
  }
1✔
788

789
  private class RatisClientPoolFactory implements IClientPoolFactory<RaftGroup, RatisClient> {
1✔
790

791
    @Override
792
    public KeyedObjectPool<RaftGroup, RatisClient> createClientPool(
793
        ClientManager<RaftGroup, RatisClient> manager) {
794
      GenericKeyedObjectPool<RaftGroup, RatisClient> clientPool =
1✔
795
          new GenericKeyedObjectPool<>(
796
              new RatisClient.Factory(manager, properties, clientRpc, config.getClient()),
1✔
797
              new ClientPoolProperty.Builder<RatisClient>()
798
                  .setCoreClientNumForEachNode(config.getClient().getCoreClientNumForEachNode())
1✔
799
                  .setMaxClientNumForEachNode(config.getClient().getMaxClientNumForEachNode())
1✔
800
                  .build()
1✔
801
                  .getConfig());
1✔
802
      ClientManagerMetrics.getInstance()
1✔
803
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
1✔
804
      return clientPool;
1✔
805
    }
806
  }
807
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc