• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9977

01 Sep 2023 12:40AM UTC coverage: 47.712% (+0.003%) from 47.709%
#9977

push

travis_ci

web-flow
[IOTDB-6134] Fill statement content in show queries for inner schema fetch

4 of 4 new or added lines in 1 file covered. (100.0%)

80191 of 168074 relevant lines covered (47.71%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.9
/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.consensus.iot;
21

22
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23
import org.apache.iotdb.common.rpc.thrift.TSStatus;
24
import org.apache.iotdb.commons.client.IClientManager;
25
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
26
import org.apache.iotdb.commons.concurrent.ThreadName;
27
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
28
import org.apache.iotdb.commons.exception.StartupException;
29
import org.apache.iotdb.commons.service.RegisterManager;
30
import org.apache.iotdb.commons.utils.FileUtils;
31
import org.apache.iotdb.commons.utils.StatusUtils;
32
import org.apache.iotdb.consensus.IConsensus;
33
import org.apache.iotdb.consensus.IStateMachine;
34
import org.apache.iotdb.consensus.IStateMachine.Registry;
35
import org.apache.iotdb.consensus.common.DataSet;
36
import org.apache.iotdb.consensus.common.Peer;
37
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
38
import org.apache.iotdb.consensus.config.ConsensusConfig;
39
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
40
import org.apache.iotdb.consensus.exception.ConsensusException;
41
import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
42
import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
43
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
44
import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
45
import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
46
import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
47
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
48
import org.apache.iotdb.consensus.iot.client.AsyncIoTConsensusServiceClient;
49
import org.apache.iotdb.consensus.iot.client.IoTConsensusClientPool.AsyncIoTConsensusServiceClientPoolFactory;
50
import org.apache.iotdb.consensus.iot.client.IoTConsensusClientPool.SyncIoTConsensusServiceClientPoolFactory;
51
import org.apache.iotdb.consensus.iot.client.SyncIoTConsensusServiceClient;
52
import org.apache.iotdb.consensus.iot.logdispatcher.IoTConsensusMemoryManager;
53
import org.apache.iotdb.consensus.iot.service.IoTConsensusRPCService;
54
import org.apache.iotdb.consensus.iot.service.IoTConsensusRPCServiceProcessor;
55
import org.apache.iotdb.rpc.RpcUtils;
56
import org.apache.iotdb.rpc.TSStatusCode;
57

58
import org.slf4j.Logger;
59
import org.slf4j.LoggerFactory;
60

61
import java.io.File;
62
import java.io.IOException;
63
import java.nio.file.DirectoryStream;
64
import java.nio.file.Files;
65
import java.nio.file.Path;
66
import java.util.ArrayList;
67
import java.util.List;
68
import java.util.Map;
69
import java.util.Optional;
70
import java.util.concurrent.ConcurrentHashMap;
71
import java.util.concurrent.ScheduledExecutorService;
72
import java.util.concurrent.TimeUnit;
73
import java.util.concurrent.atomic.AtomicBoolean;
74

75
public class IoTConsensus implements IConsensus {
76

77
  private final Logger logger = LoggerFactory.getLogger(IoTConsensus.class);
1✔
78

79
  private final TEndPoint thisNode;
80
  private final int thisNodeId;
81
  private final File storageDir;
82
  private final IStateMachine.Registry registry;
83
  private final Map<ConsensusGroupId, IoTConsensusServerImpl> stateMachineMap =
1✔
84
      new ConcurrentHashMap<>();
85
  private final IoTConsensusRPCService service;
86
  private final RegisterManager registerManager = new RegisterManager();
1✔
87
  private final IoTConsensusConfig config;
88
  private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager;
89
  private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
90
  private final ScheduledExecutorService retryService;
91

92
  public IoTConsensus(ConsensusConfig config, Registry registry) {
1✔
93
    this.thisNode = config.getThisNodeEndPoint();
1✔
94
    this.thisNodeId = config.getThisNodeId();
1✔
95
    this.storageDir = new File(config.getStorageDir());
1✔
96
    this.config = config.getIotConsensusConfig();
1✔
97
    this.registry = registry;
1✔
98
    this.service = new IoTConsensusRPCService(thisNode, config.getIotConsensusConfig());
1✔
99
    this.clientManager =
1✔
100
        new IClientManager.Factory<TEndPoint, AsyncIoTConsensusServiceClient>()
101
            .createClientManager(
1✔
102
                new AsyncIoTConsensusServiceClientPoolFactory(config.getIotConsensusConfig()));
1✔
103
    this.syncClientManager =
1✔
104
        new IClientManager.Factory<TEndPoint, SyncIoTConsensusServiceClient>()
105
            .createClientManager(
1✔
106
                new SyncIoTConsensusServiceClientPoolFactory(config.getIotConsensusConfig()));
1✔
107
    this.retryService =
1✔
108
        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
1✔
109
            ThreadName.LOG_DISPATCHER_RETRY_EXECUTOR.getName());
1✔
110
    // init IoTConsensus memory manager
111
    IoTConsensusMemoryManager.getInstance()
1✔
112
        .init(
1✔
113
            config.getIotConsensusConfig().getReplication().getAllocateMemoryForConsensus(),
1✔
114
            config.getIotConsensusConfig().getReplication().getAllocateMemoryForQueue());
1✔
115
  }
1✔
116

117
  @Override
118
  public synchronized void start() throws IOException {
119
    initAndRecover();
1✔
120
    service.initAsyncedServiceImpl(new IoTConsensusRPCServiceProcessor(this));
1✔
121
    try {
122
      registerManager.register(service);
1✔
123
    } catch (StartupException e) {
×
124
      throw new IOException(e);
×
125
    }
1✔
126
  }
1✔
127

128
  private void initAndRecover() throws IOException {
129
    if (!storageDir.exists()) {
1✔
130
      if (!storageDir.mkdirs()) {
1✔
131
        throw new IOException(String.format("Unable to create consensus dir at %s", storageDir));
×
132
      }
133
    } else {
134
      try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) {
1✔
135
        for (Path path : stream) {
1✔
136
          String[] items = path.getFileName().toString().split("_");
1✔
137
          ConsensusGroupId consensusGroupId =
1✔
138
              ConsensusGroupId.Factory.create(
1✔
139
                  Integer.parseInt(items[0]), Integer.parseInt(items[1]));
1✔
140
          IoTConsensusServerImpl consensus =
1✔
141
              new IoTConsensusServerImpl(
142
                  path.toString(),
1✔
143
                  new Peer(consensusGroupId, thisNodeId, thisNode),
144
                  new ArrayList<>(),
145
                  registry.apply(consensusGroupId),
1✔
146
                  retryService,
147
                  clientManager,
148
                  syncClientManager,
149
                  config);
150
          stateMachineMap.put(consensusGroupId, consensus);
1✔
151
          consensus.start();
1✔
152
        }
1✔
153
      }
154
    }
155
  }
1✔
156

157
  @Override
158
  public synchronized void stop() {
159
    stateMachineMap.values().parallelStream().forEach(IoTConsensusServerImpl::stop);
1✔
160
    clientManager.close();
1✔
161
    syncClientManager.close();
1✔
162
    registerManager.deregisterAll();
1✔
163
    retryService.shutdown();
1✔
164
    try {
165
      retryService.awaitTermination(5, TimeUnit.SECONDS);
1✔
166
    } catch (InterruptedException e) {
×
167
      logger.warn("{}: interrupted when shutting down add Executor with exception {}", this, e);
×
168
      Thread.currentThread().interrupt();
×
169
    }
1✔
170
  }
1✔
171

172
  @Override
173
  public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
174
      throws ConsensusException {
175
    IoTConsensusServerImpl impl =
1✔
176
        Optional.ofNullable(stateMachineMap.get(groupId))
1✔
177
            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
1✔
178
    if (impl.isReadOnly()) {
1✔
179
      return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY);
×
180
    } else if (!impl.isActive()) {
1✔
181
      return RpcUtils.getStatus(
×
182
          TSStatusCode.WRITE_PROCESS_REJECT,
183
          "peer is inactive and not ready to receive sync log request.");
184
    } else {
185
      return impl.write(request);
1✔
186
    }
187
  }
188

189
  @Override
190
  public DataSet read(ConsensusGroupId groupId, IConsensusRequest request)
191
      throws ConsensusException {
192
    return Optional.ofNullable(stateMachineMap.get(groupId))
×
193
        .orElseThrow(() -> new ConsensusGroupNotExistException(groupId))
×
194
        .read(request);
×
195
  }
196

197
  @SuppressWarnings("java:S2201")
198
  @Override
199
  public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
200
      throws ConsensusException {
201
    int consensusGroupSize = peers.size();
1✔
202
    if (consensusGroupSize == 0) {
1✔
203
      throw new IllegalPeerNumException(consensusGroupSize);
1✔
204
    }
205
    if (!peers.contains(new Peer(groupId, thisNodeId, thisNode))) {
1✔
206
      throw new IllegalPeerEndpointException(thisNode, peers);
1✔
207
    }
208
    AtomicBoolean exist = new AtomicBoolean(true);
1✔
209
    Optional.ofNullable(
1✔
210
            stateMachineMap.computeIfAbsent(
1✔
211
                groupId,
212
                k -> {
213
                  exist.set(false);
1✔
214

215
                  String path = buildPeerDir(storageDir, groupId);
1✔
216
                  File file = new File(path);
1✔
217
                  if (!file.mkdirs()) {
1✔
218
                    logger.warn("Unable to create consensus dir for group {} at {}", groupId, path);
×
219
                    return null;
×
220
                  }
221

222
                  IoTConsensusServerImpl impl =
1✔
223
                      new IoTConsensusServerImpl(
224
                          path,
225
                          new Peer(groupId, thisNodeId, thisNode),
226
                          peers,
227
                          registry.apply(groupId),
1✔
228
                          retryService,
229
                          clientManager,
230
                          syncClientManager,
231
                          config);
232
                  impl.start();
1✔
233
                  return impl;
1✔
234
                }))
235
        .orElseThrow(
1✔
236
            () ->
237
                new ConsensusException(
×
238
                    String.format("Unable to create consensus dir for group %s", groupId)));
×
239
    if (exist.get()) {
1✔
240
      throw new ConsensusGroupAlreadyExistException(groupId);
1✔
241
    }
242
  }
1✔
243

244
  @Override
245
  public void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException {
246
    AtomicBoolean exist = new AtomicBoolean(false);
1✔
247
    stateMachineMap.computeIfPresent(
1✔
248
        groupId,
249
        (k, v) -> {
250
          exist.set(true);
1✔
251
          v.stop();
1✔
252
          FileUtils.deleteDirectory(new File(buildPeerDir(storageDir, groupId)));
1✔
253
          return null;
1✔
254
        });
255
    if (!exist.get()) {
1✔
256
      throw new ConsensusGroupNotExistException(groupId);
1✔
257
    }
258
  }
1✔
259

260
  @Override
261
  public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException {
262
    IoTConsensusServerImpl impl =
×
263
        Optional.ofNullable(stateMachineMap.get(groupId))
×
264
            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
×
265
    if (impl.getConfiguration().contains(peer)) {
×
266
      throw new PeerAlreadyInConsensusGroupException(groupId, peer);
×
267
    }
268
    try {
269
      // step 1: inactive new Peer to prepare for following steps
270
      logger.info("[IoTConsensus] inactivate new peer: {}", peer);
×
271
      impl.inactivePeer(peer);
×
272

273
      // step 2: notify all the other Peers to build the sync connection to newPeer
274
      logger.info("[IoTConsensus] notify current peers to build sync log...");
×
275
      impl.checkAndLockSafeDeletedSearchIndex();
×
276
      impl.notifyPeersToBuildSyncLogChannel(peer);
×
277

278
      // step 3: take snapshot
279
      logger.info("[IoTConsensus] start to take snapshot...");
×
280
      impl.takeSnapshot();
×
281

282
      // step 4: transit snapshot
283
      logger.info("[IoTConsensus] start to transit snapshot...");
×
284
      impl.transitSnapshot(peer);
×
285

286
      // step 5: let the new peer load snapshot
287
      logger.info("[IoTConsensus] trigger new peer to load snapshot...");
×
288
      impl.triggerSnapshotLoad(peer);
×
289

290
      // step 6: active new Peer
291
      logger.info("[IoTConsensus] activate new peer...");
×
292
      impl.activePeer(peer);
×
293

294
      // step 7: spot clean
295
      logger.info("[IoTConsensus] do spot clean...");
×
296
      doSpotClean(peer, impl);
×
297

298
    } catch (ConsensusGroupModifyPeerException e) {
×
299
      throw new ConsensusException(e.getMessage());
×
300
    }
×
301
  }
×
302

303
  private void doSpotClean(Peer peer, IoTConsensusServerImpl impl) {
304
    try {
305
      impl.cleanupRemoteSnapshot(peer);
×
306
    } catch (ConsensusGroupModifyPeerException e) {
×
307
      logger.warn("[IoTConsensus] failed to cleanup remote snapshot", e);
×
308
    }
×
309
  }
×
310

311
  @Override
312
  public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException {
313
    IoTConsensusServerImpl impl =
×
314
        Optional.ofNullable(stateMachineMap.get(groupId))
×
315
            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
×
316

317
    if (!impl.getConfiguration().contains(peer)) {
×
318
      throw new PeerNotInConsensusGroupException(groupId, peer.toString());
×
319
    }
320

321
    try {
322
      // let other peers remove the sync channel with target peer
323
      impl.notifyPeersToRemoveSyncLogChannel(peer);
×
324
    } catch (ConsensusGroupModifyPeerException e) {
×
325
      throw new ConsensusException(e.getMessage());
×
326
    }
×
327

328
    try {
329
      // let target peer reject new write
330
      impl.inactivePeer(peer);
×
331
      // wait its SyncLog to complete
332
      impl.waitTargetPeerUntilSyncLogCompleted(peer);
×
333
    } catch (ConsensusGroupModifyPeerException e) {
×
334
      throw new ConsensusException(e.getMessage());
×
335
    }
×
336
  }
×
337

338
  @Override
339
  public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws ConsensusException {
340
    throw new ConsensusException("IoTConsensus does not support leader transfer");
1✔
341
  }
342

343
  @Override
344
  public void triggerSnapshot(ConsensusGroupId groupId) throws ConsensusException {
345
    IoTConsensusServerImpl impl =
1✔
346
        Optional.ofNullable(stateMachineMap.get(groupId))
1✔
347
            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
1✔
348
    try {
349
      impl.takeSnapshot();
1✔
350
    } catch (ConsensusGroupModifyPeerException e) {
×
351
      throw new ConsensusException(e.getMessage());
×
352
    }
1✔
353
  }
1✔
354

355
  @Override
356
  public boolean isLeader(ConsensusGroupId groupId) {
357
    return true;
×
358
  }
359

360
  @Override
361
  public boolean isLeaderReady(ConsensusGroupId groupId) {
362
    return true;
×
363
  }
364

365
  @Override
366
  public Peer getLeader(ConsensusGroupId groupId) {
367
    if (!stateMachineMap.containsKey(groupId)) {
×
368
      return null;
×
369
    }
370
    return new Peer(groupId, thisNodeId, thisNode);
×
371
  }
372

373
  @Override
374
  public List<ConsensusGroupId> getAllConsensusGroupIds() {
375
    return new ArrayList<>(stateMachineMap.keySet());
×
376
  }
377

378
  public IoTConsensusServerImpl getImpl(ConsensusGroupId groupId) {
379
    return stateMachineMap.get(groupId);
1✔
380
  }
381

382
  public static String buildPeerDir(File storageDir, ConsensusGroupId groupId) {
383
    return storageDir + File.separator + groupId.getType().getValue() + "_" + groupId.getId();
1✔
384
  }
385
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc