• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #181

pending completion
#181

push

github-actions

web-flow
Properly wrap exceptions from schedule client (#1827)

Wrap schedule exception

37 of 37 new or added lines in 1 file covered. (100.0%)

18557 of 23894 relevant lines covered (77.66%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.22
/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.testing;
22

23
import com.google.common.base.Defaults;
24
import com.google.protobuf.ByteString;
25
import com.uber.m3.tally.NoopScope;
26
import com.uber.m3.tally.Scope;
27
import io.grpc.StatusRuntimeException;
28
import io.grpc.stub.StreamObserver;
29
import io.temporal.activity.ActivityOptions;
30
import io.temporal.activity.LocalActivityOptions;
31
import io.temporal.api.common.v1.ActivityType;
32
import io.temporal.api.common.v1.Payloads;
33
import io.temporal.api.common.v1.WorkflowExecution;
34
import io.temporal.api.enums.v1.RetryState;
35
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
36
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatRequest;
37
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse;
38
import io.temporal.api.workflowservice.v1.RespondActivityTaskCanceledRequest;
39
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedRequest;
40
import io.temporal.api.workflowservice.v1.RespondActivityTaskFailedRequest;
41
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
42
import io.temporal.common.SearchAttributeUpdate;
43
import io.temporal.common.converter.DataConverter;
44
import io.temporal.common.converter.EncodedValues;
45
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
46
import io.temporal.failure.ActivityFailure;
47
import io.temporal.failure.CanceledFailure;
48
import io.temporal.internal.activity.ActivityExecutionContextFactory;
49
import io.temporal.internal.activity.ActivityExecutionContextFactoryImpl;
50
import io.temporal.internal.activity.ActivityTaskHandlerImpl;
51
import io.temporal.internal.common.ProtobufTimeUtils;
52
import io.temporal.internal.sync.*;
53
import io.temporal.internal.testservice.InProcessGRPCServer;
54
import io.temporal.internal.worker.ActivityTask;
55
import io.temporal.internal.worker.ActivityTaskHandler;
56
import io.temporal.internal.worker.ActivityTaskHandler.Result;
57
import io.temporal.serviceclient.WorkflowServiceStubs;
58
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
59
import io.temporal.worker.WorkerOptions;
60
import io.temporal.workflow.Functions;
61
import io.temporal.workflow.Functions.Func;
62
import io.temporal.workflow.Promise;
63
import io.temporal.workflow.Workflow;
64
import java.lang.reflect.InvocationHandler;
65
import java.lang.reflect.Type;
66
import java.nio.charset.StandardCharsets;
67
import java.time.Duration;
68
import java.util.*;
69
import java.util.concurrent.*;
70
import java.util.concurrent.atomic.AtomicBoolean;
71
import java.util.concurrent.atomic.AtomicInteger;
72
import java.util.concurrent.atomic.AtomicReference;
73
import java.util.function.BiPredicate;
74
import java.util.function.Supplier;
75
import javax.annotation.Nullable;
76
import org.slf4j.Logger;
77
import org.slf4j.LoggerFactory;
78

79
public final class TestActivityEnvironmentInternal implements TestActivityEnvironment {
80
  private static final Logger log = LoggerFactory.getLogger(TestActivityEnvironmentInternal.class);
1✔
81

82
  private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(20);
1✔
83
  private final ExecutorService activityWorkerExecutor =
1✔
84
      Executors.newSingleThreadExecutor(r -> new Thread(r, "test-service-activity-worker"));
1✔
85
  private final ExecutorService deterministicRunnerExecutor =
1✔
86
      new ThreadPoolExecutor(
87
          1,
88
          1000,
89
          1,
90
          TimeUnit.SECONDS,
91
          new SynchronousQueue<>(),
92
          r -> new Thread(r, "test-service-deterministic-runner"));
1✔
93
  private final AtomicBoolean cancellationRequested = new AtomicBoolean();
1✔
94
  private final AtomicInteger idSequencer = new AtomicInteger();
1✔
95
  private final InProcessGRPCServer mockServer;
96
  private final ActivityTaskHandlerImpl activityTaskHandler;
97
  private final TestEnvironmentOptions testEnvironmentOptions;
98
  private final WorkflowServiceStubs workflowServiceStubs;
99
  private final AtomicReference<Object> heartbeatDetails = new AtomicReference<>();
1✔
100
  private ClassConsumerPair<Object> activityHeartbeatListener;
101

102
  public TestActivityEnvironmentInternal(@Nullable TestEnvironmentOptions options) {
1✔
103
    // Initialize an in-memory mock service.
104
    this.mockServer =
1✔
105
        new InProcessGRPCServer(Collections.singletonList(new HeartbeatInterceptingService()));
1✔
106
    this.testEnvironmentOptions =
1✔
107
        options != null
1✔
108
            ? TestEnvironmentOptions.newBuilder(options).validateAndBuildWithDefaults()
1✔
109
            : TestEnvironmentOptions.newBuilder().validateAndBuildWithDefaults();
1✔
110
    WorkflowServiceStubsOptions.Builder serviceStubsOptionsBuilder =
1✔
111
        WorkflowServiceStubsOptions.newBuilder(
1✔
112
                testEnvironmentOptions.getWorkflowServiceStubsOptions())
1✔
113
            .setTarget(null)
1✔
114
            .setChannel(this.mockServer.getChannel())
1✔
115
            .setRpcQueryTimeout(Duration.ofSeconds(60));
1✔
116
    Scope metricsScope = this.testEnvironmentOptions.getMetricsScope();
1✔
117
    if (metricsScope != null && !(NoopScope.class.equals(metricsScope.getClass()))) {
1✔
118
      serviceStubsOptionsBuilder.setMetricsScope(metricsScope);
1✔
119
    }
120
    this.workflowServiceStubs =
1✔
121
        WorkflowServiceStubs.newServiceStubs(serviceStubsOptionsBuilder.build());
1✔
122

123
    ActivityExecutionContextFactory activityExecutionContextFactory =
1✔
124
        new ActivityExecutionContextFactoryImpl(
125
            workflowServiceStubs,
126
            testEnvironmentOptions.getWorkflowClientOptions().getIdentity(),
1✔
127
            testEnvironmentOptions.getWorkflowClientOptions().getNamespace(),
1✔
128
            WorkerOptions.getDefaultInstance().getMaxHeartbeatThrottleInterval(),
1✔
129
            WorkerOptions.getDefaultInstance().getDefaultHeartbeatThrottleInterval(),
1✔
130
            testEnvironmentOptions.getWorkflowClientOptions().getDataConverter(),
1✔
131
            heartbeatExecutor);
132
    activityTaskHandler =
1✔
133
        new ActivityTaskHandlerImpl(
134
            testEnvironmentOptions.getWorkflowClientOptions().getNamespace(),
1✔
135
            "test-activity-env-task-queue",
136
            testEnvironmentOptions.getWorkflowClientOptions().getDataConverter(),
1✔
137
            activityExecutionContextFactory,
138
            testEnvironmentOptions.getWorkerFactoryOptions().getWorkerInterceptors(),
1✔
139
            testEnvironmentOptions.getWorkflowClientOptions().getContextPropagators());
1✔
140
  }
1✔
141

142
  private class HeartbeatInterceptingService extends WorkflowServiceGrpc.WorkflowServiceImplBase {
1✔
143
    @Override
144
    public void recordActivityTaskHeartbeat(
145
        RecordActivityTaskHeartbeatRequest request,
146
        StreamObserver<RecordActivityTaskHeartbeatResponse> responseObserver) {
147
      try {
148
        if (activityHeartbeatListener != null) {
1✔
149
          Optional<Payloads> requestDetails =
150
              request.hasDetails() ? Optional.of(request.getDetails()) : Optional.empty();
1✔
151

152
          Object details =
1✔
153
              testEnvironmentOptions
1✔
154
                  .getWorkflowClientOptions()
1✔
155
                  .getDataConverter()
1✔
156
                  .fromPayloads(
1✔
157
                      0,
158
                      requestDetails,
159
                      activityHeartbeatListener.valueClass,
1✔
160
                      activityHeartbeatListener.valueType);
1✔
161
          activityHeartbeatListener.consumer.apply(details);
1✔
162
        }
163
        responseObserver.onNext(
1✔
164
            RecordActivityTaskHeartbeatResponse.newBuilder()
1✔
165
                .setCancelRequested(cancellationRequested.get())
1✔
166
                .build());
1✔
167
        responseObserver.onCompleted();
1✔
168
      } catch (StatusRuntimeException e) {
1✔
169
        responseObserver.onError(e);
1✔
170
      }
1✔
171
    }
1✔
172
  }
173

174
  @Override
175
  public void registerActivitiesImplementations(Object... activityImplementations) {
176
    activityTaskHandler.registerActivityImplementations(activityImplementations);
1✔
177
  }
1✔
178

179
  /**
180
   * Creates client stub to activities that implement given interface.
181
   *
182
   * @param activityInterface interface type implemented by activities
183
   */
184
  @Override
185
  public <T> T newActivityStub(Class<T> activityInterface) {
186
    ActivityOptions options =
187
        ActivityOptions.newBuilder()
1✔
188
            .setScheduleToCloseTimeout(Duration.ofDays(1))
1✔
189
            .setHeartbeatTimeout(Duration.ofSeconds(1))
1✔
190
            .build();
1✔
191
    InvocationHandler invocationHandler =
1✔
192
        ActivityInvocationHandler.newInstance(
1✔
193
            activityInterface, options, null, new TestActivityExecutor(), () -> {});
1✔
194
    invocationHandler =
1✔
195
        new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit);
1✔
196
    return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
1✔
197
  }
198

199
  /**
200
   * Creates client stub to activities that implement given interface.
201
   *
202
   * @param activityInterface interface type implemented by activities
203
   * @param options options that specify the activity invocation parameters
204
   */
205
  @Override
206
  public <T> T newActivityStub(Class<T> activityInterface, ActivityOptions options) {
207
    InvocationHandler invocationHandler =
1✔
208
        ActivityInvocationHandler.newInstance(
1✔
209
            activityInterface, options, null, new TestActivityExecutor(), () -> {});
1✔
210
    invocationHandler =
1✔
211
        new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit);
1✔
212
    return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
1✔
213
  }
214

215
  /**
216
   * Creates client stub to activities that implement given interface.
217
   *
218
   * @param activityInterface interface type implemented by activities
219
   * @param options options that specify the activity invocation parameters
220
   * @param activityMethodOptions activity method-specific invocation parameters
221
   */
222
  @Override
223
  public <T> T newLocalActivityStub(
224
      Class<T> activityInterface,
225
      LocalActivityOptions options,
226
      Map<String, LocalActivityOptions> activityMethodOptions) {
227
    InvocationHandler invocationHandler =
1✔
228
        LocalActivityInvocationHandler.newInstance(
1✔
229
            activityInterface,
230
            options,
231
            activityMethodOptions,
232
            new TestActivityExecutor(),
233
            () -> {});
1✔
234
    invocationHandler =
1✔
235
        new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit);
1✔
236
    return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
1✔
237
  }
238

239
  @Override
240
  public void requestCancelActivity() {
241
    cancellationRequested.set(true);
1✔
242
  }
1✔
243

244
  @Override
245
  public <T> void setActivityHeartbeatListener(Class<T> detailsClass, Functions.Proc1<T> listener) {
246
    setActivityHeartbeatListener(detailsClass, detailsClass, listener);
1✔
247
  }
1✔
248

249
  @Override
250
  @SuppressWarnings("unchecked")
251
  public <T> void setActivityHeartbeatListener(
252
      Class<T> detailsClass, Type detailsType, Functions.Proc1<T> listener) {
253
    activityHeartbeatListener = new ClassConsumerPair(detailsClass, detailsType, listener);
1✔
254
  }
1✔
255

256
  @Override
257
  public <T> void setHeartbeatDetails(T details) {
258
    heartbeatDetails.set(details);
1✔
259
  }
1✔
260

261
  @Override
262
  public void close() {
263
    heartbeatExecutor.shutdownNow();
1✔
264
    activityWorkerExecutor.shutdownNow();
1✔
265
    deterministicRunnerExecutor.shutdownNow();
1✔
266
    workflowServiceStubs.shutdown();
1✔
267
    mockServer.shutdown();
1✔
268
    mockServer.awaitTermination(5, TimeUnit.SECONDS);
1✔
269
  }
1✔
270

271
  private class TestActivityExecutor implements WorkflowOutboundCallsInterceptor {
1✔
272

273
    @Override
274
    public <T> ActivityOutput<T> executeActivity(ActivityInput<T> i) {
275
      Optional<Payloads> payloads =
1✔
276
          testEnvironmentOptions
1✔
277
              .getWorkflowClientOptions()
1✔
278
              .getDataConverter()
1✔
279
              .toPayloads(i.getArgs());
1✔
280
      Optional<Payloads> heartbeatPayload =
1✔
281
          Optional.ofNullable(heartbeatDetails.getAndSet(null))
1✔
282
              .flatMap(
1✔
283
                  obj ->
284
                      testEnvironmentOptions
1✔
285
                          .getWorkflowClientOptions()
1✔
286
                          .getDataConverter()
1✔
287
                          .toPayloads(obj));
1✔
288

289
      ActivityOptions options = i.getOptions();
1✔
290
      PollActivityTaskQueueResponse.Builder taskBuilder =
291
          PollActivityTaskQueueResponse.newBuilder()
1✔
292
              .setScheduleToCloseTimeout(
1✔
293
                  ProtobufTimeUtils.toProtoDuration(options.getScheduleToCloseTimeout()))
1✔
294
              .setHeartbeatTimeout(ProtobufTimeUtils.toProtoDuration(options.getHeartbeatTimeout()))
1✔
295
              .setStartToCloseTimeout(
1✔
296
                  ProtobufTimeUtils.toProtoDuration(options.getStartToCloseTimeout()))
1✔
297
              .setScheduledTime(ProtobufTimeUtils.getCurrentProtoTime())
1✔
298
              .setStartedTime(ProtobufTimeUtils.getCurrentProtoTime())
1✔
299
              .setTaskToken(ByteString.copyFrom("test-task-token".getBytes(StandardCharsets.UTF_8)))
1✔
300
              .setActivityId(String.valueOf(idSequencer.incrementAndGet()))
1✔
301
              .setWorkflowExecution(
1✔
302
                  WorkflowExecution.newBuilder()
1✔
303
                      .setWorkflowId("test-workflow-id")
1✔
304
                      .setRunId(UUID.randomUUID().toString())
1✔
305
                      .build())
1✔
306
              .setActivityType(ActivityType.newBuilder().setName(i.getActivityName()).build());
1✔
307
      payloads.ifPresent(taskBuilder::setInput);
1✔
308
      heartbeatPayload.ifPresent(taskBuilder::setHeartbeatDetails);
1✔
309
      PollActivityTaskQueueResponse task = taskBuilder.build();
1✔
310
      return new ActivityOutput<>(
1✔
311
          task.getActivityId(),
1✔
312
          Workflow.newPromise(
1✔
313
              getReply(task, executeActivity(task, false), i.getResultClass(), i.getResultType())));
1✔
314
    }
315

316
    @Override
317
    public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> i) {
318
      Optional<Payloads> payloads =
1✔
319
          testEnvironmentOptions
1✔
320
              .getWorkflowClientOptions()
1✔
321
              .getDataConverter()
1✔
322
              .toPayloads(i.getArgs());
1✔
323
      LocalActivityOptions options = i.getOptions();
1✔
324
      PollActivityTaskQueueResponse.Builder taskBuilder =
325
          PollActivityTaskQueueResponse.newBuilder()
1✔
326
              .setScheduleToCloseTimeout(
1✔
327
                  ProtobufTimeUtils.toProtoDuration(options.getScheduleToCloseTimeout()))
1✔
328
              .setStartToCloseTimeout(
1✔
329
                  ProtobufTimeUtils.toProtoDuration(options.getStartToCloseTimeout()))
1✔
330
              .setScheduledTime(ProtobufTimeUtils.getCurrentProtoTime())
1✔
331
              .setStartedTime(ProtobufTimeUtils.getCurrentProtoTime())
1✔
332
              .setTaskToken(ByteString.copyFrom("test-task-token".getBytes(StandardCharsets.UTF_8)))
1✔
333
              .setActivityId(String.valueOf(idSequencer.incrementAndGet()))
1✔
334
              .setWorkflowExecution(
1✔
335
                  WorkflowExecution.newBuilder()
1✔
336
                      .setWorkflowId("test-workflow-id")
1✔
337
                      .setRunId(UUID.randomUUID().toString())
1✔
338
                      .build())
1✔
339
              .setActivityType(ActivityType.newBuilder().setName(i.getActivityName()).build());
1✔
340
      payloads.ifPresent(taskBuilder::setInput);
1✔
341
      PollActivityTaskQueueResponse task = taskBuilder.build();
1✔
342
      return new LocalActivityOutput<>(
1✔
343
          Workflow.newPromise(
1✔
344
              getReply(task, executeActivity(task, true), i.getResultClass(), i.getResultType())));
1✔
345
    }
346

347
    /**
348
     * We execute the activity task on a separate activity worker to emulate what actually happens
349
     * in our production setup
350
     *
351
     * @param activityTask activity task to execute
352
     * @param localActivity true if it's a local activity
353
     * @return result of activity execution
354
     */
355
    private Result executeActivity(
356
        PollActivityTaskQueueResponse activityTask, boolean localActivity) {
357
      Future<Result> activityFuture =
1✔
358
          activityWorkerExecutor.submit(
1✔
359
              () ->
360
                  activityTaskHandler.handle(
1✔
361
                      new ActivityTask(activityTask, () -> {}),
×
362
                      testEnvironmentOptions.getMetricsScope(),
1✔
363
                      localActivity));
364

365
      try {
366
        // 10 seconds is just a "reasonable" wait to not make an infinite waiting
367
        return activityFuture.get(10, TimeUnit.SECONDS);
1✔
368
      } catch (InterruptedException e) {
×
369
        Thread.currentThread().interrupt();
×
370
        throw new RuntimeException(e);
×
371
      } catch (ExecutionException e) {
×
372
        log.error("Exception during processing of activity task");
×
373
        throw new RuntimeException(e);
×
374
      } catch (TimeoutException e) {
×
375
        log.error("Timeout trying execute activity task {}", activityTask);
×
376
        throw new RuntimeException(e);
×
377
      }
378
    }
379

380
    @Override
381
    public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> input) {
382
      throw new UnsupportedOperationException("not implemented");
×
383
    }
384

385
    @Override
386
    public Random newRandom() {
387
      throw new UnsupportedOperationException("not implemented");
×
388
    }
389

390
    @Override
391
    public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
392
      throw new UnsupportedOperationException("not implemented");
×
393
    }
394

395
    @Override
396
    public CancelWorkflowOutput cancelWorkflow(CancelWorkflowInput input) {
397
      throw new UnsupportedOperationException("not implemented");
×
398
    }
399

400
    @Override
401
    public void sleep(Duration duration) {
402
      throw new UnsupportedOperationException("not implemented");
×
403
    }
404

405
    @Override
406
    public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
407
      throw new UnsupportedOperationException("not implemented");
×
408
    }
409

410
    @Override
411
    public void await(String reason, Supplier<Boolean> unblockCondition) {
412
      throw new UnsupportedOperationException("not implemented");
×
413
    }
414

415
    @Override
416
    public Promise<Void> newTimer(Duration duration) {
417
      throw new UnsupportedOperationException("not implemented");
×
418
    }
419

420
    @Override
421
    public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
422
      throw new UnsupportedOperationException("not implemented");
×
423
    }
424

425
    @Override
426
    public <R> R mutableSideEffect(
427
        String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
428
      throw new UnsupportedOperationException("not implemented");
×
429
    }
430

431
    @Override
432
    public int getVersion(String changeId, int minSupported, int maxSupported) {
433
      throw new UnsupportedOperationException("not implemented");
×
434
    }
435

436
    @Override
437
    public void continueAsNew(ContinueAsNewInput input) {
438
      throw new UnsupportedOperationException("not implemented");
×
439
    }
440

441
    @Override
442
    public void registerQuery(RegisterQueryInput input) {
443
      throw new UnsupportedOperationException("not implemented");
×
444
    }
445

446
    @Override
447
    public void registerSignalHandlers(RegisterSignalHandlersInput input) {
448
      throw new UnsupportedOperationException("not implemented");
×
449
    }
450

451
    @Override
452
    public void registerDynamicSignalHandler(RegisterDynamicSignalHandlerInput input) {
453
      throw new UnsupportedOperationException("not implemented");
×
454
    }
455

456
    @Override
457
    public void registerDynamicQueryHandler(RegisterDynamicQueryHandlerInput input) {
458
      throw new UnsupportedOperationException("not implemented");
×
459
    }
460

461
    @Override
462
    public UUID randomUUID() {
463
      throw new UnsupportedOperationException("not implemented");
×
464
    }
465

466
    @Override
467
    public void upsertSearchAttributes(Map<String, ?> searchAttributes) {
468
      throw new UnsupportedOperationException("not implemented");
×
469
    }
470

471
    @Override
472
    public void upsertTypedSearchAttributes(SearchAttributeUpdate<?>... searchAttributeUpdates) {
473
      throw new UnsupportedOperationException("not implemented");
×
474
    }
475

476
    @Override
477
    public Object newChildThread(Runnable runnable, boolean detached, String name) {
478
      throw new UnsupportedOperationException("not implemented");
×
479
    }
480

481
    @Override
482
    public long currentTimeMillis() {
483
      throw new UnsupportedOperationException("not implemented");
×
484
    }
485

486
    private <T> T getReply(
487
        PollActivityTaskQueueResponse task,
488
        ActivityTaskHandler.Result response,
489
        Class<T> resultClass,
490
        Type resultType) {
491
      DataConverter dataConverter =
1✔
492
          testEnvironmentOptions.getWorkflowClientOptions().getDataConverter();
1✔
493
      RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted();
1✔
494
      if (taskCompleted != null) {
1✔
495
        Optional<Payloads> result =
496
            taskCompleted.hasResult() ? Optional.of(taskCompleted.getResult()) : Optional.empty();
1✔
497
        return dataConverter.fromPayloads(0, result, resultClass, resultType);
1✔
498
      } else {
499
        RespondActivityTaskFailedRequest taskFailed =
1✔
500
            response.getTaskFailed().getTaskFailedRequest();
1✔
501
        if (taskFailed != null) {
1✔
502
          Exception cause = dataConverter.failureToException(taskFailed.getFailure());
1✔
503
          throw new ActivityFailure(
1✔
504
              taskFailed.getFailure().getMessage(),
1✔
505
              0,
506
              0,
507
              task.getActivityType().getName(),
1✔
508
              task.getActivityId(),
1✔
509
              RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE,
510
              "TestActivityEnvironment",
511
              cause);
512
        } else {
513
          RespondActivityTaskCanceledRequest taskCanceled = response.getTaskCanceled();
×
514
          if (taskCanceled != null) {
×
515
            throw new CanceledFailure(
×
516
                "canceled",
517
                new EncodedValues(
518
                    taskCanceled.hasDetails()
×
519
                        ? Optional.of(taskCanceled.getDetails())
×
520
                        : Optional.empty(),
×
521
                    dataConverter),
522
                null);
523
          }
524
        }
525
      }
526
      return Defaults.defaultValue(resultClass);
×
527
    }
528

529
    @Override
530
    public void registerUpdateHandlers(RegisterUpdateHandlersInput input) {
531
      throw new UnsupportedOperationException("not implemented");
×
532
    }
533

534
    @Override
535
    public void registerDynamicUpdateHandler(RegisterDynamicUpdateHandlerInput input) {
536
      throw new UnsupportedOperationException("not implemented");
×
537
    }
538
  }
539

540
  private static class ClassConsumerPair<T> {
541

542
    final Functions.Proc1<T> consumer;
543
    final Class<T> valueClass;
544
    final Type valueType;
545

546
    ClassConsumerPair(Class<T> valueClass, Type valueType, Functions.Proc1<T> consumer) {
1✔
547
      this.valueClass = Objects.requireNonNull(valueClass);
1✔
548
      this.valueType = Objects.requireNonNull(valueType);
1✔
549
      this.consumer = Objects.requireNonNull(consumer);
1✔
550
    }
1✔
551
  }
552
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc