• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #10032

08 Sep 2023 06:40AM UTC coverage: 47.635% (-0.003%) from 47.638%
#10032

push

travis_ci

web-flow
[To rel/1.2] Remove some copyright info (#11096)

80288 of 168549 relevant lines covered (47.63%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager.node;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
24
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
25
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
26
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
27
import org.apache.iotdb.common.rpc.thrift.TSStatus;
28
import org.apache.iotdb.commons.cluster.NodeStatus;
29
import org.apache.iotdb.commons.cluster.RegionRoleType;
30
import org.apache.iotdb.commons.conf.CommonConfig;
31
import org.apache.iotdb.commons.conf.CommonDescriptor;
32
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
33
import org.apache.iotdb.commons.service.metric.MetricService;
34
import org.apache.iotdb.confignode.client.DataNodeRequestType;
35
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
36
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
37
import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
38
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
39
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
40
import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
41
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
42
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
43
import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateVersionInfoPlan;
44
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
45
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
46
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
47
import org.apache.iotdb.confignode.consensus.response.datanode.ConfigurationResp;
48
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
49
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
50
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
51
import org.apache.iotdb.confignode.manager.ConfigManager;
52
import org.apache.iotdb.confignode.manager.IManager;
53
import org.apache.iotdb.confignode.manager.TriggerManager;
54
import org.apache.iotdb.confignode.manager.UDFManager;
55
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
56
import org.apache.iotdb.confignode.manager.load.LoadManager;
57
import org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache;
58
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
59
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
60
import org.apache.iotdb.confignode.manager.pipe.PipeManager;
61
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
62
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
63
import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
64
import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
65
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
66
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
67
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
68
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
69
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
70
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
71
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
72
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
73
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
74
import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
75
import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
76
import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
77
import org.apache.iotdb.consensus.common.DataSet;
78
import org.apache.iotdb.consensus.common.Peer;
79
import org.apache.iotdb.consensus.exception.ConsensusException;
80
import org.apache.iotdb.rpc.RpcUtils;
81
import org.apache.iotdb.rpc.TSStatusCode;
82

83
import org.slf4j.Logger;
84
import org.slf4j.LoggerFactory;
85

86
import java.util.ArrayList;
87
import java.util.Comparator;
88
import java.util.HashMap;
89
import java.util.List;
90
import java.util.Map;
91
import java.util.Optional;
92
import java.util.Set;
93
import java.util.concurrent.ConcurrentHashMap;
94
import java.util.concurrent.atomic.AtomicInteger;
95
import java.util.concurrent.locks.ReentrantLock;
96

97
/** {@link NodeManager} manages cluster node addition and removal requests. */
98
public class NodeManager {
99

100
  /** Logger shared by all NodeManager instances. */
  private static final Logger LOGGER = LoggerFactory.getLogger(NodeManager.class);
×
101

102
  /** ConfigNode configuration, read once from the {@link ConfigNodeDescriptor} singleton. */
  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
×
103
  /** Heartbeat interval taken from {@link #CONF}; milliseconds according to the getter name. */
  public static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatIntervalInMs();
×
104

105
  /** Manager facade used to reach the other managers and to submit procedures. */
  private final IManager configManager;
106
  /** Source of the registered node and version information returned by this class. */
  private final NodeInfo nodeInfo;
107

108
  /** Held for the duration of {@code checkConfigNodeBeforeRemove}. */
  private final ReentrantLock removeConfigNodeLock;
109

110
  /** Log message used whenever a consensus write throws a {@link ConsensusException}. */
  private static final String CONSENSUS_WRITE_ERROR =
111
      "Failed in the write API executing the consensus layer due to: ";
112

113
  /**
   * Create a NodeManager backed by the given manager facade and node information store.
   *
   * @param configManager the manager facade this NodeManager delegates cluster-level calls to
   * @param nodeInfo the store queried for registered node and version information
   */
  public NodeManager(IManager configManager, NodeInfo nodeInfo) {
    this.removeConfigNodeLock = new ReentrantLock();
    this.nodeInfo = nodeInfo;
    this.configManager = configManager;
  }
×
118

119
  /**
120
   * Get system configurations.
121
   *
122
   * @return ConfigurationResp. The TSStatus will be set to SUCCESS_STATUS.
123
   */
124
  public DataSet getSystemConfiguration() {
125
    ConfigurationResp dataSet = new ConfigurationResp();
×
126
    dataSet.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
×
127
    setGlobalConfig(dataSet);
×
128
    setRatisConfig(dataSet);
×
129
    setCQConfig(dataSet);
×
130
    return dataSet;
×
131
  }
132

133
  private void setGlobalConfig(ConfigurationResp dataSet) {
134
    // Set TGlobalConfig
135
    final ConfigNodeConfig configNodeConfig = ConfigNodeDescriptor.getInstance().getConf();
×
136
    final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
×
137
    TGlobalConfig globalConfig = new TGlobalConfig();
×
138
    globalConfig.setDataRegionConsensusProtocolClass(
×
139
        configNodeConfig.getDataRegionConsensusProtocolClass());
×
140
    globalConfig.setSchemaRegionConsensusProtocolClass(
×
141
        configNodeConfig.getSchemaRegionConsensusProtocolClass());
×
142
    globalConfig.setSeriesPartitionSlotNum(configNodeConfig.getSeriesSlotNum());
×
143
    globalConfig.setSeriesPartitionExecutorClass(
×
144
        configNodeConfig.getSeriesPartitionExecutorClass());
×
145
    globalConfig.setTimePartitionInterval(commonConfig.getTimePartitionInterval());
×
146
    globalConfig.setReadConsistencyLevel(configNodeConfig.getReadConsistencyLevel());
×
147
    globalConfig.setDiskSpaceWarningThreshold(commonConfig.getDiskSpaceWarningThreshold());
×
148
    globalConfig.setTimestampPrecision(commonConfig.getTimestampPrecision());
×
149
    globalConfig.setSchemaEngineMode(commonConfig.getSchemaEngineMode());
×
150
    globalConfig.setTagAttributeTotalSize(commonConfig.getTagAttributeTotalSize());
×
151
    dataSet.setGlobalConfig(globalConfig);
×
152
  }
×
153

154
  private void setRatisConfig(ConfigurationResp dataSet) {
155
    ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
×
156
    TRatisConfig ratisConfig = new TRatisConfig();
×
157

158
    ratisConfig.setDataAppenderBufferSize(conf.getDataRegionRatisConsensusLogAppenderBufferSize());
×
159
    ratisConfig.setSchemaAppenderBufferSize(
×
160
        conf.getSchemaRegionRatisConsensusLogAppenderBufferSize());
×
161

162
    ratisConfig.setDataSnapshotTriggerThreshold(conf.getDataRegionRatisSnapshotTriggerThreshold());
×
163
    ratisConfig.setSchemaSnapshotTriggerThreshold(
×
164
        conf.getSchemaRegionRatisSnapshotTriggerThreshold());
×
165

166
    ratisConfig.setDataLogUnsafeFlushEnable(conf.isDataRegionRatisLogUnsafeFlushEnable());
×
167
    ratisConfig.setSchemaLogUnsafeFlushEnable(conf.isSchemaRegionRatisLogUnsafeFlushEnable());
×
168

169
    ratisConfig.setDataLogSegmentSizeMax(conf.getDataRegionRatisLogSegmentSizeMax());
×
170
    ratisConfig.setSchemaLogSegmentSizeMax(conf.getSchemaRegionRatisLogSegmentSizeMax());
×
171

172
    ratisConfig.setDataGrpcFlowControlWindow(conf.getDataRegionRatisGrpcFlowControlWindow());
×
173
    ratisConfig.setSchemaGrpcFlowControlWindow(conf.getSchemaRegionRatisGrpcFlowControlWindow());
×
174
    ratisConfig.setDataRegionGrpcLeaderOutstandingAppendsMax(
×
175
        conf.getDataRegionRatisGrpcLeaderOutstandingAppendsMax());
×
176
    ratisConfig.setSchemaRegionGrpcLeaderOutstandingAppendsMax(
×
177
        conf.getSchemaRegionRatisGrpcLeaderOutstandingAppendsMax());
×
178

179
    ratisConfig.setDataRegionLogForceSyncNum(conf.getDataRegionRatisLogForceSyncNum());
×
180
    ratisConfig.setSchemaRegionLogForceSyncNum(conf.getSchemaRegionRatisLogForceSyncNum());
×
181

182
    ratisConfig.setDataLeaderElectionTimeoutMin(
×
183
        conf.getDataRegionRatisRpcLeaderElectionTimeoutMinMs());
×
184
    ratisConfig.setSchemaLeaderElectionTimeoutMin(
×
185
        conf.getSchemaRegionRatisRpcLeaderElectionTimeoutMinMs());
×
186

187
    ratisConfig.setDataLeaderElectionTimeoutMax(
×
188
        conf.getDataRegionRatisRpcLeaderElectionTimeoutMaxMs());
×
189
    ratisConfig.setSchemaLeaderElectionTimeoutMax(
×
190
        conf.getSchemaRegionRatisRpcLeaderElectionTimeoutMaxMs());
×
191

192
    ratisConfig.setDataRequestTimeout(conf.getDataRegionRatisRequestTimeoutMs());
×
193
    ratisConfig.setSchemaRequestTimeout(conf.getSchemaRegionRatisRequestTimeoutMs());
×
194

195
    ratisConfig.setDataMaxRetryAttempts(conf.getDataRegionRatisMaxRetryAttempts());
×
196
    ratisConfig.setDataInitialSleepTime(conf.getDataRegionRatisInitialSleepTimeMs());
×
197
    ratisConfig.setDataMaxSleepTime(conf.getDataRegionRatisMaxSleepTimeMs());
×
198
    ratisConfig.setSchemaMaxRetryAttempts(conf.getSchemaRegionRatisMaxRetryAttempts());
×
199
    ratisConfig.setSchemaInitialSleepTime(conf.getSchemaRegionRatisInitialSleepTimeMs());
×
200
    ratisConfig.setSchemaMaxSleepTime(conf.getSchemaRegionRatisMaxSleepTimeMs());
×
201

202
    ratisConfig.setSchemaPreserveWhenPurge(conf.getSchemaRegionRatisPreserveLogsWhenPurge());
×
203
    ratisConfig.setDataPreserveWhenPurge(conf.getDataRegionRatisPreserveLogsWhenPurge());
×
204

205
    ratisConfig.setFirstElectionTimeoutMin(conf.getRatisFirstElectionTimeoutMinMs());
×
206
    ratisConfig.setFirstElectionTimeoutMax(conf.getRatisFirstElectionTimeoutMaxMs());
×
207

208
    ratisConfig.setSchemaRegionRatisLogMax(conf.getSchemaRegionRatisLogMax());
×
209
    ratisConfig.setDataRegionRatisLogMax(conf.getDataRegionRatisLogMax());
×
210

211
    dataSet.setRatisConfig(ratisConfig);
×
212
  }
×
213

214
  private void setCQConfig(ConfigurationResp dataSet) {
215
    final ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
×
216
    TCQConfig cqConfig = new TCQConfig();
×
217
    cqConfig.setCqMinEveryIntervalInMs(conf.getCqMinEveryIntervalInMs());
×
218

219
    dataSet.setCqConfig(cqConfig);
×
220
  }
×
221

222
  private TRuntimeConfiguration getRuntimeConfiguration() {
223
    getPipeManager().getPipePluginCoordinator().lock();
×
224
    getTriggerManager().getTriggerInfo().acquireTriggerTableLock();
×
225
    getUDFManager().getUdfInfo().acquireUDFTableLock();
×
226
    try {
227
      final TRuntimeConfiguration runtimeConfiguration = new TRuntimeConfiguration();
×
228
      runtimeConfiguration.setTemplateInfo(getClusterSchemaManager().getAllTemplateSetInfo());
×
229
      runtimeConfiguration.setAllTriggerInformation(
×
230
          getTriggerManager().getTriggerTable(false).getAllTriggerInformation());
×
231
      runtimeConfiguration.setAllUDFInformation(
×
232
          getUDFManager().getUDFTable().getAllUDFInformation());
×
233
      runtimeConfiguration.setAllPipeInformation(
×
234
          getPipeManager().getPipePluginCoordinator().getPipePluginTable().getAllPipePluginMeta());
×
235
      runtimeConfiguration.setAllTTLInformation(
×
236
          DataNodeRegisterResp.convertAllTTLInformation(getClusterSchemaManager().getAllTTLInfo()));
×
237
      return runtimeConfiguration;
×
238
    } finally {
239
      getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
×
240
      getUDFManager().getUdfInfo().releaseUDFTableLock();
×
241
      getPipeManager().getPipePluginCoordinator().unlock();
×
242
    }
243
  }
244

245
  /**
246
   * Register DataNode.
247
   *
248
   * @param req TDataNodeRegisterReq
249
   * @return DataNodeConfigurationDataSet. The {@link TSStatus} will be set to {@link
250
   *     TSStatusCode#SUCCESS_STATUS} when register success.
251
   */
252
  public DataSet registerDataNode(TDataNodeRegisterReq req) {
253
    int dataNodeId = nodeInfo.generateNextNodeId();
×
254

255
    RegisterDataNodePlan registerDataNodePlan =
×
256
        new RegisterDataNodePlan(req.getDataNodeConfiguration());
×
257
    // Register new DataNode
258
    registerDataNodePlan.getDataNodeConfiguration().getLocation().setDataNodeId(dataNodeId);
×
259
    try {
260
      getConsensusManager().write(registerDataNodePlan);
×
261
    } catch (ConsensusException e) {
×
262
      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
×
263
    }
×
264

265
    // update datanode's versionInfo
266
    UpdateVersionInfoPlan updateVersionInfoPlan =
×
267
        new UpdateVersionInfoPlan(req.getVersionInfo(), dataNodeId);
×
268
    try {
269
      getConsensusManager().write(updateVersionInfoPlan);
×
270
    } catch (ConsensusException e) {
×
271
      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
×
272
    }
×
273

274
    // Bind DataNode metrics
275
    PartitionMetrics.bindDataNodePartitionMetrics(
×
276
        MetricService.getInstance(), configManager, dataNodeId);
×
277

278
    // Adjust the maximum RegionGroup number of each StorageGroup
279
    getClusterSchemaManager().adjustMaxRegionGroupNum();
×
280

281
    DataNodeRegisterResp resp = new DataNodeRegisterResp();
×
282

283
    resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION);
×
284
    resp.setConfigNodeList(getRegisteredConfigNodes());
×
285
    resp.setDataNodeId(
×
286
        registerDataNodePlan.getDataNodeConfiguration().getLocation().getDataNodeId());
×
287
    resp.setRuntimeConfiguration(getRuntimeConfiguration());
×
288
    return resp;
×
289
  }
290

291
  public TDataNodeRestartResp updateDataNodeIfNecessary(TDataNodeRestartReq req) {
292
    int nodeId = req.getDataNodeConfiguration().getLocation().getDataNodeId();
×
293
    TDataNodeConfiguration dataNodeConfiguration = getRegisteredDataNode(nodeId);
×
294
    if (!req.getDataNodeConfiguration().equals(dataNodeConfiguration)) {
×
295
      // Update DataNodeConfiguration when modified during restart
296
      UpdateDataNodePlan updateDataNodePlan =
×
297
          new UpdateDataNodePlan(req.getDataNodeConfiguration());
×
298
      try {
299
        getConsensusManager().write(updateDataNodePlan);
×
300
      } catch (ConsensusException e) {
×
301
        LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
×
302
      }
×
303
    }
304
    TNodeVersionInfo versionInfo = nodeInfo.getVersionInfo(nodeId);
×
305
    if (!req.getVersionInfo().equals(versionInfo)) {
×
306
      // Update versionInfo when modified during restart
307
      UpdateVersionInfoPlan updateVersionInfoPlan =
×
308
          new UpdateVersionInfoPlan(req.getVersionInfo(), nodeId);
×
309
      try {
310
        getConsensusManager().write(updateVersionInfoPlan);
×
311
      } catch (ConsensusException e) {
×
312
        LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
×
313
      }
×
314
    }
315

316
    TDataNodeRestartResp resp = new TDataNodeRestartResp();
×
317
    resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
×
318
    resp.setConfigNodeList(getRegisteredConfigNodes());
×
319
    resp.setRuntimeConfiguration(getRuntimeConfiguration());
×
320
    return resp;
×
321
  }
322

323
  /**
324
   * Remove DataNodes.
325
   *
326
   * @param removeDataNodePlan removeDataNodePlan
327
   * @return DataNodeToStatusResp, The TSStatus will be SUCCEED_STATUS if the request is accepted,
328
   *     DATANODE_NOT_EXIST when some datanode does not exist.
329
   */
330
  public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
331
    LOGGER.info("NodeManager start to remove DataNode {}", removeDataNodePlan);
×
332

333
    DataNodeRemoveHandler dataNodeRemoveHandler =
×
334
        new DataNodeRemoveHandler((ConfigManager) configManager);
335
    DataNodeToStatusResp preCheckStatus =
×
336
        dataNodeRemoveHandler.checkRemoveDataNodeRequest(removeDataNodePlan);
×
337
    if (preCheckStatus.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
338
      LOGGER.error(
×
339
          "The remove DataNode request check failed. req: {}, check result: {}",
340
          removeDataNodePlan,
341
          preCheckStatus.getStatus());
×
342
      return preCheckStatus;
×
343
    }
344

345
    // Do transfer of the DataNodes before remove
346
    DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
×
347
    if (configManager.transfer(removeDataNodePlan.getDataNodeLocations()).getCode()
×
348
        != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
349
      dataSet.setStatus(
×
350
          new TSStatus(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode())
×
351
              .setMessage("Fail to do transfer of the DataNodes"));
×
352
      return dataSet;
×
353
    }
354

355
    // Add request to queue, then return to client
356
    boolean removeSucceed = configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
×
357
    TSStatus status;
358
    if (removeSucceed) {
×
359
      status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
360
      status.setMessage("Server accepted the request");
×
361
    } else {
362
      status = new TSStatus(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode());
×
363
      status.setMessage("Server rejected the request, maybe requests are too many");
×
364
    }
365
    dataSet.setStatus(status);
×
366

367
    LOGGER.info(
×
368
        "NodeManager submit RemoveDataNodePlan finished, removeDataNodePlan: {}",
369
        removeDataNodePlan);
370
    return dataSet;
×
371
  }
372

373
  public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) {
374
    int nodeId = nodeInfo.generateNextNodeId();
×
375
    req.getConfigNodeLocation().setConfigNodeId(nodeId);
×
376
    configManager.getProcedureManager().addConfigNode(req);
×
377
    return new TConfigNodeRegisterResp()
×
378
        .setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION)
×
379
        .setConfigNodeId(nodeId);
×
380
  }
381

382
  public TSStatus updateConfigNodeIfNecessary(int configNodeId, TNodeVersionInfo versionInfo) {
383
    TNodeVersionInfo recordVersionInfo = nodeInfo.getVersionInfo(configNodeId);
×
384
    if (!recordVersionInfo.equals(versionInfo)) {
×
385
      // Update versionInfo when modified during restart
386
      UpdateVersionInfoPlan updateConfigNodePlan =
×
387
          new UpdateVersionInfoPlan(versionInfo, configNodeId);
388
      try {
389
        return getConsensusManager().write(updateConfigNodePlan);
×
390
      } catch (ConsensusException e) {
×
391
        return new TSStatus(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode());
×
392
      }
393
    }
394
    return ClusterNodeStartUtils.ACCEPT_NODE_RESTART;
×
395
  }
396

397
  /**
398
   * Get TDataNodeConfiguration.
399
   *
400
   * @param req GetDataNodeConfigurationPlan
401
   * @return The specific DataNode's configuration or all DataNodes' configuration if dataNodeId in
402
   *     GetDataNodeConfigurationPlan is -1
403
   */
404
  public DataNodeConfigurationResp getDataNodeConfiguration(GetDataNodeConfigurationPlan req) {
405
    try {
406
      return (DataNodeConfigurationResp) getConsensusManager().read(req);
×
407
    } catch (ConsensusException e) {
×
408
      LOGGER.warn("Failed in the read API executing the consensus layer due to: ", e);
×
409
      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
410
      res.setMessage(e.getMessage());
×
411
      DataNodeConfigurationResp response = new DataNodeConfigurationResp();
×
412
      response.setStatus(res);
×
413
      return response;
×
414
    }
415
  }
416

417
  /**
418
   * Only leader use this interface.
419
   *
420
   * @return The number of registered DataNodes
421
   */
422
  public int getRegisteredDataNodeCount() {
423
    return nodeInfo.getRegisteredDataNodeCount();
×
424
  }
425

426
  /**
427
   * Only leader use this interface.
428
   *
429
   * @return All registered DataNodes
430
   */
431
  public List<TDataNodeConfiguration> getRegisteredDataNodes() {
432
    return nodeInfo.getRegisteredDataNodes();
×
433
  }
434

435
  /**
436
   * Only leader use this interface.
437
   *
438
   * <p>Notice: The result will be an empty TDataNodeConfiguration if the specified DataNode doesn't
439
   * register
440
   *
441
   * @param dataNodeId The specified DataNode's index
442
   * @return The specified registered DataNode
443
   */
444
  public TDataNodeConfiguration getRegisteredDataNode(int dataNodeId) {
445
    return nodeInfo.getRegisteredDataNode(dataNodeId);
×
446
  }
447

448
  public Map<Integer, TDataNodeLocation> getRegisteredDataNodeLocations() {
449
    Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
×
450
    nodeInfo
×
451
        .getRegisteredDataNodes()
×
452
        .forEach(
×
453
            dataNodeConfiguration ->
454
                dataNodeLocations.put(
×
455
                    dataNodeConfiguration.getLocation().getDataNodeId(),
×
456
                    dataNodeConfiguration.getLocation()));
×
457
    return dataNodeLocations;
×
458
  }
459

460
  public List<TDataNodeInfo> getRegisteredDataNodeInfoList() {
461
    List<TDataNodeInfo> dataNodeInfoList = new ArrayList<>();
×
462
    List<TDataNodeConfiguration> registeredDataNodes = this.getRegisteredDataNodes();
×
463
    if (registeredDataNodes != null) {
×
464
      registeredDataNodes.forEach(
×
465
          (registeredDataNode) -> {
466
            TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
×
467
            int dataNodeId = registeredDataNode.getLocation().getDataNodeId();
×
468
            dataNodeInfo.setDataNodeId(dataNodeId);
×
469
            dataNodeInfo.setStatus(getLoadManager().getNodeStatusWithReason(dataNodeId));
×
470
            dataNodeInfo.setRpcAddresss(
×
471
                registeredDataNode.getLocation().getClientRpcEndPoint().getIp());
×
472
            dataNodeInfo.setRpcPort(
×
473
                registeredDataNode.getLocation().getClientRpcEndPoint().getPort());
×
474
            dataNodeInfo.setDataRegionNum(0);
×
475
            dataNodeInfo.setSchemaRegionNum(0);
×
476
            dataNodeInfo.setCpuCoreNum(registeredDataNode.getResource().getCpuCoreNum());
×
477
            dataNodeInfoList.add(dataNodeInfo);
×
478
          });
×
479
    }
480

481
    // Map<DataNodeId, DataRegionNum>
482
    Map<Integer, AtomicInteger> dataRegionNumMap = new HashMap<>();
×
483
    // Map<DataNodeId, SchemaRegionNum>
484
    Map<Integer, AtomicInteger> schemaRegionNumMap = new HashMap<>();
×
485
    List<TRegionReplicaSet> regionReplicaSets = getPartitionManager().getAllReplicaSets();
×
486
    regionReplicaSets.forEach(
×
487
        regionReplicaSet ->
488
            regionReplicaSet
×
489
                .getDataNodeLocations()
×
490
                .forEach(
×
491
                    dataNodeLocation -> {
492
                      switch (regionReplicaSet.getRegionId().getType()) {
×
493
                        case SchemaRegion:
494
                          schemaRegionNumMap
×
495
                              .computeIfAbsent(
×
496
                                  dataNodeLocation.getDataNodeId(), key -> new AtomicInteger())
×
497
                              .getAndIncrement();
×
498
                          break;
×
499
                        case DataRegion:
500
                        default:
501
                          dataRegionNumMap
×
502
                              .computeIfAbsent(
×
503
                                  dataNodeLocation.getDataNodeId(), key -> new AtomicInteger())
×
504
                              .getAndIncrement();
×
505
                      }
506
                    }));
×
507
    AtomicInteger zero = new AtomicInteger(0);
×
508
    dataNodeInfoList.forEach(
×
509
        (dataNodesInfo -> {
510
          dataNodesInfo.setSchemaRegionNum(
×
511
              schemaRegionNumMap.getOrDefault(dataNodesInfo.getDataNodeId(), zero).get());
×
512
          dataNodesInfo.setDataRegionNum(
×
513
              dataRegionNumMap.getOrDefault(dataNodesInfo.getDataNodeId(), zero).get());
×
514
        }));
×
515

516
    dataNodeInfoList.sort(Comparator.comparingInt(TDataNodeInfo::getDataNodeId));
×
517
    return dataNodeInfoList;
×
518
  }
519

520
  public List<TConfigNodeLocation> getRegisteredConfigNodes() {
521
    return nodeInfo.getRegisteredConfigNodes();
×
522
  }
523

524
  public Map<Integer, TNodeVersionInfo> getNodeVersionInfo() {
525
    return nodeInfo.getNodeVersionInfo();
×
526
  }
527

528
  public List<TConfigNodeInfo> getRegisteredConfigNodeInfoList() {
529
    List<TConfigNodeInfo> configNodeInfoList = new ArrayList<>();
×
530
    List<TConfigNodeLocation> registeredConfigNodes = this.getRegisteredConfigNodes();
×
531
    if (registeredConfigNodes != null) {
×
532
      registeredConfigNodes.forEach(
×
533
          (configNodeLocation) -> {
534
            TConfigNodeInfo info = new TConfigNodeInfo();
×
535
            int configNodeId = configNodeLocation.getConfigNodeId();
×
536
            info.setConfigNodeId(configNodeId);
×
537
            info.setStatus(getLoadManager().getNodeStatusWithReason(configNodeId));
×
538
            info.setInternalAddress(configNodeLocation.getInternalEndPoint().getIp());
×
539
            info.setInternalPort(configNodeLocation.getInternalEndPoint().getPort());
×
540
            info.setRoleType(
×
541
                configNodeLocation.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID
×
542
                    ? RegionRoleType.Leader.name()
×
543
                    : RegionRoleType.Follower.name());
×
544
            configNodeInfoList.add(info);
×
545
          });
×
546
    }
547
    configNodeInfoList.sort(Comparator.comparingInt(TConfigNodeInfo::getConfigNodeId));
×
548
    return configNodeInfoList;
×
549
  }
550

551
  /**
552
   * Only leader use this interface, record the new ConfigNode's information.
553
   *
554
   * @param configNodeLocation The new ConfigNode.
555
   * @param versionInfo The new ConfigNode's versionInfo.
556
   */
557
  public void applyConfigNode(
558
      TConfigNodeLocation configNodeLocation, TNodeVersionInfo versionInfo) {
559
    ApplyConfigNodePlan applyConfigNodePlan = new ApplyConfigNodePlan(configNodeLocation);
×
560
    try {
561
      getConsensusManager().write(applyConfigNodePlan);
×
562
    } catch (ConsensusException e) {
×
563
      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
×
564
    }
×
565
    UpdateVersionInfoPlan updateVersionInfoPlan =
×
566
        new UpdateVersionInfoPlan(versionInfo, configNodeLocation.getConfigNodeId());
×
567
    try {
568
      getConsensusManager().write(updateVersionInfoPlan);
×
569
    } catch (ConsensusException e) {
×
570
      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
×
571
    }
×
572
  }
×
573

574
  /**
575
   * Only leader use this interface, check the ConfigNode before remove it.
576
   *
577
   * @param removeConfigNodePlan RemoveConfigNodePlan
578
   */
579
  public TSStatus checkConfigNodeBeforeRemove(RemoveConfigNodePlan removeConfigNodePlan) {
580
    removeConfigNodeLock.lock();
×
581
    try {
582
      // Check OnlineConfigNodes number
583
      if (filterConfigNodeThroughStatus(NodeStatus.Running).size() <= 1) {
×
584
        return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
×
585
            .setMessage(
×
586
                "Remove ConfigNode failed because there is only one ConfigNode in current Cluster.");
587
      }
588

589
      // Check whether the registeredConfigNodes contain the ConfigNode to be removed.
590
      if (!getRegisteredConfigNodes().contains(removeConfigNodePlan.getConfigNodeLocation())) {
×
591
        return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
×
592
            .setMessage("Remove ConfigNode failed because the ConfigNode not in current Cluster.");
×
593
      }
594

595
      // Check whether the remove ConfigNode is leader
596
      TConfigNodeLocation leader = getConsensusManager().getLeader();
×
597
      if (leader == null) {
×
598
        return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
×
599
            .setMessage(
×
600
                "Remove ConfigNode failed because the ConfigNodeGroup is on leader election, please retry.");
601
      }
602

603
      if (leader
×
604
          .getInternalEndPoint()
×
605
          .equals(removeConfigNodePlan.getConfigNodeLocation().getInternalEndPoint())) {
×
606
        // transfer leader
607
        return transferLeader(removeConfigNodePlan, getConsensusManager().getConsensusGroupId());
×
608
      }
609

610
    } finally {
611
      removeConfigNodeLock.unlock();
×
612
    }
613

614
    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
×
615
        .setMessage("Successfully remove confignode.");
×
616
  }
617

618
  private TSStatus transferLeader(
619
      RemoveConfigNodePlan removeConfigNodePlan, ConsensusGroupId groupId) {
620
    Optional<TConfigNodeLocation> optional =
×
621
        filterConfigNodeThroughStatus(NodeStatus.Running).stream()
×
622
            .filter(e -> !e.equals(removeConfigNodePlan.getConfigNodeLocation()))
×
623
            .findAny();
×
624
    TConfigNodeLocation newLeader = null;
×
625
    if (optional.isPresent()) {
×
626
      newLeader = optional.get();
×
627
    } else {
628
      return new TSStatus(TSStatusCode.TRANSFER_LEADER_ERROR.getStatusCode())
×
629
          .setMessage(
×
630
              "Transfer ConfigNode leader failed because can not find any running ConfigNode.");
631
    }
632
    try {
633
      getConsensusManager()
×
634
          .getConsensusImpl()
×
635
          .transferLeader(
×
636
              groupId,
637
              new Peer(groupId, newLeader.getConfigNodeId(), newLeader.getConsensusEndPoint()));
×
638
    } catch (ConsensusException e) {
×
639
      return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
×
640
          .setMessage("Remove ConfigNode failed because transfer ConfigNode leader failed.");
×
641
    }
×
642
    return new TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())
×
643
        .setRedirectNode(newLeader.getInternalEndPoint())
×
644
        .setMessage(
×
645
            "The ConfigNode to be removed is leader, already transfer Leader to "
646
                + newLeader
647
                + ".");
648
  }
649

650
  public List<TSStatus> merge() {
651
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
652
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
653
    AsyncClientHandler<Object, TSStatus> clientHandler =
×
654
        new AsyncClientHandler<>(DataNodeRequestType.MERGE, dataNodeLocationMap);
655
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
656
    return clientHandler.getResponseList();
×
657
  }
658

659
  public List<TSStatus> flush(TFlushReq req) {
660
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
661
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
662
    AsyncClientHandler<TFlushReq, TSStatus> clientHandler =
×
663
        new AsyncClientHandler<>(DataNodeRequestType.FLUSH, req, dataNodeLocationMap);
664
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
665
    return clientHandler.getResponseList();
×
666
  }
667

668
  public List<TSStatus> clearCache() {
669
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
670
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
671
    AsyncClientHandler<Object, TSStatus> clientHandler =
×
672
        new AsyncClientHandler<>(DataNodeRequestType.CLEAR_CACHE, dataNodeLocationMap);
673
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
674
    return clientHandler.getResponseList();
×
675
  }
676

677
  public List<TSStatus> loadConfiguration() {
678
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
679
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
680
    AsyncClientHandler<Object, TSStatus> clientHandler =
×
681
        new AsyncClientHandler<>(DataNodeRequestType.LOAD_CONFIGURATION, dataNodeLocationMap);
682
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
683
    return clientHandler.getResponseList();
×
684
  }
685

686
  public List<TSStatus> setSystemStatus(String status) {
687
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
688
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
689
    AsyncClientHandler<String, TSStatus> clientHandler =
×
690
        new AsyncClientHandler<>(
691
            DataNodeRequestType.SET_SYSTEM_STATUS, status, dataNodeLocationMap);
692
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
693
    return clientHandler.getResponseList();
×
694
  }
695

696
  public TSStatus setDataNodeStatus(TSetDataNodeStatusReq setDataNodeStatusReq) {
697
    return SyncDataNodeClientPool.getInstance()
×
698
        .sendSyncRequestToDataNodeWithRetry(
×
699
            setDataNodeStatusReq.getTargetDataNode().getInternalEndPoint(),
×
700
            setDataNodeStatusReq.getStatus(),
×
701
            DataNodeRequestType.SET_SYSTEM_STATUS);
702
  }
703

704
  /**
705
   * Kill read on DataNode.
706
   *
707
   * @param queryId the id of specific read need to be killed, it will be NULL if kill all queries
708
   * @param dataNodeId the DataNode obtains target read, -1 means we will kill all queries on all
709
   *     DataNodes
710
   */
711
  public TSStatus killQuery(String queryId, int dataNodeId) {
712
    if (dataNodeId < 0) {
×
713
      return killAllQueries();
×
714
    } else {
715
      return killSpecificQuery(queryId, getRegisteredDataNodeLocations().get(dataNodeId));
×
716
    }
717
  }
718

719
  private TSStatus killAllQueries() {
720
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
721
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
722
    AsyncClientHandler<String, TSStatus> clientHandler =
×
723
        new AsyncClientHandler<>(DataNodeRequestType.KILL_QUERY_INSTANCE, dataNodeLocationMap);
724
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
725
    return RpcUtils.squashResponseStatusList(clientHandler.getResponseList());
×
726
  }
727

728
  private TSStatus killSpecificQuery(String queryId, TDataNodeLocation dataNodeLocation) {
729
    if (dataNodeLocation == null) {
×
730
      return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
×
731
          .setMessage(
×
732
              "The target DataNode is not existed, please ensure your input <queryId> is correct");
733
    } else {
734
      return SyncDataNodeClientPool.getInstance()
×
735
          .sendSyncRequestToDataNodeWithRetry(
×
736
              dataNodeLocation.getInternalEndPoint(),
×
737
              queryId,
738
              DataNodeRequestType.KILL_QUERY_INSTANCE);
739
    }
740
  }
741

742
  /**
743
   * Filter ConfigNodes through the specified NodeStatus.
744
   *
745
   * @param status The specified NodeStatus
746
   * @return Filtered ConfigNodes with the specified NodeStatus
747
   */
748
  public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus... status) {
749
    return nodeInfo.getRegisteredConfigNodes(
×
750
        getLoadManager().filterConfigNodeThroughStatus(status));
×
751
  }
752

753
  /**
754
   * Filter DataNodes through the specified NodeStatus.
755
   *
756
   * @param status The specified NodeStatus
757
   * @return Filtered DataNodes with the specified NodeStatus
758
   */
759
  public List<TDataNodeConfiguration> filterDataNodeThroughStatus(NodeStatus... status) {
760
    return nodeInfo.getRegisteredDataNodes(getLoadManager().filterDataNodeThroughStatus(status));
×
761
  }
762

763
  /**
764
   * Get the DataNodeLocation of the DataNode which has the lowest loadScore.
765
   *
766
   * @return TDataNodeLocation with the lowest loadScore
767
   */
768
  public Optional<TDataNodeLocation> getLowestLoadDataNode() {
769
    // TODO get real lowest load data node after scoring algorithm being implemented
770
    int dataNodeId = getLoadManager().getLowestLoadDataNode();
×
771
    return dataNodeId < 0
×
772
        ? Optional.empty()
×
773
        : Optional.of(getRegisteredDataNode(dataNodeId).getLocation());
×
774
  }
775

776
  /**
777
   * Get the DataNodeLocation which has the lowest loadScore within input.
778
   *
779
   * @return TDataNodeLocation with the lowest loadScore
780
   */
781
  public TDataNodeLocation getLowestLoadDataNode(Set<Integer> nodes) {
782
    int dataNodeId = getLoadManager().getLowestLoadDataNode(new ArrayList<>(nodes));
×
783
    return getRegisteredDataNode(dataNodeId).getLocation();
×
784
  }
785

786
  private ConsensusManager getConsensusManager() {
787
    return configManager.getConsensusManager();
×
788
  }
789

790
  private ClusterSchemaManager getClusterSchemaManager() {
791
    return configManager.getClusterSchemaManager();
×
792
  }
793

794
  private PartitionManager getPartitionManager() {
795
    return configManager.getPartitionManager();
×
796
  }
797

798
  private LoadManager getLoadManager() {
799
    return configManager.getLoadManager();
×
800
  }
801

802
  private TriggerManager getTriggerManager() {
803
    return configManager.getTriggerManager();
×
804
  }
805

806
  private PipeManager getPipeManager() {
807
    return configManager.getPipeManager();
×
808
  }
809

810
  private UDFManager getUDFManager() {
811
    return configManager.getUDFManager();
×
812
  }
813
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc