• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #10019

07 Sep 2023 04:50AM UTC coverage: 47.489% (-0.2%) from 47.655%
#10019

push

travis_ci

web-flow
Pipe: Fix ConcurrentModificationException caused by concurrently iterating through CachedSchemaPatternMatcher.extractors when a PipeHeartbeatEvent is being assigned (#11074)

* try to fix ConcurrentModificationException when assigning PipeHeartbeatEvent

* Update CachedSchemaPatternMatcher.java

---------

Co-authored-by: 马子坤 <55695098+DanielWang2035@users.noreply.github.com>

1 of 1 new or added line in 1 file covered. (100.0%)

80551 of 169622 relevant lines covered (47.49%)

0.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.34
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
24
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
25
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
26
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
27
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
28
import org.apache.iotdb.common.rpc.thrift.TSStatus;
29
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
30
import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
31
import org.apache.iotdb.common.rpc.thrift.TSetThrottleQuotaReq;
32
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
33
import org.apache.iotdb.commons.cluster.NodeStatus;
34
import org.apache.iotdb.commons.cluster.NodeType;
35
import org.apache.iotdb.commons.conf.CommonConfig;
36
import org.apache.iotdb.commons.conf.CommonDescriptor;
37
import org.apache.iotdb.commons.conf.IoTDBConstant;
38
import org.apache.iotdb.commons.exception.IllegalPathException;
39
import org.apache.iotdb.commons.path.PartialPath;
40
import org.apache.iotdb.commons.path.PathPatternTree;
41
import org.apache.iotdb.commons.service.metric.MetricService;
42
import org.apache.iotdb.commons.utils.AuthUtils;
43
import org.apache.iotdb.commons.utils.PathUtils;
44
import org.apache.iotdb.commons.utils.StatusUtils;
45
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
46
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
47
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
48
import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
49
import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
50
import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
51
import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
52
import org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan;
53
import org.apache.iotdb.confignode.consensus.request.read.partition.GetNodePathsPartitionPlan;
54
import org.apache.iotdb.confignode.consensus.request.read.partition.GetOrCreateDataPartitionPlan;
55
import org.apache.iotdb.confignode.consensus.request.read.partition.GetOrCreateSchemaPartitionPlan;
56
import org.apache.iotdb.confignode.consensus.request.read.partition.GetSchemaPartitionPlan;
57
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
58
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
59
import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
60
import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan;
61
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
62
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
63
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
64
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
65
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
66
import org.apache.iotdb.confignode.consensus.response.auth.PermissionInfoResp;
67
import org.apache.iotdb.confignode.consensus.response.database.CountDatabaseResp;
68
import org.apache.iotdb.confignode.consensus.response.database.DatabaseSchemaResp;
69
import org.apache.iotdb.confignode.consensus.response.datanode.ConfigurationResp;
70
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
71
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
72
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
73
import org.apache.iotdb.confignode.consensus.response.partition.DataPartitionResp;
74
import org.apache.iotdb.confignode.consensus.response.partition.RegionInfoListResp;
75
import org.apache.iotdb.confignode.consensus.response.partition.SchemaNodeManagementResp;
76
import org.apache.iotdb.confignode.consensus.response.partition.SchemaPartitionResp;
77
import org.apache.iotdb.confignode.consensus.response.template.TemplateSetInfoResp;
78
import org.apache.iotdb.confignode.consensus.statemachine.ConfigRegionStateMachine;
79
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
80
import org.apache.iotdb.confignode.manager.cq.CQManager;
81
import org.apache.iotdb.confignode.manager.load.LoadManager;
82
import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
83
import org.apache.iotdb.confignode.manager.node.ClusterNodeStartUtils;
84
import org.apache.iotdb.confignode.manager.node.NodeManager;
85
import org.apache.iotdb.confignode.manager.node.NodeMetrics;
86
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
87
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
88
import org.apache.iotdb.confignode.manager.pipe.PipeManager;
89
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
90
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaQuotaStatistics;
91
import org.apache.iotdb.confignode.persistence.AuthorInfo;
92
import org.apache.iotdb.confignode.persistence.ModelInfo;
93
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
94
import org.apache.iotdb.confignode.persistence.TriggerInfo;
95
import org.apache.iotdb.confignode.persistence.UDFInfo;
96
import org.apache.iotdb.confignode.persistence.cq.CQInfo;
97
import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
98
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
99
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
100
import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
101
import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
102
import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
103
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
104
import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
105
import org.apache.iotdb.confignode.rpc.thrift.TClusterParameters;
106
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
107
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
108
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
109
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListResp;
110
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
111
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
112
import org.apache.iotdb.confignode.rpc.thrift.TCreateModelReq;
113
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
114
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
115
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
116
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
117
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
118
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
119
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
120
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
121
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
122
import org.apache.iotdb.confignode.rpc.thrift.TDeactivateSchemaTemplateReq;
123
import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
124
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
125
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
126
import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
127
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
128
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
129
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
130
import org.apache.iotdb.confignode.rpc.thrift.TGetDataNodeLocationsResp;
131
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
132
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
133
import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
134
import org.apache.iotdb.confignode.rpc.thrift.TGetModelInfoReq;
135
import org.apache.iotdb.confignode.rpc.thrift.TGetModelInfoResp;
136
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
137
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
138
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
139
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
140
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
141
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
142
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
143
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
144
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
145
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
146
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
147
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
148
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
149
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
150
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
151
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
152
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
153
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
154
import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
155
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
156
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
157
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
158
import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
159
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
160
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
161
import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
162
import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
163
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
164
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
165
import org.apache.iotdb.confignode.rpc.thrift.TShowThrottleReq;
166
import org.apache.iotdb.confignode.rpc.thrift.TShowTrialReq;
167
import org.apache.iotdb.confignode.rpc.thrift.TShowTrialResp;
168
import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
169
import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
170
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
171
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
172
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
173
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
174
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelStateReq;
175
import org.apache.iotdb.consensus.common.DataSet;
176
import org.apache.iotdb.consensus.exception.ConsensusException;
177
import org.apache.iotdb.db.schemaengine.template.Template;
178
import org.apache.iotdb.db.schemaengine.template.TemplateAlterOperationType;
179
import org.apache.iotdb.db.schemaengine.template.alter.TemplateAlterOperationUtil;
180
import org.apache.iotdb.rpc.RpcUtils;
181
import org.apache.iotdb.rpc.TSStatusCode;
182
import org.apache.iotdb.tsfile.utils.Pair;
183

184
import org.slf4j.Logger;
185
import org.slf4j.LoggerFactory;
186

187
import java.io.IOException;
188
import java.nio.ByteBuffer;
189
import java.util.ArrayList;
190
import java.util.Arrays;
191
import java.util.Collection;
192
import java.util.Collections;
193
import java.util.Comparator;
194
import java.util.HashMap;
195
import java.util.HashSet;
196
import java.util.List;
197
import java.util.Map;
198
import java.util.Set;
199
import java.util.concurrent.atomic.AtomicReference;
200
import java.util.stream.Collectors;
201

202
import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
203

204
/** Entry of all management, AssignPartitionManager, AssignRegionManager. */
205
public class ConfigManager implements IManager {
206

207
  private static final Logger LOGGER = LoggerFactory.getLogger(ConfigManager.class);
1✔
208

209
  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
1✔
210
  private static final CommonConfig COMMON_CONF = CommonDescriptor.getInstance().getConfig();
1✔
211

212
  /** Manage PartitionTable read/write requests through the ConsensusLayer. */
213
  private final AtomicReference<ConsensusManager> consensusManager = new AtomicReference<>();
×
214

215
  /** Manage cluster node. */
216
  private final NodeManager nodeManager;
217

218
  /** Manage cluster schemaengine. */
219
  private final ClusterSchemaManager clusterSchemaManager;
220

221
  /** Manage cluster regions and partitions. */
222
  private final PartitionManager partitionManager;
223

224
  /** Manage cluster authorization. */
225
  private final PermissionManager permissionManager;
226

227
  private final LoadManager loadManager;
228

229
  /** Manage procedure. */
230
  private final ProcedureManager procedureManager;
231

232
  /** UDF. */
233
  private final UDFManager udfManager;
234

235
  /** Manage Trigger. */
236
  private final TriggerManager triggerManager;
237

238
  /** CQ. */
239
  private final CQManager cqManager;
240

241
  /** ML Model. */
242
  private final ModelManager modelManager;
243

244
  /** Pipe */
245
  private final PipeManager pipeManager;
246

247
  /** Manage quotas */
248
  private final ClusterQuotaManager clusterQuotaManager;
249

250
  private final ConfigRegionStateMachine stateMachine;
251

252
  private final RetryFailedTasksThread retryFailedTasksThread;
253

254
  private static final String DATABASE = "\tDatabase=";
255

256
  /**
   * Creates the persistence objects, the state machine built on top of them, and every manager
   * owned by this ConfigManager.
   *
   * <p>The ConsensusManager is not created here; it is set up separately by {@link
   * #initConsensusManager()}.
   *
   * @throws IOException declared by the signature; presumably raised by one of the components
   *     constructed here
   */
  public ConfigManager() throws IOException {
    // Persistence module: one info object per managed domain
    NodeInfo nodeInfo = new NodeInfo();
    ClusterSchemaInfo clusterSchemaInfo = new ClusterSchemaInfo();
    PartitionInfo partitionInfo = new PartitionInfo();
    AuthorInfo authorInfo = new AuthorInfo();
    ProcedureInfo procedureInfo = new ProcedureInfo();
    UDFInfo udfInfo = new UDFInfo();
    TriggerInfo triggerInfo = new TriggerInfo();
    CQInfo cqInfo = new CQInfo();
    ModelInfo modelInfo = new ModelInfo();
    PipeInfo pipeInfo = new PipeInfo();
    QuotaInfo quotaInfo = new QuotaInfo();

    // State machine, backed by an executor that is given every persistence object
    this.stateMachine =
        new ConfigRegionStateMachine(
            this,
            new ConfigPlanExecutor(
                nodeInfo,
                clusterSchemaInfo,
                partitionInfo,
                authorInfo,
                procedureInfo,
                udfInfo,
                triggerInfo,
                cqInfo,
                modelInfo,
                pipeInfo,
                quotaInfo));

    // Manager module
    this.nodeManager = new NodeManager(this, nodeInfo);
    this.clusterSchemaManager =
        new ClusterSchemaManager(this, clusterSchemaInfo, new ClusterSchemaQuotaStatistics());
    this.partitionManager = new PartitionManager(this, partitionInfo);
    this.permissionManager = new PermissionManager(this, authorInfo);
    this.procedureManager = new ProcedureManager(this, procedureInfo);
    this.udfManager = new UDFManager(this, udfInfo);
    this.triggerManager = new TriggerManager(this, triggerInfo);
    this.cqManager = new CQManager(this);
    this.modelManager = new ModelManager(this, modelInfo);
    this.pipeManager = new PipeManager(this, pipeInfo);

    // Ordering constraints, do not reorder:
    // 1. PipeManager must exist before LoadManager, because LoadManager registers
    //    PipeManager as a listener.
    // 2. RetryFailedTasksThread must be created after LoadManager, because it keeps a
    //    reference of LoadManager.
    this.loadManager = new LoadManager(this);

    this.retryFailedTasksThread = new RetryFailedTasksThread(this);
    this.clusterQuotaManager = new ClusterQuotaManager(this, quotaInfo);
  }
×
308

309
  public void initConsensusManager() throws IOException {
310
    this.consensusManager.set(new ConsensusManager(this, this.stateMachine));
×
311
  }
×
312

313
  public void close() throws IOException {
314
    if (consensusManager.get() != null) {
×
315
      consensusManager.get().close();
×
316
    }
317
    if (partitionManager != null) {
×
318
      partitionManager.getRegionMaintainer().shutdown();
×
319
    }
320
    if (procedureManager != null) {
×
321
      procedureManager.shiftExecutor(false);
×
322
    }
323
  }
×
324

325
  @Override
326
  public DataSet getSystemConfiguration() {
327
    TSStatus status = confirmLeader();
×
328
    ConfigurationResp dataSet;
329
    // Notice: The Seed-ConfigNode must also have the privilege to give system configuration.
330
    // Otherwise, the IoTDB-cluster will not have the ability to restart from scratch.
331
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
332
        || ConfigNodeDescriptor.getInstance().isSeedConfigNode()
×
333
        || SystemPropertiesUtils.isSeedConfigNode()) {
×
334
      dataSet = (ConfigurationResp) nodeManager.getSystemConfiguration();
×
335
    } else {
336
      dataSet = new ConfigurationResp();
×
337
      dataSet.setStatus(status);
×
338
    }
339
    return dataSet;
×
340
  }
341

342
  @Override
343
  public DataSet registerDataNode(TDataNodeRegisterReq req) {
344
    TSStatus status = confirmLeader();
×
345
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
346
      status =
×
347
          ClusterNodeStartUtils.confirmNodeRegistration(
×
348
              NodeType.DataNode,
349
              req.getClusterName(),
×
350
              req.getDataNodeConfiguration().getLocation(),
×
351
              this);
352
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
353
        return nodeManager.registerDataNode(req);
×
354
      }
355
    }
356
    DataNodeRegisterResp resp = new DataNodeRegisterResp();
×
357
    resp.setStatus(status);
×
358
    resp.setConfigNodeList(getNodeManager().getRegisteredConfigNodes());
×
359
    return resp;
×
360
  }
361

362
  @Override
363
  public TDataNodeRestartResp restartDataNode(TDataNodeRestartReq req) {
364
    TSStatus status = confirmLeader();
×
365
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
366
      status =
×
367
          ClusterNodeStartUtils.confirmNodeRestart(
×
368
              NodeType.DataNode,
369
              req.getClusterName(),
×
370
              req.getDataNodeConfiguration().getLocation().getDataNodeId(),
×
371
              req.getDataNodeConfiguration().getLocation(),
×
372
              this);
373
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
374
        return nodeManager.updateDataNodeIfNecessary(req);
×
375
      }
376
    }
377
    return new TDataNodeRestartResp()
×
378
        .setStatus(status)
×
379
        .setConfigNodeList(getNodeManager().getRegisteredConfigNodes());
×
380
  }
381

382
  @Override
383
  public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
384
    TSStatus status = confirmLeader();
×
385
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
386
      return nodeManager.removeDataNode(removeDataNodePlan);
×
387
    } else {
388
      DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
×
389
      dataSet.setStatus(status);
×
390
      return dataSet;
×
391
    }
392
  }
393

394
  @Override
395
  public TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation) {
396
    TSStatus status = confirmLeader();
×
397
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
398
      // Force updating the target DataNode's status to Unknown
399
      getLoadManager()
×
400
          .forceUpdateNodeCache(
×
401
              NodeType.DataNode,
402
              dataNodeLocation.getDataNodeId(),
×
403
              NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
×
404
      LOGGER.info(
×
405
          "[ShutdownHook] The DataNode-{} will be shutdown soon, mark it as Unknown",
406
          dataNodeLocation.getDataNodeId());
×
407
    }
408
    return status;
×
409
  }
410

411
  @Override
412
  public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req) {
413
    TSStatus status = confirmLeader();
×
414
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
415
      procedureManager.reportRegionMigrateResult(req);
×
416
    }
417
    return status;
×
418
  }
419

420
  @Override
421
  public DataSet getDataNodeConfiguration(
422
      GetDataNodeConfigurationPlan getDataNodeConfigurationPlan) {
423
    TSStatus status = confirmLeader();
×
424
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
425
      return nodeManager.getDataNodeConfiguration(getDataNodeConfigurationPlan);
×
426
    } else {
427
      DataNodeConfigurationResp dataSet = new DataNodeConfigurationResp();
×
428
      dataSet.setStatus(status);
×
429
      return dataSet;
×
430
    }
431
  }
432

433
  @Override
434
  public TShowClusterResp showCluster() {
435
    TSStatus status = confirmLeader();
×
436
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
437
      List<TConfigNodeLocation> configNodeLocations = getNodeManager().getRegisteredConfigNodes();
×
438
      configNodeLocations.sort(Comparator.comparingInt(TConfigNodeLocation::getConfigNodeId));
×
439
      List<TDataNodeLocation> dataNodeInfoLocations =
×
440
          getNodeManager().getRegisteredDataNodes().stream()
×
441
              .map(TDataNodeConfiguration::getLocation)
×
442
              .sorted(Comparator.comparingInt(TDataNodeLocation::getDataNodeId))
×
443
              .collect(Collectors.toList());
×
444
      Map<Integer, String> nodeStatus = getLoadManager().getNodeStatusWithReason();
×
445
      Map<Integer, TNodeVersionInfo> nodeVersionInfo = getNodeManager().getNodeVersionInfo();
×
446
      return new TShowClusterResp(
×
447
          status, configNodeLocations, dataNodeInfoLocations, nodeStatus, nodeVersionInfo);
448
    } else {
449
      return new TShowClusterResp(
×
450
          status, new ArrayList<>(), new ArrayList<>(), new HashMap<>(), new HashMap<>());
451
    }
452
  }
453

454
  @Override
455
  public TShowVariablesResp showVariables() {
456
    TSStatus status = confirmLeader();
×
457
    TShowVariablesResp resp = new TShowVariablesResp();
×
458
    resp.setStatus(status);
×
459
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
460
      resp.setClusterParameters(getClusterParameters());
×
461
    }
462
    return resp;
×
463
  }
464

465
  public TClusterParameters getClusterParameters() {
466
    TClusterParameters clusterParameters = new TClusterParameters();
×
467
    clusterParameters.setClusterName(CONF.getClusterName());
×
468
    clusterParameters.setConfigNodeConsensusProtocolClass(
×
469
        CONF.getConfigNodeConsensusProtocolClass());
×
470
    clusterParameters.setDataRegionConsensusProtocolClass(
×
471
        CONF.getDataRegionConsensusProtocolClass());
×
472
    clusterParameters.setSchemaRegionConsensusProtocolClass(
×
473
        CONF.getSchemaRegionConsensusProtocolClass());
×
474
    clusterParameters.setSeriesPartitionSlotNum(CONF.getSeriesSlotNum());
×
475
    clusterParameters.setSeriesPartitionExecutorClass(CONF.getSeriesPartitionExecutorClass());
×
476
    clusterParameters.setDefaultTTL(COMMON_CONF.getDefaultTTLInMs());
×
477
    clusterParameters.setTimePartitionInterval(COMMON_CONF.getTimePartitionInterval());
×
478
    clusterParameters.setDataReplicationFactor(CONF.getDataReplicationFactor());
×
479
    clusterParameters.setSchemaReplicationFactor(CONF.getSchemaReplicationFactor());
×
480
    clusterParameters.setDataRegionPerDataNode(CONF.getDataRegionPerDataNode());
×
481
    clusterParameters.setSchemaRegionPerDataNode(CONF.getSchemaRegionPerDataNode());
×
482
    clusterParameters.setDiskSpaceWarningThreshold(COMMON_CONF.getDiskSpaceWarningThreshold());
×
483
    clusterParameters.setReadConsistencyLevel(CONF.getReadConsistencyLevel());
×
484
    clusterParameters.setTimestampPrecision(COMMON_CONF.getTimestampPrecision());
×
485
    clusterParameters.setSchemaEngineMode(COMMON_CONF.getSchemaEngineMode());
×
486
    clusterParameters.setTagAttributeTotalSize(COMMON_CONF.getTagAttributeTotalSize());
×
487
    clusterParameters.setDatabaseLimitThreshold(COMMON_CONF.getDatabaseLimitThreshold());
×
488
    return clusterParameters;
×
489
  }
490

491
  @Override
492
  public TSStatus setTTL(SetTTLPlan setTTLPlan) {
493
    TSStatus status = confirmLeader();
×
494
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
495
      return clusterSchemaManager.setTTL(setTTLPlan);
×
496
    } else {
497
      return status;
×
498
    }
499
  }
500

501
  @Override
502
  public TSStatus setSchemaReplicationFactor(
503
      SetSchemaReplicationFactorPlan setSchemaReplicationFactorPlan) {
504
    TSStatus status = confirmLeader();
×
505
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
506
      return clusterSchemaManager.setSchemaReplicationFactor(setSchemaReplicationFactorPlan);
×
507
    } else {
508
      return status;
×
509
    }
510
  }
511

512
  @Override
513
  public TSStatus setDataReplicationFactor(
514
      SetDataReplicationFactorPlan setDataReplicationFactorPlan) {
515
    TSStatus status = confirmLeader();
×
516
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
517
      return clusterSchemaManager.setDataReplicationFactor(setDataReplicationFactorPlan);
×
518
    } else {
519
      return status;
×
520
    }
521
  }
522

523
  @Override
524
  public TSStatus setTimePartitionInterval(
525
      SetTimePartitionIntervalPlan setTimePartitionIntervalPlan) {
526
    TSStatus status = confirmLeader();
×
527
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
528
      return clusterSchemaManager.setTimePartitionInterval(setTimePartitionIntervalPlan);
×
529
    } else {
530
      return status;
×
531
    }
532
  }
533

534
  @Override
535
  public DataSet countMatchedDatabases(CountDatabasePlan countDatabasePlan) {
536
    TSStatus status = confirmLeader();
×
537
    CountDatabaseResp result = new CountDatabaseResp();
×
538
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
539
      return clusterSchemaManager.countMatchedDatabases(countDatabasePlan);
×
540
    } else {
541
      result.setStatus(status);
×
542
    }
543
    return result;
×
544
  }
545

546
  @Override
547
  public DataSet getMatchedDatabaseSchemas(GetDatabasePlan getDatabaseReq) {
548
    TSStatus status = confirmLeader();
×
549
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
550
      return clusterSchemaManager.getMatchedDatabaseSchema(getDatabaseReq);
×
551
    } else {
552
      DatabaseSchemaResp dataSet = new DatabaseSchemaResp();
×
553
      dataSet.setStatus(status);
×
554
      return dataSet;
×
555
    }
556
  }
557

558
  @Override
559
  public synchronized TSStatus setDatabase(DatabaseSchemaPlan databaseSchemaPlan) {
560
    TSStatus status = confirmLeader();
×
561
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
562
      return clusterSchemaManager.setDatabase(databaseSchemaPlan);
×
563
    } else {
564
      return status;
×
565
    }
566
  }
567

568
  @Override
569
  public TSStatus alterDatabase(DatabaseSchemaPlan databaseSchemaPlan) {
570
    TSStatus status = confirmLeader();
×
571
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
572
      return clusterSchemaManager.alterDatabase(databaseSchemaPlan);
×
573
    } else {
574
      return status;
×
575
    }
576
  }
577

578
  @Override
579
  public synchronized TSStatus deleteDatabases(List<String> deletedPaths) {
580
    TSStatus status = confirmLeader();
×
581
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
582
      // remove wild
583
      Map<String, TDatabaseSchema> deleteDatabaseSchemaMap =
×
584
          getClusterSchemaManager().getMatchedDatabaseSchemasByName(deletedPaths);
×
585
      if (deleteDatabaseSchemaMap.isEmpty()) {
×
586
        return RpcUtils.getStatus(
×
587
            TSStatusCode.PATH_NOT_EXIST.getStatusCode(),
×
588
            String.format("Path %s does not exist", Arrays.toString(deletedPaths.toArray())));
×
589
      }
590
      ArrayList<TDatabaseSchema> parsedDeleteDatabases =
×
591
          new ArrayList<>(deleteDatabaseSchemaMap.values());
×
592
      return procedureManager.deleteDatabases(parsedDeleteDatabases);
×
593
    } else {
594
      return status;
×
595
    }
596
  }
597

598
  private List<TSeriesPartitionSlot> calculateRelatedSlot(PartialPath path, PartialPath database) {
599
    // The path contains `**`
600
    if (path.getFullPath().contains(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)) {
×
601
      return new ArrayList<>();
×
602
    }
603
    List<PartialPath> innerPathList = path.alterPrefixPath(database);
×
604
    if (innerPathList.size() == 0) {
×
605
      return new ArrayList<>();
×
606
    }
607
    PartialPath innerPath = innerPathList.get(0);
×
608
    // The innerPath contains `*` and the only `*` is not in last level
609
    if (innerPath.getDevice().contains(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)) {
×
610
      return new ArrayList<>();
×
611
    }
612
    return Collections.singletonList(
×
613
        getPartitionManager().getSeriesPartitionSlot(innerPath.getDevice()));
×
614
  }
615

616
  @Override
617
  public TSchemaPartitionTableResp getSchemaPartition(PathPatternTree patternTree) {
618
    // Construct empty response
619
    TSchemaPartitionTableResp resp = new TSchemaPartitionTableResp();
×
620

621
    TSStatus status = confirmLeader();
×
622
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
623
      return resp.setStatus(status);
×
624
    }
625

626
    // Build GetSchemaPartitionPlan
627
    Map<String, Set<TSeriesPartitionSlot>> partitionSlotsMap = new HashMap<>();
×
628
    List<PartialPath> relatedPaths = patternTree.getAllPathPatterns();
×
629
    List<String> allDatabases = getClusterSchemaManager().getDatabaseNames();
×
630
    List<PartialPath> allDatabasePaths = new ArrayList<>();
×
631
    for (String database : allDatabases) {
×
632
      try {
633
        allDatabasePaths.add(new PartialPath(database));
×
634
      } catch (IllegalPathException e) {
×
635
        throw new RuntimeException(e);
×
636
      }
×
637
    }
×
638
    Map<String, Boolean> scanAllRegions = new HashMap<>();
×
639
    for (PartialPath path : relatedPaths) {
×
640
      for (int i = 0; i < allDatabases.size(); i++) {
×
641
        String database = allDatabases.get(i);
×
642
        PartialPath databasePath = allDatabasePaths.get(i);
×
643
        if (path.overlapWithFullPathPrefix(databasePath) && !scanAllRegions.containsKey(database)) {
×
644
          List<TSeriesPartitionSlot> relatedSlot = calculateRelatedSlot(path, databasePath);
×
645
          if (relatedSlot.isEmpty()) {
×
646
            scanAllRegions.put(database, true);
×
647
            partitionSlotsMap.put(database, new HashSet<>());
×
648
          } else {
649
            partitionSlotsMap.computeIfAbsent(database, k -> new HashSet<>()).addAll(relatedSlot);
×
650
          }
651
        }
652
      }
653
    }
×
654

655
    // Return empty resp if the partitionSlotsMap is empty
656
    if (partitionSlotsMap.isEmpty()) {
×
657
      return resp.setStatus(StatusUtils.OK).setSchemaPartitionTable(new HashMap<>());
×
658
    }
659

660
    GetSchemaPartitionPlan getSchemaPartitionPlan =
×
661
        new GetSchemaPartitionPlan(
662
            partitionSlotsMap.entrySet().stream()
×
663
                .collect(Collectors.toMap(Map.Entry::getKey, e -> new ArrayList<>(e.getValue()))));
×
664
    SchemaPartitionResp queryResult = partitionManager.getSchemaPartition(getSchemaPartitionPlan);
×
665
    resp = queryResult.convertToRpcSchemaPartitionTableResp();
×
666

667
    LOGGER.debug("GetSchemaPartition receive paths: {}, return: {}", relatedPaths, resp);
×
668

669
    return resp;
×
670
  }
671

672
  @Override
673
  public TSchemaPartitionTableResp getOrCreateSchemaPartition(PathPatternTree patternTree) {
674
    // Construct empty response
675
    TSchemaPartitionTableResp resp = new TSchemaPartitionTableResp();
×
676

677
    TSStatus status = confirmLeader();
×
678
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
679
      return resp.setStatus(status);
×
680
    }
681

682
    List<String> devicePaths = patternTree.getAllDevicePatterns();
×
683
    List<String> databases = getClusterSchemaManager().getDatabaseNames();
×
684

685
    // Build GetOrCreateSchemaPartitionPlan
686
    Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap = new HashMap<>();
×
687
    for (String devicePath : devicePaths) {
×
688
      if (!devicePath.contains("*")) {
×
689
        // Only check devicePaths that without "*"
690
        for (String database : databases) {
×
691
          if (PathUtils.isStartWith(devicePath, database)) {
×
692
            partitionSlotsMap
×
693
                .computeIfAbsent(database, key -> new ArrayList<>())
×
694
                .add(getPartitionManager().getSeriesPartitionSlot(devicePath));
×
695
            break;
×
696
          }
697
        }
×
698
      }
699
    }
×
700
    GetOrCreateSchemaPartitionPlan getOrCreateSchemaPartitionPlan =
×
701
        new GetOrCreateSchemaPartitionPlan(partitionSlotsMap);
702

703
    SchemaPartitionResp queryResult =
×
704
        partitionManager.getOrCreateSchemaPartition(getOrCreateSchemaPartitionPlan);
×
705
    resp = queryResult.convertToRpcSchemaPartitionTableResp();
×
706

707
    if (CONF.isEnablePrintingNewlyCreatedPartition()) {
×
708
      printNewCreatedSchemaPartition(devicePaths, resp);
×
709
    }
710

711
    return resp;
×
712
  }
713

714
  private void printNewCreatedSchemaPartition(
715
      List<String> devicePaths, TSchemaPartitionTableResp resp) {
716
    final String lineSeparator = System.lineSeparator();
×
717
    StringBuilder devicePathString = new StringBuilder("{");
×
718
    for (String devicePath : devicePaths) {
×
719
      devicePathString.append(lineSeparator).append("\t").append(devicePath).append(",");
×
720
    }
×
721
    devicePathString.append(lineSeparator).append("}");
×
722

723
    StringBuilder schemaPartitionRespString = new StringBuilder("{");
×
724
    schemaPartitionRespString
×
725
        .append(lineSeparator)
×
726
        .append("\tTSStatus=")
×
727
        .append(resp.getStatus().getCode())
×
728
        .append(",");
×
729
    Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable =
×
730
        resp.getSchemaPartitionTable();
×
731
    for (Map.Entry<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> databaseEntry :
732
        schemaPartitionTable.entrySet()) {
×
733
      String database = databaseEntry.getKey();
×
734
      schemaPartitionRespString
×
735
          .append(lineSeparator)
×
736
          .append(DATABASE)
×
737
          .append(database)
×
738
          .append(": {");
×
739
      for (Map.Entry<TSeriesPartitionSlot, TConsensusGroupId> slotEntry :
740
          databaseEntry.getValue().entrySet()) {
×
741
        schemaPartitionRespString
×
742
            .append(lineSeparator)
×
743
            .append("\t\t")
×
744
            .append(slotEntry.getKey())
×
745
            .append(", ")
×
746
            .append(slotEntry.getValue())
×
747
            .append(",");
×
748
      }
×
749
      schemaPartitionRespString.append(lineSeparator).append("\t},");
×
750
    }
×
751
    schemaPartitionRespString.append(lineSeparator).append("}");
×
752
    LOGGER.info(
×
753
        "[GetOrCreateSchemaPartition]:{} Receive PathPatternTree: {}, Return TSchemaPartitionTableResp: {}",
754
        lineSeparator,
755
        devicePathString,
756
        schemaPartitionRespString);
757
  }
×
758

759
  @Override
760
  public TSchemaNodeManagementResp getNodePathsPartition(PartialPath partialPath, Integer level) {
761
    TSStatus status = confirmLeader();
×
762
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
763
      GetNodePathsPartitionPlan getNodePathsPartitionPlan = new GetNodePathsPartitionPlan();
×
764
      getNodePathsPartitionPlan.setPartialPath(partialPath);
×
765
      if (null != level) {
×
766
        getNodePathsPartitionPlan.setLevel(level);
×
767
      }
768
      SchemaNodeManagementResp resp =
×
769
          partitionManager.getNodePathsPartition(getNodePathsPartitionPlan);
×
770
      TSchemaNodeManagementResp result =
×
771
          resp.convertToRpcSchemaNodeManagementPartitionResp(
×
772
              getLoadManager().getRegionPriorityMap());
×
773

774
      LOGGER.info(
×
775
          "getNodePathsPartition receive devicePaths: {}, level: {}, return TSchemaNodeManagementResp: {}",
776
          partialPath,
777
          level,
778
          result);
779

780
      return result;
×
781
    } else {
782
      return new TSchemaNodeManagementResp().setStatus(status);
×
783
    }
784
  }
785

786
  @Override
787
  public TDataPartitionTableResp getDataPartition(GetDataPartitionPlan getDataPartitionPlan) {
788
    // Construct empty response
789
    TDataPartitionTableResp resp = new TDataPartitionTableResp();
×
790

791
    TSStatus status = confirmLeader();
×
792
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
793
      return resp.setStatus(status);
×
794
    }
795
    DataPartitionResp queryResult = partitionManager.getDataPartition(getDataPartitionPlan);
×
796

797
    resp = queryResult.convertToTDataPartitionTableResp();
×
798

799
    LOGGER.debug(
×
800
        "GetDataPartition interface receive PartitionSlotsMap: {}, return: {}",
801
        getDataPartitionPlan.getPartitionSlotsMap(),
×
802
        resp);
803

804
    return resp;
×
805
  }
806

807
  @Override
808
  public TDataPartitionTableResp getOrCreateDataPartition(
809
      GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan) {
810
    // Construct empty response
811
    TDataPartitionTableResp resp = new TDataPartitionTableResp();
×
812

813
    TSStatus status = confirmLeader();
×
814
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
815
      return resp.setStatus(status);
×
816
    }
817

818
    DataPartitionResp queryResult =
×
819
        partitionManager.getOrCreateDataPartition(getOrCreateDataPartitionPlan);
×
820
    resp = queryResult.convertToTDataPartitionTableResp();
×
821

822
    if (CONF.isEnablePrintingNewlyCreatedPartition()) {
×
823
      printNewCreatedDataPartition(getOrCreateDataPartitionPlan, resp);
×
824
    }
825

826
    return resp;
×
827
  }
828

829
  private void printNewCreatedDataPartition(
830
      GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan, TDataPartitionTableResp resp) {
831
    final String lineSeparator = System.lineSeparator();
×
832
    StringBuilder partitionSlotsMapString = new StringBuilder("{");
×
833
    for (Map.Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>> databaseEntry :
834
        getOrCreateDataPartitionPlan.getPartitionSlotsMap().entrySet()) {
×
835
      String database = databaseEntry.getKey();
×
836
      partitionSlotsMapString.append(lineSeparator).append(DATABASE).append(database).append(": {");
×
837
      for (Map.Entry<TSeriesPartitionSlot, TTimeSlotList> slotEntry :
838
          databaseEntry.getValue().entrySet()) {
×
839
        partitionSlotsMapString
×
840
            .append(lineSeparator)
×
841
            .append("\t\t")
×
842
            .append(slotEntry.getKey())
×
843
            .append(",")
×
844
            .append(slotEntry.getValue());
×
845
      }
×
846
      partitionSlotsMapString.append(lineSeparator).append("\t},");
×
847
    }
×
848
    partitionSlotsMapString.append(lineSeparator).append("}");
×
849

850
    StringBuilder dataPartitionRespString = new StringBuilder("{");
×
851
    dataPartitionRespString
×
852
        .append(lineSeparator)
×
853
        .append("\tTSStatus=")
×
854
        .append(resp.getStatus().getCode())
×
855
        .append(",");
×
856
    Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
857
        dataPartitionTable = resp.getDataPartitionTable();
×
858
    for (Map.Entry<
859
            String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
860
        databaseEntry : dataPartitionTable.entrySet()) {
×
861
      String database = databaseEntry.getKey();
×
862
      dataPartitionRespString.append(lineSeparator).append(DATABASE).append(database).append(": {");
×
863
      for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>
864
          seriesSlotEntry : databaseEntry.getValue().entrySet()) {
×
865
        dataPartitionRespString
×
866
            .append(lineSeparator)
×
867
            .append("\t\t")
×
868
            .append(seriesSlotEntry.getKey())
×
869
            .append(": {");
×
870
        for (Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> timeSlotEntry :
871
            seriesSlotEntry.getValue().entrySet()) {
×
872
          dataPartitionRespString
×
873
              .append(lineSeparator)
×
874
              .append("\t\t\t")
×
875
              .append(timeSlotEntry.getKey())
×
876
              .append(", ")
×
877
              .append(timeSlotEntry.getValue())
×
878
              .append(",");
×
879
        }
×
880
        dataPartitionRespString.append(lineSeparator).append("\t\t},");
×
881
      }
×
882
      dataPartitionRespString.append(lineSeparator).append("\t}");
×
883
    }
×
884
    dataPartitionRespString.append(lineSeparator).append("}");
×
885

886
    LOGGER.info(
×
887
        "[GetOrCreateDataPartition]:{} Receive PartitionSlotsMap: {}, Return TDataPartitionTableResp: {}",
888
        lineSeparator,
889
        partitionSlotsMapString,
890
        dataPartitionRespString);
891
  }
×
892

893
  private TSStatus confirmLeader() {
894
    // Make sure the consensus layer has been initialized
895
    if (getConsensusManager() == null) {
×
896
      return new TSStatus(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode())
×
897
          .setMessage(
×
898
              "ConsensusManager of target-ConfigNode is not initialized, "
899
                  + "please make sure the target-ConfigNode has been started successfully.");
900
    }
901
    return getConsensusManager().confirmLeader();
×
902
  }
903

904
  @Override
905
  public NodeManager getNodeManager() {
906
    return nodeManager;
×
907
  }
908

909
  @Override
910
  public ClusterSchemaManager getClusterSchemaManager() {
911
    return clusterSchemaManager;
×
912
  }
913

914
  @Override
915
  public ConsensusManager getConsensusManager() {
916
    return consensusManager.get();
×
917
  }
918

919
  @Override
920
  public PartitionManager getPartitionManager() {
921
    return partitionManager;
×
922
  }
923

924
  @Override
925
  public LoadManager getLoadManager() {
926
    return loadManager;
×
927
  }
928

929
  @Override
930
  public TriggerManager getTriggerManager() {
931
    return triggerManager;
×
932
  }
933

934
  @Override
935
  public ModelManager getModelManager() {
936
    return modelManager;
×
937
  }
938

939
  @Override
940
  public PipeManager getPipeManager() {
941
    return pipeManager;
×
942
  }
943

944
  @Override
945
  public TSStatus operatePermission(AuthorPlan authorPlan) {
946
    TSStatus status = confirmLeader();
×
947
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
948
      return permissionManager.operatePermission(authorPlan);
×
949
    } else {
950
      return status;
×
951
    }
952
  }
953

954
  @Override
955
  public DataSet queryPermission(AuthorPlan authorPlan) {
956
    TSStatus status = confirmLeader();
×
957
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
958
      return permissionManager.queryPermission(authorPlan);
×
959
    } else {
960
      PermissionInfoResp dataSet = new PermissionInfoResp();
×
961
      dataSet.setStatus(status);
×
962
      return dataSet;
×
963
    }
964
  }
965

966
  @Override
967
  public TPermissionInfoResp login(String username, String password) {
968
    TSStatus status = confirmLeader();
×
969
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
970
      return permissionManager.login(username, password);
×
971
    } else {
972
      TPermissionInfoResp resp = AuthUtils.generateEmptyPermissionInfoResp();
×
973
      resp.setStatus(status);
×
974
      return resp;
×
975
    }
976
  }
977

978
  @Override
979
  public TPermissionInfoResp checkUserPrivileges(
980
      String username, List<PartialPath> paths, int permission) {
981
    TSStatus status = confirmLeader();
×
982
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
983
      return permissionManager.checkUserPrivileges(username, paths, permission);
×
984
    } else {
985
      TPermissionInfoResp resp = AuthUtils.generateEmptyPermissionInfoResp();
×
986
      resp.setStatus(status);
×
987
      return resp;
×
988
    }
989
  }
990

991
  @Override
992
  public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) {
993
    final int ERROR_STATUS_NODE_ID = -1;
×
994

995
    TSStatus status = confirmLeader();
×
996
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
997
      // Make sure the global configurations are consist
998
      status = checkConfigNodeGlobalConfig(req);
×
999
      if (status == null) {
×
1000
        status =
×
1001
            ClusterNodeStartUtils.confirmNodeRegistration(
×
1002
                NodeType.ConfigNode,
1003
                req.getClusterParameters().getClusterName(),
×
1004
                req.getConfigNodeLocation(),
×
1005
                this);
1006
        if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1007
          return nodeManager.registerConfigNode(req);
×
1008
        }
1009
      }
1010
    }
1011

1012
    return new TConfigNodeRegisterResp().setStatus(status).setConfigNodeId(ERROR_STATUS_NODE_ID);
×
1013
  }
1014

1015
  public TSStatus checkConfigNodeGlobalConfig(TConfigNodeRegisterReq req) {
1016
    final String errorPrefix = "Reject register, please ensure that the parameter ";
×
1017
    final String errorSuffix = " is consistent with the Seed-ConfigNode.";
×
1018
    TSStatus errorStatus = new TSStatus(TSStatusCode.CONFIGURATION_ERROR.getStatusCode());
×
1019
    TClusterParameters clusterParameters = req.getClusterParameters();
×
1020

1021
    if (!clusterParameters
×
1022
        .getConfigNodeConsensusProtocolClass()
×
1023
        .equals(CONF.getConfigNodeConsensusProtocolClass())) {
×
1024
      return errorStatus.setMessage(
×
1025
          errorPrefix + "config_node_consensus_protocol_class" + errorSuffix);
1026
    }
1027
    if (!clusterParameters
×
1028
        .getDataRegionConsensusProtocolClass()
×
1029
        .equals(CONF.getDataRegionConsensusProtocolClass())) {
×
1030
      return errorStatus.setMessage(
×
1031
          errorPrefix + "data_region_consensus_protocol_class" + errorSuffix);
1032
    }
1033
    if (!clusterParameters
×
1034
        .getSchemaRegionConsensusProtocolClass()
×
1035
        .equals(CONF.getSchemaRegionConsensusProtocolClass())) {
×
1036
      return errorStatus.setMessage(
×
1037
          errorPrefix + "schema_region_consensus_protocol_class" + errorSuffix);
1038
    }
1039

1040
    if (clusterParameters.getSeriesPartitionSlotNum() != CONF.getSeriesSlotNum()) {
×
1041
      return errorStatus.setMessage(errorPrefix + "series_partition_slot_num" + errorSuffix);
×
1042
    }
1043
    if (!clusterParameters
×
1044
        .getSeriesPartitionExecutorClass()
×
1045
        .equals(CONF.getSeriesPartitionExecutorClass())) {
×
1046
      return errorStatus.setMessage(errorPrefix + "series_partition_executor_class" + errorSuffix);
×
1047
    }
1048

1049
    if (clusterParameters.getDefaultTTL()
×
1050
        != CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs()) {
×
1051
      return errorStatus.setMessage(errorPrefix + "default_ttl" + errorSuffix);
×
1052
    }
1053
    if (clusterParameters.getTimePartitionInterval() != COMMON_CONF.getTimePartitionInterval()) {
×
1054
      return errorStatus.setMessage(errorPrefix + "time_partition_interval" + errorSuffix);
×
1055
    }
1056

1057
    if (clusterParameters.getSchemaReplicationFactor() != CONF.getSchemaReplicationFactor()) {
×
1058
      return errorStatus.setMessage(errorPrefix + "schema_replication_factor" + errorSuffix);
×
1059
    }
1060
    if (clusterParameters.getDataReplicationFactor() != CONF.getDataReplicationFactor()) {
×
1061
      return errorStatus.setMessage(errorPrefix + "data_replication_factor" + errorSuffix);
×
1062
    }
1063

1064
    if (clusterParameters.getSchemaRegionPerDataNode() != CONF.getSchemaRegionPerDataNode()) {
×
1065
      return errorStatus.setMessage(errorPrefix + "schema_region_per_data_node" + errorSuffix);
×
1066
    }
1067
    if (clusterParameters.getDataRegionPerDataNode() != CONF.getDataRegionPerDataNode()) {
×
1068
      return errorStatus.setMessage(errorPrefix + "data_region_per_data_node" + errorSuffix);
×
1069
    }
1070

1071
    if (!clusterParameters.getReadConsistencyLevel().equals(CONF.getReadConsistencyLevel())) {
×
1072
      return errorStatus.setMessage(errorPrefix + "read_consistency_level" + errorSuffix);
×
1073
    }
1074

1075
    if (clusterParameters.getDiskSpaceWarningThreshold()
×
1076
        != COMMON_CONF.getDiskSpaceWarningThreshold()) {
×
1077
      return errorStatus.setMessage(errorPrefix + "disk_space_warning_threshold" + errorSuffix);
×
1078
    }
1079

1080
    if (!clusterParameters.getTimestampPrecision().equals(COMMON_CONF.getTimestampPrecision())) {
×
1081
      return errorStatus.setMessage(errorPrefix + "timestamp_precision" + errorSuffix);
×
1082
    }
1083

1084
    if (!clusterParameters.getSchemaEngineMode().equals(COMMON_CONF.getSchemaEngineMode())) {
×
1085
      return errorStatus.setMessage(errorPrefix + "schema_engine_mode" + errorSuffix);
×
1086
    }
1087

1088
    if (clusterParameters.getTagAttributeTotalSize() != COMMON_CONF.getTagAttributeTotalSize()) {
×
1089
      return errorStatus.setMessage(errorPrefix + "tag_attribute_total_size" + errorSuffix);
×
1090
    }
1091

1092
    if (clusterParameters.getDatabaseLimitThreshold() != COMMON_CONF.getDatabaseLimitThreshold()) {
×
1093
      return errorStatus.setMessage(errorPrefix + "database_limit_threshold" + errorSuffix);
×
1094
    }
1095

1096
    return null;
×
1097
  }
1098

1099
  @Override
1100
  public TSStatus createPeerForConsensusGroup(List<TConfigNodeLocation> configNodeLocations) {
1101
    for (int i = 0; i < 30; i++) {
×
1102
      try {
1103
        if (consensusManager.get() == null) {
×
1104
          Thread.sleep(1000);
×
1105
        } else {
1106
          // When add non Seed-ConfigNode to the ConfigNodeGroup, the parameter should be emptyList
1107
          consensusManager.get().createPeerForConsensusGroup(Collections.emptyList());
×
1108
          return StatusUtils.OK;
×
1109
        }
1110
      } catch (InterruptedException e) {
×
1111
        Thread.currentThread().interrupt();
×
1112
        LOGGER.warn("Unexpected interruption during retry creating peer for consensus group");
×
1113
      } catch (ConsensusException e) {
×
1114
        LOGGER.error("Failed to create peer for consensus group", e);
×
1115
        break;
×
1116
      }
×
1117
    }
1118
    return StatusUtils.INTERNAL_ERROR;
×
1119
  }
1120

1121
  @Override
1122
  public TSStatus removeConfigNode(RemoveConfigNodePlan removeConfigNodePlan) {
1123
    TSStatus status = confirmLeader();
×
1124

1125
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1126
      status = nodeManager.checkConfigNodeBeforeRemove(removeConfigNodePlan);
×
1127
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1128
        procedureManager.removeConfigNode(removeConfigNodePlan);
×
1129
      }
1130
    }
1131

1132
    return status;
×
1133
  }
1134

1135
  @Override
1136
  public TSStatus reportConfigNodeShutdown(TConfigNodeLocation configNodeLocation) {
1137
    TSStatus status = confirmLeader();
×
1138
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1139
      // Force updating the target ConfigNode's status to Unknown
1140
      getLoadManager()
×
1141
          .forceUpdateNodeCache(
×
1142
              NodeType.ConfigNode,
1143
              configNodeLocation.getConfigNodeId(),
×
1144
              NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
×
1145
      LOGGER.info(
×
1146
          "[ShutdownHook] The ConfigNode-{} will be shutdown soon, mark it as Unknown",
1147
          configNodeLocation.getConfigNodeId());
×
1148
    }
1149
    return status;
×
1150
  }
1151

1152
  @Override
1153
  public TSStatus createFunction(TCreateFunctionReq req) {
1154
    TSStatus status = confirmLeader();
×
1155
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1156
        ? udfManager.createFunction(req)
×
1157
        : status;
×
1158
  }
1159

1160
  @Override
1161
  public TSStatus dropFunction(String udfName) {
1162
    TSStatus status = confirmLeader();
×
1163
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1164
        ? udfManager.dropFunction(udfName)
×
1165
        : status;
×
1166
  }
1167

1168
  @Override
1169
  public TGetUDFTableResp getUDFTable() {
1170
    TSStatus status = confirmLeader();
×
1171
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1172
        ? udfManager.getUDFTable()
×
1173
        : new TGetUDFTableResp(status, Collections.emptyList());
×
1174
  }
1175

1176
  @Override
1177
  public TGetJarInListResp getUDFJar(TGetJarInListReq req) {
1178
    TSStatus status = confirmLeader();
×
1179
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1180
        ? udfManager.getUDFJar(req)
×
1181
        : new TGetJarInListResp(status, Collections.emptyList());
×
1182
  }
1183

1184
  @Override
1185
  public TSStatus createTrigger(TCreateTriggerReq req) {
1186
    TSStatus status = confirmLeader();
×
1187
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1188
        ? triggerManager.createTrigger(req)
×
1189
        : status;
×
1190
  }
1191

1192
  @Override
1193
  public TSStatus dropTrigger(TDropTriggerReq req) {
1194
    TSStatus status = confirmLeader();
×
1195
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1196
        ? triggerManager.dropTrigger(req)
×
1197
        : status;
×
1198
  }
1199

1200
  @Override
1201
  public TGetTriggerTableResp getTriggerTable() {
1202
    TSStatus status = confirmLeader();
×
1203
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1204
        ? triggerManager.getTriggerTable(false)
×
1205
        : new TGetTriggerTableResp(status, Collections.emptyList());
×
1206
  }
1207

1208
  @Override
1209
  public TGetTriggerTableResp getStatefulTriggerTable() {
1210
    TSStatus status = confirmLeader();
×
1211
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1212
        ? triggerManager.getTriggerTable(true)
×
1213
        : new TGetTriggerTableResp(status, Collections.emptyList());
×
1214
  }
1215

1216
  @Override
1217
  public TGetLocationForTriggerResp getLocationOfStatefulTrigger(String triggerName) {
1218
    TSStatus status = confirmLeader();
×
1219
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1220
        ? triggerManager.getLocationOfStatefulTrigger(triggerName)
×
1221
        : new TGetLocationForTriggerResp(status);
×
1222
  }
1223

1224
  @Override
1225
  public TGetJarInListResp getTriggerJar(TGetJarInListReq req) {
1226
    TSStatus status = confirmLeader();
×
1227
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1228
        ? triggerManager.getTriggerJar(req)
×
1229
        : new TGetJarInListResp(status, Collections.emptyList());
×
1230
  }
1231

1232
  @Override
1233
  public TSStatus createPipePlugin(TCreatePipePluginReq req) {
1234
    TSStatus status = confirmLeader();
×
1235
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1236
        ? pipeManager.getPipePluginCoordinator().createPipePlugin(req)
×
1237
        : status;
×
1238
  }
1239

1240
  @Override
1241
  public TSStatus dropPipePlugin(String pipePluginName) {
1242
    TSStatus status = confirmLeader();
×
1243
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1244
        ? pipeManager.getPipePluginCoordinator().dropPipePlugin(pipePluginName)
×
1245
        : status;
×
1246
  }
1247

1248
  @Override
1249
  public TGetPipePluginTableResp getPipePluginTable() {
1250
    TSStatus status = confirmLeader();
×
1251
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1252
        ? pipeManager.getPipePluginCoordinator().getPipePluginTable()
×
1253
        : new TGetPipePluginTableResp(status, Collections.emptyList());
×
1254
  }
1255

1256
  @Override
1257
  public TGetJarInListResp getPipePluginJar(TGetJarInListReq req) {
1258
    TSStatus status = confirmLeader();
×
1259
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1260
        ? pipeManager.getPipePluginCoordinator().getPipePluginJar(req)
×
1261
        : new TGetJarInListResp(status, Collections.emptyList());
×
1262
  }
1263

1264
  @Override
1265
  public TSStatus merge() {
1266
    TSStatus status = confirmLeader();
×
1267
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1268
        ? RpcUtils.squashResponseStatusList(nodeManager.merge())
×
1269
        : status;
×
1270
  }
1271

1272
  @Override
1273
  public TSStatus flush(TFlushReq req) {
1274
    TSStatus status = confirmLeader();
×
1275
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1276
        ? RpcUtils.squashResponseStatusList(nodeManager.flush(req))
×
1277
        : status;
×
1278
  }
1279

1280
  @Override
1281
  public TSStatus clearCache() {
1282
    TSStatus status = confirmLeader();
×
1283
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1284
        ? RpcUtils.squashResponseStatusList(nodeManager.clearCache())
×
1285
        : status;
×
1286
  }
1287

1288
  @Override
1289
  public TSStatus loadConfiguration() {
1290
    TSStatus status = confirmLeader();
×
1291
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1292
        ? RpcUtils.squashResponseStatusList(nodeManager.loadConfiguration())
×
1293
        : status;
×
1294
  }
1295

1296
  @Override
1297
  public TSStatus setSystemStatus(String systemStatus) {
1298
    TSStatus status = confirmLeader();
×
1299
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1300
        ? RpcUtils.squashResponseStatusList(nodeManager.setSystemStatus(systemStatus))
×
1301
        : status;
×
1302
  }
1303

1304
  @Override
1305
  public TSStatus setDataNodeStatus(TSetDataNodeStatusReq req) {
1306
    TSStatus status = confirmLeader();
×
1307
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1308
        ? nodeManager.setDataNodeStatus(req)
×
1309
        : status;
×
1310
  }
1311

1312
  @Override
1313
  public TSStatus killQuery(String queryId, int dataNodeId) {
1314
    TSStatus status = confirmLeader();
×
1315
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1316
        ? nodeManager.killQuery(queryId, dataNodeId)
×
1317
        : status;
×
1318
  }
1319

1320
  @Override
1321
  public TGetDataNodeLocationsResp getRunningDataNodeLocations() {
1322
    TSStatus status = confirmLeader();
×
1323
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1324
        ? new TGetDataNodeLocationsResp(
×
1325
            new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
×
1326
            nodeManager.filterDataNodeThroughStatus(NodeStatus.Running).stream()
×
1327
                .map(TDataNodeConfiguration::getLocation)
×
1328
                .collect(Collectors.toList()))
×
1329
        : new TGetDataNodeLocationsResp(status, Collections.emptyList());
×
1330
  }
1331

1332
  @Override
1333
  public TRegionRouteMapResp getLatestRegionRouteMap() {
1334
    TSStatus status = confirmLeader();
×
1335
    TRegionRouteMapResp resp = new TRegionRouteMapResp(status);
×
1336

1337
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1338
      resp.setTimestamp(System.currentTimeMillis());
×
1339
      resp.setRegionRouteMap(getLoadManager().getRegionPriorityMap());
×
1340
    }
1341

1342
    return resp;
×
1343
  }
1344

1345
  @Override
1346
  public UDFManager getUDFManager() {
1347
    return udfManager;
×
1348
  }
1349

1350
  @Override
1351
  public RegionInfoListResp showRegion(GetRegionInfoListPlan getRegionInfoListPlan) {
1352
    TSStatus status = confirmLeader();
×
1353
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1354
      return partitionManager.getRegionInfoList(getRegionInfoListPlan);
×
1355
    } else {
1356
      RegionInfoListResp regionResp = new RegionInfoListResp();
×
1357
      regionResp.setStatus(status);
×
1358
      return regionResp;
×
1359
    }
1360
  }
1361

1362
  @Override
1363
  public TShowDataNodesResp showDataNodes() {
1364
    TSStatus status = confirmLeader();
×
1365
    TShowDataNodesResp resp = new TShowDataNodesResp();
×
1366
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1367
      return resp.setDataNodesInfoList(nodeManager.getRegisteredDataNodeInfoList())
×
1368
          .setStatus(StatusUtils.OK);
×
1369
    } else {
1370
      return resp.setStatus(status);
×
1371
    }
1372
  }
1373

1374
  @Override
1375
  public TShowConfigNodesResp showConfigNodes() {
1376
    TSStatus status = confirmLeader();
×
1377
    TShowConfigNodesResp resp = new TShowConfigNodesResp();
×
1378
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1379
      return resp.setConfigNodesInfoList(nodeManager.getRegisteredConfigNodeInfoList())
×
1380
          .setStatus(StatusUtils.OK);
×
1381
    } else {
1382
      return resp.setStatus(status);
×
1383
    }
1384
  }
1385

1386
  @Override
1387
  public TShowDatabaseResp showDatabase(GetDatabasePlan getDatabasePlan) {
1388
    TSStatus status = confirmLeader();
×
1389
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1390
      return getClusterSchemaManager().showDatabase(getDatabasePlan);
×
1391
    } else {
1392
      return new TShowDatabaseResp().setStatus(status);
×
1393
    }
1394
  }
1395

1396
  @Override
1397
  public ProcedureManager getProcedureManager() {
1398
    return procedureManager;
×
1399
  }
1400

1401
  @Override
1402
  public CQManager getCQManager() {
1403
    return cqManager;
×
1404
  }
1405

1406
  @Override
1407
  public ClusterQuotaManager getClusterQuotaManager() {
1408
    return clusterQuotaManager;
×
1409
  }
1410

1411
  @Override
1412
  public RetryFailedTasksThread getRetryFailedTasksThread() {
1413
    return retryFailedTasksThread;
×
1414
  }
1415

1416
  @Override
1417
  public void addMetrics() {
1418
    MetricService.getInstance().addMetricSet(new NodeMetrics(getNodeManager()));
×
1419
    MetricService.getInstance().addMetricSet(new PartitionMetrics(this));
×
1420
  }
×
1421

1422
  @Override
1423
  public TSStatus createSchemaTemplate(TCreateSchemaTemplateReq req) {
1424
    TSStatus status = confirmLeader();
×
1425
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1426
      CreateSchemaTemplatePlan createSchemaTemplatePlan =
×
1427
          new CreateSchemaTemplatePlan(req.getSerializedTemplate());
×
1428
      return clusterSchemaManager.createTemplate(createSchemaTemplatePlan);
×
1429
    } else {
1430
      return status;
×
1431
    }
1432
  }
1433

1434
  @Override
1435
  public TGetAllTemplatesResp getAllTemplates() {
1436
    TSStatus status = confirmLeader();
×
1437
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1438
      return clusterSchemaManager.getAllTemplates();
×
1439
    } else {
1440
      return new TGetAllTemplatesResp().setStatus(status);
×
1441
    }
1442
  }
1443

1444
  @Override
1445
  public TGetTemplateResp getTemplate(String req) {
1446
    TSStatus status = confirmLeader();
×
1447
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1448
      return clusterSchemaManager.getTemplate(req);
×
1449
    } else {
1450
      return new TGetTemplateResp().setStatus(status);
×
1451
    }
1452
  }
1453

1454
  @Override
1455
  public synchronized TSStatus setSchemaTemplate(TSetSchemaTemplateReq req) {
1456
    TSStatus status = confirmLeader();
×
1457
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1458
      return procedureManager.setSchemaTemplate(req.getQueryId(), req.getName(), req.getPath());
×
1459
    } else {
1460
      return status;
×
1461
    }
1462
  }
1463

1464
  @Override
1465
  public TGetPathsSetTemplatesResp getPathsSetTemplate(String req) {
1466
    TSStatus status = confirmLeader();
×
1467
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1468
      return clusterSchemaManager.getPathsSetTemplate(req);
×
1469
    } else {
1470
      return new TGetPathsSetTemplatesResp(status);
×
1471
    }
1472
  }
1473

1474
  @Override
1475
  public TSStatus deactivateSchemaTemplate(TDeactivateSchemaTemplateReq req) {
1476
    TSStatus status = confirmLeader();
×
1477
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1478
      return status;
×
1479
    }
1480

1481
    PathPatternTree patternTree =
×
1482
        PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
×
1483

1484
    List<PartialPath> patternList = patternTree.getAllPathPatterns();
×
1485
    TemplateSetInfoResp templateSetInfoResp = clusterSchemaManager.getTemplateSetInfo(patternList);
×
1486
    if (templateSetInfoResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1487
      return templateSetInfoResp.getStatus();
×
1488
    }
1489

1490
    Map<PartialPath, List<Template>> templateSetInfo = templateSetInfoResp.getPatternTemplateMap();
×
1491
    if (templateSetInfo.isEmpty()) {
×
1492
      return RpcUtils.getStatus(
×
1493
          TSStatusCode.TEMPLATE_NOT_SET,
1494
          String.format(
×
1495
              "Schema Template %s is not set on any prefix path of %s",
1496
              req.getTemplateName(), patternList));
×
1497
    }
1498

1499
    if (!req.getTemplateName().equals(ONE_LEVEL_PATH_WILDCARD)) {
×
1500
      Map<PartialPath, List<Template>> filteredTemplateSetInfo = new HashMap<>();
×
1501
      for (Map.Entry<PartialPath, List<Template>> entry : templateSetInfo.entrySet()) {
×
1502
        for (Template template : entry.getValue()) {
×
1503
          if (template.getName().equals(req.getTemplateName())) {
×
1504
            filteredTemplateSetInfo.put(entry.getKey(), Collections.singletonList(template));
×
1505
            break;
×
1506
          }
1507
        }
×
1508
      }
×
1509

1510
      if (filteredTemplateSetInfo.isEmpty()) {
×
1511
        return RpcUtils.getStatus(
×
1512
            TSStatusCode.TEMPLATE_NOT_SET,
1513
            String.format(
×
1514
                "Schema Template %s is not set on any prefix path of %s",
1515
                req.getTemplateName(), patternList));
×
1516
      }
1517

1518
      templateSetInfo = filteredTemplateSetInfo;
×
1519
    }
1520

1521
    return procedureManager.deactivateTemplate(req.getQueryId(), templateSetInfo);
×
1522
  }
1523

1524
  @Override
1525
  public synchronized TSStatus unsetSchemaTemplate(TUnsetSchemaTemplateReq req) {
1526
    TSStatus status = confirmLeader();
×
1527
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1528
      return status;
×
1529
    }
1530
    Pair<TSStatus, Template> checkResult =
×
1531
        clusterSchemaManager.checkIsTemplateSetOnPath(req.getTemplateName(), req.getPath());
×
1532
    if (checkResult.left.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1533
      try {
1534
        return procedureManager.unsetSchemaTemplate(
×
1535
            req.getQueryId(), checkResult.right, new PartialPath(req.getPath()));
×
1536
      } catch (IllegalPathException e) {
×
1537
        return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
×
1538
      }
1539
    } else {
1540
      return checkResult.left;
×
1541
    }
1542
  }
1543

1544
  @Override
1545
  public TSStatus dropSchemaTemplate(String templateName) {
1546
    TSStatus status = confirmLeader();
×
1547
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1548
      return clusterSchemaManager.dropSchemaTemplate(templateName);
×
1549
    } else {
1550
      return status;
×
1551
    }
1552
  }
1553

1554
  @Override
1555
  public TSStatus alterSchemaTemplate(TAlterSchemaTemplateReq req) {
1556
    TSStatus status = confirmLeader();
×
1557
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1558
      ByteBuffer buffer = ByteBuffer.wrap(req.getTemplateAlterInfo());
×
1559
      TemplateAlterOperationType operationType =
×
1560
          TemplateAlterOperationUtil.parseOperationType(buffer);
×
1561
      if (operationType.equals(TemplateAlterOperationType.EXTEND_TEMPLATE)) {
×
1562
        return clusterSchemaManager.extendSchemaTemplate(
×
1563
            TemplateAlterOperationUtil.parseTemplateExtendInfo(buffer));
×
1564
      }
1565
      return RpcUtils.getStatus(TSStatusCode.UNSUPPORTED_OPERATION);
×
1566
    } else {
1567
      return status;
×
1568
    }
1569
  }
1570

1571
  @Override
1572
  public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) {
1573
    TSStatus status = confirmLeader();
×
1574
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1575
      return procedureManager.deleteTimeSeries(req);
×
1576
    } else {
1577
      return status;
×
1578
    }
1579
  }
1580

1581
  @Override
1582
  public TSStatus deleteLogicalView(TDeleteLogicalViewReq req) {
1583
    TSStatus status = confirmLeader();
×
1584
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1585
      return procedureManager.deleteLogicalView(req);
×
1586
    } else {
1587
      return status;
×
1588
    }
1589
  }
1590

1591
  @Override
1592
  public TSStatus alterLogicalView(TAlterLogicalViewReq req) {
1593
    TSStatus status = confirmLeader();
×
1594
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1595
      return procedureManager.alterLogicalView(req);
×
1596
    } else {
1597
      return status;
×
1598
    }
1599
  }
1600

1601
  @Override
1602
  public TSStatus createPipe(TCreatePipeReq req) {
1603
    TSStatus status = confirmLeader();
×
1604
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1605
        ? pipeManager.getPipeTaskCoordinator().createPipe(req)
×
1606
        : status;
×
1607
  }
1608

1609
  @Override
1610
  public TSStatus startPipe(String pipeName) {
1611
    TSStatus status = confirmLeader();
×
1612
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1613
        ? pipeManager.getPipeTaskCoordinator().startPipe(pipeName)
×
1614
        : status;
×
1615
  }
1616

1617
  @Override
1618
  public TSStatus stopPipe(String pipeName) {
1619
    TSStatus status = confirmLeader();
×
1620
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1621
        ? pipeManager.getPipeTaskCoordinator().stopPipe(pipeName)
×
1622
        : status;
×
1623
  }
1624

1625
  @Override
1626
  public TSStatus dropPipe(String pipeName) {
1627
    TSStatus status = confirmLeader();
×
1628
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1629
        ? pipeManager.getPipeTaskCoordinator().dropPipe(pipeName)
×
1630
        : status;
×
1631
  }
1632

1633
  @Override
1634
  public TShowPipeResp showPipe(TShowPipeReq req) {
1635
    TSStatus status = confirmLeader();
×
1636
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1637
        ? pipeManager.getPipeTaskCoordinator().showPipes(req)
×
1638
        : new TShowPipeResp().setStatus(status);
×
1639
  }
1640

1641
  @Override
1642
  public TGetAllPipeInfoResp getAllPipeInfo() {
1643
    TSStatus status = confirmLeader();
×
1644
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1645
        ? pipeManager.getPipeTaskCoordinator().getAllPipeInfo()
×
1646
        : new TGetAllPipeInfoResp(status, Collections.emptyList());
×
1647
  }
1648

1649
  @Override
1650
  public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
1651
    TSStatus status = confirmLeader();
×
1652
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1653
        ? partitionManager.getRegionId(req).convertToRpcGetRegionIdResp()
×
1654
        : new TGetRegionIdResp(status);
×
1655
  }
1656

1657
  @Override
1658
  public TGetTimeSlotListResp getTimeSlotList(TGetTimeSlotListReq req) {
1659
    TSStatus status = confirmLeader();
×
1660
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1661
        ? partitionManager.getTimeSlotList(req).convertToRpcGetTimeSlotListResp()
×
1662
        : new TGetTimeSlotListResp(status);
×
1663
  }
1664

1665
  @Override
1666
  public TCountTimeSlotListResp countTimeSlotList(TCountTimeSlotListReq req) {
1667
    TSStatus status = confirmLeader();
×
1668
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1669
        ? partitionManager.countTimeSlotList(req).convertToRpcCountTimeSlotListResp()
×
1670
        : new TCountTimeSlotListResp(status);
×
1671
  }
1672

1673
  @Override
1674
  public TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req) {
1675
    TSStatus status = confirmLeader();
×
1676
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1677
        ? partitionManager.getSeriesSlotList(req).convertToRpcGetSeriesSlotListResp()
×
1678
        : new TGetSeriesSlotListResp(status);
×
1679
  }
1680

1681
  @Override
1682
  public TSStatus migrateRegion(TMigrateRegionReq req) {
1683
    TSStatus status = confirmLeader();
×
1684
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1685
        ? procedureManager.migrateRegion(req)
×
1686
        : status;
×
1687
  }
1688

1689
  @Override
1690
  public TSStatus createCQ(TCreateCQReq req) {
1691
    TSStatus status = confirmLeader();
×
1692
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1693
        ? cqManager.createCQ(req)
×
1694
        : status;
×
1695
  }
1696

1697
  @Override
1698
  public TSStatus dropCQ(TDropCQReq req) {
1699
    TSStatus status = confirmLeader();
×
1700
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1701
        ? cqManager.dropCQ(req)
×
1702
        : status;
×
1703
  }
1704

1705
  @Override
1706
  public TShowCQResp showCQ() {
1707
    TSStatus status = confirmLeader();
×
1708
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1709
        ? cqManager.showCQ()
×
1710
        : new TShowCQResp(status, Collections.emptyList());
×
1711
  }
1712

1713
  /**
1714
   * Get all related schemaRegion which may contains the timeseries matched by given patternTree.
1715
   */
1716
  public Map<TConsensusGroupId, TRegionReplicaSet> getRelatedSchemaRegionGroup(
1717
      PathPatternTree patternTree) {
1718
    Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable =
×
1719
        getSchemaPartition(patternTree).getSchemaPartitionTable();
×
1720

1721
    List<TRegionReplicaSet> allRegionReplicaSets = getPartitionManager().getAllReplicaSets();
×
1722
    Set<TConsensusGroupId> groupIdSet =
×
1723
        schemaPartitionTable.values().stream()
×
1724
            .flatMap(m -> m.values().stream())
×
1725
            .collect(Collectors.toSet());
×
1726
    Map<TConsensusGroupId, TRegionReplicaSet> filteredRegionReplicaSets = new HashMap<>();
×
1727
    for (TRegionReplicaSet regionReplicaSet : allRegionReplicaSets) {
×
1728
      if (groupIdSet.contains(regionReplicaSet.getRegionId())) {
×
1729
        filteredRegionReplicaSets.put(regionReplicaSet.getRegionId(), regionReplicaSet);
×
1730
      }
1731
    }
×
1732
    return filteredRegionReplicaSets;
×
1733
  }
1734

1735
  /**
1736
   * Get all related dataRegion which may contains the data of specific timeseries matched by given
1737
   * patternTree
1738
   */
1739
  public Map<TConsensusGroupId, TRegionReplicaSet> getRelatedDataRegionGroup(
1740
      PathPatternTree patternTree) {
1741
    // Get all databases and slots by getting schemaengine partition
1742
    Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable =
×
1743
        getSchemaPartition(patternTree).getSchemaPartitionTable();
×
1744

1745
    // Construct request for getting data partition
1746
    Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>();
×
1747
    schemaPartitionTable.forEach(
×
1748
        (key, value) -> {
1749
          Map<TSeriesPartitionSlot, TTimeSlotList> slotListMap = new HashMap<>();
×
1750
          value
×
1751
              .keySet()
×
1752
              .forEach(
×
1753
                  slot ->
1754
                      slotListMap.put(
×
1755
                          slot, new TTimeSlotList(Collections.emptyList(), true, true)));
×
1756
          partitionSlotsMap.put(key, slotListMap);
×
1757
        });
×
1758

1759
    // Get all data partitions
1760
    GetDataPartitionPlan getDataPartitionPlan = new GetDataPartitionPlan(partitionSlotsMap);
×
1761
    Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
1762
        dataPartitionTable = getDataPartition(getDataPartitionPlan).getDataPartitionTable();
×
1763

1764
    // Get all region replicaset of target data partitions
1765
    List<TRegionReplicaSet> allRegionReplicaSets = getPartitionManager().getAllReplicaSets();
×
1766
    Set<TConsensusGroupId> groupIdSet =
×
1767
        dataPartitionTable.values().stream()
×
1768
            .flatMap(
×
1769
                tSeriesPartitionSlotMapMap ->
1770
                    tSeriesPartitionSlotMapMap.values().stream()
×
1771
                        .flatMap(
×
1772
                            tTimePartitionSlotListMap ->
1773
                                tTimePartitionSlotListMap.values().stream()
×
1774
                                    .flatMap(Collection::stream)))
×
1775
            .collect(Collectors.toSet());
×
1776
    Map<TConsensusGroupId, TRegionReplicaSet> filteredRegionReplicaSets = new HashMap<>();
×
1777
    for (TRegionReplicaSet regionReplicaSet : allRegionReplicaSets) {
×
1778
      if (groupIdSet.contains(regionReplicaSet.getRegionId())) {
×
1779
        filteredRegionReplicaSets.put(regionReplicaSet.getRegionId(), regionReplicaSet);
×
1780
      }
1781
    }
×
1782
    return filteredRegionReplicaSets;
×
1783
  }
1784

1785
  public TSStatus transfer(List<TDataNodeLocation> newUnknownDataList) {
1786
    Map<Integer, TDataNodeLocation> runningDataNodeLocationMap = new HashMap<>();
×
1787
    nodeManager
×
1788
        .filterDataNodeThroughStatus(NodeStatus.Running)
×
1789
        .forEach(
×
1790
            dataNodeConfiguration ->
1791
                runningDataNodeLocationMap.put(
×
1792
                    dataNodeConfiguration.getLocation().getDataNodeId(),
×
1793
                    dataNodeConfiguration.getLocation()));
×
1794
    if (runningDataNodeLocationMap.isEmpty()) {
×
1795
      // No running DataNode, will not transfer and print log
1796
      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
1797
    }
1798

1799
    newUnknownDataList.forEach(
×
1800
        dataNodeLocation -> runningDataNodeLocationMap.remove(dataNodeLocation.getDataNodeId()));
×
1801

1802
    LOGGER.info("Start transfer of {}", newUnknownDataList);
×
1803
    // Transfer trigger
1804
    TSStatus transferResult =
×
1805
        triggerManager.transferTrigger(newUnknownDataList, runningDataNodeLocationMap);
×
1806
    if (transferResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1807
      LOGGER.warn("Fail to transfer because {}, will retry", transferResult.getMessage());
×
1808
    }
1809

1810
    return transferResult;
×
1811
  }
1812

1813
  @Override
1814
  public TSStatus createModel(TCreateModelReq req) {
1815
    TSStatus status = confirmLeader();
×
1816
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1817
        ? modelManager.createModel(req)
×
1818
        : status;
×
1819
  }
1820

1821
  @Override
1822
  public TSStatus dropModel(TDropModelReq req) {
1823
    TSStatus status = confirmLeader();
×
1824
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1825
        ? modelManager.dropModel(req)
×
1826
        : status;
×
1827
  }
1828

1829
  @Override
1830
  public TShowModelResp showModel(TShowModelReq req) {
1831
    TSStatus status = confirmLeader();
×
1832
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1833
        ? modelManager.showModel(req)
×
1834
        : new TShowModelResp(status, Collections.emptyList());
×
1835
  }
1836

1837
  @Override
1838
  public TShowTrialResp showTrial(TShowTrialReq req) {
1839
    TSStatus status = confirmLeader();
×
1840
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1841
        ? modelManager.showTrial(req)
×
1842
        : new TShowTrialResp(status, Collections.emptyList());
×
1843
  }
1844

1845
  @Override
1846
  public TSStatus updateModelInfo(TUpdateModelInfoReq req) {
1847
    TSStatus status = confirmLeader();
×
1848
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1849
        ? modelManager.updateModelInfo(req)
×
1850
        : status;
×
1851
  }
1852

1853
  @Override
1854
  public TSStatus updateModelState(TUpdateModelStateReq req) {
1855
    TSStatus status = confirmLeader();
×
1856
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1857
        ? modelManager.updateModelState(req)
×
1858
        : status;
×
1859
  }
1860

1861
  @Override
1862
  public TGetModelInfoResp getModelInfo(TGetModelInfoReq req) {
1863
    TSStatus status = confirmLeader();
×
1864
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1865
        ? modelManager.getModelInfo(req)
×
1866
        : new TGetModelInfoResp(status);
×
1867
  }
1868

1869
  @Override
1870
  public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) {
1871
    TSStatus status = confirmLeader();
×
1872
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1873
        ? clusterQuotaManager.setSpaceQuota(req)
×
1874
        : status;
×
1875
  }
1876

1877
  public TSpaceQuotaResp showSpaceQuota(List<String> databases) {
1878
    TSStatus status = confirmLeader();
×
1879
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1880
        ? clusterQuotaManager.showSpaceQuota(databases)
×
1881
        : new TSpaceQuotaResp(status);
×
1882
  }
1883

1884
  public TSpaceQuotaResp getSpaceQuota() {
1885
    TSStatus status = confirmLeader();
×
1886
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1887
        ? clusterQuotaManager.getSpaceQuota()
×
1888
        : new TSpaceQuotaResp(status);
×
1889
  }
1890

1891
  public TSStatus setThrottleQuota(TSetThrottleQuotaReq req) {
1892
    TSStatus status = confirmLeader();
×
1893
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1894
        ? clusterQuotaManager.setThrottleQuota(req)
×
1895
        : status;
×
1896
  }
1897

1898
  public TThrottleQuotaResp showThrottleQuota(TShowThrottleReq req) {
1899
    TSStatus status = confirmLeader();
×
1900
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1901
        ? clusterQuotaManager.showThrottleQuota(req)
×
1902
        : new TThrottleQuotaResp(status);
×
1903
  }
1904

1905
  public TThrottleQuotaResp getThrottleQuota() {
1906
    TSStatus status = confirmLeader();
×
1907
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1908
        ? clusterQuotaManager.getThrottleQuota()
×
1909
        : new TThrottleQuotaResp(status);
×
1910
  }
1911
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc