• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #257

29 May 2024 08:30PM UTC coverage: 77.442% (-0.02%) from 77.463%
#257

push

github

web-flow
Add support for nextRetryDelay (#2081)

Add support for nextRetryDelay

38 of 43 new or added lines in 6 files covered. (88.37%)

11 existing lines in 5 files now uncovered.

19204 of 24798 relevant lines covered (77.44%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.16
/temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.failure;
22

23
import com.google.common.base.Preconditions;
24
import com.google.common.base.Strings;
25
import com.google.common.collect.ImmutableSet;
26
import io.temporal.api.common.v1.ActivityType;
27
import io.temporal.api.common.v1.Payloads;
28
import io.temporal.api.common.v1.WorkflowType;
29
import io.temporal.api.failure.v1.ActivityFailureInfo;
30
import io.temporal.api.failure.v1.ApplicationFailureInfo;
31
import io.temporal.api.failure.v1.CanceledFailureInfo;
32
import io.temporal.api.failure.v1.ChildWorkflowExecutionFailureInfo;
33
import io.temporal.api.failure.v1.Failure;
34
import io.temporal.api.failure.v1.ResetWorkflowFailureInfo;
35
import io.temporal.api.failure.v1.ServerFailureInfo;
36
import io.temporal.api.failure.v1.TerminatedFailureInfo;
37
import io.temporal.api.failure.v1.TimeoutFailureInfo;
38
import io.temporal.client.ActivityCanceledException;
39
import io.temporal.common.converter.DataConverter;
40
import io.temporal.common.converter.EncodedValues;
41
import io.temporal.common.converter.FailureConverter;
42
import io.temporal.internal.activity.ActivityTaskHandlerImpl;
43
import io.temporal.internal.common.ProtobufTimeUtils;
44
import io.temporal.internal.sync.POJOWorkflowImplementationFactory;
45
import io.temporal.serviceclient.CheckedExceptionWrapper;
46
import java.io.PrintWriter;
47
import java.io.StringWriter;
48
import java.util.ArrayList;
49
import java.util.Optional;
50
import java.util.regex.Matcher;
51
import java.util.regex.Pattern;
52
import javax.annotation.Nonnull;
53
import org.slf4j.Logger;
54
import org.slf4j.LoggerFactory;
55

56
/**
57
 * A {@link FailureConverter} that implements the default cross-language-compatible conversion
58
 * algorithm.
59
 */
60
public final class DefaultFailureConverter implements FailureConverter {
1✔
61

62
  private static final Logger log = LoggerFactory.getLogger(DefaultFailureConverter.class);
1✔
63

64
  private static final String JAVA_SDK = "JavaSDK";
65

66
  /**
67
   * Stop emitting stack trace after this line. Makes serialized stack traces more readable and
68
   * compact as it omits most of framework-level code.
69
   */
70
  private static final ImmutableSet<String> CUTOFF_METHOD_NAMES =
71
      ImmutableSet.<String>builder()
1✔
72
          .addAll(ActivityTaskHandlerImpl.ACTIVITY_HANDLER_STACKTRACE_CUTOFF)
1✔
73
          .addAll(POJOWorkflowImplementationFactory.WORKFLOW_HANDLER_STACKTRACE_CUTOFF)
1✔
74
          .build();
1✔
75

76
  /** Used to parse a stack trace line. */
77
  private static final Pattern TRACE_ELEMENT_PATTERN =
1✔
78
      Pattern.compile(
1✔
79
          "((?<className>.*)\\.(?<methodName>.*))\\(((?<fileName>.*?)(:(?<lineNumber>\\d+))?)\\)");
80

81
  @Override
82
  @Nonnull
83
  public TemporalFailure failureToException(
84
      @Nonnull Failure failure, @Nonnull DataConverter dataConverter) {
85
    Preconditions.checkNotNull(failure, "failure");
1✔
86
    Preconditions.checkNotNull(dataConverter, "dataConverter");
1✔
87
    TemporalFailure result = failureToExceptionImpl(failure, dataConverter);
1✔
88
    result.setFailure(failure);
1✔
89
    if (failure.getSource().equals(JAVA_SDK) && !failure.getStackTrace().isEmpty()) {
1✔
90
      StackTraceElement[] stackTrace = parseStackTrace(failure.getStackTrace());
1✔
91
      result.setStackTrace(stackTrace);
1✔
92
    }
93
    return result;
1✔
94
  }
95

96
  private TemporalFailure failureToExceptionImpl(Failure failure, DataConverter dataConverter) {
97
    TemporalFailure cause =
98
        failure.hasCause() ? failureToException(failure.getCause(), dataConverter) : null;
1✔
99
    switch (failure.getFailureInfoCase()) {
1✔
100
      case APPLICATION_FAILURE_INFO:
101
        {
102
          ApplicationFailureInfo info = failure.getApplicationFailureInfo();
1✔
103
          Optional<Payloads> details =
104
              info.hasDetails() ? Optional.of(info.getDetails()) : Optional.empty();
1✔
105
          return ApplicationFailure.newFromValues(
1✔
106
              failure.getMessage(),
1✔
107
              info.getType(),
1✔
108
              info.getNonRetryable(),
1✔
109
              new EncodedValues(details, dataConverter),
110
              cause,
111
              info.hasNextRetryDelay()
1✔
NEW
112
                  ? ProtobufTimeUtils.toJavaDuration(info.getNextRetryDelay())
×
113
                  : null);
1✔
114
        }
115
      case TIMEOUT_FAILURE_INFO:
116
        {
117
          TimeoutFailureInfo info = failure.getTimeoutFailureInfo();
1✔
118
          Optional<Payloads> lastHeartbeatDetails =
119
              info.hasLastHeartbeatDetails()
1✔
120
                  ? Optional.of(info.getLastHeartbeatDetails())
1✔
121
                  : Optional.empty();
1✔
122
          TimeoutFailure tf =
1✔
123
              new TimeoutFailure(
124
                  failure.getMessage(),
1✔
125
                  new EncodedValues(lastHeartbeatDetails, dataConverter),
126
                  info.getTimeoutType(),
1✔
127
                  cause);
128
          tf.setStackTrace(new StackTraceElement[0]);
1✔
129
          return tf;
1✔
130
        }
131
      case CANCELED_FAILURE_INFO:
132
        {
133
          CanceledFailureInfo info = failure.getCanceledFailureInfo();
1✔
134
          Optional<Payloads> details =
135
              info.hasDetails() ? Optional.of(info.getDetails()) : Optional.empty();
1✔
136
          return new CanceledFailure(
1✔
137
              failure.getMessage(), new EncodedValues(details, dataConverter), cause);
1✔
138
        }
139
      case TERMINATED_FAILURE_INFO:
140
        return new TerminatedFailure(failure.getMessage(), cause);
×
141
      case SERVER_FAILURE_INFO:
142
        {
143
          ServerFailureInfo info = failure.getServerFailureInfo();
×
144
          return new ServerFailure(failure.getMessage(), info.getNonRetryable(), cause);
×
145
        }
146
      case RESET_WORKFLOW_FAILURE_INFO:
147
        {
148
          ResetWorkflowFailureInfo info = failure.getResetWorkflowFailureInfo();
×
149
          Optional<Payloads> details =
150
              info.hasLastHeartbeatDetails()
×
151
                  ? Optional.of(info.getLastHeartbeatDetails())
×
152
                  : Optional.empty();
×
153
          return new ApplicationFailure(
×
154
              failure.getMessage(),
×
155
              "ResetWorkflow",
156
              false,
157
              new EncodedValues(details, dataConverter),
158
              cause,
159
              null);
160
        }
161
      case ACTIVITY_FAILURE_INFO:
162
        {
163
          ActivityFailureInfo info = failure.getActivityFailureInfo();
1✔
164
          return new ActivityFailure(
1✔
165
              failure.getMessage(),
1✔
166
              info.getScheduledEventId(),
1✔
167
              info.getStartedEventId(),
1✔
168
              info.getActivityType().getName(),
1✔
169
              info.getActivityId(),
1✔
170
              info.getRetryState(),
1✔
171
              info.getIdentity(),
1✔
172
              cause);
173
        }
174
      case CHILD_WORKFLOW_EXECUTION_FAILURE_INFO:
175
        {
176
          ChildWorkflowExecutionFailureInfo info = failure.getChildWorkflowExecutionFailureInfo();
1✔
177
          return new ChildWorkflowFailure(
1✔
178
              info.getInitiatedEventId(),
1✔
179
              info.getStartedEventId(),
1✔
180
              info.getWorkflowType().getName(),
1✔
181
              info.getWorkflowExecution(),
1✔
182
              info.getNamespace(),
1✔
183
              info.getRetryState(),
1✔
184
              cause);
185
        }
186
      case FAILUREINFO_NOT_SET:
187
      default:
188
        throw new IllegalArgumentException("Failure info not set");
×
189
    }
190
  }
191

192
  @Override
193
  @Nonnull
194
  public Failure exceptionToFailure(
195
      @Nonnull Throwable throwable, @Nonnull DataConverter dataConverter) {
196
    Preconditions.checkNotNull(dataConverter, "dataConverter");
1✔
197
    Preconditions.checkNotNull(throwable, "throwable");
1✔
198
    Throwable ex = throwable;
1✔
199
    while (ex != null) {
1✔
200
      if (ex instanceof TemporalFailure) {
1✔
201
        ((TemporalFailure) ex).setDataConverter(dataConverter);
1✔
202
      }
203
      ex = ex.getCause();
1✔
204
    }
205
    return this.exceptionToFailure(throwable);
1✔
206
  }
207

208
  @Nonnull
209
  private Failure exceptionToFailure(Throwable throwable) {
210
    if (throwable instanceof CheckedExceptionWrapper) {
1✔
211
      return exceptionToFailure(throwable.getCause());
×
212
    }
213
    String message;
214
    if (throwable instanceof TemporalFailure) {
1✔
215
      TemporalFailure tf = (TemporalFailure) throwable;
1✔
216
      if (tf.getFailure().isPresent()) {
1✔
217
        return tf.getFailure().get();
1✔
218
      }
219
      message = tf.getOriginalMessage();
1✔
220
    } else {
1✔
221
      message = throwable.getMessage() == null ? "" : throwable.getMessage();
1✔
222
    }
223
    String stackTrace = serializeStackTrace(throwable);
1✔
224
    Failure.Builder failure = Failure.newBuilder().setSource(JAVA_SDK);
1✔
225
    failure.setMessage(message).setStackTrace(stackTrace);
1✔
226
    if (throwable.getCause() != null) {
1✔
227
      failure.setCause(exceptionToFailure(throwable.getCause()));
1✔
228
    }
229
    if (throwable instanceof ApplicationFailure) {
1✔
230
      ApplicationFailure ae = (ApplicationFailure) throwable;
1✔
231
      ApplicationFailureInfo.Builder info =
232
          ApplicationFailureInfo.newBuilder()
1✔
233
              .setType(ae.getType())
1✔
234
              .setNonRetryable(ae.isNonRetryable());
1✔
235
      Optional<Payloads> details = ((EncodedValues) ae.getDetails()).toPayloads();
1✔
236
      if (details.isPresent()) {
1✔
237
        info.setDetails(details.get());
1✔
238
      }
239
      if (ae.getNextRetryDelay() != null) {
1✔
240
        info.setNextRetryDelay(ProtobufTimeUtils.toProtoDuration(ae.getNextRetryDelay()));
1✔
241
      }
242
      failure.setApplicationFailureInfo(info);
1✔
243
    } else if (throwable instanceof TimeoutFailure) {
1✔
244
      TimeoutFailure te = (TimeoutFailure) throwable;
1✔
245
      TimeoutFailureInfo.Builder info =
246
          TimeoutFailureInfo.newBuilder().setTimeoutType(te.getTimeoutType());
1✔
247
      Optional<Payloads> details = ((EncodedValues) te.getLastHeartbeatDetails()).toPayloads();
1✔
248
      if (details.isPresent()) {
1✔
249
        info.setLastHeartbeatDetails(details.get());
1✔
250
      }
251
      failure.setTimeoutFailureInfo(info);
1✔
252
    } else if (throwable instanceof CanceledFailure) {
1✔
253
      CanceledFailure ce = (CanceledFailure) throwable;
1✔
254
      CanceledFailureInfo.Builder info = CanceledFailureInfo.newBuilder();
1✔
255
      Optional<Payloads> details = ((EncodedValues) ce.getDetails()).toPayloads();
1✔
256
      if (details.isPresent()) {
1✔
257
        info.setDetails(details.get());
1✔
258
      }
259
      failure.setCanceledFailureInfo(info);
1✔
260
    } else if (throwable instanceof TerminatedFailure) {
1✔
261
      TerminatedFailure te = (TerminatedFailure) throwable;
×
262
      failure.setTerminatedFailureInfo(TerminatedFailureInfo.getDefaultInstance());
×
263
    } else if (throwable instanceof ServerFailure) {
1✔
264
      ServerFailure se = (ServerFailure) throwable;
1✔
265
      failure.setServerFailureInfo(
1✔
266
          ServerFailureInfo.newBuilder().setNonRetryable(se.isNonRetryable()));
1✔
267
    } else if (throwable instanceof ActivityFailure) {
1✔
268
      ActivityFailure ae = (ActivityFailure) throwable;
×
269
      ActivityFailureInfo.Builder info =
270
          ActivityFailureInfo.newBuilder()
×
271
              .setActivityId(ae.getActivityId() == null ? "" : ae.getActivityId())
×
272
              .setActivityType(ActivityType.newBuilder().setName(ae.getActivityType()))
×
273
              .setIdentity(ae.getIdentity())
×
274
              .setRetryState(ae.getRetryState())
×
275
              .setScheduledEventId(ae.getScheduledEventId())
×
276
              .setStartedEventId(ae.getStartedEventId());
×
277
      failure.setActivityFailureInfo(info);
×
278
    } else if (throwable instanceof ChildWorkflowFailure) {
1✔
279
      ChildWorkflowFailure ce = (ChildWorkflowFailure) throwable;
1✔
280
      ChildWorkflowExecutionFailureInfo.Builder info =
281
          ChildWorkflowExecutionFailureInfo.newBuilder()
1✔
282
              .setInitiatedEventId(ce.getInitiatedEventId())
1✔
283
              .setStartedEventId(ce.getStartedEventId())
1✔
284
              .setNamespace(ce.getNamespace() == null ? "" : ce.getNamespace())
1✔
285
              .setRetryState(ce.getRetryState())
1✔
286
              .setWorkflowType(WorkflowType.newBuilder().setName(ce.getWorkflowType()))
1✔
287
              .setWorkflowExecution(ce.getExecution());
1✔
288
      failure.setChildWorkflowExecutionFailureInfo(info);
1✔
289
    } else if (throwable instanceof ActivityCanceledException) {
1✔
290
      CanceledFailureInfo.Builder info = CanceledFailureInfo.newBuilder();
×
291
      failure.setCanceledFailureInfo(info);
×
292
    } else {
×
293
      ApplicationFailureInfo.Builder info =
294
          ApplicationFailureInfo.newBuilder()
1✔
295
              .setType(throwable.getClass().getName())
1✔
296
              .setNonRetryable(false);
1✔
297
      failure.setApplicationFailureInfo(info);
1✔
298
    }
299
    return failure.build();
1✔
300
  }
301

302
  /** Parses stack trace serialized using {@link #serializeStackTrace(Throwable)}. */
303
  private StackTraceElement[] parseStackTrace(String stackTrace) {
304
    if (Strings.isNullOrEmpty(stackTrace)) {
1✔
305
      return new StackTraceElement[0];
×
306
    }
307
    try {
308
      @SuppressWarnings("StringSplitter")
309
      String[] lines = stackTrace.split("\r\n|\n");
1✔
310
      ArrayList<StackTraceElement> result = new ArrayList<>(lines.length);
1✔
311
      for (int i = 0; i < lines.length; i++) {
1✔
312
        StackTraceElement elem = parseStackTraceElement(lines[i]);
1✔
313
        if (elem != null) {
1✔
314
          result.add(elem);
1✔
315
        }
316
      }
317
      return result.toArray(new StackTraceElement[result.size()]);
1✔
318
    } catch (Exception e) {
×
319
      if (log.isWarnEnabled()) {
×
320
        log.warn("Failed to parse stack trace: " + stackTrace);
×
321
      }
322
      return new StackTraceElement[0];
×
323
    }
324
  }
325

326
  /**
327
   * See {@link StackTraceElement#toString()} for input specification.
328
   *
329
   * @param line line of stack trace.
330
   * @return StackTraceElement that contains data from that line.
331
   */
332
  private StackTraceElement parseStackTraceElement(String line) {
333
    Matcher matcher = TRACE_ELEMENT_PATTERN.matcher(line);
1✔
334
    if (!matcher.matches()) {
1✔
335
      return null;
×
336
    }
337
    String declaringClass = matcher.group("className");
1✔
338
    String methodName = matcher.group("methodName");
1✔
339
    String fileName = matcher.group("fileName");
1✔
340
    int lineNumber = 0;
1✔
341
    String lns = matcher.group("lineNumber");
1✔
342
    if (lns != null && lns.length() > 0) {
1✔
343
      try {
344
        lineNumber = Integer.parseInt(matcher.group("lineNumber"));
1✔
345
      } catch (NumberFormatException e) {
×
346
      }
1✔
347
    }
348
    return new StackTraceElement(declaringClass, methodName, fileName, lineNumber);
1✔
349
  }
350

351
  private String serializeStackTrace(Throwable e) {
352
    StringWriter sw = new StringWriter();
1✔
353
    PrintWriter pw = new PrintWriter(sw);
1✔
354
    StackTraceElement[] trace = e.getStackTrace();
1✔
355
    for (StackTraceElement element : trace) {
1✔
356
      pw.println(element);
1✔
357
      String fullMethodName = element.getClassName() + "." + element.getMethodName();
1✔
358
      if (CUTOFF_METHOD_NAMES.contains(fullMethodName)) {
1✔
359
        break;
1✔
360
      }
361
    }
362
    return sw.toString();
1✔
363
  }
364
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc