• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9686

pending completion
#9686

push

travis_ci

web-flow
add build info in show cluster (#10595)

146 of 146 new or added lines in 13 files covered. (100.0%)

79232 of 165062 relevant lines covered (48.0%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager.node;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
24
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
25
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
26
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
27
import org.apache.iotdb.common.rpc.thrift.TSStatus;
28
import org.apache.iotdb.commons.cluster.NodeStatus;
29
import org.apache.iotdb.commons.cluster.RegionRoleType;
30
import org.apache.iotdb.commons.conf.CommonConfig;
31
import org.apache.iotdb.commons.conf.CommonDescriptor;
32
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
33
import org.apache.iotdb.commons.service.metric.MetricService;
34
import org.apache.iotdb.confignode.client.DataNodeRequestType;
35
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
36
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
37
import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
38
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
39
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
40
import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
41
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
42
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
43
import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateBuildInfoPlan;
44
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
45
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
46
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
47
import org.apache.iotdb.confignode.consensus.response.datanode.ConfigurationResp;
48
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
49
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
50
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
51
import org.apache.iotdb.confignode.manager.ConfigManager;
52
import org.apache.iotdb.confignode.manager.IManager;
53
import org.apache.iotdb.confignode.manager.TriggerManager;
54
import org.apache.iotdb.confignode.manager.UDFManager;
55
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
56
import org.apache.iotdb.confignode.manager.load.LoadManager;
57
import org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache;
58
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
59
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
60
import org.apache.iotdb.confignode.manager.pipe.PipeManager;
61
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
62
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
63
import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
64
import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
65
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
66
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
67
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
68
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
69
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
70
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
71
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
72
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
73
import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
74
import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
75
import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
76
import org.apache.iotdb.consensus.common.DataSet;
77
import org.apache.iotdb.consensus.common.Peer;
78
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
79
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
80
import org.apache.iotdb.rpc.RpcUtils;
81
import org.apache.iotdb.rpc.TSStatusCode;
82

83
import org.slf4j.Logger;
84
import org.slf4j.LoggerFactory;
85

86
import java.util.ArrayList;
87
import java.util.Comparator;
88
import java.util.HashMap;
89
import java.util.List;
90
import java.util.Map;
91
import java.util.Optional;
92
import java.util.Set;
93
import java.util.concurrent.ConcurrentHashMap;
94
import java.util.concurrent.atomic.AtomicInteger;
95
import java.util.concurrent.locks.ReentrantLock;
96

97
/** {@link NodeManager} manages cluster node addition and removal requests. */
98
public class NodeManager {
99

100
  /** Logger for this class. */
  private static final Logger LOGGER = LoggerFactory.getLogger(NodeManager.class);
×
101

102
  /** ConfigNode configuration obtained from the {@link ConfigNodeDescriptor} singleton. */
  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
×
103
  /** Heartbeat interval in milliseconds, read once from {@link #CONF} at class initialization. */
  public static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatIntervalInMs();
×
104

105
  /** Manager facade used to reach the procedure manager, the node manager and transfer. */
  private final IManager configManager;
106
  /** Store queried for registered nodes, newly generated node ids and build info. */
  private final NodeInfo nodeInfo;
107

108
  /** Lock held while {@link #checkConfigNodeBeforeRemove} performs its checks. */
  private final ReentrantLock removeConfigNodeLock;
109

110
  /**
   * Creates a NodeManager bound to the given manager facade and node store.
   *
   * @param configManager the manager this instance delegates cluster-wide operations to
   * @param nodeInfo the store holding registered node information
   */
  public NodeManager(IManager configManager, NodeInfo nodeInfo) {
    // A dedicated lock serializes the pre-removal checks of ConfigNodes.
    this.removeConfigNodeLock = new ReentrantLock();
    this.nodeInfo = nodeInfo;
    this.configManager = configManager;
  }
×
115

116
  /**
117
   * Get system configurations.
118
   *
119
   * @return ConfigurationResp. The TSStatus will be set to SUCCESS_STATUS.
120
   */
121
  public DataSet getSystemConfiguration() {
122
    ConfigurationResp dataSet = new ConfigurationResp();
×
123
    dataSet.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
×
124
    setGlobalConfig(dataSet);
×
125
    setRatisConfig(dataSet);
×
126
    setCQConfig(dataSet);
×
127
    return dataSet;
×
128
  }
129

130
  private void setGlobalConfig(ConfigurationResp dataSet) {
131
    // Set TGlobalConfig
132
    final ConfigNodeConfig configNodeConfig = ConfigNodeDescriptor.getInstance().getConf();
×
133
    final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
×
134
    TGlobalConfig globalConfig = new TGlobalConfig();
×
135
    globalConfig.setDataRegionConsensusProtocolClass(
×
136
        configNodeConfig.getDataRegionConsensusProtocolClass());
×
137
    globalConfig.setSchemaRegionConsensusProtocolClass(
×
138
        configNodeConfig.getSchemaRegionConsensusProtocolClass());
×
139
    globalConfig.setSeriesPartitionSlotNum(configNodeConfig.getSeriesSlotNum());
×
140
    globalConfig.setSeriesPartitionExecutorClass(
×
141
        configNodeConfig.getSeriesPartitionExecutorClass());
×
142
    globalConfig.setTimePartitionInterval(commonConfig.getTimePartitionInterval());
×
143
    globalConfig.setReadConsistencyLevel(configNodeConfig.getReadConsistencyLevel());
×
144
    globalConfig.setDiskSpaceWarningThreshold(commonConfig.getDiskSpaceWarningThreshold());
×
145
    globalConfig.setTimestampPrecision(commonConfig.getTimestampPrecision());
×
146
    globalConfig.setSchemaEngineMode(commonConfig.getSchemaEngineMode());
×
147
    globalConfig.setTagAttributeTotalSize(commonConfig.getTagAttributeTotalSize());
×
148
    dataSet.setGlobalConfig(globalConfig);
×
149
  }
×
150

151
  private void setRatisConfig(ConfigurationResp dataSet) {
152
    ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
×
153
    TRatisConfig ratisConfig = new TRatisConfig();
×
154

155
    ratisConfig.setDataAppenderBufferSize(conf.getDataRegionRatisConsensusLogAppenderBufferSize());
×
156
    ratisConfig.setSchemaAppenderBufferSize(
×
157
        conf.getSchemaRegionRatisConsensusLogAppenderBufferSize());
×
158

159
    ratisConfig.setDataSnapshotTriggerThreshold(conf.getDataRegionRatisSnapshotTriggerThreshold());
×
160
    ratisConfig.setSchemaSnapshotTriggerThreshold(
×
161
        conf.getSchemaRegionRatisSnapshotTriggerThreshold());
×
162

163
    ratisConfig.setDataLogUnsafeFlushEnable(conf.isDataRegionRatisLogUnsafeFlushEnable());
×
164
    ratisConfig.setSchemaLogUnsafeFlushEnable(conf.isSchemaRegionRatisLogUnsafeFlushEnable());
×
165
    ratisConfig.setDataRegionLogForceSyncNum(conf.getDataRegionRatisLogForceSyncNum());
×
166

167
    ratisConfig.setDataLogSegmentSizeMax(conf.getDataRegionRatisLogSegmentSizeMax());
×
168
    ratisConfig.setSchemaLogSegmentSizeMax(conf.getSchemaRegionRatisLogSegmentSizeMax());
×
169

170
    ratisConfig.setDataGrpcFlowControlWindow(conf.getDataRegionRatisGrpcFlowControlWindow());
×
171
    ratisConfig.setSchemaGrpcFlowControlWindow(conf.getSchemaRegionRatisGrpcFlowControlWindow());
×
172
    ratisConfig.setDataRegionGrpcLeaderOutstandingAppendsMax(
×
173
        conf.getDataRegionRatisGrpcLeaderOutstandingAppendsMax());
×
174

175
    ratisConfig.setDataLeaderElectionTimeoutMin(
×
176
        conf.getDataRegionRatisRpcLeaderElectionTimeoutMinMs());
×
177
    ratisConfig.setSchemaLeaderElectionTimeoutMin(
×
178
        conf.getSchemaRegionRatisRpcLeaderElectionTimeoutMinMs());
×
179

180
    ratisConfig.setDataLeaderElectionTimeoutMax(
×
181
        conf.getDataRegionRatisRpcLeaderElectionTimeoutMaxMs());
×
182
    ratisConfig.setSchemaLeaderElectionTimeoutMax(
×
183
        conf.getSchemaRegionRatisRpcLeaderElectionTimeoutMaxMs());
×
184

185
    ratisConfig.setDataRequestTimeout(conf.getDataRegionRatisRequestTimeoutMs());
×
186
    ratisConfig.setSchemaRequestTimeout(conf.getSchemaRegionRatisRequestTimeoutMs());
×
187

188
    ratisConfig.setDataMaxRetryAttempts(conf.getDataRegionRatisMaxRetryAttempts());
×
189
    ratisConfig.setDataInitialSleepTime(conf.getDataRegionRatisInitialSleepTimeMs());
×
190
    ratisConfig.setDataMaxSleepTime(conf.getDataRegionRatisMaxSleepTimeMs());
×
191
    ratisConfig.setSchemaMaxRetryAttempts(conf.getSchemaRegionRatisMaxRetryAttempts());
×
192
    ratisConfig.setSchemaInitialSleepTime(conf.getSchemaRegionRatisInitialSleepTimeMs());
×
193
    ratisConfig.setSchemaMaxSleepTime(conf.getSchemaRegionRatisMaxSleepTimeMs());
×
194

195
    ratisConfig.setSchemaPreserveWhenPurge(conf.getSchemaRegionRatisPreserveLogsWhenPurge());
×
196
    ratisConfig.setDataPreserveWhenPurge(conf.getDataRegionRatisPreserveLogsWhenPurge());
×
197

198
    ratisConfig.setFirstElectionTimeoutMin(conf.getRatisFirstElectionTimeoutMinMs());
×
199
    ratisConfig.setFirstElectionTimeoutMax(conf.getRatisFirstElectionTimeoutMaxMs());
×
200

201
    ratisConfig.setSchemaRegionRatisLogMax(conf.getSchemaRegionRatisLogMax());
×
202
    ratisConfig.setDataRegionRatisLogMax(conf.getDataRegionRatisLogMax());
×
203

204
    dataSet.setRatisConfig(ratisConfig);
×
205
  }
×
206

207
  private void setCQConfig(ConfigurationResp dataSet) {
208
    final ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
×
209
    TCQConfig cqConfig = new TCQConfig();
×
210
    cqConfig.setCqMinEveryIntervalInMs(conf.getCqMinEveryIntervalInMs());
×
211

212
    dataSet.setCqConfig(cqConfig);
×
213
  }
×
214

215
  private TRuntimeConfiguration getRuntimeConfiguration() {
216
    getPipeManager().getPipePluginCoordinator().lock();
×
217
    getTriggerManager().getTriggerInfo().acquireTriggerTableLock();
×
218
    getUDFManager().getUdfInfo().acquireUDFTableLock();
×
219
    try {
220
      final TRuntimeConfiguration runtimeConfiguration = new TRuntimeConfiguration();
×
221
      runtimeConfiguration.setTemplateInfo(getClusterSchemaManager().getAllTemplateSetInfo());
×
222
      runtimeConfiguration.setAllTriggerInformation(
×
223
          getTriggerManager().getTriggerTable(false).getAllTriggerInformation());
×
224
      runtimeConfiguration.setAllUDFInformation(
×
225
          getUDFManager().getUDFTable().getAllUDFInformation());
×
226
      runtimeConfiguration.setAllPipeInformation(
×
227
          getPipeManager().getPipePluginCoordinator().getPipePluginTable().getAllPipePluginMeta());
×
228
      runtimeConfiguration.setAllTTLInformation(
×
229
          DataNodeRegisterResp.convertAllTTLInformation(getClusterSchemaManager().getAllTTLInfo()));
×
230
      return runtimeConfiguration;
×
231
    } finally {
232
      getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
×
233
      getUDFManager().getUdfInfo().releaseUDFTableLock();
×
234
      getPipeManager().getPipePluginCoordinator().unlock();
×
235
    }
236
  }
237

238
  /**
239
   * Register DataNode.
240
   *
241
   * @param req TDataNodeRegisterReq
242
   * @return DataNodeConfigurationDataSet. The {@link TSStatus} will be set to {@link
243
   *     TSStatusCode#SUCCESS_STATUS} when register success.
244
   */
245
  public DataSet registerDataNode(TDataNodeRegisterReq req) {
246
    int dataNodeId = nodeInfo.generateNextNodeId();
×
247

248
    RegisterDataNodePlan registerDataNodePlan =
×
249
        new RegisterDataNodePlan(req.getDataNodeConfiguration());
×
250
    // Register new DataNode
251
    registerDataNodePlan.getDataNodeConfiguration().getLocation().setDataNodeId(dataNodeId);
×
252
    getConsensusManager().write(registerDataNodePlan);
×
253

254
    // update datanode's buildInfo
255
    UpdateBuildInfoPlan updateBuildInfoPlan =
×
256
        new UpdateBuildInfoPlan(req.getBuildInfo(), dataNodeId);
×
257
    getConsensusManager().write(updateBuildInfoPlan);
×
258

259
    // Bind DataNode metrics
260
    PartitionMetrics.bindDataNodePartitionMetrics(
×
261
        MetricService.getInstance(), configManager, dataNodeId);
×
262

263
    // Adjust the maximum RegionGroup number of each StorageGroup
264
    getClusterSchemaManager().adjustMaxRegionGroupNum();
×
265

266
    DataNodeRegisterResp resp = new DataNodeRegisterResp();
×
267

268
    resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION);
×
269
    resp.setConfigNodeList(getRegisteredConfigNodes());
×
270
    resp.setDataNodeId(
×
271
        registerDataNodePlan.getDataNodeConfiguration().getLocation().getDataNodeId());
×
272
    resp.setRuntimeConfiguration(getRuntimeConfiguration());
×
273
    return resp;
×
274
  }
275

276
  public TDataNodeRestartResp updateDataNodeIfNecessary(TDataNodeRestartReq req) {
277
    int nodeId = req.getDataNodeConfiguration().getLocation().getDataNodeId();
×
278
    TDataNodeConfiguration dataNodeConfiguration = getRegisteredDataNode(nodeId);
×
279
    if (!req.getDataNodeConfiguration().equals(dataNodeConfiguration)) {
×
280
      // Update DataNodeConfiguration when modified during restart
281
      UpdateDataNodePlan updateDataNodePlan =
×
282
          new UpdateDataNodePlan(req.getDataNodeConfiguration());
×
283
      getConsensusManager().write(updateDataNodePlan);
×
284
    }
285
    String recordBuildInfo = nodeInfo.getBuildInfo(nodeId);
×
286
    if (!req.getBuildInfo().equals(recordBuildInfo)) {
×
287
      // Update buildInfo when modified during restart
288
      UpdateBuildInfoPlan updateBuildInfoPlan = new UpdateBuildInfoPlan(req.getBuildInfo(), nodeId);
×
289
      getConsensusManager().write(updateBuildInfoPlan);
×
290
    }
291

292
    TDataNodeRestartResp resp = new TDataNodeRestartResp();
×
293
    resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
×
294
    resp.setConfigNodeList(getRegisteredConfigNodes());
×
295
    resp.setRuntimeConfiguration(getRuntimeConfiguration());
×
296
    return resp;
×
297
  }
298

299
  /**
300
   * Remove DataNodes.
301
   *
302
   * @param removeDataNodePlan removeDataNodePlan
303
   * @return DataNodeToStatusResp, The TSStatus will be SUCCEED_STATUS if the request is accepted,
304
   *     DATANODE_NOT_EXIST when some datanode does not exist.
305
   */
306
  public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
307
    LOGGER.info("NodeManager start to remove DataNode {}", removeDataNodePlan);
×
308

309
    DataNodeRemoveHandler dataNodeRemoveHandler =
×
310
        new DataNodeRemoveHandler((ConfigManager) configManager);
311
    DataNodeToStatusResp preCheckStatus =
×
312
        dataNodeRemoveHandler.checkRemoveDataNodeRequest(removeDataNodePlan);
×
313
    if (preCheckStatus.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
314
      LOGGER.error(
×
315
          "The remove DataNode request check failed. req: {}, check result: {}",
316
          removeDataNodePlan,
317
          preCheckStatus.getStatus());
×
318
      return preCheckStatus;
×
319
    }
320

321
    // Do transfer of the DataNodes before remove
322
    DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
×
323
    if (configManager.transfer(removeDataNodePlan.getDataNodeLocations()).getCode()
×
324
        != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
325
      dataSet.setStatus(
×
326
          new TSStatus(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode())
×
327
              .setMessage("Fail to do transfer of the DataNodes"));
×
328
      return dataSet;
×
329
    }
330

331
    // Add request to queue, then return to client
332
    boolean removeSucceed = configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
×
333
    TSStatus status;
334
    if (removeSucceed) {
×
335
      status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
336
      status.setMessage("Server accepted the request");
×
337
    } else {
338
      status = new TSStatus(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode());
×
339
      status.setMessage("Server rejected the request, maybe requests are too many");
×
340
    }
341
    dataSet.setStatus(status);
×
342

343
    LOGGER.info(
×
344
        "NodeManager submit RemoveDataNodePlan finished, removeDataNodePlan: {}",
345
        removeDataNodePlan);
346
    return dataSet;
×
347
  }
348

349
  public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) {
350
    int nodeId = nodeInfo.generateNextNodeId();
×
351
    req.getConfigNodeLocation().setConfigNodeId(nodeId);
×
352
    configManager.getProcedureManager().addConfigNode(req);
×
353
    return new TConfigNodeRegisterResp()
×
354
        .setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION)
×
355
        .setConfigNodeId(nodeId);
×
356
  }
357

358
  public TSStatus updateConfigNodeIfNecessary(int configNodeId, String buildInfo) {
359
    String recordBuildInfo = nodeInfo.getBuildInfo(configNodeId);
×
360
    if (!recordBuildInfo.equals(buildInfo)) {
×
361
      // Update buildInfo when modified during restart
362
      UpdateBuildInfoPlan updateConfigNodePlan = new UpdateBuildInfoPlan(buildInfo, configNodeId);
×
363
      ConsensusWriteResponse result = getConsensusManager().write(updateConfigNodePlan);
×
364
      return result.getStatus();
×
365
    }
366
    return ClusterNodeStartUtils.ACCEPT_NODE_RESTART;
×
367
  }
368

369
  /**
370
   * Get TDataNodeConfiguration.
371
   *
372
   * @param req GetDataNodeConfigurationPlan
373
   * @return The specific DataNode's configuration or all DataNodes' configuration if dataNodeId in
374
   *     GetDataNodeConfigurationPlan is -1
375
   */
376
  public DataNodeConfigurationResp getDataNodeConfiguration(GetDataNodeConfigurationPlan req) {
377
    return (DataNodeConfigurationResp) getConsensusManager().read(req).getDataset();
×
378
  }
379

380
  /**
381
   * Only leader use this interface.
382
   *
383
   * @return The number of registered DataNodes
384
   */
385
  public int getRegisteredDataNodeCount() {
386
    return nodeInfo.getRegisteredDataNodeCount();
×
387
  }
388

389
  /**
390
   * Only leader use this interface.
391
   *
392
   * @return All registered DataNodes
393
   */
394
  public List<TDataNodeConfiguration> getRegisteredDataNodes() {
395
    return nodeInfo.getRegisteredDataNodes();
×
396
  }
397

398
  /**
399
   * Only leader use this interface.
400
   *
401
   * <p>Notice: The result will be an empty TDataNodeConfiguration if the specified DataNode doesn't
402
   * register
403
   *
404
   * @param dataNodeId The specified DataNode's index
405
   * @return The specified registered DataNode
406
   */
407
  public TDataNodeConfiguration getRegisteredDataNode(int dataNodeId) {
408
    return nodeInfo.getRegisteredDataNode(dataNodeId);
×
409
  }
410

411
  public Map<Integer, TDataNodeLocation> getRegisteredDataNodeLocations() {
412
    Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
×
413
    nodeInfo
×
414
        .getRegisteredDataNodes()
×
415
        .forEach(
×
416
            dataNodeConfiguration ->
417
                dataNodeLocations.put(
×
418
                    dataNodeConfiguration.getLocation().getDataNodeId(),
×
419
                    dataNodeConfiguration.getLocation()));
×
420
    return dataNodeLocations;
×
421
  }
422

423
  public List<TDataNodeInfo> getRegisteredDataNodeInfoList() {
424
    List<TDataNodeInfo> dataNodeInfoList = new ArrayList<>();
×
425
    List<TDataNodeConfiguration> registeredDataNodes = this.getRegisteredDataNodes();
×
426
    if (registeredDataNodes != null) {
×
427
      registeredDataNodes.forEach(
×
428
          (registeredDataNode) -> {
429
            TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
×
430
            int dataNodeId = registeredDataNode.getLocation().getDataNodeId();
×
431
            dataNodeInfo.setDataNodeId(dataNodeId);
×
432
            dataNodeInfo.setStatus(getLoadManager().getNodeStatusWithReason(dataNodeId));
×
433
            dataNodeInfo.setRpcAddresss(
×
434
                registeredDataNode.getLocation().getClientRpcEndPoint().getIp());
×
435
            dataNodeInfo.setRpcPort(
×
436
                registeredDataNode.getLocation().getClientRpcEndPoint().getPort());
×
437
            dataNodeInfo.setDataRegionNum(0);
×
438
            dataNodeInfo.setSchemaRegionNum(0);
×
439
            dataNodeInfo.setCpuCoreNum(registeredDataNode.getResource().getCpuCoreNum());
×
440
            dataNodeInfoList.add(dataNodeInfo);
×
441
          });
×
442
    }
443

444
    // Map<DataNodeId, DataRegionNum>
445
    Map<Integer, AtomicInteger> dataRegionNumMap = new HashMap<>();
×
446
    // Map<DataNodeId, SchemaRegionNum>
447
    Map<Integer, AtomicInteger> schemaRegionNumMap = new HashMap<>();
×
448
    List<TRegionReplicaSet> regionReplicaSets = getPartitionManager().getAllReplicaSets();
×
449
    regionReplicaSets.forEach(
×
450
        regionReplicaSet ->
451
            regionReplicaSet
×
452
                .getDataNodeLocations()
×
453
                .forEach(
×
454
                    dataNodeLocation -> {
455
                      switch (regionReplicaSet.getRegionId().getType()) {
×
456
                        case SchemaRegion:
457
                          schemaRegionNumMap
×
458
                              .computeIfAbsent(
×
459
                                  dataNodeLocation.getDataNodeId(), key -> new AtomicInteger())
×
460
                              .getAndIncrement();
×
461
                          break;
×
462
                        case DataRegion:
463
                        default:
464
                          dataRegionNumMap
×
465
                              .computeIfAbsent(
×
466
                                  dataNodeLocation.getDataNodeId(), key -> new AtomicInteger())
×
467
                              .getAndIncrement();
×
468
                      }
469
                    }));
×
470
    AtomicInteger zero = new AtomicInteger(0);
×
471
    dataNodeInfoList.forEach(
×
472
        (dataNodesInfo -> {
473
          dataNodesInfo.setSchemaRegionNum(
×
474
              schemaRegionNumMap.getOrDefault(dataNodesInfo.getDataNodeId(), zero).get());
×
475
          dataNodesInfo.setDataRegionNum(
×
476
              dataRegionNumMap.getOrDefault(dataNodesInfo.getDataNodeId(), zero).get());
×
477
        }));
×
478

479
    dataNodeInfoList.sort(Comparator.comparingInt(TDataNodeInfo::getDataNodeId));
×
480
    return dataNodeInfoList;
×
481
  }
482

483
  public List<TConfigNodeLocation> getRegisteredConfigNodes() {
484
    return nodeInfo.getRegisteredConfigNodes();
×
485
  }
486

487
  public Map<Integer, String> getNodeBuildInfo() {
488
    return nodeInfo.getNodeBuildInfo();
×
489
  }
490

491
  public List<TConfigNodeInfo> getRegisteredConfigNodeInfoList() {
492
    List<TConfigNodeInfo> configNodeInfoList = new ArrayList<>();
×
493
    List<TConfigNodeLocation> registeredConfigNodes = this.getRegisteredConfigNodes();
×
494
    if (registeredConfigNodes != null) {
×
495
      registeredConfigNodes.forEach(
×
496
          (configNodeLocation) -> {
497
            TConfigNodeInfo info = new TConfigNodeInfo();
×
498
            int configNodeId = configNodeLocation.getConfigNodeId();
×
499
            info.setConfigNodeId(configNodeId);
×
500
            info.setStatus(getLoadManager().getNodeStatusWithReason(configNodeId));
×
501
            info.setInternalAddress(configNodeLocation.getInternalEndPoint().getIp());
×
502
            info.setInternalPort(configNodeLocation.getInternalEndPoint().getPort());
×
503
            info.setRoleType(
×
504
                configNodeLocation.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID
×
505
                    ? RegionRoleType.Leader.name()
×
506
                    : RegionRoleType.Follower.name());
×
507
            configNodeInfoList.add(info);
×
508
          });
×
509
    }
510
    configNodeInfoList.sort(Comparator.comparingInt(TConfigNodeInfo::getConfigNodeId));
×
511
    return configNodeInfoList;
×
512
  }
513

514
  /**
515
   * Only leader use this interface, record the new ConfigNode's information.
516
   *
517
   * @param configNodeLocation The new ConfigNode.
518
   * @param buildInfo The new ConfigNode's buildInfo.
519
   */
520
  public void applyConfigNode(TConfigNodeLocation configNodeLocation, String buildInfo) {
521
    ApplyConfigNodePlan applyConfigNodePlan = new ApplyConfigNodePlan(configNodeLocation);
×
522
    getConsensusManager().write(applyConfigNodePlan);
×
523
    UpdateBuildInfoPlan updateBuildInfoPlan =
×
524
        new UpdateBuildInfoPlan(buildInfo, configNodeLocation.getConfigNodeId());
×
525
    getConsensusManager().write(updateBuildInfoPlan);
×
526
  }
×
527

528
  /**
529
   * Only leader use this interface, check the ConfigNode before remove it.
530
   *
531
   * @param removeConfigNodePlan RemoveConfigNodePlan
532
   */
533
  public TSStatus checkConfigNodeBeforeRemove(RemoveConfigNodePlan removeConfigNodePlan) {
534
    removeConfigNodeLock.lock();
×
535
    try {
536
      // Check OnlineConfigNodes number
537
      if (filterConfigNodeThroughStatus(NodeStatus.Running).size() <= 1) {
×
538
        return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
×
539
            .setMessage(
×
540
                "Remove ConfigNode failed because there is only one ConfigNode in current Cluster.");
541
      }
542

543
      // Check whether the registeredConfigNodes contain the ConfigNode to be removed.
544
      if (!getRegisteredConfigNodes().contains(removeConfigNodePlan.getConfigNodeLocation())) {
×
545
        return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
×
546
            .setMessage("Remove ConfigNode failed because the ConfigNode not in current Cluster.");
×
547
      }
548

549
      // Check whether the remove ConfigNode is leader
550
      TConfigNodeLocation leader = getConsensusManager().getLeader();
×
551
      if (leader == null) {
×
552
        return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
×
553
            .setMessage(
×
554
                "Remove ConfigNode failed because the ConfigNodeGroup is on leader election, please retry.");
555
      }
556

557
      if (leader
×
558
          .getInternalEndPoint()
×
559
          .equals(removeConfigNodePlan.getConfigNodeLocation().getInternalEndPoint())) {
×
560
        // transfer leader
561
        return transferLeader(removeConfigNodePlan, getConsensusManager().getConsensusGroupId());
×
562
      }
563

564
    } finally {
565
      removeConfigNodeLock.unlock();
×
566
    }
567

568
    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
×
569
        .setMessage("Successfully remove confignode.");
×
570
  }
571

572
  private TSStatus transferLeader(
573
      RemoveConfigNodePlan removeConfigNodePlan, ConsensusGroupId groupId) {
574
    Optional<TConfigNodeLocation> optional =
×
575
        filterConfigNodeThroughStatus(NodeStatus.Running).stream()
×
576
            .filter(e -> !e.equals(removeConfigNodePlan.getConfigNodeLocation()))
×
577
            .findAny();
×
578
    TConfigNodeLocation newLeader = null;
×
579
    if (optional.isPresent()) {
×
580
      newLeader = optional.get();
×
581
    } else {
582
      return new TSStatus(TSStatusCode.TRANSFER_LEADER_ERROR.getStatusCode())
×
583
          .setMessage(
×
584
              "Transfer ConfigNode leader failed because can not find any running ConfigNode.");
585
    }
586
    ConsensusGenericResponse resp =
×
587
        getConsensusManager()
×
588
            .getConsensusImpl()
×
589
            .transferLeader(
×
590
                groupId,
591
                new Peer(groupId, newLeader.getConfigNodeId(), newLeader.getConsensusEndPoint()));
×
592
    if (!resp.isSuccess()) {
×
593
      return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
×
594
          .setMessage("Remove ConfigNode failed because transfer ConfigNode leader failed.");
×
595
    }
596
    return new TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())
×
597
        .setRedirectNode(newLeader.getInternalEndPoint())
×
598
        .setMessage(
×
599
            "The ConfigNode to be removed is leader, already transfer Leader to "
600
                + newLeader
601
                + ".");
602
  }
603

604
  public List<TSStatus> merge() {
605
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
606
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
607
    AsyncClientHandler<Object, TSStatus> clientHandler =
×
608
        new AsyncClientHandler<>(DataNodeRequestType.MERGE, dataNodeLocationMap);
609
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
610
    return clientHandler.getResponseList();
×
611
  }
612

613
  public List<TSStatus> flush(TFlushReq req) {
614
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
615
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
616
    AsyncClientHandler<TFlushReq, TSStatus> clientHandler =
×
617
        new AsyncClientHandler<>(DataNodeRequestType.FLUSH, req, dataNodeLocationMap);
618
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
619
    return clientHandler.getResponseList();
×
620
  }
621

622
  public List<TSStatus> clearCache() {
623
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
624
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
625
    AsyncClientHandler<Object, TSStatus> clientHandler =
×
626
        new AsyncClientHandler<>(DataNodeRequestType.CLEAR_CACHE, dataNodeLocationMap);
627
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
628
    return clientHandler.getResponseList();
×
629
  }
630

631
  public List<TSStatus> loadConfiguration() {
632
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
633
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
634
    AsyncClientHandler<Object, TSStatus> clientHandler =
×
635
        new AsyncClientHandler<>(DataNodeRequestType.LOAD_CONFIGURATION, dataNodeLocationMap);
636
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
637
    return clientHandler.getResponseList();
×
638
  }
639

640
  public List<TSStatus> setSystemStatus(String status) {
641
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
642
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
643
    AsyncClientHandler<String, TSStatus> clientHandler =
×
644
        new AsyncClientHandler<>(
645
            DataNodeRequestType.SET_SYSTEM_STATUS, status, dataNodeLocationMap);
646
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
647
    return clientHandler.getResponseList();
×
648
  }
649

650
  public TSStatus setDataNodeStatus(TSetDataNodeStatusReq setDataNodeStatusReq) {
651
    return SyncDataNodeClientPool.getInstance()
×
652
        .sendSyncRequestToDataNodeWithRetry(
×
653
            setDataNodeStatusReq.getTargetDataNode().getInternalEndPoint(),
×
654
            setDataNodeStatusReq.getStatus(),
×
655
            DataNodeRequestType.SET_SYSTEM_STATUS);
656
  }
657

658
  /**
659
   * Kill read on DataNode.
660
   *
661
   * @param queryId the id of specific read need to be killed, it will be NULL if kill all queries
662
   * @param dataNodeId the DataNode obtains target read, -1 means we will kill all queries on all
663
   *     DataNodes
664
   */
665
  public TSStatus killQuery(String queryId, int dataNodeId) {
666
    if (dataNodeId < 0) {
×
667
      return killAllQueries();
×
668
    } else {
669
      return killSpecificQuery(queryId, getRegisteredDataNodeLocations().get(dataNodeId));
×
670
    }
671
  }
672

673
  private TSStatus killAllQueries() {
674
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
675
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
676
    AsyncClientHandler<String, TSStatus> clientHandler =
×
677
        new AsyncClientHandler<>(DataNodeRequestType.KILL_QUERY_INSTANCE, dataNodeLocationMap);
678
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
679
    return RpcUtils.squashResponseStatusList(clientHandler.getResponseList());
×
680
  }
681

682
  private TSStatus killSpecificQuery(String queryId, TDataNodeLocation dataNodeLocation) {
683
    if (dataNodeLocation == null) {
×
684
      return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
×
685
          .setMessage(
×
686
              "The target DataNode is not existed, please ensure your input <queryId> is correct");
687
    } else {
688
      return SyncDataNodeClientPool.getInstance()
×
689
          .sendSyncRequestToDataNodeWithRetry(
×
690
              dataNodeLocation.getInternalEndPoint(),
×
691
              queryId,
692
              DataNodeRequestType.KILL_QUERY_INSTANCE);
693
    }
694
  }
695

696
  /**
697
   * Filter ConfigNodes through the specified NodeStatus.
698
   *
699
   * @param status The specified NodeStatus
700
   * @return Filtered ConfigNodes with the specified NodeStatus
701
   */
702
  public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus... status) {
703
    return nodeInfo.getRegisteredConfigNodes(
×
704
        getLoadManager().filterConfigNodeThroughStatus(status));
×
705
  }
706

707
  /**
708
   * Filter DataNodes through the specified NodeStatus.
709
   *
710
   * @param status The specified NodeStatus
711
   * @return Filtered DataNodes with the specified NodeStatus
712
   */
713
  public List<TDataNodeConfiguration> filterDataNodeThroughStatus(NodeStatus... status) {
714
    return nodeInfo.getRegisteredDataNodes(getLoadManager().filterDataNodeThroughStatus(status));
×
715
  }
716

717
  /**
718
   * Get the DataNodeLocation of the DataNode which has the lowest loadScore.
719
   *
720
   * @return TDataNodeLocation with the lowest loadScore
721
   */
722
  public Optional<TDataNodeLocation> getLowestLoadDataNode() {
723
    // TODO get real lowest load data node after scoring algorithm being implemented
724
    int dataNodeId = getLoadManager().getLowestLoadDataNode();
×
725
    return dataNodeId < 0
×
726
        ? Optional.empty()
×
727
        : Optional.of(getRegisteredDataNode(dataNodeId).getLocation());
×
728
  }
729

730
  /**
731
   * Get the DataNodeLocation which has the lowest loadScore within input.
732
   *
733
   * @return TDataNodeLocation with the lowest loadScore
734
   */
735
  public TDataNodeLocation getLowestLoadDataNode(Set<Integer> nodes) {
736
    int dataNodeId = getLoadManager().getLowestLoadDataNode(new ArrayList<>(nodes));
×
737
    return getRegisteredDataNode(dataNodeId).getLocation();
×
738
  }
739

740
  private ConsensusManager getConsensusManager() {
741
    return configManager.getConsensusManager();
×
742
  }
743

744
  private ClusterSchemaManager getClusterSchemaManager() {
745
    return configManager.getClusterSchemaManager();
×
746
  }
747

748
  private PartitionManager getPartitionManager() {
749
    return configManager.getPartitionManager();
×
750
  }
751

752
  private LoadManager getLoadManager() {
753
    return configManager.getLoadManager();
×
754
  }
755

756
  private TriggerManager getTriggerManager() {
757
    return configManager.getTriggerManager();
×
758
  }
759

760
  private PipeManager getPipeManager() {
761
    return configManager.getPipeManager();
×
762
  }
763

764
  private UDFManager getUDFManager() {
765
    return configManager.getUDFManager();
×
766
  }
767
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc