• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #172

pending completion
#172

push

github-actions

web-flow
Update CODEOWNERS (#1773)

## What was changed
Update CODEOWNERS so that Security can own the Semgrep rules files and paths.

## Why?
We are adding Semgrep for static analysis to this repository, and only the security team should be able to approve exclusions from the policy.

## Checklist

How was this tested:
We ran this scanner on internal repos with this CODEOWNERS file and it worked as expected.

18029 of 22084 relevant lines covered (81.64%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.96
/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.client;
22

23
import static io.temporal.internal.common.HeaderUtils.intoPayloadMap;
24

25
import io.grpc.Deadline;
26
import io.grpc.Status;
27
import io.grpc.StatusRuntimeException;
28
import io.temporal.api.common.v1.*;
29
import io.temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage;
30
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
31
import io.temporal.api.query.v1.WorkflowQuery;
32
import io.temporal.api.update.v1.*;
33
import io.temporal.api.workflowservice.v1.*;
34
import io.temporal.client.WorkflowClientOptions;
35
import io.temporal.client.WorkflowUpdateException;
36
import io.temporal.common.converter.DataConverter;
37
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
38
import io.temporal.internal.client.external.GenericWorkflowClient;
39
import io.temporal.payload.context.WorkflowSerializationContext;
40
import io.temporal.worker.WorkflowTaskDispatchHandle;
41
import java.lang.reflect.Type;
42
import java.util.*;
43
import java.util.concurrent.CompletableFuture;
44
import java.util.concurrent.TimeUnit;
45
import java.util.concurrent.TimeoutException;
46
import javax.annotation.Nullable;
47
import org.slf4j.Logger;
48
import org.slf4j.LoggerFactory;
49

50
public class RootWorkflowClientInvoker implements WorkflowClientCallsInterceptor {
51
  private static final Logger log = LoggerFactory.getLogger(RootWorkflowClientInvoker.class);
52

53
  private final GenericWorkflowClient genericClient;
54
  private final WorkflowClientOptions clientOptions;
55
  private final EagerWorkflowTaskDispatcher eagerWorkflowTaskDispatcher;
56
  private final WorkflowClientRequestFactory requestsHelper;
57

58
  /**
   * Stores the given client and options and derives the two helpers used by the call methods: a
   * {@link WorkflowClientRequestFactory} built from {@code clientOptions} and an {@link
   * EagerWorkflowTaskDispatcher} built from {@code workerFactoryRegistry}.
   *
   * @param genericClient client used to issue every service call made by this invoker
   * @param clientOptions source of the namespace, identity, data converter and query reject
   *     condition used when building requests
   * @param workerFactoryRegistry registry handed to the eager workflow task dispatcher
   */
  public RootWorkflowClientInvoker(
      GenericWorkflowClient genericClient,
      WorkflowClientOptions clientOptions,
      WorkerFactoryRegistry workerFactoryRegistry) {
    this.clientOptions = clientOptions;
    this.genericClient = genericClient;
    this.requestsHelper = new WorkflowClientRequestFactory(clientOptions);
    this.eagerWorkflowTaskDispatcher = new EagerWorkflowTaskDispatcher(workerFactoryRegistry);
  }
67

68
  @Override
69
  public WorkflowStartOutput start(WorkflowStartInput input) {
70
    DataConverter dataConverterWithWorkflowContext =
71
        clientOptions
72
            .getDataConverter()
73
            .withContext(
74
                new WorkflowSerializationContext(
75
                    clientOptions.getNamespace(), input.getWorkflowId()));
76
    Optional<Payloads> inputArgs =
77
        dataConverterWithWorkflowContext.toPayloads(input.getArguments());
78

79
    @Nullable
80
    Memo memo =
81
        (input.getOptions().getMemo() != null)
82
            ? Memo.newBuilder()
83
                .putAllFields(
84
                    intoPayloadMap(clientOptions.getDataConverter(), input.getOptions().getMemo()))
85
                .build()
86
            : null;
87

88
    StartWorkflowExecutionRequest.Builder request =
89
        requestsHelper.newStartWorkflowExecutionRequest(
90
            input.getWorkflowId(),
91
            input.getWorkflowType(),
92
            input.getHeader(),
93
            input.getOptions(),
94
            inputArgs.orElse(null),
95
            memo);
96
    try (@Nullable WorkflowTaskDispatchHandle eagerDispatchHandle = obtainDispatchHandle(input)) {
97
      boolean requestEagerExecution = eagerDispatchHandle != null;
98
      request.setRequestEagerExecution(requestEagerExecution);
99
      StartWorkflowExecutionResponse response = genericClient.start(request.build());
100
      WorkflowExecution execution =
101
          WorkflowExecution.newBuilder()
102
              .setRunId(response.getRunId())
103
              .setWorkflowId(request.getWorkflowId())
104
              .build();
105
      @Nullable
106
      PollWorkflowTaskQueueResponse eagerWorkflowTask =
107
          requestEagerExecution && response.hasEagerWorkflowTask()
108
              ? response.getEagerWorkflowTask()
109
              : null;
110
      if (eagerWorkflowTask != null) {
111
        try {
112
          eagerDispatchHandle.dispatch(eagerWorkflowTask);
113
        } catch (Exception e) {
114
          // Any exception here is not expected, and it's a bug.
115
          // But we don't allow any exception from the dispatching to disrupt the control flow here,
116
          // the Client needs to get the execution back to matter what.
117
          // Inability to dispatch a WFT creates a latency issue, but it's not a failure of the
118
          // start itself
119
          log.error(
120
              "[BUG] Eager Workflow Task was received from the Server, but failed to be dispatched on the local worker",
121
              e);
122
        }
123
      }
124
      return new WorkflowStartOutput(execution);
125
    }
126
  }
127

128
  @Override
129
  public WorkflowSignalOutput signal(WorkflowSignalInput input) {
130
    SignalWorkflowExecutionRequest.Builder request =
131
        SignalWorkflowExecutionRequest.newBuilder()
132
            .setSignalName(input.getSignalName())
133
            .setWorkflowExecution(input.getWorkflowExecution())
134
            .setIdentity(clientOptions.getIdentity())
135
            .setNamespace(clientOptions.getNamespace());
136

137
    DataConverter dataConverterWitSignalContext =
138
        clientOptions
139
            .getDataConverter()
140
            .withContext(
141
                new WorkflowSerializationContext(
142
                    clientOptions.getNamespace(), input.getWorkflowExecution().getWorkflowId()));
143

144
    Optional<Payloads> inputArgs = dataConverterWitSignalContext.toPayloads(input.getArguments());
145
    inputArgs.ifPresent(request::setInput);
146
    genericClient.signal(request.build());
147
    return new WorkflowSignalOutput();
148
  }
149

150
  @Override
151
  public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInput input) {
152
    WorkflowStartInput workflowStartInput = input.getWorkflowStartInput();
153

154
    DataConverter dataConverterWithWorkflowContext =
155
        clientOptions
156
            .getDataConverter()
157
            .withContext(
158
                new WorkflowSerializationContext(
159
                    clientOptions.getNamespace(), workflowStartInput.getWorkflowId()));
160
    Optional<Payloads> workflowInput =
161
        dataConverterWithWorkflowContext.toPayloads(workflowStartInput.getArguments());
162

163
    @Nullable
164
    Memo memo =
165
        (workflowStartInput.getOptions().getMemo() != null)
166
            ? Memo.newBuilder()
167
                .putAllFields(
168
                    intoPayloadMap(
169
                        clientOptions.getDataConverter(),
170
                        workflowStartInput.getOptions().getMemo()))
171
                .build()
172
            : null;
173

174
    StartWorkflowExecutionRequestOrBuilder startRequest =
175
        requestsHelper.newStartWorkflowExecutionRequest(
176
            workflowStartInput.getWorkflowId(),
177
            workflowStartInput.getWorkflowType(),
178
            workflowStartInput.getHeader(),
179
            workflowStartInput.getOptions(),
180
            workflowInput.orElse(null),
181
            memo);
182

183
    Optional<Payloads> signalInput =
184
        dataConverterWithWorkflowContext.toPayloads(input.getSignalArguments());
185
    SignalWithStartWorkflowExecutionRequest request =
186
        requestsHelper
187
            .newSignalWithStartWorkflowExecutionRequest(
188
                startRequest, input.getSignalName(), signalInput.orElse(null))
189
            .build();
190
    SignalWithStartWorkflowExecutionResponse response = genericClient.signalWithStart(request);
191
    WorkflowExecution execution =
192
        WorkflowExecution.newBuilder()
193
            .setRunId(response.getRunId())
194
            .setWorkflowId(request.getWorkflowId())
195
            .build();
196
    // TODO currently SignalWithStartWorkflowExecutionResponse doesn't have eagerWorkflowTask.
197
    //  We should wire it when it's implemented server-side.
198
    return new WorkflowSignalWithStartOutput(new WorkflowStartOutput(execution));
199
  }
200

201
  @Override
202
  public <R> GetResultOutput<R> getResult(GetResultInput<R> input) throws TimeoutException {
203
    DataConverter dataConverterWithWorkflowContext =
204
        clientOptions
205
            .getDataConverter()
206
            .withContext(
207
                new WorkflowSerializationContext(
208
                    clientOptions.getNamespace(), input.getWorkflowExecution().getWorkflowId()));
209
    Optional<Payloads> resultValue =
210
        WorkflowClientLongPollHelper.getWorkflowExecutionResult(
211
            genericClient,
212
            requestsHelper,
213
            input.getWorkflowExecution(),
214
            input.getWorkflowType(),
215
            dataConverterWithWorkflowContext,
216
            input.getTimeout(),
217
            input.getTimeoutUnit());
218
    return new GetResultOutput<>(
219
        convertResultPayloads(
220
            resultValue,
221
            input.getResultClass(),
222
            input.getResultType(),
223
            dataConverterWithWorkflowContext));
224
  }
225

226
  @Override
227
  public <R> GetResultAsyncOutput<R> getResultAsync(GetResultInput<R> input) {
228
    DataConverter dataConverterWithWorkflowContext =
229
        clientOptions
230
            .getDataConverter()
231
            .withContext(
232
                new WorkflowSerializationContext(
233
                    clientOptions.getNamespace(), input.getWorkflowExecution().getWorkflowId()));
234
    CompletableFuture<Optional<Payloads>> resultValue =
235
        WorkflowClientLongPollAsyncHelper.getWorkflowExecutionResultAsync(
236
            genericClient,
237
            requestsHelper,
238
            input.getWorkflowExecution(),
239
            input.getWorkflowType(),
240
            input.getTimeout(),
241
            input.getTimeoutUnit(),
242
            dataConverterWithWorkflowContext);
243
    return new GetResultAsyncOutput<>(
244
        resultValue.thenApply(
245
            payloads ->
246
                convertResultPayloads(
247
                    payloads,
248
                    input.getResultClass(),
249
                    input.getResultType(),
250
                    dataConverterWithWorkflowContext)));
251
  }
252

253
  @Override
254
  public <R> QueryOutput<R> query(QueryInput<R> input) {
255
    WorkflowQuery.Builder query = WorkflowQuery.newBuilder().setQueryType(input.getQueryType());
256
    DataConverter dataConverterWithWorkflowContext =
257
        clientOptions
258
            .getDataConverter()
259
            .withContext(
260
                new WorkflowSerializationContext(
261
                    clientOptions.getNamespace(), input.getWorkflowExecution().getWorkflowId()));
262

263
    Optional<Payloads> inputArgs =
264
        dataConverterWithWorkflowContext.toPayloads(input.getArguments());
265
    inputArgs.ifPresent(query::setQueryArgs);
266
    QueryWorkflowRequest request =
267
        QueryWorkflowRequest.newBuilder()
268
            .setNamespace(clientOptions.getNamespace())
269
            .setExecution(
270
                WorkflowExecution.newBuilder()
271
                    .setWorkflowId(input.getWorkflowExecution().getWorkflowId())
272
                    .setRunId(input.getWorkflowExecution().getRunId()))
273
            .setQuery(query)
274
            .setQueryRejectCondition(clientOptions.getQueryRejectCondition())
275
            .build();
276

277
    QueryWorkflowResponse result;
278
    result = genericClient.query(request);
279

280
    boolean queryRejected = result.hasQueryRejected();
281
    WorkflowExecutionStatus rejectStatus =
282
        queryRejected ? result.getQueryRejected().getStatus() : null;
283
    Optional<Payloads> queryResult =
284
        result.hasQueryResult() ? Optional.of(result.getQueryResult()) : Optional.empty();
285
    R resultValue =
286
        convertResultPayloads(
287
            queryResult,
288
            input.getResultClass(),
289
            input.getResultType(),
290
            dataConverterWithWorkflowContext);
291
    return new QueryOutput<>(rejectStatus, resultValue);
292
  }
293

294
  @Override
295
  public <R> StartUpdateOutput<R> startUpdate(StartUpdateInput<R> input) {
296
    DataConverter dataConverterWithWorkflowContext =
297
        clientOptions
298
            .getDataConverter()
299
            .withContext(
300
                new WorkflowSerializationContext(
301
                    clientOptions.getNamespace(), input.getWorkflowExecution().getWorkflowId()));
302

303
    Optional<Payloads> inputArgs =
304
        dataConverterWithWorkflowContext.toPayloads(input.getArguments());
305
    Input.Builder updateInput = Input.newBuilder().setName(input.getUpdateName());
306
    inputArgs.ifPresent(updateInput::setArgs);
307

308
    Request request =
309
        Request.newBuilder()
310
            .setMeta(
311
                Meta.newBuilder()
312
                    .setUpdateId(input.getUpdateId())
313
                    .setIdentity(clientOptions.getIdentity()))
314
            .setInput(updateInput)
315
            .build();
316
    UpdateWorkflowExecutionRequest updateRequest =
317
        UpdateWorkflowExecutionRequest.newBuilder()
318
            .setNamespace(clientOptions.getNamespace())
319
            .setWaitPolicy(input.getWaitPolicy())
320
            .setWorkflowExecution(
321
                WorkflowExecution.newBuilder()
322
                    .setWorkflowId(input.getWorkflowExecution().getWorkflowId())
323
                    .setRunId(input.getWorkflowExecution().getRunId()))
324
            .setFirstExecutionRunId(input.getFirstExecutionRunId())
325
            .setRequest(request)
326
            .build();
327
    UpdateWorkflowExecutionResponse result = genericClient.update(updateRequest);
328

329
    if (result.hasOutcome()) {
330
      switch (result.getOutcome().getValueCase()) {
331
        case SUCCESS:
332
          Optional<Payloads> updateResult = Optional.of(result.getOutcome().getSuccess());
333
          R resultValue =
334
              convertResultPayloads(
335
                  updateResult,
336
                  input.getResultClass(),
337
                  input.getResultType(),
338
                  dataConverterWithWorkflowContext);
339
          return new StartUpdateOutput<R>(result.getUpdateRef(), true, resultValue);
340
        case FAILURE:
341
          throw new WorkflowUpdateException(
342
              result.getUpdateRef().getWorkflowExecution(),
343
              result.getUpdateRef().getUpdateId(),
344
              input.getUpdateName(),
345
              dataConverterWithWorkflowContext.failureToException(
346
                  result.getOutcome().getFailure()));
347
        default:
348
          throw new RuntimeException(
349
              "Received unexpected outcome from update request: "
350
                  + result.getOutcome().getValueCase());
351
      }
352
    } else {
353
      return new StartUpdateOutput<R>(result.getUpdateRef(), false, null);
354
    }
355
  }
356

357
  @Override
358
  public <R> PollWorkflowUpdateOutput<R> pollWorkflowUpdate(PollWorkflowUpdateInput<R> input) {
359
    DataConverter dataConverterWithWorkflowContext =
360
        clientOptions
361
            .getDataConverter()
362
            .withContext(
363
                new WorkflowSerializationContext(
364
                    clientOptions.getNamespace(), input.getWorkflowExecution().getWorkflowId()));
365

366
    UpdateRef update =
367
        UpdateRef.newBuilder()
368
            .setWorkflowExecution(input.getWorkflowExecution())
369
            .setUpdateId(input.getUpdateId())
370
            .build();
371

372
    WaitPolicy waitPolicy =
373
        WaitPolicy.newBuilder()
374
            .setLifecycleStage(
375
                UpdateWorkflowExecutionLifecycleStage
376
                    .UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED)
377
            .build();
378

379
    PollWorkflowExecutionUpdateRequest pollUpdateRequest =
380
        PollWorkflowExecutionUpdateRequest.newBuilder()
381
            .setNamespace(clientOptions.getNamespace())
382
            .setIdentity(clientOptions.getIdentity())
383
            .setUpdateRef(update)
384
            .setWaitPolicy(waitPolicy)
385
            .build();
386

387
    CompletableFuture<PollWorkflowExecutionUpdateResponse> future = new CompletableFuture<>();
388

389
    Deadline pollTimeoutDeadline = Deadline.after(input.getTimeout(), input.getTimeoutUnit());
390
    pollWorkflowUpdateHelper(future, pollUpdateRequest, pollTimeoutDeadline);
391
    return new PollWorkflowUpdateOutput(
392
        future.thenApply(
393
            (result) -> {
394
              if (result.hasOutcome()) {
395
                switch (result.getOutcome().getValueCase()) {
396
                  case SUCCESS:
397
                    Optional<Payloads> updateResult = Optional.of(result.getOutcome().getSuccess());
398
                    R resultValue =
399
                        convertResultPayloads(
400
                            updateResult,
401
                            input.getResultClass(),
402
                            input.getResultType(),
403
                            dataConverterWithWorkflowContext);
404
                    return resultValue;
405
                  case FAILURE:
406
                    throw new WorkflowUpdateException(
407
                        input.getWorkflowExecution(),
408
                        input.getUpdateId(),
409
                        input.getUpdateName(),
410
                        dataConverterWithWorkflowContext.failureToException(
411
                            result.getOutcome().getFailure()));
412
                  default:
413
                    throw new RuntimeException(
414
                        "Received unexpected outcome from poll update request: "
415
                            + result.getOutcome().getValueCase());
416
                }
417
              }
418
              throw new RuntimeException("Received no outcome from server");
419
            }));
420
  }
421

422
  private void pollWorkflowUpdateHelper(
423
      CompletableFuture<PollWorkflowExecutionUpdateResponse> resultCF,
424
      PollWorkflowExecutionUpdateRequest request,
425
      Deadline deadline) {
426

427
    Deadline pollTimeoutDeadline = Deadline.after(60L, TimeUnit.SECONDS).minimum(deadline);
428
    genericClient
429
        .pollUpdateAsync(request, pollTimeoutDeadline)
430
        .whenComplete(
431
            (r, e) -> {
432
              if ((e instanceof StatusRuntimeException
433
                      && ((StatusRuntimeException) e).getStatus().getCode()
434
                          == Status.Code.DEADLINE_EXCEEDED)
435
                  || pollTimeoutDeadline.isExpired()
436
                  || (e == null && !r.hasOutcome())) {
437
                // if the request has timed out, stop retrying
438
                if (!deadline.isExpired()) {
439
                  pollWorkflowUpdateHelper(resultCF, request, deadline);
440
                } else {
441
                  resultCF.completeExceptionally(
442
                      new TimeoutException(
443
                          "WorkflowId="
444
                              + request.getUpdateRef().getWorkflowExecution().getWorkflowId()
445
                              + ", runId="
446
                              + request.getUpdateRef().getWorkflowExecution().getRunId()
447
                              + ", updateId="
448
                              + request.getUpdateRef().getUpdateId()));
449
                }
450
              } else if (e != null) {
451
                resultCF.completeExceptionally(e);
452
              } else {
453
                resultCF.complete(r);
454
              }
455
            });
456
  }
457

458
  @Override
459
  public CancelOutput cancel(CancelInput input) {
460
    RequestCancelWorkflowExecutionRequest.Builder request =
461
        RequestCancelWorkflowExecutionRequest.newBuilder()
462
            .setRequestId(UUID.randomUUID().toString())
463
            .setWorkflowExecution(input.getWorkflowExecution())
464
            .setNamespace(clientOptions.getNamespace())
465
            .setIdentity(clientOptions.getIdentity());
466
    genericClient.requestCancel(request.build());
467
    return new CancelOutput();
468
  }
469

470
  @Override
471
  public TerminateOutput terminate(TerminateInput input) {
472
    TerminateWorkflowExecutionRequest.Builder request =
473
        TerminateWorkflowExecutionRequest.newBuilder()
474
            .setNamespace(clientOptions.getNamespace())
475
            .setWorkflowExecution(input.getWorkflowExecution());
476
    if (input.getReason() != null) {
477
      request.setReason(input.getReason());
478
    }
479
    DataConverter dataConverterWithWorkflowContext =
480
        clientOptions
481
            .getDataConverter()
482
            .withContext(
483
                new WorkflowSerializationContext(
484
                    clientOptions.getNamespace(), input.getWorkflowExecution().getWorkflowId()));
485
    Optional<Payloads> payloads = dataConverterWithWorkflowContext.toPayloads(input.getDetails());
486
    payloads.ifPresent(request::setDetails);
487
    genericClient.terminate(request.build());
488
    return new TerminateOutput();
489
  }
490

491
  private static <R> R convertResultPayloads(
492
      Optional<Payloads> resultValue,
493
      Class<R> resultClass,
494
      Type resultType,
495
      DataConverter dataConverter) {
496
    return dataConverter.fromPayloads(0, resultValue, resultClass, resultType);
497
  }
498

499
  /**
500
   * @return a handle to dispatch the eager workflow task. {@code null} if an eager execution is
501
   *     disabled through {@link io.temporal.client.WorkflowOptions} or the worker
502
   *     <ul>
503
   *       <li>is activity only worker
504
   *       <li>not started, shutdown or paused
505
   *       <li>doesn't have an executor slot available
506
   *     </ul>
507
   */
508
  @Nullable
509
  private WorkflowTaskDispatchHandle obtainDispatchHandle(WorkflowStartInput input) {
510
    if (input.getOptions().isDisableEagerExecution()) {
511
      return null;
512
    }
513
    return eagerWorkflowTaskDispatcher.tryGetLocalDispatchHandler(input);
514
  }
515
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc