• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #181

pending completion
#181

push

github-actions

web-flow
Properly wrap exceptions from schedule client (#1827)

Wrap schedule exception

37 of 37 new or added lines in 1 file covered. (100.0%)

18557 of 23894 relevant lines covered (77.66%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.92
/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.statemachines;
22

23
import static io.temporal.api.enums.v1.CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE;
24
import static io.temporal.internal.common.WorkflowExecutionUtils.getEventTypeForCommand;
25
import static io.temporal.internal.common.WorkflowExecutionUtils.isCommandEvent;
26
import static io.temporal.serviceclient.CheckedExceptionWrapper.unwrap;
27

28
import com.google.common.annotations.VisibleForTesting;
29
import com.google.common.base.Preconditions;
30
import com.google.common.base.Strings;
31
import com.google.protobuf.Any;
32
import io.temporal.api.command.v1.*;
33
import io.temporal.api.common.v1.Payloads;
34
import io.temporal.api.common.v1.SearchAttributes;
35
import io.temporal.api.common.v1.WorkflowExecution;
36
import io.temporal.api.enums.v1.EventType;
37
import io.temporal.api.failure.v1.Failure;
38
import io.temporal.api.history.v1.*;
39
import io.temporal.api.protocol.v1.Message;
40
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
41
import io.temporal.failure.CanceledFailure;
42
import io.temporal.internal.common.*;
43
import io.temporal.internal.history.LocalActivityMarkerUtils;
44
import io.temporal.internal.history.VersionMarkerUtils;
45
import io.temporal.internal.sync.WorkflowThread;
46
import io.temporal.internal.worker.LocalActivityResult;
47
import io.temporal.worker.NonDeterministicException;
48
import io.temporal.workflow.ChildWorkflowCancellationType;
49
import io.temporal.workflow.Functions;
50
import java.nio.charset.StandardCharsets;
51
import java.util.*;
52

53
public final class WorkflowStateMachines {
54

55
  enum HandleEventStatus {
1✔
56
    OK,
1✔
57
    NON_MATCHING_EVENT
1✔
58
  }
59

60
  /** Initial set of SDK flags that will be set on all new workflow executions. */
61
  private static final List<SdkFlag> initialFlags =
1✔
62
      Collections.unmodifiableList(Arrays.asList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION));
1✔
63

64
  /**
65
   * EventId of the WorkflowTaskStarted event of the Workflow Task that was picked up by a worker
66
   * and triggered a current replay or execution. It's expected to be the last event in the history
67
   * if we continue to execute the workflow.
68
   *
69
   * <p>For direct (legacy) queries, it may be:
70
   *
71
   * <ul>
72
   *   <li>0 if it's query a closed workflow execution
73
   *   <li>id of the last successfully completed Workflow Task if the workflow is not closed
74
   * </ul>
75
   *
76
   * <p>Set from the "outside" from the PollWorkflowTaskQueueResponse. Not modified by the SDK state
77
   * machines.
78
   */
79
  private long workflowTaskStartedEventId;
80

81
  /** EventId of the last WorkflowTaskStarted event handled by these state machines. */
82
  private long currentStartedEventId;
83

84
  /**
85
   * EventId of the last event seen by these state machines. Events earlier than this one will be
86
   * discarded.
87
   */
88
  private long lastHandledEventId;
89

90
  private final StatesMachinesCallback callbacks;
91

92
  /** Callback to send new commands to. */
93
  private final Functions.Proc1<CancellableCommand> commandSink;
94

95
  /**
96
   * currentRunId is used as seed by Workflow.newRandom and randomUUID. It allows to generate them
97
   * deterministically.
98
   */
99
  private String currentRunId;
100

101
  /** Used Workflow.newRandom and randomUUID together with currentRunId. */
102
  private long idCounter;
103

104
  /** Current workflow time. */
105
  private long currentTimeMillis = -1;
1✔
106

107
  private final Map<Long, EntityStateMachine> stateMachines = new HashMap<>();
1✔
108

109
  private final Map<String, EntityStateMachine> protocolStateMachines = new HashMap<>();
1✔
110

111
  private final Queue<Message> messageOutbox = new ArrayDeque<>();
1✔
112

113
  private final Queue<CancellableCommand> commands = new ArrayDeque<>();
1✔
114

115
  /**
116
   * Commands generated by the currently processed workflow task. It is a queue as commands can be
117
   * added (due to marker based commands) while iterating over already added commands.
118
   */
119
  private final Queue<CancellableCommand> cancellableCommands = new ArrayDeque<>();
1✔
120

121
  /**
122
   * Is workflow executing new code or replaying from the history. Note that this flag ALWAYS flips
123
   * to true for the time when we apply events from the server even if the commands were created by
124
   * an actual execution with replaying=false.
125
   */
126
  private boolean replaying;
127

128
  /** Used to ensure that event loop is not executed recursively. */
129
  private boolean eventLoopExecuting;
130

131
  /**
132
   * Used to avoid recursive calls to {@link #prepareCommands()}.
133
   *
134
   * <p>Such calls happen when sideEffects and localActivity markers are processed.
135
   */
136
  private boolean preparing;
137

138
  /** Key is mutable side effect id */
139
  private final Map<String, MutableSideEffectStateMachine> mutableSideEffects = new HashMap<>();
1✔
140

141
  /** Key is changeId */
142
  private final Map<String, VersionStateMachine> versions = new HashMap<>();
1✔
143

144
  /** Map of local activities by their id. */
145
  private final Map<String, LocalActivityStateMachine> localActivityMap = new HashMap<>();
1✔
146

147
  private List<ExecuteLocalActivityParameters> localActivityRequests = new ArrayList<>();
1✔
148

149
  private final Functions.Proc1<ExecuteLocalActivityParameters> localActivityRequestSink;
150
  private final Functions.Proc1<StateMachine> stateMachineSink;
151

152
  private final WFTBuffer wftBuffer = new WFTBuffer();
1✔
153

154
  private List<Message> messages = new ArrayList<Message>();
1✔
155

156
  private final SdkFlags flags;
157

158
  public WorkflowStateMachines(
159
      StatesMachinesCallback callbacks, GetSystemInfoResponse.Capabilities capabilities) {
160
    this(callbacks, (stateMachine) -> {}, capabilities);
1✔
161
  }
1✔
162

163
  @VisibleForTesting
164
  public WorkflowStateMachines(
165
      StatesMachinesCallback callbacks,
166
      Functions.Proc1<StateMachine> stateMachineSink,
167
      GetSystemInfoResponse.Capabilities capabilities) {
1✔
168
    this.callbacks = Objects.requireNonNull(callbacks);
1✔
169
    this.commandSink = cancellableCommands::add;
1✔
170
    this.stateMachineSink = stateMachineSink;
1✔
171
    this.localActivityRequestSink = (request) -> localActivityRequests.add(request);
1✔
172
    this.flags = new SdkFlags(capabilities.getSdkMetadata(), this::isReplaying);
1✔
173
  }
1✔
174

175
  @VisibleForTesting
176
  public WorkflowStateMachines(
177
      StatesMachinesCallback callbacks, Functions.Proc1<StateMachine> stateMachineSink) {
1✔
178
    this.callbacks = Objects.requireNonNull(callbacks);
1✔
179
    this.commandSink = cancellableCommands::add;
1✔
180
    this.stateMachineSink = stateMachineSink;
1✔
181
    this.localActivityRequestSink = (request) -> localActivityRequests.add(request);
1✔
182
    this.flags = new SdkFlags(false, this::isReplaying);
1✔
183
  }
1✔
184

185
  // TODO revisit and potentially remove workflowTaskStartedEventId at all from the state machines.
186
  // The only place where it's used is WorkflowTaskStateMachine to understand that there will be
187
  // no completion event in the history.
188
  // This is tricky, because direct queries come with 0 as a magic value in this field if
189
  // execution is completed for example.
190
  // Most likely we can rework WorkflowTaskStateMachine to use only hasNext.
191
  /**
192
   * @param workflowTaskStartedEventId eventId of the workflowTask that was picked up by a worker
193
   *     and triggered an execution. Used in {@link WorkflowTaskStateMachine} only to understand
194
   *     that this workflow task will not have a matching closing event and needs to be executed.
195
   */
196
  public void setWorklfowStartedEventId(long workflowTaskStartedEventId) {
197
    this.workflowTaskStartedEventId = workflowTaskStartedEventId;
1✔
198
  }
1✔
199

200
  public void setCurrentStartedEventId(long eventId) {
201
    this.currentStartedEventId = eventId;
×
202
  }
×
203

204
  public long getCurrentStartedEventId() {
205
    return currentStartedEventId;
1✔
206
  }
207

208
  public void setReplaying(boolean replaying) {
209
    this.replaying = replaying;
1✔
210
  }
1✔
211

212
  public void setMessages(List<Message> messages) {
213
    this.messages = messages;
1✔
214
  }
1✔
215

216
  /**
217
   * Handle a single event from the workflow history.
218
   *
219
   * @param event event from the history.
220
   * @param hasNextEvent false if this is the last event in the history.
221
   */
222
  public void handleEvent(HistoryEvent event, boolean hasNextEvent) {
223
    long eventId = event.getEventId();
1✔
224
    if (eventId <= lastHandledEventId) {
1✔
225
      // already handled
226
      return;
1✔
227
    }
228
    Preconditions.checkState(
1✔
229
        eventId == lastHandledEventId + 1,
230
        "History is out of order. "
231
            + "There is a gap between the last event workflow state machine observed and currently handling event. "
232
            + "Last processed eventId: %s, handling eventId: %s",
233
        lastHandledEventId,
234
        eventId);
235

236
    lastHandledEventId = eventId;
1✔
237
    boolean readyToPeek = wftBuffer.addEvent(event, hasNextEvent);
1✔
238
    if (readyToPeek) {
1✔
239
      handleEventsBatch(wftBuffer.fetch(), hasNextEvent);
1✔
240
    }
241
  }
1✔
242

243
  /**
244
   * Handle an events batch for one workflow task. Events that are related to one workflow task
245
   * during replay should be prefetched and supplied in one batch.
246
   *
247
   * @param events events belong to one workflow task
248
   * @param hasNextEvent true if there are more events in the history follow this batch, false if
249
   *     this batch contains the last events of the history
250
   */
251
  private void handleEventsBatch(List<HistoryEvent> events, boolean hasNextEvent) {
252
    if (EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED.equals(events.get(0).getEventType())) {
1✔
253
      for (SdkFlag flag : initialFlags) {
1✔
254
        flags.tryUseSdkFlag(flag);
1✔
255
      }
1✔
256
    }
257

258
    if (EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED.equals(events.get(0).getEventType())) {
1✔
259
      for (HistoryEvent event : events) {
1✔
260
        handleSingleEventLookahead(event);
1✔
261
      }
1✔
262
    }
263

264
    for (Iterator<HistoryEvent> iterator = events.iterator(); iterator.hasNext(); ) {
1✔
265
      HistoryEvent event = iterator.next();
1✔
266

267
      // On replay the messages are available after the workflow task schedule event, so we
268
      // need to handle them before workflow task started event to maintain a consistent order.
269
      for (Message msg : this.takeLTE(event.getEventId() - 1)) {
1✔
270
        handleSingleMessage(msg);
1✔
271
      }
1✔
272

273
      try {
274
        handleSingleEvent(event, iterator.hasNext() || hasNextEvent);
1✔
275
      } catch (RuntimeException e) {
1✔
276
        throw createEventProcessingException(e, event);
1✔
277
      }
1✔
278

279
      for (Message msg : this.takeLTE(event.getEventId())) {
1✔
280
        handleSingleMessage(msg);
1✔
281
      }
1✔
282
    }
1✔
283
  }
1✔
284

285
  /** Handle an event when looking ahead at history during replay */
286
  private void handleSingleEventLookahead(HistoryEvent event) {
287
    EventType eventType = event.getEventType();
1✔
288
    switch (eventType) {
1✔
289
      case EVENT_TYPE_MARKER_RECORDED:
290
        try {
291
          preloadVersionMarker(event);
1✔
292
        } catch (RuntimeException e) {
1✔
293
          throw createEventProcessingException(e, event);
1✔
294
        }
1✔
295
        break;
296
      case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED:
297
        // Look ahead to infer protocol messages
298
        WorkflowExecutionUpdateAcceptedEventAttributes updateEvent =
1✔
299
            event.getWorkflowExecutionUpdateAcceptedEventAttributes();
1✔
300
        this.messages.add(
1✔
301
            Message.newBuilder()
1✔
302
                .setId(updateEvent.getAcceptedRequestMessageId())
1✔
303
                .setProtocolInstanceId(updateEvent.getProtocolInstanceId())
1✔
304
                .setEventId(updateEvent.getAcceptedRequestSequencingEventId())
1✔
305
                .setBody(Any.pack(updateEvent.getAcceptedRequest()))
1✔
306
                .build());
1✔
307
        break;
1✔
308
      case EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
309
        WorkflowTaskCompletedEventAttributes completedEvent =
1✔
310
            event.getWorkflowTaskCompletedEventAttributes();
1✔
311
        for (Integer flag : completedEvent.getSdkMetadata().getLangUsedFlagsList()) {
1✔
312
          SdkFlag sdkFlag = SdkFlag.getValue(flag);
1✔
313
          if (sdkFlag.equals(SdkFlag.UNKNOWN)) {
1✔
314
            throw new IllegalArgumentException("Unknown SDK flag:" + flag);
×
315
          }
316
          flags.setSdkFlag(sdkFlag);
1✔
317
        }
1✔
318
        break;
319
    }
320
  }
1✔
321

322
  private List<Message> takeLTE(long eventId) {
323
    List<Message> m = new ArrayList<Message>();
1✔
324
    List<Message> remainingMessages = new ArrayList<Message>();
1✔
325
    for (Message msg : this.messages) {
1✔
326
      if (msg.getEventId() > eventId) {
1✔
327
        remainingMessages.add(msg);
1✔
328
      } else {
329
        m.add(msg);
1✔
330
      }
331
    }
1✔
332
    this.messages = remainingMessages;
1✔
333
    return m;
1✔
334
  }
335

336
  private RuntimeException createEventProcessingException(RuntimeException e, HistoryEvent event) {
337
    Throwable ex = unwrap(e);
1✔
338
    if (ex instanceof NonDeterministicException) {
1✔
339
      // just appending the message in front of an existing message, saving the original stacktrace
340
      NonDeterministicException modifiedException =
1✔
341
          new NonDeterministicException(
342
              createEventHandlingMessage(event)
1✔
343
                  + ". "
344
                  + ex.getMessage()
1✔
345
                  + ". "
346
                  + createShortCurrentStateMessagePostfix(),
1✔
347
              ex.getCause());
1✔
348
      modifiedException.setStackTrace(ex.getStackTrace());
1✔
349
      return modifiedException;
1✔
350
    } else {
351
      return new InternalWorkflowTaskException(
1✔
352
          createEventHandlingMessage(event) + ". " + createShortCurrentStateMessagePostfix(), ex);
1✔
353
    }
354
  }
355

356
  private void handleSingleMessage(Message message) {
357
    // Get or create protocol state machine based on Instance ID and protocolName
358
    EntityStateMachine stateMachine =
1✔
359
        protocolStateMachines.computeIfAbsent(
1✔
360
            message.getProtocolInstanceId(),
1✔
361
            (protocolInstance) -> {
362
              String protocolName = ProtocolUtils.getProtocol(message);
1✔
363
              Optional<ProtocolType> type = ProtocolType.get(protocolName);
1✔
364
              if (type.isPresent()) {
1✔
365
                switch (type.get()) {
1✔
366
                  case UPDATE_V1:
367
                    return UpdateProtocolStateMachine.newInstance(
1✔
368
                        this::isReplaying,
369
                        callbacks::update,
1✔
370
                        this::sendMessage,
371
                        commandSink,
372
                        stateMachineSink);
373
                  default:
374
                    throw new IllegalArgumentException("Unknown protocol type:" + protocolName);
×
375
                }
376
              }
377
              throw new IllegalArgumentException("Protocol type not specified:" + message);
×
378
            });
379
    stateMachine.handleMessage(message);
1✔
380
  }
1✔
381

382
  private void handleSingleEvent(HistoryEvent event, boolean hasNextEvent) {
383
    if (isCommandEvent(event)) {
1✔
384
      handleCommandEvent(event);
1✔
385
      return;
1✔
386
    }
387

388
    if (replaying
1✔
389
        && !hasNextEvent
390
        && (event.getEventType() == EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED
1✔
391
            || WorkflowExecutionUtils.isWorkflowTaskClosedEvent(event))) {
1✔
392
      replaying = false;
1✔
393
    }
394

395
    Long initialCommandEventId = getInitialCommandEventId(event);
1✔
396
    EntityStateMachine c = stateMachines.get(initialCommandEventId);
1✔
397
    if (c != null) {
1✔
398
      c.handleEvent(event, hasNextEvent);
1✔
399
      if (c.isFinalState()) {
1✔
400
        stateMachines.remove(initialCommandEventId);
1✔
401
      }
402
    } else {
403
      handleNonStatefulEvent(event, hasNextEvent);
1✔
404
    }
405
  }
1✔
406

407
  /**
408
   * Handles command event. Command event is an event which is generated from a command emitted by a
409
   * past decision. Each command has a correspondent event. For example ScheduleActivityTaskCommand
410
   * is recorded to the history as ActivityTaskScheduledEvent.
411
   *
412
   * <p>Command events always follow WorkflowTaskCompletedEvent.
413
   *
414
   * <p>The handling consists from verifying that the next command in the commands queue matches the
415
   * event, command state machine is notified about the event and the command is removed from the
416
   * commands queue.
417
   */
418
  private void handleCommandEvent(HistoryEvent event) {
419
    if (handleLocalActivityMarker(event)) {
1✔
420
      return;
1✔
421
    }
422

423
    // Match event to the next command in the stateMachine queue.
424
    // After matching the command is notified about the event and is removed from the
425
    // queue.
426
    CancellableCommand matchingCommand = null;
1✔
427
    while (matchingCommand == null) {
1✔
428
      // handleVersionMarker can skip a marker event if the getVersion call was removed.
429
      // In this case we don't want to consume a command.
430
      // That's why peek is used instead of poll.
431
      CancellableCommand command = commands.peek();
1✔
432
      if (command == null) {
1✔
433
        if (handleNonMatchingVersionMarker(event)) {
×
434
          // this event is a version marker for removed getVersion call.
435
          // Handle the version marker as unmatched and return even if there is no commands to match
436
          // it against.
437
          return;
×
438
        } else {
439
          throw new NonDeterministicException("No command scheduled that corresponds to " + event);
×
440
        }
441
      }
442

443
      if (command.isCanceled()) {
1✔
444
        // Consume and skip the command
445
        commands.poll();
1✔
446
        continue;
1✔
447
      }
448

449
      // Note that handleEvent can cause a command cancellation in case of
450
      // 1. MutableSideEffect
451
      // 2. Version State Machine during replay cancels the command and enters SKIPPED state
452
      //    if it handled non-matching event.
453
      HandleEventStatus status = command.handleEvent(event, true);
1✔
454

455
      if (command.isCanceled()) {
1✔
456
        // Consume and skip the command
457
        commands.poll();
1✔
458
        continue;
1✔
459
      }
460

461
      switch (status) {
1✔
462
        case OK:
463
          // Consume the command
464
          commands.poll();
1✔
465
          matchingCommand = command;
1✔
466
          break;
1✔
467
        case NON_MATCHING_EVENT:
468
          if (handleNonMatchingVersionMarker(event)) {
1✔
469
            // this event is a version marker for removed getVersion call.
470
            // Handle the version marker as unmatched and return without consuming the command
471
            return;
1✔
472
          } else {
473
            throw new NonDeterministicException(
1✔
474
                "Event "
475
                    + event.getEventId()
1✔
476
                    + " of type "
477
                    + event.getEventType()
1✔
478
                    + " does not"
479
                    + " match command type "
480
                    + command.getCommandType());
1✔
481
          }
482
        default:
483
          throw new IllegalStateException(
×
484
              "Got " + status + " value from command.handleEvent which is not handled");
485
      }
486
    }
1✔
487

488
    validateCommand(matchingCommand.getCommand(), event);
1✔
489
    EntityStateMachine stateMachine = matchingCommand.getStateMachine();
1✔
490
    if (!stateMachine.isFinalState()) {
1✔
491
      stateMachines.put(event.getEventId(), stateMachine);
1✔
492
    }
493
    // Marker is the only command processing of which can cause workflow code execution
494
    // and generation of new state machines.
495
    if (event.getEventType() == EventType.EVENT_TYPE_MARKER_RECORDED) {
1✔
496
      prepareCommands();
1✔
497
    }
498
  }
1✔
499

500
  private void preloadVersionMarker(HistoryEvent event) {
501
    if (VersionMarkerUtils.hasVersionMarkerStructure(event)) {
1✔
502
      String changeId = VersionMarkerUtils.tryGetChangeIdFromVersionMarkerEvent(event);
1✔
503
      if (changeId == null) {
1✔
504
        // if we can't extract changeId, this event will later fail to match with anything
505
        // and the corresponded exception will be thrown
506
        return;
×
507
      }
508
      VersionStateMachine versionStateMachine =
1✔
509
          versions.computeIfAbsent(
1✔
510
              changeId,
511
              (idKey) ->
512
                  VersionStateMachine.newInstance(
1✔
513
                      changeId, this::isReplaying, commandSink, stateMachineSink));
514
      versionStateMachine.handleMarkersPreload(event);
1✔
515
    }
516
  }
1✔
517

518
  private boolean handleNonMatchingVersionMarker(HistoryEvent event) {
519
    String changeId = VersionMarkerUtils.tryGetChangeIdFromVersionMarkerEvent(event);
1✔
520
    if (changeId == null) {
1✔
521
      return false;
1✔
522
    }
523
    VersionStateMachine versionStateMachine = versions.get(changeId);
1✔
524
    Preconditions.checkNotNull(
1✔
525
        versionStateMachine,
526
        "versionStateMachine is expected to be initialized already by execution or preloading");
527
    versionStateMachine.handleNonMatchingEvent(event);
1✔
528
    return true;
1✔
529
  }
530

531
  public List<Command> takeCommands() {
532
    List<Command> result = new ArrayList<>(commands.size());
1✔
533
    for (CancellableCommand command : commands) {
1✔
534
      if (!command.isCanceled()) {
1✔
535
        result.add(command.getCommand());
1✔
536
      }
537
    }
1✔
538
    return result;
1✔
539
  }
540

541
  public void sendMessage(Message message) {
542
    checkEventLoopExecuting();
1✔
543
    if (!isReplaying()) {
1✔
544
      messageOutbox.add(message);
1✔
545
    }
546
  }
1✔
547

548
  public List<Message> takeMessages() {
549
    List<Message> result = new ArrayList<>(messageOutbox.size());
1✔
550
    for (Message message : messageOutbox) {
1✔
551
      result.add(message);
1✔
552
    }
1✔
553
    messageOutbox.clear();
1✔
554
    return result;
1✔
555
  }
556

557
  /**
558
   * @return True if the SDK flag is supported in this workflow execution
559
   */
560
  public boolean tryUseSdkFlag(SdkFlag flag) {
561
    return flags.tryUseSdkFlag(flag);
1✔
562
  }
563

564
  /**
565
   * @return Set of all new flags set since the last call
566
   */
567
  public EnumSet<SdkFlag> takeNewSdkFlags() {
568
    return flags.takeNewSdkFlags();
1✔
569
  }
570

571
  private void prepareCommands() {
572
    if (preparing) {
1✔
573
      return;
1✔
574
    }
575
    preparing = true;
1✔
576
    try {
577
      prepareImpl();
1✔
578
    } finally {
579
      preparing = false;
1✔
580
    }
581
  }
1✔
582

583
  private void prepareImpl() {
584
    // handleCommand can lead to code execution because of SideEffect, MutableSideEffect or local
585
    // activity completion. And code execution can lead to creation of new commands and
586
    // cancellation of existing commands. That is the reason for using Queue as a data structure for
587
    // commands.
588
    while (true) {
589
      CancellableCommand command = cancellableCommands.poll();
1✔
590
      if (command == null) {
1✔
591
        break;
1✔
592
      }
593
      // handleCommand should be called even on canceled ones to support mutableSideEffect
594
      command.handleCommand(command.getCommandType());
1✔
595
      commands.add(command);
1✔
596
    }
1✔
597
  }
1✔
598

599
  /**
600
   * Local activity is different from all other entities. It doesn't schedule a marker command when
601
   * the {@link #scheduleLocalActivityTask(ExecuteLocalActivityParameters, Functions.Proc2)} is
602
   * called. The marker is scheduled only when activity completes through ({@link
603
   * #handleLocalActivityCompletion(LocalActivityResult)}). That's why the normal logic of {@link
604
   * #handleCommandEvent(HistoryEvent)}, which assumes that each event has a correspondent command
605
   * during replay, doesn't work. Instead, local activities are matched by their id using
606
   * localActivityMap.
607
   *
608
   * @return true if matched and false if normal event handling should continue.
609
   */
610
  private boolean handleLocalActivityMarker(HistoryEvent event) {
611
    if (!LocalActivityMarkerUtils.hasLocalActivityStructure(event)) {
1✔
612
      return false;
1✔
613
    }
614

615
    MarkerRecordedEventAttributes markerAttributes = event.getMarkerRecordedEventAttributes();
1✔
616
    String id = LocalActivityMarkerUtils.getActivityId(markerAttributes);
1✔
617
    LocalActivityStateMachine stateMachine = localActivityMap.remove(id);
1✔
618
    if (stateMachine == null) {
1✔
619
      String activityType = LocalActivityMarkerUtils.getActivityTypeName(markerAttributes);
×
620
      throw new NonDeterministicException(
×
621
          String.format(
×
622
              "Local activity of type %s is recorded in the history with id %s but was not expected by the execution",
623
              activityType, id));
624
    }
625
    // RESULT_NOTIFIED state means that there is outstanding command that has to be matched
626
    // using standard logic. So return false to let the handleCommand method to run its standard
627
    // logic.
628
    if (stateMachine.getState() == LocalActivityStateMachine.State.RESULT_NOTIFIED) {
1✔
629
      return false;
1✔
630
    }
631
    stateMachine.handleEvent(event, true);
1✔
632
    eventLoop();
1✔
633
    return true;
1✔
634
  }
635

636
  private void eventLoop() {
637
    if (eventLoopExecuting) {
1✔
638
      return;
1✔
639
    }
640
    eventLoopExecuting = true;
1✔
641
    try {
642
      callbacks.eventLoop();
1✔
643
    } finally {
644
      eventLoopExecuting = false;
1✔
645
    }
646
    prepareCommands();
1✔
647
  }
1✔
648

649
  private void handleNonStatefulEvent(HistoryEvent event, boolean hasNextEvent) {
650
    switch (event.getEventType()) {
1✔
651
      case EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:
652
        this.currentRunId =
1✔
653
            event.getWorkflowExecutionStartedEventAttributes().getOriginalExecutionRunId();
1✔
654
        callbacks.start(event);
1✔
655
        break;
1✔
656
      case EVENT_TYPE_WORKFLOW_TASK_SCHEDULED:
657
        WorkflowTaskStateMachine c =
1✔
658
            WorkflowTaskStateMachine.newInstance(
1✔
659
                workflowTaskStartedEventId, new WorkflowTaskCommandsListener());
660
        c.handleEvent(event, hasNextEvent);
1✔
661
        stateMachines.put(event.getEventId(), c);
1✔
662
        break;
1✔
663
      case EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED:
664
        callbacks.signal(event);
1✔
665
        break;
1✔
666
      case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
667
        callbacks.cancel(event);
1✔
668
        break;
1✔
669
      case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
670
      case UNRECOGNIZED:
671
        break;
1✔
672
      default:
673
        throw new IllegalArgumentException("Unexpected event:" + event);
×
674
    }
675
  }
1✔
676

677
  private long setCurrentTimeMillis(long currentTimeMillis) {
678
    if (this.currentTimeMillis < currentTimeMillis) {
1✔
679
      this.currentTimeMillis = currentTimeMillis;
1✔
680
    }
681
    return this.currentTimeMillis;
1✔
682
  }
683

684
  public long getLastStartedEventId() {
685
    return currentStartedEventId;
1✔
686
  }
687

688
  /**
689
   * @param attributes attributes used to schedule an activity
690
   * @param callback completion callback
691
   * @return an instance of ActivityCommands
692
   */
693
  public Functions.Proc scheduleActivityTask(
694
      ExecuteActivityParameters attributes, Functions.Proc2<Optional<Payloads>, Failure> callback) {
695
    checkEventLoopExecuting();
1✔
696
    ActivityStateMachine activityStateMachine =
1✔
697
        ActivityStateMachine.newInstance(
1✔
698
            attributes,
699
            (p, f) -> {
700
              Failure failure = f != null ? f.getFailure() : null;
1✔
701
              callback.apply(p, failure);
1✔
702

703
              if (f != null
1✔
704
                  && !f.isFromEvent()
1✔
705
                  && failure.hasCause()
1✔
706
                  && failure.getCause().hasCanceledFailureInfo()) {
1✔
707
                // If !f.isFromEvent(), we want to unblock the event loop as the promise got filled
708
                // and the workflow may make progress. If f.isFromEvent(), we need to delay event
709
                // loop triggering until WorkflowTaskStarted.
710
                eventLoop();
1✔
711
              }
712
            },
1✔
713
            commandSink,
714
            stateMachineSink);
715
    return activityStateMachine::cancel;
1✔
716
  }
717

718
  /**
719
   * Creates a new timer state machine
720
   *
721
   * @param attributes timer command attributes
722
   * @param completionCallback invoked when timer fires or reports cancellation. One of
723
   *     TimerFiredEvent, TimerCanceledEvent.
724
   * @return cancellation callback that should be invoked to initiate timer cancellation
725
   */
726
  public Functions.Proc newTimer(
727
      StartTimerCommandAttributes attributes, Functions.Proc1<HistoryEvent> completionCallback) {
728
    checkEventLoopExecuting();
1✔
729
    TimerStateMachine timer =
1✔
730
        TimerStateMachine.newInstance(
1✔
731
            attributes,
732
            (event) -> {
733
              completionCallback.apply(event);
1✔
734
              // Needed due to immediate cancellation
735
              if (event.getEventType() == EventType.EVENT_TYPE_TIMER_CANCELED) {
1✔
736
                eventLoop();
1✔
737
              }
738
            },
1✔
739
            commandSink,
740
            stateMachineSink);
741
    return timer::cancel;
1✔
742
  }
743

744
  /**
745
   * Creates a new child state machine
746
   *
747
   * @param parameters child workflow start command parameters
748
   * @param startedCallback callback that is notified about child start
749
   * @param completionCallback invoked when child reports completion or failure
750
   * @return cancellation callback that should be invoked to cancel the child
751
   */
752
  public Functions.Proc startChildWorkflow(
753
      StartChildWorkflowExecutionParameters parameters,
754
      Functions.Proc2<WorkflowExecution, Exception> startedCallback,
755
      Functions.Proc2<Optional<Payloads>, Exception> completionCallback) {
756
    checkEventLoopExecuting();
1✔
757
    StartChildWorkflowExecutionCommandAttributes attributes = parameters.getRequest().build();
1✔
758
    ChildWorkflowCancellationType cancellationType = parameters.getCancellationType();
1✔
759
    ChildWorkflowStateMachine child =
1✔
760
        ChildWorkflowStateMachine.newInstance(
1✔
761
            attributes, startedCallback, completionCallback, commandSink, stateMachineSink);
762
    return () -> {
1✔
763
      if (cancellationType == ChildWorkflowCancellationType.ABANDON) {
1✔
764
        notifyChildCanceled(completionCallback);
1✔
765
        return;
1✔
766
      }
767
      // The only time child can be canceled directly is before its start command
768
      // was sent out to the service. After that RequestCancelExternal should be used.
769
      if (child.isCancellable()) {
1✔
770
        child.cancel();
1✔
771
        return;
1✔
772
      }
773
      if (!child.isFinalState()) {
1✔
774
        requestCancelExternalWorkflowExecution(
1✔
775
            RequestCancelExternalWorkflowExecutionCommandAttributes.newBuilder()
1✔
776
                .setWorkflowId(attributes.getWorkflowId())
1✔
777
                .setNamespace(attributes.getNamespace())
1✔
778
                .setChildWorkflowOnly(true)
1✔
779
                .build(),
1✔
780
            (r, e) -> { // TODO(maxim): Decide what to do if an error is passed to the callback.
781
              if (cancellationType == ChildWorkflowCancellationType.WAIT_CANCELLATION_REQUESTED) {
1✔
782
                notifyChildCanceled(completionCallback);
1✔
783
              }
784
            });
1✔
785
        if (cancellationType == ChildWorkflowCancellationType.TRY_CANCEL) {
1✔
786
          notifyChildCanceled(completionCallback);
1✔
787
        }
788
      }
789
    };
1✔
790
  }
791

792
  private void notifyChildCanceled(
793
      Functions.Proc2<Optional<Payloads>, Exception> completionCallback) {
794
    CanceledFailure failure = new CanceledFailure("Child canceled");
1✔
795
    completionCallback.apply(Optional.empty(), failure);
1✔
796
    eventLoop();
1✔
797
  }
1✔
798

799
  /**
800
   * @param attributes
801
   * @param completionCallback invoked when signal delivery completes of fails. The following types
802
   */
803
  public Functions.Proc signalExternalWorkflowExecution(
804
      SignalExternalWorkflowExecutionCommandAttributes attributes,
805
      Functions.Proc2<Void, Failure> completionCallback) {
806
    checkEventLoopExecuting();
1✔
807
    return SignalExternalStateMachine.newInstance(
1✔
808
        attributes, completionCallback, commandSink, stateMachineSink);
809
  }
810

811
  /**
812
   * @param attributes attributes to use to cancel external workflow
813
   * @param completionCallback one of ExternalWorkflowExecutionCancelRequestedEvent,
814
   */
815
  public void requestCancelExternalWorkflowExecution(
816
      RequestCancelExternalWorkflowExecutionCommandAttributes attributes,
817
      Functions.Proc2<Void, RuntimeException> completionCallback) {
818
    checkEventLoopExecuting();
1✔
819
    CancelExternalStateMachine.newInstance(
1✔
820
        attributes, completionCallback, commandSink, stateMachineSink);
821
  }
1✔
822

823
  public void upsertSearchAttributes(SearchAttributes attributes) {
824
    checkEventLoopExecuting();
1✔
825
    UpsertSearchAttributesStateMachine.newInstance(attributes, commandSink, stateMachineSink);
1✔
826
  }
1✔
827

828
  public void completeWorkflow(Optional<Payloads> workflowOutput) {
829
    checkEventLoopExecuting();
1✔
830
    CompleteWorkflowStateMachine.newInstance(workflowOutput, commandSink, stateMachineSink);
1✔
831
  }
1✔
832

833
  public void failWorkflow(Failure failure) {
834
    checkEventLoopExecuting();
1✔
835
    FailWorkflowStateMachine.newInstance(failure, commandSink, stateMachineSink);
1✔
836
  }
1✔
837

838
  public void cancelWorkflow() {
839
    checkEventLoopExecuting();
1✔
840
    CancelWorkflowStateMachine.newInstance(
1✔
841
        CancelWorkflowExecutionCommandAttributes.getDefaultInstance(),
1✔
842
        commandSink,
843
        stateMachineSink);
844
  }
1✔
845

846
  public void continueAsNewWorkflow(ContinueAsNewWorkflowExecutionCommandAttributes attributes) {
847
    checkEventLoopExecuting();
1✔
848
    ContinueAsNewWorkflowStateMachine.newInstance(attributes, commandSink, stateMachineSink);
1✔
849
  }
1✔
850

851
  public boolean isReplaying() {
852
    return replaying;
1✔
853
  }
854

855
  public long currentTimeMillis() {
856
    return currentTimeMillis;
1✔
857
  }
858

859
  public UUID randomUUID() {
860
    checkEventLoopExecuting();
1✔
861
    String runId = currentRunId;
1✔
862
    if (runId == null) {
1✔
863
      throw new Error("null currentRunId");
×
864
    }
865
    String id = runId + ":" + idCounter++;
1✔
866
    byte[] bytes = id.getBytes(StandardCharsets.UTF_8);
1✔
867
    return UUID.nameUUIDFromBytes(bytes);
1✔
868
  }
869

870
  public Random newRandom() {
871
    checkEventLoopExecuting();
1✔
872
    return new Random(randomUUID().getLeastSignificantBits());
1✔
873
  }
874

875
  public void sideEffect(
876
      Functions.Func<Optional<Payloads>> func, Functions.Proc1<Optional<Payloads>> callback) {
877
    checkEventLoopExecuting();
1✔
878
    SideEffectStateMachine.newInstance(
1✔
879
        this::isReplaying,
880
        func,
881
        (payloads) -> {
882
          callback.apply(payloads);
1✔
883
          // callback unblocked sideEffect call. Give workflow code chance to make progress.
884
          eventLoop();
1✔
885
        },
1✔
886
        commandSink,
887
        stateMachineSink);
888
  }
1✔
889

890
  /**
891
   * @param id mutable side effect id
892
   * @param func given the value from the last marker returns value to store. If result is empty
893
   *     nothing is recorded into the history.
894
   * @param callback used to report result or failure
895
   */
896
  public void mutableSideEffect(
897
      String id,
898
      Functions.Func1<Optional<Payloads>, Optional<Payloads>> func,
899
      Functions.Proc1<Optional<Payloads>> callback) {
900
    checkEventLoopExecuting();
1✔
901
    MutableSideEffectStateMachine stateMachine =
1✔
902
        mutableSideEffects.computeIfAbsent(
1✔
903
            id,
904
            (idKey) ->
905
                MutableSideEffectStateMachine.newInstance(
1✔
906
                    idKey, this::isReplaying, commandSink, stateMachineSink));
907
    stateMachine.mutableSideEffect(
1✔
908
        func,
909
        (r) -> {
910
          callback.apply(r);
1✔
911
          // callback unblocked mutableSideEffect call. Give workflow code chance to make progress.
912
          eventLoop();
1✔
913
        },
1✔
914
        stateMachineSink);
915
  }
1✔
916

917
  public boolean getVersion(
918
      String changeId,
919
      int minSupported,
920
      int maxSupported,
921
      Functions.Proc2<Integer, RuntimeException> callback) {
922
    VersionStateMachine stateMachine =
1✔
923
        versions.computeIfAbsent(
1✔
924
            changeId,
925
            (idKey) -> {
926
              return VersionStateMachine.newInstance(
1✔
927
                  changeId, this::isReplaying, commandSink, stateMachineSink);
928
            });
929
    VersionStateMachine.State state =
1✔
930
        stateMachine.getVersion(
1✔
931
            minSupported,
932
            maxSupported,
933
            (v, e) -> {
934
              callback.apply(v, e);
1✔
935
              // without this getVersion call will trigger the end of WFT,
936
              // instead we want to prepare subsequent commands and unblock the execution one more
937
              // time.
938
              eventLoop();
1✔
939
            });
1✔
940
    return state != VersionStateMachine.State.SKIPPED_REPLAYING;
1✔
941
  }
942

943
  public List<ExecuteLocalActivityParameters> takeLocalActivityRequests() {
944
    List<ExecuteLocalActivityParameters> result = localActivityRequests;
1✔
945
    localActivityRequests = new ArrayList<>();
1✔
946
    for (ExecuteLocalActivityParameters parameters : result) {
1✔
947
      LocalActivityStateMachine stateMachine = localActivityMap.get(parameters.getActivityId());
1✔
948
      stateMachine.markAsSent();
1✔
949
    }
1✔
950
    return result;
1✔
951
  }
952

953
  public void handleLocalActivityCompletion(LocalActivityResult laCompletion) {
954
    String activityId = laCompletion.getActivityId();
1✔
955
    LocalActivityStateMachine laStateMachine = localActivityMap.get(activityId);
1✔
956
    if (laStateMachine == null) {
1✔
957
      throw new IllegalStateException("Unknown local activity: " + activityId);
×
958
    }
959
    laStateMachine.handleCompletion(laCompletion);
1✔
960
    prepareCommands();
1✔
961
  }
1✔
962

963
  public Functions.Proc scheduleLocalActivityTask(
964
      ExecuteLocalActivityParameters parameters,
965
      Functions.Proc2<Optional<Payloads>, LocalActivityCallback.LocalActivityFailedException>
966
          callback) {
967
    checkEventLoopExecuting();
1✔
968
    String activityId = parameters.getActivityId();
1✔
969
    if (Strings.isNullOrEmpty(activityId)) {
1✔
970
      throw new IllegalArgumentException("Missing activityId: " + activityId);
×
971
    }
972
    if (localActivityMap.containsKey(activityId)) {
1✔
973
      throw new IllegalArgumentException("Duplicated local activity id: " + activityId);
×
974
    }
975
    LocalActivityStateMachine commands =
1✔
976
        LocalActivityStateMachine.newInstance(
1✔
977
            this::isReplaying,
978
            this::setCurrentTimeMillis,
979
            parameters,
980
            (r, e) -> {
981
              callback.apply(r, e);
1✔
982
              // callback unblocked local activity call. Give workflow code chance to make progress.
983
              eventLoop();
1✔
984
            },
1✔
985
            localActivityRequestSink,
986
            commandSink,
987
            stateMachineSink,
988
            currentTimeMillis);
989
    localActivityMap.put(activityId, commands);
1✔
990
    return commands::cancel;
1✔
991
  }
992

993
  /** Validates that command matches the event during replay. */
994
  private void validateCommand(Command command, HistoryEvent event) {
995
    // TODO(maxim): Add more thorough validation logic. For example check if activity IDs are
996
    // matching.
997

998
    // ProtocolMessageCommand is different from other commands because it can be associated with
999
    // multiple types of events
1000
    // TODO(#1781) Validate protocol message is expected type.
1001
    if (command.getCommandType() == COMMAND_TYPE_PROTOCOL_MESSAGE) {
1✔
1002
      ProtocolMessageCommandAttributes commandAttributes =
1✔
1003
          command.getProtocolMessageCommandAttributes();
1✔
1004
      switch (event.getEventType()) {
1✔
1005
        case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED:
1006
          assertMatch(
1✔
1007
              command,
1008
              event,
1009
              "messageType",
1010
              true,
1✔
1011
              commandAttributes.getMessageId().endsWith("accept"));
1✔
1012
          break;
1✔
1013
        case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_REJECTED:
1014
          assertMatch(
×
1015
              command,
1016
              event,
1017
              "messageType",
1018
              true,
×
1019
              commandAttributes.getMessageId().endsWith("reject"));
×
1020
          break;
×
1021
        case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED:
1022
          assertMatch(
1✔
1023
              command,
1024
              event,
1025
              "messageType",
1026
              true,
1✔
1027
              commandAttributes.getMessageId().endsWith("complete"));
1✔
1028
          break;
1✔
1029
        default:
1030
          throw new IllegalArgumentException("Unexpected event type: " + event.getEventType());
×
1031
      }
1032
      return;
1✔
1033
    }
1034
    assertMatch(
1✔
1035
        command,
1036
        event,
1037
        "eventType",
1038
        getEventTypeForCommand(command.getCommandType()),
1✔
1039
        event.getEventType());
1✔
1040
    switch (command.getCommandType()) {
1✔
1041
      case COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK:
1042
        {
1043
          ScheduleActivityTaskCommandAttributes commandAttributes =
1✔
1044
              command.getScheduleActivityTaskCommandAttributes();
1✔
1045
          ActivityTaskScheduledEventAttributes eventAttributes =
1✔
1046
              event.getActivityTaskScheduledEventAttributes();
1✔
1047
          assertMatch(
1✔
1048
              command,
1049
              event,
1050
              "activityId",
1051
              commandAttributes.getActivityId(),
1✔
1052
              eventAttributes.getActivityId());
1✔
1053
          assertMatch(
1✔
1054
              command,
1055
              event,
1056
              "activityType",
1057
              commandAttributes.getActivityType(),
1✔
1058
              eventAttributes.getActivityType());
1✔
1059
        }
1060
        break;
1✔
1061
      case COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION:
1062
        {
1063
          StartChildWorkflowExecutionCommandAttributes commandAttributes =
1✔
1064
              command.getStartChildWorkflowExecutionCommandAttributes();
1✔
1065
          StartChildWorkflowExecutionInitiatedEventAttributes eventAttributes =
1✔
1066
              event.getStartChildWorkflowExecutionInitiatedEventAttributes();
1✔
1067
          assertMatch(
1✔
1068
              command,
1069
              event,
1070
              "workflowId",
1071
              commandAttributes.getWorkflowId(),
1✔
1072
              eventAttributes.getWorkflowId());
1✔
1073
          assertMatch(
1✔
1074
              command,
1075
              event,
1076
              "workflowType",
1077
              commandAttributes.getWorkflowType(),
1✔
1078
              eventAttributes.getWorkflowType());
1✔
1079
        }
1080
        break;
1✔
1081
      case COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK:
1082
      case COMMAND_TYPE_START_TIMER:
1083
        {
1084
          StartTimerCommandAttributes commandAttributes = command.getStartTimerCommandAttributes();
1✔
1085
          TimerStartedEventAttributes eventAttributes = event.getTimerStartedEventAttributes();
1✔
1086
          assertMatch(
1✔
1087
              command,
1088
              event,
1089
              "timerId",
1090
              commandAttributes.getTimerId(),
1✔
1091
              eventAttributes.getTimerId());
1✔
1092
        }
1093
        break;
1✔
1094
      case COMMAND_TYPE_CANCEL_TIMER:
1095
      case COMMAND_TYPE_CANCEL_WORKFLOW_EXECUTION:
1096
      case COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION:
1097
      case COMMAND_TYPE_RECORD_MARKER:
1098
      case COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION:
1099
      case COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION:
1100
      case COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES:
1101
      case COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION:
1102
      case COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION:
1103
      case COMMAND_TYPE_PROTOCOL_MESSAGE:
1104
        break;
1✔
1105
      case UNRECOGNIZED:
1106
      case COMMAND_TYPE_UNSPECIFIED:
1107
        throw new IllegalArgumentException("Unexpected command type: " + command.getCommandType());
×
1108
    }
1109
  }
1✔
1110

1111
  private void assertMatch(
1112
      Command command, HistoryEvent event, String checkType, Object expected, Object actual) {
1113
    if (!expected.equals(actual)) {
1✔
1114
      String message =
×
1115
          String.format(
×
1116
              "Command %s doesn't match event %s with EventId=%s on check %s "
1117
                  + "with an expected value '%s' and an actual value '%s'",
1118
              command.getCommandType(),
×
1119
              event.getEventType(),
×
1120
              event.getEventId(),
×
1121
              checkType,
1122
              expected,
1123
              actual);
1124
      throw new NonDeterministicException(message);
×
1125
    }
1126
  }
1✔
1127

1128
  private class WorkflowTaskCommandsListener implements WorkflowTaskStateMachine.Listener {
1✔
1129
    @Override
1130
    public void workflowTaskStarted(
1131
        long startedEventId, long currentTimeMillis, boolean nonProcessedWorkflowTask) {
1132
      setCurrentTimeMillis(currentTimeMillis);
1✔
1133
      for (CancellableCommand cancellableCommand : commands) {
1✔
1134
        cancellableCommand.handleWorkflowTaskStarted();
1✔
1135
      }
1✔
1136
      // Give local activities a chance to recreate their requests if they were lost due
1137
      // to the last workflow task failure. The loss could happen only the last workflow task
1138
      // was forcibly created by setting forceCreate on RespondWorkflowTaskCompletedRequest.
1139
      if (nonProcessedWorkflowTask) {
1✔
1140
        for (LocalActivityStateMachine value : localActivityMap.values()) {
1✔
1141
          value.nonReplayWorkflowTaskStarted();
1✔
1142
        }
1✔
1143
      }
1144
      WorkflowStateMachines.this.currentStartedEventId = startedEventId;
1✔
1145

1146
      eventLoop();
1✔
1147
    }
1✔
1148

1149
    @Override
1150
    public void updateRunId(String currentRunId) {
1151
      WorkflowStateMachines.this.currentRunId = currentRunId;
×
1152
    }
×
1153
  }
1154

1155
  private long getInitialCommandEventId(HistoryEvent event) {
1156
    switch (event.getEventType()) {
1✔
1157
      case EVENT_TYPE_ACTIVITY_TASK_STARTED:
1158
        return event.getActivityTaskStartedEventAttributes().getScheduledEventId();
1✔
1159
      case EVENT_TYPE_ACTIVITY_TASK_COMPLETED:
1160
        return event.getActivityTaskCompletedEventAttributes().getScheduledEventId();
1✔
1161
      case EVENT_TYPE_ACTIVITY_TASK_FAILED:
1162
        return event.getActivityTaskFailedEventAttributes().getScheduledEventId();
1✔
1163
      case EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT:
1164
        return event.getActivityTaskTimedOutEventAttributes().getScheduledEventId();
1✔
1165
      case EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED:
1166
        return event.getActivityTaskCancelRequestedEventAttributes().getScheduledEventId();
×
1167
      case EVENT_TYPE_ACTIVITY_TASK_CANCELED:
1168
        return event.getActivityTaskCanceledEventAttributes().getScheduledEventId();
1✔
1169
      case EVENT_TYPE_TIMER_FIRED:
1170
        return event.getTimerFiredEventAttributes().getStartedEventId();
1✔
1171
      case EVENT_TYPE_TIMER_CANCELED:
1172
        return event.getTimerCanceledEventAttributes().getStartedEventId();
×
1173
      case EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
1174
        return event
×
1175
            .getRequestCancelExternalWorkflowExecutionFailedEventAttributes()
×
1176
            .getInitiatedEventId();
×
1177
      case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
1178
        return event
1✔
1179
            .getExternalWorkflowExecutionCancelRequestedEventAttributes()
1✔
1180
            .getInitiatedEventId();
1✔
1181
      case EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED:
1182
        return event.getStartChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId();
1✔
1183
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED:
1184
        return event.getChildWorkflowExecutionStartedEventAttributes().getInitiatedEventId();
1✔
1185
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED:
1186
        return event.getChildWorkflowExecutionCompletedEventAttributes().getInitiatedEventId();
1✔
1187
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED:
1188
        return event.getChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId();
1✔
1189
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED:
1190
        return event.getChildWorkflowExecutionCanceledEventAttributes().getInitiatedEventId();
1✔
1191
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT:
1192
        return event.getChildWorkflowExecutionTimedOutEventAttributes().getInitiatedEventId();
1✔
1193
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TERMINATED:
1194
        return event.getChildWorkflowExecutionTerminatedEventAttributes().getInitiatedEventId();
×
1195
      case EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
1196
        return event
1✔
1197
            .getSignalExternalWorkflowExecutionFailedEventAttributes()
1✔
1198
            .getInitiatedEventId();
1✔
1199
      case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED:
1200
        return event.getExternalWorkflowExecutionSignaledEventAttributes().getInitiatedEventId();
1✔
1201
      case EVENT_TYPE_WORKFLOW_TASK_STARTED:
1202
        return event.getWorkflowTaskStartedEventAttributes().getScheduledEventId();
1✔
1203
      case EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
1204
        return event.getWorkflowTaskCompletedEventAttributes().getScheduledEventId();
1✔
1205
      case EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT:
1206
        return event.getWorkflowTaskTimedOutEventAttributes().getScheduledEventId();
1✔
1207
      case EVENT_TYPE_WORKFLOW_TASK_FAILED:
1208
        return event.getWorkflowTaskFailedEventAttributes().getScheduledEventId();
1✔
1209

1210
      case EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:
1211
      case EVENT_TYPE_TIMER_STARTED:
1212
      case EVENT_TYPE_MARKER_RECORDED:
1213
      case EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED:
1214
      case EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED:
1215
      case EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED:
1216
      case EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW:
1217
      case EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED:
1218
      case EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES:
1219
      case EVENT_TYPE_WORKFLOW_TASK_SCHEDULED:
1220
      case EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED:
1221
      case EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:
1222
      case EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED:
1223
      case EVENT_TYPE_WORKFLOW_EXECUTION_FAILED:
1224
      case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
1225
      case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
1226
      case EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
1227
        return event.getEventId();
1✔
1228
      case UNRECOGNIZED:
1229
      case EVENT_TYPE_UNSPECIFIED:
1230
        throw new IllegalArgumentException("Unexpected event type: " + event.getEventType());
×
1231
    }
1232
    throw new IllegalStateException("unreachable");
×
1233
  }
1234

1235
  /**
1236
   * Workflow code executes only while event loop is running. So operations that can be invoked from
1237
   * the workflow have to satisfy this condition.
1238
   */
1239
  private void checkEventLoopExecuting() {
1240
    if (!eventLoopExecuting) {
1✔
1241
      // this call doesn't yield or await, because the await function returns true,
1242
      // but it checks if the workflow thread needs to be destroyed
1243
      WorkflowThread.await("kill workflow thread if destroy requested", () -> true);
×
1244
      throw new IllegalStateException("Operation allowed only while eventLoop is running");
×
1245
    }
1246
  }
1✔
1247

1248
  private String createEventHandlingMessage(HistoryEvent event) {
1249
    return "Failure handling event "
1✔
1250
        + event.getEventId()
1✔
1251
        + " of type '"
1252
        + event.getEventType()
1✔
1253
        + "' "
1254
        + (this.isReplaying() ? "during replay" : "during execution");
1✔
1255
  }
1256

1257
  private String createShortCurrentStateMessagePostfix() {
1258
    return String.format(
1✔
1259
        "{WorkflowTaskStartedEventId=%s, CurrentStartedEventId=%s}",
1260
        this.workflowTaskStartedEventId, this.currentStartedEventId);
1✔
1261
  }
1262
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc