• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #164

pending completion
#164

push

github-actions

web-flow
Release v1.19.1 (#1714)

17057 of 20907 relevant lines covered (81.59%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.62
/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import static io.temporal.internal.testservice.CronUtils.getBackoffInterval;
24

25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Throwables;
27
import com.google.protobuf.ByteString;
28
import com.google.protobuf.Empty;
29
import com.google.protobuf.Timestamp;
30
import com.google.protobuf.util.Timestamps;
31
import io.grpc.*;
32
import io.grpc.stub.StreamObserver;
33
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
34
import io.temporal.api.common.v1.Payload;
35
import io.temporal.api.common.v1.Payloads;
36
import io.temporal.api.common.v1.RetryPolicy;
37
import io.temporal.api.common.v1.WorkflowExecution;
38
import io.temporal.api.enums.v1.NamespaceState;
39
import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause;
40
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
41
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
42
import io.temporal.api.errordetails.v1.WorkflowExecutionAlreadyStartedFailure;
43
import io.temporal.api.failure.v1.Failure;
44
import io.temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes;
45
import io.temporal.api.namespace.v1.NamespaceInfo;
46
import io.temporal.api.testservice.v1.LockTimeSkippingRequest;
47
import io.temporal.api.testservice.v1.SleepRequest;
48
import io.temporal.api.testservice.v1.UnlockTimeSkippingRequest;
49
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
50
import io.temporal.api.workflowservice.v1.*;
51
import io.temporal.internal.common.ProtobufTimeUtils;
52
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowState;
53
import io.temporal.serviceclient.StatusUtils;
54
import io.temporal.serviceclient.WorkflowServiceStubs;
55
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
56
import java.io.Closeable;
57
import java.io.IOException;
58
import java.time.Clock;
59
import java.time.Duration;
60
import java.util.*;
61
import java.util.concurrent.*;
62
import java.util.concurrent.locks.Lock;
63
import java.util.concurrent.locks.ReentrantLock;
64
import javax.annotation.Nonnull;
65
import javax.annotation.Nullable;
66
import org.slf4j.Logger;
67
import org.slf4j.LoggerFactory;
68

69
/**
70
 * In memory implementation of the Workflow Service. To be used for testing purposes only.
71
 *
72
 * <p>Do not use directly, instead use {@link io.temporal.testing.TestWorkflowEnvironment}.
73
 */
74
public final class TestWorkflowService extends WorkflowServiceGrpc.WorkflowServiceImplBase
75
    implements Closeable {
76
  private static final Logger log = LoggerFactory.getLogger(TestWorkflowService.class);
1✔
77
  private final Map<ExecutionId, TestWorkflowMutableState> executions = new HashMap<>();
1✔
78
  // key->WorkflowId
79
  private final Map<WorkflowId, TestWorkflowMutableState> executionsByWorkflowId = new HashMap<>();
1✔
80
  private final ExecutorService executor = Executors.newCachedThreadPool();
1✔
81
  private final Lock lock = new ReentrantLock();
1✔
82

83
  private final TestWorkflowStore store;
84
  private final TestVisibilityStore visibilityStore;
85
  private final SelfAdvancingTimer selfAdvancingTimer;
86

87
  private final ScheduledExecutorService backgroundScheduler =
1✔
88
      Executors.newSingleThreadScheduledExecutor();
1✔
89

90
  private final Server outOfProcessServer;
91
  private final InProcessGRPCServer inProcessServer;
92
  private final WorkflowServiceStubs workflowServiceStubs;
93

94
  TestWorkflowService(
95
      TestWorkflowStore store,
96
      TestVisibilityStore visibilityStore,
97
      SelfAdvancingTimer selfAdvancingTimer) {
1✔
98
    this.store = store;
1✔
99
    this.visibilityStore = visibilityStore;
1✔
100
    this.selfAdvancingTimer = selfAdvancingTimer;
1✔
101
    this.outOfProcessServer = null;
1✔
102
    this.inProcessServer = null;
1✔
103
    this.workflowServiceStubs = null;
1✔
104
  }
1✔
105

106
  @Override
107
  public void close() {
108
    log.debug("Shutting down TestWorkflowService");
1✔
109

110
    log.debug("Shutting down background scheduler");
1✔
111
    backgroundScheduler.shutdown();
1✔
112

113
    if (outOfProcessServer != null) {
1✔
114
      log.info("Shutting down out-of-process GRPC server");
×
115
      outOfProcessServer.shutdown();
×
116
    }
117

118
    if (workflowServiceStubs != null) {
1✔
119
      workflowServiceStubs.shutdown();
×
120
    }
121

122
    if (inProcessServer != null) {
1✔
123
      log.info("Shutting down in-process GRPC server");
×
124
      inProcessServer.shutdown();
×
125
    }
126

127
    executor.shutdown();
1✔
128

129
    try {
130
      executor.awaitTermination(1, TimeUnit.SECONDS);
1✔
131

132
      if (outOfProcessServer != null) {
1✔
133
        outOfProcessServer.awaitTermination(1, TimeUnit.SECONDS);
×
134
      }
135

136
      if (workflowServiceStubs != null) {
1✔
137
        workflowServiceStubs.awaitTermination(1, TimeUnit.SECONDS);
×
138
      }
139

140
      if (inProcessServer != null) {
1✔
141
        inProcessServer.awaitTermination(1, TimeUnit.SECONDS);
×
142
      }
143

144
    } catch (InterruptedException e) {
×
145
      Thread.currentThread().interrupt();
×
146
      log.debug("shutdown interrupted", e);
×
147
    }
1✔
148

149
    store.close();
1✔
150
  }
1✔
151

152
  private TestWorkflowMutableState getMutableState(ExecutionId executionId) {
153
    return getMutableState(executionId, true);
1✔
154
  }
155

156
  private TestWorkflowMutableState getMutableState(ExecutionId executionId, boolean failNotExists) {
157
    lock.lock();
1✔
158
    try {
159
      if (executionId.getExecution().getRunId().isEmpty()) {
1✔
160
        return getMutableState(executionId.getWorkflowId(), failNotExists);
1✔
161
      }
162
      TestWorkflowMutableState mutableState = executions.get(executionId);
1✔
163
      if (mutableState == null && failNotExists) {
1✔
164
        throw Status.NOT_FOUND
1✔
165
            .withDescription(
1✔
166
                "Execution \""
167
                    + executionId
168
                    + "\" not found in mutable state. Known executions: "
169
                    + executions.values()
1✔
170
                    + ", service="
171
                    + this)
172
            .asRuntimeException();
1✔
173
      }
174
      return mutableState;
1✔
175
    } finally {
176
      lock.unlock();
1✔
177
    }
178
  }
179

180
  private TestWorkflowMutableState getMutableState(WorkflowId workflowId, boolean failNotExists) {
181
    lock.lock();
1✔
182
    try {
183
      TestWorkflowMutableState mutableState = executionsByWorkflowId.get(workflowId);
1✔
184
      if (mutableState == null && failNotExists) {
1✔
185
        throw Status.NOT_FOUND
1✔
186
            .withDescription("Execution not found in mutable state: " + workflowId)
1✔
187
            .asRuntimeException();
1✔
188
      }
189
      return mutableState;
1✔
190
    } finally {
191
      lock.unlock();
1✔
192
    }
193
  }
194

195
  @Override
196
  public void startWorkflowExecution(
197
      StartWorkflowExecutionRequest request,
198
      StreamObserver<StartWorkflowExecutionResponse> responseObserver) {
199
    try {
200
      Duration backoffInterval = getBackoffInterval(request.getCronSchedule(), store.currentTime());
1✔
201
      StartWorkflowExecutionResponse response =
1✔
202
          startWorkflowExecutionImpl(
1✔
203
              request, backoffInterval, Optional.empty(), OptionalLong.empty(), null);
1✔
204
      responseObserver.onNext(response);
1✔
205
      responseObserver.onCompleted();
1✔
206
    } catch (StatusRuntimeException e) {
1✔
207
      handleStatusRuntimeException(e, responseObserver);
1✔
208
    }
1✔
209
  }
1✔
210

211
  StartWorkflowExecutionResponse startWorkflowExecutionImpl(
212
      StartWorkflowExecutionRequest startRequest,
213
      Duration backoffStartInterval,
214
      Optional<TestWorkflowMutableState> parent,
215
      OptionalLong parentChildInitiatedEventId,
216
      @Nullable SignalWorkflowExecutionRequest signalWithStartSignal) {
217
    String requestWorkflowId = requireNotNull("WorkflowId", startRequest.getWorkflowId());
1✔
218
    String namespace = requireNotNull("Namespace", startRequest.getNamespace());
1✔
219
    WorkflowId workflowId = new WorkflowId(namespace, requestWorkflowId);
1✔
220
    TestWorkflowMutableState existing;
221
    lock.lock();
1✔
222
    try {
223
      String newRunId = UUID.randomUUID().toString();
1✔
224
      existing = executionsByWorkflowId.get(workflowId);
1✔
225
      if (existing != null) {
1✔
226
        WorkflowExecutionStatus status = existing.getWorkflowExecutionStatus();
1✔
227
        WorkflowIdReusePolicy policy = startRequest.getWorkflowIdReusePolicy();
1✔
228

229
        if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
230
            && policy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
231
          existing.terminateWorkflowExecution(
1✔
232
              TerminateWorkflowExecutionRequest.newBuilder()
1✔
233
                  .setNamespace(startRequest.getNamespace())
1✔
234
                  .setWorkflowExecution(existing.getExecutionId().getExecution())
1✔
235
                  .setReason("TerminateIfRunning WorkflowIdReusePolicy Policy")
1✔
236
                  .setIdentity("history-service")
1✔
237
                  .setDetails(
1✔
238
                      Payloads.newBuilder()
1✔
239
                          .addPayloads(
1✔
240
                              Payload.newBuilder()
1✔
241
                                  .setData(
1✔
242
                                      ByteString.copyFromUtf8(
1✔
243
                                          String.format("terminated by new runID: %s", newRunId)))
1✔
244
                                  .build())
1✔
245
                          .build())
1✔
246
                  .build());
1✔
247
        } else if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
248
            || policy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE) {
249
          return throwDuplicatedWorkflow(startRequest, existing);
×
250
        } else if (policy
1✔
251
                == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
252
            && (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED
253
                || status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW)) {
254
          return throwDuplicatedWorkflow(startRequest, existing);
×
255
        }
256
      }
257
      Optional<TestServiceRetryState> retryState;
258
      Optional<Failure> lastFailure = Optional.empty();
1✔
259
      if (startRequest.hasRetryPolicy()) {
1✔
260
        Duration expirationInterval =
1✔
261
            ProtobufTimeUtils.toJavaDuration(startRequest.getWorkflowExecutionTimeout());
1✔
262
        retryState = newRetryStateLocked(startRequest.getRetryPolicy(), expirationInterval);
1✔
263
        if (retryState.isPresent()) {
1✔
264
          lastFailure = retryState.get().getPreviousRunFailure();
1✔
265
        }
266
      } else {
1✔
267
        retryState = Optional.empty();
1✔
268
      }
269
      return startWorkflowExecutionNoRunningCheckLocked(
1✔
270
          startRequest,
271
          newRunId,
272
          // it's the first execution in the continue-as-new chain, so firstExecutionRunId =
273
          // newRunId
274
          newRunId,
275
          Optional.empty(),
1✔
276
          retryState,
277
          backoffStartInterval,
278
          null,
279
          lastFailure,
280
          parent,
281
          parentChildInitiatedEventId,
282
          signalWithStartSignal,
283
          workflowId);
284
    } finally {
285
      lock.unlock();
1✔
286
    }
287
  }
288

289
  private Optional<TestServiceRetryState> newRetryStateLocked(
290
      RetryPolicy retryPolicy, Duration expirationInterval) {
291
    Timestamp expirationTime =
292
        expirationInterval.isZero()
1✔
293
            ? Timestamps.fromNanos(0)
1✔
294
            : Timestamps.add(
1✔
295
                store.currentTime(), ProtobufTimeUtils.toProtoDuration(expirationInterval));
1✔
296
    return Optional.of(new TestServiceRetryState(retryPolicy, expirationTime));
1✔
297
  }
298

299
  private StartWorkflowExecutionResponse throwDuplicatedWorkflow(
300
      StartWorkflowExecutionRequest startRequest, TestWorkflowMutableState existing) {
301
    WorkflowExecution execution = existing.getExecutionId().getExecution();
1✔
302
    WorkflowExecutionAlreadyStartedFailure error =
303
        WorkflowExecutionAlreadyStartedFailure.newBuilder()
1✔
304
            .setRunId(execution.getRunId())
1✔
305
            .setStartRequestId(startRequest.getRequestId())
1✔
306
            .build();
1✔
307
    throw StatusUtils.newException(
1✔
308
        Status.ALREADY_EXISTS.withDescription(
1✔
309
            String.format(
1✔
310
                "WorkflowId: %s, " + "RunId: %s", execution.getWorkflowId(), execution.getRunId())),
1✔
311
        error,
312
        WorkflowExecutionAlreadyStartedFailure.getDescriptor());
1✔
313
  }
314

315
  private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheckLocked(
316
      StartWorkflowExecutionRequest startRequest,
317
      @Nonnull String runId,
318
      @Nonnull String firstExecutionRunId,
319
      Optional<String> continuedExecutionRunId,
320
      Optional<TestServiceRetryState> retryState,
321
      Duration backoffStartInterval,
322
      Payloads lastCompletionResult,
323
      Optional<Failure> lastFailure,
324
      Optional<TestWorkflowMutableState> parent,
325
      OptionalLong parentChildInitiatedEventId,
326
      @Nullable SignalWorkflowExecutionRequest signalWithStartSignal,
327
      WorkflowId workflowId) {
328
    String namespace = startRequest.getNamespace();
1✔
329
    TestWorkflowMutableState mutableState =
1✔
330
        new TestWorkflowMutableStateImpl(
331
            startRequest,
332
            firstExecutionRunId,
333
            runId,
334
            retryState,
335
            backoffStartInterval,
336
            lastCompletionResult,
337
            lastFailure,
338
            parent,
339
            parentChildInitiatedEventId,
340
            continuedExecutionRunId,
341
            this,
342
            store,
343
            visibilityStore,
344
            selfAdvancingTimer);
345
    WorkflowExecution execution = mutableState.getExecutionId().getExecution();
1✔
346
    ExecutionId executionId = new ExecutionId(namespace, execution);
1✔
347
    executionsByWorkflowId.put(workflowId, mutableState);
1✔
348
    executions.put(executionId, mutableState);
1✔
349

350
    PollWorkflowTaskQueueRequest eagerWorkflowTaskPollRequest =
351
        startRequest.getRequestEagerExecution()
1✔
352
            ? PollWorkflowTaskQueueRequest.newBuilder()
1✔
353
                .setIdentity(startRequest.getIdentity())
1✔
354
                .setNamespace(startRequest.getNamespace())
1✔
355
                .setTaskQueue(startRequest.getTaskQueue())
1✔
356
                .build()
1✔
357
            : null;
1✔
358

359
    @Nullable
360
    PollWorkflowTaskQueueResponse eagerWorkflowTask =
1✔
361
        mutableState.startWorkflow(
1✔
362
            continuedExecutionRunId.isPresent(),
1✔
363
            signalWithStartSignal,
364
            eagerWorkflowTaskPollRequest);
365
    StartWorkflowExecutionResponse.Builder response =
366
        StartWorkflowExecutionResponse.newBuilder().setRunId(execution.getRunId());
1✔
367
    if (eagerWorkflowTask != null) {
1✔
368
      response.setEagerWorkflowTask(eagerWorkflowTask);
1✔
369
    }
370
    return response.build();
1✔
371
  }
372

373
  @Override
374
  public void getWorkflowExecutionHistory(
375
      GetWorkflowExecutionHistoryRequest getRequest,
376
      StreamObserver<GetWorkflowExecutionHistoryResponse> responseObserver) {
377
    ExecutionId executionId = new ExecutionId(getRequest.getNamespace(), getRequest.getExecution());
1✔
378
    executor.execute(
1✔
379
        // preserving gRPC context deadline between threads
380
        Context.current()
1✔
381
            .wrap(
1✔
382
                () -> {
383
                  try {
384
                    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
385
                    responseObserver.onNext(
1✔
386
                        store.getWorkflowExecutionHistory(
1✔
387
                            mutableState.getExecutionId(),
1✔
388
                            getRequest,
389
                            // We explicitly don't try to respond inside the context deadline.
390
                            // If we try to fit into the context deadline, the deadline may be not
391
                            // expired on the client side and an empty response will lead to a new
392
                            // request, making the client hammer the server at the tail end of the
393
                            // deadline.
394
                            // So this call is designed to wait fully till the end of the
395
                            // context deadline and throw DEADLINE_EXCEEDED if the deadline is less
396
                            // than 20s.
397
                            // If it's longer than 20 seconds - we return an empty result.
398
                            Deadline.after(20, TimeUnit.SECONDS)));
1✔
399
                    responseObserver.onCompleted();
1✔
400
                  } catch (StatusRuntimeException e) {
1✔
401
                    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1✔
402
                      log.error("unexpected", e);
×
403
                    }
404
                    responseObserver.onError(e);
1✔
405
                  } catch (Exception e) {
×
406
                    log.error("unexpected", e);
×
407
                    responseObserver.onError(e);
×
408
                  }
1✔
409
                }));
1✔
410
  }
1✔
411

412
  private <T> T pollTaskQueue(Context ctx, Future<T> futureValue)
413
      throws ExecutionException, InterruptedException {
414
    final Context.CancellationListener canceler = context -> futureValue.cancel(true);
1✔
415
    ctx.addListener(canceler, this.backgroundScheduler);
1✔
416
    try {
417
      return futureValue.get();
1✔
418
    } finally {
419
      ctx.removeListener(canceler);
1✔
420
    }
421
  }
422

423
  @Override
424
  public void pollWorkflowTaskQueue(
425
      PollWorkflowTaskQueueRequest pollRequest,
426
      StreamObserver<PollWorkflowTaskQueueResponse> responseObserver) {
427
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
428
      PollWorkflowTaskQueueResponse.Builder task;
429
      try {
430
        task = pollTaskQueue(ctx, store.pollWorkflowTaskQueue(pollRequest));
1✔
431
      } catch (ExecutionException e) {
×
432
        responseObserver.onError(e);
×
433
        return;
×
434
      } catch (InterruptedException e) {
×
435
        Thread.currentThread().interrupt();
×
436
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
×
437
        responseObserver.onCompleted();
×
438
        return;
×
439
      } catch (CancellationException e) {
1✔
440
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
1✔
441
        responseObserver.onCompleted();
1✔
442
        return;
1✔
443
      }
1✔
444

445
      ExecutionId executionId =
1✔
446
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
1✔
447
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
448
      try {
449
        mutableState.startWorkflowTask(task, pollRequest);
1✔
450
        // The task always has the original task queue that was created as part of the response.
451
        // This may be a different task queue than the task queue it was scheduled on, as in the
452
        // case of sticky execution.
453
        task.setWorkflowExecutionTaskQueue(mutableState.getStartRequest().getTaskQueue());
1✔
454
        PollWorkflowTaskQueueResponse response = task.build();
1✔
455
        responseObserver.onNext(response);
1✔
456
        responseObserver.onCompleted();
1✔
457
      } catch (StatusRuntimeException e) {
×
458
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
×
459
          if (log.isDebugEnabled()) {
×
460
            log.debug("Skipping outdated workflow task for " + executionId, e);
×
461
          }
462
          // The real service doesn't return this call on outdated task.
463
          // For simplicity, we return an empty result here.
464
          responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
×
465
          responseObserver.onCompleted();
×
466
        } else {
467
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
×
468
            log.error("unexpected", e);
×
469
          }
470
          responseObserver.onError(e);
×
471
        }
472
      }
1✔
473
    }
1✔
474
  }
1✔
475

476
  @Override
477
  public void respondWorkflowTaskCompleted(
478
      RespondWorkflowTaskCompletedRequest request,
479
      StreamObserver<RespondWorkflowTaskCompletedResponse> responseObserver) {
480
    try {
481
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(request.getTaskToken());
1✔
482
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
483
      mutableState.completeWorkflowTask(taskToken.getHistorySize(), request);
1✔
484
      responseObserver.onNext(RespondWorkflowTaskCompletedResponse.getDefaultInstance());
1✔
485
      responseObserver.onCompleted();
1✔
486
    } catch (StatusRuntimeException e) {
1✔
487
      handleStatusRuntimeException(e, responseObserver);
1✔
488
    } catch (Throwable e) {
×
489
      responseObserver.onError(
×
490
          Status.INTERNAL
491
              .withDescription(Throwables.getStackTraceAsString(e))
×
492
              .withCause(e)
×
493
              .asRuntimeException());
×
494
    }
1✔
495
  }
1✔
496

497
  @Override
498
  public void respondWorkflowTaskFailed(
499
      RespondWorkflowTaskFailedRequest failedRequest,
500
      StreamObserver<RespondWorkflowTaskFailedResponse> responseObserver) {
501
    try {
502
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(failedRequest.getTaskToken());
1✔
503
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
504
      mutableState.failWorkflowTask(failedRequest);
1✔
505
      responseObserver.onNext(RespondWorkflowTaskFailedResponse.getDefaultInstance());
1✔
506
      responseObserver.onCompleted();
1✔
507
    } catch (StatusRuntimeException e) {
×
508
      handleStatusRuntimeException(e, responseObserver);
×
509
    }
1✔
510
  }
1✔
511

512
  private Context.CancellableContext deadlineCtx(Deadline deadline) {
513
    return Context.current().withDeadline(deadline, this.backgroundScheduler);
1✔
514
  }
515

516
  @Override
517
  public void pollActivityTaskQueue(
518
      PollActivityTaskQueueRequest pollRequest,
519
      StreamObserver<PollActivityTaskQueueResponse> responseObserver) {
520
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
521

522
      PollActivityTaskQueueResponse.Builder task;
523
      try {
524
        task = pollTaskQueue(ctx, store.pollActivityTaskQueue(pollRequest));
1✔
525
      } catch (ExecutionException e) {
×
526
        responseObserver.onError(e);
×
527
        return;
×
528
      } catch (InterruptedException e) {
×
529
        Thread.currentThread().interrupt();
×
530
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
×
531
        responseObserver.onCompleted();
×
532
        return;
×
533
      } catch (CancellationException e) {
1✔
534
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
1✔
535
        responseObserver.onCompleted();
1✔
536
        return;
1✔
537
      }
1✔
538

539
      ExecutionId executionId =
1✔
540
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
1✔
541
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
542
      try {
543
        mutableState.startActivityTask(task, pollRequest);
1✔
544
        responseObserver.onNext(task.build());
1✔
545
        responseObserver.onCompleted();
1✔
546
      } catch (StatusRuntimeException e) {
1✔
547
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
548
          if (log.isDebugEnabled()) {
1✔
549
            log.debug("Skipping outdated activity task for " + executionId, e);
×
550
          }
551
          responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
1✔
552
          responseObserver.onCompleted();
1✔
553
        } else {
554
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
×
555
            log.error("unexpected", e);
×
556
          }
557
          responseObserver.onError(e);
×
558
        }
559
      }
1✔
560
    }
1✔
561
  }
1✔
562

563
  @Override
564
  public void recordActivityTaskHeartbeat(
565
      RecordActivityTaskHeartbeatRequest heartbeatRequest,
566
      StreamObserver<RecordActivityTaskHeartbeatResponse> responseObserver) {
567
    try {
568
      ActivityTaskToken activityTaskToken =
1✔
569
          ActivityTaskToken.fromBytes(heartbeatRequest.getTaskToken());
1✔
570
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
571
      boolean cancelRequested =
1✔
572
          mutableState.heartbeatActivityTask(
1✔
573
              activityTaskToken.getScheduledEventId(), heartbeatRequest.getDetails());
1✔
574
      responseObserver.onNext(
1✔
575
          RecordActivityTaskHeartbeatResponse.newBuilder()
1✔
576
              .setCancelRequested(cancelRequested)
1✔
577
              .build());
1✔
578
      responseObserver.onCompleted();
1✔
579
    } catch (StatusRuntimeException e) {
1✔
580
      handleStatusRuntimeException(e, responseObserver);
1✔
581
    }
1✔
582
  }
1✔
583

584
  @Override
585
  public void recordActivityTaskHeartbeatById(
586
      RecordActivityTaskHeartbeatByIdRequest heartbeatRequest,
587
      StreamObserver<RecordActivityTaskHeartbeatByIdResponse> responseObserver) {
588
    try {
589
      ExecutionId execution =
×
590
          new ExecutionId(
591
              heartbeatRequest.getNamespace(),
×
592
              heartbeatRequest.getWorkflowId(),
×
593
              heartbeatRequest.getRunId());
×
594
      TestWorkflowMutableState mutableState = getMutableState(execution);
×
595
      boolean cancelRequested =
×
596
          mutableState.heartbeatActivityTaskById(
×
597
              heartbeatRequest.getActivityId(),
×
598
              heartbeatRequest.getDetails(),
×
599
              heartbeatRequest.getIdentity());
×
600
      responseObserver.onNext(
×
601
          RecordActivityTaskHeartbeatByIdResponse.newBuilder()
×
602
              .setCancelRequested(cancelRequested)
×
603
              .build());
×
604
      responseObserver.onCompleted();
×
605
    } catch (StatusRuntimeException e) {
×
606
      handleStatusRuntimeException(e, responseObserver);
×
607
    }
×
608
  }
×
609

610
  @Override
611
  public void respondActivityTaskCompleted(
612
      RespondActivityTaskCompletedRequest completeRequest,
613
      StreamObserver<RespondActivityTaskCompletedResponse> responseObserver) {
614
    try {
615
      ActivityTaskToken activityTaskToken =
1✔
616
          ActivityTaskToken.fromBytes(completeRequest.getTaskToken());
1✔
617
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
618
      mutableState.completeActivityTask(activityTaskToken.getScheduledEventId(), completeRequest);
1✔
619
      responseObserver.onNext(RespondActivityTaskCompletedResponse.getDefaultInstance());
1✔
620
      responseObserver.onCompleted();
1✔
621
    } catch (StatusRuntimeException e) {
1✔
622
      handleStatusRuntimeException(e, responseObserver);
1✔
623
    }
1✔
624
  }
1✔
625

626
  @Override
627
  public void respondActivityTaskCompletedById(
628
      RespondActivityTaskCompletedByIdRequest completeRequest,
629
      StreamObserver<RespondActivityTaskCompletedByIdResponse> responseObserver) {
630
    try {
631
      ExecutionId executionId =
×
632
          new ExecutionId(
633
              completeRequest.getNamespace(),
×
634
              completeRequest.getWorkflowId(),
×
635
              completeRequest.getRunId());
×
636
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
637
      mutableState.completeActivityTaskById(completeRequest.getActivityId(), completeRequest);
×
638
      responseObserver.onNext(RespondActivityTaskCompletedByIdResponse.getDefaultInstance());
×
639
      responseObserver.onCompleted();
×
640
    } catch (StatusRuntimeException e) {
×
641
      handleStatusRuntimeException(e, responseObserver);
×
642
    }
×
643
  }
×
644

645
  @Override
646
  public void respondActivityTaskFailed(
647
      RespondActivityTaskFailedRequest failRequest,
648
      StreamObserver<RespondActivityTaskFailedResponse> responseObserver) {
649
    try {
650
      ActivityTaskToken activityTaskToken = ActivityTaskToken.fromBytes(failRequest.getTaskToken());
1✔
651
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
652
      mutableState.failActivityTask(activityTaskToken.getScheduledEventId(), failRequest);
1✔
653
      responseObserver.onNext(RespondActivityTaskFailedResponse.getDefaultInstance());
1✔
654
      responseObserver.onCompleted();
1✔
655
    } catch (StatusRuntimeException e) {
1✔
656
      handleStatusRuntimeException(e, responseObserver);
1✔
657
    }
1✔
658
  }
1✔
659

660
  @Override
661
  public void respondActivityTaskFailedById(
662
      RespondActivityTaskFailedByIdRequest failRequest,
663
      StreamObserver<RespondActivityTaskFailedByIdResponse> responseObserver) {
664
    try {
665
      ExecutionId executionId =
×
666
          new ExecutionId(
667
              failRequest.getNamespace(), failRequest.getWorkflowId(), failRequest.getRunId());
×
668
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
669
      mutableState.failActivityTaskById(failRequest.getActivityId(), failRequest);
×
670
      responseObserver.onNext(RespondActivityTaskFailedByIdResponse.getDefaultInstance());
×
671
      responseObserver.onCompleted();
×
672
    } catch (StatusRuntimeException e) {
×
673
      handleStatusRuntimeException(e, responseObserver);
×
674
    }
×
675
  }
×
676

677
  @Override
678
  public void respondActivityTaskCanceled(
679
      RespondActivityTaskCanceledRequest canceledRequest,
680
      StreamObserver<RespondActivityTaskCanceledResponse> responseObserver) {
681
    try {
682
      ActivityTaskToken activityTaskToken =
1✔
683
          ActivityTaskToken.fromBytes(canceledRequest.getTaskToken());
1✔
684
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
685
      mutableState.cancelActivityTask(activityTaskToken.getScheduledEventId(), canceledRequest);
1✔
686
      responseObserver.onNext(RespondActivityTaskCanceledResponse.getDefaultInstance());
1✔
687
      responseObserver.onCompleted();
1✔
688
    } catch (StatusRuntimeException e) {
×
689
      handleStatusRuntimeException(e, responseObserver);
×
690
    }
1✔
691
  }
1✔
692

693
  @Override
694
  public void respondActivityTaskCanceledById(
695
      RespondActivityTaskCanceledByIdRequest canceledRequest,
696
      StreamObserver<RespondActivityTaskCanceledByIdResponse> responseObserver) {
697
    try {
698
      ExecutionId executionId =
×
699
          new ExecutionId(
700
              canceledRequest.getNamespace(),
×
701
              canceledRequest.getWorkflowId(),
×
702
              canceledRequest.getRunId());
×
703
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
704
      mutableState.cancelActivityTaskById(canceledRequest.getActivityId(), canceledRequest);
×
705
      responseObserver.onNext(RespondActivityTaskCanceledByIdResponse.getDefaultInstance());
×
706
      responseObserver.onCompleted();
×
707
    } catch (StatusRuntimeException e) {
×
708
      handleStatusRuntimeException(e, responseObserver);
×
709
    }
×
710
  }
×
711

712
  @Override
713
  public void requestCancelWorkflowExecution(
714
      RequestCancelWorkflowExecutionRequest cancelRequest,
715
      StreamObserver<RequestCancelWorkflowExecutionResponse> responseObserver) {
716
    try {
717
      requestCancelWorkflowExecution(cancelRequest, Optional.empty());
1✔
718
      responseObserver.onNext(RequestCancelWorkflowExecutionResponse.getDefaultInstance());
1✔
719
      responseObserver.onCompleted();
1✔
720
    } catch (StatusRuntimeException e) {
1✔
721
      handleStatusRuntimeException(e, responseObserver);
1✔
722
    }
1✔
723
  }
1✔
724

725
  void requestCancelWorkflowExecution(
726
      RequestCancelWorkflowExecutionRequest cancelRequest,
727
      Optional<TestWorkflowMutableStateImpl.CancelExternalWorkflowExecutionCallerInfo> callerInfo) {
728
    ExecutionId executionId =
1✔
729
        new ExecutionId(cancelRequest.getNamespace(), cancelRequest.getWorkflowExecution());
1✔
730
    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
731
    mutableState.requestCancelWorkflowExecution(cancelRequest, callerInfo);
1✔
732
  }
1✔
733

734
  @Override
735
  public void terminateWorkflowExecution(
736
      TerminateWorkflowExecutionRequest request,
737
      StreamObserver<TerminateWorkflowExecutionResponse> responseObserver) {
738
    try {
739
      terminateWorkflowExecution(request);
1✔
740
      responseObserver.onNext(TerminateWorkflowExecutionResponse.getDefaultInstance());
1✔
741
      responseObserver.onCompleted();
1✔
742
    } catch (StatusRuntimeException e) {
1✔
743
      handleStatusRuntimeException(e, responseObserver);
1✔
744
    }
1✔
745
  }
1✔
746

747
  private void terminateWorkflowExecution(TerminateWorkflowExecutionRequest request) {
748
    ExecutionId executionId =
1✔
749
        new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
1✔
750
    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
751
    mutableState.terminateWorkflowExecution(request);
1✔
752
  }
1✔
753

754
  @Override
755
  public void signalWorkflowExecution(
756
      SignalWorkflowExecutionRequest signalRequest,
757
      StreamObserver<SignalWorkflowExecutionResponse> responseObserver) {
758
    try {
759
      ExecutionId executionId =
1✔
760
          new ExecutionId(signalRequest.getNamespace(), signalRequest.getWorkflowExecution());
1✔
761
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
762
      mutableState.signal(signalRequest);
1✔
763
      responseObserver.onNext(SignalWorkflowExecutionResponse.getDefaultInstance());
1✔
764
      responseObserver.onCompleted();
1✔
765
    } catch (StatusRuntimeException e) {
1✔
766
      handleStatusRuntimeException(e, responseObserver);
1✔
767
    }
1✔
768
  }
1✔
769

770
  @Override
771
  public void signalWithStartWorkflowExecution(
772
      SignalWithStartWorkflowExecutionRequest r,
773
      StreamObserver<SignalWithStartWorkflowExecutionResponse> responseObserver) {
774
    try {
775
      if (!r.hasTaskQueue()) {
1✔
776
        throw Status.INVALID_ARGUMENT
×
777
            .withDescription("request missing required taskQueue field")
×
778
            .asRuntimeException();
×
779
      }
780
      if (!r.hasWorkflowType()) {
1✔
781
        throw Status.INVALID_ARGUMENT
×
782
            .withDescription("request missing required workflowType field")
×
783
            .asRuntimeException();
×
784
      }
785
      ExecutionId executionId = new ExecutionId(r.getNamespace(), r.getWorkflowId(), null);
1✔
786
      TestWorkflowMutableState mutableState = getMutableState(executionId, false);
1✔
787
      SignalWorkflowExecutionRequest signalRequest =
788
          SignalWorkflowExecutionRequest.newBuilder()
1✔
789
              .setInput(r.getSignalInput())
1✔
790
              .setSignalName(r.getSignalName())
1✔
791
              .setWorkflowExecution(executionId.getExecution())
1✔
792
              .setRequestId(r.getRequestId())
1✔
793
              .setControl(r.getControl())
1✔
794
              .setNamespace(r.getNamespace())
1✔
795
              .setIdentity(r.getIdentity())
1✔
796
              .build();
1✔
797
      if (mutableState != null && !mutableState.isTerminalState()) {
1✔
798
        mutableState.signal(signalRequest);
1✔
799
        responseObserver.onNext(
1✔
800
            SignalWithStartWorkflowExecutionResponse.newBuilder()
1✔
801
                .setRunId(mutableState.getExecutionId().getExecution().getRunId())
1✔
802
                .build());
1✔
803
        responseObserver.onCompleted();
1✔
804
        return;
1✔
805
      }
806
      StartWorkflowExecutionRequest.Builder startRequest =
807
          StartWorkflowExecutionRequest.newBuilder()
1✔
808
              .setRequestId(r.getRequestId())
1✔
809
              .setInput(r.getInput())
1✔
810
              .setWorkflowExecutionTimeout(r.getWorkflowExecutionTimeout())
1✔
811
              .setWorkflowRunTimeout(r.getWorkflowRunTimeout())
1✔
812
              .setWorkflowTaskTimeout(r.getWorkflowTaskTimeout())
1✔
813
              .setNamespace(r.getNamespace())
1✔
814
              .setTaskQueue(r.getTaskQueue())
1✔
815
              .setWorkflowId(r.getWorkflowId())
1✔
816
              .setWorkflowIdReusePolicy(r.getWorkflowIdReusePolicy())
1✔
817
              .setIdentity(r.getIdentity())
1✔
818
              .setWorkflowType(r.getWorkflowType())
1✔
819
              .setCronSchedule(r.getCronSchedule())
1✔
820
              .setRequestId(r.getRequestId());
1✔
821
      if (r.hasRetryPolicy()) {
1✔
822
        startRequest.setRetryPolicy(r.getRetryPolicy());
×
823
      }
824
      if (r.hasHeader()) {
1✔
825
        startRequest.setHeader(r.getHeader());
1✔
826
      }
827
      if (r.hasMemo()) {
1✔
828
        startRequest.setMemo(r.getMemo());
×
829
      }
830
      if (r.hasSearchAttributes()) {
1✔
831
        startRequest.setSearchAttributes(r.getSearchAttributes());
1✔
832
      }
833
      StartWorkflowExecutionResponse startResult =
1✔
834
          startWorkflowExecutionImpl(
1✔
835
              startRequest.build(),
1✔
836
              Duration.ZERO,
837
              Optional.empty(),
1✔
838
              OptionalLong.empty(),
1✔
839
              signalRequest);
840
      responseObserver.onNext(
1✔
841
          SignalWithStartWorkflowExecutionResponse.newBuilder()
1✔
842
              .setRunId(startResult.getRunId())
1✔
843
              .build());
1✔
844
      responseObserver.onCompleted();
1✔
845
    } catch (StatusRuntimeException e) {
1✔
846
      handleStatusRuntimeException(e, responseObserver);
1✔
847
    }
1✔
848
  }
1✔
849

850
  public void signalExternalWorkflowExecution(
851
      String signalId,
852
      SignalExternalWorkflowExecutionCommandAttributes commandAttributes,
853
      TestWorkflowMutableState source) {
854
    String namespace;
855
    if (commandAttributes.getNamespace().isEmpty()) {
1✔
856
      namespace = source.getExecutionId().getNamespace();
1✔
857
    } else {
858
      namespace = commandAttributes.getNamespace();
×
859
    }
860
    ExecutionId executionId = new ExecutionId(namespace, commandAttributes.getExecution());
1✔
861
    TestWorkflowMutableState mutableState;
862
    try {
863
      mutableState = getMutableState(executionId);
1✔
864
      mutableState.signalFromWorkflow(commandAttributes);
1✔
865
      source.completeSignalExternalWorkflowExecution(
1✔
866
          signalId, mutableState.getExecutionId().getExecution().getRunId());
1✔
867
    } catch (StatusRuntimeException e) {
1✔
868
      if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
869
        source.failSignalExternalWorkflowExecution(
1✔
870
            signalId,
871
            SignalExternalWorkflowExecutionFailedCause
872
                .SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED_CAUSE_EXTERNAL_WORKFLOW_EXECUTION_NOT_FOUND);
873
      } else {
874
        throw e;
×
875
      }
876
    }
1✔
877
  }
1✔
878

879
  /**
880
   * Creates next run of a workflow execution
881
   *
882
   * @return RunId
883
   */
884
  public String continueAsNew(
885
      StartWorkflowExecutionRequest previousRunStartRequest,
886
      WorkflowExecutionContinuedAsNewEventAttributes a,
887
      Optional<TestServiceRetryState> retryState,
888
      String identity,
889
      ExecutionId continuedExecutionId,
890
      String firstExecutionRunId,
891
      Optional<TestWorkflowMutableState> parent,
892
      OptionalLong parentChildInitiatedEventId) {
893
    StartWorkflowExecutionRequest.Builder startRequestBuilder =
894
        StartWorkflowExecutionRequest.newBuilder()
1✔
895
            .setRequestId(UUID.randomUUID().toString())
1✔
896
            .setWorkflowType(a.getWorkflowType())
1✔
897
            .setWorkflowRunTimeout(a.getWorkflowRunTimeout())
1✔
898
            .setWorkflowTaskTimeout(a.getWorkflowTaskTimeout())
1✔
899
            .setNamespace(continuedExecutionId.getNamespace())
1✔
900
            .setTaskQueue(a.getTaskQueue())
1✔
901
            .setWorkflowId(continuedExecutionId.getWorkflowId().getWorkflowId())
1✔
902
            .setWorkflowIdReusePolicy(previousRunStartRequest.getWorkflowIdReusePolicy())
1✔
903
            .setIdentity(identity)
1✔
904
            .setCronSchedule(previousRunStartRequest.getCronSchedule());
1✔
905
    if (previousRunStartRequest.hasRetryPolicy()) {
1✔
906
      startRequestBuilder.setRetryPolicy(previousRunStartRequest.getRetryPolicy());
1✔
907
    }
908
    if (a.hasInput()) {
1✔
909
      startRequestBuilder.setInput(a.getInput());
1✔
910
    }
911
    if (a.hasHeader()) {
1✔
912
      startRequestBuilder.setHeader(a.getHeader());
1✔
913
    }
914
    StartWorkflowExecutionRequest startRequest = startRequestBuilder.build();
1✔
915
    lock.lock();
1✔
916
    Optional<Failure> lastFail =
917
        a.hasFailure()
1✔
918
            ? Optional.of(a.getFailure())
1✔
919
            : retryState.flatMap(TestServiceRetryState::getPreviousRunFailure);
1✔
920
    try {
921
      StartWorkflowExecutionResponse response =
1✔
922
          startWorkflowExecutionNoRunningCheckLocked(
1✔
923
              startRequest,
924
              a.getNewExecutionRunId(),
1✔
925
              firstExecutionRunId,
926
              Optional.of(continuedExecutionId.getExecution().getRunId()),
1✔
927
              retryState,
928
              ProtobufTimeUtils.toJavaDuration(a.getBackoffStartInterval()),
1✔
929
              a.getLastCompletionResult(),
1✔
930
              lastFail,
931
              parent,
932
              parentChildInitiatedEventId,
933
              null,
934
              continuedExecutionId.getWorkflowId());
1✔
935
      return response.getRunId();
1✔
936
    } finally {
937
      lock.unlock();
1✔
938
    }
939
  }
940

941
  @Override
942
  public void listOpenWorkflowExecutions(
943
      ListOpenWorkflowExecutionsRequest listRequest,
944
      StreamObserver<ListOpenWorkflowExecutionsResponse> responseObserver) {
945
    try {
946
      Optional<String> workflowIdFilter;
947
      if (listRequest.hasExecutionFilter()
1✔
948
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
×
949
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
×
950
      } else {
951
        workflowIdFilter = Optional.empty();
1✔
952
      }
953
      List<WorkflowExecutionInfo> result =
1✔
954
          store.listWorkflows(WorkflowState.OPEN, workflowIdFilter);
1✔
955
      responseObserver.onNext(
1✔
956
          ListOpenWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1✔
957
      responseObserver.onCompleted();
1✔
958
    } catch (StatusRuntimeException e) {
×
959
      handleStatusRuntimeException(e, responseObserver);
×
960
    }
1✔
961
  }
1✔
962

963
  @Override
964
  public void listClosedWorkflowExecutions(
965
      ListClosedWorkflowExecutionsRequest listRequest,
966
      StreamObserver<ListClosedWorkflowExecutionsResponse> responseObserver) {
967
    try {
968
      Optional<String> workflowIdFilter;
969
      if (listRequest.hasExecutionFilter()
1✔
970
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
×
971
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
×
972
      } else {
973
        workflowIdFilter = Optional.empty();
1✔
974
      }
975
      List<WorkflowExecutionInfo> result =
1✔
976
          store.listWorkflows(WorkflowState.CLOSED, workflowIdFilter);
1✔
977
      responseObserver.onNext(
1✔
978
          ListClosedWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1✔
979
      responseObserver.onCompleted();
1✔
980
    } catch (StatusRuntimeException e) {
×
981
      handleStatusRuntimeException(e, responseObserver);
×
982
    }
1✔
983
  }
1✔
984

985
  @Override
986
  public void respondQueryTaskCompleted(
987
      RespondQueryTaskCompletedRequest completeRequest,
988
      StreamObserver<RespondQueryTaskCompletedResponse> responseObserver) {
989
    try {
990
      QueryId queryId = QueryId.fromBytes(completeRequest.getTaskToken());
1✔
991
      TestWorkflowMutableState mutableState = getMutableState(queryId.getExecutionId());
1✔
992
      mutableState.completeQuery(queryId, completeRequest);
1✔
993
      responseObserver.onNext(RespondQueryTaskCompletedResponse.getDefaultInstance());
1✔
994
      responseObserver.onCompleted();
1✔
995
    } catch (StatusRuntimeException e) {
×
996
      handleStatusRuntimeException(e, responseObserver);
×
997
    }
1✔
998
  }
1✔
999

1000
  @Override
1001
  public void queryWorkflow(
1002
      QueryWorkflowRequest queryRequest, StreamObserver<QueryWorkflowResponse> responseObserver) {
1003
    try {
1004
      ExecutionId executionId =
1✔
1005
          new ExecutionId(queryRequest.getNamespace(), queryRequest.getExecution());
1✔
1006
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1007
      @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1008
      QueryWorkflowResponse result =
1✔
1009
          mutableState.query(
1✔
1010
              queryRequest,
1011
              deadline != null ? deadline.timeRemaining(TimeUnit.MILLISECONDS) : Long.MAX_VALUE);
1✔
1012
      responseObserver.onNext(result);
1✔
1013
      responseObserver.onCompleted();
1✔
1014
    } catch (StatusRuntimeException e) {
1✔
1015
      handleStatusRuntimeException(e, responseObserver);
1✔
1016
    }
1✔
1017
  }
1✔
1018

1019
  @Override
1020
  public void describeWorkflowExecution(
1021
      DescribeWorkflowExecutionRequest request,
1022
      StreamObserver<io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse>
1023
          responseObserver) {
1024
    try {
1025
      if (request.getNamespace().isEmpty()) {
1✔
1026
        throw createInvalidArgument("Namespace not set on request.");
×
1027
      }
1028
      if (!request.hasExecution()) {
1✔
1029
        throw createInvalidArgument("Execution not set on request.");
×
1030
      }
1031

1032
      ExecutionId executionId = new ExecutionId(request.getNamespace(), request.getExecution());
1✔
1033
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1034
      DescribeWorkflowExecutionResponse result = mutableState.describeWorkflowExecution();
1✔
1035
      responseObserver.onNext(result);
1✔
1036
      responseObserver.onCompleted();
1✔
1037
    } catch (StatusRuntimeException e) {
1✔
1038
      handleStatusRuntimeException(e, responseObserver);
1✔
1039
    }
1✔
1040
  }
1✔
1041

1042
  /**
1043
   * This method doesn't make much sense for test server, it accepts all namespaces as existent and
1044
   * registered. so, it's a trivial implementation just returning an info that a namespace is
1045
   * registered irrespectively of the input
1046
   */
1047
  @Override
1048
  public void describeNamespace(
1049
      DescribeNamespaceRequest request,
1050
      StreamObserver<DescribeNamespaceResponse> responseObserver) {
1051
    try {
1052
      if (request.getNamespace().isEmpty()) {
1✔
1053
        throw createInvalidArgument("Namespace not set on request.");
×
1054
      }
1055
      // generating a stable UUID for name
1056
      String namespaceId = UUID.nameUUIDFromBytes(request.getNamespace().getBytes()).toString();
1✔
1057
      DescribeNamespaceResponse result =
1058
          DescribeNamespaceResponse.newBuilder()
1✔
1059
              .setNamespaceInfo(
1✔
1060
                  NamespaceInfo.newBuilder()
1✔
1061
                      .setName(request.getNamespace())
1✔
1062
                      .setState(NamespaceState.NAMESPACE_STATE_REGISTERED)
1✔
1063
                      .setId(namespaceId)
1✔
1064
                      .build())
1✔
1065
              .build();
1✔
1066
      responseObserver.onNext(result);
1✔
1067
      responseObserver.onCompleted();
1✔
1068
    } catch (StatusRuntimeException e) {
1✔
1069
      handleStatusRuntimeException(e, responseObserver);
1✔
1070
    }
1✔
1071
  }
1✔
1072

1073
  private <R> R requireNotNull(String fieldName, R value) {
1074
    if (value == null) {
1✔
1075
      throw Status.INVALID_ARGUMENT
×
1076
          .withDescription("Missing required field \"" + fieldName + "\".")
×
1077
          .asRuntimeException();
×
1078
    }
1079
    return value;
1✔
1080
  }
1081

1082
  /**
1083
   * Adds diagnostic data about internal service state to the provided {@link StringBuilder}.
1084
   * Includes histories of all workflow instances stored in the service.
1085
   */
1086
  public void getDiagnostics(StringBuilder result) {
1087
    store.getDiagnostics(result);
×
1088
  }
×
1089

1090
  /**
1091
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1092
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#getCurrentTime(Empty)}
1093
   */
1094
  @Deprecated
1095
  public long currentTimeMillis() {
1096
    return selfAdvancingTimer.getClock().getAsLong();
×
1097
  }
1098

1099
  /** Invokes callback after the specified delay according to internal service clock. */
1100
  public void registerDelayedCallback(Duration delay, Runnable r) {
1101
    store.registerDelayedCallback(delay, r);
1✔
1102
  }
1✔
1103

1104
  /**
1105
   * Disables time skipping. To re-enable call {@link #unlockTimeSkipping(String)}. These calls are
1106
   * counted, so calling unlock does not guarantee that time is going to be skipped immediately as
1107
   * another lock can be holding it.
1108
   *
1109
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1110
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#lockTimeSkipping(LockTimeSkippingRequest)}
1111
   */
1112
  @Deprecated
1113
  public void lockTimeSkipping(String caller) {
1114
    selfAdvancingTimer.lockTimeSkipping(caller);
×
1115
  }
×
1116

1117
  /**
1118
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1119
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkipping(UnlockTimeSkippingRequest)}
1120
   */
1121
  @Deprecated
1122
  public void unlockTimeSkipping(String caller) {
1123
    selfAdvancingTimer.unlockTimeSkipping(caller);
×
1124
  }
×
1125

1126
  /**
1127
   * Unlocks time skipping and blocks the calling thread until internal clock passes the current +
1128
   * duration time.<br>
1129
   * When the time is reached, locks time skipping and returns.<br>
1130
   * Might not block at all due to time skipping. Or might block if the time skipping lock counter
1131
   * was more than 1.
1132
   *
1133
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1134
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkippingWithSleep(SleepRequest)}
1135
   */
1136
  @Deprecated
1137
  public void sleep(Duration duration) {
1138
    CompletableFuture<Void> result = new CompletableFuture<>();
×
1139
    selfAdvancingTimer.schedule(
×
1140
        duration,
1141
        () -> {
1142
          selfAdvancingTimer.lockTimeSkipping("TestWorkflowService sleep");
×
1143
          result.complete(null);
×
1144
        },
×
1145
        "workflow sleep");
1146
    selfAdvancingTimer.unlockTimeSkipping("TestWorkflowService sleep");
×
1147
    try {
1148
      result.get();
×
1149
    } catch (InterruptedException e) {
×
1150
      Thread.currentThread().interrupt();
×
1151
      throw new RuntimeException(e);
×
1152
    } catch (ExecutionException e) {
×
1153
      throw new RuntimeException(e);
×
1154
    }
×
1155
  }
×
1156

1157
  /**
1158
   * Temporal server times out task queue long poll calls after 1 minute and returns an empty
1159
   * result. After which the request has to be retried by the client if it wants to continue
1160
   * waiting. We emulate this behavior here.
1161
   *
1162
   * <p>If there is a deadline present, for task queue poll requests server will respond inside the
1163
   * deadline. Note that the latest is not applicable for getWorkflowExecutionHistory() long polls.
1164
   *
1165
   * @return minimum between the context deadline and maximum long poll deadline.
1166
   */
1167
  private Deadline getLongPollDeadline() {
1168
    @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1169
    Deadline maximumDeadline =
1✔
1170
        Deadline.after(
1✔
1171
            WorkflowServiceStubsOptions.DEFAULT_SERVER_LONG_POLL_RPC_TIMEOUT.toMillis(),
1✔
1172
            TimeUnit.MILLISECONDS);
1173
    return deadline != null ? deadline.minimum(maximumDeadline) : maximumDeadline;
1✔
1174
  }
1175

1176
  private void handleStatusRuntimeException(
1177
      StatusRuntimeException e, StreamObserver<?> responseObserver) {
1178
    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1✔
1179
      log.error("unexpected", e);
1✔
1180
    }
1181
    responseObserver.onError(e);
1✔
1182
  }
1✔
1183

1184
  /**
1185
   * Creates an in-memory service along with client stubs for use in Java code. See also
1186
   * createServerOnly and createWithNoGrpcServer.
1187
   *
1188
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean)} instead and
1189
   *     pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1190
   */
1191
  @Deprecated
1192
  public TestWorkflowService() {
1193
    this(0, true);
×
1194
  }
×
1195

1196
  /**
1197
   * Creates an in-memory service along with client stubs for use in Java code. See also
1198
   * createServerOnly and createWithNoGrpcServer.
1199
   *
1200
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean, long)} instead
1201
   *     and pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1202
   */
1203
  @Deprecated
1204
  public TestWorkflowService(long initialTimeMillis) {
1205
    this(initialTimeMillis, true);
×
1206
  }
×
1207

1208
  /**
1209
   * Creates an in-memory service along with client stubs for use in Java code. See also
1210
   * createServerOnly and createWithNoGrpcServer.
1211
   *
1212
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean)} instead
1213
   */
1214
  @Deprecated
1215
  public TestWorkflowService(boolean lockTimeSkipping) {
1216
    this(0, true);
×
1217
    if (lockTimeSkipping) {
×
1218
      this.lockTimeSkipping("constructor");
×
1219
    }
1220
  }
×
1221

1222
  /**
1223
   * Creates an instance of TestWorkflowService that does not manage its own gRPC server. Useful for
1224
   * including in an externally managed gRPC server.
1225
   *
1226
   * @deprecated use {@link TestServicesStarter} to create just the services with gRPC server
1227
   */
1228
  @Deprecated
1229
  public static TestWorkflowService createWithNoGrpcServer() {
1230
    return new TestWorkflowService(0, false);
×
1231
  }
1232

1233
  private TestWorkflowService(long initialTimeMillis, boolean startInProcessServer) {
×
1234
    this.selfAdvancingTimer =
×
1235
        new SelfAdvancingTimerImpl(initialTimeMillis, Clock.systemDefaultZone());
×
1236
    store = new TestWorkflowStoreImpl(this.selfAdvancingTimer);
×
1237
    visibilityStore = new TestVisibilityStoreImpl();
×
1238
    outOfProcessServer = null;
×
1239
    if (startInProcessServer) {
×
1240
      this.inProcessServer = new InProcessGRPCServer(Collections.singletonList(this));
×
1241
      this.workflowServiceStubs =
×
1242
          WorkflowServiceStubs.newServiceStubs(
×
1243
              WorkflowServiceStubsOptions.newBuilder()
×
1244
                  .setChannel(inProcessServer.getChannel())
×
1245
                  .build());
×
1246
    } else {
1247
      this.inProcessServer = null;
×
1248
      this.workflowServiceStubs = null;
×
1249
    }
1250
  }
×
1251

1252
  /**
1253
   * Creates an out-of-process rather than in-process server, and does not set up a client. Useful,
1254
   * for example, if you want to use the test service from other SDKs.
1255
   *
1256
   * @param port the port to listen on
1257
   * @deprecated use {@link io.temporal.testserver.TestServer#createPortBoundServer(int, boolean)}
1258
   *     instead and pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1259
   */
1260
  @Deprecated
1261
  public static TestWorkflowService createServerOnly(int port) {
1262
    TestWorkflowService result = new TestWorkflowService(true, port);
×
1263
    log.info("Server started, listening on " + port);
×
1264
    return result;
×
1265
  }
1266

1267
  private TestWorkflowService(boolean isOutOfProc, int port) {
×
1268
    // isOutOfProc is just here to make unambiguous constructor overloading.
1269
    Preconditions.checkState(isOutOfProc, "Impossible.");
×
1270
    inProcessServer = null;
×
1271
    workflowServiceStubs = null;
×
1272
    this.selfAdvancingTimer = new SelfAdvancingTimerImpl(0, Clock.systemDefaultZone());
×
1273
    store = new TestWorkflowStoreImpl(selfAdvancingTimer);
×
1274
    visibilityStore = new TestVisibilityStoreImpl();
×
1275
    try {
1276
      ServerBuilder<?> serverBuilder =
×
1277
          Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create());
×
1278
      GRPCServerHelper.registerServicesAndHealthChecks(
×
1279
          Collections.singletonList(this), serverBuilder);
×
1280
      outOfProcessServer = serverBuilder.build().start();
×
1281
    } catch (IOException e) {
×
1282
      throw new RuntimeException(e);
×
1283
    }
×
1284
  }
×
1285

1286
  @Deprecated
1287
  public WorkflowServiceStubs newClientStub() {
1288
    if (workflowServiceStubs == null) {
×
1289
      throw new RuntimeException(
×
1290
          "Cannot get a client when you created your TestWorkflowService with createServerOnly.");
1291
    }
1292
    return workflowServiceStubs;
×
1293
  }
1294

1295
  private static StatusRuntimeException createInvalidArgument(String description) {
1296
    throw Status.INVALID_ARGUMENT.withDescription(description).asRuntimeException();
1✔
1297
  }
1298
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc