• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9905

23 Aug 2023 06:20AM UTC coverage: 47.785% (-0.1%) from 47.922%
#9905

push

travis_ci

web-flow
[To rel/1.2][Metric] Fix flush point statistics (#10934)

23 of 23 new or added lines in 1 file covered. (100.0%)

79851 of 167106 relevant lines covered (47.78%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.34
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
24
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
25
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
26
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
27
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
28
import org.apache.iotdb.common.rpc.thrift.TSStatus;
29
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
30
import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
31
import org.apache.iotdb.common.rpc.thrift.TSetThrottleQuotaReq;
32
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
33
import org.apache.iotdb.commons.cluster.NodeStatus;
34
import org.apache.iotdb.commons.cluster.NodeType;
35
import org.apache.iotdb.commons.conf.CommonConfig;
36
import org.apache.iotdb.commons.conf.CommonDescriptor;
37
import org.apache.iotdb.commons.conf.IoTDBConstant;
38
import org.apache.iotdb.commons.exception.IllegalPathException;
39
import org.apache.iotdb.commons.path.PartialPath;
40
import org.apache.iotdb.commons.path.PathPatternTree;
41
import org.apache.iotdb.commons.service.metric.MetricService;
42
import org.apache.iotdb.commons.utils.AuthUtils;
43
import org.apache.iotdb.commons.utils.PathUtils;
44
import org.apache.iotdb.commons.utils.StatusUtils;
45
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
46
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
47
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
48
import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
49
import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
50
import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
51
import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
52
import org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan;
53
import org.apache.iotdb.confignode.consensus.request.read.partition.GetNodePathsPartitionPlan;
54
import org.apache.iotdb.confignode.consensus.request.read.partition.GetOrCreateDataPartitionPlan;
55
import org.apache.iotdb.confignode.consensus.request.read.partition.GetOrCreateSchemaPartitionPlan;
56
import org.apache.iotdb.confignode.consensus.request.read.partition.GetSchemaPartitionPlan;
57
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
58
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
59
import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
60
import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan;
61
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
62
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
63
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
64
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
65
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
66
import org.apache.iotdb.confignode.consensus.response.auth.PermissionInfoResp;
67
import org.apache.iotdb.confignode.consensus.response.database.CountDatabaseResp;
68
import org.apache.iotdb.confignode.consensus.response.database.DatabaseSchemaResp;
69
import org.apache.iotdb.confignode.consensus.response.datanode.ConfigurationResp;
70
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
71
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
72
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
73
import org.apache.iotdb.confignode.consensus.response.partition.DataPartitionResp;
74
import org.apache.iotdb.confignode.consensus.response.partition.RegionInfoListResp;
75
import org.apache.iotdb.confignode.consensus.response.partition.SchemaNodeManagementResp;
76
import org.apache.iotdb.confignode.consensus.response.partition.SchemaPartitionResp;
77
import org.apache.iotdb.confignode.consensus.response.template.TemplateSetInfoResp;
78
import org.apache.iotdb.confignode.consensus.statemachine.ConfigRegionStateMachine;
79
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
80
import org.apache.iotdb.confignode.manager.cq.CQManager;
81
import org.apache.iotdb.confignode.manager.load.LoadManager;
82
import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
83
import org.apache.iotdb.confignode.manager.node.ClusterNodeStartUtils;
84
import org.apache.iotdb.confignode.manager.node.NodeManager;
85
import org.apache.iotdb.confignode.manager.node.NodeMetrics;
86
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
87
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
88
import org.apache.iotdb.confignode.manager.pipe.PipeManager;
89
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
90
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaQuotaStatistics;
91
import org.apache.iotdb.confignode.persistence.AuthorInfo;
92
import org.apache.iotdb.confignode.persistence.ModelInfo;
93
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
94
import org.apache.iotdb.confignode.persistence.TriggerInfo;
95
import org.apache.iotdb.confignode.persistence.UDFInfo;
96
import org.apache.iotdb.confignode.persistence.cq.CQInfo;
97
import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
98
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
99
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
100
import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
101
import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
102
import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
103
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
104
import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
105
import org.apache.iotdb.confignode.rpc.thrift.TClusterParameters;
106
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
107
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
108
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
109
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
110
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListResp;
111
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
112
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
113
import org.apache.iotdb.confignode.rpc.thrift.TCreateModelReq;
114
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
115
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
116
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
117
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
118
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
119
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
120
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
121
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
122
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
123
import org.apache.iotdb.confignode.rpc.thrift.TDeactivateSchemaTemplateReq;
124
import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
125
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
126
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
127
import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
128
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
129
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
130
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
131
import org.apache.iotdb.confignode.rpc.thrift.TGetDataNodeLocationsResp;
132
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
133
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
134
import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
135
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
136
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
137
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
138
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
139
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
140
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
141
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
142
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
143
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
144
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
145
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
146
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
147
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
148
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
149
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
150
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
151
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
152
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
153
import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
154
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
155
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
156
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
157
import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
158
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
159
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
160
import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
161
import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
162
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
163
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
164
import org.apache.iotdb.confignode.rpc.thrift.TShowThrottleReq;
165
import org.apache.iotdb.confignode.rpc.thrift.TShowTrailReq;
166
import org.apache.iotdb.confignode.rpc.thrift.TShowTrailResp;
167
import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
168
import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
169
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
170
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
171
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
172
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
173
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelStateReq;
174
import org.apache.iotdb.consensus.common.DataSet;
175
import org.apache.iotdb.consensus.exception.ConsensusException;
176
import org.apache.iotdb.db.schemaengine.template.Template;
177
import org.apache.iotdb.db.schemaengine.template.TemplateAlterOperationType;
178
import org.apache.iotdb.db.schemaengine.template.alter.TemplateAlterOperationUtil;
179
import org.apache.iotdb.rpc.RpcUtils;
180
import org.apache.iotdb.rpc.TSStatusCode;
181
import org.apache.iotdb.tsfile.utils.Pair;
182

183
import org.slf4j.Logger;
184
import org.slf4j.LoggerFactory;
185

186
import java.io.IOException;
187
import java.nio.ByteBuffer;
188
import java.util.ArrayList;
189
import java.util.Arrays;
190
import java.util.Collection;
191
import java.util.Collections;
192
import java.util.Comparator;
193
import java.util.HashMap;
194
import java.util.HashSet;
195
import java.util.List;
196
import java.util.Map;
197
import java.util.Set;
198
import java.util.concurrent.atomic.AtomicReference;
199
import java.util.stream.Collectors;
200

201
import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
202

203
/** Entry of all management, AssignPartitionManager, AssignRegionManager. */
204
public class ConfigManager implements IManager {
205

206
  private static final Logger LOGGER = LoggerFactory.getLogger(ConfigManager.class);
1✔
207

208
  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
1✔
209
  private static final CommonConfig COMMON_CONF = CommonDescriptor.getInstance().getConfig();
1✔
210

211
  /** Manage PartitionTable read/write requests through the ConsensusLayer. */
212
  private final AtomicReference<ConsensusManager> consensusManager = new AtomicReference<>();
×
213

214
  /** Manage cluster node. */
215
  private final NodeManager nodeManager;
216

217
  /** Manage cluster schemaengine. */
218
  private final ClusterSchemaManager clusterSchemaManager;
219

220
  /** Manage cluster regions and partitions. */
221
  private final PartitionManager partitionManager;
222

223
  /** Manage cluster authorization. */
224
  private final PermissionManager permissionManager;
225

226
  private final LoadManager loadManager;
227

228
  /** Manage procedure. */
229
  private final ProcedureManager procedureManager;
230

231
  /** UDF. */
232
  private final UDFManager udfManager;
233

234
  /** Manage Trigger. */
235
  private final TriggerManager triggerManager;
236

237
  /** CQ. */
238
  private final CQManager cqManager;
239

240
  /** ML Model. */
241
  private final ModelManager modelManager;
242

243
  /** Pipe */
244
  private final PipeManager pipeManager;
245

246
  /** Manage quotas */
247
  private final ClusterQuotaManager clusterQuotaManager;
248

249
  private final ConfigRegionStateMachine stateMachine;
250

251
  private final RetryFailedTasksThread retryFailedTasksThread;
252

253
  private static final String DATABASE = "\tDatabase=";
254

255
  public ConfigManager() throws IOException {
×
256
    // Build the persistence module
257
    NodeInfo nodeInfo = new NodeInfo();
×
258
    ClusterSchemaInfo clusterSchemaInfo = new ClusterSchemaInfo();
×
259
    PartitionInfo partitionInfo = new PartitionInfo();
×
260
    AuthorInfo authorInfo = new AuthorInfo();
×
261
    ProcedureInfo procedureInfo = new ProcedureInfo();
×
262
    UDFInfo udfInfo = new UDFInfo();
×
263
    TriggerInfo triggerInfo = new TriggerInfo();
×
264
    CQInfo cqInfo = new CQInfo();
×
265
    ModelInfo modelInfo = new ModelInfo();
×
266
    PipeInfo pipeInfo = new PipeInfo();
×
267
    QuotaInfo quotaInfo = new QuotaInfo();
×
268

269
    // Build state machine and executor
270
    ConfigPlanExecutor executor =
×
271
        new ConfigPlanExecutor(
272
            nodeInfo,
273
            clusterSchemaInfo,
274
            partitionInfo,
275
            authorInfo,
276
            procedureInfo,
277
            udfInfo,
278
            triggerInfo,
279
            cqInfo,
280
            modelInfo,
281
            pipeInfo,
282
            quotaInfo);
283
    this.stateMachine = new ConfigRegionStateMachine(this, executor);
×
284

285
    // Build the manager module
286
    this.nodeManager = new NodeManager(this, nodeInfo);
×
287
    this.clusterSchemaManager =
×
288
        new ClusterSchemaManager(this, clusterSchemaInfo, new ClusterSchemaQuotaStatistics());
289
    this.partitionManager = new PartitionManager(this, partitionInfo);
×
290
    this.permissionManager = new PermissionManager(this, authorInfo);
×
291
    this.procedureManager = new ProcedureManager(this, procedureInfo);
×
292
    this.udfManager = new UDFManager(this, udfInfo);
×
293
    this.triggerManager = new TriggerManager(this, triggerInfo);
×
294
    this.cqManager = new CQManager(this);
×
295
    this.modelManager = new ModelManager(this, modelInfo);
×
296
    this.pipeManager = new PipeManager(this, pipeInfo);
×
297

298
    // 1. keep PipeManager initialization before LoadManager initialization, because
299
    // LoadManager will register PipeManager as a listener.
300
    // 2. keep RetryFailedTasksThread initialization after LoadManager initialization,
301
    // because RetryFailedTasksThread will keep a reference of LoadManager.
302
    this.loadManager = new LoadManager(this);
×
303

304
    this.retryFailedTasksThread = new RetryFailedTasksThread(this);
×
305
    this.clusterQuotaManager = new ClusterQuotaManager(this, quotaInfo);
×
306
  }
×
307

308
  public void initConsensusManager() throws IOException {
309
    this.consensusManager.set(new ConsensusManager(this, this.stateMachine));
×
310
  }
×
311

312
  public void close() throws IOException {
313
    if (consensusManager.get() != null) {
×
314
      consensusManager.get().close();
×
315
    }
316
    if (partitionManager != null) {
×
317
      partitionManager.getRegionMaintainer().shutdown();
×
318
    }
319
    if (procedureManager != null) {
×
320
      procedureManager.shiftExecutor(false);
×
321
    }
322
  }
×
323

324
  @Override
325
  public DataSet getSystemConfiguration() {
326
    TSStatus status = confirmLeader();
×
327
    ConfigurationResp dataSet;
328
    // Notice: The Seed-ConfigNode must also have the privilege to give system configuration.
329
    // Otherwise, the IoTDB-cluster will not have the ability to restart from scratch.
330
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
331
        || ConfigNodeDescriptor.getInstance().isSeedConfigNode()
×
332
        || SystemPropertiesUtils.isSeedConfigNode()) {
×
333
      dataSet = (ConfigurationResp) nodeManager.getSystemConfiguration();
×
334
    } else {
335
      dataSet = new ConfigurationResp();
×
336
      dataSet.setStatus(status);
×
337
    }
338
    return dataSet;
×
339
  }
340

341
  @Override
342
  public DataSet registerDataNode(TDataNodeRegisterReq req) {
343
    TSStatus status = confirmLeader();
×
344
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
345
      status =
×
346
          ClusterNodeStartUtils.confirmNodeRegistration(
×
347
              NodeType.DataNode,
348
              req.getClusterName(),
×
349
              req.getDataNodeConfiguration().getLocation(),
×
350
              this);
351
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
352
        return nodeManager.registerDataNode(req);
×
353
      }
354
    }
355
    DataNodeRegisterResp resp = new DataNodeRegisterResp();
×
356
    resp.setStatus(status);
×
357
    resp.setConfigNodeList(getNodeManager().getRegisteredConfigNodes());
×
358
    return resp;
×
359
  }
360

361
  @Override
362
  public TDataNodeRestartResp restartDataNode(TDataNodeRestartReq req) {
363
    TSStatus status = confirmLeader();
×
364
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
365
      status =
×
366
          ClusterNodeStartUtils.confirmNodeRestart(
×
367
              NodeType.DataNode,
368
              req.getClusterName(),
×
369
              req.getDataNodeConfiguration().getLocation().getDataNodeId(),
×
370
              req.getDataNodeConfiguration().getLocation(),
×
371
              this);
372
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
373
        return nodeManager.updateDataNodeIfNecessary(req);
×
374
      }
375
    }
376
    return new TDataNodeRestartResp()
×
377
        .setStatus(status)
×
378
        .setConfigNodeList(getNodeManager().getRegisteredConfigNodes());
×
379
  }
380

381
  @Override
382
  public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
383
    TSStatus status = confirmLeader();
×
384
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
385
      return nodeManager.removeDataNode(removeDataNodePlan);
×
386
    } else {
387
      DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
×
388
      dataSet.setStatus(status);
×
389
      return dataSet;
×
390
    }
391
  }
392

393
  @Override
394
  public TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation) {
395
    TSStatus status = confirmLeader();
×
396
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
397
      // Force updating the target DataNode's status to Unknown
398
      getLoadManager()
×
399
          .forceUpdateNodeCache(
×
400
              NodeType.DataNode,
401
              dataNodeLocation.getDataNodeId(),
×
402
              NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
×
403
      LOGGER.info(
×
404
          "[ShutdownHook] The DataNode-{} will be shutdown soon, mark it as Unknown",
405
          dataNodeLocation.getDataNodeId());
×
406
    }
407
    return status;
×
408
  }
409

410
  @Override
411
  public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req) {
412
    TSStatus status = confirmLeader();
×
413
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
414
      procedureManager.reportRegionMigrateResult(req);
×
415
    }
416
    return status;
×
417
  }
418

419
  @Override
420
  public DataSet getDataNodeConfiguration(
421
      GetDataNodeConfigurationPlan getDataNodeConfigurationPlan) {
422
    TSStatus status = confirmLeader();
×
423
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
424
      return nodeManager.getDataNodeConfiguration(getDataNodeConfigurationPlan);
×
425
    } else {
426
      DataNodeConfigurationResp dataSet = new DataNodeConfigurationResp();
×
427
      dataSet.setStatus(status);
×
428
      return dataSet;
×
429
    }
430
  }
431

432
  @Override
433
  public TShowClusterResp showCluster() {
434
    TSStatus status = confirmLeader();
×
435
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
436
      List<TConfigNodeLocation> configNodeLocations = getNodeManager().getRegisteredConfigNodes();
×
437
      configNodeLocations.sort(Comparator.comparingInt(TConfigNodeLocation::getConfigNodeId));
×
438
      List<TDataNodeLocation> dataNodeInfoLocations =
×
439
          getNodeManager().getRegisteredDataNodes().stream()
×
440
              .map(TDataNodeConfiguration::getLocation)
×
441
              .sorted(Comparator.comparingInt(TDataNodeLocation::getDataNodeId))
×
442
              .collect(Collectors.toList());
×
443
      Map<Integer, String> nodeStatus = getLoadManager().getNodeStatusWithReason();
×
444
      Map<Integer, TNodeVersionInfo> nodeVersionInfo = getNodeManager().getNodeVersionInfo();
×
445
      return new TShowClusterResp(
×
446
          status, configNodeLocations, dataNodeInfoLocations, nodeStatus, nodeVersionInfo);
447
    } else {
448
      return new TShowClusterResp(
×
449
          status, new ArrayList<>(), new ArrayList<>(), new HashMap<>(), new HashMap<>());
450
    }
451
  }
452

453
  @Override
454
  public TShowVariablesResp showVariables() {
455
    TSStatus status = confirmLeader();
×
456
    TShowVariablesResp resp = new TShowVariablesResp();
×
457
    resp.setStatus(status);
×
458
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
459
      resp.setClusterParameters(getClusterParameters());
×
460
    }
461
    return resp;
×
462
  }
463

464
  public TClusterParameters getClusterParameters() {
465
    TClusterParameters clusterParameters = new TClusterParameters();
×
466
    clusterParameters.setClusterName(CONF.getClusterName());
×
467
    clusterParameters.setConfigNodeConsensusProtocolClass(
×
468
        CONF.getConfigNodeConsensusProtocolClass());
×
469
    clusterParameters.setDataRegionConsensusProtocolClass(
×
470
        CONF.getDataRegionConsensusProtocolClass());
×
471
    clusterParameters.setSchemaRegionConsensusProtocolClass(
×
472
        CONF.getSchemaRegionConsensusProtocolClass());
×
473
    clusterParameters.setSeriesPartitionSlotNum(CONF.getSeriesSlotNum());
×
474
    clusterParameters.setSeriesPartitionExecutorClass(CONF.getSeriesPartitionExecutorClass());
×
475
    clusterParameters.setDefaultTTL(COMMON_CONF.getDefaultTTLInMs());
×
476
    clusterParameters.setTimePartitionInterval(COMMON_CONF.getTimePartitionInterval());
×
477
    clusterParameters.setDataReplicationFactor(CONF.getDataReplicationFactor());
×
478
    clusterParameters.setSchemaReplicationFactor(CONF.getSchemaReplicationFactor());
×
479
    clusterParameters.setDataRegionPerDataNode(CONF.getDataRegionPerDataNode());
×
480
    clusterParameters.setSchemaRegionPerDataNode(CONF.getSchemaRegionPerDataNode());
×
481
    clusterParameters.setDiskSpaceWarningThreshold(COMMON_CONF.getDiskSpaceWarningThreshold());
×
482
    clusterParameters.setReadConsistencyLevel(CONF.getReadConsistencyLevel());
×
483
    clusterParameters.setTimestampPrecision(COMMON_CONF.getTimestampPrecision());
×
484
    clusterParameters.setSchemaEngineMode(COMMON_CONF.getSchemaEngineMode());
×
485
    clusterParameters.setTagAttributeTotalSize(COMMON_CONF.getTagAttributeTotalSize());
×
486
    clusterParameters.setDatabaseLimitThreshold(COMMON_CONF.getDatabaseLimitThreshold());
×
487
    return clusterParameters;
×
488
  }
489

490
  @Override
491
  public TSStatus setTTL(SetTTLPlan setTTLPlan) {
492
    TSStatus status = confirmLeader();
×
493
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
494
      return clusterSchemaManager.setTTL(setTTLPlan);
×
495
    } else {
496
      return status;
×
497
    }
498
  }
499

500
  @Override
501
  public TSStatus setSchemaReplicationFactor(
502
      SetSchemaReplicationFactorPlan setSchemaReplicationFactorPlan) {
503
    TSStatus status = confirmLeader();
×
504
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
505
      return clusterSchemaManager.setSchemaReplicationFactor(setSchemaReplicationFactorPlan);
×
506
    } else {
507
      return status;
×
508
    }
509
  }
510

511
  @Override
512
  public TSStatus setDataReplicationFactor(
513
      SetDataReplicationFactorPlan setDataReplicationFactorPlan) {
514
    TSStatus status = confirmLeader();
×
515
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
516
      return clusterSchemaManager.setDataReplicationFactor(setDataReplicationFactorPlan);
×
517
    } else {
518
      return status;
×
519
    }
520
  }
521

522
  @Override
523
  public TSStatus setTimePartitionInterval(
524
      SetTimePartitionIntervalPlan setTimePartitionIntervalPlan) {
525
    TSStatus status = confirmLeader();
×
526
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
527
      return clusterSchemaManager.setTimePartitionInterval(setTimePartitionIntervalPlan);
×
528
    } else {
529
      return status;
×
530
    }
531
  }
532

533
  @Override
534
  public DataSet countMatchedDatabases(CountDatabasePlan countDatabasePlan) {
535
    TSStatus status = confirmLeader();
×
536
    CountDatabaseResp result = new CountDatabaseResp();
×
537
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
538
      return clusterSchemaManager.countMatchedDatabases(countDatabasePlan);
×
539
    } else {
540
      result.setStatus(status);
×
541
    }
542
    return result;
×
543
  }
544

545
  @Override
546
  public DataSet getMatchedDatabaseSchemas(GetDatabasePlan getDatabaseReq) {
547
    TSStatus status = confirmLeader();
×
548
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
549
      return clusterSchemaManager.getMatchedDatabaseSchema(getDatabaseReq);
×
550
    } else {
551
      DatabaseSchemaResp dataSet = new DatabaseSchemaResp();
×
552
      dataSet.setStatus(status);
×
553
      return dataSet;
×
554
    }
555
  }
556

557
  @Override
558
  public synchronized TSStatus setDatabase(DatabaseSchemaPlan databaseSchemaPlan) {
559
    TSStatus status = confirmLeader();
×
560
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
561
      return clusterSchemaManager.setDatabase(databaseSchemaPlan);
×
562
    } else {
563
      return status;
×
564
    }
565
  }
566

567
  @Override
568
  public TSStatus alterDatabase(DatabaseSchemaPlan databaseSchemaPlan) {
569
    TSStatus status = confirmLeader();
×
570
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
571
      return clusterSchemaManager.alterDatabase(databaseSchemaPlan);
×
572
    } else {
573
      return status;
×
574
    }
575
  }
576

577
  @Override
578
  public synchronized TSStatus deleteDatabases(List<String> deletedPaths) {
579
    TSStatus status = confirmLeader();
×
580
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
581
      // remove wild
582
      Map<String, TDatabaseSchema> deleteDatabaseSchemaMap =
×
583
          getClusterSchemaManager().getMatchedDatabaseSchemasByName(deletedPaths);
×
584
      if (deleteDatabaseSchemaMap.isEmpty()) {
×
585
        return RpcUtils.getStatus(
×
586
            TSStatusCode.PATH_NOT_EXIST.getStatusCode(),
×
587
            String.format("Path %s does not exist", Arrays.toString(deletedPaths.toArray())));
×
588
      }
589
      ArrayList<TDatabaseSchema> parsedDeleteDatabases =
×
590
          new ArrayList<>(deleteDatabaseSchemaMap.values());
×
591
      return procedureManager.deleteDatabases(parsedDeleteDatabases);
×
592
    } else {
593
      return status;
×
594
    }
595
  }
596

597
  private List<TSeriesPartitionSlot> calculateRelatedSlot(PartialPath path, PartialPath database) {
598
    // The path contains `**`
599
    if (path.getFullPath().contains(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)) {
×
600
      return new ArrayList<>();
×
601
    }
602
    List<PartialPath> innerPathList = path.alterPrefixPath(database);
×
603
    if (innerPathList.size() == 0) {
×
604
      return new ArrayList<>();
×
605
    }
606
    PartialPath innerPath = innerPathList.get(0);
×
607
    // The innerPath contains `*` and the only `*` is not in last level
608
    if (innerPath.getDevice().contains(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)) {
×
609
      return new ArrayList<>();
×
610
    }
611
    return Collections.singletonList(
×
612
        getPartitionManager().getSeriesPartitionSlot(innerPath.getDevice()));
×
613
  }
614

615
  @Override
616
  public TSchemaPartitionTableResp getSchemaPartition(PathPatternTree patternTree) {
617
    // Construct empty response
618
    TSchemaPartitionTableResp resp = new TSchemaPartitionTableResp();
×
619

620
    TSStatus status = confirmLeader();
×
621
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
622
      return resp.setStatus(status);
×
623
    }
624

625
    // Build GetSchemaPartitionPlan
626
    Map<String, Set<TSeriesPartitionSlot>> partitionSlotsMap = new HashMap<>();
×
627
    List<PartialPath> relatedPaths = patternTree.getAllPathPatterns();
×
628
    List<String> allDatabases = getClusterSchemaManager().getDatabaseNames();
×
629
    List<PartialPath> allDatabasePaths = new ArrayList<>();
×
630
    for (String database : allDatabases) {
×
631
      try {
632
        allDatabasePaths.add(new PartialPath(database));
×
633
      } catch (IllegalPathException e) {
×
634
        throw new RuntimeException(e);
×
635
      }
×
636
    }
×
637
    Map<String, Boolean> scanAllRegions = new HashMap<>();
×
638
    for (PartialPath path : relatedPaths) {
×
639
      for (int i = 0; i < allDatabases.size(); i++) {
×
640
        String database = allDatabases.get(i);
×
641
        PartialPath databasePath = allDatabasePaths.get(i);
×
642
        if (path.overlapWithFullPathPrefix(databasePath) && !scanAllRegions.containsKey(database)) {
×
643
          List<TSeriesPartitionSlot> relatedSlot = calculateRelatedSlot(path, databasePath);
×
644
          if (relatedSlot.isEmpty()) {
×
645
            scanAllRegions.put(database, true);
×
646
            partitionSlotsMap.put(database, new HashSet<>());
×
647
          } else {
648
            partitionSlotsMap.computeIfAbsent(database, k -> new HashSet<>()).addAll(relatedSlot);
×
649
          }
650
        }
651
      }
652
    }
×
653

654
    // Return empty resp if the partitionSlotsMap is empty
655
    if (partitionSlotsMap.isEmpty()) {
×
656
      return resp.setStatus(StatusUtils.OK).setSchemaPartitionTable(new HashMap<>());
×
657
    }
658

659
    GetSchemaPartitionPlan getSchemaPartitionPlan =
×
660
        new GetSchemaPartitionPlan(
661
            partitionSlotsMap.entrySet().stream()
×
662
                .collect(Collectors.toMap(Map.Entry::getKey, e -> new ArrayList<>(e.getValue()))));
×
663
    SchemaPartitionResp queryResult = partitionManager.getSchemaPartition(getSchemaPartitionPlan);
×
664
    resp = queryResult.convertToRpcSchemaPartitionTableResp();
×
665

666
    LOGGER.debug("GetSchemaPartition receive paths: {}, return: {}", relatedPaths, resp);
×
667

668
    return resp;
×
669
  }
670

671
  @Override
672
  public TSchemaPartitionTableResp getOrCreateSchemaPartition(PathPatternTree patternTree) {
673
    // Construct empty response
674
    TSchemaPartitionTableResp resp = new TSchemaPartitionTableResp();
×
675

676
    TSStatus status = confirmLeader();
×
677
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
678
      return resp.setStatus(status);
×
679
    }
680

681
    List<String> devicePaths = patternTree.getAllDevicePatterns();
×
682
    List<String> databases = getClusterSchemaManager().getDatabaseNames();
×
683

684
    // Build GetOrCreateSchemaPartitionPlan
685
    Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap = new HashMap<>();
×
686
    for (String devicePath : devicePaths) {
×
687
      if (!devicePath.contains("*")) {
×
688
        // Only check devicePaths that without "*"
689
        for (String database : databases) {
×
690
          if (PathUtils.isStartWith(devicePath, database)) {
×
691
            partitionSlotsMap
×
692
                .computeIfAbsent(database, key -> new ArrayList<>())
×
693
                .add(getPartitionManager().getSeriesPartitionSlot(devicePath));
×
694
            break;
×
695
          }
696
        }
×
697
      }
698
    }
×
699
    GetOrCreateSchemaPartitionPlan getOrCreateSchemaPartitionPlan =
×
700
        new GetOrCreateSchemaPartitionPlan(partitionSlotsMap);
701

702
    SchemaPartitionResp queryResult =
×
703
        partitionManager.getOrCreateSchemaPartition(getOrCreateSchemaPartitionPlan);
×
704
    resp = queryResult.convertToRpcSchemaPartitionTableResp();
×
705

706
    if (CONF.isEnablePrintingNewlyCreatedPartition()) {
×
707
      printNewCreatedSchemaPartition(devicePaths, resp);
×
708
    }
709

710
    return resp;
×
711
  }
712

713
  private void printNewCreatedSchemaPartition(
714
      List<String> devicePaths, TSchemaPartitionTableResp resp) {
715
    final String lineSeparator = System.lineSeparator();
×
716
    StringBuilder devicePathString = new StringBuilder("{");
×
717
    for (String devicePath : devicePaths) {
×
718
      devicePathString.append(lineSeparator).append("\t").append(devicePath).append(",");
×
719
    }
×
720
    devicePathString.append(lineSeparator).append("}");
×
721

722
    StringBuilder schemaPartitionRespString = new StringBuilder("{");
×
723
    schemaPartitionRespString
×
724
        .append(lineSeparator)
×
725
        .append("\tTSStatus=")
×
726
        .append(resp.getStatus().getCode())
×
727
        .append(",");
×
728
    Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable =
×
729
        resp.getSchemaPartitionTable();
×
730
    for (Map.Entry<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> databaseEntry :
731
        schemaPartitionTable.entrySet()) {
×
732
      String database = databaseEntry.getKey();
×
733
      schemaPartitionRespString
×
734
          .append(lineSeparator)
×
735
          .append(DATABASE)
×
736
          .append(database)
×
737
          .append(": {");
×
738
      for (Map.Entry<TSeriesPartitionSlot, TConsensusGroupId> slotEntry :
739
          databaseEntry.getValue().entrySet()) {
×
740
        schemaPartitionRespString
×
741
            .append(lineSeparator)
×
742
            .append("\t\t")
×
743
            .append(slotEntry.getKey())
×
744
            .append(", ")
×
745
            .append(slotEntry.getValue())
×
746
            .append(",");
×
747
      }
×
748
      schemaPartitionRespString.append(lineSeparator).append("\t},");
×
749
    }
×
750
    schemaPartitionRespString.append(lineSeparator).append("}");
×
751
    LOGGER.info(
×
752
        "[GetOrCreateSchemaPartition]:{} Receive PathPatternTree: {}, Return TSchemaPartitionTableResp: {}",
753
        lineSeparator,
754
        devicePathString,
755
        schemaPartitionRespString);
756
  }
×
757

758
  @Override
759
  public TSchemaNodeManagementResp getNodePathsPartition(PartialPath partialPath, Integer level) {
760
    TSStatus status = confirmLeader();
×
761
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
762
      GetNodePathsPartitionPlan getNodePathsPartitionPlan = new GetNodePathsPartitionPlan();
×
763
      getNodePathsPartitionPlan.setPartialPath(partialPath);
×
764
      if (null != level) {
×
765
        getNodePathsPartitionPlan.setLevel(level);
×
766
      }
767
      SchemaNodeManagementResp resp =
×
768
          partitionManager.getNodePathsPartition(getNodePathsPartitionPlan);
×
769
      TSchemaNodeManagementResp result =
×
770
          resp.convertToRpcSchemaNodeManagementPartitionResp(
×
771
              getLoadManager().getRegionPriorityMap());
×
772

773
      LOGGER.info(
×
774
          "getNodePathsPartition receive devicePaths: {}, level: {}, return TSchemaNodeManagementResp: {}",
775
          partialPath,
776
          level,
777
          result);
778

779
      return result;
×
780
    } else {
781
      return new TSchemaNodeManagementResp().setStatus(status);
×
782
    }
783
  }
784

785
  @Override
786
  public TDataPartitionTableResp getDataPartition(GetDataPartitionPlan getDataPartitionPlan) {
787
    // Construct empty response
788
    TDataPartitionTableResp resp = new TDataPartitionTableResp();
×
789

790
    TSStatus status = confirmLeader();
×
791
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
792
      return resp.setStatus(status);
×
793
    }
794
    DataPartitionResp queryResult = partitionManager.getDataPartition(getDataPartitionPlan);
×
795

796
    resp = queryResult.convertToTDataPartitionTableResp();
×
797

798
    LOGGER.debug(
×
799
        "GetDataPartition interface receive PartitionSlotsMap: {}, return: {}",
800
        getDataPartitionPlan.getPartitionSlotsMap(),
×
801
        resp);
802

803
    return resp;
×
804
  }
805

806
  @Override
807
  public TDataPartitionTableResp getOrCreateDataPartition(
808
      GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan) {
809
    // Construct empty response
810
    TDataPartitionTableResp resp = new TDataPartitionTableResp();
×
811

812
    TSStatus status = confirmLeader();
×
813
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
814
      return resp.setStatus(status);
×
815
    }
816

817
    DataPartitionResp queryResult =
×
818
        partitionManager.getOrCreateDataPartition(getOrCreateDataPartitionPlan);
×
819
    resp = queryResult.convertToTDataPartitionTableResp();
×
820

821
    if (CONF.isEnablePrintingNewlyCreatedPartition()) {
×
822
      printNewCreatedDataPartition(getOrCreateDataPartitionPlan, resp);
×
823
    }
824

825
    return resp;
×
826
  }
827

828
  private void printNewCreatedDataPartition(
829
      GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan, TDataPartitionTableResp resp) {
830
    final String lineSeparator = System.lineSeparator();
×
831
    StringBuilder partitionSlotsMapString = new StringBuilder("{");
×
832
    for (Map.Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>> databaseEntry :
833
        getOrCreateDataPartitionPlan.getPartitionSlotsMap().entrySet()) {
×
834
      String database = databaseEntry.getKey();
×
835
      partitionSlotsMapString.append(lineSeparator).append(DATABASE).append(database).append(": {");
×
836
      for (Map.Entry<TSeriesPartitionSlot, TTimeSlotList> slotEntry :
837
          databaseEntry.getValue().entrySet()) {
×
838
        partitionSlotsMapString
×
839
            .append(lineSeparator)
×
840
            .append("\t\t")
×
841
            .append(slotEntry.getKey())
×
842
            .append(",")
×
843
            .append(slotEntry.getValue());
×
844
      }
×
845
      partitionSlotsMapString.append(lineSeparator).append("\t},");
×
846
    }
×
847
    partitionSlotsMapString.append(lineSeparator).append("}");
×
848

849
    StringBuilder dataPartitionRespString = new StringBuilder("{");
×
850
    dataPartitionRespString
×
851
        .append(lineSeparator)
×
852
        .append("\tTSStatus=")
×
853
        .append(resp.getStatus().getCode())
×
854
        .append(",");
×
855
    Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
856
        dataPartitionTable = resp.getDataPartitionTable();
×
857
    for (Map.Entry<
858
            String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
859
        databaseEntry : dataPartitionTable.entrySet()) {
×
860
      String database = databaseEntry.getKey();
×
861
      dataPartitionRespString.append(lineSeparator).append(DATABASE).append(database).append(": {");
×
862
      for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>
863
          seriesSlotEntry : databaseEntry.getValue().entrySet()) {
×
864
        dataPartitionRespString
×
865
            .append(lineSeparator)
×
866
            .append("\t\t")
×
867
            .append(seriesSlotEntry.getKey())
×
868
            .append(": {");
×
869
        for (Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> timeSlotEntry :
870
            seriesSlotEntry.getValue().entrySet()) {
×
871
          dataPartitionRespString
×
872
              .append(lineSeparator)
×
873
              .append("\t\t\t")
×
874
              .append(timeSlotEntry.getKey())
×
875
              .append(", ")
×
876
              .append(timeSlotEntry.getValue())
×
877
              .append(",");
×
878
        }
×
879
        dataPartitionRespString.append(lineSeparator).append("\t\t},");
×
880
      }
×
881
      dataPartitionRespString.append(lineSeparator).append("\t}");
×
882
    }
×
883
    dataPartitionRespString.append(lineSeparator).append("}");
×
884

885
    LOGGER.info(
×
886
        "[GetOrCreateDataPartition]:{} Receive PartitionSlotsMap: {}, Return TDataPartitionTableResp: {}",
887
        lineSeparator,
888
        partitionSlotsMapString,
889
        dataPartitionRespString);
890
  }
×
891

892
  private TSStatus confirmLeader() {
893
    // Make sure the consensus layer has been initialized
894
    if (getConsensusManager() == null) {
×
895
      return new TSStatus(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode())
×
896
          .setMessage(
×
897
              "ConsensusManager of target-ConfigNode is not initialized, "
898
                  + "please make sure the target-ConfigNode has been started successfully.");
899
    }
900
    return getConsensusManager().confirmLeader();
×
901
  }
902

903
  @Override
904
  public NodeManager getNodeManager() {
905
    return nodeManager;
×
906
  }
907

908
  @Override
909
  public ClusterSchemaManager getClusterSchemaManager() {
910
    return clusterSchemaManager;
×
911
  }
912

913
  @Override
914
  public ConsensusManager getConsensusManager() {
915
    return consensusManager.get();
×
916
  }
917

918
  @Override
919
  public PartitionManager getPartitionManager() {
920
    return partitionManager;
×
921
  }
922

923
  @Override
924
  public LoadManager getLoadManager() {
925
    return loadManager;
×
926
  }
927

928
  @Override
929
  public TriggerManager getTriggerManager() {
930
    return triggerManager;
×
931
  }
932

933
  @Override
934
  public ModelManager getModelManager() {
935
    return modelManager;
×
936
  }
937

938
  @Override
939
  public PipeManager getPipeManager() {
940
    return pipeManager;
×
941
  }
942

943
  @Override
944
  public TSStatus operatePermission(AuthorPlan authorPlan) {
945
    TSStatus status = confirmLeader();
×
946
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
947
      return permissionManager.operatePermission(authorPlan);
×
948
    } else {
949
      return status;
×
950
    }
951
  }
952

953
  @Override
954
  public DataSet queryPermission(AuthorPlan authorPlan) {
955
    TSStatus status = confirmLeader();
×
956
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
957
      return permissionManager.queryPermission(authorPlan);
×
958
    } else {
959
      PermissionInfoResp dataSet = new PermissionInfoResp();
×
960
      dataSet.setStatus(status);
×
961
      return dataSet;
×
962
    }
963
  }
964

965
  @Override
966
  public TPermissionInfoResp login(String username, String password) {
967
    TSStatus status = confirmLeader();
×
968
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
969
      return permissionManager.login(username, password);
×
970
    } else {
971
      TPermissionInfoResp resp = AuthUtils.generateEmptyPermissionInfoResp();
×
972
      resp.setStatus(status);
×
973
      return resp;
×
974
    }
975
  }
976

977
  @Override
978
  public TPermissionInfoResp checkUserPrivileges(
979
      String username, List<PartialPath> paths, int permission) {
980
    TSStatus status = confirmLeader();
×
981
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
982
      return permissionManager.checkUserPrivileges(username, paths, permission);
×
983
    } else {
984
      TPermissionInfoResp resp = AuthUtils.generateEmptyPermissionInfoResp();
×
985
      resp.setStatus(status);
×
986
      return resp;
×
987
    }
988
  }
989

990
  @Override
991
  public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) {
992
    final int ERROR_STATUS_NODE_ID = -1;
×
993

994
    TSStatus status = confirmLeader();
×
995
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
996
      // Make sure the global configurations are consist
997
      status = checkConfigNodeGlobalConfig(req);
×
998
      if (status == null) {
×
999
        status =
×
1000
            ClusterNodeStartUtils.confirmNodeRegistration(
×
1001
                NodeType.ConfigNode,
1002
                req.getClusterParameters().getClusterName(),
×
1003
                req.getConfigNodeLocation(),
×
1004
                this);
1005
        if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1006
          return nodeManager.registerConfigNode(req);
×
1007
        }
1008
      }
1009
    }
1010

1011
    return new TConfigNodeRegisterResp().setStatus(status).setConfigNodeId(ERROR_STATUS_NODE_ID);
×
1012
  }
1013

1014
  @Override
1015
  public TSStatus restartConfigNode(TConfigNodeRestartReq req) {
1016
    TSStatus status = confirmLeader();
×
1017
    // Notice: The Seed-ConfigNode must also have the privilege to do Node restart check.
1018
    // Otherwise, the IoTDB-cluster will not have the ability to restart from scratch.
1019
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1020
        || ConfigNodeDescriptor.getInstance().isSeedConfigNode()
×
1021
        || SystemPropertiesUtils.isSeedConfigNode()) {
×
1022
      status =
×
1023
          ClusterNodeStartUtils.confirmNodeRestart(
×
1024
              NodeType.ConfigNode,
1025
              req.getClusterName(),
×
1026
              req.getConfigNodeLocation().getConfigNodeId(),
×
1027
              req.getConfigNodeLocation(),
×
1028
              this);
1029
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1030
        return ClusterNodeStartUtils.ACCEPT_NODE_RESTART;
×
1031
      }
1032
    }
1033
    return status;
×
1034
  }
1035

1036
  public TSStatus checkConfigNodeGlobalConfig(TConfigNodeRegisterReq req) {
1037
    final String errorPrefix = "Reject register, please ensure that the parameter ";
×
1038
    final String errorSuffix = " is consistent with the Seed-ConfigNode.";
×
1039
    TSStatus errorStatus = new TSStatus(TSStatusCode.CONFIGURATION_ERROR.getStatusCode());
×
1040
    TClusterParameters clusterParameters = req.getClusterParameters();
×
1041

1042
    if (!clusterParameters
×
1043
        .getConfigNodeConsensusProtocolClass()
×
1044
        .equals(CONF.getConfigNodeConsensusProtocolClass())) {
×
1045
      return errorStatus.setMessage(
×
1046
          errorPrefix + "config_node_consensus_protocol_class" + errorSuffix);
1047
    }
1048
    if (!clusterParameters
×
1049
        .getDataRegionConsensusProtocolClass()
×
1050
        .equals(CONF.getDataRegionConsensusProtocolClass())) {
×
1051
      return errorStatus.setMessage(
×
1052
          errorPrefix + "data_region_consensus_protocol_class" + errorSuffix);
1053
    }
1054
    if (!clusterParameters
×
1055
        .getSchemaRegionConsensusProtocolClass()
×
1056
        .equals(CONF.getSchemaRegionConsensusProtocolClass())) {
×
1057
      return errorStatus.setMessage(
×
1058
          errorPrefix + "schema_region_consensus_protocol_class" + errorSuffix);
1059
    }
1060

1061
    if (clusterParameters.getSeriesPartitionSlotNum() != CONF.getSeriesSlotNum()) {
×
1062
      return errorStatus.setMessage(errorPrefix + "series_partition_slot_num" + errorSuffix);
×
1063
    }
1064
    if (!clusterParameters
×
1065
        .getSeriesPartitionExecutorClass()
×
1066
        .equals(CONF.getSeriesPartitionExecutorClass())) {
×
1067
      return errorStatus.setMessage(errorPrefix + "series_partition_executor_class" + errorSuffix);
×
1068
    }
1069

1070
    if (clusterParameters.getDefaultTTL()
×
1071
        != CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs()) {
×
1072
      return errorStatus.setMessage(errorPrefix + "default_ttl" + errorSuffix);
×
1073
    }
1074
    if (clusterParameters.getTimePartitionInterval() != COMMON_CONF.getTimePartitionInterval()) {
×
1075
      return errorStatus.setMessage(errorPrefix + "time_partition_interval" + errorSuffix);
×
1076
    }
1077

1078
    if (clusterParameters.getSchemaReplicationFactor() != CONF.getSchemaReplicationFactor()) {
×
1079
      return errorStatus.setMessage(errorPrefix + "schema_replication_factor" + errorSuffix);
×
1080
    }
1081
    if (clusterParameters.getDataReplicationFactor() != CONF.getDataReplicationFactor()) {
×
1082
      return errorStatus.setMessage(errorPrefix + "data_replication_factor" + errorSuffix);
×
1083
    }
1084

1085
    if (clusterParameters.getSchemaRegionPerDataNode() != CONF.getSchemaRegionPerDataNode()) {
×
1086
      return errorStatus.setMessage(errorPrefix + "schema_region_per_data_node" + errorSuffix);
×
1087
    }
1088
    if (clusterParameters.getDataRegionPerDataNode() != CONF.getDataRegionPerDataNode()) {
×
1089
      return errorStatus.setMessage(errorPrefix + "data_region_per_data_node" + errorSuffix);
×
1090
    }
1091

1092
    if (!clusterParameters.getReadConsistencyLevel().equals(CONF.getReadConsistencyLevel())) {
×
1093
      return errorStatus.setMessage(errorPrefix + "read_consistency_level" + errorSuffix);
×
1094
    }
1095

1096
    if (clusterParameters.getDiskSpaceWarningThreshold()
×
1097
        != COMMON_CONF.getDiskSpaceWarningThreshold()) {
×
1098
      return errorStatus.setMessage(errorPrefix + "disk_space_warning_threshold" + errorSuffix);
×
1099
    }
1100

1101
    if (!clusterParameters.getTimestampPrecision().equals(COMMON_CONF.getTimestampPrecision())) {
×
1102
      return errorStatus.setMessage(errorPrefix + "timestamp_precision" + errorSuffix);
×
1103
    }
1104

1105
    if (!clusterParameters.getSchemaEngineMode().equals(COMMON_CONF.getSchemaEngineMode())) {
×
1106
      return errorStatus.setMessage(errorPrefix + "schema_engine_mode" + errorSuffix);
×
1107
    }
1108

1109
    if (clusterParameters.getTagAttributeTotalSize() != COMMON_CONF.getTagAttributeTotalSize()) {
×
1110
      return errorStatus.setMessage(errorPrefix + "tag_attribute_total_size" + errorSuffix);
×
1111
    }
1112

1113
    if (clusterParameters.getDatabaseLimitThreshold() != COMMON_CONF.getDatabaseLimitThreshold()) {
×
1114
      return errorStatus.setMessage(errorPrefix + "database_limit_threshold" + errorSuffix);
×
1115
    }
1116

1117
    return null;
×
1118
  }
1119

1120
  @Override
1121
  public TSStatus createPeerForConsensusGroup(List<TConfigNodeLocation> configNodeLocations) {
1122
    for (int i = 0; i < 30; i++) {
×
1123
      try {
1124
        if (consensusManager.get() == null) {
×
1125
          Thread.sleep(1000);
×
1126
        } else {
1127
          // When add non Seed-ConfigNode to the ConfigNodeGroup, the parameter should be emptyList
1128
          consensusManager.get().createPeerForConsensusGroup(Collections.emptyList());
×
1129
          return StatusUtils.OK;
×
1130
        }
1131
      } catch (InterruptedException e) {
×
1132
        Thread.currentThread().interrupt();
×
1133
        LOGGER.warn("Unexpected interruption during retry creating peer for consensus group");
×
1134
      } catch (ConsensusException e) {
×
1135
        LOGGER.error("Failed to create peer for consensus group", e);
×
1136
        break;
×
1137
      }
×
1138
    }
1139
    return StatusUtils.INTERNAL_ERROR;
×
1140
  }
1141

1142
  @Override
1143
  public TSStatus removeConfigNode(RemoveConfigNodePlan removeConfigNodePlan) {
1144
    TSStatus status = confirmLeader();
×
1145

1146
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1147
      status = nodeManager.checkConfigNodeBeforeRemove(removeConfigNodePlan);
×
1148
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1149
        procedureManager.removeConfigNode(removeConfigNodePlan);
×
1150
      }
1151
    }
1152

1153
    return status;
×
1154
  }
1155

1156
  @Override
1157
  public TSStatus reportConfigNodeShutdown(TConfigNodeLocation configNodeLocation) {
1158
    TSStatus status = confirmLeader();
×
1159
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1160
      // Force updating the target ConfigNode's status to Unknown
1161
      getLoadManager()
×
1162
          .forceUpdateNodeCache(
×
1163
              NodeType.ConfigNode,
1164
              configNodeLocation.getConfigNodeId(),
×
1165
              NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
×
1166
      LOGGER.info(
×
1167
          "[ShutdownHook] The ConfigNode-{} will be shutdown soon, mark it as Unknown",
1168
          configNodeLocation.getConfigNodeId());
×
1169
    }
1170
    return status;
×
1171
  }
1172

1173
  @Override
1174
  public TSStatus createFunction(TCreateFunctionReq req) {
1175
    TSStatus status = confirmLeader();
×
1176
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1177
        ? udfManager.createFunction(req)
×
1178
        : status;
×
1179
  }
1180

1181
  @Override
1182
  public TSStatus dropFunction(String udfName) {
1183
    TSStatus status = confirmLeader();
×
1184
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1185
        ? udfManager.dropFunction(udfName)
×
1186
        : status;
×
1187
  }
1188

1189
  @Override
1190
  public TGetUDFTableResp getUDFTable() {
1191
    TSStatus status = confirmLeader();
×
1192
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1193
        ? udfManager.getUDFTable()
×
1194
        : new TGetUDFTableResp(status, Collections.emptyList());
×
1195
  }
1196

1197
  @Override
1198
  public TGetJarInListResp getUDFJar(TGetJarInListReq req) {
1199
    TSStatus status = confirmLeader();
×
1200
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1201
        ? udfManager.getUDFJar(req)
×
1202
        : new TGetJarInListResp(status, Collections.emptyList());
×
1203
  }
1204

1205
  @Override
1206
  public TSStatus createTrigger(TCreateTriggerReq req) {
1207
    TSStatus status = confirmLeader();
×
1208
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1209
        ? triggerManager.createTrigger(req)
×
1210
        : status;
×
1211
  }
1212

1213
  @Override
1214
  public TSStatus dropTrigger(TDropTriggerReq req) {
1215
    TSStatus status = confirmLeader();
×
1216
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1217
        ? triggerManager.dropTrigger(req)
×
1218
        : status;
×
1219
  }
1220

1221
  @Override
1222
  public TGetTriggerTableResp getTriggerTable() {
1223
    TSStatus status = confirmLeader();
×
1224
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1225
        ? triggerManager.getTriggerTable(false)
×
1226
        : new TGetTriggerTableResp(status, Collections.emptyList());
×
1227
  }
1228

1229
  @Override
1230
  public TGetTriggerTableResp getStatefulTriggerTable() {
1231
    TSStatus status = confirmLeader();
×
1232
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1233
        ? triggerManager.getTriggerTable(true)
×
1234
        : new TGetTriggerTableResp(status, Collections.emptyList());
×
1235
  }
1236

1237
  @Override
1238
  public TGetLocationForTriggerResp getLocationOfStatefulTrigger(String triggerName) {
1239
    TSStatus status = confirmLeader();
×
1240
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1241
        ? triggerManager.getLocationOfStatefulTrigger(triggerName)
×
1242
        : new TGetLocationForTriggerResp(status);
×
1243
  }
1244

1245
  @Override
1246
  public TGetJarInListResp getTriggerJar(TGetJarInListReq req) {
1247
    TSStatus status = confirmLeader();
×
1248
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1249
        ? triggerManager.getTriggerJar(req)
×
1250
        : new TGetJarInListResp(status, Collections.emptyList());
×
1251
  }
1252

1253
  @Override
1254
  public TSStatus createPipePlugin(TCreatePipePluginReq req) {
1255
    TSStatus status = confirmLeader();
×
1256
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1257
        ? pipeManager.getPipePluginCoordinator().createPipePlugin(req)
×
1258
        : status;
×
1259
  }
1260

1261
  @Override
1262
  public TSStatus dropPipePlugin(String pipePluginName) {
1263
    TSStatus status = confirmLeader();
×
1264
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1265
        ? pipeManager.getPipePluginCoordinator().dropPipePlugin(pipePluginName)
×
1266
        : status;
×
1267
  }
1268

1269
  @Override
1270
  public TGetPipePluginTableResp getPipePluginTable() {
1271
    TSStatus status = confirmLeader();
×
1272
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1273
        ? pipeManager.getPipePluginCoordinator().getPipePluginTable()
×
1274
        : new TGetPipePluginTableResp(status, Collections.emptyList());
×
1275
  }
1276

1277
  @Override
1278
  public TGetJarInListResp getPipePluginJar(TGetJarInListReq req) {
1279
    TSStatus status = confirmLeader();
×
1280
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1281
        ? pipeManager.getPipePluginCoordinator().getPipePluginJar(req)
×
1282
        : new TGetJarInListResp(status, Collections.emptyList());
×
1283
  }
1284

1285
  @Override
1286
  public TSStatus merge() {
1287
    TSStatus status = confirmLeader();
×
1288
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1289
        ? RpcUtils.squashResponseStatusList(nodeManager.merge())
×
1290
        : status;
×
1291
  }
1292

1293
  @Override
1294
  public TSStatus flush(TFlushReq req) {
1295
    TSStatus status = confirmLeader();
×
1296
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1297
        ? RpcUtils.squashResponseStatusList(nodeManager.flush(req))
×
1298
        : status;
×
1299
  }
1300

1301
  @Override
1302
  public TSStatus clearCache() {
1303
    TSStatus status = confirmLeader();
×
1304
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1305
        ? RpcUtils.squashResponseStatusList(nodeManager.clearCache())
×
1306
        : status;
×
1307
  }
1308

1309
  @Override
1310
  public TSStatus loadConfiguration() {
1311
    TSStatus status = confirmLeader();
×
1312
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1313
        ? RpcUtils.squashResponseStatusList(nodeManager.loadConfiguration())
×
1314
        : status;
×
1315
  }
1316

1317
  @Override
1318
  public TSStatus setSystemStatus(String systemStatus) {
1319
    TSStatus status = confirmLeader();
×
1320
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1321
        ? RpcUtils.squashResponseStatusList(nodeManager.setSystemStatus(systemStatus))
×
1322
        : status;
×
1323
  }
1324

1325
  @Override
1326
  public TSStatus setDataNodeStatus(TSetDataNodeStatusReq req) {
1327
    TSStatus status = confirmLeader();
×
1328
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1329
        ? nodeManager.setDataNodeStatus(req)
×
1330
        : status;
×
1331
  }
1332

1333
  @Override
1334
  public TSStatus killQuery(String queryId, int dataNodeId) {
1335
    TSStatus status = confirmLeader();
×
1336
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1337
        ? nodeManager.killQuery(queryId, dataNodeId)
×
1338
        : status;
×
1339
  }
1340

1341
  @Override
1342
  public TGetDataNodeLocationsResp getRunningDataNodeLocations() {
1343
    TSStatus status = confirmLeader();
×
1344
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1345
        ? new TGetDataNodeLocationsResp(
×
1346
            new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
×
1347
            nodeManager.filterDataNodeThroughStatus(NodeStatus.Running).stream()
×
1348
                .map(TDataNodeConfiguration::getLocation)
×
1349
                .collect(Collectors.toList()))
×
1350
        : new TGetDataNodeLocationsResp(status, Collections.emptyList());
×
1351
  }
1352

1353
  @Override
1354
  public TRegionRouteMapResp getLatestRegionRouteMap() {
1355
    TSStatus status = confirmLeader();
×
1356
    TRegionRouteMapResp resp = new TRegionRouteMapResp(status);
×
1357

1358
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1359
      resp.setTimestamp(System.currentTimeMillis());
×
1360
      resp.setRegionRouteMap(getLoadManager().getRegionPriorityMap());
×
1361
    }
1362

1363
    return resp;
×
1364
  }
1365

1366
  @Override
1367
  public UDFManager getUDFManager() {
1368
    return udfManager;
×
1369
  }
1370

1371
  @Override
1372
  public RegionInfoListResp showRegion(GetRegionInfoListPlan getRegionInfoListPlan) {
1373
    TSStatus status = confirmLeader();
×
1374
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1375
      return partitionManager.getRegionInfoList(getRegionInfoListPlan);
×
1376
    } else {
1377
      RegionInfoListResp regionResp = new RegionInfoListResp();
×
1378
      regionResp.setStatus(status);
×
1379
      return regionResp;
×
1380
    }
1381
  }
1382

1383
  @Override
1384
  public TShowDataNodesResp showDataNodes() {
1385
    TSStatus status = confirmLeader();
×
1386
    TShowDataNodesResp resp = new TShowDataNodesResp();
×
1387
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1388
      return resp.setDataNodesInfoList(nodeManager.getRegisteredDataNodeInfoList())
×
1389
          .setStatus(StatusUtils.OK);
×
1390
    } else {
1391
      return resp.setStatus(status);
×
1392
    }
1393
  }
1394

1395
  @Override
1396
  public TShowConfigNodesResp showConfigNodes() {
1397
    TSStatus status = confirmLeader();
×
1398
    TShowConfigNodesResp resp = new TShowConfigNodesResp();
×
1399
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1400
      return resp.setConfigNodesInfoList(nodeManager.getRegisteredConfigNodeInfoList())
×
1401
          .setStatus(StatusUtils.OK);
×
1402
    } else {
1403
      return resp.setStatus(status);
×
1404
    }
1405
  }
1406

1407
  @Override
1408
  public TShowDatabaseResp showDatabase(GetDatabasePlan getDatabasePlan) {
1409
    TSStatus status = confirmLeader();
×
1410
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1411
      return getClusterSchemaManager().showDatabase(getDatabasePlan);
×
1412
    } else {
1413
      return new TShowDatabaseResp().setStatus(status);
×
1414
    }
1415
  }
1416

1417
  @Override
1418
  public ProcedureManager getProcedureManager() {
1419
    return procedureManager;
×
1420
  }
1421

1422
  @Override
1423
  public CQManager getCQManager() {
1424
    return cqManager;
×
1425
  }
1426

1427
  @Override
1428
  public ClusterQuotaManager getClusterQuotaManager() {
1429
    return clusterQuotaManager;
×
1430
  }
1431

1432
  @Override
1433
  public RetryFailedTasksThread getRetryFailedTasksThread() {
1434
    return retryFailedTasksThread;
×
1435
  }
1436

1437
  @Override
1438
  public void addMetrics() {
1439
    MetricService.getInstance().addMetricSet(new NodeMetrics(getNodeManager()));
×
1440
    MetricService.getInstance().addMetricSet(new PartitionMetrics(this));
×
1441
  }
×
1442

1443
  @Override
1444
  public TSStatus createSchemaTemplate(TCreateSchemaTemplateReq req) {
1445
    TSStatus status = confirmLeader();
×
1446
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1447
      CreateSchemaTemplatePlan createSchemaTemplatePlan =
×
1448
          new CreateSchemaTemplatePlan(req.getSerializedTemplate());
×
1449
      return clusterSchemaManager.createTemplate(createSchemaTemplatePlan);
×
1450
    } else {
1451
      return status;
×
1452
    }
1453
  }
1454

1455
  @Override
1456
  public TGetAllTemplatesResp getAllTemplates() {
1457
    TSStatus status = confirmLeader();
×
1458
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1459
      return clusterSchemaManager.getAllTemplates();
×
1460
    } else {
1461
      return new TGetAllTemplatesResp().setStatus(status);
×
1462
    }
1463
  }
1464

1465
  @Override
1466
  public TGetTemplateResp getTemplate(String req) {
1467
    TSStatus status = confirmLeader();
×
1468
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1469
      return clusterSchemaManager.getTemplate(req);
×
1470
    } else {
1471
      return new TGetTemplateResp().setStatus(status);
×
1472
    }
1473
  }
1474

1475
  @Override
1476
  public synchronized TSStatus setSchemaTemplate(TSetSchemaTemplateReq req) {
1477
    TSStatus status = confirmLeader();
×
1478
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1479
      return procedureManager.setSchemaTemplate(req.getQueryId(), req.getName(), req.getPath());
×
1480
    } else {
1481
      return status;
×
1482
    }
1483
  }
1484

1485
  @Override
1486
  public TGetPathsSetTemplatesResp getPathsSetTemplate(String req) {
1487
    TSStatus status = confirmLeader();
×
1488
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1489
      return clusterSchemaManager.getPathsSetTemplate(req);
×
1490
    } else {
1491
      return new TGetPathsSetTemplatesResp(status);
×
1492
    }
1493
  }
1494

1495
  @Override
1496
  public TSStatus deactivateSchemaTemplate(TDeactivateSchemaTemplateReq req) {
1497
    TSStatus status = confirmLeader();
×
1498
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1499
      return status;
×
1500
    }
1501

1502
    PathPatternTree patternTree =
×
1503
        PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
×
1504

1505
    List<PartialPath> patternList = patternTree.getAllPathPatterns();
×
1506
    TemplateSetInfoResp templateSetInfoResp = clusterSchemaManager.getTemplateSetInfo(patternList);
×
1507
    if (templateSetInfoResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1508
      return templateSetInfoResp.getStatus();
×
1509
    }
1510

1511
    Map<PartialPath, List<Template>> templateSetInfo = templateSetInfoResp.getPatternTemplateMap();
×
1512
    if (templateSetInfo.isEmpty()) {
×
1513
      return RpcUtils.getStatus(
×
1514
          TSStatusCode.TEMPLATE_NOT_SET,
1515
          String.format(
×
1516
              "Schema Template %s is not set on any prefix path of %s",
1517
              req.getTemplateName(), patternList));
×
1518
    }
1519

1520
    if (!req.getTemplateName().equals(ONE_LEVEL_PATH_WILDCARD)) {
×
1521
      Map<PartialPath, List<Template>> filteredTemplateSetInfo = new HashMap<>();
×
1522
      for (Map.Entry<PartialPath, List<Template>> entry : templateSetInfo.entrySet()) {
×
1523
        for (Template template : entry.getValue()) {
×
1524
          if (template.getName().equals(req.getTemplateName())) {
×
1525
            filteredTemplateSetInfo.put(entry.getKey(), Collections.singletonList(template));
×
1526
            break;
×
1527
          }
1528
        }
×
1529
      }
×
1530

1531
      if (filteredTemplateSetInfo.isEmpty()) {
×
1532
        return RpcUtils.getStatus(
×
1533
            TSStatusCode.TEMPLATE_NOT_SET,
1534
            String.format(
×
1535
                "Schema Template %s is not set on any prefix path of %s",
1536
                req.getTemplateName(), patternList));
×
1537
      }
1538

1539
      templateSetInfo = filteredTemplateSetInfo;
×
1540
    }
1541

1542
    return procedureManager.deactivateTemplate(req.getQueryId(), templateSetInfo);
×
1543
  }
1544

1545
  @Override
1546
  public synchronized TSStatus unsetSchemaTemplate(TUnsetSchemaTemplateReq req) {
1547
    TSStatus status = confirmLeader();
×
1548
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1549
      return status;
×
1550
    }
1551
    Pair<TSStatus, Template> checkResult =
×
1552
        clusterSchemaManager.checkIsTemplateSetOnPath(req.getTemplateName(), req.getPath());
×
1553
    if (checkResult.left.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1554
      try {
1555
        return procedureManager.unsetSchemaTemplate(
×
1556
            req.getQueryId(), checkResult.right, new PartialPath(req.getPath()));
×
1557
      } catch (IllegalPathException e) {
×
1558
        return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
×
1559
      }
1560
    } else {
1561
      return checkResult.left;
×
1562
    }
1563
  }
1564

1565
  @Override
1566
  public TSStatus dropSchemaTemplate(String templateName) {
1567
    TSStatus status = confirmLeader();
×
1568
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1569
      return clusterSchemaManager.dropSchemaTemplate(templateName);
×
1570
    } else {
1571
      return status;
×
1572
    }
1573
  }
1574

1575
  @Override
1576
  public TSStatus alterSchemaTemplate(TAlterSchemaTemplateReq req) {
1577
    TSStatus status = confirmLeader();
×
1578
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1579
      ByteBuffer buffer = ByteBuffer.wrap(req.getTemplateAlterInfo());
×
1580
      TemplateAlterOperationType operationType =
×
1581
          TemplateAlterOperationUtil.parseOperationType(buffer);
×
1582
      if (operationType.equals(TemplateAlterOperationType.EXTEND_TEMPLATE)) {
×
1583
        return clusterSchemaManager.extendSchemaTemplate(
×
1584
            TemplateAlterOperationUtil.parseTemplateExtendInfo(buffer));
×
1585
      }
1586
      return RpcUtils.getStatus(TSStatusCode.UNSUPPORTED_OPERATION);
×
1587
    } else {
1588
      return status;
×
1589
    }
1590
  }
1591

1592
  @Override
1593
  public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) {
1594
    TSStatus status = confirmLeader();
×
1595
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1596
      return procedureManager.deleteTimeSeries(req);
×
1597
    } else {
1598
      return status;
×
1599
    }
1600
  }
1601

1602
  @Override
1603
  public TSStatus deleteLogicalView(TDeleteLogicalViewReq req) {
1604
    TSStatus status = confirmLeader();
×
1605
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1606
      return procedureManager.deleteLogicalView(req);
×
1607
    } else {
1608
      return status;
×
1609
    }
1610
  }
1611

1612
  @Override
1613
  public TSStatus alterLogicalView(TAlterLogicalViewReq req) {
1614
    TSStatus status = confirmLeader();
×
1615
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1616
      return procedureManager.alterLogicalView(req);
×
1617
    } else {
1618
      return status;
×
1619
    }
1620
  }
1621

1622
  @Override
1623
  public TSStatus createPipe(TCreatePipeReq req) {
1624
    TSStatus status = confirmLeader();
×
1625
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1626
        ? pipeManager.getPipeTaskCoordinator().createPipe(req)
×
1627
        : status;
×
1628
  }
1629

1630
  @Override
1631
  public TSStatus startPipe(String pipeName) {
1632
    TSStatus status = confirmLeader();
×
1633
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1634
        ? pipeManager.getPipeTaskCoordinator().startPipe(pipeName)
×
1635
        : status;
×
1636
  }
1637

1638
  @Override
1639
  public TSStatus stopPipe(String pipeName) {
1640
    TSStatus status = confirmLeader();
×
1641
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1642
        ? pipeManager.getPipeTaskCoordinator().stopPipe(pipeName)
×
1643
        : status;
×
1644
  }
1645

1646
  @Override
1647
  public TSStatus dropPipe(String pipeName) {
1648
    TSStatus status = confirmLeader();
×
1649
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1650
        ? pipeManager.getPipeTaskCoordinator().dropPipe(pipeName)
×
1651
        : status;
×
1652
  }
1653

1654
  @Override
1655
  public TShowPipeResp showPipe(TShowPipeReq req) {
1656
    TSStatus status = confirmLeader();
×
1657
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1658
        ? pipeManager.getPipeTaskCoordinator().showPipes(req)
×
1659
        : new TShowPipeResp().setStatus(status);
×
1660
  }
1661

1662
  @Override
1663
  public TGetAllPipeInfoResp getAllPipeInfo() {
1664
    TSStatus status = confirmLeader();
×
1665
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1666
        ? pipeManager.getPipeTaskCoordinator().getAllPipeInfo()
×
1667
        : new TGetAllPipeInfoResp(status, Collections.emptyList());
×
1668
  }
1669

1670
  @Override
1671
  public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
1672
    TSStatus status = confirmLeader();
×
1673
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1674
        ? partitionManager.getRegionId(req).convertToRpcGetRegionIdResp()
×
1675
        : new TGetRegionIdResp(status);
×
1676
  }
1677

1678
  @Override
1679
  public TGetTimeSlotListResp getTimeSlotList(TGetTimeSlotListReq req) {
1680
    TSStatus status = confirmLeader();
×
1681
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1682
        ? partitionManager.getTimeSlotList(req).convertToRpcGetTimeSlotListResp()
×
1683
        : new TGetTimeSlotListResp(status);
×
1684
  }
1685

1686
  @Override
1687
  public TCountTimeSlotListResp countTimeSlotList(TCountTimeSlotListReq req) {
1688
    TSStatus status = confirmLeader();
×
1689
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1690
        ? partitionManager.countTimeSlotList(req).convertToRpcCountTimeSlotListResp()
×
1691
        : new TCountTimeSlotListResp(status);
×
1692
  }
1693

1694
  @Override
1695
  public TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req) {
1696
    TSStatus status = confirmLeader();
×
1697
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1698
        ? partitionManager.getSeriesSlotList(req).convertToRpcGetSeriesSlotListResp()
×
1699
        : new TGetSeriesSlotListResp(status);
×
1700
  }
1701

1702
  @Override
1703
  public TSStatus migrateRegion(TMigrateRegionReq req) {
1704
    TSStatus status = confirmLeader();
×
1705
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1706
        ? procedureManager.migrateRegion(req)
×
1707
        : status;
×
1708
  }
1709

1710
  @Override
1711
  public TSStatus createCQ(TCreateCQReq req) {
1712
    TSStatus status = confirmLeader();
×
1713
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1714
        ? cqManager.createCQ(req)
×
1715
        : status;
×
1716
  }
1717

1718
  @Override
1719
  public TSStatus dropCQ(TDropCQReq req) {
1720
    TSStatus status = confirmLeader();
×
1721
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1722
        ? cqManager.dropCQ(req)
×
1723
        : status;
×
1724
  }
1725

1726
  @Override
1727
  public TShowCQResp showCQ() {
1728
    TSStatus status = confirmLeader();
×
1729
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1730
        ? cqManager.showCQ()
×
1731
        : new TShowCQResp(status, Collections.emptyList());
×
1732
  }
1733

1734
  /**
1735
   * Get all related schemaRegion which may contains the timeseries matched by given patternTree.
1736
   */
1737
  public Map<TConsensusGroupId, TRegionReplicaSet> getRelatedSchemaRegionGroup(
1738
      PathPatternTree patternTree) {
1739
    Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable =
×
1740
        getSchemaPartition(patternTree).getSchemaPartitionTable();
×
1741

1742
    List<TRegionReplicaSet> allRegionReplicaSets = getPartitionManager().getAllReplicaSets();
×
1743
    Set<TConsensusGroupId> groupIdSet =
×
1744
        schemaPartitionTable.values().stream()
×
1745
            .flatMap(m -> m.values().stream())
×
1746
            .collect(Collectors.toSet());
×
1747
    Map<TConsensusGroupId, TRegionReplicaSet> filteredRegionReplicaSets = new HashMap<>();
×
1748
    for (TRegionReplicaSet regionReplicaSet : allRegionReplicaSets) {
×
1749
      if (groupIdSet.contains(regionReplicaSet.getRegionId())) {
×
1750
        filteredRegionReplicaSets.put(regionReplicaSet.getRegionId(), regionReplicaSet);
×
1751
      }
1752
    }
×
1753
    return filteredRegionReplicaSets;
×
1754
  }
1755

1756
  /**
1757
   * Get all related dataRegion which may contains the data of specific timeseries matched by given
1758
   * patternTree
1759
   */
1760
  public Map<TConsensusGroupId, TRegionReplicaSet> getRelatedDataRegionGroup(
1761
      PathPatternTree patternTree) {
1762
    // Get all databases and slots by getting schemaengine partition
1763
    Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable =
×
1764
        getSchemaPartition(patternTree).getSchemaPartitionTable();
×
1765

1766
    // Construct request for getting data partition
1767
    Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>();
×
1768
    schemaPartitionTable.forEach(
×
1769
        (key, value) -> {
1770
          Map<TSeriesPartitionSlot, TTimeSlotList> slotListMap = new HashMap<>();
×
1771
          value
×
1772
              .keySet()
×
1773
              .forEach(
×
1774
                  slot ->
1775
                      slotListMap.put(
×
1776
                          slot, new TTimeSlotList(Collections.emptyList(), true, true)));
×
1777
          partitionSlotsMap.put(key, slotListMap);
×
1778
        });
×
1779

1780
    // Get all data partitions
1781
    GetDataPartitionPlan getDataPartitionPlan = new GetDataPartitionPlan(partitionSlotsMap);
×
1782
    Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
1783
        dataPartitionTable = getDataPartition(getDataPartitionPlan).getDataPartitionTable();
×
1784

1785
    // Get all region replicaset of target data partitions
1786
    List<TRegionReplicaSet> allRegionReplicaSets = getPartitionManager().getAllReplicaSets();
×
1787
    Set<TConsensusGroupId> groupIdSet =
×
1788
        dataPartitionTable.values().stream()
×
1789
            .flatMap(
×
1790
                tSeriesPartitionSlotMapMap ->
1791
                    tSeriesPartitionSlotMapMap.values().stream()
×
1792
                        .flatMap(
×
1793
                            tTimePartitionSlotListMap ->
1794
                                tTimePartitionSlotListMap.values().stream()
×
1795
                                    .flatMap(Collection::stream)))
×
1796
            .collect(Collectors.toSet());
×
1797
    Map<TConsensusGroupId, TRegionReplicaSet> filteredRegionReplicaSets = new HashMap<>();
×
1798
    for (TRegionReplicaSet regionReplicaSet : allRegionReplicaSets) {
×
1799
      if (groupIdSet.contains(regionReplicaSet.getRegionId())) {
×
1800
        filteredRegionReplicaSets.put(regionReplicaSet.getRegionId(), regionReplicaSet);
×
1801
      }
1802
    }
×
1803
    return filteredRegionReplicaSets;
×
1804
  }
1805

1806
  public TSStatus transfer(List<TDataNodeLocation> newUnknownDataList) {
1807
    Map<Integer, TDataNodeLocation> runningDataNodeLocationMap = new HashMap<>();
×
1808
    nodeManager
×
1809
        .filterDataNodeThroughStatus(NodeStatus.Running)
×
1810
        .forEach(
×
1811
            dataNodeConfiguration ->
1812
                runningDataNodeLocationMap.put(
×
1813
                    dataNodeConfiguration.getLocation().getDataNodeId(),
×
1814
                    dataNodeConfiguration.getLocation()));
×
1815
    if (runningDataNodeLocationMap.isEmpty()) {
×
1816
      // No running DataNode, will not transfer and print log
1817
      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
1818
    }
1819

1820
    newUnknownDataList.forEach(
×
1821
        dataNodeLocation -> runningDataNodeLocationMap.remove(dataNodeLocation.getDataNodeId()));
×
1822

1823
    LOGGER.info("Start transfer of {}", newUnknownDataList);
×
1824
    // Transfer trigger
1825
    TSStatus transferResult =
×
1826
        triggerManager.transferTrigger(newUnknownDataList, runningDataNodeLocationMap);
×
1827
    if (transferResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1828
      LOGGER.warn("Fail to transfer because {}, will retry", transferResult.getMessage());
×
1829
    }
1830

1831
    return transferResult;
×
1832
  }
1833

1834
  @Override
1835
  public TSStatus createModel(TCreateModelReq req) {
1836
    TSStatus status = confirmLeader();
×
1837
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1838
        ? modelManager.createModel(req)
×
1839
        : status;
×
1840
  }
1841

1842
  @Override
1843
  public TSStatus dropModel(TDropModelReq req) {
1844
    TSStatus status = confirmLeader();
×
1845
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1846
        ? modelManager.dropModel(req)
×
1847
        : status;
×
1848
  }
1849

1850
  @Override
1851
  public TShowModelResp showModel(TShowModelReq req) {
1852
    TSStatus status = confirmLeader();
×
1853
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1854
        ? modelManager.showModel(req)
×
1855
        : new TShowModelResp(status, Collections.emptyList());
×
1856
  }
1857

1858
  @Override
1859
  public TShowTrailResp showTrail(TShowTrailReq req) {
1860
    TSStatus status = confirmLeader();
×
1861
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1862
        ? modelManager.showTrail(req)
×
1863
        : new TShowTrailResp(status, Collections.emptyList());
×
1864
  }
1865

1866
  @Override
1867
  public TSStatus updateModelInfo(TUpdateModelInfoReq req) {
1868
    TSStatus status = confirmLeader();
×
1869
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1870
        ? modelManager.updateModelInfo(req)
×
1871
        : status;
×
1872
  }
1873

1874
  @Override
1875
  public TSStatus updateModelState(TUpdateModelStateReq req) {
1876
    TSStatus status = confirmLeader();
×
1877
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1878
        ? modelManager.updateModelState(req)
×
1879
        : status;
×
1880
  }
1881

1882
  @Override
1883
  public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) {
1884
    TSStatus status = confirmLeader();
×
1885
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1886
        ? clusterQuotaManager.setSpaceQuota(req)
×
1887
        : status;
×
1888
  }
1889

1890
  public TSpaceQuotaResp showSpaceQuota(List<String> databases) {
1891
    TSStatus status = confirmLeader();
×
1892
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1893
        ? clusterQuotaManager.showSpaceQuota(databases)
×
1894
        : new TSpaceQuotaResp(status);
×
1895
  }
1896

1897
  public TSpaceQuotaResp getSpaceQuota() {
1898
    TSStatus status = confirmLeader();
×
1899
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1900
        ? clusterQuotaManager.getSpaceQuota()
×
1901
        : new TSpaceQuotaResp(status);
×
1902
  }
1903

1904
  public TSStatus setThrottleQuota(TSetThrottleQuotaReq req) {
1905
    TSStatus status = confirmLeader();
×
1906
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1907
        ? clusterQuotaManager.setThrottleQuota(req)
×
1908
        : status;
×
1909
  }
1910

1911
  public TThrottleQuotaResp showThrottleQuota(TShowThrottleReq req) {
1912
    TSStatus status = confirmLeader();
×
1913
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1914
        ? clusterQuotaManager.showThrottleQuota(req)
×
1915
        : new TThrottleQuotaResp(status);
×
1916
  }
1917

1918
  public TThrottleQuotaResp getThrottleQuota() {
1919
    TSStatus status = confirmLeader();
×
1920
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1921
        ? clusterQuotaManager.getThrottleQuota()
×
1922
        : new TThrottleQuotaResp(status);
×
1923
  }
1924
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc