• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #153

pending completion
#153

push

github-actions

web-flow
Eager Workflow Task Dispatch (#1674)

Issue #1646

Signed-off-by: Dmitry Spikhalskiy <dmitry@spikhalskiy.com>

213 of 213 new or added lines in 22 files covered. (100.0%)

16682 of 20566 relevant lines covered (81.11%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.17
/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.client;
22

23
import io.temporal.api.common.v1.*;
24
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
25
import io.temporal.api.query.v1.WorkflowQuery;
26
import io.temporal.api.workflowservice.v1.*;
27
import io.temporal.client.WorkflowClientOptions;
28
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
29
import io.temporal.internal.client.external.GenericWorkflowClient;
30
import io.temporal.worker.WorkflowTaskDispatchHandle;
31
import java.lang.reflect.Type;
32
import java.util.*;
33
import java.util.concurrent.CompletableFuture;
34
import java.util.concurrent.TimeoutException;
35
import javax.annotation.Nullable;
36
import org.slf4j.Logger;
37
import org.slf4j.LoggerFactory;
38

39
public class RootWorkflowClientInvoker implements WorkflowClientCallsInterceptor {
40
  private static final Logger log = LoggerFactory.getLogger(RootWorkflowClientInvoker.class);
1✔
41

42
  private final GenericWorkflowClient genericClient;
43
  private final WorkflowClientOptions clientOptions;
44
  private final EagerWorkflowTaskDispatcher eagerWorkflowTaskDispatcher;
45
  private final WorkflowClientRequestFactory requestsHelper;
46

47
  /**
   * Creates an invoker that sends all requests through {@code genericClient}.
   *
   * @param genericClient client used to perform the actual calls
   * @param clientOptions options read for the namespace, identity, data converter and query reject
   *     condition; also used to create the request factory
   * @param workerFactoryRegistry registry handed to the eager workflow task dispatcher
   */
  public RootWorkflowClientInvoker(
      GenericWorkflowClient genericClient,
      WorkflowClientOptions clientOptions,
      WorkerFactoryRegistry workerFactoryRegistry) {
    // Plain references first.
    this.clientOptions = clientOptions;
    this.genericClient = genericClient;

    // Collaborators created from the constructor arguments.
    this.eagerWorkflowTaskDispatcher = new EagerWorkflowTaskDispatcher(workerFactoryRegistry);
    this.requestsHelper = new WorkflowClientRequestFactory(clientOptions);
  }
1✔
56

57
  @Override
58
  public WorkflowStartOutput start(WorkflowStartInput input) {
59
    try (@Nullable WorkflowTaskDispatchHandle eagerDispatchHandle = obtainDispatchHandle(input)) {
1✔
60
      StartWorkflowExecutionRequest.Builder request =
1✔
61
          requestsHelper.newStartWorkflowExecutionRequest(input);
1✔
62
      boolean requestEagerExecution = eagerDispatchHandle != null;
1✔
63
      request.setRequestEagerExecution(requestEagerExecution);
1✔
64
      StartWorkflowExecutionResponse response = genericClient.start(request.build());
1✔
65
      WorkflowExecution execution =
66
          WorkflowExecution.newBuilder()
1✔
67
              .setRunId(response.getRunId())
1✔
68
              .setWorkflowId(request.getWorkflowId())
1✔
69
              .build();
1✔
70
      @Nullable
71
      PollWorkflowTaskQueueResponse eagerWorkflowTask =
72
          requestEagerExecution && response.hasEagerWorkflowTask()
1✔
73
              ? response.getEagerWorkflowTask()
1✔
74
              : null;
1✔
75
      if (eagerWorkflowTask != null) {
1✔
76
        try {
77
          eagerDispatchHandle.dispatch(eagerWorkflowTask);
1✔
78
        } catch (Exception e) {
×
79
          // Any exception here is not expected, and it's a bug.
80
          // But we don't allow any exception from the dispatching to disrupt the control flow here,
81
          // the Client needs to get the execution back to matter what.
82
          // Inability to dispatch a WFT creates a latency issue, but it's not a failure of the
83
          // start itself
84
          log.error(
×
85
              "[BUG] Eager Workflow Task was received from the Server, but failed to be dispatched on the local worker",
86
              e);
87
        }
1✔
88
      }
89
      return new WorkflowStartOutput(execution);
1✔
90
    }
91
  }
92

93
  @Override
94
  public WorkflowSignalOutput signal(WorkflowSignalInput input) {
95
    SignalWorkflowExecutionRequest.Builder request =
96
        SignalWorkflowExecutionRequest.newBuilder()
1✔
97
            .setSignalName(input.getSignalName())
1✔
98
            .setWorkflowExecution(input.getWorkflowExecution())
1✔
99
            .setIdentity(clientOptions.getIdentity())
1✔
100
            .setNamespace(clientOptions.getNamespace());
1✔
101

102
    Optional<Payloads> inputArgs =
1✔
103
        clientOptions.getDataConverter().toPayloads(input.getArguments());
1✔
104
    inputArgs.ifPresent(request::setInput);
1✔
105
    genericClient.signal(request.build());
1✔
106
    return new WorkflowSignalOutput();
1✔
107
  }
108

109
  @Override
110
  public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInput input) {
111
    StartWorkflowExecutionRequestOrBuilder startRequest =
1✔
112
        requestsHelper.newStartWorkflowExecutionRequest(input.getWorkflowStartInput());
1✔
113
    Optional<Payloads> signalInput =
1✔
114
        clientOptions.getDataConverter().toPayloads(input.getSignalArguments());
1✔
115
    SignalWithStartWorkflowExecutionRequest request =
1✔
116
        requestsHelper
117
            .newSignalWithStartWorkflowExecutionRequest(
1✔
118
                startRequest, input.getSignalName(), signalInput.orElse(null))
1✔
119
            .build();
1✔
120
    SignalWithStartWorkflowExecutionResponse response = genericClient.signalWithStart(request);
1✔
121
    WorkflowExecution execution =
122
        WorkflowExecution.newBuilder()
1✔
123
            .setRunId(response.getRunId())
1✔
124
            .setWorkflowId(request.getWorkflowId())
1✔
125
            .build();
1✔
126
    // TODO currently SignalWithStartWorkflowExecutionResponse doesn't have eagerWorkflowTask.
127
    //  We should wire it when it's implemented server-side.
128
    return new WorkflowSignalWithStartOutput(new WorkflowStartOutput(execution));
1✔
129
  }
130

131
  @Override
132
  public <R> GetResultOutput<R> getResult(GetResultInput<R> input) throws TimeoutException {
133
    Optional<Payloads> resultValue =
1✔
134
        WorkflowClientLongPollHelper.getWorkflowExecutionResult(
1✔
135
            genericClient,
136
            requestsHelper,
137
            input.getWorkflowExecution(),
1✔
138
            input.getWorkflowType(),
1✔
139
            clientOptions.getDataConverter(),
1✔
140
            input.getTimeout(),
1✔
141
            input.getTimeoutUnit());
1✔
142
    return new GetResultOutput<>(
1✔
143
        convertResultPayloads(resultValue, input.getResultClass(), input.getResultType()));
1✔
144
  }
145

146
  @Override
147
  public <R> GetResultAsyncOutput<R> getResultAsync(GetResultInput<R> input) {
148
    CompletableFuture<Optional<Payloads>> resultValue =
1✔
149
        WorkflowClientLongPollAsyncHelper.getWorkflowExecutionResultAsync(
1✔
150
            genericClient,
151
            requestsHelper,
152
            input.getWorkflowExecution(),
1✔
153
            input.getWorkflowType(),
1✔
154
            input.getTimeout(),
1✔
155
            input.getTimeoutUnit(),
1✔
156
            clientOptions.getDataConverter());
1✔
157
    return new GetResultAsyncOutput<>(
1✔
158
        resultValue.thenApply(
1✔
159
            payloads ->
160
                convertResultPayloads(payloads, input.getResultClass(), input.getResultType())));
1✔
161
  }
162

163
  @Override
164
  public <R> QueryOutput<R> query(QueryInput<R> input) {
165
    WorkflowQuery.Builder query = WorkflowQuery.newBuilder().setQueryType(input.getQueryType());
1✔
166
    Optional<Payloads> inputArgs =
1✔
167
        clientOptions.getDataConverter().toPayloads(input.getArguments());
1✔
168
    inputArgs.ifPresent(query::setQueryArgs);
1✔
169
    QueryWorkflowRequest request =
170
        QueryWorkflowRequest.newBuilder()
1✔
171
            .setNamespace(clientOptions.getNamespace())
1✔
172
            .setExecution(
1✔
173
                WorkflowExecution.newBuilder()
1✔
174
                    .setWorkflowId(input.getWorkflowExecution().getWorkflowId())
1✔
175
                    .setRunId(input.getWorkflowExecution().getRunId()))
1✔
176
            .setQuery(query)
1✔
177
            .setQueryRejectCondition(clientOptions.getQueryRejectCondition())
1✔
178
            .build();
1✔
179

180
    QueryWorkflowResponse result;
181
    result = genericClient.query(request);
1✔
182

183
    boolean queryRejected = result.hasQueryRejected();
1✔
184
    WorkflowExecutionStatus rejectStatus =
185
        queryRejected ? result.getQueryRejected().getStatus() : null;
1✔
186
    Optional<Payloads> queryResult =
187
        result.hasQueryResult() ? Optional.of(result.getQueryResult()) : Optional.empty();
1✔
188
    R resultValue =
1✔
189
        convertResultPayloads(queryResult, input.getResultClass(), input.getResultType());
1✔
190
    return new QueryOutput<>(rejectStatus, resultValue);
1✔
191
  }
192

193
  @Override
194
  public CancelOutput cancel(CancelInput input) {
195
    RequestCancelWorkflowExecutionRequest.Builder request =
196
        RequestCancelWorkflowExecutionRequest.newBuilder()
1✔
197
            .setRequestId(UUID.randomUUID().toString())
1✔
198
            .setWorkflowExecution(input.getWorkflowExecution())
1✔
199
            .setNamespace(clientOptions.getNamespace())
1✔
200
            .setIdentity(clientOptions.getIdentity());
1✔
201
    genericClient.requestCancel(request.build());
1✔
202
    return new CancelOutput();
1✔
203
  }
204

205
  @Override
206
  public TerminateOutput terminate(TerminateInput input) {
207
    TerminateWorkflowExecutionRequest.Builder request =
208
        TerminateWorkflowExecutionRequest.newBuilder()
1✔
209
            .setNamespace(clientOptions.getNamespace())
1✔
210
            .setWorkflowExecution(input.getWorkflowExecution());
1✔
211
    if (input.getReason() != null) {
1✔
212
      request.setReason(input.getReason());
1✔
213
    }
214
    Optional<Payloads> payloads = clientOptions.getDataConverter().toPayloads(input.getDetails());
1✔
215
    payloads.ifPresent(request::setDetails);
1✔
216
    genericClient.terminate(request.build());
1✔
217
    return new TerminateOutput();
1✔
218
  }
219

220
  private <R> R convertResultPayloads(
221
      Optional<Payloads> resultValue, Class<R> resultClass, Type resultType) {
222
    return clientOptions.getDataConverter().fromPayloads(0, resultValue, resultClass, resultType);
1✔
223
  }
224

225
  /**
226
   * @return a handle to dispatch the eager workflow task. {@code null} if an eager execution is
227
   *     disabled through {@link io.temporal.client.WorkflowOptions} or the worker
228
   *     <ul>
229
   *       <li>is activity only worker
230
   *       <li>not started, shutdown or paused
231
   *       <li>doesn't have an executor slot available
232
   *     </ul>
233
   */
234
  @Nullable
235
  private WorkflowTaskDispatchHandle obtainDispatchHandle(WorkflowStartInput input) {
236
    if (input.getOptions().isDisableEagerExecution()) {
1✔
237
      return null;
1✔
238
    }
239
    return eagerWorkflowTaskDispatcher.tryGetLocalDispatchHandler(input);
1✔
240
  }
241
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc