• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2541

23 Oct 2024 09:52PM CUT coverage: 66.108% (+0.8%) from 65.318%
2541

Pull #924

buildkite

natemort
Add missing proto to thrift enum mappings and unit tests for both EnumMappers

Add a suite of tests that will automatically fail if new enum types are added in IDL but the mappers aren't updated. This was done in a few ways:
- For enums that have mappers both directions, we roundtrip all thrift values and assert that they're equal.
- For enums that are identical across thrift/proto, we generate a mapping based on the value names and assert that the mapper matches it.
- For enums that aren't identical, we explicitly assert that our mapping contains all values.
Pull Request #924: Add missing proto to thrift enum mappings and unit tests for EnumMapper

2 of 2 new or added lines in 1 file covered. (100.0%)

8 existing lines in 4 files now uncovered.

12893 of 19503 relevant lines covered (66.11%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.13
/src/main/java/com/uber/cadence/internal/worker/Poller.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.worker;
19

20
import com.uber.cadence.internal.common.BackoffThrottler;
21
import com.uber.cadence.internal.common.InternalUtils;
22
import com.uber.cadence.internal.metrics.MetricsType;
23
import com.uber.cadence.internal.worker.autoscaler.AutoScaler;
24
import com.uber.cadence.internal.worker.autoscaler.AutoScalerFactory;
25
import com.uber.m3.tally.Scope;
26
import java.util.Objects;
27
import java.util.concurrent.ArrayBlockingQueue;
28
import java.util.concurrent.CountDownLatch;
29
import java.util.concurrent.ThreadPoolExecutor;
30
import java.util.concurrent.TimeUnit;
31
import java.util.concurrent.atomic.AtomicReference;
32
import org.apache.thrift.TException;
33
import org.apache.thrift.transport.TTransportException;
34
import org.slf4j.Logger;
35
import org.slf4j.LoggerFactory;
36

37
public final class Poller<T> implements SuspendableWorker {
38

39
  public interface PollTask<TT> {
40
    TT poll() throws TException;
41
  }
42

43
  interface ThrowingRunnable {
44
    void run() throws Throwable;
45
  }
46

47
  private final String identity;
48
  private final ShutdownableTaskExecutor<T> taskExecutor;
49
  private final PollTask<T> pollTask;
50
  private final PollerOptions pollerOptions;
51
  private static final Logger log = LoggerFactory.getLogger(Poller.class);
1✔
52
  private ThreadPoolExecutor pollExecutor;
53
  private final Scope metricsScope;
54

55
  private final AtomicReference<CountDownLatch> suspendLatch = new AtomicReference<>();
1✔
56

57
  private BackoffThrottler pollBackoffThrottler;
58
  private Throttler pollRateThrottler;
59

60
  private Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
1✔
61
      (t, e) -> {
62
        if (e instanceof TTransportException) {
1✔
63
          TTransportException te = (TTransportException) e;
×
64
          if (te.getType() == TTransportException.TIMED_OUT) {
×
65
            log.warn("Failure in thread " + t.getName(), e);
×
66
            return;
×
67
          }
68
        }
69

70
        log.error("Failure in thread " + t.getName(), e);
1✔
71
      };
1✔
72

73
  private final AutoScaler pollerAutoScaler;
74

75
  public Poller(
76
      String identity,
77
      PollTask<T> pollTask,
78
      ShutdownableTaskExecutor<T> taskExecutor,
79
      PollerOptions pollerOptions,
80
      Scope metricsScope) {
1✔
81
    Objects.requireNonNull(identity, "identity cannot be null");
1✔
82
    Objects.requireNonNull(pollTask, "poll service should not be null");
1✔
83
    Objects.requireNonNull(taskExecutor, "taskExecutor should not be null");
1✔
84
    Objects.requireNonNull(pollerOptions, "pollerOptions should not be null");
1✔
85
    Objects.requireNonNull(metricsScope, "metricsScope should not be null");
1✔
86

87
    this.identity = identity;
1✔
88
    this.pollTask = pollTask;
1✔
89
    this.taskExecutor = taskExecutor;
1✔
90
    this.pollerOptions = pollerOptions;
1✔
91
    this.metricsScope = metricsScope;
1✔
92
    this.pollerAutoScaler = AutoScalerFactory.getInstance().createAutoScaler(pollerOptions);
1✔
93
  }
1✔
94

95
  @Override
96
  public void start() {
97
    if (log.isDebugEnabled()) {
1✔
98
      log.debug("start(): " + toString());
×
99
    }
100
    if (pollerOptions.getMaximumPollRatePerSecond() > 0.0) {
1✔
101
      pollRateThrottler =
×
102
          new Throttler(
103
              "poller",
104
              pollerOptions.getMaximumPollRatePerSecond(),
×
105
              pollerOptions.getMaximumPollRateIntervalMilliseconds());
×
106
    }
107

108
    // It is important to pass blocking queue of at least options.getPollThreadCount() capacity.
109
    // As task enqueues next task the buffering is needed to queue task until the previous one
110
    // releases a thread.
111
    pollExecutor =
1✔
112
        new ThreadPoolExecutor(
113
            pollerOptions.getPollThreadCount(),
1✔
114
            pollerOptions.getPollThreadCount(),
1✔
115
            1,
116
            TimeUnit.SECONDS,
117
            new ArrayBlockingQueue<>(pollerOptions.getPollThreadCount()));
1✔
118
    pollExecutor.setThreadFactory(
1✔
119
        new ExecutorThreadFactory(
120
            pollerOptions.getPollThreadNamePrefix(), pollerOptions.getUncaughtExceptionHandler()));
1✔
121

122
    pollBackoffThrottler =
1✔
123
        new BackoffThrottler(
124
            pollerOptions.getPollBackoffInitialInterval(),
1✔
125
            pollerOptions.getPollBackoffMaximumInterval(),
1✔
126
            pollerOptions.getPollBackoffCoefficient());
1✔
127
    for (int i = 0; i < pollerOptions.getPollThreadCount(); i++) {
1✔
128
      pollExecutor.execute(new PollLoopTask(new PollExecutionTask()));
1✔
129
      metricsScope.counter(MetricsType.POLLER_START_COUNTER).inc(1);
1✔
130
    }
131

132
    pollerAutoScaler.start();
1✔
133
  }
1✔
134

135
  @Override
136
  public boolean isStarted() {
137
    return pollExecutor != null;
1✔
138
  }
139

140
  @Override
141
  public boolean isShutdown() {
142
    return pollExecutor.isShutdown() && taskExecutor.isShutdown();
×
143
  }
144

145
  @Override
146
  public boolean isTerminated() {
147
    return pollExecutor.isTerminated() && taskExecutor.isTerminated();
×
148
  }
149

150
  @Override
151
  public void shutdown() {
152
    log.debug("shutdown");
1✔
153
    if (!isStarted()) {
1✔
154
      return;
×
155
    }
156
    // shutdownNow and then await to stop long polling and ensure that no new tasks
157
    // are dispatched to the taskExecutor.
158
    pollExecutor.shutdownNow();
1✔
159
    try {
160
      pollExecutor.awaitTermination(1, TimeUnit.SECONDS);
1✔
161
    } catch (InterruptedException e) {
×
162
    }
1✔
163
    taskExecutor.shutdown();
1✔
164
    pollerAutoScaler.stop();
1✔
165
  }
1✔
166

167
  @Override
168
  public void shutdownNow() {
169
    if (log.isDebugEnabled()) {
1✔
170
      log.debug("shutdownNow poller=" + this.pollerOptions.getPollThreadNamePrefix());
×
171
    }
172
    if (!isStarted()) {
1✔
173
      return;
1✔
174
    }
175
    pollExecutor.shutdownNow();
1✔
176
    taskExecutor.shutdownNow();
1✔
177
  }
1✔
178

179
  @Override
180
  public void awaitTermination(long timeout, TimeUnit unit) {
181
    if (!isStarted()) {
1✔
182
      return;
1✔
183
    }
184
    long timeoutMillis = unit.toMillis(timeout);
1✔
185
    timeoutMillis = InternalUtils.awaitTermination(pollExecutor, timeoutMillis);
1✔
186
    InternalUtils.awaitTermination(taskExecutor, timeoutMillis);
1✔
187
  }
1✔
188

189
  @Override
190
  public void suspendPolling() {
191
    log.info("suspendPolling");
1✔
192
    suspendLatch.set(new CountDownLatch(1));
1✔
193
  }
1✔
194

195
  @Override
196
  public void resumePolling() {
197
    log.info("resumePolling");
1✔
198
    CountDownLatch existing = suspendLatch.getAndSet(null);
1✔
199
    if (existing != null) {
1✔
200
      existing.countDown();
1✔
201
    }
202
  }
1✔
203

204
  @Override
205
  public boolean isSuspended() {
206
    return suspendLatch.get() != null;
×
207
  }
208

209
  @Override
210
  public String toString() {
211
    return "Poller{" + "options=" + pollerOptions + ", identity=" + identity + '}';
×
212
  }
213

214
  private class PollLoopTask implements Runnable {
215

216
    private final Poller.ThrowingRunnable task;
217

218
    PollLoopTask(Poller.ThrowingRunnable task) {
1✔
219
      this.task = task;
1✔
220
    }
1✔
221

222
    @Override
223
    public void run() {
224
      try {
225
        if (pollExecutor.isTerminating()) {
1✔
UNCOV
226
          return;
×
227
        }
228
        pollBackoffThrottler.throttle();
1✔
229
        if (pollExecutor.isTerminating()) {
1✔
230
          return;
×
231
        }
232
        if (pollRateThrottler != null) {
1✔
233
          pollRateThrottler.throttle();
×
234
        }
235

236
        CountDownLatch suspender = Poller.this.suspendLatch.get();
1✔
237
        if (suspender != null) {
1✔
238
          if (log.isDebugEnabled()) {
1✔
239
            log.debug("poll task suspending latchCount=" + suspender.getCount());
×
240
          }
241
          suspender.await();
1✔
242
        }
243

244
        if (pollExecutor.isTerminating()) {
1✔
245
          return;
×
246
        }
247
        task.run();
1✔
248
        pollBackoffThrottler.success();
1✔
249
      } catch (Throwable e) {
1✔
250
        pollBackoffThrottler.failure();
1✔
251
        if (!(e.getCause() instanceof InterruptedException)) {
1✔
252
          uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
1✔
253
        }
254
      } finally {
255
        // Resubmit itself back to pollExecutor
256
        if (!pollExecutor.isTerminating()) {
1✔
257
          pollExecutor.execute(this);
1✔
258
        } else {
259
          log.debug("poll loop done");
1✔
260
        }
261
      }
262
    }
1✔
263
  }
264

265
  private class PollExecutionTask implements Poller.ThrowingRunnable {
266
    private static final int EXECUTOR_CAPACITY_CHECK_INTERVAL_MS = 100;
267
    private static final int EXECUTOR_CAPACITY_CHECK_OFFSET_MS = 10;
268

269
    PollExecutionTask() {}
1✔
270

271
    @Override
272
    public void run() throws Exception {
273
      try {
274
        pollerAutoScaler.acquire();
1✔
275
        try {
276
          T task = pollTask.poll();
1✔
277
          if (task == null) {
1✔
278
            pollerAutoScaler.increaseNoopPollCount();
1✔
279
            return;
1✔
280
          }
281

282
          pollerAutoScaler.increaseActionablePollCount();
1✔
283
          taskExecutor.process(task);
1✔
284
        } finally {
285
          checkIfTaskHasExecutorHasCapacity();
1✔
286
        }
287
      } finally {
288
        pollerAutoScaler.release();
1✔
289
      }
290
    }
1✔
291

292
    private void checkIfTaskHasExecutorHasCapacity() {
293
      if (pollerOptions.getPollOnlyIfExecutorHasCapacity()) {
1✔
294
        while (true) {
295
          // sleep to avoid racing condition
296
          try {
297
            Thread.sleep(EXECUTOR_CAPACITY_CHECK_OFFSET_MS);
×
298
          } catch (InterruptedException ignored) {
×
299
          }
×
300
          if (taskExecutor.hasCapacity()) {
×
301
            break;
×
302
          } else {
303
            // sleep to avoid busy loop
304
            try {
305
              Thread.sleep(EXECUTOR_CAPACITY_CHECK_INTERVAL_MS);
×
306
            } catch (InterruptedException ignored) {
×
307
            }
×
308
          }
309
        }
310
      }
311
    }
1✔
312
  }
313
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc