• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20197

16 Mar 2026 05:51PM UTC coverage: 88.694% (+0.005%) from 88.689%
#20197

push

github

web-flow
xds: reuse GrpcXdsTransport and underlying gRPC channel to the same xDS server by ref-counting

This PR implements reusing the gRPC xDS transport (and underlying gRPC
channel) to the same xDS server by ref-counting, which is already
implemented in gRPC C++
([link](https://github.com/grpc/grpc/blob/5a3a5d531/src/core/xds/grpc/xds_transport_grpc.cc#L399-L414))
and gRPC Go
([link](https://github.com/grpc/grpc-go/blob/81c7924ec/internal/xds/clients/grpctransport/grpc_transport.go#L78-L120)).
This optimization is expected to reduce memory footprint of the xDS
management server and xDS enabled clients as channel establishment and
lifecycle management of the connection is expensive.

* Implemented a map to store `GrpcXdsTransport` instances keyed by the
`Bootstrapper.ServerInfo` and each `GrpcXdsTransport` has a ref count.
Note, the map cannot be simply keyed by the xDS server address as the
client could have different channel credentials to the same xDS server,
which should be counted as different transport instances.
* When `GrpcXdsTransportFactory.create()` is called, the existing
transport is reused if it already exists in the map and increment its
ref count, otherwise create a new transport, store it in the map, and
increment its ref count.
* When `GrpcXdsTransport.shutdown()` is called, its ref count is
decremented and the underlying gRPC channel is shut down when its ref
count reaches zero.
* Note this ref-counting of the `GrpcXdsTransport` is different and
orthogonal to the ref-counting of the xDS client keyed by the xDS server
target name to allow for xDS-based fallback per [gRFC
A71](https://github.com/grpc/proposal/blob/master/A71-xds-fallback.md).

Prod risk level: Low
* Reusing the underlying gRPC channel to the xDS server would not affect
the gRPC xDS (ADS/LRS) streams which would be multiplexed on the same
channel, however, this means new xDS (ADS/LRS) streams and RPCs may fail
due ... (continued)

35473 of 39995 relevant lines covered (88.69%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.22
/../xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20

21
import com.google.common.annotations.VisibleForTesting;
22
import io.grpc.CallCredentials;
23
import io.grpc.CallOptions;
24
import io.grpc.ChannelCredentials;
25
import io.grpc.ClientCall;
26
import io.grpc.Context;
27
import io.grpc.Grpc;
28
import io.grpc.ManagedChannel;
29
import io.grpc.Metadata;
30
import io.grpc.MethodDescriptor;
31
import io.grpc.Status;
32
import io.grpc.xds.client.Bootstrapper;
33
import io.grpc.xds.client.XdsTransportFactory;
34
import java.util.Map;
35
import java.util.concurrent.ConcurrentHashMap;
36
import java.util.concurrent.TimeUnit;
37

38
/**
39
 * A factory for creating gRPC-based transports for xDS communication.
40
 *
41
 * <p>WARNING: This class reuses channels when possible, based on the provided {@link
42
 * Bootstrapper.ServerInfo} with important considerations. The {@link Bootstrapper.ServerInfo}
43
 * includes {@link ChannelCredentials}, which is compared by reference equality. This means every
44
 * {@link Bootstrapper.BootstrapInfo} would have non-equal copies of {@link
45
 * Bootstrapper.ServerInfo}, even if they all represent the same xDS server configuration. For gRPC
46
 * name resolution with the {@code xds} and {@code google-c2p} scheme, this transport sharing works
47
 * as expected as it internally reuses a single {@link Bootstrapper.BootstrapInfo} instance.
48
 * Otherwise, new transports would be created for each {@link Bootstrapper.ServerInfo} despite them
49
 * possibly representing the same xDS server configuration and defeating the purpose of transport
50
 * sharing.
51
 */
52
final class GrpcXdsTransportFactory implements XdsTransportFactory {
53

54
  private final CallCredentials callCredentials;
55
  // The map of xDS server info to its corresponding gRPC xDS transport.
56
  // This enables reusing and sharing the same underlying gRPC channel.
57
  //
58
  // NOTE: ConcurrentHashMap is used as a per-entry lock and all reads and writes must be a mutation
59
  // via the ConcurrentHashMap APIs to acquire the per-entry lock in order to ensure thread safety
60
  // for reference counting of each GrpcXdsTransport instance.
61
  private static final Map<Bootstrapper.ServerInfo, GrpcXdsTransport> xdsServerInfoToTransportMap =
1✔
62
      new ConcurrentHashMap<>();
63

64
  GrpcXdsTransportFactory(CallCredentials callCredentials) {
1✔
65
    this.callCredentials = callCredentials;
1✔
66
  }
1✔
67

68
  @Override
69
  public XdsTransport create(Bootstrapper.ServerInfo serverInfo) {
70
    return xdsServerInfoToTransportMap.compute(
1✔
71
        serverInfo,
72
        (info, transport) -> {
73
          if (transport == null) {
1✔
74
            transport = new GrpcXdsTransport(serverInfo, callCredentials);
1✔
75
          }
76
          ++transport.refCount;
1✔
77
          return transport;
1✔
78
        });
79
  }
80

81
  @VisibleForTesting
82
  public XdsTransport createForTest(ManagedChannel channel) {
83
    return new GrpcXdsTransport(channel, callCredentials, null);
1✔
84
  }
85

86
  @VisibleForTesting
87
  static class GrpcXdsTransport implements XdsTransport {
88

89
    private final ManagedChannel channel;
90
    private final CallCredentials callCredentials;
91
    private final Bootstrapper.ServerInfo serverInfo;
92
    // Must only be accessed via the ConcurrentHashMap APIs which act as the locking methods.
93
    private int refCount = 0;
1✔
94

95
    public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo) {
96
      this(serverInfo, null);
×
97
    }
×
98

99
    @VisibleForTesting
100
    public GrpcXdsTransport(ManagedChannel channel) {
101
      this(channel, null, null);
1✔
102
    }
1✔
103

104
    public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo, CallCredentials callCredentials) {
1✔
105
      String target = serverInfo.target();
1✔
106
      ChannelCredentials channelCredentials = (ChannelCredentials) serverInfo.implSpecificConfig();
1✔
107
      this.channel = Grpc.newChannelBuilder(target, channelCredentials)
1✔
108
          .keepAliveTime(5, TimeUnit.MINUTES)
1✔
109
          .build();
1✔
110
      this.callCredentials = callCredentials;
1✔
111
      this.serverInfo = serverInfo;
1✔
112
    }
1✔
113

114
    @VisibleForTesting
115
    public GrpcXdsTransport(
116
        ManagedChannel channel,
117
        CallCredentials callCredentials,
118
        Bootstrapper.ServerInfo serverInfo) {
1✔
119
      this.channel = checkNotNull(channel, "channel");
1✔
120
      this.callCredentials = callCredentials;
1✔
121
      this.serverInfo = serverInfo;
1✔
122
    }
1✔
123

124
    @Override
125
    public <ReqT, RespT> StreamingCall<ReqT, RespT> createStreamingCall(
126
        String fullMethodName,
127
        MethodDescriptor.Marshaller<ReqT> reqMarshaller,
128
        MethodDescriptor.Marshaller<RespT> respMarshaller) {
129
      Context prevContext = Context.ROOT.attach();
1✔
130
      try {
131
        return new XdsStreamingCall<>(
1✔
132
            fullMethodName, reqMarshaller, respMarshaller, callCredentials);
133
      } finally {
134
        Context.ROOT.detach(prevContext);
1✔
135
      }
136

137
    }
138

139
    @Override
140
    public void shutdown() {
141
      if (serverInfo == null) {
1✔
142
        channel.shutdown();
1✔
143
        return;
1✔
144
      }
145
      xdsServerInfoToTransportMap.computeIfPresent(
1✔
146
          serverInfo,
147
          (info, transport) -> {
148
            if (--transport.refCount == 0) { // Prefix decrement and return the updated value.
1✔
149
              transport.channel.shutdown();
1✔
150
              return null; // Remove mapping.
1✔
151
            }
152
            return transport;
1✔
153
          });
154
    }
1✔
155

156
    private class XdsStreamingCall<ReqT, RespT> implements
157
        XdsTransportFactory.StreamingCall<ReqT, RespT> {
158

159
      private final ClientCall<ReqT, RespT> call;
160

161
      public XdsStreamingCall(
162
          String methodName,
163
          MethodDescriptor.Marshaller<ReqT> reqMarshaller,
164
          MethodDescriptor.Marshaller<RespT> respMarshaller,
165
          CallCredentials callCredentials) {
1✔
166
        this.call =
1✔
167
            channel.newCall(
1✔
168
                MethodDescriptor.<ReqT, RespT>newBuilder()
1✔
169
                    .setFullMethodName(methodName)
1✔
170
                    .setType(MethodDescriptor.MethodType.BIDI_STREAMING)
1✔
171
                    .setRequestMarshaller(reqMarshaller)
1✔
172
                    .setResponseMarshaller(respMarshaller)
1✔
173
                    .build(),
1✔
174
                CallOptions.DEFAULT.withCallCredentials(
1✔
175
                    callCredentials)); // TODO(zivy): support waitForReady
176
      }
1✔
177

178
      @Override
179
      public void start(EventHandler<RespT> eventHandler) {
180
        call.start(new EventHandlerToCallListenerAdapter<>(eventHandler), new Metadata());
1✔
181
        call.request(1);
1✔
182
      }
1✔
183

184
      @Override
185
      public void sendMessage(ReqT message) {
186
        call.sendMessage(message);
1✔
187
      }
1✔
188

189
      @Override
190
      public void startRecvMessage() {
191
        call.request(1);
1✔
192
      }
1✔
193

194
      @Override
195
      public void sendError(Exception e) {
196
        call.cancel("Cancelled by XdsClientImpl", e);
1✔
197
      }
1✔
198

199
      @Override
200
      public boolean isReady() {
201
        return call.isReady();
1✔
202
      }
203
    }
204
  }
205

206
  private static class EventHandlerToCallListenerAdapter<T> extends ClientCall.Listener<T> {
207
    private final EventHandler<T> handler;
208

209
    EventHandlerToCallListenerAdapter(EventHandler<T> eventHandler) {
1✔
210
      this.handler = checkNotNull(eventHandler, "eventHandler");
1✔
211
    }
1✔
212

213
    @Override
214
    public void onHeaders(Metadata headers) {}
1✔
215

216
    @Override
217
    public void onMessage(T message) {
218
      handler.onRecvMessage(message);
1✔
219
    }
1✔
220

221
    @Override
222
    public void onClose(Status status, Metadata trailers) {
223
      handler.onStatusReceived(status);
1✔
224
    }
1✔
225

226
    @Override
227
    public void onReady() {
228
      handler.onReady();
1✔
229
    }
1✔
230
  }
231
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc