• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #269

10 Jun 2024 11:33PM UTC coverage: 77.444% (+0.05%) from 77.395%
#269

push

github

web-flow
Fix flake in resetWorkflowIdFromWorkflowTaskTest (#2105)

19255 of 24863 relevant lines covered (77.44%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.36
/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.statemachines;
22

23
import static io.temporal.api.enums.v1.CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE;
24
import static io.temporal.internal.common.WorkflowExecutionUtils.getEventTypeForCommand;
25
import static io.temporal.internal.common.WorkflowExecutionUtils.isCommandEvent;
26
import static io.temporal.serviceclient.CheckedExceptionWrapper.unwrap;
27

28
import com.google.common.annotations.VisibleForTesting;
29
import com.google.common.base.Preconditions;
30
import com.google.common.base.Strings;
31
import com.google.protobuf.Any;
32
import io.temporal.api.command.v1.*;
33
import io.temporal.api.common.v1.*;
34
import io.temporal.api.enums.v1.EventType;
35
import io.temporal.api.failure.v1.Failure;
36
import io.temporal.api.history.v1.*;
37
import io.temporal.api.protocol.v1.Message;
38
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
39
import io.temporal.failure.CanceledFailure;
40
import io.temporal.internal.common.*;
41
import io.temporal.internal.history.LocalActivityMarkerUtils;
42
import io.temporal.internal.history.VersionMarkerUtils;
43
import io.temporal.internal.sync.WorkflowThread;
44
import io.temporal.internal.worker.LocalActivityResult;
45
import io.temporal.worker.NonDeterministicException;
46
import io.temporal.workflow.ChildWorkflowCancellationType;
47
import io.temporal.workflow.Functions;
48
import java.nio.charset.StandardCharsets;
49
import java.util.*;
50
import javax.annotation.Nullable;
51

52
public final class WorkflowStateMachines {
53

54
  enum HandleEventStatus {
1✔
55
    OK,
1✔
56
    NON_MATCHING_EVENT
1✔
57
  }
58

59
  /** Initial set of SDK flags that will be set on all new workflow executions. */
60
  private static final List<SdkFlag> initialFlags =
1✔
61
      Collections.unmodifiableList(
1✔
62
          Collections.singletonList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION));
1✔
63

64
  /**
65
   * EventId of the WorkflowTaskStarted event of the Workflow Task that was picked up by a worker
66
   * and triggered a current replay or execution. It's expected to be the last event in the history
67
   * if we continue to execute the workflow.
68
   *
69
   * <p>For direct (legacy) queries, it may be:
70
   *
71
   * <ul>
72
   *   <li>0 if it's query a closed workflow execution
73
   *   <li>id of the last successfully completed Workflow Task if the workflow is not closed
74
   * </ul>
75
   *
76
   * <p>Set from the "outside" from the PollWorkflowTaskQueueResponse. Not modified by the SDK state
77
   * machines.
78
   */
79
  private long workflowTaskStartedEventId;
80

81
  /** EventId of the last WorkflowTaskStarted event handled by these state machines. */
82
  private long lastWFTStartedEventId;
83

84
  /** The Build ID used in the current WFT if already completed and set (may be null) */
85
  private String currentTaskBuildId;
86

87
  private long historySize;
88

89
  private boolean isContinueAsNewSuggested;
90

91
  /**
92
   * EventId of the last event seen by these state machines. Events earlier than this one will be
93
   * discarded.
94
   */
95
  private long lastHandledEventId;
96

97
  private final StatesMachinesCallback callbacks;
98

99
  /** Callback to send new commands to. */
100
  private final Functions.Proc1<CancellableCommand> commandSink;
101

102
  /**
103
   * currentRunId is used as seed by Workflow.newRandom and randomUUID. It allows to generate them
104
   * deterministically.
105
   */
106
  private String currentRunId;
107

108
  /** Used Workflow.newRandom and randomUUID together with currentRunId. */
109
  private long idCounter;
110

111
  /** Current workflow time. */
112
  private long currentTimeMillis = -1;
1✔
113

114
  private final Map<Long, EntityStateMachine> stateMachines = new HashMap<>();
1✔
115

116
  /** Key is the protocol instance id */
117
  private final Map<String, EntityStateMachine> protocolStateMachines = new HashMap<>();
1✔
118

119
  private final Queue<Message> messageOutbox = new ArrayDeque<>();
1✔
120

121
  private final Queue<CancellableCommand> commands = new ArrayDeque<>();
1✔
122

123
  /**
124
   * Commands generated by the currently processed workflow task. It is a queue as commands can be
125
   * added (due to marker based commands) while iterating over already added commands.
126
   */
127
  private final Queue<CancellableCommand> cancellableCommands = new ArrayDeque<>();
1✔
128

129
  /**
130
   * Is workflow executing new code or replaying from the history. The definition of replaying here
131
   * is that we are no longer replaying as soon as we see new events that have never been seen or
132
   * produced by the SDK.
133
   *
134
   * <p>Specifically, replay ends once we have seen any non-command event (IE: events that aren't a
135
   * result of something we produced in the SDK) on a WFT which has the final event in history
136
   * (meaning we are processing the most recent WFT and there are no more subsequent WFTs). WFT
137
   * Completed in this case does not count as a non-command event, because that will typically show
138
   * up as the first event in an incremental history, and we want to ignore it and its associated
139
   * commands since we "produced" them.
140
   *
141
   * <p>Note: that this flag ALWAYS flips to true for the time when we apply events from the server
142
   * even if the commands were created by an actual execution with replaying=false.
143
   */
144
  private boolean replaying;
145

146
  /** Used to ensure that event loop is not executed recursively. */
147
  private boolean eventLoopExecuting;
148

149
  /**
150
   * Used to avoid recursive calls to {@link #prepareCommands()}.
151
   *
152
   * <p>Such calls happen when sideEffects and localActivity markers are processed.
153
   */
154
  private boolean preparing;
155

156
  /** Key is mutable side effect id */
157
  private final Map<String, MutableSideEffectStateMachine> mutableSideEffects = new HashMap<>();
1✔
158

159
  /** Key is changeId */
160
  private final Map<String, VersionStateMachine> versions = new HashMap<>();
1✔
161

162
  /** Map of local activities by their id. */
163
  private final Map<String, LocalActivityStateMachine> localActivityMap = new HashMap<>();
1✔
164

165
  private List<ExecuteLocalActivityParameters> localActivityRequests = new ArrayList<>();
1✔
166

167
  private final Functions.Proc1<ExecuteLocalActivityParameters> localActivityRequestSink;
168
  private final Functions.Proc1<StateMachine> stateMachineSink;
169

170
  private final WFTBuffer wftBuffer = new WFTBuffer();
1✔
171

172
  private List<Message> messages = new ArrayList<>();
1✔
173

174
  /**
175
   * Set of accepted durably admitted updates by update id a "durably admitted" update is one with
176
   * an UPDATE_ADMITTED event.
177
   */
178
  private final Set<String> acceptedUpdates = new HashSet<>();
1✔
179

180
  private final SdkFlags flags;
181

182
  public WorkflowStateMachines(
183
      StatesMachinesCallback callbacks, GetSystemInfoResponse.Capabilities capabilities) {
184
    this(callbacks, (stateMachine) -> {}, capabilities);
1✔
185
  }
1✔
186

187
  @VisibleForTesting
188
  public WorkflowStateMachines(
189
      StatesMachinesCallback callbacks,
190
      Functions.Proc1<StateMachine> stateMachineSink,
191
      GetSystemInfoResponse.Capabilities capabilities) {
1✔
192
    this.callbacks = Objects.requireNonNull(callbacks);
1✔
193
    this.commandSink = cancellableCommands::add;
1✔
194
    this.stateMachineSink = stateMachineSink;
1✔
195
    this.localActivityRequestSink = (request) -> localActivityRequests.add(request);
1✔
196
    this.flags = new SdkFlags(capabilities.getSdkMetadata(), this::isReplaying);
1✔
197
  }
1✔
198

199
  @VisibleForTesting
200
  public WorkflowStateMachines(
201
      StatesMachinesCallback callbacks, Functions.Proc1<StateMachine> stateMachineSink) {
1✔
202
    this.callbacks = Objects.requireNonNull(callbacks);
1✔
203
    this.commandSink = cancellableCommands::add;
1✔
204
    this.stateMachineSink = stateMachineSink;
1✔
205
    this.localActivityRequestSink = (request) -> localActivityRequests.add(request);
1✔
206
    this.flags = new SdkFlags(false, this::isReplaying);
1✔
207
  }
1✔
208

209
  // TODO revisit and potentially remove workflowTaskStartedEventId at all from the state machines.
210
  // The only place where it's used is WorkflowTaskStateMachine to understand that there will be
211
  // no completion event in the history.
212
  // This is tricky, because direct queries come with 0 as a magic value in this field if
213
  // execution is completed for example.
214
  // Most likely we can rework WorkflowTaskStateMachine to use only hasNext.
215
  /**
216
   * @param workflowTaskStartedEventId eventId of the workflowTask that was picked up by a worker
217
   *     and triggered an execution. Used in {@link WorkflowTaskStateMachine} only to understand
218
   *     that this workflow task will not have a matching closing event and needs to be executed.
219
   */
220
  public void setWorkflowStartedEventId(long workflowTaskStartedEventId) {
221
    this.workflowTaskStartedEventId = workflowTaskStartedEventId;
1✔
222
  }
1✔
223

224
  public void resetStartedEvenId(long eventId) {
225
    // We must reset the last event we handled to be after the last WFT we really completed
226
    // + any command events (since the SDK "processed" those when it emitted the commands). This
227
    // is also equal to what we just processed in the speculative task, minus two, since we
228
    // would've just handled the most recent WFT started event, and we need to drop that & the
229
    // schedule event just before it.
230
    long resetLastHandledEventId = this.lastHandledEventId - 2;
1✔
231
    // We have to drop any state machines (which should only be one workflow task machine)
232
    // created when handling the speculative workflow task
233
    for (long i = this.lastHandledEventId; i > resetLastHandledEventId; i--) {
1✔
234
      stateMachines.remove(i);
1✔
235
    }
236
    this.lastWFTStartedEventId = eventId;
1✔
237
    this.lastHandledEventId = resetLastHandledEventId;
1✔
238
  }
1✔
239

240
  public long getLastWFTStartedEventId() {
241
    return lastWFTStartedEventId;
1✔
242
  }
243

244
  public long getCurrentWFTStartedEventId() {
245
    return workflowTaskStartedEventId;
×
246
  }
247

248
  public long getHistorySize() {
249
    return historySize;
1✔
250
  }
251

252
  @Nullable
253
  public String getCurrentTaskBuildId() {
254
    return currentTaskBuildId;
×
255
  }
256

257
  public boolean isContinueAsNewSuggested() {
258
    return isContinueAsNewSuggested;
1✔
259
  }
260

261
  public void setReplaying(boolean replaying) {
262
    this.replaying = replaying;
1✔
263
  }
1✔
264

265
  public void setMessages(List<Message> messages) {
266
    this.messages = new ArrayList<>(messages);
1✔
267
  }
1✔
268

269
  /**
270
   * Handle a single event from the workflow history.
271
   *
272
   * @param event event from the history.
273
   * @param hasNextEvent false if this is the last event in the history.
274
   */
275
  public void handleEvent(HistoryEvent event, boolean hasNextEvent) {
276
    long eventId = event.getEventId();
1✔
277
    if (eventId <= lastHandledEventId) {
1✔
278
      // already handled
279
      return;
1✔
280
    }
281
    Preconditions.checkState(
1✔
282
        eventId == lastHandledEventId + 1,
283
        "History is out of order. "
284
            + "There is a gap between the last event workflow state machine observed and currently handling event. "
285
            + "Last processed eventId: %s, handling eventId: %s",
286
        lastHandledEventId,
287
        eventId);
288

289
    lastHandledEventId = eventId;
1✔
290
    boolean readyToPeek = wftBuffer.addEvent(event, hasNextEvent);
1✔
291
    if (readyToPeek) {
1✔
292
      handleEventsBatch(wftBuffer.fetch(), hasNextEvent);
1✔
293
    }
294
  }
1✔
295

296
  /**
297
   * Handle an events batch for one workflow task. Events that are related to one workflow task
298
   * during replay should be prefetched and supplied in one batch.
299
   *
300
   * @param eventBatch events belong to one workflow task
301
   * @param hasNextBatch true if there are more events in the history follow this batch, false if
302
   *     this batch contains the last events of the history
303
   */
304
  private void handleEventsBatch(WFTBuffer.EventBatch eventBatch, boolean hasNextBatch) {
305
    List<HistoryEvent> events = eventBatch.getEvents();
1✔
306
    if (EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED.equals(events.get(0).getEventType())) {
1✔
307
      for (SdkFlag flag : initialFlags) {
1✔
308
        flags.tryUseSdkFlag(flag);
1✔
309
      }
1✔
310
    }
311

312
    if (eventBatch.getWorkflowTaskCompletedEvent().isPresent()) {
1✔
313
      for (HistoryEvent event : events) {
1✔
314
        handleSingleEventLookahead(event);
1✔
315
      }
1✔
316
    }
317

318
    for (Iterator<HistoryEvent> iterator = events.iterator(); iterator.hasNext(); ) {
1✔
319
      HistoryEvent event = iterator.next();
1✔
320

321
      // On replay the messages are available after the workflow task schedule event, so we
322
      // need to handle them before workflow task started event to maintain a consistent order.
323
      for (Message msg : this.takeLTE(event.getEventId() - 1)) {
1✔
324
        handleSingleMessage(msg);
1✔
325
      }
1✔
326

327
      try {
328
        boolean isLastTask =
1✔
329
            !hasNextBatch && !eventBatch.getWorkflowTaskCompletedEvent().isPresent();
1✔
330
        boolean hasNextEvent = iterator.hasNext() || hasNextBatch;
1✔
331
        handleSingleEvent(event, isLastTask, hasNextEvent);
1✔
332
      } catch (RuntimeException e) {
1✔
333
        throw createEventProcessingException(e, event);
1✔
334
      }
1✔
335

336
      for (Message msg : this.takeLTE(event.getEventId())) {
1✔
337
        handleSingleMessage(msg);
1✔
338
      }
1✔
339
    }
1✔
340
  }
1✔
341

342
  /** Handle an event when looking ahead at history during replay */
343
  private void handleSingleEventLookahead(HistoryEvent event) {
344
    EventType eventType = event.getEventType();
1✔
345
    switch (eventType) {
1✔
346
      case EVENT_TYPE_MARKER_RECORDED:
347
        try {
348
          preloadVersionMarker(event);
1✔
349
        } catch (RuntimeException e) {
1✔
350
          throw createEventProcessingException(e, event);
1✔
351
        }
1✔
352
        break;
353
      case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED:
354
        // Look ahead to infer protocol messages
355
        WorkflowExecutionUpdateAcceptedEventAttributes updateEvent =
1✔
356
            event.getWorkflowExecutionUpdateAcceptedEventAttributes();
1✔
357
        // If an EXECUTION_UPDATE_ACCEPTED event does not have an accepted request, then it
358
        // must be from an admitted update. This is the only way to infer an admitted update was
359
        // accepted.
360
        if (!updateEvent.hasAcceptedRequest()) {
1✔
361
          acceptedUpdates.add(updateEvent.getProtocolInstanceId());
1✔
362
        } else {
363
          messages.add(
1✔
364
              Message.newBuilder()
1✔
365
                  .setId(updateEvent.getAcceptedRequestMessageId())
1✔
366
                  .setProtocolInstanceId(updateEvent.getProtocolInstanceId())
1✔
367
                  .setEventId(updateEvent.getAcceptedRequestSequencingEventId())
1✔
368
                  .setBody(Any.pack(updateEvent.getAcceptedRequest()))
1✔
369
                  .build());
1✔
370
        }
371
        break;
1✔
372
      case EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
373
        WorkflowTaskCompletedEventAttributes completedEvent =
1✔
374
            event.getWorkflowTaskCompletedEventAttributes();
1✔
375
        String maybeBuildId = completedEvent.getWorkerVersion().getBuildId();
1✔
376
        if (!maybeBuildId.isEmpty()) {
1✔
377
          currentTaskBuildId = maybeBuildId;
×
378
        }
379
        for (Integer flag : completedEvent.getSdkMetadata().getLangUsedFlagsList()) {
1✔
380
          SdkFlag sdkFlag = SdkFlag.getValue(flag);
1✔
381
          if (sdkFlag.equals(SdkFlag.UNKNOWN)) {
1✔
382
            throw new IllegalArgumentException("Unknown SDK flag:" + flag);
×
383
          }
384
          flags.setSdkFlag(sdkFlag);
1✔
385
        }
1✔
386
        // Remove any finished update protocol state machines. We can't remove them on an event like
387
        // other state machines because a rejected update produces no event in history.
388
        protocolStateMachines.entrySet().removeIf(entry -> entry.getValue().isFinalState());
1✔
389
        break;
390
    }
391
  }
1✔
392

393
  private List<Message> takeLTE(long eventId) {
394
    List<Message> m = new ArrayList<>();
1✔
395
    List<Message> remainingMessages = new ArrayList<>();
1✔
396
    for (Message msg : this.messages) {
1✔
397
      if (msg.getEventId() > eventId) {
1✔
398
        remainingMessages.add(msg);
1✔
399
      } else {
400
        m.add(msg);
1✔
401
      }
402
    }
1✔
403
    this.messages = remainingMessages;
1✔
404
    return m;
1✔
405
  }
406

407
  private RuntimeException createEventProcessingException(RuntimeException e, HistoryEvent event) {
408
    Throwable ex = unwrap(e);
1✔
409
    if (ex instanceof NonDeterministicException) {
1✔
410
      // just appending the message in front of an existing message, saving the original stacktrace
411
      NonDeterministicException modifiedException =
1✔
412
          new NonDeterministicException(
413
              createEventHandlingMessage(event)
1✔
414
                  + ". "
415
                  + ex.getMessage()
1✔
416
                  + ". "
417
                  + createShortCurrentStateMessagePostfix(),
1✔
418
              ex.getCause());
1✔
419
      modifiedException.setStackTrace(ex.getStackTrace());
1✔
420
      return modifiedException;
1✔
421
    } else {
422
      return new InternalWorkflowTaskException(
1✔
423
          createEventHandlingMessage(event) + ". " + createShortCurrentStateMessagePostfix(), ex);
1✔
424
    }
425
  }
426

427
  private void handleSingleMessage(Message message) {
428
    // Get or create protocol state machine based on Instance ID and protocolName
429
    EntityStateMachine stateMachine =
1✔
430
        protocolStateMachines.computeIfAbsent(
1✔
431
            message.getProtocolInstanceId(),
1✔
432
            (protocolInstance) -> {
433
              String protocolName = ProtocolUtils.getProtocol(message);
1✔
434
              Optional<ProtocolType> type = ProtocolType.get(protocolName);
1✔
435
              if (type.isPresent()) {
1✔
436
                switch (type.get()) {
1✔
437
                  case UPDATE_V1:
438
                    return UpdateProtocolStateMachine.newInstance(
1✔
439
                        this::isReplaying,
440
                        callbacks::update,
1✔
441
                        this::sendMessage,
442
                        commandSink,
443
                        stateMachineSink);
444
                  default:
445
                    throw new IllegalArgumentException("Unknown protocol type:" + protocolName);
×
446
                }
447
              }
448
              throw new IllegalArgumentException("Protocol type not specified:" + message);
×
449
            });
450
    stateMachine.handleMessage(message);
1✔
451
  }
1✔
452

453
  private void handleSingleEvent(HistoryEvent event, boolean lastTask, boolean hasNextEvent) {
454
    if (isCommandEvent(event)) {
1✔
455
      handleCommandEvent(event);
1✔
456
      return;
1✔
457
    }
458

459
    // We don't explicitly check if the event is a command event here because it's already handled
460
    // above.
461
    if (replaying
1✔
462
        && lastTask
463
        && event.getEventType() != EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED) {
1✔
464
      replaying = false;
1✔
465
    }
466

467
    final OptionalLong initialCommandEventId = getInitialCommandEventId(event);
1✔
468
    if (!initialCommandEventId.isPresent()) {
1✔
469
      return;
1✔
470
    }
471

472
    EntityStateMachine c = stateMachines.get(initialCommandEventId.getAsLong());
1✔
473
    if (c != null) {
1✔
474
      c.handleEvent(event, hasNextEvent);
1✔
475
      if (c.isFinalState()) {
1✔
476
        stateMachines.remove(initialCommandEventId.getAsLong());
1✔
477
      }
478
    } else {
479
      handleNonStatefulEvent(event, hasNextEvent);
1✔
480
    }
481
  }
1✔
482

483
  /**
484
   * Handles command event. Command event is an event which is generated from a command emitted by a
485
   * past decision. Each command has a correspondent event. For example ScheduleActivityTaskCommand
486
   * is recorded to the history as ActivityTaskScheduledEvent.
487
   *
488
   * <p>Command events always follow WorkflowTaskCompletedEvent.
489
   *
490
   * <p>The handling consists from verifying that the next command in the commands queue matches the
491
   * event, command state machine is notified about the event and the command is removed from the
492
   * commands queue.
493
   */
494
  private void handleCommandEvent(HistoryEvent event) {
495
    if (handleLocalActivityMarker(event)) {
1✔
496
      return;
1✔
497
    }
498

499
    // Match event to the next command in the stateMachine queue.
500
    // After matching the command is notified about the event and is removed from the
501
    // queue.
502
    CancellableCommand matchingCommand = null;
1✔
503
    while (matchingCommand == null) {
1✔
504
      // handleVersionMarker can skip a marker event if the getVersion call was removed.
505
      // In this case we don't want to consume a command.
506
      // That's why peek is used instead of poll.
507
      CancellableCommand command = commands.peek();
1✔
508
      if (command == null) {
1✔
509
        if (handleNonMatchingVersionMarker(event)) {
×
510
          // this event is a version marker for removed getVersion call.
511
          // Handle the version marker as unmatched and return even if there is no commands to match
512
          // it against.
513
          return;
×
514
        } else {
515
          throw new NonDeterministicException("No command scheduled that corresponds to " + event);
×
516
        }
517
      }
518

519
      if (command.isCanceled()) {
1✔
520
        // Consume and skip the command
521
        commands.poll();
1✔
522
        continue;
1✔
523
      }
524

525
      // Note that handleEvent can cause a command cancellation in case of
526
      // 1. MutableSideEffect
527
      // 2. Version State Machine during replay cancels the command and enters SKIPPED state
528
      //    if it handled non-matching event.
529
      HandleEventStatus status = command.handleEvent(event, true);
1✔
530

531
      if (command.isCanceled()) {
1✔
532
        // Consume and skip the command
533
        commands.poll();
1✔
534
        continue;
1✔
535
      }
536

537
      switch (status) {
1✔
538
        case OK:
539
          // Consume the command
540
          commands.poll();
1✔
541
          matchingCommand = command;
1✔
542
          break;
1✔
543
        case NON_MATCHING_EVENT:
544
          if (handleNonMatchingVersionMarker(event)) {
1✔
545
            // this event is a version marker for removed getVersion call.
546
            // Handle the version marker as unmatched and return without consuming the command
547
            return;
1✔
548
          } else {
549
            throw new NonDeterministicException(
1✔
550
                "Event "
551
                    + event.getEventId()
1✔
552
                    + " of type "
553
                    + event.getEventType()
1✔
554
                    + " does not"
555
                    + " match command type "
556
                    + command.getCommandType());
1✔
557
          }
558
        default:
559
          throw new IllegalStateException(
×
560
              "Got " + status + " value from command.handleEvent which is not handled");
561
      }
562
    }
1✔
563

564
    validateCommand(matchingCommand.getCommand(), event);
1✔
565
    EntityStateMachine stateMachine = matchingCommand.getStateMachine();
1✔
566
    if (!stateMachine.isFinalState()) {
1✔
567
      stateMachines.put(event.getEventId(), stateMachine);
1✔
568
    }
569
    // Marker is the only command processing of which can cause workflow code execution
570
    // and generation of new state machines.
571
    if (event.getEventType() == EventType.EVENT_TYPE_MARKER_RECORDED) {
1✔
572
      prepareCommands();
1✔
573
    }
574
  }
1✔
575

576
  private void preloadVersionMarker(HistoryEvent event) {
577
    if (VersionMarkerUtils.hasVersionMarkerStructure(event)) {
1✔
578
      String changeId = VersionMarkerUtils.tryGetChangeIdFromVersionMarkerEvent(event);
1✔
579
      if (changeId == null) {
1✔
580
        // if we can't extract changeId, this event will later fail to match with anything
581
        // and the corresponded exception will be thrown
582
        return;
×
583
      }
584
      VersionStateMachine versionStateMachine =
1✔
585
          versions.computeIfAbsent(
1✔
586
              changeId,
587
              (idKey) ->
588
                  VersionStateMachine.newInstance(
1✔
589
                      changeId, this::isReplaying, commandSink, stateMachineSink));
590
      versionStateMachine.handleMarkersPreload(event);
1✔
591
    }
592
  }
1✔
593

594
  private boolean handleNonMatchingVersionMarker(HistoryEvent event) {
595
    String changeId = VersionMarkerUtils.tryGetChangeIdFromVersionMarkerEvent(event);
1✔
596
    if (changeId == null) {
1✔
597
      return false;
1✔
598
    }
599
    VersionStateMachine versionStateMachine = versions.get(changeId);
1✔
600
    Preconditions.checkNotNull(
1✔
601
        versionStateMachine,
602
        "versionStateMachine is expected to be initialized already by execution or preloading");
603
    versionStateMachine.handleNonMatchingEvent(event);
1✔
604
    return true;
1✔
605
  }
606

607
  public List<Command> takeCommands() {
608
    List<Command> result = new ArrayList<>(commands.size());
1✔
609
    for (CancellableCommand command : commands) {
1✔
610
      if (!command.isCanceled()) {
1✔
611
        result.add(command.getCommand());
1✔
612
      }
613
    }
1✔
614
    return result;
1✔
615
  }
616

617
  public void sendMessage(Message message) {
618
    checkEventLoopExecuting();
1✔
619
    if (!isReplaying()) {
1✔
620
      messageOutbox.add(message);
1✔
621
    }
622
  }
1✔
623

624
  public List<Message> takeMessages() {
625
    List<Message> result = new ArrayList<>(messageOutbox.size());
1✔
626
    result.addAll(messageOutbox);
1✔
627
    messageOutbox.clear();
1✔
628
    return result;
1✔
629
  }
630

631
  /**
632
   * @return True if the SDK flag is supported in this workflow execution
633
   */
634
  public boolean tryUseSdkFlag(SdkFlag flag) {
635
    return flags.tryUseSdkFlag(flag);
1✔
636
  }
637

638
  /**
639
   * @return Set of all new flags set since the last call
640
   */
641
  public EnumSet<SdkFlag> takeNewSdkFlags() {
642
    return flags.takeNewSdkFlags();
1✔
643
  }
644

645
  private void prepareCommands() {
646
    if (preparing) {
1✔
647
      return;
1✔
648
    }
649
    preparing = true;
1✔
650
    try {
651
      prepareImpl();
1✔
652
    } finally {
653
      preparing = false;
1✔
654
    }
655
  }
1✔
656

657
  private void prepareImpl() {
658
    // handleCommand can lead to code execution because of SideEffect, MutableSideEffect or local
659
    // activity completion. And code execution can lead to creation of new commands and
660
    // cancellation of existing commands. That is the reason for using Queue as a data structure for
661
    // commands.
662
    while (true) {
663
      CancellableCommand command = cancellableCommands.poll();
1✔
664
      if (command == null) {
1✔
665
        break;
1✔
666
      }
667
      // handleCommand should be called even on canceled ones to support mutableSideEffect
668
      command.handleCommand(command.getCommandType());
1✔
669
      commands.add(command);
1✔
670
    }
1✔
671
  }
1✔
672

673
  /**
674
   * Local activity is different from all other entities. It doesn't schedule a marker command when
675
   * the {@link #scheduleLocalActivityTask(ExecuteLocalActivityParameters, Functions.Proc2)} is
676
   * called. The marker is scheduled only when activity completes through ({@link
677
   * #handleLocalActivityCompletion(LocalActivityResult)}). That's why the normal logic of {@link
678
   * #handleCommandEvent(HistoryEvent)}, which assumes that each event has a correspondent command
679
   * during replay, doesn't work. Instead, local activities are matched by their id using
680
   * localActivityMap.
681
   *
682
   * @return true if matched and false if normal event handling should continue.
683
   */
684
  private boolean handleLocalActivityMarker(HistoryEvent event) {
685
    if (!LocalActivityMarkerUtils.hasLocalActivityStructure(event)) {
1✔
686
      return false;
1✔
687
    }
688

689
    MarkerRecordedEventAttributes markerAttributes = event.getMarkerRecordedEventAttributes();
1✔
690
    String id = LocalActivityMarkerUtils.getActivityId(markerAttributes);
1✔
691
    LocalActivityStateMachine stateMachine = localActivityMap.remove(id);
1✔
692
    if (stateMachine == null) {
1✔
693
      String activityType = LocalActivityMarkerUtils.getActivityTypeName(markerAttributes);
×
694
      throw new NonDeterministicException(
×
695
          String.format(
×
696
              "Local activity of type %s is recorded in the history with id %s but was not expected by the execution",
697
              activityType, id));
698
    }
699
    // RESULT_NOTIFIED state means that there is outstanding command that has to be matched
700
    // using standard logic. So return false to let the handleCommand method to run its standard
701
    // logic.
702
    if (stateMachine.getState() == LocalActivityStateMachine.State.RESULT_NOTIFIED) {
1✔
703
      return false;
1✔
704
    }
705
    stateMachine.handleEvent(event, true);
1✔
706
    eventLoop();
1✔
707
    return true;
1✔
708
  }
709

710
  private void eventLoop() {
711
    if (eventLoopExecuting) {
1✔
712
      return;
1✔
713
    }
714
    eventLoopExecuting = true;
1✔
715
    try {
716
      callbacks.eventLoop();
1✔
717
    } finally {
718
      eventLoopExecuting = false;
1✔
719
    }
720
    prepareCommands();
1✔
721
  }
1✔
722

723
  private void handleNonStatefulEvent(HistoryEvent event, boolean hasNextEvent) {
724
    switch (event.getEventType()) {
1✔
725
      case EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:
726
        this.currentRunId =
1✔
727
            event.getWorkflowExecutionStartedEventAttributes().getOriginalExecutionRunId();
1✔
728
        callbacks.start(event);
1✔
729
        break;
1✔
730
      case EVENT_TYPE_WORKFLOW_TASK_SCHEDULED:
731
        WorkflowTaskStateMachine c =
1✔
732
            WorkflowTaskStateMachine.newInstance(
1✔
733
                workflowTaskStartedEventId, new WorkflowTaskCommandsListener());
734
        c.handleEvent(event, hasNextEvent);
1✔
735
        stateMachines.put(event.getEventId(), c);
1✔
736
        break;
1✔
737
      case EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED:
738
        callbacks.signal(event);
1✔
739
        break;
1✔
740
      case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
741
        callbacks.cancel(event);
1✔
742
        break;
1✔
743
      case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED:
744
        WorkflowExecutionUpdateAdmittedEventAttributes admittedEvent =
1✔
745
            event.getWorkflowExecutionUpdateAdmittedEventAttributes();
1✔
746
        Message msg =
747
            Message.newBuilder()
1✔
748
                .setId(admittedEvent.getRequest().getMeta().getUpdateId() + "/request")
1✔
749
                .setProtocolInstanceId(admittedEvent.getRequest().getMeta().getUpdateId())
1✔
750
                .setEventId(event.getEventId())
1✔
751
                .setBody(Any.pack(admittedEvent.getRequest()))
1✔
752
                .build();
1✔
753
        if (replaying && acceptedUpdates.remove(msg.getProtocolInstanceId()) || !replaying) {
1✔
754
          messages.add(msg);
1✔
755
        }
756
        break;
757
      case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
758
      case UNRECOGNIZED:
759
        break;
1✔
760
      default:
761
        throw new IllegalArgumentException("Unexpected event:" + event);
×
762
    }
763
  }
1✔
764

765
  private long setCurrentTimeMillis(long currentTimeMillis) {
766
    if (this.currentTimeMillis < currentTimeMillis) {
1✔
767
      this.currentTimeMillis = currentTimeMillis;
1✔
768
    }
769
    return this.currentTimeMillis;
1✔
770
  }
771

772
  public long getLastStartedEventId() {
773
    return lastWFTStartedEventId;
1✔
774
  }
775

776
  /**
777
   * @param attributes attributes used to schedule an activity
778
   * @param callback completion callback
779
   * @return an instance of ActivityCommands
780
   */
781
  public Functions.Proc scheduleActivityTask(
782
      ExecuteActivityParameters attributes, Functions.Proc2<Optional<Payloads>, Failure> callback) {
783
    checkEventLoopExecuting();
1✔
784
    ActivityStateMachine activityStateMachine =
1✔
785
        ActivityStateMachine.newInstance(
1✔
786
            attributes,
787
            (p, f) -> {
788
              Failure failure = f != null ? f.getFailure() : null;
1✔
789
              callback.apply(p, failure);
1✔
790

791
              if (f != null
1✔
792
                  && !f.isFromEvent()
1✔
793
                  && failure.hasCause()
1✔
794
                  && failure.getCause().hasCanceledFailureInfo()) {
1✔
795
                // If !f.isFromEvent(), we want to unblock the event loop as the promise got filled
796
                // and the workflow may make progress. If f.isFromEvent(), we need to delay event
797
                // loop triggering until WorkflowTaskStarted.
798
                eventLoop();
1✔
799
              }
800
            },
1✔
801
            commandSink,
802
            stateMachineSink);
803
    return activityStateMachine::cancel;
1✔
804
  }
805

806
  /**
807
   * Creates a new timer state machine
808
   *
809
   * @param attributes timer command attributes
810
   * @param completionCallback invoked when timer fires or reports cancellation. One of
811
   *     TimerFiredEvent, TimerCanceledEvent.
812
   * @return cancellation callback that should be invoked to initiate timer cancellation
813
   */
814
  public Functions.Proc newTimer(
815
      StartTimerCommandAttributes attributes, Functions.Proc1<HistoryEvent> completionCallback) {
816
    checkEventLoopExecuting();
1✔
817
    TimerStateMachine timer =
1✔
818
        TimerStateMachine.newInstance(
1✔
819
            attributes,
820
            (event) -> {
821
              completionCallback.apply(event);
1✔
822
              // Needed due to immediate cancellation
823
              if (event.getEventType() == EventType.EVENT_TYPE_TIMER_CANCELED) {
1✔
824
                eventLoop();
1✔
825
              }
826
            },
1✔
827
            commandSink,
828
            stateMachineSink);
829
    return timer::cancel;
1✔
830
  }
831

832
  /**
833
   * Creates a new child state machine
834
   *
835
   * @param parameters child workflow start command parameters
836
   * @param startedCallback callback that is notified about child start
837
   * @param completionCallback invoked when child reports completion or failure
838
   * @return cancellation callback that should be invoked to cancel the child
839
   */
840
  public Functions.Proc startChildWorkflow(
841
      StartChildWorkflowExecutionParameters parameters,
842
      Functions.Proc2<WorkflowExecution, Exception> startedCallback,
843
      Functions.Proc2<Optional<Payloads>, Exception> completionCallback) {
844
    checkEventLoopExecuting();
1✔
845
    StartChildWorkflowExecutionCommandAttributes attributes = parameters.getRequest().build();
1✔
846
    ChildWorkflowCancellationType cancellationType = parameters.getCancellationType();
1✔
847
    ChildWorkflowStateMachine child =
1✔
848
        ChildWorkflowStateMachine.newInstance(
1✔
849
            attributes, startedCallback, completionCallback, commandSink, stateMachineSink);
850
    return () -> {
1✔
851
      if (cancellationType == ChildWorkflowCancellationType.ABANDON) {
1✔
852
        notifyChildCanceled(completionCallback);
1✔
853
        return;
1✔
854
      }
855
      // The only time child can be canceled directly is before its start command
856
      // was sent out to the service. After that RequestCancelExternal should be used.
857
      if (child.isCancellable()) {
1✔
858
        child.cancel();
1✔
859
        return;
1✔
860
      }
861
      if (!child.isFinalState()) {
1✔
862
        requestCancelExternalWorkflowExecution(
1✔
863
            RequestCancelExternalWorkflowExecutionCommandAttributes.newBuilder()
1✔
864
                .setWorkflowId(attributes.getWorkflowId())
1✔
865
                .setNamespace(attributes.getNamespace())
1✔
866
                .setChildWorkflowOnly(true)
1✔
867
                .build(),
1✔
868
            (r, e) -> { // TODO(maxim): Decide what to do if an error is passed to the callback.
869
              if (cancellationType == ChildWorkflowCancellationType.WAIT_CANCELLATION_REQUESTED) {
1✔
870
                notifyChildCanceled(completionCallback);
1✔
871
              }
872
            });
1✔
873
        if (cancellationType == ChildWorkflowCancellationType.TRY_CANCEL) {
1✔
874
          notifyChildCanceled(completionCallback);
1✔
875
        }
876
      }
877
    };
1✔
878
  }
879

880
  private void notifyChildCanceled(
881
      Functions.Proc2<Optional<Payloads>, Exception> completionCallback) {
882
    CanceledFailure failure = new CanceledFailure("Child canceled");
1✔
883
    completionCallback.apply(Optional.empty(), failure);
1✔
884
    eventLoop();
1✔
885
  }
1✔
886

887
  /**
888
   * @param attributes
889
   * @param completionCallback invoked when signal delivery completes of fails. The following types
890
   */
891
  public Functions.Proc signalExternalWorkflowExecution(
892
      SignalExternalWorkflowExecutionCommandAttributes attributes,
893
      Functions.Proc2<Void, Failure> completionCallback) {
894
    checkEventLoopExecuting();
1✔
895
    return SignalExternalStateMachine.newInstance(
1✔
896
        attributes, completionCallback, commandSink, stateMachineSink);
897
  }
898

899
  /**
900
   * @param attributes attributes to use to cancel external workflow
901
   * @param completionCallback one of ExternalWorkflowExecutionCancelRequestedEvent,
902
   */
903
  public void requestCancelExternalWorkflowExecution(
904
      RequestCancelExternalWorkflowExecutionCommandAttributes attributes,
905
      Functions.Proc2<Void, RuntimeException> completionCallback) {
906
    checkEventLoopExecuting();
1✔
907
    CancelExternalStateMachine.newInstance(
1✔
908
        attributes, completionCallback, commandSink, stateMachineSink);
909
  }
1✔
910

911
  public void upsertSearchAttributes(SearchAttributes attributes) {
912
    checkEventLoopExecuting();
1✔
913
    UpsertSearchAttributesStateMachine.newInstance(attributes, commandSink, stateMachineSink);
1✔
914
  }
1✔
915

916
  public void completeWorkflow(Optional<Payloads> workflowOutput) {
917
    checkEventLoopExecuting();
1✔
918
    CompleteWorkflowStateMachine.newInstance(workflowOutput, commandSink, stateMachineSink);
1✔
919
  }
1✔
920

921
  public void failWorkflow(Failure failure) {
922
    checkEventLoopExecuting();
1✔
923
    FailWorkflowStateMachine.newInstance(failure, commandSink, stateMachineSink);
1✔
924
  }
1✔
925

926
  public void cancelWorkflow() {
927
    checkEventLoopExecuting();
1✔
928
    CancelWorkflowStateMachine.newInstance(
1✔
929
        CancelWorkflowExecutionCommandAttributes.getDefaultInstance(),
1✔
930
        commandSink,
931
        stateMachineSink);
932
  }
1✔
933

934
  public void continueAsNewWorkflow(ContinueAsNewWorkflowExecutionCommandAttributes attributes) {
935
    checkEventLoopExecuting();
1✔
936
    ContinueAsNewWorkflowStateMachine.newInstance(attributes, commandSink, stateMachineSink);
1✔
937
  }
1✔
938

939
  public boolean isReplaying() {
940
    return replaying;
1✔
941
  }
942

943
  public long currentTimeMillis() {
944
    return currentTimeMillis;
1✔
945
  }
946

947
  public UUID randomUUID() {
948
    checkEventLoopExecuting();
1✔
949
    String runId = currentRunId;
1✔
950
    if (runId == null) {
1✔
951
      throw new Error("null currentRunId");
×
952
    }
953
    String id = runId + ":" + idCounter++;
1✔
954
    byte[] bytes = id.getBytes(StandardCharsets.UTF_8);
1✔
955
    return UUID.nameUUIDFromBytes(bytes);
1✔
956
  }
957

958
  public Random newRandom() {
959
    checkEventLoopExecuting();
1✔
960
    return new Random(randomUUID().getLeastSignificantBits());
1✔
961
  }
962

963
  public void sideEffect(
964
      Functions.Func<Optional<Payloads>> func, Functions.Proc1<Optional<Payloads>> callback) {
965
    checkEventLoopExecuting();
1✔
966
    SideEffectStateMachine.newInstance(
1✔
967
        this::isReplaying,
968
        func,
969
        (payloads) -> {
970
          callback.apply(payloads);
1✔
971
          // callback unblocked sideEffect call. Give workflow code chance to make progress.
972
          eventLoop();
1✔
973
        },
1✔
974
        commandSink,
975
        stateMachineSink);
976
  }
1✔
977

978
  /**
979
   * @param id mutable side effect id
980
   * @param func given the value from the last marker returns value to store. If result is empty
981
   *     nothing is recorded into the history.
982
   * @param callback used to report result or failure
983
   */
984
  public void mutableSideEffect(
985
      String id,
986
      Functions.Func1<Optional<Payloads>, Optional<Payloads>> func,
987
      Functions.Proc1<Optional<Payloads>> callback) {
988
    checkEventLoopExecuting();
1✔
989
    MutableSideEffectStateMachine stateMachine =
1✔
990
        mutableSideEffects.computeIfAbsent(
1✔
991
            id,
992
            (idKey) ->
993
                MutableSideEffectStateMachine.newInstance(
1✔
994
                    idKey, this::isReplaying, commandSink, stateMachineSink));
995
    stateMachine.mutableSideEffect(
1✔
996
        func,
997
        (r) -> {
998
          callback.apply(r);
1✔
999
          // callback unblocked mutableSideEffect call. Give workflow code chance to make progress.
1000
          eventLoop();
1✔
1001
        },
1✔
1002
        stateMachineSink);
1003
  }
1✔
1004

1005
  public boolean getVersion(
1006
      String changeId,
1007
      int minSupported,
1008
      int maxSupported,
1009
      Functions.Proc2<Integer, RuntimeException> callback) {
1010
    VersionStateMachine stateMachine =
1✔
1011
        versions.computeIfAbsent(
1✔
1012
            changeId,
1013
            (idKey) ->
1014
                VersionStateMachine.newInstance(
1✔
1015
                    changeId, this::isReplaying, commandSink, stateMachineSink));
1016
    return stateMachine.getVersion(
1✔
1017
        minSupported,
1018
        maxSupported,
1019
        (v, e) -> {
1020
          callback.apply(v, e);
1✔
1021
          // without this getVersion call will trigger the end of WFT,
1022
          // instead we want to prepare subsequent commands and unblock the execution one more
1023
          // time.
1024
          eventLoop();
1✔
1025
        });
1✔
1026
  }
1027

1028
  public List<ExecuteLocalActivityParameters> takeLocalActivityRequests() {
1029
    List<ExecuteLocalActivityParameters> result = localActivityRequests;
1✔
1030
    localActivityRequests = new ArrayList<>();
1✔
1031
    for (ExecuteLocalActivityParameters parameters : result) {
1✔
1032
      LocalActivityStateMachine stateMachine = localActivityMap.get(parameters.getActivityId());
1✔
1033
      stateMachine.markAsSent();
1✔
1034
    }
1✔
1035
    return result;
1✔
1036
  }
1037

1038
  public void handleLocalActivityCompletion(LocalActivityResult laCompletion) {
1039
    String activityId = laCompletion.getActivityId();
1✔
1040
    LocalActivityStateMachine laStateMachine = localActivityMap.get(activityId);
1✔
1041
    if (laStateMachine == null) {
1✔
1042
      throw new IllegalStateException("Unknown local activity: " + activityId);
×
1043
    }
1044
    laStateMachine.handleCompletion(laCompletion);
1✔
1045
    prepareCommands();
1✔
1046
  }
1✔
1047

1048
  public Functions.Proc scheduleLocalActivityTask(
1049
      ExecuteLocalActivityParameters parameters,
1050
      Functions.Proc2<Optional<Payloads>, LocalActivityCallback.LocalActivityFailedException>
1051
          callback) {
1052
    checkEventLoopExecuting();
1✔
1053
    String activityId = parameters.getActivityId();
1✔
1054
    if (Strings.isNullOrEmpty(activityId)) {
1✔
1055
      throw new IllegalArgumentException("Missing activityId: " + activityId);
×
1056
    }
1057
    if (localActivityMap.containsKey(activityId)) {
1✔
1058
      throw new IllegalArgumentException("Duplicated local activity id: " + activityId);
×
1059
    }
1060
    LocalActivityStateMachine commands =
1✔
1061
        LocalActivityStateMachine.newInstance(
1✔
1062
            this::isReplaying,
1063
            this::setCurrentTimeMillis,
1064
            parameters,
1065
            (r, e) -> {
1066
              callback.apply(r, e);
1✔
1067
              // callback unblocked local activity call. Give workflow code chance to make progress.
1068
              eventLoop();
1✔
1069
            },
1✔
1070
            localActivityRequestSink,
1071
            commandSink,
1072
            stateMachineSink,
1073
            currentTimeMillis);
1074
    localActivityMap.put(activityId, commands);
1✔
1075
    return commands::cancel;
1✔
1076
  }
1077

1078
  /** Validates that command matches the event during replay. */
1079
  private void validateCommand(Command command, HistoryEvent event) {
1080
    // ProtocolMessageCommand is different from other commands because it can be associated with
1081
    // multiple types of events
1082
    // TODO(#1781) Validate protocol message is expected type.
1083
    if (command.getCommandType() == COMMAND_TYPE_PROTOCOL_MESSAGE) {
1✔
1084
      ProtocolMessageCommandAttributes commandAttributes =
1✔
1085
          command.getProtocolMessageCommandAttributes();
1✔
1086
      switch (event.getEventType()) {
1✔
1087
        case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED:
1088
          assertMatch(
1✔
1089
              command,
1090
              event,
1091
              "messageType",
1092
              true,
1✔
1093
              commandAttributes.getMessageId().endsWith("accept"));
1✔
1094
          break;
1✔
1095
        case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_REJECTED:
1096
          assertMatch(
×
1097
              command,
1098
              event,
1099
              "messageType",
1100
              true,
×
1101
              commandAttributes.getMessageId().endsWith("reject"));
×
1102
          break;
×
1103
        case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED:
1104
          assertMatch(
1✔
1105
              command,
1106
              event,
1107
              "messageType",
1108
              true,
1✔
1109
              commandAttributes.getMessageId().endsWith("complete"));
1✔
1110
          break;
1✔
1111
        default:
1112
          throw new IllegalArgumentException("Unexpected event type: " + event.getEventType());
×
1113
      }
1114
      return;
1✔
1115
    }
1116
    assertMatch(
1✔
1117
        command,
1118
        event,
1119
        "eventType",
1120
        getEventTypeForCommand(command.getCommandType()),
1✔
1121
        event.getEventType());
1✔
1122
    switch (command.getCommandType()) {
1✔
1123
      case COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK:
1124
        {
1125
          ScheduleActivityTaskCommandAttributes commandAttributes =
1✔
1126
              command.getScheduleActivityTaskCommandAttributes();
1✔
1127
          ActivityTaskScheduledEventAttributes eventAttributes =
1✔
1128
              event.getActivityTaskScheduledEventAttributes();
1✔
1129
          assertMatch(
1✔
1130
              command,
1131
              event,
1132
              "activityId",
1133
              commandAttributes.getActivityId(),
1✔
1134
              eventAttributes.getActivityId());
1✔
1135
          assertMatch(
1✔
1136
              command,
1137
              event,
1138
              "activityType",
1139
              commandAttributes.getActivityType(),
1✔
1140
              eventAttributes.getActivityType());
1✔
1141
        }
1142
        break;
1✔
1143
      case COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION:
1144
        {
1145
          StartChildWorkflowExecutionCommandAttributes commandAttributes =
1✔
1146
              command.getStartChildWorkflowExecutionCommandAttributes();
1✔
1147
          StartChildWorkflowExecutionInitiatedEventAttributes eventAttributes =
1✔
1148
              event.getStartChildWorkflowExecutionInitiatedEventAttributes();
1✔
1149
          assertMatch(
1✔
1150
              command,
1151
              event,
1152
              "workflowId",
1153
              commandAttributes.getWorkflowId(),
1✔
1154
              eventAttributes.getWorkflowId());
1✔
1155
          assertMatch(
1✔
1156
              command,
1157
              event,
1158
              "workflowType",
1159
              commandAttributes.getWorkflowType(),
1✔
1160
              eventAttributes.getWorkflowType());
1✔
1161
        }
1162
        break;
1✔
1163
      case COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK:
1164
      case COMMAND_TYPE_START_TIMER:
1165
        {
1166
          StartTimerCommandAttributes commandAttributes = command.getStartTimerCommandAttributes();
1✔
1167
          TimerStartedEventAttributes eventAttributes = event.getTimerStartedEventAttributes();
1✔
1168
          assertMatch(
1✔
1169
              command,
1170
              event,
1171
              "timerId",
1172
              commandAttributes.getTimerId(),
1✔
1173
              eventAttributes.getTimerId());
1✔
1174
        }
1175
        break;
1✔
1176
      case COMMAND_TYPE_CANCEL_TIMER:
1177
      case COMMAND_TYPE_CANCEL_WORKFLOW_EXECUTION:
1178
      case COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION:
1179
      case COMMAND_TYPE_RECORD_MARKER:
1180
      case COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION:
1181
      case COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION:
1182
      case COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES:
1183
      case COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION:
1184
      case COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION:
1185
      case COMMAND_TYPE_PROTOCOL_MESSAGE:
1186
        break;
1✔
1187
      case UNRECOGNIZED:
1188
      case COMMAND_TYPE_UNSPECIFIED:
1189
        throw new IllegalArgumentException("Unexpected command type: " + command.getCommandType());
×
1190
    }
1191
  }
1✔
1192

1193
  private void assertMatch(
1194
      Command command, HistoryEvent event, String checkType, Object expected, Object actual) {
1195
    if (!expected.equals(actual)) {
1✔
1196
      String message =
×
1197
          String.format(
×
1198
              "Command %s doesn't match event %s with EventId=%s on check %s "
1199
                  + "with an expected value '%s' and an actual value '%s'",
1200
              command.getCommandType(),
×
1201
              event.getEventType(),
×
1202
              event.getEventId(),
×
1203
              checkType,
1204
              expected,
1205
              actual);
1206
      throw new NonDeterministicException(message);
×
1207
    }
1208
  }
1✔
1209

1210
  private class WorkflowTaskCommandsListener implements WorkflowTaskStateMachine.Listener {
1✔
1211
    @Override
1212
    public void workflowTaskStarted(
1213
        long startedEventId,
1214
        long currentTimeMillis,
1215
        boolean nonProcessedWorkflowTask,
1216
        long historySize,
1217
        boolean isContinueAsNewSuggested) {
1218
      setCurrentTimeMillis(currentTimeMillis);
1✔
1219
      for (CancellableCommand cancellableCommand : commands) {
1✔
1220
        cancellableCommand.handleWorkflowTaskStarted();
1✔
1221
      }
1✔
1222
      // Give local activities a chance to recreate their requests if they were lost due
1223
      // to the last workflow task failure. The loss could happen only the last workflow task
1224
      // was forcibly created by setting forceCreate on RespondWorkflowTaskCompletedRequest.
1225
      if (nonProcessedWorkflowTask) {
1✔
1226
        for (LocalActivityStateMachine value : localActivityMap.values()) {
1✔
1227
          value.nonReplayWorkflowTaskStarted();
1✔
1228
        }
1✔
1229
      }
1230
      WorkflowStateMachines.this.lastWFTStartedEventId = startedEventId;
1✔
1231
      WorkflowStateMachines.this.historySize = historySize;
1✔
1232
      WorkflowStateMachines.this.isContinueAsNewSuggested = isContinueAsNewSuggested;
1✔
1233

1234
      eventLoop();
1✔
1235
    }
1✔
1236

1237
    @Override
1238
    public void updateRunId(String currentRunId) {
1239
      WorkflowStateMachines.this.currentRunId = currentRunId;
×
1240
    }
×
1241
  }
1242

1243
  /**
1244
   * Extracts the eventId of the "initial command" for the given event.
1245
   *
1246
   * <p>The "initial command" is the event which started a group of related events:
1247
   * ActivityTaskScheduled, TimerStarted, and so on; for events which are not part of a group, the
1248
   * event's own eventId is returned. If the event has an unknown type but is marked as ignorable,
1249
   * then {@link OptionalLong#empty()} is returned instead.
1250
   *
1251
   * @return the eventId of the initial command, or {@link OptionalLong#empty()}
1252
   */
1253
  private OptionalLong getInitialCommandEventId(HistoryEvent event) {
1254
    switch (event.getEventType()) {
1✔
1255
      case EVENT_TYPE_ACTIVITY_TASK_STARTED:
1256
        return OptionalLong.of(event.getActivityTaskStartedEventAttributes().getScheduledEventId());
1✔
1257
      case EVENT_TYPE_ACTIVITY_TASK_COMPLETED:
1258
        return OptionalLong.of(
1✔
1259
            event.getActivityTaskCompletedEventAttributes().getScheduledEventId());
1✔
1260
      case EVENT_TYPE_ACTIVITY_TASK_FAILED:
1261
        return OptionalLong.of(event.getActivityTaskFailedEventAttributes().getScheduledEventId());
1✔
1262
      case EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT:
1263
        return OptionalLong.of(
1✔
1264
            event.getActivityTaskTimedOutEventAttributes().getScheduledEventId());
1✔
1265
      case EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED:
1266
        return OptionalLong.of(
×
1267
            event.getActivityTaskCancelRequestedEventAttributes().getScheduledEventId());
×
1268
      case EVENT_TYPE_ACTIVITY_TASK_CANCELED:
1269
        return OptionalLong.of(
1✔
1270
            event.getActivityTaskCanceledEventAttributes().getScheduledEventId());
1✔
1271
      case EVENT_TYPE_TIMER_FIRED:
1272
        return OptionalLong.of(event.getTimerFiredEventAttributes().getStartedEventId());
1✔
1273
      case EVENT_TYPE_TIMER_CANCELED:
1274
        return OptionalLong.of(event.getTimerCanceledEventAttributes().getStartedEventId());
×
1275
      case EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
1276
        return OptionalLong.of(
×
1277
            event
1278
                .getRequestCancelExternalWorkflowExecutionFailedEventAttributes()
×
1279
                .getInitiatedEventId());
×
1280
      case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
1281
        return OptionalLong.of(
1✔
1282
            event
1283
                .getExternalWorkflowExecutionCancelRequestedEventAttributes()
1✔
1284
                .getInitiatedEventId());
1✔
1285
      case EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED:
1286
        return OptionalLong.of(
1✔
1287
            event.getStartChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId());
1✔
1288
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED:
1289
        return OptionalLong.of(
1✔
1290
            event.getChildWorkflowExecutionStartedEventAttributes().getInitiatedEventId());
1✔
1291
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED:
1292
        return OptionalLong.of(
1✔
1293
            event.getChildWorkflowExecutionCompletedEventAttributes().getInitiatedEventId());
1✔
1294
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED:
1295
        return OptionalLong.of(
1✔
1296
            event.getChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId());
1✔
1297
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED:
1298
        return OptionalLong.of(
1✔
1299
            event.getChildWorkflowExecutionCanceledEventAttributes().getInitiatedEventId());
1✔
1300
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT:
1301
        return OptionalLong.of(
1✔
1302
            event.getChildWorkflowExecutionTimedOutEventAttributes().getInitiatedEventId());
1✔
1303
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TERMINATED:
1304
        return OptionalLong.of(
×
1305
            event.getChildWorkflowExecutionTerminatedEventAttributes().getInitiatedEventId());
×
1306
      case EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
1307
        return OptionalLong.of(
1✔
1308
            event.getSignalExternalWorkflowExecutionFailedEventAttributes().getInitiatedEventId());
1✔
1309
      case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED:
1310
        return OptionalLong.of(
1✔
1311
            event.getExternalWorkflowExecutionSignaledEventAttributes().getInitiatedEventId());
1✔
1312
      case EVENT_TYPE_WORKFLOW_TASK_STARTED:
1313
        return OptionalLong.of(event.getWorkflowTaskStartedEventAttributes().getScheduledEventId());
1✔
1314
      case EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
1315
        return OptionalLong.of(
1✔
1316
            event.getWorkflowTaskCompletedEventAttributes().getScheduledEventId());
1✔
1317
      case EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT:
1318
        return OptionalLong.of(
1✔
1319
            event.getWorkflowTaskTimedOutEventAttributes().getScheduledEventId());
1✔
1320
      case EVENT_TYPE_WORKFLOW_TASK_FAILED:
1321
        return OptionalLong.of(event.getWorkflowTaskFailedEventAttributes().getScheduledEventId());
1✔
1322

1323
      case EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:
1324
      case EVENT_TYPE_TIMER_STARTED:
1325
      case EVENT_TYPE_MARKER_RECORDED:
1326
      case EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED:
1327
      case EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED:
1328
      case EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED:
1329
      case EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW:
1330
      case EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED:
1331
      case EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES:
1332
      case EVENT_TYPE_WORKFLOW_TASK_SCHEDULED:
1333
      case EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED:
1334
      case EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:
1335
      case EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED:
1336
      case EVENT_TYPE_WORKFLOW_EXECUTION_FAILED:
1337
      case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
1338
      case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
1339
      case EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
1340
      case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED:
1341
        return OptionalLong.of(event.getEventId());
1✔
1342

1343
      default:
1344
        if (event.getWorkerMayIgnore()) {
1✔
1345
          return OptionalLong.empty();
1✔
1346
        }
1347
        throw new IllegalArgumentException("Unexpected event type: " + event.getEventType());
1✔
1348
    }
1349
  }
1350

1351
  /**
1352
   * Workflow code executes only while event loop is running. So operations that can be invoked from
1353
   * the workflow have to satisfy this condition.
1354
   */
1355
  private void checkEventLoopExecuting() {
1356
    if (!eventLoopExecuting) {
1✔
1357
      // this call doesn't yield or await, because the await function returns true,
1358
      // but it checks if the workflow thread needs to be destroyed
1359
      WorkflowThread.await("kill workflow thread if destroy requested", () -> true);
×
1360
      throw new IllegalStateException("Operation allowed only while eventLoop is running");
×
1361
    }
1362
  }
1✔
1363

1364
  private String createEventHandlingMessage(HistoryEvent event) {
1365
    return "Failure handling event "
1✔
1366
        + event.getEventId()
1✔
1367
        + " of type '"
1368
        + event.getEventType()
1✔
1369
        + "' "
1370
        + (this.isReplaying() ? "during replay" : "during execution");
1✔
1371
  }
1372

1373
  private String createShortCurrentStateMessagePostfix() {
1374
    return String.format(
1✔
1375
        "{WorkflowTaskStartedEventId=%s, CurrentStartedEventId=%s}",
1376
        this.workflowTaskStartedEventId, this.lastWFTStartedEventId);
1✔
1377
  }
1378
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc