• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #333

16 Oct 2024 07:28PM UTC coverage: 78.65% (+0.6%) from 78.085%
#333

push

github

web-flow
Fix code coverage (#2275)

22670 of 28824 relevant lines covered (78.65%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.13
/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.testing;
22

23
import com.google.common.base.Preconditions;
24
import com.google.common.collect.ObjectArrays;
25
import com.google.protobuf.ByteString;
26
import com.google.protobuf.Empty;
27
import com.uber.m3.tally.NoopScope;
28
import com.uber.m3.tally.Scope;
29
import io.grpc.Status;
30
import io.grpc.StatusRuntimeException;
31
import io.temporal.api.common.v1.Payload;
32
import io.temporal.api.common.v1.WorkflowExecution;
33
import io.temporal.api.enums.v1.IndexedValueType;
34
import io.temporal.api.nexus.v1.Endpoint;
35
import io.temporal.api.nexus.v1.EndpointSpec;
36
import io.temporal.api.nexus.v1.EndpointTarget;
37
import io.temporal.api.operatorservice.v1.AddSearchAttributesRequest;
38
import io.temporal.api.operatorservice.v1.CreateNexusEndpointRequest;
39
import io.temporal.api.testservice.v1.SleepRequest;
40
import io.temporal.client.WorkflowClient;
41
import io.temporal.client.WorkflowClientOptions;
42
import io.temporal.common.WorkflowExecutionHistory;
43
import io.temporal.internal.common.ProtobufTimeUtils;
44
import io.temporal.internal.testservice.TestWorkflowService;
45
import io.temporal.serviceclient.*;
46
import io.temporal.testserver.TestServer;
47
import io.temporal.worker.Worker;
48
import io.temporal.worker.WorkerFactory;
49
import io.temporal.worker.WorkerOptions;
50
import java.time.Duration;
51
import java.util.concurrent.TimeUnit;
52
import javax.annotation.Nonnull;
53
import javax.annotation.Nullable;
54

55
public final class TestWorkflowEnvironmentInternal implements TestWorkflowEnvironment {
56

57
  private final WorkflowClientOptions workflowClientOptions;
58
  private final WorkflowServiceStubs workflowServiceStubs;
59
  private final OperatorServiceStubs operatorServiceStubs;
60
  private final @Nullable TestServiceStubs testServiceStubs;
61
  private final @Nullable TestServer.InProcessTestServer inProcessServer;
62
  private final @Nullable TestWorkflowService service;
63
  private final WorkerFactory workerFactory;
64
  private final @Nullable TimeLockingInterceptor timeLockingInterceptor;
65
  private final IdempotentTimeLocker constructorTimeLock;
66

67
  public TestWorkflowEnvironmentInternal(@Nullable TestEnvironmentOptions testEnvironmentOptions) {
1✔
68
    if (testEnvironmentOptions == null) {
1✔
69
      testEnvironmentOptions = TestEnvironmentOptions.getDefaultInstance();
×
70
    }
71
    this.workflowClientOptions =
1✔
72
        WorkflowClientOptions.newBuilder(testEnvironmentOptions.getWorkflowClientOptions())
1✔
73
            .validateAndBuildWithDefaults();
1✔
74

75
    WorkflowServiceStubsOptions.Builder stubsOptionsBuilder =
76
        testEnvironmentOptions.getWorkflowServiceStubsOptions() != null
1✔
77
            ? WorkflowServiceStubsOptions.newBuilder(
1✔
78
                testEnvironmentOptions.getWorkflowServiceStubsOptions())
1✔
79
            : WorkflowServiceStubsOptions.newBuilder();
1✔
80

81
    Scope metricsScope = testEnvironmentOptions.getMetricsScope();
1✔
82
    if (metricsScope != null && !(NoopScope.class.equals(metricsScope.getClass()))) {
1✔
83
      stubsOptionsBuilder = stubsOptionsBuilder.setMetricsScope(metricsScope);
1✔
84
    }
85

86
    if (testEnvironmentOptions.isUseExternalService()) {
1✔
87
      this.inProcessServer = null;
×
88
      this.service = null;
×
89
      this.workflowServiceStubs =
×
90
          WorkflowServiceStubs.newServiceStubs(
×
91
              stubsOptionsBuilder.setTarget(testEnvironmentOptions.getTarget()).build());
×
92
      this.testServiceStubs = null;
×
93
      this.timeLockingInterceptor = null;
×
94
      this.constructorTimeLock = null;
×
95
    } else {
96
      this.inProcessServer =
1✔
97
          TestServer.createServer(true, testEnvironmentOptions.getInitialTimeMillis());
1✔
98
      this.service = fetchWorkflowService();
1✔
99

100
      WorkflowServiceStubsOptions workflowServiceStubsOptions =
1✔
101
          stubsOptionsBuilder
102
              .setChannel(this.inProcessServer.getChannel())
1✔
103
              .setTarget(null)
1✔
104
              .validateAndBuildWithDefaults();
1✔
105
      this.workflowServiceStubs = WorkflowServiceStubs.newServiceStubs(workflowServiceStubsOptions);
1✔
106
      this.testServiceStubs =
1✔
107
          TestServiceStubs.newServiceStubs(
1✔
108
              TestServiceStubsOptions.newBuilder(workflowServiceStubsOptions)
1✔
109
                  // we don't want long calls to test service to throw with DEADLINE_EXCEEDED
110
                  .setRpcTimeout(Duration.ofMillis(Long.MAX_VALUE))
1✔
111
                  .validateAndBuildWithDefaults());
1✔
112
      this.timeLockingInterceptor = new TimeLockingInterceptor(this.testServiceStubs);
1✔
113

114
      if (!testEnvironmentOptions.isUseTimeskipping()) {
1✔
115
        // If the options ask for no timeskipping, lock one extra time. There will never be a
116
        // corresponding unlock, so timeskipping will always be off.
117
        this.constructorTimeLock = new IdempotentTimeLocker(this.testServiceStubs);
1✔
118
        this.constructorTimeLock.lockTimeSkipping();
1✔
119
      } else {
120
        this.constructorTimeLock = null;
1✔
121
      }
122
    }
123

124
    this.operatorServiceStubs =
1✔
125
        OperatorServiceStubs.newServiceStubs(
1✔
126
            OperatorServiceStubsOptions.newBuilder()
1✔
127
                .setChannel(workflowServiceStubs.getRawChannel())
1✔
128
                .validateAndBuildWithDefaults());
1✔
129

130
    WorkflowClient client =
1✔
131
        WorkflowClient.newInstance(this.workflowServiceStubs, this.workflowClientOptions);
1✔
132
    this.workerFactory =
1✔
133
        WorkerFactory.newInstance(client, testEnvironmentOptions.getWorkerFactoryOptions());
1✔
134

135
    testEnvironmentOptions.getSearchAttributes().forEach(this::registerSearchAttribute);
1✔
136
  }
1✔
137

138
  @SuppressWarnings("deprecation")
139
  private TestWorkflowService fetchWorkflowService() {
140
    return this.inProcessServer.getWorkflowService();
1✔
141
  }
142

143
  @Override
144
  public Worker newWorker(String taskQueue) {
145
    return workerFactory.newWorker(taskQueue, WorkerOptions.getDefaultInstance());
1✔
146
  }
147

148
  @Override
149
  public Worker newWorker(String taskQueue, WorkerOptions options) {
150
    return workerFactory.newWorker(taskQueue, options);
1✔
151
  }
152

153
  @Override
154
  public WorkflowClient getWorkflowClient() {
155
    WorkflowClientOptions options;
156
    if (timeLockingInterceptor != null) {
1✔
157
      options =
1✔
158
          WorkflowClientOptions.newBuilder(workflowClientOptions)
1✔
159
              .setInterceptors(
1✔
160
                  ObjectArrays.concat(
1✔
161
                      workflowClientOptions.getInterceptors(), timeLockingInterceptor))
1✔
162
              .build();
1✔
163
    } else {
164
      options = workflowClientOptions;
×
165
    }
166
    return WorkflowClient.newInstance(workflowServiceStubs, options);
1✔
167
  }
168

169
  @Override
170
  public long currentTimeMillis() {
171
    if (testServiceStubs != null) {
1✔
172
      return ProtobufTimeUtils.toJavaInstant(
1✔
173
              testServiceStubs.blockingStub().getCurrentTime(Empty.newBuilder().build()).getTime())
1✔
174
          .toEpochMilli();
1✔
175
    } else {
176
      return System.currentTimeMillis();
×
177
    }
178
  }
179

180
  @Override
181
  public void sleep(Duration duration) {
182
    if (testServiceStubs != null) {
1✔
183
      testServiceStubs
1✔
184
          .blockingStub()
1✔
185
          .unlockTimeSkippingWithSleep(
1✔
186
              SleepRequest.newBuilder()
1✔
187
                  .setDuration(ProtobufTimeUtils.toProtoDuration(duration))
1✔
188
                  .build());
1✔
189
    } else {
190
      try {
191
        Thread.sleep(duration.toMillis());
×
192
      } catch (InterruptedException e) {
×
193
        Thread.currentThread().interrupt();
×
194
        throw new RuntimeException(e);
×
195
      }
×
196
    }
197
  }
1✔
198

199
  @Override
200
  public void registerDelayedCallback(Duration delay, Runnable r) {
201
    Preconditions.checkState(
1✔
202
        service != null, "registerDelayedCallback is not supported with the external service");
203
    service.registerDelayedCallback(delay, r);
1✔
204
  }
1✔
205

206
  @Override
207
  public boolean registerSearchAttribute(String name, IndexedValueType type) {
208
    if (IndexedValueType.INDEXED_VALUE_TYPE_UNSPECIFIED.equals(type)) {
1✔
209
      throw new IllegalArgumentException(
×
210
          "Class " + type + " can't be used as a search attribute type");
211
    }
212
    AddSearchAttributesRequest request =
213
        AddSearchAttributesRequest.newBuilder()
1✔
214
            .setNamespace(getNamespace())
1✔
215
            .putSearchAttributes(name, type)
1✔
216
            .build();
1✔
217
    try {
218
      operatorServiceStubs.blockingStub().addSearchAttributes(request);
1✔
219
      return true;
1✔
220
    } catch (StatusRuntimeException e) {
×
221
      if (Status.Code.ALREADY_EXISTS.equals(e.getStatus().getCode())) {
×
222
        return false;
×
223
      }
224
      throw e;
×
225
    }
226
  }
227

228
  @Override
229
  public Endpoint createNexusEndpoint(String name, String taskQueue) {
230
    EndpointSpec spec =
231
        EndpointSpec.newBuilder()
1✔
232
            .setName(name)
1✔
233
            .setDescription(
1✔
234
                Payload.newBuilder()
1✔
235
                    .setData(
1✔
236
                        ByteString.copyFromUtf8(
1✔
237
                            "Test Nexus endpoint created by the Java SDK WorkflowTestEnvironment")))
238
            .setTarget(
1✔
239
                EndpointTarget.newBuilder()
1✔
240
                    .setWorker(
1✔
241
                        EndpointTarget.Worker.newBuilder()
1✔
242
                            .setNamespace(getNamespace())
1✔
243
                            .setTaskQueue(taskQueue)))
1✔
244
            .build();
1✔
245
    CreateNexusEndpointRequest request =
246
        CreateNexusEndpointRequest.newBuilder().setSpec(spec).build();
1✔
247
    return operatorServiceStubs.blockingStub().createNexusEndpoint(request).getEndpoint();
1✔
248
  }
249

250
  public void deleteNexusEndpoint(Endpoint endpoint) {
251
    operatorServiceStubs
1✔
252
        .blockingStub()
1✔
253
        .deleteNexusEndpoint(
1✔
254
            io.temporal.api.operatorservice.v1.DeleteNexusEndpointRequest.newBuilder()
1✔
255
                .setId(endpoint.getId())
1✔
256
                .setVersion(endpoint.getVersion())
1✔
257
                .build());
1✔
258
  }
1✔
259

260
  @Deprecated
261
  public WorkflowServiceStubs getWorkflowService() {
262
    return getWorkflowServiceStubs();
×
263
  }
264

265
  @Override
266
  public WorkflowServiceStubs getWorkflowServiceStubs() {
267
    return workflowServiceStubs;
1✔
268
  }
269

270
  @Override
271
  public OperatorServiceStubs getOperatorServiceStubs() {
272
    return operatorServiceStubs;
1✔
273
  }
274

275
  @Override
276
  public String getNamespace() {
277
    return workflowClientOptions.getNamespace();
1✔
278
  }
279

280
  @Override
281
  public String getDiagnostics() {
282
    Preconditions.checkState(
×
283
        service != null, "getDiagnostics is not supported with the external service");
284
    StringBuilder result = new StringBuilder();
×
285
    service.getDiagnostics(result);
×
286
    return result.toString();
×
287
  }
288

289
  @Override
290
  @Deprecated
291
  public WorkflowExecutionHistory getWorkflowExecutionHistory(
292
      @Nonnull WorkflowExecution execution) {
293
    Preconditions.checkNotNull(execution, "execution is required");
×
294
    return getWorkflowClient().fetchHistory(execution.getWorkflowId(), execution.getRunId());
×
295
  }
296

297
  @Override
298
  public void close() {
299
    if (testServiceStubs != null) {
1✔
300
      testServiceStubs.shutdownNow();
1✔
301
    }
302
    operatorServiceStubs.shutdownNow();
1✔
303
    workerFactory.shutdownNow();
1✔
304
    workerFactory.awaitTermination(10, TimeUnit.SECONDS);
1✔
305
    if (constructorTimeLock != null) {
1✔
306
      constructorTimeLock.unlockTimeSkipping();
1✔
307
    }
308
    workflowServiceStubs.shutdownNow();
1✔
309
    if (testServiceStubs != null) {
1✔
310
      testServiceStubs.awaitTermination(1, TimeUnit.SECONDS);
1✔
311
    }
312
    operatorServiceStubs.awaitTermination(1, TimeUnit.SECONDS);
1✔
313
    workflowServiceStubs.awaitTermination(1, TimeUnit.SECONDS);
1✔
314
    if (inProcessServer != null) {
1✔
315
      inProcessServer.close();
1✔
316
    }
317
  }
1✔
318

319
  @Override
320
  public void start() {
321
    workerFactory.start();
1✔
322
  }
1✔
323

324
  @Override
325
  public boolean isStarted() {
326
    return workerFactory.isStarted();
1✔
327
  }
328

329
  @Override
330
  public boolean isShutdown() {
331
    return workerFactory.isShutdown();
×
332
  }
333

334
  @Override
335
  public boolean isTerminated() {
336
    return workerFactory.isTerminated();
×
337
  }
338

339
  @Override
340
  @Deprecated
341
  public void shutdownTestService() {
342
    if (service != null) {
1✔
343
      service.close();
1✔
344
    }
345
  }
1✔
346

347
  @Override
348
  public void shutdown() {
349
    workerFactory.shutdown();
1✔
350
  }
1✔
351

352
  @Override
353
  public void shutdownNow() {
354
    workerFactory.shutdownNow();
1✔
355
  }
1✔
356

357
  @Override
358
  public void awaitTermination(long timeout, TimeUnit unit) {
359
    workerFactory.awaitTermination(timeout, unit);
1✔
360
  }
1✔
361

362
  @Override
363
  public WorkerFactory getWorkerFactory() {
364
    return workerFactory;
1✔
365
  }
366
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc