• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2409

03 Jul 2024 08:33PM CUT coverage: 61.467% (-0.05%) from 61.518%
2409

push

buildkite

web-flow
Avoid consuming ByteBuffers (#913)

A ByteBuffer is a pointer to a byte[] with a starting position, a current position, and a limit. Any function that reads from its contents updates the current position. Both TracingPropagator and WorkflowUtils copy the entirety of its contents, and in doing so they mutate the current position. WorkflowUtils resets it afterwards but this still isn't thread-safe as another thread may be trying to read it.

By duplicating the ByteBuffer (copying only the metadata, not the actual contents) we avoid modifying it. It doesn't seem likely that there's real impact in either of these cases beyond unit tests, where these ByteBuffers stick around in the workflow history and are repeatedly serialized/deserialized. Modifying them during serialization can create test flakiness as that can trigger exceptions.

2 of 2 new or added lines in 2 files covered. (100.0%)

10 existing lines in 4 files now uncovered.

11972 of 19477 relevant lines covered (61.47%)

0.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.58
/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadContext.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.sync;
19

20
import java.util.concurrent.locks.Condition;
21
import java.util.concurrent.locks.Lock;
22
import java.util.function.Consumer;
23
import java.util.function.Supplier;
24

25
class WorkflowThreadContext {
26

27
  // Shared runner lock
28
  private final Lock lock;
29
  // Used to block await call
30
  private final Condition yieldCondition;
31
  // Used to block runUntilBlocked call
32
  private final Condition runCondition;
33
  // Used to block evaluateInCoroutineContext
34
  private final Condition evaluationCondition;
35

36
  private Status status = Status.CREATED;
1✔
37
  private Consumer<String> evaluationFunction;
38
  private Throwable unhandledException;
39
  private boolean inRunUntilBlocked;
40
  private boolean remainedBlocked;
41
  private String yieldReason;
42
  private boolean destroyRequested;
43

44
  WorkflowThreadContext(Lock lock) {
1✔
45
    this.lock = lock;
1✔
46
    this.yieldCondition = lock.newCondition();
1✔
47
    this.runCondition = lock.newCondition();
1✔
48
    this.evaluationCondition = lock.newCondition();
1✔
49
  }
1✔
50

51
  public void initialYield() {
52
    Status status = getStatus();
1✔
53
    if (status == Status.DONE) {
1✔
UNCOV
54
      throw new DestroyWorkflowThreadError("done in initialYield");
×
55
    }
56
    if (status != Status.RUNNING) {
1✔
57
      throw new IllegalStateException("not in RUNNING but in " + status + " state");
×
58
    }
59
    yield("created", () -> true);
1✔
60
  }
1✔
61

62
  public void yield(String reason, Supplier<Boolean> unblockFunction) {
63
    if (unblockFunction == null) {
1✔
64
      throw new IllegalArgumentException("null unblockFunction");
×
65
    }
66
    // Evaluates unblockFunction out of the lock to avoid deadlocks.
67
    lock.lock();
1✔
68
    try {
69
      // TODO: Verify that calling unblockFunction under the lock is a sane thing to do.
70
      while (!inRunUntilBlocked || !unblockFunction.get()) {
1✔
71
        if (destroyRequested) {
1✔
72
          throw new DestroyWorkflowThreadError();
×
73
        }
74
        status = Status.YIELDED;
1✔
75
        runCondition.signal();
1✔
76
        yieldCondition.await();
1✔
77
        mayBeEvaluate(reason);
1✔
78
        yieldReason = reason;
1✔
79
      }
80
    } catch (InterruptedException e) {
×
81
      // Throwing Error in workflow code aborts decision without failing workflow.
82
      throw new Error("Unexpected interrupt", e);
×
83
    } finally {
84
      setStatus(Status.RUNNING);
1✔
85
      remainedBlocked = false;
1✔
86
      yieldReason = null;
1✔
87
      lock.unlock();
1✔
88
    }
89
  }
1✔
90

91
  /**
92
   * Execute evaluation function by the thread that owns this context if {@link
93
   * #evaluateInCoroutineContext(Consumer)} was called.
94
   *
95
   * @param reason human readable reason for current thread blockage passed to await call.
96
   */
97
  private void mayBeEvaluate(String reason) {
98
    if (status == Status.EVALUATING) {
1✔
99
      try {
100
        evaluationFunction.accept(reason);
×
101
      } catch (Exception e) {
×
102
        evaluationFunction.accept(e.toString());
×
103
      } finally {
104
        status = Status.YIELDED;
1✔
105
        evaluationCondition.signal();
1✔
106
      }
107
    }
108
  }
1✔
109

110
  /**
111
   * Call function by the thread that owns this context and is currently blocked in a await. Used to
112
   * get information about current state of the thread like current stack trace.
113
   *
114
   * @param function to evaluate. Consumes reason for yielding parameter.
115
   */
116
  public void evaluateInCoroutineContext(Consumer<String> function) {
117
    lock.lock();
1✔
118
    try {
119
      if (function == null) {
1✔
120
        throw new IllegalArgumentException("null function");
×
121
      }
122
      if (status != Status.YIELDED && status != Status.RUNNING) {
1✔
123
        throw new IllegalStateException("Not in yielded status: " + status);
×
124
      }
125
      if (evaluationFunction != null) {
1✔
126
        throw new IllegalStateException("Already evaluating");
×
127
      }
128
      if (inRunUntilBlocked) {
1✔
129
        throw new IllegalStateException("Running runUntilBlocked");
×
130
      }
131
      evaluationFunction = function;
1✔
132
      status = Status.EVALUATING;
1✔
133
      yieldCondition.signal();
1✔
134
      while (status == Status.EVALUATING) {
1✔
135
        evaluationCondition.await();
1✔
136
      }
137
    } catch (InterruptedException e) {
×
138
      throw new Error("Unexpected interrupt", e);
×
139
    } finally {
140
      evaluationFunction = null;
1✔
141
      lock.unlock();
1✔
142
    }
143
  }
1✔
144

145
  public Status getStatus() {
146
    lock.lock();
1✔
147
    try {
148
      return status;
1✔
149
    } finally {
150
      lock.unlock();
1✔
151
    }
152
  }
153

154
  public void setStatus(Status status) {
155
    // Unblock runUntilBlocked if thread exited instead of yielding.
156
    lock.lock();
1✔
157
    try {
158
      this.status = status;
1✔
159
      if (isDone()) {
1✔
160
        runCondition.signal();
1✔
161
      }
162
    } finally {
163
      lock.unlock();
1✔
164
    }
165
  }
1✔
166

167
  public boolean isDone() {
168
    lock.lock();
1✔
169
    try {
170
      return status == Status.DONE;
1✔
171
    } finally {
172
      lock.unlock();
1✔
173
    }
174
  }
175

176
  public Throwable getUnhandledException() {
177
    lock.lock();
1✔
178
    try {
179
      return unhandledException;
1✔
180
    } finally {
181
      lock.unlock();
1✔
182
    }
183
  }
184

185
  public void setUnhandledException(Throwable unhandledException) {
186
    lock.lock();
1✔
187
    try {
188
      this.unhandledException = unhandledException;
1✔
189
    } finally {
190
      lock.unlock();
1✔
191
    }
192
  }
1✔
193

194
  public String getYieldReason() {
195
    return yieldReason;
1✔
196
  }
197

198
  /**
199
   * @return true if thread made some progress. Which is await was unblocked and some code after it
200
   *     was executed.
201
   */
202
  public boolean runUntilBlocked() {
203
    lock.lock();
1✔
204
    try {
205
      if (status == Status.DONE) {
1✔
206
        return false;
1✔
207
      }
208
      if (evaluationFunction != null) {
1✔
209
        throw new IllegalStateException("Cannot runUntilBlocked while evaluating");
×
210
      }
211
      inRunUntilBlocked = true;
1✔
212
      if (status != Status.CREATED) {
1✔
213
        status = Status.RUNNING;
1✔
214
      }
215
      remainedBlocked = true;
1✔
216
      yieldCondition.signal();
1✔
217
      while (status == Status.RUNNING || status == Status.CREATED) {
1✔
218
        runCondition.await();
1✔
219
        if (evaluationFunction != null) {
1✔
220
          throw new IllegalStateException("Cannot runUntilBlocked while evaluating");
×
221
        }
222
      }
223
      return !remainedBlocked;
1✔
224
    } catch (InterruptedException e) {
1✔
225
      throw new Error("Unexpected interrupt", e);
1✔
226
    } finally {
227
      inRunUntilBlocked = false;
1✔
228
      lock.unlock();
1✔
229
    }
230
  }
231

232
  public boolean isDestroyRequested() {
233
    lock.lock();
1✔
234
    try {
235
      return destroyRequested;
1✔
236
    } finally {
237
      lock.unlock();
1✔
238
    }
239
  }
240

241
  public void destroy() {
242
    lock.lock();
1✔
243
    try {
244
      destroyRequested = true;
1✔
245
      if (status == Status.CREATED || status == Status.RUNNING || status == Status.DONE) {
1✔
246
        status = Status.DONE;
1✔
247
        return;
1✔
248
      }
249
    } finally {
250
      lock.unlock();
1✔
251
    }
252
    evaluateInCoroutineContext(
1✔
253
        (r) -> {
254
          throw new DestroyWorkflowThreadError();
1✔
255
        });
256
    runUntilBlocked();
1✔
257
  }
1✔
258

259
  /** To be called only from a workflow thread. */
260
  public void exit() {
261
    lock.lock();
×
262
    try {
263
      destroyRequested = true;
×
264
    } finally {
265
      lock.unlock();
×
266
    }
267
    throw new DestroyWorkflowThreadError();
×
268
  }
269
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc