• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19868

17 Jun 2025 01:36PM UTC coverage: 88.559% (-0.01%) from 88.571%
#19868

push

github

ejona86
xds: XdsNR should be subscribing to clusters with XdsDepManager

This is missing behavior defined in gRFC A74:

> As per gRFC A31, the ConfigSelector gives each RPC a ref to the
> cluster that was selected for it to ensure that the cluster is not
> removed from the xds_cluster_manager LB policy config before the RPC
> is done with its LB picks. These cluster refs will also hold a
> subscription for the cluster from the XdsDependencyManager, so that
> the XdsDependencyManager will not stop watching the cluster resource
> until the cluster is removed from the xds_cluster_manager LB policy
> config.

Without the logic, RPCs can race and see the error:

> INTERNAL: CdsLb for cluster0: Unable to find non-dynamic root cluster

Fixes #12152. This fixes the regression introduced in 297ab05e

34552 of 39016 relevant lines covered (88.56%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.12
/../util/src/main/java/io/grpc/util/MultiChildLoadBalancer.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.util;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.CONNECTING;
21
import static io.grpc.ConnectivityState.IDLE;
22
import static io.grpc.ConnectivityState.READY;
23
import static io.grpc.ConnectivityState.SHUTDOWN;
24
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
25

26
import com.google.common.annotations.VisibleForTesting;
27
import com.google.common.collect.Maps;
28
import io.grpc.Attributes;
29
import io.grpc.ConnectivityState;
30
import io.grpc.EquivalentAddressGroup;
31
import io.grpc.Internal;
32
import io.grpc.LoadBalancer;
33
import io.grpc.LoadBalancerProvider;
34
import io.grpc.Status;
35
import io.grpc.internal.PickFirstLoadBalancerProvider;
36
import java.net.SocketAddress;
37
import java.util.ArrayList;
38
import java.util.Collection;
39
import java.util.Collections;
40
import java.util.HashSet;
41
import java.util.List;
42
import java.util.Map;
43
import java.util.logging.Level;
44
import java.util.logging.Logger;
45
import javax.annotation.Nullable;
46

47
/**
48
 * A base load balancing policy for policies that have multiple children, such as
49
 * ClusterManager or the petiole policies.  For internal use only.
50
 */
51
@Internal
52
public abstract class MultiChildLoadBalancer extends LoadBalancer {
53

54
  private static final Logger logger = Logger.getLogger(MultiChildLoadBalancer.class.getName());
1✔
55
  // Modify by replacing the list to release memory when no longer used.
56
  private List<ChildLbState> childLbStates = new ArrayList<>(0);
1✔
57
  private final Helper helper;
58
  // Set to true if currently in the process of handling resolved addresses.
59
  protected boolean resolvingAddresses;
60

61
  protected final LoadBalancerProvider pickFirstLbProvider = new PickFirstLoadBalancerProvider();
1✔
62

63
  protected ConnectivityState currentConnectivityState;
64

65

66
  /**
   * Creates the balancer with no children.
   *
   * @param helper the helper of this balancer; must not be null
   * @throws NullPointerException if {@code helper} is null
   */
  protected MultiChildLoadBalancer(Helper helper) {
    checkNotNull(helper, "helper");
    this.helper = helper;
    logger.log(Level.FINE, "Created");
  }
1✔
70

71
  /**
72
   * Using the state of all children will calculate the current connectivity state,
73
   * update fields, generate a picker and then call
74
   * {@link Helper#updateBalancingState(ConnectivityState, SubchannelPicker)}.
75
   */
76
  protected abstract void updateOverallBalancingState();
77

78
  /**
79
   * Override to utilize parsing of the policy configuration or alternative helper/lb generation.
80
   * Override this if keys are not Endpoints or if child policies have configuration. Null map
81
   * values preserve the child without delivering the child an update.
82
   */
83
  protected Map<Object, ResolvedAddresses> createChildAddressesMap(
84
      ResolvedAddresses resolvedAddresses) {
85
    Map<Object, ResolvedAddresses> childAddresses =
1✔
86
        Maps.newLinkedHashMapWithExpectedSize(resolvedAddresses.getAddresses().size());
1✔
87
    for (EquivalentAddressGroup eag : resolvedAddresses.getAddresses()) {
1✔
88
      ResolvedAddresses addresses = resolvedAddresses.toBuilder()
1✔
89
          .setAddresses(Collections.singletonList(eag))
1✔
90
          .setAttributes(Attributes.newBuilder().set(IS_PETIOLE_POLICY, true).build())
1✔
91
          .setLoadBalancingPolicyConfig(null)
1✔
92
          .build();
1✔
93
      childAddresses.put(new Endpoint(eag), addresses);
1✔
94
    }
1✔
95
    return childAddresses;
1✔
96
  }
97

98
  /**
99
   * Override to create an instance of a subclass.
100
   */
101
  protected ChildLbState createChildLbState(Object key) {
102
    return new ChildLbState(key, pickFirstLbProvider);
1✔
103
  }
104

105
  /**
106
   *   Override to completely replace the default logic or to do additional activities.
107
   */
108
  @Override
109
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
110
    logger.log(Level.FINE, "Received resolution result: {0}", resolvedAddresses);
1✔
111
    try {
112
      resolvingAddresses = true;
1✔
113

114
      // process resolvedAddresses to update children
115
      Map<Object, ResolvedAddresses> newChildAddresses = createChildAddressesMap(resolvedAddresses);
1✔
116

117
      // Handle error case
118
      if (newChildAddresses.isEmpty()) {
1✔
119
        Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
120
            "NameResolver returned no usable address. " + resolvedAddresses);
121
        handleNameResolutionError(unavailableStatus);
1✔
122
        return unavailableStatus;
1✔
123
      }
124

125
      return updateChildrenWithResolvedAddresses(newChildAddresses);
1✔
126
    } finally {
127
      resolvingAddresses = false;
1✔
128
    }
129
  }
130

131
  /**
132
   * Handle the name resolution error.
133
   *
134
   * <p/>Override if you need special handling.
135
   */
136
  @Override
137
  public void handleNameResolutionError(Status error) {
138
    if (currentConnectivityState != READY)  {
1✔
139
      helper.updateBalancingState(
1✔
140
          TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
141
    }
142
  }
1✔
143

144
  @Override
145
  public void shutdown() {
146
    logger.log(Level.FINE, "Shutdown");
1✔
147
    for (ChildLbState state : childLbStates) {
1✔
148
      state.shutdown();
1✔
149
    }
1✔
150
    childLbStates.clear();
1✔
151
  }
1✔
152

153
  private Status updateChildrenWithResolvedAddresses(
154
      Map<Object, ResolvedAddresses> newChildAddresses) {
155
    // Create a map with the old values
156
    Map<Object, ChildLbState> oldStatesMap =
1✔
157
        Maps.newLinkedHashMapWithExpectedSize(childLbStates.size());
1✔
158
    for (ChildLbState state : childLbStates) {
1✔
159
      oldStatesMap.put(state.getKey(), state);
1✔
160
    }
1✔
161

162
    // Move ChildLbStates from the map to a new list (preserving the new map's order)
163
    Status status = Status.OK;
1✔
164
    List<ChildLbState> newChildLbStates = new ArrayList<>(newChildAddresses.size());
1✔
165
    for (Map.Entry<Object, ResolvedAddresses> entry : newChildAddresses.entrySet()) {
1✔
166
      ChildLbState childLbState = oldStatesMap.remove(entry.getKey());
1✔
167
      if (childLbState == null) {
1✔
168
        childLbState = createChildLbState(entry.getKey());
1✔
169
      }
170
      newChildLbStates.add(childLbState);
1✔
171
      if (entry.getValue() != null) {
1✔
172
        // update child LB
173
        Status newStatus = childLbState.lb.acceptResolvedAddresses(entry.getValue());
1✔
174
        if (!newStatus.isOk()) {
1✔
175
          status = newStatus;
×
176
        }
177
      }
178
    }
1✔
179

180
    childLbStates = newChildLbStates;
1✔
181
    // Update the picker and our connectivity state
182
    updateOverallBalancingState();
1✔
183

184
    // Remaining entries in map are orphaned
185
    for (ChildLbState childLbState : oldStatesMap.values()) {
1✔
186
      childLbState.shutdown();
1✔
187
    }
1✔
188
    return status;
1✔
189
  }
190

191
  @Nullable
192
  protected static ConnectivityState aggregateState(
193
      @Nullable ConnectivityState overallState, ConnectivityState childState) {
194
    if (overallState == null) {
1✔
195
      return childState;
1✔
196
    }
197
    if (overallState == READY || childState == READY) {
1✔
198
      return READY;
1✔
199
    }
200
    if (overallState == CONNECTING || childState == CONNECTING) {
1✔
201
      return CONNECTING;
1✔
202
    }
203
    if (overallState == IDLE || childState == IDLE) {
1✔
204
      return IDLE;
×
205
    }
206
    return overallState;
1✔
207
  }
208

209
  protected final Helper getHelper() {
210
    return helper;
1✔
211
  }
212

213
  @VisibleForTesting
214
  public final Collection<ChildLbState> getChildLbStates() {
215
    return childLbStates;
1✔
216
  }
217

218
  /**
219
   * Filters out non-ready child load balancers (subchannels).
220
   */
221
  protected final List<ChildLbState> getReadyChildren() {
222
    List<ChildLbState> activeChildren = new ArrayList<>();
1✔
223
    for (ChildLbState child : getChildLbStates()) {
1✔
224
      if (child.getCurrentState() == READY) {
1✔
225
        activeChildren.add(child);
1✔
226
      }
227
    }
1✔
228
    return activeChildren;
1✔
229
  }
230

231
  /**
232
   * This represents the state of load balancer children.  Each endpoint (represented by an
233
   * EquivalentAddressGroup or EDS string) will have a separate ChildLbState which in turn will
234
   * have a single child LoadBalancer created from the provided factory.
235
   *
236
   * <p>A ChildLbStateHelper is the glue between ChildLbState and the helpers associated with the
237
   * petiole policy above and the PickFirstLoadBalancer's helper below.
238
   *
239
   * <p>If you wish to store additional state information related to each subchannel, then extend
240
   * this class.
241
   */
242
  public class ChildLbState {
243
    private final Object key;
244
    private final LoadBalancer lb;
245
    private ConnectivityState currentState;
246
    private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
1✔
247

248
    public ChildLbState(Object key, LoadBalancer.Factory policyFactory) {
1✔
249
      this.key = key;
1✔
250
      this.lb = policyFactory.newLoadBalancer(createChildHelper());
1✔
251
      this.currentState = CONNECTING;
1✔
252
    }
1✔
253

254
    protected ChildLbStateHelper createChildHelper() {
255
      return new ChildLbStateHelper();
1✔
256
    }
257

258
    /**
259
     * Override for unique behavior such as delayed shutdowns of subchannels.
260
     */
261
    protected void shutdown() {
262
      lb.shutdown();
1✔
263
      this.currentState = SHUTDOWN;
1✔
264
      logger.log(Level.FINE, "Child balancer {0} deleted", key);
1✔
265
    }
1✔
266

267
    @Override
268
    public String toString() {
269
      return "Address = " + key
×
270
          + ", state = " + currentState
271
          + ", picker type: " + currentPicker.getClass()
×
272
          + ", lb: " + lb;
273
    }
274

275
    public final Object getKey() {
276
      return key;
1✔
277
    }
278

279
    @VisibleForTesting
280
    public final LoadBalancer getLb() {
281
      return lb;
1✔
282
    }
283

284
    @VisibleForTesting
285
    public final SubchannelPicker getCurrentPicker() {
286
      return currentPicker;
1✔
287
    }
288

289
    public final ConnectivityState getCurrentState() {
290
      return currentState;
1✔
291
    }
292

293
    protected final void setCurrentState(ConnectivityState newState) {
294
      currentState = newState;
1✔
295
    }
1✔
296

297
    protected final void setCurrentPicker(SubchannelPicker newPicker) {
298
      currentPicker = newPicker;
1✔
299
    }
1✔
300

301
    /**
302
     * ChildLbStateHelper is the glue between ChildLbState and the helpers associated with the
303
     * petiole policy above and the PickFirstLoadBalancer's helper below.
304
     *
305
     * <p>The ChildLbState updates happen during updateBalancingState.  Otherwise, it is doing
306
     * simple forwarding.
307
     */
308
    protected class ChildLbStateHelper extends ForwardingLoadBalancerHelper {
1✔
309

310
      /**
311
       * Update current state and picker for this child and then use
312
       * {@link #updateOverallBalancingState()} for the parent LB.
313
       */
314
      @Override
315
      public void updateBalancingState(final ConnectivityState newState,
316
          final SubchannelPicker newPicker) {
317
        if (currentState == SHUTDOWN) {
1✔
318
          return;
×
319
        }
320

321
        currentState = newState;
1✔
322
        currentPicker = newPicker;
1✔
323
        // If we are already in the process of resolving addresses, the overall balancing state
324
        // will be updated at the end of it, and we don't need to trigger that update here.
325
        if (!resolvingAddresses) {
1✔
326
          updateOverallBalancingState();
1✔
327
        }
328
      }
1✔
329

330
      @Override
331
      protected Helper delegate() {
332
        return helper;
1✔
333
      }
334
    }
335
  }
336

337
  /**
338
   * Endpoint is an optimization to quickly lookup and compare EquivalentAddressGroup address sets.
339
   * It ignores the attributes. Is used as a key for ChildLbState for most load balancers
340
   * (ClusterManagerLB uses a String).
341
   */
342
  protected static class Endpoint {
343
    final Collection<SocketAddress> addrs;
344
    final int hashCode;
345

346
    public Endpoint(EquivalentAddressGroup eag) {
1✔
347
      checkNotNull(eag, "eag");
1✔
348

349
      if (eag.getAddresses().size() < 10) {
1✔
350
        addrs = eag.getAddresses();
1✔
351
      } else {
352
        // This is expected to be very unlikely in practice
353
        addrs = new HashSet<>(eag.getAddresses());
1✔
354
      }
355
      int sum = 0;
1✔
356
      for (SocketAddress address : eag.getAddresses()) {
1✔
357
        sum += address.hashCode();
1✔
358
      }
1✔
359
      hashCode = sum;
1✔
360
    }
1✔
361

362
    @Override
363
    public int hashCode() {
364
      return hashCode;
1✔
365
    }
366

367
    @Override
368
    public boolean equals(Object other) {
369
      if (this == other) {
1✔
370
        return true;
1✔
371
      }
372

373
      if (!(other instanceof Endpoint)) {
1✔
374
        return false;
1✔
375
      }
376
      Endpoint o = (Endpoint) other;
1✔
377
      if (o.hashCode != hashCode || o.addrs.size() != addrs.size()) {
1✔
378
        return false;
1✔
379
      }
380

381
      return o.addrs.containsAll(addrs);
1✔
382
    }
383

384
    @Override
385
    public String toString() {
386
      return addrs.toString();
1✔
387
    }
388
  }
389
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc