• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #181

pending completion
#181

push

github-actions

web-flow
Properly wrap exceptions from schedule client (#1827)

Wrap schedule exception

37 of 37 new or added lines in 1 file covered. (100.0%)

18557 of 23894 relevant lines covered (77.66%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.44
/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import io.temporal.api.common.v1.Payloads;
24
import io.temporal.api.common.v1.WorkflowExecution;
25
import io.temporal.api.common.v1.WorkflowType;
26
import io.temporal.api.enums.v1.EventType;
27
import io.temporal.api.history.v1.HistoryEvent;
28
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
29
import io.temporal.api.query.v1.WorkflowQuery;
30
import io.temporal.client.WorkflowClient;
31
import io.temporal.common.context.ContextPropagator;
32
import io.temporal.common.converter.DataConverter;
33
import io.temporal.common.converter.DefaultDataConverter;
34
import io.temporal.internal.replay.ReplayWorkflow;
35
import io.temporal.internal.replay.ReplayWorkflowContext;
36
import io.temporal.internal.replay.WorkflowContext;
37
import io.temporal.internal.statemachines.UpdateProtocolCallback;
38
import io.temporal.internal.worker.WorkflowExecutionException;
39
import io.temporal.internal.worker.WorkflowExecutorCache;
40
import io.temporal.worker.WorkflowImplementationOptions;
41
import java.util.List;
42
import java.util.Objects;
43
import java.util.Optional;
44
import javax.annotation.Nonnull;
45
import javax.annotation.Nullable;
46
import org.slf4j.Logger;
47
import org.slf4j.LoggerFactory;
48

49
/**
50
 * SyncWorkflow supports workflows that use synchronous blocking code. An instance is created per
51
 * cached workflow run.
52
 */
53
class SyncWorkflow implements ReplayWorkflow {
54

55
  private static final Logger log = LoggerFactory.getLogger(SyncWorkflow.class);
1✔
56

57
  private final WorkflowThreadExecutor workflowThreadExecutor;
58
  private final SyncWorkflowDefinition workflow;
59
  @Nonnull private final WorkflowImplementationOptions workflowImplementationOptions;
60
  private final WorkflowExecutorCache cache;
61
  private final long defaultDeadlockDetectionTimeout;
62
  private final WorkflowMethodThreadNameStrategy workflowMethodThreadNameStrategy =
1✔
63
      ExecutionInfoStrategy.INSTANCE;
64
  private final SyncWorkflowContext workflowContext;
65
  private WorkflowExecutionHandler workflowProc;
66
  private DeterministicRunner runner;
67
  private DataConverter dataConverter;
68

69
  public SyncWorkflow(
70
      String namespace,
71
      WorkflowExecution workflowExecution,
72
      SyncWorkflowDefinition workflow,
73
      SignalDispatcher signalDispatcher,
74
      QueryDispatcher queryDispatcher,
75
      UpdateDispatcher updateDispatcher,
76
      @Nullable WorkflowImplementationOptions workflowImplementationOptions,
77
      DataConverter dataConverter,
78
      WorkflowThreadExecutor workflowThreadExecutor,
79
      WorkflowExecutorCache cache,
80
      List<ContextPropagator> contextPropagators,
81
      long defaultDeadlockDetectionTimeout) {
1✔
82
    this.workflow = Objects.requireNonNull(workflow);
1✔
83
    this.workflowImplementationOptions =
1✔
84
        workflowImplementationOptions == null
1✔
85
            ? WorkflowImplementationOptions.getDefaultInstance()
1✔
86
            : workflowImplementationOptions;
1✔
87
    this.workflowThreadExecutor = Objects.requireNonNull(workflowThreadExecutor);
1✔
88
    this.cache = cache;
1✔
89
    this.defaultDeadlockDetectionTimeout = defaultDeadlockDetectionTimeout;
1✔
90
    this.dataConverter = dataConverter;
1✔
91
    this.workflowContext =
1✔
92
        new SyncWorkflowContext(
93
            namespace,
94
            workflowExecution,
95
            signalDispatcher,
96
            queryDispatcher,
97
            updateDispatcher,
98
            workflowImplementationOptions,
99
            dataConverter,
100
            contextPropagators);
101
  }
1✔
102

103
  @Override
104
  public void start(HistoryEvent event, ReplayWorkflowContext context) {
105
    if (event.getEventType() != EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
1✔
106
        || !event.hasWorkflowExecutionStartedEventAttributes()) {
1✔
107
      throw new IllegalArgumentException(
×
108
          "first event is not WorkflowExecutionStarted, but " + event.getEventType());
×
109
    }
110

111
    WorkflowExecutionStartedEventAttributes startEvent =
1✔
112
        event.getWorkflowExecutionStartedEventAttributes();
1✔
113
    WorkflowType workflowType = startEvent.getWorkflowType();
1✔
114
    if (workflow == null) {
1✔
115
      throw new IllegalArgumentException("Unknown workflow type: " + workflowType);
×
116
    }
117

118
    this.workflowContext.setReplayContext(context);
1✔
119

120
    workflowProc =
1✔
121
        new WorkflowExecutionHandler(
122
            workflowContext, workflow, startEvent, workflowImplementationOptions);
123
    // The following order is ensured by this code and DeterministicRunner implementation:
124
    // 1. workflow.initialize
125
    // 2. signal handler (if signalWithStart was called)
126
    // 3. main workflow method
127
    runner =
1✔
128
        DeterministicRunner.newRunner(
1✔
129
            workflowThreadExecutor,
130
            workflowContext,
131
            () -> {
132
              workflow.initialize();
1✔
133
              WorkflowInternal.newWorkflowMethodThread(
1✔
134
                      () -> workflowProc.runWorkflowMethod(),
1✔
135
                      workflowMethodThreadNameStrategy.createThreadName(
1✔
136
                          context.getWorkflowExecution()))
1✔
137
                  .start();
1✔
138
            },
1✔
139
            cache);
140
  }
1✔
141

142
  @Override
143
  public void handleSignal(String signalName, Optional<Payloads> input, long eventId) {
144
    runner.executeInWorkflowThread(
1✔
145
        "signal " + signalName, () -> workflowProc.handleSignal(signalName, input, eventId));
1✔
146
  }
1✔
147

148
  @Override
149
  public void handleUpdate(
150
      String updateName, Optional<Payloads> input, long eventId, UpdateProtocolCallback callbacks) {
151
    runner.executeInWorkflowThread(
1✔
152
        "update " + updateName,
153
        () -> {
154
          // Skip validator on replay
155
          if (!callbacks.isReplaying()) {
1✔
156
            try {
157
              // TODO(https://github.com/temporalio/sdk-java/issues/1748) handleValidateUpdate
158
              // should not just be run
159
              // in a workflow thread
160
              workflowContext.setReadOnly(true);
1✔
161
              workflowProc.handleValidateUpdate(updateName, input, eventId);
1✔
162
            } catch (Exception e) {
1✔
163
              callbacks.reject(this.dataConverter.exceptionToFailure(e));
1✔
164
              return;
1✔
165
            } finally {
166
              workflowContext.setReadOnly(false);
1✔
167
            }
168
          }
169
          callbacks.accept();
1✔
170
          try {
171
            Optional<Payloads> result =
1✔
172
                workflowProc.handleExecuteUpdate(updateName, input, eventId);
1✔
173
            callbacks.complete(result, null);
1✔
174
          } catch (WorkflowExecutionException e) {
1✔
175
            callbacks.complete(Optional.empty(), e.getFailure());
1✔
176
          }
1✔
177
        });
1✔
178
  }
1✔
179

180
  @Override
181
  public boolean eventLoop() {
182
    if (runner == null) {
1✔
183
      return false;
×
184
    }
185
    runner.runUntilAllBlocked(defaultDeadlockDetectionTimeout);
1✔
186
    return runner.isDone() || workflowProc.isDone(); // Do not wait for all other threads.
1✔
187
  }
188

189
  @Override
190
  public Optional<Payloads> getOutput() {
191
    return workflowProc.getOutput();
1✔
192
  }
193

194
  @Override
195
  public void cancel(String reason) {
196
    runner.cancel(reason);
1✔
197
  }
1✔
198

199
  @Override
200
  public void close() {
201
    if (runner != null) {
1✔
202
      runner.close();
1✔
203
    }
204
  }
1✔
205

206
  @Override
207
  public Optional<Payloads> query(WorkflowQuery query) {
208
    if (WorkflowClient.QUERY_TYPE_REPLAY_ONLY.equals(query.getQueryType())) {
1✔
209
      return Optional.empty();
1✔
210
    }
211
    if (WorkflowClient.QUERY_TYPE_STACK_TRACE.equals(query.getQueryType())) {
1✔
212
      // stack trace query result should be readable for UI even if user specifies a custom data
213
      // converter
214
      return DefaultDataConverter.STANDARD_INSTANCE.toPayloads(runner.stackTrace());
1✔
215
    }
216
    Optional<Payloads> args =
217
        query.hasQueryArgs() ? Optional.of(query.getQueryArgs()) : Optional.empty();
1✔
218
    return workflowProc.handleQuery(query.getQueryType(), args);
1✔
219
  }
220

221
  @Override
222
  public WorkflowContext getWorkflowContext() {
223
    return workflowContext;
1✔
224
  }
225
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc