• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #157

pending completion
#157

push

github-actions

web-flow
Provide SerializationContext for PayloadConverter and PayloadCodec (#1695)

Issue #1694

497 of 497 new or added lines in 32 files covered. (100.0%)

16942 of 20806 relevant lines covered (81.43%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.59
/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import io.temporal.api.common.v1.Payloads;
24
import io.temporal.api.common.v1.WorkflowExecution;
25
import io.temporal.api.common.v1.WorkflowType;
26
import io.temporal.api.enums.v1.EventType;
27
import io.temporal.api.history.v1.HistoryEvent;
28
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
29
import io.temporal.api.query.v1.WorkflowQuery;
30
import io.temporal.client.WorkflowClient;
31
import io.temporal.common.context.ContextPropagator;
32
import io.temporal.common.converter.DataConverter;
33
import io.temporal.common.converter.DefaultDataConverter;
34
import io.temporal.internal.replay.ReplayWorkflow;
35
import io.temporal.internal.replay.ReplayWorkflowContext;
36
import io.temporal.internal.replay.WorkflowContext;
37
import io.temporal.internal.worker.WorkflowExecutorCache;
38
import io.temporal.worker.WorkflowImplementationOptions;
39
import java.util.List;
40
import java.util.Objects;
41
import java.util.Optional;
42
import javax.annotation.Nonnull;
43
import javax.annotation.Nullable;
44
import org.slf4j.Logger;
45
import org.slf4j.LoggerFactory;
46

47
/**
48
 * SyncWorkflow supports workflows that use synchronous blocking code. An instance is created per
49
 * cached workflow run.
50
 */
51
class SyncWorkflow implements ReplayWorkflow {
52

53
  private static final Logger log = LoggerFactory.getLogger(SyncWorkflow.class);
1✔
54

55
  private final WorkflowThreadExecutor workflowThreadExecutor;
56
  private final SyncWorkflowDefinition workflow;
57
  @Nonnull private final WorkflowImplementationOptions workflowImplementationOptions;
58
  private final WorkflowExecutorCache cache;
59
  private final long defaultDeadlockDetectionTimeout;
60
  private final WorkflowMethodThreadNameStrategy workflowMethodThreadNameStrategy =
1✔
61
      ExecutionInfoStrategy.INSTANCE;
62
  private final SyncWorkflowContext workflowContext;
63
  private WorkflowExecutionHandler workflowProc;
64
  private DeterministicRunner runner;
65

66
  public SyncWorkflow(
67
      String namespace,
68
      WorkflowExecution workflowExecution,
69
      SyncWorkflowDefinition workflow,
70
      SignalDispatcher signalDispatcher,
71
      QueryDispatcher queryDispatcher,
72
      @Nullable WorkflowImplementationOptions workflowImplementationOptions,
73
      DataConverter dataConverter,
74
      WorkflowThreadExecutor workflowThreadExecutor,
75
      WorkflowExecutorCache cache,
76
      List<ContextPropagator> contextPropagators,
77
      long defaultDeadlockDetectionTimeout) {
1✔
78
    this.workflow = Objects.requireNonNull(workflow);
1✔
79
    this.workflowImplementationOptions =
1✔
80
        workflowImplementationOptions == null
1✔
81
            ? WorkflowImplementationOptions.getDefaultInstance()
1✔
82
            : workflowImplementationOptions;
1✔
83
    this.workflowThreadExecutor = Objects.requireNonNull(workflowThreadExecutor);
1✔
84
    this.cache = cache;
1✔
85
    this.defaultDeadlockDetectionTimeout = defaultDeadlockDetectionTimeout;
1✔
86
    this.workflowContext =
1✔
87
        new SyncWorkflowContext(
88
            namespace,
89
            workflowExecution,
90
            signalDispatcher,
91
            queryDispatcher,
92
            workflowImplementationOptions,
93
            dataConverter,
94
            contextPropagators);
95
  }
1✔
96

97
  @Override
98
  public void start(HistoryEvent event, ReplayWorkflowContext context) {
99
    if (event.getEventType() != EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
1✔
100
        || !event.hasWorkflowExecutionStartedEventAttributes()) {
1✔
101
      throw new IllegalArgumentException(
×
102
          "first event is not WorkflowExecutionStarted, but " + event.getEventType());
×
103
    }
104

105
    WorkflowExecutionStartedEventAttributes startEvent =
1✔
106
        event.getWorkflowExecutionStartedEventAttributes();
1✔
107
    WorkflowType workflowType = startEvent.getWorkflowType();
1✔
108
    if (workflow == null) {
1✔
109
      throw new IllegalArgumentException("Unknown workflow type: " + workflowType);
×
110
    }
111

112
    this.workflowContext.setReplayContext(context);
1✔
113

114
    workflowProc =
1✔
115
        new WorkflowExecutionHandler(
116
            workflowContext, workflow, startEvent, workflowImplementationOptions);
117
    // The following order is ensured by this code and DeterministicRunner implementation:
118
    // 1. workflow.initialize
119
    // 2. signal handler (if signalWithStart was called)
120
    // 3. main workflow method
121
    runner =
1✔
122
        DeterministicRunner.newRunner(
1✔
123
            workflowThreadExecutor,
124
            workflowContext,
125
            () -> {
126
              workflow.initialize();
1✔
127
              WorkflowInternal.newWorkflowMethodThread(
1✔
128
                      () -> workflowProc.runWorkflowMethod(),
1✔
129
                      workflowMethodThreadNameStrategy.createThreadName(
1✔
130
                          context.getWorkflowExecution()))
1✔
131
                  .start();
1✔
132
            },
1✔
133
            cache);
134
  }
1✔
135

136
  @Override
137
  public void handleSignal(String signalName, Optional<Payloads> input, long eventId) {
138
    runner.executeInWorkflowThread(
1✔
139
        "signal " + signalName, () -> workflowProc.handleSignal(signalName, input, eventId));
1✔
140
  }
1✔
141

142
  @Override
143
  public boolean eventLoop() {
144
    if (runner == null) {
1✔
145
      return false;
×
146
    }
147
    runner.runUntilAllBlocked(defaultDeadlockDetectionTimeout);
1✔
148
    return runner.isDone() || workflowProc.isDone(); // Do not wait for all other threads.
1✔
149
  }
150

151
  @Override
152
  public Optional<Payloads> getOutput() {
153
    return workflowProc.getOutput();
1✔
154
  }
155

156
  @Override
157
  public void cancel(String reason) {
158
    runner.cancel(reason);
1✔
159
  }
1✔
160

161
  @Override
162
  public void close() {
163
    if (runner != null) {
1✔
164
      runner.close();
1✔
165
    }
166
  }
1✔
167

168
  @Override
169
  public Optional<Payloads> query(WorkflowQuery query) {
170
    if (WorkflowClient.QUERY_TYPE_REPLAY_ONLY.equals(query.getQueryType())) {
1✔
171
      return Optional.empty();
1✔
172
    }
173
    if (WorkflowClient.QUERY_TYPE_STACK_TRACE.equals(query.getQueryType())) {
1✔
174
      // stack trace query result should be readable for UI even if user specifies a custom data
175
      // converter
176
      return DefaultDataConverter.STANDARD_INSTANCE.toPayloads(runner.stackTrace());
1✔
177
    }
178
    Optional<Payloads> args =
179
        query.hasQueryArgs() ? Optional.of(query.getQueryArgs()) : Optional.empty();
1✔
180
    return workflowProc.handleQuery(query.getQueryType(), args);
1✔
181
  }
182

183
  @Override
184
  public WorkflowContext getWorkflowContext() {
185
    return workflowContext;
1✔
186
  }
187
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc