• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #175

pending completion
#175

push

github-actions

web-flow
Worker / Build Id versioning (#1786)

Implement new worker build id based versioning feature

236 of 236 new or added lines in 24 files covered. (100.0%)

18343 of 23697 relevant lines covered (77.41%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.28
/temporal-testing/src/main/java/io/temporal/testing/internal/SDKTestWorkflowRule.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.testing.internal;
22

23
import static io.temporal.client.WorkflowClient.QUERY_TYPE_STACK_TRACE;
24
import static org.junit.Assert.fail;
25

26
import com.google.common.base.Charsets;
27
import com.google.common.base.Throwables;
28
import com.google.common.io.CharSink;
29
import com.google.common.io.Files;
30
import com.uber.m3.tally.Scope;
31
import io.temporal.api.enums.v1.EventType;
32
import io.temporal.api.enums.v1.IndexedValueType;
33
import io.temporal.api.history.v1.History;
34
import io.temporal.api.history.v1.HistoryEvent;
35
import io.temporal.client.WorkflowClient;
36
import io.temporal.client.WorkflowClientOptions;
37
import io.temporal.client.WorkflowQueryException;
38
import io.temporal.client.WorkflowStub;
39
import io.temporal.common.SearchAttributeKey;
40
import io.temporal.common.WorkflowExecutionHistory;
41
import io.temporal.common.interceptors.WorkerInterceptor;
42
import io.temporal.internal.common.env.DebugModeUtils;
43
import io.temporal.internal.worker.WorkflowExecutorCache;
44
import io.temporal.serviceclient.WorkflowServiceStubs;
45
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
46
import io.temporal.testing.TestWorkflowEnvironment;
47
import io.temporal.testing.TestWorkflowRule;
48
import io.temporal.worker.*;
49
import io.temporal.workflow.Functions;
50
import java.io.File;
51
import java.io.IOException;
52
import java.lang.reflect.InvocationTargetException;
53
import java.lang.reflect.Method;
54
import java.time.Duration;
55
import java.util.ArrayList;
56
import java.util.List;
57
import java.util.concurrent.ExecutionException;
58
import java.util.concurrent.ScheduledExecutorService;
59
import java.util.concurrent.ScheduledFuture;
60
import java.util.concurrent.ScheduledThreadPoolExecutor;
61
import java.util.concurrent.TimeUnit;
62
import javax.annotation.Nonnull;
63
import javax.annotation.Nullable;
64
import org.junit.Test;
65
import org.junit.rules.TestRule;
66
import org.junit.rules.Timeout;
67
import org.junit.runner.Description;
68
import org.junit.runners.model.Statement;
69
import org.slf4j.Logger;
70
import org.slf4j.LoggerFactory;
71

72
/**
73
 * Intended to be used only in the Java SDK test code. This Rule duplicates {@link TestWorkflowRule}
74
 * and provides additional convenience methods for SDK development
75
 */
76
public class SDKTestWorkflowRule implements TestRule {
77
  private static final Logger log = LoggerFactory.getLogger(SDKTestWorkflowRule.class);
1✔
78

79
  private static final long DEFAULT_TEST_TIMEOUT_SECONDS = 10;
80

81
  private static final long BUSY_WAIT_SLEEP_MS = 100;
82

83
  public static final String NAMESPACE = "UnitTest";
84
  public static final String UUID_REGEXP =
85
      "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
86
  // Enable to regenerate JsonFiles used for replay testing.
87
  public static final boolean REGENERATE_JSON_FILES = false;
88
  // Only enable when USE_DOCKER_SERVICE is true
89
  public static final boolean useExternalService =
90
      ExternalServiceTestConfigurator.isUseExternalService();
1✔
91
  private static final List<ScheduledFuture<?>> delayedCallbacks = new ArrayList<>();
1✔
92
  private static final ScheduledExecutorService scheduledExecutor =
1✔
93
      new ScheduledThreadPoolExecutor(1);
94

95
  @Nullable private final Timeout globalTimeout;
96

97
  private final TestWorkflowRule testWorkflowRule;
98

99
  private SDKTestWorkflowRule(SDKTestWorkflowRule.Builder builder) {
1✔
100
    globalTimeout =
1✔
101
        !DebugModeUtils.isTemporalDebugModeOn()
1✔
102
            ? Timeout.seconds(
1✔
103
                builder.testTimeoutSeconds == 0
1✔
104
                    ? DEFAULT_TEST_TIMEOUT_SECONDS
1✔
105
                    : builder.testTimeoutSeconds)
1✔
106
            : null;
1✔
107

108
    testWorkflowRule =
1✔
109
        ExternalServiceTestConfigurator.configure(builder.testWorkflowRuleBuilder).build();
1✔
110
  }
1✔
111

112
  public static Builder newBuilder() {
113
    return new Builder();
1✔
114
  }
115

116
  public static class Builder {
117
    private long testTimeoutSeconds;
118

119
    private boolean workerFactoryOptionsAreSet = false;
1✔
120
    private final TestWorkflowRule.Builder testWorkflowRuleBuilder;
121

122
    public Builder() {
1✔
123
      testWorkflowRuleBuilder = TestWorkflowRule.newBuilder();
1✔
124
    }
1✔
125

126
    public Builder setWorkflowServiceStubsOptions(
127
        WorkflowServiceStubsOptions workflowServiceStubsOptions) {
128
      testWorkflowRuleBuilder.setWorkflowServiceStubsOptions(workflowServiceStubsOptions);
1✔
129
      return this;
1✔
130
    }
131

132
    public Builder setWorkflowClientOptions(WorkflowClientOptions workflowClientOptions) {
133
      testWorkflowRuleBuilder.setWorkflowClientOptions(workflowClientOptions);
1✔
134
      return this;
1✔
135
    }
136

137
    public Builder setWorkerOptions(WorkerOptions options) {
138
      testWorkflowRuleBuilder.setWorkerOptions(options);
1✔
139
      return this;
1✔
140
    }
141

142
    public Builder setWorkerFactoryOptions(WorkerFactoryOptions options) {
143
      options =
144
          (options.getWorkerInterceptors() == null)
1✔
145
              ? WorkerFactoryOptions.newBuilder(options)
1✔
146
                  .setWorkerInterceptors(
1✔
147
                      new TracingWorkerInterceptor(new TracingWorkerInterceptor.FilteredTrace()))
148
                  .build()
1✔
149
              : options;
1✔
150
      testWorkflowRuleBuilder.setWorkerFactoryOptions(options);
1✔
151
      workerFactoryOptionsAreSet = true;
1✔
152
      return this;
1✔
153
    }
154

155
    public Builder setNamespace(String namespace) {
156
      testWorkflowRuleBuilder.setNamespace(namespace);
×
157
      return this;
×
158
    }
159

160
    public Builder setWorkflowTypes(Class<?>... workflowTypes) {
161
      testWorkflowRuleBuilder.setWorkflowTypes(workflowTypes);
1✔
162
      return this;
1✔
163
    }
164

165
    public Builder setWorkflowTypes(
166
        WorkflowImplementationOptions implementationOptions, Class<?>... workflowTypes) {
167
      testWorkflowRuleBuilder.setWorkflowTypes(implementationOptions, workflowTypes);
1✔
168
      return this;
1✔
169
    }
170

171
    public Builder setActivityImplementations(Object... activityImplementations) {
172
      testWorkflowRuleBuilder.setActivityImplementations(activityImplementations);
1✔
173
      return this;
1✔
174
    }
175

176
    public Builder setUseExternalService(boolean useExternalService) {
177
      testWorkflowRuleBuilder.setUseExternalService(useExternalService);
×
178
      return this;
×
179
    }
180

181
    public Builder setTarget(String target) {
182
      testWorkflowRuleBuilder.setTarget(target);
×
183
      return this;
×
184
    }
185

186
    /** Global test timeout. Default is 10 seconds. */
187
    public Builder setTestTimeoutSeconds(long testTimeoutSeconds) {
188
      this.testTimeoutSeconds = testTimeoutSeconds;
1✔
189
      return this;
1✔
190
    }
191

192
    public Builder setInitialTimeMillis(long initialTimeMillis) {
193
      testWorkflowRuleBuilder.setInitialTimeMillis(initialTimeMillis);
1✔
194
      return this;
1✔
195
    }
196

197
    public Builder setDoNotStart(boolean doNotStart) {
198
      testWorkflowRuleBuilder.setDoNotStart(doNotStart);
1✔
199
      return this;
1✔
200
    }
201

202
    public Builder setUseTimeskipping(boolean useTimeskipping) {
203
      testWorkflowRuleBuilder.setUseTimeskipping(useTimeskipping);
1✔
204
      return this;
1✔
205
    }
206

207
    public Builder registerSearchAttribute(String name, IndexedValueType type) {
208
      testWorkflowRuleBuilder.registerSearchAttribute(name, type);
1✔
209
      return this;
1✔
210
    }
211

212
    public Builder registerSearchAttribute(SearchAttributeKey<?> key) {
213
      testWorkflowRuleBuilder.registerSearchAttribute(key);
1✔
214
      return this;
1✔
215
    }
216

217
    public Builder setMetricsScope(Scope scope) {
218
      testWorkflowRuleBuilder.setMetricsScope(scope);
1✔
219
      return this;
1✔
220
    }
221

222
    public SDKTestWorkflowRule build() {
223
      if (!workerFactoryOptionsAreSet) {
1✔
224
        testWorkflowRuleBuilder.setWorkerFactoryOptions(
1✔
225
            WorkerFactoryOptions.newBuilder()
1✔
226
                .setWorkerInterceptors(
1✔
227
                    new TracingWorkerInterceptor(new TracingWorkerInterceptor.FilteredTrace()))
228
                .build());
1✔
229
      }
230
      return new SDKTestWorkflowRule(this);
1✔
231
    }
232
  }
233

234
  public Statement apply(@Nonnull Statement base, Description description) {
235
    Statement testWorkflowStatement = base;
1✔
236

237
    Test annotation = description.getAnnotation(Test.class);
1✔
238
    boolean timeoutIsOverriddenOnTestAnnotation = annotation != null && annotation.timeout() > 0;
1✔
239
    if (globalTimeout != null && !timeoutIsOverriddenOnTestAnnotation) {
1✔
240
      testWorkflowStatement = globalTimeout.apply(testWorkflowStatement, description);
1✔
241
    }
242

243
    return testWorkflowRule.apply(testWorkflowStatement, description);
1✔
244
  }
245

246
  public <T extends WorkerInterceptor> T getInterceptor(Class<T> type) {
247
    return testWorkflowRule.getInterceptor(type);
1✔
248
  }
249

250
  public String getTaskQueue() {
251
    return testWorkflowRule.getTaskQueue();
1✔
252
  }
253

254
  public Worker getWorker() {
255
    return testWorkflowRule.getWorker();
1✔
256
  }
257

258
  public WorkerFactoryOptions getWorkerFactoryOptions() {
259
    return testWorkflowRule.getWorkerFactoryOptions();
×
260
  }
261

262
  public WorkflowExecutionHistory getExecutionHistory(String workflowId) {
263
    return testWorkflowRule.getWorkflowClient().fetchHistory(workflowId);
1✔
264
  }
265

266
  /** Returns list of all events of the given EventType found in the history. */
267
  public List<HistoryEvent> getHistoryEvents(String workflowId, EventType eventType) {
268
    List<HistoryEvent> result = new ArrayList<>();
1✔
269
    History history = getExecutionHistory(workflowId).getHistory();
1✔
270
    for (HistoryEvent event : history.getEventsList()) {
1✔
271
      if (eventType == event.getEventType()) {
1✔
272
        result.add(event);
1✔
273
      }
274
    }
1✔
275
    return result;
1✔
276
  }
277

278
  /** Returns the first event of the given EventType found in the history. */
279
  public HistoryEvent getHistoryEvent(String workflowId, EventType eventType) {
280
    History history = getExecutionHistory(workflowId).getHistory();
1✔
281
    for (HistoryEvent event : history.getEventsList()) {
1✔
282
      if (eventType == event.getEventType()) {
1✔
283
        return event;
1✔
284
      }
285
    }
1✔
286
    throw new IllegalArgumentException("No event of " + eventType + " found in the history");
×
287
  }
288

289
  /** Asserts that an event of the given EventType is found in the history. */
290
  public void assertHistoryEvent(String workflowId, EventType eventType) {
291
    History history = getExecutionHistory(workflowId).getHistory();
1✔
292
    for (HistoryEvent event : history.getEventsList()) {
1✔
293
      if (eventType == event.getEventType()) {
1✔
294
        return;
1✔
295
      }
296
    }
1✔
297
    fail("No event of " + eventType + " found in the history");
×
298
  }
×
299

300
  /** Asserts that an event of the given EventType is not found in the history. */
301
  public void assertNoHistoryEvent(String workflowId, EventType eventType) {
302
    History history = getExecutionHistory(workflowId).getHistory();
1✔
303
    assertNoHistoryEvent(history, eventType);
1✔
304
  }
1✔
305

306
  /** Asserts that an event of the given EventType is not found in the history. */
307
  public static void assertNoHistoryEvent(History history, EventType eventType) {
308
    for (HistoryEvent event : history.getEventsList()) {
1✔
309
      if (eventType == event.getEventType()) {
1✔
310
        fail("Event of " + eventType + " found in the history");
×
311
      }
312
    }
1✔
313
  }
1✔
314

315
  /** Waits till the end of the workflow task if there is a workflow task in progress */
316
  public void waitForTheEndOfWFT(String workflowId) {
317
    WorkflowExecutionHistory initialHistory = getExecutionHistory(workflowId);
1✔
318

319
    HistoryEvent lastEvent = initialHistory.getLastEvent();
1✔
320
    if (isWFTInProgress(lastEvent)) {
1✔
321
      // wait for completion of a workflow task in progress
322
      long startEventId = lastEvent.getEventId();
1✔
323
      while (true) {
324
        List<HistoryEvent> historyEvents =
1✔
325
            getExecutionHistory(workflowId).getHistory().getEventsList();
1✔
326
        if (historyEvents.stream()
1✔
327
            .filter(e -> e.getEventId() > startEventId)
1✔
328
            .anyMatch(e -> !isWFTInProgress(e))) {
1✔
329
          return;
1✔
330
        }
331
        busyWaitSleep();
1✔
332
      }
1✔
333
    }
334
  }
1✔
335

336
  public WorkflowClient getWorkflowClient() {
337
    return testWorkflowRule.getWorkflowClient();
1✔
338
  }
339

340
  public WorkflowServiceStubs getWorkflowServiceStubs() {
341
    return testWorkflowRule.getWorkflowServiceStubs();
1✔
342
  }
343

344
  public boolean isUseExternalService() {
345
    return useExternalService;
1✔
346
  }
347

348
  public TestWorkflowEnvironment getTestEnvironment() {
349
    return testWorkflowRule.getTestEnvironment();
1✔
350
  }
351

352
  public <T> T newWorkflowStub(Class<T> workflow) {
353
    return testWorkflowRule.newWorkflowStub(workflow);
1✔
354
  }
355

356
  public <T> T newWorkflowStubTimeoutOptions(Class<T> workflow) {
357
    return getWorkflowClient()
1✔
358
        .newWorkflowStub(workflow, SDKTestOptions.newWorkflowOptionsWithTimeouts(getTaskQueue()));
1✔
359
  }
360

361
  public <T> T newWorkflowStub200sTimeoutOptions(Class<T> workflow) {
362
    return getWorkflowClient()
1✔
363
        .newWorkflowStub(
1✔
364
            workflow, SDKTestOptions.newWorkflowOptionsForTaskQueue200sTimeout(getTaskQueue()));
1✔
365
  }
366

367
  public WorkflowStub newUntypedWorkflowStub(String workflow) {
368
    return testWorkflowRule.newUntypedWorkflowStub(workflow);
1✔
369
  }
370

371
  public WorkflowStub newUntypedWorkflowStubTimeoutOptions(String workflow) {
372
    return getWorkflowClient()
1✔
373
        .newUntypedWorkflowStub(
1✔
374
            workflow, SDKTestOptions.newWorkflowOptionsWithTimeouts(getTaskQueue()));
1✔
375
  }
376

377
  /** Used to ensure that workflow first workflow task is executed. */
378
  public static void waitForOKQuery(Object anyStub) {
379
    WorkflowStub untypedStub;
380
    if (anyStub instanceof WorkflowStub) {
1✔
381
      untypedStub = (WorkflowStub) anyStub;
1✔
382
    } else {
383
      untypedStub = WorkflowStub.fromTyped(anyStub);
1✔
384
    }
385
    while (true) {
386
      try {
387
        String stackTrace = untypedStub.query(QUERY_TYPE_STACK_TRACE, String.class);
1✔
388
        if (!stackTrace.isEmpty()) {
1✔
389
          break;
1✔
390
        }
391
        busyWaitSleep();
×
392
      } catch (WorkflowQueryException e) {
×
393
        // Ignore
394
      }
×
395
    }
396
  }
1✔
397

398
  public <R> void addWorkflowImplementationFactory(
399
      Class<R> factoryImpl, Functions.Func<R> factoryFunc) {
400
    this.getTestEnvironment()
1✔
401
        .getWorkerFactory()
1✔
402
        .getWorker(this.getTaskQueue())
1✔
403
        .registerWorkflowImplementationFactory(factoryImpl, factoryFunc);
1✔
404
  }
1✔
405

406
  public void regenerateHistoryForReplay(String workflowId, String fileName) {
407
    if (REGENERATE_JSON_FILES) {
408
      String json = getExecutionHistory(workflowId).toJson(true);
409
      String projectPath = System.getProperty("user.dir");
410
      String resourceFile = projectPath + "/src/test/resources/" + fileName + ".json";
411
      File file = new File(resourceFile);
412
      CharSink sink = Files.asCharSink(file, Charsets.UTF_8);
413
      try {
414
        sink.write(json);
415
      } catch (IOException e) {
416
        Throwables.propagateIfPossible(e, RuntimeException.class);
417
      }
418
      log.info("Regenerated history file: " + resourceFile);
419
    }
420
  }
1✔
421

422
  // TODO: Refactor testEnv to support testing through real service to avoid these switches.
423
  public void registerDelayedCallback(Duration delay, Runnable r) {
424
    if (useExternalService) {
1✔
425
      ScheduledFuture<?> result =
×
426
          scheduledExecutor.schedule(r, delay.toMillis(), TimeUnit.MILLISECONDS);
×
427
      delayedCallbacks.add(result);
×
428
    } else {
×
429
      testWorkflowRule.getTestEnvironment().registerDelayedCallback(delay, r);
1✔
430
    }
431
  }
1✔
432

433
  protected void shutdown() throws Throwable {
434
    getTestEnvironment().shutdown();
×
435

436
    TracingWorkerInterceptor tracer = getInterceptor(TracingWorkerInterceptor.class);
×
437
    if (tracer != null) {
×
438
      tracer.assertExpected();
×
439
    }
440

441
    for (ScheduledFuture<?> result : delayedCallbacks) {
×
442
      if (result.isDone() && !result.isCancelled()) {
×
443
        try {
444
          result.get();
×
445
        } catch (InterruptedException e) {
×
446
          Thread.currentThread().interrupt();
×
447
        } catch (ExecutionException e) {
×
448
          throw e.getCause();
×
449
        }
×
450
      }
451
    }
×
452
  }
×
453

454
  public void sleep(Duration d) {
455
    if (useExternalService) {
1✔
456
      try {
457
        Thread.sleep(d.toMillis());
×
458
      } catch (InterruptedException e) {
×
459
        Thread.currentThread().interrupt();
×
460
        throw new RuntimeException("Interrupted", e);
×
461
      }
×
462
    } else {
463
      testWorkflowRule.getTestEnvironment().sleep(d);
1✔
464
    }
465
  }
1✔
466

467
  /** Causes eviction of all workflows in the worker cache */
468
  // TODO replace the horrible reflection implementation with a normal protected access by hiding
469
  //  WorkerFactory under an interface.
470
  public void invalidateWorkflowCache() {
471
    WorkerFactory workerFactory = testWorkflowRule.getTestEnvironment().getWorkerFactory();
1✔
472
    try {
473
      Method getCache = WorkerFactory.class.getDeclaredMethod("getCache");
1✔
474
      getCache.setAccessible(true);
1✔
475
      WorkflowExecutorCache cache = (WorkflowExecutorCache) getCache.invoke(workerFactory);
1✔
476
      cache.invalidateAll();
1✔
477
      while (cache.size() > 0) {
1✔
478
        busyWaitSleep();
×
479
      }
480
    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
×
481
      throw new RuntimeException(e);
×
482
    }
1✔
483
  }
1✔
484

485
  private static boolean isWFTInProgress(HistoryEvent event) {
486
    EventType eventType = event.getEventType();
1✔
487
    switch (eventType) {
1✔
488
      case EVENT_TYPE_WORKFLOW_TASK_STARTED:
489
      case EVENT_TYPE_WORKFLOW_TASK_SCHEDULED:
490
        return true;
1✔
491
      default:
492
        return false;
1✔
493
    }
494
  }
495

496
  private static void busyWaitSleep() {
497
    try {
498
      Thread.sleep(BUSY_WAIT_SLEEP_MS);
1✔
499
    } catch (InterruptedException e) {
×
500
      Thread.currentThread().interrupt();
×
501
      throw new RuntimeException(e);
×
502
    }
1✔
503
  }
1✔
504
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc