• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19481

30 Sep 2024 03:17PM UTC coverage: 84.554% (-0.03%) from 84.584%
#19481

push

github

ejona86
xds: Have ClusterManagerLB use child map for preserving children

Instead of doing a dance of supplementing config so the later
createChildAddressesMap() won't delete children, just look at the
existing children and don't delete any that shouldn't be deleted.

33645 of 39791 relevant lines covered (84.55%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.0
/../util/src/main/java/io/grpc/util/MultiChildLoadBalancer.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.util;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.CONNECTING;
21
import static io.grpc.ConnectivityState.IDLE;
22
import static io.grpc.ConnectivityState.READY;
23
import static io.grpc.ConnectivityState.SHUTDOWN;
24
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
25

26
import com.google.common.annotations.VisibleForTesting;
27
import com.google.common.collect.ImmutableList;
28
import io.grpc.Attributes;
29
import io.grpc.ConnectivityState;
30
import io.grpc.EquivalentAddressGroup;
31
import io.grpc.Internal;
32
import io.grpc.LoadBalancer;
33
import io.grpc.LoadBalancerProvider;
34
import io.grpc.Status;
35
import io.grpc.internal.PickFirstLoadBalancerProvider;
36
import java.net.SocketAddress;
37
import java.util.ArrayList;
38
import java.util.Collection;
39
import java.util.Collections;
40
import java.util.HashMap;
41
import java.util.HashSet;
42
import java.util.LinkedHashMap;
43
import java.util.List;
44
import java.util.Map;
45
import java.util.Set;
46
import java.util.logging.Level;
47
import java.util.logging.Logger;
48
import javax.annotation.Nullable;
49

50
/**
51
 * A base load balancing policy for those policies which has multiple children such as
52
 * ClusterManager or the petiole policies.  For internal use only.
53
 */
54
@Internal
55
public abstract class MultiChildLoadBalancer extends LoadBalancer {
56

57
  private static final Logger logger = Logger.getLogger(MultiChildLoadBalancer.class.getName());
1✔
58
  private final Map<Object, ChildLbState> childLbStates = new LinkedHashMap<>();
1✔
59
  private final Helper helper;
60
  // Set to true if currently in the process of handling resolved addresses.
61
  protected boolean resolvingAddresses;
62

63
  protected final LoadBalancerProvider pickFirstLbProvider = new PickFirstLoadBalancerProvider();
1✔
64

65
  protected ConnectivityState currentConnectivityState;
66

67

68
  protected MultiChildLoadBalancer(Helper helper) {
1✔
69
    this.helper = checkNotNull(helper, "helper");
1✔
70
    logger.log(Level.FINE, "Created");
1✔
71
  }
1✔
72

73
  /**
74
   * Using the state of all children will calculate the current connectivity state,
75
   * update fields, generate a picker and then call
76
   * {@link Helper#updateBalancingState(ConnectivityState, SubchannelPicker)}.
77
   */
78
  protected abstract void updateOverallBalancingState();
79

80
  /**
81
   * Override to utilize parsing of the policy configuration or alternative helper/lb generation.
82
   * Override this if keys are not Endpoints or if child policies have configuration. Null map
83
   * values preserve the child without delivering the child an update.
84
   */
85
  protected Map<Object, ResolvedAddresses> createChildAddressesMap(
86
      ResolvedAddresses resolvedAddresses) {
87
    Map<Object, ResolvedAddresses> childAddresses = new HashMap<>();
1✔
88
    for (EquivalentAddressGroup eag : resolvedAddresses.getAddresses()) {
1✔
89
      ResolvedAddresses addresses = resolvedAddresses.toBuilder()
1✔
90
          .setAddresses(Collections.singletonList(eag))
1✔
91
          .setAttributes(Attributes.newBuilder().set(IS_PETIOLE_POLICY, true).build())
1✔
92
          .setLoadBalancingPolicyConfig(null)
1✔
93
          .build();
1✔
94
      childAddresses.put(new Endpoint(eag), addresses);
1✔
95
    }
1✔
96
    return childAddresses;
1✔
97
  }
98

99
  /**
100
   * Override to create an instance of a subclass.
101
   */
102
  protected ChildLbState createChildLbState(Object key) {
103
    return new ChildLbState(key, pickFirstLbProvider);
1✔
104
  }
105

106
  /**
107
   *   Override to completely replace the default logic or to do additional activities.
108
   */
109
  @Override
110
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
111
    try {
112
      resolvingAddresses = true;
1✔
113

114
      // process resolvedAddresses to update children
115
      AcceptResolvedAddrRetVal acceptRetVal = acceptResolvedAddressesInternal(resolvedAddresses);
1✔
116
      if (!acceptRetVal.status.isOk()) {
1✔
117
        return acceptRetVal.status;
1✔
118
      }
119

120
      // Update the picker and our connectivity state
121
      updateOverallBalancingState();
1✔
122

123
      // shutdown removed children
124
      shutdownRemoved(acceptRetVal.removedChildren);
1✔
125
      return acceptRetVal.status;
1✔
126
    } finally {
127
      resolvingAddresses = false;
1✔
128
    }
129
  }
130

131
  /**
132
   * Handle the name resolution error.
133
   *
134
   * <p/>Override if you need special handling.
135
   */
136
  @Override
137
  public void handleNameResolutionError(Status error) {
138
    if (currentConnectivityState != READY)  {
1✔
139
      helper.updateBalancingState(
1✔
140
          TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
141
    }
142
  }
1✔
143

144
  @Override
145
  public void shutdown() {
146
    logger.log(Level.FINE, "Shutdown");
1✔
147
    for (ChildLbState state : childLbStates.values()) {
1✔
148
      state.shutdown();
1✔
149
    }
1✔
150
    childLbStates.clear();
1✔
151
  }
1✔
152

153
  /**
154
   *   This does the work to update the child map and calculate which children have been removed.
155
   *   You must call {@link #updateOverallBalancingState} to update the picker
156
   *   and call {@link #shutdownRemoved(List)} to shutdown the endpoints that have been removed.
157
    */
158
  protected final AcceptResolvedAddrRetVal acceptResolvedAddressesInternal(
159
      ResolvedAddresses resolvedAddresses) {
160
    logger.log(Level.FINE, "Received resolution result: {0}", resolvedAddresses);
1✔
161

162
    Map<Object, ResolvedAddresses> newChildAddresses = createChildAddressesMap(resolvedAddresses);
1✔
163

164
    // Handle error case
165
    if (newChildAddresses.isEmpty()) {
1✔
166
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
167
          "NameResolver returned no usable address. " + resolvedAddresses);
168
      handleNameResolutionError(unavailableStatus);
1✔
169
      return new AcceptResolvedAddrRetVal(unavailableStatus, null);
1✔
170
    }
171

172
    updateChildrenWithResolvedAddresses(newChildAddresses);
1✔
173

174
    return new AcceptResolvedAddrRetVal(Status.OK, getRemovedChildren(newChildAddresses.keySet()));
1✔
175
  }
176

177
  private void updateChildrenWithResolvedAddresses(
178
      Map<Object, ResolvedAddresses> newChildAddresses) {
179
    for (Map.Entry<Object, ResolvedAddresses> entry : newChildAddresses.entrySet()) {
1✔
180
      ChildLbState childLbState = childLbStates.get(entry.getKey());
1✔
181
      if (childLbState == null) {
1✔
182
        childLbState = createChildLbState(entry.getKey());
1✔
183
        childLbStates.put(entry.getKey(), childLbState);
1✔
184
      }
185
      if (entry.getValue() != null) {
1✔
186
        childLbState.setResolvedAddresses(entry.getValue()); // update child
1✔
187
        childLbState.lb.handleResolvedAddresses(entry.getValue()); // update child LB
1✔
188
      }
189
    }
1✔
190
  }
1✔
191

192
  /**
193
   * Identifies which children have been removed (are not part of the newChildKeys).
194
   */
195
  private List<ChildLbState> getRemovedChildren(Set<Object> newChildKeys) {
196
    List<ChildLbState> removedChildren = new ArrayList<>();
1✔
197
    // Do removals
198
    for (Object key : ImmutableList.copyOf(childLbStates.keySet())) {
1✔
199
      if (!newChildKeys.contains(key)) {
1✔
200
        ChildLbState childLbState = childLbStates.remove(key);
1✔
201
        removedChildren.add(childLbState);
1✔
202
      }
203
    }
1✔
204
    return removedChildren;
1✔
205
  }
206

207
  protected final void shutdownRemoved(List<ChildLbState> removedChildren) {
208
    // Do shutdowns after updating picker to reduce the chance of failing an RPC by picking a
209
    // subchannel that has been shutdown.
210
    for (ChildLbState childLbState : removedChildren) {
1✔
211
      childLbState.shutdown();
1✔
212
    }
1✔
213
  }
1✔
214

215
  @Nullable
216
  protected static ConnectivityState aggregateState(
217
      @Nullable ConnectivityState overallState, ConnectivityState childState) {
218
    if (overallState == null) {
1✔
219
      return childState;
1✔
220
    }
221
    if (overallState == READY || childState == READY) {
1✔
222
      return READY;
1✔
223
    }
224
    if (overallState == CONNECTING || childState == CONNECTING) {
1✔
225
      return CONNECTING;
1✔
226
    }
227
    if (overallState == IDLE || childState == IDLE) {
1✔
228
      return IDLE;
×
229
    }
230
    return overallState;
1✔
231
  }
232

233
  protected final Helper getHelper() {
234
    return helper;
1✔
235
  }
236

237
  @VisibleForTesting
238
  public final Collection<ChildLbState> getChildLbStates() {
239
    return childLbStates.values();
1✔
240
  }
241

242
  @VisibleForTesting
243
  public final ChildLbState getChildLbState(Object key) {
244
    if (key == null) {
1✔
245
      return null;
×
246
    }
247
    if (key instanceof EquivalentAddressGroup) {
1✔
248
      key = new Endpoint((EquivalentAddressGroup) key);
1✔
249
    }
250
    return childLbStates.get(key);
1✔
251
  }
252

253
  @VisibleForTesting
254
  public final ChildLbState getChildLbStateEag(EquivalentAddressGroup eag) {
255
    return getChildLbState(new Endpoint(eag));
1✔
256
  }
257

258
  /**
259
   * Filters out non-ready child load balancers (subchannels).
260
   */
261
  protected final List<ChildLbState> getReadyChildren() {
262
    List<ChildLbState> activeChildren = new ArrayList<>();
1✔
263
    for (ChildLbState child : getChildLbStates()) {
1✔
264
      if (child.getCurrentState() == READY) {
1✔
265
        activeChildren.add(child);
1✔
266
      }
267
    }
1✔
268
    return activeChildren;
1✔
269
  }
270

271
  /**
272
   * This represents the state of load balancer children.  Each endpoint (represented by an
273
   * EquivalentAddressGroup or EDS string) will have a separate ChildLbState which in turn will
274
   * have a single child LoadBalancer created from the provided factory.
275
   *
276
   * <p>A ChildLbStateHelper is the glue between ChildLbState and the helpers associated with the
277
   * petiole policy above and the PickFirstLoadBalancer's helper below.
278
   *
279
   * <p>If you wish to store additional state information related to each subchannel, then extend
280
   * this class.
281
   */
282
  public class ChildLbState {
283
    private final Object key;
284
    private ResolvedAddresses resolvedAddresses;
285

286
    private final LoadBalancer lb;
287
    private ConnectivityState currentState;
288
    private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
1✔
289

290
    public ChildLbState(Object key, LoadBalancer.Factory policyFactory) {
1✔
291
      this.key = key;
1✔
292
      this.lb = policyFactory.newLoadBalancer(createChildHelper());
1✔
293
      this.currentState = CONNECTING;
1✔
294
    }
1✔
295

296
    protected ChildLbStateHelper createChildHelper() {
297
      return new ChildLbStateHelper();
1✔
298
    }
299

300
    /**
301
     * Override for unique behavior such as delayed shutdowns of subchannels.
302
     */
303
    protected void shutdown() {
304
      lb.shutdown();
1✔
305
      this.currentState = SHUTDOWN;
1✔
306
      logger.log(Level.FINE, "Child balancer {0} deleted", key);
1✔
307
    }
1✔
308

309
    @Override
310
    public String toString() {
311
      return "Address = " + key
×
312
          + ", state = " + currentState
313
          + ", picker type: " + currentPicker.getClass()
×
314
          + ", lb: " + lb;
315
    }
316

317
    public final Object getKey() {
318
      return key;
1✔
319
    }
320

321
    @VisibleForTesting
322
    public final LoadBalancer getLb() {
323
      return lb;
1✔
324
    }
325

326
    @VisibleForTesting
327
    public final SubchannelPicker getCurrentPicker() {
328
      return currentPicker;
1✔
329
    }
330

331
    public final ConnectivityState getCurrentState() {
332
      return currentState;
1✔
333
    }
334

335
    protected final void setCurrentState(ConnectivityState newState) {
336
      currentState = newState;
1✔
337
    }
1✔
338

339
    protected final void setCurrentPicker(SubchannelPicker newPicker) {
340
      currentPicker = newPicker;
1✔
341
    }
1✔
342

343
    public final EquivalentAddressGroup getEag() {
344
      if (resolvedAddresses == null || resolvedAddresses.getAddresses().isEmpty()) {
1✔
345
        return null;
×
346
      }
347
      return resolvedAddresses.getAddresses().get(0);
1✔
348
    }
349

350
    protected final void setResolvedAddresses(ResolvedAddresses newAddresses) {
351
      checkNotNull(newAddresses, "Missing address list for child");
1✔
352
      resolvedAddresses = newAddresses;
1✔
353
    }
1✔
354

355
    @VisibleForTesting
356
    public final ResolvedAddresses getResolvedAddresses() {
357
      return resolvedAddresses;
1✔
358
    }
359

360
    /**
361
     * ChildLbStateHelper is the glue between ChildLbState and the helpers associated with the
362
     * petiole policy above and the PickFirstLoadBalancer's helper below.
363
     *
364
     * <p>The ChildLbState updates happen during updateBalancingState.  Otherwise, it is doing
365
     * simple forwarding.
366
     */
367
    protected class ChildLbStateHelper extends ForwardingLoadBalancerHelper {
1✔
368

369
      /**
370
       * Update current state and picker for this child and then use
371
       * {@link #updateOverallBalancingState()} for the parent LB.
372
       */
373
      @Override
374
      public void updateBalancingState(final ConnectivityState newState,
375
          final SubchannelPicker newPicker) {
376
        if (currentState == SHUTDOWN) {
1✔
377
          return;
×
378
        }
379

380
        currentState = newState;
1✔
381
        currentPicker = newPicker;
1✔
382
        // If we are already in the process of resolving addresses, the overall balancing state
383
        // will be updated at the end of it, and we don't need to trigger that update here.
384
        if (!resolvingAddresses) {
1✔
385
          updateOverallBalancingState();
1✔
386
        }
387
      }
1✔
388

389
      @Override
390
      protected Helper delegate() {
391
        return helper;
1✔
392
      }
393
    }
394
  }
395

396
  /**
397
   * Endpoint is an optimization to quickly lookup and compare EquivalentAddressGroup address sets.
398
   * It ignores the attributes. Is used as a key for ChildLbState for most load balancers
399
   * (ClusterManagerLB uses a String).
400
   */
401
  protected static class Endpoint {
402
    final Collection<SocketAddress> addrs;
403
    final int hashCode;
404

405
    public Endpoint(EquivalentAddressGroup eag) {
1✔
406
      checkNotNull(eag, "eag");
1✔
407

408
      if (eag.getAddresses().size() < 10) {
1✔
409
        addrs = eag.getAddresses();
1✔
410
      } else {
411
        // This is expected to be very unlikely in practice
412
        addrs = new HashSet<>(eag.getAddresses());
1✔
413
      }
414
      int sum = 0;
1✔
415
      for (SocketAddress address : eag.getAddresses()) {
1✔
416
        sum += address.hashCode();
1✔
417
      }
1✔
418
      hashCode = sum;
1✔
419
    }
1✔
420

421
    @Override
422
    public int hashCode() {
423
      return hashCode;
1✔
424
    }
425

426
    @Override
427
    public boolean equals(Object other) {
428
      if (this == other) {
1✔
429
        return true;
1✔
430
      }
431

432
      if (!(other instanceof Endpoint)) {
1✔
433
        return false;
1✔
434
      }
435
      Endpoint o = (Endpoint) other;
1✔
436
      if (o.hashCode != hashCode || o.addrs.size() != addrs.size()) {
1✔
437
        return false;
1✔
438
      }
439

440
      return o.addrs.containsAll(addrs);
1✔
441
    }
442

443
    @Override
444
    public String toString() {
445
      return addrs.toString();
1✔
446
    }
447
  }
448

449
  protected static class AcceptResolvedAddrRetVal {
450
    public final Status status;
451
    public final List<ChildLbState> removedChildren;
452

453
    public AcceptResolvedAddrRetVal(Status status, List<ChildLbState> removedChildren) {
1✔
454
      this.status = status;
1✔
455
      this.removedChildren = removedChildren;
1✔
456
    }
1✔
457
  }
458
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc