• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #165

pending completion
#165

push

github-actions

web-flow
Allow Data Converter code to escape deadlock detection (#1723)

Issue #1301

116 of 116 new or added lines in 5 files covered. (100.0%)

17121 of 20963 relevant lines covered (81.67%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.68
/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadContext.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import com.google.common.base.Preconditions;
24
import com.google.common.base.Throwables;
25
import io.temporal.internal.common.NonIdempotentHandle;
26
import io.temporal.internal.common.env.DebugModeUtils;
27
import io.temporal.workflow.Functions;
28
import java.util.concurrent.TimeUnit;
29
import java.util.concurrent.locks.Condition;
30
import java.util.concurrent.locks.Lock;
31
import java.util.function.Supplier;
32
import javax.annotation.Nullable;
33
import org.slf4j.Logger;
34
import org.slf4j.LoggerFactory;
35

36
class WorkflowThreadContext {
37
  private static final Logger log = LoggerFactory.getLogger(WorkflowThreadContext.class);
1✔
38

39
  // Shared runner lock
40
  private final Lock runnerLock;
41
  private final WorkflowThreadScheduler scheduler;
42
  // Used to block evaluateInCoroutineContext
43
  private final Condition evaluationCondition;
44

45
  // thread safety of these field is guaranteed by taking a #lock for reading or updating
46
  private Status status = Status.CREATED;
1✔
47
  @Nullable private Thread currentThread;
48

49
  private Functions.Proc1<String> evaluationFunction;
50
  private Throwable unhandledException;
51
  private boolean inRunUntilBlocked;
52
  private boolean remainedBlocked;
53
  private String yieldReason;
54
  private boolean destroyRequested;
55

56
  WorkflowThreadContext(Lock runnerLock) {
1✔
57
    this.runnerLock = runnerLock;
1✔
58
    this.scheduler = new WorkflowThreadScheduler(runnerLock);
1✔
59
    this.evaluationCondition = runnerLock.newCondition();
1✔
60
  }
1✔
61

62
  /**
63
   * Initial yield allows the workflow thread to block at the start before exercising any user code.
64
   * This gives a control over when the execution may be started. It also provides happens-before
65
   * relationship with other threads and DeterministicRunnerImpl through shared lock.
66
   */
67
  public void initialYield() {
68
    Status status = getStatus();
1✔
69
    if (status == Status.DONE) {
1✔
70
      throw new DestroyWorkflowThreadError("done in initialYield");
×
71
    }
72
    Preconditions.checkState(status == Status.RUNNING, "threadContext not in RUNNING state");
1✔
73
    this.yield("created", () -> true);
1✔
74
  }
1✔
75

76
  public void yield(String reason, Supplier<Boolean> unblockFunction) {
77
    if (unblockFunction == null) {
1✔
78
      throw new IllegalArgumentException("null unblockFunction");
×
79
    }
80
    // Evaluates unblockFunction out of the lock to avoid deadlocks.
81
    runnerLock.lock();
1✔
82
    try {
83
      if (destroyRequested) {
1✔
84
        throw new DestroyWorkflowThreadError();
1✔
85
      }
86
      yieldReason = reason;
1✔
87

88
      while (!inRunUntilBlocked || !unblockFunction.get()) {
1✔
89
        status = Status.YIELDED;
1✔
90
        scheduler.yieldLocked();
1✔
91
        if (destroyRequested) {
1✔
92
          throw new DestroyWorkflowThreadError();
1✔
93
        }
94
        maybeEvaluateLocked(reason);
1✔
95
      }
96

97
      status = Status.RUNNING;
1✔
98
      yieldReason = null;
1✔
99
    } catch (InterruptedException e) {
1✔
100
      // Throwing Error in workflow code aborts workflow task without failing workflow.
101
      Thread.currentThread().interrupt();
1✔
102
      if (!isDestroyRequested()) {
1✔
103
        throw new Error("Unexpected interrupt", e);
1✔
104
      }
105
    } finally {
106
      remainedBlocked = false;
1✔
107
      runnerLock.unlock();
1✔
108
    }
109
  }
1✔
110

111
  /**
112
   * Execute evaluation function by the thread that owns this context if {@link
113
   * #evaluateInCoroutineContext(Functions.Proc1)} was called.
114
   *
115
   * <p>Should be called under the lock
116
   *
117
   * @param reason human-readable reason for current thread blockage passed to await call.
118
   */
119
  private void maybeEvaluateLocked(String reason) {
120
    if (status == Status.EVALUATING) {
1✔
121
      try {
122
        evaluationFunction.apply(reason);
×
123
      } catch (Exception e) {
×
124
        evaluationFunction.apply(Throwables.getStackTraceAsString(e));
×
125
      } finally {
126
        status = Status.RUNNING;
×
127
        evaluationCondition.signal();
×
128
      }
129
    }
130
  }
1✔
131

132
  /**
133
   * Call {@code function} by the thread that owns this context and is currently blocked in a await.
134
   * Used to get information about current state of the thread like current stack trace.
135
   *
136
   * @param function to evaluate. Consumes reason for yielding as a parameter.
137
   */
138
  public void evaluateInCoroutineContext(Functions.Proc1<String> function) {
139
    runnerLock.lock();
×
140
    try {
141
      Preconditions.checkArgument(function != null, "null function");
×
142
      if (status != Status.YIELDED && status != Status.RUNNING) {
×
143
        throw new IllegalStateException("Not in yielded status: " + status);
×
144
      }
145
      if (evaluationFunction != null) {
×
146
        // We need to make sure no parallel evaluateInCoroutineContext are permitted.
147
        // The lock itself is not enough, because we call `await` later in the method body that
148
        // releases the original lock.
149
        // So we do a non-atomic CAS by comparing `evaluationFunction` with null and setting it on
150
        // entrance and resetting it on exit under a lock to achieve exclusivity.
151
        // We don't need an atomic CAS here, because we are under the lock when we perform these
152
        // operations.
153
        // A more explicit solution may be implemented using a separate lock for evaluate calls.
154
        throw new IllegalStateException("Already evaluating");
×
155
      }
156
      Preconditions.checkState(!inRunUntilBlocked, "Running runUntilBlocked");
×
157
      evaluationFunction = function;
×
158
      status = Status.EVALUATING;
×
159
      scheduler.scheduleLocked();
×
160
      while (status == Status.EVALUATING) {
×
161
        evaluationCondition.await();
×
162
      }
163
    } catch (InterruptedException e) {
×
164
      Thread.currentThread().interrupt();
×
165
      throw new Error("Unexpected interrupt", e);
×
166
    } finally {
167
      evaluationFunction = null;
×
168
      runnerLock.unlock();
×
169
    }
170
  }
×
171

172
  public void verifyAndStart() {
173
    runnerLock.lock();
1✔
174
    try {
175
      Preconditions.checkState(this.status == Status.CREATED, "already started");
1✔
176
      this.status = Status.RUNNING;
1✔
177
    } finally {
178
      runnerLock.unlock();
1✔
179
    }
180
  }
1✔
181

182
  public Status getStatus() {
183
    runnerLock.lock();
1✔
184
    try {
185
      return status;
1✔
186
    } finally {
187
      runnerLock.unlock();
1✔
188
    }
189
  }
190

191
  public void makeDone() {
192
    // Unblock runUntilBlocked if thread exited instead of yielding.
193
    runnerLock.lock();
1✔
194
    try {
195
      this.status = Status.DONE;
1✔
196
      scheduler.completeLocked();
1✔
197
      // it's important to clear the thread after or together (under one lock) when setting the
198
      // status, so nobody sees the context yet with RUNNING status, but without a currentThread
199
      clearCurrentThreadLocked();
1✔
200
    } finally {
201
      runnerLock.unlock();
1✔
202
    }
203
  }
1✔
204

205
  public boolean isDone() {
206
    runnerLock.lock();
1✔
207
    try {
208
      return status == Status.DONE;
1✔
209
    } finally {
210
      runnerLock.unlock();
1✔
211
    }
212
  }
213

214
  public Throwable getUnhandledException() {
215
    runnerLock.lock();
1✔
216
    try {
217
      return unhandledException;
1✔
218
    } finally {
219
      runnerLock.unlock();
1✔
220
    }
221
  }
222

223
  public void setUnhandledException(Throwable unhandledException) {
224
    runnerLock.lock();
1✔
225
    try {
226
      this.unhandledException = unhandledException;
1✔
227
    } finally {
228
      runnerLock.unlock();
1✔
229
    }
230
  }
1✔
231

232
  public String getYieldReason() {
233
    return yieldReason;
1✔
234
  }
235

236
  /**
237
   * @param deadlockDetectionTimeoutMs maximum time in milliseconds the thread can run before
238
   *     calling yield. Discarded if {@code TEMPORAL_DEBUG} env variable is set.
239
   * @return true if thread made some progress. Which is await was unblocked and some code after it
240
   *     * was executed.
241
   */
242
  public boolean runUntilBlocked(long deadlockDetectionTimeoutMs) {
243
    if (DebugModeUtils.isTemporalDebugModeOn()) {
1✔
244
      deadlockDetectionTimeoutMs = Long.MAX_VALUE;
1✔
245
    }
246
    runnerLock.lock();
1✔
247
    try {
248
      if (status == Status.DONE) {
1✔
249
        return false;
×
250
      }
251
      Preconditions.checkState(
1✔
252
          evaluationFunction == null, "Cannot runUntilBlocked while evaluating");
253
      inRunUntilBlocked = true;
1✔
254
      remainedBlocked = true;
1✔
255
      scheduler.scheduleLocked();
1✔
256
      WorkflowThreadScheduler.WaitForYieldResult yieldResult =
1✔
257
          scheduler.waitForYieldLocked(deadlockDetectionTimeoutMs, TimeUnit.MILLISECONDS);
1✔
258
      if (WorkflowThreadScheduler.WaitForYieldResult.DEADLOCK_DETECTED.equals(yieldResult)) {
1✔
259
        long detectionTimestamp = System.currentTimeMillis();
1✔
260
        if (currentThread != null) {
1✔
261
          throw new PotentialDeadlockException(currentThread.getName(), this, detectionTimestamp);
1✔
262
        } else {
263
          // This should never happen.
264
          // We clear currentThread only after setting the status to DONE.
265
          // And we check for it by the status condition check after waking up on the condition
266
          // and acquiring the lock back
267
          log.warn("Illegal State: WorkflowThreadContext has no currentThread in {} state", status);
×
268
          throw new PotentialDeadlockException("UnknownThread", this, detectionTimestamp);
×
269
        }
270
      }
271
      Preconditions.checkState(
1✔
272
          evaluationFunction == null, "Cannot runUntilBlocked while evaluating");
273
      return !remainedBlocked;
1✔
274
    } catch (InterruptedException e) {
1✔
275
      Thread.currentThread().interrupt();
1✔
276
      if (!isDestroyRequested()) {
1✔
277
        throw new Error("Unexpected interrupt", e);
1✔
278
      }
279
      return true;
×
280
    } finally {
281
      inRunUntilBlocked = false;
1✔
282
      runnerLock.unlock();
1✔
283
    }
284
  }
285

286
  public boolean isDestroyRequested() {
287
    runnerLock.lock();
1✔
288
    try {
289
      return destroyRequested;
1✔
290
    } finally {
291
      runnerLock.unlock();
1✔
292
    }
293
  }
294

295
  /**
296
   * Non-blocking call, never throws.<br>
297
   * There is no guarantee that the thread is destroyed at the end of this call.
298
   */
299
  void initiateDestroy() {
300
    runnerLock.lock();
1✔
301
    try {
302
      destroyRequested = true;
1✔
303
      if (status == Status.CREATED) {
1✔
304
        // prevent from running
305
        status = Status.DONE;
1✔
306
        return;
1✔
307
      }
308
      if (status == Status.RUNNING) {
1✔
309
        // we don't want to trigger an event loop if we are running already
310
        return;
1✔
311
      }
312
      if (status == Status.DONE) {
1✔
313
        // nothing to destroy
314
        return;
1✔
315
      }
316
      scheduler.scheduleLocked();
1✔
317
    } finally {
318
      runnerLock.unlock();
1✔
319
    }
320
  }
1✔
321

322
  public void initializeCurrentThread(Thread currentThread) {
323
    runnerLock.lock();
1✔
324
    try {
325
      this.currentThread = currentThread;
1✔
326
    } finally {
327
      runnerLock.unlock();
1✔
328
    }
329
  }
1✔
330

331
  /**
332
   * Call at the end of the execution to set up a current thread to null because it could be running
333
   * a different workflow already and doesn't belong to this context anymore.
334
   *
335
   * <p>Should be called under the lock
336
   */
337
  private void clearCurrentThreadLocked() {
338
    this.currentThread = null;
1✔
339
  }
1✔
340

341
  /**
342
   * @return current thread that owns this context, could be null if the execution finished or
343
   *     didn't start yet
344
   */
345
  @Nullable
346
  public Thread getCurrentThread() {
347
    runnerLock.lock();
1✔
348
    try {
349
      return currentThread;
1✔
350
    } finally {
351
      runnerLock.unlock();
1✔
352
    }
353
  }
354

355
  public NonIdempotentHandle lockDeadlockDetector() {
356
    scheduler.lockDeadlockDetection();
1✔
357
    return scheduler::unlockDeadlockDetection;
1✔
358
  }
359
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc