• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19416

12 Aug 2024 06:23PM UTC coverage: 84.469% (-0.002%) from 84.471%
#19416

push

github

web-flow
xds: WRRPicker must not access unsynchronized data in ChildLbState

There was no point to using subchannels as keys to
subchannelToReportListenerMap, as the listener is per-child. That meant
the keys would be guaranteed to be known ahead-of-time and the
unsynchronized getOrCreateOrcaListener() during picking was unnecessary.

The picker still stores ChildLbStates to make sure that updating weights
uses the correct children, but the picker itself no longer references
ChildLbStates except in the constructor. That means weight calculation
is moved into the LB policy, as child.getWeight() is unsynchronized, and
the picker no longer needs a reference to helper.

33389 of 39528 relevant lines covered (84.47%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.13
/../util/src/main/java/io/grpc/util/MultiChildLoadBalancer.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.util;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.ConnectivityState.CONNECTING;
22
import static io.grpc.ConnectivityState.IDLE;
23
import static io.grpc.ConnectivityState.READY;
24
import static io.grpc.ConnectivityState.SHUTDOWN;
25
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
26

27
import com.google.common.annotations.VisibleForTesting;
28
import com.google.common.collect.ImmutableList;
29
import com.google.common.collect.ImmutableMap;
30
import io.grpc.Attributes;
31
import io.grpc.ConnectivityState;
32
import io.grpc.EquivalentAddressGroup;
33
import io.grpc.Internal;
34
import io.grpc.LoadBalancer;
35
import io.grpc.LoadBalancerProvider;
36
import io.grpc.Status;
37
import io.grpc.internal.PickFirstLoadBalancerProvider;
38
import java.net.SocketAddress;
39
import java.util.ArrayList;
40
import java.util.Collection;
41
import java.util.Collections;
42
import java.util.HashMap;
43
import java.util.HashSet;
44
import java.util.LinkedHashMap;
45
import java.util.List;
46
import java.util.Map;
47
import java.util.Set;
48
import java.util.logging.Level;
49
import java.util.logging.Logger;
50
import javax.annotation.Nullable;
51

52
/**
53
 * A base load balancing policy for those policies which has multiple children such as
54
 * ClusterManager or the petiole policies.  For internal use only.
55
 */
56
@Internal
57
public abstract class MultiChildLoadBalancer extends LoadBalancer {
58

59
  private static final Logger logger = Logger.getLogger(MultiChildLoadBalancer.class.getName());
1✔
60
  private final Map<Object, ChildLbState> childLbStates = new LinkedHashMap<>();
1✔
61
  private final Helper helper;
62
  // Set to true if currently in the process of handling resolved addresses.
63
  protected boolean resolvingAddresses;
64

65
  protected final LoadBalancerProvider pickFirstLbProvider = new PickFirstLoadBalancerProvider();
1✔
66

67
  protected ConnectivityState currentConnectivityState;
68

69

70
  protected MultiChildLoadBalancer(Helper helper) {
1✔
71
    this.helper = checkNotNull(helper, "helper");
1✔
72
    logger.log(Level.FINE, "Created");
1✔
73
  }
1✔
74

75
  /**
76
   * Using the state of all children will calculate the current connectivity state,
77
   * update fields, generate a picker and then call
78
   * {@link Helper#updateBalancingState(ConnectivityState, SubchannelPicker)}.
79
   */
80
  protected abstract void updateOverallBalancingState();
81

82
  /**
83
   * Override to utilize parsing of the policy configuration or alternative helper/lb generation.
84
   */
85
  protected Map<Object, ChildLbState> createChildLbMap(ResolvedAddresses resolvedAddresses) {
86
    Map<Object, ChildLbState> childLbMap = new HashMap<>();
1✔
87
    List<EquivalentAddressGroup> addresses = resolvedAddresses.getAddresses();
1✔
88
    for (EquivalentAddressGroup eag : addresses) {
1✔
89
      Endpoint endpoint = new Endpoint(eag); // keys need to be just addresses
1✔
90
      ChildLbState existingChildLbState = childLbStates.get(endpoint);
1✔
91
      if (existingChildLbState != null) {
1✔
92
        childLbMap.put(endpoint, existingChildLbState);
1✔
93
      } else {
94
        childLbMap.put(endpoint,
1✔
95
            createChildLbState(endpoint, null, getInitialPicker(), resolvedAddresses));
1✔
96
      }
97
    }
1✔
98
    return childLbMap;
1✔
99
  }
100

101
  /**
102
   * Override to create an instance of a subclass.
103
   */
104
  protected ChildLbState createChildLbState(Object key, Object policyConfig,
105
      SubchannelPicker initialPicker, ResolvedAddresses resolvedAddresses) {
106
    return new ChildLbState(key, pickFirstLbProvider, policyConfig, initialPicker);
1✔
107
  }
108

109
  /**
110
   *   Override to completely replace the default logic or to do additional activities.
111
   */
112
  @Override
113
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
114
    try {
115
      resolvingAddresses = true;
1✔
116

117
      // process resolvedAddresses to update children
118
      AcceptResolvedAddrRetVal acceptRetVal = acceptResolvedAddressesInternal(resolvedAddresses);
1✔
119
      if (!acceptRetVal.status.isOk()) {
1✔
120
        return acceptRetVal.status;
1✔
121
      }
122

123
      // Update the picker and our connectivity state
124
      updateOverallBalancingState();
1✔
125

126
      // shutdown removed children
127
      shutdownRemoved(acceptRetVal.removedChildren);
1✔
128
      return acceptRetVal.status;
1✔
129
    } finally {
130
      resolvingAddresses = false;
1✔
131
    }
132
  }
133

134
  /**
135
   * Override this if your keys are not of type Endpoint.
136
   * @param key Key to identify the ChildLbState
137
   * @param resolvedAddresses list of addresses which include attributes
138
   * @param childConfig a load balancing policy config. This field is optional.
139
   * @return a fully loaded ResolvedAddresses object for the specified key
140
   */
141
  protected ResolvedAddresses getChildAddresses(Object key, ResolvedAddresses resolvedAddresses,
142
      Object childConfig) {
143
    Endpoint endpointKey;
144
    if (key instanceof EquivalentAddressGroup) {
1✔
145
      endpointKey = new Endpoint((EquivalentAddressGroup) key);
×
146
    } else {
147
      checkArgument(key instanceof Endpoint, "key is wrong type");
1✔
148
      endpointKey = (Endpoint) key;
1✔
149
    }
150

151
    // Retrieve the non-stripped version
152
    EquivalentAddressGroup eagToUse = null;
1✔
153
    for (EquivalentAddressGroup currEag : resolvedAddresses.getAddresses()) {
1✔
154
      if (endpointKey.equals(new Endpoint(currEag))) {
1✔
155
        eagToUse = currEag;
1✔
156
        break;
1✔
157
      }
158
    }
1✔
159

160
    checkNotNull(eagToUse, key + " no longer present in load balancer children");
1✔
161

162
    return resolvedAddresses.toBuilder()
1✔
163
        .setAddresses(Collections.singletonList(eagToUse))
1✔
164
        .setAttributes(Attributes.newBuilder().set(IS_PETIOLE_POLICY, true).build())
1✔
165
        .setLoadBalancingPolicyConfig(childConfig)
1✔
166
        .build();
1✔
167
  }
168

169
  /**
170
   * Handle the name resolution error.
171
   *
172
   * <p/>Override if you need special handling.
173
   */
174
  @Override
175
  public void handleNameResolutionError(Status error) {
176
    if (currentConnectivityState != READY)  {
1✔
177
      helper.updateBalancingState(TRANSIENT_FAILURE, getErrorPicker(error));
1✔
178
    }
179
  }
1✔
180

181
  /**
182
   * Handle the name resolution error only for the specified child.
183
   *
184
   * <p/>Override if you need special handling.
185
   */
186
  protected void handleNameResolutionError(ChildLbState child, Status error) {
187
    child.lb.handleNameResolutionError(error);
1✔
188
  }
1✔
189

190
  /**
191
   * Creates a picker representing the state before any connections have been established.
192
   *
193
   * <p/>Override to produce a custom picker.
194
   */
195
  protected SubchannelPicker getInitialPicker() {
196
    return new FixedResultPicker(PickResult.withNoResult());
1✔
197
  }
198

199
  /**
200
   * Creates a new picker representing an error status.
201
   *
202
   * <p/>Override to produce a custom picker when there are errors.
203
   */
204
  protected SubchannelPicker getErrorPicker(Status error)  {
205
    return new FixedResultPicker(PickResult.withError(error));
1✔
206
  }
207

208
  @Override
209
  public void shutdown() {
210
    logger.log(Level.FINE, "Shutdown");
1✔
211
    for (ChildLbState state : childLbStates.values()) {
1✔
212
      state.shutdown();
1✔
213
    }
1✔
214
    childLbStates.clear();
1✔
215
  }
1✔
216

217
  /**
218
   *   This does the work to update the child map and calculate which children have been removed.
219
   *   You must call {@link #updateOverallBalancingState} to update the picker
220
   *   and call {@link #shutdownRemoved(List)} to shutdown the endpoints that have been removed.
221
    */
222
  protected final AcceptResolvedAddrRetVal acceptResolvedAddressesInternal(
223
      ResolvedAddresses resolvedAddresses) {
224
    logger.log(Level.FINE, "Received resolution result: {0}", resolvedAddresses);
1✔
225

226
    // Subclass handles any special manipulation to create appropriate types of keyed ChildLbStates
227
    Map<Object, ChildLbState> newChildren = createChildLbMap(resolvedAddresses);
1✔
228

229
    // Handle error case
230
    if (newChildren.isEmpty()) {
1✔
231
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
232
          "NameResolver returned no usable address. " + resolvedAddresses);
233
      handleNameResolutionError(unavailableStatus);
1✔
234
      return new AcceptResolvedAddrRetVal(unavailableStatus, null);
1✔
235
    }
236

237
    addMissingChildren(newChildren);
1✔
238

239
    updateChildrenWithResolvedAddresses(resolvedAddresses, newChildren);
1✔
240

241
    return new AcceptResolvedAddrRetVal(Status.OK, getRemovedChildren(newChildren.keySet()));
1✔
242
  }
243

244
  protected final void addMissingChildren(Map<Object, ChildLbState> newChildren) {
245
    // Do adds and identify reused children
246
    for (Map.Entry<Object, ChildLbState> entry : newChildren.entrySet()) {
1✔
247
      final Object key = entry.getKey();
1✔
248
      if (!childLbStates.containsKey(key)) {
1✔
249
        childLbStates.put(key, entry.getValue());
1✔
250
      }
251
    }
1✔
252
  }
1✔
253

254
  protected final void updateChildrenWithResolvedAddresses(ResolvedAddresses resolvedAddresses,
255
                                                     Map<Object, ChildLbState> newChildren) {
256
    for (Map.Entry<Object, ChildLbState> entry : newChildren.entrySet()) {
1✔
257
      Object childConfig = entry.getValue().getConfig();
1✔
258
      ChildLbState childLbState = childLbStates.get(entry.getKey());
1✔
259
      ResolvedAddresses childAddresses =
1✔
260
          getChildAddresses(entry.getKey(), resolvedAddresses, childConfig);
1✔
261
      childLbState.setResolvedAddresses(childAddresses); // update child
1✔
262
      childLbState.lb.handleResolvedAddresses(childAddresses); // update child LB
1✔
263
    }
1✔
264
  }
1✔
265

266
  /**
267
   * Identifies which children have been removed (are not part of the newChildKeys).
268
   */
269
  protected final List<ChildLbState> getRemovedChildren(Set<Object> newChildKeys) {
270
    List<ChildLbState> removedChildren = new ArrayList<>();
1✔
271
    // Do removals
272
    for (Object key : ImmutableList.copyOf(childLbStates.keySet())) {
1✔
273
      if (!newChildKeys.contains(key)) {
1✔
274
        ChildLbState childLbState = childLbStates.remove(key);
1✔
275
        removedChildren.add(childLbState);
1✔
276
      }
277
    }
1✔
278
    return removedChildren;
1✔
279
  }
280

281
  protected final void shutdownRemoved(List<ChildLbState> removedChildren) {
282
    // Do shutdowns after updating picker to reduce the chance of failing an RPC by picking a
283
    // subchannel that has been shutdown.
284
    for (ChildLbState childLbState : removedChildren) {
1✔
285
      childLbState.shutdown();
1✔
286
    }
1✔
287
  }
1✔
288

289
  @Nullable
290
  protected static ConnectivityState aggregateState(
291
      @Nullable ConnectivityState overallState, ConnectivityState childState) {
292
    if (overallState == null) {
1✔
293
      return childState;
1✔
294
    }
295
    if (overallState == READY || childState == READY) {
1✔
296
      return READY;
1✔
297
    }
298
    if (overallState == CONNECTING || childState == CONNECTING) {
1✔
299
      return CONNECTING;
1✔
300
    }
301
    if (overallState == IDLE || childState == IDLE) {
1✔
302
      return IDLE;
×
303
    }
304
    return overallState;
1✔
305
  }
306

307
  protected final Helper getHelper() {
308
    return helper;
1✔
309
  }
310

311
  @VisibleForTesting
312
  public final ImmutableMap<Object, ChildLbState> getImmutableChildMap() {
313
    return ImmutableMap.copyOf(childLbStates);
1✔
314
  }
315

316
  @VisibleForTesting
317
  public final Collection<ChildLbState> getChildLbStates() {
318
    return childLbStates.values();
1✔
319
  }
320

321
  @VisibleForTesting
322
  public final ChildLbState getChildLbState(Object key) {
323
    if (key == null) {
1✔
324
      return null;
×
325
    }
326
    if (key instanceof EquivalentAddressGroup) {
1✔
327
      key = new Endpoint((EquivalentAddressGroup) key);
1✔
328
    }
329
    return childLbStates.get(key);
1✔
330
  }
331

332
  @VisibleForTesting
333
  public final ChildLbState getChildLbStateEag(EquivalentAddressGroup eag) {
334
    return getChildLbState(new Endpoint(eag));
1✔
335
  }
336

337
  /**
338
   * Filters out non-ready child load balancers (subchannels).
339
   */
340
  protected final List<ChildLbState> getReadyChildren() {
341
    List<ChildLbState> activeChildren = new ArrayList<>();
1✔
342
    for (ChildLbState child : getChildLbStates()) {
1✔
343
      if (child.getCurrentState() == READY) {
1✔
344
        activeChildren.add(child);
1✔
345
      }
346
    }
1✔
347
    return activeChildren;
1✔
348
  }
349

350
  /**
351
   * This represents the state of load balancer children.  Each endpoint (represented by an
352
   * EquivalentAddressGroup or EDS string) will have a separate ChildLbState which in turn will
353
   * have a single child LoadBalancer created from the provided factory.
354
   *
355
   * <p>A ChildLbStateHelper is the glue between ChildLbState and the helpers associated with the
356
   * petiole policy above and the PickFirstLoadBalancer's helper below.
357
   *
358
   * <p>If you wish to store additional state information related to each subchannel, then extend
359
   * this class.
360
   */
361
  public class ChildLbState {
362
    private final Object key;
363
    private ResolvedAddresses resolvedAddresses;
364
    private final Object config;
365

366
    private final LoadBalancer lb;
367
    private ConnectivityState currentState;
368
    private SubchannelPicker currentPicker;
369

370
    public ChildLbState(Object key, LoadBalancer.Factory policyFactory, Object childConfig,
371
          SubchannelPicker initialPicker) {
1✔
372
      this.key = key;
1✔
373
      this.currentPicker = initialPicker;
1✔
374
      this.config = childConfig;
1✔
375
      this.lb = policyFactory.newLoadBalancer(createChildHelper());
1✔
376
      this.currentState = CONNECTING;
1✔
377
    }
1✔
378

379
    protected ChildLbStateHelper createChildHelper() {
380
      return new ChildLbStateHelper();
1✔
381
    }
382

383
    /**
384
     * Override for unique behavior such as delayed shutdowns of subchannels.
385
     */
386
    protected void shutdown() {
387
      lb.shutdown();
1✔
388
      this.currentState = SHUTDOWN;
1✔
389
      logger.log(Level.FINE, "Child balancer {0} deleted", key);
1✔
390
    }
1✔
391

392
    @Override
393
    public String toString() {
394
      return "Address = " + key
×
395
          + ", state = " + currentState
396
          + ", picker type: " + currentPicker.getClass()
×
397
          + ", lb: " + lb;
398
    }
399

400
    public final Object getKey() {
401
      return key;
1✔
402
    }
403

404
    @VisibleForTesting
405
    public final LoadBalancer getLb() {
406
      return lb;
1✔
407
    }
408

409
    @VisibleForTesting
410
    public final SubchannelPicker getCurrentPicker() {
411
      return currentPicker;
1✔
412
    }
413

414
    public final ConnectivityState getCurrentState() {
415
      return currentState;
1✔
416
    }
417

418
    protected final void setCurrentState(ConnectivityState newState) {
419
      currentState = newState;
1✔
420
    }
1✔
421

422
    protected final void setCurrentPicker(SubchannelPicker newPicker) {
423
      currentPicker = newPicker;
1✔
424
    }
1✔
425

426
    public final EquivalentAddressGroup getEag() {
427
      if (resolvedAddresses == null || resolvedAddresses.getAddresses().isEmpty()) {
1✔
428
        return null;
×
429
      }
430
      return resolvedAddresses.getAddresses().get(0);
1✔
431
    }
432

433
    protected final void setResolvedAddresses(ResolvedAddresses newAddresses) {
434
      checkNotNull(newAddresses, "Missing address list for child");
1✔
435
      resolvedAddresses = newAddresses;
1✔
436
    }
1✔
437

438
    private Object getConfig() {
439
      return config;
1✔
440
    }
441

442
    @VisibleForTesting
443
    public final ResolvedAddresses getResolvedAddresses() {
444
      return resolvedAddresses;
1✔
445
    }
446

447
    /**
448
     * ChildLbStateHelper is the glue between ChildLbState and the helpers associated with the
449
     * petiole policy above and the PickFirstLoadBalancer's helper below.
450
     *
451
     * <p>The ChildLbState updates happen during updateBalancingState.  Otherwise, it is doing
452
     * simple forwarding.
453
     */
454
    protected class ChildLbStateHelper extends ForwardingLoadBalancerHelper {
1✔
455

456
      /**
457
       * Update current state and picker for this child and then use
458
       * {@link #updateOverallBalancingState()} for the parent LB.
459
       *
460
       * <p/>Override this if you don't want to automatically request a connection when in IDLE
461
       */
462
      @Override
463
      public void updateBalancingState(final ConnectivityState newState,
464
          final SubchannelPicker newPicker) {
465
        if (!childLbStates.containsKey(key)) {
1✔
466
          return;
×
467
        }
468

469
        currentState = newState;
1✔
470
        currentPicker = newPicker;
1✔
471
        // If we are already in the process of resolving addresses, the overall balancing state
472
        // will be updated at the end of it, and we don't need to trigger that update here.
473
        if (!resolvingAddresses) {
1✔
474
          if (newState == IDLE) {
1✔
475
            lb.requestConnection();
1✔
476
          }
477
          updateOverallBalancingState();
1✔
478
        }
479
      }
1✔
480

481
      @Override
482
      protected Helper delegate() {
483
        return helper;
1✔
484
      }
485
    }
486
  }
487

488
  /**
489
   * Endpoint is an optimization to quickly lookup and compare EquivalentAddressGroup address sets.
490
   * It ignores the attributes. Is used as a key for ChildLbState for most load balancers
491
   * (ClusterManagerLB uses a String).
492
   */
493
  protected static class Endpoint {
494
    final Collection<SocketAddress> addrs;
495
    final int hashCode;
496

497
    public Endpoint(EquivalentAddressGroup eag) {
1✔
498
      checkNotNull(eag, "eag");
1✔
499

500
      if (eag.getAddresses().size() < 10) {
1✔
501
        addrs = eag.getAddresses();
1✔
502
      } else {
503
        // This is expected to be very unlikely in practice
504
        addrs = new HashSet<>(eag.getAddresses());
1✔
505
      }
506
      int sum = 0;
1✔
507
      for (SocketAddress address : eag.getAddresses()) {
1✔
508
        sum += address.hashCode();
1✔
509
      }
1✔
510
      hashCode = sum;
1✔
511
    }
1✔
512

513
    @Override
514
    public int hashCode() {
515
      return hashCode;
1✔
516
    }
517

518
    @Override
519
    public boolean equals(Object other) {
520
      if (this == other) {
1✔
521
        return true;
1✔
522
      }
523

524
      if (!(other instanceof Endpoint)) {
1✔
525
        return false;
1✔
526
      }
527
      Endpoint o = (Endpoint) other;
1✔
528
      if (o.hashCode != hashCode || o.addrs.size() != addrs.size()) {
1✔
529
        return false;
1✔
530
      }
531

532
      return o.addrs.containsAll(addrs);
1✔
533
    }
534

535
    @Override
536
    public String toString() {
537
      return addrs.toString();
1✔
538
    }
539
  }
540

541
  protected static class AcceptResolvedAddrRetVal {
542
    public final Status status;
543
    public final List<ChildLbState> removedChildren;
544

545
    public AcceptResolvedAddrRetVal(Status status, List<ChildLbState> removedChildren) {
1✔
546
      this.status = status;
1✔
547
      this.removedChildren = removedChildren;
1✔
548
    }
1✔
549
  }
550
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc