• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2562

28 Oct 2024 08:51PM UTC coverage: 66.637% (+0.02%) from 66.622%
2562

Pull #929

buildkite

shijiesheng
fix unit test
Pull Request #929: upgrade mockito to 4.5.1

12913 of 19378 relevant lines covered (66.64%)

0.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.17
/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.sync;
19

20
import com.google.common.base.Joiner;
21
import com.google.common.reflect.TypeToken;
22
import com.google.common.util.concurrent.RateLimiter;
23
import com.uber.cadence.PollForActivityTaskResponse;
24
import com.uber.cadence.RespondActivityTaskCompletedRequest;
25
import com.uber.cadence.RespondActivityTaskFailedRequest;
26
import com.uber.cadence.activity.ActivityMethod;
27
import com.uber.cadence.activity.ActivityTask;
28
import com.uber.cadence.client.ActivityCancelledException;
29
import com.uber.cadence.common.MethodRetry;
30
import com.uber.cadence.converter.DataConverter;
31
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
32
import com.uber.cadence.internal.common.InternalUtils;
33
import com.uber.cadence.internal.metrics.MetricsType;
34
import com.uber.cadence.internal.worker.ActivityTaskHandler;
35
import com.uber.cadence.serviceclient.IWorkflowService;
36
import com.uber.cadence.testing.SimulatedTimeoutException;
37
import com.uber.m3.tally.Scope;
38
import java.lang.reflect.InvocationTargetException;
39
import java.lang.reflect.Method;
40
import java.util.Collections;
41
import java.util.HashMap;
42
import java.util.Map;
43
import java.util.concurrent.CancellationException;
44
import java.util.concurrent.ScheduledExecutorService;
45
import java.util.function.BiFunction;
46

47
class POJOActivityTaskHandler implements ActivityTaskHandler {
48
  private static final RateLimiter metricsRateLimiter = RateLimiter.create(1);
1✔
49

50
  private final DataConverter dataConverter;
51
  private final ScheduledExecutorService heartbeatExecutor;
52
  private final Map<String, ActivityTaskExecutor> activities =
1✔
53
      Collections.synchronizedMap(new HashMap<>());
1✔
54
  private IWorkflowService service;
55
  private final String domain;
56

57
  POJOActivityTaskHandler(
58
      IWorkflowService service,
59
      String domain,
60
      DataConverter dataConverter,
61
      ScheduledExecutorService heartbeatExecutor) {
1✔
62
    this.service = service;
1✔
63
    this.domain = domain;
1✔
64
    this.dataConverter = dataConverter;
1✔
65
    this.heartbeatExecutor = heartbeatExecutor;
1✔
66
  }
1✔
67

68
  private void addActivityImplementation(
69
      Object activity, BiFunction<Method, Object, ActivityTaskExecutor> newTaskExecutor) {
70
    if (activity instanceof Class) {
1✔
71
      throw new IllegalArgumentException("Activity object instance expected, not the class");
×
72
    }
73
    Class<?> cls = activity.getClass();
1✔
74
    for (Method method : cls.getMethods()) {
1✔
75
      if (method.getAnnotation(ActivityMethod.class) != null) {
1✔
76
        throw new IllegalArgumentException(
×
77
            "Found @ActivityMethod annotation on \""
78
                + method
79
                + "\" This annotation can be used only on the interface method it implements.");
80
      }
81
      if (method.getAnnotation(MethodRetry.class) != null) {
1✔
82
        throw new IllegalArgumentException(
×
83
            "Found @MethodRetry annotation on \""
84
                + method
85
                + "\" This annotation can be used only on the interface method it implements.");
86
      }
87
    }
88
    TypeToken<?>.TypeSet interfaces = TypeToken.of(cls).getTypes().interfaces();
1✔
89
    if (interfaces.isEmpty()) {
1✔
90
      throw new IllegalArgumentException("Activity must implement at least one interface");
×
91
    }
92
    for (TypeToken<?> i : interfaces) {
1✔
93
      if (i.getType().getTypeName().startsWith("org.mockito")) {
1✔
94
        continue;
×
95
      }
96
      for (Method method : i.getRawType().getMethods()) {
1✔
97
        ActivityMethod annotation = method.getAnnotation(ActivityMethod.class);
1✔
98
        String activityType;
99
        if (annotation != null && !annotation.name().isEmpty()) {
1✔
100
          activityType = annotation.name();
1✔
101
        } else {
102
          activityType = InternalUtils.getSimpleName(method);
1✔
103
        }
104
        if (activities.containsKey(activityType)) {
1✔
105
          throw new IllegalStateException(
×
106
              activityType + " activity type is already registered with the worker");
107
        }
108

109
        ActivityTaskExecutor implementation = newTaskExecutor.apply(method, activity);
1✔
110
        activities.put(activityType, implementation);
1✔
111
      }
112
    }
1✔
113
  }
1✔
114

115
  private ActivityTaskHandler.Result mapToActivityFailure(
116
      Throwable failure, Scope metricsScope, boolean isLocalActivity) {
117

118
    if (failure instanceof ActivityCancelledException) {
1✔
119
      if (isLocalActivity) {
1✔
120
        metricsScope.counter(MetricsType.LOCAL_ACTIVITY_CANCELED_COUNTER).inc(1);
×
121
      }
122
      throw new CancellationException(failure.getMessage());
1✔
123
    }
124

125
    // Only expected during unit tests.
126
    if (failure instanceof SimulatedTimeoutException) {
1✔
127
      SimulatedTimeoutException timeoutException = (SimulatedTimeoutException) failure;
1✔
128
      failure =
1✔
129
          new SimulatedTimeoutExceptionInternal(
130
              timeoutException.getTimeoutType(),
1✔
131
              dataConverter.toData(timeoutException.getDetails()));
1✔
132
    }
133

134
    if (failure instanceof Error) {
1✔
135
      if (isLocalActivity) {
×
136
        metricsScope.counter(MetricsType.LOCAL_ACTIVITY_ERROR_COUNTER).inc(1);
×
137
      } else {
138
        metricsScope.counter(MetricsType.ACTIVITY_TASK_ERROR_COUNTER).inc(1);
×
139
      }
140
      throw (Error) failure;
×
141
    }
142

143
    if (isLocalActivity) {
1✔
144
      metricsScope.counter(MetricsType.LOCAL_ACTIVITY_FAILED_COUNTER).inc(1);
1✔
145
    } else {
146
      metricsScope.counter(MetricsType.ACTIVITY_EXEC_FAILED_COUNTER).inc(1);
1✔
147
    }
148

149
    RespondActivityTaskFailedRequest result = new RespondActivityTaskFailedRequest();
1✔
150
    failure = CheckedExceptionWrapper.unwrap(failure);
1✔
151
    result.setReason(failure.getClass().getName());
1✔
152
    result.setDetails(dataConverter.toData(failure));
1✔
153
    return new ActivityTaskHandler.Result(null, new Result.TaskFailedResult(result, failure), null);
1✔
154
  }
155

156
  @Override
157
  public boolean isAnyTypeSupported() {
158
    return !activities.isEmpty();
1✔
159
  }
160

161
  void setActivitiesImplementation(Object[] activitiesImplementation) {
162
    activities.clear();
1✔
163
    for (Object activity : activitiesImplementation) {
1✔
164
      addActivityImplementation(activity, POJOActivityImplementation::new);
1✔
165
    }
166
  }
1✔
167

168
  void setLocalActivitiesImplementation(Object[] activitiesImplementation) {
169
    activities.clear();
1✔
170
    for (Object activity : activitiesImplementation) {
1✔
171
      addActivityImplementation(activity, POJOLocalActivityImplementation::new);
1✔
172
    }
173
  }
1✔
174

175
  @Override
176
  public Result handle(
177
      PollForActivityTaskResponse pollResponse, Scope metricsScope, boolean isLocalActivity) {
178
    String activityType = pollResponse.getActivityType().getName();
1✔
179
    ActivityTaskImpl activityTask = new ActivityTaskImpl(pollResponse);
1✔
180
    ActivityTaskExecutor activity = activities.get(activityType);
1✔
181
    if (activity == null) {
1✔
182
      String knownTypes = Joiner.on(", ").join(activities.keySet());
×
183
      return mapToActivityFailure(
×
184
          new IllegalArgumentException(
185
              "Activity Type \""
186
                  + activityType
187
                  + "\" is not registered with a worker. Known types are: "
188
                  + knownTypes),
189
          metricsScope,
190
          isLocalActivity);
191
    }
192
    if (metricsRateLimiter.tryAcquire(1)) {
1✔
193
      if (isLocalActivity) {
1✔
194
        metricsScope
1✔
195
            .gauge(MetricsType.LOCAL_ACTIVITY_ACTIVE_THREAD_COUNT)
1✔
196
            .update(Thread.activeCount());
1✔
197
      } else {
198
        metricsScope.gauge(MetricsType.ACTIVITY_ACTIVE_THREAD_COUNT).update(Thread.activeCount());
1✔
199
      }
200
    }
201
    return activity.execute(activityTask, metricsScope);
1✔
202
  }
203

204
  interface ActivityTaskExecutor {
205
    ActivityTaskHandler.Result execute(ActivityTask task, Scope metricsScope);
206
  }
207

208
  private class POJOActivityImplementation implements ActivityTaskExecutor {
209
    private final Method method;
210
    private final Object activity;
211

212
    POJOActivityImplementation(Method interfaceMethod, Object activity) {
1✔
213
      this.method = interfaceMethod;
1✔
214
      this.activity = activity;
1✔
215
    }
1✔
216

217
    @Override
218
    public ActivityTaskHandler.Result execute(ActivityTask task, Scope metricsScope) {
219
      ActivityExecutionContext context =
1✔
220
          new ActivityExecutionContextImpl(service, domain, task, dataConverter, heartbeatExecutor);
1✔
221
      byte[] input = task.getInput();
1✔
222
      CurrentActivityExecutionContext.set(context);
1✔
223
      try {
224
        Object[] args = dataConverter.fromDataArray(input, method.getGenericParameterTypes());
1✔
225
        Object result = method.invoke(activity, args);
1✔
226
        RespondActivityTaskCompletedRequest request = new RespondActivityTaskCompletedRequest();
1✔
227
        if (context.isDoNotCompleteOnReturn()) {
1✔
228
          return new ActivityTaskHandler.Result(null, null, null);
1✔
229
        }
230
        if (method.getReturnType() != Void.TYPE) {
1✔
231
          request.setResult(dataConverter.toData(result));
1✔
232
        }
233
        return new ActivityTaskHandler.Result(request, null, null);
1✔
234
      } catch (RuntimeException | IllegalAccessException e) {
1✔
235
        return mapToActivityFailure(e, metricsScope, false);
1✔
236
      } catch (InvocationTargetException e) {
1✔
237
        return mapToActivityFailure(e.getTargetException(), metricsScope, false);
1✔
238
      } finally {
239
        CurrentActivityExecutionContext.unset();
1✔
240
      }
241
    }
242
  }
243

244
  private class POJOLocalActivityImplementation implements ActivityTaskExecutor {
245
    private final Method method;
246
    private final Object activity;
247

248
    POJOLocalActivityImplementation(Method interfaceMethod, Object activity) {
1✔
249
      this.method = interfaceMethod;
1✔
250
      this.activity = activity;
1✔
251
    }
1✔
252

253
    @Override
254
    public ActivityTaskHandler.Result execute(ActivityTask task, Scope metricsScope) {
255
      ActivityExecutionContext context =
1✔
256
          new LocalActivityExecutionContextImpl(service, domain, task);
1✔
257
      CurrentActivityExecutionContext.set(context);
1✔
258
      byte[] input = task.getInput();
1✔
259
      try {
260
        Object[] args = dataConverter.fromDataArray(input, method.getGenericParameterTypes());
1✔
261
        Object result = method.invoke(activity, args);
1✔
262
        RespondActivityTaskCompletedRequest request = new RespondActivityTaskCompletedRequest();
1✔
263
        if (method.getReturnType() != Void.TYPE) {
1✔
264
          request.setResult(dataConverter.toData(result));
1✔
265
        }
266
        return new ActivityTaskHandler.Result(request, null, null);
1✔
267
      } catch (RuntimeException | IllegalAccessException e) {
1✔
268
        return mapToActivityFailure(e, metricsScope, true);
1✔
269
      } catch (InvocationTargetException e) {
1✔
270
        return mapToActivityFailure(e.getTargetException(), metricsScope, true);
1✔
271
      } finally {
272
        CurrentActivityExecutionContext.unset();
1✔
273
      }
274
    }
275
  }
276

277
  // This is only for unit test to mock service and set expectations.
278
  void setWorkflowService(IWorkflowService service) {
279
    this.service = service;
1✔
280
  }
1✔
281
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc