• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #172

pending completion
#172

push

github-actions

web-flow
Update CODEOWNERS (#1773)

## What was changed
Update CODEOWNERS so that Security can own the Semgrep rules files and paths.

## Why?
We are adding Semgrep for static analysis to this repository, and only the security team should be able to approve exclusions from the policy.

## Checklist

How was this tested:
We ran this scanner on internal repos with this CODEOWNERS file and it worked as expected.

18029 of 22084 relevant lines covered (81.64%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.68
/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import static io.temporal.internal.testservice.CronUtils.getBackoffInterval;
24

25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Throwables;
27
import com.google.protobuf.ByteString;
28
import com.google.protobuf.Empty;
29
import com.google.protobuf.Timestamp;
30
import com.google.protobuf.util.Timestamps;
31
import io.grpc.*;
32
import io.grpc.stub.StreamObserver;
33
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
34
import io.temporal.api.common.v1.Payload;
35
import io.temporal.api.common.v1.Payloads;
36
import io.temporal.api.common.v1.RetryPolicy;
37
import io.temporal.api.common.v1.WorkflowExecution;
38
import io.temporal.api.enums.v1.NamespaceState;
39
import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause;
40
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
41
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
42
import io.temporal.api.errordetails.v1.WorkflowExecutionAlreadyStartedFailure;
43
import io.temporal.api.failure.v1.Failure;
44
import io.temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes;
45
import io.temporal.api.namespace.v1.NamespaceInfo;
46
import io.temporal.api.testservice.v1.LockTimeSkippingRequest;
47
import io.temporal.api.testservice.v1.SleepRequest;
48
import io.temporal.api.testservice.v1.UnlockTimeSkippingRequest;
49
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
50
import io.temporal.api.workflowservice.v1.*;
51
import io.temporal.internal.common.ProtobufTimeUtils;
52
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowState;
53
import io.temporal.serviceclient.StatusUtils;
54
import io.temporal.serviceclient.WorkflowServiceStubs;
55
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
56
import java.io.Closeable;
57
import java.io.IOException;
58
import java.time.Clock;
59
import java.time.Duration;
60
import java.util.*;
61
import java.util.concurrent.*;
62
import java.util.concurrent.locks.Lock;
63
import java.util.concurrent.locks.ReentrantLock;
64
import javax.annotation.Nonnull;
65
import javax.annotation.Nullable;
66
import org.slf4j.Logger;
67
import org.slf4j.LoggerFactory;
68

69
/**
70
 * In memory implementation of the Workflow Service. To be used for testing purposes only.
71
 *
72
 * <p>Do not use directly, instead use {@link io.temporal.testing.TestWorkflowEnvironment}.
73
 */
74
public final class TestWorkflowService extends WorkflowServiceGrpc.WorkflowServiceImplBase
75
    implements Closeable {
76
  private static final Logger log = LoggerFactory.getLogger(TestWorkflowService.class);
77
  private final Map<ExecutionId, TestWorkflowMutableState> executions = new HashMap<>();
78
  // key->WorkflowId
79
  private final Map<WorkflowId, TestWorkflowMutableState> executionsByWorkflowId = new HashMap<>();
80
  private final ExecutorService executor = Executors.newCachedThreadPool();
81
  private final Lock lock = new ReentrantLock();
82

83
  private final TestWorkflowStore store;
84
  private final TestVisibilityStore visibilityStore;
85
  private final SelfAdvancingTimer selfAdvancingTimer;
86

87
  private final ScheduledExecutorService backgroundScheduler =
88
      Executors.newSingleThreadScheduledExecutor();
89

90
  private final Server outOfProcessServer;
91
  private final InProcessGRPCServer inProcessServer;
92
  private final WorkflowServiceStubs workflowServiceStubs;
93

94
  TestWorkflowService(
95
      TestWorkflowStore store,
96
      TestVisibilityStore visibilityStore,
97
      SelfAdvancingTimer selfAdvancingTimer) {
98
    this.store = store;
99
    this.visibilityStore = visibilityStore;
100
    this.selfAdvancingTimer = selfAdvancingTimer;
101
    this.outOfProcessServer = null;
102
    this.inProcessServer = null;
103
    this.workflowServiceStubs = null;
104
  }
105

106
  @Override
107
  public void close() {
108
    log.debug("Shutting down TestWorkflowService");
109

110
    log.debug("Shutting down background scheduler");
111
    backgroundScheduler.shutdown();
112

113
    if (outOfProcessServer != null) {
114
      log.info("Shutting down out-of-process GRPC server");
115
      outOfProcessServer.shutdown();
116
    }
117

118
    if (workflowServiceStubs != null) {
119
      workflowServiceStubs.shutdown();
120
    }
121

122
    if (inProcessServer != null) {
123
      log.info("Shutting down in-process GRPC server");
124
      inProcessServer.shutdown();
125
    }
126

127
    executor.shutdown();
128

129
    try {
130
      executor.awaitTermination(1, TimeUnit.SECONDS);
131

132
      if (outOfProcessServer != null) {
133
        outOfProcessServer.awaitTermination(1, TimeUnit.SECONDS);
134
      }
135

136
      if (workflowServiceStubs != null) {
137
        workflowServiceStubs.awaitTermination(1, TimeUnit.SECONDS);
138
      }
139

140
      if (inProcessServer != null) {
141
        inProcessServer.awaitTermination(1, TimeUnit.SECONDS);
142
      }
143

144
    } catch (InterruptedException e) {
145
      Thread.currentThread().interrupt();
146
      log.debug("shutdown interrupted", e);
147
    }
148

149
    store.close();
150
  }
151

152
  private TestWorkflowMutableState getMutableState(ExecutionId executionId) {
153
    return getMutableState(executionId, true);
154
  }
155

156
  private TestWorkflowMutableState getMutableState(ExecutionId executionId, boolean failNotExists) {
157
    lock.lock();
158
    try {
159
      if (executionId.getExecution().getRunId().isEmpty()) {
160
        return getMutableState(executionId.getWorkflowId(), failNotExists);
161
      }
162
      TestWorkflowMutableState mutableState = executions.get(executionId);
163
      if (mutableState == null && failNotExists) {
164
        throw Status.NOT_FOUND
165
            .withDescription(
166
                "Execution \""
167
                    + executionId
168
                    + "\" not found in mutable state. Known executions: "
169
                    + executions.values()
170
                    + ", service="
171
                    + this)
172
            .asRuntimeException();
173
      }
174
      return mutableState;
175
    } finally {
176
      lock.unlock();
177
    }
178
  }
179

180
  private TestWorkflowMutableState getMutableState(WorkflowId workflowId, boolean failNotExists) {
181
    lock.lock();
182
    try {
183
      TestWorkflowMutableState mutableState = executionsByWorkflowId.get(workflowId);
184
      if (mutableState == null && failNotExists) {
185
        throw Status.NOT_FOUND
186
            .withDescription("Execution not found in mutable state: " + workflowId)
187
            .asRuntimeException();
188
      }
189
      return mutableState;
190
    } finally {
191
      lock.unlock();
192
    }
193
  }
194

195
  @Override
196
  public void startWorkflowExecution(
197
      StartWorkflowExecutionRequest request,
198
      StreamObserver<StartWorkflowExecutionResponse> responseObserver) {
199
    try {
200
      Duration backoffInterval = getBackoffInterval(request.getCronSchedule(), store.currentTime());
201
      StartWorkflowExecutionResponse response =
202
          startWorkflowExecutionImpl(
203
              request, backoffInterval, Optional.empty(), OptionalLong.empty(), null);
204
      responseObserver.onNext(response);
205
      responseObserver.onCompleted();
206
    } catch (StatusRuntimeException e) {
207
      handleStatusRuntimeException(e, responseObserver);
208
    }
209
  }
210

211
  StartWorkflowExecutionResponse startWorkflowExecutionImpl(
212
      StartWorkflowExecutionRequest startRequest,
213
      Duration backoffStartInterval,
214
      Optional<TestWorkflowMutableState> parent,
215
      OptionalLong parentChildInitiatedEventId,
216
      @Nullable SignalWorkflowExecutionRequest signalWithStartSignal) {
217
    String requestWorkflowId = requireNotNull("WorkflowId", startRequest.getWorkflowId());
218
    String namespace = requireNotNull("Namespace", startRequest.getNamespace());
219
    WorkflowId workflowId = new WorkflowId(namespace, requestWorkflowId);
220
    TestWorkflowMutableState existing;
221
    lock.lock();
222
    try {
223
      String newRunId = UUID.randomUUID().toString();
224
      existing = executionsByWorkflowId.get(workflowId);
225
      if (existing != null) {
226
        WorkflowExecutionStatus status = existing.getWorkflowExecutionStatus();
227
        WorkflowIdReusePolicy policy = startRequest.getWorkflowIdReusePolicy();
228

229
        if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
230
            && policy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
231
          existing.terminateWorkflowExecution(
232
              TerminateWorkflowExecutionRequest.newBuilder()
233
                  .setNamespace(startRequest.getNamespace())
234
                  .setWorkflowExecution(existing.getExecutionId().getExecution())
235
                  .setReason("TerminateIfRunning WorkflowIdReusePolicy Policy")
236
                  .setIdentity("history-service")
237
                  .setDetails(
238
                      Payloads.newBuilder()
239
                          .addPayloads(
240
                              Payload.newBuilder()
241
                                  .setData(
242
                                      ByteString.copyFromUtf8(
243
                                          String.format("terminated by new runID: %s", newRunId)))
244
                                  .build())
245
                          .build())
246
                  .build());
247
        } else if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
248
            || policy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE) {
249
          return throwDuplicatedWorkflow(startRequest, existing);
250
        } else if (policy
251
                == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
252
            && (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED
253
                || status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW)) {
254
          return throwDuplicatedWorkflow(startRequest, existing);
255
        }
256
      }
257
      Optional<TestServiceRetryState> retryState;
258
      Optional<Failure> lastFailure = Optional.empty();
259
      if (startRequest.hasRetryPolicy()) {
260
        Duration expirationInterval =
261
            ProtobufTimeUtils.toJavaDuration(startRequest.getWorkflowExecutionTimeout());
262
        retryState = newRetryStateLocked(startRequest.getRetryPolicy(), expirationInterval);
263
        if (retryState.isPresent()) {
264
          lastFailure = retryState.get().getPreviousRunFailure();
265
        }
266
      } else {
267
        retryState = Optional.empty();
268
      }
269
      return startWorkflowExecutionNoRunningCheckLocked(
270
          startRequest,
271
          newRunId,
272
          // it's the first execution in the continue-as-new chain, so firstExecutionRunId =
273
          // newRunId
274
          newRunId,
275
          Optional.empty(),
276
          retryState,
277
          backoffStartInterval,
278
          null,
279
          lastFailure,
280
          parent,
281
          parentChildInitiatedEventId,
282
          signalWithStartSignal,
283
          workflowId);
284
    } finally {
285
      lock.unlock();
286
    }
287
  }
288

289
  private Optional<TestServiceRetryState> newRetryStateLocked(
290
      RetryPolicy retryPolicy, Duration expirationInterval) {
291
    Timestamp expirationTime =
292
        expirationInterval.isZero()
293
            ? Timestamps.fromNanos(0)
294
            : Timestamps.add(
295
                store.currentTime(), ProtobufTimeUtils.toProtoDuration(expirationInterval));
296
    return Optional.of(new TestServiceRetryState(retryPolicy, expirationTime));
297
  }
298

299
  private StartWorkflowExecutionResponse throwDuplicatedWorkflow(
300
      StartWorkflowExecutionRequest startRequest, TestWorkflowMutableState existing) {
301
    WorkflowExecution execution = existing.getExecutionId().getExecution();
302
    WorkflowExecutionAlreadyStartedFailure error =
303
        WorkflowExecutionAlreadyStartedFailure.newBuilder()
304
            .setRunId(execution.getRunId())
305
            .setStartRequestId(startRequest.getRequestId())
306
            .build();
307
    throw StatusUtils.newException(
308
        Status.ALREADY_EXISTS.withDescription(
309
            String.format(
310
                "WorkflowId: %s, " + "RunId: %s", execution.getWorkflowId(), execution.getRunId())),
311
        error,
312
        WorkflowExecutionAlreadyStartedFailure.getDescriptor());
313
  }
314

315
  private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheckLocked(
316
      StartWorkflowExecutionRequest startRequest,
317
      @Nonnull String runId,
318
      @Nonnull String firstExecutionRunId,
319
      Optional<String> continuedExecutionRunId,
320
      Optional<TestServiceRetryState> retryState,
321
      Duration backoffStartInterval,
322
      Payloads lastCompletionResult,
323
      Optional<Failure> lastFailure,
324
      Optional<TestWorkflowMutableState> parent,
325
      OptionalLong parentChildInitiatedEventId,
326
      @Nullable SignalWorkflowExecutionRequest signalWithStartSignal,
327
      WorkflowId workflowId) {
328
    String namespace = startRequest.getNamespace();
329
    TestWorkflowMutableState mutableState =
330
        new TestWorkflowMutableStateImpl(
331
            startRequest,
332
            firstExecutionRunId,
333
            runId,
334
            retryState,
335
            backoffStartInterval,
336
            lastCompletionResult,
337
            lastFailure,
338
            parent,
339
            parentChildInitiatedEventId,
340
            continuedExecutionRunId,
341
            this,
342
            store,
343
            visibilityStore,
344
            selfAdvancingTimer);
345
    WorkflowExecution execution = mutableState.getExecutionId().getExecution();
346
    ExecutionId executionId = new ExecutionId(namespace, execution);
347
    executionsByWorkflowId.put(workflowId, mutableState);
348
    executions.put(executionId, mutableState);
349

350
    PollWorkflowTaskQueueRequest eagerWorkflowTaskPollRequest =
351
        startRequest.getRequestEagerExecution()
352
            ? PollWorkflowTaskQueueRequest.newBuilder()
353
                .setIdentity(startRequest.getIdentity())
354
                .setNamespace(startRequest.getNamespace())
355
                .setTaskQueue(startRequest.getTaskQueue())
356
                .build()
357
            : null;
358

359
    @Nullable
360
    PollWorkflowTaskQueueResponse eagerWorkflowTask =
361
        mutableState.startWorkflow(
362
            continuedExecutionRunId.isPresent(),
363
            signalWithStartSignal,
364
            eagerWorkflowTaskPollRequest);
365
    StartWorkflowExecutionResponse.Builder response =
366
        StartWorkflowExecutionResponse.newBuilder().setRunId(execution.getRunId());
367
    if (eagerWorkflowTask != null) {
368
      response.setEagerWorkflowTask(eagerWorkflowTask);
369
    }
370
    return response.build();
371
  }
372

373
  @Override
374
  public void getWorkflowExecutionHistory(
375
      GetWorkflowExecutionHistoryRequest getRequest,
376
      StreamObserver<GetWorkflowExecutionHistoryResponse> responseObserver) {
377
    ExecutionId executionId = new ExecutionId(getRequest.getNamespace(), getRequest.getExecution());
378
    executor.execute(
379
        // preserving gRPC context deadline between threads
380
        Context.current()
381
            .wrap(
382
                () -> {
383
                  try {
384
                    TestWorkflowMutableState mutableState = getMutableState(executionId);
385
                    responseObserver.onNext(
386
                        store.getWorkflowExecutionHistory(
387
                            mutableState.getExecutionId(),
388
                            getRequest,
389
                            // We explicitly don't try to respond inside the context deadline.
390
                            // If we try to fit into the context deadline, the deadline may be not
391
                            // expired on the client side and an empty response will lead to a new
392
                            // request, making the client hammer the server at the tail end of the
393
                            // deadline.
394
                            // So this call is designed to wait fully till the end of the
395
                            // context deadline and throw DEADLINE_EXCEEDED if the deadline is less
396
                            // than 20s.
397
                            // If it's longer than 20 seconds - we return an empty result.
398
                            Deadline.after(20, TimeUnit.SECONDS)));
399
                    responseObserver.onCompleted();
400
                  } catch (StatusRuntimeException e) {
401
                    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
402
                      log.error("unexpected", e);
403
                    }
404
                    responseObserver.onError(e);
405
                  } catch (Exception e) {
406
                    log.error("unexpected", e);
407
                    responseObserver.onError(e);
408
                  }
409
                }));
410
  }
411

412
  private <T> T pollTaskQueue(Context ctx, Future<T> futureValue)
413
      throws ExecutionException, InterruptedException {
414
    final Context.CancellationListener canceler = context -> futureValue.cancel(true);
415
    ctx.addListener(canceler, this.backgroundScheduler);
416
    try {
417
      return futureValue.get();
418
    } finally {
419
      ctx.removeListener(canceler);
420
    }
421
  }
422

423
  @Override
424
  public void pollWorkflowTaskQueue(
425
      PollWorkflowTaskQueueRequest pollRequest,
426
      StreamObserver<PollWorkflowTaskQueueResponse> responseObserver) {
427
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
428
      PollWorkflowTaskQueueResponse.Builder task;
429
      try {
430
        task = pollTaskQueue(ctx, store.pollWorkflowTaskQueue(pollRequest));
431
      } catch (ExecutionException e) {
432
        responseObserver.onError(e);
433
        return;
434
      } catch (InterruptedException e) {
435
        Thread.currentThread().interrupt();
436
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
437
        responseObserver.onCompleted();
438
        return;
439
      } catch (CancellationException e) {
440
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
441
        responseObserver.onCompleted();
442
        return;
443
      }
444

445
      ExecutionId executionId =
446
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
447
      TestWorkflowMutableState mutableState = getMutableState(executionId);
448
      try {
449
        mutableState.startWorkflowTask(task, pollRequest);
450
        // The task always has the original task queue that was created as part of the response.
451
        // This may be a different task queue than the task queue it was scheduled on, as in the
452
        // case of sticky execution.
453
        task.setWorkflowExecutionTaskQueue(mutableState.getStartRequest().getTaskQueue());
454
        PollWorkflowTaskQueueResponse response = task.build();
455
        responseObserver.onNext(response);
456
        responseObserver.onCompleted();
457
      } catch (StatusRuntimeException e) {
458
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
459
          if (log.isDebugEnabled()) {
460
            log.debug("Skipping outdated workflow task for " + executionId, e);
461
          }
462
          // The real service doesn't return this call on outdated task.
463
          // For simplicity, we return an empty result here.
464
          responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
465
          responseObserver.onCompleted();
466
        } else {
467
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
468
            log.error("unexpected", e);
469
          }
470
          responseObserver.onError(e);
471
        }
472
      }
473
    }
474
  }
475

476
  @Override
477
  public void respondWorkflowTaskCompleted(
478
      RespondWorkflowTaskCompletedRequest request,
479
      StreamObserver<RespondWorkflowTaskCompletedResponse> responseObserver) {
480
    try {
481
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(request.getTaskToken());
482
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
483
      mutableState.completeWorkflowTask(taskToken.getHistorySize(), request);
484
      responseObserver.onNext(RespondWorkflowTaskCompletedResponse.getDefaultInstance());
485
      responseObserver.onCompleted();
486
    } catch (StatusRuntimeException e) {
487
      handleStatusRuntimeException(e, responseObserver);
488
    } catch (Throwable e) {
489
      responseObserver.onError(
490
          Status.INTERNAL
491
              .withDescription(Throwables.getStackTraceAsString(e))
492
              .withCause(e)
493
              .asRuntimeException());
494
    }
495
  }
496

497
  @Override
498
  public void respondWorkflowTaskFailed(
499
      RespondWorkflowTaskFailedRequest failedRequest,
500
      StreamObserver<RespondWorkflowTaskFailedResponse> responseObserver) {
501
    try {
502
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(failedRequest.getTaskToken());
503
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
504
      mutableState.failWorkflowTask(failedRequest);
505
      responseObserver.onNext(RespondWorkflowTaskFailedResponse.getDefaultInstance());
506
      responseObserver.onCompleted();
507
    } catch (StatusRuntimeException e) {
508
      handleStatusRuntimeException(e, responseObserver);
509
    }
510
  }
511

512
  private Context.CancellableContext deadlineCtx(Deadline deadline) {
513
    return Context.current().withDeadline(deadline, this.backgroundScheduler);
514
  }
515

516
  @Override
517
  public void pollActivityTaskQueue(
518
      PollActivityTaskQueueRequest pollRequest,
519
      StreamObserver<PollActivityTaskQueueResponse> responseObserver) {
520
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
521

522
      PollActivityTaskQueueResponse.Builder task;
523
      try {
524
        task = pollTaskQueue(ctx, store.pollActivityTaskQueue(pollRequest));
525
      } catch (ExecutionException e) {
526
        responseObserver.onError(e);
527
        return;
528
      } catch (InterruptedException e) {
529
        Thread.currentThread().interrupt();
530
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
531
        responseObserver.onCompleted();
532
        return;
533
      } catch (CancellationException e) {
534
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
535
        responseObserver.onCompleted();
536
        return;
537
      }
538

539
      ExecutionId executionId =
540
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
541
      TestWorkflowMutableState mutableState = getMutableState(executionId);
542
      try {
543
        mutableState.startActivityTask(task, pollRequest);
544
        responseObserver.onNext(task.build());
545
        responseObserver.onCompleted();
546
      } catch (StatusRuntimeException e) {
547
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
548
          if (log.isDebugEnabled()) {
549
            log.debug("Skipping outdated activity task for " + executionId, e);
550
          }
551
          responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
552
          responseObserver.onCompleted();
553
        } else {
554
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
555
            log.error("unexpected", e);
556
          }
557
          responseObserver.onError(e);
558
        }
559
      }
560
    }
561
  }
562

563
  @Override
564
  public void recordActivityTaskHeartbeat(
565
      RecordActivityTaskHeartbeatRequest heartbeatRequest,
566
      StreamObserver<RecordActivityTaskHeartbeatResponse> responseObserver) {
567
    try {
568
      ActivityTaskToken activityTaskToken =
569
          ActivityTaskToken.fromBytes(heartbeatRequest.getTaskToken());
570
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
571
      boolean cancelRequested =
572
          mutableState.heartbeatActivityTask(
573
              activityTaskToken.getScheduledEventId(), heartbeatRequest.getDetails());
574
      responseObserver.onNext(
575
          RecordActivityTaskHeartbeatResponse.newBuilder()
576
              .setCancelRequested(cancelRequested)
577
              .build());
578
      responseObserver.onCompleted();
579
    } catch (StatusRuntimeException e) {
580
      handleStatusRuntimeException(e, responseObserver);
581
    }
582
  }
583

584
  @Override
585
  public void recordActivityTaskHeartbeatById(
586
      RecordActivityTaskHeartbeatByIdRequest heartbeatRequest,
587
      StreamObserver<RecordActivityTaskHeartbeatByIdResponse> responseObserver) {
588
    try {
589
      ExecutionId execution =
590
          new ExecutionId(
591
              heartbeatRequest.getNamespace(),
592
              heartbeatRequest.getWorkflowId(),
593
              heartbeatRequest.getRunId());
594
      TestWorkflowMutableState mutableState = getMutableState(execution);
595
      boolean cancelRequested =
596
          mutableState.heartbeatActivityTaskById(
597
              heartbeatRequest.getActivityId(),
598
              heartbeatRequest.getDetails(),
599
              heartbeatRequest.getIdentity());
600
      responseObserver.onNext(
601
          RecordActivityTaskHeartbeatByIdResponse.newBuilder()
602
              .setCancelRequested(cancelRequested)
603
              .build());
604
      responseObserver.onCompleted();
605
    } catch (StatusRuntimeException e) {
606
      handleStatusRuntimeException(e, responseObserver);
607
    }
608
  }
609

610
  @Override
611
  public void respondActivityTaskCompleted(
612
      RespondActivityTaskCompletedRequest completeRequest,
613
      StreamObserver<RespondActivityTaskCompletedResponse> responseObserver) {
614
    try {
615
      ActivityTaskToken activityTaskToken =
616
          ActivityTaskToken.fromBytes(completeRequest.getTaskToken());
617
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
618
      mutableState.completeActivityTask(activityTaskToken.getScheduledEventId(), completeRequest);
619
      responseObserver.onNext(RespondActivityTaskCompletedResponse.getDefaultInstance());
620
      responseObserver.onCompleted();
621
    } catch (StatusRuntimeException e) {
622
      handleStatusRuntimeException(e, responseObserver);
623
    }
624
  }
625

626
  @Override
627
  public void respondActivityTaskCompletedById(
628
      RespondActivityTaskCompletedByIdRequest completeRequest,
629
      StreamObserver<RespondActivityTaskCompletedByIdResponse> responseObserver) {
630
    try {
631
      ExecutionId executionId =
632
          new ExecutionId(
633
              completeRequest.getNamespace(),
634
              completeRequest.getWorkflowId(),
635
              completeRequest.getRunId());
636
      TestWorkflowMutableState mutableState = getMutableState(executionId);
637
      mutableState.completeActivityTaskById(completeRequest.getActivityId(), completeRequest);
638
      responseObserver.onNext(RespondActivityTaskCompletedByIdResponse.getDefaultInstance());
639
      responseObserver.onCompleted();
640
    } catch (StatusRuntimeException e) {
641
      handleStatusRuntimeException(e, responseObserver);
642
    }
643
  }
644

645
  @Override
646
  public void respondActivityTaskFailed(
647
      RespondActivityTaskFailedRequest failRequest,
648
      StreamObserver<RespondActivityTaskFailedResponse> responseObserver) {
649
    try {
650
      ActivityTaskToken activityTaskToken = ActivityTaskToken.fromBytes(failRequest.getTaskToken());
651
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
652
      mutableState.failActivityTask(activityTaskToken.getScheduledEventId(), failRequest);
653
      responseObserver.onNext(RespondActivityTaskFailedResponse.getDefaultInstance());
654
      responseObserver.onCompleted();
655
    } catch (StatusRuntimeException e) {
656
      handleStatusRuntimeException(e, responseObserver);
657
    }
658
  }
659

660
  @Override
661
  public void respondActivityTaskFailedById(
662
      RespondActivityTaskFailedByIdRequest failRequest,
663
      StreamObserver<RespondActivityTaskFailedByIdResponse> responseObserver) {
664
    try {
665
      ExecutionId executionId =
666
          new ExecutionId(
667
              failRequest.getNamespace(), failRequest.getWorkflowId(), failRequest.getRunId());
668
      TestWorkflowMutableState mutableState = getMutableState(executionId);
669
      mutableState.failActivityTaskById(failRequest.getActivityId(), failRequest);
670
      responseObserver.onNext(RespondActivityTaskFailedByIdResponse.getDefaultInstance());
671
      responseObserver.onCompleted();
672
    } catch (StatusRuntimeException e) {
673
      handleStatusRuntimeException(e, responseObserver);
674
    }
675
  }
676

677
  @Override
678
  public void respondActivityTaskCanceled(
679
      RespondActivityTaskCanceledRequest canceledRequest,
680
      StreamObserver<RespondActivityTaskCanceledResponse> responseObserver) {
681
    try {
682
      ActivityTaskToken activityTaskToken =
683
          ActivityTaskToken.fromBytes(canceledRequest.getTaskToken());
684
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
685
      mutableState.cancelActivityTask(activityTaskToken.getScheduledEventId(), canceledRequest);
686
      responseObserver.onNext(RespondActivityTaskCanceledResponse.getDefaultInstance());
687
      responseObserver.onCompleted();
688
    } catch (StatusRuntimeException e) {
689
      handleStatusRuntimeException(e, responseObserver);
690
    }
691
  }
692

693
  @Override
694
  public void respondActivityTaskCanceledById(
695
      RespondActivityTaskCanceledByIdRequest canceledRequest,
696
      StreamObserver<RespondActivityTaskCanceledByIdResponse> responseObserver) {
697
    try {
698
      ExecutionId executionId =
699
          new ExecutionId(
700
              canceledRequest.getNamespace(),
701
              canceledRequest.getWorkflowId(),
702
              canceledRequest.getRunId());
703
      TestWorkflowMutableState mutableState = getMutableState(executionId);
704
      mutableState.cancelActivityTaskById(canceledRequest.getActivityId(), canceledRequest);
705
      responseObserver.onNext(RespondActivityTaskCanceledByIdResponse.getDefaultInstance());
706
      responseObserver.onCompleted();
707
    } catch (StatusRuntimeException e) {
708
      handleStatusRuntimeException(e, responseObserver);
709
    }
710
  }
711

712
  @Override
713
  public void requestCancelWorkflowExecution(
714
      RequestCancelWorkflowExecutionRequest cancelRequest,
715
      StreamObserver<RequestCancelWorkflowExecutionResponse> responseObserver) {
716
    try {
717
      requestCancelWorkflowExecution(cancelRequest, Optional.empty());
718
      responseObserver.onNext(RequestCancelWorkflowExecutionResponse.getDefaultInstance());
719
      responseObserver.onCompleted();
720
    } catch (StatusRuntimeException e) {
721
      handleStatusRuntimeException(e, responseObserver);
722
    }
723
  }
724

725
  void requestCancelWorkflowExecution(
726
      RequestCancelWorkflowExecutionRequest cancelRequest,
727
      Optional<TestWorkflowMutableStateImpl.CancelExternalWorkflowExecutionCallerInfo> callerInfo) {
728
    ExecutionId executionId =
729
        new ExecutionId(cancelRequest.getNamespace(), cancelRequest.getWorkflowExecution());
730
    TestWorkflowMutableState mutableState = getMutableState(executionId);
731
    mutableState.requestCancelWorkflowExecution(cancelRequest, callerInfo);
732
  }
733

734
  @Override
735
  public void terminateWorkflowExecution(
736
      TerminateWorkflowExecutionRequest request,
737
      StreamObserver<TerminateWorkflowExecutionResponse> responseObserver) {
738
    try {
739
      terminateWorkflowExecution(request);
740
      responseObserver.onNext(TerminateWorkflowExecutionResponse.getDefaultInstance());
741
      responseObserver.onCompleted();
742
    } catch (StatusRuntimeException e) {
743
      handleStatusRuntimeException(e, responseObserver);
744
    }
745
  }
746

747
  private void terminateWorkflowExecution(TerminateWorkflowExecutionRequest request) {
748
    ExecutionId executionId =
749
        new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
750
    TestWorkflowMutableState mutableState = getMutableState(executionId);
751
    mutableState.terminateWorkflowExecution(request);
752
  }
753

754
  @Override
755
  public void signalWorkflowExecution(
756
      SignalWorkflowExecutionRequest signalRequest,
757
      StreamObserver<SignalWorkflowExecutionResponse> responseObserver) {
758
    try {
759
      ExecutionId executionId =
760
          new ExecutionId(signalRequest.getNamespace(), signalRequest.getWorkflowExecution());
761
      TestWorkflowMutableState mutableState = getMutableState(executionId);
762
      mutableState.signal(signalRequest);
763
      responseObserver.onNext(SignalWorkflowExecutionResponse.getDefaultInstance());
764
      responseObserver.onCompleted();
765
    } catch (StatusRuntimeException e) {
766
      handleStatusRuntimeException(e, responseObserver);
767
    }
768
  }
769

770
  @Override
771
  public void updateWorkflowExecution(
772
      UpdateWorkflowExecutionRequest request,
773
      StreamObserver<UpdateWorkflowExecutionResponse> responseObserver) {
774
    try {
775
      ExecutionId executionId =
776
          new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
777
      TestWorkflowMutableState mutableState = getMutableState(executionId);
778
      @Nullable Deadline deadline = Context.current().getDeadline();
779
      UpdateWorkflowExecutionResponse response =
780
          mutableState.updateWorkflowExecution(request, deadline);
781
      responseObserver.onNext(response);
782
      responseObserver.onCompleted();
783
    } catch (StatusRuntimeException e) {
784
      handleStatusRuntimeException(e, responseObserver);
785
    }
786
  }
787

788
  @Override
789
  public void pollWorkflowExecutionUpdate(
790
      PollWorkflowExecutionUpdateRequest request,
791
      StreamObserver<PollWorkflowExecutionUpdateResponse> responseObserver) {
792
    try {
793
      ExecutionId executionId =
794
          new ExecutionId(request.getNamespace(), request.getUpdateRef().getWorkflowExecution());
795
      TestWorkflowMutableState mutableState = getMutableState(executionId);
796
      @Nullable Deadline deadline = Context.current().getDeadline();
797
      PollWorkflowExecutionUpdateResponse response =
798
          mutableState.pollUpdateWorkflowExecution(request, deadline);
799
      responseObserver.onNext(response);
800
      responseObserver.onCompleted();
801
    } catch (StatusRuntimeException e) {
802
      handleStatusRuntimeException(e, responseObserver);
803
    }
804
  }
805

806
  @Override
807
  public void signalWithStartWorkflowExecution(
808
      SignalWithStartWorkflowExecutionRequest r,
809
      StreamObserver<SignalWithStartWorkflowExecutionResponse> responseObserver) {
810
    try {
811
      if (!r.hasTaskQueue()) {
812
        throw Status.INVALID_ARGUMENT
813
            .withDescription("request missing required taskQueue field")
814
            .asRuntimeException();
815
      }
816
      if (!r.hasWorkflowType()) {
817
        throw Status.INVALID_ARGUMENT
818
            .withDescription("request missing required workflowType field")
819
            .asRuntimeException();
820
      }
821
      ExecutionId executionId = new ExecutionId(r.getNamespace(), r.getWorkflowId(), null);
822
      TestWorkflowMutableState mutableState = getMutableState(executionId, false);
823
      SignalWorkflowExecutionRequest signalRequest =
824
          SignalWorkflowExecutionRequest.newBuilder()
825
              .setInput(r.getSignalInput())
826
              .setSignalName(r.getSignalName())
827
              .setWorkflowExecution(executionId.getExecution())
828
              .setRequestId(r.getRequestId())
829
              .setControl(r.getControl())
830
              .setNamespace(r.getNamespace())
831
              .setIdentity(r.getIdentity())
832
              .build();
833
      if (mutableState != null && !mutableState.isTerminalState()) {
834
        mutableState.signal(signalRequest);
835
        responseObserver.onNext(
836
            SignalWithStartWorkflowExecutionResponse.newBuilder()
837
                .setRunId(mutableState.getExecutionId().getExecution().getRunId())
838
                .build());
839
        responseObserver.onCompleted();
840
        return;
841
      }
842
      StartWorkflowExecutionRequest.Builder startRequest =
843
          StartWorkflowExecutionRequest.newBuilder()
844
              .setRequestId(r.getRequestId())
845
              .setInput(r.getInput())
846
              .setWorkflowExecutionTimeout(r.getWorkflowExecutionTimeout())
847
              .setWorkflowRunTimeout(r.getWorkflowRunTimeout())
848
              .setWorkflowTaskTimeout(r.getWorkflowTaskTimeout())
849
              .setNamespace(r.getNamespace())
850
              .setTaskQueue(r.getTaskQueue())
851
              .setWorkflowId(r.getWorkflowId())
852
              .setWorkflowIdReusePolicy(r.getWorkflowIdReusePolicy())
853
              .setIdentity(r.getIdentity())
854
              .setWorkflowType(r.getWorkflowType())
855
              .setCronSchedule(r.getCronSchedule())
856
              .setRequestId(r.getRequestId());
857
      if (r.hasRetryPolicy()) {
858
        startRequest.setRetryPolicy(r.getRetryPolicy());
859
      }
860
      if (r.hasHeader()) {
861
        startRequest.setHeader(r.getHeader());
862
      }
863
      if (r.hasMemo()) {
864
        startRequest.setMemo(r.getMemo());
865
      }
866
      if (r.hasSearchAttributes()) {
867
        startRequest.setSearchAttributes(r.getSearchAttributes());
868
      }
869
      StartWorkflowExecutionResponse startResult =
870
          startWorkflowExecutionImpl(
871
              startRequest.build(),
872
              Duration.ZERO,
873
              Optional.empty(),
874
              OptionalLong.empty(),
875
              signalRequest);
876
      responseObserver.onNext(
877
          SignalWithStartWorkflowExecutionResponse.newBuilder()
878
              .setRunId(startResult.getRunId())
879
              .build());
880
      responseObserver.onCompleted();
881
    } catch (StatusRuntimeException e) {
882
      handleStatusRuntimeException(e, responseObserver);
883
    }
884
  }
885

886
  public void signalExternalWorkflowExecution(
887
      String signalId,
888
      SignalExternalWorkflowExecutionCommandAttributes commandAttributes,
889
      TestWorkflowMutableState source) {
890
    String namespace;
891
    if (commandAttributes.getNamespace().isEmpty()) {
892
      namespace = source.getExecutionId().getNamespace();
893
    } else {
894
      namespace = commandAttributes.getNamespace();
895
    }
896
    ExecutionId executionId = new ExecutionId(namespace, commandAttributes.getExecution());
897
    TestWorkflowMutableState mutableState;
898
    try {
899
      mutableState = getMutableState(executionId);
900
      mutableState.signalFromWorkflow(commandAttributes);
901
      source.completeSignalExternalWorkflowExecution(
902
          signalId, mutableState.getExecutionId().getExecution().getRunId());
903
    } catch (StatusRuntimeException e) {
904
      if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
905
        source.failSignalExternalWorkflowExecution(
906
            signalId,
907
            SignalExternalWorkflowExecutionFailedCause
908
                .SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED_CAUSE_EXTERNAL_WORKFLOW_EXECUTION_NOT_FOUND);
909
      } else {
910
        throw e;
911
      }
912
    }
913
  }
914

915
  /**
916
   * Creates next run of a workflow execution
917
   *
918
   * @return RunId
919
   */
920
  public String continueAsNew(
921
      StartWorkflowExecutionRequest previousRunStartRequest,
922
      WorkflowExecutionContinuedAsNewEventAttributes a,
923
      Optional<TestServiceRetryState> retryState,
924
      String identity,
925
      ExecutionId continuedExecutionId,
926
      String firstExecutionRunId,
927
      Optional<TestWorkflowMutableState> parent,
928
      OptionalLong parentChildInitiatedEventId) {
929
    StartWorkflowExecutionRequest.Builder startRequestBuilder =
930
        StartWorkflowExecutionRequest.newBuilder()
931
            .setRequestId(UUID.randomUUID().toString())
932
            .setWorkflowType(a.getWorkflowType())
933
            .setWorkflowRunTimeout(a.getWorkflowRunTimeout())
934
            .setWorkflowTaskTimeout(a.getWorkflowTaskTimeout())
935
            .setNamespace(continuedExecutionId.getNamespace())
936
            .setTaskQueue(a.getTaskQueue())
937
            .setWorkflowId(continuedExecutionId.getWorkflowId().getWorkflowId())
938
            .setWorkflowIdReusePolicy(previousRunStartRequest.getWorkflowIdReusePolicy())
939
            .setIdentity(identity)
940
            .setCronSchedule(previousRunStartRequest.getCronSchedule());
941
    if (previousRunStartRequest.hasRetryPolicy()) {
942
      startRequestBuilder.setRetryPolicy(previousRunStartRequest.getRetryPolicy());
943
    }
944
    if (a.hasInput()) {
945
      startRequestBuilder.setInput(a.getInput());
946
    }
947
    if (a.hasHeader()) {
948
      startRequestBuilder.setHeader(a.getHeader());
949
    }
950
    StartWorkflowExecutionRequest startRequest = startRequestBuilder.build();
951
    lock.lock();
952
    Optional<Failure> lastFail =
953
        a.hasFailure()
954
            ? Optional.of(a.getFailure())
955
            : retryState.flatMap(TestServiceRetryState::getPreviousRunFailure);
956
    try {
957
      StartWorkflowExecutionResponse response =
958
          startWorkflowExecutionNoRunningCheckLocked(
959
              startRequest,
960
              a.getNewExecutionRunId(),
961
              firstExecutionRunId,
962
              Optional.of(continuedExecutionId.getExecution().getRunId()),
963
              retryState,
964
              ProtobufTimeUtils.toJavaDuration(a.getBackoffStartInterval()),
965
              a.getLastCompletionResult(),
966
              lastFail,
967
              parent,
968
              parentChildInitiatedEventId,
969
              null,
970
              continuedExecutionId.getWorkflowId());
971
      return response.getRunId();
972
    } finally {
973
      lock.unlock();
974
    }
975
  }
976

977
  @Override
978
  public void listOpenWorkflowExecutions(
979
      ListOpenWorkflowExecutionsRequest listRequest,
980
      StreamObserver<ListOpenWorkflowExecutionsResponse> responseObserver) {
981
    try {
982
      Optional<String> workflowIdFilter;
983
      if (listRequest.hasExecutionFilter()
984
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
985
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
986
      } else {
987
        workflowIdFilter = Optional.empty();
988
      }
989
      List<WorkflowExecutionInfo> result =
990
          store.listWorkflows(WorkflowState.OPEN, workflowIdFilter);
991
      responseObserver.onNext(
992
          ListOpenWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
993
      responseObserver.onCompleted();
994
    } catch (StatusRuntimeException e) {
995
      handleStatusRuntimeException(e, responseObserver);
996
    }
997
  }
998

999
  @Override
1000
  public void listClosedWorkflowExecutions(
1001
      ListClosedWorkflowExecutionsRequest listRequest,
1002
      StreamObserver<ListClosedWorkflowExecutionsResponse> responseObserver) {
1003
    try {
1004
      Optional<String> workflowIdFilter;
1005
      if (listRequest.hasExecutionFilter()
1006
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
1007
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
1008
      } else {
1009
        workflowIdFilter = Optional.empty();
1010
      }
1011
      List<WorkflowExecutionInfo> result =
1012
          store.listWorkflows(WorkflowState.CLOSED, workflowIdFilter);
1013
      responseObserver.onNext(
1014
          ListClosedWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1015
      responseObserver.onCompleted();
1016
    } catch (StatusRuntimeException e) {
1017
      handleStatusRuntimeException(e, responseObserver);
1018
    }
1019
  }
1020

1021
  @Override
1022
  public void respondQueryTaskCompleted(
1023
      RespondQueryTaskCompletedRequest completeRequest,
1024
      StreamObserver<RespondQueryTaskCompletedResponse> responseObserver) {
1025
    try {
1026
      QueryId queryId = QueryId.fromBytes(completeRequest.getTaskToken());
1027
      TestWorkflowMutableState mutableState = getMutableState(queryId.getExecutionId());
1028
      mutableState.completeQuery(queryId, completeRequest);
1029
      responseObserver.onNext(RespondQueryTaskCompletedResponse.getDefaultInstance());
1030
      responseObserver.onCompleted();
1031
    } catch (StatusRuntimeException e) {
1032
      handleStatusRuntimeException(e, responseObserver);
1033
    }
1034
  }
1035

1036
  @Override
1037
  public void queryWorkflow(
1038
      QueryWorkflowRequest queryRequest, StreamObserver<QueryWorkflowResponse> responseObserver) {
1039
    try {
1040
      ExecutionId executionId =
1041
          new ExecutionId(queryRequest.getNamespace(), queryRequest.getExecution());
1042
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1043
      @Nullable Deadline deadline = Context.current().getDeadline();
1044
      QueryWorkflowResponse result =
1045
          mutableState.query(
1046
              queryRequest,
1047
              deadline != null ? deadline.timeRemaining(TimeUnit.MILLISECONDS) : Long.MAX_VALUE);
1048
      responseObserver.onNext(result);
1049
      responseObserver.onCompleted();
1050
    } catch (StatusRuntimeException e) {
1051
      handleStatusRuntimeException(e, responseObserver);
1052
    }
1053
  }
1054

1055
  @Override
1056
  public void describeWorkflowExecution(
1057
      DescribeWorkflowExecutionRequest request,
1058
      StreamObserver<io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse>
1059
          responseObserver) {
1060
    try {
1061
      if (request.getNamespace().isEmpty()) {
1062
        throw createInvalidArgument("Namespace not set on request.");
1063
      }
1064
      if (!request.hasExecution()) {
1065
        throw createInvalidArgument("Execution not set on request.");
1066
      }
1067

1068
      ExecutionId executionId = new ExecutionId(request.getNamespace(), request.getExecution());
1069
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1070
      DescribeWorkflowExecutionResponse result = mutableState.describeWorkflowExecution();
1071
      responseObserver.onNext(result);
1072
      responseObserver.onCompleted();
1073
    } catch (StatusRuntimeException e) {
1074
      handleStatusRuntimeException(e, responseObserver);
1075
    }
1076
  }
1077

1078
  /**
1079
   * This method doesn't make much sense for test server, it accepts all namespaces as existent and
1080
   * registered. so, it's a trivial implementation just returning an info that a namespace is
1081
   * registered irrespectively of the input
1082
   */
1083
  @Override
1084
  public void describeNamespace(
1085
      DescribeNamespaceRequest request,
1086
      StreamObserver<DescribeNamespaceResponse> responseObserver) {
1087
    try {
1088
      if (request.getNamespace().isEmpty()) {
1089
        throw createInvalidArgument("Namespace not set on request.");
1090
      }
1091
      // generating a stable UUID for name
1092
      String namespaceId = UUID.nameUUIDFromBytes(request.getNamespace().getBytes()).toString();
1093
      DescribeNamespaceResponse result =
1094
          DescribeNamespaceResponse.newBuilder()
1095
              .setNamespaceInfo(
1096
                  NamespaceInfo.newBuilder()
1097
                      .setName(request.getNamespace())
1098
                      .setState(NamespaceState.NAMESPACE_STATE_REGISTERED)
1099
                      .setId(namespaceId)
1100
                      .build())
1101
              .build();
1102
      responseObserver.onNext(result);
1103
      responseObserver.onCompleted();
1104
    } catch (StatusRuntimeException e) {
1105
      handleStatusRuntimeException(e, responseObserver);
1106
    }
1107
  }
1108

1109
  private <R> R requireNotNull(String fieldName, R value) {
1110
    if (value == null) {
1111
      throw Status.INVALID_ARGUMENT
1112
          .withDescription("Missing required field \"" + fieldName + "\".")
1113
          .asRuntimeException();
1114
    }
1115
    return value;
1116
  }
1117

1118
  /**
1119
   * Adds diagnostic data about internal service state to the provided {@link StringBuilder}.
1120
   * Includes histories of all workflow instances stored in the service.
1121
   */
1122
  public void getDiagnostics(StringBuilder result) {
1123
    store.getDiagnostics(result);
1124
  }
1125

1126
  /**
1127
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1128
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#getCurrentTime(Empty)}
1129
   */
1130
  @Deprecated
1131
  public long currentTimeMillis() {
1132
    return selfAdvancingTimer.getClock().getAsLong();
1133
  }
1134

1135
  /** Invokes callback after the specified delay according to internal service clock. */
1136
  public void registerDelayedCallback(Duration delay, Runnable r) {
1137
    store.registerDelayedCallback(delay, r);
1138
  }
1139

1140
  /**
1141
   * Disables time skipping. To re-enable call {@link #unlockTimeSkipping(String)}. These calls are
1142
   * counted, so calling unlock does not guarantee that time is going to be skipped immediately as
1143
   * another lock can be holding it.
1144
   *
1145
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1146
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#lockTimeSkipping(LockTimeSkippingRequest)}
1147
   */
1148
  @Deprecated
1149
  public void lockTimeSkipping(String caller) {
1150
    selfAdvancingTimer.lockTimeSkipping(caller);
1151
  }
1152

1153
  /**
1154
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1155
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkipping(UnlockTimeSkippingRequest)}
1156
   */
1157
  @Deprecated
1158
  public void unlockTimeSkipping(String caller) {
1159
    selfAdvancingTimer.unlockTimeSkipping(caller);
1160
  }
1161

1162
  /**
1163
   * Unlocks time skipping and blocks the calling thread until internal clock passes the current +
1164
   * duration time.<br>
1165
   * When the time is reached, locks time skipping and returns.<br>
1166
   * Might not block at all due to time skipping. Or might block if the time skipping lock counter
1167
   * was more than 1.
1168
   *
1169
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1170
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkippingWithSleep(SleepRequest)}
1171
   */
1172
  @Deprecated
1173
  public void sleep(Duration duration) {
1174
    CompletableFuture<Void> result = new CompletableFuture<>();
1175
    selfAdvancingTimer.schedule(
1176
        duration,
1177
        () -> {
1178
          selfAdvancingTimer.lockTimeSkipping("TestWorkflowService sleep");
1179
          result.complete(null);
1180
        },
1181
        "workflow sleep");
1182
    selfAdvancingTimer.unlockTimeSkipping("TestWorkflowService sleep");
1183
    try {
1184
      result.get();
1185
    } catch (InterruptedException e) {
1186
      Thread.currentThread().interrupt();
1187
      throw new RuntimeException(e);
1188
    } catch (ExecutionException e) {
1189
      throw new RuntimeException(e);
1190
    }
1191
  }
1192

1193
  /**
1194
   * Temporal server times out task queue long poll calls after 1 minute and returns an empty
1195
   * result. After which the request has to be retried by the client if it wants to continue
1196
   * waiting. We emulate this behavior here.
1197
   *
1198
   * <p>If there is a deadline present, for task queue poll requests server will respond inside the
1199
   * deadline. Note that the latest is not applicable for getWorkflowExecutionHistory() long polls.
1200
   *
1201
   * @return minimum between the context deadline and maximum long poll deadline.
1202
   */
1203
  private Deadline getLongPollDeadline() {
1204
    @Nullable Deadline deadline = Context.current().getDeadline();
1205
    Deadline maximumDeadline =
1206
        Deadline.after(
1207
            WorkflowServiceStubsOptions.DEFAULT_SERVER_LONG_POLL_RPC_TIMEOUT.toMillis(),
1208
            TimeUnit.MILLISECONDS);
1209
    return deadline != null ? deadline.minimum(maximumDeadline) : maximumDeadline;
1210
  }
1211

1212
  private void handleStatusRuntimeException(
1213
      StatusRuntimeException e, StreamObserver<?> responseObserver) {
1214
    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1215
      log.error("unexpected", e);
1216
    }
1217
    responseObserver.onError(e);
1218
  }
1219

1220
  /**
1221
   * Creates an in-memory service along with client stubs for use in Java code. See also
1222
   * createServerOnly and createWithNoGrpcServer.
1223
   *
1224
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean)} instead and
1225
   *     pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1226
   */
1227
  @Deprecated
1228
  public TestWorkflowService() {
1229
    this(0, true);
1230
  }
1231

1232
  /**
1233
   * Creates an in-memory service along with client stubs for use in Java code. See also
1234
   * createServerOnly and createWithNoGrpcServer.
1235
   *
1236
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean, long)} instead
1237
   *     and pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1238
   */
1239
  @Deprecated
1240
  public TestWorkflowService(long initialTimeMillis) {
1241
    this(initialTimeMillis, true);
1242
  }
1243

1244
  /**
1245
   * Creates an in-memory service along with client stubs for use in Java code. See also
1246
   * createServerOnly and createWithNoGrpcServer.
1247
   *
1248
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean)} instead
1249
   */
1250
  @Deprecated
1251
  public TestWorkflowService(boolean lockTimeSkipping) {
1252
    this(0, true);
1253
    if (lockTimeSkipping) {
1254
      this.lockTimeSkipping("constructor");
1255
    }
1256
  }
1257

1258
  /**
1259
   * Creates an instance of TestWorkflowService that does not manage its own gRPC server. Useful for
1260
   * including in an externally managed gRPC server.
1261
   *
1262
   * @deprecated use {@link TestServicesStarter} to create just the services with gRPC server
1263
   */
1264
  @Deprecated
1265
  public static TestWorkflowService createWithNoGrpcServer() {
1266
    return new TestWorkflowService(0, false);
1267
  }
1268

1269
  private TestWorkflowService(long initialTimeMillis, boolean startInProcessServer) {
1270
    this.selfAdvancingTimer =
1271
        new SelfAdvancingTimerImpl(initialTimeMillis, Clock.systemDefaultZone());
1272
    store = new TestWorkflowStoreImpl(this.selfAdvancingTimer);
1273
    visibilityStore = new TestVisibilityStoreImpl();
1274
    outOfProcessServer = null;
1275
    if (startInProcessServer) {
1276
      this.inProcessServer = new InProcessGRPCServer(Collections.singletonList(this));
1277
      this.workflowServiceStubs =
1278
          WorkflowServiceStubs.newServiceStubs(
1279
              WorkflowServiceStubsOptions.newBuilder()
1280
                  .setChannel(inProcessServer.getChannel())
1281
                  .build());
1282
    } else {
1283
      this.inProcessServer = null;
1284
      this.workflowServiceStubs = null;
1285
    }
1286
  }
1287

1288
  /**
1289
   * Creates an out-of-process rather than in-process server, and does not set up a client. Useful,
1290
   * for example, if you want to use the test service from other SDKs.
1291
   *
1292
   * @param port the port to listen on
1293
   * @deprecated use {@link io.temporal.testserver.TestServer#createPortBoundServer(int, boolean)}
1294
   *     instead and pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1295
   */
1296
  @Deprecated
1297
  public static TestWorkflowService createServerOnly(int port) {
1298
    TestWorkflowService result = new TestWorkflowService(true, port);
1299
    log.info("Server started, listening on " + port);
1300
    return result;
1301
  }
1302

1303
  private TestWorkflowService(boolean isOutOfProc, int port) {
1304
    // isOutOfProc is just here to make unambiguous constructor overloading.
1305
    Preconditions.checkState(isOutOfProc, "Impossible.");
1306
    inProcessServer = null;
1307
    workflowServiceStubs = null;
1308
    this.selfAdvancingTimer = new SelfAdvancingTimerImpl(0, Clock.systemDefaultZone());
1309
    store = new TestWorkflowStoreImpl(selfAdvancingTimer);
1310
    visibilityStore = new TestVisibilityStoreImpl();
1311
    try {
1312
      ServerBuilder<?> serverBuilder =
1313
          Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create());
1314
      GRPCServerHelper.registerServicesAndHealthChecks(
1315
          Collections.singletonList(this), serverBuilder);
1316
      outOfProcessServer = serverBuilder.build().start();
1317
    } catch (IOException e) {
1318
      throw new RuntimeException(e);
1319
    }
1320
  }
1321

1322
  @Deprecated
1323
  public WorkflowServiceStubs newClientStub() {
1324
    if (workflowServiceStubs == null) {
1325
      throw new RuntimeException(
1326
          "Cannot get a client when you created your TestWorkflowService with createServerOnly.");
1327
    }
1328
    return workflowServiceStubs;
1329
  }
1330

1331
  private static StatusRuntimeException createInvalidArgument(String description) {
1332
    throw Status.INVALID_ARGUMENT.withDescription(description).asRuntimeException();
1333
  }
1334
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc