• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #157

pending completion
#157

push

github-actions

web-flow
Provide SerializationContext for PayloadConverter and PayloadCodec (#1695)

Issue #1694

497 of 497 new or added lines in 32 files covered. (100.0%)

16942 of 20806 relevant lines covered (81.43%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.11
/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.testing;
22

23
import com.google.common.base.Defaults;
24
import com.google.protobuf.ByteString;
25
import com.uber.m3.tally.NoopScope;
26
import com.uber.m3.tally.Scope;
27
import io.grpc.StatusRuntimeException;
28
import io.grpc.stub.StreamObserver;
29
import io.temporal.activity.ActivityOptions;
30
import io.temporal.activity.LocalActivityOptions;
31
import io.temporal.api.common.v1.ActivityType;
32
import io.temporal.api.common.v1.Payloads;
33
import io.temporal.api.common.v1.WorkflowExecution;
34
import io.temporal.api.enums.v1.RetryState;
35
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
36
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatRequest;
37
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse;
38
import io.temporal.api.workflowservice.v1.RespondActivityTaskCanceledRequest;
39
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedRequest;
40
import io.temporal.api.workflowservice.v1.RespondActivityTaskFailedRequest;
41
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
42
import io.temporal.common.converter.DataConverter;
43
import io.temporal.common.converter.EncodedValues;
44
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
45
import io.temporal.failure.ActivityFailure;
46
import io.temporal.failure.CanceledFailure;
47
import io.temporal.internal.activity.ActivityExecutionContextFactory;
48
import io.temporal.internal.activity.ActivityExecutionContextFactoryImpl;
49
import io.temporal.internal.activity.ActivityTaskHandlerImpl;
50
import io.temporal.internal.common.ProtobufTimeUtils;
51
import io.temporal.internal.sync.*;
52
import io.temporal.internal.testservice.InProcessGRPCServer;
53
import io.temporal.internal.worker.ActivityTask;
54
import io.temporal.internal.worker.ActivityTaskHandler;
55
import io.temporal.internal.worker.ActivityTaskHandler.Result;
56
import io.temporal.serviceclient.WorkflowServiceStubs;
57
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
58
import io.temporal.worker.WorkerOptions;
59
import io.temporal.workflow.Functions;
60
import io.temporal.workflow.Functions.Func;
61
import io.temporal.workflow.Promise;
62
import io.temporal.workflow.Workflow;
63
import java.lang.reflect.InvocationHandler;
64
import java.lang.reflect.Type;
65
import java.nio.charset.StandardCharsets;
66
import java.time.Duration;
67
import java.util.*;
68
import java.util.concurrent.*;
69
import java.util.concurrent.atomic.AtomicBoolean;
70
import java.util.concurrent.atomic.AtomicInteger;
71
import java.util.concurrent.atomic.AtomicReference;
72
import java.util.function.BiPredicate;
73
import java.util.function.Supplier;
74
import javax.annotation.Nullable;
75
import org.slf4j.Logger;
76
import org.slf4j.LoggerFactory;
77

78
public final class TestActivityEnvironmentInternal implements TestActivityEnvironment {
79
  private static final Logger log = LoggerFactory.getLogger(TestActivityEnvironmentInternal.class);
1✔
80

81
  private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(20);
1✔
82
  private final ExecutorService activityWorkerExecutor =
1✔
83
      Executors.newSingleThreadExecutor(r -> new Thread(r, "test-service-activity-worker"));
1✔
84
  private final ExecutorService deterministicRunnerExecutor =
1✔
85
      new ThreadPoolExecutor(
86
          1,
87
          1000,
88
          1,
89
          TimeUnit.SECONDS,
90
          new SynchronousQueue<>(),
91
          r -> new Thread(r, "test-service-deterministic-runner"));
1✔
92
  private final AtomicBoolean cancellationRequested = new AtomicBoolean();
1✔
93
  private final AtomicInteger idSequencer = new AtomicInteger();
1✔
94
  private final InProcessGRPCServer mockServer;
95
  private final ActivityTaskHandlerImpl activityTaskHandler;
96
  private final TestEnvironmentOptions testEnvironmentOptions;
97
  private final WorkflowServiceStubs workflowServiceStubs;
98
  private final AtomicReference<Object> heartbeatDetails = new AtomicReference<>();
1✔
99
  private ClassConsumerPair<Object> activityHeartbeatListener;
100

101
  public TestActivityEnvironmentInternal(@Nullable TestEnvironmentOptions options) {
1✔
102
    // Initialize an in-memory mock service.
103
    this.mockServer =
1✔
104
        new InProcessGRPCServer(Collections.singletonList(new HeartbeatInterceptingService()));
1✔
105
    this.testEnvironmentOptions =
1✔
106
        options != null
1✔
107
            ? TestEnvironmentOptions.newBuilder(options).validateAndBuildWithDefaults()
1✔
108
            : TestEnvironmentOptions.newBuilder().validateAndBuildWithDefaults();
1✔
109
    WorkflowServiceStubsOptions.Builder serviceStubsOptionsBuilder =
1✔
110
        WorkflowServiceStubsOptions.newBuilder(
1✔
111
                testEnvironmentOptions.getWorkflowServiceStubsOptions())
1✔
112
            .setTarget(null)
1✔
113
            .setChannel(this.mockServer.getChannel())
1✔
114
            .setRpcQueryTimeout(Duration.ofSeconds(60));
1✔
115
    Scope metricsScope = this.testEnvironmentOptions.getMetricsScope();
1✔
116
    if (metricsScope != null && !(NoopScope.class.equals(metricsScope.getClass()))) {
1✔
117
      serviceStubsOptionsBuilder.setMetricsScope(metricsScope);
1✔
118
    }
119
    this.workflowServiceStubs =
1✔
120
        WorkflowServiceStubs.newServiceStubs(serviceStubsOptionsBuilder.build());
1✔
121

122
    ActivityExecutionContextFactory activityExecutionContextFactory =
1✔
123
        new ActivityExecutionContextFactoryImpl(
124
            workflowServiceStubs,
125
            testEnvironmentOptions.getWorkflowClientOptions().getIdentity(),
1✔
126
            testEnvironmentOptions.getWorkflowClientOptions().getNamespace(),
1✔
127
            WorkerOptions.getDefaultInstance().getMaxHeartbeatThrottleInterval(),
1✔
128
            WorkerOptions.getDefaultInstance().getDefaultHeartbeatThrottleInterval(),
1✔
129
            testEnvironmentOptions.getWorkflowClientOptions().getDataConverter(),
1✔
130
            heartbeatExecutor);
131
    activityTaskHandler =
1✔
132
        new ActivityTaskHandlerImpl(
133
            testEnvironmentOptions.getWorkflowClientOptions().getNamespace(),
1✔
134
            "test-activity-env-task-queue",
135
            testEnvironmentOptions.getWorkflowClientOptions().getDataConverter(),
1✔
136
            activityExecutionContextFactory,
137
            testEnvironmentOptions.getWorkerFactoryOptions().getWorkerInterceptors(),
1✔
138
            testEnvironmentOptions.getWorkflowClientOptions().getContextPropagators());
1✔
139
  }
1✔
140

141
  private class HeartbeatInterceptingService extends WorkflowServiceGrpc.WorkflowServiceImplBase {
1✔
142
    @Override
143
    public void recordActivityTaskHeartbeat(
144
        RecordActivityTaskHeartbeatRequest request,
145
        StreamObserver<RecordActivityTaskHeartbeatResponse> responseObserver) {
146
      try {
147
        if (activityHeartbeatListener != null) {
1✔
148
          Optional<Payloads> requestDetails =
149
              request.hasDetails() ? Optional.of(request.getDetails()) : Optional.empty();
1✔
150

151
          Object details =
1✔
152
              testEnvironmentOptions
1✔
153
                  .getWorkflowClientOptions()
1✔
154
                  .getDataConverter()
1✔
155
                  .fromPayloads(
1✔
156
                      0,
157
                      requestDetails,
158
                      activityHeartbeatListener.valueClass,
1✔
159
                      activityHeartbeatListener.valueType);
1✔
160
          activityHeartbeatListener.consumer.apply(details);
1✔
161
        }
162
        responseObserver.onNext(
1✔
163
            RecordActivityTaskHeartbeatResponse.newBuilder()
1✔
164
                .setCancelRequested(cancellationRequested.get())
1✔
165
                .build());
1✔
166
        responseObserver.onCompleted();
1✔
167
      } catch (StatusRuntimeException e) {
1✔
168
        responseObserver.onError(e);
1✔
169
      }
1✔
170
    }
1✔
171
  }
172

173
  @Override
174
  public void registerActivitiesImplementations(Object... activityImplementations) {
175
    activityTaskHandler.registerActivityImplementations(activityImplementations);
1✔
176
  }
1✔
177

178
  /**
179
   * Creates client stub to activities that implement given interface.
180
   *
181
   * @param activityInterface interface type implemented by activities
182
   */
183
  @Override
184
  public <T> T newActivityStub(Class<T> activityInterface) {
185
    ActivityOptions options =
186
        ActivityOptions.newBuilder()
1✔
187
            .setScheduleToCloseTimeout(Duration.ofDays(1))
1✔
188
            .setHeartbeatTimeout(Duration.ofSeconds(1))
1✔
189
            .build();
1✔
190
    InvocationHandler invocationHandler =
1✔
191
        ActivityInvocationHandler.newInstance(
1✔
192
            activityInterface, options, null, new TestActivityExecutor());
193
    invocationHandler =
1✔
194
        new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit);
1✔
195
    return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
1✔
196
  }
197

198
  /**
199
   * Creates client stub to activities that implement given interface.
200
   *
201
   * @param activityInterface interface type implemented by activities
202
   * @param options options that specify the activity invocation parameters
203
   */
204
  @Override
205
  public <T> T newActivityStub(Class<T> activityInterface, ActivityOptions options) {
206
    InvocationHandler invocationHandler =
1✔
207
        ActivityInvocationHandler.newInstance(
1✔
208
            activityInterface, options, null, new TestActivityExecutor());
209
    invocationHandler =
1✔
210
        new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit);
1✔
211
    return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
1✔
212
  }
213

214
  /**
215
   * Creates client stub to activities that implement given interface.
216
   *
217
   * @param activityInterface interface type implemented by activities
218
   * @param options options that specify the activity invocation parameters
219
   * @param activityMethodOptions activity method-specific invocation parameters
220
   */
221
  @Override
222
  public <T> T newLocalActivityStub(
223
      Class<T> activityInterface,
224
      LocalActivityOptions options,
225
      Map<String, LocalActivityOptions> activityMethodOptions) {
226
    InvocationHandler invocationHandler =
1✔
227
        LocalActivityInvocationHandler.newInstance(
1✔
228
            activityInterface, options, activityMethodOptions, new TestActivityExecutor());
229
    invocationHandler =
1✔
230
        new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit);
1✔
231
    return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
1✔
232
  }
233

234
  @Override
235
  public void requestCancelActivity() {
236
    cancellationRequested.set(true);
1✔
237
  }
1✔
238

239
  @Override
240
  public <T> void setActivityHeartbeatListener(Class<T> detailsClass, Functions.Proc1<T> listener) {
241
    setActivityHeartbeatListener(detailsClass, detailsClass, listener);
1✔
242
  }
1✔
243

244
  @Override
245
  @SuppressWarnings("unchecked")
246
  public <T> void setActivityHeartbeatListener(
247
      Class<T> detailsClass, Type detailsType, Functions.Proc1<T> listener) {
248
    activityHeartbeatListener = new ClassConsumerPair(detailsClass, detailsType, listener);
1✔
249
  }
1✔
250

251
  @Override
252
  public <T> void setHeartbeatDetails(T details) {
253
    heartbeatDetails.set(details);
1✔
254
  }
1✔
255

256
  @Override
257
  public void close() {
258
    heartbeatExecutor.shutdownNow();
1✔
259
    activityWorkerExecutor.shutdownNow();
1✔
260
    deterministicRunnerExecutor.shutdownNow();
1✔
261
    workflowServiceStubs.shutdown();
1✔
262
    mockServer.shutdown();
1✔
263
    mockServer.awaitTermination(5, TimeUnit.SECONDS);
1✔
264
  }
1✔
265

266
  private class TestActivityExecutor implements WorkflowOutboundCallsInterceptor {
1✔
267

268
    @Override
269
    public <T> ActivityOutput<T> executeActivity(ActivityInput<T> i) {
270
      Optional<Payloads> payloads =
1✔
271
          testEnvironmentOptions
1✔
272
              .getWorkflowClientOptions()
1✔
273
              .getDataConverter()
1✔
274
              .toPayloads(i.getArgs());
1✔
275
      Optional<Payloads> heartbeatPayload =
1✔
276
          Optional.ofNullable(heartbeatDetails.getAndSet(null))
1✔
277
              .flatMap(
1✔
278
                  obj ->
279
                      testEnvironmentOptions
1✔
280
                          .getWorkflowClientOptions()
1✔
281
                          .getDataConverter()
1✔
282
                          .toPayloads(obj));
1✔
283

284
      ActivityOptions options = i.getOptions();
1✔
285
      PollActivityTaskQueueResponse.Builder taskBuilder =
286
          PollActivityTaskQueueResponse.newBuilder()
1✔
287
              .setScheduleToCloseTimeout(
1✔
288
                  ProtobufTimeUtils.toProtoDuration(options.getScheduleToCloseTimeout()))
1✔
289
              .setHeartbeatTimeout(ProtobufTimeUtils.toProtoDuration(options.getHeartbeatTimeout()))
1✔
290
              .setStartToCloseTimeout(
1✔
291
                  ProtobufTimeUtils.toProtoDuration(options.getStartToCloseTimeout()))
1✔
292
              .setScheduledTime(ProtobufTimeUtils.getCurrentProtoTime())
1✔
293
              .setStartedTime(ProtobufTimeUtils.getCurrentProtoTime())
1✔
294
              .setTaskToken(ByteString.copyFrom("test-task-token".getBytes(StandardCharsets.UTF_8)))
1✔
295
              .setActivityId(String.valueOf(idSequencer.incrementAndGet()))
1✔
296
              .setWorkflowExecution(
1✔
297
                  WorkflowExecution.newBuilder()
1✔
298
                      .setWorkflowId("test-workflow-id")
1✔
299
                      .setRunId(UUID.randomUUID().toString())
1✔
300
                      .build())
1✔
301
              .setActivityType(ActivityType.newBuilder().setName(i.getActivityName()).build());
1✔
302
      payloads.ifPresent(taskBuilder::setInput);
1✔
303
      heartbeatPayload.ifPresent(taskBuilder::setHeartbeatDetails);
1✔
304
      PollActivityTaskQueueResponse task = taskBuilder.build();
1✔
305
      return new ActivityOutput<>(
1✔
306
          task.getActivityId(),
1✔
307
          Workflow.newPromise(
1✔
308
              getReply(task, executeActivity(task, false), i.getResultClass(), i.getResultType())));
1✔
309
    }
310

311
    @Override
312
    public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> i) {
313
      Optional<Payloads> payloads =
1✔
314
          testEnvironmentOptions
1✔
315
              .getWorkflowClientOptions()
1✔
316
              .getDataConverter()
1✔
317
              .toPayloads(i.getArgs());
1✔
318
      LocalActivityOptions options = i.getOptions();
1✔
319
      PollActivityTaskQueueResponse.Builder taskBuilder =
320
          PollActivityTaskQueueResponse.newBuilder()
1✔
321
              .setScheduleToCloseTimeout(
1✔
322
                  ProtobufTimeUtils.toProtoDuration(options.getScheduleToCloseTimeout()))
1✔
323
              .setStartToCloseTimeout(
1✔
324
                  ProtobufTimeUtils.toProtoDuration(options.getStartToCloseTimeout()))
1✔
325
              .setScheduledTime(ProtobufTimeUtils.getCurrentProtoTime())
1✔
326
              .setStartedTime(ProtobufTimeUtils.getCurrentProtoTime())
1✔
327
              .setTaskToken(ByteString.copyFrom("test-task-token".getBytes(StandardCharsets.UTF_8)))
1✔
328
              .setActivityId(String.valueOf(idSequencer.incrementAndGet()))
1✔
329
              .setWorkflowExecution(
1✔
330
                  WorkflowExecution.newBuilder()
1✔
331
                      .setWorkflowId("test-workflow-id")
1✔
332
                      .setRunId(UUID.randomUUID().toString())
1✔
333
                      .build())
1✔
334
              .setActivityType(ActivityType.newBuilder().setName(i.getActivityName()).build());
1✔
335
      payloads.ifPresent(taskBuilder::setInput);
1✔
336
      PollActivityTaskQueueResponse task = taskBuilder.build();
1✔
337
      return new LocalActivityOutput<>(
1✔
338
          Workflow.newPromise(
1✔
339
              getReply(task, executeActivity(task, true), i.getResultClass(), i.getResultType())));
1✔
340
    }
341

342
    /**
343
     * We execute the activity task on a separate activity worker to emulate what actually happens
344
     * in our production setup
345
     *
346
     * @param activityTask activity task to execute
347
     * @param localActivity true if it's a local activity
348
     * @return result of activity execution
349
     */
350
    private Result executeActivity(
351
        PollActivityTaskQueueResponse activityTask, boolean localActivity) {
352
      Future<Result> activityFuture =
1✔
353
          activityWorkerExecutor.submit(
1✔
354
              () ->
355
                  activityTaskHandler.handle(
1✔
356
                      new ActivityTask(activityTask, () -> {}),
×
357
                      testEnvironmentOptions.getMetricsScope(),
1✔
358
                      localActivity));
359

360
      try {
361
        // 10 seconds is just a "reasonable" wait to not make an infinite waiting
362
        return activityFuture.get(10, TimeUnit.SECONDS);
1✔
363
      } catch (InterruptedException e) {
×
364
        Thread.currentThread().interrupt();
×
365
        throw new RuntimeException(e);
×
366
      } catch (ExecutionException e) {
×
367
        log.error("Exception during processing of activity task");
×
368
        throw new RuntimeException(e);
×
369
      } catch (TimeoutException e) {
×
370
        log.error("Timeout trying execute activity task {}", activityTask);
×
371
        throw new RuntimeException(e);
×
372
      }
373
    }
374

375
    @Override
376
    public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> input) {
377
      throw new UnsupportedOperationException("not implemented");
×
378
    }
379

380
    @Override
381
    public Random newRandom() {
382
      throw new UnsupportedOperationException("not implemented");
×
383
    }
384

385
    @Override
386
    public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
387
      throw new UnsupportedOperationException("not implemented");
×
388
    }
389

390
    @Override
391
    public CancelWorkflowOutput cancelWorkflow(CancelWorkflowInput input) {
392
      throw new UnsupportedOperationException("not implemented");
×
393
    }
394

395
    @Override
396
    public void sleep(Duration duration) {
397
      throw new UnsupportedOperationException("not implemented");
×
398
    }
399

400
    @Override
401
    public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
402
      throw new UnsupportedOperationException("not implemented");
×
403
    }
404

405
    @Override
406
    public void await(String reason, Supplier<Boolean> unblockCondition) {
407
      throw new UnsupportedOperationException("not implemented");
×
408
    }
409

410
    @Override
411
    public Promise<Void> newTimer(Duration duration) {
412
      throw new UnsupportedOperationException("not implemented");
×
413
    }
414

415
    @Override
416
    public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
417
      throw new UnsupportedOperationException("not implemented");
×
418
    }
419

420
    @Override
421
    public <R> R mutableSideEffect(
422
        String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
423
      throw new UnsupportedOperationException("not implemented");
×
424
    }
425

426
    @Override
427
    public int getVersion(String changeId, int minSupported, int maxSupported) {
428
      throw new UnsupportedOperationException("not implemented");
×
429
    }
430

431
    @Override
432
    public void continueAsNew(ContinueAsNewInput input) {
433
      throw new UnsupportedOperationException("not implemented");
×
434
    }
435

436
    @Override
437
    public void registerQuery(RegisterQueryInput input) {
438
      throw new UnsupportedOperationException("not implemented");
×
439
    }
440

441
    @Override
442
    public void registerSignalHandlers(RegisterSignalHandlersInput input) {
443
      throw new UnsupportedOperationException("not implemented");
×
444
    }
445

446
    @Override
447
    public void registerDynamicSignalHandler(RegisterDynamicSignalHandlerInput input) {
448
      throw new UnsupportedOperationException("not implemented");
×
449
    }
450

451
    @Override
452
    public void registerDynamicQueryHandler(RegisterDynamicQueryHandlerInput input) {
453
      throw new UnsupportedOperationException("not implemented");
×
454
    }
455

456
    @Override
457
    public UUID randomUUID() {
458
      throw new UnsupportedOperationException("not implemented");
×
459
    }
460

461
    @Override
462
    public void upsertSearchAttributes(Map<String, ?> searchAttributes) {
463
      throw new UnsupportedOperationException("not implemented");
×
464
    }
465

466
    @Override
467
    public Object newChildThread(Runnable runnable, boolean detached, String name) {
468
      throw new UnsupportedOperationException("not implemented");
×
469
    }
470

471
    @Override
472
    public long currentTimeMillis() {
473
      throw new UnsupportedOperationException("not implemented");
×
474
    }
475

476
    private <T> T getReply(
477
        PollActivityTaskQueueResponse task,
478
        ActivityTaskHandler.Result response,
479
        Class<T> resultClass,
480
        Type resultType) {
481
      DataConverter dataConverter =
1✔
482
          testEnvironmentOptions.getWorkflowClientOptions().getDataConverter();
1✔
483
      RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted();
1✔
484
      if (taskCompleted != null) {
1✔
485
        Optional<Payloads> result =
486
            taskCompleted.hasResult() ? Optional.of(taskCompleted.getResult()) : Optional.empty();
1✔
487
        return dataConverter.fromPayloads(0, result, resultClass, resultType);
1✔
488
      } else {
489
        RespondActivityTaskFailedRequest taskFailed =
1✔
490
            response.getTaskFailed().getTaskFailedRequest();
1✔
491
        if (taskFailed != null) {
1✔
492
          Exception cause = dataConverter.failureToException(taskFailed.getFailure());
1✔
493
          throw new ActivityFailure(
1✔
494
              taskFailed.getFailure().getMessage(),
1✔
495
              0,
496
              0,
497
              task.getActivityType().getName(),
1✔
498
              task.getActivityId(),
1✔
499
              RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE,
500
              "TestActivityEnvironment",
501
              cause);
502
        } else {
503
          RespondActivityTaskCanceledRequest taskCanceled = response.getTaskCanceled();
×
504
          if (taskCanceled != null) {
×
505
            throw new CanceledFailure(
×
506
                "canceled",
507
                new EncodedValues(
508
                    taskCanceled.hasDetails()
×
509
                        ? Optional.of(taskCanceled.getDetails())
×
510
                        : Optional.empty(),
×
511
                    dataConverter),
512
                null);
513
          }
514
        }
515
      }
516
      return Defaults.defaultValue(resultClass);
×
517
    }
518
  }
519

520
  private static class ClassConsumerPair<T> {
521

522
    final Functions.Proc1<T> consumer;
523
    final Class<T> valueClass;
524
    final Type valueType;
525

526
    ClassConsumerPair(Class<T> valueClass, Type valueType, Functions.Proc1<T> consumer) {
1✔
527
      this.valueClass = Objects.requireNonNull(valueClass);
1✔
528
      this.valueType = Objects.requireNonNull(valueType);
1✔
529
      this.consumer = Objects.requireNonNull(consumer);
1✔
530
    }
1✔
531
  }
532
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc