• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9686

pending completion
#9686

push

travis_ci

web-flow
add build info in show cluster (#10595)

146 of 146 new or added lines in 13 files covered. (100.0%)

79232 of 165062 relevant lines covered (48.0%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager;
21

22
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
23
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
24
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
25
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
26
import org.apache.iotdb.common.rpc.thrift.TSStatus;
27
import org.apache.iotdb.commons.cluster.NodeStatus;
28
import org.apache.iotdb.commons.conf.IoTDBConstant;
29
import org.apache.iotdb.commons.exception.IoTDBException;
30
import org.apache.iotdb.commons.model.ModelInformation;
31
import org.apache.iotdb.commons.path.PartialPath;
32
import org.apache.iotdb.commons.path.PathDeserializeUtil;
33
import org.apache.iotdb.commons.path.PathPatternTree;
34
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
35
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
36
import org.apache.iotdb.commons.trigger.TriggerInformation;
37
import org.apache.iotdb.commons.utils.StatusUtils;
38
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
39
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
40
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
41
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
42
import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
43
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
44
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
45
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
46
import org.apache.iotdb.confignode.procedure.Procedure;
47
import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
48
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
49
import org.apache.iotdb.confignode.procedure.impl.cq.CreateCQProcedure;
50
import org.apache.iotdb.confignode.procedure.impl.model.CreateModelProcedure;
51
import org.apache.iotdb.confignode.procedure.impl.model.DropModelProcedure;
52
import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
53
import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
54
import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
55
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
56
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
57
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure;
58
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleMetaChangeProcedure;
59
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProcedure;
60
import org.apache.iotdb.confignode.procedure.impl.pipe.task.CreatePipeProcedureV2;
61
import org.apache.iotdb.confignode.procedure.impl.pipe.task.DropPipeProcedureV2;
62
import org.apache.iotdb.confignode.procedure.impl.pipe.task.StartPipeProcedureV2;
63
import org.apache.iotdb.confignode.procedure.impl.pipe.task.StopPipeProcedureV2;
64
import org.apache.iotdb.confignode.procedure.impl.schema.AlterLogicalViewProcedure;
65
import org.apache.iotdb.confignode.procedure.impl.schema.DeactivateTemplateProcedure;
66
import org.apache.iotdb.confignode.procedure.impl.schema.DeleteDatabaseProcedure;
67
import org.apache.iotdb.confignode.procedure.impl.schema.DeleteLogicalViewProcedure;
68
import org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure;
69
import org.apache.iotdb.confignode.procedure.impl.schema.SetTemplateProcedure;
70
import org.apache.iotdb.confignode.procedure.impl.schema.UnsetTemplateProcedure;
71
import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroupsProcedure;
72
import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
73
import org.apache.iotdb.confignode.procedure.impl.trigger.CreateTriggerProcedure;
74
import org.apache.iotdb.confignode.procedure.impl.trigger.DropTriggerProcedure;
75
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
76
import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
77
import org.apache.iotdb.confignode.procedure.store.ConfigProcedureStore;
78
import org.apache.iotdb.confignode.procedure.store.IProcedureStore;
79
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
80
import org.apache.iotdb.confignode.procedure.store.ProcedureStore;
81
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
82
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
83
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
84
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
85
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
86
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
87
import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
88
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
89
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
90
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
91
import org.apache.iotdb.db.exception.BatchProcessException;
92
import org.apache.iotdb.db.schemaengine.template.Template;
93
import org.apache.iotdb.rpc.RpcUtils;
94
import org.apache.iotdb.rpc.TSStatusCode;
95
import org.apache.iotdb.tsfile.utils.Binary;
96
import org.apache.iotdb.tsfile.utils.Pair;
97

98
import org.slf4j.Logger;
99
import org.slf4j.LoggerFactory;
100

101
import java.io.IOException;
102
import java.nio.ByteBuffer;
103
import java.util.ArrayList;
104
import java.util.Arrays;
105
import java.util.Collections;
106
import java.util.HashMap;
107
import java.util.List;
108
import java.util.Map;
109
import java.util.Set;
110
import java.util.concurrent.ScheduledExecutorService;
111
import java.util.concurrent.TimeUnit;
112
import java.util.stream.Collectors;
113

114
public class ProcedureManager {
115
  // Class-wide SLF4J logger.
  private static final Logger LOGGER = LoggerFactory.getLogger(ProcedureManager.class);
×
116

117
  // ConfigNode configuration fetched once from ConfigNodeDescriptor. shiftExecutor reads the
  // procedure worker-thread count and the completed-procedure cleaner settings from it.
  private static final ConfigNodeConfig CONFIG_NODE_CONFIG =
×
118
      ConfigNodeDescriptor.getInstance().getConf();
×
119

120
  // NOTE(review): the two wait constants below are not used in the visible part of this file;
  // their units (presumably seconds / milliseconds) should be confirmed at their use sites.
  private static final int PROCEDURE_WAIT_TIME_OUT = 30;
121
  private static final int PROCEDURE_WAIT_RETRY_TIMEOUT = 250;
122

123
  // Owning ConfigManager; gives access to the partition, node and load managers.
  private final ConfigManager configManager;
124
  // Procedure framework components; all four are created in the constructor.
  private ProcedureExecutor<ConfigNodeProcedureEnv> executor;
125
  private ProcedureScheduler scheduler;
126
  private IProcedureStore store;
127
  private ConfigNodeProcedureEnv env;
128

129
  // Upper bound for the serialized size of an UpdateProcedurePlan: the configured Ratis log
  // appender buffer size minus IoTDBConstant.RAFT_LOG_BASIC_SIZE (computed in the constructor).
  // createTrigger rejects a trigger whose plan exceeds this limit.
  private final long planSizeLimit;
130

131
  public ProcedureManager(ConfigManager configManager, ProcedureInfo procedureInfo) {
×
132
    this.configManager = configManager;
×
133
    this.scheduler = new SimpleProcedureScheduler();
×
134
    this.store = new ConfigProcedureStore(configManager, procedureInfo);
×
135
    this.env = new ConfigNodeProcedureEnv(configManager, scheduler);
×
136
    this.executor = new ProcedureExecutor<>(env, store, scheduler);
×
137
    this.planSizeLimit =
×
138
        ConfigNodeDescriptor.getInstance()
×
139
                .getConf()
×
140
                .getConfigNodeRatisConsensusLogAppenderBufferSize()
×
141
            - IoTDBConstant.RAFT_LOG_BASIC_SIZE;
142
  }
×
143

144
  public void shiftExecutor(boolean running) {
145
    if (running) {
×
146
      if (!executor.isRunning()) {
×
147
        executor.init(CONFIG_NODE_CONFIG.getProcedureCoreWorkerThreadsCount());
×
148
        executor.startWorkers();
×
149
        executor.startCompletedCleaner(
×
150
            CONFIG_NODE_CONFIG.getProcedureCompletedCleanInterval(),
×
151
            CONFIG_NODE_CONFIG.getProcedureCompletedEvictTTL());
×
152
        store.start();
×
153
        LOGGER.info("ProcedureManager is started successfully.");
×
154
      }
155
    } else {
156
      if (executor.isRunning()) {
×
157
        executor.stop();
×
158
        if (!executor.isRunning()) {
×
159
          executor.join();
×
160
          store.stop();
×
161
          LOGGER.info("ProcedureManager is stopped successfully.");
×
162
        }
163
      }
164
    }
165
  }
×
166

167
  public TSStatus deleteDatabases(ArrayList<TDatabaseSchema> deleteSgSchemaList) {
168
    List<Long> procedureIds = new ArrayList<>();
×
169
    for (TDatabaseSchema storageGroupSchema : deleteSgSchemaList) {
×
170
      DeleteDatabaseProcedure deleteDatabaseProcedure =
×
171
          new DeleteDatabaseProcedure(storageGroupSchema);
172
      long procedureId = this.executor.submitProcedure(deleteDatabaseProcedure);
×
173
      procedureIds.add(procedureId);
×
174
    }
×
175
    List<TSStatus> procedureStatus = new ArrayList<>();
×
176
    boolean isSucceed = waitingProcedureFinished(procedureIds, procedureStatus);
×
177
    // clear the previously deleted regions
178
    final PartitionManager partitionManager = getConfigManager().getPartitionManager();
×
179
    partitionManager.getRegionMaintainer().submit(partitionManager::maintainRegionReplicas);
×
180
    if (isSucceed) {
×
181
      return StatusUtils.OK;
×
182
    } else {
183
      return RpcUtils.getStatus(procedureStatus);
×
184
    }
185
  }
186

187
  public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) {
188
    String queryId = req.getQueryId();
×
189
    PathPatternTree patternTree =
×
190
        PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
×
191
    long procedureId = -1;
×
192
    synchronized (this) {
×
193
      boolean hasOverlappedTask = false;
×
194
      ProcedureType type;
195
      DeleteTimeSeriesProcedure deleteTimeSeriesProcedure;
196
      for (Procedure<?> procedure : executor.getProcedures().values()) {
×
197
        type = ProcedureFactory.getProcedureType(procedure);
×
198
        if (type == null || !type.equals(ProcedureType.DELETE_TIMESERIES_PROCEDURE)) {
×
199
          continue;
×
200
        }
201
        deleteTimeSeriesProcedure = ((DeleteTimeSeriesProcedure) procedure);
×
202
        if (queryId.equals(deleteTimeSeriesProcedure.getQueryId())) {
×
203
          procedureId = deleteTimeSeriesProcedure.getProcId();
×
204
          break;
×
205
        }
206
        if (patternTree.isOverlapWith(deleteTimeSeriesProcedure.getPatternTree())) {
×
207
          hasOverlappedTask = true;
×
208
          break;
×
209
        }
210
      }
×
211

212
      if (procedureId == -1) {
×
213
        if (hasOverlappedTask) {
×
214
          return RpcUtils.getStatus(
×
215
              TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
216
              "Some other task is deleting some target timeseries.");
217
        }
218
        procedureId =
×
219
            this.executor.submitProcedure(new DeleteTimeSeriesProcedure(queryId, patternTree));
×
220
      }
221
    }
×
222
    List<TSStatus> procedureStatus = new ArrayList<>();
×
223
    boolean isSucceed =
×
224
        waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus);
×
225
    if (isSucceed) {
×
226
      return StatusUtils.OK;
×
227
    } else {
228
      return procedureStatus.get(0);
×
229
    }
230
  }
231

232
  public TSStatus deleteLogicalView(TDeleteLogicalViewReq req) {
233
    String queryId = req.getQueryId();
×
234
    PathPatternTree patternTree =
×
235
        PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
×
236
    long procedureId = -1;
×
237
    synchronized (this) {
×
238
      boolean hasOverlappedTask = false;
×
239
      ProcedureType type;
240
      DeleteLogicalViewProcedure deleteLogicalViewProcedure;
241
      for (Procedure<?> procedure : executor.getProcedures().values()) {
×
242
        type = ProcedureFactory.getProcedureType(procedure);
×
243
        if (type == null || !type.equals(ProcedureType.DELETE_LOGICAL_VIEW_PROCEDURE)) {
×
244
          continue;
×
245
        }
246
        deleteLogicalViewProcedure = ((DeleteLogicalViewProcedure) procedure);
×
247
        if (queryId.equals(deleteLogicalViewProcedure.getQueryId())) {
×
248
          procedureId = deleteLogicalViewProcedure.getProcId();
×
249
          break;
×
250
        }
251
        if (patternTree.isOverlapWith(deleteLogicalViewProcedure.getPatternTree())) {
×
252
          hasOverlappedTask = true;
×
253
          break;
×
254
        }
255
      }
×
256

257
      if (procedureId == -1) {
×
258
        if (hasOverlappedTask) {
×
259
          return RpcUtils.getStatus(
×
260
              TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
261
              "Some other task is deleting some target views.");
262
        }
263
        procedureId =
×
264
            this.executor.submitProcedure(new DeleteLogicalViewProcedure(queryId, patternTree));
×
265
      }
266
    }
×
267
    List<TSStatus> procedureStatus = new ArrayList<>();
×
268
    boolean isSucceed =
×
269
        waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus);
×
270
    if (isSucceed) {
×
271
      return StatusUtils.OK;
×
272
    } else {
273
      return procedureStatus.get(0);
×
274
    }
275
  }
276

277
  public TSStatus alterLogicalView(TAlterLogicalViewReq req) {
278
    String queryId = req.getQueryId();
×
279
    ByteBuffer byteBuffer = ByteBuffer.wrap(req.getViewBinary());
×
280
    Map<PartialPath, ViewExpression> viewPathToSourceMap = new HashMap<>();
×
281
    int size = byteBuffer.getInt();
×
282
    PartialPath path;
283
    ViewExpression viewExpression;
284
    for (int i = 0; i < size; i++) {
×
285
      path = (PartialPath) PathDeserializeUtil.deserialize(byteBuffer);
×
286
      viewExpression = ViewExpression.deserialize(byteBuffer);
×
287
      viewPathToSourceMap.put(path, viewExpression);
×
288
    }
289

290
    long procedureId = -1;
×
291
    synchronized (this) {
×
292
      ProcedureType type;
293
      AlterLogicalViewProcedure alterLogicalViewProcedure;
294
      for (Procedure<?> procedure : executor.getProcedures().values()) {
×
295
        type = ProcedureFactory.getProcedureType(procedure);
×
296
        if (type == null || !type.equals(ProcedureType.ALTER_LOGICAL_VIEW_PROCEDURE)) {
×
297
          continue;
×
298
        }
299
        alterLogicalViewProcedure = ((AlterLogicalViewProcedure) procedure);
×
300
        if (queryId.equals(alterLogicalViewProcedure.getQueryId())) {
×
301
          procedureId = alterLogicalViewProcedure.getProcId();
×
302
          break;
×
303
        }
304
      }
×
305

306
      if (procedureId == -1) {
×
307
        procedureId =
×
308
            this.executor.submitProcedure(
×
309
                new AlterLogicalViewProcedure(queryId, viewPathToSourceMap));
310
      }
311
    }
×
312
    List<TSStatus> procedureStatus = new ArrayList<>();
×
313
    boolean isSucceed =
×
314
        waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus);
×
315
    if (isSucceed) {
×
316
      return StatusUtils.OK;
×
317
    } else {
318
      return procedureStatus.get(0);
×
319
    }
320
  }
321

322
  public TSStatus setSchemaTemplate(String queryId, String templateName, String templateSetPath) {
323
    long procedureId = -1;
×
324
    synchronized (this) {
×
325
      boolean hasOverlappedTask = false;
×
326
      ProcedureType type;
327
      SetTemplateProcedure setTemplateProcedure;
328
      for (Procedure<?> procedure : executor.getProcedures().values()) {
×
329
        type = ProcedureFactory.getProcedureType(procedure);
×
330
        if (type == null || !type.equals(ProcedureType.SET_TEMPLATE_PROCEDURE)) {
×
331
          continue;
×
332
        }
333
        setTemplateProcedure = (SetTemplateProcedure) procedure;
×
334
        if (queryId.equals(setTemplateProcedure.getQueryId())) {
×
335
          procedureId = setTemplateProcedure.getProcId();
×
336
          break;
×
337
        }
338
        if (templateSetPath.equals(setTemplateProcedure.getTemplateSetPath())) {
×
339
          hasOverlappedTask = true;
×
340
          break;
×
341
        }
342
      }
×
343

344
      if (procedureId == -1) {
×
345
        if (hasOverlappedTask) {
×
346
          return RpcUtils.getStatus(
×
347
              TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
348
              "Some other task is setting template on target path.");
349
        }
350
        procedureId =
×
351
            this.executor.submitProcedure(
×
352
                new SetTemplateProcedure(queryId, templateName, templateSetPath));
353
      }
354
    }
×
355
    List<TSStatus> procedureStatus = new ArrayList<>();
×
356
    boolean isSucceed =
×
357
        waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus);
×
358
    if (isSucceed) {
×
359
      return StatusUtils.OK;
×
360
    } else {
361
      return procedureStatus.get(0);
×
362
    }
363
  }
364

365
  public TSStatus deactivateTemplate(
366
      String queryId, Map<PartialPath, List<Template>> templateSetInfo) {
367
    long procedureId = -1;
×
368
    synchronized (this) {
×
369
      boolean hasOverlappedTask = false;
×
370
      ProcedureType type;
371
      DeactivateTemplateProcedure deactivateTemplateProcedure;
372
      for (Procedure<?> procedure : executor.getProcedures().values()) {
×
373
        type = ProcedureFactory.getProcedureType(procedure);
×
374
        if (type == null || !type.equals(ProcedureType.DEACTIVATE_TEMPLATE_PROCEDURE)) {
×
375
          continue;
×
376
        }
377
        deactivateTemplateProcedure = (DeactivateTemplateProcedure) procedure;
×
378
        if (queryId.equals(deactivateTemplateProcedure.getQueryId())) {
×
379
          procedureId = deactivateTemplateProcedure.getProcId();
×
380
          break;
×
381
        }
382
        for (PartialPath pattern : templateSetInfo.keySet()) {
×
383
          for (PartialPath existingPattern :
384
              deactivateTemplateProcedure.getTemplateSetInfo().keySet()) {
×
385
            if (pattern.overlapWith(existingPattern)) {
×
386
              hasOverlappedTask = true;
×
387
              break;
×
388
            }
389
          }
×
390
          if (hasOverlappedTask) {
×
391
            break;
×
392
          }
393
        }
×
394
        if (hasOverlappedTask) {
×
395
          break;
×
396
        }
397
      }
×
398

399
      if (procedureId == -1) {
×
400
        if (hasOverlappedTask) {
×
401
          return RpcUtils.getStatus(
×
402
              TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
403
              "Some other task is deactivating some target template from target path.");
404
        }
405
        procedureId =
×
406
            this.executor.submitProcedure(
×
407
                new DeactivateTemplateProcedure(queryId, templateSetInfo));
408
      }
409
    }
×
410
    List<TSStatus> procedureStatus = new ArrayList<>();
×
411
    boolean isSucceed =
×
412
        waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus);
×
413
    if (isSucceed) {
×
414
      return StatusUtils.OK;
×
415
    } else {
416
      return procedureStatus.get(0);
×
417
    }
418
  }
419

420
  public TSStatus unsetSchemaTemplate(String queryId, Template template, PartialPath path) {
421
    long procedureId = -1;
×
422
    synchronized (this) {
×
423
      boolean hasOverlappedTask = false;
×
424
      ProcedureType type;
425
      UnsetTemplateProcedure unsetTemplateProcedure;
426
      for (Procedure<?> procedure : executor.getProcedures().values()) {
×
427
        type = ProcedureFactory.getProcedureType(procedure);
×
428
        if (type == null || !type.equals(ProcedureType.UNSET_TEMPLATE_PROCEDURE)) {
×
429
          continue;
×
430
        }
431
        unsetTemplateProcedure = (UnsetTemplateProcedure) procedure;
×
432
        if (queryId.equals(unsetTemplateProcedure.getQueryId())) {
×
433
          procedureId = unsetTemplateProcedure.getProcId();
×
434
          break;
×
435
        }
436
        if (template.getId() == unsetTemplateProcedure.getTemplateId()
×
437
            && path.equals(unsetTemplateProcedure.getPath())) {
×
438
          hasOverlappedTask = true;
×
439
          break;
×
440
        }
441
      }
×
442

443
      if (procedureId == -1) {
×
444
        if (hasOverlappedTask) {
×
445
          return RpcUtils.getStatus(
×
446
              TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
447
              "Some other task is unsetting target template from target path "
448
                  + path.getFullPath());
×
449
        }
450
        procedureId =
×
451
            this.executor.submitProcedure(new UnsetTemplateProcedure(queryId, template, path));
×
452
      }
453
    }
×
454
    List<TSStatus> procedureStatus = new ArrayList<>();
×
455
    boolean isSucceed =
×
456
        waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus);
×
457
    if (isSucceed) {
×
458
      return StatusUtils.OK;
×
459
    } else {
460
      return procedureStatus.get(0);
×
461
    }
462
  }
463

464
  /**
465
   * Generate an {@link AddConfigNodeProcedure}, and serially execute all the {@link
466
   * AddConfigNodeProcedure}s.
467
   */
468
  public void addConfigNode(TConfigNodeRegisterReq req) {
469
    AddConfigNodeProcedure addConfigNodeProcedure =
×
470
        new AddConfigNodeProcedure(req.getConfigNodeLocation(), req.getBuildInfo());
×
471
    this.executor.submitProcedure(addConfigNodeProcedure);
×
472
  }
×
473

474
  /**
475
   * Generate a {@link RemoveConfigNodeProcedure}, and serially execute all the {@link
476
   * RemoveConfigNodeProcedure}s.
477
   */
478
  public void removeConfigNode(RemoveConfigNodePlan removeConfigNodePlan) {
479
    RemoveConfigNodeProcedure removeConfigNodeProcedure =
×
480
        new RemoveConfigNodeProcedure(removeConfigNodePlan.getConfigNodeLocation());
×
481
    this.executor.submitProcedure(removeConfigNodeProcedure);
×
482
    LOGGER.info("Submit RemoveConfigNodeProcedure successfully: {}", removeConfigNodePlan);
×
483
  }
×
484

485
  /**
486
   * Generate {@link RemoveDataNodeProcedure}s, and serially execute all the {@link
487
   * RemoveDataNodeProcedure}s.
488
   */
489
  public boolean removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
490
    removeDataNodePlan
×
491
        .getDataNodeLocations()
×
492
        .forEach(
×
493
            tDataNodeLocation -> {
494
              this.executor.submitProcedure(new RemoveDataNodeProcedure(tDataNodeLocation));
×
495
              LOGGER.info("Submit RemoveDataNodeProcedure successfully, {}", tDataNodeLocation);
×
496
            });
×
497
    return true;
×
498
  }
499

500
  public TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) {
501
    TConsensusGroupId regionGroupId;
502
    if (configManager
×
503
        .getPartitionManager()
×
504
        .isRegionGroupExists(
×
505
            new TConsensusGroupId(
506
                TConsensusGroupType.SchemaRegion, migrateRegionReq.getRegionId()))) {
×
507
      regionGroupId =
×
508
          new TConsensusGroupId(TConsensusGroupType.SchemaRegion, migrateRegionReq.getRegionId());
×
509
    } else if (configManager
×
510
        .getPartitionManager()
×
511
        .isRegionGroupExists(
×
512
            new TConsensusGroupId(
513
                TConsensusGroupType.DataRegion, migrateRegionReq.getRegionId()))) {
×
514
      regionGroupId =
×
515
          new TConsensusGroupId(TConsensusGroupType.DataRegion, migrateRegionReq.getRegionId());
×
516
    } else {
517
      LOGGER.warn(
×
518
          "Submit RegionMigrateProcedure failed, because RegionGroup: {} doesn't exist",
519
          migrateRegionReq.getRegionId());
×
520
      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
×
521
      status.setMessage(
×
522
          String.format(
×
523
              "Submit RegionMigrateProcedure failed, because RegionGroup: %s doesn't exist",
524
              migrateRegionReq.getRegionId()));
×
525
      return status;
×
526
    }
527

528
    TDataNodeLocation originalDataNode =
×
529
        configManager
530
            .getNodeManager()
×
531
            .getRegisteredDataNode(migrateRegionReq.getFromId())
×
532
            .getLocation();
×
533
    TDataNodeLocation destDataNode =
×
534
        configManager
535
            .getNodeManager()
×
536
            .getRegisteredDataNode(migrateRegionReq.getToId())
×
537
            .getLocation();
×
538

539
    if (originalDataNode == null) {
×
540
      LOGGER.warn(
×
541
          "Submit RegionMigrateProcedure failed, because no original DataNode {}",
542
          migrateRegionReq.getFromId());
×
543
      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
×
544
      status.setMessage(
×
545
          "Submit RegionMigrateProcedure failed, because no original DataNode "
546
              + migrateRegionReq.getFromId());
×
547
      return status;
×
548
    } else if (destDataNode == null) {
×
549
      LOGGER.warn(
×
550
          "Submit RegionMigrateProcedure failed, because no target DataNode {}",
551
          migrateRegionReq.getToId());
×
552
      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
×
553
      status.setMessage(
×
554
          "Submit RegionMigrateProcedure failed, because no target DataNode "
555
              + migrateRegionReq.getToId());
×
556
      return status;
×
557
    } else if (configManager.getPartitionManager()
×
558
        .getAllReplicaSets(originalDataNode.getDataNodeId()).stream()
×
559
        .noneMatch(replicaSet -> replicaSet.getRegionId().equals(regionGroupId))) {
×
560
      LOGGER.warn(
×
561
          "Submit RegionMigrateProcedure failed, because the original DataNode {} doesn't contain Region {}",
562
          migrateRegionReq.getFromId(),
×
563
          migrateRegionReq.getRegionId());
×
564
      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
×
565
      status.setMessage(
×
566
          "Submit RegionMigrateProcedure failed, because the original DataNode "
567
              + migrateRegionReq.getFromId()
×
568
              + " doesn't contain Region "
569
              + migrateRegionReq.getRegionId());
×
570
      return status;
×
571
    } else if (configManager.getPartitionManager().getAllReplicaSets(destDataNode.getDataNodeId())
×
572
        .stream()
×
573
        .anyMatch(replicaSet -> replicaSet.getRegionId().equals(regionGroupId))) {
×
574
      LOGGER.warn(
×
575
          "Submit RegionMigrateProcedure failed, because the target DataNode {} already contains Region {}",
576
          migrateRegionReq.getToId(),
×
577
          migrateRegionReq.getRegionId());
×
578
      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
×
579
      status.setMessage(
×
580
          "Submit RegionMigrateProcedure failed, because the target DataNode "
581
              + migrateRegionReq.getToId()
×
582
              + " already contains Region "
583
              + migrateRegionReq.getRegionId());
×
584
      return status;
×
585
    }
586
    // Here we only check Running DataNode to implement migration, because removing nodes may not
587
    // exist when add peer is performing
588
    Set<Integer> aliveDataNodes =
×
589
        configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
×
590
            .map(TDataNodeConfiguration::getLocation)
×
591
            .map(TDataNodeLocation::getDataNodeId)
×
592
            .collect(Collectors.toSet());
×
593
    if (NodeStatus.Unknown.equals(
×
594
        configManager.getLoadManager().getNodeStatus(migrateRegionReq.getFromId()))) {
×
595
      LOGGER.warn(
×
596
          "Submit RegionMigrateProcedure failed, because the sourceDataNode {} is Unknown.",
597
          migrateRegionReq.getFromId());
×
598
      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
×
599
      status.setMessage(
×
600
          "Submit RegionMigrateProcedure failed, because the sourceDataNode "
601
              + migrateRegionReq.getFromId()
×
602
              + " is Unknown.");
603
      return status;
×
604
    }
605

606
    if (!aliveDataNodes.contains(migrateRegionReq.getToId())) {
×
607
      LOGGER.warn(
×
608
          "Submit RegionMigrateProcedure failed, because the destDataNode {} is ReadOnly or Unknown.",
609
          migrateRegionReq.getToId());
×
610
      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
×
611
      status.setMessage(
×
612
          "Submit RegionMigrateProcedure failed, because the destDataNode "
613
              + migrateRegionReq.getToId()
×
614
              + " is ReadOnly or Unknown.");
615
      return status;
×
616
    }
617
    this.executor.submitProcedure(
×
618
        new RegionMigrateProcedure(regionGroupId, originalDataNode, destDataNode));
619
    LOGGER.info(
×
620
        "Submit RegionMigrateProcedure successfully, Region: {}, From: {}, To: {}",
621
        migrateRegionReq.getRegionId(),
×
622
        migrateRegionReq.getFromId(),
×
623
        migrateRegionReq.getToId());
×
624
    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
625
  }
626

627
  /**
628
   * Generate CreateRegionGroupsProcedure and wait for it finished.
629
   *
630
   * @return SUCCESS_STATUS if all RegionGroups created successfully, CREATE_REGION_ERROR otherwise
631
   */
632
  public TSStatus createRegionGroups(
633
      TConsensusGroupType consensusGroupType, CreateRegionGroupsPlan createRegionGroupsPlan) {
634
    long procedureId =
×
635
        executor.submitProcedure(
×
636
            new CreateRegionGroupsProcedure(consensusGroupType, createRegionGroupsPlan));
637
    List<TSStatus> statusList = new ArrayList<>();
×
638
    boolean isSucceed =
×
639
        waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
640
    if (isSucceed) {
×
641
      return RpcUtils.SUCCESS_STATUS;
×
642
    } else {
643
      return new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode())
×
644
          .setMessage(statusList.get(0).getMessage());
×
645
    }
646
  }
647

648
  /**
649
   * Generate CreateTriggerProcedure and wait for it finished.
650
   *
651
   * @return SUCCESS_STATUS if trigger created successfully, CREATE_TRIGGER_ERROR otherwise
652
   */
653
  public TSStatus createTrigger(TriggerInformation triggerInformation, Binary jarFile) {
654
    final CreateTriggerProcedure createTriggerProcedure =
×
655
        new CreateTriggerProcedure(triggerInformation, jarFile);
656
    try {
657
      if (jarFile != null
×
658
          && new UpdateProcedurePlan(createTriggerProcedure).getSerializedSize() > planSizeLimit) {
×
659
        return new TSStatus(TSStatusCode.CREATE_TRIGGER_ERROR.getStatusCode())
×
660
            .setMessage(
×
661
                String.format(
×
662
                    "Fail to create trigger[%s], the size of Jar is too large, you can increase the value of property 'config_node_ratis_log_appender_buffer_size_max' on ConfigNode",
663
                    triggerInformation.getTriggerName()));
×
664
      }
665
    } catch (IOException e) {
×
666
      return new TSStatus(TSStatusCode.CREATE_TRIGGER_ERROR.getStatusCode())
×
667
          .setMessage(e.getMessage());
×
668
    }
×
669

670
    long procedureId = executor.submitProcedure(createTriggerProcedure);
×
671
    List<TSStatus> statusList = new ArrayList<>();
×
672
    boolean isSucceed =
×
673
        waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
674
    if (isSucceed) {
×
675
      return RpcUtils.SUCCESS_STATUS;
×
676
    } else {
677
      return new TSStatus(TSStatusCode.CREATE_TRIGGER_ERROR.getStatusCode())
×
678
          .setMessage(statusList.get(0).getMessage());
×
679
    }
680
  }
681

682
  /**
683
   * Generate DropTriggerProcedure and wait for it finished.
684
   *
685
   * @return SUCCESS_STATUS if trigger dropped successfully, DROP_TRIGGER_ERROR otherwise
686
   */
687
  public TSStatus dropTrigger(String triggerName) {
688
    long procedureId = executor.submitProcedure(new DropTriggerProcedure(triggerName));
×
689
    List<TSStatus> statusList = new ArrayList<>();
×
690
    boolean isSucceed =
×
691
        waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
692
    if (isSucceed) {
×
693
      return RpcUtils.SUCCESS_STATUS;
×
694
    } else {
695
      return new TSStatus(TSStatusCode.DROP_TRIGGER_ERROR.getStatusCode())
×
696
          .setMessage(statusList.get(0).getMessage());
×
697
    }
698
  }
699

700
  public TSStatus createCQ(TCreateCQReq req, ScheduledExecutorService scheduledExecutor) {
701
    long procedureId = executor.submitProcedure(new CreateCQProcedure(req, scheduledExecutor));
×
702
    List<TSStatus> statusList = new ArrayList<>();
×
703
    waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
704
    return statusList.get(0);
×
705
  }
706

707
  public TSStatus createModel(ModelInformation modelInformation, Map<String, String> modelConfigs) {
708
    long procedureId =
×
709
        executor.submitProcedure(new CreateModelProcedure(modelInformation, modelConfigs));
×
710
    List<TSStatus> statusList = new ArrayList<>();
×
711
    boolean isSucceed =
×
712
        waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
713
    if (isSucceed) {
×
714
      return RpcUtils.SUCCESS_STATUS;
×
715
    } else {
716
      return new TSStatus(TSStatusCode.CREATE_MODEL_ERROR.getStatusCode())
×
717
          .setMessage(statusList.get(0).getMessage());
×
718
    }
719
  }
720

721
  public TSStatus dropModel(String modelId) {
722
    long procedureId = executor.submitProcedure(new DropModelProcedure(modelId));
×
723
    List<TSStatus> statusList = new ArrayList<>();
×
724
    boolean isSucceed =
×
725
        waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
726
    if (isSucceed) {
×
727
      return RpcUtils.SUCCESS_STATUS;
×
728
    } else {
729
      return new TSStatus(TSStatusCode.DROP_MODEL_ERROR.getStatusCode())
×
730
          .setMessage(statusList.get(0).getMessage());
×
731
    }
732
  }
733

734
  public TSStatus createPipePlugin(PipePluginMeta pipePluginMeta, byte[] jarFile) {
735
    final CreatePipePluginProcedure createPipePluginProcedure =
×
736
        new CreatePipePluginProcedure(pipePluginMeta, jarFile);
737
    try {
738
      if (jarFile != null
×
739
          && new UpdateProcedurePlan(createPipePluginProcedure).getSerializedSize()
×
740
              > planSizeLimit) {
741
        return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode())
×
742
            .setMessage(
×
743
                String.format(
×
744
                    "Fail to create pipe plugin[%s], the size of Jar is too large, you can increase the value of property 'config_node_ratis_log_appender_buffer_size_max' on ConfigNode",
745
                    pipePluginMeta.getPluginName()));
×
746
      }
747
    } catch (IOException e) {
×
748
      return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode())
×
749
          .setMessage(e.getMessage());
×
750
    }
×
751

752
    long procedureId = executor.submitProcedure(createPipePluginProcedure);
×
753
    List<TSStatus> statusList = new ArrayList<>();
×
754
    boolean isSucceed =
×
755
        waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
756
    if (isSucceed) {
×
757
      return RpcUtils.SUCCESS_STATUS;
×
758
    } else {
759
      return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode())
×
760
          .setMessage(statusList.get(0).getMessage());
×
761
    }
762
  }
763

764
  public TSStatus dropPipePlugin(String pluginName) {
765
    long procedureId = executor.submitProcedure(new DropPipePluginProcedure(pluginName));
×
766
    List<TSStatus> statusList = new ArrayList<>();
×
767
    boolean isSucceed =
×
768
        waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
769
    if (isSucceed) {
×
770
      return RpcUtils.SUCCESS_STATUS;
×
771
    } else {
772
      return new TSStatus(TSStatusCode.DROP_PIPE_PLUGIN_ERROR.getStatusCode())
×
773
          .setMessage(statusList.get(0).getMessage());
×
774
    }
775
  }
776

777
  public TSStatus createPipe(TCreatePipeReq req) {
778
    try {
779
      long procedureId = executor.submitProcedure(new CreatePipeProcedureV2(req));
×
780
      List<TSStatus> statusList = new ArrayList<>();
×
781
      boolean isSucceed =
×
782
          waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
783
      if (isSucceed) {
×
784
        return RpcUtils.SUCCESS_STATUS;
×
785
      } else {
786
        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
×
787
            .setMessage(statusList.get(0).getMessage());
×
788
      }
789
    } catch (Exception e) {
×
790
      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
×
791
    }
792
  }
793

794
  public TSStatus startPipe(String pipeName) {
795
    try {
796
      long procedureId = executor.submitProcedure(new StartPipeProcedureV2(pipeName));
×
797
      List<TSStatus> statusList = new ArrayList<>();
×
798
      boolean isSucceed =
×
799
          waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
800
      if (isSucceed) {
×
801
        return RpcUtils.SUCCESS_STATUS;
×
802
      } else {
803
        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
×
804
            .setMessage(statusList.get(0).getMessage());
×
805
      }
806
    } catch (Exception e) {
×
807
      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
×
808
    }
809
  }
810

811
  public TSStatus stopPipe(String pipeName) {
812
    try {
813
      long procedureId = executor.submitProcedure(new StopPipeProcedureV2(pipeName));
×
814
      List<TSStatus> statusList = new ArrayList<>();
×
815
      boolean isSucceed =
×
816
          waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
817
      if (isSucceed) {
×
818
        return RpcUtils.SUCCESS_STATUS;
×
819
      } else {
820
        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
×
821
            .setMessage(statusList.get(0).getMessage());
×
822
      }
823
    } catch (Exception e) {
×
824
      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
×
825
    }
826
  }
827

828
  public TSStatus dropPipe(String pipeName) {
829
    try {
830
      long procedureId = executor.submitProcedure(new DropPipeProcedureV2(pipeName));
×
831
      List<TSStatus> statusList = new ArrayList<>();
×
832
      boolean isSucceed =
×
833
          waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
834
      if (isSucceed) {
×
835
        return RpcUtils.SUCCESS_STATUS;
×
836
      } else {
837
        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
×
838
            .setMessage(statusList.get(0).getMessage());
×
839
      }
840
    } catch (Exception e) {
×
841
      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
×
842
    }
843
  }
844

845
  public void pipeHandleLeaderChange(
846
      Map<TConsensusGroupId, Pair<Integer, Integer>> dataRegionGroupToOldAndNewLeaderPairMap) {
847
    try {
848
      final long procedureId =
×
849
          executor.submitProcedure(
×
850
              new PipeHandleLeaderChangeProcedure(dataRegionGroupToOldAndNewLeaderPairMap));
851
      LOGGER.info("PipeHandleLeaderChangeProcedure was submitted, procedureId: {}.", procedureId);
×
852
    } catch (Exception e) {
×
853
      LOGGER.warn("PipeHandleLeaderChangeProcedure was failed to submit.", e);
×
854
    }
×
855
  }
×
856

857
  public void pipeHandleMetaChange(
858
      boolean needWriteConsensusOnConfigNodes, boolean needPushPipeMetaToDataNodes) {
859
    try {
860
      final long procedureId =
×
861
          executor.submitProcedure(
×
862
              new PipeHandleMetaChangeProcedure(
863
                  needWriteConsensusOnConfigNodes, needPushPipeMetaToDataNodes));
864
      LOGGER.info("PipeHandleMetaChangeProcedure was submitted, procedureId: {}.", procedureId);
×
865
    } catch (Exception e) {
×
866
      LOGGER.warn("PipeHandleMetaChangeProcedure was failed to submit.", e);
×
867
    }
×
868
  }
×
869

870
  public TSStatus pipeMetaSync() {
871
    try {
872
      final long procedureId = executor.submitProcedure(new PipeMetaSyncProcedure());
×
873
      final List<TSStatus> statusList = new ArrayList<>();
×
874
      final boolean isSucceed =
×
875
          waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
876
      if (isSucceed) {
×
877
        return RpcUtils.SUCCESS_STATUS;
×
878
      } else {
879
        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
×
880
            .setMessage(statusList.get(0).getMessage());
×
881
      }
882
    } catch (Exception e) {
×
883
      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
×
884
    }
885
  }
886

887
  /**
888
   * Waiting until the specific procedures finished.
889
   *
890
   * @param procedureIds The specific procedures' index
891
   * @param statusList The corresponding running results of these procedures
892
   * @return True if all Procedures finished successfully, false otherwise
893
   */
894
  private boolean waitingProcedureFinished(List<Long> procedureIds, List<TSStatus> statusList) {
895
    boolean isSucceed = true;
×
896
    for (long procedureId : procedureIds) {
×
897
      long startTimeForCurrentProcedure = System.currentTimeMillis();
×
898
      while (executor.isRunning()
×
899
          && !executor.isFinished(procedureId)
×
900
          && TimeUnit.MILLISECONDS.toSeconds(
×
901
                  System.currentTimeMillis() - startTimeForCurrentProcedure)
×
902
              < PROCEDURE_WAIT_TIME_OUT) {
903
        sleepWithoutInterrupt(PROCEDURE_WAIT_RETRY_TIMEOUT);
×
904
      }
905
      Procedure<ConfigNodeProcedureEnv> finishedProcedure =
×
906
          executor.getResultOrProcedure(procedureId);
×
907
      if (!finishedProcedure.isFinished()) {
×
908
        // the procedure is still executing
909
        statusList.add(RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK));
×
910
        isSucceed = false;
×
911
        continue;
×
912
      }
913
      if (finishedProcedure.isSuccess()) {
×
914
        statusList.add(StatusUtils.OK);
×
915
      } else {
916
        if (finishedProcedure.getException().getCause() instanceof IoTDBException) {
×
917
          IoTDBException e = (IoTDBException) finishedProcedure.getException().getCause();
×
918
          if (e instanceof BatchProcessException) {
×
919
            statusList.add(
×
920
                RpcUtils.getStatus(
×
921
                    Arrays.stream(((BatchProcessException) e).getFailingStatus())
×
922
                        .collect(Collectors.toList())));
×
923
          } else {
924
            statusList.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
×
925
          }
926
        } else {
×
927
          statusList.add(
×
928
              StatusUtils.EXECUTE_STATEMENT_ERROR.setMessage(
×
929
                  finishedProcedure.getException().getMessage()));
×
930
        }
931
        isSucceed = false;
×
932
      }
933
    }
×
934
    return isSucceed;
×
935
  }
936

937
  public static void sleepWithoutInterrupt(final long timeToSleep) {
938
    long currentTime = System.currentTimeMillis();
×
939
    long endTime = timeToSleep + currentTime;
×
940
    boolean interrupted = false;
×
941
    while (currentTime < endTime) {
×
942
      try {
943
        Thread.sleep(endTime - currentTime);
×
944
      } catch (InterruptedException e) {
×
945
        interrupted = true;
×
946
      }
×
947
      currentTime = System.currentTimeMillis();
×
948
    }
949
    if (interrupted) {
×
950
      Thread.currentThread().interrupt();
×
951
    }
952
  }
×
953

954
  // ======================================================
955
  /*
956
     GET-SET Region
957
  */
958
  // ======================================================
959
  public IManager getConfigManager() {
960
    return configManager;
×
961
  }
962

963
  public ProcedureExecutor<ConfigNodeProcedureEnv> getExecutor() {
964
    return executor;
×
965
  }
966

967
  public void setExecutor(ProcedureExecutor<ConfigNodeProcedureEnv> executor) {
968
    this.executor = executor;
×
969
  }
×
970

971
  public ProcedureScheduler getScheduler() {
972
    return scheduler;
×
973
  }
974

975
  public void setScheduler(ProcedureScheduler scheduler) {
976
    this.scheduler = scheduler;
×
977
  }
×
978

979
  public IProcedureStore getStore() {
980
    return store;
×
981
  }
982

983
  public void setStore(ProcedureStore store) {
984
    this.store = store;
×
985
  }
×
986

987
  public ConfigNodeProcedureEnv getEnv() {
988
    return env;
×
989
  }
990

991
  public void setEnv(ConfigNodeProcedureEnv env) {
992
    this.env = env;
×
993
  }
×
994

995
  public void reportRegionMigrateResult(TRegionMigrateResultReportReq req) {
996

997
    this.executor
×
998
        .getProcedures()
×
999
        .values()
×
1000
        .forEach(
×
1001
            procedure -> {
1002
              if (procedure instanceof RegionMigrateProcedure) {
×
1003
                RegionMigrateProcedure regionMigrateProcedure = (RegionMigrateProcedure) procedure;
×
1004
                if (regionMigrateProcedure.getConsensusGroupId().equals(req.getRegionId())) {
×
1005
                  regionMigrateProcedure.notifyTheRegionMigrateFinished(req);
×
1006
                }
1007
              }
1008
            });
×
1009
  }
×
1010
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc