• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #188

25 Sep 2023 04:42PM UTC coverage: 77.369% (-0.3%) from 77.663%
#188

push

github-actions

web-flow
Fix null pointer on trigger immediately (#1865)

4 of 4 new or added lines in 1 file covered. (100.0%)

18670 of 24131 relevant lines covered (77.37%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.44
/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java
1
/*
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
 *
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this material except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
20

21
package io.temporal.internal.sync;
22

23
import io.temporal.api.common.v1.Header;
24
import io.temporal.api.common.v1.Payloads;
25
import io.temporal.api.common.v1.WorkflowExecution;
26
import io.temporal.api.common.v1.WorkflowType;
27
import io.temporal.api.enums.v1.EventType;
28
import io.temporal.api.history.v1.HistoryEvent;
29
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
30
import io.temporal.api.query.v1.WorkflowQuery;
31
import io.temporal.client.WorkflowClient;
32
import io.temporal.common.context.ContextPropagator;
33
import io.temporal.common.converter.DataConverter;
34
import io.temporal.common.converter.DefaultDataConverter;
35
import io.temporal.internal.replay.ReplayWorkflow;
36
import io.temporal.internal.replay.ReplayWorkflowContext;
37
import io.temporal.internal.replay.WorkflowContext;
38
import io.temporal.internal.statemachines.UpdateProtocolCallback;
39
import io.temporal.internal.worker.WorkflowExecutionException;
40
import io.temporal.internal.worker.WorkflowExecutorCache;
41
import io.temporal.worker.WorkflowImplementationOptions;
42
import java.util.List;
43
import java.util.Objects;
44
import java.util.Optional;
45
import javax.annotation.Nonnull;
46
import javax.annotation.Nullable;
47
import org.slf4j.Logger;
48
import org.slf4j.LoggerFactory;
49

50
/**
51
 * SyncWorkflow supports workflows that use synchronous blocking code. An instance is created per
52
 * cached workflow run.
53
 */
54
class SyncWorkflow implements ReplayWorkflow {
55

56
  private static final Logger log = LoggerFactory.getLogger(SyncWorkflow.class);
1✔
57

58
  private final WorkflowThreadExecutor workflowThreadExecutor;
59
  private final SyncWorkflowDefinition workflow;
60
  @Nonnull private final WorkflowImplementationOptions workflowImplementationOptions;
61
  private final WorkflowExecutorCache cache;
62
  private final long defaultDeadlockDetectionTimeout;
63
  private final WorkflowMethodThreadNameStrategy workflowMethodThreadNameStrategy =
1✔
64
      ExecutionInfoStrategy.INSTANCE;
65
  private final SyncWorkflowContext workflowContext;
66
  private WorkflowExecutionHandler workflowProc;
67
  private DeterministicRunner runner;
68
  private DataConverter dataConverter;
69

70
  public SyncWorkflow(
71
      String namespace,
72
      WorkflowExecution workflowExecution,
73
      SyncWorkflowDefinition workflow,
74
      SignalDispatcher signalDispatcher,
75
      QueryDispatcher queryDispatcher,
76
      UpdateDispatcher updateDispatcher,
77
      @Nullable WorkflowImplementationOptions workflowImplementationOptions,
78
      DataConverter dataConverter,
79
      WorkflowThreadExecutor workflowThreadExecutor,
80
      WorkflowExecutorCache cache,
81
      List<ContextPropagator> contextPropagators,
82
      long defaultDeadlockDetectionTimeout) {
1✔
83
    this.workflow = Objects.requireNonNull(workflow);
1✔
84
    this.workflowImplementationOptions =
1✔
85
        workflowImplementationOptions == null
1✔
86
            ? WorkflowImplementationOptions.getDefaultInstance()
1✔
87
            : workflowImplementationOptions;
1✔
88
    this.workflowThreadExecutor = Objects.requireNonNull(workflowThreadExecutor);
1✔
89
    this.cache = cache;
1✔
90
    this.defaultDeadlockDetectionTimeout = defaultDeadlockDetectionTimeout;
1✔
91
    this.dataConverter = dataConverter;
1✔
92
    this.workflowContext =
1✔
93
        new SyncWorkflowContext(
94
            namespace,
95
            workflowExecution,
96
            signalDispatcher,
97
            queryDispatcher,
98
            updateDispatcher,
99
            workflowImplementationOptions,
100
            dataConverter,
101
            contextPropagators);
102
  }
1✔
103

104
  @Override
105
  public void start(HistoryEvent event, ReplayWorkflowContext context) {
106
    if (event.getEventType() != EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
1✔
107
        || !event.hasWorkflowExecutionStartedEventAttributes()) {
1✔
108
      throw new IllegalArgumentException(
×
109
          "first event is not WorkflowExecutionStarted, but " + event.getEventType());
×
110
    }
111

112
    WorkflowExecutionStartedEventAttributes startEvent =
1✔
113
        event.getWorkflowExecutionStartedEventAttributes();
1✔
114
    WorkflowType workflowType = startEvent.getWorkflowType();
1✔
115
    if (workflow == null) {
1✔
116
      throw new IllegalArgumentException("Unknown workflow type: " + workflowType);
×
117
    }
118

119
    this.workflowContext.setReplayContext(context);
1✔
120

121
    workflowProc =
1✔
122
        new WorkflowExecutionHandler(
123
            workflowContext, workflow, startEvent, workflowImplementationOptions);
124
    // The following order is ensured by this code and DeterministicRunner implementation:
125
    // 1. workflow.initialize
126
    // 2. signal handler (if signalWithStart was called)
127
    // 3. main workflow method
128
    runner =
1✔
129
        DeterministicRunner.newRunner(
1✔
130
            workflowThreadExecutor,
131
            workflowContext,
132
            () -> {
133
              workflow.initialize();
1✔
134
              WorkflowInternal.newWorkflowMethodThread(
1✔
135
                      () -> workflowProc.runWorkflowMethod(),
1✔
136
                      workflowMethodThreadNameStrategy.createThreadName(
1✔
137
                          context.getWorkflowExecution()))
1✔
138
                  .start();
1✔
139
            },
1✔
140
            cache);
141
  }
1✔
142

143
  @Override
144
  public void handleSignal(
145
      String signalName, Optional<Payloads> input, long eventId, Header header) {
146
    runner.executeInWorkflowThread(
1✔
147
        "signal " + signalName,
148
        () -> workflowProc.handleSignal(signalName, input, eventId, header));
1✔
149
  }
1✔
150

151
  @Override
152
  public void handleUpdate(
153
      String updateName,
154
      Optional<Payloads> input,
155
      long eventId,
156
      Header header,
157
      UpdateProtocolCallback callbacks) {
158
    runner.executeInWorkflowThread(
1✔
159
        "update " + updateName,
160
        () -> {
161
          // Skip validator on replay
162
          if (!callbacks.isReplaying()) {
1✔
163
            try {
164
              // TODO(https://github.com/temporalio/sdk-java/issues/1748) handleValidateUpdate
165
              // should not just be run
166
              // in a workflow thread
167
              workflowContext.setReadOnly(true);
1✔
168
              workflowProc.handleValidateUpdate(updateName, input, eventId, header);
1✔
169
            } catch (Exception e) {
1✔
170
              callbacks.reject(this.dataConverter.exceptionToFailure(e));
1✔
171
              return;
1✔
172
            } finally {
173
              workflowContext.setReadOnly(false);
1✔
174
            }
175
          }
176
          callbacks.accept();
1✔
177
          try {
178
            Optional<Payloads> result =
1✔
179
                workflowProc.handleExecuteUpdate(updateName, input, eventId, header);
1✔
180
            callbacks.complete(result, null);
1✔
181
          } catch (WorkflowExecutionException e) {
1✔
182
            callbacks.complete(Optional.empty(), e.getFailure());
1✔
183
          }
1✔
184
        });
1✔
185
  }
1✔
186

187
  @Override
188
  public boolean eventLoop() {
189
    if (runner == null) {
1✔
190
      return false;
×
191
    }
192
    runner.runUntilAllBlocked(defaultDeadlockDetectionTimeout);
1✔
193
    return runner.isDone() || workflowProc.isDone(); // Do not wait for all other threads.
1✔
194
  }
195

196
  @Override
197
  public Optional<Payloads> getOutput() {
198
    return workflowProc.getOutput();
1✔
199
  }
200

201
  @Override
202
  public void cancel(String reason) {
203
    runner.cancel(reason);
1✔
204
  }
1✔
205

206
  @Override
207
  public void close() {
208
    if (runner != null) {
1✔
209
      runner.close();
1✔
210
    }
211
  }
1✔
212

213
  @Override
214
  public Optional<Payloads> query(WorkflowQuery query) {
215
    if (WorkflowClient.QUERY_TYPE_REPLAY_ONLY.equals(query.getQueryType())) {
1✔
216
      return Optional.empty();
1✔
217
    }
218
    if (WorkflowClient.QUERY_TYPE_STACK_TRACE.equals(query.getQueryType())) {
1✔
219
      // stack trace query result should be readable for UI even if user specifies a custom data
220
      // converter
221
      return DefaultDataConverter.STANDARD_INSTANCE.toPayloads(runner.stackTrace());
1✔
222
    }
223
    Optional<Payloads> args =
224
        query.hasQueryArgs() ? Optional.of(query.getQueryArgs()) : Optional.empty();
1✔
225
    return workflowProc.handleQuery(query.getQueryType(), query.getHeader(), args);
1✔
226
  }
227

228
  @Override
229
  public WorkflowContext getWorkflowContext() {
230
    return workflowContext;
1✔
231
  }
232
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc