• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 1995

27 Sep 2023 06:06PM UTC coverage: 60.183% (-0.04%) from 60.226%
1995

push

buildkite

web-flow
fixed bug: Added alreadyStarted workflow case (#853)

If we get a WorkflowExecutionAlreadyStartedError while starting the workflow in the new domain, we neither need to call startWorkflowInNew domain again nor throw an error. Instead, we report the status as "Workflow already started".

8 of 8 new or added lines in 2 files covered. (100.0%)

11327 of 18821 relevant lines covered (60.18%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.8
/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.replay;
19

20
import static com.uber.cadence.internal.common.InternalUtils.createStickyTaskList;
21

22
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
23
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
24
import com.uber.cadence.HistoryEvent;
25
import com.uber.cadence.PollForDecisionTaskResponse;
26
import com.uber.cadence.QueryTaskCompletedType;
27
import com.uber.cadence.RespondDecisionTaskCompletedRequest;
28
import com.uber.cadence.RespondDecisionTaskFailedRequest;
29
import com.uber.cadence.RespondQueryTaskCompletedRequest;
30
import com.uber.cadence.StickyExecutionAttributes;
31
import com.uber.cadence.WorkflowExecution;
32
import com.uber.cadence.WorkflowType;
33
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
34
import com.uber.cadence.internal.metrics.MetricsType;
35
import com.uber.cadence.internal.worker.DecisionTaskHandler;
36
import com.uber.cadence.internal.worker.LocalActivityWorker;
37
import com.uber.cadence.internal.worker.SingleWorkerOptions;
38
import com.uber.cadence.serviceclient.IWorkflowService;
39
import java.io.PrintWriter;
40
import java.io.StringWriter;
41
import java.nio.charset.StandardCharsets;
42
import java.time.Duration;
43
import java.util.List;
44
import java.util.Objects;
45
import java.util.concurrent.atomic.AtomicBoolean;
46
import java.util.function.BiFunction;
47
import org.slf4j.Logger;
48
import org.slf4j.LoggerFactory;
49

50
public final class ReplayDecisionTaskHandler implements DecisionTaskHandler {
51

52
  private static final Logger log = LoggerFactory.getLogger(ReplayDecisionTaskHandler.class);
1✔
53

54
  private final ReplayWorkflowFactory workflowFactory;
55
  private final String domain;
56
  private final DeciderCache cache;
57
  private final SingleWorkerOptions options;
58
  private final Duration stickyTaskListScheduleToStartTimeout;
59
  private IWorkflowService service;
60
  private String stickyTaskListName;
61
  private final BiFunction<LocalActivityWorker.Task, Duration, Boolean> laTaskPoller;
62

63
  public ReplayDecisionTaskHandler(
64
      String domain,
65
      ReplayWorkflowFactory asyncWorkflowFactory,
66
      DeciderCache cache,
67
      SingleWorkerOptions options,
68
      String stickyTaskListName,
69
      Duration stickyTaskListScheduleToStartTimeout,
70
      IWorkflowService service,
71
      BiFunction<LocalActivityWorker.Task, Duration, Boolean> laTaskPoller) {
1✔
72
    this.domain = domain;
1✔
73
    this.workflowFactory = asyncWorkflowFactory;
1✔
74
    this.cache = cache;
1✔
75
    this.options = options;
1✔
76
    this.stickyTaskListName = stickyTaskListName;
1✔
77
    this.stickyTaskListScheduleToStartTimeout = stickyTaskListScheduleToStartTimeout;
1✔
78
    this.service = Objects.requireNonNull(service);
1✔
79
    this.laTaskPoller = laTaskPoller;
1✔
80
  }
1✔
81

82
  @Override
83
  public DecisionTaskHandler.Result handleDecisionTask(PollForDecisionTaskResponse decisionTask)
84
      throws Exception {
85
    try {
86
      return handleDecisionTaskImpl(decisionTask);
1✔
87
    } catch (Throwable e) {
1✔
88
      options.getMetricsScope().counter(MetricsType.DECISION_EXECUTION_FAILED_COUNTER).inc(1);
1✔
89
      // Only fail decision on first attempt, subsequent failure on the same decision task will
90
      // timeout. This is to avoid spin on the failed decision task.
91
      if (decisionTask.getAttempt() > 0) {
1✔
92
        if (e instanceof Error) {
1✔
93
          throw (Error) e;
1✔
94
        }
95
        throw (Exception) e;
×
96
      }
97
      if (log.isErrorEnabled()) {
1✔
98
        WorkflowExecution execution = decisionTask.getWorkflowExecution();
1✔
99
        log.error(
1✔
100
            "Workflow task failure. startedEventId="
101
                + decisionTask.getStartedEventId()
1✔
102
                + ", WorkflowID="
103
                + execution.getWorkflowId()
1✔
104
                + ", RunID="
105
                + execution.getRunId()
1✔
106
                + ". If see continuously the workflow might be stuck.",
107
            e);
108
      }
109
      RespondDecisionTaskFailedRequest failedRequest = new RespondDecisionTaskFailedRequest();
1✔
110
      failedRequest.setTaskToken(decisionTask.getTaskToken());
1✔
111
      StringWriter sw = new StringWriter();
1✔
112
      PrintWriter pw = new PrintWriter(sw);
1✔
113
      e.printStackTrace(pw);
1✔
114
      String stackTrace = sw.toString();
1✔
115
      failedRequest.setDetails(stackTrace.getBytes(StandardCharsets.UTF_8));
1✔
116
      return new DecisionTaskHandler.Result(null, failedRequest, null);
1✔
117
    }
118
  }
119

120
  private Result handleDecisionTaskImpl(PollForDecisionTaskResponse decisionTask) throws Throwable {
121

122
    if (decisionTask.isSetQuery()) {
1✔
123
      return processQuery(decisionTask);
1✔
124
    } else {
125
      return processDecision(decisionTask);
1✔
126
    }
127
  }
128

129
  private Result processDecision(PollForDecisionTaskResponse decisionTask) throws Throwable {
130
    Decider decider = null;
1✔
131
    AtomicBoolean createdNew = new AtomicBoolean();
1✔
132
    try {
133
      if (stickyTaskListName == null) {
1✔
134
        decider = createDecider(decisionTask);
1✔
135
      } else {
136
        decider =
1✔
137
            cache.getOrCreate(
1✔
138
                decisionTask,
139
                () -> {
140
                  createdNew.set(true);
1✔
141
                  return createDecider(decisionTask);
1✔
142
                });
143
      }
144

145
      Decider.DecisionResult result = decider.decide(decisionTask);
1✔
146

147
      if (stickyTaskListName != null && createdNew.get()) {
1✔
148
        cache.addToCache(decisionTask, decider);
1✔
149
      }
150

151
      if (log.isTraceEnabled()) {
1✔
152
        WorkflowExecution execution = decisionTask.getWorkflowExecution();
×
153
        log.trace(
×
154
            "WorkflowTask startedEventId="
155
                + decisionTask.getStartedEventId()
×
156
                + ", WorkflowID="
157
                + execution.getWorkflowId()
×
158
                + ", RunID="
159
                + execution.getRunId()
×
160
                + " completed with "
161
                + WorkflowExecutionUtils.prettyPrintDecisions(result.getDecisions())
×
162
                + " forceCreateNewDecisionTask "
163
                + result.getForceCreateNewDecisionTask());
×
164
      } else if (log.isDebugEnabled()) {
1✔
165
        WorkflowExecution execution = decisionTask.getWorkflowExecution();
×
166
        log.debug(
×
167
            "WorkflowTask startedEventId="
168
                + decisionTask.getStartedEventId()
×
169
                + ", WorkflowID="
170
                + execution.getWorkflowId()
×
171
                + ", RunID="
172
                + execution.getRunId()
×
173
                + " completed with "
174
                + result.getDecisions().size()
×
175
                + " new decisions"
176
                + " forceCreateNewDecisionTask "
177
                + result.getForceCreateNewDecisionTask());
×
178
      }
179
      return createCompletedRequest(decisionTask, result);
1✔
180
    } catch (Throwable e) {
1✔
181
      // Note here that the decider might not be in the cache, even sticky is on. In that case we
182
      // need to close the decider explicitly.
183
      // For items in the cache, invalidation callback will try to close again, which should be ok.
184
      if (decider != null) {
1✔
185
        decider.close();
1✔
186
      }
187

188
      if (stickyTaskListName != null) {
1✔
189
        cache.invalidate(decisionTask.getWorkflowExecution().getRunId());
1✔
190
      }
191
      throw e;
1✔
192
    } finally {
193
      if (stickyTaskListName == null && decider != null) {
1✔
194
        decider.close();
1✔
195
      } else {
196
        cache.markProcessingDone(decisionTask);
1✔
197
      }
198
    }
199
  }
200

201
  private Result processQuery(PollForDecisionTaskResponse decisionTask) {
202
    RespondQueryTaskCompletedRequest queryCompletedRequest = new RespondQueryTaskCompletedRequest();
1✔
203
    queryCompletedRequest.setTaskToken(decisionTask.getTaskToken());
1✔
204
    Decider decider = null;
1✔
205
    AtomicBoolean createdNew = new AtomicBoolean();
1✔
206
    try {
207
      if (stickyTaskListName == null) {
1✔
208
        decider = createDecider(decisionTask);
1✔
209
      } else {
210
        decider =
1✔
211
            cache.getOrCreate(
1✔
212
                decisionTask,
213
                () -> {
214
                  createdNew.set(true);
1✔
215
                  return createDecider(decisionTask);
1✔
216
                });
217
      }
218

219
      byte[] queryResult = decider.query(decisionTask, decisionTask.getQuery());
1✔
220
      if (stickyTaskListName != null && createdNew.get()) {
1✔
221
        cache.addToCache(decisionTask, decider);
1✔
222
      }
223
      queryCompletedRequest.setQueryResult(queryResult);
1✔
224
      queryCompletedRequest.setCompletedType(QueryTaskCompletedType.COMPLETED);
1✔
225
    } catch (Throwable e) {
1✔
226
      // TODO: Appropriate exception serialization.
227
      StringWriter sw = new StringWriter();
1✔
228
      PrintWriter pw = new PrintWriter(sw);
1✔
229
      e.printStackTrace(pw);
1✔
230
      queryCompletedRequest.setErrorMessage(sw.toString());
1✔
231
      queryCompletedRequest.setCompletedType(QueryTaskCompletedType.FAILED);
1✔
232
    } finally {
233
      if (stickyTaskListName == null && decider != null) {
1✔
234
        decider.close();
1✔
235
      } else {
236
        cache.markProcessingDone(decisionTask);
1✔
237
      }
238
    }
239
    return new Result(null, null, queryCompletedRequest);
1✔
240
  }
241

242
  private Result createCompletedRequest(
243
      PollForDecisionTaskResponse decisionTask, Decider.DecisionResult result) {
244
    RespondDecisionTaskCompletedRequest completedRequest =
1✔
245
        new RespondDecisionTaskCompletedRequest();
246
    completedRequest.setTaskToken(decisionTask.getTaskToken());
1✔
247
    completedRequest.setDecisions(result.getDecisions());
1✔
248
    completedRequest.setQueryResults(result.getQueryResults());
1✔
249
    completedRequest.setForceCreateNewDecisionTask(result.getForceCreateNewDecisionTask());
1✔
250

251
    if (stickyTaskListName != null) {
1✔
252
      StickyExecutionAttributes attributes = new StickyExecutionAttributes();
1✔
253
      attributes.setWorkerTaskList(createStickyTaskList(stickyTaskListName));
1✔
254
      attributes.setScheduleToStartTimeoutSeconds(
1✔
255
          (int) stickyTaskListScheduleToStartTimeout.getSeconds());
1✔
256
      completedRequest.setStickyAttributes(attributes);
1✔
257
    }
258
    return new Result(completedRequest, null, null);
1✔
259
  }
260

261
  @Override
262
  public boolean isAnyTypeSupported() {
263
    return workflowFactory.isAnyTypeSupported();
1✔
264
  }
265

266
  private Decider createDecider(PollForDecisionTaskResponse decisionTask) throws Exception {
267
    WorkflowType workflowType = decisionTask.getWorkflowType();
1✔
268
    List<HistoryEvent> events = decisionTask.getHistory().getEvents();
1✔
269
    // Sticky decision task with partial history
270
    if (events.isEmpty() || events.get(0).getEventId() > 1) {
1✔
271
      GetWorkflowExecutionHistoryRequest getHistoryRequest =
1✔
272
          new GetWorkflowExecutionHistoryRequest()
273
              .setDomain(domain)
1✔
274
              .setExecution(decisionTask.getWorkflowExecution());
1✔
275
      GetWorkflowExecutionHistoryResponse getHistoryResponse =
1✔
276
          service.GetWorkflowExecutionHistory(getHistoryRequest);
1✔
277
      decisionTask.setHistory(getHistoryResponse.getHistory());
1✔
278
      decisionTask.setNextPageToken(getHistoryResponse.getNextPageToken());
1✔
279
    }
280
    DecisionsHelper decisionsHelper = new DecisionsHelper(decisionTask, options);
1✔
281
    ReplayWorkflow workflow = workflowFactory.getWorkflow(workflowType);
1✔
282
    return new ReplayDecider(
1✔
283
        service, domain, workflowType, workflow, decisionsHelper, options, laTaskPoller);
284
  }
285
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc