• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 1997

06 Oct 2023 06:47PM UTC coverage: 60.25% (-0.03%) from 60.278%
1997

push

buildkite

web-flow
Populate tasklistkind in poll request (#859)

13 of 13 new or added lines in 4 files covered. (100.0%)

11345 of 18830 relevant lines covered (60.25%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.62
/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.external;
19

20
import com.google.common.base.Strings;
21
import com.uber.cadence.Header;
22
import com.uber.cadence.Memo;
23
import com.uber.cadence.QueryWorkflowRequest;
24
import com.uber.cadence.QueryWorkflowResponse;
25
import com.uber.cadence.RequestCancelWorkflowExecutionRequest;
26
import com.uber.cadence.RetryPolicy;
27
import com.uber.cadence.SearchAttributes;
28
import com.uber.cadence.SignalWithStartWorkflowExecutionRequest;
29
import com.uber.cadence.SignalWorkflowExecutionRequest;
30
import com.uber.cadence.StartWorkflowExecutionRequest;
31
import com.uber.cadence.StartWorkflowExecutionResponse;
32
import com.uber.cadence.TaskList;
33
import com.uber.cadence.TerminateWorkflowExecutionRequest;
34
import com.uber.cadence.WorkflowExecution;
35
import com.uber.cadence.WorkflowExecutionAlreadyStartedError;
36
import com.uber.cadence.WorkflowQuery;
37
import com.uber.cadence.common.RetryOptions;
38
import com.uber.cadence.internal.common.*;
39
import com.uber.cadence.internal.metrics.MetricsTag;
40
import com.uber.cadence.internal.metrics.MetricsType;
41
import com.uber.cadence.internal.replay.QueryWorkflowParameters;
42
import com.uber.cadence.internal.replay.SignalExternalWorkflowParameters;
43
import com.uber.cadence.serviceclient.IWorkflowService;
44
import com.uber.m3.tally.Scope;
45
import com.uber.m3.util.ImmutableMap;
46
import java.nio.ByteBuffer;
47
import java.time.Duration;
48
import java.util.HashMap;
49
import java.util.Map;
50
import java.util.UUID;
51
import java.util.concurrent.CompletableFuture;
52
import org.apache.thrift.TException;
53
import org.apache.thrift.async.AsyncMethodCallback;
54

55
public final class GenericWorkflowClientExternalImpl implements GenericWorkflowClientExternal {
56

57
  private final String domain;
58
  private final IWorkflowService service;
59
  private final Scope metricsScope;
60

61
  public GenericWorkflowClientExternalImpl(
62
      IWorkflowService service, String domain, Scope metricsScope) {
1✔
63
    this.service = service;
1✔
64
    this.domain = domain;
1✔
65
    this.metricsScope = metricsScope;
1✔
66
  }
1✔
67

68
  @Override
69
  public String getDomain() {
70
    return domain;
1✔
71
  }
72

73
  @Override
74
  public IWorkflowService getService() {
75
    return service;
1✔
76
  }
77

78
  @Override
79
  public WorkflowExecution startWorkflow(StartWorkflowExecutionParameters startParameters)
80
      throws WorkflowExecutionAlreadyStartedError {
81
    try {
82
      return startWorkflowInternal(startParameters);
1✔
83
    } finally {
84
      emitMetricsForStartWorkflow(startParameters);
1✔
85
    }
86
  }
87

88
  @Override
89
  public CompletableFuture<WorkflowExecution> startWorkflowAsync(
90
      StartWorkflowExecutionParameters startParameters) {
91

92
    return startWorkflowAsync(startParameters, Long.MAX_VALUE);
×
93
  }
94

95
  @Override
96
  public CompletableFuture<WorkflowExecution> startWorkflowAsync(
97
      StartWorkflowExecutionParameters startParameters, Long timeoutInMillis) {
98

99
    emitMetricsForStartWorkflow(startParameters);
1✔
100
    return startWorkflowAsyncInternal(startParameters, timeoutInMillis);
1✔
101
  }
102

103
  private void emitMetricsForStartWorkflow(StartWorkflowExecutionParameters startParameters) {
104
    // TODO: can probably cache this
105
    Map<String, String> tags =
1✔
106
        new ImmutableMap.Builder<String, String>(3)
107
            .put(MetricsTag.WORKFLOW_TYPE, startParameters.getWorkflowType().getName())
1✔
108
            .put(MetricsTag.TASK_LIST, startParameters.getTaskList())
1✔
109
            .put(MetricsTag.DOMAIN, domain)
1✔
110
            .build();
1✔
111
    metricsScope.tagged(tags).counter(MetricsType.WORKFLOW_START_COUNTER).inc(1);
1✔
112
  }
1✔
113

114
  private WorkflowExecution startWorkflowInternal(StartWorkflowExecutionParameters startParameters)
115
      throws WorkflowExecutionAlreadyStartedError {
116

117
    StartWorkflowExecutionRequest request = getStartRequest(startParameters);
1✔
118
    StartWorkflowExecutionResponse result;
119
    try {
120
      result =
1✔
121
          RpcRetryer.retryWithResult(
1✔
122
              RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, () -> service.StartWorkflowExecution(request));
1✔
123
    } catch (WorkflowExecutionAlreadyStartedError e) {
1✔
124
      throw e;
1✔
125
    } catch (TException e) {
×
126
      throw CheckedExceptionWrapper.wrap(e);
×
127
    }
1✔
128
    WorkflowExecution execution = new WorkflowExecution();
1✔
129
    execution.setRunId(result.getRunId());
1✔
130
    execution.setWorkflowId(request.getWorkflowId());
1✔
131

132
    return execution;
1✔
133
  }
134

135
  private RetryOptions getRetryOptionsWithExpiration(RetryOptions o, Long timeoutInMillis) {
136
    if (timeoutInMillis == null || timeoutInMillis <= 0 || timeoutInMillis == Long.MAX_VALUE) {
1✔
137
      return o;
1✔
138
    }
139
    return new RetryOptions.Builder(RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS)
1✔
140
        .setExpiration(Duration.ofMillis((timeoutInMillis)))
1✔
141
        .build();
1✔
142
  }
143

144
  private CompletableFuture<WorkflowExecution> startWorkflowAsyncInternal(
145
      StartWorkflowExecutionParameters startParameters, Long timeoutInMillis) {
146
    StartWorkflowExecutionRequest request = getStartRequest(startParameters);
1✔
147

148
    return RpcRetryer.retryWithResultAsync(
1✔
149
        getRetryOptionsWithExpiration(RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, timeoutInMillis),
1✔
150
        () -> {
151
          CompletableFuture<WorkflowExecution> result = new CompletableFuture<>();
1✔
152
          try {
153

154
            service.StartWorkflowExecutionWithTimeout(
1✔
155
                request,
156
                new AsyncMethodCallback<StartWorkflowExecutionResponse>() {
1✔
157
                  @Override
158
                  public void onComplete(StartWorkflowExecutionResponse response) {
159
                    WorkflowExecution execution = new WorkflowExecution();
1✔
160
                    execution.setRunId(response.getRunId());
1✔
161
                    execution.setWorkflowId(request.getWorkflowId());
1✔
162
                    result.complete(execution);
1✔
163
                  }
1✔
164

165
                  @Override
166
                  public void onError(Exception exception) {
167
                    result.completeExceptionally(exception);
1✔
168
                  }
1✔
169
                },
170
                timeoutInMillis);
171
          } catch (TException e) {
×
172
            result.completeExceptionally(e);
×
173
          }
1✔
174
          return result;
1✔
175
        });
176
  }
177

178
  private StartWorkflowExecutionRequest getStartRequest(
179
      StartWorkflowExecutionParameters startParameters) {
180
    StartWorkflowExecutionRequest request = new StartWorkflowExecutionRequest();
1✔
181
    request.setDomain(domain);
1✔
182
    if (startParameters.getInput() != null) {
1✔
183
      request.setInput(startParameters.getInput());
1✔
184
    }
185
    request.setExecutionStartToCloseTimeoutSeconds(
1✔
186
        (int) startParameters.getExecutionStartToCloseTimeoutSeconds());
1✔
187
    request.setTaskStartToCloseTimeoutSeconds(
1✔
188
        (int) startParameters.getTaskStartToCloseTimeoutSeconds());
1✔
189
    request.setWorkflowIdReusePolicy(startParameters.getWorkflowIdReusePolicy());
1✔
190
    String taskList = startParameters.getTaskList();
1✔
191
    if (taskList != null && !taskList.isEmpty()) {
1✔
192
      TaskList tl = new TaskList();
1✔
193
      tl.setName(taskList);
1✔
194
      request.setTaskList(tl);
1✔
195
    }
196
    String workflowId = startParameters.getWorkflowId();
1✔
197
    if (workflowId == null) {
1✔
198
      workflowId = UUID.randomUUID().toString();
×
199
    }
200
    request.setWorkflowId(workflowId);
1✔
201
    request.setWorkflowType(startParameters.getWorkflowType());
1✔
202
    RetryParameters retryParameters = startParameters.getRetryParameters();
1✔
203
    if (retryParameters != null) {
1✔
204
      RetryPolicy retryPolicy = toRetryPolicy(retryParameters);
1✔
205
      request.setRetryPolicy(retryPolicy);
1✔
206
    }
207
    if (!Strings.isNullOrEmpty(startParameters.getCronSchedule())) {
1✔
208
      request.setCronSchedule(startParameters.getCronSchedule());
1✔
209
    }
210
    request.setMemo(toMemoThrift(startParameters.getMemo()));
1✔
211
    request.setSearchAttributes(toSearchAttributesThrift(startParameters.getSearchAttributes()));
1✔
212
    request.setHeader(toHeaderThrift(startParameters.getContext()));
1✔
213
    if (startParameters.getDelayStart() != null) {
1✔
214
      request.setDelayStartSeconds((int) startParameters.getDelayStart().getSeconds());
×
215
    }
216

217
    return request;
1✔
218
  }
219

220
  private Memo toMemoThrift(Map<String, byte[]> memo) {
221
    if (memo == null || memo.isEmpty()) {
1✔
222
      return null;
1✔
223
    }
224

225
    Map<String, ByteBuffer> fields = new HashMap<>();
1✔
226
    for (Map.Entry<String, byte[]> item : memo.entrySet()) {
1✔
227
      fields.put(item.getKey(), ByteBuffer.wrap(item.getValue()));
1✔
228
    }
1✔
229
    Memo memoThrift = new Memo();
1✔
230
    memoThrift.setFields(fields);
1✔
231
    return memoThrift;
1✔
232
  }
233

234
  private SearchAttributes toSearchAttributesThrift(Map<String, byte[]> searchAttributes) {
235
    if (searchAttributes == null || searchAttributes.isEmpty()) {
1✔
236
      return null;
1✔
237
    }
238

239
    Map<String, ByteBuffer> fields = new HashMap<>();
1✔
240
    for (Map.Entry<String, byte[]> item : searchAttributes.entrySet()) {
1✔
241
      fields.put(item.getKey(), ByteBuffer.wrap(item.getValue()));
1✔
242
    }
1✔
243
    SearchAttributes searchAttrThrift = new SearchAttributes();
1✔
244
    searchAttrThrift.setIndexedFields(fields);
1✔
245
    return searchAttrThrift;
1✔
246
  }
247

248
  private Header toHeaderThrift(Map<String, byte[]> headers) {
249
    if (headers == null || headers.isEmpty()) {
1✔
250
      return null;
1✔
251
    }
252
    Map<String, ByteBuffer> fields = new HashMap<>();
1✔
253
    for (Map.Entry<String, byte[]> item : headers.entrySet()) {
1✔
254
      fields.put(item.getKey(), ByteBuffer.wrap(item.getValue()));
1✔
255
    }
1✔
256
    Header headerThrift = new Header();
1✔
257
    headerThrift.setFields(fields);
1✔
258
    return headerThrift;
1✔
259
  }
260

261
  private RetryPolicy toRetryPolicy(RetryParameters retryParameters) {
262
    return new RetryPolicy()
1✔
263
        .setBackoffCoefficient(retryParameters.getBackoffCoefficient())
1✔
264
        .setExpirationIntervalInSeconds(retryParameters.getExpirationIntervalInSeconds())
1✔
265
        .setInitialIntervalInSeconds(retryParameters.getInitialIntervalInSeconds())
1✔
266
        .setMaximumAttempts(retryParameters.getMaximumAttempts())
1✔
267
        .setMaximumIntervalInSeconds(retryParameters.getMaximumIntervalInSeconds())
1✔
268
        .setNonRetriableErrorReasons(retryParameters.getNonRetriableErrorReasons());
1✔
269
  }
270

271
  @Override
272
  public void signalWorkflowExecution(SignalExternalWorkflowParameters signalParameters) {
273
    SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters);
1✔
274

275
    try {
276
      RpcRetryer.retry(() -> service.SignalWorkflowExecution(request));
1✔
277
    } catch (TException e) {
1✔
278
      throw CheckedExceptionWrapper.wrap(e);
1✔
279
    }
1✔
280
  }
1✔
281

282
  @Override
283
  public CompletableFuture<Void> signalWorkflowExecutionAsync(
284
      SignalExternalWorkflowParameters signalParameters) {
285
    return signalWorkflowExecutionAsync(signalParameters, Long.MAX_VALUE);
×
286
  }
287

288
  @Override
289
  public CompletableFuture<Void> signalWorkflowExecutionAsync(
290
      SignalExternalWorkflowParameters signalParameters, Long timeoutInMillis) {
291
    SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters);
1✔
292
    return RpcRetryer.retryWithResultAsync(
1✔
293
        getRetryOptionsWithExpiration(RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, timeoutInMillis),
1✔
294
        () -> {
295
          CompletableFuture<Void> result = new CompletableFuture<>();
1✔
296
          try {
297
            service.SignalWorkflowExecution(
1✔
298
                request,
299
                new AsyncMethodCallback() {
1✔
300
                  @Override
301
                  public void onComplete(Object response) {
302
                    result.complete(null);
1✔
303
                  }
1✔
304

305
                  @Override
306
                  public void onError(Exception exception) {
307
                    result.completeExceptionally(exception);
×
308
                  }
×
309
                });
310
          } catch (TException e) {
×
311
            result.completeExceptionally(e);
×
312
          }
1✔
313
          return result;
1✔
314
        });
315
  }
316

317
  private SignalWorkflowExecutionRequest getSignalRequest(
318
      SignalExternalWorkflowParameters signalParameters) {
319
    SignalWorkflowExecutionRequest request = new SignalWorkflowExecutionRequest();
1✔
320
    request.setDomain(domain);
1✔
321
    request.setInput(signalParameters.getInput());
1✔
322
    request.setSignalName(signalParameters.getSignalName());
1✔
323
    WorkflowExecution execution = new WorkflowExecution();
1✔
324
    execution.setRunId(signalParameters.getRunId());
1✔
325
    execution.setWorkflowId(signalParameters.getWorkflowId());
1✔
326
    request.setWorkflowExecution(execution);
1✔
327
    return request;
1✔
328
  }
329

330
  @Override
331
  public WorkflowExecution signalWithStartWorkflowExecution(
332
      SignalWithStartWorkflowExecutionParameters parameters) {
333
    try {
334
      return signalWithStartWorkflowInternal(parameters);
1✔
335
    } finally {
336
      Map<String, String> tags =
1✔
337
          new ImmutableMap.Builder<String, String>(3)
338
              .put(
1✔
339
                  MetricsTag.WORKFLOW_TYPE,
340
                  parameters.getStartParameters().getWorkflowType().getName())
1✔
341
              .put(MetricsTag.TASK_LIST, parameters.getStartParameters().getTaskList())
1✔
342
              .put(MetricsTag.DOMAIN, domain)
1✔
343
              .build();
1✔
344
      metricsScope.tagged(tags).counter(MetricsType.WORKFLOW_SIGNAL_WITH_START_COUNTER).inc(1);
1✔
345
    }
346
  }
347

348
  private WorkflowExecution signalWithStartWorkflowInternal(
349
      SignalWithStartWorkflowExecutionParameters parameters) {
350
    SignalWithStartWorkflowExecutionRequest request = new SignalWithStartWorkflowExecutionRequest();
1✔
351
    request.setDomain(domain);
1✔
352
    StartWorkflowExecutionParameters startParameters = parameters.getStartParameters();
1✔
353
    request.setSignalName(parameters.getSignalName());
1✔
354
    request.setSignalInput(parameters.getSignalInput());
1✔
355
    // TODO        request.setIdentity()
356

357
    if (startParameters.getInput() != null) {
1✔
358
      request.setInput(startParameters.getInput());
×
359
    }
360
    request.setExecutionStartToCloseTimeoutSeconds(
1✔
361
        (int) startParameters.getExecutionStartToCloseTimeoutSeconds());
1✔
362
    request.setTaskStartToCloseTimeoutSeconds(
1✔
363
        (int) startParameters.getTaskStartToCloseTimeoutSeconds());
1✔
364
    request.setWorkflowIdReusePolicy(startParameters.getWorkflowIdReusePolicy());
1✔
365
    String taskList = startParameters.getTaskList();
1✔
366
    if (taskList != null && !taskList.isEmpty()) {
1✔
367
      TaskList tl = new TaskList();
1✔
368
      tl.setName(taskList);
1✔
369
      request.setTaskList(tl);
1✔
370
    }
371
    String workflowId = startParameters.getWorkflowId();
1✔
372
    if (workflowId == null) {
1✔
373
      workflowId = UUID.randomUUID().toString();
×
374
    }
375
    request.setWorkflowId(workflowId);
1✔
376
    request.setWorkflowType(startParameters.getWorkflowType());
1✔
377
    RetryParameters retryParameters = startParameters.getRetryParameters();
1✔
378
    if (retryParameters != null) {
1✔
379
      RetryPolicy retryPolicy = toRetryPolicy(retryParameters);
×
380
      request.setRetryPolicy(retryPolicy);
×
381
    }
382
    if (!Strings.isNullOrEmpty(startParameters.getCronSchedule())) {
1✔
383
      request.setCronSchedule(startParameters.getCronSchedule());
×
384
    }
385
    if (startParameters.getDelayStart() != null) {
1✔
386
      request.setDelayStartSeconds((int) startParameters.getDelayStart().getSeconds());
×
387
    }
388
    StartWorkflowExecutionResponse result;
389
    try {
390
      result =
1✔
391
          RpcRetryer.retryWithResult(
1✔
392
              RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS,
393
              () -> service.SignalWithStartWorkflowExecution(request));
1✔
394
    } catch (TException e) {
×
395
      throw CheckedExceptionWrapper.wrap(e);
×
396
    }
1✔
397
    WorkflowExecution execution = new WorkflowExecution();
1✔
398
    execution.setRunId(result.getRunId());
1✔
399
    execution.setWorkflowId(request.getWorkflowId());
1✔
400
    return execution;
1✔
401
  }
402

403
  @Override
404
  public void requestCancelWorkflowExecution(WorkflowExecution execution) {
405
    RequestCancelWorkflowExecutionRequest request = new RequestCancelWorkflowExecutionRequest();
1✔
406
    request.setDomain(domain);
1✔
407
    request.setWorkflowExecution(execution);
1✔
408
    try {
409
      RpcRetryer.retry(() -> service.RequestCancelWorkflowExecution(request));
1✔
410
    } catch (TException e) {
×
411
      throw CheckedExceptionWrapper.wrap(e);
×
412
    }
1✔
413
  }
1✔
414

415
  @Override
416
  public QueryWorkflowResponse queryWorkflow(QueryWorkflowParameters queryParameters) {
417
    QueryWorkflowRequest request = new QueryWorkflowRequest();
1✔
418
    request.setDomain(domain);
1✔
419
    WorkflowExecution execution = new WorkflowExecution();
1✔
420
    execution.setWorkflowId(queryParameters.getWorkflowId()).setRunId(queryParameters.getRunId());
1✔
421
    request.setExecution(execution);
1✔
422
    WorkflowQuery query = new WorkflowQuery();
1✔
423
    query.setQueryArgs(queryParameters.getInput());
1✔
424
    query.setQueryType(queryParameters.getQueryType());
1✔
425
    request.setQuery(query);
1✔
426
    request.setQueryRejectCondition(queryParameters.getQueryRejectCondition());
1✔
427
    request.setQueryConsistencyLevel(queryParameters.getQueryConsistencyLevel());
1✔
428
    try {
429
      QueryWorkflowResponse response =
1✔
430
          RpcRetryer.retryWithResult(
1✔
431
              RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, () -> service.QueryWorkflow(request));
1✔
432
      return response;
1✔
433
    } catch (TException e) {
×
434
      throw CheckedExceptionWrapper.wrap(e);
×
435
    }
436
  }
437

438
  @Override
439
  public String generateUniqueId() {
440
    String workflowId = UUID.randomUUID().toString();
×
441
    return workflowId;
×
442
  }
443

444
  @Override
445
  public void terminateWorkflowExecution(TerminateWorkflowExecutionParameters terminateParameters) {
446
    TerminateWorkflowExecutionRequest request = new TerminateWorkflowExecutionRequest();
×
447
    request.setWorkflowExecution(terminateParameters.getWorkflowExecution());
×
448
    request.setDomain(domain);
×
449
    request.setDetails(terminateParameters.getDetails());
×
450
    request.setReason(terminateParameters.getReason());
×
451
    //        request.setChildPolicy(terminateParameters.getChildPolicy());
452
    try {
453
      RpcRetryer.retry(() -> service.TerminateWorkflowExecution(request));
×
454
    } catch (TException e) {
×
455
      throw CheckedExceptionWrapper.wrap(e);
×
456
    }
×
457
  }
×
458
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc