• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 16

16 Apr 2024 01:28AM UTC coverage: 60.239% (-0.1%) from 60.343%
16

push

buildkite

mstifflin
Remove unnecessary sidecar command, try executing with lower resources

11446 of 19001 relevant lines covered (60.24%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.78
/src/main/java/com/uber/cadence/worker/Worker.java
1
/*
2
 *  Modifications Copyright (c) 2017-2020 Uber Technologies Inc.
3
 *  Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
4
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.worker;
19

20
import com.google.common.annotations.VisibleForTesting;
21
import com.google.common.base.MoreObjects;
22
import com.google.common.base.Preconditions;
23
import com.uber.cadence.client.WorkflowClient;
24
import com.uber.cadence.common.WorkflowExecutionHistory;
25
import com.uber.cadence.context.ContextPropagator;
26
import com.uber.cadence.converter.DataConverter;
27
import com.uber.cadence.internal.common.InternalUtils;
28
import com.uber.cadence.internal.metrics.MetricsTag;
29
import com.uber.cadence.internal.replay.DeciderCache;
30
import com.uber.cadence.internal.sync.SyncActivityWorker;
31
import com.uber.cadence.internal.sync.SyncWorkflowWorker;
32
import com.uber.cadence.internal.worker.SingleWorkerOptions;
33
import com.uber.cadence.internal.worker.Suspendable;
34
import com.uber.cadence.workflow.Functions.Func;
35
import com.uber.cadence.workflow.WorkflowMethod;
36
import com.uber.m3.tally.Scope;
37
import com.uber.m3.util.ImmutableMap;
38
import java.time.Duration;
39
import java.util.List;
40
import java.util.Objects;
41
import java.util.concurrent.CompletableFuture;
42
import java.util.concurrent.ThreadPoolExecutor;
43
import java.util.concurrent.TimeUnit;
44
import java.util.concurrent.atomic.AtomicBoolean;
45

46
/**
47
 * Hosts activity and workflow implementations. Uses long poll to receive activity and decision
48
 * tasks and processes them in the corresponding thread pool.
49
 */
50
public final class Worker implements Suspendable {
51

52
  private final WorkerOptions options;
53
  private final String taskList;
54
  private final SyncWorkflowWorker workflowWorker;
55
  private final SyncActivityWorker activityWorker;
56
  private final AtomicBoolean started = new AtomicBoolean();
1✔
57

58
  /**
59
   * Creates worker that connects to an instance of the Cadence Service.
60
   *
61
   * @param client client to the Cadence Service endpoint.
62
   * @param taskList task list name worker uses to poll. It uses this name for both decision and
63
   *     activity task list polls.
64
   * @param options Options (like {@link DataConverter} override) for configuring worker.
65
   */
66
  Worker(
67
      WorkflowClient client,
68
      String taskList,
69
      WorkerFactoryOptions factoryOptions,
70
      WorkerOptions options,
71
      DeciderCache cache,
72
      String stickyTaskListName,
73
      Duration stickyDecisionScheduleToStartTimeout,
74
      ThreadPoolExecutor threadPoolExecutor,
75
      List<ContextPropagator> contextPropagators) {
1✔
76
    this.taskList = Objects.requireNonNull(taskList);
1✔
77
    options = MoreObjects.firstNonNull(options, WorkerOptions.defaultInstance());
1✔
78
    this.options = options;
1✔
79

80
    Scope metricsScope =
1✔
81
        client
82
            .getOptions()
1✔
83
            .getMetricsScope()
1✔
84
            .tagged(ImmutableMap.of(MetricsTag.TASK_LIST, taskList));
1✔
85

86
    SingleWorkerOptions activityOptions =
87
        SingleWorkerOptions.newBuilder()
1✔
88
            .setIdentity(client.getOptions().getIdentity())
1✔
89
            .setDataConverter(client.getOptions().getDataConverter())
1✔
90
            .setTaskExecutorThreadPoolSize(options.getMaxConcurrentActivityExecutionSize())
1✔
91
            .setTaskListActivitiesPerSecond(options.getTaskListActivitiesPerSecond())
1✔
92
            .setPollerOptions(options.getActivityPollerOptions())
1✔
93
            .setMetricsScope(metricsScope)
1✔
94
            .setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay())
1✔
95
            .setContextPropagators(contextPropagators)
1✔
96
            .setTracer(options.getTracer())
1✔
97
            .build();
1✔
98
    activityWorker =
1✔
99
        new SyncActivityWorker(
100
            client.getService(), client.getOptions().getDomain(), taskList, activityOptions);
1✔
101

102
    SingleWorkerOptions workflowOptions =
103
        SingleWorkerOptions.newBuilder()
1✔
104
            .setDataConverter(client.getOptions().getDataConverter())
1✔
105
            .setIdentity(client.getOptions().getIdentity())
1✔
106
            .setPollerOptions(options.getWorkflowPollerOptions())
1✔
107
            .setTaskExecutorThreadPoolSize(options.getMaxConcurrentWorkflowExecutionSize())
1✔
108
            .setMetricsScope(metricsScope)
1✔
109
            .setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay())
1✔
110
            .setContextPropagators(contextPropagators)
1✔
111
            .setTracer(options.getTracer())
1✔
112
            .build();
1✔
113
    SingleWorkerOptions localActivityOptions =
114
        SingleWorkerOptions.newBuilder()
1✔
115
            .setDataConverter(client.getOptions().getDataConverter())
1✔
116
            .setIdentity(client.getOptions().getIdentity())
1✔
117
            .setPollerOptions(options.getWorkflowPollerOptions())
1✔
118
            .setTaskExecutorThreadPoolSize(options.getMaxConcurrentLocalActivityExecutionSize())
1✔
119
            .setMetricsScope(metricsScope)
1✔
120
            .setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay())
1✔
121
            .setContextPropagators(contextPropagators)
1✔
122
            .setTracer(options.getTracer())
1✔
123
            .build();
1✔
124
    workflowWorker =
1✔
125
        new SyncWorkflowWorker(
126
            client.getService(),
1✔
127
            client.getOptions().getDomain(),
1✔
128
            taskList,
129
            this.options.getInterceptorFactory(),
1✔
130
            workflowOptions,
131
            localActivityOptions,
132
            activityOptions,
133
            cache,
134
            stickyTaskListName,
135
            stickyDecisionScheduleToStartTimeout,
136
            threadPoolExecutor);
137
  }
1✔
138

139
  SyncWorkflowWorker getWorkflowWorker() {
140
    return workflowWorker;
1✔
141
  }
142

143
  /**
144
   * Register workflow implementation classes with a worker. Overwrites previously registered types.
145
   * A workflow implementation class must implement at least one interface with a method annotated
146
   * with {@link WorkflowMethod}. That method becomes a workflow type that this worker supports.
147
   *
148
   * <p>Implementations that share a worker must implement different interfaces as a workflow type
149
   * is identified by the workflow interface, not by the implementation.
150
   *
151
   * <p>The reason for registration accepting workflow class, but not the workflow instance is that
152
   * workflows are stateful and a new instance is created for each workflow execution.
153
   */
154
  public void registerWorkflowImplementationTypes(Class<?>... workflowImplementationClasses) {
155
    Preconditions.checkState(
1✔
156
        !started.get(),
1✔
157
        "registerWorkflowImplementationTypes is not allowed after worker has started");
158

159
    workflowWorker.setWorkflowImplementationTypes(
1✔
160
        new WorkflowImplementationOptions.Builder().build(), workflowImplementationClasses);
1✔
161
  }
1✔
162

163
  /**
164
   * Register workflow implementation classes with a worker. Overwrites previously registered types.
165
   * A workflow implementation class must implement at least one interface with a method annotated
166
   * with {@link WorkflowMethod}. That method becomes a workflow type that this worker supports.
167
   *
168
   * <p>Implementations that share a worker must implement different interfaces as a workflow type
169
   * is identified by the workflow interface, not by the implementation.
170
   *
171
   * <p>The reason for registration accepting workflow class, but not the workflow instance is that
172
   * workflows are stateful and a new instance is created for each workflow execution.
173
   */
174
  public void registerWorkflowImplementationTypes(
175
      WorkflowImplementationOptions options, Class<?>... workflowImplementationClasses) {
176
    Preconditions.checkState(
1✔
177
        !started.get(),
1✔
178
        "registerWorkflowImplementationTypes is not allowed after worker has started");
179

180
    workflowWorker.setWorkflowImplementationTypes(options, workflowImplementationClasses);
1✔
181
  }
1✔
182

183
  /**
184
   * Configures a factory to use when an instance of a workflow implementation is created.
185
   * !IMPORTANT to provide newly created instances, each time factory is applied.
186
   *
187
   * <p>Unless mocking a workflow execution use {@link
188
   * #registerWorkflowImplementationTypes(Class[])}.
189
   *
190
   * @param workflowInterface Workflow interface that this factory implements
191
   * @param factory factory that when called creates a new instance of the workflow implementation
192
   *     object.
193
   * @param <R> type of the workflow object to create.
194
   */
195
  public <R> void addWorkflowImplementationFactory(
196
      WorkflowImplementationOptions options, Class<R> workflowInterface, Func<R> factory) {
197
    workflowWorker.addWorkflowImplementationFactory(options, workflowInterface, factory);
×
198
  }
×
199

200
  /**
201
   * Configures a factory to use when an instance of a workflow implementation is created. The only
202
   * valid use for this method is unit testing, specifically to instantiate mocks that implement
203
   * child workflows. An example of mocking a child workflow:
204
   *
205
   * <pre><code>
206
   *   worker.addWorkflowImplementationFactory(ChildWorkflow.class, () -> {
207
   *     ChildWorkflow child = mock(ChildWorkflow.class);
208
   *     when(child.workflow(anyString(), anyString())).thenReturn("result1");
209
   *     return child;
210
   *   });
211
   * </code></pre>
212
   *
213
   * <p>Unless mocking a workflow execution use {@link
214
   * #registerWorkflowImplementationTypes(Class[])}.
215
   *
216
   * @param workflowInterface Workflow interface that this factory implements
217
   * @param factory factory that when called creates a new instance of the workflow implementation
218
   *     object.
219
   * @param <R> type of the workflow object to create.
220
   */
221
  @VisibleForTesting
222
  public <R> void addWorkflowImplementationFactory(Class<R> workflowInterface, Func<R> factory) {
223
    workflowWorker.addWorkflowImplementationFactory(workflowInterface, factory);
1✔
224
  }
1✔
225

226
  /**
227
   * Register activity implementation objects with a worker. Overwrites previously registered
228
   * objects. As activities are reentrant and stateless only one instance per activity type is
229
   * registered.
230
   *
231
   * <p>Implementations that share a worker must implement different interfaces as an activity type
232
   * is identified by the activity interface, not by the implementation.
233
   *
234
   * <p>
235
   */
236
  public void registerActivitiesImplementations(Object... activityImplementations) {
237
    Preconditions.checkState(
1✔
238
        !started.get(),
1✔
239
        "registerActivitiesImplementations is not allowed after worker has started");
240

241
    if (activityWorker != null) {
1✔
242
      activityWorker.setActivitiesImplementation(activityImplementations);
1✔
243
      workflowWorker.setActivitiesImplementationToDispatchLocally(activityImplementations);
1✔
244
    }
245

246
    workflowWorker.setLocalActivitiesImplementation(activityImplementations);
1✔
247
  }
1✔
248

249
  void start() {
250
    if (!started.compareAndSet(false, true)) {
1✔
251
      return;
×
252
    }
253
    workflowWorker.start();
1✔
254
    activityWorker.start();
1✔
255
  }
1✔
256

257
  void shutdown() {
258
    activityWorker.shutdown();
1✔
259
    workflowWorker.shutdown();
1✔
260
  }
1✔
261

262
  void shutdownNow() {
263
    activityWorker.shutdownNow();
1✔
264
    workflowWorker.shutdownNow();
1✔
265
  }
1✔
266

267
  boolean isTerminated() {
268
    return activityWorker.isTerminated() && workflowWorker.isTerminated();
×
269
  }
270

271
  void awaitTermination(long timeout, TimeUnit unit) {
272
    long timeoutMillis = InternalUtils.awaitTermination(activityWorker, unit.toMillis(timeout));
1✔
273
    InternalUtils.awaitTermination(workflowWorker, timeoutMillis);
1✔
274
  }
1✔
275

276
  @Override
277
  public String toString() {
278
    return "Worker{" + "options=" + options + '}';
×
279
  }
280

281
  /**
282
   * This is an utility method to replay a workflow execution using this particular instance of a
283
   * worker. This method is useful to troubleshoot workflows by running them in a debugger. To work
284
   * the workflow implementation type must be registered with this worker. There is no need to call
285
   * {@link #start()} to be able to call this method.
286
   *
287
   * @param history workflow execution history to replay
288
   * @throws Exception if replay failed for any reason
289
   */
290
  public void replayWorkflowExecution(WorkflowExecutionHistory history) throws Exception {
291
    workflowWorker.queryWorkflowExecution(
1✔
292
        history,
293
        WorkflowClient.QUERY_TYPE_REPLAY_ONLY,
294
        String.class,
295
        String.class,
296
        new Object[] {});
297
  }
1✔
298

299
  /**
300
   * This is an utility method to replay a workflow execution using this particular instance of a
301
   * worker. This method is useful to troubleshoot workflows by running them in a debugger. To work
302
   * the workflow implementation type must be registered with this worker. There is no need to call
303
   * {@link #start()} to be able to call this method.
304
   *
305
   * @param jsonSerializedHistory workflow execution history in JSON format to replay
306
   * @throws Exception if replay failed for any reason
307
   */
308
  public void replayWorkflowExecution(String jsonSerializedHistory) throws Exception {
309
    WorkflowExecutionHistory history = WorkflowExecutionHistory.fromJson(jsonSerializedHistory);
×
310
    replayWorkflowExecution(history);
×
311
  }
×
312

313
  public String getTaskList() {
314
    return taskList;
×
315
  }
316

317
  @Override
318
  public void suspendPolling() {
319
    workflowWorker.suspendPolling();
1✔
320
    activityWorker.suspendPolling();
1✔
321
  }
1✔
322

323
  @Override
324
  public void resumePolling() {
325
    workflowWorker.resumePolling();
1✔
326
    activityWorker.resumePolling();
1✔
327
  }
1✔
328

329
  @Override
330
  public boolean isSuspended() {
331
    return workflowWorker.isSuspended() && activityWorker.isSuspended();
×
332
  }
333

334
  /**
335
   * Checks if we have a valid connection to the Cadence cluster, and potentially resets the peer
336
   * list
337
   */
338
  public CompletableFuture<Boolean> isHealthy() {
339
    return workflowWorker.isHealthy();
×
340
  }
341
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc