• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9733

pending completion
#9733

push

travis_ci

web-flow
[To rel/1.2] Add compression and encoding type check for FastCompactionPerformer (#10712)

32 of 32 new or added lines in 4 files covered. (100.0%)

79232 of 165563 relevant lines covered (47.86%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager.node;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
24
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
25
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
26
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
27
import org.apache.iotdb.common.rpc.thrift.TSStatus;
28
import org.apache.iotdb.commons.cluster.NodeStatus;
29
import org.apache.iotdb.commons.cluster.RegionRoleType;
30
import org.apache.iotdb.commons.conf.CommonConfig;
31
import org.apache.iotdb.commons.conf.CommonDescriptor;
32
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
33
import org.apache.iotdb.commons.service.metric.MetricService;
34
import org.apache.iotdb.confignode.client.DataNodeRequestType;
35
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
36
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
37
import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
38
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
39
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
40
import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
41
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
42
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
43
import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateVersionInfoPlan;
44
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
45
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
46
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
47
import org.apache.iotdb.confignode.consensus.response.datanode.ConfigurationResp;
48
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
49
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
50
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
51
import org.apache.iotdb.confignode.manager.ConfigManager;
52
import org.apache.iotdb.confignode.manager.IManager;
53
import org.apache.iotdb.confignode.manager.TriggerManager;
54
import org.apache.iotdb.confignode.manager.UDFManager;
55
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
56
import org.apache.iotdb.confignode.manager.load.LoadManager;
57
import org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache;
58
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
59
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
60
import org.apache.iotdb.confignode.manager.pipe.PipeManager;
61
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
62
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
63
import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
64
import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
65
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
66
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
67
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
68
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
69
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
70
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
71
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
72
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
73
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
74
import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
75
import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
76
import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
77
import org.apache.iotdb.consensus.common.DataSet;
78
import org.apache.iotdb.consensus.common.Peer;
79
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
80
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
81
import org.apache.iotdb.rpc.RpcUtils;
82
import org.apache.iotdb.rpc.TSStatusCode;
83

84
import org.slf4j.Logger;
85
import org.slf4j.LoggerFactory;
86

87
import java.util.ArrayList;
88
import java.util.Comparator;
89
import java.util.HashMap;
90
import java.util.List;
91
import java.util.Map;
92
import java.util.Optional;
93
import java.util.Set;
94
import java.util.concurrent.ConcurrentHashMap;
95
import java.util.concurrent.atomic.AtomicInteger;
96
import java.util.concurrent.locks.ReentrantLock;
97

98
/** {@link NodeManager} manages cluster node addition and removal requests. */
99
public class NodeManager {
100

101
  private static final Logger LOGGER = LoggerFactory.getLogger(NodeManager.class);
×
102

103
  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
×
104
  public static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatIntervalInMs();
×
105

106
  private final IManager configManager;
107
  private final NodeInfo nodeInfo;
108

109
  private final ReentrantLock removeConfigNodeLock;
110

111
  public NodeManager(IManager configManager, NodeInfo nodeInfo) {
×
112
    this.configManager = configManager;
×
113
    this.nodeInfo = nodeInfo;
×
114
    this.removeConfigNodeLock = new ReentrantLock();
×
115
  }
×
116

117
  /**
118
   * Get system configurations.
119
   *
120
   * @return ConfigurationResp. The TSStatus will be set to SUCCESS_STATUS.
121
   */
122
  public DataSet getSystemConfiguration() {
123
    ConfigurationResp dataSet = new ConfigurationResp();
×
124
    dataSet.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
×
125
    setGlobalConfig(dataSet);
×
126
    setRatisConfig(dataSet);
×
127
    setCQConfig(dataSet);
×
128
    return dataSet;
×
129
  }
130

131
  private void setGlobalConfig(ConfigurationResp dataSet) {
132
    // Set TGlobalConfig
133
    final ConfigNodeConfig configNodeConfig = ConfigNodeDescriptor.getInstance().getConf();
×
134
    final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
×
135
    TGlobalConfig globalConfig = new TGlobalConfig();
×
136
    globalConfig.setDataRegionConsensusProtocolClass(
×
137
        configNodeConfig.getDataRegionConsensusProtocolClass());
×
138
    globalConfig.setSchemaRegionConsensusProtocolClass(
×
139
        configNodeConfig.getSchemaRegionConsensusProtocolClass());
×
140
    globalConfig.setSeriesPartitionSlotNum(configNodeConfig.getSeriesSlotNum());
×
141
    globalConfig.setSeriesPartitionExecutorClass(
×
142
        configNodeConfig.getSeriesPartitionExecutorClass());
×
143
    globalConfig.setTimePartitionInterval(commonConfig.getTimePartitionInterval());
×
144
    globalConfig.setReadConsistencyLevel(configNodeConfig.getReadConsistencyLevel());
×
145
    globalConfig.setDiskSpaceWarningThreshold(commonConfig.getDiskSpaceWarningThreshold());
×
146
    globalConfig.setTimestampPrecision(commonConfig.getTimestampPrecision());
×
147
    globalConfig.setSchemaEngineMode(commonConfig.getSchemaEngineMode());
×
148
    globalConfig.setTagAttributeTotalSize(commonConfig.getTagAttributeTotalSize());
×
149
    dataSet.setGlobalConfig(globalConfig);
×
150
  }
×
151

152
  private void setRatisConfig(ConfigurationResp dataSet) {
153
    ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
×
154
    TRatisConfig ratisConfig = new TRatisConfig();
×
155

156
    ratisConfig.setDataAppenderBufferSize(conf.getDataRegionRatisConsensusLogAppenderBufferSize());
×
157
    ratisConfig.setSchemaAppenderBufferSize(
×
158
        conf.getSchemaRegionRatisConsensusLogAppenderBufferSize());
×
159

160
    ratisConfig.setDataSnapshotTriggerThreshold(conf.getDataRegionRatisSnapshotTriggerThreshold());
×
161
    ratisConfig.setSchemaSnapshotTriggerThreshold(
×
162
        conf.getSchemaRegionRatisSnapshotTriggerThreshold());
×
163

164
    ratisConfig.setDataLogUnsafeFlushEnable(conf.isDataRegionRatisLogUnsafeFlushEnable());
×
165
    ratisConfig.setSchemaLogUnsafeFlushEnable(conf.isSchemaRegionRatisLogUnsafeFlushEnable());
×
166
    ratisConfig.setDataRegionLogForceSyncNum(conf.getDataRegionRatisLogForceSyncNum());
×
167

168
    ratisConfig.setDataLogSegmentSizeMax(conf.getDataRegionRatisLogSegmentSizeMax());
×
169
    ratisConfig.setSchemaLogSegmentSizeMax(conf.getSchemaRegionRatisLogSegmentSizeMax());
×
170

171
    ratisConfig.setDataGrpcFlowControlWindow(conf.getDataRegionRatisGrpcFlowControlWindow());
×
172
    ratisConfig.setSchemaGrpcFlowControlWindow(conf.getSchemaRegionRatisGrpcFlowControlWindow());
×
173
    ratisConfig.setDataRegionGrpcLeaderOutstandingAppendsMax(
×
174
        conf.getDataRegionRatisGrpcLeaderOutstandingAppendsMax());
×
175

176
    ratisConfig.setDataLeaderElectionTimeoutMin(
×
177
        conf.getDataRegionRatisRpcLeaderElectionTimeoutMinMs());
×
178
    ratisConfig.setSchemaLeaderElectionTimeoutMin(
×
179
        conf.getSchemaRegionRatisRpcLeaderElectionTimeoutMinMs());
×
180

181
    ratisConfig.setDataLeaderElectionTimeoutMax(
×
182
        conf.getDataRegionRatisRpcLeaderElectionTimeoutMaxMs());
×
183
    ratisConfig.setSchemaLeaderElectionTimeoutMax(
×
184
        conf.getSchemaRegionRatisRpcLeaderElectionTimeoutMaxMs());
×
185

186
    ratisConfig.setDataRequestTimeout(conf.getDataRegionRatisRequestTimeoutMs());
×
187
    ratisConfig.setSchemaRequestTimeout(conf.getSchemaRegionRatisRequestTimeoutMs());
×
188

189
    ratisConfig.setDataMaxRetryAttempts(conf.getDataRegionRatisMaxRetryAttempts());
×
190
    ratisConfig.setDataInitialSleepTime(conf.getDataRegionRatisInitialSleepTimeMs());
×
191
    ratisConfig.setDataMaxSleepTime(conf.getDataRegionRatisMaxSleepTimeMs());
×
192
    ratisConfig.setSchemaMaxRetryAttempts(conf.getSchemaRegionRatisMaxRetryAttempts());
×
193
    ratisConfig.setSchemaInitialSleepTime(conf.getSchemaRegionRatisInitialSleepTimeMs());
×
194
    ratisConfig.setSchemaMaxSleepTime(conf.getSchemaRegionRatisMaxSleepTimeMs());
×
195

196
    ratisConfig.setSchemaPreserveWhenPurge(conf.getSchemaRegionRatisPreserveLogsWhenPurge());
×
197
    ratisConfig.setDataPreserveWhenPurge(conf.getDataRegionRatisPreserveLogsWhenPurge());
×
198

199
    ratisConfig.setFirstElectionTimeoutMin(conf.getRatisFirstElectionTimeoutMinMs());
×
200
    ratisConfig.setFirstElectionTimeoutMax(conf.getRatisFirstElectionTimeoutMaxMs());
×
201

202
    ratisConfig.setSchemaRegionRatisLogMax(conf.getSchemaRegionRatisLogMax());
×
203
    ratisConfig.setDataRegionRatisLogMax(conf.getDataRegionRatisLogMax());
×
204

205
    dataSet.setRatisConfig(ratisConfig);
×
206
  }
×
207

208
  private void setCQConfig(ConfigurationResp dataSet) {
209
    final ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
×
210
    TCQConfig cqConfig = new TCQConfig();
×
211
    cqConfig.setCqMinEveryIntervalInMs(conf.getCqMinEveryIntervalInMs());
×
212

213
    dataSet.setCqConfig(cqConfig);
×
214
  }
×
215

216
  private TRuntimeConfiguration getRuntimeConfiguration() {
217
    getPipeManager().getPipePluginCoordinator().lock();
×
218
    getTriggerManager().getTriggerInfo().acquireTriggerTableLock();
×
219
    getUDFManager().getUdfInfo().acquireUDFTableLock();
×
220
    try {
221
      final TRuntimeConfiguration runtimeConfiguration = new TRuntimeConfiguration();
×
222
      runtimeConfiguration.setTemplateInfo(getClusterSchemaManager().getAllTemplateSetInfo());
×
223
      runtimeConfiguration.setAllTriggerInformation(
×
224
          getTriggerManager().getTriggerTable(false).getAllTriggerInformation());
×
225
      runtimeConfiguration.setAllUDFInformation(
×
226
          getUDFManager().getUDFTable().getAllUDFInformation());
×
227
      runtimeConfiguration.setAllPipeInformation(
×
228
          getPipeManager().getPipePluginCoordinator().getPipePluginTable().getAllPipePluginMeta());
×
229
      runtimeConfiguration.setAllTTLInformation(
×
230
          DataNodeRegisterResp.convertAllTTLInformation(getClusterSchemaManager().getAllTTLInfo()));
×
231
      return runtimeConfiguration;
×
232
    } finally {
233
      getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
×
234
      getUDFManager().getUdfInfo().releaseUDFTableLock();
×
235
      getPipeManager().getPipePluginCoordinator().unlock();
×
236
    }
237
  }
238

239
  /**
240
   * Register DataNode.
241
   *
242
   * @param req TDataNodeRegisterReq
243
   * @return DataNodeConfigurationDataSet. The {@link TSStatus} will be set to {@link
244
   *     TSStatusCode#SUCCESS_STATUS} when register success.
245
   */
246
  public DataSet registerDataNode(TDataNodeRegisterReq req) {
247
    int dataNodeId = nodeInfo.generateNextNodeId();
×
248

249
    RegisterDataNodePlan registerDataNodePlan =
×
250
        new RegisterDataNodePlan(req.getDataNodeConfiguration());
×
251
    // Register new DataNode
252
    registerDataNodePlan.getDataNodeConfiguration().getLocation().setDataNodeId(dataNodeId);
×
253
    getConsensusManager().write(registerDataNodePlan);
×
254

255
    // update datanode's versionInfo
256
    UpdateVersionInfoPlan updateVersionInfoPlan =
×
257
        new UpdateVersionInfoPlan(req.getVersionInfo(), dataNodeId);
×
258
    getConsensusManager().write(updateVersionInfoPlan);
×
259

260
    // Bind DataNode metrics
261
    PartitionMetrics.bindDataNodePartitionMetrics(
×
262
        MetricService.getInstance(), configManager, dataNodeId);
×
263

264
    // Adjust the maximum RegionGroup number of each StorageGroup
265
    getClusterSchemaManager().adjustMaxRegionGroupNum();
×
266

267
    DataNodeRegisterResp resp = new DataNodeRegisterResp();
×
268

269
    resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION);
×
270
    resp.setConfigNodeList(getRegisteredConfigNodes());
×
271
    resp.setDataNodeId(
×
272
        registerDataNodePlan.getDataNodeConfiguration().getLocation().getDataNodeId());
×
273
    resp.setRuntimeConfiguration(getRuntimeConfiguration());
×
274
    return resp;
×
275
  }
276

277
  public TDataNodeRestartResp updateDataNodeIfNecessary(TDataNodeRestartReq req) {
278
    int nodeId = req.getDataNodeConfiguration().getLocation().getDataNodeId();
×
279
    TDataNodeConfiguration dataNodeConfiguration = getRegisteredDataNode(nodeId);
×
280
    if (!req.getDataNodeConfiguration().equals(dataNodeConfiguration)) {
×
281
      // Update DataNodeConfiguration when modified during restart
282
      UpdateDataNodePlan updateDataNodePlan =
×
283
          new UpdateDataNodePlan(req.getDataNodeConfiguration());
×
284
      getConsensusManager().write(updateDataNodePlan);
×
285
    }
286
    TNodeVersionInfo versionInfo = nodeInfo.getVersionInfo(nodeId);
×
287
    if (!req.getVersionInfo().equals(versionInfo)) {
×
288
      // Update versionInfo when modified during restart
289
      UpdateVersionInfoPlan updateVersionInfoPlan =
×
290
          new UpdateVersionInfoPlan(req.getVersionInfo(), nodeId);
×
291
      getConsensusManager().write(updateVersionInfoPlan);
×
292
    }
293

294
    TDataNodeRestartResp resp = new TDataNodeRestartResp();
×
295
    resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
×
296
    resp.setConfigNodeList(getRegisteredConfigNodes());
×
297
    resp.setRuntimeConfiguration(getRuntimeConfiguration());
×
298
    return resp;
×
299
  }
300

301
  /**
302
   * Remove DataNodes.
303
   *
304
   * @param removeDataNodePlan removeDataNodePlan
305
   * @return DataNodeToStatusResp, The TSStatus will be SUCCEED_STATUS if the request is accepted,
306
   *     DATANODE_NOT_EXIST when some datanode does not exist.
307
   */
308
  public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
309
    LOGGER.info("NodeManager start to remove DataNode {}", removeDataNodePlan);
×
310

311
    DataNodeRemoveHandler dataNodeRemoveHandler =
×
312
        new DataNodeRemoveHandler((ConfigManager) configManager);
313
    DataNodeToStatusResp preCheckStatus =
×
314
        dataNodeRemoveHandler.checkRemoveDataNodeRequest(removeDataNodePlan);
×
315
    if (preCheckStatus.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
316
      LOGGER.error(
×
317
          "The remove DataNode request check failed. req: {}, check result: {}",
318
          removeDataNodePlan,
319
          preCheckStatus.getStatus());
×
320
      return preCheckStatus;
×
321
    }
322

323
    // Do transfer of the DataNodes before remove
324
    DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
×
325
    if (configManager.transfer(removeDataNodePlan.getDataNodeLocations()).getCode()
×
326
        != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
327
      dataSet.setStatus(
×
328
          new TSStatus(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode())
×
329
              .setMessage("Fail to do transfer of the DataNodes"));
×
330
      return dataSet;
×
331
    }
332

333
    // Add request to queue, then return to client
334
    boolean removeSucceed = configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
×
335
    TSStatus status;
336
    if (removeSucceed) {
×
337
      status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
338
      status.setMessage("Server accepted the request");
×
339
    } else {
340
      status = new TSStatus(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode());
×
341
      status.setMessage("Server rejected the request, maybe requests are too many");
×
342
    }
343
    dataSet.setStatus(status);
×
344

345
    LOGGER.info(
×
346
        "NodeManager submit RemoveDataNodePlan finished, removeDataNodePlan: {}",
347
        removeDataNodePlan);
348
    return dataSet;
×
349
  }
350

351
  public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) {
352
    int nodeId = nodeInfo.generateNextNodeId();
×
353
    req.getConfigNodeLocation().setConfigNodeId(nodeId);
×
354
    configManager.getProcedureManager().addConfigNode(req);
×
355
    return new TConfigNodeRegisterResp()
×
356
        .setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION)
×
357
        .setConfigNodeId(nodeId);
×
358
  }
359

360
  public TSStatus updateConfigNodeIfNecessary(int configNodeId, TNodeVersionInfo versionInfo) {
361
    TNodeVersionInfo recordVersionInfo = nodeInfo.getVersionInfo(configNodeId);
×
362
    if (!recordVersionInfo.equals(versionInfo)) {
×
363
      // Update versionInfo when modified during restart
364
      UpdateVersionInfoPlan updateConfigNodePlan =
×
365
          new UpdateVersionInfoPlan(versionInfo, configNodeId);
366
      ConsensusWriteResponse result = getConsensusManager().write(updateConfigNodePlan);
×
367
      if (result.getException() != null) {
×
368
        return new TSStatus(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode());
×
369
      }
370
      return result.getStatus();
×
371
    }
372
    return ClusterNodeStartUtils.ACCEPT_NODE_RESTART;
×
373
  }
374

375
  /**
376
   * Get TDataNodeConfiguration.
377
   *
378
   * @param req GetDataNodeConfigurationPlan
379
   * @return The specific DataNode's configuration or all DataNodes' configuration if dataNodeId in
380
   *     GetDataNodeConfigurationPlan is -1
381
   */
382
  public DataNodeConfigurationResp getDataNodeConfiguration(GetDataNodeConfigurationPlan req) {
383
    return (DataNodeConfigurationResp) getConsensusManager().read(req).getDataset();
×
384
  }
385

386
  /**
387
   * Only leader use this interface.
388
   *
389
   * @return The number of registered DataNodes
390
   */
391
  public int getRegisteredDataNodeCount() {
392
    return nodeInfo.getRegisteredDataNodeCount();
×
393
  }
394

395
  /**
396
   * Only leader use this interface.
397
   *
398
   * @return All registered DataNodes
399
   */
400
  public List<TDataNodeConfiguration> getRegisteredDataNodes() {
401
    return nodeInfo.getRegisteredDataNodes();
×
402
  }
403

404
  /**
405
   * Only leader use this interface.
406
   *
407
   * <p>Notice: The result will be an empty TDataNodeConfiguration if the specified DataNode doesn't
408
   * register
409
   *
410
   * @param dataNodeId The specified DataNode's index
411
   * @return The specified registered DataNode
412
   */
413
  public TDataNodeConfiguration getRegisteredDataNode(int dataNodeId) {
414
    return nodeInfo.getRegisteredDataNode(dataNodeId);
×
415
  }
416

417
  public Map<Integer, TDataNodeLocation> getRegisteredDataNodeLocations() {
418
    Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
×
419
    nodeInfo
×
420
        .getRegisteredDataNodes()
×
421
        .forEach(
×
422
            dataNodeConfiguration ->
423
                dataNodeLocations.put(
×
424
                    dataNodeConfiguration.getLocation().getDataNodeId(),
×
425
                    dataNodeConfiguration.getLocation()));
×
426
    return dataNodeLocations;
×
427
  }
428

429
  public List<TDataNodeInfo> getRegisteredDataNodeInfoList() {
430
    List<TDataNodeInfo> dataNodeInfoList = new ArrayList<>();
×
431
    List<TDataNodeConfiguration> registeredDataNodes = this.getRegisteredDataNodes();
×
432
    if (registeredDataNodes != null) {
×
433
      registeredDataNodes.forEach(
×
434
          (registeredDataNode) -> {
435
            TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
×
436
            int dataNodeId = registeredDataNode.getLocation().getDataNodeId();
×
437
            dataNodeInfo.setDataNodeId(dataNodeId);
×
438
            dataNodeInfo.setStatus(getLoadManager().getNodeStatusWithReason(dataNodeId));
×
439
            dataNodeInfo.setRpcAddresss(
×
440
                registeredDataNode.getLocation().getClientRpcEndPoint().getIp());
×
441
            dataNodeInfo.setRpcPort(
×
442
                registeredDataNode.getLocation().getClientRpcEndPoint().getPort());
×
443
            dataNodeInfo.setDataRegionNum(0);
×
444
            dataNodeInfo.setSchemaRegionNum(0);
×
445
            dataNodeInfo.setCpuCoreNum(registeredDataNode.getResource().getCpuCoreNum());
×
446
            dataNodeInfoList.add(dataNodeInfo);
×
447
          });
×
448
    }
449

450
    // Map<DataNodeId, DataRegionNum>
451
    Map<Integer, AtomicInteger> dataRegionNumMap = new HashMap<>();
×
452
    // Map<DataNodeId, SchemaRegionNum>
453
    Map<Integer, AtomicInteger> schemaRegionNumMap = new HashMap<>();
×
454
    List<TRegionReplicaSet> regionReplicaSets = getPartitionManager().getAllReplicaSets();
×
455
    regionReplicaSets.forEach(
×
456
        regionReplicaSet ->
457
            regionReplicaSet
×
458
                .getDataNodeLocations()
×
459
                .forEach(
×
460
                    dataNodeLocation -> {
461
                      switch (regionReplicaSet.getRegionId().getType()) {
×
462
                        case SchemaRegion:
463
                          schemaRegionNumMap
×
464
                              .computeIfAbsent(
×
465
                                  dataNodeLocation.getDataNodeId(), key -> new AtomicInteger())
×
466
                              .getAndIncrement();
×
467
                          break;
×
468
                        case DataRegion:
469
                        default:
470
                          dataRegionNumMap
×
471
                              .computeIfAbsent(
×
472
                                  dataNodeLocation.getDataNodeId(), key -> new AtomicInteger())
×
473
                              .getAndIncrement();
×
474
                      }
475
                    }));
×
476
    AtomicInteger zero = new AtomicInteger(0);
×
477
    dataNodeInfoList.forEach(
×
478
        (dataNodesInfo -> {
479
          dataNodesInfo.setSchemaRegionNum(
×
480
              schemaRegionNumMap.getOrDefault(dataNodesInfo.getDataNodeId(), zero).get());
×
481
          dataNodesInfo.setDataRegionNum(
×
482
              dataRegionNumMap.getOrDefault(dataNodesInfo.getDataNodeId(), zero).get());
×
483
        }));
×
484

485
    dataNodeInfoList.sort(Comparator.comparingInt(TDataNodeInfo::getDataNodeId));
×
486
    return dataNodeInfoList;
×
487
  }
488

489
  public List<TConfigNodeLocation> getRegisteredConfigNodes() {
490
    return nodeInfo.getRegisteredConfigNodes();
×
491
  }
492

493
  public Map<Integer, TNodeVersionInfo> getNodeVersionInfo() {
494
    return nodeInfo.getNodeVersionInfo();
×
495
  }
496

497
  public List<TConfigNodeInfo> getRegisteredConfigNodeInfoList() {
498
    List<TConfigNodeInfo> configNodeInfoList = new ArrayList<>();
×
499
    List<TConfigNodeLocation> registeredConfigNodes = this.getRegisteredConfigNodes();
×
500
    if (registeredConfigNodes != null) {
×
501
      registeredConfigNodes.forEach(
×
502
          (configNodeLocation) -> {
503
            TConfigNodeInfo info = new TConfigNodeInfo();
×
504
            int configNodeId = configNodeLocation.getConfigNodeId();
×
505
            info.setConfigNodeId(configNodeId);
×
506
            info.setStatus(getLoadManager().getNodeStatusWithReason(configNodeId));
×
507
            info.setInternalAddress(configNodeLocation.getInternalEndPoint().getIp());
×
508
            info.setInternalPort(configNodeLocation.getInternalEndPoint().getPort());
×
509
            info.setRoleType(
×
510
                configNodeLocation.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID
×
511
                    ? RegionRoleType.Leader.name()
×
512
                    : RegionRoleType.Follower.name());
×
513
            configNodeInfoList.add(info);
×
514
          });
×
515
    }
516
    configNodeInfoList.sort(Comparator.comparingInt(TConfigNodeInfo::getConfigNodeId));
×
517
    return configNodeInfoList;
×
518
  }
519

520
  /**
521
   * Only leader use this interface, record the new ConfigNode's information.
522
   *
523
   * @param configNodeLocation The new ConfigNode.
524
   * @param versionInfo The new ConfigNode's versionInfo.
525
   */
526
  public void applyConfigNode(
527
      TConfigNodeLocation configNodeLocation, TNodeVersionInfo versionInfo) {
528
    ApplyConfigNodePlan applyConfigNodePlan = new ApplyConfigNodePlan(configNodeLocation);
×
529
    getConsensusManager().write(applyConfigNodePlan);
×
530
    UpdateVersionInfoPlan updateVersionInfoPlan =
×
531
        new UpdateVersionInfoPlan(versionInfo, configNodeLocation.getConfigNodeId());
×
532
    getConsensusManager().write(updateVersionInfoPlan);
×
533
  }
×
534

535
  /**
536
   * Only leader use this interface, check the ConfigNode before remove it.
537
   *
538
   * @param removeConfigNodePlan RemoveConfigNodePlan
539
   */
540
  public TSStatus checkConfigNodeBeforeRemove(RemoveConfigNodePlan removeConfigNodePlan) {
541
    removeConfigNodeLock.lock();
×
542
    try {
543
      // Check OnlineConfigNodes number
544
      if (filterConfigNodeThroughStatus(NodeStatus.Running).size() <= 1) {
×
545
        return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
×
546
            .setMessage(
×
547
                "Remove ConfigNode failed because there is only one ConfigNode in current Cluster.");
548
      }
549

550
      // Check whether the registeredConfigNodes contain the ConfigNode to be removed.
551
      if (!getRegisteredConfigNodes().contains(removeConfigNodePlan.getConfigNodeLocation())) {
×
552
        return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
×
553
            .setMessage("Remove ConfigNode failed because the ConfigNode not in current Cluster.");
×
554
      }
555

556
      // Check whether the remove ConfigNode is leader
557
      TConfigNodeLocation leader = getConsensusManager().getLeader();
×
558
      if (leader == null) {
×
559
        return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
×
560
            .setMessage(
×
561
                "Remove ConfigNode failed because the ConfigNodeGroup is on leader election, please retry.");
562
      }
563

564
      if (leader
×
565
          .getInternalEndPoint()
×
566
          .equals(removeConfigNodePlan.getConfigNodeLocation().getInternalEndPoint())) {
×
567
        // transfer leader
568
        return transferLeader(removeConfigNodePlan, getConsensusManager().getConsensusGroupId());
×
569
      }
570

571
    } finally {
572
      removeConfigNodeLock.unlock();
×
573
    }
574

575
    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
×
576
        .setMessage("Successfully remove confignode.");
×
577
  }
578

579
  private TSStatus transferLeader(
580
      RemoveConfigNodePlan removeConfigNodePlan, ConsensusGroupId groupId) {
581
    Optional<TConfigNodeLocation> optional =
×
582
        filterConfigNodeThroughStatus(NodeStatus.Running).stream()
×
583
            .filter(e -> !e.equals(removeConfigNodePlan.getConfigNodeLocation()))
×
584
            .findAny();
×
585
    TConfigNodeLocation newLeader = null;
×
586
    if (optional.isPresent()) {
×
587
      newLeader = optional.get();
×
588
    } else {
589
      return new TSStatus(TSStatusCode.TRANSFER_LEADER_ERROR.getStatusCode())
×
590
          .setMessage(
×
591
              "Transfer ConfigNode leader failed because can not find any running ConfigNode.");
592
    }
593
    ConsensusGenericResponse resp =
×
594
        getConsensusManager()
×
595
            .getConsensusImpl()
×
596
            .transferLeader(
×
597
                groupId,
598
                new Peer(groupId, newLeader.getConfigNodeId(), newLeader.getConsensusEndPoint()));
×
599
    if (!resp.isSuccess()) {
×
600
      return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
×
601
          .setMessage("Remove ConfigNode failed because transfer ConfigNode leader failed.");
×
602
    }
603
    return new TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())
×
604
        .setRedirectNode(newLeader.getInternalEndPoint())
×
605
        .setMessage(
×
606
            "The ConfigNode to be removed is leader, already transfer Leader to "
607
                + newLeader
608
                + ".");
609
  }
610

611
  public List<TSStatus> merge() {
612
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
613
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
614
    AsyncClientHandler<Object, TSStatus> clientHandler =
×
615
        new AsyncClientHandler<>(DataNodeRequestType.MERGE, dataNodeLocationMap);
616
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
617
    return clientHandler.getResponseList();
×
618
  }
619

620
  public List<TSStatus> flush(TFlushReq req) {
621
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
622
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
623
    AsyncClientHandler<TFlushReq, TSStatus> clientHandler =
×
624
        new AsyncClientHandler<>(DataNodeRequestType.FLUSH, req, dataNodeLocationMap);
625
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
626
    return clientHandler.getResponseList();
×
627
  }
628

629
  public List<TSStatus> clearCache() {
630
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
631
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
632
    AsyncClientHandler<Object, TSStatus> clientHandler =
×
633
        new AsyncClientHandler<>(DataNodeRequestType.CLEAR_CACHE, dataNodeLocationMap);
634
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
635
    return clientHandler.getResponseList();
×
636
  }
637

638
  public List<TSStatus> loadConfiguration() {
639
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
640
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
641
    AsyncClientHandler<Object, TSStatus> clientHandler =
×
642
        new AsyncClientHandler<>(DataNodeRequestType.LOAD_CONFIGURATION, dataNodeLocationMap);
643
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
644
    return clientHandler.getResponseList();
×
645
  }
646

647
  public List<TSStatus> setSystemStatus(String status) {
648
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
649
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
650
    AsyncClientHandler<String, TSStatus> clientHandler =
×
651
        new AsyncClientHandler<>(
652
            DataNodeRequestType.SET_SYSTEM_STATUS, status, dataNodeLocationMap);
653
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
654
    return clientHandler.getResponseList();
×
655
  }
656

657
  public TSStatus setDataNodeStatus(TSetDataNodeStatusReq setDataNodeStatusReq) {
658
    return SyncDataNodeClientPool.getInstance()
×
659
        .sendSyncRequestToDataNodeWithRetry(
×
660
            setDataNodeStatusReq.getTargetDataNode().getInternalEndPoint(),
×
661
            setDataNodeStatusReq.getStatus(),
×
662
            DataNodeRequestType.SET_SYSTEM_STATUS);
663
  }
664

665
  /**
666
   * Kill read on DataNode.
667
   *
668
   * @param queryId the id of specific read need to be killed, it will be NULL if kill all queries
669
   * @param dataNodeId the DataNode obtains target read, -1 means we will kill all queries on all
670
   *     DataNodes
671
   */
672
  public TSStatus killQuery(String queryId, int dataNodeId) {
673
    if (dataNodeId < 0) {
×
674
      return killAllQueries();
×
675
    } else {
676
      return killSpecificQuery(queryId, getRegisteredDataNodeLocations().get(dataNodeId));
×
677
    }
678
  }
679

680
  private TSStatus killAllQueries() {
681
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
682
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
683
    AsyncClientHandler<String, TSStatus> clientHandler =
×
684
        new AsyncClientHandler<>(DataNodeRequestType.KILL_QUERY_INSTANCE, dataNodeLocationMap);
685
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
686
    return RpcUtils.squashResponseStatusList(clientHandler.getResponseList());
×
687
  }
688

689
  private TSStatus killSpecificQuery(String queryId, TDataNodeLocation dataNodeLocation) {
690
    if (dataNodeLocation == null) {
×
691
      return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
×
692
          .setMessage(
×
693
              "The target DataNode is not existed, please ensure your input <queryId> is correct");
694
    } else {
695
      return SyncDataNodeClientPool.getInstance()
×
696
          .sendSyncRequestToDataNodeWithRetry(
×
697
              dataNodeLocation.getInternalEndPoint(),
×
698
              queryId,
699
              DataNodeRequestType.KILL_QUERY_INSTANCE);
700
    }
701
  }
702

703
  /**
704
   * Filter ConfigNodes through the specified NodeStatus.
705
   *
706
   * @param status The specified NodeStatus
707
   * @return Filtered ConfigNodes with the specified NodeStatus
708
   */
709
  public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus... status) {
710
    return nodeInfo.getRegisteredConfigNodes(
×
711
        getLoadManager().filterConfigNodeThroughStatus(status));
×
712
  }
713

714
  /**
715
   * Filter DataNodes through the specified NodeStatus.
716
   *
717
   * @param status The specified NodeStatus
718
   * @return Filtered DataNodes with the specified NodeStatus
719
   */
720
  public List<TDataNodeConfiguration> filterDataNodeThroughStatus(NodeStatus... status) {
721
    return nodeInfo.getRegisteredDataNodes(getLoadManager().filterDataNodeThroughStatus(status));
×
722
  }
723

724
  /**
725
   * Get the DataNodeLocation of the DataNode which has the lowest loadScore.
726
   *
727
   * @return TDataNodeLocation with the lowest loadScore
728
   */
729
  public Optional<TDataNodeLocation> getLowestLoadDataNode() {
730
    // TODO get real lowest load data node after scoring algorithm being implemented
731
    int dataNodeId = getLoadManager().getLowestLoadDataNode();
×
732
    return dataNodeId < 0
×
733
        ? Optional.empty()
×
734
        : Optional.of(getRegisteredDataNode(dataNodeId).getLocation());
×
735
  }
736

737
  /**
738
   * Get the DataNodeLocation which has the lowest loadScore within input.
739
   *
740
   * @return TDataNodeLocation with the lowest loadScore
741
   */
742
  public TDataNodeLocation getLowestLoadDataNode(Set<Integer> nodes) {
743
    int dataNodeId = getLoadManager().getLowestLoadDataNode(new ArrayList<>(nodes));
×
744
    return getRegisteredDataNode(dataNodeId).getLocation();
×
745
  }
746

747
  private ConsensusManager getConsensusManager() {
748
    return configManager.getConsensusManager();
×
749
  }
750

751
  private ClusterSchemaManager getClusterSchemaManager() {
752
    return configManager.getClusterSchemaManager();
×
753
  }
754

755
  private PartitionManager getPartitionManager() {
756
    return configManager.getPartitionManager();
×
757
  }
758

759
  private LoadManager getLoadManager() {
760
    return configManager.getLoadManager();
×
761
  }
762

763
  private TriggerManager getTriggerManager() {
764
    return configManager.getTriggerManager();
×
765
  }
766

767
  private PipeManager getPipeManager() {
768
    return configManager.getPipeManager();
×
769
  }
770

771
  private UDFManager getUDFManager() {
772
    return configManager.getUDFManager();
×
773
  }
774
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc