• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19453

06 Sep 2024 06:43PM UTC coverage: 84.514% (-0.01%) from 84.527%
#19453

push

github

web-flow
use an attribute from resolved addresses IS_PETIOLE_POLICY to control whether or not health checking is supported (#11513)

* use an attribute from resolved addresses IS_PETIOLE_POLICY to control whether or not health checking is supported so that top level versions can't do any health checking, while those under petiole policies can.

Fixes #11413

33542 of 39688 relevant lines covered (84.51%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.8
/../core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.CONNECTING;
21
import static io.grpc.ConnectivityState.IDLE;
22
import static io.grpc.ConnectivityState.READY;
23
import static io.grpc.ConnectivityState.SHUTDOWN;
24
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
25

26
import com.google.common.annotations.VisibleForTesting;
27
import com.google.common.base.MoreObjects;
28
import com.google.common.collect.ImmutableList;
29
import com.google.common.collect.Lists;
30
import io.grpc.Attributes;
31
import io.grpc.ConnectivityState;
32
import io.grpc.ConnectivityStateInfo;
33
import io.grpc.EquivalentAddressGroup;
34
import io.grpc.LoadBalancer;
35
import io.grpc.Status;
36
import io.grpc.SynchronizationContext.ScheduledHandle;
37
import java.net.SocketAddress;
38
import java.util.ArrayList;
39
import java.util.Collections;
40
import java.util.HashMap;
41
import java.util.HashSet;
42
import java.util.List;
43
import java.util.Map;
44
import java.util.Random;
45
import java.util.Set;
46
import java.util.concurrent.TimeUnit;
47
import java.util.concurrent.atomic.AtomicBoolean;
48
import java.util.logging.Level;
49
import java.util.logging.Logger;
50
import javax.annotation.Nullable;
51

52
/**
53
 * A {@link LoadBalancer} that provides no load-balancing over the addresses from the {@link
54
 * io.grpc.NameResolver}. The channel's default behavior is used, which is walking down the address
55
 * list and sticking to the first that works.
56
 */
57
final class PickFirstLeafLoadBalancer extends LoadBalancer {
58
  private static final Logger log = Logger.getLogger(PickFirstLeafLoadBalancer.class.getName());
1✔
59
  @VisibleForTesting
60
  static final int CONNECTION_DELAY_INTERVAL_MS = 250;
61
  private final Helper helper;
62
  private final Map<SocketAddress, SubchannelData> subchannels = new HashMap<>();
1✔
63
  private final Index addressIndex = new Index(ImmutableList.of());
1✔
64
  private int numTf = 0;
1✔
65
  private boolean firstPass = true;
1✔
66
  @Nullable
67
  private ScheduledHandle scheduleConnectionTask;
68
  private ConnectivityState rawConnectivityState = IDLE;
1✔
69
  private ConnectivityState concludedState = IDLE;
1✔
70
  private final boolean enableHappyEyeballs =
1✔
71
      PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
1✔
72
  private boolean notAPetiolePolicy = true; // means not under a petiole policy
1✔
73

74
  PickFirstLeafLoadBalancer(Helper helper) {
1✔
75
    this.helper = checkNotNull(helper, "helper");
1✔
76
  }
1✔
77

78
  @Override
79
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
80
    if (rawConnectivityState == SHUTDOWN) {
1✔
81
      return Status.FAILED_PRECONDITION.withDescription("Already shut down");
1✔
82
    }
83

84
    // Cache whether or not this is a petiole policy, which is based off of an address attribute
85
    Boolean isPetiolePolicy = resolvedAddresses.getAttributes().get(IS_PETIOLE_POLICY);
1✔
86
    this.notAPetiolePolicy = isPetiolePolicy == null || !isPetiolePolicy;
1✔
87

88
    List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
1✔
89

90
    // Validate the address list
91
    if (servers.isEmpty()) {
1✔
92
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
93
              "NameResolver returned no usable address. addrs=" + resolvedAddresses.getAddresses()
1✔
94
                      + ", attrs=" + resolvedAddresses.getAttributes());
1✔
95
      handleNameResolutionError(unavailableStatus);
1✔
96
      return unavailableStatus;
1✔
97
    }
98
    for (EquivalentAddressGroup eag : servers) {
1✔
99
      if (eag == null) {
1✔
100
        Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
101
            "NameResolver returned address list with null endpoint. addrs="
102
                + resolvedAddresses.getAddresses() + ", attrs="
1✔
103
                + resolvedAddresses.getAttributes());
1✔
104
        handleNameResolutionError(unavailableStatus);
1✔
105
        return unavailableStatus;
1✔
106
      }
107
    }
1✔
108

109
    // Since we have a new set of addresses, we are again at first pass
110
    firstPass = true;
1✔
111

112
    List<EquivalentAddressGroup> cleanServers = deDupAddresses(servers);
1✔
113

114
    // We can optionally be configured to shuffle the address list. This can help better distribute
115
    // the load.
116
    if (resolvedAddresses.getLoadBalancingPolicyConfig()
1✔
117
        instanceof PickFirstLeafLoadBalancerConfig) {
118
      PickFirstLeafLoadBalancerConfig config
1✔
119
          = (PickFirstLeafLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
120
      if (config.shuffleAddressList != null && config.shuffleAddressList) {
1✔
121
        Collections.shuffle(cleanServers,
1✔
122
            config.randomSeed != null ? new Random(config.randomSeed) : new Random());
1✔
123
      }
124
    }
125

126
    final ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups =
127
        ImmutableList.<EquivalentAddressGroup>builder().addAll(cleanServers).build();
1✔
128

129
    if (rawConnectivityState == READY) {
1✔
130
      // If the previous ready subchannel exists in new address list,
131
      // keep this connection and don't create new subchannels
132
      SocketAddress previousAddress = addressIndex.getCurrentAddress();
1✔
133
      addressIndex.updateGroups(newImmutableAddressGroups);
1✔
134
      if (addressIndex.seekTo(previousAddress)) {
1✔
135
        SubchannelData subchannelData = subchannels.get(previousAddress);
1✔
136
        subchannelData.getSubchannel().updateAddresses(addressIndex.getCurrentEagAsList());
1✔
137
        return Status.OK;
1✔
138
      }
139
      // Previous ready subchannel not in the new list of addresses
140
    } else {
1✔
141
      addressIndex.updateGroups(newImmutableAddressGroups);
1✔
142
    }
143

144
    // remove old subchannels that were not in new address list
145
    Set<SocketAddress> oldAddrs = new HashSet<>(subchannels.keySet());
1✔
146

147
    // Flatten the new EAGs addresses
148
    Set<SocketAddress> newAddrs = new HashSet<>();
1✔
149
    for (EquivalentAddressGroup endpoint : newImmutableAddressGroups) {
1✔
150
      newAddrs.addAll(endpoint.getAddresses());
1✔
151
    }
1✔
152

153
    // Shut them down and remove them
154
    for (SocketAddress oldAddr : oldAddrs) {
1✔
155
      if (!newAddrs.contains(oldAddr)) {
1✔
156
        subchannels.remove(oldAddr).getSubchannel().shutdown();
1✔
157
      }
158
    }
1✔
159

160
    if (oldAddrs.size() == 0) {
1✔
161
      // Make tests happy; they don't properly assume starting in CONNECTING
162
      rawConnectivityState = CONNECTING;
1✔
163
      updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
1✔
164
    }
165

166
    if (rawConnectivityState == READY) {
1✔
167
      // connect from beginning when prompted
168
      rawConnectivityState = IDLE;
1✔
169
      updateBalancingState(IDLE, new RequestConnectionPicker(this));
1✔
170

171
    } else if (rawConnectivityState == CONNECTING || rawConnectivityState == TRANSIENT_FAILURE) {
1✔
172
      // start connection attempt at first address
173
      cancelScheduleTask();
1✔
174
      requestConnection();
1✔
175
    }
176

177
    return Status.OK;
1✔
178
  }
179

180
  private static List<EquivalentAddressGroup> deDupAddresses(List<EquivalentAddressGroup> groups) {
181
    Set<SocketAddress> seenAddresses = new HashSet<>();
1✔
182
    List<EquivalentAddressGroup> newGroups = new ArrayList<>();
1✔
183

184
    for (EquivalentAddressGroup group : groups) {
1✔
185
      List<SocketAddress> addrs = new ArrayList<>();
1✔
186
      for (SocketAddress addr : group.getAddresses()) {
1✔
187
        if (seenAddresses.add(addr)) {
1✔
188
          addrs.add(addr);
1✔
189
        }
190
      }
1✔
191
      if (!addrs.isEmpty()) {
1✔
192
        newGroups.add(new EquivalentAddressGroup(addrs, group.getAttributes()));
1✔
193
      }
194
    }
1✔
195

196
    return newGroups;
1✔
197
  }
198

199
  @Override
200
  public void handleNameResolutionError(Status error) {
201
    if (rawConnectivityState == SHUTDOWN) {
1✔
202
      return;
×
203
    }
204

205
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
206
      subchannelData.getSubchannel().shutdown();
1✔
207
    }
1✔
208
    subchannels.clear();
1✔
209
    addressIndex.updateGroups(ImmutableList.of());
1✔
210
    rawConnectivityState = TRANSIENT_FAILURE;
1✔
211
    updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
1✔
212
  }
1✔
213

214
  void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo stateInfo) {
215
    ConnectivityState newState = stateInfo.getState();
1✔
216

217
    // Shutdown channels/previously relevant subchannels can still callback with state updates.
218
    // To prevent pickers from returning these obsolete subchannels, this logic
219
    // is included to check if the current list of active subchannels includes this subchannel.
220
    if (subchannelData != subchannels.get(getAddress(subchannelData.subchannel))) {
1✔
221
      return;
1✔
222
    }
223

224
    if (newState == SHUTDOWN) {
1✔
225
      return;
1✔
226
    }
227

228
    if (newState == IDLE) {
1✔
229
      helper.refreshNameResolution();
1✔
230
    }
231
    // If we are transitioning from a TRANSIENT_FAILURE to CONNECTING or IDLE we ignore this state
232
    // transition and still keep the LB in TRANSIENT_FAILURE state. This is referred to as "sticky
233
    // transient failure". Only a subchannel state change to READY will get the LB out of
234
    // TRANSIENT_FAILURE. If the state is IDLE we additionally request a new connection so that we
235
    // keep retrying for a connection.
236

237
    // With the new pick first implementation, individual subchannels will have their own backoff
238
    // on a per-address basis. Thus, iterative requests for connections will not be requested
239
    // once the first pass through is complete.
240
    // However, every time there is an address update, we will perform a pass through for the new
241
    // addresses in the updated list.
242
    subchannelData.updateState(newState);
1✔
243
    if (rawConnectivityState == TRANSIENT_FAILURE || concludedState == TRANSIENT_FAILURE)  {
1✔
244
      if (newState == CONNECTING) {
1✔
245
        // each subchannel is responsible for its own backoff
246
        return;
1✔
247
      } else if (newState == IDLE) {
1✔
248
        requestConnection();
1✔
249
        return;
1✔
250
      }
251
    }
252

253
    switch (newState) {
1✔
254
      case IDLE:
255
        // Shutdown when ready: connect from beginning when prompted
256
        addressIndex.reset();
1✔
257
        rawConnectivityState = IDLE;
1✔
258
        updateBalancingState(IDLE, new RequestConnectionPicker(this));
1✔
259
        break;
1✔
260

261
      case CONNECTING:
262
        rawConnectivityState = CONNECTING;
1✔
263
        updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
1✔
264
        break;
1✔
265

266
      case READY:
267
        shutdownRemaining(subchannelData);
1✔
268
        addressIndex.seekTo(getAddress(subchannelData.subchannel));
1✔
269
        rawConnectivityState = READY;
1✔
270
        updateHealthCheckedState(subchannelData);
1✔
271
        break;
1✔
272

273
      case TRANSIENT_FAILURE:
274
        // If we are looking at current channel, request a connection if possible
275
        if (addressIndex.isValid()
1✔
276
            && subchannels.get(addressIndex.getCurrentAddress()) == subchannelData) {
1✔
277
          if (addressIndex.increment()) {
1✔
278
            cancelScheduleTask();
1✔
279
            requestConnection(); // is recursive so might hit the end of the addresses
1✔
280
          }
281
        }
282

283
        if (isPassComplete()) {
1✔
284
          rawConnectivityState = TRANSIENT_FAILURE;
1✔
285
          updateBalancingState(TRANSIENT_FAILURE,
1✔
286
              new Picker(PickResult.withError(stateInfo.getStatus())));
1✔
287

288
          // Refresh Name Resolution, but only when all 3 conditions are met
289
          // * We are at the end of addressIndex
290
          // * have had status reported for all subchannels.
291
          // * And one of the following conditions:
292
          //    * Have had enough TF reported since we completed first pass
293
          //    * Just completed the first pass
294
          if (++numTf >= addressIndex.size() || firstPass) {
1✔
295
            firstPass = false;
1✔
296
            numTf = 0;
1✔
297
            helper.refreshNameResolution();
1✔
298
          }
299
        }
300
        break;
301

302
      default:
303
        throw new IllegalArgumentException("Unsupported state:" + newState);
×
304
    }
305
  }
1✔
306

307
  private void updateHealthCheckedState(SubchannelData subchannelData) {
308
    if (subchannelData.state != READY) {
1✔
309
      return;
1✔
310
    }
311

312
    if (notAPetiolePolicy || subchannelData.getHealthState() == READY) {
1✔
313
      updateBalancingState(READY,
1✔
314
          new FixedResultPicker(PickResult.withSubchannel(subchannelData.subchannel)));
1✔
315
    } else if (subchannelData.getHealthState() == TRANSIENT_FAILURE) {
1✔
316
      updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(
1✔
317
          subchannelData.healthStateInfo.getStatus())));
1✔
318
    } else if (concludedState != TRANSIENT_FAILURE) {
1✔
319
      updateBalancingState(subchannelData.getHealthState(),
1✔
320
          new Picker(PickResult.withNoResult()));
1✔
321
    }
322
  }
1✔
323

324
  private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) {
325
    // an optimization: de-dup IDLE or CONNECTING notification.
326
    if (state == concludedState && (state == IDLE || state == CONNECTING)) {
1✔
327
      return;
1✔
328
    }
329
    concludedState = state;
1✔
330
    helper.updateBalancingState(state, picker);
1✔
331
  }
1✔
332

333
  @Override
334
  public void shutdown() {
335
    log.log(Level.FINE,
1✔
336
        "Shutting down, currently have {} subchannels created", subchannels.size());
1✔
337
    rawConnectivityState = SHUTDOWN;
1✔
338
    concludedState = SHUTDOWN;
1✔
339
    cancelScheduleTask();
1✔
340

341
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
342
      subchannelData.getSubchannel().shutdown();
1✔
343
    }
1✔
344

345
    subchannels.clear();
1✔
346
  }
1✔
347

348
  /**
349
  * Shuts down remaining subchannels. Called when a subchannel becomes ready, which means
350
  * that all other subchannels must be shutdown.
351
  */
352
  private void shutdownRemaining(SubchannelData activeSubchannelData) {
353
    cancelScheduleTask();
1✔
354
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
355
      if (!subchannelData.getSubchannel().equals(activeSubchannelData.subchannel)) {
1✔
356
        subchannelData.getSubchannel().shutdown();
1✔
357
      }
358
    }
1✔
359
    subchannels.clear();
1✔
360
    activeSubchannelData.updateState(READY);
1✔
361
    subchannels.put(getAddress(activeSubchannelData.subchannel), activeSubchannelData);
1✔
362
  }
1✔
363

364
  /**
365
   * Requests a connection to the next applicable address' subchannel, creating one if necessary.
366
   * Schedules a connection to next address in list as well.
367
   * If the current channel has already attempted a connection, we attempt a connection
368
   * to the next address/subchannel in our list.  We assume that createNewSubchannel will never
369
   * return null.
370
   */
371
  @Override
372
  public void requestConnection() {
373
    if (!addressIndex.isValid() || rawConnectivityState == SHUTDOWN) {
1✔
374
      return;
1✔
375
    }
376

377
    SocketAddress currentAddress = addressIndex.getCurrentAddress();
1✔
378
    SubchannelData subchannelData = subchannels.get(currentAddress);
1✔
379
    if (subchannelData == null) {
1✔
380
      subchannelData = createNewSubchannel(currentAddress, addressIndex.getCurrentEagAttributes());
1✔
381
    }
382

383
    ConnectivityState subchannelState = subchannelData.getState();
1✔
384
    switch (subchannelState) {
1✔
385
      case IDLE:
386
        subchannelData.subchannel.requestConnection();
1✔
387
        subchannelData.updateState(CONNECTING);
1✔
388
        scheduleNextConnection();
1✔
389
        break;
1✔
390
      case CONNECTING:
391
        scheduleNextConnection();
1✔
392
        break;
1✔
393
      case TRANSIENT_FAILURE:
394
        addressIndex.increment();
1✔
395
        requestConnection();
1✔
396
        break;
1✔
397
      default:
398
        // Wait for current subchannel to change state
399
    }
400
  }
1✔
401

402

403
  /**
404
  * Happy Eyeballs
405
  * Schedules connection attempt to happen after a delay to the next available address.
406
  */
407
  private void scheduleNextConnection() {
408
    if (!enableHappyEyeballs
1✔
409
        || (scheduleConnectionTask != null && scheduleConnectionTask.isPending())) {
1✔
410
      return;
1✔
411
    }
412

413
    class StartNextConnection implements Runnable {
1✔
414
      @Override
415
      public void run() {
416
        scheduleConnectionTask = null;
1✔
417
        if (addressIndex.increment()) {
1✔
418
          requestConnection();
1✔
419
        }
420
      }
1✔
421
    }
422

423
    scheduleConnectionTask = helper.getSynchronizationContext().schedule(
1✔
424
        new StartNextConnection(),
425
        CONNECTION_DELAY_INTERVAL_MS,
426
        TimeUnit.MILLISECONDS,
427
        helper.getScheduledExecutorService());
1✔
428
  }
1✔
429

430
  private void cancelScheduleTask() {
431
    if (scheduleConnectionTask != null) {
1✔
432
      scheduleConnectionTask.cancel();
1✔
433
      scheduleConnectionTask = null;
1✔
434
    }
435
  }
1✔
436

437
  private SubchannelData createNewSubchannel(SocketAddress addr, Attributes attrs) {
438
    HealthListener hcListener = new HealthListener();
1✔
439
    final Subchannel subchannel = helper.createSubchannel(
1✔
440
        CreateSubchannelArgs.newBuilder()
1✔
441
        .setAddresses(Lists.newArrayList(
1✔
442
            new EquivalentAddressGroup(addr, attrs)))
443
        .addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, hcListener)
1✔
444
            .build());
1✔
445
    if (subchannel == null) {
1✔
446
      log.warning("Was not able to create subchannel for " + addr);
×
447
      throw new IllegalStateException("Can't create subchannel");
×
448
    }
449
    SubchannelData subchannelData = new SubchannelData(subchannel, IDLE);
1✔
450
    hcListener.subchannelData = subchannelData;
1✔
451
    subchannels.put(addr, subchannelData);
1✔
452
    Attributes scAttrs = subchannel.getAttributes();
1✔
453
    if (notAPetiolePolicy || scAttrs.get(LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY) == null) {
1✔
454
      subchannelData.healthStateInfo = ConnectivityStateInfo.forNonError(READY);
1✔
455
    }
456
    subchannel.start(stateInfo -> processSubchannelState(subchannelData, stateInfo));
1✔
457
    return subchannelData;
1✔
458
  }
459

460
  private boolean isPassComplete() {
461
    if (addressIndex.isValid() || subchannels.size() < addressIndex.size()) {
1✔
462
      return false;
1✔
463
    }
464
    for (SubchannelData sc : subchannels.values()) {
1✔
465
      if (!sc.isCompletedConnectivityAttempt() ) {
1✔
466
        return false;
1✔
467
      }
468
    }
1✔
469
    return true;
1✔
470
  }
471

472
  private final class HealthListener implements SubchannelStateListener {
1✔
473
    private SubchannelData subchannelData;
474

475
    @Override
476
    public void onSubchannelState(ConnectivityStateInfo newState) {
477
      if (notAPetiolePolicy) {
1✔
478
        log.log(Level.WARNING,
1✔
479
            "Ignoring health status {0} for subchannel {1} as this is not under a petiole policy",
480
            new Object[]{newState, subchannelData.subchannel});
1✔
481
        return;
1✔
482
      }
483

484
      log.log(Level.FINE, "Received health status {0} for subchannel {1}",
1✔
485
          new Object[]{newState, subchannelData.subchannel});
1✔
486
      subchannelData.healthStateInfo = newState;
1✔
487
      if (addressIndex.isValid()
1✔
488
          && subchannelData == subchannels.get(addressIndex.getCurrentAddress())) {
1✔
489
        updateHealthCheckedState(subchannelData);
1✔
490
      }
491
    }
1✔
492
  }
493

494
  private SocketAddress getAddress(Subchannel subchannel) {
495
    return subchannel.getAddresses().getAddresses().get(0);
1✔
496
  }
497

498
  @VisibleForTesting
499
  ConnectivityState getConcludedConnectivityState() {
500
    return this.concludedState;
1✔
501
  }
502

503
  /**
504
   * No-op picker which doesn't add any custom picking logic. It just passes already known result
505
   * received in constructor.
506
   */
507
  private static final class Picker extends SubchannelPicker {
508
    private final PickResult result;
509

510
    Picker(PickResult result) {
1✔
511
      this.result = checkNotNull(result, "result");
1✔
512
    }
1✔
513

514
    @Override
515
    public PickResult pickSubchannel(PickSubchannelArgs args) {
516
      return result;
1✔
517
    }
518

519
    @Override
520
    public String toString() {
521
      return MoreObjects.toStringHelper(Picker.class).add("result", result).toString();
×
522
    }
523
  }
524

525
  /**
526
   * Picker that requests connection during the first pick, and returns noResult.
527
   */
528
  private final class RequestConnectionPicker extends SubchannelPicker {
529
    private final PickFirstLeafLoadBalancer pickFirstLeafLoadBalancer;
530
    private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
1✔
531

532
    RequestConnectionPicker(PickFirstLeafLoadBalancer pickFirstLeafLoadBalancer) {
1✔
533
      this.pickFirstLeafLoadBalancer =
1✔
534
          checkNotNull(pickFirstLeafLoadBalancer, "pickFirstLeafLoadBalancer");
1✔
535
    }
1✔
536

537
    @Override
538
    public PickResult pickSubchannel(PickSubchannelArgs args) {
539
      if (connectionRequested.compareAndSet(false, true)) {
1✔
540
        helper.getSynchronizationContext().execute(pickFirstLeafLoadBalancer::requestConnection);
1✔
541
      }
542
      return PickResult.withNoResult();
1✔
543
    }
544
  }
545

546
  /**
547
   * Index as in 'i', the pointer to an entry. Not a "search index."
548
   * All updates should be done in a synchronization context.
549
   */
550
  @VisibleForTesting
551
  static final class Index {
552
    private List<EquivalentAddressGroup> addressGroups;
553
    private int size;
554
    private int groupIndex;
555
    private int addressIndex;
556

557
    public Index(List<EquivalentAddressGroup> groups) {
1✔
558
      updateGroups(groups);
1✔
559
    }
1✔
560

561
    public boolean isValid() {
562
      // Is invalid if empty or has incremented off the end
563
      return groupIndex < addressGroups.size();
1✔
564
    }
565

566
    public boolean isAtBeginning() {
567
      return groupIndex == 0 && addressIndex == 0;
1✔
568
    }
569

570
    /**
571
     * Move to next address in group.  If last address in group move to first address of next group.
572
     * @return false if went off end of the list, otherwise true
573
     */
574
    public boolean increment() {
575
      if (!isValid()) {
1✔
576
        return false;
1✔
577
      }
578

579
      EquivalentAddressGroup group = addressGroups.get(groupIndex);
1✔
580
      addressIndex++;
1✔
581
      if (addressIndex >= group.getAddresses().size()) {
1✔
582
        groupIndex++;
1✔
583
        addressIndex = 0;
1✔
584
        return groupIndex < addressGroups.size();
1✔
585
      }
586

587
      return true;
1✔
588
    }
589

590
    public void reset() {
591
      groupIndex = 0;
1✔
592
      addressIndex = 0;
1✔
593
    }
1✔
594

595
    public SocketAddress getCurrentAddress() {
596
      if (!isValid()) {
1✔
597
        throw new IllegalStateException("Index is past the end of the address group list");
×
598
      }
599
      return addressGroups.get(groupIndex).getAddresses().get(addressIndex);
1✔
600
    }
601

602
    public Attributes getCurrentEagAttributes() {
603
      if (!isValid()) {
1✔
604
        throw new IllegalStateException("Index is off the end of the address group list");
×
605
      }
606
      return addressGroups.get(groupIndex).getAttributes();
1✔
607
    }
608

609
    public List<EquivalentAddressGroup> getCurrentEagAsList() {
610
      return Collections.singletonList(
1✔
611
          new EquivalentAddressGroup(getCurrentAddress(), getCurrentEagAttributes()));
1✔
612
    }
613

614
    /**
615
     * Update to new groups, resetting the current index.
616
     */
617
    public void updateGroups(List<EquivalentAddressGroup> newGroups) {
618
      addressGroups = checkNotNull(newGroups, "newGroups");
1✔
619
      reset();
1✔
620
      int size = 0;
1✔
621
      for (EquivalentAddressGroup eag : newGroups) {
1✔
622
        size += eag.getAddresses().size();
1✔
623
      }
1✔
624
      this.size = size;
1✔
625
    }
1✔
626

627
    /**
628
     * Returns false if the needle was not found and the current index was left unchanged.
629
     */
630
    public boolean seekTo(SocketAddress needle) {
631
      for (int i = 0; i < addressGroups.size(); i++) {
1✔
632
        EquivalentAddressGroup group = addressGroups.get(i);
1✔
633
        int j = group.getAddresses().indexOf(needle);
1✔
634
        if (j == -1) {
1✔
635
          continue;
1✔
636
        }
637
        this.groupIndex = i;
1✔
638
        this.addressIndex = j;
1✔
639
        return true;
1✔
640
      }
641
      return false;
1✔
642
    }
643

644
    public int size() {
645
      return size;
1✔
646
    }
647
  }
648

649
  private static final class SubchannelData {
650
    private final Subchannel subchannel;
651
    private ConnectivityState state;
652
    private boolean completedConnectivityAttempt = false;
1✔
653
    private ConnectivityStateInfo healthStateInfo = ConnectivityStateInfo.forNonError(IDLE);
1✔
654

655
    public SubchannelData(Subchannel subchannel, ConnectivityState state) {
1✔
656
      this.subchannel = subchannel;
1✔
657
      this.state = state;
1✔
658
    }
1✔
659

660
    public Subchannel getSubchannel() {
661
      return this.subchannel;
1✔
662
    }
663

664
    public ConnectivityState getState() {
665
      return this.state;
1✔
666
    }
667

668
    public boolean isCompletedConnectivityAttempt() {
669
      return completedConnectivityAttempt;
1✔
670
    }
671

672
    private void updateState(ConnectivityState newState) {
673
      this.state = newState;
1✔
674
      if (newState == READY || newState == TRANSIENT_FAILURE) {
1✔
675
        completedConnectivityAttempt = true;
1✔
676
      } else if (newState == IDLE) {
1✔
677
        completedConnectivityAttempt = false;
1✔
678
      }
679
    }
1✔
680

681
    private ConnectivityState getHealthState() {
682
      return healthStateInfo.getState();
1✔
683
    }
684
  }
685

686
  public static final class PickFirstLeafLoadBalancerConfig {
687

688
    @Nullable
689
    public final Boolean shuffleAddressList;
690

691
    // For testing purposes only, not meant to be parsed from a real config.
692
    @Nullable
693
    final Long randomSeed;
694

695
    public PickFirstLeafLoadBalancerConfig(@Nullable Boolean shuffleAddressList) {
696
      this(shuffleAddressList, null);
1✔
697
    }
1✔
698

699
    PickFirstLeafLoadBalancerConfig(@Nullable Boolean shuffleAddressList,
700
        @Nullable Long randomSeed) {
1✔
701
      this.shuffleAddressList = shuffleAddressList;
1✔
702
      this.randomSeed = randomSeed;
1✔
703
    }
1✔
704
  }
705
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc