• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #169

pending completion
#169

push

github-actions

web-flow
Remove use of deprecated API (#1758)

4 of 4 new or added lines in 1 file covered. (100.0%)

17345 of 21558 relevant lines covered (80.46%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.11
/temporal-sdk/src/main/java/io/temporal/internal/sync/POJOWorkflowImplementationFactory.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import static io.temporal.serviceclient.CheckedExceptionWrapper.wrap;
24

25
import com.google.common.base.Preconditions;
26
import com.google.common.collect.ImmutableSet;
27
import io.temporal.api.common.v1.Payloads;
28
import io.temporal.api.common.v1.WorkflowExecution;
29
import io.temporal.api.common.v1.WorkflowType;
30
import io.temporal.common.context.ContextPropagator;
31
import io.temporal.common.converter.DataConverter;
32
import io.temporal.common.interceptors.Header;
33
import io.temporal.common.interceptors.WorkerInterceptor;
34
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
35
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
36
import io.temporal.common.metadata.POJOWorkflowImplMetadata;
37
import io.temporal.common.metadata.POJOWorkflowInterfaceMetadata;
38
import io.temporal.common.metadata.POJOWorkflowMethodMetadata;
39
import io.temporal.failure.CanceledFailure;
40
import io.temporal.internal.common.env.ReflectionUtils;
41
import io.temporal.internal.replay.ReplayWorkflow;
42
import io.temporal.internal.replay.ReplayWorkflowFactory;
43
import io.temporal.internal.worker.SingleWorkerOptions;
44
import io.temporal.internal.worker.WorkflowExecutionException;
45
import io.temporal.internal.worker.WorkflowExecutorCache;
46
import io.temporal.payload.context.WorkflowSerializationContext;
47
import io.temporal.worker.TypeAlreadyRegisteredException;
48
import io.temporal.worker.WorkflowImplementationOptions;
49
import io.temporal.workflow.DynamicWorkflow;
50
import io.temporal.workflow.Functions;
51
import io.temporal.workflow.Functions.Func;
52
import java.lang.reflect.InvocationTargetException;
53
import java.lang.reflect.Method;
54
import java.util.Collections;
55
import java.util.HashMap;
56
import java.util.List;
57
import java.util.Map;
58
import java.util.Objects;
59
import java.util.Optional;
60
import javax.annotation.Nonnull;
61
import org.slf4j.Logger;
62
import org.slf4j.LoggerFactory;
63

64
public final class POJOWorkflowImplementationFactory implements ReplayWorkflowFactory {
65
  private static final Logger log =
1✔
66
      LoggerFactory.getLogger(POJOWorkflowImplementationFactory.class);
1✔
67

68
  public static final ImmutableSet<String> WORKFLOW_HANDLER_STACKTRACE_CUTOFF =
1✔
69
      ImmutableSet.<String>builder()
1✔
70
          // POJO
71
          .add(
1✔
72
              ReflectionUtils.getMethodNameForStackTraceCutoff(
1✔
73
                  POJOWorkflowImplementation.class, "execute", Header.class, Optional.class))
74
          // Dynamic
75
          .add(
1✔
76
              ReflectionUtils.getMethodNameForStackTraceCutoff(
1✔
77
                  DynamicSyncWorkflowDefinition.class, "execute", Header.class, Optional.class))
78
          .build();
1✔
79
  private final WorkerInterceptor[] workerInterceptors;
80

81
  private final DataConverter dataConverter;
82
  private final List<ContextPropagator> contextPropagators;
83
  private final long defaultDeadlockDetectionTimeout;
84

85
  /** Key: workflow type name, Value: function that creates SyncWorkflowDefinition instance. */
86
  private final Map<String, Functions.Func1<WorkflowExecution, SyncWorkflowDefinition>>
1✔
87
      workflowDefinitions = Collections.synchronizedMap(new HashMap<>());
1✔
88
  /** Factories providing instances of workflow classes. */
89
  private final Map<Class<?>, Functions.Func<?>> workflowInstanceFactories =
1✔
90
      Collections.synchronizedMap(new HashMap<>());
1✔
91
  /** If present then it is called for any unknown workflow type. */
92
  private Functions.Func<? extends DynamicWorkflow> dynamicWorkflowImplementationFactory;
93

94
  private final Map<String, WorkflowImplementationOptions> implementationOptions =
1✔
95
      Collections.synchronizedMap(new HashMap<>());
1✔
96

97
  private final WorkflowThreadExecutor workflowThreadExecutor;
98
  private final WorkflowExecutorCache cache;
99

100
  private final String namespace;
101

102
  /**
   * Creates a factory with empty workflow registries.
   *
   * @param singleWorkerOptions source of the data converter, context propagators and default
   *     deadlock detection timeout; must not be null
   * @param workflowThreadExecutor executor handed to every created workflow; must not be null
   * @param workerInterceptors interceptors wrapped around each workflow's inbound calls; must not
   *     be null
   * @param cache workflow executor cache handed to every created workflow (not null-checked here)
   * @param namespace namespace used for serialization contexts and created workflows
   * @throws NullPointerException if {@code singleWorkerOptions}, {@code workflowThreadExecutor} or
   *     {@code workerInterceptors} is null
   */
  public POJOWorkflowImplementationFactory(
      SingleWorkerOptions singleWorkerOptions,
      WorkflowThreadExecutor workflowThreadExecutor,
      WorkerInterceptor[] workerInterceptors,
      WorkflowExecutorCache cache,
      @Nonnull String namespace) {
    // Null checks run in the same order as before: options, executor, interceptors.
    Objects.requireNonNull(singleWorkerOptions);
    this.workflowThreadExecutor = Objects.requireNonNull(workflowThreadExecutor);
    this.workerInterceptors = Objects.requireNonNull(workerInterceptors);

    // Values derived from the worker options.
    this.dataConverter = singleWorkerOptions.getDataConverter();
    this.contextPropagators = singleWorkerOptions.getContextPropagators();
    this.defaultDeadlockDetectionTimeout = singleWorkerOptions.getDefaultDeadlockDetectionTimeout();

    // Values stored as passed in.
    this.cache = cache;
    this.namespace = namespace;
  }
1✔
117

118
  public void registerWorkflowImplementationTypes(
119
      WorkflowImplementationOptions options, Class<?>[] workflowImplementationTypes) {
120
    for (Class<?> type : workflowImplementationTypes) {
1✔
121
      registerWorkflowImplementationType(options, type);
1✔
122
    }
123
  }
1✔
124

125
  /**
126
   * @param clazz has to be a workflow interface class. The only exception is if it's a
127
   *     DynamicWorkflow class.
128
   */
129
  @SuppressWarnings("unchecked")
130
  public <R> void addWorkflowImplementationFactory(
131
      WorkflowImplementationOptions options, Class<R> clazz, Functions.Func<R> factory) {
132
    if (DynamicWorkflow.class.isAssignableFrom(clazz)) {
1✔
133
      if (dynamicWorkflowImplementationFactory != null) {
1✔
134
        throw new TypeAlreadyRegisteredException(
×
135
            "DynamicWorkflow",
136
            "An implementation of DynamicWorkflow or its factory is already registered with the worker");
137
      }
138
      dynamicWorkflowImplementationFactory = (Func<? extends DynamicWorkflow>) factory;
1✔
139
      return;
1✔
140
    }
141
    workflowInstanceFactories.put(clazz, factory);
1✔
142
    POJOWorkflowInterfaceMetadata workflowMetadata =
1✔
143
        POJOWorkflowInterfaceMetadata.newInstance(clazz);
1✔
144
    if (!workflowMetadata.getWorkflowMethod().isPresent()) {
1✔
145
      throw new IllegalArgumentException(
×
146
          "Workflow interface doesn't contain a method annotated with @WorkflowMethod: " + clazz);
147
    }
148
    List<POJOWorkflowMethodMetadata> methodsMetadata = workflowMetadata.getMethodsMetadata();
1✔
149
    for (POJOWorkflowMethodMetadata methodMetadata : methodsMetadata) {
1✔
150
      switch (methodMetadata.getType()) {
1✔
151
        case WORKFLOW:
152
          String typeName = methodMetadata.getName();
1✔
153
          if (workflowDefinitions.containsKey(typeName)) {
1✔
154
            throw new TypeAlreadyRegisteredException(
×
155
                typeName,
156
                "\"" + typeName + "\" workflow type is already registered with the worker");
157
          }
158
          workflowDefinitions.put(
1✔
159
              typeName,
160
              (execution) ->
161
                  new POJOWorkflowImplementation(
1✔
162
                      clazz,
163
                      methodMetadata.getWorkflowMethod(),
1✔
164
                      dataConverter.withContext(
1✔
165
                          new WorkflowSerializationContext(namespace, execution.getWorkflowId()))));
1✔
166
          implementationOptions.put(typeName, options);
1✔
167
          break;
1✔
168
        case SIGNAL:
169
          // Signals are registered through Workflow.registerListener
170
          break;
171
      }
172
    }
1✔
173
  }
1✔
174

175
  private <T> void registerWorkflowImplementationType(
176
      WorkflowImplementationOptions options, Class<T> workflowImplementationClass) {
177
    if (DynamicWorkflow.class.isAssignableFrom(workflowImplementationClass)) {
1✔
178
      addWorkflowImplementationFactory(
1✔
179
          options,
180
          workflowImplementationClass,
181
          () -> {
182
            try {
183
              return workflowImplementationClass.getDeclaredConstructor().newInstance();
1✔
184
            } catch (NoSuchMethodException
×
185
                | InstantiationException
186
                | IllegalAccessException
187
                | InvocationTargetException e) {
188
              // Error to fail workflow task as this can be fixed by a new deployment.
189
              throw new Error(
×
190
                  "Failure instantiating workflow implementation class "
191
                      + workflowImplementationClass.getName(),
×
192
                  e);
193
            }
194
          });
195
      return;
1✔
196
    }
197
    POJOWorkflowImplMetadata workflowMetadata =
1✔
198
        POJOWorkflowImplMetadata.newInstance(workflowImplementationClass);
1✔
199
    List<POJOWorkflowMethodMetadata> workflowMethods = workflowMetadata.getWorkflowMethods();
1✔
200
    if (workflowMethods.isEmpty()) {
1✔
201
      throw new IllegalArgumentException(
×
202
          "Workflow implementation doesn't implement any interface "
203
              + "with a workflow method annotated with @WorkflowMethod: "
204
              + workflowImplementationClass);
205
    }
206
    for (POJOWorkflowMethodMetadata workflowMethod : workflowMethods) {
1✔
207
      String workflowName = workflowMethod.getName();
1✔
208
      Method method = workflowMethod.getWorkflowMethod();
1✔
209
      Functions.Func1<WorkflowExecution, SyncWorkflowDefinition> definition =
1✔
210
          (execution) ->
211
              new POJOWorkflowImplementation(
1✔
212
                  workflowImplementationClass,
213
                  method,
214
                  dataConverter.withContext(
1✔
215
                      new WorkflowSerializationContext(namespace, execution.getWorkflowId())));
1✔
216

217
      if (workflowDefinitions.containsKey(workflowName)) {
1✔
218
        throw new IllegalStateException(
×
219
            workflowName + " workflow type is already registered with the worker");
220
      }
221
      workflowDefinitions.put(workflowName, definition);
1✔
222
      implementationOptions.put(workflowName, options);
1✔
223
    }
1✔
224
  }
1✔
225

226
  private SyncWorkflowDefinition getWorkflowDefinition(
227
      WorkflowType workflowType, WorkflowExecution workflowExecution) {
228
    Functions.Func1<WorkflowExecution, SyncWorkflowDefinition> factory =
1✔
229
        workflowDefinitions.get(workflowType.getName());
1✔
230
    if (factory == null) {
1✔
231
      if (dynamicWorkflowImplementationFactory != null) {
1✔
232
        return new DynamicSyncWorkflowDefinition(
1✔
233
            dynamicWorkflowImplementationFactory, workerInterceptors, dataConverter);
234
      }
235
      // throw Error to abort the workflow task, not fail the workflow
236
      throw new Error(
1✔
237
          "Unknown workflow type \""
238
              + workflowType.getName()
1✔
239
              + "\". Known types are "
240
              + workflowDefinitions.keySet());
1✔
241
    }
242
    try {
243
      return factory.apply(workflowExecution);
1✔
244
    } catch (Exception e) {
×
245
      throw new Error(e);
×
246
    }
247
  }
248

249
  @Override
250
  public ReplayWorkflow getWorkflow(
251
      WorkflowType workflowType, WorkflowExecution workflowExecution) {
252
    SyncWorkflowDefinition workflow = getWorkflowDefinition(workflowType, workflowExecution);
1✔
253
    WorkflowImplementationOptions workflowImplementationOptions =
1✔
254
        implementationOptions.get(workflowType.getName());
1✔
255
    DataConverter dataConverterWithWorkflowContext =
1✔
256
        dataConverter.withContext(
1✔
257
            new WorkflowSerializationContext(namespace, workflowExecution.getWorkflowId()));
1✔
258
    return new SyncWorkflow(
1✔
259
        namespace,
260
        workflowExecution,
261
        workflow,
262
        new SignalDispatcher(dataConverterWithWorkflowContext),
263
        new QueryDispatcher(dataConverterWithWorkflowContext),
264
        new UpdateDispatcher(dataConverterWithWorkflowContext),
265
        workflowImplementationOptions,
266
        dataConverter,
267
        workflowThreadExecutor,
268
        cache,
269
        contextPropagators,
270
        defaultDeadlockDetectionTimeout);
271
  }
272

273
  @Override
274
  public boolean isAnyTypeSupported() {
275
    return !workflowDefinitions.isEmpty() || dynamicWorkflowImplementationFactory != null;
1✔
276
  }
277

278
  private class POJOWorkflowImplementation implements SyncWorkflowDefinition {
279
    private final Class<?> workflowImplementationClass;
280
    private final Method workflowMethod;
281
    private WorkflowInboundCallsInterceptor workflowInvoker;
282
    // don't pass it down to other classes, it's a "cached" instance for internal usage only
283
    private final DataConverter dataConverterWithWorkflowContext;
284

285
    public POJOWorkflowImplementation(
286
        Class<?> workflowImplementationClass,
287
        Method workflowMethod,
288
        DataConverter dataConverterWithWorkflowContext) {
1✔
289
      this.workflowImplementationClass = workflowImplementationClass;
1✔
290
      this.workflowMethod = workflowMethod;
1✔
291
      this.dataConverterWithWorkflowContext = dataConverterWithWorkflowContext;
1✔
292
    }
1✔
293

294
    @Override
295
    public void initialize() {
296
      SyncWorkflowContext workflowContext = WorkflowInternal.getRootWorkflowContext();
1✔
297
      workflowInvoker = new RootWorkflowInboundCallsInterceptor(workflowContext);
1✔
298
      for (WorkerInterceptor workerInterceptor : workerInterceptors) {
1✔
299
        workflowInvoker = workerInterceptor.interceptWorkflow(workflowInvoker);
1✔
300
      }
301
      workflowContext.initHeadInboundCallsInterceptor(workflowInvoker);
1✔
302
      workflowInvoker.init(workflowContext);
1✔
303
    }
1✔
304

305
    @Override
306
    public Optional<Payloads> execute(Header header, Optional<Payloads> input)
307
        throws CanceledFailure, WorkflowExecutionException {
308

309
      Object[] args =
1✔
310
          dataConverterWithWorkflowContext.fromPayloads(
1✔
311
              input, workflowMethod.getParameterTypes(), workflowMethod.getGenericParameterTypes());
1✔
312
      Preconditions.checkNotNull(workflowInvoker, "initialize not called");
1✔
313
      WorkflowInboundCallsInterceptor.WorkflowOutput result =
1✔
314
          workflowInvoker.execute(new WorkflowInboundCallsInterceptor.WorkflowInput(header, args));
1✔
315
      if (workflowMethod.getReturnType() == Void.TYPE) {
1✔
316
        return Optional.empty();
1✔
317
      }
318
      return dataConverterWithWorkflowContext.toPayloads(result.getResult());
1✔
319
    }
320

321
    private class RootWorkflowInboundCallsInterceptor
322
        extends BaseRootWorkflowInboundCallsInterceptor {
323
      private Object workflow;
324

325
      public RootWorkflowInboundCallsInterceptor(SyncWorkflowContext workflowContext) {
1✔
326
        super(workflowContext);
1✔
327
      }
1✔
328

329
      @Override
330
      public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
331
        super.init(outboundCalls);
1✔
332
        newInstance();
1✔
333
        WorkflowInternal.registerListener(workflow);
1✔
334
      }
1✔
335

336
      @Override
337
      public WorkflowOutput execute(WorkflowInput input) {
338
        try {
339
          Object result = workflowMethod.invoke(workflow, input.getArguments());
1✔
340
          return new WorkflowOutput(result);
1✔
341
        } catch (IllegalAccessException e) {
×
342
          throw wrap(e);
×
343
        } catch (InvocationTargetException e) {
1✔
344
          Throwable target = e.getTargetException();
1✔
345
          throw wrap(target);
1✔
346
        }
347
      }
348

349
      protected void newInstance() {
350
        Func<?> factory = workflowInstanceFactories.get(workflowImplementationClass);
1✔
351
        if (factory != null) {
1✔
352
          workflow = factory.apply();
1✔
353
        } else {
354
          try {
355
            workflow = workflowImplementationClass.getDeclaredConstructor().newInstance();
1✔
356
          } catch (NoSuchMethodException
×
357
              | InstantiationException
358
              | IllegalAccessException
359
              | InvocationTargetException e) {
360
            // Error to fail workflow task as this can be fixed by a new deployment.
361
            throw new Error(
×
362
                "Failure instantiating workflow implementation class "
363
                    + workflowImplementationClass.getName(),
×
364
                e);
365
          }
1✔
366
        }
367
      }
1✔
368
    }
369
  }
370

371
  @Override
372
  public String toString() {
373
    return "POJOWorkflowImplementationFactory{"
×
374
        + "registeredWorkflowTypes="
375
        + workflowDefinitions.keySet()
×
376
        + '}';
377
  }
378
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc