• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9852

17 Aug 2023 02:14AM UTC coverage: 48.081% (+0.003%) from 48.078%
#9852

push

travis_ci

web-flow
[RatisConsensus] Unify read timeout to Thrift connection timeout (#10876) (#10879)

19 of 19 new or added lines in 3 files covered. (100.0%)

79749 of 165865 relevant lines covered (48.08%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.39
/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.consensus.ratis;
21

22
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
23
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
24
import org.apache.iotdb.common.rpc.thrift.TSStatus;
25
import org.apache.iotdb.commons.client.ClientManager;
26
import org.apache.iotdb.commons.client.ClientManagerMetrics;
27
import org.apache.iotdb.commons.client.IClientManager;
28
import org.apache.iotdb.commons.client.IClientPoolFactory;
29
import org.apache.iotdb.commons.client.exception.ClientManagerException;
30
import org.apache.iotdb.commons.client.property.ClientPoolProperty;
31
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
32
import org.apache.iotdb.commons.concurrent.ThreadName;
33
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
34
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
35
import org.apache.iotdb.commons.service.metric.MetricService;
36
import org.apache.iotdb.commons.utils.TestOnly;
37
import org.apache.iotdb.consensus.IConsensus;
38
import org.apache.iotdb.consensus.IStateMachine;
39
import org.apache.iotdb.consensus.common.DataSet;
40
import org.apache.iotdb.consensus.common.Peer;
41
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
42
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
43
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
44
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
45
import org.apache.iotdb.consensus.config.ConsensusConfig;
46
import org.apache.iotdb.consensus.config.RatisConfig;
47
import org.apache.iotdb.consensus.exception.ConsensusException;
48
import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
49
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
50
import org.apache.iotdb.consensus.exception.NodeReadOnlyException;
51
import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
52
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
53
import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
54
import org.apache.iotdb.consensus.exception.RatisUnderRecoveryException;
55
import org.apache.iotdb.consensus.ratis.metrics.RatisMetricSet;
56
import org.apache.iotdb.consensus.ratis.metrics.RatisMetricsManager;
57
import org.apache.iotdb.consensus.ratis.utils.RatisLogMonitor;
58
import org.apache.iotdb.consensus.ratis.utils.Utils;
59

60
import org.apache.commons.pool2.KeyedObjectPool;
61
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
62
import org.apache.ratis.client.RaftClientRpc;
63
import org.apache.ratis.conf.Parameters;
64
import org.apache.ratis.conf.RaftProperties;
65
import org.apache.ratis.grpc.GrpcConfigKeys;
66
import org.apache.ratis.grpc.GrpcFactory;
67
import org.apache.ratis.protocol.ClientId;
68
import org.apache.ratis.protocol.GroupManagementRequest;
69
import org.apache.ratis.protocol.Message;
70
import org.apache.ratis.protocol.RaftClientReply;
71
import org.apache.ratis.protocol.RaftClientRequest;
72
import org.apache.ratis.protocol.RaftGroup;
73
import org.apache.ratis.protocol.RaftGroupId;
74
import org.apache.ratis.protocol.RaftPeer;
75
import org.apache.ratis.protocol.RaftPeerId;
76
import org.apache.ratis.protocol.SnapshotManagementRequest;
77
import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
78
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
79
import org.apache.ratis.protocol.exceptions.NotLeaderException;
80
import org.apache.ratis.protocol.exceptions.RaftException;
81
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
82
import org.apache.ratis.server.DivisionInfo;
83
import org.apache.ratis.server.RaftServer;
84
import org.apache.ratis.server.RaftServerConfigKeys;
85
import org.apache.ratis.util.function.CheckedSupplier;
86
import org.slf4j.Logger;
87
import org.slf4j.LoggerFactory;
88

89
import java.io.File;
90
import java.io.IOException;
91
import java.util.ArrayList;
92
import java.util.Collections;
93
import java.util.List;
94
import java.util.Map;
95
import java.util.concurrent.ConcurrentHashMap;
96
import java.util.concurrent.ScheduledExecutorService;
97
import java.util.concurrent.TimeUnit;
98
import java.util.concurrent.atomic.AtomicBoolean;
99
import java.util.concurrent.atomic.AtomicLong;
100
import java.util.stream.Collectors;
101

102
/** A multi-raft consensus implementation based on Apache Ratis. */
103
class RatisConsensus implements IConsensus {
104

105
  private static final Logger logger = LoggerFactory.getLogger(RatisConsensus.class);
1✔
106

107
  /** the unique net communication endpoint */
108
  private final RaftPeer myself;
109

110
  private final RaftServer server;
111

112
  private final RaftProperties properties = new RaftProperties();
1✔
113
  private final RaftClientRpc clientRpc;
114

115
  private final IClientManager<RaftGroup, RatisClient> clientManager;
116

117
  private final Map<RaftGroupId, RaftGroup> lastSeen = new ConcurrentHashMap<>();
1✔
118

119
  private final ClientId localFakeId = ClientId.randomId();
1✔
120
  private final AtomicLong localFakeCallId = new AtomicLong(0);
1✔
121

122
  private static final int DEFAULT_PRIORITY = 0;
123
  private static final int LEADER_PRIORITY = 1;
124

125
  /** TODO make it configurable */
126
  private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20);
1✔
127

128
  private final ScheduledExecutorService diskGuardian;
129
  private final long triggerSnapshotThreshold;
130

131
  private final RatisConfig config;
132

133
  private final RatisLogMonitor monitor = new RatisLogMonitor();
1✔
134

135
  private final RatisMetricSet ratisMetricSet;
136
  private final TConsensusGroupType consensusGroupType;
137

138
  private final ConcurrentHashMap<ConsensusGroupId, AtomicBoolean> canServeStaleRead =
1✔
139
      new ConcurrentHashMap<>();
140

141
  public RatisConsensus(ConsensusConfig config, IStateMachine.Registry registry)
142
      throws IOException {
1✔
143
    myself =
1✔
144
        Utils.fromNodeInfoAndPriorityToRaftPeer(
1✔
145
            config.getThisNodeId(), config.getThisNodeEndPoint(), DEFAULT_PRIORITY);
1✔
146

147
    RaftServerConfigKeys.setStorageDir(
1✔
148
        properties, Collections.singletonList(new File(config.getStorageDir())));
1✔
149
    GrpcConfigKeys.Server.setPort(properties, config.getThisNodeEndPoint().getPort());
1✔
150

151
    Utils.initRatisConfig(properties, config.getRatisConfig());
1✔
152
    this.config = config.getRatisConfig();
1✔
153
    this.consensusGroupType = config.getConsensusGroupType();
1✔
154
    this.ratisMetricSet = new RatisMetricSet();
1✔
155

156
    this.triggerSnapshotThreshold = this.config.getImpl().getTriggerSnapshotFileSize();
1✔
157
    diskGuardian =
1✔
158
        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
1✔
159
            ThreadName.RATIS_BG_DISK_GUARDIAN.getName());
1✔
160

161
    clientManager =
1✔
162
        new IClientManager.Factory<RaftGroup, RatisClient>()
163
            .createClientManager(new RatisClientPoolFactory());
1✔
164

165
    clientRpc = new GrpcFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), properties);
1✔
166

167
    server =
1✔
168
        RaftServer.newBuilder()
1✔
169
            .setServerId(myself.getId())
1✔
170
            .setProperties(properties)
1✔
171
            .setStateMachineRegistry(
1✔
172
                raftGroupId ->
173
                    new ApplicationStateMachineProxy(
1✔
174
                        registry.apply(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId)),
1✔
175
                        raftGroupId,
176
                        canServeStaleRead))
177
            .build();
1✔
178
  }
1✔
179

180
  @Override
181
  public void start() throws IOException {
182
    MetricService.getInstance().addMetricSet(this.ratisMetricSet);
1✔
183
    server.start();
1✔
184
    startSnapshotGuardian();
1✔
185
  }
1✔
186

187
  @Override
188
  public void stop() throws IOException {
189
    diskGuardian.shutdown();
1✔
190
    try {
191
      diskGuardian.awaitTermination(5, TimeUnit.SECONDS);
1✔
192
    } catch (InterruptedException e) {
×
193
      logger.warn("{}: interrupted when shutting down add Executor with exception {}", this, e);
×
194
      Thread.currentThread().interrupt();
×
195
    } finally {
196
      clientManager.close();
1✔
197
      server.close();
1✔
198
      MetricService.getInstance().removeMetricSet(this.ratisMetricSet);
1✔
199
    }
200
  }
1✔
201

202
  private boolean shouldRetry(RaftClientReply reply) {
203
    // currently, we only retry when ResourceUnavailableException is caught
204
    return !reply.isSuccess() && (reply.getException() instanceof ResourceUnavailableException);
1✔
205
  }
206

207
  /** launch a consensus write with retry mechanism */
208
  private RaftClientReply writeWithRetry(CheckedSupplier<RaftClientReply, IOException> caller)
209
      throws IOException {
210

211
    final int maxRetryTimes = config.getImpl().getRetryTimesMax();
1✔
212
    final long waitMillis = config.getImpl().getRetryWaitMillis();
1✔
213

214
    int retry = 0;
1✔
215
    RaftClientReply reply = null;
1✔
216
    while (retry < maxRetryTimes) {
1✔
217
      retry++;
1✔
218

219
      reply = caller.get();
1✔
220
      if (!shouldRetry(reply)) {
1✔
221
        return reply;
1✔
222
      }
223
      logger.debug("{} sending write request with retry = {} and reply = {}", this, retry, reply);
×
224

225
      try {
226
        Thread.sleep(waitMillis);
×
227
      } catch (InterruptedException e) {
×
228
        logger.warn("{} retry write sleep is interrupted: {}", this, e);
×
229
        Thread.currentThread().interrupt();
×
230
      }
×
231
    }
232
    if (reply == null) {
×
233
      return RaftClientReply.newBuilder()
×
234
          .setSuccess(false)
×
235
          .setException(
×
236
              new RaftException("null reply received in writeWithRetry for request " + caller))
237
          .build();
×
238
    }
239
    return reply;
×
240
  }
241

242
  private RaftClientReply writeLocallyWithRetry(RaftClientRequest request) throws IOException {
243
    return writeWithRetry(() -> server.submitClientRequest(request));
1✔
244
  }
245

246
  private RaftClientReply writeRemotelyWithRetry(RatisClient client, Message message)
247
      throws IOException {
248
    return writeWithRetry(() -> client.getRaftClient().io().send(message));
1✔
249
  }
250

251
  /**
252
   * write will first send request to local server use method call if local server is not leader, it
253
   * will use RaftClient to send RPC to read leader
254
   */
255
  @Override
256
  public ConsensusWriteResponse write(
257
      ConsensusGroupId consensusGroupId, IConsensusRequest IConsensusRequest) {
258
    // pre-condition: group exists and myself server serves this group
259
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId);
1✔
260
    RaftGroup raftGroup = getGroupInfo(raftGroupId);
1✔
261
    if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
1✔
262
      return failedWrite(new ConsensusGroupNotExistException(consensusGroupId));
×
263
    }
264

265
    // current Peer is group leader and in ReadOnly State
266
    if (isLeader(consensusGroupId) && Utils.rejectWrite()) {
1✔
267
      try {
268
        forceStepDownLeader(raftGroup);
×
269
      } catch (Exception e) {
×
270
        logger.warn("leader {} read only, force step down failed due to {}", myself, e);
×
271
      }
×
272
      return failedWrite(new NodeReadOnlyException(myself));
×
273
    }
274

275
    // serialize request into Message
276
    Message message = new RequestMessage(IConsensusRequest);
1✔
277

278
    // 1. first try the local server
279
    RaftClientRequest clientRequest =
1✔
280
        buildRawRequest(raftGroupId, message, RaftClientRequest.writeRequestType());
1✔
281

282
    RaftClientReply localServerReply;
283
    RaftPeer suggestedLeader = null;
1✔
284
    if (isLeader(consensusGroupId) && waitUntilLeaderReady(raftGroupId)) {
1✔
285
      try (AutoCloseable ignored =
286
          RatisMetricsManager.getInstance().startWriteLocallyTimer(consensusGroupType)) {
1✔
287
        localServerReply = writeLocallyWithRetry(clientRequest);
1✔
288
        if (localServerReply.isSuccess()) {
1✔
289
          ResponseMessage responseMessage = (ResponseMessage) localServerReply.getMessage();
1✔
290
          TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
1✔
291
          return ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build();
1✔
292
        }
293
        NotLeaderException ex = localServerReply.getNotLeaderException();
×
294
        if (ex != null) { // local server is not leader
×
295
          suggestedLeader = ex.getSuggestedLeader();
×
296
        }
297
      } catch (Exception e) {
1✔
298
        return failedWrite(new RatisRequestFailedException(e));
×
299
      }
×
300
    }
301

302
    // 2. try raft client
303
    TSStatus writeResult;
304
    try (AutoCloseable ignored =
305
            RatisMetricsManager.getInstance().startWriteRemotelyTimer(consensusGroupType);
1✔
306
        RatisClient client = getRaftClient(raftGroup)) {
1✔
307
      RaftClientReply reply = writeRemotelyWithRetry(client, message);
1✔
308
      if (!reply.isSuccess()) {
1✔
309
        return failedWrite(new RatisRequestFailedException(reply.getException()));
×
310
      }
311
      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
1✔
312
    } catch (Exception e) {
×
313
      return failedWrite(new RatisRequestFailedException(e));
×
314
    }
1✔
315

316
    if (suggestedLeader != null) {
1✔
317
      TEndPoint leaderEndPoint = Utils.fromRaftPeerAddressToTEndPoint(suggestedLeader.getAddress());
×
318
      writeResult.setRedirectNode(new TEndPoint(leaderEndPoint.getIp(), leaderEndPoint.getPort()));
×
319
    }
320
    return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build();
1✔
321
  }
322

323
  /** Read directly from LOCAL COPY notice: May read stale data (not linearizable) */
324
  @Override
325
  public ConsensusReadResponse read(
326
      ConsensusGroupId consensusGroupId, IConsensusRequest IConsensusRequest) {
327
    RaftGroupId groupId = Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId);
1✔
328
    RaftGroup group = getGroupInfo(groupId);
1✔
329
    if (group == null || !group.getPeers().contains(myself)) {
1✔
330
      return failedRead(new ConsensusGroupNotExistException(consensusGroupId));
×
331
    }
332

333
    final boolean isLinearizableRead =
1✔
334
        !canServeStaleRead.computeIfAbsent(consensusGroupId, id -> new AtomicBoolean(false)).get();
1✔
335

336
    RaftClientReply reply;
337
    try {
338
      reply = doRead(groupId, IConsensusRequest, isLinearizableRead);
1✔
339
      // allow stale read if current linearizable read returns successfully
340
      if (isLinearizableRead) {
1✔
341
        canServeStaleRead.get(consensusGroupId).set(true);
1✔
342
      }
343
    } catch (Exception e) {
1✔
344
      if (isLinearizableRead) {
1✔
345
        // linearizable read failed. the RaftServer is recovering from Raft Log and cannot serve
346
        // read requests.
347
        return failedRead(new RatisUnderRecoveryException(e));
1✔
348
      } else {
349
        return failedRead(new RatisRequestFailedException(e));
×
350
      }
351
    }
1✔
352

353
    Message ret = reply.getMessage();
1✔
354
    ResponseMessage readResponseMessage = (ResponseMessage) ret;
1✔
355
    DataSet dataSet = (DataSet) readResponseMessage.getContentHolder();
1✔
356
    return ConsensusReadResponse.newBuilder().setDataSet(dataSet).build();
1✔
357
  }
358

359
  /** return a success raft client reply or throw an Exception */
360
  private RaftClientReply doRead(
361
      RaftGroupId gid, IConsensusRequest readRequest, boolean linearizable) throws Exception {
362
    final RaftClientRequest.Type readType =
363
        linearizable
1✔
364
            ? RaftClientRequest.readRequestType()
1✔
365
            : RaftClientRequest.staleReadRequestType(-1);
1✔
366
    final RequestMessage requestMessage = new RequestMessage(readRequest);
1✔
367
    final RaftClientRequest request = buildRawRequest(gid, requestMessage, readType);
1✔
368

369
    RaftClientReply reply;
370
    try (AutoCloseable ignored =
371
        RatisMetricsManager.getInstance().startReadTimer(consensusGroupType)) {
1✔
372
      reply = server.submitClientRequest(request);
1✔
373
    }
374

375
    // rethrow the exception if the reply is not successful
376
    if (!reply.isSuccess()) {
1✔
377
      throw reply.getException();
1✔
378
    }
379

380
    return reply;
1✔
381
  }
382

383
  /**
384
   * Add this IConsensus Peer into ConsensusGroup(groupId, peers) Caller's responsibility to call
385
   * addConsensusGroup to every peer of this group and ensure the group is all up
386
   *
387
   * <p>underlying Ratis will 1. initialize a RaftServer instance 2. call GroupManagementApi to
388
   * register self to the RaftGroup
389
   */
390
  @Override
391
  public ConsensusGenericResponse createPeer(ConsensusGroupId groupId, List<Peer> peers) {
392
    RaftGroup group = buildRaftGroup(groupId, peers);
1✔
393
    RaftClientReply reply;
394
    RaftGroup clientGroup =
395
        group.getPeers().isEmpty() ? RaftGroup.valueOf(group.getGroupId(), myself) : group;
1✔
396
    try (RatisClient client = getRaftClient(clientGroup)) {
1✔
397
      reply = client.getRaftClient().getGroupManagementApi(myself.getId()).add(group);
1✔
398
      if (!reply.isSuccess()) {
1✔
399
        return failed(new RatisRequestFailedException(reply.getException()));
×
400
      }
401
    } catch (AlreadyExistsException e) {
1✔
402
      return ConsensusGenericResponse.newBuilder()
1✔
403
          .setException(new ConsensusGroupAlreadyExistException(groupId))
1✔
404
          .build();
1✔
405
    } catch (Exception e) {
×
406
      return failed(new RatisRequestFailedException(e));
×
407
    }
1✔
408
    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
1✔
409
  }
410

411
  /**
412
   * Remove this IConsensus Peer out of ConsensusGroup(groupId, peers) Caller's responsibility to
413
   * call removeConsensusGroup to every peer of this group and ensure the group is fully removed
414
   *
415
   * <p>underlying Ratis will 1. call GroupManagementApi to unregister self off the RaftGroup 2.
416
   * clean up
417
   */
418
  @Override
419
  public ConsensusGenericResponse deletePeer(ConsensusGroupId groupId) {
420
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
421

422
    // send remove group to myself
423
    RaftClientReply reply;
424
    try {
425
      reply =
1✔
426
          server.groupManagement(
1✔
427
              GroupManagementRequest.newRemove(
1✔
428
                  localFakeId,
429
                  myself.getId(),
1✔
430
                  localFakeCallId.incrementAndGet(),
1✔
431
                  raftGroupId,
432
                  true,
433
                  false));
434
      if (!reply.isSuccess()) {
1✔
435
        return failed(new RatisRequestFailedException(reply.getException()));
×
436
      }
437
    } catch (GroupMismatchException e) {
×
438
      return ConsensusGenericResponse.newBuilder()
×
439
          .setException(new ConsensusGroupNotExistException(groupId))
×
440
          .build();
×
441
    } catch (IOException e) {
×
442
      return failed(new RatisRequestFailedException(e));
×
443
    }
1✔
444

445
    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
1✔
446
  }
447

448
  /**
449
   * Add a new IConsensus Peer into ConsensusGroup with groupId
450
   *
451
   * <p>underlying Ratis will 1. call the AdminApi to notify group leader of this configuration
452
   * change
453
   */
454
  @Override
455
  public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) {
456
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
457
    RaftGroup group = getGroupInfo(raftGroupId);
1✔
458
    RaftPeer peerToAdd = Utils.fromPeerAndPriorityToRaftPeer(peer, DEFAULT_PRIORITY);
1✔
459

460
    // pre-conditions: group exists and myself in this group
461
    if (group == null || !group.getPeers().contains(myself)) {
1✔
462
      return failed(new ConsensusGroupNotExistException(groupId));
×
463
    }
464

465
    // pre-condition: peer not in this group
466
    if (group.getPeers().contains(peerToAdd)) {
1✔
467
      return failed(new PeerAlreadyInConsensusGroupException(groupId, peer));
×
468
    }
469

470
    List<RaftPeer> newConfig = new ArrayList<>(group.getPeers());
1✔
471
    newConfig.add(peerToAdd);
1✔
472

473
    RaftClientReply reply;
474
    try {
475
      reply = sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig));
1✔
476
    } catch (RatisRequestFailedException e) {
×
477
      return failed(e);
×
478
    }
1✔
479

480
    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
1✔
481
  }
482

483
  /**
484
   * Remove IConsensus Peer from ConsensusGroup with groupId
485
   *
486
   * <p>underlying Ratis will 1. call the AdminApi to notify group leader of this configuration
487
   * change
488
   */
489
  @Override
490
  public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) {
491
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
492
    RaftGroup group = getGroupInfo(raftGroupId);
1✔
493
    RaftPeer peerToRemove = Utils.fromPeerAndPriorityToRaftPeer(peer, DEFAULT_PRIORITY);
1✔
494

495
    // pre-conditions: group exists and myself in this group
496
    if (group == null || !group.getPeers().contains(myself)) {
1✔
497
      return failed(new ConsensusGroupNotExistException(groupId));
×
498
    }
499
    // pre-condition: peer is a member of groupId
500
    if (!group.getPeers().contains(peerToRemove)) {
1✔
501
      return failed(new PeerNotInConsensusGroupException(groupId, myself));
×
502
    }
503

504
    // update group peer information
505
    List<RaftPeer> newConfig =
1✔
506
        group.getPeers().stream()
1✔
507
            .filter(raftPeer -> !raftPeer.equals(peerToRemove))
1✔
508
            .collect(Collectors.toList());
1✔
509

510
    RaftClientReply reply;
511
    try {
512
      reply = sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig));
1✔
513
    } catch (RatisRequestFailedException e) {
×
514
      return failed(e);
×
515
    }
1✔
516

517
    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
1✔
518
  }
519

520
  @Override
521
  public ConsensusGenericResponse updatePeer(ConsensusGroupId groupId, Peer oldPeer, Peer newPeer) {
522
    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
×
523
  }
524

525
  @Override
526
  public ConsensusGenericResponse changePeer(ConsensusGroupId groupId, List<Peer> newPeers) {
527
    RaftGroup raftGroup = buildRaftGroup(groupId, newPeers);
1✔
528

529
    // pre-conditions: myself in this group
530
    if (!raftGroup.getPeers().contains(myself)) {
1✔
531
      return failed(new ConsensusGroupNotExistException(groupId));
×
532
    }
533

534
    // add RaftPeer myself to this RaftGroup
535
    RaftClientReply reply;
536
    try {
537
      reply = sendReconfiguration(raftGroup);
1✔
538
    } catch (RatisRequestFailedException e) {
×
539
      return failed(e);
×
540
    }
1✔
541
    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
1✔
542
  }
543

544
  /**
545
   * NOTICE: transferLeader *does not guarantee* the leader be transferred to newLeader.
546
   * transferLeader is implemented by 1. modify peer priority 2. ask current leader to step down
547
   *
548
   * <p>1. call setConfiguration to upgrade newLeader's priority to 1 and degrade all follower peers
549
   * to 0. By default, Ratis gives every Raft Peer same priority 0. Ratis does not allow a peer with
550
   * priority <= currentLeader.priority to becomes the leader, so we have to upgrade leader's
551
   * priority to 1
552
   *
553
   * <p>2. call transferLeadership to force current leader to step down and raise a new round of
554
   * election. In this election, the newLeader peer with priority 1 is guaranteed to be elected.
555
   */
556
  @Override
557
  public ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, Peer newLeader) {
558

559
    // first fetch the newest information
560

561
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
562
    RaftGroup raftGroup = getGroupInfo(raftGroupId);
1✔
563

564
    if (raftGroup == null) {
1✔
565
      return failed(new ConsensusGroupNotExistException(groupId));
×
566
    }
567

568
    RaftPeer newRaftLeader = Utils.fromPeerAndPriorityToRaftPeer(newLeader, LEADER_PRIORITY);
1✔
569

570
    ArrayList<RaftPeer> newConfiguration = new ArrayList<>();
1✔
571
    for (RaftPeer raftPeer : raftGroup.getPeers()) {
1✔
572
      if (raftPeer.getId().equals(newRaftLeader.getId())) {
1✔
573
        newConfiguration.add(newRaftLeader);
1✔
574
      } else {
575
        // degrade every other peer to default priority
576
        newConfiguration.add(
1✔
577
            Utils.fromNodeInfoAndPriorityToRaftPeer(
1✔
578
                Utils.fromRaftPeerIdToNodeId(raftPeer.getId()),
1✔
579
                Utils.fromRaftPeerAddressToTEndPoint(raftPeer.getAddress()),
1✔
580
                DEFAULT_PRIORITY));
581
      }
582
    }
1✔
583

584
    RaftClientReply reply;
585
    try (RatisClient client = getRaftClient(raftGroup)) {
1✔
586
      RaftClientReply configChangeReply =
1✔
587
          client.getRaftClient().admin().setConfiguration(newConfiguration);
1✔
588
      if (!configChangeReply.isSuccess()) {
1✔
589
        return failed(new RatisRequestFailedException(configChangeReply.getException()));
×
590
      }
591

592
      reply = transferLeader(raftGroup, newRaftLeader);
1✔
593
      if (!reply.isSuccess()) {
1✔
594
        return failed(new RatisRequestFailedException(reply.getException()));
×
595
      }
596
    } catch (Exception e) {
×
597
      return failed(new RatisRequestFailedException(e));
×
598
    }
1✔
599
    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
1✔
600
  }
601

602
  private void forceStepDownLeader(RaftGroup group) throws Exception {
603
    // when newLeaderPeerId == null, ratis forces current leader to step down and raise new
604
    // election
605
    transferLeader(group, null);
×
606
  }
×
607

608
  private RaftClientReply transferLeader(RaftGroup group, RaftPeer newLeader) throws Exception {
609
    try (RatisClient client = getRaftClient(group)) {
1✔
610
      // TODO tuning for timeoutMs
611
      return client
1✔
612
          .getRaftClient()
1✔
613
          .admin()
1✔
614
          .transferLeadership(newLeader != null ? newLeader.getId() : null, 10000);
1✔
615
    }
616
  }
617

618
  @Override
619
  public boolean isLeader(ConsensusGroupId groupId) {
620
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
621

622
    boolean isLeader;
623
    try {
624
      isLeader = server.getDivision(raftGroupId).getInfo().isLeader();
1✔
625
    } catch (IOException exception) {
1✔
626
      // if the read fails, simply return not leader
627
      logger.info("isLeader request failed with exception: ", exception);
1✔
628
      isLeader = false;
1✔
629
    }
1✔
630
    return isLeader;
1✔
631
  }
632

633
  private boolean waitUntilLeaderReady(RaftGroupId groupId) {
634
    DivisionInfo divisionInfo;
635
    try {
636
      divisionInfo = server.getDivision(groupId).getInfo();
1✔
637
    } catch (IOException e) {
×
638
      // if the read fails, simply return not leader
639
      logger.info("isLeaderReady checking failed with exception: ", e);
×
640
      return false;
×
641
    }
1✔
642
    long startTime = System.currentTimeMillis();
1✔
643
    try {
644
      while (divisionInfo.isLeader() && !divisionInfo.isLeaderReady()) {
1✔
645
        Thread.sleep(10);
×
646
        long consumedTime = System.currentTimeMillis() - startTime;
×
647
        if (consumedTime >= DEFAULT_WAIT_LEADER_READY_TIMEOUT) {
×
648
          logger.warn("{}: leader is still not ready after {}ms", groupId, consumedTime);
×
649
          return false;
×
650
        }
651
      }
×
652
    } catch (InterruptedException e) {
×
653
      Thread.currentThread().interrupt();
×
654
      logger.warn("Unexpected interruption", e);
×
655
      return false;
×
656
    }
1✔
657
    return divisionInfo.isLeader();
1✔
658
  }
659

660
  /**
661
   * returns the known leader to the given group. NOTICE: if the local peer isn't a member of given
662
   * group, getLeader will return null.
663
   *
664
   * @return null if local peer isn't in group, otherwise group leader.
665
   */
666
  @Override
667
  public Peer getLeader(ConsensusGroupId groupId) {
668
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
669
    RaftPeerId leaderId;
670

671
    try {
672
      leaderId = server.getDivision(raftGroupId).getInfo().getLeaderId();
1✔
673
    } catch (IOException e) {
×
674
      logger.warn("fetch division info for group " + groupId + " failed due to: ", e);
×
675
      return null;
×
676
    }
1✔
677
    if (leaderId == null) {
1✔
678
      return null;
1✔
679
    }
680
    int nodeId = Utils.fromRaftPeerIdToNodeId(leaderId);
1✔
681
    return new Peer(groupId, nodeId, null);
1✔
682
  }
683

684
  @Override
685
  public List<ConsensusGroupId> getAllConsensusGroupIds() {
686
    List<ConsensusGroupId> ids = new ArrayList<>();
×
687
    server
×
688
        .getGroupIds()
×
689
        .forEach(groupId -> ids.add(Utils.fromRaftGroupIdToConsensusGroupId(groupId)));
×
690
    return ids;
×
691
  }
692

693
  @Override
694
  public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) {
695
    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
1✔
696
    RaftGroup groupInfo = getGroupInfo(raftGroupId);
1✔
697

698
    if (groupInfo == null || !groupInfo.getPeers().contains(myself)) {
1✔
699
      return failed(new ConsensusGroupNotExistException(groupId));
×
700
    }
701

702
    // TODO tuning snapshot create timeout
703
    SnapshotManagementRequest request =
1✔
704
        SnapshotManagementRequest.newCreate(
1✔
705
            localFakeId, myself.getId(), raftGroupId, localFakeCallId.incrementAndGet(), 30000);
1✔
706

707
    RaftClientReply reply;
708
    try {
709
      reply = server.snapshotManagement(request);
1✔
710
      if (!reply.isSuccess()) {
1✔
711
        return failed(new RatisRequestFailedException(reply.getException()));
×
712
      }
713
    } catch (IOException ioException) {
×
714
      return failed(new RatisRequestFailedException(ioException));
×
715
    }
1✔
716

717
    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
1✔
718
  }
719

720
  private void triggerSnapshotByCustomize() {
721

722
    for (RaftGroupId raftGroupId : server.getGroupIds()) {
1✔
723
      File currentDir;
724

725
      try {
726
        currentDir =
1✔
727
            server.getDivision(raftGroupId).getRaftStorage().getStorageDir().getCurrentDir();
1✔
728
      } catch (IOException e) {
×
729
        logger.warn("{}: get division {} failed: ", this, raftGroupId, e);
×
730
        continue;
×
731
      }
1✔
732

733
      final long currentDirLength = monitor.updateAndGetDirectorySize(currentDir);
1✔
734

735
      if (currentDirLength >= triggerSnapshotThreshold) {
1✔
736
        final int filesCount = monitor.getFilesUnder(currentDir).size();
1✔
737
        logger.info(
1✔
738
            "{}: take snapshot for region {}, current dir size {}, {} files to be purged",
739
            this,
740
            raftGroupId,
741
            currentDirLength,
1✔
742
            filesCount);
1✔
743

744
        final ConsensusGenericResponse consensusGenericResponse =
1✔
745
            triggerSnapshot(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId));
1✔
746
        if (consensusGenericResponse.isSuccess()) {
1✔
747
          logger.info("Raft group {} took snapshot successfully", raftGroupId);
1✔
748
        } else {
749
          logger.warn(
×
750
              "Raft group {} failed to take snapshot due to",
751
              raftGroupId,
752
              consensusGenericResponse.getException());
×
753
        }
754
      }
755
    }
1✔
756
  }
1✔
757

758
  private void startSnapshotGuardian() {
759
    final long delay = config.getImpl().getTriggerSnapshotTime();
1✔
760
    ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
1✔
761
        diskGuardian, this::triggerSnapshotByCustomize, 0, delay, TimeUnit.SECONDS);
762
  }
1✔
763

764
  private ConsensusGenericResponse failed(ConsensusException e) {
765
    logger.debug("{} request failed with exception {}", this, e);
×
766
    return ConsensusGenericResponse.newBuilder().setSuccess(false).setException(e).build();
×
767
  }
768

769
  private ConsensusWriteResponse failedWrite(ConsensusException e) {
770
    logger.debug("{} write request failed with exception {}", this, e);
×
771
    return ConsensusWriteResponse.newBuilder().setException(e).build();
×
772
  }
773

774
  private ConsensusReadResponse failedRead(ConsensusException e) {
775
    logger.debug("{} read request failed with exception {}", this, e);
1✔
776
    return ConsensusReadResponse.newBuilder().setException(e).build();
1✔
777
  }
778

779
  private RaftClientRequest buildRawRequest(
780
      RaftGroupId groupId, Message message, RaftClientRequest.Type type) {
781
    return RaftClientRequest.newBuilder()
1✔
782
        .setServerId(server.getId())
1✔
783
        .setClientId(localFakeId)
1✔
784
        .setCallId(localFakeCallId.incrementAndGet())
1✔
785
        .setGroupId(groupId)
1✔
786
        .setType(type)
1✔
787
        .setMessage(message)
1✔
788
        .build();
1✔
789
  }
790

791
  private RaftGroup getGroupInfo(RaftGroupId raftGroupId) {
792
    RaftGroup raftGroup = null;
1✔
793
    try {
794
      raftGroup = server.getDivision(raftGroupId).getGroup();
1✔
795
      RaftGroup lastSeenGroup = lastSeen.getOrDefault(raftGroupId, null);
1✔
796
      if (lastSeenGroup != null && !lastSeenGroup.equals(raftGroup)) {
1✔
797
        // delete the pooled raft-client of the out-dated group and cache the latest
798
        clientManager.clear(lastSeenGroup);
×
799
        lastSeen.put(raftGroupId, raftGroup);
×
800
      }
801
    } catch (IOException e) {
×
802
      logger.debug("get group {} failed ", raftGroupId, e);
×
803
    }
1✔
804
    return raftGroup;
1✔
805
  }
806

807
  private RaftGroup buildRaftGroup(ConsensusGroupId groupId, List<Peer> peers) {
808
    return RaftGroup.valueOf(
1✔
809
        Utils.fromConsensusGroupIdToRaftGroupId(groupId),
1✔
810
        Utils.fromPeersAndPriorityToRaftPeers(peers, DEFAULT_PRIORITY));
1✔
811
  }
812

813
  private RatisClient getRaftClient(RaftGroup group) throws ClientManagerException {
814
    try {
815
      return clientManager.borrowClient(group);
1✔
816
    } catch (ClientManagerException e) {
×
817
      logger.error(String.format("Borrow client from pool for group %s failed.", group), e);
×
818
      // rethrow the exception
819
      throw e;
×
820
    }
821
  }
822

823
  private RaftClientReply sendReconfiguration(RaftGroup newGroupConf)
824
      throws RatisRequestFailedException {
825
    // notify the group leader of configuration change
826
    RaftClientReply reply;
827
    try (RatisClient client = getRaftClient(newGroupConf)) {
1✔
828
      reply =
1✔
829
          client.getRaftClient().admin().setConfiguration(new ArrayList<>(newGroupConf.getPeers()));
1✔
830
      if (!reply.isSuccess()) {
1✔
831
        throw new RatisRequestFailedException(reply.getException());
×
832
      }
833
    } catch (Exception e) {
×
834
      throw new RatisRequestFailedException(e);
×
835
    }
1✔
836
    return reply;
1✔
837
  }
838

839
  @TestOnly
840
  public RaftServer getServer() {
841
    return server;
×
842
  }
843

844
  @TestOnly
845
  public void allowStaleRead(ConsensusGroupId consensusGroupId) {
846
    canServeStaleRead.computeIfAbsent(consensusGroupId, id -> new AtomicBoolean(false)).set(true);
1✔
847
  }
1✔
848

849
  private class RatisClientPoolFactory implements IClientPoolFactory<RaftGroup, RatisClient> {
1✔
850

851
    @Override
852
    public KeyedObjectPool<RaftGroup, RatisClient> createClientPool(
853
        ClientManager<RaftGroup, RatisClient> manager) {
854
      GenericKeyedObjectPool<RaftGroup, RatisClient> clientPool =
1✔
855
          new GenericKeyedObjectPool<>(
856
              new RatisClient.Factory(manager, properties, clientRpc, config.getClient()),
1✔
857
              new ClientPoolProperty.Builder<RatisClient>()
858
                  .setCoreClientNumForEachNode(config.getClient().getCoreClientNumForEachNode())
1✔
859
                  .setMaxClientNumForEachNode(config.getClient().getMaxClientNumForEachNode())
1✔
860
                  .build()
1✔
861
                  .getConfig());
1✔
862
      ClientManagerMetrics.getInstance()
1✔
863
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
1✔
864
      return clientPool;
1✔
865
    }
866
  }
867
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc