• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #171

pending completion
#171

push

github-actions

web-flow
Update README.md (#1765)

Fix typo in spring-boot-autoconfigure readme

17817 of 21804 relevant lines covered (81.71%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.04
/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import static io.temporal.internal.testservice.CronUtils.getBackoffInterval;
24

25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Throwables;
27
import com.google.protobuf.ByteString;
28
import com.google.protobuf.Empty;
29
import com.google.protobuf.Timestamp;
30
import com.google.protobuf.util.Timestamps;
31
import io.grpc.*;
32
import io.grpc.stub.StreamObserver;
33
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
34
import io.temporal.api.common.v1.Payload;
35
import io.temporal.api.common.v1.Payloads;
36
import io.temporal.api.common.v1.RetryPolicy;
37
import io.temporal.api.common.v1.WorkflowExecution;
38
import io.temporal.api.enums.v1.NamespaceState;
39
import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause;
40
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
41
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
42
import io.temporal.api.errordetails.v1.WorkflowExecutionAlreadyStartedFailure;
43
import io.temporal.api.failure.v1.Failure;
44
import io.temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes;
45
import io.temporal.api.namespace.v1.NamespaceInfo;
46
import io.temporal.api.testservice.v1.LockTimeSkippingRequest;
47
import io.temporal.api.testservice.v1.SleepRequest;
48
import io.temporal.api.testservice.v1.UnlockTimeSkippingRequest;
49
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
50
import io.temporal.api.workflowservice.v1.*;
51
import io.temporal.internal.common.ProtobufTimeUtils;
52
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowState;
53
import io.temporal.serviceclient.StatusUtils;
54
import io.temporal.serviceclient.WorkflowServiceStubs;
55
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
56
import java.io.Closeable;
57
import java.io.IOException;
58
import java.time.Clock;
59
import java.time.Duration;
60
import java.util.*;
61
import java.util.concurrent.*;
62
import java.util.concurrent.locks.Lock;
63
import java.util.concurrent.locks.ReentrantLock;
64
import javax.annotation.Nonnull;
65
import javax.annotation.Nullable;
66
import org.slf4j.Logger;
67
import org.slf4j.LoggerFactory;
68

69
/**
70
 * In memory implementation of the Workflow Service. To be used for testing purposes only.
71
 *
72
 * <p>Do not use directly, instead use {@link io.temporal.testing.TestWorkflowEnvironment}.
73
 */
74
public final class TestWorkflowService extends WorkflowServiceGrpc.WorkflowServiceImplBase
75
    implements Closeable {
76
  private static final Logger log = LoggerFactory.getLogger(TestWorkflowService.class);
1✔
77
  private final Map<ExecutionId, TestWorkflowMutableState> executions = new HashMap<>();
1✔
78
  // key->WorkflowId
79
  private final Map<WorkflowId, TestWorkflowMutableState> executionsByWorkflowId = new HashMap<>();
1✔
80
  private final ExecutorService executor = Executors.newCachedThreadPool();
1✔
81
  private final Lock lock = new ReentrantLock();
1✔
82

83
  private final TestWorkflowStore store;
84
  private final TestVisibilityStore visibilityStore;
85
  private final SelfAdvancingTimer selfAdvancingTimer;
86

87
  private final ScheduledExecutorService backgroundScheduler =
1✔
88
      Executors.newSingleThreadScheduledExecutor();
1✔
89

90
  private final Server outOfProcessServer;
91
  private final InProcessGRPCServer inProcessServer;
92
  private final WorkflowServiceStubs workflowServiceStubs;
93

94
  TestWorkflowService(
95
      TestWorkflowStore store,
96
      TestVisibilityStore visibilityStore,
97
      SelfAdvancingTimer selfAdvancingTimer) {
1✔
98
    this.store = store;
1✔
99
    this.visibilityStore = visibilityStore;
1✔
100
    this.selfAdvancingTimer = selfAdvancingTimer;
1✔
101
    this.outOfProcessServer = null;
1✔
102
    this.inProcessServer = null;
1✔
103
    this.workflowServiceStubs = null;
1✔
104
  }
1✔
105

106
  @Override
107
  public void close() {
108
    log.debug("Shutting down TestWorkflowService");
1✔
109

110
    log.debug("Shutting down background scheduler");
1✔
111
    backgroundScheduler.shutdown();
1✔
112

113
    if (outOfProcessServer != null) {
1✔
114
      log.info("Shutting down out-of-process GRPC server");
×
115
      outOfProcessServer.shutdown();
×
116
    }
117

118
    if (workflowServiceStubs != null) {
1✔
119
      workflowServiceStubs.shutdown();
×
120
    }
121

122
    if (inProcessServer != null) {
1✔
123
      log.info("Shutting down in-process GRPC server");
×
124
      inProcessServer.shutdown();
×
125
    }
126

127
    executor.shutdown();
1✔
128

129
    try {
130
      executor.awaitTermination(1, TimeUnit.SECONDS);
1✔
131

132
      if (outOfProcessServer != null) {
1✔
133
        outOfProcessServer.awaitTermination(1, TimeUnit.SECONDS);
×
134
      }
135

136
      if (workflowServiceStubs != null) {
1✔
137
        workflowServiceStubs.awaitTermination(1, TimeUnit.SECONDS);
×
138
      }
139

140
      if (inProcessServer != null) {
1✔
141
        inProcessServer.awaitTermination(1, TimeUnit.SECONDS);
×
142
      }
143

144
    } catch (InterruptedException e) {
×
145
      Thread.currentThread().interrupt();
×
146
      log.debug("shutdown interrupted", e);
×
147
    }
1✔
148

149
    store.close();
1✔
150
  }
1✔
151

152
  private TestWorkflowMutableState getMutableState(ExecutionId executionId) {
153
    return getMutableState(executionId, true);
1✔
154
  }
155

156
  private TestWorkflowMutableState getMutableState(ExecutionId executionId, boolean failNotExists) {
157
    lock.lock();
1✔
158
    try {
159
      if (executionId.getExecution().getRunId().isEmpty()) {
1✔
160
        return getMutableState(executionId.getWorkflowId(), failNotExists);
1✔
161
      }
162
      TestWorkflowMutableState mutableState = executions.get(executionId);
1✔
163
      if (mutableState == null && failNotExists) {
1✔
164
        throw Status.NOT_FOUND
1✔
165
            .withDescription(
1✔
166
                "Execution \""
167
                    + executionId
168
                    + "\" not found in mutable state. Known executions: "
169
                    + executions.values()
1✔
170
                    + ", service="
171
                    + this)
172
            .asRuntimeException();
1✔
173
      }
174
      return mutableState;
1✔
175
    } finally {
176
      lock.unlock();
1✔
177
    }
178
  }
179

180
  private TestWorkflowMutableState getMutableState(WorkflowId workflowId, boolean failNotExists) {
181
    lock.lock();
1✔
182
    try {
183
      TestWorkflowMutableState mutableState = executionsByWorkflowId.get(workflowId);
1✔
184
      if (mutableState == null && failNotExists) {
1✔
185
        throw Status.NOT_FOUND
1✔
186
            .withDescription("Execution not found in mutable state: " + workflowId)
1✔
187
            .asRuntimeException();
1✔
188
      }
189
      return mutableState;
1✔
190
    } finally {
191
      lock.unlock();
1✔
192
    }
193
  }
194

195
  @Override
196
  public void startWorkflowExecution(
197
      StartWorkflowExecutionRequest request,
198
      StreamObserver<StartWorkflowExecutionResponse> responseObserver) {
199
    try {
200
      Duration backoffInterval = getBackoffInterval(request.getCronSchedule(), store.currentTime());
1✔
201
      StartWorkflowExecutionResponse response =
1✔
202
          startWorkflowExecutionImpl(
1✔
203
              request, backoffInterval, Optional.empty(), OptionalLong.empty(), null);
1✔
204
      responseObserver.onNext(response);
1✔
205
      responseObserver.onCompleted();
1✔
206
    } catch (StatusRuntimeException e) {
1✔
207
      handleStatusRuntimeException(e, responseObserver);
1✔
208
    }
1✔
209
  }
1✔
210

211
  StartWorkflowExecutionResponse startWorkflowExecutionImpl(
212
      StartWorkflowExecutionRequest startRequest,
213
      Duration backoffStartInterval,
214
      Optional<TestWorkflowMutableState> parent,
215
      OptionalLong parentChildInitiatedEventId,
216
      @Nullable SignalWorkflowExecutionRequest signalWithStartSignal) {
217
    String requestWorkflowId = requireNotNull("WorkflowId", startRequest.getWorkflowId());
1✔
218
    String namespace = requireNotNull("Namespace", startRequest.getNamespace());
1✔
219
    WorkflowId workflowId = new WorkflowId(namespace, requestWorkflowId);
1✔
220
    TestWorkflowMutableState existing;
221
    lock.lock();
1✔
222
    try {
223
      String newRunId = UUID.randomUUID().toString();
1✔
224
      existing = executionsByWorkflowId.get(workflowId);
1✔
225
      if (existing != null) {
1✔
226
        WorkflowExecutionStatus status = existing.getWorkflowExecutionStatus();
1✔
227
        WorkflowIdReusePolicy policy = startRequest.getWorkflowIdReusePolicy();
1✔
228

229
        if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
230
            && policy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
231
          existing.terminateWorkflowExecution(
1✔
232
              TerminateWorkflowExecutionRequest.newBuilder()
1✔
233
                  .setNamespace(startRequest.getNamespace())
1✔
234
                  .setWorkflowExecution(existing.getExecutionId().getExecution())
1✔
235
                  .setReason("TerminateIfRunning WorkflowIdReusePolicy Policy")
1✔
236
                  .setIdentity("history-service")
1✔
237
                  .setDetails(
1✔
238
                      Payloads.newBuilder()
1✔
239
                          .addPayloads(
1✔
240
                              Payload.newBuilder()
1✔
241
                                  .setData(
1✔
242
                                      ByteString.copyFromUtf8(
1✔
243
                                          String.format("terminated by new runID: %s", newRunId)))
1✔
244
                                  .build())
1✔
245
                          .build())
1✔
246
                  .build());
1✔
247
        } else if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
248
            || policy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE) {
249
          return throwDuplicatedWorkflow(startRequest, existing);
×
250
        } else if (policy
1✔
251
                == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
252
            && (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED
253
                || status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW)) {
254
          return throwDuplicatedWorkflow(startRequest, existing);
×
255
        }
256
      }
257
      Optional<TestServiceRetryState> retryState;
258
      Optional<Failure> lastFailure = Optional.empty();
1✔
259
      if (startRequest.hasRetryPolicy()) {
1✔
260
        Duration expirationInterval =
1✔
261
            ProtobufTimeUtils.toJavaDuration(startRequest.getWorkflowExecutionTimeout());
1✔
262
        retryState = newRetryStateLocked(startRequest.getRetryPolicy(), expirationInterval);
1✔
263
        if (retryState.isPresent()) {
1✔
264
          lastFailure = retryState.get().getPreviousRunFailure();
1✔
265
        }
266
      } else {
1✔
267
        retryState = Optional.empty();
1✔
268
      }
269
      return startWorkflowExecutionNoRunningCheckLocked(
1✔
270
          startRequest,
271
          newRunId,
272
          // it's the first execution in the continue-as-new chain, so firstExecutionRunId =
273
          // newRunId
274
          newRunId,
275
          Optional.empty(),
1✔
276
          retryState,
277
          backoffStartInterval,
278
          null,
279
          lastFailure,
280
          parent,
281
          parentChildInitiatedEventId,
282
          signalWithStartSignal,
283
          workflowId);
284
    } finally {
285
      lock.unlock();
1✔
286
    }
287
  }
288

289
  private Optional<TestServiceRetryState> newRetryStateLocked(
290
      RetryPolicy retryPolicy, Duration expirationInterval) {
291
    Timestamp expirationTime =
292
        expirationInterval.isZero()
1✔
293
            ? Timestamps.fromNanos(0)
1✔
294
            : Timestamps.add(
1✔
295
                store.currentTime(), ProtobufTimeUtils.toProtoDuration(expirationInterval));
1✔
296
    return Optional.of(new TestServiceRetryState(retryPolicy, expirationTime));
1✔
297
  }
298

299
  private StartWorkflowExecutionResponse throwDuplicatedWorkflow(
300
      StartWorkflowExecutionRequest startRequest, TestWorkflowMutableState existing) {
301
    WorkflowExecution execution = existing.getExecutionId().getExecution();
1✔
302
    WorkflowExecutionAlreadyStartedFailure error =
303
        WorkflowExecutionAlreadyStartedFailure.newBuilder()
1✔
304
            .setRunId(execution.getRunId())
1✔
305
            .setStartRequestId(startRequest.getRequestId())
1✔
306
            .build();
1✔
307
    throw StatusUtils.newException(
1✔
308
        Status.ALREADY_EXISTS.withDescription(
1✔
309
            String.format(
1✔
310
                "WorkflowId: %s, " + "RunId: %s", execution.getWorkflowId(), execution.getRunId())),
1✔
311
        error,
312
        WorkflowExecutionAlreadyStartedFailure.getDescriptor());
1✔
313
  }
314

315
  private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheckLocked(
316
      StartWorkflowExecutionRequest startRequest,
317
      @Nonnull String runId,
318
      @Nonnull String firstExecutionRunId,
319
      Optional<String> continuedExecutionRunId,
320
      Optional<TestServiceRetryState> retryState,
321
      Duration backoffStartInterval,
322
      Payloads lastCompletionResult,
323
      Optional<Failure> lastFailure,
324
      Optional<TestWorkflowMutableState> parent,
325
      OptionalLong parentChildInitiatedEventId,
326
      @Nullable SignalWorkflowExecutionRequest signalWithStartSignal,
327
      WorkflowId workflowId) {
328
    String namespace = startRequest.getNamespace();
1✔
329
    TestWorkflowMutableState mutableState =
1✔
330
        new TestWorkflowMutableStateImpl(
331
            startRequest,
332
            firstExecutionRunId,
333
            runId,
334
            retryState,
335
            backoffStartInterval,
336
            lastCompletionResult,
337
            lastFailure,
338
            parent,
339
            parentChildInitiatedEventId,
340
            continuedExecutionRunId,
341
            this,
342
            store,
343
            visibilityStore,
344
            selfAdvancingTimer);
345
    WorkflowExecution execution = mutableState.getExecutionId().getExecution();
1✔
346
    ExecutionId executionId = new ExecutionId(namespace, execution);
1✔
347
    executionsByWorkflowId.put(workflowId, mutableState);
1✔
348
    executions.put(executionId, mutableState);
1✔
349

350
    PollWorkflowTaskQueueRequest eagerWorkflowTaskPollRequest =
351
        startRequest.getRequestEagerExecution()
1✔
352
            ? PollWorkflowTaskQueueRequest.newBuilder()
1✔
353
                .setIdentity(startRequest.getIdentity())
1✔
354
                .setNamespace(startRequest.getNamespace())
1✔
355
                .setTaskQueue(startRequest.getTaskQueue())
1✔
356
                .build()
1✔
357
            : null;
1✔
358

359
    @Nullable
360
    PollWorkflowTaskQueueResponse eagerWorkflowTask =
1✔
361
        mutableState.startWorkflow(
1✔
362
            continuedExecutionRunId.isPresent(),
1✔
363
            signalWithStartSignal,
364
            eagerWorkflowTaskPollRequest);
365
    StartWorkflowExecutionResponse.Builder response =
366
        StartWorkflowExecutionResponse.newBuilder().setRunId(execution.getRunId());
1✔
367
    if (eagerWorkflowTask != null) {
1✔
368
      response.setEagerWorkflowTask(eagerWorkflowTask);
1✔
369
    }
370
    return response.build();
1✔
371
  }
372

373
  @Override
374
  public void getWorkflowExecutionHistory(
375
      GetWorkflowExecutionHistoryRequest getRequest,
376
      StreamObserver<GetWorkflowExecutionHistoryResponse> responseObserver) {
377
    ExecutionId executionId = new ExecutionId(getRequest.getNamespace(), getRequest.getExecution());
1✔
378
    executor.execute(
1✔
379
        // preserving gRPC context deadline between threads
380
        Context.current()
1✔
381
            .wrap(
1✔
382
                () -> {
383
                  try {
384
                    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
385
                    responseObserver.onNext(
1✔
386
                        store.getWorkflowExecutionHistory(
1✔
387
                            mutableState.getExecutionId(),
1✔
388
                            getRequest,
389
                            // We explicitly don't try to respond inside the context deadline.
390
                            // If we try to fit into the context deadline, the deadline may be not
391
                            // expired on the client side and an empty response will lead to a new
392
                            // request, making the client hammer the server at the tail end of the
393
                            // deadline.
394
                            // So this call is designed to wait fully till the end of the
395
                            // context deadline and throw DEADLINE_EXCEEDED if the deadline is less
396
                            // than 20s.
397
                            // If it's longer than 20 seconds - we return an empty result.
398
                            Deadline.after(20, TimeUnit.SECONDS)));
1✔
399
                    responseObserver.onCompleted();
1✔
400
                  } catch (StatusRuntimeException e) {
1✔
401
                    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1✔
402
                      log.error("unexpected", e);
×
403
                    }
404
                    responseObserver.onError(e);
1✔
405
                  } catch (Exception e) {
×
406
                    log.error("unexpected", e);
×
407
                    responseObserver.onError(e);
×
408
                  }
1✔
409
                }));
1✔
410
  }
1✔
411

412
  private <T> T pollTaskQueue(Context ctx, Future<T> futureValue)
413
      throws ExecutionException, InterruptedException {
414
    final Context.CancellationListener canceler = context -> futureValue.cancel(true);
1✔
415
    ctx.addListener(canceler, this.backgroundScheduler);
1✔
416
    try {
417
      return futureValue.get();
1✔
418
    } finally {
419
      ctx.removeListener(canceler);
1✔
420
    }
421
  }
422

423
  @Override
424
  public void pollWorkflowTaskQueue(
425
      PollWorkflowTaskQueueRequest pollRequest,
426
      StreamObserver<PollWorkflowTaskQueueResponse> responseObserver) {
427
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
428
      PollWorkflowTaskQueueResponse.Builder task;
429
      try {
430
        task = pollTaskQueue(ctx, store.pollWorkflowTaskQueue(pollRequest));
1✔
431
      } catch (ExecutionException e) {
×
432
        responseObserver.onError(e);
×
433
        return;
×
434
      } catch (InterruptedException e) {
×
435
        Thread.currentThread().interrupt();
×
436
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
×
437
        responseObserver.onCompleted();
×
438
        return;
×
439
      } catch (CancellationException e) {
1✔
440
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
1✔
441
        responseObserver.onCompleted();
1✔
442
        return;
1✔
443
      }
1✔
444

445
      ExecutionId executionId =
1✔
446
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
1✔
447
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
448
      try {
449
        mutableState.startWorkflowTask(task, pollRequest);
1✔
450
        // The task always has the original task queue that was created as part of the response.
451
        // This may be a different task queue than the task queue it was scheduled on, as in the
452
        // case of sticky execution.
453
        task.setWorkflowExecutionTaskQueue(mutableState.getStartRequest().getTaskQueue());
1✔
454
        PollWorkflowTaskQueueResponse response = task.build();
1✔
455
        responseObserver.onNext(response);
1✔
456
        responseObserver.onCompleted();
1✔
457
      } catch (StatusRuntimeException e) {
×
458
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
×
459
          if (log.isDebugEnabled()) {
×
460
            log.debug("Skipping outdated workflow task for " + executionId, e);
×
461
          }
462
          // The real service doesn't return this call on outdated task.
463
          // For simplicity, we return an empty result here.
464
          responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
×
465
          responseObserver.onCompleted();
×
466
        } else {
467
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
×
468
            log.error("unexpected", e);
×
469
          }
470
          responseObserver.onError(e);
×
471
        }
472
      }
1✔
473
    }
1✔
474
  }
1✔
475

476
  @Override
477
  public void respondWorkflowTaskCompleted(
478
      RespondWorkflowTaskCompletedRequest request,
479
      StreamObserver<RespondWorkflowTaskCompletedResponse> responseObserver) {
480
    try {
481
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(request.getTaskToken());
1✔
482
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
483
      mutableState.completeWorkflowTask(taskToken.getHistorySize(), request);
1✔
484
      responseObserver.onNext(RespondWorkflowTaskCompletedResponse.getDefaultInstance());
1✔
485
      responseObserver.onCompleted();
1✔
486
    } catch (StatusRuntimeException e) {
1✔
487
      handleStatusRuntimeException(e, responseObserver);
1✔
488
    } catch (Throwable e) {
×
489
      responseObserver.onError(
×
490
          Status.INTERNAL
491
              .withDescription(Throwables.getStackTraceAsString(e))
×
492
              .withCause(e)
×
493
              .asRuntimeException());
×
494
    }
1✔
495
  }
1✔
496

497
  @Override
498
  public void respondWorkflowTaskFailed(
499
      RespondWorkflowTaskFailedRequest failedRequest,
500
      StreamObserver<RespondWorkflowTaskFailedResponse> responseObserver) {
501
    try {
502
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(failedRequest.getTaskToken());
1✔
503
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
504
      mutableState.failWorkflowTask(failedRequest);
1✔
505
      responseObserver.onNext(RespondWorkflowTaskFailedResponse.getDefaultInstance());
1✔
506
      responseObserver.onCompleted();
1✔
507
    } catch (StatusRuntimeException e) {
×
508
      handleStatusRuntimeException(e, responseObserver);
×
509
    }
1✔
510
  }
1✔
511

512
  private Context.CancellableContext deadlineCtx(Deadline deadline) {
513
    return Context.current().withDeadline(deadline, this.backgroundScheduler);
1✔
514
  }
515

516
  @Override
517
  public void pollActivityTaskQueue(
518
      PollActivityTaskQueueRequest pollRequest,
519
      StreamObserver<PollActivityTaskQueueResponse> responseObserver) {
520
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
521

522
      PollActivityTaskQueueResponse.Builder task;
523
      try {
524
        task = pollTaskQueue(ctx, store.pollActivityTaskQueue(pollRequest));
1✔
525
      } catch (ExecutionException e) {
×
526
        responseObserver.onError(e);
×
527
        return;
×
528
      } catch (InterruptedException e) {
×
529
        Thread.currentThread().interrupt();
×
530
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
×
531
        responseObserver.onCompleted();
×
532
        return;
×
533
      } catch (CancellationException e) {
1✔
534
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
1✔
535
        responseObserver.onCompleted();
1✔
536
        return;
1✔
537
      }
1✔
538

539
      ExecutionId executionId =
1✔
540
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
1✔
541
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
542
      try {
543
        mutableState.startActivityTask(task, pollRequest);
1✔
544
        responseObserver.onNext(task.build());
1✔
545
        responseObserver.onCompleted();
1✔
546
      } catch (StatusRuntimeException e) {
1✔
547
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
548
          if (log.isDebugEnabled()) {
1✔
549
            log.debug("Skipping outdated activity task for " + executionId, e);
×
550
          }
551
          responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
1✔
552
          responseObserver.onCompleted();
1✔
553
        } else {
554
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
×
555
            log.error("unexpected", e);
×
556
          }
557
          responseObserver.onError(e);
×
558
        }
559
      }
1✔
560
    }
1✔
561
  }
1✔
562

563
  @Override
564
  public void recordActivityTaskHeartbeat(
565
      RecordActivityTaskHeartbeatRequest heartbeatRequest,
566
      StreamObserver<RecordActivityTaskHeartbeatResponse> responseObserver) {
567
    try {
568
      ActivityTaskToken activityTaskToken =
1✔
569
          ActivityTaskToken.fromBytes(heartbeatRequest.getTaskToken());
1✔
570
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
571
      boolean cancelRequested =
1✔
572
          mutableState.heartbeatActivityTask(
1✔
573
              activityTaskToken.getScheduledEventId(), heartbeatRequest.getDetails());
1✔
574
      responseObserver.onNext(
1✔
575
          RecordActivityTaskHeartbeatResponse.newBuilder()
1✔
576
              .setCancelRequested(cancelRequested)
1✔
577
              .build());
1✔
578
      responseObserver.onCompleted();
1✔
579
    } catch (StatusRuntimeException e) {
1✔
580
      handleStatusRuntimeException(e, responseObserver);
1✔
581
    }
1✔
582
  }
1✔
583

584
  @Override
585
  public void recordActivityTaskHeartbeatById(
586
      RecordActivityTaskHeartbeatByIdRequest heartbeatRequest,
587
      StreamObserver<RecordActivityTaskHeartbeatByIdResponse> responseObserver) {
588
    try {
589
      ExecutionId execution =
×
590
          new ExecutionId(
591
              heartbeatRequest.getNamespace(),
×
592
              heartbeatRequest.getWorkflowId(),
×
593
              heartbeatRequest.getRunId());
×
594
      TestWorkflowMutableState mutableState = getMutableState(execution);
×
595
      boolean cancelRequested =
×
596
          mutableState.heartbeatActivityTaskById(
×
597
              heartbeatRequest.getActivityId(),
×
598
              heartbeatRequest.getDetails(),
×
599
              heartbeatRequest.getIdentity());
×
600
      responseObserver.onNext(
×
601
          RecordActivityTaskHeartbeatByIdResponse.newBuilder()
×
602
              .setCancelRequested(cancelRequested)
×
603
              .build());
×
604
      responseObserver.onCompleted();
×
605
    } catch (StatusRuntimeException e) {
×
606
      handleStatusRuntimeException(e, responseObserver);
×
607
    }
×
608
  }
×
609

610
  @Override
611
  public void respondActivityTaskCompleted(
612
      RespondActivityTaskCompletedRequest completeRequest,
613
      StreamObserver<RespondActivityTaskCompletedResponse> responseObserver) {
614
    try {
615
      ActivityTaskToken activityTaskToken =
1✔
616
          ActivityTaskToken.fromBytes(completeRequest.getTaskToken());
1✔
617
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
618
      mutableState.completeActivityTask(activityTaskToken.getScheduledEventId(), completeRequest);
1✔
619
      responseObserver.onNext(RespondActivityTaskCompletedResponse.getDefaultInstance());
1✔
620
      responseObserver.onCompleted();
1✔
621
    } catch (StatusRuntimeException e) {
1✔
622
      handleStatusRuntimeException(e, responseObserver);
1✔
623
    }
1✔
624
  }
1✔
625

626
  @Override
627
  public void respondActivityTaskCompletedById(
628
      RespondActivityTaskCompletedByIdRequest completeRequest,
629
      StreamObserver<RespondActivityTaskCompletedByIdResponse> responseObserver) {
630
    try {
631
      ExecutionId executionId =
×
632
          new ExecutionId(
633
              completeRequest.getNamespace(),
×
634
              completeRequest.getWorkflowId(),
×
635
              completeRequest.getRunId());
×
636
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
637
      mutableState.completeActivityTaskById(completeRequest.getActivityId(), completeRequest);
×
638
      responseObserver.onNext(RespondActivityTaskCompletedByIdResponse.getDefaultInstance());
×
639
      responseObserver.onCompleted();
×
640
    } catch (StatusRuntimeException e) {
×
641
      handleStatusRuntimeException(e, responseObserver);
×
642
    }
×
643
  }
×
644

645
  @Override
646
  public void respondActivityTaskFailed(
647
      RespondActivityTaskFailedRequest failRequest,
648
      StreamObserver<RespondActivityTaskFailedResponse> responseObserver) {
649
    try {
650
      ActivityTaskToken activityTaskToken = ActivityTaskToken.fromBytes(failRequest.getTaskToken());
1✔
651
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
652
      mutableState.failActivityTask(activityTaskToken.getScheduledEventId(), failRequest);
1✔
653
      responseObserver.onNext(RespondActivityTaskFailedResponse.getDefaultInstance());
1✔
654
      responseObserver.onCompleted();
1✔
655
    } catch (StatusRuntimeException e) {
1✔
656
      handleStatusRuntimeException(e, responseObserver);
1✔
657
    }
1✔
658
  }
1✔
659

660
  @Override
661
  public void respondActivityTaskFailedById(
662
      RespondActivityTaskFailedByIdRequest failRequest,
663
      StreamObserver<RespondActivityTaskFailedByIdResponse> responseObserver) {
664
    try {
665
      ExecutionId executionId =
×
666
          new ExecutionId(
667
              failRequest.getNamespace(), failRequest.getWorkflowId(), failRequest.getRunId());
×
668
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
669
      mutableState.failActivityTaskById(failRequest.getActivityId(), failRequest);
×
670
      responseObserver.onNext(RespondActivityTaskFailedByIdResponse.getDefaultInstance());
×
671
      responseObserver.onCompleted();
×
672
    } catch (StatusRuntimeException e) {
×
673
      handleStatusRuntimeException(e, responseObserver);
×
674
    }
×
675
  }
×
676

677
  @Override
678
  public void respondActivityTaskCanceled(
679
      RespondActivityTaskCanceledRequest canceledRequest,
680
      StreamObserver<RespondActivityTaskCanceledResponse> responseObserver) {
681
    try {
682
      ActivityTaskToken activityTaskToken =
1✔
683
          ActivityTaskToken.fromBytes(canceledRequest.getTaskToken());
1✔
684
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
685
      mutableState.cancelActivityTask(activityTaskToken.getScheduledEventId(), canceledRequest);
1✔
686
      responseObserver.onNext(RespondActivityTaskCanceledResponse.getDefaultInstance());
1✔
687
      responseObserver.onCompleted();
1✔
688
    } catch (StatusRuntimeException e) {
1✔
689
      handleStatusRuntimeException(e, responseObserver);
1✔
690
    }
1✔
691
  }
1✔
692

693
  @Override
694
  public void respondActivityTaskCanceledById(
695
      RespondActivityTaskCanceledByIdRequest canceledRequest,
696
      StreamObserver<RespondActivityTaskCanceledByIdResponse> responseObserver) {
697
    try {
698
      ExecutionId executionId =
×
699
          new ExecutionId(
700
              canceledRequest.getNamespace(),
×
701
              canceledRequest.getWorkflowId(),
×
702
              canceledRequest.getRunId());
×
703
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
704
      mutableState.cancelActivityTaskById(canceledRequest.getActivityId(), canceledRequest);
×
705
      responseObserver.onNext(RespondActivityTaskCanceledByIdResponse.getDefaultInstance());
×
706
      responseObserver.onCompleted();
×
707
    } catch (StatusRuntimeException e) {
×
708
      handleStatusRuntimeException(e, responseObserver);
×
709
    }
×
710
  }
×
711

712
  @Override
713
  public void requestCancelWorkflowExecution(
714
      RequestCancelWorkflowExecutionRequest cancelRequest,
715
      StreamObserver<RequestCancelWorkflowExecutionResponse> responseObserver) {
716
    try {
717
      requestCancelWorkflowExecution(cancelRequest, Optional.empty());
1✔
718
      responseObserver.onNext(RequestCancelWorkflowExecutionResponse.getDefaultInstance());
1✔
719
      responseObserver.onCompleted();
1✔
720
    } catch (StatusRuntimeException e) {
1✔
721
      handleStatusRuntimeException(e, responseObserver);
1✔
722
    }
1✔
723
  }
1✔
724

725
  void requestCancelWorkflowExecution(
726
      RequestCancelWorkflowExecutionRequest cancelRequest,
727
      Optional<TestWorkflowMutableStateImpl.CancelExternalWorkflowExecutionCallerInfo> callerInfo) {
728
    ExecutionId executionId =
1✔
729
        new ExecutionId(cancelRequest.getNamespace(), cancelRequest.getWorkflowExecution());
1✔
730
    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
731
    mutableState.requestCancelWorkflowExecution(cancelRequest, callerInfo);
1✔
732
  }
1✔
733

734
  @Override
735
  public void terminateWorkflowExecution(
736
      TerminateWorkflowExecutionRequest request,
737
      StreamObserver<TerminateWorkflowExecutionResponse> responseObserver) {
738
    try {
739
      terminateWorkflowExecution(request);
1✔
740
      responseObserver.onNext(TerminateWorkflowExecutionResponse.getDefaultInstance());
1✔
741
      responseObserver.onCompleted();
1✔
742
    } catch (StatusRuntimeException e) {
1✔
743
      handleStatusRuntimeException(e, responseObserver);
1✔
744
    }
1✔
745
  }
1✔
746

747
  private void terminateWorkflowExecution(TerminateWorkflowExecutionRequest request) {
748
    ExecutionId executionId =
1✔
749
        new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
1✔
750
    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
751
    mutableState.terminateWorkflowExecution(request);
1✔
752
  }
1✔
753

754
  @Override
755
  public void signalWorkflowExecution(
756
      SignalWorkflowExecutionRequest signalRequest,
757
      StreamObserver<SignalWorkflowExecutionResponse> responseObserver) {
758
    try {
759
      ExecutionId executionId =
1✔
760
          new ExecutionId(signalRequest.getNamespace(), signalRequest.getWorkflowExecution());
1✔
761
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
762
      mutableState.signal(signalRequest);
1✔
763
      responseObserver.onNext(SignalWorkflowExecutionResponse.getDefaultInstance());
1✔
764
      responseObserver.onCompleted();
1✔
765
    } catch (StatusRuntimeException e) {
1✔
766
      handleStatusRuntimeException(e, responseObserver);
1✔
767
    }
1✔
768
  }
1✔
769

770
  @Override
771
  public void updateWorkflowExecution(
772
      UpdateWorkflowExecutionRequest request,
773
      StreamObserver<UpdateWorkflowExecutionResponse> responseObserver) {
774
    try {
775
      ExecutionId executionId =
1✔
776
          new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
1✔
777
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
778
      @Nullable Deadline deadline = Context.current().getDeadline();
1✔
779
      UpdateWorkflowExecutionResponse response =
1✔
780
          mutableState.updateWorkflowExecution(request, deadline);
1✔
781
      responseObserver.onNext(response);
1✔
782
      responseObserver.onCompleted();
1✔
783
    } catch (StatusRuntimeException e) {
1✔
784
      handleStatusRuntimeException(e, responseObserver);
1✔
785
    }
1✔
786
  }
1✔
787

788
  @Override
789
  public void pollWorkflowExecutionUpdate(
790
      PollWorkflowExecutionUpdateRequest request,
791
      StreamObserver<PollWorkflowExecutionUpdateResponse> responseObserver) {
792
    responseObserver.onError(
×
793
        Status.UNIMPLEMENTED
794
            .withDescription("Test server does not implement poll update")
×
795
            .asRuntimeException());
×
796
  }
×
797

798
  @Override
799
  public void signalWithStartWorkflowExecution(
800
      SignalWithStartWorkflowExecutionRequest r,
801
      StreamObserver<SignalWithStartWorkflowExecutionResponse> responseObserver) {
802
    try {
803
      if (!r.hasTaskQueue()) {
1✔
804
        throw Status.INVALID_ARGUMENT
×
805
            .withDescription("request missing required taskQueue field")
×
806
            .asRuntimeException();
×
807
      }
808
      if (!r.hasWorkflowType()) {
1✔
809
        throw Status.INVALID_ARGUMENT
×
810
            .withDescription("request missing required workflowType field")
×
811
            .asRuntimeException();
×
812
      }
813
      ExecutionId executionId = new ExecutionId(r.getNamespace(), r.getWorkflowId(), null);
1✔
814
      TestWorkflowMutableState mutableState = getMutableState(executionId, false);
1✔
815
      SignalWorkflowExecutionRequest signalRequest =
816
          SignalWorkflowExecutionRequest.newBuilder()
1✔
817
              .setInput(r.getSignalInput())
1✔
818
              .setSignalName(r.getSignalName())
1✔
819
              .setWorkflowExecution(executionId.getExecution())
1✔
820
              .setRequestId(r.getRequestId())
1✔
821
              .setControl(r.getControl())
1✔
822
              .setNamespace(r.getNamespace())
1✔
823
              .setIdentity(r.getIdentity())
1✔
824
              .build();
1✔
825
      if (mutableState != null && !mutableState.isTerminalState()) {
1✔
826
        mutableState.signal(signalRequest);
1✔
827
        responseObserver.onNext(
1✔
828
            SignalWithStartWorkflowExecutionResponse.newBuilder()
1✔
829
                .setRunId(mutableState.getExecutionId().getExecution().getRunId())
1✔
830
                .build());
1✔
831
        responseObserver.onCompleted();
1✔
832
        return;
1✔
833
      }
834
      StartWorkflowExecutionRequest.Builder startRequest =
835
          StartWorkflowExecutionRequest.newBuilder()
1✔
836
              .setRequestId(r.getRequestId())
1✔
837
              .setInput(r.getInput())
1✔
838
              .setWorkflowExecutionTimeout(r.getWorkflowExecutionTimeout())
1✔
839
              .setWorkflowRunTimeout(r.getWorkflowRunTimeout())
1✔
840
              .setWorkflowTaskTimeout(r.getWorkflowTaskTimeout())
1✔
841
              .setNamespace(r.getNamespace())
1✔
842
              .setTaskQueue(r.getTaskQueue())
1✔
843
              .setWorkflowId(r.getWorkflowId())
1✔
844
              .setWorkflowIdReusePolicy(r.getWorkflowIdReusePolicy())
1✔
845
              .setIdentity(r.getIdentity())
1✔
846
              .setWorkflowType(r.getWorkflowType())
1✔
847
              .setCronSchedule(r.getCronSchedule())
1✔
848
              .setRequestId(r.getRequestId());
1✔
849
      if (r.hasRetryPolicy()) {
1✔
850
        startRequest.setRetryPolicy(r.getRetryPolicy());
×
851
      }
852
      if (r.hasHeader()) {
1✔
853
        startRequest.setHeader(r.getHeader());
1✔
854
      }
855
      if (r.hasMemo()) {
1✔
856
        startRequest.setMemo(r.getMemo());
×
857
      }
858
      if (r.hasSearchAttributes()) {
1✔
859
        startRequest.setSearchAttributes(r.getSearchAttributes());
1✔
860
      }
861
      StartWorkflowExecutionResponse startResult =
1✔
862
          startWorkflowExecutionImpl(
1✔
863
              startRequest.build(),
1✔
864
              Duration.ZERO,
865
              Optional.empty(),
1✔
866
              OptionalLong.empty(),
1✔
867
              signalRequest);
868
      responseObserver.onNext(
1✔
869
          SignalWithStartWorkflowExecutionResponse.newBuilder()
1✔
870
              .setRunId(startResult.getRunId())
1✔
871
              .build());
1✔
872
      responseObserver.onCompleted();
1✔
873
    } catch (StatusRuntimeException e) {
1✔
874
      handleStatusRuntimeException(e, responseObserver);
1✔
875
    }
1✔
876
  }
1✔
877

878
  public void signalExternalWorkflowExecution(
879
      String signalId,
880
      SignalExternalWorkflowExecutionCommandAttributes commandAttributes,
881
      TestWorkflowMutableState source) {
882
    String namespace;
883
    if (commandAttributes.getNamespace().isEmpty()) {
1✔
884
      namespace = source.getExecutionId().getNamespace();
1✔
885
    } else {
886
      namespace = commandAttributes.getNamespace();
×
887
    }
888
    ExecutionId executionId = new ExecutionId(namespace, commandAttributes.getExecution());
1✔
889
    TestWorkflowMutableState mutableState;
890
    try {
891
      mutableState = getMutableState(executionId);
1✔
892
      mutableState.signalFromWorkflow(commandAttributes);
1✔
893
      source.completeSignalExternalWorkflowExecution(
1✔
894
          signalId, mutableState.getExecutionId().getExecution().getRunId());
1✔
895
    } catch (StatusRuntimeException e) {
1✔
896
      if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
897
        source.failSignalExternalWorkflowExecution(
1✔
898
            signalId,
899
            SignalExternalWorkflowExecutionFailedCause
900
                .SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED_CAUSE_EXTERNAL_WORKFLOW_EXECUTION_NOT_FOUND);
901
      } else {
902
        throw e;
×
903
      }
904
    }
1✔
905
  }
1✔
906

907
  /**
908
   * Creates next run of a workflow execution
909
   *
910
   * @return RunId
911
   */
912
  public String continueAsNew(
913
      StartWorkflowExecutionRequest previousRunStartRequest,
914
      WorkflowExecutionContinuedAsNewEventAttributes a,
915
      Optional<TestServiceRetryState> retryState,
916
      String identity,
917
      ExecutionId continuedExecutionId,
918
      String firstExecutionRunId,
919
      Optional<TestWorkflowMutableState> parent,
920
      OptionalLong parentChildInitiatedEventId) {
921
    StartWorkflowExecutionRequest.Builder startRequestBuilder =
922
        StartWorkflowExecutionRequest.newBuilder()
1✔
923
            .setRequestId(UUID.randomUUID().toString())
1✔
924
            .setWorkflowType(a.getWorkflowType())
1✔
925
            .setWorkflowRunTimeout(a.getWorkflowRunTimeout())
1✔
926
            .setWorkflowTaskTimeout(a.getWorkflowTaskTimeout())
1✔
927
            .setNamespace(continuedExecutionId.getNamespace())
1✔
928
            .setTaskQueue(a.getTaskQueue())
1✔
929
            .setWorkflowId(continuedExecutionId.getWorkflowId().getWorkflowId())
1✔
930
            .setWorkflowIdReusePolicy(previousRunStartRequest.getWorkflowIdReusePolicy())
1✔
931
            .setIdentity(identity)
1✔
932
            .setCronSchedule(previousRunStartRequest.getCronSchedule());
1✔
933
    if (previousRunStartRequest.hasRetryPolicy()) {
1✔
934
      startRequestBuilder.setRetryPolicy(previousRunStartRequest.getRetryPolicy());
1✔
935
    }
936
    if (a.hasInput()) {
1✔
937
      startRequestBuilder.setInput(a.getInput());
1✔
938
    }
939
    if (a.hasHeader()) {
1✔
940
      startRequestBuilder.setHeader(a.getHeader());
1✔
941
    }
942
    StartWorkflowExecutionRequest startRequest = startRequestBuilder.build();
1✔
943
    lock.lock();
1✔
944
    Optional<Failure> lastFail =
945
        a.hasFailure()
1✔
946
            ? Optional.of(a.getFailure())
1✔
947
            : retryState.flatMap(TestServiceRetryState::getPreviousRunFailure);
1✔
948
    try {
949
      StartWorkflowExecutionResponse response =
1✔
950
          startWorkflowExecutionNoRunningCheckLocked(
1✔
951
              startRequest,
952
              a.getNewExecutionRunId(),
1✔
953
              firstExecutionRunId,
954
              Optional.of(continuedExecutionId.getExecution().getRunId()),
1✔
955
              retryState,
956
              ProtobufTimeUtils.toJavaDuration(a.getBackoffStartInterval()),
1✔
957
              a.getLastCompletionResult(),
1✔
958
              lastFail,
959
              parent,
960
              parentChildInitiatedEventId,
961
              null,
962
              continuedExecutionId.getWorkflowId());
1✔
963
      return response.getRunId();
1✔
964
    } finally {
965
      lock.unlock();
1✔
966
    }
967
  }
968

969
  @Override
970
  public void listOpenWorkflowExecutions(
971
      ListOpenWorkflowExecutionsRequest listRequest,
972
      StreamObserver<ListOpenWorkflowExecutionsResponse> responseObserver) {
973
    try {
974
      Optional<String> workflowIdFilter;
975
      if (listRequest.hasExecutionFilter()
1✔
976
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
×
977
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
×
978
      } else {
979
        workflowIdFilter = Optional.empty();
1✔
980
      }
981
      List<WorkflowExecutionInfo> result =
1✔
982
          store.listWorkflows(WorkflowState.OPEN, workflowIdFilter);
1✔
983
      responseObserver.onNext(
1✔
984
          ListOpenWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1✔
985
      responseObserver.onCompleted();
1✔
986
    } catch (StatusRuntimeException e) {
×
987
      handleStatusRuntimeException(e, responseObserver);
×
988
    }
1✔
989
  }
1✔
990

991
  @Override
992
  public void listClosedWorkflowExecutions(
993
      ListClosedWorkflowExecutionsRequest listRequest,
994
      StreamObserver<ListClosedWorkflowExecutionsResponse> responseObserver) {
995
    try {
996
      Optional<String> workflowIdFilter;
997
      if (listRequest.hasExecutionFilter()
1✔
998
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
×
999
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
×
1000
      } else {
1001
        workflowIdFilter = Optional.empty();
1✔
1002
      }
1003
      List<WorkflowExecutionInfo> result =
1✔
1004
          store.listWorkflows(WorkflowState.CLOSED, workflowIdFilter);
1✔
1005
      responseObserver.onNext(
1✔
1006
          ListClosedWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1✔
1007
      responseObserver.onCompleted();
1✔
1008
    } catch (StatusRuntimeException e) {
×
1009
      handleStatusRuntimeException(e, responseObserver);
×
1010
    }
1✔
1011
  }
1✔
1012

1013
  @Override
1014
  public void respondQueryTaskCompleted(
1015
      RespondQueryTaskCompletedRequest completeRequest,
1016
      StreamObserver<RespondQueryTaskCompletedResponse> responseObserver) {
1017
    try {
1018
      QueryId queryId = QueryId.fromBytes(completeRequest.getTaskToken());
1✔
1019
      TestWorkflowMutableState mutableState = getMutableState(queryId.getExecutionId());
1✔
1020
      mutableState.completeQuery(queryId, completeRequest);
1✔
1021
      responseObserver.onNext(RespondQueryTaskCompletedResponse.getDefaultInstance());
1✔
1022
      responseObserver.onCompleted();
1✔
1023
    } catch (StatusRuntimeException e) {
×
1024
      handleStatusRuntimeException(e, responseObserver);
×
1025
    }
1✔
1026
  }
1✔
1027

1028
  @Override
1029
  public void queryWorkflow(
1030
      QueryWorkflowRequest queryRequest, StreamObserver<QueryWorkflowResponse> responseObserver) {
1031
    try {
1032
      ExecutionId executionId =
1✔
1033
          new ExecutionId(queryRequest.getNamespace(), queryRequest.getExecution());
1✔
1034
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1035
      @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1036
      QueryWorkflowResponse result =
1✔
1037
          mutableState.query(
1✔
1038
              queryRequest,
1039
              deadline != null ? deadline.timeRemaining(TimeUnit.MILLISECONDS) : Long.MAX_VALUE);
1✔
1040
      responseObserver.onNext(result);
1✔
1041
      responseObserver.onCompleted();
1✔
1042
    } catch (StatusRuntimeException e) {
1✔
1043
      handleStatusRuntimeException(e, responseObserver);
1✔
1044
    }
1✔
1045
  }
1✔
1046

1047
  @Override
1048
  public void describeWorkflowExecution(
1049
      DescribeWorkflowExecutionRequest request,
1050
      StreamObserver<io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse>
1051
          responseObserver) {
1052
    try {
1053
      if (request.getNamespace().isEmpty()) {
1✔
1054
        throw createInvalidArgument("Namespace not set on request.");
×
1055
      }
1056
      if (!request.hasExecution()) {
1✔
1057
        throw createInvalidArgument("Execution not set on request.");
×
1058
      }
1059

1060
      ExecutionId executionId = new ExecutionId(request.getNamespace(), request.getExecution());
1✔
1061
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1062
      DescribeWorkflowExecutionResponse result = mutableState.describeWorkflowExecution();
1✔
1063
      responseObserver.onNext(result);
1✔
1064
      responseObserver.onCompleted();
1✔
1065
    } catch (StatusRuntimeException e) {
1✔
1066
      handleStatusRuntimeException(e, responseObserver);
1✔
1067
    }
1✔
1068
  }
1✔
1069

1070
  /**
1071
   * This method doesn't make much sense for test server, it accepts all namespaces as existent and
1072
   * registered. so, it's a trivial implementation just returning an info that a namespace is
1073
   * registered irrespectively of the input
1074
   */
1075
  @Override
1076
  public void describeNamespace(
1077
      DescribeNamespaceRequest request,
1078
      StreamObserver<DescribeNamespaceResponse> responseObserver) {
1079
    try {
1080
      if (request.getNamespace().isEmpty()) {
1✔
1081
        throw createInvalidArgument("Namespace not set on request.");
×
1082
      }
1083
      // generating a stable UUID for name
1084
      String namespaceId = UUID.nameUUIDFromBytes(request.getNamespace().getBytes()).toString();
1✔
1085
      DescribeNamespaceResponse result =
1086
          DescribeNamespaceResponse.newBuilder()
1✔
1087
              .setNamespaceInfo(
1✔
1088
                  NamespaceInfo.newBuilder()
1✔
1089
                      .setName(request.getNamespace())
1✔
1090
                      .setState(NamespaceState.NAMESPACE_STATE_REGISTERED)
1✔
1091
                      .setId(namespaceId)
1✔
1092
                      .build())
1✔
1093
              .build();
1✔
1094
      responseObserver.onNext(result);
1✔
1095
      responseObserver.onCompleted();
1✔
1096
    } catch (StatusRuntimeException e) {
1✔
1097
      handleStatusRuntimeException(e, responseObserver);
1✔
1098
    }
1✔
1099
  }
1✔
1100

1101
  private <R> R requireNotNull(String fieldName, R value) {
1102
    if (value == null) {
1✔
1103
      throw Status.INVALID_ARGUMENT
×
1104
          .withDescription("Missing required field \"" + fieldName + "\".")
×
1105
          .asRuntimeException();
×
1106
    }
1107
    return value;
1✔
1108
  }
1109

1110
  /**
1111
   * Adds diagnostic data about internal service state to the provided {@link StringBuilder}.
1112
   * Includes histories of all workflow instances stored in the service.
1113
   */
1114
  public void getDiagnostics(StringBuilder result) {
1115
    store.getDiagnostics(result);
×
1116
  }
×
1117

1118
  /**
1119
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1120
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#getCurrentTime(Empty)}
1121
   */
1122
  @Deprecated
1123
  public long currentTimeMillis() {
1124
    return selfAdvancingTimer.getClock().getAsLong();
×
1125
  }
1126

1127
  /** Invokes callback after the specified delay according to internal service clock. */
1128
  public void registerDelayedCallback(Duration delay, Runnable r) {
1129
    store.registerDelayedCallback(delay, r);
1✔
1130
  }
1✔
1131

1132
  /**
1133
   * Disables time skipping. To re-enable call {@link #unlockTimeSkipping(String)}. These calls are
1134
   * counted, so calling unlock does not guarantee that time is going to be skipped immediately as
1135
   * another lock can be holding it.
1136
   *
1137
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1138
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#lockTimeSkipping(LockTimeSkippingRequest)}
1139
   */
1140
  @Deprecated
1141
  public void lockTimeSkipping(String caller) {
1142
    selfAdvancingTimer.lockTimeSkipping(caller);
×
1143
  }
×
1144

1145
  /**
1146
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1147
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkipping(UnlockTimeSkippingRequest)}
1148
   */
1149
  @Deprecated
1150
  public void unlockTimeSkipping(String caller) {
1151
    selfAdvancingTimer.unlockTimeSkipping(caller);
×
1152
  }
×
1153

1154
  /**
1155
   * Unlocks time skipping and blocks the calling thread until internal clock passes the current +
1156
   * duration time.<br>
1157
   * When the time is reached, locks time skipping and returns.<br>
1158
   * Might not block at all due to time skipping. Or might block if the time skipping lock counter
1159
   * was more than 1.
1160
   *
1161
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1162
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkippingWithSleep(SleepRequest)}
1163
   */
1164
  @Deprecated
1165
  public void sleep(Duration duration) {
1166
    CompletableFuture<Void> result = new CompletableFuture<>();
×
1167
    selfAdvancingTimer.schedule(
×
1168
        duration,
1169
        () -> {
1170
          selfAdvancingTimer.lockTimeSkipping("TestWorkflowService sleep");
×
1171
          result.complete(null);
×
1172
        },
×
1173
        "workflow sleep");
1174
    selfAdvancingTimer.unlockTimeSkipping("TestWorkflowService sleep");
×
1175
    try {
1176
      result.get();
×
1177
    } catch (InterruptedException e) {
×
1178
      Thread.currentThread().interrupt();
×
1179
      throw new RuntimeException(e);
×
1180
    } catch (ExecutionException e) {
×
1181
      throw new RuntimeException(e);
×
1182
    }
×
1183
  }
×
1184

1185
  /**
1186
   * Temporal server times out task queue long poll calls after 1 minute and returns an empty
1187
   * result. After which the request has to be retried by the client if it wants to continue
1188
   * waiting. We emulate this behavior here.
1189
   *
1190
   * <p>If there is a deadline present, for task queue poll requests server will respond inside the
1191
   * deadline. Note that the latest is not applicable for getWorkflowExecutionHistory() long polls.
1192
   *
1193
   * @return minimum between the context deadline and maximum long poll deadline.
1194
   */
1195
  private Deadline getLongPollDeadline() {
1196
    @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1197
    Deadline maximumDeadline =
1✔
1198
        Deadline.after(
1✔
1199
            WorkflowServiceStubsOptions.DEFAULT_SERVER_LONG_POLL_RPC_TIMEOUT.toMillis(),
1✔
1200
            TimeUnit.MILLISECONDS);
1201
    return deadline != null ? deadline.minimum(maximumDeadline) : maximumDeadline;
1✔
1202
  }
1203

1204
  private void handleStatusRuntimeException(
1205
      StatusRuntimeException e, StreamObserver<?> responseObserver) {
1206
    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1✔
1207
      log.error("unexpected", e);
1✔
1208
    }
1209
    responseObserver.onError(e);
1✔
1210
  }
1✔
1211

1212
  /**
1213
   * Creates an in-memory service along with client stubs for use in Java code. See also
1214
   * createServerOnly and createWithNoGrpcServer.
1215
   *
1216
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean)} instead and
1217
   *     pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1218
   */
1219
  @Deprecated
1220
  public TestWorkflowService() {
1221
    this(0, true);
×
1222
  }
×
1223

1224
  /**
1225
   * Creates an in-memory service along with client stubs for use in Java code. See also
1226
   * createServerOnly and createWithNoGrpcServer.
1227
   *
1228
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean, long)} instead
1229
   *     and pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1230
   */
1231
  @Deprecated
1232
  public TestWorkflowService(long initialTimeMillis) {
1233
    this(initialTimeMillis, true);
×
1234
  }
×
1235

1236
  /**
1237
   * Creates an in-memory service along with client stubs for use in Java code. See also
1238
   * createServerOnly and createWithNoGrpcServer.
1239
   *
1240
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean)} instead
1241
   */
1242
  @Deprecated
1243
  public TestWorkflowService(boolean lockTimeSkipping) {
1244
    this(0, true);
×
1245
    if (lockTimeSkipping) {
×
1246
      this.lockTimeSkipping("constructor");
×
1247
    }
1248
  }
×
1249

1250
  /**
1251
   * Creates an instance of TestWorkflowService that does not manage its own gRPC server. Useful for
1252
   * including in an externally managed gRPC server.
1253
   *
1254
   * @deprecated use {@link TestServicesStarter} to create just the services with gRPC server
1255
   */
1256
  @Deprecated
1257
  public static TestWorkflowService createWithNoGrpcServer() {
1258
    return new TestWorkflowService(0, false);
×
1259
  }
1260

1261
  private TestWorkflowService(long initialTimeMillis, boolean startInProcessServer) {
×
1262
    this.selfAdvancingTimer =
×
1263
        new SelfAdvancingTimerImpl(initialTimeMillis, Clock.systemDefaultZone());
×
1264
    store = new TestWorkflowStoreImpl(this.selfAdvancingTimer);
×
1265
    visibilityStore = new TestVisibilityStoreImpl();
×
1266
    outOfProcessServer = null;
×
1267
    if (startInProcessServer) {
×
1268
      this.inProcessServer = new InProcessGRPCServer(Collections.singletonList(this));
×
1269
      this.workflowServiceStubs =
×
1270
          WorkflowServiceStubs.newServiceStubs(
×
1271
              WorkflowServiceStubsOptions.newBuilder()
×
1272
                  .setChannel(inProcessServer.getChannel())
×
1273
                  .build());
×
1274
    } else {
1275
      this.inProcessServer = null;
×
1276
      this.workflowServiceStubs = null;
×
1277
    }
1278
  }
×
1279

1280
  /**
1281
   * Creates an out-of-process rather than in-process server, and does not set up a client. Useful,
1282
   * for example, if you want to use the test service from other SDKs.
1283
   *
1284
   * @param port the port to listen on
1285
   * @deprecated use {@link io.temporal.testserver.TestServer#createPortBoundServer(int, boolean)}
1286
   *     instead and pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1287
   */
1288
  @Deprecated
1289
  public static TestWorkflowService createServerOnly(int port) {
1290
    TestWorkflowService result = new TestWorkflowService(true, port);
×
1291
    log.info("Server started, listening on " + port);
×
1292
    return result;
×
1293
  }
1294

1295
  private TestWorkflowService(boolean isOutOfProc, int port) {
×
1296
    // isOutOfProc is just here to make unambiguous constructor overloading.
1297
    Preconditions.checkState(isOutOfProc, "Impossible.");
×
1298
    inProcessServer = null;
×
1299
    workflowServiceStubs = null;
×
1300
    this.selfAdvancingTimer = new SelfAdvancingTimerImpl(0, Clock.systemDefaultZone());
×
1301
    store = new TestWorkflowStoreImpl(selfAdvancingTimer);
×
1302
    visibilityStore = new TestVisibilityStoreImpl();
×
1303
    try {
1304
      ServerBuilder<?> serverBuilder =
×
1305
          Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create());
×
1306
      GRPCServerHelper.registerServicesAndHealthChecks(
×
1307
          Collections.singletonList(this), serverBuilder);
×
1308
      outOfProcessServer = serverBuilder.build().start();
×
1309
    } catch (IOException e) {
×
1310
      throw new RuntimeException(e);
×
1311
    }
×
1312
  }
×
1313

1314
  @Deprecated
1315
  public WorkflowServiceStubs newClientStub() {
1316
    if (workflowServiceStubs == null) {
×
1317
      throw new RuntimeException(
×
1318
          "Cannot get a client when you created your TestWorkflowService with createServerOnly.");
1319
    }
1320
    return workflowServiceStubs;
×
1321
  }
1322

1323
  private static StatusRuntimeException createInvalidArgument(String description) {
1324
    throw Status.INVALID_ARGUMENT.withDescription(description).asRuntimeException();
1✔
1325
  }
1326
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc