• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #302

15 Aug 2024 04:56PM UTC coverage: 77.456% (-0.3%) from 77.71%
#302

push

github

web-flow
Implement test server support for sync Nexus operation commands (#2176)

* Implement test server support for sync Nexus operations

* Nexus operations command implementations

* test cleanup

* cleanup

* tests

334 of 524 new or added lines in 9 files covered. (63.74%)

9 existing lines in 3 files now uncovered.

20298 of 26206 relevant lines covered (77.46%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.47
/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import static io.temporal.internal.testservice.CronUtils.getBackoffInterval;
24

25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Throwables;
27
import com.google.protobuf.ByteString;
28
import com.google.protobuf.Empty;
29
import com.google.protobuf.Timestamp;
30
import com.google.protobuf.util.Timestamps;
31
import io.grpc.*;
32
import io.grpc.stub.StreamObserver;
33
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
34
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
35
import io.temporal.api.common.v1.Payload;
36
import io.temporal.api.common.v1.Payloads;
37
import io.temporal.api.common.v1.RetryPolicy;
38
import io.temporal.api.common.v1.WorkflowExecution;
39
import io.temporal.api.enums.v1.*;
40
import io.temporal.api.errordetails.v1.WorkflowExecutionAlreadyStartedFailure;
41
import io.temporal.api.failure.v1.Failure;
42
import io.temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes;
43
import io.temporal.api.namespace.v1.NamespaceInfo;
44
import io.temporal.api.testservice.v1.LockTimeSkippingRequest;
45
import io.temporal.api.testservice.v1.SleepRequest;
46
import io.temporal.api.testservice.v1.UnlockTimeSkippingRequest;
47
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
48
import io.temporal.api.workflowservice.v1.*;
49
import io.temporal.internal.common.ProtobufTimeUtils;
50
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowState;
51
import io.temporal.serviceclient.StatusUtils;
52
import io.temporal.serviceclient.WorkflowServiceStubs;
53
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
54
import java.io.Closeable;
55
import java.io.IOException;
56
import java.time.Clock;
57
import java.time.Duration;
58
import java.util.*;
59
import java.util.concurrent.*;
60
import java.util.concurrent.locks.Lock;
61
import java.util.concurrent.locks.ReentrantLock;
62
import javax.annotation.Nonnull;
63
import javax.annotation.Nullable;
64
import org.slf4j.Logger;
65
import org.slf4j.LoggerFactory;
66

67
/**
68
 * In memory implementation of the Workflow Service. To be used for testing purposes only.
69
 *
70
 * <p>Do not use directly, instead use {@link io.temporal.testing.TestWorkflowEnvironment}.
71
 */
72
public final class TestWorkflowService extends WorkflowServiceGrpc.WorkflowServiceImplBase
73
    implements Closeable {
74
  private static final Logger log = LoggerFactory.getLogger(TestWorkflowService.class);
1✔
75
  private final Map<ExecutionId, TestWorkflowMutableState> executions = new HashMap<>();
1✔
76
  // key->WorkflowId
77
  private final Map<WorkflowId, TestWorkflowMutableState> executionsByWorkflowId = new HashMap<>();
1✔
78
  private final ExecutorService executor = Executors.newCachedThreadPool();
1✔
79
  private final Lock lock = new ReentrantLock();
1✔
80

81
  private final TestWorkflowStore store;
82
  private final TestVisibilityStore visibilityStore;
83
  private final TestNexusEndpointStore nexusEndpointStore;
84
  private final SelfAdvancingTimer selfAdvancingTimer;
85

86
  private final ScheduledExecutorService backgroundScheduler =
1✔
87
      Executors.newSingleThreadScheduledExecutor();
1✔
88

89
  private final Server outOfProcessServer;
90
  private final InProcessGRPCServer inProcessServer;
91
  private final WorkflowServiceStubs workflowServiceStubs;
92

93
  TestWorkflowService(
94
      TestWorkflowStore store,
95
      TestVisibilityStore visibilityStore,
96
      TestNexusEndpointStore nexusEndpointStore,
97
      SelfAdvancingTimer selfAdvancingTimer) {
1✔
98
    this.store = store;
1✔
99
    this.visibilityStore = visibilityStore;
1✔
100
    this.nexusEndpointStore = nexusEndpointStore;
1✔
101
    this.selfAdvancingTimer = selfAdvancingTimer;
1✔
102
    this.outOfProcessServer = null;
1✔
103
    this.inProcessServer = null;
1✔
104
    this.workflowServiceStubs = null;
1✔
105
  }
1✔
106

107
  @Override
108
  public void close() {
109
    log.debug("Shutting down TestWorkflowService");
1✔
110

111
    log.debug("Shutting down background scheduler");
1✔
112
    backgroundScheduler.shutdown();
1✔
113

114
    if (outOfProcessServer != null) {
1✔
115
      log.info("Shutting down out-of-process GRPC server");
×
116
      outOfProcessServer.shutdown();
×
117
    }
118

119
    if (workflowServiceStubs != null) {
1✔
120
      workflowServiceStubs.shutdown();
×
121
    }
122

123
    if (inProcessServer != null) {
1✔
124
      log.info("Shutting down in-process GRPC server");
×
125
      inProcessServer.shutdown();
×
126
    }
127

128
    executor.shutdown();
1✔
129

130
    try {
131
      executor.awaitTermination(1, TimeUnit.SECONDS);
1✔
132

133
      if (outOfProcessServer != null) {
1✔
134
        outOfProcessServer.awaitTermination(1, TimeUnit.SECONDS);
×
135
      }
136

137
      if (workflowServiceStubs != null) {
1✔
138
        workflowServiceStubs.awaitTermination(1, TimeUnit.SECONDS);
×
139
      }
140

141
      if (inProcessServer != null) {
1✔
142
        inProcessServer.awaitTermination(1, TimeUnit.SECONDS);
×
143
      }
144

145
    } catch (InterruptedException e) {
×
146
      Thread.currentThread().interrupt();
×
147
      log.debug("shutdown interrupted", e);
×
148
    }
1✔
149

150
    store.close();
1✔
151
  }
1✔
152

153
  private TestWorkflowMutableState getMutableState(ExecutionId executionId) {
154
    return getMutableState(executionId, true);
1✔
155
  }
156

157
  private TestWorkflowMutableState getMutableState(ExecutionId executionId, boolean failNotExists) {
158
    lock.lock();
1✔
159
    try {
160
      if (executionId.getExecution().getRunId().isEmpty()) {
1✔
161
        return getMutableState(executionId.getWorkflowId(), failNotExists);
1✔
162
      }
163
      TestWorkflowMutableState mutableState = executions.get(executionId);
1✔
164
      if (mutableState == null && failNotExists) {
1✔
165
        throw Status.NOT_FOUND
1✔
166
            .withDescription(
1✔
167
                "Execution \""
168
                    + executionId
169
                    + "\" not found in mutable state. Known executions: "
170
                    + executions.values()
1✔
171
                    + ", service="
172
                    + this)
173
            .asRuntimeException();
1✔
174
      }
175
      return mutableState;
1✔
176
    } finally {
177
      lock.unlock();
1✔
178
    }
179
  }
180

181
  private TestWorkflowMutableState getMutableState(WorkflowId workflowId, boolean failNotExists) {
182
    lock.lock();
1✔
183
    try {
184
      TestWorkflowMutableState mutableState = executionsByWorkflowId.get(workflowId);
1✔
185
      if (mutableState == null && failNotExists) {
1✔
186
        throw Status.NOT_FOUND
1✔
187
            .withDescription("Execution not found in mutable state: " + workflowId)
1✔
188
            .asRuntimeException();
1✔
189
      }
190
      return mutableState;
1✔
191
    } finally {
192
      lock.unlock();
1✔
193
    }
194
  }
195

196
  @Override
197
  public void startWorkflowExecution(
198
      StartWorkflowExecutionRequest request,
199
      StreamObserver<StartWorkflowExecutionResponse> responseObserver) {
200
    try {
201
      Duration backoffInterval = getBackoffInterval(request.getCronSchedule(), store.currentTime());
1✔
202
      StartWorkflowExecutionResponse response =
1✔
203
          startWorkflowExecutionImpl(
1✔
204
              request, backoffInterval, Optional.empty(), OptionalLong.empty(), null);
1✔
205
      responseObserver.onNext(response);
1✔
206
      responseObserver.onCompleted();
1✔
207
    } catch (StatusRuntimeException e) {
1✔
208
      handleStatusRuntimeException(e, responseObserver);
1✔
209
    }
1✔
210
  }
1✔
211

212
  StartWorkflowExecutionResponse startWorkflowExecutionImpl(
213
      StartWorkflowExecutionRequest startRequest,
214
      Duration backoffStartInterval,
215
      Optional<TestWorkflowMutableState> parent,
216
      OptionalLong parentChildInitiatedEventId,
217
      @Nullable SignalWorkflowExecutionRequest signalWithStartSignal) {
218
    String requestWorkflowId = requireNotNull("WorkflowId", startRequest.getWorkflowId());
1✔
219
    String namespace = requireNotNull("Namespace", startRequest.getNamespace());
1✔
220
    WorkflowId workflowId = new WorkflowId(namespace, requestWorkflowId);
1✔
221
    WorkflowIdReusePolicy reusePolicy = startRequest.getWorkflowIdReusePolicy();
1✔
222
    WorkflowIdConflictPolicy conflictPolicy = startRequest.getWorkflowIdConflictPolicy();
1✔
223
    if (conflictPolicy != WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED
1✔
224
        && reusePolicy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
225
      throw createInvalidArgument(
×
226
          "Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy.");
227
    }
228

229
    TestWorkflowMutableState existing;
230
    lock.lock();
1✔
231
    try {
232
      String newRunId = UUID.randomUUID().toString();
1✔
233
      existing = executionsByWorkflowId.get(workflowId);
1✔
234
      if (existing != null) {
1✔
235
        WorkflowExecutionStatus status = existing.getWorkflowExecutionStatus();
1✔
236

237
        if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
238
            && (reusePolicy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING
239
                || conflictPolicy
240
                    == WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING)) {
241
          existing.terminateWorkflowExecution(
1✔
242
              TerminateWorkflowExecutionRequest.newBuilder()
1✔
243
                  .setNamespace(startRequest.getNamespace())
1✔
244
                  .setWorkflowExecution(existing.getExecutionId().getExecution())
1✔
245
                  .setReason("TerminateIfRunning WorkflowIdReusePolicy Policy")
1✔
246
                  .setIdentity("history-service")
1✔
247
                  .setDetails(
1✔
248
                      Payloads.newBuilder()
1✔
249
                          .addPayloads(
1✔
250
                              Payload.newBuilder()
1✔
251
                                  .setData(
1✔
252
                                      ByteString.copyFromUtf8(
1✔
253
                                          String.format("terminated by new runID: %s", newRunId)))
1✔
254
                                  .build())
1✔
255
                          .build())
1✔
256
                  .build());
1✔
257
        } else if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
258
            && conflictPolicy
259
                == WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING) {
260
          return StartWorkflowExecutionResponse.newBuilder()
1✔
261
              .setStarted(false)
1✔
262
              .setRunId(existing.getExecutionId().getExecution().getRunId())
1✔
263
              .build();
1✔
264
        } else if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
1✔
265
            || reusePolicy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE) {
266
          return throwDuplicatedWorkflow(startRequest, existing);
×
267
        } else if (reusePolicy
1✔
268
                == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
269
            && (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED
270
                || status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW)) {
271
          return throwDuplicatedWorkflow(startRequest, existing);
×
272
        }
273
      }
274
      Optional<TestServiceRetryState> retryState;
275
      Optional<Failure> lastFailure = Optional.empty();
1✔
276
      if (startRequest.hasRetryPolicy()) {
1✔
277
        Duration expirationInterval =
1✔
278
            ProtobufTimeUtils.toJavaDuration(startRequest.getWorkflowExecutionTimeout());
1✔
279
        retryState = newRetryStateLocked(startRequest.getRetryPolicy(), expirationInterval);
1✔
280
        if (retryState.isPresent()) {
1✔
281
          lastFailure = retryState.get().getPreviousRunFailure();
1✔
282
        }
283
      } else {
1✔
284
        retryState = Optional.empty();
1✔
285
      }
286
      return startWorkflowExecutionNoRunningCheckLocked(
1✔
287
          startRequest,
288
          newRunId,
289
          // it's the first execution in the continue-as-new chain, so firstExecutionRunId =
290
          // newRunId
291
          newRunId,
292
          Optional.empty(),
1✔
293
          retryState,
294
          backoffStartInterval,
295
          null,
296
          lastFailure,
297
          parent,
298
          parentChildInitiatedEventId,
299
          signalWithStartSignal,
300
          workflowId);
301
    } finally {
302
      lock.unlock();
1✔
303
    }
304
  }
305

306
  private Optional<TestServiceRetryState> newRetryStateLocked(
307
      RetryPolicy retryPolicy, Duration expirationInterval) {
308
    Timestamp expirationTime =
309
        expirationInterval.isZero()
1✔
310
            ? Timestamps.fromNanos(0)
1✔
311
            : Timestamps.add(
1✔
312
                store.currentTime(), ProtobufTimeUtils.toProtoDuration(expirationInterval));
1✔
313
    return Optional.of(new TestServiceRetryState(retryPolicy, expirationTime));
1✔
314
  }
315

316
  private StartWorkflowExecutionResponse throwDuplicatedWorkflow(
317
      StartWorkflowExecutionRequest startRequest, TestWorkflowMutableState existing) {
318
    WorkflowExecution execution = existing.getExecutionId().getExecution();
1✔
319
    WorkflowExecutionAlreadyStartedFailure error =
320
        WorkflowExecutionAlreadyStartedFailure.newBuilder()
1✔
321
            .setRunId(execution.getRunId())
1✔
322
            .setStartRequestId(startRequest.getRequestId())
1✔
323
            .build();
1✔
324
    throw StatusUtils.newException(
1✔
325
        Status.ALREADY_EXISTS.withDescription(
1✔
326
            String.format(
1✔
327
                "WorkflowId: %s, " + "RunId: %s", execution.getWorkflowId(), execution.getRunId())),
1✔
328
        error,
329
        WorkflowExecutionAlreadyStartedFailure.getDescriptor());
1✔
330
  }
331

332
  private StartWorkflowExecutionResponse startWorkflowExecutionNoRunningCheckLocked(
333
      StartWorkflowExecutionRequest startRequest,
334
      @Nonnull String runId,
335
      @Nonnull String firstExecutionRunId,
336
      Optional<String> continuedExecutionRunId,
337
      Optional<TestServiceRetryState> retryState,
338
      Duration backoffStartInterval,
339
      Payloads lastCompletionResult,
340
      Optional<Failure> lastFailure,
341
      Optional<TestWorkflowMutableState> parent,
342
      OptionalLong parentChildInitiatedEventId,
343
      @Nullable SignalWorkflowExecutionRequest signalWithStartSignal,
344
      WorkflowId workflowId) {
345
    String namespace = startRequest.getNamespace();
1✔
346
    TestWorkflowMutableState mutableState =
1✔
347
        new TestWorkflowMutableStateImpl(
348
            startRequest,
349
            firstExecutionRunId,
350
            runId,
351
            retryState,
352
            backoffStartInterval,
353
            lastCompletionResult,
354
            lastFailure,
355
            parent,
356
            parentChildInitiatedEventId,
357
            continuedExecutionRunId,
358
            this,
359
            store,
360
            visibilityStore,
361
            nexusEndpointStore,
362
            selfAdvancingTimer);
363
    WorkflowExecution execution = mutableState.getExecutionId().getExecution();
1✔
364
    ExecutionId executionId = new ExecutionId(namespace, execution);
1✔
365
    executionsByWorkflowId.put(workflowId, mutableState);
1✔
366
    executions.put(executionId, mutableState);
1✔
367

368
    PollWorkflowTaskQueueRequest eagerWorkflowTaskPollRequest =
369
        startRequest.getRequestEagerExecution()
1✔
370
            ? PollWorkflowTaskQueueRequest.newBuilder()
1✔
371
                .setIdentity(startRequest.getIdentity())
1✔
372
                .setNamespace(startRequest.getNamespace())
1✔
373
                .setTaskQueue(startRequest.getTaskQueue())
1✔
374
                .build()
1✔
375
            : null;
1✔
376

377
    @Nullable
378
    PollWorkflowTaskQueueResponse eagerWorkflowTask =
1✔
379
        mutableState.startWorkflow(
1✔
380
            continuedExecutionRunId.isPresent(),
1✔
381
            signalWithStartSignal,
382
            eagerWorkflowTaskPollRequest);
383
    StartWorkflowExecutionResponse.Builder response =
384
        StartWorkflowExecutionResponse.newBuilder().setRunId(execution.getRunId()).setStarted(true);
1✔
385
    if (eagerWorkflowTask != null) {
1✔
386
      response.setEagerWorkflowTask(eagerWorkflowTask);
1✔
387
    }
388
    return response.build();
1✔
389
  }
390

391
  @Override
392
  public void getWorkflowExecutionHistory(
393
      GetWorkflowExecutionHistoryRequest getRequest,
394
      StreamObserver<GetWorkflowExecutionHistoryResponse> responseObserver) {
395
    ExecutionId executionId = new ExecutionId(getRequest.getNamespace(), getRequest.getExecution());
1✔
396
    executor.execute(
1✔
397
        // preserving gRPC context deadline between threads
398
        Context.current()
1✔
399
            .wrap(
1✔
400
                () -> {
401
                  try {
402
                    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
403
                    responseObserver.onNext(
1✔
404
                        store.getWorkflowExecutionHistory(
1✔
405
                            mutableState.getExecutionId(),
1✔
406
                            getRequest,
407
                            // We explicitly don't try to respond inside the context deadline.
408
                            // If we try to fit into the context deadline, the deadline may be not
409
                            // expired on the client side and an empty response will lead to a new
410
                            // request, making the client hammer the server at the tail end of the
411
                            // deadline.
412
                            // So this call is designed to wait fully till the end of the
413
                            // context deadline and throw DEADLINE_EXCEEDED if the deadline is less
414
                            // than 20s.
415
                            // If it's longer than 20 seconds - we return an empty result.
416
                            Deadline.after(20, TimeUnit.SECONDS)));
1✔
417
                    responseObserver.onCompleted();
1✔
418
                  } catch (StatusRuntimeException e) {
1✔
419
                    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1✔
420
                      log.error("unexpected", e);
×
421
                    }
422
                    responseObserver.onError(e);
1✔
423
                  } catch (Exception e) {
×
424
                    log.error("unexpected", e);
×
425
                    responseObserver.onError(e);
×
426
                  }
1✔
427
                }));
1✔
428
  }
1✔
429

430
  private <T> T pollTaskQueue(Context ctx, Future<T> futureValue)
431
      throws ExecutionException, InterruptedException {
432
    final Context.CancellationListener canceler = context -> futureValue.cancel(true);
1✔
433
    ctx.addListener(canceler, this.backgroundScheduler);
1✔
434
    try {
435
      return futureValue.get();
1✔
436
    } finally {
437
      ctx.removeListener(canceler);
1✔
438
    }
439
  }
440

441
  @Override
442
  public void pollWorkflowTaskQueue(
443
      PollWorkflowTaskQueueRequest pollRequest,
444
      StreamObserver<PollWorkflowTaskQueueResponse> responseObserver) {
445
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
446
      PollWorkflowTaskQueueResponse.Builder task;
447
      try {
448
        task = pollTaskQueue(ctx, store.pollWorkflowTaskQueue(pollRequest));
1✔
449
      } catch (ExecutionException e) {
×
450
        responseObserver.onError(e);
×
451
        return;
×
452
      } catch (InterruptedException e) {
×
453
        Thread.currentThread().interrupt();
×
454
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
×
455
        responseObserver.onCompleted();
×
456
        return;
×
457
      } catch (CancellationException e) {
1✔
458
        responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
1✔
459
        responseObserver.onCompleted();
1✔
460
        return;
1✔
461
      }
1✔
462

463
      ExecutionId executionId =
1✔
464
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
1✔
465
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
466
      try {
467
        mutableState.startWorkflowTask(task, pollRequest);
1✔
468
        // The task always has the original task queue that was created as part of the response.
469
        // This may be a different task queue than the task queue it was scheduled on, as in the
470
        // case of sticky execution.
471
        task.setWorkflowExecutionTaskQueue(mutableState.getStartRequest().getTaskQueue());
1✔
472
        PollWorkflowTaskQueueResponse response = task.build();
1✔
473
        responseObserver.onNext(response);
1✔
474
        responseObserver.onCompleted();
1✔
475
      } catch (StatusRuntimeException e) {
×
476
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
×
477
          if (log.isDebugEnabled()) {
×
478
            log.debug("Skipping outdated workflow task for " + executionId, e);
×
479
          }
480
          // The real service doesn't return this call on outdated task.
481
          // For simplicity, we return an empty result here.
482
          responseObserver.onNext(PollWorkflowTaskQueueResponse.getDefaultInstance());
×
483
          responseObserver.onCompleted();
×
484
        } else {
485
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
×
486
            log.error("unexpected", e);
×
487
          }
488
          responseObserver.onError(e);
×
489
        }
490
      }
1✔
491
    }
1✔
492
  }
1✔
493

494
  @Override
495
  public void respondWorkflowTaskCompleted(
496
      RespondWorkflowTaskCompletedRequest request,
497
      StreamObserver<RespondWorkflowTaskCompletedResponse> responseObserver) {
498
    try {
499
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(request.getTaskToken());
1✔
500
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
501
      mutableState.completeWorkflowTask(taskToken.getHistorySize(), request);
1✔
502
      responseObserver.onNext(RespondWorkflowTaskCompletedResponse.getDefaultInstance());
1✔
503
      responseObserver.onCompleted();
1✔
504
    } catch (StatusRuntimeException e) {
1✔
505
      handleStatusRuntimeException(e, responseObserver);
1✔
506
    } catch (Throwable e) {
×
507
      responseObserver.onError(
×
508
          Status.INTERNAL
509
              .withDescription(Throwables.getStackTraceAsString(e))
×
510
              .withCause(e)
×
511
              .asRuntimeException());
×
512
    }
1✔
513
  }
1✔
514

515
  @Override
516
  public void respondWorkflowTaskFailed(
517
      RespondWorkflowTaskFailedRequest failedRequest,
518
      StreamObserver<RespondWorkflowTaskFailedResponse> responseObserver) {
519
    try {
520
      WorkflowTaskToken taskToken = WorkflowTaskToken.fromBytes(failedRequest.getTaskToken());
1✔
521
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
522
      mutableState.failWorkflowTask(failedRequest);
1✔
523
      responseObserver.onNext(RespondWorkflowTaskFailedResponse.getDefaultInstance());
1✔
524
      responseObserver.onCompleted();
1✔
525
    } catch (StatusRuntimeException e) {
×
526
      handleStatusRuntimeException(e, responseObserver);
×
527
    }
1✔
528
  }
1✔
529

530
  @Override
531
  public void getSystemInfo(
532
      GetSystemInfoRequest request, StreamObserver<GetSystemInfoResponse> responseObserver) {
533
    responseObserver.onNext(
1✔
534
        GetSystemInfoResponse.newBuilder()
1✔
535
            .setCapabilities(
1✔
536
                // These are the capabilities I could verify the test server supports
537
                GetSystemInfoResponse.Capabilities.newBuilder()
1✔
538
                    .setSdkMetadata(true)
1✔
539
                    .setSignalAndQueryHeader(true)
1✔
540
                    .setEncodedFailureAttributes(true)
1✔
541
                    .setEagerWorkflowStart(true)
1✔
542
                    .build())
1✔
543
            .build());
1✔
544
    responseObserver.onCompleted();
1✔
545
  }
1✔
546

547
  private Context.CancellableContext deadlineCtx(Deadline deadline) {
548
    return Context.current().withDeadline(deadline, this.backgroundScheduler);
1✔
549
  }
550

551
  @Override
552
  public void pollActivityTaskQueue(
553
      PollActivityTaskQueueRequest pollRequest,
554
      StreamObserver<PollActivityTaskQueueResponse> responseObserver) {
555
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
556

557
      PollActivityTaskQueueResponse.Builder task;
558
      try {
559
        task = pollTaskQueue(ctx, store.pollActivityTaskQueue(pollRequest));
1✔
560
      } catch (ExecutionException e) {
×
561
        responseObserver.onError(e);
×
562
        return;
×
563
      } catch (InterruptedException e) {
×
564
        Thread.currentThread().interrupt();
×
565
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
×
566
        responseObserver.onCompleted();
×
567
        return;
×
568
      } catch (CancellationException e) {
1✔
569
        responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
1✔
570
        responseObserver.onCompleted();
1✔
571
        return;
1✔
572
      }
1✔
573

574
      ExecutionId executionId =
1✔
575
          new ExecutionId(pollRequest.getNamespace(), task.getWorkflowExecution());
1✔
576
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
577
      try {
578
        mutableState.startActivityTask(task, pollRequest);
1✔
579
        responseObserver.onNext(task.build());
1✔
580
        responseObserver.onCompleted();
1✔
581
      } catch (StatusRuntimeException e) {
1✔
582
        if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
583
          if (log.isDebugEnabled()) {
1✔
584
            log.debug("Skipping outdated activity task for " + executionId, e);
×
585
          }
586
          responseObserver.onNext(PollActivityTaskQueueResponse.getDefaultInstance());
1✔
587
          responseObserver.onCompleted();
1✔
588
        } else {
589
          if (e.getStatus().getCode() == Status.Code.INTERNAL) {
×
590
            log.error("unexpected", e);
×
591
          }
592
          responseObserver.onError(e);
×
593
        }
594
      }
1✔
595
    }
1✔
596
  }
1✔
597

598
  @Override
599
  public void recordActivityTaskHeartbeat(
600
      RecordActivityTaskHeartbeatRequest heartbeatRequest,
601
      StreamObserver<RecordActivityTaskHeartbeatResponse> responseObserver) {
602
    try {
603
      ActivityTaskToken activityTaskToken =
1✔
604
          ActivityTaskToken.fromBytes(heartbeatRequest.getTaskToken());
1✔
605
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
606
      boolean cancelRequested =
1✔
607
          mutableState.heartbeatActivityTask(
1✔
608
              activityTaskToken.getScheduledEventId(), heartbeatRequest.getDetails());
1✔
609
      responseObserver.onNext(
1✔
610
          RecordActivityTaskHeartbeatResponse.newBuilder()
1✔
611
              .setCancelRequested(cancelRequested)
1✔
612
              .build());
1✔
613
      responseObserver.onCompleted();
1✔
614
    } catch (StatusRuntimeException e) {
1✔
615
      handleStatusRuntimeException(e, responseObserver);
1✔
616
    }
1✔
617
  }
1✔
618

619
  @Override
620
  public void recordActivityTaskHeartbeatById(
621
      RecordActivityTaskHeartbeatByIdRequest heartbeatRequest,
622
      StreamObserver<RecordActivityTaskHeartbeatByIdResponse> responseObserver) {
623
    try {
624
      ExecutionId execution =
×
625
          new ExecutionId(
626
              heartbeatRequest.getNamespace(),
×
627
              heartbeatRequest.getWorkflowId(),
×
628
              heartbeatRequest.getRunId());
×
629
      TestWorkflowMutableState mutableState = getMutableState(execution);
×
630
      boolean cancelRequested =
×
631
          mutableState.heartbeatActivityTaskById(
×
632
              heartbeatRequest.getActivityId(),
×
633
              heartbeatRequest.getDetails(),
×
634
              heartbeatRequest.getIdentity());
×
635
      responseObserver.onNext(
×
636
          RecordActivityTaskHeartbeatByIdResponse.newBuilder()
×
637
              .setCancelRequested(cancelRequested)
×
638
              .build());
×
639
      responseObserver.onCompleted();
×
640
    } catch (StatusRuntimeException e) {
×
641
      handleStatusRuntimeException(e, responseObserver);
×
642
    }
×
643
  }
×
644

645
  @Override
646
  public void respondActivityTaskCompleted(
647
      RespondActivityTaskCompletedRequest completeRequest,
648
      StreamObserver<RespondActivityTaskCompletedResponse> responseObserver) {
649
    try {
650
      ActivityTaskToken activityTaskToken =
1✔
651
          ActivityTaskToken.fromBytes(completeRequest.getTaskToken());
1✔
652
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
653
      mutableState.completeActivityTask(activityTaskToken.getScheduledEventId(), completeRequest);
1✔
654
      responseObserver.onNext(RespondActivityTaskCompletedResponse.getDefaultInstance());
1✔
655
      responseObserver.onCompleted();
1✔
656
    } catch (StatusRuntimeException e) {
1✔
657
      handleStatusRuntimeException(e, responseObserver);
1✔
658
    }
1✔
659
  }
1✔
660

661
  @Override
662
  public void respondActivityTaskCompletedById(
663
      RespondActivityTaskCompletedByIdRequest completeRequest,
664
      StreamObserver<RespondActivityTaskCompletedByIdResponse> responseObserver) {
665
    try {
666
      ExecutionId executionId =
×
667
          new ExecutionId(
668
              completeRequest.getNamespace(),
×
669
              completeRequest.getWorkflowId(),
×
670
              completeRequest.getRunId());
×
671
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
672
      mutableState.completeActivityTaskById(completeRequest.getActivityId(), completeRequest);
×
673
      responseObserver.onNext(RespondActivityTaskCompletedByIdResponse.getDefaultInstance());
×
674
      responseObserver.onCompleted();
×
675
    } catch (StatusRuntimeException e) {
×
676
      handleStatusRuntimeException(e, responseObserver);
×
677
    }
×
678
  }
×
679

680
  @Override
681
  public void respondActivityTaskFailed(
682
      RespondActivityTaskFailedRequest failRequest,
683
      StreamObserver<RespondActivityTaskFailedResponse> responseObserver) {
684
    try {
685
      ActivityTaskToken activityTaskToken = ActivityTaskToken.fromBytes(failRequest.getTaskToken());
1✔
686
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
687
      mutableState.failActivityTask(activityTaskToken.getScheduledEventId(), failRequest);
1✔
688
      responseObserver.onNext(RespondActivityTaskFailedResponse.getDefaultInstance());
1✔
689
      responseObserver.onCompleted();
1✔
690
    } catch (StatusRuntimeException e) {
1✔
691
      handleStatusRuntimeException(e, responseObserver);
1✔
692
    }
1✔
693
  }
1✔
694

695
  @Override
696
  public void respondActivityTaskFailedById(
697
      RespondActivityTaskFailedByIdRequest failRequest,
698
      StreamObserver<RespondActivityTaskFailedByIdResponse> responseObserver) {
699
    try {
700
      ExecutionId executionId =
×
701
          new ExecutionId(
702
              failRequest.getNamespace(), failRequest.getWorkflowId(), failRequest.getRunId());
×
703
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
704
      mutableState.failActivityTaskById(failRequest.getActivityId(), failRequest);
×
705
      responseObserver.onNext(RespondActivityTaskFailedByIdResponse.getDefaultInstance());
×
706
      responseObserver.onCompleted();
×
707
    } catch (StatusRuntimeException e) {
×
708
      handleStatusRuntimeException(e, responseObserver);
×
709
    }
×
710
  }
×
711

712
  @Override
713
  public void respondActivityTaskCanceled(
714
      RespondActivityTaskCanceledRequest canceledRequest,
715
      StreamObserver<RespondActivityTaskCanceledResponse> responseObserver) {
716
    try {
717
      ActivityTaskToken activityTaskToken =
1✔
718
          ActivityTaskToken.fromBytes(canceledRequest.getTaskToken());
1✔
719
      TestWorkflowMutableState mutableState = getMutableState(activityTaskToken.getExecutionId());
1✔
720
      mutableState.cancelActivityTask(activityTaskToken.getScheduledEventId(), canceledRequest);
1✔
721
      responseObserver.onNext(RespondActivityTaskCanceledResponse.getDefaultInstance());
1✔
722
      responseObserver.onCompleted();
1✔
723
    } catch (StatusRuntimeException e) {
×
724
      handleStatusRuntimeException(e, responseObserver);
×
725
    }
1✔
726
  }
1✔
727

728
  @Override
729
  public void respondActivityTaskCanceledById(
730
      RespondActivityTaskCanceledByIdRequest canceledRequest,
731
      StreamObserver<RespondActivityTaskCanceledByIdResponse> responseObserver) {
732
    try {
733
      ExecutionId executionId =
×
734
          new ExecutionId(
735
              canceledRequest.getNamespace(),
×
736
              canceledRequest.getWorkflowId(),
×
737
              canceledRequest.getRunId());
×
738
      TestWorkflowMutableState mutableState = getMutableState(executionId);
×
739
      mutableState.cancelActivityTaskById(canceledRequest.getActivityId(), canceledRequest);
×
740
      responseObserver.onNext(RespondActivityTaskCanceledByIdResponse.getDefaultInstance());
×
741
      responseObserver.onCompleted();
×
742
    } catch (StatusRuntimeException e) {
×
743
      handleStatusRuntimeException(e, responseObserver);
×
744
    }
×
745
  }
×
746

747
  @Override
748
  public void pollNexusTaskQueue(
749
      PollNexusTaskQueueRequest request,
750
      StreamObserver<PollNexusTaskQueueResponse> responseObserver) {
751
    try (Context.CancellableContext ctx = deadlineCtx(getLongPollDeadline())) {
1✔
752

753
      PollNexusTaskQueueResponse.Builder task;
754
      try {
755
        task = pollTaskQueue(ctx, store.pollNexusTaskQueue(request));
1✔
NEW
756
      } catch (ExecutionException e) {
×
NEW
757
        responseObserver.onError(e);
×
NEW
758
        return;
×
NEW
759
      } catch (InterruptedException e) {
×
NEW
760
        Thread.currentThread().interrupt();
×
NEW
761
        responseObserver.onNext(PollNexusTaskQueueResponse.getDefaultInstance());
×
NEW
762
        responseObserver.onCompleted();
×
NEW
763
        return;
×
NEW
764
      } catch (CancellationException e) {
×
NEW
765
        responseObserver.onNext(PollNexusTaskQueueResponse.getDefaultInstance());
×
NEW
766
        responseObserver.onCompleted();
×
NEW
767
        return;
×
768
      }
1✔
769

770
      responseObserver.onNext(task.build());
1✔
771
      responseObserver.onCompleted();
1✔
NEW
772
    }
×
773
  }
1✔
774

775
  @Override
776
  public void respondNexusTaskCompleted(
777
      RespondNexusTaskCompletedRequest request,
778
      StreamObserver<RespondNexusTaskCompletedResponse> responseObserver) {
779
    try {
780
      NexusTaskToken taskToken = NexusTaskToken.fromBytes(request.getTaskToken());
1✔
781
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
782
      if (request.getResponse().hasStartOperation()
1✔
783
          && request.getResponse().getStartOperation().hasAsyncSuccess()) {
1✔
784
        // Start event is only recorded for async success
NEW
785
        mutableState.startNexusTask(taskToken.getScheduledEventId(), request);
×
786
      } else {
787
        mutableState.completeNexusTask(taskToken.getScheduledEventId(), request);
1✔
788
      }
789
      responseObserver.onNext(RespondNexusTaskCompletedResponse.getDefaultInstance());
1✔
790
      responseObserver.onCompleted();
1✔
NEW
791
    } catch (StatusRuntimeException e) {
×
NEW
792
      handleStatusRuntimeException(e, responseObserver);
×
793
    }
1✔
794
  }
1✔
795

796
  @Override
797
  public void respondNexusTaskFailed(
798
      RespondNexusTaskFailedRequest request,
799
      StreamObserver<RespondNexusTaskFailedResponse> responseObserver) {
800
    try {
801
      NexusTaskToken taskToken = NexusTaskToken.fromBytes(request.getTaskToken());
1✔
802
      TestWorkflowMutableState mutableState = getMutableState(taskToken.getExecutionId());
1✔
NEW
803
      mutableState.failNexusTask(taskToken.getScheduledEventId(), request);
×
NEW
804
      responseObserver.onNext(RespondNexusTaskFailedResponse.getDefaultInstance());
×
NEW
805
      responseObserver.onCompleted();
×
806
    } catch (StatusRuntimeException e) {
1✔
807
      handleStatusRuntimeException(e, responseObserver);
1✔
NEW
808
    }
×
809
  }
1✔
810

811
  @Override
812
  public void requestCancelWorkflowExecution(
813
      RequestCancelWorkflowExecutionRequest cancelRequest,
814
      StreamObserver<RequestCancelWorkflowExecutionResponse> responseObserver) {
815
    try {
816
      requestCancelWorkflowExecution(cancelRequest, Optional.empty());
1✔
817
      responseObserver.onNext(RequestCancelWorkflowExecutionResponse.getDefaultInstance());
1✔
818
      responseObserver.onCompleted();
1✔
819
    } catch (StatusRuntimeException e) {
1✔
820
      handleStatusRuntimeException(e, responseObserver);
1✔
821
    }
1✔
822
  }
1✔
823

824
  void requestCancelWorkflowExecution(
825
      RequestCancelWorkflowExecutionRequest cancelRequest,
826
      Optional<TestWorkflowMutableStateImpl.CancelExternalWorkflowExecutionCallerInfo> callerInfo) {
827
    ExecutionId executionId =
1✔
828
        new ExecutionId(cancelRequest.getNamespace(), cancelRequest.getWorkflowExecution());
1✔
829
    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
830
    mutableState.requestCancelWorkflowExecution(cancelRequest, callerInfo);
1✔
831
  }
1✔
832

833
  @Override
834
  public void terminateWorkflowExecution(
835
      TerminateWorkflowExecutionRequest request,
836
      StreamObserver<TerminateWorkflowExecutionResponse> responseObserver) {
837
    try {
838
      terminateWorkflowExecution(request);
1✔
839
      responseObserver.onNext(TerminateWorkflowExecutionResponse.getDefaultInstance());
1✔
840
      responseObserver.onCompleted();
1✔
841
    } catch (StatusRuntimeException e) {
1✔
842
      handleStatusRuntimeException(e, responseObserver);
1✔
843
    }
1✔
844
  }
1✔
845

846
  private void terminateWorkflowExecution(TerminateWorkflowExecutionRequest request) {
847
    ExecutionId executionId =
1✔
848
        new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
1✔
849
    TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
850
    mutableState.terminateWorkflowExecution(request);
1✔
851
  }
1✔
852

853
  @Override
854
  public void signalWorkflowExecution(
855
      SignalWorkflowExecutionRequest signalRequest,
856
      StreamObserver<SignalWorkflowExecutionResponse> responseObserver) {
857
    try {
858
      ExecutionId executionId =
1✔
859
          new ExecutionId(signalRequest.getNamespace(), signalRequest.getWorkflowExecution());
1✔
860
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
861
      mutableState.signal(signalRequest);
1✔
862
      responseObserver.onNext(SignalWorkflowExecutionResponse.getDefaultInstance());
1✔
863
      responseObserver.onCompleted();
1✔
864
    } catch (StatusRuntimeException e) {
1✔
865
      handleStatusRuntimeException(e, responseObserver);
1✔
866
    }
1✔
867
  }
1✔
868

869
  @Override
870
  public void updateWorkflowExecution(
871
      UpdateWorkflowExecutionRequest request,
872
      StreamObserver<UpdateWorkflowExecutionResponse> responseObserver) {
873
    try (Context.CancellableContext ctx = deadlineCtx(getUpdatePollDeadline())) {
1✔
874
      Context toRestore = ctx.attach();
1✔
875
      try {
876
        ExecutionId executionId =
1✔
877
            new ExecutionId(request.getNamespace(), request.getWorkflowExecution());
1✔
878
        TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
879
        @Nullable Deadline deadline = Context.current().getDeadline();
1✔
880
        UpdateWorkflowExecutionResponse response =
1✔
881
            mutableState.updateWorkflowExecution(request, deadline);
1✔
882
        responseObserver.onNext(response);
1✔
883
        responseObserver.onCompleted();
1✔
884
      } catch (StatusRuntimeException e) {
1✔
885
        handleStatusRuntimeException(e, responseObserver);
1✔
886
      } finally {
887
        ctx.detach(toRestore);
1✔
888
      }
889
    }
890
  }
1✔
891

892
  @Override
893
  public void pollWorkflowExecutionUpdate(
894
      PollWorkflowExecutionUpdateRequest request,
895
      StreamObserver<PollWorkflowExecutionUpdateResponse> responseObserver) {
896
    try (Context.CancellableContext ctx = deadlineCtx(getUpdatePollDeadline())) {
1✔
897
      Context toRestore = ctx.attach();
1✔
898
      try {
899
        ExecutionId executionId =
1✔
900
            new ExecutionId(request.getNamespace(), request.getUpdateRef().getWorkflowExecution());
1✔
901
        TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
902
        @Nullable Deadline deadline = Context.current().getDeadline();
1✔
903
        PollWorkflowExecutionUpdateResponse response =
1✔
904
            mutableState.pollUpdateWorkflowExecution(request, deadline);
1✔
905
        responseObserver.onNext(response);
1✔
906
        responseObserver.onCompleted();
1✔
907
      } catch (StatusRuntimeException e) {
1✔
908
        handleStatusRuntimeException(e, responseObserver);
1✔
909
      } finally {
910
        ctx.detach(toRestore);
1✔
911
      }
912
    }
913
  }
1✔
914

915
  @Override
916
  public void signalWithStartWorkflowExecution(
917
      SignalWithStartWorkflowExecutionRequest r,
918
      StreamObserver<SignalWithStartWorkflowExecutionResponse> responseObserver) {
919
    try {
920
      if (!r.hasTaskQueue()) {
1✔
921
        throw Status.INVALID_ARGUMENT
×
922
            .withDescription("request missing required taskQueue field")
×
923
            .asRuntimeException();
×
924
      }
925
      if (!r.hasWorkflowType()) {
1✔
926
        throw Status.INVALID_ARGUMENT
×
927
            .withDescription("request missing required workflowType field")
×
928
            .asRuntimeException();
×
929
      }
930
      ExecutionId executionId = new ExecutionId(r.getNamespace(), r.getWorkflowId(), null);
1✔
931
      TestWorkflowMutableState mutableState = getMutableState(executionId, false);
1✔
932
      SignalWorkflowExecutionRequest signalRequest =
933
          SignalWorkflowExecutionRequest.newBuilder()
1✔
934
              .setInput(r.getSignalInput())
1✔
935
              .setSignalName(r.getSignalName())
1✔
936
              .setWorkflowExecution(executionId.getExecution())
1✔
937
              .setRequestId(r.getRequestId())
1✔
938
              .setControl(r.getControl())
1✔
939
              .setNamespace(r.getNamespace())
1✔
940
              .setIdentity(r.getIdentity())
1✔
941
              .build();
1✔
942
      if (mutableState != null && !mutableState.isTerminalState()) {
1✔
943
        mutableState.signal(signalRequest);
1✔
944
        responseObserver.onNext(
1✔
945
            SignalWithStartWorkflowExecutionResponse.newBuilder()
1✔
946
                .setRunId(mutableState.getExecutionId().getExecution().getRunId())
1✔
947
                .build());
1✔
948
        responseObserver.onCompleted();
1✔
949
        return;
1✔
950
      }
951
      StartWorkflowExecutionRequest.Builder startRequest =
952
          StartWorkflowExecutionRequest.newBuilder()
1✔
953
              .setRequestId(r.getRequestId())
1✔
954
              .setInput(r.getInput())
1✔
955
              .setWorkflowExecutionTimeout(r.getWorkflowExecutionTimeout())
1✔
956
              .setWorkflowRunTimeout(r.getWorkflowRunTimeout())
1✔
957
              .setWorkflowTaskTimeout(r.getWorkflowTaskTimeout())
1✔
958
              .setNamespace(r.getNamespace())
1✔
959
              .setTaskQueue(r.getTaskQueue())
1✔
960
              .setWorkflowId(r.getWorkflowId())
1✔
961
              .setWorkflowIdReusePolicy(r.getWorkflowIdReusePolicy())
1✔
962
              .setIdentity(r.getIdentity())
1✔
963
              .setWorkflowType(r.getWorkflowType())
1✔
964
              .setCronSchedule(r.getCronSchedule())
1✔
965
              .setRequestId(r.getRequestId());
1✔
966
      if (r.hasRetryPolicy()) {
1✔
967
        startRequest.setRetryPolicy(r.getRetryPolicy());
×
968
      }
969
      if (r.hasHeader()) {
1✔
970
        startRequest.setHeader(r.getHeader());
1✔
971
      }
972
      if (r.hasMemo()) {
1✔
973
        startRequest.setMemo(r.getMemo());
×
974
      }
975
      if (r.hasSearchAttributes()) {
1✔
976
        startRequest.setSearchAttributes(r.getSearchAttributes());
1✔
977
      }
978
      if (r.hasWorkflowStartDelay()) {
1✔
979
        startRequest.setWorkflowStartDelay(r.getWorkflowStartDelay());
1✔
980
      }
981

982
      StartWorkflowExecutionResponse startResult =
1✔
983
          startWorkflowExecutionImpl(
1✔
984
              startRequest.build(),
1✔
985
              Duration.ZERO,
986
              Optional.empty(),
1✔
987
              OptionalLong.empty(),
1✔
988
              signalRequest);
989
      responseObserver.onNext(
1✔
990
          SignalWithStartWorkflowExecutionResponse.newBuilder()
1✔
991
              .setRunId(startResult.getRunId())
1✔
992
              .build());
1✔
993
      responseObserver.onCompleted();
1✔
994
    } catch (StatusRuntimeException e) {
1✔
995
      handleStatusRuntimeException(e, responseObserver);
1✔
996
    }
1✔
997
  }
1✔
998

999
  public void signalExternalWorkflowExecution(
1000
      String signalId,
1001
      SignalExternalWorkflowExecutionCommandAttributes commandAttributes,
1002
      TestWorkflowMutableState source) {
1003
    String namespace;
1004
    if (commandAttributes.getNamespace().isEmpty()) {
1✔
1005
      namespace = source.getExecutionId().getNamespace();
1✔
1006
    } else {
1007
      namespace = commandAttributes.getNamespace();
×
1008
    }
1009
    ExecutionId executionId = new ExecutionId(namespace, commandAttributes.getExecution());
1✔
1010
    TestWorkflowMutableState mutableState;
1011
    try {
1012
      mutableState = getMutableState(executionId);
1✔
1013
      mutableState.signalFromWorkflow(commandAttributes);
1✔
1014
      source.completeSignalExternalWorkflowExecution(
1✔
1015
          signalId, mutableState.getExecutionId().getExecution().getRunId());
1✔
1016
    } catch (StatusRuntimeException e) {
1✔
1017
      if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
1018
        source.failSignalExternalWorkflowExecution(
1✔
1019
            signalId,
1020
            SignalExternalWorkflowExecutionFailedCause
1021
                .SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED_CAUSE_EXTERNAL_WORKFLOW_EXECUTION_NOT_FOUND);
1022
      } else {
1023
        throw e;
×
1024
      }
1025
    }
1✔
1026
  }
1✔
1027

1028
  /**
1029
   * Creates next run of a workflow execution
1030
   *
1031
   * @return RunId
1032
   */
1033
  public String continueAsNew(
1034
      StartWorkflowExecutionRequest previousRunStartRequest,
1035
      ContinueAsNewWorkflowExecutionCommandAttributes ca,
1036
      WorkflowExecutionContinuedAsNewEventAttributes ea,
1037
      Optional<TestServiceRetryState> retryState,
1038
      String identity,
1039
      ExecutionId continuedExecutionId,
1040
      String firstExecutionRunId,
1041
      Optional<TestWorkflowMutableState> parent,
1042
      OptionalLong parentChildInitiatedEventId) {
1043
    StartWorkflowExecutionRequest.Builder startRequestBuilder =
1044
        StartWorkflowExecutionRequest.newBuilder()
1✔
1045
            .setRequestId(UUID.randomUUID().toString())
1✔
1046
            .setWorkflowType(ea.getWorkflowType())
1✔
1047
            .setWorkflowRunTimeout(ea.getWorkflowRunTimeout())
1✔
1048
            .setWorkflowTaskTimeout(ea.getWorkflowTaskTimeout())
1✔
1049
            .setNamespace(continuedExecutionId.getNamespace())
1✔
1050
            .setTaskQueue(ea.getTaskQueue())
1✔
1051
            .setWorkflowId(continuedExecutionId.getWorkflowId().getWorkflowId())
1✔
1052
            .setWorkflowIdReusePolicy(previousRunStartRequest.getWorkflowIdReusePolicy())
1✔
1053
            .setIdentity(identity)
1✔
1054
            .setCronSchedule(previousRunStartRequest.getCronSchedule());
1✔
1055
    // TODO: Service doesn't perform this copy.
1056
    // See https://github.com/temporalio/temporal/issues/5249
1057
    //    if (previousRunStartRequest.hasRetryPolicy()) {
1058
    //      startRequestBuilder.setRetryPolicy(previousRunStartRequest.getRetryPolicy());
1059
    //    }
1060
    if (ca.hasRetryPolicy()) {
1✔
1061
      startRequestBuilder.setRetryPolicy(ca.getRetryPolicy());
1✔
1062
    }
1063
    if (ea.hasInput()) {
1✔
1064
      startRequestBuilder.setInput(ea.getInput());
1✔
1065
    }
1066
    if (ea.hasHeader()) {
1✔
1067
      startRequestBuilder.setHeader(ea.getHeader());
1✔
1068
    }
1069
    StartWorkflowExecutionRequest startRequest = startRequestBuilder.build();
1✔
1070
    lock.lock();
1✔
1071
    Optional<Failure> lastFail =
1072
        ea.hasFailure()
1✔
1073
            ? Optional.of(ea.getFailure())
1✔
1074
            : retryState.flatMap(TestServiceRetryState::getPreviousRunFailure);
1✔
1075
    try {
1076
      StartWorkflowExecutionResponse response =
1✔
1077
          startWorkflowExecutionNoRunningCheckLocked(
1✔
1078
              startRequest,
1079
              ea.getNewExecutionRunId(),
1✔
1080
              firstExecutionRunId,
1081
              Optional.of(continuedExecutionId.getExecution().getRunId()),
1✔
1082
              retryState,
1083
              ProtobufTimeUtils.toJavaDuration(ea.getBackoffStartInterval()),
1✔
1084
              ea.getLastCompletionResult(),
1✔
1085
              lastFail,
1086
              parent,
1087
              parentChildInitiatedEventId,
1088
              null,
1089
              continuedExecutionId.getWorkflowId());
1✔
1090
      return response.getRunId();
1✔
1091
    } finally {
1092
      lock.unlock();
1✔
1093
    }
1094
  }
1095

1096
  @Override
1097
  public void listOpenWorkflowExecutions(
1098
      ListOpenWorkflowExecutionsRequest listRequest,
1099
      StreamObserver<ListOpenWorkflowExecutionsResponse> responseObserver) {
1100
    try {
1101
      Optional<String> workflowIdFilter;
1102
      if (listRequest.hasExecutionFilter()
1✔
1103
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
×
1104
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
×
1105
      } else {
1106
        workflowIdFilter = Optional.empty();
1✔
1107
      }
1108
      List<WorkflowExecutionInfo> result =
1✔
1109
          store.listWorkflows(WorkflowState.OPEN, workflowIdFilter);
1✔
1110
      responseObserver.onNext(
1✔
1111
          ListOpenWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1✔
1112
      responseObserver.onCompleted();
1✔
1113
    } catch (StatusRuntimeException e) {
×
1114
      handleStatusRuntimeException(e, responseObserver);
×
1115
    }
1✔
1116
  }
1✔
1117

1118
  @Override
1119
  public void listClosedWorkflowExecutions(
1120
      ListClosedWorkflowExecutionsRequest listRequest,
1121
      StreamObserver<ListClosedWorkflowExecutionsResponse> responseObserver) {
1122
    try {
1123
      Optional<String> workflowIdFilter;
1124
      if (listRequest.hasExecutionFilter()
1✔
1125
          && !listRequest.getExecutionFilter().getWorkflowId().isEmpty()) {
×
1126
        workflowIdFilter = Optional.of(listRequest.getExecutionFilter().getWorkflowId());
×
1127
      } else {
1128
        workflowIdFilter = Optional.empty();
1✔
1129
      }
1130
      List<WorkflowExecutionInfo> result =
1✔
1131
          store.listWorkflows(WorkflowState.CLOSED, workflowIdFilter);
1✔
1132
      responseObserver.onNext(
1✔
1133
          ListClosedWorkflowExecutionsResponse.newBuilder().addAllExecutions(result).build());
1✔
1134
      responseObserver.onCompleted();
1✔
1135
    } catch (StatusRuntimeException e) {
×
1136
      handleStatusRuntimeException(e, responseObserver);
×
1137
    }
1✔
1138
  }
1✔
1139

1140
  @Override
1141
  public void respondQueryTaskCompleted(
1142
      RespondQueryTaskCompletedRequest completeRequest,
1143
      StreamObserver<RespondQueryTaskCompletedResponse> responseObserver) {
1144
    try {
1145
      QueryId queryId = QueryId.fromBytes(completeRequest.getTaskToken());
1✔
1146
      TestWorkflowMutableState mutableState = getMutableState(queryId.getExecutionId());
1✔
1147
      mutableState.completeQuery(queryId, completeRequest);
1✔
1148
      responseObserver.onNext(RespondQueryTaskCompletedResponse.getDefaultInstance());
1✔
1149
      responseObserver.onCompleted();
1✔
1150
    } catch (StatusRuntimeException e) {
×
1151
      handleStatusRuntimeException(e, responseObserver);
×
1152
    }
1✔
1153
  }
1✔
1154

1155
  @Override
1156
  public void queryWorkflow(
1157
      QueryWorkflowRequest queryRequest, StreamObserver<QueryWorkflowResponse> responseObserver) {
1158
    try {
1159
      ExecutionId executionId =
1✔
1160
          new ExecutionId(queryRequest.getNamespace(), queryRequest.getExecution());
1✔
1161
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1162
      @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1163
      QueryWorkflowResponse result =
1✔
1164
          mutableState.query(
1✔
1165
              queryRequest,
1166
              deadline != null ? deadline.timeRemaining(TimeUnit.MILLISECONDS) : Long.MAX_VALUE);
1✔
1167
      responseObserver.onNext(result);
1✔
1168
      responseObserver.onCompleted();
1✔
1169
    } catch (StatusRuntimeException e) {
1✔
1170
      handleStatusRuntimeException(e, responseObserver);
1✔
1171
    }
1✔
1172
  }
1✔
1173

1174
  @Override
1175
  public void describeWorkflowExecution(
1176
      DescribeWorkflowExecutionRequest request,
1177
      StreamObserver<io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse>
1178
          responseObserver) {
1179
    try {
1180
      if (request.getNamespace().isEmpty()) {
1✔
1181
        throw createInvalidArgument("Namespace not set on request.");
×
1182
      }
1183
      if (!request.hasExecution()) {
1✔
1184
        throw createInvalidArgument("Execution not set on request.");
×
1185
      }
1186

1187
      ExecutionId executionId = new ExecutionId(request.getNamespace(), request.getExecution());
1✔
1188
      TestWorkflowMutableState mutableState = getMutableState(executionId);
1✔
1189
      DescribeWorkflowExecutionResponse result = mutableState.describeWorkflowExecution();
1✔
1190
      responseObserver.onNext(result);
1✔
1191
      responseObserver.onCompleted();
1✔
1192
    } catch (StatusRuntimeException e) {
1✔
1193
      handleStatusRuntimeException(e, responseObserver);
1✔
1194
    }
1✔
1195
  }
1✔
1196

1197
  /**
1198
   * This method doesn't make much sense for test server, it accepts all namespaces as existent and
1199
   * registered. so, it's a trivial implementation just returning an info that a namespace is
1200
   * registered irrespectively of the input
1201
   */
1202
  @Override
1203
  public void describeNamespace(
1204
      DescribeNamespaceRequest request,
1205
      StreamObserver<DescribeNamespaceResponse> responseObserver) {
1206
    try {
1207
      if (request.getNamespace().isEmpty()) {
1✔
1208
        throw createInvalidArgument("Namespace not set on request.");
×
1209
      }
1210
      // generating a stable UUID for name
1211
      String namespaceId = UUID.nameUUIDFromBytes(request.getNamespace().getBytes()).toString();
1✔
1212
      DescribeNamespaceResponse result =
1213
          DescribeNamespaceResponse.newBuilder()
1✔
1214
              .setNamespaceInfo(
1✔
1215
                  NamespaceInfo.newBuilder()
1✔
1216
                      .setName(request.getNamespace())
1✔
1217
                      .setState(NamespaceState.NAMESPACE_STATE_REGISTERED)
1✔
1218
                      .setId(namespaceId)
1✔
1219
                      .build())
1✔
1220
              .build();
1✔
1221
      responseObserver.onNext(result);
1✔
1222
      responseObserver.onCompleted();
1✔
1223
    } catch (StatusRuntimeException e) {
1✔
1224
      handleStatusRuntimeException(e, responseObserver);
1✔
1225
    }
1✔
1226
  }
1✔
1227

1228
  private <R> R requireNotNull(String fieldName, R value) {
1229
    if (value == null) {
1✔
1230
      throw Status.INVALID_ARGUMENT
×
1231
          .withDescription("Missing required field \"" + fieldName + "\".")
×
1232
          .asRuntimeException();
×
1233
    }
1234
    return value;
1✔
1235
  }
1236

1237
  /**
1238
   * Adds diagnostic data about internal service state to the provided {@link StringBuilder}.
1239
   * Includes histories of all workflow instances stored in the service.
1240
   */
1241
  public void getDiagnostics(StringBuilder result) {
1242
    store.getDiagnostics(result);
×
1243
  }
×
1244

1245
  /**
1246
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1247
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#getCurrentTime(Empty)}
1248
   */
1249
  @Deprecated
1250
  public long currentTimeMillis() {
1251
    return selfAdvancingTimer.getClock().getAsLong();
×
1252
  }
1253

1254
  /** Invokes callback after the specified delay according to internal service clock. */
1255
  public void registerDelayedCallback(Duration delay, Runnable r) {
1256
    store.registerDelayedCallback(delay, r);
1✔
1257
  }
1✔
1258

1259
  /**
1260
   * Disables time skipping. To re-enable call {@link #unlockTimeSkipping(String)}. These calls are
1261
   * counted, so calling unlock does not guarantee that time is going to be skipped immediately as
1262
   * another lock can be holding it.
1263
   *
1264
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1265
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#lockTimeSkipping(LockTimeSkippingRequest)}
1266
   */
1267
  @Deprecated
1268
  public void lockTimeSkipping(String caller) {
1269
    selfAdvancingTimer.lockTimeSkipping(caller);
×
1270
  }
×
1271

1272
  /**
1273
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1274
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkipping(UnlockTimeSkippingRequest)}
1275
   */
1276
  @Deprecated
1277
  public void unlockTimeSkipping(String caller) {
1278
    selfAdvancingTimer.unlockTimeSkipping(caller);
×
1279
  }
×
1280

1281
  /**
1282
   * Unlocks time skipping and blocks the calling thread until internal clock passes the current +
1283
   * duration time.<br>
1284
   * When the time is reached, locks time skipping and returns.<br>
1285
   * Might not block at all due to time skipping. Or might block if the time skipping lock counter
1286
   * was more than 1.
1287
   *
1288
   * @deprecated use {@link io.temporal.serviceclient.TestServiceStubs} and {@link
1289
   *     io.temporal.api.testservice.v1.TestServiceGrpc.TestServiceBlockingStub#unlockTimeSkippingWithSleep(SleepRequest)}
1290
   */
1291
  @Deprecated
1292
  public void sleep(Duration duration) {
1293
    CompletableFuture<Void> result = new CompletableFuture<>();
×
1294
    selfAdvancingTimer.schedule(
×
1295
        duration,
1296
        () -> {
1297
          selfAdvancingTimer.lockTimeSkipping("TestWorkflowService sleep");
×
1298
          result.complete(null);
×
1299
        },
×
1300
        "workflow sleep");
1301
    selfAdvancingTimer.unlockTimeSkipping("TestWorkflowService sleep");
×
1302
    try {
1303
      result.get();
×
1304
    } catch (InterruptedException e) {
×
1305
      Thread.currentThread().interrupt();
×
1306
      throw new RuntimeException(e);
×
1307
    } catch (ExecutionException e) {
×
1308
      throw new RuntimeException(e);
×
1309
    }
×
1310
  }
×
1311

1312
  /**
1313
   * Temporal server times out task queue long poll calls after 1 minute and returns an empty
1314
   * result. After which the request has to be retried by the client if it wants to continue
1315
   * waiting. We emulate this behavior here.
1316
   *
1317
   * <p>If there is a deadline present, for task queue poll requests server will respond inside the
1318
   * deadline. Note that the latest is not applicable for getWorkflowExecutionHistory() long polls.
1319
   *
1320
   * @return minimum between the context deadline and maximum long poll deadline.
1321
   */
1322
  private Deadline getLongPollDeadline() {
1323
    @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1324
    Deadline maximumDeadline =
1✔
1325
        Deadline.after(
1✔
1326
            WorkflowServiceStubsOptions.DEFAULT_SERVER_LONG_POLL_RPC_TIMEOUT.toMillis(),
1✔
1327
            TimeUnit.MILLISECONDS);
1328
    return deadline != null ? deadline.minimum(maximumDeadline) : maximumDeadline;
1✔
1329
  }
1330

1331
  private Deadline getUpdatePollDeadline() {
1332
    @Nullable Deadline deadline = Context.current().getDeadline();
1✔
1333
    Deadline maximumDeadline =
1✔
1334
        Deadline.after(Duration.ofSeconds(10).toMillis(), TimeUnit.MILLISECONDS);
1✔
1335
    return deadline != null ? deadline.minimum(maximumDeadline) : maximumDeadline;
1✔
1336
  }
1337

1338
  private void handleStatusRuntimeException(
1339
      StatusRuntimeException e, StreamObserver<?> responseObserver) {
1340
    if (e.getStatus().getCode() == Status.Code.INTERNAL) {
1✔
1341
      log.error("unexpected", e);
1✔
1342
    }
1343
    responseObserver.onError(e);
1✔
1344
  }
1✔
1345

1346
  /**
1347
   * Creates an in-memory service along with client stubs for use in Java code. See also
1348
   * createServerOnly and createWithNoGrpcServer.
1349
   *
1350
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean)} instead and
1351
   *     pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1352
   */
1353
  @Deprecated
1354
  public TestWorkflowService() {
1355
    this(0, true);
×
1356
  }
×
1357

1358
  /**
1359
   * Creates an in-memory service along with client stubs for use in Java code. See also
1360
   * createServerOnly and createWithNoGrpcServer.
1361
   *
1362
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean, long)} instead
1363
   *     and pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1364
   */
1365
  @Deprecated
1366
  public TestWorkflowService(long initialTimeMillis) {
1367
    this(initialTimeMillis, true);
×
1368
  }
×
1369

1370
  /**
1371
   * Creates an in-memory service along with client stubs for use in Java code. See also
1372
   * createServerOnly and createWithNoGrpcServer.
1373
   *
1374
   * @deprecated use {@link io.temporal.testserver.TestServer#createServer(boolean)} instead
1375
   */
1376
  @Deprecated
1377
  public TestWorkflowService(boolean lockTimeSkipping) {
1378
    this(0, true);
×
1379
    if (lockTimeSkipping) {
×
1380
      this.lockTimeSkipping("constructor");
×
1381
    }
1382
  }
×
1383

1384
  /**
1385
   * Creates an instance of TestWorkflowService that does not manage its own gRPC server. Useful for
1386
   * including in an externally managed gRPC server.
1387
   *
1388
   * @deprecated use {@link TestServicesStarter} to create just the services with gRPC server
1389
   */
1390
  @Deprecated
1391
  public static TestWorkflowService createWithNoGrpcServer() {
1392
    return new TestWorkflowService(0, false);
×
1393
  }
1394

1395
  private TestWorkflowService(long initialTimeMillis, boolean startInProcessServer) {
×
1396
    this.selfAdvancingTimer =
×
1397
        new SelfAdvancingTimerImpl(initialTimeMillis, Clock.systemDefaultZone());
×
1398
    store = new TestWorkflowStoreImpl(this.selfAdvancingTimer);
×
1399
    visibilityStore = new TestVisibilityStoreImpl();
×
NEW
1400
    nexusEndpointStore = new TestNexusEndpointStoreImpl();
×
1401
    outOfProcessServer = null;
×
1402
    if (startInProcessServer) {
×
1403
      this.inProcessServer = new InProcessGRPCServer(Collections.singletonList(this));
×
1404
      this.workflowServiceStubs =
×
1405
          WorkflowServiceStubs.newServiceStubs(
×
1406
              WorkflowServiceStubsOptions.newBuilder()
×
1407
                  .setChannel(inProcessServer.getChannel())
×
1408
                  .build());
×
1409
    } else {
1410
      this.inProcessServer = null;
×
1411
      this.workflowServiceStubs = null;
×
1412
    }
1413
  }
×
1414

1415
  /**
1416
   * Creates an out-of-process rather than in-process server, and does not set up a client. Useful,
1417
   * for example, if you want to use the test service from other SDKs.
1418
   *
1419
   * @param port the port to listen on
1420
   * @deprecated use {@link io.temporal.testserver.TestServer#createPortBoundServer(int, boolean)}
1421
   *     instead and pass {@code lockTimeSkipping=false} to emulate the behavior of this method
1422
   */
1423
  @Deprecated
1424
  public static TestWorkflowService createServerOnly(int port) {
1425
    TestWorkflowService result = new TestWorkflowService(true, port);
×
1426
    log.info("Server started, listening on " + port);
×
1427
    return result;
×
1428
  }
1429

1430
  private TestWorkflowService(boolean isOutOfProc, int port) {
×
1431
    // isOutOfProc is just here to make unambiguous constructor overloading.
1432
    Preconditions.checkState(isOutOfProc, "Impossible.");
×
1433
    inProcessServer = null;
×
1434
    workflowServiceStubs = null;
×
1435
    this.selfAdvancingTimer = new SelfAdvancingTimerImpl(0, Clock.systemDefaultZone());
×
1436
    store = new TestWorkflowStoreImpl(selfAdvancingTimer);
×
1437
    visibilityStore = new TestVisibilityStoreImpl();
×
NEW
1438
    nexusEndpointStore = new TestNexusEndpointStoreImpl();
×
1439
    try {
1440
      ServerBuilder<?> serverBuilder =
×
1441
          Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create());
×
1442
      GRPCServerHelper.registerServicesAndHealthChecks(
×
1443
          Collections.singletonList(this), serverBuilder);
×
1444
      outOfProcessServer = serverBuilder.build().start();
×
1445
    } catch (IOException e) {
×
1446
      throw new RuntimeException(e);
×
1447
    }
×
1448
  }
×
1449

1450
  @Deprecated
1451
  public WorkflowServiceStubs newClientStub() {
1452
    if (workflowServiceStubs == null) {
×
1453
      throw new RuntimeException(
×
1454
          "Cannot get a client when you created your TestWorkflowService with createServerOnly.");
1455
    }
1456
    return workflowServiceStubs;
×
1457
  }
1458

1459
  private static StatusRuntimeException createInvalidArgument(String description) {
1460
    throw Status.INVALID_ARGUMENT.withDescription(description).asRuntimeException();
1✔
1461
  }
1462
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc