• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19533

30 Oct 2024 01:19PM CUT coverage: 84.562% (-0.01%) from 84.573%
#19533

push

github

web-flow
api: Add java.time.Duration overloads to CallOptions, AbstractStub taking TimeUnit and a time value (#11562)

33927 of 40121 relevant lines covered (84.56%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.08
/../api/src/main/java/io/grpc/SynchronizationContext.java
1
/*
2
 * Copyright 2018 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static io.grpc.TimeUtils.convertToNanos;
22

23
import java.lang.Thread.UncaughtExceptionHandler;
24
import java.time.Duration;
25
import java.util.Queue;
26
import java.util.concurrent.ConcurrentLinkedQueue;
27
import java.util.concurrent.Executor;
28
import java.util.concurrent.ScheduledExecutorService;
29
import java.util.concurrent.ScheduledFuture;
30
import java.util.concurrent.TimeUnit;
31
import java.util.concurrent.atomic.AtomicReference;
32
import javax.annotation.concurrent.ThreadSafe;
33

34
/**
35
 * A synchronization context is a queue of tasks that run in sequence.  It offers following
36
 * guarantees:
37
 *
38
 * <ul>
39
 *    <li>Ordering.  Tasks are run in the same order as they are submitted via {@link #execute}
40
 *        and {@link #executeLater}.</li>
41
 *    <li>Serialization.  Tasks are run in sequence and establish a happens-before relationship
42
 *        between them. </li>
43
 *    <li>Non-reentrancy.  If a task running in a synchronization context executes or schedules
44
 *        another task in the same synchronization context, the latter task will never run
45
 *        inline.  It will instead be queued and run only after the current task has returned.</li>
46
 * </ul>
47
 *
48
 * <p>It doesn't own any thread.  Tasks are run from caller's or caller-provided threads.
49
 *
50
 * <p>Conceptually, it is fairly accurate to think of {@code SynchronizationContext} like a cheaper
51
 * {@code Executors.newSingleThreadExecutor()} when used for synchronization (not long-running
52
 * tasks). Both use a queue for tasks that are run in order and neither guarantee that tasks have
53
 * completed before returning from {@code execute()}. However, the behavior does diverge if locks
54
 * are held when calling the context. So it is encouraged to avoid mixing locks and synchronization
55
 * context except via {@link #executeLater}.
56
 *
57
 * <p>This class is thread-safe.
58
 *
59
 * @since 1.17.0
60
 */
61
@ThreadSafe
62
public final class SynchronizationContext implements Executor {
63
  private final UncaughtExceptionHandler uncaughtExceptionHandler;
64

65
  private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
1✔
66
  private final AtomicReference<Thread> drainingThread = new AtomicReference<>();
1✔
67

68
  /**
69
   * Creates a SynchronizationContext.
70
   *
71
   * @param uncaughtExceptionHandler handles exceptions thrown out of the tasks.  Different from
72
   *        what's documented on {@link UncaughtExceptionHandler#uncaughtException}, the thread is
73
   *        not terminated when the handler is called.
74
   */
75
  public SynchronizationContext(UncaughtExceptionHandler uncaughtExceptionHandler) {
1✔
76
    this.uncaughtExceptionHandler =
1✔
77
        checkNotNull(uncaughtExceptionHandler, "uncaughtExceptionHandler");
1✔
78
  }
1✔
79

80
  /**
81
   * Run all tasks in the queue in the current thread, if no other thread is running this method.
82
   * Otherwise do nothing.
83
   *
84
   * <p>Upon returning, it guarantees that all tasks submitted by {@code #executeLater} before it
85
   * have been or will eventually be run, while not requiring any more calls to {@code drain()}.
86
   */
87
  public final void drain() {
88
    do {
89
      if (!drainingThread.compareAndSet(null, Thread.currentThread())) {
1✔
90
        return;
1✔
91
      }
92
      try {
93
        Runnable runnable;
94
        while ((runnable = queue.poll()) != null) {
1✔
95
          try {
96
            runnable.run();
1✔
97
          } catch (Throwable t) {
1✔
98
            uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t);
1✔
99
          }
1✔
100
        }
101
      } finally {
102
        drainingThread.set(null);
1✔
103
      }
104
      // must check queue again here to catch any added prior to clearing drainingThread
105
    } while (!queue.isEmpty());
1✔
106
  }
1✔
107

108
  /**
109
   * Adds a task that will be run when {@link #drain} is called.
110
   *
111
   * <p>This is useful for cases where you want to enqueue a task while under a lock of your own,
112
   * but don't want the tasks to be run under your lock (for fear of deadlock).  You can call {@link
113
   * #executeLater} in the lock, and call {@link #drain} outside the lock.
114
   */
115
  public final void executeLater(Runnable runnable) {
116
    queue.add(checkNotNull(runnable, "runnable is null"));
1✔
117
  }
1✔
118

119
  /**
120
   * Adds a task and run it in this synchronization context as soon as possible.  The task may run
121
   * inline.  If there are tasks that are previously queued by {@link #executeLater} but have not
122
   * been run, this method will trigger them to be run before the given task.  This is equivalent to
123
   * calling {@link #executeLater} immediately followed by {@link #drain}.
124
   */
125
  @Override
126
  public final void execute(Runnable task) {
127
    executeLater(task);
1✔
128
    drain();
1✔
129
  }
1✔
130

131
  /**
132
   * Throw {@link IllegalStateException} if this method is not called from this synchronization
133
   * context.
134
   */
135
  public void throwIfNotInThisSynchronizationContext() {
136
    checkState(Thread.currentThread() == drainingThread.get(),
1✔
137
        "Not called from the SynchronizationContext");
138
  }
1✔
139

140
  /**
141
   * Schedules a task to be added and run via {@link #execute} after a delay.
142
   *
143
   * @param task the task being scheduled
144
   * @param delay the delay
145
   * @param unit the time unit for the delay
146
   * @param timerService the {@code ScheduledExecutorService} that provides delayed execution
147
   *
148
   * @return an object for checking the status and/or cancel the scheduled task
149
   */
150
  public final ScheduledHandle schedule(
151
      final Runnable task, long delay, TimeUnit unit, ScheduledExecutorService timerService) {
152
    final ManagedRunnable runnable = new ManagedRunnable(task);
1✔
153
    ScheduledFuture<?> future = timerService.schedule(new Runnable() {
1✔
154
        @Override
155
        public void run() {
156
          execute(runnable);
1✔
157
        }
1✔
158

159
        @Override
160
        public String toString() {
161
          return task.toString() + "(scheduled in SynchronizationContext)";
1✔
162
        }
163
      }, delay, unit);
164
    return new ScheduledHandle(runnable, future);
1✔
165
  }
166

167
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11657")
168
  public final ScheduledHandle schedule(
169
      final Runnable task, Duration delay, ScheduledExecutorService timerService) {
170
    return schedule(task, convertToNanos(delay), TimeUnit.NANOSECONDS, timerService);
1✔
171
  }
172

173
  /**
174
   * Schedules a task to be added and run via {@link #execute} after an initial delay and then
175
   * repeated after the delay until cancelled.
176
   *
177
   * @param task the task being scheduled
178
   * @param initialDelay the delay before the first run
179
   * @param delay the delay after the first run.
180
   * @param unit the time unit for the delay
181
   * @param timerService the {@code ScheduledExecutorService} that provides delayed execution
182
   *
183
   * @return an object for checking the status and/or cancel the scheduled task
184
   */
185
  public final ScheduledHandle scheduleWithFixedDelay(
186
      final Runnable task, long initialDelay, long delay, TimeUnit unit,
187
      ScheduledExecutorService timerService) {
188
    final ManagedRunnable runnable = new ManagedRunnable(task);
1✔
189
    ScheduledFuture<?> future = timerService.scheduleWithFixedDelay(new Runnable() {
1✔
190
      @Override
191
      public void run() {
192
        execute(runnable);
1✔
193
      }
1✔
194

195
      @Override
196
      public String toString() {
197
        return task.toString() + "(scheduled in SynchronizationContext with delay of " + delay
×
198
            + ")";
199
      }
200
    }, initialDelay, delay, unit);
201
    return new ScheduledHandle(runnable, future);
1✔
202
  }
203

204
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11657")
205
  public final ScheduledHandle scheduleWithFixedDelay(
206
      final Runnable task, Duration initialDelay, Duration delay,
207
      ScheduledExecutorService timerService) {
208
    return scheduleWithFixedDelay(task, convertToNanos(initialDelay), convertToNanos(delay),
1✔
209
        TimeUnit.NANOSECONDS, timerService);
210
  }
211

212

213
  private static class ManagedRunnable implements Runnable {
214
    final Runnable task;
215
    boolean isCancelled;
216
    boolean hasStarted;
217

218
    ManagedRunnable(Runnable task) {
1✔
219
      this.task = checkNotNull(task, "task");
1✔
220
    }
1✔
221

222
    @Override
223
    public void run() {
224
      // The task may have been cancelled after timerService calls SynchronizationContext.execute()
225
      // but before the runnable is actually run.  We must guarantee that the task will not be run
226
      // in this case.
227
      if (!isCancelled) {
1✔
228
        hasStarted = true;
1✔
229
        task.run();
1✔
230
      }
231
    }
1✔
232
  }
233

234
  /**
235
   * Allows the user to check the status and/or cancel a task scheduled by {@link #schedule}.
236
   *
237
   * <p>This class is NOT thread-safe.  All methods must be run from the same {@link
238
   * SynchronizationContext} as which the task was scheduled in.
239
   */
240
  public static final class ScheduledHandle {
241
    private final ManagedRunnable runnable;
242
    private final ScheduledFuture<?> future;
243

244
    private ScheduledHandle(ManagedRunnable runnable, ScheduledFuture<?> future) {
1✔
245
      this.runnable = checkNotNull(runnable, "runnable");
1✔
246
      this.future = checkNotNull(future, "future");
1✔
247
    }
1✔
248

249
    /**
250
     * Cancel the task if it's still {@link #isPending pending}.
251
     */
252
    public void cancel() {
253
      runnable.isCancelled = true;
1✔
254
      future.cancel(false);
1✔
255
    }
1✔
256

257
    /**
258
     * Returns {@code true} if the task will eventually run, meaning that it has neither started
259
     * running nor been cancelled.
260
     */
261
    public boolean isPending() {
262
      return !(runnable.hasStarted || runnable.isCancelled);
1✔
263
    }
264
  }
265
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc