• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2241

11 Apr 2024 07:03PM UTC coverage: 60.286% (+0.02%) from 60.263%
2241

push

buildkite

web-flow
Enable open tracing propagation in workflow lifecycles (#876)

How it works
Context Propagation in Cadence (Customer)

On the StartWorkflow API call, the trace span and its context are written into the workflow start event attributes, which are persisted on the Cadence server side.
When the workflow starts on the client, this span is referenced and activated while the workflow executes.
When child workflows and activities (including local activities) are scheduled, the span is written into the child workflow's start event attributes and the activity's schedule-activity event attributes.
When activities or child workflows are processed, the persisted span is referenced and activated again.

Sample Spans

Note: spans for the Poll and Respond APIs are omitted here.

{traceId:1, spanId:2, parentId:0, operationName:"cadence-RegisterDomain"}
{traceId:1, spanId:3, parentId:2, operationName:"Test Started"}
{traceId:1, spanId:18, parentId:3, operationName:"cadence-StartWorkflowExecution"}
{traceId:1, spanId:19, parentId:18, operationName:"cadence-GetWorkflowExecutionHistory"}
{traceId:1, spanId:21, parentId:18, operationName:"cadence-ExecuteWorkflow"}
{traceId:1, spanId:24, parentId:21, operationName:"cadence-ExecuteActivity"}
{traceId:1, spanId:25, parentId:24, operationName:"cadence-RespondActivityTaskCompleted"}
{traceId:1, spanId:31, parentId:21, operationName:"cadence-ExecuteWorkflow"}
{traceId:1, spanId:32, parentId:31, operationName:"cadence-ExecuteLocalActivity"}

What changed?

added a Propagator entity with tracing extract/inject logic
added trace activation logic in activity and workflow executors
added trace activation on service client (Tchannel + GRPC)
Why?

improve observability

How did you test it?

integration test

111 of 175 new or added lines in 13 files covered. (63.43%)

12 existing lines in 6 files now uncovered.

11455 of 19001 relevant lines covered (60.29%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.2
/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.sync;
19

20
import com.google.common.util.concurrent.RateLimiter;
21
import com.uber.cadence.context.ContextPropagator;
22
import com.uber.cadence.internal.context.ContextThreadLocal;
23
import com.uber.cadence.internal.logging.LoggerTag;
24
import com.uber.cadence.internal.metrics.MetricsType;
25
import com.uber.cadence.internal.replay.DeciderCache;
26
import com.uber.cadence.internal.replay.DecisionContext;
27
import com.uber.cadence.workflow.Promise;
28
import java.io.PrintWriter;
29
import java.io.StringWriter;
30
import java.util.HashMap;
31
import java.util.List;
32
import java.util.Map;
33
import java.util.Optional;
34
import java.util.concurrent.*;
35
import java.util.function.Consumer;
36
import java.util.function.Supplier;
37
import org.slf4j.Logger;
38
import org.slf4j.LoggerFactory;
39
import org.slf4j.MDC;
40

41
class WorkflowThreadImpl implements WorkflowThread {
42
  private static final RateLimiter metricsRateLimiter = RateLimiter.create(1);
1✔
43

44
  /**
45
   * Runnable passed to the thread that wraps a runnable passed to the WorkflowThreadImpl
46
   * constructor.
47
   */
48
  class RunnableWrapper implements Runnable {
49

50
    private final WorkflowThreadContext threadContext;
51
    private final DecisionContext decisionContext;
52
    private String originalName;
53
    private String name;
54
    private CancellationScopeImpl cancellationScope;
55
    private List<ContextPropagator> contextPropagators;
56
    private Map<String, Object> propagatedContexts;
57

58
    RunnableWrapper(
59
        WorkflowThreadContext threadContext,
60
        DecisionContext decisionContext,
61
        String name,
62
        boolean detached,
63
        CancellationScopeImpl parent,
64
        Runnable runnable,
65
        List<ContextPropagator> contextPropagators,
66
        Map<String, Object> propagatedContexts) {
1✔
67
      this.threadContext = threadContext;
1✔
68
      this.decisionContext = decisionContext;
1✔
69
      this.name = name;
1✔
70
      cancellationScope = new CancellationScopeImpl(detached, runnable, parent);
1✔
71
      if (context.getStatus() != Status.CREATED) {
1✔
72
        throw new IllegalStateException("threadContext not in CREATED state");
×
73
      }
74
      this.contextPropagators = contextPropagators;
1✔
75
      this.propagatedContexts = propagatedContexts;
1✔
76
    }
1✔
77

78
    @Override
79
    public void run() {
80
      thread = Thread.currentThread();
1✔
81
      originalName = thread.getName();
1✔
82
      thread.setName(name);
1✔
83
      DeterministicRunnerImpl.setCurrentThreadInternal(WorkflowThreadImpl.this);
1✔
84
      decisionContext.getWorkflowId();
1✔
85
      MDC.put(LoggerTag.WORKFLOW_ID, decisionContext.getWorkflowId());
1✔
86
      MDC.put(LoggerTag.WORKFLOW_TYPE, decisionContext.getWorkflowType().getName());
1✔
87
      MDC.put(LoggerTag.RUN_ID, decisionContext.getRunId());
1✔
88
      MDC.put(LoggerTag.TASK_LIST, decisionContext.getTaskList());
1✔
89
      MDC.put(LoggerTag.DOMAIN, decisionContext.getDomain());
1✔
90

91
      // Repopulate the context(s)
92
      ContextThreadLocal.setContextPropagators(this.contextPropagators);
1✔
93
      ContextThreadLocal.propagateContextToCurrentThread(this.propagatedContexts);
1✔
94

95
      try {
96
        // initialYield blocks thread until the first runUntilBlocked is called.
97
        // Otherwise r starts executing without control of the sync.
98
        threadContext.initialYield();
1✔
99
        cancellationScope.run();
1✔
100
      } catch (DestroyWorkflowThreadError e) {
1✔
101
        if (!threadContext.isDestroyRequested()) {
1✔
102
          threadContext.setUnhandledException(e);
1✔
103
        }
104
      } catch (Error e) {
1✔
105
        // Error aborts decision, not fails the workflow.
106
        if (log.isErrorEnabled() && !root) {
1✔
107
          StringWriter sw = new StringWriter();
1✔
108
          PrintWriter pw = new PrintWriter(sw, true);
1✔
109
          e.printStackTrace(pw);
1✔
110
          String stackTrace = sw.getBuffer().toString();
1✔
111
          log.error(
1✔
112
              String.format("Workflow thread \"%s\" run failed with Error:\n%s", name, stackTrace));
1✔
113
        }
114
        threadContext.setUnhandledException(e);
1✔
115
      } catch (CancellationException e) {
1✔
116
        if (!isCancelRequested()) {
1✔
117
          threadContext.setUnhandledException(e);
1✔
118
        }
119
        if (log.isDebugEnabled()) {
1✔
120
          log.debug(String.format("Workflow thread \"%s\" run cancelled", name));
×
121
        }
122
      } catch (Throwable e) {
1✔
123
        if (log.isWarnEnabled() && !root) {
1✔
124
          StringWriter sw = new StringWriter();
×
125
          PrintWriter pw = new PrintWriter(sw, true);
×
126
          e.printStackTrace(pw);
×
127
          String stackTrace = sw.getBuffer().toString();
×
128
          log.warn(
×
129
              String.format(
×
130
                  "Workflow thread \"%s\" run failed with unhandled exception:\n%s",
131
                  name, stackTrace));
132
        }
133
        threadContext.setUnhandledException(e);
1✔
134
      } finally {
135
        DeterministicRunnerImpl.setCurrentThreadInternal(null);
1✔
136
        threadContext.setStatus(Status.DONE);
1✔
137
        thread.setName(originalName);
1✔
138
        thread = null;
1✔
139
        MDC.clear();
1✔
140
      }
141
    }
1✔
142

143
    public String getName() {
144
      return name;
1✔
145
    }
146

147
    StackTraceElement[] getStackTrace() {
148
      if (thread != null) {
×
149
        return thread.getStackTrace();
×
150
      }
151
      return new StackTraceElement[0];
×
152
    }
153

154
    public void setName(String name) {
155
      this.name = name;
×
156
      if (thread != null) {
×
157
        thread.setName(name);
×
158
      }
159
    }
×
160
  }
161

162
  private static final Logger log = LoggerFactory.getLogger(WorkflowThreadImpl.class);
1✔
163

164
  private final boolean root;
165
  private final ExecutorService threadPool;
166
  private final WorkflowThreadContext context;
167
  private DeciderCache cache;
168
  private final DeterministicRunnerImpl runner;
169
  private final RunnableWrapper task;
170
  private Thread thread;
171
  private Future<?> taskFuture;
172
  private final Map<WorkflowThreadLocalInternal<?>, Object> threadLocalMap = new HashMap<>();
1✔
173

174
  /**
175
   * If not 0 then thread is blocked on a sleep (or on an operation with a timeout). The value is
176
   * the time in milliseconds (as in currentTimeMillis()) when thread will continue. Note that
177
   * thread still has to be called for evaluation as other threads might interrupt the blocking
178
   * call.
179
   */
180
  private long blockedUntil;
181

182
  WorkflowThreadImpl(
183
      boolean root,
184
      ExecutorService threadPool,
185
      DeterministicRunnerImpl runner,
186
      String name,
187
      boolean detached,
188
      CancellationScopeImpl parentCancellationScope,
189
      Runnable runnable,
190
      DeciderCache cache,
191
      List<ContextPropagator> contextPropagators,
192
      Map<String, Object> propagatedContexts) {
1✔
193
    this.root = root;
1✔
194
    this.threadPool = threadPool;
1✔
195
    this.runner = runner;
1✔
196
    this.context = new WorkflowThreadContext(runner.getLock());
1✔
197
    this.cache = cache;
1✔
198

199
    if (name == null) {
1✔
200
      name = "workflow-" + super.hashCode();
1✔
201
    }
202

203
    this.task =
1✔
204
        new RunnableWrapper(
205
            context,
206
            runner.getDecisionContext().getContext(),
1✔
207
            name,
208
            detached,
209
            parentCancellationScope,
210
            runnable,
211
            contextPropagators,
212
            propagatedContexts);
213
  }
1✔
214

215
  @Override
216
  public void run() {
217
    throw new UnsupportedOperationException("not used");
×
218
  }
219

220
  @Override
221
  public boolean isDetached() {
222
    return task.cancellationScope.isDetached();
×
223
  }
224

225
  @Override
226
  public void cancel() {
227
    task.cancellationScope.cancel();
×
228
  }
×
229

230
  @Override
231
  public void cancel(String reason) {
232
    task.cancellationScope.cancel(reason);
1✔
233
  }
1✔
234

235
  @Override
236
  public String getCancellationReason() {
237
    return task.cancellationScope.getCancellationReason();
×
238
  }
239

240
  @Override
241
  public boolean isCancelRequested() {
242
    return task.cancellationScope.isCancelRequested();
1✔
243
  }
244

245
  @Override
246
  public Promise<String> getCancellationRequest() {
247
    return task.cancellationScope.getCancellationRequest();
×
248
  }
249

250
  @Override
251
  public void start() {
252
    if (context.getStatus() != Status.CREATED) {
1✔
253
      throw new IllegalThreadStateException("already started");
×
254
    }
255
    context.setStatus(Status.RUNNING);
1✔
256

257
    if (metricsRateLimiter.tryAcquire(1)) {
1✔
258
      getDecisionContext()
1✔
259
          .getMetricsScope()
1✔
260
          .gauge(MetricsType.WORKFLOW_ACTIVE_THREAD_COUNT)
1✔
261
          .update(((ThreadPoolExecutor) threadPool).getActiveCount());
1✔
262
    }
263

264
    while (true) {
265
      try {
266
        taskFuture = threadPool.submit(task);
1✔
267
        return;
1✔
268
      } catch (RejectedExecutionException e) {
1✔
269
        getDecisionContext()
1✔
270
            .getMetricsScope()
1✔
271
            .counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION)
1✔
272
            .inc(1);
1✔
273
        if (cache != null) {
1✔
274
          boolean evicted =
1✔
275
              cache.evictAnyNotInProcessing(
1✔
276
                  this.runner.getDecisionContext().getContext().getRunId());
1✔
277
          if (!evicted) {
1✔
278
            // Note here we need to throw error, not exception. Otherwise it will be
279
            // translated to workflow execution exception and instead of failing the
280
            // decision we will be failing the workflow.
UNCOV
281
            throw new WorkflowRejectedExecutionError(e);
×
282
          }
283
        } else {
1✔
284
          throw new WorkflowRejectedExecutionError(e);
1✔
285
        }
286
      }
1✔
287
    }
288
  }
289

290
  public WorkflowThreadContext getContext() {
291
    return context;
1✔
292
  }
293

294
  @Override
295
  public DeterministicRunnerImpl getRunner() {
296
    return runner;
1✔
297
  }
298

299
  @Override
300
  public SyncDecisionContext getDecisionContext() {
301
    return runner.getDecisionContext();
1✔
302
  }
303

304
  @Override
305
  public void setName(String name) {
306
    task.setName(name);
×
307
  }
×
308

309
  @Override
310
  public String getName() {
311
    return task.getName();
1✔
312
  }
313

314
  @Override
315
  public long getId() {
316
    return hashCode();
×
317
  }
318

319
  @Override
320
  public long getBlockedUntil() {
321
    return blockedUntil;
1✔
322
  }
323

324
  private void setBlockedUntil(long blockedUntil) {
325
    this.blockedUntil = blockedUntil;
1✔
326
  }
1✔
327

328
  /** @return true if coroutine made some progress. */
329
  @Override
330
  public boolean runUntilBlocked() {
331
    if (taskFuture == null) {
1✔
332
      start();
1✔
333
    }
334
    return context.runUntilBlocked();
1✔
335
  }
336

337
  @Override
338
  public boolean isDone() {
339
    return context.isDone();
1✔
340
  }
341

342
  public Thread.State getState() {
343
    if (context.getStatus() == Status.YIELDED) {
×
344
      return Thread.State.BLOCKED;
×
345
    } else if (context.getStatus() == Status.DONE) {
×
346
      return Thread.State.TERMINATED;
×
347
    } else {
348
      return Thread.State.RUNNABLE;
×
349
    }
350
  }
351

352
  @Override
353
  public Throwable getUnhandledException() {
354
    return context.getUnhandledException();
1✔
355
  }
356

357
  /**
358
   * Evaluates function in the threadContext of the coroutine without unblocking it. Used to get
359
   * current coroutine status, like stack trace.
360
   *
361
   * @param function Parameter is reason for current goroutine blockage.
362
   */
363
  public void evaluateInCoroutineContext(Consumer<String> function) {
364
    context.evaluateInCoroutineContext(function);
×
365
  }
×
366

367
  /**
368
   * Interrupt coroutine by throwing DestroyWorkflowThreadError from a await method it is blocked on
369
   * and return underlying Future to be waited on.
370
   */
371
  @Override
372
  public Future<?> stopNow() {
373
    // Cannot call destroy() on itself
374
    if (thread == Thread.currentThread()) {
1✔
375
      throw new Error("Cannot call destroy on itself: " + thread.getName());
×
376
    }
377
    context.destroy();
1✔
378
    if (!context.isDone()) {
1✔
379
      throw new RuntimeException(
×
380
          "Couldn't destroy the thread. " + "The blocked thread stack trace: " + getStackTrace());
×
381
    }
382
    if (taskFuture == null) {
1✔
383
      return getCompletedFuture();
1✔
384
    }
385
    return taskFuture;
1✔
386
  }
387

388
  private Future<?> getCompletedFuture() {
389
    CompletableFuture<String> f = new CompletableFuture<>();
1✔
390
    f.complete("done");
1✔
391
    return f;
1✔
392
  }
393

394
  @Override
395
  public void addStackTrace(StringBuilder result) {
396
    result.append(getName());
1✔
397
    if (thread == null) {
1✔
398
      result.append("(NEW)");
×
399
      return;
×
400
    }
401
    result.append(": (BLOCKED on ").append(getContext().getYieldReason()).append(")\n");
1✔
402
    // These numbers might change if implementation changes.
403
    int omitTop = 5;
1✔
404
    int omitBottom = 7;
1✔
405
    if (DeterministicRunnerImpl.WORKFLOW_ROOT_THREAD_NAME.equals(getName())) {
1✔
406
      omitBottom = 11;
1✔
407
    }
408
    StackTraceElement[] stackTrace = thread.getStackTrace();
1✔
409
    for (int i = omitTop; i < stackTrace.length - omitBottom; i++) {
1✔
410
      StackTraceElement e = stackTrace[i];
1✔
411
      if (i == omitTop && "await".equals(e.getMethodName())) continue;
1✔
412
      result.append(e);
1✔
413
      result.append("\n");
1✔
414
    }
415
  }
1✔
416

417
  @Override
418
  public void yield(String reason, Supplier<Boolean> unblockCondition) {
419
    context.yield(reason, unblockCondition);
1✔
420
  }
1✔
421

422
  @Override
423
  public boolean yield(long timeoutMillis, String reason, Supplier<Boolean> unblockCondition)
424
      throws DestroyWorkflowThreadError {
425
    if (timeoutMillis == 0) {
1✔
426
      return unblockCondition.get();
1✔
427
    }
428
    long blockedUntil = WorkflowInternal.currentTimeMillis() + timeoutMillis;
1✔
429
    setBlockedUntil(blockedUntil);
1✔
430
    YieldWithTimeoutCondition condition =
1✔
431
        new YieldWithTimeoutCondition(unblockCondition, blockedUntil);
432
    WorkflowThread.await(reason, condition);
1✔
433
    return !condition.isTimedOut();
1✔
434
  }
435

436
  @Override
437
  public <R> void exitThread(R value) {
438
    runner.exit(value);
1✔
439
    throw new DestroyWorkflowThreadError("exit");
1✔
440
  }
441

442
  @Override
443
  public <T> void setThreadLocal(WorkflowThreadLocalInternal<T> key, T value) {
444
    threadLocalMap.put(key, value);
1✔
445
  }
1✔
446

447
  @SuppressWarnings("unchecked")
448
  @Override
449
  public <T> Optional<T> getThreadLocal(WorkflowThreadLocalInternal<T> key) {
450
    if (!threadLocalMap.containsKey(key)) {
1✔
451
      return Optional.empty();
1✔
452
    }
453
    return Optional.of((T) threadLocalMap.get(key));
1✔
454
  }
455

456
  /** @return stack trace of the coroutine thread */
457
  @Override
458
  public String getStackTrace() {
459
    StackTraceElement[] st = task.getStackTrace();
×
460
    StringWriter sw = new StringWriter();
×
461
    PrintWriter pw = new PrintWriter(sw);
×
462
    for (StackTraceElement se : st) {
×
463
      pw.println("\tat " + se);
×
464
    }
465
    return sw.toString();
×
466
  }
467

468
  static class YieldWithTimeoutCondition implements Supplier<Boolean> {
469

470
    private final Supplier<Boolean> unblockCondition;
471
    private final long blockedUntil;
472
    private boolean timedOut;
473

474
    YieldWithTimeoutCondition(Supplier<Boolean> unblockCondition, long blockedUntil) {
1✔
475
      this.unblockCondition = unblockCondition;
1✔
476
      this.blockedUntil = blockedUntil;
1✔
477
    }
1✔
478

479
    boolean isTimedOut() {
480
      return timedOut;
1✔
481
    }
482

483
    /** @return true if condition matched or timed out */
484
    @Override
485
    public Boolean get() {
486
      boolean result = unblockCondition.get();
1✔
487
      if (result) {
1✔
488
        return true;
1✔
489
      }
490
      timedOut = WorkflowInternal.currentTimeMillis() >= blockedUntil;
1✔
491
      return timedOut;
1✔
492
    }
493
  }
494
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc