• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2263

19 Apr 2024 03:59PM UTC coverage: 60.13% (-0.2%) from 60.328%
2263

push

buildkite

web-flow
Update idls (#882)

Implement required methods for new IDL types

0 of 51 new or added lines in 7 files covered. (0.0%)

16 existing lines in 7 files now uncovered.

11456 of 19052 relevant lines covered (60.13%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.46
/src/main/java/com/uber/cadence/migration/MigrationIWorkflowService.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.migration;
19

20
import com.google.common.base.Strings;
21
import com.uber.cadence.*;
22
import com.uber.cadence.serviceclient.IWorkflowService;
23
import com.uber.cadence.serviceclient.IWorkflowServiceBase;
24
import java.util.Arrays;
25
import java.util.concurrent.CompletableFuture;
26
import java.util.concurrent.CompletionException;
27
import org.apache.thrift.TException;
28

29
public class MigrationIWorkflowService extends IWorkflowServiceBase {
30

31
  private IWorkflowService serviceOld, serviceNew;
32
  private String domainOld, domainNew;
33
  private static final int _defaultPageSize = 10;
34
  private static final String _listWorkflow = "_listWorkflow";
35
  private static final String _scanWorkflow = "_scanWorkflow";
36
  byte[] _marker = "to".getBytes();
1✔
37

38
  public MigrationIWorkflowService(
39
      IWorkflowService serviceOld,
40
      String domainOld,
41
      IWorkflowService serviceNew,
42
      String domainNew) {
1✔
43
    this.serviceOld = serviceOld;
1✔
44
    this.domainOld = domainOld;
1✔
45
    this.serviceNew = serviceNew;
1✔
46
    this.domainNew = domainNew;
1✔
47
  }
1✔
48

49
  @Override
50
  public StartWorkflowExecutionResponse StartWorkflowExecution(
51
      StartWorkflowExecutionRequest startRequest) throws TException {
52

53
    if (shouldStartInNew(startRequest.getWorkflowId()))
1✔
54
      return serviceNew.StartWorkflowExecution(startRequest);
1✔
55

56
    return serviceOld.StartWorkflowExecution(startRequest);
1✔
57
  }
58

59
  @Override
60
  public StartWorkflowExecutionAsyncResponse StartWorkflowExecutionAsync(
61
      StartWorkflowExecutionAsyncRequest startRequest)
62
      throws BadRequestError, WorkflowExecutionAlreadyStartedError, ServiceBusyError,
63
          DomainNotActiveError, LimitExceededError, EntityNotExistsError,
64
          ClientVersionNotSupportedError, TException {
65

NEW
66
    if (shouldStartInNew(startRequest.getRequest().getWorkflowId())) {
×
NEW
67
      return serviceNew.StartWorkflowExecutionAsync(startRequest);
×
68
    }
69

NEW
70
    return serviceOld.StartWorkflowExecutionAsync(startRequest);
×
71
  }
72

73
  /**
74
   * SignalWithStartWorkflowExecution is used to ensure sending signal to a workflow. If the
75
   * workflow is running, this results in WorkflowExecutionSignaled event being recorded in the
76
   * history and a decision task being created for the execution. If the workflow is not running or
77
   * not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled events being
78
   * recorded in history, and a decision task being created for the execution
79
   */
80
  @Override
81
  public StartWorkflowExecutionResponse SignalWithStartWorkflowExecution(
82
      SignalWithStartWorkflowExecutionRequest signalWithStartRequest) throws TException {
83
    if (shouldStartInNew(signalWithStartRequest.getWorkflowId()))
1✔
84
      return serviceNew.SignalWithStartWorkflowExecution(signalWithStartRequest);
1✔
85
    return serviceOld.SignalWithStartWorkflowExecution(signalWithStartRequest);
1✔
86
  }
87

88
  @Override
89
  public SignalWithStartWorkflowExecutionAsyncResponse SignalWithStartWorkflowExecutionAsync(
90
      SignalWithStartWorkflowExecutionAsyncRequest signalWithStartRequest)
91
      throws BadRequestError, WorkflowExecutionAlreadyStartedError, ServiceBusyError,
92
          DomainNotActiveError, LimitExceededError, EntityNotExistsError,
93
          ClientVersionNotSupportedError, TException {
NEW
94
    if (shouldStartInNew(signalWithStartRequest.getRequest().getWorkflowId())) {
×
NEW
95
      return serviceNew.SignalWithStartWorkflowExecutionAsync(signalWithStartRequest);
×
96
    }
97

NEW
98
    return serviceOld.SignalWithStartWorkflowExecutionAsync(signalWithStartRequest);
×
99
  }
100

101
  /**
102
   * SignalWorkflowExecution is used to send a signal event to running workflow execution. This
103
   * results in WorkflowExecutionSignaled event recorded in the history and a decision task being
104
   * created for the execution.
105
   */
106
  @Override
107
  public void SignalWorkflowExecution(SignalWorkflowExecutionRequest signalRequest)
108
      throws TException {
109
    if (shouldStartInNew(signalRequest.getWorkflowExecution().getWorkflowId()))
1✔
110
      serviceNew.SignalWorkflowExecution(signalRequest);
1✔
111
    else serviceOld.SignalWorkflowExecution(signalRequest);
1✔
112
  }
1✔
113

114
  @Override
115
  public RestartWorkflowExecutionResponse RestartWorkflowExecution(
116
      RestartWorkflowExecutionRequest restartRequest)
117
      throws BadRequestError, ServiceBusyError, DomainNotActiveError, LimitExceededError,
118
          EntityNotExistsError, ClientVersionNotSupportedError, TException {
NEW
119
    if (shouldStartInNew(restartRequest.getWorkflowExecution().getWorkflowId())) {
×
NEW
120
      return serviceNew.RestartWorkflowExecution(restartRequest);
×
121
    }
122

NEW
123
    return serviceOld.RestartWorkflowExecution(restartRequest);
×
124
  }
125

126
  @Override
127
  public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistory(
128
      GetWorkflowExecutionHistoryRequest getRequest) throws TException {
129
    if (shouldStartInNew(getRequest.execution.getWorkflowId()))
×
130
      return serviceNew.GetWorkflowExecutionHistory(getRequest);
×
131
    return serviceOld.GetWorkflowExecutionHistory(getRequest);
×
132
  }
133

134
  private ListWorkflowExecutionsResponse callOldCluster(
135
      ListWorkflowExecutionsRequest listWorkflowExecutionsRequest,
136
      int pageSizeOverride,
137
      String searchType)
138
      throws TException {
139

140
    if (pageSizeOverride != 0) {
1✔
141
      listWorkflowExecutionsRequest.setPageSize(pageSizeOverride);
1✔
142
    }
143
    ListWorkflowExecutionsResponse response = new ListWorkflowExecutionsResponse();
1✔
144
    if (searchType.equals(_listWorkflow)) {
1✔
145
      response = serviceOld.ListWorkflowExecutions(listWorkflowExecutionsRequest);
1✔
146
    } else if (searchType.equals(_scanWorkflow)) {
1✔
147
      response = serviceOld.ScanWorkflowExecutions(listWorkflowExecutionsRequest);
1✔
148
    }
149
    return response;
1✔
150
  }
151

152
  private ListWorkflowExecutionsResponse appendResultsFromOldCluster(
153
      ListWorkflowExecutionsRequest listWorkflowExecutionsRequest,
154
      ListWorkflowExecutionsResponse response,
155
      String searchType)
156
      throws TException {
157
    int responsePageSize = response.getExecutions().size();
1✔
158
    int neededPageSize = listWorkflowExecutionsRequest.getPageSize() - responsePageSize;
1✔
159

160
    ListWorkflowExecutionsResponse fromResponse =
1✔
161
        callOldCluster(listWorkflowExecutionsRequest, neededPageSize, searchType);
1✔
162

163
    // if old cluster is empty
164
    if (fromResponse == null) {
1✔
165
      return response;
1✔
166
    }
167

168
    fromResponse.getExecutions().addAll(response.getExecutions());
1✔
169
    return fromResponse;
1✔
170
  }
171

172
  public boolean hasPrefix(byte[] s, byte[] prefix) {
173
    return s == null
1✔
174
        ? false
1✔
175
        : s.length >= prefix.length
176
            && Arrays.equals(Arrays.copyOfRange(s, 0, prefix.length), prefix);
×
177
  }
178

179
  /**
180
   * This method handles pagination and combines results from both the new and old workflow service
181
   * clusters. The method first checks if the nextPageToken is not set or starts with the marker
182
   * (_marker) to determine if it should query the new cluster (serviceNew) or combine results from
183
   * both the new and old clusters. If nextPageToken is set and doesn't start with the marker, it
184
   * queries the old cluster (serviceOld). In case the response from the new cluster is null, it
185
   * retries the request on the old cluster. If the number of workflow executions returned by the
186
   * new cluster is less than the pageSize, it appends results from the old cluster to the response.
187
   *
188
   * @param listRequest The ListWorkflowExecutionsRequest containing the query parameters, including
189
   *     domain, nextPageToken, pageSize, and other filtering options.
190
   * @return The ListWorkflowExecutionsResponse containing a list of WorkflowExecutionInfo
191
   *     representing the workflow executions that match the query criteria. The response also
192
   *     includes a nextPageToken to support pagination.
193
   * @throws TException if there's any communication error with the underlying workflow service.
194
   * @throws BadRequestError if the provided ListWorkflowExecutionsRequest is invalid (null or lacks
195
   *     a domain).
196
   */
197
  @Override
198
  public ListWorkflowExecutionsResponse ListWorkflowExecutions(
199
      ListWorkflowExecutionsRequest listRequest) throws TException {
200

201
    if (listRequest == null) {
1✔
202
      throw new BadRequestError("List request is null");
1✔
203
    } else if (Strings.isNullOrEmpty(listRequest.getDomain())) {
1✔
204
      throw new BadRequestError("Domain is null or empty");
1✔
205
    }
206
    if (!listRequest.isSetPageSize()) {
1✔
207
      listRequest.pageSize = _defaultPageSize;
1✔
208
    }
209

210
    if (!listRequest.isSetNextPageToken()
1✔
211
        || listRequest.getNextPageToken().length == 0
×
212
        || hasPrefix(listRequest.getNextPageToken(), _marker)) {
×
213
      if (hasPrefix(listRequest.getNextPageToken(), _marker) == true) {
1✔
214
        listRequest.setNextPageToken(
×
215
            Arrays.copyOfRange(
×
216
                listRequest.getNextPageToken(),
×
217
                _marker.length,
218
                listRequest.getNextPageToken().length));
×
219
      }
220
      ListWorkflowExecutionsResponse response = serviceNew.ListWorkflowExecutions(listRequest);
1✔
221
      if (response == null) return callOldCluster(listRequest, 0, _listWorkflow);
1✔
222

223
      if (response.getExecutions().size() < listRequest.getPageSize()) {
1✔
224
        return appendResultsFromOldCluster(listRequest, response, _listWorkflow);
1✔
225
      }
226

227
      byte[] combinedNextPageToken = new byte[_marker.length + response.getNextPageToken().length];
1✔
228
      System.arraycopy(_marker, 0, combinedNextPageToken, 0, _marker.length);
1✔
229
      System.arraycopy(
1✔
230
          response.getNextPageToken(),
1✔
231
          0,
232
          combinedNextPageToken,
233
          _marker.length,
234
          response.getNextPageToken().length);
1✔
235
      response.setNextPageToken(combinedNextPageToken);
1✔
236
      return response;
1✔
237
    }
238
    return callOldCluster(listRequest, 0, _listWorkflow);
×
239
  }
240

241
  /**
242
   * Scans workflow executions based on the provided request parameters, handling pagination and
243
   * combining results from the new and old clusters. The method queries the new cluster
244
   * (serviceNew) if nextPageToken is not set or starts with the marker (_marker). Otherwise, it
245
   * queries the old cluster (serviceOld). Results from the old cluster are appended if needed to
246
   * maintain correct pagination.
247
   *
248
   * @param listRequest The ListWorkflowExecutionsRequest containing query parameters.
249
   * @return The ListWorkflowExecutionsResponse with WorkflowExecutionInfo and nextPageToken.
250
   * @throws TException if there's any communication error with the workflow service.
251
   * @throws BadRequestError if the provided ListWorkflowExecutionsRequest is invalid.
252
   */
253
  @Override
254
  public ListWorkflowExecutionsResponse ScanWorkflowExecutions(
255
      ListWorkflowExecutionsRequest listRequest) throws TException {
256
    ListWorkflowExecutionsResponse response;
257
    if (listRequest == null) {
1✔
258
      throw new BadRequestError("List request is null");
1✔
259
    } else if (Strings.isNullOrEmpty(listRequest.getDomain())) {
1✔
260
      throw new BadRequestError("Domain is null or empty");
1✔
261
    }
262
    if (!listRequest.isSetPageSize()) {
1✔
263
      listRequest.pageSize = _defaultPageSize;
1✔
264
    }
265

266
    if (!listRequest.isSetNextPageToken()
1✔
267
        || listRequest.getNextPageToken().length == 0
×
268
        || hasPrefix(listRequest.getNextPageToken(), _marker)) {
×
269
      if (hasPrefix(listRequest.getNextPageToken(), _marker)) {
1✔
270
        listRequest.setNextPageToken(
×
271
            Arrays.copyOfRange(
×
272
                listRequest.getNextPageToken(),
×
273
                _marker.length,
274
                listRequest.getNextPageToken().length));
×
275
      }
276
      response = serviceNew.ScanWorkflowExecutions(listRequest);
1✔
277
      if (response == null) return callOldCluster(listRequest, 0, _scanWorkflow);
1✔
278

279
      if (response.getExecutions().size() < listRequest.getPageSize()) {
1✔
280
        return appendResultsFromOldCluster(listRequest, response, _scanWorkflow);
1✔
281
      }
282

283
      byte[] combinedNextPageToken = new byte[_marker.length + response.getNextPageToken().length];
1✔
284
      System.arraycopy(_marker, 0, combinedNextPageToken, 0, _marker.length);
1✔
285
      System.arraycopy(
1✔
286
          response.getNextPageToken(),
1✔
287
          0,
288
          combinedNextPageToken,
289
          _marker.length,
290
          response.getNextPageToken().length);
1✔
291
      response.setNextPageToken(combinedNextPageToken);
1✔
292
      return response;
1✔
293
    }
294
    return callOldCluster(listRequest, 0, _scanWorkflow);
×
295
  }
296

297
  @Override
298
  public ListOpenWorkflowExecutionsResponse ListOpenWorkflowExecutions(
299
      ListOpenWorkflowExecutionsRequest listRequest) throws TException {
300
    ListOpenWorkflowExecutionsResponse response;
301
    if (listRequest == null) {
1✔
302
      throw new BadRequestError("List request is null");
1✔
303
    } else if (Strings.isNullOrEmpty(listRequest.getDomain())) {
1✔
304
      throw new BadRequestError("Domain is null or empty");
1✔
305
    }
306
    if (!listRequest.isSetMaximumPageSize()) {
1✔
307
      listRequest.maximumPageSize = _defaultPageSize;
1✔
308
    }
309

310
    if (!listRequest.isSetNextPageToken()
1✔
311
        || listRequest.getNextPageToken().length == 0
×
312
        || hasPrefix(listRequest.getNextPageToken(), _marker)) {
×
313
      if (hasPrefix(listRequest.getNextPageToken(), _marker)) {
1✔
314
        listRequest.setNextPageToken(
×
315
            Arrays.copyOfRange(
×
316
                listRequest.getNextPageToken(),
×
317
                _marker.length,
318
                listRequest.getNextPageToken().length));
×
319
      }
320
      response = serviceNew.ListOpenWorkflowExecutions(listRequest);
1✔
321
      if (response == null) return serviceOld.ListOpenWorkflowExecutions(listRequest);
1✔
322

323
      if (response.getExecutionsSize() < listRequest.getMaximumPageSize()) {
1✔
324
        int neededPageSize = listRequest.getMaximumPageSize() - response.getExecutionsSize();
1✔
325
        ListOpenWorkflowExecutionsRequest copiedRequest =
1✔
326
            new ListOpenWorkflowExecutionsRequest(listRequest);
327
        copiedRequest.maximumPageSize = neededPageSize;
1✔
328
        ListOpenWorkflowExecutionsResponse fromResponse =
1✔
329
            serviceOld.ListOpenWorkflowExecutions(copiedRequest);
1✔
330
        if (fromResponse == null) return response;
1✔
331

332
        fromResponse.getExecutions().addAll(response.getExecutions());
1✔
333
        return fromResponse;
1✔
334
      }
335

336
      byte[] combinedNextPageToken = new byte[_marker.length + response.getNextPageToken().length];
1✔
337
      System.arraycopy(_marker, 0, combinedNextPageToken, 0, _marker.length);
1✔
338
      System.arraycopy(
1✔
339
          response.getNextPageToken(),
1✔
340
          0,
341
          combinedNextPageToken,
342
          _marker.length,
343
          response.getNextPageToken().length);
1✔
344
      response.setNextPageToken(combinedNextPageToken);
1✔
345
      return response;
1✔
346
    }
347
    return serviceOld.ListOpenWorkflowExecutions(listRequest);
×
348
  }
349

350
  @Override
351
  public ListClosedWorkflowExecutionsResponse ListClosedWorkflowExecutions(
352
      ListClosedWorkflowExecutionsRequest listRequest) throws TException {
353
    ListClosedWorkflowExecutionsResponse response;
354
    if (listRequest == null) {
1✔
355
      throw new BadRequestError("List request is null");
1✔
356
    } else if (Strings.isNullOrEmpty(listRequest.getDomain())) {
1✔
357
      throw new BadRequestError("Domain is null or empty");
1✔
358
    }
359
    if (!listRequest.isSetMaximumPageSize()) {
1✔
360
      listRequest.maximumPageSize = _defaultPageSize;
1✔
361
    }
362

363
    if (!listRequest.isSetNextPageToken()
1✔
364
        || listRequest.getNextPageToken().length == 0
×
365
        || hasPrefix(listRequest.getNextPageToken(), _marker)) {
×
366
      if (hasPrefix(listRequest.getNextPageToken(), _marker)) {
1✔
367
        listRequest.setNextPageToken(
×
368
            Arrays.copyOfRange(
×
369
                listRequest.getNextPageToken(),
×
370
                _marker.length,
371
                listRequest.getNextPageToken().length));
×
372
      }
373
      response = serviceNew.ListClosedWorkflowExecutions(listRequest);
1✔
374
      if (response == null) return serviceOld.ListClosedWorkflowExecutions(listRequest);
1✔
375

376
      if (response.getExecutionsSize() < listRequest.getMaximumPageSize()) {
1✔
377
        int neededPageSize = listRequest.getMaximumPageSize() - response.getExecutionsSize();
1✔
378
        ListClosedWorkflowExecutionsRequest copiedRequest =
1✔
379
            new ListClosedWorkflowExecutionsRequest(listRequest);
380
        copiedRequest.maximumPageSize = neededPageSize;
1✔
381
        ListClosedWorkflowExecutionsResponse fromResponse =
1✔
382
            serviceOld.ListClosedWorkflowExecutions(copiedRequest);
1✔
383
        if (fromResponse == null) return response;
1✔
384

385
        fromResponse.getExecutions().addAll(response.getExecutions());
1✔
386
        return fromResponse;
1✔
387
      }
388

389
      byte[] combinedNextPageToken = new byte[_marker.length + response.getNextPageToken().length];
1✔
390
      System.arraycopy(_marker, 0, combinedNextPageToken, 0, _marker.length);
1✔
391
      System.arraycopy(
1✔
392
          response.getNextPageToken(),
1✔
393
          0,
394
          combinedNextPageToken,
395
          _marker.length,
396
          response.getNextPageToken().length);
1✔
397
      response.setNextPageToken(combinedNextPageToken);
1✔
398
      return response;
1✔
399
    }
400
    return serviceOld.ListClosedWorkflowExecutions(listRequest);
×
401
  }
402

403
  @Override
404
  public QueryWorkflowResponse QueryWorkflow(QueryWorkflowRequest queryRequest) throws TException {
405

406
    try {
407
      if (shouldStartInNew(queryRequest.getExecution().getWorkflowId()))
1✔
408
        return serviceNew.QueryWorkflow(queryRequest);
1✔
409
      return serviceOld.QueryWorkflow(queryRequest);
1✔
410
    } catch (NullPointerException e) {
1✔
411
      throw new NullPointerException(
1✔
412
          "Query does not have workflowID associated: " + e.getMessage());
1✔
413
    }
414
  }
415

416
  @Override
417
  public CountWorkflowExecutionsResponse CountWorkflowExecutions(
418
      CountWorkflowExecutionsRequest countRequest) throws TException {
419

420
    CountWorkflowExecutionsResponse countResponseNew =
1✔
421
        serviceNew.CountWorkflowExecutions(countRequest);
1✔
422
    CountWorkflowExecutionsResponse countResponseOld =
1✔
423
        serviceOld.CountWorkflowExecutions(countRequest);
1✔
424
    if (countResponseNew == null) return countResponseOld;
1✔
425
    if (countResponseOld == null) return countResponseNew;
1✔
426

427
    countResponseOld.setCount(countResponseOld.getCount() + countResponseNew.getCount());
1✔
428
    return countResponseOld;
1✔
429
  }
430

431
  @Override
432
  public void TerminateWorkflowExecution(TerminateWorkflowExecutionRequest terminateRequest)
433
      throws TException {
434
    try {
435
      serviceNew.TerminateWorkflowExecution(terminateRequest);
×
436
    } catch (EntityNotExistsError e) {
×
437
      serviceOld.TerminateWorkflowExecution(terminateRequest);
×
438
    }
×
439
  }
×
440

441
  private Boolean shouldStartInNew(String workflowID) throws TException {
442
    try {
443
      return describeWorkflowExecution(serviceNew, domainNew, workflowID)
1✔
444
          .thenCombine(
1✔
445
              describeWorkflowExecution(serviceOld, domainOld, workflowID),
1✔
446
              (respNew, respOld) ->
447
                  respNew != null // execution already in new
1✔
448
                      || respOld == null // execution not exist in new and not exist in old
449
                      || (respOld.isSetWorkflowExecutionInfo()
1✔
450
                          && respOld
451
                              .getWorkflowExecutionInfo()
×
452
                              .isSetCloseStatus()) // execution not exist in new and execution is
1✔
453
              // closed in old
454
              )
455
          .get();
1✔
456
    } catch (CompletionException e) {
×
457
      throw e.getCause() instanceof TException
×
458
          ? (TException) e.getCause()
×
459
          : new TException("unknown error: " + e.getMessage());
×
460
    } catch (Exception e) {
×
461
      throw new TException("Unknown error: " + e.getMessage());
×
462
    }
463
  }
464

465
  private CompletableFuture<DescribeWorkflowExecutionResponse> describeWorkflowExecution(
466
      IWorkflowService service, String domain, String workflowID) {
467
    return CompletableFuture.supplyAsync(
1✔
468
        () -> {
469
          try {
470
            return service.DescribeWorkflowExecution(
1✔
471
                new DescribeWorkflowExecutionRequest()
472
                    .setDomain(domain)
1✔
473
                    .setExecution(new WorkflowExecution().setWorkflowId(workflowID)));
1✔
474
          } catch (EntityNotExistsError e) {
×
475
            return null;
×
476
          } catch (Exception e) {
×
477
            throw new CompletionException(e);
×
478
          }
479
        });
480
  }
481
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc