• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #157

pending completion
#157

push

github-actions

web-flow
Provide SerializationContext for PayloadConverter and PayloadCodec (#1695)

Issue #1694

497 of 497 new or added lines in 32 files covered. (100.0%)

16942 of 20806 relevant lines covered (81.43%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.21
/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.common.converter;
22

23
import com.fasterxml.jackson.annotation.JsonProperty;
24
import com.google.common.base.Preconditions;
25
import io.temporal.api.common.v1.Payload;
26
import io.temporal.api.common.v1.Payloads;
27
import io.temporal.api.failure.v1.ApplicationFailureInfo;
28
import io.temporal.api.failure.v1.CanceledFailureInfo;
29
import io.temporal.api.failure.v1.Failure;
30
import io.temporal.api.failure.v1.ResetWorkflowFailureInfo;
31
import io.temporal.api.failure.v1.TimeoutFailureInfo;
32
import io.temporal.failure.TemporalFailure;
33
import io.temporal.payload.codec.ChainCodec;
34
import io.temporal.payload.codec.PayloadCodec;
35
import io.temporal.payload.context.SerializationContext;
36
import java.lang.reflect.Type;
37
import java.util.Collection;
38
import java.util.Collections;
39
import java.util.List;
40
import java.util.Optional;
41
import javax.annotation.Nonnull;
42
import javax.annotation.Nullable;
43

44
/**
45
 * A delegating {@link DataConverter} implementation that wraps and chains both another {@link
46
 * DataConverter} and several {@link PayloadCodec}s.
47
 *
48
 * <p>The underlying {@link DataConverter} is expected to be responsible for conversion between user
49
 * objects and bytes represented as {@link Payloads}, while the underlying chain of codecs is
50
 * responsible for a subsequent byte &lt;-&gt; byte manipulation such as encryption or compression
51
 */
52
public class CodecDataConverter implements DataConverter, PayloadCodec {
53
  private static final String ENCODED_FAILURE_MESSAGE = "Encoded failure";
54

55
  private final DataConverter dataConverter;
56
  private final ChainCodec chainCodec;
57
  private final boolean encodeFailureAttributes;
58
  private final @Nullable SerializationContext serializationContext;
59

60
  /**
61
   * When serializing to Payloads:
62
   *
63
   * <ul>
64
   *   <li>{@code dataConverter} is applied first, following by the chain of {@code codecs}.
65
   *   <li>{@code codecs} are applied last to first meaning the earlier encoders wrap the later ones
66
   * </ul>
67
   *
68
   * When deserializing from Payloads:
69
   *
70
   * <ul>
71
   *   <li>{@code codecs} are applied first to last to reverse the effect following by the {@code
72
   *       dataConverter}
73
   *   <li>{@code dataConverter} is applied last
74
   * </ul>
75
   *
76
   * See {@link #CodecDataConverter(DataConverter, Collection, boolean)} to enable encryption of
77
   * Failure attributes.
78
   *
79
   * @param dataConverter to delegate data conversion to
80
   * @param codecs to delegate bytes encoding/decoding to. When encoding, the codecs are applied
81
   *     last to first meaning the earlier encoders wrap the later ones. When decoding, the decoders
82
   *     are applied first to last to reverse the effect
83
   */
84
  public CodecDataConverter(DataConverter dataConverter, Collection<PayloadCodec> codecs) {
85
    this(dataConverter, codecs, false);
1✔
86
  }
1✔
87

88
  /**
89
   * When serializing to Payloads:
90
   *
91
   * <ul>
92
   *   <li>{@code dataConverter} is applied first, following by the chain of {@code codecs}.
93
   *   <li>{@code codecs} are applied last to first meaning the earlier encoders wrap the later ones
94
   * </ul>
95
   *
96
   * When deserializing from Payloads:
97
   *
98
   * <ul>
99
   *   <li>{@code codecs} are applied first to last to reverse the effect following by the {@code
100
   *       dataConverter}
101
   *   <li>{@code dataConverter} is applied last
102
   * </ul>
103
   *
104
   * Setting {@code encodeFailureAttributes} to true enables codec encoding of Failure attributes.
105
   * This can be used in conjunction with an encrypting codec to enable encryption of failures
106
   * message and stack traces. Note that failure's details are always codec-encoded, without regard
107
   * to {@code encodeFailureAttributes}.
108
   *
109
   * @param dataConverter to delegate data conversion to
110
   * @param codecs to delegate bytes encoding/decoding to. When encoding, the codecs are applied
111
   *     last to first meaning the earlier encoders wrap the later ones. When decoding, the decoders
112
   *     are applied first to last to reverse the effect
113
   * @param encodeFailureAttributes enable encoding of Failure attributes (message and stack trace)
114
   */
115
  public CodecDataConverter(
116
      DataConverter dataConverter,
117
      Collection<PayloadCodec> codecs,
118
      boolean encodeFailureAttributes) {
119
    this(dataConverter, new ChainCodec(codecs), encodeFailureAttributes, null);
1✔
120
  }
1✔
121

122
  CodecDataConverter(
123
      DataConverter dataConverter,
124
      ChainCodec codecs,
125
      boolean encodeFailureAttributes,
126
      @Nullable SerializationContext serializationContext) {
1✔
127
    this.dataConverter = dataConverter;
1✔
128
    this.chainCodec = codecs;
1✔
129
    this.encodeFailureAttributes = encodeFailureAttributes;
1✔
130
    this.serializationContext = serializationContext;
1✔
131
  }
1✔
132

133
  @Override
134
  public <T> Optional<Payload> toPayload(T value) {
135
    Optional<Payload> payload =
1✔
136
        ConverterUtils.withContext(dataConverter, serializationContext).toPayload(value);
1✔
137
    List<Payload> encodedPayloads =
1✔
138
        ConverterUtils.withContext(chainCodec, serializationContext)
1✔
139
            .encode(Collections.singletonList(payload.get()));
1✔
140
    Preconditions.checkState(encodedPayloads.size() == 1, "Expected one encoded payload");
1✔
141
    return Optional.of(encodedPayloads.get(0));
1✔
142
  }
143

144
  @Override
145
  public <T> T fromPayload(Payload payload, Class<T> valueClass, Type valueType) {
146
    List<Payload> decodedPayload =
1✔
147
        ConverterUtils.withContext(chainCodec, serializationContext)
1✔
148
            .decode(Collections.singletonList(payload));
1✔
149
    Preconditions.checkState(decodedPayload.size() == 1, "Expected one decoded payload");
1✔
150
    return ConverterUtils.withContext(dataConverter, serializationContext)
1✔
151
        .fromPayload(decodedPayload.get(0), valueClass, valueType);
1✔
152
  }
153

154
  @Override
155
  public Optional<Payloads> toPayloads(Object... values) throws DataConverterException {
156
    Optional<Payloads> payloads =
1✔
157
        ConverterUtils.withContext(dataConverter, serializationContext).toPayloads(values);
1✔
158
    if (payloads.isPresent()) {
1✔
159
      List<Payload> encodedPayloads =
1✔
160
          ConverterUtils.withContext(chainCodec, serializationContext)
1✔
161
              .encode(payloads.get().getPayloadsList());
1✔
162
      payloads = Optional.of(Payloads.newBuilder().addAllPayloads(encodedPayloads).build());
1✔
163
    }
164
    return payloads;
1✔
165
  }
166

167
  @Override
168
  public <T> T fromPayloads(
169
      int index, Optional<Payloads> content, Class<T> valueType, Type valueGenericType)
170
      throws DataConverterException {
171
    if (content.isPresent()) {
1✔
172
      content = Optional.of(decodePayloads(content.get()));
1✔
173
    }
174
    return ConverterUtils.withContext(dataConverter, serializationContext)
1✔
175
        .fromPayloads(index, content, valueType, valueGenericType);
1✔
176
  }
177

178
  @Override
179
  @Nonnull
180
  public Failure exceptionToFailure(@Nonnull Throwable throwable) {
181
    Preconditions.checkNotNull(throwable, "throwable");
1✔
182
    return this.encodeFailure(
1✔
183
            ConverterUtils.withContext(dataConverter, serializationContext)
1✔
184
                .exceptionToFailure(throwable)
1✔
185
                .toBuilder())
1✔
186
        .build();
1✔
187
  }
188

189
  @Override
190
  @Nonnull
191
  public TemporalFailure failureToException(@Nonnull Failure failure) {
192
    Preconditions.checkNotNull(failure, "failure");
1✔
193
    return ConverterUtils.withContext(dataConverter, serializationContext)
1✔
194
        .failureToException(this.decodeFailure(failure.toBuilder()).build());
1✔
195
  }
196

197
  @Nonnull
198
  @Override
199
  public CodecDataConverter withContext(@Nonnull SerializationContext context) {
200
    return new CodecDataConverter(dataConverter, chainCodec, encodeFailureAttributes, context);
1✔
201
  }
202

203
  @Nonnull
204
  @Override
205
  public List<Payload> encode(@Nonnull List<Payload> payloads) {
206
    return ConverterUtils.withContext(chainCodec, serializationContext).encode(payloads);
1✔
207
  }
208

209
  @Nonnull
210
  @Override
211
  public List<Payload> decode(@Nonnull List<Payload> payloads) {
212
    return ConverterUtils.withContext(chainCodec, serializationContext).decode(payloads);
1✔
213
  }
214

215
  private Failure.Builder encodeFailure(Failure.Builder failure) {
216
    if (failure.hasCause()) {
1✔
217
      failure.setCause(encodeFailure(failure.getCause().toBuilder()));
1✔
218
    }
219
    if (this.encodeFailureAttributes) {
1✔
220
      EncodedAttributes encodedAttributes = new EncodedAttributes();
1✔
221
      encodedAttributes.setStackTrace(failure.getStackTrace());
1✔
222
      encodedAttributes.setMessage(failure.getMessage());
1✔
223
      Payload encodedAttributesPayload = toPayload(Optional.of(encodedAttributes)).get();
1✔
224
      failure
1✔
225
          .setEncodedAttributes(encodedAttributesPayload)
1✔
226
          .setMessage(ENCODED_FAILURE_MESSAGE)
1✔
227
          .setStackTrace("");
1✔
228
    }
229
    switch (failure.getFailureInfoCase()) {
1✔
230
      case APPLICATION_FAILURE_INFO:
231
        {
232
          ApplicationFailureInfo.Builder info = failure.getApplicationFailureInfo().toBuilder();
1✔
233
          if (info.hasDetails()) {
1✔
234
            info.setDetails(encodePayloads(info.getDetails()));
1✔
235
          }
236
          failure.setApplicationFailureInfo(info);
1✔
237
        }
238
        break;
1✔
239
      case TIMEOUT_FAILURE_INFO:
240
        {
241
          TimeoutFailureInfo.Builder info = failure.getTimeoutFailureInfo().toBuilder();
×
242
          if (info.hasLastHeartbeatDetails()) {
×
243
            info.setLastHeartbeatDetails(encodePayloads(info.getLastHeartbeatDetails()));
×
244
          }
245
          failure.setTimeoutFailureInfo(info);
×
246
        }
247
        break;
×
248
      case CANCELED_FAILURE_INFO:
249
        {
250
          CanceledFailureInfo.Builder info = failure.getCanceledFailureInfo().toBuilder();
×
251
          if (info.hasDetails()) {
×
252
            info.setDetails(encodePayloads(info.getDetails()));
×
253
          }
254
          failure.setCanceledFailureInfo(info);
×
255
        }
256
        break;
×
257
      case RESET_WORKFLOW_FAILURE_INFO:
258
        {
259
          ResetWorkflowFailureInfo.Builder info = failure.getResetWorkflowFailureInfo().toBuilder();
×
260
          if (info.hasLastHeartbeatDetails()) {
×
261
            info.setLastHeartbeatDetails(encodePayloads(info.getLastHeartbeatDetails()));
×
262
          }
263
          failure.setResetWorkflowFailureInfo(info);
×
264
        }
265
        break;
×
266
      default:
267
        {
268
          // Other type of failure info don't have anything to encode
269
        }
270
    }
271
    return failure;
1✔
272
  }
273

274
  private Failure.Builder decodeFailure(Failure.Builder failure) {
275
    if (failure.hasCause()) {
1✔
276
      failure.setCause(decodeFailure(failure.getCause().toBuilder()));
1✔
277
    }
278
    if (failure.hasEncodedAttributes()) {
1✔
279
      EncodedAttributes encodedAttributes =
1✔
280
          fromPayload(
1✔
281
              failure.getEncodedAttributes(), EncodedAttributes.class, EncodedAttributes.class);
1✔
282
      failure
1✔
283
          .setStackTrace(encodedAttributes.getStackTrace())
1✔
284
          .setMessage(encodedAttributes.getMessage())
1✔
285
          .clearEncodedAttributes();
1✔
286
    }
287
    switch (failure.getFailureInfoCase()) {
1✔
288
      case APPLICATION_FAILURE_INFO:
289
        {
290
          ApplicationFailureInfo.Builder info = failure.getApplicationFailureInfo().toBuilder();
1✔
291
          if (info.hasDetails()) {
1✔
292
            info.setDetails(decodePayloads(info.getDetails()));
1✔
293
          }
294
          failure.setApplicationFailureInfo(info);
1✔
295
        }
296
        break;
1✔
297
      case TIMEOUT_FAILURE_INFO:
298
        {
299
          TimeoutFailureInfo.Builder info = failure.getTimeoutFailureInfo().toBuilder();
×
300
          if (info.hasLastHeartbeatDetails()) {
×
301
            info.setLastHeartbeatDetails(decodePayloads(info.getLastHeartbeatDetails()));
×
302
          }
303
          failure.setTimeoutFailureInfo(info);
×
304
        }
305
        break;
×
306
      case CANCELED_FAILURE_INFO:
307
        {
308
          CanceledFailureInfo.Builder info = failure.getCanceledFailureInfo().toBuilder();
×
309
          if (info.hasDetails()) {
×
310
            info.setDetails(decodePayloads(info.getDetails()));
×
311
          }
312
          failure.setCanceledFailureInfo(info);
×
313
        }
314
        break;
×
315
      case RESET_WORKFLOW_FAILURE_INFO:
316
        {
317
          ResetWorkflowFailureInfo.Builder info = failure.getResetWorkflowFailureInfo().toBuilder();
×
318
          if (info.hasLastHeartbeatDetails()) {
×
319
            info.setLastHeartbeatDetails(decodePayloads(info.getLastHeartbeatDetails()));
×
320
          }
321
          failure.setResetWorkflowFailureInfo(info);
×
322
        }
323
        break;
×
324
      default:
325
        {
326
          // Other type of failure info don't have anything to decode
327
        }
328
    }
329
    return failure;
1✔
330
  }
331

332
  private Payloads encodePayloads(Payloads decodedPayloads) {
333
    return Payloads.newBuilder().addAllPayloads(encode(decodedPayloads.getPayloadsList())).build();
1✔
334
  }
335

336
  private Payloads decodePayloads(Payloads encodedPayloads) {
337
    return Payloads.newBuilder().addAllPayloads(decode(encodedPayloads.getPayloadsList())).build();
1✔
338
  }
339

340
  static class EncodedAttributes {
1✔
341
    private String message;
342

343
    private String stackTrace;
344

345
    public String getMessage() {
346
      return message;
1✔
347
    }
348

349
    public void setMessage(String message) {
350
      this.message = message;
1✔
351
    }
1✔
352

353
    @JsonProperty("stack_trace")
354
    public String getStackTrace() {
355
      return stackTrace;
1✔
356
    }
357

358
    @JsonProperty("stack_trace")
359
    public void setStackTrace(String stackTrace) {
360
      this.stackTrace = stackTrace;
1✔
361
    }
1✔
362
  }
363
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc