• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #188

25 Sep 2023 04:42PM UTC coverage: 77.369% (-0.3%) from 77.663%
#188

push

github-actions

web-flow
Fix null pointer on trigger immediately (#1865)

4 of 4 new or added lines in 1 file covered. (100.0%)

18670 of 24131 relevant lines covered (77.37%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.31
/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.replay;
22

23
import com.google.protobuf.InvalidProtocolBufferException;
24
import com.google.protobuf.util.Timestamps;
25
import com.uber.m3.tally.Scope;
26
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
27
import io.temporal.api.common.v1.Payloads;
28
import io.temporal.api.history.v1.HistoryEvent;
29
import io.temporal.api.history.v1.WorkflowExecutionCancelRequestedEventAttributes;
30
import io.temporal.api.history.v1.WorkflowExecutionSignaledEventAttributes;
31
import io.temporal.api.protocol.v1.Message;
32
import io.temporal.api.query.v1.WorkflowQuery;
33
import io.temporal.api.update.v1.Input;
34
import io.temporal.api.update.v1.Request;
35
import io.temporal.failure.CanceledFailure;
36
import io.temporal.internal.common.ProtobufTimeUtils;
37
import io.temporal.internal.common.UpdateMessage;
38
import io.temporal.internal.statemachines.WorkflowStateMachines;
39
import io.temporal.internal.worker.WorkflowExecutionException;
40
import io.temporal.worker.MetricsType;
41
import java.util.Optional;
42
import javax.annotation.Nullable;
43

44
final class ReplayWorkflowExecutor {
45

46
  private final ReplayWorkflow workflow;
47

48
  private final WorkflowStateMachines workflowStateMachines;
49

50
  private final ReplayWorkflowContextImpl context;
51

52
  private final Scope metricsScope;
53

54
  public ReplayWorkflowExecutor(
55
      ReplayWorkflow workflow,
56
      WorkflowStateMachines workflowStateMachines,
57
      ReplayWorkflowContextImpl context) {
1✔
58
    this.workflow = workflow;
1✔
59
    this.workflowStateMachines = workflowStateMachines;
1✔
60
    this.context = context;
1✔
61
    this.metricsScope = context.getMetricsScope();
1✔
62
  }
1✔
63

64
  public void eventLoop() {
65
    boolean completed = context.isWorkflowMethodCompleted();
1✔
66
    if (completed) {
1✔
67
      return;
1✔
68
    }
69
    WorkflowExecutionException failure = null;
1✔
70

71
    try {
72
      completed = workflow.eventLoop();
1✔
73
    } catch (WorkflowExecutionException e) {
1✔
74
      failure = e;
1✔
75
      completed = true;
1✔
76
    } catch (CanceledFailure e) {
×
77
      if (!context.isCancelRequested()) {
×
78
        failure =
×
79
            new WorkflowExecutionException(
80
                workflow.getWorkflowContext().mapWorkflowExceptionToFailure(e));
×
81
      }
82
      completed = true;
×
83
    }
1✔
84
    if (completed) {
1✔
85
      context.setWorkflowMethodCompleted();
1✔
86
      completeWorkflow(failure);
1✔
87
    }
88
  }
1✔
89

90
  private void completeWorkflow(@Nullable WorkflowExecutionException failure) {
91
    if (context.isCancelRequested()) {
1✔
92
      workflowStateMachines.cancelWorkflow();
1✔
93
      metricsScope.counter(MetricsType.WORKFLOW_CANCELED_COUNTER).inc(1);
1✔
94
    } else if (failure != null) {
1✔
95
      workflowStateMachines.failWorkflow(failure.getFailure());
1✔
96
      metricsScope.counter(MetricsType.WORKFLOW_FAILED_COUNTER).inc(1);
1✔
97
    } else {
98
      ContinueAsNewWorkflowExecutionCommandAttributes attributes =
1✔
99
          context.getContinueAsNewOnCompletion();
1✔
100
      if (attributes != null) {
1✔
101
        // TODO Refactoring idea
102
        //  Instead of carrying attributes over like this, SyncWorkflowContext should call
103
        //  workflowStateMachines.continueAsNewWorkflow directly.
104
        //  It's safe to do and be sure that ContinueAsNew will be the last command because
105
        //  WorkflowThread.exit() that it called and it's underlying implementation
106
        //  DeterministicRunner#exit()
107
        //  guarantee that no other workflow threads will get unblocked,
108
        //  so no new commands are generated after the call.
109
        //  This way attributes will need to be carried over in the mutable state and the flow
110
        //  generally will be aligned with the flow of other commands.
111
        workflowStateMachines.continueAsNewWorkflow(attributes);
1✔
112

113
        // TODO Issue #1590
114
        metricsScope.counter(MetricsType.WORKFLOW_CONTINUE_AS_NEW_COUNTER).inc(1);
1✔
115
      } else {
116
        Optional<Payloads> workflowOutput = workflow.getOutput();
1✔
117
        workflowStateMachines.completeWorkflow(workflowOutput);
1✔
118

119
        // TODO Issue #1590
120
        metricsScope.counter(MetricsType.WORKFLOW_COMPLETED_COUNTER).inc(1);
1✔
121
      }
122
    }
123

124
    com.uber.m3.util.Duration d =
125
        ProtobufTimeUtils.toM3Duration(
1✔
126
            Timestamps.fromMillis(System.currentTimeMillis()),
1✔
127
            Timestamps.fromMillis(context.getRunStartedTimestampMillis()));
1✔
128
    metricsScope.timer(MetricsType.WORKFLOW_E2E_LATENCY).record(d);
1✔
129
  }
1✔
130

131
  public void handleWorkflowExecutionCancelRequested(HistoryEvent event) {
132
    WorkflowExecutionCancelRequestedEventAttributes attributes =
1✔
133
        event.getWorkflowExecutionCancelRequestedEventAttributes();
1✔
134
    context.setCancelRequested();
1✔
135
    String cause = attributes.getCause();
1✔
136
    workflow.cancel(cause);
1✔
137
  }
1✔
138

139
  public void handleWorkflowExecutionSignaled(HistoryEvent event) {
140
    WorkflowExecutionSignaledEventAttributes signalAttributes =
1✔
141
        event.getWorkflowExecutionSignaledEventAttributes();
1✔
142
    if (context.isWorkflowMethodCompleted()) {
1✔
143
      throw new IllegalStateException("Signal received after workflow is closed.");
×
144
    }
145
    Optional<Payloads> input =
146
        signalAttributes.hasInput() ? Optional.of(signalAttributes.getInput()) : Optional.empty();
1✔
147
    this.workflow.handleSignal(
1✔
148
        signalAttributes.getSignalName(), input, event.getEventId(), signalAttributes.getHeader());
1✔
149
  }
1✔
150

151
  public void handleWorkflowExecutionUpdated(UpdateMessage updateMessage) {
152
    if (context.isWorkflowMethodCompleted()) {
1✔
153
      throw new IllegalStateException("Update received after workflow is closed.");
×
154
    }
155
    try {
156
      Message protocolMessage = updateMessage.getMessage();
1✔
157
      Request update = protocolMessage.getBody().unpack(Request.class);
1✔
158
      Input input = update.getInput();
1✔
159
      Optional<Payloads> args = Optional.ofNullable(input.getArgs());
1✔
160
      this.workflow.handleUpdate(
1✔
161
          input.getName(),
1✔
162
          args,
163
          protocolMessage.getEventId(),
1✔
164
          input.getHeader(),
1✔
165
          updateMessage.getCallbacks());
1✔
166
    } catch (InvalidProtocolBufferException e) {
×
167
      throw new IllegalStateException("Message is not an update.");
×
168
    }
1✔
169
  }
1✔
170

171
  public Optional<Payloads> query(WorkflowQuery query) {
172
    return workflow.query(query);
1✔
173
  }
174

175
  public void close() {
176
    workflow.close();
1✔
177
  }
1✔
178

179
  public void start(HistoryEvent startWorkflowEvent) {
180
    workflow.start(startWorkflowEvent, context);
1✔
181
  }
1✔
182
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc