• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2555

24 Oct 2024 10:50PM UTC coverage: 66.622% (+0.4%) from 66.195%
2555

push

buildkite

web-flow
Refactor Test environment initialization to CadenceTestRule from WorkflowTest. (#923)

WorkflowTest is currently 6,000 lines long and contains nearly every test related to end-to-end client behavior. It has the rather neat property that it can run against both an instance of Cadence running in Docker and the in-memory test version. It is additionally parameterized to run the entire test suite with or without sticky execution enabled.

Due to the complexity in handling both environments, adding yet another test to WorkflowTest has always been the easiest option for developers. To allow tests to be split easily into other files, extract the core functionality into a JUnit test rule that can be reused by additional tests.

With the exception of testSignalCrossDomainExternalWorkflow and the replay tests that don't use the test environment, all tests have been left in WorkflowTest to be split out later.

12910 of 19378 relevant lines covered (66.62%)

0.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.69
/src/main/java/com/uber/cadence/internal/sync/WorkflowInternal.java
1
/*
2
 *  Modifications Copyright (c) 2017-2020 Uber Technologies Inc.
3
 *  Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
4
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.sync;
19

20
import static com.uber.cadence.internal.sync.AsyncInternal.AsyncMarker;
21

22
import com.google.common.reflect.TypeToken;
23
import com.uber.cadence.WorkflowExecution;
24
import com.uber.cadence.activity.ActivityOptions;
25
import com.uber.cadence.activity.LocalActivityOptions;
26
import com.uber.cadence.common.RetryOptions;
27
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
28
import com.uber.cadence.internal.common.InternalUtils;
29
import com.uber.cadence.internal.logging.ReplayAwareLogger;
30
import com.uber.cadence.workflow.ActivityStub;
31
import com.uber.cadence.workflow.CancellationScope;
32
import com.uber.cadence.workflow.ChildWorkflowOptions;
33
import com.uber.cadence.workflow.ChildWorkflowStub;
34
import com.uber.cadence.workflow.CompletablePromise;
35
import com.uber.cadence.workflow.ContinueAsNewOptions;
36
import com.uber.cadence.workflow.ExternalWorkflowStub;
37
import com.uber.cadence.workflow.Functions;
38
import com.uber.cadence.workflow.Functions.Func;
39
import com.uber.cadence.workflow.Promise;
40
import com.uber.cadence.workflow.QueryMethod;
41
import com.uber.cadence.workflow.Workflow;
42
import com.uber.cadence.workflow.WorkflowInfo;
43
import com.uber.cadence.workflow.WorkflowInterceptor;
44
import com.uber.cadence.workflow.WorkflowQueue;
45
import com.uber.m3.tally.Scope;
46
import java.lang.reflect.InvocationHandler;
47
import java.lang.reflect.Method;
48
import java.lang.reflect.Proxy;
49
import java.lang.reflect.Type;
50
import java.time.Duration;
51
import java.util.Collection;
52
import java.util.List;
53
import java.util.Map;
54
import java.util.Optional;
55
import java.util.Random;
56
import java.util.UUID;
57
import java.util.function.BiPredicate;
58
import java.util.function.Supplier;
59
import org.slf4j.Logger;
60
import org.slf4j.LoggerFactory;
61

62
/**
63
 * Never reference directly. It is public only because Java doesn't have internal package support.
64
 */
65
public final class WorkflowInternal {
66
  public static final int DEFAULT_VERSION = -1;
67
  public static final String CADENCE_CHANGE_VERSION = "CadenceChangeVersion";
68

69
  public static WorkflowThread newThread(boolean ignoreParentCancellation, Runnable runnable) {
70
    return WorkflowThread.newThread(runnable, ignoreParentCancellation);
1✔
71
  }
72

73
  public static Promise<Void> newTimer(Duration duration) {
74
    return getWorkflowInterceptor().newTimer(duration);
1✔
75
  }
76

77
  public static <E> WorkflowQueue<E> newQueue(int capacity) {
78
    return new WorkflowQueueImpl<>(capacity);
1✔
79
  }
80

81
  public static <E> CompletablePromise<E> newCompletablePromise() {
82
    return new CompletablePromiseImpl<>();
1✔
83
  }
84

85
  public static <E> Promise<E> newPromise(E value) {
86
    CompletablePromise<E> result = Workflow.newPromise();
1✔
87
    result.complete(value);
1✔
88
    return result;
1✔
89
  }
90

91
  public static <E> Promise<E> newFailedPromise(Exception failure) {
92
    CompletablePromise<E> result = new CompletablePromiseImpl<>();
1✔
93
    result.completeExceptionally(CheckedExceptionWrapper.wrap(failure));
1✔
94
    return result;
1✔
95
  }
96

97
  /**
98
   * Register query or queries implementation object. There is no need to register top level
99
   * workflow implementation object as it is done implicitly. Only methods annotated with @{@link
100
   * QueryMethod} are registered.
101
   */
102
  public static void registerQuery(Object queryImplementation) {
103
    Class<?> cls = queryImplementation.getClass();
1✔
104
    TypeToken<?>.TypeSet interfaces = TypeToken.of(cls).getTypes().interfaces();
1✔
105
    if (interfaces.isEmpty()) {
1✔
106
      throw new IllegalArgumentException(cls.getName() + " must implement at least one interface");
×
107
    }
108
    for (TypeToken<?> i : interfaces) {
1✔
109
      for (Method method : i.getRawType().getMethods()) {
1✔
110
        QueryMethod queryMethod = method.getAnnotation(QueryMethod.class);
1✔
111
        if (queryMethod != null) {
1✔
112
          String name = queryMethod.name();
1✔
113
          if (name.isEmpty()) {
1✔
114
            name = InternalUtils.getSimpleName(method);
1✔
115
          }
116
          getWorkflowInterceptor()
1✔
117
              .registerQuery(
1✔
118
                  name,
119
                  method.getGenericParameterTypes(),
1✔
120
                  (args) -> {
121
                    try {
122
                      return method.invoke(queryImplementation, args);
1✔
123
                    } catch (Throwable e) {
×
124
                      throw CheckedExceptionWrapper.wrap(e);
×
125
                    }
126
                  });
127
        }
128
      }
129
    }
1✔
130
  }
1✔
131

132
  /** Should be used to get current time instead of {@link System#currentTimeMillis()} */
133
  public static long currentTimeMillis() {
134
    return DeterministicRunnerImpl.currentThreadInternal().getRunner().currentTimeMillis();
1✔
135
  }
136

137
  /**
138
   * Creates client stub to activities that implement given interface.
139
   *
140
   * @param activityInterface interface type implemented by activities
141
   */
142
  public static <T> T newActivityStub(Class<T> activityInterface, ActivityOptions options) {
143
    WorkflowInterceptor decisionContext = WorkflowInternal.getWorkflowInterceptor();
1✔
144
    InvocationHandler invocationHandler =
1✔
145
        ActivityInvocationHandler.newInstance(options, decisionContext);
1✔
146
    return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
1✔
147
  }
148

149
  /**
150
   * Creates client stub to local activities that implement given interface.
151
   *
152
   * @param activityInterface interface type implemented by activities
153
   */
154
  public static <T> T newLocalActivityStub(
155
      Class<T> activityInterface, LocalActivityOptions options) {
156
    WorkflowInterceptor decisionContext = WorkflowInternal.getWorkflowInterceptor();
1✔
157
    InvocationHandler invocationHandler =
1✔
158
        LocalActivityInvocationHandler.newInstance(options, decisionContext);
1✔
159
    return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
1✔
160
  }
161

162
  public static ActivityStub newUntypedActivityStub(ActivityOptions options) {
163
    return ActivityStubImpl.newInstance(options, getWorkflowInterceptor());
1✔
164
  }
165

166
  public static ActivityStub newUntypedLocalActivityStub(LocalActivityOptions options) {
167
    return LocalActivityStubImpl.newInstance(options, getWorkflowInterceptor());
1✔
168
  }
169

170
  @SuppressWarnings("unchecked")
171
  public static <T> T newChildWorkflowStub(
172
      Class<T> workflowInterface, ChildWorkflowOptions options) {
173
    return (T)
1✔
174
        Proxy.newProxyInstance(
1✔
175
            workflowInterface.getClassLoader(),
1✔
176
            new Class<?>[] {workflowInterface, WorkflowStubMarker.class, AsyncMarker.class},
177
            new ChildWorkflowInvocationHandler(
178
                workflowInterface, options, getWorkflowInterceptor()));
1✔
179
  }
180

181
  @SuppressWarnings("unchecked")
182
  public static <T> T newExternalWorkflowStub(
183
      Class<T> workflowInterface, WorkflowExecution execution) {
184
    return (T)
1✔
185
        Proxy.newProxyInstance(
1✔
186
            workflowInterface.getClassLoader(),
1✔
187
            new Class<?>[] {workflowInterface, WorkflowStubMarker.class, AsyncMarker.class},
188
            new ExternalWorkflowInvocationHandler(execution, getWorkflowInterceptor()));
1✔
189
  }
190

191
  public static Promise<WorkflowExecution> getWorkflowExecution(Object workflowStub) {
192
    if (workflowStub instanceof WorkflowStubMarker) {
1✔
193
      return ((WorkflowStubMarker) workflowStub).__getWorkflowExecution();
1✔
194
    }
195
    throw new IllegalArgumentException(
×
196
        "Not a workflow stub created through Workflow.newChildWorkflowStub: " + workflowStub);
197
  }
198

199
  public static ChildWorkflowStub newUntypedChildWorkflowStub(
200
      String workflowType, ChildWorkflowOptions options) {
201
    return new ChildWorkflowStubImpl(workflowType, options, getWorkflowInterceptor());
1✔
202
  }
203

204
  public static ExternalWorkflowStub newUntypedExternalWorkflowStub(WorkflowExecution execution) {
205
    return new ExternalWorkflowStubImpl(execution, getWorkflowInterceptor());
1✔
206
  }
207

208
  /**
209
   * Creates client stub that can be used to continue this workflow as new.
210
   *
211
   * @param workflowInterface interface type implemented by the next generation of workflow
212
   */
213
  @SuppressWarnings("unchecked")
214
  public static <T> T newContinueAsNewStub(
215
      Class<T> workflowInterface, ContinueAsNewOptions options) {
216
    return (T)
1✔
217
        Proxy.newProxyInstance(
1✔
218
            workflowInterface.getClassLoader(),
1✔
219
            new Class<?>[] {workflowInterface},
220
            new ContinueAsNewWorkflowInvocationHandler(options, getWorkflowInterceptor()));
1✔
221
  }
222

223
  private static WorkflowInterceptor getWorkflowInterceptor() {
224
    return DeterministicRunnerImpl.currentThreadInternal()
1✔
225
        .getDecisionContext()
1✔
226
        .getWorkflowInterceptor();
1✔
227
  }
228

229
  private static SyncDecisionContext getRootDecisionContext() {
230
    return DeterministicRunnerImpl.currentThreadInternal().getDecisionContext();
1✔
231
  }
232

233
  public static void await(String reason, Supplier<Boolean> unblockCondition)
234
      throws DestroyWorkflowThreadError {
235
    getWorkflowInterceptor().await(reason, unblockCondition);
1✔
236
  }
1✔
237

238
  public static boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition)
239
      throws DestroyWorkflowThreadError {
240
    return getWorkflowInterceptor().await(timeout, reason, unblockCondition);
1✔
241
  }
242

243
  public static <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
244
    return getWorkflowInterceptor().sideEffect(resultClass, resultType, func);
1✔
245
  }
246

247
  public static <R> R mutableSideEffect(
248
      String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
249
    return getWorkflowInterceptor().mutableSideEffect(id, resultClass, resultType, updated, func);
1✔
250
  }
251

252
  public static int getVersion(String changeID, int minSupported, int maxSupported) {
253
    return getWorkflowInterceptor().getVersion(changeID, minSupported, maxSupported);
1✔
254
  }
255

256
  public static <U> Promise<List<U>> promiseAllOf(Collection<Promise<U>> promises) {
257
    return new AllOfPromise<>(promises);
1✔
258
  }
259

260
  @SuppressWarnings("unchecked")
261
  public static Promise<Void> promiseAllOf(Promise<?>... promises) {
262
    return new AllOfPromise(promises).thenApply(results -> null);
1✔
263
  }
264

265
  public static Promise<Object> promiseAnyOf(Iterable<Promise<?>> promises) {
266
    return CompletablePromiseImpl.promiseAnyOf(promises);
1✔
267
  }
268

269
  public static Promise<Object> promiseAnyOf(Promise<?>... promises) {
270
    return CompletablePromiseImpl.promiseAnyOf(promises);
1✔
271
  }
272

273
  public static CancellationScope newCancellationScope(boolean detached, Runnable runnable) {
274
    return new CancellationScopeImpl(detached, runnable);
1✔
275
  }
276

277
  public static CancellationScope newCancellationScope(
278
      boolean detached, Functions.Proc1<CancellationScope> proc) {
279
    return new CancellationScopeImpl(detached, proc);
1✔
280
  }
281

282
  public static CancellationScopeImpl currentCancellationScope() {
283
    return CancellationScopeImpl.current();
1✔
284
  }
285

286
  public static RuntimeException wrap(Throwable e) {
287
    return CheckedExceptionWrapper.wrap(e);
1✔
288
  }
289

290
  public static Exception unwrap(Exception e) {
291
    return CheckedExceptionWrapper.unwrap(e);
×
292
  }
293

294
  /** Prohibit instantiation */
295
  private WorkflowInternal() {}
296

297
  public static boolean isReplaying() {
298
    return getRootDecisionContext().isReplaying();
1✔
299
  }
300

301
  public static WorkflowInfo getWorkflowInfo() {
302
    return new WorkflowInfoImpl(getRootDecisionContext().getContext());
1✔
303
  }
304

305
  public static <R> R retry(RetryOptions options, Functions.Func<R> fn) {
306
    return WorkflowRetryerInternal.validateOptionsAndRetry(options, fn);
×
307
  }
308

309
  public static void continueAsNew(
310
      Optional<String> workflowType, Optional<ContinueAsNewOptions> options, Object[] args) {
311
    getWorkflowInterceptor().continueAsNew(workflowType, options, args);
×
312
  }
×
313

314
  public static void continueAsNew(
315
      Optional<String> workflowType,
316
      Optional<ContinueAsNewOptions> options,
317
      Object[] args,
318
      WorkflowInterceptor decisionContext) {
319
    decisionContext.continueAsNew(workflowType, options, args);
×
320
  }
×
321

322
  public static void sleep(Duration duration) {
323
    getWorkflowInterceptor().sleep(duration);
1✔
324
  }
1✔
325

326
  public static Scope getMetricsScope() {
327
    return getRootDecisionContext().getMetricsScope();
1✔
328
  }
329

330
  private static boolean isLoggingEnabledInReplay() {
331
    return getRootDecisionContext().isLoggingEnabledInReplay();
1✔
332
  }
333

334
  public static UUID randomUUID() {
335
    return getRootDecisionContext().randomUUID();
1✔
336
  }
337

338
  public static Random newRandom() {
339
    return getRootDecisionContext().newRandom();
1✔
340
  }
341

342
  public static Logger getLogger(Class<?> clazz) {
343
    Logger logger = LoggerFactory.getLogger(clazz);
1✔
344
    return new ReplayAwareLogger(
1✔
345
        logger, WorkflowInternal::isReplaying, WorkflowInternal::isLoggingEnabledInReplay);
346
  }
347

348
  public static Logger getLogger(String name) {
349
    Logger logger = LoggerFactory.getLogger(name);
1✔
350
    return new ReplayAwareLogger(
1✔
351
        logger, WorkflowInternal::isReplaying, WorkflowInternal::isLoggingEnabledInReplay);
352
  }
353

354
  public static <R> R getLastCompletionResult(Class<R> resultClass, Type resultType) {
355
    return getRootDecisionContext().getLastCompletionResult(resultClass, resultType);
1✔
356
  }
357

358
  public static void upsertSearchAttributes(Map<String, Object> searchAttributes) {
359
    getWorkflowInterceptor().upsertSearchAttributes(searchAttributes);
1✔
360
  }
1✔
361

362
  public static WorkflowThread getRootWorkflowContext() {
363
    return DeterministicRunnerImpl.currentThreadInternal();
1✔
364
  }
365
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc