• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2490

13 Aug 2024 05:39PM CUT coverage: 61.99% (-0.03%) from 62.021%
2490

push

buildkite

web-flow
Removing fossa as it is migrated to snyk (#919)

12090 of 19503 relevant lines covered (61.99%)

0.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.8
/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.replay;
19

20
import static com.uber.cadence.internal.common.InternalUtils.createStickyTaskList;
21

22
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
23
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
24
import com.uber.cadence.HistoryEvent;
25
import com.uber.cadence.PollForDecisionTaskResponse;
26
import com.uber.cadence.QueryTaskCompletedType;
27
import com.uber.cadence.RespondDecisionTaskCompletedRequest;
28
import com.uber.cadence.RespondDecisionTaskFailedRequest;
29
import com.uber.cadence.RespondQueryTaskCompletedRequest;
30
import com.uber.cadence.StickyExecutionAttributes;
31
import com.uber.cadence.WorkflowExecution;
32
import com.uber.cadence.WorkflowType;
33
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
34
import com.uber.cadence.internal.metrics.MetricsType;
35
import com.uber.cadence.internal.worker.DecisionTaskHandler;
36
import com.uber.cadence.internal.worker.LocalActivityWorker;
37
import com.uber.cadence.internal.worker.SingleWorkerOptions;
38
import com.uber.cadence.serviceclient.IWorkflowService;
39
import java.io.PrintWriter;
40
import java.io.StringWriter;
41
import java.nio.charset.StandardCharsets;
42
import java.time.Duration;
43
import java.util.List;
44
import java.util.Objects;
45
import java.util.concurrent.atomic.AtomicBoolean;
46
import java.util.function.BiFunction;
47
import org.slf4j.Logger;
48
import org.slf4j.LoggerFactory;
49

50
public final class ReplayDecisionTaskHandler implements DecisionTaskHandler {
51

52
  private static final Logger log = LoggerFactory.getLogger(ReplayDecisionTaskHandler.class);
1✔
53

54
  private final ReplayWorkflowFactory workflowFactory;
55
  private final String domain;
56
  private final DeciderCache cache;
57
  private final SingleWorkerOptions options;
58
  private final Duration stickyTaskListScheduleToStartTimeout;
59
  private IWorkflowService service;
60
  private String stickyTaskListName;
61
  private final BiFunction<LocalActivityWorker.Task, Duration, Boolean> laTaskPoller;
62

63
  public ReplayDecisionTaskHandler(
64
      String domain,
65
      ReplayWorkflowFactory asyncWorkflowFactory,
66
      DeciderCache cache,
67
      SingleWorkerOptions options,
68
      String stickyTaskListName,
69
      Duration stickyTaskListScheduleToStartTimeout,
70
      IWorkflowService service,
71
      BiFunction<LocalActivityWorker.Task, Duration, Boolean> laTaskPoller) {
1✔
72
    this.domain = domain;
1✔
73
    this.workflowFactory = asyncWorkflowFactory;
1✔
74
    this.cache = cache;
1✔
75
    this.options = options;
1✔
76
    this.stickyTaskListName = stickyTaskListName;
1✔
77
    this.stickyTaskListScheduleToStartTimeout = stickyTaskListScheduleToStartTimeout;
1✔
78
    this.service = Objects.requireNonNull(service);
1✔
79
    this.laTaskPoller = laTaskPoller;
1✔
80
  }
1✔
81

82
  @Override
83
  public DecisionTaskHandler.Result handleDecisionTask(PollForDecisionTaskResponse decisionTask)
84
      throws Exception {
85
    try {
86
      return handleDecisionTaskImpl(decisionTask);
1✔
87
    } catch (Throwable e) {
1✔
88
      options.getMetricsScope().counter(MetricsType.DECISION_EXECUTION_FAILED_COUNTER).inc(1);
1✔
89
      // Only fail decision on first attempt, subsequent failure on the same decision task will
90
      // timeout. This is to avoid spin on the failed decision task.
91
      if (decisionTask.getAttempt() > 0) {
1✔
92
        if (e instanceof Error) {
1✔
93
          throw (Error) e;
1✔
94
        }
95
        throw (Exception) e;
×
96
      }
97
      if (log.isErrorEnabled()) {
1✔
98
        WorkflowExecution execution = decisionTask.getWorkflowExecution();
1✔
99
        log.error(
1✔
100
            "Workflow task failure. startedEventId="
101
                + decisionTask.getStartedEventId()
1✔
102
                + ", WorkflowID="
103
                + execution.getWorkflowId()
1✔
104
                + ", RunID="
105
                + execution.getRunId()
1✔
106
                + ". If see continuously the workflow might be stuck.",
107
            e);
108
      }
109
      RespondDecisionTaskFailedRequest failedRequest = new RespondDecisionTaskFailedRequest();
1✔
110
      failedRequest.setTaskToken(decisionTask.getTaskToken());
1✔
111
      StringWriter sw = new StringWriter();
1✔
112
      PrintWriter pw = new PrintWriter(sw);
1✔
113
      e.printStackTrace(pw);
1✔
114
      String stackTrace = sw.toString();
1✔
115
      failedRequest.setDetails(stackTrace.getBytes(StandardCharsets.UTF_8));
1✔
116
      return new DecisionTaskHandler.Result(null, failedRequest, null);
1✔
117
    }
118
  }
119

120
  private Result handleDecisionTaskImpl(PollForDecisionTaskResponse decisionTask) throws Throwable {
121

122
    if (decisionTask.isSetQuery()) {
1✔
123
      return processQuery(decisionTask);
1✔
124
    } else {
125
      return processDecision(decisionTask);
1✔
126
    }
127
  }
128

129
  private Result processDecision(PollForDecisionTaskResponse decisionTask) throws Throwable {
130
    Decider decider = null;
1✔
131
    AtomicBoolean createdNew = new AtomicBoolean();
1✔
132
    try {
133
      if (stickyTaskListName == null) {
1✔
134
        decider = createDecider(decisionTask);
1✔
135
      } else {
136
        decider =
1✔
137
            cache.getOrCreate(
1✔
138
                decisionTask,
139
                () -> {
140
                  createdNew.set(true);
1✔
141
                  return createDecider(decisionTask);
1✔
142
                });
143
      }
144

145
      Decider.DecisionResult result = decider.decide(decisionTask);
1✔
146

147
      if (stickyTaskListName != null && createdNew.get()) {
1✔
148
        cache.addToCache(decisionTask, decider);
1✔
149
      }
150

151
      if (log.isTraceEnabled()) {
1✔
152
        WorkflowExecution execution = decisionTask.getWorkflowExecution();
×
153
        log.trace(
×
154
            "WorkflowTask startedEventId="
155
                + decisionTask.getStartedEventId()
×
156
                + ", WorkflowID="
157
                + execution.getWorkflowId()
×
158
                + ", RunID="
159
                + execution.getRunId()
×
160
                + " completed with "
161
                + WorkflowExecutionUtils.prettyPrintDecisions(result.getDecisions())
×
162
                + " forceCreateNewDecisionTask "
163
                + result.getForceCreateNewDecisionTask());
×
164
      } else if (log.isDebugEnabled()) {
1✔
165
        WorkflowExecution execution = decisionTask.getWorkflowExecution();
×
166
        log.debug(
×
167
            "WorkflowTask startedEventId="
168
                + decisionTask.getStartedEventId()
×
169
                + ", WorkflowID="
170
                + execution.getWorkflowId()
×
171
                + ", RunID="
172
                + execution.getRunId()
×
173
                + " completed with "
174
                + result.getDecisions().size()
×
175
                + " new decisions"
176
                + " forceCreateNewDecisionTask "
177
                + result.getForceCreateNewDecisionTask());
×
178
      }
179
      return createCompletedRequest(decisionTask, result);
1✔
180
    } catch (Throwable e) {
1✔
181
      // Note here that the decider might not be in the cache, even sticky is on. In that case we
182
      // need to close the decider explicitly.
183
      // For items in the cache, invalidation callback will try to close again, which should be ok.
184
      if (decider != null) {
1✔
185
        decider.close();
1✔
186
      }
187

188
      if (stickyTaskListName != null) {
1✔
189
        cache.invalidate(decisionTask.getWorkflowExecution().getRunId());
1✔
190
      }
191
      throw e;
1✔
192
    } finally {
193
      if (stickyTaskListName == null && decider != null) {
1✔
194
        decider.close();
1✔
195
      } else {
196
        cache.markProcessingDone(decisionTask);
1✔
197
      }
198
    }
199
  }
200

201
  private Result processQuery(PollForDecisionTaskResponse decisionTask) {
202
    RespondQueryTaskCompletedRequest queryCompletedRequest = new RespondQueryTaskCompletedRequest();
1✔
203
    queryCompletedRequest.setTaskToken(decisionTask.getTaskToken());
1✔
204
    Decider decider = null;
1✔
205
    AtomicBoolean createdNew = new AtomicBoolean();
1✔
206
    try {
207
      if (stickyTaskListName == null) {
1✔
208
        decider = createDecider(decisionTask);
1✔
209
      } else {
210
        decider =
1✔
211
            cache.getOrCreate(
1✔
212
                decisionTask,
213
                () -> {
214
                  createdNew.set(true);
1✔
215
                  return createDecider(decisionTask);
1✔
216
                });
217
      }
218

219
      byte[] queryResult = decider.query(decisionTask, decisionTask.getQuery());
1✔
220
      if (stickyTaskListName != null && createdNew.get()) {
1✔
221
        cache.addToCache(decisionTask, decider);
1✔
222
      }
223
      queryCompletedRequest.setQueryResult(queryResult);
1✔
224
      queryCompletedRequest.setCompletedType(QueryTaskCompletedType.COMPLETED);
1✔
225
    } catch (Throwable e) {
1✔
226
      // TODO: Appropriate exception serialization.
227
      StringWriter sw = new StringWriter();
1✔
228
      PrintWriter pw = new PrintWriter(sw);
1✔
229
      e.printStackTrace(pw);
1✔
230
      queryCompletedRequest.setErrorMessage(sw.toString());
1✔
231
      queryCompletedRequest.setCompletedType(QueryTaskCompletedType.FAILED);
1✔
232
    } finally {
233
      if (stickyTaskListName == null && decider != null) {
1✔
234
        decider.close();
1✔
235
      } else {
236
        cache.markProcessingDone(decisionTask);
1✔
237
      }
238
    }
239
    return new Result(null, null, queryCompletedRequest);
1✔
240
  }
241

242
  private Result createCompletedRequest(
243
      PollForDecisionTaskResponse decisionTask, Decider.DecisionResult result) {
244
    RespondDecisionTaskCompletedRequest completedRequest =
1✔
245
        new RespondDecisionTaskCompletedRequest();
246
    completedRequest.setTaskToken(decisionTask.getTaskToken());
1✔
247
    completedRequest.setDecisions(result.getDecisions());
1✔
248
    completedRequest.setQueryResults(result.getQueryResults());
1✔
249
    completedRequest.setForceCreateNewDecisionTask(result.getForceCreateNewDecisionTask());
1✔
250

251
    if (stickyTaskListName != null) {
1✔
252
      StickyExecutionAttributes attributes = new StickyExecutionAttributes();
1✔
253
      attributes.setWorkerTaskList(createStickyTaskList(stickyTaskListName));
1✔
254
      attributes.setScheduleToStartTimeoutSeconds(
1✔
255
          (int) stickyTaskListScheduleToStartTimeout.getSeconds());
1✔
256
      completedRequest.setStickyAttributes(attributes);
1✔
257
    }
258
    return new Result(completedRequest, null, null);
1✔
259
  }
260

261
  @Override
262
  public boolean isAnyTypeSupported() {
263
    return workflowFactory.isAnyTypeSupported();
1✔
264
  }
265

266
  private Decider createDecider(PollForDecisionTaskResponse decisionTask) throws Exception {
267
    WorkflowType workflowType = decisionTask.getWorkflowType();
1✔
268
    List<HistoryEvent> events = decisionTask.getHistory().getEvents();
1✔
269
    // Sticky decision task with partial history
270
    if (events.isEmpty() || events.get(0).getEventId() > 1) {
1✔
271
      GetWorkflowExecutionHistoryRequest getHistoryRequest =
1✔
272
          new GetWorkflowExecutionHistoryRequest()
273
              .setDomain(domain)
1✔
274
              .setExecution(decisionTask.getWorkflowExecution());
1✔
275
      GetWorkflowExecutionHistoryResponse getHistoryResponse =
1✔
276
          service.GetWorkflowExecutionHistory(getHistoryRequest);
1✔
277
      decisionTask.setHistory(getHistoryResponse.getHistory());
1✔
278
      decisionTask.setNextPageToken(getHistoryResponse.getNextPageToken());
1✔
279
    }
280
    DecisionsHelper decisionsHelper = new DecisionsHelper(decisionTask, options);
1✔
281
    ReplayWorkflow workflow = workflowFactory.getWorkflow(workflowType);
1✔
282
    return new ReplayDecider(
1✔
283
        service, domain, workflowType, workflow, decisionsHelper, options, laTaskPoller);
284
  }
285
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc