• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2555

24 Oct 2024 10:50PM UTC coverage: 66.622% (+0.4%) from 66.195%
2555

push

buildkite

web-flow
Refactor Test environment initialization to CadenceTestRule from WorkflowTest. (#923)

WorkflowTest is currently 6,000 lines long and has nearly every test related to end to end client behavior. It provides the rather neat behavior that it supports running against both an instance of Cadence running in Docker and against the test version. It's additionally parameterized to run the entire test suite with or without sticky execution enabled.

Due to the complexity in handling both environments, adding yet another test to WorkflowTest has always been the easiest option for developers. To allow for tests to easily be split into other files, extract the core functionality to a Junit test rule that can easily be reused by additional tests.

With the exception of testSignalCrossDomainExternalWorkflow and the replay tests that don't use the test environment, all tests have been left in WorkflowTest to be split out later.

12910 of 19378 relevant lines covered (66.62%)

0.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.91
/src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.sync;
19

20
import com.uber.cadence.PollForDecisionTaskResponse;
21
import com.uber.cadence.common.WorkflowExecutionHistory;
22
import com.uber.cadence.converter.DataConverter;
23
import com.uber.cadence.internal.common.InternalUtils;
24
import com.uber.cadence.internal.replay.DeciderCache;
25
import com.uber.cadence.internal.replay.ReplayDecisionTaskHandler;
26
import com.uber.cadence.internal.worker.DecisionTaskHandler;
27
import com.uber.cadence.internal.worker.LocalActivityWorker;
28
import com.uber.cadence.internal.worker.LocallyDispatchedActivityWorker;
29
import com.uber.cadence.internal.worker.LocallyDispatchedActivityWorker.Task;
30
import com.uber.cadence.internal.worker.NoopSuspendableWorker;
31
import com.uber.cadence.internal.worker.SingleWorkerOptions;
32
import com.uber.cadence.internal.worker.SuspendableWorker;
33
import com.uber.cadence.internal.worker.WorkflowWorker;
34
import com.uber.cadence.serviceclient.IWorkflowService;
35
import com.uber.cadence.worker.WorkflowImplementationOptions;
36
import com.uber.cadence.workflow.Functions.Func;
37
import com.uber.cadence.workflow.WorkflowInterceptor;
38
import java.lang.reflect.Type;
39
import java.time.Duration;
40
import java.util.Objects;
41
import java.util.concurrent.*;
42
import java.util.function.Consumer;
43
import java.util.function.Function;
44

45
/** Workflow worker that supports POJO workflow implementations. */
46
public class SyncWorkflowWorker
47
    implements SuspendableWorker, Consumer<PollForDecisionTaskResponse> {
48

49
  private final WorkflowWorker workflowWorker;
50
  private final LocalActivityWorker laWorker;
51
  private final POJOWorkflowImplementationFactory factory;
52
  private final DataConverter dataConverter;
53
  private final POJOActivityTaskHandler laTaskHandler;
54
  private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(4);
1✔
55
  private final ScheduledExecutorService ldaHeartbeatExecutor = Executors.newScheduledThreadPool(4);
1✔
56
  private SuspendableWorker ldaWorker;
57
  private POJOActivityTaskHandler ldaTaskHandler;
58
  private final IWorkflowService service;
59

60
  public SyncWorkflowWorker(
61
      IWorkflowService service,
62
      String domain,
63
      String taskList,
64
      Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory,
65
      SingleWorkerOptions workflowOptions,
66
      SingleWorkerOptions localActivityOptions,
67
      SingleWorkerOptions locallyDispatchedActivityOptions,
68
      DeciderCache cache,
69
      String stickyTaskListName,
70
      Duration stickyDecisionScheduleToStartTimeout,
71
      ThreadPoolExecutor workflowThreadPool) {
1✔
72
    Objects.requireNonNull(workflowThreadPool);
1✔
73
    this.dataConverter = workflowOptions.getDataConverter();
1✔
74
    this.service = service;
1✔
75

76
    factory =
1✔
77
        new POJOWorkflowImplementationFactory(
78
            workflowOptions.getDataConverter(),
1✔
79
            workflowThreadPool,
80
            interceptorFactory,
81
            cache,
82
            workflowOptions.getContextPropagators(),
1✔
83
            workflowOptions.getTracer());
1✔
84

85
    laTaskHandler =
1✔
86
        new POJOActivityTaskHandler(
87
            service, domain, localActivityOptions.getDataConverter(), heartbeatExecutor);
1✔
88
    laWorker = new LocalActivityWorker(domain, taskList, localActivityOptions, laTaskHandler);
1✔
89

90
    DecisionTaskHandler taskHandler =
1✔
91
        new ReplayDecisionTaskHandler(
92
            domain,
93
            factory,
94
            cache,
95
            workflowOptions,
96
            stickyTaskListName,
97
            stickyDecisionScheduleToStartTimeout,
98
            service,
99
            laWorker.getLocalActivityTaskPoller());
1✔
100

101
    Function<Task, Boolean> locallyDispatchedActivityTaskPoller = null;
1✔
102
    // do not dispatch locally if TaskListActivitiesPerSecond is set
103
    if (locallyDispatchedActivityOptions.getTaskListActivitiesPerSecond() == 0) {
1✔
104
      ldaTaskHandler =
1✔
105
          new POJOActivityTaskHandler(
106
              service,
107
              domain,
108
              locallyDispatchedActivityOptions.getDataConverter(),
1✔
109
              ldaHeartbeatExecutor);
110
      ldaWorker =
1✔
111
          new LocallyDispatchedActivityWorker(
112
              service, domain, taskList, locallyDispatchedActivityOptions, ldaTaskHandler);
113
      locallyDispatchedActivityTaskPoller =
1✔
114
          ((LocallyDispatchedActivityWorker) ldaWorker).getLocallyDispatchedActivityTaskPoller();
1✔
115
    } else {
116
      ldaWorker = new NoopSuspendableWorker();
×
117
    }
118

119
    workflowWorker =
1✔
120
        new WorkflowWorker(
121
            service,
122
            domain,
123
            taskList,
124
            workflowOptions,
125
            taskHandler,
126
            locallyDispatchedActivityTaskPoller,
127
            stickyTaskListName);
128
  }
1✔
129

130
  public void setWorkflowImplementationTypes(
131
      WorkflowImplementationOptions options, Class<?>[] workflowImplementationTypes) {
132
    factory.setWorkflowImplementationTypes(options, workflowImplementationTypes);
1✔
133
  }
1✔
134

135
  public <R> void addWorkflowImplementationFactory(
136
      WorkflowImplementationOptions options, Class<R> clazz, Func<R> factory) {
137
    this.factory.addWorkflowImplementationFactory(options, clazz, factory);
×
138
  }
×
139

140
  public <R> void addWorkflowImplementationFactory(Class<R> clazz, Func<R> factory) {
141
    this.factory.addWorkflowImplementationFactory(clazz, factory);
1✔
142
  }
1✔
143

144
  public void setLocalActivitiesImplementation(Object... activitiesImplementation) {
145
    this.laTaskHandler.setLocalActivitiesImplementation(activitiesImplementation);
1✔
146
  }
1✔
147

148
  public void setActivitiesImplementationToDispatchLocally(Object... activitiesImplementation) {
149
    if (this.ldaTaskHandler != null) {
1✔
150
      this.ldaTaskHandler.setActivitiesImplementation(activitiesImplementation);
1✔
151
    }
152
  }
1✔
153

154
  @Override
155
  public void start() {
156
    workflowWorker.start();
1✔
157
    // workflowWorker doesn't start if no types are registered with it. In that case we don't need
158
    // to start LocalActivity Worker.
159
    if (workflowWorker.isStarted()) {
1✔
160
      laWorker.start();
1✔
161
      ldaWorker.start();
1✔
162
    }
163
  }
1✔
164

165
  @Override
166
  public boolean isStarted() {
167
    return workflowWorker.isStarted() && laWorker.isStarted() && ldaWorker.isStarted();
×
168
  }
169

170
  @Override
171
  public boolean isShutdown() {
172
    return workflowWorker.isShutdown() && laWorker.isShutdown() && ldaWorker.isShutdown();
×
173
  }
174

175
  @Override
176
  public boolean isTerminated() {
177
    return workflowWorker.isTerminated()
1✔
178
        && laWorker.isTerminated()
1✔
179
        && ldaHeartbeatExecutor.isTerminated()
1✔
180
        && ldaWorker.isTerminated();
1✔
181
  }
182

183
  @Override
184
  public void shutdown() {
185
    laWorker.shutdown();
1✔
186
    ldaHeartbeatExecutor.shutdown();
1✔
187
    ldaWorker.shutdown();
1✔
188
    workflowWorker.shutdown();
1✔
189
  }
1✔
190

191
  @Override
192
  public void shutdownNow() {
193
    laWorker.shutdownNow();
1✔
194
    ldaHeartbeatExecutor.shutdownNow();
1✔
195
    ldaWorker.shutdownNow();
1✔
196
    workflowWorker.shutdownNow();
1✔
197
  }
1✔
198

199
  @Override
200
  public void awaitTermination(long timeout, TimeUnit unit) {
201
    long timeoutMillis = InternalUtils.awaitTermination(laWorker, unit.toMillis(timeout));
1✔
202
    timeoutMillis = InternalUtils.awaitTermination(ldaHeartbeatExecutor, timeoutMillis);
1✔
203
    timeoutMillis = InternalUtils.awaitTermination(ldaWorker, timeoutMillis);
1✔
204
    InternalUtils.awaitTermination(workflowWorker, timeoutMillis);
1✔
205
  }
1✔
206

207
  @Override
208
  public void suspendPolling() {
209
    workflowWorker.suspendPolling();
1✔
210
    laWorker.suspendPolling();
1✔
211
    ldaWorker.suspendPolling();
1✔
212
  }
1✔
213

214
  @Override
215
  public void resumePolling() {
216
    workflowWorker.resumePolling();
1✔
217
    laWorker.resumePolling();
1✔
218
    ldaWorker.resumePolling();
1✔
219
  }
1✔
220

221
  @Override
222
  public boolean isSuspended() {
223
    return workflowWorker.isSuspended() && laWorker.isSuspended() && ldaWorker.isSuspended();
×
224
  }
225

226
  public <R> R queryWorkflowExecution(
227
      WorkflowExecutionHistory history,
228
      String queryType,
229
      Class<R> resultClass,
230
      Type resultType,
231
      Object[] args)
232
      throws Exception {
233
    byte[] serializedArgs = dataConverter.toData(args);
1✔
234
    byte[] result = workflowWorker.queryWorkflowExecution(history, queryType, serializedArgs);
1✔
235
    return dataConverter.fromData(result, resultClass, resultType);
1✔
236
  }
237

238
  @Override
239
  public void accept(PollForDecisionTaskResponse pollForDecisionTaskResponse) {
240
    workflowWorker.accept(pollForDecisionTaskResponse);
1✔
241
  }
1✔
242

243
  public CompletableFuture<Boolean> isHealthy() {
244
    return service.isHealthy();
×
245
  }
246
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc