• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #10019

07 Sep 2023 04:50AM UTC coverage: 47.489% (-0.2%) from 47.655%
#10019

push

travis_ci

web-flow
Pipe: Fix ConcurrentModificationException caused by concurrently iterating through CachedSchemaPatternMatcher.extractors when an PipeHeartbeatEvent is being assigned (#11074)

* try to fix ConcurrentModificationException when assigning PipeHeartbeatEvent

* Update CachedSchemaPatternMatcher.java

---------

Co-authored-by: 马子坤 <55695098+DanielWang2035@users.noreply.github.com>

1 of 1 new or added line in 1 file covered. (100.0%)

80551 of 169622 relevant lines covered (47.49%)

0.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.manager;
21

22
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
23
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
24
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
25
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
26
import org.apache.iotdb.common.rpc.thrift.TSStatus;
27
import org.apache.iotdb.commons.cluster.NodeStatus;
28
import org.apache.iotdb.commons.conf.IoTDBConstant;
29
import org.apache.iotdb.commons.exception.IoTDBException;
30
import org.apache.iotdb.commons.model.ModelInformation;
31
import org.apache.iotdb.commons.path.PartialPath;
32
import org.apache.iotdb.commons.path.PathDeserializeUtil;
33
import org.apache.iotdb.commons.path.PathPatternTree;
34
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
35
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
36
import org.apache.iotdb.commons.trigger.TriggerInformation;
37
import org.apache.iotdb.commons.utils.StatusUtils;
38
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
39
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
40
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
41
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
42
import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
43
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
44
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
45
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
46
import org.apache.iotdb.confignode.procedure.Procedure;
47
import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
48
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
49
import org.apache.iotdb.confignode.procedure.impl.cq.CreateCQProcedure;
50
import org.apache.iotdb.confignode.procedure.impl.model.CreateModelProcedure;
51
import org.apache.iotdb.confignode.procedure.impl.model.DropModelProcedure;
52
import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
53
import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
54
import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
55
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
56
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
57
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure;
58
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleMetaChangeProcedure;
59
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeMetaSyncProcedure;
60
import org.apache.iotdb.confignode.procedure.impl.pipe.task.CreatePipeProcedureV2;
61
import org.apache.iotdb.confignode.procedure.impl.pipe.task.DropPipeProcedureV2;
62
import org.apache.iotdb.confignode.procedure.impl.pipe.task.StartPipeProcedureV2;
63
import org.apache.iotdb.confignode.procedure.impl.pipe.task.StopPipeProcedureV2;
64
import org.apache.iotdb.confignode.procedure.impl.schema.AlterLogicalViewProcedure;
65
import org.apache.iotdb.confignode.procedure.impl.schema.DeactivateTemplateProcedure;
66
import org.apache.iotdb.confignode.procedure.impl.schema.DeleteDatabaseProcedure;
67
import org.apache.iotdb.confignode.procedure.impl.schema.DeleteLogicalViewProcedure;
68
import org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure;
69
import org.apache.iotdb.confignode.procedure.impl.schema.SetTemplateProcedure;
70
import org.apache.iotdb.confignode.procedure.impl.schema.UnsetTemplateProcedure;
71
import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroupsProcedure;
72
import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
73
import org.apache.iotdb.confignode.procedure.impl.trigger.CreateTriggerProcedure;
74
import org.apache.iotdb.confignode.procedure.impl.trigger.DropTriggerProcedure;
75
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
76
import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
77
import org.apache.iotdb.confignode.procedure.store.ConfigProcedureStore;
78
import org.apache.iotdb.confignode.procedure.store.IProcedureStore;
79
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
80
import org.apache.iotdb.confignode.procedure.store.ProcedureStore;
81
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
82
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
83
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
84
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
85
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
86
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
87
import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
88
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
89
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
90
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
91
import org.apache.iotdb.db.exception.BatchProcessException;
92
import org.apache.iotdb.db.schemaengine.template.Template;
93
import org.apache.iotdb.rpc.RpcUtils;
94
import org.apache.iotdb.rpc.TSStatusCode;
95
import org.apache.iotdb.tsfile.utils.Binary;
96
import org.apache.iotdb.tsfile.utils.Pair;
97

98
import org.slf4j.Logger;
99
import org.slf4j.LoggerFactory;
100

101
import java.io.IOException;
102
import java.nio.ByteBuffer;
103
import java.util.ArrayList;
104
import java.util.Arrays;
105
import java.util.Collections;
106
import java.util.HashMap;
107
import java.util.List;
108
import java.util.Map;
109
import java.util.Set;
110
import java.util.concurrent.ScheduledExecutorService;
111
import java.util.concurrent.TimeUnit;
112
import java.util.stream.Collectors;
113

114
public class ProcedureManager {
115
  private static final Logger LOGGER = LoggerFactory.getLogger(ProcedureManager.class);
×
116

117
  private static final ConfigNodeConfig CONFIG_NODE_CONFIG =
×
118
      ConfigNodeDescriptor.getInstance().getConf();
×
119

120
  private static final int PROCEDURE_WAIT_TIME_OUT = 30;
121
  private static final int PROCEDURE_WAIT_RETRY_TIMEOUT = 250;
122

123
  private final ConfigManager configManager;
124
  private ProcedureExecutor<ConfigNodeProcedureEnv> executor;
125
  private ProcedureScheduler scheduler;
126
  private IProcedureStore store;
127
  private ConfigNodeProcedureEnv env;
128

129
  private final long planSizeLimit;
130

131
  public ProcedureManager(ConfigManager configManager, ProcedureInfo procedureInfo) {
×
132
    this.configManager = configManager;
×
133
    this.scheduler = new SimpleProcedureScheduler();
×
134
    this.store = new ConfigProcedureStore(configManager, procedureInfo);
×
135
    this.env = new ConfigNodeProcedureEnv(configManager, scheduler);
×
136
    this.executor = new ProcedureExecutor<>(env, store, scheduler);
×
137
    this.planSizeLimit =
×
138
        ConfigNodeDescriptor.getInstance()
×
139
                .getConf()
×
140
                .getConfigNodeRatisConsensusLogAppenderBufferSize()
×
141
            - IoTDBConstant.RAFT_LOG_BASIC_SIZE;
142
  }
×
143

144
  public void shiftExecutor(boolean running) {
145
    if (running) {
×
146
      if (!executor.isRunning()) {
×
147
        executor.init(CONFIG_NODE_CONFIG.getProcedureCoreWorkerThreadsCount());
×
148
        executor.startWorkers();
×
149
        executor.startCompletedCleaner(
×
150
            CONFIG_NODE_CONFIG.getProcedureCompletedCleanInterval(),
×
151
            CONFIG_NODE_CONFIG.getProcedureCompletedEvictTTL());
×
152
        store.start();
×
153
        LOGGER.info("ProcedureManager is started successfully.");
×
154
      }
155
    } else {
156
      if (executor.isRunning()) {
×
157
        executor.stop();
×
158
        if (!executor.isRunning()) {
×
159
          executor.join();
×
160
          store.stop();
×
161
          LOGGER.info("ProcedureManager is stopped successfully.");
×
162
        }
163
      }
164
    }
165
  }
×
166

167
  public TSStatus deleteDatabases(ArrayList<TDatabaseSchema> deleteSgSchemaList) {
168
    List<Long> procedureIds = new ArrayList<>();
×
169
    for (TDatabaseSchema storageGroupSchema : deleteSgSchemaList) {
×
170
      DeleteDatabaseProcedure deleteDatabaseProcedure =
×
171
          new DeleteDatabaseProcedure(storageGroupSchema);
172
      long procedureId = this.executor.submitProcedure(deleteDatabaseProcedure);
×
173
      procedureIds.add(procedureId);
×
174
    }
×
175
    List<TSStatus> procedureStatus = new ArrayList<>();
×
176
    boolean isSucceed = waitingProcedureFinished(procedureIds, procedureStatus);
×
177
    // clear the previously deleted regions
178
    final PartitionManager partitionManager = getConfigManager().getPartitionManager();
×
179
    partitionManager.getRegionMaintainer().submit(partitionManager::maintainRegionReplicas);
×
180
    if (isSucceed) {
×
181
      return StatusUtils.OK;
×
182
    } else {
183
      return RpcUtils.getStatus(procedureStatus);
×
184
    }
185
  }
186

187
  public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) {
188
    String queryId = req.getQueryId();
×
189
    PathPatternTree patternTree =
×
190
        PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
×
191
    long procedureId = -1;
×
192
    synchronized (this) {
×
193
      boolean hasOverlappedTask = false;
×
194
      ProcedureType type;
195
      DeleteTimeSeriesProcedure deleteTimeSeriesProcedure;
196
      for (Procedure<?> procedure : executor.getProcedures().values()) {
×
197
        type = ProcedureFactory.getProcedureType(procedure);
×
198
        if (type == null || !type.equals(ProcedureType.DELETE_TIMESERIES_PROCEDURE)) {
×
199
          continue;
×
200
        }
201
        deleteTimeSeriesProcedure = ((DeleteTimeSeriesProcedure) procedure);
×
202
        if (queryId.equals(deleteTimeSeriesProcedure.getQueryId())) {
×
203
          procedureId = deleteTimeSeriesProcedure.getProcId();
×
204
          break;
×
205
        }
206
        if (patternTree.isOverlapWith(deleteTimeSeriesProcedure.getPatternTree())) {
×
207
          hasOverlappedTask = true;
×
208
          break;
×
209
        }
210
      }
×
211

212
      if (procedureId == -1) {
×
213
        if (hasOverlappedTask) {
×
214
          return RpcUtils.getStatus(
×
215
              TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
216
              "Some other task is deleting some target timeseries.");
217
        }
218
        procedureId =
×
219
            this.executor.submitProcedure(new DeleteTimeSeriesProcedure(queryId, patternTree));
×
220
      }
221
    }
×
222
    List<TSStatus> procedureStatus = new ArrayList<>();
×
223
    boolean isSucceed =
×
224
        waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus);
×
225
    if (isSucceed) {
×
226
      return StatusUtils.OK;
×
227
    } else {
228
      return procedureStatus.get(0);
×
229
    }
230
  }
231

232
  public TSStatus deleteLogicalView(TDeleteLogicalViewReq req) {
233
    String queryId = req.getQueryId();
×
234
    PathPatternTree patternTree =
×
235
        PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
×
236
    long procedureId = -1;
×
237
    synchronized (this) {
×
238
      boolean hasOverlappedTask = false;
×
239
      ProcedureType type;
240
      DeleteLogicalViewProcedure deleteLogicalViewProcedure;
241
      for (Procedure<?> procedure : executor.getProcedures().values()) {
×
242
        type = ProcedureFactory.getProcedureType(procedure);
×
243
        if (type == null || !type.equals(ProcedureType.DELETE_LOGICAL_VIEW_PROCEDURE)) {
×
244
          continue;
×
245
        }
246
        deleteLogicalViewProcedure = ((DeleteLogicalViewProcedure) procedure);
×
247
        if (queryId.equals(deleteLogicalViewProcedure.getQueryId())) {
×
248
          procedureId = deleteLogicalViewProcedure.getProcId();
×
249
          break;
×
250
        }
251
        if (patternTree.isOverlapWith(deleteLogicalViewProcedure.getPatternTree())) {
×
252
          hasOverlappedTask = true;
×
253
          break;
×
254
        }
255
      }
×
256

257
      if (procedureId == -1) {
×
258
        if (hasOverlappedTask) {
×
259
          return RpcUtils.getStatus(
×
260
              TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
261
              "Some other task is deleting some target views.");
262
        }
263
        procedureId =
×
264
            this.executor.submitProcedure(new DeleteLogicalViewProcedure(queryId, patternTree));
×
265
      }
266
    }
×
267
    List<TSStatus> procedureStatus = new ArrayList<>();
×
268
    boolean isSucceed =
×
269
        waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus);
×
270
    if (isSucceed) {
×
271
      return StatusUtils.OK;
×
272
    } else {
273
      return procedureStatus.get(0);
×
274
    }
275
  }
276

277
  public TSStatus alterLogicalView(TAlterLogicalViewReq req) {
278
    String queryId = req.getQueryId();
×
279
    ByteBuffer byteBuffer = ByteBuffer.wrap(req.getViewBinary());
×
280
    Map<PartialPath, ViewExpression> viewPathToSourceMap = new HashMap<>();
×
281
    int size = byteBuffer.getInt();
×
282
    PartialPath path;
283
    ViewExpression viewExpression;
284
    for (int i = 0; i < size; i++) {
×
285
      path = (PartialPath) PathDeserializeUtil.deserialize(byteBuffer);
×
286
      viewExpression = ViewExpression.deserialize(byteBuffer);
×
287
      viewPathToSourceMap.put(path, viewExpression);
×
288
    }
289

290
    long procedureId = -1;
×
291
    synchronized (this) {
×
292
      ProcedureType type;
293
      AlterLogicalViewProcedure alterLogicalViewProcedure;
294
      for (Procedure<?> procedure : executor.getProcedures().values()) {
×
295
        type = ProcedureFactory.getProcedureType(procedure);
×
296
        if (type == null || !type.equals(ProcedureType.ALTER_LOGICAL_VIEW_PROCEDURE)) {
×
297
          continue;
×
298
        }
299
        alterLogicalViewProcedure = ((AlterLogicalViewProcedure) procedure);
×
300
        if (queryId.equals(alterLogicalViewProcedure.getQueryId())) {
×
301
          procedureId = alterLogicalViewProcedure.getProcId();
×
302
          break;
×
303
        }
304
      }
×
305

306
      if (procedureId == -1) {
×
307
        procedureId =
×
308
            this.executor.submitProcedure(
×
309
                new AlterLogicalViewProcedure(queryId, viewPathToSourceMap));
310
      }
311
    }
×
312
    List<TSStatus> procedureStatus = new ArrayList<>();
×
313
    boolean isSucceed =
×
314
        waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus);
×
315
    if (isSucceed) {
×
316
      return StatusUtils.OK;
×
317
    } else {
318
      return procedureStatus.get(0);
×
319
    }
320
  }
321

322
  public TSStatus setSchemaTemplate(String queryId, String templateName, String templateSetPath) {
323
    long procedureId = -1;
×
324
    synchronized (this) {
×
325
      boolean hasOverlappedTask = false;
×
326
      ProcedureType type;
327
      SetTemplateProcedure setTemplateProcedure;
328
      for (Procedure<?> procedure : executor.getProcedures().values()) {
×
329
        type = ProcedureFactory.getProcedureType(procedure);
×
330
        if (type == null || !type.equals(ProcedureType.SET_TEMPLATE_PROCEDURE)) {
×
331
          continue;
×
332
        }
333
        setTemplateProcedure = (SetTemplateProcedure) procedure;
×
334
        if (queryId.equals(setTemplateProcedure.getQueryId())) {
×
335
          procedureId = setTemplateProcedure.getProcId();
×
336
          break;
×
337
        }
338
        if (templateSetPath.equals(setTemplateProcedure.getTemplateSetPath())) {
×
339
          hasOverlappedTask = true;
×
340
          break;
×
341
        }
342
      }
×
343

344
      if (procedureId == -1) {
×
345
        if (hasOverlappedTask) {
×
346
          return RpcUtils.getStatus(
×
347
              TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
348
              "Some other task is setting template on target path.");
349
        }
350
        procedureId =
×
351
            this.executor.submitProcedure(
×
352
                new SetTemplateProcedure(queryId, templateName, templateSetPath));
353
      }
354
    }
×
355
    List<TSStatus> procedureStatus = new ArrayList<>();
×
356
    boolean isSucceed =
×
357
        waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus);
×
358
    if (isSucceed) {
×
359
      return StatusUtils.OK;
×
360
    } else {
361
      return procedureStatus.get(0);
×
362
    }
363
  }
364

365
  public TSStatus deactivateTemplate(
366
      String queryId, Map<PartialPath, List<Template>> templateSetInfo) {
367
    long procedureId = -1;
×
368
    synchronized (this) {
×
369
      boolean hasOverlappedTask = false;
×
370
      ProcedureType type;
371
      DeactivateTemplateProcedure deactivateTemplateProcedure;
372
      for (Procedure<?> procedure : executor.getProcedures().values()) {
×
373
        type = ProcedureFactory.getProcedureType(procedure);
×
374
        if (type == null || !type.equals(ProcedureType.DEACTIVATE_TEMPLATE_PROCEDURE)) {
×
375
          continue;
×
376
        }
377
        deactivateTemplateProcedure = (DeactivateTemplateProcedure) procedure;
×
378
        if (queryId.equals(deactivateTemplateProcedure.getQueryId())) {
×
379
          procedureId = deactivateTemplateProcedure.getProcId();
×
380
          break;
×
381
        }
382
        for (PartialPath pattern : templateSetInfo.keySet()) {
×
383
          for (PartialPath existingPattern :
384
              deactivateTemplateProcedure.getTemplateSetInfo().keySet()) {
×
385
            if (pattern.overlapWith(existingPattern)) {
×
386
              hasOverlappedTask = true;
×
387
              break;
×
388
            }
389
          }
×
390
          if (hasOverlappedTask) {
×
391
            break;
×
392
          }
393
        }
×
394
        if (hasOverlappedTask) {
×
395
          break;
×
396
        }
397
      }
×
398

399
      if (procedureId == -1) {
×
400
        if (hasOverlappedTask) {
×
401
          return RpcUtils.getStatus(
×
402
              TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
403
              "Some other task is deactivating some target template from target path.");
404
        }
405
        procedureId =
×
406
            this.executor.submitProcedure(
×
407
                new DeactivateTemplateProcedure(queryId, templateSetInfo));
408
      }
409
    }
×
410
    List<TSStatus> procedureStatus = new ArrayList<>();
×
411
    boolean isSucceed =
×
412
        waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus);
×
413
    if (isSucceed) {
×
414
      return StatusUtils.OK;
×
415
    } else {
416
      return procedureStatus.get(0);
×
417
    }
418
  }
419

420
  public TSStatus unsetSchemaTemplate(String queryId, Template template, PartialPath path) {
421
    long procedureId = -1;
×
422
    synchronized (this) {
×
423
      boolean hasOverlappedTask = false;
×
424
      ProcedureType type;
425
      UnsetTemplateProcedure unsetTemplateProcedure;
426
      for (Procedure<?> procedure : executor.getProcedures().values()) {
×
427
        type = ProcedureFactory.getProcedureType(procedure);
×
428
        if (type == null || !type.equals(ProcedureType.UNSET_TEMPLATE_PROCEDURE)) {
×
429
          continue;
×
430
        }
431
        unsetTemplateProcedure = (UnsetTemplateProcedure) procedure;
×
432
        if (queryId.equals(unsetTemplateProcedure.getQueryId())) {
×
433
          procedureId = unsetTemplateProcedure.getProcId();
×
434
          break;
×
435
        }
436
        if (template.getId() == unsetTemplateProcedure.getTemplateId()
×
437
            && path.equals(unsetTemplateProcedure.getPath())) {
×
438
          hasOverlappedTask = true;
×
439
          break;
×
440
        }
441
      }
×
442

443
      if (procedureId == -1) {
×
444
        if (hasOverlappedTask) {
×
445
          return RpcUtils.getStatus(
×
446
              TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
447
              "Some other task is unsetting target template from target path "
448
                  + path.getFullPath());
×
449
        }
450
        procedureId =
×
451
            this.executor.submitProcedure(new UnsetTemplateProcedure(queryId, template, path));
×
452
      }
453
    }
×
454
    List<TSStatus> procedureStatus = new ArrayList<>();
×
455
    boolean isSucceed =
×
456
        waitingProcedureFinished(Collections.singletonList(procedureId), procedureStatus);
×
457
    if (isSucceed) {
×
458
      return StatusUtils.OK;
×
459
    } else {
460
      return procedureStatus.get(0);
×
461
    }
462
  }
463

464
  /**
465
   * Generate an {@link AddConfigNodeProcedure}, and serially execute all the {@link
466
   * AddConfigNodeProcedure}s.
467
   */
468
  public void addConfigNode(TConfigNodeRegisterReq req) {
469
    AddConfigNodeProcedure addConfigNodeProcedure =
×
470
        new AddConfigNodeProcedure(req.getConfigNodeLocation(), req.getVersionInfo());
×
471
    this.executor.submitProcedure(addConfigNodeProcedure);
×
472
  }
×
473

474
  /**
475
   * Generate a {@link RemoveConfigNodeProcedure}, and serially execute all the {@link
476
   * RemoveConfigNodeProcedure}s.
477
   */
478
  public void removeConfigNode(RemoveConfigNodePlan removeConfigNodePlan) {
479
    RemoveConfigNodeProcedure removeConfigNodeProcedure =
×
480
        new RemoveConfigNodeProcedure(removeConfigNodePlan.getConfigNodeLocation());
×
481
    this.executor.submitProcedure(removeConfigNodeProcedure);
×
482
    LOGGER.info("Submit RemoveConfigNodeProcedure successfully: {}", removeConfigNodePlan);
×
483
  }
×
484

485
  /**
486
   * Generate {@link RemoveDataNodeProcedure}s, and serially execute all the {@link
487
   * RemoveDataNodeProcedure}s.
488
   */
489
  public boolean removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
490
    removeDataNodePlan
×
491
        .getDataNodeLocations()
×
492
        .forEach(
×
493
            tDataNodeLocation -> {
494
              this.executor.submitProcedure(new RemoveDataNodeProcedure(tDataNodeLocation));
×
495
              LOGGER.info("Submit RemoveDataNodeProcedure successfully, {}", tDataNodeLocation);
×
496
            });
×
497
    return true;
×
498
  }
499

500
  public TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) {
501
    TConsensusGroupId regionGroupId;
502
    if (configManager
×
503
        .getPartitionManager()
×
504
        .isRegionGroupExists(
×
505
            new TConsensusGroupId(
506
                TConsensusGroupType.SchemaRegion, migrateRegionReq.getRegionId()))) {
×
507
      regionGroupId =
×
508
          new TConsensusGroupId(TConsensusGroupType.SchemaRegion, migrateRegionReq.getRegionId());
×
509
    } else if (configManager
×
510
        .getPartitionManager()
×
511
        .isRegionGroupExists(
×
512
            new TConsensusGroupId(
513
                TConsensusGroupType.DataRegion, migrateRegionReq.getRegionId()))) {
×
514
      regionGroupId =
×
515
          new TConsensusGroupId(TConsensusGroupType.DataRegion, migrateRegionReq.getRegionId());
×
516
    } else {
517
      LOGGER.warn(
×
518
          "Submit RegionMigrateProcedure failed, because RegionGroup: {} doesn't exist",
519
          migrateRegionReq.getRegionId());
×
520
      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
×
521
      status.setMessage(
×
522
          String.format(
×
523
              "Submit RegionMigrateProcedure failed, because RegionGroup: %s doesn't exist",
524
              migrateRegionReq.getRegionId()));
×
525
      return status;
×
526
    }
527

528
    TDataNodeLocation originalDataNode =
×
529
        configManager
530
            .getNodeManager()
×
531
            .getRegisteredDataNode(migrateRegionReq.getFromId())
×
532
            .getLocation();
×
533
    TDataNodeLocation destDataNode =
×
534
        configManager
535
            .getNodeManager()
×
536
            .getRegisteredDataNode(migrateRegionReq.getToId())
×
537
            .getLocation();
×
538

539
    if (originalDataNode == null) {
×
540
      LOGGER.warn(
×
541
          "Submit RegionMigrateProcedure failed, because no original DataNode {}",
542
          migrateRegionReq.getFromId());
×
543
      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
×
544
      status.setMessage(
×
545
          "Submit RegionMigrateProcedure failed, because no original DataNode "
546
              + migrateRegionReq.getFromId());
×
547
      return status;
×
548
    } else if (destDataNode == null) {
×
549
      LOGGER.warn(
×
550
          "Submit RegionMigrateProcedure failed, because no target DataNode {}",
551
          migrateRegionReq.getToId());
×
552
      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
×
553
      status.setMessage(
×
554
          "Submit RegionMigrateProcedure failed, because no target DataNode "
555
              + migrateRegionReq.getToId());
×
556
      return status;
×
557
    } else if (configManager.getPartitionManager()
×
558
        .getAllReplicaSets(originalDataNode.getDataNodeId()).stream()
×
559
        .noneMatch(replicaSet -> replicaSet.getRegionId().equals(regionGroupId))) {
×
560
      LOGGER.warn(
×
561
          "Submit RegionMigrateProcedure failed, because the original DataNode {} doesn't contain Region {}",
562
          migrateRegionReq.getFromId(),
×
563
          migrateRegionReq.getRegionId());
×
564
      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
×
565
      status.setMessage(
×
566
          "Submit RegionMigrateProcedure failed, because the original DataNode "
567
              + migrateRegionReq.getFromId()
×
568
              + " doesn't contain Region "
569
              + migrateRegionReq.getRegionId());
×
570
      return status;
×
571
    } else if (configManager.getPartitionManager().getAllReplicaSets(destDataNode.getDataNodeId())
×
572
        .stream()
×
573
        .anyMatch(replicaSet -> replicaSet.getRegionId().equals(regionGroupId))) {
×
574
      LOGGER.warn(
×
575
          "Submit RegionMigrateProcedure failed, because the target DataNode {} already contains Region {}",
576
          migrateRegionReq.getToId(),
×
577
          migrateRegionReq.getRegionId());
×
578
      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
×
579
      status.setMessage(
×
580
          "Submit RegionMigrateProcedure failed, because the target DataNode "
581
              + migrateRegionReq.getToId()
×
582
              + " already contains Region "
583
              + migrateRegionReq.getRegionId());
×
584
      return status;
×
585
    }
586
    // Here we only check Running DataNode to implement migration, because removing nodes may not
587
    // exist when add peer is performing
588
    Set<Integer> aliveDataNodes =
×
589
        configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
×
590
            .map(TDataNodeConfiguration::getLocation)
×
591
            .map(TDataNodeLocation::getDataNodeId)
×
592
            .collect(Collectors.toSet());
×
593
    if (NodeStatus.Unknown.equals(
×
594
        configManager.getLoadManager().getNodeStatus(migrateRegionReq.getFromId()))) {
×
595
      LOGGER.warn(
×
596
          "Submit RegionMigrateProcedure failed, because the sourceDataNode {} is Unknown.",
597
          migrateRegionReq.getFromId());
×
598
      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
×
599
      status.setMessage(
×
600
          "Submit RegionMigrateProcedure failed, because the sourceDataNode "
601
              + migrateRegionReq.getFromId()
×
602
              + " is Unknown.");
603
      return status;
×
604
    }
605

606
    if (!aliveDataNodes.contains(migrateRegionReq.getToId())) {
×
607
      LOGGER.warn(
×
608
          "Submit RegionMigrateProcedure failed, because the destDataNode {} is ReadOnly or Unknown.",
609
          migrateRegionReq.getToId());
×
610
      TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
×
611
      status.setMessage(
×
612
          "Submit RegionMigrateProcedure failed, because the destDataNode "
613
              + migrateRegionReq.getToId()
×
614
              + " is ReadOnly or Unknown.");
615
      return status;
×
616
    }
617
    this.executor.submitProcedure(
×
618
        new RegionMigrateProcedure(regionGroupId, originalDataNode, destDataNode));
619
    LOGGER.info(
×
620
        "Submit RegionMigrateProcedure successfully, Region: {}, From: {}, To: {}",
621
        migrateRegionReq.getRegionId(),
×
622
        migrateRegionReq.getFromId(),
×
623
        migrateRegionReq.getToId());
×
624
    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
×
625
  }
626

627
  /**
628
   * Generate CreateRegionGroupsProcedure and wait for it finished.
629
   *
630
   * @return SUCCESS_STATUS if all RegionGroups created successfully, CREATE_REGION_ERROR otherwise
631
   */
632
  public TSStatus createRegionGroups(
633
      TConsensusGroupType consensusGroupType, CreateRegionGroupsPlan createRegionGroupsPlan) {
634
    long procedureId =
×
635
        executor.submitProcedure(
×
636
            new CreateRegionGroupsProcedure(consensusGroupType, createRegionGroupsPlan));
637
    List<TSStatus> statusList = new ArrayList<>();
×
638
    boolean isSucceed =
×
639
        waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
640
    if (isSucceed) {
×
641
      return RpcUtils.SUCCESS_STATUS;
×
642
    } else {
643
      return new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode())
×
644
          .setMessage(statusList.get(0).getMessage());
×
645
    }
646
  }
647

648
  /**
649
   * Generate CreateTriggerProcedure and wait for it finished.
650
   *
651
   * @return SUCCESS_STATUS if trigger created successfully, CREATE_TRIGGER_ERROR otherwise
652
   */
653
  public TSStatus createTrigger(TriggerInformation triggerInformation, Binary jarFile) {
654
    final CreateTriggerProcedure createTriggerProcedure =
×
655
        new CreateTriggerProcedure(triggerInformation, jarFile);
656
    try {
657
      if (jarFile != null
×
658
          && new UpdateProcedurePlan(createTriggerProcedure).getSerializedSize() > planSizeLimit) {
×
659
        return new TSStatus(TSStatusCode.CREATE_TRIGGER_ERROR.getStatusCode())
×
660
            .setMessage(
×
661
                String.format(
×
662
                    "Fail to create trigger[%s], the size of Jar is too large, you can increase the value of property 'config_node_ratis_log_appender_buffer_size_max' on ConfigNode",
663
                    triggerInformation.getTriggerName()));
×
664
      }
665
    } catch (IOException e) {
×
666
      return new TSStatus(TSStatusCode.CREATE_TRIGGER_ERROR.getStatusCode())
×
667
          .setMessage(e.getMessage());
×
668
    }
×
669

670
    long procedureId = executor.submitProcedure(createTriggerProcedure);
×
671
    List<TSStatus> statusList = new ArrayList<>();
×
672
    boolean isSucceed =
×
673
        waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
674
    if (isSucceed) {
×
675
      return RpcUtils.SUCCESS_STATUS;
×
676
    } else {
677
      return new TSStatus(TSStatusCode.CREATE_TRIGGER_ERROR.getStatusCode())
×
678
          .setMessage(statusList.get(0).getMessage());
×
679
    }
680
  }
681

682
  /**
683
   * Generate DropTriggerProcedure and wait for it finished.
684
   *
685
   * @return SUCCESS_STATUS if trigger dropped successfully, DROP_TRIGGER_ERROR otherwise
686
   */
687
  public TSStatus dropTrigger(String triggerName) {
688
    long procedureId = executor.submitProcedure(new DropTriggerProcedure(triggerName));
×
689
    List<TSStatus> statusList = new ArrayList<>();
×
690
    boolean isSucceed =
×
691
        waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
692
    if (isSucceed) {
×
693
      return RpcUtils.SUCCESS_STATUS;
×
694
    } else {
695
      return new TSStatus(TSStatusCode.DROP_TRIGGER_ERROR.getStatusCode())
×
696
          .setMessage(statusList.get(0).getMessage());
×
697
    }
698
  }
699

700
  public TSStatus createCQ(TCreateCQReq req, ScheduledExecutorService scheduledExecutor) {
701
    long procedureId = executor.submitProcedure(new CreateCQProcedure(req, scheduledExecutor));
×
702
    List<TSStatus> statusList = new ArrayList<>();
×
703
    waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
704
    return statusList.get(0);
×
705
  }
706

707
  public TSStatus createModel(
708
      ModelInformation modelInformation, Map<String, String> hyperparameters) {
709
    long procedureId =
×
710
        executor.submitProcedure(new CreateModelProcedure(modelInformation, hyperparameters));
×
711
    List<TSStatus> statusList = new ArrayList<>();
×
712
    boolean isSucceed =
×
713
        waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
714
    if (isSucceed) {
×
715
      return RpcUtils.SUCCESS_STATUS;
×
716
    } else {
717
      return new TSStatus(TSStatusCode.CREATE_MODEL_ERROR.getStatusCode())
×
718
          .setMessage(statusList.get(0).getMessage());
×
719
    }
720
  }
721

722
  public TSStatus dropModel(String modelId) {
723
    long procedureId = executor.submitProcedure(new DropModelProcedure(modelId));
×
724
    List<TSStatus> statusList = new ArrayList<>();
×
725
    boolean isSucceed =
×
726
        waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
727
    if (isSucceed) {
×
728
      return RpcUtils.SUCCESS_STATUS;
×
729
    } else {
730
      return new TSStatus(TSStatusCode.DROP_MODEL_ERROR.getStatusCode())
×
731
          .setMessage(statusList.get(0).getMessage());
×
732
    }
733
  }
734

735
  public TSStatus createPipePlugin(PipePluginMeta pipePluginMeta, byte[] jarFile) {
736
    final CreatePipePluginProcedure createPipePluginProcedure =
×
737
        new CreatePipePluginProcedure(pipePluginMeta, jarFile);
738
    try {
739
      if (jarFile != null
×
740
          && new UpdateProcedurePlan(createPipePluginProcedure).getSerializedSize()
×
741
              > planSizeLimit) {
742
        return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode())
×
743
            .setMessage(
×
744
                String.format(
×
745
                    "Fail to create pipe plugin[%s], the size of Jar is too large, you can increase the value of property 'config_node_ratis_log_appender_buffer_size_max' on ConfigNode",
746
                    pipePluginMeta.getPluginName()));
×
747
      }
748
    } catch (IOException e) {
×
749
      return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode())
×
750
          .setMessage(e.getMessage());
×
751
    }
×
752

753
    long procedureId = executor.submitProcedure(createPipePluginProcedure);
×
754
    List<TSStatus> statusList = new ArrayList<>();
×
755
    boolean isSucceed =
×
756
        waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
757
    if (isSucceed) {
×
758
      return RpcUtils.SUCCESS_STATUS;
×
759
    } else {
760
      return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode())
×
761
          .setMessage(statusList.get(0).getMessage());
×
762
    }
763
  }
764

765
  public TSStatus dropPipePlugin(String pluginName) {
766
    long procedureId = executor.submitProcedure(new DropPipePluginProcedure(pluginName));
×
767
    List<TSStatus> statusList = new ArrayList<>();
×
768
    boolean isSucceed =
×
769
        waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
770
    if (isSucceed) {
×
771
      return RpcUtils.SUCCESS_STATUS;
×
772
    } else {
773
      return new TSStatus(TSStatusCode.DROP_PIPE_PLUGIN_ERROR.getStatusCode())
×
774
          .setMessage(statusList.get(0).getMessage());
×
775
    }
776
  }
777

778
  public TSStatus createPipe(TCreatePipeReq req) {
779
    try {
780
      long procedureId = executor.submitProcedure(new CreatePipeProcedureV2(req));
×
781
      List<TSStatus> statusList = new ArrayList<>();
×
782
      boolean isSucceed =
×
783
          waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
784
      if (isSucceed) {
×
785
        return RpcUtils.SUCCESS_STATUS;
×
786
      } else {
787
        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
×
788
            .setMessage(statusList.get(0).getMessage());
×
789
      }
790
    } catch (Exception e) {
×
791
      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
×
792
    }
793
  }
794

795
  public TSStatus startPipe(String pipeName) {
796
    try {
797
      long procedureId = executor.submitProcedure(new StartPipeProcedureV2(pipeName));
×
798
      List<TSStatus> statusList = new ArrayList<>();
×
799
      boolean isSucceed =
×
800
          waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
801
      if (isSucceed) {
×
802
        return RpcUtils.SUCCESS_STATUS;
×
803
      } else {
804
        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
×
805
            .setMessage(statusList.get(0).getMessage());
×
806
      }
807
    } catch (Exception e) {
×
808
      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
×
809
    }
810
  }
811

812
  public TSStatus stopPipe(String pipeName) {
813
    try {
814
      long procedureId = executor.submitProcedure(new StopPipeProcedureV2(pipeName));
×
815
      List<TSStatus> statusList = new ArrayList<>();
×
816
      boolean isSucceed =
×
817
          waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
818
      if (isSucceed) {
×
819
        return RpcUtils.SUCCESS_STATUS;
×
820
      } else {
821
        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
×
822
            .setMessage(statusList.get(0).getMessage());
×
823
      }
824
    } catch (Exception e) {
×
825
      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
×
826
    }
827
  }
828

829
  public TSStatus dropPipe(String pipeName) {
830
    try {
831
      long procedureId = executor.submitProcedure(new DropPipeProcedureV2(pipeName));
×
832
      List<TSStatus> statusList = new ArrayList<>();
×
833
      boolean isSucceed =
×
834
          waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
835
      if (isSucceed) {
×
836
        return RpcUtils.SUCCESS_STATUS;
×
837
      } else {
838
        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
×
839
            .setMessage(statusList.get(0).getMessage());
×
840
      }
841
    } catch (Exception e) {
×
842
      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
×
843
    }
844
  }
845

846
  public void pipeHandleLeaderChange(
847
      Map<TConsensusGroupId, Pair<Integer, Integer>> dataRegionGroupToOldAndNewLeaderPairMap) {
848
    try {
849
      final long procedureId =
×
850
          executor.submitProcedure(
×
851
              new PipeHandleLeaderChangeProcedure(dataRegionGroupToOldAndNewLeaderPairMap));
852
      LOGGER.info("PipeHandleLeaderChangeProcedure was submitted, procedureId: {}.", procedureId);
×
853
    } catch (Exception e) {
×
854
      LOGGER.warn("PipeHandleLeaderChangeProcedure was failed to submit.", e);
×
855
    }
×
856
  }
×
857

858
  public void pipeHandleMetaChange(
859
      boolean needWriteConsensusOnConfigNodes, boolean needPushPipeMetaToDataNodes) {
860
    try {
861
      final long procedureId =
×
862
          executor.submitProcedure(
×
863
              new PipeHandleMetaChangeProcedure(
864
                  needWriteConsensusOnConfigNodes, needPushPipeMetaToDataNodes));
865
      LOGGER.info("PipeHandleMetaChangeProcedure was submitted, procedureId: {}.", procedureId);
×
866
    } catch (Exception e) {
×
867
      LOGGER.warn("PipeHandleMetaChangeProcedure was failed to submit.", e);
×
868
    }
×
869
  }
×
870

871
  public TSStatus pipeMetaSync() {
872
    try {
873
      final long procedureId = executor.submitProcedure(new PipeMetaSyncProcedure());
×
874
      final List<TSStatus> statusList = new ArrayList<>();
×
875
      final boolean isSucceed =
×
876
          waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
×
877
      if (isSucceed) {
×
878
        return RpcUtils.SUCCESS_STATUS;
×
879
      } else {
880
        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
×
881
            .setMessage(statusList.get(0).getMessage());
×
882
      }
883
    } catch (Exception e) {
×
884
      return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
×
885
    }
886
  }
887

888
  /**
889
   * Waiting until the specific procedures finished.
890
   *
891
   * @param procedureIds The specific procedures' index
892
   * @param statusList The corresponding running results of these procedures
893
   * @return True if all Procedures finished successfully, false otherwise
894
   */
895
  private boolean waitingProcedureFinished(List<Long> procedureIds, List<TSStatus> statusList) {
896
    boolean isSucceed = true;
×
897
    for (long procedureId : procedureIds) {
×
898
      long startTimeForCurrentProcedure = System.currentTimeMillis();
×
899
      while (executor.isRunning()
×
900
          && !executor.isFinished(procedureId)
×
901
          && TimeUnit.MILLISECONDS.toSeconds(
×
902
                  System.currentTimeMillis() - startTimeForCurrentProcedure)
×
903
              < PROCEDURE_WAIT_TIME_OUT) {
904
        sleepWithoutInterrupt(PROCEDURE_WAIT_RETRY_TIMEOUT);
×
905
      }
906
      Procedure<ConfigNodeProcedureEnv> finishedProcedure =
×
907
          executor.getResultOrProcedure(procedureId);
×
908
      if (!finishedProcedure.isFinished()) {
×
909
        // the procedure is still executing
910
        statusList.add(RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK));
×
911
        isSucceed = false;
×
912
        continue;
×
913
      }
914
      if (finishedProcedure.isSuccess()) {
×
915
        statusList.add(StatusUtils.OK);
×
916
      } else {
917
        if (finishedProcedure.getException().getCause() instanceof IoTDBException) {
×
918
          IoTDBException e = (IoTDBException) finishedProcedure.getException().getCause();
×
919
          if (e instanceof BatchProcessException) {
×
920
            statusList.add(
×
921
                RpcUtils.getStatus(
×
922
                    Arrays.stream(((BatchProcessException) e).getFailingStatus())
×
923
                        .collect(Collectors.toList())));
×
924
          } else {
925
            statusList.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
×
926
          }
927
        } else {
×
928
          statusList.add(
×
929
              StatusUtils.EXECUTE_STATEMENT_ERROR.setMessage(
×
930
                  finishedProcedure.getException().getMessage()));
×
931
        }
932
        isSucceed = false;
×
933
      }
934
    }
×
935
    return isSucceed;
×
936
  }
937

938
  public static void sleepWithoutInterrupt(final long timeToSleep) {
939
    long currentTime = System.currentTimeMillis();
×
940
    long endTime = timeToSleep + currentTime;
×
941
    boolean interrupted = false;
×
942
    while (currentTime < endTime) {
×
943
      try {
944
        Thread.sleep(endTime - currentTime);
×
945
      } catch (InterruptedException e) {
×
946
        interrupted = true;
×
947
      }
×
948
      currentTime = System.currentTimeMillis();
×
949
    }
950
    if (interrupted) {
×
951
      Thread.currentThread().interrupt();
×
952
    }
953
  }
×
954

955
  // ======================================================
956
  /*
957
     GET-SET Region
958
  */
959
  // ======================================================
960
  public IManager getConfigManager() {
961
    return configManager;
×
962
  }
963

964
  public ProcedureExecutor<ConfigNodeProcedureEnv> getExecutor() {
965
    return executor;
×
966
  }
967

968
  public void setExecutor(ProcedureExecutor<ConfigNodeProcedureEnv> executor) {
969
    this.executor = executor;
×
970
  }
×
971

972
  public ProcedureScheduler getScheduler() {
973
    return scheduler;
×
974
  }
975

976
  public void setScheduler(ProcedureScheduler scheduler) {
977
    this.scheduler = scheduler;
×
978
  }
×
979

980
  public IProcedureStore getStore() {
981
    return store;
×
982
  }
983

984
  public void setStore(ProcedureStore store) {
985
    this.store = store;
×
986
  }
×
987

988
  public ConfigNodeProcedureEnv getEnv() {
989
    return env;
×
990
  }
991

992
  public void setEnv(ConfigNodeProcedureEnv env) {
993
    this.env = env;
×
994
  }
×
995

996
  public void reportRegionMigrateResult(TRegionMigrateResultReportReq req) {
997

998
    this.executor
×
999
        .getProcedures()
×
1000
        .values()
×
1001
        .forEach(
×
1002
            procedure -> {
1003
              if (procedure instanceof RegionMigrateProcedure) {
×
1004
                RegionMigrateProcedure regionMigrateProcedure = (RegionMigrateProcedure) procedure;
×
1005
                if (regionMigrateProcedure.getConsensusGroupId().equals(req.getRegionId())) {
×
1006
                  regionMigrateProcedure.notifyTheRegionMigrateFinished(req);
×
1007
                }
1008
              }
1009
            });
×
1010
  }
×
1011
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc