• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #153

pending completion
#153

push

github-actions

web-flow
Eager Workflow Task Dispatch (#1674)

Issue #1646

Signed-off-by: Dmitry Spikhalskiy <dmitry@spikhalskiy.com>

213 of 213 new or added lines in 22 files covered. (100.0%)

16682 of 20566 relevant lines covered (81.11%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.29
/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import io.temporal.api.common.v1.Payloads;
24
import io.temporal.common.converter.DataConverter;
25
import io.temporal.internal.activity.ActivityExecutionContextFactory;
26
import io.temporal.internal.activity.ActivityTaskHandlerImpl;
27
import io.temporal.internal.activity.LocalActivityExecutionContextFactoryImpl;
28
import io.temporal.internal.replay.ReplayWorkflowTaskHandler;
29
import io.temporal.internal.sync.POJOWorkflowImplementationFactory;
30
import io.temporal.internal.sync.WorkflowThreadExecutor;
31
import io.temporal.serviceclient.WorkflowServiceStubs;
32
import io.temporal.worker.WorkflowImplementationOptions;
33
import io.temporal.worker.WorkflowTaskDispatchHandle;
34
import io.temporal.workflow.Functions.Func;
35
import java.lang.reflect.Type;
36
import java.time.Duration;
37
import java.util.Objects;
38
import java.util.Optional;
39
import java.util.concurrent.*;
40
import javax.annotation.Nonnull;
41
import javax.annotation.Nullable;
42
import org.slf4j.Logger;
43
import org.slf4j.LoggerFactory;
44

45
/**
46
 * Facade that supports a lifecycle and maintains an assembly of
47
 *
48
 * <ul>
49
 *   <li>{@link WorkflowWorker} that performing execution of workflow task
50
 *   <li>{@link LocalActivityWorker} that performs execution of local activities scheduled by the
51
 *       workflow tasks
52
 * </ul>
53
 *
54
 * and exposing additional management helper methods for the assembly.
55
 */
56
public class SyncWorkflowWorker implements SuspendableWorker {
57
  private static final Logger log = LoggerFactory.getLogger(SyncWorkflowWorker.class);
1✔
58

59
  private final String identity;
60
  private final String namespace;
61
  private final String taskQueue;
62

63
  private final WorkflowWorker workflowWorker;
64
  private final QueryReplayHelper queryReplayHelper;
65
  private final LocalActivityWorker laWorker;
66
  private final POJOWorkflowImplementationFactory factory;
67
  private final DataConverter dataConverter;
68
  private final ActivityTaskHandlerImpl laTaskHandler;
69

70
  public SyncWorkflowWorker(
71
      WorkflowServiceStubs service,
72
      String namespace,
73
      String taskQueue,
74
      SingleWorkerOptions singleWorkerOptions,
75
      SingleWorkerOptions localActivityOptions,
76
      @Nonnull WorkflowRunLockManager runLocks,
77
      @Nonnull WorkflowExecutorCache cache,
78
      String stickyTaskQueueName,
79
      WorkflowThreadExecutor workflowThreadExecutor,
80
      @Nonnull EagerActivityDispatcher eagerActivityDispatcher) {
1✔
81
    this.identity = singleWorkerOptions.getIdentity();
1✔
82
    this.namespace = namespace;
1✔
83
    this.taskQueue = taskQueue;
1✔
84
    this.dataConverter = singleWorkerOptions.getDataConverter();
1✔
85

86
    factory =
1✔
87
        new POJOWorkflowImplementationFactory(
88
            singleWorkerOptions,
89
            Objects.requireNonNull(workflowThreadExecutor),
1✔
90
            singleWorkerOptions.getWorkerInterceptors(),
1✔
91
            cache);
92

93
    ActivityExecutionContextFactory laActivityExecutionContextFactory =
1✔
94
        new LocalActivityExecutionContextFactoryImpl();
95
    laTaskHandler =
1✔
96
        new ActivityTaskHandlerImpl(
97
            namespace,
98
            localActivityOptions.getDataConverter(),
1✔
99
            laActivityExecutionContextFactory,
100
            localActivityOptions.getWorkerInterceptors(),
1✔
101
            localActivityOptions.getContextPropagators());
1✔
102
    laWorker = new LocalActivityWorker(namespace, taskQueue, localActivityOptions, laTaskHandler);
1✔
103

104
    WorkflowTaskHandler taskHandler =
1✔
105
        new ReplayWorkflowTaskHandler(
106
            namespace,
107
            factory,
108
            cache,
109
            singleWorkerOptions,
110
            stickyTaskQueueName,
111
            singleWorkerOptions.getStickyQueueScheduleToStartTimeout(),
1✔
112
            service,
113
            laWorker.getLocalActivityScheduler());
1✔
114

115
    workflowWorker =
1✔
116
        new WorkflowWorker(
117
            service,
118
            namespace,
119
            taskQueue,
120
            stickyTaskQueueName,
121
            singleWorkerOptions,
122
            runLocks,
123
            cache,
124
            taskHandler,
125
            eagerActivityDispatcher);
126

127
    // Exists to support Worker#replayWorkflowExecution functionality.
128
    // This handler has to be non-sticky to avoid evicting actual executions from the cache
129
    WorkflowTaskHandler nonStickyReplayTaskHandler =
1✔
130
        new ReplayWorkflowTaskHandler(
131
            namespace,
132
            factory,
133
            null,
134
            singleWorkerOptions,
135
            null,
136
            Duration.ZERO,
137
            service,
138
            laWorker.getLocalActivityScheduler());
1✔
139

140
    queryReplayHelper = new QueryReplayHelper(nonStickyReplayTaskHandler);
1✔
141
  }
1✔
142

143
  public void registerWorkflowImplementationTypes(
144
      WorkflowImplementationOptions options, Class<?>[] workflowImplementationTypes) {
145
    factory.registerWorkflowImplementationTypes(options, workflowImplementationTypes);
1✔
146
  }
1✔
147

148
  public <R> void registerWorkflowImplementationFactory(
149
      WorkflowImplementationOptions options, Class<R> clazz, Func<R> factory) {
150
    this.factory.addWorkflowImplementationFactory(options, clazz, factory);
1✔
151
  }
1✔
152

153
  public void registerLocalActivityImplementations(Object... activitiesImplementation) {
154
    this.laTaskHandler.registerActivityImplementations(activitiesImplementation);
1✔
155
  }
1✔
156

157
  @Override
158
  public boolean start() {
159
    boolean started = workflowWorker.start();
1✔
160
    // It doesn't start if no types are registered with it.
161
    if (started) {
1✔
162
      laWorker.start();
1✔
163
    }
164
    return started;
1✔
165
  }
166

167
  @Override
168
  public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
169
    return workflowWorker
1✔
170
        .shutdown(shutdownManager, interruptTasks)
1✔
171
        .thenCompose(ignore -> laWorker.shutdown(shutdownManager, interruptTasks))
1✔
172
        .exceptionally(
1✔
173
            e -> {
174
              log.error("[BUG] Unexpected exception during shutdown", e);
×
175
              return null;
×
176
            });
177
  }
178

179
  @Override
180
  public void awaitTermination(long timeout, TimeUnit unit) {
181
    long timeoutMillis = ShutdownManager.awaitTermination(laWorker, unit.toMillis(timeout));
1✔
182
    ShutdownManager.awaitTermination(workflowWorker, timeoutMillis);
1✔
183
  }
1✔
184

185
  @Override
186
  public void suspendPolling() {
187
    workflowWorker.suspendPolling();
1✔
188
  }
1✔
189

190
  @Override
191
  public void resumePolling() {
192
    workflowWorker.resumePolling();
1✔
193
  }
1✔
194

195
  @SuppressWarnings("deprecation")
196
  public <R> R queryWorkflowExecution(
197
      io.temporal.internal.common.WorkflowExecutionHistory history,
198
      String queryType,
199
      Class<R> resultClass,
200
      Type resultType,
201
      Object[] args)
202
      throws Exception {
203
    Optional<Payloads> serializedArgs = dataConverter.toPayloads(args);
1✔
204
    Optional<Payloads> result =
1✔
205
        queryReplayHelper.queryWorkflowExecution(history, queryType, serializedArgs);
1✔
206
    return dataConverter.fromPayloads(0, result, resultClass, resultType);
1✔
207
  }
208

209
  @Override
210
  public boolean isSuspended() {
211
    return workflowWorker.isSuspended();
1✔
212
  }
213

214
  @Override
215
  public boolean isShutdown() {
216
    return workflowWorker.isShutdown() || laWorker.isShutdown();
×
217
  }
218

219
  @Override
220
  public boolean isTerminated() {
221
    return workflowWorker.isTerminated() && laWorker.isTerminated();
×
222
  }
223

224
  @Override
225
  public WorkerLifecycleState getLifecycleState() {
226
    return null;
×
227
  }
228

229
  @Override
230
  public String toString() {
231
    return String.format(
×
232
        "SyncWorkflowWorker{namespace=%s, taskQueue=%s, identity=%s}",
233
        namespace, taskQueue, identity);
234
  }
235

236
  @Nullable
237
  public WorkflowTaskDispatchHandle reserveWorkflowExecutor() {
238
    return workflowWorker.reserveWorkflowExecutor();
1✔
239
  }
240
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc