• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #153

pending completion
#153

push

github-actions

web-flow
Eager Workflow Task Dispatch (#1674)

Issue #1646

Signed-off-by: Dmitry Spikhalskiy <dmitry@spikhalskiy.com>

213 of 213 new or added lines in 22 files covered. (100.0%)

16682 of 20566 relevant lines covered (81.11%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.9
/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.worker;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Preconditions;
25
import com.google.common.base.Strings;
26
import com.uber.m3.tally.Scope;
27
import io.temporal.client.WorkflowClient;
28
import io.temporal.client.WorkflowClientOptions;
29
import io.temporal.common.converter.DataConverter;
30
import io.temporal.internal.client.WorkflowClientInternal;
31
import io.temporal.internal.sync.WorkflowThreadExecutor;
32
import io.temporal.internal.worker.*;
33
import io.temporal.internal.worker.WorkflowExecutorCache;
34
import io.temporal.serviceclient.MetricsTag;
35
import java.util.HashMap;
36
import java.util.Map;
37
import java.util.Objects;
38
import java.util.concurrent.CompletableFuture;
39
import java.util.concurrent.SynchronousQueue;
40
import java.util.concurrent.ThreadPoolExecutor;
41
import java.util.concurrent.TimeUnit;
42
import java.util.concurrent.atomic.AtomicInteger;
43
import javax.annotation.Nonnull;
44
import javax.annotation.Nullable;
45
import org.slf4j.Logger;
46
import org.slf4j.LoggerFactory;
47

48
/** Maintains worker creation and lifecycle. */
49
public final class WorkerFactory {
50
  private static final Logger log = LoggerFactory.getLogger(WorkerFactory.class);
1✔
51

52
  private final WorkflowRunLockManager runLocks = new WorkflowRunLockManager();
1✔
53

54
  private final Scope metricsScope;
55

56
  private final Map<String, Worker> workers = new HashMap<>();
1✔
57
  private final WorkflowClient workflowClient;
58
  private final ThreadPoolExecutor workflowThreadPool;
59
  private final WorkflowThreadExecutor workflowThreadExecutor;
60
  private final AtomicInteger workflowThreadCounter = new AtomicInteger();
1✔
61
  private final WorkerFactoryOptions factoryOptions;
62

63
  private final @Nonnull WorkflowExecutorCache cache;
64

65
  private State state = State.Initial;
1✔
66

67
  private final String statusErrorMessage =
1✔
68
      "attempted to %s while in %s state. Acceptable States: %s";
69

70
  public static WorkerFactory newInstance(WorkflowClient workflowClient) {
71
    return WorkerFactory.newInstance(workflowClient, WorkerFactoryOptions.getDefaultInstance());
1✔
72
  }
73

74
  public static WorkerFactory newInstance(
75
      WorkflowClient workflowClient, WorkerFactoryOptions options) {
76
    return new WorkerFactory(workflowClient, options);
1✔
77
  }
78

79
  /**
80
   * Creates a factory. Workers will connect to the temporal server using the workflowService client
81
   * passed in.
82
   *
83
   * @param workflowClient client to the Temporal Service endpoint.
84
   * @param factoryOptions Options used to configure factory settings
85
   */
86
  private WorkerFactory(WorkflowClient workflowClient, WorkerFactoryOptions factoryOptions) {
1✔
87
    this.workflowClient = Objects.requireNonNull(workflowClient);
1✔
88
    WorkflowClientOptions workflowClientOptions = workflowClient.getOptions();
1✔
89
    String namespace = workflowClientOptions.getNamespace();
1✔
90

91
    this.factoryOptions =
1✔
92
        WorkerFactoryOptions.newBuilder(factoryOptions).validateAndBuildWithDefaults();
1✔
93

94
    this.metricsScope =
1✔
95
        this.workflowClient
96
            .getWorkflowServiceStubs()
1✔
97
            .getOptions()
1✔
98
            .getMetricsScope()
1✔
99
            .tagged(MetricsTag.defaultTags(namespace));
1✔
100

101
    this.workflowThreadPool =
1✔
102
        new ThreadPoolExecutor(
103
            0,
104
            this.factoryOptions.getMaxWorkflowThreadCount(),
1✔
105
            1,
106
            TimeUnit.MINUTES,
107
            new SynchronousQueue<>());
108
    this.workflowThreadPool.setThreadFactory(
1✔
109
        r -> new Thread(r, "workflow-thread-" + workflowThreadCounter.incrementAndGet()));
1✔
110
    this.workflowThreadExecutor =
1✔
111
        new ActiveThreadReportingExecutor(this.workflowThreadPool, this.metricsScope);
112

113
    this.cache =
1✔
114
        new WorkflowExecutorCache(
115
            this.factoryOptions.getWorkflowCacheSize(), runLocks, metricsScope);
1✔
116
  }
1✔
117

118
  /**
119
   * Creates worker that connects to an instance of the Temporal Service. It uses the namespace
120
   * configured at the Factory level. New workers cannot be created after the start() has been
121
   * called
122
   *
123
   * @param taskQueue task queue name worker uses to poll. It uses this name for both workflow and
124
   *     activity task queue polls.
125
   * @return Worker
126
   */
127
  public Worker newWorker(String taskQueue) {
128
    return newWorker(taskQueue, null);
1✔
129
  }
130

131
  /**
132
   * Creates worker that connects to an instance of the Temporal Service. It uses the namespace
133
   * configured at the Factory level. New workers cannot be created after the start() has been
134
   * called
135
   *
136
   * @param taskQueue task queue name worker uses to poll. It uses this name for both workflow and
137
   *     activity task queue polls.
138
   * @param options Options (like {@link DataConverter} override) for configuring worker.
139
   * @return Worker
140
   */
141
  public synchronized Worker newWorker(String taskQueue, WorkerOptions options) {
142
    Preconditions.checkArgument(
1✔
143
        !Strings.isNullOrEmpty(taskQueue), "taskQueue should not be an empty string");
1✔
144
    Preconditions.checkState(
1✔
145
        state == State.Initial,
146
        String.format(statusErrorMessage, "create new worker", state.name(), State.Initial.name()));
1✔
147

148
    // Only one worker can exist for a task queue
149
    Worker existingWorker = workers.get(taskQueue);
1✔
150
    if (existingWorker == null) {
1✔
151
      Worker worker =
1✔
152
          new Worker(
153
              workflowClient,
154
              taskQueue,
155
              factoryOptions,
156
              options,
157
              metricsScope,
158
              runLocks,
159
              cache,
160
              true,
161
              workflowThreadExecutor,
162
              workflowClient.getOptions().getContextPropagators());
1✔
163
      workers.put(taskQueue, worker);
1✔
164
      return worker;
1✔
165
    } else {
166
      log.warn(
×
167
          "Only one worker can be registered for a task queue, "
168
              + "subsequent calls to WorkerFactory#newWorker with the same task queue are ignored and "
169
              + "initially created worker is returned");
170
      return existingWorker;
×
171
    }
172
  }
173

174
  /**
175
   * @param taskQueue task queue name to lookup an existing worker for
176
   * @return a worker created previously through {@link #newWorker(String)} for the given task
177
   *     queue.
178
   * @throws IllegalStateException if the worker has not been registered for the given task queue.
179
   */
180
  public synchronized Worker getWorker(String taskQueue) {
181
    Worker result = workers.get(taskQueue);
1✔
182
    if (result == null) {
1✔
183
      throw new IllegalArgumentException("No worker for taskQueue: " + taskQueue);
×
184
    }
185
    return result;
1✔
186
  }
187

188
  /**
189
   * @param taskQueue task queue name to lookup an existing worker for
190
   * @return a worker created previously through {@link #newWorker(String)} for the given task queue
191
   *     or null.
192
   */
193
  @Nullable
194
  public synchronized Worker tryGetWorker(@Nonnull String taskQueue) {
195
    return workers.get(taskQueue);
1✔
196
  }
197

198
  /** Starts all the workers created by this factory. */
199
  public synchronized void start() {
200
    Preconditions.checkState(
1✔
201
        state == State.Initial || state == State.Started,
202
        String.format(
1✔
203
            statusErrorMessage,
204
            "start WorkerFactory",
205
            state.name(),
1✔
206
            String.format("%s, %s", State.Initial.name(), State.Initial.name())));
1✔
207
    if (state == State.Started) {
1✔
208
      return;
×
209
    }
210

211
    // Workers check and require that Temporal Server is available during start to fail-fast in case
212
    // of configuration issues.
213
    workflowClient.getWorkflowServiceStubs().connect(null);
1✔
214

215
    for (Worker worker : workers.values()) {
1✔
216
      worker.start();
1✔
217
    }
1✔
218

219
    state = State.Started;
1✔
220
    ((WorkflowClientInternal) workflowClient.getInternal()).registerWorkerFactory(this);
1✔
221
  }
1✔
222

223
  /** Was {@link #start()} called. */
224
  public synchronized boolean isStarted() {
225
    return state != State.Initial;
1✔
226
  }
227

228
  /** Was {@link #shutdown()} or {@link #shutdownNow()} called. */
229
  public synchronized boolean isShutdown() {
230
    return state == State.Shutdown;
×
231
  }
232

233
  /**
234
   * Returns true if all tasks have completed following shut down. Note that isTerminated is never
235
   * true unless either shutdown or shutdownNow was called first.
236
   */
237
  public synchronized boolean isTerminated() {
238
    if (state != State.Shutdown) {
×
239
      return false;
×
240
    }
241
    for (Worker worker : workers.values()) {
×
242
      if (!worker.isTerminated()) {
×
243
        return false;
×
244
      }
245
    }
×
246
    return true;
×
247
  }
248

249
  /**
250
   * @return instance of the Temporal client that this worker factory uses.
251
   */
252
  public WorkflowClient getWorkflowClient() {
253
    return workflowClient;
1✔
254
  }
255

256
  /**
257
   * Initiates an orderly shutdown in which polls are stopped and already received workflow and
258
   * activity tasks are executed. <br>
259
   * After the shutdown, calls to {@link
260
   * io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} start throwing {@link
261
   * io.temporal.client.ActivityWorkerShutdownException}.<br>
262
   * This method does not wait for the shutdown to complete. Use {@link #awaitTermination(long,
263
   * TimeUnit)} to do that.<br>
264
   * Invocation has no additional effect if already shut down.
265
   */
266
  public synchronized void shutdown() {
267
    log.info("shutdown: {}", this);
1✔
268
    shutdownInternal(false);
1✔
269
  }
1✔
270

271
  /**
272
   * Initiates an orderly shutdown in which polls are stopped and already received workflow and
273
   * activity tasks are attempted to be stopped. <br>
274
   * This implementation cancels tasks via Thread.interrupt(), so any task that fails to respond to
275
   * interrupts may never terminate.<br>
276
   * After the shutdownNow calls to {@link
277
   * io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} start throwing {@link
278
   * io.temporal.client.ActivityWorkerShutdownException}.<br>
279
   * This method does not wait for the shutdown to complete. Use {@link #awaitTermination(long,
280
   * TimeUnit)} to do that.<br>
281
   * Invocation has no additional effect if already shut down.
282
   */
283
  public synchronized void shutdownNow() {
284
    log.info("shutdownNow: {}", this);
1✔
285
    shutdownInternal(true);
1✔
286
  }
1✔
287

288
  private void shutdownInternal(boolean interruptUserTasks) {
289
    state = State.Shutdown;
1✔
290
    ((WorkflowClientInternal) workflowClient.getInternal()).deregisterWorkerFactory(this);
1✔
291
    ShutdownManager shutdownManager = new ShutdownManager();
1✔
292
    CompletableFuture.allOf(
1✔
293
            workers.values().stream()
1✔
294
                .map(worker -> worker.shutdown(shutdownManager, interruptUserTasks))
1✔
295
                .toArray(CompletableFuture[]::new))
1✔
296
        .thenApply(
1✔
297
            r -> {
298
              cache.invalidateAll();
1✔
299
              workflowThreadPool.shutdownNow();
1✔
300
              return null;
1✔
301
            })
302
        .whenComplete(
1✔
303
            (r, e) -> {
304
              if (e != null) {
1✔
305
                log.error("[BUG] Unexpected exception during shutdown", e);
×
306
              }
307
              shutdownManager.close();
1✔
308
            });
1✔
309
  }
1✔
310

311
  /**
312
   * Blocks until all tasks have completed execution after a shutdown request, or the timeout
313
   * occurs.
314
   */
315
  public void awaitTermination(long timeout, TimeUnit unit) {
316
    log.info("awaitTermination begin: {}", this);
1✔
317
    long timeoutMillis = unit.toMillis(timeout);
1✔
318
    for (Worker worker : workers.values()) {
1✔
319
      long t = timeoutMillis; // closure needs immutable value
1✔
320
      timeoutMillis =
1✔
321
          ShutdownManager.runAndGetRemainingTimeoutMs(
1✔
322
              t, () -> worker.awaitTermination(t, TimeUnit.MILLISECONDS));
1✔
323
    }
1✔
324
    log.info("awaitTermination done: {}", this);
1✔
325
  }
1✔
326

327
  // TODO we should hide an actual implementation of WorkerFactory under WorkerFactory interface and
328
  // expose this method on the implementation only
329
  @VisibleForTesting
330
  WorkflowExecutorCache getCache() {
331
    return this.cache;
1✔
332
  }
333

334
  public synchronized void suspendPolling() {
335
    if (state != State.Started) {
1✔
336
      return;
×
337
    }
338

339
    log.info("suspendPolling: {}", this);
1✔
340
    state = State.Suspended;
1✔
341
    for (Worker worker : workers.values()) {
1✔
342
      worker.suspendPolling();
1✔
343
    }
1✔
344
  }
1✔
345

346
  public synchronized void resumePolling() {
347
    if (state != State.Suspended) {
1✔
348
      return;
×
349
    }
350

351
    log.info("resumePolling: {}", this);
1✔
352
    state = State.Started;
1✔
353
    for (Worker worker : workers.values()) {
1✔
354
      worker.resumePolling();
1✔
355
    }
1✔
356
  }
1✔
357

358
  @Override
359
  public String toString() {
360
    return String.format("WorkerFactory{identity=%s}", workflowClient.getOptions().getIdentity());
1✔
361
  }
362

363
  // Lifecycle states of the factory. A new factory is Initial; start() and resumePolling() set
  // Started, suspendPolling() sets Suspended, and shutdown()/shutdownNow() set Shutdown.
  enum State {
1✔
364
    Initial,
1✔
365
    Started,
1✔
366
    Suspended,
1✔
367
    Shutdown
1✔
368
  }
369
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc