• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2382

12 Jun 2024 05:44PM CUT coverage: 61.466% (-0.009%) from 61.475%
2382

Pull #910

buildkite

shijiesheng
lint
Pull Request #910: Fix incorrect span activation for local activities

8 of 8 new or added lines in 1 file covered. (100.0%)

1 existing line in 1 file now uncovered.

11973 of 19479 relevant lines covered (61.47%)

0.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.32
/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.sync;
19

20
import com.google.common.util.concurrent.RateLimiter;
21
import com.uber.cadence.context.ContextPropagator;
22
import com.uber.cadence.internal.context.ContextThreadLocal;
23
import com.uber.cadence.internal.logging.LoggerTag;
24
import com.uber.cadence.internal.metrics.MetricsType;
25
import com.uber.cadence.internal.replay.DeciderCache;
26
import com.uber.cadence.internal.replay.DecisionContext;
27
import com.uber.cadence.workflow.Promise;
28
import java.io.PrintWriter;
29
import java.io.StringWriter;
30
import java.util.HashMap;
31
import java.util.List;
32
import java.util.Map;
33
import java.util.Optional;
34
import java.util.concurrent.*;
35
import java.util.function.Consumer;
36
import java.util.function.Supplier;
37
import org.slf4j.Logger;
38
import org.slf4j.LoggerFactory;
39
import org.slf4j.MDC;
40

41
class WorkflowThreadImpl implements WorkflowThread {
42
  private static final RateLimiter metricsRateLimiter = RateLimiter.create(1);
1✔
43

44
  /**
45
   * Runnable passed to the thread that wraps a runnable passed to the WorkflowThreadImpl
46
   * constructor.
47
   */
48
  class RunnableWrapper implements Runnable {
49

50
    private final WorkflowThreadContext threadContext;
51
    private final DecisionContext decisionContext;
52
    private String originalName;
53
    private String name;
54
    private CancellationScopeImpl cancellationScope;
55
    private List<ContextPropagator> contextPropagators;
56
    private Map<String, Object> propagatedContexts;
57

58
    RunnableWrapper(
59
        WorkflowThreadContext threadContext,
60
        DecisionContext decisionContext,
61
        String name,
62
        boolean detached,
63
        CancellationScopeImpl parent,
64
        Runnable runnable,
65
        List<ContextPropagator> contextPropagators,
66
        Map<String, Object> propagatedContexts) {
1✔
67
      this.threadContext = threadContext;
1✔
68
      this.decisionContext = decisionContext;
1✔
69
      this.name = name;
1✔
70
      cancellationScope = new CancellationScopeImpl(detached, runnable, parent);
1✔
71
      if (context.getStatus() != Status.CREATED) {
1✔
72
        throw new IllegalStateException("threadContext not in CREATED state");
×
73
      }
74
      this.contextPropagators = contextPropagators;
1✔
75
      this.propagatedContexts = propagatedContexts;
1✔
76
    }
1✔
77

78
    @Override
79
    public void run() {
80
      thread = Thread.currentThread();
1✔
81
      originalName = thread.getName();
1✔
82
      thread.setName(name);
1✔
83
      DeterministicRunnerImpl.setCurrentThreadInternal(WorkflowThreadImpl.this);
1✔
84
      decisionContext.getWorkflowId();
1✔
85
      MDC.put(LoggerTag.WORKFLOW_ID, decisionContext.getWorkflowId());
1✔
86
      MDC.put(LoggerTag.WORKFLOW_TYPE, decisionContext.getWorkflowType().getName());
1✔
87
      MDC.put(LoggerTag.RUN_ID, decisionContext.getRunId());
1✔
88
      MDC.put(LoggerTag.TASK_LIST, decisionContext.getTaskList());
1✔
89
      MDC.put(LoggerTag.DOMAIN, decisionContext.getDomain());
1✔
90

91
      // Repopulate the context(s)
92
      ContextThreadLocal.setContextPropagators(this.contextPropagators);
1✔
93
      ContextThreadLocal.propagateContextToCurrentThread(this.propagatedContexts);
1✔
94

95
      try {
96
        // initialYield blocks thread until the first runUntilBlocked is called.
97
        // Otherwise r starts executing without control of the sync.
98
        threadContext.initialYield();
1✔
99
        cancellationScope.run();
1✔
100
      } catch (DestroyWorkflowThreadError e) {
1✔
101
        if (!threadContext.isDestroyRequested()) {
1✔
102
          threadContext.setUnhandledException(e);
1✔
103
        }
104
      } catch (Error e) {
1✔
105
        // Error aborts decision, not fails the workflow.
106
        if (log.isErrorEnabled() && !root) {
1✔
107
          StringWriter sw = new StringWriter();
1✔
108
          PrintWriter pw = new PrintWriter(sw, true);
1✔
109
          e.printStackTrace(pw);
1✔
110
          String stackTrace = sw.getBuffer().toString();
1✔
111
          log.error(
1✔
112
              String.format("Workflow thread \"%s\" run failed with Error:\n%s", name, stackTrace));
1✔
113
        }
114
        threadContext.setUnhandledException(e);
1✔
115
      } catch (CancellationException e) {
1✔
116
        if (!isCancelRequested()) {
1✔
117
          threadContext.setUnhandledException(e);
1✔
118
        }
119
        if (log.isDebugEnabled()) {
1✔
120
          log.debug(String.format("Workflow thread \"%s\" run cancelled", name));
×
121
        }
122
      } catch (Throwable e) {
1✔
123
        if (log.isWarnEnabled() && !root) {
1✔
124
          StringWriter sw = new StringWriter();
×
125
          PrintWriter pw = new PrintWriter(sw, true);
×
126
          e.printStackTrace(pw);
×
127
          String stackTrace = sw.getBuffer().toString();
×
128
          log.warn(
×
129
              String.format(
×
130
                  "Workflow thread \"%s\" run failed with unhandled exception:\n%s",
131
                  name, stackTrace));
132
        }
133
        threadContext.setUnhandledException(e);
1✔
134
      } finally {
135
        ContextThreadLocal.unsetCurrentContext();
1✔
136
        DeterministicRunnerImpl.setCurrentThreadInternal(null);
1✔
137
        threadContext.setStatus(Status.DONE);
1✔
138
        thread.setName(originalName);
1✔
139
        thread = null;
1✔
140
        MDC.clear();
1✔
141
      }
142
    }
1✔
143

144
    public String getName() {
145
      return name;
1✔
146
    }
147

148
    StackTraceElement[] getStackTrace() {
149
      if (thread != null) {
×
150
        return thread.getStackTrace();
×
151
      }
152
      return new StackTraceElement[0];
×
153
    }
154

155
    public void setName(String name) {
156
      this.name = name;
×
157
      if (thread != null) {
×
158
        thread.setName(name);
×
159
      }
160
    }
×
161
  }
162

163
  private static final Logger log = LoggerFactory.getLogger(WorkflowThreadImpl.class);
1✔
164

165
  private final boolean root;
166
  private final ExecutorService threadPool;
167
  private final WorkflowThreadContext context;
168
  private DeciderCache cache;
169
  private final DeterministicRunnerImpl runner;
170
  private final RunnableWrapper task;
171
  private Thread thread;
172
  private Future<?> taskFuture;
173
  private final Map<WorkflowThreadLocalInternal<?>, Object> threadLocalMap = new HashMap<>();
1✔
174

175
  /**
176
   * If not 0 then thread is blocked on a sleep (or on an operation with a timeout). The value is
177
   * the time in milliseconds (as in currentTimeMillis()) when thread will continue. Note that
178
   * thread still has to be called for evaluation as other threads might interrupt the blocking
179
   * call.
180
   */
181
  private long blockedUntil;
182

183
  WorkflowThreadImpl(
184
      boolean root,
185
      ExecutorService threadPool,
186
      DeterministicRunnerImpl runner,
187
      String name,
188
      boolean detached,
189
      CancellationScopeImpl parentCancellationScope,
190
      Runnable runnable,
191
      DeciderCache cache,
192
      List<ContextPropagator> contextPropagators,
193
      Map<String, Object> propagatedContexts) {
1✔
194
    this.root = root;
1✔
195
    this.threadPool = threadPool;
1✔
196
    this.runner = runner;
1✔
197
    this.context = new WorkflowThreadContext(runner.getLock());
1✔
198
    this.cache = cache;
1✔
199

200
    if (name == null) {
1✔
201
      name = "workflow-" + super.hashCode();
1✔
202
    }
203

204
    this.task =
1✔
205
        new RunnableWrapper(
206
            context,
207
            runner.getDecisionContext().getContext(),
1✔
208
            name,
209
            detached,
210
            parentCancellationScope,
211
            runnable,
212
            contextPropagators,
213
            propagatedContexts);
214
  }
1✔
215

216
  @Override
217
  public void run() {
218
    throw new UnsupportedOperationException("not used");
×
219
  }
220

221
  @Override
222
  public boolean isDetached() {
223
    return task.cancellationScope.isDetached();
×
224
  }
225

226
  @Override
227
  public void cancel() {
228
    task.cancellationScope.cancel();
×
229
  }
×
230

231
  @Override
232
  public void cancel(String reason) {
233
    task.cancellationScope.cancel(reason);
1✔
234
  }
1✔
235

236
  @Override
237
  public String getCancellationReason() {
238
    return task.cancellationScope.getCancellationReason();
×
239
  }
240

241
  @Override
242
  public boolean isCancelRequested() {
243
    return task.cancellationScope.isCancelRequested();
1✔
244
  }
245

246
  @Override
247
  public Promise<String> getCancellationRequest() {
248
    return task.cancellationScope.getCancellationRequest();
×
249
  }
250

251
  @Override
252
  public void start() {
253
    if (context.getStatus() != Status.CREATED) {
1✔
254
      throw new IllegalThreadStateException("already started");
×
255
    }
256
    context.setStatus(Status.RUNNING);
1✔
257

258
    if (metricsRateLimiter.tryAcquire(1)) {
1✔
259
      getDecisionContext()
1✔
260
          .getMetricsScope()
1✔
261
          .gauge(MetricsType.WORKFLOW_ACTIVE_THREAD_COUNT)
1✔
262
          .update(((ThreadPoolExecutor) threadPool).getActiveCount());
1✔
263
    }
264

265
    while (true) {
266
      try {
267
        taskFuture = threadPool.submit(task);
1✔
268
        return;
1✔
269
      } catch (RejectedExecutionException e) {
1✔
270
        getDecisionContext()
1✔
271
            .getMetricsScope()
1✔
272
            .counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION)
1✔
273
            .inc(1);
1✔
274
        if (cache != null) {
1✔
275
          boolean evicted =
1✔
276
              cache.evictAnyNotInProcessing(
1✔
277
                  this.runner.getDecisionContext().getContext().getRunId());
1✔
278
          if (!evicted) {
1✔
279
            // Note here we need to throw error, not exception. Otherwise it will be
280
            // translated to workflow execution exception and instead of failing the
281
            // decision we will be failing the workflow.
UNCOV
282
            throw new WorkflowRejectedExecutionError(e);
×
283
          }
284
        } else {
1✔
285
          throw new WorkflowRejectedExecutionError(e);
1✔
286
        }
287
      }
1✔
288
    }
289
  }
290

291
  public WorkflowThreadContext getContext() {
292
    return context;
1✔
293
  }
294

295
  @Override
296
  public DeterministicRunnerImpl getRunner() {
297
    return runner;
1✔
298
  }
299

300
  @Override
301
  public SyncDecisionContext getDecisionContext() {
302
    return runner.getDecisionContext();
1✔
303
  }
304

305
  @Override
306
  public void setName(String name) {
307
    task.setName(name);
×
308
  }
×
309

310
  @Override
311
  public String getName() {
312
    return task.getName();
1✔
313
  }
314

315
  @Override
316
  public long getId() {
317
    return hashCode();
×
318
  }
319

320
  @Override
321
  public long getBlockedUntil() {
322
    return blockedUntil;
1✔
323
  }
324

325
  private void setBlockedUntil(long blockedUntil) {
326
    this.blockedUntil = blockedUntil;
1✔
327
  }
1✔
328

329
  /** @return true if coroutine made some progress. */
330
  @Override
331
  public boolean runUntilBlocked() {
332
    if (taskFuture == null) {
1✔
333
      start();
1✔
334
    }
335
    return context.runUntilBlocked();
1✔
336
  }
337

338
  @Override
339
  public boolean isDone() {
340
    return context.isDone();
1✔
341
  }
342

343
  public Thread.State getState() {
344
    if (context.getStatus() == Status.YIELDED) {
×
345
      return Thread.State.BLOCKED;
×
346
    } else if (context.getStatus() == Status.DONE) {
×
347
      return Thread.State.TERMINATED;
×
348
    } else {
349
      return Thread.State.RUNNABLE;
×
350
    }
351
  }
352

353
  @Override
354
  public Throwable getUnhandledException() {
355
    return context.getUnhandledException();
1✔
356
  }
357

358
  /**
359
   * Evaluates function in the threadContext of the coroutine without unblocking it. Used to get
360
   * current coroutine status, like stack trace.
361
   *
362
   * @param function Parameter is reason for current goroutine blockage.
363
   */
364
  public void evaluateInCoroutineContext(Consumer<String> function) {
365
    context.evaluateInCoroutineContext(function);
×
366
  }
×
367

368
  /**
369
   * Interrupt coroutine by throwing DestroyWorkflowThreadError from a await method it is blocked on
370
   * and return underlying Future to be waited on.
371
   */
372
  @Override
373
  public Future<?> stopNow() {
374
    // Cannot call destroy() on itself
375
    if (thread == Thread.currentThread()) {
1✔
376
      throw new Error("Cannot call destroy on itself: " + thread.getName());
×
377
    }
378
    context.destroy();
1✔
379
    if (!context.isDone()) {
1✔
380
      throw new RuntimeException(
×
381
          "Couldn't destroy the thread. " + "The blocked thread stack trace: " + getStackTrace());
×
382
    }
383
    if (taskFuture == null) {
1✔
384
      return getCompletedFuture();
1✔
385
    }
386
    return taskFuture;
1✔
387
  }
388

389
  private Future<?> getCompletedFuture() {
390
    CompletableFuture<String> f = new CompletableFuture<>();
1✔
391
    f.complete("done");
1✔
392
    return f;
1✔
393
  }
394

395
  @Override
396
  public void addStackTrace(StringBuilder result) {
397
    result.append(getName());
1✔
398
    if (thread == null) {
1✔
399
      result.append("(NEW)");
×
400
      return;
×
401
    }
402
    result.append(": (BLOCKED on ").append(getContext().getYieldReason()).append(")\n");
1✔
403
    // These numbers might change if implementation changes.
404
    int omitTop = 5;
1✔
405
    int omitBottom = 7;
1✔
406
    if (DeterministicRunnerImpl.WORKFLOW_ROOT_THREAD_NAME.equals(getName())) {
1✔
407
      omitBottom = 11;
1✔
408
    }
409
    StackTraceElement[] stackTrace = thread.getStackTrace();
1✔
410
    for (int i = omitTop; i < stackTrace.length - omitBottom; i++) {
1✔
411
      StackTraceElement e = stackTrace[i];
1✔
412
      if (i == omitTop && "await".equals(e.getMethodName())) continue;
1✔
413
      result.append(e);
1✔
414
      result.append("\n");
1✔
415
    }
416
  }
1✔
417

418
  @Override
419
  public void yield(String reason, Supplier<Boolean> unblockCondition) {
420
    context.yield(reason, unblockCondition);
1✔
421
  }
1✔
422

423
  @Override
424
  public boolean yield(long timeoutMillis, String reason, Supplier<Boolean> unblockCondition)
425
      throws DestroyWorkflowThreadError {
426
    if (timeoutMillis == 0) {
1✔
427
      return unblockCondition.get();
1✔
428
    }
429
    long blockedUntil = WorkflowInternal.currentTimeMillis() + timeoutMillis;
1✔
430
    setBlockedUntil(blockedUntil);
1✔
431
    YieldWithTimeoutCondition condition =
1✔
432
        new YieldWithTimeoutCondition(unblockCondition, blockedUntil);
433
    WorkflowThread.await(reason, condition);
1✔
434
    return !condition.isTimedOut();
1✔
435
  }
436

437
  @Override
438
  public <R> void exitThread(R value) {
439
    runner.exit(value);
1✔
440
    throw new DestroyWorkflowThreadError("exit");
1✔
441
  }
442

443
  @Override
444
  public <T> void setThreadLocal(WorkflowThreadLocalInternal<T> key, T value) {
445
    threadLocalMap.put(key, value);
1✔
446
  }
1✔
447

448
  @SuppressWarnings("unchecked")
449
  @Override
450
  public <T> Optional<T> getThreadLocal(WorkflowThreadLocalInternal<T> key) {
451
    if (!threadLocalMap.containsKey(key)) {
1✔
452
      return Optional.empty();
1✔
453
    }
454
    return Optional.of((T) threadLocalMap.get(key));
1✔
455
  }
456

457
  /** @return stack trace of the coroutine thread */
458
  @Override
459
  public String getStackTrace() {
460
    StackTraceElement[] st = task.getStackTrace();
×
461
    StringWriter sw = new StringWriter();
×
462
    PrintWriter pw = new PrintWriter(sw);
×
463
    for (StackTraceElement se : st) {
×
464
      pw.println("\tat " + se);
×
465
    }
466
    return sw.toString();
×
467
  }
468

469
  static class YieldWithTimeoutCondition implements Supplier<Boolean> {
470

471
    private final Supplier<Boolean> unblockCondition;
472
    private final long blockedUntil;
473
    private boolean timedOut;
474

475
    YieldWithTimeoutCondition(Supplier<Boolean> unblockCondition, long blockedUntil) {
1✔
476
      this.unblockCondition = unblockCondition;
1✔
477
      this.blockedUntil = blockedUntil;
1✔
478
    }
1✔
479

480
    boolean isTimedOut() {
481
      return timedOut;
1✔
482
    }
483

484
    /** @return true if condition matched or timed out */
485
    @Override
486
    public Boolean get() {
487
      boolean result = unblockCondition.get();
1✔
488
      if (result) {
1✔
489
        return true;
1✔
490
      }
491
      timedOut = WorkflowInternal.currentTimeMillis() >= blockedUntil;
1✔
492
      return timedOut;
1✔
493
    }
494
  }
495
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc