• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 16

16 Apr 2024 01:28AM UTC coverage: 60.239% (-0.1%) from 60.343%
16

push

buildkite

mstifflin
Remove unnecessary sidecar command, try executing with lower resources

11446 of 19001 relevant lines covered (60.24%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.72
/src/main/java/com/uber/cadence/internal/sync/POJOWorkflowImplementationFactory.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.sync;
19

20
import static com.uber.cadence.internal.errors.ErrorType.UNKNOWN_WORKFLOW_TYPE;
21
import static com.uber.cadence.worker.NonDeterministicWorkflowPolicy.FailWorkflow;
22

23
import com.google.common.reflect.TypeToken;
24
import com.uber.cadence.WorkflowType;
25
import com.uber.cadence.context.ContextPropagator;
26
import com.uber.cadence.converter.DataConverter;
27
import com.uber.cadence.converter.DataConverterException;
28
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
29
import com.uber.cadence.internal.common.InternalUtils;
30
import com.uber.cadence.internal.metrics.MetricsType;
31
import com.uber.cadence.internal.replay.DeciderCache;
32
import com.uber.cadence.internal.replay.ReplayWorkflow;
33
import com.uber.cadence.internal.replay.ReplayWorkflowFactory;
34
import com.uber.cadence.internal.worker.WorkflowExecutionException;
35
import com.uber.cadence.testing.SimulatedTimeoutException;
36
import com.uber.cadence.worker.WorkflowImplementationOptions;
37
import com.uber.cadence.workflow.Functions;
38
import com.uber.cadence.workflow.Functions.Func;
39
import com.uber.cadence.workflow.QueryMethod;
40
import com.uber.cadence.workflow.SignalMethod;
41
import com.uber.cadence.workflow.Workflow;
42
import com.uber.cadence.workflow.WorkflowInfo;
43
import com.uber.cadence.workflow.WorkflowInterceptor;
44
import com.uber.cadence.workflow.WorkflowMethod;
45
import io.opentracing.Tracer;
46
import java.lang.reflect.InvocationTargetException;
47
import java.lang.reflect.Method;
48
import java.util.Collections;
49
import java.util.HashMap;
50
import java.util.List;
51
import java.util.Map;
52
import java.util.Objects;
53
import java.util.concurrent.CancellationException;
54
import java.util.concurrent.ExecutorService;
55
import java.util.function.Function;
56
import org.slf4j.Logger;
57
import org.slf4j.LoggerFactory;
58

59
final class POJOWorkflowImplementationFactory implements ReplayWorkflowFactory {
60

61
  private static final Logger log =
1✔
62
      LoggerFactory.getLogger(POJOWorkflowImplementationFactory.class);
1✔
63
  private static final byte[] EMPTY_BLOB = {};
1✔
64
  private final Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory;
65
  private final Tracer tracer;
66

67
  private DataConverter dataConverter;
68
  private List<ContextPropagator> contextPropagators;
69

70
  /** Key: workflow type name, Value: function that creates SyncWorkflowDefinition instance. */
71
  private final Map<String, Functions.Func<SyncWorkflowDefinition>> workflowDefinitions =
1✔
72
      Collections.synchronizedMap(new HashMap<>());
1✔
73

74
  private Map<String, WorkflowImplementationOptions> implementationOptions =
1✔
75
      Collections.synchronizedMap(new HashMap<>());
1✔
76

77
  private final Map<Class<?>, Functions.Func<?>> workflowImplementationFactories =
1✔
78
      Collections.synchronizedMap(new HashMap<>());
1✔
79

80
  private final ExecutorService threadPool;
81
  private DeciderCache cache;
82

83
  /**
   * Creates a factory with no workflow types registered.
   *
   * <p>{@code dataConverter}, {@code threadPool} and {@code interceptorFactory} are mandatory.
   * The remaining arguments are stored as given, without a null check.
   *
   * @throws NullPointerException if one of the mandatory arguments is {@code null}
   */
  POJOWorkflowImplementationFactory(
      DataConverter dataConverter,
      ExecutorService threadPool,
      Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory,
      DeciderCache cache,
      List<ContextPropagator> contextPropagators,
      Tracer tracer) {
    // Validate the mandatory collaborators first, in the same order as before.
    Objects.requireNonNull(dataConverter);
    Objects.requireNonNull(threadPool);
    Objects.requireNonNull(interceptorFactory);

    this.tracer = tracer;
    this.contextPropagators = contextPropagators;
    this.cache = cache;
    this.interceptorFactory = interceptorFactory;
    this.threadPool = threadPool;
    this.dataConverter = dataConverter;
  }
1✔
97

98
  void setWorkflowImplementationTypes(
99
      WorkflowImplementationOptions options, Class<?>[] workflowImplementationTypes) {
100
    workflowDefinitions.clear();
1✔
101
    for (Class<?> type : workflowImplementationTypes) {
1✔
102
      addWorkflowImplementationType(options, type);
1✔
103
    }
104
  }
1✔
105

106
  <R> void addWorkflowImplementationFactory(Class<R> clazz, Functions.Func<R> factory) {
107
    WorkflowImplementationOptions unitTestingOptions =
1✔
108
        new WorkflowImplementationOptions.Builder()
109
            .setNonDeterministicWorkflowPolicy(FailWorkflow)
1✔
110
            .build();
1✔
111
    addWorkflowImplementationFactory(unitTestingOptions, clazz, factory);
1✔
112
  }
1✔
113

114
  <R> void addWorkflowImplementationFactory(
115
      WorkflowImplementationOptions options, Class<R> clazz, Functions.Func<R> factory) {
116
    workflowImplementationFactories.put(clazz, factory);
1✔
117
    addWorkflowImplementationType(options, clazz);
1✔
118
  }
1✔
119

120
  private void addWorkflowImplementationType(
121
      WorkflowImplementationOptions options, Class<?> workflowImplementationClass) {
122
    TypeToken<?>.TypeSet interfaces =
1✔
123
        TypeToken.of(workflowImplementationClass).getTypes().interfaces();
1✔
124
    if (interfaces.isEmpty()) {
1✔
125
      throw new IllegalArgumentException("Workflow must implement at least one interface");
×
126
    }
127
    boolean hasWorkflowMethod = false;
1✔
128
    for (TypeToken<?> i : interfaces) {
1✔
129
      Map<String, Method> signalHandlers = new HashMap<>();
1✔
130
      for (Method method : i.getRawType().getMethods()) {
1✔
131
        WorkflowMethod workflowMethod = method.getAnnotation(WorkflowMethod.class);
1✔
132
        QueryMethod queryMethod = method.getAnnotation(QueryMethod.class);
1✔
133
        SignalMethod signalMethod = method.getAnnotation(SignalMethod.class);
1✔
134
        int count =
135
            (workflowMethod == null ? 0 : 1)
1✔
136
                + (queryMethod == null ? 0 : 1)
1✔
137
                + (signalMethod == null ? 0 : 1);
1✔
138
        if (count > 1) {
1✔
139
          throw new IllegalArgumentException(
×
140
              method
141
                  + " must contain at most one annotation "
142
                  + "from @WorkflowMethod, @QueryMethod or @SignalMethod");
143
        }
144
        if (workflowMethod != null) {
1✔
145
          Functions.Func<SyncWorkflowDefinition> factory =
1✔
146
              () ->
147
                  new POJOWorkflowImplementation(
1✔
148
                      method, workflowImplementationClass, signalHandlers);
149

150
          String workflowName = workflowMethod.name();
1✔
151
          if (workflowName.isEmpty()) {
1✔
152
            workflowName = InternalUtils.getSimpleName(method);
1✔
153
          }
154
          if (workflowDefinitions.containsKey(workflowName)) {
1✔
155
            throw new IllegalStateException(
×
156
                workflowName + " workflow type is already registered with the worker");
157
          }
158
          workflowDefinitions.put(workflowName, factory);
1✔
159
          implementationOptions.put(workflowName, options);
1✔
160
          hasWorkflowMethod = true;
1✔
161
        }
162
        if (signalMethod != null) {
1✔
163
          if (method.getReturnType() != Void.TYPE) {
1✔
164
            throw new IllegalArgumentException(
×
165
                "Method annotated with @SignalMethod " + "must have void return type: " + method);
166
          }
167
          String signalName = signalMethod.name();
1✔
168
          if (signalName.isEmpty()) {
1✔
169
            signalName = InternalUtils.getSimpleName(method);
1✔
170
          }
171
          signalHandlers.put(signalName, method);
1✔
172
        }
173
        if (queryMethod != null) {
1✔
174
          if (method.getReturnType() == Void.TYPE) {
1✔
175
            throw new IllegalArgumentException(
×
176
                "Method annotated with @QueryMethod " + "cannot have void return type: " + method);
177
          }
178
        }
179
      }
180
    }
1✔
181
    if (!hasWorkflowMethod) {
1✔
182
      throw new IllegalArgumentException(
×
183
          "Workflow implementation doesn't implement any interface "
184
              + "with a workflow method annotated with @WorkflowMethod: "
185
              + workflowImplementationClass);
186
    }
187
  }
1✔
188

189
  private SyncWorkflowDefinition getWorkflowDefinition(WorkflowType workflowType) {
190
    Functions.Func<SyncWorkflowDefinition> factory =
1✔
191
        workflowDefinitions.get(workflowType.getName());
1✔
192
    if (factory == null) {
1✔
193
      // throw Error to abort decision, not fail the workflow
194
      throw new Error(
×
195
          UNKNOWN_WORKFLOW_TYPE
196
              + " \""
197
              + workflowType.getName()
×
198
              + "\". Known types are "
199
              + workflowDefinitions.keySet());
×
200
    }
201
    try {
202
      return factory.apply();
1✔
203
    } catch (Exception e) {
×
204
      throw new Error(e);
×
205
    }
206
  }
207

208
  public void setDataConverter(DataConverter dataConverter) {
209
    this.dataConverter = dataConverter;
×
210
  }
×
211

212
  @Override
213
  public ReplayWorkflow getWorkflow(WorkflowType workflowType) {
214
    SyncWorkflowDefinition workflow = getWorkflowDefinition(workflowType);
1✔
215
    WorkflowImplementationOptions options = implementationOptions.get(workflowType.getName());
1✔
216
    return new SyncWorkflow(
1✔
217
        workflow,
218
        options,
219
        dataConverter,
220
        threadPool,
221
        interceptorFactory,
222
        cache,
223
        contextPropagators,
224
        tracer);
225
  }
226

227
  @Override
228
  public boolean isAnyTypeSupported() {
229
    return !workflowDefinitions.isEmpty();
1✔
230
  }
231

232
  private class POJOWorkflowImplementation implements SyncWorkflowDefinition {
233

234
    private final Method workflowMethod;
235
    private final Class<?> workflowImplementationClass;
236
    private final Map<String, Method> signalHandlers;
237
    private Object workflow;
238

239
    POJOWorkflowImplementation(
240
        Method method, Class<?> workflowImplementationClass, Map<String, Method> signalHandlers) {
1✔
241
      this.workflowMethod = method;
1✔
242
      this.workflowImplementationClass = workflowImplementationClass;
1✔
243
      this.signalHandlers = signalHandlers;
1✔
244
    }
1✔
245

246
    @Override
247
    public byte[] execute(byte[] input) throws CancellationException, WorkflowExecutionException {
248
      Object[] args = dataConverter.fromDataArray(input, workflowMethod.getGenericParameterTypes());
1✔
249
      try {
250
        newInstance();
1✔
251
        Object result = workflowMethod.invoke(workflow, args);
1✔
252
        if (workflowMethod.getReturnType() == Void.TYPE) {
1✔
253
          return EMPTY_BLOB;
1✔
254
        }
255
        return dataConverter.toData(result);
1✔
256
      } catch (IllegalAccessException e) {
×
257
        throw new Error(mapToWorkflowExecutionException(e, dataConverter));
×
258
      } catch (InvocationTargetException e) {
1✔
259
        Throwable targetException = e.getTargetException();
1✔
260
        if (targetException instanceof Error) {
1✔
261
          throw (Error) targetException;
1✔
262
        }
263
        // Cancellation should be delivered as it impacts which decision closes a workflow.
264
        if (targetException instanceof CancellationException) {
1✔
265
          throw (CancellationException) targetException;
1✔
266
        }
267
        if (log.isErrorEnabled()) {
1✔
268
          WorkflowInfo context = Workflow.getWorkflowInfo();
1✔
269
          log.error(
1✔
270
              "Workflow execution failure "
271
                  + "WorkflowID="
272
                  + context.getWorkflowId()
1✔
273
                  + ", RunID="
274
                  + context.getRunId()
1✔
275
                  + ", WorkflowType="
276
                  + context.getWorkflowType(),
1✔
277
              targetException);
278
        }
279
        // Cast to Exception is safe as Error is handled above.
280
        throw mapToWorkflowExecutionException((Exception) targetException, dataConverter);
1✔
281
      }
282
    }
283

284
    private void newInstance() {
285
      if (workflow == null) {
1✔
286
        Func<?> factory = workflowImplementationFactories.get(workflowImplementationClass);
1✔
287
        if (factory != null) {
1✔
288
          workflow = factory.apply();
1✔
289
        } else {
290
          try {
291
            workflow = workflowImplementationClass.getDeclaredConstructor().newInstance();
1✔
292
          } catch (NoSuchMethodException
×
293
              | InstantiationException
294
              | IllegalAccessException
295
              | InvocationTargetException e) {
296
            // Error to fail decision as this can be fixed by a new deployment.
297
            throw new Error(
×
298
                "Failure instantiating workflow implementation class "
299
                    + workflowImplementationClass.getName(),
×
300
                e);
301
          }
1✔
302
        }
303
        WorkflowInternal.registerQuery(workflow);
1✔
304
      }
305
    }
1✔
306

307
    /**
308
     * Signals that failed to deserialize are logged, but do not lead to workflow or decision
309
     * failure. Otherwise a single bad signal from CLI would kill any workflow. Not that throwing
310
     * Error leads to decision being aborted. Throwing any other exception leads to workflow
311
     * failure. TODO: Unknown and corrupted signals handler in application code or server side DLQ.
312
     */
313
    @Override
314
    public void processSignal(String signalName, byte[] input, long eventId) {
315
      Method signalMethod = signalHandlers.get(signalName);
1✔
316
      if (signalMethod == null) {
1✔
317
        log.error(
×
318
            "Unknown signal: "
319
                + signalName
320
                + " at eventID "
321
                + eventId
322
                + ", knownSignals="
323
                + signalHandlers.keySet());
×
324
        return;
×
325
      }
326

327
      try {
328
        Object[] args = dataConverter.fromDataArray(input, signalMethod.getGenericParameterTypes());
1✔
329
        newInstance();
1✔
330
        signalMethod.invoke(workflow, args);
1✔
331
      } catch (IllegalAccessException e) {
×
332
        String errorMessage =
×
333
            "Failed to process signal \"" + signalName + "\" at eventID " + eventId + ".";
334
        log.error(errorMessage + "\n" + e);
×
335
        throw new Error(errorMessage + " Check cause for details.", e);
×
336
      } catch (DataConverterException e) {
1✔
337
        logSerializationException(signalName, eventId, e);
1✔
338
      } catch (InvocationTargetException e) {
1✔
339
        Throwable targetException = e.getTargetException();
1✔
340
        if (targetException instanceof DataConverterException) {
1✔
341
          logSerializationException(signalName, eventId, (DataConverterException) targetException);
×
342
        } else if (targetException instanceof Error) {
1✔
343
          throw (Error) targetException;
1✔
344
        } else {
345
          String errorMessage =
1✔
346
              "Failed to process signal \"" + signalName + "\" at eventID " + eventId + ".";
347
          log.error(errorMessage + "\n" + targetException);
1✔
348
          throw new Error(errorMessage + " Check cause for details.", targetException);
1✔
349
        }
350
      }
1✔
351
    }
1✔
352
  }
353

354
  void logSerializationException(
355
      String signalName, Long eventId, DataConverterException exception) {
356
    log.error(
1✔
357
        "Failure deserializing signal input for \""
358
            + signalName
359
            + "\" at eventID "
360
            + eventId
361
            + ". Dropping it.",
362
        exception);
363
    Workflow.getMetricsScope().counter(MetricsType.CORRUPTED_SIGNALS_COUNTER).inc(1);
1✔
364
  }
1✔
365

366
  static WorkflowExecutionException mapToWorkflowExecutionException(
367
      Exception failure, DataConverter dataConverter) {
368
    failure = CheckedExceptionWrapper.unwrap(failure);
1✔
369
    // Only expected during unit tests.
370
    if (failure instanceof SimulatedTimeoutException) {
1✔
371
      SimulatedTimeoutException timeoutException = (SimulatedTimeoutException) failure;
1✔
372
      failure =
1✔
373
          new SimulatedTimeoutExceptionInternal(
374
              timeoutException.getTimeoutType(),
1✔
375
              dataConverter.toData(timeoutException.getDetails()));
1✔
376
    }
377

378
    return new WorkflowExecutionException(
1✔
379
        failure.getClass().getName(), dataConverter.toData(failure));
1✔
380
  }
381

382
  static WorkflowExecutionException mapError(Error failure, DataConverter dataConverter) {
383
    return new WorkflowExecutionException(
1✔
384
        failure.getClass().getName(), dataConverter.toData(failure));
1✔
385
  }
386

387
  @Override
388
  public String toString() {
389
    return "POJOWorkflowImplementationFactory{"
×
390
        + "registeredWorkflowTypes="
391
        + workflowDefinitions.keySet()
×
392
        + '}';
393
  }
394
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc