• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9967

30 Aug 2023 04:22PM CUT coverage: 47.7% (+0.04%) from 47.658%
#9967

push

travis_ci

web-flow
Pipe: Fix start-time and end-time parameters not working when extracting history data (#11001) (#11002)

(cherry picked from commit 35736cc67)

12 of 12 new or added lines in 6 files covered. (100.0%)

80165 of 168062 relevant lines covered (47.7%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.54
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.procedure;
21

22
import org.apache.iotdb.confignode.procedure.exception.ProcedureAbortedException;
23
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
24
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
25
import org.apache.iotdb.confignode.procedure.exception.ProcedureTimeoutException;
26
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
27
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
28
import org.apache.iotdb.confignode.procedure.state.ProcedureState;
29
import org.apache.iotdb.confignode.procedure.store.IProcedureStore;
30

31
import org.slf4j.Logger;
32
import org.slf4j.LoggerFactory;
33

34
import java.io.DataOutputStream;
35
import java.io.IOException;
36
import java.lang.reflect.InvocationTargetException;
37
import java.nio.ByteBuffer;
38
import java.nio.charset.StandardCharsets;
39
import java.util.ArrayList;
40
import java.util.Arrays;
41
import java.util.List;
42
import java.util.Map;
43
import java.util.concurrent.atomic.AtomicReference;
44

45
/**
46
 * Abstract class of all procedures.
47
 *
48
 * @param <Env> type of the environment object passed to execute(), rollback() and abort()
49
 */
50
public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
1✔
51
  private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
1✔
52
  public static final long NO_PROC_ID = -1;
53
  public static final long NO_TIMEOUT = -1;
54

55
  private long parentProcId = NO_PROC_ID;
1✔
56
  private long rootProcId = NO_PROC_ID;
1✔
57
  private long procId = NO_PROC_ID;
1✔
58
  private long submittedTime;
59

60
  private ProcedureState state = ProcedureState.INITIALIZING;
1✔
61
  private int childrenLatch = 0;
1✔
62
  private ProcedureException exception;
63

64
  private volatile long timeout = NO_TIMEOUT;
1✔
65
  private volatile long lastUpdate;
66

67
  private final AtomicReference<byte[]> result = new AtomicReference<>();
1✔
68
  private volatile boolean locked = false;
1✔
69
  private boolean lockedWhenLoading = false;
1✔
70

71
  private int[] stackIndexes = null;
1✔
72

73
  private boolean persist = true;
1✔
74

75
  public boolean needPersistance() {
76
    return this.persist;
1✔
77
  }
78

79
  public void resetPersistance() {
80
    this.persist = true;
1✔
81
  }
1✔
82

83
  public final void skipPersistance() {
84
    this.persist = false;
×
85
  }
×
86

87
  public final boolean hasLock() {
88
    return locked;
1✔
89
  }
90

91
  // User level code, override it if necessary
92

93
  /**
94
   * The main code of the procedure. It must be idempotent since execute() may be called multiple
95
   * times in case of machine failure in the middle of the execution.
96
   *
97
   * @param env the environment passed to the ProcedureExecutor
98
   * @return a set of sub-procedures to run or ourselves if there is more work to do or null if the
99
   *     procedure is done.
100
   * @throws ProcedureYieldException the procedure will be added back to the queue and retried
101
   *     later.
102
   * @throws InterruptedException the procedure will be added back to the queue and retried later.
103
   * @throws ProcedureSuspendedException Signal to the executor that Procedure has suspended itself
104
   *     and has set itself up waiting for an external event to wake it back up again.
105
   */
106
  protected abstract Procedure<Env>[] execute(Env env)
107
      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException;
108

109
  /**
110
   * The code to undo what was done by the execute() code. It is called when the procedure or one of
111
   * the sub-procedures failed or an abort was requested. It should cleanup all the resources
112
   * created by the execute() call. The implementation must be idempotent since rollback() may be
113
   * called multiple times in case of machine failure in the middle of the execution.
114
   *
115
   * @param env the environment passed to the ProcedureExecutor
116
   * @throws IOException temporary failure, the rollback will retry later
117
   * @throws InterruptedException the procedure will be added back to the queue and retried later
118
   */
119
  protected abstract void rollback(Env env)
120
      throws IOException, InterruptedException, ProcedureException;
121

122
  /**
123
   * The abort() call is asynchronous and each procedure must decide how to deal with it, if they
124
   * want to be abortable. The simplest implementation is to have an AtomicBoolean set in the
125
   * abort() method and then the execute() will check if the abort flag is set or not. abort() may
126
   * be called multiple times from the client, so the implementation must be idempotent.
127
   *
128
   * <p>NOTE: abort() is not like Thread.interrupt(). It is just a notification that allows the
129
   * procedure implementor to abort.
130
   */
131
  protected abstract boolean abort(Env env);
132

133
  public void serialize(DataOutputStream stream) throws IOException {
134
    // procid
135
    stream.writeLong(this.procId);
1✔
136
    // state
137
    stream.writeInt(this.state.ordinal());
1✔
138
    // submit time
139
    stream.writeLong(this.submittedTime);
1✔
140
    // last updated
141
    stream.writeLong(this.lastUpdate);
1✔
142
    // parent id
143
    stream.writeLong(this.parentProcId);
1✔
144
    // time out
145
    stream.writeLong(this.timeout);
1✔
146
    // stack indexes
147
    if (stackIndexes != null) {
1✔
148
      stream.writeInt(stackIndexes.length);
1✔
149
      for (int index : stackIndexes) {
1✔
150
        stream.writeInt(index);
1✔
151
      }
152
    } else {
153
      stream.writeInt(-1);
1✔
154
    }
155

156
    // exceptions
157
    if (hasException()) {
1✔
158
      stream.write((byte) 1);
×
159
      String exceptionClassName = exception.getClass().getName();
×
160
      byte[] exceptionClassNameBytes = exceptionClassName.getBytes(StandardCharsets.UTF_8);
×
161
      stream.writeInt(exceptionClassNameBytes.length);
×
162
      stream.write(exceptionClassNameBytes);
×
163
      String message = this.exception.getMessage();
×
164
      if (message != null) {
×
165
        byte[] messageBytes = message.getBytes(StandardCharsets.UTF_8);
×
166
        stream.writeInt(messageBytes.length);
×
167
        stream.write(messageBytes);
×
168
      } else {
×
169
        stream.writeInt(-1);
×
170
      }
171
    } else {
×
172
      stream.write((byte) 0);
1✔
173
    }
174

175
    // result
176
    if (result.get() != null) {
1✔
177
      stream.writeInt(result.get().length);
×
178
      stream.write(result.get());
×
179
    } else {
180
      stream.writeInt(-1);
1✔
181
    }
182

183
    // Has lock
184
    stream.write(this.hasLock() ? (byte) 1 : (byte) 0);
1✔
185
  }
1✔
186

187
  public void deserialize(ByteBuffer byteBuffer) {
188
    // procid
189
    this.setProcId(byteBuffer.getLong());
1✔
190
    // state
191
    this.setState(ProcedureState.values()[byteBuffer.getInt()]);
1✔
192
    //  submit time
193
    this.setSubmittedTime(byteBuffer.getLong());
1✔
194
    //  last updated
195
    this.setLastUpdate(byteBuffer.getLong());
1✔
196
    //  parent id
197
    this.setParentProcId(byteBuffer.getLong());
1✔
198
    //  time out
199
    this.setTimeout(byteBuffer.getLong());
1✔
200
    //  stack index
201
    int stackIndexesLen = byteBuffer.getInt();
1✔
202
    if (stackIndexesLen >= 0) {
1✔
203
      List<Integer> indexList = new ArrayList<>(stackIndexesLen);
1✔
204
      for (int i = 0; i < stackIndexesLen; i++) {
1✔
205
        indexList.add(byteBuffer.getInt());
1✔
206
      }
207
      this.setStackIndexes(indexList);
1✔
208
    }
209
    // exceptions
210
    if (byteBuffer.get() == 1) {
1✔
211
      Class<?> exceptionClass = deserializeTypeInfo(byteBuffer);
×
212
      int messageBytesLength = byteBuffer.getInt();
×
213
      String errMsg = null;
×
214
      if (messageBytesLength > 0) {
×
215
        byte[] messageBytes = new byte[messageBytesLength];
×
216
        byteBuffer.get(messageBytes);
×
217
        errMsg = new String(messageBytes, StandardCharsets.UTF_8);
×
218
      }
219
      ProcedureException exception;
220
      try {
221
        exception =
×
222
            (ProcedureException) exceptionClass.getConstructor(String.class).newInstance(errMsg);
×
223
      } catch (InstantiationException
×
224
          | IllegalAccessException
225
          | InvocationTargetException
226
          | NoSuchMethodException e) {
227
        LOG.warn("Instantiation exception class failed", e);
×
228
        exception = new ProcedureException(errMsg);
×
229
      }
×
230

231
      setFailure(exception);
×
232
    }
233

234
    // result
235
    int resultLen = byteBuffer.getInt();
1✔
236
    if (resultLen > 0) {
1✔
237
      byte[] resultArr = new byte[resultLen];
×
238
      byteBuffer.get(resultArr);
×
239
    }
240
    //  has  lock
241
    if (byteBuffer.get() == 1) {
1✔
242
      this.lockedWhenLoading();
×
243
    }
244
  }
1✔
245

246
  /**
247
   * Deserialize class Name and load class
248
   *
249
   * @param byteBuffer bytebuffer
250
   * @return the class loaded for the deserialized class name
251
   */
252
  public static Class<?> deserializeTypeInfo(ByteBuffer byteBuffer) {
253
    int classNameBytesLen = byteBuffer.getInt();
×
254
    byte[] classNameBytes = new byte[classNameBytesLen];
×
255
    byteBuffer.get(classNameBytes);
×
256
    String className = new String(classNameBytes, StandardCharsets.UTF_8);
×
257
    Class<?> clazz;
258
    try {
259
      clazz = Class.forName(className);
×
260
    } catch (ClassNotFoundException e) {
×
261
      throw new RuntimeException("Invalid procedure class", e);
×
262
    }
×
263
    return clazz;
×
264
  }
265

266
  public static Procedure<?> newInstance(ByteBuffer byteBuffer) {
267
    Class<?> procedureClass = deserializeTypeInfo(byteBuffer);
×
268
    Procedure<?> procedure;
269
    try {
270
      procedure = (Procedure<?>) procedureClass.newInstance();
×
271
    } catch (InstantiationException | IllegalAccessException e) {
×
272
      throw new RuntimeException("Instantiation failed", e);
×
273
    }
×
274
    return procedure;
×
275
  }
276

277
  /**
278
   * The {@link #doAcquireLock(Object, IProcedureStore)} will be split into two steps, first, it
279
   * will call us to determine whether we need to wait for initialization, second, it will call
280
   * {@link #acquireLock(Object)} to actually handle the lock for this procedure.
281
   *
282
   * @return true means we need to wait until the environment has been initialized, otherwise false.
283
   */
284
  protected boolean waitInitialized(Env env) {
285
    return false;
1✔
286
  }
287

288
  /**
289
   * Acquire a lock, user should override it if necessary.
290
   *
291
   * @param env environment
292
   * @return state of lock
293
   */
294
  protected ProcedureLockState acquireLock(Env env) {
295
    return ProcedureLockState.LOCK_ACQUIRED;
1✔
296
  }
297

298
  /**
299
   * Release a lock, user should override it if necessary.
300
   *
301
   * @param env env
302
   */
303
  protected void releaseLock(Env env) {
304
    // no op
305
  }
1✔
306

307
  /**
308
   * Used to keep procedure lock even when the procedure is yielded or suspended.
309
   *
310
   * @param env env
311
   * @return true if hold the lock
312
   */
313
  protected boolean holdLock(Env env) {
314
    return false;
1✔
315
  }
316

317
  /**
318
   * Called before the procedure is recovered and added into the queue.
319
   *
320
   * @param env environment
321
   */
322
  protected final void beforeRecover(Env env) {
323
    // no op
324
  }
×
325

326
  /**
327
   * Called when the procedure is recovered and added into the queue.
328
   *
329
   * @param env environment
330
   */
331
  protected final void afterRecover(Env env) {
332
    // no op
333
  }
1✔
334

335
  /**
336
   * Called when the procedure is completed (success or rollback). The procedure may use this method
337
   * to clean up in-memory states. This operation will not be retried on failure.
338
   *
339
   * @param env environment
340
   */
341
  protected void completionCleanup(Env env) {
342
    // no op
343
  }
1✔
344

345
  /**
346
   * To make executor yield between each execution step to give other procedures a chance to run.
347
   *
348
   * @param env environment
349
   * @return return true if yield is allowed.
350
   */
351
  protected boolean isYieldAfterExecution(Env env) {
352
    return false;
×
353
  }
354

355
  // -------------------------Internal methods - called by the procedureExecutor------------------
356
  /**
357
   * Internal method called by the ProcedureExecutor that starts the user-level code execute().
358
   *
359
   * @param env execute environment
360
   * @return sub procedures
361
   */
362
  protected Procedure<Env>[] doExecute(Env env)
363
      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
364
    try {
365
      updateTimestamp();
1✔
366
      return execute(env);
1✔
367
    } finally {
368
      updateTimestamp();
1✔
369
    }
370
  }
371

372
  /**
373
   * Internal method called by the ProcedureExecutor that starts the user-level code rollback().
374
   *
375
   * @param env execute environment
376
   * @throws IOException ioe
377
   * @throws InterruptedException interrupted exception
378
   */
379
  public void doRollback(Env env) throws IOException, InterruptedException, ProcedureException {
380
    try {
381
      updateTimestamp();
1✔
382
      rollback(env);
1✔
383
    } finally {
384
      updateTimestamp();
1✔
385
    }
386
  }
1✔
387

388
  /**
389
   * Internal method called by the ProcedureExecutor that starts the user-level code acquireLock().
390
   *
391
   * @param env environment
392
   * @param store ProcedureStore
393
   * @return ProcedureLockState
394
   */
395
  public final ProcedureLockState doAcquireLock(Env env, IProcedureStore store) {
396
    if (waitInitialized(env)) {
1✔
397
      return ProcedureLockState.LOCK_EVENT_WAIT;
×
398
    }
399
    if (lockedWhenLoading) {
1✔
400
      lockedWhenLoading = false;
×
401
      locked = true;
×
402
      return ProcedureLockState.LOCK_ACQUIRED;
×
403
    }
404
    ProcedureLockState state = acquireLock(env);
1✔
405
    if (state == ProcedureLockState.LOCK_ACQUIRED) {
1✔
406
      locked = true;
1✔
407
      store.update(this);
1✔
408
    }
409
    return state;
1✔
410
  }
411

412
  /**
413
   * Release the lock held by this procedure and update its lock state in the store.
414
   *
415
   * @param env environment
416
   * @param store ProcedureStore
417
   */
418
  public final void doReleaseLock(Env env, IProcedureStore store) {
419
    locked = false;
1✔
420
    if (getState() != ProcedureState.ROLLEDBACK) {
1✔
421
      store.update(this);
1✔
422
    }
423
    releaseLock(env);
1✔
424
  }
1✔
425

426
  public final void restoreLock(Env env) {
427
    if (!lockedWhenLoading) {
1✔
428
      LOG.debug("{} didn't hold the lock before restarting, skip acquiring lock", this);
1✔
429
      return;
1✔
430
    }
431
    if (isFinished()) {
×
432
      LOG.debug("{} is already bypassed, skip acquiring lock.", this);
×
433
      return;
×
434
    }
435
    if (getState() == ProcedureState.WAITING && !holdLock(env)) {
×
436
      LOG.debug("{} is in WAITING STATE, and holdLock= false , skip acquiring lock.", this);
×
437
      return;
×
438
    }
439
    LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this);
×
440
    acquireLock(env);
×
441
  }
×
442

443
  @Override
444
  public String toString() {
445
    // Return the simple String presentation of the procedure.
446
    return toStringSimpleSB().toString();
1✔
447
  }
448

449
  /**
450
   * Build the StringBuilder for the simple form of procedure string.
451
   *
452
   * @return the StringBuilder
453
   */
454
  protected StringBuilder toStringSimpleSB() {
455
    final StringBuilder sb = new StringBuilder();
1✔
456

457
    sb.append("pid=");
1✔
458
    sb.append(getProcId());
1✔
459

460
    if (hasParent()) {
1✔
461
      sb.append(", ppid=");
1✔
462
      sb.append(getParentProcId());
1✔
463
    }
464

465
    /*
466
     * TODO
467
     * Enable later when this is being used.
468
     * Currently owner not used.
469
    if (hasOwner()) {
470
      sb.append(", owner=");
471
      sb.append(getOwner());
472
    }*/
473

474
    sb.append(", state="); // pState for Procedure State as opposed to any other kind.
1✔
475
    toStringState(sb);
1✔
476

477
    // Only print out locked if actually locked. Most of the time it is not.
478
    if (this.locked) {
1✔
479
      sb.append(", locked=").append(locked);
1✔
480
    }
481
    if (hasException()) {
1✔
482
      sb.append(", exception=" + getException());
1✔
483
    }
484

485
    sb.append("; ");
1✔
486
    toStringClassDetails(sb);
1✔
487

488
    return sb;
1✔
489
  }
490

491
  /** Extend the toString() information with more procedure details */
492
  public String toStringDetails() {
493
    final StringBuilder sb = toStringSimpleSB();
×
494

495
    sb.append(" submittedTime=");
×
496
    sb.append(getSubmittedTime());
×
497

498
    sb.append(", lastUpdate=");
×
499
    sb.append(getLastUpdate());
×
500

501
    final int[] stackIndices = getStackIndexes();
×
502
    if (stackIndices != null) {
×
503
      sb.append("\n");
×
504
      sb.append("stackIndexes=");
×
505
      sb.append(Arrays.toString(stackIndices));
×
506
    }
507

508
    return sb.toString();
×
509
  }
510

511
  protected String toStringClass() {
512
    StringBuilder sb = new StringBuilder();
×
513
    toStringClassDetails(sb);
×
514
    return sb.toString();
×
515
  }
516

517
  /**
518
   * Called from {@link #toString()} when interpolating {@link Procedure} State. Allows decorating
519
   * generic Procedure State with Procedure particulars.
520
   *
521
   * @param builder Append current {@link ProcedureState}
522
   */
523
  protected void toStringState(StringBuilder builder) {
524
    builder.append(getState());
1✔
525
  }
1✔
526

527
  /**
528
   * Extend the toString() information with the procedure details e.g. className and parameters
529
   *
530
   * @param builder the string builder to use to append the proc specific information
531
   */
532
  protected void toStringClassDetails(StringBuilder builder) {
533
    builder.append(getClass().getName());
1✔
534
  }
1✔
535

536
  // ==========================================================================
537
  //  Those fields are unchanged after initialization.
538
  //
539
  //  Each procedure will get created from the user or during
540
  //  ProcedureExecutor.start() during the load() phase and then submitted
541
  //  to the executor. these fields will never be changed after initialization
542
  // ==========================================================================
543
  public long getProcId() {
544
    return procId;
1✔
545
  }
546

547
  public boolean hasParent() {
548
    return parentProcId != NO_PROC_ID;
1✔
549
  }
550

551
  public long getParentProcId() {
552
    return parentProcId;
1✔
553
  }
554

555
  public long getRootProcId() {
556
    return rootProcId;
×
557
  }
558

559
  public String getProcName() {
560
    return toStringClass();
×
561
  }
562

563
  public long getSubmittedTime() {
564
    return submittedTime;
1✔
565
  }
566

567
  /** Called by the ProcedureExecutor to assign the ID to the newly created procedure. */
568
  protected void setProcId(long procId) {
569
    this.procId = procId;
1✔
570
  }
1✔
571

572
  public void setProcRunnable() {
573
    this.submittedTime = System.currentTimeMillis();
1✔
574
    setState(ProcedureState.RUNNABLE);
1✔
575
  }
1✔
576

577
  /** Called by the ProcedureExecutor to assign the parent to the newly created procedure. */
578
  protected void setParentProcId(long parentProcId) {
579
    this.parentProcId = parentProcId;
1✔
580
  }
1✔
581

582
  protected void setRootProcId(long rootProcId) {
583
    this.rootProcId = rootProcId;
1✔
584
  }
1✔
585

586
  /**
587
   * Called on store load to initialize the Procedure internals after the creation/deserialization.
588
   */
589
  protected void setSubmittedTime(long submittedTime) {
590
    this.submittedTime = submittedTime;
1✔
591
  }
1✔
592

593
  // ==========================================================================
594
  //  runtime state - timeout related
595
  // ==========================================================================
596
  /** @param timeout timeout interval in msec */
597
  protected void setTimeout(long timeout) {
598
    this.timeout = timeout;
1✔
599
  }
1✔
600

601
  public boolean hasTimeout() {
602
    return timeout != NO_TIMEOUT;
×
603
  }
604

605
  /** @return the timeout in msec */
606
  public long getTimeout() {
607
    return timeout;
1✔
608
  }
609

610
  /**
611
   * Called on store load to initialize the Procedure internals after the creation/deserialization.
612
   */
613
  protected void setLastUpdate(long lastUpdate) {
614
    this.lastUpdate = lastUpdate;
1✔
615
  }
1✔
616

617
  /** Called by ProcedureExecutor after each time a procedure step is executed. */
618
  protected void updateTimestamp() {
619
    this.lastUpdate = System.currentTimeMillis();
1✔
620
  }
1✔
621

622
  public long getLastUpdate() {
623
    return lastUpdate;
1✔
624
  }
625

626
  /**
627
   * Timestamp of the next timeout. Called by the ProcedureExecutor if the procedure has a timeout set
628
   * and the procedure is in the waiting queue.
629
   *
630
   * @return the timestamp of the next timeout.
631
   */
632
  protected long getTimeoutTimestamp() {
633
    return getLastUpdate() + getTimeout();
1✔
634
  }
635

636
  // ==========================================================================
637
  //  runtime state
638
  // ==========================================================================
639
  /** @return the time elapsed between the last update and the start time of the procedure. */
640
  public long elapsedTime() {
641
    return getLastUpdate() - getSubmittedTime();
1✔
642
  }
643

644
  /** @return the serialized result if any, otherwise null */
645
  public byte[] getResult() {
646
    return result.get();
×
647
  }
648

649
  /**
650
   * The procedure may leave a "result" on completion.
651
   *
652
   * @param result the serialized result that will be passed to the client
653
   */
654
  protected void setResult(byte[] result) {
655
    this.result.set(result);
×
656
  }
×
657

658
  /**
659
   * Will only be called when loading procedures from procedure store, where we need to record
660
   * whether the procedure has already held a lock. Later we will call {@link #restoreLock(Object)}
661
   * to actually acquire the lock.
662
   */
663
  final void lockedWhenLoading() {
664
    this.lockedWhenLoading = true;
×
665
  }
×
666

667
  /**
668
   * Can only be called when restarting, before the procedure actually being executed, as after we
669
   * actually call the {@link #doAcquireLock(Object, IProcedureStore)} method, we will reset {@link
670
   * #lockedWhenLoading} to false.
671
   *
672
   * <p>Now it is only used in the ProcedureScheduler to determine whether we should put a Procedure
673
   * in front of a queue.
674
   */
675
  public boolean isLockedWhenLoading() {
676
    return lockedWhenLoading;
×
677
  }
678

679
  // ==============================================================================================
680
  //  Runtime state, updated every operation by the ProcedureExecutor
681
  //
682
  //  There is always 1 thread at the time operating on the state of the procedure.
683
  //  The ProcedureExecutor may check and set states, or some Procedure may
684
  //  update its own state. but no concurrent updates. we use synchronized here
685
  //  just because the procedure can get scheduled on different executor threads on each step.
686
  // ==============================================================================================
687

688
  /** @return true if the procedure is in a RUNNABLE state. */
689
  public synchronized boolean isRunnable() {
690
    return state == ProcedureState.RUNNABLE;
1✔
691
  }
692

693
  public synchronized boolean isInitializing() {
694
    return state == ProcedureState.INITIALIZING;
×
695
  }
696

697
  /** @return true if the procedure has failed. It may or may not have rolled back. */
698
  public synchronized boolean isFailed() {
699
    return state == ProcedureState.FAILED || state == ProcedureState.ROLLEDBACK;
1✔
700
  }
701

702
  /** @return true if the procedure is finished successfully. */
703
  public synchronized boolean isSuccess() {
704
    return state == ProcedureState.SUCCESS && !hasException();
1✔
705
  }
706

707
  /**
708
   * @return true if the procedure is finished. The Procedure may be completed successfully or
709
   *     rolledback.
710
   */
711
  public synchronized boolean isFinished() {
712
    return isSuccess() || state == ProcedureState.ROLLEDBACK;
1✔
713
  }
714

715
  /** @return true if the procedure is waiting for a child to finish or for an external event. */
716
  public synchronized boolean isWaiting() {
717
    switch (state) {
1✔
718
      case WAITING:
719
      case WAITING_TIMEOUT:
720
        return true;
×
721
      default:
722
        break;
723
    }
724
    return false;
1✔
725
  }
726

727
  protected synchronized void setState(final ProcedureState state) {
728
    this.state = state;
1✔
729
    updateTimestamp();
1✔
730
  }
1✔
731

732
  public synchronized ProcedureState getState() {
733
    return state;
1✔
734
  }
735

736
  protected synchronized void setFailure(final String source, final Throwable cause) {
737
    setFailure(new ProcedureException(source, cause));
×
738
  }
×
739

740
  protected synchronized void setFailure(final ProcedureException exception) {
741
    this.exception = exception;
1✔
742
    if (!isFinished()) {
1✔
743
      setState(ProcedureState.FAILED);
1✔
744
    }
745
  }
1✔
746

747
  protected void setAbortFailure(final String source, final String msg) {
748
    setFailure(source, new ProcedureAbortedException(msg));
×
749
  }
×
750

751
  /**
752
   * Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
753
   *
754
   * <p>Another usage for this method is to implement retrying. A procedure can set the state to
755
   * {@code WAITING_TIMEOUT} by calling {@code setState} method, and throw a {@link
756
   * ProcedureSuspendedException} to halt the execution of the procedure, and do not forget a call
757
   * {@link #setTimeout(long)} method to set the timeout. And you should also override this method
758
   * to wake up the procedure, and also return false to tell the ProcedureExecutor that the timeout
759
   * event has been handled.
760
   *
761
   * @return true to let the framework handle the timeout as abort, false in case the procedure
762
   *     handled the timeout itself.
763
   */
764
  protected synchronized boolean setTimeoutFailure(Env env) {
765
    if (state == ProcedureState.WAITING_TIMEOUT) {
×
766
      long timeDiff = System.currentTimeMillis() - lastUpdate;
×
767
      setFailure(
×
768
          "ProcedureExecutor",
769
          new ProcedureTimeoutException("Operation timed out after " + timeDiff + " ms."));
770
      return true;
×
771
    }
772
    return false;
×
773
  }
774

775
  public synchronized boolean hasException() {
776
    return exception != null;
1✔
777
  }
778

779
  public synchronized ProcedureException getException() {
780
    return exception;
1✔
781
  }
782

783
  /** Called by the ProcedureExecutor on procedure-load to restore the latch state */
784
  protected synchronized void setChildrenLatch(int numChildren) {
785
    this.childrenLatch = numChildren;
1✔
786
    if (LOG.isTraceEnabled()) {
1✔
787
      LOG.trace("CHILD LATCH INCREMENT SET " + this.childrenLatch, new Throwable(this.toString()));
×
788
    }
789
  }
1✔
790

791
  /** Called by the ProcedureExecutor on procedure-load to restore the latch state */
792
  protected synchronized void incChildrenLatch() {
793
    // TODO: can this be inferred from the stack? I think so...
794
    this.childrenLatch++;
1✔
795
    if (LOG.isTraceEnabled()) {
1✔
796
      LOG.trace("CHILD LATCH INCREMENT " + this.childrenLatch, new Throwable(this.toString()));
×
797
    }
798
  }
1✔
799

800
  /** Called by the ProcedureExecutor to notify that one of the sub-procedures has completed. */
801
  private synchronized boolean childrenCountDown() {
802
    assert childrenLatch > 0 : this;
1✔
803
    boolean b = --childrenLatch == 0;
1✔
804
    if (LOG.isTraceEnabled()) {
1✔
805
      LOG.trace("CHILD LATCH DECREMENT " + childrenLatch, new Throwable(this.toString()));
×
806
    }
807
    return b;
1✔
808
  }
809

810
  /**
811
   * Try to set this procedure into RUNNABLE state. Succeeds if all subprocedures/children are done.
812
   *
813
   * @return True if we were able to move procedure to RUNNABLE state.
814
   */
815
  synchronized boolean tryRunnable() {
816
    // Don't use isWaiting in the below; it returns true for WAITING and WAITING_TIMEOUT
817
    if (getState() == ProcedureState.WAITING && childrenCountDown()) {
1✔
818
      setState(ProcedureState.RUNNABLE);
1✔
819
      return true;
1✔
820
    } else {
821
      return false;
1✔
822
    }
823
  }
824

825
  protected synchronized boolean hasChildren() {
826
    return childrenLatch > 0;
1✔
827
  }
828

829
  protected synchronized int getChildrenLatch() {
830
    return childrenLatch;
×
831
  }
832

833
  /**
834
   * Called by the RootProcedureState on procedure execution. Each procedure store its stack-index
835
   * positions.
836
   */
837
  protected synchronized void addStackIndex(final int index) {
838
    if (stackIndexes == null) {
1✔
839
      stackIndexes = new int[] {index};
1✔
840
    } else {
841
      int count = stackIndexes.length;
1✔
842
      stackIndexes = Arrays.copyOf(stackIndexes, count + 1);
1✔
843
      stackIndexes[count] = index;
1✔
844
    }
845
  }
1✔
846

847
  protected synchronized boolean removeStackIndex() {
848
    if (stackIndexes != null && stackIndexes.length > 1) {
1✔
849
      stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
1✔
850
      return false;
1✔
851
    } else {
852
      stackIndexes = null;
1✔
853
      return true;
1✔
854
    }
855
  }
856

857
  /**
858
   * Called on store load to initialize the Procedure internals after the creation/deserialization.
859
   */
860
  protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
861
    this.stackIndexes = new int[stackIndexes.size()];
1✔
862
    for (int i = 0; i < this.stackIndexes.length; ++i) {
1✔
863
      this.stackIndexes[i] = stackIndexes.get(i);
1✔
864
    }
865
  }
1✔
866

867
  protected synchronized boolean wasExecuted() {
868
    return stackIndexes != null;
×
869
  }
870

871
  protected synchronized int[] getStackIndexes() {
872
    return stackIndexes;
1✔
873
  }
874

875
  /** Helper to lookup the root Procedure ID given a specified procedure. */
876
  protected static long getRootProcedureId(Map<Long, Procedure> procedures, Procedure proc) {
877
    while (proc.hasParent()) {
1✔
878
      proc = procedures.get(proc.getParentProcId());
1✔
879
      if (proc == null) {
1✔
880
        return NO_PROC_ID;
×
881
      }
882
    }
883
    return proc.getProcId();
1✔
884
  }
885

886
  public void setRootProcedureId(long rootProcedureId) {
887
    this.rootProcId = rootProcedureId;
1✔
888
  }
1✔
889

890
  /**
891
   * @param a the first procedure to be compared.
892
   * @param b the second procedure to be compared.
893
   * @return true if the two procedures have the same parent
894
   */
895
  public static boolean haveSameParent(Procedure<?> a, Procedure<?> b) {
896
    return a.hasParent() && b.hasParent() && (a.getParentProcId() == b.getParentProcId());
×
897
  }
898

899
  @Override
900
  public int compareTo(Procedure<Env> other) {
901
    return Long.compare(getProcId(), other.getProcId());
×
902
  }
903
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc