• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #153

pending completion
#153

push

github-actions

web-flow
Eager Workflow Task Dispatch (#1674)

Issue #1646

Signed-off-by: Dmitry Spikhalskiy <dmitry@spikhalskiy.com>

213 of 213 new or added lines in 22 files covered. (100.0%)

16682 of 20566 relevant lines covered (81.11%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.44
/temporal-testing/src/main/java/io/temporal/testing/internal/SDKTestWorkflowRule.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.testing.internal;
22

23
import static io.temporal.client.WorkflowClient.QUERY_TYPE_STACK_TRACE;
24
import static org.junit.Assert.fail;
25

26
import com.google.common.base.Charsets;
27
import com.google.common.base.Throwables;
28
import com.google.common.io.CharSink;
29
import com.google.common.io.Files;
30
import com.uber.m3.tally.Scope;
31
import io.temporal.api.enums.v1.EventType;
32
import io.temporal.api.enums.v1.IndexedValueType;
33
import io.temporal.api.history.v1.History;
34
import io.temporal.api.history.v1.HistoryEvent;
35
import io.temporal.client.WorkflowClient;
36
import io.temporal.client.WorkflowClientOptions;
37
import io.temporal.client.WorkflowQueryException;
38
import io.temporal.client.WorkflowStub;
39
import io.temporal.common.WorkflowExecutionHistory;
40
import io.temporal.common.interceptors.WorkerInterceptor;
41
import io.temporal.internal.common.env.DebugModeUtils;
42
import io.temporal.internal.worker.WorkflowExecutorCache;
43
import io.temporal.serviceclient.WorkflowServiceStubs;
44
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
45
import io.temporal.testing.TestWorkflowEnvironment;
46
import io.temporal.testing.TestWorkflowRule;
47
import io.temporal.worker.*;
48
import io.temporal.workflow.Functions;
49
import java.io.File;
50
import java.io.IOException;
51
import java.lang.reflect.InvocationTargetException;
52
import java.lang.reflect.Method;
53
import java.time.Duration;
54
import java.util.ArrayList;
55
import java.util.List;
56
import java.util.concurrent.ExecutionException;
57
import java.util.concurrent.ScheduledExecutorService;
58
import java.util.concurrent.ScheduledFuture;
59
import java.util.concurrent.ScheduledThreadPoolExecutor;
60
import java.util.concurrent.TimeUnit;
61
import javax.annotation.Nonnull;
62
import javax.annotation.Nullable;
63
import org.junit.Test;
64
import org.junit.rules.TestRule;
65
import org.junit.rules.Timeout;
66
import org.junit.runner.Description;
67
import org.junit.runners.model.Statement;
68
import org.slf4j.Logger;
69
import org.slf4j.LoggerFactory;
70

71
/**
72
 * Intended to be used only in the Java SDK test code. This Rule duplicates {@link TestWorkflowRule}
73
 * and provides additional convenience methods for SDK development
74
 */
75
public class SDKTestWorkflowRule implements TestRule {
76
  private static final Logger log = LoggerFactory.getLogger(SDKTestWorkflowRule.class);
1✔
77

78
  private static final long DEFAULT_TEST_TIMEOUT_SECONDS = 10;
79

80
  private static final long BUSY_WAIT_SLEEP_MS = 100;
81

82
  public static final String NAMESPACE = "UnitTest";
83
  public static final String UUID_REGEXP =
84
      "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
85
  // Enable to regenerate JsonFiles used for replay testing.
86
  public static final boolean REGENERATE_JSON_FILES = false;
87
  // Only enable when USE_DOCKER_SERVICE is true
88
  public static final boolean useExternalService =
89
      ExternalServiceTestConfigurator.isUseExternalService();
1✔
90
  private static final List<ScheduledFuture<?>> delayedCallbacks = new ArrayList<>();
1✔
91
  private static final ScheduledExecutorService scheduledExecutor =
1✔
92
      new ScheduledThreadPoolExecutor(1);
93

94
  @Nullable private final Timeout globalTimeout;
95

96
  private final TestWorkflowRule testWorkflowRule;
97

98
  private SDKTestWorkflowRule(SDKTestWorkflowRule.Builder builder) {
1✔
99
    globalTimeout =
1✔
100
        !DebugModeUtils.isTemporalDebugModeOn()
1✔
101
            ? Timeout.seconds(
1✔
102
                builder.testTimeoutSeconds == 0
1✔
103
                    ? DEFAULT_TEST_TIMEOUT_SECONDS
1✔
104
                    : builder.testTimeoutSeconds)
1✔
105
            : null;
1✔
106

107
    testWorkflowRule =
1✔
108
        ExternalServiceTestConfigurator.configure(builder.testWorkflowRuleBuilder).build();
1✔
109
  }
1✔
110

111
  public static Builder newBuilder() {
112
    return new Builder();
1✔
113
  }
114

115
  public static class Builder {
116
    private long testTimeoutSeconds;
117

118
    private boolean workerFactoryOptionsAreSet = false;
1✔
119
    private final TestWorkflowRule.Builder testWorkflowRuleBuilder;
120

121
    public Builder() {
1✔
122
      testWorkflowRuleBuilder = TestWorkflowRule.newBuilder();
1✔
123
    }
1✔
124

125
    public Builder setWorkflowServiceStubsOptions(
126
        WorkflowServiceStubsOptions workflowServiceStubsOptions) {
127
      testWorkflowRuleBuilder.setWorkflowServiceStubsOptions(workflowServiceStubsOptions);
1✔
128
      return this;
1✔
129
    }
130

131
    public Builder setWorkflowClientOptions(WorkflowClientOptions workflowClientOptions) {
132
      testWorkflowRuleBuilder.setWorkflowClientOptions(workflowClientOptions);
1✔
133
      return this;
1✔
134
    }
135

136
    public Builder setWorkerOptions(WorkerOptions options) {
137
      testWorkflowRuleBuilder.setWorkerOptions(options);
1✔
138
      return this;
1✔
139
    }
140

141
    public Builder setWorkerFactoryOptions(WorkerFactoryOptions options) {
142
      options =
143
          (options.getWorkerInterceptors() == null)
1✔
144
              ? WorkerFactoryOptions.newBuilder(options)
1✔
145
                  .setWorkerInterceptors(
1✔
146
                      new TracingWorkerInterceptor(new TracingWorkerInterceptor.FilteredTrace()))
147
                  .build()
1✔
148
              : options;
1✔
149
      testWorkflowRuleBuilder.setWorkerFactoryOptions(options);
1✔
150
      workerFactoryOptionsAreSet = true;
1✔
151
      return this;
1✔
152
    }
153

154
    public Builder setNamespace(String namespace) {
155
      testWorkflowRuleBuilder.setNamespace(namespace);
×
156
      return this;
×
157
    }
158

159
    public Builder setWorkflowTypes(Class<?>... workflowTypes) {
160
      testWorkflowRuleBuilder.setWorkflowTypes(workflowTypes);
1✔
161
      return this;
1✔
162
    }
163

164
    public Builder setWorkflowTypes(
165
        WorkflowImplementationOptions implementationOptions, Class<?>... workflowTypes) {
166
      testWorkflowRuleBuilder.setWorkflowTypes(implementationOptions, workflowTypes);
1✔
167
      return this;
1✔
168
    }
169

170
    public Builder setActivityImplementations(Object... activityImplementations) {
171
      testWorkflowRuleBuilder.setActivityImplementations(activityImplementations);
1✔
172
      return this;
1✔
173
    }
174

175
    public Builder setUseExternalService(boolean useExternalService) {
176
      testWorkflowRuleBuilder.setUseExternalService(useExternalService);
×
177
      return this;
×
178
    }
179

180
    public Builder setTarget(String target) {
181
      testWorkflowRuleBuilder.setTarget(target);
×
182
      return this;
×
183
    }
184

185
    /** Global test timeout. Default is 10 seconds. */
186
    public Builder setTestTimeoutSeconds(long testTimeoutSeconds) {
187
      this.testTimeoutSeconds = testTimeoutSeconds;
1✔
188
      return this;
1✔
189
    }
190

191
    public Builder setInitialTimeMillis(long initialTimeMillis) {
192
      testWorkflowRuleBuilder.setInitialTimeMillis(initialTimeMillis);
1✔
193
      return this;
1✔
194
    }
195

196
    public Builder setDoNotStart(boolean doNotStart) {
197
      testWorkflowRuleBuilder.setDoNotStart(doNotStart);
1✔
198
      return this;
1✔
199
    }
200

201
    public Builder setUseTimeskipping(boolean useTimeskipping) {
202
      testWorkflowRuleBuilder.setUseTimeskipping(useTimeskipping);
1✔
203
      return this;
1✔
204
    }
205

206
    public Builder registerSearchAttribute(String name, IndexedValueType type) {
207
      testWorkflowRuleBuilder.registerSearchAttribute(name, type);
1✔
208
      return this;
1✔
209
    }
210

211
    public Builder setMetricsScope(Scope scope) {
212
      testWorkflowRuleBuilder.setMetricsScope(scope);
1✔
213
      return this;
1✔
214
    }
215

216
    public SDKTestWorkflowRule build() {
217
      if (!workerFactoryOptionsAreSet) {
1✔
218
        testWorkflowRuleBuilder.setWorkerFactoryOptions(
1✔
219
            WorkerFactoryOptions.newBuilder()
1✔
220
                .setWorkerInterceptors(
1✔
221
                    new TracingWorkerInterceptor(new TracingWorkerInterceptor.FilteredTrace()))
222
                .build());
1✔
223
      }
224
      return new SDKTestWorkflowRule(this);
1✔
225
    }
226
  }
227

228
  public Statement apply(@Nonnull Statement base, Description description) {
229
    Statement testWorkflowStatement = base;
1✔
230

231
    Test annotation = description.getAnnotation(Test.class);
1✔
232
    boolean timeoutIsOverriddenOnTestAnnotation = annotation != null && annotation.timeout() > 0;
1✔
233
    if (globalTimeout != null && !timeoutIsOverriddenOnTestAnnotation) {
1✔
234
      testWorkflowStatement = globalTimeout.apply(testWorkflowStatement, description);
1✔
235
    }
236

237
    return testWorkflowRule.apply(testWorkflowStatement, description);
1✔
238
  }
239

240
  public <T extends WorkerInterceptor> T getInterceptor(Class<T> type) {
241
    return testWorkflowRule.getInterceptor(type);
1✔
242
  }
243

244
  public String getTaskQueue() {
245
    return testWorkflowRule.getTaskQueue();
1✔
246
  }
247

248
  public Worker getWorker() {
249
    return testWorkflowRule.getWorker();
1✔
250
  }
251

252
  public WorkflowExecutionHistory getExecutionHistory(String workflowId) {
253
    return testWorkflowRule.getWorkflowClient().fetchHistory(workflowId);
1✔
254
  }
255

256
  /** Returns list of all events of the given EventType found in the history. */
257
  public List<HistoryEvent> getHistoryEvents(String workflowId, EventType eventType) {
258
    List<HistoryEvent> result = new ArrayList<>();
1✔
259
    History history = getExecutionHistory(workflowId).getHistory();
1✔
260
    for (HistoryEvent event : history.getEventsList()) {
1✔
261
      if (eventType == event.getEventType()) {
1✔
262
        result.add(event);
1✔
263
      }
264
    }
1✔
265
    return result;
1✔
266
  }
267

268
  /** Returns the first event of the given EventType found in the history. */
269
  public HistoryEvent getHistoryEvent(String workflowId, EventType eventType) {
270
    History history = getExecutionHistory(workflowId).getHistory();
1✔
271
    for (HistoryEvent event : history.getEventsList()) {
1✔
272
      if (eventType == event.getEventType()) {
1✔
273
        return event;
1✔
274
      }
275
    }
1✔
276
    throw new IllegalArgumentException("No event of " + eventType + " found in the history");
×
277
  }
278

279
  /** Asserts that an event of the given EventType is found in the history. */
280
  public void assertHistoryEvent(String workflowId, EventType eventType) {
281
    History history = getExecutionHistory(workflowId).getHistory();
1✔
282
    for (HistoryEvent event : history.getEventsList()) {
1✔
283
      if (eventType == event.getEventType()) {
1✔
284
        return;
1✔
285
      }
286
    }
1✔
287
    fail("No event of " + eventType + " found in the history");
×
288
  }
×
289

290
  /** Asserts that an event of the given EventType is not found in the history. */
291
  public void assertNoHistoryEvent(String workflowId, EventType eventType) {
292
    History history = getExecutionHistory(workflowId).getHistory();
1✔
293
    assertNoHistoryEvent(history, eventType);
1✔
294
  }
1✔
295

296
  /** Asserts that an event of the given EventType is not found in the history. */
297
  public static void assertNoHistoryEvent(History history, EventType eventType) {
298
    for (HistoryEvent event : history.getEventsList()) {
1✔
299
      if (eventType == event.getEventType()) {
1✔
300
        fail("Event of " + eventType + " found in the history");
×
301
      }
302
    }
1✔
303
  }
1✔
304

305
  /** Waits till the end of the workflow task if there is a workflow task in progress */
306
  public void waitForTheEndOfWFT(String workflowId) {
307
    WorkflowExecutionHistory initialHistory = getExecutionHistory(workflowId);
1✔
308

309
    HistoryEvent lastEvent = initialHistory.getLastEvent();
1✔
310
    if (isWFTInProgress(lastEvent)) {
1✔
311
      // wait for completion of a workflow task in progress
312
      long startEventId = lastEvent.getEventId();
1✔
313
      while (true) {
314
        List<HistoryEvent> historyEvents =
1✔
315
            getExecutionHistory(workflowId).getHistory().getEventsList();
1✔
316
        if (historyEvents.stream()
1✔
317
            .filter(e -> e.getEventId() > startEventId)
1✔
318
            .anyMatch(e -> !isWFTInProgress(e))) {
1✔
319
          return;
1✔
320
        }
321
        busyWaitSleep();
1✔
322
      }
1✔
323
    }
324
  }
×
325

326
  public WorkflowClient getWorkflowClient() {
327
    return testWorkflowRule.getWorkflowClient();
1✔
328
  }
329

330
  public WorkflowServiceStubs getWorkflowServiceStubs() {
331
    return testWorkflowRule.getWorkflowServiceStubs();
1✔
332
  }
333

334
  public boolean isUseExternalService() {
335
    return useExternalService;
1✔
336
  }
337

338
  public TestWorkflowEnvironment getTestEnvironment() {
339
    return testWorkflowRule.getTestEnvironment();
1✔
340
  }
341

342
  public <T> T newWorkflowStub(Class<T> workflow) {
343
    return testWorkflowRule.newWorkflowStub(workflow);
1✔
344
  }
345

346
  public <T> T newWorkflowStubTimeoutOptions(Class<T> workflow) {
347
    return getWorkflowClient()
1✔
348
        .newWorkflowStub(workflow, SDKTestOptions.newWorkflowOptionsWithTimeouts(getTaskQueue()));
1✔
349
  }
350

351
  public <T> T newWorkflowStub200sTimeoutOptions(Class<T> workflow) {
352
    return getWorkflowClient()
1✔
353
        .newWorkflowStub(
1✔
354
            workflow, SDKTestOptions.newWorkflowOptionsForTaskQueue200sTimeout(getTaskQueue()));
1✔
355
  }
356

357
  public WorkflowStub newUntypedWorkflowStub(String workflow) {
358
    return testWorkflowRule.newUntypedWorkflowStub(workflow);
1✔
359
  }
360

361
  public WorkflowStub newUntypedWorkflowStubTimeoutOptions(String workflow) {
362
    return getWorkflowClient()
1✔
363
        .newUntypedWorkflowStub(
1✔
364
            workflow, SDKTestOptions.newWorkflowOptionsWithTimeouts(getTaskQueue()));
1✔
365
  }
366

367
  /** Used to ensure that workflow first workflow task is executed. */
368
  public static void waitForOKQuery(Object anyStub) {
369
    WorkflowStub untypedStub;
370
    if (anyStub instanceof WorkflowStub) {
1✔
371
      untypedStub = (WorkflowStub) anyStub;
1✔
372
    } else {
373
      untypedStub = WorkflowStub.fromTyped(anyStub);
×
374
    }
375
    while (true) {
376
      try {
377
        String stackTrace = untypedStub.query(QUERY_TYPE_STACK_TRACE, String.class);
1✔
378
        if (!stackTrace.isEmpty()) {
1✔
379
          break;
1✔
380
        }
381
        busyWaitSleep();
×
382
      } catch (WorkflowQueryException e) {
×
383
        // Ignore
384
      }
×
385
    }
386
  }
1✔
387

388
  public <R> void addWorkflowImplementationFactory(
389
      Class<R> factoryImpl, Functions.Func<R> factoryFunc) {
390
    this.getTestEnvironment()
1✔
391
        .getWorkerFactory()
1✔
392
        .getWorker(this.getTaskQueue())
1✔
393
        .registerWorkflowImplementationFactory(factoryImpl, factoryFunc);
1✔
394
  }
1✔
395

396
  public void regenerateHistoryForReplay(String workflowId, String fileName) {
397
    if (REGENERATE_JSON_FILES) {
398
      String json = getExecutionHistory(workflowId).toJson(true);
399
      String projectPath = System.getProperty("user.dir");
400
      String resourceFile = projectPath + "/src/test/resources/" + fileName + ".json";
401
      File file = new File(resourceFile);
402
      CharSink sink = Files.asCharSink(file, Charsets.UTF_8);
403
      try {
404
        sink.write(json);
405
      } catch (IOException e) {
406
        Throwables.propagateIfPossible(e, RuntimeException.class);
407
      }
408
      log.info("Regenerated history file: " + resourceFile);
409
    }
410
  }
1✔
411

412
  // TODO: Refactor testEnv to support testing through real service to avoid these switches.
413
  public void registerDelayedCallback(Duration delay, Runnable r) {
414
    if (useExternalService) {
1✔
415
      ScheduledFuture<?> result =
×
416
          scheduledExecutor.schedule(r, delay.toMillis(), TimeUnit.MILLISECONDS);
×
417
      delayedCallbacks.add(result);
×
418
    } else {
×
419
      testWorkflowRule.getTestEnvironment().registerDelayedCallback(delay, r);
1✔
420
    }
421
  }
1✔
422

423
  protected void shutdown() throws Throwable {
424
    getTestEnvironment().shutdown();
×
425

426
    TracingWorkerInterceptor tracer = getInterceptor(TracingWorkerInterceptor.class);
×
427
    if (tracer != null) {
×
428
      tracer.assertExpected();
×
429
    }
430

431
    for (ScheduledFuture<?> result : delayedCallbacks) {
×
432
      if (result.isDone() && !result.isCancelled()) {
×
433
        try {
434
          result.get();
×
435
        } catch (InterruptedException e) {
×
436
          Thread.currentThread().interrupt();
×
437
        } catch (ExecutionException e) {
×
438
          throw e.getCause();
×
439
        }
×
440
      }
441
    }
×
442
  }
×
443

444
  public void sleep(Duration d) {
445
    if (useExternalService) {
1✔
446
      try {
447
        Thread.sleep(d.toMillis());
×
448
      } catch (InterruptedException e) {
×
449
        Thread.currentThread().interrupt();
×
450
        throw new RuntimeException("Interrupted", e);
×
451
      }
×
452
    } else {
453
      testWorkflowRule.getTestEnvironment().sleep(d);
1✔
454
    }
455
  }
1✔
456

457
  /** Causes eviction of all workflows in the worker cache */
458
  // TODO replace the horrible reflection implementation with a normal protected access by hiding
459
  //  WorkerFactory under an interface.
460
  public void invalidateWorkflowCache() {
461
    WorkerFactory workerFactory = testWorkflowRule.getTestEnvironment().getWorkerFactory();
1✔
462
    try {
463
      Method getCache = WorkerFactory.class.getDeclaredMethod("getCache");
1✔
464
      getCache.setAccessible(true);
1✔
465
      WorkflowExecutorCache cache = (WorkflowExecutorCache) getCache.invoke(workerFactory);
1✔
466
      cache.invalidateAll();
1✔
467
      while (cache.size() > 0) {
1✔
468
        busyWaitSleep();
×
469
      }
470
    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
×
471
      throw new RuntimeException(e);
×
472
    }
1✔
473
  }
1✔
474

475
  private static boolean isWFTInProgress(HistoryEvent event) {
476
    EventType eventType = event.getEventType();
1✔
477
    switch (eventType) {
1✔
478
      case EVENT_TYPE_WORKFLOW_TASK_STARTED:
479
      case EVENT_TYPE_WORKFLOW_TASK_SCHEDULED:
480
        return true;
1✔
481
      default:
482
        return false;
1✔
483
    }
484
  }
485

486
  private static void busyWaitSleep() {
487
    try {
488
      Thread.sleep(BUSY_WAIT_SLEEP_MS);
1✔
489
    } catch (InterruptedException e) {
×
490
      Thread.currentThread().interrupt();
×
491
      throw new RuntimeException(e);
×
492
    }
1✔
493
  }
1✔
494
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc