• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #333

16 Oct 2024 07:28PM UTC coverage: 78.65% (+0.6%) from 78.085%
#333

push

github

web-flow
Fix code coverage (#2275)

22670 of 28824 relevant lines covered (78.65%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.86
/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.testing;
22

23
import com.google.common.base.Defaults;
24
import com.google.protobuf.ByteString;
25
import com.uber.m3.tally.NoopScope;
26
import com.uber.m3.tally.Scope;
27
import io.grpc.StatusRuntimeException;
28
import io.grpc.stub.StreamObserver;
29
import io.temporal.activity.ActivityOptions;
30
import io.temporal.activity.LocalActivityOptions;
31
import io.temporal.api.common.v1.ActivityType;
32
import io.temporal.api.common.v1.Payloads;
33
import io.temporal.api.common.v1.WorkflowExecution;
34
import io.temporal.api.enums.v1.RetryState;
35
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
36
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatRequest;
37
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse;
38
import io.temporal.api.workflowservice.v1.RespondActivityTaskCanceledRequest;
39
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedRequest;
40
import io.temporal.api.workflowservice.v1.RespondActivityTaskFailedRequest;
41
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
42
import io.temporal.common.SearchAttributeUpdate;
43
import io.temporal.common.converter.DataConverter;
44
import io.temporal.common.converter.EncodedValues;
45
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
46
import io.temporal.failure.ActivityFailure;
47
import io.temporal.failure.CanceledFailure;
48
import io.temporal.internal.activity.ActivityExecutionContextFactory;
49
import io.temporal.internal.activity.ActivityExecutionContextFactoryImpl;
50
import io.temporal.internal.activity.ActivityTaskHandlerImpl;
51
import io.temporal.internal.common.ProtobufTimeUtils;
52
import io.temporal.internal.sync.*;
53
import io.temporal.internal.testservice.InProcessGRPCServer;
54
import io.temporal.internal.worker.ActivityTask;
55
import io.temporal.internal.worker.ActivityTaskHandler;
56
import io.temporal.internal.worker.ActivityTaskHandler.Result;
57
import io.temporal.serviceclient.WorkflowServiceStubs;
58
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
59
import io.temporal.worker.WorkerOptions;
60
import io.temporal.workflow.Functions;
61
import io.temporal.workflow.Functions.Func;
62
import io.temporal.workflow.Promise;
63
import io.temporal.workflow.TimerOptions;
64
import io.temporal.workflow.Workflow;
65
import java.lang.reflect.InvocationHandler;
66
import java.lang.reflect.Type;
67
import java.nio.charset.StandardCharsets;
68
import java.time.Duration;
69
import java.util.*;
70
import java.util.concurrent.*;
71
import java.util.concurrent.atomic.AtomicBoolean;
72
import java.util.concurrent.atomic.AtomicInteger;
73
import java.util.concurrent.atomic.AtomicReference;
74
import java.util.function.BiPredicate;
75
import java.util.function.Supplier;
76
import javax.annotation.Nullable;
77
import org.slf4j.Logger;
78
import org.slf4j.LoggerFactory;
79

80
public final class TestActivityEnvironmentInternal implements TestActivityEnvironment {
81
  private static final Logger log = LoggerFactory.getLogger(TestActivityEnvironmentInternal.class);
1✔
82

83
  private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(20);
1✔
84
  private final ExecutorService activityWorkerExecutor =
1✔
85
      Executors.newSingleThreadExecutor(r -> new Thread(r, "test-service-activity-worker"));
1✔
86
  private final ExecutorService deterministicRunnerExecutor =
1✔
87
      new ThreadPoolExecutor(
88
          1,
89
          1000,
90
          1,
91
          TimeUnit.SECONDS,
92
          new SynchronousQueue<>(),
93
          r -> new Thread(r, "test-service-deterministic-runner"));
1✔
94
  private final AtomicBoolean cancellationRequested = new AtomicBoolean();
1✔
95
  private final AtomicInteger idSequencer = new AtomicInteger();
1✔
96
  private final InProcessGRPCServer mockServer;
97
  private final ActivityTaskHandlerImpl activityTaskHandler;
98
  private final TestEnvironmentOptions testEnvironmentOptions;
99
  private final WorkflowServiceStubs workflowServiceStubs;
100
  private final AtomicReference<Object> heartbeatDetails = new AtomicReference<>();
1✔
101
  private ClassConsumerPair<Object> activityHeartbeatListener;
102

103
  public TestActivityEnvironmentInternal(@Nullable TestEnvironmentOptions options) {
1✔
104
    // Initialize an in-memory mock service.
105
    this.mockServer =
1✔
106
        new InProcessGRPCServer(Collections.singletonList(new HeartbeatInterceptingService()));
1✔
107
    this.testEnvironmentOptions =
1✔
108
        options != null
1✔
109
            ? TestEnvironmentOptions.newBuilder(options).validateAndBuildWithDefaults()
1✔
110
            : TestEnvironmentOptions.newBuilder().validateAndBuildWithDefaults();
1✔
111
    WorkflowServiceStubsOptions.Builder serviceStubsOptionsBuilder =
1✔
112
        WorkflowServiceStubsOptions.newBuilder(
1✔
113
                testEnvironmentOptions.getWorkflowServiceStubsOptions())
1✔
114
            .setTarget(null)
1✔
115
            .setChannel(this.mockServer.getChannel())
1✔
116
            .setRpcQueryTimeout(Duration.ofSeconds(60));
1✔
117
    Scope metricsScope = this.testEnvironmentOptions.getMetricsScope();
1✔
118
    if (metricsScope != null && !(NoopScope.class.equals(metricsScope.getClass()))) {
1✔
119
      serviceStubsOptionsBuilder.setMetricsScope(metricsScope);
1✔
120
    }
121
    this.workflowServiceStubs =
1✔
122
        WorkflowServiceStubs.newServiceStubs(serviceStubsOptionsBuilder.build());
1✔
123

124
    ActivityExecutionContextFactory activityExecutionContextFactory =
1✔
125
        new ActivityExecutionContextFactoryImpl(
126
            workflowServiceStubs,
127
            testEnvironmentOptions.getWorkflowClientOptions().getIdentity(),
1✔
128
            testEnvironmentOptions.getWorkflowClientOptions().getNamespace(),
1✔
129
            WorkerOptions.getDefaultInstance().getMaxHeartbeatThrottleInterval(),
1✔
130
            WorkerOptions.getDefaultInstance().getDefaultHeartbeatThrottleInterval(),
1✔
131
            testEnvironmentOptions.getWorkflowClientOptions().getDataConverter(),
1✔
132
            heartbeatExecutor);
133
    activityTaskHandler =
1✔
134
        new ActivityTaskHandlerImpl(
135
            testEnvironmentOptions.getWorkflowClientOptions().getNamespace(),
1✔
136
            "test-activity-env-task-queue",
137
            testEnvironmentOptions.getWorkflowClientOptions().getDataConverter(),
1✔
138
            activityExecutionContextFactory,
139
            testEnvironmentOptions.getWorkerFactoryOptions().getWorkerInterceptors(),
1✔
140
            testEnvironmentOptions.getWorkflowClientOptions().getContextPropagators());
1✔
141
  }
1✔
142

143
  private class HeartbeatInterceptingService extends WorkflowServiceGrpc.WorkflowServiceImplBase {
1✔
144
    @Override
145
    public void recordActivityTaskHeartbeat(
146
        RecordActivityTaskHeartbeatRequest request,
147
        StreamObserver<RecordActivityTaskHeartbeatResponse> responseObserver) {
148
      try {
149
        if (activityHeartbeatListener != null) {
1✔
150
          Optional<Payloads> requestDetails =
151
              request.hasDetails() ? Optional.of(request.getDetails()) : Optional.empty();
1✔
152

153
          Object details =
1✔
154
              testEnvironmentOptions
1✔
155
                  .getWorkflowClientOptions()
1✔
156
                  .getDataConverter()
1✔
157
                  .fromPayloads(
1✔
158
                      0,
159
                      requestDetails,
160
                      activityHeartbeatListener.valueClass,
1✔
161
                      activityHeartbeatListener.valueType);
1✔
162
          activityHeartbeatListener.consumer.apply(details);
1✔
163
        }
164
        responseObserver.onNext(
1✔
165
            RecordActivityTaskHeartbeatResponse.newBuilder()
1✔
166
                .setCancelRequested(cancellationRequested.get())
1✔
167
                .build());
1✔
168
        responseObserver.onCompleted();
1✔
169
      } catch (StatusRuntimeException e) {
1✔
170
        responseObserver.onError(e);
1✔
171
      }
1✔
172
    }
1✔
173
  }
174

175
  @Override
176
  public void registerActivitiesImplementations(Object... activityImplementations) {
177
    activityTaskHandler.registerActivityImplementations(activityImplementations);
1✔
178
  }
1✔
179

180
  /**
181
   * Creates client stub to activities that implement given interface.
182
   *
183
   * @param activityInterface interface type implemented by activities
184
   */
185
  @Override
186
  public <T> T newActivityStub(Class<T> activityInterface) {
187
    ActivityOptions options =
188
        ActivityOptions.newBuilder()
1✔
189
            .setScheduleToCloseTimeout(Duration.ofDays(1))
1✔
190
            .setHeartbeatTimeout(Duration.ofSeconds(1))
1✔
191
            .build();
1✔
192
    InvocationHandler invocationHandler =
1✔
193
        ActivityInvocationHandler.newInstance(
1✔
194
            activityInterface, options, null, new TestActivityExecutor(), () -> {});
1✔
195
    invocationHandler =
1✔
196
        new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit);
1✔
197
    return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
1✔
198
  }
199

200
  /**
201
   * Creates client stub to activities that implement given interface.
202
   *
203
   * @param activityInterface interface type implemented by activities
204
   * @param options options that specify the activity invocation parameters
205
   */
206
  @Override
207
  public <T> T newActivityStub(Class<T> activityInterface, ActivityOptions options) {
208
    InvocationHandler invocationHandler =
1✔
209
        ActivityInvocationHandler.newInstance(
1✔
210
            activityInterface, options, null, new TestActivityExecutor(), () -> {});
1✔
211
    invocationHandler =
1✔
212
        new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit);
1✔
213
    return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
1✔
214
  }
215

216
  /**
217
   * Creates client stub to activities that implement given interface.
218
   *
219
   * @param activityInterface interface type implemented by activities
220
   * @param options options that specify the activity invocation parameters
221
   * @param activityMethodOptions activity method-specific invocation parameters
222
   */
223
  @Override
224
  public <T> T newLocalActivityStub(
225
      Class<T> activityInterface,
226
      LocalActivityOptions options,
227
      Map<String, LocalActivityOptions> activityMethodOptions) {
228
    InvocationHandler invocationHandler =
1✔
229
        LocalActivityInvocationHandler.newInstance(
1✔
230
            activityInterface,
231
            options,
232
            activityMethodOptions,
233
            new TestActivityExecutor(),
234
            () -> {});
1✔
235
    invocationHandler =
1✔
236
        new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit);
1✔
237
    return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
1✔
238
  }
239

240
  @Override
241
  public void requestCancelActivity() {
242
    cancellationRequested.set(true);
1✔
243
  }
1✔
244

245
  @Override
246
  public <T> void setActivityHeartbeatListener(Class<T> detailsClass, Functions.Proc1<T> listener) {
247
    setActivityHeartbeatListener(detailsClass, detailsClass, listener);
1✔
248
  }
1✔
249

250
  @Override
251
  @SuppressWarnings("unchecked")
252
  public <T> void setActivityHeartbeatListener(
253
      Class<T> detailsClass, Type detailsType, Functions.Proc1<T> listener) {
254
    activityHeartbeatListener = new ClassConsumerPair(detailsClass, detailsType, listener);
1✔
255
  }
1✔
256

257
  @Override
258
  public <T> void setHeartbeatDetails(T details) {
259
    heartbeatDetails.set(details);
1✔
260
  }
1✔
261

262
  @Override
263
  public void close() {
264
    heartbeatExecutor.shutdownNow();
1✔
265
    activityWorkerExecutor.shutdownNow();
1✔
266
    deterministicRunnerExecutor.shutdownNow();
1✔
267
    workflowServiceStubs.shutdown();
1✔
268
    mockServer.shutdown();
1✔
269
    mockServer.awaitTermination(5, TimeUnit.SECONDS);
1✔
270
  }
1✔
271

272
  private class TestActivityExecutor implements WorkflowOutboundCallsInterceptor {
1✔
273

274
    @Override
275
    public <T> ActivityOutput<T> executeActivity(ActivityInput<T> i) {
276
      Optional<Payloads> payloads =
1✔
277
          testEnvironmentOptions
1✔
278
              .getWorkflowClientOptions()
1✔
279
              .getDataConverter()
1✔
280
              .toPayloads(i.getArgs());
1✔
281
      Optional<Payloads> heartbeatPayload =
1✔
282
          Optional.ofNullable(heartbeatDetails.getAndSet(null))
1✔
283
              .flatMap(
1✔
284
                  obj ->
285
                      testEnvironmentOptions
1✔
286
                          .getWorkflowClientOptions()
1✔
287
                          .getDataConverter()
1✔
288
                          .toPayloads(obj));
1✔
289

290
      ActivityOptions options = i.getOptions();
1✔
291
      PollActivityTaskQueueResponse.Builder taskBuilder =
292
          PollActivityTaskQueueResponse.newBuilder()
1✔
293
              .setScheduleToCloseTimeout(
1✔
294
                  ProtobufTimeUtils.toProtoDuration(options.getScheduleToCloseTimeout()))
1✔
295
              .setHeartbeatTimeout(ProtobufTimeUtils.toProtoDuration(options.getHeartbeatTimeout()))
1✔
296
              .setStartToCloseTimeout(
1✔
297
                  ProtobufTimeUtils.toProtoDuration(options.getStartToCloseTimeout()))
1✔
298
              .setScheduledTime(ProtobufTimeUtils.getCurrentProtoTime())
1✔
299
              .setStartedTime(ProtobufTimeUtils.getCurrentProtoTime())
1✔
300
              .setTaskToken(ByteString.copyFrom("test-task-token".getBytes(StandardCharsets.UTF_8)))
1✔
301
              .setActivityId(String.valueOf(idSequencer.incrementAndGet()))
1✔
302
              .setWorkflowExecution(
1✔
303
                  WorkflowExecution.newBuilder()
1✔
304
                      .setWorkflowId("test-workflow-id")
1✔
305
                      .setRunId(UUID.randomUUID().toString())
1✔
306
                      .build())
1✔
307
              .setActivityType(ActivityType.newBuilder().setName(i.getActivityName()).build());
1✔
308
      payloads.ifPresent(taskBuilder::setInput);
1✔
309
      heartbeatPayload.ifPresent(taskBuilder::setHeartbeatDetails);
1✔
310
      PollActivityTaskQueueResponse task = taskBuilder.build();
1✔
311
      return new ActivityOutput<>(
1✔
312
          task.getActivityId(),
1✔
313
          Workflow.newPromise(
1✔
314
              getReply(task, executeActivity(task, false), i.getResultClass(), i.getResultType())));
1✔
315
    }
316

317
    @Override
318
    public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> i) {
319
      Optional<Payloads> payloads =
1✔
320
          testEnvironmentOptions
1✔
321
              .getWorkflowClientOptions()
1✔
322
              .getDataConverter()
1✔
323
              .toPayloads(i.getArgs());
1✔
324
      LocalActivityOptions options = i.getOptions();
1✔
325
      PollActivityTaskQueueResponse.Builder taskBuilder =
326
          PollActivityTaskQueueResponse.newBuilder()
1✔
327
              .setScheduleToCloseTimeout(
1✔
328
                  ProtobufTimeUtils.toProtoDuration(options.getScheduleToCloseTimeout()))
1✔
329
              .setStartToCloseTimeout(
1✔
330
                  ProtobufTimeUtils.toProtoDuration(options.getStartToCloseTimeout()))
1✔
331
              .setScheduledTime(ProtobufTimeUtils.getCurrentProtoTime())
1✔
332
              .setStartedTime(ProtobufTimeUtils.getCurrentProtoTime())
1✔
333
              .setTaskToken(ByteString.copyFrom("test-task-token".getBytes(StandardCharsets.UTF_8)))
1✔
334
              .setActivityId(String.valueOf(idSequencer.incrementAndGet()))
1✔
335
              .setWorkflowExecution(
1✔
336
                  WorkflowExecution.newBuilder()
1✔
337
                      .setWorkflowId("test-workflow-id")
1✔
338
                      .setRunId(UUID.randomUUID().toString())
1✔
339
                      .build())
1✔
340
              .setActivityType(ActivityType.newBuilder().setName(i.getActivityName()).build());
1✔
341
      payloads.ifPresent(taskBuilder::setInput);
1✔
342
      PollActivityTaskQueueResponse task = taskBuilder.build();
1✔
343
      return new LocalActivityOutput<>(
1✔
344
          Workflow.newPromise(
1✔
345
              getReply(task, executeActivity(task, true), i.getResultClass(), i.getResultType())));
1✔
346
    }
347

348
    /**
349
     * We execute the activity task on a separate activity worker to emulate what actually happens
350
     * in our production setup
351
     *
352
     * @param activityTask activity task to execute
353
     * @param localActivity true if it's a local activity
354
     * @return result of activity execution
355
     */
356
    private Result executeActivity(
357
        PollActivityTaskQueueResponse activityTask, boolean localActivity) {
358
      //noinspection DataFlowIssue -- no permit for the LA in this test
359
      Future<Result> activityFuture =
1✔
360
          activityWorkerExecutor.submit(
1✔
361
              () ->
362
                  activityTaskHandler.handle(
1✔
363
                      new ActivityTask(activityTask, null, () -> {}),
×
364
                      testEnvironmentOptions.getMetricsScope(),
1✔
365
                      localActivity));
366

367
      try {
368
        return activityFuture.get();
1✔
369
      } catch (InterruptedException e) {
×
370
        Thread.currentThread().interrupt();
×
371
        throw new RuntimeException(e);
×
372
      } catch (ExecutionException e) {
×
373
        log.error("Exception during processing of activity task");
×
374
        throw new RuntimeException(e);
×
375
      }
376
    }
377

378
    @Override
379
    public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> input) {
380
      throw new UnsupportedOperationException("not implemented");
×
381
    }
382

383
    @Override
384
    public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
385
        ExecuteNexusOperationInput<R> input) {
386
      throw new UnsupportedOperationException("not implemented");
×
387
    }
388

389
    @Override
390
    public Random newRandom() {
391
      throw new UnsupportedOperationException("not implemented");
×
392
    }
393

394
    @Override
395
    public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
396
      throw new UnsupportedOperationException("not implemented");
×
397
    }
398

399
    @Override
400
    public CancelWorkflowOutput cancelWorkflow(CancelWorkflowInput input) {
401
      throw new UnsupportedOperationException("not implemented");
×
402
    }
403

404
    @Override
405
    public void sleep(Duration duration) {
406
      throw new UnsupportedOperationException("not implemented");
×
407
    }
408

409
    @Override
410
    public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
411
      throw new UnsupportedOperationException("not implemented");
×
412
    }
413

414
    @Override
415
    public void await(String reason, Supplier<Boolean> unblockCondition) {
416
      throw new UnsupportedOperationException("not implemented");
×
417
    }
418

419
    @Override
420
    public Promise<Void> newTimer(Duration duration) {
421
      throw new UnsupportedOperationException("not implemented");
×
422
    }
423

424
    @Override
425
    public Promise<Void> newTimer(Duration duration, TimerOptions options) {
426
      throw new UnsupportedOperationException("not implemented");
×
427
    }
428

429
    @Override
430
    public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
431
      throw new UnsupportedOperationException("not implemented");
×
432
    }
433

434
    @Override
435
    public <R> R mutableSideEffect(
436
        String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
437
      throw new UnsupportedOperationException("not implemented");
×
438
    }
439

440
    @Override
441
    public int getVersion(String changeId, int minSupported, int maxSupported) {
442
      throw new UnsupportedOperationException("not implemented");
×
443
    }
444

445
    @Override
446
    public void continueAsNew(ContinueAsNewInput input) {
447
      throw new UnsupportedOperationException("not implemented");
×
448
    }
449

450
    @Override
451
    public void registerQuery(RegisterQueryInput input) {
452
      throw new UnsupportedOperationException("not implemented");
×
453
    }
454

455
    @Override
456
    public void registerSignalHandlers(RegisterSignalHandlersInput input) {
457
      throw new UnsupportedOperationException("not implemented");
×
458
    }
459

460
    @Override
461
    public void registerDynamicSignalHandler(RegisterDynamicSignalHandlerInput input) {
462
      throw new UnsupportedOperationException("not implemented");
×
463
    }
464

465
    @Override
466
    public void registerDynamicQueryHandler(RegisterDynamicQueryHandlerInput input) {
467
      throw new UnsupportedOperationException("not implemented");
×
468
    }
469

470
    @Override
471
    public UUID randomUUID() {
472
      throw new UnsupportedOperationException("not implemented");
×
473
    }
474

475
    @Override
476
    public void upsertSearchAttributes(Map<String, ?> searchAttributes) {
477
      throw new UnsupportedOperationException("not implemented");
×
478
    }
479

480
    @Override
481
    public void upsertTypedSearchAttributes(SearchAttributeUpdate<?>... searchAttributeUpdates) {
482
      throw new UnsupportedOperationException("not implemented");
×
483
    }
484

485
    @Override
486
    public void upsertMemo(Map<String, Object> memo) {
487
      throw new UnsupportedOperationException("not implemented");
×
488
    }
489

490
    @Override
491
    public Scope getMetricsScope() {
492
      throw new UnsupportedOperationException("not implemented");
×
493
    }
494

495
    @Override
496
    public Object newChildThread(Runnable runnable, boolean detached, String name) {
497
      throw new UnsupportedOperationException("not implemented");
×
498
    }
499

500
    @Override
501
    public long currentTimeMillis() {
502
      throw new UnsupportedOperationException("not implemented");
×
503
    }
504

505
    private <T> T getReply(
506
        PollActivityTaskQueueResponse task,
507
        ActivityTaskHandler.Result response,
508
        Class<T> resultClass,
509
        Type resultType) {
510
      DataConverter dataConverter =
1✔
511
          testEnvironmentOptions.getWorkflowClientOptions().getDataConverter();
1✔
512
      RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted();
1✔
513
      if (taskCompleted != null) {
1✔
514
        Optional<Payloads> result =
515
            taskCompleted.hasResult() ? Optional.of(taskCompleted.getResult()) : Optional.empty();
1✔
516
        return dataConverter.fromPayloads(0, result, resultClass, resultType);
1✔
517
      } else {
518
        RespondActivityTaskFailedRequest taskFailed =
1✔
519
            response.getTaskFailed().getTaskFailedRequest();
1✔
520
        if (taskFailed != null) {
1✔
521
          Exception cause = dataConverter.failureToException(taskFailed.getFailure());
1✔
522
          throw new ActivityFailure(
1✔
523
              taskFailed.getFailure().getMessage(),
1✔
524
              0,
525
              0,
526
              task.getActivityType().getName(),
1✔
527
              task.getActivityId(),
1✔
528
              RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE,
529
              "TestActivityEnvironment",
530
              cause);
531
        } else {
532
          RespondActivityTaskCanceledRequest taskCanceled = response.getTaskCanceled();
×
533
          if (taskCanceled != null) {
×
534
            throw new CanceledFailure(
×
535
                "canceled",
536
                new EncodedValues(
537
                    taskCanceled.hasDetails()
×
538
                        ? Optional.of(taskCanceled.getDetails())
×
539
                        : Optional.empty(),
×
540
                    dataConverter),
541
                null);
542
          }
543
        }
544
      }
545
      return Defaults.defaultValue(resultClass);
×
546
    }
547

548
    @Override
549
    public void registerUpdateHandlers(RegisterUpdateHandlersInput input) {
550
      throw new UnsupportedOperationException("not implemented");
×
551
    }
552

553
    @Override
554
    public void registerDynamicUpdateHandler(RegisterDynamicUpdateHandlerInput input) {
555
      throw new UnsupportedOperationException("not implemented");
×
556
    }
557
  }
558

559
  private static class ClassConsumerPair<T> {
560

561
    final Functions.Proc1<T> consumer;
562
    final Class<T> valueClass;
563
    final Type valueType;
564

565
    ClassConsumerPair(Class<T> valueClass, Type valueType, Functions.Proc1<T> consumer) {
1✔
566
      this.valueClass = Objects.requireNonNull(valueClass);
1✔
567
      this.valueType = Objects.requireNonNull(valueType);
1✔
568
      this.consumer = Objects.requireNonNull(consumer);
1✔
569
    }
1✔
570
  }
571
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc