• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2384

12 Jun 2024 09:48PM CUT coverage: 61.43% (-0.03%) from 61.455%
2384

push

buildkite

web-flow
Release v3.12.4 (#911)

What changed?

3.12.4
Fix incorrect span activation for local activities

11966 of 19479 relevant lines covered (61.43%)

0.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.72
/src/main/java/com/uber/cadence/internal/replay/ActivityDecisionContext.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.replay;
19

20
import com.uber.cadence.ActivityTaskCanceledEventAttributes;
21
import com.uber.cadence.ActivityTaskCompletedEventAttributes;
22
import com.uber.cadence.ActivityTaskFailedEventAttributes;
23
import com.uber.cadence.ActivityTaskTimedOutEventAttributes;
24
import com.uber.cadence.ActivityType;
25
import com.uber.cadence.Header;
26
import com.uber.cadence.HistoryEvent;
27
import com.uber.cadence.ScheduleActivityTaskDecisionAttributes;
28
import com.uber.cadence.TaskList;
29
import com.uber.cadence.TimeoutType;
30
import com.uber.cadence.internal.common.RetryParameters;
31
import java.nio.ByteBuffer;
32
import java.util.HashMap;
33
import java.util.Map;
34
import java.util.concurrent.CancellationException;
35
import java.util.function.BiConsumer;
36
import java.util.function.Consumer;
37

38
final class ActivityDecisionContext {
39

40
  private final class ActivityCancellationHandler implements Consumer<Exception> {
41

42
    private final long scheduledEventId;
43

44
    private final String activityId;
45

46
    private final BiConsumer<byte[], Exception> callback;
47

48
    private ActivityCancellationHandler(
49
        long scheduledEventId, String activityId, BiConsumer<byte[], Exception> callaback) {
1✔
50
      this.scheduledEventId = scheduledEventId;
1✔
51
      this.activityId = activityId;
1✔
52
      this.callback = callaback;
1✔
53
    }
1✔
54

55
    @Override
56
    public void accept(Exception cause) {
57
      if (!scheduledActivities.containsKey(scheduledEventId)) {
1✔
58
        // Cancellation handlers are not deregistered. So they fire after an activity completion.
59
        return;
×
60
      }
61
      decisions.requestCancelActivityTask(
1✔
62
          scheduledEventId,
63
          () -> {
64
            OpenRequestInfo<byte[], ActivityType> scheduled =
×
65
                scheduledActivities.remove(scheduledEventId);
×
66
            if (scheduled == null) {
×
67
              throw new IllegalArgumentException(
×
68
                  String.format(
×
69
                      "Activity with activityId=%s and scheduledEventId=%d wasn't found",
70
                      activityId, scheduledEventId));
×
71
            }
72
            callback.accept(null, new CancellationException("Cancelled by request"));
×
73
          });
×
74
    }
1✔
75
  }
76

77
  private final DecisionsHelper decisions;
78

79
  // key is scheduledEventId
80
  private final Map<Long, OpenRequestInfo<byte[], ActivityType>> scheduledActivities =
1✔
81
      new HashMap<>();
82

83
  ActivityDecisionContext(DecisionsHelper decisions) {
1✔
84
    this.decisions = decisions;
1✔
85
  }
1✔
86

87
  public boolean isActivityScheduledWithRetryOptions() {
88
    return decisions.isActivityScheduledWithRetryOptions();
1✔
89
  }
90

91
  Consumer<Exception> scheduleActivityTask(
92
      ExecuteActivityParameters parameters, BiConsumer<byte[], Exception> callback) {
93
    final OpenRequestInfo<byte[], ActivityType> context =
1✔
94
        new OpenRequestInfo<>(parameters.getActivityType());
1✔
95
    final ScheduleActivityTaskDecisionAttributes attributes =
1✔
96
        new ScheduleActivityTaskDecisionAttributes();
97
    attributes.setActivityType(parameters.getActivityType());
1✔
98
    attributes.setInput(parameters.getInput());
1✔
99
    if (parameters.getHeartbeatTimeoutSeconds() > 0) {
1✔
100
      attributes.setHeartbeatTimeoutSeconds((int) parameters.getHeartbeatTimeoutSeconds());
1✔
101
    }
102
    attributes.setScheduleToCloseTimeoutSeconds(
1✔
103
        (int) parameters.getScheduleToCloseTimeoutSeconds());
1✔
104
    attributes.setScheduleToStartTimeoutSeconds(
1✔
105
        (int) parameters.getScheduleToStartTimeoutSeconds());
1✔
106
    attributes.setStartToCloseTimeoutSeconds((int) parameters.getStartToCloseTimeoutSeconds());
1✔
107

108
    // attributes.setTaskPriority(InternalUtils.taskPriorityToString(parameters.getTaskPriority()));
109
    String activityId = parameters.getActivityId();
1✔
110
    if (activityId == null) {
1✔
111
      activityId = String.valueOf(decisions.getAndIncrementNextId());
1✔
112
    }
113
    attributes.setActivityId(activityId);
1✔
114

115
    String taskList = parameters.getTaskList();
1✔
116
    if (taskList != null && !taskList.isEmpty()) {
1✔
117
      TaskList tl = new TaskList();
1✔
118
      tl.setName(taskList);
1✔
119
      attributes.setTaskList(tl);
1✔
120
    }
121
    RetryParameters retryParameters = parameters.getRetryParameters();
1✔
122
    if (retryParameters != null) {
1✔
123
      attributes.setRetryPolicy(retryParameters.toRetryPolicy());
1✔
124
    }
125

126
    attributes.setHeader(toHeaderThrift(parameters.getContext()));
1✔
127

128
    long scheduledEventId = decisions.scheduleActivityTask(attributes);
1✔
129
    context.setCompletionHandle(callback);
1✔
130
    scheduledActivities.put(scheduledEventId, context);
1✔
131
    return new ActivityDecisionContext.ActivityCancellationHandler(
1✔
132
        scheduledEventId, attributes.getActivityId(), callback);
1✔
133
  }
134

135
  void handleActivityTaskCanceled(HistoryEvent event) {
136
    ActivityTaskCanceledEventAttributes attributes = event.getActivityTaskCanceledEventAttributes();
1✔
137
    if (decisions.handleActivityTaskCanceled(event)) {
1✔
138
      CancellationException e = new CancellationException();
1✔
139
      OpenRequestInfo<byte[], ActivityType> scheduled =
1✔
140
          scheduledActivities.remove(attributes.getScheduledEventId());
1✔
141
      if (scheduled != null) {
1✔
142
        BiConsumer<byte[], Exception> completionHandle = scheduled.getCompletionCallback();
1✔
143
        // It is OK to fail with subclass of CancellationException when cancellation requested.
144
        // It allows passing information about cancellation (details in this case) to the
145
        // surrounding doCatch block
146
        completionHandle.accept(null, e);
1✔
147
      }
148
    }
149
  }
1✔
150

151
  void handleActivityTaskCompleted(HistoryEvent event) {
152
    ActivityTaskCompletedEventAttributes attributes =
1✔
153
        event.getActivityTaskCompletedEventAttributes();
1✔
154
    if (decisions.handleActivityTaskClosed(attributes.getScheduledEventId())) {
1✔
155
      OpenRequestInfo<byte[], ActivityType> scheduled =
1✔
156
          scheduledActivities.remove(attributes.getScheduledEventId());
1✔
157
      if (scheduled != null) {
1✔
158
        byte[] result = attributes.getResult();
1✔
159
        BiConsumer<byte[], Exception> completionHandle = scheduled.getCompletionCallback();
1✔
160
        completionHandle.accept(result, null);
1✔
161
      } else {
1✔
162
        throw new NonDeterminisicWorkflowError(
×
163
            "Trying to complete activity event "
164
                + attributes.getScheduledEventId()
×
165
                + " that is not in scheduledActivities");
166
      }
167
    }
168
  }
1✔
169

170
  void handleActivityTaskFailed(HistoryEvent event) {
171
    ActivityTaskFailedEventAttributes attributes = event.getActivityTaskFailedEventAttributes();
1✔
172
    if (decisions.handleActivityTaskClosed(attributes.getScheduledEventId())) {
1✔
173
      OpenRequestInfo<byte[], ActivityType> scheduled =
1✔
174
          scheduledActivities.remove(attributes.getScheduledEventId());
1✔
175
      if (scheduled != null) {
1✔
176
        String reason = attributes.getReason();
1✔
177
        byte[] details = attributes.getDetails();
1✔
178
        ActivityTaskFailedException failure =
1✔
179
            new ActivityTaskFailedException(
180
                event.getEventId(), scheduled.getUserContext(), null, reason, details);
1✔
181
        BiConsumer<byte[], Exception> completionHandle = scheduled.getCompletionCallback();
1✔
182
        completionHandle.accept(null, failure);
1✔
183
      }
184
    }
185
  }
1✔
186

187
  void handleActivityTaskTimedOut(HistoryEvent event) {
188
    ActivityTaskTimedOutEventAttributes attributes = event.getActivityTaskTimedOutEventAttributes();
1✔
189
    if (decisions.handleActivityTaskClosed(attributes.getScheduledEventId())) {
1✔
190
      OpenRequestInfo<byte[], ActivityType> scheduled =
1✔
191
          scheduledActivities.remove(attributes.getScheduledEventId());
1✔
192
      if (scheduled != null) {
1✔
193
        TimeoutType timeoutType = attributes.getTimeoutType();
1✔
194
        byte[] details = attributes.getDetails();
1✔
195
        ActivityTaskTimeoutException failure =
1✔
196
            new ActivityTaskTimeoutException(
197
                event.getEventId(), scheduled.getUserContext(), null, timeoutType, details);
1✔
198
        BiConsumer<byte[], Exception> completionHandle = scheduled.getCompletionCallback();
1✔
199
        completionHandle.accept(null, failure);
1✔
200
      }
201
    }
202
  }
1✔
203

204
  private Header toHeaderThrift(Map<String, byte[]> headers) {
205
    if (headers == null || headers.isEmpty()) {
1✔
206
      return null;
1✔
207
    }
208
    Map<String, ByteBuffer> fields = new HashMap<>();
1✔
209
    for (Map.Entry<String, byte[]> item : headers.entrySet()) {
1✔
210
      fields.put(item.getKey(), ByteBuffer.wrap(item.getValue()));
1✔
211
    }
1✔
212
    Header headerThrift = new Header();
1✔
213
    headerThrift.setFields(fields);
1✔
214
    return headerThrift;
1✔
215
  }
216
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc