• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2409

03 Jul 2024 08:33PM CUT coverage: 61.467% (-0.05%) from 61.518%
2409

push

buildkite

web-flow
Avoid consuming ByteBuffers (#913)

A ByteBuffer is a pointer to a byte[] with a starting position, a current position, and a limit. Any function that reads from its contents updates the current position. Both TracingPropagator and WorkflowUtils copy the entirety of its contents, and in doing so they mutate the current position. WorkflowUtils resets it afterwards, but this still isn't thread-safe, as another thread may be trying to read it at the same time.

By duplicating the ByteBuffer (copying only the metadata, not the actual contents) we avoid modifying it. It doesn't seem likely that there's real impact in either of these cases beyond unit tests, where these ByteBuffers stick around in the workflow history and are repeatedly serialized/deserialized. Modifying them during serialization can create test flakiness as that can trigger exceptions.

2 of 2 new or added lines in 2 files covered. (100.0%)

10 existing lines in 4 files now uncovered.

11972 of 19477 relevant lines covered (61.47%)

0.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.72
/src/main/java/com/uber/cadence/internal/replay/ActivityDecisionContext.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.replay;
19

20
import com.uber.cadence.ActivityTaskCanceledEventAttributes;
21
import com.uber.cadence.ActivityTaskCompletedEventAttributes;
22
import com.uber.cadence.ActivityTaskFailedEventAttributes;
23
import com.uber.cadence.ActivityTaskTimedOutEventAttributes;
24
import com.uber.cadence.ActivityType;
25
import com.uber.cadence.Header;
26
import com.uber.cadence.HistoryEvent;
27
import com.uber.cadence.ScheduleActivityTaskDecisionAttributes;
28
import com.uber.cadence.TaskList;
29
import com.uber.cadence.TimeoutType;
30
import com.uber.cadence.internal.common.RetryParameters;
31
import java.nio.ByteBuffer;
32
import java.util.HashMap;
33
import java.util.Map;
34
import java.util.concurrent.CancellationException;
35
import java.util.function.BiConsumer;
36
import java.util.function.Consumer;
37

38
final class ActivityDecisionContext {
39

40
  private final class ActivityCancellationHandler implements Consumer<Exception> {
41

42
    private final long scheduledEventId;
43

44
    private final String activityId;
45

46
    private final BiConsumer<byte[], Exception> callback;
47

48
    private ActivityCancellationHandler(
49
        long scheduledEventId, String activityId, BiConsumer<byte[], Exception> callaback) {
1✔
50
      this.scheduledEventId = scheduledEventId;
1✔
51
      this.activityId = activityId;
1✔
52
      this.callback = callaback;
1✔
53
    }
1✔
54

55
    @Override
56
    public void accept(Exception cause) {
57
      if (!scheduledActivities.containsKey(scheduledEventId)) {
1✔
58
        // Cancellation handlers are not deregistered. So they fire after an activity completion.
59
        return;
×
60
      }
61
      decisions.requestCancelActivityTask(
1✔
62
          scheduledEventId,
63
          () -> {
UNCOV
64
            OpenRequestInfo<byte[], ActivityType> scheduled =
×
UNCOV
65
                scheduledActivities.remove(scheduledEventId);
×
UNCOV
66
            if (scheduled == null) {
×
67
              throw new IllegalArgumentException(
×
68
                  String.format(
×
69
                      "Activity with activityId=%s and scheduledEventId=%d wasn't found",
70
                      activityId, scheduledEventId));
×
71
            }
UNCOV
72
            callback.accept(null, new CancellationException("Cancelled by request"));
×
UNCOV
73
          });
×
74
    }
1✔
75
  }
76

77
  private final DecisionsHelper decisions;
78

79
  // key is scheduledEventId
80
  private final Map<Long, OpenRequestInfo<byte[], ActivityType>> scheduledActivities =
1✔
81
      new HashMap<>();
82

83
  ActivityDecisionContext(DecisionsHelper decisions) {
1✔
84
    this.decisions = decisions;
1✔
85
  }
1✔
86

87
  public boolean isActivityScheduledWithRetryOptions() {
88
    return decisions.isActivityScheduledWithRetryOptions();
1✔
89
  }
90

91
  Consumer<Exception> scheduleActivityTask(
92
      ExecuteActivityParameters parameters, BiConsumer<byte[], Exception> callback) {
93
    final OpenRequestInfo<byte[], ActivityType> context =
1✔
94
        new OpenRequestInfo<>(parameters.getActivityType());
1✔
95
    final ScheduleActivityTaskDecisionAttributes attributes =
1✔
96
        new ScheduleActivityTaskDecisionAttributes();
97
    attributes.setActivityType(parameters.getActivityType());
1✔
98
    attributes.setInput(parameters.getInput());
1✔
99
    if (parameters.getHeartbeatTimeoutSeconds() > 0) {
1✔
100
      attributes.setHeartbeatTimeoutSeconds((int) parameters.getHeartbeatTimeoutSeconds());
1✔
101
    }
102
    attributes.setScheduleToCloseTimeoutSeconds(
1✔
103
        (int) parameters.getScheduleToCloseTimeoutSeconds());
1✔
104
    attributes.setScheduleToStartTimeoutSeconds(
1✔
105
        (int) parameters.getScheduleToStartTimeoutSeconds());
1✔
106
    attributes.setStartToCloseTimeoutSeconds((int) parameters.getStartToCloseTimeoutSeconds());
1✔
107

108
    // attributes.setTaskPriority(InternalUtils.taskPriorityToString(parameters.getTaskPriority()));
109
    String activityId = parameters.getActivityId();
1✔
110
    if (activityId == null) {
1✔
111
      activityId = String.valueOf(decisions.getAndIncrementNextId());
1✔
112
    }
113
    attributes.setActivityId(activityId);
1✔
114

115
    String taskList = parameters.getTaskList();
1✔
116
    if (taskList != null && !taskList.isEmpty()) {
1✔
117
      TaskList tl = new TaskList();
1✔
118
      tl.setName(taskList);
1✔
119
      attributes.setTaskList(tl);
1✔
120
    }
121
    RetryParameters retryParameters = parameters.getRetryParameters();
1✔
122
    if (retryParameters != null) {
1✔
123
      attributes.setRetryPolicy(retryParameters.toRetryPolicy());
1✔
124
    }
125

126
    attributes.setHeader(toHeaderThrift(parameters.getContext()));
1✔
127

128
    long scheduledEventId = decisions.scheduleActivityTask(attributes);
1✔
129
    context.setCompletionHandle(callback);
1✔
130
    scheduledActivities.put(scheduledEventId, context);
1✔
131
    return new ActivityDecisionContext.ActivityCancellationHandler(
1✔
132
        scheduledEventId, attributes.getActivityId(), callback);
1✔
133
  }
134

135
  void handleActivityTaskCanceled(HistoryEvent event) {
136
    ActivityTaskCanceledEventAttributes attributes = event.getActivityTaskCanceledEventAttributes();
1✔
137
    if (decisions.handleActivityTaskCanceled(event)) {
1✔
138
      CancellationException e = new CancellationException();
1✔
139
      OpenRequestInfo<byte[], ActivityType> scheduled =
1✔
140
          scheduledActivities.remove(attributes.getScheduledEventId());
1✔
141
      if (scheduled != null) {
1✔
142
        BiConsumer<byte[], Exception> completionHandle = scheduled.getCompletionCallback();
1✔
143
        // It is OK to fail with subclass of CancellationException when cancellation requested.
144
        // It allows passing information about cancellation (details in this case) to the
145
        // surrounding doCatch block
146
        completionHandle.accept(null, e);
1✔
147
      }
148
    }
149
  }
1✔
150

151
  void handleActivityTaskCompleted(HistoryEvent event) {
152
    ActivityTaskCompletedEventAttributes attributes =
1✔
153
        event.getActivityTaskCompletedEventAttributes();
1✔
154
    if (decisions.handleActivityTaskClosed(attributes.getScheduledEventId())) {
1✔
155
      OpenRequestInfo<byte[], ActivityType> scheduled =
1✔
156
          scheduledActivities.remove(attributes.getScheduledEventId());
1✔
157
      if (scheduled != null) {
1✔
158
        byte[] result = attributes.getResult();
1✔
159
        BiConsumer<byte[], Exception> completionHandle = scheduled.getCompletionCallback();
1✔
160
        completionHandle.accept(result, null);
1✔
161
      } else {
1✔
162
        throw new NonDeterminisicWorkflowError(
×
163
            "Trying to complete activity event "
164
                + attributes.getScheduledEventId()
×
165
                + " that is not in scheduledActivities");
166
      }
167
    }
168
  }
1✔
169

170
  void handleActivityTaskFailed(HistoryEvent event) {
171
    ActivityTaskFailedEventAttributes attributes = event.getActivityTaskFailedEventAttributes();
1✔
172
    if (decisions.handleActivityTaskClosed(attributes.getScheduledEventId())) {
1✔
173
      OpenRequestInfo<byte[], ActivityType> scheduled =
1✔
174
          scheduledActivities.remove(attributes.getScheduledEventId());
1✔
175
      if (scheduled != null) {
1✔
176
        String reason = attributes.getReason();
1✔
177
        byte[] details = attributes.getDetails();
1✔
178
        ActivityTaskFailedException failure =
1✔
179
            new ActivityTaskFailedException(
180
                event.getEventId(), scheduled.getUserContext(), null, reason, details);
1✔
181
        BiConsumer<byte[], Exception> completionHandle = scheduled.getCompletionCallback();
1✔
182
        completionHandle.accept(null, failure);
1✔
183
      }
184
    }
185
  }
1✔
186

187
  void handleActivityTaskTimedOut(HistoryEvent event) {
188
    ActivityTaskTimedOutEventAttributes attributes = event.getActivityTaskTimedOutEventAttributes();
1✔
189
    if (decisions.handleActivityTaskClosed(attributes.getScheduledEventId())) {
1✔
190
      OpenRequestInfo<byte[], ActivityType> scheduled =
1✔
191
          scheduledActivities.remove(attributes.getScheduledEventId());
1✔
192
      if (scheduled != null) {
1✔
193
        TimeoutType timeoutType = attributes.getTimeoutType();
1✔
194
        byte[] details = attributes.getDetails();
1✔
195
        ActivityTaskTimeoutException failure =
1✔
196
            new ActivityTaskTimeoutException(
197
                event.getEventId(), scheduled.getUserContext(), null, timeoutType, details);
1✔
198
        BiConsumer<byte[], Exception> completionHandle = scheduled.getCompletionCallback();
1✔
199
        completionHandle.accept(null, failure);
1✔
200
      }
201
    }
202
  }
1✔
203

204
  private Header toHeaderThrift(Map<String, byte[]> headers) {
205
    if (headers == null || headers.isEmpty()) {
1✔
206
      return null;
1✔
207
    }
208
    Map<String, ByteBuffer> fields = new HashMap<>();
1✔
209
    for (Map.Entry<String, byte[]> item : headers.entrySet()) {
1✔
210
      fields.put(item.getKey(), ByteBuffer.wrap(item.getValue()));
1✔
211
    }
1✔
212
    Header headerThrift = new Header();
1✔
213
    headerThrift.setFields(fields);
1✔
214
    return headerThrift;
1✔
215
  }
216
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc