• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9723

pending completion
#9723

push

travis_ci

web-flow
[To rel/1.2][IOTDB-6079] Cluster computing resource balance (#10746)

264 of 264 new or added lines in 11 files covered. (100.0%)

79280 of 165370 relevant lines covered (47.94%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.34
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
24
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
25
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
26
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
27
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
28
import org.apache.iotdb.common.rpc.thrift.TSStatus;
29
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
30
import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
31
import org.apache.iotdb.common.rpc.thrift.TSetThrottleQuotaReq;
32
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
33
import org.apache.iotdb.commons.cluster.NodeStatus;
34
import org.apache.iotdb.commons.cluster.NodeType;
35
import org.apache.iotdb.commons.conf.CommonConfig;
36
import org.apache.iotdb.commons.conf.CommonDescriptor;
37
import org.apache.iotdb.commons.conf.IoTDBConstant;
38
import org.apache.iotdb.commons.exception.IllegalPathException;
39
import org.apache.iotdb.commons.path.PartialPath;
40
import org.apache.iotdb.commons.path.PathPatternTree;
41
import org.apache.iotdb.commons.service.metric.MetricService;
42
import org.apache.iotdb.commons.utils.AuthUtils;
43
import org.apache.iotdb.commons.utils.PathUtils;
44
import org.apache.iotdb.commons.utils.StatusUtils;
45
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
46
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
47
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
48
import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
49
import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
50
import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
51
import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
52
import org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan;
53
import org.apache.iotdb.confignode.consensus.request.read.partition.GetNodePathsPartitionPlan;
54
import org.apache.iotdb.confignode.consensus.request.read.partition.GetOrCreateDataPartitionPlan;
55
import org.apache.iotdb.confignode.consensus.request.read.partition.GetOrCreateSchemaPartitionPlan;
56
import org.apache.iotdb.confignode.consensus.request.read.partition.GetSchemaPartitionPlan;
57
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
58
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
59
import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
60
import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan;
61
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
62
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
63
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
64
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
65
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
66
import org.apache.iotdb.confignode.consensus.response.auth.PermissionInfoResp;
67
import org.apache.iotdb.confignode.consensus.response.database.CountDatabaseResp;
68
import org.apache.iotdb.confignode.consensus.response.database.DatabaseSchemaResp;
69
import org.apache.iotdb.confignode.consensus.response.datanode.ConfigurationResp;
70
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
71
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
72
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
73
import org.apache.iotdb.confignode.consensus.response.partition.DataPartitionResp;
74
import org.apache.iotdb.confignode.consensus.response.partition.RegionInfoListResp;
75
import org.apache.iotdb.confignode.consensus.response.partition.SchemaNodeManagementResp;
76
import org.apache.iotdb.confignode.consensus.response.partition.SchemaPartitionResp;
77
import org.apache.iotdb.confignode.consensus.response.template.TemplateSetInfoResp;
78
import org.apache.iotdb.confignode.consensus.statemachine.ConfigRegionStateMachine;
79
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
80
import org.apache.iotdb.confignode.manager.cq.CQManager;
81
import org.apache.iotdb.confignode.manager.load.LoadManager;
82
import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
83
import org.apache.iotdb.confignode.manager.node.ClusterNodeStartUtils;
84
import org.apache.iotdb.confignode.manager.node.NodeManager;
85
import org.apache.iotdb.confignode.manager.node.NodeMetrics;
86
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
87
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
88
import org.apache.iotdb.confignode.manager.pipe.PipeManager;
89
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
90
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaQuotaStatistics;
91
import org.apache.iotdb.confignode.persistence.AuthorInfo;
92
import org.apache.iotdb.confignode.persistence.ModelInfo;
93
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
94
import org.apache.iotdb.confignode.persistence.TriggerInfo;
95
import org.apache.iotdb.confignode.persistence.UDFInfo;
96
import org.apache.iotdb.confignode.persistence.cq.CQInfo;
97
import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
98
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
99
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
100
import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
101
import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
102
import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
103
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
104
import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
105
import org.apache.iotdb.confignode.rpc.thrift.TClusterParameters;
106
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
107
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
108
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
109
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
110
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListResp;
111
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
112
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
113
import org.apache.iotdb.confignode.rpc.thrift.TCreateModelReq;
114
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
115
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
116
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
117
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
118
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
119
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
120
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
121
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
122
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
123
import org.apache.iotdb.confignode.rpc.thrift.TDeactivateSchemaTemplateReq;
124
import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
125
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
126
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
127
import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
128
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
129
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
130
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
131
import org.apache.iotdb.confignode.rpc.thrift.TGetDataNodeLocationsResp;
132
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
133
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
134
import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
135
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
136
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
137
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
138
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
139
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
140
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
141
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
142
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
143
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
144
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
145
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
146
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
147
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
148
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
149
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
150
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
151
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
152
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
153
import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
154
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
155
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
156
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
157
import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
158
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
159
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
160
import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
161
import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
162
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
163
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
164
import org.apache.iotdb.confignode.rpc.thrift.TShowThrottleReq;
165
import org.apache.iotdb.confignode.rpc.thrift.TShowTrailReq;
166
import org.apache.iotdb.confignode.rpc.thrift.TShowTrailResp;
167
import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
168
import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
169
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
170
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
171
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
172
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
173
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelStateReq;
174
import org.apache.iotdb.consensus.common.DataSet;
175
import org.apache.iotdb.db.schemaengine.template.Template;
176
import org.apache.iotdb.db.schemaengine.template.TemplateAlterOperationType;
177
import org.apache.iotdb.db.schemaengine.template.alter.TemplateAlterOperationUtil;
178
import org.apache.iotdb.rpc.RpcUtils;
179
import org.apache.iotdb.rpc.TSStatusCode;
180
import org.apache.iotdb.tsfile.utils.Pair;
181

182
import org.slf4j.Logger;
183
import org.slf4j.LoggerFactory;
184

185
import java.io.IOException;
186
import java.nio.ByteBuffer;
187
import java.util.ArrayList;
188
import java.util.Arrays;
189
import java.util.Collection;
190
import java.util.Collections;
191
import java.util.Comparator;
192
import java.util.HashMap;
193
import java.util.HashSet;
194
import java.util.List;
195
import java.util.Map;
196
import java.util.Set;
197
import java.util.concurrent.atomic.AtomicReference;
198
import java.util.stream.Collectors;
199

200
import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
201

202
/** Entry of all management, AssignPartitionManager, AssignRegionManager. */
203
public class ConfigManager implements IManager {
204

205
  private static final Logger LOGGER = LoggerFactory.getLogger(ConfigManager.class);
1✔
206

207
  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
1✔
208
  private static final CommonConfig COMMON_CONF = CommonDescriptor.getInstance().getConfig();
1✔
209

210
  /** Manage PartitionTable read/write requests through the ConsensusLayer. */
211
  private final AtomicReference<ConsensusManager> consensusManager = new AtomicReference<>();
×
212

213
  /** Manage cluster node. */
214
  private final NodeManager nodeManager;
215

216
  /** Manage cluster schemaengine. */
217
  private final ClusterSchemaManager clusterSchemaManager;
218

219
  /** Manage cluster regions and partitions. */
220
  private final PartitionManager partitionManager;
221

222
  /** Manage cluster authorization. */
223
  private final PermissionManager permissionManager;
224

225
  private final LoadManager loadManager;
226

227
  /** Manage procedure. */
228
  private final ProcedureManager procedureManager;
229

230
  /** UDF. */
231
  private final UDFManager udfManager;
232

233
  /** Manage Trigger. */
234
  private final TriggerManager triggerManager;
235

236
  /** CQ. */
237
  private final CQManager cqManager;
238

239
  /** ML Model. */
240
  private final ModelManager modelManager;
241

242
  /** Pipe */
243
  private final PipeManager pipeManager;
244

245
  /** Manage quotas */
246
  private final ClusterQuotaManager clusterQuotaManager;
247

248
  private final ConfigRegionStateMachine stateMachine;
249

250
  private final RetryFailedTasksThread retryFailedTasksThread;
251

252
  private static final String DATABASE = "\tDatabase=";
253

254
  public ConfigManager() throws IOException {
×
255
    // Build the persistence module
256
    NodeInfo nodeInfo = new NodeInfo();
×
257
    ClusterSchemaInfo clusterSchemaInfo = new ClusterSchemaInfo();
×
258
    PartitionInfo partitionInfo = new PartitionInfo();
×
259
    AuthorInfo authorInfo = new AuthorInfo();
×
260
    ProcedureInfo procedureInfo = new ProcedureInfo();
×
261
    UDFInfo udfInfo = new UDFInfo();
×
262
    TriggerInfo triggerInfo = new TriggerInfo();
×
263
    CQInfo cqInfo = new CQInfo();
×
264
    ModelInfo modelInfo = new ModelInfo();
×
265
    PipeInfo pipeInfo = new PipeInfo();
×
266
    QuotaInfo quotaInfo = new QuotaInfo();
×
267

268
    // Build state machine and executor
269
    ConfigPlanExecutor executor =
×
270
        new ConfigPlanExecutor(
271
            nodeInfo,
272
            clusterSchemaInfo,
273
            partitionInfo,
274
            authorInfo,
275
            procedureInfo,
276
            udfInfo,
277
            triggerInfo,
278
            cqInfo,
279
            modelInfo,
280
            pipeInfo,
281
            quotaInfo);
282
    this.stateMachine = new ConfigRegionStateMachine(this, executor);
×
283

284
    // Build the manager module
285
    this.nodeManager = new NodeManager(this, nodeInfo);
×
286
    this.clusterSchemaManager =
×
287
        new ClusterSchemaManager(this, clusterSchemaInfo, new ClusterSchemaQuotaStatistics());
288
    this.partitionManager = new PartitionManager(this, partitionInfo);
×
289
    this.permissionManager = new PermissionManager(this, authorInfo);
×
290
    this.procedureManager = new ProcedureManager(this, procedureInfo);
×
291
    this.udfManager = new UDFManager(this, udfInfo);
×
292
    this.triggerManager = new TriggerManager(this, triggerInfo);
×
293
    this.cqManager = new CQManager(this);
×
294
    this.modelManager = new ModelManager(this, modelInfo);
×
295
    this.pipeManager = new PipeManager(this, pipeInfo);
×
296

297
    // 1. keep PipeManager initialization before LoadManager initialization, because
298
    // LoadManager will register PipeManager as a listener.
299
    // 2. keep RetryFailedTasksThread initialization after LoadManager initialization,
300
    // because RetryFailedTasksThread will keep a reference of LoadManager.
301
    this.loadManager = new LoadManager(this);
×
302

303
    this.retryFailedTasksThread = new RetryFailedTasksThread(this);
×
304
    this.clusterQuotaManager = new ClusterQuotaManager(this, quotaInfo);
×
305
  }
×
306

307
  public void initConsensusManager() throws IOException {
308
    this.consensusManager.set(new ConsensusManager(this, this.stateMachine));
×
309
  }
×
310

311
  public void close() throws IOException {
312
    if (consensusManager.get() != null) {
×
313
      consensusManager.get().close();
×
314
    }
315
    if (partitionManager != null) {
×
316
      partitionManager.getRegionMaintainer().shutdown();
×
317
    }
318
    if (procedureManager != null) {
×
319
      procedureManager.shiftExecutor(false);
×
320
    }
321
  }
×
322

323
  @Override
324
  public DataSet getSystemConfiguration() {
325
    TSStatus status = confirmLeader();
×
326
    ConfigurationResp dataSet;
327
    // Notice: The Seed-ConfigNode must also have the privilege to give system configuration.
328
    // Otherwise, the IoTDB-cluster will not have the ability to restart from scratch.
329
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
330
        || ConfigNodeDescriptor.getInstance().isSeedConfigNode()
×
331
        || SystemPropertiesUtils.isSeedConfigNode()) {
×
332
      dataSet = (ConfigurationResp) nodeManager.getSystemConfiguration();
×
333
    } else {
334
      dataSet = new ConfigurationResp();
×
335
      dataSet.setStatus(status);
×
336
    }
337
    return dataSet;
×
338
  }
339

340
  @Override
341
  public DataSet registerDataNode(TDataNodeRegisterReq req) {
342
    TSStatus status = confirmLeader();
×
343
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
344
      status =
×
345
          ClusterNodeStartUtils.confirmNodeRegistration(
×
346
              NodeType.DataNode,
347
              req.getClusterName(),
×
348
              req.getDataNodeConfiguration().getLocation(),
×
349
              this);
350
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
351
        return nodeManager.registerDataNode(req);
×
352
      }
353
    }
354
    DataNodeRegisterResp resp = new DataNodeRegisterResp();
×
355
    resp.setStatus(status);
×
356
    resp.setConfigNodeList(getNodeManager().getRegisteredConfigNodes());
×
357
    return resp;
×
358
  }
359

360
  @Override
361
  public TDataNodeRestartResp restartDataNode(TDataNodeRestartReq req) {
362
    TSStatus status = confirmLeader();
×
363
    // Notice: The Seed-ConfigNode must also have the privilege to do Node restart check.
364
    // Otherwise, the IoTDB-cluster will not have the ability to restart from scratch.
365
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
366
        || ConfigNodeDescriptor.getInstance().isSeedConfigNode()
×
367
        || SystemPropertiesUtils.isSeedConfigNode()) {
×
368
      status =
×
369
          ClusterNodeStartUtils.confirmNodeRestart(
×
370
              NodeType.DataNode,
371
              req.getClusterName(),
×
372
              req.getDataNodeConfiguration().getLocation().getDataNodeId(),
×
373
              req.getDataNodeConfiguration().getLocation(),
×
374
              this);
375
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
376
        return nodeManager.updateDataNodeIfNecessary(req);
×
377
      }
378
    }
379

380
    return new TDataNodeRestartResp()
×
381
        .setStatus(status)
×
382
        .setConfigNodeList(getNodeManager().getRegisteredConfigNodes());
×
383
  }
384

385
  @Override
386
  public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
387
    TSStatus status = confirmLeader();
×
388
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
389
      return nodeManager.removeDataNode(removeDataNodePlan);
×
390
    } else {
391
      DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
×
392
      dataSet.setStatus(status);
×
393
      return dataSet;
×
394
    }
395
  }
396

397
  @Override
398
  public TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation) {
399
    TSStatus status = confirmLeader();
×
400
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
401
      // Force updating the target DataNode's status to Unknown
402
      getLoadManager()
×
403
          .forceUpdateNodeCache(
×
404
              NodeType.DataNode,
405
              dataNodeLocation.getDataNodeId(),
×
406
              NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
×
407
      LOGGER.info(
×
408
          "[ShutdownHook] The DataNode-{} will be shutdown soon, mark it as Unknown",
409
          dataNodeLocation.getDataNodeId());
×
410
    }
411
    return status;
×
412
  }
413

414
  @Override
415
  public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req) {
416
    TSStatus status = confirmLeader();
×
417
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
418
      procedureManager.reportRegionMigrateResult(req);
×
419
    }
420
    return status;
×
421
  }
422

423
  @Override
424
  public DataSet getDataNodeConfiguration(
425
      GetDataNodeConfigurationPlan getDataNodeConfigurationPlan) {
426
    TSStatus status = confirmLeader();
×
427
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
428
      return nodeManager.getDataNodeConfiguration(getDataNodeConfigurationPlan);
×
429
    } else {
430
      DataNodeConfigurationResp dataSet = new DataNodeConfigurationResp();
×
431
      dataSet.setStatus(status);
×
432
      return dataSet;
×
433
    }
434
  }
435

436
  @Override
437
  public TShowClusterResp showCluster() {
438
    TSStatus status = confirmLeader();
×
439
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
440
      List<TConfigNodeLocation> configNodeLocations = getNodeManager().getRegisteredConfigNodes();
×
441
      configNodeLocations.sort(Comparator.comparingInt(TConfigNodeLocation::getConfigNodeId));
×
442
      List<TDataNodeLocation> dataNodeInfoLocations =
×
443
          getNodeManager().getRegisteredDataNodes().stream()
×
444
              .map(TDataNodeConfiguration::getLocation)
×
445
              .sorted(Comparator.comparingInt(TDataNodeLocation::getDataNodeId))
×
446
              .collect(Collectors.toList());
×
447
      Map<Integer, String> nodeStatus = getLoadManager().getNodeStatusWithReason();
×
448
      Map<Integer, TNodeVersionInfo> nodeVersionInfo = getNodeManager().getNodeVersionInfo();
×
449
      return new TShowClusterResp(
×
450
          status, configNodeLocations, dataNodeInfoLocations, nodeStatus, nodeVersionInfo);
451
    } else {
452
      return new TShowClusterResp(
×
453
          status, new ArrayList<>(), new ArrayList<>(), new HashMap<>(), new HashMap<>());
454
    }
455
  }
456

457
  @Override
458
  public TShowVariablesResp showVariables() {
459
    TSStatus status = confirmLeader();
×
460
    TShowVariablesResp resp = new TShowVariablesResp();
×
461
    resp.setStatus(status);
×
462
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
463
      resp.setClusterParameters(getClusterParameters());
×
464
    }
465
    return resp;
×
466
  }
467

468
  public TClusterParameters getClusterParameters() {
469
    TClusterParameters clusterParameters = new TClusterParameters();
×
470
    clusterParameters.setClusterName(CONF.getClusterName());
×
471
    clusterParameters.setConfigNodeConsensusProtocolClass(
×
472
        CONF.getConfigNodeConsensusProtocolClass());
×
473
    clusterParameters.setDataRegionConsensusProtocolClass(
×
474
        CONF.getDataRegionConsensusProtocolClass());
×
475
    clusterParameters.setSchemaRegionConsensusProtocolClass(
×
476
        CONF.getSchemaRegionConsensusProtocolClass());
×
477
    clusterParameters.setSeriesPartitionSlotNum(CONF.getSeriesSlotNum());
×
478
    clusterParameters.setSeriesPartitionExecutorClass(CONF.getSeriesPartitionExecutorClass());
×
479
    clusterParameters.setDefaultTTL(COMMON_CONF.getDefaultTTLInMs());
×
480
    clusterParameters.setTimePartitionInterval(COMMON_CONF.getTimePartitionInterval());
×
481
    clusterParameters.setDataReplicationFactor(CONF.getDataReplicationFactor());
×
482
    clusterParameters.setSchemaReplicationFactor(CONF.getSchemaReplicationFactor());
×
483
    clusterParameters.setDataRegionPerDataNode(CONF.getDataRegionPerDataNode());
×
484
    clusterParameters.setSchemaRegionPerDataNode(CONF.getSchemaRegionPerDataNode());
×
485
    clusterParameters.setDiskSpaceWarningThreshold(COMMON_CONF.getDiskSpaceWarningThreshold());
×
486
    clusterParameters.setReadConsistencyLevel(CONF.getReadConsistencyLevel());
×
487
    clusterParameters.setTimestampPrecision(COMMON_CONF.getTimestampPrecision());
×
488
    clusterParameters.setSchemaEngineMode(COMMON_CONF.getSchemaEngineMode());
×
489
    clusterParameters.setTagAttributeTotalSize(COMMON_CONF.getTagAttributeTotalSize());
×
490
    clusterParameters.setDatabaseLimitThreshold(COMMON_CONF.getDatabaseLimitThreshold());
×
491
    return clusterParameters;
×
492
  }
493

494
  @Override
495
  public TSStatus setTTL(SetTTLPlan setTTLPlan) {
496
    TSStatus status = confirmLeader();
×
497
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
498
      return clusterSchemaManager.setTTL(setTTLPlan);
×
499
    } else {
500
      return status;
×
501
    }
502
  }
503

504
  @Override
505
  public TSStatus setSchemaReplicationFactor(
506
      SetSchemaReplicationFactorPlan setSchemaReplicationFactorPlan) {
507
    TSStatus status = confirmLeader();
×
508
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
509
      return clusterSchemaManager.setSchemaReplicationFactor(setSchemaReplicationFactorPlan);
×
510
    } else {
511
      return status;
×
512
    }
513
  }
514

515
  @Override
516
  public TSStatus setDataReplicationFactor(
517
      SetDataReplicationFactorPlan setDataReplicationFactorPlan) {
518
    TSStatus status = confirmLeader();
×
519
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
520
      return clusterSchemaManager.setDataReplicationFactor(setDataReplicationFactorPlan);
×
521
    } else {
522
      return status;
×
523
    }
524
  }
525

526
  @Override
527
  public TSStatus setTimePartitionInterval(
528
      SetTimePartitionIntervalPlan setTimePartitionIntervalPlan) {
529
    TSStatus status = confirmLeader();
×
530
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
531
      return clusterSchemaManager.setTimePartitionInterval(setTimePartitionIntervalPlan);
×
532
    } else {
533
      return status;
×
534
    }
535
  }
536

537
  @Override
538
  public DataSet countMatchedDatabases(CountDatabasePlan countDatabasePlan) {
539
    TSStatus status = confirmLeader();
×
540
    CountDatabaseResp result = new CountDatabaseResp();
×
541
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
542
      return clusterSchemaManager.countMatchedDatabases(countDatabasePlan);
×
543
    } else {
544
      result.setStatus(status);
×
545
    }
546
    return result;
×
547
  }
548

549
  @Override
550
  public DataSet getMatchedDatabaseSchemas(GetDatabasePlan getDatabaseReq) {
551
    TSStatus status = confirmLeader();
×
552
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
553
      return clusterSchemaManager.getMatchedDatabaseSchema(getDatabaseReq);
×
554
    } else {
555
      DatabaseSchemaResp dataSet = new DatabaseSchemaResp();
×
556
      dataSet.setStatus(status);
×
557
      return dataSet;
×
558
    }
559
  }
560

561
  @Override
562
  public synchronized TSStatus setDatabase(DatabaseSchemaPlan databaseSchemaPlan) {
563
    TSStatus status = confirmLeader();
×
564
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
565
      return clusterSchemaManager.setDatabase(databaseSchemaPlan);
×
566
    } else {
567
      return status;
×
568
    }
569
  }
570

571
  @Override
572
  public TSStatus alterDatabase(DatabaseSchemaPlan databaseSchemaPlan) {
573
    TSStatus status = confirmLeader();
×
574
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
575
      return clusterSchemaManager.alterDatabase(databaseSchemaPlan);
×
576
    } else {
577
      return status;
×
578
    }
579
  }
580

581
  @Override
582
  public synchronized TSStatus deleteDatabases(List<String> deletedPaths) {
583
    TSStatus status = confirmLeader();
×
584
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
585
      // remove wild
586
      Map<String, TDatabaseSchema> deleteDatabaseSchemaMap =
×
587
          getClusterSchemaManager().getMatchedDatabaseSchemasByName(deletedPaths);
×
588
      if (deleteDatabaseSchemaMap.isEmpty()) {
×
589
        return RpcUtils.getStatus(
×
590
            TSStatusCode.PATH_NOT_EXIST.getStatusCode(),
×
591
            String.format("Path %s does not exist", Arrays.toString(deletedPaths.toArray())));
×
592
      }
593
      ArrayList<TDatabaseSchema> parsedDeleteDatabases =
×
594
          new ArrayList<>(deleteDatabaseSchemaMap.values());
×
595
      return procedureManager.deleteDatabases(parsedDeleteDatabases);
×
596
    } else {
597
      return status;
×
598
    }
599
  }
600

601
  private List<TSeriesPartitionSlot> calculateRelatedSlot(PartialPath path, PartialPath database) {
602
    // The path contains `**`
603
    if (path.getFullPath().contains(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)) {
×
604
      return new ArrayList<>();
×
605
    }
606
    List<PartialPath> innerPathList = path.alterPrefixPath(database);
×
607
    if (innerPathList.size() == 0) {
×
608
      return new ArrayList<>();
×
609
    }
610
    PartialPath innerPath = innerPathList.get(0);
×
611
    // The innerPath contains `*` and the only `*` is not in last level
612
    if (innerPath.getDevice().contains(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)) {
×
613
      return new ArrayList<>();
×
614
    }
615
    return Collections.singletonList(
×
616
        getPartitionManager().getSeriesPartitionSlot(innerPath.getDevice()));
×
617
  }
618

619
  @Override
620
  public TSchemaPartitionTableResp getSchemaPartition(PathPatternTree patternTree) {
621
    // Construct empty response
622
    TSchemaPartitionTableResp resp = new TSchemaPartitionTableResp();
×
623

624
    TSStatus status = confirmLeader();
×
625
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
626
      return resp.setStatus(status);
×
627
    }
628

629
    // Build GetSchemaPartitionPlan
630
    Map<String, Set<TSeriesPartitionSlot>> partitionSlotsMap = new HashMap<>();
×
631
    List<PartialPath> relatedPaths = patternTree.getAllPathPatterns();
×
632
    List<String> allDatabases = getClusterSchemaManager().getDatabaseNames();
×
633
    List<PartialPath> allDatabasePaths = new ArrayList<>();
×
634
    for (String database : allDatabases) {
×
635
      try {
636
        allDatabasePaths.add(new PartialPath(database));
×
637
      } catch (IllegalPathException e) {
×
638
        throw new RuntimeException(e);
×
639
      }
×
640
    }
×
641
    Map<String, Boolean> scanAllRegions = new HashMap<>();
×
642
    for (PartialPath path : relatedPaths) {
×
643
      for (int i = 0; i < allDatabases.size(); i++) {
×
644
        String database = allDatabases.get(i);
×
645
        PartialPath databasePath = allDatabasePaths.get(i);
×
646
        if (path.overlapWithFullPathPrefix(databasePath) && !scanAllRegions.containsKey(database)) {
×
647
          List<TSeriesPartitionSlot> relatedSlot = calculateRelatedSlot(path, databasePath);
×
648
          if (relatedSlot.isEmpty()) {
×
649
            scanAllRegions.put(database, true);
×
650
            partitionSlotsMap.put(database, new HashSet<>());
×
651
          } else {
652
            partitionSlotsMap.computeIfAbsent(database, k -> new HashSet<>()).addAll(relatedSlot);
×
653
          }
654
        }
655
      }
656
    }
×
657

658
    // Return empty resp if the partitionSlotsMap is empty
659
    if (partitionSlotsMap.isEmpty()) {
×
660
      return resp.setStatus(StatusUtils.OK).setSchemaPartitionTable(new HashMap<>());
×
661
    }
662

663
    GetSchemaPartitionPlan getSchemaPartitionPlan =
×
664
        new GetSchemaPartitionPlan(
665
            partitionSlotsMap.entrySet().stream()
×
666
                .collect(Collectors.toMap(Map.Entry::getKey, e -> new ArrayList<>(e.getValue()))));
×
667
    SchemaPartitionResp queryResult =
×
668
        (SchemaPartitionResp) partitionManager.getSchemaPartition(getSchemaPartitionPlan);
×
669
    resp = queryResult.convertToRpcSchemaPartitionTableResp();
×
670

671
    LOGGER.debug("GetSchemaPartition receive paths: {}, return: {}", relatedPaths, resp);
×
672

673
    return resp;
×
674
  }
675

676
  @Override
677
  public TSchemaPartitionTableResp getOrCreateSchemaPartition(PathPatternTree patternTree) {
678
    // Construct empty response
679
    TSchemaPartitionTableResp resp = new TSchemaPartitionTableResp();
×
680

681
    TSStatus status = confirmLeader();
×
682
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
683
      return resp.setStatus(status);
×
684
    }
685

686
    List<String> devicePaths = patternTree.getAllDevicePatterns();
×
687
    List<String> databases = getClusterSchemaManager().getDatabaseNames();
×
688

689
    // Build GetOrCreateSchemaPartitionPlan
690
    Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap = new HashMap<>();
×
691
    for (String devicePath : devicePaths) {
×
692
      if (!devicePath.contains("*")) {
×
693
        // Only check devicePaths that without "*"
694
        for (String database : databases) {
×
695
          if (PathUtils.isStartWith(devicePath, database)) {
×
696
            partitionSlotsMap
×
697
                .computeIfAbsent(database, key -> new ArrayList<>())
×
698
                .add(getPartitionManager().getSeriesPartitionSlot(devicePath));
×
699
            break;
×
700
          }
701
        }
×
702
      }
703
    }
×
704
    GetOrCreateSchemaPartitionPlan getOrCreateSchemaPartitionPlan =
×
705
        new GetOrCreateSchemaPartitionPlan(partitionSlotsMap);
706

707
    SchemaPartitionResp queryResult =
×
708
        partitionManager.getOrCreateSchemaPartition(getOrCreateSchemaPartitionPlan);
×
709
    resp = queryResult.convertToRpcSchemaPartitionTableResp();
×
710

711
    if (CONF.isEnablePrintingNewlyCreatedPartition()) {
×
712
      printNewCreatedSchemaPartition(devicePaths, resp);
×
713
    }
714

715
    return resp;
×
716
  }
717

718
  private void printNewCreatedSchemaPartition(
719
      List<String> devicePaths, TSchemaPartitionTableResp resp) {
720
    final String lineSeparator = System.lineSeparator();
×
721
    StringBuilder devicePathString = new StringBuilder("{");
×
722
    for (String devicePath : devicePaths) {
×
723
      devicePathString.append(lineSeparator).append("\t").append(devicePath).append(",");
×
724
    }
×
725
    devicePathString.append(lineSeparator).append("}");
×
726

727
    StringBuilder schemaPartitionRespString = new StringBuilder("{");
×
728
    schemaPartitionRespString
×
729
        .append(lineSeparator)
×
730
        .append("\tTSStatus=")
×
731
        .append(resp.getStatus().getCode())
×
732
        .append(",");
×
733
    Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable =
×
734
        resp.getSchemaPartitionTable();
×
735
    for (Map.Entry<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> databaseEntry :
736
        schemaPartitionTable.entrySet()) {
×
737
      String database = databaseEntry.getKey();
×
738
      schemaPartitionRespString
×
739
          .append(lineSeparator)
×
740
          .append(DATABASE)
×
741
          .append(database)
×
742
          .append(": {");
×
743
      for (Map.Entry<TSeriesPartitionSlot, TConsensusGroupId> slotEntry :
744
          databaseEntry.getValue().entrySet()) {
×
745
        schemaPartitionRespString
×
746
            .append(lineSeparator)
×
747
            .append("\t\t")
×
748
            .append(slotEntry.getKey())
×
749
            .append(", ")
×
750
            .append(slotEntry.getValue())
×
751
            .append(",");
×
752
      }
×
753
      schemaPartitionRespString.append(lineSeparator).append("\t},");
×
754
    }
×
755
    schemaPartitionRespString.append(lineSeparator).append("}");
×
756
    LOGGER.info(
×
757
        "[GetOrCreateSchemaPartition]:{} Receive PathPatternTree: {}, Return TSchemaPartitionTableResp: {}",
758
        lineSeparator,
759
        devicePathString,
760
        schemaPartitionRespString);
761
  }
×
762

763
  @Override
764
  public TSchemaNodeManagementResp getNodePathsPartition(PartialPath partialPath, Integer level) {
765
    TSStatus status = confirmLeader();
×
766
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
767
      GetNodePathsPartitionPlan getNodePathsPartitionPlan = new GetNodePathsPartitionPlan();
×
768
      getNodePathsPartitionPlan.setPartialPath(partialPath);
×
769
      if (null != level) {
×
770
        getNodePathsPartitionPlan.setLevel(level);
×
771
      }
772
      SchemaNodeManagementResp resp =
×
773
          partitionManager.getNodePathsPartition(getNodePathsPartitionPlan);
×
774
      TSchemaNodeManagementResp result =
×
775
          resp.convertToRpcSchemaNodeManagementPartitionResp(
×
776
              getLoadManager().getRegionPriorityMap());
×
777

778
      LOGGER.info(
×
779
          "getNodePathsPartition receive devicePaths: {}, level: {}, return TSchemaNodeManagementResp: {}",
780
          partialPath,
781
          level,
782
          result);
783

784
      return result;
×
785
    } else {
786
      return new TSchemaNodeManagementResp().setStatus(status);
×
787
    }
788
  }
789

790
  @Override
791
  public TDataPartitionTableResp getDataPartition(GetDataPartitionPlan getDataPartitionPlan) {
792
    // Construct empty response
793
    TDataPartitionTableResp resp = new TDataPartitionTableResp();
×
794

795
    TSStatus status = confirmLeader();
×
796
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
797
      return resp.setStatus(status);
×
798
    }
799
    DataPartitionResp queryResult =
×
800
        (DataPartitionResp) partitionManager.getDataPartition(getDataPartitionPlan);
×
801

802
    resp = queryResult.convertToTDataPartitionTableResp();
×
803

804
    LOGGER.debug(
×
805
        "GetDataPartition interface receive PartitionSlotsMap: {}, return: {}",
806
        getDataPartitionPlan.getPartitionSlotsMap(),
×
807
        resp);
808

809
    return resp;
×
810
  }
811

812
  @Override
813
  public TDataPartitionTableResp getOrCreateDataPartition(
814
      GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan) {
815
    // Construct empty response
816
    TDataPartitionTableResp resp = new TDataPartitionTableResp();
×
817

818
    TSStatus status = confirmLeader();
×
819
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
820
      return resp.setStatus(status);
×
821
    }
822

823
    DataPartitionResp queryResult =
×
824
        partitionManager.getOrCreateDataPartition(getOrCreateDataPartitionPlan);
×
825
    resp = queryResult.convertToTDataPartitionTableResp();
×
826

827
    if (CONF.isEnablePrintingNewlyCreatedPartition()) {
×
828
      printNewCreatedDataPartition(getOrCreateDataPartitionPlan, resp);
×
829
    }
830

831
    return resp;
×
832
  }
833

834
  private void printNewCreatedDataPartition(
835
      GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan, TDataPartitionTableResp resp) {
836
    final String lineSeparator = System.lineSeparator();
×
837
    StringBuilder partitionSlotsMapString = new StringBuilder("{");
×
838
    for (Map.Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>> databaseEntry :
839
        getOrCreateDataPartitionPlan.getPartitionSlotsMap().entrySet()) {
×
840
      String database = databaseEntry.getKey();
×
841
      partitionSlotsMapString.append(lineSeparator).append(DATABASE).append(database).append(": {");
×
842
      for (Map.Entry<TSeriesPartitionSlot, TTimeSlotList> slotEntry :
843
          databaseEntry.getValue().entrySet()) {
×
844
        partitionSlotsMapString
×
845
            .append(lineSeparator)
×
846
            .append("\t\t")
×
847
            .append(slotEntry.getKey())
×
848
            .append(",")
×
849
            .append(slotEntry.getValue());
×
850
      }
×
851
      partitionSlotsMapString.append(lineSeparator).append("\t},");
×
852
    }
×
853
    partitionSlotsMapString.append(lineSeparator).append("}");
×
854

855
    StringBuilder dataPartitionRespString = new StringBuilder("{");
×
856
    dataPartitionRespString
×
857
        .append(lineSeparator)
×
858
        .append("\tTSStatus=")
×
859
        .append(resp.getStatus().getCode())
×
860
        .append(",");
×
861
    Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
862
        dataPartitionTable = resp.getDataPartitionTable();
×
863
    for (Map.Entry<
864
            String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
865
        databaseEntry : dataPartitionTable.entrySet()) {
×
866
      String database = databaseEntry.getKey();
×
867
      dataPartitionRespString.append(lineSeparator).append(DATABASE).append(database).append(": {");
×
868
      for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>
869
          seriesSlotEntry : databaseEntry.getValue().entrySet()) {
×
870
        dataPartitionRespString
×
871
            .append(lineSeparator)
×
872
            .append("\t\t")
×
873
            .append(seriesSlotEntry.getKey())
×
874
            .append(": {");
×
875
        for (Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> timeSlotEntry :
876
            seriesSlotEntry.getValue().entrySet()) {
×
877
          dataPartitionRespString
×
878
              .append(lineSeparator)
×
879
              .append("\t\t\t")
×
880
              .append(timeSlotEntry.getKey())
×
881
              .append(", ")
×
882
              .append(timeSlotEntry.getValue())
×
883
              .append(",");
×
884
        }
×
885
        dataPartitionRespString.append(lineSeparator).append("\t\t},");
×
886
      }
×
887
      dataPartitionRespString.append(lineSeparator).append("\t}");
×
888
    }
×
889
    dataPartitionRespString.append(lineSeparator).append("}");
×
890

891
    LOGGER.info(
×
892
        "[GetOrCreateDataPartition]:{} Receive PartitionSlotsMap: {}, Return TDataPartitionTableResp: {}",
893
        lineSeparator,
894
        partitionSlotsMapString,
895
        dataPartitionRespString);
896
  }
×
897

898
  private TSStatus confirmLeader() {
899
    // Make sure the consensus layer has been initialized
900
    if (getConsensusManager() == null) {
×
901
      return new TSStatus(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode())
×
902
          .setMessage(
×
903
              "ConsensusManager of target-ConfigNode is not initialized, "
904
                  + "please make sure the target-ConfigNode has been started successfully.");
905
    }
906
    return getConsensusManager().confirmLeader();
×
907
  }
908

909
  @Override
910
  public NodeManager getNodeManager() {
911
    return nodeManager;
×
912
  }
913

914
  @Override
915
  public ClusterSchemaManager getClusterSchemaManager() {
916
    return clusterSchemaManager;
×
917
  }
918

919
  @Override
920
  public ConsensusManager getConsensusManager() {
921
    return consensusManager.get();
×
922
  }
923

924
  @Override
925
  public PartitionManager getPartitionManager() {
926
    return partitionManager;
×
927
  }
928

929
  @Override
930
  public LoadManager getLoadManager() {
931
    return loadManager;
×
932
  }
933

934
  @Override
935
  public TriggerManager getTriggerManager() {
936
    return triggerManager;
×
937
  }
938

939
  @Override
940
  public ModelManager getModelManager() {
941
    return modelManager;
×
942
  }
943

944
  @Override
945
  public PipeManager getPipeManager() {
946
    return pipeManager;
×
947
  }
948

949
  @Override
950
  public TSStatus operatePermission(AuthorPlan authorPlan) {
951
    TSStatus status = confirmLeader();
×
952
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
953
      return permissionManager.operatePermission(authorPlan);
×
954
    } else {
955
      return status;
×
956
    }
957
  }
958

959
  @Override
960
  public DataSet queryPermission(AuthorPlan authorPlan) {
961
    TSStatus status = confirmLeader();
×
962
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
963
      return permissionManager.queryPermission(authorPlan);
×
964
    } else {
965
      PermissionInfoResp dataSet = new PermissionInfoResp();
×
966
      dataSet.setStatus(status);
×
967
      return dataSet;
×
968
    }
969
  }
970

971
  @Override
972
  public TPermissionInfoResp login(String username, String password) {
973
    TSStatus status = confirmLeader();
×
974
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
975
      return permissionManager.login(username, password);
×
976
    } else {
977
      TPermissionInfoResp resp = AuthUtils.generateEmptyPermissionInfoResp();
×
978
      resp.setStatus(status);
×
979
      return resp;
×
980
    }
981
  }
982

983
  @Override
984
  public TPermissionInfoResp checkUserPrivileges(
985
      String username, List<PartialPath> paths, int permission) {
986
    TSStatus status = confirmLeader();
×
987
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
988
      return permissionManager.checkUserPrivileges(username, paths, permission);
×
989
    } else {
990
      TPermissionInfoResp resp = AuthUtils.generateEmptyPermissionInfoResp();
×
991
      resp.setStatus(status);
×
992
      return resp;
×
993
    }
994
  }
995

996
  @Override
997
  public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) {
998
    final int ERROR_STATUS_NODE_ID = -1;
×
999

1000
    TSStatus status = confirmLeader();
×
1001
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1002
      // Make sure the global configurations are consist
1003
      status = checkConfigNodeGlobalConfig(req);
×
1004
      if (status == null) {
×
1005
        status =
×
1006
            ClusterNodeStartUtils.confirmNodeRegistration(
×
1007
                NodeType.ConfigNode,
1008
                req.getClusterParameters().getClusterName(),
×
1009
                req.getConfigNodeLocation(),
×
1010
                this);
1011
        if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1012
          return nodeManager.registerConfigNode(req);
×
1013
        }
1014
      }
1015
    }
1016

1017
    return new TConfigNodeRegisterResp().setStatus(status).setConfigNodeId(ERROR_STATUS_NODE_ID);
×
1018
  }
1019

1020
  @Override
1021
  public TSStatus restartConfigNode(TConfigNodeRestartReq req) {
1022
    TSStatus status = confirmLeader();
×
1023
    // Notice: The Seed-ConfigNode must also have the privilege to do Node restart check.
1024
    // Otherwise, the IoTDB-cluster will not have the ability to restart from scratch.
1025
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1026
        || ConfigNodeDescriptor.getInstance().isSeedConfigNode()
×
1027
        || SystemPropertiesUtils.isSeedConfigNode()) {
×
1028
      status =
×
1029
          ClusterNodeStartUtils.confirmNodeRestart(
×
1030
              NodeType.ConfigNode,
1031
              req.getClusterName(),
×
1032
              req.getConfigNodeLocation().getConfigNodeId(),
×
1033
              req.getConfigNodeLocation(),
×
1034
              this);
1035
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1036
        return ClusterNodeStartUtils.ACCEPT_NODE_RESTART;
×
1037
      }
1038
    }
1039
    return status;
×
1040
  }
1041

1042
  public TSStatus checkConfigNodeGlobalConfig(TConfigNodeRegisterReq req) {
1043
    final String errorPrefix = "Reject register, please ensure that the parameter ";
×
1044
    final String errorSuffix = " is consistent with the Seed-ConfigNode.";
×
1045
    TSStatus errorStatus = new TSStatus(TSStatusCode.CONFIGURATION_ERROR.getStatusCode());
×
1046
    TClusterParameters clusterParameters = req.getClusterParameters();
×
1047

1048
    if (!clusterParameters
×
1049
        .getConfigNodeConsensusProtocolClass()
×
1050
        .equals(CONF.getConfigNodeConsensusProtocolClass())) {
×
1051
      return errorStatus.setMessage(
×
1052
          errorPrefix + "config_node_consensus_protocol_class" + errorSuffix);
1053
    }
1054
    if (!clusterParameters
×
1055
        .getDataRegionConsensusProtocolClass()
×
1056
        .equals(CONF.getDataRegionConsensusProtocolClass())) {
×
1057
      return errorStatus.setMessage(
×
1058
          errorPrefix + "data_region_consensus_protocol_class" + errorSuffix);
1059
    }
1060
    if (!clusterParameters
×
1061
        .getSchemaRegionConsensusProtocolClass()
×
1062
        .equals(CONF.getSchemaRegionConsensusProtocolClass())) {
×
1063
      return errorStatus.setMessage(
×
1064
          errorPrefix + "schema_region_consensus_protocol_class" + errorSuffix);
1065
    }
1066

1067
    if (clusterParameters.getSeriesPartitionSlotNum() != CONF.getSeriesSlotNum()) {
×
1068
      return errorStatus.setMessage(errorPrefix + "series_partition_slot_num" + errorSuffix);
×
1069
    }
1070
    if (!clusterParameters
×
1071
        .getSeriesPartitionExecutorClass()
×
1072
        .equals(CONF.getSeriesPartitionExecutorClass())) {
×
1073
      return errorStatus.setMessage(errorPrefix + "series_partition_executor_class" + errorSuffix);
×
1074
    }
1075

1076
    if (clusterParameters.getDefaultTTL()
×
1077
        != CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs()) {
×
1078
      return errorStatus.setMessage(errorPrefix + "default_ttl" + errorSuffix);
×
1079
    }
1080
    if (clusterParameters.getTimePartitionInterval() != COMMON_CONF.getTimePartitionInterval()) {
×
1081
      return errorStatus.setMessage(errorPrefix + "time_partition_interval" + errorSuffix);
×
1082
    }
1083

1084
    if (clusterParameters.getSchemaReplicationFactor() != CONF.getSchemaReplicationFactor()) {
×
1085
      return errorStatus.setMessage(errorPrefix + "schema_replication_factor" + errorSuffix);
×
1086
    }
1087
    if (clusterParameters.getDataReplicationFactor() != CONF.getDataReplicationFactor()) {
×
1088
      return errorStatus.setMessage(errorPrefix + "data_replication_factor" + errorSuffix);
×
1089
    }
1090

1091
    if (clusterParameters.getSchemaRegionPerDataNode() != CONF.getSchemaRegionPerDataNode()) {
×
1092
      return errorStatus.setMessage(errorPrefix + "schema_region_per_data_node" + errorSuffix);
×
1093
    }
1094
    if (clusterParameters.getDataRegionPerDataNode() != CONF.getDataRegionPerDataNode()) {
×
1095
      return errorStatus.setMessage(errorPrefix + "data_region_per_data_node" + errorSuffix);
×
1096
    }
1097

1098
    if (!clusterParameters.getReadConsistencyLevel().equals(CONF.getReadConsistencyLevel())) {
×
1099
      return errorStatus.setMessage(errorPrefix + "read_consistency_level" + errorSuffix);
×
1100
    }
1101

1102
    if (clusterParameters.getDiskSpaceWarningThreshold()
×
1103
        != COMMON_CONF.getDiskSpaceWarningThreshold()) {
×
1104
      return errorStatus.setMessage(errorPrefix + "disk_space_warning_threshold" + errorSuffix);
×
1105
    }
1106

1107
    if (!clusterParameters.getTimestampPrecision().equals(COMMON_CONF.getTimestampPrecision())) {
×
1108
      return errorStatus.setMessage(errorPrefix + "timestamp_precision" + errorSuffix);
×
1109
    }
1110

1111
    if (!clusterParameters.getSchemaEngineMode().equals(COMMON_CONF.getSchemaEngineMode())) {
×
1112
      return errorStatus.setMessage(errorPrefix + "schema_engine_mode" + errorSuffix);
×
1113
    }
1114

1115
    if (clusterParameters.getTagAttributeTotalSize() != COMMON_CONF.getTagAttributeTotalSize()) {
×
1116
      return errorStatus.setMessage(errorPrefix + "tag_attribute_total_size" + errorSuffix);
×
1117
    }
1118

1119
    if (clusterParameters.getDatabaseLimitThreshold() != COMMON_CONF.getDatabaseLimitThreshold()) {
×
1120
      return errorStatus.setMessage(errorPrefix + "database_limit_threshold" + errorSuffix);
×
1121
    }
1122

1123
    return null;
×
1124
  }
1125

1126
  @Override
1127
  public TSStatus createPeerForConsensusGroup(List<TConfigNodeLocation> configNodeLocations) {
1128
    for (int i = 0; i < 30; i++) {
×
1129
      try {
1130
        if (consensusManager.get() == null) {
×
1131
          Thread.sleep(1000);
×
1132
        } else {
1133
          // When add non Seed-ConfigNode to the ConfigNodeGroup, the parameter should be emptyList
1134
          consensusManager.get().createPeerForConsensusGroup(Collections.emptyList());
×
1135
          return StatusUtils.OK;
×
1136
        }
1137
      } catch (InterruptedException e) {
×
1138
        Thread.currentThread().interrupt();
×
1139
        LOGGER.warn("Unexpected interruption during retry creating peer for consensus group");
×
1140
      } catch (Exception e) {
×
1141
        LOGGER.error("Failed to create peer for consensus group", e);
×
1142
        break;
×
1143
      }
×
1144
    }
1145
    return StatusUtils.INTERNAL_ERROR;
×
1146
  }
1147

1148
  @Override
1149
  public TSStatus removeConfigNode(RemoveConfigNodePlan removeConfigNodePlan) {
1150
    TSStatus status = confirmLeader();
×
1151

1152
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1153
      status = nodeManager.checkConfigNodeBeforeRemove(removeConfigNodePlan);
×
1154
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1155
        procedureManager.removeConfigNode(removeConfigNodePlan);
×
1156
      }
1157
    }
1158

1159
    return status;
×
1160
  }
1161

1162
  @Override
1163
  public TSStatus reportConfigNodeShutdown(TConfigNodeLocation configNodeLocation) {
1164
    TSStatus status = confirmLeader();
×
1165
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1166
      // Force updating the target ConfigNode's status to Unknown
1167
      getLoadManager()
×
1168
          .forceUpdateNodeCache(
×
1169
              NodeType.ConfigNode,
1170
              configNodeLocation.getConfigNodeId(),
×
1171
              NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
×
1172
      LOGGER.info(
×
1173
          "[ShutdownHook] The ConfigNode-{} will be shutdown soon, mark it as Unknown",
1174
          configNodeLocation.getConfigNodeId());
×
1175
    }
1176
    return status;
×
1177
  }
1178

1179
  @Override
1180
  public TSStatus createFunction(TCreateFunctionReq req) {
1181
    TSStatus status = confirmLeader();
×
1182
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1183
        ? udfManager.createFunction(req)
×
1184
        : status;
×
1185
  }
1186

1187
  @Override
1188
  public TSStatus dropFunction(String udfName) {
1189
    TSStatus status = confirmLeader();
×
1190
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1191
        ? udfManager.dropFunction(udfName)
×
1192
        : status;
×
1193
  }
1194

1195
  @Override
1196
  public TGetUDFTableResp getUDFTable() {
1197
    TSStatus status = confirmLeader();
×
1198
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1199
        ? udfManager.getUDFTable()
×
1200
        : new TGetUDFTableResp(status, Collections.emptyList());
×
1201
  }
1202

1203
  @Override
1204
  public TGetJarInListResp getUDFJar(TGetJarInListReq req) {
1205
    TSStatus status = confirmLeader();
×
1206
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1207
        ? udfManager.getUDFJar(req)
×
1208
        : new TGetJarInListResp(status, Collections.emptyList());
×
1209
  }
1210

1211
  @Override
1212
  public TSStatus createTrigger(TCreateTriggerReq req) {
1213
    TSStatus status = confirmLeader();
×
1214
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1215
        ? triggerManager.createTrigger(req)
×
1216
        : status;
×
1217
  }
1218

1219
  @Override
1220
  public TSStatus dropTrigger(TDropTriggerReq req) {
1221
    TSStatus status = confirmLeader();
×
1222
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1223
        ? triggerManager.dropTrigger(req)
×
1224
        : status;
×
1225
  }
1226

1227
  @Override
1228
  public TGetTriggerTableResp getTriggerTable() {
1229
    TSStatus status = confirmLeader();
×
1230
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1231
        ? triggerManager.getTriggerTable(false)
×
1232
        : new TGetTriggerTableResp(status, Collections.emptyList());
×
1233
  }
1234

1235
  @Override
1236
  public TGetTriggerTableResp getStatefulTriggerTable() {
1237
    TSStatus status = confirmLeader();
×
1238
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1239
        ? triggerManager.getTriggerTable(true)
×
1240
        : new TGetTriggerTableResp(status, Collections.emptyList());
×
1241
  }
1242

1243
  @Override
1244
  public TGetLocationForTriggerResp getLocationOfStatefulTrigger(String triggerName) {
1245
    TSStatus status = confirmLeader();
×
1246
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1247
        ? triggerManager.getLocationOfStatefulTrigger(triggerName)
×
1248
        : new TGetLocationForTriggerResp(status);
×
1249
  }
1250

1251
  @Override
1252
  public TGetJarInListResp getTriggerJar(TGetJarInListReq req) {
1253
    TSStatus status = confirmLeader();
×
1254
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1255
        ? triggerManager.getTriggerJar(req)
×
1256
        : new TGetJarInListResp(status, Collections.emptyList());
×
1257
  }
1258

1259
  @Override
1260
  public TSStatus createPipePlugin(TCreatePipePluginReq req) {
1261
    TSStatus status = confirmLeader();
×
1262
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1263
        ? pipeManager.getPipePluginCoordinator().createPipePlugin(req)
×
1264
        : status;
×
1265
  }
1266

1267
  @Override
1268
  public TSStatus dropPipePlugin(String pipePluginName) {
1269
    TSStatus status = confirmLeader();
×
1270
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1271
        ? pipeManager.getPipePluginCoordinator().dropPipePlugin(pipePluginName)
×
1272
        : status;
×
1273
  }
1274

1275
  @Override
1276
  public TGetPipePluginTableResp getPipePluginTable() {
1277
    TSStatus status = confirmLeader();
×
1278
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1279
        ? pipeManager.getPipePluginCoordinator().getPipePluginTable()
×
1280
        : new TGetPipePluginTableResp(status, Collections.emptyList());
×
1281
  }
1282

1283
  @Override
1284
  public TGetJarInListResp getPipePluginJar(TGetJarInListReq req) {
1285
    TSStatus status = confirmLeader();
×
1286
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1287
        ? pipeManager.getPipePluginCoordinator().getPipePluginJar(req)
×
1288
        : new TGetJarInListResp(status, Collections.emptyList());
×
1289
  }
1290

1291
  @Override
1292
  public TSStatus merge() {
1293
    TSStatus status = confirmLeader();
×
1294
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1295
        ? RpcUtils.squashResponseStatusList(nodeManager.merge())
×
1296
        : status;
×
1297
  }
1298

1299
  @Override
1300
  public TSStatus flush(TFlushReq req) {
1301
    TSStatus status = confirmLeader();
×
1302
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1303
        ? RpcUtils.squashResponseStatusList(nodeManager.flush(req))
×
1304
        : status;
×
1305
  }
1306

1307
  @Override
1308
  public TSStatus clearCache() {
1309
    TSStatus status = confirmLeader();
×
1310
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1311
        ? RpcUtils.squashResponseStatusList(nodeManager.clearCache())
×
1312
        : status;
×
1313
  }
1314

1315
  @Override
1316
  public TSStatus loadConfiguration() {
1317
    TSStatus status = confirmLeader();
×
1318
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1319
        ? RpcUtils.squashResponseStatusList(nodeManager.loadConfiguration())
×
1320
        : status;
×
1321
  }
1322

1323
  @Override
1324
  public TSStatus setSystemStatus(String systemStatus) {
1325
    TSStatus status = confirmLeader();
×
1326
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1327
        ? RpcUtils.squashResponseStatusList(nodeManager.setSystemStatus(systemStatus))
×
1328
        : status;
×
1329
  }
1330

1331
  @Override
1332
  public TSStatus setDataNodeStatus(TSetDataNodeStatusReq req) {
1333
    TSStatus status = confirmLeader();
×
1334
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1335
        ? nodeManager.setDataNodeStatus(req)
×
1336
        : status;
×
1337
  }
1338

1339
  @Override
1340
  public TSStatus killQuery(String queryId, int dataNodeId) {
1341
    TSStatus status = confirmLeader();
×
1342
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1343
        ? nodeManager.killQuery(queryId, dataNodeId)
×
1344
        : status;
×
1345
  }
1346

1347
  @Override
1348
  public TGetDataNodeLocationsResp getRunningDataNodeLocations() {
1349
    TSStatus status = confirmLeader();
×
1350
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1351
        ? new TGetDataNodeLocationsResp(
×
1352
            new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
×
1353
            nodeManager.filterDataNodeThroughStatus(NodeStatus.Running).stream()
×
1354
                .map(TDataNodeConfiguration::getLocation)
×
1355
                .collect(Collectors.toList()))
×
1356
        : new TGetDataNodeLocationsResp(status, Collections.emptyList());
×
1357
  }
1358

1359
  @Override
1360
  public TRegionRouteMapResp getLatestRegionRouteMap() {
1361
    TSStatus status = confirmLeader();
×
1362
    TRegionRouteMapResp resp = new TRegionRouteMapResp(status);
×
1363

1364
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1365
      resp.setTimestamp(System.currentTimeMillis());
×
1366
      resp.setRegionRouteMap(getLoadManager().getRegionPriorityMap());
×
1367
    }
1368

1369
    return resp;
×
1370
  }
1371

1372
  @Override
1373
  public UDFManager getUDFManager() {
1374
    return udfManager;
×
1375
  }
1376

1377
  @Override
1378
  public RegionInfoListResp showRegion(GetRegionInfoListPlan getRegionInfoListPlan) {
1379
    TSStatus status = confirmLeader();
×
1380
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1381
      return partitionManager.getRegionInfoList(getRegionInfoListPlan);
×
1382
    } else {
1383
      RegionInfoListResp regionResp = new RegionInfoListResp();
×
1384
      regionResp.setStatus(status);
×
1385
      return regionResp;
×
1386
    }
1387
  }
1388

1389
  @Override
1390
  public TShowDataNodesResp showDataNodes() {
1391
    TSStatus status = confirmLeader();
×
1392
    TShowDataNodesResp resp = new TShowDataNodesResp();
×
1393
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1394
      return resp.setDataNodesInfoList(nodeManager.getRegisteredDataNodeInfoList())
×
1395
          .setStatus(StatusUtils.OK);
×
1396
    } else {
1397
      return resp.setStatus(status);
×
1398
    }
1399
  }
1400

1401
  @Override
1402
  public TShowConfigNodesResp showConfigNodes() {
1403
    TSStatus status = confirmLeader();
×
1404
    TShowConfigNodesResp resp = new TShowConfigNodesResp();
×
1405
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1406
      return resp.setConfigNodesInfoList(nodeManager.getRegisteredConfigNodeInfoList())
×
1407
          .setStatus(StatusUtils.OK);
×
1408
    } else {
1409
      return resp.setStatus(status);
×
1410
    }
1411
  }
1412

1413
  @Override
1414
  public TShowDatabaseResp showDatabase(GetDatabasePlan getDatabasePlan) {
1415
    TSStatus status = confirmLeader();
×
1416
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1417
      return getClusterSchemaManager().showDatabase(getDatabasePlan);
×
1418
    } else {
1419
      return new TShowDatabaseResp().setStatus(status);
×
1420
    }
1421
  }
1422

1423
  @Override
1424
  public ProcedureManager getProcedureManager() {
1425
    return procedureManager;
×
1426
  }
1427

1428
  @Override
1429
  public CQManager getCQManager() {
1430
    return cqManager;
×
1431
  }
1432

1433
  @Override
1434
  public ClusterQuotaManager getClusterQuotaManager() {
1435
    return clusterQuotaManager;
×
1436
  }
1437

1438
  @Override
1439
  public RetryFailedTasksThread getRetryFailedTasksThread() {
1440
    return retryFailedTasksThread;
×
1441
  }
1442

1443
  @Override
1444
  public void addMetrics() {
1445
    MetricService.getInstance().addMetricSet(new NodeMetrics(getNodeManager()));
×
1446
    MetricService.getInstance().addMetricSet(new PartitionMetrics(this));
×
1447
  }
×
1448

1449
  @Override
1450
  public TSStatus createSchemaTemplate(TCreateSchemaTemplateReq req) {
1451
    TSStatus status = confirmLeader();
×
1452
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1453
      CreateSchemaTemplatePlan createSchemaTemplatePlan =
×
1454
          new CreateSchemaTemplatePlan(req.getSerializedTemplate());
×
1455
      return clusterSchemaManager.createTemplate(createSchemaTemplatePlan);
×
1456
    } else {
1457
      return status;
×
1458
    }
1459
  }
1460

1461
  @Override
1462
  public TGetAllTemplatesResp getAllTemplates() {
1463
    TSStatus status = confirmLeader();
×
1464
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1465
      return clusterSchemaManager.getAllTemplates();
×
1466
    } else {
1467
      return new TGetAllTemplatesResp().setStatus(status);
×
1468
    }
1469
  }
1470

1471
  @Override
1472
  public TGetTemplateResp getTemplate(String req) {
1473
    TSStatus status = confirmLeader();
×
1474
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1475
      return clusterSchemaManager.getTemplate(req);
×
1476
    } else {
1477
      return new TGetTemplateResp().setStatus(status);
×
1478
    }
1479
  }
1480

1481
  @Override
1482
  public synchronized TSStatus setSchemaTemplate(TSetSchemaTemplateReq req) {
1483
    TSStatus status = confirmLeader();
×
1484
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1485
      return procedureManager.setSchemaTemplate(req.getQueryId(), req.getName(), req.getPath());
×
1486
    } else {
1487
      return status;
×
1488
    }
1489
  }
1490

1491
  @Override
1492
  public TGetPathsSetTemplatesResp getPathsSetTemplate(String req) {
1493
    TSStatus status = confirmLeader();
×
1494
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1495
      return clusterSchemaManager.getPathsSetTemplate(req);
×
1496
    } else {
1497
      return new TGetPathsSetTemplatesResp(status);
×
1498
    }
1499
  }
1500

1501
  @Override
1502
  public TSStatus deactivateSchemaTemplate(TDeactivateSchemaTemplateReq req) {
1503
    TSStatus status = confirmLeader();
×
1504
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1505
      return status;
×
1506
    }
1507

1508
    PathPatternTree patternTree =
×
1509
        PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
×
1510

1511
    List<PartialPath> patternList = patternTree.getAllPathPatterns();
×
1512
    TemplateSetInfoResp templateSetInfoResp = clusterSchemaManager.getTemplateSetInfo(patternList);
×
1513
    if (templateSetInfoResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1514
      return templateSetInfoResp.getStatus();
×
1515
    }
1516

1517
    Map<PartialPath, List<Template>> templateSetInfo = templateSetInfoResp.getPatternTemplateMap();
×
1518
    if (templateSetInfo.isEmpty()) {
×
1519
      return RpcUtils.getStatus(
×
1520
          TSStatusCode.TEMPLATE_NOT_SET,
1521
          String.format(
×
1522
              "Schema Template %s is not set on any prefix path of %s",
1523
              req.getTemplateName(), patternList));
×
1524
    }
1525

1526
    if (!req.getTemplateName().equals(ONE_LEVEL_PATH_WILDCARD)) {
×
1527
      Map<PartialPath, List<Template>> filteredTemplateSetInfo = new HashMap<>();
×
1528
      for (Map.Entry<PartialPath, List<Template>> entry : templateSetInfo.entrySet()) {
×
1529
        for (Template template : entry.getValue()) {
×
1530
          if (template.getName().equals(req.getTemplateName())) {
×
1531
            filteredTemplateSetInfo.put(entry.getKey(), Collections.singletonList(template));
×
1532
            break;
×
1533
          }
1534
        }
×
1535
      }
×
1536

1537
      if (filteredTemplateSetInfo.isEmpty()) {
×
1538
        return RpcUtils.getStatus(
×
1539
            TSStatusCode.TEMPLATE_NOT_SET,
1540
            String.format(
×
1541
                "Schema Template %s is not set on any prefix path of %s",
1542
                req.getTemplateName(), patternList));
×
1543
      }
1544

1545
      templateSetInfo = filteredTemplateSetInfo;
×
1546
    }
1547

1548
    return procedureManager.deactivateTemplate(req.getQueryId(), templateSetInfo);
×
1549
  }
1550

1551
  @Override
1552
  public synchronized TSStatus unsetSchemaTemplate(TUnsetSchemaTemplateReq req) {
1553
    TSStatus status = confirmLeader();
×
1554
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1555
      return status;
×
1556
    }
1557
    Pair<TSStatus, Template> checkResult =
×
1558
        clusterSchemaManager.checkIsTemplateSetOnPath(req.getTemplateName(), req.getPath());
×
1559
    if (checkResult.left.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1560
      try {
1561
        return procedureManager.unsetSchemaTemplate(
×
1562
            req.getQueryId(), checkResult.right, new PartialPath(req.getPath()));
×
1563
      } catch (IllegalPathException e) {
×
1564
        return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
×
1565
      }
1566
    } else {
1567
      return checkResult.left;
×
1568
    }
1569
  }
1570

1571
  @Override
1572
  public TSStatus dropSchemaTemplate(String templateName) {
1573
    TSStatus status = confirmLeader();
×
1574
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1575
      return clusterSchemaManager.dropSchemaTemplate(templateName);
×
1576
    } else {
1577
      return status;
×
1578
    }
1579
  }
1580

1581
  @Override
1582
  public TSStatus alterSchemaTemplate(TAlterSchemaTemplateReq req) {
1583
    TSStatus status = confirmLeader();
×
1584
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1585
      ByteBuffer buffer = ByteBuffer.wrap(req.getTemplateAlterInfo());
×
1586
      TemplateAlterOperationType operationType =
×
1587
          TemplateAlterOperationUtil.parseOperationType(buffer);
×
1588
      if (operationType.equals(TemplateAlterOperationType.EXTEND_TEMPLATE)) {
×
1589
        return clusterSchemaManager.extendSchemaTemplate(
×
1590
            TemplateAlterOperationUtil.parseTemplateExtendInfo(buffer));
×
1591
      }
1592
      return RpcUtils.getStatus(TSStatusCode.UNSUPPORTED_OPERATION);
×
1593
    } else {
1594
      return status;
×
1595
    }
1596
  }
1597

1598
  @Override
1599
  public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) {
1600
    TSStatus status = confirmLeader();
×
1601
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1602
      return procedureManager.deleteTimeSeries(req);
×
1603
    } else {
1604
      return status;
×
1605
    }
1606
  }
1607

1608
  @Override
1609
  public TSStatus deleteLogicalView(TDeleteLogicalViewReq req) {
1610
    TSStatus status = confirmLeader();
×
1611
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1612
      return procedureManager.deleteLogicalView(req);
×
1613
    } else {
1614
      return status;
×
1615
    }
1616
  }
1617

1618
  @Override
1619
  public TSStatus alterLogicalView(TAlterLogicalViewReq req) {
1620
    TSStatus status = confirmLeader();
×
1621
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1622
      return procedureManager.alterLogicalView(req);
×
1623
    } else {
1624
      return status;
×
1625
    }
1626
  }
1627

1628
  @Override
1629
  public TSStatus createPipe(TCreatePipeReq req) {
1630
    TSStatus status = confirmLeader();
×
1631
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1632
        ? pipeManager.getPipeTaskCoordinator().createPipe(req)
×
1633
        : status;
×
1634
  }
1635

1636
  @Override
1637
  public TSStatus startPipe(String pipeName) {
1638
    TSStatus status = confirmLeader();
×
1639
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1640
        ? pipeManager.getPipeTaskCoordinator().startPipe(pipeName)
×
1641
        : status;
×
1642
  }
1643

1644
  @Override
1645
  public TSStatus stopPipe(String pipeName) {
1646
    TSStatus status = confirmLeader();
×
1647
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1648
        ? pipeManager.getPipeTaskCoordinator().stopPipe(pipeName)
×
1649
        : status;
×
1650
  }
1651

1652
  @Override
1653
  public TSStatus dropPipe(String pipeName) {
1654
    TSStatus status = confirmLeader();
×
1655
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1656
        ? pipeManager.getPipeTaskCoordinator().dropPipe(pipeName)
×
1657
        : status;
×
1658
  }
1659

1660
  @Override
1661
  public TShowPipeResp showPipe(TShowPipeReq req) {
1662
    TSStatus status = confirmLeader();
×
1663
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1664
        ? pipeManager.getPipeTaskCoordinator().showPipes(req)
×
1665
        : new TShowPipeResp().setStatus(status);
×
1666
  }
1667

1668
  @Override
1669
  public TGetAllPipeInfoResp getAllPipeInfo() {
1670
    TSStatus status = confirmLeader();
×
1671
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1672
        ? pipeManager.getPipeTaskCoordinator().getAllPipeInfo()
×
1673
        : new TGetAllPipeInfoResp().setStatus(status);
×
1674
  }
1675

1676
  @Override
1677
  public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
1678
    TSStatus status = confirmLeader();
×
1679
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1680
        ? partitionManager.getRegionId(req).convertToRpcGetRegionIdResp()
×
1681
        : new TGetRegionIdResp(status);
×
1682
  }
1683

1684
  @Override
1685
  public TGetTimeSlotListResp getTimeSlotList(TGetTimeSlotListReq req) {
1686
    TSStatus status = confirmLeader();
×
1687
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1688
        ? partitionManager.getTimeSlotList(req).convertToRpcGetTimeSlotListResp()
×
1689
        : new TGetTimeSlotListResp(status);
×
1690
  }
1691

1692
  @Override
1693
  public TCountTimeSlotListResp countTimeSlotList(TCountTimeSlotListReq req) {
1694
    TSStatus status = confirmLeader();
×
1695
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1696
        ? partitionManager.countTimeSlotList(req).convertToRpcCountTimeSlotListResp()
×
1697
        : new TCountTimeSlotListResp(status);
×
1698
  }
1699

1700
  @Override
1701
  public TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req) {
1702
    TSStatus status = confirmLeader();
×
1703
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1704
        ? partitionManager.getSeriesSlotList(req).convertToRpcGetSeriesSlotListResp()
×
1705
        : new TGetSeriesSlotListResp(status);
×
1706
  }
1707

1708
  @Override
1709
  public TSStatus migrateRegion(TMigrateRegionReq req) {
1710
    TSStatus status = confirmLeader();
×
1711
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1712
        ? procedureManager.migrateRegion(req)
×
1713
        : status;
×
1714
  }
1715

1716
  @Override
1717
  public TSStatus createCQ(TCreateCQReq req) {
1718
    TSStatus status = confirmLeader();
×
1719
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1720
        ? cqManager.createCQ(req)
×
1721
        : status;
×
1722
  }
1723

1724
  @Override
1725
  public TSStatus dropCQ(TDropCQReq req) {
1726
    TSStatus status = confirmLeader();
×
1727
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1728
        ? cqManager.dropCQ(req)
×
1729
        : status;
×
1730
  }
1731

1732
  @Override
1733
  public TShowCQResp showCQ() {
1734
    TSStatus status = confirmLeader();
×
1735
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1736
        ? cqManager.showCQ()
×
1737
        : new TShowCQResp(status, Collections.emptyList());
×
1738
  }
1739

1740
  /**
1741
   * Get all related schemaRegion which may contains the timeseries matched by given patternTree.
1742
   */
1743
  public Map<TConsensusGroupId, TRegionReplicaSet> getRelatedSchemaRegionGroup(
1744
      PathPatternTree patternTree) {
1745
    Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable =
×
1746
        getSchemaPartition(patternTree).getSchemaPartitionTable();
×
1747

1748
    List<TRegionReplicaSet> allRegionReplicaSets = getPartitionManager().getAllReplicaSets();
×
1749
    Set<TConsensusGroupId> groupIdSet =
×
1750
        schemaPartitionTable.values().stream()
×
1751
            .flatMap(m -> m.values().stream())
×
1752
            .collect(Collectors.toSet());
×
1753
    Map<TConsensusGroupId, TRegionReplicaSet> filteredRegionReplicaSets = new HashMap<>();
×
1754
    for (TRegionReplicaSet regionReplicaSet : allRegionReplicaSets) {
×
1755
      if (groupIdSet.contains(regionReplicaSet.getRegionId())) {
×
1756
        filteredRegionReplicaSets.put(regionReplicaSet.getRegionId(), regionReplicaSet);
×
1757
      }
1758
    }
×
1759
    return filteredRegionReplicaSets;
×
1760
  }
1761

1762
  /**
1763
   * Get all related dataRegion which may contains the data of specific timeseries matched by given
1764
   * patternTree
1765
   */
1766
  public Map<TConsensusGroupId, TRegionReplicaSet> getRelatedDataRegionGroup(
1767
      PathPatternTree patternTree) {
1768
    // Get all databases and slots by getting schemaengine partition
1769
    Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable =
×
1770
        getSchemaPartition(patternTree).getSchemaPartitionTable();
×
1771

1772
    // Construct request for getting data partition
1773
    Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>();
×
1774
    schemaPartitionTable.forEach(
×
1775
        (key, value) -> {
1776
          Map<TSeriesPartitionSlot, TTimeSlotList> slotListMap = new HashMap<>();
×
1777
          value
×
1778
              .keySet()
×
1779
              .forEach(
×
1780
                  slot ->
1781
                      slotListMap.put(
×
1782
                          slot, new TTimeSlotList(Collections.emptyList(), true, true)));
×
1783
          partitionSlotsMap.put(key, slotListMap);
×
1784
        });
×
1785

1786
    // Get all data partitions
1787
    GetDataPartitionPlan getDataPartitionPlan = new GetDataPartitionPlan(partitionSlotsMap);
×
1788
    Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
1789
        dataPartitionTable = getDataPartition(getDataPartitionPlan).getDataPartitionTable();
×
1790

1791
    // Get all region replicaset of target data partitions
1792
    List<TRegionReplicaSet> allRegionReplicaSets = getPartitionManager().getAllReplicaSets();
×
1793
    Set<TConsensusGroupId> groupIdSet =
×
1794
        dataPartitionTable.values().stream()
×
1795
            .flatMap(
×
1796
                tSeriesPartitionSlotMapMap ->
1797
                    tSeriesPartitionSlotMapMap.values().stream()
×
1798
                        .flatMap(
×
1799
                            tTimePartitionSlotListMap ->
1800
                                tTimePartitionSlotListMap.values().stream()
×
1801
                                    .flatMap(Collection::stream)))
×
1802
            .collect(Collectors.toSet());
×
1803
    Map<TConsensusGroupId, TRegionReplicaSet> filteredRegionReplicaSets = new HashMap<>();
×
1804
    for (TRegionReplicaSet regionReplicaSet : allRegionReplicaSets) {
×
1805
      if (groupIdSet.contains(regionReplicaSet.getRegionId())) {
×
1806
        filteredRegionReplicaSets.put(regionReplicaSet.getRegionId(), regionReplicaSet);
×
1807
      }
1808
    }
×
1809
    return filteredRegionReplicaSets;
×
1810
  }
1811

1812
  public TSStatus transfer(List<TDataNodeLocation> newUnknownDataList) {
1813
    Map<Integer, TDataNodeLocation> runningDataNodeLocationMap = new HashMap<>();
×
1814
    nodeManager
×
1815
        .filterDataNodeThroughStatus(NodeStatus.Running)
×
1816
        .forEach(
×
1817
            dataNodeConfiguration ->
1818
                runningDataNodeLocationMap.put(
×
1819
                    dataNodeConfiguration.getLocation().getDataNodeId(),
×
1820
                    dataNodeConfiguration.getLocation()));
×
1821
    if (runningDataNodeLocationMap.isEmpty()) {
×
1822
      // No running DataNode, will not transfer and print log
1823
      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
1824
    }
1825

1826
    newUnknownDataList.forEach(
×
1827
        dataNodeLocation -> runningDataNodeLocationMap.remove(dataNodeLocation.getDataNodeId()));
×
1828

1829
    LOGGER.info("Start transfer of {}", newUnknownDataList);
×
1830
    // Transfer trigger
1831
    TSStatus transferResult =
×
1832
        triggerManager.transferTrigger(newUnknownDataList, runningDataNodeLocationMap);
×
1833
    if (transferResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1834
      LOGGER.warn("Fail to transfer because {}, will retry", transferResult.getMessage());
×
1835
    }
1836

1837
    return transferResult;
×
1838
  }
1839

1840
  @Override
1841
  public TSStatus createModel(TCreateModelReq req) {
1842
    TSStatus status = confirmLeader();
×
1843
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1844
        ? modelManager.createModel(req)
×
1845
        : status;
×
1846
  }
1847

1848
  @Override
1849
  public TSStatus dropModel(TDropModelReq req) {
1850
    TSStatus status = confirmLeader();
×
1851
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1852
        ? modelManager.dropModel(req)
×
1853
        : status;
×
1854
  }
1855

1856
  @Override
1857
  public TShowModelResp showModel(TShowModelReq req) {
1858
    TSStatus status = confirmLeader();
×
1859
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1860
        ? modelManager.showModel(req)
×
1861
        : new TShowModelResp(status, Collections.emptyList());
×
1862
  }
1863

1864
  @Override
1865
  public TShowTrailResp showTrail(TShowTrailReq req) {
1866
    TSStatus status = confirmLeader();
×
1867
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1868
        ? modelManager.showTrail(req)
×
1869
        : new TShowTrailResp(status, Collections.emptyList());
×
1870
  }
1871

1872
  @Override
1873
  public TSStatus updateModelInfo(TUpdateModelInfoReq req) {
1874
    TSStatus status = confirmLeader();
×
1875
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1876
        ? modelManager.updateModelInfo(req)
×
1877
        : status;
×
1878
  }
1879

1880
  @Override
1881
  public TSStatus updateModelState(TUpdateModelStateReq req) {
1882
    TSStatus status = confirmLeader();
×
1883
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1884
        ? modelManager.updateModelState(req)
×
1885
        : status;
×
1886
  }
1887

1888
  @Override
1889
  public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) {
1890
    TSStatus status = confirmLeader();
×
1891
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1892
        ? clusterQuotaManager.setSpaceQuota(req)
×
1893
        : status;
×
1894
  }
1895

1896
  public TSpaceQuotaResp showSpaceQuota(List<String> databases) {
1897
    TSStatus status = confirmLeader();
×
1898
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1899
        ? clusterQuotaManager.showSpaceQuota(databases)
×
1900
        : new TSpaceQuotaResp(status);
×
1901
  }
1902

1903
  public TSpaceQuotaResp getSpaceQuota() {
1904
    TSStatus status = confirmLeader();
×
1905
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1906
        ? clusterQuotaManager.getSpaceQuota()
×
1907
        : new TSpaceQuotaResp(status);
×
1908
  }
1909

1910
  public TSStatus setThrottleQuota(TSetThrottleQuotaReq req) {
1911
    TSStatus status = confirmLeader();
×
1912
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1913
        ? clusterQuotaManager.setThrottleQuota(req)
×
1914
        : status;
×
1915
  }
1916

1917
  public TThrottleQuotaResp showThrottleQuota(TShowThrottleReq req) {
1918
    TSStatus status = confirmLeader();
×
1919
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1920
        ? clusterQuotaManager.showThrottleQuota(req)
×
1921
        : new TThrottleQuotaResp(status);
×
1922
  }
1923

1924
  public TThrottleQuotaResp getThrottleQuota() {
1925
    TSStatus status = confirmLeader();
×
1926
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1927
        ? clusterQuotaManager.getThrottleQuota()
×
1928
        : new TThrottleQuotaResp(status);
×
1929
  }
1930
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc