• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19440

29 Aug 2024 03:04PM UTC coverage: 84.491% (-0.002%) from 84.493%
#19440

push

github

ejona86
util: Remove child policy config from MultiChildLB state

The child policy config should be refreshed every address update, so it
shouldn't be stored in the ChildLbState. In addition, none of the
current usages actually used what was stored in the ChildLbState in a
meaningful way (it was always null).

ResolvedAddresses was also removed from createChildLbState(), as nothing
in it should be needed for creation; it varies over time and the values
passed at creation are immutable.

33402 of 39533 relevant lines covered (84.49%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.67
/../xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
1
/*
2
 * Copyright 2021 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ConnectivityState.CONNECTING;
23
import static io.grpc.ConnectivityState.IDLE;
24
import static io.grpc.ConnectivityState.READY;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27

28
import com.google.common.base.MoreObjects;
29
import com.google.common.collect.HashMultiset;
30
import com.google.common.collect.Multiset;
31
import com.google.common.primitives.UnsignedInteger;
32
import io.grpc.Attributes;
33
import io.grpc.ConnectivityState;
34
import io.grpc.EquivalentAddressGroup;
35
import io.grpc.InternalLogId;
36
import io.grpc.LoadBalancer;
37
import io.grpc.Status;
38
import io.grpc.SynchronizationContext;
39
import io.grpc.util.MultiChildLoadBalancer;
40
import io.grpc.xds.client.XdsLogger;
41
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
42
import java.net.SocketAddress;
43
import java.util.ArrayList;
44
import java.util.Collection;
45
import java.util.Collections;
46
import java.util.HashMap;
47
import java.util.HashSet;
48
import java.util.List;
49
import java.util.Map;
50
import java.util.Set;
51
import java.util.stream.Collectors;
52
import javax.annotation.Nullable;
53

54
/**
55
 * A {@link LoadBalancer} that provides consistent hashing based load balancing to upstream hosts.
56
 * It implements the "Ketama" hashing that maps hosts onto a circle (the "ring") by hashing their
57
 * addresses. Each request is routed to a host by hashing some property of the request and finding
58
 * the nearest corresponding host clockwise around the ring. Each host is placed on the ring some
59
 * number of times proportional to its weight. With the ring partitioned appropriately, the
60
 * addition or removal of one host from a set of N hosts will affect only 1/N requests.
61
 */
62
final class RingHashLoadBalancer extends MultiChildLoadBalancer {
63
  private static final Status RPC_HASH_NOT_FOUND =
1✔
64
      Status.INTERNAL.withDescription("RPC hash not found. Probably a bug because xds resolver"
1✔
65
          + " config selector always generates a hash.");
66
  private static final XxHash64 hashFunc = XxHash64.INSTANCE;
1✔
67

68
  private final LoadBalancer.Factory lazyLbFactory =
1✔
69
      new LazyLoadBalancer.Factory(pickFirstLbProvider);
70
  private final XdsLogger logger;
71
  private final SynchronizationContext syncContext;
72
  private List<RingEntry> ring;
73

74
  RingHashLoadBalancer(Helper helper) {
75
    super(helper);
1✔
76
    syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
1✔
77
    logger = XdsLogger.withLogId(InternalLogId.allocate("ring_hash_lb", helper.getAuthority()));
1✔
78
    logger.log(XdsLogLevel.INFO, "Created");
1✔
79
  }
1✔
80

81
  @Override
82
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
83
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
84
    List<EquivalentAddressGroup> addrList = resolvedAddresses.getAddresses();
1✔
85
    Status addressValidityStatus = validateAddrList(addrList);
1✔
86
    if (!addressValidityStatus.isOk()) {
1✔
87
      return addressValidityStatus;
1✔
88
    }
89

90
    try {
91
      resolvingAddresses = true;
1✔
92
      AcceptResolvedAddrRetVal acceptRetVal = acceptResolvedAddressesInternal(resolvedAddresses);
1✔
93
      if (!acceptRetVal.status.isOk()) {
1✔
94
        return acceptRetVal.status;
×
95
      }
96

97
      // Now do the ringhash specific logic with weights and building the ring
98
      RingHashConfig config = (RingHashConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
99
      if (config == null) {
1✔
100
        throw new IllegalArgumentException("Missing RingHash configuration");
×
101
      }
102
      Map<EquivalentAddressGroup, Long> serverWeights = new HashMap<>();
1✔
103
      long totalWeight = 0L;
1✔
104
      for (EquivalentAddressGroup eag : addrList) {
1✔
105
        Long weight = eag.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT);
1✔
106
        // Support two ways of server weighing: either multiple instances of the same address
107
        // or each address contains a per-address weight attribute. If a weight is not provided,
108
        // each occurrence of the address will be counted a weight value of one.
109
        if (weight == null) {
1✔
110
          weight = 1L;
×
111
        }
112
        totalWeight += weight;
1✔
113
        EquivalentAddressGroup addrKey = stripAttrs(eag);
1✔
114
        if (serverWeights.containsKey(addrKey)) {
1✔
115
          serverWeights.put(addrKey, serverWeights.get(addrKey) + weight);
×
116
        } else {
117
          serverWeights.put(addrKey, weight);
1✔
118
        }
119
      }
1✔
120
      // Calculate scale
121
      long minWeight = Collections.min(serverWeights.values());
1✔
122
      double normalizedMinWeight = (double) minWeight / totalWeight;
1✔
123
      // Scale up the number of hashes per host such that the least-weighted host gets a whole
124
      // number of hashes on the the ring. Other hosts might not end up with whole numbers, and
125
      // that's fine (the ring-building algorithm can handle this). This preserves the original
126
      // implementation's behavior: when weights aren't provided, all hosts should get an equal
127
      // number of hashes. In the case where this number exceeds the max_ring_size, it's scaled
128
      // back down to fit.
129
      double scale = Math.min(
1✔
130
          Math.ceil(normalizedMinWeight * config.minRingSize) / normalizedMinWeight,
1✔
131
          (double) config.maxRingSize);
132

133
      // Build the ring
134
      ring = buildRing(serverWeights, totalWeight, scale);
1✔
135

136
      // Must update channel picker before return so that new RPCs will not be routed to deleted
137
      // clusters and resolver can remove them in service config.
138
      updateOverallBalancingState();
1✔
139

140
      shutdownRemoved(acceptRetVal.removedChildren);
1✔
141
    } finally {
142
      this.resolvingAddresses = false;
1✔
143
    }
144

145
    return Status.OK;
1✔
146
  }
147

148

149
  /**
150
   * Updates the overall balancing state by aggregating the connectivity states of all subchannels.
151
   *
152
   * <p>Aggregation rules (in order of dominance):
153
   * <ol>
154
   *   <li>If there is at least one subchannel in READY state, overall state is READY</li>
155
   *   <li>If there are <em>2 or more</em> subchannels in TRANSIENT_FAILURE, overall state is
156
   *   TRANSIENT_FAILURE (to allow timely failover to another policy)</li>
157
   *   <li>If there is at least one subchannel in CONNECTING state, overall state is
158
   *   CONNECTING</li>
159
   *   <li> If there is one subchannel in TRANSIENT_FAILURE state and there is
160
   *    more than one subchannel, report CONNECTING </li>
161
   *   <li>If there is at least one subchannel in IDLE state, overall state is IDLE</li>
162
   *   <li>Otherwise, overall state is TRANSIENT_FAILURE</li>
163
   * </ol>
164
   */
165
  @Override
166
  protected void updateOverallBalancingState() {
167
    checkState(!getChildLbStates().isEmpty(), "no subchannel has been created");
1✔
168
    if (this.currentConnectivityState == SHUTDOWN) {
1✔
169
      // Ignore changes that happen after shutdown is called
170
      logger.log(XdsLogLevel.DEBUG, "UpdateOverallBalancingState called after shutdown");
×
171
      return;
×
172
    }
173

174
    // Calculate the current overall state to report
175
    int numIdle = 0;
1✔
176
    int numReady = 0;
1✔
177
    int numConnecting = 0;
1✔
178
    int numTF = 0;
1✔
179

180
    forloop:
181
    for (ChildLbState childLbState : getChildLbStates()) {
1✔
182
      ConnectivityState state = childLbState.getCurrentState();
1✔
183
      switch (state) {
1✔
184
        case READY:
185
          numReady++;
1✔
186
          break forloop;
1✔
187
        case CONNECTING:
188
          numConnecting++;
1✔
189
          break;
1✔
190
        case IDLE:
191
          numIdle++;
1✔
192
          break;
1✔
193
        case TRANSIENT_FAILURE:
194
          numTF++;
1✔
195
          break;
1✔
196
        default:
197
          // ignore it
198
      }
199
    }
1✔
200

201
    ConnectivityState overallState;
202
    if (numReady > 0) {
1✔
203
      overallState = READY;
1✔
204
    } else if (numTF >= 2) {
1✔
205
      overallState = TRANSIENT_FAILURE;
1✔
206
    } else if (numConnecting > 0) {
1✔
207
      overallState = CONNECTING;
1✔
208
    } else if (numTF == 1 && getChildLbStates().size() > 1) {
1✔
209
      overallState = CONNECTING;
1✔
210
    } else if (numIdle > 0) {
1✔
211
      overallState = IDLE;
1✔
212
    } else {
213
      overallState = TRANSIENT_FAILURE;
×
214
    }
215

216
    RingHashPicker picker = new RingHashPicker(syncContext, ring, getChildLbStates());
1✔
217
    getHelper().updateBalancingState(overallState, picker);
1✔
218
    this.currentConnectivityState = overallState;
1✔
219
  }
1✔
220

221
  @Override
222
  protected ChildLbState createChildLbState(Object key) {
223
    return new ChildLbState(key, lazyLbFactory);
1✔
224
  }
225

226
  private Status validateAddrList(List<EquivalentAddressGroup> addrList) {
227
    if (addrList.isEmpty()) {
1✔
228
      Status unavailableStatus = Status.UNAVAILABLE.withDescription("Ring hash lb error: EDS "
×
229
              + "resolution was successful, but returned server addresses are empty.");
230
      handleNameResolutionError(unavailableStatus);
×
231
      return unavailableStatus;
×
232
    }
233

234
    String dupAddrString = validateNoDuplicateAddresses(addrList);
1✔
235
    if (dupAddrString != null) {
1✔
236
      Status unavailableStatus = Status.UNAVAILABLE.withDescription("Ring hash lb error: EDS "
1✔
237
              + "resolution was successful, but there were duplicate addresses: " + dupAddrString);
238
      handleNameResolutionError(unavailableStatus);
1✔
239
      return unavailableStatus;
1✔
240
    }
241

242
    long totalWeight = 0;
1✔
243
    for (EquivalentAddressGroup eag : addrList) {
1✔
244
      Long weight = eag.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT);
1✔
245

246
      if (weight == null) {
1✔
247
        weight = 1L;
×
248
      }
249

250
      if (weight < 0) {
1✔
251
        Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
252
            String.format("Ring hash lb error: EDS resolution was successful, but returned a "
1✔
253
                        + "negative weight for %s.", stripAttrs(eag)));
1✔
254
        handleNameResolutionError(unavailableStatus);
1✔
255
        return unavailableStatus;
1✔
256
      }
257
      if (weight > UnsignedInteger.MAX_VALUE.longValue()) {
1✔
258
        Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
259
            String.format("Ring hash lb error: EDS resolution was successful, but returned a weight"
1✔
260
                + " too large to fit in an unsigned int for %s.", stripAttrs(eag)));
1✔
261
        handleNameResolutionError(unavailableStatus);
1✔
262
        return unavailableStatus;
1✔
263
      }
264
      totalWeight += weight;
1✔
265
    }
1✔
266

267
    if (totalWeight > UnsignedInteger.MAX_VALUE.longValue()) {
1✔
268
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
269
          String.format(
1✔
270
              "Ring hash lb error: EDS resolution was successful, but returned a sum of weights too"
271
                  + " large to fit in an unsigned int (%d).", totalWeight));
1✔
272
      handleNameResolutionError(unavailableStatus);
1✔
273
      return unavailableStatus;
1✔
274
    }
275

276
    return Status.OK;
1✔
277
  }
278

279
  @Nullable
280
  private String validateNoDuplicateAddresses(List<EquivalentAddressGroup> addrList) {
281
    Set<SocketAddress> addresses = new HashSet<>();
1✔
282
    Multiset<String> dups = HashMultiset.create();
1✔
283
    for (EquivalentAddressGroup eag : addrList) {
1✔
284
      for (SocketAddress address : eag.getAddresses()) {
1✔
285
        if (!addresses.add(address)) {
1✔
286
          dups.add(address.toString());
1✔
287
        }
288
      }
1✔
289
    }
1✔
290

291
    if (!dups.isEmpty()) {
1✔
292
      return dups.entrySet().stream()
1✔
293
          .map((dup) ->
1✔
294
              String.format("Address: %s, count: %d", dup.getElement(), dup.getCount() + 1))
1✔
295
          .collect(Collectors.joining("; "));
1✔
296
    }
297

298
    return null;
1✔
299
  }
300

301
  private static List<RingEntry> buildRing(
302
      Map<EquivalentAddressGroup, Long> serverWeights, long totalWeight, double scale) {
303
    List<RingEntry> ring = new ArrayList<>();
1✔
304
    double currentHashes = 0.0;
1✔
305
    double targetHashes = 0.0;
1✔
306
    for (Map.Entry<EquivalentAddressGroup, Long> entry : serverWeights.entrySet()) {
1✔
307
      Endpoint endpoint = new Endpoint(entry.getKey());
1✔
308
      double normalizedWeight = (double) entry.getValue() / totalWeight;
1✔
309
      // Per GRFC A61 use the first address for the hash
310
      StringBuilder sb = new StringBuilder(entry.getKey().getAddresses().get(0).toString());
1✔
311
      sb.append('_');
1✔
312
      int lengthWithoutCounter = sb.length();
1✔
313
      targetHashes += scale * normalizedWeight;
1✔
314
      long i = 0L;
1✔
315
      while (currentHashes < targetHashes) {
1✔
316
        sb.append(i);
1✔
317
        long hash = hashFunc.hashAsciiString(sb.toString());
1✔
318
        ring.add(new RingEntry(hash, endpoint));
1✔
319
        i++;
1✔
320
        currentHashes++;
1✔
321
        sb.setLength(lengthWithoutCounter);
1✔
322
      }
1✔
323
    }
1✔
324
    Collections.sort(ring);
1✔
325
    return Collections.unmodifiableList(ring);
1✔
326
  }
327

328
  @SuppressWarnings("ReferenceEquality")
329
  public static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) {
330
    if (eag.getAttributes() == Attributes.EMPTY) {
1✔
331
      return eag;
×
332
    }
333
    return new EquivalentAddressGroup(eag.getAddresses());
1✔
334
  }
335

336
  private static final class RingHashPicker extends SubchannelPicker {
337
    private final SynchronizationContext syncContext;
338
    private final List<RingEntry> ring;
339
    // Avoid synchronization between pickSubchannel and subchannel's connectivity state change,
340
    // freeze picker's view of subchannel's connectivity state.
341
    // TODO(chengyuanzhang): can be more performance-friendly with
342
    //  IdentityHashMap<Subchannel, ConnectivityStateInfo> and RingEntry contains Subchannel.
343
    private final Map<Endpoint, SubchannelView> pickableSubchannels;  // read-only
344

345
    private RingHashPicker(
346
        SynchronizationContext syncContext, List<RingEntry> ring,
347
        Collection<ChildLbState> children) {
1✔
348
      this.syncContext = syncContext;
1✔
349
      this.ring = ring;
1✔
350
      pickableSubchannels = new HashMap<>(children.size());
1✔
351
      for (ChildLbState childLbState : children) {
1✔
352
        pickableSubchannels.put((Endpoint)childLbState.getKey(),
1✔
353
            new SubchannelView(childLbState, childLbState.getCurrentState()));
1✔
354
      }
1✔
355
    }
1✔
356

357
    // Find the ring entry with hash next to (clockwise) the RPC's hash (binary search).
358
    private int getTargetIndex(Long requestHash) {
359
      if (ring.size() <= 1) {
1✔
360
        return 0;
×
361
      }
362

363
      int low = 0;
1✔
364
      int high = ring.size() - 1;
1✔
365
      int mid = (low + high) / 2;
1✔
366
      do {
367
        long midVal = ring.get(mid).hash;
1✔
368
        long midValL = mid == 0 ? 0 : ring.get(mid - 1).hash;
1✔
369
        if (requestHash <= midVal && requestHash > midValL) {
1✔
370
          break;
1✔
371
        }
372
        if (midVal < requestHash) {
1✔
373
          low = mid + 1;
1✔
374
        } else {
375
          high =  mid - 1;
1✔
376
        }
377
        mid = (low + high) / 2;
1✔
378
      } while (mid < ring.size() && low <= high);
1✔
379
      return mid;
1✔
380
    }
381

382
    @Override
383
    public PickResult pickSubchannel(PickSubchannelArgs args) {
384
      Long requestHash = args.getCallOptions().getOption(XdsNameResolver.RPC_HASH_KEY);
1✔
385
      if (requestHash == null) {
1✔
386
        return PickResult.withError(RPC_HASH_NOT_FOUND);
×
387
      }
388

389
      int targetIndex = getTargetIndex(requestHash);
1✔
390

391
      // Per gRFC A61, because of sticky-TF with PickFirst's auto reconnect on TF, we ignore
392
      // all TF subchannels and find the first ring entry in READY, CONNECTING or IDLE.  If
393
      // CONNECTING or IDLE we return a pick with no results.  Additionally, if that entry is in
394
      // IDLE, we initiate a connection.
395
      for (int i = 0; i < ring.size(); i++) {
1✔
396
        int index = (targetIndex + i) % ring.size();
1✔
397
        SubchannelView subchannelView = pickableSubchannels.get(ring.get(index).addrKey);
1✔
398
        ChildLbState childLbState = subchannelView.childLbState;
1✔
399

400
        if (subchannelView.connectivityState  == READY) {
1✔
401
          return childLbState.getCurrentPicker().pickSubchannel(args);
1✔
402
        }
403

404
        // RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs
405
        // are failed unless there is a READY connection.
406
        if (subchannelView.connectivityState == CONNECTING) {
1✔
407
          return PickResult.withNoResult();
1✔
408
        }
409

410
        if (subchannelView.connectivityState == IDLE) {
1✔
411
          syncContext.execute(() -> {
1✔
412
            childLbState.getLb().requestConnection();
1✔
413
          });
1✔
414

415
          return PickResult.withNoResult(); // Indicates that this should be retried after backoff
1✔
416
        }
417
      }
418

419
      // return the pick from the original subchannel hit by hash, which is probably an error
420
      ChildLbState originalSubchannel =
1✔
421
          pickableSubchannels.get(ring.get(targetIndex).addrKey).childLbState;
1✔
422
      return originalSubchannel.getCurrentPicker().pickSubchannel(args);
1✔
423
    }
424

425
  }
426

427
  /**
428
   * An unmodifiable view of a subchannel with state not subject to its real connectivity
429
   * state changes.
430
   */
431
  private static final class SubchannelView {
432
    private final ChildLbState childLbState;
433
    private final ConnectivityState connectivityState;
434

435
    private SubchannelView(ChildLbState childLbState, ConnectivityState state) {
1✔
436
      this.childLbState = childLbState;
1✔
437
      this.connectivityState = state;
1✔
438
    }
1✔
439
  }
440

441
  private static final class RingEntry implements Comparable<RingEntry> {
442
    private final long hash;
443
    private final Endpoint addrKey;
444

445
    private RingEntry(long hash, Endpoint addrKey) {
1✔
446
      this.hash = hash;
1✔
447
      this.addrKey = addrKey;
1✔
448
    }
1✔
449

450
    @Override
451
    public int compareTo(RingEntry entry) {
452
      return Long.compare(hash, entry.hash);
1✔
453
    }
454
  }
455

456
  /**
457
   * Configures the ring property. The larger the ring is (that is, the more hashes there are
458
   * for each provided host) the better the request distribution will reflect the desired weights.
459
   */
460
  static final class RingHashConfig {
461
    final long minRingSize;
462
    final long maxRingSize;
463

464
    RingHashConfig(long minRingSize, long maxRingSize) {
1✔
465
      checkArgument(minRingSize > 0, "minRingSize <= 0");
1✔
466
      checkArgument(maxRingSize > 0, "maxRingSize <= 0");
1✔
467
      checkArgument(minRingSize <= maxRingSize, "minRingSize > maxRingSize");
1✔
468
      this.minRingSize = minRingSize;
1✔
469
      this.maxRingSize = maxRingSize;
1✔
470
    }
1✔
471

472
    @Override
473
    public String toString() {
474
      return MoreObjects.toStringHelper(this)
×
475
          .add("minRingSize", minRingSize)
×
476
          .add("maxRingSize", maxRingSize)
×
477
          .toString();
×
478
    }
479
  }
480
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc