• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #103

pending completion
#103

push

github-actions

web-flow
Implement retry of local activities for over local retry threshold duration (#1542)

Issue #1261

244 of 244 new or added lines in 16 files covered. (100.0%)

16122 of 19841 relevant lines covered (81.26%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.29
/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import com.google.common.base.Preconditions;
24
import com.google.common.primitives.Ints;
25
import io.temporal.common.context.ContextPropagator;
26
import io.temporal.internal.WorkflowThreadMarker;
27
import io.temporal.internal.context.ContextThreadLocal;
28
import io.temporal.internal.worker.WorkflowExecutorCache;
29
import io.temporal.serviceclient.CheckedExceptionWrapper;
30
import io.temporal.workflow.Promise;
31
import java.util.ArrayList;
32
import java.util.HashMap;
33
import java.util.HashSet;
34
import java.util.Iterator;
35
import java.util.List;
36
import java.util.Map;
37
import java.util.Optional;
38
import java.util.Set;
39
import java.util.TreeSet;
40
import java.util.concurrent.*;
41
import java.util.concurrent.locks.Lock;
42
import java.util.concurrent.locks.ReentrantLock;
43
import java.util.function.Supplier;
44
import javax.annotation.Nonnull;
45
import javax.annotation.Nullable;
46
import org.slf4j.Logger;
47
import org.slf4j.LoggerFactory;
48

49
/**
50
 * Throws Error in case of any unexpected condition. It is to fail a workflow task, not a workflow.
51
 */
52
class DeterministicRunnerImpl implements DeterministicRunner {
53

54
  private static final int ROOT_THREAD_PRIORITY = 0;
55
  private static final int CALLBACK_THREAD_PRIORITY = 10;
56
  private static final int WORKFLOW_THREAD_PRIORITY = 20000000;
57

58
  static final String WORKFLOW_ROOT_THREAD_NAME = "workflow-root";
59

60
  private static final Logger log = LoggerFactory.getLogger(DeterministicRunnerImpl.class);
1✔
61
  private static final ThreadLocal<WorkflowThread> currentThreadThreadLocal = new ThreadLocal<>();
1✔
62

63
  // Runner Lock is taken shortly by Workflow Threads when they resume and yield.
64
  // It's taken and retained also by DeterministicRunnerImpl control code.
65
  // This ensures happens-before between all workflow threads and DeterministicRunnerImpl control
66
  // code.
67
  // Note that the Runner Lock is not retained when workflow thread executes and not inside yield
68
  // method.
69
  // To check for an active event loop, inRunUntilAllBlocked value taken under Runner Lock should be
70
  // used.
71
  private final Lock lock = new ReentrantLock();
1✔
72
  // true when the control code of the main workflow event loop is running.
73
  // Workflow methods may get unblocked and executed by the control code when true.
74
  // Updated always with Runner Lock taken.
75
  private boolean inRunUntilAllBlocked;
76

77
  // Note that threads field is a set. So we need to make sure that getPriority never returns the
78
  // same value for different threads. We use addedThreads variable for this. Protected by lock
79
  private final Set<WorkflowThread> threads =
1✔
80
      new TreeSet<>((t1, t2) -> Ints.compare(t1.getPriority(), t2.getPriority()));
1✔
81
  // Values from RunnerLocalInternal
82
  private final Map<RunnerLocalInternal<?>, Object> runnerLocalMap = new HashMap<>();
1✔
83
  private final Runnable rootRunnable;
84
  private final WorkflowThreadExecutor workflowThreadExecutor;
85
  private final SyncWorkflowContext workflowContext;
86
  private final WorkflowExecutorCache cache;
87

88
  // always accessed under the runner lock
89
  private final List<NamedRunnable> toExecuteInWorkflowThread = new ArrayList<>();
1✔
90

91
  // Access to workflowThreadsToAdd, callbackThreadsToAdd, addedThreads doesn't have to be
92
  // synchronized.
93
  // Inside DeterministicRunner the access to these variables is under the runner lock.
94
  //
95
  // newWorkflowThread can be called from the workflow thread directly by using an Async.
96
  // But Workflow Threads when resuming or yielding are required to get the same runner lock as the
97
  // DeterministicRunner itself, which provides happens-before and guarantees visibility of changes
98
  // to these collections.
99
  // Only one Workflow Thread can run at a time and no DeterministicRunner code modifying these
100
  // variables run at the same time with the Workflow Thread.
101
  private final List<WorkflowThread> workflowThreadsToAdd = new ArrayList<>();
1✔
102
  private final List<WorkflowThread> callbackThreadsToAdd = new ArrayList<>();
1✔
103
  private int addedThreads;
104

105
  /**
106
   * Close is requested by the workflow code and the workflow thread itself. Such close is processed
107
   * immediately after the requesting thread is blocked, other workflow threads don't get a chance
108
   * to proceed after it.
109
   */
110
  private boolean exitRequested;
111

112
  /**
113
   * Close is requested by the control code. This close is potentially delayed and waits until the
114
   * workflow code is blocked if it's currently processing.
115
   */
116
  private boolean closeRequested;
117

118
  /**
119
   * true if some thread already started performing closure. Only one thread can do it and only
120
   * once.
121
   */
122
  private boolean closeStarted;
123

124
  /** If this future is filled, the runner is successfully closed */
125
  private final CompletableFuture<?> closeFuture = new CompletableFuture<>();
1✔
126

127
  static WorkflowThread currentThreadInternal() {
128
    WorkflowThread result = currentThreadThreadLocal.get();
1✔
129
    if (result == null) {
1✔
130
      throw new Error("Called from non workflow or workflow callback thread");
×
131
    }
132
    return result;
1✔
133
  }
134

135
  static Optional<WorkflowThread> currentThreadInternalIfPresent() {
136
    WorkflowThread result = currentThreadThreadLocal.get();
1✔
137
    if (result == null) {
1✔
138
      return Optional.empty();
1✔
139
    }
140
    return Optional.of(result);
1✔
141
  }
142

143
  static void setCurrentThreadInternal(WorkflowThread coroutine) {
144
    if (coroutine != null) {
1✔
145
      currentThreadThreadLocal.set(coroutine);
1✔
146
      WorkflowThreadMarkerAccessor.markAsWorkflowThread();
1✔
147
    } else {
148
      currentThreadThreadLocal.set(null);
1✔
149
      WorkflowThreadMarkerAccessor.markAsNonWorkflowThread();
1✔
150
    }
151
  }
1✔
152

153
  /**
154
   * Used to check for failedPromises that contain an error, but were never accessed. It is to
155
   * avoid failure swallowing by failedPromises which is very hard to troubleshoot.
156
   */
157
  private final Set<Promise<?>> failedPromises = new HashSet<>();
1✔
158

159
  private WorkflowThread rootWorkflowThread;
160
  private final CancellationScopeImpl runnerCancellationScope;
161

162
  /**
   * Creates a runner without a workflow executor cache; delegates to the four-argument constructor
   * passing {@code null} as the cache.
   *
   * @param workflowThreadExecutor executor handed to the workflow threads created by this runner
   * @param workflowContext workflow context, must not be null
   * @param root runnable executed by the root workflow thread
   */
  DeterministicRunnerImpl(
      WorkflowThreadExecutor workflowThreadExecutor,
      @Nonnull SyncWorkflowContext workflowContext,
      Runnable root) {
    this(workflowThreadExecutor, workflowContext, root, null);
  }
1✔
168

169
  DeterministicRunnerImpl(
170
      WorkflowThreadExecutor workflowThreadExecutor,
171
      @Nonnull SyncWorkflowContext workflowContext,
172
      Runnable root,
173
      WorkflowExecutorCache cache) {
1✔
174
    this.workflowThreadExecutor = workflowThreadExecutor;
1✔
175
    this.workflowContext = Preconditions.checkNotNull(workflowContext, "workflowContext");
1✔
176
    // TODO this should be refactored, publishing of this in an constructor into external objects is
177
    // a bad practice
178
    this.workflowContext.setRunner(this);
1✔
179
    this.cache = cache;
1✔
180
    this.runnerCancellationScope = new CancellationScopeImpl(true, null, null);
1✔
181
    this.rootRunnable = root;
1✔
182
  }
1✔
183

184
  @Override
185
  public void runUntilAllBlocked(long deadlockDetectionTimeout) {
186
    if (rootWorkflowThread == null) {
1✔
187
      rootWorkflowThread = newRootThread(rootRunnable);
1✔
188
      threads.add(rootWorkflowThread);
1✔
189
      rootWorkflowThread.start();
1✔
190
    }
191
    lock.lock();
1✔
192
    try {
193
      checkNotClosed();
1✔
194
      checkNotCloseRequestedLocked();
1✔
195
      inRunUntilAllBlocked = true;
1✔
196
      // Keep repeating until at least one of the threads makes progress.
197
      boolean progress;
198
      outerLoop:
199
      do {
200
        if (exitRequested) {
1✔
201
          closeRequested = true;
×
202
          break;
×
203
        }
204
        if (!toExecuteInWorkflowThread.isEmpty()) {
1✔
205
          for (NamedRunnable nr : toExecuteInWorkflowThread) {
1✔
206
            Object callbackThread =
1✔
207
                workflowContext
208
                    .getWorkflowInboundInterceptor()
1✔
209
                    .newCallbackThread(nr.runnable, nr.name);
1✔
210
            Preconditions.checkState(
1✔
211
                callbackThread != null,
212
                "[BUG] One of the custom interceptors illegally overrode newCallbackThread result to null. "
213
                    + "Check WorkflowInboundCallsInterceptor#newCallbackThread contract.");
214
            Preconditions.checkState(
1✔
215
                callbackThread instanceof WorkflowThread,
216
                "[BUG] One of the custom interceptors illegally overrode newCallbackThread result. "
217
                    + "Check WorkflowInboundCallsInterceptor#newCallbackThread contract. "
218
                    + "Illegal object returned from the interceptors chain: %s",
219
                callbackThread);
220
          }
1✔
221

222
          appendCallbackThreadsLocked();
1✔
223
        }
224
        toExecuteInWorkflowThread.clear();
1✔
225
        progress = false;
1✔
226
        Iterator<WorkflowThread> ci = threads.iterator();
1✔
227
        while (ci.hasNext()) {
1✔
228
          WorkflowThread c = ci.next();
1✔
229
          progress = c.runUntilBlocked(deadlockDetectionTimeout) || progress;
1✔
230
          if (exitRequested) {
1✔
231
            closeRequested = true;
1✔
232
            break outerLoop;
1✔
233
          }
234
          if (c.isDone()) {
1✔
235
            ci.remove();
1✔
236
            Throwable unhandledException = c.getUnhandledException();
1✔
237
            if (unhandledException != null) {
1✔
238
              closeRequested = true;
1✔
239
              throw WorkflowInternal.wrap(unhandledException);
1✔
240
            }
241
          }
242
        }
1✔
243
        appendWorkflowThreadsLocked();
1✔
244
      } while (progress && !threads.isEmpty());
1✔
245
    } catch (PotentialDeadlockException e) {
1✔
246
      String triggerThreadStackTrace = "";
1✔
247
      StringBuilder otherThreadsDump = new StringBuilder();
1✔
248
      for (WorkflowThread t : threads) {
1✔
249
        if (t.getWorkflowThreadContext() != e.getWorkflowThreadContext()) {
1✔
250
          if (otherThreadsDump.length() > 0) {
1✔
251
            otherThreadsDump.append("\n");
×
252
          }
253
          otherThreadsDump.append(t.getStackTrace());
1✔
254
        } else {
255
          triggerThreadStackTrace = t.getStackTrace();
1✔
256
        }
257
      }
1✔
258
      e.setStackDump(
1✔
259
          triggerThreadStackTrace, otherThreadsDump.toString(), System.currentTimeMillis());
1✔
260
      throw e;
1✔
261
    } finally {
262
      inRunUntilAllBlocked = false;
1✔
263
      lock.unlock();
1✔
264
      // Close was requested while running
265
      if (closeRequested) {
1✔
266
        close();
1✔
267
      }
268
    }
269
  }
1✔
270

271
  @Override
272
  public boolean isDone() {
273
    lock.lock();
1✔
274
    try {
275
      return closeFuture.isDone()
1✔
276
          // if close is requested, we should wait for the closeFuture to be filled
277
          || !closeRequested && !areThreadsToBeExecuted();
1✔
278
    } finally {
279
      lock.unlock();
1✔
280
    }
281
  }
282

283
  @Override
284
  public void cancel(String reason) {
285
    executeInWorkflowThread("cancel workflow callback", () -> rootWorkflowThread.cancel(reason));
1✔
286
  }
1✔
287

288
  /**
289
   * Destroys all controlled workflow threads by throwing {@link DestroyWorkflowThreadError} from
290
   * {@link WorkflowThreadContext#yield(String, Supplier)} when the threads are blocking on the
291
   * temporal-sdk code.
292
   */
293
  @Override
294
  public void close() {
295
    lock.lock();
1✔
296
    if (closeFuture.isDone()) {
1✔
297
      lock.unlock();
1✔
298
      return;
1✔
299
    }
300
    closeRequested = true;
1✔
301
    if (
1✔
302
    // If runUntilAllBlocked is true, event loop and workflow threads are executing.
303
    // Do not close immediately in this case.
304
    // closeRequested set earlier will make an event loop control thread to call close()
305
    // at the end that will actually perform the closure.
306
    inRunUntilAllBlocked
307
        // some other thread or caller is already in the process of closing
308
        || closeStarted) {
309

310
      lock.unlock();
1✔
311
      // We will not perform the closure in this call and should just wait on the future when
312
      // another thread responsible for it will close.
313
      closeFuture.join();
1✔
314
      return;
1✔
315
    }
316

317
    closeStarted = true;
1✔
318
    // lock is taken here
319
    try {
320
      // in some circumstances when a workflow broke Deadline Detector,
321
      // runUntilAllBlocked may return while workflow threads are still running.
322
      // If this happens, these threads may potentially start new additional threads that will be
323
      // in workflowThreadsToAdd and callbackThreadsToAdd.
324
      // That's why we need to make sure that all the spawned threads were shut down in a cycle.
325
      while (areThreadsToBeExecuted()) {
1✔
326
        List<WorkflowThreadStopFuture> threadFutures = new ArrayList<>();
1✔
327
        try {
328
          toExecuteInWorkflowThread.clear();
1✔
329
          appendWorkflowThreadsLocked();
1✔
330
          appendCallbackThreadsLocked();
1✔
331
          for (WorkflowThread workflowThread : threads) {
1✔
332
            threadFutures.add(
1✔
333
                new WorkflowThreadStopFuture(workflowThread, workflowThread.stopNow()));
1✔
334
          }
1✔
335
          threads.clear();
1✔
336

337
          // We cannot use an iterator to unregister failed Promises since f.get()
338
          // will remove the promise directly from failedPromises. This causes an
339
          // ConcurrentModificationException
340
          // For this reason we will loop over a copy of failedPromises.
341
          Set<Promise<?>> failedPromisesLoop = new HashSet<>(failedPromises);
1✔
342
          for (Promise<?> f : failedPromisesLoop) {
1✔
343
            try {
344
              f.get();
×
345
              throw new Error("unreachable");
×
346
            } catch (RuntimeException e) {
1✔
347
              log.warn(
1✔
348
                  "Promise completed with exception and was never accessed. The ignored exception:",
349
                  CheckedExceptionWrapper.unwrap(e));
1✔
350
            }
351
          }
1✔
352
        } finally {
353
          // we need to unlock for the further code because threads will not be able to proceed with
354
          // destruction otherwise.
355
          lock.unlock();
1✔
356
        }
357

358
        // Wait on all tasks outside the lock since these tasks use the same lock to execute.
359
        try {
360
          for (WorkflowThreadStopFuture threadFuture : threadFutures) {
1✔
361
            try {
362
              threadFuture.stopFuture.get(10, TimeUnit.SECONDS);
1✔
363
            } catch (TimeoutException e) {
×
364
              WorkflowThread workflowThread = threadFuture.workflowThread;
×
365
              log.error(
×
366
                  "[BUG] Workflow thread '{}' of workflow '{}' can't be destroyed in time. "
367
                      + "This will lead to a workflow cache leak. "
368
                      + "This problem is usually caused by a workflow implementation swallowing java.lang.Error instead of rethrowing it. "
369
                      + " Thread dump of the stuck thread:\n{}",
370
                  workflowThread.getName(),
×
371
                  workflowContext.getReplayContext().getWorkflowId(),
×
372
                  workflowThread.getStackTrace());
×
373
            }
1✔
374
          }
1✔
375
        } catch (InterruptedException e) {
1✔
376
          Thread.currentThread().interrupt();
1✔
377
          // Worker is likely stopped with shutdownNow()
378
          // TODO consider propagating as an original interrupted exception to the top level
379
          throw new Error("Worker executor thread interrupted during stopping of a coroutine", e);
1✔
380
        } catch (ExecutionException e) {
×
381
          throw new Error("[BUG] Unexpected failure while stopping a coroutine", e);
×
382
        } finally {
383
          // acquire the lock back as it should be taken for the loop condition check.
384
          lock.lock();
1✔
385
        }
386
      }
1✔
387
    } finally {
388
      closeFuture.complete(null);
1✔
389
      lock.unlock();
1✔
390
    }
391
  }
1✔
392

393
  @Override
394
  public String stackTrace() {
395
    StringBuilder result = new StringBuilder();
1✔
396
    lock.lock();
1✔
397
    try {
398
      if (closeFuture.isDone()) {
1✔
399
        return "Workflow is closed.";
1✔
400
      }
401
      for (WorkflowThread coroutine : threads) {
1✔
402
        if (result.length() > 0) {
1✔
403
          result.append("\n");
1✔
404
        }
405
        coroutine.addStackTrace(result);
1✔
406
      }
1✔
407
    } finally {
408
      lock.unlock();
1✔
409
    }
410
    return result.toString();
1✔
411
  }
412

413
  private void appendWorkflowThreadsLocked() {
414
    threads.addAll(workflowThreadsToAdd);
1✔
415
    workflowThreadsToAdd.clear();
1✔
416
  }
1✔
417

418
  private void appendCallbackThreadsLocked() {
419
    // TODO I'm not sure this comment makes sense, because threads list has comparator and we use
420
    // thread priorities anyway.
421

422
    // It is important to prepend threads as there are callbacks
423
    // like signals that have to run before any other threads.
424
    // Otherwise signal might be never processed if it was received
425
    // after workflow decided to close.
426
    // Adding the callbacks in the same order as they appear in history.
427
    for (int i = callbackThreadsToAdd.size() - 1; i >= 0; i--) {
1✔
428
      threads.add(callbackThreadsToAdd.get(i));
1✔
429
    }
430
    callbackThreadsToAdd.clear();
1✔
431
  }
1✔
432

433
  /** Creates a new instance of a root workflow thread. */
434
  private WorkflowThread newRootThread(Runnable runnable) {
435
    String name = WORKFLOW_ROOT_THREAD_NAME;
1✔
436
    // TODO: workflow instance specific thread name
437
    // String name = "workflow[" + workflowContext.getContext().getWorkflowId() + "]-root";
438
    if (rootWorkflowThread != null) {
1✔
439
      throw new IllegalStateException(
×
440
          "newRootThread can be called only if there is no existing root workflow thread");
441
    }
442
    rootWorkflowThread =
1✔
443
        new WorkflowThreadImpl(
444
            workflowThreadExecutor,
445
            workflowContext,
446
            this,
447
            name,
448
            ROOT_THREAD_PRIORITY,
449
            false,
450
            runnerCancellationScope,
451
            runnable,
452
            cache,
453
            getContextPropagators(),
1✔
454
            getPropagatedContexts());
1✔
455
    return rootWorkflowThread;
1✔
456
  }
457

458
  @Nonnull
459
  @Override
460
  public WorkflowThread newWorkflowThread(
461
      Runnable runnable, boolean detached, @Nullable String name) {
462
    if (name == null) {
1✔
463
      name = "workflow[" + workflowContext.getReplayContext().getWorkflowId() + "]-" + addedThreads;
1✔
464
    }
465
    if (rootWorkflowThread == null) {
1✔
466
      throw new IllegalStateException(
×
467
          "newChildThread can be called only with existing root workflow thread");
468
    }
469
    checkWorkflowThreadOnly();
1✔
470
    checkNotClosed();
1✔
471
    WorkflowThread result =
1✔
472
        new WorkflowThreadImpl(
473
            workflowThreadExecutor,
474
            workflowContext,
475
            this,
476
            name,
477
            WORKFLOW_THREAD_PRIORITY + (addedThreads++),
478
            detached,
479
            CancellationScopeImpl.current(),
1✔
480
            runnable,
481
            cache,
482
            getContextPropagators(),
1✔
483
            getPropagatedContexts());
1✔
484
    workflowThreadsToAdd.add(result);
1✔
485
    return result;
1✔
486
  }
487

488
  @Nonnull
489
  @Override
490
  public WorkflowThread newCallbackThread(Runnable runnable, @Nullable String name) {
491
    if (name == null) {
1✔
492
      name = "workflow[" + workflowContext.getReplayContext().getWorkflowId() + "]-" + addedThreads;
×
493
    }
494
    WorkflowThread result =
1✔
495
        new WorkflowThreadImpl(
496
            workflowThreadExecutor,
497
            workflowContext,
498
            this,
499
            name,
500
            CALLBACK_THREAD_PRIORITY
501
                + (addedThreads++), // maintain the order in toExecuteInWorkflowThread
502
            false,
503
            runnerCancellationScope,
504
            runnable,
505
            cache,
506
            getContextPropagators(),
1✔
507
            getPropagatedContexts());
1✔
508
    callbackThreadsToAdd.add(result);
1✔
509
    return result;
1✔
510
  }
511

512
  /**
513
   * Executes before any other threads next time runUntilBlockedCalled. Must never be called from
514
   * any workflow threads.
515
   */
516
  @Override
517
  public void executeInWorkflowThread(String name, Runnable runnable) {
518
    lock.lock();
1✔
519
    try {
520
      // if the execution is closed, we will just add the callbacks, but we will never create
521
      // threads for them, so they will be effectively ignored
522
      toExecuteInWorkflowThread.add(new NamedRunnable(name, runnable));
1✔
523
    } finally {
524
      lock.unlock();
1✔
525
    }
526
  }
1✔
527

528
  Lock getLock() {
529
    return lock;
1✔
530
  }
531

532
  /** Register a promise that had failed but wasn't accessed yet. */
533
  void registerFailedPromise(Promise<?> promise) {
534
    if (!promise.isCompleted()) {
1✔
535
      throw new Error("expected failed");
×
536
    }
537
    failedPromises.add(promise);
1✔
538
  }
1✔
539

540
  /** Forget a failed promise as it was accessed. */
541
  void forgetFailedPromise(Promise<?> promise) {
542
    failedPromises.remove(promise);
1✔
543
  }
1✔
544

545
  void exit() {
546
    checkNotClosed();
1✔
547
    checkWorkflowThreadOnly();
1✔
548
    this.exitRequested = true;
1✔
549
  }
1✔
550

551
  private void checkWorkflowThreadOnly() {
552
    // TODO this is not a correct way to test for the fact that the method is called from the
553
    // workflow method.
554
    //  This check verifies that the workflow methods or controlling code are now running,
555
    //  but it doesn't verify if the calling thread is the one.
556
    if (!inRunUntilAllBlocked) {
1✔
557
      throw new Error("called from non workflow thread");
×
558
    }
559
  }
1✔
560

561
  private void checkNotCloseRequestedLocked() {
562
    if (closeRequested) {
1✔
563
      throw new Error("close requested");
×
564
    }
565
  }
1✔
566

567
  private void checkNotClosed() {
568
    if (closeFuture.isDone()) {
1✔
569
      throw new Error("closed");
×
570
    }
571
  }
1✔
572

573
  /**
574
   * @return true if there are no threads left to be processed for this workflow.
575
   */
576
  private boolean areThreadsToBeExecuted() {
577
    return !threads.isEmpty()
1✔
578
        || !workflowThreadsToAdd.isEmpty()
1✔
579
        || !callbackThreadsToAdd.isEmpty()
1✔
580
        || !toExecuteInWorkflowThread.isEmpty();
1✔
581
  }
582

583
  @SuppressWarnings("unchecked")
584
  <T> Optional<T> getRunnerLocal(RunnerLocalInternal<T> key) {
585
    if (!runnerLocalMap.containsKey(key)) {
1✔
586
      return Optional.empty();
1✔
587
    }
588
    return Optional.of((T) runnerLocalMap.get(key));
1✔
589
  }
590

591
  <T> void setRunnerLocal(RunnerLocalInternal<T> key, T value) {
592
    runnerLocalMap.put(key, value);
1✔
593
  }
1✔
594

595
  /**
596
   * If we're executing as part of a workflow, get the current thread's context. Otherwise get the
597
   * context info from the workflow context.
598
   */
599
  private Map<String, Object> getPropagatedContexts() {
600
    if (currentThreadThreadLocal.get() != null) {
1✔
601
      return ContextThreadLocal.getCurrentContextForPropagation();
1✔
602
    } else {
603
      return workflowContext.getPropagatedContexts();
1✔
604
    }
605
  }
606

607
  private List<ContextPropagator> getContextPropagators() {
608
    if (currentThreadThreadLocal.get() != null) {
1✔
609
      return ContextThreadLocal.getContextPropagators();
1✔
610
    } else {
611
      return workflowContext.getContextPropagators();
1✔
612
    }
613
  }
614

615
  private static class WorkflowThreadMarkerAccessor extends WorkflowThreadMarker {
616
    public static void markAsWorkflowThread() {
617
      isWorkflowThreadThreadLocal.set(true);
1✔
618
    }
1✔
619

620
    public static void markAsNonWorkflowThread() {
621
      isWorkflowThreadThreadLocal.set(false);
1✔
622
    }
1✔
623
  }
624

625
  private static class NamedRunnable {
626
    private final String name;
627
    private final Runnable runnable;
628

629
    private NamedRunnable(String name, Runnable runnable) {
1✔
630
      this.name = name;
1✔
631
      this.runnable = runnable;
1✔
632
    }
1✔
633
  }
634

635
  private static class WorkflowThreadStopFuture {
636
    private final WorkflowThread workflowThread;
637
    private final Future<?> stopFuture;
638

639
    public WorkflowThreadStopFuture(WorkflowThread workflowThread, Future<?> stopFuture) {
1✔
640
      this.workflowThread = workflowThread;
1✔
641
      this.stopFuture = stopFuture;
1✔
642
    }
1✔
643
  }
644
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc