• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9965

30 Aug 2023 11:08AM UTC coverage: 47.773% (+0.01%) from 47.759%
#9965

push

travis_ci

web-flow
[IOTDB-6129] ConfigNode restarts without relying on Seed-ConfigNode  (#10988)

9 of 9 new or added lines in 2 files covered. (100.0%)

80390 of 168274 relevant lines covered (47.77%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.34
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
24
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
25
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
26
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
27
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
28
import org.apache.iotdb.common.rpc.thrift.TSStatus;
29
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
30
import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
31
import org.apache.iotdb.common.rpc.thrift.TSetThrottleQuotaReq;
32
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
33
import org.apache.iotdb.commons.cluster.NodeStatus;
34
import org.apache.iotdb.commons.cluster.NodeType;
35
import org.apache.iotdb.commons.conf.CommonConfig;
36
import org.apache.iotdb.commons.conf.CommonDescriptor;
37
import org.apache.iotdb.commons.conf.IoTDBConstant;
38
import org.apache.iotdb.commons.exception.IllegalPathException;
39
import org.apache.iotdb.commons.path.PartialPath;
40
import org.apache.iotdb.commons.path.PathPatternTree;
41
import org.apache.iotdb.commons.service.metric.MetricService;
42
import org.apache.iotdb.commons.utils.AuthUtils;
43
import org.apache.iotdb.commons.utils.PathUtils;
44
import org.apache.iotdb.commons.utils.StatusUtils;
45
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
46
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
47
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
48
import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
49
import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
50
import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
51
import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
52
import org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan;
53
import org.apache.iotdb.confignode.consensus.request.read.partition.GetNodePathsPartitionPlan;
54
import org.apache.iotdb.confignode.consensus.request.read.partition.GetOrCreateDataPartitionPlan;
55
import org.apache.iotdb.confignode.consensus.request.read.partition.GetOrCreateSchemaPartitionPlan;
56
import org.apache.iotdb.confignode.consensus.request.read.partition.GetSchemaPartitionPlan;
57
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
58
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
59
import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
60
import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan;
61
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
62
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
63
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
64
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
65
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
66
import org.apache.iotdb.confignode.consensus.response.auth.PermissionInfoResp;
67
import org.apache.iotdb.confignode.consensus.response.database.CountDatabaseResp;
68
import org.apache.iotdb.confignode.consensus.response.database.DatabaseSchemaResp;
69
import org.apache.iotdb.confignode.consensus.response.datanode.ConfigurationResp;
70
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
71
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
72
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
73
import org.apache.iotdb.confignode.consensus.response.partition.DataPartitionResp;
74
import org.apache.iotdb.confignode.consensus.response.partition.RegionInfoListResp;
75
import org.apache.iotdb.confignode.consensus.response.partition.SchemaNodeManagementResp;
76
import org.apache.iotdb.confignode.consensus.response.partition.SchemaPartitionResp;
77
import org.apache.iotdb.confignode.consensus.response.template.TemplateSetInfoResp;
78
import org.apache.iotdb.confignode.consensus.statemachine.ConfigRegionStateMachine;
79
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
80
import org.apache.iotdb.confignode.manager.cq.CQManager;
81
import org.apache.iotdb.confignode.manager.load.LoadManager;
82
import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
83
import org.apache.iotdb.confignode.manager.node.ClusterNodeStartUtils;
84
import org.apache.iotdb.confignode.manager.node.NodeManager;
85
import org.apache.iotdb.confignode.manager.node.NodeMetrics;
86
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
87
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
88
import org.apache.iotdb.confignode.manager.pipe.PipeManager;
89
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
90
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaQuotaStatistics;
91
import org.apache.iotdb.confignode.persistence.AuthorInfo;
92
import org.apache.iotdb.confignode.persistence.ModelInfo;
93
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
94
import org.apache.iotdb.confignode.persistence.TriggerInfo;
95
import org.apache.iotdb.confignode.persistence.UDFInfo;
96
import org.apache.iotdb.confignode.persistence.cq.CQInfo;
97
import org.apache.iotdb.confignode.persistence.executor.ConfigPlanExecutor;
98
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
99
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
100
import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
101
import org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
102
import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
103
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
104
import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
105
import org.apache.iotdb.confignode.rpc.thrift.TClusterParameters;
106
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
107
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
108
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
109
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListResp;
110
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
111
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
112
import org.apache.iotdb.confignode.rpc.thrift.TCreateModelReq;
113
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
114
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
115
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
116
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
117
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
118
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
119
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
120
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
121
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
122
import org.apache.iotdb.confignode.rpc.thrift.TDeactivateSchemaTemplateReq;
123
import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
124
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
125
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
126
import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
127
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
128
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
129
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
130
import org.apache.iotdb.confignode.rpc.thrift.TGetDataNodeLocationsResp;
131
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
132
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
133
import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
134
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
135
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
136
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
137
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
138
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
139
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
140
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
141
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
142
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
143
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
144
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
145
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
146
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
147
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
148
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
149
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
150
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
151
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
152
import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
153
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
154
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
155
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
156
import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
157
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
158
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
159
import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
160
import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
161
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
162
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
163
import org.apache.iotdb.confignode.rpc.thrift.TShowThrottleReq;
164
import org.apache.iotdb.confignode.rpc.thrift.TShowTrailReq;
165
import org.apache.iotdb.confignode.rpc.thrift.TShowTrailResp;
166
import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
167
import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
168
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
169
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
170
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
171
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
172
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelStateReq;
173
import org.apache.iotdb.consensus.common.DataSet;
174
import org.apache.iotdb.consensus.exception.ConsensusException;
175
import org.apache.iotdb.db.schemaengine.template.Template;
176
import org.apache.iotdb.db.schemaengine.template.TemplateAlterOperationType;
177
import org.apache.iotdb.db.schemaengine.template.alter.TemplateAlterOperationUtil;
178
import org.apache.iotdb.rpc.RpcUtils;
179
import org.apache.iotdb.rpc.TSStatusCode;
180
import org.apache.iotdb.tsfile.utils.Pair;
181

182
import org.slf4j.Logger;
183
import org.slf4j.LoggerFactory;
184

185
import java.io.IOException;
186
import java.nio.ByteBuffer;
187
import java.util.ArrayList;
188
import java.util.Arrays;
189
import java.util.Collection;
190
import java.util.Collections;
191
import java.util.Comparator;
192
import java.util.HashMap;
193
import java.util.HashSet;
194
import java.util.List;
195
import java.util.Map;
196
import java.util.Set;
197
import java.util.concurrent.atomic.AtomicReference;
198
import java.util.stream.Collectors;
199

200
import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
201

202
/** Entry of all management, AssignPartitionManager, AssignRegionManager. */
203
public class ConfigManager implements IManager {
204

205
  private static final Logger LOGGER = LoggerFactory.getLogger(ConfigManager.class);
1✔
206

207
  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
1✔
208
  private static final CommonConfig COMMON_CONF = CommonDescriptor.getInstance().getConfig();
1✔
209

210
  /** Manage PartitionTable read/write requests through the ConsensusLayer. */
211
  private final AtomicReference<ConsensusManager> consensusManager = new AtomicReference<>();
×
212

213
  /** Manage cluster node. */
214
  private final NodeManager nodeManager;
215

216
  /** Manage the cluster schema engine. */
217
  private final ClusterSchemaManager clusterSchemaManager;
218

219
  /** Manage cluster regions and partitions. */
220
  private final PartitionManager partitionManager;
221

222
  /** Manage cluster authorization. */
223
  private final PermissionManager permissionManager;
224

225
  private final LoadManager loadManager;
226

227
  /** Manage procedure. */
228
  private final ProcedureManager procedureManager;
229

230
  /** UDF. */
231
  private final UDFManager udfManager;
232

233
  /** Manage Trigger. */
234
  private final TriggerManager triggerManager;
235

236
  /** CQ. */
237
  private final CQManager cqManager;
238

239
  /** ML Model. */
240
  private final ModelManager modelManager;
241

242
  /** Manage Pipe. */
243
  private final PipeManager pipeManager;
244

245
  /** Manage quotas */
246
  private final ClusterQuotaManager clusterQuotaManager;
247

248
  private final ConfigRegionStateMachine stateMachine;
249

250
  private final RetryFailedTasksThread retryFailedTasksThread;
251

252
  private static final String DATABASE = "\tDatabase=";
253

254
  /**
   * Builds the persistence objects, the ConfigRegion state machine and every manager held by this
   * ConfigManager.
   *
   * <p>The ConsensusManager is not created here; it is set later by {@link
   * #initConsensusManager()}.
   *
   * @throws IOException if building one of the modules fails with an I/O error
   */
  public ConfigManager() throws IOException {
    // Persistence module: one info object per kind of metadata
    NodeInfo nodeInfo = new NodeInfo();
    ClusterSchemaInfo clusterSchemaInfo = new ClusterSchemaInfo();
    PartitionInfo partitionInfo = new PartitionInfo();
    AuthorInfo authorInfo = new AuthorInfo();
    ProcedureInfo procedureInfo = new ProcedureInfo();
    UDFInfo udfInfo = new UDFInfo();
    TriggerInfo triggerInfo = new TriggerInfo();
    CQInfo cqInfo = new CQInfo();
    ModelInfo modelInfo = new ModelInfo();
    PipeInfo pipeInfo = new PipeInfo();
    QuotaInfo quotaInfo = new QuotaInfo();

    // State machine, backed by an executor that receives every persistence object
    this.stateMachine =
        new ConfigRegionStateMachine(
            this,
            new ConfigPlanExecutor(
                nodeInfo,
                clusterSchemaInfo,
                partitionInfo,
                authorInfo,
                procedureInfo,
                udfInfo,
                triggerInfo,
                cqInfo,
                modelInfo,
                pipeInfo,
                quotaInfo));

    // Manager module
    this.nodeManager = new NodeManager(this, nodeInfo);
    this.clusterSchemaManager =
        new ClusterSchemaManager(this, clusterSchemaInfo, new ClusterSchemaQuotaStatistics());
    this.partitionManager = new PartitionManager(this, partitionInfo);
    this.permissionManager = new PermissionManager(this, authorInfo);
    this.procedureManager = new ProcedureManager(this, procedureInfo);
    this.udfManager = new UDFManager(this, udfInfo);
    this.triggerManager = new TriggerManager(this, triggerInfo);
    this.cqManager = new CQManager(this);
    this.modelManager = new ModelManager(this, modelInfo);
    this.pipeManager = new PipeManager(this, pipeInfo);

    // Ordering constraints:
    // 1. PipeManager must exist before LoadManager, because LoadManager registers PipeManager
    //    as a listener.
    // 2. RetryFailedTasksThread must be created after LoadManager, because it keeps a reference
    //    to LoadManager.
    this.loadManager = new LoadManager(this);

    this.retryFailedTasksThread = new RetryFailedTasksThread(this);
    this.clusterQuotaManager = new ClusterQuotaManager(this, quotaInfo);
  }
×
306

307
  public void initConsensusManager() throws IOException {
308
    this.consensusManager.set(new ConsensusManager(this, this.stateMachine));
×
309
  }
×
310

311
  public void close() throws IOException {
312
    if (consensusManager.get() != null) {
×
313
      consensusManager.get().close();
×
314
    }
315
    if (partitionManager != null) {
×
316
      partitionManager.getRegionMaintainer().shutdown();
×
317
    }
318
    if (procedureManager != null) {
×
319
      procedureManager.shiftExecutor(false);
×
320
    }
321
  }
×
322

323
  @Override
324
  public DataSet getSystemConfiguration() {
325
    TSStatus status = confirmLeader();
×
326
    ConfigurationResp dataSet;
327
    // Notice: The Seed-ConfigNode must also have the privilege to give system configuration.
328
    // Otherwise, the IoTDB-cluster will not have the ability to restart from scratch.
329
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
330
        || ConfigNodeDescriptor.getInstance().isSeedConfigNode()
×
331
        || SystemPropertiesUtils.isSeedConfigNode()) {
×
332
      dataSet = (ConfigurationResp) nodeManager.getSystemConfiguration();
×
333
    } else {
334
      dataSet = new ConfigurationResp();
×
335
      dataSet.setStatus(status);
×
336
    }
337
    return dataSet;
×
338
  }
339

340
  @Override
341
  public DataSet registerDataNode(TDataNodeRegisterReq req) {
342
    TSStatus status = confirmLeader();
×
343
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
344
      status =
×
345
          ClusterNodeStartUtils.confirmNodeRegistration(
×
346
              NodeType.DataNode,
347
              req.getClusterName(),
×
348
              req.getDataNodeConfiguration().getLocation(),
×
349
              this);
350
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
351
        return nodeManager.registerDataNode(req);
×
352
      }
353
    }
354
    DataNodeRegisterResp resp = new DataNodeRegisterResp();
×
355
    resp.setStatus(status);
×
356
    resp.setConfigNodeList(getNodeManager().getRegisteredConfigNodes());
×
357
    return resp;
×
358
  }
359

360
  @Override
361
  public TDataNodeRestartResp restartDataNode(TDataNodeRestartReq req) {
362
    TSStatus status = confirmLeader();
×
363
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
364
      status =
×
365
          ClusterNodeStartUtils.confirmNodeRestart(
×
366
              NodeType.DataNode,
367
              req.getClusterName(),
×
368
              req.getDataNodeConfiguration().getLocation().getDataNodeId(),
×
369
              req.getDataNodeConfiguration().getLocation(),
×
370
              this);
371
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
372
        return nodeManager.updateDataNodeIfNecessary(req);
×
373
      }
374
    }
375
    return new TDataNodeRestartResp()
×
376
        .setStatus(status)
×
377
        .setConfigNodeList(getNodeManager().getRegisteredConfigNodes());
×
378
  }
379

380
  @Override
381
  public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
382
    TSStatus status = confirmLeader();
×
383
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
384
      return nodeManager.removeDataNode(removeDataNodePlan);
×
385
    } else {
386
      DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
×
387
      dataSet.setStatus(status);
×
388
      return dataSet;
×
389
    }
390
  }
391

392
  @Override
393
  public TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation) {
394
    TSStatus status = confirmLeader();
×
395
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
396
      // Force updating the target DataNode's status to Unknown
397
      getLoadManager()
×
398
          .forceUpdateNodeCache(
×
399
              NodeType.DataNode,
400
              dataNodeLocation.getDataNodeId(),
×
401
              NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
×
402
      LOGGER.info(
×
403
          "[ShutdownHook] The DataNode-{} will be shutdown soon, mark it as Unknown",
404
          dataNodeLocation.getDataNodeId());
×
405
    }
406
    return status;
×
407
  }
408

409
  @Override
410
  public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req) {
411
    TSStatus status = confirmLeader();
×
412
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
413
      procedureManager.reportRegionMigrateResult(req);
×
414
    }
415
    return status;
×
416
  }
417

418
  @Override
419
  public DataSet getDataNodeConfiguration(
420
      GetDataNodeConfigurationPlan getDataNodeConfigurationPlan) {
421
    TSStatus status = confirmLeader();
×
422
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
423
      return nodeManager.getDataNodeConfiguration(getDataNodeConfigurationPlan);
×
424
    } else {
425
      DataNodeConfigurationResp dataSet = new DataNodeConfigurationResp();
×
426
      dataSet.setStatus(status);
×
427
      return dataSet;
×
428
    }
429
  }
430

431
  @Override
432
  public TShowClusterResp showCluster() {
433
    TSStatus status = confirmLeader();
×
434
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
435
      List<TConfigNodeLocation> configNodeLocations = getNodeManager().getRegisteredConfigNodes();
×
436
      configNodeLocations.sort(Comparator.comparingInt(TConfigNodeLocation::getConfigNodeId));
×
437
      List<TDataNodeLocation> dataNodeInfoLocations =
×
438
          getNodeManager().getRegisteredDataNodes().stream()
×
439
              .map(TDataNodeConfiguration::getLocation)
×
440
              .sorted(Comparator.comparingInt(TDataNodeLocation::getDataNodeId))
×
441
              .collect(Collectors.toList());
×
442
      Map<Integer, String> nodeStatus = getLoadManager().getNodeStatusWithReason();
×
443
      Map<Integer, TNodeVersionInfo> nodeVersionInfo = getNodeManager().getNodeVersionInfo();
×
444
      return new TShowClusterResp(
×
445
          status, configNodeLocations, dataNodeInfoLocations, nodeStatus, nodeVersionInfo);
446
    } else {
447
      return new TShowClusterResp(
×
448
          status, new ArrayList<>(), new ArrayList<>(), new HashMap<>(), new HashMap<>());
449
    }
450
  }
451

452
  @Override
453
  public TShowVariablesResp showVariables() {
454
    TSStatus status = confirmLeader();
×
455
    TShowVariablesResp resp = new TShowVariablesResp();
×
456
    resp.setStatus(status);
×
457
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
458
      resp.setClusterParameters(getClusterParameters());
×
459
    }
460
    return resp;
×
461
  }
462

463
  public TClusterParameters getClusterParameters() {
464
    TClusterParameters clusterParameters = new TClusterParameters();
×
465
    clusterParameters.setClusterName(CONF.getClusterName());
×
466
    clusterParameters.setConfigNodeConsensusProtocolClass(
×
467
        CONF.getConfigNodeConsensusProtocolClass());
×
468
    clusterParameters.setDataRegionConsensusProtocolClass(
×
469
        CONF.getDataRegionConsensusProtocolClass());
×
470
    clusterParameters.setSchemaRegionConsensusProtocolClass(
×
471
        CONF.getSchemaRegionConsensusProtocolClass());
×
472
    clusterParameters.setSeriesPartitionSlotNum(CONF.getSeriesSlotNum());
×
473
    clusterParameters.setSeriesPartitionExecutorClass(CONF.getSeriesPartitionExecutorClass());
×
474
    clusterParameters.setDefaultTTL(COMMON_CONF.getDefaultTTLInMs());
×
475
    clusterParameters.setTimePartitionInterval(COMMON_CONF.getTimePartitionInterval());
×
476
    clusterParameters.setDataReplicationFactor(CONF.getDataReplicationFactor());
×
477
    clusterParameters.setSchemaReplicationFactor(CONF.getSchemaReplicationFactor());
×
478
    clusterParameters.setDataRegionPerDataNode(CONF.getDataRegionPerDataNode());
×
479
    clusterParameters.setSchemaRegionPerDataNode(CONF.getSchemaRegionPerDataNode());
×
480
    clusterParameters.setDiskSpaceWarningThreshold(COMMON_CONF.getDiskSpaceWarningThreshold());
×
481
    clusterParameters.setReadConsistencyLevel(CONF.getReadConsistencyLevel());
×
482
    clusterParameters.setTimestampPrecision(COMMON_CONF.getTimestampPrecision());
×
483
    clusterParameters.setSchemaEngineMode(COMMON_CONF.getSchemaEngineMode());
×
484
    clusterParameters.setTagAttributeTotalSize(COMMON_CONF.getTagAttributeTotalSize());
×
485
    clusterParameters.setDatabaseLimitThreshold(COMMON_CONF.getDatabaseLimitThreshold());
×
486
    return clusterParameters;
×
487
  }
488

489
  @Override
490
  public TSStatus setTTL(SetTTLPlan setTTLPlan) {
491
    TSStatus status = confirmLeader();
×
492
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
493
      return clusterSchemaManager.setTTL(setTTLPlan);
×
494
    } else {
495
      return status;
×
496
    }
497
  }
498

499
  @Override
500
  public TSStatus setSchemaReplicationFactor(
501
      SetSchemaReplicationFactorPlan setSchemaReplicationFactorPlan) {
502
    TSStatus status = confirmLeader();
×
503
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
504
      return clusterSchemaManager.setSchemaReplicationFactor(setSchemaReplicationFactorPlan);
×
505
    } else {
506
      return status;
×
507
    }
508
  }
509

510
  @Override
511
  public TSStatus setDataReplicationFactor(
512
      SetDataReplicationFactorPlan setDataReplicationFactorPlan) {
513
    TSStatus status = confirmLeader();
×
514
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
515
      return clusterSchemaManager.setDataReplicationFactor(setDataReplicationFactorPlan);
×
516
    } else {
517
      return status;
×
518
    }
519
  }
520

521
  @Override
522
  public TSStatus setTimePartitionInterval(
523
      SetTimePartitionIntervalPlan setTimePartitionIntervalPlan) {
524
    TSStatus status = confirmLeader();
×
525
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
526
      return clusterSchemaManager.setTimePartitionInterval(setTimePartitionIntervalPlan);
×
527
    } else {
528
      return status;
×
529
    }
530
  }
531

532
  @Override
533
  public DataSet countMatchedDatabases(CountDatabasePlan countDatabasePlan) {
534
    TSStatus status = confirmLeader();
×
535
    CountDatabaseResp result = new CountDatabaseResp();
×
536
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
537
      return clusterSchemaManager.countMatchedDatabases(countDatabasePlan);
×
538
    } else {
539
      result.setStatus(status);
×
540
    }
541
    return result;
×
542
  }
543

544
  @Override
545
  public DataSet getMatchedDatabaseSchemas(GetDatabasePlan getDatabaseReq) {
546
    TSStatus status = confirmLeader();
×
547
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
548
      return clusterSchemaManager.getMatchedDatabaseSchema(getDatabaseReq);
×
549
    } else {
550
      DatabaseSchemaResp dataSet = new DatabaseSchemaResp();
×
551
      dataSet.setStatus(status);
×
552
      return dataSet;
×
553
    }
554
  }
555

556
  @Override
557
  public synchronized TSStatus setDatabase(DatabaseSchemaPlan databaseSchemaPlan) {
558
    TSStatus status = confirmLeader();
×
559
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
560
      return clusterSchemaManager.setDatabase(databaseSchemaPlan);
×
561
    } else {
562
      return status;
×
563
    }
564
  }
565

566
  @Override
567
  public TSStatus alterDatabase(DatabaseSchemaPlan databaseSchemaPlan) {
568
    TSStatus status = confirmLeader();
×
569
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
570
      return clusterSchemaManager.alterDatabase(databaseSchemaPlan);
×
571
    } else {
572
      return status;
×
573
    }
574
  }
575

576
  @Override
577
  public synchronized TSStatus deleteDatabases(List<String> deletedPaths) {
578
    TSStatus status = confirmLeader();
×
579
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
580
      // remove wild
581
      Map<String, TDatabaseSchema> deleteDatabaseSchemaMap =
×
582
          getClusterSchemaManager().getMatchedDatabaseSchemasByName(deletedPaths);
×
583
      if (deleteDatabaseSchemaMap.isEmpty()) {
×
584
        return RpcUtils.getStatus(
×
585
            TSStatusCode.PATH_NOT_EXIST.getStatusCode(),
×
586
            String.format("Path %s does not exist", Arrays.toString(deletedPaths.toArray())));
×
587
      }
588
      ArrayList<TDatabaseSchema> parsedDeleteDatabases =
×
589
          new ArrayList<>(deleteDatabaseSchemaMap.values());
×
590
      return procedureManager.deleteDatabases(parsedDeleteDatabases);
×
591
    } else {
592
      return status;
×
593
    }
594
  }
595

596
  private List<TSeriesPartitionSlot> calculateRelatedSlot(PartialPath path, PartialPath database) {
597
    // The path contains `**`
598
    if (path.getFullPath().contains(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)) {
×
599
      return new ArrayList<>();
×
600
    }
601
    List<PartialPath> innerPathList = path.alterPrefixPath(database);
×
602
    if (innerPathList.size() == 0) {
×
603
      return new ArrayList<>();
×
604
    }
605
    PartialPath innerPath = innerPathList.get(0);
×
606
    // The innerPath contains `*` and the only `*` is not in last level
607
    if (innerPath.getDevice().contains(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)) {
×
608
      return new ArrayList<>();
×
609
    }
610
    return Collections.singletonList(
×
611
        getPartitionManager().getSeriesPartitionSlot(innerPath.getDevice()));
×
612
  }
613

614
  @Override
615
  public TSchemaPartitionTableResp getSchemaPartition(PathPatternTree patternTree) {
616
    // Construct empty response
617
    TSchemaPartitionTableResp resp = new TSchemaPartitionTableResp();
×
618

619
    TSStatus status = confirmLeader();
×
620
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
621
      return resp.setStatus(status);
×
622
    }
623

624
    // Build GetSchemaPartitionPlan
625
    Map<String, Set<TSeriesPartitionSlot>> partitionSlotsMap = new HashMap<>();
×
626
    List<PartialPath> relatedPaths = patternTree.getAllPathPatterns();
×
627
    List<String> allDatabases = getClusterSchemaManager().getDatabaseNames();
×
628
    List<PartialPath> allDatabasePaths = new ArrayList<>();
×
629
    for (String database : allDatabases) {
×
630
      try {
631
        allDatabasePaths.add(new PartialPath(database));
×
632
      } catch (IllegalPathException e) {
×
633
        throw new RuntimeException(e);
×
634
      }
×
635
    }
×
636
    Map<String, Boolean> scanAllRegions = new HashMap<>();
×
637
    for (PartialPath path : relatedPaths) {
×
638
      for (int i = 0; i < allDatabases.size(); i++) {
×
639
        String database = allDatabases.get(i);
×
640
        PartialPath databasePath = allDatabasePaths.get(i);
×
641
        if (path.overlapWithFullPathPrefix(databasePath) && !scanAllRegions.containsKey(database)) {
×
642
          List<TSeriesPartitionSlot> relatedSlot = calculateRelatedSlot(path, databasePath);
×
643
          if (relatedSlot.isEmpty()) {
×
644
            scanAllRegions.put(database, true);
×
645
            partitionSlotsMap.put(database, new HashSet<>());
×
646
          } else {
647
            partitionSlotsMap.computeIfAbsent(database, k -> new HashSet<>()).addAll(relatedSlot);
×
648
          }
649
        }
650
      }
651
    }
×
652

653
    // Return empty resp if the partitionSlotsMap is empty
654
    if (partitionSlotsMap.isEmpty()) {
×
655
      return resp.setStatus(StatusUtils.OK).setSchemaPartitionTable(new HashMap<>());
×
656
    }
657

658
    GetSchemaPartitionPlan getSchemaPartitionPlan =
×
659
        new GetSchemaPartitionPlan(
660
            partitionSlotsMap.entrySet().stream()
×
661
                .collect(Collectors.toMap(Map.Entry::getKey, e -> new ArrayList<>(e.getValue()))));
×
662
    SchemaPartitionResp queryResult = partitionManager.getSchemaPartition(getSchemaPartitionPlan);
×
663
    resp = queryResult.convertToRpcSchemaPartitionTableResp();
×
664

665
    LOGGER.debug("GetSchemaPartition receive paths: {}, return: {}", relatedPaths, resp);
×
666

667
    return resp;
×
668
  }
669

670
  @Override
671
  public TSchemaPartitionTableResp getOrCreateSchemaPartition(PathPatternTree patternTree) {
672
    // Construct empty response
673
    TSchemaPartitionTableResp resp = new TSchemaPartitionTableResp();
×
674

675
    TSStatus status = confirmLeader();
×
676
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
677
      return resp.setStatus(status);
×
678
    }
679

680
    List<String> devicePaths = patternTree.getAllDevicePatterns();
×
681
    List<String> databases = getClusterSchemaManager().getDatabaseNames();
×
682

683
    // Build GetOrCreateSchemaPartitionPlan
684
    Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap = new HashMap<>();
×
685
    for (String devicePath : devicePaths) {
×
686
      if (!devicePath.contains("*")) {
×
687
        // Only check devicePaths that without "*"
688
        for (String database : databases) {
×
689
          if (PathUtils.isStartWith(devicePath, database)) {
×
690
            partitionSlotsMap
×
691
                .computeIfAbsent(database, key -> new ArrayList<>())
×
692
                .add(getPartitionManager().getSeriesPartitionSlot(devicePath));
×
693
            break;
×
694
          }
695
        }
×
696
      }
697
    }
×
698
    GetOrCreateSchemaPartitionPlan getOrCreateSchemaPartitionPlan =
×
699
        new GetOrCreateSchemaPartitionPlan(partitionSlotsMap);
700

701
    SchemaPartitionResp queryResult =
×
702
        partitionManager.getOrCreateSchemaPartition(getOrCreateSchemaPartitionPlan);
×
703
    resp = queryResult.convertToRpcSchemaPartitionTableResp();
×
704

705
    if (CONF.isEnablePrintingNewlyCreatedPartition()) {
×
706
      printNewCreatedSchemaPartition(devicePaths, resp);
×
707
    }
708

709
    return resp;
×
710
  }
711

712
  private void printNewCreatedSchemaPartition(
713
      List<String> devicePaths, TSchemaPartitionTableResp resp) {
714
    final String lineSeparator = System.lineSeparator();
×
715
    StringBuilder devicePathString = new StringBuilder("{");
×
716
    for (String devicePath : devicePaths) {
×
717
      devicePathString.append(lineSeparator).append("\t").append(devicePath).append(",");
×
718
    }
×
719
    devicePathString.append(lineSeparator).append("}");
×
720

721
    StringBuilder schemaPartitionRespString = new StringBuilder("{");
×
722
    schemaPartitionRespString
×
723
        .append(lineSeparator)
×
724
        .append("\tTSStatus=")
×
725
        .append(resp.getStatus().getCode())
×
726
        .append(",");
×
727
    Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable =
×
728
        resp.getSchemaPartitionTable();
×
729
    for (Map.Entry<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> databaseEntry :
730
        schemaPartitionTable.entrySet()) {
×
731
      String database = databaseEntry.getKey();
×
732
      schemaPartitionRespString
×
733
          .append(lineSeparator)
×
734
          .append(DATABASE)
×
735
          .append(database)
×
736
          .append(": {");
×
737
      for (Map.Entry<TSeriesPartitionSlot, TConsensusGroupId> slotEntry :
738
          databaseEntry.getValue().entrySet()) {
×
739
        schemaPartitionRespString
×
740
            .append(lineSeparator)
×
741
            .append("\t\t")
×
742
            .append(slotEntry.getKey())
×
743
            .append(", ")
×
744
            .append(slotEntry.getValue())
×
745
            .append(",");
×
746
      }
×
747
      schemaPartitionRespString.append(lineSeparator).append("\t},");
×
748
    }
×
749
    schemaPartitionRespString.append(lineSeparator).append("}");
×
750
    LOGGER.info(
×
751
        "[GetOrCreateSchemaPartition]:{} Receive PathPatternTree: {}, Return TSchemaPartitionTableResp: {}",
752
        lineSeparator,
753
        devicePathString,
754
        schemaPartitionRespString);
755
  }
×
756

757
  @Override
758
  public TSchemaNodeManagementResp getNodePathsPartition(PartialPath partialPath, Integer level) {
759
    TSStatus status = confirmLeader();
×
760
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
761
      GetNodePathsPartitionPlan getNodePathsPartitionPlan = new GetNodePathsPartitionPlan();
×
762
      getNodePathsPartitionPlan.setPartialPath(partialPath);
×
763
      if (null != level) {
×
764
        getNodePathsPartitionPlan.setLevel(level);
×
765
      }
766
      SchemaNodeManagementResp resp =
×
767
          partitionManager.getNodePathsPartition(getNodePathsPartitionPlan);
×
768
      TSchemaNodeManagementResp result =
×
769
          resp.convertToRpcSchemaNodeManagementPartitionResp(
×
770
              getLoadManager().getRegionPriorityMap());
×
771

772
      LOGGER.info(
×
773
          "getNodePathsPartition receive devicePaths: {}, level: {}, return TSchemaNodeManagementResp: {}",
774
          partialPath,
775
          level,
776
          result);
777

778
      return result;
×
779
    } else {
780
      return new TSchemaNodeManagementResp().setStatus(status);
×
781
    }
782
  }
783

784
  @Override
785
  public TDataPartitionTableResp getDataPartition(GetDataPartitionPlan getDataPartitionPlan) {
786
    // Construct empty response
787
    TDataPartitionTableResp resp = new TDataPartitionTableResp();
×
788

789
    TSStatus status = confirmLeader();
×
790
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
791
      return resp.setStatus(status);
×
792
    }
793
    DataPartitionResp queryResult = partitionManager.getDataPartition(getDataPartitionPlan);
×
794

795
    resp = queryResult.convertToTDataPartitionTableResp();
×
796

797
    LOGGER.debug(
×
798
        "GetDataPartition interface receive PartitionSlotsMap: {}, return: {}",
799
        getDataPartitionPlan.getPartitionSlotsMap(),
×
800
        resp);
801

802
    return resp;
×
803
  }
804

805
  @Override
806
  public TDataPartitionTableResp getOrCreateDataPartition(
807
      GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan) {
808
    // Construct empty response
809
    TDataPartitionTableResp resp = new TDataPartitionTableResp();
×
810

811
    TSStatus status = confirmLeader();
×
812
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
813
      return resp.setStatus(status);
×
814
    }
815

816
    DataPartitionResp queryResult =
×
817
        partitionManager.getOrCreateDataPartition(getOrCreateDataPartitionPlan);
×
818
    resp = queryResult.convertToTDataPartitionTableResp();
×
819

820
    if (CONF.isEnablePrintingNewlyCreatedPartition()) {
×
821
      printNewCreatedDataPartition(getOrCreateDataPartitionPlan, resp);
×
822
    }
823

824
    return resp;
×
825
  }
826

827
  private void printNewCreatedDataPartition(
828
      GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan, TDataPartitionTableResp resp) {
829
    final String lineSeparator = System.lineSeparator();
×
830
    StringBuilder partitionSlotsMapString = new StringBuilder("{");
×
831
    for (Map.Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>> databaseEntry :
832
        getOrCreateDataPartitionPlan.getPartitionSlotsMap().entrySet()) {
×
833
      String database = databaseEntry.getKey();
×
834
      partitionSlotsMapString.append(lineSeparator).append(DATABASE).append(database).append(": {");
×
835
      for (Map.Entry<TSeriesPartitionSlot, TTimeSlotList> slotEntry :
836
          databaseEntry.getValue().entrySet()) {
×
837
        partitionSlotsMapString
×
838
            .append(lineSeparator)
×
839
            .append("\t\t")
×
840
            .append(slotEntry.getKey())
×
841
            .append(",")
×
842
            .append(slotEntry.getValue());
×
843
      }
×
844
      partitionSlotsMapString.append(lineSeparator).append("\t},");
×
845
    }
×
846
    partitionSlotsMapString.append(lineSeparator).append("}");
×
847

848
    StringBuilder dataPartitionRespString = new StringBuilder("{");
×
849
    dataPartitionRespString
×
850
        .append(lineSeparator)
×
851
        .append("\tTSStatus=")
×
852
        .append(resp.getStatus().getCode())
×
853
        .append(",");
×
854
    Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
855
        dataPartitionTable = resp.getDataPartitionTable();
×
856
    for (Map.Entry<
857
            String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
858
        databaseEntry : dataPartitionTable.entrySet()) {
×
859
      String database = databaseEntry.getKey();
×
860
      dataPartitionRespString.append(lineSeparator).append(DATABASE).append(database).append(": {");
×
861
      for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>
862
          seriesSlotEntry : databaseEntry.getValue().entrySet()) {
×
863
        dataPartitionRespString
×
864
            .append(lineSeparator)
×
865
            .append("\t\t")
×
866
            .append(seriesSlotEntry.getKey())
×
867
            .append(": {");
×
868
        for (Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> timeSlotEntry :
869
            seriesSlotEntry.getValue().entrySet()) {
×
870
          dataPartitionRespString
×
871
              .append(lineSeparator)
×
872
              .append("\t\t\t")
×
873
              .append(timeSlotEntry.getKey())
×
874
              .append(", ")
×
875
              .append(timeSlotEntry.getValue())
×
876
              .append(",");
×
877
        }
×
878
        dataPartitionRespString.append(lineSeparator).append("\t\t},");
×
879
      }
×
880
      dataPartitionRespString.append(lineSeparator).append("\t}");
×
881
    }
×
882
    dataPartitionRespString.append(lineSeparator).append("}");
×
883

884
    LOGGER.info(
×
885
        "[GetOrCreateDataPartition]:{} Receive PartitionSlotsMap: {}, Return TDataPartitionTableResp: {}",
886
        lineSeparator,
887
        partitionSlotsMapString,
888
        dataPartitionRespString);
889
  }
×
890

891
  private TSStatus confirmLeader() {
892
    // Make sure the consensus layer has been initialized
893
    if (getConsensusManager() == null) {
×
894
      return new TSStatus(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode())
×
895
          .setMessage(
×
896
              "ConsensusManager of target-ConfigNode is not initialized, "
897
                  + "please make sure the target-ConfigNode has been started successfully.");
898
    }
899
    return getConsensusManager().confirmLeader();
×
900
  }
901

902
  @Override
903
  public NodeManager getNodeManager() {
904
    return nodeManager;
×
905
  }
906

907
  @Override
908
  public ClusterSchemaManager getClusterSchemaManager() {
909
    return clusterSchemaManager;
×
910
  }
911

912
  @Override
913
  public ConsensusManager getConsensusManager() {
914
    return consensusManager.get();
×
915
  }
916

917
  @Override
918
  public PartitionManager getPartitionManager() {
919
    return partitionManager;
×
920
  }
921

922
  @Override
923
  public LoadManager getLoadManager() {
924
    return loadManager;
×
925
  }
926

927
  @Override
928
  public TriggerManager getTriggerManager() {
929
    return triggerManager;
×
930
  }
931

932
  @Override
933
  public ModelManager getModelManager() {
934
    return modelManager;
×
935
  }
936

937
  @Override
938
  public PipeManager getPipeManager() {
939
    return pipeManager;
×
940
  }
941

942
  @Override
943
  public TSStatus operatePermission(AuthorPlan authorPlan) {
944
    TSStatus status = confirmLeader();
×
945
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
946
      return permissionManager.operatePermission(authorPlan);
×
947
    } else {
948
      return status;
×
949
    }
950
  }
951

952
  @Override
953
  public DataSet queryPermission(AuthorPlan authorPlan) {
954
    TSStatus status = confirmLeader();
×
955
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
956
      return permissionManager.queryPermission(authorPlan);
×
957
    } else {
958
      PermissionInfoResp dataSet = new PermissionInfoResp();
×
959
      dataSet.setStatus(status);
×
960
      return dataSet;
×
961
    }
962
  }
963

964
  @Override
965
  public TPermissionInfoResp login(String username, String password) {
966
    TSStatus status = confirmLeader();
×
967
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
968
      return permissionManager.login(username, password);
×
969
    } else {
970
      TPermissionInfoResp resp = AuthUtils.generateEmptyPermissionInfoResp();
×
971
      resp.setStatus(status);
×
972
      return resp;
×
973
    }
974
  }
975

976
  @Override
977
  public TPermissionInfoResp checkUserPrivileges(
978
      String username, List<PartialPath> paths, int permission) {
979
    TSStatus status = confirmLeader();
×
980
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
981
      return permissionManager.checkUserPrivileges(username, paths, permission);
×
982
    } else {
983
      TPermissionInfoResp resp = AuthUtils.generateEmptyPermissionInfoResp();
×
984
      resp.setStatus(status);
×
985
      return resp;
×
986
    }
987
  }
988

989
  @Override
990
  public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) {
991
    final int ERROR_STATUS_NODE_ID = -1;
×
992

993
    TSStatus status = confirmLeader();
×
994
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
995
      // Make sure the global configurations are consist
996
      status = checkConfigNodeGlobalConfig(req);
×
997
      if (status == null) {
×
998
        status =
×
999
            ClusterNodeStartUtils.confirmNodeRegistration(
×
1000
                NodeType.ConfigNode,
1001
                req.getClusterParameters().getClusterName(),
×
1002
                req.getConfigNodeLocation(),
×
1003
                this);
1004
        if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1005
          return nodeManager.registerConfigNode(req);
×
1006
        }
1007
      }
1008
    }
1009

1010
    return new TConfigNodeRegisterResp().setStatus(status).setConfigNodeId(ERROR_STATUS_NODE_ID);
×
1011
  }
1012

1013
  public TSStatus checkConfigNodeGlobalConfig(TConfigNodeRegisterReq req) {
1014
    final String errorPrefix = "Reject register, please ensure that the parameter ";
×
1015
    final String errorSuffix = " is consistent with the Seed-ConfigNode.";
×
1016
    TSStatus errorStatus = new TSStatus(TSStatusCode.CONFIGURATION_ERROR.getStatusCode());
×
1017
    TClusterParameters clusterParameters = req.getClusterParameters();
×
1018

1019
    if (!clusterParameters
×
1020
        .getConfigNodeConsensusProtocolClass()
×
1021
        .equals(CONF.getConfigNodeConsensusProtocolClass())) {
×
1022
      return errorStatus.setMessage(
×
1023
          errorPrefix + "config_node_consensus_protocol_class" + errorSuffix);
1024
    }
1025
    if (!clusterParameters
×
1026
        .getDataRegionConsensusProtocolClass()
×
1027
        .equals(CONF.getDataRegionConsensusProtocolClass())) {
×
1028
      return errorStatus.setMessage(
×
1029
          errorPrefix + "data_region_consensus_protocol_class" + errorSuffix);
1030
    }
1031
    if (!clusterParameters
×
1032
        .getSchemaRegionConsensusProtocolClass()
×
1033
        .equals(CONF.getSchemaRegionConsensusProtocolClass())) {
×
1034
      return errorStatus.setMessage(
×
1035
          errorPrefix + "schema_region_consensus_protocol_class" + errorSuffix);
1036
    }
1037

1038
    if (clusterParameters.getSeriesPartitionSlotNum() != CONF.getSeriesSlotNum()) {
×
1039
      return errorStatus.setMessage(errorPrefix + "series_partition_slot_num" + errorSuffix);
×
1040
    }
1041
    if (!clusterParameters
×
1042
        .getSeriesPartitionExecutorClass()
×
1043
        .equals(CONF.getSeriesPartitionExecutorClass())) {
×
1044
      return errorStatus.setMessage(errorPrefix + "series_partition_executor_class" + errorSuffix);
×
1045
    }
1046

1047
    if (clusterParameters.getDefaultTTL()
×
1048
        != CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs()) {
×
1049
      return errorStatus.setMessage(errorPrefix + "default_ttl" + errorSuffix);
×
1050
    }
1051
    if (clusterParameters.getTimePartitionInterval() != COMMON_CONF.getTimePartitionInterval()) {
×
1052
      return errorStatus.setMessage(errorPrefix + "time_partition_interval" + errorSuffix);
×
1053
    }
1054

1055
    if (clusterParameters.getSchemaReplicationFactor() != CONF.getSchemaReplicationFactor()) {
×
1056
      return errorStatus.setMessage(errorPrefix + "schema_replication_factor" + errorSuffix);
×
1057
    }
1058
    if (clusterParameters.getDataReplicationFactor() != CONF.getDataReplicationFactor()) {
×
1059
      return errorStatus.setMessage(errorPrefix + "data_replication_factor" + errorSuffix);
×
1060
    }
1061

1062
    if (clusterParameters.getSchemaRegionPerDataNode() != CONF.getSchemaRegionPerDataNode()) {
×
1063
      return errorStatus.setMessage(errorPrefix + "schema_region_per_data_node" + errorSuffix);
×
1064
    }
1065
    if (clusterParameters.getDataRegionPerDataNode() != CONF.getDataRegionPerDataNode()) {
×
1066
      return errorStatus.setMessage(errorPrefix + "data_region_per_data_node" + errorSuffix);
×
1067
    }
1068

1069
    if (!clusterParameters.getReadConsistencyLevel().equals(CONF.getReadConsistencyLevel())) {
×
1070
      return errorStatus.setMessage(errorPrefix + "read_consistency_level" + errorSuffix);
×
1071
    }
1072

1073
    if (clusterParameters.getDiskSpaceWarningThreshold()
×
1074
        != COMMON_CONF.getDiskSpaceWarningThreshold()) {
×
1075
      return errorStatus.setMessage(errorPrefix + "disk_space_warning_threshold" + errorSuffix);
×
1076
    }
1077

1078
    if (!clusterParameters.getTimestampPrecision().equals(COMMON_CONF.getTimestampPrecision())) {
×
1079
      return errorStatus.setMessage(errorPrefix + "timestamp_precision" + errorSuffix);
×
1080
    }
1081

1082
    if (!clusterParameters.getSchemaEngineMode().equals(COMMON_CONF.getSchemaEngineMode())) {
×
1083
      return errorStatus.setMessage(errorPrefix + "schema_engine_mode" + errorSuffix);
×
1084
    }
1085

1086
    if (clusterParameters.getTagAttributeTotalSize() != COMMON_CONF.getTagAttributeTotalSize()) {
×
1087
      return errorStatus.setMessage(errorPrefix + "tag_attribute_total_size" + errorSuffix);
×
1088
    }
1089

1090
    if (clusterParameters.getDatabaseLimitThreshold() != COMMON_CONF.getDatabaseLimitThreshold()) {
×
1091
      return errorStatus.setMessage(errorPrefix + "database_limit_threshold" + errorSuffix);
×
1092
    }
1093

1094
    return null;
×
1095
  }
1096

1097
  @Override
1098
  public TSStatus createPeerForConsensusGroup(List<TConfigNodeLocation> configNodeLocations) {
1099
    for (int i = 0; i < 30; i++) {
×
1100
      try {
1101
        if (consensusManager.get() == null) {
×
1102
          Thread.sleep(1000);
×
1103
        } else {
1104
          // When add non Seed-ConfigNode to the ConfigNodeGroup, the parameter should be emptyList
1105
          consensusManager.get().createPeerForConsensusGroup(Collections.emptyList());
×
1106
          return StatusUtils.OK;
×
1107
        }
1108
      } catch (InterruptedException e) {
×
1109
        Thread.currentThread().interrupt();
×
1110
        LOGGER.warn("Unexpected interruption during retry creating peer for consensus group");
×
1111
      } catch (ConsensusException e) {
×
1112
        LOGGER.error("Failed to create peer for consensus group", e);
×
1113
        break;
×
1114
      }
×
1115
    }
1116
    return StatusUtils.INTERNAL_ERROR;
×
1117
  }
1118

1119
  @Override
1120
  public TSStatus removeConfigNode(RemoveConfigNodePlan removeConfigNodePlan) {
1121
    TSStatus status = confirmLeader();
×
1122

1123
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1124
      status = nodeManager.checkConfigNodeBeforeRemove(removeConfigNodePlan);
×
1125
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1126
        procedureManager.removeConfigNode(removeConfigNodePlan);
×
1127
      }
1128
    }
1129

1130
    return status;
×
1131
  }
1132

1133
  @Override
1134
  public TSStatus reportConfigNodeShutdown(TConfigNodeLocation configNodeLocation) {
1135
    TSStatus status = confirmLeader();
×
1136
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1137
      // Force updating the target ConfigNode's status to Unknown
1138
      getLoadManager()
×
1139
          .forceUpdateNodeCache(
×
1140
              NodeType.ConfigNode,
1141
              configNodeLocation.getConfigNodeId(),
×
1142
              NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
×
1143
      LOGGER.info(
×
1144
          "[ShutdownHook] The ConfigNode-{} will be shutdown soon, mark it as Unknown",
1145
          configNodeLocation.getConfigNodeId());
×
1146
    }
1147
    return status;
×
1148
  }
1149

1150
  @Override
1151
  public TSStatus createFunction(TCreateFunctionReq req) {
1152
    TSStatus status = confirmLeader();
×
1153
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1154
        ? udfManager.createFunction(req)
×
1155
        : status;
×
1156
  }
1157

1158
  @Override
1159
  public TSStatus dropFunction(String udfName) {
1160
    TSStatus status = confirmLeader();
×
1161
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1162
        ? udfManager.dropFunction(udfName)
×
1163
        : status;
×
1164
  }
1165

1166
  @Override
1167
  public TGetUDFTableResp getUDFTable() {
1168
    TSStatus status = confirmLeader();
×
1169
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1170
        ? udfManager.getUDFTable()
×
1171
        : new TGetUDFTableResp(status, Collections.emptyList());
×
1172
  }
1173

1174
  @Override
1175
  public TGetJarInListResp getUDFJar(TGetJarInListReq req) {
1176
    TSStatus status = confirmLeader();
×
1177
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1178
        ? udfManager.getUDFJar(req)
×
1179
        : new TGetJarInListResp(status, Collections.emptyList());
×
1180
  }
1181

1182
  @Override
1183
  public TSStatus createTrigger(TCreateTriggerReq req) {
1184
    TSStatus status = confirmLeader();
×
1185
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1186
        ? triggerManager.createTrigger(req)
×
1187
        : status;
×
1188
  }
1189

1190
  @Override
1191
  public TSStatus dropTrigger(TDropTriggerReq req) {
1192
    TSStatus status = confirmLeader();
×
1193
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1194
        ? triggerManager.dropTrigger(req)
×
1195
        : status;
×
1196
  }
1197

1198
  @Override
1199
  public TGetTriggerTableResp getTriggerTable() {
1200
    TSStatus status = confirmLeader();
×
1201
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1202
        ? triggerManager.getTriggerTable(false)
×
1203
        : new TGetTriggerTableResp(status, Collections.emptyList());
×
1204
  }
1205

1206
  @Override
1207
  public TGetTriggerTableResp getStatefulTriggerTable() {
1208
    TSStatus status = confirmLeader();
×
1209
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1210
        ? triggerManager.getTriggerTable(true)
×
1211
        : new TGetTriggerTableResp(status, Collections.emptyList());
×
1212
  }
1213

1214
  @Override
1215
  public TGetLocationForTriggerResp getLocationOfStatefulTrigger(String triggerName) {
1216
    TSStatus status = confirmLeader();
×
1217
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1218
        ? triggerManager.getLocationOfStatefulTrigger(triggerName)
×
1219
        : new TGetLocationForTriggerResp(status);
×
1220
  }
1221

1222
  @Override
1223
  public TGetJarInListResp getTriggerJar(TGetJarInListReq req) {
1224
    TSStatus status = confirmLeader();
×
1225
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1226
        ? triggerManager.getTriggerJar(req)
×
1227
        : new TGetJarInListResp(status, Collections.emptyList());
×
1228
  }
1229

1230
  @Override
1231
  public TSStatus createPipePlugin(TCreatePipePluginReq req) {
1232
    TSStatus status = confirmLeader();
×
1233
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1234
        ? pipeManager.getPipePluginCoordinator().createPipePlugin(req)
×
1235
        : status;
×
1236
  }
1237

1238
  @Override
1239
  public TSStatus dropPipePlugin(String pipePluginName) {
1240
    TSStatus status = confirmLeader();
×
1241
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1242
        ? pipeManager.getPipePluginCoordinator().dropPipePlugin(pipePluginName)
×
1243
        : status;
×
1244
  }
1245

1246
  @Override
1247
  public TGetPipePluginTableResp getPipePluginTable() {
1248
    TSStatus status = confirmLeader();
×
1249
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1250
        ? pipeManager.getPipePluginCoordinator().getPipePluginTable()
×
1251
        : new TGetPipePluginTableResp(status, Collections.emptyList());
×
1252
  }
1253

1254
  @Override
1255
  public TGetJarInListResp getPipePluginJar(TGetJarInListReq req) {
1256
    TSStatus status = confirmLeader();
×
1257
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1258
        ? pipeManager.getPipePluginCoordinator().getPipePluginJar(req)
×
1259
        : new TGetJarInListResp(status, Collections.emptyList());
×
1260
  }
1261

1262
  @Override
1263
  public TSStatus merge() {
1264
    TSStatus status = confirmLeader();
×
1265
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1266
        ? RpcUtils.squashResponseStatusList(nodeManager.merge())
×
1267
        : status;
×
1268
  }
1269

1270
  @Override
1271
  public TSStatus flush(TFlushReq req) {
1272
    TSStatus status = confirmLeader();
×
1273
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1274
        ? RpcUtils.squashResponseStatusList(nodeManager.flush(req))
×
1275
        : status;
×
1276
  }
1277

1278
  @Override
1279
  public TSStatus clearCache() {
1280
    TSStatus status = confirmLeader();
×
1281
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1282
        ? RpcUtils.squashResponseStatusList(nodeManager.clearCache())
×
1283
        : status;
×
1284
  }
1285

1286
  @Override
1287
  public TSStatus loadConfiguration() {
1288
    TSStatus status = confirmLeader();
×
1289
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1290
        ? RpcUtils.squashResponseStatusList(nodeManager.loadConfiguration())
×
1291
        : status;
×
1292
  }
1293

1294
  @Override
1295
  public TSStatus setSystemStatus(String systemStatus) {
1296
    TSStatus status = confirmLeader();
×
1297
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1298
        ? RpcUtils.squashResponseStatusList(nodeManager.setSystemStatus(systemStatus))
×
1299
        : status;
×
1300
  }
1301

1302
  @Override
1303
  public TSStatus setDataNodeStatus(TSetDataNodeStatusReq req) {
1304
    TSStatus status = confirmLeader();
×
1305
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1306
        ? nodeManager.setDataNodeStatus(req)
×
1307
        : status;
×
1308
  }
1309

1310
  @Override
1311
  public TSStatus killQuery(String queryId, int dataNodeId) {
1312
    TSStatus status = confirmLeader();
×
1313
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1314
        ? nodeManager.killQuery(queryId, dataNodeId)
×
1315
        : status;
×
1316
  }
1317

1318
  @Override
1319
  public TGetDataNodeLocationsResp getRunningDataNodeLocations() {
1320
    TSStatus status = confirmLeader();
×
1321
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1322
        ? new TGetDataNodeLocationsResp(
×
1323
            new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
×
1324
            nodeManager.filterDataNodeThroughStatus(NodeStatus.Running).stream()
×
1325
                .map(TDataNodeConfiguration::getLocation)
×
1326
                .collect(Collectors.toList()))
×
1327
        : new TGetDataNodeLocationsResp(status, Collections.emptyList());
×
1328
  }
1329

1330
  @Override
1331
  public TRegionRouteMapResp getLatestRegionRouteMap() {
1332
    TSStatus status = confirmLeader();
×
1333
    TRegionRouteMapResp resp = new TRegionRouteMapResp(status);
×
1334

1335
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1336
      resp.setTimestamp(System.currentTimeMillis());
×
1337
      resp.setRegionRouteMap(getLoadManager().getRegionPriorityMap());
×
1338
    }
1339

1340
    return resp;
×
1341
  }
1342

1343
  @Override
1344
  public UDFManager getUDFManager() {
1345
    return udfManager;
×
1346
  }
1347

1348
  @Override
1349
  public RegionInfoListResp showRegion(GetRegionInfoListPlan getRegionInfoListPlan) {
1350
    TSStatus status = confirmLeader();
×
1351
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1352
      return partitionManager.getRegionInfoList(getRegionInfoListPlan);
×
1353
    } else {
1354
      RegionInfoListResp regionResp = new RegionInfoListResp();
×
1355
      regionResp.setStatus(status);
×
1356
      return regionResp;
×
1357
    }
1358
  }
1359

1360
  @Override
1361
  public TShowDataNodesResp showDataNodes() {
1362
    TSStatus status = confirmLeader();
×
1363
    TShowDataNodesResp resp = new TShowDataNodesResp();
×
1364
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1365
      return resp.setDataNodesInfoList(nodeManager.getRegisteredDataNodeInfoList())
×
1366
          .setStatus(StatusUtils.OK);
×
1367
    } else {
1368
      return resp.setStatus(status);
×
1369
    }
1370
  }
1371

1372
  @Override
1373
  public TShowConfigNodesResp showConfigNodes() {
1374
    TSStatus status = confirmLeader();
×
1375
    TShowConfigNodesResp resp = new TShowConfigNodesResp();
×
1376
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1377
      return resp.setConfigNodesInfoList(nodeManager.getRegisteredConfigNodeInfoList())
×
1378
          .setStatus(StatusUtils.OK);
×
1379
    } else {
1380
      return resp.setStatus(status);
×
1381
    }
1382
  }
1383

1384
  @Override
1385
  public TShowDatabaseResp showDatabase(GetDatabasePlan getDatabasePlan) {
1386
    TSStatus status = confirmLeader();
×
1387
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1388
      return getClusterSchemaManager().showDatabase(getDatabasePlan);
×
1389
    } else {
1390
      return new TShowDatabaseResp().setStatus(status);
×
1391
    }
1392
  }
1393

1394
  @Override
1395
  public ProcedureManager getProcedureManager() {
1396
    return procedureManager;
×
1397
  }
1398

1399
  @Override
1400
  public CQManager getCQManager() {
1401
    return cqManager;
×
1402
  }
1403

1404
  @Override
1405
  public ClusterQuotaManager getClusterQuotaManager() {
1406
    return clusterQuotaManager;
×
1407
  }
1408

1409
  @Override
1410
  public RetryFailedTasksThread getRetryFailedTasksThread() {
1411
    return retryFailedTasksThread;
×
1412
  }
1413

1414
  @Override
1415
  public void addMetrics() {
1416
    MetricService.getInstance().addMetricSet(new NodeMetrics(getNodeManager()));
×
1417
    MetricService.getInstance().addMetricSet(new PartitionMetrics(this));
×
1418
  }
×
1419

1420
  @Override
1421
  public TSStatus createSchemaTemplate(TCreateSchemaTemplateReq req) {
1422
    TSStatus status = confirmLeader();
×
1423
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1424
      CreateSchemaTemplatePlan createSchemaTemplatePlan =
×
1425
          new CreateSchemaTemplatePlan(req.getSerializedTemplate());
×
1426
      return clusterSchemaManager.createTemplate(createSchemaTemplatePlan);
×
1427
    } else {
1428
      return status;
×
1429
    }
1430
  }
1431

1432
  @Override
1433
  public TGetAllTemplatesResp getAllTemplates() {
1434
    TSStatus status = confirmLeader();
×
1435
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1436
      return clusterSchemaManager.getAllTemplates();
×
1437
    } else {
1438
      return new TGetAllTemplatesResp().setStatus(status);
×
1439
    }
1440
  }
1441

1442
  @Override
1443
  public TGetTemplateResp getTemplate(String req) {
1444
    TSStatus status = confirmLeader();
×
1445
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1446
      return clusterSchemaManager.getTemplate(req);
×
1447
    } else {
1448
      return new TGetTemplateResp().setStatus(status);
×
1449
    }
1450
  }
1451

1452
  @Override
1453
  public synchronized TSStatus setSchemaTemplate(TSetSchemaTemplateReq req) {
1454
    TSStatus status = confirmLeader();
×
1455
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1456
      return procedureManager.setSchemaTemplate(req.getQueryId(), req.getName(), req.getPath());
×
1457
    } else {
1458
      return status;
×
1459
    }
1460
  }
1461

1462
  @Override
1463
  public TGetPathsSetTemplatesResp getPathsSetTemplate(String req) {
1464
    TSStatus status = confirmLeader();
×
1465
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1466
      return clusterSchemaManager.getPathsSetTemplate(req);
×
1467
    } else {
1468
      return new TGetPathsSetTemplatesResp(status);
×
1469
    }
1470
  }
1471

1472
  @Override
1473
  public TSStatus deactivateSchemaTemplate(TDeactivateSchemaTemplateReq req) {
1474
    TSStatus status = confirmLeader();
×
1475
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1476
      return status;
×
1477
    }
1478

1479
    PathPatternTree patternTree =
×
1480
        PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
×
1481

1482
    List<PartialPath> patternList = patternTree.getAllPathPatterns();
×
1483
    TemplateSetInfoResp templateSetInfoResp = clusterSchemaManager.getTemplateSetInfo(patternList);
×
1484
    if (templateSetInfoResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1485
      return templateSetInfoResp.getStatus();
×
1486
    }
1487

1488
    Map<PartialPath, List<Template>> templateSetInfo = templateSetInfoResp.getPatternTemplateMap();
×
1489
    if (templateSetInfo.isEmpty()) {
×
1490
      return RpcUtils.getStatus(
×
1491
          TSStatusCode.TEMPLATE_NOT_SET,
1492
          String.format(
×
1493
              "Schema Template %s is not set on any prefix path of %s",
1494
              req.getTemplateName(), patternList));
×
1495
    }
1496

1497
    if (!req.getTemplateName().equals(ONE_LEVEL_PATH_WILDCARD)) {
×
1498
      Map<PartialPath, List<Template>> filteredTemplateSetInfo = new HashMap<>();
×
1499
      for (Map.Entry<PartialPath, List<Template>> entry : templateSetInfo.entrySet()) {
×
1500
        for (Template template : entry.getValue()) {
×
1501
          if (template.getName().equals(req.getTemplateName())) {
×
1502
            filteredTemplateSetInfo.put(entry.getKey(), Collections.singletonList(template));
×
1503
            break;
×
1504
          }
1505
        }
×
1506
      }
×
1507

1508
      if (filteredTemplateSetInfo.isEmpty()) {
×
1509
        return RpcUtils.getStatus(
×
1510
            TSStatusCode.TEMPLATE_NOT_SET,
1511
            String.format(
×
1512
                "Schema Template %s is not set on any prefix path of %s",
1513
                req.getTemplateName(), patternList));
×
1514
      }
1515

1516
      templateSetInfo = filteredTemplateSetInfo;
×
1517
    }
1518

1519
    return procedureManager.deactivateTemplate(req.getQueryId(), templateSetInfo);
×
1520
  }
1521

1522
  @Override
1523
  public synchronized TSStatus unsetSchemaTemplate(TUnsetSchemaTemplateReq req) {
1524
    TSStatus status = confirmLeader();
×
1525
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1526
      return status;
×
1527
    }
1528
    Pair<TSStatus, Template> checkResult =
×
1529
        clusterSchemaManager.checkIsTemplateSetOnPath(req.getTemplateName(), req.getPath());
×
1530
    if (checkResult.left.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1531
      try {
1532
        return procedureManager.unsetSchemaTemplate(
×
1533
            req.getQueryId(), checkResult.right, new PartialPath(req.getPath()));
×
1534
      } catch (IllegalPathException e) {
×
1535
        return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
×
1536
      }
1537
    } else {
1538
      return checkResult.left;
×
1539
    }
1540
  }
1541

1542
  @Override
1543
  public TSStatus dropSchemaTemplate(String templateName) {
1544
    TSStatus status = confirmLeader();
×
1545
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1546
      return clusterSchemaManager.dropSchemaTemplate(templateName);
×
1547
    } else {
1548
      return status;
×
1549
    }
1550
  }
1551

1552
  @Override
1553
  public TSStatus alterSchemaTemplate(TAlterSchemaTemplateReq req) {
1554
    TSStatus status = confirmLeader();
×
1555
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1556
      ByteBuffer buffer = ByteBuffer.wrap(req.getTemplateAlterInfo());
×
1557
      TemplateAlterOperationType operationType =
×
1558
          TemplateAlterOperationUtil.parseOperationType(buffer);
×
1559
      if (operationType.equals(TemplateAlterOperationType.EXTEND_TEMPLATE)) {
×
1560
        return clusterSchemaManager.extendSchemaTemplate(
×
1561
            TemplateAlterOperationUtil.parseTemplateExtendInfo(buffer));
×
1562
      }
1563
      return RpcUtils.getStatus(TSStatusCode.UNSUPPORTED_OPERATION);
×
1564
    } else {
1565
      return status;
×
1566
    }
1567
  }
1568

1569
  @Override
1570
  public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) {
1571
    TSStatus status = confirmLeader();
×
1572
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1573
      return procedureManager.deleteTimeSeries(req);
×
1574
    } else {
1575
      return status;
×
1576
    }
1577
  }
1578

1579
  @Override
1580
  public TSStatus deleteLogicalView(TDeleteLogicalViewReq req) {
1581
    TSStatus status = confirmLeader();
×
1582
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1583
      return procedureManager.deleteLogicalView(req);
×
1584
    } else {
1585
      return status;
×
1586
    }
1587
  }
1588

1589
  @Override
1590
  public TSStatus alterLogicalView(TAlterLogicalViewReq req) {
1591
    TSStatus status = confirmLeader();
×
1592
    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1593
      return procedureManager.alterLogicalView(req);
×
1594
    } else {
1595
      return status;
×
1596
    }
1597
  }
1598

1599
  @Override
1600
  public TSStatus createPipe(TCreatePipeReq req) {
1601
    TSStatus status = confirmLeader();
×
1602
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1603
        ? pipeManager.getPipeTaskCoordinator().createPipe(req)
×
1604
        : status;
×
1605
  }
1606

1607
  @Override
1608
  public TSStatus startPipe(String pipeName) {
1609
    TSStatus status = confirmLeader();
×
1610
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1611
        ? pipeManager.getPipeTaskCoordinator().startPipe(pipeName)
×
1612
        : status;
×
1613
  }
1614

1615
  @Override
1616
  public TSStatus stopPipe(String pipeName) {
1617
    TSStatus status = confirmLeader();
×
1618
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1619
        ? pipeManager.getPipeTaskCoordinator().stopPipe(pipeName)
×
1620
        : status;
×
1621
  }
1622

1623
  @Override
1624
  public TSStatus dropPipe(String pipeName) {
1625
    TSStatus status = confirmLeader();
×
1626
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1627
        ? pipeManager.getPipeTaskCoordinator().dropPipe(pipeName)
×
1628
        : status;
×
1629
  }
1630

1631
  @Override
1632
  public TShowPipeResp showPipe(TShowPipeReq req) {
1633
    TSStatus status = confirmLeader();
×
1634
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1635
        ? pipeManager.getPipeTaskCoordinator().showPipes(req)
×
1636
        : new TShowPipeResp().setStatus(status);
×
1637
  }
1638

1639
  @Override
1640
  public TGetAllPipeInfoResp getAllPipeInfo() {
1641
    TSStatus status = confirmLeader();
×
1642
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1643
        ? pipeManager.getPipeTaskCoordinator().getAllPipeInfo()
×
1644
        : new TGetAllPipeInfoResp(status, Collections.emptyList());
×
1645
  }
1646

1647
  @Override
1648
  public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
1649
    TSStatus status = confirmLeader();
×
1650
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1651
        ? partitionManager.getRegionId(req).convertToRpcGetRegionIdResp()
×
1652
        : new TGetRegionIdResp(status);
×
1653
  }
1654

1655
  @Override
1656
  public TGetTimeSlotListResp getTimeSlotList(TGetTimeSlotListReq req) {
1657
    TSStatus status = confirmLeader();
×
1658
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1659
        ? partitionManager.getTimeSlotList(req).convertToRpcGetTimeSlotListResp()
×
1660
        : new TGetTimeSlotListResp(status);
×
1661
  }
1662

1663
  @Override
1664
  public TCountTimeSlotListResp countTimeSlotList(TCountTimeSlotListReq req) {
1665
    TSStatus status = confirmLeader();
×
1666
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1667
        ? partitionManager.countTimeSlotList(req).convertToRpcCountTimeSlotListResp()
×
1668
        : new TCountTimeSlotListResp(status);
×
1669
  }
1670

1671
  @Override
1672
  public TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req) {
1673
    TSStatus status = confirmLeader();
×
1674
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1675
        ? partitionManager.getSeriesSlotList(req).convertToRpcGetSeriesSlotListResp()
×
1676
        : new TGetSeriesSlotListResp(status);
×
1677
  }
1678

1679
  @Override
1680
  public TSStatus migrateRegion(TMigrateRegionReq req) {
1681
    TSStatus status = confirmLeader();
×
1682
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1683
        ? procedureManager.migrateRegion(req)
×
1684
        : status;
×
1685
  }
1686

1687
  @Override
1688
  public TSStatus createCQ(TCreateCQReq req) {
1689
    TSStatus status = confirmLeader();
×
1690
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1691
        ? cqManager.createCQ(req)
×
1692
        : status;
×
1693
  }
1694

1695
  @Override
1696
  public TSStatus dropCQ(TDropCQReq req) {
1697
    TSStatus status = confirmLeader();
×
1698
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1699
        ? cqManager.dropCQ(req)
×
1700
        : status;
×
1701
  }
1702

1703
  @Override
1704
  public TShowCQResp showCQ() {
1705
    TSStatus status = confirmLeader();
×
1706
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1707
        ? cqManager.showCQ()
×
1708
        : new TShowCQResp(status, Collections.emptyList());
×
1709
  }
1710

1711
  /**
1712
   * Get all related schemaRegion which may contains the timeseries matched by given patternTree.
1713
   */
1714
  public Map<TConsensusGroupId, TRegionReplicaSet> getRelatedSchemaRegionGroup(
1715
      PathPatternTree patternTree) {
1716
    Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable =
×
1717
        getSchemaPartition(patternTree).getSchemaPartitionTable();
×
1718

1719
    List<TRegionReplicaSet> allRegionReplicaSets = getPartitionManager().getAllReplicaSets();
×
1720
    Set<TConsensusGroupId> groupIdSet =
×
1721
        schemaPartitionTable.values().stream()
×
1722
            .flatMap(m -> m.values().stream())
×
1723
            .collect(Collectors.toSet());
×
1724
    Map<TConsensusGroupId, TRegionReplicaSet> filteredRegionReplicaSets = new HashMap<>();
×
1725
    for (TRegionReplicaSet regionReplicaSet : allRegionReplicaSets) {
×
1726
      if (groupIdSet.contains(regionReplicaSet.getRegionId())) {
×
1727
        filteredRegionReplicaSets.put(regionReplicaSet.getRegionId(), regionReplicaSet);
×
1728
      }
1729
    }
×
1730
    return filteredRegionReplicaSets;
×
1731
  }
1732

1733
  /**
1734
   * Get all related dataRegion which may contains the data of specific timeseries matched by given
1735
   * patternTree
1736
   */
1737
  public Map<TConsensusGroupId, TRegionReplicaSet> getRelatedDataRegionGroup(
1738
      PathPatternTree patternTree) {
1739
    // Get all databases and slots by getting schemaengine partition
1740
    Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable =
×
1741
        getSchemaPartition(patternTree).getSchemaPartitionTable();
×
1742

1743
    // Construct request for getting data partition
1744
    Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = new HashMap<>();
×
1745
    schemaPartitionTable.forEach(
×
1746
        (key, value) -> {
1747
          Map<TSeriesPartitionSlot, TTimeSlotList> slotListMap = new HashMap<>();
×
1748
          value
×
1749
              .keySet()
×
1750
              .forEach(
×
1751
                  slot ->
1752
                      slotListMap.put(
×
1753
                          slot, new TTimeSlotList(Collections.emptyList(), true, true)));
×
1754
          partitionSlotsMap.put(key, slotListMap);
×
1755
        });
×
1756

1757
    // Get all data partitions
1758
    GetDataPartitionPlan getDataPartitionPlan = new GetDataPartitionPlan(partitionSlotsMap);
×
1759
    Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
1760
        dataPartitionTable = getDataPartition(getDataPartitionPlan).getDataPartitionTable();
×
1761

1762
    // Get all region replicaset of target data partitions
1763
    List<TRegionReplicaSet> allRegionReplicaSets = getPartitionManager().getAllReplicaSets();
×
1764
    Set<TConsensusGroupId> groupIdSet =
×
1765
        dataPartitionTable.values().stream()
×
1766
            .flatMap(
×
1767
                tSeriesPartitionSlotMapMap ->
1768
                    tSeriesPartitionSlotMapMap.values().stream()
×
1769
                        .flatMap(
×
1770
                            tTimePartitionSlotListMap ->
1771
                                tTimePartitionSlotListMap.values().stream()
×
1772
                                    .flatMap(Collection::stream)))
×
1773
            .collect(Collectors.toSet());
×
1774
    Map<TConsensusGroupId, TRegionReplicaSet> filteredRegionReplicaSets = new HashMap<>();
×
1775
    for (TRegionReplicaSet regionReplicaSet : allRegionReplicaSets) {
×
1776
      if (groupIdSet.contains(regionReplicaSet.getRegionId())) {
×
1777
        filteredRegionReplicaSets.put(regionReplicaSet.getRegionId(), regionReplicaSet);
×
1778
      }
1779
    }
×
1780
    return filteredRegionReplicaSets;
×
1781
  }
1782

1783
  public TSStatus transfer(List<TDataNodeLocation> newUnknownDataList) {
1784
    Map<Integer, TDataNodeLocation> runningDataNodeLocationMap = new HashMap<>();
×
1785
    nodeManager
×
1786
        .filterDataNodeThroughStatus(NodeStatus.Running)
×
1787
        .forEach(
×
1788
            dataNodeConfiguration ->
1789
                runningDataNodeLocationMap.put(
×
1790
                    dataNodeConfiguration.getLocation().getDataNodeId(),
×
1791
                    dataNodeConfiguration.getLocation()));
×
1792
    if (runningDataNodeLocationMap.isEmpty()) {
×
1793
      // No running DataNode, will not transfer and print log
1794
      return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
×
1795
    }
1796

1797
    newUnknownDataList.forEach(
×
1798
        dataNodeLocation -> runningDataNodeLocationMap.remove(dataNodeLocation.getDataNodeId()));
×
1799

1800
    LOGGER.info("Start transfer of {}", newUnknownDataList);
×
1801
    // Transfer trigger
1802
    TSStatus transferResult =
×
1803
        triggerManager.transferTrigger(newUnknownDataList, runningDataNodeLocationMap);
×
1804
    if (transferResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
1805
      LOGGER.warn("Fail to transfer because {}, will retry", transferResult.getMessage());
×
1806
    }
1807

1808
    return transferResult;
×
1809
  }
1810

1811
  @Override
1812
  public TSStatus createModel(TCreateModelReq req) {
1813
    TSStatus status = confirmLeader();
×
1814
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1815
        ? modelManager.createModel(req)
×
1816
        : status;
×
1817
  }
1818

1819
  @Override
1820
  public TSStatus dropModel(TDropModelReq req) {
1821
    TSStatus status = confirmLeader();
×
1822
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1823
        ? modelManager.dropModel(req)
×
1824
        : status;
×
1825
  }
1826

1827
  @Override
1828
  public TShowModelResp showModel(TShowModelReq req) {
1829
    TSStatus status = confirmLeader();
×
1830
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1831
        ? modelManager.showModel(req)
×
1832
        : new TShowModelResp(status, Collections.emptyList());
×
1833
  }
1834

1835
  @Override
1836
  public TShowTrailResp showTrail(TShowTrailReq req) {
1837
    TSStatus status = confirmLeader();
×
1838
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1839
        ? modelManager.showTrail(req)
×
1840
        : new TShowTrailResp(status, Collections.emptyList());
×
1841
  }
1842

1843
  @Override
1844
  public TSStatus updateModelInfo(TUpdateModelInfoReq req) {
1845
    TSStatus status = confirmLeader();
×
1846
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1847
        ? modelManager.updateModelInfo(req)
×
1848
        : status;
×
1849
  }
1850

1851
  @Override
1852
  public TSStatus updateModelState(TUpdateModelStateReq req) {
1853
    TSStatus status = confirmLeader();
×
1854
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1855
        ? modelManager.updateModelState(req)
×
1856
        : status;
×
1857
  }
1858

1859
  @Override
1860
  public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) {
1861
    TSStatus status = confirmLeader();
×
1862
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1863
        ? clusterQuotaManager.setSpaceQuota(req)
×
1864
        : status;
×
1865
  }
1866

1867
  public TSpaceQuotaResp showSpaceQuota(List<String> databases) {
1868
    TSStatus status = confirmLeader();
×
1869
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1870
        ? clusterQuotaManager.showSpaceQuota(databases)
×
1871
        : new TSpaceQuotaResp(status);
×
1872
  }
1873

1874
  public TSpaceQuotaResp getSpaceQuota() {
1875
    TSStatus status = confirmLeader();
×
1876
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1877
        ? clusterQuotaManager.getSpaceQuota()
×
1878
        : new TSpaceQuotaResp(status);
×
1879
  }
1880

1881
  public TSStatus setThrottleQuota(TSetThrottleQuotaReq req) {
1882
    TSStatus status = confirmLeader();
×
1883
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1884
        ? clusterQuotaManager.setThrottleQuota(req)
×
1885
        : status;
×
1886
  }
1887

1888
  public TThrottleQuotaResp showThrottleQuota(TShowThrottleReq req) {
1889
    TSStatus status = confirmLeader();
×
1890
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1891
        ? clusterQuotaManager.showThrottleQuota(req)
×
1892
        : new TThrottleQuotaResp(status);
×
1893
  }
1894

1895
  public TThrottleQuotaResp getThrottleQuota() {
1896
    TSStatus status = confirmLeader();
×
1897
    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
×
1898
        ? clusterQuotaManager.getThrottleQuota()
×
1899
        : new TThrottleQuotaResp(status);
×
1900
  }
1901
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc