• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #194

10 Oct 2023 03:10PM UTC coverage: 77.401% (+0.005%) from 77.396%
#194

push

github-actions

web-flow
Reset lastHandledEventId on speculative WFT (#1881)

3 of 3 new or added lines in 1 file covered. (100.0%)

18697 of 24156 relevant lines covered (77.4%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.43
/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.statemachines;
22

23
import static io.temporal.api.enums.v1.CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE;
24
import static io.temporal.internal.common.WorkflowExecutionUtils.getEventTypeForCommand;
25
import static io.temporal.internal.common.WorkflowExecutionUtils.isCommandEvent;
26
import static io.temporal.serviceclient.CheckedExceptionWrapper.unwrap;
27

28
import com.google.common.annotations.VisibleForTesting;
29
import com.google.common.base.Preconditions;
30
import com.google.common.base.Strings;
31
import com.google.protobuf.Any;
32
import io.temporal.api.command.v1.*;
33
import io.temporal.api.common.v1.Payloads;
34
import io.temporal.api.common.v1.SearchAttributes;
35
import io.temporal.api.common.v1.WorkflowExecution;
36
import io.temporal.api.enums.v1.EventType;
37
import io.temporal.api.failure.v1.Failure;
38
import io.temporal.api.history.v1.*;
39
import io.temporal.api.protocol.v1.Message;
40
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
41
import io.temporal.failure.CanceledFailure;
42
import io.temporal.internal.common.*;
43
import io.temporal.internal.history.LocalActivityMarkerUtils;
44
import io.temporal.internal.history.VersionMarkerUtils;
45
import io.temporal.internal.sync.WorkflowThread;
46
import io.temporal.internal.worker.LocalActivityResult;
47
import io.temporal.worker.NonDeterministicException;
48
import io.temporal.workflow.ChildWorkflowCancellationType;
49
import io.temporal.workflow.Functions;
50
import java.nio.charset.StandardCharsets;
51
import java.util.*;
52

53
public final class WorkflowStateMachines {
54

55
  /** Outcome reported by a command after it was offered a history event. */
  enum HandleEventStatus {
    /** The event was accepted by the command it was offered to. */
    OK,
    /** The event does not correspond to the command it was offered to. */
    NON_MATCHING_EVENT
  }
59

60
  /** Initial set of SDK flags that will be set on all new workflow executions. */
61
  private static final List<SdkFlag> initialFlags =
1✔
62
      Collections.unmodifiableList(Arrays.asList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION));
1✔
63

64
  /**
65
   * EventId of the WorkflowTaskStarted event of the Workflow Task that was picked up by a worker
66
   * and triggered a current replay or execution. It's expected to be the last event in the history
67
   * if we continue to execute the workflow.
68
   *
69
   * <p>For direct (legacy) queries, it may be:
70
   *
71
   * <ul>
72
   *   <li>0 if it's a query of a closed workflow execution
73
   *   <li>id of the last successfully completed Workflow Task if the workflow is not closed
74
   * </ul>
75
   *
76
   * <p>Set from the "outside" from the PollWorkflowTaskQueueResponse. Not modified by the SDK state
77
   * machines.
78
   */
79
  private long workflowTaskStartedEventId;
80

81
  /** EventId of the last WorkflowTaskStarted event handled by these state machines. */
82
  private long currentStartedEventId;
83

84
  private long historySize;
85

86
  private boolean isContinueAsNewSuggested;
87

88
  /**
89
   * EventId of the last event seen by these state machines. Events earlier than this one will be
90
   * discarded.
91
   */
92
  private long lastHandledEventId;
93

94
  private final StatesMachinesCallback callbacks;
95

96
  /** Callback to send new commands to. */
97
  private final Functions.Proc1<CancellableCommand> commandSink;
98

99
  /**
100
   * currentRunId is used as seed by Workflow.newRandom and randomUUID. It allows to generate them
101
   * deterministically.
102
   */
103
  private String currentRunId;
104

105
  /** Used by Workflow.newRandom and randomUUID together with currentRunId. */
106
  private long idCounter;
107

108
  /** Current workflow time. */
109
  private long currentTimeMillis = -1;
1✔
110

111
  private final Map<Long, EntityStateMachine> stateMachines = new HashMap<>();
1✔
112

113
  private final Map<String, EntityStateMachine> protocolStateMachines = new HashMap<>();
1✔
114

115
  private final Queue<Message> messageOutbox = new ArrayDeque<>();
1✔
116

117
  private final Queue<CancellableCommand> commands = new ArrayDeque<>();
1✔
118

119
  /**
120
   * Commands generated by the currently processed workflow task. It is a queue as commands can be
121
   * added (due to marker based commands) while iterating over already added commands.
122
   */
123
  private final Queue<CancellableCommand> cancellableCommands = new ArrayDeque<>();
1✔
124

125
  /**
126
   * Is workflow executing new code or replaying from the history. Note that this flag ALWAYS flips
127
   * to true for the time when we apply events from the server even if the commands were created by
128
   * an actual execution with replaying=false.
129
   */
130
  private boolean replaying;
131

132
  /** Used to ensure that event loop is not executed recursively. */
133
  private boolean eventLoopExecuting;
134

135
  /**
136
   * Used to avoid recursive calls to {@link #prepareCommands()}.
137
   *
138
   * <p>Such calls happen when sideEffects and localActivity markers are processed.
139
   */
140
  private boolean preparing;
141

142
  /** Key is mutable side effect id */
143
  private final Map<String, MutableSideEffectStateMachine> mutableSideEffects = new HashMap<>();
1✔
144

145
  /** Key is changeId */
146
  private final Map<String, VersionStateMachine> versions = new HashMap<>();
1✔
147

148
  /** Map of local activities by their id. */
149
  private final Map<String, LocalActivityStateMachine> localActivityMap = new HashMap<>();
1✔
150

151
  private List<ExecuteLocalActivityParameters> localActivityRequests = new ArrayList<>();
1✔
152

153
  private final Functions.Proc1<ExecuteLocalActivityParameters> localActivityRequestSink;
154
  private final Functions.Proc1<StateMachine> stateMachineSink;
155

156
  private final WFTBuffer wftBuffer = new WFTBuffer();
1✔
157

158
  private List<Message> messages = new ArrayList<Message>();
1✔
159

160
  private final SdkFlags flags;
161

162
  /**
   * Creates state machines whose state machine sink is a no-op.
   *
   * @param callbacks callbacks into the workflow execution; must not be null
   * @param capabilities server capabilities; their SDK metadata setting is passed to the SDK flags
   */
  public WorkflowStateMachines(
      StatesMachinesCallback callbacks, GetSystemInfoResponse.Capabilities capabilities) {
    // Delegate with a sink that ignores every state machine it is handed.
    this(callbacks, stateMachine -> {}, capabilities);
  }
1✔
166

167
  /**
   * Creates state machines with an explicit state machine sink.
   *
   * @param callbacks callbacks into the workflow execution; must not be null
   * @param stateMachineSink sink handed to the entity state machines created by this class
   * @param capabilities server capabilities; their SDK metadata setting is passed to the SDK flags
   */
  @VisibleForTesting
  public WorkflowStateMachines(
      StatesMachinesCallback callbacks,
      Functions.Proc1<StateMachine> stateMachineSink,
      GetSystemInfoResponse.Capabilities capabilities) {
    this.callbacks = Objects.requireNonNull(callbacks);
    this.stateMachineSink = stateMachineSink;
    // New commands are queued in cancellableCommands until prepareCommands() processes them.
    this.commandSink = cancellableCommands::add;
    // Must stay a lambda: localActivityRequests is a non-final field, so it has to be read at
    // call time rather than captured once.
    this.localActivityRequestSink = request -> localActivityRequests.add(request);
    this.flags = new SdkFlags(capabilities.getSdkMetadata(), this::isReplaying);
  }
1✔
178

179
  /**
   * Creates state machines without server capabilities.
   *
   * <p>The SDK flags are constructed with {@code false} as their first argument (presumably
   * meaning that SDK metadata is not supported; confirm against {@code SdkFlags}).
   *
   * @param callbacks callbacks into the workflow execution; must not be null
   * @param stateMachineSink sink handed to the entity state machines created by this class
   */
  @VisibleForTesting
  public WorkflowStateMachines(
      StatesMachinesCallback callbacks, Functions.Proc1<StateMachine> stateMachineSink) {
    this.callbacks = Objects.requireNonNull(callbacks);
    this.stateMachineSink = stateMachineSink;
    // New commands are queued in cancellableCommands until prepareCommands() processes them.
    this.commandSink = cancellableCommands::add;
    // Must stay a lambda: localActivityRequests is a non-final field, so it has to be read at
    // call time rather than captured once.
    this.localActivityRequestSink = request -> localActivityRequests.add(request);
    this.flags = new SdkFlags(false, this::isReplaying);
  }
1✔
188

189
  // TODO revisit and potentially remove workflowTaskStartedEventId from the state machines altogether.
190
  // The only place where it's used is WorkflowTaskStateMachine to understand that there will be
191
  // no completion event in the history.
192
  // This is tricky, because direct queries come with 0 as a magic value in this field if
193
  // execution is completed for example.
194
  // Most likely we can rework WorkflowTaskStateMachine to use only hasNext.
195
  /**
196
   * @param workflowTaskStartedEventId eventId of the workflowTask that was picked up by a worker
197
   *     and triggered an execution. Used in {@link WorkflowTaskStateMachine} only to understand
198
   *     that this workflow task will not have a matching closing event and needs to be executed.
199
   */
200
  public void setWorklfowStartedEventId(long workflowTaskStartedEventId) {
201
    this.workflowTaskStartedEventId = workflowTaskStartedEventId;
1✔
202
  }
1✔
203

204
  public void setCurrentStartedEventId(long eventId) {
205
    // We have to drop any state machines (which should only be one workflow task machine)
206
    // created when handling the speculative workflow task
207
    for (long i = this.lastHandledEventId; i > eventId; i--) {
×
208
      stateMachines.remove(i);
×
209
    }
210
    this.currentStartedEventId = eventId;
×
211
    // When we reset the event ID on a speculative WFT we need to move this counter back
212
    // to the last WFT completed to allow new tasks to be processed. Assume the WFT complete
213
    // always follows the WFT started.
214
    this.lastHandledEventId = eventId + 1;
×
215
  }
×
216

217
  public long getCurrentStartedEventId() {
218
    return currentStartedEventId;
1✔
219
  }
220

221
  public long getHistorySize() {
222
    return historySize;
1✔
223
  }
224

225
  public boolean isContinueAsNewSuggested() {
226
    return isContinueAsNewSuggested;
1✔
227
  }
228

229
  public void setReplaying(boolean replaying) {
230
    this.replaying = replaying;
1✔
231
  }
1✔
232

233
  public void setMessages(List<Message> messages) {
234
    this.messages = messages;
1✔
235
  }
1✔
236

237
  /**
238
   * Handle a single event from the workflow history.
239
   *
240
   * @param event event from the history.
241
   * @param hasNextEvent false if this is the last event in the history.
242
   */
243
  public void handleEvent(HistoryEvent event, boolean hasNextEvent) {
244
    long eventId = event.getEventId();
1✔
245
    if (eventId <= lastHandledEventId) {
1✔
246
      // already handled
247
      return;
1✔
248
    }
249
    Preconditions.checkState(
1✔
250
        eventId == lastHandledEventId + 1,
251
        "History is out of order. "
252
            + "There is a gap between the last event workflow state machine observed and currently handling event. "
253
            + "Last processed eventId: %s, handling eventId: %s",
254
        lastHandledEventId,
255
        eventId);
256

257
    lastHandledEventId = eventId;
1✔
258
    boolean readyToPeek = wftBuffer.addEvent(event, hasNextEvent);
1✔
259
    if (readyToPeek) {
1✔
260
      handleEventsBatch(wftBuffer.fetch(), hasNextEvent);
1✔
261
    }
262
  }
1✔
263

264
  /**
265
   * Handle an events batch for one workflow task. Events that are related to one workflow task
266
   * during replay should be prefetched and supplied in one batch.
267
   *
268
   * @param events events belonging to one workflow task
269
   * @param hasNextEvent true if there are more events in the history following this batch, false if
270
   *     this batch contains the last events of the history
271
   */
272
  private void handleEventsBatch(List<HistoryEvent> events, boolean hasNextEvent) {
273
    if (EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED.equals(events.get(0).getEventType())) {
1✔
274
      for (SdkFlag flag : initialFlags) {
1✔
275
        flags.tryUseSdkFlag(flag);
1✔
276
      }
1✔
277
    }
278

279
    if (EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED.equals(events.get(0).getEventType())) {
1✔
280
      for (HistoryEvent event : events) {
1✔
281
        handleSingleEventLookahead(event);
1✔
282
      }
1✔
283
    }
284

285
    for (Iterator<HistoryEvent> iterator = events.iterator(); iterator.hasNext(); ) {
1✔
286
      HistoryEvent event = iterator.next();
1✔
287

288
      // On replay the messages are available after the workflow task schedule event, so we
289
      // need to handle them before workflow task started event to maintain a consistent order.
290
      for (Message msg : this.takeLTE(event.getEventId() - 1)) {
1✔
291
        handleSingleMessage(msg);
1✔
292
      }
1✔
293

294
      try {
295
        handleSingleEvent(event, iterator.hasNext() || hasNextEvent);
1✔
296
      } catch (RuntimeException e) {
1✔
297
        throw createEventProcessingException(e, event);
1✔
298
      }
1✔
299

300
      for (Message msg : this.takeLTE(event.getEventId())) {
1✔
301
        handleSingleMessage(msg);
1✔
302
      }
1✔
303
    }
1✔
304
  }
1✔
305

306
  /** Handle an event when looking ahead at history during replay */
307
  private void handleSingleEventLookahead(HistoryEvent event) {
308
    EventType eventType = event.getEventType();
1✔
309
    switch (eventType) {
1✔
310
      case EVENT_TYPE_MARKER_RECORDED:
311
        try {
312
          preloadVersionMarker(event);
1✔
313
        } catch (RuntimeException e) {
1✔
314
          throw createEventProcessingException(e, event);
1✔
315
        }
1✔
316
        break;
317
      case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED:
318
        // Look ahead to infer protocol messages
319
        WorkflowExecutionUpdateAcceptedEventAttributes updateEvent =
1✔
320
            event.getWorkflowExecutionUpdateAcceptedEventAttributes();
1✔
321
        this.messages.add(
1✔
322
            Message.newBuilder()
1✔
323
                .setId(updateEvent.getAcceptedRequestMessageId())
1✔
324
                .setProtocolInstanceId(updateEvent.getProtocolInstanceId())
1✔
325
                .setEventId(updateEvent.getAcceptedRequestSequencingEventId())
1✔
326
                .setBody(Any.pack(updateEvent.getAcceptedRequest()))
1✔
327
                .build());
1✔
328
        break;
1✔
329
      case EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
330
        WorkflowTaskCompletedEventAttributes completedEvent =
1✔
331
            event.getWorkflowTaskCompletedEventAttributes();
1✔
332
        for (Integer flag : completedEvent.getSdkMetadata().getLangUsedFlagsList()) {
1✔
333
          SdkFlag sdkFlag = SdkFlag.getValue(flag);
1✔
334
          if (sdkFlag.equals(SdkFlag.UNKNOWN)) {
1✔
335
            throw new IllegalArgumentException("Unknown SDK flag:" + flag);
×
336
          }
337
          flags.setSdkFlag(sdkFlag);
1✔
338
        }
1✔
339
        break;
340
    }
341
  }
1✔
342

343
  private List<Message> takeLTE(long eventId) {
344
    List<Message> m = new ArrayList<Message>();
1✔
345
    List<Message> remainingMessages = new ArrayList<Message>();
1✔
346
    for (Message msg : this.messages) {
1✔
347
      if (msg.getEventId() > eventId) {
1✔
348
        remainingMessages.add(msg);
1✔
349
      } else {
350
        m.add(msg);
1✔
351
      }
352
    }
1✔
353
    this.messages = remainingMessages;
1✔
354
    return m;
1✔
355
  }
356

357
  private RuntimeException createEventProcessingException(RuntimeException e, HistoryEvent event) {
358
    Throwable ex = unwrap(e);
1✔
359
    if (ex instanceof NonDeterministicException) {
1✔
360
      // just appending the message in front of an existing message, saving the original stacktrace
361
      NonDeterministicException modifiedException =
1✔
362
          new NonDeterministicException(
363
              createEventHandlingMessage(event)
1✔
364
                  + ". "
365
                  + ex.getMessage()
1✔
366
                  + ". "
367
                  + createShortCurrentStateMessagePostfix(),
1✔
368
              ex.getCause());
1✔
369
      modifiedException.setStackTrace(ex.getStackTrace());
1✔
370
      return modifiedException;
1✔
371
    } else {
372
      return new InternalWorkflowTaskException(
1✔
373
          createEventHandlingMessage(event) + ". " + createShortCurrentStateMessagePostfix(), ex);
1✔
374
    }
375
  }
376

377
  private void handleSingleMessage(Message message) {
378
    // Get or create protocol state machine based on Instance ID and protocolName
379
    EntityStateMachine stateMachine =
1✔
380
        protocolStateMachines.computeIfAbsent(
1✔
381
            message.getProtocolInstanceId(),
1✔
382
            (protocolInstance) -> {
383
              String protocolName = ProtocolUtils.getProtocol(message);
1✔
384
              Optional<ProtocolType> type = ProtocolType.get(protocolName);
1✔
385
              if (type.isPresent()) {
1✔
386
                switch (type.get()) {
1✔
387
                  case UPDATE_V1:
388
                    return UpdateProtocolStateMachine.newInstance(
1✔
389
                        this::isReplaying,
390
                        callbacks::update,
1✔
391
                        this::sendMessage,
392
                        commandSink,
393
                        stateMachineSink);
394
                  default:
395
                    throw new IllegalArgumentException("Unknown protocol type:" + protocolName);
×
396
                }
397
              }
398
              throw new IllegalArgumentException("Protocol type not specified:" + message);
×
399
            });
400
    stateMachine.handleMessage(message);
1✔
401
  }
1✔
402

403
  private void handleSingleEvent(HistoryEvent event, boolean hasNextEvent) {
404
    if (isCommandEvent(event)) {
1✔
405
      handleCommandEvent(event);
1✔
406
      return;
1✔
407
    }
408

409
    if (replaying
1✔
410
        && !hasNextEvent
411
        && (event.getEventType() == EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED
1✔
412
            || WorkflowExecutionUtils.isWorkflowTaskClosedEvent(event))) {
1✔
413
      replaying = false;
1✔
414
    }
415

416
    Long initialCommandEventId = getInitialCommandEventId(event);
1✔
417
    EntityStateMachine c = stateMachines.get(initialCommandEventId);
1✔
418
    if (c != null) {
1✔
419
      c.handleEvent(event, hasNextEvent);
1✔
420
      if (c.isFinalState()) {
1✔
421
        stateMachines.remove(initialCommandEventId);
1✔
422
      }
423
    } else {
424
      handleNonStatefulEvent(event, hasNextEvent);
1✔
425
    }
426
  }
1✔
427

428
  /**
429
   * Handles command event. Command event is an event which is generated from a command emitted by a
430
   * past decision. Each command has a correspondent event. For example ScheduleActivityTaskCommand
431
   * is recorded to the history as ActivityTaskScheduledEvent.
432
   *
433
   * <p>Command events always follow WorkflowTaskCompletedEvent.
434
   *
435
   * <p>The handling consists of verifying that the next command in the commands queue matches the
436
   * event, command state machine is notified about the event and the command is removed from the
437
   * commands queue.
438
   */
439
  private void handleCommandEvent(HistoryEvent event) {
440
    if (handleLocalActivityMarker(event)) {
1✔
441
      return;
1✔
442
    }
443

444
    // Match event to the next command in the stateMachine queue.
445
    // After matching the command is notified about the event and is removed from the
446
    // queue.
447
    CancellableCommand matchingCommand = null;
1✔
448
    while (matchingCommand == null) {
1✔
449
      // handleVersionMarker can skip a marker event if the getVersion call was removed.
450
      // In this case we don't want to consume a command.
451
      // That's why peek is used instead of poll.
452
      CancellableCommand command = commands.peek();
1✔
453
      if (command == null) {
1✔
454
        if (handleNonMatchingVersionMarker(event)) {
×
455
          // this event is a version marker for removed getVersion call.
456
          // Handle the version marker as unmatched and return even if there is no commands to match
457
          // it against.
458
          return;
×
459
        } else {
460
          throw new NonDeterministicException("No command scheduled that corresponds to " + event);
×
461
        }
462
      }
463

464
      if (command.isCanceled()) {
1✔
465
        // Consume and skip the command
466
        commands.poll();
1✔
467
        continue;
1✔
468
      }
469

470
      // Note that handleEvent can cause a command cancellation in case of
471
      // 1. MutableSideEffect
472
      // 2. Version State Machine during replay cancels the command and enters SKIPPED state
473
      //    if it handled non-matching event.
474
      HandleEventStatus status = command.handleEvent(event, true);
1✔
475

476
      if (command.isCanceled()) {
1✔
477
        // Consume and skip the command
478
        commands.poll();
1✔
479
        continue;
1✔
480
      }
481

482
      switch (status) {
1✔
483
        case OK:
484
          // Consume the command
485
          commands.poll();
1✔
486
          matchingCommand = command;
1✔
487
          break;
1✔
488
        case NON_MATCHING_EVENT:
489
          if (handleNonMatchingVersionMarker(event)) {
1✔
490
            // this event is a version marker for removed getVersion call.
491
            // Handle the version marker as unmatched and return without consuming the command
492
            return;
1✔
493
          } else {
494
            throw new NonDeterministicException(
1✔
495
                "Event "
496
                    + event.getEventId()
1✔
497
                    + " of type "
498
                    + event.getEventType()
1✔
499
                    + " does not"
500
                    + " match command type "
501
                    + command.getCommandType());
1✔
502
          }
503
        default:
504
          throw new IllegalStateException(
×
505
              "Got " + status + " value from command.handleEvent which is not handled");
506
      }
507
    }
1✔
508

509
    validateCommand(matchingCommand.getCommand(), event);
1✔
510
    EntityStateMachine stateMachine = matchingCommand.getStateMachine();
1✔
511
    if (!stateMachine.isFinalState()) {
1✔
512
      stateMachines.put(event.getEventId(), stateMachine);
1✔
513
    }
514
    // Marker is the only command processing of which can cause workflow code execution
515
    // and generation of new state machines.
516
    if (event.getEventType() == EventType.EVENT_TYPE_MARKER_RECORDED) {
1✔
517
      prepareCommands();
1✔
518
    }
519
  }
1✔
520

521
  private void preloadVersionMarker(HistoryEvent event) {
522
    if (VersionMarkerUtils.hasVersionMarkerStructure(event)) {
1✔
523
      String changeId = VersionMarkerUtils.tryGetChangeIdFromVersionMarkerEvent(event);
1✔
524
      if (changeId == null) {
1✔
525
        // if we can't extract changeId, this event will later fail to match with anything
526
        // and the corresponded exception will be thrown
527
        return;
×
528
      }
529
      VersionStateMachine versionStateMachine =
1✔
530
          versions.computeIfAbsent(
1✔
531
              changeId,
532
              (idKey) ->
533
                  VersionStateMachine.newInstance(
1✔
534
                      changeId, this::isReplaying, commandSink, stateMachineSink));
535
      versionStateMachine.handleMarkersPreload(event);
1✔
536
    }
537
  }
1✔
538

539
  private boolean handleNonMatchingVersionMarker(HistoryEvent event) {
540
    String changeId = VersionMarkerUtils.tryGetChangeIdFromVersionMarkerEvent(event);
1✔
541
    if (changeId == null) {
1✔
542
      return false;
1✔
543
    }
544
    VersionStateMachine versionStateMachine = versions.get(changeId);
1✔
545
    Preconditions.checkNotNull(
1✔
546
        versionStateMachine,
547
        "versionStateMachine is expected to be initialized already by execution or preloading");
548
    versionStateMachine.handleNonMatchingEvent(event);
1✔
549
    return true;
1✔
550
  }
551

552
  public List<Command> takeCommands() {
553
    List<Command> result = new ArrayList<>(commands.size());
1✔
554
    for (CancellableCommand command : commands) {
1✔
555
      if (!command.isCanceled()) {
1✔
556
        result.add(command.getCommand());
1✔
557
      }
558
    }
1✔
559
    return result;
1✔
560
  }
561

562
  public void sendMessage(Message message) {
563
    checkEventLoopExecuting();
1✔
564
    if (!isReplaying()) {
1✔
565
      messageOutbox.add(message);
1✔
566
    }
567
  }
1✔
568

569
  public List<Message> takeMessages() {
570
    List<Message> result = new ArrayList<>(messageOutbox.size());
1✔
571
    for (Message message : messageOutbox) {
1✔
572
      result.add(message);
1✔
573
    }
1✔
574
    messageOutbox.clear();
1✔
575
    return result;
1✔
576
  }
577

578
  /**
579
   * @return True if the SDK flag is supported in this workflow execution
580
   */
581
  public boolean tryUseSdkFlag(SdkFlag flag) {
582
    return flags.tryUseSdkFlag(flag);
1✔
583
  }
584

585
  /**
586
   * @return Set of all new flags set since the last call
587
   */
588
  public EnumSet<SdkFlag> takeNewSdkFlags() {
589
    return flags.takeNewSdkFlags();
1✔
590
  }
591

592
  private void prepareCommands() {
593
    if (preparing) {
1✔
594
      return;
1✔
595
    }
596
    preparing = true;
1✔
597
    try {
598
      prepareImpl();
1✔
599
    } finally {
600
      preparing = false;
1✔
601
    }
602
  }
1✔
603

604
  private void prepareImpl() {
605
    // handleCommand can lead to code execution because of SideEffect, MutableSideEffect or local
606
    // activity completion. And code execution can lead to creation of new commands and
607
    // cancellation of existing commands. That is the reason for using Queue as a data structure for
608
    // commands.
609
    while (true) {
610
      CancellableCommand command = cancellableCommands.poll();
1✔
611
      if (command == null) {
1✔
612
        break;
1✔
613
      }
614
      // handleCommand should be called even on canceled ones to support mutableSideEffect
615
      command.handleCommand(command.getCommandType());
1✔
616
      commands.add(command);
1✔
617
    }
1✔
618
  }
1✔
619

620
  /**
621
   * Local activity is different from all other entities. It doesn't schedule a marker command when
622
   * the {@link #scheduleLocalActivityTask(ExecuteLocalActivityParameters, Functions.Proc2)} is
623
   * called. The marker is scheduled only when activity completes through ({@link
624
   * #handleLocalActivityCompletion(LocalActivityResult)}). That's why the normal logic of {@link
625
   * #handleCommandEvent(HistoryEvent)}, which assumes that each event has a correspondent command
626
   * during replay, doesn't work. Instead, local activities are matched by their id using
627
   * localActivityMap.
628
   *
629
   * @return true if matched and false if normal event handling should continue.
630
   */
631
  private boolean handleLocalActivityMarker(HistoryEvent event) {
632
    if (!LocalActivityMarkerUtils.hasLocalActivityStructure(event)) {
1✔
633
      return false;
1✔
634
    }
635

636
    MarkerRecordedEventAttributes markerAttributes = event.getMarkerRecordedEventAttributes();
1✔
637
    String id = LocalActivityMarkerUtils.getActivityId(markerAttributes);
1✔
638
    LocalActivityStateMachine stateMachine = localActivityMap.remove(id);
1✔
639
    if (stateMachine == null) {
1✔
640
      String activityType = LocalActivityMarkerUtils.getActivityTypeName(markerAttributes);
×
641
      throw new NonDeterministicException(
×
642
          String.format(
×
643
              "Local activity of type %s is recorded in the history with id %s but was not expected by the execution",
644
              activityType, id));
645
    }
646
    // RESULT_NOTIFIED state means that there is outstanding command that has to be matched
647
    // using standard logic. So return false to let the handleCommand method to run its standard
648
    // logic.
649
    if (stateMachine.getState() == LocalActivityStateMachine.State.RESULT_NOTIFIED) {
1✔
650
      return false;
1✔
651
    }
652
    stateMachine.handleEvent(event, true);
1✔
653
    eventLoop();
1✔
654
    return true;
1✔
655
  }
656

657
  private void eventLoop() {
658
    if (eventLoopExecuting) {
1✔
659
      return;
1✔
660
    }
661
    eventLoopExecuting = true;
1✔
662
    try {
663
      callbacks.eventLoop();
1✔
664
    } finally {
665
      eventLoopExecuting = false;
1✔
666
    }
667
    prepareCommands();
1✔
668
  }
1✔
669

670
  private void handleNonStatefulEvent(HistoryEvent event, boolean hasNextEvent) {
671
    switch (event.getEventType()) {
1✔
672
      case EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:
673
        this.currentRunId =
1✔
674
            event.getWorkflowExecutionStartedEventAttributes().getOriginalExecutionRunId();
1✔
675
        callbacks.start(event);
1✔
676
        break;
1✔
677
      case EVENT_TYPE_WORKFLOW_TASK_SCHEDULED:
678
        WorkflowTaskStateMachine c =
1✔
679
            WorkflowTaskStateMachine.newInstance(
1✔
680
                workflowTaskStartedEventId, new WorkflowTaskCommandsListener());
681
        c.handleEvent(event, hasNextEvent);
1✔
682
        stateMachines.put(event.getEventId(), c);
1✔
683
        break;
1✔
684
      case EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED:
685
        callbacks.signal(event);
1✔
686
        break;
1✔
687
      case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
688
        callbacks.cancel(event);
1✔
689
        break;
1✔
690
      case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
691
      case UNRECOGNIZED:
692
        break;
1✔
693
      default:
694
        throw new IllegalArgumentException("Unexpected event:" + event);
×
695
    }
696
  }
1✔
697

698
  private long setCurrentTimeMillis(long currentTimeMillis) {
699
    if (this.currentTimeMillis < currentTimeMillis) {
1✔
700
      this.currentTimeMillis = currentTimeMillis;
1✔
701
    }
702
    return this.currentTimeMillis;
1✔
703
  }
704

705
  public long getLastStartedEventId() {
706
    return currentStartedEventId;
1✔
707
  }
708

709
  /**
710
   * @param attributes attributes used to schedule an activity
711
   * @param callback completion callback
712
   * @return an instance of ActivityCommands
713
   */
714
  public Functions.Proc scheduleActivityTask(
715
      ExecuteActivityParameters attributes, Functions.Proc2<Optional<Payloads>, Failure> callback) {
716
    checkEventLoopExecuting();
1✔
717
    ActivityStateMachine activityStateMachine =
1✔
718
        ActivityStateMachine.newInstance(
1✔
719
            attributes,
720
            (p, f) -> {
721
              Failure failure = f != null ? f.getFailure() : null;
1✔
722
              callback.apply(p, failure);
1✔
723

724
              if (f != null
1✔
725
                  && !f.isFromEvent()
1✔
726
                  && failure.hasCause()
1✔
727
                  && failure.getCause().hasCanceledFailureInfo()) {
1✔
728
                // If !f.isFromEvent(), we want to unblock the event loop as the promise got filled
729
                // and the workflow may make progress. If f.isFromEvent(), we need to delay event
730
                // loop triggering until WorkflowTaskStarted.
731
                eventLoop();
1✔
732
              }
733
            },
1✔
734
            commandSink,
735
            stateMachineSink);
736
    return activityStateMachine::cancel;
1✔
737
  }
738

739
  /**
740
   * Creates a new timer state machine
741
   *
742
   * @param attributes timer command attributes
743
   * @param completionCallback invoked when timer fires or reports cancellation. One of
744
   *     TimerFiredEvent, TimerCanceledEvent.
745
   * @return cancellation callback that should be invoked to initiate timer cancellation
746
   */
747
  public Functions.Proc newTimer(
748
      StartTimerCommandAttributes attributes, Functions.Proc1<HistoryEvent> completionCallback) {
749
    checkEventLoopExecuting();
1✔
750
    TimerStateMachine timer =
1✔
751
        TimerStateMachine.newInstance(
1✔
752
            attributes,
753
            (event) -> {
754
              completionCallback.apply(event);
1✔
755
              // Needed due to immediate cancellation
756
              if (event.getEventType() == EventType.EVENT_TYPE_TIMER_CANCELED) {
1✔
757
                eventLoop();
1✔
758
              }
759
            },
1✔
760
            commandSink,
761
            stateMachineSink);
762
    return timer::cancel;
1✔
763
  }
764

765
  /**
766
   * Creates a new child state machine
767
   *
768
   * @param parameters child workflow start command parameters
769
   * @param startedCallback callback that is notified about child start
770
   * @param completionCallback invoked when child reports completion or failure
771
   * @return cancellation callback that should be invoked to cancel the child
772
   */
773
  public Functions.Proc startChildWorkflow(
774
      StartChildWorkflowExecutionParameters parameters,
775
      Functions.Proc2<WorkflowExecution, Exception> startedCallback,
776
      Functions.Proc2<Optional<Payloads>, Exception> completionCallback) {
777
    checkEventLoopExecuting();
1✔
778
    StartChildWorkflowExecutionCommandAttributes attributes = parameters.getRequest().build();
1✔
779
    ChildWorkflowCancellationType cancellationType = parameters.getCancellationType();
1✔
780
    ChildWorkflowStateMachine child =
1✔
781
        ChildWorkflowStateMachine.newInstance(
1✔
782
            attributes, startedCallback, completionCallback, commandSink, stateMachineSink);
783
    return () -> {
1✔
784
      if (cancellationType == ChildWorkflowCancellationType.ABANDON) {
1✔
785
        notifyChildCanceled(completionCallback);
1✔
786
        return;
1✔
787
      }
788
      // The only time child can be canceled directly is before its start command
789
      // was sent out to the service. After that RequestCancelExternal should be used.
790
      if (child.isCancellable()) {
1✔
791
        child.cancel();
1✔
792
        return;
1✔
793
      }
794
      if (!child.isFinalState()) {
1✔
795
        requestCancelExternalWorkflowExecution(
1✔
796
            RequestCancelExternalWorkflowExecutionCommandAttributes.newBuilder()
1✔
797
                .setWorkflowId(attributes.getWorkflowId())
1✔
798
                .setNamespace(attributes.getNamespace())
1✔
799
                .setChildWorkflowOnly(true)
1✔
800
                .build(),
1✔
801
            (r, e) -> { // TODO(maxim): Decide what to do if an error is passed to the callback.
802
              if (cancellationType == ChildWorkflowCancellationType.WAIT_CANCELLATION_REQUESTED) {
1✔
803
                notifyChildCanceled(completionCallback);
1✔
804
              }
805
            });
1✔
806
        if (cancellationType == ChildWorkflowCancellationType.TRY_CANCEL) {
1✔
807
          notifyChildCanceled(completionCallback);
1✔
808
        }
809
      }
810
    };
1✔
811
  }
812

813
  private void notifyChildCanceled(
814
      Functions.Proc2<Optional<Payloads>, Exception> completionCallback) {
815
    CanceledFailure failure = new CanceledFailure("Child canceled");
1✔
816
    completionCallback.apply(Optional.empty(), failure);
1✔
817
    eventLoop();
1✔
818
  }
1✔
819

820
  /**
821
   * @param attributes
822
   * @param completionCallback invoked when signal delivery completes of fails. The following types
823
   */
824
  public Functions.Proc signalExternalWorkflowExecution(
825
      SignalExternalWorkflowExecutionCommandAttributes attributes,
826
      Functions.Proc2<Void, Failure> completionCallback) {
827
    checkEventLoopExecuting();
1✔
828
    return SignalExternalStateMachine.newInstance(
1✔
829
        attributes, completionCallback, commandSink, stateMachineSink);
830
  }
831

832
  /**
833
   * @param attributes attributes to use to cancel external workflow
834
   * @param completionCallback one of ExternalWorkflowExecutionCancelRequestedEvent,
835
   */
836
  public void requestCancelExternalWorkflowExecution(
837
      RequestCancelExternalWorkflowExecutionCommandAttributes attributes,
838
      Functions.Proc2<Void, RuntimeException> completionCallback) {
839
    checkEventLoopExecuting();
1✔
840
    CancelExternalStateMachine.newInstance(
1✔
841
        attributes, completionCallback, commandSink, stateMachineSink);
842
  }
1✔
843

844
  public void upsertSearchAttributes(SearchAttributes attributes) {
845
    checkEventLoopExecuting();
1✔
846
    UpsertSearchAttributesStateMachine.newInstance(attributes, commandSink, stateMachineSink);
1✔
847
  }
1✔
848

849
  public void completeWorkflow(Optional<Payloads> workflowOutput) {
850
    checkEventLoopExecuting();
1✔
851
    CompleteWorkflowStateMachine.newInstance(workflowOutput, commandSink, stateMachineSink);
1✔
852
  }
1✔
853

854
  public void failWorkflow(Failure failure) {
855
    checkEventLoopExecuting();
1✔
856
    FailWorkflowStateMachine.newInstance(failure, commandSink, stateMachineSink);
1✔
857
  }
1✔
858

859
  public void cancelWorkflow() {
860
    checkEventLoopExecuting();
1✔
861
    CancelWorkflowStateMachine.newInstance(
1✔
862
        CancelWorkflowExecutionCommandAttributes.getDefaultInstance(),
1✔
863
        commandSink,
864
        stateMachineSink);
865
  }
1✔
866

867
  public void continueAsNewWorkflow(ContinueAsNewWorkflowExecutionCommandAttributes attributes) {
868
    checkEventLoopExecuting();
1✔
869
    ContinueAsNewWorkflowStateMachine.newInstance(attributes, commandSink, stateMachineSink);
1✔
870
  }
1✔
871

872
  public boolean isReplaying() {
873
    return replaying;
1✔
874
  }
875

876
  public long currentTimeMillis() {
877
    return currentTimeMillis;
1✔
878
  }
879

880
  public UUID randomUUID() {
881
    checkEventLoopExecuting();
1✔
882
    String runId = currentRunId;
1✔
883
    if (runId == null) {
1✔
884
      throw new Error("null currentRunId");
×
885
    }
886
    String id = runId + ":" + idCounter++;
1✔
887
    byte[] bytes = id.getBytes(StandardCharsets.UTF_8);
1✔
888
    return UUID.nameUUIDFromBytes(bytes);
1✔
889
  }
890

891
  public Random newRandom() {
892
    checkEventLoopExecuting();
1✔
893
    return new Random(randomUUID().getLeastSignificantBits());
1✔
894
  }
895

896
  public void sideEffect(
897
      Functions.Func<Optional<Payloads>> func, Functions.Proc1<Optional<Payloads>> callback) {
898
    checkEventLoopExecuting();
1✔
899
    SideEffectStateMachine.newInstance(
1✔
900
        this::isReplaying,
901
        func,
902
        (payloads) -> {
903
          callback.apply(payloads);
1✔
904
          // callback unblocked sideEffect call. Give workflow code chance to make progress.
905
          eventLoop();
1✔
906
        },
1✔
907
        commandSink,
908
        stateMachineSink);
909
  }
1✔
910

911
  /**
912
   * @param id mutable side effect id
913
   * @param func given the value from the last marker returns value to store. If result is empty
914
   *     nothing is recorded into the history.
915
   * @param callback used to report result or failure
916
   */
917
  public void mutableSideEffect(
918
      String id,
919
      Functions.Func1<Optional<Payloads>, Optional<Payloads>> func,
920
      Functions.Proc1<Optional<Payloads>> callback) {
921
    checkEventLoopExecuting();
1✔
922
    MutableSideEffectStateMachine stateMachine =
1✔
923
        mutableSideEffects.computeIfAbsent(
1✔
924
            id,
925
            (idKey) ->
926
                MutableSideEffectStateMachine.newInstance(
1✔
927
                    idKey, this::isReplaying, commandSink, stateMachineSink));
928
    stateMachine.mutableSideEffect(
1✔
929
        func,
930
        (r) -> {
931
          callback.apply(r);
1✔
932
          // callback unblocked mutableSideEffect call. Give workflow code chance to make progress.
933
          eventLoop();
1✔
934
        },
1✔
935
        stateMachineSink);
936
  }
1✔
937

938
  public boolean getVersion(
939
      String changeId,
940
      int minSupported,
941
      int maxSupported,
942
      Functions.Proc2<Integer, RuntimeException> callback) {
943
    VersionStateMachine stateMachine =
1✔
944
        versions.computeIfAbsent(
1✔
945
            changeId,
946
            (idKey) -> {
947
              return VersionStateMachine.newInstance(
1✔
948
                  changeId, this::isReplaying, commandSink, stateMachineSink);
949
            });
950
    return stateMachine.getVersion(
1✔
951
        minSupported,
952
        maxSupported,
953
        (v, e) -> {
954
          callback.apply(v, e);
1✔
955
          // without this getVersion call will trigger the end of WFT,
956
          // instead we want to prepare subsequent commands and unblock the execution one more
957
          // time.
958
          eventLoop();
1✔
959
        });
1✔
960
  }
961

962
  public List<ExecuteLocalActivityParameters> takeLocalActivityRequests() {
963
    List<ExecuteLocalActivityParameters> result = localActivityRequests;
1✔
964
    localActivityRequests = new ArrayList<>();
1✔
965
    for (ExecuteLocalActivityParameters parameters : result) {
1✔
966
      LocalActivityStateMachine stateMachine = localActivityMap.get(parameters.getActivityId());
1✔
967
      stateMachine.markAsSent();
1✔
968
    }
1✔
969
    return result;
1✔
970
  }
971

972
  public void handleLocalActivityCompletion(LocalActivityResult laCompletion) {
973
    String activityId = laCompletion.getActivityId();
1✔
974
    LocalActivityStateMachine laStateMachine = localActivityMap.get(activityId);
1✔
975
    if (laStateMachine == null) {
1✔
976
      throw new IllegalStateException("Unknown local activity: " + activityId);
×
977
    }
978
    laStateMachine.handleCompletion(laCompletion);
1✔
979
    prepareCommands();
1✔
980
  }
1✔
981

982
  public Functions.Proc scheduleLocalActivityTask(
983
      ExecuteLocalActivityParameters parameters,
984
      Functions.Proc2<Optional<Payloads>, LocalActivityCallback.LocalActivityFailedException>
985
          callback) {
986
    checkEventLoopExecuting();
1✔
987
    String activityId = parameters.getActivityId();
1✔
988
    if (Strings.isNullOrEmpty(activityId)) {
1✔
989
      throw new IllegalArgumentException("Missing activityId: " + activityId);
×
990
    }
991
    if (localActivityMap.containsKey(activityId)) {
1✔
992
      throw new IllegalArgumentException("Duplicated local activity id: " + activityId);
×
993
    }
994
    LocalActivityStateMachine commands =
1✔
995
        LocalActivityStateMachine.newInstance(
1✔
996
            this::isReplaying,
997
            this::setCurrentTimeMillis,
998
            parameters,
999
            (r, e) -> {
1000
              callback.apply(r, e);
1✔
1001
              // callback unblocked local activity call. Give workflow code chance to make progress.
1002
              eventLoop();
1✔
1003
            },
1✔
1004
            localActivityRequestSink,
1005
            commandSink,
1006
            stateMachineSink,
1007
            currentTimeMillis);
1008
    localActivityMap.put(activityId, commands);
1✔
1009
    return commands::cancel;
1✔
1010
  }
1011

1012
  /** Validates that command matches the event during replay. */
1013
  private void validateCommand(Command command, HistoryEvent event) {
1014
    // TODO(maxim): Add more thorough validation logic. For example check if activity IDs are
1015
    // matching.
1016

1017
    // ProtocolMessageCommand is different from other commands because it can be associated with
1018
    // multiple types of events
1019
    // TODO(#1781) Validate protocol message is expected type.
1020
    if (command.getCommandType() == COMMAND_TYPE_PROTOCOL_MESSAGE) {
1✔
1021
      ProtocolMessageCommandAttributes commandAttributes =
1✔
1022
          command.getProtocolMessageCommandAttributes();
1✔
1023
      switch (event.getEventType()) {
1✔
1024
        case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED:
1025
          assertMatch(
1✔
1026
              command,
1027
              event,
1028
              "messageType",
1029
              true,
1✔
1030
              commandAttributes.getMessageId().endsWith("accept"));
1✔
1031
          break;
1✔
1032
        case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_REJECTED:
1033
          assertMatch(
×
1034
              command,
1035
              event,
1036
              "messageType",
1037
              true,
×
1038
              commandAttributes.getMessageId().endsWith("reject"));
×
1039
          break;
×
1040
        case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED:
1041
          assertMatch(
1✔
1042
              command,
1043
              event,
1044
              "messageType",
1045
              true,
1✔
1046
              commandAttributes.getMessageId().endsWith("complete"));
1✔
1047
          break;
1✔
1048
        default:
1049
          throw new IllegalArgumentException("Unexpected event type: " + event.getEventType());
×
1050
      }
1051
      return;
1✔
1052
    }
1053
    assertMatch(
1✔
1054
        command,
1055
        event,
1056
        "eventType",
1057
        getEventTypeForCommand(command.getCommandType()),
1✔
1058
        event.getEventType());
1✔
1059
    switch (command.getCommandType()) {
1✔
1060
      case COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK:
1061
        {
1062
          ScheduleActivityTaskCommandAttributes commandAttributes =
1✔
1063
              command.getScheduleActivityTaskCommandAttributes();
1✔
1064
          ActivityTaskScheduledEventAttributes eventAttributes =
1✔
1065
              event.getActivityTaskScheduledEventAttributes();
1✔
1066
          assertMatch(
1✔
1067
              command,
1068
              event,
1069
              "activityId",
1070
              commandAttributes.getActivityId(),
1✔
1071
              eventAttributes.getActivityId());
1✔
1072
          assertMatch(
1✔
1073
              command,
1074
              event,
1075
              "activityType",
1076
              commandAttributes.getActivityType(),
1✔
1077
              eventAttributes.getActivityType());
1✔
1078
        }
1079
        break;
1✔
1080
      case COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION:
1081
        {
1082
          StartChildWorkflowExecutionCommandAttributes commandAttributes =
1✔
1083
              command.getStartChildWorkflowExecutionCommandAttributes();
1✔
1084
          StartChildWorkflowExecutionInitiatedEventAttributes eventAttributes =
1✔
1085
              event.getStartChildWorkflowExecutionInitiatedEventAttributes();
1✔
1086
          assertMatch(
1✔
1087
              command,
1088
              event,
1089
              "workflowId",
1090
              commandAttributes.getWorkflowId(),
1✔
1091
              eventAttributes.getWorkflowId());
1✔
1092
          assertMatch(
1✔
1093
              command,
1094
              event,
1095
              "workflowType",
1096
              commandAttributes.getWorkflowType(),
1✔
1097
              eventAttributes.getWorkflowType());
1✔
1098
        }
1099
        break;
1✔
1100
      case COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK:
1101
      case COMMAND_TYPE_START_TIMER:
1102
        {
1103
          StartTimerCommandAttributes commandAttributes = command.getStartTimerCommandAttributes();
1✔
1104
          TimerStartedEventAttributes eventAttributes = event.getTimerStartedEventAttributes();
1✔
1105
          assertMatch(
1✔
1106
              command,
1107
              event,
1108
              "timerId",
1109
              commandAttributes.getTimerId(),
1✔
1110
              eventAttributes.getTimerId());
1✔
1111
        }
1112
        break;
1✔
1113
      case COMMAND_TYPE_CANCEL_TIMER:
1114
      case COMMAND_TYPE_CANCEL_WORKFLOW_EXECUTION:
1115
      case COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION:
1116
      case COMMAND_TYPE_RECORD_MARKER:
1117
      case COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION:
1118
      case COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION:
1119
      case COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES:
1120
      case COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION:
1121
      case COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION:
1122
      case COMMAND_TYPE_PROTOCOL_MESSAGE:
1123
        break;
1✔
1124
      case UNRECOGNIZED:
1125
      case COMMAND_TYPE_UNSPECIFIED:
1126
        throw new IllegalArgumentException("Unexpected command type: " + command.getCommandType());
×
1127
    }
1128
  }
1✔
1129

1130
  private void assertMatch(
1131
      Command command, HistoryEvent event, String checkType, Object expected, Object actual) {
1132
    if (!expected.equals(actual)) {
1✔
1133
      String message =
×
1134
          String.format(
×
1135
              "Command %s doesn't match event %s with EventId=%s on check %s "
1136
                  + "with an expected value '%s' and an actual value '%s'",
1137
              command.getCommandType(),
×
1138
              event.getEventType(),
×
1139
              event.getEventId(),
×
1140
              checkType,
1141
              expected,
1142
              actual);
1143
      throw new NonDeterministicException(message);
×
1144
    }
1145
  }
1✔
1146

1147
  private class WorkflowTaskCommandsListener implements WorkflowTaskStateMachine.Listener {
1✔
1148
    @Override
1149
    public void workflowTaskStarted(
1150
        long startedEventId,
1151
        long currentTimeMillis,
1152
        boolean nonProcessedWorkflowTask,
1153
        long historySize,
1154
        boolean isContinueAsNewSuggested) {
1155
      setCurrentTimeMillis(currentTimeMillis);
1✔
1156
      for (CancellableCommand cancellableCommand : commands) {
1✔
1157
        cancellableCommand.handleWorkflowTaskStarted();
1✔
1158
      }
1✔
1159
      // Give local activities a chance to recreate their requests if they were lost due
1160
      // to the last workflow task failure. The loss could happen only the last workflow task
1161
      // was forcibly created by setting forceCreate on RespondWorkflowTaskCompletedRequest.
1162
      if (nonProcessedWorkflowTask) {
1✔
1163
        for (LocalActivityStateMachine value : localActivityMap.values()) {
1✔
1164
          value.nonReplayWorkflowTaskStarted();
1✔
1165
        }
1✔
1166
      }
1167
      WorkflowStateMachines.this.currentStartedEventId = startedEventId;
1✔
1168
      WorkflowStateMachines.this.historySize = historySize;
1✔
1169
      WorkflowStateMachines.this.isContinueAsNewSuggested = isContinueAsNewSuggested;
1✔
1170

1171
      eventLoop();
1✔
1172
    }
1✔
1173

1174
    @Override
1175
    public void updateRunId(String currentRunId) {
1176
      WorkflowStateMachines.this.currentRunId = currentRunId;
×
1177
    }
×
1178
  }
1179

1180
  private long getInitialCommandEventId(HistoryEvent event) {
1181
    switch (event.getEventType()) {
1✔
1182
      case EVENT_TYPE_ACTIVITY_TASK_STARTED:
1183
        return event.getActivityTaskStartedEventAttributes().getScheduledEventId();
1✔
1184
      case EVENT_TYPE_ACTIVITY_TASK_COMPLETED:
1185
        return event.getActivityTaskCompletedEventAttributes().getScheduledEventId();
1✔
1186
      case EVENT_TYPE_ACTIVITY_TASK_FAILED:
1187
        return event.getActivityTaskFailedEventAttributes().getScheduledEventId();
1✔
1188
      case EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT:
1189
        return event.getActivityTaskTimedOutEventAttributes().getScheduledEventId();
1✔
1190
      case EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED:
1191
        return event.getActivityTaskCancelRequestedEventAttributes().getScheduledEventId();
×
1192
      case EVENT_TYPE_ACTIVITY_TASK_CANCELED:
1193
        return event.getActivityTaskCanceledEventAttributes().getScheduledEventId();
1✔
1194
      case EVENT_TYPE_TIMER_FIRED:
1195
        return event.getTimerFiredEventAttributes().getStartedEventId();
1✔
1196
      case EVENT_TYPE_TIMER_CANCELED:
1197
        return event.getTimerCanceledEventAttributes().getStartedEventId();
×
1198
      case EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
1199
        return event
×
1200
            .getRequestCancelExternalWorkflowExecutionFailedEventAttributes()
×
1201
            .getInitiatedEventId();
×
1202
      case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
1203
        return event
1✔
1204
            .getExternalWorkflowExecutionCancelRequestedEventAttributes()
1✔
1205
            .getInitiatedEventId();
1✔
1206
      case EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED:
1207
        return event.getStartChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId();
1✔
1208
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED:
1209
        return event.getChildWorkflowExecutionStartedEventAttributes().getInitiatedEventId();
1✔
1210
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED:
1211
        return event.getChildWorkflowExecutionCompletedEventAttributes().getInitiatedEventId();
1✔
1212
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED:
1213
        return event.getChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId();
1✔
1214
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED:
1215
        return event.getChildWorkflowExecutionCanceledEventAttributes().getInitiatedEventId();
1✔
1216
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT:
1217
        return event.getChildWorkflowExecutionTimedOutEventAttributes().getInitiatedEventId();
1✔
1218
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TERMINATED:
1219
        return event.getChildWorkflowExecutionTerminatedEventAttributes().getInitiatedEventId();
×
1220
      case EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
1221
        return event
1✔
1222
            .getSignalExternalWorkflowExecutionFailedEventAttributes()
1✔
1223
            .getInitiatedEventId();
1✔
1224
      case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED:
1225
        return event.getExternalWorkflowExecutionSignaledEventAttributes().getInitiatedEventId();
1✔
1226
      case EVENT_TYPE_WORKFLOW_TASK_STARTED:
1227
        return event.getWorkflowTaskStartedEventAttributes().getScheduledEventId();
1✔
1228
      case EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
1229
        return event.getWorkflowTaskCompletedEventAttributes().getScheduledEventId();
1✔
1230
      case EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT:
1231
        return event.getWorkflowTaskTimedOutEventAttributes().getScheduledEventId();
1✔
1232
      case EVENT_TYPE_WORKFLOW_TASK_FAILED:
1233
        return event.getWorkflowTaskFailedEventAttributes().getScheduledEventId();
1✔
1234

1235
      case EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:
1236
      case EVENT_TYPE_TIMER_STARTED:
1237
      case EVENT_TYPE_MARKER_RECORDED:
1238
      case EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED:
1239
      case EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED:
1240
      case EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED:
1241
      case EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW:
1242
      case EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED:
1243
      case EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES:
1244
      case EVENT_TYPE_WORKFLOW_TASK_SCHEDULED:
1245
      case EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED:
1246
      case EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:
1247
      case EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED:
1248
      case EVENT_TYPE_WORKFLOW_EXECUTION_FAILED:
1249
      case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
1250
      case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
1251
      case EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
1252
        return event.getEventId();
1✔
1253
      case UNRECOGNIZED:
1254
      case EVENT_TYPE_UNSPECIFIED:
1255
        throw new IllegalArgumentException("Unexpected event type: " + event.getEventType());
×
1256
    }
1257
    throw new IllegalStateException("unreachable");
×
1258
  }
1259

1260
  /**
1261
   * Workflow code executes only while event loop is running. So operations that can be invoked from
1262
   * the workflow have to satisfy this condition.
1263
   */
1264
  private void checkEventLoopExecuting() {
1265
    if (!eventLoopExecuting) {
1✔
1266
      // this call doesn't yield or await, because the await function returns true,
1267
      // but it checks if the workflow thread needs to be destroyed
1268
      WorkflowThread.await("kill workflow thread if destroy requested", () -> true);
×
1269
      throw new IllegalStateException("Operation allowed only while eventLoop is running");
×
1270
    }
1271
  }
1✔
1272

1273
  private String createEventHandlingMessage(HistoryEvent event) {
1274
    return "Failure handling event "
1✔
1275
        + event.getEventId()
1✔
1276
        + " of type '"
1277
        + event.getEventType()
1✔
1278
        + "' "
1279
        + (this.isReplaying() ? "during replay" : "during execution");
1✔
1280
  }
1281

1282
  private String createShortCurrentStateMessagePostfix() {
1283
    return String.format(
1✔
1284
        "{WorkflowTaskStartedEventId=%s, CurrentStartedEventId=%s}",
1285
        this.workflowTaskStartedEventId, this.currentStartedEventId);
1✔
1286
  }
1287
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc