• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #333

16 Oct 2024 07:28PM UTC coverage: 78.65% (+0.6%) from 78.085%
#333

push

github

web-flow
Fix code coverage (#2275)

22670 of 28824 relevant lines covered (78.65%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.97
/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.statemachines;
22

23
import static io.temporal.api.enums.v1.CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE;
24
import static io.temporal.internal.common.WorkflowExecutionUtils.getEventTypeForCommand;
25
import static io.temporal.internal.common.WorkflowExecutionUtils.isCommandEvent;
26
import static io.temporal.serviceclient.CheckedExceptionWrapper.unwrap;
27

28
import com.google.common.annotations.VisibleForTesting;
29
import com.google.common.base.Preconditions;
30
import com.google.common.base.Strings;
31
import com.google.protobuf.Any;
32
import io.temporal.api.command.v1.*;
33
import io.temporal.api.common.v1.*;
34
import io.temporal.api.enums.v1.EventType;
35
import io.temporal.api.failure.v1.Failure;
36
import io.temporal.api.history.v1.*;
37
import io.temporal.api.protocol.v1.Message;
38
import io.temporal.api.sdk.v1.UserMetadata;
39
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
40
import io.temporal.failure.CanceledFailure;
41
import io.temporal.internal.common.*;
42
import io.temporal.internal.history.LocalActivityMarkerUtils;
43
import io.temporal.internal.history.VersionMarkerUtils;
44
import io.temporal.internal.sync.WorkflowThread;
45
import io.temporal.internal.worker.LocalActivityResult;
46
import io.temporal.worker.NonDeterministicException;
47
import io.temporal.workflow.ChildWorkflowCancellationType;
48
import io.temporal.workflow.Functions;
49
import java.nio.charset.StandardCharsets;
50
import java.util.*;
51
import javax.annotation.Nullable;
52

53
public final class WorkflowStateMachines {
54

55
  /**
   * Result reported by a command when a history event is offered to it (see the {@code
   * command.handleEvent} call in {@code handleCommandEvent}).
   */
  enum HandleEventStatus {
    /** The event matched the command; the command is then consumed from the queue. */
    OK,
    /**
     * The event did not match the command; it is then either treated as a version marker of a
     * removed getVersion call or reported as non-determinism.
     */
    NON_MATCHING_EVENT
  }
59

60
  /** Initial set of SDK flags that will be set on all new workflow executions. */
61
  private static final List<SdkFlag> initialFlags =
1✔
62
      Collections.unmodifiableList(
1✔
63
          Collections.singletonList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION));
1✔
64

65
  /**
66
   * EventId of the WorkflowTaskStarted event of the Workflow Task that was picked up by a worker
67
   * and triggered a current replay or execution. It's expected to be the last event in the history
68
   * if we continue to execute the workflow.
69
   *
70
   * <p>For direct (legacy) queries, it may be:
71
   *
72
   * <ul>
73
   *   <li>0 if it's a query of a closed workflow execution
74
   *   <li>id of the last successfully completed Workflow Task if the workflow is not closed
75
   * </ul>
76
   *
77
   * <p>Set from the "outside" from the PollWorkflowTaskQueueResponse. Not modified by the SDK state
78
   * machines.
79
   */
80
  private long workflowTaskStartedEventId;
81

82
  /** EventId of the last WorkflowTaskStarted event handled by these state machines. */
83
  private long lastWFTStartedEventId;
84

85
  /** The Build ID used in the current WFT if already completed and set (may be null) */
86
  private String currentTaskBuildId;
87

88
  private long historySize;
89

90
  private boolean isContinueAsNewSuggested;
91

92
  /**
93
   * EventId of the last event seen by these state machines. Events earlier than this one will be
94
   * discarded.
95
   */
96
  private long lastHandledEventId;
97

98
  private final StatesMachinesCallback callbacks;
99

100
  /** Callback to send new commands to. */
101
  private final Functions.Proc1<CancellableCommand> commandSink;
102

103
  /**
104
   * currentRunId is used as a seed by Workflow.newRandom and randomUUID so they can be generated
105
   * deterministically.
106
   */
107
  private String currentRunId;
108

109
  /** Used by Workflow.newRandom and randomUUID together with currentRunId. */
110
  private long idCounter;
111

112
  /** Current workflow time. */
113
  private long currentTimeMillis = -1;
1✔
114

115
  private final Map<Long, EntityStateMachine> stateMachines = new HashMap<>();
1✔
116

117
  /** Key is the protocol instance id */
118
  private final Map<String, EntityStateMachine> protocolStateMachines = new HashMap<>();
1✔
119

120
  private final Queue<Message> messageOutbox = new ArrayDeque<>();
1✔
121

122
  private final Queue<CancellableCommand> commands = new ArrayDeque<>();
1✔
123

124
  /**
125
   * Commands generated by the currently processed workflow task. It is a queue as commands can be
126
   * added (due to marker based commands) while iterating over already added commands.
127
   */
128
  private final Queue<CancellableCommand> cancellableCommands = new ArrayDeque<>();
1✔
129

130
  /**
131
   * Is workflow executing new code or replaying from the history. The definition of replaying here
132
   * is that we are no longer replaying as soon as we see new events that have never been seen or
133
   * produced by the SDK.
134
   *
135
   * <p>Specifically, replay ends once we have seen any non-command event (IE: events that aren't a
136
   * result of something we produced in the SDK) on a WFT which has the final event in history
137
   * (meaning we are processing the most recent WFT and there are no more subsequent WFTs). WFT
138
   * Completed in this case does not count as a non-command event, because that will typically show
139
   * up as the first event in an incremental history, and we want to ignore it and its associated
140
   * commands since we "produced" them.
141
   *
142
   * <p>Note that this flag ALWAYS flips to true for the time when we apply events from the server
143
   * even if the commands were created by an actual execution with replaying=false.
144
   */
145
  private boolean replaying;
146

147
  /** Used to ensure that event loop is not executed recursively. */
148
  private boolean eventLoopExecuting;
149

150
  /**
151
   * Used to avoid recursive calls to {@link #prepareCommands()}.
152
   *
153
   * <p>Such calls happen when sideEffects and localActivity markers are processed.
154
   */
155
  private boolean preparing;
156

157
  /** Key is mutable side effect id */
158
  private final Map<String, MutableSideEffectStateMachine> mutableSideEffects = new HashMap<>();
1✔
159

160
  /** Key is changeId */
161
  private final Map<String, VersionStateMachine> versions = new HashMap<>();
1✔
162

163
  /** Map of local activities by their id. */
164
  private final Map<String, LocalActivityStateMachine> localActivityMap = new HashMap<>();
1✔
165

166
  private List<ExecuteLocalActivityParameters> localActivityRequests = new ArrayList<>();
1✔
167

168
  private final Functions.Proc1<ExecuteLocalActivityParameters> localActivityRequestSink;
169
  private final Functions.Proc1<StateMachine> stateMachineSink;
170

171
  private final WFTBuffer wftBuffer = new WFTBuffer();
1✔
172

173
  private List<Message> messages = new ArrayList<>();
1✔
174

175
  /**
176
   * Set of accepted durably admitted updates by update id. A "durably admitted" update is one with
177
   * an UPDATE_ADMITTED event.
178
   */
179
  private final Set<String> acceptedUpdates = new HashSet<>();
1✔
180

181
  private final SdkFlags flags;
182

183
  /**
   * Creates state machines whose state machine sink is a no-op.
   *
   * @param callbacks callbacks invoked by the state machines; must not be null
   * @param capabilities server capabilities, forwarded to the three-argument constructor
   */
  public WorkflowStateMachines(
      StatesMachinesCallback callbacks, GetSystemInfoResponse.Capabilities capabilities) {
    this(callbacks, ignoredStateMachine -> {}, capabilities);
  }
1✔
187

188
  /**
   * Creates state machines with an explicit state machine sink.
   *
   * @param callbacks callbacks invoked by the state machines; must not be null
   * @param stateMachineSink sink handed to the state machines created by this object
   * @param capabilities server capabilities; its SDK metadata value is passed to {@link SdkFlags}
   */
  @VisibleForTesting
  public WorkflowStateMachines(
      StatesMachinesCallback callbacks,
      Functions.Proc1<StateMachine> stateMachineSink,
      GetSystemInfoResponse.Capabilities capabilities) {
    this.callbacks = Objects.requireNonNull(callbacks);
    this.stateMachineSink = stateMachineSink;
    // New commands are queued until prepareCommands() moves them to the commands queue.
    this.commandSink = command -> cancellableCommands.add(command);
    // Kept as a lambda: localActivityRequests is not final, so the field must be read on each call.
    this.localActivityRequestSink = (request) -> localActivityRequests.add(request);
    this.flags = new SdkFlags(capabilities.getSdkMetadata(), this::isReplaying);
  }
1✔
199

200
  /**
   * Creates state machines without server capabilities; {@link SdkFlags} is constructed with
   * {@code false} as its first argument.
   *
   * @param callbacks callbacks invoked by the state machines; must not be null
   * @param stateMachineSink sink handed to the state machines created by this object
   */
  @VisibleForTesting
  public WorkflowStateMachines(
      StatesMachinesCallback callbacks, Functions.Proc1<StateMachine> stateMachineSink) {
    this.callbacks = Objects.requireNonNull(callbacks);
    this.stateMachineSink = stateMachineSink;
    // New commands are queued until prepareCommands() moves them to the commands queue.
    this.commandSink = command -> cancellableCommands.add(command);
    // Kept as a lambda: localActivityRequests is not final, so the field must be read on each call.
    this.localActivityRequestSink = (request) -> localActivityRequests.add(request);
    this.flags = new SdkFlags(false, this::isReplaying);
  }
1✔
209

210
  // TODO revisit and potentially remove workflowTaskStartedEventId entirely from the state machines.
211
  // The only place where it's used is WorkflowTaskStateMachine to understand that there will be
212
  // no completion event in the history.
213
  // This is tricky, because direct queries come with 0 as a magic value in this field if
214
  // execution is completed for example.
215
  // Most likely we can rework WorkflowTaskStateMachine to use only hasNext.
216
  /**
217
   * @param workflowTaskStartedEventId eventId of the workflowTask that was picked up by a worker
218
   *     and triggered an execution. Used in {@link WorkflowTaskStateMachine} only to understand
219
   *     that this workflow task will not have a matching closing event and needs to be executed.
220
   */
221
  public void setWorkflowStartedEventId(long workflowTaskStartedEventId) {
222
    this.workflowTaskStartedEventId = workflowTaskStartedEventId;
1✔
223
  }
1✔
224

225
  public void resetStartedEvenId(long eventId) {
226
    // We must reset the last event we handled to be after the last WFT we really completed
227
    // + any command events (since the SDK "processed" those when it emitted the commands). This
228
    // is also equal to what we just processed in the speculative task, minus two, since we
229
    // would've just handled the most recent WFT started event, and we need to drop that & the
230
    // schedule event just before it.
231
    long resetLastHandledEventId = this.lastHandledEventId - 2;
1✔
232
    // We have to drop any state machines (which should only be one workflow task machine)
233
    // created when handling the speculative workflow task
234
    for (long i = this.lastHandledEventId; i > resetLastHandledEventId; i--) {
1✔
235
      stateMachines.remove(i);
1✔
236
    }
237
    this.lastWFTStartedEventId = eventId;
1✔
238
    this.lastHandledEventId = resetLastHandledEventId;
1✔
239
  }
1✔
240

241
  public long getLastWFTStartedEventId() {
242
    return lastWFTStartedEventId;
1✔
243
  }
244

245
  public long getCurrentWFTStartedEventId() {
246
    return workflowTaskStartedEventId;
×
247
  }
248

249
  public long getHistorySize() {
250
    return historySize;
1✔
251
  }
252

253
  @Nullable
254
  public String getCurrentTaskBuildId() {
255
    return currentTaskBuildId;
×
256
  }
257

258
  public boolean isContinueAsNewSuggested() {
259
    return isContinueAsNewSuggested;
1✔
260
  }
261

262
  public void setReplaying(boolean replaying) {
263
    this.replaying = replaying;
1✔
264
  }
1✔
265

266
  public void setMessages(List<Message> messages) {
267
    this.messages = new ArrayList<>(messages);
1✔
268
  }
1✔
269

270
  /**
271
   * Handle a single event from the workflow history.
272
   *
273
   * @param event event from the history.
274
   * @param hasNextEvent false if this is the last event in the history.
275
   */
276
  public void handleEvent(HistoryEvent event, boolean hasNextEvent) {
277
    long eventId = event.getEventId();
1✔
278
    if (eventId <= lastHandledEventId) {
1✔
279
      // already handled
280
      return;
1✔
281
    }
282
    Preconditions.checkState(
1✔
283
        eventId == lastHandledEventId + 1,
284
        "History is out of order. "
285
            + "There is a gap between the last event workflow state machine observed and currently handling event. "
286
            + "Last processed eventId: %s, handling eventId: %s",
287
        lastHandledEventId,
288
        eventId);
289

290
    lastHandledEventId = eventId;
1✔
291
    boolean readyToPeek = wftBuffer.addEvent(event, hasNextEvent);
1✔
292
    if (readyToPeek) {
1✔
293
      handleEventsBatch(wftBuffer.fetch(), hasNextEvent);
1✔
294
    }
295
  }
1✔
296

297
  /**
298
   * Handle an events batch for one workflow task. Events that are related to one workflow task
299
   * during replay should be prefetched and supplied in one batch.
300
   *
301
   * @param eventBatch events belong to one workflow task
302
   * @param hasNextBatch true if there are more events in the history follow this batch, false if
303
   *     this batch contains the last events of the history
304
   */
305
  private void handleEventsBatch(WFTBuffer.EventBatch eventBatch, boolean hasNextBatch) {
306
    List<HistoryEvent> events = eventBatch.getEvents();
1✔
307
    if (EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED.equals(events.get(0).getEventType())) {
1✔
308
      for (SdkFlag flag : initialFlags) {
1✔
309
        flags.tryUseSdkFlag(flag);
1✔
310
      }
1✔
311
    }
312

313
    if (eventBatch.getWorkflowTaskCompletedEvent().isPresent()) {
1✔
314
      for (HistoryEvent event : events) {
1✔
315
        handleSingleEventLookahead(event);
1✔
316
      }
1✔
317
    }
318

319
    for (Iterator<HistoryEvent> iterator = events.iterator(); iterator.hasNext(); ) {
1✔
320
      HistoryEvent event = iterator.next();
1✔
321

322
      // On replay the messages are available after the workflow task schedule event, so we
323
      // need to handle them before workflow task started event to maintain a consistent order.
324
      for (Message msg : this.takeLTE(event.getEventId() - 1)) {
1✔
325
        handleSingleMessage(msg);
1✔
326
      }
1✔
327

328
      try {
329
        boolean isLastTask =
1✔
330
            !hasNextBatch && !eventBatch.getWorkflowTaskCompletedEvent().isPresent();
1✔
331
        boolean hasNextEvent = iterator.hasNext() || hasNextBatch;
1✔
332
        handleSingleEvent(event, isLastTask, hasNextEvent);
1✔
333
      } catch (RuntimeException e) {
1✔
334
        throw createEventProcessingException(e, event);
1✔
335
      }
1✔
336

337
      for (Message msg : this.takeLTE(event.getEventId())) {
1✔
338
        handleSingleMessage(msg);
1✔
339
      }
1✔
340
    }
1✔
341
  }
1✔
342

343
  /** Handle an event when looking ahead at history during replay */
344
  private void handleSingleEventLookahead(HistoryEvent event) {
345
    EventType eventType = event.getEventType();
1✔
346
    switch (eventType) {
1✔
347
      case EVENT_TYPE_MARKER_RECORDED:
348
        try {
349
          preloadVersionMarker(event);
1✔
350
        } catch (RuntimeException e) {
1✔
351
          throw createEventProcessingException(e, event);
1✔
352
        }
1✔
353
        break;
354
      case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED:
355
        // Look ahead to infer protocol messages
356
        WorkflowExecutionUpdateAcceptedEventAttributes updateEvent =
1✔
357
            event.getWorkflowExecutionUpdateAcceptedEventAttributes();
1✔
358
        // If an EXECUTION_UPDATE_ACCEPTED event does not have an accepted request, then it
359
        // must be from an admitted update. This is the only way to infer an admitted update was
360
        // accepted.
361
        if (!updateEvent.hasAcceptedRequest()) {
1✔
362
          acceptedUpdates.add(updateEvent.getProtocolInstanceId());
1✔
363
        } else {
364
          messages.add(
1✔
365
              Message.newBuilder()
1✔
366
                  .setId(updateEvent.getAcceptedRequestMessageId())
1✔
367
                  .setProtocolInstanceId(updateEvent.getProtocolInstanceId())
1✔
368
                  .setEventId(updateEvent.getAcceptedRequestSequencingEventId())
1✔
369
                  .setBody(Any.pack(updateEvent.getAcceptedRequest()))
1✔
370
                  .build());
1✔
371
        }
372
        break;
1✔
373
      case EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
374
        WorkflowTaskCompletedEventAttributes completedEvent =
1✔
375
            event.getWorkflowTaskCompletedEventAttributes();
1✔
376
        String maybeBuildId = completedEvent.getWorkerVersion().getBuildId();
1✔
377
        if (!maybeBuildId.isEmpty()) {
1✔
378
          currentTaskBuildId = maybeBuildId;
×
379
        }
380
        for (Integer flag : completedEvent.getSdkMetadata().getLangUsedFlagsList()) {
1✔
381
          SdkFlag sdkFlag = SdkFlag.getValue(flag);
1✔
382
          if (sdkFlag.equals(SdkFlag.UNKNOWN)) {
1✔
383
            throw new IllegalArgumentException("Unknown SDK flag:" + flag);
×
384
          }
385
          flags.setSdkFlag(sdkFlag);
1✔
386
        }
1✔
387
        // Remove any finished update protocol state machines. We can't remove them on an event like
388
        // other state machines because a rejected update produces no event in history.
389
        protocolStateMachines.entrySet().removeIf(entry -> entry.getValue().isFinalState());
1✔
390
        break;
1✔
391
      default:
392
        break;
393
    }
394
  }
1✔
395

396
  private List<Message> takeLTE(long eventId) {
397
    List<Message> m = new ArrayList<>();
1✔
398
    List<Message> remainingMessages = new ArrayList<>();
1✔
399
    for (Message msg : this.messages) {
1✔
400
      if (msg.getEventId() > eventId) {
1✔
401
        remainingMessages.add(msg);
1✔
402
      } else {
403
        m.add(msg);
1✔
404
      }
405
    }
1✔
406
    this.messages = remainingMessages;
1✔
407
    return m;
1✔
408
  }
409

410
  private RuntimeException createEventProcessingException(RuntimeException e, HistoryEvent event) {
411
    Throwable ex = unwrap(e);
1✔
412
    if (ex instanceof NonDeterministicException) {
1✔
413
      // just appending the message in front of an existing message, saving the original stacktrace
414
      NonDeterministicException modifiedException =
1✔
415
          new NonDeterministicException(
416
              createEventHandlingMessage(event)
1✔
417
                  + ". "
418
                  + ex.getMessage()
1✔
419
                  + ". "
420
                  + createShortCurrentStateMessagePostfix(),
1✔
421
              ex.getCause());
1✔
422
      modifiedException.setStackTrace(ex.getStackTrace());
1✔
423
      return modifiedException;
1✔
424
    } else {
425
      return new InternalWorkflowTaskException(
1✔
426
          createEventHandlingMessage(event) + ". " + createShortCurrentStateMessagePostfix(), ex);
1✔
427
    }
428
  }
429

430
  private void handleSingleMessage(Message message) {
431
    // Get or create protocol state machine based on Instance ID and protocolName
432
    EntityStateMachine stateMachine =
1✔
433
        protocolStateMachines.computeIfAbsent(
1✔
434
            message.getProtocolInstanceId(),
1✔
435
            (protocolInstance) -> {
436
              String protocolName = ProtocolUtils.getProtocol(message);
1✔
437
              Optional<ProtocolType> type = ProtocolType.get(protocolName);
1✔
438
              if (type.isPresent()) {
1✔
439
                switch (type.get()) {
1✔
440
                  case UPDATE_V1:
441
                    return UpdateProtocolStateMachine.newInstance(
1✔
442
                        this::isReplaying,
443
                        callbacks::update,
1✔
444
                        this::sendMessage,
445
                        commandSink,
446
                        stateMachineSink);
447
                  default:
448
                    throw new IllegalArgumentException("Unknown protocol type:" + protocolName);
×
449
                }
450
              }
451
              throw new IllegalArgumentException("Protocol type not specified:" + message);
×
452
            });
453
    stateMachine.handleMessage(message);
1✔
454
  }
1✔
455

456
  private void handleSingleEvent(HistoryEvent event, boolean lastTask, boolean hasNextEvent) {
457
    if (isCommandEvent(event)) {
1✔
458
      handleCommandEvent(event);
1✔
459
      return;
1✔
460
    }
461

462
    // We don't explicitly check if the event is a command event here because it's already handled
463
    // above.
464
    if (replaying
1✔
465
        && lastTask
466
        && event.getEventType() != EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED) {
1✔
467
      replaying = false;
1✔
468
    }
469

470
    final OptionalLong initialCommandEventId = getInitialCommandEventId(event);
1✔
471
    if (!initialCommandEventId.isPresent()) {
1✔
472
      return;
1✔
473
    }
474

475
    EntityStateMachine c = stateMachines.get(initialCommandEventId.getAsLong());
1✔
476
    if (c != null) {
1✔
477
      c.handleEvent(event, hasNextEvent);
1✔
478
      if (c.isFinalState()) {
1✔
479
        stateMachines.remove(initialCommandEventId.getAsLong());
1✔
480
      }
481
    } else {
482
      handleNonStatefulEvent(event, hasNextEvent);
1✔
483
    }
484
  }
1✔
485

486
  /**
487
   * Handles command event. Command event is an event which is generated from a command emitted by a
488
   * past decision. Each command has a correspondent event. For example ScheduleActivityTaskCommand
489
   * is recorded to the history as ActivityTaskScheduledEvent.
490
   *
491
   * <p>Command events always follow WorkflowTaskCompletedEvent.
492
   *
493
   * <p>The handling consists from verifying that the next command in the commands queue matches the
494
   * event, command state machine is notified about the event and the command is removed from the
495
   * commands queue.
496
   */
497
  private void handleCommandEvent(HistoryEvent event) {
498
    if (handleLocalActivityMarker(event)) {
1✔
499
      return;
1✔
500
    }
501

502
    // Match event to the next command in the stateMachine queue.
503
    // After matching the command is notified about the event and is removed from the
504
    // queue.
505
    CancellableCommand matchingCommand = null;
1✔
506
    while (matchingCommand == null) {
1✔
507
      // handleVersionMarker can skip a marker event if the getVersion call was removed.
508
      // In this case we don't want to consume a command.
509
      // That's why peek is used instead of poll.
510
      CancellableCommand command = commands.peek();
1✔
511
      if (command == null) {
1✔
512
        if (handleNonMatchingVersionMarker(event)) {
×
513
          // this event is a version marker for removed getVersion call.
514
          // Handle the version marker as unmatched and return even if there is no commands to match
515
          // it against.
516
          return;
×
517
        } else {
518
          throw new NonDeterministicException("No command scheduled that corresponds to " + event);
×
519
        }
520
      }
521

522
      if (command.isCanceled()) {
1✔
523
        // Consume and skip the command
524
        commands.poll();
1✔
525
        continue;
1✔
526
      }
527

528
      // Note that handleEvent can cause a command cancellation in case of
529
      // 1. MutableSideEffect
530
      // 2. Version State Machine during replay cancels the command and enters SKIPPED state
531
      //    if it handled non-matching event.
532
      HandleEventStatus status = command.handleEvent(event, true);
1✔
533

534
      if (command.isCanceled()) {
1✔
535
        // Consume and skip the command
536
        commands.poll();
1✔
537
        continue;
1✔
538
      }
539

540
      switch (status) {
1✔
541
        case OK:
542
          // Consume the command
543
          commands.poll();
1✔
544
          matchingCommand = command;
1✔
545
          break;
1✔
546
        case NON_MATCHING_EVENT:
547
          if (handleNonMatchingVersionMarker(event)) {
1✔
548
            // this event is a version marker for removed getVersion call.
549
            // Handle the version marker as unmatched and return without consuming the command
550
            return;
1✔
551
          } else {
552
            throw new NonDeterministicException(
1✔
553
                "Event "
554
                    + event.getEventId()
1✔
555
                    + " of type "
556
                    + event.getEventType()
1✔
557
                    + " does not"
558
                    + " match command type "
559
                    + command.getCommandType());
1✔
560
          }
561
        default:
562
          throw new IllegalStateException(
×
563
              "Got " + status + " value from command.handleEvent which is not handled");
564
      }
565
    }
1✔
566

567
    validateCommand(matchingCommand.getCommand(), event);
1✔
568
    EntityStateMachine stateMachine = matchingCommand.getStateMachine();
1✔
569
    if (!stateMachine.isFinalState()) {
1✔
570
      stateMachines.put(event.getEventId(), stateMachine);
1✔
571
    }
572
    // Marker is the only command processing of which can cause workflow code execution
573
    // and generation of new state machines.
574
    if (event.getEventType() == EventType.EVENT_TYPE_MARKER_RECORDED) {
1✔
575
      prepareCommands();
1✔
576
    }
577
  }
1✔
578

579
  private void preloadVersionMarker(HistoryEvent event) {
580
    if (VersionMarkerUtils.hasVersionMarkerStructure(event)) {
1✔
581
      String changeId = VersionMarkerUtils.tryGetChangeIdFromVersionMarkerEvent(event);
1✔
582
      if (changeId == null) {
1✔
583
        // if we can't extract changeId, this event will later fail to match with anything
584
        // and the corresponded exception will be thrown
585
        return;
×
586
      }
587
      VersionStateMachine versionStateMachine =
1✔
588
          versions.computeIfAbsent(
1✔
589
              changeId,
590
              (idKey) ->
591
                  VersionStateMachine.newInstance(
1✔
592
                      changeId, this::isReplaying, commandSink, stateMachineSink));
593
      versionStateMachine.handleMarkersPreload(event);
1✔
594
    }
595
  }
1✔
596

597
  private boolean handleNonMatchingVersionMarker(HistoryEvent event) {
598
    String changeId = VersionMarkerUtils.tryGetChangeIdFromVersionMarkerEvent(event);
1✔
599
    if (changeId == null) {
1✔
600
      return false;
1✔
601
    }
602
    VersionStateMachine versionStateMachine = versions.get(changeId);
1✔
603
    Preconditions.checkNotNull(
1✔
604
        versionStateMachine,
605
        "versionStateMachine is expected to be initialized already by execution or preloading");
606
    versionStateMachine.handleNonMatchingEvent(event);
1✔
607
    return true;
1✔
608
  }
609

610
  public List<Command> takeCommands() {
611
    List<Command> result = new ArrayList<>(commands.size());
1✔
612
    for (CancellableCommand command : commands) {
1✔
613
      if (!command.isCanceled()) {
1✔
614
        result.add(command.getCommand());
1✔
615
      }
616
    }
1✔
617
    return result;
1✔
618
  }
619

620
  public void sendMessage(Message message) {
621
    checkEventLoopExecuting();
1✔
622
    if (!isReplaying()) {
1✔
623
      messageOutbox.add(message);
1✔
624
    }
625
  }
1✔
626

627
  public List<Message> takeMessages() {
628
    List<Message> result = new ArrayList<>(messageOutbox.size());
1✔
629
    result.addAll(messageOutbox);
1✔
630
    messageOutbox.clear();
1✔
631
    // Remove any finished update protocol state machines. We can't remove them on an event like
632
    // other state machines because a rejected update produces no event in history.
633
    protocolStateMachines.entrySet().removeIf(entry -> entry.getValue().isFinalState());
1✔
634
    return result;
1✔
635
  }
636

637
  /**
638
   * @return True if the SDK flag is supported in this workflow execution
639
   */
640
  public boolean tryUseSdkFlag(SdkFlag flag) {
641
    return flags.tryUseSdkFlag(flag);
1✔
642
  }
643

644
  /**
645
   * @return Set of all new flags set since the last call
646
   */
647
  public EnumSet<SdkFlag> takeNewSdkFlags() {
648
    return flags.takeNewSdkFlags();
1✔
649
  }
650

651
  private void prepareCommands() {
652
    if (preparing) {
1✔
653
      return;
1✔
654
    }
655
    preparing = true;
1✔
656
    try {
657
      prepareImpl();
1✔
658
    } finally {
659
      preparing = false;
1✔
660
    }
661
  }
1✔
662

663
  private void prepareImpl() {
664
    // handleCommand can lead to code execution because of SideEffect, MutableSideEffect or local
665
    // activity completion. And code execution can lead to creation of new commands and
666
    // cancellation of existing commands. That is the reason for using Queue as a data structure for
667
    // commands.
668
    while (true) {
669
      CancellableCommand command = cancellableCommands.poll();
1✔
670
      if (command == null) {
1✔
671
        break;
1✔
672
      }
673
      // handleCommand should be called even on canceled ones to support mutableSideEffect
674
      command.handleCommand(command.getCommandType());
1✔
675
      commands.add(command);
1✔
676
    }
1✔
677
  }
1✔
678

679
  /**
680
   * Local activity is different from all other entities. It doesn't schedule a marker command when
681
   * the {@link #scheduleLocalActivityTask(ExecuteLocalActivityParameters, Functions.Proc2)} is
682
   * called. The marker is scheduled only when activity completes through ({@link
683
   * #handleLocalActivityCompletion(LocalActivityResult)}). That's why the normal logic of {@link
684
   * #handleCommandEvent(HistoryEvent)}, which assumes that each event has a correspondent command
685
   * during replay, doesn't work. Instead, local activities are matched by their id using
686
   * localActivityMap.
687
   *
688
   * @return true if matched and false if normal event handling should continue.
689
   */
690
  private boolean handleLocalActivityMarker(HistoryEvent event) {
691
    if (!LocalActivityMarkerUtils.hasLocalActivityStructure(event)) {
1✔
692
      return false;
1✔
693
    }
694

695
    MarkerRecordedEventAttributes markerAttributes = event.getMarkerRecordedEventAttributes();
1✔
696
    String id = LocalActivityMarkerUtils.getActivityId(markerAttributes);
1✔
697
    LocalActivityStateMachine stateMachine = localActivityMap.remove(id);
1✔
698
    if (stateMachine == null) {
1✔
699
      String activityType = LocalActivityMarkerUtils.getActivityTypeName(markerAttributes);
×
700
      throw new NonDeterministicException(
×
701
          String.format(
×
702
              "Local activity of type %s is recorded in the history with id %s but was not expected by the execution",
703
              activityType, id));
704
    }
705
    // RESULT_NOTIFIED state means that there is outstanding command that has to be matched
706
    // using standard logic. So return false to let the handleCommand method to run its standard
707
    // logic.
708
    if (stateMachine.getState() == LocalActivityStateMachine.State.RESULT_NOTIFIED) {
1✔
709
      return false;
1✔
710
    }
711
    stateMachine.handleEvent(event, true);
1✔
712
    eventLoop();
1✔
713
    return true;
1✔
714
  }
715

716
  private void eventLoop() {
717
    if (eventLoopExecuting) {
1✔
718
      return;
1✔
719
    }
720
    eventLoopExecuting = true;
1✔
721
    try {
722
      callbacks.eventLoop();
1✔
723
    } finally {
724
      eventLoopExecuting = false;
1✔
725
    }
726
    prepareCommands();
1✔
727
  }
1✔
728

729
  private void handleNonStatefulEvent(HistoryEvent event, boolean hasNextEvent) {
730
    switch (event.getEventType()) {
1✔
731
      case EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:
732
        this.currentRunId =
1✔
733
            event.getWorkflowExecutionStartedEventAttributes().getOriginalExecutionRunId();
1✔
734
        callbacks.start(event);
1✔
735
        break;
1✔
736
      case EVENT_TYPE_WORKFLOW_TASK_SCHEDULED:
737
        WorkflowTaskStateMachine c =
1✔
738
            WorkflowTaskStateMachine.newInstance(
1✔
739
                workflowTaskStartedEventId, new WorkflowTaskCommandsListener());
740
        c.handleEvent(event, hasNextEvent);
1✔
741
        stateMachines.put(event.getEventId(), c);
1✔
742
        break;
1✔
743
      case EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED:
744
        callbacks.signal(event);
1✔
745
        break;
1✔
746
      case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
747
        callbacks.cancel(event);
1✔
748
        break;
1✔
749
      case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED:
750
        WorkflowExecutionUpdateAdmittedEventAttributes admittedEvent =
1✔
751
            event.getWorkflowExecutionUpdateAdmittedEventAttributes();
1✔
752
        Message msg =
753
            Message.newBuilder()
1✔
754
                .setId(admittedEvent.getRequest().getMeta().getUpdateId() + "/request")
1✔
755
                .setProtocolInstanceId(admittedEvent.getRequest().getMeta().getUpdateId())
1✔
756
                .setEventId(event.getEventId())
1✔
757
                .setBody(Any.pack(admittedEvent.getRequest()))
1✔
758
                .build();
1✔
759
        if (replaying && acceptedUpdates.remove(msg.getProtocolInstanceId()) || !replaying) {
1✔
760
          messages.add(msg);
1✔
761
        }
762
        break;
763
      case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
764
      case UNRECOGNIZED:
765
        break;
1✔
766
      default:
767
        throw new IllegalArgumentException("Unexpected event:" + event);
×
768
    }
769
  }
1✔
770

771
  private long setCurrentTimeMillis(long currentTimeMillis) {
772
    if (this.currentTimeMillis < currentTimeMillis) {
1✔
773
      this.currentTimeMillis = currentTimeMillis;
1✔
774
    }
775
    return this.currentTimeMillis;
1✔
776
  }
777

778
  public long getLastStartedEventId() {
779
    return lastWFTStartedEventId;
1✔
780
  }
781

782
  /**
783
   * @param attributes attributes used to schedule an activity
784
   * @param callback completion callback
785
   * @return an instance of ActivityCommands
786
   */
787
  public Functions.Proc scheduleActivityTask(
788
      ExecuteActivityParameters attributes, Functions.Proc2<Optional<Payloads>, Failure> callback) {
789
    checkEventLoopExecuting();
1✔
790
    ActivityStateMachine activityStateMachine =
1✔
791
        ActivityStateMachine.newInstance(
1✔
792
            attributes,
793
            (p, f) -> {
794
              Failure failure = f != null ? f.getFailure() : null;
1✔
795
              callback.apply(p, failure);
1✔
796

797
              if (f != null
1✔
798
                  && !f.isFromEvent()
1✔
799
                  && failure.hasCause()
1✔
800
                  && failure.getCause().hasCanceledFailureInfo()) {
1✔
801
                // If !f.isFromEvent(), we want to unblock the event loop as the promise got filled
802
                // and the workflow may make progress. If f.isFromEvent(), we need to delay event
803
                // loop triggering until WorkflowTaskStarted.
804
                eventLoop();
1✔
805
              }
806
            },
1✔
807
            commandSink,
808
            stateMachineSink);
809
    return activityStateMachine::cancel;
1✔
810
  }
811

812
  /**
813
   * Creates a new timer state machine
814
   *
815
   * @param attributes timer command attributes
816
   * @param metadata user provided metadata
817
   * @param completionCallback invoked when timer fires or reports cancellation. One of
818
   *     TimerFiredEvent, TimerCanceledEvent.
819
   * @return cancellation callback that should be invoked to initiate timer cancellation
820
   */
821
  public Functions.Proc newTimer(
822
      StartTimerCommandAttributes attributes,
823
      UserMetadata metadata,
824
      Functions.Proc1<HistoryEvent> completionCallback) {
825
    checkEventLoopExecuting();
1✔
826
    TimerStateMachine timer =
1✔
827
        TimerStateMachine.newInstance(
1✔
828
            attributes,
829
            metadata,
830
            (event) -> {
831
              completionCallback.apply(event);
1✔
832
              // Needed due to immediate cancellation
833
              if (event.getEventType() == EventType.EVENT_TYPE_TIMER_CANCELED) {
1✔
834
                eventLoop();
1✔
835
              }
836
            },
1✔
837
            commandSink,
838
            stateMachineSink);
839
    return timer::cancel;
1✔
840
  }
841

842
  /**
843
   * Creates a new child state machine
844
   *
845
   * @param parameters child workflow start command parameters
846
   * @param startedCallback callback that is notified about child start
847
   * @param completionCallback invoked when child reports completion or failure
848
   * @return cancellation callback that should be invoked to cancel the child
849
   */
850
  public Functions.Proc startChildWorkflow(
851
      StartChildWorkflowExecutionParameters parameters,
852
      Functions.Proc2<WorkflowExecution, Exception> startedCallback,
853
      Functions.Proc2<Optional<Payloads>, Exception> completionCallback) {
854
    checkEventLoopExecuting();
1✔
855
    StartChildWorkflowExecutionCommandAttributes attributes = parameters.getRequest().build();
1✔
856
    ChildWorkflowCancellationType cancellationType = parameters.getCancellationType();
1✔
857
    ChildWorkflowStateMachine child =
1✔
858
        ChildWorkflowStateMachine.newInstance(
1✔
859
            attributes,
860
            parameters.getMetadata(),
1✔
861
            startedCallback,
862
            completionCallback,
863
            commandSink,
864
            stateMachineSink);
865
    return () -> {
1✔
866
      if (cancellationType == ChildWorkflowCancellationType.ABANDON) {
1✔
867
        notifyChildCanceled(completionCallback);
1✔
868
        return;
1✔
869
      }
870
      // The only time child can be canceled directly is before its start command
871
      // was sent out to the service. After that RequestCancelExternal should be used.
872
      if (child.isCancellable()) {
1✔
873
        child.cancel();
1✔
874
        return;
1✔
875
      }
876
      if (!child.isFinalState()) {
1✔
877
        requestCancelExternalWorkflowExecution(
1✔
878
            RequestCancelExternalWorkflowExecutionCommandAttributes.newBuilder()
1✔
879
                .setWorkflowId(attributes.getWorkflowId())
1✔
880
                .setNamespace(attributes.getNamespace())
1✔
881
                .setChildWorkflowOnly(true)
1✔
882
                .build(),
1✔
883
            (r, e) -> { // TODO(maxim): Decide what to do if an error is passed to the callback.
884
              if (cancellationType == ChildWorkflowCancellationType.WAIT_CANCELLATION_REQUESTED) {
1✔
885
                notifyChildCanceled(completionCallback);
1✔
886
              }
887
            });
1✔
888
        if (cancellationType == ChildWorkflowCancellationType.TRY_CANCEL) {
1✔
889
          notifyChildCanceled(completionCallback);
1✔
890
        }
891
      }
892
    };
1✔
893
  }
894

895
  public Functions.Proc startNexusOperation(
896
      ScheduleNexusOperationCommandAttributes attributes,
897
      Functions.Proc2<Optional<String>, Failure> startedCallback,
898
      Functions.Proc2<Optional<Payload>, Failure> completionCallback) {
899
    checkEventLoopExecuting();
1✔
900
    NexusOperationStateMachine operation =
1✔
901
        NexusOperationStateMachine.newInstance(
1✔
902
            attributes, startedCallback, completionCallback, commandSink, stateMachineSink);
903
    return () -> {
1✔
904
      if (operation.isCancellable()) {
1✔
905
        operation.cancel();
1✔
906
      }
907
      if (!operation.isFinalState()) {
1✔
908
        requestCancelNexusOperation(
1✔
909
            RequestCancelNexusOperationCommandAttributes.newBuilder()
1✔
910
                .setScheduledEventId(operation.getInitialCommandEventId())
1✔
911
                .build());
1✔
912
      }
913
    };
1✔
914
  }
915

916
  private void notifyChildCanceled(
917
      Functions.Proc2<Optional<Payloads>, Exception> completionCallback) {
918
    CanceledFailure failure = new CanceledFailure("Child canceled");
1✔
919
    completionCallback.apply(Optional.empty(), failure);
1✔
920
    eventLoop();
1✔
921
  }
1✔
922

923
  /**
924
   * @param attributes
925
   * @param completionCallback invoked when signal delivery completes of fails. The following types
926
   */
927
  public Functions.Proc signalExternalWorkflowExecution(
928
      SignalExternalWorkflowExecutionCommandAttributes attributes,
929
      Functions.Proc2<Void, Failure> completionCallback) {
930
    checkEventLoopExecuting();
1✔
931
    return SignalExternalStateMachine.newInstance(
1✔
932
        attributes, completionCallback, commandSink, stateMachineSink);
933
  }
934

935
  /**
936
   * @param attributes attributes to use to cancel external workflow
937
   * @param completionCallback one of ExternalWorkflowExecutionCancelRequestedEvent,
938
   */
939
  public void requestCancelExternalWorkflowExecution(
940
      RequestCancelExternalWorkflowExecutionCommandAttributes attributes,
941
      Functions.Proc2<Void, RuntimeException> completionCallback) {
942
    checkEventLoopExecuting();
1✔
943
    CancelExternalStateMachine.newInstance(
1✔
944
        attributes, completionCallback, commandSink, stateMachineSink);
945
  }
1✔
946

947
  /**
948
   * @param attributes attributes to use to cancel a nexus operation
949
   */
950
  public void requestCancelNexusOperation(RequestCancelNexusOperationCommandAttributes attributes) {
951
    checkEventLoopExecuting();
1✔
952
    CancelNexusOperationStateMachine.newInstance(attributes, commandSink, stateMachineSink);
1✔
953
  }
1✔
954

955
  public void upsertSearchAttributes(SearchAttributes attributes) {
956
    checkEventLoopExecuting();
1✔
957
    UpsertSearchAttributesStateMachine.newInstance(attributes, commandSink, stateMachineSink);
1✔
958
  }
1✔
959

960
  public void upsertMemo(Memo memo) {
961
    checkEventLoopExecuting();
1✔
962
    WorkflowPropertiesModifiedStateMachine.newInstance(
1✔
963
        ModifyWorkflowPropertiesCommandAttributes.newBuilder().setUpsertedMemo(memo).build(),
1✔
964
        commandSink,
965
        stateMachineSink);
966
  }
1✔
967

968
  public void completeWorkflow(Optional<Payloads> workflowOutput) {
969
    checkEventLoopExecuting();
1✔
970
    CompleteWorkflowStateMachine.newInstance(workflowOutput, commandSink, stateMachineSink);
1✔
971
  }
1✔
972

973
  public void failWorkflow(Failure failure) {
974
    checkEventLoopExecuting();
1✔
975
    FailWorkflowStateMachine.newInstance(failure, commandSink, stateMachineSink);
1✔
976
  }
1✔
977

978
  public void cancelWorkflow() {
979
    checkEventLoopExecuting();
1✔
980
    CancelWorkflowStateMachine.newInstance(
1✔
981
        CancelWorkflowExecutionCommandAttributes.getDefaultInstance(),
1✔
982
        commandSink,
983
        stateMachineSink);
984
  }
1✔
985

986
  public void continueAsNewWorkflow(ContinueAsNewWorkflowExecutionCommandAttributes attributes) {
987
    checkEventLoopExecuting();
1✔
988
    ContinueAsNewWorkflowStateMachine.newInstance(attributes, commandSink, stateMachineSink);
1✔
989
  }
1✔
990

991
  public boolean isReplaying() {
992
    return replaying;
1✔
993
  }
994

995
  public long currentTimeMillis() {
996
    return currentTimeMillis;
1✔
997
  }
998

999
  public UUID randomUUID() {
1000
    checkEventLoopExecuting();
1✔
1001
    String runId = currentRunId;
1✔
1002
    if (runId == null) {
1✔
1003
      throw new Error("null currentRunId");
×
1004
    }
1005
    String id = runId + ":" + idCounter++;
1✔
1006
    byte[] bytes = id.getBytes(StandardCharsets.UTF_8);
1✔
1007
    return UUID.nameUUIDFromBytes(bytes);
1✔
1008
  }
1009

1010
  public Random newRandom() {
1011
    checkEventLoopExecuting();
1✔
1012
    return new Random(randomUUID().getLeastSignificantBits());
1✔
1013
  }
1014

1015
  public void sideEffect(
1016
      Functions.Func<Optional<Payloads>> func, Functions.Proc1<Optional<Payloads>> callback) {
1017
    checkEventLoopExecuting();
1✔
1018
    SideEffectStateMachine.newInstance(
1✔
1019
        this::isReplaying,
1020
        func,
1021
        (payloads) -> {
1022
          callback.apply(payloads);
1✔
1023
          // callback unblocked sideEffect call. Give workflow code chance to make progress.
1024
          eventLoop();
1✔
1025
        },
1✔
1026
        commandSink,
1027
        stateMachineSink);
1028
  }
1✔
1029

1030
  /**
1031
   * @param id mutable side effect id
1032
   * @param func given the value from the last marker returns value to store. If result is empty
1033
   *     nothing is recorded into the history.
1034
   * @param callback used to report result or failure
1035
   */
1036
  public void mutableSideEffect(
1037
      String id,
1038
      Functions.Func1<Optional<Payloads>, Optional<Payloads>> func,
1039
      Functions.Proc1<Optional<Payloads>> callback) {
1040
    checkEventLoopExecuting();
1✔
1041
    MutableSideEffectStateMachine stateMachine =
1✔
1042
        mutableSideEffects.computeIfAbsent(
1✔
1043
            id,
1044
            (idKey) ->
1045
                MutableSideEffectStateMachine.newInstance(
1✔
1046
                    idKey, this::isReplaying, commandSink, stateMachineSink));
1047
    stateMachine.mutableSideEffect(
1✔
1048
        func,
1049
        (r) -> {
1050
          callback.apply(r);
1✔
1051
          // callback unblocked mutableSideEffect call. Give workflow code chance to make progress.
1052
          eventLoop();
1✔
1053
        },
1✔
1054
        stateMachineSink);
1055
  }
1✔
1056

1057
  public boolean getVersion(
1058
      String changeId,
1059
      int minSupported,
1060
      int maxSupported,
1061
      Functions.Proc2<Integer, RuntimeException> callback) {
1062
    VersionStateMachine stateMachine =
1✔
1063
        versions.computeIfAbsent(
1✔
1064
            changeId,
1065
            (idKey) ->
1066
                VersionStateMachine.newInstance(
1✔
1067
                    changeId, this::isReplaying, commandSink, stateMachineSink));
1068
    return stateMachine.getVersion(
1✔
1069
        minSupported,
1070
        maxSupported,
1071
        (v, e) -> {
1072
          callback.apply(v, e);
1✔
1073
          // without this getVersion call will trigger the end of WFT,
1074
          // instead we want to prepare subsequent commands and unblock the execution one more
1075
          // time.
1076
          eventLoop();
1✔
1077
        });
1✔
1078
  }
1079

1080
  public List<ExecuteLocalActivityParameters> takeLocalActivityRequests() {
1081
    List<ExecuteLocalActivityParameters> result = localActivityRequests;
1✔
1082
    localActivityRequests = new ArrayList<>();
1✔
1083
    for (ExecuteLocalActivityParameters parameters : result) {
1✔
1084
      LocalActivityStateMachine stateMachine = localActivityMap.get(parameters.getActivityId());
1✔
1085
      stateMachine.markAsSent();
1✔
1086
    }
1✔
1087
    return result;
1✔
1088
  }
1089

1090
  public void handleLocalActivityCompletion(LocalActivityResult laCompletion) {
1091
    String activityId = laCompletion.getActivityId();
1✔
1092
    LocalActivityStateMachine laStateMachine = localActivityMap.get(activityId);
1✔
1093
    if (laStateMachine == null) {
1✔
1094
      throw new IllegalStateException("Unknown local activity: " + activityId);
×
1095
    }
1096
    laStateMachine.handleCompletion(laCompletion);
1✔
1097
    prepareCommands();
1✔
1098
  }
1✔
1099

1100
  public Functions.Proc scheduleLocalActivityTask(
1101
      ExecuteLocalActivityParameters parameters,
1102
      Functions.Proc2<Optional<Payloads>, LocalActivityCallback.LocalActivityFailedException>
1103
          callback) {
1104
    checkEventLoopExecuting();
1✔
1105
    String activityId = parameters.getActivityId();
1✔
1106
    if (Strings.isNullOrEmpty(activityId)) {
1✔
1107
      throw new IllegalArgumentException("Missing activityId: " + activityId);
×
1108
    }
1109
    if (localActivityMap.containsKey(activityId)) {
1✔
1110
      throw new IllegalArgumentException("Duplicated local activity id: " + activityId);
×
1111
    }
1112
    LocalActivityStateMachine commands =
1✔
1113
        LocalActivityStateMachine.newInstance(
1✔
1114
            this::isReplaying,
1115
            this::setCurrentTimeMillis,
1116
            parameters,
1117
            (r, e) -> {
1118
              callback.apply(r, e);
1✔
1119
              // callback unblocked local activity call. Give workflow code chance to make progress.
1120
              eventLoop();
1✔
1121
            },
1✔
1122
            localActivityRequestSink,
1123
            commandSink,
1124
            stateMachineSink,
1125
            currentTimeMillis);
1126
    localActivityMap.put(activityId, commands);
1✔
1127
    return commands::cancel;
1✔
1128
  }
1129

1130
  /** Validates that command matches the event during replay. */
1131
  private void validateCommand(Command command, HistoryEvent event) {
1132
    // ProtocolMessageCommand is different from other commands because it can be associated with
1133
    // multiple types of events
1134
    // TODO(#1781) Validate protocol message is expected type.
1135
    if (command.getCommandType() == COMMAND_TYPE_PROTOCOL_MESSAGE) {
1✔
1136
      ProtocolMessageCommandAttributes commandAttributes =
1✔
1137
          command.getProtocolMessageCommandAttributes();
1✔
1138
      switch (event.getEventType()) {
1✔
1139
        case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED:
1140
          assertMatch(
1✔
1141
              command,
1142
              event,
1143
              "messageType",
1144
              true,
1✔
1145
              commandAttributes.getMessageId().endsWith("accept"));
1✔
1146
          break;
1✔
1147
        case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_REJECTED:
1148
          assertMatch(
×
1149
              command,
1150
              event,
1151
              "messageType",
1152
              true,
×
1153
              commandAttributes.getMessageId().endsWith("reject"));
×
1154
          break;
×
1155
        case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED:
1156
          assertMatch(
1✔
1157
              command,
1158
              event,
1159
              "messageType",
1160
              true,
1✔
1161
              commandAttributes.getMessageId().endsWith("complete"));
1✔
1162
          break;
1✔
1163
        default:
1164
          throw new IllegalArgumentException("Unexpected event type: " + event.getEventType());
×
1165
      }
1166
      return;
1✔
1167
    }
1168
    assertMatch(
1✔
1169
        command,
1170
        event,
1171
        "eventType",
1172
        getEventTypeForCommand(command.getCommandType()),
1✔
1173
        event.getEventType());
1✔
1174
    switch (command.getCommandType()) {
1✔
1175
      case COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK:
1176
        {
1177
          ScheduleActivityTaskCommandAttributes commandAttributes =
1✔
1178
              command.getScheduleActivityTaskCommandAttributes();
1✔
1179
          ActivityTaskScheduledEventAttributes eventAttributes =
1✔
1180
              event.getActivityTaskScheduledEventAttributes();
1✔
1181
          assertMatch(
1✔
1182
              command,
1183
              event,
1184
              "activityId",
1185
              commandAttributes.getActivityId(),
1✔
1186
              eventAttributes.getActivityId());
1✔
1187
          assertMatch(
1✔
1188
              command,
1189
              event,
1190
              "activityType",
1191
              commandAttributes.getActivityType(),
1✔
1192
              eventAttributes.getActivityType());
1✔
1193
        }
1194
        break;
1✔
1195
      case COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION:
1196
        {
1197
          StartChildWorkflowExecutionCommandAttributes commandAttributes =
1✔
1198
              command.getStartChildWorkflowExecutionCommandAttributes();
1✔
1199
          StartChildWorkflowExecutionInitiatedEventAttributes eventAttributes =
1✔
1200
              event.getStartChildWorkflowExecutionInitiatedEventAttributes();
1✔
1201
          assertMatch(
1✔
1202
              command,
1203
              event,
1204
              "workflowId",
1205
              commandAttributes.getWorkflowId(),
1✔
1206
              eventAttributes.getWorkflowId());
1✔
1207
          assertMatch(
1✔
1208
              command,
1209
              event,
1210
              "workflowType",
1211
              commandAttributes.getWorkflowType(),
1✔
1212
              eventAttributes.getWorkflowType());
1✔
1213
        }
1214
        break;
1✔
1215
      case COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK:
1216
      case COMMAND_TYPE_START_TIMER:
1217
        {
1218
          StartTimerCommandAttributes commandAttributes = command.getStartTimerCommandAttributes();
1✔
1219
          TimerStartedEventAttributes eventAttributes = event.getTimerStartedEventAttributes();
1✔
1220
          assertMatch(
1✔
1221
              command,
1222
              event,
1223
              "timerId",
1224
              commandAttributes.getTimerId(),
1✔
1225
              eventAttributes.getTimerId());
1✔
1226
        }
1227
        break;
1✔
1228
      case COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION:
1229
        {
1230
          ScheduleNexusOperationCommandAttributes commandAttributes =
1✔
1231
              command.getScheduleNexusOperationCommandAttributes();
1✔
1232
          NexusOperationScheduledEventAttributes eventAttributes =
1✔
1233
              event.getNexusOperationScheduledEventAttributes();
1✔
1234
          assertMatch(
1✔
1235
              command,
1236
              event,
1237
              "operation",
1238
              commandAttributes.getOperation(),
1✔
1239
              eventAttributes.getOperation());
1✔
1240
          assertMatch(
1✔
1241
              command,
1242
              event,
1243
              "service",
1244
              commandAttributes.getService(),
1✔
1245
              eventAttributes.getService());
1✔
1246
        }
1247
        break;
1✔
1248
      case COMMAND_TYPE_REQUEST_CANCEL_NEXUS_OPERATION:
1249
        {
1250
          RequestCancelNexusOperationCommandAttributes commandAttributes =
1✔
1251
              command.getRequestCancelNexusOperationCommandAttributes();
1✔
1252
          NexusOperationCancelRequestedEventAttributes eventAttributes =
1✔
1253
              event.getNexusOperationCancelRequestedEventAttributes();
1✔
1254
          assertMatch(
1✔
1255
              command,
1256
              event,
1257
              "scheduleEventId",
1258
              commandAttributes.getScheduledEventId(),
1✔
1259
              eventAttributes.getScheduledEventId());
1✔
1260
        }
1261
        break;
1✔
1262
      case COMMAND_TYPE_CANCEL_TIMER:
1263
      case COMMAND_TYPE_CANCEL_WORKFLOW_EXECUTION:
1264
      case COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION:
1265
      case COMMAND_TYPE_RECORD_MARKER:
1266
      case COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION:
1267
      case COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION:
1268
      case COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES:
1269
      case COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION:
1270
      case COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION:
1271
      case COMMAND_TYPE_PROTOCOL_MESSAGE:
1272
      case COMMAND_TYPE_MODIFY_WORKFLOW_PROPERTIES:
1273
        break;
1✔
1274
      case UNRECOGNIZED:
1275
      case COMMAND_TYPE_UNSPECIFIED:
1276
        throw new IllegalArgumentException("Unexpected command type: " + command.getCommandType());
×
1277
    }
1278
  }
1✔
1279

1280
  private void assertMatch(
1281
      Command command, HistoryEvent event, String checkType, Object expected, Object actual) {
1282
    if (!expected.equals(actual)) {
1✔
1283
      String message =
1✔
1284
          String.format(
1✔
1285
              "Command %s doesn't match event %s with EventId=%s on check %s "
1286
                  + "with an expected value '%s' and an actual value '%s'",
1287
              command.getCommandType(),
1✔
1288
              event.getEventType(),
1✔
1289
              event.getEventId(),
1✔
1290
              checkType,
1291
              expected,
1292
              actual);
1293
      throw new NonDeterministicException(message);
1✔
1294
    }
1295
  }
1✔
1296

1297
  private class WorkflowTaskCommandsListener implements WorkflowTaskStateMachine.Listener {
1✔
1298
    @Override
1299
    public void workflowTaskStarted(
1300
        long startedEventId,
1301
        long currentTimeMillis,
1302
        boolean nonProcessedWorkflowTask,
1303
        long historySize,
1304
        boolean isContinueAsNewSuggested) {
1305
      setCurrentTimeMillis(currentTimeMillis);
1✔
1306
      for (CancellableCommand cancellableCommand : commands) {
1✔
1307
        cancellableCommand.handleWorkflowTaskStarted();
1✔
1308
      }
1✔
1309
      // Give local activities a chance to recreate their requests if they were lost due
1310
      // to the last workflow task failure. The loss could happen only the last workflow task
1311
      // was forcibly created by setting forceCreate on RespondWorkflowTaskCompletedRequest.
1312
      if (nonProcessedWorkflowTask) {
1✔
1313
        for (LocalActivityStateMachine value : localActivityMap.values()) {
1✔
1314
          value.nonReplayWorkflowTaskStarted();
1✔
1315
        }
1✔
1316
      }
1317
      WorkflowStateMachines.this.lastWFTStartedEventId = startedEventId;
1✔
1318
      WorkflowStateMachines.this.historySize = historySize;
1✔
1319
      WorkflowStateMachines.this.isContinueAsNewSuggested = isContinueAsNewSuggested;
1✔
1320

1321
      eventLoop();
1✔
1322
    }
1✔
1323

1324
    @Override
1325
    public void updateRunId(String currentRunId) {
1326
      WorkflowStateMachines.this.currentRunId = currentRunId;
×
1327
    }
×
1328
  }
1329

1330
  /**
1331
   * Extracts the eventId of the "initial command" for the given event.
1332
   *
1333
   * <p>The "initial command" is the event which started a group of related events:
1334
   * ActivityTaskScheduled, TimerStarted, and so on; for events which are not part of a group, the
1335
   * event's own eventId is returned. If the event has an unknown type but is marked as ignorable,
1336
   * then {@link OptionalLong#empty()} is returned instead.
1337
   *
1338
   * @return the eventId of the initial command, or {@link OptionalLong#empty()}
1339
   */
1340
  private OptionalLong getInitialCommandEventId(HistoryEvent event) {
1341
    switch (event.getEventType()) {
1✔
1342
      case EVENT_TYPE_ACTIVITY_TASK_STARTED:
1343
        return OptionalLong.of(event.getActivityTaskStartedEventAttributes().getScheduledEventId());
1✔
1344
      case EVENT_TYPE_ACTIVITY_TASK_COMPLETED:
1345
        return OptionalLong.of(
1✔
1346
            event.getActivityTaskCompletedEventAttributes().getScheduledEventId());
1✔
1347
      case EVENT_TYPE_ACTIVITY_TASK_FAILED:
1348
        return OptionalLong.of(event.getActivityTaskFailedEventAttributes().getScheduledEventId());
1✔
1349
      case EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT:
1350
        return OptionalLong.of(
1✔
1351
            event.getActivityTaskTimedOutEventAttributes().getScheduledEventId());
1✔
1352
      case EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED:
1353
        return OptionalLong.of(
×
1354
            event.getActivityTaskCancelRequestedEventAttributes().getScheduledEventId());
×
1355
      case EVENT_TYPE_ACTIVITY_TASK_CANCELED:
1356
        return OptionalLong.of(
1✔
1357
            event.getActivityTaskCanceledEventAttributes().getScheduledEventId());
1✔
1358
      case EVENT_TYPE_TIMER_FIRED:
1359
        return OptionalLong.of(event.getTimerFiredEventAttributes().getStartedEventId());
1✔
1360
      case EVENT_TYPE_TIMER_CANCELED:
1361
        return OptionalLong.of(event.getTimerCanceledEventAttributes().getStartedEventId());
×
1362
      case EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
1363
        return OptionalLong.of(
×
1364
            event
1365
                .getRequestCancelExternalWorkflowExecutionFailedEventAttributes()
×
1366
                .getInitiatedEventId());
×
1367
      case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
1368
        return OptionalLong.of(
1✔
1369
            event
1370
                .getExternalWorkflowExecutionCancelRequestedEventAttributes()
1✔
1371
                .getInitiatedEventId());
1✔
1372
      case EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED:
1373
        return OptionalLong.of(
1✔
1374
            event.getStartChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId());
1✔
1375
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED:
1376
        return OptionalLong.of(
1✔
1377
            event.getChildWorkflowExecutionStartedEventAttributes().getInitiatedEventId());
1✔
1378
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED:
1379
        return OptionalLong.of(
1✔
1380
            event.getChildWorkflowExecutionCompletedEventAttributes().getInitiatedEventId());
1✔
1381
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED:
1382
        return OptionalLong.of(
1✔
1383
            event.getChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId());
1✔
1384
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED:
1385
        return OptionalLong.of(
1✔
1386
            event.getChildWorkflowExecutionCanceledEventAttributes().getInitiatedEventId());
1✔
1387
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT:
1388
        return OptionalLong.of(
1✔
1389
            event.getChildWorkflowExecutionTimedOutEventAttributes().getInitiatedEventId());
1✔
1390
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TERMINATED:
1391
        return OptionalLong.of(
×
1392
            event.getChildWorkflowExecutionTerminatedEventAttributes().getInitiatedEventId());
×
1393
      case EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
1394
        return OptionalLong.of(
1✔
1395
            event.getSignalExternalWorkflowExecutionFailedEventAttributes().getInitiatedEventId());
1✔
1396
      case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED:
1397
        return OptionalLong.of(
1✔
1398
            event.getExternalWorkflowExecutionSignaledEventAttributes().getInitiatedEventId());
1✔
1399
      case EVENT_TYPE_WORKFLOW_TASK_STARTED:
1400
        return OptionalLong.of(event.getWorkflowTaskStartedEventAttributes().getScheduledEventId());
1✔
1401
      case EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
1402
        return OptionalLong.of(
1✔
1403
            event.getWorkflowTaskCompletedEventAttributes().getScheduledEventId());
1✔
1404
      case EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT:
1405
        return OptionalLong.of(
1✔
1406
            event.getWorkflowTaskTimedOutEventAttributes().getScheduledEventId());
1✔
1407
      case EVENT_TYPE_WORKFLOW_TASK_FAILED:
1408
        return OptionalLong.of(event.getWorkflowTaskFailedEventAttributes().getScheduledEventId());
1✔
1409
      case EVENT_TYPE_NEXUS_OPERATION_STARTED:
1410
        return OptionalLong.of(
1✔
1411
            event.getNexusOperationStartedEventAttributes().getScheduledEventId());
1✔
1412
      case EVENT_TYPE_NEXUS_OPERATION_COMPLETED:
1413
        return OptionalLong.of(
1✔
1414
            event.getNexusOperationCompletedEventAttributes().getScheduledEventId());
1✔
1415
      case EVENT_TYPE_NEXUS_OPERATION_FAILED:
1416
        return OptionalLong.of(
1✔
1417
            event.getNexusOperationFailedEventAttributes().getScheduledEventId());
1✔
1418
      case EVENT_TYPE_NEXUS_OPERATION_CANCELED:
1419
        return OptionalLong.of(
1✔
1420
            event.getNexusOperationCanceledEventAttributes().getScheduledEventId());
1✔
1421
      case EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT:
1422
        return OptionalLong.of(
1✔
1423
            event.getNexusOperationTimedOutEventAttributes().getScheduledEventId());
1✔
1424
      case EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:
1425
      case EVENT_TYPE_TIMER_STARTED:
1426
      case EVENT_TYPE_MARKER_RECORDED:
1427
      case EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED:
1428
      case EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED:
1429
      case EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED:
1430
      case EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW:
1431
      case EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED:
1432
      case EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES:
1433
      case EVENT_TYPE_WORKFLOW_TASK_SCHEDULED:
1434
      case EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED:
1435
      case EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:
1436
      case EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED:
1437
      case EVENT_TYPE_WORKFLOW_EXECUTION_FAILED:
1438
      case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
1439
      case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
1440
      case EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
1441
      case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED:
1442
      case EVENT_TYPE_WORKFLOW_PROPERTIES_MODIFIED:
1443
      case EVENT_TYPE_NEXUS_OPERATION_SCHEDULED:
1444
      case EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED:
1445
        return OptionalLong.of(event.getEventId());
1✔
1446

1447
      default:
1448
        if (event.getWorkerMayIgnore()) {
1✔
1449
          return OptionalLong.empty();
1✔
1450
        }
1451
        throw new IllegalArgumentException("Unexpected event type: " + event.getEventType());
1✔
1452
    }
1453
  }
1454

1455
  /**
1456
   * Workflow code executes only while event loop is running. So operations that can be invoked from
1457
   * the workflow have to satisfy this condition.
1458
   */
1459
  private void checkEventLoopExecuting() {
1460
    if (!eventLoopExecuting) {
1✔
1461
      // this call doesn't yield or await, because the await function returns true,
1462
      // but it checks if the workflow thread needs to be destroyed
1463
      WorkflowThread.await("kill workflow thread if destroy requested", () -> true);
×
1464
      throw new IllegalStateException("Operation allowed only while eventLoop is running");
×
1465
    }
1466
  }
1✔
1467

1468
  private String createEventHandlingMessage(HistoryEvent event) {
1469
    return "Failure handling event "
1✔
1470
        + event.getEventId()
1✔
1471
        + " of type '"
1472
        + event.getEventType()
1✔
1473
        + "' "
1474
        + (this.isReplaying() ? "during replay" : "during execution");
1✔
1475
  }
1476

1477
  private String createShortCurrentStateMessagePostfix() {
1478
    return String.format(
1✔
1479
        "{WorkflowTaskStartedEventId=%s, CurrentStartedEventId=%s}",
1480
        this.workflowTaskStartedEventId, this.lastWFTStartedEventId);
1✔
1481
  }
1482
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc