• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9723

pending completion
#9723

push

travis_ci

web-flow
[To rel/1.2][IOTDB-6079] Cluster computing resource balance (#10746)

264 of 264 new or added lines in 11 files covered. (100.0%)

79280 of 165370 relevant lines covered (47.94%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager.node;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
24
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
25
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
26
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
27
import org.apache.iotdb.common.rpc.thrift.TSStatus;
28
import org.apache.iotdb.commons.cluster.NodeStatus;
29
import org.apache.iotdb.commons.cluster.RegionRoleType;
30
import org.apache.iotdb.commons.conf.CommonConfig;
31
import org.apache.iotdb.commons.conf.CommonDescriptor;
32
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
33
import org.apache.iotdb.commons.service.metric.MetricService;
34
import org.apache.iotdb.confignode.client.DataNodeRequestType;
35
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
36
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
37
import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
38
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
39
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
40
import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
41
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
42
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
43
import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateVersionInfoPlan;
44
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
45
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
46
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
47
import org.apache.iotdb.confignode.consensus.response.datanode.ConfigurationResp;
48
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
49
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
50
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
51
import org.apache.iotdb.confignode.manager.ConfigManager;
52
import org.apache.iotdb.confignode.manager.IManager;
53
import org.apache.iotdb.confignode.manager.TriggerManager;
54
import org.apache.iotdb.confignode.manager.UDFManager;
55
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
56
import org.apache.iotdb.confignode.manager.load.LoadManager;
57
import org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache;
58
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
59
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
60
import org.apache.iotdb.confignode.manager.pipe.PipeManager;
61
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
62
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
63
import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
64
import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
65
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
66
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
67
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
68
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
69
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
70
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
71
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
72
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
73
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
74
import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
75
import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
76
import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
77
import org.apache.iotdb.consensus.common.DataSet;
78
import org.apache.iotdb.consensus.common.Peer;
79
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
80
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
81
import org.apache.iotdb.rpc.RpcUtils;
82
import org.apache.iotdb.rpc.TSStatusCode;
83

84
import org.slf4j.Logger;
85
import org.slf4j.LoggerFactory;
86

87
import java.util.ArrayList;
88
import java.util.Comparator;
89
import java.util.HashMap;
90
import java.util.List;
91
import java.util.Map;
92
import java.util.Optional;
93
import java.util.Set;
94
import java.util.concurrent.ConcurrentHashMap;
95
import java.util.concurrent.atomic.AtomicInteger;
96
import java.util.concurrent.locks.ReentrantLock;
97

98
/** {@link NodeManager} manages cluster node addition and removal requests. */
99
public class NodeManager {
100

101
  private static final Logger LOGGER = LoggerFactory.getLogger(NodeManager.class);

  /** ConfigNode configuration, looked up once when this class is loaded. */
  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();

  /** Heartbeat interval in milliseconds, read from {@link #CONF} at class-load time. */
  public static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatIntervalInMs();

  /** The owning manager; used to reach other managers (e.g. the procedure manager). */
  private final IManager configManager;

  /** Registry of the cluster's nodes; also the source of newly allocated node ids. */
  private final NodeInfo nodeInfo;

  /** Guards the pre-removal checks performed in {@link #checkConfigNodeBeforeRemove}. */
  private final ReentrantLock removeConfigNodeLock = new ReentrantLock();

  /**
   * Create a NodeManager.
   *
   * @param configManager the owning manager
   * @param nodeInfo registry of registered ConfigNodes and DataNodes
   */
  public NodeManager(IManager configManager, NodeInfo nodeInfo) {
    this.configManager = configManager;
    this.nodeInfo = nodeInfo;
  }
×
116

117
  /**
118
   * Get system configurations.
119
   *
120
   * @return ConfigurationResp. The TSStatus will be set to SUCCESS_STATUS.
121
   */
122
  public DataSet getSystemConfiguration() {
123
    ConfigurationResp dataSet = new ConfigurationResp();
×
124
    dataSet.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
×
125
    setGlobalConfig(dataSet);
×
126
    setRatisConfig(dataSet);
×
127
    setCQConfig(dataSet);
×
128
    return dataSet;
×
129
  }
130

131
  private void setGlobalConfig(ConfigurationResp dataSet) {
132
    // Set TGlobalConfig
133
    final ConfigNodeConfig configNodeConfig = ConfigNodeDescriptor.getInstance().getConf();
×
134
    final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
×
135
    TGlobalConfig globalConfig = new TGlobalConfig();
×
136
    globalConfig.setDataRegionConsensusProtocolClass(
×
137
        configNodeConfig.getDataRegionConsensusProtocolClass());
×
138
    globalConfig.setSchemaRegionConsensusProtocolClass(
×
139
        configNodeConfig.getSchemaRegionConsensusProtocolClass());
×
140
    globalConfig.setSeriesPartitionSlotNum(configNodeConfig.getSeriesSlotNum());
×
141
    globalConfig.setSeriesPartitionExecutorClass(
×
142
        configNodeConfig.getSeriesPartitionExecutorClass());
×
143
    globalConfig.setTimePartitionInterval(commonConfig.getTimePartitionInterval());
×
144
    globalConfig.setReadConsistencyLevel(configNodeConfig.getReadConsistencyLevel());
×
145
    globalConfig.setDiskSpaceWarningThreshold(commonConfig.getDiskSpaceWarningThreshold());
×
146
    globalConfig.setTimestampPrecision(commonConfig.getTimestampPrecision());
×
147
    globalConfig.setSchemaEngineMode(commonConfig.getSchemaEngineMode());
×
148
    globalConfig.setTagAttributeTotalSize(commonConfig.getTagAttributeTotalSize());
×
149
    dataSet.setGlobalConfig(globalConfig);
×
150
  }
×
151

152
  private void setRatisConfig(ConfigurationResp dataSet) {
153
    ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
×
154
    TRatisConfig ratisConfig = new TRatisConfig();
×
155

156
    ratisConfig.setDataAppenderBufferSize(conf.getDataRegionRatisConsensusLogAppenderBufferSize());
×
157
    ratisConfig.setSchemaAppenderBufferSize(
×
158
        conf.getSchemaRegionRatisConsensusLogAppenderBufferSize());
×
159

160
    ratisConfig.setDataSnapshotTriggerThreshold(conf.getDataRegionRatisSnapshotTriggerThreshold());
×
161
    ratisConfig.setSchemaSnapshotTriggerThreshold(
×
162
        conf.getSchemaRegionRatisSnapshotTriggerThreshold());
×
163

164
    ratisConfig.setDataLogUnsafeFlushEnable(conf.isDataRegionRatisLogUnsafeFlushEnable());
×
165
    ratisConfig.setSchemaLogUnsafeFlushEnable(conf.isSchemaRegionRatisLogUnsafeFlushEnable());
×
166
    ratisConfig.setDataRegionLogForceSyncNum(conf.getDataRegionRatisLogForceSyncNum());
×
167

168
    ratisConfig.setDataLogSegmentSizeMax(conf.getDataRegionRatisLogSegmentSizeMax());
×
169
    ratisConfig.setSchemaLogSegmentSizeMax(conf.getSchemaRegionRatisLogSegmentSizeMax());
×
170

171
    ratisConfig.setDataGrpcFlowControlWindow(conf.getDataRegionRatisGrpcFlowControlWindow());
×
172
    ratisConfig.setSchemaGrpcFlowControlWindow(conf.getSchemaRegionRatisGrpcFlowControlWindow());
×
173
    ratisConfig.setDataRegionGrpcLeaderOutstandingAppendsMax(
×
174
        conf.getDataRegionRatisGrpcLeaderOutstandingAppendsMax());
×
175

176
    ratisConfig.setDataLeaderElectionTimeoutMin(
×
177
        conf.getDataRegionRatisRpcLeaderElectionTimeoutMinMs());
×
178
    ratisConfig.setSchemaLeaderElectionTimeoutMin(
×
179
        conf.getSchemaRegionRatisRpcLeaderElectionTimeoutMinMs());
×
180

181
    ratisConfig.setDataLeaderElectionTimeoutMax(
×
182
        conf.getDataRegionRatisRpcLeaderElectionTimeoutMaxMs());
×
183
    ratisConfig.setSchemaLeaderElectionTimeoutMax(
×
184
        conf.getSchemaRegionRatisRpcLeaderElectionTimeoutMaxMs());
×
185

186
    ratisConfig.setDataRequestTimeout(conf.getDataRegionRatisRequestTimeoutMs());
×
187
    ratisConfig.setSchemaRequestTimeout(conf.getSchemaRegionRatisRequestTimeoutMs());
×
188

189
    ratisConfig.setDataMaxRetryAttempts(conf.getDataRegionRatisMaxRetryAttempts());
×
190
    ratisConfig.setDataInitialSleepTime(conf.getDataRegionRatisInitialSleepTimeMs());
×
191
    ratisConfig.setDataMaxSleepTime(conf.getDataRegionRatisMaxSleepTimeMs());
×
192
    ratisConfig.setSchemaMaxRetryAttempts(conf.getSchemaRegionRatisMaxRetryAttempts());
×
193
    ratisConfig.setSchemaInitialSleepTime(conf.getSchemaRegionRatisInitialSleepTimeMs());
×
194
    ratisConfig.setSchemaMaxSleepTime(conf.getSchemaRegionRatisMaxSleepTimeMs());
×
195

196
    ratisConfig.setSchemaPreserveWhenPurge(conf.getSchemaRegionRatisPreserveLogsWhenPurge());
×
197
    ratisConfig.setDataPreserveWhenPurge(conf.getDataRegionRatisPreserveLogsWhenPurge());
×
198

199
    ratisConfig.setFirstElectionTimeoutMin(conf.getRatisFirstElectionTimeoutMinMs());
×
200
    ratisConfig.setFirstElectionTimeoutMax(conf.getRatisFirstElectionTimeoutMaxMs());
×
201

202
    ratisConfig.setSchemaRegionRatisLogMax(conf.getSchemaRegionRatisLogMax());
×
203
    ratisConfig.setDataRegionRatisLogMax(conf.getDataRegionRatisLogMax());
×
204

205
    dataSet.setRatisConfig(ratisConfig);
×
206
  }
×
207

208
  private void setCQConfig(ConfigurationResp dataSet) {
209
    final ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
×
210
    TCQConfig cqConfig = new TCQConfig();
×
211
    cqConfig.setCqMinEveryIntervalInMs(conf.getCqMinEveryIntervalInMs());
×
212

213
    dataSet.setCqConfig(cqConfig);
×
214
  }
×
215

216
  private TRuntimeConfiguration getRuntimeConfiguration() {
217
    getPipeManager().getPipePluginCoordinator().lock();
×
218
    getTriggerManager().getTriggerInfo().acquireTriggerTableLock();
×
219
    getUDFManager().getUdfInfo().acquireUDFTableLock();
×
220
    try {
221
      final TRuntimeConfiguration runtimeConfiguration = new TRuntimeConfiguration();
×
222
      runtimeConfiguration.setTemplateInfo(getClusterSchemaManager().getAllTemplateSetInfo());
×
223
      runtimeConfiguration.setAllTriggerInformation(
×
224
          getTriggerManager().getTriggerTable(false).getAllTriggerInformation());
×
225
      runtimeConfiguration.setAllUDFInformation(
×
226
          getUDFManager().getUDFTable().getAllUDFInformation());
×
227
      runtimeConfiguration.setAllPipeInformation(
×
228
          getPipeManager().getPipePluginCoordinator().getPipePluginTable().getAllPipePluginMeta());
×
229
      runtimeConfiguration.setAllTTLInformation(
×
230
          DataNodeRegisterResp.convertAllTTLInformation(getClusterSchemaManager().getAllTTLInfo()));
×
231
      return runtimeConfiguration;
×
232
    } finally {
233
      getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
×
234
      getUDFManager().getUdfInfo().releaseUDFTableLock();
×
235
      getPipeManager().getPipePluginCoordinator().unlock();
×
236
    }
237
  }
238

239
  /**
240
   * Register DataNode.
241
   *
242
   * @param req TDataNodeRegisterReq
243
   * @return DataNodeConfigurationDataSet. The {@link TSStatus} will be set to {@link
244
   *     TSStatusCode#SUCCESS_STATUS} when register success.
245
   */
246
  public DataSet registerDataNode(TDataNodeRegisterReq req) {
247
    int dataNodeId = nodeInfo.generateNextNodeId();
×
248

249
    RegisterDataNodePlan registerDataNodePlan =
×
250
        new RegisterDataNodePlan(req.getDataNodeConfiguration());
×
251
    // Register new DataNode
252
    registerDataNodePlan.getDataNodeConfiguration().getLocation().setDataNodeId(dataNodeId);
×
253
    getConsensusManager().write(registerDataNodePlan);
×
254

255
    // update datanode's versionInfo
256
    UpdateVersionInfoPlan updateVersionInfoPlan =
×
257
        new UpdateVersionInfoPlan(req.getVersionInfo(), dataNodeId);
×
258
    getConsensusManager().write(updateVersionInfoPlan);
×
259

260
    // Bind DataNode metrics
261
    PartitionMetrics.bindDataNodePartitionMetrics(
×
262
        MetricService.getInstance(), configManager, dataNodeId);
×
263

264
    // Adjust the maximum RegionGroup number of each StorageGroup
265
    getClusterSchemaManager().adjustMaxRegionGroupNum();
×
266

267
    DataNodeRegisterResp resp = new DataNodeRegisterResp();
×
268

269
    resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION);
×
270
    resp.setConfigNodeList(getRegisteredConfigNodes());
×
271
    resp.setDataNodeId(
×
272
        registerDataNodePlan.getDataNodeConfiguration().getLocation().getDataNodeId());
×
273
    resp.setRuntimeConfiguration(getRuntimeConfiguration());
×
274
    return resp;
×
275
  }
276

277
  public TDataNodeRestartResp updateDataNodeIfNecessary(TDataNodeRestartReq req) {
278
    int nodeId = req.getDataNodeConfiguration().getLocation().getDataNodeId();
×
279
    TDataNodeConfiguration dataNodeConfiguration = getRegisteredDataNode(nodeId);
×
280
    if (!req.getDataNodeConfiguration().equals(dataNodeConfiguration)) {
×
281
      // Update DataNodeConfiguration when modified during restart
282
      UpdateDataNodePlan updateDataNodePlan =
×
283
          new UpdateDataNodePlan(req.getDataNodeConfiguration());
×
284
      getConsensusManager().write(updateDataNodePlan);
×
285
    }
286
    TNodeVersionInfo versionInfo = nodeInfo.getVersionInfo(nodeId);
×
287
    if (!req.getVersionInfo().equals(versionInfo)) {
×
288
      // Update versionInfo when modified during restart
289
      UpdateVersionInfoPlan updateVersionInfoPlan =
×
290
          new UpdateVersionInfoPlan(req.getVersionInfo(), nodeId);
×
291
      getConsensusManager().write(updateVersionInfoPlan);
×
292
    }
293

294
    TDataNodeRestartResp resp = new TDataNodeRestartResp();
×
295
    resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
×
296
    resp.setConfigNodeList(getRegisteredConfigNodes());
×
297
    resp.setRuntimeConfiguration(getRuntimeConfiguration());
×
298
    return resp;
×
299
  }
300

301
  /**
302
   * Remove DataNodes.
303
   *
304
   * @param removeDataNodePlan removeDataNodePlan
305
   * @return DataNodeToStatusResp, The TSStatus will be SUCCEED_STATUS if the request is accepted,
306
   *     DATANODE_NOT_EXIST when some datanode does not exist.
307
   */
308
  public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
309
    LOGGER.info("NodeManager start to remove DataNode {}", removeDataNodePlan);
×
310

311
    DataNodeRemoveHandler dataNodeRemoveHandler =
×
312
        new DataNodeRemoveHandler((ConfigManager) configManager);
313
    DataNodeToStatusResp preCheckStatus =
×
314
        dataNodeRemoveHandler.checkRemoveDataNodeRequest(removeDataNodePlan);
×
315
    if (preCheckStatus.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
316
      LOGGER.error(
×
317
          "The remove DataNode request check failed. req: {}, check result: {}",
318
          removeDataNodePlan,
319
          preCheckStatus.getStatus());
×
320
      return preCheckStatus;
×
321
    }
322

323
    // Do transfer of the DataNodes before remove
324
    DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
×
325
    if (configManager.transfer(removeDataNodePlan.getDataNodeLocations()).getCode()
×
326
        != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
327
      dataSet.setStatus(
×
328
          new TSStatus(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode())
×
329
              .setMessage("Fail to do transfer of the DataNodes"));
×
330
      return dataSet;
×
331
    }
332

333
    // Add request to queue, then return to client
334
    boolean removeSucceed = configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
×
335
    TSStatus status;
336
    if (removeSucceed) {
×
337
      status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
338
      status.setMessage("Server accepted the request");
×
339
    } else {
340
      status = new TSStatus(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode());
×
341
      status.setMessage("Server rejected the request, maybe requests are too many");
×
342
    }
343
    dataSet.setStatus(status);
×
344

345
    LOGGER.info(
×
346
        "NodeManager submit RemoveDataNodePlan finished, removeDataNodePlan: {}",
347
        removeDataNodePlan);
348
    return dataSet;
×
349
  }
350

351
  public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) {
352
    int nodeId = nodeInfo.generateNextNodeId();
×
353
    req.getConfigNodeLocation().setConfigNodeId(nodeId);
×
354
    configManager.getProcedureManager().addConfigNode(req);
×
355
    return new TConfigNodeRegisterResp()
×
356
        .setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION)
×
357
        .setConfigNodeId(nodeId);
×
358
  }
359

360
  public TSStatus updateConfigNodeIfNecessary(int configNodeId, TNodeVersionInfo versionInfo) {
361
    TNodeVersionInfo recordVersionInfo = nodeInfo.getVersionInfo(configNodeId);
×
362
    if (!recordVersionInfo.equals(versionInfo)) {
×
363
      // Update versionInfo when modified during restart
364
      UpdateVersionInfoPlan updateConfigNodePlan =
×
365
          new UpdateVersionInfoPlan(versionInfo, configNodeId);
366
      ConsensusWriteResponse result = getConsensusManager().write(updateConfigNodePlan);
×
367
      return result.getStatus();
×
368
    }
369
    return ClusterNodeStartUtils.ACCEPT_NODE_RESTART;
×
370
  }
371

372
  /**
373
   * Get TDataNodeConfiguration.
374
   *
375
   * @param req GetDataNodeConfigurationPlan
376
   * @return The specific DataNode's configuration or all DataNodes' configuration if dataNodeId in
377
   *     GetDataNodeConfigurationPlan is -1
378
   */
379
  public DataNodeConfigurationResp getDataNodeConfiguration(GetDataNodeConfigurationPlan req) {
380
    return (DataNodeConfigurationResp) getConsensusManager().read(req).getDataset();
×
381
  }
382

383
  /**
384
   * Only leader use this interface.
385
   *
386
   * @return The number of registered DataNodes
387
   */
388
  public int getRegisteredDataNodeCount() {
389
    return nodeInfo.getRegisteredDataNodeCount();
×
390
  }
391

392
  /**
393
   * Only leader use this interface.
394
   *
395
   * @return All registered DataNodes
396
   */
397
  public List<TDataNodeConfiguration> getRegisteredDataNodes() {
398
    return nodeInfo.getRegisteredDataNodes();
×
399
  }
400

401
  /**
402
   * Only leader use this interface.
403
   *
404
   * <p>Notice: The result will be an empty TDataNodeConfiguration if the specified DataNode doesn't
405
   * register
406
   *
407
   * @param dataNodeId The specified DataNode's index
408
   * @return The specified registered DataNode
409
   */
410
  public TDataNodeConfiguration getRegisteredDataNode(int dataNodeId) {
411
    return nodeInfo.getRegisteredDataNode(dataNodeId);
×
412
  }
413

414
  public Map<Integer, TDataNodeLocation> getRegisteredDataNodeLocations() {
415
    Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
×
416
    nodeInfo
×
417
        .getRegisteredDataNodes()
×
418
        .forEach(
×
419
            dataNodeConfiguration ->
420
                dataNodeLocations.put(
×
421
                    dataNodeConfiguration.getLocation().getDataNodeId(),
×
422
                    dataNodeConfiguration.getLocation()));
×
423
    return dataNodeLocations;
×
424
  }
425

426
  public List<TDataNodeInfo> getRegisteredDataNodeInfoList() {
427
    List<TDataNodeInfo> dataNodeInfoList = new ArrayList<>();
×
428
    List<TDataNodeConfiguration> registeredDataNodes = this.getRegisteredDataNodes();
×
429
    if (registeredDataNodes != null) {
×
430
      registeredDataNodes.forEach(
×
431
          (registeredDataNode) -> {
432
            TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
×
433
            int dataNodeId = registeredDataNode.getLocation().getDataNodeId();
×
434
            dataNodeInfo.setDataNodeId(dataNodeId);
×
435
            dataNodeInfo.setStatus(getLoadManager().getNodeStatusWithReason(dataNodeId));
×
436
            dataNodeInfo.setRpcAddresss(
×
437
                registeredDataNode.getLocation().getClientRpcEndPoint().getIp());
×
438
            dataNodeInfo.setRpcPort(
×
439
                registeredDataNode.getLocation().getClientRpcEndPoint().getPort());
×
440
            dataNodeInfo.setDataRegionNum(0);
×
441
            dataNodeInfo.setSchemaRegionNum(0);
×
442
            dataNodeInfo.setCpuCoreNum(registeredDataNode.getResource().getCpuCoreNum());
×
443
            dataNodeInfoList.add(dataNodeInfo);
×
444
          });
×
445
    }
446

447
    // Map<DataNodeId, DataRegionNum>
448
    Map<Integer, AtomicInteger> dataRegionNumMap = new HashMap<>();
×
449
    // Map<DataNodeId, SchemaRegionNum>
450
    Map<Integer, AtomicInteger> schemaRegionNumMap = new HashMap<>();
×
451
    List<TRegionReplicaSet> regionReplicaSets = getPartitionManager().getAllReplicaSets();
×
452
    regionReplicaSets.forEach(
×
453
        regionReplicaSet ->
454
            regionReplicaSet
×
455
                .getDataNodeLocations()
×
456
                .forEach(
×
457
                    dataNodeLocation -> {
458
                      switch (regionReplicaSet.getRegionId().getType()) {
×
459
                        case SchemaRegion:
460
                          schemaRegionNumMap
×
461
                              .computeIfAbsent(
×
462
                                  dataNodeLocation.getDataNodeId(), key -> new AtomicInteger())
×
463
                              .getAndIncrement();
×
464
                          break;
×
465
                        case DataRegion:
466
                        default:
467
                          dataRegionNumMap
×
468
                              .computeIfAbsent(
×
469
                                  dataNodeLocation.getDataNodeId(), key -> new AtomicInteger())
×
470
                              .getAndIncrement();
×
471
                      }
472
                    }));
×
473
    AtomicInteger zero = new AtomicInteger(0);
×
474
    dataNodeInfoList.forEach(
×
475
        (dataNodesInfo -> {
476
          dataNodesInfo.setSchemaRegionNum(
×
477
              schemaRegionNumMap.getOrDefault(dataNodesInfo.getDataNodeId(), zero).get());
×
478
          dataNodesInfo.setDataRegionNum(
×
479
              dataRegionNumMap.getOrDefault(dataNodesInfo.getDataNodeId(), zero).get());
×
480
        }));
×
481

482
    dataNodeInfoList.sort(Comparator.comparingInt(TDataNodeInfo::getDataNodeId));
×
483
    return dataNodeInfoList;
×
484
  }
485

486
  public List<TConfigNodeLocation> getRegisteredConfigNodes() {
487
    return nodeInfo.getRegisteredConfigNodes();
×
488
  }
489

490
  public Map<Integer, TNodeVersionInfo> getNodeVersionInfo() {
491
    return nodeInfo.getNodeVersionInfo();
×
492
  }
493

494
  public List<TConfigNodeInfo> getRegisteredConfigNodeInfoList() {
495
    List<TConfigNodeInfo> configNodeInfoList = new ArrayList<>();
×
496
    List<TConfigNodeLocation> registeredConfigNodes = this.getRegisteredConfigNodes();
×
497
    if (registeredConfigNodes != null) {
×
498
      registeredConfigNodes.forEach(
×
499
          (configNodeLocation) -> {
500
            TConfigNodeInfo info = new TConfigNodeInfo();
×
501
            int configNodeId = configNodeLocation.getConfigNodeId();
×
502
            info.setConfigNodeId(configNodeId);
×
503
            info.setStatus(getLoadManager().getNodeStatusWithReason(configNodeId));
×
504
            info.setInternalAddress(configNodeLocation.getInternalEndPoint().getIp());
×
505
            info.setInternalPort(configNodeLocation.getInternalEndPoint().getPort());
×
506
            info.setRoleType(
×
507
                configNodeLocation.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID
×
508
                    ? RegionRoleType.Leader.name()
×
509
                    : RegionRoleType.Follower.name());
×
510
            configNodeInfoList.add(info);
×
511
          });
×
512
    }
513
    configNodeInfoList.sort(Comparator.comparingInt(TConfigNodeInfo::getConfigNodeId));
×
514
    return configNodeInfoList;
×
515
  }
516

517
  /**
518
   * Only leader use this interface, record the new ConfigNode's information.
519
   *
520
   * @param configNodeLocation The new ConfigNode.
521
   * @param versionInfo The new ConfigNode's versionInfo.
522
   */
523
  public void applyConfigNode(
524
      TConfigNodeLocation configNodeLocation, TNodeVersionInfo versionInfo) {
525
    ApplyConfigNodePlan applyConfigNodePlan = new ApplyConfigNodePlan(configNodeLocation);
×
526
    getConsensusManager().write(applyConfigNodePlan);
×
527
    UpdateVersionInfoPlan updateVersionInfoPlan =
×
528
        new UpdateVersionInfoPlan(versionInfo, configNodeLocation.getConfigNodeId());
×
529
    getConsensusManager().write(updateVersionInfoPlan);
×
530
  }
×
531

532
  /**
533
   * Only leader use this interface, check the ConfigNode before remove it.
534
   *
535
   * @param removeConfigNodePlan RemoveConfigNodePlan
536
   */
537
  public TSStatus checkConfigNodeBeforeRemove(RemoveConfigNodePlan removeConfigNodePlan) {
538
    removeConfigNodeLock.lock();
×
539
    try {
540
      // Check OnlineConfigNodes number
541
      if (filterConfigNodeThroughStatus(NodeStatus.Running).size() <= 1) {
×
542
        return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
×
543
            .setMessage(
×
544
                "Remove ConfigNode failed because there is only one ConfigNode in current Cluster.");
545
      }
546

547
      // Check whether the registeredConfigNodes contain the ConfigNode to be removed.
548
      if (!getRegisteredConfigNodes().contains(removeConfigNodePlan.getConfigNodeLocation())) {
×
549
        return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
×
550
            .setMessage("Remove ConfigNode failed because the ConfigNode not in current Cluster.");
×
551
      }
552

553
      // Check whether the remove ConfigNode is leader
554
      TConfigNodeLocation leader = getConsensusManager().getLeader();
×
555
      if (leader == null) {
×
556
        return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
×
557
            .setMessage(
×
558
                "Remove ConfigNode failed because the ConfigNodeGroup is on leader election, please retry.");
559
      }
560

561
      if (leader
×
562
          .getInternalEndPoint()
×
563
          .equals(removeConfigNodePlan.getConfigNodeLocation().getInternalEndPoint())) {
×
564
        // transfer leader
565
        return transferLeader(removeConfigNodePlan, getConsensusManager().getConsensusGroupId());
×
566
      }
567

568
    } finally {
569
      removeConfigNodeLock.unlock();
×
570
    }
571

572
    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
×
573
        .setMessage("Successfully remove confignode.");
×
574
  }
575

576
  private TSStatus transferLeader(
577
      RemoveConfigNodePlan removeConfigNodePlan, ConsensusGroupId groupId) {
578
    Optional<TConfigNodeLocation> optional =
×
579
        filterConfigNodeThroughStatus(NodeStatus.Running).stream()
×
580
            .filter(e -> !e.equals(removeConfigNodePlan.getConfigNodeLocation()))
×
581
            .findAny();
×
582
    TConfigNodeLocation newLeader = null;
×
583
    if (optional.isPresent()) {
×
584
      newLeader = optional.get();
×
585
    } else {
586
      return new TSStatus(TSStatusCode.TRANSFER_LEADER_ERROR.getStatusCode())
×
587
          .setMessage(
×
588
              "Transfer ConfigNode leader failed because can not find any running ConfigNode.");
589
    }
590
    ConsensusGenericResponse resp =
×
591
        getConsensusManager()
×
592
            .getConsensusImpl()
×
593
            .transferLeader(
×
594
                groupId,
595
                new Peer(groupId, newLeader.getConfigNodeId(), newLeader.getConsensusEndPoint()));
×
596
    if (!resp.isSuccess()) {
×
597
      return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
×
598
          .setMessage("Remove ConfigNode failed because transfer ConfigNode leader failed.");
×
599
    }
600
    return new TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())
×
601
        .setRedirectNode(newLeader.getInternalEndPoint())
×
602
        .setMessage(
×
603
            "The ConfigNode to be removed is leader, already transfer Leader to "
604
                + newLeader
605
                + ".");
606
  }
607

608
  public List<TSStatus> merge() {
609
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
610
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
611
    AsyncClientHandler<Object, TSStatus> clientHandler =
×
612
        new AsyncClientHandler<>(DataNodeRequestType.MERGE, dataNodeLocationMap);
613
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
614
    return clientHandler.getResponseList();
×
615
  }
616

617
  public List<TSStatus> flush(TFlushReq req) {
618
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
619
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
620
    AsyncClientHandler<TFlushReq, TSStatus> clientHandler =
×
621
        new AsyncClientHandler<>(DataNodeRequestType.FLUSH, req, dataNodeLocationMap);
622
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
623
    return clientHandler.getResponseList();
×
624
  }
625

626
  public List<TSStatus> clearCache() {
627
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
628
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
629
    AsyncClientHandler<Object, TSStatus> clientHandler =
×
630
        new AsyncClientHandler<>(DataNodeRequestType.CLEAR_CACHE, dataNodeLocationMap);
631
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
632
    return clientHandler.getResponseList();
×
633
  }
634

635
  public List<TSStatus> loadConfiguration() {
636
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
637
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
638
    AsyncClientHandler<Object, TSStatus> clientHandler =
×
639
        new AsyncClientHandler<>(DataNodeRequestType.LOAD_CONFIGURATION, dataNodeLocationMap);
640
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
641
    return clientHandler.getResponseList();
×
642
  }
643

644
  public List<TSStatus> setSystemStatus(String status) {
645
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
646
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
647
    AsyncClientHandler<String, TSStatus> clientHandler =
×
648
        new AsyncClientHandler<>(
649
            DataNodeRequestType.SET_SYSTEM_STATUS, status, dataNodeLocationMap);
650
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
651
    return clientHandler.getResponseList();
×
652
  }
653

654
  public TSStatus setDataNodeStatus(TSetDataNodeStatusReq setDataNodeStatusReq) {
655
    return SyncDataNodeClientPool.getInstance()
×
656
        .sendSyncRequestToDataNodeWithRetry(
×
657
            setDataNodeStatusReq.getTargetDataNode().getInternalEndPoint(),
×
658
            setDataNodeStatusReq.getStatus(),
×
659
            DataNodeRequestType.SET_SYSTEM_STATUS);
660
  }
661

662
  /**
663
   * Kill read on DataNode.
664
   *
665
   * @param queryId the id of specific read need to be killed, it will be NULL if kill all queries
666
   * @param dataNodeId the DataNode obtains target read, -1 means we will kill all queries on all
667
   *     DataNodes
668
   */
669
  public TSStatus killQuery(String queryId, int dataNodeId) {
670
    if (dataNodeId < 0) {
×
671
      return killAllQueries();
×
672
    } else {
673
      return killSpecificQuery(queryId, getRegisteredDataNodeLocations().get(dataNodeId));
×
674
    }
675
  }
676

677
  private TSStatus killAllQueries() {
678
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
679
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
680
    AsyncClientHandler<String, TSStatus> clientHandler =
×
681
        new AsyncClientHandler<>(DataNodeRequestType.KILL_QUERY_INSTANCE, dataNodeLocationMap);
682
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
683
    return RpcUtils.squashResponseStatusList(clientHandler.getResponseList());
×
684
  }
685

686
  private TSStatus killSpecificQuery(String queryId, TDataNodeLocation dataNodeLocation) {
687
    if (dataNodeLocation == null) {
×
688
      return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
×
689
          .setMessage(
×
690
              "The target DataNode is not existed, please ensure your input <queryId> is correct");
691
    } else {
692
      return SyncDataNodeClientPool.getInstance()
×
693
          .sendSyncRequestToDataNodeWithRetry(
×
694
              dataNodeLocation.getInternalEndPoint(),
×
695
              queryId,
696
              DataNodeRequestType.KILL_QUERY_INSTANCE);
697
    }
698
  }
699

700
  /**
701
   * Filter ConfigNodes through the specified NodeStatus.
702
   *
703
   * @param status The specified NodeStatus
704
   * @return Filtered ConfigNodes with the specified NodeStatus
705
   */
706
  public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus... status) {
707
    return nodeInfo.getRegisteredConfigNodes(
×
708
        getLoadManager().filterConfigNodeThroughStatus(status));
×
709
  }
710

711
  /**
712
   * Filter DataNodes through the specified NodeStatus.
713
   *
714
   * @param status The specified NodeStatus
715
   * @return Filtered DataNodes with the specified NodeStatus
716
   */
717
  public List<TDataNodeConfiguration> filterDataNodeThroughStatus(NodeStatus... status) {
718
    return nodeInfo.getRegisteredDataNodes(getLoadManager().filterDataNodeThroughStatus(status));
×
719
  }
720

721
  /**
722
   * Get the DataNodeLocation of the DataNode which has the lowest loadScore.
723
   *
724
   * @return TDataNodeLocation with the lowest loadScore
725
   */
726
  public Optional<TDataNodeLocation> getLowestLoadDataNode() {
727
    // TODO get real lowest load data node after scoring algorithm being implemented
728
    int dataNodeId = getLoadManager().getLowestLoadDataNode();
×
729
    return dataNodeId < 0
×
730
        ? Optional.empty()
×
731
        : Optional.of(getRegisteredDataNode(dataNodeId).getLocation());
×
732
  }
733

734
  /**
735
   * Get the DataNodeLocation which has the lowest loadScore within input.
736
   *
737
   * @return TDataNodeLocation with the lowest loadScore
738
   */
739
  public TDataNodeLocation getLowestLoadDataNode(Set<Integer> nodes) {
740
    int dataNodeId = getLoadManager().getLowestLoadDataNode(new ArrayList<>(nodes));
×
741
    return getRegisteredDataNode(dataNodeId).getLocation();
×
742
  }
743

744
  private ConsensusManager getConsensusManager() {
745
    return configManager.getConsensusManager();
×
746
  }
747

748
  private ClusterSchemaManager getClusterSchemaManager() {
749
    return configManager.getClusterSchemaManager();
×
750
  }
751

752
  private PartitionManager getPartitionManager() {
753
    return configManager.getPartitionManager();
×
754
  }
755

756
  private LoadManager getLoadManager() {
757
    return configManager.getLoadManager();
×
758
  }
759

760
  private TriggerManager getTriggerManager() {
761
    return configManager.getTriggerManager();
×
762
  }
763

764
  private PipeManager getPipeManager() {
765
    return configManager.getPipeManager();
×
766
  }
767

768
  private UDFManager getUDFManager() {
769
    return configManager.getUDFManager();
×
770
  }
771
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc