• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #333

16 Oct 2024 07:28PM UTC coverage: 78.65% (+0.6%) from 78.085%
#333

push

github

web-flow
Fix code coverage (#2275)

22670 of 28824 relevant lines covered (78.65%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.48
/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.client;
22

23
import com.google.common.base.Strings;
24
import io.grpc.Status;
25
import io.grpc.StatusRuntimeException;
26
import io.temporal.api.common.v1.WorkflowExecution;
27
import io.temporal.api.errordetails.v1.QueryFailedFailure;
28
import io.temporal.api.errordetails.v1.WorkflowExecutionAlreadyStartedFailure;
29
import io.temporal.api.errordetails.v1.WorkflowNotReadyFailure;
30
import io.temporal.api.update.v1.WaitPolicy;
31
import io.temporal.common.interceptors.Header;
32
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
33
import io.temporal.failure.CanceledFailure;
34
import io.temporal.internal.client.LazyWorkflowUpdateHandleImpl;
35
import io.temporal.serviceclient.CheckedExceptionWrapper;
36
import io.temporal.serviceclient.StatusUtils;
37
import java.lang.reflect.Type;
38
import java.util.Optional;
39
import java.util.UUID;
40
import java.util.concurrent.*;
41
import java.util.concurrent.atomic.AtomicReference;
42
import javax.annotation.Nonnull;
43
import javax.annotation.Nullable;
44

45
class WorkflowStubImpl implements WorkflowStub {
46
  private final WorkflowClientOptions clientOptions;
47
  private final WorkflowClientCallsInterceptor workflowClientInvoker;
48
  private final Optional<String> workflowType;
49
  // Execution this stub is bound to
50
  private final AtomicReference<WorkflowExecution> execution = new AtomicReference<>();
1✔
51
  // Full WorkflowExecution that this stub is started if any.
52
  // After a start, WorkflowStub binds to (workflowId, null) to follow the chain of RunIds.
53
  // But this field keeps the full (workflowId, runId) execution info that was started by this stub.
54
  private final AtomicReference<WorkflowExecution> startedExecution = new AtomicReference<>();
1✔
55
  // if null, this stub is created to bound to an existing execution.
56
  // This stub is created to bound to an existing execution otherwise.
57
  private final @Nullable WorkflowOptions options;
58

59
  WorkflowStubImpl(
60
      WorkflowClientOptions clientOptions,
61
      WorkflowClientCallsInterceptor workflowClientInvoker,
62
      Optional<String> workflowType,
63
      WorkflowExecution execution) {
1✔
64
    this.clientOptions = clientOptions;
1✔
65
    this.workflowClientInvoker = workflowClientInvoker;
1✔
66
    this.workflowType = workflowType;
1✔
67
    if (execution == null || execution.getWorkflowId().isEmpty()) {
1✔
68
      throw new IllegalArgumentException("null or empty workflowId");
×
69
    }
70
    this.execution.set(execution);
1✔
71
    this.options = null;
1✔
72
  }
1✔
73

74
  WorkflowStubImpl(
75
      WorkflowClientOptions clientOptions,
76
      WorkflowClientCallsInterceptor workflowClientInvoker,
77
      String workflowType,
78
      @Nonnull WorkflowOptions options) {
1✔
79
    this.clientOptions = clientOptions;
1✔
80
    this.workflowClientInvoker = workflowClientInvoker;
1✔
81
    this.workflowType = Optional.of(workflowType);
1✔
82
    this.options = options;
1✔
83
  }
1✔
84

85
  @Override
86
  public void signal(String signalName, Object... args) {
87
    checkStarted();
1✔
88
    WorkflowExecution targetExecution = currentExecutionWithoutRunId();
1✔
89
    try {
90
      workflowClientInvoker.signal(
1✔
91
          new WorkflowClientCallsInterceptor.WorkflowSignalInput(
92
              targetExecution, signalName, Header.empty(), args));
1✔
93
    } catch (Exception e) {
1✔
94
      Throwable throwable = throwAsWorkflowFailureException(e, targetExecution);
×
95
      throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), throwable);
×
96
    }
1✔
97
  }
1✔
98

99
  private WorkflowExecution startWithOptions(WorkflowOptions options, Object... args) {
100
    checkExecutionIsNotStarted();
1✔
101
    String workflowId = getWorkflowIdForStart(options);
1✔
102
    WorkflowExecution workflowExecution = null;
1✔
103
    try {
104
      WorkflowClientCallsInterceptor.WorkflowStartOutput workflowStartOutput =
1✔
105
          workflowClientInvoker.start(
1✔
106
              new WorkflowClientCallsInterceptor.WorkflowStartInput(
107
                  workflowId, workflowType.get(), Header.empty(), args, options));
1✔
108
      workflowExecution = workflowStartOutput.getWorkflowExecution();
1✔
109
      populateExecutionAfterStart(workflowExecution);
1✔
110
      return workflowExecution;
1✔
111
    } catch (StatusRuntimeException e) {
1✔
112
      throw wrapStartException(workflowId, workflowType.orElse(null), e);
1✔
113
    } catch (Exception e) {
1✔
114
      if (workflowExecution == null) {
1✔
115
        // if start failed with exception - there could be no valid workflow execution populated
116
        // from the server.
117
        // WorkflowServiceException requires not null workflowExecution, so we have to provide
118
        // an WorkflowExecution instance with just a workflowId
119
        workflowExecution = WorkflowExecution.newBuilder().setWorkflowId(workflowId).build();
1✔
120
      }
121
      throw new WorkflowServiceException(workflowExecution, workflowType.orElse(null), e);
1✔
122
    }
123
  }
124

125
  @Override
126
  public WorkflowExecution start(Object... args) {
127
    if (options == null) {
1✔
128
      throw new IllegalStateException("Required parameter WorkflowOptions is missing");
×
129
    }
130
    return startWithOptions(WorkflowOptions.merge(null, null, options), args);
1✔
131
  }
132

133
  @Override
134
  public <R> WorkflowUpdateHandle<R> updateWithStart(
135
      UpdateWithStartWorkflowOperation<R> updateOperation, Object... startArgs) {
136
    if (options == null) {
1✔
137
      throw new IllegalStateException("Required parameter WorkflowOptions is missing");
×
138
    }
139
    if (!updateOperation.markInvoked()) {
1✔
140
      throw new IllegalStateException("UpdateWithStartWorkflowOperation was already executed");
1✔
141
    }
142
    checkExecutionIsNotStarted();
1✔
143
    String workflowId = getWorkflowIdForStart(options);
1✔
144
    WorkflowExecution workflowExecution = null;
1✔
145
    try {
146
      WorkflowClientCallsInterceptor.WorkflowStartInput startInput =
1✔
147
          new WorkflowClientCallsInterceptor.WorkflowStartInput(
148
              workflowId, workflowType.get(), Header.empty(), startArgs, options);
1✔
149
      WorkflowClientCallsInterceptor.WorkflowUpdateWithStartInput<R> input =
1✔
150
          new WorkflowClientCallsInterceptor.WorkflowUpdateWithStartInput<>(
151
              startInput, updateOperation);
152
      WorkflowClientCallsInterceptor.WorkflowUpdateWithStartOutput<R> output =
1✔
153
          workflowClientInvoker.updateWithStart(input);
1✔
154
      workflowExecution = output.getWorkflowStartOutput().getWorkflowExecution();
1✔
155
      populateExecutionAfterStart(workflowExecution);
1✔
156
      WorkflowUpdateHandle<R> updateHandle = output.getUpdateHandle();
1✔
157
      updateOperation.setUpdateHandle(updateHandle);
1✔
158
      return updateHandle;
1✔
159
    } catch (StatusRuntimeException e) {
1✔
160
      throw wrapStartException(workflowId, workflowType.orElse(null), e);
1✔
161
    } catch (Exception e) {
1✔
162
      if (workflowExecution == null) {
1✔
163
        // if start failed with exception - there could be no valid workflow execution populated
164
        // from the server.
165
        // WorkflowServiceException requires not null workflowExecution, so we have to provide
166
        // an WorkflowExecution instance with just a workflowId
167
        workflowExecution = WorkflowExecution.newBuilder().setWorkflowId(workflowId).build();
1✔
168
      }
169
      throw new WorkflowServiceException(workflowExecution, workflowType.orElse(null), e);
1✔
170
    }
171
  }
172

173
  private WorkflowExecution signalWithStartWithOptions(
174
      WorkflowOptions options, String signalName, Object[] signalArgs, Object[] startArgs) {
175
    checkExecutionIsNotStarted();
1✔
176
    String workflowId = getWorkflowIdForStart(options);
1✔
177
    WorkflowExecution workflowExecution = null;
1✔
178
    try {
179
      WorkflowClientCallsInterceptor.WorkflowSignalWithStartOutput workflowStartOutput =
1✔
180
          workflowClientInvoker.signalWithStart(
1✔
181
              new WorkflowClientCallsInterceptor.WorkflowSignalWithStartInput(
182
                  new WorkflowClientCallsInterceptor.WorkflowStartInput(
183
                      workflowId, workflowType.get(), Header.empty(), startArgs, options),
1✔
184
                  signalName,
185
                  signalArgs));
186
      workflowExecution = workflowStartOutput.getWorkflowStartOutput().getWorkflowExecution();
1✔
187
      populateExecutionAfterStart(workflowExecution);
1✔
188
      return workflowExecution;
1✔
189
    } catch (StatusRuntimeException e) {
1✔
190
      throw wrapStartException(workflowId, workflowType.orElse(null), e);
1✔
191
    } catch (Exception e) {
×
192
      if (workflowExecution == null) {
×
193
        // if start failed with exception - there could be no valid workflow execution populated
194
        // from the server.
195
        // WorkflowServiceException requires not null workflowExecution, so we have to provide
196
        // an WorkflowExecution instance with just a workflowId
197
        workflowExecution = WorkflowExecution.newBuilder().setWorkflowId(workflowId).build();
×
198
      }
199
      throw new WorkflowServiceException(workflowExecution, workflowType.orElse(null), e);
×
200
    }
201
  }
202

203
  private static String getWorkflowIdForStart(WorkflowOptions options) {
204
    String workflowId = options.getWorkflowId();
1✔
205
    if (workflowId == null) {
1✔
206
      workflowId = UUID.randomUUID().toString();
1✔
207
    }
208
    return workflowId;
1✔
209
  }
210

211
  @Override
212
  public WorkflowExecution signalWithStart(
213
      String signalName, Object[] signalArgs, Object[] startArgs) {
214
    if (options == null) {
1✔
215
      throw new IllegalStateException("Required parameter WorkflowOptions is missing");
×
216
    }
217
    return signalWithStartWithOptions(
1✔
218
        WorkflowOptions.merge(null, null, options), signalName, signalArgs, startArgs);
1✔
219
  }
220

221
  @Override
222
  public Optional<String> getWorkflowType() {
223
    return workflowType;
1✔
224
  }
225

226
  @Override
227
  public WorkflowExecution getExecution() {
228
    return options != null ? startedExecution.get() : execution.get();
1✔
229
  }
230

231
  @Override
232
  public <R> R getResult(Class<R> resultClass) {
233
    return getResult(resultClass, resultClass);
1✔
234
  }
235

236
  @Override
237
  public <R> R getResult(Class<R> resultClass, Type resultType) {
238
    try {
239
      // int max to not overflow long
240
      return getResult(Integer.MAX_VALUE, TimeUnit.MILLISECONDS, resultClass, resultType);
1✔
241
    } catch (TimeoutException e) {
×
242
      throw new WorkflowServiceException(execution.get(), workflowType.orElse(null), e);
×
243
    }
244
  }
245

246
  @Override
247
  public <R> R getResult(long timeout, TimeUnit unit, Class<R> resultClass)
248
      throws TimeoutException {
249
    return getResult(timeout, unit, resultClass, resultClass);
×
250
  }
251

252
  @Override
253
  public <R> R getResult(long timeout, TimeUnit unit, Class<R> resultClass, Type resultType)
254
      throws TimeoutException {
255
    checkStarted();
1✔
256
    WorkflowExecution targetExecution = execution.get();
1✔
257
    try {
258
      WorkflowClientCallsInterceptor.GetResultOutput<R> result =
1✔
259
          workflowClientInvoker.getResult(
1✔
260
              new WorkflowClientCallsInterceptor.GetResultInput<>(
261
                  targetExecution, workflowType, timeout, unit, resultClass, resultType));
262
      return result.getResult();
1✔
263
    } catch (Exception e) {
1✔
264
      return throwAsWorkflowFailureExceptionForResult(e, resultClass, targetExecution);
×
265
    }
266
  }
267

268
  @Override
269
  public <R> CompletableFuture<R> getResultAsync(Class<R> resultClass) {
270
    return getResultAsync(resultClass, resultClass);
1✔
271
  }
272

273
  @Override
274
  public <R> CompletableFuture<R> getResultAsync(Class<R> resultClass, Type resultType) {
275
    return getResultAsync(Long.MAX_VALUE, TimeUnit.MILLISECONDS, resultClass, resultType);
1✔
276
  }
277

278
  @Override
279
  public <R> CompletableFuture<R> getResultAsync(
280
      long timeout, TimeUnit unit, Class<R> resultClass) {
281
    return getResultAsync(timeout, unit, resultClass, resultClass);
1✔
282
  }
283

284
  @Override
285
  public <R> CompletableFuture<R> getResultAsync(
286
      long timeout, TimeUnit unit, Class<R> resultClass, Type resultType) {
287
    checkStarted();
1✔
288
    WorkflowExecution targetExecution = execution.get();
1✔
289
    WorkflowClientCallsInterceptor.GetResultAsyncOutput<R> result =
1✔
290
        workflowClientInvoker.getResultAsync(
1✔
291
            new WorkflowClientCallsInterceptor.GetResultInput<>(
292
                targetExecution, workflowType, timeout, unit, resultClass, resultType));
293
    return result
1✔
294
        .getResult()
1✔
295
        .exceptionally(
1✔
296
            e -> {
297
              try {
298
                return throwAsWorkflowFailureExceptionForResult(e, resultClass, targetExecution);
×
299
              } catch (TimeoutException ex) {
1✔
300
                throw new CompletionException(ex);
1✔
301
              }
302
            });
303
  }
304

305
  @Override
306
  public <R> R query(String queryType, Class<R> resultClass, Object... args) {
307
    return query(queryType, resultClass, resultClass, args);
1✔
308
  }
309

310
  @Override
311
  public <R> R query(String queryType, Class<R> resultClass, Type resultType, Object... args) {
312
    checkStarted();
1✔
313
    WorkflowClientCallsInterceptor.QueryOutput<R> result;
314
    WorkflowExecution targetExecution = execution.get();
1✔
315
    try {
316
      result =
1✔
317
          workflowClientInvoker.query(
1✔
318
              new WorkflowClientCallsInterceptor.QueryInput<>(
319
                  targetExecution, queryType, Header.empty(), args, resultClass, resultType));
1✔
320
    } catch (Exception e) {
1✔
321
      return throwAsWorkflowFailureExceptionForQuery(e, resultClass, targetExecution);
×
322
    }
1✔
323
    if (result.isQueryRejected()) {
1✔
324
      throw new WorkflowQueryConditionallyRejectedException(
1✔
325
          targetExecution,
326
          workflowType.orElse(null),
1✔
327
          clientOptions.getQueryRejectCondition(),
1✔
328
          result.getQueryRejectedStatus(),
1✔
329
          null);
330
    }
331
    return result.getResult();
1✔
332
  }
333

334
  @Override
335
  public <R> R update(String updateName, Class<R> resultClass, Object... args) {
336
    checkStarted();
1✔
337
    try {
338
      UpdateOptions<R> options =
339
          UpdateOptions.<R>newBuilder()
1✔
340
              .setUpdateName(updateName)
1✔
341
              .setWaitForStage(WorkflowUpdateStage.COMPLETED)
1✔
342
              .setResultClass(resultClass)
1✔
343
              .build();
1✔
344
      return startUpdate(options, args).getResultAsync().get();
1✔
345
    } catch (InterruptedException e) {
×
346
      throw new RuntimeException(e);
×
347
    } catch (ExecutionException e) {
1✔
348
      Throwable cause = e.getCause();
1✔
349
      throw (cause instanceof RuntimeException
1✔
350
          ? (RuntimeException) cause
1✔
351
          : new RuntimeException(cause));
1✔
352
    }
353
  }
354

355
  @Override
356
  public <R> WorkflowUpdateHandle<R> startUpdate(
357
      String updateName, WorkflowUpdateStage waitForStage, Class<R> resultClass, Object... args) {
358
    UpdateOptions<R> options =
359
        UpdateOptions.<R>newBuilder()
1✔
360
            .setUpdateName(updateName)
1✔
361
            .setWaitForStage(waitForStage)
1✔
362
            .setResultClass(resultClass)
1✔
363
            .setResultType(resultClass)
1✔
364
            .build();
1✔
365

366
    return startUpdate(options, args);
1✔
367
  }
368

369
  @Override
370
  public <R> WorkflowUpdateHandle<R> startUpdate(UpdateOptions<R> options, Object... args) {
371
    checkStarted();
1✔
372
    options.validate();
1✔
373
    WorkflowExecution targetExecution = execution.get();
1✔
374
    try {
375
      return workflowClientInvoker.startUpdate(
1✔
376
          new WorkflowClientCallsInterceptor.StartUpdateInput<>(
377
              targetExecution,
378
              workflowType,
379
              options.getUpdateName(),
1✔
380
              Header.empty(),
1✔
381
              options.getUpdateId(),
1✔
382
              args,
383
              options.getResultClass(),
1✔
384
              options.getResultType(),
1✔
385
              options.getFirstExecutionRunId(),
1✔
386
              WaitPolicy.newBuilder()
1✔
387
                  .setLifecycleStage(options.getWaitForStage().getProto())
1✔
388
                  .build()));
1✔
389
    } catch (Exception e) {
1✔
390
      Throwable throwable = throwAsWorkflowFailureException(e, targetExecution);
1✔
391
      throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), throwable);
1✔
392
    }
393
  }
394

395
  @Override
396
  public <R> WorkflowUpdateHandle<R> getUpdateHandle(String updateId, Class<R> resultClass) {
397
    return new LazyWorkflowUpdateHandleImpl<>(
1✔
398
        workflowClientInvoker,
399
        workflowType.orElse(null),
1✔
400
        "",
401
        updateId,
402
        execution.get(),
1✔
403
        resultClass,
404
        resultClass);
405
  }
406

407
  @Override
408
  public <R> WorkflowUpdateHandle<R> getUpdateHandle(
409
      String updateId, Class<R> resultClass, Type resultType) {
410
    return new LazyWorkflowUpdateHandleImpl<>(
×
411
        workflowClientInvoker,
412
        workflowType.orElse(null),
×
413
        "",
414
        updateId,
415
        execution.get(),
×
416
        resultClass,
417
        resultType);
418
  }
419

420
  @Override
421
  public void cancel() {
422
    checkStarted();
1✔
423
    WorkflowExecution targetExecution = currentExecutionWithoutRunId();
1✔
424
    try {
425
      workflowClientInvoker.cancel(new WorkflowClientCallsInterceptor.CancelInput(targetExecution));
1✔
426
    } catch (Exception e) {
1✔
427
      Throwable failure = throwAsWorkflowFailureException(e, targetExecution);
×
428
      throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), failure);
×
429
    }
1✔
430
  }
1✔
431

432
  @Override
433
  public void terminate(@Nullable String reason, Object... details) {
434
    checkStarted();
1✔
435
    WorkflowExecution targetExecution = currentExecutionWithoutRunId();
1✔
436
    try {
437
      workflowClientInvoker.terminate(
1✔
438
          new WorkflowClientCallsInterceptor.TerminateInput(targetExecution, reason, details));
439
    } catch (Exception e) {
1✔
440
      Throwable failure = throwAsWorkflowFailureException(e, targetExecution);
×
441
      throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), failure);
×
442
    }
1✔
443
  }
1✔
444

445
  @Override
446
  public Optional<WorkflowOptions> getOptions() {
447
    return Optional.ofNullable(options);
1✔
448
  }
449

450
  @Override
451
  public WorkflowStub newInstance(WorkflowOptions options) {
452
    return new WorkflowStubImpl(
1✔
453
        clientOptions, workflowClientInvoker, workflowType.orElse(null), options);
1✔
454
  }
455

456
  private void checkStarted() {
457
    if (execution.get() == null || execution.get().getWorkflowId() == null) {
1✔
458
      throw new IllegalStateException("Null workflowId. Was workflow started?");
×
459
    }
460
  }
1✔
461

462
  private void checkExecutionIsNotStarted() {
463
    if (execution.get() != null) {
1✔
464
      throw new IllegalStateException(
1✔
465
          "Cannot reuse a stub instance to start more than one workflow execution. The stub "
466
              + "points to already started execution. If you are trying to wait for a workflow completion either "
467
              + "change WorkflowIdReusePolicy from AllowDuplicate or use WorkflowStub.getResult");
468
    }
469
  }
1✔
470

471
  /*
472
   * Exceptions handling and processing for all methods of the stub
473
   */
474
  private RuntimeException wrapStartException(
475
      String workflowId, String workflowType, StatusRuntimeException e) {
476
    WorkflowExecution.Builder executionBuilder =
477
        WorkflowExecution.newBuilder().setWorkflowId(workflowId);
1✔
478

479
    WorkflowExecutionAlreadyStartedFailure f =
1✔
480
        StatusUtils.getFailure(e, WorkflowExecutionAlreadyStartedFailure.class);
1✔
481
    if (f != null) {
1✔
482
      WorkflowExecution exe = executionBuilder.setRunId(f.getRunId()).build();
1✔
483
      populateExecutionAfterStart(exe);
1✔
484
      return new WorkflowExecutionAlreadyStarted(exe, workflowType, e);
1✔
485
    } else {
486
      WorkflowExecution exe = executionBuilder.build();
1✔
487
      return new WorkflowServiceException(exe, workflowType, e);
1✔
488
    }
489
  }
490

491
  /**
492
   * RunId can change e.g. workflow does ContinueAsNew. Emptying runId in workflowExecution allows
493
   * Temporal server figure out the current run id dynamically.
494
   */
495
  private WorkflowExecution currentExecutionWithoutRunId() {
496
    WorkflowExecution workflowExecution = execution.get();
1✔
497
    if (Strings.isNullOrEmpty(workflowExecution.getRunId())) {
1✔
498
      return workflowExecution;
1✔
499
    } else {
500
      return WorkflowExecution.newBuilder(workflowExecution).setRunId("").build();
1✔
501
    }
502
  }
503

504
  private <R> R throwAsWorkflowFailureExceptionForQuery(
505
      Throwable failure,
506
      @SuppressWarnings("unused") Class<R> returnType,
507
      WorkflowExecution targetExecution) {
508
    failure = throwAsWorkflowFailureException(failure, targetExecution);
1✔
509
    if (failure instanceof StatusRuntimeException) {
1✔
510
      StatusRuntimeException sre = (StatusRuntimeException) failure;
1✔
511
      if (StatusUtils.hasFailure(sre, QueryFailedFailure.class)) {
1✔
512
        throw new WorkflowQueryException(execution.get(), workflowType.orElse(null), failure);
1✔
513
      } else if (Status.Code.FAILED_PRECONDITION.equals(sre.getStatus().getCode())
×
514
          && StatusUtils.hasFailure(sre, WorkflowNotReadyFailure.class)) {
×
515
        // Processes the edge case introduced by https://github.com/temporalio/temporal/pull/2826
516
        throw new WorkflowQueryRejectedException(
×
517
            targetExecution, workflowType.orElse(null), failure);
×
518
      }
519
    }
520
    throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), failure);
×
521
  }
522

523
  // This function never returns anything, it only throws
524
  private <R> R throwAsWorkflowFailureExceptionForResult(
525
      Throwable failure,
526
      @SuppressWarnings("unused") Class<R> returnType,
527
      WorkflowExecution targetExecution)
528
      throws TimeoutException {
529
    failure = throwAsWorkflowFailureException(failure, targetExecution);
1✔
530
    if (failure instanceof TimeoutException) {
1✔
531
      throw (TimeoutException) failure;
1✔
532
    } else if (failure instanceof CanceledFailure) {
1✔
533
      throw (CanceledFailure) failure;
×
534
    }
535
    throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), failure);
1✔
536
  }
537

538
  private Throwable throwAsWorkflowFailureException(
539
      Throwable failure, WorkflowExecution targetExecution) {
540
    if (failure instanceof CompletionException) {
1✔
541
      // if we work with CompletableFuture, the exception may be wrapped into CompletionException
542
      failure = failure.getCause();
1✔
543
    }
544
    failure = CheckedExceptionWrapper.unwrap(failure);
1✔
545
    if (failure instanceof Error) {
1✔
546
      throw (Error) failure;
×
547
    }
548
    if (failure instanceof StatusRuntimeException) {
1✔
549
      StatusRuntimeException sre = (StatusRuntimeException) failure;
1✔
550
      if (Status.Code.NOT_FOUND.equals(sre.getStatus().getCode())) {
1✔
551
        throw new WorkflowNotFoundException(targetExecution, workflowType.orElse(null), sre);
1✔
552
      }
553
    } else if (failure instanceof WorkflowException) {
1✔
554
      throw (WorkflowException) failure;
1✔
555
    }
556
    return failure;
1✔
557
  }
558

559
  private void populateExecutionAfterStart(WorkflowExecution startedExecution) {
560
    this.startedExecution.set(startedExecution);
1✔
561
    // bind to an execution without a runId, so queries follow runId chains by default
562
    this.execution.set(WorkflowExecution.newBuilder(startedExecution).setRunId("").build());
1✔
563
  }
1✔
564
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc