• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #343

31 Oct 2024 06:31PM UTC coverage: 75.148% (-3.6%) from 78.794%
#343

push

github

web-flow
Fix jacoco coverage (#2304)

5139 of 8240 branches covered (62.37%)

Branch coverage included in aggregate %.

22841 of 28993 relevant lines covered (78.78%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.04
/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import io.temporal.api.common.v1.Header;
24
import io.temporal.api.common.v1.Payloads;
25
import io.temporal.api.common.v1.WorkflowExecution;
26
import io.temporal.api.common.v1.WorkflowType;
27
import io.temporal.api.enums.v1.EventType;
28
import io.temporal.api.history.v1.HistoryEvent;
29
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
30
import io.temporal.api.query.v1.WorkflowQuery;
31
import io.temporal.client.WorkflowClient;
32
import io.temporal.common.context.ContextPropagator;
33
import io.temporal.common.converter.DataConverter;
34
import io.temporal.common.converter.DefaultDataConverter;
35
import io.temporal.internal.logging.LoggerTag;
36
import io.temporal.internal.replay.ReplayWorkflow;
37
import io.temporal.internal.replay.ReplayWorkflowContext;
38
import io.temporal.internal.replay.WorkflowContext;
39
import io.temporal.internal.statemachines.UpdateProtocolCallback;
40
import io.temporal.internal.worker.WorkflowExecutionException;
41
import io.temporal.internal.worker.WorkflowExecutorCache;
42
import io.temporal.payload.context.WorkflowSerializationContext;
43
import io.temporal.worker.WorkflowImplementationOptions;
44
import io.temporal.workflow.UpdateInfo;
45
import java.util.List;
46
import java.util.Objects;
47
import java.util.Optional;
48
import javax.annotation.Nonnull;
49
import javax.annotation.Nullable;
50
import org.slf4j.Logger;
51
import org.slf4j.LoggerFactory;
52
import org.slf4j.MDC;
53

54
/**
55
 * SyncWorkflow supports workflows that use synchronous blocking code. An instance is created per
56
 * cached workflow run.
57
 */
58
class SyncWorkflow implements ReplayWorkflow {
59

60
  private static final Logger log = LoggerFactory.getLogger(SyncWorkflow.class);
1✔
61

62
  private final WorkflowThreadExecutor workflowThreadExecutor;
63
  private final SyncWorkflowDefinition workflow;
64
  @Nonnull private final WorkflowImplementationOptions workflowImplementationOptions;
65
  private final WorkflowExecutorCache cache;
66
  private final long defaultDeadlockDetectionTimeout;
67
  private final WorkflowMethodThreadNameStrategy workflowMethodThreadNameStrategy =
1✔
68
      ExecutionInfoStrategy.INSTANCE;
69
  private final SyncWorkflowContext workflowContext;
70
  private WorkflowExecutionHandler workflowProc;
71
  private DeterministicRunner runner;
72
  private DataConverter dataConverter;
73
  private DataConverter dataConverterWithWorkflowContext;
74

75
  public SyncWorkflow(
76
      String namespace,
77
      WorkflowExecution workflowExecution,
78
      SyncWorkflowDefinition workflow,
79
      SignalDispatcher signalDispatcher,
80
      QueryDispatcher queryDispatcher,
81
      UpdateDispatcher updateDispatcher,
82
      @Nullable WorkflowImplementationOptions workflowImplementationOptions,
83
      DataConverter dataConverter,
84
      WorkflowThreadExecutor workflowThreadExecutor,
85
      WorkflowExecutorCache cache,
86
      List<ContextPropagator> contextPropagators,
87
      long defaultDeadlockDetectionTimeout) {
1✔
88
    this.workflow = Objects.requireNonNull(workflow);
1✔
89
    this.workflowImplementationOptions =
1✔
90
        workflowImplementationOptions == null
1✔
91
            ? WorkflowImplementationOptions.getDefaultInstance()
1✔
92
            : workflowImplementationOptions;
1✔
93
    this.workflowThreadExecutor = Objects.requireNonNull(workflowThreadExecutor);
1✔
94
    this.cache = cache;
1✔
95
    this.defaultDeadlockDetectionTimeout = defaultDeadlockDetectionTimeout;
1✔
96
    this.dataConverter = dataConverter;
1✔
97
    this.dataConverterWithWorkflowContext =
1✔
98
        dataConverter.withContext(
1✔
99
            new WorkflowSerializationContext(namespace, workflowExecution.getWorkflowId()));
1✔
100
    this.workflowContext =
1✔
101
        new SyncWorkflowContext(
102
            namespace,
103
            workflowExecution,
104
            signalDispatcher,
105
            queryDispatcher,
106
            updateDispatcher,
107
            workflowImplementationOptions,
108
            dataConverter,
109
            contextPropagators);
110
  }
1✔
111

112
  @Override
113
  public void start(HistoryEvent event, ReplayWorkflowContext context) {
114
    if (event.getEventType() != EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
1!
115
        || !event.hasWorkflowExecutionStartedEventAttributes()) {
1!
116
      throw new IllegalArgumentException(
×
117
          "first event is not WorkflowExecutionStarted, but " + event.getEventType());
×
118
    }
119

120
    WorkflowExecutionStartedEventAttributes startEvent =
1✔
121
        event.getWorkflowExecutionStartedEventAttributes();
1✔
122
    WorkflowType workflowType = startEvent.getWorkflowType();
1✔
123
    if (workflow == null) {
1!
124
      throw new IllegalArgumentException("Unknown workflow type: " + workflowType);
×
125
    }
126

127
    this.workflowContext.setReplayContext(context);
1✔
128

129
    workflowProc =
1✔
130
        new WorkflowExecutionHandler(
131
            workflowContext, workflow, startEvent, workflowImplementationOptions);
132
    // The following order is ensured by this code and DeterministicRunner implementation:
133
    // 1. workflow.initialize
134
    // 2. signal handler (if signalWithStart was called)
135
    // 3. main workflow method
136
    runner =
1✔
137
        DeterministicRunner.newRunner(
1✔
138
            workflowThreadExecutor,
139
            workflowContext,
140
            () -> {
141
              workflowProc.runConstructor();
1✔
142
              WorkflowInternal.newWorkflowMethodThread(
1✔
143
                      () -> workflowProc.runWorkflowMethod(),
1✔
144
                      workflowMethodThreadNameStrategy.createThreadName(
1✔
145
                          context.getWorkflowExecution()))
1✔
146
                  .start();
1✔
147
            },
1✔
148
            cache);
149
  }
1✔
150

151
  @Override
152
  public void handleSignal(
153
      String signalName, Optional<Payloads> input, long eventId, Header header) {
154
    // Signals can trigger completion
155
    runner.executeInWorkflowThread(
1✔
156
        "signal " + signalName,
157
        () -> {
158
          workflowProc.handleSignal(signalName, input, eventId, header);
1✔
159
        });
1✔
160
  }
1✔
161

162
  @Override
163
  public void handleUpdate(
164
      String updateName,
165
      String updateId,
166
      Optional<Payloads> input,
167
      long eventId,
168
      Header header,
169
      UpdateProtocolCallback callbacks) {
170
    final UpdateInfo updateInfo = new UpdateInfoImpl(updateName, updateId);
1✔
171
    runner.executeInWorkflowThread(
1✔
172
        "update " + updateName,
173
        () -> {
174
          try {
175
            workflowContext.setCurrentUpdateInfo(updateInfo);
1✔
176
            MDC.put(LoggerTag.UPDATE_ID, updateInfo.getUpdateId());
1✔
177
            MDC.put(LoggerTag.UPDATE_NAME, updateInfo.getUpdateName());
1✔
178
            // Skip validator on replay
179
            if (!callbacks.isReplaying()) {
1✔
180
              try {
181
                workflowContext.setReadOnly(true);
1✔
182
                workflowProc.handleValidateUpdate(updateName, updateId, input, eventId, header);
1✔
183
              } catch (ReadOnlyException r) {
1✔
184
                // Rethrow instead on rejecting the update to fail the WFT
185
                throw r;
1✔
186
              } catch (Exception e) {
1✔
187
                callbacks.reject(
1✔
188
                    workflowContext
189
                        .getDataConverterWithCurrentWorkflowContext()
1✔
190
                        .exceptionToFailure(e));
1✔
191
                return;
1✔
192
              } finally {
193
                workflowContext.setReadOnly(false);
1✔
194
              }
195
            }
196
            callbacks.accept();
1✔
197
            try {
198
              Optional<Payloads> result =
1✔
199
                  workflowProc.handleExecuteUpdate(updateName, updateId, input, eventId, header);
1✔
200
              callbacks.complete(result, null);
1✔
201
            } catch (WorkflowExecutionException e) {
1✔
202
              callbacks.complete(Optional.empty(), e.getFailure());
1✔
203
            }
1✔
204
          } finally {
205
            workflowContext.setCurrentUpdateInfo(null);
1✔
206
          }
207
        });
1✔
208
  }
1✔
209

210
  @Override
211
  public boolean eventLoop() {
212
    if (runner == null) {
1!
213
      return false;
×
214
    }
215
    runner.runUntilAllBlocked(defaultDeadlockDetectionTimeout);
1✔
216
    return runner.isDone() || workflowProc.isDone(); // Do not wait for all other threads.
1✔
217
  }
218

219
  @Override
220
  public Optional<Payloads> getOutput() {
221
    return workflowProc.getOutput();
1✔
222
  }
223

224
  @Override
225
  public void cancel(String reason) {
226
    runner.cancel(reason);
1✔
227
  }
1✔
228

229
  @Override
230
  public void close() {
231
    if (runner != null) {
1!
232
      runner.close();
1✔
233
    }
234
  }
1✔
235

236
  @Override
237
  public Optional<Payloads> query(WorkflowQuery query) {
238
    if (WorkflowClient.QUERY_TYPE_REPLAY_ONLY.equals(query.getQueryType())) {
1✔
239
      return Optional.empty();
1✔
240
    }
241
    if (WorkflowClient.QUERY_TYPE_STACK_TRACE.equals(query.getQueryType())) {
1✔
242
      // stack trace query result should be readable for UI even if user specifies a custom data
243
      // converter
244
      return DefaultDataConverter.STANDARD_INSTANCE.toPayloads(runner.stackTrace());
1✔
245
    }
246
    if (WorkflowClient.QUERY_TYPE_WORKFLOW_METADATA.equals(query.getQueryType())) {
1✔
247
      return dataConverterWithWorkflowContext.toPayloads(workflowContext.getWorkflowMetadata());
1✔
248
    }
249
    Optional<Payloads> args =
250
        query.hasQueryArgs() ? Optional.of(query.getQueryArgs()) : Optional.empty();
1✔
251
    return workflowProc.handleQuery(query.getQueryType(), query.getHeader(), args);
1✔
252
  }
253

254
  @Override
255
  public WorkflowContext getWorkflowContext() {
256
    return workflowContext;
1✔
257
  }
258
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc