• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #157

pending completion
#157

push

github-actions

web-flow
Provide SerializationContext for PayloadConverter and PayloadCodec (#1695)

Issue #1694

497 of 497 new or added lines in 32 files covered. (100.0%)

16942 of 20806 relevant lines covered (81.43%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.46
/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskHandlerImpl.java
/*
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
 *
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this material except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
20

21
package io.temporal.internal.activity;
22

23
import com.google.common.base.Joiner;
24
import com.uber.m3.tally.Scope;
25
import com.uber.m3.util.ImmutableMap;
26
import io.temporal.activity.DynamicActivity;
27
import io.temporal.api.failure.v1.Failure;
28
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponseOrBuilder;
29
import io.temporal.api.workflowservice.v1.RespondActivityTaskCanceledRequest;
30
import io.temporal.api.workflowservice.v1.RespondActivityTaskFailedRequest;
31
import io.temporal.client.ActivityCanceledException;
32
import io.temporal.common.context.ContextPropagator;
33
import io.temporal.common.converter.DataConverter;
34
import io.temporal.common.interceptors.WorkerInterceptor;
35
import io.temporal.common.metadata.POJOActivityImplMetadata;
36
import io.temporal.common.metadata.POJOActivityMethodMetadata;
37
import io.temporal.internal.activity.ActivityTaskExecutors.ActivityTaskExecutor;
38
import io.temporal.internal.worker.ActivityTask;
39
import io.temporal.internal.worker.ActivityTaskHandler;
40
import io.temporal.serviceclient.MetricsTag;
41
import io.temporal.worker.MetricsType;
42
import io.temporal.worker.TypeAlreadyRegisteredException;
43
import java.lang.reflect.Method;
44
import java.util.*;
45
import javax.annotation.Nonnull;
46
import javax.annotation.Nullable;
47

48
public final class ActivityTaskHandlerImpl implements ActivityTaskHandler {
49
  private final DataConverter dataConverter;
50
  private final String namespace;
51
  private final String taskQueue;
52
  private final ActivityExecutionContextFactory executionContextFactory;
53
  // <ActivityType, Implementation>
54
  private final Map<String, ActivityTaskExecutor> activities =
1✔
55
      Collections.synchronizedMap(new HashMap<>());
1✔
56
  private ActivityTaskExecutor dynamicActivity;
57
  private final WorkerInterceptor[] interceptors;
58
  private final List<ContextPropagator> contextPropagators;
59

60
  public ActivityTaskHandlerImpl(
61
      @Nonnull String namespace,
62
      @Nonnull String taskQueue,
63
      @Nonnull DataConverter dataConverter,
64
      @Nonnull ActivityExecutionContextFactory executionContextFactory,
65
      @Nonnull WorkerInterceptor[] interceptors,
66
      @Nullable List<ContextPropagator> contextPropagators) {
1✔
67
    this.namespace = Objects.requireNonNull(namespace);
1✔
68
    this.taskQueue = Objects.requireNonNull(taskQueue);
1✔
69
    this.dataConverter = Objects.requireNonNull(dataConverter);
1✔
70
    this.executionContextFactory = Objects.requireNonNull(executionContextFactory);
1✔
71
    this.interceptors = Objects.requireNonNull(interceptors);
1✔
72
    this.contextPropagators = contextPropagators;
1✔
73
  }
1✔
74

75
  @Override
76
  public boolean isAnyTypeSupported() {
77
    return !activities.isEmpty() || dynamicActivity != null;
1✔
78
  }
79

80
  @Override
81
  public boolean isTypeSupported(String type) {
82
    return activities.get(type) != null || dynamicActivity != null;
1✔
83
  }
84

85
  public void registerActivityImplementations(Object[] activitiesImplementation) {
86
    for (Object activity : activitiesImplementation) {
1✔
87
      registerActivityImplementation(activity);
1✔
88
    }
89
  }
1✔
90

91
  @Override
92
  public Result handle(ActivityTask activityTask, Scope metricsScope, boolean localActivity) {
93
    PollActivityTaskQueueResponseOrBuilder pollResponse = activityTask.getResponse();
1✔
94
    String activityType = pollResponse.getActivityType().getName();
1✔
95
    ActivityInfoInternal activityInfo =
1✔
96
        new ActivityInfoImpl(
97
            pollResponse,
98
            this.namespace,
99
            this.taskQueue,
100
            localActivity,
101
            activityTask.getCompletionCallback());
1✔
102
    ActivityTaskExecutor activity = activities.get(activityType);
1✔
103
    if (activity != null) {
1✔
104
      return activity.execute(activityInfo, metricsScope);
1✔
105
    }
106
    if (dynamicActivity != null) {
1✔
107
      return dynamicActivity.execute(activityInfo, metricsScope);
1✔
108
    }
109

110
    // unregistered activity
111
    try {
112
      String knownTypes = Joiner.on(", ").join(activities.keySet());
×
113
      throw new IllegalArgumentException(
×
114
          "Activity Type \""
115
              + activityType
116
              + "\" is not registered with a worker. Known types are: "
117
              + knownTypes);
118
    } catch (Exception exception) {
×
119
      return mapToActivityFailure(
×
120
          exception, pollResponse.getActivityId(), metricsScope, localActivity, dataConverter);
×
121
    }
122
  }
123

124
  private void registerActivityImplementation(Object activity) {
125
    if (activity instanceof Class) {
1✔
126
      throw new IllegalArgumentException("Activity object instance expected, not the class");
×
127
    }
128
    if (activity instanceof DynamicActivity) {
1✔
129
      if (dynamicActivity != null) {
1✔
130
        throw new TypeAlreadyRegisteredException(
×
131
            "DynamicActivity",
132
            "An implementation of DynamicActivity is already registered with the worker");
133
      }
134
      dynamicActivity =
1✔
135
          new ActivityTaskExecutors.DynamicActivityImplementation(
136
              (DynamicActivity) activity,
137
              dataConverter,
138
              contextPropagators,
139
              interceptors,
140
              executionContextFactory);
141
    } else {
142
      Class<?> cls = activity.getClass();
1✔
143
      POJOActivityImplMetadata activityImplMetadata = POJOActivityImplMetadata.newInstance(cls);
1✔
144
      for (POJOActivityMethodMetadata activityMetadata :
145
          activityImplMetadata.getActivityMethods()) {
1✔
146
        String typeName = activityMetadata.getActivityTypeName();
1✔
147
        if (activities.containsKey(typeName)) {
1✔
148
          throw new TypeAlreadyRegisteredException(
1✔
149
              typeName, "\"" + typeName + "\" activity type is already registered with the worker");
150
        }
151
        Method method = activityMetadata.getMethod();
1✔
152
        ActivityTaskExecutor implementation =
1✔
153
            new ActivityTaskExecutors.POJOActivityImplementation(
154
                method,
155
                activity,
156
                dataConverter,
157
                contextPropagators,
158
                interceptors,
159
                executionContextFactory);
160
        activities.put(typeName, implementation);
1✔
161
      }
1✔
162
    }
163
  }
1✔
164

165
  @SuppressWarnings("deprecation")
166
  static ActivityTaskHandler.Result mapToActivityFailure(
167
      Throwable exception,
168
      String activityId,
169
      Scope metricsScope,
170
      boolean isLocalActivity,
171
      DataConverter dataConverter) {
172
    if (exception instanceof ActivityCanceledException) {
1✔
173
      if (isLocalActivity) {
×
174
        metricsScope.counter(MetricsType.LOCAL_ACTIVITY_EXEC_CANCELLED_COUNTER).inc(1);
×
175
        metricsScope.counter(MetricsType.LOCAL_ACTIVITY_CANCELED_COUNTER).inc(1);
×
176
      } else {
177
        metricsScope.counter(MetricsType.ACTIVITY_EXEC_CANCELLED_COUNTER).inc(1);
×
178
        metricsScope.counter(MetricsType.ACTIVITY_CANCELED_COUNTER).inc(1);
×
179
      }
180
      return new ActivityTaskHandler.Result(
×
181
          activityId, null, null, RespondActivityTaskCanceledRequest.newBuilder().build(), false);
×
182
    }
183
    Scope ms =
1✔
184
        metricsScope.tagged(
1✔
185
            ImmutableMap.of(MetricsTag.EXCEPTION, exception.getClass().getSimpleName()));
1✔
186
    if (isLocalActivity) {
1✔
187
      ms.counter(MetricsType.LOCAL_ACTIVITY_EXEC_FAILED_COUNTER).inc(1);
1✔
188
      ms.counter(MetricsType.LOCAL_ACTIVITY_FAILED_COUNTER).inc(1);
1✔
189
    } else {
190
      ms.counter(MetricsType.ACTIVITY_EXEC_FAILED_COUNTER).inc(1);
1✔
191
    }
192
    Failure failure = dataConverter.exceptionToFailure(exception);
1✔
193
    RespondActivityTaskFailedRequest.Builder result =
194
        RespondActivityTaskFailedRequest.newBuilder().setFailure(failure);
1✔
195
    return new ActivityTaskHandler.Result(
1✔
196
        activityId,
197
        null,
198
        new ActivityTaskHandler.Result.TaskFailedResult(result.build(), exception),
1✔
199
        null,
200
        false);
201
  }
202
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc