• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9905

23 Aug 2023 06:20AM UTC coverage: 47.785% (-0.1%) from 47.922%
#9905

push

travis_ci

web-flow
[To rel/1.2][Metric] Fix flush point statistics (#10934)

23 of 23 new or added lines in 1 file covered. (100.0%)

79851 of 167106 relevant lines covered (47.78%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager.node;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
24
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
25
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
26
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
27
import org.apache.iotdb.common.rpc.thrift.TSStatus;
28
import org.apache.iotdb.commons.cluster.NodeStatus;
29
import org.apache.iotdb.commons.cluster.RegionRoleType;
30
import org.apache.iotdb.commons.conf.CommonConfig;
31
import org.apache.iotdb.commons.conf.CommonDescriptor;
32
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
33
import org.apache.iotdb.commons.service.metric.MetricService;
34
import org.apache.iotdb.confignode.client.DataNodeRequestType;
35
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
36
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
37
import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
38
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
39
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
40
import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
41
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
42
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
43
import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateVersionInfoPlan;
44
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
45
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
46
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
47
import org.apache.iotdb.confignode.consensus.response.datanode.ConfigurationResp;
48
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
49
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
50
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
51
import org.apache.iotdb.confignode.manager.ConfigManager;
52
import org.apache.iotdb.confignode.manager.IManager;
53
import org.apache.iotdb.confignode.manager.TriggerManager;
54
import org.apache.iotdb.confignode.manager.UDFManager;
55
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
56
import org.apache.iotdb.confignode.manager.load.LoadManager;
57
import org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache;
58
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
59
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
60
import org.apache.iotdb.confignode.manager.pipe.PipeManager;
61
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
62
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
63
import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
64
import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
65
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
66
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
67
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
68
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
69
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
70
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
71
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
72
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
73
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
74
import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
75
import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
76
import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
77
import org.apache.iotdb.consensus.common.DataSet;
78
import org.apache.iotdb.consensus.common.Peer;
79
import org.apache.iotdb.consensus.exception.ConsensusException;
80
import org.apache.iotdb.rpc.RpcUtils;
81
import org.apache.iotdb.rpc.TSStatusCode;
82

83
import org.slf4j.Logger;
84
import org.slf4j.LoggerFactory;
85

86
import java.util.ArrayList;
87
import java.util.Comparator;
88
import java.util.HashMap;
89
import java.util.List;
90
import java.util.Map;
91
import java.util.Optional;
92
import java.util.Set;
93
import java.util.concurrent.ConcurrentHashMap;
94
import java.util.concurrent.atomic.AtomicInteger;
95
import java.util.concurrent.locks.ReentrantLock;
96

97
/** {@link NodeManager} manages cluster node addition and removal requests. */
98
public class NodeManager {
99

100
  private static final Logger LOGGER = LoggerFactory.getLogger(NodeManager.class);
×
101

102
  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
×
103
  public static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatIntervalInMs();
×
104

105
  private final IManager configManager;
106
  private final NodeInfo nodeInfo;
107

108
  private final ReentrantLock removeConfigNodeLock;
109

110
  private static final String CONSENSUS_WRITE_ERROR =
111
      "Failed in the write API executing the consensus layer due to: ";
112

113
  /**
   * Creates a NodeManager.
   *
   * @param configManager the manager used to reach the other cluster managers
   * @param nodeInfo the node information store this manager reads from and writes through
   */
  public NodeManager(IManager configManager, NodeInfo nodeInfo) {
    this.removeConfigNodeLock = new ReentrantLock();
    this.nodeInfo = nodeInfo;
    this.configManager = configManager;
  }
118

119
  /**
120
   * Get system configurations.
121
   *
122
   * @return ConfigurationResp. The TSStatus will be set to SUCCESS_STATUS.
123
   */
124
  public DataSet getSystemConfiguration() {
125
    ConfigurationResp dataSet = new ConfigurationResp();
×
126
    dataSet.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
×
127
    setGlobalConfig(dataSet);
×
128
    setRatisConfig(dataSet);
×
129
    setCQConfig(dataSet);
×
130
    return dataSet;
×
131
  }
132

133
  private void setGlobalConfig(ConfigurationResp dataSet) {
134
    // Set TGlobalConfig
135
    final ConfigNodeConfig configNodeConfig = ConfigNodeDescriptor.getInstance().getConf();
×
136
    final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
×
137
    TGlobalConfig globalConfig = new TGlobalConfig();
×
138
    globalConfig.setDataRegionConsensusProtocolClass(
×
139
        configNodeConfig.getDataRegionConsensusProtocolClass());
×
140
    globalConfig.setSchemaRegionConsensusProtocolClass(
×
141
        configNodeConfig.getSchemaRegionConsensusProtocolClass());
×
142
    globalConfig.setSeriesPartitionSlotNum(configNodeConfig.getSeriesSlotNum());
×
143
    globalConfig.setSeriesPartitionExecutorClass(
×
144
        configNodeConfig.getSeriesPartitionExecutorClass());
×
145
    globalConfig.setTimePartitionInterval(commonConfig.getTimePartitionInterval());
×
146
    globalConfig.setReadConsistencyLevel(configNodeConfig.getReadConsistencyLevel());
×
147
    globalConfig.setDiskSpaceWarningThreshold(commonConfig.getDiskSpaceWarningThreshold());
×
148
    globalConfig.setTimestampPrecision(commonConfig.getTimestampPrecision());
×
149
    globalConfig.setSchemaEngineMode(commonConfig.getSchemaEngineMode());
×
150
    globalConfig.setTagAttributeTotalSize(commonConfig.getTagAttributeTotalSize());
×
151
    dataSet.setGlobalConfig(globalConfig);
×
152
  }
×
153

154
  private void setRatisConfig(ConfigurationResp dataSet) {
155
    ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
×
156
    TRatisConfig ratisConfig = new TRatisConfig();
×
157

158
    ratisConfig.setDataAppenderBufferSize(conf.getDataRegionRatisConsensusLogAppenderBufferSize());
×
159
    ratisConfig.setSchemaAppenderBufferSize(
×
160
        conf.getSchemaRegionRatisConsensusLogAppenderBufferSize());
×
161

162
    ratisConfig.setDataSnapshotTriggerThreshold(conf.getDataRegionRatisSnapshotTriggerThreshold());
×
163
    ratisConfig.setSchemaSnapshotTriggerThreshold(
×
164
        conf.getSchemaRegionRatisSnapshotTriggerThreshold());
×
165

166
    ratisConfig.setDataLogUnsafeFlushEnable(conf.isDataRegionRatisLogUnsafeFlushEnable());
×
167
    ratisConfig.setSchemaLogUnsafeFlushEnable(conf.isSchemaRegionRatisLogUnsafeFlushEnable());
×
168
    ratisConfig.setDataRegionLogForceSyncNum(conf.getDataRegionRatisLogForceSyncNum());
×
169

170
    ratisConfig.setDataLogSegmentSizeMax(conf.getDataRegionRatisLogSegmentSizeMax());
×
171
    ratisConfig.setSchemaLogSegmentSizeMax(conf.getSchemaRegionRatisLogSegmentSizeMax());
×
172

173
    ratisConfig.setDataGrpcFlowControlWindow(conf.getDataRegionRatisGrpcFlowControlWindow());
×
174
    ratisConfig.setSchemaGrpcFlowControlWindow(conf.getSchemaRegionRatisGrpcFlowControlWindow());
×
175
    ratisConfig.setDataRegionGrpcLeaderOutstandingAppendsMax(
×
176
        conf.getDataRegionRatisGrpcLeaderOutstandingAppendsMax());
×
177

178
    ratisConfig.setDataLeaderElectionTimeoutMin(
×
179
        conf.getDataRegionRatisRpcLeaderElectionTimeoutMinMs());
×
180
    ratisConfig.setSchemaLeaderElectionTimeoutMin(
×
181
        conf.getSchemaRegionRatisRpcLeaderElectionTimeoutMinMs());
×
182

183
    ratisConfig.setDataLeaderElectionTimeoutMax(
×
184
        conf.getDataRegionRatisRpcLeaderElectionTimeoutMaxMs());
×
185
    ratisConfig.setSchemaLeaderElectionTimeoutMax(
×
186
        conf.getSchemaRegionRatisRpcLeaderElectionTimeoutMaxMs());
×
187

188
    ratisConfig.setDataRequestTimeout(conf.getDataRegionRatisRequestTimeoutMs());
×
189
    ratisConfig.setSchemaRequestTimeout(conf.getSchemaRegionRatisRequestTimeoutMs());
×
190

191
    ratisConfig.setDataMaxRetryAttempts(conf.getDataRegionRatisMaxRetryAttempts());
×
192
    ratisConfig.setDataInitialSleepTime(conf.getDataRegionRatisInitialSleepTimeMs());
×
193
    ratisConfig.setDataMaxSleepTime(conf.getDataRegionRatisMaxSleepTimeMs());
×
194
    ratisConfig.setSchemaMaxRetryAttempts(conf.getSchemaRegionRatisMaxRetryAttempts());
×
195
    ratisConfig.setSchemaInitialSleepTime(conf.getSchemaRegionRatisInitialSleepTimeMs());
×
196
    ratisConfig.setSchemaMaxSleepTime(conf.getSchemaRegionRatisMaxSleepTimeMs());
×
197

198
    ratisConfig.setSchemaPreserveWhenPurge(conf.getSchemaRegionRatisPreserveLogsWhenPurge());
×
199
    ratisConfig.setDataPreserveWhenPurge(conf.getDataRegionRatisPreserveLogsWhenPurge());
×
200

201
    ratisConfig.setFirstElectionTimeoutMin(conf.getRatisFirstElectionTimeoutMinMs());
×
202
    ratisConfig.setFirstElectionTimeoutMax(conf.getRatisFirstElectionTimeoutMaxMs());
×
203

204
    ratisConfig.setSchemaRegionRatisLogMax(conf.getSchemaRegionRatisLogMax());
×
205
    ratisConfig.setDataRegionRatisLogMax(conf.getDataRegionRatisLogMax());
×
206

207
    dataSet.setRatisConfig(ratisConfig);
×
208
  }
×
209

210
  private void setCQConfig(ConfigurationResp dataSet) {
211
    final ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
×
212
    TCQConfig cqConfig = new TCQConfig();
×
213
    cqConfig.setCqMinEveryIntervalInMs(conf.getCqMinEveryIntervalInMs());
×
214

215
    dataSet.setCqConfig(cqConfig);
×
216
  }
×
217

218
  private TRuntimeConfiguration getRuntimeConfiguration() {
219
    getPipeManager().getPipePluginCoordinator().lock();
×
220
    getTriggerManager().getTriggerInfo().acquireTriggerTableLock();
×
221
    getUDFManager().getUdfInfo().acquireUDFTableLock();
×
222
    try {
223
      final TRuntimeConfiguration runtimeConfiguration = new TRuntimeConfiguration();
×
224
      runtimeConfiguration.setTemplateInfo(getClusterSchemaManager().getAllTemplateSetInfo());
×
225
      runtimeConfiguration.setAllTriggerInformation(
×
226
          getTriggerManager().getTriggerTable(false).getAllTriggerInformation());
×
227
      runtimeConfiguration.setAllUDFInformation(
×
228
          getUDFManager().getUDFTable().getAllUDFInformation());
×
229
      runtimeConfiguration.setAllPipeInformation(
×
230
          getPipeManager().getPipePluginCoordinator().getPipePluginTable().getAllPipePluginMeta());
×
231
      runtimeConfiguration.setAllTTLInformation(
×
232
          DataNodeRegisterResp.convertAllTTLInformation(getClusterSchemaManager().getAllTTLInfo()));
×
233
      return runtimeConfiguration;
×
234
    } finally {
235
      getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
×
236
      getUDFManager().getUdfInfo().releaseUDFTableLock();
×
237
      getPipeManager().getPipePluginCoordinator().unlock();
×
238
    }
239
  }
240

241
  /**
242
   * Register DataNode.
243
   *
244
   * @param req TDataNodeRegisterReq
245
   * @return DataNodeConfigurationDataSet. The {@link TSStatus} will be set to {@link
246
   *     TSStatusCode#SUCCESS_STATUS} when register success.
247
   */
248
  public DataSet registerDataNode(TDataNodeRegisterReq req) {
249
    int dataNodeId = nodeInfo.generateNextNodeId();
×
250

251
    RegisterDataNodePlan registerDataNodePlan =
×
252
        new RegisterDataNodePlan(req.getDataNodeConfiguration());
×
253
    // Register new DataNode
254
    registerDataNodePlan.getDataNodeConfiguration().getLocation().setDataNodeId(dataNodeId);
×
255
    try {
256
      getConsensusManager().write(registerDataNodePlan);
×
257
    } catch (ConsensusException e) {
×
258
      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
×
259
    }
×
260

261
    // update datanode's versionInfo
262
    UpdateVersionInfoPlan updateVersionInfoPlan =
×
263
        new UpdateVersionInfoPlan(req.getVersionInfo(), dataNodeId);
×
264
    try {
265
      getConsensusManager().write(updateVersionInfoPlan);
×
266
    } catch (ConsensusException e) {
×
267
      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
×
268
    }
×
269

270
    // Bind DataNode metrics
271
    PartitionMetrics.bindDataNodePartitionMetrics(
×
272
        MetricService.getInstance(), configManager, dataNodeId);
×
273

274
    // Adjust the maximum RegionGroup number of each StorageGroup
275
    getClusterSchemaManager().adjustMaxRegionGroupNum();
×
276

277
    DataNodeRegisterResp resp = new DataNodeRegisterResp();
×
278

279
    resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION);
×
280
    resp.setConfigNodeList(getRegisteredConfigNodes());
×
281
    resp.setDataNodeId(
×
282
        registerDataNodePlan.getDataNodeConfiguration().getLocation().getDataNodeId());
×
283
    resp.setRuntimeConfiguration(getRuntimeConfiguration());
×
284
    return resp;
×
285
  }
286

287
  public TDataNodeRestartResp updateDataNodeIfNecessary(TDataNodeRestartReq req) {
288
    int nodeId = req.getDataNodeConfiguration().getLocation().getDataNodeId();
×
289
    TDataNodeConfiguration dataNodeConfiguration = getRegisteredDataNode(nodeId);
×
290
    if (!req.getDataNodeConfiguration().equals(dataNodeConfiguration)) {
×
291
      // Update DataNodeConfiguration when modified during restart
292
      UpdateDataNodePlan updateDataNodePlan =
×
293
          new UpdateDataNodePlan(req.getDataNodeConfiguration());
×
294
      try {
295
        getConsensusManager().write(updateDataNodePlan);
×
296
      } catch (ConsensusException e) {
×
297
        LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
×
298
      }
×
299
    }
300
    TNodeVersionInfo versionInfo = nodeInfo.getVersionInfo(nodeId);
×
301
    if (!req.getVersionInfo().equals(versionInfo)) {
×
302
      // Update versionInfo when modified during restart
303
      UpdateVersionInfoPlan updateVersionInfoPlan =
×
304
          new UpdateVersionInfoPlan(req.getVersionInfo(), nodeId);
×
305
      try {
306
        getConsensusManager().write(updateVersionInfoPlan);
×
307
      } catch (ConsensusException e) {
×
308
        LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
×
309
      }
×
310
    }
311

312
    TDataNodeRestartResp resp = new TDataNodeRestartResp();
×
313
    resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
×
314
    resp.setConfigNodeList(getRegisteredConfigNodes());
×
315
    resp.setRuntimeConfiguration(getRuntimeConfiguration());
×
316
    return resp;
×
317
  }
318

319
  /**
320
   * Remove DataNodes.
321
   *
322
   * @param removeDataNodePlan removeDataNodePlan
323
   * @return DataNodeToStatusResp, The TSStatus will be SUCCEED_STATUS if the request is accepted,
324
   *     DATANODE_NOT_EXIST when some datanode does not exist.
325
   */
326
  public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
327
    LOGGER.info("NodeManager start to remove DataNode {}", removeDataNodePlan);
×
328

329
    DataNodeRemoveHandler dataNodeRemoveHandler =
×
330
        new DataNodeRemoveHandler((ConfigManager) configManager);
331
    DataNodeToStatusResp preCheckStatus =
×
332
        dataNodeRemoveHandler.checkRemoveDataNodeRequest(removeDataNodePlan);
×
333
    if (preCheckStatus.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
334
      LOGGER.error(
×
335
          "The remove DataNode request check failed. req: {}, check result: {}",
336
          removeDataNodePlan,
337
          preCheckStatus.getStatus());
×
338
      return preCheckStatus;
×
339
    }
340

341
    // Do transfer of the DataNodes before remove
342
    DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
×
343
    if (configManager.transfer(removeDataNodePlan.getDataNodeLocations()).getCode()
×
344
        != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
345
      dataSet.setStatus(
×
346
          new TSStatus(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode())
×
347
              .setMessage("Fail to do transfer of the DataNodes"));
×
348
      return dataSet;
×
349
    }
350

351
    // Add request to queue, then return to client
352
    boolean removeSucceed = configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
×
353
    TSStatus status;
354
    if (removeSucceed) {
×
355
      status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
356
      status.setMessage("Server accepted the request");
×
357
    } else {
358
      status = new TSStatus(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode());
×
359
      status.setMessage("Server rejected the request, maybe requests are too many");
×
360
    }
361
    dataSet.setStatus(status);
×
362

363
    LOGGER.info(
×
364
        "NodeManager submit RemoveDataNodePlan finished, removeDataNodePlan: {}",
365
        removeDataNodePlan);
366
    return dataSet;
×
367
  }
368

369
  public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) {
370
    int nodeId = nodeInfo.generateNextNodeId();
×
371
    req.getConfigNodeLocation().setConfigNodeId(nodeId);
×
372
    configManager.getProcedureManager().addConfigNode(req);
×
373
    return new TConfigNodeRegisterResp()
×
374
        .setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION)
×
375
        .setConfigNodeId(nodeId);
×
376
  }
377

378
  public TSStatus updateConfigNodeIfNecessary(int configNodeId, TNodeVersionInfo versionInfo) {
379
    TNodeVersionInfo recordVersionInfo = nodeInfo.getVersionInfo(configNodeId);
×
380
    if (!recordVersionInfo.equals(versionInfo)) {
×
381
      // Update versionInfo when modified during restart
382
      UpdateVersionInfoPlan updateConfigNodePlan =
×
383
          new UpdateVersionInfoPlan(versionInfo, configNodeId);
384
      try {
385
        return getConsensusManager().write(updateConfigNodePlan);
×
386
      } catch (ConsensusException e) {
×
387
        return new TSStatus(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode());
×
388
      }
389
    }
390
    return ClusterNodeStartUtils.ACCEPT_NODE_RESTART;
×
391
  }
392

393
  /**
394
   * Get TDataNodeConfiguration.
395
   *
396
   * @param req GetDataNodeConfigurationPlan
397
   * @return The specific DataNode's configuration or all DataNodes' configuration if dataNodeId in
398
   *     GetDataNodeConfigurationPlan is -1
399
   */
400
  public DataNodeConfigurationResp getDataNodeConfiguration(GetDataNodeConfigurationPlan req) {
401
    try {
402
      return (DataNodeConfigurationResp) getConsensusManager().read(req);
×
403
    } catch (ConsensusException e) {
×
404
      LOGGER.warn("Failed in the read API executing the consensus layer due to: ", e);
×
405
      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
406
      res.setMessage(e.getMessage());
×
407
      DataNodeConfigurationResp response = new DataNodeConfigurationResp();
×
408
      response.setStatus(res);
×
409
      return response;
×
410
    }
411
  }
412

413
  /**
414
   * Only leader use this interface.
415
   *
416
   * @return The number of registered DataNodes
417
   */
418
  public int getRegisteredDataNodeCount() {
419
    return nodeInfo.getRegisteredDataNodeCount();
×
420
  }
421

422
  /**
423
   * Only leader use this interface.
424
   *
425
   * @return All registered DataNodes
426
   */
427
  public List<TDataNodeConfiguration> getRegisteredDataNodes() {
428
    return nodeInfo.getRegisteredDataNodes();
×
429
  }
430

431
  /**
432
   * Only leader use this interface.
433
   *
434
   * <p>Notice: The result will be an empty TDataNodeConfiguration if the specified DataNode doesn't
435
   * register
436
   *
437
   * @param dataNodeId The specified DataNode's index
438
   * @return The specified registered DataNode
439
   */
440
  public TDataNodeConfiguration getRegisteredDataNode(int dataNodeId) {
441
    return nodeInfo.getRegisteredDataNode(dataNodeId);
×
442
  }
443

444
  public Map<Integer, TDataNodeLocation> getRegisteredDataNodeLocations() {
445
    Map<Integer, TDataNodeLocation> dataNodeLocations = new ConcurrentHashMap<>();
×
446
    nodeInfo
×
447
        .getRegisteredDataNodes()
×
448
        .forEach(
×
449
            dataNodeConfiguration ->
450
                dataNodeLocations.put(
×
451
                    dataNodeConfiguration.getLocation().getDataNodeId(),
×
452
                    dataNodeConfiguration.getLocation()));
×
453
    return dataNodeLocations;
×
454
  }
455

456
  public List<TDataNodeInfo> getRegisteredDataNodeInfoList() {
457
    List<TDataNodeInfo> dataNodeInfoList = new ArrayList<>();
×
458
    List<TDataNodeConfiguration> registeredDataNodes = this.getRegisteredDataNodes();
×
459
    if (registeredDataNodes != null) {
×
460
      registeredDataNodes.forEach(
×
461
          (registeredDataNode) -> {
462
            TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
×
463
            int dataNodeId = registeredDataNode.getLocation().getDataNodeId();
×
464
            dataNodeInfo.setDataNodeId(dataNodeId);
×
465
            dataNodeInfo.setStatus(getLoadManager().getNodeStatusWithReason(dataNodeId));
×
466
            dataNodeInfo.setRpcAddresss(
×
467
                registeredDataNode.getLocation().getClientRpcEndPoint().getIp());
×
468
            dataNodeInfo.setRpcPort(
×
469
                registeredDataNode.getLocation().getClientRpcEndPoint().getPort());
×
470
            dataNodeInfo.setDataRegionNum(0);
×
471
            dataNodeInfo.setSchemaRegionNum(0);
×
472
            dataNodeInfo.setCpuCoreNum(registeredDataNode.getResource().getCpuCoreNum());
×
473
            dataNodeInfoList.add(dataNodeInfo);
×
474
          });
×
475
    }
476

477
    // Map<DataNodeId, DataRegionNum>
478
    Map<Integer, AtomicInteger> dataRegionNumMap = new HashMap<>();
×
479
    // Map<DataNodeId, SchemaRegionNum>
480
    Map<Integer, AtomicInteger> schemaRegionNumMap = new HashMap<>();
×
481
    List<TRegionReplicaSet> regionReplicaSets = getPartitionManager().getAllReplicaSets();
×
482
    regionReplicaSets.forEach(
×
483
        regionReplicaSet ->
484
            regionReplicaSet
×
485
                .getDataNodeLocations()
×
486
                .forEach(
×
487
                    dataNodeLocation -> {
488
                      switch (regionReplicaSet.getRegionId().getType()) {
×
489
                        case SchemaRegion:
490
                          schemaRegionNumMap
×
491
                              .computeIfAbsent(
×
492
                                  dataNodeLocation.getDataNodeId(), key -> new AtomicInteger())
×
493
                              .getAndIncrement();
×
494
                          break;
×
495
                        case DataRegion:
496
                        default:
497
                          dataRegionNumMap
×
498
                              .computeIfAbsent(
×
499
                                  dataNodeLocation.getDataNodeId(), key -> new AtomicInteger())
×
500
                              .getAndIncrement();
×
501
                      }
502
                    }));
×
503
    AtomicInteger zero = new AtomicInteger(0);
×
504
    dataNodeInfoList.forEach(
×
505
        (dataNodesInfo -> {
506
          dataNodesInfo.setSchemaRegionNum(
×
507
              schemaRegionNumMap.getOrDefault(dataNodesInfo.getDataNodeId(), zero).get());
×
508
          dataNodesInfo.setDataRegionNum(
×
509
              dataRegionNumMap.getOrDefault(dataNodesInfo.getDataNodeId(), zero).get());
×
510
        }));
×
511

512
    dataNodeInfoList.sort(Comparator.comparingInt(TDataNodeInfo::getDataNodeId));
×
513
    return dataNodeInfoList;
×
514
  }
515

516
  public List<TConfigNodeLocation> getRegisteredConfigNodes() {
517
    return nodeInfo.getRegisteredConfigNodes();
×
518
  }
519

520
  public Map<Integer, TNodeVersionInfo> getNodeVersionInfo() {
521
    return nodeInfo.getNodeVersionInfo();
×
522
  }
523

524
  public List<TConfigNodeInfo> getRegisteredConfigNodeInfoList() {
525
    List<TConfigNodeInfo> configNodeInfoList = new ArrayList<>();
×
526
    List<TConfigNodeLocation> registeredConfigNodes = this.getRegisteredConfigNodes();
×
527
    if (registeredConfigNodes != null) {
×
528
      registeredConfigNodes.forEach(
×
529
          (configNodeLocation) -> {
530
            TConfigNodeInfo info = new TConfigNodeInfo();
×
531
            int configNodeId = configNodeLocation.getConfigNodeId();
×
532
            info.setConfigNodeId(configNodeId);
×
533
            info.setStatus(getLoadManager().getNodeStatusWithReason(configNodeId));
×
534
            info.setInternalAddress(configNodeLocation.getInternalEndPoint().getIp());
×
535
            info.setInternalPort(configNodeLocation.getInternalEndPoint().getPort());
×
536
            info.setRoleType(
×
537
                configNodeLocation.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID
×
538
                    ? RegionRoleType.Leader.name()
×
539
                    : RegionRoleType.Follower.name());
×
540
            configNodeInfoList.add(info);
×
541
          });
×
542
    }
543
    configNodeInfoList.sort(Comparator.comparingInt(TConfigNodeInfo::getConfigNodeId));
×
544
    return configNodeInfoList;
×
545
  }
546

547
  /**
548
   * Only leader use this interface, record the new ConfigNode's information.
549
   *
550
   * @param configNodeLocation The new ConfigNode.
551
   * @param versionInfo The new ConfigNode's versionInfo.
552
   */
553
  public void applyConfigNode(
554
      TConfigNodeLocation configNodeLocation, TNodeVersionInfo versionInfo) {
555
    ApplyConfigNodePlan applyConfigNodePlan = new ApplyConfigNodePlan(configNodeLocation);
×
556
    try {
557
      getConsensusManager().write(applyConfigNodePlan);
×
558
    } catch (ConsensusException e) {
×
559
      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
×
560
    }
×
561
    UpdateVersionInfoPlan updateVersionInfoPlan =
×
562
        new UpdateVersionInfoPlan(versionInfo, configNodeLocation.getConfigNodeId());
×
563
    try {
564
      getConsensusManager().write(updateVersionInfoPlan);
×
565
    } catch (ConsensusException e) {
×
566
      LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
×
567
    }
×
568
  }
×
569

570
  /**
571
   * Only leader use this interface, check the ConfigNode before remove it.
572
   *
573
   * @param removeConfigNodePlan RemoveConfigNodePlan
574
   */
575
  public TSStatus checkConfigNodeBeforeRemove(RemoveConfigNodePlan removeConfigNodePlan) {
576
    removeConfigNodeLock.lock();
×
577
    try {
578
      // Check OnlineConfigNodes number
579
      if (filterConfigNodeThroughStatus(NodeStatus.Running).size() <= 1) {
×
580
        return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
×
581
            .setMessage(
×
582
                "Remove ConfigNode failed because there is only one ConfigNode in current Cluster.");
583
      }
584

585
      // Check whether the registeredConfigNodes contain the ConfigNode to be removed.
586
      if (!getRegisteredConfigNodes().contains(removeConfigNodePlan.getConfigNodeLocation())) {
×
587
        return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
×
588
            .setMessage("Remove ConfigNode failed because the ConfigNode not in current Cluster.");
×
589
      }
590

591
      // Check whether the remove ConfigNode is leader
592
      TConfigNodeLocation leader = getConsensusManager().getLeader();
×
593
      if (leader == null) {
×
594
        return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
×
595
            .setMessage(
×
596
                "Remove ConfigNode failed because the ConfigNodeGroup is on leader election, please retry.");
597
      }
598

599
      if (leader
×
600
          .getInternalEndPoint()
×
601
          .equals(removeConfigNodePlan.getConfigNodeLocation().getInternalEndPoint())) {
×
602
        // transfer leader
603
        return transferLeader(removeConfigNodePlan, getConsensusManager().getConsensusGroupId());
×
604
      }
605

606
    } finally {
607
      removeConfigNodeLock.unlock();
×
608
    }
609

610
    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())
×
611
        .setMessage("Successfully remove confignode.");
×
612
  }
613

614
  private TSStatus transferLeader(
615
      RemoveConfigNodePlan removeConfigNodePlan, ConsensusGroupId groupId) {
616
    Optional<TConfigNodeLocation> optional =
×
617
        filterConfigNodeThroughStatus(NodeStatus.Running).stream()
×
618
            .filter(e -> !e.equals(removeConfigNodePlan.getConfigNodeLocation()))
×
619
            .findAny();
×
620
    TConfigNodeLocation newLeader = null;
×
621
    if (optional.isPresent()) {
×
622
      newLeader = optional.get();
×
623
    } else {
624
      return new TSStatus(TSStatusCode.TRANSFER_LEADER_ERROR.getStatusCode())
×
625
          .setMessage(
×
626
              "Transfer ConfigNode leader failed because can not find any running ConfigNode.");
627
    }
628
    try {
629
      getConsensusManager()
×
630
          .getConsensusImpl()
×
631
          .transferLeader(
×
632
              groupId,
633
              new Peer(groupId, newLeader.getConfigNodeId(), newLeader.getConsensusEndPoint()));
×
634
    } catch (ConsensusException e) {
×
635
      return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
×
636
          .setMessage("Remove ConfigNode failed because transfer ConfigNode leader failed.");
×
637
    }
×
638
    return new TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())
×
639
        .setRedirectNode(newLeader.getInternalEndPoint())
×
640
        .setMessage(
×
641
            "The ConfigNode to be removed is leader, already transfer Leader to "
642
                + newLeader
643
                + ".");
644
  }
645

646
  public List<TSStatus> merge() {
647
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
648
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
649
    AsyncClientHandler<Object, TSStatus> clientHandler =
×
650
        new AsyncClientHandler<>(DataNodeRequestType.MERGE, dataNodeLocationMap);
651
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
652
    return clientHandler.getResponseList();
×
653
  }
654

655
  public List<TSStatus> flush(TFlushReq req) {
656
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
657
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
658
    AsyncClientHandler<TFlushReq, TSStatus> clientHandler =
×
659
        new AsyncClientHandler<>(DataNodeRequestType.FLUSH, req, dataNodeLocationMap);
660
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
661
    return clientHandler.getResponseList();
×
662
  }
663

664
  public List<TSStatus> clearCache() {
665
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
666
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
667
    AsyncClientHandler<Object, TSStatus> clientHandler =
×
668
        new AsyncClientHandler<>(DataNodeRequestType.CLEAR_CACHE, dataNodeLocationMap);
669
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
670
    return clientHandler.getResponseList();
×
671
  }
672

673
  public List<TSStatus> loadConfiguration() {
674
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
675
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
676
    AsyncClientHandler<Object, TSStatus> clientHandler =
×
677
        new AsyncClientHandler<>(DataNodeRequestType.LOAD_CONFIGURATION, dataNodeLocationMap);
678
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
679
    return clientHandler.getResponseList();
×
680
  }
681

682
  public List<TSStatus> setSystemStatus(String status) {
683
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
684
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
685
    AsyncClientHandler<String, TSStatus> clientHandler =
×
686
        new AsyncClientHandler<>(
687
            DataNodeRequestType.SET_SYSTEM_STATUS, status, dataNodeLocationMap);
688
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
689
    return clientHandler.getResponseList();
×
690
  }
691

692
  public TSStatus setDataNodeStatus(TSetDataNodeStatusReq setDataNodeStatusReq) {
693
    return SyncDataNodeClientPool.getInstance()
×
694
        .sendSyncRequestToDataNodeWithRetry(
×
695
            setDataNodeStatusReq.getTargetDataNode().getInternalEndPoint(),
×
696
            setDataNodeStatusReq.getStatus(),
×
697
            DataNodeRequestType.SET_SYSTEM_STATUS);
698
  }
699

700
  /**
701
   * Kill read on DataNode.
702
   *
703
   * @param queryId the id of specific read need to be killed, it will be NULL if kill all queries
704
   * @param dataNodeId the DataNode obtains target read, -1 means we will kill all queries on all
705
   *     DataNodes
706
   */
707
  public TSStatus killQuery(String queryId, int dataNodeId) {
708
    if (dataNodeId < 0) {
×
709
      return killAllQueries();
×
710
    } else {
711
      return killSpecificQuery(queryId, getRegisteredDataNodeLocations().get(dataNodeId));
×
712
    }
713
  }
714

715
  private TSStatus killAllQueries() {
716
    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
×
717
        configManager.getNodeManager().getRegisteredDataNodeLocations();
×
718
    AsyncClientHandler<String, TSStatus> clientHandler =
×
719
        new AsyncClientHandler<>(DataNodeRequestType.KILL_QUERY_INSTANCE, dataNodeLocationMap);
720
    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
×
721
    return RpcUtils.squashResponseStatusList(clientHandler.getResponseList());
×
722
  }
723

724
  private TSStatus killSpecificQuery(String queryId, TDataNodeLocation dataNodeLocation) {
725
    if (dataNodeLocation == null) {
×
726
      return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
×
727
          .setMessage(
×
728
              "The target DataNode is not existed, please ensure your input <queryId> is correct");
729
    } else {
730
      return SyncDataNodeClientPool.getInstance()
×
731
          .sendSyncRequestToDataNodeWithRetry(
×
732
              dataNodeLocation.getInternalEndPoint(),
×
733
              queryId,
734
              DataNodeRequestType.KILL_QUERY_INSTANCE);
735
    }
736
  }
737

738
  /**
739
   * Filter ConfigNodes through the specified NodeStatus.
740
   *
741
   * @param status The specified NodeStatus
742
   * @return Filtered ConfigNodes with the specified NodeStatus
743
   */
744
  public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus... status) {
745
    return nodeInfo.getRegisteredConfigNodes(
×
746
        getLoadManager().filterConfigNodeThroughStatus(status));
×
747
  }
748

749
  /**
750
   * Filter DataNodes through the specified NodeStatus.
751
   *
752
   * @param status The specified NodeStatus
753
   * @return Filtered DataNodes with the specified NodeStatus
754
   */
755
  public List<TDataNodeConfiguration> filterDataNodeThroughStatus(NodeStatus... status) {
756
    return nodeInfo.getRegisteredDataNodes(getLoadManager().filterDataNodeThroughStatus(status));
×
757
  }
758

759
  /**
760
   * Get the DataNodeLocation of the DataNode which has the lowest loadScore.
761
   *
762
   * @return TDataNodeLocation with the lowest loadScore
763
   */
764
  public Optional<TDataNodeLocation> getLowestLoadDataNode() {
765
    // TODO get real lowest load data node after scoring algorithm being implemented
766
    int dataNodeId = getLoadManager().getLowestLoadDataNode();
×
767
    return dataNodeId < 0
×
768
        ? Optional.empty()
×
769
        : Optional.of(getRegisteredDataNode(dataNodeId).getLocation());
×
770
  }
771

772
  /**
773
   * Get the DataNodeLocation which has the lowest loadScore within input.
774
   *
775
   * @return TDataNodeLocation with the lowest loadScore
776
   */
777
  public TDataNodeLocation getLowestLoadDataNode(Set<Integer> nodes) {
778
    int dataNodeId = getLoadManager().getLowestLoadDataNode(new ArrayList<>(nodes));
×
779
    return getRegisteredDataNode(dataNodeId).getLocation();
×
780
  }
781

782
  private ConsensusManager getConsensusManager() {
783
    return configManager.getConsensusManager();
×
784
  }
785

786
  private ClusterSchemaManager getClusterSchemaManager() {
787
    return configManager.getClusterSchemaManager();
×
788
  }
789

790
  private PartitionManager getPartitionManager() {
791
    return configManager.getPartitionManager();
×
792
  }
793

794
  private LoadManager getLoadManager() {
795
    return configManager.getLoadManager();
×
796
  }
797

798
  private TriggerManager getTriggerManager() {
799
    return configManager.getTriggerManager();
×
800
  }
801

802
  private PipeManager getPipeManager() {
803
    return configManager.getPipeManager();
×
804
  }
805

806
  private UDFManager getUDFManager() {
807
    return configManager.getUDFManager();
×
808
  }
809
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc