• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #153

pending completion
#153

push

github-actions

web-flow
Eager Workflow Task Dispatch (#1674)

Issue #1646

Signed-off-by: Dmitry Spikhalskiy <dmitry@spikhalskiy.com>

213 of 213 new or added lines in 22 files covered. (100.0%)

16682 of 20566 relevant lines covered (81.11%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.82
/temporal-sdk/src/main/java/io/temporal/worker/Worker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.testing;
22

23
import com.uber.m3.tally.Scope;
24
import io.temporal.api.common.v1.WorkflowExecution;
25
import io.temporal.api.enums.v1.IndexedValueType;
26
import io.temporal.api.history.v1.History;
27
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
28
import io.temporal.client.WorkflowClient;
29
import io.temporal.client.WorkflowClientOptions;
30
import io.temporal.client.WorkflowOptions;
31
import io.temporal.client.WorkflowStub;
32
import io.temporal.common.interceptors.WorkerInterceptor;
33
import io.temporal.internal.common.env.DebugModeUtils;
34
import io.temporal.internal.docker.RegisterTestNamespace;
35
import io.temporal.serviceclient.WorkflowServiceStubs;
36
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
37
import io.temporal.worker.Worker;
38
import io.temporal.worker.WorkerFactoryOptions;
39
import io.temporal.worker.WorkerOptions;
40
import io.temporal.worker.WorkflowImplementationOptions;
41
import java.time.Instant;
42
import java.util.HashMap;
43
import java.util.Map;
44
import java.util.UUID;
45
import javax.annotation.Nonnull;
46
import javax.annotation.Nullable;
47
import org.junit.Test;
48
import org.junit.rules.TestRule;
49
import org.junit.rules.TestWatcher;
50
import org.junit.rules.Timeout;
51
import org.junit.runner.Description;
52
import org.junit.runners.model.Statement;
53

54
/**
55
 * JUnit4
56
 *
57
 * <p>Test rule that sets up test environment, simplifying workflow worker creation and shutdown.
58
 * Can be used with both in-memory and standalone temporal service. (see {@link
59
 * Builder#setUseExternalService(boolean)} and {@link Builder#setTarget(String)}})
60
 *
61
 * <p>Example of usage:
62
 *
63
 * <pre><code>
64
 *   public class MyTest {
65
 *
66
 *  {@literal @}Rule
67
 *   public TestWorkflowRule workflowRule =
68
 *       TestWorkflowRule.newBuilder()
69
 *           .setWorkflowTypes(TestWorkflowImpl.class)
70
 *           .setActivityImplementations(new TestActivities())
71
 *           .build();
72
 *
73
 *  {@literal @}Test
74
 *   public void testMyWorkflow() {
75
 *       TestWorkflow workflow = workflowRule.getWorkflowClient().newWorkflowStub(
76
 *                 TestWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(workflowRule.getTaskQueue()).build());
77
 *       ...
78
 *   }
79
 * </code></pre>
80
 */
81
public class TestWorkflowRule implements TestRule {
82

83
  private final String namespace;
84
  private final boolean useExternalService;
85
  private final boolean doNotStart;
86
  @Nullable private final Timeout globalTimeout;
87

88
  private final Class<?>[] workflowTypes;
89
  private final Object[] activityImplementations;
90
  private final WorkflowServiceStubsOptions serviceStubsOptions;
91
  private final WorkflowClientOptions clientOptions;
92
  private final WorkerFactoryOptions workerFactoryOptions;
93
  private final WorkflowImplementationOptions workflowImplementationOptions;
94
  private final WorkerOptions workerOptions;
95
  private final String target;
96
  private final boolean useTimeskipping;
97
  private final Scope metricsScope;
98

99
  @Nonnull private final Map<String, IndexedValueType> searchAttributes;
100

101
  private String taskQueue;
102
  private final TestWorkflowEnvironment testEnvironment;
103
  private final TestWatcher watchman =
104
      new TestWatcher() {
105
        @Override
106
        protected void failed(Throwable e, Description description) {
107
          System.err.println("WORKFLOW EXECUTION HISTORIES:\n" + testEnvironment.getDiagnostics());
108
        }
109
      };
110

111
  private TestWorkflowRule(Builder builder) {
112
    this.doNotStart = builder.doNotStart;
113
    this.useExternalService = builder.useExternalService;
114
    this.namespace =
115
        (builder.namespace == null) ? RegisterTestNamespace.NAMESPACE : builder.namespace;
116
    this.workflowTypes = (builder.workflowTypes == null) ? new Class[0] : builder.workflowTypes;
117
    this.activityImplementations =
118
        (builder.activityImplementations == null) ? new Object[0] : builder.activityImplementations;
119
    this.serviceStubsOptions =
120
        (builder.workflowServiceStubsOptions == null)
121
            ? WorkflowServiceStubsOptions.newBuilder().build()
122
            : builder.workflowServiceStubsOptions;
123
    this.clientOptions =
124
        (builder.workflowClientOptions == null)
125
            ? WorkflowClientOptions.newBuilder().setNamespace(namespace).build()
126
            : builder.workflowClientOptions.toBuilder().setNamespace(namespace).build();
127
    this.workerOptions =
128
        (builder.workerOptions == null)
129
            ? WorkerOptions.newBuilder().build()
130
            : builder.workerOptions;
131
    this.workerFactoryOptions =
132
        (builder.workerFactoryOptions == null)
133
            ? WorkerFactoryOptions.newBuilder().build()
134
            : builder.workerFactoryOptions;
135
    this.workflowImplementationOptions =
136
        (builder.workflowImplementationOptions == null)
137
            ? WorkflowImplementationOptions.newBuilder().build()
138
            : builder.workflowImplementationOptions;
139
    this.globalTimeout =
140
        !DebugModeUtils.isTemporalDebugModeOn() && builder.testTimeoutSeconds != 0
141
            ? Timeout.seconds(builder.testTimeoutSeconds)
142
            : null;
143

144
    this.target = builder.target;
145
    this.useTimeskipping = builder.useTimeskipping;
146
    this.metricsScope = builder.metricsScope;
147
    this.searchAttributes = builder.searchAttributes;
148

149
    this.testEnvironment =
150
        TestWorkflowEnvironment.newInstance(createTestEnvOptions(builder.initialTimeMillis));
151
  }
152

153
  protected TestEnvironmentOptions createTestEnvOptions(long initialTimeMillis) {
154
    return TestEnvironmentOptions.newBuilder()
155
        .setWorkflowServiceStubsOptions(serviceStubsOptions)
156
        .setWorkflowClientOptions(clientOptions)
157
        .setWorkerFactoryOptions(workerFactoryOptions)
158
        .setUseExternalService(useExternalService)
159
        .setUseTimeskipping(useTimeskipping)
160
        .setTarget(target)
161
        .setInitialTimeMillis(initialTimeMillis)
162
        .setMetricsScope(metricsScope)
163
        .setSearchAttributes(searchAttributes)
164
        .build();
165
  }
166

167
  public static Builder newBuilder() {
168
    return new Builder();
169
  }
170

171
  public static class Builder {
172

173
    private String namespace;
174
    private String target;
175
    private boolean useExternalService;
176
    private boolean doNotStart;
177
    private long initialTimeMillis;
178
    // Default to TestEnvironmentOptions isUseTimeskipping
179
    private boolean useTimeskipping =
180
        TestEnvironmentOptions.getDefaultInstance().isUseTimeskipping();
181

182
    private Class<?>[] workflowTypes;
183
    private Object[] activityImplementations;
184
    private WorkflowServiceStubsOptions workflowServiceStubsOptions;
185
    private WorkflowClientOptions workflowClientOptions;
186
    private WorkerFactoryOptions workerFactoryOptions;
187
    private WorkflowImplementationOptions workflowImplementationOptions;
188
    private WorkerOptions workerOptions;
189
    private long testTimeoutSeconds;
190
    @Nonnull private final Map<String, IndexedValueType> searchAttributes = new HashMap<>();
191
    private Scope metricsScope;
192

193
    protected Builder() {}
194

195
    public Builder setWorkerOptions(WorkerOptions options) {
196
      this.workerOptions = options;
197
      return this;
198
    }
199

200
    public void setWorkflowServiceStubsOptions(
201
        WorkflowServiceStubsOptions workflowServiceStubsOptions) {
202
      this.workflowServiceStubsOptions = workflowServiceStubsOptions;
203
    }
204

205
    /**
206
     * Override {@link WorkflowClientOptions} for test environment. If set, takes precedence over
207
     * {@link #setNamespace(String) namespace}.
208
     */
209
    public Builder setWorkflowClientOptions(WorkflowClientOptions workflowClientOptions) {
210
      this.workflowClientOptions = workflowClientOptions;
211
      return this;
212
    }
213

214
    public Builder setWorkerFactoryOptions(WorkerFactoryOptions options) {
215
      this.workerFactoryOptions = options;
216
      return this;
217
    }
218

219
    public Builder setNamespace(String namespace) {
220
      this.namespace = namespace;
221
      return this;
222
    }
223

224
    public Builder setWorkflowTypes(Class<?>... workflowTypes) {
225
      this.workflowTypes = workflowTypes;
226
      return this;
227
    }
228

229
    public Builder setWorkflowTypes(
230
        WorkflowImplementationOptions implementationOptions, Class<?>... workflowTypes) {
231
      this.workflowImplementationOptions = implementationOptions;
232
      this.workflowTypes = workflowTypes;
233
      return this;
234
    }
235

236
    public Builder setActivityImplementations(Object... activityImplementations) {
237
      this.activityImplementations = activityImplementations;
238
      return this;
239
    }
240

241
    /**
242
     * Switches between in-memory and external temporal service implementations.
243
     *
244
     * @param useExternalService use external service if true.
245
     *     <p>Default is false.
246
     */
247
    public Builder setUseExternalService(boolean useExternalService) {
248
      this.useExternalService = useExternalService;
249
      return this;
250
    }
251

252
    /**
253
     * Sets TestEnvironmentOptions.setUseTimeskippings. If true, no actual wall-clock time will pass
254
     * when a workflow sleeps or sets a timer.
255
     *
256
     * <p>Default is true
257
     */
258
    public Builder setUseTimeskipping(boolean useTimeskipping) {
259
      this.useTimeskipping = useTimeskipping;
260
      return this;
261
    }
262

263
    /**
264
     * Optional parameter that defines an endpoint which will be used for the communication with
265
     * standalone temporal service. Has no effect if {@link #setUseExternalService(boolean)} is set
266
     * to false.
267
     *
268
     * <p>Default is to use 127.0.0.1:7233
269
     */
270
    public Builder setTarget(String target) {
271
      this.target = target;
272
      return this;
273
    }
274

275
    /**
276
     * @deprecated Temporal test rule shouldn't be responsible for enforcing test timeouts. Use
277
     *     toolchain of your test framework to enforce timeouts.
278
     */
279
    @Deprecated
280
    public Builder setTestTimeoutSeconds(long testTimeoutSeconds) {
281
      this.testTimeoutSeconds = testTimeoutSeconds;
282
      return this;
283
    }
284

285
    /**
286
     * Set the initial time for the workflow virtual clock, milliseconds since epoch.
287
     *
288
     * <p>Default is current time
289
     */
290
    public Builder setInitialTimeMillis(long initialTimeMillis) {
291
      this.initialTimeMillis = initialTimeMillis;
292
      return this;
293
    }
294

295
    /**
296
     * Set the initial time for the workflow virtual clock.
297
     *
298
     * <p>Default is current time
299
     */
300
    public Builder setInitialTime(Instant initialTime) {
301
      this.initialTimeMillis = initialTime.toEpochMilli();
302
      return this;
303
    }
304

305
    /**
306
     * When set to true the {@link TestWorkflowEnvironment#start()} is not called by the rule before
307
     * executing the test. This to support tests that register activities and workflows with workers
308
     * directly instead of using only {@link TestWorkflowRule.Builder}.
309
     */
310
    public Builder setDoNotStart(boolean doNotStart) {
311
      this.doNotStart = doNotStart;
312
      return this;
313
    }
314

315
    /**
316
     * Add a search attribute to be registered on the Temporal Server.
317
     *
318
     * @param name name of the search attribute
319
     * @param type search attribute type
320
     * @return {@code this}
321
     * @see <a
322
     *     href="https://docs.temporal.io/docs/tctl/how-to-add-a-custom-search-attribute-to-a-cluster-using-tctl">Add
323
     *     a Custom Search Attribute Using tctl</a>
324
     */
325
    public Builder registerSearchAttribute(String name, IndexedValueType type) {
326
      this.searchAttributes.put(name, type);
327
      return this;
328
    }
329

330
    /**
331
     * Sets the scope to be used for metrics reporting. Optional. Default is to not report metrics.
332
     *
333
     * <p>Note: Don't mock {@link Scope} in tests! If you need to verify the metrics behavior,
334
     * create a real Scope and mock, stub or spy a reporter instance:<br>
335
     *
336
     * <pre>{@code
337
     * StatsReporter reporter = mock(StatsReporter.class);
338
     * Scope metricsScope =
339
     *     new RootScopeBuilder()
340
     *         .reporter(reporter)
341
     *         .reportEvery(com.uber.m3.util.Duration.ofMillis(10));
342
     * }</pre>
343
     *
344
     * @param metricsScope the scope to be used for metrics reporting.
345
     * @return {@code this}
346
     */
347
    public Builder setMetricsScope(Scope metricsScope) {
348
      this.metricsScope = metricsScope;
349
      return this;
350
    }
351

352
    public TestWorkflowRule build() {
353
      return new TestWorkflowRule(this);
354
    }
355
  }
356

357
  @Override
358
  public Statement apply(Statement base, Description description) {
359
    taskQueue = init(description);
360
    Statement testWorkflowStatement =
361
        new Statement() {
362
          @Override
363
          public void evaluate() throws Throwable {
364
            start();
365
            base.evaluate();
366
            shutdown();
367
          }
368
        };
369

370
    Test annotation = description.getAnnotation(Test.class);
371
    boolean timeoutIsOverriddenOnTestAnnotation = annotation != null && annotation.timeout() > 0;
372

373
    if (globalTimeout != null && !timeoutIsOverriddenOnTestAnnotation) {
374
      testWorkflowStatement = globalTimeout.apply(testWorkflowStatement, description);
375
    }
376

377
    return watchman.apply(testWorkflowStatement, description);
378
  }
379

380
  private String init(Description description) {
381
    String testMethod = description.getMethodName();
382
    String taskQueue = "WorkflowTest-" + testMethod + "-" + UUID.randomUUID();
383
    Worker worker = testEnvironment.newWorker(taskQueue, workerOptions);
384
    worker.registerWorkflowImplementationTypes(workflowImplementationOptions, workflowTypes);
385
    worker.registerActivitiesImplementations(activityImplementations);
386
    return taskQueue;
387
  }
388

389
  private void start() {
390
    if (!doNotStart) {
391
      testEnvironment.start();
392
    }
393
  }
394

395
  protected void shutdown() {
396
    testEnvironment.close();
397
  }
398

399
  /**
400
   * See {@link Builder#setUseExternalService(boolean)}
401
   *
402
   * @return true if the rule is using external temporal service.
403
   */
404
  public boolean isUseExternalService() {
405
    return useExternalService;
406
  }
407

408
  public TestWorkflowEnvironment getTestEnvironment() {
409
    return testEnvironment;
410
  }
411

412
  /**
413
   * @return name of the task queue that test worker is polling.
414
   */
415
  public String getTaskQueue() {
416
    return taskQueue;
417
  }
418

419
  /**
420
   * @return client to the Temporal service used to start and query workflows.
421
   */
422
  public WorkflowClient getWorkflowClient() {
423
    return testEnvironment.getWorkflowClient();
424
  }
425

426
  /**
427
   * @return stubs connected to the test server (in-memory or external)
428
   */
429
  public WorkflowServiceStubs getWorkflowServiceStubs() {
430
    return testEnvironment.getWorkflowServiceStubs();
431
  }
432

433
  /**
434
   * @return blockingStub
435
   */
436
  public WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub() {
437
    return getWorkflowServiceStubs().blockingStub();
438
  }
439

440
  /**
441
   * @return tracer.
442
   */
443
  public <T extends WorkerInterceptor> T getInterceptor(Class<T> type) {
444
    if (workerFactoryOptions.getWorkerInterceptors() != null) {
445
      for (WorkerInterceptor interceptor : workerFactoryOptions.getWorkerInterceptors()) {
446
        if (type.isInstance(interceptor)) {
447
          return type.cast(interceptor);
448
        }
449
      }
450
    }
451
    return null;
452
  }
453

454
  /**
455
   * @return workflow execution history
456
   * @deprecated use {@link WorkflowClient#fetchHistory(String, String)}. To obtain a WorkflowClient
457
   *     use {@link #getWorkflowClient()}
458
   */
459
  @Deprecated
460
  public History getHistory(@Nonnull WorkflowExecution execution) {
461
    return testEnvironment.getWorkflowExecutionHistory(execution).getHistory();
462
  }
463

464
  /**
465
   * @return name of the task queue that test worker is polling.
466
   * @deprecated use {@link WorkflowClient#fetchHistory(String, String)}. To obtain a WorkflowClient
467
   *     use {@link #getWorkflowClient()}
468
   */
469
  @Deprecated
470
  public History getWorkflowExecutionHistory(WorkflowExecution execution) {
471
    return testEnvironment.getWorkflowExecutionHistory(execution).getHistory();
472
  }
473

474
  /**
475
   * This worker listens to the default task queue which is obtainable via the {@link
476
   * #getTaskQueue()} method.
477
   *
478
   * @return the default worker created for each test method.
479
   */
480
  public Worker getWorker() {
481
    return testEnvironment.getWorkerFactory().getWorker(getTaskQueue());
482
  }
483

484
  public <T> T newWorkflowStub(Class<T> workflow) {
485
    return getWorkflowClient()
486
        .newWorkflowStub(workflow, newWorkflowOptionsForTaskQueue(getTaskQueue()));
487
  }
488

489
  public WorkflowStub newUntypedWorkflowStub(String workflow) {
490
    return getWorkflowClient()
491
        .newUntypedWorkflowStub(workflow, newWorkflowOptionsForTaskQueue(getTaskQueue()));
492
  }
493

494
  private static WorkflowOptions newWorkflowOptionsForTaskQueue(String taskQueue) {
495
    return WorkflowOptions.newBuilder().setTaskQueue(taskQueue).build();
496
  }
497
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc