• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2304

03 May 2024 05:14PM UTC coverage: 61.419% (-0.06%) from 61.481%
2304

Pull #890

buildkite

natemort
Lock TestWorkflowStoreImpl when listing workflows

This is the only path to the backing histories map that isn't locked. When running a test containing multiple child workflows in parallel where one child workflow attempts to call listWorkflows I'm able to consistently reproduce a concurrent modification exception.

Despite being a change to locking, this change is rather low risk. The lock is reentrant and there are no other locks acquired while holding this lock, so there's no chance of deadlock. In addition, this codepath isn't used internally at all. It exists only to implement ListOpenWorkflowExecutions and ListClosedWorkflowExecutions on the client, allowing test code to make these RPCs. This is likely why it hasn't been caught before.
Pull Request #890: Lock TestWorkflowStoreImpl when listing workflows

43 of 46 new or added lines in 1 file covered. (93.48%)

15 existing lines in 7 files now uncovered.

11951 of 19458 relevant lines covered (61.42%)

0.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.13
/src/main/java/com/uber/cadence/internal/worker/Poller.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.worker;
19

20
import com.uber.cadence.internal.common.BackoffThrottler;
21
import com.uber.cadence.internal.common.InternalUtils;
22
import com.uber.cadence.internal.metrics.MetricsType;
23
import com.uber.cadence.internal.worker.autoscaler.AutoScaler;
24
import com.uber.cadence.internal.worker.autoscaler.AutoScalerFactory;
25
import com.uber.m3.tally.Scope;
26
import java.util.Objects;
27
import java.util.concurrent.ArrayBlockingQueue;
28
import java.util.concurrent.CountDownLatch;
29
import java.util.concurrent.ThreadPoolExecutor;
30
import java.util.concurrent.TimeUnit;
31
import java.util.concurrent.atomic.AtomicReference;
32
import org.apache.thrift.TException;
33
import org.apache.thrift.transport.TTransportException;
34
import org.slf4j.Logger;
35
import org.slf4j.LoggerFactory;
36

37
public final class Poller<T> implements SuspendableWorker {
38

39
  public interface PollTask<TT> {
40
    TT poll() throws TException;
41
  }
42

43
  /** A {@link Runnable}-like unit of work whose body is allowed to throw anything. */
  @FunctionalInterface
  interface ThrowingRunnable {
    /**
     * Executes the unit of work.
     *
     * @throws Throwable any failure raised by the implementation
     */
    void run() throws Throwable;
  }
46

47
  private final String identity;
48
  private final ShutdownableTaskExecutor<T> taskExecutor;
49
  private final PollTask<T> pollTask;
50
  private final PollerOptions pollerOptions;
51
  private static final Logger log = LoggerFactory.getLogger(Poller.class);
1✔
52
  private ThreadPoolExecutor pollExecutor;
53
  private final Scope metricsScope;
54

55
  private final AtomicReference<CountDownLatch> suspendLatch = new AtomicReference<>();
1✔
56

57
  private BackoffThrottler pollBackoffThrottler;
58
  private Throttler pollRateThrottler;
59

60
  private Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
1✔
61
      (t, e) -> {
62
        if (e instanceof TTransportException) {
1✔
63
          TTransportException te = (TTransportException) e;
×
64
          if (te.getType() == TTransportException.TIMED_OUT) {
×
65
            log.warn("Failure in thread " + t.getName(), e);
×
66
            return;
×
67
          }
68
        }
69

70
        log.error("Failure in thread " + t.getName(), e);
1✔
71
      };
1✔
72

73
  private final AutoScaler pollerAutoScaler;
74

75
  /**
   * Creates a poller. Nothing is started here; thread pools and throttlers are built by {@link
   * #start()}.
   *
   * @param identity identity string, included in {@link #toString()}
   * @param pollTask produces the items to process
   * @param taskExecutor receives every non-null item returned by {@code pollTask}
   * @param pollerOptions configuration read by {@link #start()} and by the poll loop
   * @param metricsScope scope on which the poller start counter is incremented
   * @throws NullPointerException if any argument is {@code null}
   */
  public Poller(
      String identity,
      PollTask<T> pollTask,
      ShutdownableTaskExecutor<T> taskExecutor,
      PollerOptions pollerOptions,
      Scope metricsScope) {
    // requireNonNull returns its argument, so each check doubles as the assignment. The checks
    // run in the same order, with the same messages, as separate validation would.
    this.identity = Objects.requireNonNull(identity, "identity cannot be null");
    this.pollTask = Objects.requireNonNull(pollTask, "poll service should not be null");
    this.taskExecutor = Objects.requireNonNull(taskExecutor, "taskExecutor should not be null");
    this.pollerOptions = Objects.requireNonNull(pollerOptions, "pollerOptions should not be null");
    this.metricsScope = Objects.requireNonNull(metricsScope, "metricsScope should not be null");
    this.pollerAutoScaler = AutoScalerFactory.getInstance().createAutoScaler(pollerOptions);
  }
1✔
94

95
  @Override
96
  public void start() {
97
    if (log.isDebugEnabled()) {
1✔
98
      log.debug("start(): " + toString());
×
99
    }
100
    if (pollerOptions.getMaximumPollRatePerSecond() > 0.0) {
1✔
101
      pollRateThrottler =
×
102
          new Throttler(
103
              "poller",
104
              pollerOptions.getMaximumPollRatePerSecond(),
×
105
              pollerOptions.getMaximumPollRateIntervalMilliseconds());
×
106
    }
107

108
    // It is important to pass blocking queue of at least options.getPollThreadCount() capacity.
109
    // As task enqueues next task the buffering is needed to queue task until the previous one
110
    // releases a thread.
111
    pollExecutor =
1✔
112
        new ThreadPoolExecutor(
113
            pollerOptions.getPollThreadCount(),
1✔
114
            pollerOptions.getPollThreadCount(),
1✔
115
            1,
116
            TimeUnit.SECONDS,
117
            new ArrayBlockingQueue<>(pollerOptions.getPollThreadCount()));
1✔
118
    pollExecutor.setThreadFactory(
1✔
119
        new ExecutorThreadFactory(
120
            pollerOptions.getPollThreadNamePrefix(), pollerOptions.getUncaughtExceptionHandler()));
1✔
121

122
    pollBackoffThrottler =
1✔
123
        new BackoffThrottler(
124
            pollerOptions.getPollBackoffInitialInterval(),
1✔
125
            pollerOptions.getPollBackoffMaximumInterval(),
1✔
126
            pollerOptions.getPollBackoffCoefficient());
1✔
127
    for (int i = 0; i < pollerOptions.getPollThreadCount(); i++) {
1✔
128
      pollExecutor.execute(new PollLoopTask(new PollExecutionTask()));
1✔
129
      metricsScope.counter(MetricsType.POLLER_START_COUNTER).inc(1);
1✔
130
    }
131

132
    pollerAutoScaler.start();
1✔
133
  }
1✔
134

135
  @Override
136
  public boolean isStarted() {
137
    return pollExecutor != null;
1✔
138
  }
139

140
  @Override
141
  public boolean isShutdown() {
142
    return pollExecutor.isShutdown() && taskExecutor.isShutdown();
×
143
  }
144

145
  @Override
146
  public boolean isTerminated() {
147
    return pollExecutor.isTerminated() && taskExecutor.isTerminated();
×
148
  }
149

150
  @Override
151
  public void shutdown() {
152
    log.debug("shutdown");
1✔
153
    if (!isStarted()) {
1✔
154
      return;
×
155
    }
156
    // shutdownNow and then await to stop long polling and ensure that no new tasks
157
    // are dispatched to the taskExecutor.
158
    pollExecutor.shutdownNow();
1✔
159
    try {
160
      pollExecutor.awaitTermination(1, TimeUnit.SECONDS);
1✔
161
    } catch (InterruptedException e) {
×
162
    }
1✔
163
    taskExecutor.shutdown();
1✔
164
    pollerAutoScaler.stop();
1✔
165
  }
1✔
166

167
  @Override
168
  public void shutdownNow() {
169
    if (log.isDebugEnabled()) {
1✔
170
      log.debug("shutdownNow poller=" + this.pollerOptions.getPollThreadNamePrefix());
×
171
    }
172
    if (!isStarted()) {
1✔
173
      return;
1✔
174
    }
175
    pollExecutor.shutdownNow();
1✔
176
    taskExecutor.shutdownNow();
1✔
177
  }
1✔
178

179
  @Override
180
  public void awaitTermination(long timeout, TimeUnit unit) {
181
    if (!isStarted()) {
1✔
182
      return;
1✔
183
    }
184
    long timeoutMillis = unit.toMillis(timeout);
1✔
185
    timeoutMillis = InternalUtils.awaitTermination(pollExecutor, timeoutMillis);
1✔
186
    InternalUtils.awaitTermination(taskExecutor, timeoutMillis);
1✔
187
  }
1✔
188

189
  @Override
190
  public void suspendPolling() {
191
    log.info("suspendPolling");
1✔
192
    suspendLatch.set(new CountDownLatch(1));
1✔
193
  }
1✔
194

195
  @Override
196
  public void resumePolling() {
197
    log.info("resumePolling");
1✔
198
    CountDownLatch existing = suspendLatch.getAndSet(null);
1✔
199
    if (existing != null) {
1✔
200
      existing.countDown();
1✔
201
    }
202
  }
1✔
203

204
  @Override
205
  public boolean isSuspended() {
206
    return suspendLatch.get() != null;
×
207
  }
208

209
  @Override
210
  public String toString() {
211
    return "Poller{" + "options=" + pollerOptions + ", identity=" + identity + '}';
×
212
  }
213

214
  private class PollLoopTask implements Runnable {
215

216
    private final Poller.ThrowingRunnable task;
217

218
    PollLoopTask(Poller.ThrowingRunnable task) {
1✔
219
      this.task = task;
1✔
220
    }
1✔
221

222
    @Override
223
    public void run() {
224
      try {
225
        if (pollExecutor.isTerminating()) {
1✔
UNCOV
226
          return;
×
227
        }
228
        pollBackoffThrottler.throttle();
1✔
229
        if (pollExecutor.isTerminating()) {
1✔
230
          return;
×
231
        }
232
        if (pollRateThrottler != null) {
1✔
233
          pollRateThrottler.throttle();
×
234
        }
235

236
        CountDownLatch suspender = Poller.this.suspendLatch.get();
1✔
237
        if (suspender != null) {
1✔
238
          if (log.isDebugEnabled()) {
1✔
239
            log.debug("poll task suspending latchCount=" + suspender.getCount());
×
240
          }
241
          suspender.await();
1✔
242
        }
243

244
        if (pollExecutor.isTerminating()) {
1✔
245
          return;
×
246
        }
247
        task.run();
1✔
248
        pollBackoffThrottler.success();
1✔
249
      } catch (Throwable e) {
1✔
250
        pollBackoffThrottler.failure();
1✔
251
        if (!(e.getCause() instanceof InterruptedException)) {
1✔
252
          uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
1✔
253
        }
254
      } finally {
255
        // Resubmit itself back to pollExecutor
256
        if (!pollExecutor.isTerminating()) {
1✔
257
          pollExecutor.execute(this);
1✔
258
        } else {
259
          log.debug("poll loop done");
1✔
260
        }
261
      }
262
    }
1✔
263
  }
264

265
  private class PollExecutionTask implements Poller.ThrowingRunnable {
266
    private static final int EXECUTOR_CAPACITY_CHECK_INTERVAL_MS = 100;
267
    private static final int EXECUTOR_CAPACITY_CHECK_OFFSET_MS = 10;
268

269
    PollExecutionTask() {}
1✔
270

271
    @Override
272
    public void run() throws Exception {
273
      try {
274
        pollerAutoScaler.acquire();
1✔
275
        try {
276
          T task = pollTask.poll();
1✔
277
          if (task == null) {
1✔
278
            pollerAutoScaler.increaseNoopPollCount();
1✔
279
            return;
1✔
280
          }
281

282
          pollerAutoScaler.increaseActionablePollCount();
1✔
283
          taskExecutor.process(task);
1✔
284
        } finally {
285
          checkIfTaskHasExecutorHasCapacity();
1✔
286
        }
287
      } finally {
288
        pollerAutoScaler.release();
1✔
289
      }
290
    }
1✔
291

292
    private void checkIfTaskHasExecutorHasCapacity() {
293
      if (pollerOptions.getPollOnlyIfExecutorHasCapacity()) {
1✔
294
        while (true) {
295
          // sleep to avoid racing condition
296
          try {
297
            Thread.sleep(EXECUTOR_CAPACITY_CHECK_OFFSET_MS);
×
298
          } catch (InterruptedException ignored) {
×
299
          }
×
300
          if (taskExecutor.hasCapacity()) {
×
301
            break;
×
302
          } else {
303
            // sleep to avoid busy loop
304
            try {
305
              Thread.sleep(EXECUTOR_CAPACITY_CHECK_INTERVAL_MS);
×
306
            } catch (InterruptedException ignored) {
×
307
            }
×
308
          }
309
        }
310
      }
311
    }
1✔
312
  }
313
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc