• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 1759

pending completion
1759

push

buildkite

GitHub
Exposed startedEventAttribute and DataConverter (#799)

* exposing startedEventAttributes through DecisionContext in WorkflowInfo for using in ContinueAsNew

* addition of workflowEventAttributes in dummy implementation

* Addition of dataConverter

8 of 8 new or added lines in 3 files covered. (100.0%)

11114 of 18404 relevant lines covered (60.39%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.2
/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.sync;
19

20
import com.google.common.util.concurrent.RateLimiter;
21
import com.uber.cadence.context.ContextPropagator;
22
import com.uber.cadence.internal.context.ContextThreadLocal;
23
import com.uber.cadence.internal.logging.LoggerTag;
24
import com.uber.cadence.internal.metrics.MetricsType;
25
import com.uber.cadence.internal.replay.DeciderCache;
26
import com.uber.cadence.internal.replay.DecisionContext;
27
import com.uber.cadence.workflow.Promise;
28
import java.io.PrintWriter;
29
import java.io.StringWriter;
30
import java.util.HashMap;
31
import java.util.List;
32
import java.util.Map;
33
import java.util.Optional;
34
import java.util.concurrent.*;
35
import java.util.function.Consumer;
36
import java.util.function.Supplier;
37
import org.slf4j.Logger;
38
import org.slf4j.LoggerFactory;
39
import org.slf4j.MDC;
40

41
class WorkflowThreadImpl implements WorkflowThread {
42
  private static final RateLimiter metricsRateLimiter = RateLimiter.create(1);
1✔
43

44
  /**
45
   * Runnable passed to the thread that wraps a runnable passed to the WorkflowThreadImpl
46
   * constructor.
47
   */
48
  class RunnableWrapper implements Runnable {
49

50
    private final WorkflowThreadContext threadContext;
51
    private final DecisionContext decisionContext;
52
    private String originalName;
53
    private String name;
54
    private CancellationScopeImpl cancellationScope;
55
    private List<ContextPropagator> contextPropagators;
56
    private Map<String, Object> propagatedContexts;
57

58
    RunnableWrapper(
59
        WorkflowThreadContext threadContext,
60
        DecisionContext decisionContext,
61
        String name,
62
        boolean detached,
63
        CancellationScopeImpl parent,
64
        Runnable runnable,
65
        List<ContextPropagator> contextPropagators,
66
        Map<String, Object> propagatedContexts) {
1✔
67
      this.threadContext = threadContext;
1✔
68
      this.decisionContext = decisionContext;
1✔
69
      this.name = name;
1✔
70
      cancellationScope = new CancellationScopeImpl(detached, runnable, parent);
1✔
71
      if (context.getStatus() != Status.CREATED) {
1✔
72
        throw new IllegalStateException("threadContext not in CREATED state");
×
73
      }
74
      this.contextPropagators = contextPropagators;
1✔
75
      this.propagatedContexts = propagatedContexts;
1✔
76
    }
1✔
77

78
    @Override
79
    public void run() {
80
      thread = Thread.currentThread();
1✔
81
      originalName = thread.getName();
1✔
82
      thread.setName(name);
1✔
83
      DeterministicRunnerImpl.setCurrentThreadInternal(WorkflowThreadImpl.this);
1✔
84
      decisionContext.getWorkflowId();
1✔
85
      MDC.put(LoggerTag.WORKFLOW_ID, decisionContext.getWorkflowId());
1✔
86
      MDC.put(LoggerTag.WORKFLOW_TYPE, decisionContext.getWorkflowType().getName());
1✔
87
      MDC.put(LoggerTag.RUN_ID, decisionContext.getRunId());
1✔
88
      MDC.put(LoggerTag.TASK_LIST, decisionContext.getTaskList());
1✔
89
      MDC.put(LoggerTag.DOMAIN, decisionContext.getDomain());
1✔
90

91
      // Repopulate the context(s)
92
      ContextThreadLocal.setContextPropagators(this.contextPropagators);
1✔
93
      ContextThreadLocal.propagateContextToCurrentThread(this.propagatedContexts);
1✔
94

95
      try {
96
        // initialYield blocks thread until the first runUntilBlocked is called.
97
        // Otherwise r starts executing without control of the sync.
98
        threadContext.initialYield();
1✔
99
        cancellationScope.run();
1✔
100
      } catch (DestroyWorkflowThreadError e) {
1✔
101
        if (!threadContext.isDestroyRequested()) {
1✔
102
          threadContext.setUnhandledException(e);
1✔
103
        }
104
      } catch (Error e) {
1✔
105
        // Error aborts decision, not fails the workflow.
106
        if (log.isErrorEnabled() && !root) {
1✔
107
          StringWriter sw = new StringWriter();
1✔
108
          PrintWriter pw = new PrintWriter(sw, true);
1✔
109
          e.printStackTrace(pw);
1✔
110
          String stackTrace = sw.getBuffer().toString();
1✔
111
          log.error(
1✔
112
              String.format("Workflow thread \"%s\" run failed with Error:\n%s", name, stackTrace));
1✔
113
        }
114
        threadContext.setUnhandledException(e);
1✔
115
      } catch (CancellationException e) {
1✔
116
        if (!isCancelRequested()) {
1✔
117
          threadContext.setUnhandledException(e);
1✔
118
        }
119
        if (log.isDebugEnabled()) {
1✔
120
          log.debug(String.format("Workflow thread \"%s\" run cancelled", name));
×
121
        }
122
      } catch (Throwable e) {
1✔
123
        if (log.isWarnEnabled() && !root) {
1✔
124
          StringWriter sw = new StringWriter();
×
125
          PrintWriter pw = new PrintWriter(sw, true);
×
126
          e.printStackTrace(pw);
×
127
          String stackTrace = sw.getBuffer().toString();
×
128
          log.warn(
×
129
              String.format(
×
130
                  "Workflow thread \"%s\" run failed with unhandled exception:\n%s",
131
                  name, stackTrace));
132
        }
133
        threadContext.setUnhandledException(e);
1✔
134
      } finally {
135
        DeterministicRunnerImpl.setCurrentThreadInternal(null);
1✔
136
        threadContext.setStatus(Status.DONE);
1✔
137
        thread.setName(originalName);
1✔
138
        thread = null;
1✔
139
        MDC.clear();
1✔
140
      }
141
    }
1✔
142

143
    public String getName() {
144
      return name;
1✔
145
    }
146

147
    StackTraceElement[] getStackTrace() {
148
      if (thread != null) {
×
149
        return thread.getStackTrace();
×
150
      }
151
      return new StackTraceElement[0];
×
152
    }
153

154
    public void setName(String name) {
155
      this.name = name;
×
156
      if (thread != null) {
×
157
        thread.setName(name);
×
158
      }
159
    }
×
160
  }
161

162
  private static final Logger log = LoggerFactory.getLogger(WorkflowThreadImpl.class);
1✔
163

164
  private final boolean root;
165
  private final ExecutorService threadPool;
166
  private final WorkflowThreadContext context;
167
  private DeciderCache cache;
168
  private final DeterministicRunnerImpl runner;
169
  private final RunnableWrapper task;
170
  private Thread thread;
171
  private Future<?> taskFuture;
172
  private final Map<WorkflowThreadLocalInternal<?>, Object> threadLocalMap = new HashMap<>();
1✔
173

174
  /**
175
   * If not 0 then thread is blocked on a sleep (or on an operation with a timeout). The value is
176
   * the time in milliseconds (as in currentTimeMillis()) when thread will continue. Note that
177
   * thread still has to be called for evaluation as other threads might interrupt the blocking
178
   * call.
179
   */
180
  private long blockedUntil;
181

182
  WorkflowThreadImpl(
183
      boolean root,
184
      ExecutorService threadPool,
185
      DeterministicRunnerImpl runner,
186
      String name,
187
      boolean detached,
188
      CancellationScopeImpl parentCancellationScope,
189
      Runnable runnable,
190
      DeciderCache cache,
191
      List<ContextPropagator> contextPropagators,
192
      Map<String, Object> propagatedContexts) {
1✔
193
    this.root = root;
1✔
194
    this.threadPool = threadPool;
1✔
195
    this.runner = runner;
1✔
196
    this.context = new WorkflowThreadContext(runner.getLock());
1✔
197
    this.cache = cache;
1✔
198

199
    if (name == null) {
1✔
200
      name = "workflow-" + super.hashCode();
1✔
201
    }
202

203
    this.task =
1✔
204
        new RunnableWrapper(
205
            context,
206
            runner.getDecisionContext().getContext(),
1✔
207
            name,
208
            detached,
209
            parentCancellationScope,
210
            runnable,
211
            contextPropagators,
212
            propagatedContexts);
213
  }
1✔
214

215
  @Override
216
  public void run() {
217
    throw new UnsupportedOperationException("not used");
×
218
  }
219

220
  @Override
221
  public boolean isDetached() {
222
    return task.cancellationScope.isDetached();
×
223
  }
224

225
  @Override
226
  public void cancel() {
227
    task.cancellationScope.cancel();
×
228
  }
×
229

230
  @Override
231
  public void cancel(String reason) {
232
    task.cancellationScope.cancel(reason);
1✔
233
  }
1✔
234

235
  @Override
236
  public String getCancellationReason() {
237
    return task.cancellationScope.getCancellationReason();
×
238
  }
239

240
  @Override
241
  public boolean isCancelRequested() {
242
    return task.cancellationScope.isCancelRequested();
1✔
243
  }
244

245
  @Override
246
  public Promise<String> getCancellationRequest() {
247
    return task.cancellationScope.getCancellationRequest();
×
248
  }
249

250
  @Override
251
  public void start() {
252
    if (context.getStatus() != Status.CREATED) {
1✔
253
      throw new IllegalThreadStateException("already started");
×
254
    }
255
    context.setStatus(Status.RUNNING);
1✔
256

257
    if (metricsRateLimiter.tryAcquire(1)) {
1✔
258
      getDecisionContext()
1✔
259
          .getMetricsScope()
1✔
260
          .gauge(MetricsType.WORKFLOW_ACTIVE_THREAD_COUNT)
1✔
261
          .update(((ThreadPoolExecutor) threadPool).getActiveCount());
1✔
262
    }
263

264
    while (true) {
265
      try {
266
        taskFuture = threadPool.submit(task);
1✔
267
        return;
1✔
268
      } catch (RejectedExecutionException e) {
1✔
269
        getDecisionContext()
1✔
270
            .getMetricsScope()
1✔
271
            .counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION)
1✔
272
            .inc(1);
1✔
273
        if (cache != null) {
1✔
274
          boolean evicted =
1✔
275
              cache.evictAnyNotInProcessing(
1✔
276
                  this.runner.getDecisionContext().getContext().getRunId());
1✔
277
          if (!evicted) {
1✔
278
            // Note here we need to throw error, not exception. Otherwise it will be
279
            // translated to workflow execution exception and instead of failing the
280
            // decision we will be failing the workflow.
281
            throw new WorkflowRejectedExecutionError(e);
×
282
          }
283
        } else {
1✔
284
          throw new WorkflowRejectedExecutionError(e);
1✔
285
        }
286
      }
1✔
287
    }
288
  }
289

290
  public WorkflowThreadContext getContext() {
291
    return context;
1✔
292
  }
293

294
  @Override
295
  public DeterministicRunnerImpl getRunner() {
296
    return runner;
1✔
297
  }
298

299
  @Override
300
  public SyncDecisionContext getDecisionContext() {
301
    return runner.getDecisionContext();
1✔
302
  }
303

304
  @Override
305
  public void setName(String name) {
306
    task.setName(name);
×
307
  }
×
308

309
  @Override
310
  public String getName() {
311
    return task.getName();
1✔
312
  }
313

314
  @Override
315
  public long getId() {
316
    return hashCode();
×
317
  }
318

319
  @Override
320
  public long getBlockedUntil() {
321
    return blockedUntil;
1✔
322
  }
323

324
  private void setBlockedUntil(long blockedUntil) {
325
    this.blockedUntil = blockedUntil;
1✔
326
  }
1✔
327

328
  /** @return true if coroutine made some progress. */
329
  @Override
330
  public boolean runUntilBlocked() {
331
    if (taskFuture == null) {
1✔
332
      start();
1✔
333
    }
334
    return context.runUntilBlocked();
1✔
335
  }
336

337
  @Override
338
  public boolean isDone() {
339
    return context.isDone();
1✔
340
  }
341

342
  public Thread.State getState() {
343
    if (context.getStatus() == Status.YIELDED) {
×
344
      return Thread.State.BLOCKED;
×
345
    } else if (context.getStatus() == Status.DONE) {
×
346
      return Thread.State.TERMINATED;
×
347
    } else {
348
      return Thread.State.RUNNABLE;
×
349
    }
350
  }
351

352
  @Override
353
  public Throwable getUnhandledException() {
354
    return context.getUnhandledException();
1✔
355
  }
356

357
  /**
358
   * Evaluates function in the threadContext of the coroutine without unblocking it. Used to get
359
   * current coroutine status, like stack trace.
360
   *
361
   * @param function Parameter is reason for current goroutine blockage.
362
   */
363
  public void evaluateInCoroutineContext(Consumer<String> function) {
364
    context.evaluateInCoroutineContext(function);
×
365
  }
×
366

367
  /**
368
   * Interrupt coroutine by throwing DestroyWorkflowThreadError from a await method it is blocked on
369
   * and return underlying Future to be waited on.
370
   */
371
  @Override
372
  public Future<?> stopNow() {
373
    // Cannot call destroy() on itself
374
    if (thread == Thread.currentThread()) {
1✔
375
      throw new Error("Cannot call destroy on itself: " + thread.getName());
×
376
    }
377
    context.destroy();
1✔
378
    if (!context.isDone()) {
1✔
379
      throw new RuntimeException(
×
380
          "Couldn't destroy the thread. " + "The blocked thread stack trace: " + getStackTrace());
×
381
    }
382
    if (taskFuture == null) {
1✔
383
      return getCompletedFuture();
1✔
384
    }
385
    return taskFuture;
1✔
386
  }
387

388
  private Future<?> getCompletedFuture() {
389
    CompletableFuture<String> f = new CompletableFuture<>();
1✔
390
    f.complete("done");
1✔
391
    return f;
1✔
392
  }
393

394
  @Override
395
  public void addStackTrace(StringBuilder result) {
396
    result.append(getName());
1✔
397
    if (thread == null) {
1✔
398
      result.append("(NEW)");
×
399
      return;
×
400
    }
401
    result.append(": (BLOCKED on ").append(getContext().getYieldReason()).append(")\n");
1✔
402
    // These numbers might change if implementation changes.
403
    int omitTop = 5;
1✔
404
    int omitBottom = 7;
1✔
405
    if (DeterministicRunnerImpl.WORKFLOW_ROOT_THREAD_NAME.equals(getName())) {
1✔
406
      omitBottom = 11;
1✔
407
    }
408
    StackTraceElement[] stackTrace = thread.getStackTrace();
1✔
409
    for (int i = omitTop; i < stackTrace.length - omitBottom; i++) {
1✔
410
      StackTraceElement e = stackTrace[i];
1✔
411
      if (i == omitTop && "await".equals(e.getMethodName())) continue;
1✔
412
      result.append(e);
1✔
413
      result.append("\n");
1✔
414
    }
415
  }
1✔
416

417
  @Override
418
  public void yield(String reason, Supplier<Boolean> unblockCondition) {
419
    context.yield(reason, unblockCondition);
1✔
420
  }
1✔
421

422
  @Override
423
  public boolean yield(long timeoutMillis, String reason, Supplier<Boolean> unblockCondition)
424
      throws DestroyWorkflowThreadError {
425
    if (timeoutMillis == 0) {
1✔
426
      return unblockCondition.get();
1✔
427
    }
428
    long blockedUntil = WorkflowInternal.currentTimeMillis() + timeoutMillis;
1✔
429
    setBlockedUntil(blockedUntil);
1✔
430
    YieldWithTimeoutCondition condition =
1✔
431
        new YieldWithTimeoutCondition(unblockCondition, blockedUntil);
432
    WorkflowThread.await(reason, condition);
1✔
433
    return !condition.isTimedOut();
1✔
434
  }
435

436
  @Override
437
  public <R> void exitThread(R value) {
438
    runner.exit(value);
1✔
439
    throw new DestroyWorkflowThreadError("exit");
1✔
440
  }
441

442
  @Override
443
  public <T> void setThreadLocal(WorkflowThreadLocalInternal<T> key, T value) {
444
    threadLocalMap.put(key, value);
1✔
445
  }
1✔
446

447
  @SuppressWarnings("unchecked")
448
  @Override
449
  public <T> Optional<T> getThreadLocal(WorkflowThreadLocalInternal<T> key) {
450
    if (!threadLocalMap.containsKey(key)) {
1✔
451
      return Optional.empty();
1✔
452
    }
453
    return Optional.of((T) threadLocalMap.get(key));
1✔
454
  }
455

456
  /** @return stack trace of the coroutine thread */
457
  @Override
458
  public String getStackTrace() {
459
    StackTraceElement[] st = task.getStackTrace();
×
460
    StringWriter sw = new StringWriter();
×
461
    PrintWriter pw = new PrintWriter(sw);
×
462
    for (StackTraceElement se : st) {
×
463
      pw.println("\tat " + se);
×
464
    }
465
    return sw.toString();
×
466
  }
467

468
  static class YieldWithTimeoutCondition implements Supplier<Boolean> {
469

470
    private final Supplier<Boolean> unblockCondition;
471
    private final long blockedUntil;
472
    private boolean timedOut;
473

474
    YieldWithTimeoutCondition(Supplier<Boolean> unblockCondition, long blockedUntil) {
1✔
475
      this.unblockCondition = unblockCondition;
1✔
476
      this.blockedUntil = blockedUntil;
1✔
477
    }
1✔
478

479
    boolean isTimedOut() {
480
      return timedOut;
1✔
481
    }
482

483
    /** @return true if condition matched or timed out */
484
    @Override
485
    public Boolean get() {
486
      boolean result = unblockCondition.get();
1✔
487
      if (result) {
1✔
488
        return true;
1✔
489
      }
490
      timedOut = WorkflowInternal.currentTimeMillis() >= blockedUntil;
1✔
491
      return timedOut;
1✔
492
    }
493
  }
494
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc