• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #120

pending completion
#120

push

github-actions

web-flow
Rename sdk-features to features (#1599)

16189 of 20022 relevant lines covered (80.86%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.61
/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import com.google.common.base.Preconditions;
24
import io.temporal.common.context.ContextPropagator;
25
import io.temporal.failure.CanceledFailure;
26
import io.temporal.internal.context.ContextThreadLocal;
27
import io.temporal.internal.logging.LoggerTag;
28
import io.temporal.internal.replay.ReplayWorkflowContext;
29
import io.temporal.internal.worker.WorkflowExecutorCache;
30
import io.temporal.workflow.Functions;
31
import io.temporal.workflow.Promise;
32
import java.io.PrintWriter;
33
import java.io.StringWriter;
34
import java.util.HashMap;
35
import java.util.List;
36
import java.util.Map;
37
import java.util.Optional;
38
import java.util.concurrent.CompletableFuture;
39
import java.util.concurrent.Future;
40
import java.util.concurrent.RejectedExecutionException;
41
import java.util.function.Supplier;
42
import javax.annotation.Nonnull;
43
import javax.annotation.Nullable;
44
import org.slf4j.Logger;
45
import org.slf4j.LoggerFactory;
46
import org.slf4j.MDC;
47

48
class WorkflowThreadImpl implements WorkflowThread {
49
  /**
50
   * Runnable passed to the thread that wraps a runnable passed to the WorkflowThreadImpl
51
   * constructor.
52
   */
53
  class RunnableWrapper implements Runnable {
54

55
    private final WorkflowThreadContext threadContext;
56
    // TODO: Move MDC injection logic into an interceptor as this context shouldn't be leaked
57
    // to the WorkflowThreadImpl
58
    private final ReplayWorkflowContext replayWorkflowContext;
59
    private String originalName;
60
    private String name;
61
    private final CancellationScopeImpl cancellationScope;
62
    private final List<ContextPropagator> contextPropagators;
63
    private final Map<String, Object> propagatedContexts;
64

65
    RunnableWrapper(
66
        WorkflowThreadContext threadContext,
67
        ReplayWorkflowContext replayWorkflowContext,
68
        String name,
69
        boolean detached,
70
        CancellationScopeImpl parent,
71
        Runnable runnable,
72
        List<ContextPropagator> contextPropagators,
73
        Map<String, Object> propagatedContexts) {
1✔
74
      this.threadContext = threadContext;
1✔
75
      this.replayWorkflowContext = replayWorkflowContext;
1✔
76
      this.name = name;
1✔
77
      this.cancellationScope = new CancellationScopeImpl(detached, runnable, parent);
1✔
78
      Preconditions.checkState(
1✔
79
          context.getStatus() == Status.CREATED, "threadContext not in CREATED state");
1✔
80
      this.contextPropagators = contextPropagators;
1✔
81
      this.propagatedContexts = propagatedContexts;
1✔
82
    }
1✔
83

84
    @Override
85
    public void run() {
86
      Thread thread = Thread.currentThread();
1✔
87
      originalName = thread.getName();
1✔
88
      thread.setName(name);
1✔
89

90
      threadContext.initializeCurrentThread(thread);
1✔
91
      DeterministicRunnerImpl.setCurrentThreadInternal(WorkflowThreadImpl.this);
1✔
92

93
      MDC.put(LoggerTag.WORKFLOW_ID, replayWorkflowContext.getWorkflowId());
1✔
94
      MDC.put(LoggerTag.WORKFLOW_TYPE, replayWorkflowContext.getWorkflowType().getName());
1✔
95
      MDC.put(LoggerTag.RUN_ID, replayWorkflowContext.getRunId());
1✔
96
      MDC.put(LoggerTag.TASK_QUEUE, replayWorkflowContext.getTaskQueue());
1✔
97
      MDC.put(LoggerTag.NAMESPACE, replayWorkflowContext.getNamespace());
1✔
98

99
      // Repopulate the context(s)
100
      ContextThreadLocal.setContextPropagators(this.contextPropagators);
1✔
101
      ContextThreadLocal.propagateContextToCurrentThread(this.propagatedContexts);
1✔
102
      try {
103
        // initialYield blocks thread until the first runUntilBlocked is called.
104
        // Otherwise, r starts executing without control of the sync.
105
        threadContext.initialYield();
1✔
106
        cancellationScope.run();
1✔
107
      } catch (DestroyWorkflowThreadError e) {
1✔
108
        if (!threadContext.isDestroyRequested()) {
1✔
109
          threadContext.setUnhandledException(e);
1✔
110
        }
111
      } catch (Error e) {
1✔
112
        threadContext.setUnhandledException(e);
1✔
113
      } catch (CanceledFailure e) {
×
114
        if (!isCancelRequested()) {
×
115
          threadContext.setUnhandledException(e);
×
116
        }
117
        if (log.isDebugEnabled()) {
×
118
          log.debug(String.format("Workflow thread \"%s\" run canceled", name));
×
119
        }
120
      } catch (Throwable e) {
1✔
121
        threadContext.setUnhandledException(e);
1✔
122
      } finally {
123
        DeterministicRunnerImpl.setCurrentThreadInternal(null);
1✔
124
        threadContext.setStatus(Status.DONE);
1✔
125
        thread.setName(originalName);
1✔
126
        MDC.clear();
1✔
127
      }
128
    }
1✔
129

130
    public String getName() {
131
      return name;
1✔
132
    }
133

134
    StackTraceElement[] getStackTrace() {
135
      @Nullable Thread thread = threadContext.getCurrentThread();
1✔
136
      if (thread != null) {
1✔
137
        return thread.getStackTrace();
1✔
138
      }
139
      return new StackTraceElement[0];
×
140
    }
141

142
    public void setName(String name) {
143
      this.name = name;
×
144
      @Nullable Thread thread = threadContext.getCurrentThread();
×
145
      if (thread != null) {
×
146
        thread.setName(name);
×
147
      }
148
    }
×
149
  }
150

151
  private static final Logger log = LoggerFactory.getLogger(WorkflowThreadImpl.class);
1✔
152

153
  private final WorkflowThreadExecutor workflowThreadExecutor;
154
  private final WorkflowThreadContext context;
155
  private final WorkflowExecutorCache cache;
156
  private final SyncWorkflowContext syncWorkflowContext;
157

158
  private final DeterministicRunnerImpl runner;
159
  private final RunnableWrapper task;
160
  private final int priority;
161
  private Future<?> taskFuture;
162
  private final Map<WorkflowThreadLocalInternal<?>, Object> threadLocalMap = new HashMap<>();
1✔
163

164
  WorkflowThreadImpl(
165
      WorkflowThreadExecutor workflowThreadExecutor,
166
      SyncWorkflowContext syncWorkflowContext,
167
      DeterministicRunnerImpl runner,
168
      @Nonnull String name,
169
      int priority,
170
      boolean detached,
171
      CancellationScopeImpl parentCancellationScope,
172
      Runnable runnable,
173
      WorkflowExecutorCache cache,
174
      List<ContextPropagator> contextPropagators,
175
      Map<String, Object> propagatedContexts) {
1✔
176
    this.workflowThreadExecutor = workflowThreadExecutor;
1✔
177
    this.syncWorkflowContext = Preconditions.checkNotNull(syncWorkflowContext);
1✔
178
    this.runner = runner;
1✔
179
    this.context = new WorkflowThreadContext(runner.getLock());
1✔
180
    this.cache = cache;
1✔
181
    this.priority = priority;
1✔
182
    this.task =
1✔
183
        new RunnableWrapper(
184
            context,
185
            syncWorkflowContext.getReplayContext(),
1✔
186
            Preconditions.checkNotNull(name, "Thread name shouldn't be null"),
1✔
187
            detached,
188
            parentCancellationScope,
189
            runnable,
190
            contextPropagators,
191
            propagatedContexts);
192
  }
1✔
193

194
  @Override
195
  public void run() {
196
    throw new UnsupportedOperationException("not used");
×
197
  }
198

199
  @Override
200
  public boolean isDetached() {
201
    return task.cancellationScope.isDetached();
×
202
  }
203

204
  @Override
205
  public void cancel() {
206
    task.cancellationScope.cancel();
×
207
  }
×
208

209
  @Override
210
  public void cancel(String reason) {
211
    task.cancellationScope.cancel(reason);
1✔
212
  }
1✔
213

214
  @Override
215
  public String getCancellationReason() {
216
    return task.cancellationScope.getCancellationReason();
×
217
  }
218

219
  @Override
220
  public boolean isCancelRequested() {
221
    return task.cancellationScope.isCancelRequested();
×
222
  }
223

224
  @Override
225
  public Promise<String> getCancellationRequest() {
226
    return task.cancellationScope.getCancellationRequest();
×
227
  }
228

229
  @Override
230
  public void start() {
231
    context.verifyAndStart();
1✔
232
    while (true) {
233
      try {
234
        taskFuture = workflowThreadExecutor.submit(task);
1✔
235
        return;
1✔
236
      } catch (RejectedExecutionException e) {
1✔
237
        if (cache != null) {
1✔
238
          SyncWorkflowContext workflowContext = getWorkflowContext();
1✔
239
          ReplayWorkflowContext context = workflowContext.getReplayContext();
1✔
240
          boolean evicted =
1✔
241
              cache.evictAnyNotInProcessing(
1✔
242
                  context.getWorkflowExecution(), workflowContext.getMetricsScope());
1✔
243
          if (!evicted) {
1✔
244
            // Note here we need to throw error, not exception. Otherwise it will be
245
            // translated to workflow execution exception and instead of failing the
246
            // workflow task we will be failing the workflow.
247
            throw new WorkflowRejectedExecutionError(e);
×
248
          }
249
        } else {
1✔
250
          throw new WorkflowRejectedExecutionError(e);
1✔
251
        }
252
      }
1✔
253
    }
254
  }
255

256
  @Override
257
  public boolean isStarted() {
258
    return context.getStatus() != Status.CREATED;
×
259
  }
260

261
  @Override
262
  public WorkflowThreadContext getWorkflowThreadContext() {
263
    return context;
1✔
264
  }
265

266
  @Override
267
  public DeterministicRunnerImpl getRunner() {
268
    return runner;
1✔
269
  }
270

271
  @Override
272
  public SyncWorkflowContext getWorkflowContext() {
273
    return syncWorkflowContext;
1✔
274
  }
275

276
  @Override
277
  public void setName(String name) {
278
    task.setName(name);
×
279
  }
×
280

281
  @Override
282
  public String getName() {
283
    return task.getName();
1✔
284
  }
285

286
  @Override
287
  public long getId() {
288
    return hashCode();
×
289
  }
290

291
  @Override
292
  public int getPriority() {
293
    return priority;
1✔
294
  }
295

296
  @Override
297
  public boolean runUntilBlocked(long deadlockDetectionTimeoutMs) {
298
    if (taskFuture == null) {
1✔
299
      start();
1✔
300
    }
301
    return context.runUntilBlocked(deadlockDetectionTimeoutMs);
1✔
302
  }
303

304
  @Override
305
  public boolean isDone() {
306
    return context.isDone();
1✔
307
  }
308

309
  @Override
310
  public Throwable getUnhandledException() {
311
    return context.getUnhandledException();
1✔
312
  }
313

314
  /**
315
   * Evaluates function in the threadContext of the coroutine without unblocking it. Used to get
316
   * current coroutine status, like stack trace.
317
   *
318
   * @param function Parameter is reason for current goroutine blockage.
319
   */
320
  public void evaluateInCoroutineContext(Functions.Proc1<String> function) {
321
    context.evaluateInCoroutineContext(function);
×
322
  }
×
323

324
  /**
325
   * Interrupt coroutine by throwing DestroyWorkflowThreadError from an await method it is blocked
326
   * on and return underlying Future to be waited on.
327
   */
328
  @Override
329
  public Future<?> stopNow() {
330
    // Cannot call destroy() on itself
331
    @Nullable Thread thread = context.getCurrentThread();
1✔
332
    if (Thread.currentThread().equals(thread)) {
1✔
333
      throw new Error("Cannot call destroy on itself: " + thread.getName());
×
334
    }
335
    context.initiateDestroy();
1✔
336
    if (taskFuture == null) {
1✔
337
      return getCompletedFuture();
1✔
338
    }
339
    return taskFuture;
1✔
340
  }
341

342
  private Future<?> getCompletedFuture() {
343
    CompletableFuture<String> f = new CompletableFuture<>();
1✔
344
    f.complete("done");
1✔
345
    return f;
1✔
346
  }
347

348
  @Override
349
  public void addStackTrace(StringBuilder result) {
350
    result.append(getName());
1✔
351
    @Nullable Thread thread = context.getCurrentThread();
1✔
352
    if (thread == null) {
1✔
353
      result.append("(NEW)");
×
354
      return;
×
355
    }
356
    result
1✔
357
        .append(": (BLOCKED on ")
1✔
358
        .append(getWorkflowThreadContext().getYieldReason())
1✔
359
        .append(")\n");
1✔
360
    // These numbers might change if implementation changes.
361
    int omitTop = 5;
1✔
362
    int omitBottom = 7;
1✔
363
    // TODO it's not a good idea to rely on the name to understand the thread type. Instead of that
364
    // we would better
365
    // assign an explicit thread type enum to the threads. This will be especially important when we
366
    // refactor
367
    // root and workflow-method
368
    // thread names into names that will include workflowId
369
    if (DeterministicRunnerImpl.WORKFLOW_ROOT_THREAD_NAME.equals(getName())) {
1✔
370
      // TODO revisit this number
371
      omitBottom = 11;
×
372
    } else if (getName().startsWith(WorkflowMethodThreadNameStrategy.WORKFLOW_MAIN_THREAD_PREFIX)) {
1✔
373
      // TODO revisit this number
374
      omitBottom = 11;
1✔
375
    }
376
    StackTraceElement[] stackTrace = thread.getStackTrace();
1✔
377
    for (int i = omitTop; i < stackTrace.length - omitBottom; i++) {
1✔
378
      StackTraceElement e = stackTrace[i];
1✔
379
      if (i == omitTop && "await".equals(e.getMethodName())) continue;
1✔
380
      result.append(e);
1✔
381
      result.append("\n");
1✔
382
    }
383
  }
1✔
384

385
  @Override
386
  public void yield(String reason, Supplier<Boolean> unblockCondition) {
387
    context.yield(reason, unblockCondition);
1✔
388
  }
1✔
389

390
  @Override
391
  public void exitThread() {
392
    runner.exit();
1✔
393
    throw new DestroyWorkflowThreadError("exit");
1✔
394
  }
395

396
  @Override
397
  public <T> void setThreadLocal(WorkflowThreadLocalInternal<T> key, T value) {
398
    threadLocalMap.put(key, value);
1✔
399
  }
1✔
400

401
  @SuppressWarnings("unchecked")
402
  @Override
403
  public <T> Optional<T> getThreadLocal(WorkflowThreadLocalInternal<T> key) {
404
    if (!threadLocalMap.containsKey(key)) {
1✔
405
      return Optional.empty();
1✔
406
    }
407
    return Optional.of((T) threadLocalMap.get(key));
1✔
408
  }
409

410
  /**
411
   * @return stack trace of the coroutine thread
412
   */
413
  @Override
414
  public String getStackTrace() {
415
    StackTraceElement[] st = task.getStackTrace();
1✔
416
    StringWriter sw = new StringWriter();
1✔
417
    PrintWriter pw = new PrintWriter(sw);
1✔
418
    pw.append(task.getName());
1✔
419
    pw.append("\n");
1✔
420
    for (StackTraceElement se : st) {
1✔
421
      pw.println("\tat " + se);
1✔
422
    }
423
    return sw.toString();
1✔
424
  }
425

426
  static class YieldWithTimeoutCondition implements Supplier<Boolean> {
427

428
    private final Supplier<Boolean> unblockCondition;
429
    private final long blockedUntil;
430
    private boolean timedOut;
431

432
    YieldWithTimeoutCondition(Supplier<Boolean> unblockCondition, long blockedUntil) {
×
433
      this.unblockCondition = unblockCondition;
×
434
      this.blockedUntil = blockedUntil;
×
435
    }
×
436

437
    boolean isTimedOut() {
438
      return timedOut;
×
439
    }
440

441
    /**
442
     * @return true if condition matched or timed out
443
     */
444
    @Override
445
    public Boolean get() {
446
      boolean result = unblockCondition.get();
×
447
      if (result) {
×
448
        return true;
×
449
      }
450
      long currentTimeMillis = WorkflowInternal.currentTimeMillis();
×
451
      timedOut = currentTimeMillis >= blockedUntil;
×
452
      return timedOut;
×
453
    }
454
  }
455
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc