• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2594

04 Nov 2024 05:49PM UTC coverage: 76.079% (+0.03%) from 76.054%
2594

Pull #937

buildkite

natemort
Fix flakiness in ManualActivityCompletionWorkflowTest

The operations here are flaky because we're starting an activity and then attempting to cancel/complete/fail it in parallel. We need the second activity to block until the first one has started, and there's no way to orchestrate that within the workflow. The best we can do is make the activities block until the ActivityTask is available.
Pull Request #937: Fix flakiness in ManualActivityCompletionWorkflowTest

14748 of 19385 relevant lines covered (76.08%)

0.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.32
/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.sync;
19

20
import com.google.common.util.concurrent.RateLimiter;
21
import com.uber.cadence.context.ContextPropagator;
22
import com.uber.cadence.internal.context.ContextThreadLocal;
23
import com.uber.cadence.internal.logging.LoggerTag;
24
import com.uber.cadence.internal.metrics.MetricsType;
25
import com.uber.cadence.internal.replay.DeciderCache;
26
import com.uber.cadence.internal.replay.DecisionContext;
27
import com.uber.cadence.workflow.Promise;
28
import java.io.PrintWriter;
29
import java.io.StringWriter;
30
import java.util.HashMap;
31
import java.util.List;
32
import java.util.Map;
33
import java.util.Optional;
34
import java.util.concurrent.*;
35
import java.util.function.Consumer;
36
import java.util.function.Supplier;
37
import org.slf4j.Logger;
38
import org.slf4j.LoggerFactory;
39
import org.slf4j.MDC;
40

41
class WorkflowThreadImpl implements WorkflowThread {
42
  private static final RateLimiter metricsRateLimiter = RateLimiter.create(1);
1✔
43

44
  /**
45
   * Runnable passed to the thread that wraps a runnable passed to the WorkflowThreadImpl
46
   * constructor.
47
   */
48
  class RunnableWrapper implements Runnable {
49

50
    private final WorkflowThreadContext threadContext;
51
    private final DecisionContext decisionContext;
52
    private String originalName;
53
    private String name;
54
    private CancellationScopeImpl cancellationScope;
55
    private List<ContextPropagator> contextPropagators;
56
    private Map<String, Object> propagatedContexts;
57

58
    RunnableWrapper(
59
        WorkflowThreadContext threadContext,
60
        DecisionContext decisionContext,
61
        String name,
62
        boolean detached,
63
        CancellationScopeImpl parent,
64
        Runnable runnable,
65
        List<ContextPropagator> contextPropagators,
66
        Map<String, Object> propagatedContexts) {
1✔
67
      this.threadContext = threadContext;
1✔
68
      this.decisionContext = decisionContext;
1✔
69
      this.name = name;
1✔
70
      cancellationScope = new CancellationScopeImpl(detached, runnable, parent);
1✔
71
      if (context.getStatus() != Status.CREATED) {
1✔
72
        throw new IllegalStateException("threadContext not in CREATED state");
×
73
      }
74
      this.contextPropagators = contextPropagators;
1✔
75
      this.propagatedContexts = propagatedContexts;
1✔
76
    }
1✔
77

78
    @Override
79
    public void run() {
80
      thread = Thread.currentThread();
1✔
81
      originalName = thread.getName();
1✔
82
      thread.setName(name);
1✔
83
      DeterministicRunnerImpl.setCurrentThreadInternal(WorkflowThreadImpl.this);
1✔
84
      decisionContext.getWorkflowId();
1✔
85
      MDC.put(LoggerTag.WORKFLOW_ID, decisionContext.getWorkflowId());
1✔
86
      MDC.put(LoggerTag.WORKFLOW_TYPE, decisionContext.getWorkflowType().getName());
1✔
87
      MDC.put(LoggerTag.RUN_ID, decisionContext.getRunId());
1✔
88
      MDC.put(LoggerTag.TASK_LIST, decisionContext.getTaskList());
1✔
89
      MDC.put(LoggerTag.DOMAIN, decisionContext.getDomain());
1✔
90

91
      // Repopulate the context(s)
92
      ContextThreadLocal.setContextPropagators(this.contextPropagators);
1✔
93
      ContextThreadLocal.propagateContextToCurrentThread(this.propagatedContexts);
1✔
94

95
      try {
96
        // initialYield blocks thread until the first runUntilBlocked is called.
97
        // Otherwise r starts executing without control of the sync.
98
        threadContext.initialYield();
1✔
99
        cancellationScope.run();
1✔
100
      } catch (DestroyWorkflowThreadError e) {
1✔
101
        if (!threadContext.isDestroyRequested()) {
1✔
102
          threadContext.setUnhandledException(e);
1✔
103
        }
104
      } catch (Error e) {
1✔
105
        // An Error aborts the decision; it does not fail the workflow.
106
        if (log.isErrorEnabled() && !root) {
1✔
107
          StringWriter sw = new StringWriter();
1✔
108
          PrintWriter pw = new PrintWriter(sw, true);
1✔
109
          e.printStackTrace(pw);
1✔
110
          String stackTrace = sw.getBuffer().toString();
1✔
111
          log.error(
1✔
112
              String.format("Workflow thread \"%s\" run failed with Error:\n%s", name, stackTrace));
1✔
113
        }
114
        threadContext.setUnhandledException(e);
1✔
115
      } catch (CancellationException e) {
1✔
116
        if (!isCancelRequested()) {
1✔
117
          threadContext.setUnhandledException(e);
1✔
118
        }
119
        if (log.isDebugEnabled()) {
1✔
120
          log.debug(String.format("Workflow thread \"%s\" run cancelled", name));
×
121
        }
122
      } catch (Throwable e) {
1✔
123
        if (log.isWarnEnabled() && !root) {
1✔
124
          StringWriter sw = new StringWriter();
×
125
          PrintWriter pw = new PrintWriter(sw, true);
×
126
          e.printStackTrace(pw);
×
127
          String stackTrace = sw.getBuffer().toString();
×
128
          log.warn(
×
129
              String.format(
×
130
                  "Workflow thread \"%s\" run failed with unhandled exception:\n%s",
131
                  name, stackTrace));
132
        }
133
        threadContext.setUnhandledException(e);
1✔
134
      } finally {
135
        ContextThreadLocal.unsetCurrentContext();
1✔
136
        DeterministicRunnerImpl.setCurrentThreadInternal(null);
1✔
137
        threadContext.setStatus(Status.DONE);
1✔
138
        thread.setName(originalName);
1✔
139
        thread = null;
1✔
140
        MDC.clear();
1✔
141
      }
142
    }
1✔
143

144
    public String getName() {
145
      return name;
1✔
146
    }
147

148
    StackTraceElement[] getStackTrace() {
149
      if (thread != null) {
×
150
        return thread.getStackTrace();
×
151
      }
152
      return new StackTraceElement[0];
×
153
    }
154

155
    public void setName(String name) {
156
      this.name = name;
×
157
      if (thread != null) {
×
158
        thread.setName(name);
×
159
      }
160
    }
×
161
  }
162

163
  private static final Logger log = LoggerFactory.getLogger(WorkflowThreadImpl.class);
1✔
164

165
  private final boolean root;
166
  private final ExecutorService threadPool;
167
  private final WorkflowThreadContext context;
168
  private DeciderCache cache;
169
  private final DeterministicRunnerImpl runner;
170
  private final RunnableWrapper task;
171
  private Thread thread;
172
  private Future<?> taskFuture;
173
  private final Map<WorkflowThreadLocalInternal<?>, Object> threadLocalMap = new HashMap<>();
1✔
174

175
  /**
176
   * If not 0 then thread is blocked on a sleep (or on an operation with a timeout). The value is
177
   * the time in milliseconds (as in currentTimeMillis()) when thread will continue. Note that
178
   * thread still has to be called for evaluation as other threads might interrupt the blocking
179
   * call.
180
   */
181
  private long blockedUntil;
182

183
  WorkflowThreadImpl(
184
      boolean root,
185
      ExecutorService threadPool,
186
      DeterministicRunnerImpl runner,
187
      String name,
188
      boolean detached,
189
      CancellationScopeImpl parentCancellationScope,
190
      Runnable runnable,
191
      DeciderCache cache,
192
      List<ContextPropagator> contextPropagators,
193
      Map<String, Object> propagatedContexts) {
1✔
194
    this.root = root;
1✔
195
    this.threadPool = threadPool;
1✔
196
    this.runner = runner;
1✔
197
    this.context = new WorkflowThreadContext(runner.getLock());
1✔
198
    this.cache = cache;
1✔
199

200
    if (name == null) {
1✔
201
      name = "workflow-" + super.hashCode();
1✔
202
    }
203

204
    this.task =
1✔
205
        new RunnableWrapper(
206
            context,
207
            runner.getDecisionContext().getContext(),
1✔
208
            name,
209
            detached,
210
            parentCancellationScope,
211
            runnable,
212
            contextPropagators,
213
            propagatedContexts);
214
  }
1✔
215

216
  @Override
217
  public void run() {
218
    throw new UnsupportedOperationException("not used");
×
219
  }
220

221
  @Override
222
  public boolean isDetached() {
223
    return task.cancellationScope.isDetached();
×
224
  }
225

226
  @Override
227
  public void cancel() {
228
    task.cancellationScope.cancel();
×
229
  }
×
230

231
  @Override
232
  public void cancel(String reason) {
233
    task.cancellationScope.cancel(reason);
1✔
234
  }
1✔
235

236
  @Override
237
  public String getCancellationReason() {
238
    return task.cancellationScope.getCancellationReason();
×
239
  }
240

241
  @Override
242
  public boolean isCancelRequested() {
243
    return task.cancellationScope.isCancelRequested();
1✔
244
  }
245

246
  @Override
247
  public Promise<String> getCancellationRequest() {
248
    return task.cancellationScope.getCancellationRequest();
×
249
  }
250

251
  @Override
252
  public void start() {
253
    if (context.getStatus() != Status.CREATED) {
1✔
254
      throw new IllegalThreadStateException("already started");
×
255
    }
256
    context.setStatus(Status.RUNNING);
1✔
257

258
    if (metricsRateLimiter.tryAcquire(1)) {
1✔
259
      getDecisionContext()
1✔
260
          .getMetricsScope()
1✔
261
          .gauge(MetricsType.WORKFLOW_ACTIVE_THREAD_COUNT)
1✔
262
          .update(((ThreadPoolExecutor) threadPool).getActiveCount());
1✔
263
    }
264

265
    while (true) {
266
      try {
267
        taskFuture = threadPool.submit(task);
1✔
268
        return;
1✔
269
      } catch (RejectedExecutionException e) {
1✔
270
        getDecisionContext()
1✔
271
            .getMetricsScope()
1✔
272
            .counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION)
1✔
273
            .inc(1);
1✔
274
        if (cache != null) {
1✔
275
          boolean evicted =
1✔
276
              cache.evictAnyNotInProcessing(
1✔
277
                  this.runner.getDecisionContext().getContext().getRunId());
1✔
278
          if (!evicted) {
1✔
279
            // Note here we need to throw error, not exception. Otherwise it will be
280
            // translated to workflow execution exception and instead of failing the
281
            // decision we will be failing the workflow.
282
            throw new WorkflowRejectedExecutionError(e);
×
283
          }
284
        } else {
1✔
285
          throw new WorkflowRejectedExecutionError(e);
1✔
286
        }
287
      }
1✔
288
    }
289
  }
290

291
  public WorkflowThreadContext getContext() {
292
    return context;
1✔
293
  }
294

295
  @Override
296
  public DeterministicRunnerImpl getRunner() {
297
    return runner;
1✔
298
  }
299

300
  @Override
301
  public SyncDecisionContext getDecisionContext() {
302
    return runner.getDecisionContext();
1✔
303
  }
304

305
  @Override
306
  public void setName(String name) {
307
    task.setName(name);
×
308
  }
×
309

310
  @Override
311
  public String getName() {
312
    return task.getName();
1✔
313
  }
314

315
  @Override
316
  public long getId() {
317
    return hashCode();
×
318
  }
319

320
  @Override
321
  public long getBlockedUntil() {
322
    return blockedUntil;
1✔
323
  }
324

325
  private void setBlockedUntil(long blockedUntil) {
326
    this.blockedUntil = blockedUntil;
1✔
327
  }
1✔
328

329
  /** @return true if coroutine made some progress. */
330
  @Override
331
  public boolean runUntilBlocked() {
332
    if (taskFuture == null) {
1✔
333
      start();
1✔
334
    }
335
    return context.runUntilBlocked();
1✔
336
  }
337

338
  @Override
339
  public boolean isDone() {
340
    return context.isDone();
1✔
341
  }
342

343
  public Thread.State getState() {
344
    if (context.getStatus() == Status.YIELDED) {
×
345
      return Thread.State.BLOCKED;
×
346
    } else if (context.getStatus() == Status.DONE) {
×
347
      return Thread.State.TERMINATED;
×
348
    } else {
349
      return Thread.State.RUNNABLE;
×
350
    }
351
  }
352

353
  @Override
354
  public Throwable getUnhandledException() {
355
    return context.getUnhandledException();
1✔
356
  }
357

358
  /**
359
   * Evaluates function in the threadContext of the coroutine without unblocking it. Used to get
360
   * current coroutine status, like stack trace.
361
   *
362
   * @param function Parameter is reason for current coroutine blockage.
363
   */
364
  public void evaluateInCoroutineContext(Consumer<String> function) {
365
    context.evaluateInCoroutineContext(function);
×
366
  }
×
367

368
  /**
369
   * Interrupt coroutine by throwing DestroyWorkflowThreadError from an await method it is blocked on
370
   * and return underlying Future to be waited on.
371
   */
372
  @Override
373
  public Future<?> stopNow() {
374
    // Cannot call destroy() on itself
375
    if (thread == Thread.currentThread()) {
1✔
376
      throw new Error("Cannot call destroy on itself: " + thread.getName());
×
377
    }
378
    context.destroy();
1✔
379
    if (!context.isDone()) {
1✔
380
      throw new RuntimeException(
×
381
          "Couldn't destroy the thread. " + "The blocked thread stack trace: " + getStackTrace());
×
382
    }
383
    if (taskFuture == null) {
1✔
384
      return getCompletedFuture();
1✔
385
    }
386
    return taskFuture;
1✔
387
  }
388

389
  private Future<?> getCompletedFuture() {
390
    CompletableFuture<String> f = new CompletableFuture<>();
1✔
391
    f.complete("done");
1✔
392
    return f;
1✔
393
  }
394

395
  @Override
396
  public void addStackTrace(StringBuilder result) {
397
    result.append(getName());
1✔
398
    if (thread == null) {
1✔
399
      result.append("(NEW)");
×
400
      return;
×
401
    }
402
    result.append(": (BLOCKED on ").append(getContext().getYieldReason()).append(")\n");
1✔
403
    // These numbers might change if implementation changes.
404
    int omitTop = 5;
1✔
405
    int omitBottom = 7;
1✔
406
    if (DeterministicRunnerImpl.WORKFLOW_ROOT_THREAD_NAME.equals(getName())) {
1✔
407
      omitBottom = 11;
1✔
408
    }
409
    StackTraceElement[] stackTrace = thread.getStackTrace();
1✔
410
    for (int i = omitTop; i < stackTrace.length - omitBottom; i++) {
1✔
411
      StackTraceElement e = stackTrace[i];
1✔
412
      if (i == omitTop && "await".equals(e.getMethodName())) continue;
1✔
413
      result.append(e);
1✔
414
      result.append("\n");
1✔
415
    }
416
  }
1✔
417

418
  @Override
419
  public void yield(String reason, Supplier<Boolean> unblockCondition) {
420
    context.yield(reason, unblockCondition);
1✔
421
  }
1✔
422

423
  @Override
424
  public boolean yield(long timeoutMillis, String reason, Supplier<Boolean> unblockCondition)
425
      throws DestroyWorkflowThreadError {
426
    if (timeoutMillis == 0) {
1✔
427
      return unblockCondition.get();
1✔
428
    }
429
    long blockedUntil = WorkflowInternal.currentTimeMillis() + timeoutMillis;
1✔
430
    setBlockedUntil(blockedUntil);
1✔
431
    YieldWithTimeoutCondition condition =
1✔
432
        new YieldWithTimeoutCondition(unblockCondition, blockedUntil);
433
    WorkflowThread.await(reason, condition);
1✔
434
    return !condition.isTimedOut();
1✔
435
  }
436

437
  @Override
438
  public <R> void exitThread(R value) {
439
    runner.exit(value);
1✔
440
    throw new DestroyWorkflowThreadError("exit");
1✔
441
  }
442

443
  @Override
444
  public <T> void setThreadLocal(WorkflowThreadLocalInternal<T> key, T value) {
445
    threadLocalMap.put(key, value);
1✔
446
  }
1✔
447

448
  @SuppressWarnings("unchecked")
449
  @Override
450
  public <T> Optional<T> getThreadLocal(WorkflowThreadLocalInternal<T> key) {
451
    if (!threadLocalMap.containsKey(key)) {
1✔
452
      return Optional.empty();
1✔
453
    }
454
    return Optional.of((T) threadLocalMap.get(key));
1✔
455
  }
456

457
  /** @return stack trace of the coroutine thread */
458
  @Override
459
  public String getStackTrace() {
460
    StackTraceElement[] st = task.getStackTrace();
×
461
    StringWriter sw = new StringWriter();
×
462
    PrintWriter pw = new PrintWriter(sw);
×
463
    for (StackTraceElement se : st) {
×
464
      pw.println("\tat " + se);
×
465
    }
466
    return sw.toString();
×
467
  }
468

469
  static class YieldWithTimeoutCondition implements Supplier<Boolean> {
470

471
    private final Supplier<Boolean> unblockCondition;
472
    private final long blockedUntil;
473
    private boolean timedOut;
474

475
    YieldWithTimeoutCondition(Supplier<Boolean> unblockCondition, long blockedUntil) {
1✔
476
      this.unblockCondition = unblockCondition;
1✔
477
      this.blockedUntil = blockedUntil;
1✔
478
    }
1✔
479

480
    boolean isTimedOut() {
481
      return timedOut;
1✔
482
    }
483

484
    /** @return true if condition matched or timed out */
485
    @Override
486
    public Boolean get() {
487
      boolean result = unblockCondition.get();
1✔
488
      if (result) {
1✔
489
        return true;
1✔
490
      }
491
      timedOut = WorkflowInternal.currentTimeMillis() >= blockedUntil;
1✔
492
      return timedOut;
1✔
493
    }
494
  }
495
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc