• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #153

pending completion
#153

push

github-actions

web-flow
Eager Workflow Task Dispatch (#1674)

Issue #1646

Signed-off-by: Dmitry Spikhalskiy <dmitry@spikhalskiy.com>

213 of 213 new or added lines in 22 files covered. (100.0%)

16682 of 20566 relevant lines covered (81.11%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.59
/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import static io.temporal.internal.testservice.CronUtils.getBackoffInterval;
24

25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Throwables;
27
import com.google.protobuf.Empty;
28
import com.google.protobuf.Timestamp;
29
import com.google.protobuf.util.Timestamps;
30
import io.grpc.*;
31
import io.grpc.stub.StreamObserver;
32
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
33
import io.temporal.api.common.v1.Payloads;
34
import io.temporal.api.common.v1.RetryPolicy;
35
import io.temporal.api.common.v1.WorkflowExecution;
36
import io.temporal.api.enums.v1.NamespaceState;
37
import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause;
38
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
39
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
40
import io.temporal.api.errordetails.v1.WorkflowExecutionAlreadyStartedFailure;
41
import io.temporal.api.failure.v1.Failure;
42
import io.temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes;
43
import io.temporal.api.namespace.v1.NamespaceInfo;
44
import io.temporal.api.testservice.v1.LockTimeSkippingRequest;
45
import io.temporal.api.testservice.v1.SleepRequest;
46
import io.temporal.api.testservice.v1.UnlockTimeSkippingRequest;
47
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
48
import io.temporal.api.workflowservice.v1.*;
49
import io.temporal.internal.common.ProtobufTimeUtils;
50
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowState;
51
import io.temporal.serviceclient.StatusUtils;
52
import io.temporal.serviceclient.WorkflowServiceStubs;
53
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
54
import java.io.Closeable;
55
import java.io.IOException;
56
import java.time.Clock;
57
import java.time.Duration;
58
import java.util.*;
59
import java.util.concurrent.*;
60
import java.util.concurrent.locks.Lock;
61
import java.util.concurrent.locks.ReentrantLock;
62
import javax.annotation.Nonnull;
63
import javax.annotation.Nullable;
64
import org.slf4j.Logger;
65
import org.slf4j.LoggerFactory;
66

67
/**
68
 * In memory implementation of the Workflow Service. To be used for testing purposes only.
69
 *
70
 * <p>Do not use directly, instead use {@link io.temporal.testing.TestWorkflowEnvironment}.
71
 */
72
public final class TestWorkflowService extends WorkflowServiceGrpc.WorkflowServiceImplBase
73
    implements Closeable {
74
  private static final Logger log = LoggerFactory.getLogger(TestWorkflowService.class);
1✔
75
  private final Map<ExecutionId, TestWorkflowMutableState> executions = new HashMap<>();
1✔
76
  // key->WorkflowId
77
  private final Map<WorkflowId, TestWorkflowMutableState> executionsByWorkflowId = new HashMap<>();
1✔
78
  private final ExecutorService executor = Executors.newCachedThreadPool();
1✔
79
  private final Lock lock = new ReentrantLock();
1✔
80

81
  private final TestWorkflowStore store;
82
  private final TestVisibilityStore visibilityStore;
83
  private final SelfAdvancingTimer selfAdvancingTimer;
84

85
  private final ScheduledExecutorService backgroundScheduler =
1✔
86
      Executors.newSingleThreadScheduledExecutor();
1✔
87

88
  private final Server outOfProcessServer;
89
  private final InProcessGRPCServer inProcessServer;
90
  private final WorkflowServiceStubs workflowServiceStubs;
91

92
  TestWorkflowService(
93
      TestWorkflowStore store,
94
      TestVisibilityStore visibilityStore,
95
      SelfAdvancingTimer selfAdvancingTimer) {
1✔
96
    this.store = store;
1✔
97
    this.visibilityStore = visibilityStore;
1✔
98
    this.selfAdvancingTimer = selfAdvancingTimer;
1✔
99
    this.outOfProcessServer = null;
1✔
100
    this.inProcessServer = null;
1✔
101
    this.workflowServiceStubs = null;
1✔
102
  }
1✔
103

104
  @Override
105
  public void close() {
106
    log.debug("Shutting down TestWorkflowService");
1✔
107

108
    log.debug("Shutting down background scheduler");
1✔
109
    backgroundScheduler.shutdown();
1✔
110

111
    if (outOfProcessServer != null) {
1✔
112
      log.info("Shutting down out-of-process GRPC server");
×
113
      outOfProcessServer.shutdown();
×
114
    }
115

116
    if (workflowServiceStubs != null) {
1✔
117
      workflowServiceStubs.shutdown();
×
118
    }
119

120
    if (inProcessServer != null) {
1✔
121
      log.info("Shutting down in-process GRPC server");
×
122
      inProcessServer.shutdown();
×
123
    }
124

125
    executor.shutdown();
1✔
126

127
    try {
128
      executor.awaitTermination(1, TimeUnit.SECONDS);
1✔
129

130
      if (outOfProcessServer != null) {
1✔
131
        outOfProcessServer.awaitTermination(1, TimeUnit.SECONDS);
×
132
      }
133

134
      if (workflowServiceStubs != null) {
1✔
135
        workflowServiceStubs.awaitTermination(1, TimeUnit.SECONDS);
×
136
      }
137

138
      if (inProcessServer != null) {
1✔
139
        inProcessServer.awaitTermination(1, TimeUnit.SECONDS);
×
140
      }
141

142
    } catch (InterruptedException e) {
×
143
      Thread.currentThread().interrupt();
×
144
      log.debug("shutdown interrupted", e);
×
145
    }
1✔
146

147
    store.close();
1✔
148
  }
1✔
149

150
  private TestWorkflowMutableState getMutableState(ExecutionId executionId) {
151
    return getMutableState(executionId, true);
1✔
152
  }
153

154
  private TestWorkflowMutableState getMutableState(ExecutionId executionId, boolean failNotExists) {
155
    lock.lock();
1✔
156
    try {
157
      if (executionId.getExecution().getRunId().isEmpty()) {
1✔
158
        return getMutableState(executionId.getWorkflowId(), failNotExists);
1✔
159
      }
160
      TestWorkflowMutableState mutableState = executions.get(executionId);
1✔
161
      if (mutableState == null && failNotExists) {
1✔
162
        throw Status.NOT_FOUND
1✔
163
            .withDescription(
1✔
164
                "Execution \""
165
                    + executionId
166
                    + "\" not found in mutable state. Known executions: "
167
                    + executions.values()
1✔
168
                    + ", service="
169
                    + this)
170
            .asRuntimeException();
1✔
171
      }
172
      return mutableState;
1✔
173
    } finally {
174
      lock.unlock();
1✔
175
    }
176
  }
177

178
  private TestWorkflowMutableState getMutableState(WorkflowId workflowId, boolean failNotExists) {
179
    lock.lock();
1✔
180
    try {
181
      TestWorkflowMutableState mutableState = executionsByWorkflowId.get(workflowId);
1✔
182
      if (mutableState == null && failNotExists) {
1✔
183
        throw Status.NOT_FOUND
1✔
184
            .withDescription("Execution not found in mutable state: " + workflowId)
1✔
185
            .asRuntimeException();
1✔
186
      }
187
      return mutableState;
1✔
188
    } finally {
189
      lock.unlock();
1✔
190
    }
191
  }
192

193
  @Override
194
  public void startWorkflowExecution(
195
      StartWorkflowExecutionRequest request,
196
      StreamObserver<StartWorkflowExecutionResponse> responseObserver) {
197
    try {
198
      Duration backoffInterval = getBackoffInterval(request.getCronSchedule(), store.currentTime());
1✔
199
      StartWorkflowExecutionResponse response =
1✔
200
          startWorkflowExecutionImpl(
1✔
201
              request, backoffInterval, Optional.empty(), OptionalLong.empty(), null);
1✔
202
      responseObserver.onNext(response);
1✔
203
      responseObserver.onCompleted();
1✔
204
    } catch (StatusRuntimeException e) {
1✔
205
      handleStatusRuntimeException(e, responseObserver);
1✔
206
    }
1✔
207
  }
1✔
208

209
  StartWorkflowExecutionResponse startWorkflowExecutionImpl(
210
      StartWorkflowExecutionRequest startRequest,
211
      Duration backoffStartInterval,
212
      Optional<TestWorkflowMutableState> parent,
213
      OptionalLong parentChildInitiatedEventId,
214
      @Nullable SignalWorkflowExecutionRequest signalWithStartSignal) {
215
    String requestWorkflowId = requireNotNull("WorkflowId", startRequest.getWorkflowId());
1✔
216
    String namespace = requireNotNull("Namespace", startRequest.getNamespace());
1✔
217
    WorkflowId workflowId = new WorkflowId(namespace, requestWorkflowId);
1✔
218
    TestWorkflowMutableState existing;
219
    lock.lock();
1✔
220
    try {
221
      existing = executionsByWorkflowId.get(workflowId);
1✔
222
      if (existing != null) {
1✔
223
        WorkflowExecutionStatus status = existing.getWorkflowExecutionStatus();
1✔
224
        WorkflowIdReusePolicy policy = startRequest.getWorkflowIdReusePolicy();
1✔
225
        if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
226
            || policy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE) {
227
          return throwDuplicatedWorkflow(startRequest, existing);
×
228
        }
229
        if (policy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
1✔
230
            && (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED
231
                || status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW)) {
232
          return throwDuplicatedWorkflow(startRequest, existing);
×
233
        }
234
      }
235
      Optional<TestServiceRetryState> retryState;
236
      Optional<Failure> lastFailure = Optional.empty();
1✔
237
      if (startRequest.hasRetryPolicy()) {
1✔
238
        Duration expirationInterval =
1✔
239
            ProtobufTimeUtils.toJavaDuration(startRequest.getWorkflowExecutionTimeout());
1✔
240
        retryState = newRetryStateLocked(startRequest.getRetryPolicy(), expirationInterval);
1✔
241
        if (retryState.isPresent()) {
1✔
242
          lastFailure = retryState.get().getPreviousRunFailure();
1✔
243
        }
244
      } else {
1✔
245
        retryState = Optional.empty();
1✔
246
      }
247
      String runId = UUID.randomUUID().toString();
1✔
248
      return startWorkflowExecutionNoRunningCheckLocked(
1✔
249
          startRequest,
250
          runId,
251
          // it's the first execution in the continue-as-new chain, so firstExecutionRunId = runId
252
          runId,
253
          Optional.empty(),
1✔
254
          retryState,
255
          backoffStartInterval,
256
          null,
257
          lastFailure,
258
          parent,
259
          parentChildInitiatedEventId,
260
          signalWithStartSignal,
261
          workflowId);
262
    } finally {
263
      lock.unlock();
1✔
264
    }
265
  }
266

267
  private Optional<TestServiceRetryState> newRetryStateLocked(
268
      RetryPolicy retryPolicy, Duration expirationInterval) {
269
    Timestamp expirationTime =
270
        expirationInterval.isZero()
1✔
271
            ? Timestamps.fromNanos(0)
1✔
272
            : Timestamps.add(
1✔
273
                store.currentTime(), ProtobufTimeUtils.toProtoDuration(expirationInterval));
1✔
274
    return Optional.of(new TestServiceRetryState(retryPolicy, expirationTime));
1✔
275
  }
276

277
  private StartWorkflowExecutionResponse throwDuplicatedWorkflow(
278
      StartWorkflowExecutionRequest startRequest, TestWorkflowMutableState existing) {
279
    WorkflowExecution execution = existing.getExecutionId().getExecution();
1✔
280
    WorkflowExecutionAlreadyStartedFailure error =
281
        WorkflowExecutionAlreadyStartedFailure.newBuilder()
1✔
282
            .setRunId(execution.getRunId())
1✔
283
            .setStartRequestId(startRequest.getRequestId())
1✔
284
            .build();
1✔
285
    throw StatusUtils.newException(
1✔
286
        Status.ALREADY_EXISTS.withDescription(
1✔
287
            String.format(
1✔
288
                "WorkflowId: %s, " + "RunId: %s", execution.getWorkflowId(), execution.getRunId())),
1✔
289
        error,
290
        WorkflowExecutionAlreadyStartedFailure.getDescriptor());
1✔
291
  }
292

293
  private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheckLocked(
294
      StartWorkflowExecutionRequest startRequest,
295
      @Nonnull String runId,
296
      @Nonnull String firstExecutionRunId,
297
      Optional<String> continuedExecutionRunId,
298
      Optional<TestServiceRetryState> retryState,
299
      Duration backoffStartInterval,
300
      Payloads lastCompletionResult,
301
      Optional<Failure> lastFailure,
302
      Optional<TestWorkflowMutableState> parent,
303
      OptionalLong parentChildInitiatedEventId,
304
      @Nullable SignalWorkflowExecutionRequest signalWithStartSignal,
305
      WorkflowId workflowId) {
306
    String namespace = startRequest.getNamespace();
1✔
307
    TestWorkflowMutableState mutableState =
1✔
308
        new TestWorkflowMutableStateImpl(
309
            startRequest,
310
            firstExecutionRunId,
311
            runId,
312
            retryState,
313
            backoffStartInterval,
314
            lastCompletionResult,
315
            lastFailure,
316
            parent,
317
            parentChildInitiatedEventId,
318
            continuedExecutionRunId,
319
            this,
320
            store,
321
            visibilityStore,
322
            selfAdvancingTimer);
323
    WorkflowExecution execution = mutableState.getExecutionId().getExecution();
1✔
324
    ExecutionId executionId = new ExecutionId(namespace, execution);
1✔
325
    executionsByWorkflowId.put(workflowId, mutableState);
1✔
326
    executions.put(executionId, mutableState);
1✔
327

328
    PollWorkflowTaskQueueRequest eagerWorkflowTaskPollRequest =
329
        startRequest.getRequestEagerExecution()
1✔
330
            ? PollWorkflowTaskQueueRequest.newBuilder()
1✔
331
                .setIdentity(startRequest.getIdentity())
1✔
332
                .setNamespace(startRequest.getNamespace())
1✔
333
                .setTaskQueue(startRequest.getTaskQueue())
1✔
334
                .build()
1✔
335
            : null;
1✔
336

337
    @Nullable
338
    PollWorkflowTaskQueueResponse eagerWorkflowTask =
1✔
339
        mutableState.startWorkflow(
1✔
340
            continuedExecutionRunId.isPresent(),
1✔
341
            signalWithStartSignal,
342
            eagerWorkflowTaskPollRequest);
343
    StartWorkflowExecutionResponse.Builder response =
344
        StartWorkflowExecutionResponse.newBuilder().setRunId(execution.getRunId());
1✔
345
    if (eagerWorkflowTask != null) {
1✔
346
      response.setEagerWorkflowTask(eagerWorkflowTask);
1✔
347
    }
348
    return response.build();
1✔
349
  }
350

351
  @Override
352
  public void getWorkflowExecutionHistory(
353
      GetWorkflowExecutionHistoryRequest getRequest,
354
      StreamObserver<GetWorkflowExecutionHistoryResponse> responseObserver) {
355
    ExecutionId executionId = new ExecutionId(getRequest.getNamespace(), getRequest.getExecution());
1✔
356
    executor.execute(
1✔
357
        // preserving gRPC context deadline between threads
358
        Context.current()
1✔
359
            .wrap(
1✔
360
                () -> {
361
                  try {
362
                    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
363
                    responseObserver.onNext(
1✔
364
                        store.getWorkflowExecutionHistory(
1✔
365
                            mutableState.getExecutionId(),
1✔
366
                            getRequest,
367
                            // We explicitly don't try to respond inside the context deadline.
368
                            // If we try to fit into the context deadline, the deadline may be not
369
                            // expired on the client side and an empty response will lead to a new
370
                            // request, making the client hammer the server at the tail end of the
371
                            // deadline.
372
                            // So this call is designed to wait fully till the end of the
373
                            // context deadline and throw DEADLINE_EXCEEDED if the deadline is less
374
                            // than 20s.
375
                            // If it's longer than 20 seconds - we return an empty result.
376
                            Deadline.after(20, TimeUnit.SECONDS)));
1✔
377
                    responseObserver.onCompleted();
1✔
378
                  } catch (StatusRuntimeException e) {
1✔
379
                    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1✔
380
                      log.error("unexpected", e);
×
381
                    }
382
                    responseObserver.onError(e);
1✔
383
                  } catch (Exception e) {
×
384
                    log.error("unexpected", e);
×
385
                    responseObserver.onError(e);
×
386
                  }
1✔
387
                }));
1✔
388
  }
1✔
389

390
  private <T> T pollTaskQueue(Context ctx, Future<T> futureValue)
391
      throws ExecutionException, InterruptedException {
392
    final Context.CancellationListener canceler = context -> futureValue.cancel(true);
1✔
393
    ctx.addListener(canceler, this.backgroundScheduler);
1✔
394
    try {
395
      return futureValue.get();
1✔
396
    } finally {
397
      ctx.removeListener(canceler);
1✔
398
    }
399
  }
400

401
  @Override
402
  public void pollWorkflowTaskQueue(
403
      PollWorkflowTaskQueueRequest pollRequest,
404
      StreamObserver<PollWorkflowTaskQueueResponse> responseObserver) {
405
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
406
      PollWorkflowTaskQueueResponse.Builder task;
407
      try {
408
        task = pollTaskQueue(ctx, store.pollWorkflowTaskQueue(pollRequest));
1✔
409
      } catch (ExecutionException e) {
×
410
        responseObserver.onError(e);
×
411
        return;
×
412
      } catch (InterruptedException e) {
×
413
        Thread.currentThread().interrupt();
×
414
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
×
415
        responseObserver.onCompleted();
×
416
        return;
×
417
      } catch (CancellationException e) {
1✔
418
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
1✔
419
        responseObserver.onCompleted();
1✔
420
        return;
1✔
421
      }
1✔
422

423
      ExecutionId executionId =
1✔
424
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
1✔
425
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
426
      try {
427
        mutableState.startWorkflowTask(task, pollRequest);
1✔
428
        // The task always has the original task queue that was created as part of the response.
429
        // This may be a different task queue than the task queue it was scheduled on, as in the
430
        // case of sticky execution.
431
        task.setWorkflowExecutionTaskQueue(mutableState.getStartRequest().getTaskQueue());
1✔
432
        PollWorkflowTaskQueueResponse response = task.build();
1✔
433
        responseObserver.onNext(response);
1✔
434
        responseObserver.onCompleted();
1✔
435
      } catch (StatusRuntimeException e) {
×
436
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
×
437
          if (log.isDebugEnabled()) {
×
438
            log.debug("Skipping outdated workflow task for " + executionId, e);
×
439
          }
440
          // The real service doesn't return this call on outdated task.
441
          // For simplicity, we return an empty result here.
442
          responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
×
443
          responseObserver.onCompleted();
×
444
        } else {
445
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
×
446
            log.error("unexpected", e);
×
447
          }
448
          responseObserver.onError(e);
×
449
        }
450
      }
1✔
451
    }
1✔
452
  }
1✔
453

454
  @Override
455
  public void respondWorkflowTaskCompleted(
456
      RespondWorkflowTaskCompletedRequest request,
457
      StreamObserver<RespondWorkflowTaskCompletedResponse> responseObserver) {
458
    try {
459
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(request.getTaskToken());
1✔
460
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
461
      mutableState.completeWorkflowTask(taskToken.getHistorySize(), request);
1✔
462
      responseObserver.onNext(RespondWorkflowTaskCompletedResponse.getDefaultInstance());
1✔
463
      responseObserver.onCompleted();
1✔
464
    } catch (StatusRuntimeException e) {
1✔
465
      handleStatusRuntimeException(e, responseObserver);
1✔
466
    } catch (Throwable e) {
×
467
      responseObserver.onError(
×
468
          Status.INTERNAL
469
              .withDescription(Throwables.getStackTraceAsString(e))
×
470
              .withCause(e)
×
471
              .asRuntimeException());
×
472
    }
1✔
473
  }
1✔
474

475
  @Override
476
  public void respondWorkflowTaskFailed(
477
      RespondWorkflowTaskFailedRequest failedRequest,
478
      StreamObserver<RespondWorkflowTaskFailedResponse> responseObserver) {
479
    try {
480
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(failedRequest.getTaskToken());
1✔
481
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
482
      mutableState.failWorkflowTask(failedRequest);
1✔
483
      responseObserver.onNext(RespondWorkflowTaskFailedResponse.getDefaultInstance());
1✔
484
      responseObserver.onCompleted();
1✔
485
    } catch (StatusRuntimeException e) {
×
486
      handleStatusRuntimeException(e, responseObserver);
×
487
    }
1✔
488
  }
1✔
489

490
  private Context.CancellableContext deadlineCtx(Deadline deadline) {
491
    return Context.current().withDeadline(deadline, this.backgroundScheduler);
1✔
492
  }
493

494
  @Override
495
  public void pollActivityTaskQueue(
496
      PollActivityTaskQueueRequest pollRequest,
497
      StreamObserver<PollActivityTaskQueueResponse> responseObserver) {
498
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
499

500
      PollActivityTaskQueueResponse.Builder task;
501
      try {
502
        task = pollTaskQueue(ctx, store.pollActivityTaskQueue(pollRequest));
1✔
503
      } catch (ExecutionException e) {
×
504
        responseObserver.onError(e);
×
505
        return;
×
506
      } catch (InterruptedException e) {
×
507
        Thread.currentThread().interrupt();
×
508
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
×
509
        responseObserver.onCompleted();
×
510
        return;
×
511
      } catch (CancellationException e) {
1✔
512
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
1✔
513
        responseObserver.onCompleted();
1✔
514
        return;
1✔
515
      }
1✔
516

517
      ExecutionId executionId =
1✔
518
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
1✔
519
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
520
      try {
521
        mutableState.startActivityTask(task, pollRequest);
1✔
522
        responseObserver.onNext(task.build());
1✔
523
        responseObserver.onCompleted();
1✔
524
      } catch (StatusRuntimeException e) {
1✔
525
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
526
          if (log.isDebugEnabled()) {
1✔
527
            log.debug("Skipping outdated activity task for " + executionId, e);
×
528
          }
529
          responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
1✔
530
          responseObserver.onCompleted();
1✔
531
        } else {
532
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
×
533
            log.error("unexpected", e);
×
534
          }
535
          responseObserver.onError(e);
×
536
        }
537
      }
1✔
538
    }
1✔
539
  }
1✔
540

541
  @Override
542
  public void recordActivityTaskHeartbeat(
543
      RecordActivityTaskHeartbeatRequest heartbeatRequest,
544
      StreamObserver<RecordActivityTaskHeartbeatResponse> responseObserver) {
545
    try {
546
      ActivityTaskToken activityTaskToken =
1✔
547
          ActivityTaskToken.fromBytes(heartbeatRequest.getTaskToken());
1✔
548
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
549
      boolean cancelRequested =
1✔
550
          mutableState.heartbeatActivityTask(
1✔
551
              activityTaskToken.getScheduledEventId(), heartbeatRequest.getDetails());
1✔
552
      responseObserver.onNext(
1✔
553
          RecordActivityTaskHeartbeatResponse.newBuilder()
1✔
554
              .setCancelRequested(cancelRequested)
1✔
555
              .build());
1✔
556
      responseObserver.onCompleted();
1✔
557
    } catch (StatusRuntimeException e) {
1✔
558
      handleStatusRuntimeException(e, responseObserver);
1✔
559
    }
1✔
560
  }
1✔
561

562
  @Override
563
  public void recordActivityTaskHeartbeatById(
564
      RecordActivityTaskHeartbeatByIdRequest heartbeatRequest,
565
      StreamObserver<RecordActivityTaskHeartbeatByIdResponse> responseObserver) {
566
    try {
567
      ExecutionId execution =
×
568
          new ExecutionId(
569
              heartbeatRequest.getNamespace(),
×
570
              heartbeatRequest.getWorkflowId(),
×
571
              heartbeatRequest.getRunId());
×
572
      TestWorkflowMutableState mutableState = getMutableState(execution);
×
573
      boolean cancelRequested =
×
574
          mutableState.heartbeatActivityTaskById(
×
575
              heartbeatRequest.getActivityId(),
×
576
              heartbeatRequest.getDetails(),
×
577
              heartbeatRequest.getIdentity());
×
578
      responseObserver.onNext(
×
579
          RecordActivityTaskHeartbeatByIdResponse.newBuilder()
×
580
              .setCancelRequested(cancelRequested)
×
581
              .build());
×
582
      responseObserver.onCompleted();
×
583
    } catch (StatusRuntimeException e) {
×
584
      handleStatusRuntimeException(e, responseObserver);
×
585
    }
×
586
  }
×
587

588
  @Override
589
  public void respondActivityTaskCompleted(
590
      RespondActivityTaskCompletedRequest completeRequest,
591
      StreamObserver<RespondActivityTaskCompletedResponse> responseObserver) {
592
    try {
593
      ActivityTaskToken activityTaskToken =
1✔
594
          ActivityTaskToken.fromBytes(completeRequest.getTaskToken());
1✔
595
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
596
      mutableState.completeActivityTask(activityTaskToken.getScheduledEventId(), completeRequest);
1✔
597
      responseObserver.onNext(RespondActivityTaskCompletedResponse.getDefaultInstance());
1✔
598
      responseObserver.onCompleted();
1✔
599
    } catch (StatusRuntimeException e) {
1✔
600
      handleStatusRuntimeException(e, responseObserver);
1✔
601
    }
1✔
602
  }
1✔
603

604
  @Override
605
  public void respondActivityTaskCompletedById(
606
      RespondActivityTaskCompletedByIdRequest completeRequest,
607
      StreamObserver<RespondActivityTaskCompletedByIdResponse> responseObserver) {
608
    try {
609
      ExecutionId executionId =
×
610
          new ExecutionId(
611
              completeRequest.getNamespace(),
×
612
              completeRequest.getWorkflowId(),
×
613
              completeRequest.getRunId());
×
614
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
615
      mutableState.completeActivityTaskById(completeRequest.getActivityId(), completeRequest);
×
616
      responseObserver.onNext(RespondActivityTaskCompletedByIdResponse.getDefaultInstance());
×
617
      responseObserver.onCompleted();
×
618
    } catch (StatusRuntimeException e) {
×
619
      handleStatusRuntimeException(e, responseObserver);
×
620
    }
×
621
  }
×
622

623
  @Override
624
  public void respondActivityTaskFailed(
625
      RespondActivityTaskFailedRequest failRequest,
626
      StreamObserver<RespondActivityTaskFailedResponse> responseObserver) {
627
    try {
628
      ActivityTaskToken activityTaskToken = ActivityTaskToken.fromBytes(failRequest.getTaskToken());
1✔
629
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
630
      mutableState.failActivityTask(activityTaskToken.getScheduledEventId(), failRequest);
1✔
631
      responseObserver.onNext(RespondActivityTaskFailedResponse.getDefaultInstance());
1✔
632
      responseObserver.onCompleted();
1✔
633
    } catch (StatusRuntimeException e) {
1✔
634
      handleStatusRuntimeException(e, responseObserver);
1✔
635
    }
1✔
636
  }
1✔
637

638
  @Override
639
  public void respondActivityTaskFailedById(
640
      RespondActivityTaskFailedByIdRequest failRequest,
641
      StreamObserver<RespondActivityTaskFailedByIdResponse> responseObserver) {
642
    try {
643
      ExecutionId executionId =
×
644
          new ExecutionId(
645
              failRequest.getNamespace(), failRequest.getWorkflowId(), failRequest.getRunId());
×
646
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
647
      mutableState.failActivityTaskById(failRequest.getActivityId(), failRequest);
×
648
      responseObserver.onNext(RespondActivityTaskFailedByIdResponse.getDefaultInstance());
×
649
      responseObserver.onCompleted();
×
650
    } catch (StatusRuntimeException e) {
×
651
      handleStatusRuntimeException(e, responseObserver);
×
652
    }
×
653
  }
×
654

655
  @Override
656
  public void respondActivityTaskCanceled(
657
      RespondActivityTaskCanceledRequest canceledRequest,
658
      StreamObserver<RespondActivityTaskCanceledResponse> responseObserver) {
659
    try {
660
      ActivityTaskToken activityTaskToken =
×
661
          ActivityTaskToken.fromBytes(canceledRequest.getTaskToken());
×
662
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
×
663
      mutableState.cancelActivityTask(activityTaskToken.getScheduledEventId(), canceledRequest);
×
664
      responseObserver.onNext(RespondActivityTaskCanceledResponse.getDefaultInstance());
×
665
      responseObserver.onCompleted();
×
666
    } catch (StatusRuntimeException e) {
×
667
      handleStatusRuntimeException(e, responseObserver);
×
668
    }
×
669
  }
×
670

671
  @Override
672
  public void respondActivityTaskCanceledById(
673
      RespondActivityTaskCanceledByIdRequest canceledRequest,
674
      StreamObserver<RespondActivityTaskCanceledByIdResponse> responseObserver) {
675
    try {
676
      ExecutionId executionId =
×
677
          new ExecutionId(
678
              canceledRequest.getNamespace(),
×
679
              canceledRequest.getWorkflowId(),
×
680
              canceledRequest.getRunId());
×
681
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
682
      mutableState.cancelActivityTaskById(canceledRequest.getActivityId(), canceledRequest);
×
683
      responseObserver.onNext(RespondActivityTaskCanceledByIdResponse.getDefaultInstance());
×
684
      responseObserver.onCompleted();
×
685
    } catch (StatusRuntimeException e) {
×
686
      handleStatusRuntimeException(e, responseObserver);
×
687
    }
×
688
  }
×
689

690
  @Override
691
  public void requestCancelWorkflowExecution(
692
      RequestCancelWorkflowExecutionRequest cancelRequest,
693
      StreamObserver<RequestCancelWorkflowExecutionResponse> responseObserver) {
694
    try {
695
      requestCancelWorkflowExecution(cancelRequest, Optional.empty());
1✔
696
      responseObserver.onNext(RequestCancelWorkflowExecutionResponse.getDefaultInstance());
1✔
697
      responseObserver.onCompleted();
1✔
698
    } catch (StatusRuntimeException e) {
1✔
699
      handleStatusRuntimeException(e, responseObserver);
1✔
700
    }
1✔
701
  }
1✔
702

703
  void requestCancelWorkflowExecution(
704
      RequestCancelWorkflowExecutionRequest cancelRequest,
705
      Optional<TestWorkflowMutableStateImpl.CancelExternalWorkflowExecutionCallerInfo> callerInfo) {
706
    ExecutionId executionId =
1✔
707
        new ExecutionId(cancelRequest.getNamespace(), cancelRequest.getWorkflowExecution());
1✔
708
    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
709
    mutableState.requestCancelWorkflowExecution(cancelRequest, callerInfo);
1✔
710
  }
1✔
711

712
  @Override
713
  public void terminateWorkflowExecution(
714
      TerminateWorkflowExecutionRequest request,
715
      StreamObserver<TerminateWorkflowExecutionResponse> responseObserver) {
716
    try {
717
      terminateWorkflowExecution(request);
1✔
718
      responseObserver.onNext(TerminateWorkflowExecutionResponse.getDefaultInstance());
1✔
719
      responseObserver.onCompleted();
1✔
720
    } catch (StatusRuntimeException e) {
1✔
721
      handleStatusRuntimeException(e, responseObserver);
1✔
722
    }
1✔
723
  }
1✔
724

725
  private void terminateWorkflowExecution(TerminateWorkflowExecutionRequest request) {
726
    ExecutionId executionId =
1✔
727
        new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
1✔
728
    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
729
    mutableState.terminateWorkflowExecution(request);
1✔
730
  }
1✔
731

732
  @Override
733
  public void signalWorkflowExecution(
734
      SignalWorkflowExecutionRequest signalRequest,
735
      StreamObserver<SignalWorkflowExecutionResponse> responseObserver) {
736
    try {
737
      ExecutionId executionId =
1✔
738
          new ExecutionId(signalRequest.getNamespace(), signalRequest.getWorkflowExecution());
1✔
739
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
740
      mutableState.signal(signalRequest);
1✔
741
      responseObserver.onNext(SignalWorkflowExecutionResponse.getDefaultInstance());
1✔
742
      responseObserver.onCompleted();
1✔
743
    } catch (StatusRuntimeException e) {
1✔
744
      handleStatusRuntimeException(e, responseObserver);
1✔
745
    }
1✔
746
  }
1✔
747

748
  @Override
749
  public void signalWithStartWorkflowExecution(
750
      SignalWithStartWorkflowExecutionRequest r,
751
      StreamObserver<SignalWithStartWorkflowExecutionResponse> responseObserver) {
752
    try {
753
      if (!r.hasTaskQueue()) {
1✔
754
        throw Status.INVALID_ARGUMENT
×
755
            .withDescription("request missing required taskQueue field")
×
756
            .asRuntimeException();
×
757
      }
758
      if (!r.hasWorkflowType()) {
1✔
759
        throw Status.INVALID_ARGUMENT
×
760
            .withDescription("request missing required workflowType field")
×
761
            .asRuntimeException();
×
762
      }
763
      ExecutionId executionId = new ExecutionId(r.getNamespace(), r.getWorkflowId(), null);
1✔
764
      TestWorkflowMutableState mutableState = getMutableState(executionId, false);
1✔
765
      SignalWorkflowExecutionRequest signalRequest =
766
          SignalWorkflowExecutionRequest.newBuilder()
1✔
767
              .setInput(r.getSignalInput())
1✔
768
              .setSignalName(r.getSignalName())
1✔
769
              .setWorkflowExecution(executionId.getExecution())
1✔
770
              .setRequestId(r.getRequestId())
1✔
771
              .setControl(r.getControl())
1✔
772
              .setNamespace(r.getNamespace())
1✔
773
              .setIdentity(r.getIdentity())
1✔
774
              .build();
1✔
775
      if (mutableState != null && !mutableState.isTerminalState()) {
1✔
776
        mutableState.signal(signalRequest);
1✔
777
        responseObserver.onNext(
1✔
778
            SignalWithStartWorkflowExecutionResponse.newBuilder()
1✔
779
                .setRunId(mutableState.getExecutionId().getExecution().getRunId())
1✔
780
                .build());
1✔
781
        responseObserver.onCompleted();
1✔
782
        return;
1✔
783
      }
784
      StartWorkflowExecutionRequest.Builder startRequest =
785
          StartWorkflowExecutionRequest.newBuilder()
1✔
786
              .setRequestId(r.getRequestId())
1✔
787
              .setInput(r.getInput())
1✔
788
              .setWorkflowExecutionTimeout(r.getWorkflowExecutionTimeout())
1✔
789
              .setWorkflowRunTimeout(r.getWorkflowRunTimeout())
1✔
790
              .setWorkflowTaskTimeout(r.getWorkflowTaskTimeout())
1✔
791
              .setNamespace(r.getNamespace())
1✔
792
              .setTaskQueue(r.getTaskQueue())
1✔
793
              .setWorkflowId(r.getWorkflowId())
1✔
794
              .setWorkflowIdReusePolicy(r.getWorkflowIdReusePolicy())
1✔
795
              .setIdentity(r.getIdentity())
1✔
796
              .setWorkflowType(r.getWorkflowType())
1✔
797
              .setCronSchedule(r.getCronSchedule())
1✔
798
              .setRequestId(r.getRequestId());
1✔
799
      if (r.hasRetryPolicy()) {
1✔
800
        startRequest.setRetryPolicy(r.getRetryPolicy());
×
801
      }
802
      if (r.hasHeader()) {
1✔
803
        startRequest.setHeader(r.getHeader());
1✔
804
      }
805
      if (r.hasMemo()) {
1✔
806
        startRequest.setMemo(r.getMemo());
×
807
      }
808
      if (r.hasSearchAttributes()) {
1✔
809
        startRequest.setSearchAttributes(r.getSearchAttributes());
1✔
810
      }
811
      StartWorkflowExecutionResponse startResult =
1✔
812
          startWorkflowExecutionImpl(
1✔
813
              startRequest.build(),
1✔
814
              Duration.ZERO,
815
              Optional.empty(),
1✔
816
              OptionalLong.empty(),
1✔
817
              signalRequest);
818
      responseObserver.onNext(
1✔
819
          SignalWithStartWorkflowExecutionResponse.newBuilder()
1✔
820
              .setRunId(startResult.getRunId())
1✔
821
              .build());
1✔
822
      responseObserver.onCompleted();
1✔
823
    } catch (StatusRuntimeException e) {
1✔
824
      handleStatusRuntimeException(e, responseObserver);
1✔
825
    }
1✔
826
  }
1✔
827

828
  public void signalExternalWorkflowExecution(
829
      String signalId,
830
      SignalExternalWorkflowExecutionCommandAttributes commandAttributes,
831
      TestWorkflowMutableState source) {
832
    String namespace;
833
    if (commandAttributes.getNamespace().isEmpty()) {
1✔
834
      namespace = source.getExecutionId().getNamespace();
1✔
835
    } else {
836
      namespace = commandAttributes.getNamespace();
×
837
    }
838
    ExecutionId executionId = new ExecutionId(namespace, commandAttributes.getExecution());
1✔
839
    TestWorkflowMutableState mutableState;
840
    try {
841
      mutableState = getMutableState(executionId);
1✔
842
      mutableState.signalFromWorkflow(commandAttributes);
1✔
843
      source.completeSignalExternalWorkflowExecution(
1✔
844
          signalId, mutableState.getExecutionId().getExecution().getRunId());
1✔
845
    } catch (StatusRuntimeException e) {
1✔
846
      if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
847
        source.failSignalExternalWorkflowExecution(
1✔
848
            signalId,
849
            SignalExternalWorkflowExecutionFailedCause
850
                .SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED_CAUSE_EXTERNAL_WORKFLOW_EXECUTION_NOT_FOUND);
851
      } else {
852
        throw e;
×
853
      }
854
    }
1✔
855
  }
1✔
856

857
  /**
858
   * Creates next run of a workflow execution
859
   *
860
   * @return RunId
861
   */
862
  public String continueAsNew(
863
      StartWorkflowExecutionRequest previousRunStartRequest,
864
      WorkflowExecutionContinuedAsNewEventAttributes a,
865
      Optional<TestServiceRetryState> retryState,
866
      String identity,
867
      ExecutionId continuedExecutionId,
868
      String firstExecutionRunId,
869
      Optional<TestWorkflowMutableState> parent,
870
      OptionalLong parentChildInitiatedEventId) {
871
    StartWorkflowExecutionRequest.Builder startRequestBuilder =
872
        StartWorkflowExecutionRequest.newBuilder()
1✔
873
            .setRequestId(UUID.randomUUID().toString())
1✔
874
            .setWorkflowType(a.getWorkflowType())
1✔
875
            .setWorkflowRunTimeout(a.getWorkflowRunTimeout())
1✔
876
            .setWorkflowTaskTimeout(a.getWorkflowTaskTimeout())
1✔
877
            .setNamespace(continuedExecutionId.getNamespace())
1✔
878
            .setTaskQueue(a.getTaskQueue())
1✔
879
            .setWorkflowId(continuedExecutionId.getWorkflowId().getWorkflowId())
1✔
880
            .setWorkflowIdReusePolicy(previousRunStartRequest.getWorkflowIdReusePolicy())
1✔
881
            .setIdentity(identity)
1✔
882
            .setCronSchedule(previousRunStartRequest.getCronSchedule());
1✔
883
    if (previousRunStartRequest.hasRetryPolicy()) {
1✔
884
      startRequestBuilder.setRetryPolicy(previousRunStartRequest.getRetryPolicy());
1✔
885
    }
886
    if (a.hasInput()) {
1✔
887
      startRequestBuilder.setInput(a.getInput());
1✔
888
    }
889
    if (a.hasHeader()) {
1✔
890
      startRequestBuilder.setHeader(a.getHeader());
1✔
891
    }
892
    StartWorkflowExecutionRequest startRequest = startRequestBuilder.build();
1✔
893
    lock.lock();
1✔
894
    Optional<Failure> lastFail =
895
        a.hasFailure()
1✔
896
            ? Optional.of(a.getFailure())
1✔
897
            : retryState.flatMap(TestServiceRetryState::getPreviousRunFailure);
1✔
898
    try {
899
      StartWorkflowExecutionResponse response =
1✔
900
          startWorkflowExecutionNoRunningCheckLocked(
1✔
901
              startRequest,
902
              a.getNewExecutionRunId(),
1✔
903
              firstExecutionRunId,
904
              Optional.of(continuedExecutionId.getExecution().getRunId()),
1✔
905
              retryState,
906
              ProtobufTimeUtils.toJavaDuration(a.getBackoffStartInterval()),
1✔
907
              a.getLastCompletionResult(),
1✔
908
              lastFail,
909
              parent,
910
              parentChildInitiatedEventId,
911
              null,
912
              continuedExecutionId.getWorkflowId());
1✔
913
      return response.getRunId();
1✔
914
    } finally {
915
      lock.unlock();
1✔
916
    }
917
  }
918

919
  @Override
920
  public void listOpenWorkflowExecutions(
921
      ListOpenWorkflowExecutionsRequest listRequest,
922
      StreamObserver<ListOpenWorkflowExecutionsResponse> responseObserver) {
923
    try {
924
      Optional<String> workflowIdFilter;
925
      if (listRequest.hasExecutionFilter()
1✔
926
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
×
927
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
×
928
      } else {
929
        workflowIdFilter = Optional.empty();
1✔
930
      }
931
      List<WorkflowExecutionInfo> result =
1✔
932
          store.listWorkflows(WorkflowState.OPEN, workflowIdFilter);
1✔
933
      responseObserver.onNext(
1✔
934
          ListOpenWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1✔
935
      responseObserver.onCompleted();
1✔
936
    } catch (StatusRuntimeException e) {
×
937
      handleStatusRuntimeException(e, responseObserver);
×
938
    }
1✔
939
  }
1✔
940

941
  @Override
942
  public void listClosedWorkflowExecutions(
943
      ListClosedWorkflowExecutionsRequest listRequest,
944
      StreamObserver<ListClosedWorkflowExecutionsResponse> responseObserver) {
945
    try {
946
      Optional<String> workflowIdFilter;
947
      if (listRequest.hasExecutionFilter()
1✔
948
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
×
949
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
×
950
      } else {
951
        workflowIdFilter = Optional.empty();
1✔
952
      }
953
      List<WorkflowExecutionInfo> result =
1✔
954
          store.listWorkflows(WorkflowState.CLOSED, workflowIdFilter);
1✔
955
      responseObserver.onNext(
1✔
956
          ListClosedWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1✔
957
      responseObserver.onCompleted();
1✔
958
    } catch (StatusRuntimeException e) {
×
959
      handleStatusRuntimeException(e, responseObserver);
×
960
    }
1✔
961
  }
1✔
962

963
  @Override
964
  public void respondQueryTaskCompleted(
965
      RespondQueryTaskCompletedRequest completeRequest,
966
      StreamObserver<RespondQueryTaskCompletedResponse> responseObserver) {
967
    try {
968
      QueryId queryId = QueryId.fromBytes(completeRequest.getTaskToken());
1✔
969
      TestWorkflowMutableState mutableState = getMutableState(queryId.getExecutionId());
1✔
970
      mutableState.completeQuery(queryId, completeRequest);
1✔
971
      responseObserver.onNext(RespondQueryTaskCompletedResponse.getDefaultInstance());
1✔
972
      responseObserver.onCompleted();
1✔
973
    } catch (StatusRuntimeException e) {
×
974
      handleStatusRuntimeException(e, responseObserver);
×
975
    }
1✔
976
  }
1✔
977

978
  @Override
979
  public void queryWorkflow(
980
      QueryWorkflowRequest queryRequest, StreamObserver<QueryWorkflowResponse> responseObserver) {
981
    try {
982
      ExecutionId executionId =
1✔
983
          new ExecutionId(queryRequest.getNamespace(), queryRequest.getExecution());
1✔
984
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
985
      @Nullable Deadline deadline = Context.current().getDeadline();
1✔
986
      QueryWorkflowResponse result =
1✔
987
          mutableState.query(
1✔
988
              queryRequest,
989
              deadline != null ? deadline.timeRemaining(TimeUnit.MILLISECONDS) : Long.MAX_VALUE);
1✔
990
      responseObserver.onNext(result);
1✔
991
      responseObserver.onCompleted();
1✔
992
    } catch (StatusRuntimeException e) {
1✔
993
      handleStatusRuntimeException(e, responseObserver);
1✔
994
    }
1✔
995
  }
1✔
996

997
  @Override
998
  public void describeWorkflowExecution(
999
      DescribeWorkflowExecutionRequest request,
1000
      StreamObserver<io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse>
1001
          responseObserver) {
1002
    try {
1003
      if (request.getNamespace().isEmpty()) {
1✔
1004
        throw createInvalidArgument("Namespace not set on request.");
×
1005
      }
1006
      if (!request.hasExecution()) {
1✔
1007
        throw createInvalidArgument("Execution not set on request.");
×
1008
      }
1009

1010
      ExecutionId executionId = new ExecutionId(request.getNamespace(), request.getExecution());
1✔
1011
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1012
      DescribeWorkflowExecutionResponse result = mutableState.describeWorkflowExecution();
1✔
1013
      responseObserver.onNext(result);
1✔
1014
      responseObserver.onCompleted();
1✔
1015
    } catch (StatusRuntimeException e) {
1✔
1016
      handleStatusRuntimeException(e, responseObserver);
1✔
1017
    }
1✔
1018
  }
1✔
1019

1020
  /**
1021
   * This method doesn't make much sense for test server, it accepts all namespaces as existent and
1022
   * registered. so, it's a trivial implementation just returning an info that a namespace is
1023
   * registered irrespectively of the input
1024
   */
1025
  @Override
1026
  public void describeNamespace(
1027
      DescribeNamespaceRequest request,
1028
      StreamObserver<DescribeNamespaceResponse> responseObserver) {
1029
    try {
1030
      if (request.getNamespace().isEmpty()) {
1✔
1031
        throw createInvalidArgument("Namespace not set on request.");
×
1032
      }
1033
      // generating a stable UUID for name
1034
      String namespaceId = UUID.nameUUIDFromBytes(request.getNamespace().getBytes()).toString();
1✔
1035
      DescribeNamespaceResponse result =
1036
          DescribeNamespaceResponse.newBuilder()
1✔
1037
              .setNamespaceInfo(
1✔
1038
                  NamespaceInfo.newBuilder()
1✔
1039
                      .setName(request.getNamespace())
1✔
1040
                      .setState(NamespaceState.NAMESPACE_STATE_REGISTERED)
1✔
1041
                      .setId(namespaceId)
1✔
1042
                      .build())
1✔
1043
              .build();
1✔
1044
      responseObserver.onNext(result);
1✔
1045
      responseObserver.onCompleted();
1✔
1046
    } catch (StatusRuntimeException e) {
1✔
1047
      handleStatusRuntimeException(e, responseObserver);
1✔
1048
    }
1✔
1049
  }
1✔
1050

1051
  private <R> R requireNotNull(String fieldName, R value) {
1052
    if (value == null) {
1✔
1053
      throw Status.INVALID_ARGUMENT
×
1054
          .withDescription("Missing required field \"" + fieldName + "\".")
×
1055
          .asRuntimeException();
×
1056
    }
1057
    return value;
1✔
1058
  }
1059

1060
  /**
1061
   * Adds diagnostic data about internal service state to the provided {@link StringBuilder}.
1062
   * Includes histories of all workflow instances stored in the service.
1063
   */
1064
  public void getDiagnostics(StringBuilder result) {
1065
    store.getDiagnostics(result);
×
1066
  }
×
1067

1068
  /**
1069
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1070
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#getCurrentTime(Empty)}
1071
   */
1072
  @Deprecated
1073
  public long currentTimeMillis() {
1074
    return selfAdvancingTimer.getClock().getAsLong();
×
1075
  }
1076

1077
  /** Invokes callback after the specified delay according to internal service clock. */
1078
  public void registerDelayedCallback(Duration delay, Runnable r) {
1079
    store.registerDelayedCallback(delay, r);
1✔
1080
  }
1✔
1081

1082
  /**
1083
   * Disables time skipping. To re-enable call {@link #unlockTimeSkipping(String)}. These calls are
1084
   * counted, so calling unlock does not guarantee that time is going to be skipped immediately as
1085
   * another lock can be holding it.
1086
   *
1087
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1088
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#lockTimeSkipping(LockTimeSkippingRequest)}
1089
   */
1090
  @Deprecated
1091
  public void lockTimeSkipping(String caller) {
1092
    selfAdvancingTimer.lockTimeSkipping(caller);
×
1093
  }
×
1094

1095
  /**
1096
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1097
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkipping(UnlockTimeSkippingRequest)}
1098
   */
1099
  @Deprecated
1100
  public void unlockTimeSkipping(String caller) {
1101
    selfAdvancingTimer.unlockTimeSkipping(caller);
×
1102
  }
×
1103

1104
  /**
1105
   * Unlocks time skipping and blocks the calling thread until internal clock passes the current +
1106
   * duration time.<br>
1107
   * When the time is reached, locks time skipping and returns.<br>
1108
   * Might not block at all due to time skipping. Or might block if the time skipping lock counter
1109
   * was more than 1.
1110
   *
1111
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1112
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkippingWithSleep(SleepRequest)}
1113
   */
1114
  @Deprecated
1115
  public void sleep(Duration duration) {
1116
    CompletableFuture<Void> result = new CompletableFuture<>();
×
1117
    selfAdvancingTimer.schedule(
×
1118
        duration,
1119
        () -> {
1120
          selfAdvancingTimer.lockTimeSkipping("TestWorkflowService sleep");
×
1121
          result.complete(null);
×
1122
        },
×
1123
        "workflow sleep");
1124
    selfAdvancingTimer.unlockTimeSkipping("TestWorkflowService sleep");
×
1125
    try {
1126
      result.get();
×
1127
    } catch (InterruptedException e) {
×
1128
      Thread.currentThread().interrupt();
×
1129
      throw new RuntimeException(e);
×
1130
    } catch (ExecutionException e) {
×
1131
      throw new RuntimeException(e);
×
1132
    }
×
1133
  }
×
1134

1135
  /**
1136
   * Temporal server times out task queue long poll calls after 1 minute and returns an empty
1137
   * result. After which the request has to be retried by the client if it wants to continue
1138
   * waiting. We emulate this behavior here.
1139
   *
1140
   * <p>If there is a deadline present, for task queue poll requests server will respond inside the
1141
   * deadline. Note that the latest is not applicable for getWorkflowExecutionHistory() long polls.
1142
   *
1143
   * @return minimum between the context deadline and maximum long poll deadline.
1144
   */
1145
  private Deadline getLongPollDeadline() {
1146
    @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1147
    Deadline maximumDeadline =
1✔
1148
        Deadline.after(
1✔
1149
            WorkflowServiceStubsOptions.DEFAULT_SERVER_LONG_POLL_RPC_TIMEOUT.toMillis(),
1✔
1150
            TimeUnit.MILLISECONDS);
1151
    return deadline != null ? deadline.minimum(maximumDeadline) : maximumDeadline;
1✔
1152
  }
1153

1154
  private void handleStatusRuntimeException(
1155
      StatusRuntimeException e, StreamObserver<?> responseObserver) {
1156
    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1✔
1157
      log.error("unexpected", e);
1✔
1158
    }
1159
    responseObserver.onError(e);
1✔
1160
  }
1✔
1161

1162
  /**
1163
   * Creates an in-memory service along with client stubs for use in Java code. See also
1164
   * createServerOnly and createWithNoGrpcServer.
1165
   *
1166
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean)} instead and
1167
   *     pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1168
   */
1169
  @Deprecated
1170
  public TestWorkflowService() {
1171
    this(0, true);
×
1172
  }
×
1173

1174
  /**
1175
   * Creates an in-memory service along with client stubs for use in Java code. See also
1176
   * createServerOnly and createWithNoGrpcServer.
1177
   *
1178
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean, long)} instead
1179
   *     and pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1180
   */
1181
  @Deprecated
1182
  public TestWorkflowService(long initialTimeMillis) {
1183
    this(initialTimeMillis, true);
×
1184
  }
×
1185

1186
  /**
1187
   * Creates an in-memory service along with client stubs for use in Java code. See also
1188
   * createServerOnly and createWithNoGrpcServer.
1189
   *
1190
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean)} instead
1191
   */
1192
  @Deprecated
1193
  public TestWorkflowService(boolean lockTimeSkipping) {
1194
    this(0, true);
×
1195
    if (lockTimeSkipping) {
×
1196
      this.lockTimeSkipping("constructor");
×
1197
    }
1198
  }
×
1199

1200
  /**
1201
   * Creates an instance of TestWorkflowService that does not manage its own gRPC server. Useful for
1202
   * including in an externally managed gRPC server.
1203
   *
1204
   * @deprecated use {@link TestServicesStarter} to create just the services with gRPC server
1205
   */
1206
  @Deprecated
1207
  public static TestWorkflowService createWithNoGrpcServer() {
1208
    return new TestWorkflowService(0, false);
×
1209
  }
1210

1211
  private TestWorkflowService(long initialTimeMillis, boolean startInProcessServer) {
×
1212
    this.selfAdvancingTimer =
×
1213
        new SelfAdvancingTimerImpl(initialTimeMillis, Clock.systemDefaultZone());
×
1214
    store = new TestWorkflowStoreImpl(this.selfAdvancingTimer);
×
1215
    visibilityStore = new TestVisibilityStoreImpl();
×
1216
    outOfProcessServer = null;
×
1217
    if (startInProcessServer) {
×
1218
      this.inProcessServer = new InProcessGRPCServer(Collections.singletonList(this));
×
1219
      this.workflowServiceStubs =
×
1220
          WorkflowServiceStubs.newServiceStubs(
×
1221
              WorkflowServiceStubsOptions.newBuilder()
×
1222
                  .setChannel(inProcessServer.getChannel())
×
1223
                  .build());
×
1224
    } else {
1225
      this.inProcessServer = null;
×
1226
      this.workflowServiceStubs = null;
×
1227
    }
1228
  }
×
1229

1230
  /**
1231
   * Creates an out-of-process rather than in-process server, and does not set up a client. Useful,
1232
   * for example, if you want to use the test service from other SDKs.
1233
   *
1234
   * @param port the port to listen on
1235
   * @deprecated use {@link io.temporal.testserver.TestServer#createPortBoundServer(int, boolean)}
1236
   *     instead and pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1237
   */
1238
  @Deprecated
1239
  public static TestWorkflowService createServerOnly(int port) {
1240
    TestWorkflowService result = new TestWorkflowService(true, port);
×
1241
    log.info("Server started, listening on " + port);
×
1242
    return result;
×
1243
  }
1244

1245
  private TestWorkflowService(boolean isOutOfProc, int port) {
×
1246
    // isOutOfProc is just here to make unambiguous constructor overloading.
1247
    Preconditions.checkState(isOutOfProc, "Impossible.");
×
1248
    inProcessServer = null;
×
1249
    workflowServiceStubs = null;
×
1250
    this.selfAdvancingTimer = new SelfAdvancingTimerImpl(0, Clock.systemDefaultZone());
×
1251
    store = new TestWorkflowStoreImpl(selfAdvancingTimer);
×
1252
    visibilityStore = new TestVisibilityStoreImpl();
×
1253
    try {
1254
      ServerBuilder<?> serverBuilder =
×
1255
          Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create());
×
1256
      GRPCServerHelper.registerServicesAndHealthChecks(
×
1257
          Collections.singletonList(this), serverBuilder);
×
1258
      outOfProcessServer = serverBuilder.build().start();
×
1259
    } catch (IOException e) {
×
1260
      throw new RuntimeException(e);
×
1261
    }
×
1262
  }
×
1263

1264
  @Deprecated
1265
  public WorkflowServiceStubs newClientStub() {
1266
    if (workflowServiceStubs == null) {
×
1267
      throw new RuntimeException(
×
1268
          "Cannot get a client when you created your TestWorkflowService with createServerOnly.");
1269
    }
1270
    return workflowServiceStubs;
×
1271
  }
1272

1273
  private static StatusRuntimeException createInvalidArgument(String description) {
1274
    throw Status.INVALID_ARGUMENT.withDescription(description).asRuntimeException();
1✔
1275
  }
1276
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc