• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9905

23 Aug 2023 06:20AM UTC coverage: 47.785% (-0.1%) from 47.922%
#9905

push

travis_ci

web-flow
[To rel/1.2][Metric] Fix flush point statistics (#10934)

23 of 23 new or added lines in 1 file covered. (100.0%)

79851 of 167106 relevant lines covered (47.78%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.4
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.consensus;
21

22
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
23
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
24
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
25
import org.apache.iotdb.commons.consensus.DataRegionId;
26
import org.apache.iotdb.consensus.ConsensusFactory;
27
import org.apache.iotdb.consensus.IConsensus;
28
import org.apache.iotdb.consensus.config.ConsensusConfig;
29
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
30
import org.apache.iotdb.consensus.config.IoTConsensusConfig.RPC;
31
import org.apache.iotdb.consensus.config.RatisConfig;
32
import org.apache.iotdb.consensus.config.RatisConfig.Snapshot;
33
import org.apache.iotdb.db.conf.IoTDBConfig;
34
import org.apache.iotdb.db.conf.IoTDBDescriptor;
35
import org.apache.iotdb.db.consensus.statemachine.dataregion.DataRegionStateMachine;
36
import org.apache.iotdb.db.consensus.statemachine.dataregion.IoTConsensusDataRegionStateMachine;
37
import org.apache.iotdb.db.storageengine.StorageEngine;
38
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
39

40
import org.apache.ratis.util.SizeInBytes;
41
import org.apache.ratis.util.TimeDuration;
42

43
import java.util.concurrent.TimeUnit;
44

45
/**
46
 * We can use DataRegionConsensusImpl.getInstance() to obtain a consensus layer reference for
47
 * dataRegion's reading and writing
48
 */
49
public class DataRegionConsensusImpl {
50

51
  /** Private constructor: this class exposes only the static {@code getInstance()} accessor. */
  private DataRegionConsensusImpl() {
52
    // do nothing
53
  }
54

55
  /**
   * Returns the consensus instance held by {@code DataRegionConsensusImplHolder.INSTANCE}.
   *
   * <p>The instance is a static final field of the nested holder class, so it is built once, when
   * the JVM initializes that class.
   *
   * @return the {@code IConsensus} stored in the holder
   */
  public static IConsensus getInstance() {
56
    return DataRegionConsensusImplHolder.INSTANCE;
1✔
57
  }
58

59
  /**
   * Holder for the single data-region consensus instance. Its static fields read the DataNode
   * configuration and pass it to {@code ConsensusFactory.getConsensusImpl}.
   */
  private static class DataRegionConsensusImplHolder {
60

61
    // Configuration object every setting below is read from.
    private static final IoTDBConfig CONF = IoTDBDescriptor.getInstance().getConfig();
1✔
62

63
    /**
     * The consensus implementation built from three arguments: the configured data-region
     * consensus protocol class, a {@code ConsensusConfig} carrying both the IoTConsensus and the
     * Ratis settings, and {@link #createDataRegionStateMachine} as the state-machine factory.
     *
     * <p>If the factory yields no implementation, {@code orElseThrow} raises an
     * {@link IllegalArgumentException} whose message is {@code ConsensusFactory.CONSTRUCT_FAILED_MSG}
     * formatted with the protocol class. Since this happens during static initialization, the JVM
     * surfaces it as an {@link ExceptionInInitializerError}.
     */
    private static final IConsensus INSTANCE =
1✔
64
        ConsensusFactory.getConsensusImpl(
1✔
65
                CONF.getDataRegionConsensusProtocolClass(),
1✔
66
                ConsensusConfig.newBuilder()
1✔
67
                    // Identity, endpoint and storage location of this node.
                    .setThisNodeId(CONF.getDataNodeId())
1✔
68
                    .setThisNode(
1✔
69
                        new TEndPoint(CONF.getInternalAddress(), CONF.getDataRegionConsensusPort()))
1✔
70
                    .setStorageDir(CONF.getDataRegionConsensusDir())
1✔
71
                    .setConsensusGroupType(TConsensusGroupType.DataRegion)
1✔
72
                    // IoTConsensus settings: an RPC section and a replication section.
                    .setIoTConsensusConfig(
1✔
73
                        IoTConsensusConfig.newBuilder()
1✔
74
                            .setRpc(
1✔
75
                                RPC.newBuilder()
1✔
76
                                    .setConnectionTimeoutInMs(CONF.getConnectionTimeoutInMS())
1✔
77
                                    .setRpcSelectorThreadNum(CONF.getRpcSelectorThreadCount())
1✔
78
                                    .setRpcMinConcurrentClientNum(
1✔
79
                                        CONF.getRpcMinConcurrentClientNum())
1✔
80
                                    .setRpcMaxConcurrentClientNum(
1✔
81
                                        CONF.getRpcMaxConcurrentClientNum())
1✔
82
                                    .setRpcThriftCompressionEnabled(
1✔
83
                                        CONF.isRpcThriftCompressionEnable())
1✔
84
                                    .setSelectorNumOfClientManager(
1✔
85
                                        CONF.getSelectorNumOfClientManager())
1✔
86
                                    .setThriftServerAwaitTimeForStopService(
1✔
87
                                        CONF.getThriftServerAwaitTimeForStopService())
1✔
88
                                    .setThriftMaxFrameSize(CONF.getThriftMaxFrameSize())
1✔
89
                                    // The same two client-count values are also passed to the
                                    // Ratis client section below.
                                    .setCoreClientNumForEachNode(CONF.getCoreClientNumForEachNode())
1✔
90
                                    .setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
1✔
91
                                    .build())
1✔
92
                            .setReplication(
1✔
93
                                IoTConsensusConfig.Replication.newBuilder()
1✔
94
                                    .setWalThrottleThreshold(CONF.getThrottleThreshold())
1✔
95
                                    .setAllocateMemoryForConsensus(
1✔
96
                                        CONF.getAllocateMemoryForConsensus())
1✔
97
                                    .setMaxLogEntriesNumPerBatch(CONF.getMaxLogEntriesNumPerBatch())
1✔
98
                                    .setMaxSizePerBatch(CONF.getMaxSizePerBatch())
1✔
99
                                    .setMaxPendingBatchesNum(CONF.getMaxPendingBatchesNum())
1✔
100
                                    .setMaxMemoryRatioForQueue(CONF.getMaxMemoryRatioForQueue())
1✔
101
                                    .build())
1✔
102
                            .build())
1✔
103
                    // Ratis settings: snapshot, log, gRPC, RPC timeouts, client, impl,
                    // leader log appender and read sections.
                    .setRatisConfig(
1✔
104
                        RatisConfig.newBuilder()
1✔
105
                            // An empty log is committed after each restart, even if no data is
106
                            // written. This setting ensures that compaction work is not discarded
107
                            // even if there are frequent restarts
108
                            .setSnapshot(
1✔
109
                                Snapshot.newBuilder()
1✔
110
                                    .setCreationGap(1)
1✔
111
                                    .setAutoTriggerThreshold(
1✔
112
                                        CONF.getDataRatisConsensusSnapshotTriggerThreshold())
1✔
113
                                    .build())
1✔
114
                            .setLog(
1✔
115
                                RatisConfig.Log.newBuilder()
1✔
116
                                    .setUnsafeFlushEnabled(
1✔
117
                                        CONF.isDataRatisConsensusLogUnsafeFlushEnable())
1✔
118
                                    .setForceSyncNum(CONF.getDataRatisConsensusLogForceSyncNum())
1✔
119
                                    .setSegmentSizeMax(
1✔
120
                                        SizeInBytes.valueOf(
1✔
121
                                            CONF.getDataRatisConsensusLogSegmentSizeMax()))
1✔
122
                                    .setPreserveNumsWhenPurge(
1✔
123
                                        CONF.getDataRatisConsensusPreserveWhenPurge())
1✔
124
                                    .build())
1✔
125
                            .setGrpc(
1✔
126
                                RatisConfig.Grpc.newBuilder()
1✔
127
                                    .setFlowControlWindow(
1✔
128
                                        SizeInBytes.valueOf(
1✔
129
                                            CONF.getDataRatisConsensusGrpcFlowControlWindow()))
1✔
130
                                    .setLeaderOutstandingAppendsMax(
1✔
131
                                        CONF.getDataRatisConsensusGrpcLeaderOutstandingAppendsMax())
1✔
132
                                    .build())
1✔
133
                            // Every timeout in this section is read from CONF as a millisecond
                            // value and wrapped in a TimeDuration.
                            .setRpc(
1✔
134
                                RatisConfig.Rpc.newBuilder()
1✔
135
                                    .setTimeoutMin(
1✔
136
                                        TimeDuration.valueOf(
1✔
137
                                            CONF.getDataRatisConsensusLeaderElectionTimeoutMinMs(),
1✔
138
                                            TimeUnit.MILLISECONDS))
139
                                    .setTimeoutMax(
1✔
140
                                        TimeDuration.valueOf(
1✔
141
                                            CONF.getDataRatisConsensusLeaderElectionTimeoutMaxMs(),
1✔
142
                                            TimeUnit.MILLISECONDS))
143
                                    .setRequestTimeout(
1✔
144
                                        TimeDuration.valueOf(
1✔
145
                                            CONF.getDataRatisConsensusRequestTimeoutMs(),
1✔
146
                                            TimeUnit.MILLISECONDS))
147
                                    .setFirstElectionTimeoutMin(
1✔
148
                                        TimeDuration.valueOf(
1✔
149
                                            CONF.getRatisFirstElectionTimeoutMinMs(),
1✔
150
                                            TimeUnit.MILLISECONDS))
151
                                    .setFirstElectionTimeoutMax(
1✔
152
                                        TimeDuration.valueOf(
1✔
153
                                            CONF.getRatisFirstElectionTimeoutMaxMs(),
1✔
154
                                            TimeUnit.MILLISECONDS))
155
                                    .build())
1✔
156
                            .setClient(
1✔
157
                                RatisConfig.Client.newBuilder()
1✔
158
                                    // Same configuration getter as the Rpc request timeout above.
                                    .setClientRequestTimeoutMillis(
1✔
159
                                        CONF.getDataRatisConsensusRequestTimeoutMs())
1✔
160
                                    .setClientMaxRetryAttempt(
1✔
161
                                        CONF.getDataRatisConsensusMaxRetryAttempts())
1✔
162
                                    .setClientRetryInitialSleepTimeMs(
1✔
163
                                        CONF.getDataRatisConsensusInitialSleepTimeMs())
1✔
164
                                    .setClientRetryMaxSleepTimeMs(
1✔
165
                                        CONF.getDataRatisConsensusMaxSleepTimeMs())
1✔
166
                                    .setCoreClientNumForEachNode(CONF.getCoreClientNumForEachNode())
1✔
167
                                    .setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
1✔
168
                                    .build())
1✔
169
                            .setImpl(
1✔
170
                                RatisConfig.Impl.newBuilder()
1✔
171
                                    .setTriggerSnapshotFileSize(CONF.getDataRatisLogMax())
1✔
172
                                    .build())
1✔
173
                            .setLeaderLogAppender(
1✔
174
                                RatisConfig.LeaderLogAppender.newBuilder()
1✔
175
                                    .setBufferByteLimit(
1✔
176
                                        CONF.getDataRatisConsensusLogAppenderBufferSizeMax())
1✔
177
                                    .build())
1✔
178
                            .setRead(
1✔
179
                                RatisConfig.Read.newBuilder()
1✔
180
                                    // use thrift connection timeout to unify read timeout
181
                                    .setReadTimeout(
1✔
182
                                        TimeDuration.valueOf(
1✔
183
                                            CONF.getConnectionTimeoutInMS(), TimeUnit.MILLISECONDS))
1✔
184
                                    .build())
1✔
185
                            .build())
1✔
186
                    .build(),
1✔
187
                DataRegionConsensusImplHolder::createDataRegionStateMachine)
188
            // Fail if no implementation was produced for the configured protocol class.
            .orElseThrow(
1✔
189
                () ->
190
                    new IllegalArgumentException(
×
191
                        String.format(
×
192
                            ConsensusFactory.CONSTRUCT_FAILED_MSG,
193
                            CONF.getDataRegionConsensusProtocolClass())));
×
194

195
    /**
     * Creates the state machine for one consensus group; passed to the factory above as a method
     * reference.
     *
     * <p>The group id is cast to {@code DataRegionId} (a {@link ClassCastException} results if it
     * is any other type) and used to look up the {@code DataRegion} in the {@code StorageEngine}.
     *
     * @param gid consensus group id; must be a {@code DataRegionId}
     * @return an {@code IoTConsensusDataRegionStateMachine} when the configured protocol class
     *     equals {@code ConsensusFactory.IOT_CONSENSUS}, otherwise a plain
     *     {@code DataRegionStateMachine}
     */
    private static DataRegionStateMachine createDataRegionStateMachine(ConsensusGroupId gid) {
196
      DataRegion dataRegion = StorageEngine.getInstance().getDataRegion((DataRegionId) gid);
×
197
      if (ConsensusFactory.IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) {
×
198
        return new IoTConsensusDataRegionStateMachine(dataRegion);
×
199
      } else {
200
        return new DataRegionStateMachine(dataRegion);
×
201
      }
202
    }
203
  }
204
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc