• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #207

06 Nov 2023 04:21PM CUT coverage: 77.277% (-0.008%) from 77.285%
#207

push

github

web-flow
Test continue as new with local activities (#1922)

Test continue as new with local activities

18728 of 24235 relevant lines covered (77.28%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.8
/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import com.google.common.base.Preconditions;
24
import com.google.common.primitives.Ints;
25
import io.temporal.common.context.ContextPropagator;
26
import io.temporal.internal.WorkflowThreadMarker;
27
import io.temporal.internal.context.ContextThreadLocal;
28
import io.temporal.internal.worker.WorkflowExecutorCache;
29
import io.temporal.serviceclient.CheckedExceptionWrapper;
30
import io.temporal.workflow.Promise;
31
import java.util.ArrayList;
32
import java.util.HashMap;
33
import java.util.HashSet;
34
import java.util.Iterator;
35
import java.util.List;
36
import java.util.Map;
37
import java.util.Optional;
38
import java.util.Set;
39
import java.util.TreeSet;
40
import java.util.concurrent.*;
41
import java.util.concurrent.locks.Lock;
42
import java.util.concurrent.locks.ReentrantLock;
43
import java.util.function.Supplier;
44
import javax.annotation.Nonnull;
45
import javax.annotation.Nullable;
46
import org.slf4j.Logger;
47
import org.slf4j.LoggerFactory;
48

49
/**
50
 * Throws Error in case of any unexpected condition. It is to fail a workflow task, not a workflow.
51
 */
52
class DeterministicRunnerImpl implements DeterministicRunner {
53

54
  private static final int ROOT_THREAD_PRIORITY = 0;
55
  private static final int CALLBACK_THREAD_PRIORITY = 10;
56
  private static final int WORKFLOW_THREAD_PRIORITY = 20000000;
57

58
  static final String WORKFLOW_ROOT_THREAD_NAME = "workflow-root";
59

60
  private static final Logger log = LoggerFactory.getLogger(DeterministicRunnerImpl.class);
1✔
61
  private static final ThreadLocal<WorkflowThread> currentThreadThreadLocal = new ThreadLocal<>();
1✔
62

63
  // Runner Lock is taken shortly by Workflow Threads when they resume and yield.
64
  // It's taken and retained also by DeterministicRunnerImpl control code.
65
  // This ensures happens-before between all workflow threads and DeterministicRunnerImpl control
66
  // code.
67
  // Note that the Runner Lock is not retained when workflow thread executes and not inside yield
68
  // method.
69
  // To check for an active event loop, inRunUntilAllBlocked value taken under Runner Lock should be
70
  // used.
71
  private final Lock lock = new ReentrantLock();
1✔
72
  // true when the control code of the main workflow event loop is running.
73
  // Workflow methods may get unblocked and executed by the control code when true.
74
  // Updated always with Runner Lock taken.
75
  private boolean inRunUntilAllBlocked;
76

77
  // Note that threads field is a set. So we need to make sure that getPriority never returns the
78
  // same value for different threads. We use addedThreads variable for this. Protected by lock
79
  private final Set<WorkflowThread> threads =
1✔
80
      new TreeSet<>((t1, t2) -> Ints.compare(t1.getPriority(), t2.getPriority()));
1✔
81
  // Values from RunnerLocalInternal
82
  private final Map<RunnerLocalInternal<?>, Object> runnerLocalMap = new HashMap<>();
1✔
83
  private final Runnable rootRunnable;
84
  private final WorkflowThreadExecutor workflowThreadExecutor;
85
  private final SyncWorkflowContext workflowContext;
86
  private final WorkflowExecutorCache cache;
87

88
  // always accessed under the runner lock
89
  private final List<NamedRunnable> toExecuteInWorkflowThread = new ArrayList<>();
1✔
90

91
  // Access to workflowThreadsToAdd, callbackThreadsToAdd, addedThreads doesn't have to be
92
  // synchronized.
93
  // Inside DeterministicRunner the access to these variables is under the runner lock.
94
  //
95
  // newWorkflowThread can be called from the workflow thread directly by using an Async.
96
  // But Workflow Threads when resuming or yielding are required to get the same runner lock as the
97
  // DeterministicRunner itself, which provides happens-before and guarantees visibility of changes
98
  // to these collections.
99
  // Only one Workflow Thread can run at a time and no DeterministicRunner code modifying these
100
  // variables run at the same time with the Workflow Thread.
101
  private final List<WorkflowThread> workflowThreadsToAdd = new ArrayList<>();
1✔
102
  private final List<WorkflowThread> callbackThreadsToAdd = new ArrayList<>();
1✔
103
  private int addedThreads;
104

105
  /**
106
   * Close is requested by the workflow code and the workflow thread itself. Such close is processed
107
   * immediately after the requesting thread is blocked, other workflow threads don't get a chance
108
   * to proceed after it.
109
   */
110
  private boolean exitRequested;
111

112
  /**
113
   * Close is requested by the control code. This close is potentially delayed and wait till the
114
   * workflow code is blocked if it's currently processing.
115
   */
116
  private boolean closeRequested;
117

118
  /**
119
   * true if some thread already started performing closure. Only one thread can do it and only
120
   * once.
121
   */
122
  private boolean closeStarted;
123

124
  /** If this future is filled, the runner is successfully closed */
125
  private final CompletableFuture<?> closeFuture = new CompletableFuture<>();
1✔
126

127
  static WorkflowThread currentThreadInternal() {
128
    WorkflowThread result = currentThreadThreadLocal.get();
1✔
129
    if (result == null) {
1✔
130
      throw new Error("Called from non workflow or workflow callback thread");
×
131
    }
132
    return result;
1✔
133
  }
134

135
  static Optional<WorkflowThread> currentThreadInternalIfPresent() {
136
    WorkflowThread result = currentThreadThreadLocal.get();
1✔
137
    if (result == null) {
1✔
138
      return Optional.empty();
1✔
139
    }
140
    return Optional.of(result);
1✔
141
  }
142

143
  static void setCurrentThreadInternal(WorkflowThread coroutine) {
144
    if (coroutine != null) {
1✔
145
      currentThreadThreadLocal.set(coroutine);
1✔
146
      WorkflowThreadMarkerAccessor.markAsWorkflowThread();
1✔
147
    } else {
148
      currentThreadThreadLocal.set(null);
1✔
149
      WorkflowThreadMarkerAccessor.markAsNonWorkflowThread();
1✔
150
    }
151
  }
1✔
152

153
  /**
154
   * Used to check for failedPromises that contain an error, but never where accessed. It is to
155
   * avoid failure swallowing by failedPromises which is very hard to troubleshoot.
156
   */
157
  private final Set<Promise<?>> failedPromises = new HashSet<>();
1✔
158

159
  private WorkflowThread rootWorkflowThread;
160
  private final CancellationScopeImpl runnerCancellationScope;
161

162
  DeterministicRunnerImpl(
163
      WorkflowThreadExecutor workflowThreadExecutor,
164
      @Nonnull SyncWorkflowContext workflowContext,
165
      Runnable root) {
166
    this(workflowThreadExecutor, workflowContext, root, null);
1✔
167
  }
1✔
168

169
  DeterministicRunnerImpl(
170
      WorkflowThreadExecutor workflowThreadExecutor,
171
      @Nonnull SyncWorkflowContext workflowContext,
172
      Runnable root,
173
      WorkflowExecutorCache cache) {
1✔
174
    this.workflowThreadExecutor = workflowThreadExecutor;
1✔
175
    this.workflowContext = Preconditions.checkNotNull(workflowContext, "workflowContext");
1✔
176
    // TODO this should be refactored, publishing of this in an constructor into external objects is
177
    // a bad practice
178
    this.workflowContext.setRunner(this);
1✔
179
    this.cache = cache;
1✔
180
    this.runnerCancellationScope = new CancellationScopeImpl(true, null, null);
1✔
181
    this.rootRunnable = root;
1✔
182
  }
1✔
183

184
  @Override
185
  public void runUntilAllBlocked(long deadlockDetectionTimeout) {
186
    if (rootWorkflowThread == null) {
1✔
187
      rootWorkflowThread = newRootThread(rootRunnable);
1✔
188
      threads.add(rootWorkflowThread);
1✔
189
      rootWorkflowThread.start();
1✔
190
    }
191
    lock.lock();
1✔
192
    try {
193
      checkNotClosed();
1✔
194
      checkNotCloseRequestedLocked();
1✔
195
      inRunUntilAllBlocked = true;
1✔
196
      // Keep repeating until at least one of the threads makes progress.
197
      boolean progress;
198
      outerLoop:
199
      do {
200
        if (exitRequested) {
1✔
201
          closeRequested = true;
×
202
          break;
×
203
        }
204
        if (!toExecuteInWorkflowThread.isEmpty()) {
1✔
205
          for (NamedRunnable nr : toExecuteInWorkflowThread) {
1✔
206
            Object callbackThread =
1✔
207
                workflowContext
208
                    .getWorkflowInboundInterceptor()
1✔
209
                    .newCallbackThread(nr.runnable, nr.name);
1✔
210
            Preconditions.checkState(
1✔
211
                callbackThread != null,
212
                "[BUG] One of the custom interceptors illegally overrode newCallbackThread result to null. "
213
                    + "Check WorkflowInboundCallsInterceptor#newCallbackThread contract.");
214
            Preconditions.checkState(
1✔
215
                callbackThread instanceof WorkflowThread,
216
                "[BUG] One of the custom interceptors illegally overrode newCallbackThread result. "
217
                    + "Check WorkflowInboundCallsInterceptor#newCallbackThread contract. "
218
                    + "Illegal object returned from the interceptors chain: %s",
219
                callbackThread);
220
          }
1✔
221

222
          appendCallbackThreadsLocked();
1✔
223
        }
224
        toExecuteInWorkflowThread.clear();
1✔
225
        progress = false;
1✔
226
        Iterator<WorkflowThread> ci = threads.iterator();
1✔
227
        while (ci.hasNext()) {
1✔
228
          WorkflowThread c = ci.next();
1✔
229
          progress = c.runUntilBlocked(deadlockDetectionTimeout) || progress;
1✔
230
          if (exitRequested) {
1✔
231
            closeRequested = true;
1✔
232
            break outerLoop;
1✔
233
          }
234
          if (c.isDone()) {
1✔
235
            ci.remove();
1✔
236
            Throwable unhandledException = c.getUnhandledException();
1✔
237
            if (unhandledException != null) {
1✔
238
              closeRequested = true;
1✔
239
              throw WorkflowInternal.wrap(unhandledException);
1✔
240
            }
241
          }
242
        }
1✔
243
        appendWorkflowThreadsLocked();
1✔
244
      } while (progress && !threads.isEmpty());
1✔
245
    } catch (PotentialDeadlockException e) {
1✔
246
      String triggerThreadStackTrace = "";
1✔
247
      StringBuilder otherThreadsDump = new StringBuilder();
1✔
248
      for (WorkflowThread t : threads) {
1✔
249
        if (t.getWorkflowThreadContext() != e.getWorkflowThreadContext()) {
1✔
250
          if (otherThreadsDump.length() > 0) {
1✔
251
            otherThreadsDump.append("\n");
×
252
          }
253
          otherThreadsDump.append(t.getStackTrace());
1✔
254
        } else {
255
          triggerThreadStackTrace = t.getStackTrace();
1✔
256
        }
257
      }
1✔
258
      e.setStackDump(
1✔
259
          triggerThreadStackTrace, otherThreadsDump.toString(), System.currentTimeMillis());
1✔
260
      throw e;
1✔
261
    } finally {
262
      inRunUntilAllBlocked = false;
1✔
263
      lock.unlock();
1✔
264
      // Close was requested while running
265
      if (closeRequested) {
1✔
266
        close(true);
1✔
267
      }
268
    }
269
  }
1✔
270

271
  @Override
272
  public boolean isDone() {
273
    lock.lock();
1✔
274
    try {
275
      return closeFuture.isDone()
1✔
276
          // if close is requested, we should wait for the closeFuture to be filled
277
          || !closeRequested && !areThreadsToBeExecuted();
1✔
278
    } finally {
279
      lock.unlock();
1✔
280
    }
281
  }
282

283
  @Override
284
  public void cancel(String reason) {
285
    executeInWorkflowThread("cancel workflow callback", () -> rootWorkflowThread.cancel(reason));
1✔
286
  }
1✔
287

288
  @Override
289
  public void close() {
290
    close(false);
1✔
291
  }
1✔
292

293
  /**
294
   * Destroys all controlled workflow threads by throwing {@link DestroyWorkflowThreadError} from
295
   * {@link WorkflowThreadContext#yield(String, Supplier)} when the threads are blocking on the
296
   * temporal-sdk code.
297
   */
298
  private void close(boolean fromWorkflowThread) {
299
    lock.lock();
1✔
300
    if (closeFuture.isDone()) {
1✔
301
      lock.unlock();
1✔
302
      return;
1✔
303
    }
304
    closeRequested = true;
1✔
305
    if (
1✔
306
    // If runUntilAllBlocked is true, event loop and workflow threads are executing.
307
    // Do not close immediately in this case.
308
    // closeRequested set earlier will make an event loop control thread to call close()
309
    // at the end that will actually perform the closure.
310
    inRunUntilAllBlocked
311
        // some other thread or caller is already in the process of closing
312
        || closeStarted) {
313

314
      lock.unlock();
1✔
315
      // If called from the workflow thread, we don't want to block this call, otherwise we
316
      // will cause a deadlock. The thread performing the closure waits for all workflow threads
317
      // to complete first.
318
      if (!fromWorkflowThread) {
1✔
319
        // We will not perform the closure in this call and should just wait on the future when
320
        // another thread responsible for it will close.
321
        closeFuture.join();
1✔
322
      }
323
      return;
1✔
324
    }
325

326
    closeStarted = true;
1✔
327
    // lock is taken here
328
    try {
329
      // in some circumstances when a workflow broke Deadline Detector,
330
      // runUntilAllBlocked may return while workflow threads are still running.
331
      // If this happens, these threads may potentially start new additional threads that will be
332
      // in workflowThreadsToAdd and callbackThreadsToAdd.
333
      // That's why we need to make sure that all the spawned threads were shut down in a cycle.
334
      while (areThreadsToBeExecuted()) {
1✔
335
        List<WorkflowThreadStopFuture> threadFutures = new ArrayList<>();
1✔
336
        try {
337
          toExecuteInWorkflowThread.clear();
1✔
338
          appendWorkflowThreadsLocked();
1✔
339
          appendCallbackThreadsLocked();
1✔
340
          for (WorkflowThread workflowThread : threads) {
1✔
341
            threadFutures.add(
1✔
342
                new WorkflowThreadStopFuture(workflowThread, workflowThread.stopNow()));
1✔
343
          }
1✔
344
          threads.clear();
1✔
345

346
          // We cannot use an iterator to unregister failed Promises since f.get()
347
          // will remove the promise directly from failedPromises. This causes an
348
          // ConcurrentModificationException
349
          // For this reason we will loop over a copy of failedPromises.
350
          Set<Promise<?>> failedPromisesLoop = new HashSet<>(failedPromises);
1✔
351
          for (Promise<?> f : failedPromisesLoop) {
1✔
352
            try {
353
              f.get();
×
354
              throw new Error("unreachable");
×
355
            } catch (RuntimeException e) {
1✔
356
              log.warn(
1✔
357
                  "Promise completed with exception and was never accessed. The ignored exception:",
358
                  CheckedExceptionWrapper.unwrap(e));
1✔
359
            }
360
          }
1✔
361
        } finally {
362
          // we need to unlock for the further code because threads will not be able to proceed with
363
          // destruction otherwise.
364
          lock.unlock();
1✔
365
        }
366

367
        // Wait on all tasks outside the lock since these tasks use the same lock to execute.
368
        try {
369
          for (WorkflowThreadStopFuture threadFuture : threadFutures) {
1✔
370
            try {
371
              threadFuture.stopFuture.get(10, TimeUnit.SECONDS);
1✔
372
            } catch (TimeoutException e) {
×
373
              WorkflowThread workflowThread = threadFuture.workflowThread;
×
374
              log.error(
×
375
                  "[BUG] Workflow thread '{}' of workflow '{}' can't be destroyed in time. "
376
                      + "This will lead to a workflow cache leak. "
377
                      + "This problem is usually caused by a workflow implementation swallowing java.lang.Error instead of rethrowing it. "
378
                      + " Thread dump of the stuck thread:\n{}",
379
                  workflowThread.getName(),
×
380
                  workflowContext.getReplayContext().getWorkflowId(),
×
381
                  workflowThread.getStackTrace());
×
382
            }
1✔
383
          }
1✔
384
        } catch (InterruptedException e) {
1✔
385
          Thread.currentThread().interrupt();
1✔
386
          // Worker is likely stopped with shutdownNow()
387
          // TODO consider propagating as an original interrupted exception to the top level
388
          throw new Error("Worker executor thread interrupted during stopping of a coroutine", e);
1✔
389
        } catch (ExecutionException e) {
×
390
          throw new Error("[BUG] Unexpected failure while stopping a coroutine", e);
×
391
        } finally {
392
          // acquire the lock back as it should be taken for the loop condition check.
393
          lock.lock();
1✔
394
        }
395
      }
1✔
396
    } finally {
397
      closeFuture.complete(null);
1✔
398
      lock.unlock();
1✔
399
    }
400
  }
1✔
401

402
  @Override
403
  public String stackTrace() {
404
    StringBuilder result = new StringBuilder();
1✔
405
    lock.lock();
1✔
406
    try {
407
      if (closeFuture.isDone()) {
1✔
408
        return "Workflow is closed.";
1✔
409
      }
410
      for (WorkflowThread coroutine : threads) {
1✔
411
        if (result.length() > 0) {
1✔
412
          result.append("\n");
1✔
413
        }
414
        coroutine.addStackTrace(result);
1✔
415
      }
1✔
416
    } finally {
417
      lock.unlock();
1✔
418
    }
419
    return result.toString();
1✔
420
  }
421

422
  private void appendWorkflowThreadsLocked() {
423
    threads.addAll(workflowThreadsToAdd);
1✔
424
    workflowThreadsToAdd.clear();
1✔
425
  }
1✔
426

427
  private void appendCallbackThreadsLocked() {
428
    // TODO I'm not sure this comment makes sense, because threads list has comparator and we use
429
    // thread priorities anyway.
430

431
    // It is important to prepend threads as there are callbacks
432
    // like signals that have to run before any other threads.
433
    // Otherwise signal might be never processed if it was received
434
    // after workflow decided to close.
435
    // Adding the callbacks in the same order as they appear in history.
436
    for (int i = callbackThreadsToAdd.size() - 1; i >= 0; i--) {
1✔
437
      threads.add(callbackThreadsToAdd.get(i));
1✔
438
    }
439
    callbackThreadsToAdd.clear();
1✔
440
  }
1✔
441

442
  /** Creates a new instance of a root workflow thread. */
443
  private WorkflowThread newRootThread(Runnable runnable) {
444
    String name = WORKFLOW_ROOT_THREAD_NAME;
1✔
445
    // TODO: workflow instance specific thread name
446
    // String name = "workflow[" + workflowContext.getContext().getWorkflowId() + "]-root";
447
    if (rootWorkflowThread != null) {
1✔
448
      throw new IllegalStateException(
×
449
          "newRootThread can be called only if there is no existing root workflow thread");
450
    }
451
    rootWorkflowThread =
1✔
452
        new WorkflowThreadImpl(
453
            workflowThreadExecutor,
454
            workflowContext,
455
            this,
456
            name,
457
            ROOT_THREAD_PRIORITY,
458
            false,
459
            runnerCancellationScope,
460
            runnable,
461
            cache,
462
            getContextPropagators(),
1✔
463
            getPropagatedContexts());
1✔
464
    return rootWorkflowThread;
1✔
465
  }
466

467
  @Nonnull
468
  @Override
469
  public WorkflowThread newWorkflowThread(
470
      Runnable runnable, boolean detached, @Nullable String name) {
471
    if (name == null) {
1✔
472
      name = "workflow[" + workflowContext.getReplayContext().getWorkflowId() + "]-" + addedThreads;
1✔
473
    }
474
    if (rootWorkflowThread == null) {
1✔
475
      throw new IllegalStateException(
×
476
          "newChildThread can be called only with existing root workflow thread");
477
    }
478
    checkWorkflowThreadOnly();
1✔
479
    checkNotClosed();
1✔
480
    WorkflowThread result =
1✔
481
        new WorkflowThreadImpl(
482
            workflowThreadExecutor,
483
            workflowContext,
484
            this,
485
            name,
486
            WORKFLOW_THREAD_PRIORITY + (addedThreads++),
487
            detached,
488
            CancellationScopeImpl.current(),
1✔
489
            runnable,
490
            cache,
491
            getContextPropagators(),
1✔
492
            getPropagatedContexts());
1✔
493
    workflowThreadsToAdd.add(result);
1✔
494
    return result;
1✔
495
  }
496

497
  @Nonnull
498
  @Override
499
  public WorkflowThread newCallbackThread(Runnable runnable, @Nullable String name) {
500
    if (name == null) {
1✔
501
      name = "workflow[" + workflowContext.getReplayContext().getWorkflowId() + "]-" + addedThreads;
×
502
    }
503
    WorkflowThread result =
1✔
504
        new WorkflowThreadImpl(
505
            workflowThreadExecutor,
506
            workflowContext,
507
            this,
508
            name,
509
            CALLBACK_THREAD_PRIORITY
510
                + (addedThreads++), // maintain the order in toExecuteInWorkflowThread
511
            false,
512
            runnerCancellationScope,
513
            runnable,
514
            cache,
515
            getContextPropagators(),
1✔
516
            getPropagatedContexts());
1✔
517
    callbackThreadsToAdd.add(result);
1✔
518
    return result;
1✔
519
  }
520

521
  /**
522
   * Executes before any other threads next time runUntilBlockedCalled. Must never be called from
523
   * any workflow threads.
524
   */
525
  @Override
526
  public void executeInWorkflowThread(String name, Runnable runnable) {
527
    lock.lock();
1✔
528
    try {
529
      // if the execution is closed, we will just add the callbacks, but we will never create
530
      // threads for them, so they will be effectively ignored
531
      toExecuteInWorkflowThread.add(new NamedRunnable(name, runnable));
1✔
532
    } finally {
533
      lock.unlock();
1✔
534
    }
535
  }
1✔
536

537
  Lock getLock() {
538
    return lock;
1✔
539
  }
540

541
  /** Register a promise that had failed but wasn't accessed yet. */
542
  void registerFailedPromise(Promise<?> promise) {
543
    if (!promise.isCompleted()) {
1✔
544
      throw new Error("expected failed");
×
545
    }
546
    failedPromises.add(promise);
1✔
547
  }
1✔
548

549
  /** Forget a failed promise as it was accessed. */
550
  void forgetFailedPromise(Promise<?> promise) {
551
    failedPromises.remove(promise);
1✔
552
  }
1✔
553

554
  void exit() {
555
    checkNotClosed();
1✔
556
    checkWorkflowThreadOnly();
1✔
557
    this.exitRequested = true;
1✔
558
  }
1✔
559

560
  private void checkWorkflowThreadOnly() {
561
    // TODO this is not a correct way to test for the fact that the method is called from the
562
    // workflow method.
563
    //  This check verifies that the workflow methods or controlling code are now running,
564
    //  but it doesn't verify if the calling thread is the one.
565
    if (!inRunUntilAllBlocked) {
1✔
566
      throw new Error("called from non workflow thread");
×
567
    }
568
  }
1✔
569

570
  private void checkNotCloseRequestedLocked() {
571
    if (closeRequested) {
1✔
572
      throw new Error("close requested");
1✔
573
    }
574
  }
1✔
575

576
  private void checkNotClosed() {
577
    if (closeFuture.isDone()) {
1✔
578
      throw new Error("closed");
×
579
    }
580
  }
1✔
581

582
  /**
583
   * @return true if there are no threads left to be processed for this workflow.
584
   */
585
  private boolean areThreadsToBeExecuted() {
586
    return !threads.isEmpty()
1✔
587
        || !workflowThreadsToAdd.isEmpty()
1✔
588
        || !callbackThreadsToAdd.isEmpty()
1✔
589
        || !toExecuteInWorkflowThread.isEmpty();
1✔
590
  }
591

592
  /**
593
   * Retrieve data from runner locals. Returns 1. not found (an empty Optional) 2. found but null
594
   * (an Optional of an empty Optional) 3. found and non-null (an Optional of an Optional of a
595
   * value). The type nesting is because Java Optionals cannot understand "Some null" vs "None",
596
   * which is exactly what we need here.
597
   *
598
   * @param key
599
   * @return one of three cases
600
   * @param <T>
601
   */
602
  @SuppressWarnings("unchecked")
603
  <T> Optional<Optional<T>> getRunnerLocal(RunnerLocalInternal<T> key) {
604
    if (!runnerLocalMap.containsKey(key)) {
1✔
605
      return Optional.empty();
1✔
606
    }
607
    return Optional.of(Optional.ofNullable((T) runnerLocalMap.get(key)));
1✔
608
  }
609

610
  <T> void setRunnerLocal(RunnerLocalInternal<T> key, T value) {
611
    runnerLocalMap.put(key, value);
1✔
612
  }
1✔
613

614
  /**
615
   * If we're executing as part of a workflow, get the current thread's context. Otherwise get the
616
   * context info from the workflow context.
617
   */
618
  private Map<String, Object> getPropagatedContexts() {
619
    if (currentThreadThreadLocal.get() != null) {
1✔
620
      return ContextThreadLocal.getCurrentContextForPropagation();
1✔
621
    } else {
622
      return workflowContext.getPropagatedContexts();
1✔
623
    }
624
  }
625

626
  private List<ContextPropagator> getContextPropagators() {
627
    if (currentThreadThreadLocal.get() != null) {
1✔
628
      return ContextThreadLocal.getContextPropagators();
1✔
629
    } else {
630
      return workflowContext.getContextPropagators();
1✔
631
    }
632
  }
633

634
  private static class WorkflowThreadMarkerAccessor extends WorkflowThreadMarker {
635
    public static void markAsWorkflowThread() {
636
      isWorkflowThreadThreadLocal.set(true);
1✔
637
    }
1✔
638

639
    public static void markAsNonWorkflowThread() {
640
      isWorkflowThreadThreadLocal.set(false);
1✔
641
    }
1✔
642
  }
643

644
  private static class NamedRunnable {
645
    private final String name;
646
    private final Runnable runnable;
647

648
    private NamedRunnable(String name, Runnable runnable) {
1✔
649
      this.name = name;
1✔
650
      this.runnable = runnable;
1✔
651
    }
1✔
652
  }
653

654
  private static class WorkflowThreadStopFuture {
655
    private final WorkflowThread workflowThread;
656
    private final Future<?> stopFuture;
657

658
    public WorkflowThreadStopFuture(WorkflowThread workflowThread, Future<?> stopFuture) {
1✔
659
      this.workflowThread = workflowThread;
1✔
660
      this.stopFuture = stopFuture;
1✔
661
    }
1✔
662
  }
663
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc