• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19104

15 Mar 2024 07:26PM UTC coverage: 88.315% (-0.004%) from 88.319%
#19104

push

github

web-flow
Have EDS resource parse the additional addresses from envoy message (#11011)

* Have EDS resource parse the additional addresses from envoy message
* Update respositories.bzl to point to current grpc-proto instead of a 2021 version.
* Update respositories.bzl to point to recent cncf/xds and envoyproxy/data-plane-api
* Add cncf_upda to repositories.bzl

31168 of 35292 relevant lines covered (88.31%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.19
/../xds/src/main/java/io/grpc/xds/XdsEndpointResource.java
1
/*
2
 * Copyright 2022 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20

21
import com.google.common.annotations.VisibleForTesting;
22
import com.google.common.base.MoreObjects;
23
import com.google.protobuf.Message;
24
import io.envoyproxy.envoy.config.core.v3.Address;
25
import io.envoyproxy.envoy.config.core.v3.HealthStatus;
26
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
27
import io.envoyproxy.envoy.config.endpoint.v3.Endpoint;
28
import io.envoyproxy.envoy.type.v3.FractionalPercent;
29
import io.grpc.EquivalentAddressGroup;
30
import io.grpc.internal.GrpcUtil;
31
import io.grpc.xds.Endpoints.DropOverload;
32
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
33
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
34
import io.grpc.xds.client.Locality;
35
import io.grpc.xds.client.XdsClient.ResourceUpdate;
36
import io.grpc.xds.client.XdsResourceType;
37
import java.net.InetSocketAddress;
38
import java.util.ArrayList;
39
import java.util.Collections;
40
import java.util.HashMap;
41
import java.util.HashSet;
42
import java.util.LinkedHashMap;
43
import java.util.List;
44
import java.util.Map;
45
import java.util.Objects;
46
import java.util.Set;
47
import javax.annotation.Nullable;
48

49
class XdsEndpointResource extends XdsResourceType<EdsUpdate> {
1✔
50
  static final String ADS_TYPE_URL_EDS =
51
      "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
52
  static final String GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS =
53
      "grpc.experimental.xdsDualstackEndpoints";
54

55
  private static final XdsEndpointResource instance = new XdsEndpointResource();
1✔
56

57
  static XdsEndpointResource getInstance() {
58
    return instance;
1✔
59
  }
60

61
  @Override
62
  @Nullable
63
  protected String extractResourceName(Message unpackedResource) {
64
    if (!(unpackedResource instanceof ClusterLoadAssignment)) {
1✔
65
      return null;
×
66
    }
67
    return ((ClusterLoadAssignment) unpackedResource).getClusterName();
1✔
68
  }
69

70
  @Override
71
  public String typeName() {
72
    return "EDS";
1✔
73
  }
74

75
  @Override
76
  public String typeUrl() {
77
    return ADS_TYPE_URL_EDS;
1✔
78
  }
79

80
  @Override
81
  public boolean shouldRetrieveResourceKeysForArgs() {
82
    return true;
1✔
83
  }
84

85
  @Override
86
  protected boolean isFullStateOfTheWorld() {
87
    return false;
1✔
88
  }
89

90
  @Override
91
  protected Class<ClusterLoadAssignment> unpackedClassName() {
92
    return ClusterLoadAssignment.class;
1✔
93
  }
94

95
  @Override
96
  protected EdsUpdate doParse(Args args, Message unpackedMessage) throws ResourceInvalidException {
97
    if (!(unpackedMessage instanceof ClusterLoadAssignment)) {
1✔
98
      throw new ResourceInvalidException("Invalid message type: " + unpackedMessage.getClass());
×
99
    }
100
    return processClusterLoadAssignment((ClusterLoadAssignment) unpackedMessage);
1✔
101
  }
102

103
  private static boolean isEnabledXdsDualStack() {
104
    return GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, false);
1✔
105
  }
106

107
  private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assignment)
108
      throws ResourceInvalidException {
109
    Map<Integer, Set<Locality>> priorities = new HashMap<>();
1✔
110
    Map<Locality, LocalityLbEndpoints> localityLbEndpointsMap = new LinkedHashMap<>();
1✔
111
    List<Endpoints.DropOverload> dropOverloads = new ArrayList<>();
1✔
112
    int maxPriority = -1;
1✔
113
    for (io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints localityLbEndpointsProto
114
        : assignment.getEndpointsList()) {
1✔
115
      StructOrError<LocalityLbEndpoints> structOrError =
1✔
116
          parseLocalityLbEndpoints(localityLbEndpointsProto);
1✔
117
      if (structOrError == null) {
1✔
118
        continue;
1✔
119
      }
120
      if (structOrError.getErrorDetail() != null) {
1✔
121
        throw new ResourceInvalidException(structOrError.getErrorDetail());
1✔
122
      }
123

124
      LocalityLbEndpoints localityLbEndpoints = structOrError.getStruct();
1✔
125
      int priority = localityLbEndpoints.priority();
1✔
126
      maxPriority = Math.max(maxPriority, priority);
1✔
127
      // Note endpoints with health status other than HEALTHY and UNKNOWN are still
128
      // handed over to watching parties. It is watching parties' responsibility to
129
      // filter out unhealthy endpoints. See EnvoyProtoData.LbEndpoint#isHealthy().
130
      Locality locality =  parseLocality(localityLbEndpointsProto.getLocality());
1✔
131
      localityLbEndpointsMap.put(locality, localityLbEndpoints);
1✔
132
      if (!priorities.containsKey(priority)) {
1✔
133
        priorities.put(priority, new HashSet<>());
1✔
134
      }
135
      if (!priorities.get(priority).add(locality)) {
1✔
136
        throw new ResourceInvalidException("ClusterLoadAssignment has duplicate locality:"
1✔
137
            + locality + " for priority:" + priority);
138
      }
139
    }
1✔
140
    if (priorities.size() != maxPriority + 1) {
1✔
141
      throw new ResourceInvalidException("ClusterLoadAssignment has sparse priorities");
×
142
    }
143

144
    for (ClusterLoadAssignment.Policy.DropOverload dropOverloadProto
145
        : assignment.getPolicy().getDropOverloadsList()) {
1✔
146
      dropOverloads.add(parseDropOverload(dropOverloadProto));
1✔
147
    }
1✔
148
    return new EdsUpdate(assignment.getClusterName(), localityLbEndpointsMap, dropOverloads);
1✔
149
  }
150

151
  private static Locality parseLocality(io.envoyproxy.envoy.config.core.v3.Locality proto) {
152
    return Locality.create(proto.getRegion(), proto.getZone(), proto.getSubZone());
1✔
153
  }
154

155
  private static DropOverload parseDropOverload(
156
      io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy.DropOverload proto) {
157
    return DropOverload.create(proto.getCategory(), getRatePerMillion(proto.getDropPercentage()));
1✔
158
  }
159

160
  private static int getRatePerMillion(FractionalPercent percent) {
161
    int numerator = percent.getNumerator();
1✔
162
    FractionalPercent.DenominatorType type = percent.getDenominator();
1✔
163
    switch (type) {
1✔
164
      case TEN_THOUSAND:
165
        numerator *= 100;
×
166
        break;
×
167
      case HUNDRED:
168
        numerator *= 10_000;
×
169
        break;
×
170
      case MILLION:
171
        break;
1✔
172
      case UNRECOGNIZED:
173
      default:
174
        throw new IllegalArgumentException("Unknown denominator type of " + percent);
×
175
    }
176

177
    if (numerator > 1_000_000 || numerator < 0) {
1✔
178
      numerator = 1_000_000;
×
179
    }
180
    return numerator;
1✔
181
  }
182

183

184
  @VisibleForTesting
185
  @Nullable
186
  static StructOrError<LocalityLbEndpoints> parseLocalityLbEndpoints(
187
      io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints proto) {
188
    // Filter out localities without or with 0 weight.
189
    if (!proto.hasLoadBalancingWeight() || proto.getLoadBalancingWeight().getValue() < 1) {
1✔
190
      return null;
1✔
191
    }
192
    if (proto.getPriority() < 0) {
1✔
193
      return StructOrError.fromError("negative priority");
1✔
194
    }
195
    List<Endpoints.LbEndpoint> endpoints = new ArrayList<>(proto.getLbEndpointsCount());
1✔
196
    for (io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint endpoint : proto.getLbEndpointsList()) {
1✔
197
      // The endpoint field of each lb_endpoints must be set.
198
      // Inside of it: the address field must be set.
199
      if (!endpoint.hasEndpoint() || !endpoint.getEndpoint().hasAddress()) {
1✔
200
        return StructOrError.fromError("LbEndpoint with no endpoint/address");
×
201
      }
202
      List<java.net.SocketAddress> addresses = new ArrayList<>();
1✔
203
      addresses.add(getInetSocketAddress(endpoint.getEndpoint().getAddress()));
1✔
204
      if (isEnabledXdsDualStack()) {
1✔
205
        for (Endpoint.AdditionalAddress additionalAddress
206
            : endpoint.getEndpoint().getAdditionalAddressesList()) {
1✔
207
          addresses.add(getInetSocketAddress(additionalAddress.getAddress()));
1✔
208
        }
1✔
209
      }
210
      boolean isHealthy = (endpoint.getHealthStatus() == HealthStatus.HEALTHY)
1✔
211
              || (endpoint.getHealthStatus() == HealthStatus.UNKNOWN);
1✔
212
      endpoints.add(Endpoints.LbEndpoint.create(
1✔
213
          new EquivalentAddressGroup(addresses),
214
          endpoint.getLoadBalancingWeight().getValue(), isHealthy));
1✔
215
    }
1✔
216
    return StructOrError.fromStruct(Endpoints.LocalityLbEndpoints.create(
1✔
217
        endpoints, proto.getLoadBalancingWeight().getValue(), proto.getPriority()));
1✔
218
  }
219

220
  private static InetSocketAddress getInetSocketAddress(Address address) {
221
    io.envoyproxy.envoy.config.core.v3.SocketAddress socketAddress = address.getSocketAddress();
1✔
222

223
    return new InetSocketAddress(socketAddress.getAddress(), socketAddress.getPortValue());
1✔
224
  }
225

226
  static final class EdsUpdate implements ResourceUpdate {
227
    final String clusterName;
228
    final Map<Locality, LocalityLbEndpoints> localityLbEndpointsMap;
229
    final List<DropOverload> dropPolicies;
230

231
    EdsUpdate(String clusterName, Map<Locality, LocalityLbEndpoints> localityLbEndpoints,
232
              List<DropOverload> dropPolicies) {
1✔
233
      this.clusterName = checkNotNull(clusterName, "clusterName");
1✔
234
      this.localityLbEndpointsMap = Collections.unmodifiableMap(
1✔
235
          new LinkedHashMap<>(checkNotNull(localityLbEndpoints, "localityLbEndpoints")));
1✔
236
      this.dropPolicies = Collections.unmodifiableList(
1✔
237
          new ArrayList<>(checkNotNull(dropPolicies, "dropPolicies")));
1✔
238
    }
1✔
239

240
    @Override
241
    public boolean equals(Object o) {
242
      if (this == o) {
1✔
243
        return true;
×
244
      }
245
      if (o == null || getClass() != o.getClass()) {
1✔
246
        return false;
×
247
      }
248
      EdsUpdate that = (EdsUpdate) o;
1✔
249
      return Objects.equals(clusterName, that.clusterName)
1✔
250
          && Objects.equals(localityLbEndpointsMap, that.localityLbEndpointsMap)
1✔
251
          && Objects.equals(dropPolicies, that.dropPolicies);
1✔
252
    }
253

254
    @Override
255
    public int hashCode() {
256
      return Objects.hash(clusterName, localityLbEndpointsMap, dropPolicies);
×
257
    }
258

259
    @Override
260
    public String toString() {
261
      return
×
262
          MoreObjects
263
              .toStringHelper(this)
×
264
              .add("clusterName", clusterName)
×
265
              .add("localityLbEndpointsMap", localityLbEndpointsMap)
×
266
              .add("dropPolicies", dropPolicies)
×
267
              .toString();
×
268
    }
269
  }
270
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc