• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20126

23 Dec 2025 05:06AM UTC coverage: 88.691% (-0.02%) from 88.706%
#20126

push

github

web-flow
opentelemetry: plumb subchannel metrics disconnect error (#12342)

Finishes the remaining work of
[A94](https://github.com/grpc/proposal/pull/485/files), i.e., plumbing the disconnect error.

35472 of 39995 relevant lines covered (88.69%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.0
/../core/src/main/java/io/grpc/internal/KeepAliveManager.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Stopwatch;
24
import com.google.common.util.concurrent.MoreExecutors;
25
import com.google.errorprone.annotations.concurrent.GuardedBy;
26
import io.grpc.Status;
27
import java.util.concurrent.ScheduledExecutorService;
28
import java.util.concurrent.ScheduledFuture;
29
import java.util.concurrent.TimeUnit;
30
import javax.annotation.concurrent.ThreadSafe;
31

32
/**
33
 * Manages keepalive pings.
34
 */
35
public class KeepAliveManager {
36
  /** Floor of 10 seconds, in nanoseconds, applied by clampKeepAliveTimeInNanos. */
  private static final long MIN_KEEPALIVE_TIME_NANOS = TimeUnit.SECONDS.toNanos(10);
1✔
37
  /** Floor of 10 milliseconds, in nanoseconds, applied by clampKeepAliveTimeoutInNanos. */
  private static final long MIN_KEEPALIVE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(10L);
1✔
38

39
  /** Executor on which the sendPing and shutdown tasks are scheduled. */
  private final ScheduledExecutorService scheduler;
40
  /**
   * Restarted on construction and whenever data is received; its elapsed time is subtracted from
   * the keepalive time when a ping is scheduled or rescheduled.
   */
  @GuardedBy("this")
41
  private final Stopwatch stopwatch;
42
  /** Callback used to send a ping and to report that a ping timed out. */
  private final KeepAlivePinger keepAlivePinger;
43
  /**
   * When true, onTransportStarted begins scheduling pings right away and onTransportIdle is a
   * no-op.
   */
  private final boolean keepAliveDuringTransportIdle;
44
  /** Current keepalive state; starts out IDLE. */
  @GuardedBy("this")
1✔
45
  private State state = State.IDLE;
46
  /** Handle to the shutdown task scheduled when a ping is sent; null until the first ping. */
  @GuardedBy("this")
47
  private ScheduledFuture<?> shutdownFuture;
48
  /** Handle to the pending sendPing task, or null when none is pending. */
  @GuardedBy("this")
49
  private ScheduledFuture<?> pingFuture;
50
  private final Runnable shutdown = new LogExceptionRunnable(new Runnable() {
1✔
51
    @Override
52
    public void run() {
53
      boolean shouldShutdown = false;
1✔
54
      synchronized (KeepAliveManager.this) {
1✔
55
        if (state != State.DISCONNECTED) {
1✔
56
          // We haven't received a ping response within the timeout. The connection is likely gone
57
          // already. Shutdown the transport and fail all existing rpcs.
58
          state = State.DISCONNECTED;
1✔
59
          shouldShutdown = true;
1✔
60
        }
61
      }
1✔
62
      if (shouldShutdown) {
1✔
63
        keepAlivePinger.onPingTimeout();
1✔
64
      }
65
    }
1✔
66
  });
67
  private final Runnable sendPing = new LogExceptionRunnable(new Runnable() {
1✔
68
    @Override
69
    public void run() {
70
      boolean shouldSendPing = false;
1✔
71
      synchronized (KeepAliveManager.this) {
1✔
72
        pingFuture = null;
1✔
73
        if (state == State.PING_SCHEDULED) {
1✔
74
          shouldSendPing = true;
1✔
75
          state = State.PING_SENT;
1✔
76
          // Schedule a shutdown. It fires if we don't receive the ping response within the timeout.
77
          shutdownFuture = scheduler.schedule(shutdown, keepAliveTimeoutInNanos,
1✔
78
              TimeUnit.NANOSECONDS);
79
        } else if (state == State.PING_DELAYED) {
1✔
80
          // We have received some data. Reschedule the ping with the new time.
81
          pingFuture = scheduler.schedule(
1✔
82
              sendPing,
1✔
83
              keepAliveTimeInNanos - stopwatch.elapsed(TimeUnit.NANOSECONDS),
1✔
84
              TimeUnit.NANOSECONDS);
85
          state = State.PING_SCHEDULED;
1✔
86
        }
87
      }
1✔
88
      if (shouldSendPing) {
1✔
89
        // Send the ping.
90
        keepAlivePinger.ping();
1✔
91
      }
92
    }
1✔
93
  });
94

95
  /** Delay, in nanoseconds, used when scheduling the sendPing task. */
  private final long keepAliveTimeInNanos;
96
  /** Delay, in nanoseconds, between sending a ping and the shutdown task firing. */
  private final long keepAliveTimeoutInNanos;
97

98
  private enum State {
1✔
99
    /*
100
     * We don't need to do any keepalives. This means the transport has no active rpcs and
101
     * keepAliveDuringTransportIdle == false.
102
     */
103
    IDLE,
1✔
104
    /*
105
     * We have scheduled a ping to be sent in the future. We may decide to delay it if we receive
106
     * some data.
107
     */
108
    PING_SCHEDULED,
1✔
109
    /*
110
     * We need to delay the scheduled keepalive ping.
111
     */
112
    PING_DELAYED,
1✔
113
    /*
114
     * The ping has been sent out. Waiting for a ping response.
115
     */
116
    PING_SENT,
1✔
117
    /*
118
     * Transport goes idle after ping has been sent.
119
     */
120
    IDLE_AND_PING_SENT,
1✔
121
    /*
122
     * The transport has been disconnected. We won't do keepalives any more.
123
     */
124
    DISCONNECTED,
1✔
125
  }
126

127
  /**
   * Creates a KeepAliveManager that measures elapsed time with a new {@link Stopwatch}.
   *
   * @param keepAlivePinger receives the ping and ping-timeout callbacks
   * @param scheduler executor on which the ping and shutdown tasks are scheduled
   * @param keepAliveTimeInNanos delay, in nanoseconds, used when scheduling a keepalive ping
   * @param keepAliveTimeoutInNanos delay, in nanoseconds, between sending a ping and the
   *     shutdown task firing
   * @param keepAliveDuringTransportIdle whether keepalives continue while the transport is idle
   */
  public KeepAliveManager(
      KeepAlivePinger keepAlivePinger,
      ScheduledExecutorService scheduler,
      long keepAliveTimeInNanos,
      long keepAliveTimeoutInNanos,
      boolean keepAliveDuringTransportIdle) {
    this(
        keepAlivePinger,
        scheduler,
        Stopwatch.createUnstarted(),
        keepAliveTimeInNanos,
        keepAliveTimeoutInNanos,
        keepAliveDuringTransportIdle);
  }
1✔
137

138
  /**
   * Test-visible constructor that also takes the {@link Stopwatch} to use. The pinger, scheduler
   * and stopwatch must be non-null; the stopwatch is reset and started before returning.
   */
  @VisibleForTesting
  KeepAliveManager(
      KeepAlivePinger keepAlivePinger,
      ScheduledExecutorService scheduler,
      Stopwatch stopwatch,
      long keepAliveTimeInNanos,
      long keepAliveTimeoutInNanos,
      boolean keepAliveDuringTransportIdle) {
    // Plain configuration values.
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
    this.keepAliveDuringTransportIdle = keepAliveDuringTransportIdle;

    // Collaborators, null-checked in the same order as before.
    this.keepAlivePinger = checkNotNull(keepAlivePinger, "keepAlivePinger");
    this.scheduler = checkNotNull(scheduler, "scheduler");
    this.stopwatch = checkNotNull(stopwatch, "stopwatch");

    // Start measuring from construction time.
    stopwatch.reset().start();
  }
1✔
150

151
  /** Start keepalive monitoring. */
152
  public synchronized void onTransportStarted() {
153
    if (keepAliveDuringTransportIdle) {
1✔
154
      onTransportActive();
1✔
155
    }
156
  }
1✔
157

158
  /**
159
   * Transport has received some data so that we can delay sending keepalives.
160
   */
161
  public synchronized void onDataReceived() {
162
    stopwatch.reset().start();
1✔
163
    // We do not cancel the ping future here. This avoids constantly scheduling and cancellation in
164
    // a busy transport. Instead, we update the status here and reschedule later. So we actually
165
    // keep one sendPing task always in flight when there're active rpcs.
166
    if (state == State.PING_SCHEDULED) {
1✔
167
      state = State.PING_DELAYED;
1✔
168
    } else if (state == State.PING_SENT || state == State.IDLE_AND_PING_SENT) {
1✔
169
      // Ping acked or effectively ping acked. Cancel shutdown, and then if not idle,
170
      // schedule a new keep-alive ping.
171
      if (shutdownFuture != null) {
1✔
172
        shutdownFuture.cancel(false);
1✔
173
      }
174
      if (state == State.IDLE_AND_PING_SENT) {
1✔
175
        // not to schedule new pings until onTransportActive
176
        state = State.IDLE;
1✔
177
        return;
1✔
178
      }
179
      // schedule a new ping
180
      state = State.PING_SCHEDULED;
1✔
181
      checkState(pingFuture == null, "There should be no outstanding pingFuture");
1✔
182
      pingFuture = scheduler.schedule(sendPing, keepAliveTimeInNanos, TimeUnit.NANOSECONDS);
1✔
183
    }
184
  }
1✔
185

186
  /**
187
   * Transport has active streams. Start sending keepalives if necessary.
188
   */
189
  public synchronized void onTransportActive() {
190
    if (state == State.IDLE) {
1✔
191
      // When the transport goes active, we do not reset the nextKeepaliveTime. This allows us to
192
      // quickly check whether the connection is still working.
193
      state = State.PING_SCHEDULED;
1✔
194
      if (pingFuture == null) {
1✔
195
        pingFuture = scheduler.schedule(
1✔
196
            sendPing,
197
            keepAliveTimeInNanos - stopwatch.elapsed(TimeUnit.NANOSECONDS),
1✔
198
            TimeUnit.NANOSECONDS);
199
      }
200
    } else if (state == State.IDLE_AND_PING_SENT) {
1✔
201
      state = State.PING_SENT;
1✔
202
    } // Other states are possible when keepAliveDuringTransportIdle == true
203
  }
1✔
204

205
  /**
206
   * Transport has finished all streams.
207
   */
208
  public synchronized void onTransportIdle() {
209
    if (keepAliveDuringTransportIdle) {
1✔
210
      return;
1✔
211
    }
212
    if (state == State.PING_SCHEDULED || state == State.PING_DELAYED) {
1✔
213
      state = State.IDLE;
1✔
214
    }
215
    if (state == State.PING_SENT) {
1✔
216
      state = State.IDLE_AND_PING_SENT;
1✔
217
    }
218
  }
1✔
219

220
  /**
221
   * Transport is being terminated. We no longer need to do keepalives.
222
   */
223
  public synchronized void onTransportTermination() {
224
    if (state != State.DISCONNECTED) {
1✔
225
      state = State.DISCONNECTED;
1✔
226
      if (shutdownFuture != null) {
1✔
227
        shutdownFuture.cancel(false);
1✔
228
      }
229
      if (pingFuture != null) {
1✔
230
        pingFuture.cancel(false);
1✔
231
        pingFuture = null;
1✔
232
      }
233
    }
234
  }
1✔
235

236
  /**
237
   * Bumps keepalive time to 10 seconds if the specified value was smaller than that.
238
   */
239
  public static long clampKeepAliveTimeInNanos(long keepAliveTimeInNanos) {
240
    return Math.max(keepAliveTimeInNanos, MIN_KEEPALIVE_TIME_NANOS);
1✔
241
  }
242

243
  /**
244
   * Bumps keepalive timeout to 10 milliseconds if the specified value was smaller than that.
245
   */
246
  public static long clampKeepAliveTimeoutInNanos(long keepAliveTimeoutInNanos) {
247
    return Math.max(keepAliveTimeoutInNanos, MIN_KEEPALIVE_TIMEOUT_NANOS);
×
248
  }
249

250
  /**
   * Callbacks the keepalive manager uses to act on the transport: one to send a ping and one to
   * react when that ping goes unanswered.
   */
  public interface KeepAlivePinger {
251
    /**
     * Sends out a keep-alive ping. The manager calls this after releasing its own lock.
     */
254
    void ping();
255

256
    /**
     * Callback when the ping ack was not received within the keepalive timeout. Implementations
     * should shut down the transport.
     */
259
    void onPingTimeout();
260
  }
261

262
  /**
263
   * Default client side {@link KeepAlivePinger}.
264
   */
265
  public static final class ClientKeepAlivePinger implements KeepAlivePinger {
266

267

268
    /**
269
     * A {@link ClientTransport} that has life-cycle management.
270
     *
271
     */
272
    @ThreadSafe
273
    public interface TransportWithDisconnectReason extends ClientTransport {
274

275
      /**
276
       * Initiates a forceful shutdown in which preexisting and new calls are closed. Existing calls
277
       * should be closed with the provided {@code reason} and {@code disconnectError}.
278
       */
279
      void shutdownNow(Status reason, DisconnectError disconnectError);
280
    }
281

282
    private final TransportWithDisconnectReason transport;
283

284
    public ClientKeepAlivePinger(TransportWithDisconnectReason transport) {
1✔
285
      this.transport = transport;
1✔
286
    }
1✔
287

288
    @Override
289
    public void ping() {
290
      transport.ping(new ClientTransport.PingCallback() {
1✔
291
        @Override
292
        public void onSuccess(long roundTripTimeNanos) {}
×
293

294
        @Override
295
        public void onFailure(Status cause) {
296
          transport.shutdownNow(Status.UNAVAILABLE.withDescription(
1✔
297
                  "Keepalive failed. The connection is likely gone"),
298
              SimpleDisconnectError.CONNECTION_TIMED_OUT);
299
        }
1✔
300
      }, MoreExecutors.directExecutor());
1✔
301
    }
1✔
302

303
    @Override
304
    public void onPingTimeout() {
305
      transport.shutdownNow(Status.UNAVAILABLE.withDescription(
1✔
306
              "Keepalive failed. The connection is likely gone"),
307
          SimpleDisconnectError.CONNECTION_TIMED_OUT);
308
    }
1✔
309
  }
310
}
311

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc