• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2326

15 May 2024 06:02PM CUT coverage: 61.521% (+0.08%) from 61.437%
2326

Pull #896

buildkite

shijiesheng
fix test
Pull Request #896: Fix memory leak caused by incorrect context deactivation

67 of 69 new or added lines in 6 files covered. (97.1%)

9 existing lines in 4 files now uncovered.

11975 of 19465 relevant lines covered (61.52%)

0.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.25
/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.worker;
19

20
import com.uber.cadence.EventType;
21
import com.uber.cadence.HistoryEvent;
22
import com.uber.cadence.MarkerRecordedEventAttributes;
23
import com.uber.cadence.PollForActivityTaskResponse;
24
import com.uber.cadence.common.RetryOptions;
25
import com.uber.cadence.internal.common.LocalActivityMarkerData;
26
import com.uber.cadence.internal.metrics.MetricsTag;
27
import com.uber.cadence.internal.metrics.MetricsType;
28
import com.uber.cadence.internal.replay.ClockDecisionContext;
29
import com.uber.cadence.internal.replay.ExecuteLocalActivityParameters;
30
import com.uber.cadence.internal.tracing.TracingPropagator;
31
import com.uber.m3.tally.Scope;
32
import com.uber.m3.tally.Stopwatch;
33
import com.uber.m3.util.ImmutableMap;
34
import io.opentracing.Span;
35
import io.opentracing.Tracer;
36
import java.time.Duration;
37
import java.util.Map;
38
import java.util.Objects;
39
import java.util.Optional;
40
import java.util.function.BiFunction;
41
import java.util.function.Consumer;
42
import java.util.function.LongSupplier;
43

44
public final class LocalActivityWorker extends SuspendableWorkerBase {
45

46
  private static final String POLL_THREAD_NAME_PREFIX = "Local Activity Poller taskList=";
47

48
  private final ActivityTaskHandler handler;
49
  private final String domain;
50
  private final String taskList;
51
  private final SingleWorkerOptions options;
52
  private final LocalActivityPollTask laPollTask;
53
  private final TracingPropagator spanFactory;
54
  private final Tracer tracer;
55

56
  public LocalActivityWorker(
57
      String domain, String taskList, SingleWorkerOptions options, ActivityTaskHandler handler) {
1✔
58
    this.domain = Objects.requireNonNull(domain);
1✔
59
    this.taskList = Objects.requireNonNull(taskList);
1✔
60
    this.handler = handler;
1✔
61
    this.laPollTask = new LocalActivityPollTask();
1✔
62
    this.spanFactory = new TracingPropagator(options.getTracer());
1✔
63
    this.tracer = options.getTracer();
1✔
64

65
    PollerOptions pollerOptions = options.getPollerOptions();
1✔
66
    if (pollerOptions.getPollThreadNamePrefix() == null) {
1✔
67
      pollerOptions =
1✔
68
          PollerOptions.newBuilder(pollerOptions)
1✔
69
              .setPollThreadNamePrefix(
1✔
70
                  POLL_THREAD_NAME_PREFIX + "\"" + taskList + "\", domain=\"" + domain + "\"")
71
              .build();
1✔
72
    }
73
    this.options = SingleWorkerOptions.newBuilder(options).setPollerOptions(pollerOptions).build();
1✔
74
  }
1✔
75

76
  @Override
77
  public void start() {
78
    if (handler.isAnyTypeSupported()) {
1✔
79
      SuspendableWorker poller =
1✔
80
          new Poller<>(
81
              options.getIdentity(),
1✔
82
              laPollTask,
83
              new PollTaskExecutor<>(domain, taskList, options, new TaskHandlerImpl(handler)),
84
              options.getPollerOptions(),
1✔
85
              options.getMetricsScope());
1✔
86
      poller.start();
1✔
87
      setPoller(poller);
1✔
88
      options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1);
1✔
89
    }
90
  }
1✔
91

92
  public static class Task {
93
    private final ExecuteLocalActivityParameters params;
94
    private final Consumer<HistoryEvent> eventConsumer;
95
    private final LongSupplier currentTimeMillis;
96
    private final LongSupplier replayTimeUpdatedAtMillis;
97
    long taskStartTime;
98
    private final int decisionTimeoutSeconds;
99

100
    public Task(
101
        ExecuteLocalActivityParameters params,
102
        Consumer<HistoryEvent> eventConsumer,
103
        int decisionTimeoutSeconds,
104
        LongSupplier currentTimeMillis,
105
        LongSupplier replayTimeUpdatedAtMillis) {
1✔
106
      this.params = params;
1✔
107
      this.eventConsumer = eventConsumer;
1✔
108
      this.currentTimeMillis = currentTimeMillis;
1✔
109
      this.replayTimeUpdatedAtMillis = replayTimeUpdatedAtMillis;
1✔
110
      this.decisionTimeoutSeconds = decisionTimeoutSeconds;
1✔
111
    }
1✔
112

113
    public ExecuteLocalActivityParameters getExecuteLocalActivityParameters() {
114
      return params;
1✔
115
    }
116
  }
117

118
  public BiFunction<Task, Duration, Boolean> getLocalActivityTaskPoller() {
119
    return laPollTask;
1✔
120
  }
121

122
  private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<Task> {
123

124
    final ActivityTaskHandler handler;
125

126
    /**
     * Creates the poll-task adapter.
     *
     * @param handler handler that every local activity attempt is delegated to
     */
    private TaskHandlerImpl(ActivityTaskHandler handler) {
      this.handler = handler;
    }
1✔
129

130
    @Override
131
    public void handle(Task task) throws Exception {
132
      propagateContext(task.params);
1✔
133

134
      // start and activate span for local activities
135
      Span span = spanFactory.spanForExecuteLocalActivity(task);
1✔
136
      try (io.opentracing.Scope scope = tracer.activateSpan(span)) {
1✔
137
        task.taskStartTime = System.currentTimeMillis();
1✔
138
        ActivityTaskHandler.Result result = handleLocalActivity(task);
1✔
139

140
        LocalActivityMarkerData.Builder markerBuilder = new LocalActivityMarkerData.Builder();
1✔
141
        markerBuilder.setActivityId(task.params.getActivityId());
1✔
142
        markerBuilder.setActivityType(task.params.getActivityType());
1✔
143
        long replayTimeMillis =
1✔
144
            task.currentTimeMillis.getAsLong()
1✔
145
                + (System.currentTimeMillis() - task.replayTimeUpdatedAtMillis.getAsLong());
1✔
146
        markerBuilder.setReplayTimeMillis(replayTimeMillis);
1✔
147

148
        if (result.getTaskCompleted() != null) {
1✔
149
          markerBuilder.setResult(result.getTaskCompleted().getResult());
1✔
150
        } else if (result.getTaskFailedResult() != null) {
1✔
151
          markerBuilder.setTaskFailedRequest(result.getTaskFailedResult().getTaskFailedRequest());
1✔
152
          markerBuilder.setAttempt(result.getAttempt());
1✔
153
          markerBuilder.setBackoff(result.getBackoff());
1✔
154
        } else {
NEW
155
          markerBuilder.setTaskCancelledRequest(result.getTaskCancelled());
×
156
        }
157

158
        LocalActivityMarkerData marker = markerBuilder.build();
1✔
159

160
        HistoryEvent event = new HistoryEvent();
1✔
161
        event.setEventType(EventType.MarkerRecorded);
1✔
162
        MarkerRecordedEventAttributes attributes =
1✔
163
            new MarkerRecordedEventAttributes()
164
                .setMarkerName(ClockDecisionContext.LOCAL_ACTIVITY_MARKER_NAME)
1✔
165
                .setHeader(marker.getHeader(options.getDataConverter()))
1✔
166
                .setDetails(marker.getResult());
1✔
167
        event.setMarkerRecordedEventAttributes(attributes);
1✔
168
        task.eventConsumer.accept(event);
1✔
169
      } finally {
170
        span.finish();
1✔
171
      }
172
    }
1✔
173

174
    @Override
175
    public Throwable wrapFailure(Task task, Throwable failure) {
176
      return new RuntimeException("Failure processing local activity task.", failure);
×
177
    }
178

179
    private ActivityTaskHandler.Result handleLocalActivity(Task task) throws InterruptedException {
180
      Map<String, String> activityTypeTag =
1✔
181
          new ImmutableMap.Builder<String, String>(1)
182
              .put(MetricsTag.ACTIVITY_TYPE, task.params.getActivityType().getName())
1✔
183
              .build();
1✔
184

185
      Scope metricsScope = options.getMetricsScope().tagged(activityTypeTag);
1✔
186
      metricsScope.counter(MetricsType.LOCAL_ACTIVITY_TOTAL_COUNTER).inc(1);
1✔
187

188
      PollForActivityTaskResponse pollTask = new PollForActivityTaskResponse();
1✔
189
      pollTask.setWorkflowDomain(task.params.getWorkflowDomain());
1✔
190
      pollTask.setActivityId(task.params.getActivityId());
1✔
191
      pollTask.setWorkflowExecution(task.params.getWorkflowExecution());
1✔
192
      pollTask.setScheduledTimestamp(System.currentTimeMillis());
1✔
193
      pollTask.setStartedTimestamp(System.currentTimeMillis());
1✔
194
      pollTask.setActivityType(task.params.getActivityType());
1✔
195
      pollTask.setInput(task.params.getInput());
1✔
196
      pollTask.setAttempt(task.params.getAttempt());
1✔
197

198
      Stopwatch sw = metricsScope.timer(MetricsType.LOCAL_ACTIVITY_EXECUTION_LATENCY).start();
1✔
199
      ActivityTaskHandler.Result result = handler.handle(pollTask, metricsScope, true);
1✔
200
      sw.stop();
1✔
201
      result.setAttempt(task.params.getAttempt());
1✔
202

203
      if (result.getTaskCompleted() != null
1✔
204
          || result.getTaskCancelled() != null
1✔
205
          || task.params.getRetryOptions() == null) {
1✔
206
        return result;
1✔
207
      }
208

209
      RetryOptions retryOptions = task.params.getRetryOptions();
1✔
210
      long sleepMillis = retryOptions.calculateSleepTime(task.params.getAttempt());
1✔
211
      long elapsedTask = System.currentTimeMillis() - task.taskStartTime;
1✔
212
      long elapsedTotal = elapsedTask + task.params.getElapsedTime();
1✔
213
      if (retryOptions.shouldRethrow(
1✔
214
          result.getTaskFailedResult().getFailure(),
1✔
215
          task.params.getAttempt(),
1✔
216
          elapsedTotal,
217
          sleepMillis)) {
218
        return result;
1✔
219
      } else {
220
        result.setBackoff(Duration.ofMillis(sleepMillis));
1✔
221
      }
222

223
      // For small backoff we do local retry. Otherwise we will schedule timer on server side.
224
      if (elapsedTask + sleepMillis < task.decisionTimeoutSeconds * 1000) {
1✔
225
        Thread.sleep(sleepMillis);
1✔
226
        task.params.setAttempt(task.params.getAttempt() + 1);
1✔
227
        return handleLocalActivity(task);
1✔
228
      } else {
229
        return result;
1✔
230
      }
231
    }
232
  }
233

234
  private void propagateContext(ExecuteLocalActivityParameters params) {
235
    if (options.getContextPropagators() == null || options.getContextPropagators().isEmpty()) {
1✔
236
      return;
1✔
237
    }
238

239
    Optional.ofNullable(params.getContext())
1✔
240
        .filter(context -> !context.isEmpty())
1✔
241
        .ifPresent(this::restoreContext);
1✔
242
  }
1✔
243

244
  private void restoreContext(Map<String, byte[]> context) {
245
    options
1✔
246
        .getContextPropagators()
1✔
247
        .forEach(
1✔
248
            propagator -> propagator.setCurrentContext(propagator.deserializeContext(context)));
1✔
249
  }
1✔
250
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc