• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20105

05 Dec 2025 11:11AM UTC coverage: 88.639% (-0.008%) from 88.647%
#20105

push

github

web-flow
okhttp: Fix race condition overwriting MAX_CONCURRENT_STREAMS (#12548)

### What this PR does

This PR fixes a race condition in `OkHttpClientTransport` where
`MAX_CONCURRENT_STREAMS` sent by the server could be incorrectly
overwritten by the client's default initialization.

The fix simply reorders the initialization to happen **before** starting
the reader thread, ensuring that any updates from the server are
preserved.

### Note on Testing

I attempted to add a deterministic reproduction test, but reliably
simulating this specific race condition proved difficult without
intrusive changes.
I request reviewers to primarily verify the logical correctness of the
reordering. I am open to collaborating with the team to develop a
suitable test case if required.


### Future Work

This PR covers **Step 1** (Fixing the race condition) of the plan
discussed in #11985.
I plan to follow up with **Step 2** (Adding assertions to verify no
pending streams exist) in a separate PR.

Part of #11985

35173 of 39681 relevant lines covered (88.64%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.27
/../xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java
1
/*
2
 * Copyright 2021 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds.client;
18

19
import com.google.common.annotations.VisibleForTesting;
20
import com.google.common.collect.ImmutableList;
21
import com.google.common.collect.ImmutableMap;
22
import io.grpc.Internal;
23
import io.grpc.InternalLogId;
24
import io.grpc.internal.GrpcUtil;
25
import io.grpc.internal.GrpcUtil.GrpcBuildVersion;
26
import io.grpc.internal.JsonParser;
27
import io.grpc.internal.JsonUtil;
28
import io.grpc.xds.client.EnvoyProtoData.Node;
29
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
30
import java.io.IOException;
31
import java.nio.charset.StandardCharsets;
32
import java.nio.file.Files;
33
import java.nio.file.Paths;
34
import java.util.HashMap;
35
import java.util.List;
36
import java.util.Map;
37

38
/**
39
 * A {@link Bootstrapper} implementation that reads xDS configurations from the local file system.
40
 */
41
@Internal
42
public abstract class BootstrapperImpl extends Bootstrapper {
43

44
  public static final String GRPC_EXPERIMENTAL_XDS_FALLBACK =
45
      "GRPC_EXPERIMENTAL_XDS_FALLBACK";
46
  public static final String GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING =
47
      "GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING";
48

49
  // Client features.
50
  @VisibleForTesting
51
  public static final String CLIENT_FEATURE_DISABLE_OVERPROVISIONING =
52
      "envoy.lb.does_not_support_overprovisioning";
53
  @VisibleForTesting
54
  public static final String CLIENT_FEATURE_RESOURCE_IN_SOTW = "xds.config.resource-in-sotw";
55

56
  // Server features.
57
  private static final String SERVER_FEATURE_IGNORE_RESOURCE_DELETION = "ignore_resource_deletion";
58
  private static final String SERVER_FEATURE_TRUSTED_XDS_SERVER = "trusted_xds_server";
59
  private static final String
60
      SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR = "resource_timer_is_transient_error";
61
  private static final String SERVER_FEATURE_FAIL_ON_DATA_ERRORS = "fail_on_data_errors";
62

63
  @VisibleForTesting
64
  static boolean enableXdsFallback = GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_FALLBACK, true);
1✔
65

66
  @VisibleForTesting
67
  public static boolean xdsDataErrorHandlingEnabled
1✔
68
      = GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING, false);
1✔
69

70
  protected final XdsLogger logger;
71

72
  protected FileReader reader = LocalFileReader.INSTANCE;
1✔
73

74
  /**
   * Creates a bootstrapper whose logger is bound to a newly allocated {@code "bootstrapper"} log
   * id.
   */
  protected BootstrapperImpl() {
    InternalLogId logId = InternalLogId.allocate("bootstrapper", null);
    logger = XdsLogger.withLogId(logId);
  }
1✔
77

78
  protected abstract String getJsonContent() throws IOException, XdsInitializationException;
79

80
  protected abstract Object getImplSpecificConfig(Map<String, ?> serverConfig, String serverUri)
81
      throws XdsInitializationException;
82

83

84
  /**
85
   * Reads and parses bootstrap config. The config is expected to be in JSON format.
86
   */
87
  @SuppressWarnings("unchecked")
88
  @Override
89
  public BootstrapInfo bootstrap() throws XdsInitializationException {
90
    String jsonContent;
91
    try {
92
      jsonContent = getJsonContent();
1✔
93
    } catch (IOException e) {
×
94
      throw new XdsInitializationException("Fail to read bootstrap file", e);
×
95
    }
1✔
96

97
    Map<String, ?> rawBootstrap;
98
    try {
99
      rawBootstrap = (Map<String, ?>) JsonParser.parse(jsonContent);
1✔
100
    } catch (IOException e) {
×
101
      throw new XdsInitializationException("Failed to parse JSON", e);
×
102
    }
1✔
103

104
    logger.log(XdsLogLevel.DEBUG, "Bootstrap configuration:\n{0}", rawBootstrap);
1✔
105
    return bootstrap(rawBootstrap);
1✔
106
  }
107

108
  @Override
109
  public BootstrapInfo bootstrap(Map<String, ?> rawData) throws XdsInitializationException {
110
    return bootstrapBuilder(rawData).build();
1✔
111
  }
112

113
  protected BootstrapInfo.Builder bootstrapBuilder(Map<String, ?> rawData)
114
      throws XdsInitializationException {
115
    BootstrapInfo.Builder builder = BootstrapInfo.builder();
1✔
116

117
    List<?> rawServerConfigs = JsonUtil.getList(rawData, "xds_servers");
1✔
118
    if (rawServerConfigs == null) {
1✔
119
      throw new XdsInitializationException("Invalid bootstrap: 'xds_servers' does not exist.");
1✔
120
    }
121
    List<ServerInfo> servers = parseServerInfos(rawServerConfigs, logger);
1✔
122
    if (servers.size() > 1 && !enableXdsFallback) {
1✔
123
      servers = ImmutableList.of(servers.get(0));
×
124
    }
125
    builder.servers(servers);
1✔
126

127
    Node.Builder nodeBuilder = Node.newBuilder();
1✔
128
    Map<String, ?> rawNode = JsonUtil.getObject(rawData, "node");
1✔
129
    if (rawNode != null) {
1✔
130
      String id = JsonUtil.getString(rawNode, "id");
1✔
131
      if (id != null) {
1✔
132
        logger.log(XdsLogLevel.INFO, "Node id: {0}", id);
1✔
133
        nodeBuilder.setId(id);
1✔
134
      }
135
      String cluster = JsonUtil.getString(rawNode, "cluster");
1✔
136
      if (cluster != null) {
1✔
137
        logger.log(XdsLogLevel.INFO, "Node cluster: {0}", cluster);
1✔
138
        nodeBuilder.setCluster(cluster);
1✔
139
      }
140
      Map<String, ?> metadata = JsonUtil.getObject(rawNode, "metadata");
1✔
141
      if (metadata != null) {
1✔
142
        nodeBuilder.setMetadata(metadata);
1✔
143
      }
144
      Map<String, ?> rawLocality = JsonUtil.getObject(rawNode, "locality");
1✔
145
      if (rawLocality != null) {
1✔
146
        String region = "";
1✔
147
        String zone = "";
1✔
148
        String subZone = "";
1✔
149
        if (rawLocality.containsKey("region")) {
1✔
150
          region = JsonUtil.getString(rawLocality, "region");
1✔
151
        }
152
        if (rawLocality.containsKey("zone")) {
1✔
153
          zone = JsonUtil.getString(rawLocality, "zone");
1✔
154
        }
155
        if (rawLocality.containsKey("sub_zone")) {
1✔
156
          subZone = JsonUtil.getString(rawLocality, "sub_zone");
1✔
157
        }
158
        logger.log(XdsLogLevel.INFO, "Locality region: {0}, zone: {1}, subZone: {2}",
1✔
159
            region, zone, subZone);
160
        Locality locality = Locality.create(region, zone, subZone);
1✔
161
        nodeBuilder.setLocality(locality);
1✔
162
      }
163
    }
164
    GrpcBuildVersion buildVersion = GrpcUtil.getGrpcBuildVersion();
1✔
165
    logger.log(XdsLogLevel.INFO, "Build version: {0}", buildVersion);
1✔
166
    nodeBuilder.setBuildVersion(buildVersion.toString());
1✔
167
    nodeBuilder.setUserAgentName(buildVersion.getUserAgent());
1✔
168
    nodeBuilder.setUserAgentVersion(buildVersion.getImplementationVersion());
1✔
169
    nodeBuilder.addClientFeatures(CLIENT_FEATURE_DISABLE_OVERPROVISIONING);
1✔
170
    nodeBuilder.addClientFeatures(CLIENT_FEATURE_RESOURCE_IN_SOTW);
1✔
171
    builder.node(nodeBuilder.build());
1✔
172

173
    Map<String, ?> certProvidersBlob = JsonUtil.getObject(rawData, "certificate_providers");
1✔
174
    if (certProvidersBlob != null) {
1✔
175
      logger.log(XdsLogLevel.INFO, "Configured with {0} cert providers", certProvidersBlob.size());
1✔
176
      Map<String, CertificateProviderInfo> certProviders = new HashMap<>(certProvidersBlob.size());
1✔
177
      for (String name : certProvidersBlob.keySet()) {
1✔
178
        Map<String, ?> valueMap = JsonUtil.getObject(certProvidersBlob, name);
1✔
179
        String pluginName =
1✔
180
            checkForNull(JsonUtil.getString(valueMap, "plugin_name"), "plugin_name");
1✔
181
        logger.log(XdsLogLevel.INFO, "cert provider: {0}, plugin name: {1}", name, pluginName);
1✔
182
        Map<String, ?> config = checkForNull(JsonUtil.getObject(valueMap, "config"), "config");
1✔
183
        CertificateProviderInfo certificateProviderInfo =
1✔
184
            CertificateProviderInfo.create(pluginName, config);
1✔
185
        certProviders.put(name, certificateProviderInfo);
1✔
186
      }
1✔
187
      builder.certProviders(certProviders);
1✔
188
    }
189

190
    String serverResourceId =
1✔
191
        JsonUtil.getString(rawData, "server_listener_resource_name_template");
1✔
192
    logger.log(
1✔
193
        XdsLogLevel.INFO, "server_listener_resource_name_template: {0}", serverResourceId);
194
    builder.serverListenerResourceNameTemplate(serverResourceId);
1✔
195

196
    String clientDefaultListener =
1✔
197
        JsonUtil.getString(rawData, "client_default_listener_resource_name_template");
1✔
198
    logger.log(
1✔
199
        XdsLogLevel.INFO, "client_default_listener_resource_name_template: {0}",
200
        clientDefaultListener);
201
    if (clientDefaultListener != null) {
1✔
202
      builder.clientDefaultListenerResourceNameTemplate(clientDefaultListener);
1✔
203
    }
204

205
    Map<String, ?> rawAuthoritiesMap =
1✔
206
        JsonUtil.getObject(rawData, "authorities");
1✔
207
    ImmutableMap.Builder<String, AuthorityInfo> authorityInfoMapBuilder = ImmutableMap.builder();
1✔
208
    if (rawAuthoritiesMap != null) {
1✔
209
      logger.log(
1✔
210
          XdsLogLevel.INFO, "Configured with {0} xDS server authorities", rawAuthoritiesMap.size());
1✔
211
      for (String authorityName : rawAuthoritiesMap.keySet()) {
1✔
212
        logger.log(XdsLogLevel.INFO, "xDS server authority: {0}", authorityName);
1✔
213
        Map<String, ?> rawAuthority = JsonUtil.getObject(rawAuthoritiesMap, authorityName);
1✔
214
        String clientListnerTemplate =
1✔
215
            JsonUtil.getString(rawAuthority, "client_listener_resource_name_template");
1✔
216
        logger.log(
1✔
217
            XdsLogLevel.INFO, "client_listener_resource_name_template: {0}", clientListnerTemplate);
218
        String prefix = XDSTP_SCHEME + "//" + authorityName + "/";
1✔
219
        if (clientListnerTemplate == null) {
1✔
220
          clientListnerTemplate = prefix + "envoy.config.listener.v3.Listener/%s";
1✔
221
        } else if (!clientListnerTemplate.startsWith(prefix)) {
1✔
222
          throw new XdsInitializationException(
1✔
223
              "client_listener_resource_name_template: '" + clientListnerTemplate
224
                  + "' does not start with " + prefix);
225
        }
226
        List<?> rawAuthorityServers = JsonUtil.getList(rawAuthority, "xds_servers");
1✔
227
        List<ServerInfo> authorityServers;
228
        if (rawAuthorityServers == null || rawAuthorityServers.isEmpty()) {
1✔
229
          authorityServers = servers;
1✔
230
        } else {
231
          if (rawAuthorityServers.size() > 1 && !enableXdsFallback) {
1✔
232
            rawAuthorityServers = ImmutableList.of(rawAuthorityServers.get(0));
×
233
          }
234
          authorityServers = parseServerInfos(rawAuthorityServers, logger);
1✔
235
        }
236
        authorityInfoMapBuilder.put(
1✔
237
            authorityName, AuthorityInfo.create(clientListnerTemplate, authorityServers));
1✔
238
      }
1✔
239
      builder.authorities(authorityInfoMapBuilder.buildOrThrow());
1✔
240
    }
241

242
    return builder;
1✔
243
  }
244

245
  private List<ServerInfo> parseServerInfos(List<?> rawServerConfigs, XdsLogger logger)
246
      throws XdsInitializationException {
247
    logger.log(XdsLogLevel.INFO, "Configured with {0} xDS servers", rawServerConfigs.size());
1✔
248
    ImmutableList.Builder<ServerInfo> servers = ImmutableList.builder();
1✔
249
    List<Map<String, ?>> serverConfigList = JsonUtil.checkObjectList(rawServerConfigs);
1✔
250
    for (Map<String, ?> serverConfig : serverConfigList) {
1✔
251
      String serverUri = JsonUtil.getString(serverConfig, "server_uri");
1✔
252
      if (serverUri == null) {
1✔
253
        throw new XdsInitializationException("Invalid bootstrap: missing 'server_uri'");
1✔
254
      }
255
      logger.log(XdsLogLevel.INFO, "xDS server URI: {0}", serverUri);
1✔
256

257
      Object implSpecificConfig = getImplSpecificConfig(serverConfig, serverUri);
1✔
258

259
      boolean resourceTimerIsTransientError = false;
1✔
260
      boolean ignoreResourceDeletion = false;
1✔
261
      boolean failOnDataErrors = false;
1✔
262
      // "For forward compatibility reasons, the client will ignore any entry in the list that it
263
      // does not understand, regardless of type."
264
      List<?> serverFeatures = JsonUtil.getList(serverConfig, "server_features");
1✔
265
      if (serverFeatures != null) {
1✔
266
        logger.log(XdsLogLevel.INFO, "Server features: {0}", serverFeatures);
1✔
267
        if (serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION)) {
1✔
268
          ignoreResourceDeletion = true;
1✔
269
        }
270
        resourceTimerIsTransientError = xdsDataErrorHandlingEnabled
1✔
271
            && serverFeatures.contains(SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR);
1✔
272
        failOnDataErrors = xdsDataErrorHandlingEnabled
1✔
273
            && serverFeatures.contains(SERVER_FEATURE_FAIL_ON_DATA_ERRORS);
1✔
274
      }
275
      servers.add(
1✔
276
          ServerInfo.create(serverUri, implSpecificConfig, ignoreResourceDeletion,
1✔
277
              serverFeatures != null
278
                  && serverFeatures.contains(SERVER_FEATURE_TRUSTED_XDS_SERVER),
1✔
279
              resourceTimerIsTransientError, failOnDataErrors));
280
    }
1✔
281
    return servers.build();
1✔
282
  }
283

284
  @VisibleForTesting
285
  public void setFileReader(FileReader reader) {
286
    this.reader = reader;
1✔
287
  }
1✔
288

289
  /**
   * Reads the content of the file with the given path in the file system.
   */
  public interface FileReader {
    /**
     * Returns the full content of the file at {@code path}.
     *
     * @param path the path of the file to read
     * @return the file content as a string
     * @throws IOException if the file cannot be read
     */
    String readFile(String path) throws IOException;
  }
295

296
  protected enum LocalFileReader implements FileReader {
1✔
297
    INSTANCE;
1✔
298

299
    @Override
300
    public String readFile(String path) throws IOException {
301
      return new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
×
302
    }
303
  }
304

305
  private static <T> T checkForNull(T value, String fieldName) throws XdsInitializationException {
306
    if (value == null) {
1✔
307
      throw new XdsInitializationException(
1✔
308
          "Invalid bootstrap: '" + fieldName + "' does not exist.");
309
    }
310
    return value;
1✔
311
  }
312

313
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc