• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #343

31 Oct 2024 06:31PM UTC coverage: 75.148% (-3.6%) from 78.794%
#343

push

github

web-flow
Fix jacoco coverage (#2304)

5139 of 8240 branches covered (62.37%)

Branch coverage included in aggregate %.

22841 of 28993 relevant lines covered (78.78%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.44
/temporal-sdk/src/main/java/io/temporal/internal/sync/POJOWorkflowImplementationFactory.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import static io.temporal.serviceclient.CheckedExceptionWrapper.wrap;
24

25
import com.google.common.base.Preconditions;
26
import com.google.common.collect.ImmutableSet;
27
import io.temporal.api.common.v1.Payloads;
28
import io.temporal.api.common.v1.WorkflowExecution;
29
import io.temporal.api.common.v1.WorkflowType;
30
import io.temporal.common.context.ContextPropagator;
31
import io.temporal.common.converter.DataConverter;
32
import io.temporal.common.converter.EncodedValues;
33
import io.temporal.common.interceptors.Header;
34
import io.temporal.common.interceptors.WorkerInterceptor;
35
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
36
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
37
import io.temporal.common.metadata.POJOWorkflowImplMetadata;
38
import io.temporal.common.metadata.POJOWorkflowInterfaceMetadata;
39
import io.temporal.common.metadata.POJOWorkflowMethodMetadata;
40
import io.temporal.failure.CanceledFailure;
41
import io.temporal.internal.common.env.ReflectionUtils;
42
import io.temporal.internal.replay.ReplayWorkflow;
43
import io.temporal.internal.replay.ReplayWorkflowFactory;
44
import io.temporal.internal.worker.SingleWorkerOptions;
45
import io.temporal.internal.worker.WorkflowExecutionException;
46
import io.temporal.internal.worker.WorkflowExecutorCache;
47
import io.temporal.payload.context.WorkflowSerializationContext;
48
import io.temporal.worker.TypeAlreadyRegisteredException;
49
import io.temporal.worker.WorkflowImplementationOptions;
50
import io.temporal.workflow.DynamicWorkflow;
51
import io.temporal.workflow.Functions;
52
import io.temporal.workflow.Functions.Func;
53
import java.lang.reflect.Constructor;
54
import java.lang.reflect.InvocationTargetException;
55
import java.lang.reflect.Method;
56
import java.util.Collections;
57
import java.util.HashMap;
58
import java.util.List;
59
import java.util.Map;
60
import java.util.Objects;
61
import java.util.Optional;
62
import javax.annotation.Nonnull;
63
import org.slf4j.Logger;
64
import org.slf4j.LoggerFactory;
65

66
public final class POJOWorkflowImplementationFactory implements ReplayWorkflowFactory {
67
  private static final Logger log =
1✔
68
      LoggerFactory.getLogger(POJOWorkflowImplementationFactory.class);
1✔
69

70
  /**
   * Stack-trace cutoff names for the two workflow handler entry points: the {@code
   * execute(Header, Optional)} method of {@code POJOWorkflowImplementation} and the one of {@code
   * DynamicSyncWorkflowDefinition}.
   *
   * <p>NOTE(review): presumably consumers trim stack frames at these methods when reporting
   * workflow stack traces; confirm against the users of this set. Keep the argument types in sync
   * with the signatures of the referenced {@code execute} methods.
   */
  public static final ImmutableSet<String> WORKFLOW_HANDLER_STACKTRACE_CUTOFF =
1✔
71
      ImmutableSet.<String>builder()
1✔
72
          // POJO workflows: entry point is POJOWorkflowImplementation#execute
73
          .add(
1✔
74
              ReflectionUtils.getMethodNameForStackTraceCutoff(
1✔
75
                  POJOWorkflowImplementation.class, "execute", Header.class, Optional.class))
76
          // Dynamic workflows: entry point is DynamicSyncWorkflowDefinition#execute
77
          .add(
1✔
78
              ReflectionUtils.getMethodNameForStackTraceCutoff(
1✔
79
                  DynamicSyncWorkflowDefinition.class, "execute", Header.class, Optional.class))
80
          .build();
1✔
81
  private final WorkerInterceptor[] workerInterceptors;
82

83
  private final DataConverter dataConverter;
84
  private final List<ContextPropagator> contextPropagators;
85
  private final long defaultDeadlockDetectionTimeout;
86

87
  /** Key: workflow type name, Value: function that creates SyncWorkflowDefinition instance. */
88
  private final Map<String, Functions.Func1<WorkflowExecution, SyncWorkflowDefinition>>
1✔
89
      workflowDefinitions = Collections.synchronizedMap(new HashMap<>());
1✔
90

91
  /** Factories providing instances of workflow classes. */
92
  private final Map<Class<?>, Functions.Func<?>> workflowInstanceFactories =
1✔
93
      Collections.synchronizedMap(new HashMap<>());
1✔
94

95
  /** If present then it is called for any unknown workflow type. */
96
  private Functions.Func1<EncodedValues, ? extends DynamicWorkflow>
97
      dynamicWorkflowImplementationFactory;
98

99
  private final Map<String, WorkflowImplementationOptions> implementationOptions =
1✔
100
      Collections.synchronizedMap(new HashMap<>());
1✔
101

102
  private final WorkflowThreadExecutor workflowThreadExecutor;
103
  private final WorkflowExecutorCache cache;
104

105
  private final String namespace;
106

107
  public POJOWorkflowImplementationFactory(
108
      SingleWorkerOptions singleWorkerOptions,
109
      WorkflowThreadExecutor workflowThreadExecutor,
110
      WorkerInterceptor[] workerInterceptors,
111
      WorkflowExecutorCache cache,
112
      @Nonnull String namespace) {
1✔
113
    Objects.requireNonNull(singleWorkerOptions);
1✔
114
    this.dataConverter = singleWorkerOptions.getDataConverter();
1✔
115
    this.workflowThreadExecutor = Objects.requireNonNull(workflowThreadExecutor);
1✔
116
    this.workerInterceptors = Objects.requireNonNull(workerInterceptors);
1✔
117
    this.cache = cache;
1✔
118
    this.contextPropagators = singleWorkerOptions.getContextPropagators();
1✔
119
    this.defaultDeadlockDetectionTimeout = singleWorkerOptions.getDefaultDeadlockDetectionTimeout();
1✔
120
    this.namespace = namespace;
1✔
121
  }
1✔
122

123
  public void registerWorkflowImplementationTypes(
124
      WorkflowImplementationOptions options, Class<?>[] workflowImplementationTypes) {
125
    for (Class<?> type : workflowImplementationTypes) {
1✔
126
      registerWorkflowImplementationType(options, type);
1✔
127
    }
128
  }
1✔
129

130
  /**
131
   * @param clazz has to be a workflow interface class. The only exception is if it's a
132
   *     DynamicWorkflow class.
133
   */
134
  @SuppressWarnings("unchecked")
135
  public <R> void addWorkflowImplementationFactory(
136
      WorkflowImplementationOptions options, Class<R> clazz, Functions.Func<R> factory) {
137
    if (DynamicWorkflow.class.isAssignableFrom(clazz)) {
1✔
138
      if (dynamicWorkflowImplementationFactory != null) {
1!
139
        throw new TypeAlreadyRegisteredException(
×
140
            "DynamicWorkflow",
141
            "An implementation of DynamicWorkflow or its factory is already registered with the worker");
142
      }
143
      dynamicWorkflowImplementationFactory =
1✔
144
          (unused) -> ((Func<? extends DynamicWorkflow>) factory).apply();
1✔
145
      return;
1✔
146
    }
147
    workflowInstanceFactories.put(clazz, factory);
1✔
148
    POJOWorkflowInterfaceMetadata workflowMetadata =
1✔
149
        POJOWorkflowInterfaceMetadata.newInstance(clazz);
1✔
150
    if (!workflowMetadata.getWorkflowMethod().isPresent()) {
1!
151
      throw new IllegalArgumentException(
×
152
          "Workflow interface doesn't contain a method annotated with @WorkflowMethod: " + clazz);
153
    }
154
    List<POJOWorkflowMethodMetadata> methodsMetadata = workflowMetadata.getMethodsMetadata();
1✔
155
    for (POJOWorkflowMethodMetadata methodMetadata : methodsMetadata) {
1✔
156
      switch (methodMetadata.getType()) {
1!
157
        case WORKFLOW:
158
          String typeName = methodMetadata.getName();
1✔
159
          if (workflowDefinitions.containsKey(typeName)) {
1!
160
            throw new TypeAlreadyRegisteredException(
×
161
                typeName,
162
                "\"" + typeName + "\" workflow type is already registered with the worker");
163
          }
164
          workflowDefinitions.put(
1✔
165
              typeName,
166
              (execution) ->
167
                  new POJOWorkflowImplementation(
1✔
168
                      clazz,
169
                      null,
170
                      methodMetadata.getWorkflowMethod(),
1✔
171
                      dataConverter.withContext(
1✔
172
                          new WorkflowSerializationContext(namespace, execution.getWorkflowId()))));
1✔
173
          implementationOptions.put(typeName, options);
1✔
174
          break;
1✔
175
        case SIGNAL:
176
          // Signals are registered through Workflow.registerListener
177
          break;
178
      }
179
    }
1✔
180
  }
1✔
181

182
  private <T> void registerWorkflowImplementationType(
183
      WorkflowImplementationOptions options, Class<T> workflowImplementationClass) {
184
    if (DynamicWorkflow.class.isAssignableFrom(workflowImplementationClass)) {
1✔
185
      if (dynamicWorkflowImplementationFactory != null) {
1!
186
        throw new TypeAlreadyRegisteredException(
×
187
            "DynamicWorkflow",
188
            "An implementation of DynamicWorkflow or its factory is already registered with the worker");
189
      }
190
      try {
191
        Method executeMethod =
1✔
192
            workflowImplementationClass.getMethod("execute", EncodedValues.class);
1✔
193
        Optional<Constructor<?>> ctor =
1✔
194
            ReflectionUtils.getConstructor(
1✔
195
                workflowImplementationClass, Collections.singletonList(executeMethod));
1✔
196
        dynamicWorkflowImplementationFactory =
1✔
197
            (encodedValues) -> {
198
              if (ctor.isPresent()) {
1✔
199
                try {
200
                  return (DynamicWorkflow) ctor.get().newInstance(encodedValues);
1✔
201
                } catch (InstantiationException
1✔
202
                    | IllegalAccessException
203
                    | InvocationTargetException e) {
204
                  throw wrap(e);
1✔
205
                }
206
              } else {
207
                try {
208
                  return (DynamicWorkflow)
1✔
209
                      workflowImplementationClass.getDeclaredConstructor().newInstance();
1✔
210
                } catch (NoSuchMethodException
×
211
                    | InstantiationException
212
                    | IllegalAccessException
213
                    | InvocationTargetException e) {
214
                  // Error to fail workflow task as this can be fixed by a new deployment.
215
                  throw new Error(
×
216
                      "Failure instantiating workflow implementation class "
217
                          + workflowImplementationClass.getName(),
×
218
                      e);
219
                }
220
              }
221
            };
222
        return;
1✔
223
      } catch (NoSuchMethodException e) {
×
224
        throw new IllegalArgumentException(
×
225
            "DynamicWorkflow implementation doesn't implement execute method: "
226
                + workflowImplementationClass);
227
      }
228
    }
229
    POJOWorkflowImplMetadata workflowMetadata =
1✔
230
        POJOWorkflowImplMetadata.newInstance(workflowImplementationClass);
1✔
231
    List<POJOWorkflowMethodMetadata> workflowMethods = workflowMetadata.getWorkflowMethods();
1✔
232
    if (workflowMethods.isEmpty()) {
1!
233
      throw new IllegalArgumentException(
×
234
          "Workflow implementation doesn't implement any interface "
235
              + "with a workflow method annotated with @WorkflowMethod: "
236
              + workflowImplementationClass);
237
    }
238
    for (POJOWorkflowMethodMetadata workflowMethod : workflowMethods) {
1✔
239
      String workflowName = workflowMethod.getName();
1✔
240
      Method method = workflowMethod.getWorkflowMethod();
1✔
241
      Functions.Func1<WorkflowExecution, SyncWorkflowDefinition> definition =
1✔
242
          (execution) ->
243
              new POJOWorkflowImplementation(
1✔
244
                  workflowImplementationClass,
245
                  workflowMetadata.getWorkflowInit(),
1✔
246
                  method,
247
                  dataConverter.withContext(
1✔
248
                      new WorkflowSerializationContext(namespace, execution.getWorkflowId())));
1✔
249

250
      if (workflowDefinitions.containsKey(workflowName)) {
1!
251
        throw new IllegalStateException(
×
252
            workflowName + " workflow type is already registered with the worker");
253
      }
254
      workflowDefinitions.put(workflowName, definition);
1✔
255
      implementationOptions.put(workflowName, options);
1✔
256
    }
1✔
257
  }
1✔
258

259
  private SyncWorkflowDefinition getWorkflowDefinition(
260
      WorkflowType workflowType, WorkflowExecution workflowExecution) {
261
    Functions.Func1<WorkflowExecution, SyncWorkflowDefinition> factory =
1✔
262
        workflowDefinitions.get(workflowType.getName());
1✔
263
    if (factory == null) {
1✔
264
      if (dynamicWorkflowImplementationFactory != null) {
1✔
265
        return new DynamicSyncWorkflowDefinition(
1✔
266
            dynamicWorkflowImplementationFactory,
267
            workerInterceptors,
268
            dataConverter.withContext(
1✔
269
                new WorkflowSerializationContext(namespace, workflowExecution.getWorkflowId())));
1✔
270
      }
271
      // throw Error to abort the workflow task, not fail the workflow
272
      throw new Error(
1✔
273
          "Unknown workflow type \""
274
              + workflowType.getName()
1✔
275
              + "\". Known types are "
276
              + workflowDefinitions.keySet());
1✔
277
    }
278
    try {
279
      return factory.apply(workflowExecution);
1✔
280
    } catch (Exception e) {
×
281
      throw new Error(e);
×
282
    }
283
  }
284

285
  @Override
286
  public ReplayWorkflow getWorkflow(
287
      WorkflowType workflowType, WorkflowExecution workflowExecution) {
288
    SyncWorkflowDefinition workflow = getWorkflowDefinition(workflowType, workflowExecution);
1✔
289
    WorkflowImplementationOptions workflowImplementationOptions =
1✔
290
        implementationOptions.get(workflowType.getName());
1✔
291
    DataConverter dataConverterWithWorkflowContext =
1✔
292
        dataConverter.withContext(
1✔
293
            new WorkflowSerializationContext(namespace, workflowExecution.getWorkflowId()));
1✔
294
    return new SyncWorkflow(
1✔
295
        namespace,
296
        workflowExecution,
297
        workflow,
298
        new SignalDispatcher(dataConverterWithWorkflowContext),
299
        new QueryDispatcher(dataConverterWithWorkflowContext),
300
        new UpdateDispatcher(dataConverterWithWorkflowContext),
301
        workflowImplementationOptions,
302
        dataConverter,
303
        workflowThreadExecutor,
304
        cache,
305
        contextPropagators,
306
        defaultDeadlockDetectionTimeout);
307
  }
308

309
  @Override
310
  public boolean isAnyTypeSupported() {
311
    return !workflowDefinitions.isEmpty() || dynamicWorkflowImplementationFactory != null;
1✔
312
  }
313

314
  private class POJOWorkflowImplementation implements SyncWorkflowDefinition {
315
    private final Class<?> workflowImplementationClass;
316
    private final Method workflowMethod;
317
    private final Constructor<?> ctor;
318
    private WorkflowInboundCallsInterceptor workflowInvoker;
319
    // don't pass it down to other classes, it's a "cached" instance for internal usage only
320
    private final DataConverter dataConverterWithWorkflowContext;
321

322
    public POJOWorkflowImplementation(
323
        Class<?> workflowImplementationClass,
324
        Constructor<?> ctor,
325
        Method workflowMethod,
326
        DataConverter dataConverterWithWorkflowContext) {
1✔
327
      this.workflowImplementationClass = workflowImplementationClass;
1✔
328
      this.ctor = ctor;
1✔
329
      this.workflowMethod = workflowMethod;
1✔
330
      this.dataConverterWithWorkflowContext = dataConverterWithWorkflowContext;
1✔
331
    }
1✔
332

333
    @Override
334
    public void initialize(Optional<Payloads> input) {
335
      SyncWorkflowContext workflowContext = WorkflowInternal.getRootWorkflowContext();
1✔
336
      workflowInvoker = new RootWorkflowInboundCallsInterceptor(workflowContext, input);
1✔
337
      for (WorkerInterceptor workerInterceptor : workerInterceptors) {
1✔
338
        workflowInvoker = workerInterceptor.interceptWorkflow(workflowInvoker);
1✔
339
      }
340
      workflowContext.initHeadInboundCallsInterceptor(workflowInvoker);
1✔
341
      workflowInvoker.init(workflowContext);
1✔
342
    }
1✔
343

344
    @Override
345
    public Optional<Payloads> execute(Header header, Optional<Payloads> input)
346
        throws CanceledFailure, WorkflowExecutionException {
347

348
      Object[] args =
1✔
349
          dataConverterWithWorkflowContext.fromPayloads(
1✔
350
              input, workflowMethod.getParameterTypes(), workflowMethod.getGenericParameterTypes());
1✔
351
      Preconditions.checkNotNull(workflowInvoker, "initialize not called");
1✔
352
      WorkflowInboundCallsInterceptor.WorkflowOutput result =
1✔
353
          workflowInvoker.execute(new WorkflowInboundCallsInterceptor.WorkflowInput(header, args));
1✔
354
      if (workflowMethod.getReturnType() == Void.TYPE) {
1✔
355
        return Optional.empty();
1✔
356
      }
357
      return dataConverterWithWorkflowContext.toPayloads(result.getResult());
1✔
358
    }
359

360
    private class RootWorkflowInboundCallsInterceptor
361
        extends BaseRootWorkflowInboundCallsInterceptor {
362
      private Object workflow;
363
      private Optional<Payloads> input;
364

365
      public RootWorkflowInboundCallsInterceptor(
366
          SyncWorkflowContext workflowContext, Optional<Payloads> input) {
1✔
367
        super(workflowContext);
1✔
368
        this.input = input;
1✔
369
      }
1✔
370

371
      @Override
372
      public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
373
        super.init(outboundCalls);
1✔
374
        newInstance(input);
1✔
375
        WorkflowInternal.registerListener(workflow);
1✔
376
      }
1✔
377

378
      @Override
379
      public WorkflowOutput execute(WorkflowInput input) {
380
        try {
381
          Object result = workflowMethod.invoke(workflow, input.getArguments());
1✔
382
          return new WorkflowOutput(result);
1✔
383
        } catch (IllegalAccessException e) {
×
384
          throw wrap(e);
×
385
        } catch (InvocationTargetException e) {
1✔
386
          Throwable target = e.getTargetException();
1✔
387
          throw wrap(target);
1✔
388
        }
389
      }
390

391
      protected void newInstance(Optional<Payloads> input) {
392
        Func<?> factory = workflowInstanceFactories.get(workflowImplementationClass);
1✔
393
        if (factory != null) {
1✔
394
          workflow = factory.apply();
1✔
395
        } else {
396
          // Historically any exception thrown from the constructor was wrapped into Error causing a
397
          // workflow task failure.
398
          // This is not consistent with throwing exception from the workflow method which can
399
          // causes a workflow failure depending on the exception type.
400
          // To preserve backwards compatibility we only change behaviour if a constructor is
401
          // annotated with WorkflowInit.
402
          if (ctor != null) {
1✔
403
            try {
404
              workflow =
1✔
405
                  ctor.newInstance(
1✔
406
                      dataConverterWithWorkflowContext.fromPayloads(
1✔
407
                          input, ctor.getParameterTypes(), ctor.getGenericParameterTypes()));
1✔
408
            } catch (InstantiationException
1✔
409
                | IllegalAccessException
410
                | InvocationTargetException e) {
411
              throw wrap(e);
1✔
412
            }
1✔
413
          } else {
414
            legacyNewInstance();
1✔
415
          }
416
        }
417
      }
1✔
418

419
      private void legacyNewInstance() {
420
        try {
421
          workflow = workflowImplementationClass.getDeclaredConstructor().newInstance();
1✔
422
        } catch (NoSuchMethodException
×
423
            | InstantiationException
424
            | IllegalAccessException
425
            | InvocationTargetException e) {
426
          // Error to fail workflow task as this can be fixed by a new deployment.
427
          throw new Error(
×
428
              "Failure instantiating workflow implementation class "
429
                  + workflowImplementationClass.getName(),
×
430
              e);
431
        }
1✔
432
      }
1✔
433
    }
434
  }
435

436
  @Override
437
  public String toString() {
438
    return "POJOWorkflowImplementationFactory{"
×
439
        + "registeredWorkflowTypes="
440
        + workflowDefinitions.keySet()
×
441
        + '}';
442
  }
443
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc