• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18721

pending completion
#18721

push

github-actions

web-flow
core, inprocess, util: move inprocess and util code into their own new artifacts grpc-inprocess and grpc-util (#10362)

* core, inprocess, util: move inprocess and util code into their own new artifacts grpc-inprocess and grpc-util

29155 of 33044 relevant lines covered (88.23%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.0
/../api/src/main/java/io/grpc/SynchronizationContext.java
1
/*
2
 * Copyright 2018 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import java.lang.Thread.UncaughtExceptionHandler;
23
import java.util.Queue;
24
import java.util.concurrent.ConcurrentLinkedQueue;
25
import java.util.concurrent.Executor;
26
import java.util.concurrent.ScheduledExecutorService;
27
import java.util.concurrent.ScheduledFuture;
28
import java.util.concurrent.TimeUnit;
29
import java.util.concurrent.atomic.AtomicReference;
30
import javax.annotation.concurrent.ThreadSafe;
31

32
/**
33
 * A synchronization context is a queue of tasks that run in sequence.  It offers following
34
 * guarantees:
35
 *
36
 * <ul>
37
 *    <li>Ordering.  Tasks are run in the same order as they are submitted via {@link #execute}
38
 *        and {@link #executeLater}.</li>
39
 *    <li>Serialization.  Tasks are run in sequence and establish a happens-before relationship
40
 *        between them. </li>
41
 *    <li>Non-reentrancy.  If a task running in a synchronization context executes or schedules
42
 *        another task in the same synchronization context, the latter task will never run
43
 *        inline.  It will instead be queued and run only after the current task has returned.</li>
44
 * </ul>
45
 *
46
 * <p>It doesn't own any thread.  Tasks are run from caller's or caller-provided threads.
47
 *
48
 * <p>Conceptually, it is fairly accurate to think of {@code SynchronizationContext} like a cheaper
49
 * {@code Executors.newSingleThreadExecutor()} when used for synchronization (not long-running
50
 * tasks). Both use a queue for tasks that are run in order and neither guarantee that tasks have
51
 * completed before returning from {@code execute()}. However, the behavior does diverge if locks
52
 * are held when calling the context. So it is encouraged to avoid mixing locks and synchronization
53
 * context except via {@link #executeLater}.
54
 *
55
 * <p>This class is thread-safe.
56
 *
57
 * @since 1.17.0
58
 */
59
@ThreadSafe
60
public final class SynchronizationContext implements Executor {
61
  private final UncaughtExceptionHandler uncaughtExceptionHandler;
62

63
  private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
1✔
64
  private final AtomicReference<Thread> drainingThread = new AtomicReference<>();
1✔
65

66
  /**
67
   * Creates a SynchronizationContext.
68
   *
69
   * @param uncaughtExceptionHandler handles exceptions thrown out of the tasks.  Different from
70
   *        what's documented on {@link UncaughtExceptionHandler#uncaughtException}, the thread is
71
   *        not terminated when the handler is called.
72
   */
73
  public SynchronizationContext(UncaughtExceptionHandler uncaughtExceptionHandler) {
1✔
74
    this.uncaughtExceptionHandler =
1✔
75
        checkNotNull(uncaughtExceptionHandler, "uncaughtExceptionHandler");
1✔
76
  }
1✔
77

78
  /**
79
   * Run all tasks in the queue in the current thread, if no other thread is running this method.
80
   * Otherwise do nothing.
81
   *
82
   * <p>Upon returning, it guarantees that all tasks submitted by {@code #executeLater} before it
83
   * have been or will eventually be run, while not requiring any more calls to {@code drain()}.
84
   */
85
  public final void drain() {
86
    do {
87
      if (!drainingThread.compareAndSet(null, Thread.currentThread())) {
1✔
88
        return;
1✔
89
      }
90
      try {
91
        Runnable runnable;
92
        while ((runnable = queue.poll()) != null) {
1✔
93
          try {
94
            runnable.run();
1✔
95
          } catch (Throwable t) {
1✔
96
            uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t);
1✔
97
          }
1✔
98
        }
99
      } finally {
100
        drainingThread.set(null);
1✔
101
      }
102
      // must check queue again here to catch any added prior to clearing drainingThread
103
    } while (!queue.isEmpty());
1✔
104
  }
1✔
105

106
  /**
107
   * Adds a task that will be run when {@link #drain} is called.
108
   *
109
   * <p>This is useful for cases where you want to enqueue a task while under a lock of your own,
110
   * but don't want the tasks to be run under your lock (for fear of deadlock).  You can call {@link
111
   * #executeLater} in the lock, and call {@link #drain} outside the lock.
112
   */
113
  public final void executeLater(Runnable runnable) {
114
    queue.add(checkNotNull(runnable, "runnable is null"));
1✔
115
  }
1✔
116

117
  /**
118
   * Adds a task and run it in this synchronization context as soon as possible.  The task may run
119
   * inline.  If there are tasks that are previously queued by {@link #executeLater} but have not
120
   * been run, this method will trigger them to be run before the given task.  This is equivalent to
121
   * calling {@link #executeLater} immediately followed by {@link #drain}.
122
   */
123
  @Override
124
  public final void execute(Runnable task) {
125
    executeLater(task);
1✔
126
    drain();
1✔
127
  }
1✔
128

129
  /**
130
   * Throw {@link IllegalStateException} if this method is not called from this synchronization
131
   * context.
132
   */
133
  public void throwIfNotInThisSynchronizationContext() {
134
    checkState(Thread.currentThread() == drainingThread.get(),
1✔
135
        "Not called from the SynchronizationContext");
136
  }
1✔
137

138
  /**
139
   * Schedules a task to be added and run via {@link #execute} after a delay.
140
   *
141
   * @param task the task being scheduled
142
   * @param delay the delay
143
   * @param unit the time unit for the delay
144
   * @param timerService the {@code ScheduledExecutorService} that provides delayed execution
145
   *
146
   * @return an object for checking the status and/or cancel the scheduled task
147
   */
148
  public final ScheduledHandle schedule(
149
      final Runnable task, long delay, TimeUnit unit, ScheduledExecutorService timerService) {
150
    final ManagedRunnable runnable = new ManagedRunnable(task);
1✔
151
    ScheduledFuture<?> future = timerService.schedule(new Runnable() {
1✔
152
        @Override
153
        public void run() {
154
          execute(runnable);
1✔
155
        }
1✔
156

157
        @Override
158
        public String toString() {
159
          return task.toString() + "(scheduled in SynchronizationContext)";
1✔
160
        }
161
      }, delay, unit);
162
    return new ScheduledHandle(runnable, future);
1✔
163
  }
164

165
  /**
166
   * Schedules a task to be added and run via {@link #execute} after an inital delay and then
167
   * repeated after the delay until cancelled.
168
   *
169
   * @param task the task being scheduled
170
   * @param initialDelay the delay before the first run
171
   * @param delay the delay after the first run.
172
   * @param unit the time unit for the delay
173
   * @param timerService the {@code ScheduledExecutorService} that provides delayed execution
174
   *
175
   * @return an object for checking the status and/or cancel the scheduled task
176
   */
177
  public final ScheduledHandle scheduleWithFixedDelay(
178
      final Runnable task, long initialDelay, long delay, TimeUnit unit,
179
      ScheduledExecutorService timerService) {
180
    final ManagedRunnable runnable = new ManagedRunnable(task);
×
181
    ScheduledFuture<?> future = timerService.scheduleWithFixedDelay(new Runnable() {
×
182
      @Override
183
      public void run() {
184
        execute(runnable);
×
185
      }
×
186

187
      @Override
188
      public String toString() {
189
        return task.toString() + "(scheduled in SynchronizationContext with delay of " + delay
×
190
            + ")";
191
      }
192
    }, initialDelay, delay, unit);
193
    return new ScheduledHandle(runnable, future);
×
194
  }
195

196

197
  private static class ManagedRunnable implements Runnable {
198
    final Runnable task;
199
    boolean isCancelled;
200
    boolean hasStarted;
201

202
    ManagedRunnable(Runnable task) {
1✔
203
      this.task = checkNotNull(task, "task");
1✔
204
    }
1✔
205

206
    @Override
207
    public void run() {
208
      // The task may have been cancelled after timerService calls SynchronizationContext.execute()
209
      // but before the runnable is actually run.  We must guarantee that the task will not be run
210
      // in this case.
211
      if (!isCancelled) {
1✔
212
        hasStarted = true;
1✔
213
        task.run();
1✔
214
      }
215
    }
1✔
216
  }
217

218
  /**
219
   * Allows the user to check the status and/or cancel a task scheduled by {@link #schedule}.
220
   *
221
   * <p>This class is NOT thread-safe.  All methods must be run from the same {@link
222
   * SynchronizationContext} as which the task was scheduled in.
223
   */
224
  public static final class ScheduledHandle {
225
    private final ManagedRunnable runnable;
226
    private final ScheduledFuture<?> future;
227

228
    private ScheduledHandle(ManagedRunnable runnable, ScheduledFuture<?> future) {
1✔
229
      this.runnable = checkNotNull(runnable, "runnable");
1✔
230
      this.future = checkNotNull(future, "future");
1✔
231
    }
1✔
232

233
    /**
234
     * Cancel the task if it's still {@link #isPending pending}.
235
     */
236
    public void cancel() {
237
      runnable.isCancelled = true;
1✔
238
      future.cancel(false);
1✔
239
    }
1✔
240

241
    /**
242
     * Returns {@code true} if the task will eventually run, meaning that it has neither started
243
     * running nor been cancelled.
244
     */
245
    public boolean isPending() {
246
      return !(runnable.hasStarted || runnable.isCancelled);
1✔
247
    }
248
  }
249
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc