• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #157

pending completion
#157

push

github-actions

web-flow
Provide SerializationContext for PayloadConverter and PayloadCodec (#1695)

Issue #1694

497 of 497 new or added lines in 32 files covered. (100.0%)

16942 of 20806 relevant lines covered (81.43%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.16
/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.replay;
22

23
import com.google.protobuf.util.Timestamps;
24
import com.uber.m3.tally.Scope;
25
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
26
import io.temporal.api.common.v1.Payloads;
27
import io.temporal.api.history.v1.HistoryEvent;
28
import io.temporal.api.history.v1.WorkflowExecutionCancelRequestedEventAttributes;
29
import io.temporal.api.history.v1.WorkflowExecutionSignaledEventAttributes;
30
import io.temporal.api.query.v1.WorkflowQuery;
31
import io.temporal.failure.CanceledFailure;
32
import io.temporal.internal.common.ProtobufTimeUtils;
33
import io.temporal.internal.statemachines.WorkflowStateMachines;
34
import io.temporal.internal.worker.WorkflowExecutionException;
35
import io.temporal.worker.MetricsType;
36
import java.util.Optional;
37
import javax.annotation.Nullable;
38

39
final class ReplayWorkflowExecutor {
40

41
  private final ReplayWorkflow workflow;
42

43
  private final WorkflowStateMachines workflowStateMachines;
44

45
  private final ReplayWorkflowContextImpl context;
46

47
  private final Scope metricsScope;
48

49
  public ReplayWorkflowExecutor(
50
      ReplayWorkflow workflow,
51
      WorkflowStateMachines workflowStateMachines,
52
      ReplayWorkflowContextImpl context) {
1✔
53
    this.workflow = workflow;
1✔
54
    this.workflowStateMachines = workflowStateMachines;
1✔
55
    this.context = context;
1✔
56
    this.metricsScope = context.getMetricsScope();
1✔
57
  }
1✔
58

59
  public void eventLoop() {
60
    boolean completed = context.isWorkflowMethodCompleted();
1✔
61
    if (completed) {
1✔
62
      return;
1✔
63
    }
64
    WorkflowExecutionException failure = null;
1✔
65

66
    try {
67
      completed = workflow.eventLoop();
1✔
68
    } catch (WorkflowExecutionException e) {
1✔
69
      failure = e;
1✔
70
      completed = true;
1✔
71
    } catch (CanceledFailure e) {
×
72
      if (!context.isCancelRequested()) {
×
73
        failure =
×
74
            new WorkflowExecutionException(
75
                workflow.getWorkflowContext().mapWorkflowExceptionToFailure(e));
×
76
      }
77
      completed = true;
×
78
    }
1✔
79
    if (completed) {
1✔
80
      context.setWorkflowMethodCompleted();
1✔
81
      completeWorkflow(failure);
1✔
82
    }
83
  }
1✔
84

85
  private void completeWorkflow(@Nullable WorkflowExecutionException failure) {
86
    if (context.isCancelRequested()) {
1✔
87
      workflowStateMachines.cancelWorkflow();
1✔
88
      metricsScope.counter(MetricsType.WORKFLOW_CANCELED_COUNTER).inc(1);
1✔
89
    } else if (failure != null) {
1✔
90
      workflowStateMachines.failWorkflow(failure.getFailure());
1✔
91
      metricsScope.counter(MetricsType.WORKFLOW_FAILED_COUNTER).inc(1);
1✔
92
    } else {
93
      ContinueAsNewWorkflowExecutionCommandAttributes attributes =
1✔
94
          context.getContinueAsNewOnCompletion();
1✔
95
      if (attributes != null) {
1✔
96
        // TODO Refactoring idea
97
        //  Instead of carrying attributes over like this, SyncWorkflowContext should call
98
        //  workflowStateMachines.continueAsNewWorkflow directly.
99
        //  It's safe to do and be sure that ContinueAsNew will be the last command because
100
        //  WorkflowThread.exit() that it called and it's underlying implementation
101
        //  DeterministicRunner#exit()
102
        //  guarantee that no other workflow threads will get unblocked,
103
        //  so no new commands are generated after the call.
104
        //  This way attributes will need to be carried over in the mutable state and the flow
105
        //  generally will be aligned with the flow of other commands.
106
        workflowStateMachines.continueAsNewWorkflow(attributes);
1✔
107

108
        // TODO Issue #1590
109
        metricsScope.counter(MetricsType.WORKFLOW_CONTINUE_AS_NEW_COUNTER).inc(1);
1✔
110
      } else {
111
        Optional<Payloads> workflowOutput = workflow.getOutput();
1✔
112
        workflowStateMachines.completeWorkflow(workflowOutput);
1✔
113

114
        // TODO Issue #1590
115
        metricsScope.counter(MetricsType.WORKFLOW_COMPLETED_COUNTER).inc(1);
1✔
116
      }
117
    }
118

119
    com.uber.m3.util.Duration d =
120
        ProtobufTimeUtils.toM3Duration(
1✔
121
            Timestamps.fromMillis(System.currentTimeMillis()),
1✔
122
            Timestamps.fromMillis(context.getRunStartedTimestampMillis()));
1✔
123
    metricsScope.timer(MetricsType.WORKFLOW_E2E_LATENCY).record(d);
1✔
124
  }
1✔
125

126
  public void handleWorkflowExecutionCancelRequested(HistoryEvent event) {
127
    WorkflowExecutionCancelRequestedEventAttributes attributes =
1✔
128
        event.getWorkflowExecutionCancelRequestedEventAttributes();
1✔
129
    context.setCancelRequested();
1✔
130
    String cause = attributes.getCause();
1✔
131
    workflow.cancel(cause);
1✔
132
  }
1✔
133

134
  public void handleWorkflowExecutionSignaled(HistoryEvent event) {
135
    WorkflowExecutionSignaledEventAttributes signalAttributes =
1✔
136
        event.getWorkflowExecutionSignaledEventAttributes();
1✔
137
    if (context.isWorkflowMethodCompleted()) {
1✔
138
      throw new IllegalStateException("Signal received after workflow is closed.");
×
139
    }
140
    Optional<Payloads> input =
141
        signalAttributes.hasInput() ? Optional.of(signalAttributes.getInput()) : Optional.empty();
1✔
142
    this.workflow.handleSignal(signalAttributes.getSignalName(), input, event.getEventId());
1✔
143
  }
1✔
144

145
  public Optional<Payloads> query(WorkflowQuery query) {
146
    return workflow.query(query);
1✔
147
  }
148

149
  public void close() {
150
    workflow.close();
1✔
151
  }
1✔
152

153
  public void start(HistoryEvent startWorkflowEvent) {
154
    workflow.start(startWorkflowEvent, context);
1✔
155
  }
1✔
156
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc