• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #157

pending completion
#157

push

github-actions

web-flow
Provide SerializationContext for PayloadConverter and PayloadCodec (#1695)

Issue #1694

497 of 497 new or added lines in 32 files covered. (100.0%)

16942 of 20806 relevant lines covered (81.43%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.59
/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskExecutors.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.activity;
22

23
import static io.temporal.internal.activity.ActivityTaskHandlerImpl.mapToActivityFailure;
24

25
import com.uber.m3.tally.Scope;
26
import io.temporal.activity.ActivityExecutionContext;
27
import io.temporal.activity.ActivityInfo;
28
import io.temporal.activity.DynamicActivity;
29
import io.temporal.api.common.v1.Payload;
30
import io.temporal.api.common.v1.Payloads;
31
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedRequest;
32
import io.temporal.client.ActivityCanceledException;
33
import io.temporal.common.context.ContextPropagator;
34
import io.temporal.common.converter.DataConverter;
35
import io.temporal.common.converter.EncodedValues;
36
import io.temporal.common.interceptors.ActivityInboundCallsInterceptor;
37
import io.temporal.common.interceptors.ActivityInboundCallsInterceptor.ActivityOutput;
38
import io.temporal.common.interceptors.Header;
39
import io.temporal.common.interceptors.WorkerInterceptor;
40
import io.temporal.internal.worker.ActivityTaskHandler;
41
import io.temporal.payload.context.ActivitySerializationContext;
42
import io.temporal.serviceclient.CheckedExceptionWrapper;
43
import java.lang.reflect.Method;
44
import java.util.HashMap;
45
import java.util.List;
46
import java.util.Map;
47
import java.util.Optional;
48
import javax.annotation.Nonnull;
49
import javax.annotation.Nullable;
50
import org.slf4j.Logger;
51
import org.slf4j.LoggerFactory;
52

53
final class ActivityTaskExecutors {
×
54
  static final Logger log = LoggerFactory.getLogger(ActivityTaskExecutor.class);
1✔
55

56
  interface ActivityTaskExecutor {
57
    ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metricsScope);
58
  }
59

60
  abstract static class BaseActivityTaskExecutor implements ActivityTaskExecutor {
61
    private final DataConverter dataConverter;
62
    private final List<ContextPropagator> contextPropagators;
63
    private final WorkerInterceptor[] interceptors;
64
    private final ActivityExecutionContextFactory executionContextFactory;
65

66
    public BaseActivityTaskExecutor(
67
        DataConverter dataConverter,
68
        List<ContextPropagator> contextPropagators,
69
        WorkerInterceptor[] interceptors,
70
        ActivityExecutionContextFactory executionContextFactory) {
1✔
71
      this.dataConverter = dataConverter;
1✔
72
      this.contextPropagators = contextPropagators;
1✔
73
      this.interceptors = interceptors;
1✔
74
      this.executionContextFactory = executionContextFactory;
1✔
75
    }
1✔
76

77
    @Override
78
    public ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metricsScope) {
79
      ActivityExecutionContext context = executionContextFactory.createContext(info, metricsScope);
1✔
80
      ActivityInfo activityInfo = context.getInfo();
1✔
81
      ActivitySerializationContext serializationContext =
1✔
82
          new ActivitySerializationContext(
83
              activityInfo.getNamespace(),
1✔
84
              activityInfo.getWorkflowId(),
1✔
85
              activityInfo.getWorkflowType(),
1✔
86
              activityInfo.getActivityType(),
1✔
87
              activityInfo.getActivityTaskQueue(),
1✔
88
              activityInfo.isLocal());
1✔
89
      DataConverter dataConverterWithActivityContext =
1✔
90
          dataConverter.withContext(serializationContext);
1✔
91

92
      try {
93
        info.getHeader()
1✔
94
            .ifPresent(value -> deserializeAndPopulateContext(value, contextPropagators));
1✔
95

96
        ActivityInboundCallsInterceptor inboundCallsInterceptor = createRootInboundInterceptor();
1✔
97
        for (WorkerInterceptor interceptor : interceptors) {
1✔
98
          inboundCallsInterceptor = interceptor.interceptActivity(inboundCallsInterceptor);
1✔
99
        }
100
        inboundCallsInterceptor.init(context);
1✔
101

102
        Object[] args = provideArgs(info.getInput(), dataConverterWithActivityContext);
1✔
103
        Header header =
1✔
104
            new Header(
105
                info.getHeader().orElse(io.temporal.api.common.v1.Header.getDefaultInstance()));
1✔
106
        ActivityOutput result =
1✔
107
            inboundCallsInterceptor.execute(
1✔
108
                new ActivityInboundCallsInterceptor.ActivityInput(header, args));
109
        if (context.isDoNotCompleteOnReturn()) {
1✔
110
          return new ActivityTaskHandler.Result(
1✔
111
              info.getActivityId(), null, null, null, context.isUseLocalManualCompletion());
1✔
112
        }
113

114
        return this.constructSuccessfulResultValue(info, result, dataConverterWithActivityContext);
1✔
115
      } catch (Throwable e) {
1✔
116
        Throwable ex = CheckedExceptionWrapper.unwrap(e);
1✔
117
        boolean local = info.isLocal();
1✔
118
        if (ex instanceof ActivityCanceledException) {
1✔
119
          log.info(
×
120
              "{} canceled. ActivityId={}, activityType={}, attempt={}",
121
              local ? "Local activity" : "Activity",
×
122
              info.getActivityId(),
×
123
              info.getActivityType(),
×
124
              info.getAttempt());
×
125
        } else {
126
          log.warn(
1✔
127
              "{} failure. ActivityId={}, activityType={}, attempt={}",
128
              local ? "Local activity" : "Activity",
1✔
129
              info.getActivityId(),
1✔
130
              info.getActivityType(),
1✔
131
              info.getAttempt(),
1✔
132
              ex);
133
        }
134

135
        return mapToActivityFailure(
1✔
136
            ex, info.getActivityId(), metricsScope, local, dataConverterWithActivityContext);
1✔
137
      }
138
    }
139

140
    abstract ActivityInboundCallsInterceptor createRootInboundInterceptor();
141

142
    abstract Object[] provideArgs(
143
        Optional<Payloads> input, DataConverter dataConverterWithActivityContext);
144

145
    protected abstract ActivityTaskHandler.Result constructSuccessfulResultValue(
146
        ActivityInfoInternal info,
147
        ActivityOutput result,
148
        DataConverter dataConverterWithActivityContext);
149

150
    ActivityTaskHandler.Result constructResultValue(
151
        ActivityInfoInternal info,
152
        @Nullable ActivityOutput result,
153
        DataConverter dataConverterWithActivityContext) {
154
      RespondActivityTaskCompletedRequest.Builder request =
155
          RespondActivityTaskCompletedRequest.newBuilder();
1✔
156
      if (result != null) {
1✔
157
        Optional<Payloads> serialized =
1✔
158
            dataConverterWithActivityContext.toPayloads(result.getResult());
1✔
159
        serialized.ifPresent(request::setResult);
1✔
160
      }
161
      return new ActivityTaskHandler.Result(
1✔
162
          info.getActivityId(), request.build(), null, null, false);
1✔
163
    }
164

165
    static void deserializeAndPopulateContext(
166
        @Nonnull io.temporal.api.common.v1.Header header,
167
        @Nullable List<ContextPropagator> contextPropagatorList) {
168
      if (contextPropagatorList == null || contextPropagatorList.isEmpty()) {
1✔
169
        return;
1✔
170
      }
171

172
      Map<String, Payload> headerData = new HashMap<>(header.getFieldsMap());
1✔
173
      for (ContextPropagator propagator : contextPropagatorList) {
1✔
174
        propagator.setCurrentContext(propagator.deserializeContext(headerData));
1✔
175
      }
1✔
176
    }
1✔
177
  }
178

179
  static class POJOActivityImplementation extends BaseActivityTaskExecutor {
180
    private final Method method;
181
    private final Object activity;
182

183
    POJOActivityImplementation(
184
        Method interfaceMethod,
185
        Object activity,
186
        DataConverter dataConverter,
187
        List<ContextPropagator> contextPropagators,
188
        WorkerInterceptor[] interceptors,
189
        ActivityExecutionContextFactory executionContextFactory) {
190
      super(dataConverter, contextPropagators, interceptors, executionContextFactory);
1✔
191
      this.method = interfaceMethod;
1✔
192
      this.activity = activity;
1✔
193
    }
1✔
194

195
    @Override
196
    ActivityInboundCallsInterceptor createRootInboundInterceptor() {
197
      return new RootActivityInboundCallsInterceptor.POJOActivityInboundCallsInterceptor(
1✔
198
          activity, method);
199
    }
200

201
    @Override
202
    Object[] provideArgs(Optional<Payloads> input, DataConverter dataConverterWithActivityContext) {
203
      return DataConverter.arrayFromPayloads(
1✔
204
          dataConverterWithActivityContext,
205
          input,
206
          method.getParameterTypes(),
1✔
207
          method.getGenericParameterTypes());
1✔
208
    }
209

210
    @Override
211
    protected ActivityTaskHandler.Result constructSuccessfulResultValue(
212
        ActivityInfoInternal info,
213
        ActivityOutput result,
214
        DataConverter dataConverterWithActivityContext) {
215
      return constructResultValue(
1✔
216
          info,
217
          // if the expected result of the method is null, we don't publish result at all
218
          method.getReturnType() != Void.TYPE ? result : null,
1✔
219
          dataConverterWithActivityContext);
220
    }
221
  }
222

223
  static class DynamicActivityImplementation extends BaseActivityTaskExecutor {
224
    private final DynamicActivity activity;
225

226
    DynamicActivityImplementation(
227
        DynamicActivity activity,
228
        DataConverter dataConverter,
229
        List<ContextPropagator> contextPropagators,
230
        WorkerInterceptor[] interceptors,
231
        ActivityExecutionContextFactory executionContextFactory) {
232
      super(dataConverter, contextPropagators, interceptors, executionContextFactory);
1✔
233
      this.activity = activity;
1✔
234
    }
1✔
235

236
    @Override
237
    ActivityInboundCallsInterceptor createRootInboundInterceptor() {
238
      return new RootActivityInboundCallsInterceptor.DynamicActivityInboundCallsInterceptor(
1✔
239
          activity);
240
    }
241

242
    @Override
243
    Object[] provideArgs(Optional<Payloads> input, DataConverter dataConverterWithActivityContext) {
244
      EncodedValues encodedValues = new EncodedValues(input, dataConverterWithActivityContext);
1✔
245
      return new Object[] {encodedValues};
1✔
246
    }
247

248
    @Override
249
    protected ActivityTaskHandler.Result constructSuccessfulResultValue(
250
        ActivityInfoInternal info,
251
        ActivityOutput result,
252
        DataConverter dataConverterWithActivityContext) {
253
      return constructResultValue(info, result, dataConverterWithActivityContext);
1✔
254
    }
255
  }
256
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc