• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 16

16 Apr 2024 01:28AM UTC coverage: 60.239% (-0.1%) from 60.343%
16

push

buildkite

mstifflin
Remove unnecessary sidecar command, try executing with lower resources

11446 of 19001 relevant lines covered (60.24%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.5
/src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.sync;
19

20
import com.uber.cadence.PollForDecisionTaskResponse;
21
import com.uber.cadence.WorkflowExecution;
22
import com.uber.cadence.common.WorkflowExecutionHistory;
23
import com.uber.cadence.converter.DataConverter;
24
import com.uber.cadence.internal.common.InternalUtils;
25
import com.uber.cadence.internal.replay.DeciderCache;
26
import com.uber.cadence.internal.replay.ReplayDecisionTaskHandler;
27
import com.uber.cadence.internal.worker.DecisionTaskHandler;
28
import com.uber.cadence.internal.worker.LocalActivityWorker;
29
import com.uber.cadence.internal.worker.LocallyDispatchedActivityWorker;
30
import com.uber.cadence.internal.worker.LocallyDispatchedActivityWorker.Task;
31
import com.uber.cadence.internal.worker.NoopSuspendableWorker;
32
import com.uber.cadence.internal.worker.SingleWorkerOptions;
33
import com.uber.cadence.internal.worker.SuspendableWorker;
34
import com.uber.cadence.internal.worker.WorkflowWorker;
35
import com.uber.cadence.serviceclient.IWorkflowService;
36
import com.uber.cadence.worker.WorkflowImplementationOptions;
37
import com.uber.cadence.workflow.Functions.Func;
38
import com.uber.cadence.workflow.WorkflowInterceptor;
39
import java.lang.reflect.Type;
40
import java.time.Duration;
41
import java.util.Objects;
42
import java.util.concurrent.*;
43
import java.util.function.Consumer;
44
import java.util.function.Function;
45

46
/** Workflow worker that supports POJO workflow implementations. */
47
public class SyncWorkflowWorker
48
    implements SuspendableWorker, Consumer<PollForDecisionTaskResponse> {
49

50
  private final WorkflowWorker workflowWorker;
51
  private final LocalActivityWorker laWorker;
52
  private final POJOWorkflowImplementationFactory factory;
53
  private final DataConverter dataConverter;
54
  private final POJOActivityTaskHandler laTaskHandler;
55
  private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(4);
1✔
56
  private final ScheduledExecutorService ldaHeartbeatExecutor = Executors.newScheduledThreadPool(4);
1✔
57
  private SuspendableWorker ldaWorker;
58
  private POJOActivityTaskHandler ldaTaskHandler;
59
  private final IWorkflowService service;
60

61
  public SyncWorkflowWorker(
62
      IWorkflowService service,
63
      String domain,
64
      String taskList,
65
      Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory,
66
      SingleWorkerOptions workflowOptions,
67
      SingleWorkerOptions localActivityOptions,
68
      SingleWorkerOptions locallyDispatchedActivityOptions,
69
      DeciderCache cache,
70
      String stickyTaskListName,
71
      Duration stickyDecisionScheduleToStartTimeout,
72
      ThreadPoolExecutor workflowThreadPool) {
1✔
73
    Objects.requireNonNull(workflowThreadPool);
1✔
74
    this.dataConverter = workflowOptions.getDataConverter();
1✔
75
    this.service = service;
1✔
76

77
    factory =
1✔
78
        new POJOWorkflowImplementationFactory(
79
            workflowOptions.getDataConverter(),
1✔
80
            workflowThreadPool,
81
            interceptorFactory,
82
            cache,
83
            workflowOptions.getContextPropagators(),
1✔
84
            workflowOptions.getTracer());
1✔
85

86
    laTaskHandler =
1✔
87
        new POJOActivityTaskHandler(
88
            service, domain, localActivityOptions.getDataConverter(), heartbeatExecutor);
1✔
89
    laWorker = new LocalActivityWorker(domain, taskList, localActivityOptions, laTaskHandler);
1✔
90

91
    DecisionTaskHandler taskHandler =
1✔
92
        new ReplayDecisionTaskHandler(
93
            domain,
94
            factory,
95
            cache,
96
            workflowOptions,
97
            stickyTaskListName,
98
            stickyDecisionScheduleToStartTimeout,
99
            service,
100
            laWorker.getLocalActivityTaskPoller());
1✔
101

102
    Function<Task, Boolean> locallyDispatchedActivityTaskPoller = null;
1✔
103
    // do not dispatch locally if TaskListActivitiesPerSecond is set
104
    if (locallyDispatchedActivityOptions.getTaskListActivitiesPerSecond() == 0) {
1✔
105
      ldaTaskHandler =
1✔
106
          new POJOActivityTaskHandler(
107
              service,
108
              domain,
109
              locallyDispatchedActivityOptions.getDataConverter(),
1✔
110
              ldaHeartbeatExecutor);
111
      ldaWorker =
1✔
112
          new LocallyDispatchedActivityWorker(
113
              service, domain, taskList, locallyDispatchedActivityOptions, ldaTaskHandler);
114
      locallyDispatchedActivityTaskPoller =
1✔
115
          ((LocallyDispatchedActivityWorker) ldaWorker).getLocallyDispatchedActivityTaskPoller();
1✔
116
    } else {
117
      ldaWorker = new NoopSuspendableWorker();
×
118
    }
119

120
    workflowWorker =
1✔
121
        new WorkflowWorker(
122
            service,
123
            domain,
124
            taskList,
125
            workflowOptions,
126
            taskHandler,
127
            locallyDispatchedActivityTaskPoller,
128
            stickyTaskListName);
129
  }
1✔
130

131
  public void setWorkflowImplementationTypes(
132
      WorkflowImplementationOptions options, Class<?>[] workflowImplementationTypes) {
133
    factory.setWorkflowImplementationTypes(options, workflowImplementationTypes);
1✔
134
  }
1✔
135

136
  public <R> void addWorkflowImplementationFactory(
137
      WorkflowImplementationOptions options, Class<R> clazz, Func<R> factory) {
138
    this.factory.addWorkflowImplementationFactory(options, clazz, factory);
×
139
  }
×
140

141
  public <R> void addWorkflowImplementationFactory(Class<R> clazz, Func<R> factory) {
142
    this.factory.addWorkflowImplementationFactory(clazz, factory);
1✔
143
  }
1✔
144

145
  public void setLocalActivitiesImplementation(Object... activitiesImplementation) {
146
    this.laTaskHandler.setLocalActivitiesImplementation(activitiesImplementation);
1✔
147
  }
1✔
148

149
  public void setActivitiesImplementationToDispatchLocally(Object... activitiesImplementation) {
150
    if (this.ldaTaskHandler != null) {
1✔
151
      this.ldaTaskHandler.setActivitiesImplementation(activitiesImplementation);
1✔
152
    }
153
  }
1✔
154

155
  @Override
156
  public void start() {
157
    workflowWorker.start();
1✔
158
    // workflowWorker doesn't start if no types are registered with it. In that case we don't need
159
    // to start LocalActivity Worker.
160
    if (workflowWorker.isStarted()) {
1✔
161
      laWorker.start();
1✔
162
      ldaWorker.start();
1✔
163
    }
164
  }
1✔
165

166
  @Override
167
  public boolean isStarted() {
168
    return workflowWorker.isStarted() && laWorker.isStarted() && ldaWorker.isStarted();
×
169
  }
170

171
  @Override
172
  public boolean isShutdown() {
173
    return workflowWorker.isShutdown() && laWorker.isShutdown() && ldaWorker.isShutdown();
×
174
  }
175

176
  @Override
177
  public boolean isTerminated() {
178
    return workflowWorker.isTerminated()
×
179
        && laWorker.isTerminated()
×
180
        && ldaHeartbeatExecutor.isTerminated()
×
181
        && ldaWorker.isTerminated();
×
182
  }
183

184
  @Override
185
  public void shutdown() {
186
    laWorker.shutdown();
1✔
187
    ldaHeartbeatExecutor.shutdown();
1✔
188
    ldaWorker.shutdown();
1✔
189
    workflowWorker.shutdown();
1✔
190
  }
1✔
191

192
  @Override
193
  public void shutdownNow() {
194
    laWorker.shutdownNow();
1✔
195
    ldaHeartbeatExecutor.shutdownNow();
1✔
196
    ldaWorker.shutdownNow();
1✔
197
    workflowWorker.shutdownNow();
1✔
198
  }
1✔
199

200
  @Override
201
  public void awaitTermination(long timeout, TimeUnit unit) {
202
    long timeoutMillis = InternalUtils.awaitTermination(laWorker, unit.toMillis(timeout));
1✔
203
    timeoutMillis = InternalUtils.awaitTermination(ldaHeartbeatExecutor, timeoutMillis);
1✔
204
    timeoutMillis = InternalUtils.awaitTermination(ldaWorker, timeoutMillis);
1✔
205
    InternalUtils.awaitTermination(workflowWorker, timeoutMillis);
1✔
206
  }
1✔
207

208
  @Override
209
  public void suspendPolling() {
210
    workflowWorker.suspendPolling();
1✔
211
    laWorker.suspendPolling();
1✔
212
    ldaWorker.suspendPolling();
1✔
213
  }
1✔
214

215
  @Override
216
  public void resumePolling() {
217
    workflowWorker.resumePolling();
1✔
218
    laWorker.resumePolling();
1✔
219
    ldaWorker.resumePolling();
1✔
220
  }
1✔
221

222
  @Override
223
  public boolean isSuspended() {
224
    return workflowWorker.isSuspended() && laWorker.isSuspended() && ldaWorker.isSuspended();
×
225
  }
226

227
  public <R> R queryWorkflowExecution(
228
      WorkflowExecution execution,
229
      String queryType,
230
      Class<R> resultClass,
231
      Type resultType,
232
      Object[] args)
233
      throws Exception {
234
    byte[] serializedArgs = dataConverter.toData(args);
×
235
    byte[] result = workflowWorker.queryWorkflowExecution(execution, queryType, serializedArgs);
×
236
    return dataConverter.fromData(result, resultClass, resultType);
×
237
  }
238

239
  public <R> R queryWorkflowExecution(
240
      WorkflowExecutionHistory history,
241
      String queryType,
242
      Class<R> resultClass,
243
      Type resultType,
244
      Object[] args)
245
      throws Exception {
246
    byte[] serializedArgs = dataConverter.toData(args);
1✔
247
    byte[] result = workflowWorker.queryWorkflowExecution(history, queryType, serializedArgs);
1✔
248
    return dataConverter.fromData(result, resultClass, resultType);
1✔
249
  }
250

251
  @Override
252
  public void accept(PollForDecisionTaskResponse pollForDecisionTaskResponse) {
253
    workflowWorker.accept(pollForDecisionTaskResponse);
1✔
254
  }
1✔
255

256
  public CompletableFuture<Boolean> isHealthy() {
257
    return service.isHealthy();
×
258
  }
259
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc