• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #153

pending completion
#153

push

github-actions

web-flow
Eager Workflow Task Dispatch (#1674)

Issue #1646

Signed-off-by: Dmitry Spikhalskiy <dmitry@spikhalskiy.com>

213 of 213 new or added lines in 22 files covered. (100.0%)

16682 of 20566 relevant lines covered (81.11%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.68
/temporal-sdk/src/main/java/io/temporal/internal/worker/Poller.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import com.uber.m3.tally.Scope;
24
import io.grpc.Status;
25
import io.grpc.StatusRuntimeException;
26
import io.temporal.internal.BackoffThrottler;
27
import io.temporal.internal.common.GrpcUtils;
28
import io.temporal.worker.MetricsType;
29
import java.time.Duration;
30
import java.util.Objects;
31
import java.util.concurrent.*;
32
import java.util.concurrent.atomic.AtomicReference;
33
import org.slf4j.Logger;
34
import org.slf4j.LoggerFactory;
35

36
final class Poller<T> implements SuspendableWorker {
37

38
  /**
   * Source of tasks for the poller. Each invocation of {@link #poll()} fetches at most one task.
   *
   * <p>The interface has a single abstract method and is therefore usable as a lambda or method
   * reference target.
   *
   * @param <TT> type of the task produced by a poll
   */
  @FunctionalInterface
  public interface PollTask<TT> {
    /**
     * Pollers should shade or wrap all {@code java.lang.InterruptedException}s and raise {@code
     * Thread.interrupted()} flag. This follows GRPC stubs approach, see {@code
     * io.grpc.stub.ClientCalls#blockingUnaryCall}. Because pollers use GRPC stubs anyway, we chose
     * this implementation for consistency. The caller of the poll task is responsible for handling
     * the flag.
     *
     * @return result of the task; a {@code null} result is skipped by the poller and is not handed
     *     to the task executor
     */
    TT poll();
  }
50

51
  /**
   * A {@link Runnable}-like unit of work whose body is allowed to throw any {@link Throwable},
   * including checked exceptions. The poll loop wraps instances of this type and handles whatever
   * they throw.
   */
  @FunctionalInterface
  interface ThrowingRunnable {
    /**
     * Executes the unit of work.
     *
     * @throws Throwable any failure raised by the work; the caller is responsible for handling it
     */
    void run() throws Throwable;
  }
54

55
  private final String identity;
56
  private final ShutdownableTaskExecutor<T> taskExecutor;
57
  private final PollTask<T> pollTask;
58
  private final PollerOptions pollerOptions;
59
  private static final Logger log = LoggerFactory.getLogger(Poller.class);
1✔
60
  private ThreadPoolExecutor pollExecutor;
61
  private final Scope workerMetricsScope;
62

63
  private final AtomicReference<CountDownLatch> suspendLatch = new AtomicReference<>();
1✔
64

65
  private Throttler pollRateThrottler;
66

67
  private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
1✔
68
      new PollerUncaughtExceptionHandler();
69

70
  /**
   * Creates a poller. Only the collaborators are captured here; the poll thread pool is not
   * created until {@link #start()} is called.
   *
   * @param identity identity string, used in {@link #toString()}
   * @param pollTask source of tasks
   * @param taskExecutor executor that processes every non-null polled task
   * @param pollerOptions poll thread count, thread naming, throttling and backoff settings
   * @param workerMetricsScope metrics scope used to count started pollers
   * @throws NullPointerException if any argument is {@code null}
   */
  public Poller(
      String identity,
      PollTask<T> pollTask,
      ShutdownableTaskExecutor<T> taskExecutor,
      PollerOptions pollerOptions,
      Scope workerMetricsScope) {
    // Validation and assignment are fused; arguments are still checked in declaration order.
    this.identity = Objects.requireNonNull(identity, "identity cannot be null");
    this.pollTask = Objects.requireNonNull(pollTask, "poll service should not be null");
    this.taskExecutor = Objects.requireNonNull(taskExecutor, "taskExecutor should not be null");
    this.pollerOptions =
        Objects.requireNonNull(pollerOptions, "pollerOptions should not be null");
    this.workerMetricsScope =
        Objects.requireNonNull(workerMetricsScope, "workerMetricsScope should not be null");
  }
1✔
88

89
  @Override
90
  public boolean start() {
91
    log.info("start: {}", this);
1✔
92

93
    if (pollerOptions.getMaximumPollRatePerSecond() > 0.0) {
1✔
94
      pollRateThrottler =
×
95
          new Throttler(
96
              "poller",
97
              pollerOptions.getMaximumPollRatePerSecond(),
×
98
              pollerOptions.getMaximumPollRateIntervalMilliseconds());
×
99
    }
100

101
    // It is important to pass blocking queue of at least options.getPollThreadCount() capacity. As
102
    // task enqueues next task the buffering is needed to queue task until the previous one releases
103
    // a thread.
104
    pollExecutor =
1✔
105
        new ThreadPoolExecutor(
106
            pollerOptions.getPollThreadCount(),
1✔
107
            pollerOptions.getPollThreadCount(),
1✔
108
            1,
109
            TimeUnit.SECONDS,
110
            new ArrayBlockingQueue<>(pollerOptions.getPollThreadCount()));
1✔
111
    pollExecutor.setThreadFactory(
1✔
112
        new ExecutorThreadFactory(
113
            pollerOptions.getPollThreadNamePrefix(), pollerOptions.getUncaughtExceptionHandler()));
1✔
114

115
    for (int i = 0; i < pollerOptions.getPollThreadCount(); i++) {
1✔
116
      pollExecutor.execute(new PollLoopTask(new PollExecutionTask()));
1✔
117
      workerMetricsScope.counter(MetricsType.POLLER_START_COUNTER).inc(1);
1✔
118
    }
119

120
    return true;
1✔
121
  }
122

123
  @Override
124
  public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
125
    log.info("shutdown: {}", this);
1✔
126
    WorkerLifecycleState lifecycleState = getLifecycleState();
1✔
127
    switch (lifecycleState) {
1✔
128
      case NOT_STARTED:
129
      case TERMINATED:
130
        return CompletableFuture.completedFuture(null);
1✔
131
    }
132

133
    return shutdownManager
1✔
134
        // it's ok to forcefully shutdown pollers, especially because they stuck in a long poll call
135
        // we don't lose any progress doing that
136
        .shutdownExecutorNow(pollExecutor, this + "#pollExecutor", Duration.ofSeconds(1))
1✔
137
        // TODO Poller shouldn't shutdown taskExecutor, because it gets it already created
138
        //  externally. Creator of taskExecutor should be responsible for it's shutdown.
139
        .thenCompose(ignore -> taskExecutor.shutdown(shutdownManager, interruptTasks))
1✔
140
        .exceptionally(
1✔
141
            e -> {
142
              log.error("Unexpected exception during shutdown", e);
×
143
              return null;
×
144
            });
145
  }
146

147
  @Override
148
  public void awaitTermination(long timeout, TimeUnit unit) {
149
    WorkerLifecycleState lifecycleState = getLifecycleState();
1✔
150
    switch (lifecycleState) {
1✔
151
      case NOT_STARTED:
152
      case TERMINATED:
153
        return;
1✔
154
    }
155

156
    long timeoutMillis = unit.toMillis(timeout);
1✔
157
    timeoutMillis = ShutdownManager.awaitTermination(pollExecutor, timeoutMillis);
1✔
158
    ShutdownManager.awaitTermination(taskExecutor, timeoutMillis);
1✔
159
  }
1✔
160

161
  @Override
162
  public void suspendPolling() {
163
    if (suspendLatch.compareAndSet(null, new CountDownLatch(1))) {
1✔
164
      log.info("Suspend Polling: {}", this);
1✔
165
    } else {
166
      log.info("Polling is already suspended: {}", this);
×
167
    }
168
  }
1✔
169

170
  @Override
171
  public void resumePolling() {
172
    CountDownLatch existing = suspendLatch.getAndSet(null);
1✔
173
    if (existing != null) {
1✔
174
      log.info("Resume Polling {}", this);
1✔
175
      existing.countDown();
1✔
176
    }
177
  }
1✔
178

179
  @Override
180
  public boolean isSuspended() {
181
    return suspendLatch.get() != null;
1✔
182
  }
183

184
  @Override
185
  public boolean isShutdown() {
186
    return pollExecutor.isShutdown();
×
187
  }
188

189
  @Override
190
  public boolean isTerminated() {
191
    return pollExecutor.isTerminated() && taskExecutor.isTerminated();
×
192
  }
193

194
  @Override
195
  public WorkerLifecycleState getLifecycleState() {
196
    if (pollExecutor == null) {
1✔
197
      return WorkerLifecycleState.NOT_STARTED;
×
198
    }
199
    if (suspendLatch.get() != null) {
1✔
200
      return WorkerLifecycleState.SUSPENDED;
1✔
201
    }
202
    if (pollExecutor.isShutdown()) {
1✔
203
      // return TERMINATED only if both pollExecutor and taskExecutor are terminated
204
      if (pollExecutor.isTerminated() && taskExecutor.isTerminated()) {
1✔
205
        return WorkerLifecycleState.TERMINATED;
1✔
206
      } else {
207
        return WorkerLifecycleState.SHUTDOWN;
1✔
208
      }
209
    }
210
    return WorkerLifecycleState.ACTIVE;
1✔
211
  }
212

213
  @Override
214
  public String toString() {
215
    // TODO using pollThreadNamePrefix here is ugly. We should consider introducing some concept of
216
    // WorkerContext [workerIdentity, namespace, queue, local/non-local if applicable] and pass it
217
    // around
218
    // that will simplify such kind of logging through workers.
219
    return String.format(
1✔
220
        "Poller{name=%s, identity=%s}", pollerOptions.getPollThreadNamePrefix(), identity);
1✔
221
  }
222

223
  private class PollLoopTask implements Runnable {
224

225
    private final Poller.ThrowingRunnable task;
226
    private final BackoffThrottler pollBackoffThrottler;
227

228
    PollLoopTask(Poller.ThrowingRunnable task) {
1✔
229
      this.task = task;
1✔
230
      this.pollBackoffThrottler =
1✔
231
          new BackoffThrottler(
232
              pollerOptions.getBackoffInitialInterval(),
1✔
233
              pollerOptions.getBackoffCongestionInitialInterval(),
1✔
234
              pollerOptions.getBackoffMaximumInterval(),
1✔
235
              pollerOptions.getBackoffCoefficient(),
1✔
236
              pollerOptions.getBackoffMaximumJitterCoefficient());
1✔
237
    }
1✔
238

239
    @Override
240
    public void run() {
241
      try {
242
        long throttleMs = pollBackoffThrottler.getSleepTime();
1✔
243
        if (throttleMs > 0) {
1✔
244
          Thread.sleep(throttleMs);
×
245
        }
246
        if (pollRateThrottler != null) {
1✔
247
          pollRateThrottler.throttle();
×
248
        }
249

250
        CountDownLatch suspender = Poller.this.suspendLatch.get();
1✔
251
        if (suspender != null) {
1✔
252
          if (log.isDebugEnabled()) {
1✔
253
            log.debug("poll task suspending latchCount=" + suspender.getCount());
×
254
          }
255
          suspender.await();
×
256
        }
257

258
        if (shouldTerminate()) {
1✔
259
          return;
×
260
        }
261

262
        task.run();
1✔
263
        pollBackoffThrottler.success();
1✔
264
      } catch (Throwable e) {
1✔
265
        if (e instanceof InterruptedException) {
1✔
266
          // we restore the flag here, so it can be checked and processed (with exit) in finally.
267
          Thread.currentThread().interrupt();
1✔
268
        } else {
269
          // Don't increase throttle on InterruptedException
270
          pollBackoffThrottler.failure(
1✔
271
              (e instanceof StatusRuntimeException)
1✔
272
                  ? ((StatusRuntimeException) e).getStatus().getCode()
1✔
273
                  : Status.Code.UNKNOWN);
×
274
        }
275
        uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
1✔
276
      } finally {
277
        if (!shouldTerminate()) {
1✔
278
          // Resubmit itself back to pollExecutor
279
          pollExecutor.execute(this);
1✔
280
        } else {
281
          log.info("poll loop is terminated: {}", Poller.this.pollTask.getClass().getSimpleName());
1✔
282
        }
283
      }
284
    }
1✔
285

286
    /**
287
     * Defines if the task should be terminated.
288
     *
289
     * <p>This method preserves the interrupted flag of the current thread.
290
     *
291
     * @return true if pollExecutor is terminating, or the current thread is interrupted.
292
     */
293
    private boolean shouldTerminate() {
294
      return pollExecutor.isShutdown() || Thread.currentThread().isInterrupted();
1✔
295
    }
296
  }
297

298
  private class PollExecutionTask implements Poller.ThrowingRunnable {
1✔
299

300
    @Override
301
    public void run() throws Exception {
302
      T task = pollTask.poll();
1✔
303
      if (task != null) {
1✔
304
        taskExecutor.process(task);
1✔
305
      }
306
    }
1✔
307
  }
308

309
  private final class PollerUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
1✔
310

311
    @Override
312
    public void uncaughtException(Thread t, Throwable e) {
313
      if (!pollExecutor.isShutdown() || !shouldIgnoreDuringShutdown(e)) {
1✔
314
        logPollErrors(t, e);
×
315
      } else {
316
        logPollExceptionsSuppressedDuringShutdown(t, e);
1✔
317
      }
318
    }
1✔
319

320
    private void logPollErrors(Thread t, Throwable e) {
321
      if (e instanceof StatusRuntimeException) {
×
322
        StatusRuntimeException te = (StatusRuntimeException) e;
×
323
        if (te.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) {
×
324
          log.info("DEADLINE_EXCEEDED in poller thread {}", t.getName(), e);
×
325
          return;
×
326
        }
327
      }
328
      log.warn("Failure in poller thread {}", t.getName(), e);
×
329
    }
×
330

331
    /**
332
     * Some exceptions are considered normal during shutdown {@link #shouldIgnoreDuringShutdown} and
333
     * we log them in the most quite manner.
334
     *
335
     * @param t thread where the exception happened
336
     * @param e the exception itself
337
     */
338
    private void logPollExceptionsSuppressedDuringShutdown(Thread t, Throwable e) {
339
      log.trace(
1✔
340
          "Failure in thread {} is suppressed, considered normal during shutdown", t.getName(), e);
1✔
341
    }
1✔
342

343
    private boolean shouldIgnoreDuringShutdown(Throwable ex) {
344
      if (ex instanceof StatusRuntimeException) {
1✔
345
        if (GrpcUtils.isChannelShutdownException((StatusRuntimeException) ex)) {
1✔
346
          return true;
×
347
        }
348
      }
349
      return
1✔
350
      // if we are terminating and getting rejected execution - it's normal
351
      ex instanceof RejectedExecutionException
352
          // if the worker thread gets InterruptedException - it's normal during shutdown
353
          || ex instanceof InterruptedException
354
          // if we get wrapped InterruptedException like what PollTask or GRPC clients do with
355
          // setting Thread.interrupted() on - it's normal during shutdown too. See PollTask
356
          // javadoc.
357
          || ex.getCause() instanceof InterruptedException;
1✔
358
    }
359
  }
360
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc