• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20282

18 May 2026 12:47PM UTC coverage: 88.845% (-0.002%) from 88.847%
#20282

push

github

web-flow
Remove JSR-305 ThreadSafe annotation and replace with JavaDoc (#12762)

For non-final public classes and interfaces only, this PR removes the JSR-305 ThreadSafe annotations; instead of replacing them with ErrorProne's ThreadSafe, it adds a JavaDoc comment. This should keep things in line with what JSR-305 ThreadSafe affords.

Adding ErrorProne's ThreadSafe can be considered in the future, as it
expects more things than JSR-305.

Removing the JSR-305 dependency here allows Java applications that have
moved away from javax to compile, and avoids a bug that makes Immutables
and Lombok (and possibly other annotation processors) fail when
JSR-305 is not present.

36269 of 40823 relevant lines covered (88.84%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.0
/../core/src/main/java/io/grpc/internal/KeepAliveManager.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Stopwatch;
24
import com.google.common.util.concurrent.MoreExecutors;
25
import com.google.errorprone.annotations.concurrent.GuardedBy;
26
import io.grpc.Status;
27
import java.util.concurrent.ScheduledExecutorService;
28
import java.util.concurrent.ScheduledFuture;
29
import java.util.concurrent.TimeUnit;
30

31
/**
32
 * Manages keepalive pings.
33
 */
34
public class KeepAliveManager {
35
  private static final long MIN_KEEPALIVE_TIME_NANOS = TimeUnit.SECONDS.toNanos(10);
1✔
36
  private static final long MIN_KEEPALIVE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(10L);
1✔
37

38
  private final ScheduledExecutorService scheduler;
39
  @GuardedBy("this")
40
  private final Stopwatch stopwatch;
41
  private final KeepAlivePinger keepAlivePinger;
42
  private final boolean keepAliveDuringTransportIdle;
43
  @GuardedBy("this")
1✔
44
  private State state = State.IDLE;
45
  @GuardedBy("this")
46
  private ScheduledFuture<?> shutdownFuture;
47
  @GuardedBy("this")
48
  private ScheduledFuture<?> pingFuture;
49
  private final Runnable shutdown = new LogExceptionRunnable(new Runnable() {
1✔
50
    @Override
51
    public void run() {
52
      boolean shouldShutdown = false;
1✔
53
      synchronized (KeepAliveManager.this) {
1✔
54
        if (state != State.DISCONNECTED) {
1✔
55
          // We haven't received a ping response within the timeout. The connection is likely gone
56
          // already. Shutdown the transport and fail all existing rpcs.
57
          state = State.DISCONNECTED;
1✔
58
          shouldShutdown = true;
1✔
59
        }
60
      }
1✔
61
      if (shouldShutdown) {
1✔
62
        keepAlivePinger.onPingTimeout();
1✔
63
      }
64
    }
1✔
65
  });
66
  private final Runnable sendPing = new LogExceptionRunnable(new Runnable() {
1✔
67
    @Override
68
    public void run() {
69
      boolean shouldSendPing = false;
1✔
70
      synchronized (KeepAliveManager.this) {
1✔
71
        pingFuture = null;
1✔
72
        if (state == State.PING_SCHEDULED) {
1✔
73
          shouldSendPing = true;
1✔
74
          state = State.PING_SENT;
1✔
75
          // Schedule a shutdown. It fires if we don't receive the ping response within the timeout.
76
          shutdownFuture = scheduler.schedule(shutdown, keepAliveTimeoutInNanos,
1✔
77
              TimeUnit.NANOSECONDS);
78
        } else if (state == State.PING_DELAYED) {
1✔
79
          // We have received some data. Reschedule the ping with the new time.
80
          pingFuture = scheduler.schedule(
1✔
81
              sendPing,
1✔
82
              keepAliveTimeInNanos - stopwatch.elapsed(TimeUnit.NANOSECONDS),
1✔
83
              TimeUnit.NANOSECONDS);
84
          state = State.PING_SCHEDULED;
1✔
85
        }
86
      }
1✔
87
      if (shouldSendPing) {
1✔
88
        // Send the ping.
89
        keepAlivePinger.ping();
1✔
90
      }
91
    }
1✔
92
  });
93

94
  private final long keepAliveTimeInNanos;
95
  private final long keepAliveTimeoutInNanos;
96

97
  private enum State {
1✔
98
    /*
99
     * We don't need to do any keepalives. This means the transport has no active rpcs and
100
     * keepAliveDuringTransportIdle == false.
101
     */
102
    IDLE,
1✔
103
    /*
104
     * We have scheduled a ping to be sent in the future. We may decide to delay it if we receive
105
     * some data.
106
     */
107
    PING_SCHEDULED,
1✔
108
    /*
109
     * We need to delay the scheduled keepalive ping.
110
     */
111
    PING_DELAYED,
1✔
112
    /*
113
     * The ping has been sent out. Waiting for a ping response.
114
     */
115
    PING_SENT,
1✔
116
    /*
117
     * Transport goes idle after ping has been sent.
118
     */
119
    IDLE_AND_PING_SENT,
1✔
120
    /*
121
     * The transport has been disconnected. We won't do keepalives any more.
122
     */
123
    DISCONNECTED,
1✔
124
  }
125

126
  /**
   * Creates a KeepAliveManager backed by a fresh, unstarted {@link Stopwatch}.
   *
   * <p>The time and timeout values are used as given; this constructor does not clamp them.
   *
   * @param keepAlivePinger sends the pings and handles ping timeouts
   * @param scheduler executor on which the ping and shutdown tasks are scheduled
   * @param keepAliveTimeInNanos delay, in nanoseconds, used when scheduling a ping
   * @param keepAliveTimeoutInNanos delay, in nanoseconds, between sending a ping and running the
   *     shutdown task
   * @param keepAliveDuringTransportIdle whether keepalives continue while the transport is idle
   */
  public KeepAliveManager(
      KeepAlivePinger keepAlivePinger,
      ScheduledExecutorService scheduler,
      long keepAliveTimeInNanos,
      long keepAliveTimeoutInNanos,
      boolean keepAliveDuringTransportIdle) {
    this(
        keepAlivePinger,
        scheduler,
        Stopwatch.createUnstarted(),
        keepAliveTimeInNanos,
        keepAliveTimeoutInNanos,
        keepAliveDuringTransportIdle);
  }
1✔
136

137
  // Same as the public constructor, but lets tests supply the Stopwatch. The stopwatch is reset
  // and started here regardless of the state it was passed in.
  @VisibleForTesting
  KeepAliveManager(
      KeepAlivePinger keepAlivePinger,
      ScheduledExecutorService scheduler,
      Stopwatch stopwatch,
      long keepAliveTimeInNanos,
      long keepAliveTimeoutInNanos,
      boolean keepAliveDuringTransportIdle) {
    // Collaborators: each must be non-null.
    this.keepAlivePinger = checkNotNull(keepAlivePinger, "keepAlivePinger");
    this.scheduler = checkNotNull(scheduler, "scheduler");
    this.stopwatch = checkNotNull(stopwatch, "stopwatch");

    // Plain configuration values, stored as given.
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
    this.keepAliveDuringTransportIdle = keepAliveDuringTransportIdle;

    // Begin measuring elapsed time from construction.
    stopwatch.reset();
    stopwatch.start();
  }
1✔
149

150
  /** Start keepalive monitoring. */
151
  public synchronized void onTransportStarted() {
152
    if (keepAliveDuringTransportIdle) {
1✔
153
      onTransportActive();
1✔
154
    }
155
  }
1✔
156

157
  /**
158
   * Transport has received some data so that we can delay sending keepalives.
159
   */
160
  public synchronized void onDataReceived() {
161
    stopwatch.reset().start();
1✔
162
    // We do not cancel the ping future here. This avoids constantly scheduling and cancellation in
163
    // a busy transport. Instead, we update the status here and reschedule later. So we actually
164
    // keep one sendPing task always in flight when there're active rpcs.
165
    if (state == State.PING_SCHEDULED) {
1✔
166
      state = State.PING_DELAYED;
1✔
167
    } else if (state == State.PING_SENT || state == State.IDLE_AND_PING_SENT) {
1✔
168
      // Ping acked or effectively ping acked. Cancel shutdown, and then if not idle,
169
      // schedule a new keep-alive ping.
170
      if (shutdownFuture != null) {
1✔
171
        shutdownFuture.cancel(false);
1✔
172
      }
173
      if (state == State.IDLE_AND_PING_SENT) {
1✔
174
        // not to schedule new pings until onTransportActive
175
        state = State.IDLE;
1✔
176
        return;
1✔
177
      }
178
      // schedule a new ping
179
      state = State.PING_SCHEDULED;
1✔
180
      checkState(pingFuture == null, "There should be no outstanding pingFuture");
1✔
181
      pingFuture = scheduler.schedule(sendPing, keepAliveTimeInNanos, TimeUnit.NANOSECONDS);
1✔
182
    }
183
  }
1✔
184

185
  /**
186
   * Transport has active streams. Start sending keepalives if necessary.
187
   */
188
  public synchronized void onTransportActive() {
189
    if (state == State.IDLE) {
1✔
190
      // When the transport goes active, we do not reset the nextKeepaliveTime. This allows us to
191
      // quickly check whether the connection is still working.
192
      state = State.PING_SCHEDULED;
1✔
193
      if (pingFuture == null) {
1✔
194
        pingFuture = scheduler.schedule(
1✔
195
            sendPing,
196
            keepAliveTimeInNanos - stopwatch.elapsed(TimeUnit.NANOSECONDS),
1✔
197
            TimeUnit.NANOSECONDS);
198
      }
199
    } else if (state == State.IDLE_AND_PING_SENT) {
1✔
200
      state = State.PING_SENT;
1✔
201
    } // Other states are possible when keepAliveDuringTransportIdle == true
202
  }
1✔
203

204
  /**
205
   * Transport has finished all streams.
206
   */
207
  public synchronized void onTransportIdle() {
208
    if (keepAliveDuringTransportIdle) {
1✔
209
      return;
1✔
210
    }
211
    if (state == State.PING_SCHEDULED || state == State.PING_DELAYED) {
1✔
212
      state = State.IDLE;
1✔
213
    }
214
    if (state == State.PING_SENT) {
1✔
215
      state = State.IDLE_AND_PING_SENT;
1✔
216
    }
217
  }
1✔
218

219
  /**
220
   * Transport is being terminated. We no longer need to do keepalives.
221
   */
222
  public synchronized void onTransportTermination() {
223
    if (state != State.DISCONNECTED) {
1✔
224
      state = State.DISCONNECTED;
1✔
225
      if (shutdownFuture != null) {
1✔
226
        shutdownFuture.cancel(false);
1✔
227
      }
228
      if (pingFuture != null) {
1✔
229
        pingFuture.cancel(false);
1✔
230
        pingFuture = null;
1✔
231
      }
232
    }
233
  }
1✔
234

235
  /**
236
   * Bumps keepalive time to 10 seconds if the specified value was smaller than that.
237
   */
238
  public static long clampKeepAliveTimeInNanos(long keepAliveTimeInNanos) {
239
    return Math.max(keepAliveTimeInNanos, MIN_KEEPALIVE_TIME_NANOS);
1✔
240
  }
241

242
  /**
243
   * Bumps keepalive timeout to 10 milliseconds if the specified value was smaller than that.
244
   */
245
  public static long clampKeepAliveTimeoutInNanos(long keepAliveTimeoutInNanos) {
246
    return Math.max(keepAliveTimeoutInNanos, MIN_KEEPALIVE_TIMEOUT_NANOS);
×
247
  }
248

249
  /**
   * Transport-side hooks used by {@link KeepAliveManager}. The manager invokes both methods from
   * its scheduled tasks, after releasing its own lock.
   */
  public interface KeepAlivePinger {
    /**
     * Sends a keep-alive ping over the transport.
     */
    void ping();

    /**
     * Called when no Ping Ack was received within KEEPALIVE_TIMEOUT. Implementations should shut
     * the transport down.
     */
    void onPingTimeout();
  }
260

261
  /**
262
   * Default client side {@link KeepAlivePinger}.
263
   */
264
  public static final class ClientKeepAlivePinger implements KeepAlivePinger {
265

266

267
    /**
268
     * A {@link ClientTransport} that has life-cycle management.
269
     *
270
     * <p>This interface is thread-safe.
271
     */
272
    public interface TransportWithDisconnectReason extends ClientTransport {
273

274
      /**
275
       * Initiates a forceful shutdown in which preexisting and new calls are closed. Existing calls
276
       * should be closed with the provided {@code reason} and {@code disconnectError}.
277
       */
278
      void shutdownNow(Status reason, DisconnectError disconnectError);
279
    }
280

281
    private final TransportWithDisconnectReason transport;
282

283
    public ClientKeepAlivePinger(TransportWithDisconnectReason transport) {
1✔
284
      this.transport = transport;
1✔
285
    }
1✔
286

287
    @Override
288
    public void ping() {
289
      transport.ping(new ClientTransport.PingCallback() {
1✔
290
        @Override
291
        public void onSuccess(long roundTripTimeNanos) {}
×
292

293
        @Override
294
        public void onFailure(Status cause) {
295
          transport.shutdownNow(Status.UNAVAILABLE.withDescription(
1✔
296
                  "Keepalive failed. The connection is likely gone"),
297
              SimpleDisconnectError.CONNECTION_TIMED_OUT);
298
        }
1✔
299
      }, MoreExecutors.directExecutor());
1✔
300
    }
1✔
301

302
    @Override
303
    public void onPingTimeout() {
304
      transport.shutdownNow(Status.UNAVAILABLE.withDescription(
1✔
305
              "Keepalive failed. The connection is likely gone"),
306
          SimpleDisconnectError.CONNECTION_TIMED_OUT);
307
    }
1✔
308
  }
309
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc