• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #174

pending completion
#174

push

github-actions

web-flow
Add schedules API (#1776)

Add schedules API

1143 of 1143 new or added lines in 35 files covered. (100.0%)

18101 of 23284 relevant lines covered (77.74%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.72
/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.statemachines;
22

23
import static io.temporal.api.enums.v1.CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE;
24
import static io.temporal.internal.common.WorkflowExecutionUtils.getEventTypeForCommand;
25
import static io.temporal.internal.common.WorkflowExecutionUtils.isCommandEvent;
26
import static io.temporal.serviceclient.CheckedExceptionWrapper.unwrap;
27

28
import com.google.common.annotations.VisibleForTesting;
29
import com.google.common.base.Preconditions;
30
import com.google.common.base.Strings;
31
import com.google.protobuf.Any;
32
import io.temporal.api.command.v1.*;
33
import io.temporal.api.common.v1.Payloads;
34
import io.temporal.api.common.v1.SearchAttributes;
35
import io.temporal.api.common.v1.WorkflowExecution;
36
import io.temporal.api.enums.v1.EventType;
37
import io.temporal.api.failure.v1.Failure;
38
import io.temporal.api.history.v1.*;
39
import io.temporal.api.protocol.v1.Message;
40
import io.temporal.failure.CanceledFailure;
41
import io.temporal.internal.common.ProtocolType;
42
import io.temporal.internal.common.ProtocolUtils;
43
import io.temporal.internal.common.WorkflowExecutionUtils;
44
import io.temporal.internal.history.LocalActivityMarkerUtils;
45
import io.temporal.internal.history.VersionMarkerUtils;
46
import io.temporal.internal.sync.WorkflowThread;
47
import io.temporal.internal.worker.LocalActivityResult;
48
import io.temporal.worker.NonDeterministicException;
49
import io.temporal.workflow.ChildWorkflowCancellationType;
50
import io.temporal.workflow.Functions;
51
import java.nio.charset.StandardCharsets;
52
import java.util.*;
53

54
public final class WorkflowStateMachines {
55

56
  /** Outcome reported by a command when it is offered a history event to match against. */
  enum HandleEventStatus {
1✔
57
    /** The command accepted the event; handleCommandEvent then consumes the command. */
    OK,
1✔
58
    /**
     * The event does not correspond to the command; handleCommandEvent then either treats the
     * event as a version marker of a removed getVersion call or reports non-determinism.
     */
    NON_MATCHING_EVENT
1✔
59
  }
60

61
  /**
62
   * EventId of the WorkflowTaskStarted event of the Workflow Task that was picked up by a worker
63
   * and triggered a current replay or execution. It's expected to be the last event in the history
64
   * if we continue to execute the workflow.
65
   *
66
   * <p>For direct (legacy) queries, it may be:
67
   *
68
   * <ul>
69
   *   <li>0 if it's a query of a closed workflow execution
70
   *   <li>id of the last successfully completed Workflow Task if the workflow is not closed
71
   * </ul>
72
   *
73
   * <p>Set from the "outside" from the PollWorkflowTaskQueueResponse. Not modified by the SDK state
74
   * machines.
75
   */
76
  private long workflowTaskStartedEventId;
77

78
  /** EventId of the last WorkflowTaskStarted event handled by these state machines. */
79
  private long currentStartedEventId;
80

81
  /**
82
   * EventId of the last event seen by these state machines. Events earlier than this one will be
83
   * discarded.
84
   */
85
  private long lastHandledEventId;
86

87
  private final StatesMachinesCallback callbacks;
88

89
  /** Callback to send new commands to. */
90
  private final Functions.Proc1<CancellableCommand> commandSink;
91

92
  /**
93
   * currentRunId is used as seed by Workflow.newRandom and randomUUID. It allows to generate them
94
   * deterministically.
95
   */
96
  private String currentRunId;
97

98
  /** Used by Workflow.newRandom and randomUUID together with currentRunId. */
99
  private long idCounter;
100

101
  /** Current workflow time. */
102
  private long currentTimeMillis = -1;
1✔
103

104
  private final Map<Long, EntityStateMachine> stateMachines = new HashMap<>();
1✔
105

106
  private final Map<String, EntityStateMachine> protocolStateMachines = new HashMap<>();
1✔
107

108
  private final Queue<Message> messageOutbox = new ArrayDeque<>();
1✔
109

110
  private final Queue<CancellableCommand> commands = new ArrayDeque<>();
1✔
111

112
  /**
113
   * Commands generated by the currently processed workflow task. It is a queue as commands can be
114
   * added (due to marker based commands) while iterating over already added commands.
115
   */
116
  private final Queue<CancellableCommand> cancellableCommands = new ArrayDeque<>();
1✔
117

118
  /**
119
   * Is workflow executing new code or replaying from the history. Note that this flag ALWAYS flips
120
   * to true for the time when we apply events from the server even if the commands were created by
121
   * an actual execution with replaying=false.
122
   */
123
  private boolean replaying;
124

125
  /** Used to ensure that event loop is not executed recursively. */
126
  private boolean eventLoopExecuting;
127

128
  /**
129
   * Used to avoid recursive calls to {@link #prepareCommands()}.
130
   *
131
   * <p>Such calls happen when sideEffects and localActivity markers are processed.
132
   */
133
  private boolean preparing;
134

135
  /** Key is mutable side effect id */
136
  private final Map<String, MutableSideEffectStateMachine> mutableSideEffects = new HashMap<>();
1✔
137

138
  /** Key is changeId */
139
  private final Map<String, VersionStateMachine> versions = new HashMap<>();
1✔
140

141
  /** Map of local activities by their id. */
142
  private final Map<String, LocalActivityStateMachine> localActivityMap = new HashMap<>();
1✔
143

144
  private List<ExecuteLocalActivityParameters> localActivityRequests = new ArrayList<>();
1✔
145

146
  private final Functions.Proc1<ExecuteLocalActivityParameters> localActivityRequestSink;
147
  private final Functions.Proc1<StateMachine> stateMachineSink;
148

149
  private final WFTBuffer wftBuffer = new WFTBuffer();
1✔
150

151
  /** Incoming protocol messages that have not yet been dispatched to a protocol state machine. */
152
  private List<Message> messages = new ArrayList<Message>();
1✔
153

154
  /**
   * Creates state machines that report to {@code callbacks} and use a no-op state machine sink.
   *
   * @param callbacks receiver of workflow-level notifications; must not be null
   */
  public WorkflowStateMachines(StatesMachinesCallback callbacks) {
155
    this(callbacks, (stateMachine) -> {});
1✔
156
  }
1✔
157

158
  /**
   * Creates state machines with an explicit state machine sink.
   *
   * @param callbacks receiver of workflow-level notifications; must not be null
   * @param stateMachineSink forwarded to the factories of the individual state machines created
   *     by this class (for example {@code ActivityStateMachine.newInstance})
   * @throws NullPointerException if {@code callbacks} is null
   */
  @VisibleForTesting
159
  public WorkflowStateMachines(
160
      StatesMachinesCallback callbacks, Functions.Proc1<StateMachine> stateMachineSink) {
1✔
161
    this.callbacks = Objects.requireNonNull(callbacks);
1✔
162
    // New commands are first collected in cancellableCommands and later moved to the commands
    // queue by prepareImpl().
    this.commandSink = cancellableCommands::add;
1✔
163
    this.stateMachineSink = stateMachineSink;
1✔
164
    // Note: this lambda re-reads the non-final localActivityRequests field on every call. A bound
    // method reference (localActivityRequests::add) would capture the current list instance
    // instead, so the two forms are not interchangeable.
    this.localActivityRequestSink = (request) -> localActivityRequests.add(request);
1✔
165
  }
1✔
166

167
  // TODO revisit and potentially remove workflowTaskStartedEventId at all from the state machines.
168
  // The only place where it's used is WorkflowTaskStateMachine to understand that there will be
169
  // no completion event in the history.
170
  // This is tricky, because direct queries come with 0 as a magic value in this field if
171
  // execution is completed for example.
172
  // Most likely we can rework WorkflowTaskStateMachine to use only hasNext.
173
  /**
174
   * @param workflowTaskStartedEventId eventId of the workflowTask that was picked up by a worker
175
   *     and triggered an execution. Used in {@link WorkflowTaskStateMachine} only to understand
176
   *     that this workflow task will not have a matching closing event and needs to be executed.
177
   */
178
  public void setWorklfowStartedEventId(long workflowTaskStartedEventId) {
179
    // NOTE(review): the method name misspells "Workflow". It is left untouched here because the
    // method is public and callers outside this file may reference it.
    this.workflowTaskStartedEventId = workflowTaskStartedEventId;
1✔
180
  }
1✔
181

182
  /**
   * Overrides the eventId of the last WorkflowTaskStarted event handled by these state machines.
   *
   * @param eventId value to store as the current started eventId
   */
  public void setCurrentStartedEventId(long eventId) {
183
    this.currentStartedEventId = eventId;
×
184
  }
×
185

186
  /**
   * @return eventId of the last WorkflowTaskStarted event handled by these state machines
   */
  public long getCurrentStartedEventId() {
187
    return currentStartedEventId;
1✔
188
  }
189

190
  /**
   * Sets the replaying flag.
   *
   * @param replaying true if the workflow is replaying from the history, false if it is executing
   *     new code
   */
  public void setReplaying(boolean replaying) {
191
    this.replaying = replaying;
1✔
192
  }
1✔
193

194
  public void setMessages(List<Message> messages) {
195
    this.messages = messages;
1✔
196
  }
1✔
197

198
  /**
199
   * Handle a single event from the workflow history.
200
   *
201
   * @param event event from the history.
202
   * @param hasNextEvent false if this is the last event in the history.
203
   */
204
  public void handleEvent(HistoryEvent event, boolean hasNextEvent) {
205
    long eventId = event.getEventId();
1✔
206
    if (eventId <= lastHandledEventId) {
1✔
207
      // already handled
208
      return;
1✔
209
    }
210
    Preconditions.checkState(
1✔
211
        eventId == lastHandledEventId + 1,
212
        "History is out of order. "
213
            + "There is a gap between the last event workflow state machine observed and currently handling event. "
214
            + "Last processed eventId: %s, handling eventId: %s",
215
        lastHandledEventId,
216
        eventId);
217

218
    lastHandledEventId = eventId;
1✔
219
    boolean readyToPeek = wftBuffer.addEvent(event, hasNextEvent);
1✔
220
    if (readyToPeek) {
1✔
221
      handleEventsBatch(wftBuffer.fetch(), hasNextEvent);
1✔
222
    }
223
  }
1✔
224

225
  /**
226
   * Handle an events batch for one workflow task. Events that are related to one workflow task
227
   * during replay should be prefetched and supplied in one batch.
228
   *
229
   * @param events events belong to one workflow task
230
   * @param hasNextEvent true if there are more events in the history follow this batch, false if
231
   *     this batch contains the last events of the history
232
   */
233
  private void handleEventsBatch(List<HistoryEvent> events, boolean hasNextEvent) {
234
    if (EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED.equals(events.get(0).getEventType())) {
1✔
235
      for (HistoryEvent event : events) {
1✔
236
        try {
237
          preloadVersionMarker(event);
1✔
238
        } catch (RuntimeException e) {
1✔
239
          throw createEventProcessingException(e, event);
1✔
240
        }
1✔
241
      }
1✔
242
    }
243

244
    // Look ahead to infer protocol messages
245
    if (EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED.equals(events.get(0).getEventType())) {
1✔
246
      if (this.isReplaying()) {
1✔
247
        for (HistoryEvent event : events) {
1✔
248
          if (event.hasWorkflowExecutionUpdateAcceptedEventAttributes()) {
1✔
249
            WorkflowExecutionUpdateAcceptedEventAttributes updateEvent =
1✔
250
                event.getWorkflowExecutionUpdateAcceptedEventAttributes();
1✔
251
            this.messages.add(
1✔
252
                Message.newBuilder()
1✔
253
                    .setId(updateEvent.getAcceptedRequestMessageId())
1✔
254
                    .setProtocolInstanceId(updateEvent.getProtocolInstanceId())
1✔
255
                    .setEventId(updateEvent.getAcceptedRequestSequencingEventId())
1✔
256
                    .setBody(Any.pack(updateEvent.getAcceptedRequest()))
1✔
257
                    .build());
1✔
258
          }
259
        }
1✔
260
      }
261
    }
262

263
    for (Iterator<HistoryEvent> iterator = events.iterator(); iterator.hasNext(); ) {
1✔
264
      HistoryEvent event = iterator.next();
1✔
265

266
      // On replay the messages are available after the workflow task schedule event, so we
267
      // need to handle them before workflow task started event to maintain a consistent order.
268
      for (Message msg : this.takeLTE(event.getEventId() - 1)) {
1✔
269
        handleSingleMessage(msg);
1✔
270
      }
1✔
271

272
      try {
273
        handleSingleEvent(event, iterator.hasNext() || hasNextEvent);
1✔
274
      } catch (RuntimeException e) {
1✔
275
        throw createEventProcessingException(e, event);
1✔
276
      }
1✔
277

278
      for (Message msg : this.takeLTE(event.getEventId())) {
1✔
279
        handleSingleMessage(msg);
1✔
280
      }
1✔
281
    }
1✔
282
  }
1✔
283

284
  private List<Message> takeLTE(long eventId) {
285
    List<Message> m = new ArrayList<Message>();
1✔
286
    List<Message> remainingMessages = new ArrayList<Message>();
1✔
287
    for (Message msg : this.messages) {
1✔
288
      if (msg.getEventId() > eventId) {
1✔
289
        remainingMessages.add(msg);
1✔
290
      } else {
291
        m.add(msg);
1✔
292
      }
293
    }
1✔
294
    this.messages = remainingMessages;
1✔
295
    return m;
1✔
296
  }
297

298
  private RuntimeException createEventProcessingException(RuntimeException e, HistoryEvent event) {
299
    Throwable ex = unwrap(e);
1✔
300
    if (ex instanceof NonDeterministicException) {
1✔
301
      // just appending the message in front of an existing message, saving the original stacktrace
302
      NonDeterministicException modifiedException =
1✔
303
          new NonDeterministicException(
304
              createEventHandlingMessage(event)
1✔
305
                  + ". "
306
                  + ex.getMessage()
1✔
307
                  + ". "
308
                  + createShortCurrentStateMessagePostfix(),
1✔
309
              ex.getCause());
1✔
310
      modifiedException.setStackTrace(ex.getStackTrace());
1✔
311
      return modifiedException;
1✔
312
    } else {
313
      return new InternalWorkflowTaskException(
1✔
314
          createEventHandlingMessage(event) + ". " + createShortCurrentStateMessagePostfix(), ex);
1✔
315
    }
316
  }
317

318
  private void handleSingleMessage(Message message) {
319
    // Get or create protocol state machine based on Instance ID and protocolName
320
    EntityStateMachine stateMachine =
1✔
321
        protocolStateMachines.computeIfAbsent(
1✔
322
            message.getProtocolInstanceId(),
1✔
323
            (protocolInstance) -> {
324
              String protocolName = ProtocolUtils.getProtocol(message);
1✔
325
              Optional<ProtocolType> type = ProtocolType.get(protocolName);
1✔
326
              if (type.isPresent()) {
1✔
327
                switch (type.get()) {
1✔
328
                  case UPDATE_V1:
329
                    return UpdateProtocolStateMachine.newInstance(
1✔
330
                        this::isReplaying,
331
                        callbacks::update,
1✔
332
                        this::sendMessage,
333
                        commandSink,
334
                        stateMachineSink);
335
                  default:
336
                    throw new IllegalArgumentException("Unknown protocol type:" + protocolName);
×
337
                }
338
              }
339
              throw new IllegalArgumentException("Protocol type not specified:" + message);
×
340
            });
341
    stateMachine.handleMessage(message);
1✔
342
  }
1✔
343

344
  private void handleSingleEvent(HistoryEvent event, boolean hasNextEvent) {
345
    if (isCommandEvent(event)) {
1✔
346
      handleCommandEvent(event);
1✔
347
      return;
1✔
348
    }
349

350
    if (replaying
1✔
351
        && !hasNextEvent
352
        && (event.getEventType() == EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED
1✔
353
            || WorkflowExecutionUtils.isWorkflowTaskClosedEvent(event))) {
1✔
354
      replaying = false;
1✔
355
    }
356

357
    Long initialCommandEventId = getInitialCommandEventId(event);
1✔
358
    EntityStateMachine c = stateMachines.get(initialCommandEventId);
1✔
359
    if (c != null) {
1✔
360
      c.handleEvent(event, hasNextEvent);
1✔
361
      if (c.isFinalState()) {
1✔
362
        stateMachines.remove(initialCommandEventId);
1✔
363
      }
364
    } else {
365
      handleNonStatefulEvent(event, hasNextEvent);
1✔
366
    }
367
  }
1✔
368

369
  /**
370
   * Handles command event. Command event is an event which is generated from a command emitted by a
371
   * past decision. Each command has a correspondent event. For example ScheduleActivityTaskCommand
372
   * is recorded to the history as ActivityTaskScheduledEvent.
373
   *
374
   * <p>Command events always follow WorkflowTaskCompletedEvent.
375
   *
376
   * <p>The handling consists of verifying that the next command in the commands queue matches the
377
   * event, command state machine is notified about the event and the command is removed from the
378
   * commands queue.
379
   */
380
  private void handleCommandEvent(HistoryEvent event) {
381
    if (handleLocalActivityMarker(event)) {
1✔
382
      return;
1✔
383
    }
384

385
    // Match event to the next command in the stateMachine queue.
386
    // After matching the command is notified about the event and is removed from the
387
    // queue.
388
    CancellableCommand matchingCommand = null;
1✔
389
    while (matchingCommand == null) {
1✔
390
      // handleVersionMarker can skip a marker event if the getVersion call was removed.
391
      // In this case we don't want to consume a command.
392
      // That's why peek is used instead of poll.
393
      CancellableCommand command = commands.peek();
1✔
394
      if (command == null) {
1✔
395
        if (handleNonMatchingVersionMarker(event)) {
×
396
          // this event is a version marker for removed getVersion call.
397
          // Handle the version marker as unmatched and return even if there is no commands to match
398
          // it against.
399
          return;
×
400
        } else {
401
          throw new NonDeterministicException("No command scheduled that corresponds to " + event);
×
402
        }
403
      }
404

405
      if (command.isCanceled()) {
1✔
406
        // Consume and skip the command
407
        commands.poll();
1✔
408
        continue;
1✔
409
      }
410

411
      // Note that handleEvent can cause a command cancellation in case of
412
      // 1. MutableSideEffect
413
      // 2. Version State Machine during replay cancels the command and enters SKIPPED state
414
      //    if it handled non-matching event.
415
      HandleEventStatus status = command.handleEvent(event, true);
1✔
416

417
      if (command.isCanceled()) {
1✔
418
        // Consume and skip the command
419
        commands.poll();
1✔
420
        continue;
1✔
421
      }
422

423
      switch (status) {
1✔
424
        case OK:
425
          // Consume the command
426
          commands.poll();
1✔
427
          matchingCommand = command;
1✔
428
          break;
1✔
429
        case NON_MATCHING_EVENT:
430
          if (handleNonMatchingVersionMarker(event)) {
1✔
431
            // this event is a version marker for removed getVersion call.
432
            // Handle the version marker as unmatched and return without consuming the command
433
            return;
1✔
434
          } else {
435
            throw new NonDeterministicException(
1✔
436
                "Event "
437
                    + event.getEventId()
1✔
438
                    + " of type "
439
                    + event.getEventType()
1✔
440
                    + " does not"
441
                    + " match command type "
442
                    + command.getCommandType());
1✔
443
          }
444
        default:
445
          throw new IllegalStateException(
×
446
              "Got " + status + " value from command.handleEvent which is not handled");
447
      }
448
    }
1✔
449

450
    validateCommand(matchingCommand.getCommand(), event);
1✔
451
    EntityStateMachine stateMachine = matchingCommand.getStateMachine();
1✔
452
    if (!stateMachine.isFinalState()) {
1✔
453
      stateMachines.put(event.getEventId(), stateMachine);
1✔
454
    }
455
    // Marker is the only command processing of which can cause workflow code execution
456
    // and generation of new state machines.
457
    if (event.getEventType() == EventType.EVENT_TYPE_MARKER_RECORDED) {
1✔
458
      prepareCommands();
1✔
459
    }
460
  }
1✔
461

462
  private void preloadVersionMarker(HistoryEvent event) {
463
    if (VersionMarkerUtils.hasVersionMarkerStructure(event)) {
1✔
464
      String changeId = VersionMarkerUtils.tryGetChangeIdFromVersionMarkerEvent(event);
1✔
465
      if (changeId == null) {
1✔
466
        // if we can't extract changeId, this event will later fail to match with anything
467
        // and the corresponded exception will be thrown
468
        return;
×
469
      }
470
      VersionStateMachine versionStateMachine =
1✔
471
          versions.computeIfAbsent(
1✔
472
              changeId,
473
              (idKey) ->
474
                  VersionStateMachine.newInstance(
1✔
475
                      changeId, this::isReplaying, commandSink, stateMachineSink));
476
      versionStateMachine.handleMarkersPreload(event);
1✔
477
    }
478
  }
1✔
479

480
  private boolean handleNonMatchingVersionMarker(HistoryEvent event) {
481
    String changeId = VersionMarkerUtils.tryGetChangeIdFromVersionMarkerEvent(event);
1✔
482
    if (changeId == null) {
1✔
483
      return false;
1✔
484
    }
485
    VersionStateMachine versionStateMachine = versions.get(changeId);
1✔
486
    Preconditions.checkNotNull(
1✔
487
        versionStateMachine,
488
        "versionStateMachine is expected to be initialized already by execution or preloading");
489
    versionStateMachine.handleNonMatchingEvent(event);
1✔
490
    return true;
1✔
491
  }
492

493
  public List<Command> takeCommands() {
494
    List<Command> result = new ArrayList<>(commands.size());
1✔
495
    for (CancellableCommand command : commands) {
1✔
496
      if (!command.isCanceled()) {
1✔
497
        result.add(command.getCommand());
1✔
498
      }
499
    }
1✔
500
    return result;
1✔
501
  }
502

503
  public void sendMessage(Message message) {
504
    checkEventLoopExecuting();
1✔
505
    if (!isReplaying()) {
1✔
506
      messageOutbox.add(message);
1✔
507
    }
508
  }
1✔
509

510
  public List<Message> takeMessages() {
511
    List<Message> result = new ArrayList<>(messageOutbox.size());
1✔
512
    for (Message message : messageOutbox) {
1✔
513
      result.add(message);
1✔
514
    }
1✔
515
    messageOutbox.clear();
1✔
516
    return result;
1✔
517
  }
518

519
  private void prepareCommands() {
520
    if (preparing) {
1✔
521
      return;
1✔
522
    }
523
    preparing = true;
1✔
524
    try {
525
      prepareImpl();
1✔
526
    } finally {
527
      preparing = false;
1✔
528
    }
529
  }
1✔
530

531
  private void prepareImpl() {
532
    // handleCommand can lead to code execution because of SideEffect, MutableSideEffect or local
533
    // activity completion. And code execution can lead to creation of new commands and
534
    // cancellation of existing commands. That is the reason for using Queue as a data structure for
535
    // commands.
536
    while (true) {
537
      CancellableCommand command = cancellableCommands.poll();
1✔
538
      if (command == null) {
1✔
539
        break;
1✔
540
      }
541
      // handleCommand should be called even on canceled ones to support mutableSideEffect
542
      command.handleCommand(command.getCommandType());
1✔
543
      commands.add(command);
1✔
544
    }
1✔
545
  }
1✔
546

547
  /**
548
   * Local activity is different from all other entities. It doesn't schedule a marker command when
549
   * the {@link #scheduleLocalActivityTask(ExecuteLocalActivityParameters, Functions.Proc2)} is
550
   * called. The marker is scheduled only when activity completes through ({@link
551
   * #handleLocalActivityCompletion(LocalActivityResult)}). That's why the normal logic of {@link
552
   * #handleCommandEvent(HistoryEvent)}, which assumes that each event has a correspondent command
553
   * during replay, doesn't work. Instead, local activities are matched by their id using
554
   * localActivityMap.
555
   *
556
   * @return true if matched and false if normal event handling should continue.
557
   */
558
  private boolean handleLocalActivityMarker(HistoryEvent event) {
559
    if (!LocalActivityMarkerUtils.hasLocalActivityStructure(event)) {
1✔
560
      return false;
1✔
561
    }
562

563
    MarkerRecordedEventAttributes markerAttributes = event.getMarkerRecordedEventAttributes();
1✔
564
    String id = LocalActivityMarkerUtils.getActivityId(markerAttributes);
1✔
565
    LocalActivityStateMachine stateMachine = localActivityMap.remove(id);
1✔
566
    if (stateMachine == null) {
1✔
567
      String activityType = LocalActivityMarkerUtils.getActivityTypeName(markerAttributes);
×
568
      throw new NonDeterministicException(
×
569
          String.format(
×
570
              "Local activity of type %s is recorded in the history with id %s but was not expected by the execution",
571
              activityType, id));
572
    }
573
    // RESULT_NOTIFIED state means that there is outstanding command that has to be matched
574
    // using standard logic. So return false to let the handleCommand method to run its standard
575
    // logic.
576
    if (stateMachine.getState() == LocalActivityStateMachine.State.RESULT_NOTIFIED) {
1✔
577
      return false;
1✔
578
    }
579
    stateMachine.handleEvent(event, true);
1✔
580
    eventLoop();
1✔
581
    return true;
1✔
582
  }
583

584
  private void eventLoop() {
585
    if (eventLoopExecuting) {
1✔
586
      return;
1✔
587
    }
588
    eventLoopExecuting = true;
1✔
589
    try {
590
      callbacks.eventLoop();
1✔
591
    } finally {
592
      eventLoopExecuting = false;
1✔
593
    }
594
    prepareCommands();
1✔
595
  }
1✔
596

597
  private void handleNonStatefulEvent(HistoryEvent event, boolean hasNextEvent) {
598
    switch (event.getEventType()) {
1✔
599
      case EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:
600
        this.currentRunId =
1✔
601
            event.getWorkflowExecutionStartedEventAttributes().getOriginalExecutionRunId();
1✔
602
        callbacks.start(event);
1✔
603
        break;
1✔
604
      case EVENT_TYPE_WORKFLOW_TASK_SCHEDULED:
605
        WorkflowTaskStateMachine c =
1✔
606
            WorkflowTaskStateMachine.newInstance(
1✔
607
                workflowTaskStartedEventId, new WorkflowTaskCommandsListener());
608
        c.handleEvent(event, hasNextEvent);
1✔
609
        stateMachines.put(event.getEventId(), c);
1✔
610
        break;
1✔
611
      case EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED:
612
        callbacks.signal(event);
1✔
613
        break;
1✔
614
      case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
615
        callbacks.cancel(event);
1✔
616
        break;
1✔
617
      case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
618
      case UNRECOGNIZED:
619
        break;
1✔
620
      default:
621
        throw new IllegalArgumentException("Unexpected event:" + event);
×
622
    }
623
  }
1✔
624

625
  private long setCurrentTimeMillis(long currentTimeMillis) {
626
    if (this.currentTimeMillis < currentTimeMillis) {
1✔
627
      this.currentTimeMillis = currentTimeMillis;
1✔
628
    }
629
    return this.currentTimeMillis;
1✔
630
  }
631

632
  public long getLastStartedEventId() {
633
    return currentStartedEventId;
1✔
634
  }
635

636
  /**
637
   * @param attributes attributes used to schedule an activity
638
   * @param callback completion callback
639
   * @return an instance of ActivityCommands
640
   */
641
  public Functions.Proc scheduleActivityTask(
642
      ExecuteActivityParameters attributes, Functions.Proc2<Optional<Payloads>, Failure> callback) {
643
    checkEventLoopExecuting();
1✔
644
    ActivityStateMachine activityStateMachine =
1✔
645
        ActivityStateMachine.newInstance(
1✔
646
            attributes,
647
            (p, f) -> {
648
              Failure failure = f != null ? f.getFailure() : null;
1✔
649
              callback.apply(p, failure);
1✔
650

651
              if (f != null
1✔
652
                  && !f.isFromEvent()
1✔
653
                  && failure.hasCause()
1✔
654
                  && failure.getCause().hasCanceledFailureInfo()) {
1✔
655
                // If !f.isFromEvent(), we want to unblock the event loop as the promise got filled
656
                // and the workflow may make progress. If f.isFromEvent(), we need to delay event
657
                // loop triggering until WorkflowTaskStarted.
658
                eventLoop();
1✔
659
              }
660
            },
1✔
661
            commandSink,
662
            stateMachineSink);
663
    return activityStateMachine::cancel;
1✔
664
  }
665

666
  /**
667
   * Creates a new timer state machine
668
   *
669
   * @param attributes timer command attributes
670
   * @param completionCallback invoked when timer fires or reports cancellation. One of
671
   *     TimerFiredEvent, TimerCanceledEvent.
672
   * @return cancellation callback that should be invoked to initiate timer cancellation
673
   */
674
  public Functions.Proc newTimer(
675
      StartTimerCommandAttributes attributes, Functions.Proc1<HistoryEvent> completionCallback) {
676
    checkEventLoopExecuting();
1✔
677
    TimerStateMachine timer =
1✔
678
        TimerStateMachine.newInstance(
1✔
679
            attributes,
680
            (event) -> {
681
              completionCallback.apply(event);
1✔
682
              // Needed due to immediate cancellation
683
              if (event.getEventType() == EventType.EVENT_TYPE_TIMER_CANCELED) {
1✔
684
                eventLoop();
1✔
685
              }
686
            },
1✔
687
            commandSink,
688
            stateMachineSink);
689
    return timer::cancel;
1✔
690
  }
691

692
  /**
693
   * Creates a new child state machine
694
   *
695
   * @param parameters child workflow start command parameters
696
   * @param startedCallback callback that is notified about child start
697
   * @param completionCallback invoked when child reports completion or failure
698
   * @return cancellation callback that should be invoked to cancel the child
699
   */
700
  public Functions.Proc startChildWorkflow(
701
      StartChildWorkflowExecutionParameters parameters,
702
      Functions.Proc2<WorkflowExecution, Exception> startedCallback,
703
      Functions.Proc2<Optional<Payloads>, Exception> completionCallback) {
704
    checkEventLoopExecuting();
1✔
705
    StartChildWorkflowExecutionCommandAttributes attributes = parameters.getRequest().build();
1✔
706
    ChildWorkflowCancellationType cancellationType = parameters.getCancellationType();
1✔
707
    ChildWorkflowStateMachine child =
1✔
708
        ChildWorkflowStateMachine.newInstance(
1✔
709
            attributes, startedCallback, completionCallback, commandSink, stateMachineSink);
710
    return () -> {
1✔
711
      if (cancellationType == ChildWorkflowCancellationType.ABANDON) {
1✔
712
        notifyChildCanceled(completionCallback);
1✔
713
        return;
1✔
714
      }
715
      // The only time child can be canceled directly is before its start command
716
      // was sent out to the service. After that RequestCancelExternal should be used.
717
      if (child.isCancellable()) {
1✔
718
        child.cancel();
1✔
719
        return;
1✔
720
      }
721
      if (!child.isFinalState()) {
1✔
722
        requestCancelExternalWorkflowExecution(
1✔
723
            RequestCancelExternalWorkflowExecutionCommandAttributes.newBuilder()
1✔
724
                .setWorkflowId(attributes.getWorkflowId())
1✔
725
                .setNamespace(attributes.getNamespace())
1✔
726
                .setChildWorkflowOnly(true)
1✔
727
                .build(),
1✔
728
            (r, e) -> { // TODO(maxim): Decide what to do if an error is passed to the callback.
729
              if (cancellationType == ChildWorkflowCancellationType.WAIT_CANCELLATION_REQUESTED) {
1✔
730
                notifyChildCanceled(completionCallback);
1✔
731
              }
732
            });
1✔
733
        if (cancellationType == ChildWorkflowCancellationType.TRY_CANCEL) {
1✔
734
          notifyChildCanceled(completionCallback);
1✔
735
        }
736
      }
737
    };
1✔
738
  }
739

740
  private void notifyChildCanceled(
741
      Functions.Proc2<Optional<Payloads>, Exception> completionCallback) {
742
    CanceledFailure failure = new CanceledFailure("Child canceled");
1✔
743
    completionCallback.apply(Optional.empty(), failure);
1✔
744
    eventLoop();
1✔
745
  }
1✔
746

747
  /**
748
   * @param attributes
749
   * @param completionCallback invoked when signal delivery completes of fails. The following types
750
   */
751
  public Functions.Proc signalExternalWorkflowExecution(
752
      SignalExternalWorkflowExecutionCommandAttributes attributes,
753
      Functions.Proc2<Void, Failure> completionCallback) {
754
    checkEventLoopExecuting();
1✔
755
    return SignalExternalStateMachine.newInstance(
1✔
756
        attributes, completionCallback, commandSink, stateMachineSink);
757
  }
758

759
  /**
760
   * @param attributes attributes to use to cancel external workflow
761
   * @param completionCallback one of ExternalWorkflowExecutionCancelRequestedEvent,
762
   */
763
  public void requestCancelExternalWorkflowExecution(
764
      RequestCancelExternalWorkflowExecutionCommandAttributes attributes,
765
      Functions.Proc2<Void, RuntimeException> completionCallback) {
766
    checkEventLoopExecuting();
1✔
767
    CancelExternalStateMachine.newInstance(
1✔
768
        attributes, completionCallback, commandSink, stateMachineSink);
769
  }
1✔
770

771
  public void upsertSearchAttributes(SearchAttributes attributes) {
772
    checkEventLoopExecuting();
1✔
773
    UpsertSearchAttributesStateMachine.newInstance(attributes, commandSink, stateMachineSink);
1✔
774
  }
1✔
775

776
  public void completeWorkflow(Optional<Payloads> workflowOutput) {
777
    checkEventLoopExecuting();
1✔
778
    CompleteWorkflowStateMachine.newInstance(workflowOutput, commandSink, stateMachineSink);
1✔
779
  }
1✔
780

781
  public void failWorkflow(Failure failure) {
782
    checkEventLoopExecuting();
1✔
783
    FailWorkflowStateMachine.newInstance(failure, commandSink, stateMachineSink);
1✔
784
  }
1✔
785

786
  public void cancelWorkflow() {
787
    checkEventLoopExecuting();
1✔
788
    CancelWorkflowStateMachine.newInstance(
1✔
789
        CancelWorkflowExecutionCommandAttributes.getDefaultInstance(),
1✔
790
        commandSink,
791
        stateMachineSink);
792
  }
1✔
793

794
  public void continueAsNewWorkflow(ContinueAsNewWorkflowExecutionCommandAttributes attributes) {
795
    checkEventLoopExecuting();
1✔
796
    ContinueAsNewWorkflowStateMachine.newInstance(attributes, commandSink, stateMachineSink);
1✔
797
  }
1✔
798

799
  public boolean isReplaying() {
800
    return replaying;
1✔
801
  }
802

803
  public long currentTimeMillis() {
804
    return currentTimeMillis;
1✔
805
  }
806

807
  public UUID randomUUID() {
808
    checkEventLoopExecuting();
1✔
809
    String runId = currentRunId;
1✔
810
    if (runId == null) {
1✔
811
      throw new Error("null currentRunId");
×
812
    }
813
    String id = runId + ":" + idCounter++;
1✔
814
    byte[] bytes = id.getBytes(StandardCharsets.UTF_8);
1✔
815
    return UUID.nameUUIDFromBytes(bytes);
1✔
816
  }
817

818
  public Random newRandom() {
819
    checkEventLoopExecuting();
1✔
820
    return new Random(randomUUID().getLeastSignificantBits());
1✔
821
  }
822

823
  public void sideEffect(
824
      Functions.Func<Optional<Payloads>> func, Functions.Proc1<Optional<Payloads>> callback) {
825
    checkEventLoopExecuting();
1✔
826
    SideEffectStateMachine.newInstance(
1✔
827
        this::isReplaying,
828
        func,
829
        (payloads) -> {
830
          callback.apply(payloads);
1✔
831
          // callback unblocked sideEffect call. Give workflow code chance to make progress.
832
          eventLoop();
1✔
833
        },
1✔
834
        commandSink,
835
        stateMachineSink);
836
  }
1✔
837

838
  /**
839
   * @param id mutable side effect id
840
   * @param func given the value from the last marker returns value to store. If result is empty
841
   *     nothing is recorded into the history.
842
   * @param callback used to report result or failure
843
   */
844
  public void mutableSideEffect(
845
      String id,
846
      Functions.Func1<Optional<Payloads>, Optional<Payloads>> func,
847
      Functions.Proc1<Optional<Payloads>> callback) {
848
    checkEventLoopExecuting();
1✔
849
    MutableSideEffectStateMachine stateMachine =
1✔
850
        mutableSideEffects.computeIfAbsent(
1✔
851
            id,
852
            (idKey) ->
853
                MutableSideEffectStateMachine.newInstance(
1✔
854
                    idKey, this::isReplaying, commandSink, stateMachineSink));
855
    stateMachine.mutableSideEffect(
1✔
856
        func,
857
        (r) -> {
858
          callback.apply(r);
1✔
859
          // callback unblocked mutableSideEffect call. Give workflow code chance to make progress.
860
          eventLoop();
1✔
861
        },
1✔
862
        stateMachineSink);
863
  }
1✔
864

865
  public void getVersion(
866
      String changeId,
867
      int minSupported,
868
      int maxSupported,
869
      Functions.Proc2<Integer, RuntimeException> callback) {
870
    VersionStateMachine stateMachine =
1✔
871
        versions.computeIfAbsent(
1✔
872
            changeId,
873
            (idKey) ->
874
                VersionStateMachine.newInstance(
1✔
875
                    changeId, this::isReplaying, commandSink, stateMachineSink));
876
    stateMachine.getVersion(
1✔
877
        minSupported,
878
        maxSupported,
879
        (v, e) -> {
880
          callback.apply(v, e);
1✔
881
          // without this getVersion call will trigger the end of WFT,
882
          // instead we want to prepare subsequent commands and unblock the execution one more
883
          // time.
884
          eventLoop();
1✔
885
        });
1✔
886
  }
1✔
887

888
  public List<ExecuteLocalActivityParameters> takeLocalActivityRequests() {
889
    List<ExecuteLocalActivityParameters> result = localActivityRequests;
1✔
890
    localActivityRequests = new ArrayList<>();
1✔
891
    for (ExecuteLocalActivityParameters parameters : result) {
1✔
892
      LocalActivityStateMachine stateMachine = localActivityMap.get(parameters.getActivityId());
1✔
893
      stateMachine.markAsSent();
1✔
894
    }
1✔
895
    return result;
1✔
896
  }
897

898
  public void handleLocalActivityCompletion(LocalActivityResult laCompletion) {
899
    String activityId = laCompletion.getActivityId();
1✔
900
    LocalActivityStateMachine laStateMachine = localActivityMap.get(activityId);
1✔
901
    if (laStateMachine == null) {
1✔
902
      throw new IllegalStateException("Unknown local activity: " + activityId);
×
903
    }
904
    laStateMachine.handleCompletion(laCompletion);
1✔
905
    prepareCommands();
1✔
906
  }
1✔
907

908
  public Functions.Proc scheduleLocalActivityTask(
909
      ExecuteLocalActivityParameters parameters,
910
      Functions.Proc2<Optional<Payloads>, LocalActivityCallback.LocalActivityFailedException>
911
          callback) {
912
    checkEventLoopExecuting();
1✔
913
    String activityId = parameters.getActivityId();
1✔
914
    if (Strings.isNullOrEmpty(activityId)) {
1✔
915
      throw new IllegalArgumentException("Missing activityId: " + activityId);
×
916
    }
917
    if (localActivityMap.containsKey(activityId)) {
1✔
918
      throw new IllegalArgumentException("Duplicated local activity id: " + activityId);
×
919
    }
920
    LocalActivityStateMachine commands =
1✔
921
        LocalActivityStateMachine.newInstance(
1✔
922
            this::isReplaying,
923
            this::setCurrentTimeMillis,
924
            parameters,
925
            (r, e) -> {
926
              callback.apply(r, e);
1✔
927
              // callback unblocked local activity call. Give workflow code chance to make progress.
928
              eventLoop();
1✔
929
            },
1✔
930
            localActivityRequestSink,
931
            commandSink,
932
            stateMachineSink,
933
            currentTimeMillis);
934
    localActivityMap.put(activityId, commands);
1✔
935
    return commands::cancel;
1✔
936
  }
937

938
  /** Validates that command matches the event during replay. */
939
  private void validateCommand(Command command, HistoryEvent event) {
940
    // TODO(maxim): Add more thorough validation logic. For example check if activity IDs are
941
    // matching.
942

943
    // ProtocolMessageCommand is different from other commands because it can be associated with
944
    // multiple types of events
945
    // TODO(#1781) Validate protocol message is expected type.
946
    if (command.getCommandType() == COMMAND_TYPE_PROTOCOL_MESSAGE) {
1✔
947
      ProtocolMessageCommandAttributes commandAttributes =
1✔
948
          command.getProtocolMessageCommandAttributes();
1✔
949
      switch (event.getEventType()) {
1✔
950
        case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED:
951
          assertMatch(
1✔
952
              command,
953
              event,
954
              "messageType",
955
              true,
1✔
956
              commandAttributes.getMessageId().endsWith("accept"));
1✔
957
          break;
1✔
958
        case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_REJECTED:
959
          assertMatch(
×
960
              command,
961
              event,
962
              "messageType",
963
              true,
×
964
              commandAttributes.getMessageId().endsWith("reject"));
×
965
          break;
×
966
        case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED:
967
          assertMatch(
1✔
968
              command,
969
              event,
970
              "messageType",
971
              true,
1✔
972
              commandAttributes.getMessageId().endsWith("complete"));
1✔
973
          break;
1✔
974
        default:
975
          throw new IllegalArgumentException("Unexpected event type: " + event.getEventType());
×
976
      }
977
      return;
1✔
978
    }
979
    assertMatch(
1✔
980
        command,
981
        event,
982
        "eventType",
983
        getEventTypeForCommand(command.getCommandType()),
1✔
984
        event.getEventType());
1✔
985
    switch (command.getCommandType()) {
1✔
986
      case COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK:
987
        {
988
          ScheduleActivityTaskCommandAttributes commandAttributes =
1✔
989
              command.getScheduleActivityTaskCommandAttributes();
1✔
990
          ActivityTaskScheduledEventAttributes eventAttributes =
1✔
991
              event.getActivityTaskScheduledEventAttributes();
1✔
992
          assertMatch(
1✔
993
              command,
994
              event,
995
              "activityId",
996
              commandAttributes.getActivityId(),
1✔
997
              eventAttributes.getActivityId());
1✔
998
          assertMatch(
1✔
999
              command,
1000
              event,
1001
              "activityType",
1002
              commandAttributes.getActivityType(),
1✔
1003
              eventAttributes.getActivityType());
1✔
1004
        }
1005
        break;
1✔
1006
      case COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION:
1007
        {
1008
          StartChildWorkflowExecutionCommandAttributes commandAttributes =
1✔
1009
              command.getStartChildWorkflowExecutionCommandAttributes();
1✔
1010
          StartChildWorkflowExecutionInitiatedEventAttributes eventAttributes =
1✔
1011
              event.getStartChildWorkflowExecutionInitiatedEventAttributes();
1✔
1012
          assertMatch(
1✔
1013
              command,
1014
              event,
1015
              "workflowId",
1016
              commandAttributes.getWorkflowId(),
1✔
1017
              eventAttributes.getWorkflowId());
1✔
1018
          assertMatch(
1✔
1019
              command,
1020
              event,
1021
              "workflowType",
1022
              commandAttributes.getWorkflowType(),
1✔
1023
              eventAttributes.getWorkflowType());
1✔
1024
        }
1025
        break;
1✔
1026
      case COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK:
1027
      case COMMAND_TYPE_START_TIMER:
1028
        {
1029
          StartTimerCommandAttributes commandAttributes = command.getStartTimerCommandAttributes();
1✔
1030
          TimerStartedEventAttributes eventAttributes = event.getTimerStartedEventAttributes();
1✔
1031
          assertMatch(
1✔
1032
              command,
1033
              event,
1034
              "timerId",
1035
              commandAttributes.getTimerId(),
1✔
1036
              eventAttributes.getTimerId());
1✔
1037
        }
1038
        break;
1✔
1039
      case COMMAND_TYPE_CANCEL_TIMER:
1040
      case COMMAND_TYPE_CANCEL_WORKFLOW_EXECUTION:
1041
      case COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION:
1042
      case COMMAND_TYPE_RECORD_MARKER:
1043
      case COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION:
1044
      case COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION:
1045
      case COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES:
1046
      case COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION:
1047
      case COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION:
1048
      case COMMAND_TYPE_PROTOCOL_MESSAGE:
1049
        break;
1✔
1050
      case UNRECOGNIZED:
1051
      case COMMAND_TYPE_UNSPECIFIED:
1052
        throw new IllegalArgumentException("Unexpected command type: " + command.getCommandType());
×
1053
    }
1054
  }
1✔
1055

1056
  private void assertMatch(
1057
      Command command, HistoryEvent event, String checkType, Object expected, Object actual) {
1058
    if (!expected.equals(actual)) {
1✔
1059
      String message =
×
1060
          String.format(
×
1061
              "Command %s doesn't match event %s with EventId=%s on check %s "
1062
                  + "with an expected value '%s' and an actual value '%s'",
1063
              command.getCommandType(),
×
1064
              event.getEventType(),
×
1065
              event.getEventId(),
×
1066
              checkType,
1067
              expected,
1068
              actual);
1069
      throw new NonDeterministicException(message);
×
1070
    }
1071
  }
1✔
1072

1073
  private class WorkflowTaskCommandsListener implements WorkflowTaskStateMachine.Listener {
1✔
1074
    @Override
1075
    public void workflowTaskStarted(
1076
        long startedEventId, long currentTimeMillis, boolean nonProcessedWorkflowTask) {
1077
      setCurrentTimeMillis(currentTimeMillis);
1✔
1078
      for (CancellableCommand cancellableCommand : commands) {
1✔
1079
        cancellableCommand.handleWorkflowTaskStarted();
1✔
1080
      }
1✔
1081
      // Give local activities a chance to recreate their requests if they were lost due
1082
      // to the last workflow task failure. The loss could happen only the last workflow task
1083
      // was forcibly created by setting forceCreate on RespondWorkflowTaskCompletedRequest.
1084
      if (nonProcessedWorkflowTask) {
1✔
1085
        for (LocalActivityStateMachine value : localActivityMap.values()) {
1✔
1086
          value.nonReplayWorkflowTaskStarted();
1✔
1087
        }
1✔
1088
      }
1089
      WorkflowStateMachines.this.currentStartedEventId = startedEventId;
1✔
1090

1091
      eventLoop();
1✔
1092
    }
1✔
1093

1094
    @Override
1095
    public void updateRunId(String currentRunId) {
1096
      WorkflowStateMachines.this.currentRunId = currentRunId;
×
1097
    }
×
1098
  }
1099

1100
  private long getInitialCommandEventId(HistoryEvent event) {
1101
    switch (event.getEventType()) {
1✔
1102
      case EVENT_TYPE_ACTIVITY_TASK_STARTED:
1103
        return event.getActivityTaskStartedEventAttributes().getScheduledEventId();
1✔
1104
      case EVENT_TYPE_ACTIVITY_TASK_COMPLETED:
1105
        return event.getActivityTaskCompletedEventAttributes().getScheduledEventId();
1✔
1106
      case EVENT_TYPE_ACTIVITY_TASK_FAILED:
1107
        return event.getActivityTaskFailedEventAttributes().getScheduledEventId();
1✔
1108
      case EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT:
1109
        return event.getActivityTaskTimedOutEventAttributes().getScheduledEventId();
1✔
1110
      case EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED:
1111
        return event.getActivityTaskCancelRequestedEventAttributes().getScheduledEventId();
×
1112
      case EVENT_TYPE_ACTIVITY_TASK_CANCELED:
1113
        return event.getActivityTaskCanceledEventAttributes().getScheduledEventId();
1✔
1114
      case EVENT_TYPE_TIMER_FIRED:
1115
        return event.getTimerFiredEventAttributes().getStartedEventId();
1✔
1116
      case EVENT_TYPE_TIMER_CANCELED:
1117
        return event.getTimerCanceledEventAttributes().getStartedEventId();
×
1118
      case EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
1119
        return event
×
1120
            .getRequestCancelExternalWorkflowExecutionFailedEventAttributes()
×
1121
            .getInitiatedEventId();
×
1122
      case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
1123
        return event
1✔
1124
            .getExternalWorkflowExecutionCancelRequestedEventAttributes()
1✔
1125
            .getInitiatedEventId();
1✔
1126
      case EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED:
1127
        return event.getStartChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId();
1✔
1128
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED:
1129
        return event.getChildWorkflowExecutionStartedEventAttributes().getInitiatedEventId();
1✔
1130
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED:
1131
        return event.getChildWorkflowExecutionCompletedEventAttributes().getInitiatedEventId();
1✔
1132
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED:
1133
        return event.getChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId();
1✔
1134
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED:
1135
        return event.getChildWorkflowExecutionCanceledEventAttributes().getInitiatedEventId();
1✔
1136
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT:
1137
        return event.getChildWorkflowExecutionTimedOutEventAttributes().getInitiatedEventId();
1✔
1138
      case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TERMINATED:
1139
        return event.getChildWorkflowExecutionTerminatedEventAttributes().getInitiatedEventId();
×
1140
      case EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
1141
        return event
1✔
1142
            .getSignalExternalWorkflowExecutionFailedEventAttributes()
1✔
1143
            .getInitiatedEventId();
1✔
1144
      case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED:
1145
        return event.getExternalWorkflowExecutionSignaledEventAttributes().getInitiatedEventId();
1✔
1146
      case EVENT_TYPE_WORKFLOW_TASK_STARTED:
1147
        return event.getWorkflowTaskStartedEventAttributes().getScheduledEventId();
1✔
1148
      case EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
1149
        return event.getWorkflowTaskCompletedEventAttributes().getScheduledEventId();
1✔
1150
      case EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT:
1151
        return event.getWorkflowTaskTimedOutEventAttributes().getScheduledEventId();
1✔
1152
      case EVENT_TYPE_WORKFLOW_TASK_FAILED:
1153
        return event.getWorkflowTaskFailedEventAttributes().getScheduledEventId();
1✔
1154

1155
      case EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:
1156
      case EVENT_TYPE_TIMER_STARTED:
1157
      case EVENT_TYPE_MARKER_RECORDED:
1158
      case EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED:
1159
      case EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED:
1160
      case EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED:
1161
      case EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW:
1162
      case EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED:
1163
      case EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES:
1164
      case EVENT_TYPE_WORKFLOW_TASK_SCHEDULED:
1165
      case EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED:
1166
      case EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:
1167
      case EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED:
1168
      case EVENT_TYPE_WORKFLOW_EXECUTION_FAILED:
1169
      case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
1170
      case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
1171
      case EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
1172
        return event.getEventId();
1✔
1173
      case UNRECOGNIZED:
1174
      case EVENT_TYPE_UNSPECIFIED:
1175
        throw new IllegalArgumentException("Unexpected event type: " + event.getEventType());
×
1176
    }
1177
    throw new IllegalStateException("unreachable");
×
1178
  }
1179

1180
  /**
1181
   * Workflow code executes only while event loop is running. So operations that can be invoked from
1182
   * the workflow have to satisfy this condition.
1183
   */
1184
  private void checkEventLoopExecuting() {
1185
    if (!eventLoopExecuting) {
1✔
1186
      // this call doesn't yield or await, because the await function returns true,
1187
      // but it checks if the workflow thread needs to be destroyed
1188
      WorkflowThread.await("kill workflow thread if destroy requested", () -> true);
×
1189
      throw new IllegalStateException("Operation allowed only while eventLoop is running");
×
1190
    }
1191
  }
1✔
1192

1193
  private String createEventHandlingMessage(HistoryEvent event) {
1194
    return "Failure handling event "
1✔
1195
        + event.getEventId()
1✔
1196
        + " of type '"
1197
        + event.getEventType()
1✔
1198
        + "' "
1199
        + (this.isReplaying() ? "during replay" : "during execution");
1✔
1200
  }
1201

1202
  private String createShortCurrentStateMessagePostfix() {
1203
    return String.format(
1✔
1204
        "{WorkflowTaskStartedEventId=%s, CurrentStartedEventId=%s}",
1205
        this.workflowTaskStartedEventId, this.currentStartedEventId);
1✔
1206
  }
1207
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc