• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #210

09 Nov 2023 05:49AM UTC coverage: 77.289% (+0.03%) from 77.264%
#210

push

github

web-flow
Treat signal after workflow complete as NonDeterministicException (#1923)

Treat signal after workflow complete as NonDeterministicException

1 of 2 new or added lines in 1 file covered. (50.0%)

5 existing lines in 2 files now uncovered.

18731 of 24235 relevant lines covered (77.29%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.61
/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.replay;
22

23
import com.google.protobuf.InvalidProtocolBufferException;
24
import com.google.protobuf.util.Timestamps;
25
import com.uber.m3.tally.Scope;
26
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
27
import io.temporal.api.common.v1.Payloads;
28
import io.temporal.api.history.v1.HistoryEvent;
29
import io.temporal.api.history.v1.WorkflowExecutionCancelRequestedEventAttributes;
30
import io.temporal.api.history.v1.WorkflowExecutionSignaledEventAttributes;
31
import io.temporal.api.protocol.v1.Message;
32
import io.temporal.api.query.v1.WorkflowQuery;
33
import io.temporal.api.update.v1.Input;
34
import io.temporal.api.update.v1.Request;
35
import io.temporal.failure.CanceledFailure;
36
import io.temporal.internal.common.ProtobufTimeUtils;
37
import io.temporal.internal.common.UpdateMessage;
38
import io.temporal.internal.statemachines.WorkflowStateMachines;
39
import io.temporal.internal.worker.WorkflowExecutionException;
40
import io.temporal.worker.MetricsType;
41
import io.temporal.worker.NonDeterministicException;
42
import java.util.Optional;
43
import javax.annotation.Nullable;
44

45
final class ReplayWorkflowExecutor {
46

47
  private final ReplayWorkflow workflow;
48

49
  private final WorkflowStateMachines workflowStateMachines;
50

51
  private final ReplayWorkflowContextImpl context;
52

53
  private final Scope metricsScope;
54

55
  public ReplayWorkflowExecutor(
56
      ReplayWorkflow workflow,
57
      WorkflowStateMachines workflowStateMachines,
58
      ReplayWorkflowContextImpl context) {
1✔
59
    this.workflow = workflow;
1✔
60
    this.workflowStateMachines = workflowStateMachines;
1✔
61
    this.context = context;
1✔
62
    this.metricsScope = context.getMetricsScope();
1✔
63
  }
1✔
64

65
  public void eventLoop() {
66
    boolean completed = context.isWorkflowMethodCompleted();
1✔
67
    if (completed) {
1✔
68
      return;
1✔
69
    }
70
    WorkflowExecutionException failure = null;
1✔
71

72
    try {
73
      completed = workflow.eventLoop();
1✔
74
    } catch (WorkflowExecutionException e) {
1✔
75
      failure = e;
1✔
76
      completed = true;
1✔
77
    } catch (CanceledFailure e) {
×
78
      if (!context.isCancelRequested()) {
×
79
        failure =
×
80
            new WorkflowExecutionException(
81
                workflow.getWorkflowContext().mapWorkflowExceptionToFailure(e));
×
82
      }
83
      completed = true;
×
84
    }
1✔
85
    if (completed) {
1✔
86
      context.setWorkflowMethodCompleted();
1✔
87
      completeWorkflow(failure);
1✔
88
    }
89
  }
1✔
90

91
  private void completeWorkflow(@Nullable WorkflowExecutionException failure) {
92
    if (context.isCancelRequested()) {
1✔
93
      workflowStateMachines.cancelWorkflow();
1✔
94
      metricsScope.counter(MetricsType.WORKFLOW_CANCELED_COUNTER).inc(1);
1✔
95
    } else if (failure != null) {
1✔
96
      workflowStateMachines.failWorkflow(failure.getFailure());
1✔
97
      metricsScope.counter(MetricsType.WORKFLOW_FAILED_COUNTER).inc(1);
1✔
98
    } else {
99
      ContinueAsNewWorkflowExecutionCommandAttributes attributes =
1✔
100
          context.getContinueAsNewOnCompletion();
1✔
101
      if (attributes != null) {
1✔
102
        // TODO Refactoring idea
103
        //  Instead of carrying attributes over like this, SyncWorkflowContext should call
104
        //  workflowStateMachines.continueAsNewWorkflow directly.
105
        //  It's safe to do and be sure that ContinueAsNew will be the last command because
106
        //  WorkflowThread.exit() that it called and it's underlying implementation
107
        //  DeterministicRunner#exit()
108
        //  guarantee that no other workflow threads will get unblocked,
109
        //  so no new commands are generated after the call.
110
        //  This way attributes will need to be carried over in the mutable state and the flow
111
        //  generally will be aligned with the flow of other commands.
112
        workflowStateMachines.continueAsNewWorkflow(attributes);
1✔
113

114
        // TODO Issue #1590
115
        metricsScope.counter(MetricsType.WORKFLOW_CONTINUE_AS_NEW_COUNTER).inc(1);
1✔
116
      } else {
117
        Optional<Payloads> workflowOutput = workflow.getOutput();
1✔
118
        workflowStateMachines.completeWorkflow(workflowOutput);
1✔
119

120
        // TODO Issue #1590
121
        metricsScope.counter(MetricsType.WORKFLOW_COMPLETED_COUNTER).inc(1);
1✔
122
      }
123
    }
124

125
    com.uber.m3.util.Duration d =
126
        ProtobufTimeUtils.toM3Duration(
1✔
127
            Timestamps.fromMillis(System.currentTimeMillis()),
1✔
128
            Timestamps.fromMillis(context.getRunStartedTimestampMillis()));
1✔
129
    metricsScope.timer(MetricsType.WORKFLOW_E2E_LATENCY).record(d);
1✔
130
  }
1✔
131

132
  public void handleWorkflowExecutionCancelRequested(HistoryEvent event) {
133
    WorkflowExecutionCancelRequestedEventAttributes attributes =
1✔
134
        event.getWorkflowExecutionCancelRequestedEventAttributes();
1✔
135
    context.setCancelRequested();
1✔
136
    String cause = attributes.getCause();
1✔
137
    workflow.cancel(cause);
1✔
138
  }
1✔
139

140
  public void handleWorkflowExecutionSignaled(HistoryEvent event) {
141
    WorkflowExecutionSignaledEventAttributes signalAttributes =
1✔
142
        event.getWorkflowExecutionSignaledEventAttributes();
1✔
143
    if (context.isWorkflowMethodCompleted()) {
1✔
144
      throw new NonDeterministicException(
1✔
145
          "Signal received after workflow is completed. Typically this is caused by a nondeterministic code change in a workflow or a change is what payloads data converters can handle");
146
    }
147
    Optional<Payloads> input =
148
        signalAttributes.hasInput() ? Optional.of(signalAttributes.getInput()) : Optional.empty();
1✔
149
    this.workflow.handleSignal(
1✔
150
        signalAttributes.getSignalName(), input, event.getEventId(), signalAttributes.getHeader());
1✔
151
  }
1✔
152

153
  public void handleWorkflowExecutionUpdated(UpdateMessage updateMessage) {
154
    if (context.isWorkflowMethodCompleted()) {
1✔
NEW
155
      throw new NonDeterministicException("Update received after workflow is completed.");
×
156
    }
157
    try {
158
      Message protocolMessage = updateMessage.getMessage();
1✔
159
      Request update = protocolMessage.getBody().unpack(Request.class);
1✔
160
      Input input = update.getInput();
1✔
161
      Optional<Payloads> args = Optional.ofNullable(input.getArgs());
1✔
162
      this.workflow.handleUpdate(
1✔
163
          input.getName(),
1✔
164
          args,
165
          protocolMessage.getEventId(),
1✔
166
          input.getHeader(),
1✔
167
          updateMessage.getCallbacks());
1✔
168
    } catch (InvalidProtocolBufferException e) {
×
169
      throw new IllegalStateException("Message is not an update.");
×
170
    }
1✔
171
  }
1✔
172

173
  public Optional<Payloads> query(WorkflowQuery query) {
174
    return workflow.query(query);
1✔
175
  }
176

177
  public void close() {
178
    workflow.close();
1✔
179
  }
1✔
180

181
  public void start(HistoryEvent startWorkflowEvent) {
182
    workflow.start(startWorkflowEvent, context);
1✔
183
  }
1✔
184
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc