• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #333

16 Oct 2024 07:28PM UTC coverage: 78.65% (+0.6%) from 78.085%
#333

push

github

web-flow
Fix code coverage (#2275)

22670 of 28824 relevant lines covered (78.65%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.95
/temporal-testing/src/main/java/io/temporal/testing/internal/SDKTestWorkflowRule.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.testing.internal;
22

23
import static io.temporal.client.WorkflowClient.QUERY_TYPE_STACK_TRACE;
24
import static org.junit.Assert.fail;
25

26
import com.google.common.base.Charsets;
27
import com.google.common.base.Throwables;
28
import com.google.common.io.CharSink;
29
import com.google.common.io.Files;
30
import com.uber.m3.tally.Scope;
31
import io.temporal.api.enums.v1.EventType;
32
import io.temporal.api.enums.v1.IndexedValueType;
33
import io.temporal.api.history.v1.History;
34
import io.temporal.api.history.v1.HistoryEvent;
35
import io.temporal.api.nexus.v1.Endpoint;
36
import io.temporal.client.WorkflowClient;
37
import io.temporal.client.WorkflowClientOptions;
38
import io.temporal.client.WorkflowQueryException;
39
import io.temporal.client.WorkflowStub;
40
import io.temporal.common.SearchAttributeKey;
41
import io.temporal.common.WorkflowExecutionHistory;
42
import io.temporal.common.interceptors.WorkerInterceptor;
43
import io.temporal.internal.common.env.DebugModeUtils;
44
import io.temporal.internal.worker.WorkflowExecutorCache;
45
import io.temporal.serviceclient.WorkflowServiceStubs;
46
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
47
import io.temporal.testing.TestWorkflowEnvironment;
48
import io.temporal.testing.TestWorkflowRule;
49
import io.temporal.worker.*;
50
import io.temporal.workflow.Functions;
51
import java.io.File;
52
import java.io.IOException;
53
import java.lang.reflect.InvocationTargetException;
54
import java.lang.reflect.Method;
55
import java.time.Duration;
56
import java.util.ArrayList;
57
import java.util.List;
58
import java.util.concurrent.ExecutionException;
59
import java.util.concurrent.ScheduledExecutorService;
60
import java.util.concurrent.ScheduledFuture;
61
import java.util.concurrent.ScheduledThreadPoolExecutor;
62
import java.util.concurrent.TimeUnit;
63
import javax.annotation.Nonnull;
64
import javax.annotation.Nullable;
65
import org.junit.Test;
66
import org.junit.rules.TestRule;
67
import org.junit.rules.Timeout;
68
import org.junit.runner.Description;
69
import org.junit.runners.model.Statement;
70
import org.slf4j.Logger;
71
import org.slf4j.LoggerFactory;
72

73
/**
74
 * Intended to be used only in the Java SDK test code. This Rule duplicates {@link TestWorkflowRule}
75
 * and provides additional convenience methods for SDK development
76
 */
77
public class SDKTestWorkflowRule implements TestRule {
78
  private static final Logger log = LoggerFactory.getLogger(SDKTestWorkflowRule.class);
1✔
79

80
  // Timeout, in seconds, used when the builder's testTimeoutSeconds is left at 0.
  private static final long DEFAULT_TEST_TIMEOUT_SECONDS = 10;
81

82
  // Pause, in milliseconds, between polls in busyWaitSleep().
  private static final long BUSY_WAIT_SLEEP_MS = 100;
83

84
  public static final String NAMESPACE = "UnitTest";
85
  // Pattern text for a lowercase hexadecimal 8-4-4-4-12 UUID.
  public static final String UUID_REGEXP =
86
      "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
87
  // Enable to regenerate JsonFiles used for replay testing.
88
  public static final boolean REGENERATE_JSON_FILES = false;
89
  // Only enable when USE_DOCKER_SERVICE is true
90
  public static final boolean useExternalService =
91
      ExternalServiceTestConfigurator.isUseExternalService();
1✔
92
  // Futures of callbacks scheduled by registerDelayedCallback() when useExternalService is set;
  // shutdown() inspects them to surface callback failures.
  private static final List<ScheduledFuture<?>> delayedCallbacks = new ArrayList<>();
1✔
93
  // Single-threaded scheduler used by registerDelayedCallback() when useExternalService is set.
  private static final ScheduledExecutorService scheduledExecutor =
1✔
94
      new ScheduledThreadPoolExecutor(1);
95

96
  // JUnit timeout wrapped around each test by apply(); null when Temporal debug mode is on.
  @Nullable private final Timeout globalTimeout;
97

98
  // Wrapped rule that this class delegates to.
  private final TestWorkflowRule testWorkflowRule;
99

100
  /**
   * Creates the rule from a configured builder.
   *
   * <p>No global timeout is installed while Temporal debug mode is on. Otherwise the builder's
   * timeout is used, with 0 meaning {@link #DEFAULT_TEST_TIMEOUT_SECONDS}.
   */
  private SDKTestWorkflowRule(SDKTestWorkflowRule.Builder builder) {
    if (DebugModeUtils.isTemporalDebugModeOn()) {
      globalTimeout = null;
    } else {
      long timeoutSeconds = builder.testTimeoutSeconds;
      if (timeoutSeconds == 0) {
        timeoutSeconds = DEFAULT_TEST_TIMEOUT_SECONDS;
      }
      globalTimeout = Timeout.seconds(timeoutSeconds);
    }

    testWorkflowRule =
        ExternalServiceTestConfigurator.configure(builder.testWorkflowRuleBuilder).build();
  }
1✔
112

113
  public static Builder newBuilder() {
114
    return new Builder();
1✔
115
  }
116

117
  public static class Builder {
118
    private long testTimeoutSeconds;
119

120
    private boolean workerFactoryOptionsAreSet = false;
1✔
121
    private final TestWorkflowRule.Builder testWorkflowRuleBuilder;
122

123
    public Builder() {
1✔
124
      testWorkflowRuleBuilder = TestWorkflowRule.newBuilder();
1✔
125
    }
1✔
126

127
    public Builder setWorkflowServiceStubsOptions(
128
        WorkflowServiceStubsOptions workflowServiceStubsOptions) {
129
      testWorkflowRuleBuilder.setWorkflowServiceStubsOptions(workflowServiceStubsOptions);
1✔
130
      return this;
1✔
131
    }
132

133
    public Builder setWorkflowClientOptions(WorkflowClientOptions workflowClientOptions) {
134
      testWorkflowRuleBuilder.setWorkflowClientOptions(workflowClientOptions);
1✔
135
      return this;
1✔
136
    }
137

138
    public Builder setWorkerOptions(WorkerOptions options) {
139
      testWorkflowRuleBuilder.setWorkerOptions(options);
1✔
140
      return this;
1✔
141
    }
142

143
    public Builder setWorkerFactoryOptions(WorkerFactoryOptions options) {
144
      options =
145
          (options.getWorkerInterceptors() == null)
1✔
146
              ? WorkerFactoryOptions.newBuilder(options)
1✔
147
                  .setWorkerInterceptors(
1✔
148
                      new TracingWorkerInterceptor(new TracingWorkerInterceptor.FilteredTrace()))
149
                  .build()
1✔
150
              : options;
1✔
151
      testWorkflowRuleBuilder.setWorkerFactoryOptions(options);
1✔
152
      workerFactoryOptionsAreSet = true;
1✔
153
      return this;
1✔
154
    }
155

156
    public Builder setNamespace(String namespace) {
157
      testWorkflowRuleBuilder.setNamespace(namespace);
×
158
      return this;
×
159
    }
160

161
    public Builder setWorkflowTypes(Class<?>... workflowTypes) {
162
      testWorkflowRuleBuilder.setWorkflowTypes(workflowTypes);
1✔
163
      return this;
1✔
164
    }
165

166
    public Builder setNexusServiceImplementation(Object... nexusServiceImplementations) {
167
      testWorkflowRuleBuilder.setNexusServiceImplementation(nexusServiceImplementations);
1✔
168
      return this;
1✔
169
    }
170

171
    public Builder setWorkflowTypes(
172
        WorkflowImplementationOptions implementationOptions, Class<?>... workflowTypes) {
173
      testWorkflowRuleBuilder.setWorkflowTypes(implementationOptions, workflowTypes);
1✔
174
      return this;
1✔
175
    }
176

177
    public Builder setActivityImplementations(Object... activityImplementations) {
178
      testWorkflowRuleBuilder.setActivityImplementations(activityImplementations);
1✔
179
      return this;
1✔
180
    }
181

182
    public Builder setUseExternalService(boolean useExternalService) {
183
      testWorkflowRuleBuilder.setUseExternalService(useExternalService);
×
184
      return this;
×
185
    }
186

187
    public Builder setTarget(String target) {
188
      testWorkflowRuleBuilder.setTarget(target);
×
189
      return this;
×
190
    }
191

192
    /** Global test timeout. Default is 10 seconds. */
193
    public Builder setTestTimeoutSeconds(long testTimeoutSeconds) {
194
      this.testTimeoutSeconds = testTimeoutSeconds;
1✔
195
      return this;
1✔
196
    }
197

198
    public Builder setInitialTimeMillis(long initialTimeMillis) {
199
      testWorkflowRuleBuilder.setInitialTimeMillis(initialTimeMillis);
1✔
200
      return this;
1✔
201
    }
202

203
    public Builder setDoNotStart(boolean doNotStart) {
204
      testWorkflowRuleBuilder.setDoNotStart(doNotStart);
1✔
205
      return this;
1✔
206
    }
207

208
    public Builder setUseTimeskipping(boolean useTimeskipping) {
209
      testWorkflowRuleBuilder.setUseTimeskipping(useTimeskipping);
1✔
210
      return this;
1✔
211
    }
212

213
    public Builder registerSearchAttribute(String name, IndexedValueType type) {
214
      testWorkflowRuleBuilder.registerSearchAttribute(name, type);
1✔
215
      return this;
1✔
216
    }
217

218
    public Builder registerSearchAttribute(SearchAttributeKey<?> key) {
219
      testWorkflowRuleBuilder.registerSearchAttribute(key);
1✔
220
      return this;
1✔
221
    }
222

223
    public Builder setMetricsScope(Scope scope) {
224
      testWorkflowRuleBuilder.setMetricsScope(scope);
1✔
225
      return this;
1✔
226
    }
227

228
    public SDKTestWorkflowRule build() {
229
      if (!workerFactoryOptionsAreSet) {
1✔
230
        testWorkflowRuleBuilder.setWorkerFactoryOptions(
1✔
231
            WorkerFactoryOptions.newBuilder()
1✔
232
                .setWorkerInterceptors(
1✔
233
                    new TracingWorkerInterceptor(new TracingWorkerInterceptor.FilteredTrace()))
234
                .build());
1✔
235
      }
236
      return new SDKTestWorkflowRule(this);
1✔
237
    }
238
  }
239

240
  public Statement apply(@Nonnull Statement base, Description description) {
241
    Statement testWorkflowStatement =
1✔
242
        new Statement() {
1✔
243
          @Override
244
          public void evaluate() throws Throwable {
245
            base.evaluate();
1✔
246
            shutdown();
1✔
247
          }
1✔
248
        };
249

250
    Test annotation = description.getAnnotation(Test.class);
1✔
251
    boolean timeoutIsOverriddenOnTestAnnotation = annotation != null && annotation.timeout() > 0;
1✔
252
    if (globalTimeout != null && !timeoutIsOverriddenOnTestAnnotation) {
1✔
253
      testWorkflowStatement = globalTimeout.apply(testWorkflowStatement, description);
1✔
254
    }
255

256
    return testWorkflowRule.apply(testWorkflowStatement, description);
1✔
257
  }
258

259
  public <T extends WorkerInterceptor> T getInterceptor(Class<T> type) {
260
    return testWorkflowRule.getInterceptor(type);
1✔
261
  }
262

263
  public String getTaskQueue() {
264
    return testWorkflowRule.getTaskQueue();
1✔
265
  }
266

267
  public Endpoint getNexusEndpoint() {
268
    return testWorkflowRule.getNexusEndpoint();
1✔
269
  }
270

271
  public Worker getWorker() {
272
    return testWorkflowRule.getWorker();
1✔
273
  }
274

275
  public WorkerFactoryOptions getWorkerFactoryOptions() {
276
    return testWorkflowRule.getWorkerFactoryOptions();
×
277
  }
278

279
  public WorkflowExecutionHistory getExecutionHistory(String workflowId) {
280
    return testWorkflowRule.getWorkflowClient().fetchHistory(workflowId);
1✔
281
  }
282

283
  /** Returns list of all events of the given EventType found in the history. */
284
  public List<HistoryEvent> getHistoryEvents(String workflowId, EventType eventType) {
285
    List<HistoryEvent> result = new ArrayList<>();
1✔
286
    History history = getExecutionHistory(workflowId).getHistory();
1✔
287
    for (HistoryEvent event : history.getEventsList()) {
1✔
288
      if (eventType == event.getEventType()) {
1✔
289
        result.add(event);
1✔
290
      }
291
    }
1✔
292
    return result;
1✔
293
  }
294

295
  /** Returns the first event of the given EventType found in the history. */
296
  public HistoryEvent getHistoryEvent(String workflowId, EventType eventType) {
297
    History history = getExecutionHistory(workflowId).getHistory();
1✔
298
    for (HistoryEvent event : history.getEventsList()) {
1✔
299
      if (eventType == event.getEventType()) {
1✔
300
        return event;
1✔
301
      }
302
    }
1✔
303
    throw new IllegalArgumentException("No event of " + eventType + " found in the history");
×
304
  }
305

306
  /** Asserts that an event of the given EventType is found in the history. */
307
  public void assertHistoryEvent(String workflowId, EventType eventType) {
308
    History history = getExecutionHistory(workflowId).getHistory();
1✔
309
    for (HistoryEvent event : history.getEventsList()) {
1✔
310
      if (eventType == event.getEventType()) {
1✔
311
        return;
1✔
312
      }
313
    }
1✔
314
    fail("No event of " + eventType + " found in the history");
×
315
  }
×
316

317
  /** Asserts that an event of the given EventType is not found in the history. */
318
  public void assertNoHistoryEvent(String workflowId, EventType eventType) {
319
    History history = getExecutionHistory(workflowId).getHistory();
1✔
320
    assertNoHistoryEvent(history, eventType);
1✔
321
  }
1✔
322

323
  /** Asserts that an event of the given EventType is not found in the history. */
324
  public static void assertNoHistoryEvent(History history, EventType eventType) {
325
    for (HistoryEvent event : history.getEventsList()) {
1✔
326
      if (eventType == event.getEventType()) {
1✔
327
        fail("Event of " + eventType + " found in the history");
×
328
      }
329
    }
1✔
330
  }
1✔
331

332
  /** Waits till the end of the workflow task if there is a workflow task in progress */
333
  public void waitForTheEndOfWFT(String workflowId) {
334
    WorkflowExecutionHistory initialHistory = getExecutionHistory(workflowId);
1✔
335

336
    HistoryEvent lastEvent = initialHistory.getLastEvent();
1✔
337
    if (isWFTInProgress(lastEvent)) {
1✔
338
      // wait for completion of a workflow task in progress
339
      long startEventId = lastEvent.getEventId();
1✔
340
      while (true) {
341
        List<HistoryEvent> historyEvents =
1✔
342
            getExecutionHistory(workflowId).getHistory().getEventsList();
1✔
343
        if (historyEvents.stream()
1✔
344
            .filter(e -> e.getEventId() > startEventId)
1✔
345
            .anyMatch(e -> !isWFTInProgress(e))) {
1✔
346
          return;
1✔
347
        }
348
        busyWaitSleep();
1✔
349
      }
1✔
350
    }
351
  }
1✔
352

353
  public WorkflowClient getWorkflowClient() {
354
    return testWorkflowRule.getWorkflowClient();
1✔
355
  }
356

357
  public WorkflowServiceStubs getWorkflowServiceStubs() {
358
    return testWorkflowRule.getWorkflowServiceStubs();
1✔
359
  }
360

361
  public boolean isUseExternalService() {
362
    return useExternalService;
1✔
363
  }
364

365
  public TestWorkflowEnvironment getTestEnvironment() {
366
    return testWorkflowRule.getTestEnvironment();
1✔
367
  }
368

369
  public <T> T newWorkflowStub(Class<T> workflow) {
370
    return testWorkflowRule.newWorkflowStub(workflow);
1✔
371
  }
372

373
  public <T> T newWorkflowStubTimeoutOptions(Class<T> workflow) {
374
    return getWorkflowClient()
1✔
375
        .newWorkflowStub(workflow, SDKTestOptions.newWorkflowOptionsWithTimeouts(getTaskQueue()));
1✔
376
  }
377

378
  public <T> T newWorkflowStub200sTimeoutOptions(Class<T> workflow) {
379
    return getWorkflowClient()
1✔
380
        .newWorkflowStub(
1✔
381
            workflow, SDKTestOptions.newWorkflowOptionsForTaskQueue200sTimeout(getTaskQueue()));
1✔
382
  }
383

384
  public WorkflowStub newUntypedWorkflowStub(String workflow) {
385
    return testWorkflowRule.newUntypedWorkflowStub(workflow);
1✔
386
  }
387

388
  public WorkflowStub newUntypedWorkflowStubTimeoutOptions(String workflow) {
389
    return getWorkflowClient()
1✔
390
        .newUntypedWorkflowStub(
1✔
391
            workflow, SDKTestOptions.newWorkflowOptionsWithTimeouts(getTaskQueue()));
1✔
392
  }
393

394
  /** Used to ensure that workflow first workflow task is executed. */
395
  public static void waitForOKQuery(Object anyStub) {
396
    WorkflowStub untypedStub;
397
    if (anyStub instanceof WorkflowStub) {
1✔
398
      untypedStub = (WorkflowStub) anyStub;
1✔
399
    } else {
400
      untypedStub = WorkflowStub.fromTyped(anyStub);
1✔
401
    }
402
    while (true) {
403
      try {
404
        String stackTrace = untypedStub.query(QUERY_TYPE_STACK_TRACE, String.class);
1✔
405
        if (!stackTrace.isEmpty()) {
1✔
406
          break;
1✔
407
        }
408
        busyWaitSleep();
×
409
      } catch (WorkflowQueryException e) {
×
410
        // Ignore
411
      }
×
412
    }
413
  }
1✔
414

415
  public <R> void addWorkflowImplementationFactory(
416
      Class<R> factoryImpl, Functions.Func<R> factoryFunc) {
417
    this.getTestEnvironment()
1✔
418
        .getWorkerFactory()
1✔
419
        .getWorker(this.getTaskQueue())
1✔
420
        .registerWorkflowImplementationFactory(factoryImpl, factoryFunc);
1✔
421
  }
1✔
422

423
  public void regenerateHistoryForReplay(String workflowId, String fileName) {
424
    if (REGENERATE_JSON_FILES) {
425
      String json = getExecutionHistory(workflowId).toJson(true);
426
      String projectPath = System.getProperty("user.dir");
427
      String resourceFile = projectPath + "/src/test/resources/" + fileName + ".json";
428
      File file = new File(resourceFile);
429
      CharSink sink = Files.asCharSink(file, Charsets.UTF_8);
430
      try {
431
        sink.write(json);
432
      } catch (IOException e) {
433
        Throwables.propagateIfPossible(e, RuntimeException.class);
434
      }
435
      log.info("Regenerated history file: " + resourceFile);
436
    }
437
  }
1✔
438

439
  // TODO: Refactor testEnv to support testing through real service to avoid these switches.
440
  public void registerDelayedCallback(Duration delay, Runnable r) {
441
    if (useExternalService) {
1✔
442
      ScheduledFuture<?> result =
×
443
          scheduledExecutor.schedule(r, delay.toMillis(), TimeUnit.MILLISECONDS);
×
444
      delayedCallbacks.add(result);
×
445
    } else {
×
446
      testWorkflowRule.getTestEnvironment().registerDelayedCallback(delay, r);
1✔
447
    }
448
  }
1✔
449

450
  protected void shutdown() throws Throwable {
451
    getTestEnvironment().shutdown();
1✔
452

453
    TracingWorkerInterceptor tracer = getInterceptor(TracingWorkerInterceptor.class);
1✔
454
    if (tracer != null) {
1✔
455
      tracer.assertExpected();
1✔
456
    }
457

458
    for (ScheduledFuture<?> result : delayedCallbacks) {
1✔
459
      if (result.isDone() && !result.isCancelled()) {
×
460
        try {
461
          result.get();
×
462
        } catch (InterruptedException e) {
×
463
          Thread.currentThread().interrupt();
×
464
        } catch (ExecutionException e) {
×
465
          throw e.getCause();
×
466
        }
×
467
      }
468
    }
×
469
  }
1✔
470

471
  public void sleep(Duration d) {
472
    if (useExternalService) {
1✔
473
      try {
474
        Thread.sleep(d.toMillis());
×
475
      } catch (InterruptedException e) {
×
476
        Thread.currentThread().interrupt();
×
477
        throw new RuntimeException("Interrupted", e);
×
478
      }
×
479
    } else {
480
      testWorkflowRule.getTestEnvironment().sleep(d);
1✔
481
    }
482
  }
1✔
483

484
  /** Causes eviction of all workflows in the worker cache */
485
  // TODO replace the horrible reflection implementation with a normal protected access by hiding
486
  //  WorkerFactory under an interface.
487
  public void invalidateWorkflowCache() {
488
    WorkerFactory workerFactory = testWorkflowRule.getTestEnvironment().getWorkerFactory();
1✔
489
    try {
490
      Method getCache = WorkerFactory.class.getDeclaredMethod("getCache");
1✔
491
      getCache.setAccessible(true);
1✔
492
      WorkflowExecutorCache cache = (WorkflowExecutorCache) getCache.invoke(workerFactory);
1✔
493
      cache.invalidateAll();
1✔
494
      while (cache.size() > 0) {
1✔
495
        busyWaitSleep();
×
496
      }
497
    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
×
498
      throw new RuntimeException(e);
×
499
    }
1✔
500
  }
1✔
501

502
  private static boolean isWFTInProgress(HistoryEvent event) {
503
    EventType eventType = event.getEventType();
1✔
504
    switch (eventType) {
1✔
505
      case EVENT_TYPE_WORKFLOW_TASK_STARTED:
506
      case EVENT_TYPE_WORKFLOW_TASK_SCHEDULED:
507
        return true;
1✔
508
      default:
509
        return false;
1✔
510
    }
511
  }
512

513
  private static void busyWaitSleep() {
514
    try {
515
      Thread.sleep(BUSY_WAIT_SLEEP_MS);
1✔
516
    } catch (InterruptedException e) {
×
517
      Thread.currentThread().interrupt();
×
518
      throw new RuntimeException(e);
×
519
    }
1✔
520
  }
1✔
521
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc