• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #293

02 Aug 2024 06:38AM UTC coverage: 77.618% (+0.05%) from 77.571%
#293

push

github

web-flow
Add getCurrentUpdateInfo (#2158)

Add getCurrentUpdateInfo

33 of 34 new or added lines in 6 files covered. (97.06%)

8 existing lines in 2 files now uncovered.

19749 of 25444 relevant lines covered (77.62%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.77
/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import static io.temporal.internal.testservice.CronUtils.getBackoffInterval;
24

25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Throwables;
27
import com.google.protobuf.ByteString;
28
import com.google.protobuf.Empty;
29
import com.google.protobuf.Timestamp;
30
import com.google.protobuf.util.Timestamps;
31
import io.grpc.*;
32
import io.grpc.stub.StreamObserver;
33
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
34
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
35
import io.temporal.api.common.v1.Payload;
36
import io.temporal.api.common.v1.Payloads;
37
import io.temporal.api.common.v1.RetryPolicy;
38
import io.temporal.api.common.v1.WorkflowExecution;
39
import io.temporal.api.enums.v1.*;
40
import io.temporal.api.errordetails.v1.WorkflowExecutionAlreadyStartedFailure;
41
import io.temporal.api.failure.v1.Failure;
42
import io.temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes;
43
import io.temporal.api.namespace.v1.NamespaceInfo;
44
import io.temporal.api.testservice.v1.LockTimeSkippingRequest;
45
import io.temporal.api.testservice.v1.SleepRequest;
46
import io.temporal.api.testservice.v1.UnlockTimeSkippingRequest;
47
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
48
import io.temporal.api.workflowservice.v1.*;
49
import io.temporal.internal.common.ProtobufTimeUtils;
50
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowState;
51
import io.temporal.serviceclient.StatusUtils;
52
import io.temporal.serviceclient.WorkflowServiceStubs;
53
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
54
import java.io.Closeable;
55
import java.io.IOException;
56
import java.time.Clock;
57
import java.time.Duration;
58
import java.util.*;
59
import java.util.concurrent.*;
60
import java.util.concurrent.locks.Lock;
61
import java.util.concurrent.locks.ReentrantLock;
62
import javax.annotation.Nonnull;
63
import javax.annotation.Nullable;
64
import org.slf4j.Logger;
65
import org.slf4j.LoggerFactory;
66

67
/**
68
 * In memory implementation of the Workflow Service. To be used for testing purposes only.
69
 *
70
 * <p>Do not use directly, instead use {@link io.temporal.testing.TestWorkflowEnvironment}.
71
 */
72
public final class TestWorkflowService extends WorkflowServiceGrpc.WorkflowServiceImplBase
73
    implements Closeable {
74
  private static final Logger log = LoggerFactory.getLogger(TestWorkflowService.class);
1✔
75
  private final Map<ExecutionId, TestWorkflowMutableState> executions = new HashMap<>();
1✔
76
  // key->WorkflowId
77
  private final Map<WorkflowId, TestWorkflowMutableState> executionsByWorkflowId = new HashMap<>();
1✔
78
  private final ExecutorService executor = Executors.newCachedThreadPool();
1✔
79
  private final Lock lock = new ReentrantLock();
1✔
80

81
  private final TestWorkflowStore store;
82
  private final TestVisibilityStore visibilityStore;
83
  private final SelfAdvancingTimer selfAdvancingTimer;
84

85
  private final ScheduledExecutorService backgroundScheduler =
1✔
86
      Executors.newSingleThreadScheduledExecutor();
1✔
87

88
  private final Server outOfProcessServer;
89
  private final InProcessGRPCServer inProcessServer;
90
  private final WorkflowServiceStubs workflowServiceStubs;
91

92
  TestWorkflowService(
93
      TestWorkflowStore store,
94
      TestVisibilityStore visibilityStore,
95
      SelfAdvancingTimer selfAdvancingTimer) {
1✔
96
    this.store = store;
1✔
97
    this.visibilityStore = visibilityStore;
1✔
98
    this.selfAdvancingTimer = selfAdvancingTimer;
1✔
99
    this.outOfProcessServer = null;
1✔
100
    this.inProcessServer = null;
1✔
101
    this.workflowServiceStubs = null;
1✔
102
  }
1✔
103

104
  @Override
105
  public void close() {
106
    log.debug("Shutting down TestWorkflowService");
1✔
107

108
    log.debug("Shutting down background scheduler");
1✔
109
    backgroundScheduler.shutdown();
1✔
110

111
    if (outOfProcessServer != null) {
1✔
112
      log.info("Shutting down out-of-process GRPC server");
×
113
      outOfProcessServer.shutdown();
×
114
    }
115

116
    if (workflowServiceStubs != null) {
1✔
117
      workflowServiceStubs.shutdown();
×
118
    }
119

120
    if (inProcessServer != null) {
1✔
121
      log.info("Shutting down in-process GRPC server");
×
122
      inProcessServer.shutdown();
×
123
    }
124

125
    executor.shutdown();
1✔
126

127
    try {
128
      executor.awaitTermination(1, TimeUnit.SECONDS);
1✔
129

130
      if (outOfProcessServer != null) {
1✔
131
        outOfProcessServer.awaitTermination(1, TimeUnit.SECONDS);
×
132
      }
133

134
      if (workflowServiceStubs != null) {
1✔
135
        workflowServiceStubs.awaitTermination(1, TimeUnit.SECONDS);
×
136
      }
137

138
      if (inProcessServer != null) {
1✔
139
        inProcessServer.awaitTermination(1, TimeUnit.SECONDS);
×
140
      }
141

142
    } catch (InterruptedException e) {
×
143
      Thread.currentThread().interrupt();
×
144
      log.debug("shutdown interrupted", e);
×
145
    }
1✔
146

147
    store.close();
1✔
148
  }
1✔
149

150
  private TestWorkflowMutableState getMutableState(ExecutionId executionId) {
151
    return getMutableState(executionId, true);
1✔
152
  }
153

154
  private TestWorkflowMutableState getMutableState(ExecutionId executionId, boolean failNotExists) {
155
    lock.lock();
1✔
156
    try {
157
      if (executionId.getExecution().getRunId().isEmpty()) {
1✔
158
        return getMutableState(executionId.getWorkflowId(), failNotExists);
1✔
159
      }
160
      TestWorkflowMutableState mutableState = executions.get(executionId);
1✔
161
      if (mutableState == null && failNotExists) {
1✔
162
        throw Status.NOT_FOUND
1✔
163
            .withDescription(
1✔
164
                "Execution \""
165
                    + executionId
166
                    + "\" not found in mutable state. Known executions: "
167
                    + executions.values()
1✔
168
                    + ", service="
169
                    + this)
170
            .asRuntimeException();
1✔
171
      }
172
      return mutableState;
1✔
173
    } finally {
174
      lock.unlock();
1✔
175
    }
176
  }
177

178
  private TestWorkflowMutableState getMutableState(WorkflowId workflowId, boolean failNotExists) {
179
    lock.lock();
1✔
180
    try {
181
      TestWorkflowMutableState mutableState = executionsByWorkflowId.get(workflowId);
1✔
182
      if (mutableState == null && failNotExists) {
1✔
183
        throw Status.NOT_FOUND
1✔
184
            .withDescription("Execution not found in mutable state: " + workflowId)
1✔
185
            .asRuntimeException();
1✔
186
      }
187
      return mutableState;
1✔
188
    } finally {
189
      lock.unlock();
1✔
190
    }
191
  }
192

193
  @Override
194
  public void startWorkflowExecution(
195
      StartWorkflowExecutionRequest request,
196
      StreamObserver<StartWorkflowExecutionResponse> responseObserver) {
197
    try {
198
      Duration backoffInterval = getBackoffInterval(request.getCronSchedule(), store.currentTime());
1✔
199
      StartWorkflowExecutionResponse response =
1✔
200
          startWorkflowExecutionImpl(
1✔
201
              request, backoffInterval, Optional.empty(), OptionalLong.empty(), null);
1✔
202
      responseObserver.onNext(response);
1✔
203
      responseObserver.onCompleted();
1✔
204
    } catch (StatusRuntimeException e) {
1✔
205
      handleStatusRuntimeException(e, responseObserver);
1✔
206
    }
1✔
207
  }
1✔
208

209
  StartWorkflowExecutionResponse startWorkflowExecutionImpl(
210
      StartWorkflowExecutionRequest startRequest,
211
      Duration backoffStartInterval,
212
      Optional<TestWorkflowMutableState> parent,
213
      OptionalLong parentChildInitiatedEventId,
214
      @Nullable SignalWorkflowExecutionRequest signalWithStartSignal) {
215
    String requestWorkflowId = requireNotNull("WorkflowId", startRequest.getWorkflowId());
1✔
216
    String namespace = requireNotNull("Namespace", startRequest.getNamespace());
1✔
217
    WorkflowId workflowId = new WorkflowId(namespace, requestWorkflowId);
1✔
218
    WorkflowIdReusePolicy reusePolicy = startRequest.getWorkflowIdReusePolicy();
1✔
219
    WorkflowIdConflictPolicy conflictPolicy = startRequest.getWorkflowIdConflictPolicy();
1✔
220
    if (conflictPolicy != WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED
1✔
221
        && reusePolicy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
222
      throw createInvalidArgument(
×
223
          "Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy.");
224
    }
225

226
    TestWorkflowMutableState existing;
227
    lock.lock();
1✔
228
    try {
229
      String newRunId = UUID.randomUUID().toString();
1✔
230
      existing = executionsByWorkflowId.get(workflowId);
1✔
231
      if (existing != null) {
1✔
232
        WorkflowExecutionStatus status = existing.getWorkflowExecutionStatus();
1✔
233

234
        if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
235
            && (reusePolicy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING
236
                || conflictPolicy
237
                    == WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING)) {
238
          existing.terminateWorkflowExecution(
1✔
239
              TerminateWorkflowExecutionRequest.newBuilder()
1✔
240
                  .setNamespace(startRequest.getNamespace())
1✔
241
                  .setWorkflowExecution(existing.getExecutionId().getExecution())
1✔
242
                  .setReason("TerminateIfRunning WorkflowIdReusePolicy Policy")
1✔
243
                  .setIdentity("history-service")
1✔
244
                  .setDetails(
1✔
245
                      Payloads.newBuilder()
1✔
246
                          .addPayloads(
1✔
247
                              Payload.newBuilder()
1✔
248
                                  .setData(
1✔
249
                                      ByteString.copyFromUtf8(
1✔
250
                                          String.format("terminated by new runID: %s", newRunId)))
1✔
251
                                  .build())
1✔
252
                          .build())
1✔
253
                  .build());
1✔
254
        } else if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
255
            && conflictPolicy
256
                == WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING) {
257
          return StartWorkflowExecutionResponse.newBuilder()
1✔
258
              .setStarted(false)
1✔
259
              .setRunId(existing.getExecutionId().getExecution().getRunId())
1✔
260
              .build();
1✔
261
        } else if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
262
            || reusePolicy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE) {
263
          return throwDuplicatedWorkflow(startRequest, existing);
×
264
        } else if (reusePolicy
1✔
265
                == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
266
            && (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED
267
                || status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW)) {
268
          return throwDuplicatedWorkflow(startRequest, existing);
×
269
        }
270
      }
271
      Optional<TestServiceRetryState> retryState;
272
      Optional<Failure> lastFailure = Optional.empty();
1✔
273
      if (startRequest.hasRetryPolicy()) {
1✔
274
        Duration expirationInterval =
1✔
275
            ProtobufTimeUtils.toJavaDuration(startRequest.getWorkflowExecutionTimeout());
1✔
276
        retryState = newRetryStateLocked(startRequest.getRetryPolicy(), expirationInterval);
1✔
277
        if (retryState.isPresent()) {
1✔
278
          lastFailure = retryState.get().getPreviousRunFailure();
1✔
279
        }
280
      } else {
1✔
281
        retryState = Optional.empty();
1✔
282
      }
283
      return startWorkflowExecutionNoRunningCheckLocked(
1✔
284
          startRequest,
285
          newRunId,
286
          // it's the first execution in the continue-as-new chain, so firstExecutionRunId =
287
          // newRunId
288
          newRunId,
289
          Optional.empty(),
1✔
290
          retryState,
291
          backoffStartInterval,
292
          null,
293
          lastFailure,
294
          parent,
295
          parentChildInitiatedEventId,
296
          signalWithStartSignal,
297
          workflowId);
298
    } finally {
299
      lock.unlock();
1✔
300
    }
301
  }
302

303
  private Optional<TestServiceRetryState> newRetryStateLocked(
304
      RetryPolicy retryPolicy, Duration expirationInterval) {
305
    Timestamp expirationTime =
306
        expirationInterval.isZero()
1✔
307
            ? Timestamps.fromNanos(0)
1✔
308
            : Timestamps.add(
1✔
309
                store.currentTime(), ProtobufTimeUtils.toProtoDuration(expirationInterval));
1✔
310
    return Optional.of(new TestServiceRetryState(retryPolicy, expirationTime));
1✔
311
  }
312

313
  private StartWorkflowExecutionResponse throwDuplicatedWorkflow(
314
      StartWorkflowExecutionRequest startRequest, TestWorkflowMutableState existing) {
315
    WorkflowExecution execution = existing.getExecutionId().getExecution();
1✔
316
    WorkflowExecutionAlreadyStartedFailure error =
317
        WorkflowExecutionAlreadyStartedFailure.newBuilder()
1✔
318
            .setRunId(execution.getRunId())
1✔
319
            .setStartRequestId(startRequest.getRequestId())
1✔
320
            .build();
1✔
321
    throw StatusUtils.newException(
1✔
322
        Status.ALREADY_EXISTS.withDescription(
1✔
323
            String.format(
1✔
324
                "WorkflowId: %s, " + "RunId: %s", execution.getWorkflowId(), execution.getRunId())),
1✔
325
        error,
326
        WorkflowExecutionAlreadyStartedFailure.getDescriptor());
1✔
327
  }
328

329
  private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheckLocked(
330
      StartWorkflowExecutionRequest startRequest,
331
      @Nonnull String runId,
332
      @Nonnull String firstExecutionRunId,
333
      Optional<String> continuedExecutionRunId,
334
      Optional<TestServiceRetryState> retryState,
335
      Duration backoffStartInterval,
336
      Payloads lastCompletionResult,
337
      Optional<Failure> lastFailure,
338
      Optional<TestWorkflowMutableState> parent,
339
      OptionalLong parentChildInitiatedEventId,
340
      @Nullable SignalWorkflowExecutionRequest signalWithStartSignal,
341
      WorkflowId workflowId) {
342
    String namespace = startRequest.getNamespace();
1✔
343
    TestWorkflowMutableState mutableState =
1✔
344
        new TestWorkflowMutableStateImpl(
345
            startRequest,
346
            firstExecutionRunId,
347
            runId,
348
            retryState,
349
            backoffStartInterval,
350
            lastCompletionResult,
351
            lastFailure,
352
            parent,
353
            parentChildInitiatedEventId,
354
            continuedExecutionRunId,
355
            this,
356
            store,
357
            visibilityStore,
358
            selfAdvancingTimer);
359
    WorkflowExecution execution = mutableState.getExecutionId().getExecution();
1✔
360
    ExecutionId executionId = new ExecutionId(namespace, execution);
1✔
361
    executionsByWorkflowId.put(workflowId, mutableState);
1✔
362
    executions.put(executionId, mutableState);
1✔
363

364
    PollWorkflowTaskQueueRequest eagerWorkflowTaskPollRequest =
365
        startRequest.getRequestEagerExecution()
1✔
366
            ? PollWorkflowTaskQueueRequest.newBuilder()
1✔
367
                .setIdentity(startRequest.getIdentity())
1✔
368
                .setNamespace(startRequest.getNamespace())
1✔
369
                .setTaskQueue(startRequest.getTaskQueue())
1✔
370
                .build()
1✔
371
            : null;
1✔
372

373
    @Nullable
374
    PollWorkflowTaskQueueResponse eagerWorkflowTask =
1✔
375
        mutableState.startWorkflow(
1✔
376
            continuedExecutionRunId.isPresent(),
1✔
377
            signalWithStartSignal,
378
            eagerWorkflowTaskPollRequest);
379
    StartWorkflowExecutionResponse.Builder response =
380
        StartWorkflowExecutionResponse.newBuilder().setRunId(execution.getRunId()).setStarted(true);
1✔
381
    if (eagerWorkflowTask != null) {
1✔
382
      response.setEagerWorkflowTask(eagerWorkflowTask);
1✔
383
    }
384
    return response.build();
1✔
385
  }
386

387
  @Override
388
  public void getWorkflowExecutionHistory(
389
      GetWorkflowExecutionHistoryRequest getRequest,
390
      StreamObserver<GetWorkflowExecutionHistoryResponse> responseObserver) {
391
    ExecutionId executionId = new ExecutionId(getRequest.getNamespace(), getRequest.getExecution());
1✔
392
    executor.execute(
1✔
393
        // preserving gRPC context deadline between threads
394
        Context.current()
1✔
395
            .wrap(
1✔
396
                () -> {
397
                  try {
398
                    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
399
                    responseObserver.onNext(
1✔
400
                        store.getWorkflowExecutionHistory(
1✔
401
                            mutableState.getExecutionId(),
1✔
402
                            getRequest,
403
                            // We explicitly don't try to respond inside the context deadline.
404
                            // If we try to fit into the context deadline, the deadline may be not
405
                            // expired on the client side and an empty response will lead to a new
406
                            // request, making the client hammer the server at the tail end of the
407
                            // deadline.
408
                            // So this call is designed to wait fully till the end of the
409
                            // context deadline and throw DEADLINE_EXCEEDED if the deadline is less
410
                            // than 20s.
411
                            // If it's longer than 20 seconds - we return an empty result.
412
                            Deadline.after(20, TimeUnit.SECONDS)));
1✔
413
                    responseObserver.onCompleted();
1✔
414
                  } catch (StatusRuntimeException e) {
1✔
415
                    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1✔
416
                      log.error("unexpected", e);
×
417
                    }
418
                    responseObserver.onError(e);
1✔
419
                  } catch (Exception e) {
×
420
                    log.error("unexpected", e);
×
421
                    responseObserver.onError(e);
×
422
                  }
1✔
423
                }));
1✔
424
  }
1✔
425

426
  private <T> T pollTaskQueue(Context ctx, Future<T> futureValue)
427
      throws ExecutionException, InterruptedException {
428
    final Context.CancellationListener canceler = context -> futureValue.cancel(true);
1✔
429
    ctx.addListener(canceler, this.backgroundScheduler);
1✔
430
    try {
431
      return futureValue.get();
1✔
432
    } finally {
433
      ctx.removeListener(canceler);
1✔
434
    }
435
  }
436

437
  @Override
438
  public void pollWorkflowTaskQueue(
439
      PollWorkflowTaskQueueRequest pollRequest,
440
      StreamObserver<PollWorkflowTaskQueueResponse> responseObserver) {
441
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
442
      PollWorkflowTaskQueueResponse.Builder task;
443
      try {
444
        task = pollTaskQueue(ctx, store.pollWorkflowTaskQueue(pollRequest));
1✔
445
      } catch (ExecutionException e) {
×
446
        responseObserver.onError(e);
×
447
        return;
×
448
      } catch (InterruptedException e) {
×
449
        Thread.currentThread().interrupt();
×
450
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
×
451
        responseObserver.onCompleted();
×
452
        return;
×
453
      } catch (CancellationException e) {
1✔
454
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
1✔
455
        responseObserver.onCompleted();
1✔
456
        return;
1✔
457
      }
1✔
458

459
      ExecutionId executionId =
1✔
460
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
1✔
461
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
462
      try {
463
        mutableState.startWorkflowTask(task, pollRequest);
1✔
464
        // The task always has the original task queue that was created as part of the response.
465
        // This may be a different task queue than the task queue it was scheduled on, as in the
466
        // case of sticky execution.
467
        task.setWorkflowExecutionTaskQueue(mutableState.getStartRequest().getTaskQueue());
1✔
468
        PollWorkflowTaskQueueResponse response = task.build();
1✔
469
        responseObserver.onNext(response);
1✔
470
        responseObserver.onCompleted();
1✔
UNCOV
471
      } catch (StatusRuntimeException e) {
×
UNCOV
472
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
×
UNCOV
473
          if (log.isDebugEnabled()) {
×
474
            log.debug("Skipping outdated workflow task for " + executionId, e);
×
475
          }
476
          // The real service doesn't return this call on outdated task.
477
          // For simplicity, we return an empty result here.
UNCOV
478
          responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
×
UNCOV
479
          responseObserver.onCompleted();
×
480
        } else {
481
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
×
482
            log.error("unexpected", e);
×
483
          }
484
          responseObserver.onError(e);
×
485
        }
486
      }
1✔
487
    }
1✔
488
  }
1✔
489

490
  @Override
491
  public void respondWorkflowTaskCompleted(
492
      RespondWorkflowTaskCompletedRequest request,
493
      StreamObserver<RespondWorkflowTaskCompletedResponse> responseObserver) {
494
    try {
495
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(request.getTaskToken());
1✔
496
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
497
      mutableState.completeWorkflowTask(taskToken.getHistorySize(), request);
1✔
498
      responseObserver.onNext(RespondWorkflowTaskCompletedResponse.getDefaultInstance());
1✔
499
      responseObserver.onCompleted();
1✔
500
    } catch (StatusRuntimeException e) {
1✔
501
      handleStatusRuntimeException(e, responseObserver);
1✔
502
    } catch (Throwable e) {
×
503
      responseObserver.onError(
×
504
          Status.INTERNAL
505
              .withDescription(Throwables.getStackTraceAsString(e))
×
506
              .withCause(e)
×
507
              .asRuntimeException());
×
508
    }
1✔
509
  }
1✔
510

511
  @Override
512
  public void respondWorkflowTaskFailed(
513
      RespondWorkflowTaskFailedRequest failedRequest,
514
      StreamObserver<RespondWorkflowTaskFailedResponse> responseObserver) {
515
    try {
516
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(failedRequest.getTaskToken());
1✔
517
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
518
      mutableState.failWorkflowTask(failedRequest);
1✔
519
      responseObserver.onNext(RespondWorkflowTaskFailedResponse.getDefaultInstance());
1✔
520
      responseObserver.onCompleted();
1✔
521
    } catch (StatusRuntimeException e) {
×
522
      handleStatusRuntimeException(e, responseObserver);
×
523
    }
1✔
524
  }
1✔
525

526
  @Override
527
  public void getSystemInfo(
528
      GetSystemInfoRequest request, StreamObserver<GetSystemInfoResponse> responseObserver) {
529
    responseObserver.onNext(
1✔
530
        GetSystemInfoResponse.newBuilder()
1✔
531
            .setCapabilities(
1✔
532
                // These are the capabilities I could verify the test server supports
533
                GetSystemInfoResponse.Capabilities.newBuilder()
1✔
534
                    .setSdkMetadata(true)
1✔
535
                    .setSignalAndQueryHeader(true)
1✔
536
                    .setEncodedFailureAttributes(true)
1✔
537
                    .setEagerWorkflowStart(true)
1✔
538
                    .build())
1✔
539
            .build());
1✔
540
    responseObserver.onCompleted();
1✔
541
  }
1✔
542

543
  private Context.CancellableContext deadlineCtx(Deadline deadline) {
544
    return Context.current().withDeadline(deadline, this.backgroundScheduler);
1✔
545
  }
546

547
  @Override
548
  public void pollActivityTaskQueue(
549
      PollActivityTaskQueueRequest pollRequest,
550
      StreamObserver<PollActivityTaskQueueResponse> responseObserver) {
551
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
552

553
      PollActivityTaskQueueResponse.Builder task;
554
      try {
555
        task = pollTaskQueue(ctx, store.pollActivityTaskQueue(pollRequest));
1✔
556
      } catch (ExecutionException e) {
×
557
        responseObserver.onError(e);
×
558
        return;
×
559
      } catch (InterruptedException e) {
×
560
        Thread.currentThread().interrupt();
×
561
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
×
562
        responseObserver.onCompleted();
×
563
        return;
×
564
      } catch (CancellationException e) {
1✔
565
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
1✔
566
        responseObserver.onCompleted();
1✔
567
        return;
1✔
568
      }
1✔
569

570
      ExecutionId executionId =
1✔
571
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
1✔
572
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
573
      try {
574
        mutableState.startActivityTask(task, pollRequest);
1✔
575
        responseObserver.onNext(task.build());
1✔
576
        responseObserver.onCompleted();
1✔
577
      } catch (StatusRuntimeException e) {
1✔
578
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
579
          if (log.isDebugEnabled()) {
1✔
580
            log.debug("Skipping outdated activity task for " + executionId, e);
×
581
          }
582
          responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
1✔
583
          responseObserver.onCompleted();
1✔
584
        } else {
585
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
×
586
            log.error("unexpected", e);
×
587
          }
588
          responseObserver.onError(e);
×
589
        }
590
      }
1✔
591
    }
1✔
592
  }
1✔
593

594
  @Override
595
  public void recordActivityTaskHeartbeat(
596
      RecordActivityTaskHeartbeatRequest heartbeatRequest,
597
      StreamObserver<RecordActivityTaskHeartbeatResponse> responseObserver) {
598
    try {
599
      ActivityTaskToken activityTaskToken =
1✔
600
          ActivityTaskToken.fromBytes(heartbeatRequest.getTaskToken());
1✔
601
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
602
      boolean cancelRequested =
1✔
603
          mutableState.heartbeatActivityTask(
1✔
604
              activityTaskToken.getScheduledEventId(), heartbeatRequest.getDetails());
1✔
605
      responseObserver.onNext(
1✔
606
          RecordActivityTaskHeartbeatResponse.newBuilder()
1✔
607
              .setCancelRequested(cancelRequested)
1✔
608
              .build());
1✔
609
      responseObserver.onCompleted();
1✔
610
    } catch (StatusRuntimeException e) {
1✔
611
      handleStatusRuntimeException(e, responseObserver);
1✔
612
    }
1✔
613
  }
1✔
614

615
  @Override
616
  public void recordActivityTaskHeartbeatById(
617
      RecordActivityTaskHeartbeatByIdRequest heartbeatRequest,
618
      StreamObserver<RecordActivityTaskHeartbeatByIdResponse> responseObserver) {
619
    try {
620
      ExecutionId execution =
×
621
          new ExecutionId(
622
              heartbeatRequest.getNamespace(),
×
623
              heartbeatRequest.getWorkflowId(),
×
624
              heartbeatRequest.getRunId());
×
625
      TestWorkflowMutableState mutableState = getMutableState(execution);
×
626
      boolean cancelRequested =
×
627
          mutableState.heartbeatActivityTaskById(
×
628
              heartbeatRequest.getActivityId(),
×
629
              heartbeatRequest.getDetails(),
×
630
              heartbeatRequest.getIdentity());
×
631
      responseObserver.onNext(
×
632
          RecordActivityTaskHeartbeatByIdResponse.newBuilder()
×
633
              .setCancelRequested(cancelRequested)
×
634
              .build());
×
635
      responseObserver.onCompleted();
×
636
    } catch (StatusRuntimeException e) {
×
637
      handleStatusRuntimeException(e, responseObserver);
×
638
    }
×
639
  }
×
640

641
  @Override
642
  public void respondActivityTaskCompleted(
643
      RespondActivityTaskCompletedRequest completeRequest,
644
      StreamObserver<RespondActivityTaskCompletedResponse> responseObserver) {
645
    try {
646
      ActivityTaskToken activityTaskToken =
1✔
647
          ActivityTaskToken.fromBytes(completeRequest.getTaskToken());
1✔
648
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
649
      mutableState.completeActivityTask(activityTaskToken.getScheduledEventId(), completeRequest);
1✔
650
      responseObserver.onNext(RespondActivityTaskCompletedResponse.getDefaultInstance());
1✔
651
      responseObserver.onCompleted();
1✔
652
    } catch (StatusRuntimeException e) {
1✔
653
      handleStatusRuntimeException(e, responseObserver);
1✔
654
    }
1✔
655
  }
1✔
656

657
  @Override
658
  public void respondActivityTaskCompletedById(
659
      RespondActivityTaskCompletedByIdRequest completeRequest,
660
      StreamObserver<RespondActivityTaskCompletedByIdResponse> responseObserver) {
661
    try {
662
      ExecutionId executionId =
×
663
          new ExecutionId(
664
              completeRequest.getNamespace(),
×
665
              completeRequest.getWorkflowId(),
×
666
              completeRequest.getRunId());
×
667
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
668
      mutableState.completeActivityTaskById(completeRequest.getActivityId(), completeRequest);
×
669
      responseObserver.onNext(RespondActivityTaskCompletedByIdResponse.getDefaultInstance());
×
670
      responseObserver.onCompleted();
×
671
    } catch (StatusRuntimeException e) {
×
672
      handleStatusRuntimeException(e, responseObserver);
×
673
    }
×
674
  }
×
675

676
  @Override
677
  public void respondActivityTaskFailed(
678
      RespondActivityTaskFailedRequest failRequest,
679
      StreamObserver<RespondActivityTaskFailedResponse> responseObserver) {
680
    try {
681
      ActivityTaskToken activityTaskToken = ActivityTaskToken.fromBytes(failRequest.getTaskToken());
1✔
682
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
683
      mutableState.failActivityTask(activityTaskToken.getScheduledEventId(), failRequest);
1✔
684
      responseObserver.onNext(RespondActivityTaskFailedResponse.getDefaultInstance());
1✔
685
      responseObserver.onCompleted();
1✔
686
    } catch (StatusRuntimeException e) {
1✔
687
      handleStatusRuntimeException(e, responseObserver);
1✔
688
    }
1✔
689
  }
1✔
690

691
  @Override
692
  public void respondActivityTaskFailedById(
693
      RespondActivityTaskFailedByIdRequest failRequest,
694
      StreamObserver<RespondActivityTaskFailedByIdResponse> responseObserver) {
695
    try {
696
      ExecutionId executionId =
×
697
          new ExecutionId(
698
              failRequest.getNamespace(), failRequest.getWorkflowId(), failRequest.getRunId());
×
699
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
700
      mutableState.failActivityTaskById(failRequest.getActivityId(), failRequest);
×
701
      responseObserver.onNext(RespondActivityTaskFailedByIdResponse.getDefaultInstance());
×
702
      responseObserver.onCompleted();
×
703
    } catch (StatusRuntimeException e) {
×
704
      handleStatusRuntimeException(e, responseObserver);
×
705
    }
×
706
  }
×
707

708
  @Override
709
  public void respondActivityTaskCanceled(
710
      RespondActivityTaskCanceledRequest canceledRequest,
711
      StreamObserver<RespondActivityTaskCanceledResponse> responseObserver) {
712
    try {
713
      ActivityTaskToken activityTaskToken =
1✔
714
          ActivityTaskToken.fromBytes(canceledRequest.getTaskToken());
1✔
715
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
716
      mutableState.cancelActivityTask(activityTaskToken.getScheduledEventId(), canceledRequest);
1✔
717
      responseObserver.onNext(RespondActivityTaskCanceledResponse.getDefaultInstance());
1✔
718
      responseObserver.onCompleted();
1✔
719
    } catch (StatusRuntimeException e) {
×
720
      handleStatusRuntimeException(e, responseObserver);
×
721
    }
1✔
722
  }
1✔
723

724
  @Override
725
  public void respondActivityTaskCanceledById(
726
      RespondActivityTaskCanceledByIdRequest canceledRequest,
727
      StreamObserver<RespondActivityTaskCanceledByIdResponse> responseObserver) {
728
    try {
729
      ExecutionId executionId =
×
730
          new ExecutionId(
731
              canceledRequest.getNamespace(),
×
732
              canceledRequest.getWorkflowId(),
×
733
              canceledRequest.getRunId());
×
734
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
735
      mutableState.cancelActivityTaskById(canceledRequest.getActivityId(), canceledRequest);
×
736
      responseObserver.onNext(RespondActivityTaskCanceledByIdResponse.getDefaultInstance());
×
737
      responseObserver.onCompleted();
×
738
    } catch (StatusRuntimeException e) {
×
739
      handleStatusRuntimeException(e, responseObserver);
×
740
    }
×
741
  }
×
742

743
  @Override
744
  public void requestCancelWorkflowExecution(
745
      RequestCancelWorkflowExecutionRequest cancelRequest,
746
      StreamObserver<RequestCancelWorkflowExecutionResponse> responseObserver) {
747
    try {
748
      requestCancelWorkflowExecution(cancelRequest, Optional.empty());
1✔
749
      responseObserver.onNext(RequestCancelWorkflowExecutionResponse.getDefaultInstance());
1✔
750
      responseObserver.onCompleted();
1✔
751
    } catch (StatusRuntimeException e) {
1✔
752
      handleStatusRuntimeException(e, responseObserver);
1✔
753
    }
1✔
754
  }
1✔
755

756
  void requestCancelWorkflowExecution(
757
      RequestCancelWorkflowExecutionRequest cancelRequest,
758
      Optional<TestWorkflowMutableStateImpl.CancelExternalWorkflowExecutionCallerInfo> callerInfo) {
759
    ExecutionId executionId =
1✔
760
        new ExecutionId(cancelRequest.getNamespace(), cancelRequest.getWorkflowExecution());
1✔
761
    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
762
    mutableState.requestCancelWorkflowExecution(cancelRequest, callerInfo);
1✔
763
  }
1✔
764

765
  @Override
766
  public void terminateWorkflowExecution(
767
      TerminateWorkflowExecutionRequest request,
768
      StreamObserver<TerminateWorkflowExecutionResponse> responseObserver) {
769
    try {
770
      terminateWorkflowExecution(request);
1✔
771
      responseObserver.onNext(TerminateWorkflowExecutionResponse.getDefaultInstance());
1✔
772
      responseObserver.onCompleted();
1✔
773
    } catch (StatusRuntimeException e) {
1✔
774
      handleStatusRuntimeException(e, responseObserver);
1✔
775
    }
1✔
776
  }
1✔
777

778
  private void terminateWorkflowExecution(TerminateWorkflowExecutionRequest request) {
779
    ExecutionId executionId =
1✔
780
        new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
1✔
781
    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
782
    mutableState.terminateWorkflowExecution(request);
1✔
783
  }
1✔
784

785
  @Override
786
  public void signalWorkflowExecution(
787
      SignalWorkflowExecutionRequest signalRequest,
788
      StreamObserver<SignalWorkflowExecutionResponse> responseObserver) {
789
    try {
790
      ExecutionId executionId =
1✔
791
          new ExecutionId(signalRequest.getNamespace(), signalRequest.getWorkflowExecution());
1✔
792
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
793
      mutableState.signal(signalRequest);
1✔
794
      responseObserver.onNext(SignalWorkflowExecutionResponse.getDefaultInstance());
1✔
795
      responseObserver.onCompleted();
1✔
796
    } catch (StatusRuntimeException e) {
1✔
797
      handleStatusRuntimeException(e, responseObserver);
1✔
798
    }
1✔
799
  }
1✔
800

801
  @Override
802
  public void updateWorkflowExecution(
803
      UpdateWorkflowExecutionRequest request,
804
      StreamObserver<UpdateWorkflowExecutionResponse> responseObserver) {
805
    try (Context.CancellableContext ctx = deadlineCtx(getUpdatePollDeadline())) {
1✔
806
      Context toRestore = ctx.attach();
1✔
807
      try {
808
        ExecutionId executionId =
1✔
809
            new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
1✔
810
        TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
811
        @Nullable Deadline deadline = Context.current().getDeadline();
1✔
812
        UpdateWorkflowExecutionResponse response =
1✔
813
            mutableState.updateWorkflowExecution(request, deadline);
1✔
814
        responseObserver.onNext(response);
1✔
815
        responseObserver.onCompleted();
1✔
816
      } catch (StatusRuntimeException e) {
1✔
817
        handleStatusRuntimeException(e, responseObserver);
1✔
818
      } finally {
819
        ctx.detach(toRestore);
1✔
820
      }
821
    }
822
  }
1✔
823

824
  @Override
825
  public void pollWorkflowExecutionUpdate(
826
      PollWorkflowExecutionUpdateRequest request,
827
      StreamObserver<PollWorkflowExecutionUpdateResponse> responseObserver) {
828
    try (Context.CancellableContext ctx = deadlineCtx(getUpdatePollDeadline())) {
1✔
829
      Context toRestore = ctx.attach();
1✔
830
      try {
831
        ExecutionId executionId =
1✔
832
            new ExecutionId(request.getNamespace(), request.getUpdateRef().getWorkflowExecution());
1✔
833
        TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
834
        @Nullable Deadline deadline = Context.current().getDeadline();
1✔
835
        PollWorkflowExecutionUpdateResponse response =
1✔
836
            mutableState.pollUpdateWorkflowExecution(request, deadline);
1✔
837
        responseObserver.onNext(response);
1✔
838
        responseObserver.onCompleted();
1✔
839
      } catch (StatusRuntimeException e) {
1✔
840
        handleStatusRuntimeException(e, responseObserver);
1✔
841
      } finally {
842
        ctx.detach(toRestore);
1✔
843
      }
844
    }
845
  }
1✔
846

847
  @Override
848
  public void signalWithStartWorkflowExecution(
849
      SignalWithStartWorkflowExecutionRequest r,
850
      StreamObserver<SignalWithStartWorkflowExecutionResponse> responseObserver) {
851
    try {
852
      if (!r.hasTaskQueue()) {
1✔
853
        throw Status.INVALID_ARGUMENT
×
854
            .withDescription("request missing required taskQueue field")
×
855
            .asRuntimeException();
×
856
      }
857
      if (!r.hasWorkflowType()) {
1✔
858
        throw Status.INVALID_ARGUMENT
×
859
            .withDescription("request missing required workflowType field")
×
860
            .asRuntimeException();
×
861
      }
862
      ExecutionId executionId = new ExecutionId(r.getNamespace(), r.getWorkflowId(), null);
1✔
863
      TestWorkflowMutableState mutableState = getMutableState(executionId, false);
1✔
864
      SignalWorkflowExecutionRequest signalRequest =
865
          SignalWorkflowExecutionRequest.newBuilder()
1✔
866
              .setInput(r.getSignalInput())
1✔
867
              .setSignalName(r.getSignalName())
1✔
868
              .setWorkflowExecution(executionId.getExecution())
1✔
869
              .setRequestId(r.getRequestId())
1✔
870
              .setControl(r.getControl())
1✔
871
              .setNamespace(r.getNamespace())
1✔
872
              .setIdentity(r.getIdentity())
1✔
873
              .build();
1✔
874
      if (mutableState != null && !mutableState.isTerminalState()) {
1✔
875
        mutableState.signal(signalRequest);
1✔
876
        responseObserver.onNext(
1✔
877
            SignalWithStartWorkflowExecutionResponse.newBuilder()
1✔
878
                .setRunId(mutableState.getExecutionId().getExecution().getRunId())
1✔
879
                .build());
1✔
880
        responseObserver.onCompleted();
1✔
881
        return;
1✔
882
      }
883
      StartWorkflowExecutionRequest.Builder startRequest =
884
          StartWorkflowExecutionRequest.newBuilder()
1✔
885
              .setRequestId(r.getRequestId())
1✔
886
              .setInput(r.getInput())
1✔
887
              .setWorkflowExecutionTimeout(r.getWorkflowExecutionTimeout())
1✔
888
              .setWorkflowRunTimeout(r.getWorkflowRunTimeout())
1✔
889
              .setWorkflowTaskTimeout(r.getWorkflowTaskTimeout())
1✔
890
              .setNamespace(r.getNamespace())
1✔
891
              .setTaskQueue(r.getTaskQueue())
1✔
892
              .setWorkflowId(r.getWorkflowId())
1✔
893
              .setWorkflowIdReusePolicy(r.getWorkflowIdReusePolicy())
1✔
894
              .setIdentity(r.getIdentity())
1✔
895
              .setWorkflowType(r.getWorkflowType())
1✔
896
              .setCronSchedule(r.getCronSchedule())
1✔
897
              .setRequestId(r.getRequestId());
1✔
898
      if (r.hasRetryPolicy()) {
1✔
899
        startRequest.setRetryPolicy(r.getRetryPolicy());
×
900
      }
901
      if (r.hasHeader()) {
1✔
902
        startRequest.setHeader(r.getHeader());
1✔
903
      }
904
      if (r.hasMemo()) {
1✔
905
        startRequest.setMemo(r.getMemo());
×
906
      }
907
      if (r.hasSearchAttributes()) {
1✔
908
        startRequest.setSearchAttributes(r.getSearchAttributes());
1✔
909
      }
910
      if (r.hasWorkflowStartDelay()) {
1✔
911
        startRequest.setWorkflowStartDelay(r.getWorkflowStartDelay());
1✔
912
      }
913

914
      StartWorkflowExecutionResponse startResult =
1✔
915
          startWorkflowExecutionImpl(
1✔
916
              startRequest.build(),
1✔
917
              Duration.ZERO,
918
              Optional.empty(),
1✔
919
              OptionalLong.empty(),
1✔
920
              signalRequest);
921
      responseObserver.onNext(
1✔
922
          SignalWithStartWorkflowExecutionResponse.newBuilder()
1✔
923
              .setRunId(startResult.getRunId())
1✔
924
              .build());
1✔
925
      responseObserver.onCompleted();
1✔
926
    } catch (StatusRuntimeException e) {
1✔
927
      handleStatusRuntimeException(e, responseObserver);
1✔
928
    }
1✔
929
  }
1✔
930

931
  public void signalExternalWorkflowExecution(
932
      String signalId,
933
      SignalExternalWorkflowExecutionCommandAttributes commandAttributes,
934
      TestWorkflowMutableState source) {
935
    String namespace;
936
    if (commandAttributes.getNamespace().isEmpty()) {
1✔
937
      namespace = source.getExecutionId().getNamespace();
1✔
938
    } else {
939
      namespace = commandAttributes.getNamespace();
×
940
    }
941
    ExecutionId executionId = new ExecutionId(namespace, commandAttributes.getExecution());
1✔
942
    TestWorkflowMutableState mutableState;
943
    try {
944
      mutableState = getMutableState(executionId);
1✔
945
      mutableState.signalFromWorkflow(commandAttributes);
1✔
946
      source.completeSignalExternalWorkflowExecution(
1✔
947
          signalId, mutableState.getExecutionId().getExecution().getRunId());
1✔
948
    } catch (StatusRuntimeException e) {
1✔
949
      if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
950
        source.failSignalExternalWorkflowExecution(
1✔
951
            signalId,
952
            SignalExternalWorkflowExecutionFailedCause
953
                .SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED_CAUSE_EXTERNAL_WORKFLOW_EXECUTION_NOT_FOUND);
954
      } else {
955
        throw e;
×
956
      }
957
    }
1✔
958
  }
1✔
959

960
  /**
961
   * Creates next run of a workflow execution
962
   *
963
   * @return RunId
964
   */
965
  public String continueAsNew(
966
      StartWorkflowExecutionRequest previousRunStartRequest,
967
      ContinueAsNewWorkflowExecutionCommandAttributes ca,
968
      WorkflowExecutionContinuedAsNewEventAttributes ea,
969
      Optional<TestServiceRetryState> retryState,
970
      String identity,
971
      ExecutionId continuedExecutionId,
972
      String firstExecutionRunId,
973
      Optional<TestWorkflowMutableState> parent,
974
      OptionalLong parentChildInitiatedEventId) {
975
    StartWorkflowExecutionRequest.Builder startRequestBuilder =
976
        StartWorkflowExecutionRequest.newBuilder()
1✔
977
            .setRequestId(UUID.randomUUID().toString())
1✔
978
            .setWorkflowType(ea.getWorkflowType())
1✔
979
            .setWorkflowRunTimeout(ea.getWorkflowRunTimeout())
1✔
980
            .setWorkflowTaskTimeout(ea.getWorkflowTaskTimeout())
1✔
981
            .setNamespace(continuedExecutionId.getNamespace())
1✔
982
            .setTaskQueue(ea.getTaskQueue())
1✔
983
            .setWorkflowId(continuedExecutionId.getWorkflowId().getWorkflowId())
1✔
984
            .setWorkflowIdReusePolicy(previousRunStartRequest.getWorkflowIdReusePolicy())
1✔
985
            .setIdentity(identity)
1✔
986
            .setCronSchedule(previousRunStartRequest.getCronSchedule());
1✔
987
    // TODO: Service doesn't perform this copy.
988
    // See https://github.com/temporalio/temporal/issues/5249
989
    //    if (previousRunStartRequest.hasRetryPolicy()) {
990
    //      startRequestBuilder.setRetryPolicy(previousRunStartRequest.getRetryPolicy());
991
    //    }
992
    if (ca.hasRetryPolicy()) {
1✔
993
      startRequestBuilder.setRetryPolicy(ca.getRetryPolicy());
1✔
994
    }
995
    if (ea.hasInput()) {
1✔
996
      startRequestBuilder.setInput(ea.getInput());
1✔
997
    }
998
    if (ea.hasHeader()) {
1✔
999
      startRequestBuilder.setHeader(ea.getHeader());
1✔
1000
    }
1001
    StartWorkflowExecutionRequest startRequest = startRequestBuilder.build();
1✔
1002
    lock.lock();
1✔
1003
    Optional<Failure> lastFail =
1004
        ea.hasFailure()
1✔
1005
            ? Optional.of(ea.getFailure())
1✔
1006
            : retryState.flatMap(TestServiceRetryState::getPreviousRunFailure);
1✔
1007
    try {
1008
      StartWorkflowExecutionResponse response =
1✔
1009
          startWorkflowExecutionNoRunningCheckLocked(
1✔
1010
              startRequest,
1011
              ea.getNewExecutionRunId(),
1✔
1012
              firstExecutionRunId,
1013
              Optional.of(continuedExecutionId.getExecution().getRunId()),
1✔
1014
              retryState,
1015
              ProtobufTimeUtils.toJavaDuration(ea.getBackoffStartInterval()),
1✔
1016
              ea.getLastCompletionResult(),
1✔
1017
              lastFail,
1018
              parent,
1019
              parentChildInitiatedEventId,
1020
              null,
1021
              continuedExecutionId.getWorkflowId());
1✔
1022
      return response.getRunId();
1✔
1023
    } finally {
1024
      lock.unlock();
1✔
1025
    }
1026
  }
1027

1028
  @Override
1029
  public void listOpenWorkflowExecutions(
1030
      ListOpenWorkflowExecutionsRequest listRequest,
1031
      StreamObserver<ListOpenWorkflowExecutionsResponse> responseObserver) {
1032
    try {
1033
      Optional<String> workflowIdFilter;
1034
      if (listRequest.hasExecutionFilter()
1✔
1035
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
×
1036
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
×
1037
      } else {
1038
        workflowIdFilter = Optional.empty();
1✔
1039
      }
1040
      List<WorkflowExecutionInfo> result =
1✔
1041
          store.listWorkflows(WorkflowState.OPEN, workflowIdFilter);
1✔
1042
      responseObserver.onNext(
1✔
1043
          ListOpenWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1✔
1044
      responseObserver.onCompleted();
1✔
1045
    } catch (StatusRuntimeException e) {
×
1046
      handleStatusRuntimeException(e, responseObserver);
×
1047
    }
1✔
1048
  }
1✔
1049

1050
  @Override
1051
  public void listClosedWorkflowExecutions(
1052
      ListClosedWorkflowExecutionsRequest listRequest,
1053
      StreamObserver<ListClosedWorkflowExecutionsResponse> responseObserver) {
1054
    try {
1055
      Optional<String> workflowIdFilter;
1056
      if (listRequest.hasExecutionFilter()
1✔
1057
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
×
1058
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
×
1059
      } else {
1060
        workflowIdFilter = Optional.empty();
1✔
1061
      }
1062
      List<WorkflowExecutionInfo> result =
1✔
1063
          store.listWorkflows(WorkflowState.CLOSED, workflowIdFilter);
1✔
1064
      responseObserver.onNext(
1✔
1065
          ListClosedWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1✔
1066
      responseObserver.onCompleted();
1✔
1067
    } catch (StatusRuntimeException e) {
×
1068
      handleStatusRuntimeException(e, responseObserver);
×
1069
    }
1✔
1070
  }
1✔
1071

1072
  @Override
1073
  public void respondQueryTaskCompleted(
1074
      RespondQueryTaskCompletedRequest completeRequest,
1075
      StreamObserver<RespondQueryTaskCompletedResponse> responseObserver) {
1076
    try {
1077
      QueryId queryId = QueryId.fromBytes(completeRequest.getTaskToken());
1✔
1078
      TestWorkflowMutableState mutableState = getMutableState(queryId.getExecutionId());
1✔
1079
      mutableState.completeQuery(queryId, completeRequest);
1✔
1080
      responseObserver.onNext(RespondQueryTaskCompletedResponse.getDefaultInstance());
1✔
1081
      responseObserver.onCompleted();
1✔
1082
    } catch (StatusRuntimeException e) {
×
1083
      handleStatusRuntimeException(e, responseObserver);
×
1084
    }
1✔
1085
  }
1✔
1086

1087
  @Override
1088
  public void queryWorkflow(
1089
      QueryWorkflowRequest queryRequest, StreamObserver<QueryWorkflowResponse> responseObserver) {
1090
    try {
1091
      ExecutionId executionId =
1✔
1092
          new ExecutionId(queryRequest.getNamespace(), queryRequest.getExecution());
1✔
1093
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1094
      @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1095
      QueryWorkflowResponse result =
1✔
1096
          mutableState.query(
1✔
1097
              queryRequest,
1098
              deadline != null ? deadline.timeRemaining(TimeUnit.MILLISECONDS) : Long.MAX_VALUE);
1✔
1099
      responseObserver.onNext(result);
1✔
1100
      responseObserver.onCompleted();
1✔
1101
    } catch (StatusRuntimeException e) {
1✔
1102
      handleStatusRuntimeException(e, responseObserver);
1✔
1103
    }
1✔
1104
  }
1✔
1105

1106
  @Override
1107
  public void describeWorkflowExecution(
1108
      DescribeWorkflowExecutionRequest request,
1109
      StreamObserver<io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse>
1110
          responseObserver) {
1111
    try {
1112
      if (request.getNamespace().isEmpty()) {
1✔
1113
        throw createInvalidArgument("Namespace not set on request.");
×
1114
      }
1115
      if (!request.hasExecution()) {
1✔
1116
        throw createInvalidArgument("Execution not set on request.");
×
1117
      }
1118

1119
      ExecutionId executionId = new ExecutionId(request.getNamespace(), request.getExecution());
1✔
1120
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1121
      DescribeWorkflowExecutionResponse result = mutableState.describeWorkflowExecution();
1✔
1122
      responseObserver.onNext(result);
1✔
1123
      responseObserver.onCompleted();
1✔
1124
    } catch (StatusRuntimeException e) {
1✔
1125
      handleStatusRuntimeException(e, responseObserver);
1✔
1126
    }
1✔
1127
  }
1✔
1128

1129
  /**
1130
   * This method doesn't make much sense for test server, it accepts all namespaces as existent and
1131
   * registered. so, it's a trivial implementation just returning an info that a namespace is
1132
   * registered irrespectively of the input
1133
   */
1134
  @Override
1135
  public void describeNamespace(
1136
      DescribeNamespaceRequest request,
1137
      StreamObserver<DescribeNamespaceResponse> responseObserver) {
1138
    try {
1139
      if (request.getNamespace().isEmpty()) {
1✔
1140
        throw createInvalidArgument("Namespace not set on request.");
×
1141
      }
1142
      // generating a stable UUID for name
1143
      String namespaceId = UUID.nameUUIDFromBytes(request.getNamespace().getBytes()).toString();
1✔
1144
      DescribeNamespaceResponse result =
1145
          DescribeNamespaceResponse.newBuilder()
1✔
1146
              .setNamespaceInfo(
1✔
1147
                  NamespaceInfo.newBuilder()
1✔
1148
                      .setName(request.getNamespace())
1✔
1149
                      .setState(NamespaceState.NAMESPACE_STATE_REGISTERED)
1✔
1150
                      .setId(namespaceId)
1✔
1151
                      .build())
1✔
1152
              .build();
1✔
1153
      responseObserver.onNext(result);
1✔
1154
      responseObserver.onCompleted();
1✔
1155
    } catch (StatusRuntimeException e) {
1✔
1156
      handleStatusRuntimeException(e, responseObserver);
1✔
1157
    }
1✔
1158
  }
1✔
1159

1160
  private <R> R requireNotNull(String fieldName, R value) {
1161
    if (value == null) {
1✔
1162
      throw Status.INVALID_ARGUMENT
×
1163
          .withDescription("Missing required field \"" + fieldName + "\".")
×
1164
          .asRuntimeException();
×
1165
    }
1166
    return value;
1✔
1167
  }
1168

1169
  /**
1170
   * Adds diagnostic data about internal service state to the provided {@link StringBuilder}.
1171
   * Includes histories of all workflow instances stored in the service.
1172
   */
1173
  public void getDiagnostics(StringBuilder result) {
1174
    store.getDiagnostics(result);
×
1175
  }
×
1176

1177
  /**
1178
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1179
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#getCurrentTime(Empty)}
1180
   */
1181
  @Deprecated
1182
  public long currentTimeMillis() {
1183
    return selfAdvancingTimer.getClock().getAsLong();
×
1184
  }
1185

1186
  /** Invokes callback after the specified delay according to internal service clock. */
1187
  public void registerDelayedCallback(Duration delay, Runnable r) {
1188
    store.registerDelayedCallback(delay, r);
1✔
1189
  }
1✔
1190

1191
  /**
1192
   * Disables time skipping. To re-enable call {@link #unlockTimeSkipping(String)}. These calls are
1193
   * counted, so calling unlock does not guarantee that time is going to be skipped immediately as
1194
   * another lock can be holding it.
1195
   *
1196
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1197
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#lockTimeSkipping(LockTimeSkippingRequest)}
1198
   */
1199
  @Deprecated
1200
  public void lockTimeSkipping(String caller) {
1201
    selfAdvancingTimer.lockTimeSkipping(caller);
×
1202
  }
×
1203

1204
  /**
1205
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1206
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkipping(UnlockTimeSkippingRequest)}
1207
   */
1208
  @Deprecated
1209
  public void unlockTimeSkipping(String caller) {
1210
    selfAdvancingTimer.unlockTimeSkipping(caller);
×
1211
  }
×
1212

1213
  /**
1214
   * Unlocks time skipping and blocks the calling thread until internal clock passes the current +
1215
   * duration time.<br>
1216
   * When the time is reached, locks time skipping and returns.<br>
1217
   * Might not block at all due to time skipping. Or might block if the time skipping lock counter
1218
   * was more than 1.
1219
   *
1220
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1221
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkippingWithSleep(SleepRequest)}
1222
   */
1223
  @Deprecated
1224
  public void sleep(Duration duration) {
1225
    CompletableFuture<Void> result = new CompletableFuture<>();
×
1226
    selfAdvancingTimer.schedule(
×
1227
        duration,
1228
        () -> {
1229
          selfAdvancingTimer.lockTimeSkipping("TestWorkflowService sleep");
×
1230
          result.complete(null);
×
1231
        },
×
1232
        "workflow sleep");
1233
    selfAdvancingTimer.unlockTimeSkipping("TestWorkflowService sleep");
×
1234
    try {
1235
      result.get();
×
1236
    } catch (InterruptedException e) {
×
1237
      Thread.currentThread().interrupt();
×
1238
      throw new RuntimeException(e);
×
1239
    } catch (ExecutionException e) {
×
1240
      throw new RuntimeException(e);
×
1241
    }
×
1242
  }
×
1243

1244
  /**
1245
   * Temporal server times out task queue long poll calls after 1 minute and returns an empty
1246
   * result. After which the request has to be retried by the client if it wants to continue
1247
   * waiting. We emulate this behavior here.
1248
   *
1249
   * <p>If there is a deadline present, for task queue poll requests server will respond inside the
1250
   * deadline. Note that the latest is not applicable for getWorkflowExecutionHistory() long polls.
1251
   *
1252
   * @return minimum between the context deadline and maximum long poll deadline.
1253
   */
1254
  private Deadline getLongPollDeadline() {
1255
    @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1256
    Deadline maximumDeadline =
1✔
1257
        Deadline.after(
1✔
1258
            WorkflowServiceStubsOptions.DEFAULT_SERVER_LONG_POLL_RPC_TIMEOUT.toMillis(),
1✔
1259
            TimeUnit.MILLISECONDS);
1260
    return deadline != null ? deadline.minimum(maximumDeadline) : maximumDeadline;
1✔
1261
  }
1262

1263
  private Deadline getUpdatePollDeadline() {
1264
    @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1265
    Deadline maximumDeadline =
1✔
1266
        Deadline.after(Duration.ofSeconds(10).toMillis(), TimeUnit.MILLISECONDS);
1✔
1267
    return deadline != null ? deadline.minimum(maximumDeadline) : maximumDeadline;
1✔
1268
  }
1269

1270
  private void handleStatusRuntimeException(
1271
      StatusRuntimeException e, StreamObserver<?> responseObserver) {
1272
    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1✔
1273
      log.error("unexpected", e);
1✔
1274
    }
1275
    responseObserver.onError(e);
1✔
1276
  }
1✔
1277

1278
  /**
1279
   * Creates an in-memory service along with client stubs for use in Java code. See also
1280
   * createServerOnly and createWithNoGrpcServer.
1281
   *
1282
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean)} instead and
1283
   *     pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1284
   */
1285
  @Deprecated
1286
  public TestWorkflowService() {
1287
    this(0, true);
×
1288
  }
×
1289

1290
  /**
1291
   * Creates an in-memory service along with client stubs for use in Java code. See also
1292
   * createServerOnly and createWithNoGrpcServer.
1293
   *
1294
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean, long)} instead
1295
   *     and pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1296
   */
1297
  @Deprecated
1298
  public TestWorkflowService(long initialTimeMillis) {
1299
    this(initialTimeMillis, true);
×
1300
  }
×
1301

1302
  /**
1303
   * Creates an in-memory service along with client stubs for use in Java code. See also
1304
   * createServerOnly and createWithNoGrpcServer.
1305
   *
1306
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean)} instead
1307
   */
1308
  @Deprecated
1309
  public TestWorkflowService(boolean lockTimeSkipping) {
1310
    this(0, true);
×
1311
    if (lockTimeSkipping) {
×
1312
      this.lockTimeSkipping("constructor");
×
1313
    }
1314
  }
×
1315

1316
  /**
1317
   * Creates an instance of TestWorkflowService that does not manage its own gRPC server. Useful for
1318
   * including in an externally managed gRPC server.
1319
   *
1320
   * @deprecated use {@link TestServicesStarter} to create just the services with gRPC server
1321
   */
1322
  @Deprecated
1323
  public static TestWorkflowService createWithNoGrpcServer() {
1324
    return new TestWorkflowService(0, false);
×
1325
  }
1326

1327
  private TestWorkflowService(long initialTimeMillis, boolean startInProcessServer) {
×
1328
    this.selfAdvancingTimer =
×
1329
        new SelfAdvancingTimerImpl(initialTimeMillis, Clock.systemDefaultZone());
×
1330
    store = new TestWorkflowStoreImpl(this.selfAdvancingTimer);
×
1331
    visibilityStore = new TestVisibilityStoreImpl();
×
1332
    outOfProcessServer = null;
×
1333
    if (startInProcessServer) {
×
1334
      this.inProcessServer = new InProcessGRPCServer(Collections.singletonList(this));
×
1335
      this.workflowServiceStubs =
×
1336
          WorkflowServiceStubs.newServiceStubs(
×
1337
              WorkflowServiceStubsOptions.newBuilder()
×
1338
                  .setChannel(inProcessServer.getChannel())
×
1339
                  .build());
×
1340
    } else {
1341
      this.inProcessServer = null;
×
1342
      this.workflowServiceStubs = null;
×
1343
    }
1344
  }
×
1345

1346
  /**
1347
   * Creates an out-of-process rather than in-process server, and does not set up a client. Useful,
1348
   * for example, if you want to use the test service from other SDKs.
1349
   *
1350
   * @param port the port to listen on
1351
   * @deprecated use {@link io.temporal.testserver.TestServer#createPortBoundServer(int, boolean)}
1352
   *     instead and pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1353
   */
1354
  @Deprecated
1355
  public static TestWorkflowService createServerOnly(int port) {
1356
    TestWorkflowService result = new TestWorkflowService(true, port);
×
1357
    log.info("Server started, listening on " + port);
×
1358
    return result;
×
1359
  }
1360

1361
  private TestWorkflowService(boolean isOutOfProc, int port) {
×
1362
    // isOutOfProc is just here to make unambiguous constructor overloading.
1363
    Preconditions.checkState(isOutOfProc, "Impossible.");
×
1364
    inProcessServer = null;
×
1365
    workflowServiceStubs = null;
×
1366
    this.selfAdvancingTimer = new SelfAdvancingTimerImpl(0, Clock.systemDefaultZone());
×
1367
    store = new TestWorkflowStoreImpl(selfAdvancingTimer);
×
1368
    visibilityStore = new TestVisibilityStoreImpl();
×
1369
    try {
1370
      ServerBuilder<?> serverBuilder =
×
1371
          Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create());
×
1372
      GRPCServerHelper.registerServicesAndHealthChecks(
×
1373
          Collections.singletonList(this), serverBuilder);
×
1374
      outOfProcessServer = serverBuilder.build().start();
×
1375
    } catch (IOException e) {
×
1376
      throw new RuntimeException(e);
×
1377
    }
×
1378
  }
×
1379

1380
  @Deprecated
1381
  public WorkflowServiceStubs newClientStub() {
1382
    if (workflowServiceStubs == null) {
×
1383
      throw new RuntimeException(
×
1384
          "Cannot get a client when you created your TestWorkflowService with createServerOnly.");
1385
    }
1386
    return workflowServiceStubs;
×
1387
  }
1388

1389
  private static StatusRuntimeException createInvalidArgument(String description) {
1390
    throw Status.INVALID_ARGUMENT.withDescription(description).asRuntimeException();
1✔
1391
  }
1392
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc