• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2490

13 Aug 2024 05:39PM CUT coverage: 61.99% (-0.03%) from 62.021%
2490

push

buildkite

web-flow
Removing fossa as it is migrated to snyk (#919)

12090 of 19503 relevant lines covered (61.99%)

0.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.58
/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadContext.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.sync;
19

20
import java.util.concurrent.locks.Condition;
21
import java.util.concurrent.locks.Lock;
22
import java.util.function.Consumer;
23
import java.util.function.Supplier;
24

25
class WorkflowThreadContext {
26

27
  // Shared runner lock
28
  private final Lock lock;
29
  // Used to block await call
30
  private final Condition yieldCondition;
31
  // Used to block runUntilBlocked call
32
  private final Condition runCondition;
33
  // Used to block evaluateInCoroutineContext
34
  private final Condition evaluationCondition;
35

36
  private Status status = Status.CREATED;
1✔
37
  private Consumer<String> evaluationFunction;
38
  private Throwable unhandledException;
39
  private boolean inRunUntilBlocked;
40
  private boolean remainedBlocked;
41
  private String yieldReason;
42
  private boolean destroyRequested;
43

44
  WorkflowThreadContext(Lock lock) {
1✔
45
    this.lock = lock;
1✔
46
    this.yieldCondition = lock.newCondition();
1✔
47
    this.runCondition = lock.newCondition();
1✔
48
    this.evaluationCondition = lock.newCondition();
1✔
49
  }
1✔
50

51
  public void initialYield() {
52
    Status status = getStatus();
1✔
53
    if (status == Status.DONE) {
1✔
54
      throw new DestroyWorkflowThreadError("done in initialYield");
×
55
    }
56
    if (status != Status.RUNNING) {
1✔
57
      throw new IllegalStateException("not in RUNNING but in " + status + " state");
×
58
    }
59
    yield("created", () -> true);
1✔
60
  }
1✔
61

62
  public void yield(String reason, Supplier<Boolean> unblockFunction) {
63
    if (unblockFunction == null) {
1✔
64
      throw new IllegalArgumentException("null unblockFunction");
×
65
    }
66
    // Evaluates unblockFunction out of the lock to avoid deadlocks.
67
    lock.lock();
1✔
68
    try {
69
      // TODO: Verify that calling unblockFunction under the lock is a sane thing to do.
70
      while (!inRunUntilBlocked || !unblockFunction.get()) {
1✔
71
        if (destroyRequested) {
1✔
72
          throw new DestroyWorkflowThreadError();
×
73
        }
74
        status = Status.YIELDED;
1✔
75
        runCondition.signal();
1✔
76
        yieldCondition.await();
1✔
77
        mayBeEvaluate(reason);
1✔
78
        yieldReason = reason;
1✔
79
      }
80
    } catch (InterruptedException e) {
×
81
      // Throwing Error in workflow code aborts decision without failing workflow.
82
      throw new Error("Unexpected interrupt", e);
×
83
    } finally {
84
      setStatus(Status.RUNNING);
1✔
85
      remainedBlocked = false;
1✔
86
      yieldReason = null;
1✔
87
      lock.unlock();
1✔
88
    }
89
  }
1✔
90

91
  /**
92
   * Execute evaluation function by the thread that owns this context if {@link
93
   * #evaluateInCoroutineContext(Consumer)} was called.
94
   *
95
   * @param reason human readable reason for current thread blockage passed to await call.
96
   */
97
  private void mayBeEvaluate(String reason) {
98
    if (status == Status.EVALUATING) {
1✔
99
      try {
100
        evaluationFunction.accept(reason);
×
101
      } catch (Exception e) {
×
102
        evaluationFunction.accept(e.toString());
×
103
      } finally {
104
        status = Status.YIELDED;
1✔
105
        evaluationCondition.signal();
1✔
106
      }
107
    }
108
  }
1✔
109

110
  /**
111
   * Call function by the thread that owns this context and is currently blocked in a await. Used to
112
   * get information about current state of the thread like current stack trace.
113
   *
114
   * @param function to evaluate. Consumes reason for yielding parameter.
115
   */
116
  public void evaluateInCoroutineContext(Consumer<String> function) {
117
    lock.lock();
1✔
118
    try {
119
      if (function == null) {
1✔
120
        throw new IllegalArgumentException("null function");
×
121
      }
122
      if (status != Status.YIELDED && status != Status.RUNNING) {
1✔
123
        throw new IllegalStateException("Not in yielded status: " + status);
×
124
      }
125
      if (evaluationFunction != null) {
1✔
126
        throw new IllegalStateException("Already evaluating");
×
127
      }
128
      if (inRunUntilBlocked) {
1✔
129
        throw new IllegalStateException("Running runUntilBlocked");
×
130
      }
131
      evaluationFunction = function;
1✔
132
      status = Status.EVALUATING;
1✔
133
      yieldCondition.signal();
1✔
134
      while (status == Status.EVALUATING) {
1✔
135
        evaluationCondition.await();
1✔
136
      }
137
    } catch (InterruptedException e) {
×
138
      throw new Error("Unexpected interrupt", e);
×
139
    } finally {
140
      evaluationFunction = null;
1✔
141
      lock.unlock();
1✔
142
    }
143
  }
1✔
144

145
  public Status getStatus() {
146
    lock.lock();
1✔
147
    try {
148
      return status;
1✔
149
    } finally {
150
      lock.unlock();
1✔
151
    }
152
  }
153

154
  public void setStatus(Status status) {
155
    // Unblock runUntilBlocked if thread exited instead of yielding.
156
    lock.lock();
1✔
157
    try {
158
      this.status = status;
1✔
159
      if (isDone()) {
1✔
160
        runCondition.signal();
1✔
161
      }
162
    } finally {
163
      lock.unlock();
1✔
164
    }
165
  }
1✔
166

167
  public boolean isDone() {
168
    lock.lock();
1✔
169
    try {
170
      return status == Status.DONE;
1✔
171
    } finally {
172
      lock.unlock();
1✔
173
    }
174
  }
175

176
  public Throwable getUnhandledException() {
177
    lock.lock();
1✔
178
    try {
179
      return unhandledException;
1✔
180
    } finally {
181
      lock.unlock();
1✔
182
    }
183
  }
184

185
  public void setUnhandledException(Throwable unhandledException) {
186
    lock.lock();
1✔
187
    try {
188
      this.unhandledException = unhandledException;
1✔
189
    } finally {
190
      lock.unlock();
1✔
191
    }
192
  }
1✔
193

194
  public String getYieldReason() {
195
    return yieldReason;
1✔
196
  }
197

198
  /**
199
   * @return true if thread made some progress. Which is await was unblocked and some code after it
200
   *     was executed.
201
   */
202
  public boolean runUntilBlocked() {
203
    lock.lock();
1✔
204
    try {
205
      if (status == Status.DONE) {
1✔
206
        return false;
1✔
207
      }
208
      if (evaluationFunction != null) {
1✔
209
        throw new IllegalStateException("Cannot runUntilBlocked while evaluating");
×
210
      }
211
      inRunUntilBlocked = true;
1✔
212
      if (status != Status.CREATED) {
1✔
213
        status = Status.RUNNING;
1✔
214
      }
215
      remainedBlocked = true;
1✔
216
      yieldCondition.signal();
1✔
217
      while (status == Status.RUNNING || status == Status.CREATED) {
1✔
218
        runCondition.await();
1✔
219
        if (evaluationFunction != null) {
1✔
220
          throw new IllegalStateException("Cannot runUntilBlocked while evaluating");
×
221
        }
222
      }
223
      return !remainedBlocked;
1✔
224
    } catch (InterruptedException e) {
1✔
225
      throw new Error("Unexpected interrupt", e);
1✔
226
    } finally {
227
      inRunUntilBlocked = false;
1✔
228
      lock.unlock();
1✔
229
    }
230
  }
231

232
  public boolean isDestroyRequested() {
233
    lock.lock();
1✔
234
    try {
235
      return destroyRequested;
1✔
236
    } finally {
237
      lock.unlock();
1✔
238
    }
239
  }
240

241
  public void destroy() {
242
    lock.lock();
1✔
243
    try {
244
      destroyRequested = true;
1✔
245
      if (status == Status.CREATED || status == Status.RUNNING || status == Status.DONE) {
1✔
246
        status = Status.DONE;
1✔
247
        return;
1✔
248
      }
249
    } finally {
250
      lock.unlock();
1✔
251
    }
252
    evaluateInCoroutineContext(
1✔
253
        (r) -> {
254
          throw new DestroyWorkflowThreadError();
1✔
255
        });
256
    runUntilBlocked();
1✔
257
  }
1✔
258

259
  /** To be called only from a workflow thread. */
260
  public void exit() {
261
    lock.lock();
×
262
    try {
263
      destroyRequested = true;
×
264
    } finally {
265
      lock.unlock();
×
266
    }
267
    throw new DestroyWorkflowThreadError();
×
268
  }
269
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc