• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #153

pending completion
#153

push

github-actions

web-flow
Eager Workflow Task Dispatch (#1674)

Issue #1646

Signed-off-by: Dmitry Spikhalskiy <dmitry@spikhalskiy.com>

213 of 213 new or added lines in 22 files covered. (100.0%)

16682 of 20566 relevant lines covered (81.11%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.52
/temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.client;
22

23
import com.google.common.base.Objects;
24
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
25
import io.temporal.common.CronSchedule;
26
import io.temporal.common.MethodRetry;
27
import io.temporal.common.RetryOptions;
28
import io.temporal.common.context.ContextPropagator;
29
import io.temporal.internal.common.OptionsUtils;
30
import io.temporal.worker.WorkerFactory;
31
import io.temporal.workflow.Workflow;
32
import java.time.Duration;
33
import java.util.Collection;
34
import java.util.List;
35
import java.util.Map;
36
import javax.annotation.Nullable;
37

38
public final class WorkflowOptions {
39

40
  public static Builder newBuilder() {
41
    return new Builder();
42
  }
43

44
  public static Builder newBuilder(WorkflowOptions options) {
45
    return new Builder(options);
46
  }
47

48
  public static WorkflowOptions getDefaultInstance() {
49
    return DEFAULT_INSTANCE;
50
  }
51

52
  private static final WorkflowOptions DEFAULT_INSTANCE;
53

54
  static {
55
    DEFAULT_INSTANCE = WorkflowOptions.newBuilder().build();
56
  }
57

58
  public static WorkflowOptions merge(
59
      MethodRetry methodRetry, CronSchedule cronSchedule, WorkflowOptions o) {
60
    if (o == null) {
61
      o = WorkflowOptions.newBuilder().build();
62
    }
63
    String cronAnnotation = cronSchedule == null ? "" : cronSchedule.value();
64
    return WorkflowOptions.newBuilder()
65
        .setWorkflowId(o.getWorkflowId())
66
        .setWorkflowIdReusePolicy(o.getWorkflowIdReusePolicy())
67
        .setWorkflowRunTimeout(o.getWorkflowRunTimeout())
68
        .setWorkflowExecutionTimeout(o.getWorkflowExecutionTimeout())
69
        .setWorkflowTaskTimeout(o.getWorkflowTaskTimeout())
70
        .setTaskQueue(o.getTaskQueue())
71
        .setRetryOptions(RetryOptions.merge(methodRetry, o.getRetryOptions()))
72
        .setCronSchedule(OptionsUtils.merge(cronAnnotation, o.getCronSchedule(), String.class))
73
        .setMemo(o.getMemo())
74
        .setSearchAttributes(o.getSearchAttributes())
75
        .setContextPropagators(o.getContextPropagators())
76
        .setDisableEagerExecution(o.isDisableEagerExecution())
77
        .validateBuildWithDefaults();
78
  }
79

80
  public static final class Builder {
81

82
    private String workflowId;
83

84
    private WorkflowIdReusePolicy workflowIdReusePolicy;
85

86
    private Duration workflowRunTimeout;
87

88
    private Duration workflowExecutionTimeout;
89

90
    private Duration workflowTaskTimeout;
91

92
    private String taskQueue;
93

94
    private RetryOptions retryOptions;
95

96
    private String cronSchedule;
97

98
    private Map<String, Object> memo;
99

100
    private Map<String, ?> searchAttributes;
101

102
    private List<ContextPropagator> contextPropagators;
103

104
    private boolean disableEagerExecution;
105

106
    private Builder() {}
107

108
    private Builder(WorkflowOptions options) {
109
      if (options == null) {
110
        return;
111
      }
112
      this.workflowIdReusePolicy = options.workflowIdReusePolicy;
113
      this.workflowId = options.workflowId;
114
      this.workflowTaskTimeout = options.workflowTaskTimeout;
115
      this.workflowRunTimeout = options.workflowRunTimeout;
116
      this.workflowExecutionTimeout = options.workflowExecutionTimeout;
117
      this.taskQueue = options.taskQueue;
118
      this.retryOptions = options.retryOptions;
119
      this.cronSchedule = options.cronSchedule;
120
      this.memo = options.memo;
121
      this.searchAttributes = options.searchAttributes;
122
      this.contextPropagators = options.contextPropagators;
123
      this.disableEagerExecution = options.disableEagerExecution;
124
    }
125

126
    /**
127
     * Workflow id to use when starting. If not specified a UUID is generated. Note that it is
128
     * dangerous as in case of client side retries no deduplication will happen based on the
129
     * generated id. So prefer assigning business meaningful ids if possible.
130
     */
131
    public Builder setWorkflowId(String workflowId) {
132
      this.workflowId = workflowId;
133
      return this;
134
    }
135

136
    /**
137
     * Specifies server behavior if a completed workflow with the same id exists. Note that under no
138
     * conditions Temporal allows two workflows with the same namespace and workflow id run
139
     * simultaneously.
140
     *
141
     * <p>Default value if not set: <b>AllowDuplicate</b>
142
     *
143
     * <ul>
144
     *   <li><b>AllowDuplicate</b> allows a new run regardless of the previous run's final status.
145
     *       The previous run still must be closed or the new run will be rejected.
146
     *   <li><b>AllowDuplicateFailedOnly</b> allows a new run if the previous run failed, was
147
     *       canceled, or terminated.
148
     *   <li><b>RejectDuplicate</b> never allows a new run, regardless of the previous run's final
149
     *       status.
150
     *   <li><b>TerminateIfRunning</b> is the same as <b>AllowDuplicate</b>, but if there exists a
151
     *       not-closed run in progress, it will be terminated.
152
     * </ul>
153
     */
154
    public Builder setWorkflowIdReusePolicy(WorkflowIdReusePolicy workflowIdReusePolicy) {
155
      this.workflowIdReusePolicy = workflowIdReusePolicy;
156
      return this;
157
    }
158

159
    /**
160
     * The time after which a workflow run is automatically terminated by Temporal service with
161
     * WORKFLOW_EXECUTION_TIMED_OUT status.
162
     *
163
     * <p>When a workflow reaches Workflow Run Timeout, it can't make any progress after that. Do
164
     * not rely on this timeout in workflow implementation or business logic. This timeout is not
165
     * designed to be handled in workflow code to perform any logic in case of timeout. Consider
166
     * using workflow timers instead.
167
     *
168
     * <p>If you catch yourself setting this timeout to very small values, you're likely using it
169
     * wrong.
170
     *
171
     * <p>Example: If Workflow Run Timeout is 30 seconds and the network was unavailable for 1
172
     * minute, workflows that were scheduled before the network blip will never have a chance to
173
     * make progress or react, and will be terminated. <br>
174
     * A timer that is scheduled in the workflow code using {@link Workflow#newTimer(Duration)} will
175
     * handle this situation gracefully. A workflow with such a timer will start after the network
176
     * blip. If it started before the network blip and the timer fires during the network blip, it
177
     * will get delivered after connectivity is restored and the workflow will be able to resume.
178
     */
179
    public Builder setWorkflowRunTimeout(Duration workflowRunTimeout) {
180
      this.workflowRunTimeout = workflowRunTimeout;
181
      return this;
182
    }
183

184
    /**
185
     * The time after which workflow execution (which includes run retries and continue as new) is
186
     * automatically terminated by Temporal service with WORKFLOW_EXECUTION_TIMED_OUT status.
187
     *
188
     * <p>When a workflow reaches Workflow Execution Timeout, it can't make any progress after that.
189
     * Do not rely on this timeout in workflow implementation or business logic. This timeout is not
190
     * designed to be handled in workflow code to perform any logic in case of timeout. Consider
191
     * using workflow timers instead.
192
     *
193
     * <p>If you catch yourself setting this timeout to very small values, you're likely using it
194
     * wrong.
195
     *
196
     * <p>Example: If Workflow Execution Timeout is 30 seconds and the network was unavailable for 1
197
     * minute, workflows that were scheduled before the network blip will never have a chance to
198
     * make progress or react, and will be terminated. <br>
199
     * A timer that is scheduled in the workflow code using {@link Workflow#newTimer(Duration)} will
200
     * handle this situation gracefully. A workflow with such a timer will start after the network
201
     * blip. If it started before the network blip and the timer fires during the network blip, it
202
     * will get delivered after connectivity is restored and the workflow will be able to resume.
203
     */
204
    public Builder setWorkflowExecutionTimeout(Duration workflowExecutionTimeout) {
205
      this.workflowExecutionTimeout = workflowExecutionTimeout;
206
      return this;
207
    }
208

209
    /**
210
     * Maximum execution time of a single Workflow Task. In the majority of cases there is no need
211
     * to change this timeout. Note that this timeout is not related to the overall Workflow
212
     * duration in any way. It defines for how long the Workflow can get blocked in the case of a
213
     * Workflow Worker crash.
214
     *
215
     * <p>Default is 10 seconds. Maximum value allowed by the Temporal Server is 1 minute.
216
     */
217
    public Builder setWorkflowTaskTimeout(Duration workflowTaskTimeout) {
218
      this.workflowTaskTimeout = workflowTaskTimeout;
219
      return this;
220
    }
221

222
    /**
223
     * Task queue to use for workflow tasks. It should match a task queue specified when creating a
224
     * {@link io.temporal.worker.Worker} that hosts the workflow code.
225
     */
226
    public Builder setTaskQueue(String taskQueue) {
227
      this.taskQueue = taskQueue;
228
      return this;
229
    }
230

231
    public Builder setRetryOptions(RetryOptions retryOptions) {
232
      this.retryOptions = retryOptions;
233
      return this;
234
    }
235

236
    public Builder setCronSchedule(String cronSchedule) {
237
      this.cronSchedule = cronSchedule;
238
      return this;
239
    }
240

241
    /**
242
     * Specifies additional non-indexed information in result of list workflow. The type of value
243
     * can be any object that are serializable by {@link io.temporal.common.converter.DataConverter}
244
     */
245
    public Builder setMemo(Map<String, Object> memo) {
246
      this.memo = memo;
247
      return this;
248
    }
249

250
    /**
251
     * Specifies Search Attributes map {@code searchAttributes} that will be attached to the
252
     * Workflow. Search Attributes are additional indexed information attributed to workflow and
253
     * used for search and visibility.
254
     *
255
     * <p>The search attributes can be used in query of List/Scan/Count workflow APIs. The key and
256
     * its value type must be registered on Temporal server side.
257
     *
258
     * <p>Supported Java types of the value:
259
     *
260
     * <ul>
261
     *   <li>String
262
     *   <li>Long, Integer, Short, Byte
263
     *   <li>Boolean
264
     *   <li>Double
265
     *   <li>OffsetDateTime
266
     *   <li>{@link Collection} of the types above
267
     * </ul>
268
     */
269
    // Workflow#upsertSearchAttributes docs needs to be kept in sync with this method
270
    public Builder setSearchAttributes(Map<String, ?> searchAttributes) {
271
      this.searchAttributes = searchAttributes;
272
      return this;
273
    }
274

275
    /**
276
     * This list of context propagators overrides the list specified on {@link
277
     * WorkflowClientOptions#getContextPropagators()}. <br>
278
     * This method is uncommon, the majority of users should just set {@link
279
     * WorkflowClientOptions#getContextPropagators()}
280
     *
281
     * @param contextPropagators specifies the list of overriding context propagators, {@code null}
282
     *     means no overriding.
283
     */
284
    public Builder setContextPropagators(@Nullable List<ContextPropagator> contextPropagators) {
285
      this.contextPropagators = contextPropagators;
286
      return this;
287
    }
288

289
    /**
290
     * If {@link WorkflowClient} is used to create a {@link WorkerFactory} that is
291
     *
292
     * <ul>
293
     *   <li>started
294
     *   <li>has a non-paused worker on the right task queue
295
     *   <li>has available workflow task executor slots
296
     * </ul>
297
     *
298
     * and such a {@link WorkflowClient} is used to start a workflow, then the first workflow task
299
     * could be dispatched on this local worker with the response to the start call if Server
300
     * supports it. This option can be used to disable this mechanism.
301
     *
302
     * @param disableEagerExecution if true, an eager local execution of the workflow task will
303
     *     never be requested even if it is possible.
304
     */
305
    public Builder setDisableEagerExecution(boolean disableEagerExecution) {
306
      this.disableEagerExecution = disableEagerExecution;
307
      return this;
308
    }
309

310
    public WorkflowOptions build() {
311
      return new WorkflowOptions(
312
          workflowId,
313
          workflowIdReusePolicy,
314
          workflowRunTimeout,
315
          workflowExecutionTimeout,
316
          workflowTaskTimeout,
317
          taskQueue,
318
          retryOptions,
319
          cronSchedule,
320
          memo,
321
          searchAttributes,
322
          contextPropagators,
323
          disableEagerExecution);
324
    }
325

326
    /**
327
     * Validates that all required properties are set and fills all other with default parameters.
328
     */
329
    public WorkflowOptions validateBuildWithDefaults() {
330
      return new WorkflowOptions(
331
          workflowId,
332
          workflowIdReusePolicy,
333
          workflowRunTimeout,
334
          workflowExecutionTimeout,
335
          workflowTaskTimeout,
336
          taskQueue,
337
          retryOptions,
338
          cronSchedule,
339
          memo,
340
          searchAttributes,
341
          contextPropagators,
342
          disableEagerExecution);
343
    }
344
  }
345

346
  private final String workflowId;
347

348
  private final WorkflowIdReusePolicy workflowIdReusePolicy;
349

350
  private final Duration workflowRunTimeout;
351

352
  private final Duration workflowExecutionTimeout;
353

354
  private final Duration workflowTaskTimeout;
355

356
  private final String taskQueue;
357

358
  private final RetryOptions retryOptions;
359

360
  private final String cronSchedule;
361

362
  private final Map<String, Object> memo;
363

364
  private final Map<String, ?> searchAttributes;
365

366
  private final List<ContextPropagator> contextPropagators;
367

368
  private final boolean disableEagerExecution;
369

370
  private WorkflowOptions(
371
      String workflowId,
372
      WorkflowIdReusePolicy workflowIdReusePolicy,
373
      Duration workflowRunTimeout,
374
      Duration workflowExecutionTimeout,
375
      Duration workflowTaskTimeout,
376
      String taskQueue,
377
      RetryOptions retryOptions,
378
      String cronSchedule,
379
      Map<String, Object> memo,
380
      Map<String, ?> searchAttributes,
381
      List<ContextPropagator> contextPropagators,
382
      boolean disableEagerExecution) {
383
    this.workflowId = workflowId;
384
    this.workflowIdReusePolicy = workflowIdReusePolicy;
385
    this.workflowRunTimeout = workflowRunTimeout;
386
    this.workflowExecutionTimeout = workflowExecutionTimeout;
387
    this.workflowTaskTimeout = workflowTaskTimeout;
388
    this.taskQueue = taskQueue;
389
    this.retryOptions = retryOptions;
390
    this.cronSchedule = cronSchedule;
391
    this.memo = memo;
392
    this.searchAttributes = searchAttributes;
393
    this.contextPropagators = contextPropagators;
394
    this.disableEagerExecution = disableEagerExecution;
395
  }
396

397
  public String getWorkflowId() {
398
    return workflowId;
399
  }
400

401
  public WorkflowIdReusePolicy getWorkflowIdReusePolicy() {
402
    return workflowIdReusePolicy;
403
  }
404

405
  public Duration getWorkflowRunTimeout() {
406
    return workflowRunTimeout;
407
  }
408

409
  public Duration getWorkflowExecutionTimeout() {
410
    return workflowExecutionTimeout;
411
  }
412

413
  public Duration getWorkflowTaskTimeout() {
414
    return workflowTaskTimeout;
415
  }
416

417
  public String getTaskQueue() {
418
    return taskQueue;
419
  }
420

421
  public RetryOptions getRetryOptions() {
422
    return retryOptions;
423
  }
424

425
  public String getCronSchedule() {
426
    return cronSchedule;
427
  }
428

429
  public Map<String, Object> getMemo() {
430
    return memo;
431
  }
432

433
  public Map<String, ?> getSearchAttributes() {
434
    return searchAttributes;
435
  }
436

437
  /**
438
   * @return the list of context propagators to use during this workflow. This list overrides the
439
   *     list specified on {@link WorkflowClientOptions#getContextPropagators()}, {@code null} means
440
   *     no overriding
441
   */
442
  public @Nullable List<ContextPropagator> getContextPropagators() {
443
    return contextPropagators;
444
  }
445

446
  public boolean isDisableEagerExecution() {
447
    return disableEagerExecution;
448
  }
449

450
  public Builder toBuilder() {
451
    return new Builder(this);
452
  }
453

454
  @Override
455
  public boolean equals(Object o) {
456
    if (this == o) return true;
457
    if (o == null || getClass() != o.getClass()) return false;
458
    WorkflowOptions that = (WorkflowOptions) o;
459
    return Objects.equal(workflowId, that.workflowId)
460
        && workflowIdReusePolicy == that.workflowIdReusePolicy
461
        && Objects.equal(workflowRunTimeout, that.workflowRunTimeout)
462
        && Objects.equal(workflowExecutionTimeout, that.workflowExecutionTimeout)
463
        && Objects.equal(workflowTaskTimeout, that.workflowTaskTimeout)
464
        && Objects.equal(taskQueue, that.taskQueue)
465
        && Objects.equal(retryOptions, that.retryOptions)
466
        && Objects.equal(cronSchedule, that.cronSchedule)
467
        && Objects.equal(memo, that.memo)
468
        && Objects.equal(searchAttributes, that.searchAttributes)
469
        && Objects.equal(contextPropagators, that.contextPropagators)
470
        && Objects.equal(disableEagerExecution, that.disableEagerExecution);
471
  }
472

473
  @Override
474
  public int hashCode() {
475
    return Objects.hashCode(
476
        workflowId,
477
        workflowIdReusePolicy,
478
        workflowRunTimeout,
479
        workflowExecutionTimeout,
480
        workflowTaskTimeout,
481
        taskQueue,
482
        retryOptions,
483
        cronSchedule,
484
        memo,
485
        searchAttributes,
486
        contextPropagators,
487
        disableEagerExecution);
488
  }
489

490
  @Override
491
  public String toString() {
492
    return "WorkflowOptions{"
493
        + "workflowId='"
494
        + workflowId
495
        + '\''
496
        + ", workflowIdReusePolicy="
497
        + workflowIdReusePolicy
498
        + ", workflowRunTimeout="
499
        + workflowRunTimeout
500
        + ", workflowExecutionTimeout="
501
        + workflowExecutionTimeout
502
        + ", workflowTaskTimeout="
503
        + workflowTaskTimeout
504
        + ", taskQueue='"
505
        + taskQueue
506
        + '\''
507
        + ", retryOptions="
508
        + retryOptions
509
        + ", cronSchedule='"
510
        + cronSchedule
511
        + '\''
512
        + ", memo="
513
        + memo
514
        + ", searchAttributes="
515
        + searchAttributes
516
        + ", contextPropagators="
517
        + contextPropagators
518
        + ", disableEagerExecution="
519
        + disableEagerExecution
520
        + '}';
521
  }
522
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc