• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #343

31 Oct 2024 06:31PM UTC coverage: 75.148% (-3.6%) from 78.794%
#343

push

github

web-flow
Fix jacoco coverage (#2304)

5139 of 8240 branches covered (62.37%)

Branch coverage included in aggregate %.

22841 of 28993 relevant lines covered (78.78%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.72
/temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.statemachines;
22

23
import static io.temporal.internal.sync.WorkflowInternal.DEFAULT_VERSION;
24

25
import com.google.common.annotations.VisibleForTesting;
26
import com.google.common.base.Preconditions;
27
import com.google.common.base.Strings;
28
import io.temporal.api.command.v1.RecordMarkerCommandAttributes;
29
import io.temporal.api.enums.v1.CommandType;
30
import io.temporal.api.enums.v1.EventType;
31
import io.temporal.api.history.v1.HistoryEvent;
32
import io.temporal.internal.history.VersionMarkerUtils;
33
import io.temporal.worker.NonDeterministicException;
34
import io.temporal.workflow.Functions;
35
import java.util.Objects;
36
import javax.annotation.Nullable;
37

38
final class VersionStateMachine {
39
  private static final String RETROACTIVE_ADDITION_ERROR_STRING =
40
      "The most probable cause is retroactive addition of a getVersion call with an existing 'changeId'";
41

42
  private final String changeId;
43
  private final Functions.Func<Boolean> replaying;
44
  private final Functions.Proc1<CancellableCommand> commandSink;
45
  private final Functions.Proc1<StateMachine> stateMachineSink;
46

47
  @Nullable private Integer version;
48

49
  /**
50
   * This variable is used for replay only. When we replay, we look one workflow task ahead and
51
   * preload all version markers to be able to return from Workflow.getVersion called in the event
52
   * loop the same way as we return during the original execution (without a full block to trigger
53
   * workflow task end and match with command events). These preloaded versions are converted and
54
   * moved into {@link #version} field when we actually process and match the version marker event
55
   * after workflow task is finished / event loop is blocked.
56
   */
57
  @Nullable private Integer preloadedVersion;
58

59
  /**
   * Events that this class raises itself through {@code explicitEvent(...)}, as opposed to the
   * command types and history event types that also appear in the transition table.
   */
  enum ExplicitEvent {
1✔
60
    // Raised first by getVersion; handled from CREATED and resolved to REPLAYING or EXECUTING.
    CHECK_EXECUTION_STATE,
1✔
61
    // Raised by getVersion right after CHECK_EXECUTION_STATE; handled from EXECUTING and REPLAYING.
    SCHEDULE,
1✔
62
    // Raised by handleEvent for an event without version marker structure, and by
    // handleWorkflowTaskStarted while the machine is still in RESULT_NOTIFIED_REPLAYING.
    NON_MATCHING_EVENT
1✔
63
  }
64

65
  /** States of a single getVersion invocation, grouped by the execution path that reaches them. */
  enum State {
1✔
66
    // State passed to StateMachineDefinition.newInstance right after the name
    // (presumably the initial state -- confirm against StateMachineDefinition).
    CREATED,
1✔
67

68
    // States reached when getExecutionState() reports that the workflow is not replaying.
    EXECUTING,
1✔
69
    MARKER_COMMAND_CREATED,
1✔
70
    SKIPPED,
1✔
71
    RESULT_NOTIFIED,
1✔
72

73
    // States reached when getExecutionState() reports that the workflow is replaying.
    REPLAYING,
1✔
74
    MARKER_COMMAND_CREATED_REPLAYING,
1✔
75
    SKIPPED_REPLAYING,
1✔
76
    RESULT_NOTIFIED_REPLAYING,
1✔
77

78
    // States with no outgoing transitions in the table; both are passed as trailing arguments to
    // StateMachineDefinition.newInstance (presumably as final states -- confirm).
    MARKER_COMMAND_RECORDED,
1✔
79
    SKIPPED_NOTIFIED,
1✔
80
  }
81

82
  public static final StateMachineDefinition<State, ExplicitEvent, InvocationStateMachine>
83
      STATE_MACHINE_DEFINITION =
1✔
84
          StateMachineDefinition.<State, ExplicitEvent, InvocationStateMachine>newInstance(
1✔
85
                  "Version", State.CREATED, State.MARKER_COMMAND_RECORDED, State.SKIPPED_NOTIFIED)
86
              .add(
1✔
87
                  State.CREATED,
88
                  ExplicitEvent.CHECK_EXECUTION_STATE,
89
                  new State[] {State.REPLAYING, State.EXECUTING},
90
                  InvocationStateMachine::getExecutionState)
91
              .add(
1✔
92
                  State.EXECUTING,
93
                  ExplicitEvent.SCHEDULE,
94
                  new State[] {State.MARKER_COMMAND_CREATED, State.SKIPPED},
95
                  InvocationStateMachine::createMarkerExecuting)
96
              .add(
1✔
97
                  State.MARKER_COMMAND_CREATED,
98
                  CommandType.COMMAND_TYPE_RECORD_MARKER,
99
                  State.RESULT_NOTIFIED,
100
                  InvocationStateMachine::notifyFromVersionExecuting)
101
              .add(
1✔
102
                  State.RESULT_NOTIFIED,
103
                  EventType.EVENT_TYPE_MARKER_RECORDED,
104
                  State.MARKER_COMMAND_RECORDED)
105
              .add(
1✔
106
                  State.SKIPPED,
107
                  CommandType.COMMAND_TYPE_RECORD_MARKER,
108
                  State.SKIPPED_NOTIFIED,
109
                  InvocationStateMachine::notifySkippedExecuting)
110
              .add(
1✔
111
                  State.REPLAYING,
112
                  ExplicitEvent.SCHEDULE,
113
                  new State[] {State.MARKER_COMMAND_CREATED_REPLAYING, State.SKIPPED_REPLAYING},
114
                  InvocationStateMachine::createMarkerReplaying)
115
              .add(
1✔
116
                  State.MARKER_COMMAND_CREATED_REPLAYING,
117
                  CommandType.COMMAND_TYPE_RECORD_MARKER,
118
                  State.RESULT_NOTIFIED_REPLAYING,
119
                  InvocationStateMachine::notifyMarkerCreatedReplaying)
120
              .add(
1✔
121
                  State.RESULT_NOTIFIED_REPLAYING,
122
                  EventType.EVENT_TYPE_MARKER_RECORDED,
123
                  State.MARKER_COMMAND_RECORDED,
124
                  InvocationStateMachine::flushPreloadedVersionAndUpdateFromEventReplaying)
125
              .add(
1✔
126
                  State.RESULT_NOTIFIED_REPLAYING,
127
                  ExplicitEvent.NON_MATCHING_EVENT,
128
                  State.SKIPPED_NOTIFIED,
129
                  InvocationStateMachine::missingMarkerReplaying)
130
              .add(
1✔
131
                  State.SKIPPED_REPLAYING,
132
                  CommandType.COMMAND_TYPE_RECORD_MARKER,
133
                  State.SKIPPED_NOTIFIED,
134
                  InvocationStateMachine::notifySkippedReplaying);
135

136
  /** Represents a single invocation of version. */
137
  @VisibleForTesting
138
  class InvocationStateMachine
139
      extends EntityStateMachineInitialCommand<State, ExplicitEvent, InvocationStateMachine> {
140

141
    private final int minSupported;
142
    private final int maxSupported;
143

144
    private final Functions.Proc2<Integer, RuntimeException> resultCallback;
145

146
    /**
     * Creates the state machine for one getVersion call, wired to the sinks of the enclosing
     * {@link VersionStateMachine}.
     *
     * @param minSupported lowest version accepted by validateVersionAndThrow
     * @param maxSupported highest accepted version; also the version recorded by
     *     createMarkerExecuting when none is set yet
     * @param callback receives either the resolved version or a failure; must not be null
     * @throws NullPointerException if {@code callback} is null
     */
    InvocationStateMachine(
147
        int minSupported, int maxSupported, Functions.Proc2<Integer, RuntimeException> callback) {
1✔
148
      super(STATE_MACHINE_DEFINITION, VersionStateMachine.this.commandSink, stateMachineSink);
1✔
149
      this.minSupported = minSupported;
1✔
150
      this.maxSupported = maxSupported;
1✔
151
      this.resultCallback = Objects.requireNonNull(callback);
1✔
152
    }
1✔
153

154
    State getExecutionState() {
155
      return replaying.apply() ? State.REPLAYING : State.EXECUTING;
1✔
156
    }
157

158
    @Override
159
    public WorkflowStateMachines.HandleEventStatus handleEvent(
160
        HistoryEvent event, boolean hasNextEvent) {
161

162
      if (!VersionMarkerUtils.hasVersionMarkerStructure(event)) {
1✔
163
        // This event is not a version marker event, and it can't be handled as non-matching version
164
        // event.
165
        // So the best we can do with this state machine is to consider the event non-matching and
166
        // cancel the command produced by this state machine.
167
        // Then, give the event a chance to match with the next command.
168
        explicitEvent(ExplicitEvent.NON_MATCHING_EVENT);
1✔
169
        return WorkflowStateMachines.HandleEventStatus.NON_MATCHING_EVENT;
1✔
170
      }
171

172
      String expectedId = VersionMarkerUtils.getChangeId(event.getMarkerRecordedEventAttributes());
1✔
173
      if (Strings.isNullOrEmpty(expectedId)) {
1!
174
        throw new IllegalStateException(
×
175
            "Version machine found in the history, but it doesn't contain a change id");
176
      }
177

178
      if (!changeId.equals(expectedId)) {
1✔
179
        // Do not call explicitEvent(ExplicitEvent.NON_MATCHING_EVENT) here as the event with
180
        // a different changeId can be followed by an event with our changeId.
181
        // An event will be handled as non-matching history event, it's fine.
182
        return WorkflowStateMachines.HandleEventStatus.NON_MATCHING_EVENT;
1✔
183
      }
184
      return super.handleEvent(event, hasNextEvent);
1✔
185
    }
186

187
    @Override
188
    public void handleWorkflowTaskStarted() {
189
      // Accounts for the case when there are no other events following the expected version marker,
190
      // so handleEvent
191
      // has no chance to trigger ExplicitEvent.NON_MATCHING_EVENT.
192
      // Also needed for subsequent getVersion calls that has no matching events and are not getting
193
      // explicit handleEvent calls with non-matching events, because they could be located after
194
      // the last command event is matched.
195
      if (getState() == State.RESULT_NOTIFIED_REPLAYING) {
1!
196
        Preconditions.checkState(
×
197
            preloadedVersion == null, "preloadedVersion is expected to be flushed or never set");
×
198
        explicitEvent(ExplicitEvent.NON_MATCHING_EVENT);
×
199
      }
200
    }
1✔
201

202
    /**
     * Adds {@code StateMachineCommandUtils.RECORD_MARKER_FAKE_COMMAND} as this machine's command
     * instead of a real record-marker command. Used by createMarkerReplaying.
     */
    void createFakeCommand() {
203
      addCommand(StateMachineCommandUtils.RECORD_MARKER_FAKE_COMMAND);
1✔
204
    }
1✔
205

206
    private void validateVersionAndThrow(boolean preloaded) {
207
      Integer versionToUse = preloaded ? preloadedVersion : version;
1✔
208

209
      if (versionToUse == null) {
1!
210
        throw new IllegalStateException((preloaded ? "preloaded " : "") + " version not set");
×
211
      }
212
      if (versionToUse < minSupported || versionToUse > maxSupported) {
1!
213
        throw new UnsupportedVersion.UnsupportedVersionException(
1✔
214
            String.format(
1✔
215
                "Version %d of changeId %s is not supported. Supported v is between %d and %d.",
216
                versionToUse, changeId, minSupported, maxSupported));
1✔
217
      }
218
    }
1✔
219

220
    void notifyFromVersion(boolean preloaded) {
221
      Integer versionToUse = preloaded ? preloadedVersion : version;
1✔
222
      resultCallback.apply(versionToUse, null);
1✔
223
    }
1✔
224

225
    /**
     * Delivers a failure to the result callback; the version argument is passed as null.
     *
     * @param ex failure to report to the caller of getVersion
     */
    void notifyFromException(RuntimeException ex) {
226
      resultCallback.apply(null, ex);
1✔
227
    }
1✔
228

229
    void notifyFromVersionExecuting() {
230
      // the only case when we don't need to validate before notification because
231
      // we just initialized the version with maxVersion
232
      notifyFromVersion(false);
1✔
233
    }
1✔
234

235
    State createMarkerExecuting() {
236
      if (version != null) {
1✔
237
        // version for this change-id is already set and no maker should be created
238
        addCommand(StateMachineCommandUtils.RECORD_MARKER_FAKE_COMMAND);
1✔
239
        return State.SKIPPED;
1✔
240
      } else {
241
        version = maxSupported;
1✔
242
        RecordMarkerCommandAttributes markerAttributes =
1✔
243
            VersionMarkerUtils.createMarkerAttributes(changeId, version);
1✔
244
        addCommand(StateMachineCommandUtils.createRecordMarker(markerAttributes));
1✔
245
        return State.MARKER_COMMAND_CREATED;
1✔
246
      }
247
    }
248

249
    void notifySkippedExecuting() {
250
      cancelCommand();
1✔
251
      try {
252
        // It's an original execution, and we are in the skipping mode,
253
        // so we use a version from the original execution set by an earlier getVersion invocation
254
        final boolean usePreloadedVersion = false;
1✔
255
        validateVersionAndThrow(usePreloadedVersion);
1✔
256
        notifyFromVersion(usePreloadedVersion);
1✔
257
      } catch (RuntimeException ex) {
1✔
258
        notifyFromException(ex);
1✔
259
      }
1✔
260
    }
1✔
261

262
    void notifyMarkerCreatedReplaying() {
263
      try {
264
        // it's a replay and the version to return from the getVersion call should be preloaded from
265
        // the history
266
        final boolean usePreloadedVersion = true;
1✔
267
        validateVersionAndThrow(usePreloadedVersion);
1✔
268
        notifyFromVersion(usePreloadedVersion);
1✔
269
      } catch (RuntimeException ex) {
1✔
270
        notifyFromException(ex);
1✔
271
      }
1✔
272
    }
1✔
273

274
    State createMarkerReplaying() {
275
      createFakeCommand();
1✔
276
      if (preloadedVersion != null) {
1✔
277
        return State.MARKER_COMMAND_CREATED_REPLAYING;
1✔
278
      } else {
279
        return State.SKIPPED_REPLAYING;
1✔
280
      }
281
    }
282

283
    /**
     * Transition action for a matched marker event during replay: moves the version from the
     * event currently being handled into {@code version} and clears {@code preloadedVersion}.
     *
     * @throws IllegalStateException if no version was preloaded before the marker event matched
     */
    void flushPreloadedVersionAndUpdateFromEventReplaying() {
284
      Preconditions.checkState(
1✔
285
          preloadedVersion != null, "preloadedVersion is expected to be initialized");
1!
286
      flushPreloadedVersionAndUpdateFromEvent(currentEvent);
1✔
287
    }
1✔
288

289
    void notifySkippedReplaying() {
290
      cancelCommand();
1✔
291
      if (version == null) {
1✔
292
        // We are in replay and in a skipped state, which means there was no matching marker in the
293
        // history
294
        // getVersion call wasn't here during the original execution, so we have to assume the
295
        // version to be DEFAULT_VERSION
296
        version = DEFAULT_VERSION;
1✔
297
      }
298
      try {
299
        // we are in the skipped state, so no preloaded event is expected from the history
300
        final boolean usePreloadedVersion = false;
1✔
301
        validateVersionAndThrow(usePreloadedVersion);
1✔
302
        notifyFromVersion(usePreloadedVersion);
1✔
303
      } catch (RuntimeException ex) {
1✔
304
        notifyFromException(ex);
1✔
305
      }
1✔
306
    }
1✔
307

308
    void missingMarkerReplaying() {
309
      if (preloadedVersion != null) {
1✔
310
        // 1. There is a version marker for the changeId, because there is a preloaded version.
311
        // 2. This version marker doesn't match this command, which means this version marker is
312
        // recorded later than this command. Otherwise, it would be flushed already by either a
313
        // matched event or earlier non-matched version marker.
314
        //
315
        // So, preloadedVersion != null means that this getVersion call is added before the original
316
        // getVersion call that caused the recording of the marker.
317
        throw new NonDeterministicException(
1✔
318
            "getVersion call before the existing version marker event. "
319
                + RETROACTIVE_ADDITION_ERROR_STRING);
320
      }
321
      cancelCommand();
1✔
322
    }
1✔
323
  }
324

325
  private void updateVersionFromEvent(HistoryEvent event) {
326
    if (version != null) {
1!
327
      throw new NonDeterministicException(
×
328
          "Version is already set to " + version + ". " + RETROACTIVE_ADDITION_ERROR_STRING);
329
    }
330
    version = getVersionFromEvent(event);
1✔
331
  }
1✔
332

333
  private void preloadVersionFromEvent(HistoryEvent event) {
334
    if (version != null) {
1✔
335
      throw new NonDeterministicException(
1✔
336
          "Version is already set to " + version + ". " + RETROACTIVE_ADDITION_ERROR_STRING);
337
    }
338

339
    Preconditions.checkState(
1!
340
        preloadedVersion == null,
341
        "Preloaded version is already set to %s. "
342
            + "Most likely the history has several version marker events for the same 'changeId'",
343
        preloadedVersion);
344

345
    preloadedVersion = getVersionFromEvent(event);
1✔
346
  }
1✔
347

348
  /**
   * Sets {@code version} from the given marker event and then clears {@code preloadedVersion}.
   *
   * <p>If updating the version throws, {@code preloadedVersion} is left untouched because the
   * reset runs only after a successful update.
   *
   * @param event version marker event for this change id
   */
  void flushPreloadedVersionAndUpdateFromEvent(HistoryEvent event) {
349
    updateVersionFromEvent(event);
1✔
350
    preloadedVersion = null;
1✔
351
  }
1✔
352

353
  /**
   * Creates a new VersionStateMachine for one change id.
   *
   * @param id change id tracked by the new state machine; must not be null
   * @param replaying supplier consulted by each invocation to choose between the replaying and
   *     the executing path; must not be null
   * @param commandSink sink passed to each invocation's state machine; must not be null
   * @param stateMachineSink sink passed to each invocation's state machine
   * @return the new state machine
   */
354
  public static VersionStateMachine newInstance(
355
      String id,
356
      Functions.Func<Boolean> replaying,
357
      Functions.Proc1<CancellableCommand> commandSink,
358
      Functions.Proc1<StateMachine> stateMachineSink) {
359
    return new VersionStateMachine(id, replaying, commandSink, stateMachineSink);
1✔
360
  }
361

362
  /**
   * Stores the collaborators shared by all invocations for this change id. Instances are created
   * through {@link #newInstance}.
   *
   * @throws NullPointerException if {@code changeId}, {@code replaying} or {@code commandSink} is
   *     null
   */
  private VersionStateMachine(
363
      String changeId,
364
      Functions.Func<Boolean> replaying,
365
      Functions.Proc1<CancellableCommand> commandSink,
366
      Functions.Proc1<StateMachine> stateMachineSink) {
1✔
367
    this.changeId = Objects.requireNonNull(changeId);
1✔
368
    this.replaying = Objects.requireNonNull(replaying);
1✔
369
    this.commandSink = Objects.requireNonNull(commandSink);
1✔
370
    // NOTE(review): the only argument stored without a null check -- confirm whether a null
    // stateMachineSink is intentionally allowed before tightening this.
    this.stateMachineSink = stateMachineSink;
1✔
371
  }
1✔
372

373
  /**
374
   * Get the version for this state machine.
375
   *
376
   * @param minSupported min version supported for the change
377
   * @param maxSupported max version supported for the change
378
   * @param callback used to return version
379
   * @return True if the identifier is not present in history
380
   */
381
  public boolean getVersion(
382
      int minSupported, int maxSupported, Functions.Proc2<Integer, RuntimeException> callback) {
383
    InvocationStateMachine ism = new InvocationStateMachine(minSupported, maxSupported, callback);
1✔
384
    ism.explicitEvent(ExplicitEvent.CHECK_EXECUTION_STATE);
1✔
385
    ism.explicitEvent(ExplicitEvent.SCHEDULE);
1✔
386
    //  If the state is SKIPPED_REPLAYING that means we:
387
    //    1. Are replaying
388
    //    2. We don't have a preloadedVersion
389
    // This means either this version marker did not exist in the original execution or
390
    // the version marker did exist, but was in an earlier WFT. If the version marker was in a
391
    // previous WFT then the version field should have a value.
392
    return !(ism.getState() == VersionStateMachine.State.SKIPPED_REPLAYING && version == null);
1✔
393
  }
394

395
  /**
   * Records the version from a marker event that no invocation matched: sets {@code version} from
   * the event and clears {@code preloadedVersion}.
   *
   * @param event version marker event for this change id
   */
  public void handleNonMatchingEvent(HistoryEvent event) {
396
    flushPreloadedVersionAndUpdateFromEvent(event);
1✔
397
  }
1✔
398

399
  /**
   * Preloads the version from a marker event into {@code preloadedVersion} so that a later
   * getVersion call during replay can return it.
   *
   * @param event version marker event for this change id
   */
  public void handleMarkersPreload(HistoryEvent event) {
400
    preloadVersionFromEvent(event);
1✔
401
  }
1✔
402

403
  private int getVersionFromEvent(HistoryEvent event) {
404
    Preconditions.checkArgument(
1✔
405
        VersionMarkerUtils.hasVersionMarkerStructure(event),
1✔
406
        "Expected a version marker event, got %s with '%s' marker name",
407
        event.getEventType(),
1✔
408
        event.getMarkerRecordedEventAttributes().getMarkerName());
1✔
409

410
    String eventChangeId = VersionMarkerUtils.getChangeId(event.getMarkerRecordedEventAttributes());
1✔
411
    Preconditions.checkArgument(
1✔
412
        this.changeId.equals(eventChangeId),
1✔
413
        "Got an event with an incorrect changeId, expected: %s, got %s",
414
        this.changeId,
415
        eventChangeId);
416

417
    Integer version = VersionMarkerUtils.getVersion(event.getMarkerRecordedEventAttributes());
1✔
418
    Preconditions.checkArgument(version != null, "Marker details missing required version key");
1!
419

420
    return version;
1✔
421
  }
422
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc