• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9852

17 Aug 2023 02:14AM UTC coverage: 48.081% (+0.003%) from 48.078%
#9852

push

travis_ci

web-flow
[RatisConsensus] Unify read timeout to Thrift connection timeout (#10876) (#10879)

19 of 19 new or added lines in 3 files covered. (100.0%)

79749 of 165865 relevant lines covered (48.08%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.06
/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
1
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19

20
package org.apache.iotdb.consensus.iot;
21

22
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23
import org.apache.iotdb.common.rpc.thrift.TSStatus;
24
import org.apache.iotdb.commons.client.IClientManager;
25
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
26
import org.apache.iotdb.commons.concurrent.ThreadName;
27
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
28
import org.apache.iotdb.commons.exception.StartupException;
29
import org.apache.iotdb.commons.service.RegisterManager;
30
import org.apache.iotdb.commons.utils.FileUtils;
31
import org.apache.iotdb.consensus.IConsensus;
32
import org.apache.iotdb.consensus.IStateMachine;
33
import org.apache.iotdb.consensus.IStateMachine.Registry;
34
import org.apache.iotdb.consensus.common.Peer;
35
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
36
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
37
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
38
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
39
import org.apache.iotdb.consensus.config.ConsensusConfig;
40
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
41
import org.apache.iotdb.consensus.exception.ConsensusException;
42
import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
43
import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
44
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
45
import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
46
import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
47
import org.apache.iotdb.consensus.iot.client.AsyncIoTConsensusServiceClient;
48
import org.apache.iotdb.consensus.iot.client.IoTConsensusClientPool.AsyncIoTConsensusServiceClientPoolFactory;
49
import org.apache.iotdb.consensus.iot.client.IoTConsensusClientPool.SyncIoTConsensusServiceClientPoolFactory;
50
import org.apache.iotdb.consensus.iot.client.SyncIoTConsensusServiceClient;
51
import org.apache.iotdb.consensus.iot.logdispatcher.IoTConsensusMemoryManager;
52
import org.apache.iotdb.consensus.iot.service.IoTConsensusRPCService;
53
import org.apache.iotdb.consensus.iot.service.IoTConsensusRPCServiceProcessor;
54
import org.apache.iotdb.rpc.RpcUtils;
55
import org.apache.iotdb.rpc.TSStatusCode;
56

57
import org.slf4j.Logger;
58
import org.slf4j.LoggerFactory;
59

60
import java.io.File;
61
import java.io.IOException;
62
import java.nio.file.DirectoryStream;
63
import java.nio.file.Files;
64
import java.nio.file.Path;
65
import java.util.ArrayList;
66
import java.util.List;
67
import java.util.Map;
68
import java.util.concurrent.ConcurrentHashMap;
69
import java.util.concurrent.ScheduledExecutorService;
70
import java.util.concurrent.TimeUnit;
71
import java.util.concurrent.atomic.AtomicBoolean;
72

73
public class IoTConsensus implements IConsensus {
74

75
  private final Logger logger = LoggerFactory.getLogger(IoTConsensus.class);
1✔
76

77
  private final TEndPoint thisNode;
78
  private final int thisNodeId;
79
  private final File storageDir;
80
  private final IStateMachine.Registry registry;
81
  private final Map<ConsensusGroupId, IoTConsensusServerImpl> stateMachineMap =
1✔
82
      new ConcurrentHashMap<>();
83
  private final IoTConsensusRPCService service;
84
  private final RegisterManager registerManager = new RegisterManager();
1✔
85
  private final IoTConsensusConfig config;
86
  private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager;
87
  private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
88
  private final ScheduledExecutorService retryService;
89

90
  public IoTConsensus(ConsensusConfig config, Registry registry) {
1✔
91
    this.thisNode = config.getThisNodeEndPoint();
1✔
92
    this.thisNodeId = config.getThisNodeId();
1✔
93
    this.storageDir = new File(config.getStorageDir());
1✔
94
    this.config = config.getIoTConsensusConfig();
1✔
95
    this.registry = registry;
1✔
96
    this.service = new IoTConsensusRPCService(thisNode, config.getIoTConsensusConfig());
1✔
97
    this.clientManager =
1✔
98
        new IClientManager.Factory<TEndPoint, AsyncIoTConsensusServiceClient>()
99
            .createClientManager(
1✔
100
                new AsyncIoTConsensusServiceClientPoolFactory(config.getIoTConsensusConfig()));
1✔
101
    this.syncClientManager =
1✔
102
        new IClientManager.Factory<TEndPoint, SyncIoTConsensusServiceClient>()
103
            .createClientManager(
1✔
104
                new SyncIoTConsensusServiceClientPoolFactory(config.getIoTConsensusConfig()));
1✔
105
    this.retryService =
1✔
106
        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
1✔
107
            ThreadName.LOG_DISPATCHER_RETRY_EXECUTOR.getName());
1✔
108
    // init IoTConsensus memory manager
109
    IoTConsensusMemoryManager.getInstance()
1✔
110
        .init(
1✔
111
            config.getIoTConsensusConfig().getReplication().getAllocateMemoryForConsensus(),
1✔
112
            config.getIoTConsensusConfig().getReplication().getAllocateMemoryForQueue());
1✔
113
  }
1✔
114

115
  @Override
116
  public void start() throws IOException {
117
    initAndRecover();
1✔
118
    service.initAsyncedServiceImpl(new IoTConsensusRPCServiceProcessor(this));
1✔
119
    try {
120
      registerManager.register(service);
1✔
121
    } catch (StartupException e) {
×
122
      throw new IOException(e);
×
123
    }
1✔
124
  }
1✔
125

126
  private void initAndRecover() throws IOException {
127
    if (!storageDir.exists()) {
1✔
128
      if (!storageDir.mkdirs()) {
1✔
129
        logger.warn("Unable to create consensus dir at {}", storageDir);
×
130
      }
131
    } else {
132
      try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) {
1✔
133
        for (Path path : stream) {
1✔
134
          String[] items = path.getFileName().toString().split("_");
1✔
135
          ConsensusGroupId consensusGroupId =
1✔
136
              ConsensusGroupId.Factory.create(
1✔
137
                  Integer.parseInt(items[0]), Integer.parseInt(items[1]));
1✔
138
          IoTConsensusServerImpl consensus =
1✔
139
              new IoTConsensusServerImpl(
140
                  path.toString(),
1✔
141
                  new Peer(consensusGroupId, thisNodeId, thisNode),
142
                  new ArrayList<>(),
143
                  registry.apply(consensusGroupId),
1✔
144
                  retryService,
145
                  clientManager,
146
                  syncClientManager,
147
                  config);
148
          stateMachineMap.put(consensusGroupId, consensus);
1✔
149
          consensus.start();
1✔
150
        }
1✔
151
      }
152
    }
153
  }
1✔
154

155
  @Override
156
  public void stop() {
157
    stateMachineMap.values().parallelStream().forEach(IoTConsensusServerImpl::stop);
1✔
158
    clientManager.close();
1✔
159
    syncClientManager.close();
1✔
160
    registerManager.deregisterAll();
1✔
161
    retryService.shutdown();
1✔
162
    try {
163
      retryService.awaitTermination(5, TimeUnit.SECONDS);
1✔
164
    } catch (InterruptedException e) {
×
165
      logger.warn("{}: interrupted when shutting down add Executor with exception {}", this, e);
×
166
      Thread.currentThread().interrupt();
×
167
    }
1✔
168
  }
1✔
169

170
  @Override
171
  public ConsensusWriteResponse write(ConsensusGroupId groupId, IConsensusRequest request) {
172
    IoTConsensusServerImpl impl = stateMachineMap.get(groupId);
1✔
173
    if (impl == null) {
1✔
174
      return ConsensusWriteResponse.newBuilder()
×
175
          .setException(new ConsensusGroupNotExistException(groupId))
×
176
          .build();
×
177
    }
178

179
    TSStatus status;
180
    if (impl.isReadOnly()) {
1✔
181
      status = new TSStatus(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode());
×
182
      status.setMessage("Fail to do non-query operations because system is read-only.");
×
183
    } else if (!impl.isActive()) {
1✔
184
      // TODO: (xingtanzjr) whether we need to define a new status to indicate the inactive status ?
185
      status = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT);
×
186
      status.setMessage("peer is inactive and not ready to receive sync log request.");
×
187
    } else {
188
      status = impl.write(request);
1✔
189
    }
190
    return ConsensusWriteResponse.newBuilder().setStatus(status).build();
1✔
191
  }
192

193
  @Override
194
  public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest request) {
195
    IoTConsensusServerImpl impl = stateMachineMap.get(groupId);
×
196
    if (impl == null) {
×
197
      return ConsensusReadResponse.newBuilder()
×
198
          .setException(new ConsensusGroupNotExistException(groupId))
×
199
          .build();
×
200
    }
201
    return ConsensusReadResponse.newBuilder().setDataSet(impl.read(request)).build();
×
202
  }
203

204
  @Override
205
  public ConsensusGenericResponse createPeer(ConsensusGroupId groupId, List<Peer> peers) {
206
    int consensusGroupSize = peers.size();
1✔
207
    if (consensusGroupSize == 0) {
1✔
208
      return ConsensusGenericResponse.newBuilder()
×
209
          .setException(new IllegalPeerNumException(consensusGroupSize))
×
210
          .build();
×
211
    }
212
    if (!peers.contains(new Peer(groupId, thisNodeId, thisNode))) {
1✔
213
      return ConsensusGenericResponse.newBuilder()
×
214
          .setException(new IllegalPeerEndpointException(thisNode, peers))
×
215
          .build();
×
216
    }
217
    AtomicBoolean exist = new AtomicBoolean(true);
1✔
218
    stateMachineMap.computeIfAbsent(
1✔
219
        groupId,
220
        k -> {
221
          exist.set(false);
1✔
222
          String path = buildPeerDir(storageDir, groupId);
1✔
223
          File file = new File(path);
1✔
224
          if (!file.mkdirs()) {
1✔
225
            logger.warn("Unable to create consensus dir for group {} at {}", groupId, path);
×
226
          }
227
          IoTConsensusServerImpl impl =
1✔
228
              new IoTConsensusServerImpl(
229
                  path,
230
                  new Peer(groupId, thisNodeId, thisNode),
231
                  peers,
232
                  registry.apply(groupId),
1✔
233
                  retryService,
234
                  clientManager,
235
                  syncClientManager,
236
                  config);
237
          impl.start();
1✔
238
          return impl;
1✔
239
        });
240
    if (exist.get()) {
1✔
241
      return ConsensusGenericResponse.newBuilder()
×
242
          .setException(new ConsensusGroupAlreadyExistException(groupId))
×
243
          .build();
×
244
    }
245
    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
1✔
246
  }
247

248
  @Override
249
  public ConsensusGenericResponse deletePeer(ConsensusGroupId groupId) {
250
    AtomicBoolean exist = new AtomicBoolean(false);
1✔
251
    stateMachineMap.computeIfPresent(
1✔
252
        groupId,
253
        (k, v) -> {
254
          exist.set(true);
1✔
255
          v.stop();
1✔
256
          FileUtils.deleteDirectory(new File(buildPeerDir(storageDir, groupId)));
1✔
257
          return null;
1✔
258
        });
259

260
    if (!exist.get()) {
1✔
261
      return ConsensusGenericResponse.newBuilder()
×
262
          .setException(new ConsensusGroupNotExistException(groupId))
×
263
          .build();
×
264
    }
265
    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
1✔
266
  }
267

268
  @Override
269
  public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) {
270
    IoTConsensusServerImpl impl = stateMachineMap.get(groupId);
×
271
    if (impl == null) {
×
272
      return ConsensusGenericResponse.newBuilder()
×
273
          .setException(new ConsensusGroupNotExistException(groupId))
×
274
          .build();
×
275
    }
276
    try {
277
      // step 1: inactive new Peer to prepare for following steps
278
      logger.info("[IoTConsensus] inactivate new peer: {}", peer);
×
279
      impl.inactivePeer(peer);
×
280

281
      // step 2: notify all the other Peers to build the sync connection to newPeer
282
      logger.info("[IoTConsensus] notify current peers to build sync log...");
×
283
      impl.checkAndLockSafeDeletedSearchIndex();
×
284
      impl.notifyPeersToBuildSyncLogChannel(peer);
×
285

286
      // step 3: take snapshot
287
      logger.info("[IoTConsensus] start to take snapshot...");
×
288
      impl.takeSnapshot();
×
289

290
      // step 4: transit snapshot
291
      logger.info("[IoTConsensus] start to transit snapshot...");
×
292
      impl.transitSnapshot(peer);
×
293

294
      // step 5: let the new peer load snapshot
295
      logger.info("[IoTConsensus] trigger new peer to load snapshot...");
×
296
      impl.triggerSnapshotLoad(peer);
×
297

298
      // step 6: active new Peer
299
      logger.info("[IoTConsensus] activate new peer...");
×
300
      impl.activePeer(peer);
×
301

302
      // step 7: spot clean
303
      logger.info("[IoTConsensus] do spot clean...");
×
304
      doSpotClean(peer, impl);
×
305

306
    } catch (ConsensusGroupModifyPeerException e) {
×
307
      logger.error("cannot execute addPeer() for {}", peer, e);
×
308
      return ConsensusGenericResponse.newBuilder()
×
309
          .setSuccess(false)
×
310
          .setException(new ConsensusException(e.getMessage()))
×
311
          .build();
×
312
    }
×
313

314
    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
×
315
  }
316

317
  private void doSpotClean(Peer peer, IoTConsensusServerImpl impl) {
318
    try {
319
      impl.cleanupRemoteSnapshot(peer);
×
320
    } catch (ConsensusGroupModifyPeerException e) {
×
321
      logger.warn("[IoTConsensus] failed to cleanup remote snapshot", e);
×
322
    }
×
323
  }
×
324

325
  @Override
326
  public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) {
327
    IoTConsensusServerImpl impl = stateMachineMap.get(groupId);
×
328
    if (impl == null) {
×
329
      return ConsensusGenericResponse.newBuilder()
×
330
          .setException(new ConsensusGroupNotExistException(groupId))
×
331
          .build();
×
332
    }
333
    try {
334
      // let other peers remove the sync channel with target peer
335
      impl.notifyPeersToRemoveSyncLogChannel(peer);
×
336
    } catch (ConsensusGroupModifyPeerException e) {
×
337
      return ConsensusGenericResponse.newBuilder()
×
338
          .setSuccess(false)
×
339
          .setException(new ConsensusException(e.getMessage()))
×
340
          .build();
×
341
    }
×
342

343
    try {
344
      // let target peer reject new write
345
      impl.inactivePeer(peer);
×
346
      // wait its SyncLog to complete
347
      impl.waitTargetPeerUntilSyncLogCompleted(peer);
×
348
    } catch (ConsensusGroupModifyPeerException e) {
×
349
      // we only log warning here because sometimes the target peer may already be down
350
      logger.warn("cannot wait {} to complete SyncLog. error message: {}", peer, e.getMessage());
×
351
    }
×
352

353
    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
×
354
  }
355

356
  @Override
357
  public ConsensusGenericResponse updatePeer(ConsensusGroupId groupId, Peer oldPeer, Peer newPeer) {
358
    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
×
359
  }
360

361
  @Override
362
  public ConsensusGenericResponse changePeer(ConsensusGroupId groupId, List<Peer> newPeers) {
363
    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
×
364
  }
365

366
  @Override
367
  public ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, Peer newLeader) {
368
    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
×
369
  }
370

371
  @Override
372
  public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) {
373
    IoTConsensusServerImpl impl = stateMachineMap.get(groupId);
1✔
374
    if (impl == null) {
1✔
375
      return ConsensusGenericResponse.newBuilder()
×
376
          .setException(new ConsensusGroupNotExistException(groupId))
×
377
          .build();
×
378
    }
379
    try {
380
      impl.takeSnapshot();
1✔
381
    } catch (ConsensusGroupModifyPeerException e) {
×
382
      return ConsensusGenericResponse.newBuilder()
×
383
          .setSuccess(false)
×
384
          .setException(new ConsensusException(e.getMessage()))
×
385
          .build();
×
386
    }
1✔
387
    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
1✔
388
  }
389

390
  @Override
391
  public boolean isLeader(ConsensusGroupId groupId) {
392
    return true;
×
393
  }
394

395
  @Override
396
  public Peer getLeader(ConsensusGroupId groupId) {
397
    if (!stateMachineMap.containsKey(groupId)) {
×
398
      return null;
×
399
    }
400
    return new Peer(groupId, thisNodeId, thisNode);
×
401
  }
402

403
  @Override
404
  public List<ConsensusGroupId> getAllConsensusGroupIds() {
405
    return new ArrayList<>(stateMachineMap.keySet());
×
406
  }
407

408
  public IoTConsensusServerImpl getImpl(ConsensusGroupId groupId) {
409
    return stateMachineMap.get(groupId);
1✔
410
  }
411

412
  public static String buildPeerDir(File storageDir, ConsensusGroupId groupId) {
413
    return storageDir + File.separator + groupId.getType().getValue() + "_" + groupId.getId();
1✔
414
  }
415
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc