• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2005

10 Oct 2023 06:01PM UTC coverage: 60.196% (-0.1%) from 60.308%
2005

push

buildkite

web-flow
Release 3.10.1 (#857)

* fixed bug: Added alreadyStarted workflow case

11335 of 18830 relevant lines covered (60.2%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.69
/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.sync;
19

20
import com.uber.cadence.EntityNotExistsError;
21
import com.uber.cadence.InternalServiceError;
22
import com.uber.cadence.QueryConsistencyLevel;
23
import com.uber.cadence.QueryFailedError;
24
import com.uber.cadence.QueryRejectCondition;
25
import com.uber.cadence.QueryWorkflowResponse;
26
import com.uber.cadence.WorkflowExecution;
27
import com.uber.cadence.WorkflowExecutionAlreadyCompletedError;
28
import com.uber.cadence.WorkflowExecutionAlreadyStartedError;
29
import com.uber.cadence.WorkflowType;
30
import com.uber.cadence.client.*;
31
import com.uber.cadence.context.ContextPropagator;
32
import com.uber.cadence.converter.DataConverter;
33
import com.uber.cadence.converter.DataConverterException;
34
import com.uber.cadence.converter.JsonDataConverter;
35
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
36
import com.uber.cadence.internal.common.SignalWithStartWorkflowExecutionParameters;
37
import com.uber.cadence.internal.common.StartWorkflowExecutionParameters;
38
import com.uber.cadence.internal.common.WorkflowExecutionFailedException;
39
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
40
import com.uber.cadence.internal.external.GenericWorkflowClientExternal;
41
import com.uber.cadence.internal.replay.QueryWorkflowParameters;
42
import com.uber.cadence.internal.replay.SignalExternalWorkflowParameters;
43
import java.lang.reflect.Type;
44
import java.util.HashMap;
45
import java.util.List;
46
import java.util.Map;
47
import java.util.Optional;
48
import java.util.UUID;
49
import java.util.concurrent.CancellationException;
50
import java.util.concurrent.CompletableFuture;
51
import java.util.concurrent.CompletionException;
52
import java.util.concurrent.TimeUnit;
53
import java.util.concurrent.TimeoutException;
54
import java.util.concurrent.atomic.AtomicReference;
55

56
class WorkflowStubImpl implements WorkflowStub {
57

58
  private final GenericWorkflowClientExternal genericClient;
59
  private final DataConverter dataConverter;
60
  private final Optional<String> workflowType;
61
  private AtomicReference<WorkflowExecution> execution = new AtomicReference<>();
1✔
62
  private final Optional<WorkflowOptions> options;
63
  private final WorkflowClientOptions clientOptions;
64

65
  WorkflowStubImpl(
66
      WorkflowClientOptions clientOptions,
67
      GenericWorkflowClientExternal genericClient,
68
      Optional<String> workflowType,
69
      WorkflowExecution execution) {
1✔
70
    this.clientOptions = clientOptions;
1✔
71
    this.genericClient = genericClient;
1✔
72
    this.dataConverter = clientOptions.getDataConverter();
1✔
73
    this.workflowType = workflowType;
1✔
74
    if (execution == null
1✔
75
        || execution.getWorkflowId() == null
1✔
76
        || execution.getWorkflowId().isEmpty()) {
1✔
77
      throw new IllegalArgumentException("null or empty workflowId");
×
78
    }
79
    this.execution.set(execution);
1✔
80
    this.options = Optional.empty();
1✔
81
  }
1✔
82

83
  WorkflowStubImpl(
84
      WorkflowClientOptions clientOptions,
85
      GenericWorkflowClientExternal genericClient,
86
      String workflowType,
87
      WorkflowOptions options) {
1✔
88
    this.clientOptions = clientOptions;
1✔
89
    this.genericClient = genericClient;
1✔
90
    this.dataConverter = clientOptions.getDataConverter();
1✔
91
    this.workflowType = Optional.of(workflowType);
1✔
92
    this.options = Optional.of(options);
1✔
93
  }
1✔
94

95
  @Override
96
  public void signal(String signalName, Object... input) {
97
    SignalExternalWorkflowParameters p = getSignalExternalWorkflowParameters(signalName, input);
1✔
98
    try {
99
      genericClient.signalWorkflowExecution(p);
1✔
100
    } catch (Exception e) {
1✔
101
      throw new WorkflowServiceException(execution.get(), workflowType, e);
1✔
102
    }
1✔
103
  }
1✔
104

105
  @Override
106
  public CompletableFuture<Void> signalAsync(String signalName, Object... input) {
107
    return signalAsyncWithTimeout(Long.MAX_VALUE, TimeUnit.MILLISECONDS, signalName, input);
1✔
108
  }
109

110
  @Override
111
  public CompletableFuture<Void> signalAsyncWithTimeout(
112
      long timeout, TimeUnit unit, String signalName, Object... input) {
113
    SignalExternalWorkflowParameters p = getSignalExternalWorkflowParameters(signalName, input);
1✔
114
    return genericClient.signalWorkflowExecutionAsync(p, unit.toMillis(timeout));
1✔
115
  }
116

117
  private SignalExternalWorkflowParameters getSignalExternalWorkflowParameters(
118
      String signalName, Object... input) {
119
    checkStarted();
1✔
120
    SignalExternalWorkflowParameters p = new SignalExternalWorkflowParameters();
1✔
121
    p.setInput(dataConverter.toData(input));
1✔
122
    p.setSignalName(signalName);
1✔
123
    p.setWorkflowId(execution.get().getWorkflowId());
1✔
124
    // TODO: Deal with signaling started workflow only, when requested
125
    // Commented out to support signaling workflows that called continue as new.
126
    //        p.setRunId(execution.getRunId());
127
    return p;
1✔
128
  }
129

130
  private WorkflowExecution startWithOptions(WorkflowOptions o, Object... args) {
131
    StartWorkflowExecutionParameters p = getStartWorkflowExecutionParameters(o, args);
1✔
132
    try {
133
      execution.set(genericClient.startWorkflow(p));
1✔
134
    } catch (WorkflowExecutionAlreadyStartedError e) {
1✔
135
      execution.set(
1✔
136
          new WorkflowExecution().setWorkflowId(p.getWorkflowId()).setRunId(e.getRunId()));
1✔
137
      WorkflowExecution execution =
1✔
138
          new WorkflowExecution().setWorkflowId(p.getWorkflowId()).setRunId(e.getRunId());
1✔
139
      throw new DuplicateWorkflowException(execution, workflowType.get(), e.getMessage());
1✔
140
    } catch (Exception e) {
×
141
      throw new WorkflowServiceException(execution.get(), workflowType, e);
×
142
    }
1✔
143
    return execution.get();
1✔
144
  }
145

146
  private CompletableFuture<WorkflowExecution> startAsyncWithOptions(
147
      long timeout, TimeUnit unit, WorkflowOptions o, Object... args) {
148
    StartWorkflowExecutionParameters p = getStartWorkflowExecutionParameters(o, args);
1✔
149
    return genericClient.startWorkflowAsync(p, unit.toMillis(timeout));
1✔
150
  }
151

152
  private StartWorkflowExecutionParameters getStartWorkflowExecutionParameters(
153
      WorkflowOptions o, Object[] args) {
154
    if (execution.get() != null) {
1✔
155
      throw new DuplicateWorkflowException(
1✔
156
          execution.get(),
1✔
157
          workflowType.get(),
1✔
158
          "Cannot reuse a stub instance to start more than one workflow execution. The stub "
159
              + "points to already started execution.");
160
    }
161
    StartWorkflowExecutionParameters p = StartWorkflowExecutionParameters.fromWorkflowOptions(o);
1✔
162
    if (o.getWorkflowId() == null) {
1✔
163
      p.setWorkflowId(UUID.randomUUID().toString());
1✔
164
    } else {
165
      p.setWorkflowId(o.getWorkflowId());
1✔
166
    }
167
    p.setInput(dataConverter.toData(args));
1✔
168
    p.setWorkflowType(new WorkflowType().setName(workflowType.get()));
1✔
169
    p.setMemo(convertMemoFromObjectToBytes(o.getMemo()));
1✔
170
    p.setSearchAttributes(convertSearchAttributesFromObjectToBytes(o.getSearchAttributes()));
1✔
171
    p.setContext(extractContextsAndConvertToBytes(o.getContextPropagators()));
1✔
172
    p.setDelayStart(o.getDelayStart());
1✔
173
    return p;
1✔
174
  }
175

176
  private Map<String, byte[]> convertMapFromObjectToBytes(
177
      Map<String, Object> map, DataConverter dataConverter) {
178
    if (map == null) {
1✔
179
      return null;
1✔
180
    }
181
    Map<String, byte[]> result = new HashMap<>();
1✔
182
    for (Map.Entry<String, Object> item : map.entrySet()) {
1✔
183
      try {
184
        result.put(item.getKey(), dataConverter.toData(item.getValue()));
1✔
185
      } catch (DataConverterException e) {
×
186
        throw new DataConverterException("Cannot serialize key " + item.getKey(), e.getCause());
×
187
      }
1✔
188
    }
1✔
189
    return result;
1✔
190
  }
191

192
  private Map<String, byte[]> convertMemoFromObjectToBytes(Map<String, Object> map) {
193
    return convertMapFromObjectToBytes(map, dataConverter);
1✔
194
  }
195

196
  private Map<String, byte[]> convertSearchAttributesFromObjectToBytes(Map<String, Object> map) {
197
    return convertMapFromObjectToBytes(map, JsonDataConverter.getInstance());
1✔
198
  }
199

200
  private Map<String, byte[]> extractContextsAndConvertToBytes(
201
      List<ContextPropagator> contextPropagators) {
202
    if (contextPropagators == null || contextPropagators.isEmpty()) {
1✔
203
      return null;
1✔
204
    }
205
    Map<String, byte[]> result = new HashMap<>();
1✔
206
    for (ContextPropagator propagator : contextPropagators) {
1✔
207
      result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
1✔
208
    }
1✔
209
    return result;
1✔
210
  }
211

212
  @Override
213
  public WorkflowExecution start(Object... args) {
214
    if (!options.isPresent()) {
1✔
215
      throw new IllegalStateException("Required parameter WorkflowOptions is missing");
×
216
    }
217
    return startWithOptions(WorkflowOptions.merge(null, null, null, options.get()), args);
1✔
218
  }
219

220
  @Override
221
  public CompletableFuture<WorkflowExecution> startAsync(Object... args) {
222
    return startAsyncWithTimeout(Long.MAX_VALUE, TimeUnit.MILLISECONDS, args);
1✔
223
  }
224

225
  @Override
226
  public CompletableFuture<WorkflowExecution> startAsyncWithTimeout(
227
      long timeout, TimeUnit unit, Object... args) {
228
    if (!options.isPresent()) {
1✔
229
      throw new IllegalStateException("Required parameter WorkflowOptions is missing");
×
230
    }
231

232
    CompletableFuture<WorkflowExecution> result =
1✔
233
        startAsyncWithOptions(
1✔
234
            timeout, unit, WorkflowOptions.merge(null, null, null, options.get()), args);
1✔
235
    return result.whenComplete(
1✔
236
        (input, exception) -> {
237
          if (input != null) {
1✔
238
            execution.set(
1✔
239
                new WorkflowExecution()
240
                    .setWorkflowId(input.getWorkflowId())
1✔
241
                    .setRunId(input.getRunId()));
1✔
242
          }
243
        });
1✔
244
  }
245

246
  private WorkflowExecution signalWithStartWithOptions(
247
      WorkflowOptions options, String signalName, Object[] signalArgs, Object[] startArgs) {
248
    StartWorkflowExecutionParameters sp = getStartWorkflowExecutionParameters(options, startArgs);
1✔
249

250
    byte[] signalInput = dataConverter.toData(signalArgs);
1✔
251
    SignalWithStartWorkflowExecutionParameters p =
1✔
252
        new SignalWithStartWorkflowExecutionParameters(sp, signalName, signalInput);
253
    try {
254
      execution.set(genericClient.signalWithStartWorkflowExecution(p));
1✔
255
    } catch (Exception e) {
×
256
      throw new WorkflowServiceException(execution.get(), workflowType, e);
×
257
    }
1✔
258
    return execution.get();
1✔
259
  }
260

261
  @Override
262
  public WorkflowExecution signalWithStart(
263
      String signalName, Object[] signalArgs, Object[] startArgs) {
264
    if (!options.isPresent()) {
1✔
265
      throw new IllegalStateException("Required parameter WorkflowOptions is missing");
×
266
    }
267
    return signalWithStartWithOptions(
1✔
268
        WorkflowOptions.merge(null, null, null, options.get()), signalName, signalArgs, startArgs);
1✔
269
  }
270

271
  @Override
272
  public Optional<String> getWorkflowType() {
273
    return workflowType;
1✔
274
  }
275

276
  @Override
277
  public WorkflowExecution getExecution() {
278
    return execution.get();
1✔
279
  }
280

281
  @Override
282
  public <R> R getResult(Class<R> resultClass) {
283
    return getResult(resultClass, resultClass);
1✔
284
  }
285

286
  @Override
287
  public <R> R getResult(Class<R> resultClass, Type resultType) {
288
    try {
289
      return getResult(Long.MAX_VALUE, TimeUnit.MILLISECONDS, resultClass, resultType);
1✔
290
    } catch (TimeoutException e) {
×
291
      throw CheckedExceptionWrapper.wrap(e);
×
292
    }
293
  }
294

295
  @Override
296
  public <R> R getResult(long timeout, TimeUnit unit, Class<R> resultClass)
297
      throws TimeoutException {
298
    return getResult(timeout, unit, resultClass, resultClass);
×
299
  }
300

301
  @Override
302
  public <R> R getResult(long timeout, TimeUnit unit, Class<R> resultClass, Type resultType)
303
      throws TimeoutException {
304
    checkStarted();
1✔
305
    try {
306
      byte[] resultValue =
1✔
307
          WorkflowExecutionUtils.getWorkflowExecutionResult(
1✔
308
              genericClient.getService(),
1✔
309
              genericClient.getDomain(),
1✔
310
              execution.get(),
1✔
311
              workflowType,
312
              timeout,
313
              unit);
314
      if (resultValue == null) {
1✔
315
        return null;
×
316
      }
317
      return dataConverter.fromData(resultValue, resultClass, resultType);
1✔
318
    } catch (TimeoutException e) {
×
319
      throw e;
×
320
    } catch (Exception e) {
1✔
321
      return mapToWorkflowFailureException(e, resultClass);
×
322
    }
323
  }
324

325
  @Override
326
  public <R> CompletableFuture<R> getResultAsync(Class<R> resultClass) {
327
    return getResultAsync(resultClass, resultClass);
1✔
328
  }
329

330
  @Override
331
  public <R> CompletableFuture<R> getResultAsync(Class<R> resultClass, Type resultType) {
332
    return getResultAsync(Long.MAX_VALUE, TimeUnit.MILLISECONDS, resultClass, resultType);
1✔
333
  }
334

335
  @Override
336
  public <R> CompletableFuture<R> getResultAsync(
337
      long timeout, TimeUnit unit, Class<R> resultClass) {
338
    return getResultAsync(timeout, unit, resultClass, resultClass);
1✔
339
  }
340

341
  @Override
342
  public <R> CompletableFuture<R> getResultAsync(
343
      long timeout, TimeUnit unit, Class<R> resultClass, Type resultType) {
344
    checkStarted();
1✔
345
    return WorkflowExecutionUtils.getWorkflowExecutionResultAsync(
1✔
346
            genericClient.getService(),
1✔
347
            genericClient.getDomain(),
1✔
348
            execution.get(),
1✔
349
            workflowType,
350
            timeout,
351
            unit)
352
        .handle(
1✔
353
            (r, e) -> {
354
              if (e instanceof CompletionException) {
1✔
355
                e = e.getCause();
1✔
356
              }
357
              if (e instanceof WorkflowExecutionFailedException) {
1✔
358
                return mapToWorkflowFailureException(
×
359
                    (WorkflowExecutionFailedException) e, resultClass);
360
              }
361
              if (e != null) {
1✔
362
                throw CheckedExceptionWrapper.wrap(e);
×
363
              }
364
              if (r == null) {
1✔
365
                return null;
×
366
              }
367
              return dataConverter.fromData(r, resultClass, resultType);
1✔
368
            });
369
  }
370

371
  private <R> R mapToWorkflowFailureException(
372
      Exception failure, @SuppressWarnings("unused") Class<R> returnType) {
373
    failure = CheckedExceptionWrapper.unwrap(failure);
1✔
374
    Class<Throwable> detailsClass;
375
    if (failure instanceof WorkflowExecutionFailedException) {
1✔
376
      WorkflowExecutionFailedException executionFailed = (WorkflowExecutionFailedException) failure;
1✔
377
      try {
378
        @SuppressWarnings("unchecked")
379
        Class<Throwable> dc = (Class<Throwable>) Class.forName(executionFailed.getReason());
1✔
380
        detailsClass = dc;
1✔
381
      } catch (Exception e) {
×
382
        RuntimeException ee =
×
383
            new RuntimeException(
384
                "Couldn't deserialize failure cause "
385
                    + "as the reason field is expected to contain an exception class name",
386
                executionFailed);
387
        throw new WorkflowFailureException(
×
388
            execution.get(), workflowType, executionFailed.getDecisionTaskCompletedEventId(), ee);
×
389
      }
1✔
390
      Throwable cause =
1✔
391
          dataConverter.fromData(executionFailed.getDetails(), detailsClass, detailsClass);
1✔
392
      throw new WorkflowFailureException(
1✔
393
          execution.get(), workflowType, executionFailed.getDecisionTaskCompletedEventId(), cause);
1✔
394
    } else if (failure instanceof EntityNotExistsError) {
1✔
395
      throw new WorkflowNotFoundException(execution.get(), workflowType, failure.getMessage());
×
396
    } else if (failure instanceof WorkflowExecutionAlreadyCompletedError) {
1✔
397
      throw new WorkflowAlreadyCompletedException(
×
398
          execution.get(), workflowType, failure.getMessage());
×
399
    } else if (failure instanceof CancellationException) {
1✔
400
      throw (CancellationException) failure;
1✔
401
    } else if (failure instanceof WorkflowException) {
1✔
402
      throw (WorkflowException) failure;
1✔
403
    } else {
404
      throw new WorkflowServiceException(execution.get(), workflowType, failure);
×
405
    }
406
  }
407

408
  @Override
409
  public <R> R query(String queryType, Class<R> resultClass, Object... args) {
410
    return queryWithOptions(
1✔
411
        queryType, new QueryOptions.Builder().build(), resultClass, resultClass, args);
1✔
412
  }
413

414
  @Override
415
  public <R> R query(String queryType, Class<R> resultClass, Type resultType, Object... args) {
416
    return query(queryType, resultClass, resultType, clientOptions.getQueryRejectCondition(), args);
1✔
417
  }
418

419
  @Override
420
  public <R> R query(
421
      String queryType,
422
      Class<R> resultClass,
423
      QueryRejectCondition queryRejectCondition,
424
      Object... args) {
425
    return query(queryType, resultClass, resultClass, queryRejectCondition, args);
×
426
  }
427

428
  @Override
429
  public <R> R query(
430
      String queryType,
431
      Class<R> resultClass,
432
      Type resultType,
433
      QueryRejectCondition queryRejectCondition,
434
      Object... args) {
435
    return queryWithOptions(
1✔
436
        queryType,
437
        new QueryOptions.Builder()
438
            .setQueryRejectCondition(queryRejectCondition)
1✔
439
            .setQueryConsistencyLevel(QueryConsistencyLevel.EVENTUAL)
1✔
440
            .build(),
1✔
441
        resultType,
442
        resultClass,
443
        args);
444
  }
445

446
  @Override
447
  public <R> R queryWithOptions(
448
      String queryType,
449
      QueryOptions options,
450
      Type resultType,
451
      Class<R> resultClass,
452
      Object... args) {
453
    checkStarted();
1✔
454
    QueryWorkflowParameters p = new QueryWorkflowParameters();
1✔
455
    p.setInput(dataConverter.toData(args));
1✔
456
    p.setQueryType(queryType);
1✔
457
    p.setWorkflowId(execution.get().getWorkflowId());
1✔
458
    p.setQueryRejectCondition(options.getQueryRejectCondition());
1✔
459
    p.setQueryConsistencyLevel(options.getQueryConsistencyLevel());
1✔
460

461
    QueryWorkflowResponse result;
462
    try {
463
      result = genericClient.queryWorkflow(p);
1✔
464
    } catch (RuntimeException e) {
×
465
      Exception unwrapped = CheckedExceptionWrapper.unwrap(e);
×
466
      if (unwrapped instanceof EntityNotExistsError) {
×
467
        throw new WorkflowNotFoundException(execution.get(), workflowType, e.getMessage());
×
468
      }
469
      if (unwrapped instanceof QueryFailedError) {
×
470
        throw new WorkflowQueryException(execution.get(), unwrapped.getMessage());
×
471
      }
472
      if (unwrapped instanceof InternalServiceError) {
×
473
        throw new WorkflowServiceException(execution.get(), workflowType, unwrapped);
×
474
      }
475
      throw e;
×
476
    }
1✔
477

478
    if (result.queryRejected == null) {
1✔
479
      return dataConverter.fromData(result.getQueryResult(), resultClass, resultType);
1✔
480
    } else {
481
      throw new WorkflowQueryRejectedException(
1✔
482
          execution.get(),
1✔
483
          options.getQueryRejectCondition(),
1✔
484
          result.getQueryRejected().getCloseStatus());
1✔
485
    }
486
  }
487

488
  @Override
489
  public void cancel() {
490
    if (execution.get() == null || execution.get().getWorkflowId() == null) {
1✔
491
      return;
×
492
    }
493

494
    // RunId can change if workflow does ContinueAsNew. So we do not set it here and
495
    // let the server figure out the current run.
496
    genericClient.requestCancelWorkflowExecution(
1✔
497
        new WorkflowExecution().setWorkflowId(execution.get().getWorkflowId()));
1✔
498
  }
1✔
499

500
  @Override
501
  public Optional<WorkflowOptions> getOptions() {
502
    return options;
1✔
503
  }
504

505
  private void checkStarted() {
506
    if (execution.get() == null || execution.get().getWorkflowId() == null) {
1✔
507
      throw new IllegalStateException("Null workflowId. Was workflow started?");
1✔
508
    }
509
  }
1✔
510
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc