• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #175

pending completion
#175

push

github-actions

web-flow
Worker / Build Id versioning (#1786)

Implement new worker build id based versioning feature

236 of 236 new or added lines in 24 files covered. (100.0%)

18343 of 23697 relevant lines covered (77.41%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.98
/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.testing;
22

23
import com.google.common.base.Defaults;
24
import com.google.protobuf.ByteString;
25
import com.uber.m3.tally.NoopScope;
26
import com.uber.m3.tally.Scope;
27
import io.grpc.StatusRuntimeException;
28
import io.grpc.stub.StreamObserver;
29
import io.temporal.activity.ActivityOptions;
30
import io.temporal.activity.LocalActivityOptions;
31
import io.temporal.api.common.v1.ActivityType;
32
import io.temporal.api.common.v1.Payloads;
33
import io.temporal.api.common.v1.WorkflowExecution;
34
import io.temporal.api.enums.v1.RetryState;
35
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
36
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatRequest;
37
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse;
38
import io.temporal.api.workflowservice.v1.RespondActivityTaskCanceledRequest;
39
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedRequest;
40
import io.temporal.api.workflowservice.v1.RespondActivityTaskFailedRequest;
41
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
42
import io.temporal.common.SearchAttributeUpdate;
43
import io.temporal.common.converter.DataConverter;
44
import io.temporal.common.converter.EncodedValues;
45
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
46
import io.temporal.failure.ActivityFailure;
47
import io.temporal.failure.CanceledFailure;
48
import io.temporal.internal.activity.ActivityExecutionContextFactory;
49
import io.temporal.internal.activity.ActivityExecutionContextFactoryImpl;
50
import io.temporal.internal.activity.ActivityTaskHandlerImpl;
51
import io.temporal.internal.common.ProtobufTimeUtils;
52
import io.temporal.internal.sync.*;
53
import io.temporal.internal.testservice.InProcessGRPCServer;
54
import io.temporal.internal.worker.ActivityTask;
55
import io.temporal.internal.worker.ActivityTaskHandler;
56
import io.temporal.internal.worker.ActivityTaskHandler.Result;
57
import io.temporal.serviceclient.WorkflowServiceStubs;
58
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
59
import io.temporal.worker.WorkerOptions;
60
import io.temporal.workflow.Functions;
61
import io.temporal.workflow.Functions.Func;
62
import io.temporal.workflow.Promise;
63
import io.temporal.workflow.Workflow;
64
import java.lang.reflect.InvocationHandler;
65
import java.lang.reflect.Type;
66
import java.nio.charset.StandardCharsets;
67
import java.time.Duration;
68
import java.util.*;
69
import java.util.concurrent.*;
70
import java.util.concurrent.atomic.AtomicBoolean;
71
import java.util.concurrent.atomic.AtomicInteger;
72
import java.util.concurrent.atomic.AtomicReference;
73
import java.util.function.BiPredicate;
74
import java.util.function.Supplier;
75
import javax.annotation.Nullable;
76
import org.slf4j.Logger;
77
import org.slf4j.LoggerFactory;
78

79
public final class TestActivityEnvironmentInternal implements TestActivityEnvironment {
80
  private static final Logger log = LoggerFactory.getLogger(TestActivityEnvironmentInternal.class);
1✔
81

82
  private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(20);
1✔
83
  private final ExecutorService activityWorkerExecutor =
1✔
84
      Executors.newSingleThreadExecutor(r -> new Thread(r, "test-service-activity-worker"));
1✔
85
  private final ExecutorService deterministicRunnerExecutor =
1✔
86
      new ThreadPoolExecutor(
87
          1,
88
          1000,
89
          1,
90
          TimeUnit.SECONDS,
91
          new SynchronousQueue<>(),
92
          r -> new Thread(r, "test-service-deterministic-runner"));
1✔
93
  private final AtomicBoolean cancellationRequested = new AtomicBoolean();
1✔
94
  private final AtomicInteger idSequencer = new AtomicInteger();
1✔
95
  private final InProcessGRPCServer mockServer;
96
  private final ActivityTaskHandlerImpl activityTaskHandler;
97
  private final TestEnvironmentOptions testEnvironmentOptions;
98
  private final WorkflowServiceStubs workflowServiceStubs;
99
  private final AtomicReference<Object> heartbeatDetails = new AtomicReference<>();
1✔
100
  private ClassConsumerPair<Object> activityHeartbeatListener;
101

102
  public TestActivityEnvironmentInternal(@Nullable TestEnvironmentOptions options) {
1✔
103
    // Initialize an in-memory mock service.
104
    this.mockServer =
1✔
105
        new InProcessGRPCServer(Collections.singletonList(new HeartbeatInterceptingService()));
1✔
106
    this.testEnvironmentOptions =
1✔
107
        options != null
1✔
108
            ? TestEnvironmentOptions.newBuilder(options).validateAndBuildWithDefaults()
1✔
109
            : TestEnvironmentOptions.newBuilder().validateAndBuildWithDefaults();
1✔
110
    WorkflowServiceStubsOptions.Builder serviceStubsOptionsBuilder =
1✔
111
        WorkflowServiceStubsOptions.newBuilder(
1✔
112
                testEnvironmentOptions.getWorkflowServiceStubsOptions())
1✔
113
            .setTarget(null)
1✔
114
            .setChannel(this.mockServer.getChannel())
1✔
115
            .setRpcQueryTimeout(Duration.ofSeconds(60));
1✔
116
    Scope metricsScope = this.testEnvironmentOptions.getMetricsScope();
1✔
117
    if (metricsScope != null && !(NoopScope.class.equals(metricsScope.getClass()))) {
1✔
118
      serviceStubsOptionsBuilder.setMetricsScope(metricsScope);
1✔
119
    }
120
    this.workflowServiceStubs =
1✔
121
        WorkflowServiceStubs.newServiceStubs(serviceStubsOptionsBuilder.build());
1✔
122

123
    ActivityExecutionContextFactory activityExecutionContextFactory =
1✔
124
        new ActivityExecutionContextFactoryImpl(
125
            workflowServiceStubs,
126
            testEnvironmentOptions.getWorkflowClientOptions().getIdentity(),
1✔
127
            testEnvironmentOptions.getWorkflowClientOptions().getNamespace(),
1✔
128
            WorkerOptions.getDefaultInstance().getMaxHeartbeatThrottleInterval(),
1✔
129
            WorkerOptions.getDefaultInstance().getDefaultHeartbeatThrottleInterval(),
1✔
130
            testEnvironmentOptions.getWorkflowClientOptions().getDataConverter(),
1✔
131
            heartbeatExecutor);
132
    activityTaskHandler =
1✔
133
        new ActivityTaskHandlerImpl(
134
            testEnvironmentOptions.getWorkflowClientOptions().getNamespace(),
1✔
135
            "test-activity-env-task-queue",
136
            testEnvironmentOptions.getWorkflowClientOptions().getDataConverter(),
1✔
137
            activityExecutionContextFactory,
138
            testEnvironmentOptions.getWorkerFactoryOptions().getWorkerInterceptors(),
1✔
139
            testEnvironmentOptions.getWorkflowClientOptions().getContextPropagators());
1✔
140
  }
1✔
141

142
  private class HeartbeatInterceptingService extends WorkflowServiceGrpc.WorkflowServiceImplBase {
1✔
143
    @Override
144
    public void recordActivityTaskHeartbeat(
145
        RecordActivityTaskHeartbeatRequest request,
146
        StreamObserver<RecordActivityTaskHeartbeatResponse> responseObserver) {
147
      try {
148
        if (activityHeartbeatListener != null) {
1✔
149
          Optional<Payloads> requestDetails =
150
              request.hasDetails() ? Optional.of(request.getDetails()) : Optional.empty();
1✔
151

152
          Object details =
1✔
153
              testEnvironmentOptions
1✔
154
                  .getWorkflowClientOptions()
1✔
155
                  .getDataConverter()
1✔
156
                  .fromPayloads(
1✔
157
                      0,
158
                      requestDetails,
159
                      activityHeartbeatListener.valueClass,
1✔
160
                      activityHeartbeatListener.valueType);
1✔
161
          activityHeartbeatListener.consumer.apply(details);
1✔
162
        }
163
        responseObserver.onNext(
1✔
164
            RecordActivityTaskHeartbeatResponse.newBuilder()
1✔
165
                .setCancelRequested(cancellationRequested.get())
1✔
166
                .build());
1✔
167
        responseObserver.onCompleted();
1✔
168
      } catch (StatusRuntimeException e) {
1✔
169
        responseObserver.onError(e);
1✔
170
      }
1✔
171
    }
1✔
172
  }
173

174
  @Override
175
  public void registerActivitiesImplementations(Object... activityImplementations) {
176
    activityTaskHandler.registerActivityImplementations(activityImplementations);
1✔
177
  }
1✔
178

179
  /**
180
   * Creates client stub to activities that implement given interface.
181
   *
182
   * @param activityInterface interface type implemented by activities
183
   */
184
  @Override
185
  public <T> T newActivityStub(Class<T> activityInterface) {
186
    ActivityOptions options =
187
        ActivityOptions.newBuilder()
1✔
188
            .setScheduleToCloseTimeout(Duration.ofDays(1))
1✔
189
            .setHeartbeatTimeout(Duration.ofSeconds(1))
1✔
190
            .build();
1✔
191
    InvocationHandler invocationHandler =
1✔
192
        ActivityInvocationHandler.newInstance(
1✔
193
            activityInterface, options, null, new TestActivityExecutor());
194
    invocationHandler =
1✔
195
        new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit);
1✔
196
    return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
1✔
197
  }
198

199
  /**
200
   * Creates client stub to activities that implement given interface.
201
   *
202
   * @param activityInterface interface type implemented by activities
203
   * @param options options that specify the activity invocation parameters
204
   */
205
  @Override
206
  public <T> T newActivityStub(Class<T> activityInterface, ActivityOptions options) {
207
    InvocationHandler invocationHandler =
1✔
208
        ActivityInvocationHandler.newInstance(
1✔
209
            activityInterface, options, null, new TestActivityExecutor());
210
    invocationHandler =
1✔
211
        new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit);
1✔
212
    return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
1✔
213
  }
214

215
  /**
216
   * Creates client stub to activities that implement given interface.
217
   *
218
   * @param activityInterface interface type implemented by activities
219
   * @param options options that specify the activity invocation parameters
220
   * @param activityMethodOptions activity method-specific invocation parameters
221
   */
222
  @Override
223
  public <T> T newLocalActivityStub(
224
      Class<T> activityInterface,
225
      LocalActivityOptions options,
226
      Map<String, LocalActivityOptions> activityMethodOptions) {
227
    InvocationHandler invocationHandler =
1✔
228
        LocalActivityInvocationHandler.newInstance(
1✔
229
            activityInterface, options, activityMethodOptions, new TestActivityExecutor());
230
    invocationHandler =
1✔
231
        new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit);
1✔
232
    return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
1✔
233
  }
234

235
  @Override
236
  public void requestCancelActivity() {
237
    cancellationRequested.set(true);
1✔
238
  }
1✔
239

240
  @Override
241
  public <T> void setActivityHeartbeatListener(Class<T> detailsClass, Functions.Proc1<T> listener) {
242
    setActivityHeartbeatListener(detailsClass, detailsClass, listener);
1✔
243
  }
1✔
244

245
  @Override
246
  @SuppressWarnings("unchecked")
247
  public <T> void setActivityHeartbeatListener(
248
      Class<T> detailsClass, Type detailsType, Functions.Proc1<T> listener) {
249
    activityHeartbeatListener = new ClassConsumerPair(detailsClass, detailsType, listener);
1✔
250
  }
1✔
251

252
  @Override
253
  public <T> void setHeartbeatDetails(T details) {
254
    heartbeatDetails.set(details);
1✔
255
  }
1✔
256

257
  @Override
258
  public void close() {
259
    heartbeatExecutor.shutdownNow();
1✔
260
    activityWorkerExecutor.shutdownNow();
1✔
261
    deterministicRunnerExecutor.shutdownNow();
1✔
262
    workflowServiceStubs.shutdown();
1✔
263
    mockServer.shutdown();
1✔
264
    mockServer.awaitTermination(5, TimeUnit.SECONDS);
1✔
265
  }
1✔
266

267
  private class TestActivityExecutor implements WorkflowOutboundCallsInterceptor {
1✔
268

269
    @Override
270
    public <T> ActivityOutput<T> executeActivity(ActivityInput<T> i) {
271
      Optional<Payloads> payloads =
1✔
272
          testEnvironmentOptions
1✔
273
              .getWorkflowClientOptions()
1✔
274
              .getDataConverter()
1✔
275
              .toPayloads(i.getArgs());
1✔
276
      Optional<Payloads> heartbeatPayload =
1✔
277
          Optional.ofNullable(heartbeatDetails.getAndSet(null))
1✔
278
              .flatMap(
1✔
279
                  obj ->
280
                      testEnvironmentOptions
1✔
281
                          .getWorkflowClientOptions()
1✔
282
                          .getDataConverter()
1✔
283
                          .toPayloads(obj));
1✔
284

285
      ActivityOptions options = i.getOptions();
1✔
286
      PollActivityTaskQueueResponse.Builder taskBuilder =
287
          PollActivityTaskQueueResponse.newBuilder()
1✔
288
              .setScheduleToCloseTimeout(
1✔
289
                  ProtobufTimeUtils.toProtoDuration(options.getScheduleToCloseTimeout()))
1✔
290
              .setHeartbeatTimeout(ProtobufTimeUtils.toProtoDuration(options.getHeartbeatTimeout()))
1✔
291
              .setStartToCloseTimeout(
1✔
292
                  ProtobufTimeUtils.toProtoDuration(options.getStartToCloseTimeout()))
1✔
293
              .setScheduledTime(ProtobufTimeUtils.getCurrentProtoTime())
1✔
294
              .setStartedTime(ProtobufTimeUtils.getCurrentProtoTime())
1✔
295
              .setTaskToken(ByteString.copyFrom("test-task-token".getBytes(StandardCharsets.UTF_8)))
1✔
296
              .setActivityId(String.valueOf(idSequencer.incrementAndGet()))
1✔
297
              .setWorkflowExecution(
1✔
298
                  WorkflowExecution.newBuilder()
1✔
299
                      .setWorkflowId("test-workflow-id")
1✔
300
                      .setRunId(UUID.randomUUID().toString())
1✔
301
                      .build())
1✔
302
              .setActivityType(ActivityType.newBuilder().setName(i.getActivityName()).build());
1✔
303
      payloads.ifPresent(taskBuilder::setInput);
1✔
304
      heartbeatPayload.ifPresent(taskBuilder::setHeartbeatDetails);
1✔
305
      PollActivityTaskQueueResponse task = taskBuilder.build();
1✔
306
      return new ActivityOutput<>(
1✔
307
          task.getActivityId(),
1✔
308
          Workflow.newPromise(
1✔
309
              getReply(task, executeActivity(task, false), i.getResultClass(), i.getResultType())));
1✔
310
    }
311

312
    @Override
313
    public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> i) {
314
      Optional<Payloads> payloads =
1✔
315
          testEnvironmentOptions
1✔
316
              .getWorkflowClientOptions()
1✔
317
              .getDataConverter()
1✔
318
              .toPayloads(i.getArgs());
1✔
319
      LocalActivityOptions options = i.getOptions();
1✔
320
      PollActivityTaskQueueResponse.Builder taskBuilder =
321
          PollActivityTaskQueueResponse.newBuilder()
1✔
322
              .setScheduleToCloseTimeout(
1✔
323
                  ProtobufTimeUtils.toProtoDuration(options.getScheduleToCloseTimeout()))
1✔
324
              .setStartToCloseTimeout(
1✔
325
                  ProtobufTimeUtils.toProtoDuration(options.getStartToCloseTimeout()))
1✔
326
              .setScheduledTime(ProtobufTimeUtils.getCurrentProtoTime())
1✔
327
              .setStartedTime(ProtobufTimeUtils.getCurrentProtoTime())
1✔
328
              .setTaskToken(ByteString.copyFrom("test-task-token".getBytes(StandardCharsets.UTF_8)))
1✔
329
              .setActivityId(String.valueOf(idSequencer.incrementAndGet()))
1✔
330
              .setWorkflowExecution(
1✔
331
                  WorkflowExecution.newBuilder()
1✔
332
                      .setWorkflowId("test-workflow-id")
1✔
333
                      .setRunId(UUID.randomUUID().toString())
1✔
334
                      .build())
1✔
335
              .setActivityType(ActivityType.newBuilder().setName(i.getActivityName()).build());
1✔
336
      payloads.ifPresent(taskBuilder::setInput);
1✔
337
      PollActivityTaskQueueResponse task = taskBuilder.build();
1✔
338
      return new LocalActivityOutput<>(
1✔
339
          Workflow.newPromise(
1✔
340
              getReply(task, executeActivity(task, true), i.getResultClass(), i.getResultType())));
1✔
341
    }
342

343
    /**
344
     * We execute the activity task on a separate activity worker to emulate what actually happens
345
     * in our production setup
346
     *
347
     * @param activityTask activity task to execute
348
     * @param localActivity true if it's a local activity
349
     * @return result of activity execution
350
     */
351
    private Result executeActivity(
352
        PollActivityTaskQueueResponse activityTask, boolean localActivity) {
353
      Future<Result> activityFuture =
1✔
354
          activityWorkerExecutor.submit(
1✔
355
              () ->
356
                  activityTaskHandler.handle(
1✔
357
                      new ActivityTask(activityTask, () -> {}),
×
358
                      testEnvironmentOptions.getMetricsScope(),
1✔
359
                      localActivity));
360

361
      try {
362
        // 10 seconds is just a "reasonable" wait to not make an infinite waiting
363
        return activityFuture.get(10, TimeUnit.SECONDS);
1✔
364
      } catch (InterruptedException e) {
×
365
        Thread.currentThread().interrupt();
×
366
        throw new RuntimeException(e);
×
367
      } catch (ExecutionException e) {
×
368
        log.error("Exception during processing of activity task");
×
369
        throw new RuntimeException(e);
×
370
      } catch (TimeoutException e) {
×
371
        log.error("Timeout trying execute activity task {}", activityTask);
×
372
        throw new RuntimeException(e);
×
373
      }
374
    }
375

376
    @Override
377
    public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> input) {
378
      throw new UnsupportedOperationException("not implemented");
×
379
    }
380

381
    @Override
382
    public Random newRandom() {
383
      throw new UnsupportedOperationException("not implemented");
×
384
    }
385

386
    @Override
387
    public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
388
      throw new UnsupportedOperationException("not implemented");
×
389
    }
390

391
    @Override
392
    public CancelWorkflowOutput cancelWorkflow(CancelWorkflowInput input) {
393
      throw new UnsupportedOperationException("not implemented");
×
394
    }
395

396
    @Override
397
    public void sleep(Duration duration) {
398
      throw new UnsupportedOperationException("not implemented");
×
399
    }
400

401
    @Override
402
    public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
403
      throw new UnsupportedOperationException("not implemented");
×
404
    }
405

406
    @Override
407
    public void await(String reason, Supplier<Boolean> unblockCondition) {
408
      throw new UnsupportedOperationException("not implemented");
×
409
    }
410

411
    @Override
412
    public Promise<Void> newTimer(Duration duration) {
413
      throw new UnsupportedOperationException("not implemented");
×
414
    }
415

416
    @Override
417
    public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
418
      throw new UnsupportedOperationException("not implemented");
×
419
    }
420

421
    @Override
422
    public <R> R mutableSideEffect(
423
        String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
424
      throw new UnsupportedOperationException("not implemented");
×
425
    }
426

427
    @Override
428
    public int getVersion(String changeId, int minSupported, int maxSupported) {
429
      throw new UnsupportedOperationException("not implemented");
×
430
    }
431

432
    @Override
433
    public void continueAsNew(ContinueAsNewInput input) {
434
      throw new UnsupportedOperationException("not implemented");
×
435
    }
436

437
    @Override
438
    public void registerQuery(RegisterQueryInput input) {
439
      throw new UnsupportedOperationException("not implemented");
×
440
    }
441

442
    @Override
443
    public void registerSignalHandlers(RegisterSignalHandlersInput input) {
444
      throw new UnsupportedOperationException("not implemented");
×
445
    }
446

447
    @Override
448
    public void registerDynamicSignalHandler(RegisterDynamicSignalHandlerInput input) {
449
      throw new UnsupportedOperationException("not implemented");
×
450
    }
451

452
    @Override
453
    public void registerDynamicQueryHandler(RegisterDynamicQueryHandlerInput input) {
454
      throw new UnsupportedOperationException("not implemented");
×
455
    }
456

457
    @Override
458
    public UUID randomUUID() {
459
      throw new UnsupportedOperationException("not implemented");
×
460
    }
461

462
    @Override
463
    public void upsertSearchAttributes(Map<String, ?> searchAttributes) {
464
      throw new UnsupportedOperationException("not implemented");
×
465
    }
466

467
    @Override
468
    public void upsertTypedSearchAttributes(SearchAttributeUpdate<?>... searchAttributeUpdates) {
469
      throw new UnsupportedOperationException("not implemented");
×
470
    }
471

472
    @Override
473
    public Object newChildThread(Runnable runnable, boolean detached, String name) {
474
      throw new UnsupportedOperationException("not implemented");
×
475
    }
476

477
    @Override
478
    public long currentTimeMillis() {
479
      throw new UnsupportedOperationException("not implemented");
×
480
    }
481

482
    private <T> T getReply(
483
        PollActivityTaskQueueResponse task,
484
        ActivityTaskHandler.Result response,
485
        Class<T> resultClass,
486
        Type resultType) {
487
      DataConverter dataConverter =
1✔
488
          testEnvironmentOptions.getWorkflowClientOptions().getDataConverter();
1✔
489
      RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted();
1✔
490
      if (taskCompleted != null) {
1✔
491
        Optional<Payloads> result =
492
            taskCompleted.hasResult() ? Optional.of(taskCompleted.getResult()) : Optional.empty();
1✔
493
        return dataConverter.fromPayloads(0, result, resultClass, resultType);
1✔
494
      } else {
495
        RespondActivityTaskFailedRequest taskFailed =
1✔
496
            response.getTaskFailed().getTaskFailedRequest();
1✔
497
        if (taskFailed != null) {
1✔
498
          Exception cause = dataConverter.failureToException(taskFailed.getFailure());
1✔
499
          throw new ActivityFailure(
1✔
500
              taskFailed.getFailure().getMessage(),
1✔
501
              0,
502
              0,
503
              task.getActivityType().getName(),
1✔
504
              task.getActivityId(),
1✔
505
              RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE,
506
              "TestActivityEnvironment",
507
              cause);
508
        } else {
509
          RespondActivityTaskCanceledRequest taskCanceled = response.getTaskCanceled();
×
510
          if (taskCanceled != null) {
×
511
            throw new CanceledFailure(
×
512
                "canceled",
513
                new EncodedValues(
514
                    taskCanceled.hasDetails()
×
515
                        ? Optional.of(taskCanceled.getDetails())
×
516
                        : Optional.empty(),
×
517
                    dataConverter),
518
                null);
519
          }
520
        }
521
      }
522
      return Defaults.defaultValue(resultClass);
×
523
    }
524

525
    @Override
526
    public void registerUpdateHandlers(RegisterUpdateHandlersInput input) {
527
      throw new UnsupportedOperationException("not implemented");
×
528
    }
529

530
    @Override
531
    public void registerDynamicUpdateHandler(RegisterDynamicUpdateHandlerInput input) {
532
      throw new UnsupportedOperationException("not implemented");
×
533
    }
534
  }
535

536
  private static class ClassConsumerPair<T> {
537

538
    final Functions.Proc1<T> consumer;
539
    final Class<T> valueClass;
540
    final Type valueType;
541

542
    ClassConsumerPair(Class<T> valueClass, Type valueType, Functions.Proc1<T> consumer) {
1✔
543
      this.valueClass = Objects.requireNonNull(valueClass);
1✔
544
      this.valueType = Objects.requireNonNull(valueType);
1✔
545
      this.consumer = Objects.requireNonNull(consumer);
1✔
546
    }
1✔
547
  }
548
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc