• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #169

pending completion
#169

push

github-actions

web-flow
Remove use of deprecated API (#1758)

4 of 4 new or added lines in 1 file covered. (100.0%)

17345 of 21558 relevant lines covered (80.46%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.77
/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.client;
22

23
import com.google.common.base.Strings;
24
import io.grpc.Status;
25
import io.grpc.StatusRuntimeException;
26
import io.temporal.api.common.v1.WorkflowExecution;
27
import io.temporal.api.errordetails.v1.QueryFailedFailure;
28
import io.temporal.api.errordetails.v1.WorkflowExecutionAlreadyStartedFailure;
29
import io.temporal.api.errordetails.v1.WorkflowNotReadyFailure;
30
import io.temporal.common.interceptors.Header;
31
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
32
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor.UpdateOutput;
33
import io.temporal.failure.CanceledFailure;
34
import io.temporal.serviceclient.CheckedExceptionWrapper;
35
import io.temporal.serviceclient.StatusUtils;
36
import java.lang.reflect.Type;
37
import java.util.Optional;
38
import java.util.UUID;
39
import java.util.concurrent.CompletableFuture;
40
import java.util.concurrent.CompletionException;
41
import java.util.concurrent.TimeUnit;
42
import java.util.concurrent.TimeoutException;
43
import java.util.concurrent.atomic.AtomicReference;
44
import javax.annotation.Nonnull;
45
import javax.annotation.Nullable;
46

47
class WorkflowStubImpl implements WorkflowStub {
48
  private final WorkflowClientOptions clientOptions;
49
  private final WorkflowClientCallsInterceptor workflowClientInvoker;
50
  private final Optional<String> workflowType;
51
  // Execution this stub is bound to
52
  private final AtomicReference<WorkflowExecution> execution = new AtomicReference<>();
1✔
53
  // Full WorkflowExecution that was started by this stub, if any.
54
  // After a start, WorkflowStub binds to (workflowId, null) to follow the chain of RunIds.
55
  // But this field keeps the full (workflowId, runId) execution info that was started by this stub.
56
  private final AtomicReference<WorkflowExecution> startedExecution = new AtomicReference<>();
1✔
57
  // If null, this stub was created to bind to an existing execution.
58
  // Otherwise, this stub was created to start a new execution using these options.
59
  private final @Nullable WorkflowOptions options;
60

61
  /**
   * Creates a stub bound to an already known workflow execution. Stubs created this way carry no
   * {@link WorkflowOptions}.
   *
   * @param clientOptions client options consulted by this stub
   * @param workflowClientInvoker interceptor that every call of this stub is delegated to
   * @param workflowType optional workflow type name, used when reporting failures
   * @param execution execution to bind to; its workflowId must be non-empty
   * @throws IllegalArgumentException if {@code execution} is null or has an empty workflowId
   */
  WorkflowStubImpl(
      WorkflowClientOptions clientOptions,
      WorkflowClientCallsInterceptor workflowClientInvoker,
      Optional<String> workflowType,
      WorkflowExecution execution) {
    boolean hasWorkflowId = execution != null && !execution.getWorkflowId().isEmpty();
    if (!hasWorkflowId) {
      throw new IllegalArgumentException("null or empty workflowId");
    }
    this.clientOptions = clientOptions;
    this.workflowClientInvoker = workflowClientInvoker;
    this.workflowType = workflowType;
    this.options = null;
    this.execution.set(execution);
  }
1✔
75

76
  /**
   * Creates a stub that is not bound to any execution yet; the supplied options are kept so that
   * {@link #start} or {@link #signalWithStart} can use them later.
   *
   * @param clientOptions client options consulted by this stub
   * @param workflowClientInvoker interceptor that every call of this stub is delegated to
   * @param workflowType workflow type name; wrapped with {@link Optional#of}, so must not be null
   * @param options options stored on the stub for a later start
   */
  WorkflowStubImpl(
      WorkflowClientOptions clientOptions,
      WorkflowClientCallsInterceptor workflowClientInvoker,
      String workflowType,
      @Nonnull WorkflowOptions options) {
    this.options = options;
    this.workflowType = Optional.of(workflowType);
    this.workflowClientInvoker = workflowClientInvoker;
    this.clientOptions = clientOptions;
  }
1✔
86

87
  @Override
88
  public void signal(String signalName, Object... args) {
89
    checkStarted();
1✔
90
    WorkflowExecution targetExecution = currentExecutionWithoutRunId();
1✔
91
    try {
92
      workflowClientInvoker.signal(
1✔
93
          new WorkflowClientCallsInterceptor.WorkflowSignalInput(
94
              targetExecution, signalName, args));
95
    } catch (Exception e) {
1✔
96
      Throwable throwable = throwAsWorkflowFailureException(e, targetExecution);
×
97
      throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), throwable);
×
98
    }
1✔
99
  }
1✔
100

101
  private WorkflowExecution startWithOptions(WorkflowOptions options, Object... args) {
102
    checkExecutionIsNotStarted();
1✔
103
    String workflowId = getWorkflowIdForStart(options);
1✔
104
    WorkflowExecution workflowExecution = null;
1✔
105
    try {
106
      WorkflowClientCallsInterceptor.WorkflowStartOutput workflowStartOutput =
1✔
107
          workflowClientInvoker.start(
1✔
108
              new WorkflowClientCallsInterceptor.WorkflowStartInput(
109
                  workflowId, workflowType.get(), Header.empty(), args, options));
1✔
110
      workflowExecution = workflowStartOutput.getWorkflowExecution();
1✔
111
      populateExecutionAfterStart(workflowExecution);
1✔
112
      return workflowExecution;
1✔
113
    } catch (StatusRuntimeException e) {
1✔
114
      throw wrapStartException(workflowId, workflowType.orElse(null), e);
1✔
115
    } catch (Exception e) {
1✔
116
      if (workflowExecution == null) {
1✔
117
        // if start failed with exception - there could be no valid workflow execution populated
118
        // from the server.
119
        // WorkflowServiceException requires not null workflowExecution, so we have to provide
120
        // an WorkflowExecution instance with just a workflowId
121
        workflowExecution = WorkflowExecution.newBuilder().setWorkflowId(workflowId).build();
1✔
122
      }
123
      throw new WorkflowServiceException(workflowExecution, workflowType.orElse(null), e);
1✔
124
    }
125
  }
126

127
  @Override
128
  public WorkflowExecution start(Object... args) {
129
    if (options == null) {
1✔
130
      throw new IllegalStateException("Required parameter WorkflowOptions is missing");
×
131
    }
132
    return startWithOptions(WorkflowOptions.merge(null, null, options), args);
1✔
133
  }
134

135
  private WorkflowExecution signalWithStartWithOptions(
136
      WorkflowOptions options, String signalName, Object[] signalArgs, Object[] startArgs) {
137
    checkExecutionIsNotStarted();
1✔
138
    String workflowId = getWorkflowIdForStart(options);
1✔
139
    WorkflowExecution workflowExecution = null;
1✔
140
    try {
141
      WorkflowClientCallsInterceptor.WorkflowSignalWithStartOutput workflowStartOutput =
1✔
142
          workflowClientInvoker.signalWithStart(
1✔
143
              new WorkflowClientCallsInterceptor.WorkflowSignalWithStartInput(
144
                  new WorkflowClientCallsInterceptor.WorkflowStartInput(
145
                      workflowId, workflowType.get(), Header.empty(), startArgs, options),
1✔
146
                  signalName,
147
                  signalArgs));
148
      workflowExecution = workflowStartOutput.getWorkflowStartOutput().getWorkflowExecution();
1✔
149
      populateExecutionAfterStart(workflowExecution);
1✔
150
      return workflowExecution;
1✔
151
    } catch (StatusRuntimeException e) {
1✔
152
      throw wrapStartException(workflowId, workflowType.orElse(null), e);
1✔
153
    } catch (Exception e) {
×
154
      if (workflowExecution == null) {
×
155
        // if start failed with exception - there could be no valid workflow execution populated
156
        // from the server.
157
        // WorkflowServiceException requires not null workflowExecution, so we have to provide
158
        // an WorkflowExecution instance with just a workflowId
159
        workflowExecution = WorkflowExecution.newBuilder().setWorkflowId(workflowId).build();
×
160
      }
161
      throw new WorkflowServiceException(workflowExecution, workflowType.orElse(null), e);
×
162
    }
163
  }
164

165
  private static String getWorkflowIdForStart(WorkflowOptions options) {
166
    String workflowId = options.getWorkflowId();
1✔
167
    if (workflowId == null) {
1✔
168
      workflowId = UUID.randomUUID().toString();
1✔
169
    }
170
    return workflowId;
1✔
171
  }
172

173
  @Override
174
  public WorkflowExecution signalWithStart(
175
      String signalName, Object[] signalArgs, Object[] startArgs) {
176
    if (options == null) {
1✔
177
      throw new IllegalStateException("Required parameter WorkflowOptions is missing");
×
178
    }
179
    return signalWithStartWithOptions(
1✔
180
        WorkflowOptions.merge(null, null, options), signalName, signalArgs, startArgs);
1✔
181
  }
182

183
  @Override
184
  public Optional<String> getWorkflowType() {
185
    return workflowType;
1✔
186
  }
187

188
  @Override
189
  public WorkflowExecution getExecution() {
190
    return options != null ? startedExecution.get() : execution.get();
1✔
191
  }
192

193
  @Override
194
  public <R> R getResult(Class<R> resultClass) {
195
    return getResult(resultClass, resultClass);
1✔
196
  }
197

198
  @Override
199
  public <R> R getResult(Class<R> resultClass, Type resultType) {
200
    try {
201
      // int max to not overflow long
202
      return getResult(Integer.MAX_VALUE, TimeUnit.MILLISECONDS, resultClass, resultType);
1✔
203
    } catch (TimeoutException e) {
×
204
      throw new WorkflowServiceException(execution.get(), workflowType.orElse(null), e);
×
205
    }
206
  }
207

208
  @Override
209
  public <R> R getResult(long timeout, TimeUnit unit, Class<R> resultClass)
210
      throws TimeoutException {
211
    return getResult(timeout, unit, resultClass, resultClass);
×
212
  }
213

214
  @Override
215
  public <R> R getResult(long timeout, TimeUnit unit, Class<R> resultClass, Type resultType)
216
      throws TimeoutException {
217
    checkStarted();
1✔
218
    WorkflowExecution targetExecution = execution.get();
1✔
219
    try {
220
      WorkflowClientCallsInterceptor.GetResultOutput<R> result =
1✔
221
          workflowClientInvoker.getResult(
1✔
222
              new WorkflowClientCallsInterceptor.GetResultInput<>(
223
                  targetExecution, workflowType, timeout, unit, resultClass, resultType));
224
      return result.getResult();
1✔
225
    } catch (Exception e) {
1✔
226
      return throwAsWorkflowFailureExceptionForResult(e, resultClass, targetExecution);
×
227
    }
228
  }
229

230
  @Override
231
  public <R> CompletableFuture<R> getResultAsync(Class<R> resultClass) {
232
    return getResultAsync(resultClass, resultClass);
1✔
233
  }
234

235
  @Override
236
  public <R> CompletableFuture<R> getResultAsync(Class<R> resultClass, Type resultType) {
237
    return getResultAsync(Long.MAX_VALUE, TimeUnit.MILLISECONDS, resultClass, resultType);
1✔
238
  }
239

240
  @Override
241
  public <R> CompletableFuture<R> getResultAsync(
242
      long timeout, TimeUnit unit, Class<R> resultClass) {
243
    return getResultAsync(timeout, unit, resultClass, resultClass);
1✔
244
  }
245

246
  @Override
247
  public <R> CompletableFuture<R> getResultAsync(
248
      long timeout, TimeUnit unit, Class<R> resultClass, Type resultType) {
249
    checkStarted();
1✔
250
    WorkflowExecution targetExecution = execution.get();
1✔
251
    WorkflowClientCallsInterceptor.GetResultAsyncOutput<R> result =
1✔
252
        workflowClientInvoker.getResultAsync(
1✔
253
            new WorkflowClientCallsInterceptor.GetResultInput<>(
254
                targetExecution, workflowType, timeout, unit, resultClass, resultType));
255
    return result
1✔
256
        .getResult()
1✔
257
        .exceptionally(
1✔
258
            e -> {
259
              try {
260
                return throwAsWorkflowFailureExceptionForResult(e, resultClass, targetExecution);
×
261
              } catch (TimeoutException ex) {
1✔
262
                throw new CompletionException(ex);
1✔
263
              }
264
            });
265
  }
266

267
  @Override
268
  public <R> R query(String queryType, Class<R> resultClass, Object... args) {
269
    return query(queryType, resultClass, resultClass, args);
1✔
270
  }
271

272
  @Override
273
  public <R> R query(String queryType, Class<R> resultClass, Type resultType, Object... args) {
274
    checkStarted();
1✔
275
    WorkflowClientCallsInterceptor.QueryOutput<R> result;
276
    WorkflowExecution targetExecution = execution.get();
1✔
277
    try {
278
      result =
1✔
279
          workflowClientInvoker.query(
1✔
280
              new WorkflowClientCallsInterceptor.QueryInput<>(
281
                  targetExecution, queryType, args, resultClass, resultType));
282
    } catch (Exception e) {
1✔
283
      return throwAsWorkflowFailureExceptionForQuery(e, resultClass, targetExecution);
×
284
    }
1✔
285
    if (result.isQueryRejected()) {
1✔
286
      throw new WorkflowQueryConditionallyRejectedException(
1✔
287
          targetExecution,
288
          workflowType.orElse(null),
1✔
289
          clientOptions.getQueryRejectCondition(),
1✔
290
          result.getQueryRejectedStatus(),
1✔
291
          null);
292
    }
293
    return result.getResult();
1✔
294
  }
295

296
  @Override
297
  public <R> R update(String updateName, Class<R> resultClass, Object... args) {
298
    return update(updateName, "", "", resultClass, resultClass, args);
×
299
  }
300

301
  @Override
302
  public <R> R update(
303
      String updateName,
304
      String updateId,
305
      String firstExecutionRunId,
306
      Class<R> resultClass,
307
      Type resultType,
308
      Object... args) {
309
    checkStarted();
×
310
    UpdateOutput<R> result;
311
    WorkflowExecution targetExecution = execution.get();
×
312
    try {
313
      result =
×
314
          workflowClientInvoker.update(
×
315
              new WorkflowClientCallsInterceptor.UpdateInput<>(
316
                  targetExecution,
317
                  updateName,
318
                  updateId,
319
                  args,
320
                  resultClass,
321
                  resultType,
322
                  firstExecutionRunId));
323
      return result.getResult();
×
324

325
    } catch (Exception e) {
×
326
      Throwable throwable = throwAsWorkflowFailureException(e, targetExecution);
×
327
      throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), throwable);
×
328
    }
329
  }
330

331
  @Override
332
  public <R> UpdateHandle<R> startUpdate(String updateName, Class<R> resultClass, Object... args) {
333
    return startUpdate(updateName, "", "", resultClass, resultClass, args);
×
334
  }
335

336
  @Override
337
  public <R> UpdateHandle<R> startUpdate(
338
      String updateName,
339
      String updateId,
340
      String firstExecutionRunId,
341
      Class<R> resultClass,
342
      Type resultType,
343
      Object... args) {
344
    checkStarted();
×
345
    WorkflowExecution targetExecution = execution.get();
×
346

347
    WorkflowClientCallsInterceptor.UpdateAsyncOutput<R> result =
×
348
        workflowClientInvoker.updateAsync(
×
349
            new WorkflowClientCallsInterceptor.UpdateInput<>(
350
                targetExecution,
351
                updateName,
352
                updateId,
353
                args,
354
                resultClass,
355
                resultType,
356
                firstExecutionRunId));
357

358
    return new UpdateHandleImpl<>(
×
359
        updateId,
360
        targetExecution,
361
        result
362
            .getResult()
×
363
            .exceptionally(
×
364
                e -> {
365
                  Throwable throwable = throwAsWorkflowFailureException(e, targetExecution);
×
366
                  throw new WorkflowServiceException(
×
367
                      targetExecution, workflowType.orElse(null), throwable);
×
368
                }));
369
  }
370

371
  @Override
372
  public void cancel() {
373
    checkStarted();
1✔
374
    WorkflowExecution targetExecution = currentExecutionWithoutRunId();
1✔
375
    try {
376
      workflowClientInvoker.cancel(new WorkflowClientCallsInterceptor.CancelInput(targetExecution));
1✔
377
    } catch (Exception e) {
1✔
378
      Throwable failure = throwAsWorkflowFailureException(e, targetExecution);
×
379
      throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), failure);
×
380
    }
1✔
381
  }
1✔
382

383
  @Override
384
  public void terminate(@Nullable String reason, Object... details) {
385
    checkStarted();
1✔
386
    WorkflowExecution targetExecution = currentExecutionWithoutRunId();
1✔
387
    try {
388
      workflowClientInvoker.terminate(
1✔
389
          new WorkflowClientCallsInterceptor.TerminateInput(targetExecution, reason, details));
390
    } catch (Exception e) {
1✔
391
      Throwable failure = throwAsWorkflowFailureException(e, targetExecution);
×
392
      throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), failure);
×
393
    }
1✔
394
  }
1✔
395

396
  @Override
397
  public Optional<WorkflowOptions> getOptions() {
398
    return Optional.ofNullable(options);
1✔
399
  }
400

401
  private void checkStarted() {
402
    if (execution.get() == null || execution.get().getWorkflowId() == null) {
1✔
403
      throw new IllegalStateException("Null workflowId. Was workflow started?");
×
404
    }
405
  }
1✔
406

407
  private void checkExecutionIsNotStarted() {
408
    if (execution.get() != null) {
1✔
409
      throw new IllegalStateException(
1✔
410
          "Cannot reuse a stub instance to start more than one workflow execution. The stub "
411
              + "points to already started execution. If you are trying to wait for a workflow completion either "
412
              + "change WorkflowIdReusePolicy from AllowDuplicate or use WorkflowStub.getResult");
413
    }
414
  }
1✔
415

416
  /*
417
   * Exceptions handling and processing for all methods of the stub
418
   */
419
  private RuntimeException wrapStartException(
420
      String workflowId, String workflowType, StatusRuntimeException e) {
421
    WorkflowExecution.Builder executionBuilder =
422
        WorkflowExecution.newBuilder().setWorkflowId(workflowId);
1✔
423

424
    WorkflowExecutionAlreadyStartedFailure f =
1✔
425
        StatusUtils.getFailure(e, WorkflowExecutionAlreadyStartedFailure.class);
1✔
426
    if (f != null) {
1✔
427
      WorkflowExecution exe = executionBuilder.setRunId(f.getRunId()).build();
1✔
428
      populateExecutionAfterStart(exe);
1✔
429
      return new WorkflowExecutionAlreadyStarted(exe, workflowType, e);
1✔
430
    } else {
431
      WorkflowExecution exe = executionBuilder.build();
1✔
432
      return new WorkflowServiceException(exe, workflowType, e);
1✔
433
    }
434
  }
435

436
  /**
437
   * RunId can change e.g. workflow does ContinueAsNew. Emptying runId in workflowExecution allows
438
   * Temporal server figure out the current run id dynamically.
439
   */
440
  private WorkflowExecution currentExecutionWithoutRunId() {
441
    WorkflowExecution workflowExecution = execution.get();
1✔
442
    if (Strings.isNullOrEmpty(workflowExecution.getRunId())) {
1✔
443
      return workflowExecution;
1✔
444
    } else {
445
      return WorkflowExecution.newBuilder(workflowExecution).setRunId("").build();
1✔
446
    }
447
  }
448

449
  private <R> R throwAsWorkflowFailureExceptionForQuery(
450
      Throwable failure,
451
      @SuppressWarnings("unused") Class<R> returnType,
452
      WorkflowExecution targetExecution) {
453
    failure = throwAsWorkflowFailureException(failure, targetExecution);
1✔
454
    if (failure instanceof StatusRuntimeException) {
1✔
455
      StatusRuntimeException sre = (StatusRuntimeException) failure;
1✔
456
      if (StatusUtils.hasFailure(sre, QueryFailedFailure.class)) {
1✔
457
        throw new WorkflowQueryException(execution.get(), workflowType.orElse(null), failure);
1✔
458
      } else if (Status.Code.FAILED_PRECONDITION.equals(sre.getStatus().getCode())
×
459
          && StatusUtils.hasFailure(sre, WorkflowNotReadyFailure.class)) {
×
460
        // Processes the edge case introduced by https://github.com/temporalio/temporal/pull/2826
461
        throw new WorkflowQueryRejectedException(
×
462
            targetExecution, workflowType.orElse(null), failure);
×
463
      }
464
    }
465
    throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), failure);
×
466
  }
467

468
  // This function never returns anything, it only throws
469
  private <R> R throwAsWorkflowFailureExceptionForResult(
470
      Throwable failure,
471
      @SuppressWarnings("unused") Class<R> returnType,
472
      WorkflowExecution targetExecution)
473
      throws TimeoutException {
474
    failure = throwAsWorkflowFailureException(failure, targetExecution);
1✔
475
    if (failure instanceof TimeoutException) {
1✔
476
      throw (TimeoutException) failure;
1✔
477
    } else if (failure instanceof CanceledFailure) {
1✔
478
      throw (CanceledFailure) failure;
×
479
    }
480
    throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), failure);
1✔
481
  }
482

483
  private Throwable throwAsWorkflowFailureException(
484
      Throwable failure, WorkflowExecution targetExecution) {
485
    if (failure instanceof CompletionException) {
1✔
486
      // if we work with CompletableFuture, the exception may be wrapped into CompletionException
487
      failure = failure.getCause();
1✔
488
    }
489
    failure = CheckedExceptionWrapper.unwrap(failure);
1✔
490
    if (failure instanceof Error) {
1✔
491
      throw (Error) failure;
×
492
    }
493
    if (failure instanceof StatusRuntimeException) {
1✔
494
      StatusRuntimeException sre = (StatusRuntimeException) failure;
1✔
495
      if (Status.Code.NOT_FOUND.equals(sre.getStatus().getCode())) {
1✔
496
        throw new WorkflowNotFoundException(targetExecution, workflowType.orElse(null), sre);
1✔
497
      }
498
    } else if (failure instanceof WorkflowException) {
1✔
499
      throw (WorkflowException) failure;
1✔
500
    }
501
    return failure;
1✔
502
  }
503

504
  private void populateExecutionAfterStart(WorkflowExecution startedExecution) {
505
    this.startedExecution.set(startedExecution);
1✔
506
    // bind to an execution without a runId, so queries follow runId chains by default
507
    this.execution.set(WorkflowExecution.newBuilder(startedExecution).setRunId("").build());
1✔
508
  }
1✔
509
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc