• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #330

10 Oct 2024 04:35PM CUT coverage: 78.113% (-0.1%) from 78.238%
#330

push

github

web-flow
Test server support for bidi links (#2258)

* Test server support for bidi links

* typo

* license

* feedback

* link validation

* describe fields

* link validation

103 of 163 new or added lines in 3 files covered. (63.19%)

11 existing lines in 4 files now uncovered.

21332 of 27309 relevant lines covered (78.11%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.97
/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import com.google.common.base.Preconditions;
24
import io.temporal.common.context.ContextPropagator;
25
import io.temporal.failure.CanceledFailure;
26
import io.temporal.internal.common.NonIdempotentHandle;
27
import io.temporal.internal.context.ContextThreadLocal;
28
import io.temporal.internal.logging.LoggerTag;
29
import io.temporal.internal.replay.ReplayWorkflowContext;
30
import io.temporal.internal.worker.WorkflowExecutorCache;
31
import io.temporal.workflow.Functions;
32
import io.temporal.workflow.Promise;
33
import java.io.PrintWriter;
34
import java.io.StringWriter;
35
import java.util.HashMap;
36
import java.util.List;
37
import java.util.Map;
38
import java.util.Optional;
39
import java.util.concurrent.CompletableFuture;
40
import java.util.concurrent.Future;
41
import java.util.concurrent.RejectedExecutionException;
42
import java.util.function.Supplier;
43
import javax.annotation.Nonnull;
44
import javax.annotation.Nullable;
45
import org.slf4j.Logger;
46
import org.slf4j.LoggerFactory;
47
import org.slf4j.MDC;
48

49
class WorkflowThreadImpl implements WorkflowThread {
50
  /**
51
   * Runnable passed to the thread that wraps a runnable passed to the WorkflowThreadImpl
52
   * constructor.
53
   */
54
  class RunnableWrapper implements Runnable {
55

56
    private final WorkflowThreadContext threadContext;
57
    // TODO: Move MDC injection logic into an interceptor as this context shouldn't be leaked
58
    // to the WorkflowThreadImpl
59
    private final ReplayWorkflowContext replayWorkflowContext;
60
    private String originalName;
61
    private String name;
62
    private final CancellationScopeImpl cancellationScope;
63
    private final List<ContextPropagator> contextPropagators;
64
    private final Map<String, Object> propagatedContexts;
65

66
    RunnableWrapper(
67
        WorkflowThreadContext threadContext,
68
        ReplayWorkflowContext replayWorkflowContext,
69
        String name,
70
        boolean detached,
71
        CancellationScopeImpl parent,
72
        Runnable runnable,
73
        List<ContextPropagator> contextPropagators,
74
        Map<String, Object> propagatedContexts) {
1✔
75
      this.threadContext = threadContext;
1✔
76
      this.replayWorkflowContext = replayWorkflowContext;
1✔
77
      this.name = name;
1✔
78
      this.cancellationScope = new CancellationScopeImpl(detached, runnable, parent);
1✔
79
      Preconditions.checkState(
1✔
80
          context.getStatus() == Status.CREATED, "threadContext not in CREATED state");
1✔
81
      this.contextPropagators = contextPropagators;
1✔
82
      this.propagatedContexts = propagatedContexts;
1✔
83
    }
1✔
84

85
    @Override
86
    public void run() {
87
      Thread thread = Thread.currentThread();
1✔
88
      originalName = thread.getName();
1✔
89
      thread.setName(name);
1✔
90

91
      threadContext.initializeCurrentThread(thread);
1✔
92
      DeterministicRunnerImpl.setCurrentThreadInternal(WorkflowThreadImpl.this);
1✔
93

94
      MDC.put(LoggerTag.WORKFLOW_ID, replayWorkflowContext.getWorkflowId());
1✔
95
      MDC.put(LoggerTag.WORKFLOW_TYPE, replayWorkflowContext.getWorkflowType().getName());
1✔
96
      MDC.put(LoggerTag.RUN_ID, replayWorkflowContext.getRunId());
1✔
97
      MDC.put(LoggerTag.TASK_QUEUE, replayWorkflowContext.getTaskQueue());
1✔
98
      MDC.put(LoggerTag.NAMESPACE, replayWorkflowContext.getNamespace());
1✔
99

100
      // Repopulate the context(s)
101
      ContextThreadLocal.setContextPropagators(this.contextPropagators);
1✔
102
      ContextThreadLocal.propagateContextToCurrentThread(this.propagatedContexts);
1✔
103
      try {
104
        // initialYield blocks thread until the first runUntilBlocked is called.
105
        // Otherwise, r starts executing without control of the sync.
106
        threadContext.initialYield();
1✔
107
        cancellationScope.run();
1✔
108
      } catch (DestroyWorkflowThreadError e) {
1✔
109
        if (!threadContext.isDestroyRequested()) {
1✔
110
          threadContext.setUnhandledException(e);
1✔
111
        }
112
      } catch (Error e) {
1✔
113
        threadContext.setUnhandledException(e);
1✔
114
      } catch (CanceledFailure e) {
×
115
        if (!isCancelRequested()) {
×
116
          threadContext.setUnhandledException(e);
×
117
        }
118
        if (log.isDebugEnabled()) {
×
119
          log.debug(String.format("Workflow thread \"%s\" run canceled", name));
×
120
        }
121
      } catch (Throwable e) {
1✔
122
        threadContext.setUnhandledException(e);
1✔
123
      } finally {
124
        DeterministicRunnerImpl.setCurrentThreadInternal(null);
1✔
125
        threadContext.makeDone();
1✔
126
        thread.setName(originalName);
1✔
127
        MDC.clear();
1✔
128
      }
129
    }
1✔
130

131
    public String getName() {
132
      return name;
1✔
133
    }
134

135
    StackTraceElement[] getStackTrace() {
136
      @Nullable Thread thread = threadContext.getCurrentThread();
1✔
137
      if (thread != null) {
1✔
138
        return thread.getStackTrace();
1✔
139
      }
140
      return new StackTraceElement[0];
×
141
    }
142

143
    public void setName(String name) {
144
      this.name = name;
×
145
      @Nullable Thread thread = threadContext.getCurrentThread();
×
146
      if (thread != null) {
×
147
        thread.setName(name);
×
148
      }
149
    }
×
150
  }
151

152
  private static final Logger log = LoggerFactory.getLogger(WorkflowThreadImpl.class);
1✔
153

154
  private final WorkflowThreadExecutor workflowThreadExecutor;
155
  private final WorkflowThreadContext context;
156
  private final WorkflowExecutorCache cache;
157
  private final SyncWorkflowContext syncWorkflowContext;
158

159
  private final DeterministicRunnerImpl runner;
160
  private final RunnableWrapper task;
161
  private final int priority;
162
  private Future<?> taskFuture;
163
  private final Map<WorkflowThreadLocalInternal<?>, Object> threadLocalMap = new HashMap<>();
1✔
164

165
  WorkflowThreadImpl(
166
      WorkflowThreadExecutor workflowThreadExecutor,
167
      SyncWorkflowContext syncWorkflowContext,
168
      DeterministicRunnerImpl runner,
169
      @Nonnull String name,
170
      int priority,
171
      boolean detached,
172
      CancellationScopeImpl parentCancellationScope,
173
      Runnable runnable,
174
      WorkflowExecutorCache cache,
175
      List<ContextPropagator> contextPropagators,
176
      Map<String, Object> propagatedContexts) {
1✔
177
    this.workflowThreadExecutor = workflowThreadExecutor;
1✔
178
    this.syncWorkflowContext = Preconditions.checkNotNull(syncWorkflowContext);
1✔
179
    this.runner = runner;
1✔
180
    this.context = new WorkflowThreadContext(runner.getLock());
1✔
181
    this.cache = cache;
1✔
182
    this.priority = priority;
1✔
183
    this.task =
1✔
184
        new RunnableWrapper(
185
            context,
186
            syncWorkflowContext.getReplayContext(),
1✔
187
            Preconditions.checkNotNull(name, "Thread name shouldn't be null"),
1✔
188
            detached,
189
            parentCancellationScope,
190
            runnable,
191
            contextPropagators,
192
            propagatedContexts);
193
  }
1✔
194

195
  @Override
196
  public void run() {
197
    throw new UnsupportedOperationException("not used");
×
198
  }
199

200
  @Override
201
  public boolean isDetached() {
202
    return task.cancellationScope.isDetached();
×
203
  }
204

205
  @Override
206
  public void cancel() {
207
    task.cancellationScope.cancel();
1✔
208
  }
1✔
209

210
  @Override
211
  public void cancel(String reason) {
212
    task.cancellationScope.cancel(reason);
1✔
213
  }
1✔
214

215
  @Override
216
  public String getCancellationReason() {
217
    return task.cancellationScope.getCancellationReason();
×
218
  }
219

220
  @Override
221
  public boolean isCancelRequested() {
222
    return task.cancellationScope.isCancelRequested();
×
223
  }
224

225
  @Override
226
  public Promise<String> getCancellationRequest() {
227
    return task.cancellationScope.getCancellationRequest();
×
228
  }
229

230
  @Override
231
  public void start() {
232
    context.verifyAndStart();
1✔
233
    while (true) {
234
      try {
235
        taskFuture = workflowThreadExecutor.submit(task);
1✔
236
        return;
1✔
237
      } catch (RejectedExecutionException e) {
1✔
238
        if (cache != null) {
1✔
239
          SyncWorkflowContext workflowContext = getWorkflowContext();
1✔
240
          ReplayWorkflowContext context = workflowContext.getReplayContext();
1✔
241
          boolean evicted =
1✔
242
              cache.evictAnyNotInProcessing(
1✔
243
                  context.getWorkflowExecution(), workflowContext.getMetricsScope());
1✔
244
          if (!evicted) {
1✔
245
            // Note here we need to throw error, not exception. Otherwise it will be
246
            // translated to workflow execution exception and instead of failing the
247
            // workflow task we will be failing the workflow.
UNCOV
248
            throw new WorkflowRejectedExecutionError(e);
×
249
          }
250
        } else {
1✔
251
          throw new WorkflowRejectedExecutionError(e);
1✔
252
        }
253
      }
1✔
254
    }
255
  }
256

257
  @Override
258
  public boolean isStarted() {
259
    return context.getStatus() != Status.CREATED;
×
260
  }
261

262
  @Override
263
  public WorkflowThreadContext getWorkflowThreadContext() {
264
    return context;
1✔
265
  }
266

267
  @Override
268
  public DeterministicRunnerImpl getRunner() {
269
    return runner;
1✔
270
  }
271

272
  @Override
273
  public SyncWorkflowContext getWorkflowContext() {
274
    return syncWorkflowContext;
1✔
275
  }
276

277
  @Override
278
  public void setName(String name) {
279
    task.setName(name);
×
280
  }
×
281

282
  @Override
283
  public String getName() {
284
    return task.getName();
1✔
285
  }
286

287
  @Override
288
  public long getId() {
289
    return hashCode();
×
290
  }
291

292
  @Override
293
  public int getPriority() {
294
    return priority;
1✔
295
  }
296

297
  @Override
298
  public boolean runUntilBlocked(long deadlockDetectionTimeoutMs) {
299
    if (taskFuture == null) {
1✔
300
      start();
1✔
301
    }
302
    return context.runUntilBlocked(deadlockDetectionTimeoutMs);
1✔
303
  }
304

305
  @Override
306
  public NonIdempotentHandle lockDeadlockDetector() {
307
    return context.lockDeadlockDetector();
1✔
308
  }
309

310
  @Override
311
  public boolean isDone() {
312
    return context.isDone();
1✔
313
  }
314

315
  @Override
316
  public Throwable getUnhandledException() {
317
    return context.getUnhandledException();
1✔
318
  }
319

320
  /**
321
   * Evaluates function in the threadContext of the coroutine without unblocking it. Used to get
322
   * current coroutine status, like stack trace.
323
   *
324
   * @param function Parameter is reason for current goroutine blockage.
325
   */
326
  public void evaluateInCoroutineContext(Functions.Proc1<String> function) {
327
    context.evaluateInCoroutineContext(function);
×
328
  }
×
329

330
  /**
331
   * Interrupt coroutine by throwing DestroyWorkflowThreadError from an await method it is blocked
332
   * on and return underlying Future to be waited on.
333
   */
334
  @Override
335
  public Future<?> stopNow() {
336
    // Cannot call destroy() on itself
337
    @Nullable Thread thread = context.getCurrentThread();
1✔
338
    if (Thread.currentThread().equals(thread)) {
1✔
339
      throw new Error("Cannot call destroy on itself: " + thread.getName());
×
340
    }
341
    context.initiateDestroy();
1✔
342
    if (taskFuture == null) {
1✔
343
      return getCompletedFuture();
1✔
344
    }
345
    return taskFuture;
1✔
346
  }
347

348
  private Future<?> getCompletedFuture() {
349
    CompletableFuture<String> f = new CompletableFuture<>();
1✔
350
    f.complete("done");
1✔
351
    return f;
1✔
352
  }
353

354
  @Override
355
  public void addStackTrace(StringBuilder result) {
356
    result.append(getName());
1✔
357
    @Nullable Thread thread = context.getCurrentThread();
1✔
358
    if (thread == null) {
1✔
359
      result.append("(NEW)");
×
360
      return;
×
361
    }
362
    result
1✔
363
        .append(": (BLOCKED on ")
1✔
364
        .append(getWorkflowThreadContext().getYieldReason())
1✔
365
        .append(")\n");
1✔
366
    // These numbers might change if implementation changes.
367
    int omitTop = 5;
1✔
368
    int omitBottom = 7;
1✔
369
    // TODO it's not a good idea to rely on the name to understand the thread type. Instead of that
370
    // we would better
371
    // assign an explicit thread type enum to the threads. This will be especially important when we
372
    // refactor
373
    // root and workflow-method
374
    // thread names into names that will include workflowId
375
    if (DeterministicRunnerImpl.WORKFLOW_ROOT_THREAD_NAME.equals(getName())) {
1✔
376
      // TODO revisit this number
377
      omitBottom = 11;
×
378
    } else if (getName().startsWith(WorkflowMethodThreadNameStrategy.WORKFLOW_MAIN_THREAD_PREFIX)) {
1✔
379
      // TODO revisit this number
380
      omitBottom = 11;
1✔
381
    }
382
    StackTraceElement[] stackTrace = thread.getStackTrace();
1✔
383
    for (int i = omitTop; i < stackTrace.length - omitBottom; i++) {
1✔
384
      StackTraceElement e = stackTrace[i];
1✔
385
      if (i == omitTop && "await".equals(e.getMethodName())) continue;
1✔
386
      result.append(e);
1✔
387
      result.append("\n");
1✔
388
    }
389
  }
1✔
390

391
  @Override
392
  public void yield(String reason, Supplier<Boolean> unblockCondition) {
393
    context.yield(reason, unblockCondition);
1✔
394
  }
1✔
395

396
  @Override
397
  public void exitThread() {
398
    runner.exit();
1✔
399
    throw new DestroyWorkflowThreadError("exit");
1✔
400
  }
401

402
  @Override
403
  public <T> void setThreadLocal(WorkflowThreadLocalInternal<T> key, T value) {
404
    threadLocalMap.put(key, value);
1✔
405
  }
1✔
406

407
  /**
408
   * Retrieve data from thread locals. Returns 1. not found (an empty Optional) 2. found but null
409
   * (an Optional of an empty Optional) 3. found and non-null (an Optional of an Optional of a
410
   * value). The type nesting is because Java Optionals cannot understand "Some null" vs "None",
411
   * which is exactly what we need here.
412
   *
413
   * @param key
414
   * @return one of three cases
415
   * @param <T>
416
   */
417
  @SuppressWarnings("unchecked")
418
  public <T> Optional<Optional<T>> getThreadLocal(WorkflowThreadLocalInternal<T> key) {
419
    if (!threadLocalMap.containsKey(key)) {
1✔
420
      return Optional.empty();
1✔
421
    }
422
    return Optional.of(Optional.ofNullable((T) threadLocalMap.get(key)));
1✔
423
  }
424

425
  /**
426
   * @return stack trace of the coroutine thread
427
   */
428
  @Override
429
  public String getStackTrace() {
430
    StackTraceElement[] st = task.getStackTrace();
1✔
431
    StringWriter sw = new StringWriter();
1✔
432
    PrintWriter pw = new PrintWriter(sw);
1✔
433
    pw.append(task.getName());
1✔
434
    pw.append("\n");
1✔
435
    for (StackTraceElement se : st) {
1✔
436
      pw.println("\tat " + se);
1✔
437
    }
438
    return sw.toString();
1✔
439
  }
440

441
  static class YieldWithTimeoutCondition implements Supplier<Boolean> {
442

443
    private final Supplier<Boolean> unblockCondition;
444
    private final long blockedUntil;
445
    private boolean timedOut;
446

447
    YieldWithTimeoutCondition(Supplier<Boolean> unblockCondition, long blockedUntil) {
×
448
      this.unblockCondition = unblockCondition;
×
449
      this.blockedUntil = blockedUntil;
×
450
    }
×
451

452
    boolean isTimedOut() {
453
      return timedOut;
×
454
    }
455

456
    /**
457
     * @return true if condition matched or timed out
458
     */
459
    @Override
460
    public Boolean get() {
461
      boolean result = unblockCondition.get();
×
462
      if (result) {
×
463
        return true;
×
464
      }
465
      long currentTimeMillis = WorkflowInternal.currentTimeMillis();
×
466
      timedOut = currentTimeMillis >= blockedUntil;
×
467
      return timedOut;
×
468
    }
469
  }
470
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc