• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2539

22 Oct 2024 10:48PM UTC coverage: 65.4% (+0.08%) from 65.318%
2539

Pull #923

buildkite

natemort
Refactor Test environment initialization to CadenceTestRule from WorkflowTest.

WorkflowTest is currently 6,000 lines long and has nearly every test related to end to end client behavior. It provides the rather neat behavior that it supports running against both an instance of Cadence running in Docker and against the test version. It's additionally parameterized to run the entire test suite with or without sticky execution enabled.

Due to the complexity of handling both environments, adding yet another test to WorkflowTest has always been the easiest option for developers. To allow tests to be split easily into other files, extract the core functionality into a JUnit test rule that additional tests can reuse.

With the exception of testSignalCrossDomainExternalWorkflow and the replay tests that don't use the test environment, all tests have been left in WorkflowTest to be split out later.
Pull Request #923: Refactor Test Environment and Worker initialization to CadenceTestRule

12755 of 19503 relevant lines covered (65.4%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.91
/src/main/java/com/uber/cadence/internal/worker/Poller.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.worker;
19

20
import com.uber.cadence.internal.common.BackoffThrottler;
21
import com.uber.cadence.internal.common.InternalUtils;
22
import com.uber.cadence.internal.metrics.MetricsType;
23
import com.uber.cadence.internal.worker.autoscaler.AutoScaler;
24
import com.uber.cadence.internal.worker.autoscaler.AutoScalerFactory;
25
import com.uber.m3.tally.Scope;
26
import java.util.Objects;
27
import java.util.concurrent.ArrayBlockingQueue;
28
import java.util.concurrent.CountDownLatch;
29
import java.util.concurrent.ThreadPoolExecutor;
30
import java.util.concurrent.TimeUnit;
31
import java.util.concurrent.atomic.AtomicReference;
32
import org.apache.thrift.TException;
33
import org.apache.thrift.transport.TTransportException;
34
import org.slf4j.Logger;
35
import org.slf4j.LoggerFactory;
36

37
public final class Poller<T> implements SuspendableWorker {
38

39
  public interface PollTask<TT> {
40
    TT poll() throws TException;
41
  }
42

43
  /** A {@link Runnable}-like unit of work whose {@link #run()} may throw any {@link Throwable}. */
  interface ThrowingRunnable {

    /**
     * Executes the unit of work.
     *
     * @throws Throwable any failure raised by the work
     */
    void run() throws Throwable;
  }
46

47
  private final String identity;
48
  private final ShutdownableTaskExecutor<T> taskExecutor;
49
  private final PollTask<T> pollTask;
50
  private final PollerOptions pollerOptions;
51
  private static final Logger log = LoggerFactory.getLogger(Poller.class);
1✔
52
  private ThreadPoolExecutor pollExecutor;
53
  private final Scope metricsScope;
54

55
  private final AtomicReference<CountDownLatch> suspendLatch = new AtomicReference<>();
1✔
56

57
  private BackoffThrottler pollBackoffThrottler;
58
  private Throttler pollRateThrottler;
59

60
  private Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
1✔
61
      (t, e) -> {
62
        if (e instanceof TTransportException) {
1✔
63
          TTransportException te = (TTransportException) e;
×
64
          if (te.getType() == TTransportException.TIMED_OUT) {
×
65
            log.warn("Failure in thread " + t.getName(), e);
×
66
            return;
×
67
          }
68
        }
69

70
        log.error("Failure in thread " + t.getName(), e);
1✔
71
      };
1✔
72

73
  private final AutoScaler pollerAutoScaler;
74

75
  /**
   * Creates a poller. The poll executor is not created here; that happens in {@link #start()}.
   *
   * @param identity identity string, used in {@link #toString()}
   * @param pollTask source of tasks, invoked once per poll iteration
   * @param taskExecutor receives every non-null task returned by {@code pollTask}
   * @param pollerOptions thread count, throttling and backoff settings
   * @param metricsScope scope used to emit poller metrics
   * @throws NullPointerException if any argument is {@code null}
   */
  public Poller(
      String identity,
      PollTask<T> pollTask,
      ShutdownableTaskExecutor<T> taskExecutor,
      PollerOptions pollerOptions,
      Scope metricsScope) {
    // Arguments are validated in declaration order, so the first null one is the one reported.
    this.identity = Objects.requireNonNull(identity, "identity cannot be null");
    this.pollTask = Objects.requireNonNull(pollTask, "poll service should not be null");
    this.taskExecutor = Objects.requireNonNull(taskExecutor, "taskExecutor should not be null");
    this.pollerOptions = Objects.requireNonNull(pollerOptions, "pollerOptions should not be null");
    this.metricsScope = Objects.requireNonNull(metricsScope, "metricsScope should not be null");
    this.pollerAutoScaler = AutoScalerFactory.getInstance().createAutoScaler(this.pollerOptions);
  }
1✔
94

95
  @Override
96
  public void start() {
97
    if (log.isDebugEnabled()) {
1✔
98
      log.debug("start(): " + toString());
×
99
    }
100
    if (pollerOptions.getMaximumPollRatePerSecond() > 0.0) {
1✔
101
      pollRateThrottler =
×
102
          new Throttler(
103
              "poller",
104
              pollerOptions.getMaximumPollRatePerSecond(),
×
105
              pollerOptions.getMaximumPollRateIntervalMilliseconds());
×
106
    }
107

108
    // It is important to pass blocking queue of at least options.getPollThreadCount() capacity.
109
    // As task enqueues next task the buffering is needed to queue task until the previous one
110
    // releases a thread.
111
    pollExecutor =
1✔
112
        new ThreadPoolExecutor(
113
            pollerOptions.getPollThreadCount(),
1✔
114
            pollerOptions.getPollThreadCount(),
1✔
115
            1,
116
            TimeUnit.SECONDS,
117
            new ArrayBlockingQueue<>(pollerOptions.getPollThreadCount()));
1✔
118
    pollExecutor.setThreadFactory(
1✔
119
        new ExecutorThreadFactory(
120
            pollerOptions.getPollThreadNamePrefix(), pollerOptions.getUncaughtExceptionHandler()));
1✔
121

122
    pollBackoffThrottler =
1✔
123
        new BackoffThrottler(
124
            pollerOptions.getPollBackoffInitialInterval(),
1✔
125
            pollerOptions.getPollBackoffMaximumInterval(),
1✔
126
            pollerOptions.getPollBackoffCoefficient());
1✔
127
    for (int i = 0; i < pollerOptions.getPollThreadCount(); i++) {
1✔
128
      pollExecutor.execute(new PollLoopTask(new PollExecutionTask()));
1✔
129
      metricsScope.counter(MetricsType.POLLER_START_COUNTER).inc(1);
1✔
130
    }
131

132
    pollerAutoScaler.start();
1✔
133
  }
1✔
134

135
  @Override
136
  public boolean isStarted() {
137
    return pollExecutor != null;
1✔
138
  }
139

140
  @Override
141
  public boolean isShutdown() {
142
    return pollExecutor.isShutdown() && taskExecutor.isShutdown();
×
143
  }
144

145
  @Override
146
  public boolean isTerminated() {
147
    return pollExecutor.isTerminated() && taskExecutor.isTerminated();
1✔
148
  }
149

150
  @Override
151
  public void shutdown() {
152
    log.debug("shutdown");
1✔
153
    if (!isStarted()) {
1✔
154
      return;
×
155
    }
156
    // shutdownNow and then await to stop long polling and ensure that no new tasks
157
    // are dispatched to the taskExecutor.
158
    pollExecutor.shutdownNow();
1✔
159
    try {
160
      pollExecutor.awaitTermination(1, TimeUnit.SECONDS);
1✔
161
    } catch (InterruptedException e) {
×
162
    }
1✔
163
    taskExecutor.shutdown();
1✔
164
    pollerAutoScaler.stop();
1✔
165
  }
1✔
166

167
  @Override
168
  public void shutdownNow() {
169
    if (log.isDebugEnabled()) {
1✔
170
      log.debug("shutdownNow poller=" + this.pollerOptions.getPollThreadNamePrefix());
×
171
    }
172
    if (!isStarted()) {
1✔
173
      return;
1✔
174
    }
175
    pollExecutor.shutdownNow();
1✔
176
    taskExecutor.shutdownNow();
1✔
177
  }
1✔
178

179
  @Override
180
  public void awaitTermination(long timeout, TimeUnit unit) {
181
    if (!isStarted()) {
1✔
182
      return;
1✔
183
    }
184
    long timeoutMillis = unit.toMillis(timeout);
1✔
185
    timeoutMillis = InternalUtils.awaitTermination(pollExecutor, timeoutMillis);
1✔
186
    InternalUtils.awaitTermination(taskExecutor, timeoutMillis);
1✔
187
  }
1✔
188

189
  @Override
190
  public void suspendPolling() {
191
    log.info("suspendPolling");
1✔
192
    suspendLatch.set(new CountDownLatch(1));
1✔
193
  }
1✔
194

195
  @Override
196
  public void resumePolling() {
197
    log.info("resumePolling");
1✔
198
    CountDownLatch existing = suspendLatch.getAndSet(null);
1✔
199
    if (existing != null) {
1✔
200
      existing.countDown();
1✔
201
    }
202
  }
1✔
203

204
  @Override
205
  public boolean isSuspended() {
206
    return suspendLatch.get() != null;
×
207
  }
208

209
  @Override
210
  public String toString() {
211
    return "Poller{" + "options=" + pollerOptions + ", identity=" + identity + '}';
×
212
  }
213

214
  private class PollLoopTask implements Runnable {
215

216
    private final Poller.ThrowingRunnable task;
217

218
    PollLoopTask(Poller.ThrowingRunnable task) {
1✔
219
      this.task = task;
1✔
220
    }
1✔
221

222
    @Override
223
    public void run() {
224
      try {
225
        if (pollExecutor.isTerminating()) {
1✔
226
          return;
×
227
        }
228
        pollBackoffThrottler.throttle();
1✔
229
        if (pollExecutor.isTerminating()) {
1✔
230
          return;
×
231
        }
232
        if (pollRateThrottler != null) {
1✔
233
          pollRateThrottler.throttle();
×
234
        }
235

236
        CountDownLatch suspender = Poller.this.suspendLatch.get();
1✔
237
        if (suspender != null) {
1✔
238
          if (log.isDebugEnabled()) {
1✔
239
            log.debug("poll task suspending latchCount=" + suspender.getCount());
×
240
          }
241
          suspender.await();
1✔
242
        }
243

244
        if (pollExecutor.isTerminating()) {
1✔
245
          return;
×
246
        }
247
        task.run();
1✔
248
        pollBackoffThrottler.success();
1✔
249
      } catch (Throwable e) {
1✔
250
        pollBackoffThrottler.failure();
1✔
251
        if (!(e.getCause() instanceof InterruptedException)) {
1✔
252
          uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
1✔
253
        }
254
      } finally {
255
        // Resubmit itself back to pollExecutor
256
        if (!pollExecutor.isTerminating()) {
1✔
257
          pollExecutor.execute(this);
1✔
258
        } else {
259
          log.debug("poll loop done");
1✔
260
        }
261
      }
262
    }
1✔
263
  }
264

265
  private class PollExecutionTask implements Poller.ThrowingRunnable {
266
    private static final int EXECUTOR_CAPACITY_CHECK_INTERVAL_MS = 100;
267
    private static final int EXECUTOR_CAPACITY_CHECK_OFFSET_MS = 10;
268

269
    PollExecutionTask() {}
1✔
270

271
    @Override
272
    public void run() throws Exception {
273
      try {
274
        pollerAutoScaler.acquire();
1✔
275
        try {
276
          T task = pollTask.poll();
1✔
277
          if (task == null) {
1✔
278
            pollerAutoScaler.increaseNoopPollCount();
1✔
279
            return;
1✔
280
          }
281

282
          pollerAutoScaler.increaseActionablePollCount();
1✔
283
          taskExecutor.process(task);
1✔
284
        } finally {
285
          checkIfTaskHasExecutorHasCapacity();
1✔
286
        }
287
      } finally {
288
        pollerAutoScaler.release();
1✔
289
      }
290
    }
1✔
291

292
    private void checkIfTaskHasExecutorHasCapacity() {
293
      if (pollerOptions.getPollOnlyIfExecutorHasCapacity()) {
1✔
294
        while (true) {
295
          // sleep to avoid racing condition
296
          try {
297
            Thread.sleep(EXECUTOR_CAPACITY_CHECK_OFFSET_MS);
×
298
          } catch (InterruptedException ignored) {
×
299
          }
×
300
          if (taskExecutor.hasCapacity()) {
×
301
            break;
×
302
          } else {
303
            // sleep to avoid busy loop
304
            try {
305
              Thread.sleep(EXECUTOR_CAPACITY_CHECK_INTERVAL_MS);
×
306
            } catch (InterruptedException ignored) {
×
307
            }
×
308
          }
309
        }
310
      }
311
    }
1✔
312
  }
313
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc