• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9686

pending completion
#9686

push

travis_ci

web-flow
add build info in show cluster (#10595)

146 of 146 new or added lines in 13 files covered. (100.0%)

79232 of 165062 relevant lines covered (48.0%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.24
/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.consensus.ratis;
21

22
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
23
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
24
import org.apache.iotdb.common.rpc.thrift.TSStatus;
25
import org.apache.iotdb.commons.client.ClientManager;
26
import org.apache.iotdb.commons.client.ClientManagerMetrics;
27
import org.apache.iotdb.commons.client.IClientManager;
28
import org.apache.iotdb.commons.client.IClientPoolFactory;
29
import org.apache.iotdb.commons.client.exception.ClientManagerException;
30
import org.apache.iotdb.commons.client.property.ClientPoolProperty;
31
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
32
import org.apache.iotdb.commons.concurrent.ThreadName;
33
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
34
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
35
import org.apache.iotdb.commons.service.metric.MetricService;
36
import org.apache.iotdb.commons.utils.TestOnly;
37
import org.apache.iotdb.consensus.IConsensus;
38
import org.apache.iotdb.consensus.IStateMachine;
39
import org.apache.iotdb.consensus.common.DataSet;
40
import org.apache.iotdb.consensus.common.Peer;
41
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
42
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
43
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
44
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
45
import org.apache.iotdb.consensus.config.ConsensusConfig;
46
import org.apache.iotdb.consensus.config.RatisConfig;
47
import org.apache.iotdb.consensus.exception.ConsensusException;
48
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
49
import org.apache.iotdb.consensus.exception.NodeReadOnlyException;
50
import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
51
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
52
import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
53
import org.apache.iotdb.consensus.exception.RatisUnderRecoveryException;
54
import org.apache.iotdb.consensus.ratis.metrics.RatisMetricSet;
55
import org.apache.iotdb.consensus.ratis.metrics.RatisMetricsManager;
56
import org.apache.iotdb.consensus.ratis.utils.RatisLogMonitor;
57
import org.apache.iotdb.consensus.ratis.utils.Utils;
58

59
import org.apache.commons.pool2.KeyedObjectPool;
60
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
61
import org.apache.ratis.client.RaftClientRpc;
62
import org.apache.ratis.conf.Parameters;
63
import org.apache.ratis.conf.RaftProperties;
64
import org.apache.ratis.grpc.GrpcConfigKeys;
65
import org.apache.ratis.grpc.GrpcFactory;
66
import org.apache.ratis.protocol.ClientId;
67
import org.apache.ratis.protocol.GroupManagementRequest;
68
import org.apache.ratis.protocol.Message;
69
import org.apache.ratis.protocol.RaftClientReply;
70
import org.apache.ratis.protocol.RaftClientRequest;
71
import org.apache.ratis.protocol.RaftGroup;
72
import org.apache.ratis.protocol.RaftGroupId;
73
import org.apache.ratis.protocol.RaftPeer;
74
import org.apache.ratis.protocol.RaftPeerId;
75
import org.apache.ratis.protocol.SnapshotManagementRequest;
76
import org.apache.ratis.protocol.exceptions.NotLeaderException;
77
import org.apache.ratis.protocol.exceptions.RaftException;
78
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
79
import org.apache.ratis.server.DivisionInfo;
80
import org.apache.ratis.server.RaftServer;
81
import org.apache.ratis.server.RaftServerConfigKeys;
82
import org.apache.ratis.util.function.CheckedSupplier;
83
import org.slf4j.Logger;
84
import org.slf4j.LoggerFactory;
85

86
import java.io.File;
87
import java.io.IOException;
88
import java.util.ArrayList;
89
import java.util.Collections;
90
import java.util.List;
91
import java.util.Map;
92
import java.util.concurrent.ConcurrentHashMap;
93
import java.util.concurrent.ExecutorService;
94
import java.util.concurrent.ScheduledExecutorService;
95
import java.util.concurrent.TimeUnit;
96
import java.util.concurrent.atomic.AtomicBoolean;
97
import java.util.concurrent.atomic.AtomicLong;
98
import java.util.stream.Collectors;
99

100
/** A multi-raft consensus implementation based on Apache Ratis. */
101
class RatisConsensus implements IConsensus {
102

103
  private static final Logger logger = LoggerFactory.getLogger(RatisConsensus.class);
1✔
104

105
  /** the unique net communication endpoint */
106
  private final RaftPeer myself;
107

108
  private final RaftServer server;
109

110
  private final RaftProperties properties = new RaftProperties();
1✔
111
  private final RaftClientRpc clientRpc;
112

113
  private final IClientManager<RaftGroup, RatisClient> clientManager;
114

115
  private final Map<RaftGroupId, RaftGroup> lastSeen = new ConcurrentHashMap<>();
1✔
116

117
  private final ClientId localFakeId = ClientId.randomId();
1✔
118
  private final AtomicLong localFakeCallId = new AtomicLong(0);
1✔
119

120
  private static final int DEFAULT_PRIORITY = 0;
121
  private static final int LEADER_PRIORITY = 1;
122

123
  /** TODO make it configurable */
124
  private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20);
1✔
125

126
  private final ExecutorService addExecutor;
127
  private final ScheduledExecutorService diskGuardian;
128
  private final long triggerSnapshotThreshold;
129

130
  private final RatisConfig config;
131

132
  private final RatisLogMonitor monitor = new RatisLogMonitor();
1✔
133

134
  private final RatisMetricSet ratisMetricSet;
135
  private final TConsensusGroupType consensusGroupType;
136

137
  private final ConcurrentHashMap<ConsensusGroupId, AtomicBoolean> canServeStaleRead =
1✔
138
      new ConcurrentHashMap<>();
139

140
  public RatisConsensus(ConsensusConfig config, IStateMachine.Registry registry)
141
      throws IOException {
1✔
142
    myself =
1✔
143
        Utils.fromNodeInfoAndPriorityToRaftPeer(
1✔
144
            config.getThisNodeId(), config.getThisNodeEndPoint(), DEFAULT_PRIORITY);
1✔
145

146
    RaftServerConfigKeys.setStorageDir(
1✔
147
        properties, Collections.singletonList(new File(config.getStorageDir())));
1✔
148
    GrpcConfigKeys.Server.setPort(properties, config.getThisNodeEndPoint().getPort());
1✔
149

150
    Utils.initRatisConfig(properties, config.getRatisConfig());
1✔
151
    this.config = config.getRatisConfig();
1✔
152
    this.consensusGroupType = config.getConsensusGroupType();
1✔
153
    this.ratisMetricSet = new RatisMetricSet();
1✔
154

155
    this.triggerSnapshotThreshold = this.config.getImpl().getTriggerSnapshotFileSize();
1✔
156
    addExecutor = IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.RATIS_ADD.getName());
1✔
157
    diskGuardian =
1✔
158
        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
1✔
159
            ThreadName.RATIS_BG_DISK_GUARDIAN.getName());
1✔
160

161
    clientManager =
1✔
162
        new IClientManager.Factory<RaftGroup, RatisClient>()
163
            .createClientManager(new RatisClientPoolFactory());
1✔
164

165
    clientRpc = new GrpcFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), properties);
1✔
166

167
    server =
1✔
168
        RaftServer.newBuilder()
1✔
169
            .setServerId(myself.getId())
1✔
170
            .setProperties(properties)
1✔
171
            .setStateMachineRegistry(
1✔
172
                raftGroupId ->
173
                    new ApplicationStateMachineProxy(
1✔
174
                        registry.apply(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId)),
1✔
175
                        raftGroupId,
176
                        canServeStaleRead))
177
            .build();
1✔
178
  }
1✔
179

180
  @Override
181
  public void start() throws IOException {
182
    MetricService.getInstance().addMetricSet(this.ratisMetricSet);
1✔
183
    server.start();
1✔
184
    startSnapshotGuardian();
1✔
185
  }
1✔
186

187
  @Override
188
  public void stop() throws IOException {
189
    addExecutor.shutdown();
1✔
190
    diskGuardian.shutdown();
1✔
191
    try {
192
      addExecutor.awaitTermination(5, TimeUnit.SECONDS);
1✔
193
      diskGuardian.awaitTermination(5, TimeUnit.SECONDS);
1✔
194
    } catch (InterruptedException e) {
×
195
      logger.warn("{}: interrupted when shutting down add Executor with exception {}", this, e);
×
196
      Thread.currentThread().interrupt();
×
197
    } finally {
198
      clientManager.close();
1✔
199
      server.close();
1✔
200
    }
201
    MetricService.getInstance().removeMetricSet(this.ratisMetricSet);
1✔
202
  }
1✔
203

204
  private boolean shouldRetry(RaftClientReply reply) {
205
    // currently, we only retry when ResourceUnavailableException is caught
206
    return !reply.isSuccess() && (reply.getException() instanceof ResourceUnavailableException);
1✔
207
  }
208

209
  /** launch a consensus write with retry mechanism */
210
  private RaftClientReply writeWithRetry(CheckedSupplier<RaftClientReply, IOException> caller)
211
      throws IOException {
212

213
    final int maxRetryTimes = config.getImpl().getRetryTimesMax();
1✔
214
    final long waitMillis = config.getImpl().getRetryWaitMillis();
1✔
215

216
    int retry = 0;
1✔
217
    RaftClientReply reply = null;
1✔
218
    while (retry < maxRetryTimes) {
1✔
219
      retry++;
1✔
220

221
      reply = caller.get();
1✔
222
      if (!shouldRetry(reply)) {
1✔
223
        return reply;
1✔
224
      }
225
      logger.debug("{} sending write request with retry = {} and reply = {}", this, retry, reply);
×
226

227
      try {
228
        Thread.sleep(waitMillis);
×
229
      } catch (InterruptedException e) {
×
230
        logger.warn("{} retry write sleep is interrupted: {}", this, e);
×
231
        Thread.currentThread().interrupt();
×
232
      }
×
233
    }
234
    if (reply == null) {
×
235
      return RaftClientReply.newBuilder()
×
236
          .setSuccess(false)
×
237
          .setException(
×
238
              new RaftException("null reply received in writeWithRetry for request " + caller))
239
          .build();
×
240
    }
241
    return reply;
×
242
  }
243

244
  private RaftClientReply writeLocallyWithRetry(RaftClientRequest request) throws IOException {
245
    return writeWithRetry(() -> server.submitClientRequest(request));
1✔
246
  }
247

248
  private RaftClientReply writeRemotelyWithRetry(RatisClient client, Message message)
249
      throws IOException {
250
    return writeWithRetry(() -> client.getRaftClient().io().send(message));
1✔
251
  }
252

253
  /**
254
   * write will first send request to local server use method call if local server is not leader, it
255
   * will use RaftClient to send RPC to read leader
256
   */
257
  @Override
258
  public ConsensusWriteResponse write(
259
      ConsensusGroupId consensusGroupId, IConsensusRequest IConsensusRequest) {
260
    // pre-condition: group exists and myself server serves this group
261
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId);
1✔
262
    RaftGroup raftGroup = getGroupInfo(raftGroupId);
1✔
263
    if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
1✔
264
      return failedWrite(new ConsensusGroupNotExistException(consensusGroupId));
×
265
    }
266

267
    // current Peer is group leader and in ReadOnly State
268
    if (isLeader(consensusGroupId) && Utils.rejectWrite()) {
1✔
269
      try {
270
        forceStepDownLeader(raftGroup);
×
271
      } catch (Exception e) {
×
272
        logger.warn("leader {} read only, force step down failed due to {}", myself, e);
×
273
      }
×
274
      return failedWrite(new NodeReadOnlyException(myself));
×
275
    }
276

277
    // serialize request into Message
278
    Message message = new RequestMessage(IConsensusRequest);
1✔
279

280
    // 1. first try the local server
281
    RaftClientRequest clientRequest =
1✔
282
        buildRawRequest(raftGroupId, message, RaftClientRequest.writeRequestType());
1✔
283

284
    RaftClientReply localServerReply;
285
    RaftPeer suggestedLeader = null;
1✔
286
    if (isLeader(consensusGroupId) && waitUntilLeaderReady(raftGroupId)) {
1✔
287
      try (AutoCloseable ignored =
288
          RatisMetricsManager.getInstance().startWriteLocallyTimer(consensusGroupType)) {
1✔
289
        localServerReply = writeLocallyWithRetry(clientRequest);
1✔
290
        if (localServerReply.isSuccess()) {
1✔
291
          ResponseMessage responseMessage = (ResponseMessage) localServerReply.getMessage();
1✔
292
          TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
1✔
293
          return ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build();
1✔
294
        }
295
        NotLeaderException ex = localServerReply.getNotLeaderException();
×
296
        if (ex != null) { // local server is not leader
×
297
          suggestedLeader = ex.getSuggestedLeader();
×
298
        }
299
      } catch (Exception e) {
1✔
300
        return failedWrite(new RatisRequestFailedException(e));
×
301
      }
×
302
    }
303

304
    // 2. try raft client
305
    TSStatus writeResult;
306
    try (AutoCloseable ignored =
307
            RatisMetricsManager.getInstance().startWriteRemotelyTimer(consensusGroupType);
1✔
308
        RatisClient client = getRaftClient(raftGroup)) {
1✔
309
      RaftClientReply reply = writeRemotelyWithRetry(client, message);
1✔
310
      if (!reply.isSuccess()) {
1✔
311
        return failedWrite(new RatisRequestFailedException(reply.getException()));
×
312
      }
313
      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
1✔
314
    } catch (Exception e) {
×
315
      return failedWrite(new RatisRequestFailedException(e));
×
316
    }
1✔
317

318
    if (suggestedLeader != null) {
1✔
319
      TEndPoint leaderEndPoint = Utils.fromRaftPeerAddressToTEndPoint(suggestedLeader.getAddress());
×
320
      writeResult.setRedirectNode(new TEndPoint(leaderEndPoint.getIp(), leaderEndPoint.getPort()));
×
321
    }
322
    return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build();
1✔
323
  }
324

325
  /** Read directly from LOCAL COPY notice: May read stale data (not linearizable) */
326
  @Override
327
  public ConsensusReadResponse read(
328
      ConsensusGroupId consensusGroupId, IConsensusRequest IConsensusRequest) {
329
    RaftGroupId groupId = Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId);
1✔
330
    RaftGroup group = getGroupInfo(groupId);
1✔
331
    if (group == null || !group.getPeers().contains(myself)) {
1✔
332
      return failedRead(new ConsensusGroupNotExistException(consensusGroupId));
×
333
    }
334

335
    final boolean isLinearizableRead =
1✔
336
        !canServeStaleRead.computeIfAbsent(consensusGroupId, id -> new AtomicBoolean(false)).get();
1✔
337

338
    RaftClientReply reply;
339
    try {
340
      reply = doRead(groupId, IConsensusRequest, isLinearizableRead);
1✔
341
      // allow stale read if current linearizable read returns successfully
342
      if (isLinearizableRead) {
1✔
343
        canServeStaleRead.get(consensusGroupId).set(true);
1✔
344
      }
345
    } catch (Exception e) {
1✔
346
      if (isLinearizableRead) {
1✔
347
        // linearizable read failed. the RaftServer is recovering from Raft Log and cannot serve
348
        // read requests.
349
        return failedRead(new RatisUnderRecoveryException(e));
1✔
350
      } else {
351
        return failedRead(new RatisRequestFailedException(e));
×
352
      }
353
    }
1✔
354

355
    Message ret = reply.getMessage();
1✔
356
    ResponseMessage readResponseMessage = (ResponseMessage) ret;
1✔
357
    DataSet dataSet = (DataSet) readResponseMessage.getContentHolder();
1✔
358
    return ConsensusReadResponse.newBuilder().setDataSet(dataSet).build();
1✔
359
  }
360

361
  /** return a success raft client reply or throw an Exception */
362
  private RaftClientReply doRead(
363
      RaftGroupId gid, IConsensusRequest readRequest, boolean linearizable) throws Exception {
364
    final RaftClientRequest.Type readType =
365
        linearizable
1✔
366
            ? RaftClientRequest.readRequestType()
1✔
367
            : RaftClientRequest.staleReadRequestType(-1);
1✔
368
    final RequestMessage requestMessage = new RequestMessage(readRequest);
1✔
369
    final RaftClientRequest request = buildRawRequest(gid, requestMessage, readType);
1✔
370

371
    RaftClientReply reply;
372
    try (AutoCloseable ignored =
373
        RatisMetricsManager.getInstance().startReadTimer(consensusGroupType)) {
1✔
374
      reply = server.submitClientRequest(request);
1✔
375
    }
376

377
    // rethrow the exception if the reply is not successful
378
    if (!reply.isSuccess()) {
1✔
379
      throw reply.getException();
1✔
380
    }
381

382
    return reply;
1✔
383
  }
384

385
  /**
386
   * Add this IConsensus Peer into ConsensusGroup(groupId, peers) Caller's responsibility to call
387
   * addConsensusGroup to every peer of this group and ensure the group is all up
388
   *
389
   * <p>underlying Ratis will 1. initialize a RaftServer instance 2. call GroupManagementApi to
390
   * register self to the RaftGroup
391
   */
392
  @Override
393
  public ConsensusGenericResponse createPeer(ConsensusGroupId groupId, List<Peer> peers) {
394
    RaftGroup group = buildRaftGroup(groupId, peers);
1✔
395
    // add RaftPeer myself to this RaftGroup
396
    return addNewGroupToServer(group, myself);
1✔
397
  }
398

399
  private ConsensusGenericResponse addNewGroupToServer(RaftGroup group, RaftPeer server) {
400
    RaftClientReply reply;
401
    RaftGroup clientGroup =
402
        group.getPeers().isEmpty() ? RaftGroup.valueOf(group.getGroupId(), server) : group;
1✔
403
    try (RatisClient client = getRaftClient(clientGroup)) {
1✔
404
      reply = client.getRaftClient().getGroupManagementApi(server.getId()).add(group);
1✔
405
      if (!reply.isSuccess()) {
1✔
406
        return failed(new RatisRequestFailedException(reply.getException()));
×
407
      }
408
    } catch (Exception e) {
1✔
409
      return failed(new RatisRequestFailedException(e));
1✔
410
    }
1✔
411
    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
1✔
412
  }
413

414
  /**
415
   * Remove this IConsensus Peer out of ConsensusGroup(groupId, peers) Caller's responsibility to
416
   * call removeConsensusGroup to every peer of this group and ensure the group is fully removed
417
   *
418
   * <p>underlying Ratis will 1. call GroupManagementApi to unregister self off the RaftGroup 2.
419
   * clean up
420
   */
421
  @Override
422
  public ConsensusGenericResponse deletePeer(ConsensusGroupId groupId) {
423
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
424

425
    // send remove group to myself
426
    RaftClientReply reply;
427
    try {
428
      reply =
1✔
429
          server.groupManagement(
1✔
430
              GroupManagementRequest.newRemove(
1✔
431
                  localFakeId,
432
                  myself.getId(),
1✔
433
                  localFakeCallId.incrementAndGet(),
1✔
434
                  raftGroupId,
435
                  true,
436
                  false));
437
      if (!reply.isSuccess()) {
1✔
438
        return failed(new RatisRequestFailedException(reply.getException()));
×
439
      }
440
    } catch (IOException e) {
×
441
      return failed(new RatisRequestFailedException(e));
×
442
    }
1✔
443

444
    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
1✔
445
  }
446

447
  /**
448
   * Add a new IConsensus Peer into ConsensusGroup with groupId
449
   *
450
   * <p>underlying Ratis will 1. call the AdminApi to notify group leader of this configuration
451
   * change
452
   */
453
  @Override
454
  public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) {
455
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
456
    RaftGroup group = getGroupInfo(raftGroupId);
1✔
457
    RaftPeer peerToAdd = Utils.fromPeerAndPriorityToRaftPeer(peer, DEFAULT_PRIORITY);
1✔
458

459
    // pre-conditions: group exists and myself in this group
460
    if (group == null || !group.getPeers().contains(myself)) {
1✔
461
      return failed(new ConsensusGroupNotExistException(groupId));
×
462
    }
463

464
    // pre-condition: peer not in this group
465
    if (group.getPeers().contains(peerToAdd)) {
1✔
466
      return failed(new PeerAlreadyInConsensusGroupException(groupId, peer));
×
467
    }
468

469
    List<RaftPeer> newConfig = new ArrayList<>(group.getPeers());
1✔
470
    newConfig.add(peerToAdd);
1✔
471

472
    RaftClientReply reply;
473
    try {
474
      reply = sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig));
1✔
475
    } catch (RatisRequestFailedException e) {
×
476
      return failed(e);
×
477
    }
1✔
478

479
    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
1✔
480
  }
481

482
  /**
483
   * Remove IConsensus Peer from ConsensusGroup with groupId
484
   *
485
   * <p>underlying Ratis will 1. call the AdminApi to notify group leader of this configuration
486
   * change
487
   */
488
  @Override
489
  public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) {
490
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
491
    RaftGroup group = getGroupInfo(raftGroupId);
1✔
492
    RaftPeer peerToRemove = Utils.fromPeerAndPriorityToRaftPeer(peer, DEFAULT_PRIORITY);
1✔
493

494
    // pre-conditions: group exists and myself in this group
495
    if (group == null || !group.getPeers().contains(myself)) {
1✔
496
      return failed(new ConsensusGroupNotExistException(groupId));
×
497
    }
498
    // pre-condition: peer is a member of groupId
499
    if (!group.getPeers().contains(peerToRemove)) {
1✔
500
      return failed(new PeerNotInConsensusGroupException(groupId, myself));
×
501
    }
502

503
    // update group peer information
504
    List<RaftPeer> newConfig =
1✔
505
        group.getPeers().stream()
1✔
506
            .filter(raftPeer -> !raftPeer.equals(peerToRemove))
1✔
507
            .collect(Collectors.toList());
1✔
508

509
    RaftClientReply reply;
510
    try {
511
      reply = sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig));
1✔
512
    } catch (RatisRequestFailedException e) {
×
513
      return failed(e);
×
514
    }
1✔
515

516
    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
1✔
517
  }
518

519
  @Override
520
  public ConsensusGenericResponse updatePeer(ConsensusGroupId groupId, Peer oldPeer, Peer newPeer) {
521
    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
×
522
  }
523

524
  @Override
525
  public ConsensusGenericResponse changePeer(ConsensusGroupId groupId, List<Peer> newPeers) {
526
    RaftGroup raftGroup = buildRaftGroup(groupId, newPeers);
1✔
527

528
    // pre-conditions: myself in this group
529
    if (!raftGroup.getPeers().contains(myself)) {
1✔
530
      return failed(new ConsensusGroupNotExistException(groupId));
×
531
    }
532

533
    // add RaftPeer myself to this RaftGroup
534
    RaftClientReply reply;
535
    try {
536
      reply = sendReconfiguration(raftGroup);
1✔
537
    } catch (RatisRequestFailedException e) {
×
538
      return failed(e);
×
539
    }
1✔
540
    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
1✔
541
  }
542

543
  /**
544
   * NOTICE: transferLeader *does not guarantee* the leader be transferred to newLeader.
545
   * transferLeader is implemented by 1. modify peer priority 2. ask current leader to step down
546
   *
547
   * <p>1. call setConfiguration to upgrade newLeader's priority to 1 and degrade all follower peers
548
   * to 0. By default, Ratis gives every Raft Peer same priority 0. Ratis does not allow a peer with
549
   * priority <= currentLeader.priority to becomes the leader, so we have to upgrade leader's
550
   * priority to 1
551
   *
552
   * <p>2. call transferLeadership to force current leader to step down and raise a new round of
553
   * election. In this election, the newLeader peer with priority 1 is guaranteed to be elected.
554
   */
555
  @Override
556
  public ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, Peer newLeader) {
557

558
    // first fetch the newest information
559

560
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
561
    RaftGroup raftGroup = getGroupInfo(raftGroupId);
1✔
562

563
    if (raftGroup == null) {
1✔
564
      return failed(new ConsensusGroupNotExistException(groupId));
×
565
    }
566

567
    RaftPeer newRaftLeader = Utils.fromPeerAndPriorityToRaftPeer(newLeader, LEADER_PRIORITY);
1✔
568

569
    ArrayList<RaftPeer> newConfiguration = new ArrayList<>();
1✔
570
    for (RaftPeer raftPeer : raftGroup.getPeers()) {
1✔
571
      if (raftPeer.getId().equals(newRaftLeader.getId())) {
1✔
572
        newConfiguration.add(newRaftLeader);
1✔
573
      } else {
574
        // degrade every other peer to default priority
575
        newConfiguration.add(
1✔
576
            Utils.fromNodeInfoAndPriorityToRaftPeer(
1✔
577
                Utils.fromRaftPeerIdToNodeId(raftPeer.getId()),
1✔
578
                Utils.fromRaftPeerAddressToTEndPoint(raftPeer.getAddress()),
1✔
579
                DEFAULT_PRIORITY));
580
      }
581
    }
1✔
582

583
    RaftClientReply reply;
584
    try (RatisClient client = getRaftClient(raftGroup)) {
1✔
585
      RaftClientReply configChangeReply =
1✔
586
          client.getRaftClient().admin().setConfiguration(newConfiguration);
1✔
587
      if (!configChangeReply.isSuccess()) {
1✔
588
        return failed(new RatisRequestFailedException(configChangeReply.getException()));
×
589
      }
590

591
      reply = transferLeader(raftGroup, newRaftLeader);
1✔
592
      if (!reply.isSuccess()) {
1✔
593
        return failed(new RatisRequestFailedException(reply.getException()));
×
594
      }
595
    } catch (Exception e) {
×
596
      return failed(new RatisRequestFailedException(e));
×
597
    }
1✔
598
    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
1✔
599
  }
600

601
  private void forceStepDownLeader(RaftGroup group) throws Exception {
602
    // when newLeaderPeerId == null, ratis forces current leader to step down and raise new
603
    // election
604
    transferLeader(group, null);
×
605
  }
×
606

607
  private RaftClientReply transferLeader(RaftGroup group, RaftPeer newLeader) throws Exception {
608
    try (RatisClient client = getRaftClient(group)) {
1✔
609
      // TODO tuning for timeoutMs
610
      return client
1✔
611
          .getRaftClient()
1✔
612
          .admin()
1✔
613
          .transferLeadership(newLeader != null ? newLeader.getId() : null, 10000);
1✔
614
    }
615
  }
616

617
  @Override
618
  public boolean isLeader(ConsensusGroupId groupId) {
619
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
620

621
    boolean isLeader;
622
    try {
623
      isLeader = server.getDivision(raftGroupId).getInfo().isLeader();
1✔
624
    } catch (IOException exception) {
1✔
625
      // if the read fails, simply return not leader
626
      logger.info("isLeader request failed with exception: ", exception);
1✔
627
      isLeader = false;
1✔
628
    }
1✔
629
    return isLeader;
1✔
630
  }
631

632
  private boolean waitUntilLeaderReady(RaftGroupId groupId) {
633
    DivisionInfo divisionInfo;
634
    try {
635
      divisionInfo = server.getDivision(groupId).getInfo();
1✔
636
    } catch (IOException e) {
×
637
      // if the read fails, simply return not leader
638
      logger.info("isLeaderReady checking failed with exception: ", e);
×
639
      return false;
×
640
    }
1✔
641
    long startTime = System.currentTimeMillis();
1✔
642
    try {
643
      while (divisionInfo.isLeader() && !divisionInfo.isLeaderReady()) {
1✔
644
        Thread.sleep(10);
×
645
        long consumedTime = System.currentTimeMillis() - startTime;
×
646
        if (consumedTime >= DEFAULT_WAIT_LEADER_READY_TIMEOUT) {
×
647
          logger.warn("{}: leader is still not ready after {}ms", groupId, consumedTime);
×
648
          return false;
×
649
        }
650
      }
×
651
    } catch (InterruptedException e) {
×
652
      Thread.currentThread().interrupt();
×
653
      logger.warn("Unexpected interruption", e);
×
654
      return false;
×
655
    }
1✔
656
    return divisionInfo.isLeader();
1✔
657
  }
658

659
  /**
660
   * returns the known leader to the given group. NOTICE: if the local peer isn't a member of given
661
   * group, getLeader will return null.
662
   *
663
   * @return null if local peer isn't in group, otherwise group leader.
664
   */
665
  @Override
666
  public Peer getLeader(ConsensusGroupId groupId) {
667
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
668
    RaftPeerId leaderId;
669

670
    try {
671
      leaderId = server.getDivision(raftGroupId).getInfo().getLeaderId();
1✔
672
    } catch (IOException e) {
×
673
      logger.warn("fetch division info for group " + groupId + " failed due to: ", e);
×
674
      return null;
×
675
    }
1✔
676
    if (leaderId == null) {
1✔
677
      return null;
1✔
678
    }
679
    int nodeId = Utils.fromRaftPeerIdToNodeId(leaderId);
1✔
680
    return new Peer(groupId, nodeId, null);
1✔
681
  }
682

683
  @Override
684
  public List<ConsensusGroupId> getAllConsensusGroupIds() {
685
    List<ConsensusGroupId> ids = new ArrayList<>();
×
686
    server
×
687
        .getGroupIds()
×
688
        .forEach(groupId -> ids.add(Utils.fromRaftGroupIdToConsensusGroupId(groupId)));
×
689
    return ids;
×
690
  }
691

692
  @Override
693
  public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) {
694
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
695
    RaftGroup groupInfo = getGroupInfo(raftGroupId);
1✔
696

697
    if (groupInfo == null || !groupInfo.getPeers().contains(myself)) {
1✔
698
      return failed(new ConsensusGroupNotExistException(groupId));
×
699
    }
700

701
    // TODO tuning snapshot create timeout
702
    SnapshotManagementRequest request =
1✔
703
        SnapshotManagementRequest.newCreate(
1✔
704
            localFakeId, myself.getId(), raftGroupId, localFakeCallId.incrementAndGet(), 30000);
1✔
705

706
    RaftClientReply reply;
707
    try {
708
      reply = server.snapshotManagement(request);
1✔
709
      if (!reply.isSuccess()) {
1✔
710
        return failed(new RatisRequestFailedException(reply.getException()));
×
711
      }
712
    } catch (IOException ioException) {
×
713
      return failed(new RatisRequestFailedException(ioException));
×
714
    }
1✔
715

716
    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
1✔
717
  }
718

719
  private void triggerSnapshotByCustomize() {
720

721
    for (RaftGroupId raftGroupId : server.getGroupIds()) {
1✔
722
      File currentDir;
723

724
      try {
725
        currentDir =
1✔
726
            server.getDivision(raftGroupId).getRaftStorage().getStorageDir().getCurrentDir();
1✔
727
      } catch (IOException e) {
×
728
        logger.warn("{}: get division {} failed: ", this, raftGroupId, e);
×
729
        continue;
×
730
      }
1✔
731

732
      final long currentDirLength = monitor.updateAndGetDirectorySize(currentDir);
1✔
733

734
      if (currentDirLength >= triggerSnapshotThreshold) {
1✔
735
        final int filesCount = monitor.getFilesUnder(currentDir).size();
1✔
736
        logger.info(
1✔
737
            "{}: take snapshot for region {}, current dir size {}, {} files to be purged",
738
            this,
739
            raftGroupId,
740
            currentDirLength,
1✔
741
            filesCount);
1✔
742

743
        final ConsensusGenericResponse consensusGenericResponse =
1✔
744
            triggerSnapshot(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId));
1✔
745
        if (consensusGenericResponse.isSuccess()) {
1✔
746
          logger.info("Raft group {} took snapshot successfully", raftGroupId);
1✔
747
        } else {
748
          logger.warn(
×
749
              "Raft group {} failed to take snapshot due to",
750
              raftGroupId,
751
              consensusGenericResponse.getException());
×
752
        }
753
      }
754
    }
1✔
755
  }
1✔
756

757
  private void startSnapshotGuardian() {
758
    final long delay = config.getImpl().getTriggerSnapshotTime();
1✔
759
    ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
1✔
760
        diskGuardian, this::triggerSnapshotByCustomize, 0, delay, TimeUnit.SECONDS);
761
  }
1✔
762

763
  private ConsensusGenericResponse failed(ConsensusException e) {
764
    logger.debug("{} request failed with exception {}", this, e);
1✔
765
    return ConsensusGenericResponse.newBuilder().setSuccess(false).setException(e).build();
1✔
766
  }
767

768
  private ConsensusWriteResponse failedWrite(ConsensusException e) {
769
    logger.debug("{} write request failed with exception {}", this, e);
×
770
    return ConsensusWriteResponse.newBuilder().setException(e).build();
×
771
  }
772

773
  private ConsensusReadResponse failedRead(ConsensusException e) {
774
    logger.debug("{} read request failed with exception {}", this, e);
1✔
775
    return ConsensusReadResponse.newBuilder().setException(e).build();
1✔
776
  }
777

778
  private RaftClientRequest buildRawRequest(
779
      RaftGroupId groupId, Message message, RaftClientRequest.Type type) {
780
    return RaftClientRequest.newBuilder()
1✔
781
        .setServerId(server.getId())
1✔
782
        .setClientId(localFakeId)
1✔
783
        .setCallId(localFakeCallId.incrementAndGet())
1✔
784
        .setGroupId(groupId)
1✔
785
        .setType(type)
1✔
786
        .setMessage(message)
1✔
787
        .build();
1✔
788
  }
789

790
  private RaftGroup getGroupInfo(RaftGroupId raftGroupId) {
791
    RaftGroup raftGroup = null;
1✔
792
    try {
793
      raftGroup = server.getDivision(raftGroupId).getGroup();
1✔
794
      RaftGroup lastSeenGroup = lastSeen.getOrDefault(raftGroupId, null);
1✔
795
      if (lastSeenGroup != null && !lastSeenGroup.equals(raftGroup)) {
1✔
796
        // delete the pooled raft-client of the out-dated group and cache the latest
797
        clientManager.clear(lastSeenGroup);
×
798
        lastSeen.put(raftGroupId, raftGroup);
×
799
      }
800
    } catch (IOException e) {
×
801
      logger.debug("get group {} failed ", raftGroupId, e);
×
802
    }
1✔
803
    return raftGroup;
1✔
804
  }
805

806
  private RaftGroup buildRaftGroup(ConsensusGroupId groupId, List<Peer> peers) {
807
    return RaftGroup.valueOf(
1✔
808
        Utils.fromConsensusGroupIdToRaftGroupId(groupId),
1✔
809
        Utils.fromPeersAndPriorityToRaftPeers(peers, DEFAULT_PRIORITY));
1✔
810
  }
811

812
  private RatisClient getRaftClient(RaftGroup group) throws ClientManagerException {
813
    try {
814
      return clientManager.borrowClient(group);
1✔
815
    } catch (ClientManagerException e) {
×
816
      logger.error(String.format("Borrow client from pool for group %s failed.", group), e);
×
817
      // rethrow the exception
818
      throw e;
×
819
    }
820
  }
821

822
  private RaftClientReply sendReconfiguration(RaftGroup newGroupConf)
823
      throws RatisRequestFailedException {
824
    // notify the group leader of configuration change
825
    RaftClientReply reply;
826
    try (RatisClient client = getRaftClient(newGroupConf)) {
1✔
827
      reply =
1✔
828
          client.getRaftClient().admin().setConfiguration(new ArrayList<>(newGroupConf.getPeers()));
1✔
829
      if (!reply.isSuccess()) {
1✔
830
        throw new RatisRequestFailedException(reply.getException());
×
831
      }
832
    } catch (Exception e) {
×
833
      throw new RatisRequestFailedException(e);
×
834
    }
1✔
835
    return reply;
1✔
836
  }
837

838
  @TestOnly
839
  public RaftServer getServer() {
840
    return server;
×
841
  }
842

843
  @TestOnly
844
  public void allowStaleRead(ConsensusGroupId consensusGroupId) {
845
    canServeStaleRead.computeIfAbsent(consensusGroupId, id -> new AtomicBoolean(false)).set(true);
1✔
846
  }
1✔
847

848
  private class RatisClientPoolFactory implements IClientPoolFactory<RaftGroup, RatisClient> {
1✔
849

850
    @Override
851
    public KeyedObjectPool<RaftGroup, RatisClient> createClientPool(
852
        ClientManager<RaftGroup, RatisClient> manager) {
853
      GenericKeyedObjectPool<RaftGroup, RatisClient> clientPool =
1✔
854
          new GenericKeyedObjectPool<>(
855
              new RatisClient.Factory(manager, properties, clientRpc, config.getClient()),
1✔
856
              new ClientPoolProperty.Builder<RatisClient>()
857
                  .setCoreClientNumForEachNode(config.getClient().getCoreClientNumForEachNode())
1✔
858
                  .setMaxClientNumForEachNode(config.getClient().getMaxClientNumForEachNode())
1✔
859
                  .build()
1✔
860
                  .getConfig());
1✔
861
      ClientManagerMetrics.getInstance()
1✔
862
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
1✔
863
      return clientPool;
1✔
864
    }
865
  }
866
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc