• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9999

05 Sep 2023 08:10AM CUT coverage: 47.669% (-0.03%) from 47.697%
#9999

push

travis_ci

web-flow
[IOTDB-6130] Delete data by specific pattern didn't work

80151 of 168139 relevant lines covered (47.67%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.28
/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.consensus.ratis.utils;
21

22
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
23
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
24
import org.apache.iotdb.common.rpc.thrift.TSStatus;
25
import org.apache.iotdb.commons.conf.CommonConfig;
26
import org.apache.iotdb.commons.conf.CommonDescriptor;
27
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
28
import org.apache.iotdb.consensus.common.Peer;
29
import org.apache.iotdb.consensus.config.RatisConfig;
30
import org.apache.iotdb.rpc.AutoScalingBufferWriteTransport;
31

32
import org.apache.ratis.client.RaftClientConfigKeys;
33
import org.apache.ratis.conf.RaftProperties;
34
import org.apache.ratis.grpc.GrpcConfigKeys;
35
import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
36
import org.apache.ratis.protocol.RaftGroupId;
37
import org.apache.ratis.protocol.RaftPeer;
38
import org.apache.ratis.protocol.RaftPeerId;
39
import org.apache.ratis.server.RaftServerConfigKeys;
40
import org.apache.ratis.server.protocol.TermIndex;
41
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
42
import org.apache.thrift.TException;
43
import org.apache.thrift.protocol.TCompactProtocol;
44
import org.apache.thrift.transport.TByteBuffer;
45

46
import java.io.File;
47
import java.nio.ByteBuffer;
48
import java.util.List;
49
import java.util.stream.Collectors;
50

51
public class Utils {
52
  private static final int TEMP_BUFFER_SIZE = 1024;
53
  private static final byte PADDING_MAGIC = 0x47;
54
  private static final String DATA_REGION_GROUP = "group-0001";
55
  private static final String SCHEMA_REGION_GROUP = "group-0002";
56
  private static final CommonConfig config = CommonDescriptor.getInstance().getConfig();
1✔
57

58
  private Utils() {}
59

60
  public static String hostAddress(TEndPoint endpoint) {
61
    return String.format("%s:%d", endpoint.getIp(), endpoint.getPort());
1✔
62
  }
63

64
  public static String fromTEndPointToString(TEndPoint endpoint) {
65
    return String.format("%s_%d", endpoint.getIp(), endpoint.getPort());
1✔
66
  }
67

68
  /** Encode the ConsensusGroupId into 6 bytes: 2 Bytes for Group Type and 4 Bytes for Group ID. */
69
  public static long groupEncode(ConsensusGroupId consensusGroupId) {
70
    // use abbreviations to prevent overflow
71
    long groupType = consensusGroupId.getType().getValue();
1✔
72
    long groupCode = groupType << 32;
1✔
73
    groupCode += consensusGroupId.getId();
1✔
74
    return groupCode;
1✔
75
  }
76

77
  public static RaftPeerId fromNodeIdToRaftPeerId(int nodeId) {
78
    return RaftPeerId.valueOf(String.valueOf(nodeId));
1✔
79
  }
80

81
  public static TEndPoint fromRaftPeerAddressToTEndPoint(String address) {
82
    String[] items = address.split(":");
1✔
83
    return new TEndPoint(items[0], Integer.parseInt(items[1]));
1✔
84
  }
85

86
  public static int fromRaftPeerIdToNodeId(RaftPeerId id) {
87
    return Integer.parseInt(id.toString());
1✔
88
  }
89

90
  public static TEndPoint fromRaftPeerProtoToTEndPoint(RaftPeerProto proto) {
91
    String[] items = proto.getAddress().split(":");
1✔
92
    return new TEndPoint(items[0], Integer.parseInt(items[1]));
1✔
93
  }
94

95
  // priority is used as ordinal of leader election
96
  public static RaftPeer fromNodeInfoAndPriorityToRaftPeer(
97
      int nodeId, TEndPoint endpoint, int priority) {
98
    return RaftPeer.newBuilder()
1✔
99
        .setId(fromNodeIdToRaftPeerId(nodeId))
1✔
100
        .setAddress(hostAddress(endpoint))
1✔
101
        .setPriority(priority)
1✔
102
        .build();
1✔
103
  }
104

105
  public static RaftPeer fromPeerAndPriorityToRaftPeer(Peer peer, int priority) {
106
    return fromNodeInfoAndPriorityToRaftPeer(peer.getNodeId(), peer.getEndpoint(), priority);
1✔
107
  }
108

109
  public static List<RaftPeer> fromPeersAndPriorityToRaftPeers(List<Peer> peers, int priority) {
110
    return peers.stream()
1✔
111
        .map(peer -> Utils.fromPeerAndPriorityToRaftPeer(peer, priority))
1✔
112
        .collect(Collectors.toList());
1✔
113
  }
114

115
  public static int fromRaftPeerProtoToNodeId(RaftPeerProto proto) {
116
    return Integer.parseInt(proto.getId().toStringUtf8());
1✔
117
  }
118

119
  public static List<Peer> fromRaftProtoListAndRaftGroupIdToPeers(
120
      List<RaftPeerProto> raftProtoList, RaftGroupId id) {
121
    ConsensusGroupId consensusGroupId = Utils.fromRaftGroupIdToConsensusGroupId(id);
1✔
122
    return raftProtoList.stream()
1✔
123
        .map(
1✔
124
            peer ->
125
                new Peer(
1✔
126
                    consensusGroupId,
127
                    Utils.fromRaftPeerProtoToNodeId(peer),
1✔
128
                    Utils.fromRaftPeerProtoToTEndPoint(peer)))
1✔
129
        .collect(Collectors.toList());
1✔
130
  }
131

132
  /** Given ConsensusGroupId, generate a deterministic RaftGroupId current scheme. */
133
  public static RaftGroupId fromConsensusGroupIdToRaftGroupId(ConsensusGroupId consensusGroupId) {
134
    long groupCode = groupEncode(consensusGroupId);
1✔
135
    byte[] byteGroupCode = ByteBuffer.allocate(Long.BYTES).putLong(groupCode).array();
1✔
136
    byte[] bytePaddedGroupName = new byte[16];
1✔
137
    for (int i = 0; i < 10; i++) {
1✔
138
      bytePaddedGroupName[i] = PADDING_MAGIC;
1✔
139
    }
140
    System.arraycopy(byteGroupCode, 2, bytePaddedGroupName, 10, byteGroupCode.length - 2);
1✔
141

142
    return RaftGroupId.valueOf(ByteString.copyFrom(bytePaddedGroupName));
1✔
143
  }
144

145
  /** Given raftGroupId, decrypt ConsensusGroupId out of it. */
146
  public static ConsensusGroupId fromRaftGroupIdToConsensusGroupId(RaftGroupId raftGroupId) {
147
    byte[] padded = raftGroupId.toByteString().toByteArray();
1✔
148
    long type = (long) ((padded[10] & 0xff) << 8) + (padded[11] & 0xff);
1✔
149
    ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.BYTES);
1✔
150
    byteBuffer.put(padded, 12, 4);
1✔
151
    byteBuffer.flip();
1✔
152
    return ConsensusGroupId.Factory.create((int) type, byteBuffer.getInt());
1✔
153
  }
154

155
  public static ByteBuffer serializeTSStatus(TSStatus status) throws TException {
156
    AutoScalingBufferWriteTransport byteBuffer =
1✔
157
        new AutoScalingBufferWriteTransport(TEMP_BUFFER_SIZE);
158
    TCompactProtocol protocol = new TCompactProtocol(byteBuffer);
1✔
159
    status.write(protocol);
1✔
160
    return ByteBuffer.wrap(byteBuffer.getBuffer());
1✔
161
  }
162

163
  public static TSStatus deserializeFrom(ByteBuffer buffer) throws TException {
164
    TSStatus status = new TSStatus();
1✔
165
    TByteBuffer byteBuffer = new TByteBuffer(buffer);
1✔
166
    TCompactProtocol protocol = new TCompactProtocol(byteBuffer);
1✔
167
    status.read(protocol);
1✔
168
    return status;
1✔
169
  }
170

171
  public static String getMetadataFromTermIndex(TermIndex termIndex) {
172
    return String.format("%d_%d", termIndex.getTerm(), termIndex.getIndex());
1✔
173
  }
174

175
  public static TermIndex getTermIndexFromDir(File snapshotDir) {
176
    return getTermIndexFromMetadataString(snapshotDir.getName());
1✔
177
  }
178

179
  public static TermIndex getTermIndexFromMetadataString(String metadata) {
180
    String[] items = metadata.split("_");
1✔
181
    return TermIndex.valueOf(Long.parseLong(items[0]), Long.parseLong(items[1]));
1✔
182
  }
183

184
  public static TConsensusGroupType getConsensusGroupTypeFromPrefix(String prefix) {
185
    TConsensusGroupType consensusGroupType;
186
    if (prefix.contains(DATA_REGION_GROUP)) {
1✔
187
      consensusGroupType = TConsensusGroupType.DataRegion;
1✔
188
    } else if (prefix.contains(SCHEMA_REGION_GROUP)) {
1✔
189
      consensusGroupType = TConsensusGroupType.SchemaRegion;
1✔
190
    } else {
191
      consensusGroupType = TConsensusGroupType.ConfigRegion;
1✔
192
    }
193
    return consensusGroupType;
1✔
194
  }
195

196
  public static boolean rejectWrite() {
197
    return config.isReadOnly();
1✔
198
  }
199

200
  /**
201
   * Normally, the RatisConsensus should reject write when system is read-only, i.e, {@link
202
   * #rejectWrite()}. However, Ratis RaftServer close() will wait for applyIndex advancing to
203
   * commitIndex. So when the system is shutting down, RatisConsensus should still allow
204
   * statemachine to apply while rejecting new client write requests.
205
   */
206
  public static boolean stallApply() {
207
    return config.isReadOnly() && !config.isStopping();
1✔
208
  }
209

210
  public static void initRatisConfig(RaftProperties properties, RatisConfig config) {
211
    GrpcConfigKeys.setMessageSizeMax(properties, config.getGrpc().getMessageSizeMax());
1✔
212
    GrpcConfigKeys.setFlowControlWindow(properties, config.getGrpc().getFlowControlWindow());
1✔
213
    GrpcConfigKeys.Server.setAsyncRequestThreadPoolCached(
1✔
214
        properties, config.getGrpc().isAsyncRequestThreadPoolCached());
1✔
215
    GrpcConfigKeys.Server.setAsyncRequestThreadPoolSize(
1✔
216
        properties, config.getGrpc().getAsyncRequestThreadPoolSize());
1✔
217
    GrpcConfigKeys.Server.setLeaderOutstandingAppendsMax(
1✔
218
        properties, config.getGrpc().getLeaderOutstandingAppendsMax());
1✔
219

220
    RaftServerConfigKeys.Rpc.setSlownessTimeout(properties, config.getRpc().getSlownessTimeout());
1✔
221
    RaftServerConfigKeys.Rpc.setTimeoutMin(properties, config.getRpc().getTimeoutMin());
1✔
222
    RaftServerConfigKeys.Rpc.setTimeoutMax(properties, config.getRpc().getTimeoutMax());
1✔
223
    RaftServerConfigKeys.Rpc.setSleepTime(properties, config.getRpc().getSleepTime());
1✔
224
    RaftClientConfigKeys.Rpc.setRequestTimeout(properties, config.getRpc().getRequestTimeout());
1✔
225

226
    RaftServerConfigKeys.LeaderElection.setLeaderStepDownWaitTime(
1✔
227
        properties, config.getLeaderElection().getLeaderStepDownWaitTimeKey());
1✔
228
    RaftServerConfigKeys.LeaderElection.setPreVote(
1✔
229
        properties, config.getLeaderElection().isPreVote());
1✔
230

231
    RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(
1✔
232
        properties, config.getSnapshot().isAutoTriggerEnabled());
1✔
233
    RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(
1✔
234
        properties, config.getSnapshot().getAutoTriggerThreshold());
1✔
235
    RaftServerConfigKeys.Snapshot.setCreationGap(properties, config.getSnapshot().getCreationGap());
1✔
236
    RaftServerConfigKeys.Snapshot.setRetentionFileNum(
1✔
237
        properties, config.getSnapshot().getRetentionFileNum());
1✔
238
    // FIXME: retain 2 copies to avoid race conditions between (delete) and (transfer)
239
    RaftServerConfigKeys.Snapshot.setRetentionFileNum(properties, 2);
1✔
240

241
    RaftServerConfigKeys.ThreadPool.setClientCached(
1✔
242
        properties, config.getThreadPool().isClientCached());
1✔
243
    RaftServerConfigKeys.ThreadPool.setClientSize(
1✔
244
        properties, config.getThreadPool().getClientSize());
1✔
245
    RaftServerConfigKeys.ThreadPool.setProxyCached(
1✔
246
        properties, config.getThreadPool().isProxyCached());
1✔
247
    RaftServerConfigKeys.ThreadPool.setProxySize(properties, config.getThreadPool().getProxySize());
1✔
248
    RaftServerConfigKeys.ThreadPool.setServerCached(
1✔
249
        properties, config.getThreadPool().isServerCached());
1✔
250
    RaftServerConfigKeys.ThreadPool.setServerSize(
1✔
251
        properties, config.getThreadPool().getServerSize());
1✔
252

253
    RaftServerConfigKeys.Log.setUseMemory(properties, config.getLog().isUseMemory());
1✔
254
    RaftServerConfigKeys.Log.setQueueElementLimit(
1✔
255
        properties, config.getLog().getQueueElementLimit());
1✔
256
    RaftServerConfigKeys.Log.setQueueByteLimit(properties, config.getLog().getQueueByteLimit());
1✔
257
    RaftServerConfigKeys.Log.setPurgeGap(properties, config.getLog().getPurgeGap());
1✔
258
    RaftServerConfigKeys.Log.setPurgeUptoSnapshotIndex(
1✔
259
        properties, config.getLog().isPurgeUptoSnapshotIndex());
1✔
260
    RaftServerConfigKeys.Log.setPurgePreservationLogNum(
1✔
261
        properties, config.getLog().getPreserveNumsWhenPurge());
1✔
262
    RaftServerConfigKeys.Log.setSegmentSizeMax(properties, config.getLog().getSegmentSizeMax());
1✔
263
    RaftServerConfigKeys.Log.setSegmentCacheNumMax(
1✔
264
        properties, config.getLog().getSegmentCacheNumMax());
1✔
265
    RaftServerConfigKeys.Log.setSegmentCacheSizeMax(
1✔
266
        properties, config.getLog().getSegmentCacheSizeMax());
1✔
267
    RaftServerConfigKeys.Log.setPreallocatedSize(properties, config.getLog().getPreallocatedSize());
1✔
268
    RaftServerConfigKeys.Log.setWriteBufferSize(properties, config.getLog().getWriteBufferSize());
1✔
269
    RaftServerConfigKeys.Log.setForceSyncNum(properties, config.getLog().getForceSyncNum());
1✔
270
    RaftServerConfigKeys.Log.setUnsafeFlushEnabled(
1✔
271
        properties, config.getLog().isUnsafeFlushEnabled());
1✔
272
    RaftServerConfigKeys.Log.setCorruptionPolicy(
1✔
273
        properties, RaftServerConfigKeys.Log.CorruptionPolicy.WARN_AND_RETURN);
274

275
    RaftServerConfigKeys.Log.Appender.setBufferByteLimit(
1✔
276
        properties, config.getLeaderLogAppender().getBufferByteLimit());
1✔
277
    RaftServerConfigKeys.Log.Appender.setSnapshotChunkSizeMax(
1✔
278
        properties, config.getLeaderLogAppender().getSnapshotChunkSizeMax());
1✔
279
    RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(
1✔
280
        properties, config.getLeaderLogAppender().isInstallSnapshotEnabled());
1✔
281

282
    GrpcConfigKeys.Server.setHeartbeatChannel(properties, true);
1✔
283
    RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMin(
1✔
284
        properties, config.getRpc().getFirstElectionTimeoutMin());
1✔
285
    RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMax(
1✔
286
        properties, config.getRpc().getFirstElectionTimeoutMax());
1✔
287

288
    RaftServerConfigKeys.Read.Option option =
289
        config.getRead().getReadOption() == RatisConfig.Read.Option.DEFAULT
1✔
290
            ? RaftServerConfigKeys.Read.Option.DEFAULT
×
291
            : RaftServerConfigKeys.Read.Option.LINEARIZABLE;
1✔
292
    RaftServerConfigKeys.Read.setOption(properties, option);
1✔
293
    RaftServerConfigKeys.Read.setTimeout(properties, config.getRead().getReadTimeout());
1✔
294

295
    RaftServerConfigKeys.setSleepDeviationThreshold(
1✔
296
        properties, config.getUtils().getSleepDeviationThresholdMs());
1✔
297
  }
1✔
298
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc