• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19701

18 Feb 2025 03:33PM UTC coverage: 88.592% (-0.001%) from 88.593%
#19701

push

github

ejona86
util: Use acceptResolvedAddresses() for MultiChildLb children

A failing Status from acceptResolvedAddresses means something is wrong
with the config, but parts of the config may still have been applied.
Thus there are now two possible flows: errors that should prevent
updateOverallBalancingState() and errors that should have no effect
other than the return code. To manage that, MultiChildLb must always be
responsible for calling updateOverallBalancingState().
acceptResolvedAddressesInternal() was inlined to make that error
processing easier. No existing usages actually needed to have logic
between updating the children and regenerating the picker.

RingHashLb already was verifying that the address list was not empty, so
the short-circuiting when acceptResolvedAddressesInternal() returned an
error was impossible to trigger. WrrLb's updateWeightTask() calls the
last picker, so it can run before acceptResolvedAddressesInternal(); the
only part that matters is re-creating the weightUpdateTimer.

34238 of 38647 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.87
/../xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
1
/*
2
 * Copyright 2021 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ConnectivityState.CONNECTING;
23
import static io.grpc.ConnectivityState.IDLE;
24
import static io.grpc.ConnectivityState.READY;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27

28
import com.google.common.base.MoreObjects;
29
import com.google.common.collect.HashMultiset;
30
import com.google.common.collect.Multiset;
31
import com.google.common.primitives.UnsignedInteger;
32
import io.grpc.Attributes;
33
import io.grpc.ConnectivityState;
34
import io.grpc.EquivalentAddressGroup;
35
import io.grpc.InternalLogId;
36
import io.grpc.LoadBalancer;
37
import io.grpc.Status;
38
import io.grpc.SynchronizationContext;
39
import io.grpc.util.MultiChildLoadBalancer;
40
import io.grpc.xds.client.XdsLogger;
41
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
42
import java.net.SocketAddress;
43
import java.util.ArrayList;
44
import java.util.Collection;
45
import java.util.Collections;
46
import java.util.HashMap;
47
import java.util.HashSet;
48
import java.util.List;
49
import java.util.Map;
50
import java.util.Set;
51
import java.util.stream.Collectors;
52
import javax.annotation.Nullable;
53

54
/**
55
 * A {@link LoadBalancer} that provides consistent hashing based load balancing to upstream hosts.
56
 * It implements the "Ketama" hashing that maps hosts onto a circle (the "ring") by hashing its
57
 * addresses. Each request is routed to a host by hashing some property of the request and finding
58
 * the nearest corresponding host clockwise around the ring. Each host is placed on the ring some
59
 * number of times proportional to its weight. With the ring partitioned appropriately, the
60
 * addition or removal of one host from a set of N hosts will affect only 1/N requests.
61
 */
62
final class RingHashLoadBalancer extends MultiChildLoadBalancer {
63
  private static final Status RPC_HASH_NOT_FOUND =
1✔
64
      Status.INTERNAL.withDescription("RPC hash not found. Probably a bug because xds resolver"
1✔
65
          + " config selector always generates a hash.");
66
  private static final XxHash64 hashFunc = XxHash64.INSTANCE;
1✔
67

68
  private final LoadBalancer.Factory lazyLbFactory =
1✔
69
      new LazyLoadBalancer.Factory(pickFirstLbProvider);
70
  private final XdsLogger logger;
71
  private final SynchronizationContext syncContext;
72
  private List<RingEntry> ring;
73

74
  RingHashLoadBalancer(Helper helper) {
75
    super(helper);
1✔
76
    syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
1✔
77
    logger = XdsLogger.withLogId(InternalLogId.allocate("ring_hash_lb", helper.getAuthority()));
1✔
78
    logger.log(XdsLogLevel.INFO, "Created");
1✔
79
  }
1✔
80

81
  @Override
82
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
83
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
84
    List<EquivalentAddressGroup> addrList = resolvedAddresses.getAddresses();
1✔
85
    Status addressValidityStatus = validateAddrList(addrList);
1✔
86
    if (!addressValidityStatus.isOk()) {
1✔
87
      return addressValidityStatus;
1✔
88
    }
89

90
    // Now do the ringhash specific logic with weights and building the ring
91
    RingHashConfig config = (RingHashConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
92
    if (config == null) {
1✔
93
      throw new IllegalArgumentException("Missing RingHash configuration");
×
94
    }
95
    Map<EquivalentAddressGroup, Long> serverWeights = new HashMap<>();
1✔
96
    long totalWeight = 0L;
1✔
97
    for (EquivalentAddressGroup eag : addrList) {
1✔
98
      Long weight = eag.getAttributes().get(XdsAttributes.ATTR_SERVER_WEIGHT);
1✔
99
      // Support two ways of server weighing: either multiple instances of the same address
100
      // or each address contains a per-address weight attribute. If a weight is not provided,
101
      // each occurrence of the address will be counted a weight value of one.
102
      if (weight == null) {
1✔
103
        weight = 1L;
×
104
      }
105
      totalWeight += weight;
1✔
106
      EquivalentAddressGroup addrKey = stripAttrs(eag);
1✔
107
      if (serverWeights.containsKey(addrKey)) {
1✔
108
        serverWeights.put(addrKey, serverWeights.get(addrKey) + weight);
×
109
      } else {
110
        serverWeights.put(addrKey, weight);
1✔
111
      }
112
    }
1✔
113
    // Calculate scale
114
    long minWeight = Collections.min(serverWeights.values());
1✔
115
    double normalizedMinWeight = (double) minWeight / totalWeight;
1✔
116
    // Scale up the number of hashes per host such that the least-weighted host gets a whole
117
    // number of hashes on the the ring. Other hosts might not end up with whole numbers, and
118
    // that's fine (the ring-building algorithm can handle this). This preserves the original
119
    // implementation's behavior: when weights aren't provided, all hosts should get an equal
120
    // number of hashes. In the case where this number exceeds the max_ring_size, it's scaled
121
    // back down to fit.
122
    double scale = Math.min(
1✔
123
        Math.ceil(normalizedMinWeight * config.minRingSize) / normalizedMinWeight,
1✔
124
        (double) config.maxRingSize);
125

126
    // Build the ring
127
    ring = buildRing(serverWeights, totalWeight, scale);
1✔
128

129
    return super.acceptResolvedAddresses(resolvedAddresses);
1✔
130
  }
131

132

133
  /**
134
   * Updates the overall balancing state by aggregating the connectivity states of all subchannels.
135
   *
136
   * <p>Aggregation rules (in order of dominance):
137
   * <ol>
138
   *   <li>If there is at least one subchannel in READY state, overall state is READY</li>
139
   *   <li>If there are <em>2 or more</em> subchannels in TRANSIENT_FAILURE, overall state is
140
   *   TRANSIENT_FAILURE (to allow timely failover to another policy)</li>
141
   *   <li>If there is at least one subchannel in CONNECTING state, overall state is
142
   *   CONNECTING</li>
143
   *   <li> If there is one subchannel in TRANSIENT_FAILURE state and there is
144
   *    more than one subchannel, report CONNECTING </li>
145
   *   <li>If there is at least one subchannel in IDLE state, overall state is IDLE</li>
146
   *   <li>Otherwise, overall state is TRANSIENT_FAILURE</li>
147
   * </ol>
148
   */
149
  @Override
150
  protected void updateOverallBalancingState() {
151
    checkState(!getChildLbStates().isEmpty(), "no subchannel has been created");
1✔
152
    if (this.currentConnectivityState == SHUTDOWN) {
1✔
153
      // Ignore changes that happen after shutdown is called
154
      logger.log(XdsLogLevel.DEBUG, "UpdateOverallBalancingState called after shutdown");
×
155
      return;
×
156
    }
157

158
    // Calculate the current overall state to report
159
    int numIdle = 0;
1✔
160
    int numReady = 0;
1✔
161
    int numConnecting = 0;
1✔
162
    int numTF = 0;
1✔
163

164
    forloop:
165
    for (ChildLbState childLbState : getChildLbStates()) {
1✔
166
      ConnectivityState state = childLbState.getCurrentState();
1✔
167
      switch (state) {
1✔
168
        case READY:
169
          numReady++;
1✔
170
          break forloop;
1✔
171
        case CONNECTING:
172
          numConnecting++;
1✔
173
          break;
1✔
174
        case IDLE:
175
          numIdle++;
1✔
176
          break;
1✔
177
        case TRANSIENT_FAILURE:
178
          numTF++;
1✔
179
          break;
1✔
180
        default:
181
          // ignore it
182
      }
183
    }
1✔
184

185
    ConnectivityState overallState;
186
    if (numReady > 0) {
1✔
187
      overallState = READY;
1✔
188
    } else if (numTF >= 2) {
1✔
189
      overallState = TRANSIENT_FAILURE;
1✔
190
    } else if (numConnecting > 0) {
1✔
191
      overallState = CONNECTING;
1✔
192
    } else if (numTF == 1 && getChildLbStates().size() > 1) {
1✔
193
      overallState = CONNECTING;
1✔
194
    } else if (numIdle > 0) {
1✔
195
      overallState = IDLE;
1✔
196
    } else {
197
      overallState = TRANSIENT_FAILURE;
×
198
    }
199

200
    RingHashPicker picker = new RingHashPicker(syncContext, ring, getChildLbStates());
1✔
201
    getHelper().updateBalancingState(overallState, picker);
1✔
202
    this.currentConnectivityState = overallState;
1✔
203
  }
1✔
204

205
  @Override
206
  protected ChildLbState createChildLbState(Object key) {
207
    return new ChildLbState(key, lazyLbFactory);
1✔
208
  }
209

210
  private Status validateAddrList(List<EquivalentAddressGroup> addrList) {
211
    if (addrList.isEmpty()) {
1✔
212
      Status unavailableStatus = Status.UNAVAILABLE.withDescription("Ring hash lb error: EDS "
×
213
              + "resolution was successful, but returned server addresses are empty.");
214
      handleNameResolutionError(unavailableStatus);
×
215
      return unavailableStatus;
×
216
    }
217

218
    String dupAddrString = validateNoDuplicateAddresses(addrList);
1✔
219
    if (dupAddrString != null) {
1✔
220
      Status unavailableStatus = Status.UNAVAILABLE.withDescription("Ring hash lb error: EDS "
1✔
221
              + "resolution was successful, but there were duplicate addresses: " + dupAddrString);
222
      handleNameResolutionError(unavailableStatus);
1✔
223
      return unavailableStatus;
1✔
224
    }
225

226
    long totalWeight = 0;
1✔
227
    for (EquivalentAddressGroup eag : addrList) {
1✔
228
      Long weight = eag.getAttributes().get(XdsAttributes.ATTR_SERVER_WEIGHT);
1✔
229

230
      if (weight == null) {
1✔
231
        weight = 1L;
×
232
      }
233

234
      if (weight < 0) {
1✔
235
        Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
236
            String.format("Ring hash lb error: EDS resolution was successful, but returned a "
1✔
237
                        + "negative weight for %s.", stripAttrs(eag)));
1✔
238
        handleNameResolutionError(unavailableStatus);
1✔
239
        return unavailableStatus;
1✔
240
      }
241
      if (weight > UnsignedInteger.MAX_VALUE.longValue()) {
1✔
242
        Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
243
            String.format("Ring hash lb error: EDS resolution was successful, but returned a weight"
1✔
244
                + " too large to fit in an unsigned int for %s.", stripAttrs(eag)));
1✔
245
        handleNameResolutionError(unavailableStatus);
1✔
246
        return unavailableStatus;
1✔
247
      }
248
      totalWeight += weight;
1✔
249
    }
1✔
250

251
    if (totalWeight > UnsignedInteger.MAX_VALUE.longValue()) {
1✔
252
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
253
          String.format(
1✔
254
              "Ring hash lb error: EDS resolution was successful, but returned a sum of weights too"
255
                  + " large to fit in an unsigned int (%d).", totalWeight));
1✔
256
      handleNameResolutionError(unavailableStatus);
1✔
257
      return unavailableStatus;
1✔
258
    }
259

260
    return Status.OK;
1✔
261
  }
262

263
  @Nullable
264
  private String validateNoDuplicateAddresses(List<EquivalentAddressGroup> addrList) {
265
    Set<SocketAddress> addresses = new HashSet<>();
1✔
266
    Multiset<String> dups = HashMultiset.create();
1✔
267
    for (EquivalentAddressGroup eag : addrList) {
1✔
268
      for (SocketAddress address : eag.getAddresses()) {
1✔
269
        if (!addresses.add(address)) {
1✔
270
          dups.add(address.toString());
1✔
271
        }
272
      }
1✔
273
    }
1✔
274

275
    if (!dups.isEmpty()) {
1✔
276
      return dups.entrySet().stream()
1✔
277
          .map((dup) ->
1✔
278
              String.format("Address: %s, count: %d", dup.getElement(), dup.getCount() + 1))
1✔
279
          .collect(Collectors.joining("; "));
1✔
280
    }
281

282
    return null;
1✔
283
  }
284

285
  private static List<RingEntry> buildRing(
286
      Map<EquivalentAddressGroup, Long> serverWeights, long totalWeight, double scale) {
287
    List<RingEntry> ring = new ArrayList<>();
1✔
288
    double currentHashes = 0.0;
1✔
289
    double targetHashes = 0.0;
1✔
290
    for (Map.Entry<EquivalentAddressGroup, Long> entry : serverWeights.entrySet()) {
1✔
291
      Endpoint endpoint = new Endpoint(entry.getKey());
1✔
292
      double normalizedWeight = (double) entry.getValue() / totalWeight;
1✔
293
      // Per GRFC A61 use the first address for the hash
294
      StringBuilder sb = new StringBuilder(entry.getKey().getAddresses().get(0).toString());
1✔
295
      sb.append('_');
1✔
296
      int lengthWithoutCounter = sb.length();
1✔
297
      targetHashes += scale * normalizedWeight;
1✔
298
      long i = 0L;
1✔
299
      while (currentHashes < targetHashes) {
1✔
300
        sb.append(i);
1✔
301
        long hash = hashFunc.hashAsciiString(sb.toString());
1✔
302
        ring.add(new RingEntry(hash, endpoint));
1✔
303
        i++;
1✔
304
        currentHashes++;
1✔
305
        sb.setLength(lengthWithoutCounter);
1✔
306
      }
1✔
307
    }
1✔
308
    Collections.sort(ring);
1✔
309
    return Collections.unmodifiableList(ring);
1✔
310
  }
311

312
  @SuppressWarnings("ReferenceEquality")
313
  public static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) {
314
    if (eag.getAttributes() == Attributes.EMPTY) {
1✔
315
      return eag;
×
316
    }
317
    return new EquivalentAddressGroup(eag.getAddresses());
1✔
318
  }
319

320
  private static final class RingHashPicker extends SubchannelPicker {
321
    private final SynchronizationContext syncContext;
322
    private final List<RingEntry> ring;
323
    // Avoid synchronization between pickSubchannel and subchannel's connectivity state change,
324
    // freeze picker's view of subchannel's connectivity state.
325
    // TODO(chengyuanzhang): can be more performance-friendly with
326
    //  IdentityHashMap<Subchannel, ConnectivityStateInfo> and RingEntry contains Subchannel.
327
    private final Map<Endpoint, SubchannelView> pickableSubchannels;  // read-only
328

329
    private RingHashPicker(
330
        SynchronizationContext syncContext, List<RingEntry> ring,
331
        Collection<ChildLbState> children) {
1✔
332
      this.syncContext = syncContext;
1✔
333
      this.ring = ring;
1✔
334
      pickableSubchannels = new HashMap<>(children.size());
1✔
335
      for (ChildLbState childLbState : children) {
1✔
336
        pickableSubchannels.put((Endpoint)childLbState.getKey(),
1✔
337
            new SubchannelView(childLbState, childLbState.getCurrentState()));
1✔
338
      }
1✔
339
    }
1✔
340

341
    // Find the ring entry with hash next to (clockwise) the RPC's hash (binary search).
342
    private int getTargetIndex(Long requestHash) {
343
      if (ring.size() <= 1) {
1✔
344
        return 0;
×
345
      }
346

347
      int low = 0;
1✔
348
      int high = ring.size() - 1;
1✔
349
      int mid = (low + high) / 2;
1✔
350
      do {
351
        long midVal = ring.get(mid).hash;
1✔
352
        long midValL = mid == 0 ? 0 : ring.get(mid - 1).hash;
1✔
353
        if (requestHash <= midVal && requestHash > midValL) {
1✔
354
          break;
1✔
355
        }
356
        if (midVal < requestHash) {
1✔
357
          low = mid + 1;
1✔
358
        } else {
359
          high =  mid - 1;
1✔
360
        }
361
        mid = (low + high) / 2;
1✔
362
      } while (mid < ring.size() && low <= high);
1✔
363
      return mid;
1✔
364
    }
365

366
    @Override
367
    public PickResult pickSubchannel(PickSubchannelArgs args) {
368
      Long requestHash = args.getCallOptions().getOption(XdsNameResolver.RPC_HASH_KEY);
1✔
369
      if (requestHash == null) {
1✔
370
        return PickResult.withError(RPC_HASH_NOT_FOUND);
×
371
      }
372

373
      int targetIndex = getTargetIndex(requestHash);
1✔
374

375
      // Per gRFC A61, because of sticky-TF with PickFirst's auto reconnect on TF, we ignore
376
      // all TF subchannels and find the first ring entry in READY, CONNECTING or IDLE.  If
377
      // CONNECTING or IDLE we return a pick with no results.  Additionally, if that entry is in
378
      // IDLE, we initiate a connection.
379
      for (int i = 0; i < ring.size(); i++) {
1✔
380
        int index = (targetIndex + i) % ring.size();
1✔
381
        SubchannelView subchannelView = pickableSubchannels.get(ring.get(index).addrKey);
1✔
382
        ChildLbState childLbState = subchannelView.childLbState;
1✔
383

384
        if (subchannelView.connectivityState  == READY) {
1✔
385
          return childLbState.getCurrentPicker().pickSubchannel(args);
1✔
386
        }
387

388
        // RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs
389
        // are failed unless there is a READY connection.
390
        if (subchannelView.connectivityState == CONNECTING) {
1✔
391
          return PickResult.withNoResult();
1✔
392
        }
393

394
        if (subchannelView.connectivityState == IDLE) {
1✔
395
          syncContext.execute(() -> {
1✔
396
            childLbState.getLb().requestConnection();
1✔
397
          });
1✔
398

399
          return PickResult.withNoResult(); // Indicates that this should be retried after backoff
1✔
400
        }
401
      }
402

403
      // return the pick from the original subchannel hit by hash, which is probably an error
404
      ChildLbState originalSubchannel =
1✔
405
          pickableSubchannels.get(ring.get(targetIndex).addrKey).childLbState;
1✔
406
      return originalSubchannel.getCurrentPicker().pickSubchannel(args);
1✔
407
    }
408

409
  }
410

411
  /**
412
   * An unmodifiable view of a subchannel with state not subject to its real connectivity
413
   * state changes.
414
   */
415
  private static final class SubchannelView {
416
    private final ChildLbState childLbState;
417
    private final ConnectivityState connectivityState;
418

419
    private SubchannelView(ChildLbState childLbState, ConnectivityState state) {
1✔
420
      this.childLbState = childLbState;
1✔
421
      this.connectivityState = state;
1✔
422
    }
1✔
423
  }
424

425
  private static final class RingEntry implements Comparable<RingEntry> {
426
    private final long hash;
427
    private final Endpoint addrKey;
428

429
    private RingEntry(long hash, Endpoint addrKey) {
1✔
430
      this.hash = hash;
1✔
431
      this.addrKey = addrKey;
1✔
432
    }
1✔
433

434
    @Override
435
    public int compareTo(RingEntry entry) {
436
      return Long.compare(hash, entry.hash);
1✔
437
    }
438
  }
439

440
  /**
441
   * Configures the ring property. The larger the ring is (that is, the more hashes there are
442
   * for each provided host) the better the request distribution will reflect the desired weights.
443
   */
444
  static final class RingHashConfig {
445
    final long minRingSize;
446
    final long maxRingSize;
447

448
    RingHashConfig(long minRingSize, long maxRingSize) {
1✔
449
      checkArgument(minRingSize > 0, "minRingSize <= 0");
1✔
450
      checkArgument(maxRingSize > 0, "maxRingSize <= 0");
1✔
451
      checkArgument(minRingSize <= maxRingSize, "minRingSize > maxRingSize");
1✔
452
      this.minRingSize = minRingSize;
1✔
453
      this.maxRingSize = maxRingSize;
1✔
454
    }
1✔
455

456
    @Override
457
    public String toString() {
458
      return MoreObjects.toStringHelper(this)
×
459
          .add("minRingSize", minRingSize)
×
460
          .add("maxRingSize", maxRingSize)
×
461
          .toString();
×
462
    }
463
  }
464
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc