• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 16

16 Apr 2024 01:28AM UTC coverage: 60.239% (-0.1%) from 60.343%
16

push

buildkite

mstifflin
Remove unnecessary sidecar command, try executing with lower resources

11446 of 19001 relevant lines covered (60.24%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.21
/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.worker;
19

20
import com.uber.cadence.EventType;
21
import com.uber.cadence.HistoryEvent;
22
import com.uber.cadence.MarkerRecordedEventAttributes;
23
import com.uber.cadence.PollForActivityTaskResponse;
24
import com.uber.cadence.common.RetryOptions;
25
import com.uber.cadence.internal.common.LocalActivityMarkerData;
26
import com.uber.cadence.internal.metrics.MetricsTag;
27
import com.uber.cadence.internal.metrics.MetricsType;
28
import com.uber.cadence.internal.replay.ClockDecisionContext;
29
import com.uber.cadence.internal.replay.ExecuteLocalActivityParameters;
30
import com.uber.cadence.internal.tracing.TracingPropagator;
31
import com.uber.m3.tally.Scope;
32
import com.uber.m3.tally.Stopwatch;
33
import com.uber.m3.util.ImmutableMap;
34
import io.opentracing.Span;
35
import java.time.Duration;
36
import java.util.Map;
37
import java.util.Objects;
38
import java.util.Optional;
39
import java.util.function.BiFunction;
40
import java.util.function.Consumer;
41
import java.util.function.LongSupplier;
42

43
public final class LocalActivityWorker extends SuspendableWorkerBase {
44

45
  private static final String POLL_THREAD_NAME_PREFIX = "Local Activity Poller taskList=";
46

47
  private final ActivityTaskHandler handler;
48
  private final String domain;
49
  private final String taskList;
50
  private final SingleWorkerOptions options;
51
  private final LocalActivityPollTask laPollTask;
52
  private final TracingPropagator spanFactory;
53

54
  public LocalActivityWorker(
55
      String domain, String taskList, SingleWorkerOptions options, ActivityTaskHandler handler) {
1✔
56
    this.domain = Objects.requireNonNull(domain);
1✔
57
    this.taskList = Objects.requireNonNull(taskList);
1✔
58
    this.handler = handler;
1✔
59
    this.laPollTask = new LocalActivityPollTask();
1✔
60
    this.spanFactory = new TracingPropagator(options.getTracer());
1✔
61

62
    PollerOptions pollerOptions = options.getPollerOptions();
1✔
63
    if (pollerOptions.getPollThreadNamePrefix() == null) {
1✔
64
      pollerOptions =
1✔
65
          PollerOptions.newBuilder(pollerOptions)
1✔
66
              .setPollThreadNamePrefix(
1✔
67
                  POLL_THREAD_NAME_PREFIX + "\"" + taskList + "\", domain=\"" + domain + "\"")
68
              .build();
1✔
69
    }
70
    this.options = SingleWorkerOptions.newBuilder(options).setPollerOptions(pollerOptions).build();
1✔
71
  }
1✔
72

73
  @Override
74
  public void start() {
75
    if (handler.isAnyTypeSupported()) {
1✔
76
      SuspendableWorker poller =
1✔
77
          new Poller<>(
78
              options.getIdentity(),
1✔
79
              laPollTask,
80
              new PollTaskExecutor<>(domain, taskList, options, new TaskHandlerImpl(handler)),
81
              options.getPollerOptions(),
1✔
82
              options.getMetricsScope());
1✔
83
      poller.start();
1✔
84
      setPoller(poller);
1✔
85
      options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1);
1✔
86
    }
87
  }
1✔
88

89
  public static class Task {
90
    private final ExecuteLocalActivityParameters params;
91
    private final Consumer<HistoryEvent> eventConsumer;
92
    private final LongSupplier currentTimeMillis;
93
    private final LongSupplier replayTimeUpdatedAtMillis;
94
    long taskStartTime;
95
    private final int decisionTimeoutSeconds;
96

97
    public Task(
98
        ExecuteLocalActivityParameters params,
99
        Consumer<HistoryEvent> eventConsumer,
100
        int decisionTimeoutSeconds,
101
        LongSupplier currentTimeMillis,
102
        LongSupplier replayTimeUpdatedAtMillis) {
1✔
103
      this.params = params;
1✔
104
      this.eventConsumer = eventConsumer;
1✔
105
      this.currentTimeMillis = currentTimeMillis;
1✔
106
      this.replayTimeUpdatedAtMillis = replayTimeUpdatedAtMillis;
1✔
107
      this.decisionTimeoutSeconds = decisionTimeoutSeconds;
1✔
108
    }
1✔
109

110
    public ExecuteLocalActivityParameters getExecuteLocalActivityParameters() {
111
      return params;
1✔
112
    }
113
  }
114

115
  public BiFunction<Task, Duration, Boolean> getLocalActivityTaskPoller() {
116
    return laPollTask;
1✔
117
  }
118

119
  private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<Task> {
120

121
    final ActivityTaskHandler handler;
122

123
    private TaskHandlerImpl(ActivityTaskHandler handler) {
1✔
124
      this.handler = handler;
1✔
125
    }
1✔
126

127
    @Override
128
    public void handle(Task task) throws Exception {
129
      propagateContext(task.params);
1✔
130

131
      // start and activate span for local activities
132
      Span span = spanFactory.activateSpanForExecuteLocalActivity(task);
1✔
133

134
      task.taskStartTime = System.currentTimeMillis();
1✔
135
      ActivityTaskHandler.Result result = handleLocalActivity(task);
1✔
136

137
      LocalActivityMarkerData.Builder markerBuilder = new LocalActivityMarkerData.Builder();
1✔
138
      markerBuilder.setActivityId(task.params.getActivityId());
1✔
139
      markerBuilder.setActivityType(task.params.getActivityType());
1✔
140
      long replayTimeMillis =
1✔
141
          task.currentTimeMillis.getAsLong()
1✔
142
              + (System.currentTimeMillis() - task.replayTimeUpdatedAtMillis.getAsLong());
1✔
143
      markerBuilder.setReplayTimeMillis(replayTimeMillis);
1✔
144

145
      if (result.getTaskCompleted() != null) {
1✔
146
        markerBuilder.setResult(result.getTaskCompleted().getResult());
1✔
147
      } else if (result.getTaskFailedResult() != null) {
1✔
148
        markerBuilder.setTaskFailedRequest(result.getTaskFailedResult().getTaskFailedRequest());
1✔
149
        markerBuilder.setAttempt(result.getAttempt());
1✔
150
        markerBuilder.setBackoff(result.getBackoff());
1✔
151
      } else {
152
        markerBuilder.setTaskCancelledRequest(result.getTaskCancelled());
×
153
      }
154

155
      LocalActivityMarkerData marker = markerBuilder.build();
1✔
156

157
      HistoryEvent event = new HistoryEvent();
1✔
158
      event.setEventType(EventType.MarkerRecorded);
1✔
159
      MarkerRecordedEventAttributes attributes =
1✔
160
          new MarkerRecordedEventAttributes()
161
              .setMarkerName(ClockDecisionContext.LOCAL_ACTIVITY_MARKER_NAME)
1✔
162
              .setHeader(marker.getHeader(options.getDataConverter()))
1✔
163
              .setDetails(marker.getResult());
1✔
164
      event.setMarkerRecordedEventAttributes(attributes);
1✔
165
      task.eventConsumer.accept(event);
1✔
166

167
      span.finish();
1✔
168
    }
1✔
169

170
    @Override
171
    public Throwable wrapFailure(Task task, Throwable failure) {
172
      return new RuntimeException("Failure processing local activity task.", failure);
×
173
    }
174

175
    private ActivityTaskHandler.Result handleLocalActivity(Task task) throws InterruptedException {
176
      Map<String, String> activityTypeTag =
1✔
177
          new ImmutableMap.Builder<String, String>(1)
178
              .put(MetricsTag.ACTIVITY_TYPE, task.params.getActivityType().getName())
1✔
179
              .build();
1✔
180

181
      Scope metricsScope = options.getMetricsScope().tagged(activityTypeTag);
1✔
182
      metricsScope.counter(MetricsType.LOCAL_ACTIVITY_TOTAL_COUNTER).inc(1);
1✔
183

184
      PollForActivityTaskResponse pollTask = new PollForActivityTaskResponse();
1✔
185
      pollTask.setWorkflowDomain(task.params.getWorkflowDomain());
1✔
186
      pollTask.setActivityId(task.params.getActivityId());
1✔
187
      pollTask.setWorkflowExecution(task.params.getWorkflowExecution());
1✔
188
      pollTask.setScheduledTimestamp(System.currentTimeMillis());
1✔
189
      pollTask.setStartedTimestamp(System.currentTimeMillis());
1✔
190
      pollTask.setActivityType(task.params.getActivityType());
1✔
191
      pollTask.setInput(task.params.getInput());
1✔
192
      pollTask.setAttempt(task.params.getAttempt());
1✔
193

194
      Stopwatch sw = metricsScope.timer(MetricsType.LOCAL_ACTIVITY_EXECUTION_LATENCY).start();
1✔
195
      ActivityTaskHandler.Result result = handler.handle(pollTask, metricsScope, true);
1✔
196
      sw.stop();
1✔
197
      result.setAttempt(task.params.getAttempt());
1✔
198

199
      if (result.getTaskCompleted() != null
1✔
200
          || result.getTaskCancelled() != null
1✔
201
          || task.params.getRetryOptions() == null) {
1✔
202
        return result;
1✔
203
      }
204

205
      RetryOptions retryOptions = task.params.getRetryOptions();
1✔
206
      long sleepMillis = retryOptions.calculateSleepTime(task.params.getAttempt());
1✔
207
      long elapsedTask = System.currentTimeMillis() - task.taskStartTime;
1✔
208
      long elapsedTotal = elapsedTask + task.params.getElapsedTime();
1✔
209
      if (retryOptions.shouldRethrow(
1✔
210
          result.getTaskFailedResult().getFailure(),
1✔
211
          task.params.getAttempt(),
1✔
212
          elapsedTotal,
213
          sleepMillis)) {
214
        return result;
1✔
215
      } else {
216
        result.setBackoff(Duration.ofMillis(sleepMillis));
1✔
217
      }
218

219
      // For small backoff we do local retry. Otherwise we will schedule timer on server side.
220
      if (elapsedTask + sleepMillis < task.decisionTimeoutSeconds * 1000) {
1✔
221
        Thread.sleep(sleepMillis);
1✔
222
        task.params.setAttempt(task.params.getAttempt() + 1);
1✔
223
        return handleLocalActivity(task);
1✔
224
      } else {
225
        return result;
1✔
226
      }
227
    }
228
  }
229

230
  private void propagateContext(ExecuteLocalActivityParameters params) {
231
    if (options.getContextPropagators() == null || options.getContextPropagators().isEmpty()) {
1✔
232
      return;
1✔
233
    }
234

235
    Optional.ofNullable(params.getContext())
1✔
236
        .filter(context -> !context.isEmpty())
1✔
237
        .ifPresent(this::restoreContext);
1✔
238
  }
1✔
239

240
  private void restoreContext(Map<String, byte[]> context) {
241
    options
1✔
242
        .getContextPropagators()
1✔
243
        .forEach(
1✔
244
            propagator -> propagator.setCurrentContext(propagator.deserializeContext(context)));
1✔
245
  }
1✔
246
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc