• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #201

16 Oct 2023 03:47PM UTC coverage: 77.389% (+0.02%) from 77.368%
#201

push

github-actions

web-flow
Apply data converter context in more places (#1896)

Add data converter context to memo, lastFailure and schedules

23 of 23 new or added lines in 5 files covered. (100.0%)

18718 of 24187 relevant lines covered (77.39%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.27
/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import static io.temporal.internal.testservice.CronUtils.getBackoffInterval;
24

25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Throwables;
27
import com.google.protobuf.ByteString;
28
import com.google.protobuf.Empty;
29
import com.google.protobuf.Timestamp;
30
import com.google.protobuf.util.Timestamps;
31
import io.grpc.*;
32
import io.grpc.stub.StreamObserver;
33
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
34
import io.temporal.api.common.v1.Payload;
35
import io.temporal.api.common.v1.Payloads;
36
import io.temporal.api.common.v1.RetryPolicy;
37
import io.temporal.api.common.v1.WorkflowExecution;
38
import io.temporal.api.enums.v1.NamespaceState;
39
import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause;
40
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
41
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
42
import io.temporal.api.errordetails.v1.WorkflowExecutionAlreadyStartedFailure;
43
import io.temporal.api.failure.v1.Failure;
44
import io.temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes;
45
import io.temporal.api.namespace.v1.NamespaceInfo;
46
import io.temporal.api.testservice.v1.LockTimeSkippingRequest;
47
import io.temporal.api.testservice.v1.SleepRequest;
48
import io.temporal.api.testservice.v1.UnlockTimeSkippingRequest;
49
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
50
import io.temporal.api.workflowservice.v1.*;
51
import io.temporal.internal.common.ProtobufTimeUtils;
52
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowState;
53
import io.temporal.serviceclient.StatusUtils;
54
import io.temporal.serviceclient.WorkflowServiceStubs;
55
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
56
import java.io.Closeable;
57
import java.io.IOException;
58
import java.time.Clock;
59
import java.time.Duration;
60
import java.util.*;
61
import java.util.concurrent.*;
62
import java.util.concurrent.locks.Lock;
63
import java.util.concurrent.locks.ReentrantLock;
64
import javax.annotation.Nonnull;
65
import javax.annotation.Nullable;
66
import org.slf4j.Logger;
67
import org.slf4j.LoggerFactory;
68

69
/**
70
 * In memory implementation of the Workflow Service. To be used for testing purposes only.
71
 *
72
 * <p>Do not use directly, instead use {@link io.temporal.testing.TestWorkflowEnvironment}.
73
 */
74
public final class TestWorkflowService extends WorkflowServiceGrpc.WorkflowServiceImplBase
75
    implements Closeable {
76
  private static final Logger log = LoggerFactory.getLogger(TestWorkflowService.class);
1✔
77
  private final Map<ExecutionId, TestWorkflowMutableState> executions = new HashMap<>();
1✔
78
  // key->WorkflowId
79
  private final Map<WorkflowId, TestWorkflowMutableState> executionsByWorkflowId = new HashMap<>();
1✔
80
  private final ExecutorService executor = Executors.newCachedThreadPool();
1✔
81
  private final Lock lock = new ReentrantLock();
1✔
82

83
  private final TestWorkflowStore store;
84
  private final TestVisibilityStore visibilityStore;
85
  private final SelfAdvancingTimer selfAdvancingTimer;
86

87
  private final ScheduledExecutorService backgroundScheduler =
1✔
88
      Executors.newSingleThreadScheduledExecutor();
1✔
89

90
  private final Server outOfProcessServer;
91
  private final InProcessGRPCServer inProcessServer;
92
  private final WorkflowServiceStubs workflowServiceStubs;
93

94
  /**
   * Creates a service backed by the given stores and timer.
   *
   * <p>This constructor attaches no gRPC server and no service stubs: the corresponding fields are
   * set to {@code null}.
   *
   * @param store workflow store used by this service
   * @param visibilityStore visibility store handed to each workflow's mutable state
   * @param selfAdvancingTimer timer handed to each workflow's mutable state
   */
  TestWorkflowService(
      TestWorkflowStore store,
      TestVisibilityStore visibilityStore,
      SelfAdvancingTimer selfAdvancingTimer) {
    // No hosting components for this construction path.
    this.outOfProcessServer = null;
    this.inProcessServer = null;
    this.workflowServiceStubs = null;

    this.store = store;
    this.visibilityStore = visibilityStore;
    this.selfAdvancingTimer = selfAdvancingTimer;
  }
1✔
105

106
  @Override
107
  public void close() {
108
    log.debug("Shutting down TestWorkflowService");
1✔
109

110
    log.debug("Shutting down background scheduler");
1✔
111
    backgroundScheduler.shutdown();
1✔
112

113
    if (outOfProcessServer != null) {
1✔
114
      log.info("Shutting down out-of-process GRPC server");
×
115
      outOfProcessServer.shutdown();
×
116
    }
117

118
    if (workflowServiceStubs != null) {
1✔
119
      workflowServiceStubs.shutdown();
×
120
    }
121

122
    if (inProcessServer != null) {
1✔
123
      log.info("Shutting down in-process GRPC server");
×
124
      inProcessServer.shutdown();
×
125
    }
126

127
    executor.shutdown();
1✔
128

129
    try {
130
      executor.awaitTermination(1, TimeUnit.SECONDS);
1✔
131

132
      if (outOfProcessServer != null) {
1✔
133
        outOfProcessServer.awaitTermination(1, TimeUnit.SECONDS);
×
134
      }
135

136
      if (workflowServiceStubs != null) {
1✔
137
        workflowServiceStubs.awaitTermination(1, TimeUnit.SECONDS);
×
138
      }
139

140
      if (inProcessServer != null) {
1✔
141
        inProcessServer.awaitTermination(1, TimeUnit.SECONDS);
×
142
      }
143

144
    } catch (InterruptedException e) {
×
145
      Thread.currentThread().interrupt();
×
146
      log.debug("shutdown interrupted", e);
×
147
    }
1✔
148

149
    store.close();
1✔
150
  }
1✔
151

152
  private TestWorkflowMutableState getMutableState(ExecutionId executionId) {
153
    return getMutableState(executionId, true);
1✔
154
  }
155

156
  private TestWorkflowMutableState getMutableState(ExecutionId executionId, boolean failNotExists) {
157
    lock.lock();
1✔
158
    try {
159
      if (executionId.getExecution().getRunId().isEmpty()) {
1✔
160
        return getMutableState(executionId.getWorkflowId(), failNotExists);
1✔
161
      }
162
      TestWorkflowMutableState mutableState = executions.get(executionId);
1✔
163
      if (mutableState == null && failNotExists) {
1✔
164
        throw Status.NOT_FOUND
1✔
165
            .withDescription(
1✔
166
                "Execution \""
167
                    + executionId
168
                    + "\" not found in mutable state. Known executions: "
169
                    + executions.values()
1✔
170
                    + ", service="
171
                    + this)
172
            .asRuntimeException();
1✔
173
      }
174
      return mutableState;
1✔
175
    } finally {
176
      lock.unlock();
1✔
177
    }
178
  }
179

180
  private TestWorkflowMutableState getMutableState(WorkflowId workflowId, boolean failNotExists) {
181
    lock.lock();
1✔
182
    try {
183
      TestWorkflowMutableState mutableState = executionsByWorkflowId.get(workflowId);
1✔
184
      if (mutableState == null && failNotExists) {
1✔
185
        throw Status.NOT_FOUND
1✔
186
            .withDescription("Execution not found in mutable state: " + workflowId)
1✔
187
            .asRuntimeException();
1✔
188
      }
189
      return mutableState;
1✔
190
    } finally {
191
      lock.unlock();
1✔
192
    }
193
  }
194

195
  @Override
196
  public void startWorkflowExecution(
197
      StartWorkflowExecutionRequest request,
198
      StreamObserver<StartWorkflowExecutionResponse> responseObserver) {
199
    try {
200
      Duration backoffInterval = getBackoffInterval(request.getCronSchedule(), store.currentTime());
1✔
201
      StartWorkflowExecutionResponse response =
1✔
202
          startWorkflowExecutionImpl(
1✔
203
              request, backoffInterval, Optional.empty(), OptionalLong.empty(), null);
1✔
204
      responseObserver.onNext(response);
1✔
205
      responseObserver.onCompleted();
1✔
206
    } catch (StatusRuntimeException e) {
1✔
207
      handleStatusRuntimeException(e, responseObserver);
1✔
208
    }
1✔
209
  }
1✔
210

211
  StartWorkflowExecutionResponse startWorkflowExecutionImpl(
212
      StartWorkflowExecutionRequest startRequest,
213
      Duration backoffStartInterval,
214
      Optional<TestWorkflowMutableState> parent,
215
      OptionalLong parentChildInitiatedEventId,
216
      @Nullable SignalWorkflowExecutionRequest signalWithStartSignal) {
217
    String requestWorkflowId = requireNotNull("WorkflowId", startRequest.getWorkflowId());
1✔
218
    String namespace = requireNotNull("Namespace", startRequest.getNamespace());
1✔
219
    WorkflowId workflowId = new WorkflowId(namespace, requestWorkflowId);
1✔
220
    TestWorkflowMutableState existing;
221
    lock.lock();
1✔
222
    try {
223
      String newRunId = UUID.randomUUID().toString();
1✔
224
      existing = executionsByWorkflowId.get(workflowId);
1✔
225
      if (existing != null) {
1✔
226
        WorkflowExecutionStatus status = existing.getWorkflowExecutionStatus();
1✔
227
        WorkflowIdReusePolicy policy = startRequest.getWorkflowIdReusePolicy();
1✔
228

229
        if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
230
            && policy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
231
          existing.terminateWorkflowExecution(
1✔
232
              TerminateWorkflowExecutionRequest.newBuilder()
1✔
233
                  .setNamespace(startRequest.getNamespace())
1✔
234
                  .setWorkflowExecution(existing.getExecutionId().getExecution())
1✔
235
                  .setReason("TerminateIfRunning WorkflowIdReusePolicy Policy")
1✔
236
                  .setIdentity("history-service")
1✔
237
                  .setDetails(
1✔
238
                      Payloads.newBuilder()
1✔
239
                          .addPayloads(
1✔
240
                              Payload.newBuilder()
1✔
241
                                  .setData(
1✔
242
                                      ByteString.copyFromUtf8(
1✔
243
                                          String.format("terminated by new runID: %s", newRunId)))
1✔
244
                                  .build())
1✔
245
                          .build())
1✔
246
                  .build());
1✔
247
        } else if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
248
            || policy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE) {
249
          return throwDuplicatedWorkflow(startRequest, existing);
×
250
        } else if (policy
1✔
251
                == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
252
            && (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED
253
                || status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW)) {
254
          return throwDuplicatedWorkflow(startRequest, existing);
×
255
        }
256
      }
257
      Optional<TestServiceRetryState> retryState;
258
      Optional<Failure> lastFailure = Optional.empty();
1✔
259
      if (startRequest.hasRetryPolicy()) {
1✔
260
        Duration expirationInterval =
1✔
261
            ProtobufTimeUtils.toJavaDuration(startRequest.getWorkflowExecutionTimeout());
1✔
262
        retryState = newRetryStateLocked(startRequest.getRetryPolicy(), expirationInterval);
1✔
263
        if (retryState.isPresent()) {
1✔
264
          lastFailure = retryState.get().getPreviousRunFailure();
1✔
265
        }
266
      } else {
1✔
267
        retryState = Optional.empty();
1✔
268
      }
269
      return startWorkflowExecutionNoRunningCheckLocked(
1✔
270
          startRequest,
271
          newRunId,
272
          // it's the first execution in the continue-as-new chain, so firstExecutionRunId =
273
          // newRunId
274
          newRunId,
275
          Optional.empty(),
1✔
276
          retryState,
277
          backoffStartInterval,
278
          null,
279
          lastFailure,
280
          parent,
281
          parentChildInitiatedEventId,
282
          signalWithStartSignal,
283
          workflowId);
284
    } finally {
285
      lock.unlock();
1✔
286
    }
287
  }
288

289
  private Optional<TestServiceRetryState> newRetryStateLocked(
290
      RetryPolicy retryPolicy, Duration expirationInterval) {
291
    Timestamp expirationTime =
292
        expirationInterval.isZero()
1✔
293
            ? Timestamps.fromNanos(0)
1✔
294
            : Timestamps.add(
1✔
295
                store.currentTime(), ProtobufTimeUtils.toProtoDuration(expirationInterval));
1✔
296
    return Optional.of(new TestServiceRetryState(retryPolicy, expirationTime));
1✔
297
  }
298

299
  private StartWorkflowExecutionResponse throwDuplicatedWorkflow(
300
      StartWorkflowExecutionRequest startRequest, TestWorkflowMutableState existing) {
301
    WorkflowExecution execution = existing.getExecutionId().getExecution();
1✔
302
    WorkflowExecutionAlreadyStartedFailure error =
303
        WorkflowExecutionAlreadyStartedFailure.newBuilder()
1✔
304
            .setRunId(execution.getRunId())
1✔
305
            .setStartRequestId(startRequest.getRequestId())
1✔
306
            .build();
1✔
307
    throw StatusUtils.newException(
1✔
308
        Status.ALREADY_EXISTS.withDescription(
1✔
309
            String.format(
1✔
310
                "WorkflowId: %s, " + "RunId: %s", execution.getWorkflowId(), execution.getRunId())),
1✔
311
        error,
312
        WorkflowExecutionAlreadyStartedFailure.getDescriptor());
1✔
313
  }
314

315
  private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheckLocked(
316
      StartWorkflowExecutionRequest startRequest,
317
      @Nonnull String runId,
318
      @Nonnull String firstExecutionRunId,
319
      Optional<String> continuedExecutionRunId,
320
      Optional<TestServiceRetryState> retryState,
321
      Duration backoffStartInterval,
322
      Payloads lastCompletionResult,
323
      Optional<Failure> lastFailure,
324
      Optional<TestWorkflowMutableState> parent,
325
      OptionalLong parentChildInitiatedEventId,
326
      @Nullable SignalWorkflowExecutionRequest signalWithStartSignal,
327
      WorkflowId workflowId) {
328
    String namespace = startRequest.getNamespace();
1✔
329
    TestWorkflowMutableState mutableState =
1✔
330
        new TestWorkflowMutableStateImpl(
331
            startRequest,
332
            firstExecutionRunId,
333
            runId,
334
            retryState,
335
            backoffStartInterval,
336
            lastCompletionResult,
337
            lastFailure,
338
            parent,
339
            parentChildInitiatedEventId,
340
            continuedExecutionRunId,
341
            this,
342
            store,
343
            visibilityStore,
344
            selfAdvancingTimer);
345
    WorkflowExecution execution = mutableState.getExecutionId().getExecution();
1✔
346
    ExecutionId executionId = new ExecutionId(namespace, execution);
1✔
347
    executionsByWorkflowId.put(workflowId, mutableState);
1✔
348
    executions.put(executionId, mutableState);
1✔
349

350
    PollWorkflowTaskQueueRequest eagerWorkflowTaskPollRequest =
351
        startRequest.getRequestEagerExecution()
1✔
352
            ? PollWorkflowTaskQueueRequest.newBuilder()
1✔
353
                .setIdentity(startRequest.getIdentity())
1✔
354
                .setNamespace(startRequest.getNamespace())
1✔
355
                .setTaskQueue(startRequest.getTaskQueue())
1✔
356
                .build()
1✔
357
            : null;
1✔
358

359
    @Nullable
360
    PollWorkflowTaskQueueResponse eagerWorkflowTask =
1✔
361
        mutableState.startWorkflow(
1✔
362
            continuedExecutionRunId.isPresent(),
1✔
363
            signalWithStartSignal,
364
            eagerWorkflowTaskPollRequest);
365
    StartWorkflowExecutionResponse.Builder response =
366
        StartWorkflowExecutionResponse.newBuilder().setRunId(execution.getRunId());
1✔
367
    if (eagerWorkflowTask != null) {
1✔
368
      response.setEagerWorkflowTask(eagerWorkflowTask);
1✔
369
    }
370
    return response.build();
1✔
371
  }
372

373
  @Override
374
  public void getWorkflowExecutionHistory(
375
      GetWorkflowExecutionHistoryRequest getRequest,
376
      StreamObserver<GetWorkflowExecutionHistoryResponse> responseObserver) {
377
    ExecutionId executionId = new ExecutionId(getRequest.getNamespace(), getRequest.getExecution());
1✔
378
    executor.execute(
1✔
379
        // preserving gRPC context deadline between threads
380
        Context.current()
1✔
381
            .wrap(
1✔
382
                () -> {
383
                  try {
384
                    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
385
                    responseObserver.onNext(
1✔
386
                        store.getWorkflowExecutionHistory(
1✔
387
                            mutableState.getExecutionId(),
1✔
388
                            getRequest,
389
                            // We explicitly don't try to respond inside the context deadline.
390
                            // If we try to fit into the context deadline, the deadline may be not
391
                            // expired on the client side and an empty response will lead to a new
392
                            // request, making the client hammer the server at the tail end of the
393
                            // deadline.
394
                            // So this call is designed to wait fully till the end of the
395
                            // context deadline and throw DEADLINE_EXCEEDED if the deadline is less
396
                            // than 20s.
397
                            // If it's longer than 20 seconds - we return an empty result.
398
                            Deadline.after(20, TimeUnit.SECONDS)));
1✔
399
                    responseObserver.onCompleted();
1✔
400
                  } catch (StatusRuntimeException e) {
1✔
401
                    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1✔
402
                      log.error("unexpected", e);
×
403
                    }
404
                    responseObserver.onError(e);
1✔
405
                  } catch (Exception e) {
×
406
                    log.error("unexpected", e);
×
407
                    responseObserver.onError(e);
×
408
                  }
1✔
409
                }));
1✔
410
  }
1✔
411

412
  private <T> T pollTaskQueue(Context ctx, Future<T> futureValue)
413
      throws ExecutionException, InterruptedException {
414
    final Context.CancellationListener canceler = context -> futureValue.cancel(true);
1✔
415
    ctx.addListener(canceler, this.backgroundScheduler);
1✔
416
    try {
417
      return futureValue.get();
1✔
418
    } finally {
419
      ctx.removeListener(canceler);
1✔
420
    }
421
  }
422

423
  @Override
424
  public void pollWorkflowTaskQueue(
425
      PollWorkflowTaskQueueRequest pollRequest,
426
      StreamObserver<PollWorkflowTaskQueueResponse> responseObserver) {
427
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
428
      PollWorkflowTaskQueueResponse.Builder task;
429
      try {
430
        task = pollTaskQueue(ctx, store.pollWorkflowTaskQueue(pollRequest));
1✔
431
      } catch (ExecutionException e) {
×
432
        responseObserver.onError(e);
×
433
        return;
×
434
      } catch (InterruptedException e) {
×
435
        Thread.currentThread().interrupt();
×
436
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
×
437
        responseObserver.onCompleted();
×
438
        return;
×
439
      } catch (CancellationException e) {
1✔
440
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
1✔
441
        responseObserver.onCompleted();
1✔
442
        return;
1✔
443
      }
1✔
444

445
      ExecutionId executionId =
1✔
446
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
1✔
447
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
448
      try {
449
        mutableState.startWorkflowTask(task, pollRequest);
1✔
450
        // The task always has the original task queue that was created as part of the response.
451
        // This may be a different task queue than the task queue it was scheduled on, as in the
452
        // case of sticky execution.
453
        task.setWorkflowExecutionTaskQueue(mutableState.getStartRequest().getTaskQueue());
1✔
454
        PollWorkflowTaskQueueResponse response = task.build();
1✔
455
        responseObserver.onNext(response);
1✔
456
        responseObserver.onCompleted();
1✔
457
      } catch (StatusRuntimeException e) {
×
458
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
×
459
          if (log.isDebugEnabled()) {
×
460
            log.debug("Skipping outdated workflow task for " + executionId, e);
×
461
          }
462
          // The real service doesn't return this call on outdated task.
463
          // For simplicity, we return an empty result here.
464
          responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
×
465
          responseObserver.onCompleted();
×
466
        } else {
467
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
×
468
            log.error("unexpected", e);
×
469
          }
470
          responseObserver.onError(e);
×
471
        }
472
      }
1✔
473
    }
1✔
474
  }
1✔
475

476
  @Override
477
  public void respondWorkflowTaskCompleted(
478
      RespondWorkflowTaskCompletedRequest request,
479
      StreamObserver<RespondWorkflowTaskCompletedResponse> responseObserver) {
480
    try {
481
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(request.getTaskToken());
1✔
482
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
483
      mutableState.completeWorkflowTask(taskToken.getHistorySize(), request);
1✔
484
      responseObserver.onNext(RespondWorkflowTaskCompletedResponse.getDefaultInstance());
1✔
485
      responseObserver.onCompleted();
1✔
486
    } catch (StatusRuntimeException e) {
1✔
487
      handleStatusRuntimeException(e, responseObserver);
1✔
488
    } catch (Throwable e) {
×
489
      responseObserver.onError(
×
490
          Status.INTERNAL
491
              .withDescription(Throwables.getStackTraceAsString(e))
×
492
              .withCause(e)
×
493
              .asRuntimeException());
×
494
    }
1✔
495
  }
1✔
496

497
  @Override
498
  public void respondWorkflowTaskFailed(
499
      RespondWorkflowTaskFailedRequest failedRequest,
500
      StreamObserver<RespondWorkflowTaskFailedResponse> responseObserver) {
501
    try {
502
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(failedRequest.getTaskToken());
1✔
503
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
504
      mutableState.failWorkflowTask(failedRequest);
1✔
505
      responseObserver.onNext(RespondWorkflowTaskFailedResponse.getDefaultInstance());
1✔
506
      responseObserver.onCompleted();
1✔
507
    } catch (StatusRuntimeException e) {
×
508
      handleStatusRuntimeException(e, responseObserver);
×
509
    }
1✔
510
  }
1✔
511

512
  @Override
513
  public void getSystemInfo(
514
      GetSystemInfoRequest request, StreamObserver<GetSystemInfoResponse> responseObserver) {
515
    responseObserver.onNext(
1✔
516
        GetSystemInfoResponse.newBuilder()
1✔
517
            .setCapabilities(
1✔
518
                // These are the capabilities I could verify the test server supports
519
                GetSystemInfoResponse.Capabilities.newBuilder()
1✔
520
                    .setSdkMetadata(true)
1✔
521
                    .setSignalAndQueryHeader(true)
1✔
522
                    .setEncodedFailureAttributes(true)
1✔
523
                    .setEagerWorkflowStart(true)
1✔
524
                    .build())
1✔
525
            .build());
1✔
526
    responseObserver.onCompleted();
1✔
527
  }
1✔
528

529
  private Context.CancellableContext deadlineCtx(Deadline deadline) {
530
    return Context.current().withDeadline(deadline, this.backgroundScheduler);
1✔
531
  }
532

533
  @Override
534
  public void pollActivityTaskQueue(
535
      PollActivityTaskQueueRequest pollRequest,
536
      StreamObserver<PollActivityTaskQueueResponse> responseObserver) {
537
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
538

539
      PollActivityTaskQueueResponse.Builder task;
540
      try {
541
        task = pollTaskQueue(ctx, store.pollActivityTaskQueue(pollRequest));
1✔
542
      } catch (ExecutionException e) {
×
543
        responseObserver.onError(e);
×
544
        return;
×
545
      } catch (InterruptedException e) {
×
546
        Thread.currentThread().interrupt();
×
547
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
×
548
        responseObserver.onCompleted();
×
549
        return;
×
550
      } catch (CancellationException e) {
1✔
551
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
1✔
552
        responseObserver.onCompleted();
1✔
553
        return;
1✔
554
      }
1✔
555

556
      ExecutionId executionId =
1✔
557
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
1✔
558
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
559
      try {
560
        mutableState.startActivityTask(task, pollRequest);
1✔
561
        responseObserver.onNext(task.build());
1✔
562
        responseObserver.onCompleted();
1✔
563
      } catch (StatusRuntimeException e) {
1✔
564
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
565
          if (log.isDebugEnabled()) {
1✔
566
            log.debug("Skipping outdated activity task for " + executionId, e);
×
567
          }
568
          responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
1✔
569
          responseObserver.onCompleted();
1✔
570
        } else {
571
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
×
572
            log.error("unexpected", e);
×
573
          }
574
          responseObserver.onError(e);
×
575
        }
576
      }
1✔
577
    }
1✔
578
  }
1✔
579

580
  @Override
581
  public void recordActivityTaskHeartbeat(
582
      RecordActivityTaskHeartbeatRequest heartbeatRequest,
583
      StreamObserver<RecordActivityTaskHeartbeatResponse> responseObserver) {
584
    try {
585
      ActivityTaskToken activityTaskToken =
1✔
586
          ActivityTaskToken.fromBytes(heartbeatRequest.getTaskToken());
1✔
587
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
588
      boolean cancelRequested =
1✔
589
          mutableState.heartbeatActivityTask(
1✔
590
              activityTaskToken.getScheduledEventId(), heartbeatRequest.getDetails());
1✔
591
      responseObserver.onNext(
1✔
592
          RecordActivityTaskHeartbeatResponse.newBuilder()
1✔
593
              .setCancelRequested(cancelRequested)
1✔
594
              .build());
1✔
595
      responseObserver.onCompleted();
1✔
596
    } catch (StatusRuntimeException e) {
1✔
597
      handleStatusRuntimeException(e, responseObserver);
1✔
598
    }
1✔
599
  }
1✔
600

601
  @Override
602
  public void recordActivityTaskHeartbeatById(
603
      RecordActivityTaskHeartbeatByIdRequest heartbeatRequest,
604
      StreamObserver<RecordActivityTaskHeartbeatByIdResponse> responseObserver) {
605
    try {
606
      ExecutionId execution =
×
607
          new ExecutionId(
608
              heartbeatRequest.getNamespace(),
×
609
              heartbeatRequest.getWorkflowId(),
×
610
              heartbeatRequest.getRunId());
×
611
      TestWorkflowMutableState mutableState = getMutableState(execution);
×
612
      boolean cancelRequested =
×
613
          mutableState.heartbeatActivityTaskById(
×
614
              heartbeatRequest.getActivityId(),
×
615
              heartbeatRequest.getDetails(),
×
616
              heartbeatRequest.getIdentity());
×
617
      responseObserver.onNext(
×
618
          RecordActivityTaskHeartbeatByIdResponse.newBuilder()
×
619
              .setCancelRequested(cancelRequested)
×
620
              .build());
×
621
      responseObserver.onCompleted();
×
622
    } catch (StatusRuntimeException e) {
×
623
      handleStatusRuntimeException(e, responseObserver);
×
624
    }
×
625
  }
×
626

627
  @Override
628
  public void respondActivityTaskCompleted(
629
      RespondActivityTaskCompletedRequest completeRequest,
630
      StreamObserver<RespondActivityTaskCompletedResponse> responseObserver) {
631
    try {
632
      ActivityTaskToken activityTaskToken =
1✔
633
          ActivityTaskToken.fromBytes(completeRequest.getTaskToken());
1✔
634
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
635
      mutableState.completeActivityTask(activityTaskToken.getScheduledEventId(), completeRequest);
1✔
636
      responseObserver.onNext(RespondActivityTaskCompletedResponse.getDefaultInstance());
1✔
637
      responseObserver.onCompleted();
1✔
638
    } catch (StatusRuntimeException e) {
1✔
639
      handleStatusRuntimeException(e, responseObserver);
1✔
640
    }
1✔
641
  }
1✔
642

643
  @Override
644
  public void respondActivityTaskCompletedById(
645
      RespondActivityTaskCompletedByIdRequest completeRequest,
646
      StreamObserver<RespondActivityTaskCompletedByIdResponse> responseObserver) {
647
    try {
648
      ExecutionId executionId =
×
649
          new ExecutionId(
650
              completeRequest.getNamespace(),
×
651
              completeRequest.getWorkflowId(),
×
652
              completeRequest.getRunId());
×
653
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
654
      mutableState.completeActivityTaskById(completeRequest.getActivityId(), completeRequest);
×
655
      responseObserver.onNext(RespondActivityTaskCompletedByIdResponse.getDefaultInstance());
×
656
      responseObserver.onCompleted();
×
657
    } catch (StatusRuntimeException e) {
×
658
      handleStatusRuntimeException(e, responseObserver);
×
659
    }
×
660
  }
×
661

662
  @Override
663
  public void respondActivityTaskFailed(
664
      RespondActivityTaskFailedRequest failRequest,
665
      StreamObserver<RespondActivityTaskFailedResponse> responseObserver) {
666
    try {
667
      ActivityTaskToken activityTaskToken = ActivityTaskToken.fromBytes(failRequest.getTaskToken());
1✔
668
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
669
      mutableState.failActivityTask(activityTaskToken.getScheduledEventId(), failRequest);
1✔
670
      responseObserver.onNext(RespondActivityTaskFailedResponse.getDefaultInstance());
1✔
671
      responseObserver.onCompleted();
1✔
672
    } catch (StatusRuntimeException e) {
1✔
673
      handleStatusRuntimeException(e, responseObserver);
1✔
674
    }
1✔
675
  }
1✔
676

677
  @Override
678
  public void respondActivityTaskFailedById(
679
      RespondActivityTaskFailedByIdRequest failRequest,
680
      StreamObserver<RespondActivityTaskFailedByIdResponse> responseObserver) {
681
    try {
682
      ExecutionId executionId =
×
683
          new ExecutionId(
684
              failRequest.getNamespace(), failRequest.getWorkflowId(), failRequest.getRunId());
×
685
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
686
      mutableState.failActivityTaskById(failRequest.getActivityId(), failRequest);
×
687
      responseObserver.onNext(RespondActivityTaskFailedByIdResponse.getDefaultInstance());
×
688
      responseObserver.onCompleted();
×
689
    } catch (StatusRuntimeException e) {
×
690
      handleStatusRuntimeException(e, responseObserver);
×
691
    }
×
692
  }
×
693

694
  @Override
695
  public void respondActivityTaskCanceled(
696
      RespondActivityTaskCanceledRequest canceledRequest,
697
      StreamObserver<RespondActivityTaskCanceledResponse> responseObserver) {
698
    try {
699
      ActivityTaskToken activityTaskToken =
1✔
700
          ActivityTaskToken.fromBytes(canceledRequest.getTaskToken());
1✔
701
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
702
      mutableState.cancelActivityTask(activityTaskToken.getScheduledEventId(), canceledRequest);
1✔
703
      responseObserver.onNext(RespondActivityTaskCanceledResponse.getDefaultInstance());
1✔
704
      responseObserver.onCompleted();
1✔
705
    } catch (StatusRuntimeException e) {
×
706
      handleStatusRuntimeException(e, responseObserver);
×
707
    }
1✔
708
  }
1✔
709

710
  @Override
711
  public void respondActivityTaskCanceledById(
712
      RespondActivityTaskCanceledByIdRequest canceledRequest,
713
      StreamObserver<RespondActivityTaskCanceledByIdResponse> responseObserver) {
714
    try {
715
      ExecutionId executionId =
×
716
          new ExecutionId(
717
              canceledRequest.getNamespace(),
×
718
              canceledRequest.getWorkflowId(),
×
719
              canceledRequest.getRunId());
×
720
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
721
      mutableState.cancelActivityTaskById(canceledRequest.getActivityId(), canceledRequest);
×
722
      responseObserver.onNext(RespondActivityTaskCanceledByIdResponse.getDefaultInstance());
×
723
      responseObserver.onCompleted();
×
724
    } catch (StatusRuntimeException e) {
×
725
      handleStatusRuntimeException(e, responseObserver);
×
726
    }
×
727
  }
×
728

729
  @Override
730
  public void requestCancelWorkflowExecution(
731
      RequestCancelWorkflowExecutionRequest cancelRequest,
732
      StreamObserver<RequestCancelWorkflowExecutionResponse> responseObserver) {
733
    try {
734
      requestCancelWorkflowExecution(cancelRequest, Optional.empty());
1✔
735
      responseObserver.onNext(RequestCancelWorkflowExecutionResponse.getDefaultInstance());
1✔
736
      responseObserver.onCompleted();
1✔
737
    } catch (StatusRuntimeException e) {
1✔
738
      handleStatusRuntimeException(e, responseObserver);
1✔
739
    }
1✔
740
  }
1✔
741

742
  void requestCancelWorkflowExecution(
743
      RequestCancelWorkflowExecutionRequest cancelRequest,
744
      Optional<TestWorkflowMutableStateImpl.CancelExternalWorkflowExecutionCallerInfo> callerInfo) {
745
    ExecutionId executionId =
1✔
746
        new ExecutionId(cancelRequest.getNamespace(), cancelRequest.getWorkflowExecution());
1✔
747
    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
748
    mutableState.requestCancelWorkflowExecution(cancelRequest, callerInfo);
1✔
749
  }
1✔
750

751
  @Override
752
  public void terminateWorkflowExecution(
753
      TerminateWorkflowExecutionRequest request,
754
      StreamObserver<TerminateWorkflowExecutionResponse> responseObserver) {
755
    try {
756
      terminateWorkflowExecution(request);
1✔
757
      responseObserver.onNext(TerminateWorkflowExecutionResponse.getDefaultInstance());
1✔
758
      responseObserver.onCompleted();
1✔
759
    } catch (StatusRuntimeException e) {
1✔
760
      handleStatusRuntimeException(e, responseObserver);
1✔
761
    }
1✔
762
  }
1✔
763

764
  private void terminateWorkflowExecution(TerminateWorkflowExecutionRequest request) {
765
    ExecutionId executionId =
1✔
766
        new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
1✔
767
    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
768
    mutableState.terminateWorkflowExecution(request);
1✔
769
  }
1✔
770

771
  @Override
772
  public void signalWorkflowExecution(
773
      SignalWorkflowExecutionRequest signalRequest,
774
      StreamObserver<SignalWorkflowExecutionResponse> responseObserver) {
775
    try {
776
      ExecutionId executionId =
1✔
777
          new ExecutionId(signalRequest.getNamespace(), signalRequest.getWorkflowExecution());
1✔
778
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
779
      mutableState.signal(signalRequest);
1✔
780
      responseObserver.onNext(SignalWorkflowExecutionResponse.getDefaultInstance());
1✔
781
      responseObserver.onCompleted();
1✔
782
    } catch (StatusRuntimeException e) {
1✔
783
      handleStatusRuntimeException(e, responseObserver);
1✔
784
    }
1✔
785
  }
1✔
786

787
  @Override
788
  public void updateWorkflowExecution(
789
      UpdateWorkflowExecutionRequest request,
790
      StreamObserver<UpdateWorkflowExecutionResponse> responseObserver) {
791
    try {
792
      ExecutionId executionId =
1✔
793
          new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
1✔
794
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
795
      @Nullable Deadline deadline = Context.current().getDeadline();
1✔
796
      UpdateWorkflowExecutionResponse response =
1✔
797
          mutableState.updateWorkflowExecution(request, deadline);
1✔
798
      responseObserver.onNext(response);
1✔
799
      responseObserver.onCompleted();
1✔
800
    } catch (StatusRuntimeException e) {
1✔
801
      handleStatusRuntimeException(e, responseObserver);
1✔
802
    }
1✔
803
  }
1✔
804

805
  @Override
806
  public void pollWorkflowExecutionUpdate(
807
      PollWorkflowExecutionUpdateRequest request,
808
      StreamObserver<PollWorkflowExecutionUpdateResponse> responseObserver) {
809
    try {
810
      ExecutionId executionId =
1✔
811
          new ExecutionId(request.getNamespace(), request.getUpdateRef().getWorkflowExecution());
1✔
812
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
813
      @Nullable Deadline deadline = Context.current().getDeadline();
1✔
814
      PollWorkflowExecutionUpdateResponse response =
1✔
815
          mutableState.pollUpdateWorkflowExecution(request, deadline);
1✔
816
      responseObserver.onNext(response);
1✔
817
      responseObserver.onCompleted();
1✔
818
    } catch (StatusRuntimeException e) {
1✔
819
      handleStatusRuntimeException(e, responseObserver);
1✔
820
    }
1✔
821
  }
1✔
822

823
  @Override
824
  public void signalWithStartWorkflowExecution(
825
      SignalWithStartWorkflowExecutionRequest r,
826
      StreamObserver<SignalWithStartWorkflowExecutionResponse> responseObserver) {
827
    try {
828
      if (!r.hasTaskQueue()) {
1✔
829
        throw Status.INVALID_ARGUMENT
×
830
            .withDescription("request missing required taskQueue field")
×
831
            .asRuntimeException();
×
832
      }
833
      if (!r.hasWorkflowType()) {
1✔
834
        throw Status.INVALID_ARGUMENT
×
835
            .withDescription("request missing required workflowType field")
×
836
            .asRuntimeException();
×
837
      }
838
      ExecutionId executionId = new ExecutionId(r.getNamespace(), r.getWorkflowId(), null);
1✔
839
      TestWorkflowMutableState mutableState = getMutableState(executionId, false);
1✔
840
      SignalWorkflowExecutionRequest signalRequest =
841
          SignalWorkflowExecutionRequest.newBuilder()
1✔
842
              .setInput(r.getSignalInput())
1✔
843
              .setSignalName(r.getSignalName())
1✔
844
              .setWorkflowExecution(executionId.getExecution())
1✔
845
              .setRequestId(r.getRequestId())
1✔
846
              .setControl(r.getControl())
1✔
847
              .setNamespace(r.getNamespace())
1✔
848
              .setIdentity(r.getIdentity())
1✔
849
              .build();
1✔
850
      if (mutableState != null && !mutableState.isTerminalState()) {
1✔
851
        mutableState.signal(signalRequest);
1✔
852
        responseObserver.onNext(
1✔
853
            SignalWithStartWorkflowExecutionResponse.newBuilder()
1✔
854
                .setRunId(mutableState.getExecutionId().getExecution().getRunId())
1✔
855
                .build());
1✔
856
        responseObserver.onCompleted();
1✔
857
        return;
1✔
858
      }
859
      StartWorkflowExecutionRequest.Builder startRequest =
860
          StartWorkflowExecutionRequest.newBuilder()
1✔
861
              .setRequestId(r.getRequestId())
1✔
862
              .setInput(r.getInput())
1✔
863
              .setWorkflowExecutionTimeout(r.getWorkflowExecutionTimeout())
1✔
864
              .setWorkflowRunTimeout(r.getWorkflowRunTimeout())
1✔
865
              .setWorkflowTaskTimeout(r.getWorkflowTaskTimeout())
1✔
866
              .setNamespace(r.getNamespace())
1✔
867
              .setTaskQueue(r.getTaskQueue())
1✔
868
              .setWorkflowId(r.getWorkflowId())
1✔
869
              .setWorkflowIdReusePolicy(r.getWorkflowIdReusePolicy())
1✔
870
              .setIdentity(r.getIdentity())
1✔
871
              .setWorkflowType(r.getWorkflowType())
1✔
872
              .setCronSchedule(r.getCronSchedule())
1✔
873
              .setRequestId(r.getRequestId());
1✔
874
      if (r.hasRetryPolicy()) {
1✔
875
        startRequest.setRetryPolicy(r.getRetryPolicy());
×
876
      }
877
      if (r.hasHeader()) {
1✔
878
        startRequest.setHeader(r.getHeader());
1✔
879
      }
880
      if (r.hasMemo()) {
1✔
881
        startRequest.setMemo(r.getMemo());
×
882
      }
883
      if (r.hasSearchAttributes()) {
1✔
884
        startRequest.setSearchAttributes(r.getSearchAttributes());
1✔
885
      }
886
      if (r.hasWorkflowStartDelay()) {
1✔
887
        startRequest.setWorkflowStartDelay(r.getWorkflowStartDelay());
1✔
888
      }
889

890
      StartWorkflowExecutionResponse startResult =
1✔
891
          startWorkflowExecutionImpl(
1✔
892
              startRequest.build(),
1✔
893
              Duration.ZERO,
894
              Optional.empty(),
1✔
895
              OptionalLong.empty(),
1✔
896
              signalRequest);
897
      responseObserver.onNext(
1✔
898
          SignalWithStartWorkflowExecutionResponse.newBuilder()
1✔
899
              .setRunId(startResult.getRunId())
1✔
900
              .build());
1✔
901
      responseObserver.onCompleted();
1✔
902
    } catch (StatusRuntimeException e) {
1✔
903
      handleStatusRuntimeException(e, responseObserver);
1✔
904
    }
1✔
905
  }
1✔
906

907
  public void signalExternalWorkflowExecution(
908
      String signalId,
909
      SignalExternalWorkflowExecutionCommandAttributes commandAttributes,
910
      TestWorkflowMutableState source) {
911
    String namespace;
912
    if (commandAttributes.getNamespace().isEmpty()) {
1✔
913
      namespace = source.getExecutionId().getNamespace();
1✔
914
    } else {
915
      namespace = commandAttributes.getNamespace();
×
916
    }
917
    ExecutionId executionId = new ExecutionId(namespace, commandAttributes.getExecution());
1✔
918
    TestWorkflowMutableState mutableState;
919
    try {
920
      mutableState = getMutableState(executionId);
1✔
921
      mutableState.signalFromWorkflow(commandAttributes);
1✔
922
      source.completeSignalExternalWorkflowExecution(
1✔
923
          signalId, mutableState.getExecutionId().getExecution().getRunId());
1✔
924
    } catch (StatusRuntimeException e) {
1✔
925
      if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
926
        source.failSignalExternalWorkflowExecution(
1✔
927
            signalId,
928
            SignalExternalWorkflowExecutionFailedCause
929
                .SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED_CAUSE_EXTERNAL_WORKFLOW_EXECUTION_NOT_FOUND);
930
      } else {
931
        throw e;
×
932
      }
933
    }
1✔
934
  }
1✔
935

936
  /**
937
   * Creates next run of a workflow execution
938
   *
939
   * @return RunId
940
   */
941
  public String continueAsNew(
942
      StartWorkflowExecutionRequest previousRunStartRequest,
943
      WorkflowExecutionContinuedAsNewEventAttributes a,
944
      Optional<TestServiceRetryState> retryState,
945
      String identity,
946
      ExecutionId continuedExecutionId,
947
      String firstExecutionRunId,
948
      Optional<TestWorkflowMutableState> parent,
949
      OptionalLong parentChildInitiatedEventId) {
950
    StartWorkflowExecutionRequest.Builder startRequestBuilder =
951
        StartWorkflowExecutionRequest.newBuilder()
1✔
952
            .setRequestId(UUID.randomUUID().toString())
1✔
953
            .setWorkflowType(a.getWorkflowType())
1✔
954
            .setWorkflowRunTimeout(a.getWorkflowRunTimeout())
1✔
955
            .setWorkflowTaskTimeout(a.getWorkflowTaskTimeout())
1✔
956
            .setNamespace(continuedExecutionId.getNamespace())
1✔
957
            .setTaskQueue(a.getTaskQueue())
1✔
958
            .setWorkflowId(continuedExecutionId.getWorkflowId().getWorkflowId())
1✔
959
            .setWorkflowIdReusePolicy(previousRunStartRequest.getWorkflowIdReusePolicy())
1✔
960
            .setIdentity(identity)
1✔
961
            .setCronSchedule(previousRunStartRequest.getCronSchedule());
1✔
962
    if (previousRunStartRequest.hasRetryPolicy()) {
1✔
963
      startRequestBuilder.setRetryPolicy(previousRunStartRequest.getRetryPolicy());
1✔
964
    }
965
    if (a.hasInput()) {
1✔
966
      startRequestBuilder.setInput(a.getInput());
1✔
967
    }
968
    if (a.hasHeader()) {
1✔
969
      startRequestBuilder.setHeader(a.getHeader());
1✔
970
    }
971
    StartWorkflowExecutionRequest startRequest = startRequestBuilder.build();
1✔
972
    lock.lock();
1✔
973
    Optional<Failure> lastFail =
974
        a.hasFailure()
1✔
975
            ? Optional.of(a.getFailure())
1✔
976
            : retryState.flatMap(TestServiceRetryState::getPreviousRunFailure);
1✔
977
    try {
978
      StartWorkflowExecutionResponse response =
1✔
979
          startWorkflowExecutionNoRunningCheckLocked(
1✔
980
              startRequest,
981
              a.getNewExecutionRunId(),
1✔
982
              firstExecutionRunId,
983
              Optional.of(continuedExecutionId.getExecution().getRunId()),
1✔
984
              retryState,
985
              ProtobufTimeUtils.toJavaDuration(a.getBackoffStartInterval()),
1✔
986
              a.getLastCompletionResult(),
1✔
987
              lastFail,
988
              parent,
989
              parentChildInitiatedEventId,
990
              null,
991
              continuedExecutionId.getWorkflowId());
1✔
992
      return response.getRunId();
1✔
993
    } finally {
994
      lock.unlock();
1✔
995
    }
996
  }
997

998
  @Override
999
  public void listOpenWorkflowExecutions(
1000
      ListOpenWorkflowExecutionsRequest listRequest,
1001
      StreamObserver<ListOpenWorkflowExecutionsResponse> responseObserver) {
1002
    try {
1003
      Optional<String> workflowIdFilter;
1004
      if (listRequest.hasExecutionFilter()
1✔
1005
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
×
1006
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
×
1007
      } else {
1008
        workflowIdFilter = Optional.empty();
1✔
1009
      }
1010
      List<WorkflowExecutionInfo> result =
1✔
1011
          store.listWorkflows(WorkflowState.OPEN, workflowIdFilter);
1✔
1012
      responseObserver.onNext(
1✔
1013
          ListOpenWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1✔
1014
      responseObserver.onCompleted();
1✔
1015
    } catch (StatusRuntimeException e) {
×
1016
      handleStatusRuntimeException(e, responseObserver);
×
1017
    }
1✔
1018
  }
1✔
1019

1020
  @Override
1021
  public void listClosedWorkflowExecutions(
1022
      ListClosedWorkflowExecutionsRequest listRequest,
1023
      StreamObserver<ListClosedWorkflowExecutionsResponse> responseObserver) {
1024
    try {
1025
      Optional<String> workflowIdFilter;
1026
      if (listRequest.hasExecutionFilter()
1✔
1027
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
×
1028
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
×
1029
      } else {
1030
        workflowIdFilter = Optional.empty();
1✔
1031
      }
1032
      List<WorkflowExecutionInfo> result =
1✔
1033
          store.listWorkflows(WorkflowState.CLOSED, workflowIdFilter);
1✔
1034
      responseObserver.onNext(
1✔
1035
          ListClosedWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1✔
1036
      responseObserver.onCompleted();
1✔
1037
    } catch (StatusRuntimeException e) {
×
1038
      handleStatusRuntimeException(e, responseObserver);
×
1039
    }
1✔
1040
  }
1✔
1041

1042
  @Override
1043
  public void respondQueryTaskCompleted(
1044
      RespondQueryTaskCompletedRequest completeRequest,
1045
      StreamObserver<RespondQueryTaskCompletedResponse> responseObserver) {
1046
    try {
1047
      QueryId queryId = QueryId.fromBytes(completeRequest.getTaskToken());
1✔
1048
      TestWorkflowMutableState mutableState = getMutableState(queryId.getExecutionId());
1✔
1049
      mutableState.completeQuery(queryId, completeRequest);
1✔
1050
      responseObserver.onNext(RespondQueryTaskCompletedResponse.getDefaultInstance());
1✔
1051
      responseObserver.onCompleted();
1✔
1052
    } catch (StatusRuntimeException e) {
×
1053
      handleStatusRuntimeException(e, responseObserver);
×
1054
    }
1✔
1055
  }
1✔
1056

1057
  @Override
1058
  public void queryWorkflow(
1059
      QueryWorkflowRequest queryRequest, StreamObserver<QueryWorkflowResponse> responseObserver) {
1060
    try {
1061
      ExecutionId executionId =
1✔
1062
          new ExecutionId(queryRequest.getNamespace(), queryRequest.getExecution());
1✔
1063
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1064
      @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1065
      QueryWorkflowResponse result =
1✔
1066
          mutableState.query(
1✔
1067
              queryRequest,
1068
              deadline != null ? deadline.timeRemaining(TimeUnit.MILLISECONDS) : Long.MAX_VALUE);
1✔
1069
      responseObserver.onNext(result);
1✔
1070
      responseObserver.onCompleted();
1✔
1071
    } catch (StatusRuntimeException e) {
1✔
1072
      handleStatusRuntimeException(e, responseObserver);
1✔
1073
    }
1✔
1074
  }
1✔
1075

1076
  @Override
1077
  public void describeWorkflowExecution(
1078
      DescribeWorkflowExecutionRequest request,
1079
      StreamObserver<io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse>
1080
          responseObserver) {
1081
    try {
1082
      if (request.getNamespace().isEmpty()) {
1✔
1083
        throw createInvalidArgument("Namespace not set on request.");
×
1084
      }
1085
      if (!request.hasExecution()) {
1✔
1086
        throw createInvalidArgument("Execution not set on request.");
×
1087
      }
1088

1089
      ExecutionId executionId = new ExecutionId(request.getNamespace(), request.getExecution());
1✔
1090
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1091
      DescribeWorkflowExecutionResponse result = mutableState.describeWorkflowExecution();
1✔
1092
      responseObserver.onNext(result);
1✔
1093
      responseObserver.onCompleted();
1✔
1094
    } catch (StatusRuntimeException e) {
1✔
1095
      handleStatusRuntimeException(e, responseObserver);
1✔
1096
    }
1✔
1097
  }
1✔
1098

1099
  /**
1100
   * This method doesn't make much sense for test server, it accepts all namespaces as existent and
1101
   * registered. so, it's a trivial implementation just returning an info that a namespace is
1102
   * registered irrespectively of the input
1103
   */
1104
  @Override
1105
  public void describeNamespace(
1106
      DescribeNamespaceRequest request,
1107
      StreamObserver<DescribeNamespaceResponse> responseObserver) {
1108
    try {
1109
      if (request.getNamespace().isEmpty()) {
1✔
1110
        throw createInvalidArgument("Namespace not set on request.");
×
1111
      }
1112
      // generating a stable UUID for name
1113
      String namespaceId = UUID.nameUUIDFromBytes(request.getNamespace().getBytes()).toString();
1✔
1114
      DescribeNamespaceResponse result =
1115
          DescribeNamespaceResponse.newBuilder()
1✔
1116
              .setNamespaceInfo(
1✔
1117
                  NamespaceInfo.newBuilder()
1✔
1118
                      .setName(request.getNamespace())
1✔
1119
                      .setState(NamespaceState.NAMESPACE_STATE_REGISTERED)
1✔
1120
                      .setId(namespaceId)
1✔
1121
                      .build())
1✔
1122
              .build();
1✔
1123
      responseObserver.onNext(result);
1✔
1124
      responseObserver.onCompleted();
1✔
1125
    } catch (StatusRuntimeException e) {
1✔
1126
      handleStatusRuntimeException(e, responseObserver);
1✔
1127
    }
1✔
1128
  }
1✔
1129

1130
  private <R> R requireNotNull(String fieldName, R value) {
1131
    if (value == null) {
1✔
1132
      throw Status.INVALID_ARGUMENT
×
1133
          .withDescription("Missing required field \"" + fieldName + "\".")
×
1134
          .asRuntimeException();
×
1135
    }
1136
    return value;
1✔
1137
  }
1138

1139
  /**
1140
   * Adds diagnostic data about internal service state to the provided {@link StringBuilder}.
1141
   * Includes histories of all workflow instances stored in the service.
1142
   */
1143
  public void getDiagnostics(StringBuilder result) {
1144
    store.getDiagnostics(result);
×
1145
  }
×
1146

1147
  /**
1148
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1149
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#getCurrentTime(Empty)}
1150
   */
1151
  @Deprecated
1152
  public long currentTimeMillis() {
1153
    return selfAdvancingTimer.getClock().getAsLong();
×
1154
  }
1155

1156
  /** Invokes callback after the specified delay according to internal service clock. */
1157
  public void registerDelayedCallback(Duration delay, Runnable r) {
1158
    store.registerDelayedCallback(delay, r);
1✔
1159
  }
1✔
1160

1161
  /**
1162
   * Disables time skipping. To re-enable call {@link #unlockTimeSkipping(String)}. These calls are
1163
   * counted, so calling unlock does not guarantee that time is going to be skipped immediately as
1164
   * another lock can be holding it.
1165
   *
1166
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1167
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#lockTimeSkipping(LockTimeSkippingRequest)}
1168
   */
1169
  @Deprecated
1170
  public void lockTimeSkipping(String caller) {
1171
    selfAdvancingTimer.lockTimeSkipping(caller);
×
1172
  }
×
1173

1174
  /**
1175
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1176
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkipping(UnlockTimeSkippingRequest)}
1177
   */
1178
  @Deprecated
1179
  public void unlockTimeSkipping(String caller) {
1180
    selfAdvancingTimer.unlockTimeSkipping(caller);
×
1181
  }
×
1182

1183
  /**
1184
   * Unlocks time skipping and blocks the calling thread until internal clock passes the current +
1185
   * duration time.<br>
1186
   * When the time is reached, locks time skipping and returns.<br>
1187
   * Might not block at all due to time skipping. Or might block if the time skipping lock counter
1188
   * was more than 1.
1189
   *
1190
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1191
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkippingWithSleep(SleepRequest)}
1192
   */
1193
  @Deprecated
1194
  public void sleep(Duration duration) {
1195
    CompletableFuture<Void> result = new CompletableFuture<>();
×
1196
    selfAdvancingTimer.schedule(
×
1197
        duration,
1198
        () -> {
1199
          selfAdvancingTimer.lockTimeSkipping("TestWorkflowService sleep");
×
1200
          result.complete(null);
×
1201
        },
×
1202
        "workflow sleep");
1203
    selfAdvancingTimer.unlockTimeSkipping("TestWorkflowService sleep");
×
1204
    try {
1205
      result.get();
×
1206
    } catch (InterruptedException e) {
×
1207
      Thread.currentThread().interrupt();
×
1208
      throw new RuntimeException(e);
×
1209
    } catch (ExecutionException e) {
×
1210
      throw new RuntimeException(e);
×
1211
    }
×
1212
  }
×
1213

1214
  /**
1215
   * Temporal server times out task queue long poll calls after 1 minute and returns an empty
1216
   * result. After which the request has to be retried by the client if it wants to continue
1217
   * waiting. We emulate this behavior here.
1218
   *
1219
   * <p>If there is a deadline present, for task queue poll requests server will respond inside the
1220
   * deadline. Note that the latest is not applicable for getWorkflowExecutionHistory() long polls.
1221
   *
1222
   * @return minimum between the context deadline and maximum long poll deadline.
1223
   */
1224
  private Deadline getLongPollDeadline() {
1225
    @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1226
    Deadline maximumDeadline =
1✔
1227
        Deadline.after(
1✔
1228
            WorkflowServiceStubsOptions.DEFAULT_SERVER_LONG_POLL_RPC_TIMEOUT.toMillis(),
1✔
1229
            TimeUnit.MILLISECONDS);
1230
    return deadline != null ? deadline.minimum(maximumDeadline) : maximumDeadline;
1✔
1231
  }
1232

1233
  private void handleStatusRuntimeException(
1234
      StatusRuntimeException e, StreamObserver<?> responseObserver) {
1235
    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1✔
1236
      log.error("unexpected", e);
1✔
1237
    }
1238
    responseObserver.onError(e);
1✔
1239
  }
1✔
1240

1241
  /**
   * Creates an in-memory service together with client stubs for use from Java code. The initial
   * time is 0 and the in-process gRPC server is started. See also createServerOnly and
   * createWithNoGrpcServer.
   *
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean)} instead and
   *     pass {@code lockTimeSkipping=false} to emulate the behavior of this method
   */
  @Deprecated
  public TestWorkflowService() {
    this(0, true);
  }
×
1252

1253
  /**
   * Creates an in-memory service together with client stubs for use from Java code, with the
   * timer initialized from {@code initialTimeMillis}. The in-process gRPC server is started. See
   * also createServerOnly and createWithNoGrpcServer.
   *
   * @param initialTimeMillis initial time handed to the self-advancing timer
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean, long)} instead
   *     and pass {@code lockTimeSkipping=false} to emulate the behavior of this method
   */
  @Deprecated
  public TestWorkflowService(long initialTimeMillis) {
    this(initialTimeMillis, true);
  }
×
1264

1265
  /**
   * Creates an in-memory service together with client stubs for use from Java code, optionally
   * locking time skipping right after construction. See also createServerOnly and
   * createWithNoGrpcServer.
   *
   * @param lockTimeSkipping when {@code true}, time skipping is locked with the caller label
   *     {@code "constructor"}
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean)} instead
   */
  @Deprecated
  public TestWorkflowService(boolean lockTimeSkipping) {
    this(0, true);
    if (!lockTimeSkipping) {
      return;
    }
    this.lockTimeSkipping("constructor");
  }
×
1278

1279
  /**
1280
   * Creates an instance of TestWorkflowService that does not manage its own gRPC server. Useful for
1281
   * including in an externally managed gRPC server.
1282
   *
1283
   * @deprecated use {@link TestServicesStarter} to create just the services with gRPC server
1284
   */
1285
  @Deprecated
1286
  public static TestWorkflowService createWithNoGrpcServer() {
1287
    return new TestWorkflowService(0, false);
×
1288
  }
1289

1290
  private TestWorkflowService(long initialTimeMillis, boolean startInProcessServer) {
×
1291
    this.selfAdvancingTimer =
×
1292
        new SelfAdvancingTimerImpl(initialTimeMillis, Clock.systemDefaultZone());
×
1293
    store = new TestWorkflowStoreImpl(this.selfAdvancingTimer);
×
1294
    visibilityStore = new TestVisibilityStoreImpl();
×
1295
    outOfProcessServer = null;
×
1296
    if (startInProcessServer) {
×
1297
      this.inProcessServer = new InProcessGRPCServer(Collections.singletonList(this));
×
1298
      this.workflowServiceStubs =
×
1299
          WorkflowServiceStubs.newServiceStubs(
×
1300
              WorkflowServiceStubsOptions.newBuilder()
×
1301
                  .setChannel(inProcessServer.getChannel())
×
1302
                  .build());
×
1303
    } else {
1304
      this.inProcessServer = null;
×
1305
      this.workflowServiceStubs = null;
×
1306
    }
1307
  }
×
1308

1309
  /**
1310
   * Creates an out-of-process rather than in-process server, and does not set up a client. Useful,
1311
   * for example, if you want to use the test service from other SDKs.
1312
   *
1313
   * @param port the port to listen on
1314
   * @deprecated use {@link io.temporal.testserver.TestServer#createPortBoundServer(int, boolean)}
1315
   *     instead and pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1316
   */
1317
  @Deprecated
1318
  public static TestWorkflowService createServerOnly(int port) {
1319
    TestWorkflowService result = new TestWorkflowService(true, port);
×
1320
    log.info("Server started, listening on " + port);
×
1321
    return result;
×
1322
  }
1323

1324
  /**
   * Builds the timer and stores, then starts a gRPC server bound to {@code port} that hosts this
   * service. No in-process server or client stubs are created.
   *
   * @param isOutOfProc must be {@code true}; it exists only to disambiguate this constructor from
   *     the other overloads
   * @param port port the server listens on
   * @throws RuntimeException wrapping the {@link IOException} if the server fails to start
   */
  private TestWorkflowService(boolean isOutOfProc, int port) {
    // isOutOfProc is just here to make unambiguous constructor overloading.
    Preconditions.checkState(isOutOfProc, "Impossible.");
    this.selfAdvancingTimer = new SelfAdvancingTimerImpl(0, Clock.systemDefaultZone());
    this.store = new TestWorkflowStoreImpl(selfAdvancingTimer);
    this.visibilityStore = new TestVisibilityStoreImpl();
    this.inProcessServer = null;
    this.workflowServiceStubs = null;
    try {
      ServerBuilder<?> portBoundBuilder =
          Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create());
      GRPCServerHelper.registerServicesAndHealthChecks(
          Collections.singletonList(this), portBoundBuilder);
      this.outOfProcessServer = portBoundBuilder.build().start();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
×
1342

1343
  @Deprecated
1344
  public WorkflowServiceStubs newClientStub() {
1345
    if (workflowServiceStubs == null) {
×
1346
      throw new RuntimeException(
×
1347
          "Cannot get a client when you created your TestWorkflowService with createServerOnly.");
1348
    }
1349
    return workflowServiceStubs;
×
1350
  }
1351

1352
  private static StatusRuntimeException createInvalidArgument(String description) {
1353
    throw Status.INVALID_ARGUMENT.withDescription(description).asRuntimeException();
1✔
1354
  }
1355
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc