• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19429

17 Aug 2024 03:55PM UTC coverage: 84.501% (+0.02%) from 84.482%
#19429

push

github

ejona86
util: Remove MultiChildLB.getImmutableChildMap()

No usages actually needed a map nor a copy.

33411 of 39539 relevant lines covered (84.5%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.67
/../xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
1
/*
2
 * Copyright 2021 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ConnectivityState.CONNECTING;
23
import static io.grpc.ConnectivityState.IDLE;
24
import static io.grpc.ConnectivityState.READY;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27

28
import com.google.common.base.MoreObjects;
29
import com.google.common.collect.HashMultiset;
30
import com.google.common.collect.Multiset;
31
import com.google.common.primitives.UnsignedInteger;
32
import io.grpc.Attributes;
33
import io.grpc.ConnectivityState;
34
import io.grpc.EquivalentAddressGroup;
35
import io.grpc.InternalLogId;
36
import io.grpc.LoadBalancer;
37
import io.grpc.Status;
38
import io.grpc.SynchronizationContext;
39
import io.grpc.util.MultiChildLoadBalancer;
40
import io.grpc.xds.client.XdsLogger;
41
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
42
import java.net.SocketAddress;
43
import java.util.ArrayList;
44
import java.util.Collection;
45
import java.util.Collections;
46
import java.util.HashMap;
47
import java.util.HashSet;
48
import java.util.List;
49
import java.util.Map;
50
import java.util.Set;
51
import java.util.stream.Collectors;
52
import javax.annotation.Nullable;
53

54
/**
55
 * A {@link LoadBalancer} that provides consistent hashing based load balancing to upstream hosts.
56
 * It implements the "Ketama" hashing that maps hosts onto a circle (the "ring") by hashing its
57
 * addresses. Each request is routed to a host by hashing some property of the request and finding
58
 * the nearest corresponding host clockwise around the ring. Each host is placed on the ring some
59
 * number of times proportional to its weight. With the ring partitioned appropriately, the
60
 * addition or removal of one host from a set of N hosts will affect only 1/N requests.
61
 */
62
final class RingHashLoadBalancer extends MultiChildLoadBalancer {
63
  private static final Status RPC_HASH_NOT_FOUND =
1✔
64
      Status.INTERNAL.withDescription("RPC hash not found. Probably a bug because xds resolver"
1✔
65
          + " config selector always generates a hash.");
66
  private static final XxHash64 hashFunc = XxHash64.INSTANCE;
1✔
67

68
  private final LoadBalancer.Factory lazyLbFactory =
1✔
69
      new LazyLoadBalancer.Factory(pickFirstLbProvider);
70
  private final XdsLogger logger;
71
  private final SynchronizationContext syncContext;
72
  private List<RingEntry> ring;
73

74
  RingHashLoadBalancer(Helper helper) {
75
    super(helper);
1✔
76
    syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
1✔
77
    logger = XdsLogger.withLogId(InternalLogId.allocate("ring_hash_lb", helper.getAuthority()));
1✔
78
    logger.log(XdsLogLevel.INFO, "Created");
1✔
79
  }
1✔
80

81
  @Override
82
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
83
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
84
    List<EquivalentAddressGroup> addrList = resolvedAddresses.getAddresses();
1✔
85
    Status addressValidityStatus = validateAddrList(addrList);
1✔
86
    if (!addressValidityStatus.isOk()) {
1✔
87
      return addressValidityStatus;
1✔
88
    }
89

90
    try {
91
      resolvingAddresses = true;
1✔
92
      AcceptResolvedAddrRetVal acceptRetVal = acceptResolvedAddressesInternal(resolvedAddresses);
1✔
93
      if (!acceptRetVal.status.isOk()) {
1✔
94
        return acceptRetVal.status;
×
95
      }
96

97
      // Now do the ringhash specific logic with weights and building the ring
98
      RingHashConfig config = (RingHashConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
99
      if (config == null) {
1✔
100
        throw new IllegalArgumentException("Missing RingHash configuration");
×
101
      }
102
      Map<EquivalentAddressGroup, Long> serverWeights = new HashMap<>();
1✔
103
      long totalWeight = 0L;
1✔
104
      for (EquivalentAddressGroup eag : addrList) {
1✔
105
        Long weight = eag.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT);
1✔
106
        // Support two ways of server weighing: either multiple instances of the same address
107
        // or each address contains a per-address weight attribute. If a weight is not provided,
108
        // each occurrence of the address will be counted a weight value of one.
109
        if (weight == null) {
1✔
110
          weight = 1L;
×
111
        }
112
        totalWeight += weight;
1✔
113
        EquivalentAddressGroup addrKey = stripAttrs(eag);
1✔
114
        if (serverWeights.containsKey(addrKey)) {
1✔
115
          serverWeights.put(addrKey, serverWeights.get(addrKey) + weight);
×
116
        } else {
117
          serverWeights.put(addrKey, weight);
1✔
118
        }
119
      }
1✔
120
      // Calculate scale
121
      long minWeight = Collections.min(serverWeights.values());
1✔
122
      double normalizedMinWeight = (double) minWeight / totalWeight;
1✔
123
      // Scale up the number of hashes per host such that the least-weighted host gets a whole
124
      // number of hashes on the the ring. Other hosts might not end up with whole numbers, and
125
      // that's fine (the ring-building algorithm can handle this). This preserves the original
126
      // implementation's behavior: when weights aren't provided, all hosts should get an equal
127
      // number of hashes. In the case where this number exceeds the max_ring_size, it's scaled
128
      // back down to fit.
129
      double scale = Math.min(
1✔
130
          Math.ceil(normalizedMinWeight * config.minRingSize) / normalizedMinWeight,
1✔
131
          (double) config.maxRingSize);
132

133
      // Build the ring
134
      ring = buildRing(serverWeights, totalWeight, scale);
1✔
135

136
      // Must update channel picker before return so that new RPCs will not be routed to deleted
137
      // clusters and resolver can remove them in service config.
138
      updateOverallBalancingState();
1✔
139

140
      shutdownRemoved(acceptRetVal.removedChildren);
1✔
141
    } finally {
142
      this.resolvingAddresses = false;
1✔
143
    }
144

145
    return Status.OK;
1✔
146
  }
147

148

149
  /**
150
   * Updates the overall balancing state by aggregating the connectivity states of all subchannels.
151
   *
152
   * <p>Aggregation rules (in order of dominance):
153
   * <ol>
154
   *   <li>If there is at least one subchannel in READY state, overall state is READY</li>
155
   *   <li>If there are <em>2 or more</em> subchannels in TRANSIENT_FAILURE, overall state is
156
   *   TRANSIENT_FAILURE (to allow timely failover to another policy)</li>
157
   *   <li>If there is at least one subchannel in CONNECTING state, overall state is
158
   *   CONNECTING</li>
159
   *   <li> If there is one subchannel in TRANSIENT_FAILURE state and there is
160
   *    more than one subchannel, report CONNECTING </li>
161
   *   <li>If there is at least one subchannel in IDLE state, overall state is IDLE</li>
162
   *   <li>Otherwise, overall state is TRANSIENT_FAILURE</li>
163
   * </ol>
164
   */
165
  @Override
166
  protected void updateOverallBalancingState() {
167
    checkState(!getChildLbStates().isEmpty(), "no subchannel has been created");
1✔
168
    if (this.currentConnectivityState == SHUTDOWN) {
1✔
169
      // Ignore changes that happen after shutdown is called
170
      logger.log(XdsLogLevel.DEBUG, "UpdateOverallBalancingState called after shutdown");
×
171
      return;
×
172
    }
173

174
    // Calculate the current overall state to report
175
    int numIdle = 0;
1✔
176
    int numReady = 0;
1✔
177
    int numConnecting = 0;
1✔
178
    int numTF = 0;
1✔
179

180
    forloop:
181
    for (ChildLbState childLbState : getChildLbStates()) {
1✔
182
      ConnectivityState state = childLbState.getCurrentState();
1✔
183
      switch (state) {
1✔
184
        case READY:
185
          numReady++;
1✔
186
          break forloop;
1✔
187
        case CONNECTING:
188
          numConnecting++;
1✔
189
          break;
1✔
190
        case IDLE:
191
          numIdle++;
1✔
192
          break;
1✔
193
        case TRANSIENT_FAILURE:
194
          numTF++;
1✔
195
          break;
1✔
196
        default:
197
          // ignore it
198
      }
199
    }
1✔
200

201
    ConnectivityState overallState;
202
    if (numReady > 0) {
1✔
203
      overallState = READY;
1✔
204
    } else if (numTF >= 2) {
1✔
205
      overallState = TRANSIENT_FAILURE;
1✔
206
    } else if (numConnecting > 0) {
1✔
207
      overallState = CONNECTING;
1✔
208
    } else if (numTF == 1 && getChildLbStates().size() > 1) {
1✔
209
      overallState = CONNECTING;
1✔
210
    } else if (numIdle > 0) {
1✔
211
      overallState = IDLE;
1✔
212
    } else {
213
      overallState = TRANSIENT_FAILURE;
×
214
    }
215

216
    RingHashPicker picker = new RingHashPicker(syncContext, ring, getChildLbStates());
1✔
217
    getHelper().updateBalancingState(overallState, picker);
1✔
218
    this.currentConnectivityState = overallState;
1✔
219
  }
1✔
220

221
  @Override
222
  protected ChildLbState createChildLbState(Object key, Object policyConfig,
223
      ResolvedAddresses resolvedAddresses) {
224
    return new ChildLbState(key, lazyLbFactory, null);
1✔
225
  }
226

227
  private Status validateAddrList(List<EquivalentAddressGroup> addrList) {
228
    if (addrList.isEmpty()) {
1✔
229
      Status unavailableStatus = Status.UNAVAILABLE.withDescription("Ring hash lb error: EDS "
×
230
              + "resolution was successful, but returned server addresses are empty.");
231
      handleNameResolutionError(unavailableStatus);
×
232
      return unavailableStatus;
×
233
    }
234

235
    String dupAddrString = validateNoDuplicateAddresses(addrList);
1✔
236
    if (dupAddrString != null) {
1✔
237
      Status unavailableStatus = Status.UNAVAILABLE.withDescription("Ring hash lb error: EDS "
1✔
238
              + "resolution was successful, but there were duplicate addresses: " + dupAddrString);
239
      handleNameResolutionError(unavailableStatus);
1✔
240
      return unavailableStatus;
1✔
241
    }
242

243
    long totalWeight = 0;
1✔
244
    for (EquivalentAddressGroup eag : addrList) {
1✔
245
      Long weight = eag.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT);
1✔
246

247
      if (weight == null) {
1✔
248
        weight = 1L;
×
249
      }
250

251
      if (weight < 0) {
1✔
252
        Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
253
            String.format("Ring hash lb error: EDS resolution was successful, but returned a "
1✔
254
                        + "negative weight for %s.", stripAttrs(eag)));
1✔
255
        handleNameResolutionError(unavailableStatus);
1✔
256
        return unavailableStatus;
1✔
257
      }
258
      if (weight > UnsignedInteger.MAX_VALUE.longValue()) {
1✔
259
        Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
260
            String.format("Ring hash lb error: EDS resolution was successful, but returned a weight"
1✔
261
                + " too large to fit in an unsigned int for %s.", stripAttrs(eag)));
1✔
262
        handleNameResolutionError(unavailableStatus);
1✔
263
        return unavailableStatus;
1✔
264
      }
265
      totalWeight += weight;
1✔
266
    }
1✔
267

268
    if (totalWeight > UnsignedInteger.MAX_VALUE.longValue()) {
1✔
269
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
270
          String.format(
1✔
271
              "Ring hash lb error: EDS resolution was successful, but returned a sum of weights too"
272
                  + " large to fit in an unsigned int (%d).", totalWeight));
1✔
273
      handleNameResolutionError(unavailableStatus);
1✔
274
      return unavailableStatus;
1✔
275
    }
276

277
    return Status.OK;
1✔
278
  }
279

280
  @Nullable
281
  private String validateNoDuplicateAddresses(List<EquivalentAddressGroup> addrList) {
282
    Set<SocketAddress> addresses = new HashSet<>();
1✔
283
    Multiset<String> dups = HashMultiset.create();
1✔
284
    for (EquivalentAddressGroup eag : addrList) {
1✔
285
      for (SocketAddress address : eag.getAddresses()) {
1✔
286
        if (!addresses.add(address)) {
1✔
287
          dups.add(address.toString());
1✔
288
        }
289
      }
1✔
290
    }
1✔
291

292
    if (!dups.isEmpty()) {
1✔
293
      return dups.entrySet().stream()
1✔
294
          .map((dup) ->
1✔
295
              String.format("Address: %s, count: %d", dup.getElement(), dup.getCount() + 1))
1✔
296
          .collect(Collectors.joining("; "));
1✔
297
    }
298

299
    return null;
1✔
300
  }
301

302
  private static List<RingEntry> buildRing(
303
      Map<EquivalentAddressGroup, Long> serverWeights, long totalWeight, double scale) {
304
    List<RingEntry> ring = new ArrayList<>();
1✔
305
    double currentHashes = 0.0;
1✔
306
    double targetHashes = 0.0;
1✔
307
    for (Map.Entry<EquivalentAddressGroup, Long> entry : serverWeights.entrySet()) {
1✔
308
      Endpoint endpoint = new Endpoint(entry.getKey());
1✔
309
      double normalizedWeight = (double) entry.getValue() / totalWeight;
1✔
310
      // Per GRFC A61 use the first address for the hash
311
      StringBuilder sb = new StringBuilder(entry.getKey().getAddresses().get(0).toString());
1✔
312
      sb.append('_');
1✔
313
      int lengthWithoutCounter = sb.length();
1✔
314
      targetHashes += scale * normalizedWeight;
1✔
315
      long i = 0L;
1✔
316
      while (currentHashes < targetHashes) {
1✔
317
        sb.append(i);
1✔
318
        long hash = hashFunc.hashAsciiString(sb.toString());
1✔
319
        ring.add(new RingEntry(hash, endpoint));
1✔
320
        i++;
1✔
321
        currentHashes++;
1✔
322
        sb.setLength(lengthWithoutCounter);
1✔
323
      }
1✔
324
    }
1✔
325
    Collections.sort(ring);
1✔
326
    return Collections.unmodifiableList(ring);
1✔
327
  }
328

329
  @SuppressWarnings("ReferenceEquality")
330
  public static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) {
331
    if (eag.getAttributes() == Attributes.EMPTY) {
1✔
332
      return eag;
×
333
    }
334
    return new EquivalentAddressGroup(eag.getAddresses());
1✔
335
  }
336

337
  private static final class RingHashPicker extends SubchannelPicker {
338
    private final SynchronizationContext syncContext;
339
    private final List<RingEntry> ring;
340
    // Avoid synchronization between pickSubchannel and subchannel's connectivity state change,
341
    // freeze picker's view of subchannel's connectivity state.
342
    // TODO(chengyuanzhang): can be more performance-friendly with
343
    //  IdentityHashMap<Subchannel, ConnectivityStateInfo> and RingEntry contains Subchannel.
344
    private final Map<Endpoint, SubchannelView> pickableSubchannels;  // read-only
345

346
    private RingHashPicker(
347
        SynchronizationContext syncContext, List<RingEntry> ring,
348
        Collection<ChildLbState> children) {
1✔
349
      this.syncContext = syncContext;
1✔
350
      this.ring = ring;
1✔
351
      pickableSubchannels = new HashMap<>(children.size());
1✔
352
      for (ChildLbState childLbState : children) {
1✔
353
        pickableSubchannels.put((Endpoint)childLbState.getKey(),
1✔
354
            new SubchannelView(childLbState, childLbState.getCurrentState()));
1✔
355
      }
1✔
356
    }
1✔
357

358
    // Find the ring entry with hash next to (clockwise) the RPC's hash (binary search).
359
    private int getTargetIndex(Long requestHash) {
360
      if (ring.size() <= 1) {
1✔
361
        return 0;
×
362
      }
363

364
      int low = 0;
1✔
365
      int high = ring.size() - 1;
1✔
366
      int mid = (low + high) / 2;
1✔
367
      do {
368
        long midVal = ring.get(mid).hash;
1✔
369
        long midValL = mid == 0 ? 0 : ring.get(mid - 1).hash;
1✔
370
        if (requestHash <= midVal && requestHash > midValL) {
1✔
371
          break;
1✔
372
        }
373
        if (midVal < requestHash) {
1✔
374
          low = mid + 1;
1✔
375
        } else {
376
          high =  mid - 1;
1✔
377
        }
378
        mid = (low + high) / 2;
1✔
379
      } while (mid < ring.size() && low <= high);
1✔
380
      return mid;
1✔
381
    }
382

383
    @Override
384
    public PickResult pickSubchannel(PickSubchannelArgs args) {
385
      Long requestHash = args.getCallOptions().getOption(XdsNameResolver.RPC_HASH_KEY);
1✔
386
      if (requestHash == null) {
1✔
387
        return PickResult.withError(RPC_HASH_NOT_FOUND);
×
388
      }
389

390
      int targetIndex = getTargetIndex(requestHash);
1✔
391

392
      // Per gRFC A61, because of sticky-TF with PickFirst's auto reconnect on TF, we ignore
393
      // all TF subchannels and find the first ring entry in READY, CONNECTING or IDLE.  If
394
      // CONNECTING or IDLE we return a pick with no results.  Additionally, if that entry is in
395
      // IDLE, we initiate a connection.
396
      for (int i = 0; i < ring.size(); i++) {
1✔
397
        int index = (targetIndex + i) % ring.size();
1✔
398
        SubchannelView subchannelView = pickableSubchannels.get(ring.get(index).addrKey);
1✔
399
        ChildLbState childLbState = subchannelView.childLbState;
1✔
400

401
        if (subchannelView.connectivityState  == READY) {
1✔
402
          return childLbState.getCurrentPicker().pickSubchannel(args);
1✔
403
        }
404

405
        // RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs
406
        // are failed unless there is a READY connection.
407
        if (subchannelView.connectivityState == CONNECTING) {
1✔
408
          return PickResult.withNoResult();
1✔
409
        }
410

411
        if (subchannelView.connectivityState == IDLE) {
1✔
412
          syncContext.execute(() -> {
1✔
413
            childLbState.getLb().requestConnection();
1✔
414
          });
1✔
415

416
          return PickResult.withNoResult(); // Indicates that this should be retried after backoff
1✔
417
        }
418
      }
419

420
      // return the pick from the original subchannel hit by hash, which is probably an error
421
      ChildLbState originalSubchannel =
1✔
422
          pickableSubchannels.get(ring.get(targetIndex).addrKey).childLbState;
1✔
423
      return originalSubchannel.getCurrentPicker().pickSubchannel(args);
1✔
424
    }
425

426
  }
427

428
  /**
429
   * An unmodifiable view of a subchannel with state not subject to its real connectivity
430
   * state changes.
431
   */
432
  private static final class SubchannelView {
433
    private final ChildLbState childLbState;
434
    private final ConnectivityState connectivityState;
435

436
    private SubchannelView(ChildLbState childLbState, ConnectivityState state) {
1✔
437
      this.childLbState = childLbState;
1✔
438
      this.connectivityState = state;
1✔
439
    }
1✔
440
  }
441

442
  private static final class RingEntry implements Comparable<RingEntry> {
443
    private final long hash;
444
    private final Endpoint addrKey;
445

446
    private RingEntry(long hash, Endpoint addrKey) {
1✔
447
      this.hash = hash;
1✔
448
      this.addrKey = addrKey;
1✔
449
    }
1✔
450

451
    @Override
452
    public int compareTo(RingEntry entry) {
453
      return Long.compare(hash, entry.hash);
1✔
454
    }
455
  }
456

457
  /**
458
   * Configures the ring property. The larger the ring is (that is, the more hashes there are
459
   * for each provided host) the better the request distribution will reflect the desired weights.
460
   */
461
  static final class RingHashConfig {
462
    final long minRingSize;
463
    final long maxRingSize;
464

465
    RingHashConfig(long minRingSize, long maxRingSize) {
1✔
466
      checkArgument(minRingSize > 0, "minRingSize <= 0");
1✔
467
      checkArgument(maxRingSize > 0, "maxRingSize <= 0");
1✔
468
      checkArgument(minRingSize <= maxRingSize, "minRingSize > maxRingSize");
1✔
469
      this.minRingSize = minRingSize;
1✔
470
      this.maxRingSize = maxRingSize;
1✔
471
    }
1✔
472

473
    @Override
474
    public String toString() {
475
      return MoreObjects.toStringHelper(this)
×
476
          .add("minRingSize", minRingSize)
×
477
          .add("maxRingSize", maxRingSize)
×
478
          .toString();
×
479
    }
480
  }
481
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc