• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2542

23 Oct 2024 10:17PM UTC coverage: 65.75% (+0.4%) from 65.318%
2542

Pull #921

buildkite

natemort
Remove unused code from cadence.internal package
Pull Request #921: Remove unused code from cadence.internal package

12741 of 19378 relevant lines covered (65.75%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.44
/src/main/java/com/uber/cadence/internal/sync/WorkflowRetryerInternal.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.sync;
19

20
import com.uber.cadence.common.RetryOptions;
21
import com.uber.cadence.workflow.ActivityFailureException;
22
import com.uber.cadence.workflow.CompletablePromise;
23
import com.uber.cadence.workflow.Functions;
24
import com.uber.cadence.workflow.Promise;
25
import com.uber.cadence.workflow.Workflow;
26
import java.time.Duration;
27

28
/**
29
 * Implements operation retry logic for both synchronous and asynchronous operations. Internal
30
 * class. Do not reference this class directly. Use {@link Workflow#retry(RetryOptions,
31
 * Functions.Func)} or Async{@link #retry(RetryOptions, Functions.Func)}.
32
 */
33
final class WorkflowRetryerInternal {
34

35
  public static <R> R validateOptionsAndRetry(RetryOptions options, Functions.Func<R> func) {
36
    return retry(RetryOptions.merge(null, options), func);
×
37
  }
38

39
  /**
40
   * Retry function synchronously.
41
   *
42
   * @param options retry options.
43
   * @param func procedure to retry.
44
   * @return result of func if ever completed successfully.
45
   */
46
  public static <R> R retry(RetryOptions options, Functions.Func<R> func) {
47
    options.validate();
1✔
48
    int attempt = 1;
1✔
49
    long startTime = WorkflowInternal.currentTimeMillis();
1✔
50
    // Records retry options in the history allowing changing them without breaking determinism.
51
    String retryId = WorkflowInternal.randomUUID().toString();
1✔
52
    RetryOptions retryOptions =
1✔
53
        WorkflowInternal.mutableSideEffect(
1✔
54
            retryId, RetryOptions.class, RetryOptions.class, Object::equals, () -> options);
1✔
55
    while (true) {
56
      long nextSleepTime = retryOptions.calculateSleepTime(attempt);
1✔
57
      try {
58
        return func.apply();
×
59
      } catch (Exception e) {
1✔
60
        long elapsed = WorkflowInternal.currentTimeMillis() - startTime;
1✔
61
        if (retryOptions.shouldRethrow(e, attempt, elapsed, nextSleepTime)) {
1✔
62
          throw WorkflowInternal.wrap(e);
1✔
63
        }
64
      }
65
      attempt++;
1✔
66
      WorkflowInternal.sleep(Duration.ofMillis(nextSleepTime));
1✔
67
    }
1✔
68
  }
69

70
  /**
71
   * Retry function asynchronously.
72
   *
73
   * @param options retry options.
74
   * @param func procedure to retry.
75
   * @return result promise to the result or failure if retries stopped according to options.
76
   */
77
  public static <R> Promise<R> retryAsync(RetryOptions options, Functions.Func<Promise<R>> func) {
78
    String retryId = WorkflowInternal.randomUUID().toString();
1✔
79
    long startTime = WorkflowInternal.currentTimeMillis();
1✔
80
    return retryAsync(retryId, options, func, startTime, 1);
1✔
81
  }
82

83
  private static <R> Promise<R> retryAsync(
84
      String retryId,
85
      RetryOptions options,
86
      Functions.Func<Promise<R>> func,
87
      long startTime,
88
      long attempt) {
89
    options.validate();
1✔
90
    RetryOptions retryOptions =
1✔
91
        WorkflowInternal.mutableSideEffect(
1✔
92
            retryId, RetryOptions.class, RetryOptions.class, Object::equals, () -> options);
1✔
93

94
    CompletablePromise<R> funcResult = WorkflowInternal.newCompletablePromise();
1✔
95
    try {
96
      funcResult.completeFrom(func.apply());
1✔
97
    } catch (RuntimeException e) {
×
98
      funcResult.completeExceptionally(e);
×
99
    }
1✔
100
    return funcResult
1✔
101
        .handle(
1✔
102
            (r, e) -> {
103
              if (e == null) {
1✔
104
                return WorkflowInternal.newPromise(r);
×
105
              }
106
              long elapsed = WorkflowInternal.currentTimeMillis() - startTime;
1✔
107
              long sleepTime = retryOptions.calculateSleepTime(attempt);
1✔
108
              if (retryOptions.shouldRethrow(e, attempt, elapsed, sleepTime)) {
1✔
109
                throw e;
1✔
110
              }
111
              // newTimer runs in a separate thread, so it performs trampolining eliminating tail
112
              // recursion.
113
              return WorkflowInternal.newTimer(Duration.ofMillis(sleepTime))
1✔
114
                  .thenCompose(
1✔
115
                      (nil) -> retryAsync(retryId, retryOptions, func, startTime, attempt + 1));
1✔
116
            })
117
        .thenCompose((r) -> r);
1✔
118
  }
119

120
  static <R> Promise<R> retryAsync(
121
      Functions.Func2<Integer, Long, Promise<R>> func, int attempt, long startTime) {
122

123
    CompletablePromise<R> funcResult = WorkflowInternal.newCompletablePromise();
1✔
124
    try {
125
      funcResult.completeFrom(func.apply(attempt, startTime));
1✔
126
    } catch (RuntimeException e) {
×
127
      funcResult.completeExceptionally(e);
×
128
    }
1✔
129

130
    return funcResult
1✔
131
        .handle(
1✔
132
            (r, e) -> {
133
              if (e == null) {
1✔
134
                return WorkflowInternal.newPromise(r);
1✔
135
              }
136

137
              if (!(e instanceof ActivityFailureException)) {
1✔
138
                throw e;
×
139
              }
140

141
              ActivityFailureException afe = (ActivityFailureException) e;
1✔
142

143
              if (afe.getBackoff() == null) {
1✔
144
                throw e;
1✔
145
              }
146

147
              // newTimer runs in a separate thread, so it performs trampolining eliminating tail
148
              // recursion.
149
              long nextStart = WorkflowInternal.currentTimeMillis() + afe.getBackoff().toMillis();
1✔
150
              return WorkflowInternal.newTimer(afe.getBackoff())
1✔
151
                  .thenCompose((nil) -> retryAsync(func, afe.getAttempt() + 1, nextStart));
1✔
152
            })
153
        .thenCompose((r) -> r);
1✔
154
  }
155

156
  private WorkflowRetryerInternal() {}
157
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc