• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #256

29 May 2024 06:28PM UTC coverage: 77.463% (+0.02%) from 77.448%
#256

push

github

web-flow
Add identity to WorkflowOptions (#2080)

Provides the ability to override identity specified in a WorkflowClient

9 of 12 new or added lines in 2 files covered. (75.0%)

6 existing lines in 2 files now uncovered.

19190 of 24773 relevant lines covered (77.46%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.8
/temporal-sdk/src/main/java/io/temporal/worker/Worker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.worker;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Preconditions;
25
import com.google.common.base.Strings;
26
import com.uber.m3.tally.Scope;
27
import com.uber.m3.util.ImmutableMap;
28
import io.temporal.client.WorkflowClient;
29
import io.temporal.client.WorkflowClientOptions;
30
import io.temporal.common.WorkflowExecutionHistory;
31
import io.temporal.common.context.ContextPropagator;
32
import io.temporal.common.converter.DataConverter;
33
import io.temporal.failure.TemporalFailure;
34
import io.temporal.internal.sync.WorkflowInternal;
35
import io.temporal.internal.sync.WorkflowThreadExecutor;
36
import io.temporal.internal.worker.*;
37
import io.temporal.internal.worker.SyncActivityWorker;
38
import io.temporal.internal.worker.SyncWorkflowWorker;
39
import io.temporal.internal.worker.WorkflowExecutorCache;
40
import io.temporal.serviceclient.MetricsTag;
41
import io.temporal.serviceclient.WorkflowServiceStubs;
42
import io.temporal.worker.tuning.*;
43
import io.temporal.workflow.Functions.Func;
44
import io.temporal.workflow.WorkflowMethod;
45
import java.time.Duration;
46
import java.util.List;
47
import java.util.Map;
48
import java.util.Objects;
49
import java.util.UUID;
50
import java.util.concurrent.CompletableFuture;
51
import java.util.concurrent.TimeUnit;
52
import java.util.concurrent.atomic.AtomicBoolean;
53
import javax.annotation.Nonnull;
54
import javax.annotation.Nullable;
55
import org.slf4j.Logger;
56
import org.slf4j.LoggerFactory;
57

58
/**
59
 * Hosts activity and workflow implementations. Uses long poll to receive activity and workflow
60
 * tasks and processes them in a correspondent thread pool.
61
 */
62
public final class Worker {
63
  private static final Logger log = LoggerFactory.getLogger(Worker.class);
1✔
64
  private final WorkerOptions options;
65
  private final String taskQueue;
66
  final SyncWorkflowWorker workflowWorker;
67
  final SyncActivityWorker activityWorker;
68
  private final AtomicBoolean started = new AtomicBoolean();
1✔
69

70
  /**
71
   * Creates worker that connects to an instance of the Temporal Service.
72
   *
73
   * @param client client to the Temporal Service endpoint.
74
   * @param taskQueue task queue name worker uses to poll. It uses this name for both workflow and
75
   *     activity task queue polls.
76
   * @param options Options (like {@link DataConverter} override) for configuring worker.
77
   * @param useStickyTaskQueue if sticky task queue should be used
78
   * @param workflowThreadExecutor workflow methods thread executor
79
   */
80
  Worker(
81
      WorkflowClient client,
82
      String taskQueue,
83
      WorkerFactoryOptions factoryOptions,
84
      WorkerOptions options,
85
      Scope metricsScope,
86
      @Nonnull WorkflowRunLockManager runLocks,
87
      @Nonnull WorkflowExecutorCache cache,
88
      boolean useStickyTaskQueue,
89
      WorkflowThreadExecutor workflowThreadExecutor,
90
      List<ContextPropagator> contextPropagators) {
1✔
91

92
    Objects.requireNonNull(client, "client should not be null");
1✔
93
    Preconditions.checkArgument(
1✔
94
        !Strings.isNullOrEmpty(taskQueue), "taskQueue should not be an empty string");
1✔
95
    this.taskQueue = taskQueue;
1✔
96
    this.options = WorkerOptions.newBuilder(options).validateAndBuildWithDefaults();
1✔
97
    factoryOptions = WorkerFactoryOptions.newBuilder(factoryOptions).validateAndBuildWithDefaults();
1✔
98
    WorkflowServiceStubs service = client.getWorkflowServiceStubs();
1✔
99
    WorkflowClientOptions clientOptions = client.getOptions();
1✔
100
    String namespace = clientOptions.getNamespace();
1✔
101
    Map<String, String> tags =
1✔
102
        new ImmutableMap.Builder<String, String>(1).put(MetricsTag.TASK_QUEUE, taskQueue).build();
1✔
103
    Scope taggedScope = metricsScope.tagged(tags);
1✔
104
    SingleWorkerOptions activityOptions =
1✔
105
        toActivityOptions(
1✔
106
            factoryOptions, this.options, clientOptions, contextPropagators, taggedScope);
107
    if (this.options.isLocalActivityWorkerOnly()) {
1✔
108
      activityWorker = null;
1✔
109
    } else {
110
      TrackingSlotSupplier<ActivitySlotInfo> activitySlotSupplier =
1✔
111
          new TrackingSlotSupplier<>(
112
              this.options.getActivitySlotSupplier() == null
1✔
113
                  ? new FixedSizeSlotSupplier<>(
1✔
114
                      this.options.getMaxConcurrentActivityExecutionSize())
1✔
115
                  : this.options.getActivitySlotSupplier());
1✔
116

117
      activityWorker =
1✔
118
          new SyncActivityWorker(
119
              service,
120
              namespace,
121
              taskQueue,
122
              this.options.getMaxTaskQueueActivitiesPerSecond(),
1✔
123
              activityOptions,
124
              activitySlotSupplier);
125
    }
126

127
    EagerActivityDispatcher eagerActivityDispatcher =
128
        (activityWorker != null && !this.options.isEagerExecutionDisabled())
1✔
129
            ? activityWorker.getEagerActivityDispatcher()
1✔
130
            : new EagerActivityDispatcher.NoopEagerActivityDispatcher();
1✔
131

132
    SingleWorkerOptions singleWorkerOptions =
1✔
133
        toWorkflowWorkerOptions(
1✔
134
            factoryOptions,
135
            this.options,
136
            clientOptions,
137
            taskQueue,
138
            contextPropagators,
139
            taggedScope);
140
    SingleWorkerOptions localActivityOptions =
1✔
141
        toLocalActivityOptions(
1✔
142
            factoryOptions, this.options, clientOptions, contextPropagators, taggedScope);
143

144
    TrackingSlotSupplier<WorkflowSlotInfo> workflowSlotSupplier =
1✔
145
        new TrackingSlotSupplier<>(
146
            this.options.getWorkflowSlotSupplier() == null
1✔
147
                ? new FixedSizeSlotSupplier<>(
1✔
148
                    this.options.getMaxConcurrentWorkflowTaskExecutionSize())
1✔
149
                : this.options.getWorkflowSlotSupplier());
1✔
150
    TrackingSlotSupplier<LocalActivitySlotInfo> localActivitySlotSupplier =
1✔
151
        new TrackingSlotSupplier<>(
152
            this.options.getLocalActivitySlotSupplier() == null
1✔
153
                ? new FixedSizeSlotSupplier<>(
1✔
154
                    this.options.getMaxConcurrentLocalActivityExecutionSize())
1✔
155
                : this.options.getLocalActivitySlotSupplier());
1✔
156
    workflowWorker =
1✔
157
        new SyncWorkflowWorker(
158
            service,
159
            namespace,
160
            taskQueue,
161
            singleWorkerOptions,
162
            localActivityOptions,
163
            runLocks,
164
            cache,
165
            useStickyTaskQueue ? getStickyTaskQueueName(client.getOptions().getIdentity()) : null,
1✔
166
            workflowThreadExecutor,
167
            eagerActivityDispatcher,
168
            workflowSlotSupplier,
169
            localActivitySlotSupplier);
170
  }
1✔
171

172
  /**
173
   * Registers workflow implementation classes with a worker. Can be called multiple times to add
174
   * more types. A workflow implementation class must implement at least one interface with a method
175
   * annotated with {@link WorkflowMethod}. By default, the short name of the interface is used as a
176
   * workflow type that this worker supports.
177
   *
178
   * <p>Implementations that share a worker must implement different interfaces as a workflow type
179
   * is identified by the workflow interface, not by the implementation.
180
   *
181
   * <p>Use {@link io.temporal.workflow.DynamicWorkflow} implementation to implement many workflow
182
   * types dynamically. It can be useful for implementing DSL based workflows. Only a single type
183
   * that implements DynamicWorkflow can be registered per worker.
184
   *
185
   * @throws TypeAlreadyRegisteredException if one of the workflow types is already registered
186
   */
187
  public void registerWorkflowImplementationTypes(Class<?>... workflowImplementationClasses) {
188
    Preconditions.checkState(
1✔
189
        !started.get(),
1✔
190
        "registerWorkflowImplementationTypes is not allowed after worker has started");
191

192
    workflowWorker.registerWorkflowImplementationTypes(
1✔
193
        WorkflowImplementationOptions.newBuilder().build(), workflowImplementationClasses);
1✔
194
  }
1✔
195

196
  /**
197
   * Registers workflow implementation classes with a worker. Can be called multiple times to add
198
   * more types. A workflow implementation class must implement at least one interface with a method
199
   * annotated with {@link WorkflowMethod}. By default, the short name of the interface is used as a
200
   * workflow type that this worker supports.
201
   *
202
   * <p>Implementations that share a worker must implement different interfaces as a workflow type
203
   * is identified by the workflow interface, not by the implementation.
204
   *
205
   * <p>Use {@link io.temporal.workflow.DynamicWorkflow} implementation to implement many workflow
206
   * types dynamically. It can be useful for implementing DSL based workflows. Only a single type
207
   * that implements DynamicWorkflow can be registered per worker.
208
   *
209
   * @throws TypeAlreadyRegisteredException if one of the workflow types is already registered
210
   */
211
  public void registerWorkflowImplementationTypes(
212
      WorkflowImplementationOptions options, Class<?>... workflowImplementationClasses) {
213
    Preconditions.checkState(
1✔
214
        !started.get(),
1✔
215
        "registerWorkflowImplementationTypes is not allowed after worker has started");
216

217
    workflowWorker.registerWorkflowImplementationTypes(options, workflowImplementationClasses);
1✔
218
  }
1✔
219

220
  /**
221
   * Configures a factory to use when an instance of a workflow implementation is created.
222
   * !IMPORTANT to provide newly created instances, each time factory is applied.
223
   *
224
   * <p>Unless mocking a workflow execution use {@link
225
   * #registerWorkflowImplementationTypes(Class[])}.
226
   *
227
   * @param workflowInterface Workflow interface that this factory implements
228
   * @param factory factory that when called creates a new instance of the workflow implementation
229
   *     object.
230
   * @param <R> type of the workflow object to create
231
   * @throws TypeAlreadyRegisteredException if one of the workflow types is already registered
232
   * @deprecated use {@link #registerWorkflowImplementationFactory(Class, Func,
233
   *     WorkflowImplementationOptions)} instead
234
   */
235
  @VisibleForTesting
236
  @Deprecated
237
  public <R> void addWorkflowImplementationFactory(
238
      WorkflowImplementationOptions options, Class<R> workflowInterface, Func<R> factory) {
239
    registerWorkflowImplementationFactory(workflowInterface, factory, options);
×
240
  }
×
241

242
  /**
243
   * <font color="red"> This method may behave differently from your expectations! This method makes
244
   * any {@link Throwable} thrown from a Workflow code to fail the Workflow Execution.
245
   *
246
   * <p>By default, only throwing {@link TemporalFailure} or an exception of class explicitly
247
   * specified on {@link WorkflowImplementationOptions.Builder#setFailWorkflowExceptionTypes} fails
248
   * Workflow Execution. Other exceptions fail Workflow Task instead of the whole Workflow
249
   * Execution. It is designed so that an exception which is not expected by the user, doesn't fail
250
   * the Workflow Execution. Which allows the user to fix Workflow implementation and then continue
251
   * the execution from the point it got stuck.
252
   *
253
   * <p>This method is misaligned with other workflow implementation registration methods in this
254
   * aspect.
255
   *
256
   * <p></font>
257
   *
258
   * @deprecated Use {@link #registerWorkflowImplementationFactory(Class, Func,
259
   *     WorkflowImplementationOptions)} with {@code
260
   *     WorkflowImplementationOptions.newBuilder().setFailWorkflowExceptionTypes(Throwable.class).build()}
261
   *     as a 3rd parameter to preserve the unexpected behavior of this method. <br>
262
   *     Or use {@link #registerWorkflowImplementationFactory(Class, Func)} with an expected
263
   *     behavior - Workflow Execution is failed only when a {@link TemporalFailure} subtype is
264
   *     thrown.
265
   */
266
  @VisibleForTesting
267
  @Deprecated
268
  public <R> void addWorkflowImplementationFactory(Class<R> workflowInterface, Func<R> factory) {
269
    WorkflowImplementationOptions unitTestingOptions =
270
        WorkflowImplementationOptions.newBuilder()
×
271
            .setFailWorkflowExceptionTypes(Throwable.class)
×
272
            .build();
×
273
    registerWorkflowImplementationFactory(workflowInterface, factory, unitTestingOptions);
×
274
  }
×
275

276
  /**
277
   * Configures a factory to use when an instance of a workflow implementation is created.
278
   *
279
   * <p>The only valid use for this method is unit testing and mocking. <br>
280
   * An example of mocking a child workflow:
281
   *
282
   * <pre><code>
283
   *   worker.addWorkflowImplementationFactory(ChildWorkflow.class, () -&gt; {
284
   *     ChildWorkflow child = mock(ChildWorkflow.class);
285
   *     when(child.workflow(anyString(), anyString())).thenReturn("result1");
286
   *     return child;
287
   *   });
288
   * </code></pre>
289
   *
290
   * <p>Unless mocking a workflow execution use {@link
291
   * #registerWorkflowImplementationTypes(Class[])}.
292
   *
293
   * <h1><font color="red">Workflow instantiation and Dependency Injection</font></h1>
294
   *
295
   * <font color="red"> This method may look convenient to integrate with dependency injection
296
   * frameworks and inject into workflow instances. Please note that Dependency Injection into
297
   * Workflow instances is strongly discouraged. Dependency Injection into Workflow Instances is a
298
   * direct way to cause changes that are incompatible with the persisted histories and cause
299
   * NonDeterministicException.
300
   *
301
   * <p>To provide an external configuration to a workflow in a deterministic way, use a Local
302
   * Activity that returns configuration to the workflow. Dependency Injection into Activity
303
   * instances is allowed. This way, the configuration is persisted into the history and maintained
304
   * same during replay.
305
   *
306
   * <p></font>
307
   *
308
   * @param workflowInterface Workflow interface that this factory implements
309
   * @param factory should create a new instance of the workflow implementation object every time
310
   *     it's called
311
   * @param options custom workflow implementation options for a worker
312
   * @param <R> type of the workflow object to create
313
   * @throws TypeAlreadyRegisteredException if one of the workflow types is already registered
314
   */
315
  @VisibleForTesting
316
  public <R> void registerWorkflowImplementationFactory(
317
      Class<R> workflowInterface, Func<R> factory, WorkflowImplementationOptions options) {
318
    workflowWorker.registerWorkflowImplementationFactory(options, workflowInterface, factory);
1✔
319
  }
1✔
320

321
  /**
322
   * Configures a factory to use when an instance of a workflow implementation is created.
323
   *
324
   * <p>The only valid use for this method is unit testing and mocking. <br>
325
   * An example of mocking a child workflow:
326
   *
327
   * <pre><code>
328
   *   worker.addWorkflowImplementationFactory(ChildWorkflow.class, () -&gt; {
329
   *     ChildWorkflow child = mock(ChildWorkflow.class);
330
   *     when(child.workflow(anyString(), anyString())).thenReturn("result1");
331
   *     return child;
332
   *   });
333
   * </code></pre>
334
   *
335
   * <p>Unless mocking a workflow execution use {@link
336
   * #registerWorkflowImplementationTypes(Class[])}.
337
   *
338
   * <h1><font color="red">Workflow instantiation and Dependency Injection</font></h1>
339
   *
340
   * <font color="red"> This method may look convenient to integrate with dependency injection
341
   * frameworks and inject into workflow instances. Please note that Dependency Injection into
342
   * Workflow instances is strongly discouraged. Dependency Injection into Workflow Instances is a
343
   * direct way to cause changes that are incompatible with the persisted histories and cause
344
   * NonDeterministicException.
345
   *
346
   * <p>To provide an external configuration to a workflow in a deterministic way, use a Local
347
   * Activity that returns configuration to the workflow. Dependency Injection into Activity
348
   * instances is allowed. This way, the configuration is persisted into the history and maintained
349
   * same during replay. </font>
350
   *
351
   * @param workflowInterface Workflow interface that this factory implements
352
   * @param factory should create a new instance of the workflow implementation object every time
353
   *     it's called
354
   * @param <R> type of the workflow object to create
355
   * @throws TypeAlreadyRegisteredException if one of the workflow types is already registered
356
   */
357
  @VisibleForTesting
358
  public <R> void registerWorkflowImplementationFactory(
359
      Class<R> workflowInterface, Func<R> factory) {
360
    workflowWorker.registerWorkflowImplementationFactory(
1✔
361
        WorkflowImplementationOptions.getDefaultInstance(), workflowInterface, factory);
1✔
362
  }
1✔
363

364
  /**
365
   * Register activity implementation objects with a worker. An implementation object can implement
366
   * one or more activity types.
367
   *
368
   * <p>An activity implementation object must implement at least one interface annotated with
369
   * {@link io.temporal.activity.ActivityInterface}. Each method of the annotated interface becomes
370
   * an activity type.
371
   *
372
   * <p>Implementations that share a worker must implement different interfaces as an activity type
373
   * is identified by the activity interface, not by the implementation.
374
   *
375
   * <p>Use an implementation of {@link io.temporal.activity.DynamicActivity} to register an object
376
   * that can implement activity types dynamically. A single registration of DynamicActivity
377
   * implementation per worker is allowed.
378
   *
379
   * @throws TypeAlreadyRegisteredException if one of the activity types is already registered
380
   */
381
  public void registerActivitiesImplementations(Object... activityImplementations) {
382
    Preconditions.checkState(
1✔
383
        !started.get(),
1✔
384
        "registerActivitiesImplementations is not allowed after worker has started");
385

386
    if (activityWorker != null) {
1✔
387
      activityWorker.registerActivityImplementations(activityImplementations);
1✔
388
    }
389
    workflowWorker.registerLocalActivityImplementations(activityImplementations);
1✔
390
  }
1✔
391

392
  void start() {
393
    if (!started.compareAndSet(false, true)) {
1✔
394
      return;
×
395
    }
396
    workflowWorker.start();
1✔
397
    if (activityWorker != null) {
1✔
398
      activityWorker.start();
1✔
399
    }
400
  }
1✔
401

402
  CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptUserTasks) {
403
    CompletableFuture<Void> workflowWorkerShutdownFuture =
1✔
404
        workflowWorker.shutdown(shutdownManager, interruptUserTasks);
1✔
405
    if (activityWorker != null) {
1✔
406
      return CompletableFuture.allOf(
1✔
407
          activityWorker.shutdown(shutdownManager, interruptUserTasks),
1✔
408
          workflowWorkerShutdownFuture);
409
    } else {
410
      return workflowWorkerShutdownFuture;
1✔
411
    }
412
  }
413

414
  boolean isTerminated() {
415
    boolean isTerminated = workflowWorker.isTerminated();
1✔
416
    if (activityWorker != null) {
1✔
417
      isTerminated = activityWorker.isTerminated();
1✔
418
    }
419
    return isTerminated;
1✔
420
  }
421

422
  void awaitTermination(long timeout, TimeUnit unit) {
423
    long timeoutMillis = unit.toMillis(timeout);
1✔
424
    if (activityWorker != null) {
1✔
425
      timeoutMillis = ShutdownManager.awaitTermination(activityWorker, timeoutMillis);
1✔
426
    }
427
    ShutdownManager.awaitTermination(workflowWorker, timeoutMillis);
1✔
428
  }
1✔
429

430
  @Override
431
  public String toString() {
432
    return "Worker{" + "options=" + options + '}';
×
433
  }
434

435
  /**
436
   * This is a utility method to replay a workflow execution using this particular instance of a
437
   * worker. This method is useful for troubleshooting workflows by running them in a debugger. The
438
   * workflow implementation type must be already registered with this worker for this method to
439
   * work.
440
   *
441
   * <p>There is no need to call {@link #start()} to be able to call this method <br>
442
   * The worker doesn't have to be registered on the same task queue as the execution in the
443
   * history. <br>
444
   * This method shouldn't be a part of normal production usage. It's intended for testing and
445
   * debugging only.
446
   *
447
   * @param history workflow execution history to replay
448
   * @throws Exception if replay failed for any reason
449
   */
450
  @VisibleForTesting
451
  @SuppressWarnings("deprecation")
452
  public void replayWorkflowExecution(io.temporal.internal.common.WorkflowExecutionHistory history)
453
      throws Exception {
454
    workflowWorker.queryWorkflowExecution(
1✔
455
        history,
456
        WorkflowClient.QUERY_TYPE_REPLAY_ONLY,
457
        String.class,
458
        String.class,
459
        new Object[] {});
460
  }
1✔
461

462
  /**
463
   * This is a utility method to replay a workflow execution using this particular instance of a
464
   * worker. This method is useful to troubleshoot workflows by running them in a debugger. To work
465
   * the workflow implementation type must be registered with this worker. There is no need to call
466
   * {@link #start()} to be able to call this method.
467
   *
468
   * @param jsonSerializedHistory workflow execution history in JSON format to replay
469
   * @throws Exception if replay failed for any reason
470
   */
471
  public void replayWorkflowExecution(String jsonSerializedHistory) throws Exception {
472
    WorkflowExecutionHistory history = WorkflowExecutionHistory.fromJson(jsonSerializedHistory);
×
473
    replayWorkflowExecution(history);
×
474
  }
×
475

476
  public String getTaskQueue() {
477
    return taskQueue;
1✔
478
  }
479

480
  public void suspendPolling() {
481
    workflowWorker.suspendPolling();
1✔
482
    if (activityWorker != null) {
1✔
483
      activityWorker.suspendPolling();
1✔
484
    }
485
  }
1✔
486

487
  public void resumePolling() {
488
    workflowWorker.resumePolling();
1✔
489
    if (activityWorker != null) {
1✔
490
      activityWorker.resumePolling();
1✔
491
    }
492
  }
1✔
493

494
  public boolean isSuspended() {
495
    return workflowWorker.isSuspended() && (activityWorker == null || activityWorker.isSuspended());
1✔
496
  }
497

498
  @Nullable
499
  public WorkflowTaskDispatchHandle reserveWorkflowExecutor() {
500
    return workflowWorker.reserveWorkflowExecutor();
1✔
501
  }
502

503
  private static String getStickyTaskQueueName(String workerIdentity) {
504
    // Unique id is needed to avoid collisions with other workers that may be created for the same
505
    // task queue and with the same identity.
506
    // We don't include normal task queue name here because it's typically clear which task queue a
507
    // sticky queue is for from the workflow execution histories.
508
    UUID uniqueId = UUID.randomUUID();
1✔
509
    return String.format("%s:%s", workerIdentity, uniqueId);
1✔
510
  }
511

512
  /**
513
   * Name of the workflow type the interface defines. It is either the interface short name or value
514
   * of {@link WorkflowMethod#name()} parameter.
515
   *
516
   * @param workflowInterfaceClass interface annotated with @WorkflowInterface
517
   */
518
  public static String getWorkflowType(Class<?> workflowInterfaceClass) {
519
    return WorkflowInternal.getWorkflowType(workflowInterfaceClass);
1✔
520
  }
521

522
  private static SingleWorkerOptions toActivityOptions(
523
      WorkerFactoryOptions factoryOptions,
524
      WorkerOptions options,
525
      WorkflowClientOptions clientOptions,
526
      List<ContextPropagator> contextPropagators,
527
      Scope metricsScope) {
528
    return toSingleWorkerOptions(factoryOptions, options, clientOptions, contextPropagators)
1✔
529
        .setPollerOptions(
1✔
530
            PollerOptions.newBuilder()
1✔
531
                .setMaximumPollRatePerSecond(options.getMaxWorkerActivitiesPerSecond())
1✔
532
                .setPollThreadCount(options.getMaxConcurrentActivityTaskPollers())
1✔
533
                .build())
1✔
534
        .setMetricsScope(metricsScope)
1✔
535
        .build();
1✔
536
  }
537

538
  private static SingleWorkerOptions toWorkflowWorkerOptions(
539
      WorkerFactoryOptions factoryOptions,
540
      WorkerOptions options,
541
      WorkflowClientOptions clientOptions,
542
      String taskQueue,
543
      List<ContextPropagator> contextPropagators,
544
      Scope metricsScope) {
545
    Map<String, String> tags =
1✔
546
        new ImmutableMap.Builder<String, String>(1).put(MetricsTag.TASK_QUEUE, taskQueue).build();
1✔
547

548
    Duration stickyQueueScheduleToStartTimeout = options.getStickyQueueScheduleToStartTimeout();
1✔
549
    if (WorkerOptions.DEFAULT_STICKY_SCHEDULE_TO_START_TIMEOUT.equals(
1✔
550
            stickyQueueScheduleToStartTimeout)
551
        && factoryOptions.getWorkflowHostLocalTaskQueueScheduleToStartTimeout() != null) {
1✔
552
      stickyQueueScheduleToStartTimeout =
×
553
          factoryOptions.getWorkflowHostLocalTaskQueueScheduleToStartTimeout();
×
554
    }
555

556
    int maxConcurrentWorkflowTaskPollers = options.getMaxConcurrentWorkflowTaskPollers();
1✔
557
    if (maxConcurrentWorkflowTaskPollers == 1) {
1✔
558
      log.warn(
1✔
559
          "WorkerOptions.Builder#setMaxConcurrentWorkflowTaskPollers was set to 1. This is an illegal value. The number of Workflow Task Pollers is forced to 2. See documentation on WorkerOptions.Builder#setMaxConcurrentWorkflowTaskPollers");
560
      maxConcurrentWorkflowTaskPollers = 2;
1✔
561
    }
562

563
    return toSingleWorkerOptions(factoryOptions, options, clientOptions, contextPropagators)
1✔
564
        .setPollerOptions(
1✔
565
            PollerOptions.newBuilder().setPollThreadCount(maxConcurrentWorkflowTaskPollers).build())
1✔
566
        .setStickyQueueScheduleToStartTimeout(stickyQueueScheduleToStartTimeout)
1✔
567
        .setStickyTaskQueueDrainTimeout(options.getStickyTaskQueueDrainTimeout())
1✔
568
        .setDefaultDeadlockDetectionTimeout(options.getDefaultDeadlockDetectionTimeout())
1✔
569
        .setMetricsScope(metricsScope.tagged(tags))
1✔
570
        .build();
1✔
571
  }
572

573
  private static SingleWorkerOptions toLocalActivityOptions(
574
      WorkerFactoryOptions factoryOptions,
575
      WorkerOptions options,
576
      WorkflowClientOptions clientOptions,
577
      List<ContextPropagator> contextPropagators,
578
      Scope metricsScope) {
579
    return toSingleWorkerOptions(factoryOptions, options, clientOptions, contextPropagators)
1✔
580
        .setPollerOptions(PollerOptions.newBuilder().setPollThreadCount(1).build())
1✔
581
        .setMetricsScope(metricsScope)
1✔
582
        .build();
1✔
583
  }
584

585
  @SuppressWarnings("deprecation")
586
  private static SingleWorkerOptions.Builder toSingleWorkerOptions(
587
      WorkerFactoryOptions factoryOptions,
588
      WorkerOptions options,
589
      WorkflowClientOptions clientOptions,
590
      List<ContextPropagator> contextPropagators) {
591
    String buildId = null;
1✔
592
    if (options.getBuildId() != null) {
1✔
593
      buildId = options.getBuildId();
1✔
594
    } else if (clientOptions.getBinaryChecksum() != null) {
1✔
595
      buildId = clientOptions.getBinaryChecksum();
1✔
596
    }
597

598
    String identity = clientOptions.getIdentity();
1✔
599
    if (options.getIdentity() != null) {
1✔
NEW
600
      identity = options.getIdentity();
×
601
    }
602

603
    return SingleWorkerOptions.newBuilder()
1✔
604
        .setDataConverter(clientOptions.getDataConverter())
1✔
605
        .setIdentity(identity)
1✔
606
        .setBuildId(buildId)
1✔
607
        .setUseBuildIdForVersioning(options.isUsingBuildIdForVersioning())
1✔
608
        .setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay())
1✔
609
        .setContextPropagators(contextPropagators)
1✔
610
        .setWorkerInterceptors(factoryOptions.getWorkerInterceptors())
1✔
611
        .setMaxHeartbeatThrottleInterval(options.getMaxHeartbeatThrottleInterval())
1✔
612
        .setDefaultHeartbeatThrottleInterval(options.getDefaultHeartbeatThrottleInterval());
1✔
613
  }
614
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc