• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #169

pending completion
#169

push

github-actions

web-flow
Remove use of deprecated API (#1758)

4 of 4 new or added lines in 1 file covered. (100.0%)

17345 of 21558 relevant lines covered (80.46%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.46
/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.statemachines;
22

23
import static io.temporal.internal.common.WorkflowExecutionUtils.getEventTypeForCommand;
24
import static io.temporal.internal.common.WorkflowExecutionUtils.isCommandEvent;
25
import static io.temporal.serviceclient.CheckedExceptionWrapper.unwrap;
26

27
import com.google.common.annotations.VisibleForTesting;
28
import com.google.common.base.Preconditions;
29
import com.google.common.base.Strings;
30
import com.google.protobuf.Any;
31
import io.temporal.api.command.v1.CancelWorkflowExecutionCommandAttributes;
32
import io.temporal.api.command.v1.Command;
33
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
34
import io.temporal.api.command.v1.RequestCancelExternalWorkflowExecutionCommandAttributes;
35
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributes;
36
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
37
import io.temporal.api.command.v1.StartChildWorkflowExecutionCommandAttributes;
38
import io.temporal.api.command.v1.StartTimerCommandAttributes;
39
import io.temporal.api.common.v1.Payloads;
40
import io.temporal.api.common.v1.SearchAttributes;
41
import io.temporal.api.common.v1.WorkflowExecution;
42
import io.temporal.api.enums.v1.EventType;
43
import io.temporal.api.failure.v1.Failure;
44
import io.temporal.api.history.v1.*;
45
import io.temporal.api.protocol.v1.Message;
46
import io.temporal.failure.CanceledFailure;
47
import io.temporal.internal.common.ProtocolType;
48
import io.temporal.internal.common.ProtocolUtils;
49
import io.temporal.internal.common.WorkflowExecutionUtils;
50
import io.temporal.internal.history.LocalActivityMarkerUtils;
51
import io.temporal.internal.history.VersionMarkerUtils;
52
import io.temporal.internal.sync.WorkflowThread;
53
import io.temporal.internal.worker.LocalActivityResult;
54
import io.temporal.worker.NonDeterministicException;
55
import io.temporal.workflow.ChildWorkflowCancellationType;
56
import io.temporal.workflow.Functions;
57
import java.nio.charset.StandardCharsets;
58
import java.util.*;
59

60
public final class WorkflowStateMachines {
61

62
  /** Outcome reported by a command after it was offered a history event to handle. */
  enum HandleEventStatus {
    /** The event corresponds to the command and was handled by it. */
    OK,
    /** The event does not correspond to the command it was offered to. */
    NON_MATCHING_EVENT
  }
66

67
  /**
68
   * EventId of the WorkflowTaskStarted event of the Workflow Task that was picked up by a worker
69
   * and triggered a current replay or execution. It's expected to be the last event in the history
70
   * if we continue to execute the workflow.
71
   *
72
   * <p>For direct (legacy) queries, it may be:
73
   *
74
   * <ul>
75
   *   <li>0 if it's a query of a closed workflow execution
76
   *   <li>id of the last successfully completed Workflow Task if the workflow is not closed
77
   * </ul>
78
   *
79
   * <p>Set from the "outside" from the PollWorkflowTaskQueueResponse. Not modified by the SDK state
80
   * machines.
81
   */
82
  private long workflowTaskStartedEventId;
83

84
  /** EventId of the last WorkflowTaskStarted event handled by these state machines. */
85
  private long currentStartedEventId;
86

87
  /**
88
   * EventId of the last event seen by these state machines. Events earlier than this one will be
89
   * discarded.
90
   */
91
  private long lastHandledEventId;
92

93
  private final StatesMachinesCallback callbacks;
94

95
  /** Callback to send new commands to. */
96
  private final Functions.Proc1<CancellableCommand> commandSink;
97

98
  /**
99
   * currentRunId is used as seed by Workflow.newRandom and randomUUID. It allows to generate them
100
   * deterministically.
101
   */
102
  private String currentRunId;
103

104
  /** Used by Workflow.newRandom and randomUUID together with currentRunId. */
105
  private long idCounter;
106

107
  /** Current workflow time. */
108
  private long currentTimeMillis = -1;
1✔
109

110
  private final Map<Long, EntityStateMachine> stateMachines = new HashMap<>();
1✔
111

112
  private final Map<String, EntityStateMachine> protocolStateMachines = new HashMap<>();
1✔
113

114
  private final Queue<Message> messageOutbox = new ArrayDeque<>();
1✔
115

116
  private final Queue<CancellableCommand> commands = new ArrayDeque<>();
1✔
117

118
  /**
119
   * Commands generated by the currently processed workflow task. It is a queue as commands can be
120
   * added (due to marker based commands) while iterating over already added commands.
121
   */
122
  private final Queue<CancellableCommand> cancellableCommands = new ArrayDeque<>();
1✔
123

124
  /**
125
   * Is workflow executing new code or replaying from the history. Note that this flag ALWAYS flips
126
   * to true for the time when we apply events from the server even if the commands were created by
127
   * an actual execution with replaying=false.
128
   */
129
  private boolean replaying;
130

131
  /** Used to ensure that event loop is not executed recursively. */
132
  private boolean eventLoopExecuting;
133

134
  /**
135
   * Used to avoid recursive calls to {@link #prepareCommands()}.
136
   *
137
   * <p>Such calls happen when sideEffects and localActivity markers are processed.
138
   */
139
  private boolean preparing;
140

141
  /** Key is mutable side effect id */
142
  private final Map<String, MutableSideEffectStateMachine> mutableSideEffects = new HashMap<>();
1✔
143

144
  /** Key is changeId */
145
  private final Map<String, VersionStateMachine> versions = new HashMap<>();
1✔
146

147
  /** Map of local activities by their id. */
148
  private final Map<String, LocalActivityStateMachine> localActivityMap = new HashMap<>();
1✔
149

150
  private List<ExecuteLocalActivityParameters> localActivityRequests = new ArrayList<>();
1✔
151

152
  private final Functions.Proc1<ExecuteLocalActivityParameters> localActivityRequestSink;
153
  private final Functions.Proc1<StateMachine> stateMachineSink;
154

155
  private final WFTBuffer wftBuffer = new WFTBuffer();
1✔
156

157
  /** Protocol messages that have not yet been handed to a protocol state machine. */
158
  private List<Message> messages = new ArrayList<Message>();
1✔
159

160
  /**
   * Creates state machines that report to the given callbacks and use a no-op state machine sink.
   *
   * @param callbacks receiver of the notifications produced by these state machines
   */
  public WorkflowStateMachines(StatesMachinesCallback callbacks) {
    this(callbacks, (ignored) -> {});
  }
1✔
163

164
  /**
   * Creates state machines with an explicit state machine sink.
   *
   * @param callbacks receiver of the notifications produced by these state machines; must not be
   *     null
   * @param stateMachineSink sink handed to the entity state machines created by this class
   */
  @VisibleForTesting
  public WorkflowStateMachines(
      StatesMachinesCallback callbacks, Functions.Proc1<StateMachine> stateMachineSink) {
    this.stateMachineSink = stateMachineSink;
    this.callbacks = Objects.requireNonNull(callbacks);
    // New commands are buffered in cancellableCommands until prepareCommands() processes them.
    this.commandSink = (command) -> cancellableCommands.add(command);
    // Deliberately a lambda: localActivityRequests is not final, so the field has to be read at
    // the time of each call.
    this.localActivityRequestSink = (params) -> localActivityRequests.add(params);
  }
1✔
172

173
  // TODO revisit and potentially remove workflowTaskStartedEventId at all from the state machines.
174
  // The only place where it's used is WorkflowTaskStateMachine to understand that there will be
175
  // no completion event in the history.
176
  // This is tricky, because direct queries come with 0 as a magic value in this field if
177
  // execution is completed for example.
178
  // Most likely we can rework WorkflowTaskStateMachine to use only hasNext.
179
  /**
180
   * @param workflowTaskStartedEventId eventId of the workflowTask that was picked up by a worker
181
   *     and triggered an execution. Used in {@link WorkflowTaskStateMachine} only to understand
182
   *     that this workflow task will not have a matching closing event and needs to be executed.
183
   */
184
  public void setWorklfowStartedEventId(long workflowTaskStartedEventId) {
185
    this.workflowTaskStartedEventId = workflowTaskStartedEventId;
1✔
186
  }
1✔
187

188
  public void setCurrentStartedEventId(long eventId) {
189
    this.currentStartedEventId = eventId;
×
190
  }
×
191

192
  public long getCurrentStartedEventId() {
193
    return currentStartedEventId;
1✔
194
  }
195

196
  public void setReplaying(boolean replaying) {
197
    this.replaying = replaying;
1✔
198
  }
1✔
199

200
  public void setMessages(List<Message> messages) {
201
    this.messages = messages;
1✔
202
  }
1✔
203

204
  /**
205
   * Handle a single event from the workflow history.
206
   *
207
   * @param event event from the history.
208
   * @param hasNextEvent false if this is the last event in the history.
209
   */
210
  public void handleEvent(HistoryEvent event, boolean hasNextEvent) {
211
    long eventId = event.getEventId();
1✔
212
    if (eventId <= lastHandledEventId) {
1✔
213
      // already handled
214
      return;
1✔
215
    }
216
    Preconditions.checkState(
1✔
217
        eventId == lastHandledEventId + 1,
218
        "History is out of order. "
219
            + "There is a gap between the last event workflow state machine observed and currently handling event. "
220
            + "Last processed eventId: %s, handling eventId: %s",
221
        lastHandledEventId,
222
        eventId);
223

224
    lastHandledEventId = eventId;
1✔
225
    boolean readyToPeek = wftBuffer.addEvent(event, hasNextEvent);
1✔
226
    if (readyToPeek) {
1✔
227
      handleEventsBatch(wftBuffer.fetch(), hasNextEvent);
1✔
228
    }
229
  }
1✔
230

231
  /**
232
   * Handle an events batch for one workflow task. Events that are related to one workflow task
233
   * during replay should be prefetched and supplied in one batch.
234
   *
235
   * @param events events belong to one workflow task
236
   * @param hasNextEvent true if there are more events in the history follow this batch, false if
237
   *     this batch contains the last events of the history
238
   */
239
  private void handleEventsBatch(List<HistoryEvent> events, boolean hasNextEvent) {
240
    if (EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED.equals(events.get(0).getEventType())) {
1✔
241
      for (HistoryEvent event : events) {
1✔
242
        try {
243
          preloadVersionMarker(event);
1✔
244
        } catch (RuntimeException e) {
1✔
245
          throw createEventProcessingException(e, event);
1✔
246
        }
1✔
247
      }
1✔
248
    }
249

250
    // Look ahead to infer protocol messages
251
    if (EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED.equals(events.get(0).getEventType())) {
1✔
252
      if (this.isReplaying()) {
1✔
253
        for (HistoryEvent event : events) {
1✔
254
          if (event.hasWorkflowExecutionUpdateAcceptedEventAttributes()) {
1✔
255
            WorkflowExecutionUpdateAcceptedEventAttributes updateEvent =
1✔
256
                event.getWorkflowExecutionUpdateAcceptedEventAttributes();
1✔
257
            this.messages.add(
1✔
258
                Message.newBuilder()
1✔
259
                    .setId(updateEvent.getAcceptedRequestMessageId())
1✔
260
                    .setProtocolInstanceId(updateEvent.getProtocolInstanceId())
1✔
261
                    .setEventId(updateEvent.getAcceptedRequestSequencingEventId())
1✔
262
                    .setBody(Any.pack(updateEvent.getAcceptedRequest()))
1✔
263
                    .build());
1✔
264
          }
265
        }
1✔
266
      }
267
    }
268

269
    for (Iterator<HistoryEvent> iterator = events.iterator(); iterator.hasNext(); ) {
1✔
270
      HistoryEvent event = iterator.next();
1✔
271

272
      // On replay the messages are available after the workflow task schedule event, so we
273
      // need to handle them before workflow task started event to maintain a consistent order.
274
      for (Message msg : this.takeLTE(event.getEventId() - 1)) {
1✔
275
        handleSingleMessage(msg);
1✔
276
      }
1✔
277

278
      try {
279
        handleSingleEvent(event, iterator.hasNext() || hasNextEvent);
1✔
280
      } catch (RuntimeException e) {
1✔
281
        throw createEventProcessingException(e, event);
1✔
282
      }
1✔
283

284
      for (Message msg : this.takeLTE(event.getEventId())) {
1✔
285
        handleSingleMessage(msg);
×
286
      }
×
287
    }
1✔
288
  }
1✔
289

290
  private List<Message> takeLTE(long eventId) {
291
    List<Message> m = new ArrayList<Message>();
1✔
292
    List<Message> remainingMessages = new ArrayList<Message>();
1✔
293
    for (Message msg : this.messages) {
1✔
294
      if (msg.getEventId() > eventId) {
1✔
295
        remainingMessages.add(msg);
×
296
      } else {
297
        m.add(msg);
1✔
298
      }
299
    }
1✔
300
    this.messages = remainingMessages;
1✔
301
    return m;
1✔
302
  }
303

304
  private RuntimeException createEventProcessingException(RuntimeException e, HistoryEvent event) {
305
    Throwable ex = unwrap(e);
1✔
306
    if (ex instanceof NonDeterministicException) {
1✔
307
      // just appending the message in front of an existing message, saving the original stacktrace
308
      NonDeterministicException modifiedException =
1✔
309
          new NonDeterministicException(
310
              createEventHandlingMessage(event)
1✔
311
                  + ". "
312
                  + ex.getMessage()
1✔
313
                  + ". "
314
                  + createShortCurrentStateMessagePostfix(),
1✔
315
              ex.getCause());
1✔
316
      modifiedException.setStackTrace(ex.getStackTrace());
1✔
317
      return modifiedException;
1✔
318
    } else {
319
      return new InternalWorkflowTaskException(
1✔
320
          createEventHandlingMessage(event) + ". " + createShortCurrentStateMessagePostfix(), ex);
1✔
321
    }
322
  }
323

324
  private void handleSingleMessage(Message message) {
325
    // Get or create protocol state machine based on Instance ID and protocolName
326
    EntityStateMachine stateMachine =
1✔
327
        protocolStateMachines.computeIfAbsent(
1✔
328
            message.getProtocolInstanceId(),
1✔
329
            (protocolInstance) -> {
330
              String protocolName = ProtocolUtils.getProtocol(message);
1✔
331
              Optional<ProtocolType> type = ProtocolType.get(protocolName);
1✔
332
              if (type.isPresent()) {
1✔
333
                switch (type.get()) {
1✔
334
                  case UPDATE_V1:
335
                    return UpdateProtocolStateMachine.newInstance(
1✔
336
                        this::isReplaying,
337
                        callbacks::update,
1✔
338
                        this::sendMessage,
339
                        commandSink,
340
                        stateMachineSink);
341
                  default:
342
                    throw new IllegalArgumentException("Unknown protocol type:" + protocolName);
×
343
                }
344
              }
345
              throw new IllegalArgumentException("Protocol type not specified:" + message);
×
346
            });
347
    stateMachine.handleMessage(message);
1✔
348
  }
1✔
349

350
  private void handleSingleEvent(HistoryEvent event, boolean hasNextEvent) {
351
    if (isCommandEvent(event)) {
1✔
352
      handleCommandEvent(event);
1✔
353
      return;
1✔
354
    }
355

356
    if (replaying
1✔
357
        && !hasNextEvent
358
        && (event.getEventType() == EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED
1✔
359
            || WorkflowExecutionUtils.isWorkflowTaskClosedEvent(event))) {
1✔
360
      replaying = false;
1✔
361
    }
362

363
    Long initialCommandEventId = getInitialCommandEventId(event);
1✔
364
    EntityStateMachine c = stateMachines.get(initialCommandEventId);
1✔
365
    if (c != null) {
1✔
366
      c.handleEvent(event, hasNextEvent);
1✔
367
      if (c.isFinalState()) {
1✔
368
        stateMachines.remove(initialCommandEventId);
1✔
369
      }
370
    } else {
371
      handleNonStatefulEvent(event, hasNextEvent);
1✔
372
    }
373
  }
1✔
374

375
  /**
376
   * Handles command event. Command event is an event which is generated from a command emitted by a
377
   * past decision. Each command has a correspondent event. For example ScheduleActivityTaskCommand
378
   * is recorded to the history as ActivityTaskScheduledEvent.
379
   *
380
   * <p>Command events always follow WorkflowTaskCompletedEvent.
381
   *
382
   * <p>The handling consists from verifying that the next command in the commands queue matches the
383
   * event, command state machine is notified about the event and the command is removed from the
384
   * commands queue.
385
   */
386
  private void handleCommandEvent(HistoryEvent event) {
387
    if (handleLocalActivityMarker(event)) {
1✔
388
      return;
1✔
389
    }
390
    // Currently Update events are command events that have no
391
    // associated command so there is nothing to handle currently.
392
    // Once the server supports ProtocolMessageCommand we can handle them here.
393
    // TODO(https://github.com/temporalio/sdk-java/issues/1744)
394
    if (event.hasWorkflowExecutionUpdateAcceptedEventAttributes()
1✔
395
        || event.hasWorkflowExecutionUpdateCompletedEventAttributes()) {
1✔
396
      return;
1✔
397
    }
398
    // Match event to the next command in the stateMachine queue.
399
    // After matching the command is notified about the event and is removed from the
400
    // queue.
401
    CancellableCommand matchingCommand = null;
1✔
402
    while (matchingCommand == null) {
1✔
403
      // handleVersionMarker can skip a marker event if the getVersion call was removed.
404
      // In this case we don't want to consume a command.
405
      // That's why peek is used instead of poll.
406
      CancellableCommand command = commands.peek();
1✔
407
      if (command == null) {
1✔
408
        if (handleNonMatchingVersionMarker(event)) {
×
409
          // this event is a version marker for removed getVersion call.
410
          // Handle the version marker as unmatched and return even if there is no commands to match
411
          // it against.
412
          return;
×
413
        } else {
414
          throw new NonDeterministicException("No command scheduled that corresponds to " + event);
×
415
        }
416
      }
417

418
      if (command.isCanceled()) {
1✔
419
        // Consume and skip the command
420
        commands.poll();
1✔
421
        continue;
1✔
422
      }
423

424
      // Note that handleEvent can cause a command cancellation in case of
425
      // 1. MutableSideEffect
426
      // 2. Version State Machine during replay cancels the command and enters SKIPPED state
427
      //    if it handled non-matching event.
428
      HandleEventStatus status = command.handleEvent(event, true);
1✔
429

430
      if (command.isCanceled()) {
1✔
431
        // Consume and skip the command
432
        commands.poll();
1✔
433
        continue;
1✔
434
      }
435

436
      switch (status) {
1✔
437
        case OK:
438
          // Consume the command
439
          commands.poll();
1✔
440
          matchingCommand = command;
1✔
441
          break;
1✔
442
        case NON_MATCHING_EVENT:
443
          if (handleNonMatchingVersionMarker(event)) {
1✔
444
            // this event is a version marker for removed getVersion call.
445
            // Handle the version marker as unmatched and return without consuming the command
446
            return;
1✔
447
          } else {
448
            throw new NonDeterministicException(
1✔
449
                "Event "
450
                    + event.getEventId()
1✔
451
                    + " of type "
452
                    + event.getEventType()
1✔
453
                    + " does not"
454
                    + " match command type "
455
                    + command.getCommandType());
1✔
456
          }
457
        default:
458
          throw new IllegalStateException(
×
459
              "Got " + status + " value from command.handleEvent which is not handled");
460
      }
461
    }
1✔
462

463
    validateCommand(matchingCommand.getCommand(), event);
1✔
464
    EntityStateMachine stateMachine = matchingCommand.getStateMachine();
1✔
465
    if (!stateMachine.isFinalState()) {
1✔
466
      stateMachines.put(event.getEventId(), stateMachine);
1✔
467
    }
468
    // Marker is the only command processing of which can cause workflow code execution
469
    // and generation of new state machines.
470
    if (event.getEventType() == EventType.EVENT_TYPE_MARKER_RECORDED) {
1✔
471
      prepareCommands();
1✔
472
    }
473
  }
1✔
474

475
  private void preloadVersionMarker(HistoryEvent event) {
476
    if (VersionMarkerUtils.hasVersionMarkerStructure(event)) {
1✔
477
      String changeId = VersionMarkerUtils.tryGetChangeIdFromVersionMarkerEvent(event);
1✔
478
      if (changeId == null) {
1✔
479
        // if we can't extract changeId, this event will later fail to match with anything
480
        // and the corresponded exception will be thrown
481
        return;
×
482
      }
483
      VersionStateMachine versionStateMachine =
1✔
484
          versions.computeIfAbsent(
1✔
485
              changeId,
486
              (idKey) ->
487
                  VersionStateMachine.newInstance(
1✔
488
                      changeId, this::isReplaying, commandSink, stateMachineSink));
489
      versionStateMachine.handleMarkersPreload(event);
1✔
490
    }
491
  }
1✔
492

493
  private boolean handleNonMatchingVersionMarker(HistoryEvent event) {
494
    String changeId = VersionMarkerUtils.tryGetChangeIdFromVersionMarkerEvent(event);
1✔
495
    if (changeId == null) {
1✔
496
      return false;
1✔
497
    }
498
    VersionStateMachine versionStateMachine = versions.get(changeId);
1✔
499
    Preconditions.checkNotNull(
1✔
500
        versionStateMachine,
501
        "versionStateMachine is expected to be initialized already by execution or preloading");
502
    versionStateMachine.handleNonMatchingEvent(event);
1✔
503
    return true;
1✔
504
  }
505

506
  public List<Command> takeCommands() {
507
    List<Command> result = new ArrayList<>(commands.size());
1✔
508
    for (CancellableCommand command : commands) {
1✔
509
      if (!command.isCanceled()) {
1✔
510
        result.add(command.getCommand());
1✔
511
      }
512
    }
1✔
513
    return result;
1✔
514
  }
515

516
  public void sendMessage(Message message) {
517
    checkEventLoopExecuting();
1✔
518
    if (!isReplaying()) {
1✔
519
      messageOutbox.add(message);
1✔
520
    }
521
  }
1✔
522

523
  public List<Message> takeMessages() {
524
    List<Message> result = new ArrayList<>(messageOutbox.size());
1✔
525
    for (Message message : messageOutbox) {
1✔
526
      result.add(message);
×
527
    }
×
528
    messageOutbox.clear();
1✔
529
    return result;
1✔
530
  }
531

532
  private void prepareCommands() {
533
    if (preparing) {
1✔
534
      return;
1✔
535
    }
536
    preparing = true;
1✔
537
    try {
538
      prepareImpl();
1✔
539
    } finally {
540
      preparing = false;
1✔
541
    }
542
  }
1✔
543

544
  private void prepareImpl() {
545
    // handleCommand can lead to code execution because of SideEffect, MutableSideEffect or local
546
    // activity completion. And code execution can lead to creation of new commands and
547
    // cancellation of existing commands. That is the reason for using Queue as a data structure for
548
    // commands.
549
    while (true) {
550
      CancellableCommand command = cancellableCommands.poll();
1✔
551
      if (command == null) {
1✔
552
        break;
1✔
553
      }
554
      // handleCommand should be called even on canceled ones to support mutableSideEffect
555
      command.handleCommand(command.getCommandType());
1✔
556
      commands.add(command);
1✔
557
    }
1✔
558
  }
1✔
559

560
  /**
561
   * Local activity is different from all other entities. It doesn't schedule a marker command when
562
   * the {@link #scheduleLocalActivityTask(ExecuteLocalActivityParameters, Functions.Proc2)} is
563
   * called. The marker is scheduled only when activity completes through ({@link
564
   * #handleLocalActivityCompletion(LocalActivityResult)}). That's why the normal logic of {@link
565
   * #handleCommandEvent(HistoryEvent)}, which assumes that each event has a correspondent command
566
   * during replay, doesn't work. Instead, local activities are matched by their id using
567
   * localActivityMap.
568
   *
569
   * @return true if matched and false if normal event handling should continue.
570
   */
571
  private boolean handleLocalActivityMarker(HistoryEvent event) {
572
    if (!LocalActivityMarkerUtils.hasLocalActivityStructure(event)) {
1✔
573
      return false;
1✔
574
    }
575

576
    MarkerRecordedEventAttributes markerAttributes = event.getMarkerRecordedEventAttributes();
1✔
577
    String id = LocalActivityMarkerUtils.getActivityId(markerAttributes);
1✔
578
    LocalActivityStateMachine stateMachine = localActivityMap.remove(id);
1✔
579
    if (stateMachine == null) {
1✔
580
      String activityType = LocalActivityMarkerUtils.getActivityTypeName(markerAttributes);
×
581
      throw new NonDeterministicException(
×
582
          String.format(
×
583
              "Local activity of type %s is recorded in the history with id %s but was not expected by the execution",
584
              activityType, id));
585
    }
586
    // RESULT_NOTIFIED state means that there is outstanding command that has to be matched
587
    // using standard logic. So return false to let the handleCommand method to run its standard
588
    // logic.
589
    if (stateMachine.getState() == LocalActivityStateMachine.State.RESULT_NOTIFIED) {
1✔
590
      return false;
1✔
591
    }
592
    stateMachine.handleEvent(event, true);
1✔
593
    eventLoop();
1✔
594
    return true;
1✔
595
  }
596

597
  private void eventLoop() {
598
    if (eventLoopExecuting) {
1✔
599
      return;
1✔
600
    }
601
    eventLoopExecuting = true;
1✔
602
    try {
603
      callbacks.eventLoop();
1✔
604
    } finally {
605
      eventLoopExecuting = false;
1✔
606
    }
607
    prepareCommands();
1✔
608
  }
1✔
609

610
  private void handleNonStatefulEvent(HistoryEvent event, boolean hasNextEvent) {
611
    switch (event.getEventType()) {
1✔
612
      case EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:
613
        this.currentRunId =
1✔
614
            event.getWorkflowExecutionStartedEventAttributes().getOriginalExecutionRunId();
1✔
615
        callbacks.start(event);
1✔
616
        break;
1✔
617
      case EVENT_TYPE_WORKFLOW_TASK_SCHEDULED:
618
        WorkflowTaskStateMachine c =
1✔
619
            WorkflowTaskStateMachine.newInstance(
1✔
620
                workflowTaskStartedEventId, new WorkflowTaskCommandsListener());
621
        c.handleEvent(event, hasNextEvent);
1✔
622
        stateMachines.put(event.getEventId(), c);
1✔
623
        break;
1✔
624
      case EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED:
625
        callbacks.signal(event);
1✔
626
        break;
1✔
627
      case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
628
        callbacks.cancel(event);
1✔
629
        break;
1✔
630
      case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
631
      case UNRECOGNIZED:
632
        break;
1✔
633
      default:
634
        throw new IllegalArgumentException("Unexpected event:" + event);
×
635
    }
636
  }
1✔
637

638
  private long setCurrentTimeMillis(long currentTimeMillis) {
639
    if (this.currentTimeMillis < currentTimeMillis) {
1✔
640
      this.currentTimeMillis = currentTimeMillis;
1✔
641
    }
642
    return this.currentTimeMillis;
1✔
643
  }
644

645
  public long getLastStartedEventId() {
646
    return currentStartedEventId;
1✔
647
  }
648

649
  /**
650
   * @param attributes attributes used to schedule an activity
651
   * @param callback completion callback
652
   * @return an instance of ActivityCommands
653
   */
654
  public Functions.Proc scheduleActivityTask(
655
      ExecuteActivityParameters attributes, Functions.Proc2<Optional<Payloads>, Failure> callback) {
656
    checkEventLoopExecuting();
1✔
657
    ActivityStateMachine activityStateMachine =
1✔
658
        ActivityStateMachine.newInstance(
1✔
659
            attributes,
660
            (p, f) -> {
661
              Failure failure = f != null ? f.getFailure() : null;
1✔
662
              callback.apply(p, failure);
1✔
663

664
              if (f != null
1✔
665
                  && !f.isFromEvent()
1✔
666
                  && failure.hasCause()
1✔
667
                  && failure.getCause().hasCanceledFailureInfo()) {
1✔
668
                // If !f.isFromEvent(), we want to unblock the event loop as the promise got filled
669
                // and the workflow may make progress. If f.isFromEvent(), we need to delay event
670
                // loop triggering until WorkflowTaskStarted.
671
                eventLoop();
1✔
672
              }
673
            },
1✔
674
            commandSink,
675
            stateMachineSink);
676
    return activityStateMachine::cancel;
1✔
677
  }
678

679
  /**
680
   * Creates a new timer state machine
681
   *
682
   * @param attributes timer command attributes
683
   * @param completionCallback invoked when timer fires or reports cancellation. One of
684
   *     TimerFiredEvent, TimerCanceledEvent.
685
   * @return cancellation callback that should be invoked to initiate timer cancellation
686
   */
687
  public Functions.Proc newTimer(
688
      StartTimerCommandAttributes attributes, Functions.Proc1<HistoryEvent> completionCallback) {
689
    checkEventLoopExecuting();
1✔
690
    TimerStateMachine timer =
1✔
691
        TimerStateMachine.newInstance(
1✔
692
            attributes,
693
            (event) -> {
694
              completionCallback.apply(event);
1✔
695
              // Needed due to immediate cancellation
696
              if (event.getEventType() == EventType.EVENT_TYPE_TIMER_CANCELED) {
1✔
697
                eventLoop();
1✔
698
              }
699
            },
1✔
700
            commandSink,
701
            stateMachineSink);
702
    return timer::cancel;
1✔
703
  }
704

705
  /**
706
   * Creates a new child state machine
707
   *
708
   * @param parameters child workflow start command parameters
709
   * @param startedCallback callback that is notified about child start
710
   * @param completionCallback invoked when child reports completion or failure
711
   * @return cancellation callback that should be invoked to cancel the child
712
   */
713
  public Functions.Proc startChildWorkflow(
714
      StartChildWorkflowExecutionParameters parameters,
715
      Functions.Proc2<WorkflowExecution, Exception> startedCallback,
716
      Functions.Proc2<Optional<Payloads>, Exception> completionCallback) {
717
    checkEventLoopExecuting();
1✔
718
    StartChildWorkflowExecutionCommandAttributes attributes = parameters.getRequest().build();
1✔
719
    ChildWorkflowCancellationType cancellationType = parameters.getCancellationType();
1✔
720
    ChildWorkflowStateMachine child =
1✔
721
        ChildWorkflowStateMachine.newInstance(
1✔
722
            attributes, startedCallback, completionCallback, commandSink, stateMachineSink);
723
    return () -> {
1✔
724
      if (cancellationType == ChildWorkflowCancellationType.ABANDON) {
1✔
725
        notifyChildCanceled(completionCallback);
1✔
726
        return;
1✔
727
      }
728
      // The only time child can be canceled directly is before its start command
729
      // was sent out to the service. After that RequestCancelExternal should be used.
730
      if (child.isCancellable()) {
1✔
731
        child.cancel();
1✔
732
        return;
1✔
733
      }
734
      if (!child.isFinalState()) {
1✔
735
        requestCancelExternalWorkflowExecution(
1✔
736
            RequestCancelExternalWorkflowExecutionCommandAttributes.newBuilder()
1✔
737
                .setWorkflowId(attributes.getWorkflowId())
1✔
738
                .setNamespace(attributes.getNamespace())
1✔
739
                .setChildWorkflowOnly(true)
1✔
740
                .build(),
1✔
741
            (r, e) -> { // TODO(maxim): Decide what to do if an error is passed to the callback.
742
              if (cancellationType == ChildWorkflowCancellationType.WAIT_CANCELLATION_REQUESTED) {
1✔
743
                notifyChildCanceled(completionCallback);
1✔
744
              }
745
            });
1✔
746
        if (cancellationType == ChildWorkflowCancellationType.TRY_CANCEL) {
1✔
747
          notifyChildCanceled(completionCallback);
1✔
748
        }
749
      }
750
    };
1✔
751
  }
752

753
  private void notifyChildCanceled(
754
      Functions.Proc2<Optional<Payloads>, Exception> completionCallback) {
755
    CanceledFailure failure = new CanceledFailure("Child canceled");
1✔
756
    completionCallback.apply(Optional.empty(), failure);
1✔
757
    eventLoop();
1✔
758
  }
1✔
759

760
  /**
761
   * @param attributes
762
   * @param completionCallback invoked when signal delivery completes of fails. The following types
763
   */
764
  public Functions.Proc signalExternalWorkflowExecution(
765
      SignalExternalWorkflowExecutionCommandAttributes attributes,
766
      Functions.Proc2<Void, Failure> completionCallback) {
767
    checkEventLoopExecuting();
1✔
768
    return SignalExternalStateMachine.newInstance(
1✔
769
        attributes, completionCallback, commandSink, stateMachineSink);
770
  }
771

772
  /**
773
   * @param attributes attributes to use to cancel external workflow
774
   * @param completionCallback one of ExternalWorkflowExecutionCancelRequestedEvent,
775
   */
776
  public void requestCancelExternalWorkflowExecution(
777
      RequestCancelExternalWorkflowExecutionCommandAttributes attributes,
778
      Functions.Proc2<Void, RuntimeException> completionCallback) {
779
    checkEventLoopExecuting();
1✔
780
    CancelExternalStateMachine.newInstance(
1✔
781
        attributes, completionCallback, commandSink, stateMachineSink);
782
  }
1✔
783

784
  public void upsertSearchAttributes(SearchAttributes attributes) {
785
    checkEventLoopExecuting();
1✔
786
    UpsertSearchAttributesStateMachine.newInstance(attributes, commandSink, stateMachineSink);
1✔
787
  }
1✔
788

789
  public void completeWorkflow(Optional<Payloads> workflowOutput) {
790
    checkEventLoopExecuting();
1✔
791
    CompleteWorkflowStateMachine.newInstance(workflowOutput, commandSink, stateMachineSink);
1✔
792
  }
1✔
793

794
  public void failWorkflow(Failure failure) {
795
    checkEventLoopExecuting();
1✔
796
    FailWorkflowStateMachine.newInstance(failure, commandSink, stateMachineSink);
1✔
797
  }
1✔
798

799
  public void cancelWorkflow() {
800
    checkEventLoopExecuting();
1✔
801
    CancelWorkflowStateMachine.newInstance(
1✔
802
        CancelWorkflowExecutionCommandAttributes.getDefaultInstance(),
1✔
803
        commandSink,
804
        stateMachineSink);
805
  }
1✔
806

807
  public void continueAsNewWorkflow(ContinueAsNewWorkflowExecutionCommandAttributes attributes) {
808
    checkEventLoopExecuting();
1✔
809
    ContinueAsNewWorkflowStateMachine.newInstance(attributes, commandSink, stateMachineSink);
1✔
810
  }
1✔
811

812
  public boolean isReplaying() {
813
    return replaying;
1✔
814
  }
815

816
  public long currentTimeMillis() {
817
    return currentTimeMillis;
1✔
818
  }
819

820
  public UUID randomUUID() {
821
    checkEventLoopExecuting();
1✔
822
    String runId = currentRunId;
1✔
823
    if (runId == null) {
1✔
824
      throw new Error("null currentRunId");
×
825
    }
826
    String id = runId + ":" + idCounter++;
1✔
827
    byte[] bytes = id.getBytes(StandardCharsets.UTF_8);
1✔
828
    return UUID.nameUUIDFromBytes(bytes);
1✔
829
  }
830

831
  public Random newRandom() {
832
    checkEventLoopExecuting();
1✔
833
    return new Random(randomUUID().getLeastSignificantBits());
1✔
834
  }
835

836
  public void sideEffect(
837
      Functions.Func<Optional<Payloads>> func, Functions.Proc1<Optional<Payloads>> callback) {
838
    checkEventLoopExecuting();
1✔
839
    SideEffectStateMachine.newInstance(
1✔
840
        this::isReplaying,
841
        func,
842
        (payloads) -> {
843
          callback.apply(payloads);
1✔
844
          // callback unblocked sideEffect call. Give workflow code chance to make progress.
845
          eventLoop();
1✔
846
        },
1✔
847
        commandSink,
848
        stateMachineSink);
849
  }
1✔
850

851
  /**
852
   * @param id mutable side effect id
853
   * @param func given the value from the last marker returns value to store. If result is empty
854
   *     nothing is recorded into the history.
855
   * @param callback used to report result or failure
856
   */
857
  public void mutableSideEffect(
858
      String id,
859
      Functions.Func1<Optional<Payloads>, Optional<Payloads>> func,
860
      Functions.Proc1<Optional<Payloads>> callback) {
861
    checkEventLoopExecuting();
1✔
862
    MutableSideEffectStateMachine stateMachine =
1✔
863
        mutableSideEffects.computeIfAbsent(
1✔
864
            id,
865
            (idKey) ->
866
                MutableSideEffectStateMachine.newInstance(
1✔
867
                    idKey, this::isReplaying, commandSink, stateMachineSink));
868
    stateMachine.mutableSideEffect(
1✔
869
        func,
870
        (r) -> {
871
          callback.apply(r);
1✔
872
          // callback unblocked mutableSideEffect call. Give workflow code chance to make progress.
873
          eventLoop();
1✔
874
        },
1✔
875
        stateMachineSink);
876
  }
1✔
877

878
  public void getVersion(
879
      String changeId,
880
      int minSupported,
881
      int maxSupported,
882
      Functions.Proc2<Integer, RuntimeException> callback) {
883
    VersionStateMachine stateMachine =
1✔
884
        versions.computeIfAbsent(
1✔
885
            changeId,
886
            (idKey) ->
887
                VersionStateMachine.newInstance(
1✔
888
                    changeId, this::isReplaying, commandSink, stateMachineSink));
889
    stateMachine.getVersion(
1✔
890
        minSupported,
891
        maxSupported,
892
        (v, e) -> {
893
          callback.apply(v, e);
1✔
894
          // without this getVersion call will trigger the end of WFT,
895
          // instead we want to prepare subsequent commands and unblock the execution one more
896
          // time.
897
          eventLoop();
1✔
898
        });
1✔
899
  }
1✔
900

901
  public List<ExecuteLocalActivityParameters> takeLocalActivityRequests() {
902
    List<ExecuteLocalActivityParameters> result = localActivityRequests;
1✔
903
    localActivityRequests = new ArrayList<>();
1✔
904
    for (ExecuteLocalActivityParameters parameters : result) {
1✔
905
      LocalActivityStateMachine stateMachine = localActivityMap.get(parameters.getActivityId());
1✔
906
      stateMachine.markAsSent();
1✔
907
    }
1✔
908
    return result;
1✔
909
  }
910

911
  public void handleLocalActivityCompletion(LocalActivityResult laCompletion) {
912
    String activityId = laCompletion.getActivityId();
1✔
913
    LocalActivityStateMachine laStateMachine = localActivityMap.get(activityId);
1✔
914
    if (laStateMachine == null) {
1✔
915
      throw new IllegalStateException("Unknown local activity: " + activityId);
×
916
    }
917
    laStateMachine.handleCompletion(laCompletion);
1✔
918
    prepareCommands();
1✔
919
  }
1✔
920

921
  public Functions.Proc scheduleLocalActivityTask(
922
      ExecuteLocalActivityParameters parameters,
923
      Functions.Proc2<Optional<Payloads>, LocalActivityCallback.LocalActivityFailedException>
924
          callback) {
925
    checkEventLoopExecuting();
1✔
926
    String activityId = parameters.getActivityId();
1✔
927
    if (Strings.isNullOrEmpty(activityId)) {
1✔
928
      throw new IllegalArgumentException("Missing activityId: " + activityId);
×
929
    }
930
    if (localActivityMap.containsKey(activityId)) {
1✔
931
      throw new IllegalArgumentException("Duplicated local activity id: " + activityId);
×
932
    }
933
    LocalActivityStateMachine commands =
1✔
934
        LocalActivityStateMachine.newInstance(
1✔
935
            this::isReplaying,
936
            this::setCurrentTimeMillis,
937
            parameters,
938
            (r, e) -> {
939
              callback.apply(r, e);
1✔
940
              // callback unblocked local activity call. Give workflow code chance to make progress.
941
              eventLoop();
1✔
942
            },
1✔
943
            localActivityRequestSink,
944
            commandSink,
945
            stateMachineSink,
946
            currentTimeMillis);
947
    localActivityMap.put(activityId, commands);
1✔
948
    return commands::cancel;
1✔
949
  }
950

951
  /** Validates that command matches the event during replay. */
952
  private void validateCommand(Command command, HistoryEvent event) {
953
    // TODO(maxim): Add more thorough validation logic. For example check if activity IDs are
954
    // matching.
955
    assertMatch(
1✔
956
        command,
957
        event,
958
        "eventType",
959
        getEventTypeForCommand(command.getCommandType()),
1✔
960
        event.getEventType());
1✔
961
    switch (command.getCommandType()) {
1✔
962
      case COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK:
963
        {
964
          ScheduleActivityTaskCommandAttributes commandAttributes =
1✔
965
              command.getScheduleActivityTaskCommandAttributes();
1✔
966
          ActivityTaskScheduledEventAttributes eventAttributes =
1✔
967
              event.getActivityTaskScheduledEventAttributes();
1✔
968
          assertMatch(
1✔
969
              command,
970
              event,
971
              "activityId",
972
              commandAttributes.getActivityId(),
1✔
973
              eventAttributes.getActivityId());
1✔
974
          assertMatch(
1✔
975
              command,
976
              event,
977
              "activityType",
978
              commandAttributes.getActivityType(),
1✔
979
              eventAttributes.getActivityType());
1✔
980
        }
981
        break;
1✔
982
      case COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION:
983
        {
984
          StartChildWorkflowExecutionCommandAttributes commandAttributes =
1✔
985
              command.getStartChildWorkflowExecutionCommandAttributes();
1✔
986
          StartChildWorkflowExecutionInitiatedEventAttributes eventAttributes =
1✔
987
              event.getStartChildWorkflowExecutionInitiatedEventAttributes();
1✔
988
          assertMatch(
1✔
989
              command,
990
              event,
991
              "workflowId",
992
              commandAttributes.getWorkflowId(),
1✔
993
              eventAttributes.getWorkflowId());
1✔
994
          assertMatch(
1✔
995
              command,
996
              event,
997
              "workflowType",
998
              commandAttributes.getWorkflowType(),
1✔
999
              eventAttributes.getWorkflowType());
1✔
1000
        }
1001
        break;
1✔
1002
      case COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK:
1003
      case COMMAND_TYPE_START_TIMER:
1004
        {
1005
          StartTimerCommandAttributes commandAttributes = command.getStartTimerCommandAttributes();
1✔
1006
          TimerStartedEventAttributes eventAttributes = event.getTimerStartedEventAttributes();
1✔
1007
          assertMatch(
1✔
1008
              command,
1009
              event,
1010
              "timerId",
1011
              commandAttributes.getTimerId(),
1✔
1012
              eventAttributes.getTimerId());
1✔
1013
        }
1014
        break;
1✔
1015
      case COMMAND_TYPE_CANCEL_TIMER:
1016
      case COMMAND_TYPE_CANCEL_WORKFLOW_EXECUTION:
1017
      case COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION:
1018
      case COMMAND_TYPE_RECORD_MARKER:
1019
      case COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION:
1020
      case COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION:
1021
      case COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES:
1022
      case COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION:
1023
      case COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION:
1024
        break;
1✔
1025
      case UNRECOGNIZED:
1026
      case COMMAND_TYPE_UNSPECIFIED:
1027
        throw new IllegalArgumentException("Unexpected command type: " + command.getCommandType());
×
1028
    }
1029
  }
1✔
1030

1031
  private void assertMatch(
1032
      Command command, HistoryEvent event, String checkType, Object expected, Object actual) {
1033
    if (!expected.equals(actual)) {
1✔
1034
      String message =
×
1035
          String.format(
×
1036
              "Command %s doesn't match event %s with EventId=%s on check %s "
1037
                  + "with an expected value '%s' and an actual value '%s'",
1038
              command.getCommandType(),
×
1039
              event.getEventType(),
×
1040
              event.getEventId(),
×
1041
              checkType,
1042
              expected,
1043
              actual);
1044
      throw new NonDeterministicException(message);
×
1045
    }
1046
  }
1✔
1047

1048
  private class WorkflowTaskCommandsListener implements WorkflowTaskStateMachine.Listener {
1✔
1049
    @Override
1050
    public void workflowTaskStarted(
1051
        long startedEventId, long currentTimeMillis, boolean nonProcessedWorkflowTask) {
1052
      setCurrentTimeMillis(currentTimeMillis);
1✔
1053
      for (CancellableCommand cancellableCommand : commands) {
1✔
1054
        cancellableCommand.handleWorkflowTaskStarted();
1✔
1055
      }
1✔
1056
      // Give local activities a chance to recreate their requests if they were lost due
1057
      // to the last workflow task failure. The loss could happen only the last workflow task
1058
      // was forcibly created by setting forceCreate on RespondWorkflowTaskCompletedRequest.
1059
      if (nonProcessedWorkflowTask) {
1✔
1060
        for (LocalActivityStateMachine value : localActivityMap.values()) {
1✔
1061
          value.nonReplayWorkflowTaskStarted();
1✔
1062
        }
1✔
1063
      }
1064
      WorkflowStateMachines.this.currentStartedEventId = startedEventId;
1✔
1065

1066
      eventLoop();
1✔
1067
    }
1✔
1068

1069
    @Override
1070
    public void updateRunId(String currentRunId) {
1071
      WorkflowStateMachines.this.currentRunId = currentRunId;
×
1072
    }
×
1073
  }
1074

1075
  private long getInitialCommandEventId(HistoryEvent event) {
1076
    switch (event.getEventType()) {
1✔
1077
      case EVENT_TYPE_ACTIVITY_TASK_STARTED:
1078
        return event.getActivityTaskStartedEventAttributes().getScheduledEventId();
1✔
1079
      case EVENT_TYPE_ACTIVITY_TASK_COMPLETED:
1080
        return event.getActivityTaskCompletedEventAttributes().getScheduledEventId();
1✔
1081
      case EVENT_TYPE_ACTIVITY_TASK_FAILED:
1082
        return event.getActivityTaskFailedEventAttributes().getScheduledEventId();
1✔
1083
      case EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT:
1084
        return event.getActivityTaskTimedOutEventAttributes().getScheduledEventId();
1✔
1085
      case EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED:
1086
        return event.getActivityTaskCancelRequestedEventAttributes().getScheduledEventId();
×
1087
      case EVENT_TYPE_ACTIVITY_TASK_CANCELED:
1088
        return event.getActivityTaskCanceledEventAttributes().getScheduledEventId();
1✔
1089
      case EVENT_TYPE_TIMER_FIRED:
1090
        return event.getTimerFiredEventAttributes().getStartedEventId();
1✔
1091
      case EVENT_TYPE_TIMER_CANCELED:
1092
        return event.getTimerCanceledEventAttributes().getStartedEventId();
×
1093
      case EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
1094
        return event
×
1095
            .getRequestCancelExternalWorkflowExecutionFailedEventAttributes()
×
1096
            .getInitiatedEventId();
×
1097
      case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
1098
        return event
1✔
1099
            .getExternalWorkflowExecutionCancelRequestedEventAttributes()
1✔
1100
            .getInitiatedEventId();
1✔
1101
      case EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED:
1102
        return event.getStartChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId();
1✔
1103
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED:
1104
        return event.getChildWorkflowExecutionStartedEventAttributes().getInitiatedEventId();
1✔
1105
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED:
1106
        return event.getChildWorkflowExecutionCompletedEventAttributes().getInitiatedEventId();
1✔
1107
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED:
1108
        return event.getChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId();
1✔
1109
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED:
1110
        return event.getChildWorkflowExecutionCanceledEventAttributes().getInitiatedEventId();
1✔
1111
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT:
1112
        return event.getChildWorkflowExecutionTimedOutEventAttributes().getInitiatedEventId();
1✔
1113
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TERMINATED:
1114
        return event.getChildWorkflowExecutionTerminatedEventAttributes().getInitiatedEventId();
×
1115
      case EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
1116
        return event
1✔
1117
            .getSignalExternalWorkflowExecutionFailedEventAttributes()
1✔
1118
            .getInitiatedEventId();
1✔
1119
      case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED:
1120
        return event.getExternalWorkflowExecutionSignaledEventAttributes().getInitiatedEventId();
1✔
1121
      case EVENT_TYPE_WORKFLOW_TASK_STARTED:
1122
        return event.getWorkflowTaskStartedEventAttributes().getScheduledEventId();
1✔
1123
      case EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
1124
        return event.getWorkflowTaskCompletedEventAttributes().getScheduledEventId();
1✔
1125
      case EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT:
1126
        return event.getWorkflowTaskTimedOutEventAttributes().getScheduledEventId();
1✔
1127
      case EVENT_TYPE_WORKFLOW_TASK_FAILED:
1128
        return event.getWorkflowTaskFailedEventAttributes().getScheduledEventId();
1✔
1129

1130
      case EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:
1131
      case EVENT_TYPE_TIMER_STARTED:
1132
      case EVENT_TYPE_MARKER_RECORDED:
1133
      case EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED:
1134
      case EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED:
1135
      case EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED:
1136
      case EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW:
1137
      case EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED:
1138
      case EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES:
1139
      case EVENT_TYPE_WORKFLOW_TASK_SCHEDULED:
1140
      case EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED:
1141
      case EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:
1142
      case EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED:
1143
      case EVENT_TYPE_WORKFLOW_EXECUTION_FAILED:
1144
      case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
1145
      case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
1146
      case EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
1147
        return event.getEventId();
1✔
1148
      case UNRECOGNIZED:
1149
      case EVENT_TYPE_UNSPECIFIED:
1150
        throw new IllegalArgumentException("Unexpected event type: " + event.getEventType());
×
1151
    }
1152
    throw new IllegalStateException("unreachable");
×
1153
  }
1154

1155
  /**
1156
   * Workflow code executes only while event loop is running. So operations that can be invoked from
1157
   * the workflow have to satisfy this condition.
1158
   */
1159
  private void checkEventLoopExecuting() {
1160
    if (!eventLoopExecuting) {
1✔
1161
      // this call doesn't yield or await, because the await function returns true,
1162
      // but it checks if the workflow thread needs to be destroyed
1163
      WorkflowThread.await("kill workflow thread if destroy requested", () -> true);
×
1164
      throw new IllegalStateException("Operation allowed only while eventLoop is running");
×
1165
    }
1166
  }
1✔
1167

1168
  private String createEventHandlingMessage(HistoryEvent event) {
1169
    return "Failure handling event "
1✔
1170
        + event.getEventId()
1✔
1171
        + " of type '"
1172
        + event.getEventType()
1✔
1173
        + "' "
1174
        + (this.isReplaying() ? "during replay" : "during execution");
1✔
1175
  }
1176

1177
  private String createShortCurrentStateMessagePostfix() {
1178
    return String.format(
1✔
1179
        "{WorkflowTaskStartedEventId=%s, CurrentStartedEventId=%s}",
1180
        this.workflowTaskStartedEventId, this.currentStartedEventId);
1✔
1181
  }
1182
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc