• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2016

11 Oct 2023 04:32PM CUT coverage: 60.271% (+0.08%) from 60.196%
2016

push

buildkite

web-flow
Release 3.10.1-fix (#860)

* fixed bug: Added alreadyStarted workflow case
* populate tasklist kind in poll request

11349 of 18830 relevant lines covered (60.27%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.5
/src/main/java/com/uber/cadence/internal/worker/PollDecisionTaskDispatcher.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.worker;
19

20
import com.uber.cadence.DecisionTaskFailedCause;
import com.uber.cadence.PollForDecisionTaskResponse;
import com.uber.cadence.RespondDecisionTaskFailedRequest;
import com.uber.cadence.serviceclient.IWorkflowService;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
34

35
public final class PollDecisionTaskDispatcher
36
    implements ShutdownableTaskExecutor<PollForDecisionTaskResponse> {
37

38
  private static final Logger log = LoggerFactory.getLogger(PollDecisionTaskDispatcher.class);
1✔
39
  private final Map<String, Consumer<PollForDecisionTaskResponse>> subscribers =
1✔
40
      new ConcurrentHashMap<>();
41
  private IWorkflowService service;
42
  private Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
1✔
43
      (t, e) -> log.error("uncaught exception", e);
×
44
  private AtomicBoolean shutdown = new AtomicBoolean();
1✔
45

46
  public PollDecisionTaskDispatcher(IWorkflowService service) {
1✔
47
    this.service = Objects.requireNonNull(service);
1✔
48
  }
1✔
49

50
  public PollDecisionTaskDispatcher(
51
      IWorkflowService service, Thread.UncaughtExceptionHandler exceptionHandler) {
×
52
    this.service = Objects.requireNonNull(service);
×
53
    if (exceptionHandler != null) {
×
54
      this.uncaughtExceptionHandler = exceptionHandler;
×
55
    }
56
  }
×
57

58
  @Override
59
  public void process(PollForDecisionTaskResponse t) {
60
    if (isShutdown()) {
1✔
61
      throw new RejectedExecutionException("shutdown");
×
62
    }
63
    String taskListName = t.getWorkflowExecutionTaskList().getName();
1✔
64
    if (subscribers.containsKey(taskListName)) {
1✔
65
      subscribers.get(taskListName).accept(t);
1✔
66
    } else {
67
      RespondDecisionTaskFailedRequest request = new RespondDecisionTaskFailedRequest();
1✔
68
      request.setTaskToken(t.taskToken);
1✔
69
      request.setCause(DecisionTaskFailedCause.RESET_STICKY_TASKLIST);
1✔
70
      String message =
1✔
71
          String.format(
1✔
72
              "No handler is subscribed for the PollForDecisionTaskResponse.WorkflowExecutionTaskList %s",
73
              taskListName);
74
      request.setDetails(message.getBytes(Charset.defaultCharset()));
1✔
75
      log.warn(message);
1✔
76

77
      try {
78
        service.RespondDecisionTaskFailed(request);
1✔
79
      } catch (Exception e) {
×
80
        uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
×
81
      }
1✔
82
    }
83
  }
1✔
84

85
  @Override
86
  public boolean hasCapacity() {
87
    return true;
×
88
  }
89

90
  public void subscribe(String taskList, Consumer<PollForDecisionTaskResponse> consumer) {
91
    subscribers.put(taskList, consumer);
1✔
92
  }
1✔
93

94
  @Override
95
  public boolean isShutdown() {
96
    return shutdown.get();
1✔
97
  }
98

99
  @Override
100
  public boolean isTerminated() {
101
    return shutdown.get();
×
102
  }
103

104
  @Override
105
  public void shutdown() {
106
    shutdown.set(true);
1✔
107
  }
1✔
108

109
  @Override
110
  public void shutdownNow() {
111
    shutdown.set(true);
1✔
112
  }
1✔
113

114
  @Override
115
  public void awaitTermination(long timeout, TimeUnit unit) {}
1✔
116
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc