• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19420

12 Aug 2024 11:40PM UTC coverage: 84.484% (+0.008%) from 84.476%
#19420

push

github

ejona86
xds: Delegate more RingHashLB address updates to MultiChildLB

Since 04474970 RingHashLB has not used
acceptResolvedAddressesInternal(). At the time that was needed because
deactivated children were part of MultiChildLB. But in 9de8e443, the
logic of RingHashLB and MultiChildLB.acceptResolvedAddressesInternal()
converged, so it can now swap back to using the base class for more
logic.

33389 of 39521 relevant lines covered (84.48%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.71
/../xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
1
/*
2
 * Copyright 2021 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ConnectivityState.CONNECTING;
23
import static io.grpc.ConnectivityState.IDLE;
24
import static io.grpc.ConnectivityState.READY;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27

28
import com.google.common.base.MoreObjects;
29
import com.google.common.collect.HashMultiset;
30
import com.google.common.collect.ImmutableMap;
31
import com.google.common.collect.Multiset;
32
import com.google.common.primitives.UnsignedInteger;
33
import io.grpc.Attributes;
34
import io.grpc.ConnectivityState;
35
import io.grpc.EquivalentAddressGroup;
36
import io.grpc.InternalLogId;
37
import io.grpc.LoadBalancer;
38
import io.grpc.Status;
39
import io.grpc.SynchronizationContext;
40
import io.grpc.util.MultiChildLoadBalancer;
41
import io.grpc.xds.client.XdsLogger;
42
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
43
import java.net.SocketAddress;
44
import java.util.ArrayList;
45
import java.util.Collections;
46
import java.util.HashMap;
47
import java.util.HashSet;
48
import java.util.List;
49
import java.util.Map;
50
import java.util.Set;
51
import java.util.stream.Collectors;
52
import javax.annotation.Nullable;
53

54
/**
55
 * A {@link LoadBalancer} that provides consistent hashing based load balancing to upstream hosts.
56
 * It implements the "Ketama" hashing that maps hosts onto a circle (the "ring") by hashing its
57
 * addresses. Each request is routed to a host by hashing some property of the request and finding
58
 * the nearest corresponding host clockwise around the ring. Each host is placed on the ring some
59
 * number of times proportional to its weight. With the ring partitioned appropriately, the
60
 * addition or removal of one host from a set of N hosts will affect only 1/N requests.
61
 */
62
final class RingHashLoadBalancer extends MultiChildLoadBalancer {
63
  private static final Status RPC_HASH_NOT_FOUND =
1✔
64
      Status.INTERNAL.withDescription("RPC hash not found. Probably a bug because xds resolver"
1✔
65
          + " config selector always generates a hash.");
66
  private static final XxHash64 hashFunc = XxHash64.INSTANCE;
1✔
67

68
  private final LoadBalancer.Factory lazyLbFactory =
1✔
69
      new LazyLoadBalancer.Factory(pickFirstLbProvider);
70
  private final XdsLogger logger;
71
  private final SynchronizationContext syncContext;
72
  private List<RingEntry> ring;
73

74
  RingHashLoadBalancer(Helper helper) {
75
    super(helper);
1✔
76
    syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
1✔
77
    logger = XdsLogger.withLogId(InternalLogId.allocate("ring_hash_lb", helper.getAuthority()));
1✔
78
    logger.log(XdsLogLevel.INFO, "Created");
1✔
79
  }
1✔
80

81
  @Override
82
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
83
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
84
    List<EquivalentAddressGroup> addrList = resolvedAddresses.getAddresses();
1✔
85
    Status addressValidityStatus = validateAddrList(addrList);
1✔
86
    if (!addressValidityStatus.isOk()) {
1✔
87
      return addressValidityStatus;
1✔
88
    }
89

90
    try {
91
      resolvingAddresses = true;
1✔
92
      AcceptResolvedAddrRetVal acceptRetVal = acceptResolvedAddressesInternal(resolvedAddresses);
1✔
93
      if (!acceptRetVal.status.isOk()) {
1✔
94
        return acceptRetVal.status;
×
95
      }
96

97
      // Now do the ringhash specific logic with weights and building the ring
98
      RingHashConfig config = (RingHashConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
99
      if (config == null) {
1✔
100
        throw new IllegalArgumentException("Missing RingHash configuration");
×
101
      }
102
      Map<EquivalentAddressGroup, Long> serverWeights = new HashMap<>();
1✔
103
      long totalWeight = 0L;
1✔
104
      for (EquivalentAddressGroup eag : addrList) {
1✔
105
        Long weight = eag.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT);
1✔
106
        // Support two ways of server weighing: either multiple instances of the same address
107
        // or each address contains a per-address weight attribute. If a weight is not provided,
108
        // each occurrence of the address will be counted a weight value of one.
109
        if (weight == null) {
1✔
110
          weight = 1L;
×
111
        }
112
        totalWeight += weight;
1✔
113
        EquivalentAddressGroup addrKey = stripAttrs(eag);
1✔
114
        if (serverWeights.containsKey(addrKey)) {
1✔
115
          serverWeights.put(addrKey, serverWeights.get(addrKey) + weight);
×
116
        } else {
117
          serverWeights.put(addrKey, weight);
1✔
118
        }
119
      }
1✔
120
      // Calculate scale
121
      long minWeight = Collections.min(serverWeights.values());
1✔
122
      double normalizedMinWeight = (double) minWeight / totalWeight;
1✔
123
      // Scale up the number of hashes per host such that the least-weighted host gets a whole
124
      // number of hashes on the the ring. Other hosts might not end up with whole numbers, and
125
      // that's fine (the ring-building algorithm can handle this). This preserves the original
126
      // implementation's behavior: when weights aren't provided, all hosts should get an equal
127
      // number of hashes. In the case where this number exceeds the max_ring_size, it's scaled
128
      // back down to fit.
129
      double scale = Math.min(
1✔
130
          Math.ceil(normalizedMinWeight * config.minRingSize) / normalizedMinWeight,
1✔
131
          (double) config.maxRingSize);
132

133
      // Build the ring
134
      ring = buildRing(serverWeights, totalWeight, scale);
1✔
135

136
      // Must update channel picker before return so that new RPCs will not be routed to deleted
137
      // clusters and resolver can remove them in service config.
138
      updateOverallBalancingState();
1✔
139

140
      shutdownRemoved(acceptRetVal.removedChildren);
1✔
141
    } finally {
142
      this.resolvingAddresses = false;
1✔
143
    }
144

145
    return Status.OK;
1✔
146
  }
147

148

149
  /**
150
   * Updates the overall balancing state by aggregating the connectivity states of all subchannels.
151
   *
152
   * <p>Aggregation rules (in order of dominance):
153
   * <ol>
154
   *   <li>If there is at least one subchannel in READY state, overall state is READY</li>
155
   *   <li>If there are <em>2 or more</em> subchannels in TRANSIENT_FAILURE, overall state is
156
   *   TRANSIENT_FAILURE (to allow timely failover to another policy)</li>
157
   *   <li>If there is at least one subchannel in CONNECTING state, overall state is
158
   *   CONNECTING</li>
159
   *   <li> If there is one subchannel in TRANSIENT_FAILURE state and there is
160
   *    more than one subchannel, report CONNECTING </li>
161
   *   <li>If there is at least one subchannel in IDLE state, overall state is IDLE</li>
162
   *   <li>Otherwise, overall state is TRANSIENT_FAILURE</li>
163
   * </ol>
164
   */
165
  @Override
166
  protected void updateOverallBalancingState() {
167
    checkState(!getChildLbStates().isEmpty(), "no subchannel has been created");
1✔
168
    if (this.currentConnectivityState == SHUTDOWN) {
1✔
169
      // Ignore changes that happen after shutdown is called
170
      logger.log(XdsLogLevel.DEBUG, "UpdateOverallBalancingState called after shutdown");
×
171
      return;
×
172
    }
173

174
    // Calculate the current overall state to report
175
    int numIdle = 0;
1✔
176
    int numReady = 0;
1✔
177
    int numConnecting = 0;
1✔
178
    int numTF = 0;
1✔
179

180
    forloop:
181
    for (ChildLbState childLbState : getChildLbStates()) {
1✔
182
      ConnectivityState state = childLbState.getCurrentState();
1✔
183
      switch (state) {
1✔
184
        case READY:
185
          numReady++;
1✔
186
          break forloop;
1✔
187
        case CONNECTING:
188
          numConnecting++;
1✔
189
          break;
1✔
190
        case IDLE:
191
          numIdle++;
1✔
192
          break;
1✔
193
        case TRANSIENT_FAILURE:
194
          numTF++;
1✔
195
          break;
1✔
196
        default:
197
          // ignore it
198
      }
199
    }
1✔
200

201
    ConnectivityState overallState;
202
    if (numReady > 0) {
1✔
203
      overallState = READY;
1✔
204
    } else if (numTF >= 2) {
1✔
205
      overallState = TRANSIENT_FAILURE;
1✔
206
    } else if (numConnecting > 0) {
1✔
207
      overallState = CONNECTING;
1✔
208
    } else if (numTF == 1 && getChildLbStates().size() > 1) {
1✔
209
      overallState = CONNECTING;
1✔
210
    } else if (numIdle > 0) {
1✔
211
      overallState = IDLE;
1✔
212
    } else {
213
      overallState = TRANSIENT_FAILURE;
×
214
    }
215

216
    RingHashPicker picker = new RingHashPicker(syncContext, ring, getImmutableChildMap());
1✔
217
    getHelper().updateBalancingState(overallState, picker);
1✔
218
    this.currentConnectivityState = overallState;
1✔
219
  }
1✔
220

221
  @Override
222
  protected ChildLbState createChildLbState(Object key, Object policyConfig,
223
      ResolvedAddresses resolvedAddresses) {
224
    return new ChildLbState(key, lazyLbFactory, null);
1✔
225
  }
226

227
  private Status validateAddrList(List<EquivalentAddressGroup> addrList) {
228
    if (addrList.isEmpty()) {
1✔
229
      Status unavailableStatus = Status.UNAVAILABLE.withDescription("Ring hash lb error: EDS "
×
230
              + "resolution was successful, but returned server addresses are empty.");
231
      handleNameResolutionError(unavailableStatus);
×
232
      return unavailableStatus;
×
233
    }
234

235
    String dupAddrString = validateNoDuplicateAddresses(addrList);
1✔
236
    if (dupAddrString != null) {
1✔
237
      Status unavailableStatus = Status.UNAVAILABLE.withDescription("Ring hash lb error: EDS "
1✔
238
              + "resolution was successful, but there were duplicate addresses: " + dupAddrString);
239
      handleNameResolutionError(unavailableStatus);
1✔
240
      return unavailableStatus;
1✔
241
    }
242

243
    long totalWeight = 0;
1✔
244
    for (EquivalentAddressGroup eag : addrList) {
1✔
245
      Long weight = eag.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT);
1✔
246

247
      if (weight == null) {
1✔
248
        weight = 1L;
×
249
      }
250

251
      if (weight < 0) {
1✔
252
        Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
253
            String.format("Ring hash lb error: EDS resolution was successful, but returned a "
1✔
254
                        + "negative weight for %s.", stripAttrs(eag)));
1✔
255
        handleNameResolutionError(unavailableStatus);
1✔
256
        return unavailableStatus;
1✔
257
      }
258
      if (weight > UnsignedInteger.MAX_VALUE.longValue()) {
1✔
259
        Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
260
            String.format("Ring hash lb error: EDS resolution was successful, but returned a weight"
1✔
261
                + " too large to fit in an unsigned int for %s.", stripAttrs(eag)));
1✔
262
        handleNameResolutionError(unavailableStatus);
1✔
263
        return unavailableStatus;
1✔
264
      }
265
      totalWeight += weight;
1✔
266
    }
1✔
267

268
    if (totalWeight > UnsignedInteger.MAX_VALUE.longValue()) {
1✔
269
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
270
          String.format(
1✔
271
              "Ring hash lb error: EDS resolution was successful, but returned a sum of weights too"
272
                  + " large to fit in an unsigned int (%d).", totalWeight));
1✔
273
      handleNameResolutionError(unavailableStatus);
1✔
274
      return unavailableStatus;
1✔
275
    }
276

277
    return Status.OK;
1✔
278
  }
279

280
  @Nullable
281
  private String validateNoDuplicateAddresses(List<EquivalentAddressGroup> addrList) {
282
    Set<SocketAddress> addresses = new HashSet<>();
1✔
283
    Multiset<String> dups = HashMultiset.create();
1✔
284
    for (EquivalentAddressGroup eag : addrList) {
1✔
285
      for (SocketAddress address : eag.getAddresses()) {
1✔
286
        if (!addresses.add(address)) {
1✔
287
          dups.add(address.toString());
1✔
288
        }
289
      }
1✔
290
    }
1✔
291

292
    if (!dups.isEmpty()) {
1✔
293
      return dups.entrySet().stream()
1✔
294
          .map((dup) ->
1✔
295
              String.format("Address: %s, count: %d", dup.getElement(), dup.getCount() + 1))
1✔
296
          .collect(Collectors.joining("; "));
1✔
297
    }
298

299
    return null;
1✔
300
  }
301

302
  private static List<RingEntry> buildRing(
303
      Map<EquivalentAddressGroup, Long> serverWeights, long totalWeight, double scale) {
304
    List<RingEntry> ring = new ArrayList<>();
1✔
305
    double currentHashes = 0.0;
1✔
306
    double targetHashes = 0.0;
1✔
307
    for (Map.Entry<EquivalentAddressGroup, Long> entry : serverWeights.entrySet()) {
1✔
308
      Endpoint endpoint = new Endpoint(entry.getKey());
1✔
309
      double normalizedWeight = (double) entry.getValue() / totalWeight;
1✔
310
      // Per GRFC A61 use the first address for the hash
311
      StringBuilder sb = new StringBuilder(entry.getKey().getAddresses().get(0).toString());
1✔
312
      sb.append('_');
1✔
313
      int lengthWithoutCounter = sb.length();
1✔
314
      targetHashes += scale * normalizedWeight;
1✔
315
      long i = 0L;
1✔
316
      while (currentHashes < targetHashes) {
1✔
317
        sb.append(i);
1✔
318
        long hash = hashFunc.hashAsciiString(sb.toString());
1✔
319
        ring.add(new RingEntry(hash, endpoint));
1✔
320
        i++;
1✔
321
        currentHashes++;
1✔
322
        sb.setLength(lengthWithoutCounter);
1✔
323
      }
1✔
324
    }
1✔
325
    Collections.sort(ring);
1✔
326
    return Collections.unmodifiableList(ring);
1✔
327
  }
328

329
  @SuppressWarnings("ReferenceEquality")
330
  public static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) {
331
    if (eag.getAttributes() == Attributes.EMPTY) {
1✔
332
      return eag;
×
333
    }
334
    return new EquivalentAddressGroup(eag.getAddresses());
1✔
335
  }
336

337
  private static final class RingHashPicker extends SubchannelPicker {
338
    private final SynchronizationContext syncContext;
339
    private final List<RingEntry> ring;
340
    // Avoid synchronization between pickSubchannel and subchannel's connectivity state change,
341
    // freeze picker's view of subchannel's connectivity state.
342
    // TODO(chengyuanzhang): can be more performance-friendly with
343
    //  IdentityHashMap<Subchannel, ConnectivityStateInfo> and RingEntry contains Subchannel.
344
    private final Map<Endpoint, SubchannelView> pickableSubchannels;  // read-only
345

346
    private RingHashPicker(
347
        SynchronizationContext syncContext, List<RingEntry> ring,
348
        ImmutableMap<Object, ChildLbState> subchannels) {
1✔
349
      this.syncContext = syncContext;
1✔
350
      this.ring = ring;
1✔
351
      pickableSubchannels = new HashMap<>(subchannels.size());
1✔
352
      for (Map.Entry<Object, ChildLbState> entry : subchannels.entrySet()) {
1✔
353
        ChildLbState childLbState = entry.getValue();
1✔
354
        pickableSubchannels.put((Endpoint)entry.getKey(),
1✔
355
            new SubchannelView(childLbState, childLbState.getCurrentState()));
1✔
356
      }
1✔
357
    }
1✔
358

359
    // Find the ring entry with hash next to (clockwise) the RPC's hash (binary search).
360
    private int getTargetIndex(Long requestHash) {
361
      if (ring.size() <= 1) {
1✔
362
        return 0;
×
363
      }
364

365
      int low = 0;
1✔
366
      int high = ring.size() - 1;
1✔
367
      int mid = (low + high) / 2;
1✔
368
      do {
369
        long midVal = ring.get(mid).hash;
1✔
370
        long midValL = mid == 0 ? 0 : ring.get(mid - 1).hash;
1✔
371
        if (requestHash <= midVal && requestHash > midValL) {
1✔
372
          break;
1✔
373
        }
374
        if (midVal < requestHash) {
1✔
375
          low = mid + 1;
1✔
376
        } else {
377
          high =  mid - 1;
1✔
378
        }
379
        mid = (low + high) / 2;
1✔
380
      } while (mid < ring.size() && low <= high);
1✔
381
      return mid;
1✔
382
    }
383

384
    @Override
385
    public PickResult pickSubchannel(PickSubchannelArgs args) {
386
      Long requestHash = args.getCallOptions().getOption(XdsNameResolver.RPC_HASH_KEY);
1✔
387
      if (requestHash == null) {
1✔
388
        return PickResult.withError(RPC_HASH_NOT_FOUND);
×
389
      }
390

391
      int targetIndex = getTargetIndex(requestHash);
1✔
392

393
      // Per gRFC A61, because of sticky-TF with PickFirst's auto reconnect on TF, we ignore
394
      // all TF subchannels and find the first ring entry in READY, CONNECTING or IDLE.  If
395
      // CONNECTING or IDLE we return a pick with no results.  Additionally, if that entry is in
396
      // IDLE, we initiate a connection.
397
      for (int i = 0; i < ring.size(); i++) {
1✔
398
        int index = (targetIndex + i) % ring.size();
1✔
399
        SubchannelView subchannelView = pickableSubchannels.get(ring.get(index).addrKey);
1✔
400
        ChildLbState childLbState = subchannelView.childLbState;
1✔
401

402
        if (subchannelView.connectivityState  == READY) {
1✔
403
          return childLbState.getCurrentPicker().pickSubchannel(args);
1✔
404
        }
405

406
        // RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs
407
        // are failed unless there is a READY connection.
408
        if (subchannelView.connectivityState == CONNECTING) {
1✔
409
          return PickResult.withNoResult();
1✔
410
        }
411

412
        if (subchannelView.connectivityState == IDLE) {
1✔
413
          syncContext.execute(() -> {
1✔
414
            childLbState.getLb().requestConnection();
1✔
415
          });
1✔
416

417
          return PickResult.withNoResult(); // Indicates that this should be retried after backoff
1✔
418
        }
419
      }
420

421
      // return the pick from the original subchannel hit by hash, which is probably an error
422
      ChildLbState originalSubchannel =
1✔
423
          pickableSubchannels.get(ring.get(targetIndex).addrKey).childLbState;
1✔
424
      return originalSubchannel.getCurrentPicker().pickSubchannel(args);
1✔
425
    }
426

427
  }
428

429
  /**
430
   * An unmodifiable view of a subchannel with state not subject to its real connectivity
431
   * state changes.
432
   */
433
  private static final class SubchannelView {
434
    private final ChildLbState childLbState;
435
    private final ConnectivityState connectivityState;
436

437
    private SubchannelView(ChildLbState childLbState, ConnectivityState state) {
1✔
438
      this.childLbState = childLbState;
1✔
439
      this.connectivityState = state;
1✔
440
    }
1✔
441
  }
442

443
  private static final class RingEntry implements Comparable<RingEntry> {
444
    private final long hash;
445
    private final Endpoint addrKey;
446

447
    private RingEntry(long hash, Endpoint addrKey) {
1✔
448
      this.hash = hash;
1✔
449
      this.addrKey = addrKey;
1✔
450
    }
1✔
451

452
    @Override
453
    public int compareTo(RingEntry entry) {
454
      return Long.compare(hash, entry.hash);
1✔
455
    }
456
  }
457

458
  /**
459
   * Configures the ring property. The larger the ring is (that is, the more hashes there are
460
   * for each provided host) the better the request distribution will reflect the desired weights.
461
   */
462
  static final class RingHashConfig {
463
    final long minRingSize;
464
    final long maxRingSize;
465

466
    RingHashConfig(long minRingSize, long maxRingSize) {
1✔
467
      checkArgument(minRingSize > 0, "minRingSize <= 0");
1✔
468
      checkArgument(maxRingSize > 0, "maxRingSize <= 0");
1✔
469
      checkArgument(minRingSize <= maxRingSize, "minRingSize > maxRingSize");
1✔
470
      this.minRingSize = minRingSize;
1✔
471
      this.maxRingSize = maxRingSize;
1✔
472
    }
1✔
473

474
    @Override
475
    public String toString() {
476
      return MoreObjects.toStringHelper(this)
×
477
          .add("minRingSize", minRingSize)
×
478
          .add("maxRingSize", maxRingSize)
×
479
          .toString();
×
480
    }
481
  }
482
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc