• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #333

16 Oct 2024 07:28PM UTC coverage: 78.65% (+0.6%) from 78.085%
#333

push

github

web-flow
Fix code coverage (#2275)

22670 of 28824 relevant lines covered (78.65%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.67
/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import static io.temporal.internal.common.InternalUtils.createStickyTaskQueue;
24

25
import io.temporal.api.common.v1.Payloads;
26
import io.temporal.api.taskqueue.v1.TaskQueue;
27
import io.temporal.common.converter.DataConverter;
28
import io.temporal.internal.activity.ActivityExecutionContextFactory;
29
import io.temporal.internal.activity.ActivityTaskHandlerImpl;
30
import io.temporal.internal.activity.LocalActivityExecutionContextFactoryImpl;
31
import io.temporal.internal.replay.ReplayWorkflowTaskHandler;
32
import io.temporal.internal.sync.POJOWorkflowImplementationFactory;
33
import io.temporal.internal.sync.WorkflowThreadExecutor;
34
import io.temporal.serviceclient.WorkflowServiceStubs;
35
import io.temporal.worker.WorkflowImplementationOptions;
36
import io.temporal.worker.WorkflowTaskDispatchHandle;
37
import io.temporal.worker.tuning.LocalActivitySlotInfo;
38
import io.temporal.worker.tuning.SlotSupplier;
39
import io.temporal.worker.tuning.WorkflowSlotInfo;
40
import io.temporal.workflow.Functions.Func;
41
import java.lang.reflect.Type;
42
import java.time.Duration;
43
import java.util.Objects;
44
import java.util.Optional;
45
import java.util.concurrent.*;
46
import javax.annotation.Nonnull;
47
import javax.annotation.Nullable;
48
import org.slf4j.Logger;
49
import org.slf4j.LoggerFactory;
50

51
/**
52
 * Facade that supports a lifecycle and maintains an assembly of
53
 *
54
 * <ul>
55
 *   <li>{@link WorkflowWorker} that performing execution of workflow task
56
 *   <li>{@link LocalActivityWorker} that performs execution of local activities scheduled by the
57
 *       workflow tasks
58
 * </ul>
59
 *
60
 * and exposing additional management helper methods for the assembly.
61
 */
62
public class SyncWorkflowWorker implements SuspendableWorker {
63
  private static final Logger log = LoggerFactory.getLogger(SyncWorkflowWorker.class);
1✔
64

65
  private final String identity;
66
  private final String namespace;
67
  private final String taskQueue;
68

69
  private final WorkflowWorker workflowWorker;
70
  private final QueryReplayHelper queryReplayHelper;
71
  private final LocalActivityWorker laWorker;
72
  private final POJOWorkflowImplementationFactory factory;
73
  private final DataConverter dataConverter;
74
  private final ActivityTaskHandlerImpl laTaskHandler;
75
  private boolean runningLocalActivityWorker;
76

77
  public SyncWorkflowWorker(
78
      @Nonnull WorkflowServiceStubs service,
79
      @Nonnull String namespace,
80
      @Nonnull String taskQueue,
81
      @Nonnull SingleWorkerOptions singleWorkerOptions,
82
      @Nonnull SingleWorkerOptions localActivityOptions,
83
      @Nonnull WorkflowRunLockManager runLocks,
84
      @Nonnull WorkflowExecutorCache cache,
85
      String stickyTaskQueueName,
86
      @Nonnull WorkflowThreadExecutor workflowThreadExecutor,
87
      @Nonnull EagerActivityDispatcher eagerActivityDispatcher,
88
      @Nonnull SlotSupplier<WorkflowSlotInfo> slotSupplier,
89
      @Nonnull SlotSupplier<LocalActivitySlotInfo> laSlotSupplier) {
1✔
90
    this.identity = singleWorkerOptions.getIdentity();
1✔
91
    this.namespace = namespace;
1✔
92
    this.taskQueue = taskQueue;
1✔
93
    this.dataConverter = singleWorkerOptions.getDataConverter();
1✔
94

95
    factory =
1✔
96
        new POJOWorkflowImplementationFactory(
97
            singleWorkerOptions,
98
            Objects.requireNonNull(workflowThreadExecutor),
1✔
99
            singleWorkerOptions.getWorkerInterceptors(),
1✔
100
            cache,
101
            namespace);
102

103
    ActivityExecutionContextFactory laActivityExecutionContextFactory =
1✔
104
        new LocalActivityExecutionContextFactoryImpl();
105
    laTaskHandler =
1✔
106
        new ActivityTaskHandlerImpl(
107
            namespace,
108
            taskQueue,
109
            localActivityOptions.getDataConverter(),
1✔
110
            laActivityExecutionContextFactory,
111
            localActivityOptions.getWorkerInterceptors(),
1✔
112
            localActivityOptions.getContextPropagators());
1✔
113
    laWorker =
1✔
114
        new LocalActivityWorker(
115
            namespace, taskQueue, localActivityOptions, laTaskHandler, laSlotSupplier);
116
    TaskQueue stickyTaskQueue = null;
1✔
117
    if (stickyTaskQueueName != null) {
1✔
118
      stickyTaskQueue = createStickyTaskQueue(stickyTaskQueueName, taskQueue);
1✔
119
    }
120

121
    WorkflowTaskHandler taskHandler =
1✔
122
        new ReplayWorkflowTaskHandler(
123
            namespace,
124
            factory,
125
            cache,
126
            singleWorkerOptions,
127
            stickyTaskQueue,
128
            singleWorkerOptions.getStickyQueueScheduleToStartTimeout(),
1✔
129
            service,
130
            laWorker.getLocalActivityScheduler());
1✔
131

132
    workflowWorker =
1✔
133
        new WorkflowWorker(
134
            service,
135
            namespace,
136
            taskQueue,
137
            stickyTaskQueueName,
138
            singleWorkerOptions,
139
            runLocks,
140
            cache,
141
            taskHandler,
142
            eagerActivityDispatcher,
143
            slotSupplier);
144

145
    // Exists to support Worker#replayWorkflowExecution functionality.
146
    // This handler has to be non-sticky to avoid evicting actual executions from the cache
147
    WorkflowTaskHandler nonStickyReplayTaskHandler =
1✔
148
        new ReplayWorkflowTaskHandler(
149
            namespace,
150
            factory,
151
            null,
152
            singleWorkerOptions,
153
            null,
154
            Duration.ZERO,
155
            service,
156
            laWorker.getLocalActivityScheduler());
1✔
157

158
    queryReplayHelper = new QueryReplayHelper(nonStickyReplayTaskHandler);
1✔
159
  }
1✔
160

161
  public void registerWorkflowImplementationTypes(
162
      WorkflowImplementationOptions options, Class<?>[] workflowImplementationTypes) {
163
    factory.registerWorkflowImplementationTypes(options, workflowImplementationTypes);
1✔
164
  }
1✔
165

166
  public <R> void registerWorkflowImplementationFactory(
167
      WorkflowImplementationOptions options, Class<R> clazz, Func<R> factory) {
168
    this.factory.addWorkflowImplementationFactory(options, clazz, factory);
1✔
169
  }
1✔
170

171
  public void registerLocalActivityImplementations(Object... activitiesImplementation) {
172
    this.laTaskHandler.registerActivityImplementations(activitiesImplementation);
1✔
173
  }
1✔
174

175
  @Override
176
  public boolean start() {
177
    boolean started = workflowWorker.start();
1✔
178
    // It doesn't start if no types are registered with it.
179
    if (started) {
1✔
180
      runningLocalActivityWorker = laWorker.start();
1✔
181
    }
182
    return started;
1✔
183
  }
184

185
  @Override
186
  public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
187
    return workflowWorker
1✔
188
        .shutdown(shutdownManager, interruptTasks)
1✔
189
        .thenCompose(ignore -> laWorker.shutdown(shutdownManager, interruptTasks))
1✔
190
        .exceptionally(
1✔
191
            e -> {
192
              log.error("[BUG] Unexpected exception during shutdown", e);
×
193
              return null;
×
194
            });
195
  }
196

197
  @Override
198
  public void awaitTermination(long timeout, TimeUnit unit) {
199
    long timeoutMillis = ShutdownManager.awaitTermination(laWorker, unit.toMillis(timeout));
1✔
200
    ShutdownManager.awaitTermination(workflowWorker, timeoutMillis);
1✔
201
  }
1✔
202

203
  @Override
204
  public void suspendPolling() {
205
    workflowWorker.suspendPolling();
1✔
206
  }
1✔
207

208
  @Override
209
  public void resumePolling() {
210
    workflowWorker.resumePolling();
1✔
211
  }
1✔
212

213
  @SuppressWarnings("deprecation")
214
  public <R> R queryWorkflowExecution(
215
      io.temporal.internal.common.WorkflowExecutionHistory history,
216
      String queryType,
217
      Class<R> resultClass,
218
      Type resultType,
219
      Object[] args)
220
      throws Exception {
221
    Optional<Payloads> serializedArgs = dataConverter.toPayloads(args);
1✔
222
    Optional<Payloads> result =
1✔
223
        queryReplayHelper.queryWorkflowExecution(history, queryType, serializedArgs);
1✔
224
    return dataConverter.fromPayloads(0, result, resultClass, resultType);
1✔
225
  }
226

227
  @Override
228
  public boolean isSuspended() {
229
    return workflowWorker.isSuspended();
1✔
230
  }
231

232
  @Override
233
  public boolean isShutdown() {
234
    return workflowWorker.isShutdown() || laWorker.isShutdown();
×
235
  }
236

237
  @Override
238
  public boolean isTerminated() {
239
    return workflowWorker.isTerminated()
1✔
240
        && (!runningLocalActivityWorker || laWorker.isTerminated());
1✔
241
  }
242

243
  @Override
244
  public WorkerLifecycleState getLifecycleState() {
245
    return null;
×
246
  }
247

248
  @Override
249
  public String toString() {
250
    return String.format(
×
251
        "SyncWorkflowWorker{namespace=%s, taskQueue=%s, identity=%s}",
252
        namespace, taskQueue, identity);
253
  }
254

255
  @Nullable
256
  public WorkflowTaskDispatchHandle reserveWorkflowExecutor() {
257
    return workflowWorker.reserveWorkflowExecutor();
1✔
258
  }
259
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc