• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2409

03 Jul 2024 08:33PM CUT coverage: 61.467% (-0.05%) from 61.518%
2409

push

buildkite

web-flow
Avoid consuming ByteBuffers (#913)

A ByteBuffer is a pointer to a byte[] with a starting position, a current position, and a limit. Any function that reads from its contents updates the current position. Both TracingPropagator and WorkflowUtils copy the entirety of its contents, and in doing so they mutate the current position. WorkflowUtils resets it afterwards but this still isn't thread-safe as another thread may be trying to read it.

By duplicating the ByteBuffer (copying only the metadata, not the actual contents) we avoid modifying it. It doesn't seem likely that there's real impact in either of these cases beyond unit tests, where these ByteBuffers stick around in the workflow history and are repeatedly serialized/deserialized. Modifying them during serialization can create test flakiness as that can trigger exceptions.

2 of 2 new or added lines in 2 files covered. (100.0%)

10 existing lines in 4 files now uncovered.

11972 of 19477 relevant lines covered (61.47%)

0.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.65
/src/main/java/com/uber/cadence/internal/sync/CancellationScopeImpl.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.sync;
19

20
import com.uber.cadence.workflow.CancellationScope;
21
import com.uber.cadence.workflow.CompletablePromise;
22
import com.uber.cadence.workflow.Functions;
23
import com.uber.cadence.workflow.Workflow;
24
import java.util.ArrayDeque;
25
import java.util.Deque;
26
import java.util.HashSet;
27
import java.util.Set;
28

29
class CancellationScopeImpl implements CancellationScope {
30

31
  private static ThreadLocal<Deque<CancellationScopeImpl>> scopeStack =
1✔
32
      ThreadLocal.withInitial(ArrayDeque::new);
1✔
33
  private boolean detached;
34
  private CompletablePromise<String> cancellationPromise;
35

36
  static CancellationScopeImpl current() {
37
    if (scopeStack.get().isEmpty()) {
1✔
38
      throw new IllegalStateException("Cannot be called by non workflow thread");
×
39
    }
40
    return scopeStack.get().peekFirst();
1✔
41
  }
42

43
  private static void pushCurrent(CancellationScopeImpl scope) {
44
    scopeStack.get().addFirst(scope);
1✔
45
  }
1✔
46

47
  private static void popCurrent(CancellationScopeImpl expected) {
48
    CancellationScopeImpl current = scopeStack.get().pollFirst();
1✔
49
    if (current != expected) {
1✔
50
      throw new Error("Unexpected scope");
×
51
    }
52
    if (!current.detached) {
1✔
53
      current.parent.removeChild(current);
1✔
54
    }
55
  }
1✔
56

57
  private final Runnable runnable;
58
  private CancellationScopeImpl parent;
59
  private final Set<CancellationScopeImpl> children = new HashSet<>();
1✔
60
  /**
61
   * When disconnected scope has no parent and thus doesn't receive cancellation requests from it.
62
   */
63
  private boolean cancelRequested;
64

65
  private String reason;
66

67
  CancellationScopeImpl(boolean ignoreParentCancellation, Runnable runnable) {
68
    this(ignoreParentCancellation, runnable, current());
1✔
69
  }
1✔
70

71
  CancellationScopeImpl(boolean detached, Runnable runnable, CancellationScopeImpl parent) {
1✔
72
    this.detached = detached;
1✔
73
    this.runnable = runnable;
1✔
74
    setParent(parent);
1✔
75
  }
1✔
76

77
  public CancellationScopeImpl(
78
      boolean ignoreParentCancellation, Functions.Proc1<CancellationScope> proc) {
1✔
79
    this.detached = ignoreParentCancellation;
1✔
80
    this.runnable = () -> proc.apply(this);
1✔
81
    setParent(current());
1✔
82
  }
1✔
83

84
  private void setParent(CancellationScopeImpl parent) {
85
    if (parent == null) {
1✔
86
      detached = true;
1✔
87
      return;
1✔
88
    }
89
    if (!detached) {
1✔
90
      this.parent = parent;
1✔
91
      parent.addChild(this);
1✔
92
      if (parent.isCancelRequested()) {
1✔
UNCOV
93
        cancel(parent.getCancellationReason());
×
94
      }
95
    }
96
  }
1✔
97

98
  @Override
99
  public void run() {
100
    try {
101
      pushCurrent(this);
1✔
102
      runnable.run();
1✔
103
    } finally {
104
      popCurrent(this);
1✔
105
    }
106
  }
1✔
107

108
  @Override
109
  public boolean isDetached() {
110
    return detached;
×
111
  }
112

113
  @Override
114
  public void cancel() {
115
    cancelRequested = true;
1✔
116
    reason = null;
1✔
117
    for (CancellationScopeImpl child : children) {
1✔
118
      child.cancel();
1✔
119
    }
1✔
120
    if (cancellationPromise != null) {
1✔
121
      cancellationPromise.complete(null);
1✔
122
    }
123
  }
1✔
124

125
  @Override
126
  public void cancel(String reason) {
127
    cancelRequested = true;
1✔
128
    this.reason = reason;
1✔
129
    for (CancellationScopeImpl child : children) {
1✔
130
      child.cancel(reason);
1✔
131
    }
1✔
132
    if (cancellationPromise != null) {
1✔
133
      cancellationPromise.complete(reason);
1✔
134
    }
135
  }
1✔
136

137
  @Override
138
  public String getCancellationReason() {
139
    return reason;
1✔
140
  }
141

142
  @Override
143
  public boolean isCancelRequested() {
144
    return cancelRequested;
1✔
145
  }
146

147
  @Override
148
  public CompletablePromise<String> getCancellationRequest() {
149
    if (cancellationPromise == null) {
1✔
150
      cancellationPromise = Workflow.newPromise();
1✔
151
      if (isCancelRequested()) {
1✔
152
        cancellationPromise.complete(getCancellationReason());
1✔
153
      }
154
    }
155
    return cancellationPromise;
1✔
156
  }
157

158
  private void addChild(CancellationScopeImpl scope) {
159
    children.add(scope);
1✔
160
  }
1✔
161

162
  private void removeChild(CancellationScopeImpl scope) {
163
    if (!children.remove(scope)) {
1✔
164
      throw new Error("Not a child");
×
165
    }
166
  }
1✔
167
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc