• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #253

21 May 2024 07:13PM UTC coverage: 77.429% (-0.09%) from 77.517%
#253

push

github

web-flow
Don't return update handles until desired stage reached (#2066)

45 of 54 new or added lines in 7 files covered. (83.33%)

17 existing lines in 5 files now uncovered.

19169 of 24757 relevant lines covered (77.43%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.27
/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import static io.temporal.internal.testservice.CronUtils.getBackoffInterval;
24

25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Throwables;
27
import com.google.protobuf.ByteString;
28
import com.google.protobuf.Empty;
29
import com.google.protobuf.Timestamp;
30
import com.google.protobuf.util.Timestamps;
31
import io.grpc.*;
32
import io.grpc.stub.StreamObserver;
33
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
34
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
35
import io.temporal.api.common.v1.Payload;
36
import io.temporal.api.common.v1.Payloads;
37
import io.temporal.api.common.v1.RetryPolicy;
38
import io.temporal.api.common.v1.WorkflowExecution;
39
import io.temporal.api.enums.v1.NamespaceState;
40
import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause;
41
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
42
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
43
import io.temporal.api.errordetails.v1.WorkflowExecutionAlreadyStartedFailure;
44
import io.temporal.api.failure.v1.Failure;
45
import io.temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes;
46
import io.temporal.api.namespace.v1.NamespaceInfo;
47
import io.temporal.api.testservice.v1.LockTimeSkippingRequest;
48
import io.temporal.api.testservice.v1.SleepRequest;
49
import io.temporal.api.testservice.v1.UnlockTimeSkippingRequest;
50
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
51
import io.temporal.api.workflowservice.v1.*;
52
import io.temporal.internal.common.ProtobufTimeUtils;
53
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowState;
54
import io.temporal.serviceclient.StatusUtils;
55
import io.temporal.serviceclient.WorkflowServiceStubs;
56
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
57
import java.io.Closeable;
58
import java.io.IOException;
59
import java.time.Clock;
60
import java.time.Duration;
61
import java.util.*;
62
import java.util.concurrent.*;
63
import java.util.concurrent.locks.Lock;
64
import java.util.concurrent.locks.ReentrantLock;
65
import javax.annotation.Nonnull;
66
import javax.annotation.Nullable;
67
import org.slf4j.Logger;
68
import org.slf4j.LoggerFactory;
69

70
/**
71
 * In memory implementation of the Workflow Service. To be used for testing purposes only.
72
 *
73
 * <p>Do not use directly, instead use {@link io.temporal.testing.TestWorkflowEnvironment}.
74
 */
75
public final class TestWorkflowService extends WorkflowServiceGrpc.WorkflowServiceImplBase
76
    implements Closeable {
77
  private static final Logger log = LoggerFactory.getLogger(TestWorkflowService.class);
1✔
78
  private final Map<ExecutionId, TestWorkflowMutableState> executions = new HashMap<>();
1✔
79
  // key->WorkflowId
80
  private final Map<WorkflowId, TestWorkflowMutableState> executionsByWorkflowId = new HashMap<>();
1✔
81
  private final ExecutorService executor = Executors.newCachedThreadPool();
1✔
82
  private final Lock lock = new ReentrantLock();
1✔
83

84
  private final TestWorkflowStore store;
85
  private final TestVisibilityStore visibilityStore;
86
  private final SelfAdvancingTimer selfAdvancingTimer;
87

88
  private final ScheduledExecutorService backgroundScheduler =
1✔
89
      Executors.newSingleThreadScheduledExecutor();
1✔
90

91
  private final Server outOfProcessServer;
92
  private final InProcessGRPCServer inProcessServer;
93
  private final WorkflowServiceStubs workflowServiceStubs;
94

95
  /**
   * Creates a service backed by the supplied stores and timer.
   *
   * <p>An instance built through this constructor owns no gRPC server and no client stubs: the
   * three transport-related fields are set to {@code null}, so {@link #close()} skips their
   * shutdown.
   *
   * @param store workflow store used for task queues, history and the current time
   * @param visibilityStore visibility store handed to every created mutable state
   * @param selfAdvancingTimer timer handed to every created mutable state
   */
  TestWorkflowService(
      TestWorkflowStore store,
      TestVisibilityStore visibilityStore,
      SelfAdvancingTimer selfAdvancingTimer) {
    // No transport is owned by this instance.
    this.workflowServiceStubs = null;
    this.inProcessServer = null;
    this.outOfProcessServer = null;

    this.selfAdvancingTimer = selfAdvancingTimer;
    this.visibilityStore = visibilityStore;
    this.store = store;
  }
1✔
106

107
  @Override
108
  public void close() {
109
    log.debug("Shutting down TestWorkflowService");
1✔
110

111
    log.debug("Shutting down background scheduler");
1✔
112
    backgroundScheduler.shutdown();
1✔
113

114
    if (outOfProcessServer != null) {
1✔
115
      log.info("Shutting down out-of-process GRPC server");
×
116
      outOfProcessServer.shutdown();
×
117
    }
118

119
    if (workflowServiceStubs != null) {
1✔
120
      workflowServiceStubs.shutdown();
×
121
    }
122

123
    if (inProcessServer != null) {
1✔
124
      log.info("Shutting down in-process GRPC server");
×
125
      inProcessServer.shutdown();
×
126
    }
127

128
    executor.shutdown();
1✔
129

130
    try {
131
      executor.awaitTermination(1, TimeUnit.SECONDS);
1✔
132

133
      if (outOfProcessServer != null) {
1✔
134
        outOfProcessServer.awaitTermination(1, TimeUnit.SECONDS);
×
135
      }
136

137
      if (workflowServiceStubs != null) {
1✔
138
        workflowServiceStubs.awaitTermination(1, TimeUnit.SECONDS);
×
139
      }
140

141
      if (inProcessServer != null) {
1✔
142
        inProcessServer.awaitTermination(1, TimeUnit.SECONDS);
×
143
      }
144

145
    } catch (InterruptedException e) {
×
146
      Thread.currentThread().interrupt();
×
147
      log.debug("shutdown interrupted", e);
×
148
    }
1✔
149

150
    store.close();
1✔
151
  }
1✔
152

153
  private TestWorkflowMutableState getMutableState(ExecutionId executionId) {
154
    return getMutableState(executionId, true);
1✔
155
  }
156

157
  private TestWorkflowMutableState getMutableState(ExecutionId executionId, boolean failNotExists) {
158
    lock.lock();
1✔
159
    try {
160
      if (executionId.getExecution().getRunId().isEmpty()) {
1✔
161
        return getMutableState(executionId.getWorkflowId(), failNotExists);
1✔
162
      }
163
      TestWorkflowMutableState mutableState = executions.get(executionId);
1✔
164
      if (mutableState == null && failNotExists) {
1✔
165
        throw Status.NOT_FOUND
1✔
166
            .withDescription(
1✔
167
                "Execution \""
168
                    + executionId
169
                    + "\" not found in mutable state. Known executions: "
170
                    + executions.values()
1✔
171
                    + ", service="
172
                    + this)
173
            .asRuntimeException();
1✔
174
      }
175
      return mutableState;
1✔
176
    } finally {
177
      lock.unlock();
1✔
178
    }
179
  }
180

181
  private TestWorkflowMutableState getMutableState(WorkflowId workflowId, boolean failNotExists) {
182
    lock.lock();
1✔
183
    try {
184
      TestWorkflowMutableState mutableState = executionsByWorkflowId.get(workflowId);
1✔
185
      if (mutableState == null && failNotExists) {
1✔
186
        throw Status.NOT_FOUND
1✔
187
            .withDescription("Execution not found in mutable state: " + workflowId)
1✔
188
            .asRuntimeException();
1✔
189
      }
190
      return mutableState;
1✔
191
    } finally {
192
      lock.unlock();
1✔
193
    }
194
  }
195

196
  @Override
197
  public void startWorkflowExecution(
198
      StartWorkflowExecutionRequest request,
199
      StreamObserver<StartWorkflowExecutionResponse> responseObserver) {
200
    try {
201
      Duration backoffInterval = getBackoffInterval(request.getCronSchedule(), store.currentTime());
1✔
202
      StartWorkflowExecutionResponse response =
1✔
203
          startWorkflowExecutionImpl(
1✔
204
              request, backoffInterval, Optional.empty(), OptionalLong.empty(), null);
1✔
205
      responseObserver.onNext(response);
1✔
206
      responseObserver.onCompleted();
1✔
207
    } catch (StatusRuntimeException e) {
1✔
208
      handleStatusRuntimeException(e, responseObserver);
1✔
209
    }
1✔
210
  }
1✔
211

212
  StartWorkflowExecutionResponse startWorkflowExecutionImpl(
213
      StartWorkflowExecutionRequest startRequest,
214
      Duration backoffStartInterval,
215
      Optional<TestWorkflowMutableState> parent,
216
      OptionalLong parentChildInitiatedEventId,
217
      @Nullable SignalWorkflowExecutionRequest signalWithStartSignal) {
218
    String requestWorkflowId = requireNotNull("WorkflowId", startRequest.getWorkflowId());
1✔
219
    String namespace = requireNotNull("Namespace", startRequest.getNamespace());
1✔
220
    WorkflowId workflowId = new WorkflowId(namespace, requestWorkflowId);
1✔
221
    TestWorkflowMutableState existing;
222
    lock.lock();
1✔
223
    try {
224
      String newRunId = UUID.randomUUID().toString();
1✔
225
      existing = executionsByWorkflowId.get(workflowId);
1✔
226
      if (existing != null) {
1✔
227
        WorkflowExecutionStatus status = existing.getWorkflowExecutionStatus();
1✔
228
        WorkflowIdReusePolicy policy = startRequest.getWorkflowIdReusePolicy();
1✔
229

230
        if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
231
            && policy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
232
          existing.terminateWorkflowExecution(
1✔
233
              TerminateWorkflowExecutionRequest.newBuilder()
1✔
234
                  .setNamespace(startRequest.getNamespace())
1✔
235
                  .setWorkflowExecution(existing.getExecutionId().getExecution())
1✔
236
                  .setReason("TerminateIfRunning WorkflowIdReusePolicy Policy")
1✔
237
                  .setIdentity("history-service")
1✔
238
                  .setDetails(
1✔
239
                      Payloads.newBuilder()
1✔
240
                          .addPayloads(
1✔
241
                              Payload.newBuilder()
1✔
242
                                  .setData(
1✔
243
                                      ByteString.copyFromUtf8(
1✔
244
                                          String.format("terminated by new runID: %s", newRunId)))
1✔
245
                                  .build())
1✔
246
                          .build())
1✔
247
                  .build());
1✔
248
        } else if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
249
            || policy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE) {
250
          return throwDuplicatedWorkflow(startRequest, existing);
×
251
        } else if (policy
1✔
252
                == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
253
            && (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED
254
                || status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW)) {
255
          return throwDuplicatedWorkflow(startRequest, existing);
×
256
        }
257
      }
258
      Optional<TestServiceRetryState> retryState;
259
      Optional<Failure> lastFailure = Optional.empty();
1✔
260
      if (startRequest.hasRetryPolicy()) {
1✔
261
        Duration expirationInterval =
1✔
262
            ProtobufTimeUtils.toJavaDuration(startRequest.getWorkflowExecutionTimeout());
1✔
263
        retryState = newRetryStateLocked(startRequest.getRetryPolicy(), expirationInterval);
1✔
264
        if (retryState.isPresent()) {
1✔
265
          lastFailure = retryState.get().getPreviousRunFailure();
1✔
266
        }
267
      } else {
1✔
268
        retryState = Optional.empty();
1✔
269
      }
270
      return startWorkflowExecutionNoRunningCheckLocked(
1✔
271
          startRequest,
272
          newRunId,
273
          // it's the first execution in the continue-as-new chain, so firstExecutionRunId =
274
          // newRunId
275
          newRunId,
276
          Optional.empty(),
1✔
277
          retryState,
278
          backoffStartInterval,
279
          null,
280
          lastFailure,
281
          parent,
282
          parentChildInitiatedEventId,
283
          signalWithStartSignal,
284
          workflowId);
285
    } finally {
286
      lock.unlock();
1✔
287
    }
288
  }
289

290
  private Optional<TestServiceRetryState> newRetryStateLocked(
291
      RetryPolicy retryPolicy, Duration expirationInterval) {
292
    Timestamp expirationTime =
293
        expirationInterval.isZero()
1✔
294
            ? Timestamps.fromNanos(0)
1✔
295
            : Timestamps.add(
1✔
296
                store.currentTime(), ProtobufTimeUtils.toProtoDuration(expirationInterval));
1✔
297
    return Optional.of(new TestServiceRetryState(retryPolicy, expirationTime));
1✔
298
  }
299

300
  private StartWorkflowExecutionResponse throwDuplicatedWorkflow(
301
      StartWorkflowExecutionRequest startRequest, TestWorkflowMutableState existing) {
302
    WorkflowExecution execution = existing.getExecutionId().getExecution();
1✔
303
    WorkflowExecutionAlreadyStartedFailure error =
304
        WorkflowExecutionAlreadyStartedFailure.newBuilder()
1✔
305
            .setRunId(execution.getRunId())
1✔
306
            .setStartRequestId(startRequest.getRequestId())
1✔
307
            .build();
1✔
308
    throw StatusUtils.newException(
1✔
309
        Status.ALREADY_EXISTS.withDescription(
1✔
310
            String.format(
1✔
311
                "WorkflowId: %s, " + "RunId: %s", execution.getWorkflowId(), execution.getRunId())),
1✔
312
        error,
313
        WorkflowExecutionAlreadyStartedFailure.getDescriptor());
1✔
314
  }
315

316
  private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheckLocked(
317
      StartWorkflowExecutionRequest startRequest,
318
      @Nonnull String runId,
319
      @Nonnull String firstExecutionRunId,
320
      Optional<String> continuedExecutionRunId,
321
      Optional<TestServiceRetryState> retryState,
322
      Duration backoffStartInterval,
323
      Payloads lastCompletionResult,
324
      Optional<Failure> lastFailure,
325
      Optional<TestWorkflowMutableState> parent,
326
      OptionalLong parentChildInitiatedEventId,
327
      @Nullable SignalWorkflowExecutionRequest signalWithStartSignal,
328
      WorkflowId workflowId) {
329
    String namespace = startRequest.getNamespace();
1✔
330
    TestWorkflowMutableState mutableState =
1✔
331
        new TestWorkflowMutableStateImpl(
332
            startRequest,
333
            firstExecutionRunId,
334
            runId,
335
            retryState,
336
            backoffStartInterval,
337
            lastCompletionResult,
338
            lastFailure,
339
            parent,
340
            parentChildInitiatedEventId,
341
            continuedExecutionRunId,
342
            this,
343
            store,
344
            visibilityStore,
345
            selfAdvancingTimer);
346
    WorkflowExecution execution = mutableState.getExecutionId().getExecution();
1✔
347
    ExecutionId executionId = new ExecutionId(namespace, execution);
1✔
348
    executionsByWorkflowId.put(workflowId, mutableState);
1✔
349
    executions.put(executionId, mutableState);
1✔
350

351
    PollWorkflowTaskQueueRequest eagerWorkflowTaskPollRequest =
352
        startRequest.getRequestEagerExecution()
1✔
353
            ? PollWorkflowTaskQueueRequest.newBuilder()
1✔
354
                .setIdentity(startRequest.getIdentity())
1✔
355
                .setNamespace(startRequest.getNamespace())
1✔
356
                .setTaskQueue(startRequest.getTaskQueue())
1✔
357
                .build()
1✔
358
            : null;
1✔
359

360
    @Nullable
361
    PollWorkflowTaskQueueResponse eagerWorkflowTask =
1✔
362
        mutableState.startWorkflow(
1✔
363
            continuedExecutionRunId.isPresent(),
1✔
364
            signalWithStartSignal,
365
            eagerWorkflowTaskPollRequest);
366
    StartWorkflowExecutionResponse.Builder response =
367
        StartWorkflowExecutionResponse.newBuilder().setRunId(execution.getRunId());
1✔
368
    if (eagerWorkflowTask != null) {
1✔
369
      response.setEagerWorkflowTask(eagerWorkflowTask);
1✔
370
    }
371
    return response.build();
1✔
372
  }
373

374
  @Override
375
  public void getWorkflowExecutionHistory(
376
      GetWorkflowExecutionHistoryRequest getRequest,
377
      StreamObserver<GetWorkflowExecutionHistoryResponse> responseObserver) {
378
    ExecutionId executionId = new ExecutionId(getRequest.getNamespace(), getRequest.getExecution());
1✔
379
    executor.execute(
1✔
380
        // preserving gRPC context deadline between threads
381
        Context.current()
1✔
382
            .wrap(
1✔
383
                () -> {
384
                  try {
385
                    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
386
                    responseObserver.onNext(
1✔
387
                        store.getWorkflowExecutionHistory(
1✔
388
                            mutableState.getExecutionId(),
1✔
389
                            getRequest,
390
                            // We explicitly don't try to respond inside the context deadline.
391
                            // If we try to fit into the context deadline, the deadline may be not
392
                            // expired on the client side and an empty response will lead to a new
393
                            // request, making the client hammer the server at the tail end of the
394
                            // deadline.
395
                            // So this call is designed to wait fully till the end of the
396
                            // context deadline and throw DEADLINE_EXCEEDED if the deadline is less
397
                            // than 20s.
398
                            // If it's longer than 20 seconds - we return an empty result.
399
                            Deadline.after(20, TimeUnit.SECONDS)));
1✔
400
                    responseObserver.onCompleted();
1✔
401
                  } catch (StatusRuntimeException e) {
1✔
402
                    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1✔
403
                      log.error("unexpected", e);
×
404
                    }
405
                    responseObserver.onError(e);
1✔
406
                  } catch (Exception e) {
×
407
                    log.error("unexpected", e);
×
408
                    responseObserver.onError(e);
×
409
                  }
1✔
410
                }));
1✔
411
  }
1✔
412

413
  private <T> T pollTaskQueue(Context ctx, Future<T> futureValue)
414
      throws ExecutionException, InterruptedException {
415
    final Context.CancellationListener canceler = context -> futureValue.cancel(true);
1✔
416
    ctx.addListener(canceler, this.backgroundScheduler);
1✔
417
    try {
418
      return futureValue.get();
1✔
419
    } finally {
420
      ctx.removeListener(canceler);
1✔
421
    }
422
  }
423

424
  @Override
425
  public void pollWorkflowTaskQueue(
426
      PollWorkflowTaskQueueRequest pollRequest,
427
      StreamObserver<PollWorkflowTaskQueueResponse> responseObserver) {
428
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
429
      PollWorkflowTaskQueueResponse.Builder task;
430
      try {
431
        task = pollTaskQueue(ctx, store.pollWorkflowTaskQueue(pollRequest));
1✔
432
      } catch (ExecutionException e) {
×
433
        responseObserver.onError(e);
×
434
        return;
×
435
      } catch (InterruptedException e) {
×
436
        Thread.currentThread().interrupt();
×
437
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
×
438
        responseObserver.onCompleted();
×
439
        return;
×
440
      } catch (CancellationException e) {
1✔
441
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
1✔
442
        responseObserver.onCompleted();
1✔
443
        return;
1✔
444
      }
1✔
445

446
      ExecutionId executionId =
1✔
447
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
1✔
448
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
449
      try {
450
        mutableState.startWorkflowTask(task, pollRequest);
1✔
451
        // The task always has the original task queue that was created as part of the response.
452
        // This may be a different task queue than the task queue it was scheduled on, as in the
453
        // case of sticky execution.
454
        task.setWorkflowExecutionTaskQueue(mutableState.getStartRequest().getTaskQueue());
1✔
455
        PollWorkflowTaskQueueResponse response = task.build();
1✔
456
        responseObserver.onNext(response);
1✔
457
        responseObserver.onCompleted();
1✔
458
      } catch (StatusRuntimeException e) {
×
459
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
×
460
          if (log.isDebugEnabled()) {
×
461
            log.debug("Skipping outdated workflow task for " + executionId, e);
×
462
          }
463
          // The real service doesn't return this call on outdated task.
464
          // For simplicity, we return an empty result here.
465
          responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
×
466
          responseObserver.onCompleted();
×
467
        } else {
468
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
×
469
            log.error("unexpected", e);
×
470
          }
471
          responseObserver.onError(e);
×
472
        }
473
      }
1✔
474
    }
1✔
475
  }
1✔
476

477
  @Override
478
  public void respondWorkflowTaskCompleted(
479
      RespondWorkflowTaskCompletedRequest request,
480
      StreamObserver<RespondWorkflowTaskCompletedResponse> responseObserver) {
481
    try {
482
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(request.getTaskToken());
1✔
483
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
484
      mutableState.completeWorkflowTask(taskToken.getHistorySize(), request);
1✔
485
      responseObserver.onNext(RespondWorkflowTaskCompletedResponse.getDefaultInstance());
1✔
486
      responseObserver.onCompleted();
1✔
487
    } catch (StatusRuntimeException e) {
1✔
488
      handleStatusRuntimeException(e, responseObserver);
1✔
489
    } catch (Throwable e) {
×
490
      responseObserver.onError(
×
491
          Status.INTERNAL
492
              .withDescription(Throwables.getStackTraceAsString(e))
×
493
              .withCause(e)
×
494
              .asRuntimeException());
×
495
    }
1✔
496
  }
1✔
497

498
  @Override
499
  public void respondWorkflowTaskFailed(
500
      RespondWorkflowTaskFailedRequest failedRequest,
501
      StreamObserver<RespondWorkflowTaskFailedResponse> responseObserver) {
502
    try {
503
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(failedRequest.getTaskToken());
1✔
504
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
505
      mutableState.failWorkflowTask(failedRequest);
1✔
506
      responseObserver.onNext(RespondWorkflowTaskFailedResponse.getDefaultInstance());
1✔
507
      responseObserver.onCompleted();
1✔
508
    } catch (StatusRuntimeException e) {
×
509
      handleStatusRuntimeException(e, responseObserver);
×
510
    }
1✔
511
  }
1✔
512

513
  @Override
514
  public void getSystemInfo(
515
      GetSystemInfoRequest request, StreamObserver<GetSystemInfoResponse> responseObserver) {
516
    responseObserver.onNext(
1✔
517
        GetSystemInfoResponse.newBuilder()
1✔
518
            .setCapabilities(
1✔
519
                // These are the capabilities I could verify the test server supports
520
                GetSystemInfoResponse.Capabilities.newBuilder()
1✔
521
                    .setSdkMetadata(true)
1✔
522
                    .setSignalAndQueryHeader(true)
1✔
523
                    .setEncodedFailureAttributes(true)
1✔
524
                    .setEagerWorkflowStart(true)
1✔
525
                    .build())
1✔
526
            .build());
1✔
527
    responseObserver.onCompleted();
1✔
528
  }
1✔
529

530
  private Context.CancellableContext deadlineCtx(Deadline deadline) {
531
    return Context.current().withDeadline(deadline, this.backgroundScheduler);
1✔
532
  }
533

534
  @Override
535
  public void pollActivityTaskQueue(
536
      PollActivityTaskQueueRequest pollRequest,
537
      StreamObserver<PollActivityTaskQueueResponse> responseObserver) {
538
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
539

540
      PollActivityTaskQueueResponse.Builder task;
541
      try {
542
        task = pollTaskQueue(ctx, store.pollActivityTaskQueue(pollRequest));
1✔
543
      } catch (ExecutionException e) {
×
544
        responseObserver.onError(e);
×
545
        return;
×
546
      } catch (InterruptedException e) {
×
547
        Thread.currentThread().interrupt();
×
548
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
×
549
        responseObserver.onCompleted();
×
550
        return;
×
551
      } catch (CancellationException e) {
1✔
552
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
1✔
553
        responseObserver.onCompleted();
1✔
554
        return;
1✔
555
      }
1✔
556

557
      ExecutionId executionId =
1✔
558
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
1✔
559
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
560
      try {
561
        mutableState.startActivityTask(task, pollRequest);
1✔
562
        responseObserver.onNext(task.build());
1✔
563
        responseObserver.onCompleted();
1✔
564
      } catch (StatusRuntimeException e) {
1✔
565
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
566
          if (log.isDebugEnabled()) {
1✔
567
            log.debug("Skipping outdated activity task for " + executionId, e);
×
568
          }
569
          responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
1✔
570
          responseObserver.onCompleted();
1✔
571
        } else {
572
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
×
573
            log.error("unexpected", e);
×
574
          }
575
          responseObserver.onError(e);
×
576
        }
577
      }
1✔
578
    }
1✔
579
  }
1✔
580

581
  @Override
582
  public void recordActivityTaskHeartbeat(
583
      RecordActivityTaskHeartbeatRequest heartbeatRequest,
584
      StreamObserver<RecordActivityTaskHeartbeatResponse> responseObserver) {
585
    try {
586
      ActivityTaskToken activityTaskToken =
1✔
587
          ActivityTaskToken.fromBytes(heartbeatRequest.getTaskToken());
1✔
588
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
589
      boolean cancelRequested =
1✔
590
          mutableState.heartbeatActivityTask(
1✔
591
              activityTaskToken.getScheduledEventId(), heartbeatRequest.getDetails());
1✔
592
      responseObserver.onNext(
1✔
593
          RecordActivityTaskHeartbeatResponse.newBuilder()
1✔
594
              .setCancelRequested(cancelRequested)
1✔
595
              .build());
1✔
596
      responseObserver.onCompleted();
1✔
597
    } catch (StatusRuntimeException e) {
1✔
598
      handleStatusRuntimeException(e, responseObserver);
1✔
599
    }
1✔
600
  }
1✔
601

602
  @Override
603
  public void recordActivityTaskHeartbeatById(
604
      RecordActivityTaskHeartbeatByIdRequest heartbeatRequest,
605
      StreamObserver<RecordActivityTaskHeartbeatByIdResponse> responseObserver) {
606
    try {
607
      ExecutionId execution =
×
608
          new ExecutionId(
609
              heartbeatRequest.getNamespace(),
×
610
              heartbeatRequest.getWorkflowId(),
×
611
              heartbeatRequest.getRunId());
×
612
      TestWorkflowMutableState mutableState = getMutableState(execution);
×
613
      boolean cancelRequested =
×
614
          mutableState.heartbeatActivityTaskById(
×
615
              heartbeatRequest.getActivityId(),
×
616
              heartbeatRequest.getDetails(),
×
617
              heartbeatRequest.getIdentity());
×
618
      responseObserver.onNext(
×
619
          RecordActivityTaskHeartbeatByIdResponse.newBuilder()
×
620
              .setCancelRequested(cancelRequested)
×
621
              .build());
×
622
      responseObserver.onCompleted();
×
623
    } catch (StatusRuntimeException e) {
×
624
      handleStatusRuntimeException(e, responseObserver);
×
625
    }
×
626
  }
×
627

628
  @Override
629
  public void respondActivityTaskCompleted(
630
      RespondActivityTaskCompletedRequest completeRequest,
631
      StreamObserver<RespondActivityTaskCompletedResponse> responseObserver) {
632
    try {
633
      ActivityTaskToken activityTaskToken =
1✔
634
          ActivityTaskToken.fromBytes(completeRequest.getTaskToken());
1✔
635
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
636
      mutableState.completeActivityTask(activityTaskToken.getScheduledEventId(), completeRequest);
1✔
637
      responseObserver.onNext(RespondActivityTaskCompletedResponse.getDefaultInstance());
1✔
638
      responseObserver.onCompleted();
1✔
639
    } catch (StatusRuntimeException e) {
1✔
640
      handleStatusRuntimeException(e, responseObserver);
1✔
641
    }
1✔
642
  }
1✔
643

644
  @Override
645
  public void respondActivityTaskCompletedById(
646
      RespondActivityTaskCompletedByIdRequest completeRequest,
647
      StreamObserver<RespondActivityTaskCompletedByIdResponse> responseObserver) {
648
    try {
649
      ExecutionId executionId =
×
650
          new ExecutionId(
651
              completeRequest.getNamespace(),
×
652
              completeRequest.getWorkflowId(),
×
653
              completeRequest.getRunId());
×
654
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
655
      mutableState.completeActivityTaskById(completeRequest.getActivityId(), completeRequest);
×
656
      responseObserver.onNext(RespondActivityTaskCompletedByIdResponse.getDefaultInstance());
×
657
      responseObserver.onCompleted();
×
658
    } catch (StatusRuntimeException e) {
×
659
      handleStatusRuntimeException(e, responseObserver);
×
660
    }
×
661
  }
×
662

663
  @Override
664
  public void respondActivityTaskFailed(
665
      RespondActivityTaskFailedRequest failRequest,
666
      StreamObserver<RespondActivityTaskFailedResponse> responseObserver) {
667
    try {
668
      ActivityTaskToken activityTaskToken = ActivityTaskToken.fromBytes(failRequest.getTaskToken());
1✔
669
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
670
      mutableState.failActivityTask(activityTaskToken.getScheduledEventId(), failRequest);
1✔
671
      responseObserver.onNext(RespondActivityTaskFailedResponse.getDefaultInstance());
1✔
672
      responseObserver.onCompleted();
1✔
673
    } catch (StatusRuntimeException e) {
1✔
674
      handleStatusRuntimeException(e, responseObserver);
1✔
675
    }
1✔
676
  }
1✔
677

678
  @Override
679
  public void respondActivityTaskFailedById(
680
      RespondActivityTaskFailedByIdRequest failRequest,
681
      StreamObserver<RespondActivityTaskFailedByIdResponse> responseObserver) {
682
    try {
683
      ExecutionId executionId =
×
684
          new ExecutionId(
685
              failRequest.getNamespace(), failRequest.getWorkflowId(), failRequest.getRunId());
×
686
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
687
      mutableState.failActivityTaskById(failRequest.getActivityId(), failRequest);
×
688
      responseObserver.onNext(RespondActivityTaskFailedByIdResponse.getDefaultInstance());
×
689
      responseObserver.onCompleted();
×
690
    } catch (StatusRuntimeException e) {
×
691
      handleStatusRuntimeException(e, responseObserver);
×
692
    }
×
693
  }
×
694

695
  @Override
696
  public void respondActivityTaskCanceled(
697
      RespondActivityTaskCanceledRequest canceledRequest,
698
      StreamObserver<RespondActivityTaskCanceledResponse> responseObserver) {
699
    try {
700
      ActivityTaskToken activityTaskToken =
1✔
701
          ActivityTaskToken.fromBytes(canceledRequest.getTaskToken());
1✔
702
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
703
      mutableState.cancelActivityTask(activityTaskToken.getScheduledEventId(), canceledRequest);
1✔
704
      responseObserver.onNext(RespondActivityTaskCanceledResponse.getDefaultInstance());
1✔
705
      responseObserver.onCompleted();
1✔
UNCOV
706
    } catch (StatusRuntimeException e) {
×
UNCOV
707
      handleStatusRuntimeException(e, responseObserver);
×
708
    }
1✔
709
  }
1✔
710

711
  @Override
712
  public void respondActivityTaskCanceledById(
713
      RespondActivityTaskCanceledByIdRequest canceledRequest,
714
      StreamObserver<RespondActivityTaskCanceledByIdResponse> responseObserver) {
715
    try {
716
      ExecutionId executionId =
×
717
          new ExecutionId(
718
              canceledRequest.getNamespace(),
×
719
              canceledRequest.getWorkflowId(),
×
720
              canceledRequest.getRunId());
×
721
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
722
      mutableState.cancelActivityTaskById(canceledRequest.getActivityId(), canceledRequest);
×
723
      responseObserver.onNext(RespondActivityTaskCanceledByIdResponse.getDefaultInstance());
×
724
      responseObserver.onCompleted();
×
725
    } catch (StatusRuntimeException e) {
×
726
      handleStatusRuntimeException(e, responseObserver);
×
727
    }
×
728
  }
×
729

730
  @Override
731
  public void requestCancelWorkflowExecution(
732
      RequestCancelWorkflowExecutionRequest cancelRequest,
733
      StreamObserver<RequestCancelWorkflowExecutionResponse> responseObserver) {
734
    try {
735
      requestCancelWorkflowExecution(cancelRequest, Optional.empty());
1✔
736
      responseObserver.onNext(RequestCancelWorkflowExecutionResponse.getDefaultInstance());
1✔
737
      responseObserver.onCompleted();
1✔
738
    } catch (StatusRuntimeException e) {
1✔
739
      handleStatusRuntimeException(e, responseObserver);
1✔
740
    }
1✔
741
  }
1✔
742

743
  void requestCancelWorkflowExecution(
744
      RequestCancelWorkflowExecutionRequest cancelRequest,
745
      Optional<TestWorkflowMutableStateImpl.CancelExternalWorkflowExecutionCallerInfo> callerInfo) {
746
    ExecutionId executionId =
1✔
747
        new ExecutionId(cancelRequest.getNamespace(), cancelRequest.getWorkflowExecution());
1✔
748
    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
749
    mutableState.requestCancelWorkflowExecution(cancelRequest, callerInfo);
1✔
750
  }
1✔
751

752
  @Override
753
  public void terminateWorkflowExecution(
754
      TerminateWorkflowExecutionRequest request,
755
      StreamObserver<TerminateWorkflowExecutionResponse> responseObserver) {
756
    try {
757
      terminateWorkflowExecution(request);
1✔
758
      responseObserver.onNext(TerminateWorkflowExecutionResponse.getDefaultInstance());
1✔
759
      responseObserver.onCompleted();
1✔
760
    } catch (StatusRuntimeException e) {
1✔
761
      handleStatusRuntimeException(e, responseObserver);
1✔
762
    }
1✔
763
  }
1✔
764

765
  private void terminateWorkflowExecution(TerminateWorkflowExecutionRequest request) {
766
    ExecutionId executionId =
1✔
767
        new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
1✔
768
    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
769
    mutableState.terminateWorkflowExecution(request);
1✔
770
  }
1✔
771

772
  @Override
773
  public void signalWorkflowExecution(
774
      SignalWorkflowExecutionRequest signalRequest,
775
      StreamObserver<SignalWorkflowExecutionResponse> responseObserver) {
776
    try {
777
      ExecutionId executionId =
1✔
778
          new ExecutionId(signalRequest.getNamespace(), signalRequest.getWorkflowExecution());
1✔
779
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
780
      mutableState.signal(signalRequest);
1✔
781
      responseObserver.onNext(SignalWorkflowExecutionResponse.getDefaultInstance());
1✔
782
      responseObserver.onCompleted();
1✔
783
    } catch (StatusRuntimeException e) {
1✔
784
      handleStatusRuntimeException(e, responseObserver);
1✔
785
    }
1✔
786
  }
1✔
787

788
  @Override
789
  public void updateWorkflowExecution(
790
      UpdateWorkflowExecutionRequest request,
791
      StreamObserver<UpdateWorkflowExecutionResponse> responseObserver) {
792
    try {
793
      ExecutionId executionId =
1✔
794
          new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
1✔
795
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
796
      @Nullable Deadline deadline = Context.current().getDeadline();
1✔
797
      UpdateWorkflowExecutionResponse response =
1✔
798
          mutableState.updateWorkflowExecution(request, deadline);
1✔
799
      responseObserver.onNext(response);
1✔
800
      responseObserver.onCompleted();
1✔
801
    } catch (StatusRuntimeException e) {
1✔
802
      handleStatusRuntimeException(e, responseObserver);
1✔
803
    }
1✔
804
  }
1✔
805

806
  @Override
807
  public void pollWorkflowExecutionUpdate(
808
      PollWorkflowExecutionUpdateRequest request,
809
      StreamObserver<PollWorkflowExecutionUpdateResponse> responseObserver) {
810
    try {
811
      ExecutionId executionId =
1✔
812
          new ExecutionId(request.getNamespace(), request.getUpdateRef().getWorkflowExecution());
1✔
813
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
814
      @Nullable Deadline deadline = Context.current().getDeadline();
1✔
815
      PollWorkflowExecutionUpdateResponse response =
1✔
816
          mutableState.pollUpdateWorkflowExecution(request, deadline);
1✔
817
      responseObserver.onNext(response);
1✔
818
      responseObserver.onCompleted();
1✔
819
    } catch (StatusRuntimeException e) {
1✔
820
      handleStatusRuntimeException(e, responseObserver);
1✔
821
    }
1✔
822
  }
1✔
823

824
  @Override
825
  public void signalWithStartWorkflowExecution(
826
      SignalWithStartWorkflowExecutionRequest r,
827
      StreamObserver<SignalWithStartWorkflowExecutionResponse> responseObserver) {
828
    try {
829
      if (!r.hasTaskQueue()) {
1✔
830
        throw Status.INVALID_ARGUMENT
×
831
            .withDescription("request missing required taskQueue field")
×
832
            .asRuntimeException();
×
833
      }
834
      if (!r.hasWorkflowType()) {
1✔
835
        throw Status.INVALID_ARGUMENT
×
836
            .withDescription("request missing required workflowType field")
×
837
            .asRuntimeException();
×
838
      }
839
      ExecutionId executionId = new ExecutionId(r.getNamespace(), r.getWorkflowId(), null);
1✔
840
      TestWorkflowMutableState mutableState = getMutableState(executionId, false);
1✔
841
      SignalWorkflowExecutionRequest signalRequest =
842
          SignalWorkflowExecutionRequest.newBuilder()
1✔
843
              .setInput(r.getSignalInput())
1✔
844
              .setSignalName(r.getSignalName())
1✔
845
              .setWorkflowExecution(executionId.getExecution())
1✔
846
              .setRequestId(r.getRequestId())
1✔
847
              .setControl(r.getControl())
1✔
848
              .setNamespace(r.getNamespace())
1✔
849
              .setIdentity(r.getIdentity())
1✔
850
              .build();
1✔
851
      if (mutableState != null && !mutableState.isTerminalState()) {
1✔
852
        mutableState.signal(signalRequest);
1✔
853
        responseObserver.onNext(
1✔
854
            SignalWithStartWorkflowExecutionResponse.newBuilder()
1✔
855
                .setRunId(mutableState.getExecutionId().getExecution().getRunId())
1✔
856
                .build());
1✔
857
        responseObserver.onCompleted();
1✔
858
        return;
1✔
859
      }
860
      StartWorkflowExecutionRequest.Builder startRequest =
861
          StartWorkflowExecutionRequest.newBuilder()
1✔
862
              .setRequestId(r.getRequestId())
1✔
863
              .setInput(r.getInput())
1✔
864
              .setWorkflowExecutionTimeout(r.getWorkflowExecutionTimeout())
1✔
865
              .setWorkflowRunTimeout(r.getWorkflowRunTimeout())
1✔
866
              .setWorkflowTaskTimeout(r.getWorkflowTaskTimeout())
1✔
867
              .setNamespace(r.getNamespace())
1✔
868
              .setTaskQueue(r.getTaskQueue())
1✔
869
              .setWorkflowId(r.getWorkflowId())
1✔
870
              .setWorkflowIdReusePolicy(r.getWorkflowIdReusePolicy())
1✔
871
              .setIdentity(r.getIdentity())
1✔
872
              .setWorkflowType(r.getWorkflowType())
1✔
873
              .setCronSchedule(r.getCronSchedule())
1✔
874
              .setRequestId(r.getRequestId());
1✔
875
      if (r.hasRetryPolicy()) {
1✔
876
        startRequest.setRetryPolicy(r.getRetryPolicy());
×
877
      }
878
      if (r.hasHeader()) {
1✔
879
        startRequest.setHeader(r.getHeader());
1✔
880
      }
881
      if (r.hasMemo()) {
1✔
882
        startRequest.setMemo(r.getMemo());
×
883
      }
884
      if (r.hasSearchAttributes()) {
1✔
885
        startRequest.setSearchAttributes(r.getSearchAttributes());
1✔
886
      }
887
      if (r.hasWorkflowStartDelay()) {
1✔
888
        startRequest.setWorkflowStartDelay(r.getWorkflowStartDelay());
1✔
889
      }
890

891
      StartWorkflowExecutionResponse startResult =
1✔
892
          startWorkflowExecutionImpl(
1✔
893
              startRequest.build(),
1✔
894
              Duration.ZERO,
895
              Optional.empty(),
1✔
896
              OptionalLong.empty(),
1✔
897
              signalRequest);
898
      responseObserver.onNext(
1✔
899
          SignalWithStartWorkflowExecutionResponse.newBuilder()
1✔
900
              .setRunId(startResult.getRunId())
1✔
901
              .build());
1✔
902
      responseObserver.onCompleted();
1✔
903
    } catch (StatusRuntimeException e) {
1✔
904
      handleStatusRuntimeException(e, responseObserver);
1✔
905
    }
1✔
906
  }
1✔
907

908
  public void signalExternalWorkflowExecution(
909
      String signalId,
910
      SignalExternalWorkflowExecutionCommandAttributes commandAttributes,
911
      TestWorkflowMutableState source) {
912
    String namespace;
913
    if (commandAttributes.getNamespace().isEmpty()) {
1✔
914
      namespace = source.getExecutionId().getNamespace();
1✔
915
    } else {
916
      namespace = commandAttributes.getNamespace();
×
917
    }
918
    ExecutionId executionId = new ExecutionId(namespace, commandAttributes.getExecution());
1✔
919
    TestWorkflowMutableState mutableState;
920
    try {
921
      mutableState = getMutableState(executionId);
1✔
922
      mutableState.signalFromWorkflow(commandAttributes);
1✔
923
      source.completeSignalExternalWorkflowExecution(
1✔
924
          signalId, mutableState.getExecutionId().getExecution().getRunId());
1✔
925
    } catch (StatusRuntimeException e) {
1✔
926
      if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
927
        source.failSignalExternalWorkflowExecution(
1✔
928
            signalId,
929
            SignalExternalWorkflowExecutionFailedCause
930
                .SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED_CAUSE_EXTERNAL_WORKFLOW_EXECUTION_NOT_FOUND);
931
      } else {
932
        throw e;
×
933
      }
934
    }
1✔
935
  }
1✔
936

937
  /**
938
   * Creates next run of a workflow execution
939
   *
940
   * @return RunId
941
   */
942
  public String continueAsNew(
943
      StartWorkflowExecutionRequest previousRunStartRequest,
944
      ContinueAsNewWorkflowExecutionCommandAttributes ca,
945
      WorkflowExecutionContinuedAsNewEventAttributes ea,
946
      Optional<TestServiceRetryState> retryState,
947
      String identity,
948
      ExecutionId continuedExecutionId,
949
      String firstExecutionRunId,
950
      Optional<TestWorkflowMutableState> parent,
951
      OptionalLong parentChildInitiatedEventId) {
952
    StartWorkflowExecutionRequest.Builder startRequestBuilder =
953
        StartWorkflowExecutionRequest.newBuilder()
1✔
954
            .setRequestId(UUID.randomUUID().toString())
1✔
955
            .setWorkflowType(ea.getWorkflowType())
1✔
956
            .setWorkflowRunTimeout(ea.getWorkflowRunTimeout())
1✔
957
            .setWorkflowTaskTimeout(ea.getWorkflowTaskTimeout())
1✔
958
            .setNamespace(continuedExecutionId.getNamespace())
1✔
959
            .setTaskQueue(ea.getTaskQueue())
1✔
960
            .setWorkflowId(continuedExecutionId.getWorkflowId().getWorkflowId())
1✔
961
            .setWorkflowIdReusePolicy(previousRunStartRequest.getWorkflowIdReusePolicy())
1✔
962
            .setIdentity(identity)
1✔
963
            .setCronSchedule(previousRunStartRequest.getCronSchedule());
1✔
964
    // TODO: Service doesn't perform this copy.
965
    // See https://github.com/temporalio/temporal/issues/5249
966
    //    if (previousRunStartRequest.hasRetryPolicy()) {
967
    //      startRequestBuilder.setRetryPolicy(previousRunStartRequest.getRetryPolicy());
968
    //    }
969
    if (ca.hasRetryPolicy()) {
1✔
970
      startRequestBuilder.setRetryPolicy(ca.getRetryPolicy());
1✔
971
    }
972
    if (ea.hasInput()) {
1✔
973
      startRequestBuilder.setInput(ea.getInput());
1✔
974
    }
975
    if (ea.hasHeader()) {
1✔
976
      startRequestBuilder.setHeader(ea.getHeader());
1✔
977
    }
978
    StartWorkflowExecutionRequest startRequest = startRequestBuilder.build();
1✔
979
    lock.lock();
1✔
980
    Optional<Failure> lastFail =
981
        ea.hasFailure()
1✔
982
            ? Optional.of(ea.getFailure())
1✔
983
            : retryState.flatMap(TestServiceRetryState::getPreviousRunFailure);
1✔
984
    try {
985
      StartWorkflowExecutionResponse response =
1✔
986
          startWorkflowExecutionNoRunningCheckLocked(
1✔
987
              startRequest,
988
              ea.getNewExecutionRunId(),
1✔
989
              firstExecutionRunId,
990
              Optional.of(continuedExecutionId.getExecution().getRunId()),
1✔
991
              retryState,
992
              ProtobufTimeUtils.toJavaDuration(ea.getBackoffStartInterval()),
1✔
993
              ea.getLastCompletionResult(),
1✔
994
              lastFail,
995
              parent,
996
              parentChildInitiatedEventId,
997
              null,
998
              continuedExecutionId.getWorkflowId());
1✔
999
      return response.getRunId();
1✔
1000
    } finally {
1001
      lock.unlock();
1✔
1002
    }
1003
  }
1004

1005
  @Override
1006
  public void listOpenWorkflowExecutions(
1007
      ListOpenWorkflowExecutionsRequest listRequest,
1008
      StreamObserver<ListOpenWorkflowExecutionsResponse> responseObserver) {
1009
    try {
1010
      Optional<String> workflowIdFilter;
1011
      if (listRequest.hasExecutionFilter()
1✔
1012
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
×
1013
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
×
1014
      } else {
1015
        workflowIdFilter = Optional.empty();
1✔
1016
      }
1017
      List<WorkflowExecutionInfo> result =
1✔
1018
          store.listWorkflows(WorkflowState.OPEN, workflowIdFilter);
1✔
1019
      responseObserver.onNext(
1✔
1020
          ListOpenWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1✔
1021
      responseObserver.onCompleted();
1✔
1022
    } catch (StatusRuntimeException e) {
×
1023
      handleStatusRuntimeException(e, responseObserver);
×
1024
    }
1✔
1025
  }
1✔
1026

1027
  @Override
1028
  public void listClosedWorkflowExecutions(
1029
      ListClosedWorkflowExecutionsRequest listRequest,
1030
      StreamObserver<ListClosedWorkflowExecutionsResponse> responseObserver) {
1031
    try {
1032
      Optional<String> workflowIdFilter;
1033
      if (listRequest.hasExecutionFilter()
1✔
1034
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
×
1035
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
×
1036
      } else {
1037
        workflowIdFilter = Optional.empty();
1✔
1038
      }
1039
      List<WorkflowExecutionInfo> result =
1✔
1040
          store.listWorkflows(WorkflowState.CLOSED, workflowIdFilter);
1✔
1041
      responseObserver.onNext(
1✔
1042
          ListClosedWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1✔
1043
      responseObserver.onCompleted();
1✔
1044
    } catch (StatusRuntimeException e) {
×
1045
      handleStatusRuntimeException(e, responseObserver);
×
1046
    }
1✔
1047
  }
1✔
1048

1049
  @Override
1050
  public void respondQueryTaskCompleted(
1051
      RespondQueryTaskCompletedRequest completeRequest,
1052
      StreamObserver<RespondQueryTaskCompletedResponse> responseObserver) {
1053
    try {
1054
      QueryId queryId = QueryId.fromBytes(completeRequest.getTaskToken());
1✔
1055
      TestWorkflowMutableState mutableState = getMutableState(queryId.getExecutionId());
1✔
1056
      mutableState.completeQuery(queryId, completeRequest);
1✔
1057
      responseObserver.onNext(RespondQueryTaskCompletedResponse.getDefaultInstance());
1✔
1058
      responseObserver.onCompleted();
1✔
1059
    } catch (StatusRuntimeException e) {
×
1060
      handleStatusRuntimeException(e, responseObserver);
×
1061
    }
1✔
1062
  }
1✔
1063

1064
  @Override
1065
  public void queryWorkflow(
1066
      QueryWorkflowRequest queryRequest, StreamObserver<QueryWorkflowResponse> responseObserver) {
1067
    try {
1068
      ExecutionId executionId =
1✔
1069
          new ExecutionId(queryRequest.getNamespace(), queryRequest.getExecution());
1✔
1070
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1071
      @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1072
      QueryWorkflowResponse result =
1✔
1073
          mutableState.query(
1✔
1074
              queryRequest,
1075
              deadline != null ? deadline.timeRemaining(TimeUnit.MILLISECONDS) : Long.MAX_VALUE);
1✔
1076
      responseObserver.onNext(result);
1✔
1077
      responseObserver.onCompleted();
1✔
1078
    } catch (StatusRuntimeException e) {
1✔
1079
      handleStatusRuntimeException(e, responseObserver);
1✔
1080
    }
1✔
1081
  }
1✔
1082

1083
  @Override
1084
  public void describeWorkflowExecution(
1085
      DescribeWorkflowExecutionRequest request,
1086
      StreamObserver<io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse>
1087
          responseObserver) {
1088
    try {
1089
      if (request.getNamespace().isEmpty()) {
1✔
1090
        throw createInvalidArgument("Namespace not set on request.");
×
1091
      }
1092
      if (!request.hasExecution()) {
1✔
1093
        throw createInvalidArgument("Execution not set on request.");
×
1094
      }
1095

1096
      ExecutionId executionId = new ExecutionId(request.getNamespace(), request.getExecution());
1✔
1097
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1098
      DescribeWorkflowExecutionResponse result = mutableState.describeWorkflowExecution();
1✔
1099
      responseObserver.onNext(result);
1✔
1100
      responseObserver.onCompleted();
1✔
1101
    } catch (StatusRuntimeException e) {
1✔
1102
      handleStatusRuntimeException(e, responseObserver);
1✔
1103
    }
1✔
1104
  }
1✔
1105

1106
  /**
1107
   * This method doesn't make much sense for test server, it accepts all namespaces as existent and
1108
   * registered. so, it's a trivial implementation just returning an info that a namespace is
1109
   * registered irrespectively of the input
1110
   */
1111
  @Override
1112
  public void describeNamespace(
1113
      DescribeNamespaceRequest request,
1114
      StreamObserver<DescribeNamespaceResponse> responseObserver) {
1115
    try {
1116
      if (request.getNamespace().isEmpty()) {
1✔
1117
        throw createInvalidArgument("Namespace not set on request.");
×
1118
      }
1119
      // generating a stable UUID for name
1120
      String namespaceId = UUID.nameUUIDFromBytes(request.getNamespace().getBytes()).toString();
1✔
1121
      DescribeNamespaceResponse result =
1122
          DescribeNamespaceResponse.newBuilder()
1✔
1123
              .setNamespaceInfo(
1✔
1124
                  NamespaceInfo.newBuilder()
1✔
1125
                      .setName(request.getNamespace())
1✔
1126
                      .setState(NamespaceState.NAMESPACE_STATE_REGISTERED)
1✔
1127
                      .setId(namespaceId)
1✔
1128
                      .build())
1✔
1129
              .build();
1✔
1130
      responseObserver.onNext(result);
1✔
1131
      responseObserver.onCompleted();
1✔
1132
    } catch (StatusRuntimeException e) {
1✔
1133
      handleStatusRuntimeException(e, responseObserver);
1✔
1134
    }
1✔
1135
  }
1✔
1136

1137
  private <R> R requireNotNull(String fieldName, R value) {
1138
    if (value == null) {
1✔
1139
      throw Status.INVALID_ARGUMENT
×
1140
          .withDescription("Missing required field \"" + fieldName + "\".")
×
1141
          .asRuntimeException();
×
1142
    }
1143
    return value;
1✔
1144
  }
1145

1146
  /**
1147
   * Adds diagnostic data about internal service state to the provided {@link StringBuilder}.
1148
   * Includes histories of all workflow instances stored in the service.
1149
   */
1150
  public void getDiagnostics(StringBuilder result) {
1151
    store.getDiagnostics(result);
×
1152
  }
×
1153

1154
  /**
1155
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1156
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#getCurrentTime(Empty)}
1157
   */
1158
  @Deprecated
1159
  public long currentTimeMillis() {
1160
    return selfAdvancingTimer.getClock().getAsLong();
×
1161
  }
1162

1163
  /** Invokes callback after the specified delay according to internal service clock. */
1164
  public void registerDelayedCallback(Duration delay, Runnable r) {
1165
    store.registerDelayedCallback(delay, r);
1✔
1166
  }
1✔
1167

1168
  /**
1169
   * Disables time skipping. To re-enable call {@link #unlockTimeSkipping(String)}. These calls are
1170
   * counted, so calling unlock does not guarantee that time is going to be skipped immediately as
1171
   * another lock can be holding it.
1172
   *
1173
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1174
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#lockTimeSkipping(LockTimeSkippingRequest)}
1175
   */
1176
  @Deprecated
1177
  public void lockTimeSkipping(String caller) {
1178
    selfAdvancingTimer.lockTimeSkipping(caller);
×
1179
  }
×
1180

1181
  /**
1182
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1183
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkipping(UnlockTimeSkippingRequest)}
1184
   */
1185
  @Deprecated
1186
  public void unlockTimeSkipping(String caller) {
1187
    selfAdvancingTimer.unlockTimeSkipping(caller);
×
1188
  }
×
1189

1190
  /**
1191
   * Unlocks time skipping and blocks the calling thread until internal clock passes the current +
1192
   * duration time.<br>
1193
   * When the time is reached, locks time skipping and returns.<br>
1194
   * Might not block at all due to time skipping. Or might block if the time skipping lock counter
1195
   * was more than 1.
1196
   *
1197
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1198
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkippingWithSleep(SleepRequest)}
1199
   */
1200
  @Deprecated
1201
  public void sleep(Duration duration) {
1202
    CompletableFuture<Void> result = new CompletableFuture<>();
×
1203
    selfAdvancingTimer.schedule(
×
1204
        duration,
1205
        () -> {
1206
          selfAdvancingTimer.lockTimeSkipping("TestWorkflowService sleep");
×
1207
          result.complete(null);
×
1208
        },
×
1209
        "workflow sleep");
1210
    selfAdvancingTimer.unlockTimeSkipping("TestWorkflowService sleep");
×
1211
    try {
1212
      result.get();
×
1213
    } catch (InterruptedException e) {
×
1214
      Thread.currentThread().interrupt();
×
1215
      throw new RuntimeException(e);
×
1216
    } catch (ExecutionException e) {
×
1217
      throw new RuntimeException(e);
×
1218
    }
×
1219
  }
×
1220

1221
  /**
1222
   * Temporal server times out task queue long poll calls after 1 minute and returns an empty
1223
   * result. After which the request has to be retried by the client if it wants to continue
1224
   * waiting. We emulate this behavior here.
1225
   *
1226
   * <p>If there is a deadline present, for task queue poll requests server will respond inside the
1227
   * deadline. Note that the latest is not applicable for getWorkflowExecutionHistory() long polls.
1228
   *
1229
   * @return minimum between the context deadline and maximum long poll deadline.
1230
   */
1231
  private Deadline getLongPollDeadline() {
1232
    @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1233
    Deadline maximumDeadline =
1✔
1234
        Deadline.after(
1✔
1235
            WorkflowServiceStubsOptions.DEFAULT_SERVER_LONG_POLL_RPC_TIMEOUT.toMillis(),
1✔
1236
            TimeUnit.MILLISECONDS);
1237
    return deadline != null ? deadline.minimum(maximumDeadline) : maximumDeadline;
1✔
1238
  }
1239

1240
  private void handleStatusRuntimeException(
1241
      StatusRuntimeException e, StreamObserver<?> responseObserver) {
1242
    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1✔
1243
      log.error("unexpected", e);
1✔
1244
    }
1245
    responseObserver.onError(e);
1✔
1246
  }
1✔
1247

1248
  /**
   * Creates an in-memory service along with client stubs for use in Java code, with an initial
   * time of 0 and an in-process server. See also createServerOnly and createWithNoGrpcServer.
   *
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean)} instead and
   *     pass {@code lockTimeSkipping=false} to emulate the behavior of this method
   */
  @Deprecated
  public TestWorkflowService() {
    this(0, true);
  }
×
1259

1260
  /**
   * Creates an in-memory service along with client stubs for use in Java code, starting the
   * internal timer at the given time and using an in-process server. See also createServerOnly
   * and createWithNoGrpcServer.
   *
   * @param initialTimeMillis initial time handed to the internal self-advancing timer
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean, long)} instead
   *     and pass {@code lockTimeSkipping=false} to emulate the behavior of this method
   */
  @Deprecated
  public TestWorkflowService(long initialTimeMillis) {
    this(initialTimeMillis, true);
  }
×
1271

1272
  /**
   * Creates an in-memory service along with client stubs for use in Java code, optionally locking
   * time skipping right after construction. See also createServerOnly and createWithNoGrpcServer.
   *
   * @param lockTimeSkipping when {@code true}, time skipping is locked with the caller label
   *     "constructor"
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean)} instead
   */
  @Deprecated
  public TestWorkflowService(boolean lockTimeSkipping) {
    this(0, true);
    if (lockTimeSkipping) {
      // 'this.' selects the method; the bare name would read as the boolean parameter.
      this.lockTimeSkipping("constructor");
    }
  }
×
1285

1286
  /**
1287
   * Creates an instance of TestWorkflowService that does not manage its own gRPC server. Useful for
1288
   * including in an externally managed gRPC server.
1289
   *
1290
   * @deprecated use {@link TestServicesStarter} to create just the services with gRPC server
1291
   */
1292
  @Deprecated
1293
  public static TestWorkflowService createWithNoGrpcServer() {
1294
    return new TestWorkflowService(0, false);
×
1295
  }
1296

1297
  private TestWorkflowService(long initialTimeMillis, boolean startInProcessServer) {
×
1298
    this.selfAdvancingTimer =
×
1299
        new SelfAdvancingTimerImpl(initialTimeMillis, Clock.systemDefaultZone());
×
1300
    store = new TestWorkflowStoreImpl(this.selfAdvancingTimer);
×
1301
    visibilityStore = new TestVisibilityStoreImpl();
×
1302
    outOfProcessServer = null;
×
1303
    if (startInProcessServer) {
×
1304
      this.inProcessServer = new InProcessGRPCServer(Collections.singletonList(this));
×
1305
      this.workflowServiceStubs =
×
1306
          WorkflowServiceStubs.newServiceStubs(
×
1307
              WorkflowServiceStubsOptions.newBuilder()
×
1308
                  .setChannel(inProcessServer.getChannel())
×
1309
                  .build());
×
1310
    } else {
1311
      this.inProcessServer = null;
×
1312
      this.workflowServiceStubs = null;
×
1313
    }
1314
  }
×
1315

1316
  /**
1317
   * Creates an out-of-process rather than in-process server, and does not set up a client. Useful,
1318
   * for example, if you want to use the test service from other SDKs.
1319
   *
1320
   * @param port the port to listen on
1321
   * @deprecated use {@link io.temporal.testserver.TestServer#createPortBoundServer(int, boolean)}
1322
   *     instead and pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1323
   */
1324
  @Deprecated
1325
  public static TestWorkflowService createServerOnly(int port) {
1326
    TestWorkflowService result = new TestWorkflowService(true, port);
×
1327
    log.info("Server started, listening on " + port);
×
1328
    return result;
×
1329
  }
1330

1331
  /**
   * Builds the timer and stores and starts a gRPC server bound to {@code port} that serves this
   * instance. No in-process server or client stubs are created.
   *
   * @param isOutOfProc must be {@code true}; exists only to disambiguate constructor overloads
   * @param port the port the server listens on
   * @throws RuntimeException wrapping the {@link IOException} if the server fails to start
   */
  private TestWorkflowService(boolean isOutOfProc, int port) {
    // isOutOfProc is just here to make unambiguous constructor overloading.
    Preconditions.checkState(isOutOfProc, "Impossible.");
    this.inProcessServer = null;
    this.workflowServiceStubs = null;
    this.selfAdvancingTimer = new SelfAdvancingTimerImpl(0, Clock.systemDefaultZone());
    this.store = new TestWorkflowStoreImpl(selfAdvancingTimer);
    this.visibilityStore = new TestVisibilityStoreImpl();
    try {
      ServerBuilder<?> builder =
          Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create());
      GRPCServerHelper.registerServicesAndHealthChecks(Collections.singletonList(this), builder);
      this.outOfProcessServer = builder.build().start();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
×
1349

1350
  @Deprecated
1351
  public WorkflowServiceStubs newClientStub() {
1352
    if (workflowServiceStubs == null) {
×
1353
      throw new RuntimeException(
×
1354
          "Cannot get a client when you created your TestWorkflowService with createServerOnly.");
1355
    }
1356
    return workflowServiceStubs;
×
1357
  }
1358

1359
  private static StatusRuntimeException createInvalidArgument(String description) {
1360
    throw Status.INVALID_ARGUMENT.withDescription(description).asRuntimeException();
1✔
1361
  }
1362
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc