• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #333

16 Oct 2024 07:28PM UTC coverage: 78.65% (+0.6%) from 78.085%
#333

push

github

web-flow
Fix code coverage (#2275)

22670 of 28824 relevant lines covered (78.65%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.19
/temporal-sdk/src/main/java/io/temporal/worker/Worker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.worker;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Preconditions;
25
import com.google.common.base.Strings;
26
import com.uber.m3.tally.Scope;
27
import com.uber.m3.util.ImmutableMap;
28
import io.temporal.client.WorkflowClient;
29
import io.temporal.client.WorkflowClientOptions;
30
import io.temporal.common.Experimental;
31
import io.temporal.common.WorkflowExecutionHistory;
32
import io.temporal.common.context.ContextPropagator;
33
import io.temporal.common.converter.DataConverter;
34
import io.temporal.failure.TemporalFailure;
35
import io.temporal.internal.sync.WorkflowInternal;
36
import io.temporal.internal.sync.WorkflowThreadExecutor;
37
import io.temporal.internal.worker.*;
38
import io.temporal.serviceclient.MetricsTag;
39
import io.temporal.serviceclient.WorkflowServiceStubs;
40
import io.temporal.worker.tuning.*;
41
import io.temporal.workflow.Functions.Func;
42
import io.temporal.workflow.WorkflowMethod;
43
import java.time.Duration;
44
import java.util.List;
45
import java.util.Map;
46
import java.util.Objects;
47
import java.util.UUID;
48
import java.util.concurrent.CompletableFuture;
49
import java.util.concurrent.TimeUnit;
50
import java.util.concurrent.atomic.AtomicBoolean;
51
import javax.annotation.Nonnull;
52
import javax.annotation.Nullable;
53
import org.slf4j.Logger;
54
import org.slf4j.LoggerFactory;
55

56
/**
57
 * Hosts activity, nexus and workflow implementations. Uses long poll to receive workflow, activity
58
 * and nexus tasks and processes them in a correspondent thread pool.
59
 */
60
public final class Worker {
61
  private static final Logger log = LoggerFactory.getLogger(Worker.class);
1✔
62
  private final WorkerOptions options;
63
  private final String taskQueue;
64
  final SyncWorkflowWorker workflowWorker;
65
  final SyncActivityWorker activityWorker;
66
  final SyncNexusWorker nexusWorker;
67
  private final AtomicBoolean started = new AtomicBoolean();
1✔
68

69
  /**
70
   * Creates worker that connects to an instance of the Temporal Service.
71
   *
72
   * @param client client to the Temporal Service endpoint.
73
   * @param taskQueue task queue name worker uses to poll. It uses this name for both workflow and
74
   *     activity task queue polls.
75
   * @param options Options (like {@link DataConverter} override) for configuring worker.
76
   * @param useStickyTaskQueue if sticky task queue should be used
77
   * @param workflowThreadExecutor workflow methods thread executor
78
   */
79
  Worker(
80
      WorkflowClient client,
81
      String taskQueue,
82
      WorkerFactoryOptions factoryOptions,
83
      WorkerOptions options,
84
      Scope metricsScope,
85
      @Nonnull WorkflowRunLockManager runLocks,
86
      @Nonnull WorkflowExecutorCache cache,
87
      boolean useStickyTaskQueue,
88
      WorkflowThreadExecutor workflowThreadExecutor,
89
      List<ContextPropagator> contextPropagators) {
1✔
90

91
    Objects.requireNonNull(client, "client should not be null");
1✔
92
    Preconditions.checkArgument(
1✔
93
        !Strings.isNullOrEmpty(taskQueue), "taskQueue should not be an empty string");
1✔
94
    this.taskQueue = taskQueue;
1✔
95
    this.options = WorkerOptions.newBuilder(options).validateAndBuildWithDefaults();
1✔
96
    factoryOptions = WorkerFactoryOptions.newBuilder(factoryOptions).validateAndBuildWithDefaults();
1✔
97
    WorkflowServiceStubs service = client.getWorkflowServiceStubs();
1✔
98
    WorkflowClientOptions clientOptions = client.getOptions();
1✔
99
    String namespace = clientOptions.getNamespace();
1✔
100
    Map<String, String> tags =
1✔
101
        new ImmutableMap.Builder<String, String>(1).put(MetricsTag.TASK_QUEUE, taskQueue).build();
1✔
102
    Scope taggedScope = metricsScope.tagged(tags);
1✔
103
    SingleWorkerOptions activityOptions =
1✔
104
        toActivityOptions(
1✔
105
            factoryOptions, this.options, clientOptions, contextPropagators, taggedScope);
106
    if (this.options.isLocalActivityWorkerOnly()) {
1✔
107
      activityWorker = null;
1✔
108
    } else {
109
      SlotSupplier<ActivitySlotInfo> activitySlotSupplier =
110
          this.options.getWorkerTuner() == null
1✔
111
              ? new FixedSizeSlotSupplier<>(this.options.getMaxConcurrentActivityExecutionSize())
1✔
112
              : this.options.getWorkerTuner().getActivityTaskSlotSupplier();
1✔
113
      attachMetricsToResourceController(taggedScope, activitySlotSupplier);
1✔
114

115
      activityWorker =
1✔
116
          new SyncActivityWorker(
117
              service,
118
              namespace,
119
              taskQueue,
120
              this.options.getMaxTaskQueueActivitiesPerSecond(),
1✔
121
              activityOptions,
122
              activitySlotSupplier);
123
    }
124

125
    EagerActivityDispatcher eagerActivityDispatcher =
126
        (activityWorker != null && !this.options.isEagerExecutionDisabled())
1✔
127
            ? activityWorker.getEagerActivityDispatcher()
1✔
128
            : new EagerActivityDispatcher.NoopEagerActivityDispatcher();
1✔
129

130
    SingleWorkerOptions nexusOptions =
1✔
131
        toNexusOptions(
1✔
132
            factoryOptions, this.options, clientOptions, contextPropagators, taggedScope);
133
    SlotSupplier<NexusSlotInfo> nexusSlotSupplier =
134
        this.options.getWorkerTuner() == null
1✔
135
            ? new FixedSizeSlotSupplier<>(this.options.getMaxConcurrentNexusExecutionSize())
1✔
136
            : this.options.getWorkerTuner().getNexusSlotSupplier();
1✔
137
    attachMetricsToResourceController(taggedScope, nexusSlotSupplier);
1✔
138

139
    nexusWorker =
1✔
140
        new SyncNexusWorker(client, namespace, taskQueue, nexusOptions, nexusSlotSupplier);
141

142
    SingleWorkerOptions singleWorkerOptions =
1✔
143
        toWorkflowWorkerOptions(
1✔
144
            factoryOptions,
145
            this.options,
146
            clientOptions,
147
            taskQueue,
148
            contextPropagators,
149
            taggedScope);
150
    SingleWorkerOptions localActivityOptions =
1✔
151
        toLocalActivityOptions(
1✔
152
            factoryOptions, this.options, clientOptions, contextPropagators, taggedScope);
153

154
    SlotSupplier<WorkflowSlotInfo> workflowSlotSupplier =
155
        this.options.getWorkerTuner() == null
1✔
156
            ? new FixedSizeSlotSupplier<>(this.options.getMaxConcurrentWorkflowTaskExecutionSize())
1✔
157
            : this.options.getWorkerTuner().getWorkflowTaskSlotSupplier();
1✔
158
    attachMetricsToResourceController(taggedScope, workflowSlotSupplier);
1✔
159
    SlotSupplier<LocalActivitySlotInfo> localActivitySlotSupplier =
160
        this.options.getWorkerTuner() == null
1✔
161
            ? new FixedSizeSlotSupplier<>(this.options.getMaxConcurrentLocalActivityExecutionSize())
1✔
162
            : this.options.getWorkerTuner().getLocalActivitySlotSupplier();
1✔
163
    attachMetricsToResourceController(taggedScope, localActivitySlotSupplier);
1✔
164
    workflowWorker =
1✔
165
        new SyncWorkflowWorker(
166
            service,
167
            namespace,
168
            taskQueue,
169
            singleWorkerOptions,
170
            localActivityOptions,
171
            runLocks,
172
            cache,
173
            useStickyTaskQueue ? getStickyTaskQueueName(client.getOptions().getIdentity()) : null,
1✔
174
            workflowThreadExecutor,
175
            eagerActivityDispatcher,
176
            workflowSlotSupplier,
177
            localActivitySlotSupplier);
178
  }
1✔
179

180
  /**
181
   * Registers workflow implementation classes with a worker. Can be called multiple times to add
182
   * more types. A workflow implementation class must implement at least one interface with a method
183
   * annotated with {@link WorkflowMethod}. By default, the short name of the interface is used as a
184
   * workflow type that this worker supports.
185
   *
186
   * <p>Implementations that share a worker must implement different interfaces as a workflow type
187
   * is identified by the workflow interface, not by the implementation.
188
   *
189
   * <p>Use {@link io.temporal.workflow.DynamicWorkflow} implementation to implement many workflow
190
   * types dynamically. It can be useful for implementing DSL based workflows. Only a single type
191
   * that implements DynamicWorkflow can be registered per worker.
192
   *
193
   * @throws TypeAlreadyRegisteredException if one of the workflow types is already registered
194
   */
195
  public void registerWorkflowImplementationTypes(Class<?>... workflowImplementationClasses) {
196
    Preconditions.checkState(
1✔
197
        !started.get(),
1✔
198
        "registerWorkflowImplementationTypes is not allowed after worker has started");
199

200
    workflowWorker.registerWorkflowImplementationTypes(
1✔
201
        WorkflowImplementationOptions.newBuilder().build(), workflowImplementationClasses);
1✔
202
  }
1✔
203

204
  /**
205
   * Registers workflow implementation classes with a worker. Can be called multiple times to add
206
   * more types. A workflow implementation class must implement at least one interface with a method
207
   * annotated with {@link WorkflowMethod}. By default, the short name of the interface is used as a
208
   * workflow type that this worker supports.
209
   *
210
   * <p>Implementations that share a worker must implement different interfaces as a workflow type
211
   * is identified by the workflow interface, not by the implementation.
212
   *
213
   * <p>Use {@link io.temporal.workflow.DynamicWorkflow} implementation to implement many workflow
214
   * types dynamically. It can be useful for implementing DSL based workflows. Only a single type
215
   * that implements DynamicWorkflow can be registered per worker.
216
   *
217
   * @throws TypeAlreadyRegisteredException if one of the workflow types is already registered
218
   */
219
  public void registerWorkflowImplementationTypes(
220
      WorkflowImplementationOptions options, Class<?>... workflowImplementationClasses) {
221
    Preconditions.checkState(
1✔
222
        !started.get(),
1✔
223
        "registerWorkflowImplementationTypes is not allowed after worker has started");
224

225
    workflowWorker.registerWorkflowImplementationTypes(options, workflowImplementationClasses);
1✔
226
  }
1✔
227

228
  /**
229
   * Configures a factory to use when an instance of a workflow implementation is created.
230
   * !IMPORTANT to provide newly created instances, each time factory is applied.
231
   *
232
   * <p>Unless mocking a workflow execution use {@link
233
   * #registerWorkflowImplementationTypes(Class[])}.
234
   *
235
   * @param workflowInterface Workflow interface that this factory implements
236
   * @param factory factory that when called creates a new instance of the workflow implementation
237
   *     object.
238
   * @param <R> type of the workflow object to create
239
   * @throws TypeAlreadyRegisteredException if one of the workflow types is already registered
240
   * @deprecated use {@link #registerWorkflowImplementationFactory(Class, Func,
241
   *     WorkflowImplementationOptions)} instead
242
   */
243
  @VisibleForTesting
244
  @Deprecated
245
  public <R> void addWorkflowImplementationFactory(
246
      WorkflowImplementationOptions options, Class<R> workflowInterface, Func<R> factory) {
247
    registerWorkflowImplementationFactory(workflowInterface, factory, options);
×
248
  }
×
249

250
  /**
251
   * <font color="red"> This method may behave differently from your expectations! This method makes
252
   * any {@link Throwable} thrown from a Workflow code to fail the Workflow Execution.
253
   *
254
   * <p>By default, only throwing {@link TemporalFailure} or an exception of class explicitly
255
   * specified on {@link WorkflowImplementationOptions.Builder#setFailWorkflowExceptionTypes} fails
256
   * Workflow Execution. Other exceptions fail Workflow Task instead of the whole Workflow
257
   * Execution. It is designed so that an exception which is not expected by the user, doesn't fail
258
   * the Workflow Execution. Which allows the user to fix Workflow implementation and then continue
259
   * the execution from the point it got stuck.
260
   *
261
   * <p>This method is misaligned with other workflow implementation registration methods in this
262
   * aspect.
263
   *
264
   * <p></font>
265
   *
266
   * @deprecated Use {@link #registerWorkflowImplementationFactory(Class, Func,
267
   *     WorkflowImplementationOptions)} with {@code
268
   *     WorkflowImplementationOptions.newBuilder().setFailWorkflowExceptionTypes(Throwable.class).build()}
269
   *     as a 3rd parameter to preserve the unexpected behavior of this method. <br>
270
   *     Or use {@link #registerWorkflowImplementationFactory(Class, Func)} with an expected
271
   *     behavior - Workflow Execution is failed only when a {@link TemporalFailure} subtype is
272
   *     thrown.
273
   */
274
  @VisibleForTesting
275
  @Deprecated
276
  public <R> void addWorkflowImplementationFactory(Class<R> workflowInterface, Func<R> factory) {
277
    WorkflowImplementationOptions unitTestingOptions =
278
        WorkflowImplementationOptions.newBuilder()
×
279
            .setFailWorkflowExceptionTypes(Throwable.class)
×
280
            .build();
×
281
    registerWorkflowImplementationFactory(workflowInterface, factory, unitTestingOptions);
×
282
  }
×
283

284
  /**
285
   * Configures a factory to use when an instance of a workflow implementation is created.
286
   *
287
   * <p>The only valid use for this method is unit testing and mocking. <br>
288
   * An example of mocking a child workflow:
289
   *
290
   * <pre><code>
291
   *   worker.addWorkflowImplementationFactory(ChildWorkflow.class, () -&gt; {
292
   *     ChildWorkflow child = mock(ChildWorkflow.class);
293
   *     when(child.workflow(anyString(), anyString())).thenReturn("result1");
294
   *     return child;
295
   *   });
296
   * </code></pre>
297
   *
298
   * <p>Unless mocking a workflow execution use {@link
299
   * #registerWorkflowImplementationTypes(Class[])}.
300
   *
301
   * <h1><font color="red">Workflow instantiation and Dependency Injection</font></h1>
302
   *
303
   * <font color="red"> This method may look convenient to integrate with dependency injection
304
   * frameworks and inject into workflow instances. Please note that Dependency Injection into
305
   * Workflow instances is strongly discouraged. Dependency Injection into Workflow Instances is a
306
   * direct way to cause changes that are incompatible with the persisted histories and cause
307
   * NonDeterministicException.
308
   *
309
   * <p>To provide an external configuration to a workflow in a deterministic way, use a Local
310
   * Activity that returns configuration to the workflow. Dependency Injection into Activity
311
   * instances is allowed. This way, the configuration is persisted into the history and maintained
312
   * same during replay.
313
   *
314
   * <p></font>
315
   *
316
   * @param workflowInterface Workflow interface that this factory implements
317
   * @param factory should create a new instance of the workflow implementation object every time
318
   *     it's called
319
   * @param options custom workflow implementation options for a worker
320
   * @param <R> type of the workflow object to create
321
   * @throws TypeAlreadyRegisteredException if one of the workflow types is already registered
322
   */
323
  @VisibleForTesting
324
  public <R> void registerWorkflowImplementationFactory(
325
      Class<R> workflowInterface, Func<R> factory, WorkflowImplementationOptions options) {
326
    workflowWorker.registerWorkflowImplementationFactory(options, workflowInterface, factory);
1✔
327
  }
1✔
328

329
  /**
330
   * Configures a factory to use when an instance of a workflow implementation is created.
331
   *
332
   * <p>The only valid use for this method is unit testing and mocking. <br>
333
   * An example of mocking a child workflow:
334
   *
335
   * <pre><code>
336
   *   worker.addWorkflowImplementationFactory(ChildWorkflow.class, () -&gt; {
337
   *     ChildWorkflow child = mock(ChildWorkflow.class);
338
   *     when(child.workflow(anyString(), anyString())).thenReturn("result1");
339
   *     return child;
340
   *   });
341
   * </code></pre>
342
   *
343
   * <p>Unless mocking a workflow execution use {@link
344
   * #registerWorkflowImplementationTypes(Class[])}.
345
   *
346
   * <h1><font color="red">Workflow instantiation and Dependency Injection</font></h1>
347
   *
348
   * <font color="red"> This method may look convenient to integrate with dependency injection
349
   * frameworks and inject into workflow instances. Please note that Dependency Injection into
350
   * Workflow instances is strongly discouraged. Dependency Injection into Workflow Instances is a
351
   * direct way to cause changes that are incompatible with the persisted histories and cause
352
   * NonDeterministicException.
353
   *
354
   * <p>To provide an external configuration to a workflow in a deterministic way, use a Local
355
   * Activity that returns configuration to the workflow. Dependency Injection into Activity
356
   * instances is allowed. This way, the configuration is persisted into the history and maintained
357
   * same during replay. </font>
358
   *
359
   * @param workflowInterface Workflow interface that this factory implements
360
   * @param factory should create a new instance of the workflow implementation object every time
361
   *     it's called
362
   * @param <R> type of the workflow object to create
363
   * @throws TypeAlreadyRegisteredException if one of the workflow types is already registered
364
   */
365
  @VisibleForTesting
366
  public <R> void registerWorkflowImplementationFactory(
367
      Class<R> workflowInterface, Func<R> factory) {
368
    workflowWorker.registerWorkflowImplementationFactory(
1✔
369
        WorkflowImplementationOptions.getDefaultInstance(), workflowInterface, factory);
1✔
370
  }
1✔
371

372
  /**
373
   * Register activity implementation objects with a worker. An implementation object can implement
374
   * one or more activity types.
375
   *
376
   * <p>An activity implementation object must implement at least one interface annotated with
377
   * {@link io.temporal.activity.ActivityInterface}. Each method of the annotated interface becomes
378
   * an activity type.
379
   *
380
   * <p>Implementations that share a worker must implement different interfaces as an activity type
381
   * is identified by the activity interface, not by the implementation.
382
   *
383
   * <p>Use an implementation of {@link io.temporal.activity.DynamicActivity} to register an object
384
   * that can implement activity types dynamically. A single registration of DynamicActivity
385
   * implementation per worker is allowed.
386
   *
387
   * @throws TypeAlreadyRegisteredException if one of the activity types is already registered
388
   */
389
  public void registerActivitiesImplementations(Object... activityImplementations) {
390
    Preconditions.checkState(
1✔
391
        !started.get(),
1✔
392
        "registerActivitiesImplementations is not allowed after worker has started");
393

394
    if (activityWorker != null) {
1✔
395
      activityWorker.registerActivityImplementations(activityImplementations);
1✔
396
    }
397
    workflowWorker.registerLocalActivityImplementations(activityImplementations);
1✔
398
  }
1✔
399

400
  /**
401
   * Register Nexus service implementation objects with a worker.
402
   *
403
   * <p>A Nexus service object must be annotated with {@link io.nexusrpc.handler.ServiceImpl}.
404
   *
405
   * @throws TypeAlreadyRegisteredException if one of the services is already registered
406
   */
407
  @Experimental
408
  public void registerNexusServiceImplementation(Object... nexusServiceImplementations) {
409
    Preconditions.checkState(
1✔
410
        !started.get(),
1✔
411
        "registerNexusServiceImplementation is not allowed after worker has started");
412
    nexusWorker.registerNexusServiceImplementation(nexusServiceImplementations);
1✔
413
  }
1✔
414

415
  void start() {
416
    if (!started.compareAndSet(false, true)) {
1✔
417
      return;
×
418
    }
419
    workflowWorker.start();
1✔
420
    nexusWorker.start();
1✔
421
    if (activityWorker != null) {
1✔
422
      activityWorker.start();
1✔
423
    }
424
  }
1✔
425

426
  CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptUserTasks) {
427
    CompletableFuture<Void> workflowWorkerShutdownFuture =
1✔
428
        workflowWorker.shutdown(shutdownManager, interruptUserTasks);
1✔
429
    CompletableFuture<Void> nexusWorkerShutdownFuture =
1✔
430
        nexusWorker.shutdown(shutdownManager, interruptUserTasks);
1✔
431
    if (activityWorker != null) {
1✔
432
      return CompletableFuture.allOf(
1✔
433
          activityWorker.shutdown(shutdownManager, interruptUserTasks),
1✔
434
          workflowWorkerShutdownFuture,
435
          nexusWorkerShutdownFuture);
436
    } else {
437
      return CompletableFuture.allOf(workflowWorkerShutdownFuture, nexusWorkerShutdownFuture);
1✔
438
    }
439
  }
440

441
  boolean isTerminated() {
442
    boolean isTerminated = workflowWorker.isTerminated();
1✔
443
    isTerminated &= nexusWorker.isTerminated();
1✔
444
    if (activityWorker != null) {
1✔
445
      isTerminated &= activityWorker.isTerminated();
1✔
446
    }
447
    return isTerminated;
1✔
448
  }
449

450
  void awaitTermination(long timeout, TimeUnit unit) {
451
    long timeoutMillis = unit.toMillis(timeout);
1✔
452
    if (activityWorker != null) {
1✔
453
      timeoutMillis = ShutdownManager.awaitTermination(activityWorker, timeoutMillis);
1✔
454
    }
455
    timeoutMillis = ShutdownManager.awaitTermination(nexusWorker, timeoutMillis);
1✔
456
    ShutdownManager.awaitTermination(workflowWorker, timeoutMillis);
1✔
457
  }
1✔
458

459
  @Override
460
  public String toString() {
461
    return "Worker{" + "options=" + options + '}';
×
462
  }
463

464
  /**
465
   * This is a utility method to replay a workflow execution using this particular instance of a
466
   * worker. This method is useful for troubleshooting workflows by running them in a debugger. The
467
   * workflow implementation type must be already registered with this worker for this method to
468
   * work.
469
   *
470
   * <p>There is no need to call {@link #start()} to be able to call this method <br>
471
   * The worker doesn't have to be registered on the same task queue as the execution in the
472
   * history. <br>
473
   * This method shouldn't be a part of normal production usage. It's intended for testing and
474
   * debugging only.
475
   *
476
   * @param history workflow execution history to replay
477
   * @throws Exception if replay failed for any reason
478
   */
479
  @VisibleForTesting
480
  @SuppressWarnings("deprecation")
481
  public void replayWorkflowExecution(io.temporal.internal.common.WorkflowExecutionHistory history)
482
      throws Exception {
483
    workflowWorker.queryWorkflowExecution(
1✔
484
        history,
485
        WorkflowClient.QUERY_TYPE_REPLAY_ONLY,
486
        String.class,
487
        String.class,
488
        new Object[] {});
489
  }
1✔
490

491
  /**
492
   * This is a utility method to replay a workflow execution using this particular instance of a
493
   * worker. This method is useful to troubleshoot workflows by running them in a debugger. To work
494
   * the workflow implementation type must be registered with this worker. There is no need to call
495
   * {@link #start()} to be able to call this method.
496
   *
497
   * @param jsonSerializedHistory workflow execution history in JSON format to replay
498
   * @throws Exception if replay failed for any reason
499
   */
500
  public void replayWorkflowExecution(String jsonSerializedHistory) throws Exception {
501
    WorkflowExecutionHistory history = WorkflowExecutionHistory.fromJson(jsonSerializedHistory);
×
502
    replayWorkflowExecution(history);
×
503
  }
×
504

505
  public String getTaskQueue() {
506
    return taskQueue;
1✔
507
  }
508

509
  public void suspendPolling() {
510
    workflowWorker.suspendPolling();
1✔
511
    nexusWorker.suspendPolling();
1✔
512
    if (activityWorker != null) {
1✔
513
      activityWorker.suspendPolling();
1✔
514
    }
515
  }
1✔
516

517
  public void resumePolling() {
518
    workflowWorker.resumePolling();
1✔
519
    nexusWorker.resumePolling();
1✔
520
    if (activityWorker != null) {
1✔
521
      activityWorker.resumePolling();
1✔
522
    }
523
  }
1✔
524

525
  public boolean isSuspended() {
526
    return workflowWorker.isSuspended()
1✔
527
        && nexusWorker.isSuspended()
1✔
528
        && (activityWorker == null || activityWorker.isSuspended());
1✔
529
  }
530

531
  @Nullable
532
  public WorkflowTaskDispatchHandle reserveWorkflowExecutor() {
533
    return workflowWorker.reserveWorkflowExecutor();
1✔
534
  }
535

536
  private static String getStickyTaskQueueName(String workerIdentity) {
537
    // Unique id is needed to avoid collisions with other workers that may be created for the same
538
    // task queue and with the same identity.
539
    // We don't include normal task queue name here because it's typically clear which task queue a
540
    // sticky queue is for from the workflow execution histories.
541
    UUID uniqueId = UUID.randomUUID();
1✔
542
    return String.format("%s:%s", workerIdentity, uniqueId);
1✔
543
  }
544

545
  /**
546
   * Name of the workflow type the interface defines. It is either the interface short name or value
547
   * of {@link WorkflowMethod#name()} parameter.
548
   *
549
   * @param workflowInterfaceClass interface annotated with @WorkflowInterface
550
   */
551
  public static String getWorkflowType(Class<?> workflowInterfaceClass) {
552
    return WorkflowInternal.getWorkflowType(workflowInterfaceClass);
1✔
553
  }
554

555
  private static SingleWorkerOptions toActivityOptions(
556
      WorkerFactoryOptions factoryOptions,
557
      WorkerOptions options,
558
      WorkflowClientOptions clientOptions,
559
      List<ContextPropagator> contextPropagators,
560
      Scope metricsScope) {
561
    return toSingleWorkerOptions(factoryOptions, options, clientOptions, contextPropagators)
1✔
562
        .setPollerOptions(
1✔
563
            PollerOptions.newBuilder()
1✔
564
                .setMaximumPollRatePerSecond(options.getMaxWorkerActivitiesPerSecond())
1✔
565
                .setPollThreadCount(options.getMaxConcurrentActivityTaskPollers())
1✔
566
                .build())
1✔
567
        .setMetricsScope(metricsScope)
1✔
568
        .build();
1✔
569
  }
570

571
  private static SingleWorkerOptions toNexusOptions(
572
      WorkerFactoryOptions factoryOptions,
573
      WorkerOptions options,
574
      WorkflowClientOptions clientOptions,
575
      List<ContextPropagator> contextPropagators,
576
      Scope metricsScope) {
577
    return toSingleWorkerOptions(factoryOptions, options, clientOptions, contextPropagators)
1✔
578
        .setPollerOptions(
1✔
579
            PollerOptions.newBuilder()
1✔
580
                .setPollThreadCount(options.getMaxConcurrentNexusTaskPollers())
1✔
581
                .build())
1✔
582
        .setMetricsScope(metricsScope)
1✔
583
        .build();
1✔
584
  }
585

586
  private static SingleWorkerOptions toWorkflowWorkerOptions(
587
      WorkerFactoryOptions factoryOptions,
588
      WorkerOptions options,
589
      WorkflowClientOptions clientOptions,
590
      String taskQueue,
591
      List<ContextPropagator> contextPropagators,
592
      Scope metricsScope) {
593
    Map<String, String> tags =
1✔
594
        new ImmutableMap.Builder<String, String>(1).put(MetricsTag.TASK_QUEUE, taskQueue).build();
1✔
595

596
    Duration stickyQueueScheduleToStartTimeout = options.getStickyQueueScheduleToStartTimeout();
1✔
597
    if (WorkerOptions.DEFAULT_STICKY_SCHEDULE_TO_START_TIMEOUT.equals(
1✔
598
            stickyQueueScheduleToStartTimeout)
599
        && factoryOptions.getWorkflowHostLocalTaskQueueScheduleToStartTimeout() != null) {
1✔
600
      stickyQueueScheduleToStartTimeout =
×
601
          factoryOptions.getWorkflowHostLocalTaskQueueScheduleToStartTimeout();
×
602
    }
603

604
    int maxConcurrentWorkflowTaskPollers = options.getMaxConcurrentWorkflowTaskPollers();
1✔
605
    if (maxConcurrentWorkflowTaskPollers == 1) {
1✔
606
      log.warn(
1✔
607
          "WorkerOptions.Builder#setMaxConcurrentWorkflowTaskPollers was set to 1. This is an illegal value. The number of Workflow Task Pollers is forced to 2. See documentation on WorkerOptions.Builder#setMaxConcurrentWorkflowTaskPollers");
608
      maxConcurrentWorkflowTaskPollers = 2;
1✔
609
    }
610

611
    return toSingleWorkerOptions(factoryOptions, options, clientOptions, contextPropagators)
1✔
612
        .setPollerOptions(
1✔
613
            PollerOptions.newBuilder().setPollThreadCount(maxConcurrentWorkflowTaskPollers).build())
1✔
614
        .setStickyQueueScheduleToStartTimeout(stickyQueueScheduleToStartTimeout)
1✔
615
        .setStickyTaskQueueDrainTimeout(options.getStickyTaskQueueDrainTimeout())
1✔
616
        .setDefaultDeadlockDetectionTimeout(options.getDefaultDeadlockDetectionTimeout())
1✔
617
        .setMetricsScope(metricsScope.tagged(tags))
1✔
618
        .build();
1✔
619
  }
620

621
  private static SingleWorkerOptions toLocalActivityOptions(
622
      WorkerFactoryOptions factoryOptions,
623
      WorkerOptions options,
624
      WorkflowClientOptions clientOptions,
625
      List<ContextPropagator> contextPropagators,
626
      Scope metricsScope) {
627
    return toSingleWorkerOptions(factoryOptions, options, clientOptions, contextPropagators)
1✔
628
        .setPollerOptions(PollerOptions.newBuilder().setPollThreadCount(1).build())
1✔
629
        .setMetricsScope(metricsScope)
1✔
630
        .build();
1✔
631
  }
632

633
  @SuppressWarnings("deprecation")
634
  private static SingleWorkerOptions.Builder toSingleWorkerOptions(
635
      WorkerFactoryOptions factoryOptions,
636
      WorkerOptions options,
637
      WorkflowClientOptions clientOptions,
638
      List<ContextPropagator> contextPropagators) {
639
    String buildId = null;
1✔
640
    if (options.getBuildId() != null) {
1✔
641
      buildId = options.getBuildId();
1✔
642
    } else if (clientOptions.getBinaryChecksum() != null) {
1✔
643
      buildId = clientOptions.getBinaryChecksum();
1✔
644
    }
645

646
    String identity = clientOptions.getIdentity();
1✔
647
    if (options.getIdentity() != null) {
1✔
648
      identity = options.getIdentity();
×
649
    }
650

651
    return SingleWorkerOptions.newBuilder()
1✔
652
        .setDataConverter(clientOptions.getDataConverter())
1✔
653
        .setIdentity(identity)
1✔
654
        .setBuildId(buildId)
1✔
655
        .setUseBuildIdForVersioning(options.isUsingBuildIdForVersioning())
1✔
656
        .setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay())
1✔
657
        .setContextPropagators(contextPropagators)
1✔
658
        .setWorkerInterceptors(factoryOptions.getWorkerInterceptors())
1✔
659
        .setMaxHeartbeatThrottleInterval(options.getMaxHeartbeatThrottleInterval())
1✔
660
        .setDefaultHeartbeatThrottleInterval(options.getDefaultHeartbeatThrottleInterval());
1✔
661
  }
662

663
  /**
664
   * If any slot supplier is resource-based, we want to attach a metrics scope to the controller
665
   * (before it's labelled with the worker type).
666
   */
667
  private static void attachMetricsToResourceController(
668
      Scope metricsScope, SlotSupplier<?> supplier) {
669
    if (supplier instanceof ResourceBasedSlotSupplier) {
1✔
670
      ((ResourceBasedSlotSupplier<?>) supplier)
1✔
671
          .getResourceController()
1✔
672
          .setMetricsScope(metricsScope);
1✔
673
    }
674
  }
1✔
675
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc