• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #169

pending completion
#169

push

github-actions

web-flow
Remove use of deprecated API (#1758)

4 of 4 new or added lines in 1 file covered. (100.0%)

17345 of 21558 relevant lines covered (80.46%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.85
/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.client;
22

23
import static io.temporal.internal.common.HeaderUtils.intoPayloadMap;
24

25
import io.temporal.api.common.v1.*;
26
import io.temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage;
27
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
28
import io.temporal.api.query.v1.WorkflowQuery;
29
import io.temporal.api.update.v1.Input;
30
import io.temporal.api.update.v1.Meta;
31
import io.temporal.api.update.v1.Request;
32
import io.temporal.api.update.v1.WaitPolicy;
33
import io.temporal.api.workflowservice.v1.*;
34
import io.temporal.client.WorkflowClientOptions;
35
import io.temporal.client.WorkflowUpdateException;
36
import io.temporal.common.converter.DataConverter;
37
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
38
import io.temporal.internal.client.external.GenericWorkflowClient;
39
import io.temporal.payload.context.WorkflowSerializationContext;
40
import io.temporal.worker.WorkflowTaskDispatchHandle;
41
import java.lang.reflect.Type;
42
import java.util.*;
43
import java.util.concurrent.CompletableFuture;
44
import java.util.concurrent.TimeoutException;
45
import javax.annotation.Nullable;
46
import org.slf4j.Logger;
47
import org.slf4j.LoggerFactory;
48

49
public class RootWorkflowClientInvoker implements WorkflowClientCallsInterceptor {
50
  private static final Logger log = LoggerFactory.getLogger(RootWorkflowClientInvoker.class);
1✔
51

52
  private final GenericWorkflowClient genericClient;
53
  private final WorkflowClientOptions clientOptions;
54
  private final EagerWorkflowTaskDispatcher eagerWorkflowTaskDispatcher;
55
  private final WorkflowClientRequestFactory requestsHelper;
56

57
  public RootWorkflowClientInvoker(
58
      GenericWorkflowClient genericClient,
59
      WorkflowClientOptions clientOptions,
60
      WorkerFactoryRegistry workerFactoryRegistry) {
1✔
61
    this.genericClient = genericClient;
1✔
62
    this.clientOptions = clientOptions;
1✔
63
    this.eagerWorkflowTaskDispatcher = new EagerWorkflowTaskDispatcher(workerFactoryRegistry);
1✔
64
    this.requestsHelper = new WorkflowClientRequestFactory(clientOptions);
1✔
65
  }
1✔
66

67
  @Override
68
  public WorkflowStartOutput start(WorkflowStartInput input) {
69
    DataConverter dataConverterWithWorkflowContext =
1✔
70
        clientOptions
71
            .getDataConverter()
1✔
72
            .withContext(
1✔
73
                new WorkflowSerializationContext(
74
                    clientOptions.getNamespace(), input.getWorkflowId()));
1✔
75
    Optional<Payloads> inputArgs =
1✔
76
        dataConverterWithWorkflowContext.toPayloads(input.getArguments());
1✔
77

78
    @Nullable
79
    Memo memo =
80
        (input.getOptions().getMemo() != null)
1✔
81
            ? Memo.newBuilder()
1✔
82
                .putAllFields(
1✔
83
                    intoPayloadMap(clientOptions.getDataConverter(), input.getOptions().getMemo()))
1✔
84
                .build()
1✔
85
            : null;
1✔
86

87
    StartWorkflowExecutionRequest.Builder request =
1✔
88
        requestsHelper.newStartWorkflowExecutionRequest(
1✔
89
            input.getWorkflowId(),
1✔
90
            input.getWorkflowType(),
1✔
91
            input.getHeader(),
1✔
92
            input.getOptions(),
1✔
93
            inputArgs.orElse(null),
1✔
94
            memo);
95
    try (@Nullable WorkflowTaskDispatchHandle eagerDispatchHandle = obtainDispatchHandle(input)) {
1✔
96
      boolean requestEagerExecution = eagerDispatchHandle != null;
1✔
97
      request.setRequestEagerExecution(requestEagerExecution);
1✔
98
      StartWorkflowExecutionResponse response = genericClient.start(request.build());
1✔
99
      WorkflowExecution execution =
100
          WorkflowExecution.newBuilder()
1✔
101
              .setRunId(response.getRunId())
1✔
102
              .setWorkflowId(request.getWorkflowId())
1✔
103
              .build();
1✔
104
      @Nullable
105
      PollWorkflowTaskQueueResponse eagerWorkflowTask =
106
          requestEagerExecution && response.hasEagerWorkflowTask()
1✔
107
              ? response.getEagerWorkflowTask()
1✔
108
              : null;
1✔
109
      if (eagerWorkflowTask != null) {
1✔
110
        try {
111
          eagerDispatchHandle.dispatch(eagerWorkflowTask);
1✔
112
        } catch (Exception e) {
×
113
          // Any exception here is not expected, and it's a bug.
114
          // But we don't allow any exception from the dispatching to disrupt the control flow here,
115
          // the Client needs to get the execution back to matter what.
116
          // Inability to dispatch a WFT creates a latency issue, but it's not a failure of the
117
          // start itself
118
          log.error(
×
119
              "[BUG] Eager Workflow Task was received from the Server, but failed to be dispatched on the local worker",
120
              e);
121
        }
1✔
122
      }
123
      return new WorkflowStartOutput(execution);
1✔
124
    }
125
  }
126

127
  @Override
128
  public WorkflowSignalOutput signal(WorkflowSignalInput input) {
129
    SignalWorkflowExecutionRequest.Builder request =
130
        SignalWorkflowExecutionRequest.newBuilder()
1✔
131
            .setSignalName(input.getSignalName())
1✔
132
            .setWorkflowExecution(input.getWorkflowExecution())
1✔
133
            .setIdentity(clientOptions.getIdentity())
1✔
134
            .setNamespace(clientOptions.getNamespace());
1✔
135

136
    DataConverter dataConverterWitSignalContext =
1✔
137
        clientOptions
138
            .getDataConverter()
1✔
139
            .withContext(
1✔
140
                new WorkflowSerializationContext(
141
                    clientOptions.getNamespace(), input.getWorkflowExecution().getWorkflowId()));
1✔
142

143
    Optional<Payloads> inputArgs = dataConverterWitSignalContext.toPayloads(input.getArguments());
1✔
144
    inputArgs.ifPresent(request::setInput);
1✔
145
    genericClient.signal(request.build());
1✔
146
    return new WorkflowSignalOutput();
1✔
147
  }
148

149
  @Override
150
  public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInput input) {
151
    WorkflowStartInput workflowStartInput = input.getWorkflowStartInput();
1✔
152

153
    DataConverter dataConverterWithWorkflowContext =
1✔
154
        clientOptions
155
            .getDataConverter()
1✔
156
            .withContext(
1✔
157
                new WorkflowSerializationContext(
158
                    clientOptions.getNamespace(), workflowStartInput.getWorkflowId()));
1✔
159
    Optional<Payloads> workflowInput =
1✔
160
        dataConverterWithWorkflowContext.toPayloads(workflowStartInput.getArguments());
1✔
161

162
    @Nullable
163
    Memo memo =
164
        (workflowStartInput.getOptions().getMemo() != null)
1✔
165
            ? Memo.newBuilder()
×
166
                .putAllFields(
×
167
                    intoPayloadMap(
×
168
                        clientOptions.getDataConverter(),
×
169
                        workflowStartInput.getOptions().getMemo()))
×
170
                .build()
×
171
            : null;
1✔
172

173
    StartWorkflowExecutionRequestOrBuilder startRequest =
1✔
174
        requestsHelper.newStartWorkflowExecutionRequest(
1✔
175
            workflowStartInput.getWorkflowId(),
1✔
176
            workflowStartInput.getWorkflowType(),
1✔
177
            workflowStartInput.getHeader(),
1✔
178
            workflowStartInput.getOptions(),
1✔
179
            workflowInput.orElse(null),
1✔
180
            memo);
181

182
    Optional<Payloads> signalInput =
1✔
183
        dataConverterWithWorkflowContext.toPayloads(input.getSignalArguments());
1✔
184
    SignalWithStartWorkflowExecutionRequest request =
1✔
185
        requestsHelper
186
            .newSignalWithStartWorkflowExecutionRequest(
1✔
187
                startRequest, input.getSignalName(), signalInput.orElse(null))
1✔
188
            .build();
1✔
189
    SignalWithStartWorkflowExecutionResponse response = genericClient.signalWithStart(request);
1✔
190
    WorkflowExecution execution =
191
        WorkflowExecution.newBuilder()
1✔
192
            .setRunId(response.getRunId())
1✔
193
            .setWorkflowId(request.getWorkflowId())
1✔
194
            .build();
1✔
195
    // TODO currently SignalWithStartWorkflowExecutionResponse doesn't have eagerWorkflowTask.
196
    //  We should wire it when it's implemented server-side.
197
    return new WorkflowSignalWithStartOutput(new WorkflowStartOutput(execution));
1✔
198
  }
199

200
  @Override
201
  public <R> GetResultOutput<R> getResult(GetResultInput<R> input) throws TimeoutException {
202
    DataConverter dataConverterWithWorkflowContext =
1✔
203
        clientOptions
204
            .getDataConverter()
1✔
205
            .withContext(
1✔
206
                new WorkflowSerializationContext(
207
                    clientOptions.getNamespace(), input.getWorkflowExecution().getWorkflowId()));
1✔
208
    Optional<Payloads> resultValue =
1✔
209
        WorkflowClientLongPollHelper.getWorkflowExecutionResult(
1✔
210
            genericClient,
211
            requestsHelper,
212
            input.getWorkflowExecution(),
1✔
213
            input.getWorkflowType(),
1✔
214
            dataConverterWithWorkflowContext,
215
            input.getTimeout(),
1✔
216
            input.getTimeoutUnit());
1✔
217
    return new GetResultOutput<>(
1✔
218
        convertResultPayloads(
1✔
219
            resultValue,
220
            input.getResultClass(),
1✔
221
            input.getResultType(),
1✔
222
            dataConverterWithWorkflowContext));
223
  }
224

225
  @Override
226
  public <R> GetResultAsyncOutput<R> getResultAsync(GetResultInput<R> input) {
227
    DataConverter dataConverterWithWorkflowContext =
1✔
228
        clientOptions
229
            .getDataConverter()
1✔
230
            .withContext(
1✔
231
                new WorkflowSerializationContext(
232
                    clientOptions.getNamespace(), input.getWorkflowExecution().getWorkflowId()));
1✔
233
    CompletableFuture<Optional<Payloads>> resultValue =
1✔
234
        WorkflowClientLongPollAsyncHelper.getWorkflowExecutionResultAsync(
1✔
235
            genericClient,
236
            requestsHelper,
237
            input.getWorkflowExecution(),
1✔
238
            input.getWorkflowType(),
1✔
239
            input.getTimeout(),
1✔
240
            input.getTimeoutUnit(),
1✔
241
            dataConverterWithWorkflowContext);
242
    return new GetResultAsyncOutput<>(
1✔
243
        resultValue.thenApply(
1✔
244
            payloads ->
245
                convertResultPayloads(
1✔
246
                    payloads,
247
                    input.getResultClass(),
1✔
248
                    input.getResultType(),
1✔
249
                    dataConverterWithWorkflowContext)));
250
  }
251

252
  @Override
253
  public <R> QueryOutput<R> query(QueryInput<R> input) {
254
    WorkflowQuery.Builder query = WorkflowQuery.newBuilder().setQueryType(input.getQueryType());
1✔
255
    DataConverter dataConverterWithWorkflowContext =
1✔
256
        clientOptions
257
            .getDataConverter()
1✔
258
            .withContext(
1✔
259
                new WorkflowSerializationContext(
260
                    clientOptions.getNamespace(), input.getWorkflowExecution().getWorkflowId()));
1✔
261

262
    Optional<Payloads> inputArgs =
1✔
263
        dataConverterWithWorkflowContext.toPayloads(input.getArguments());
1✔
264
    inputArgs.ifPresent(query::setQueryArgs);
1✔
265
    QueryWorkflowRequest request =
266
        QueryWorkflowRequest.newBuilder()
1✔
267
            .setNamespace(clientOptions.getNamespace())
1✔
268
            .setExecution(
1✔
269
                WorkflowExecution.newBuilder()
1✔
270
                    .setWorkflowId(input.getWorkflowExecution().getWorkflowId())
1✔
271
                    .setRunId(input.getWorkflowExecution().getRunId()))
1✔
272
            .setQuery(query)
1✔
273
            .setQueryRejectCondition(clientOptions.getQueryRejectCondition())
1✔
274
            .build();
1✔
275

276
    QueryWorkflowResponse result;
277
    result = genericClient.query(request);
1✔
278

279
    boolean queryRejected = result.hasQueryRejected();
1✔
280
    WorkflowExecutionStatus rejectStatus =
281
        queryRejected ? result.getQueryRejected().getStatus() : null;
1✔
282
    Optional<Payloads> queryResult =
283
        result.hasQueryResult() ? Optional.of(result.getQueryResult()) : Optional.empty();
1✔
284
    R resultValue =
1✔
285
        convertResultPayloads(
1✔
286
            queryResult,
287
            input.getResultClass(),
1✔
288
            input.getResultType(),
1✔
289
            dataConverterWithWorkflowContext);
290
    return new QueryOutput<>(rejectStatus, resultValue);
1✔
291
  }
292

293
  @Override
294
  public <R> UpdateOutput<R> update(UpdateInput<R> input) {
295
    DataConverter dataConverterWithWorkflowContext =
×
296
        clientOptions
297
            .getDataConverter()
×
298
            .withContext(
×
299
                new WorkflowSerializationContext(
300
                    clientOptions.getNamespace(), input.getWorkflowExecution().getWorkflowId()));
×
301

302
    Optional<Payloads> inputArgs =
×
303
        dataConverterWithWorkflowContext.toPayloads(input.getArguments());
×
304
    Input.Builder updateInput = Input.newBuilder().setName(input.getUpdateName());
×
305
    inputArgs.ifPresent(updateInput::setArgs);
×
306

307
    WaitPolicy policy =
308
        WaitPolicy.newBuilder()
×
309
            .setLifecycleStage(
×
310
                UpdateWorkflowExecutionLifecycleStage
311
                    .UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED)
312
            .build();
×
313

314
    Request request =
315
        Request.newBuilder()
×
316
            .setMeta(
×
317
                Meta.newBuilder()
×
318
                    .setUpdateId(input.getUpdateId())
×
319
                    .setIdentity(clientOptions.getIdentity()))
×
320
            .setInput(updateInput)
×
321
            .build();
×
322
    UpdateWorkflowExecutionRequest updateRequest =
323
        UpdateWorkflowExecutionRequest.newBuilder()
×
324
            .setNamespace(clientOptions.getNamespace())
×
325
            .setWaitPolicy(policy)
×
326
            .setWorkflowExecution(
×
327
                WorkflowExecution.newBuilder()
×
328
                    .setWorkflowId(input.getWorkflowExecution().getWorkflowId())
×
329
                    .setRunId(input.getWorkflowExecution().getRunId()))
×
330
            .setFirstExecutionRunId(input.getFirstExecutionRunId())
×
331
            .setRequest(request)
×
332
            .build();
×
333
    UpdateWorkflowExecutionResponse result;
334
    result = genericClient.update(updateRequest);
×
335

336
    if (result.hasOutcome()) {
×
337
      switch (result.getOutcome().getValueCase()) {
×
338
        case SUCCESS:
339
          Optional<Payloads> updateResult = Optional.of(result.getOutcome().getSuccess());
×
340
          R resultValue =
×
341
              convertResultPayloads(
×
342
                  updateResult,
343
                  input.getResultClass(),
×
344
                  input.getResultType(),
×
345
                  dataConverterWithWorkflowContext);
346
          return new UpdateOutput<>(resultValue);
×
347
        case FAILURE:
348
          throw new WorkflowUpdateException(
×
349
              input.getWorkflowExecution(),
×
350
              input.getUpdateId(),
×
351
              input.getUpdateName(),
×
352
              dataConverterWithWorkflowContext.failureToException(
×
353
                  result.getOutcome().getFailure()));
×
354
        default:
355
          throw new RuntimeException(
×
356
              "Received unexpected outcome from update request: "
357
                  + result.getOutcome().getValueCase());
×
358
      }
359
    }
360
    throw new RuntimeException("Received no outcome from server");
×
361
  }
362

363
  @Override
364
  public <R> UpdateAsyncOutput<R> updateAsync(UpdateInput<R> input) {
365
    // TODO(https://github.com/temporalio/sdk-java/issues/1743) Support update polling
366
    CompletableFuture future = new CompletableFuture<R>();
×
367
    try {
368
      future.complete(update(input).getResult());
×
369
    } catch (Exception e) {
×
370
      future.completeExceptionally(e);
×
371
    }
×
372
    return new UpdateAsyncOutput<>(future);
×
373
  }
374

375
  @Override
376
  public CancelOutput cancel(CancelInput input) {
377
    RequestCancelWorkflowExecutionRequest.Builder request =
378
        RequestCancelWorkflowExecutionRequest.newBuilder()
1✔
379
            .setRequestId(UUID.randomUUID().toString())
1✔
380
            .setWorkflowExecution(input.getWorkflowExecution())
1✔
381
            .setNamespace(clientOptions.getNamespace())
1✔
382
            .setIdentity(clientOptions.getIdentity());
1✔
383
    genericClient.requestCancel(request.build());
1✔
384
    return new CancelOutput();
1✔
385
  }
386

387
  @Override
388
  public TerminateOutput terminate(TerminateInput input) {
389
    TerminateWorkflowExecutionRequest.Builder request =
390
        TerminateWorkflowExecutionRequest.newBuilder()
1✔
391
            .setNamespace(clientOptions.getNamespace())
1✔
392
            .setWorkflowExecution(input.getWorkflowExecution());
1✔
393
    if (input.getReason() != null) {
1✔
394
      request.setReason(input.getReason());
1✔
395
    }
396
    DataConverter dataConverterWithWorkflowContext =
1✔
397
        clientOptions
398
            .getDataConverter()
1✔
399
            .withContext(
1✔
400
                new WorkflowSerializationContext(
401
                    clientOptions.getNamespace(), input.getWorkflowExecution().getWorkflowId()));
1✔
402
    Optional<Payloads> payloads = dataConverterWithWorkflowContext.toPayloads(input.getDetails());
1✔
403
    payloads.ifPresent(request::setDetails);
1✔
404
    genericClient.terminate(request.build());
1✔
405
    return new TerminateOutput();
1✔
406
  }
407

408
  private static <R> R convertResultPayloads(
409
      Optional<Payloads> resultValue,
410
      Class<R> resultClass,
411
      Type resultType,
412
      DataConverter dataConverter) {
413
    return dataConverter.fromPayloads(0, resultValue, resultClass, resultType);
1✔
414
  }
415

416
  /**
417
   * @return a handle to dispatch the eager workflow task. {@code null} if an eager execution is
418
   *     disabled through {@link io.temporal.client.WorkflowOptions} or the worker
419
   *     <ul>
420
   *       <li>is activity only worker
421
   *       <li>not started, shutdown or paused
422
   *       <li>doesn't have an executor slot available
423
   *     </ul>
424
   */
425
  @Nullable
426
  private WorkflowTaskDispatchHandle obtainDispatchHandle(WorkflowStartInput input) {
427
    if (input.getOptions().isDisableEagerExecution()) {
1✔
428
      return null;
1✔
429
    }
430
    return eagerWorkflowTaskDispatcher.tryGetLocalDispatchHandler(input);
1✔
431
  }
432
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc