• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #157

pending completion
#157

push

github-actions

web-flow
Provide SerializationContext for PayloadConverter and PayloadCodec (#1695)

Issue #1694

497 of 497 new or added lines in 32 files covered. (100.0%)

16942 of 20806 relevant lines covered (81.43%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.27
/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.client;
22

23
import static io.temporal.internal.common.HeaderUtils.intoPayloadMap;
24

25
import io.temporal.api.common.v1.*;
26
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
27
import io.temporal.api.query.v1.WorkflowQuery;
28
import io.temporal.api.workflowservice.v1.*;
29
import io.temporal.client.WorkflowClientOptions;
30
import io.temporal.common.converter.DataConverter;
31
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
32
import io.temporal.internal.client.external.GenericWorkflowClient;
33
import io.temporal.payload.context.WorkflowSerializationContext;
34
import io.temporal.worker.WorkflowTaskDispatchHandle;
35
import java.lang.reflect.Type;
36
import java.util.*;
37
import java.util.concurrent.CompletableFuture;
38
import java.util.concurrent.TimeoutException;
39
import javax.annotation.Nullable;
40
import org.slf4j.Logger;
41
import org.slf4j.LoggerFactory;
42

43
public class RootWorkflowClientInvoker implements WorkflowClientCallsInterceptor {
44
  private static final Logger log = LoggerFactory.getLogger(RootWorkflowClientInvoker.class);
1✔
45

46
  private final GenericWorkflowClient genericClient;
47
  private final WorkflowClientOptions clientOptions;
48
  private final EagerWorkflowTaskDispatcher eagerWorkflowTaskDispatcher;
49
  private final WorkflowClientRequestFactory requestsHelper;
50

51
  public RootWorkflowClientInvoker(
52
      GenericWorkflowClient genericClient,
53
      WorkflowClientOptions clientOptions,
54
      WorkerFactoryRegistry workerFactoryRegistry) {
1✔
55
    this.genericClient = genericClient;
1✔
56
    this.clientOptions = clientOptions;
1✔
57
    this.eagerWorkflowTaskDispatcher = new EagerWorkflowTaskDispatcher(workerFactoryRegistry);
1✔
58
    this.requestsHelper = new WorkflowClientRequestFactory(clientOptions);
1✔
59
  }
1✔
60

61
  @Override
62
  public WorkflowStartOutput start(WorkflowStartInput input) {
63
    DataConverter dataConverterWithWorkflowContext =
1✔
64
        clientOptions
65
            .getDataConverter()
1✔
66
            .withContext(
1✔
67
                new WorkflowSerializationContext(
68
                    clientOptions.getNamespace(), input.getWorkflowId()));
1✔
69
    Optional<Payloads> inputArgs =
1✔
70
        dataConverterWithWorkflowContext.toPayloads(input.getArguments());
1✔
71

72
    @Nullable
73
    Memo memo =
74
        (input.getOptions().getMemo() != null)
1✔
75
            ? Memo.newBuilder()
1✔
76
                .putAllFields(
1✔
77
                    intoPayloadMap(clientOptions.getDataConverter(), input.getOptions().getMemo()))
1✔
78
                .build()
1✔
79
            : null;
1✔
80

81
    StartWorkflowExecutionRequest.Builder request =
1✔
82
        requestsHelper.newStartWorkflowExecutionRequest(
1✔
83
            input.getWorkflowId(),
1✔
84
            input.getWorkflowType(),
1✔
85
            input.getHeader(),
1✔
86
            input.getOptions(),
1✔
87
            inputArgs.orElse(null),
1✔
88
            memo);
89
    try (@Nullable WorkflowTaskDispatchHandle eagerDispatchHandle = obtainDispatchHandle(input)) {
1✔
90
      boolean requestEagerExecution = eagerDispatchHandle != null;
1✔
91
      request.setRequestEagerExecution(requestEagerExecution);
1✔
92
      StartWorkflowExecutionResponse response = genericClient.start(request.build());
1✔
93
      WorkflowExecution execution =
94
          WorkflowExecution.newBuilder()
1✔
95
              .setRunId(response.getRunId())
1✔
96
              .setWorkflowId(request.getWorkflowId())
1✔
97
              .build();
1✔
98
      @Nullable
99
      PollWorkflowTaskQueueResponse eagerWorkflowTask =
100
          requestEagerExecution && response.hasEagerWorkflowTask()
1✔
101
              ? response.getEagerWorkflowTask()
1✔
102
              : null;
1✔
103
      if (eagerWorkflowTask != null) {
1✔
104
        try {
105
          eagerDispatchHandle.dispatch(eagerWorkflowTask);
1✔
106
        } catch (Exception e) {
×
107
          // Any exception here is not expected, and it's a bug.
108
          // But we don't allow any exception from the dispatching to disrupt the control flow here,
109
          // the Client needs to get the execution back to matter what.
110
          // Inability to dispatch a WFT creates a latency issue, but it's not a failure of the
111
          // start itself
112
          log.error(
×
113
              "[BUG] Eager Workflow Task was received from the Server, but failed to be dispatched on the local worker",
114
              e);
115
        }
1✔
116
      }
117
      return new WorkflowStartOutput(execution);
1✔
118
    }
119
  }
120

121
  @Override
122
  public WorkflowSignalOutput signal(WorkflowSignalInput input) {
123
    SignalWorkflowExecutionRequest.Builder request =
124
        SignalWorkflowExecutionRequest.newBuilder()
1✔
125
            .setSignalName(input.getSignalName())
1✔
126
            .setWorkflowExecution(input.getWorkflowExecution())
1✔
127
            .setIdentity(clientOptions.getIdentity())
1✔
128
            .setNamespace(clientOptions.getNamespace());
1✔
129

130
    DataConverter dataConverterWitSignalContext =
1✔
131
        clientOptions
132
            .getDataConverter()
1✔
133
            .withContext(
1✔
134
                new WorkflowSerializationContext(
135
                    clientOptions.getNamespace(), input.getWorkflowExecution().getWorkflowId()));
1✔
136

137
    Optional<Payloads> inputArgs = dataConverterWitSignalContext.toPayloads(input.getArguments());
1✔
138
    inputArgs.ifPresent(request::setInput);
1✔
139
    genericClient.signal(request.build());
1✔
140
    return new WorkflowSignalOutput();
1✔
141
  }
142

143
  @Override
144
  public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInput input) {
145
    WorkflowStartInput workflowStartInput = input.getWorkflowStartInput();
1✔
146

147
    DataConverter dataConverterWithWorkflowContext =
1✔
148
        clientOptions
149
            .getDataConverter()
1✔
150
            .withContext(
1✔
151
                new WorkflowSerializationContext(
152
                    clientOptions.getNamespace(), workflowStartInput.getWorkflowId()));
1✔
153
    Optional<Payloads> workflowInput =
1✔
154
        dataConverterWithWorkflowContext.toPayloads(workflowStartInput.getArguments());
1✔
155

156
    @Nullable
157
    Memo memo =
158
        (workflowStartInput.getOptions().getMemo() != null)
1✔
159
            ? Memo.newBuilder()
×
160
                .putAllFields(
×
161
                    intoPayloadMap(
×
162
                        clientOptions.getDataConverter(),
×
163
                        workflowStartInput.getOptions().getMemo()))
×
164
                .build()
×
165
            : null;
1✔
166

167
    StartWorkflowExecutionRequestOrBuilder startRequest =
1✔
168
        requestsHelper.newStartWorkflowExecutionRequest(
1✔
169
            workflowStartInput.getWorkflowId(),
1✔
170
            workflowStartInput.getWorkflowType(),
1✔
171
            workflowStartInput.getHeader(),
1✔
172
            workflowStartInput.getOptions(),
1✔
173
            workflowInput.orElse(null),
1✔
174
            memo);
175

176
    Optional<Payloads> signalInput =
1✔
177
        dataConverterWithWorkflowContext.toPayloads(input.getSignalArguments());
1✔
178
    SignalWithStartWorkflowExecutionRequest request =
1✔
179
        requestsHelper
180
            .newSignalWithStartWorkflowExecutionRequest(
1✔
181
                startRequest, input.getSignalName(), signalInput.orElse(null))
1✔
182
            .build();
1✔
183
    SignalWithStartWorkflowExecutionResponse response = genericClient.signalWithStart(request);
1✔
184
    WorkflowExecution execution =
185
        WorkflowExecution.newBuilder()
1✔
186
            .setRunId(response.getRunId())
1✔
187
            .setWorkflowId(request.getWorkflowId())
1✔
188
            .build();
1✔
189
    // TODO currently SignalWithStartWorkflowExecutionResponse doesn't have eagerWorkflowTask.
190
    //  We should wire it when it's implemented server-side.
191
    return new WorkflowSignalWithStartOutput(new WorkflowStartOutput(execution));
1✔
192
  }
193

194
  @Override
195
  public <R> GetResultOutput<R> getResult(GetResultInput<R> input) throws TimeoutException {
196
    DataConverter dataConverterWithWorkflowContext =
1✔
197
        clientOptions
198
            .getDataConverter()
1✔
199
            .withContext(
1✔
200
                new WorkflowSerializationContext(
201
                    clientOptions.getNamespace(), input.getWorkflowExecution().getWorkflowId()));
1✔
202
    Optional<Payloads> resultValue =
1✔
203
        WorkflowClientLongPollHelper.getWorkflowExecutionResult(
1✔
204
            genericClient,
205
            requestsHelper,
206
            input.getWorkflowExecution(),
1✔
207
            input.getWorkflowType(),
1✔
208
            dataConverterWithWorkflowContext,
209
            input.getTimeout(),
1✔
210
            input.getTimeoutUnit());
1✔
211
    return new GetResultOutput<>(
1✔
212
        convertResultPayloads(
1✔
213
            resultValue,
214
            input.getResultClass(),
1✔
215
            input.getResultType(),
1✔
216
            dataConverterWithWorkflowContext));
217
  }
218

219
  @Override
220
  public <R> GetResultAsyncOutput<R> getResultAsync(GetResultInput<R> input) {
221
    DataConverter dataConverterWithWorkflowContext =
1✔
222
        clientOptions
223
            .getDataConverter()
1✔
224
            .withContext(
1✔
225
                new WorkflowSerializationContext(
226
                    clientOptions.getNamespace(), input.getWorkflowExecution().getWorkflowId()));
1✔
227
    CompletableFuture<Optional<Payloads>> resultValue =
1✔
228
        WorkflowClientLongPollAsyncHelper.getWorkflowExecutionResultAsync(
1✔
229
            genericClient,
230
            requestsHelper,
231
            input.getWorkflowExecution(),
1✔
232
            input.getWorkflowType(),
1✔
233
            input.getTimeout(),
1✔
234
            input.getTimeoutUnit(),
1✔
235
            dataConverterWithWorkflowContext);
236
    return new GetResultAsyncOutput<>(
1✔
237
        resultValue.thenApply(
1✔
238
            payloads ->
239
                convertResultPayloads(
1✔
240
                    payloads,
241
                    input.getResultClass(),
1✔
242
                    input.getResultType(),
1✔
243
                    dataConverterWithWorkflowContext)));
244
  }
245

246
  @Override
247
  public <R> QueryOutput<R> query(QueryInput<R> input) {
248
    WorkflowQuery.Builder query = WorkflowQuery.newBuilder().setQueryType(input.getQueryType());
1✔
249
    DataConverter dataConverterWithWorkflowContext =
1✔
250
        clientOptions
251
            .getDataConverter()
1✔
252
            .withContext(
1✔
253
                new WorkflowSerializationContext(
254
                    clientOptions.getNamespace(), input.getWorkflowExecution().getWorkflowId()));
1✔
255

256
    Optional<Payloads> inputArgs =
1✔
257
        dataConverterWithWorkflowContext.toPayloads(input.getArguments());
1✔
258
    inputArgs.ifPresent(query::setQueryArgs);
1✔
259
    QueryWorkflowRequest request =
260
        QueryWorkflowRequest.newBuilder()
1✔
261
            .setNamespace(clientOptions.getNamespace())
1✔
262
            .setExecution(
1✔
263
                WorkflowExecution.newBuilder()
1✔
264
                    .setWorkflowId(input.getWorkflowExecution().getWorkflowId())
1✔
265
                    .setRunId(input.getWorkflowExecution().getRunId()))
1✔
266
            .setQuery(query)
1✔
267
            .setQueryRejectCondition(clientOptions.getQueryRejectCondition())
1✔
268
            .build();
1✔
269

270
    QueryWorkflowResponse result;
271
    result = genericClient.query(request);
1✔
272

273
    boolean queryRejected = result.hasQueryRejected();
1✔
274
    WorkflowExecutionStatus rejectStatus =
275
        queryRejected ? result.getQueryRejected().getStatus() : null;
1✔
276
    Optional<Payloads> queryResult =
277
        result.hasQueryResult() ? Optional.of(result.getQueryResult()) : Optional.empty();
1✔
278
    R resultValue =
1✔
279
        convertResultPayloads(
1✔
280
            queryResult,
281
            input.getResultClass(),
1✔
282
            input.getResultType(),
1✔
283
            dataConverterWithWorkflowContext);
284
    return new QueryOutput<>(rejectStatus, resultValue);
1✔
285
  }
286

287
  @Override
288
  public CancelOutput cancel(CancelInput input) {
289
    RequestCancelWorkflowExecutionRequest.Builder request =
290
        RequestCancelWorkflowExecutionRequest.newBuilder()
1✔
291
            .setRequestId(UUID.randomUUID().toString())
1✔
292
            .setWorkflowExecution(input.getWorkflowExecution())
1✔
293
            .setNamespace(clientOptions.getNamespace())
1✔
294
            .setIdentity(clientOptions.getIdentity());
1✔
295
    genericClient.requestCancel(request.build());
1✔
296
    return new CancelOutput();
1✔
297
  }
298

299
  @Override
300
  public TerminateOutput terminate(TerminateInput input) {
301
    TerminateWorkflowExecutionRequest.Builder request =
302
        TerminateWorkflowExecutionRequest.newBuilder()
1✔
303
            .setNamespace(clientOptions.getNamespace())
1✔
304
            .setWorkflowExecution(input.getWorkflowExecution());
1✔
305
    if (input.getReason() != null) {
1✔
306
      request.setReason(input.getReason());
1✔
307
    }
308
    DataConverter dataConverterWithWorkflowContext =
1✔
309
        clientOptions
310
            .getDataConverter()
1✔
311
            .withContext(
1✔
312
                new WorkflowSerializationContext(
313
                    clientOptions.getNamespace(), input.getWorkflowExecution().getWorkflowId()));
1✔
314
    Optional<Payloads> payloads = dataConverterWithWorkflowContext.toPayloads(input.getDetails());
1✔
315
    payloads.ifPresent(request::setDetails);
1✔
316
    genericClient.terminate(request.build());
1✔
317
    return new TerminateOutput();
1✔
318
  }
319

320
  private static <R> R convertResultPayloads(
321
      Optional<Payloads> resultValue,
322
      Class<R> resultClass,
323
      Type resultType,
324
      DataConverter dataConverter) {
325
    return dataConverter.fromPayloads(0, resultValue, resultClass, resultType);
1✔
326
  }
327

328
  /**
329
   * @return a handle to dispatch the eager workflow task. {@code null} if an eager execution is
330
   *     disabled through {@link io.temporal.client.WorkflowOptions} or the worker
331
   *     <ul>
332
   *       <li>is activity only worker
333
   *       <li>not started, shutdown or paused
334
   *       <li>doesn't have an executor slot available
335
   *     </ul>
336
   */
337
  @Nullable
338
  private WorkflowTaskDispatchHandle obtainDispatchHandle(WorkflowStartInput input) {
339
    if (input.getOptions().isDisableEagerExecution()) {
1✔
340
      return null;
1✔
341
    }
342
    return eagerWorkflowTaskDispatcher.tryGetLocalDispatchHandler(input);
1✔
343
  }
344
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc