• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9999

05 Sep 2023 08:10AM CUT coverage: 47.669% (-0.03%) from 47.697%
#9999

push

travis_ci

web-flow
[IOTDB-6130] Delete data by specific pattern didn't work

80151 of 168139 relevant lines covered (47.67%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.15
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.procedure;
21

22
import org.apache.iotdb.commons.concurrent.ThreadName;
23
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
24
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
25
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
26
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
27
import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
28
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
29
import org.apache.iotdb.confignode.procedure.state.ProcedureState;
30
import org.apache.iotdb.confignode.procedure.store.IProcedureStore;
31

32
import com.google.common.base.Preconditions;
33
import org.slf4j.Logger;
34
import org.slf4j.LoggerFactory;
35

36
import java.io.IOException;
37
import java.util.ArrayDeque;
38
import java.util.ArrayList;
39
import java.util.Arrays;
40
import java.util.Deque;
41
import java.util.HashSet;
42
import java.util.List;
43
import java.util.Set;
44
import java.util.concurrent.ConcurrentHashMap;
45
import java.util.concurrent.CopyOnWriteArrayList;
46
import java.util.concurrent.TimeUnit;
47
import java.util.concurrent.atomic.AtomicBoolean;
48
import java.util.concurrent.atomic.AtomicInteger;
49
import java.util.concurrent.atomic.AtomicLong;
50
import java.util.concurrent.atomic.AtomicReference;
51

52
public class ProcedureExecutor<Env> {
53
  private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class);
1✔
54

55
  private final ConcurrentHashMap<Long, CompletedProcedureContainer<Env>> completed =
1✔
56
      new ConcurrentHashMap<>();
57

58
  private final ConcurrentHashMap<Long, RootProcedureStack<Env>> rollbackStack =
1✔
59
      new ConcurrentHashMap<>();
60

61
  private final ConcurrentHashMap<Long, Procedure> procedures = new ConcurrentHashMap<>();
1✔
62

63
  private ThreadGroup threadGroup;
64

65
  private CopyOnWriteArrayList<WorkerThread> workerThreads;
66

67
  private TimeoutExecutorThread<Env> timeoutExecutor;
68

69
  private TimeoutExecutorThread<Env> workerMonitorExecutor;
70

71
  private int corePoolSize;
72
  private int maxPoolSize;
73

74
  private final ProcedureScheduler scheduler;
75

76
  private final AtomicLong lastProcId = new AtomicLong(-1);
1✔
77
  private final AtomicLong workId = new AtomicLong(0);
1✔
78
  private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
1✔
79
  private final AtomicBoolean running = new AtomicBoolean(false);
1✔
80
  private final Env environment;
81
  private final IProcedureStore store;
82

83
  /**
   * Creates an executor bound to the given environment, store and scheduler.
   *
   * @param environment environment handed to every procedure callback
   * @param store procedure store used for persistence
   * @param scheduler scheduler that queues runnable procedures
   */
  public ProcedureExecutor(
      final Env environment, final IProcedureStore store, final ProcedureScheduler scheduler) {
    this.store = store;
    this.scheduler = scheduler;
    this.environment = environment;
    // lastProcId starts at -1; moving it to 0 makes the first id from nextProcId() equal to 1.
    lastProcId.getAndIncrement();
  }
90

91
  /**
   * Creates an executor that queues procedures on a new {@link SimpleProcedureScheduler}.
   *
   * @param environment environment handed to every procedure callback
   * @param store procedure store used for persistence
   */
  public ProcedureExecutor(final Env environment, final IProcedureStore store) {
    this(environment, store, new SimpleProcedureScheduler());
  }
94

95
  public void init(int numThreads) {
96
    this.corePoolSize = numThreads;
1✔
97
    this.maxPoolSize = 10 * numThreads;
1✔
98
    this.threadGroup = new ThreadGroup(ThreadName.CONFIG_NODE_PROCEDURE_WORKER.getName());
1✔
99
    this.timeoutExecutor =
1✔
100
        new TimeoutExecutorThread<>(
101
            this, threadGroup, ThreadName.CONFIG_NODE_TIMEOUT_EXECUTOR.getName());
1✔
102
    this.workerMonitorExecutor =
1✔
103
        new TimeoutExecutorThread<>(
104
            this, threadGroup, ThreadName.CONFIG_NODE_WORKER_THREAD_MONITOR.getName());
1✔
105
    workId.set(0);
1✔
106
    workerThreads = new CopyOnWriteArrayList<>();
1✔
107
    for (int i = 0; i < corePoolSize; i++) {
1✔
108
      workerThreads.add(new WorkerThread(threadGroup));
1✔
109
    }
110
    // Add worker monitor
111
    workerMonitorExecutor.add(new WorkerMonitor());
1✔
112

113
    scheduler.start();
1✔
114
    recover();
1✔
115
  }
1✔
116

117
  private void recover() {
118
    // 1.Build rollback stack
119
    int runnableCount = 0;
1✔
120
    int failedCount = 0;
1✔
121
    int waitingCount = 0;
1✔
122
    int waitingTimeoutCount = 0;
1✔
123
    List<Procedure> procedureList = new ArrayList<>();
1✔
124
    // Load procedure wal file
125
    store.load(procedureList);
1✔
126
    for (Procedure<Env> proc : procedureList) {
1✔
127
      if (proc.isFinished()) {
1✔
128
        completed.putIfAbsent(proc.getProcId(), new CompletedProcedureContainer(proc));
1✔
129
      } else {
130
        if (!proc.hasParent()) {
1✔
131
          rollbackStack.put(proc.getProcId(), new RootProcedureStack<>());
1✔
132
        }
133
      }
134
      procedures.putIfAbsent(proc.getProcId(), proc);
1✔
135
      switch (proc.getState()) {
1✔
136
        case RUNNABLE:
137
          runnableCount++;
1✔
138
          break;
1✔
139
        case FAILED:
140
          failedCount++;
×
141
          break;
×
142
        case WAITING:
143
          waitingCount++;
1✔
144
          break;
1✔
145
        case WAITING_TIMEOUT:
146
          waitingTimeoutCount++;
×
147
          break;
×
148
        default:
149
          break;
150
      }
151
    }
1✔
152
    List<Procedure<Env>> runnableList = new ArrayList<>(runnableCount);
1✔
153
    List<Procedure<Env>> failedList = new ArrayList<>(failedCount);
1✔
154
    List<Procedure<Env>> waitingList = new ArrayList<>(waitingCount);
1✔
155
    List<Procedure<Env>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount);
1✔
156
    for (Procedure<Env> proc : procedureList) {
1✔
157
      if (proc.isFinished() && !proc.hasParent()) {
1✔
158
        continue;
×
159
      }
160
      long rootProcedureId = getRootProcId(proc);
1✔
161
      if (proc.hasParent()) {
1✔
162
        Procedure<Env> parent = procedures.get(proc.getParentProcId());
1✔
163
        if (parent != null && !proc.isFinished()) {
1✔
164
          parent.incChildrenLatch();
1✔
165
        }
166
      }
167
      RootProcedureStack rootStack = rollbackStack.get(rootProcedureId);
1✔
168
      if (rootStack != null) {
1✔
169
        rootStack.loadStack(proc);
1✔
170
      }
171
      proc.setRootProcedureId(rootProcedureId);
1✔
172
      switch (proc.getState()) {
1✔
173
        case RUNNABLE:
174
          runnableList.add(proc);
1✔
175
          break;
1✔
176
        case FAILED:
177
          failedList.add(proc);
×
178
          break;
×
179
        case WAITING:
180
          waitingList.add(proc);
1✔
181
          break;
1✔
182
        case WAITING_TIMEOUT:
183
          waitingTimeoutList.add(proc);
×
184
          break;
×
185
        case ROLLEDBACK:
186
        case INITIALIZING:
187
          LOG.error("Unexpected state:{} for {}", proc.getState(), proc);
×
188
          throw new UnsupportedOperationException("Unexpected state");
×
189
        default:
190
          break;
191
      }
192
    }
1✔
193

194
    waitingList.forEach(
1✔
195
        procedure -> {
196
          if (procedure.hasChildren()) {
1✔
197
            procedure.setState(ProcedureState.RUNNABLE);
1✔
198
            runnableList.add(procedure);
1✔
199
          } else {
200
            procedure.afterRecover(environment);
×
201
          }
202
        });
1✔
203
    restoreLocks();
1✔
204

205
    waitingTimeoutList.forEach(
1✔
206
        procedure -> {
207
          procedure.afterRecover(environment);
×
208
          timeoutExecutor.add(procedure);
×
209
        });
×
210

211
    failedList.forEach(scheduler::addBack);
1✔
212
    runnableList.forEach(
1✔
213
        procedure -> {
214
          procedure.afterRecover(environment);
1✔
215
          scheduler.addBack(procedure);
1✔
216
        });
1✔
217
    scheduler.signalAll();
1✔
218
  }
1✔
219

220
  public long getRootProcId(Procedure proc) {
221
    return Procedure.getRootProcedureId(procedures, proc);
1✔
222
  }
223

224
  private void releaseLock(Procedure<Env> procedure, boolean force) {
225
    if (force || !procedure.holdLock(this.environment) || procedure.isFinished()) {
1✔
226
      procedure.doReleaseLock(this.environment, store);
1✔
227
    }
228
  }
1✔
229

230
  private void restoreLock(Procedure procedure, Set<Long> restored) {
231
    procedure.restoreLock(environment);
1✔
232
    restored.add(procedure.getProcId());
1✔
233
  }
1✔
234

235
  private void restoreLocks(Deque<Procedure<Env>> stack, Set<Long> restored) {
236
    while (!stack.isEmpty()) {
1✔
237
      restoreLock(stack.pop(), restored);
1✔
238
    }
239
  }
1✔
240

241
  private void restoreLocks() {
242
    Set<Long> restored = new HashSet<>();
1✔
243
    Deque<Procedure<Env>> stack = new ArrayDeque<>();
1✔
244
    procedures
1✔
245
        .values()
1✔
246
        .forEach(
1✔
247
            procedure -> {
248
              while (procedure != null) {
1✔
249
                if (restored.contains(procedure.getProcId())) {
1✔
250
                  restoreLocks(stack, restored);
1✔
251
                  return;
1✔
252
                }
253
                if (!procedure.hasParent()) {
1✔
254
                  restoreLock(procedure, restored);
1✔
255
                  restoreLocks(stack, restored);
1✔
256
                  return;
1✔
257
                }
258
                stack.push(procedure);
1✔
259
                procedure = procedures.get(procedure.getParentProcId());
1✔
260
              }
261
            });
×
262
  }
1✔
263

264
  public void startWorkers() {
265
    if (!running.compareAndSet(false, true)) {
1✔
266
      LOG.warn("Already running");
×
267
      return;
×
268
    }
269
    timeoutExecutor.start();
1✔
270
    workerMonitorExecutor.start();
1✔
271
    for (WorkerThread workerThread : workerThreads) {
1✔
272
      workerThread.start();
1✔
273
    }
1✔
274
  }
1✔
275

276
  public void startCompletedCleaner(long cleanTimeInterval, long cleanEvictTTL) {
277
    addInternalProcedure(
×
278
        new CompletedProcedureRecycler(store, completed, cleanTimeInterval, cleanEvictTTL));
279
  }
×
280

281
  private void addInternalProcedure(InternalProcedure interalProcedure) {
282
    if (interalProcedure == null) {
×
283
      return;
×
284
    }
285
    interalProcedure.setState(ProcedureState.WAITING_TIMEOUT);
×
286
    timeoutExecutor.add(interalProcedure);
×
287
  }
×
288

289
  public boolean removeInternalProcedure(InternalProcedure internalProcedure) {
290
    if (internalProcedure == null) {
×
291
      return true;
×
292
    }
293
    internalProcedure.setState(ProcedureState.SUCCESS);
×
294
    return timeoutExecutor.remove(internalProcedure);
×
295
  }
296

297
  /**
298
   * Get next Procedure id
299
   *
300
   * @return next procedure id
301
   */
302
  private long nextProcId() {
303
    long procId = lastProcId.incrementAndGet();
1✔
304
    if (procId < 0) {
1✔
305
      while (!lastProcId.compareAndSet(procId, 0)) {
×
306
        procId = lastProcId.get();
×
307
        if (procId >= 0) {
×
308
          break;
×
309
        }
310
      }
311
      while (procedures.containsKey(procId)) {
×
312
        procId = lastProcId.incrementAndGet();
×
313
      }
314
    }
315
    return procId;
1✔
316
  }
317

318
  /**
319
   * Executes procedure
320
   *
321
   * <p>Calls doExecute() if success and return subprocedures submit sub procs set the state to
322
   * WAITING, wait for all sub procs completed. else if no sub procs procedure completed
323
   * successfully set procedure's parent to RUNNABLE in case of failure start rollback of the
324
   * procedure.
325
   *
326
   * @param proc procedure
327
   */
328
  private void executeProcedure(Procedure<Env> proc) {
329
    if (proc.isFinished()) {
1✔
330
      LOG.debug("{} is already finished.", proc);
×
331
      return;
×
332
    }
333
    final Long rootProcId = getRootProcedureId(proc);
1✔
334
    if (rootProcId == null) {
1✔
335
      LOG.warn("Rollback because parent is done/rolledback, proc is {}", proc);
×
336
      executeRollback(proc);
×
337
      return;
×
338
    }
339
    RootProcedureStack<Env> rootProcStack = rollbackStack.get(rootProcId);
1✔
340
    if (rootProcStack == null) {
1✔
341
      LOG.warn("Rollback stack is null for {}", proc.getProcId());
×
342
      return;
×
343
    }
344
    do {
345
      if (!rootProcStack.acquire()) {
1✔
346
        if (rootProcStack.setRollback()) {
1✔
347
          switch (executeRootStackRollback(rootProcId, rootProcStack)) {
1✔
348
            case LOCK_ACQUIRED:
349
              break;
1✔
350
            case LOCK_EVENT_WAIT:
351
              LOG.info("LOCK_EVENT_WAIT rollback " + proc);
×
352
              rootProcStack.unsetRollback();
×
353
              break;
×
354
            case LOCK_YIELD_WAIT:
355
              rootProcStack.unsetRollback();
×
356
              scheduler.yield(proc);
×
357
              break;
×
358
            default:
359
              throw new UnsupportedOperationException();
×
360
          }
361
        } else {
362
          if (!proc.wasExecuted()) {
1✔
363
            switch (executeRollback(proc)) {
1✔
364
              case LOCK_ACQUIRED:
365
                break;
1✔
366
              case LOCK_EVENT_WAIT:
367
                LOG.info("LOCK_EVENT_WAIT can't rollback child running for {}", proc);
×
368
                break;
×
369
              case LOCK_YIELD_WAIT:
370
                scheduler.yield(proc);
×
371
                break;
×
372
              default:
373
                throw new UnsupportedOperationException();
×
374
            }
375
          }
376
        }
377
        break;
378
      }
379
      ProcedureLockState lockState = acquireLock(proc);
1✔
380
      switch (lockState) {
1✔
381
        case LOCK_ACQUIRED:
382
          executeProcedure(rootProcStack, proc);
1✔
383
          break;
1✔
384
        case LOCK_YIELD_WAIT:
385
        case LOCK_EVENT_WAIT:
386
          LOG.info("{} lockstate is {}", proc, lockState);
×
387
          break;
×
388
        default:
389
          throw new UnsupportedOperationException();
×
390
      }
391
      rootProcStack.release();
1✔
392

393
      if (proc.isSuccess()) {
1✔
394
        LOG.info("{} finished in {}ms successfully.", proc, proc.elapsedTime());
1✔
395
        if (proc.getProcId() == rootProcId) {
1✔
396
          rootProcedureCleanup(proc);
1✔
397
        } else {
398
          executeCompletionCleanup(proc);
1✔
399
        }
400
        return;
1✔
401
      }
402

403
    } while (rootProcStack.isFailed());
1✔
404
  }
1✔
405

406
  /**
407
   * execute procedure and submit its children
408
   *
409
   * @param rootProcStack procedure's root proc stack
410
   * @param proc procedure
411
   */
412
  private void executeProcedure(RootProcedureStack rootProcStack, Procedure<Env> proc) {
413
    Preconditions.checkArgument(
1✔
414
        proc.getState() == ProcedureState.RUNNABLE, "NOT RUNNABLE! " + proc);
1✔
415
    boolean suspended = false;
1✔
416
    boolean reExecute;
417

418
    Procedure<Env>[] subprocs = null;
1✔
419
    do {
420
      reExecute = false;
1✔
421
      proc.resetPersistance();
1✔
422
      try {
423
        subprocs = proc.doExecute(this.environment);
1✔
424
        if (subprocs != null && subprocs.length == 0) {
1✔
425
          subprocs = null;
1✔
426
        }
427
      } catch (ProcedureSuspendedException e) {
×
428
        LOG.debug("Suspend {}", proc);
×
429
        suspended = true;
×
430
      } catch (ProcedureYieldException e) {
×
431
        LOG.debug("Yield {}", proc);
×
432
        yieldProcedure(proc);
×
433
      } catch (InterruptedException e) {
×
434
        LOG.warn("Interrupt during execution, suspend or retry it later.", e);
×
435
        yieldProcedure(proc);
×
436
      } catch (Throwable e) {
1✔
437
        LOG.error("CODE-BUG:{}", proc, e);
1✔
438
        proc.setFailure(new ProcedureException(e.getMessage(), e));
1✔
439
      }
1✔
440

441
      if (!proc.isFailed()) {
1✔
442
        if (subprocs != null) {
1✔
443
          if (subprocs.length == 1 && subprocs[0] == proc) {
1✔
444
            subprocs = null;
1✔
445
            reExecute = true;
1✔
446
          } else {
447
            subprocs = initializeChildren(rootProcStack, proc, subprocs);
1✔
448
            LOG.info("Initialized sub procs:{}", Arrays.toString(subprocs));
1✔
449
          }
450
        } else if (proc.getState() == ProcedureState.WAITING_TIMEOUT) {
1✔
451
          LOG.info("Added into timeoutExecutor {}", proc);
×
452
        } else if (!suspended) {
1✔
453
          proc.setState(ProcedureState.SUCCESS);
1✔
454
        }
455
      }
456
      // add procedure into rollback stack.
457
      rootProcStack.addRollbackStep(proc);
1✔
458

459
      if (proc.needPersistance()) {
1✔
460
        updateStoreOnExecution(rootProcStack, proc, subprocs);
1✔
461
      }
462

463
      if (!store.isRunning()) {
1✔
464
        return;
×
465
      }
466

467
      if (proc.isRunnable() && !suspended && proc.isYieldAfterExecution(this.environment)) {
1✔
468
        yieldProcedure(proc);
×
469
        return;
×
470
      }
471
    } while (reExecute);
1✔
472

473
    if (subprocs != null && !proc.isFailed()) {
1✔
474
      submitChildrenProcedures(subprocs);
1✔
475
    }
476

477
    releaseLock(proc, false);
1✔
478
    if (!suspended && proc.isFinished() && proc.hasParent()) {
1✔
479
      countDownChildren(rootProcStack, proc);
1✔
480
    }
481
  }
1✔
482

483
  /**
484
   * Serve as a countdown latch to check whether all children has completed.
485
   *
486
   * @param rootProcStack root procedure stack
487
   * @param proc proc
488
   */
489
  private void countDownChildren(RootProcedureStack rootProcStack, Procedure<Env> proc) {
490
    Procedure<Env> parent = procedures.get(proc.getParentProcId());
1✔
491
    if (parent == null && rootProcStack.isRollingback()) {
1✔
492
      return;
×
493
    }
494
    if (parent != null && parent.tryRunnable()) {
1✔
495
      // If success, means all its children have completed, move parent to front of the queue.
496
      store.update(parent);
1✔
497
      scheduler.addFront(parent);
1✔
498
      LOG.info(
1✔
499
          "Finished subprocedure pid={}, resume processing ppid={}",
500
          proc.getProcId(),
1✔
501
          parent.getProcId());
1✔
502
    }
503
  }
1✔
504

505
  /**
506
   * Submit children procedures.
507
   *
508
   * @param subprocs children procedures
509
   */
510
  private void submitChildrenProcedures(Procedure<Env>[] subprocs) {
511
    for (Procedure<Env> subproc : subprocs) {
1✔
512
      procedures.put(subproc.getProcId(), subproc);
1✔
513
      scheduler.addFront(subproc);
1✔
514
    }
515
  }
1✔
516

517
  private void updateStoreOnExecution(
518
      RootProcedureStack rootProcStack, Procedure<Env> proc, Procedure<Env>[] subprocs) {
519
    if (subprocs != null && !proc.isFailed()) {
1✔
520
      if (LOG.isDebugEnabled()) {
1✔
521
        LOG.debug("Stored {}, children {}", proc, Arrays.toString(subprocs));
×
522
      }
523
      store.update(subprocs);
1✔
524
    } else {
525
      LOG.debug("Store update {}", proc);
1✔
526
      if (proc.isFinished() && !proc.hasParent()) {
1✔
527
        final long[] childProcIds = rootProcStack.getSubprocedureIds();
1✔
528
        if (childProcIds != null) {
1✔
529
          store.delete(childProcIds);
1✔
530
          for (long childProcId : childProcIds) {
1✔
531
            procedures.remove(childProcId);
1✔
532
          }
533
        } else {
534
          store.update(proc);
×
535
        }
536
      } else {
1✔
537
        store.update(proc);
1✔
538
      }
539
    }
540
  }
1✔
541

542
  private Procedure<Env>[] initializeChildren(
543
      RootProcedureStack rootProcStack, Procedure<Env> proc, Procedure<Env>[] subprocs) {
544
    final long rootProcedureId = getRootProcedureId(proc);
1✔
545
    for (int i = 0; i < subprocs.length; i++) {
1✔
546
      Procedure<Env> subproc = subprocs[i];
1✔
547
      if (subproc == null) {
1✔
548
        String errMsg = "subproc[" + i + "] is null, aborting procedure";
×
549
        proc.setFailure(new ProcedureException((errMsg), new IllegalArgumentException(errMsg)));
×
550
        return null;
×
551
      }
552
      subproc.setParentProcId(proc.getProcId());
1✔
553
      subproc.setRootProcId(rootProcedureId);
1✔
554
      subproc.setProcId(nextProcId());
1✔
555
      subproc.setProcRunnable();
1✔
556
      rootProcStack.addSubProcedure(subproc);
1✔
557
    }
558

559
    if (!proc.isFailed()) {
1✔
560
      proc.setChildrenLatch(subprocs.length);
1✔
561
      switch (proc.getState()) {
1✔
562
        case RUNNABLE:
563
          proc.setState(ProcedureState.WAITING);
1✔
564
          break;
1✔
565
        case WAITING_TIMEOUT:
566
          timeoutExecutor.add(proc);
×
567
          break;
×
568
        default:
569
          break;
570
      }
571
    }
572
    return subprocs;
1✔
573
  }
574

575
  private void yieldProcedure(Procedure<Env> proc) {
576
    releaseLock(proc, false);
×
577
    scheduler.yield(proc);
×
578
  }
×
579

580
  /**
581
   * Rollback full root procedure stack.
582
   *
583
   * @param rootProcId root procedure id
584
   * @param procedureStack root procedure stack
585
   * @return lock state
586
   */
587
  private ProcedureLockState executeRootStackRollback(
588
      Long rootProcId, RootProcedureStack procedureStack) {
589
    Procedure<Env> rootProcedure = procedures.get(rootProcId);
1✔
590
    ProcedureException exception = rootProcedure.getException();
1✔
591
    if (exception == null) {
1✔
592
      exception = procedureStack.getException();
1✔
593
      rootProcedure.setFailure(exception);
1✔
594
      store.update(rootProcedure);
1✔
595
    }
596
    List<Procedure<Env>> subprocStack = procedureStack.getSubproceduresStack();
1✔
597
    int stackTail = subprocStack.size();
1✔
598
    while (stackTail-- > 0) {
1✔
599
      Procedure<Env> procedure = subprocStack.get(stackTail);
1✔
600
      if (procedure.isSuccess()) {
1✔
601
        subprocStack.remove(stackTail);
1✔
602
        cleanupAfterRollback(procedure);
1✔
603
        continue;
1✔
604
      }
605
      ProcedureLockState lockState = acquireLock(procedure);
1✔
606
      if (lockState != ProcedureLockState.LOCK_ACQUIRED) {
1✔
607
        return lockState;
×
608
      }
609
      lockState = executeRollback(procedure);
1✔
610
      releaseLock(procedure, false);
1✔
611

612
      boolean abortRollback = lockState != ProcedureLockState.LOCK_ACQUIRED;
1✔
613
      abortRollback |= !isRunning() || !store.isRunning();
1✔
614
      if (abortRollback) {
1✔
615
        return lockState;
×
616
      }
617

618
      if (!procedure.isFinished() && procedure.isYieldAfterExecution(this.environment)) {
1✔
619
        return ProcedureLockState.LOCK_YIELD_WAIT;
×
620
      }
621

622
      if (procedure != rootProcedure) {
1✔
623
        executeCompletionCleanup(procedure);
1✔
624
      }
625
    }
1✔
626

627
    LOG.info("Rolled back {}, time duration is {}", rootProcedure, rootProcedure.elapsedTime());
1✔
628
    rootProcedureCleanup(rootProcedure);
1✔
629
    return ProcedureLockState.LOCK_ACQUIRED;
1✔
630
  }
631

632
  private ProcedureLockState acquireLock(Procedure<Env> proc) {
633
    if (proc.hasLock()) {
1✔
634
      return ProcedureLockState.LOCK_ACQUIRED;
×
635
    }
636
    return proc.doAcquireLock(this.environment, store);
1✔
637
  }
638

639
  /**
640
   * do execute defined in procedure and then update store or remove completely in case it is a
641
   * child.
642
   *
643
   * @param procedure procedure
644
   * @return procedure lock state
645
   */
646
  private ProcedureLockState executeRollback(Procedure<Env> procedure) {
647
    try {
648
      procedure.doRollback(this.environment);
1✔
649
    } catch (IOException e) {
×
650
      LOG.error("Roll back failed for {}", procedure, e);
×
651
    } catch (InterruptedException e) {
×
652
      LOG.warn("Interrupted exception occured for {}", procedure, e);
×
653
    } catch (Throwable t) {
×
654
      LOG.error("CODE-BUG: runtime exception for {}", procedure, t);
×
655
    }
1✔
656
    cleanupAfterRollback(procedure);
1✔
657
    return ProcedureLockState.LOCK_ACQUIRED;
1✔
658
  }
659

660
  private void cleanupAfterRollback(Procedure<Env> procedure) {
661
    if (procedure.removeStackIndex()) {
1✔
662
      if (!procedure.isSuccess()) {
1✔
663
        procedure.setState(ProcedureState.ROLLEDBACK);
1✔
664
      }
665
      if (procedure.hasParent()) {
1✔
666
        store.delete(procedure.getProcId());
1✔
667
        procedures.remove(procedure.getProcId());
1✔
668
      } else {
669
        final long[] childProcIds = rollbackStack.get(procedure.getProcId()).getSubprocedureIds();
1✔
670
        if (childProcIds != null) {
1✔
671
          store.delete(childProcIds);
1✔
672
        } else {
673
          store.update(procedure);
×
674
        }
675
      }
1✔
676
    } else {
677
      store.update(procedure);
1✔
678
    }
679
  }
1✔
680

681
  private void executeCompletionCleanup(Procedure<Env> proc) {
682
    if (proc.hasLock()) {
1✔
683
      releaseLock(proc, true);
×
684
    }
685
    try {
686
      proc.completionCleanup(this.environment);
1✔
687
    } catch (Throwable e) {
×
688
      LOG.error("CODE-BUG:Uncaught runtime exception for procedure {}", proc, e);
×
689
    }
1✔
690
  }
1✔
691

692
  private void rootProcedureCleanup(Procedure<Env> proc) {
693
    executeCompletionCleanup(proc);
1✔
694
    CompletedProcedureContainer<Env> retainer = new CompletedProcedureContainer<>(proc);
1✔
695
    completed.put(proc.getProcId(), retainer);
1✔
696
    rollbackStack.remove(proc.getProcId());
1✔
697
    procedures.remove(proc.getProcId());
1✔
698
  }
1✔
699

700
  private Long getRootProcedureId(Procedure<Env> proc) {
701
    return Procedure.getRootProcedureId(procedures, proc);
1✔
702
  }
703

704
  /**
705
   * Add a Procedure to executor.
706
   *
707
   * @param procedure procedure
708
   * @return procedure id
709
   */
710
  private long pushProcedure(Procedure<Env> procedure) {
711
    final long currentProcId = procedure.getProcId();
1✔
712
    RootProcedureStack<Env> stack = new RootProcedureStack<>();
1✔
713
    rollbackStack.put(currentProcId, stack);
1✔
714
    procedures.put(currentProcId, procedure);
1✔
715
    scheduler.addBack(procedure);
1✔
716
    return procedure.getProcId();
1✔
717
  }
718

719
  private class WorkerThread extends StoppableThread {
720
    private final AtomicLong startTime = new AtomicLong(Long.MAX_VALUE);
1✔
721
    private final AtomicReference<Procedure<Env>> activeProcedure = new AtomicReference<>();
1✔
722
    protected long keepAliveTime = -1;
1✔
723

724
    public WorkerThread(ThreadGroup threadGroup) {
725
      this(threadGroup, "ProcExecWorker-");
1✔
726
    }
1✔
727

728
    public WorkerThread(ThreadGroup threadGroup, String prefix) {
1✔
729
      super(threadGroup, prefix + workId.incrementAndGet());
1✔
730
      setDaemon(true);
1✔
731
    }
1✔
732

733
    @Override
734
    public void sendStopSignal() {
735
      scheduler.signalAll();
×
736
    }
×
737

738
    @Override
739
    public void run() {
740
      long lastUpdated = System.currentTimeMillis();
1✔
741
      try {
742
        while (isRunning() && keepAlive(lastUpdated)) {
1✔
743
          Procedure<Env> procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
1✔
744
          if (procedure == null) {
1✔
745
            continue;
1✔
746
          }
747
          this.activeProcedure.set(procedure);
1✔
748
          int activeCount = activeExecutorCount.incrementAndGet();
1✔
749
          startTime.set(System.currentTimeMillis());
1✔
750
          executeProcedure(procedure);
1✔
751
          activeCount = activeExecutorCount.decrementAndGet();
1✔
752
          LOG.trace("Halt pid={}, activeCount={}", procedure.getProcId(), activeCount);
1✔
753
          this.activeProcedure.set(null);
1✔
754
          lastUpdated = System.currentTimeMillis();
1✔
755
          startTime.set(lastUpdated);
1✔
756
        }
1✔
757

758
      } catch (Throwable throwable) {
×
759
        if (this.activeProcedure.get() != null) {
×
760
          LOG.warn("Worker terminated {}", this.activeProcedure.get(), throwable);
×
761
        }
762
      } finally {
763
        LOG.debug("Worker terminated.");
1✔
764
      }
765
      workerThreads.remove(this);
1✔
766
    }
1✔
767

768
    protected boolean keepAlive(long lastUpdated) {
769
      return true;
1✔
770
    }
771

772
    @Override
773
    public String toString() {
774
      Procedure<?> p = this.activeProcedure.get();
1✔
775
      return getName() + "(pid=" + (p == null ? Procedure.NO_PROC_ID : p.getProcId() + ")");
1✔
776
    }
777

778
    /** @return the time since the current procedure is running */
779
    public long getCurrentRunTime() {
780
      return System.currentTimeMillis() - startTime.get();
1✔
781
    }
782
  }
783

784
  // A worker thread which can be added when core workers are stuck. Will timeout after
785
  // keepAliveTime if there is no procedure to run.
786
  private final class KeepAliveWorkerThread extends WorkerThread {
787

788
    public KeepAliveWorkerThread(ThreadGroup group) {
1✔
789
      super(group, "KAProcExecWorker-");
1✔
790
      this.keepAliveTime = TimeUnit.SECONDS.toMillis(10);
1✔
791
    }
1✔
792

793
    @Override
794
    protected boolean keepAlive(long lastUpdate) {
795
      return System.currentTimeMillis() - lastUpdate < keepAliveTime;
1✔
796
    }
797
  }
798

799
  private final class WorkerMonitor extends InternalProcedure<Env> {
800
    private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec
801

802
    private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec
803

804
    private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck
805

806
    public WorkerMonitor() {
1✔
807
      super(DEFAULT_WORKER_MONITOR_INTERVAL);
1✔
808
      updateTimestamp();
1✔
809
    }
1✔
810

811
    private int checkForStuckWorkers() {
812
      // Check if any of the worker is stuck
813
      int stuckCount = 0;
1✔
814
      for (WorkerThread worker : workerThreads) {
1✔
815
        if (worker.activeProcedure.get() == null
1✔
816
            || worker.getCurrentRunTime() < DEFAULT_WORKER_STUCK_THRESHOLD) {
1✔
817
          continue;
1✔
818
        }
819

820
        // WARN the worker is stuck
821
        stuckCount++;
1✔
822
        LOG.warn("Worker stuck {}, run time {} ms", worker, worker.getCurrentRunTime());
1✔
823
      }
1✔
824
      return stuckCount;
1✔
825
    }
826

827
    private void checkThreadCount(final int stuckCount) {
828
      // Nothing to do if there are no runnable tasks
829
      if (stuckCount < 1 || !scheduler.hasRunnables()) {
1✔
830
        return;
1✔
831
      }
832
      // Add a new thread if the worker stuck percentage exceed the threshold limit
833
      // and every handler is active.
834
      final float stuckPerc = ((float) stuckCount) / workerThreads.size();
1✔
835
      // Let's add new worker thread more aggressively, as they will timeout finally if there is no
836
      // work to do.
837
      if (stuckPerc >= DEFAULT_WORKER_ADD_STUCK_PERCENTAGE && workerThreads.size() < maxPoolSize) {
1✔
838
        final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
1✔
839
        workerThreads.add(worker);
1✔
840
        worker.start();
1✔
841
        LOG.debug("Added new worker thread {}", worker);
1✔
842
      }
843
    }
1✔
844

845
    @Override
846
    protected void periodicExecute(Env env) {
847
      final int stuckCount = checkForStuckWorkers();
1✔
848
      checkThreadCount(stuckCount);
1✔
849
      updateTimestamp();
1✔
850
    }
1✔
851
  }
852

853
  public int getWorkerThreadCount() {
854
    return workerThreads.size();
1✔
855
  }
856

857
  public boolean isRunning() {
858
    return running.get();
1✔
859
  }
860

861
  public void stop() {
862
    if (!running.getAndSet(false)) {
1✔
863
      return;
×
864
    }
865
    LOG.info("Stopping");
1✔
866
    scheduler.stop();
1✔
867
    timeoutExecutor.sendStopSignal();
1✔
868
  }
1✔
869

870
  public void join() {
871
    timeoutExecutor.awaitTermination();
1✔
872
    workerMonitorExecutor.awaitTermination();
1✔
873
    for (WorkerThread workerThread : workerThreads) {
1✔
874
      workerThread.awaitTermination();
×
875
    }
×
876
    try {
877
      threadGroup.destroy();
1✔
878
    } catch (IllegalThreadStateException e) {
×
879
      LOG.warn(
×
880
          "ProcedureExecutor threadGroup {} contains running threads which are used by non-procedure module.",
881
          this.threadGroup);
882
      this.threadGroup.list();
×
883
    }
1✔
884
  }
1✔
885

886
  public boolean isStarted(long procId) {
887
    Procedure<Env> procedure = procedures.get(procId);
×
888
    if (procedure == null) {
×
889
      return completed.get(procId) != null;
×
890
    }
891
    return procedure.wasExecuted();
×
892
  }
893

894
  public boolean isFinished(final long procId) {
895
    return !procedures.containsKey(procId);
1✔
896
  }
897

898
  public ConcurrentHashMap<Long, Procedure> getProcedures() {
899
    return procedures;
1✔
900
  }
901

902
  // -----------------------------CLIENT IMPLEMENTATION-----------------------------------
903
  /**
904
   * Submit a new root-procedure to the executor, called by client.
905
   *
906
   * @param procedure root procedure
907
   * @return procedure id
908
   */
909
  public long submitProcedure(Procedure<Env> procedure) {
910
    Preconditions.checkArgument(lastProcId.get() >= 0);
1✔
911
    Preconditions.checkArgument(procedure.getState() == ProcedureState.INITIALIZING);
1✔
912
    Preconditions.checkArgument(!procedure.hasParent(), "Unexpected parent", procedure);
1✔
913
    final long currentProcId = nextProcId();
1✔
914
    // Initialize the procedure
915
    procedure.setProcId(currentProcId);
1✔
916
    procedure.setProcRunnable();
1✔
917
    // Commit the transaction
918
    store.update(procedure);
1✔
919
    LOG.debug("{} is stored.", procedure);
1✔
920
    // Add the procedure to the executor
921
    return pushProcedure(procedure);
1✔
922
  }
923

924
  /**
925
   * Abort a specified procedure.
926
   *
927
   * @param procId procedure id
928
   * @param force whether abort the running procdure.
929
   * @return true if the procedure exists and has received the abort.
930
   */
931
  public boolean abort(long procId, boolean force) {
932
    Procedure<Env> procedure = procedures.get(procId);
×
933
    if (procedure != null) {
×
934
      if (!force && procedure.wasExecuted()) {
×
935
        return false;
×
936
      }
937
      return procedure.abort(this.environment);
×
938
    }
939
    return false;
×
940
  }
941

942
  public boolean abort(long procId) {
943
    return abort(procId, true);
×
944
  }
945

946
  public Procedure<Env> getResult(long procId) {
947
    CompletedProcedureContainer retainer = completed.get(procId);
×
948
    if (retainer == null) {
×
949
      return null;
×
950
    } else {
951
      return retainer.getProcedure();
×
952
    }
953
  }
954

955
  /**
956
   * Query a procedure result.
957
   *
958
   * @param procId procedure id
959
   * @return procedure or retainer
960
   */
961
  public Procedure<Env> getResultOrProcedure(long procId) {
962
    CompletedProcedureContainer retainer = completed.get(procId);
1✔
963
    if (retainer == null) {
1✔
964
      return procedures.get(procId);
×
965
    } else {
966
      return retainer.getProcedure();
1✔
967
    }
968
  }
969

970
  public ProcedureScheduler getScheduler() {
971
    return scheduler;
1✔
972
  }
973

974
  public Env getEnvironment() {
975
    return environment;
1✔
976
  }
977

978
  public IProcedureStore getStore() {
979
    return store;
×
980
  }
981

982
  public RootProcedureStack<Env> getRollbackStack(long rootProcId) {
983
    return rollbackStack.get(rootProcId);
×
984
  }
985
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc