• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19715

06 Mar 2025 08:10AM UTC coverage: 88.479% (-0.04%) from 88.515%
#19715

push

github

web-flow
xds: xDS-based HTTP CONNECT configuration (#11861)

34489 of 38980 relevant lines covered (88.48%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.69
/../xds/src/main/java/io/grpc/xds/XdsEndpointResource.java
1
/*
2
 * Copyright 2022 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20

21
import com.google.common.annotations.VisibleForTesting;
22
import com.google.common.base.MoreObjects;
23
import com.google.common.collect.ImmutableMap;
24
import com.google.common.net.InetAddresses;
25
import com.google.protobuf.Any;
26
import com.google.protobuf.InvalidProtocolBufferException;
27
import com.google.protobuf.Message;
28
import io.envoyproxy.envoy.config.core.v3.Address;
29
import io.envoyproxy.envoy.config.core.v3.HealthStatus;
30
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
31
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
32
import io.envoyproxy.envoy.config.endpoint.v3.Endpoint;
33
import io.envoyproxy.envoy.type.v3.FractionalPercent;
34
import io.grpc.EquivalentAddressGroup;
35
import io.grpc.internal.GrpcUtil;
36
import io.grpc.xds.Endpoints.DropOverload;
37
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
38
import io.grpc.xds.MetadataRegistry.MetadataValueParser;
39
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
40
import io.grpc.xds.client.Locality;
41
import io.grpc.xds.client.XdsClient.ResourceUpdate;
42
import io.grpc.xds.client.XdsResourceType;
43
import java.net.InetSocketAddress;
44
import java.util.ArrayList;
45
import java.util.Collections;
46
import java.util.HashMap;
47
import java.util.HashSet;
48
import java.util.LinkedHashMap;
49
import java.util.List;
50
import java.util.Map;
51
import java.util.Objects;
52
import java.util.Set;
53
import javax.annotation.Nullable;
54

55
class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
1✔
56
  static final String ADS_TYPE_URL_EDS =
57
      "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
58

59
  public static final String GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS =
60
      "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS";
61

62
  private static final XdsEndpointResource instance = new XdsEndpointResource();
1✔
63

64
  static XdsEndpointResource getInstance() {
65
    return instance;
1✔
66
  }
67

68
  @Override
69
  @Nullable
70
  protected String extractResourceName(Message unpackedResource) {
71
    if (!(unpackedResource instanceof ClusterLoadAssignment)) {
1✔
72
      return null;
×
73
    }
74
    return ((ClusterLoadAssignment) unpackedResource).getClusterName();
1✔
75
  }
76

77
  @Override
78
  public String typeName() {
79
    return "EDS";
1✔
80
  }
81

82
  @Override
83
  public String typeUrl() {
84
    return ADS_TYPE_URL_EDS;
1✔
85
  }
86

87
  @Override
88
  public boolean shouldRetrieveResourceKeysForArgs() {
89
    return true;
1✔
90
  }
91

92
  @Override
93
  protected boolean isFullStateOfTheWorld() {
94
    return false;
1✔
95
  }
96

97
  @Override
98
  protected Class<ClusterLoadAssignment> unpackedClassName() {
99
    return ClusterLoadAssignment.class;
1✔
100
  }
101

102
  @Override
103
  protected EdsUpdate doParse(Args args, Message unpackedMessage) throws ResourceInvalidException {
104
    if (!(unpackedMessage instanceof ClusterLoadAssignment)) {
1✔
105
      throw new ResourceInvalidException("Invalid message type: " + unpackedMessage.getClass());
×
106
    }
107
    return processClusterLoadAssignment((ClusterLoadAssignment) unpackedMessage);
1✔
108
  }
109

110
  private static boolean isEnabledXdsDualStack() {
111
    return GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, false);
1✔
112
  }
113

114
  private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assignment)
115
      throws ResourceInvalidException {
116
    Map<Integer, Set<Locality>> priorities = new HashMap<>();
1✔
117
    Map<Locality, LocalityLbEndpoints> localityLbEndpointsMap = new LinkedHashMap<>();
1✔
118
    List<Endpoints.DropOverload> dropOverloads = new ArrayList<>();
1✔
119
    int maxPriority = -1;
1✔
120
    for (io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints localityLbEndpointsProto
121
        : assignment.getEndpointsList()) {
1✔
122
      StructOrError<LocalityLbEndpoints> structOrError =
1✔
123
          parseLocalityLbEndpoints(localityLbEndpointsProto);
1✔
124
      if (structOrError == null) {
1✔
125
        continue;
1✔
126
      }
127
      if (structOrError.getErrorDetail() != null) {
1✔
128
        throw new ResourceInvalidException(structOrError.getErrorDetail());
1✔
129
      }
130

131
      LocalityLbEndpoints localityLbEndpoints = structOrError.getStruct();
1✔
132
      int priority = localityLbEndpoints.priority();
1✔
133
      maxPriority = Math.max(maxPriority, priority);
1✔
134
      // Note endpoints with health status other than HEALTHY and UNKNOWN are still
135
      // handed over to watching parties. It is watching parties' responsibility to
136
      // filter out unhealthy endpoints. See EnvoyProtoData.LbEndpoint#isHealthy().
137
      Locality locality =  parseLocality(localityLbEndpointsProto.getLocality());
1✔
138
      localityLbEndpointsMap.put(locality, localityLbEndpoints);
1✔
139
      if (!priorities.containsKey(priority)) {
1✔
140
        priorities.put(priority, new HashSet<>());
1✔
141
      }
142
      if (!priorities.get(priority).add(locality)) {
1✔
143
        throw new ResourceInvalidException("ClusterLoadAssignment has duplicate locality:"
1✔
144
            + locality + " for priority:" + priority);
145
      }
146
    }
1✔
147
    if (priorities.size() != maxPriority + 1) {
1✔
148
      throw new ResourceInvalidException("ClusterLoadAssignment has sparse priorities");
×
149
    }
150

151
    for (ClusterLoadAssignment.Policy.DropOverload dropOverloadProto
152
        : assignment.getPolicy().getDropOverloadsList()) {
1✔
153
      dropOverloads.add(parseDropOverload(dropOverloadProto));
1✔
154
    }
1✔
155
    return new EdsUpdate(assignment.getClusterName(), localityLbEndpointsMap, dropOverloads);
1✔
156
  }
157

158
  private static Locality parseLocality(io.envoyproxy.envoy.config.core.v3.Locality proto) {
159
    return Locality.create(proto.getRegion(), proto.getZone(), proto.getSubZone());
1✔
160
  }
161

162
  private static DropOverload parseDropOverload(
163
      io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy.DropOverload proto) {
164
    return DropOverload.create(proto.getCategory(), getRatePerMillion(proto.getDropPercentage()));
1✔
165
  }
166

167
  private static int getRatePerMillion(FractionalPercent percent) {
168
    int numerator = percent.getNumerator();
1✔
169
    FractionalPercent.DenominatorType type = percent.getDenominator();
1✔
170
    switch (type) {
1✔
171
      case TEN_THOUSAND:
172
        numerator *= 100;
×
173
        break;
×
174
      case HUNDRED:
175
        numerator *= 10_000;
×
176
        break;
×
177
      case MILLION:
178
        break;
1✔
179
      case UNRECOGNIZED:
180
      default:
181
        throw new IllegalArgumentException("Unknown denominator type of " + percent);
×
182
    }
183

184
    if (numerator > 1_000_000 || numerator < 0) {
1✔
185
      numerator = 1_000_000;
×
186
    }
187
    return numerator;
1✔
188
  }
189

190

191
  @VisibleForTesting
192
  @Nullable
193
  static StructOrError<LocalityLbEndpoints> parseLocalityLbEndpoints(
194
      io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto)
195
      throws ResourceInvalidException {
196
    // Filter out localities without or with 0 weight.
197
    if (!proto.hasLoadBalancingWeight() || proto.getLoadBalancingWeight().getValue() < 1) {
1✔
198
      return null;
1✔
199
    }
200
    if (proto.getPriority() < 0) {
1✔
201
      return StructOrError.fromError("negative priority");
1✔
202
    }
203

204
    ImmutableMap<String, Object> localityMetadata;
205
    MetadataRegistry registry = MetadataRegistry.getInstance();
1✔
206
    try {
207
      localityMetadata = registry.parseMetadata(proto.getMetadata());
1✔
208
    } catch (ResourceInvalidException e) {
×
209
      throw new ResourceInvalidException("Failed to parse Locality Endpoint metadata: "
×
210
          + e.getMessage(), e);
×
211
    }
1✔
212
    List<Endpoints.LbEndpoint> endpoints = new ArrayList<>(proto.getLbEndpointsCount());
1✔
213
    for (io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint endpoint : proto.getLbEndpointsList()) {
1✔
214
      // The endpoint field of each lb_endpoints must be set.
215
      // Inside of it: the address field must be set.
216
      if (!endpoint.hasEndpoint() || !endpoint.getEndpoint().hasAddress()) {
1✔
217
        return StructOrError.fromError("LbEndpoint with no endpoint/address");
×
218
      }
219
      ImmutableMap<String, Object> endpointMetadata;
220
      try {
221
        endpointMetadata = registry.parseMetadata(endpoint.getMetadata());
1✔
222
      } catch (ResourceInvalidException e) {
×
223
        throw new ResourceInvalidException("Failed to parse Endpoint metadata: "
×
224
            + e.getMessage(), e);
×
225
      }
1✔
226
      List<java.net.SocketAddress> addresses = new ArrayList<>();
1✔
227
      addresses.add(getInetSocketAddress(endpoint.getEndpoint().getAddress()));
1✔
228

229
      if (isEnabledXdsDualStack()) {
1✔
230
        for (Endpoint.AdditionalAddress additionalAddress
231
            : endpoint.getEndpoint().getAdditionalAddressesList()) {
1✔
232
          addresses.add(getInetSocketAddress(additionalAddress.getAddress()));
1✔
233
        }
1✔
234
      }
235
      boolean isHealthy = (endpoint.getHealthStatus() == HealthStatus.HEALTHY)
1✔
236
              || (endpoint.getHealthStatus() == HealthStatus.UNKNOWN);
1✔
237
      endpoints.add(Endpoints.LbEndpoint.create(
1✔
238
          new EquivalentAddressGroup(addresses),
239
          endpoint.getLoadBalancingWeight().getValue(), isHealthy,
1✔
240
          endpoint.getEndpoint().getHostname(),
1✔
241
          endpointMetadata));
242
    }
1✔
243
    return StructOrError.fromStruct(Endpoints.LocalityLbEndpoints.create(
1✔
244
        endpoints, proto.getLoadBalancingWeight().getValue(),
1✔
245
        proto.getPriority(), localityMetadata));
1✔
246
  }
247

248
  private static InetSocketAddress getInetSocketAddress(Address address) {
249
    io.envoyproxy.envoy.config.core.v3.SocketAddress socketAddress = address.getSocketAddress();
1✔
250

251
    return new InetSocketAddress(socketAddress.getAddress(), socketAddress.getPortValue());
1✔
252
  }
253

254
  static final class EdsUpdate implements ResourceUpdate {
255
    final String clusterName;
256
    final Map<Locality, LocalityLbEndpoints> localityLbEndpointsMap;
257
    final List<DropOverload> dropPolicies;
258

259
    EdsUpdate(String clusterName, Map<Locality, LocalityLbEndpoints> localityLbEndpoints,
260
              List<DropOverload> dropPolicies) {
1✔
261
      this.clusterName = checkNotNull(clusterName, "clusterName");
1✔
262
      this.localityLbEndpointsMap = Collections.unmodifiableMap(
1✔
263
          new LinkedHashMap<>(checkNotNull(localityLbEndpoints, "localityLbEndpoints")));
1✔
264
      this.dropPolicies = Collections.unmodifiableList(
1✔
265
          new ArrayList<>(checkNotNull(dropPolicies, "dropPolicies")));
1✔
266
    }
1✔
267

268
    @Override
269
    public boolean equals(Object o) {
270
      if (this == o) {
1✔
271
        return true;
×
272
      }
273
      if (o == null || getClass() != o.getClass()) {
1✔
274
        return false;
×
275
      }
276
      EdsUpdate that = (EdsUpdate) o;
1✔
277
      return Objects.equals(clusterName, that.clusterName)
1✔
278
          && Objects.equals(localityLbEndpointsMap, that.localityLbEndpointsMap)
1✔
279
          && Objects.equals(dropPolicies, that.dropPolicies);
1✔
280
    }
281

282
    @Override
283
    public int hashCode() {
284
      return Objects.hash(clusterName, localityLbEndpointsMap, dropPolicies);
1✔
285
    }
286

287
    @Override
288
    public String toString() {
289
      return
×
290
          MoreObjects
291
              .toStringHelper(this)
×
292
              .add("clusterName", clusterName)
×
293
              .add("localityLbEndpointsMap", localityLbEndpointsMap)
×
294
              .add("dropPolicies", dropPolicies)
×
295
              .toString();
×
296
    }
297
  }
298

299
  public static class AddressMetadataParser implements MetadataValueParser {
1✔
300

301
    @Override
302
    public String getTypeUrl() {
303
      return "type.googleapis.com/envoy.config.core.v3.Address";
1✔
304
    }
305

306
    @Override
307
    public java.net.SocketAddress parse(Any any) throws ResourceInvalidException {
308
      SocketAddress socketAddress;
309
      try {
310
        socketAddress = any.unpack(Address.class).getSocketAddress();
1✔
311
      } catch (InvalidProtocolBufferException ex) {
×
312
        throw new ResourceInvalidException("Invalid Resource in address proto", ex);
×
313
      }
1✔
314
      validateAddress(socketAddress);
1✔
315

316
      String ip = socketAddress.getAddress();
1✔
317
      int port = socketAddress.getPortValue();
1✔
318

319
      try {
320
        return new InetSocketAddress(InetAddresses.forString(ip), port);
1✔
321
      } catch (IllegalArgumentException e) {
×
322
        throw createException("Invalid IP address or port: " + ip + ":" + port);
×
323
      }
324
    }
325

326
    private void validateAddress(SocketAddress socketAddress) throws ResourceInvalidException {
327
      if (socketAddress.getAddress().isEmpty()) {
1✔
328
        throw createException("Address field is empty or invalid.");
×
329
      }
330
      long port = Integer.toUnsignedLong(socketAddress.getPortValue());
1✔
331
      if (port > 65535) {
1✔
332
        throw createException(String.format("Port value %d out of range 1-65535.", port));
×
333
      }
334
    }
1✔
335

336
    private ResourceInvalidException createException(String message) {
337
      return new ResourceInvalidException(
×
338
          "Failed to parse envoy.config.core.v3.Address: " + message);
339
    }
340
  }
341
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc