• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #272

21 Jun 2024 08:17PM UTC coverage: 77.548% (+0.04%) from 77.506%
#272

push

github

web-flow
Resource based tuner (#2110)

275 of 338 new or added lines in 11 files covered. (81.36%)

12 existing lines in 5 files now uncovered.

19522 of 25174 relevant lines covered (77.55%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.15
/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import static io.temporal.internal.testservice.CronUtils.getBackoffInterval;
24

25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Throwables;
27
import com.google.protobuf.ByteString;
28
import com.google.protobuf.Empty;
29
import com.google.protobuf.Timestamp;
30
import com.google.protobuf.util.Timestamps;
31
import io.grpc.*;
32
import io.grpc.stub.StreamObserver;
33
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
34
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
35
import io.temporal.api.common.v1.Payload;
36
import io.temporal.api.common.v1.Payloads;
37
import io.temporal.api.common.v1.RetryPolicy;
38
import io.temporal.api.common.v1.WorkflowExecution;
39
import io.temporal.api.enums.v1.*;
40
import io.temporal.api.errordetails.v1.WorkflowExecutionAlreadyStartedFailure;
41
import io.temporal.api.failure.v1.Failure;
42
import io.temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes;
43
import io.temporal.api.namespace.v1.NamespaceInfo;
44
import io.temporal.api.testservice.v1.LockTimeSkippingRequest;
45
import io.temporal.api.testservice.v1.SleepRequest;
46
import io.temporal.api.testservice.v1.UnlockTimeSkippingRequest;
47
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
48
import io.temporal.api.workflowservice.v1.*;
49
import io.temporal.internal.common.ProtobufTimeUtils;
50
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowState;
51
import io.temporal.serviceclient.StatusUtils;
52
import io.temporal.serviceclient.WorkflowServiceStubs;
53
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
54
import java.io.Closeable;
55
import java.io.IOException;
56
import java.time.Clock;
57
import java.time.Duration;
58
import java.util.*;
59
import java.util.concurrent.*;
60
import java.util.concurrent.locks.Lock;
61
import java.util.concurrent.locks.ReentrantLock;
62
import javax.annotation.Nonnull;
63
import javax.annotation.Nullable;
64
import org.slf4j.Logger;
65
import org.slf4j.LoggerFactory;
66

67
/**
68
 * In memory implementation of the Workflow Service. To be used for testing purposes only.
69
 *
70
 * <p>Do not use directly, instead use {@link io.temporal.testing.TestWorkflowEnvironment}.
71
 */
72
public final class TestWorkflowService extends WorkflowServiceGrpc.WorkflowServiceImplBase
73
    implements Closeable {
74
  private static final Logger log = LoggerFactory.getLogger(TestWorkflowService.class);
1✔
75
  private final Map<ExecutionId, TestWorkflowMutableState> executions = new HashMap<>();
1✔
76
  // key->WorkflowId
77
  private final Map<WorkflowId, TestWorkflowMutableState> executionsByWorkflowId = new HashMap<>();
1✔
78
  private final ExecutorService executor = Executors.newCachedThreadPool();
1✔
79
  private final Lock lock = new ReentrantLock();
1✔
80

81
  private final TestWorkflowStore store;
82
  private final TestVisibilityStore visibilityStore;
83
  private final SelfAdvancingTimer selfAdvancingTimer;
84

85
  private final ScheduledExecutorService backgroundScheduler =
1✔
86
      Executors.newSingleThreadScheduledExecutor();
1✔
87

88
  private final Server outOfProcessServer;
89
  private final InProcessGRPCServer inProcessServer;
90
  private final WorkflowServiceStubs workflowServiceStubs;
91

92
  TestWorkflowService(
93
      TestWorkflowStore store,
94
      TestVisibilityStore visibilityStore,
95
      SelfAdvancingTimer selfAdvancingTimer) {
1✔
96
    this.store = store;
1✔
97
    this.visibilityStore = visibilityStore;
1✔
98
    this.selfAdvancingTimer = selfAdvancingTimer;
1✔
99
    this.outOfProcessServer = null;
1✔
100
    this.inProcessServer = null;
1✔
101
    this.workflowServiceStubs = null;
1✔
102
  }
1✔
103

104
  @Override
105
  public void close() {
106
    log.debug("Shutting down TestWorkflowService");
1✔
107

108
    log.debug("Shutting down background scheduler");
1✔
109
    backgroundScheduler.shutdown();
1✔
110

111
    if (outOfProcessServer != null) {
1✔
112
      log.info("Shutting down out-of-process GRPC server");
×
113
      outOfProcessServer.shutdown();
×
114
    }
115

116
    if (workflowServiceStubs != null) {
1✔
117
      workflowServiceStubs.shutdown();
×
118
    }
119

120
    if (inProcessServer != null) {
1✔
121
      log.info("Shutting down in-process GRPC server");
×
122
      inProcessServer.shutdown();
×
123
    }
124

125
    executor.shutdown();
1✔
126

127
    try {
128
      executor.awaitTermination(1, TimeUnit.SECONDS);
1✔
129

130
      if (outOfProcessServer != null) {
1✔
131
        outOfProcessServer.awaitTermination(1, TimeUnit.SECONDS);
×
132
      }
133

134
      if (workflowServiceStubs != null) {
1✔
135
        workflowServiceStubs.awaitTermination(1, TimeUnit.SECONDS);
×
136
      }
137

138
      if (inProcessServer != null) {
1✔
139
        inProcessServer.awaitTermination(1, TimeUnit.SECONDS);
×
140
      }
141

142
    } catch (InterruptedException e) {
×
143
      Thread.currentThread().interrupt();
×
144
      log.debug("shutdown interrupted", e);
×
145
    }
1✔
146

147
    store.close();
1✔
148
  }
1✔
149

150
  private TestWorkflowMutableState getMutableState(ExecutionId executionId) {
151
    return getMutableState(executionId, true);
1✔
152
  }
153

154
  private TestWorkflowMutableState getMutableState(ExecutionId executionId, boolean failNotExists) {
155
    lock.lock();
1✔
156
    try {
157
      if (executionId.getExecution().getRunId().isEmpty()) {
1✔
158
        return getMutableState(executionId.getWorkflowId(), failNotExists);
1✔
159
      }
160
      TestWorkflowMutableState mutableState = executions.get(executionId);
1✔
161
      if (mutableState == null && failNotExists) {
1✔
162
        throw Status.NOT_FOUND
1✔
163
            .withDescription(
1✔
164
                "Execution \""
165
                    + executionId
166
                    + "\" not found in mutable state. Known executions: "
167
                    + executions.values()
1✔
168
                    + ", service="
169
                    + this)
170
            .asRuntimeException();
1✔
171
      }
172
      return mutableState;
1✔
173
    } finally {
174
      lock.unlock();
1✔
175
    }
176
  }
177

178
  private TestWorkflowMutableState getMutableState(WorkflowId workflowId, boolean failNotExists) {
179
    lock.lock();
1✔
180
    try {
181
      TestWorkflowMutableState mutableState = executionsByWorkflowId.get(workflowId);
1✔
182
      if (mutableState == null && failNotExists) {
1✔
183
        throw Status.NOT_FOUND
1✔
184
            .withDescription("Execution not found in mutable state: " + workflowId)
1✔
185
            .asRuntimeException();
1✔
186
      }
187
      return mutableState;
1✔
188
    } finally {
189
      lock.unlock();
1✔
190
    }
191
  }
192

193
  @Override
194
  public void startWorkflowExecution(
195
      StartWorkflowExecutionRequest request,
196
      StreamObserver<StartWorkflowExecutionResponse> responseObserver) {
197
    try {
198
      Duration backoffInterval = getBackoffInterval(request.getCronSchedule(), store.currentTime());
1✔
199
      StartWorkflowExecutionResponse response =
1✔
200
          startWorkflowExecutionImpl(
1✔
201
              request, backoffInterval, Optional.empty(), OptionalLong.empty(), null);
1✔
202
      responseObserver.onNext(response);
1✔
203
      responseObserver.onCompleted();
1✔
204
    } catch (StatusRuntimeException e) {
1✔
205
      handleStatusRuntimeException(e, responseObserver);
1✔
206
    }
1✔
207
  }
1✔
208

209
  StartWorkflowExecutionResponse startWorkflowExecutionImpl(
210
      StartWorkflowExecutionRequest startRequest,
211
      Duration backoffStartInterval,
212
      Optional<TestWorkflowMutableState> parent,
213
      OptionalLong parentChildInitiatedEventId,
214
      @Nullable SignalWorkflowExecutionRequest signalWithStartSignal) {
215
    String requestWorkflowId = requireNotNull("WorkflowId", startRequest.getWorkflowId());
1✔
216
    String namespace = requireNotNull("Namespace", startRequest.getNamespace());
1✔
217
    WorkflowId workflowId = new WorkflowId(namespace, requestWorkflowId);
1✔
218
    WorkflowIdReusePolicy reusePolicy = startRequest.getWorkflowIdReusePolicy();
1✔
219
    WorkflowIdConflictPolicy conflictPolicy = startRequest.getWorkflowIdConflictPolicy();
1✔
220
    if (conflictPolicy != WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED
1✔
221
        && reusePolicy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
222
      throw createInvalidArgument(
×
223
          "Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy.");
224
    }
225

226
    TestWorkflowMutableState existing;
227
    lock.lock();
1✔
228
    try {
229
      String newRunId = UUID.randomUUID().toString();
1✔
230
      existing = executionsByWorkflowId.get(workflowId);
1✔
231
      if (existing != null) {
1✔
232
        WorkflowExecutionStatus status = existing.getWorkflowExecutionStatus();
1✔
233

234
        if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
235
            && (reusePolicy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING
236
                || conflictPolicy
237
                    == WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING)) {
238
          existing.terminateWorkflowExecution(
1✔
239
              TerminateWorkflowExecutionRequest.newBuilder()
1✔
240
                  .setNamespace(startRequest.getNamespace())
1✔
241
                  .setWorkflowExecution(existing.getExecutionId().getExecution())
1✔
242
                  .setReason("TerminateIfRunning WorkflowIdReusePolicy Policy")
1✔
243
                  .setIdentity("history-service")
1✔
244
                  .setDetails(
1✔
245
                      Payloads.newBuilder()
1✔
246
                          .addPayloads(
1✔
247
                              Payload.newBuilder()
1✔
248
                                  .setData(
1✔
249
                                      ByteString.copyFromUtf8(
1✔
250
                                          String.format("terminated by new runID: %s", newRunId)))
1✔
251
                                  .build())
1✔
252
                          .build())
1✔
253
                  .build());
1✔
254
        } else if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
255
            && conflictPolicy
256
                == WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING) {
257
          return StartWorkflowExecutionResponse.newBuilder()
1✔
258
              .setStarted(false)
1✔
259
              .setRunId(existing.getExecutionId().getExecution().getRunId())
1✔
260
              .build();
1✔
261
        } else if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
262
            || reusePolicy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE) {
263
          return throwDuplicatedWorkflow(startRequest, existing);
×
264
        } else if (reusePolicy
1✔
265
                == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
266
            && (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED
267
                || status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW)) {
268
          return throwDuplicatedWorkflow(startRequest, existing);
×
269
        }
270
      }
271
      Optional<TestServiceRetryState> retryState;
272
      Optional<Failure> lastFailure = Optional.empty();
1✔
273
      if (startRequest.hasRetryPolicy()) {
1✔
274
        Duration expirationInterval =
1✔
275
            ProtobufTimeUtils.toJavaDuration(startRequest.getWorkflowExecutionTimeout());
1✔
276
        retryState = newRetryStateLocked(startRequest.getRetryPolicy(), expirationInterval);
1✔
277
        if (retryState.isPresent()) {
1✔
278
          lastFailure = retryState.get().getPreviousRunFailure();
1✔
279
        }
280
      } else {
1✔
281
        retryState = Optional.empty();
1✔
282
      }
283
      return startWorkflowExecutionNoRunningCheckLocked(
1✔
284
          startRequest,
285
          newRunId,
286
          // it's the first execution in the continue-as-new chain, so firstExecutionRunId =
287
          // newRunId
288
          newRunId,
289
          Optional.empty(),
1✔
290
          retryState,
291
          backoffStartInterval,
292
          null,
293
          lastFailure,
294
          parent,
295
          parentChildInitiatedEventId,
296
          signalWithStartSignal,
297
          workflowId);
298
    } finally {
299
      lock.unlock();
1✔
300
    }
301
  }
302

303
  private Optional<TestServiceRetryState> newRetryStateLocked(
304
      RetryPolicy retryPolicy, Duration expirationInterval) {
305
    Timestamp expirationTime =
306
        expirationInterval.isZero()
1✔
307
            ? Timestamps.fromNanos(0)
1✔
308
            : Timestamps.add(
1✔
309
                store.currentTime(), ProtobufTimeUtils.toProtoDuration(expirationInterval));
1✔
310
    return Optional.of(new TestServiceRetryState(retryPolicy, expirationTime));
1✔
311
  }
312

313
  private StartWorkflowExecutionResponse throwDuplicatedWorkflow(
314
      StartWorkflowExecutionRequest startRequest, TestWorkflowMutableState existing) {
315
    WorkflowExecution execution = existing.getExecutionId().getExecution();
1✔
316
    WorkflowExecutionAlreadyStartedFailure error =
317
        WorkflowExecutionAlreadyStartedFailure.newBuilder()
1✔
318
            .setRunId(execution.getRunId())
1✔
319
            .setStartRequestId(startRequest.getRequestId())
1✔
320
            .build();
1✔
321
    throw StatusUtils.newException(
1✔
322
        Status.ALREADY_EXISTS.withDescription(
1✔
323
            String.format(
1✔
324
                "WorkflowId: %s, " + "RunId: %s", execution.getWorkflowId(), execution.getRunId())),
1✔
325
        error,
326
        WorkflowExecutionAlreadyStartedFailure.getDescriptor());
1✔
327
  }
328

329
  private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheckLocked(
330
      StartWorkflowExecutionRequest startRequest,
331
      @Nonnull String runId,
332
      @Nonnull String firstExecutionRunId,
333
      Optional<String> continuedExecutionRunId,
334
      Optional<TestServiceRetryState> retryState,
335
      Duration backoffStartInterval,
336
      Payloads lastCompletionResult,
337
      Optional<Failure> lastFailure,
338
      Optional<TestWorkflowMutableState> parent,
339
      OptionalLong parentChildInitiatedEventId,
340
      @Nullable SignalWorkflowExecutionRequest signalWithStartSignal,
341
      WorkflowId workflowId) {
342
    String namespace = startRequest.getNamespace();
1✔
343
    TestWorkflowMutableState mutableState =
1✔
344
        new TestWorkflowMutableStateImpl(
345
            startRequest,
346
            firstExecutionRunId,
347
            runId,
348
            retryState,
349
            backoffStartInterval,
350
            lastCompletionResult,
351
            lastFailure,
352
            parent,
353
            parentChildInitiatedEventId,
354
            continuedExecutionRunId,
355
            this,
356
            store,
357
            visibilityStore,
358
            selfAdvancingTimer);
359
    WorkflowExecution execution = mutableState.getExecutionId().getExecution();
1✔
360
    ExecutionId executionId = new ExecutionId(namespace, execution);
1✔
361
    executionsByWorkflowId.put(workflowId, mutableState);
1✔
362
    executions.put(executionId, mutableState);
1✔
363

364
    PollWorkflowTaskQueueRequest eagerWorkflowTaskPollRequest =
365
        startRequest.getRequestEagerExecution()
1✔
366
            ? PollWorkflowTaskQueueRequest.newBuilder()
1✔
367
                .setIdentity(startRequest.getIdentity())
1✔
368
                .setNamespace(startRequest.getNamespace())
1✔
369
                .setTaskQueue(startRequest.getTaskQueue())
1✔
370
                .build()
1✔
371
            : null;
1✔
372

373
    @Nullable
374
    PollWorkflowTaskQueueResponse eagerWorkflowTask =
1✔
375
        mutableState.startWorkflow(
1✔
376
            continuedExecutionRunId.isPresent(),
1✔
377
            signalWithStartSignal,
378
            eagerWorkflowTaskPollRequest);
379
    StartWorkflowExecutionResponse.Builder response =
380
        StartWorkflowExecutionResponse.newBuilder().setRunId(execution.getRunId()).setStarted(true);
1✔
381
    if (eagerWorkflowTask != null) {
1✔
382
      response.setEagerWorkflowTask(eagerWorkflowTask);
1✔
383
    }
384
    return response.build();
1✔
385
  }
386

387
  @Override
388
  public void getWorkflowExecutionHistory(
389
      GetWorkflowExecutionHistoryRequest getRequest,
390
      StreamObserver<GetWorkflowExecutionHistoryResponse> responseObserver) {
391
    ExecutionId executionId = new ExecutionId(getRequest.getNamespace(), getRequest.getExecution());
1✔
392
    executor.execute(
1✔
393
        // preserving gRPC context deadline between threads
394
        Context.current()
1✔
395
            .wrap(
1✔
396
                () -> {
397
                  try {
398
                    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
399
                    responseObserver.onNext(
1✔
400
                        store.getWorkflowExecutionHistory(
1✔
401
                            mutableState.getExecutionId(),
1✔
402
                            getRequest,
403
                            // We explicitly don't try to respond inside the context deadline.
404
                            // If we try to fit into the context deadline, the deadline may be not
405
                            // expired on the client side and an empty response will lead to a new
406
                            // request, making the client hammer the server at the tail end of the
407
                            // deadline.
408
                            // So this call is designed to wait fully till the end of the
409
                            // context deadline and throw DEADLINE_EXCEEDED if the deadline is less
410
                            // than 20s.
411
                            // If it's longer than 20 seconds - we return an empty result.
412
                            Deadline.after(20, TimeUnit.SECONDS)));
1✔
413
                    responseObserver.onCompleted();
1✔
414
                  } catch (StatusRuntimeException e) {
1✔
415
                    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1✔
416
                      log.error("unexpected", e);
×
417
                    }
418
                    responseObserver.onError(e);
1✔
419
                  } catch (Exception e) {
×
420
                    log.error("unexpected", e);
×
421
                    responseObserver.onError(e);
×
422
                  }
1✔
423
                }));
1✔
424
  }
1✔
425

426
  private <T> T pollTaskQueue(Context ctx, Future<T> futureValue)
427
      throws ExecutionException, InterruptedException {
428
    final Context.CancellationListener canceler = context -> futureValue.cancel(true);
1✔
429
    ctx.addListener(canceler, this.backgroundScheduler);
1✔
430
    try {
431
      return futureValue.get();
1✔
432
    } finally {
433
      ctx.removeListener(canceler);
1✔
434
    }
435
  }
436

437
  @Override
438
  public void pollWorkflowTaskQueue(
439
      PollWorkflowTaskQueueRequest pollRequest,
440
      StreamObserver<PollWorkflowTaskQueueResponse> responseObserver) {
441
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
442
      PollWorkflowTaskQueueResponse.Builder task;
443
      try {
444
        task = pollTaskQueue(ctx, store.pollWorkflowTaskQueue(pollRequest));
1✔
445
      } catch (ExecutionException e) {
×
446
        responseObserver.onError(e);
×
447
        return;
×
448
      } catch (InterruptedException e) {
×
449
        Thread.currentThread().interrupt();
×
450
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
×
451
        responseObserver.onCompleted();
×
452
        return;
×
453
      } catch (CancellationException e) {
1✔
454
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
1✔
455
        responseObserver.onCompleted();
1✔
456
        return;
1✔
457
      }
1✔
458

459
      ExecutionId executionId =
1✔
460
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
1✔
461
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
462
      try {
463
        mutableState.startWorkflowTask(task, pollRequest);
1✔
464
        // The task always has the original task queue that was created as part of the response.
465
        // This may be a different task queue than the task queue it was scheduled on, as in the
466
        // case of sticky execution.
467
        task.setWorkflowExecutionTaskQueue(mutableState.getStartRequest().getTaskQueue());
1✔
468
        PollWorkflowTaskQueueResponse response = task.build();
1✔
469
        responseObserver.onNext(response);
1✔
470
        responseObserver.onCompleted();
1✔
471
      } catch (StatusRuntimeException e) {
1✔
472
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
473
          if (log.isDebugEnabled()) {
1✔
474
            log.debug("Skipping outdated workflow task for " + executionId, e);
×
475
          }
476
          // The real service doesn't return this call on outdated task.
477
          // For simplicity, we return an empty result here.
478
          responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
1✔
479
          responseObserver.onCompleted();
1✔
480
        } else {
481
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
×
482
            log.error("unexpected", e);
×
483
          }
484
          responseObserver.onError(e);
×
485
        }
486
      }
1✔
487
    }
1✔
488
  }
1✔
489

490
  @Override
491
  public void respondWorkflowTaskCompleted(
492
      RespondWorkflowTaskCompletedRequest request,
493
      StreamObserver<RespondWorkflowTaskCompletedResponse> responseObserver) {
494
    try {
495
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(request.getTaskToken());
1✔
496
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
497
      mutableState.completeWorkflowTask(taskToken.getHistorySize(), request);
1✔
498
      responseObserver.onNext(RespondWorkflowTaskCompletedResponse.getDefaultInstance());
1✔
499
      responseObserver.onCompleted();
1✔
500
    } catch (StatusRuntimeException e) {
1✔
501
      handleStatusRuntimeException(e, responseObserver);
1✔
502
    } catch (Throwable e) {
×
503
      responseObserver.onError(
×
504
          Status.INTERNAL
505
              .withDescription(Throwables.getStackTraceAsString(e))
×
506
              .withCause(e)
×
507
              .asRuntimeException());
×
508
    }
1✔
509
  }
1✔
510

511
  @Override
512
  public void respondWorkflowTaskFailed(
513
      RespondWorkflowTaskFailedRequest failedRequest,
514
      StreamObserver<RespondWorkflowTaskFailedResponse> responseObserver) {
515
    try {
516
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(failedRequest.getTaskToken());
1✔
517
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
518
      mutableState.failWorkflowTask(failedRequest);
1✔
519
      responseObserver.onNext(RespondWorkflowTaskFailedResponse.getDefaultInstance());
1✔
520
      responseObserver.onCompleted();
1✔
521
    } catch (StatusRuntimeException e) {
×
522
      handleStatusRuntimeException(e, responseObserver);
×
523
    }
1✔
524
  }
1✔
525

526
  @Override
527
  public void getSystemInfo(
528
      GetSystemInfoRequest request, StreamObserver<GetSystemInfoResponse> responseObserver) {
529
    responseObserver.onNext(
1✔
530
        GetSystemInfoResponse.newBuilder()
1✔
531
            .setCapabilities(
1✔
532
                // These are the capabilities I could verify the test server supports
533
                GetSystemInfoResponse.Capabilities.newBuilder()
1✔
534
                    .setSdkMetadata(true)
1✔
535
                    .setSignalAndQueryHeader(true)
1✔
536
                    .setEncodedFailureAttributes(true)
1✔
537
                    .setEagerWorkflowStart(true)
1✔
538
                    .build())
1✔
539
            .build());
1✔
540
    responseObserver.onCompleted();
1✔
541
  }
1✔
542

543
  private Context.CancellableContext deadlineCtx(Deadline deadline) {
544
    return Context.current().withDeadline(deadline, this.backgroundScheduler);
1✔
545
  }
546

547
  @Override
548
  public void pollActivityTaskQueue(
549
      PollActivityTaskQueueRequest pollRequest,
550
      StreamObserver<PollActivityTaskQueueResponse> responseObserver) {
551
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
552

553
      PollActivityTaskQueueResponse.Builder task;
554
      try {
555
        task = pollTaskQueue(ctx, store.pollActivityTaskQueue(pollRequest));
1✔
556
      } catch (ExecutionException e) {
×
557
        responseObserver.onError(e);
×
558
        return;
×
559
      } catch (InterruptedException e) {
×
560
        Thread.currentThread().interrupt();
×
561
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
×
562
        responseObserver.onCompleted();
×
563
        return;
×
564
      } catch (CancellationException e) {
1✔
565
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
1✔
566
        responseObserver.onCompleted();
1✔
567
        return;
1✔
568
      }
1✔
569

570
      ExecutionId executionId =
1✔
571
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
1✔
572
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
573
      try {
574
        mutableState.startActivityTask(task, pollRequest);
1✔
575
        responseObserver.onNext(task.build());
1✔
576
        responseObserver.onCompleted();
1✔
577
      } catch (StatusRuntimeException e) {
1✔
578
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
579
          if (log.isDebugEnabled()) {
1✔
580
            log.debug("Skipping outdated activity task for " + executionId, e);
×
581
          }
582
          responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
1✔
583
          responseObserver.onCompleted();
1✔
584
        } else {
585
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
×
586
            log.error("unexpected", e);
×
587
          }
588
          responseObserver.onError(e);
×
589
        }
590
      }
1✔
591
    }
1✔
592
  }
1✔
593

594
  @Override
595
  public void recordActivityTaskHeartbeat(
596
      RecordActivityTaskHeartbeatRequest heartbeatRequest,
597
      StreamObserver<RecordActivityTaskHeartbeatResponse> responseObserver) {
598
    try {
599
      ActivityTaskToken activityTaskToken =
1✔
600
          ActivityTaskToken.fromBytes(heartbeatRequest.getTaskToken());
1✔
601
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
602
      boolean cancelRequested =
1✔
603
          mutableState.heartbeatActivityTask(
1✔
604
              activityTaskToken.getScheduledEventId(), heartbeatRequest.getDetails());
1✔
605
      responseObserver.onNext(
1✔
606
          RecordActivityTaskHeartbeatResponse.newBuilder()
1✔
607
              .setCancelRequested(cancelRequested)
1✔
608
              .build());
1✔
609
      responseObserver.onCompleted();
1✔
610
    } catch (StatusRuntimeException e) {
1✔
611
      handleStatusRuntimeException(e, responseObserver);
1✔
612
    }
1✔
613
  }
1✔
614

615
  @Override
616
  public void recordActivityTaskHeartbeatById(
617
      RecordActivityTaskHeartbeatByIdRequest heartbeatRequest,
618
      StreamObserver<RecordActivityTaskHeartbeatByIdResponse> responseObserver) {
619
    try {
620
      ExecutionId execution =
×
621
          new ExecutionId(
622
              heartbeatRequest.getNamespace(),
×
623
              heartbeatRequest.getWorkflowId(),
×
624
              heartbeatRequest.getRunId());
×
625
      TestWorkflowMutableState mutableState = getMutableState(execution);
×
626
      boolean cancelRequested =
×
627
          mutableState.heartbeatActivityTaskById(
×
628
              heartbeatRequest.getActivityId(),
×
629
              heartbeatRequest.getDetails(),
×
630
              heartbeatRequest.getIdentity());
×
631
      responseObserver.onNext(
×
632
          RecordActivityTaskHeartbeatByIdResponse.newBuilder()
×
633
              .setCancelRequested(cancelRequested)
×
634
              .build());
×
635
      responseObserver.onCompleted();
×
636
    } catch (StatusRuntimeException e) {
×
637
      handleStatusRuntimeException(e, responseObserver);
×
638
    }
×
639
  }
×
640

641
  @Override
642
  public void respondActivityTaskCompleted(
643
      RespondActivityTaskCompletedRequest completeRequest,
644
      StreamObserver<RespondActivityTaskCompletedResponse> responseObserver) {
645
    try {
646
      ActivityTaskToken activityTaskToken =
1✔
647
          ActivityTaskToken.fromBytes(completeRequest.getTaskToken());
1✔
648
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
649
      mutableState.completeActivityTask(activityTaskToken.getScheduledEventId(), completeRequest);
1✔
650
      responseObserver.onNext(RespondActivityTaskCompletedResponse.getDefaultInstance());
1✔
651
      responseObserver.onCompleted();
1✔
652
    } catch (StatusRuntimeException e) {
1✔
653
      handleStatusRuntimeException(e, responseObserver);
1✔
654
    }
1✔
655
  }
1✔
656

657
  @Override
658
  public void respondActivityTaskCompletedById(
659
      RespondActivityTaskCompletedByIdRequest completeRequest,
660
      StreamObserver<RespondActivityTaskCompletedByIdResponse> responseObserver) {
661
    try {
662
      ExecutionId executionId =
×
663
          new ExecutionId(
664
              completeRequest.getNamespace(),
×
665
              completeRequest.getWorkflowId(),
×
666
              completeRequest.getRunId());
×
667
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
668
      mutableState.completeActivityTaskById(completeRequest.getActivityId(), completeRequest);
×
669
      responseObserver.onNext(RespondActivityTaskCompletedByIdResponse.getDefaultInstance());
×
670
      responseObserver.onCompleted();
×
671
    } catch (StatusRuntimeException e) {
×
672
      handleStatusRuntimeException(e, responseObserver);
×
673
    }
×
674
  }
×
675

676
  @Override
677
  public void respondActivityTaskFailed(
678
      RespondActivityTaskFailedRequest failRequest,
679
      StreamObserver<RespondActivityTaskFailedResponse> responseObserver) {
680
    try {
681
      ActivityTaskToken activityTaskToken = ActivityTaskToken.fromBytes(failRequest.getTaskToken());
1✔
682
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
683
      mutableState.failActivityTask(activityTaskToken.getScheduledEventId(), failRequest);
1✔
684
      responseObserver.onNext(RespondActivityTaskFailedResponse.getDefaultInstance());
1✔
685
      responseObserver.onCompleted();
1✔
686
    } catch (StatusRuntimeException e) {
1✔
687
      handleStatusRuntimeException(e, responseObserver);
1✔
688
    }
1✔
689
  }
1✔
690

691
  @Override
692
  public void respondActivityTaskFailedById(
693
      RespondActivityTaskFailedByIdRequest failRequest,
694
      StreamObserver<RespondActivityTaskFailedByIdResponse> responseObserver) {
695
    try {
696
      ExecutionId executionId =
×
697
          new ExecutionId(
698
              failRequest.getNamespace(), failRequest.getWorkflowId(), failRequest.getRunId());
×
699
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
700
      mutableState.failActivityTaskById(failRequest.getActivityId(), failRequest);
×
701
      responseObserver.onNext(RespondActivityTaskFailedByIdResponse.getDefaultInstance());
×
702
      responseObserver.onCompleted();
×
703
    } catch (StatusRuntimeException e) {
×
704
      handleStatusRuntimeException(e, responseObserver);
×
705
    }
×
706
  }
×
707

708
  @Override
709
  public void respondActivityTaskCanceled(
710
      RespondActivityTaskCanceledRequest canceledRequest,
711
      StreamObserver<RespondActivityTaskCanceledResponse> responseObserver) {
712
    try {
713
      ActivityTaskToken activityTaskToken =
1✔
714
          ActivityTaskToken.fromBytes(canceledRequest.getTaskToken());
1✔
715
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
716
      mutableState.cancelActivityTask(activityTaskToken.getScheduledEventId(), canceledRequest);
1✔
717
      responseObserver.onNext(RespondActivityTaskCanceledResponse.getDefaultInstance());
1✔
718
      responseObserver.onCompleted();
1✔
UNCOV
719
    } catch (StatusRuntimeException e) {
×
UNCOV
720
      handleStatusRuntimeException(e, responseObserver);
×
721
    }
1✔
722
  }
1✔
723

724
  @Override
725
  public void respondActivityTaskCanceledById(
726
      RespondActivityTaskCanceledByIdRequest canceledRequest,
727
      StreamObserver<RespondActivityTaskCanceledByIdResponse> responseObserver) {
728
    try {
729
      ExecutionId executionId =
×
730
          new ExecutionId(
731
              canceledRequest.getNamespace(),
×
732
              canceledRequest.getWorkflowId(),
×
733
              canceledRequest.getRunId());
×
734
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
735
      mutableState.cancelActivityTaskById(canceledRequest.getActivityId(), canceledRequest);
×
736
      responseObserver.onNext(RespondActivityTaskCanceledByIdResponse.getDefaultInstance());
×
737
      responseObserver.onCompleted();
×
738
    } catch (StatusRuntimeException e) {
×
739
      handleStatusRuntimeException(e, responseObserver);
×
740
    }
×
741
  }
×
742

743
  @Override
744
  public void requestCancelWorkflowExecution(
745
      RequestCancelWorkflowExecutionRequest cancelRequest,
746
      StreamObserver<RequestCancelWorkflowExecutionResponse> responseObserver) {
747
    try {
748
      requestCancelWorkflowExecution(cancelRequest, Optional.empty());
1✔
749
      responseObserver.onNext(RequestCancelWorkflowExecutionResponse.getDefaultInstance());
1✔
750
      responseObserver.onCompleted();
1✔
751
    } catch (StatusRuntimeException e) {
1✔
752
      handleStatusRuntimeException(e, responseObserver);
1✔
753
    }
1✔
754
  }
1✔
755

756
  void requestCancelWorkflowExecution(
757
      RequestCancelWorkflowExecutionRequest cancelRequest,
758
      Optional<TestWorkflowMutableStateImpl.CancelExternalWorkflowExecutionCallerInfo> callerInfo) {
759
    ExecutionId executionId =
1✔
760
        new ExecutionId(cancelRequest.getNamespace(), cancelRequest.getWorkflowExecution());
1✔
761
    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
762
    mutableState.requestCancelWorkflowExecution(cancelRequest, callerInfo);
1✔
763
  }
1✔
764

765
  @Override
766
  public void terminateWorkflowExecution(
767
      TerminateWorkflowExecutionRequest request,
768
      StreamObserver<TerminateWorkflowExecutionResponse> responseObserver) {
769
    try {
770
      terminateWorkflowExecution(request);
1✔
771
      responseObserver.onNext(TerminateWorkflowExecutionResponse.getDefaultInstance());
1✔
772
      responseObserver.onCompleted();
1✔
773
    } catch (StatusRuntimeException e) {
1✔
774
      handleStatusRuntimeException(e, responseObserver);
1✔
775
    }
1✔
776
  }
1✔
777

778
  private void terminateWorkflowExecution(TerminateWorkflowExecutionRequest request) {
779
    ExecutionId executionId =
1✔
780
        new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
1✔
781
    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
782
    mutableState.terminateWorkflowExecution(request);
1✔
783
  }
1✔
784

785
  @Override
786
  public void signalWorkflowExecution(
787
      SignalWorkflowExecutionRequest signalRequest,
788
      StreamObserver<SignalWorkflowExecutionResponse> responseObserver) {
789
    try {
790
      ExecutionId executionId =
1✔
791
          new ExecutionId(signalRequest.getNamespace(), signalRequest.getWorkflowExecution());
1✔
792
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
793
      mutableState.signal(signalRequest);
1✔
794
      responseObserver.onNext(SignalWorkflowExecutionResponse.getDefaultInstance());
1✔
795
      responseObserver.onCompleted();
1✔
796
    } catch (StatusRuntimeException e) {
1✔
797
      handleStatusRuntimeException(e, responseObserver);
1✔
798
    }
1✔
799
  }
1✔
800

801
  @Override
802
  public void updateWorkflowExecution(
803
      UpdateWorkflowExecutionRequest request,
804
      StreamObserver<UpdateWorkflowExecutionResponse> responseObserver) {
805
    try {
806
      ExecutionId executionId =
1✔
807
          new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
1✔
808
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
809
      @Nullable Deadline deadline = Context.current().getDeadline();
1✔
810
      UpdateWorkflowExecutionResponse response =
1✔
811
          mutableState.updateWorkflowExecution(request, deadline);
1✔
812
      responseObserver.onNext(response);
1✔
813
      responseObserver.onCompleted();
1✔
814
    } catch (StatusRuntimeException e) {
1✔
815
      handleStatusRuntimeException(e, responseObserver);
1✔
816
    }
1✔
817
  }
1✔
818

819
  @Override
820
  public void pollWorkflowExecutionUpdate(
821
      PollWorkflowExecutionUpdateRequest request,
822
      StreamObserver<PollWorkflowExecutionUpdateResponse> responseObserver) {
823
    try {
824
      ExecutionId executionId =
1✔
825
          new ExecutionId(request.getNamespace(), request.getUpdateRef().getWorkflowExecution());
1✔
826
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
827
      @Nullable Deadline deadline = Context.current().getDeadline();
1✔
828
      PollWorkflowExecutionUpdateResponse response =
1✔
829
          mutableState.pollUpdateWorkflowExecution(request, deadline);
1✔
830
      responseObserver.onNext(response);
1✔
831
      responseObserver.onCompleted();
1✔
832
    } catch (StatusRuntimeException e) {
1✔
833
      handleStatusRuntimeException(e, responseObserver);
1✔
834
    }
1✔
835
  }
1✔
836

837
  @Override
838
  public void signalWithStartWorkflowExecution(
839
      SignalWithStartWorkflowExecutionRequest r,
840
      StreamObserver<SignalWithStartWorkflowExecutionResponse> responseObserver) {
841
    try {
842
      if (!r.hasTaskQueue()) {
1✔
843
        throw Status.INVALID_ARGUMENT
×
844
            .withDescription("request missing required taskQueue field")
×
845
            .asRuntimeException();
×
846
      }
847
      if (!r.hasWorkflowType()) {
1✔
848
        throw Status.INVALID_ARGUMENT
×
849
            .withDescription("request missing required workflowType field")
×
850
            .asRuntimeException();
×
851
      }
852
      ExecutionId executionId = new ExecutionId(r.getNamespace(), r.getWorkflowId(), null);
1✔
853
      TestWorkflowMutableState mutableState = getMutableState(executionId, false);
1✔
854
      SignalWorkflowExecutionRequest signalRequest =
855
          SignalWorkflowExecutionRequest.newBuilder()
1✔
856
              .setInput(r.getSignalInput())
1✔
857
              .setSignalName(r.getSignalName())
1✔
858
              .setWorkflowExecution(executionId.getExecution())
1✔
859
              .setRequestId(r.getRequestId())
1✔
860
              .setControl(r.getControl())
1✔
861
              .setNamespace(r.getNamespace())
1✔
862
              .setIdentity(r.getIdentity())
1✔
863
              .build();
1✔
864
      if (mutableState != null && !mutableState.isTerminalState()) {
1✔
865
        mutableState.signal(signalRequest);
1✔
866
        responseObserver.onNext(
1✔
867
            SignalWithStartWorkflowExecutionResponse.newBuilder()
1✔
868
                .setRunId(mutableState.getExecutionId().getExecution().getRunId())
1✔
869
                .build());
1✔
870
        responseObserver.onCompleted();
1✔
871
        return;
1✔
872
      }
873
      StartWorkflowExecutionRequest.Builder startRequest =
874
          StartWorkflowExecutionRequest.newBuilder()
1✔
875
              .setRequestId(r.getRequestId())
1✔
876
              .setInput(r.getInput())
1✔
877
              .setWorkflowExecutionTimeout(r.getWorkflowExecutionTimeout())
1✔
878
              .setWorkflowRunTimeout(r.getWorkflowRunTimeout())
1✔
879
              .setWorkflowTaskTimeout(r.getWorkflowTaskTimeout())
1✔
880
              .setNamespace(r.getNamespace())
1✔
881
              .setTaskQueue(r.getTaskQueue())
1✔
882
              .setWorkflowId(r.getWorkflowId())
1✔
883
              .setWorkflowIdReusePolicy(r.getWorkflowIdReusePolicy())
1✔
884
              .setIdentity(r.getIdentity())
1✔
885
              .setWorkflowType(r.getWorkflowType())
1✔
886
              .setCronSchedule(r.getCronSchedule())
1✔
887
              .setRequestId(r.getRequestId());
1✔
888
      if (r.hasRetryPolicy()) {
1✔
889
        startRequest.setRetryPolicy(r.getRetryPolicy());
×
890
      }
891
      if (r.hasHeader()) {
1✔
892
        startRequest.setHeader(r.getHeader());
1✔
893
      }
894
      if (r.hasMemo()) {
1✔
895
        startRequest.setMemo(r.getMemo());
×
896
      }
897
      if (r.hasSearchAttributes()) {
1✔
898
        startRequest.setSearchAttributes(r.getSearchAttributes());
1✔
899
      }
900
      if (r.hasWorkflowStartDelay()) {
1✔
901
        startRequest.setWorkflowStartDelay(r.getWorkflowStartDelay());
1✔
902
      }
903

904
      StartWorkflowExecutionResponse startResult =
1✔
905
          startWorkflowExecutionImpl(
1✔
906
              startRequest.build(),
1✔
907
              Duration.ZERO,
908
              Optional.empty(),
1✔
909
              OptionalLong.empty(),
1✔
910
              signalRequest);
911
      responseObserver.onNext(
1✔
912
          SignalWithStartWorkflowExecutionResponse.newBuilder()
1✔
913
              .setRunId(startResult.getRunId())
1✔
914
              .build());
1✔
915
      responseObserver.onCompleted();
1✔
916
    } catch (StatusRuntimeException e) {
1✔
917
      handleStatusRuntimeException(e, responseObserver);
1✔
918
    }
1✔
919
  }
1✔
920

921
  public void signalExternalWorkflowExecution(
922
      String signalId,
923
      SignalExternalWorkflowExecutionCommandAttributes commandAttributes,
924
      TestWorkflowMutableState source) {
925
    String namespace;
926
    if (commandAttributes.getNamespace().isEmpty()) {
1✔
927
      namespace = source.getExecutionId().getNamespace();
1✔
928
    } else {
929
      namespace = commandAttributes.getNamespace();
×
930
    }
931
    ExecutionId executionId = new ExecutionId(namespace, commandAttributes.getExecution());
1✔
932
    TestWorkflowMutableState mutableState;
933
    try {
934
      mutableState = getMutableState(executionId);
1✔
935
      mutableState.signalFromWorkflow(commandAttributes);
1✔
936
      source.completeSignalExternalWorkflowExecution(
1✔
937
          signalId, mutableState.getExecutionId().getExecution().getRunId());
1✔
938
    } catch (StatusRuntimeException e) {
1✔
939
      if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
940
        source.failSignalExternalWorkflowExecution(
1✔
941
            signalId,
942
            SignalExternalWorkflowExecutionFailedCause
943
                .SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED_CAUSE_EXTERNAL_WORKFLOW_EXECUTION_NOT_FOUND);
944
      } else {
945
        throw e;
×
946
      }
947
    }
1✔
948
  }
1✔
949

950
  /**
951
   * Creates next run of a workflow execution
952
   *
953
   * @return RunId
954
   */
955
  public String continueAsNew(
956
      StartWorkflowExecutionRequest previousRunStartRequest,
957
      ContinueAsNewWorkflowExecutionCommandAttributes ca,
958
      WorkflowExecutionContinuedAsNewEventAttributes ea,
959
      Optional<TestServiceRetryState> retryState,
960
      String identity,
961
      ExecutionId continuedExecutionId,
962
      String firstExecutionRunId,
963
      Optional<TestWorkflowMutableState> parent,
964
      OptionalLong parentChildInitiatedEventId) {
965
    StartWorkflowExecutionRequest.Builder startRequestBuilder =
966
        StartWorkflowExecutionRequest.newBuilder()
1✔
967
            .setRequestId(UUID.randomUUID().toString())
1✔
968
            .setWorkflowType(ea.getWorkflowType())
1✔
969
            .setWorkflowRunTimeout(ea.getWorkflowRunTimeout())
1✔
970
            .setWorkflowTaskTimeout(ea.getWorkflowTaskTimeout())
1✔
971
            .setNamespace(continuedExecutionId.getNamespace())
1✔
972
            .setTaskQueue(ea.getTaskQueue())
1✔
973
            .setWorkflowId(continuedExecutionId.getWorkflowId().getWorkflowId())
1✔
974
            .setWorkflowIdReusePolicy(previousRunStartRequest.getWorkflowIdReusePolicy())
1✔
975
            .setIdentity(identity)
1✔
976
            .setCronSchedule(previousRunStartRequest.getCronSchedule());
1✔
977
    // TODO: Service doesn't perform this copy.
978
    // See https://github.com/temporalio/temporal/issues/5249
979
    //    if (previousRunStartRequest.hasRetryPolicy()) {
980
    //      startRequestBuilder.setRetryPolicy(previousRunStartRequest.getRetryPolicy());
981
    //    }
982
    if (ca.hasRetryPolicy()) {
1✔
983
      startRequestBuilder.setRetryPolicy(ca.getRetryPolicy());
1✔
984
    }
985
    if (ea.hasInput()) {
1✔
986
      startRequestBuilder.setInput(ea.getInput());
1✔
987
    }
988
    if (ea.hasHeader()) {
1✔
989
      startRequestBuilder.setHeader(ea.getHeader());
1✔
990
    }
991
    StartWorkflowExecutionRequest startRequest = startRequestBuilder.build();
1✔
992
    lock.lock();
1✔
993
    Optional<Failure> lastFail =
994
        ea.hasFailure()
1✔
995
            ? Optional.of(ea.getFailure())
1✔
996
            : retryState.flatMap(TestServiceRetryState::getPreviousRunFailure);
1✔
997
    try {
998
      StartWorkflowExecutionResponse response =
1✔
999
          startWorkflowExecutionNoRunningCheckLocked(
1✔
1000
              startRequest,
1001
              ea.getNewExecutionRunId(),
1✔
1002
              firstExecutionRunId,
1003
              Optional.of(continuedExecutionId.getExecution().getRunId()),
1✔
1004
              retryState,
1005
              ProtobufTimeUtils.toJavaDuration(ea.getBackoffStartInterval()),
1✔
1006
              ea.getLastCompletionResult(),
1✔
1007
              lastFail,
1008
              parent,
1009
              parentChildInitiatedEventId,
1010
              null,
1011
              continuedExecutionId.getWorkflowId());
1✔
1012
      return response.getRunId();
1✔
1013
    } finally {
1014
      lock.unlock();
1✔
1015
    }
1016
  }
1017

1018
  @Override
1019
  public void listOpenWorkflowExecutions(
1020
      ListOpenWorkflowExecutionsRequest listRequest,
1021
      StreamObserver<ListOpenWorkflowExecutionsResponse> responseObserver) {
1022
    try {
1023
      Optional<String> workflowIdFilter;
1024
      if (listRequest.hasExecutionFilter()
1✔
1025
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
×
1026
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
×
1027
      } else {
1028
        workflowIdFilter = Optional.empty();
1✔
1029
      }
1030
      List<WorkflowExecutionInfo> result =
1✔
1031
          store.listWorkflows(WorkflowState.OPEN, workflowIdFilter);
1✔
1032
      responseObserver.onNext(
1✔
1033
          ListOpenWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1✔
1034
      responseObserver.onCompleted();
1✔
1035
    } catch (StatusRuntimeException e) {
×
1036
      handleStatusRuntimeException(e, responseObserver);
×
1037
    }
1✔
1038
  }
1✔
1039

1040
  @Override
1041
  public void listClosedWorkflowExecutions(
1042
      ListClosedWorkflowExecutionsRequest listRequest,
1043
      StreamObserver<ListClosedWorkflowExecutionsResponse> responseObserver) {
1044
    try {
1045
      Optional<String> workflowIdFilter;
1046
      if (listRequest.hasExecutionFilter()
1✔
1047
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
×
1048
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
×
1049
      } else {
1050
        workflowIdFilter = Optional.empty();
1✔
1051
      }
1052
      List<WorkflowExecutionInfo> result =
1✔
1053
          store.listWorkflows(WorkflowState.CLOSED, workflowIdFilter);
1✔
1054
      responseObserver.onNext(
1✔
1055
          ListClosedWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1✔
1056
      responseObserver.onCompleted();
1✔
1057
    } catch (StatusRuntimeException e) {
×
1058
      handleStatusRuntimeException(e, responseObserver);
×
1059
    }
1✔
1060
  }
1✔
1061

1062
  @Override
1063
  public void respondQueryTaskCompleted(
1064
      RespondQueryTaskCompletedRequest completeRequest,
1065
      StreamObserver<RespondQueryTaskCompletedResponse> responseObserver) {
1066
    try {
1067
      QueryId queryId = QueryId.fromBytes(completeRequest.getTaskToken());
1✔
1068
      TestWorkflowMutableState mutableState = getMutableState(queryId.getExecutionId());
1✔
1069
      mutableState.completeQuery(queryId, completeRequest);
1✔
1070
      responseObserver.onNext(RespondQueryTaskCompletedResponse.getDefaultInstance());
1✔
1071
      responseObserver.onCompleted();
1✔
1072
    } catch (StatusRuntimeException e) {
×
1073
      handleStatusRuntimeException(e, responseObserver);
×
1074
    }
1✔
1075
  }
1✔
1076

1077
  @Override
1078
  public void queryWorkflow(
1079
      QueryWorkflowRequest queryRequest, StreamObserver<QueryWorkflowResponse> responseObserver) {
1080
    try {
1081
      ExecutionId executionId =
1✔
1082
          new ExecutionId(queryRequest.getNamespace(), queryRequest.getExecution());
1✔
1083
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1084
      @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1085
      QueryWorkflowResponse result =
1✔
1086
          mutableState.query(
1✔
1087
              queryRequest,
1088
              deadline != null ? deadline.timeRemaining(TimeUnit.MILLISECONDS) : Long.MAX_VALUE);
1✔
1089
      responseObserver.onNext(result);
1✔
1090
      responseObserver.onCompleted();
1✔
1091
    } catch (StatusRuntimeException e) {
1✔
1092
      handleStatusRuntimeException(e, responseObserver);
1✔
1093
    }
1✔
1094
  }
1✔
1095

1096
  @Override
1097
  public void describeWorkflowExecution(
1098
      DescribeWorkflowExecutionRequest request,
1099
      StreamObserver<io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse>
1100
          responseObserver) {
1101
    try {
1102
      if (request.getNamespace().isEmpty()) {
1✔
1103
        throw createInvalidArgument("Namespace not set on request.");
×
1104
      }
1105
      if (!request.hasExecution()) {
1✔
1106
        throw createInvalidArgument("Execution not set on request.");
×
1107
      }
1108

1109
      ExecutionId executionId = new ExecutionId(request.getNamespace(), request.getExecution());
1✔
1110
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1111
      DescribeWorkflowExecutionResponse result = mutableState.describeWorkflowExecution();
1✔
1112
      responseObserver.onNext(result);
1✔
1113
      responseObserver.onCompleted();
1✔
1114
    } catch (StatusRuntimeException e) {
1✔
1115
      handleStatusRuntimeException(e, responseObserver);
1✔
1116
    }
1✔
1117
  }
1✔
1118

1119
  /**
1120
   * This method doesn't make much sense for test server, it accepts all namespaces as existent and
1121
   * registered. so, it's a trivial implementation just returning an info that a namespace is
1122
   * registered irrespectively of the input
1123
   */
1124
  @Override
1125
  public void describeNamespace(
1126
      DescribeNamespaceRequest request,
1127
      StreamObserver<DescribeNamespaceResponse> responseObserver) {
1128
    try {
1129
      if (request.getNamespace().isEmpty()) {
1✔
1130
        throw createInvalidArgument("Namespace not set on request.");
×
1131
      }
1132
      // generating a stable UUID for name
1133
      String namespaceId = UUID.nameUUIDFromBytes(request.getNamespace().getBytes()).toString();
1✔
1134
      DescribeNamespaceResponse result =
1135
          DescribeNamespaceResponse.newBuilder()
1✔
1136
              .setNamespaceInfo(
1✔
1137
                  NamespaceInfo.newBuilder()
1✔
1138
                      .setName(request.getNamespace())
1✔
1139
                      .setState(NamespaceState.NAMESPACE_STATE_REGISTERED)
1✔
1140
                      .setId(namespaceId)
1✔
1141
                      .build())
1✔
1142
              .build();
1✔
1143
      responseObserver.onNext(result);
1✔
1144
      responseObserver.onCompleted();
1✔
1145
    } catch (StatusRuntimeException e) {
1✔
1146
      handleStatusRuntimeException(e, responseObserver);
1✔
1147
    }
1✔
1148
  }
1✔
1149

1150
  private <R> R requireNotNull(String fieldName, R value) {
1151
    if (value == null) {
1✔
1152
      throw Status.INVALID_ARGUMENT
×
1153
          .withDescription("Missing required field \"" + fieldName + "\".")
×
1154
          .asRuntimeException();
×
1155
    }
1156
    return value;
1✔
1157
  }
1158

1159
  /**
1160
   * Adds diagnostic data about internal service state to the provided {@link StringBuilder}.
1161
   * Includes histories of all workflow instances stored in the service.
1162
   */
1163
  public void getDiagnostics(StringBuilder result) {
1164
    store.getDiagnostics(result);
×
1165
  }
×
1166

1167
  /**
1168
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1169
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#getCurrentTime(Empty)}
1170
   */
1171
  @Deprecated
1172
  public long currentTimeMillis() {
1173
    return selfAdvancingTimer.getClock().getAsLong();
×
1174
  }
1175

1176
  /** Invokes callback after the specified delay according to internal service clock. */
1177
  public void registerDelayedCallback(Duration delay, Runnable r) {
1178
    store.registerDelayedCallback(delay, r);
1✔
1179
  }
1✔
1180

1181
  /**
1182
   * Disables time skipping. To re-enable call {@link #unlockTimeSkipping(String)}. These calls are
1183
   * counted, so calling unlock does not guarantee that time is going to be skipped immediately as
1184
   * another lock can be holding it.
1185
   *
1186
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1187
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#lockTimeSkipping(LockTimeSkippingRequest)}
1188
   */
1189
  @Deprecated
1190
  public void lockTimeSkipping(String caller) {
1191
    selfAdvancingTimer.lockTimeSkipping(caller);
×
1192
  }
×
1193

1194
  /**
1195
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1196
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkipping(UnlockTimeSkippingRequest)}
1197
   */
1198
  @Deprecated
1199
  public void unlockTimeSkipping(String caller) {
1200
    selfAdvancingTimer.unlockTimeSkipping(caller);
×
1201
  }
×
1202

1203
  /**
1204
   * Unlocks time skipping and blocks the calling thread until internal clock passes the current +
1205
   * duration time.<br>
1206
   * When the time is reached, locks time skipping and returns.<br>
1207
   * Might not block at all due to time skipping. Or might block if the time skipping lock counter
1208
   * was more than 1.
1209
   *
1210
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1211
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkippingWithSleep(SleepRequest)}
1212
   */
1213
  @Deprecated
1214
  public void sleep(Duration duration) {
1215
    CompletableFuture<Void> result = new CompletableFuture<>();
×
1216
    selfAdvancingTimer.schedule(
×
1217
        duration,
1218
        () -> {
1219
          selfAdvancingTimer.lockTimeSkipping("TestWorkflowService sleep");
×
1220
          result.complete(null);
×
1221
        },
×
1222
        "workflow sleep");
1223
    selfAdvancingTimer.unlockTimeSkipping("TestWorkflowService sleep");
×
1224
    try {
1225
      result.get();
×
1226
    } catch (InterruptedException e) {
×
1227
      Thread.currentThread().interrupt();
×
1228
      throw new RuntimeException(e);
×
1229
    } catch (ExecutionException e) {
×
1230
      throw new RuntimeException(e);
×
1231
    }
×
1232
  }
×
1233

1234
  /**
1235
   * Temporal server times out task queue long poll calls after 1 minute and returns an empty
1236
   * result. After which the request has to be retried by the client if it wants to continue
1237
   * waiting. We emulate this behavior here.
1238
   *
1239
   * <p>If there is a deadline present, for task queue poll requests server will respond inside the
1240
   * deadline. Note that the latest is not applicable for getWorkflowExecutionHistory() long polls.
1241
   *
1242
   * @return minimum between the context deadline and maximum long poll deadline.
1243
   */
1244
  private Deadline getLongPollDeadline() {
1245
    @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1246
    Deadline maximumDeadline =
1✔
1247
        Deadline.after(
1✔
1248
            WorkflowServiceStubsOptions.DEFAULT_SERVER_LONG_POLL_RPC_TIMEOUT.toMillis(),
1✔
1249
            TimeUnit.MILLISECONDS);
1250
    return deadline != null ? deadline.minimum(maximumDeadline) : maximumDeadline;
1✔
1251
  }
1252

1253
  private void handleStatusRuntimeException(
1254
      StatusRuntimeException e, StreamObserver<?> responseObserver) {
1255
    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1✔
1256
      log.error("unexpected", e);
1✔
1257
    }
1258
    responseObserver.onError(e);
1✔
1259
  }
1✔
1260

1261
  /**
1262
   * Creates an in-memory service along with client stubs for use in Java code. See also
1263
   * createServerOnly and createWithNoGrpcServer.
1264
   *
1265
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean)} instead and
1266
   *     pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1267
   */
1268
  @Deprecated
1269
  public TestWorkflowService() {
1270
    this(0, true);
×
1271
  }
×
1272

1273
  /**
1274
   * Creates an in-memory service along with client stubs for use in Java code. See also
1275
   * createServerOnly and createWithNoGrpcServer.
1276
   *
1277
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean, long)} instead
1278
   *     and pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1279
   */
1280
  @Deprecated
1281
  public TestWorkflowService(long initialTimeMillis) {
1282
    this(initialTimeMillis, true);
×
1283
  }
×
1284

1285
  /**
1286
   * Creates an in-memory service along with client stubs for use in Java code. See also
1287
   * createServerOnly and createWithNoGrpcServer.
1288
   *
1289
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean)} instead
1290
   */
1291
  @Deprecated
1292
  public TestWorkflowService(boolean lockTimeSkipping) {
1293
    this(0, true);
×
1294
    if (lockTimeSkipping) {
×
1295
      this.lockTimeSkipping("constructor");
×
1296
    }
1297
  }
×
1298

1299
  /**
1300
   * Creates an instance of TestWorkflowService that does not manage its own gRPC server. Useful for
1301
   * including in an externally managed gRPC server.
1302
   *
1303
   * @deprecated use {@link TestServicesStarter} to create just the services with gRPC server
1304
   */
1305
  @Deprecated
1306
  public static TestWorkflowService createWithNoGrpcServer() {
1307
    return new TestWorkflowService(0, false);
×
1308
  }
1309

1310
  private TestWorkflowService(long initialTimeMillis, boolean startInProcessServer) {
×
1311
    this.selfAdvancingTimer =
×
1312
        new SelfAdvancingTimerImpl(initialTimeMillis, Clock.systemDefaultZone());
×
1313
    store = new TestWorkflowStoreImpl(this.selfAdvancingTimer);
×
1314
    visibilityStore = new TestVisibilityStoreImpl();
×
1315
    outOfProcessServer = null;
×
1316
    if (startInProcessServer) {
×
1317
      this.inProcessServer = new InProcessGRPCServer(Collections.singletonList(this));
×
1318
      this.workflowServiceStubs =
×
1319
          WorkflowServiceStubs.newServiceStubs(
×
1320
              WorkflowServiceStubsOptions.newBuilder()
×
1321
                  .setChannel(inProcessServer.getChannel())
×
1322
                  .build());
×
1323
    } else {
1324
      this.inProcessServer = null;
×
1325
      this.workflowServiceStubs = null;
×
1326
    }
1327
  }
×
1328

1329
  /**
1330
   * Creates an out-of-process rather than in-process server, and does not set up a client. Useful,
1331
   * for example, if you want to use the test service from other SDKs.
1332
   *
1333
   * @param port the port to listen on
1334
   * @deprecated use {@link io.temporal.testserver.TestServer#createPortBoundServer(int, boolean)}
1335
   *     instead and pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1336
   */
1337
  @Deprecated
1338
  public static TestWorkflowService createServerOnly(int port) {
1339
    TestWorkflowService result = new TestWorkflowService(true, port);
×
1340
    log.info("Server started, listening on " + port);
×
1341
    return result;
×
1342
  }
1343

1344
  private TestWorkflowService(boolean isOutOfProc, int port) {
×
1345
    // isOutOfProc is just here to make unambiguous constructor overloading.
1346
    Preconditions.checkState(isOutOfProc, "Impossible.");
×
1347
    inProcessServer = null;
×
1348
    workflowServiceStubs = null;
×
1349
    this.selfAdvancingTimer = new SelfAdvancingTimerImpl(0, Clock.systemDefaultZone());
×
1350
    store = new TestWorkflowStoreImpl(selfAdvancingTimer);
×
1351
    visibilityStore = new TestVisibilityStoreImpl();
×
1352
    try {
1353
      ServerBuilder<?> serverBuilder =
×
1354
          Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create());
×
1355
      GRPCServerHelper.registerServicesAndHealthChecks(
×
1356
          Collections.singletonList(this), serverBuilder);
×
1357
      outOfProcessServer = serverBuilder.build().start();
×
1358
    } catch (IOException e) {
×
1359
      throw new RuntimeException(e);
×
1360
    }
×
1361
  }
×
1362

1363
  @Deprecated
1364
  public WorkflowServiceStubs newClientStub() {
1365
    if (workflowServiceStubs == null) {
×
1366
      throw new RuntimeException(
×
1367
          "Cannot get a client when you created your TestWorkflowService with createServerOnly.");
1368
    }
1369
    return workflowServiceStubs;
×
1370
  }
1371

1372
  private static StatusRuntimeException createInvalidArgument(String description) {
1373
    throw Status.INVALID_ARGUMENT.withDescription(description).asRuntimeException();
1✔
1374
  }
1375
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc