• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #103

pending completion
#103

push

github-actions

web-flow
Implement retry of local activities for over local retry threshold duration (#1542)

Issue #1261

244 of 244 new or added lines in 16 files covered. (100.0%)

16122 of 19841 relevant lines covered (81.26%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.3
/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityExecutionContext.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import io.grpc.Deadline;
24
import io.temporal.api.failure.v1.Failure;
25
import io.temporal.api.workflow.v1.PendingActivityInfo;
26
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest;
27
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
28
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
29
import io.temporal.workflow.Functions;
30
import java.time.Duration;
31
import java.util.Objects;
32
import java.util.concurrent.CompletableFuture;
33
import java.util.concurrent.ScheduledFuture;
34
import java.util.concurrent.atomic.AtomicInteger;
35
import java.util.concurrent.atomic.AtomicReference;
36
import javax.annotation.Nonnull;
37
import javax.annotation.Nullable;
38

39
class LocalActivityExecutionContext {
40
  private final @Nonnull ExecuteLocalActivityParameters executionParams;
41
  private final @Nonnull CompletableFuture<LocalActivityResult> executionResult =
1✔
42
      new CompletableFuture<>();
43

44
  private final @Nullable Deadline scheduleToCloseDeadline;
45

46
  private final @Nonnull AtomicInteger currentAttempt;
47

48
  private final @Nonnull AtomicReference<Failure> lastAttemptFailure = new AtomicReference<>();
1✔
49

50
  private @Nullable ScheduledFuture<?> scheduleToCloseFuture;
51

52
  public LocalActivityExecutionContext(
53
      @Nonnull ExecuteLocalActivityParameters executionParams,
54
      @Nonnull Functions.Proc1<LocalActivityResult> resultCallback,
55
      @Nullable Deadline scheduleToCloseDeadline) {
1✔
56
    this.executionParams = Objects.requireNonNull(executionParams, "executionParams");
1✔
57
    this.executionResult.thenAccept(
1✔
58
        Objects.requireNonNull(resultCallback, "resultCallback")::apply);
1✔
59
    this.scheduleToCloseDeadline = scheduleToCloseDeadline;
1✔
60
    this.currentAttempt = new AtomicInteger(executionParams.getInitialAttempt());
1✔
61
    Failure previousExecutionFailure = executionParams.getPreviousLocalExecutionFailure();
1✔
62
    if (previousExecutionFailure != null) {
1✔
63
      if (previousExecutionFailure.hasTimeoutFailureInfo() && previousExecutionFailure.hasCause()) {
1✔
64
        // It can be only startToClose timeout (others would be fatal).
65
        // Structure TIMEOUT_TYPE_START_TO_CLOSE -> TIMEOUT_TYPE_START_TO_CLOSE or
66
        // TIMEOUT_TYPE_START_TO_CLOSE -> ApplicationFailure is possible here - chaining of
67
        // startToClose timeout with a previous failure.
68
        // We reconstruct the last attempt failure (that would typically be preserved in a mutable
69
        // state) from local activity execution failure.
70
        // See a similar logic in
71
        // io.temporal.internal.testservice.StateMachines#timeoutActivityTask.
72
        lastAttemptFailure.set(Failure.newBuilder(previousExecutionFailure).clearCause().build());
×
73
      } else {
74
        lastAttemptFailure.set(previousExecutionFailure);
1✔
75
      }
76
    }
77
  }
1✔
78

79
  public String getActivityId() {
80
    return executionParams.getActivityId();
1✔
81
  }
82

83
  public int getCurrentAttempt() {
84
    return currentAttempt.get();
1✔
85
  }
86

87
  /**
88
   * The last failure preserved for this activity execution. This field is mimicking the behavior of
89
   * {@link PendingActivityInfo#getLastFailure()} that is maintained by the server in the mutable
90
   * state and is used to create ActivityFailures with meaningful causes and returned by {@link
91
   * io.temporal.api.workflowservice.v1.WorkflowServiceGrpc.WorkflowServiceBlockingStub#describeWorkflowExecution(DescribeWorkflowExecutionRequest)}
92
   */
93
  @Nullable
94
  public Failure getLastAttemptFailure() {
95
    return lastAttemptFailure.get();
1✔
96
  }
97

98
  @Nullable
99
  public Failure getPreviousExecutionFailure() {
100
    return executionParams.getPreviousLocalExecutionFailure();
1✔
101
  }
102

103
  @Nonnull
104
  public PollActivityTaskQueueResponse.Builder getInitialTask() {
105
    return executionParams.cloneActivityTaskBuilder();
1✔
106
  }
107

108
  @Nonnull
109
  public PollActivityTaskQueueResponse.Builder getNextAttemptActivityTask(
110
      @Nullable Failure lastFailure) {
111
    // synchronization here is not absolutely needed as LocalActivityWorker#scheduleNextAttempt
112
    // shouldn't be executed concurrently. But to make sure this code is safe for future changes,
113
    // let's make this method atomic and protect thread-unsafe protobuf builder modification.
114

115
    // executionResult here is used just as an internal monitor object that is final and never
116
    // escapes the class
117
    int nextAttempt;
118
    synchronized (executionResult) {
1✔
119
      nextAttempt = currentAttempt.incrementAndGet();
1✔
120
      if (lastFailure != null) {
1✔
121
        this.lastAttemptFailure.set(lastFailure);
1✔
122
      }
123
    }
1✔
124
    // doesn't need to be synchronized as we clone instead of modifying the original task builder
125
    return executionParams
1✔
126
        .cloneActivityTaskBuilder()
1✔
127
        .setAttempt(nextAttempt)
1✔
128
        .clearCurrentAttemptScheduledTime();
1✔
129
  }
130

131
  @Nullable
132
  public Deadline getScheduleToCloseDeadline() {
133
    return scheduleToCloseDeadline;
1✔
134
  }
135

136
  public void setScheduleToCloseFuture(@Nullable ScheduledFuture<?> scheduleToCloseFuture) {
137
    this.scheduleToCloseFuture = scheduleToCloseFuture;
1✔
138
  }
1✔
139

140
  public boolean callback(LocalActivityResult result) {
141
    if (scheduleToCloseFuture != null) {
1✔
142
      scheduleToCloseFuture.cancel(false);
1✔
143
    }
144
    return executionResult.complete(result);
1✔
145
  }
146

147
  @Nullable
148
  public Duration getScheduleToStartTimeout() {
149
    return executionParams.getScheduleToStartTimeout();
1✔
150
  }
151

152
  public long getOriginalScheduledTimestamp() {
153
    return executionParams.getOriginalScheduledTimestamp();
1✔
154
  }
155

156
  @Nonnull
157
  public Duration getLocalRetryThreshold() {
158
    return executionParams.getLocalRetryThreshold();
1✔
159
  }
160
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc