• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #333

16 Oct 2024 07:28PM UTC coverage: 78.65% (+0.6%) from 78.085%
#333

push

github

web-flow
Fix code coverage (#2275)

22670 of 28824 relevant lines covered (78.65%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.73
/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import static io.temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED;
24
import static io.temporal.api.workflowservice.v1.ExecuteMultiOperationRequest.Operation.OperationCase.START_WORKFLOW;
25
import static io.temporal.api.workflowservice.v1.ExecuteMultiOperationRequest.Operation.OperationCase.UPDATE_WORKFLOW;
26
import static io.temporal.internal.testservice.CronUtils.getBackoffInterval;
27

28
import com.google.common.base.Preconditions;
29
import com.google.common.base.Throwables;
30
import com.google.protobuf.ByteString;
31
import com.google.protobuf.Empty;
32
import com.google.protobuf.Timestamp;
33
import com.google.protobuf.util.Timestamps;
34
import io.grpc.*;
35
import io.grpc.stub.StreamObserver;
36
import io.nexusrpc.Header;
37
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
38
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
39
import io.temporal.api.common.v1.Payload;
40
import io.temporal.api.common.v1.Payloads;
41
import io.temporal.api.common.v1.RetryPolicy;
42
import io.temporal.api.common.v1.WorkflowExecution;
43
import io.temporal.api.enums.v1.*;
44
import io.temporal.api.errordetails.v1.MultiOperationExecutionFailure;
45
import io.temporal.api.errordetails.v1.WorkflowExecutionAlreadyStartedFailure;
46
import io.temporal.api.failure.v1.ApplicationFailureInfo;
47
import io.temporal.api.failure.v1.CanceledFailureInfo;
48
import io.temporal.api.failure.v1.Failure;
49
import io.temporal.api.failure.v1.MultiOperationExecutionAborted;
50
import io.temporal.api.history.v1.HistoryEvent;
51
import io.temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes;
52
import io.temporal.api.namespace.v1.NamespaceInfo;
53
import io.temporal.api.nexus.v1.HandlerError;
54
import io.temporal.api.nexus.v1.Request;
55
import io.temporal.api.nexus.v1.StartOperationResponse;
56
import io.temporal.api.nexus.v1.UnsuccessfulOperationError;
57
import io.temporal.api.testservice.v1.LockTimeSkippingRequest;
58
import io.temporal.api.testservice.v1.SleepRequest;
59
import io.temporal.api.testservice.v1.TestServiceGrpc;
60
import io.temporal.api.testservice.v1.UnlockTimeSkippingRequest;
61
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
62
import io.temporal.api.workflowservice.v1.*;
63
import io.temporal.internal.common.ProtoUtils;
64
import io.temporal.internal.common.ProtobufTimeUtils;
65
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowState;
66
import io.temporal.serviceclient.StatusUtils;
67
import io.temporal.serviceclient.TestServiceStubs;
68
import io.temporal.serviceclient.WorkflowServiceStubs;
69
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
70
import io.temporal.testserver.TestServer;
71
import java.io.Closeable;
72
import java.io.IOException;
73
import java.time.Clock;
74
import java.time.Duration;
75
import java.util.*;
76
import java.util.concurrent.*;
77
import java.util.concurrent.atomic.AtomicReference;
78
import java.util.concurrent.locks.Lock;
79
import java.util.concurrent.locks.ReentrantLock;
80
import java.util.function.Consumer;
81
import java.util.stream.Collectors;
82
import javax.annotation.Nonnull;
83
import javax.annotation.Nullable;
84
import org.slf4j.Logger;
85
import org.slf4j.LoggerFactory;
86

87
/**
88
 * In memory implementation of the Workflow Service. To be used for testing purposes only.
89
 *
90
 * <p>Do not use directly, instead use {@link io.temporal.testing.TestWorkflowEnvironment}.
91
 */
92
public final class TestWorkflowService extends WorkflowServiceGrpc.WorkflowServiceImplBase
93
    implements Closeable {
94
  private static final Logger log = LoggerFactory.getLogger(TestWorkflowService.class);
1✔
95
  private final Map<ExecutionId, TestWorkflowMutableState> executions = new HashMap<>();
1✔
96
  // key->WorkflowId
97
  private final Map<WorkflowId, TestWorkflowMutableState> executionsByWorkflowId = new HashMap<>();
1✔
98
  private final ExecutorService executor = Executors.newCachedThreadPool();
1✔
99
  private final Lock lock = new ReentrantLock();
1✔
100

101
  private final TestWorkflowStore store;
102
  private final TestVisibilityStore visibilityStore;
103
  private final TestNexusEndpointStore nexusEndpointStore;
104
  private final SelfAdvancingTimer selfAdvancingTimer;
105

106
  private final ScheduledExecutorService backgroundScheduler =
1✔
107
      Executors.newSingleThreadScheduledExecutor();
1✔
108

109
  private final Server outOfProcessServer;
110
  private final InProcessGRPCServer inProcessServer;
111
  private final WorkflowServiceStubs workflowServiceStubs;
112

113
  TestWorkflowService(
114
      TestWorkflowStore store,
115
      TestVisibilityStore visibilityStore,
116
      TestNexusEndpointStore nexusEndpointStore,
117
      SelfAdvancingTimer selfAdvancingTimer) {
1✔
118
    this.store = store;
1✔
119
    this.visibilityStore = visibilityStore;
1✔
120
    this.nexusEndpointStore = nexusEndpointStore;
1✔
121
    this.selfAdvancingTimer = selfAdvancingTimer;
1✔
122
    this.outOfProcessServer = null;
1✔
123
    this.inProcessServer = null;
1✔
124
    this.workflowServiceStubs = null;
1✔
125
  }
1✔
126

127
  @Override
128
  public void close() {
129
    log.debug("Shutting down TestWorkflowService");
1✔
130

131
    log.debug("Shutting down background scheduler");
1✔
132
    backgroundScheduler.shutdown();
1✔
133

134
    if (outOfProcessServer != null) {
1✔
135
      log.info("Shutting down out-of-process GRPC server");
×
136
      outOfProcessServer.shutdown();
×
137
    }
138

139
    if (workflowServiceStubs != null) {
1✔
140
      workflowServiceStubs.shutdown();
×
141
    }
142

143
    if (inProcessServer != null) {
1✔
144
      log.info("Shutting down in-process GRPC server");
×
145
      inProcessServer.shutdown();
×
146
    }
147

148
    executor.shutdown();
1✔
149

150
    try {
151
      executor.awaitTermination(1, TimeUnit.SECONDS);
1✔
152

153
      if (outOfProcessServer != null) {
1✔
154
        outOfProcessServer.awaitTermination(1, TimeUnit.SECONDS);
×
155
      }
156

157
      if (workflowServiceStubs != null) {
1✔
158
        workflowServiceStubs.awaitTermination(1, TimeUnit.SECONDS);
×
159
      }
160

161
      if (inProcessServer != null) {
1✔
162
        inProcessServer.awaitTermination(1, TimeUnit.SECONDS);
×
163
      }
164

165
    } catch (InterruptedException e) {
×
166
      Thread.currentThread().interrupt();
×
167
      log.debug("shutdown interrupted", e);
×
168
    }
1✔
169

170
    store.close();
1✔
171
  }
1✔
172

173
  private TestWorkflowMutableState getMutableState(ExecutionId executionId) {
174
    return getMutableState(executionId, true);
1✔
175
  }
176

177
  private TestWorkflowMutableState getMutableState(ExecutionId executionId, boolean failNotExists) {
178
    lock.lock();
1✔
179
    try {
180
      if (executionId.getExecution().getRunId().isEmpty()) {
1✔
181
        return getMutableState(executionId.getWorkflowId(), failNotExists);
1✔
182
      }
183
      TestWorkflowMutableState mutableState = executions.get(executionId);
1✔
184
      if (mutableState == null && failNotExists) {
1✔
185
        throw Status.NOT_FOUND
1✔
186
            .withDescription(
1✔
187
                "Execution \""
188
                    + executionId
189
                    + "\" not found in mutable state. Known executions: "
190
                    + executions.values()
1✔
191
                    + ", service="
192
                    + this)
193
            .asRuntimeException();
1✔
194
      }
195
      return mutableState;
1✔
196
    } finally {
197
      lock.unlock();
1✔
198
    }
199
  }
200

201
  private TestWorkflowMutableState getMutableState(WorkflowId workflowId, boolean failNotExists) {
202
    lock.lock();
1✔
203
    try {
204
      TestWorkflowMutableState mutableState = executionsByWorkflowId.get(workflowId);
1✔
205
      if (mutableState == null && failNotExists) {
1✔
206
        throw Status.NOT_FOUND
1✔
207
            .withDescription("Execution not found in mutable state: " + workflowId)
1✔
208
            .asRuntimeException();
1✔
209
      }
210
      return mutableState;
1✔
211
    } finally {
212
      lock.unlock();
1✔
213
    }
214
  }
215

216
  @Override
217
  public void startWorkflowExecution(
218
      StartWorkflowExecutionRequest request,
219
      StreamObserver<StartWorkflowExecutionResponse> responseObserver) {
220
    try {
221
      Duration backoffInterval = getBackoffInterval(request.getCronSchedule(), store.currentTime());
1✔
222
      StartWorkflowExecutionResponse response =
1✔
223
          startWorkflowExecutionImpl(
1✔
224
              request, backoffInterval, Optional.empty(), OptionalLong.empty(), null);
1✔
225
      responseObserver.onNext(response);
1✔
226
      responseObserver.onCompleted();
1✔
227
    } catch (StatusRuntimeException e) {
1✔
228
      handleStatusRuntimeException(e, responseObserver);
1✔
229
    }
1✔
230
  }
1✔
231

232
  StartWorkflowExecutionResponse startWorkflowExecutionImpl(
233
      StartWorkflowExecutionRequest startRequest,
234
      Duration backoffStartInterval,
235
      Optional<TestWorkflowMutableState> parent,
236
      OptionalLong parentChildInitiatedEventId,
237
      @Nullable Consumer<TestWorkflowMutableState> withStart) {
238
    String requestWorkflowId = requireNotNull("WorkflowId", startRequest.getWorkflowId());
1✔
239
    String namespace = requireNotNull("Namespace", startRequest.getNamespace());
1✔
240
    WorkflowId workflowId = new WorkflowId(namespace, requestWorkflowId);
1✔
241
    WorkflowIdReusePolicy reusePolicy = startRequest.getWorkflowIdReusePolicy();
1✔
242
    WorkflowIdConflictPolicy conflictPolicy = startRequest.getWorkflowIdConflictPolicy();
1✔
243
    if (conflictPolicy != WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED
1✔
244
        && reusePolicy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
245
      throw createInvalidArgument(
×
246
          "Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy.");
247
    }
248

249
    TestWorkflowMutableState existing;
250
    lock.lock();
1✔
251
    try {
252
      String newRunId = UUID.randomUUID().toString();
1✔
253
      existing = executionsByWorkflowId.get(workflowId);
1✔
254
      if (existing != null) {
1✔
255
        WorkflowExecutionStatus status = existing.getWorkflowExecutionStatus();
1✔
256

257
        if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
258
            && (reusePolicy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING
259
                || conflictPolicy
260
                    == WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING)) {
261
          existing.terminateWorkflowExecution(
1✔
262
              TerminateWorkflowExecutionRequest.newBuilder()
1✔
263
                  .setNamespace(startRequest.getNamespace())
1✔
264
                  .setWorkflowExecution(existing.getExecutionId().getExecution())
1✔
265
                  .setReason("TerminateIfRunning WorkflowIdReusePolicy Policy")
1✔
266
                  .setIdentity("history-service")
1✔
267
                  .setDetails(
1✔
268
                      Payloads.newBuilder()
1✔
269
                          .addPayloads(
1✔
270
                              Payload.newBuilder()
1✔
271
                                  .setData(
1✔
272
                                      ByteString.copyFromUtf8(
1✔
273
                                          String.format("terminated by new runID: %s", newRunId)))
1✔
274
                                  .build())
1✔
275
                          .build())
1✔
276
                  .build());
1✔
277
        } else if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
278
            && conflictPolicy
279
                == WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING) {
280
          return StartWorkflowExecutionResponse.newBuilder()
1✔
281
              .setStarted(false)
1✔
282
              .setRunId(existing.getExecutionId().getExecution().getRunId())
1✔
283
              .build();
1✔
284
        } else if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
285
            || reusePolicy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE) {
286
          return throwDuplicatedWorkflow(startRequest, existing);
×
287
        } else if (reusePolicy
1✔
288
                == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
289
            && (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED
290
                || status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW)) {
291
          return throwDuplicatedWorkflow(startRequest, existing);
×
292
        }
293
      }
294

295
      Optional<TestServiceRetryState> retryState;
296
      Optional<Failure> lastFailure = Optional.empty();
1✔
297
      if (startRequest.hasRetryPolicy()) {
1✔
298
        Duration expirationInterval =
1✔
299
            ProtobufTimeUtils.toJavaDuration(startRequest.getWorkflowExecutionTimeout());
1✔
300
        retryState = newRetryStateLocked(startRequest.getRetryPolicy(), expirationInterval);
1✔
301
        if (retryState.isPresent()) {
1✔
302
          lastFailure = retryState.get().getPreviousRunFailure();
1✔
303
        }
304
      } else {
1✔
305
        retryState = Optional.empty();
1✔
306
      }
307

308
      return startWorkflowExecutionNoRunningCheckLocked(
1✔
309
          startRequest,
310
          newRunId,
311
          // it's the first execution in the continue-as-new chain, so firstExecutionRunId =
312
          // newRunId
313
          newRunId,
314
          Optional.empty(),
1✔
315
          retryState,
316
          backoffStartInterval,
317
          null,
318
          lastFailure,
319
          parent,
320
          parentChildInitiatedEventId,
321
          withStart,
322
          workflowId);
323
    } finally {
324
      lock.unlock();
1✔
325
    }
326
  }
327

328
  private Optional<TestServiceRetryState> newRetryStateLocked(
329
      RetryPolicy retryPolicy, Duration expirationInterval) {
330
    Timestamp expirationTime =
331
        expirationInterval.isZero()
1✔
332
            ? Timestamps.fromNanos(0)
1✔
333
            : Timestamps.add(
1✔
334
                store.currentTime(), ProtobufTimeUtils.toProtoDuration(expirationInterval));
1✔
335
    return Optional.of(new TestServiceRetryState(retryPolicy, expirationTime));
1✔
336
  }
337

338
  private StartWorkflowExecutionResponse throwDuplicatedWorkflow(
339
      StartWorkflowExecutionRequest startRequest, TestWorkflowMutableState existing) {
340
    WorkflowExecution execution = existing.getExecutionId().getExecution();
1✔
341
    WorkflowExecutionAlreadyStartedFailure error =
342
        WorkflowExecutionAlreadyStartedFailure.newBuilder()
1✔
343
            .setRunId(execution.getRunId())
1✔
344
            .setStartRequestId(startRequest.getRequestId())
1✔
345
            .build();
1✔
346
    throw StatusUtils.newException(
1✔
347
        Status.ALREADY_EXISTS.withDescription(
1✔
348
            String.format(
1✔
349
                "WorkflowId: %s, " + "RunId: %s", execution.getWorkflowId(), execution.getRunId())),
1✔
350
        error,
351
        WorkflowExecutionAlreadyStartedFailure.getDescriptor());
1✔
352
  }
353

354
  private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheckLocked(
355
      StartWorkflowExecutionRequest startRequest,
356
      @Nonnull String runId,
357
      @Nonnull String firstExecutionRunId,
358
      Optional<String> continuedExecutionRunId,
359
      Optional<TestServiceRetryState> retryState,
360
      Duration backoffStartInterval,
361
      Payloads lastCompletionResult,
362
      Optional<Failure> lastFailure,
363
      Optional<TestWorkflowMutableState> parent,
364
      OptionalLong parentChildInitiatedEventId,
365
      @Nullable Consumer<TestWorkflowMutableState> withStart,
366
      WorkflowId workflowId) {
367
    String namespace = startRequest.getNamespace();
1✔
368
    TestWorkflowMutableState mutableState =
1✔
369
        new TestWorkflowMutableStateImpl(
370
            startRequest,
371
            firstExecutionRunId,
372
            runId,
373
            retryState,
374
            backoffStartInterval,
375
            lastCompletionResult,
376
            lastFailure,
377
            parent,
378
            parentChildInitiatedEventId,
379
            continuedExecutionRunId,
380
            this,
381
            store,
382
            visibilityStore,
383
            nexusEndpointStore,
384
            selfAdvancingTimer);
385
    WorkflowExecution execution = mutableState.getExecutionId().getExecution();
1✔
386
    ExecutionId executionId = new ExecutionId(namespace, execution);
1✔
387
    executionsByWorkflowId.put(workflowId, mutableState);
1✔
388
    executions.put(executionId, mutableState);
1✔
389

390
    PollWorkflowTaskQueueRequest eagerWorkflowTaskPollRequest =
391
        startRequest.getRequestEagerExecution()
1✔
392
            ? PollWorkflowTaskQueueRequest.newBuilder()
1✔
393
                .setIdentity(startRequest.getIdentity())
1✔
394
                .setNamespace(startRequest.getNamespace())
1✔
395
                .setTaskQueue(startRequest.getTaskQueue())
1✔
396
                .build()
1✔
397
            : null;
1✔
398

399
    @Nullable
400
    PollWorkflowTaskQueueResponse eagerWorkflowTask =
1✔
401
        mutableState.startWorkflow(
1✔
402
            continuedExecutionRunId.isPresent(), eagerWorkflowTaskPollRequest, withStart);
1✔
403
    StartWorkflowExecutionResponse.Builder response =
404
        StartWorkflowExecutionResponse.newBuilder().setRunId(execution.getRunId()).setStarted(true);
1✔
405
    if (eagerWorkflowTask != null) {
1✔
406
      response.setEagerWorkflowTask(eagerWorkflowTask);
1✔
407
    }
408
    return response.build();
1✔
409
  }
410

411
  @Override
412
  public void getWorkflowExecutionHistory(
413
      GetWorkflowExecutionHistoryRequest getRequest,
414
      StreamObserver<GetWorkflowExecutionHistoryResponse> responseObserver) {
415
    ExecutionId executionId = new ExecutionId(getRequest.getNamespace(), getRequest.getExecution());
1✔
416
    executor.execute(
1✔
417
        // preserving gRPC context deadline between threads
418
        Context.current()
1✔
419
            .wrap(
1✔
420
                () -> {
421
                  try {
422
                    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
423
                    responseObserver.onNext(
1✔
424
                        store.getWorkflowExecutionHistory(
1✔
425
                            mutableState.getExecutionId(),
1✔
426
                            getRequest,
427
                            // We explicitly don't try to respond inside the context deadline.
428
                            // If we try to fit into the context deadline, the deadline may be not
429
                            // expired on the client side and an empty response will lead to a new
430
                            // request, making the client hammer the server at the tail end of the
431
                            // deadline.
432
                            // So this call is designed to wait fully till the end of the
433
                            // context deadline and throw DEADLINE_EXCEEDED if the deadline is less
434
                            // than 20s.
435
                            // If it's longer than 20 seconds - we return an empty result.
436
                            Deadline.after(20, TimeUnit.SECONDS)));
1✔
437
                    responseObserver.onCompleted();
1✔
438
                  } catch (StatusRuntimeException e) {
1✔
439
                    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1✔
440
                      log.error("unexpected", e);
×
441
                    }
442
                    responseObserver.onError(e);
1✔
443
                  } catch (Exception e) {
×
444
                    log.error("unexpected", e);
×
445
                    responseObserver.onError(e);
×
446
                  }
1✔
447
                }));
1✔
448
  }
1✔
449

450
  private <T> T pollTaskQueue(Context ctx, Future<T> futureValue)
451
      throws ExecutionException, InterruptedException {
452
    final Context.CancellationListener canceler = context -> futureValue.cancel(true);
1✔
453
    ctx.addListener(canceler, this.backgroundScheduler);
1✔
454
    try {
455
      return futureValue.get();
1✔
456
    } finally {
457
      ctx.removeListener(canceler);
1✔
458
    }
459
  }
460

461
  @Override
462
  public void pollWorkflowTaskQueue(
463
      PollWorkflowTaskQueueRequest pollRequest,
464
      StreamObserver<PollWorkflowTaskQueueResponse> responseObserver) {
465
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
466
      PollWorkflowTaskQueueResponse.Builder task;
467
      try {
468
        task = pollTaskQueue(ctx, store.pollWorkflowTaskQueue(pollRequest));
1✔
469
      } catch (ExecutionException e) {
×
470
        responseObserver.onError(e);
×
471
        return;
×
472
      } catch (InterruptedException e) {
×
473
        Thread.currentThread().interrupt();
×
474
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
×
475
        responseObserver.onCompleted();
×
476
        return;
×
477
      } catch (CancellationException e) {
1✔
478
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
1✔
479
        responseObserver.onCompleted();
1✔
480
        return;
1✔
481
      }
1✔
482

483
      ExecutionId executionId =
1✔
484
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
1✔
485
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
486
      try {
487
        mutableState.startWorkflowTask(task, pollRequest);
1✔
488
        // The task always has the original task queue that was created as part of the response.
489
        // This may be a different task queue than the task queue it was scheduled on, as in the
490
        // case of sticky execution.
491
        task.setWorkflowExecutionTaskQueue(mutableState.getStartRequest().getTaskQueue());
1✔
492
        PollWorkflowTaskQueueResponse response = task.build();
1✔
493
        responseObserver.onNext(response);
1✔
494
        responseObserver.onCompleted();
1✔
495
      } catch (StatusRuntimeException e) {
×
496
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
×
497
          if (log.isDebugEnabled()) {
×
498
            log.debug("Skipping outdated workflow task for " + executionId, e);
×
499
          }
500
          // The real service doesn't return this call on outdated task.
501
          // For simplicity, we return an empty result here.
502
          responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
×
503
          responseObserver.onCompleted();
×
504
        } else {
505
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
×
506
            log.error("unexpected", e);
×
507
          }
508
          responseObserver.onError(e);
×
509
        }
510
      }
1✔
511
    }
1✔
512
  }
1✔
513

514
  @Override
515
  public void respondWorkflowTaskCompleted(
516
      RespondWorkflowTaskCompletedRequest request,
517
      StreamObserver<RespondWorkflowTaskCompletedResponse> responseObserver) {
518
    try {
519
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(request.getTaskToken());
1✔
520
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
521
      mutableState.completeWorkflowTask(taskToken.getHistorySize(), request);
1✔
522
      responseObserver.onNext(RespondWorkflowTaskCompletedResponse.getDefaultInstance());
1✔
523
      responseObserver.onCompleted();
1✔
524
    } catch (StatusRuntimeException e) {
1✔
525
      handleStatusRuntimeException(e, responseObserver);
1✔
526
    } catch (Throwable e) {
×
527
      responseObserver.onError(
×
528
          Status.INTERNAL
529
              .withDescription(Throwables.getStackTraceAsString(e))
×
530
              .withCause(e)
×
531
              .asRuntimeException());
×
532
    }
1✔
533
  }
1✔
534

535
  @Override
536
  public void respondWorkflowTaskFailed(
537
      RespondWorkflowTaskFailedRequest failedRequest,
538
      StreamObserver<RespondWorkflowTaskFailedResponse> responseObserver) {
539
    try {
540
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(failedRequest.getTaskToken());
1✔
541
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
542
      mutableState.failWorkflowTask(failedRequest);
1✔
543
      responseObserver.onNext(RespondWorkflowTaskFailedResponse.getDefaultInstance());
1✔
544
      responseObserver.onCompleted();
1✔
545
    } catch (StatusRuntimeException e) {
×
546
      handleStatusRuntimeException(e, responseObserver);
×
547
    }
1✔
548
  }
1✔
549

550
  @Override
551
  public void getSystemInfo(
552
      GetSystemInfoRequest request, StreamObserver<GetSystemInfoResponse> responseObserver) {
553
    responseObserver.onNext(
1✔
554
        GetSystemInfoResponse.newBuilder()
1✔
555
            .setCapabilities(
1✔
556
                // These are the capabilities I could verify the test server supports
557
                GetSystemInfoResponse.Capabilities.newBuilder()
1✔
558
                    .setSdkMetadata(true)
1✔
559
                    .setSignalAndQueryHeader(true)
1✔
560
                    .setEncodedFailureAttributes(true)
1✔
561
                    .setEagerWorkflowStart(true)
1✔
562
                    .setUpsertMemo(true)
1✔
563
                    .setNexus(true)
1✔
564
                    .build())
1✔
565
            .build());
1✔
566
    responseObserver.onCompleted();
1✔
567
  }
1✔
568

569
  private Context.CancellableContext deadlineCtx(Deadline deadline) {
570
    return Context.current().withDeadline(deadline, this.backgroundScheduler);
1✔
571
  }
572

573
  @Override
574
  public void pollActivityTaskQueue(
575
      PollActivityTaskQueueRequest pollRequest,
576
      StreamObserver<PollActivityTaskQueueResponse> responseObserver) {
577
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
578

579
      PollActivityTaskQueueResponse.Builder task;
580
      try {
581
        task = pollTaskQueue(ctx, store.pollActivityTaskQueue(pollRequest));
1✔
582
      } catch (ExecutionException e) {
×
583
        responseObserver.onError(e);
×
584
        return;
×
585
      } catch (InterruptedException e) {
×
586
        Thread.currentThread().interrupt();
×
587
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
×
588
        responseObserver.onCompleted();
×
589
        return;
×
590
      } catch (CancellationException e) {
1✔
591
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
1✔
592
        responseObserver.onCompleted();
1✔
593
        return;
1✔
594
      }
1✔
595

596
      ExecutionId executionId =
1✔
597
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
1✔
598
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
599
      try {
600
        mutableState.startActivityTask(task, pollRequest);
1✔
601
        responseObserver.onNext(task.build());
1✔
602
        responseObserver.onCompleted();
1✔
603
      } catch (StatusRuntimeException e) {
1✔
604
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
605
          if (log.isDebugEnabled()) {
1✔
606
            log.debug("Skipping outdated activity task for " + executionId, e);
×
607
          }
608
          responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
1✔
609
          responseObserver.onCompleted();
1✔
610
        } else {
611
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
×
612
            log.error("unexpected", e);
×
613
          }
614
          responseObserver.onError(e);
×
615
        }
616
      }
1✔
617
    }
1✔
618
  }
1✔
619

620
  @Override
621
  public void recordActivityTaskHeartbeat(
622
      RecordActivityTaskHeartbeatRequest heartbeatRequest,
623
      StreamObserver<RecordActivityTaskHeartbeatResponse> responseObserver) {
624
    try {
625
      ActivityTaskToken activityTaskToken =
1✔
626
          ActivityTaskToken.fromBytes(heartbeatRequest.getTaskToken());
1✔
627
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
628
      boolean cancelRequested =
1✔
629
          mutableState.heartbeatActivityTask(
1✔
630
              activityTaskToken.getScheduledEventId(), heartbeatRequest.getDetails());
1✔
631
      responseObserver.onNext(
1✔
632
          RecordActivityTaskHeartbeatResponse.newBuilder()
1✔
633
              .setCancelRequested(cancelRequested)
1✔
634
              .build());
1✔
635
      responseObserver.onCompleted();
1✔
636
    } catch (StatusRuntimeException e) {
1✔
637
      handleStatusRuntimeException(e, responseObserver);
1✔
638
    }
1✔
639
  }
1✔
640

641
  @Override
642
  public void recordActivityTaskHeartbeatById(
643
      RecordActivityTaskHeartbeatByIdRequest heartbeatRequest,
644
      StreamObserver<RecordActivityTaskHeartbeatByIdResponse> responseObserver) {
645
    try {
646
      ExecutionId execution =
×
647
          new ExecutionId(
648
              heartbeatRequest.getNamespace(),
×
649
              heartbeatRequest.getWorkflowId(),
×
650
              heartbeatRequest.getRunId());
×
651
      TestWorkflowMutableState mutableState = getMutableState(execution);
×
652
      boolean cancelRequested =
×
653
          mutableState.heartbeatActivityTaskById(
×
654
              heartbeatRequest.getActivityId(),
×
655
              heartbeatRequest.getDetails(),
×
656
              heartbeatRequest.getIdentity());
×
657
      responseObserver.onNext(
×
658
          RecordActivityTaskHeartbeatByIdResponse.newBuilder()
×
659
              .setCancelRequested(cancelRequested)
×
660
              .build());
×
661
      responseObserver.onCompleted();
×
662
    } catch (StatusRuntimeException e) {
×
663
      handleStatusRuntimeException(e, responseObserver);
×
664
    }
×
665
  }
×
666

667
  @Override
668
  public void respondActivityTaskCompleted(
669
      RespondActivityTaskCompletedRequest completeRequest,
670
      StreamObserver<RespondActivityTaskCompletedResponse> responseObserver) {
671
    try {
672
      ActivityTaskToken activityTaskToken =
1✔
673
          ActivityTaskToken.fromBytes(completeRequest.getTaskToken());
1✔
674
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
675
      mutableState.completeActivityTask(activityTaskToken.getScheduledEventId(), completeRequest);
1✔
676
      responseObserver.onNext(RespondActivityTaskCompletedResponse.getDefaultInstance());
1✔
677
      responseObserver.onCompleted();
1✔
678
    } catch (StatusRuntimeException e) {
1✔
679
      handleStatusRuntimeException(e, responseObserver);
1✔
680
    }
1✔
681
  }
1✔
682

683
  @Override
684
  public void respondActivityTaskCompletedById(
685
      RespondActivityTaskCompletedByIdRequest completeRequest,
686
      StreamObserver<RespondActivityTaskCompletedByIdResponse> responseObserver) {
687
    try {
688
      ExecutionId executionId =
×
689
          new ExecutionId(
690
              completeRequest.getNamespace(),
×
691
              completeRequest.getWorkflowId(),
×
692
              completeRequest.getRunId());
×
693
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
694
      mutableState.completeActivityTaskById(completeRequest.getActivityId(), completeRequest);
×
695
      responseObserver.onNext(RespondActivityTaskCompletedByIdResponse.getDefaultInstance());
×
696
      responseObserver.onCompleted();
×
697
    } catch (StatusRuntimeException e) {
×
698
      handleStatusRuntimeException(e, responseObserver);
×
699
    }
×
700
  }
×
701

702
  @Override
703
  public void respondActivityTaskFailed(
704
      RespondActivityTaskFailedRequest failRequest,
705
      StreamObserver<RespondActivityTaskFailedResponse> responseObserver) {
706
    try {
707
      ActivityTaskToken activityTaskToken = ActivityTaskToken.fromBytes(failRequest.getTaskToken());
1✔
708
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
709
      mutableState.failActivityTask(activityTaskToken.getScheduledEventId(), failRequest);
1✔
710
      responseObserver.onNext(RespondActivityTaskFailedResponse.getDefaultInstance());
1✔
711
      responseObserver.onCompleted();
1✔
712
    } catch (StatusRuntimeException e) {
1✔
713
      handleStatusRuntimeException(e, responseObserver);
1✔
714
    }
1✔
715
  }
1✔
716

717
  @Override
718
  public void respondActivityTaskFailedById(
719
      RespondActivityTaskFailedByIdRequest failRequest,
720
      StreamObserver<RespondActivityTaskFailedByIdResponse> responseObserver) {
721
    try {
722
      ExecutionId executionId =
×
723
          new ExecutionId(
724
              failRequest.getNamespace(), failRequest.getWorkflowId(), failRequest.getRunId());
×
725
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
726
      mutableState.failActivityTaskById(failRequest.getActivityId(), failRequest);
×
727
      responseObserver.onNext(RespondActivityTaskFailedByIdResponse.getDefaultInstance());
×
728
      responseObserver.onCompleted();
×
729
    } catch (StatusRuntimeException e) {
×
730
      handleStatusRuntimeException(e, responseObserver);
×
731
    }
×
732
  }
×
733

734
  @Override
735
  public void respondActivityTaskCanceled(
736
      RespondActivityTaskCanceledRequest canceledRequest,
737
      StreamObserver<RespondActivityTaskCanceledResponse> responseObserver) {
738
    try {
739
      ActivityTaskToken activityTaskToken =
1✔
740
          ActivityTaskToken.fromBytes(canceledRequest.getTaskToken());
1✔
741
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
742
      mutableState.cancelActivityTask(activityTaskToken.getScheduledEventId(), canceledRequest);
1✔
743
      responseObserver.onNext(RespondActivityTaskCanceledResponse.getDefaultInstance());
1✔
744
      responseObserver.onCompleted();
1✔
745
    } catch (StatusRuntimeException e) {
×
746
      handleStatusRuntimeException(e, responseObserver);
×
747
    }
1✔
748
  }
1✔
749

750
  @Override
751
  public void respondActivityTaskCanceledById(
752
      RespondActivityTaskCanceledByIdRequest canceledRequest,
753
      StreamObserver<RespondActivityTaskCanceledByIdResponse> responseObserver) {
754
    try {
755
      ExecutionId executionId =
×
756
          new ExecutionId(
757
              canceledRequest.getNamespace(),
×
758
              canceledRequest.getWorkflowId(),
×
759
              canceledRequest.getRunId());
×
760
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
761
      mutableState.cancelActivityTaskById(canceledRequest.getActivityId(), canceledRequest);
×
762
      responseObserver.onNext(RespondActivityTaskCanceledByIdResponse.getDefaultInstance());
×
763
      responseObserver.onCompleted();
×
764
    } catch (StatusRuntimeException e) {
×
765
      handleStatusRuntimeException(e, responseObserver);
×
766
    }
×
767
  }
×
768

769
  @Override
770
  public void pollNexusTaskQueue(
771
      PollNexusTaskQueueRequest request,
772
      StreamObserver<PollNexusTaskQueueResponse> responseObserver) {
773
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
774
      TestWorkflowStore.NexusTask task;
775
      try {
776
        task = pollTaskQueue(ctx, store.pollNexusTaskQueue(request));
1✔
777
      } catch (ExecutionException e) {
×
778
        responseObserver.onError(e);
×
779
        return;
×
780
      } catch (InterruptedException e) {
×
781
        Thread.currentThread().interrupt();
×
782
        responseObserver.onNext(PollNexusTaskQueueResponse.getDefaultInstance());
×
783
        responseObserver.onCompleted();
×
784
        return;
×
785
      } catch (CancellationException e) {
1✔
786
        responseObserver.onNext(PollNexusTaskQueueResponse.getDefaultInstance());
1✔
787
        responseObserver.onCompleted();
1✔
788
        return;
1✔
789
      }
1✔
790

791
      String taskTimeout =
1✔
792
          String.valueOf(Timestamps.between(store.currentTime(), task.getDeadline()).getSeconds());
1✔
793
      Request.Builder req =
1✔
794
          task.getTask().getRequestBuilder().putHeader(Header.REQUEST_TIMEOUT, taskTimeout + "s");
1✔
795
      PollNexusTaskQueueResponse.Builder resp = task.getTask().setRequest(req);
1✔
796

797
      responseObserver.onNext(resp.build());
1✔
798
      responseObserver.onCompleted();
1✔
799
    }
1✔
800
  }
1✔
801

802
  @Override
803
  public void respondNexusTaskCompleted(
804
      RespondNexusTaskCompletedRequest request,
805
      StreamObserver<RespondNexusTaskCompletedResponse> responseObserver) {
806
    try {
807
      NexusTaskToken tt = NexusTaskToken.fromBytes(request.getTaskToken());
1✔
808
      TestWorkflowMutableState mutableState =
1✔
809
          getMutableState(tt.getOperationRef().getExecutionId());
1✔
810
      if (!mutableState.validateOperationTaskToken(tt)) {
1✔
811
        responseObserver.onNext(RespondNexusTaskCompletedResponse.getDefaultInstance());
1✔
812
        responseObserver.onCompleted();
1✔
813
        return;
1✔
814
      }
815

816
      if (request.getResponse().hasCancelOperation()) {
1✔
817
        Failure canceled =
818
            Failure.newBuilder()
1✔
819
                .setMessage("operation canceled")
1✔
820
                .setCanceledFailureInfo(CanceledFailureInfo.getDefaultInstance())
1✔
821
                .build();
1✔
822
        mutableState.cancelNexusOperation(tt.getOperationRef(), canceled);
1✔
823
      } else if (request.getResponse().hasStartOperation()) {
1✔
824
        StartOperationResponse startResp = request.getResponse().getStartOperation();
1✔
825
        if (startResp.hasOperationError()) {
1✔
826
          UnsuccessfulOperationError opError = startResp.getOperationError();
1✔
827
          Failure.Builder b = Failure.newBuilder().setMessage(opError.getFailure().getMessage());
1✔
828

829
          if (startResp.getOperationError().getOperationState().equals("canceled")) {
1✔
830
            b.setCanceledFailureInfo(
×
831
                CanceledFailureInfo.newBuilder()
×
832
                    .setDetails(nexusFailureMetadataToPayloads(opError.getFailure())));
×
833
            mutableState.cancelNexusOperation(tt.getOperationRef(), b.build());
×
834
          } else {
835
            b.setApplicationFailureInfo(
1✔
836
                ApplicationFailureInfo.newBuilder()
1✔
837
                    .setType("NexusOperationFailure")
1✔
838
                    .setDetails(nexusFailureMetadataToPayloads(opError.getFailure()))
1✔
839
                    .setNonRetryable(true));
1✔
840
            mutableState.failNexusOperation(tt.getOperationRef(), b.build());
1✔
841
          }
842
        } else if (startResp.hasAsyncSuccess()) {
1✔
843
          // Start event is only recorded for async success
844
          mutableState.startNexusOperation(
1✔
845
              tt.getOperationRef().getScheduledEventId(),
1✔
846
              request.getIdentity(),
1✔
847
              startResp.getAsyncSuccess());
1✔
848
        } else if (startResp.hasSyncSuccess()) {
1✔
849
          mutableState.completeNexusOperation(
1✔
850
              tt.getOperationRef(), startResp.getSyncSuccess().getPayload());
1✔
851
        } else {
852
          throw Status.INVALID_ARGUMENT
×
853
              .withDescription("Expected success or OperationError to be set on request.")
×
854
              .asRuntimeException();
×
855
        }
856
      } else {
1✔
857
        throw Status.INVALID_ARGUMENT
×
858
            .withDescription("Expected StartOperation or CancelOperation to be set on request.")
×
859
            .asRuntimeException();
×
860
      }
861
      responseObserver.onNext(RespondNexusTaskCompletedResponse.getDefaultInstance());
1✔
862
      responseObserver.onCompleted();
1✔
863
    } catch (StatusRuntimeException e) {
1✔
864
      handleStatusRuntimeException(e, responseObserver);
1✔
865
    }
1✔
866
  }
1✔
867

868
  @Override
869
  public void respondNexusTaskFailed(
870
      RespondNexusTaskFailedRequest request,
871
      StreamObserver<RespondNexusTaskFailedResponse> responseObserver) {
872
    try {
873
      if (!request.hasError()) {
1✔
874
        throw Status.INVALID_ARGUMENT
×
875
            .withDescription("Nexus handler error not set on RespondNexusTaskFailedRequest")
×
876
            .asRuntimeException();
×
877
      }
878
      NexusTaskToken tt = NexusTaskToken.fromBytes(request.getTaskToken());
1✔
879
      TestWorkflowMutableState mutableState =
1✔
880
          getMutableState(tt.getOperationRef().getExecutionId());
1✔
881
      if (mutableState.validateOperationTaskToken(tt)) {
1✔
882
        Failure failure = handlerErrorToFailure(request.getError());
1✔
883
        mutableState.failNexusOperation(tt.getOperationRef(), failure);
1✔
884
      }
885
      responseObserver.onNext(RespondNexusTaskFailedResponse.getDefaultInstance());
1✔
886
      responseObserver.onCompleted();
1✔
887
    } catch (StatusRuntimeException e) {
1✔
888
      handleStatusRuntimeException(e, responseObserver);
1✔
889
    }
1✔
890
  }
1✔
891

892
  public void completeNexusOperation(NexusOperationRef ref, HistoryEvent completionEvent) {
893
    TestWorkflowMutableState target = getMutableState(ref.getExecutionId());
1✔
894

895
    switch (completionEvent.getEventType()) {
1✔
896
      case EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED:
897
        Payloads result =
1✔
898
            completionEvent.getWorkflowExecutionCompletedEventAttributes().getResult();
1✔
899
        // All of our SDKs support returning a single value from workflows, we can safely ignore the
900
        // rest of the payloads. Additionally, even if a workflow could return more than a single
901
        // value,
902
        // Nexus does not support it.
903
        Payload p =
904
            (result.getPayloadsCount() > 0) ? result.getPayloads(0) : Payload.getDefaultInstance();
1✔
905
        target.completeNexusOperation(ref, p);
1✔
906
        break;
1✔
907
      case EVENT_TYPE_WORKFLOW_EXECUTION_FAILED:
908
        Failure f =
909
            Failure.newBuilder()
×
910
                .setMessage(
×
911
                    completionEvent
912
                        .getWorkflowExecutionFailedEventAttributes()
×
913
                        .getFailure()
×
914
                        .getMessage())
×
915
                .build();
×
916
        target.failNexusOperation(ref, f);
×
917
        break;
×
918
      case EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
919
        Failure canceled =
920
            Failure.newBuilder()
1✔
921
                .setMessage("operation canceled")
1✔
922
                .setCanceledFailureInfo(CanceledFailureInfo.getDefaultInstance())
1✔
923
                .build();
1✔
924
        target.cancelNexusOperation(ref, canceled);
1✔
925
        break;
1✔
926
      case EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED:
927
        Failure terminated =
928
            Failure.newBuilder()
1✔
929
                .setMessage("operation terminated")
1✔
930
                .setApplicationFailureInfo(
1✔
931
                    ApplicationFailureInfo.newBuilder().setNonRetryable(true))
1✔
932
                .build();
1✔
933
        target.failNexusOperation(ref, terminated);
1✔
934
        break;
1✔
935
      case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
936
        Failure timedOut =
937
            Failure.newBuilder()
1✔
938
                .setMessage("operation exceeded internal timeout")
1✔
939
                .setApplicationFailureInfo(
1✔
940
                    ApplicationFailureInfo.newBuilder().setNonRetryable(true))
1✔
941
                .build();
1✔
942
        target.failNexusOperation(ref, timedOut);
1✔
943
        break;
1✔
944
      default:
945
        throw Status.INTERNAL
×
946
            .withDescription("invalid workflow execution status: " + completionEvent.getEventType())
×
947
            .asRuntimeException();
×
948
    }
949
  }
1✔
950

951
  private static Failure handlerErrorToFailure(HandlerError err) {
952
    return Failure.newBuilder()
1✔
953
        .setMessage(err.getFailure().getMessage())
1✔
954
        .setApplicationFailureInfo(
1✔
955
            ApplicationFailureInfo.newBuilder()
1✔
956
                .setType(err.getErrorType())
1✔
957
                .setDetails(nexusFailureMetadataToPayloads(err.getFailure())))
1✔
958
        .build();
1✔
959
  }
960

961
  private static Payloads nexusFailureMetadataToPayloads(io.temporal.api.nexus.v1.Failure failure) {
962
    Map<String, ByteString> metadata =
1✔
963
        failure.getMetadataMap().entrySet().stream()
1✔
964
            .collect(
1✔
965
                Collectors.toMap(Map.Entry::getKey, e -> ByteString.copyFromUtf8(e.getValue())));
1✔
966
    return Payloads.newBuilder()
1✔
967
        .addPayloads(Payload.newBuilder().putAllMetadata(metadata).setData(failure.getDetails()))
1✔
968
        .build();
1✔
969
  }
970

971
  @Override
972
  public void requestCancelWorkflowExecution(
973
      RequestCancelWorkflowExecutionRequest cancelRequest,
974
      StreamObserver<RequestCancelWorkflowExecutionResponse> responseObserver) {
975
    try {
976
      requestCancelWorkflowExecution(cancelRequest, Optional.empty());
1✔
977
      responseObserver.onNext(RequestCancelWorkflowExecutionResponse.getDefaultInstance());
1✔
978
      responseObserver.onCompleted();
1✔
979
    } catch (StatusRuntimeException e) {
1✔
980
      handleStatusRuntimeException(e, responseObserver);
1✔
981
    }
1✔
982
  }
1✔
983

984
  void requestCancelWorkflowExecution(
985
      RequestCancelWorkflowExecutionRequest cancelRequest,
986
      Optional<TestWorkflowMutableStateImpl.CancelExternalWorkflowExecutionCallerInfo> callerInfo) {
987
    ExecutionId executionId =
1✔
988
        new ExecutionId(cancelRequest.getNamespace(), cancelRequest.getWorkflowExecution());
1✔
989
    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
990
    mutableState.requestCancelWorkflowExecution(cancelRequest, callerInfo);
1✔
991
  }
1✔
992

993
  @Override
994
  public void terminateWorkflowExecution(
995
      TerminateWorkflowExecutionRequest request,
996
      StreamObserver<TerminateWorkflowExecutionResponse> responseObserver) {
997
    try {
998
      terminateWorkflowExecution(request);
1✔
999
      responseObserver.onNext(TerminateWorkflowExecutionResponse.getDefaultInstance());
1✔
1000
      responseObserver.onCompleted();
1✔
1001
    } catch (StatusRuntimeException e) {
1✔
1002
      handleStatusRuntimeException(e, responseObserver);
1✔
1003
    }
1✔
1004
  }
1✔
1005

1006
  private void terminateWorkflowExecution(TerminateWorkflowExecutionRequest request) {
1007
    ExecutionId executionId =
1✔
1008
        new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
1✔
1009
    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1010
    mutableState.terminateWorkflowExecution(request);
1✔
1011
  }
1✔
1012

1013
  @Override
1014
  public void signalWorkflowExecution(
1015
      SignalWorkflowExecutionRequest signalRequest,
1016
      StreamObserver<SignalWorkflowExecutionResponse> responseObserver) {
1017
    try {
1018
      ExecutionId executionId =
1✔
1019
          new ExecutionId(signalRequest.getNamespace(), signalRequest.getWorkflowExecution());
1✔
1020
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1021
      mutableState.signal(signalRequest);
1✔
1022
      responseObserver.onNext(SignalWorkflowExecutionResponse.getDefaultInstance());
1✔
1023
      responseObserver.onCompleted();
1✔
1024
    } catch (StatusRuntimeException e) {
1✔
1025
      handleStatusRuntimeException(e, responseObserver);
1✔
1026
    }
1✔
1027
  }
1✔
1028

1029
  @Override
1030
  public void updateWorkflowExecution(
1031
      UpdateWorkflowExecutionRequest request,
1032
      StreamObserver<UpdateWorkflowExecutionResponse> responseObserver) {
1033
    try (Context.CancellableContext ctx = deadlineCtx(getUpdatePollDeadline())) {
1✔
1034
      Context toRestore = ctx.attach();
1✔
1035
      try {
1036
        ExecutionId executionId =
1✔
1037
            new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
1✔
1038
        TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1039
        @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1040
        TestWorkflowMutableStateImpl.UpdateHandle updateHandle =
1✔
1041
            mutableState.updateWorkflowExecution(request, deadline);
1✔
1042
        UpdateWorkflowExecutionResponse response =
1✔
1043
            waitForUpdateResponse(request, deadline, updateHandle);
1✔
1044
        responseObserver.onNext(response);
1✔
1045
        responseObserver.onCompleted();
1✔
1046
      } catch (StatusRuntimeException e) {
1✔
1047
        handleStatusRuntimeException(e, responseObserver);
1✔
1048
      } finally {
1049
        ctx.detach(toRestore);
1✔
1050
      }
1051
    }
1052
  }
1✔
1053

1054
  UpdateWorkflowExecutionResponse waitForUpdateResponse(
1055
      UpdateWorkflowExecutionRequest request,
1056
      Deadline deadline,
1057
      TestWorkflowMutableStateImpl.UpdateHandle updateHandle) {
1058
    try {
1059
      UpdateWorkflowExecutionLifecycleStage reachedStage =
1✔
1060
          updateHandle.waitForStage(
1✔
1061
              request.getWaitPolicy().getLifecycleStage(),
1✔
1062
              deadline.timeRemaining(TimeUnit.MILLISECONDS),
1✔
1063
              TimeUnit.MILLISECONDS);
1064
      UpdateWorkflowExecutionResponse.Builder response =
1065
          UpdateWorkflowExecutionResponse.newBuilder()
1✔
1066
              .setUpdateRef(updateHandle.getRef())
1✔
1067
              .setStage(reachedStage);
1✔
1068
      if (reachedStage == UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED) {
1✔
1069
        response.setOutcome(updateHandle.getOutcomeNow());
1✔
1070
      }
1071
      return response.build();
1✔
1072
    } catch (TimeoutException e) {
1✔
1073
      UpdateWorkflowExecutionLifecycleStage stage = updateHandle.getStage();
1✔
1074
      UpdateWorkflowExecutionResponse.Builder response =
1075
          UpdateWorkflowExecutionResponse.newBuilder()
1✔
1076
              .setUpdateRef(updateHandle.getRef())
1✔
1077
              .setStage(stage);
1✔
1078
      if (stage
1✔
1079
          == UpdateWorkflowExecutionLifecycleStage
1080
              .UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED) {
1081
        response.setOutcome(updateHandle.getOutcomeNow());
×
1082
      }
1083
      return response.build();
1✔
1084
    } catch (InterruptedException e) {
×
1085
      throw new RuntimeException(e);
×
1086
    } catch (ExecutionException e) {
×
1087
      Throwable cause = e.getCause();
×
1088
      if (cause instanceof StatusRuntimeException) {
×
1089
        throw (StatusRuntimeException) cause;
×
1090
      }
1091
      throw Status.INTERNAL
×
1092
          .withCause(cause)
×
1093
          .withDescription(cause.getMessage())
×
1094
          .asRuntimeException();
×
1095
    }
1096
  }
1097

1098
  @Override
1099
  public void pollWorkflowExecutionUpdate(
1100
      PollWorkflowExecutionUpdateRequest request,
1101
      StreamObserver<PollWorkflowExecutionUpdateResponse> responseObserver) {
1102
    try (Context.CancellableContext ctx = deadlineCtx(getUpdatePollDeadline())) {
1✔
1103
      Context toRestore = ctx.attach();
1✔
1104
      try {
1105
        ExecutionId executionId =
1✔
1106
            new ExecutionId(request.getNamespace(), request.getUpdateRef().getWorkflowExecution());
1✔
1107
        TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1108
        @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1109
        PollWorkflowExecutionUpdateResponse response =
1✔
1110
            mutableState.pollUpdateWorkflowExecution(request, deadline);
1✔
1111
        responseObserver.onNext(response);
1✔
1112
        responseObserver.onCompleted();
1✔
1113
      } catch (StatusRuntimeException e) {
1✔
1114
        handleStatusRuntimeException(e, responseObserver);
1✔
1115
      } finally {
1116
        ctx.detach(toRestore);
1✔
1117
      }
1118
    }
1119
  }
1✔
1120

1121
  @Override
1122
  public void executeMultiOperation(
1123
      ExecuteMultiOperationRequest request,
1124
      StreamObserver<ExecuteMultiOperationResponse> responseObserver) {
1125
    try {
1126
      if (request.getOperationsCount() != 2) {
1✔
1127
        throw Status.INVALID_ARGUMENT
1✔
1128
            .withDescription("Operations have to be exactly [Start, Update].")
1✔
1129
            .asRuntimeException();
1✔
1130
      }
1131

1132
      StartWorkflowExecutionRequest startRequest;
1133
      ExecuteMultiOperationRequest.Operation firstOperation = request.getOperations(0);
1✔
1134
      if (firstOperation.getOperationCase() != START_WORKFLOW) {
1✔
1135
        throw Status.INVALID_ARGUMENT
1✔
1136
            .withDescription("Operations have to be exactly [Start, Update].")
1✔
1137
            .asRuntimeException();
1✔
1138
      }
1139
      startRequest = firstOperation.getStartWorkflow();
1✔
1140

1141
      if (!startRequest.getCronSchedule().isEmpty()) {
1✔
1142
        throw multiOperationExecutionFailure(
1✔
1143
            MultiOperationExecutionFailure.OperationStatus.newBuilder()
1✔
1144
                .setCode(Status.INVALID_ARGUMENT.getCode().value())
1✔
1145
                .setMessage("INVALID_ARGUMENT: CronSchedule is not allowed.")
1✔
1146
                .build(),
1✔
1147
            null);
1148
      }
1149

1150
      if (startRequest.getRequestEagerExecution()) {
1✔
1151
        throw multiOperationExecutionFailure(
1✔
1152
            MultiOperationExecutionFailure.OperationStatus.newBuilder()
1✔
1153
                .setCode(Status.INVALID_ARGUMENT.getCode().value())
1✔
1154
                .setMessage("INVALID_ARGUMENT: RequestEagerExecution is not supported.")
1✔
1155
                .build(),
1✔
1156
            null);
1157
      }
1158

1159
      UpdateWorkflowExecutionRequest updateRequest;
1160
      ExecuteMultiOperationRequest.Operation secondOperation = request.getOperations(1);
1✔
1161
      if (secondOperation.getOperationCase() != UPDATE_WORKFLOW) {
1✔
1162
        throw Status.INVALID_ARGUMENT
1✔
1163
            .withDescription("Operations have to be exactly [Start, Update].")
1✔
1164
            .asRuntimeException();
1✔
1165
      }
1166
      updateRequest = secondOperation.getUpdateWorkflow();
1✔
1167

1168
      if (!updateRequest.getWorkflowExecution().getRunId().isEmpty()) {
1✔
1169
        throw multiOperationExecutionFailure(
1✔
1170
            null, // start aborted
1171
            MultiOperationExecutionFailure.OperationStatus.newBuilder()
1✔
1172
                .setCode(Status.INVALID_ARGUMENT.getCode().value())
1✔
1173
                .setMessage("INVALID_ARGUMENT: RunId is not allowed.")
1✔
1174
                .build());
1✔
1175
      }
1176

1177
      if (!updateRequest.getFirstExecutionRunId().isEmpty()) {
1✔
1178
        throw multiOperationExecutionFailure(
1✔
1179
            null, // start aborted
1180
            MultiOperationExecutionFailure.OperationStatus.newBuilder()
1✔
1181
                .setCode(Status.INVALID_ARGUMENT.getCode().value())
1✔
1182
                .setMessage("INVALID_ARGUMENT: FirstExecutionRunId is not allowed.")
1✔
1183
                .build());
1✔
1184
      }
1185

1186
      if (!startRequest
1✔
1187
          .getWorkflowId()
1✔
1188
          .equals(updateRequest.getWorkflowExecution().getWorkflowId())) {
1✔
1189
        throw multiOperationExecutionFailure(
1✔
1190
            null, // start aborted
1191
            MultiOperationExecutionFailure.OperationStatus.newBuilder()
1✔
1192
                .setCode(Status.INVALID_ARGUMENT.getCode().value())
1✔
1193
                .setMessage(
1✔
1194
                    "INVALID_ARGUMENT: WorkflowId is not consistent with previous operation(s)")
1195
                .build());
1✔
1196
      }
1197

1198
      if (startRequest.hasWorkflowStartDelay()) {
1✔
1199
        throw multiOperationExecutionFailure(
1✔
1200
            MultiOperationExecutionFailure.OperationStatus.newBuilder()
1✔
1201
                .setCode(Status.INVALID_ARGUMENT.getCode().value())
1✔
1202
                .setMessage("INVALID_ARGUMENT: WorkflowStartDelay is not supported.")
1✔
1203
                .build(),
1✔
1204
            null);
1205
      }
1206

1207
      @Nullable Deadline deadline = getUpdatePollDeadline();
1✔
1208

1209
      AtomicReference<TestWorkflowMutableStateImpl.UpdateHandle> updateHandle =
1✔
1210
          new AtomicReference<>();
1211
      Consumer<TestWorkflowMutableState> applyUpdate =
1✔
1212
          ms -> {
1213
            try {
1214
              updateHandle.set(ms.updateWorkflowExecution(updateRequest, deadline));
1✔
1215
            } catch (StatusRuntimeException e) {
1✔
1216
              throw multiOperationExecutionFailure(
1✔
1217
                  null, // ie start aborted
1218
                  MultiOperationExecutionFailure.OperationStatus.newBuilder()
1✔
1219
                      .setCode(e.getStatus().getCode().value())
1✔
1220
                      .setMessage(e.getMessage())
1✔
1221
                      .build());
1✔
1222
            }
1✔
1223
          };
1✔
1224

1225
      StartWorkflowExecutionResponse startResult;
1226
      try {
1227
        startResult =
1✔
1228
            startWorkflowExecutionImpl(
1✔
1229
                startRequest, Duration.ZERO, Optional.empty(), OptionalLong.empty(), applyUpdate);
1✔
1230
      } catch (StatusRuntimeException e) {
1✔
1231
        if (StatusUtils.hasFailure(e, MultiOperationExecutionFailure.class)) {
1✔
1232
          throw e;
1✔
1233
        }
1234

1235
        throw multiOperationExecutionFailure(
1✔
1236
            MultiOperationExecutionFailure.OperationStatus.newBuilder()
1✔
1237
                .setCode(e.getStatus().getCode().value())
1✔
1238
                .setMessage(e.getMessage())
1✔
1239
                .build(),
1✔
1240
            null); // ie update aborted
1241
      }
1✔
1242

1243
      // if the workflow wasn't started, only send the Update request
1244
      if (!startResult.getStarted()) {
1✔
1245
        ExecutionId executionId =
1✔
1246
            new ExecutionId(request.getNamespace(), updateRequest.getWorkflowExecution());
1✔
1247
        TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1248
        applyUpdate.accept(mutableState);
1✔
1249
      }
1250

1251
      UpdateWorkflowExecutionResponse updateResult =
1✔
1252
          waitForUpdateResponse(updateRequest, deadline, updateHandle.get());
1✔
1253

1254
      ExecuteMultiOperationResponse response =
1255
          ExecuteMultiOperationResponse.newBuilder()
1✔
1256
              .addResponses(
1✔
1257
                  ExecuteMultiOperationResponse.Response.newBuilder().setStartWorkflow(startResult))
1✔
1258
              .addResponses(
1✔
1259
                  ExecuteMultiOperationResponse.Response.newBuilder()
1✔
1260
                      .setUpdateWorkflow(updateResult))
1✔
1261
              .build();
1✔
1262
      responseObserver.onNext(response);
1✔
1263
      responseObserver.onCompleted();
1✔
1264
    } catch (StatusRuntimeException e) {
1✔
1265
      handleStatusRuntimeException(e, responseObserver);
1✔
1266
    }
1✔
1267
  }
1✔
1268

1269
  private StatusRuntimeException multiOperationExecutionFailure(
1270
      MultiOperationExecutionFailure.OperationStatus... operationStatuses) {
1271
    Status status = null;
1✔
1272
    for (int i = 0; i < operationStatuses.length; i++) {
1✔
1273
      MultiOperationExecutionFailure.OperationStatus operationStatus = operationStatuses[i];
1✔
1274
      if (operationStatus == null) {
1✔
1275
        // convert to aborted failure
1276
        operationStatuses[i] =
1✔
1277
            MultiOperationExecutionFailure.OperationStatus.newBuilder()
1✔
1278
                .setCode(Status.ABORTED.getCode().value())
1✔
1279
                .setMessage("Operation was aborted.")
1✔
1280
                .addDetails(
1✔
1281
                    ProtoUtils.packAny(
1✔
1282
                        MultiOperationExecutionAborted.newBuilder().build(),
1✔
1283
                        MultiOperationExecutionAborted.getDescriptor()))
1✔
1284
                .build();
1✔
1285
        continue;
1✔
1286
      }
1287
      if (status != null) {
1✔
1288
        throw new IllegalArgumentException(
×
1289
            "exactly one non-null operation status must be specified");
1290
      }
1291
      status = Status.fromCodeValue(operationStatus.getCode());
1✔
1292
    }
1293
    if (status == null) {
1✔
1294
      throw new IllegalArgumentException("exactly one non-null operation status must be specified");
×
1295
    }
1296

1297
    return StatusUtils.newException(
1✔
1298
        status.withDescription("MultiOperation could not be executed"),
1✔
1299
        MultiOperationExecutionFailure.newBuilder()
1✔
1300
            .addAllStatuses(Arrays.asList(operationStatuses))
1✔
1301
            .build(),
1✔
1302
        MultiOperationExecutionFailure.getDescriptor());
1✔
1303
  }
1304

1305
  @Override
1306
  public void signalWithStartWorkflowExecution(
1307
      SignalWithStartWorkflowExecutionRequest r,
1308
      StreamObserver<SignalWithStartWorkflowExecutionResponse> responseObserver) {
1309
    try {
1310
      if (!r.hasTaskQueue()) {
1✔
1311
        throw Status.INVALID_ARGUMENT
×
1312
            .withDescription("request missing required taskQueue field")
×
1313
            .asRuntimeException();
×
1314
      }
1315
      if (!r.hasWorkflowType()) {
1✔
1316
        throw Status.INVALID_ARGUMENT
×
1317
            .withDescription("request missing required workflowType field")
×
1318
            .asRuntimeException();
×
1319
      }
1320
      ExecutionId executionId = new ExecutionId(r.getNamespace(), r.getWorkflowId(), null);
1✔
1321
      TestWorkflowMutableState mutableState = getMutableState(executionId, false);
1✔
1322
      SignalWorkflowExecutionRequest signalRequest =
1323
          SignalWorkflowExecutionRequest.newBuilder()
1✔
1324
              .setInput(r.getSignalInput())
1✔
1325
              .setSignalName(r.getSignalName())
1✔
1326
              .setWorkflowExecution(executionId.getExecution())
1✔
1327
              .setRequestId(r.getRequestId())
1✔
1328
              .setControl(r.getControl())
1✔
1329
              .setNamespace(r.getNamespace())
1✔
1330
              .setIdentity(r.getIdentity())
1✔
1331
              .build();
1✔
1332
      if (mutableState != null && !mutableState.isTerminalState()) {
1✔
1333
        mutableState.signal(signalRequest);
1✔
1334
        responseObserver.onNext(
1✔
1335
            SignalWithStartWorkflowExecutionResponse.newBuilder()
1✔
1336
                .setRunId(mutableState.getExecutionId().getExecution().getRunId())
1✔
1337
                .build());
1✔
1338
        responseObserver.onCompleted();
1✔
1339
        return;
1✔
1340
      }
1341
      StartWorkflowExecutionRequest.Builder startRequest =
1342
          StartWorkflowExecutionRequest.newBuilder()
1✔
1343
              .setRequestId(r.getRequestId())
1✔
1344
              .setInput(r.getInput())
1✔
1345
              .setWorkflowExecutionTimeout(r.getWorkflowExecutionTimeout())
1✔
1346
              .setWorkflowRunTimeout(r.getWorkflowRunTimeout())
1✔
1347
              .setWorkflowTaskTimeout(r.getWorkflowTaskTimeout())
1✔
1348
              .setNamespace(r.getNamespace())
1✔
1349
              .setTaskQueue(r.getTaskQueue())
1✔
1350
              .setWorkflowId(r.getWorkflowId())
1✔
1351
              .setWorkflowIdReusePolicy(r.getWorkflowIdReusePolicy())
1✔
1352
              .setIdentity(r.getIdentity())
1✔
1353
              .setWorkflowType(r.getWorkflowType())
1✔
1354
              .setCronSchedule(r.getCronSchedule())
1✔
1355
              .setRequestId(r.getRequestId());
1✔
1356
      if (r.hasRetryPolicy()) {
1✔
1357
        startRequest.setRetryPolicy(r.getRetryPolicy());
×
1358
      }
1359
      if (r.hasHeader()) {
1✔
1360
        startRequest.setHeader(r.getHeader());
1✔
1361
      }
1362
      if (r.hasMemo()) {
1✔
1363
        startRequest.setMemo(r.getMemo());
×
1364
      }
1365
      if (r.hasSearchAttributes()) {
1✔
1366
        startRequest.setSearchAttributes(r.getSearchAttributes());
1✔
1367
      }
1368
      if (r.hasWorkflowStartDelay()) {
1✔
1369
        startRequest.setWorkflowStartDelay(r.getWorkflowStartDelay());
1✔
1370
      }
1371

1372
      StartWorkflowExecutionResponse startResult =
1✔
1373
          startWorkflowExecutionImpl(
1✔
1374
              startRequest.build(),
1✔
1375
              Duration.ZERO,
1376
              Optional.empty(),
1✔
1377
              OptionalLong.empty(),
1✔
1378
              ms -> {
1379
                ms.signal(signalRequest);
1✔
1380
              });
1✔
1381
      responseObserver.onNext(
1✔
1382
          SignalWithStartWorkflowExecutionResponse.newBuilder()
1✔
1383
              .setRunId(startResult.getRunId())
1✔
1384
              .build());
1✔
1385
      responseObserver.onCompleted();
1✔
1386
    } catch (StatusRuntimeException e) {
1✔
1387
      handleStatusRuntimeException(e, responseObserver);
1✔
1388
    }
1✔
1389
  }
1✔
1390

1391
  public void signalExternalWorkflowExecution(
1392
      String signalId,
1393
      SignalExternalWorkflowExecutionCommandAttributes commandAttributes,
1394
      TestWorkflowMutableState source) {
1395
    String namespace;
1396
    if (commandAttributes.getNamespace().isEmpty()) {
1✔
1397
      namespace = source.getExecutionId().getNamespace();
1✔
1398
    } else {
1399
      namespace = commandAttributes.getNamespace();
×
1400
    }
1401
    ExecutionId executionId = new ExecutionId(namespace, commandAttributes.getExecution());
1✔
1402
    TestWorkflowMutableState mutableState;
1403
    try {
1404
      mutableState = getMutableState(executionId);
1✔
1405
      mutableState.signalFromWorkflow(commandAttributes);
1✔
1406
      source.completeSignalExternalWorkflowExecution(
1✔
1407
          signalId, mutableState.getExecutionId().getExecution().getRunId());
1✔
1408
    } catch (StatusRuntimeException e) {
1✔
1409
      if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
1410
        source.failSignalExternalWorkflowExecution(
1✔
1411
            signalId,
1412
            SignalExternalWorkflowExecutionFailedCause
1413
                .SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED_CAUSE_EXTERNAL_WORKFLOW_EXECUTION_NOT_FOUND);
1414
      } else {
1415
        throw e;
×
1416
      }
1417
    }
1✔
1418
  }
1✔
1419

1420
  /**
1421
   * Creates next run of a workflow execution
1422
   *
1423
   * @return RunId
1424
   */
1425
  public String continueAsNew(
1426
      StartWorkflowExecutionRequest previousRunStartRequest,
1427
      ContinueAsNewWorkflowExecutionCommandAttributes ca,
1428
      WorkflowExecutionContinuedAsNewEventAttributes ea,
1429
      Optional<TestServiceRetryState> retryState,
1430
      String identity,
1431
      ExecutionId continuedExecutionId,
1432
      String firstExecutionRunId,
1433
      Optional<TestWorkflowMutableState> parent,
1434
      OptionalLong parentChildInitiatedEventId) {
1435
    StartWorkflowExecutionRequest.Builder startRequestBuilder =
1436
        StartWorkflowExecutionRequest.newBuilder()
1✔
1437
            .setRequestId(UUID.randomUUID().toString())
1✔
1438
            .setWorkflowType(ea.getWorkflowType())
1✔
1439
            .setWorkflowRunTimeout(ea.getWorkflowRunTimeout())
1✔
1440
            .setWorkflowTaskTimeout(ea.getWorkflowTaskTimeout())
1✔
1441
            .setNamespace(continuedExecutionId.getNamespace())
1✔
1442
            .setTaskQueue(ea.getTaskQueue())
1✔
1443
            .setWorkflowId(continuedExecutionId.getWorkflowId().getWorkflowId())
1✔
1444
            .setWorkflowIdReusePolicy(previousRunStartRequest.getWorkflowIdReusePolicy())
1✔
1445
            .setIdentity(identity)
1✔
1446
            .setCronSchedule(previousRunStartRequest.getCronSchedule());
1✔
1447
    // TODO: Service doesn't perform this copy.
1448
    // See https://github.com/temporalio/temporal/issues/5249
1449
    //    if (previousRunStartRequest.hasRetryPolicy()) {
1450
    //      startRequestBuilder.setRetryPolicy(previousRunStartRequest.getRetryPolicy());
1451
    //    }
1452
    if (previousRunStartRequest.getCompletionCallbacksCount() > 0) {
1✔
1453
      startRequestBuilder.addAllCompletionCallbacks(
1✔
1454
          previousRunStartRequest.getCompletionCallbacksList());
1✔
1455
    }
1456
    if (ca.hasRetryPolicy()) {
1✔
1457
      startRequestBuilder.setRetryPolicy(ca.getRetryPolicy());
1✔
1458
    }
1459
    if (ea.hasInput()) {
1✔
1460
      startRequestBuilder.setInput(ea.getInput());
1✔
1461
    }
1462
    if (ea.hasHeader()) {
1✔
1463
      startRequestBuilder.setHeader(ea.getHeader());
1✔
1464
    }
1465
    StartWorkflowExecutionRequest startRequest = startRequestBuilder.build();
1✔
1466
    lock.lock();
1✔
1467
    Optional<Failure> lastFail =
1468
        ea.hasFailure()
1✔
1469
            ? Optional.of(ea.getFailure())
1✔
1470
            : retryState.flatMap(TestServiceRetryState::getPreviousRunFailure);
1✔
1471
    try {
1472
      StartWorkflowExecutionResponse response =
1✔
1473
          startWorkflowExecutionNoRunningCheckLocked(
1✔
1474
              startRequest,
1475
              ea.getNewExecutionRunId(),
1✔
1476
              firstExecutionRunId,
1477
              Optional.of(continuedExecutionId.getExecution().getRunId()),
1✔
1478
              retryState,
1479
              ProtobufTimeUtils.toJavaDuration(ea.getBackoffStartInterval()),
1✔
1480
              ea.getLastCompletionResult(),
1✔
1481
              lastFail,
1482
              parent,
1483
              parentChildInitiatedEventId,
1484
              null,
1485
              continuedExecutionId.getWorkflowId());
1✔
1486
      return response.getRunId();
1✔
1487
    } finally {
1488
      lock.unlock();
1✔
1489
    }
1490
  }
1491

1492
  @Override
1493
  public void listOpenWorkflowExecutions(
1494
      ListOpenWorkflowExecutionsRequest listRequest,
1495
      StreamObserver<ListOpenWorkflowExecutionsResponse> responseObserver) {
1496
    try {
1497
      Optional<String> workflowIdFilter;
1498
      if (listRequest.hasExecutionFilter()
1✔
1499
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
×
1500
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
×
1501
      } else {
1502
        workflowIdFilter = Optional.empty();
1✔
1503
      }
1504
      List<WorkflowExecutionInfo> result =
1✔
1505
          store.listWorkflows(WorkflowState.OPEN, workflowIdFilter);
1✔
1506
      responseObserver.onNext(
1✔
1507
          ListOpenWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1✔
1508
      responseObserver.onCompleted();
1✔
1509
    } catch (StatusRuntimeException e) {
×
1510
      handleStatusRuntimeException(e, responseObserver);
×
1511
    }
1✔
1512
  }
1✔
1513

1514
  @Override
1515
  public void listClosedWorkflowExecutions(
1516
      ListClosedWorkflowExecutionsRequest listRequest,
1517
      StreamObserver<ListClosedWorkflowExecutionsResponse> responseObserver) {
1518
    try {
1519
      Optional<String> workflowIdFilter;
1520
      if (listRequest.hasExecutionFilter()
1✔
1521
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
×
1522
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
×
1523
      } else {
1524
        workflowIdFilter = Optional.empty();
1✔
1525
      }
1526
      List<WorkflowExecutionInfo> result =
1✔
1527
          store.listWorkflows(WorkflowState.CLOSED, workflowIdFilter);
1✔
1528
      responseObserver.onNext(
1✔
1529
          ListClosedWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1✔
1530
      responseObserver.onCompleted();
1✔
1531
    } catch (StatusRuntimeException e) {
×
1532
      handleStatusRuntimeException(e, responseObserver);
×
1533
    }
1✔
1534
  }
1✔
1535

1536
  @Override
1537
  public void respondQueryTaskCompleted(
1538
      RespondQueryTaskCompletedRequest completeRequest,
1539
      StreamObserver<RespondQueryTaskCompletedResponse> responseObserver) {
1540
    try {
1541
      QueryId queryId = QueryId.fromBytes(completeRequest.getTaskToken());
1✔
1542
      TestWorkflowMutableState mutableState = getMutableState(queryId.getExecutionId());
1✔
1543
      mutableState.completeQuery(queryId, completeRequest);
1✔
1544
      responseObserver.onNext(RespondQueryTaskCompletedResponse.getDefaultInstance());
1✔
1545
      responseObserver.onCompleted();
1✔
1546
    } catch (StatusRuntimeException e) {
×
1547
      handleStatusRuntimeException(e, responseObserver);
×
1548
    }
1✔
1549
  }
1✔
1550

1551
  @Override
1552
  public void queryWorkflow(
1553
      QueryWorkflowRequest queryRequest, StreamObserver<QueryWorkflowResponse> responseObserver) {
1554
    try {
1555
      ExecutionId executionId =
1✔
1556
          new ExecutionId(queryRequest.getNamespace(), queryRequest.getExecution());
1✔
1557
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1558
      @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1559
      QueryWorkflowResponse result =
1✔
1560
          mutableState.query(
1✔
1561
              queryRequest,
1562
              deadline != null ? deadline.timeRemaining(TimeUnit.MILLISECONDS) : Long.MAX_VALUE);
1✔
1563
      responseObserver.onNext(result);
1✔
1564
      responseObserver.onCompleted();
1✔
1565
    } catch (StatusRuntimeException e) {
1✔
1566
      handleStatusRuntimeException(e, responseObserver);
1✔
1567
    }
1✔
1568
  }
1✔
1569

1570
  @Override
1571
  public void describeWorkflowExecution(
1572
      DescribeWorkflowExecutionRequest request,
1573
      StreamObserver<DescribeWorkflowExecutionResponse> responseObserver) {
1574
    try {
1575
      if (request.getNamespace().isEmpty()) {
1✔
1576
        throw createInvalidArgument("Namespace not set on request.");
×
1577
      }
1578
      if (!request.hasExecution()) {
1✔
1579
        throw createInvalidArgument("Execution not set on request.");
×
1580
      }
1581

1582
      ExecutionId executionId = new ExecutionId(request.getNamespace(), request.getExecution());
1✔
1583
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1584
      DescribeWorkflowExecutionResponse result = mutableState.describeWorkflowExecution();
1✔
1585
      responseObserver.onNext(result);
1✔
1586
      responseObserver.onCompleted();
1✔
1587
    } catch (StatusRuntimeException e) {
1✔
1588
      handleStatusRuntimeException(e, responseObserver);
1✔
1589
    }
1✔
1590
  }
1✔
1591

1592
  /**
1593
   * This method doesn't make much sense for test server, it accepts all namespaces as existent and
1594
   * registered. so, it's a trivial implementation just returning an info that a namespace is
1595
   * registered irrespectively of the input
1596
   */
1597
  @Override
1598
  public void describeNamespace(
1599
      DescribeNamespaceRequest request,
1600
      StreamObserver<DescribeNamespaceResponse> responseObserver) {
1601
    try {
1602
      if (request.getNamespace().isEmpty()) {
1✔
1603
        throw createInvalidArgument("Namespace not set on request.");
×
1604
      }
1605
      // generating a stable UUID for name
1606
      String namespaceId = UUID.nameUUIDFromBytes(request.getNamespace().getBytes()).toString();
1✔
1607
      DescribeNamespaceResponse result =
1608
          DescribeNamespaceResponse.newBuilder()
1✔
1609
              .setNamespaceInfo(
1✔
1610
                  NamespaceInfo.newBuilder()
1✔
1611
                      .setName(request.getNamespace())
1✔
1612
                      .setState(NamespaceState.NAMESPACE_STATE_REGISTERED)
1✔
1613
                      .setId(namespaceId)
1✔
1614
                      .setCapabilities(
1✔
1615
                          NamespaceInfo.Capabilities.newBuilder()
1✔
1616
                              .setEagerWorkflowStart(true)
1✔
1617
                              .setAsyncUpdate(true)
1✔
1618
                              .setSyncUpdate(true))
1✔
1619
                      .build())
1✔
1620
              .build();
1✔
1621
      responseObserver.onNext(result);
1✔
1622
      responseObserver.onCompleted();
1✔
1623
    } catch (StatusRuntimeException e) {
1✔
1624
      handleStatusRuntimeException(e, responseObserver);
1✔
1625
    }
1✔
1626
  }
1✔
1627

1628
  private <R> R requireNotNull(String fieldName, R value) {
1629
    if (value == null) {
1✔
1630
      throw Status.INVALID_ARGUMENT
×
1631
          .withDescription("Missing required field \"" + fieldName + "\".")
×
1632
          .asRuntimeException();
×
1633
    }
1634
    return value;
1✔
1635
  }
1636

1637
  /**
1638
   * Adds diagnostic data about internal service state to the provided {@link StringBuilder}.
1639
   * Includes histories of all workflow instances stored in the service.
1640
   */
1641
  public void getDiagnostics(StringBuilder result) {
1642
    store.getDiagnostics(result);
×
1643
  }
×
1644

1645
  /**
1646
   * @deprecated use {@link TestServiceStubs} and {@link
1647
   *     TestServiceGrpc.TestServiceBlockingStub#getCurrentTime(Empty)}
1648
   */
1649
  @Deprecated
1650
  public long currentTimeMillis() {
1651
    return selfAdvancingTimer.getClock().getAsLong();
×
1652
  }
1653

1654
  /** Invokes callback after the specified delay according to internal service clock. */
1655
  public void registerDelayedCallback(Duration delay, Runnable r) {
1656
    store.registerDelayedCallback(delay, r);
1✔
1657
  }
1✔
1658

1659
  /**
1660
   * Disables time skipping. To re-enable call {@link #unlockTimeSkipping(String)}. These calls are
1661
   * counted, so calling unlock does not guarantee that time is going to be skipped immediately as
1662
   * another lock can be holding it.
1663
   *
1664
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1665
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#lockTimeSkipping(LockTimeSkippingRequest)}
1666
   */
1667
  @Deprecated
1668
  public void lockTimeSkipping(String caller) {
1669
    selfAdvancingTimer.lockTimeSkipping(caller);
×
1670
  }
×
1671

1672
  /**
1673
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1674
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkipping(UnlockTimeSkippingRequest)}
1675
   */
1676
  @Deprecated
1677
  public void unlockTimeSkipping(String caller) {
1678
    selfAdvancingTimer.unlockTimeSkipping(caller);
×
1679
  }
×
1680

1681
  /**
1682
   * Unlocks time skipping and blocks the calling thread until internal clock passes the current +
1683
   * duration time.<br>
1684
   * When the time is reached, locks time skipping and returns.<br>
1685
   * Might not block at all due to time skipping. Or might block if the time skipping lock counter
1686
   * was more than 1.
1687
   *
1688
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1689
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkippingWithSleep(SleepRequest)}
1690
   */
1691
  @Deprecated
1692
  public void sleep(Duration duration) {
1693
    CompletableFuture<Void> result = new CompletableFuture<>();
×
1694
    selfAdvancingTimer.schedule(
×
1695
        duration,
1696
        () -> {
1697
          selfAdvancingTimer.lockTimeSkipping("TestWorkflowService sleep");
×
1698
          result.complete(null);
×
1699
        },
×
1700
        "workflow sleep");
1701
    selfAdvancingTimer.unlockTimeSkipping("TestWorkflowService sleep");
×
1702
    try {
1703
      result.get();
×
1704
    } catch (InterruptedException e) {
×
1705
      Thread.currentThread().interrupt();
×
1706
      throw new RuntimeException(e);
×
1707
    } catch (ExecutionException e) {
×
1708
      throw new RuntimeException(e);
×
1709
    }
×
1710
  }
×
1711

1712
  /**
1713
   * Temporal server times out task queue long poll calls after 1 minute and returns an empty
1714
   * result. After which the request has to be retried by the client if it wants to continue
1715
   * waiting. We emulate this behavior here.
1716
   *
1717
   * <p>If there is a deadline present, for task queue poll requests server will respond inside the
1718
   * deadline. Note that the latest is not applicable for getWorkflowExecutionHistory() long polls.
1719
   *
1720
   * @return minimum between the context deadline and maximum long poll deadline.
1721
   */
1722
  private Deadline getLongPollDeadline() {
1723
    @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1724
    Deadline maximumDeadline =
1✔
1725
        Deadline.after(
1✔
1726
            WorkflowServiceStubsOptions.DEFAULT_SERVER_LONG_POLL_RPC_TIMEOUT.toMillis(),
1✔
1727
            TimeUnit.MILLISECONDS);
1728
    return deadline != null ? deadline.minimum(maximumDeadline) : maximumDeadline;
1✔
1729
  }
1730

1731
  private Deadline getUpdatePollDeadline() {
1732
    @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1733
    Deadline maximumDeadline =
1✔
1734
        Deadline.after(Duration.ofSeconds(10).toMillis(), TimeUnit.MILLISECONDS);
1✔
1735
    return deadline != null ? deadline.minimum(maximumDeadline) : maximumDeadline;
1✔
1736
  }
1737

1738
  private void handleStatusRuntimeException(
1739
      StatusRuntimeException e, StreamObserver<?> responseObserver) {
1740
    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1✔
1741
      log.error("unexpected", e);
1✔
1742
    }
1743
    responseObserver.onError(e);
1✔
1744
  }
1✔
1745

1746
  /**
1747
   * Creates an in-memory service along with client stubs for use in Java code. See also
1748
   * createServerOnly and createWithNoGrpcServer.
1749
   *
1750
   * @deprecated use {@link TestServer#createServer(boolean)} instead and pass {@code
1751
   *     lockTimeSkipping=false} to emulate the behavior of this method
1752
   */
1753
  @Deprecated
1754
  public TestWorkflowService() {
1755
    this(0, true);
×
1756
  }
×
1757

1758
  /**
1759
   * Creates an in-memory service along with client stubs for use in Java code. See also
1760
   * createServerOnly and createWithNoGrpcServer.
1761
   *
1762
   * @deprecated use {@link TestServer#createServer(boolean, long)} instead and pass {@code
1763
   *     lockTimeSkipping=false} to emulate the behavior of this method
1764
   */
1765
  @Deprecated
1766
  public TestWorkflowService(long initialTimeMillis) {
1767
    this(initialTimeMillis, true);
×
1768
  }
×
1769

1770
  /**
1771
   * Creates an in-memory service along with client stubs for use in Java code. See also
1772
   * createServerOnly and createWithNoGrpcServer.
1773
   *
1774
   * @deprecated use {@link TestServer#createServer(boolean)} instead
1775
   */
1776
  @Deprecated
1777
  public TestWorkflowService(boolean lockTimeSkipping) {
1778
    this(0, true);
×
1779
    if (lockTimeSkipping) {
×
1780
      this.lockTimeSkipping("constructor");
×
1781
    }
1782
  }
×
1783

1784
  /**
1785
   * Creates an instance of TestWorkflowService that does not manage its own gRPC server. Useful for
1786
   * including in an externally managed gRPC server.
1787
   *
1788
   * @deprecated use {@link TestServicesStarter} to create just the services with gRPC server
1789
   */
1790
  @Deprecated
1791
  public static TestWorkflowService createWithNoGrpcServer() {
1792
    return new TestWorkflowService(0, false);
×
1793
  }
1794

1795
  private TestWorkflowService(long initialTimeMillis, boolean startInProcessServer) {
×
1796
    this.selfAdvancingTimer =
×
1797
        new SelfAdvancingTimerImpl(initialTimeMillis, Clock.systemDefaultZone());
×
1798
    store = new TestWorkflowStoreImpl(this.selfAdvancingTimer);
×
1799
    visibilityStore = new TestVisibilityStoreImpl();
×
1800
    nexusEndpointStore = new TestNexusEndpointStoreImpl();
×
1801
    outOfProcessServer = null;
×
1802
    if (startInProcessServer) {
×
1803
      this.inProcessServer = new InProcessGRPCServer(Collections.singletonList(this));
×
1804
      this.workflowServiceStubs =
×
1805
          WorkflowServiceStubs.newServiceStubs(
×
1806
              WorkflowServiceStubsOptions.newBuilder()
×
1807
                  .setChannel(inProcessServer.getChannel())
×
1808
                  .build());
×
1809
    } else {
1810
      this.inProcessServer = null;
×
1811
      this.workflowServiceStubs = null;
×
1812
    }
1813
  }
×
1814

1815
  /**
1816
   * Creates an out-of-process rather than in-process server, and does not set up a client. Useful,
1817
   * for example, if you want to use the test service from other SDKs.
1818
   *
1819
   * @param port the port to listen on
1820
   * @deprecated use {@link TestServer#createPortBoundServer(int, boolean)} instead and pass {@code
1821
   *     lockTimeSkipping=false} to emulate the behavior of this method
1822
   */
1823
  @Deprecated
1824
  public static TestWorkflowService createServerOnly(int port) {
1825
    TestWorkflowService result = new TestWorkflowService(true, port);
×
1826
    log.info("Server started, listening on " + port);
×
1827
    return result;
×
1828
  }
1829

1830
  private TestWorkflowService(boolean isOutOfProc, int port) {
×
1831
    // isOutOfProc is just here to make unambiguous constructor overloading.
1832
    Preconditions.checkState(isOutOfProc, "Impossible.");
×
1833
    inProcessServer = null;
×
1834
    workflowServiceStubs = null;
×
1835
    this.selfAdvancingTimer = new SelfAdvancingTimerImpl(0, Clock.systemDefaultZone());
×
1836
    store = new TestWorkflowStoreImpl(selfAdvancingTimer);
×
1837
    visibilityStore = new TestVisibilityStoreImpl();
×
1838
    nexusEndpointStore = new TestNexusEndpointStoreImpl();
×
1839
    try {
1840
      ServerBuilder<?> serverBuilder =
×
1841
          Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create());
×
1842
      GRPCServerHelper.registerServicesAndHealthChecks(
×
1843
          Collections.singletonList(this), serverBuilder);
×
1844
      outOfProcessServer = serverBuilder.build().start();
×
1845
    } catch (IOException e) {
×
1846
      throw new RuntimeException(e);
×
1847
    }
×
1848
  }
×
1849

1850
  @Deprecated
1851
  public WorkflowServiceStubs newClientStub() {
1852
    if (workflowServiceStubs == null) {
×
1853
      throw new RuntimeException(
×
1854
          "Cannot get a client when you created your TestWorkflowService with createServerOnly.");
1855
    }
1856
    return workflowServiceStubs;
×
1857
  }
1858

1859
  private static StatusRuntimeException createInvalidArgument(String description) {
1860
    throw Status.INVALID_ARGUMENT.withDescription(description).asRuntimeException();
1✔
1861
  }
1862
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc