• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #343

31 Oct 2024 06:31PM UTC coverage: 75.148% (-3.6%) from 78.794%
#343

push

github

web-flow
Fix jacoco coverage (#2304)

5139 of 8240 branches covered (62.37%)

Branch coverage included in aggregate %.

22841 of 28993 relevant lines covered (78.78%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.06
/temporal-sdk/src/main/java/io/temporal/internal/statemachines/LocalActivityStateMachine.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.statemachines;
22

23
import com.google.common.base.Preconditions;
24
import io.temporal.api.command.v1.Command;
25
import io.temporal.api.command.v1.RecordMarkerCommandAttributes;
26
import io.temporal.api.common.v1.ActivityType;
27
import io.temporal.api.common.v1.Payloads;
28
import io.temporal.api.enums.v1.CommandType;
29
import io.temporal.api.enums.v1.EventType;
30
import io.temporal.api.failure.v1.ActivityFailureInfo;
31
import io.temporal.api.failure.v1.CanceledFailureInfo;
32
import io.temporal.api.failure.v1.Failure;
33
import io.temporal.api.history.v1.MarkerRecordedEventAttributes;
34
import io.temporal.api.workflowservice.v1.RespondActivityTaskCanceledRequest;
35
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedRequest;
36
import io.temporal.common.converter.DefaultDataConverter;
37
import io.temporal.internal.history.LocalActivityMarkerMetadata;
38
import io.temporal.internal.history.LocalActivityMarkerUtils;
39
import io.temporal.internal.worker.LocalActivityResult;
40
import io.temporal.workflow.Functions;
41
import java.time.Duration;
42
import java.util.HashMap;
43
import java.util.Map;
44
import java.util.Optional;
45
import java.util.concurrent.TimeUnit;
46
import javax.annotation.Nullable;
47

48
final class LocalActivityStateMachine
49
    extends EntityStateMachineInitialCommand<
50
        LocalActivityStateMachine.State,
51
        LocalActivityStateMachine.ExplicitEvent,
52
        LocalActivityStateMachine> {
53
  static final String LOCAL_ACTIVITY_FAILED_MESSAGE =
54
      "Local " + ActivityStateMachine.ACTIVITY_FAILED_MESSAGE;
55

56
  static final String LOCAL_ACTIVITY_TIMED_OUT_MESSAGE =
57
      "Local " + ActivityStateMachine.ACTIVITY_TIMED_OUT_MESSAGE;
58

59
  static final String LOCAL_ACTIVITY_CANCELED_MESSAGE =
60
      "Local " + ActivityStateMachine.ACTIVITY_CANCELED_MESSAGE;
61

62
  private final Functions.Proc1<ExecuteLocalActivityParameters> localActivityRequestSink;
63
  private final LocalActivityCallback callback;
64

65
  private ExecuteLocalActivityParameters localActivityParameters;
66
  private final Functions.Func<Boolean> replaying;
67

68
  /** Accepts proposed current time. Returns accepted current time. */
69
  private final Functions.Func1<Long, Long> setCurrentTimeCallback;
70

71
  private final String activityId;
72
  private final ActivityType activityType;
73
  // The value from the marker is taking over this value in case of replay
74
  private final long originalScheduledTimestamp;
75

76
  /** Workflow timestamp when the LA state machine is initialized */
77
  private final long workflowTimeMillisWhenStarted;
78

79
  /**
80
   * System.nanoTime result at the moment of LA state machine initialization. May be used to
81
   * calculate elapsed time
82
   */
83
  private final long systemNanoTimeWhenStarted;
84

85
  // These three fields are set when an actual execution was performed instead of a replay from the
86
  // LA marker
87
  private @Nullable LocalActivityResult executionResult;
88
  private @Nullable Optional<Payloads> executionSuccess;
89
  private @Nullable LocalActivityCallback.LocalActivityFailedException executionFailure;
90

91
  enum ExplicitEvent {
1✔
92
    CHECK_EXECUTION_STATE,
1✔
93
    SCHEDULE,
1✔
94
    MARK_AS_SENT,
1✔
95
    HANDLE_RESULT,
1✔
96
    NON_REPLAY_WORKFLOW_TASK_STARTED
1✔
97
  }
98

99
  enum State {
1✔
100
    CREATED,
1✔
101
    REPLAYING,
1✔
102
    EXECUTING,
1✔
103
    WAITING_MARKER_EVENT,
1✔
104
    REQUEST_PREPARED,
1✔
105
    REQUEST_SENT,
1✔
106
    MARKER_COMMAND_CREATED,
1✔
107
    RESULT_NOTIFIED,
1✔
108
    MARKER_COMMAND_RECORDED
1✔
109
  }
110

111
  public static final StateMachineDefinition<State, ExplicitEvent, LocalActivityStateMachine>
112
      STATE_MACHINE_DEFINITION =
1✔
113
          StateMachineDefinition.<State, ExplicitEvent, LocalActivityStateMachine>newInstance(
1✔
114
                  "LocalActivity", State.CREATED, State.MARKER_COMMAND_RECORDED)
115
              .add(
1✔
116
                  State.CREATED,
117
                  ExplicitEvent.CHECK_EXECUTION_STATE,
118
                  new State[] {State.REPLAYING, State.EXECUTING},
119
                  LocalActivityStateMachine::getExecutionState)
120
              .add(
1✔
121
                  State.EXECUTING,
122
                  ExplicitEvent.SCHEDULE,
123
                  State.REQUEST_PREPARED,
124
                  LocalActivityStateMachine::sendRequest)
125
              .add(State.REQUEST_PREPARED, ExplicitEvent.MARK_AS_SENT, State.REQUEST_SENT)
1✔
126
              // This is to cover an edge case where the event loop is
127
              // run more than once while processing a workflow task.
128
              // This can happen due to external cancellation
129
              .add(
1✔
130
                  State.REQUEST_PREPARED,
131
                  ExplicitEvent.NON_REPLAY_WORKFLOW_TASK_STARTED,
132
                  State.REQUEST_PREPARED)
133
              .add(
1✔
134
                  State.REQUEST_SENT,
135
                  ExplicitEvent.NON_REPLAY_WORKFLOW_TASK_STARTED,
136
                  State.REQUEST_SENT)
137
              .add(
1✔
138
                  State.REQUEST_SENT,
139
                  ExplicitEvent.HANDLE_RESULT,
140
                  State.MARKER_COMMAND_CREATED,
141
                  LocalActivityStateMachine::createMarker)
142
              .add(
1✔
143
                  State.MARKER_COMMAND_CREATED,
144
                  CommandType.COMMAND_TYPE_RECORD_MARKER,
145
                  State.RESULT_NOTIFIED,
146
                  LocalActivityStateMachine::notifyResultFromResponse)
147
              .add(
1✔
148
                  State.RESULT_NOTIFIED,
149
                  EventType.EVENT_TYPE_MARKER_RECORDED,
150
                  State.MARKER_COMMAND_RECORDED)
151
              .add(State.REPLAYING, ExplicitEvent.SCHEDULE, State.WAITING_MARKER_EVENT)
1✔
152
              .add(
1✔
153
                  State.WAITING_MARKER_EVENT,
154
                  EventType.EVENT_TYPE_MARKER_RECORDED,
155
                  State.MARKER_COMMAND_RECORDED,
156
                  LocalActivityStateMachine::notifyResultFromEvent)
157
              .add(
1✔
158
                  // This is to cover the following edge case:
159
                  // 1. WorkflowTaskStarted
160
                  // 2. Local activity scheduled
161
                  // 3. Local activity taken and started execution
162
                  // 4. Forced workflow task is started
163
                  // 5. Workflow task fails or worker crashes
164
                  // When replaying the above sequence without this state transition the local
165
                  // activity
166
                  // scheduled at step 2 is going to be lost.
167
                  State.WAITING_MARKER_EVENT,
168
                  ExplicitEvent.NON_REPLAY_WORKFLOW_TASK_STARTED,
169
                  State.REQUEST_PREPARED,
170
                  LocalActivityStateMachine::sendRequest);
171

172
  /**
173
   * Creates new local activity marker
174
   *
175
   * @param localActivityParameters used to produce side effect value. null if replaying.
176
   * @param callback returns side effect value or failure
177
   * @param commandSink callback to send commands to
178
   */
179
  public static LocalActivityStateMachine newInstance(
180
      Functions.Func<Boolean> replaying,
181
      Functions.Func1<Long, Long> setCurrentTimeCallback,
182
      ExecuteLocalActivityParameters localActivityParameters,
183
      LocalActivityCallback callback,
184
      Functions.Proc1<ExecuteLocalActivityParameters> localActivityRequestSink,
185
      Functions.Proc1<CancellableCommand> commandSink,
186
      Functions.Proc1<StateMachine> stateMachineSink,
187
      long workflowTimeMillisWhenStarted) {
188
    return new LocalActivityStateMachine(
1✔
189
        replaying,
190
        setCurrentTimeCallback,
191
        localActivityParameters,
192
        callback,
193
        localActivityRequestSink,
194
        commandSink,
195
        stateMachineSink,
196
        workflowTimeMillisWhenStarted,
197
        System.nanoTime());
1✔
198
  }
199

200
  private LocalActivityStateMachine(
201
      Functions.Func<Boolean> replaying,
202
      Functions.Func1<Long, Long> setCurrentTimeCallback,
203
      ExecuteLocalActivityParameters localActivityParameters,
204
      LocalActivityCallback callback,
205
      Functions.Proc1<ExecuteLocalActivityParameters> localActivityRequestSink,
206
      Functions.Proc1<CancellableCommand> commandSink,
207
      Functions.Proc1<StateMachine> stateMachineSink,
208
      long workflowTimeMillisWhenStarted,
209
      long systemNanoTimeWhenStarted) {
210
    super(STATE_MACHINE_DEFINITION, commandSink, stateMachineSink);
1✔
211
    this.replaying = replaying;
1✔
212
    this.setCurrentTimeCallback = setCurrentTimeCallback;
1✔
213
    this.localActivityParameters = localActivityParameters;
1✔
214
    this.activityId = localActivityParameters.getActivityId();
1✔
215
    this.activityType = localActivityParameters.getActivityType();
1✔
216
    this.originalScheduledTimestamp = localActivityParameters.getOriginalScheduledTimestamp();
1✔
217
    this.localActivityRequestSink = localActivityRequestSink;
1✔
218
    this.callback = callback;
1✔
219
    this.workflowTimeMillisWhenStarted = workflowTimeMillisWhenStarted;
1✔
220
    this.systemNanoTimeWhenStarted = systemNanoTimeWhenStarted;
1✔
221
    explicitEvent(ExplicitEvent.CHECK_EXECUTION_STATE);
1✔
222
    explicitEvent(ExplicitEvent.SCHEDULE);
1✔
223
  }
1✔
224

225
  State getExecutionState() {
226
    return replaying.apply() ? State.REPLAYING : State.EXECUTING;
1✔
227
  }
228

229
  public void cancel() {
230
    // TODO(maxim): Cancellation of local activity.
231
    //    if (!isFinalState()) {
232
    //      explicitEvent(ExplicitEvent.CANCEL);
233
    //    }
234
  }
×
235

236
  public void sendRequest() {
237
    localActivityRequestSink.apply(localActivityParameters);
1✔
238
    if (localActivityParameters.isDoNotIncludeArgumentsIntoMarker()) {
1✔
239
      // avoid retaining parameters for the duration of activity execution
240
      localActivityParameters = null;
1✔
241
    }
242
  }
1✔
243

244
  public void markAsSent() {
245
    explicitEvent(ExplicitEvent.MARK_AS_SENT);
1✔
246
  }
1✔
247

248
  public void handleCompletion(LocalActivityResult result) {
249
    this.executionResult = result;
1✔
250
    explicitEvent(ExplicitEvent.HANDLE_RESULT);
1✔
251
  }
1✔
252

253
  /** Called once per workflow task for the last WorkflowTaskStarted event in the history. */
254
  public void nonReplayWorkflowTaskStarted() {
255
    explicitEvent(ExplicitEvent.NON_REPLAY_WORKFLOW_TASK_STARTED);
1✔
256
  }
1✔
257

258
  private void createMarker() {
259
    RecordMarkerCommandAttributes.Builder markerAttributes =
260
        RecordMarkerCommandAttributes.newBuilder();
1✔
261
    Map<String, Payloads> details = new HashMap<>();
1✔
262
    if (!replaying.apply()) {
1!
263
      markerAttributes.setMarkerName(LocalActivityMarkerUtils.MARKER_NAME);
1✔
264
      Payloads id = DefaultDataConverter.STANDARD_INSTANCE.toPayloads(activityId).get();
1✔
265
      details.put(LocalActivityMarkerUtils.MARKER_ACTIVITY_ID_KEY, id);
1✔
266
      Payloads type =
1✔
267
          DefaultDataConverter.STANDARD_INSTANCE.toPayloads(activityType.getName()).get();
1✔
268
      details.put(LocalActivityMarkerUtils.MARKER_ACTIVITY_TYPE_KEY, type);
1✔
269

270
      long elapsedNanoseconds = System.nanoTime() - systemNanoTimeWhenStarted;
1✔
271
      long currentTime =
1✔
272
          setCurrentTimeCallback.apply(
1✔
273
              workflowTimeMillisWhenStarted + TimeUnit.NANOSECONDS.toMillis(elapsedNanoseconds));
1✔
274
      Payloads t = DefaultDataConverter.STANDARD_INSTANCE.toPayloads(currentTime).get();
1✔
275
      details.put(LocalActivityMarkerUtils.MARKER_TIME_KEY, t);
1✔
276

277
      if (localActivityParameters != null
1✔
278
          && !localActivityParameters.isDoNotIncludeArgumentsIntoMarker()) {
1!
279
        details.put(
1✔
280
            LocalActivityMarkerUtils.MARKER_ACTIVITY_INPUT_KEY, localActivityParameters.getInput());
1✔
281
      }
282
      Preconditions.checkState(
1!
283
          executionResult != null,
284
          "Local activity execution result should be populated before triggering createMarker()");
285
      final LocalActivityMarkerMetadata localActivityMarkerMetadata =
1✔
286
          new LocalActivityMarkerMetadata(
287
              executionResult.getLastAttempt(), originalScheduledTimestamp);
1✔
288
      if (executionResult.getExecutionCompleted() != null) {
1✔
289
        RespondActivityTaskCompletedRequest completed = executionResult.getExecutionCompleted();
1✔
290
        if (completed.hasResult()) {
1✔
291
          Payloads p = completed.getResult();
1✔
292
          executionSuccess = Optional.of(p);
1✔
293
          details.put(LocalActivityMarkerUtils.MARKER_ACTIVITY_RESULT_KEY, p);
1✔
294
        } else {
1✔
295
          executionSuccess = Optional.empty();
1✔
296
        }
297
      } else if (executionResult.getExecutionFailed() != null) {
1!
298
        LocalActivityResult.ExecutionFailedResult failedResult =
1✔
299
            executionResult.getExecutionFailed();
1✔
300
        String message =
301
            failedResult.isTimeout()
1✔
302
                ? LOCAL_ACTIVITY_TIMED_OUT_MESSAGE
1✔
303
                : LOCAL_ACTIVITY_FAILED_MESSAGE;
1✔
304
        Failure failure =
305
            Failure.newBuilder()
1✔
306
                .setMessage(message)
1✔
307
                .setActivityFailureInfo(
1✔
308
                    ActivityFailureInfo.newBuilder()
1✔
309
                        .setRetryState(failedResult.getRetryState())
1✔
310
                        .setActivityId(activityId)
1✔
311
                        .setActivityType(activityType))
1✔
312
                .setCause(failedResult.getFailure())
1✔
313
                .build();
1✔
314
        markerAttributes.setFailure(failure);
1✔
315

316
        localActivityMarkerMetadata.setBackoff(failedResult.getBackoff());
1✔
317
        executionFailure =
1✔
318
            new LocalActivityCallback.LocalActivityFailedException(
319
                failure,
320
                originalScheduledTimestamp,
321
                localActivityMarkerMetadata.getAttempt(),
1✔
322
                failedResult.getBackoff());
1✔
323
      } else if (executionResult.getExecutionCanceled() != null) {
1!
324
        RespondActivityTaskCanceledRequest failed = executionResult.getExecutionCanceled();
×
325
        Failure failure =
326
            Failure.newBuilder()
×
327
                .setMessage(LOCAL_ACTIVITY_CANCELED_MESSAGE)
×
328
                .setCanceledFailureInfo(
×
329
                    CanceledFailureInfo.newBuilder().setDetails(failed.getDetails()))
×
330
                .build();
×
331
        markerAttributes.setFailure(failure);
×
332
        executionFailure =
×
333
            new LocalActivityCallback.LocalActivityFailedException(
334
                failure,
335
                originalScheduledTimestamp,
336
                localActivityMarkerMetadata.getAttempt(),
×
337
                null);
338
      }
339

340
      details.put(
1✔
341
          LocalActivityMarkerUtils.MARKER_METADATA_KEY,
342
          DefaultDataConverter.STANDARD_INSTANCE.toPayloads(localActivityMarkerMetadata).get());
1✔
343
      markerAttributes.putAllDetails(details);
1✔
344
    }
345
    addCommand(
1✔
346
        Command.newBuilder()
1✔
347
            .setCommandType(CommandType.COMMAND_TYPE_RECORD_MARKER)
1✔
348
            .setRecordMarkerCommandAttributes(markerAttributes.build())
1✔
349
            .build());
1✔
350
  }
1✔
351

352
  private void notifyResultFromEvent() {
353
    MarkerRecordedEventAttributes attributes = currentEvent.getMarkerRecordedEventAttributes();
1✔
354
    Preconditions.checkState(
1✔
355
        LocalActivityMarkerUtils.hasLocalActivityStructure(currentEvent),
1✔
356
        "Expected " + LocalActivityMarkerUtils.MARKER_NAME + ", received: %s",
357
        attributes);
358
    long time =
1✔
359
        Preconditions.checkNotNull(
1✔
360
            LocalActivityMarkerUtils.getTime(attributes),
1✔
361
            "'time' payload of a LocalActivity marker can't be empty");
362
    setCurrentTimeCallback.apply(time);
1✔
363
    if (attributes.hasFailure()) {
1✔
364
      // In older markers metadata is missing
365
      @Nullable
366
      LocalActivityMarkerMetadata metadata = LocalActivityMarkerUtils.getMetadata(attributes);
1✔
367
      long originalScheduledTimestamp =
368
          metadata != null ? metadata.getOriginalScheduledTimestamp() : -1;
1✔
369
      int lastAttempt = metadata != null ? metadata.getAttempt() : 0;
1✔
370
      Duration backoff = metadata != null ? metadata.getBackoff() : null;
1✔
371
      LocalActivityCallback.LocalActivityFailedException localActivityFailedException =
1✔
372
          new LocalActivityCallback.LocalActivityFailedException(
373
              attributes.getFailure(), originalScheduledTimestamp, lastAttempt, backoff);
1✔
374
      callback.apply(null, localActivityFailedException);
1✔
375
    } else {
1✔
376
      Optional<Payloads> result =
1✔
377
          Optional.ofNullable(LocalActivityMarkerUtils.getResult(attributes));
1✔
378
      callback.apply(result, null);
1✔
379
    }
380
  }
1✔
381

382
  private void notifyResultFromResponse() {
383
    callback.apply(executionSuccess, executionFailure);
1✔
384
  }
1✔
385
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc