• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19211

08 May 2024 10:50PM UTC coverage: 88.309% (-0.02%) from 88.328%
#19211

push

github

ejona86
xds: Plumb locality in xds_cluster_impl and weighted_target

As part of gRFC A78:

> To support the locality label in the WRR metrics, we will extend the
> `weighted_target` LB policy (see A28) to define a resolver attribute
> that indicates the name of its child. This attribute will be passed
> down to each of its children with the appropriate value, so that any
> LB policy that sits underneath the `weighted_target` policy will be
> able to use it.

xds_cluster_impl is involved because it uses the child names in the
AddressFilter, which must match the names used by weighted_target.
Instead of using Locality.toString() in multiple policies and assuming
the policies agree, we now have xds_cluster_impl decide the locality's
name and pass it down explicitly. This allows us to change the name
format to match gRFC A78:

> If locality information is available, the value of this label will be
> of the form `{region="${REGION}", zone="${ZONE}",
> sub_zone="${SUB_ZONE}"}`, where `${REGION}`, `${ZONE}`, and
> `${SUB_ZONE}` are replaced with the actual values. If no locality
> information is available, the label will be set to the empty string.

31515 of 35687 relevant lines covered (88.31%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.75
/../xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20

21
import com.google.common.annotations.VisibleForTesting;
22
import com.google.common.base.MoreObjects;
23
import com.google.common.base.Strings;
24
import io.grpc.Attributes;
25
import io.grpc.ClientStreamTracer;
26
import io.grpc.ClientStreamTracer.StreamInfo;
27
import io.grpc.ConnectivityState;
28
import io.grpc.EquivalentAddressGroup;
29
import io.grpc.InternalLogId;
30
import io.grpc.LoadBalancer;
31
import io.grpc.Metadata;
32
import io.grpc.Status;
33
import io.grpc.internal.ForwardingClientStreamTracer;
34
import io.grpc.internal.ObjectPool;
35
import io.grpc.services.MetricReport;
36
import io.grpc.util.ForwardingLoadBalancerHelper;
37
import io.grpc.util.ForwardingSubchannel;
38
import io.grpc.util.GracefulSwitchLoadBalancer;
39
import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig;
40
import io.grpc.xds.Endpoints.DropOverload;
41
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
42
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
43
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
44
import io.grpc.xds.client.Bootstrapper.ServerInfo;
45
import io.grpc.xds.client.LoadStatsManager2.ClusterDropStats;
46
import io.grpc.xds.client.LoadStatsManager2.ClusterLocalityStats;
47
import io.grpc.xds.client.Locality;
48
import io.grpc.xds.client.XdsClient;
49
import io.grpc.xds.client.XdsLogger;
50
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
51
import io.grpc.xds.internal.security.SslContextProviderSupplier;
52
import io.grpc.xds.orca.OrcaPerRequestUtil;
53
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
54
import java.util.ArrayList;
55
import java.util.Collections;
56
import java.util.List;
57
import java.util.Objects;
58
import java.util.concurrent.atomic.AtomicLong;
59
import javax.annotation.Nullable;
60

61
/**
62
 * Load balancer for cluster_impl_experimental LB policy. This LB policy is the child LB policy of
63
 * the priority_experimental LB policy and the parent LB policy of the weighted_target_experimental
64
 * LB policy in the xDS load balancing hierarchy. This LB policy applies cluster-level
65
 * configurations to requests sent to the corresponding cluster, such as drop policies and circuit
66
 * breakers.
67
 */
68
final class ClusterImplLoadBalancer extends LoadBalancer {
69

70
  @VisibleForTesting
71
  static final long DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS = 1024L;
72
  @VisibleForTesting
73
  static boolean enableCircuitBreaking =
1✔
74
      Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"))
1✔
75
          || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"));
1✔
76

77
  private static final Attributes.Key<ClusterLocalityStats> ATTR_CLUSTER_LOCALITY_STATS =
1✔
78
      Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocalityStats");
1✔
79
  private static final Attributes.Key<String> ATTR_CLUSTER_LOCALITY_NAME =
1✔
80
      Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocalityName");
1✔
81

82
  private final XdsLogger logger;
83
  private final Helper helper;
84
  private final ThreadSafeRandom random;
85
  // The following fields are effectively final.
86
  private String cluster;
87
  @Nullable
88
  private String edsServiceName;
89
  private ObjectPool<XdsClient> xdsClientPool;
90
  private XdsClient xdsClient;
91
  private CallCounterProvider callCounterProvider;
92
  private ClusterDropStats dropStats;
93
  private ClusterImplLbHelper childLbHelper;
94
  private GracefulSwitchLoadBalancer childSwitchLb;
95

96
  /**
   * Creates the balancer with {@code ThreadSafeRandomImpl.instance} as the random source.
   *
   * @param helper the parent helper; must not be null
   */
  ClusterImplLoadBalancer(Helper helper) {
97
    this(helper, ThreadSafeRandomImpl.instance);
1✔
98
  }
1✔
99

100
  /**
   * Creates the balancer with an explicit random source.
   *
   * @param helper the parent helper; must not be null
   * @param random random source consulted by the picker when evaluating drop policies; must not
   *     be null
   */
  ClusterImplLoadBalancer(Helper helper, ThreadSafeRandom random) {
1✔
101
    this.helper = checkNotNull(helper, "helper");
1✔
102
    this.random = checkNotNull(random, "random");
1✔
103
    // The log id is allocated from the authority reported by the helper.
    InternalLogId logId = InternalLogId.allocate("cluster-impl-lb", helper.getAuthority());
1✔
104
    logger = XdsLogger.withLogId(logId);
1✔
105
    logger.log(XdsLogLevel.INFO, "Created");
1✔
106
  }
1✔
107

108
  /**
   * Applies a new {@link ClusterImplConfig} and forwards the resolution result to the child
   * policy.
   *
   * <p>State that is still unset (XdsClient pool, call counter provider, cluster name, EDS service
   * name, child helper, child balancer, drop stats) is initialized from the attributes and the
   * config. Every call then refreshes the drop policies, the concurrent-request limit and the TLS
   * context on the child helper before delegating to the child policy named by the config.
   *
   * @param resolvedAddresses resolution result whose policy config is a {@link ClusterImplConfig}
   * @return {@code Status.INTERNAL} if no policy config is present, otherwise {@code Status.OK}
   */
  @Override
109
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
110
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
111
    Attributes attributes = resolvedAddresses.getAttributes();
1✔
112
    if (xdsClientPool == null) {
1✔
113
      xdsClientPool = attributes.get(InternalXdsAttributes.XDS_CLIENT_POOL);
1✔
114
      // NOTE(review): this check only runs when JVM assertions are enabled; otherwise a missing
      // pool attribute surfaces as a NullPointerException on the next statement.
      assert xdsClientPool != null;
1✔
115
      xdsClient = xdsClientPool.getObject();
1✔
116
    }
117
    if (callCounterProvider == null) {
1✔
118
      callCounterProvider = attributes.get(InternalXdsAttributes.CALL_COUNTER_PROVIDER);
1✔
119
    }
120

121
    ClusterImplConfig config =
1✔
122
        (ClusterImplConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
123
    if (config == null) {
1✔
124
      return Status.INTERNAL.withDescription("No cluster configuration found");
×
125
    }
126

127
    // One-time setup: the cluster name, EDS service name, child helper and child balancer are
    // taken from the first config only and are not re-read from later configs.
    if (cluster == null) {
1✔
128
      cluster = config.cluster;
1✔
129
      edsServiceName = config.edsServiceName;
1✔
130
      childLbHelper = new ClusterImplLbHelper(
1✔
131
          callCounterProvider.getOrCreate(config.cluster, config.edsServiceName),
1✔
132
          config.lrsServerInfo);
133
      childSwitchLb = new GracefulSwitchLoadBalancer(childLbHelper);
1✔
134
      // Assume load report server does not change throughout cluster lifetime.
135
      if (config.lrsServerInfo != null) {
1✔
136
        dropStats = xdsClient.addClusterDropStats(config.lrsServerInfo, cluster, edsServiceName);
1✔
137
      }
138
    }
139

140
    // Each of the first two updaters re-publishes the current picker when its value changes.
    childLbHelper.updateDropPolicies(config.dropCategories);
1✔
141
    childLbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests);
1✔
142
    childLbHelper.updateSslContextProviderSupplier(config.tlsContext);
1✔
143

144
    childSwitchLb.switchTo(config.childPolicy.getProvider());
1✔
145
    // The child receives the same addresses and attributes, but with its own policy config.
    childSwitchLb.handleResolvedAddresses(
1✔
146
        resolvedAddresses.toBuilder()
1✔
147
            .setAttributes(attributes)
1✔
148
            .setLoadBalancingPolicyConfig(config.childPolicy.getConfig())
1✔
149
            .build());
1✔
150
    return Status.OK;
1✔
151
  }
152

153
  /**
   * Forwards the error to the child policy if one has been created; otherwise reports
   * {@code TRANSIENT_FAILURE} to the parent helper with a fixed picker built from
   * {@code PickResult.withError(error)}.
   */
  @Override
154
  public void handleNameResolutionError(Status error) {
155
    if (childSwitchLb != null) {
1✔
156
      childSwitchLb.handleNameResolutionError(error);
1✔
157
    } else {
158
      helper.updateBalancingState(
1✔
159
          ConnectivityState.TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
160
    }
161
  }
1✔
162

163
  /**
   * Releases the drop-stats handle, shuts down the child policy, closes any active SSL context
   * provider supplier, and returns the XdsClient to its pool. Each step is skipped when the
   * corresponding field was never initialized.
   */
  @Override
164
  public void shutdown() {
165
    if (dropStats != null) {
1✔
166
      dropStats.release();
1✔
167
    }
168
    if (childSwitchLb != null) {
1✔
169
      childSwitchLb.shutdown();
1✔
170
      if (childLbHelper != null) {
1✔
171
        // Passing null closes the current supplier, if there is one, and clears it.
        childLbHelper.updateSslContextProviderSupplier(null);
1✔
172
        childLbHelper = null;
1✔
173
      }
174
    }
175
    if (xdsClient != null) {
1✔
176
      // Store whatever the pool hands back (presumably null) in place of the returned client.
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
177
    }
178
  }
1✔
179

180
  /**
181
   * A decorated {@link LoadBalancer.Helper} that applies configurations for connections
182
   * or requests to endpoints in the cluster.
183
   */
184
  private final class ClusterImplLbHelper extends ForwardingLoadBalancerHelper {
185
    private final AtomicLong inFlights;
186
    private ConnectivityState currentState = ConnectivityState.IDLE;
1✔
187
    private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
1✔
188
    private List<DropOverload> dropPolicies = Collections.emptyList();
1✔
189
    private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
1✔
190
    @Nullable
191
    private SslContextProviderSupplier sslContextProviderSupplier;
192
    @Nullable
193
    private final ServerInfo lrsServerInfo;
194

195
    /**
     * Creates the helper.
     *
     * @param inFlights counter of in-flight requests compared against the concurrent-request
     *     limit by the picker; must not be null
     * @param lrsServerInfo LRS server used to register per-locality load stats, or null to skip
     *     load-stats registration
     */
    private ClusterImplLbHelper(AtomicLong inFlights, @Nullable ServerInfo lrsServerInfo) {
1✔
196
      this.inFlights = checkNotNull(inFlights, "inFlights");
1✔
197
      this.lrsServerInfo = lrsServerInfo;
1✔
198
    }
1✔
199

200
    /**
     * Remembers the child's latest state and picker, then publishes to the parent helper a
     * {@link RequestLimitingSubchannelPicker} wrapping that picker with the drop policies and
     * concurrent-request limit currently in effect.
     */
    @Override
201
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
202
      // Kept so that a later drop-policy or limit change can re-publish without the child.
      currentState = newState;
1✔
203
      currentPicker =  newPicker;
1✔
204
      SubchannelPicker picker =
1✔
205
          new RequestLimitingSubchannelPicker(newPicker, dropPolicies, maxConcurrentRequests);
206
      delegate().updateBalancingState(newState, picker);
1✔
207
    }
1✔
208

209
    /**
     * Creates a subchannel whose addresses carry the cluster name (and the SSL context provider
     * supplier, if one is set) and whose attributes carry the locality name and the locality's
     * load stats (null when no LRS server is configured).
     *
     * <p>The returned wrapper releases the locality stats when it is shut down and re-applies the
     * additional address attributes whenever its addresses are updated.
     */
    @Override
210
    public Subchannel createSubchannel(CreateSubchannelArgs args) {
211
      List<EquivalentAddressGroup> addresses = withAdditionalAttributes(args.getAddresses());
1✔
212
      Locality locality = args.getAddresses().get(0).getAttributes().get(
1✔
213
          InternalXdsAttributes.ATTR_LOCALITY);  // all addresses should be in the same locality
214
      // NOTE(review): if ATTR_LOCALITY is present but ATTR_LOCALITY_NAME is not, localityName
      // stays null below; confirm the producer of these attributes always sets both.
      String localityName = args.getAddresses().get(0).getAttributes().get(
1✔
215
          InternalXdsAttributes.ATTR_LOCALITY_NAME);
216
      // Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain
217
      // attributes with its locality, including endpoints in LOGICAL_DNS clusters.
218
      // If absent (which really shouldn't happen), loads are aggregated under an empty locality.
219
      if (locality == null) {
1✔
220
        locality = Locality.create("", "", "");
×
221
        localityName = "";
×
222
      }
223
      // Per-locality load stats are only registered when an LRS server is configured.
      final ClusterLocalityStats localityStats =
224
          (lrsServerInfo == null)
1✔
225
              ? null
1✔
226
              : xdsClient.addClusterLocalityStats(lrsServerInfo, cluster,
1✔
227
              edsServiceName, locality);
1✔
228

229
      // The picker later reads both values back from the picked subchannel's attributes.
      Attributes attrs = args.getAttributes().toBuilder()
1✔
230
          .set(ATTR_CLUSTER_LOCALITY_STATS, localityStats)
1✔
231
          .set(ATTR_CLUSTER_LOCALITY_NAME, localityName)
1✔
232
          .build();
1✔
233
      args = args.toBuilder().setAddresses(addresses).setAttributes(attrs).build();
1✔
234
      final Subchannel subchannel = delegate().createSubchannel(args);
1✔
235

236
      return new ForwardingSubchannel() {
1✔
237
        @Override
238
        public void shutdown() {
239
          // Release the stats handle registered above before shutting down the real subchannel.
          if (localityStats != null) {
1✔
240
            localityStats.release();
1✔
241
          }
242
          delegate().shutdown();
1✔
243
        }
1✔
244

245
        @Override
246
        public void updateAddresses(List<EquivalentAddressGroup> addresses) {
247
          // New addresses get the same cluster/SSL attributes as the initial ones.
          delegate().updateAddresses(withAdditionalAttributes(addresses));
1✔
248
        }
1✔
249

250
        @Override
251
        protected Subchannel delegate() {
252
          return subchannel;
1✔
253
        }
254
      };
255
    }
256

257
    /**
     * Returns a new list in which every address group keeps its addresses and existing
     * attributes, plus {@code ATTR_CLUSTER_NAME} set to this cluster and, when an SSL context
     * provider supplier is currently set, {@code ATTR_SSL_CONTEXT_PROVIDER_SUPPLIER}.
     *
     * @param addresses the address groups to decorate; not modified
     */
    private List<EquivalentAddressGroup> withAdditionalAttributes(
258
        List<EquivalentAddressGroup> addresses) {
259
      List<EquivalentAddressGroup> newAddresses = new ArrayList<>();
1✔
260
      for (EquivalentAddressGroup eag : addresses) {
1✔
261
        Attributes.Builder attrBuilder = eag.getAttributes().toBuilder().set(
1✔
262
            InternalXdsAttributes.ATTR_CLUSTER_NAME, cluster);
1✔
263
        if (sslContextProviderSupplier != null) {
1✔
264
          attrBuilder.set(
1✔
265
              InternalXdsAttributes.ATTR_SSL_CONTEXT_PROVIDER_SUPPLIER,
266
              sslContextProviderSupplier);
267
        }
268
        newAddresses.add(new EquivalentAddressGroup(eag.getAddresses(), attrBuilder.build()));
1✔
269
      }
1✔
270
      return newAddresses;
1✔
271
    }
272

273
    /** Returns the enclosing balancer's parent helper, to which all other calls are forwarded. */
    @Override
274
    protected Helper delegate()  {
275
      return helper;
1✔
276
    }
277

278
    /**
     * Replaces the drop policies and, if they differ from the current ones, re-publishes the last
     * known state and picker so the new policies take effect.
     */
    private void updateDropPolicies(List<DropOverload> dropOverloads) {
279
      if (!dropPolicies.equals(dropOverloads)) {
1✔
280
        dropPolicies = dropOverloads;
1✔
281
        updateBalancingState(currentState, currentPicker);
1✔
282
      }
283
    }
1✔
284

285
    /**
     * Sets the concurrent-request limit and re-publishes the last known state and picker.
     *
     * @param maxConcurrentRequests the new limit, or null to use
     *     {@code DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS}
     */
    private void updateMaxConcurrentRequests(@Nullable Long maxConcurrentRequests) {
286
      // NOTE(review): the current value is a boxed primitive and so never equals null. A null
      // argument therefore always falls through and re-publishes the picker, even when the
      // default limit is already in effect.
      if (Objects.equals(this.maxConcurrentRequests, maxConcurrentRequests)) {
1✔
287
        return;
×
288
      }
289
      this.maxConcurrentRequests =
1✔
290
          maxConcurrentRequests != null
1✔
291
              ? maxConcurrentRequests
1✔
292
              : DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
1✔
293
      updateBalancingState(currentState, currentPicker);
1✔
294
    }
1✔
295

296
    /**
     * Swaps the SSL context provider supplier when the TLS context changes: the current supplier
     * (if any) is closed and a new one is created for a non-null {@code tlsContext}, or the field
     * is cleared for a null one. Does nothing when the TLS context is unchanged.
     *
     * <p>This method does not re-publish the picker. Within this class the supplier is only read
     * when address attributes are built in {@code withAdditionalAttributes}.
     *
     * @param tlsContext the new upstream TLS context, or null for none
     */
    private void updateSslContextProviderSupplier(@Nullable UpstreamTlsContext tlsContext) {
297
      UpstreamTlsContext currentTlsContext =
298
          sslContextProviderSupplier != null
1✔
299
              ? (UpstreamTlsContext)sslContextProviderSupplier.getTlsContext()
1✔
300
              : null;
1✔
301
      if (Objects.equals(currentTlsContext,  tlsContext)) {
1✔
302
        return;
1✔
303
      }
304
      // Close the outgoing supplier before replacing it.
      if (sslContextProviderSupplier != null) {
1✔
305
        sslContextProviderSupplier.close();
1✔
306
      }
307
      sslContextProviderSupplier =
1✔
308
          tlsContext != null
1✔
309
              ? new SslContextProviderSupplier(tlsContext,
1✔
310
                                               (TlsContextManager) xdsClient.getSecurityConfig())
1✔
311
              : null;
1✔
312
    }
1✔
313

314
    /**
     * Picker that applies the drop policies before delegating to the child picker and enforces
     * the concurrent-request limit on the child's result. The drop policies and the limit are
     * fixed at construction; a new instance is published whenever either changes.
     */
    private class RequestLimitingSubchannelPicker extends SubchannelPicker {
315
      private final SubchannelPicker delegate;
316
      private final List<DropOverload> dropPolicies;
317
      private final long maxConcurrentRequests;
318

319
      private RequestLimitingSubchannelPicker(SubchannelPicker delegate,
320
          List<DropOverload> dropPolicies, long maxConcurrentRequests) {
1✔
321
        this.delegate = delegate;
1✔
322
        this.dropPolicies = dropPolicies;
1✔
323
        this.maxConcurrentRequests = maxConcurrentRequests;
1✔
324
      }
1✔
325

326
      /**
       * Picks a subchannel, possibly dropping the request instead.
       *
       * <ol>
       *   <li>Each drop policy is evaluated in order with a fresh {@code random.nextInt(1_000_000)}
       *       draw. The first policy whose {@code dropsPerMillion()} exceeds the draw drops the
       *       request with {@code UNAVAILABLE}.
       *   <li>Otherwise the child picker is consulted. If it returns an OK result with a
       *       subchannel and circuit breaking is enabled, the request is dropped when the
       *       in-flight count has reached the limit.
       *   <li>If the picked subchannel carries locality stats, the {@code grpc.lb.locality} label
       *       is reported and the result is rebuilt with a tracer factory that counts the call
       *       and forwards ORCA load reports to those stats.
       * </ol>
       */
      @Override
327
      public PickResult pickSubchannel(PickSubchannelArgs args) {
328
        for (DropOverload dropOverload : dropPolicies) {
1✔
329
          int rand = random.nextInt(1_000_000);
1✔
330
          if (rand < dropOverload.dropsPerMillion()) {
1✔
331
            logger.log(XdsLogLevel.INFO, "Drop request with category: {0}",
1✔
332
                dropOverload.category());
1✔
333
            // Drop stats exist only when an LRS server was configured.
            if (dropStats != null) {
1✔
334
              dropStats.recordDroppedRequest(dropOverload.category());
1✔
335
            }
336
            return PickResult.withDrop(
1✔
337
                Status.UNAVAILABLE.withDescription("Dropped: " + dropOverload.category()));
1✔
338
          }
339
        }
1✔
340
        final PickResult result = delegate.pickSubchannel(args);
1✔
341
        if (result.getStatus().isOk() && result.getSubchannel() != null) {
1✔
342
          if (enableCircuitBreaking) {
1✔
343
            if (inFlights.get() >= maxConcurrentRequests) {
1✔
344
              // Circuit-breaker drops are recorded without a category.
              if (dropStats != null) {
1✔
345
                dropStats.recordDroppedRequest();
1✔
346
              }
347
              return PickResult.withDrop(Status.UNAVAILABLE.withDescription(
1✔
348
                  "Cluster max concurrent requests limit exceeded"));
349
            }
350
          }
351
          final ClusterLocalityStats stats =
1✔
352
              result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY_STATS);
1✔
353
          // NOTE(review): createSubchannel leaves stats null when no LRS server is configured.
          // In that case neither the locality label nor the in-flight counting tracer is
          // installed here. Unless the counter is incremented elsewhere, the limit above would
          // never trip without LRS. Confirm this is intended.
          if (stats != null) {
1✔
354
            String localityName =
1✔
355
                result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY_NAME);
1✔
356
            args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.locality", localityName);
1✔
357

358
            // Wrap the child's tracer factory (may be null) with call counting, then with ORCA.
            ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory(
1✔
359
                stats, inFlights, result.getStreamTracerFactory());
1✔
360
            ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance()
1✔
361
                .newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats));
1✔
362
            return PickResult.withSubchannel(result.getSubchannel(), orcaTracerFactory);
1✔
363
          }
364
        }
365
        // Non-OK results, results without a subchannel, and picks without stats pass through.
        return result;
1✔
366
      }
367

368
      @Override
369
      public String toString() {
370
        return MoreObjects.toStringHelper(this).add("delegate", delegate).toString();
×
371
      }
372
    }
373
  }
374

375
  /**
   * Tracer factory that records call start and finish on a {@link ClusterLocalityStats} and
   * keeps an in-flight counter up to date, optionally forwarding tracer callbacks to another
   * factory's tracer.
   */
  private static final class CountingStreamTracerFactory extends
376
      ClientStreamTracer.Factory {
377
    private final ClusterLocalityStats stats;
378
    private final AtomicLong inFlights;
379
    @Nullable
380
    private final ClientStreamTracer.Factory delegate;
381

382
    private CountingStreamTracerFactory(
383
        ClusterLocalityStats stats, AtomicLong inFlights,
384
        @Nullable ClientStreamTracer.Factory delegate) {
1✔
385
      this.stats = checkNotNull(stats, "stats");
1✔
386
      this.inFlights = checkNotNull(inFlights, "inFlights");
1✔
387
      this.delegate = delegate;
1✔
388
    }
1✔
389

390
    /**
     * Records the call as started and increments the in-flight counter, then returns a tracer
     * that records the call as finished and decrements the counter when the stream closes. When
     * a delegate factory is present, all tracer callbacks are also forwarded to its tracer.
     */
    @Override
391
    public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
392
      stats.recordCallStarted();
1✔
393
      inFlights.incrementAndGet();
1✔
394
      if (delegate == null) {
1✔
395
        return new ClientStreamTracer() {
1✔
396
          @Override
397
          public void streamClosed(Status status) {
398
            stats.recordCallFinished(status);
1✔
399
            inFlights.decrementAndGet();
1✔
400
          }
1✔
401
        };
402
      }
403
      final ClientStreamTracer delegatedTracer = delegate.newClientStreamTracer(info, headers);
×
404
      return new ForwardingClientStreamTracer() {
×
405
        @Override
406
        protected ClientStreamTracer delegate() {
407
          return delegatedTracer;
×
408
        }
409

410
        @Override
411
        public void streamClosed(Status status) {
412
          // Update the stats and the counter first, then let the wrapped tracer see the close.
          stats.recordCallFinished(status);
×
413
          inFlights.decrementAndGet();
×
414
          delegate().streamClosed(status);
×
415
        }
×
416
      };
417
    }
418
  }
419

420
  /**
   * Per-request ORCA report listener that feeds the named metrics of each report into a
   * {@link ClusterLocalityStats}.
   */
  private static final class OrcaPerRpcListener implements OrcaPerRequestReportListener {
421

422
    private final ClusterLocalityStats stats;
423

424
    private OrcaPerRpcListener(ClusterLocalityStats stats) {
1✔
425
      this.stats = checkNotNull(stats, "stats");
1✔
426
    }
1✔
427

428
    /**
429
     * Copies {@link MetricReport#getNamedMetrics()} to {@link ClusterLocalityStats} such that it is
430
     * included in the snapshot for the LRS report sent to the LRS server.
431
     */
432
    @Override
433
    public void onLoadReport(MetricReport report) {
434
      stats.recordBackendLoadMetricStats(report.getNamedMetrics());
1✔
435
    }
1✔
436
  }
437
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc