• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 1861

pending completion
1861

push

buildkite

web-flow
3.9.0 release (#823)

11116 of 18414 relevant lines covered (60.37%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.02
/src/main/java/com/uber/cadence/internal/worker/LocallyDispatchedActivityPollTask.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.worker;
19

20
import com.uber.cadence.PollForActivityTaskResponse;
21
import com.uber.cadence.internal.metrics.MetricsType;
22
import com.uber.cadence.internal.worker.LocallyDispatchedActivityWorker.Task;
23
import java.util.concurrent.SynchronousQueue;
24
import java.util.function.Function;
25
import org.apache.thrift.TException;
26
import org.slf4j.Logger;
27
import org.slf4j.LoggerFactory;
28

29
final class LocallyDispatchedActivityPollTask extends ActivityPollTaskBase
30
    implements Function<Task, Boolean> {
31

32
  private static final Logger log =
1✔
33
      LoggerFactory.getLogger(LocallyDispatchedActivityPollTask.class);
1✔
34
  private final SynchronousQueue<Task> pendingTasks = new SynchronousQueue<>();
1✔
35

36
  public LocallyDispatchedActivityPollTask(SingleWorkerOptions options) {
37
    super(options);
1✔
38
  }
1✔
39

40
  @Override
41
  protected PollForActivityTaskResponse pollTask() throws TException {
42
    Task task;
43
    try {
44
      task = pendingTasks.take();
1✔
45
    } catch (InterruptedException e) {
1✔
46
      throw new RuntimeException("locally dispatch activity poll task interrupted", e);
1✔
47
    }
1✔
48
    try {
49
      if (!task.await()) {
1✔
50
        options
1✔
51
            .getMetricsScope()
1✔
52
            .counter(MetricsType.LOCALLY_DISPATCHED_ACTIVITY_POLL_NO_TASK_COUNTER)
1✔
53
            .inc(1);
1✔
54
        return null;
1✔
55
      }
56
    } catch (InterruptedException e) {
×
57
      throw new RuntimeException("locally dispatch activity await task interrupted", e);
×
58
    }
×
59
    options.getMetricsScope().counter(MetricsType.ACTIVITY_POLL_COUNTER).inc(1);
×
60
    options
×
61
        .getMetricsScope()
×
62
        .counter(MetricsType.LOCALLY_DISPATCHED_ACTIVITY_POLL_SUCCEED_COUNTER)
×
63
        .inc(1);
×
64
    PollForActivityTaskResponse result = new PollForActivityTaskResponse();
×
65
    result.activityId = task.activityId;
×
66
    result.activityType = task.activityType;
×
67
    result.header = task.header;
×
68
    result.input = task.input;
×
69
    result.workflowExecution = task.workflowExecution;
×
70
    result.scheduledTimestampOfThisAttempt = task.scheduledTimestampOfThisAttempt;
×
71
    result.scheduledTimestamp = task.scheduledTimestamp;
×
72
    result.scheduleToCloseTimeoutSeconds = task.scheduleToCloseTimeoutSeconds;
×
73
    result.startedTimestamp = task.startedTimestamp;
×
74
    result.startToCloseTimeoutSeconds = task.startToCloseTimeoutSeconds;
×
75
    result.heartbeatTimeoutSeconds = task.heartbeatTimeoutSeconds;
×
76
    result.taskToken = task.taskToken;
×
77
    result.workflowType = task.workflowType;
×
78
    result.workflowDomain = task.workflowDomain;
×
79
    result.attempt = 0;
×
80
    return result;
×
81
  }
82

83
  @Override
84
  public Boolean apply(Task task) {
85
    // non blocking put to the unbuffered queue
86
    return pendingTasks.offer(task);
1✔
87
  }
88
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc