• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #103

pending completion
#103

push

github-actions

web-flow
Implement retry of local activities for over local retry threshold duration (#1542)

Issue #1261

244 of 244 new or added lines in 16 files covered. (100.0%)

16122 of 19841 relevant lines covered (81.26%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.24
/temporal-sdk/src/main/java/io/temporal/internal/statemachines/LocalActivityStateMachine.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.statemachines;
22

23
import com.google.common.base.Preconditions;
24
import io.temporal.api.command.v1.Command;
25
import io.temporal.api.command.v1.RecordMarkerCommandAttributes;
26
import io.temporal.api.common.v1.ActivityType;
27
import io.temporal.api.common.v1.Payloads;
28
import io.temporal.api.enums.v1.CommandType;
29
import io.temporal.api.enums.v1.EventType;
30
import io.temporal.api.failure.v1.ActivityFailureInfo;
31
import io.temporal.api.failure.v1.CanceledFailureInfo;
32
import io.temporal.api.failure.v1.Failure;
33
import io.temporal.api.history.v1.MarkerRecordedEventAttributes;
34
import io.temporal.api.workflowservice.v1.RespondActivityTaskCanceledRequest;
35
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedRequest;
36
import io.temporal.common.converter.DefaultDataConverter;
37
import io.temporal.internal.history.LocalActivityMarkerMetadata;
38
import io.temporal.internal.history.LocalActivityMarkerUtils;
39
import io.temporal.internal.worker.LocalActivityResult;
40
import io.temporal.workflow.Functions;
41
import java.time.Duration;
42
import java.util.HashMap;
43
import java.util.Map;
44
import java.util.Optional;
45
import java.util.concurrent.TimeUnit;
46
import javax.annotation.Nullable;
47

48
final class LocalActivityStateMachine
49
    extends EntityStateMachineInitialCommand<
50
        LocalActivityStateMachine.State,
51
        LocalActivityStateMachine.ExplicitEvent,
52
        LocalActivityStateMachine> {
53
  static final String LOCAL_ACTIVITY_FAILED_MESSAGE =
54
      "Local " + ActivityStateMachine.ACTIVITY_FAILED_MESSAGE;
55

56
  static final String LOCAL_ACTIVITY_TIMED_OUT_MESSAGE =
57
      "Local " + ActivityStateMachine.ACTIVITY_TIMED_OUT_MESSAGE;
58

59
  static final String LOCAL_ACTIVITY_CANCELED_MESSAGE =
60
      "Local " + ActivityStateMachine.ACTIVITY_CANCELED_MESSAGE;
61

62
  private final Functions.Proc1<ExecuteLocalActivityParameters> localActivityRequestSink;
63
  private final LocalActivityCallback callback;
64

65
  private ExecuteLocalActivityParameters localActivityParameters;
66
  private final Functions.Func<Boolean> replaying;
67
  /** Accepts proposed current time. Returns accepted current time. */
68
  private final Functions.Func1<Long, Long> setCurrentTimeCallback;
69

70
  private final String activityId;
71
  private final ActivityType activityType;
72
  // The value from the marker is taking over this value in case of replay
73
  private final long originalScheduledTimestamp;
74

75
  /** Workflow timestamp when the LA state machine is initialized */
76
  private final long workflowTimeMillisWhenStarted;
77
  /**
78
   * System.nanoTime result at the moment of LA state machine initialization. May be used to
79
   * calculate elapsed time
80
   */
81
  private final long systemNanoTimeWhenStarted;
82

83
  // These three fields are set when an actual execution was performed instead of a replay from the
84
  // LA marker
85
  private @Nullable LocalActivityResult executionResult;
86
  private @Nullable Optional<Payloads> executionSuccess;
87
  private @Nullable LocalActivityCallback.LocalActivityFailedException executionFailure;
88

89
  /** Events this class raises on itself through {@code explicitEvent(...)}. */
  enum ExplicitEvent {
    /** Raised first by the constructor; moves CREATED to REPLAYING or EXECUTING. */
    CHECK_EXECUTION_STATE,

    /** Raised second by the constructor; either sends the request or starts waiting for a marker. */
    SCHEDULE,

    /** Raised by {@code markAsSent()}; moves REQUEST_PREPARED to REQUEST_SENT. */
    MARK_AS_SENT,

    /** Raised by {@code handleCompletion(...)}; triggers creation of the marker command. */
    HANDLE_RESULT,

    /** Raised by {@code nonReplayWorkflowTaskStarted()}. */
    NON_REPLAY_WORKFLOW_TASK_STARTED
  }
96

97
  /** States of the local activity state machine, see {@code STATE_MACHINE_DEFINITION}. */
  enum State {
    /** Starting state; the constructor leaves it right away via CHECK_EXECUTION_STATE. */
    CREATED,

    /** The workflow was replaying when the machine was created. */
    REPLAYING,

    /** The workflow was not replaying when the machine was created. */
    EXECUTING,

    /** Replay path: the marker event carrying the outcome has not been seen yet. */
    WAITING_MARKER_EVENT,

    /** The request was handed to the request sink but is not marked as sent yet. */
    REQUEST_PREPARED,

    /** The request was marked as sent; the execution result is awaited. */
    REQUEST_SENT,

    /** The execution result arrived and the marker command was created. */
    MARKER_COMMAND_CREATED,

    /** The callback was notified with the outcome of the actual execution. */
    RESULT_NOTIFIED,

    /** The marker event was observed; no transitions leave this state. */
    MARKER_COMMAND_RECORDED
  }
108

109
  public static final StateMachineDefinition<State, ExplicitEvent, LocalActivityStateMachine>
110
      STATE_MACHINE_DEFINITION =
1✔
111
          StateMachineDefinition.<State, ExplicitEvent, LocalActivityStateMachine>newInstance(
1✔
112
                  "LocalActivity", State.CREATED, State.MARKER_COMMAND_RECORDED)
113
              .add(
1✔
114
                  State.CREATED,
115
                  ExplicitEvent.CHECK_EXECUTION_STATE,
116
                  new State[] {State.REPLAYING, State.EXECUTING},
117
                  LocalActivityStateMachine::getExecutionState)
118
              .add(
1✔
119
                  State.EXECUTING,
120
                  ExplicitEvent.SCHEDULE,
121
                  State.REQUEST_PREPARED,
122
                  LocalActivityStateMachine::sendRequest)
123
              .add(State.REQUEST_PREPARED, ExplicitEvent.MARK_AS_SENT, State.REQUEST_SENT)
1✔
124
              .add(
1✔
125
                  State.REQUEST_SENT,
126
                  ExplicitEvent.NON_REPLAY_WORKFLOW_TASK_STARTED,
127
                  State.REQUEST_SENT)
128
              .add(
1✔
129
                  State.REQUEST_SENT,
130
                  ExplicitEvent.HANDLE_RESULT,
131
                  State.MARKER_COMMAND_CREATED,
132
                  LocalActivityStateMachine::createMarker)
133
              .add(
1✔
134
                  State.MARKER_COMMAND_CREATED,
135
                  CommandType.COMMAND_TYPE_RECORD_MARKER,
136
                  State.RESULT_NOTIFIED,
137
                  LocalActivityStateMachine::notifyResultFromResponse)
138
              .add(
1✔
139
                  State.RESULT_NOTIFIED,
140
                  EventType.EVENT_TYPE_MARKER_RECORDED,
141
                  State.MARKER_COMMAND_RECORDED)
142
              .add(State.REPLAYING, ExplicitEvent.SCHEDULE, State.WAITING_MARKER_EVENT)
1✔
143
              .add(
1✔
144
                  State.WAITING_MARKER_EVENT,
145
                  EventType.EVENT_TYPE_MARKER_RECORDED,
146
                  State.MARKER_COMMAND_RECORDED,
147
                  LocalActivityStateMachine::notifyResultFromEvent)
148
              .add(
1✔
149
                  // This is to cover the following edge case:
150
                  // 1. WorkflowTaskStarted
151
                  // 2. Local activity scheduled
152
                  // 3. Local activity taken and started execution
153
                  // 4. Forced workflow task is started
154
                  // 5. Workflow task fails or worker crashes
155
                  // When replaying the above sequence without this state transition the local
156
                  // activity
157
                  // scheduled at step 2 is going to be lost.
158
                  State.WAITING_MARKER_EVENT,
159
                  ExplicitEvent.NON_REPLAY_WORKFLOW_TASK_STARTED,
160
                  State.REQUEST_PREPARED,
161
                  LocalActivityStateMachine::sendRequest);
162

163
  /**
164
   * Creates new local activity marker
165
   *
166
   * @param localActivityParameters used to produce side effect value. null if replaying.
167
   * @param callback returns side effect value or failure
168
   * @param commandSink callback to send commands to
169
   */
170
  public static LocalActivityStateMachine newInstance(
171
      Functions.Func<Boolean> replaying,
172
      Functions.Func1<Long, Long> setCurrentTimeCallback,
173
      ExecuteLocalActivityParameters localActivityParameters,
174
      LocalActivityCallback callback,
175
      Functions.Proc1<ExecuteLocalActivityParameters> localActivityRequestSink,
176
      Functions.Proc1<CancellableCommand> commandSink,
177
      Functions.Proc1<StateMachine> stateMachineSink,
178
      long workflowTimeMillisWhenStarted) {
179
    return new LocalActivityStateMachine(
1✔
180
        replaying,
181
        setCurrentTimeCallback,
182
        localActivityParameters,
183
        callback,
184
        localActivityRequestSink,
185
        commandSink,
186
        stateMachineSink,
187
        workflowTimeMillisWhenStarted,
188
        System.nanoTime());
1✔
189
  }
190

191
  /**
   * Stores the collaborators and the activity identity, then immediately drives the machine
   * through CHECK_EXECUTION_STATE and SCHEDULE.
   *
   * @param systemNanoTimeWhenStarted {@link System#nanoTime()} reading taken by the factory method
   */
  private LocalActivityStateMachine(
      Functions.Func<Boolean> replaying,
      Functions.Func1<Long, Long> setCurrentTimeCallback,
      ExecuteLocalActivityParameters localActivityParameters,
      LocalActivityCallback callback,
      Functions.Proc1<ExecuteLocalActivityParameters> localActivityRequestSink,
      Functions.Proc1<CancellableCommand> commandSink,
      Functions.Proc1<StateMachine> stateMachineSink,
      long workflowTimeMillisWhenStarted,
      long systemNanoTimeWhenStarted) {
    super(STATE_MACHINE_DEFINITION, commandSink, stateMachineSink);

    // Collaborators.
    this.replaying = replaying;
    this.callback = callback;
    this.localActivityRequestSink = localActivityRequestSink;
    this.setCurrentTimeCallback = setCurrentTimeCallback;

    // Identity of the activity; kept separately because the parameters may be dropped later.
    this.localActivityParameters = localActivityParameters;
    this.activityId = localActivityParameters.getActivityId();
    this.activityType = localActivityParameters.getActivityType();
    this.originalScheduledTimestamp = localActivityParameters.getOriginalScheduledTimestamp();

    // Reference points for elapsed time calculation.
    this.workflowTimeMillisWhenStarted = workflowTimeMillisWhenStarted;
    this.systemNanoTimeWhenStarted = systemNanoTimeWhenStarted;

    // All fields are set; the two events below must stay last and in this order.
    explicitEvent(ExplicitEvent.CHECK_EXECUTION_STATE);
    explicitEvent(ExplicitEvent.SCHEDULE);
  }
1✔
215

216
  State getExecutionState() {
217
    return replaying.apply() ? State.REPLAYING : State.EXECUTING;
1✔
218
  }
219

220
  public void cancel() {
221
    // TODO(maxim): Cancellation of local activity.
222
    //    if (!isFinalState()) {
223
    //      explicitEvent(ExplicitEvent.CANCEL);
224
    //    }
225
  }
×
226

227
  public void sendRequest() {
228
    localActivityRequestSink.apply(localActivityParameters);
1✔
229
    if (localActivityParameters.isDoNotIncludeArgumentsIntoMarker()) {
1✔
230
      // avoid retaining parameters for the duration of activity execution
231
      localActivityParameters = null;
1✔
232
    }
233
  }
1✔
234

235
  public void markAsSent() {
236
    explicitEvent(ExplicitEvent.MARK_AS_SENT);
1✔
237
  }
1✔
238

239
  public void handleCompletion(LocalActivityResult result) {
240
    this.executionResult = result;
1✔
241
    explicitEvent(ExplicitEvent.HANDLE_RESULT);
1✔
242
  }
1✔
243

244
  /** Called once per workflow task for the last WorkflowTaskStarted event in the history. */
245
  public void nonReplayWorkflowTaskStarted() {
246
    explicitEvent(ExplicitEvent.NON_REPLAY_WORKFLOW_TASK_STARTED);
1✔
247
  }
1✔
248

249
  private void createMarker() {
250
    RecordMarkerCommandAttributes.Builder markerAttributes =
251
        RecordMarkerCommandAttributes.newBuilder();
1✔
252
    Map<String, Payloads> details = new HashMap<>();
1✔
253
    if (!replaying.apply()) {
1✔
254
      markerAttributes.setMarkerName(LocalActivityMarkerUtils.MARKER_NAME);
1✔
255
      Payloads id = DefaultDataConverter.STANDARD_INSTANCE.toPayloads(activityId).get();
1✔
256
      details.put(LocalActivityMarkerUtils.MARKER_ACTIVITY_ID_KEY, id);
1✔
257
      Payloads type =
1✔
258
          DefaultDataConverter.STANDARD_INSTANCE.toPayloads(activityType.getName()).get();
1✔
259
      details.put(LocalActivityMarkerUtils.MARKER_ACTIVITY_TYPE_KEY, type);
1✔
260

261
      long elapsedNanoseconds = System.nanoTime() - systemNanoTimeWhenStarted;
1✔
262
      long currentTime =
1✔
263
          setCurrentTimeCallback.apply(
1✔
264
              workflowTimeMillisWhenStarted + TimeUnit.NANOSECONDS.toMillis(elapsedNanoseconds));
1✔
265
      Payloads t = DefaultDataConverter.STANDARD_INSTANCE.toPayloads(currentTime).get();
1✔
266
      details.put(LocalActivityMarkerUtils.MARKER_TIME_KEY, t);
1✔
267

268
      if (localActivityParameters != null
1✔
269
          && !localActivityParameters.isDoNotIncludeArgumentsIntoMarker()) {
1✔
270
        details.put(
1✔
271
            LocalActivityMarkerUtils.MARKER_ACTIVITY_INPUT_KEY, localActivityParameters.getInput());
1✔
272
      }
273
      Preconditions.checkState(
1✔
274
          executionResult != null,
275
          "Local activity execution result should be populated before triggering createMarker()");
276
      final LocalActivityMarkerMetadata localActivityMarkerMetadata =
1✔
277
          new LocalActivityMarkerMetadata(
278
              executionResult.getLastAttempt(), originalScheduledTimestamp);
1✔
279
      if (executionResult.getExecutionCompleted() != null) {
1✔
280
        RespondActivityTaskCompletedRequest completed = executionResult.getExecutionCompleted();
1✔
281
        if (completed.hasResult()) {
1✔
282
          Payloads p = completed.getResult();
1✔
283
          executionSuccess = Optional.of(p);
1✔
284
          details.put(LocalActivityMarkerUtils.MARKER_ACTIVITY_RESULT_KEY, p);
1✔
285
        } else {
1✔
286
          executionSuccess = Optional.empty();
1✔
287
        }
288
      } else if (executionResult.getExecutionFailed() != null) {
1✔
289
        LocalActivityResult.ExecutionFailedResult failedResult =
1✔
290
            executionResult.getExecutionFailed();
1✔
291
        String message =
292
            failedResult.isTimeout()
1✔
293
                ? LOCAL_ACTIVITY_TIMED_OUT_MESSAGE
1✔
294
                : LOCAL_ACTIVITY_FAILED_MESSAGE;
1✔
295
        Failure failure =
296
            Failure.newBuilder()
1✔
297
                .setMessage(message)
1✔
298
                .setActivityFailureInfo(
1✔
299
                    ActivityFailureInfo.newBuilder()
1✔
300
                        .setRetryState(failedResult.getRetryState())
1✔
301
                        .setActivityId(activityId)
1✔
302
                        .setActivityType(activityType))
1✔
303
                .setCause(failedResult.getFailure())
1✔
304
                .build();
1✔
305
        markerAttributes.setFailure(failure);
1✔
306

307
        localActivityMarkerMetadata.setBackoff(failedResult.getBackoff());
1✔
308
        executionFailure =
1✔
309
            new LocalActivityCallback.LocalActivityFailedException(
310
                failure,
311
                originalScheduledTimestamp,
312
                localActivityMarkerMetadata.getAttempt(),
1✔
313
                failedResult.getBackoff());
1✔
314
      } else if (executionResult.getExecutionCanceled() != null) {
1✔
315
        RespondActivityTaskCanceledRequest failed = executionResult.getExecutionCanceled();
×
316
        Failure failure =
317
            Failure.newBuilder()
×
318
                .setMessage(LOCAL_ACTIVITY_CANCELED_MESSAGE)
×
319
                .setCanceledFailureInfo(
×
320
                    CanceledFailureInfo.newBuilder().setDetails(failed.getDetails()))
×
321
                .build();
×
322
        markerAttributes.setFailure(failure);
×
323
        executionFailure =
×
324
            new LocalActivityCallback.LocalActivityFailedException(
325
                failure,
326
                originalScheduledTimestamp,
327
                localActivityMarkerMetadata.getAttempt(),
×
328
                null);
329
      }
330

331
      details.put(
1✔
332
          LocalActivityMarkerUtils.MARKER_METADATA_KEY,
333
          DefaultDataConverter.STANDARD_INSTANCE.toPayloads(localActivityMarkerMetadata).get());
1✔
334
      markerAttributes.putAllDetails(details);
1✔
335
    }
336
    addCommand(
1✔
337
        Command.newBuilder()
1✔
338
            .setCommandType(CommandType.COMMAND_TYPE_RECORD_MARKER)
1✔
339
            .setRecordMarkerCommandAttributes(markerAttributes.build())
1✔
340
            .build());
1✔
341
  }
1✔
342

343
  private void notifyResultFromEvent() {
344
    MarkerRecordedEventAttributes attributes = currentEvent.getMarkerRecordedEventAttributes();
1✔
345
    Preconditions.checkState(
1✔
346
        LocalActivityMarkerUtils.hasLocalActivityStructure(currentEvent),
1✔
347
        "Expected " + LocalActivityMarkerUtils.MARKER_NAME + ", received: %s",
348
        attributes);
349
    long time =
1✔
350
        Preconditions.checkNotNull(
1✔
351
            LocalActivityMarkerUtils.getTime(attributes),
1✔
352
            "'time' payload of a LocalActivity marker can't be empty");
353
    setCurrentTimeCallback.apply(time);
1✔
354
    if (attributes.hasFailure()) {
1✔
355
      // In older markers metadata is missing
356
      @Nullable
357
      LocalActivityMarkerMetadata metadata = LocalActivityMarkerUtils.getMetadata(attributes);
1✔
358
      long originalScheduledTimestamp =
359
          metadata != null ? metadata.getOriginalScheduledTimestamp() : -1;
1✔
360
      int lastAttempt = metadata != null ? metadata.getAttempt() : 0;
1✔
361
      Duration backoff = metadata != null ? metadata.getBackoff() : null;
1✔
362
      LocalActivityCallback.LocalActivityFailedException localActivityFailedException =
1✔
363
          new LocalActivityCallback.LocalActivityFailedException(
364
              attributes.getFailure(), originalScheduledTimestamp, lastAttempt, backoff);
1✔
365
      callback.apply(null, localActivityFailedException);
1✔
366
    } else {
1✔
367
      Optional<Payloads> result =
1✔
368
          Optional.ofNullable(LocalActivityMarkerUtils.getResult(attributes));
1✔
369
      callback.apply(result, null);
1✔
370
    }
371
  }
1✔
372

373
  private void notifyResultFromResponse() {
374
    callback.apply(executionSuccess, executionFailure);
1✔
375
  }
1✔
376
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc