• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9905

23 Aug 2023 06:20AM UTC coverage: 47.785% (-0.1%) from 47.922%
#9905

push

travis_ci

web-flow
[To rel/1.2][Metric] Fix flush point statistics (#10934)

23 of 23 new or added lines in 1 file covered. (100.0%)

79851 of 167106 relevant lines covered (47.78%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.95
/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.consensus.simple;
21

22
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23
import org.apache.iotdb.common.rpc.thrift.TSStatus;
24
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
25
import org.apache.iotdb.commons.consensus.DataRegionId;
26
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
27
import org.apache.iotdb.commons.utils.FileUtils;
28
import org.apache.iotdb.commons.utils.StatusUtils;
29
import org.apache.iotdb.consensus.IConsensus;
30
import org.apache.iotdb.consensus.IStateMachine;
31
import org.apache.iotdb.consensus.IStateMachine.Registry;
32
import org.apache.iotdb.consensus.common.DataSet;
33
import org.apache.iotdb.consensus.common.Peer;
34
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
35
import org.apache.iotdb.consensus.config.ConsensusConfig;
36
import org.apache.iotdb.consensus.exception.ConsensusException;
37
import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
38
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
39
import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
40
import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
41
import org.apache.iotdb.rpc.TSStatusCode;
42

43
import org.slf4j.Logger;
44
import org.slf4j.LoggerFactory;
45

46
import java.io.File;
47
import java.io.IOException;
48
import java.nio.file.DirectoryStream;
49
import java.nio.file.Files;
50
import java.nio.file.Path;
51
import java.util.ArrayList;
52
import java.util.List;
53
import java.util.Map;
54
import java.util.Optional;
55
import java.util.concurrent.ConcurrentHashMap;
56
import java.util.concurrent.atomic.AtomicBoolean;
57

58
/**
59
 * A simple consensus implementation, which can be used when replicaNum is 1.
60
 *
61
 * <p>Notice: The stateMachine needs to implement WAL itself to ensure recovery after a restart
62
 */
63
class SimpleConsensus implements IConsensus {
64

65
  private final Logger logger = LoggerFactory.getLogger(SimpleConsensus.class);
1✔
66

67
  private final TEndPoint thisNode;
68
  private final int thisNodeId;
69
  private final File storageDir;
70
  private final IStateMachine.Registry registry;
71
  private final Map<ConsensusGroupId, SimpleConsensusServerImpl> stateMachineMap =
1✔
72
      new ConcurrentHashMap<>();
73
  private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
1✔
74
      PerformanceOverviewMetrics.getInstance();
1✔
75

76
  public SimpleConsensus(ConsensusConfig config, Registry registry) {
1✔
77
    this.thisNode = config.getThisNodeEndPoint();
1✔
78
    this.thisNodeId = config.getThisNodeId();
1✔
79
    this.storageDir = new File(config.getStorageDir());
1✔
80
    this.registry = registry;
1✔
81
  }
1✔
82

83
  @Override
84
  public synchronized void start() throws IOException {
85
    initAndRecover();
1✔
86
  }
1✔
87

88
  private void initAndRecover() throws IOException {
89
    if (!storageDir.exists()) {
1✔
90
      if (!storageDir.mkdirs()) {
1✔
91
        throw new IOException(String.format("Unable to create consensus dir at %s", storageDir));
×
92
      }
93
    } else {
94
      try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) {
1✔
95
        for (Path path : stream) {
1✔
96
          String[] items = path.getFileName().toString().split("_");
1✔
97
          ConsensusGroupId consensusGroupId =
1✔
98
              ConsensusGroupId.Factory.create(
1✔
99
                  Integer.parseInt(items[0]), Integer.parseInt(items[1]));
1✔
100
          SimpleConsensusServerImpl consensus =
1✔
101
              new SimpleConsensusServerImpl(
102
                  new Peer(consensusGroupId, thisNodeId, thisNode),
103
                  registry.apply(consensusGroupId));
1✔
104
          stateMachineMap.put(consensusGroupId, consensus);
1✔
105
          consensus.start();
1✔
106
        }
1✔
107
      }
108
    }
109
  }
1✔
110

111
  @Override
112
  public synchronized void stop() throws IOException {
113
    stateMachineMap.values().parallelStream().forEach(SimpleConsensusServerImpl::stop);
1✔
114
  }
1✔
115

116
  @Override
117
  public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
118
      throws ConsensusException {
119
    SimpleConsensusServerImpl impl =
1✔
120
        Optional.ofNullable(stateMachineMap.get(groupId))
1✔
121
            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
1✔
122
    if (impl.isReadOnly()) {
1✔
123
      return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY);
×
124
    } else {
125
      TSStatus status;
126
      if (groupId instanceof DataRegionId) {
1✔
127
        long startWriteTime = System.nanoTime();
1✔
128
        status = impl.write(request);
1✔
129
        // only record time cost for data region in Performance Overview Dashboard
130
        PERFORMANCE_OVERVIEW_METRICS.recordEngineCost(System.nanoTime() - startWriteTime);
1✔
131
      } else {
1✔
132
        status = impl.write(request);
1✔
133
      }
134
      return status;
1✔
135
    }
136
  }
137

138
  @Override
139
  public DataSet read(ConsensusGroupId groupId, IConsensusRequest request)
140
      throws ConsensusException {
141
    return Optional.ofNullable(stateMachineMap.get(groupId))
×
142
        .orElseThrow(() -> new ConsensusGroupNotExistException(groupId))
×
143
        .read(request);
×
144
  }
145

146
  @SuppressWarnings("java:S2201")
147
  @Override
148
  public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
149
      throws ConsensusException {
150
    int consensusGroupSize = peers.size();
1✔
151
    if (consensusGroupSize != 1) {
1✔
152
      throw new IllegalPeerNumException(consensusGroupSize);
1✔
153
    }
154
    if (!peers.contains(new Peer(groupId, thisNodeId, thisNode))) {
1✔
155
      throw new IllegalPeerEndpointException(thisNode, peers);
1✔
156
    }
157
    AtomicBoolean exist = new AtomicBoolean(true);
1✔
158
    Optional.ofNullable(
1✔
159
            stateMachineMap.computeIfAbsent(
1✔
160
                groupId,
161
                k -> {
162
                  exist.set(false);
1✔
163

164
                  String path = buildPeerDir(groupId);
1✔
165
                  File file = new File(path);
1✔
166
                  if (!file.mkdirs()) {
1✔
167
                    logger.warn("Unable to create consensus dir for group {} at {}", groupId, path);
×
168
                    return null;
×
169
                  }
170

171
                  SimpleConsensusServerImpl impl =
1✔
172
                      new SimpleConsensusServerImpl(peers.get(0), registry.apply(groupId));
1✔
173
                  impl.start();
1✔
174
                  return impl;
1✔
175
                }))
176
        .orElseThrow(
1✔
177
            () ->
178
                new ConsensusException(
×
179
                    String.format("Unable to create consensus dir for group %s", groupId)));
×
180
    if (exist.get()) {
1✔
181
      throw new ConsensusGroupAlreadyExistException(groupId);
1✔
182
    }
183
  }
1✔
184

185
  @Override
186
  public void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException {
187
    AtomicBoolean exist = new AtomicBoolean(false);
1✔
188
    stateMachineMap.computeIfPresent(
1✔
189
        groupId,
190
        (k, v) -> {
191
          exist.set(true);
1✔
192
          v.stop();
1✔
193
          FileUtils.deleteDirectory(new File(buildPeerDir(groupId)));
1✔
194
          return null;
1✔
195
        });
196
    if (!exist.get()) {
1✔
197
      throw new ConsensusGroupNotExistException(groupId);
1✔
198
    }
199
  }
1✔
200

201
  @Override
202
  public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException {
203
    throw new ConsensusException("SimpleConsensus does not support membership changes");
1✔
204
  }
205

206
  @Override
207
  public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException {
208
    throw new ConsensusException("SimpleConsensus does not support membership changes");
1✔
209
  }
210

211
  @Override
212
  public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws ConsensusException {
213
    throw new ConsensusException("SimpleConsensus does not support leader transfer");
1✔
214
  }
215

216
  @Override
217
  public void triggerSnapshot(ConsensusGroupId groupId) throws ConsensusException {
218
    throw new ConsensusException("SimpleConsensus does not support snapshot trigger currently");
1✔
219
  }
220

221
  @Override
222
  public boolean isLeader(ConsensusGroupId groupId) {
223
    return true;
×
224
  }
225

226
  @Override
227
  public boolean isLeaderReady(ConsensusGroupId groupId) {
228
    return true;
×
229
  }
230

231
  @Override
232
  public Peer getLeader(ConsensusGroupId groupId) {
233
    if (!stateMachineMap.containsKey(groupId)) {
×
234
      return null;
×
235
    }
236
    return new Peer(groupId, thisNodeId, thisNode);
×
237
  }
238

239
  @Override
240
  public List<ConsensusGroupId> getAllConsensusGroupIds() {
241
    return new ArrayList<>(stateMachineMap.keySet());
×
242
  }
243

244
  private String buildPeerDir(ConsensusGroupId groupId) {
245
    return storageDir + File.separator + groupId.getType().getValue() + "_" + groupId.getId();
1✔
246
  }
247
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc