• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9967

30 Aug 2023 04:22PM CUT coverage: 47.7% (+0.04%) from 47.658%
#9967

push

travis_ci

web-flow
Pipe: Fix start-time and end-time parameters not working when extracting history data (#11001) (#11002)

(cherry picked from commit 35736cc67)

12 of 12 new or added lines in 6 files covered. (100.0%)

80165 of 168062 relevant lines covered (47.7%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.71
/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.consensus.iot;
21

22
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23
import org.apache.iotdb.common.rpc.thrift.TSStatus;
24
import org.apache.iotdb.commons.client.IClientManager;
25
import org.apache.iotdb.commons.client.exception.ClientManagerException;
26
import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
27
import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
28
import org.apache.iotdb.commons.service.metric.MetricService;
29
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
30
import org.apache.iotdb.consensus.IStateMachine;
31
import org.apache.iotdb.consensus.common.DataSet;
32
import org.apache.iotdb.consensus.common.Peer;
33
import org.apache.iotdb.consensus.common.request.DeserializedBatchIndexedConsensusRequest;
34
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
35
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
36
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
37
import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
38
import org.apache.iotdb.consensus.iot.client.AsyncIoTConsensusServiceClient;
39
import org.apache.iotdb.consensus.iot.client.SyncIoTConsensusServiceClient;
40
import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
41
import org.apache.iotdb.consensus.iot.log.GetConsensusReqReaderPlan;
42
import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher;
43
import org.apache.iotdb.consensus.iot.snapshot.SnapshotFragmentReader;
44
import org.apache.iotdb.consensus.iot.thrift.TActivatePeerReq;
45
import org.apache.iotdb.consensus.iot.thrift.TActivatePeerRes;
46
import org.apache.iotdb.consensus.iot.thrift.TBuildSyncLogChannelReq;
47
import org.apache.iotdb.consensus.iot.thrift.TBuildSyncLogChannelRes;
48
import org.apache.iotdb.consensus.iot.thrift.TCleanupTransferredSnapshotReq;
49
import org.apache.iotdb.consensus.iot.thrift.TCleanupTransferredSnapshotRes;
50
import org.apache.iotdb.consensus.iot.thrift.TInactivatePeerReq;
51
import org.apache.iotdb.consensus.iot.thrift.TInactivatePeerRes;
52
import org.apache.iotdb.consensus.iot.thrift.TRemoveSyncLogChannelReq;
53
import org.apache.iotdb.consensus.iot.thrift.TRemoveSyncLogChannelRes;
54
import org.apache.iotdb.consensus.iot.thrift.TSendSnapshotFragmentReq;
55
import org.apache.iotdb.consensus.iot.thrift.TSendSnapshotFragmentRes;
56
import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadReq;
57
import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadRes;
58
import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteReq;
59
import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteRes;
60
import org.apache.iotdb.rpc.RpcUtils;
61
import org.apache.iotdb.rpc.TSStatusCode;
62
import org.apache.iotdb.tsfile.utils.PublicBAOS;
63

64
import org.apache.commons.io.FileUtils;
65
import org.apache.thrift.TException;
66
import org.slf4j.Logger;
67
import org.slf4j.LoggerFactory;
68

69
import java.io.DataOutputStream;
70
import java.io.File;
71
import java.io.IOException;
72
import java.nio.ByteBuffer;
73
import java.nio.file.Files;
74
import java.nio.file.Path;
75
import java.nio.file.Paths;
76
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
77
import java.util.ArrayList;
78
import java.util.Collections;
79
import java.util.LinkedList;
80
import java.util.List;
81
import java.util.PriorityQueue;
82
import java.util.concurrent.ConcurrentHashMap;
83
import java.util.concurrent.ScheduledExecutorService;
84
import java.util.concurrent.TimeUnit;
85
import java.util.concurrent.atomic.AtomicLong;
86
import java.util.concurrent.locks.Condition;
87
import java.util.concurrent.locks.Lock;
88
import java.util.concurrent.locks.ReentrantLock;
89
import java.util.regex.Pattern;
90

91
public class IoTConsensusServerImpl {
92

93
  private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
94
  private static final String CONFIGURATION_TMP_FILE_NAME = "configuration.dat.tmp";
95
  public static final String SNAPSHOT_DIR_NAME = "snapshot";
96
  private static final Pattern SNAPSHOT_INDEX_PATTEN = Pattern.compile(".*[^\\d](?=(\\d+))");
1✔
97
  private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
1✔
98
      PerformanceOverviewMetrics.getInstance();
1✔
99
  private final Logger logger = LoggerFactory.getLogger(IoTConsensusServerImpl.class);
1✔
100
  private final Peer thisNode;
101
  private final IStateMachine stateMachine;
102
  private final ConcurrentHashMap<Integer, SyncLogCacheQueue> cacheQueueMap;
103
  private final Lock stateMachineLock = new ReentrantLock();
1✔
104
  private final Condition stateMachineCondition = stateMachineLock.newCondition();
1✔
105
  private final String storageDir;
106
  private final List<Peer> configuration;
107
  private final AtomicLong searchIndex;
108
  private final LogDispatcher logDispatcher;
109
  private final IoTConsensusConfig config;
110
  private final ConsensusReqReader consensusReqReader;
111
  private volatile boolean active;
112
  private String newSnapshotDirName;
113
  private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
114
  private final IoTConsensusServerMetrics ioTConsensusServerMetrics;
115
  private final String consensusGroupId;
116
  private final ScheduledExecutorService retryService;
117

118
  public IoTConsensusServerImpl(
119
      String storageDir,
120
      Peer thisNode,
121
      List<Peer> configuration,
122
      IStateMachine stateMachine,
123
      ScheduledExecutorService retryService,
124
      IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
125
      IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager,
126
      IoTConsensusConfig config) {
1✔
127
    this.active = true;
1✔
128
    this.storageDir = storageDir;
1✔
129
    this.thisNode = thisNode;
1✔
130
    this.stateMachine = stateMachine;
1✔
131
    this.cacheQueueMap = new ConcurrentHashMap<>();
1✔
132
    this.syncClientManager = syncClientManager;
1✔
133
    this.configuration = configuration;
1✔
134
    if (configuration.isEmpty()) {
1✔
135
      recoverConfiguration();
1✔
136
    } else {
137
      persistConfiguration();
1✔
138
    }
139
    this.retryService = retryService;
1✔
140
    this.config = config;
1✔
141
    this.consensusGroupId = thisNode.getGroupId().toString();
1✔
142
    consensusReqReader = (ConsensusReqReader) stateMachine.read(new GetConsensusReqReaderPlan());
1✔
143
    this.searchIndex = new AtomicLong(consensusReqReader.getCurrentSearchIndex());
1✔
144
    this.ioTConsensusServerMetrics = new IoTConsensusServerMetrics(this);
1✔
145
    this.logDispatcher = new LogDispatcher(this, clientManager);
1✔
146
    // Since the underlying wal does not persist safelyDeletedSearchIndex, IoTConsensus needs to
147
    // update wal with its syncIndex recovered from the consensus layer when initializing.
148
    // This prevents wal from being piled up if the safelyDeletedSearchIndex is not updated after
149
    // the restart and Leader migration occurs
150
    checkAndUpdateSafeDeletedSearchIndex();
1✔
151
  }
1✔
152

153
  public IStateMachine getStateMachine() {
154
    return stateMachine;
1✔
155
  }
156

157
  public void start() {
158
    MetricService.getInstance().addMetricSet(this.ioTConsensusServerMetrics);
1✔
159
    stateMachine.start();
1✔
160
    logDispatcher.start();
1✔
161
  }
1✔
162

163
  public void stop() {
164
    logDispatcher.stop();
1✔
165
    stateMachine.stop();
1✔
166
    MetricService.getInstance().removeMetricSet(this.ioTConsensusServerMetrics);
1✔
167
  }
1✔
168

169
  /**
170
   * records the index of the log and writes locally, and then asynchronous replication is
171
   * performed.
172
   */
173
  public TSStatus write(IConsensusRequest request) {
174
    long consensusWriteStartTime = System.nanoTime();
1✔
175
    stateMachineLock.lock();
1✔
176
    try {
177
      long getStateMachineLockTime = System.nanoTime();
1✔
178
      // statistic the time of acquiring stateMachine lock
179
      ioTConsensusServerMetrics.recordGetStateMachineLockTime(
1✔
180
          getStateMachineLockTime - consensusWriteStartTime);
181
      if (needBlockWrite()) {
1✔
182
        logger.info(
×
183
            "[Throttle Down] index:{}, safeIndex:{}",
184
            getSearchIndex(),
×
185
            getCurrentSafelyDeletedSearchIndex());
×
186
        try {
187
          boolean timeout =
×
188
              !stateMachineCondition.await(
×
189
                  config.getReplication().getThrottleTimeOutMs(), TimeUnit.MILLISECONDS);
×
190
          if (timeout) {
×
191
            return RpcUtils.getStatus(
×
192
                TSStatusCode.WRITE_PROCESS_REJECT,
193
                String.format(
×
194
                    "The write is rejected because the wal directory size has reached the "
195
                        + "threshold %d bytes. You may need to adjust the flush policy of the "
196
                        + "storage storageengine or the IoTConsensus synchronization parameter",
197
                    config.getReplication().getWalThrottleThreshold()));
×
198
          }
199
        } catch (InterruptedException e) {
×
200
          logger.error("Failed to throttle down because ", e);
×
201
          Thread.currentThread().interrupt();
×
202
        }
×
203
      }
204
      long writeToStateMachineStartTime = System.nanoTime();
1✔
205
      // statistic the time of checking write block
206
      ioTConsensusServerMetrics.recordCheckingBeforeWriteTime(
1✔
207
          writeToStateMachineStartTime - getStateMachineLockTime);
208
      IndexedConsensusRequest indexedConsensusRequest =
1✔
209
          buildIndexedConsensusRequestForLocalRequest(request);
1✔
210
      if (indexedConsensusRequest.getSearchIndex() % 10000 == 0) {
1✔
211
        logger.info(
×
212
            "DataRegion[{}]: index after build: safeIndex:{}, searchIndex: {}",
213
            thisNode.getGroupId(),
×
214
            getCurrentSafelyDeletedSearchIndex(),
×
215
            indexedConsensusRequest.getSearchIndex());
×
216
      }
217
      IConsensusRequest planNode = stateMachine.deserializeRequest(indexedConsensusRequest);
1✔
218
      long startWriteTime = System.nanoTime();
1✔
219
      TSStatus result = stateMachine.write(planNode);
1✔
220
      PERFORMANCE_OVERVIEW_METRICS.recordEngineCost(System.nanoTime() - startWriteTime);
1✔
221

222
      long writeToStateMachineEndTime = System.nanoTime();
1✔
223
      // statistic the time of writing request into stateMachine
224
      ioTConsensusServerMetrics.recordWriteStateMachineTime(
1✔
225
          writeToStateMachineEndTime - writeToStateMachineStartTime);
226
      if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
1✔
227
        // The index is used when constructing batch in LogDispatcher. If its value
228
        // increases but the corresponding request does not exist or is not put into
229
        // the queue, the dispatcher will try to find the request in WAL. This behavior
230
        // is not expected and will slow down the preparation speed for batch.
231
        // So we need to use the lock to ensure the `offer()` and `incrementAndGet()` are
232
        // in one transaction.
233
        synchronized (searchIndex) {
1✔
234
          logDispatcher.offer(indexedConsensusRequest);
1✔
235
          searchIndex.incrementAndGet();
1✔
236
        }
1✔
237
        // statistic the time of offering request into queue
238
        ioTConsensusServerMetrics.recordOfferRequestToQueueTime(
1✔
239
            System.nanoTime() - writeToStateMachineEndTime);
1✔
240
      } else {
241
        logger.debug(
×
242
            "{}: write operation failed. searchIndex: {}. Code: {}",
243
            thisNode.getGroupId(),
×
244
            indexedConsensusRequest.getSearchIndex(),
×
245
            result.getCode());
×
246
      }
247
      // statistic the time of total write process
248
      ioTConsensusServerMetrics.recordConsensusWriteTime(
1✔
249
          System.nanoTime() - consensusWriteStartTime);
1✔
250
      return result;
1✔
251
    } finally {
252
      stateMachineLock.unlock();
1✔
253
    }
254
  }
255

256
  public DataSet read(IConsensusRequest request) {
257
    return stateMachine.read(request);
×
258
  }
259

260
  public void takeSnapshot() throws ConsensusGroupModifyPeerException {
261
    try {
262
      long newSnapshotIndex = getLatestSnapshotIndex() + 1;
1✔
263
      newSnapshotDirName =
1✔
264
          String.format(
1✔
265
              "%s_%s_%d", SNAPSHOT_DIR_NAME, thisNode.getGroupId().getId(), newSnapshotIndex);
1✔
266
      File snapshotDir = new File(storageDir, newSnapshotDirName);
1✔
267
      if (snapshotDir.exists()) {
1✔
268
        FileUtils.deleteDirectory(snapshotDir);
×
269
      }
270
      if (!snapshotDir.mkdirs()) {
1✔
271
        throw new ConsensusGroupModifyPeerException(
×
272
            String.format("%s: cannot mkdir for snapshot", thisNode.getGroupId()));
×
273
      }
274
      if (!stateMachine.takeSnapshot(snapshotDir)) {
1✔
275
        throw new ConsensusGroupModifyPeerException("unknown error when taking snapshot");
×
276
      }
277
      clearOldSnapshot();
1✔
278
    } catch (IOException e) {
×
279
      throw new ConsensusGroupModifyPeerException("error when taking snapshot", e);
×
280
    }
1✔
281
  }
1✔
282

283
  public void transitSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerException {
284
    File snapshotDir = new File(storageDir, newSnapshotDirName);
×
285
    List<Path> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir);
×
286
    logger.info("transit snapshots: {}", snapshotPaths);
×
287
    try (SyncIoTConsensusServiceClient client =
×
288
        syncClientManager.borrowClient(targetPeer.getEndpoint())) {
×
289
      for (Path path : snapshotPaths) {
×
290
        SnapshotFragmentReader reader = new SnapshotFragmentReader(newSnapshotDirName, path);
×
291
        try {
292
          while (reader.hasNext()) {
×
293
            TSendSnapshotFragmentReq req = reader.next().toTSendSnapshotFragmentReq();
×
294
            req.setConsensusGroupId(targetPeer.getGroupId().convertToTConsensusGroupId());
×
295
            TSendSnapshotFragmentRes res = client.sendSnapshotFragment(req);
×
296
            if (!isSuccess(res.getStatus())) {
×
297
              throw new ConsensusGroupModifyPeerException(
×
298
                  String.format("error when sending snapshot fragment to %s", targetPeer));
×
299
            }
300
          }
×
301
        } finally {
302
          reader.close();
×
303
        }
304
      }
×
305
    } catch (Exception e) {
×
306
      throw new ConsensusGroupModifyPeerException(
×
307
          String.format("error when send snapshot file to %s", targetPeer), e);
×
308
    }
×
309
  }
×
310

311
  public void receiveSnapshotFragment(
312
      String snapshotId, String originalFilePath, ByteBuffer fileChunk)
313
      throws ConsensusGroupModifyPeerException {
314
    try {
315
      String targetFilePath = calculateSnapshotPath(snapshotId, originalFilePath);
×
316
      File targetFile = new File(storageDir, targetFilePath);
×
317
      Path parentDir = Paths.get(targetFile.getParent());
×
318
      if (!Files.exists(parentDir)) {
×
319
        Files.createDirectories(parentDir);
×
320
      }
321
      Files.write(
×
322
          Paths.get(targetFile.getAbsolutePath()),
×
323
          fileChunk.array(),
×
324
          StandardOpenOption.CREATE,
325
          StandardOpenOption.APPEND);
326
    } catch (IOException e) {
×
327
      throw new ConsensusGroupModifyPeerException(
×
328
          String.format("error when receiving snapshot %s", snapshotId), e);
×
329
    }
×
330
  }
×
331

332
  private String calculateSnapshotPath(String snapshotId, String originalFilePath)
333
      throws ConsensusGroupModifyPeerException {
334
    if (!originalFilePath.contains(snapshotId)) {
×
335
      throw new ConsensusGroupModifyPeerException(
×
336
          String.format(
×
337
              "invalid snapshot file. snapshotId: %s, filePath: %s", snapshotId, originalFilePath));
338
    }
339
    return originalFilePath.substring(originalFilePath.indexOf(snapshotId));
×
340
  }
341

342
  private long getLatestSnapshotIndex() {
343
    long snapShotIndex = 0;
1✔
344
    File directory = new File(storageDir);
1✔
345
    File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(SNAPSHOT_DIR_NAME));
1✔
346
    if (versionFiles == null || versionFiles.length == 0) {
1✔
347
      return snapShotIndex;
1✔
348
    }
349
    for (File file : versionFiles) {
1✔
350
      snapShotIndex =
1✔
351
          Math.max(
1✔
352
              snapShotIndex,
353
              Long.parseLong(SNAPSHOT_INDEX_PATTEN.matcher(file.getName()).replaceAll("")));
1✔
354
    }
355
    return snapShotIndex;
1✔
356
  }
357

358
  private void clearOldSnapshot() {
359
    File directory = new File(storageDir);
1✔
360
    File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(SNAPSHOT_DIR_NAME));
1✔
361
    if (versionFiles == null || versionFiles.length == 0) {
1✔
362
      logger.error(
×
363
          "Can not find any snapshot dir after build a new snapshot for group {}",
364
          thisNode.getGroupId());
×
365
      return;
×
366
    }
367
    for (File file : versionFiles) {
1✔
368
      if (!file.getName().equals(newSnapshotDirName)) {
1✔
369
        try {
370
          FileUtils.deleteDirectory(file);
1✔
371
        } catch (IOException e) {
×
372
          logger.error("Delete old snapshot dir {} failed", file.getAbsolutePath(), e);
×
373
        }
1✔
374
      }
375
    }
376
  }
1✔
377

378
  public void loadSnapshot(String snapshotId) {
379
    // TODO: (xingtanzjr) throw exception if the snapshot load failed
380
    stateMachine.loadSnapshot(new File(storageDir, snapshotId));
×
381
  }
×
382

383
  public void inactivePeer(Peer peer) throws ConsensusGroupModifyPeerException {
384
    try (SyncIoTConsensusServiceClient client =
×
385
        syncClientManager.borrowClient(peer.getEndpoint())) {
×
386
      TInactivatePeerRes res =
×
387
          client.inactivatePeer(
×
388
              new TInactivatePeerReq(peer.getGroupId().convertToTConsensusGroupId()));
×
389
      if (!isSuccess(res.status)) {
×
390
        throw new ConsensusGroupModifyPeerException(
×
391
            String.format("error when inactivating %s. %s", peer, res.getStatus()));
×
392
      }
393
    } catch (Exception e) {
×
394
      throw new ConsensusGroupModifyPeerException(
×
395
          String.format("error when inactivating %s", peer), e);
×
396
    }
×
397
  }
×
398

399
  public void triggerSnapshotLoad(Peer peer) throws ConsensusGroupModifyPeerException {
400
    try (SyncIoTConsensusServiceClient client =
×
401
        syncClientManager.borrowClient(peer.getEndpoint())) {
×
402
      TTriggerSnapshotLoadRes res =
×
403
          client.triggerSnapshotLoad(
×
404
              new TTriggerSnapshotLoadReq(
405
                  thisNode.getGroupId().convertToTConsensusGroupId(), newSnapshotDirName));
×
406
      if (!isSuccess(res.status)) {
×
407
        throw new ConsensusGroupModifyPeerException(
×
408
            String.format("error when triggering snapshot load %s. %s", peer, res.getStatus()));
×
409
      }
410
    } catch (Exception e) {
×
411
      throw new ConsensusGroupModifyPeerException(
×
412
          String.format("error when activating %s", peer), e);
×
413
    }
×
414
  }
×
415

416
  public void activePeer(Peer peer) throws ConsensusGroupModifyPeerException {
417
    try (SyncIoTConsensusServiceClient client =
×
418
        syncClientManager.borrowClient(peer.getEndpoint())) {
×
419
      TActivatePeerRes res =
×
420
          client.activatePeer(new TActivatePeerReq(peer.getGroupId().convertToTConsensusGroupId()));
×
421
      if (!isSuccess(res.status)) {
×
422
        throw new ConsensusGroupModifyPeerException(
×
423
            String.format("error when activating %s. %s", peer, res.getStatus()));
×
424
      }
425
    } catch (Exception e) {
×
426
      throw new ConsensusGroupModifyPeerException(
×
427
          String.format("error when activating %s", peer), e);
×
428
    }
×
429
  }
×
430

431
  public void notifyPeersToBuildSyncLogChannel(Peer targetPeer)
432
      throws ConsensusGroupModifyPeerException {
433
    // The configuration will be modified during iterating because we will add the targetPeer to
434
    // configuration
435
    List<Peer> currentMembers = new ArrayList<>(this.configuration);
×
436
    logger.info(
×
437
        "[IoTConsensus] notify current peers to build sync log. group member: {}, target: {}",
438
        currentMembers,
439
        targetPeer);
440
    for (Peer peer : currentMembers) {
×
441
      logger.info("[IoTConsensus] build sync log channel from {}", peer);
×
442
      if (peer.equals(thisNode)) {
×
443
        // use searchIndex for thisNode as the initialSyncIndex because targetPeer will load the
444
        // snapshot produced by thisNode
445
        buildSyncLogChannel(targetPeer, searchIndex.get());
×
446
      } else {
447
        // use RPC to tell other peers to build sync log channel to target peer
448
        try (SyncIoTConsensusServiceClient client =
×
449
            syncClientManager.borrowClient(peer.getEndpoint())) {
×
450
          TBuildSyncLogChannelRes res =
×
451
              client.buildSyncLogChannel(
×
452
                  new TBuildSyncLogChannelReq(
453
                      targetPeer.getGroupId().convertToTConsensusGroupId(),
×
454
                      targetPeer.getEndpoint(),
×
455
                      targetPeer.getNodeId()));
×
456
          if (!isSuccess(res.status)) {
×
457
            throw new ConsensusGroupModifyPeerException(
×
458
                String.format("build sync log channel failed from %s to %s", peer, targetPeer));
×
459
          }
460
        } catch (Exception e) {
×
461
          // We use a simple way to deal with the connection issue when notifying other nodes to
462
          // build sync log. If the un-responsible peer is the peer which will be removed, we cannot
463
          // suspend the operation and need to skip it. In order to keep the mechanism works fine,
464
          // we will skip the peer which cannot be reached.
465
          // If following error message appears, the un-responsible peer should be removed manually
466
          // after current operation
467
          // TODO: (xingtanzjr) design more reliable way for IoTConsensus
468
          logger.error(
×
469
              "cannot notify {} to build sync log channel. "
470
                  + "Please check the status of this node manually",
471
              peer,
472
              e);
473
        }
×
474
      }
475
    }
×
476
  }
×
477

478
  public void notifyPeersToRemoveSyncLogChannel(Peer targetPeer)
479
      throws ConsensusGroupModifyPeerException {
480
    // The configuration will be modified during iterating because we will add the targetPeer to
481
    // configuration
482
    List<Peer> currentMembers = new ArrayList<>(this.configuration);
×
483
    for (Peer peer : currentMembers) {
×
484
      if (peer.equals(targetPeer)) {
×
485
        // if the targetPeer is the same as current peer, skip it because removing itself is illegal
486
        continue;
×
487
      }
488
      if (peer.equals(thisNode)) {
×
489
        removeSyncLogChannel(targetPeer);
×
490
      } else {
491
        // use RPC to tell other peers to build sync log channel to target peer
492
        try (SyncIoTConsensusServiceClient client =
×
493
            syncClientManager.borrowClient(peer.getEndpoint())) {
×
494
          TRemoveSyncLogChannelRes res =
×
495
              client.removeSyncLogChannel(
×
496
                  new TRemoveSyncLogChannelReq(
497
                      targetPeer.getGroupId().convertToTConsensusGroupId(),
×
498
                      targetPeer.getEndpoint(),
×
499
                      targetPeer.getNodeId()));
×
500
          if (!isSuccess(res.status)) {
×
501
            throw new ConsensusGroupModifyPeerException(
×
502
                String.format("remove sync log channel failed from %s to %s", peer, targetPeer));
×
503
          }
504
        } catch (Exception e) {
×
505
          throw new ConsensusGroupModifyPeerException(
×
506
              String.format("error when removing sync log channel to %s", peer), e);
×
507
        }
×
508
      }
509
    }
×
510
  }
×
511

512
  public void waitTargetPeerUntilSyncLogCompleted(Peer targetPeer)
513
      throws ConsensusGroupModifyPeerException {
514
    long checkIntervalInMs = 10_000L;
×
515
    try (SyncIoTConsensusServiceClient client =
×
516
        syncClientManager.borrowClient(targetPeer.getEndpoint())) {
×
517
      while (true) {
518
        TWaitSyncLogCompleteRes res =
×
519
            client.waitSyncLogComplete(
×
520
                new TWaitSyncLogCompleteReq(targetPeer.getGroupId().convertToTConsensusGroupId()));
×
521
        if (res.complete) {
×
522
          logger.info(
×
523
              "{} SyncLog is completed. TargetIndex: {}, CurrentSyncIndex: {}",
524
              targetPeer,
525
              res.searchIndex,
×
526
              res.safeIndex);
×
527
          return;
×
528
        }
529
        logger.info(
×
530
            "{} SyncLog is still in progress. TargetIndex: {}, CurrentSyncIndex: {}",
531
            targetPeer,
532
            res.searchIndex,
×
533
            res.safeIndex);
×
534
        Thread.sleep(checkIntervalInMs);
×
535
      }
×
536
    } catch (ClientManagerException | TException e) {
×
537
      throw new ConsensusGroupModifyPeerException(
×
538
          String.format(
×
539
              "error when waiting %s to complete SyncLog. %s", targetPeer, e.getMessage()),
×
540
          e);
541
    } catch (InterruptedException e) {
×
542
      Thread.currentThread().interrupt();
×
543
      throw new ConsensusGroupModifyPeerException(
×
544
          String.format(
×
545
              "thread interrupted when waiting %s to complete SyncLog. %s",
546
              targetPeer, e.getMessage()),
×
547
          e);
548
    }
549
  }
550

551
  private boolean isSuccess(TSStatus status) {
552
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
×
553
  }
554

555
  /**
556
   * build SyncLog channel with safeIndex as the default initial sync index.
557
   *
558
   * @throws ConsensusGroupModifyPeerException
559
   */
560
  public void buildSyncLogChannel(Peer targetPeer) throws ConsensusGroupModifyPeerException {
561
    buildSyncLogChannel(targetPeer, getCurrentSafelyDeletedSearchIndex());
×
562
  }
×
563

564
  public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex)
565
      throws ConsensusGroupModifyPeerException {
566
    // step 1, build sync channel in LogDispatcher
567
    logger.info(
×
568
        "[IoTConsensus] build sync log channel to {} with initialSyncIndex {}",
569
        targetPeer,
570
        initialSyncIndex);
×
571
    logDispatcher.addLogDispatcherThread(targetPeer, initialSyncIndex);
×
572
    // step 2, update configuration
573
    configuration.add(targetPeer);
×
574
    // step 3, persist configuration
575
    logger.info("[IoTConsensus] persist new configuration: {}", configuration);
×
576
    persistConfigurationUpdate();
×
577
  }
×
578

579
  public void removeSyncLogChannel(Peer targetPeer) throws ConsensusGroupModifyPeerException {
580
    try {
581
      // step 1, remove sync channel in LogDispatcher
582
      logDispatcher.removeLogDispatcherThread(targetPeer);
×
583
      logger.info("[IoTConsensus] log dispatcher to {} removed and cleanup", targetPeer);
×
584
      // step 2, update configuration
585
      configuration.remove(targetPeer);
×
586
      checkAndUpdateSafeDeletedSearchIndex();
×
587
      // step 3, persist configuration
588
      persistConfigurationUpdate();
×
589
      logger.info("[IoTConsensus] configuration updated to {}", this.configuration);
×
590
    } catch (IOException e) {
×
591
      throw new ConsensusGroupModifyPeerException("error when remove LogDispatcherThread", e);
×
592
    }
×
593
  }
×
594

595
  public void persistConfiguration() {
596
    try (PublicBAOS publicBAOS = new PublicBAOS();
1✔
597
        DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
1✔
598
      serializeConfigurationTo(outputStream);
1✔
599
      Files.write(
1✔
600
          Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath()),
1✔
601
          publicBAOS.getBuf());
1✔
602
    } catch (IOException e) {
×
603
      // TODO: (xingtanzjr) need to handle the IOException because the IoTConsensus won't
604
      // work expectedly
605
      //  if the exception occurs
606
      logger.error("Unexpected error occurs when persisting configuration", e);
×
607
    }
1✔
608
  }
1✔
609

610
  public void persistConfigurationUpdate() throws ConsensusGroupModifyPeerException {
611
    try (PublicBAOS publicBAOS = new PublicBAOS();
×
612
        DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
×
613
      serializeConfigurationTo(outputStream);
×
614
      Path tmpConfigurationPath =
×
615
          Paths.get(new File(storageDir, CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
×
616
      Path configurationPath =
×
617
          Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath());
×
618
      Files.write(tmpConfigurationPath, publicBAOS.getBuf());
×
619
      Files.delete(configurationPath);
×
620
      Files.move(tmpConfigurationPath, configurationPath);
×
621
    } catch (IOException e) {
×
622
      throw new ConsensusGroupModifyPeerException(
×
623
          "Unexpected error occurs when update configuration", e);
624
    }
×
625
  }
×
626

627
  private void serializeConfigurationTo(DataOutputStream outputStream) throws IOException {
628
    outputStream.writeInt(configuration.size());
1✔
629
    for (Peer peer : configuration) {
1✔
630
      peer.serialize(outputStream);
1✔
631
    }
1✔
632
  }
1✔
633

634
  public void recoverConfiguration() {
635
    ByteBuffer buffer;
636
    try {
637
      Path tmpConfigurationPath =
1✔
638
          Paths.get(new File(storageDir, CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
1✔
639
      Path configurationPath =
1✔
640
          Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath());
1✔
641
      // If the tmpConfigurationPath exists, it means the `persistConfigurationUpdate` is
642
      // interrupted
643
      // unexpectedly, we need substitute configuration with tmpConfiguration file
644
      if (Files.exists(tmpConfigurationPath)) {
1✔
645
        if (Files.exists(configurationPath)) {
×
646
          Files.delete(configurationPath);
×
647
        }
648
        Files.move(tmpConfigurationPath, configurationPath);
×
649
      }
650
      buffer = ByteBuffer.wrap(Files.readAllBytes(configurationPath));
1✔
651
      int size = buffer.getInt();
1✔
652
      for (int i = 0; i < size; i++) {
1✔
653
        configuration.add(Peer.deserialize(buffer));
1✔
654
      }
655
      logger.info("Recover IoTConsensus server Impl, configuration: {}", configuration);
1✔
656
    } catch (IOException e) {
×
657
      logger.error("Unexpected error occurs when recovering configuration", e);
×
658
    }
1✔
659
  }
1✔
660

661
  public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
662
      IConsensusRequest request) {
663
    if (request instanceof ComparableConsensusRequest) {
1✔
664
      final IoTProgressIndex iotProgressIndex =
×
665
          new IoTProgressIndex(thisNode.getNodeId(), searchIndex.get() + 1);
×
666
      ((ComparableConsensusRequest) request).setProgressIndex(iotProgressIndex);
×
667
    }
668
    return new IndexedConsensusRequest(searchIndex.get() + 1, Collections.singletonList(request));
1✔
669
  }
670

671
  public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
672
      long syncIndex, List<IConsensusRequest> requests) {
673
    return new IndexedConsensusRequest(
1✔
674
        ConsensusReqReader.DEFAULT_SEARCH_INDEX, syncIndex, requests);
675
  }
676

677
  /**
678
   * In the case of multiple copies, the minimum synchronization index is selected. In the case of
679
   * single copies, the current index is selected
680
   */
681
  public long getCurrentSafelyDeletedSearchIndex() {
682
    return logDispatcher.getMinSyncIndex().orElseGet(searchIndex::get);
1✔
683
  }
684

685
  public String getStorageDir() {
686
    return storageDir;
1✔
687
  }
688

689
  public Peer getThisNode() {
690
    return thisNode;
1✔
691
  }
692

693
  public List<Peer> getConfiguration() {
694
    return configuration;
1✔
695
  }
696

697
  public long getSearchIndex() {
698
    return searchIndex.get();
1✔
699
  }
700

701
  public long getSyncLag() {
702
    long safeIndex = getCurrentSafelyDeletedSearchIndex();
×
703
    return getSearchIndex() - safeIndex;
×
704
  }
705

706
  public IoTConsensusConfig getConfig() {
707
    return config;
1✔
708
  }
709

710
  public long getLogEntriesFromWAL() {
711
    return logDispatcher.getLogEntriesFromWAL();
×
712
  }
713

714
  public long getLogEntriesFromQueue() {
715
    return logDispatcher.getLogEntriesFromQueue();
×
716
  }
717

718
  public boolean needBlockWrite() {
719
    return consensusReqReader.getTotalSize() > config.getReplication().getWalThrottleThreshold();
1✔
720
  }
721

722
  public boolean unblockWrite() {
723
    return consensusReqReader.getTotalSize() < config.getReplication().getWalThrottleThreshold();
1✔
724
  }
725

726
  public void signal() {
727
    stateMachineLock.lock();
1✔
728
    try {
729
      stateMachineCondition.signalAll();
1✔
730
    } finally {
731
      stateMachineLock.unlock();
1✔
732
    }
733
  }
1✔
734

735
  public AtomicLong getIndexObject() {
736
    return searchIndex;
1✔
737
  }
738

739
  public ScheduledExecutorService getRetryService() {
740
    return retryService;
1✔
741
  }
742

743
  public boolean isReadOnly() {
744
    return stateMachine.isReadOnly();
1✔
745
  }
746

747
  public boolean isActive() {
748
    return active;
1✔
749
  }
750

751
  public void setActive(boolean active) {
752
    logger.info("set {} active status to {}", this.thisNode, active);
×
753
    this.active = active;
×
754
  }
×
755

756
  public void cleanupRemoteSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerException {
757
    try (SyncIoTConsensusServiceClient client =
×
758
        syncClientManager.borrowClient(targetPeer.getEndpoint())) {
×
759
      TCleanupTransferredSnapshotReq req =
×
760
          new TCleanupTransferredSnapshotReq(
761
              targetPeer.getGroupId().convertToTConsensusGroupId(), newSnapshotDirName);
×
762
      TCleanupTransferredSnapshotRes res = client.cleanupTransferredSnapshot(req);
×
763
      if (!isSuccess(res.getStatus())) {
×
764
        throw new ConsensusGroupModifyPeerException(
×
765
            String.format(
×
766
                "cleanup remote snapshot failed of %s ,status is %s", targetPeer, res.getStatus()));
×
767
      }
768
    } catch (Exception e) {
×
769
      throw new ConsensusGroupModifyPeerException(
×
770
          String.format("cleanup remote snapshot failed of %s", targetPeer), e);
×
771
    }
×
772
  }
×
773

774
  public void cleanupTransferredSnapshot(String snapshotId)
775
      throws ConsensusGroupModifyPeerException {
776
    File snapshotDir = new File(storageDir, snapshotId);
×
777
    if (snapshotDir.exists()) {
×
778
      try {
779
        FileUtils.deleteDirectory(snapshotDir);
×
780
      } catch (IOException e) {
×
781
        throw new ConsensusGroupModifyPeerException(e);
×
782
      }
×
783
    }
784
  }
×
785

786
  /**
787
   * We should set safelyDeletedSearchIndex to searchIndex before addPeer to avoid potential data
788
   * lost.
789
   */
790
  public void checkAndLockSafeDeletedSearchIndex() {
791
    if (configuration.size() == 1) {
×
792
      consensusReqReader.setSafelyDeletedSearchIndex(searchIndex.get());
×
793
    }
794
  }
×
795

796
  /**
797
   * If there is only one replica, set it to Long.MAX_VALUE.、 If there are multiple replicas, get
798
   * the latest SafelyDeletedSearchIndex again. This enables wal to be deleted in a timely manner.
799
   */
800
  public void checkAndUpdateSafeDeletedSearchIndex() {
801
    if (configuration.size() == 1) {
1✔
802
      consensusReqReader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
1✔
803
    } else {
804
      consensusReqReader.setSafelyDeletedSearchIndex(getCurrentSafelyDeletedSearchIndex());
1✔
805
    }
806
  }
1✔
807

808
  public TSStatus syncLog(int sourcePeerId, IConsensusRequest request) {
809
    return cacheQueueMap
1✔
810
        .computeIfAbsent(sourcePeerId, SyncLogCacheQueue::new)
1✔
811
        .cacheAndInsertLatestNode((DeserializedBatchIndexedConsensusRequest) request);
1✔
812
  }
813

814
  public String getConsensusGroupId() {
815
    return consensusGroupId;
1✔
816
  }
817

818
  /**
819
   * This method is used for write of IoTConsensus SyncLog. By this method, we can keep write order
820
   * in follower the same as the leader. And besides order insurance, we can make the
821
   * deserialization of PlanNode to be concurrent
822
   */
823
  private class SyncLogCacheQueue {
824

825
    private final int sourcePeerId;
826
    private final Lock queueLock = new ReentrantLock();
1✔
827
    private final Condition queueSortCondition = queueLock.newCondition();
1✔
828
    private final PriorityQueue<DeserializedBatchIndexedConsensusRequest> requestCache;
829
    private long nextSyncIndex = -1;
1✔
830

831
    public SyncLogCacheQueue(int sourcePeerId) {
1✔
832
      this.sourcePeerId = sourcePeerId;
1✔
833
      this.requestCache = new PriorityQueue<>();
1✔
834
    }
1✔
835

836
    /**
837
     * This method is used for write of IoTConsensus SyncLog. By this method, we can keep write
838
     * order in follower the same as the leader. And besides order insurance, we can make the
839
     * deserialization of PlanNode to be concurrent
840
     */
841
    private TSStatus cacheAndInsertLatestNode(DeserializedBatchIndexedConsensusRequest request) {
842
      queueLock.lock();
1✔
843
      try {
844
        requestCache.add(request);
1✔
845
        // If the peek is not hold by current thread, it should notify the corresponding thread to
846
        // process the peek when the queue is full
847
        if (requestCache.size() == config.getReplication().getMaxPendingBatchesNum()
1✔
848
            && requestCache.peek() != null
1✔
849
            && requestCache.peek().getStartSyncIndex() != request.getStartSyncIndex()) {
1✔
850
          queueSortCondition.signalAll();
1✔
851
        }
852
        while (true) {
853
          // If current InsertNode is the next target InsertNode, write it
854
          if (request.getStartSyncIndex() == nextSyncIndex) {
1✔
855
            requestCache.remove(request);
1✔
856
            nextSyncIndex = request.getEndSyncIndex() + 1;
1✔
857
            break;
1✔
858
          }
859
          // If all write thread doesn't hit nextSyncIndex and the heap is full, write
860
          // the peek request. This is used to keep the whole write correct when nextSyncIndex
861
          // is not set. We won't persist the value of nextSyncIndex to reduce the complexity.
862
          // There are some cases that nextSyncIndex is not set:
863
          //   1. When the system was just started
864
          //   2. When some exception occurs during SyncLog
865
          if (requestCache.size() == config.getReplication().getMaxPendingBatchesNum()
1✔
866
              && requestCache.peek() != null
1✔
867
              && requestCache.peek().getStartSyncIndex() == request.getStartSyncIndex()) {
1✔
868
            requestCache.remove();
1✔
869
            nextSyncIndex = request.getEndSyncIndex() + 1;
1✔
870
            break;
1✔
871
          }
872
          try {
873
            boolean timeout =
1✔
874
                !queueSortCondition.await(
1✔
875
                    config.getReplication().getMaxWaitingTimeForWaitBatchInMs(),
1✔
876
                    TimeUnit.MILLISECONDS);
877
            // although the timeout is triggered, current thread cannot write its request
878
            // if current thread does not hold the peek request. And there should be some
879
            // other thread who hold the peek request. In this scenario, current thread
880
            // should go into await again and wait until its request becoming peek request
881
            if (timeout
1✔
882
                && requestCache.peek() != null
×
883
                && requestCache.peek().getStartSyncIndex() == request.getStartSyncIndex()) {
×
884
              // current thread hold the peek request thus it can write the peek immediately.
885
              logger.info(
×
886
                  "waiting target request timeout. current index: {}, target index: {}",
887
                  request.getStartSyncIndex(),
×
888
                  nextSyncIndex);
×
889
              requestCache.remove(request);
×
890
              nextSyncIndex = Math.max(nextSyncIndex, request.getEndSyncIndex() + 1);
×
891
              break;
×
892
            }
893
          } catch (InterruptedException e) {
×
894
            logger.warn(
×
895
                "current waiting is interrupted. SyncIndex: {}. Exception: ",
896
                request.getStartSyncIndex(),
×
897
                e);
898
            Thread.currentThread().interrupt();
×
899
          }
1✔
900
        }
901
        logger.debug(
1✔
902
            "source = {}, region = {}, queue size {}, startSyncIndex = {}, endSyncIndex = {}",
903
            sourcePeerId,
1✔
904
            consensusGroupId,
1✔
905
            requestCache.size(),
1✔
906
            request.getStartSyncIndex(),
1✔
907
            request.getEndSyncIndex());
1✔
908
        List<TSStatus> subStatus = new LinkedList<>();
1✔
909
        for (IConsensusRequest insertNode : request.getInsertNodes()) {
1✔
910
          subStatus.add(stateMachine.write(insertNode));
1✔
911
        }
1✔
912
        queueSortCondition.signalAll();
1✔
913
        return new TSStatus().setSubStatus(subStatus);
1✔
914
      } finally {
915
        queueLock.unlock();
1✔
916
      }
917
    }
918
  }
919
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc