• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #169

pending completion
#169

push

github-actions

web-flow
Remove use of deprecated API (#1758)

4 of 4 new or added lines in 1 file covered. (100.0%)

17345 of 21558 relevant lines covered (80.46%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.21
/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import static io.temporal.internal.testservice.CronUtils.getBackoffInterval;
24

25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Throwables;
27
import com.google.protobuf.ByteString;
28
import com.google.protobuf.Empty;
29
import com.google.protobuf.Timestamp;
30
import com.google.protobuf.util.Timestamps;
31
import io.grpc.*;
32
import io.grpc.stub.StreamObserver;
33
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
34
import io.temporal.api.common.v1.Payload;
35
import io.temporal.api.common.v1.Payloads;
36
import io.temporal.api.common.v1.RetryPolicy;
37
import io.temporal.api.common.v1.WorkflowExecution;
38
import io.temporal.api.enums.v1.NamespaceState;
39
import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause;
40
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
41
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
42
import io.temporal.api.errordetails.v1.WorkflowExecutionAlreadyStartedFailure;
43
import io.temporal.api.failure.v1.Failure;
44
import io.temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes;
45
import io.temporal.api.namespace.v1.NamespaceInfo;
46
import io.temporal.api.testservice.v1.LockTimeSkippingRequest;
47
import io.temporal.api.testservice.v1.SleepRequest;
48
import io.temporal.api.testservice.v1.UnlockTimeSkippingRequest;
49
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
50
import io.temporal.api.workflowservice.v1.*;
51
import io.temporal.internal.common.ProtobufTimeUtils;
52
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowState;
53
import io.temporal.serviceclient.StatusUtils;
54
import io.temporal.serviceclient.WorkflowServiceStubs;
55
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
56
import java.io.Closeable;
57
import java.io.IOException;
58
import java.time.Clock;
59
import java.time.Duration;
60
import java.util.*;
61
import java.util.concurrent.*;
62
import java.util.concurrent.locks.Lock;
63
import java.util.concurrent.locks.ReentrantLock;
64
import javax.annotation.Nonnull;
65
import javax.annotation.Nullable;
66
import org.slf4j.Logger;
67
import org.slf4j.LoggerFactory;
68

69
/**
70
 * In memory implementation of the Workflow Service. To be used for testing purposes only.
71
 *
72
 * <p>Do not use directly, instead use {@link io.temporal.testing.TestWorkflowEnvironment}.
73
 */
74
public final class TestWorkflowService extends WorkflowServiceGrpc.WorkflowServiceImplBase
75
    implements Closeable {
76
  private static final Logger log = LoggerFactory.getLogger(TestWorkflowService.class);
1✔
77
  private final Map<ExecutionId, TestWorkflowMutableState> executions = new HashMap<>();
1✔
78
  // key->WorkflowId
79
  private final Map<WorkflowId, TestWorkflowMutableState> executionsByWorkflowId = new HashMap<>();
1✔
80
  private final ExecutorService executor = Executors.newCachedThreadPool();
1✔
81
  private final Lock lock = new ReentrantLock();
1✔
82

83
  private final TestWorkflowStore store;
84
  private final TestVisibilityStore visibilityStore;
85
  private final SelfAdvancingTimer selfAdvancingTimer;
86

87
  private final ScheduledExecutorService backgroundScheduler =
1✔
88
      Executors.newSingleThreadScheduledExecutor();
1✔
89

90
  private final Server outOfProcessServer;
91
  private final InProcessGRPCServer inProcessServer;
92
  private final WorkflowServiceStubs workflowServiceStubs;
93

94
  TestWorkflowService(
95
      TestWorkflowStore store,
96
      TestVisibilityStore visibilityStore,
97
      SelfAdvancingTimer selfAdvancingTimer) {
1✔
98
    this.store = store;
1✔
99
    this.visibilityStore = visibilityStore;
1✔
100
    this.selfAdvancingTimer = selfAdvancingTimer;
1✔
101
    this.outOfProcessServer = null;
1✔
102
    this.inProcessServer = null;
1✔
103
    this.workflowServiceStubs = null;
1✔
104
  }
1✔
105

106
  @Override
107
  public void close() {
108
    log.debug("Shutting down TestWorkflowService");
1✔
109

110
    log.debug("Shutting down background scheduler");
1✔
111
    backgroundScheduler.shutdown();
1✔
112

113
    if (outOfProcessServer != null) {
1✔
114
      log.info("Shutting down out-of-process GRPC server");
×
115
      outOfProcessServer.shutdown();
×
116
    }
117

118
    if (workflowServiceStubs != null) {
1✔
119
      workflowServiceStubs.shutdown();
×
120
    }
121

122
    if (inProcessServer != null) {
1✔
123
      log.info("Shutting down in-process GRPC server");
×
124
      inProcessServer.shutdown();
×
125
    }
126

127
    executor.shutdown();
1✔
128

129
    try {
130
      executor.awaitTermination(1, TimeUnit.SECONDS);
1✔
131

132
      if (outOfProcessServer != null) {
1✔
133
        outOfProcessServer.awaitTermination(1, TimeUnit.SECONDS);
×
134
      }
135

136
      if (workflowServiceStubs != null) {
1✔
137
        workflowServiceStubs.awaitTermination(1, TimeUnit.SECONDS);
×
138
      }
139

140
      if (inProcessServer != null) {
1✔
141
        inProcessServer.awaitTermination(1, TimeUnit.SECONDS);
×
142
      }
143

144
    } catch (InterruptedException e) {
×
145
      Thread.currentThread().interrupt();
×
146
      log.debug("shutdown interrupted", e);
×
147
    }
1✔
148

149
    store.close();
1✔
150
  }
1✔
151

152
  private TestWorkflowMutableState getMutableState(ExecutionId executionId) {
153
    return getMutableState(executionId, true);
1✔
154
  }
155

156
  private TestWorkflowMutableState getMutableState(ExecutionId executionId, boolean failNotExists) {
157
    lock.lock();
1✔
158
    try {
159
      if (executionId.getExecution().getRunId().isEmpty()) {
1✔
160
        return getMutableState(executionId.getWorkflowId(), failNotExists);
1✔
161
      }
162
      TestWorkflowMutableState mutableState = executions.get(executionId);
1✔
163
      if (mutableState == null && failNotExists) {
1✔
164
        throw Status.NOT_FOUND
1✔
165
            .withDescription(
1✔
166
                "Execution \""
167
                    + executionId
168
                    + "\" not found in mutable state. Known executions: "
169
                    + executions.values()
1✔
170
                    + ", service="
171
                    + this)
172
            .asRuntimeException();
1✔
173
      }
174
      return mutableState;
1✔
175
    } finally {
176
      lock.unlock();
1✔
177
    }
178
  }
179

180
  private TestWorkflowMutableState getMutableState(WorkflowId workflowId, boolean failNotExists) {
181
    lock.lock();
1✔
182
    try {
183
      TestWorkflowMutableState mutableState = executionsByWorkflowId.get(workflowId);
1✔
184
      if (mutableState == null && failNotExists) {
1✔
185
        throw Status.NOT_FOUND
1✔
186
            .withDescription("Execution not found in mutable state: " + workflowId)
1✔
187
            .asRuntimeException();
1✔
188
      }
189
      return mutableState;
1✔
190
    } finally {
191
      lock.unlock();
1✔
192
    }
193
  }
194

195
  @Override
196
  public void startWorkflowExecution(
197
      StartWorkflowExecutionRequest request,
198
      StreamObserver<StartWorkflowExecutionResponse> responseObserver) {
199
    try {
200
      Duration backoffInterval = getBackoffInterval(request.getCronSchedule(), store.currentTime());
1✔
201
      StartWorkflowExecutionResponse response =
1✔
202
          startWorkflowExecutionImpl(
1✔
203
              request, backoffInterval, Optional.empty(), OptionalLong.empty(), null);
1✔
204
      responseObserver.onNext(response);
1✔
205
      responseObserver.onCompleted();
1✔
206
    } catch (StatusRuntimeException e) {
1✔
207
      handleStatusRuntimeException(e, responseObserver);
1✔
208
    }
1✔
209
  }
1✔
210

211
  StartWorkflowExecutionResponse startWorkflowExecutionImpl(
212
      StartWorkflowExecutionRequest startRequest,
213
      Duration backoffStartInterval,
214
      Optional<TestWorkflowMutableState> parent,
215
      OptionalLong parentChildInitiatedEventId,
216
      @Nullable SignalWorkflowExecutionRequest signalWithStartSignal) {
217
    String requestWorkflowId = requireNotNull("WorkflowId", startRequest.getWorkflowId());
1✔
218
    String namespace = requireNotNull("Namespace", startRequest.getNamespace());
1✔
219
    WorkflowId workflowId = new WorkflowId(namespace, requestWorkflowId);
1✔
220
    TestWorkflowMutableState existing;
221
    lock.lock();
1✔
222
    try {
223
      String newRunId = UUID.randomUUID().toString();
1✔
224
      existing = executionsByWorkflowId.get(workflowId);
1✔
225
      if (existing != null) {
1✔
226
        WorkflowExecutionStatus status = existing.getWorkflowExecutionStatus();
1✔
227
        WorkflowIdReusePolicy policy = startRequest.getWorkflowIdReusePolicy();
1✔
228

229
        if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
230
            && policy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
231
          existing.terminateWorkflowExecution(
1✔
232
              TerminateWorkflowExecutionRequest.newBuilder()
1✔
233
                  .setNamespace(startRequest.getNamespace())
1✔
234
                  .setWorkflowExecution(existing.getExecutionId().getExecution())
1✔
235
                  .setReason("TerminateIfRunning WorkflowIdReusePolicy Policy")
1✔
236
                  .setIdentity("history-service")
1✔
237
                  .setDetails(
1✔
238
                      Payloads.newBuilder()
1✔
239
                          .addPayloads(
1✔
240
                              Payload.newBuilder()
1✔
241
                                  .setData(
1✔
242
                                      ByteString.copyFromUtf8(
1✔
243
                                          String.format("terminated by new runID: %s", newRunId)))
1✔
244
                                  .build())
1✔
245
                          .build())
1✔
246
                  .build());
1✔
247
        } else if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
248
            || policy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE) {
249
          return throwDuplicatedWorkflow(startRequest, existing);
×
250
        } else if (policy
1✔
251
                == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
252
            && (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED
253
                || status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW)) {
254
          return throwDuplicatedWorkflow(startRequest, existing);
×
255
        }
256
      }
257
      Optional<TestServiceRetryState> retryState;
258
      Optional<Failure> lastFailure = Optional.empty();
1✔
259
      if (startRequest.hasRetryPolicy()) {
1✔
260
        Duration expirationInterval =
1✔
261
            ProtobufTimeUtils.toJavaDuration(startRequest.getWorkflowExecutionTimeout());
1✔
262
        retryState = newRetryStateLocked(startRequest.getRetryPolicy(), expirationInterval);
1✔
263
        if (retryState.isPresent()) {
1✔
264
          lastFailure = retryState.get().getPreviousRunFailure();
1✔
265
        }
266
      } else {
1✔
267
        retryState = Optional.empty();
1✔
268
      }
269
      return startWorkflowExecutionNoRunningCheckLocked(
1✔
270
          startRequest,
271
          newRunId,
272
          // it's the first execution in the continue-as-new chain, so firstExecutionRunId =
273
          // newRunId
274
          newRunId,
275
          Optional.empty(),
1✔
276
          retryState,
277
          backoffStartInterval,
278
          null,
279
          lastFailure,
280
          parent,
281
          parentChildInitiatedEventId,
282
          signalWithStartSignal,
283
          workflowId);
284
    } finally {
285
      lock.unlock();
1✔
286
    }
287
  }
288

289
  private Optional<TestServiceRetryState> newRetryStateLocked(
290
      RetryPolicy retryPolicy, Duration expirationInterval) {
291
    Timestamp expirationTime =
292
        expirationInterval.isZero()
1✔
293
            ? Timestamps.fromNanos(0)
1✔
294
            : Timestamps.add(
1✔
295
                store.currentTime(), ProtobufTimeUtils.toProtoDuration(expirationInterval));
1✔
296
    return Optional.of(new TestServiceRetryState(retryPolicy, expirationTime));
1✔
297
  }
298

299
  private StartWorkflowExecutionResponse throwDuplicatedWorkflow(
300
      StartWorkflowExecutionRequest startRequest, TestWorkflowMutableState existing) {
301
    WorkflowExecution execution = existing.getExecutionId().getExecution();
1✔
302
    WorkflowExecutionAlreadyStartedFailure error =
303
        WorkflowExecutionAlreadyStartedFailure.newBuilder()
1✔
304
            .setRunId(execution.getRunId())
1✔
305
            .setStartRequestId(startRequest.getRequestId())
1✔
306
            .build();
1✔
307
    throw StatusUtils.newException(
1✔
308
        Status.ALREADY_EXISTS.withDescription(
1✔
309
            String.format(
1✔
310
                "WorkflowId: %s, " + "RunId: %s", execution.getWorkflowId(), execution.getRunId())),
1✔
311
        error,
312
        WorkflowExecutionAlreadyStartedFailure.getDescriptor());
1✔
313
  }
314

315
  private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheckLocked(
316
      StartWorkflowExecutionRequest startRequest,
317
      @Nonnull String runId,
318
      @Nonnull String firstExecutionRunId,
319
      Optional<String> continuedExecutionRunId,
320
      Optional<TestServiceRetryState> retryState,
321
      Duration backoffStartInterval,
322
      Payloads lastCompletionResult,
323
      Optional<Failure> lastFailure,
324
      Optional<TestWorkflowMutableState> parent,
325
      OptionalLong parentChildInitiatedEventId,
326
      @Nullable SignalWorkflowExecutionRequest signalWithStartSignal,
327
      WorkflowId workflowId) {
328
    String namespace = startRequest.getNamespace();
1✔
329
    TestWorkflowMutableState mutableState =
1✔
330
        new TestWorkflowMutableStateImpl(
331
            startRequest,
332
            firstExecutionRunId,
333
            runId,
334
            retryState,
335
            backoffStartInterval,
336
            lastCompletionResult,
337
            lastFailure,
338
            parent,
339
            parentChildInitiatedEventId,
340
            continuedExecutionRunId,
341
            this,
342
            store,
343
            visibilityStore,
344
            selfAdvancingTimer);
345
    WorkflowExecution execution = mutableState.getExecutionId().getExecution();
1✔
346
    ExecutionId executionId = new ExecutionId(namespace, execution);
1✔
347
    executionsByWorkflowId.put(workflowId, mutableState);
1✔
348
    executions.put(executionId, mutableState);
1✔
349

350
    PollWorkflowTaskQueueRequest eagerWorkflowTaskPollRequest =
351
        startRequest.getRequestEagerExecution()
1✔
352
            ? PollWorkflowTaskQueueRequest.newBuilder()
1✔
353
                .setIdentity(startRequest.getIdentity())
1✔
354
                .setNamespace(startRequest.getNamespace())
1✔
355
                .setTaskQueue(startRequest.getTaskQueue())
1✔
356
                .build()
1✔
357
            : null;
1✔
358

359
    @Nullable
360
    PollWorkflowTaskQueueResponse eagerWorkflowTask =
1✔
361
        mutableState.startWorkflow(
1✔
362
            continuedExecutionRunId.isPresent(),
1✔
363
            signalWithStartSignal,
364
            eagerWorkflowTaskPollRequest);
365
    StartWorkflowExecutionResponse.Builder response =
366
        StartWorkflowExecutionResponse.newBuilder().setRunId(execution.getRunId());
1✔
367
    if (eagerWorkflowTask != null) {
1✔
368
      response.setEagerWorkflowTask(eagerWorkflowTask);
1✔
369
    }
370
    return response.build();
1✔
371
  }
372

373
  @Override
374
  public void getWorkflowExecutionHistory(
375
      GetWorkflowExecutionHistoryRequest getRequest,
376
      StreamObserver<GetWorkflowExecutionHistoryResponse> responseObserver) {
377
    ExecutionId executionId = new ExecutionId(getRequest.getNamespace(), getRequest.getExecution());
1✔
378
    executor.execute(
1✔
379
        // preserving gRPC context deadline between threads
380
        Context.current()
1✔
381
            .wrap(
1✔
382
                () -> {
383
                  try {
384
                    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
385
                    responseObserver.onNext(
1✔
386
                        store.getWorkflowExecutionHistory(
1✔
387
                            mutableState.getExecutionId(),
1✔
388
                            getRequest,
389
                            // We explicitly don't try to respond inside the context deadline.
390
                            // If we try to fit into the context deadline, the deadline may be not
391
                            // expired on the client side and an empty response will lead to a new
392
                            // request, making the client hammer the server at the tail end of the
393
                            // deadline.
394
                            // So this call is designed to wait fully till the end of the
395
                            // context deadline and throw DEADLINE_EXCEEDED if the deadline is less
396
                            // than 20s.
397
                            // If it's longer than 20 seconds - we return an empty result.
398
                            Deadline.after(20, TimeUnit.SECONDS)));
1✔
399
                    responseObserver.onCompleted();
1✔
400
                  } catch (StatusRuntimeException e) {
1✔
401
                    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1✔
402
                      log.error("unexpected", e);
×
403
                    }
404
                    responseObserver.onError(e);
1✔
405
                  } catch (Exception e) {
×
406
                    log.error("unexpected", e);
×
407
                    responseObserver.onError(e);
×
408
                  }
1✔
409
                }));
1✔
410
  }
1✔
411

412
  private <T> T pollTaskQueue(Context ctx, Future<T> futureValue)
413
      throws ExecutionException, InterruptedException {
414
    final Context.CancellationListener canceler = context -> futureValue.cancel(true);
1✔
415
    ctx.addListener(canceler, this.backgroundScheduler);
1✔
416
    try {
417
      return futureValue.get();
1✔
418
    } finally {
419
      ctx.removeListener(canceler);
1✔
420
    }
421
  }
422

423
  @Override
424
  public void pollWorkflowTaskQueue(
425
      PollWorkflowTaskQueueRequest pollRequest,
426
      StreamObserver<PollWorkflowTaskQueueResponse> responseObserver) {
427
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
428
      PollWorkflowTaskQueueResponse.Builder task;
429
      try {
430
        task = pollTaskQueue(ctx, store.pollWorkflowTaskQueue(pollRequest));
1✔
431
      } catch (ExecutionException e) {
×
432
        responseObserver.onError(e);
×
433
        return;
×
434
      } catch (InterruptedException e) {
×
435
        Thread.currentThread().interrupt();
×
436
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
×
437
        responseObserver.onCompleted();
×
438
        return;
×
439
      } catch (CancellationException e) {
1✔
440
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
1✔
441
        responseObserver.onCompleted();
1✔
442
        return;
1✔
443
      }
1✔
444

445
      ExecutionId executionId =
1✔
446
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
1✔
447
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
448
      try {
449
        mutableState.startWorkflowTask(task, pollRequest);
1✔
450
        // The task always has the original task queue that was created as part of the response.
451
        // This may be a different task queue than the task queue it was scheduled on, as in the
452
        // case of sticky execution.
453
        task.setWorkflowExecutionTaskQueue(mutableState.getStartRequest().getTaskQueue());
1✔
454
        PollWorkflowTaskQueueResponse response = task.build();
1✔
455
        responseObserver.onNext(response);
1✔
456
        responseObserver.onCompleted();
1✔
457
      } catch (StatusRuntimeException e) {
×
458
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
×
459
          if (log.isDebugEnabled()) {
×
460
            log.debug("Skipping outdated workflow task for " + executionId, e);
×
461
          }
462
          // The real service doesn't return this call on outdated task.
463
          // For simplicity, we return an empty result here.
464
          responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
×
465
          responseObserver.onCompleted();
×
466
        } else {
467
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
×
468
            log.error("unexpected", e);
×
469
          }
470
          responseObserver.onError(e);
×
471
        }
472
      }
1✔
473
    }
1✔
474
  }
1✔
475

476
  @Override
477
  public void respondWorkflowTaskCompleted(
478
      RespondWorkflowTaskCompletedRequest request,
479
      StreamObserver<RespondWorkflowTaskCompletedResponse> responseObserver) {
480
    try {
481
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(request.getTaskToken());
1✔
482
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
483
      mutableState.completeWorkflowTask(taskToken.getHistorySize(), request);
1✔
484
      responseObserver.onNext(RespondWorkflowTaskCompletedResponse.getDefaultInstance());
1✔
485
      responseObserver.onCompleted();
1✔
486
    } catch (StatusRuntimeException e) {
1✔
487
      handleStatusRuntimeException(e, responseObserver);
1✔
488
    } catch (Throwable e) {
×
489
      responseObserver.onError(
×
490
          Status.INTERNAL
491
              .withDescription(Throwables.getStackTraceAsString(e))
×
492
              .withCause(e)
×
493
              .asRuntimeException());
×
494
    }
1✔
495
  }
1✔
496

497
  @Override
498
  public void respondWorkflowTaskFailed(
499
      RespondWorkflowTaskFailedRequest failedRequest,
500
      StreamObserver<RespondWorkflowTaskFailedResponse> responseObserver) {
501
    try {
502
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(failedRequest.getTaskToken());
1✔
503
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
504
      mutableState.failWorkflowTask(failedRequest);
1✔
505
      responseObserver.onNext(RespondWorkflowTaskFailedResponse.getDefaultInstance());
1✔
506
      responseObserver.onCompleted();
1✔
507
    } catch (StatusRuntimeException e) {
×
508
      handleStatusRuntimeException(e, responseObserver);
×
509
    }
1✔
510
  }
1✔
511

512
  private Context.CancellableContext deadlineCtx(Deadline deadline) {
513
    return Context.current().withDeadline(deadline, this.backgroundScheduler);
1✔
514
  }
515

516
  @Override
517
  public void pollActivityTaskQueue(
518
      PollActivityTaskQueueRequest pollRequest,
519
      StreamObserver<PollActivityTaskQueueResponse> responseObserver) {
520
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
521

522
      PollActivityTaskQueueResponse.Builder task;
523
      try {
524
        task = pollTaskQueue(ctx, store.pollActivityTaskQueue(pollRequest));
1✔
525
      } catch (ExecutionException e) {
×
526
        responseObserver.onError(e);
×
527
        return;
×
528
      } catch (InterruptedException e) {
×
529
        Thread.currentThread().interrupt();
×
530
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
×
531
        responseObserver.onCompleted();
×
532
        return;
×
533
      } catch (CancellationException e) {
1✔
534
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
1✔
535
        responseObserver.onCompleted();
1✔
536
        return;
1✔
537
      }
1✔
538

539
      ExecutionId executionId =
1✔
540
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
1✔
541
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
542
      try {
543
        mutableState.startActivityTask(task, pollRequest);
1✔
544
        responseObserver.onNext(task.build());
1✔
545
        responseObserver.onCompleted();
1✔
546
      } catch (StatusRuntimeException e) {
1✔
547
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
548
          if (log.isDebugEnabled()) {
1✔
549
            log.debug("Skipping outdated activity task for " + executionId, e);
×
550
          }
551
          responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
1✔
552
          responseObserver.onCompleted();
1✔
553
        } else {
554
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
×
555
            log.error("unexpected", e);
×
556
          }
557
          responseObserver.onError(e);
×
558
        }
559
      }
1✔
560
    }
1✔
561
  }
1✔
562

563
  @Override
564
  public void recordActivityTaskHeartbeat(
565
      RecordActivityTaskHeartbeatRequest heartbeatRequest,
566
      StreamObserver<RecordActivityTaskHeartbeatResponse> responseObserver) {
567
    try {
568
      ActivityTaskToken activityTaskToken =
1✔
569
          ActivityTaskToken.fromBytes(heartbeatRequest.getTaskToken());
1✔
570
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
571
      boolean cancelRequested =
1✔
572
          mutableState.heartbeatActivityTask(
1✔
573
              activityTaskToken.getScheduledEventId(), heartbeatRequest.getDetails());
1✔
574
      responseObserver.onNext(
1✔
575
          RecordActivityTaskHeartbeatResponse.newBuilder()
1✔
576
              .setCancelRequested(cancelRequested)
1✔
577
              .build());
1✔
578
      responseObserver.onCompleted();
1✔
579
    } catch (StatusRuntimeException e) {
1✔
580
      handleStatusRuntimeException(e, responseObserver);
1✔
581
    }
1✔
582
  }
1✔
583

584
  @Override
585
  public void recordActivityTaskHeartbeatById(
586
      RecordActivityTaskHeartbeatByIdRequest heartbeatRequest,
587
      StreamObserver<RecordActivityTaskHeartbeatByIdResponse> responseObserver) {
588
    try {
589
      ExecutionId execution =
×
590
          new ExecutionId(
591
              heartbeatRequest.getNamespace(),
×
592
              heartbeatRequest.getWorkflowId(),
×
593
              heartbeatRequest.getRunId());
×
594
      TestWorkflowMutableState mutableState = getMutableState(execution);
×
595
      boolean cancelRequested =
×
596
          mutableState.heartbeatActivityTaskById(
×
597
              heartbeatRequest.getActivityId(),
×
598
              heartbeatRequest.getDetails(),
×
599
              heartbeatRequest.getIdentity());
×
600
      responseObserver.onNext(
×
601
          RecordActivityTaskHeartbeatByIdResponse.newBuilder()
×
602
              .setCancelRequested(cancelRequested)
×
603
              .build());
×
604
      responseObserver.onCompleted();
×
605
    } catch (StatusRuntimeException e) {
×
606
      handleStatusRuntimeException(e, responseObserver);
×
607
    }
×
608
  }
×
609

610
  @Override
611
  public void respondActivityTaskCompleted(
612
      RespondActivityTaskCompletedRequest completeRequest,
613
      StreamObserver<RespondActivityTaskCompletedResponse> responseObserver) {
614
    try {
615
      ActivityTaskToken activityTaskToken =
1✔
616
          ActivityTaskToken.fromBytes(completeRequest.getTaskToken());
1✔
617
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
618
      mutableState.completeActivityTask(activityTaskToken.getScheduledEventId(), completeRequest);
1✔
619
      responseObserver.onNext(RespondActivityTaskCompletedResponse.getDefaultInstance());
1✔
620
      responseObserver.onCompleted();
1✔
621
    } catch (StatusRuntimeException e) {
1✔
622
      handleStatusRuntimeException(e, responseObserver);
1✔
623
    }
1✔
624
  }
1✔
625

626
  @Override
627
  public void respondActivityTaskCompletedById(
628
      RespondActivityTaskCompletedByIdRequest completeRequest,
629
      StreamObserver<RespondActivityTaskCompletedByIdResponse> responseObserver) {
630
    try {
631
      ExecutionId executionId =
×
632
          new ExecutionId(
633
              completeRequest.getNamespace(),
×
634
              completeRequest.getWorkflowId(),
×
635
              completeRequest.getRunId());
×
636
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
637
      mutableState.completeActivityTaskById(completeRequest.getActivityId(), completeRequest);
×
638
      responseObserver.onNext(RespondActivityTaskCompletedByIdResponse.getDefaultInstance());
×
639
      responseObserver.onCompleted();
×
640
    } catch (StatusRuntimeException e) {
×
641
      handleStatusRuntimeException(e, responseObserver);
×
642
    }
×
643
  }
×
644

645
  @Override
646
  public void respondActivityTaskFailed(
647
      RespondActivityTaskFailedRequest failRequest,
648
      StreamObserver<RespondActivityTaskFailedResponse> responseObserver) {
649
    try {
650
      ActivityTaskToken activityTaskToken = ActivityTaskToken.fromBytes(failRequest.getTaskToken());
1✔
651
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
652
      mutableState.failActivityTask(activityTaskToken.getScheduledEventId(), failRequest);
1✔
653
      responseObserver.onNext(RespondActivityTaskFailedResponse.getDefaultInstance());
1✔
654
      responseObserver.onCompleted();
1✔
655
    } catch (StatusRuntimeException e) {
1✔
656
      handleStatusRuntimeException(e, responseObserver);
1✔
657
    }
1✔
658
  }
1✔
659

660
  @Override
661
  public void respondActivityTaskFailedById(
662
      RespondActivityTaskFailedByIdRequest failRequest,
663
      StreamObserver<RespondActivityTaskFailedByIdResponse> responseObserver) {
664
    try {
665
      ExecutionId executionId =
×
666
          new ExecutionId(
667
              failRequest.getNamespace(), failRequest.getWorkflowId(), failRequest.getRunId());
×
668
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
669
      mutableState.failActivityTaskById(failRequest.getActivityId(), failRequest);
×
670
      responseObserver.onNext(RespondActivityTaskFailedByIdResponse.getDefaultInstance());
×
671
      responseObserver.onCompleted();
×
672
    } catch (StatusRuntimeException e) {
×
673
      handleStatusRuntimeException(e, responseObserver);
×
674
    }
×
675
  }
×
676

677
  @Override
678
  public void respondActivityTaskCanceled(
679
      RespondActivityTaskCanceledRequest canceledRequest,
680
      StreamObserver<RespondActivityTaskCanceledResponse> responseObserver) {
681
    try {
682
      ActivityTaskToken activityTaskToken =
1✔
683
          ActivityTaskToken.fromBytes(canceledRequest.getTaskToken());
1✔
684
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
685
      mutableState.cancelActivityTask(activityTaskToken.getScheduledEventId(), canceledRequest);
1✔
686
      responseObserver.onNext(RespondActivityTaskCanceledResponse.getDefaultInstance());
1✔
687
      responseObserver.onCompleted();
1✔
688
    } catch (StatusRuntimeException e) {
×
689
      handleStatusRuntimeException(e, responseObserver);
×
690
    }
1✔
691
  }
1✔
692

693
  @Override
694
  public void respondActivityTaskCanceledById(
695
      RespondActivityTaskCanceledByIdRequest canceledRequest,
696
      StreamObserver<RespondActivityTaskCanceledByIdResponse> responseObserver) {
697
    try {
698
      ExecutionId executionId =
×
699
          new ExecutionId(
700
              canceledRequest.getNamespace(),
×
701
              canceledRequest.getWorkflowId(),
×
702
              canceledRequest.getRunId());
×
703
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
704
      mutableState.cancelActivityTaskById(canceledRequest.getActivityId(), canceledRequest);
×
705
      responseObserver.onNext(RespondActivityTaskCanceledByIdResponse.getDefaultInstance());
×
706
      responseObserver.onCompleted();
×
707
    } catch (StatusRuntimeException e) {
×
708
      handleStatusRuntimeException(e, responseObserver);
×
709
    }
×
710
  }
×
711

712
  @Override
713
  public void requestCancelWorkflowExecution(
714
      RequestCancelWorkflowExecutionRequest cancelRequest,
715
      StreamObserver<RequestCancelWorkflowExecutionResponse> responseObserver) {
716
    try {
717
      requestCancelWorkflowExecution(cancelRequest, Optional.empty());
1✔
718
      responseObserver.onNext(RequestCancelWorkflowExecutionResponse.getDefaultInstance());
1✔
719
      responseObserver.onCompleted();
1✔
720
    } catch (StatusRuntimeException e) {
1✔
721
      handleStatusRuntimeException(e, responseObserver);
1✔
722
    }
1✔
723
  }
1✔
724

725
  void requestCancelWorkflowExecution(
726
      RequestCancelWorkflowExecutionRequest cancelRequest,
727
      Optional<TestWorkflowMutableStateImpl.CancelExternalWorkflowExecutionCallerInfo> callerInfo) {
728
    ExecutionId executionId =
1✔
729
        new ExecutionId(cancelRequest.getNamespace(), cancelRequest.getWorkflowExecution());
1✔
730
    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
731
    mutableState.requestCancelWorkflowExecution(cancelRequest, callerInfo);
1✔
732
  }
1✔
733

734
  @Override
735
  public void terminateWorkflowExecution(
736
      TerminateWorkflowExecutionRequest request,
737
      StreamObserver<TerminateWorkflowExecutionResponse> responseObserver) {
738
    try {
739
      terminateWorkflowExecution(request);
1✔
740
      responseObserver.onNext(TerminateWorkflowExecutionResponse.getDefaultInstance());
1✔
741
      responseObserver.onCompleted();
1✔
742
    } catch (StatusRuntimeException e) {
1✔
743
      handleStatusRuntimeException(e, responseObserver);
1✔
744
    }
1✔
745
  }
1✔
746

747
  private void terminateWorkflowExecution(TerminateWorkflowExecutionRequest request) {
748
    ExecutionId executionId =
1✔
749
        new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
1✔
750
    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
751
    mutableState.terminateWorkflowExecution(request);
1✔
752
  }
1✔
753

754
  @Override
755
  public void signalWorkflowExecution(
756
      SignalWorkflowExecutionRequest signalRequest,
757
      StreamObserver<SignalWorkflowExecutionResponse> responseObserver) {
758
    try {
759
      ExecutionId executionId =
1✔
760
          new ExecutionId(signalRequest.getNamespace(), signalRequest.getWorkflowExecution());
1✔
761
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
762
      mutableState.signal(signalRequest);
1✔
763
      responseObserver.onNext(SignalWorkflowExecutionResponse.getDefaultInstance());
1✔
764
      responseObserver.onCompleted();
1✔
765
    } catch (StatusRuntimeException e) {
1✔
766
      handleStatusRuntimeException(e, responseObserver);
1✔
767
    }
1✔
768
  }
1✔
769

770
  @Override
771
  public void updateWorkflowExecution(
772
      UpdateWorkflowExecutionRequest request,
773
      StreamObserver<UpdateWorkflowExecutionResponse> responseObserver) {
774
    // TODO(https://github.com/temporalio/sdk-java/issues/1742) Add support for update to the test
775
    // server
776
    responseObserver.onError(
×
777
        Status.UNIMPLEMENTED
778
            .withDescription("Test server does not implement update")
×
779
            .asRuntimeException());
×
780
  }
×
781

782
  @Override
783
  public void signalWithStartWorkflowExecution(
784
      SignalWithStartWorkflowExecutionRequest r,
785
      StreamObserver<SignalWithStartWorkflowExecutionResponse> responseObserver) {
786
    try {
787
      if (!r.hasTaskQueue()) {
1✔
788
        throw Status.INVALID_ARGUMENT
×
789
            .withDescription("request missing required taskQueue field")
×
790
            .asRuntimeException();
×
791
      }
792
      if (!r.hasWorkflowType()) {
1✔
793
        throw Status.INVALID_ARGUMENT
×
794
            .withDescription("request missing required workflowType field")
×
795
            .asRuntimeException();
×
796
      }
797
      ExecutionId executionId = new ExecutionId(r.getNamespace(), r.getWorkflowId(), null);
1✔
798
      TestWorkflowMutableState mutableState = getMutableState(executionId, false);
1✔
799
      SignalWorkflowExecutionRequest signalRequest =
800
          SignalWorkflowExecutionRequest.newBuilder()
1✔
801
              .setInput(r.getSignalInput())
1✔
802
              .setSignalName(r.getSignalName())
1✔
803
              .setWorkflowExecution(executionId.getExecution())
1✔
804
              .setRequestId(r.getRequestId())
1✔
805
              .setControl(r.getControl())
1✔
806
              .setNamespace(r.getNamespace())
1✔
807
              .setIdentity(r.getIdentity())
1✔
808
              .build();
1✔
809
      if (mutableState != null && !mutableState.isTerminalState()) {
1✔
810
        mutableState.signal(signalRequest);
1✔
811
        responseObserver.onNext(
1✔
812
            SignalWithStartWorkflowExecutionResponse.newBuilder()
1✔
813
                .setRunId(mutableState.getExecutionId().getExecution().getRunId())
1✔
814
                .build());
1✔
815
        responseObserver.onCompleted();
1✔
816
        return;
1✔
817
      }
818
      StartWorkflowExecutionRequest.Builder startRequest =
819
          StartWorkflowExecutionRequest.newBuilder()
1✔
820
              .setRequestId(r.getRequestId())
1✔
821
              .setInput(r.getInput())
1✔
822
              .setWorkflowExecutionTimeout(r.getWorkflowExecutionTimeout())
1✔
823
              .setWorkflowRunTimeout(r.getWorkflowRunTimeout())
1✔
824
              .setWorkflowTaskTimeout(r.getWorkflowTaskTimeout())
1✔
825
              .setNamespace(r.getNamespace())
1✔
826
              .setTaskQueue(r.getTaskQueue())
1✔
827
              .setWorkflowId(r.getWorkflowId())
1✔
828
              .setWorkflowIdReusePolicy(r.getWorkflowIdReusePolicy())
1✔
829
              .setIdentity(r.getIdentity())
1✔
830
              .setWorkflowType(r.getWorkflowType())
1✔
831
              .setCronSchedule(r.getCronSchedule())
1✔
832
              .setRequestId(r.getRequestId());
1✔
833
      if (r.hasRetryPolicy()) {
1✔
834
        startRequest.setRetryPolicy(r.getRetryPolicy());
×
835
      }
836
      if (r.hasHeader()) {
1✔
837
        startRequest.setHeader(r.getHeader());
1✔
838
      }
839
      if (r.hasMemo()) {
1✔
840
        startRequest.setMemo(r.getMemo());
×
841
      }
842
      if (r.hasSearchAttributes()) {
1✔
843
        startRequest.setSearchAttributes(r.getSearchAttributes());
1✔
844
      }
845
      StartWorkflowExecutionResponse startResult =
1✔
846
          startWorkflowExecutionImpl(
1✔
847
              startRequest.build(),
1✔
848
              Duration.ZERO,
849
              Optional.empty(),
1✔
850
              OptionalLong.empty(),
1✔
851
              signalRequest);
852
      responseObserver.onNext(
1✔
853
          SignalWithStartWorkflowExecutionResponse.newBuilder()
1✔
854
              .setRunId(startResult.getRunId())
1✔
855
              .build());
1✔
856
      responseObserver.onCompleted();
1✔
857
    } catch (StatusRuntimeException e) {
1✔
858
      handleStatusRuntimeException(e, responseObserver);
1✔
859
    }
1✔
860
  }
1✔
861

862
  public void signalExternalWorkflowExecution(
863
      String signalId,
864
      SignalExternalWorkflowExecutionCommandAttributes commandAttributes,
865
      TestWorkflowMutableState source) {
866
    String namespace;
867
    if (commandAttributes.getNamespace().isEmpty()) {
1✔
868
      namespace = source.getExecutionId().getNamespace();
1✔
869
    } else {
870
      namespace = commandAttributes.getNamespace();
×
871
    }
872
    ExecutionId executionId = new ExecutionId(namespace, commandAttributes.getExecution());
1✔
873
    TestWorkflowMutableState mutableState;
874
    try {
875
      mutableState = getMutableState(executionId);
1✔
876
      mutableState.signalFromWorkflow(commandAttributes);
1✔
877
      source.completeSignalExternalWorkflowExecution(
1✔
878
          signalId, mutableState.getExecutionId().getExecution().getRunId());
1✔
879
    } catch (StatusRuntimeException e) {
1✔
880
      if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
881
        source.failSignalExternalWorkflowExecution(
1✔
882
            signalId,
883
            SignalExternalWorkflowExecutionFailedCause
884
                .SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED_CAUSE_EXTERNAL_WORKFLOW_EXECUTION_NOT_FOUND);
885
      } else {
886
        throw e;
×
887
      }
888
    }
1✔
889
  }
1✔
890

891
  /**
892
   * Creates next run of a workflow execution
893
   *
894
   * @return RunId
895
   */
896
  public String continueAsNew(
897
      StartWorkflowExecutionRequest previousRunStartRequest,
898
      WorkflowExecutionContinuedAsNewEventAttributes a,
899
      Optional<TestServiceRetryState> retryState,
900
      String identity,
901
      ExecutionId continuedExecutionId,
902
      String firstExecutionRunId,
903
      Optional<TestWorkflowMutableState> parent,
904
      OptionalLong parentChildInitiatedEventId) {
905
    StartWorkflowExecutionRequest.Builder startRequestBuilder =
906
        StartWorkflowExecutionRequest.newBuilder()
1✔
907
            .setRequestId(UUID.randomUUID().toString())
1✔
908
            .setWorkflowType(a.getWorkflowType())
1✔
909
            .setWorkflowRunTimeout(a.getWorkflowRunTimeout())
1✔
910
            .setWorkflowTaskTimeout(a.getWorkflowTaskTimeout())
1✔
911
            .setNamespace(continuedExecutionId.getNamespace())
1✔
912
            .setTaskQueue(a.getTaskQueue())
1✔
913
            .setWorkflowId(continuedExecutionId.getWorkflowId().getWorkflowId())
1✔
914
            .setWorkflowIdReusePolicy(previousRunStartRequest.getWorkflowIdReusePolicy())
1✔
915
            .setIdentity(identity)
1✔
916
            .setCronSchedule(previousRunStartRequest.getCronSchedule());
1✔
917
    if (previousRunStartRequest.hasRetryPolicy()) {
1✔
918
      startRequestBuilder.setRetryPolicy(previousRunStartRequest.getRetryPolicy());
1✔
919
    }
920
    if (a.hasInput()) {
1✔
921
      startRequestBuilder.setInput(a.getInput());
1✔
922
    }
923
    if (a.hasHeader()) {
1✔
924
      startRequestBuilder.setHeader(a.getHeader());
1✔
925
    }
926
    StartWorkflowExecutionRequest startRequest = startRequestBuilder.build();
1✔
927
    lock.lock();
1✔
928
    Optional<Failure> lastFail =
929
        a.hasFailure()
1✔
930
            ? Optional.of(a.getFailure())
1✔
931
            : retryState.flatMap(TestServiceRetryState::getPreviousRunFailure);
1✔
932
    try {
933
      StartWorkflowExecutionResponse response =
1✔
934
          startWorkflowExecutionNoRunningCheckLocked(
1✔
935
              startRequest,
936
              a.getNewExecutionRunId(),
1✔
937
              firstExecutionRunId,
938
              Optional.of(continuedExecutionId.getExecution().getRunId()),
1✔
939
              retryState,
940
              ProtobufTimeUtils.toJavaDuration(a.getBackoffStartInterval()),
1✔
941
              a.getLastCompletionResult(),
1✔
942
              lastFail,
943
              parent,
944
              parentChildInitiatedEventId,
945
              null,
946
              continuedExecutionId.getWorkflowId());
1✔
947
      return response.getRunId();
1✔
948
    } finally {
949
      lock.unlock();
1✔
950
    }
951
  }
952

953
  @Override
954
  public void listOpenWorkflowExecutions(
955
      ListOpenWorkflowExecutionsRequest listRequest,
956
      StreamObserver<ListOpenWorkflowExecutionsResponse> responseObserver) {
957
    try {
958
      Optional<String> workflowIdFilter;
959
      if (listRequest.hasExecutionFilter()
1✔
960
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
×
961
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
×
962
      } else {
963
        workflowIdFilter = Optional.empty();
1✔
964
      }
965
      List<WorkflowExecutionInfo> result =
1✔
966
          store.listWorkflows(WorkflowState.OPEN, workflowIdFilter);
1✔
967
      responseObserver.onNext(
1✔
968
          ListOpenWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1✔
969
      responseObserver.onCompleted();
1✔
970
    } catch (StatusRuntimeException e) {
×
971
      handleStatusRuntimeException(e, responseObserver);
×
972
    }
1✔
973
  }
1✔
974

975
  @Override
976
  public void listClosedWorkflowExecutions(
977
      ListClosedWorkflowExecutionsRequest listRequest,
978
      StreamObserver<ListClosedWorkflowExecutionsResponse> responseObserver) {
979
    try {
980
      Optional<String> workflowIdFilter;
981
      if (listRequest.hasExecutionFilter()
1✔
982
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
×
983
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
×
984
      } else {
985
        workflowIdFilter = Optional.empty();
1✔
986
      }
987
      List<WorkflowExecutionInfo> result =
1✔
988
          store.listWorkflows(WorkflowState.CLOSED, workflowIdFilter);
1✔
989
      responseObserver.onNext(
1✔
990
          ListClosedWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1✔
991
      responseObserver.onCompleted();
1✔
992
    } catch (StatusRuntimeException e) {
×
993
      handleStatusRuntimeException(e, responseObserver);
×
994
    }
1✔
995
  }
1✔
996

997
  @Override
998
  public void respondQueryTaskCompleted(
999
      RespondQueryTaskCompletedRequest completeRequest,
1000
      StreamObserver<RespondQueryTaskCompletedResponse> responseObserver) {
1001
    try {
1002
      QueryId queryId = QueryId.fromBytes(completeRequest.getTaskToken());
1✔
1003
      TestWorkflowMutableState mutableState = getMutableState(queryId.getExecutionId());
1✔
1004
      mutableState.completeQuery(queryId, completeRequest);
1✔
1005
      responseObserver.onNext(RespondQueryTaskCompletedResponse.getDefaultInstance());
1✔
1006
      responseObserver.onCompleted();
1✔
1007
    } catch (StatusRuntimeException e) {
×
1008
      handleStatusRuntimeException(e, responseObserver);
×
1009
    }
1✔
1010
  }
1✔
1011

1012
  @Override
1013
  public void queryWorkflow(
1014
      QueryWorkflowRequest queryRequest, StreamObserver<QueryWorkflowResponse> responseObserver) {
1015
    try {
1016
      ExecutionId executionId =
1✔
1017
          new ExecutionId(queryRequest.getNamespace(), queryRequest.getExecution());
1✔
1018
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1019
      @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1020
      QueryWorkflowResponse result =
1✔
1021
          mutableState.query(
1✔
1022
              queryRequest,
1023
              deadline != null ? deadline.timeRemaining(TimeUnit.MILLISECONDS) : Long.MAX_VALUE);
1✔
1024
      responseObserver.onNext(result);
1✔
1025
      responseObserver.onCompleted();
1✔
1026
    } catch (StatusRuntimeException e) {
1✔
1027
      handleStatusRuntimeException(e, responseObserver);
1✔
1028
    }
1✔
1029
  }
1✔
1030

1031
  @Override
1032
  public void describeWorkflowExecution(
1033
      DescribeWorkflowExecutionRequest request,
1034
      StreamObserver<io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse>
1035
          responseObserver) {
1036
    try {
1037
      if (request.getNamespace().isEmpty()) {
1✔
1038
        throw createInvalidArgument("Namespace not set on request.");
×
1039
      }
1040
      if (!request.hasExecution()) {
1✔
1041
        throw createInvalidArgument("Execution not set on request.");
×
1042
      }
1043

1044
      ExecutionId executionId = new ExecutionId(request.getNamespace(), request.getExecution());
1✔
1045
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1046
      DescribeWorkflowExecutionResponse result = mutableState.describeWorkflowExecution();
1✔
1047
      responseObserver.onNext(result);
1✔
1048
      responseObserver.onCompleted();
1✔
1049
    } catch (StatusRuntimeException e) {
1✔
1050
      handleStatusRuntimeException(e, responseObserver);
1✔
1051
    }
1✔
1052
  }
1✔
1053

1054
  /**
1055
   * This method doesn't make much sense for test server, it accepts all namespaces as existent and
1056
   * registered. so, it's a trivial implementation just returning an info that a namespace is
1057
   * registered irrespectively of the input
1058
   */
1059
  @Override
1060
  public void describeNamespace(
1061
      DescribeNamespaceRequest request,
1062
      StreamObserver<DescribeNamespaceResponse> responseObserver) {
1063
    try {
1064
      if (request.getNamespace().isEmpty()) {
1✔
1065
        throw createInvalidArgument("Namespace not set on request.");
×
1066
      }
1067
      // generating a stable UUID for name
1068
      String namespaceId = UUID.nameUUIDFromBytes(request.getNamespace().getBytes()).toString();
1✔
1069
      DescribeNamespaceResponse result =
1070
          DescribeNamespaceResponse.newBuilder()
1✔
1071
              .setNamespaceInfo(
1✔
1072
                  NamespaceInfo.newBuilder()
1✔
1073
                      .setName(request.getNamespace())
1✔
1074
                      .setState(NamespaceState.NAMESPACE_STATE_REGISTERED)
1✔
1075
                      .setId(namespaceId)
1✔
1076
                      .build())
1✔
1077
              .build();
1✔
1078
      responseObserver.onNext(result);
1✔
1079
      responseObserver.onCompleted();
1✔
1080
    } catch (StatusRuntimeException e) {
1✔
1081
      handleStatusRuntimeException(e, responseObserver);
1✔
1082
    }
1✔
1083
  }
1✔
1084

1085
  private <R> R requireNotNull(String fieldName, R value) {
1086
    if (value == null) {
1✔
1087
      throw Status.INVALID_ARGUMENT
×
1088
          .withDescription("Missing required field \"" + fieldName + "\".")
×
1089
          .asRuntimeException();
×
1090
    }
1091
    return value;
1✔
1092
  }
1093

1094
  /**
1095
   * Adds diagnostic data about internal service state to the provided {@link StringBuilder}.
1096
   * Includes histories of all workflow instances stored in the service.
1097
   */
1098
  public void getDiagnostics(StringBuilder result) {
1099
    store.getDiagnostics(result);
×
1100
  }
×
1101

1102
  /**
1103
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1104
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#getCurrentTime(Empty)}
1105
   */
1106
  @Deprecated
1107
  public long currentTimeMillis() {
1108
    return selfAdvancingTimer.getClock().getAsLong();
×
1109
  }
1110

1111
  /** Invokes callback after the specified delay according to internal service clock. */
1112
  public void registerDelayedCallback(Duration delay, Runnable r) {
1113
    store.registerDelayedCallback(delay, r);
1✔
1114
  }
1✔
1115

1116
  /**
1117
   * Disables time skipping. To re-enable call {@link #unlockTimeSkipping(String)}. These calls are
1118
   * counted, so calling unlock does not guarantee that time is going to be skipped immediately as
1119
   * another lock can be holding it.
1120
   *
1121
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1122
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#lockTimeSkipping(LockTimeSkippingRequest)}
1123
   */
1124
  @Deprecated
1125
  public void lockTimeSkipping(String caller) {
1126
    selfAdvancingTimer.lockTimeSkipping(caller);
×
1127
  }
×
1128

1129
  /**
1130
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1131
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkipping(UnlockTimeSkippingRequest)}
1132
   */
1133
  @Deprecated
1134
  public void unlockTimeSkipping(String caller) {
1135
    selfAdvancingTimer.unlockTimeSkipping(caller);
×
1136
  }
×
1137

1138
  /**
1139
   * Unlocks time skipping and blocks the calling thread until internal clock passes the current +
1140
   * duration time.<br>
1141
   * When the time is reached, locks time skipping and returns.<br>
1142
   * Might not block at all due to time skipping. Or might block if the time skipping lock counter
1143
   * was more than 1.
1144
   *
1145
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1146
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkippingWithSleep(SleepRequest)}
1147
   */
1148
  @Deprecated
1149
  public void sleep(Duration duration) {
1150
    CompletableFuture<Void> result = new CompletableFuture<>();
×
1151
    selfAdvancingTimer.schedule(
×
1152
        duration,
1153
        () -> {
1154
          selfAdvancingTimer.lockTimeSkipping("TestWorkflowService sleep");
×
1155
          result.complete(null);
×
1156
        },
×
1157
        "workflow sleep");
1158
    selfAdvancingTimer.unlockTimeSkipping("TestWorkflowService sleep");
×
1159
    try {
1160
      result.get();
×
1161
    } catch (InterruptedException e) {
×
1162
      Thread.currentThread().interrupt();
×
1163
      throw new RuntimeException(e);
×
1164
    } catch (ExecutionException e) {
×
1165
      throw new RuntimeException(e);
×
1166
    }
×
1167
  }
×
1168

1169
  /**
1170
   * Temporal server times out task queue long poll calls after 1 minute and returns an empty
1171
   * result. After which the request has to be retried by the client if it wants to continue
1172
   * waiting. We emulate this behavior here.
1173
   *
1174
   * <p>If there is a deadline present, for task queue poll requests server will respond inside the
1175
   * deadline. Note that the latest is not applicable for getWorkflowExecutionHistory() long polls.
1176
   *
1177
   * @return minimum between the context deadline and maximum long poll deadline.
1178
   */
1179
  private Deadline getLongPollDeadline() {
1180
    @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1181
    Deadline maximumDeadline =
1✔
1182
        Deadline.after(
1✔
1183
            WorkflowServiceStubsOptions.DEFAULT_SERVER_LONG_POLL_RPC_TIMEOUT.toMillis(),
1✔
1184
            TimeUnit.MILLISECONDS);
1185
    return deadline != null ? deadline.minimum(maximumDeadline) : maximumDeadline;
1✔
1186
  }
1187

1188
  private void handleStatusRuntimeException(
1189
      StatusRuntimeException e, StreamObserver<?> responseObserver) {
1190
    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1✔
1191
      log.error("unexpected", e);
1✔
1192
    }
1193
    responseObserver.onError(e);
1✔
1194
  }
1✔
1195

1196
  /**
1197
   * Creates an in-memory service along with client stubs for use in Java code. See also
1198
   * createServerOnly and createWithNoGrpcServer.
1199
   *
1200
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean)} instead and
1201
   *     pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1202
   */
1203
  @Deprecated
1204
  public TestWorkflowService() {
1205
    this(0, true);
×
1206
  }
×
1207

1208
  /**
1209
   * Creates an in-memory service along with client stubs for use in Java code. See also
1210
   * createServerOnly and createWithNoGrpcServer.
1211
   *
1212
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean, long)} instead
1213
   *     and pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1214
   */
1215
  @Deprecated
1216
  public TestWorkflowService(long initialTimeMillis) {
1217
    this(initialTimeMillis, true);
×
1218
  }
×
1219

1220
  /**
1221
   * Creates an in-memory service along with client stubs for use in Java code. See also
1222
   * createServerOnly and createWithNoGrpcServer.
1223
   *
1224
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean)} instead
1225
   */
1226
  @Deprecated
1227
  public TestWorkflowService(boolean lockTimeSkipping) {
1228
    this(0, true);
×
1229
    if (lockTimeSkipping) {
×
1230
      this.lockTimeSkipping("constructor");
×
1231
    }
1232
  }
×
1233

1234
  /**
1235
   * Creates an instance of TestWorkflowService that does not manage its own gRPC server. Useful for
1236
   * including in an externally managed gRPC server.
1237
   *
1238
   * @deprecated use {@link TestServicesStarter} to create just the services with gRPC server
1239
   */
1240
  @Deprecated
1241
  public static TestWorkflowService createWithNoGrpcServer() {
1242
    return new TestWorkflowService(0, false);
×
1243
  }
1244

1245
  private TestWorkflowService(long initialTimeMillis, boolean startInProcessServer) {
×
1246
    this.selfAdvancingTimer =
×
1247
        new SelfAdvancingTimerImpl(initialTimeMillis, Clock.systemDefaultZone());
×
1248
    store = new TestWorkflowStoreImpl(this.selfAdvancingTimer);
×
1249
    visibilityStore = new TestVisibilityStoreImpl();
×
1250
    outOfProcessServer = null;
×
1251
    if (startInProcessServer) {
×
1252
      this.inProcessServer = new InProcessGRPCServer(Collections.singletonList(this));
×
1253
      this.workflowServiceStubs =
×
1254
          WorkflowServiceStubs.newServiceStubs(
×
1255
              WorkflowServiceStubsOptions.newBuilder()
×
1256
                  .setChannel(inProcessServer.getChannel())
×
1257
                  .build());
×
1258
    } else {
1259
      this.inProcessServer = null;
×
1260
      this.workflowServiceStubs = null;
×
1261
    }
1262
  }
×
1263

1264
  /**
1265
   * Creates an out-of-process rather than in-process server, and does not set up a client. Useful,
1266
   * for example, if you want to use the test service from other SDKs.
1267
   *
1268
   * @param port the port to listen on
1269
   * @deprecated use {@link io.temporal.testserver.TestServer#createPortBoundServer(int, boolean)}
1270
   *     instead and pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1271
   */
1272
  @Deprecated
1273
  public static TestWorkflowService createServerOnly(int port) {
1274
    TestWorkflowService result = new TestWorkflowService(true, port);
×
1275
    log.info("Server started, listening on " + port);
×
1276
    return result;
×
1277
  }
1278

1279
  private TestWorkflowService(boolean isOutOfProc, int port) {
×
1280
    // isOutOfProc is just here to make unambiguous constructor overloading.
1281
    Preconditions.checkState(isOutOfProc, "Impossible.");
×
1282
    inProcessServer = null;
×
1283
    workflowServiceStubs = null;
×
1284
    this.selfAdvancingTimer = new SelfAdvancingTimerImpl(0, Clock.systemDefaultZone());
×
1285
    store = new TestWorkflowStoreImpl(selfAdvancingTimer);
×
1286
    visibilityStore = new TestVisibilityStoreImpl();
×
1287
    try {
1288
      ServerBuilder<?> serverBuilder =
×
1289
          Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create());
×
1290
      GRPCServerHelper.registerServicesAndHealthChecks(
×
1291
          Collections.singletonList(this), serverBuilder);
×
1292
      outOfProcessServer = serverBuilder.build().start();
×
1293
    } catch (IOException e) {
×
1294
      throw new RuntimeException(e);
×
1295
    }
×
1296
  }
×
1297

1298
  @Deprecated
1299
  public WorkflowServiceStubs newClientStub() {
1300
    if (workflowServiceStubs == null) {
×
1301
      throw new RuntimeException(
×
1302
          "Cannot get a client when you created your TestWorkflowService with createServerOnly.");
1303
    }
1304
    return workflowServiceStubs;
×
1305
  }
1306

1307
  private static StatusRuntimeException createInvalidArgument(String description) {
1308
    throw Status.INVALID_ARGUMENT.withDescription(description).asRuntimeException();
1✔
1309
  }
1310
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc