• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #157

pending completion
#157

push

github-actions

web-flow
Provide SerializationContext for PayloadConverter and PayloadCodec (#1695)

Issue #1694

497 of 497 new or added lines in 32 files covered. (100.0%)

16942 of 20806 relevant lines covered (81.43%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.59
/temporal-sdk/src/main/java/io/temporal/internal/sync/POJOWorkflowImplementationFactory.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import static io.temporal.serviceclient.CheckedExceptionWrapper.wrap;
24

25
import com.google.common.base.Preconditions;
26
import io.temporal.api.common.v1.Payloads;
27
import io.temporal.api.common.v1.WorkflowExecution;
28
import io.temporal.api.common.v1.WorkflowType;
29
import io.temporal.common.context.ContextPropagator;
30
import io.temporal.common.converter.DataConverter;
31
import io.temporal.common.interceptors.Header;
32
import io.temporal.common.interceptors.WorkerInterceptor;
33
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
34
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
35
import io.temporal.common.metadata.POJOWorkflowImplMetadata;
36
import io.temporal.common.metadata.POJOWorkflowInterfaceMetadata;
37
import io.temporal.common.metadata.POJOWorkflowMethodMetadata;
38
import io.temporal.failure.CanceledFailure;
39
import io.temporal.internal.replay.ReplayWorkflow;
40
import io.temporal.internal.replay.ReplayWorkflowFactory;
41
import io.temporal.internal.worker.SingleWorkerOptions;
42
import io.temporal.internal.worker.WorkflowExecutionException;
43
import io.temporal.internal.worker.WorkflowExecutorCache;
44
import io.temporal.payload.context.WorkflowSerializationContext;
45
import io.temporal.worker.TypeAlreadyRegisteredException;
46
import io.temporal.worker.WorkflowImplementationOptions;
47
import io.temporal.workflow.DynamicWorkflow;
48
import io.temporal.workflow.Functions;
49
import io.temporal.workflow.Functions.Func;
50
import java.lang.reflect.InvocationTargetException;
51
import java.lang.reflect.Method;
52
import java.util.Collections;
53
import java.util.HashMap;
54
import java.util.List;
55
import java.util.Map;
56
import java.util.Objects;
57
import java.util.Optional;
58
import javax.annotation.Nonnull;
59
import org.slf4j.Logger;
60
import org.slf4j.LoggerFactory;
61

62
public final class POJOWorkflowImplementationFactory implements ReplayWorkflowFactory {
63
  private static final Logger log =
1✔
64
      LoggerFactory.getLogger(POJOWorkflowImplementationFactory.class);
1✔
65
  private final WorkerInterceptor[] workerInterceptors;
66

67
  private final DataConverter dataConverter;
68
  private final List<ContextPropagator> contextPropagators;
69
  private final long defaultDeadlockDetectionTimeout;
70

71
  /** Key: workflow type name, Value: function that creates SyncWorkflowDefinition instance. */
72
  private final Map<String, Functions.Func1<WorkflowExecution, SyncWorkflowDefinition>>
1✔
73
      workflowDefinitions = Collections.synchronizedMap(new HashMap<>());
1✔
74
  /** Factories providing instances of workflow classes. */
75
  private final Map<Class<?>, Functions.Func<?>> workflowInstanceFactories =
1✔
76
      Collections.synchronizedMap(new HashMap<>());
1✔
77
  /** If present then it is called for any unknown workflow type. */
78
  private Functions.Func<? extends DynamicWorkflow> dynamicWorkflowImplementationFactory;
79

80
  private final Map<String, WorkflowImplementationOptions> implementationOptions =
1✔
81
      Collections.synchronizedMap(new HashMap<>());
1✔
82

83
  private final WorkflowThreadExecutor workflowThreadExecutor;
84
  private final WorkflowExecutorCache cache;
85

86
  private final String namespace;
87

88
  public POJOWorkflowImplementationFactory(
89
      SingleWorkerOptions singleWorkerOptions,
90
      WorkflowThreadExecutor workflowThreadExecutor,
91
      WorkerInterceptor[] workerInterceptors,
92
      WorkflowExecutorCache cache,
93
      @Nonnull String namespace) {
1✔
94
    Objects.requireNonNull(singleWorkerOptions);
1✔
95
    this.dataConverter = singleWorkerOptions.getDataConverter();
1✔
96
    this.workflowThreadExecutor = Objects.requireNonNull(workflowThreadExecutor);
1✔
97
    this.workerInterceptors = Objects.requireNonNull(workerInterceptors);
1✔
98
    this.cache = cache;
1✔
99
    this.contextPropagators = singleWorkerOptions.getContextPropagators();
1✔
100
    this.defaultDeadlockDetectionTimeout = singleWorkerOptions.getDefaultDeadlockDetectionTimeout();
1✔
101
    this.namespace = namespace;
1✔
102
  }
1✔
103

104
  public void registerWorkflowImplementationTypes(
105
      WorkflowImplementationOptions options, Class<?>[] workflowImplementationTypes) {
106
    for (Class<?> type : workflowImplementationTypes) {
1✔
107
      registerWorkflowImplementationType(options, type);
1✔
108
    }
109
  }
1✔
110

111
  /**
112
   * @param clazz has to be a workflow interface class. The only exception is if it's a
113
   *     DynamicWorkflow class.
114
   */
115
  @SuppressWarnings("unchecked")
116
  public <R> void addWorkflowImplementationFactory(
117
      WorkflowImplementationOptions options, Class<R> clazz, Functions.Func<R> factory) {
118
    if (DynamicWorkflow.class.isAssignableFrom(clazz)) {
1✔
119
      if (dynamicWorkflowImplementationFactory != null) {
1✔
120
        throw new TypeAlreadyRegisteredException(
×
121
            "DynamicWorkflow",
122
            "An implementation of DynamicWorkflow or its factory is already registered with the worker");
123
      }
124
      dynamicWorkflowImplementationFactory = (Func<? extends DynamicWorkflow>) factory;
1✔
125
      return;
1✔
126
    }
127
    workflowInstanceFactories.put(clazz, factory);
1✔
128
    POJOWorkflowInterfaceMetadata workflowMetadata =
1✔
129
        POJOWorkflowInterfaceMetadata.newInstance(clazz);
1✔
130
    if (!workflowMetadata.getWorkflowMethod().isPresent()) {
1✔
131
      throw new IllegalArgumentException(
×
132
          "Workflow interface doesn't contain a method annotated with @WorkflowMethod: " + clazz);
133
    }
134
    List<POJOWorkflowMethodMetadata> methodsMetadata = workflowMetadata.getMethodsMetadata();
1✔
135
    for (POJOWorkflowMethodMetadata methodMetadata : methodsMetadata) {
1✔
136
      switch (methodMetadata.getType()) {
1✔
137
        case WORKFLOW:
138
          String typeName = methodMetadata.getName();
1✔
139
          if (workflowDefinitions.containsKey(typeName)) {
1✔
140
            throw new TypeAlreadyRegisteredException(
×
141
                typeName,
142
                "\"" + typeName + "\" workflow type is already registered with the worker");
143
          }
144
          workflowDefinitions.put(
1✔
145
              typeName,
146
              (execution) ->
147
                  new POJOWorkflowImplementation(
1✔
148
                      clazz,
149
                      methodMetadata.getWorkflowMethod(),
1✔
150
                      dataConverter.withContext(
1✔
151
                          new WorkflowSerializationContext(namespace, execution.getWorkflowId()))));
1✔
152
          implementationOptions.put(typeName, options);
1✔
153
          break;
1✔
154
        case SIGNAL:
155
          // Signals are registered through Workflow.registerListener
156
          break;
157
      }
158
    }
1✔
159
  }
1✔
160

161
  private <T> void registerWorkflowImplementationType(
162
      WorkflowImplementationOptions options, Class<T> workflowImplementationClass) {
163
    if (DynamicWorkflow.class.isAssignableFrom(workflowImplementationClass)) {
1✔
164
      addWorkflowImplementationFactory(
1✔
165
          options,
166
          workflowImplementationClass,
167
          () -> {
168
            try {
169
              return workflowImplementationClass.getDeclaredConstructor().newInstance();
1✔
170
            } catch (NoSuchMethodException
×
171
                | InstantiationException
172
                | IllegalAccessException
173
                | InvocationTargetException e) {
174
              // Error to fail workflow task as this can be fixed by a new deployment.
175
              throw new Error(
×
176
                  "Failure instantiating workflow implementation class "
177
                      + workflowImplementationClass.getName(),
×
178
                  e);
179
            }
180
          });
181
      return;
1✔
182
    }
183
    POJOWorkflowImplMetadata workflowMetadata =
1✔
184
        POJOWorkflowImplMetadata.newInstance(workflowImplementationClass);
1✔
185
    List<POJOWorkflowMethodMetadata> workflowMethods = workflowMetadata.getWorkflowMethods();
1✔
186
    if (workflowMethods.isEmpty()) {
1✔
187
      throw new IllegalArgumentException(
×
188
          "Workflow implementation doesn't implement any interface "
189
              + "with a workflow method annotated with @WorkflowMethod: "
190
              + workflowImplementationClass);
191
    }
192
    for (POJOWorkflowMethodMetadata workflowMethod : workflowMethods) {
1✔
193
      String workflowName = workflowMethod.getName();
1✔
194
      Method method = workflowMethod.getWorkflowMethod();
1✔
195
      Functions.Func1<WorkflowExecution, SyncWorkflowDefinition> definition =
1✔
196
          (execution) ->
197
              new POJOWorkflowImplementation(
1✔
198
                  workflowImplementationClass,
199
                  method,
200
                  dataConverter.withContext(
1✔
201
                      new WorkflowSerializationContext(namespace, execution.getWorkflowId())));
1✔
202

203
      if (workflowDefinitions.containsKey(workflowName)) {
1✔
204
        throw new IllegalStateException(
×
205
            workflowName + " workflow type is already registered with the worker");
206
      }
207
      workflowDefinitions.put(workflowName, definition);
1✔
208
      implementationOptions.put(workflowName, options);
1✔
209
    }
1✔
210
  }
1✔
211

212
  private SyncWorkflowDefinition getWorkflowDefinition(
213
      WorkflowType workflowType, WorkflowExecution workflowExecution) {
214
    Functions.Func1<WorkflowExecution, SyncWorkflowDefinition> factory =
1✔
215
        workflowDefinitions.get(workflowType.getName());
1✔
216
    if (factory == null) {
1✔
217
      if (dynamicWorkflowImplementationFactory != null) {
1✔
218
        return new DynamicSyncWorkflowDefinition(
1✔
219
            dynamicWorkflowImplementationFactory, workerInterceptors, dataConverter);
220
      }
221
      // throw Error to abort the workflow task, not fail the workflow
222
      throw new Error(
1✔
223
          "Unknown workflow type \""
224
              + workflowType.getName()
1✔
225
              + "\". Known types are "
226
              + workflowDefinitions.keySet());
1✔
227
    }
228
    try {
229
      return factory.apply(workflowExecution);
1✔
230
    } catch (Exception e) {
×
231
      throw new Error(e);
×
232
    }
233
  }
234

235
  @Override
236
  public ReplayWorkflow getWorkflow(
237
      WorkflowType workflowType, WorkflowExecution workflowExecution) {
238
    SyncWorkflowDefinition workflow = getWorkflowDefinition(workflowType, workflowExecution);
1✔
239
    WorkflowImplementationOptions workflowImplementationOptions =
1✔
240
        implementationOptions.get(workflowType.getName());
1✔
241
    DataConverter dataConverterWithWorkflowContext =
1✔
242
        dataConverter.withContext(
1✔
243
            new WorkflowSerializationContext(namespace, workflowExecution.getWorkflowId()));
1✔
244
    return new SyncWorkflow(
1✔
245
        namespace,
246
        workflowExecution,
247
        workflow,
248
        new SignalDispatcher(dataConverterWithWorkflowContext),
249
        new QueryDispatcher(dataConverterWithWorkflowContext),
250
        workflowImplementationOptions,
251
        dataConverter,
252
        workflowThreadExecutor,
253
        cache,
254
        contextPropagators,
255
        defaultDeadlockDetectionTimeout);
256
  }
257

258
  @Override
259
  public boolean isAnyTypeSupported() {
260
    return !workflowDefinitions.isEmpty() || dynamicWorkflowImplementationFactory != null;
1✔
261
  }
262

263
  private class POJOWorkflowImplementation implements SyncWorkflowDefinition {
264
    private final Class<?> workflowImplementationClass;
265
    private final Method workflowMethod;
266
    private WorkflowInboundCallsInterceptor workflowInvoker;
267
    // don't pass it down to other classes, it's a "cached" instance for internal usage only
268
    private final DataConverter dataConverterWithWorkflowContext;
269

270
    public POJOWorkflowImplementation(
271
        Class<?> workflowImplementationClass,
272
        Method workflowMethod,
273
        DataConverter dataConverterWithWorkflowContext) {
1✔
274
      this.workflowImplementationClass = workflowImplementationClass;
1✔
275
      this.workflowMethod = workflowMethod;
1✔
276
      this.dataConverterWithWorkflowContext = dataConverterWithWorkflowContext;
1✔
277
    }
1✔
278

279
    @Override
280
    public void initialize() {
281
      SyncWorkflowContext workflowContext = WorkflowInternal.getRootWorkflowContext();
1✔
282
      workflowInvoker = new RootWorkflowInboundCallsInterceptor(workflowContext);
1✔
283
      for (WorkerInterceptor workerInterceptor : workerInterceptors) {
1✔
284
        workflowInvoker = workerInterceptor.interceptWorkflow(workflowInvoker);
1✔
285
      }
286
      workflowContext.initHeadInboundCallsInterceptor(workflowInvoker);
1✔
287
      workflowInvoker.init(workflowContext);
1✔
288
    }
1✔
289

290
    @Override
291
    public Optional<Payloads> execute(Header header, Optional<Payloads> input)
292
        throws CanceledFailure, WorkflowExecutionException {
293

294
      Object[] args =
1✔
295
          DataConverter.arrayFromPayloads(
1✔
296
              dataConverterWithWorkflowContext,
297
              input,
298
              workflowMethod.getParameterTypes(),
1✔
299
              workflowMethod.getGenericParameterTypes());
1✔
300
      Preconditions.checkNotNull(workflowInvoker, "initialize not called");
1✔
301
      WorkflowInboundCallsInterceptor.WorkflowOutput result =
1✔
302
          workflowInvoker.execute(new WorkflowInboundCallsInterceptor.WorkflowInput(header, args));
1✔
303
      if (workflowMethod.getReturnType() == Void.TYPE) {
1✔
304
        return Optional.empty();
1✔
305
      }
306
      return dataConverterWithWorkflowContext.toPayloads(result.getResult());
1✔
307
    }
308

309
    private class RootWorkflowInboundCallsInterceptor
310
        extends BaseRootWorkflowInboundCallsInterceptor {
311
      private Object workflow;
312

313
      public RootWorkflowInboundCallsInterceptor(SyncWorkflowContext workflowContext) {
1✔
314
        super(workflowContext);
1✔
315
      }
1✔
316

317
      @Override
318
      public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
319
        super.init(outboundCalls);
1✔
320
        newInstance();
1✔
321
        WorkflowInternal.registerListener(workflow);
1✔
322
      }
1✔
323

324
      @Override
325
      public WorkflowOutput execute(WorkflowInput input) {
326
        try {
327
          Object result = workflowMethod.invoke(workflow, input.getArguments());
1✔
328
          return new WorkflowOutput(result);
1✔
329
        } catch (IllegalAccessException e) {
×
330
          throw wrap(e);
×
331
        } catch (InvocationTargetException e) {
1✔
332
          Throwable target = e.getTargetException();
1✔
333
          throw wrap(target);
1✔
334
        }
335
      }
336

337
      protected void newInstance() {
338
        Func<?> factory = workflowInstanceFactories.get(workflowImplementationClass);
1✔
339
        if (factory != null) {
1✔
340
          workflow = factory.apply();
1✔
341
        } else {
342
          try {
343
            workflow = workflowImplementationClass.getDeclaredConstructor().newInstance();
1✔
344
          } catch (NoSuchMethodException
×
345
              | InstantiationException
346
              | IllegalAccessException
347
              | InvocationTargetException e) {
348
            // Error to fail workflow task as this can be fixed by a new deployment.
349
            throw new Error(
×
350
                "Failure instantiating workflow implementation class "
351
                    + workflowImplementationClass.getName(),
×
352
                e);
353
          }
1✔
354
        }
355
      }
1✔
356
    }
357
  }
358

359
  @Override
360
  public String toString() {
361
    return "POJOWorkflowImplementationFactory{"
×
362
        + "registeredWorkflowTypes="
363
        + workflowDefinitions.keySet()
×
364
        + '}';
365
  }
366
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc