• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19493

03 Oct 2024 12:03AM UTC coverage: 84.599% (+0.01%) from 84.586%
#19493

push

github

web-flow
Change PickFirstLeafLoadBalancer to only have 1 subchannel at a time (#11520)

* Change PickFirstLeafLoadBalancer to only have 1 subchannel at a time if environment variable GRPC_SERIALIZE_RETRIES == true.

Cache serializingRetries value so that it doesn't have to look up the flag every time.

Clear the correct task when READY in processSubchannelState and move the logic to cancelScheduledTasks

Cleanup based on PR review

remove unneeded checks for shutdown.

* Fix previously broken tests

* Shut down the previous subchannel when the index runs off the end of the address list.

* Provide option to disable subchannel retries to let PFLeafLB take control of retries.

* InternalSubchannel internally goes to IDLE when it sees TRANSIENT_FAILURE (TF) while reconnect is disabled.
Remove an extra index.increment in LeafLB

33705 of 39841 relevant lines covered (84.6%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.46
/../core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.CONNECTING;
21
import static io.grpc.ConnectivityState.IDLE;
22
import static io.grpc.ConnectivityState.READY;
23
import static io.grpc.ConnectivityState.SHUTDOWN;
24
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
25

26
import com.google.common.annotations.VisibleForTesting;
27
import com.google.common.base.MoreObjects;
28
import com.google.common.collect.ImmutableList;
29
import com.google.common.collect.Lists;
30
import io.grpc.Attributes;
31
import io.grpc.ConnectivityState;
32
import io.grpc.ConnectivityStateInfo;
33
import io.grpc.EquivalentAddressGroup;
34
import io.grpc.LoadBalancer;
35
import io.grpc.Status;
36
import io.grpc.SynchronizationContext.ScheduledHandle;
37
import java.net.SocketAddress;
38
import java.util.ArrayList;
39
import java.util.Collections;
40
import java.util.HashMap;
41
import java.util.HashSet;
42
import java.util.List;
43
import java.util.Map;
44
import java.util.Random;
45
import java.util.Set;
46
import java.util.concurrent.TimeUnit;
47
import java.util.concurrent.atomic.AtomicBoolean;
48
import java.util.logging.Level;
49
import java.util.logging.Logger;
50
import javax.annotation.Nullable;
51

52
/**
53
 * A {@link LoadBalancer} that provides no load-balancing over the addresses from the {@link
54
 * io.grpc.NameResolver}. The channel's default behavior is used, which is walking down the address
55
 * list and sticking to the first that works.
56
 */
57
final class PickFirstLeafLoadBalancer extends LoadBalancer {
58
  private static final Logger log = Logger.getLogger(PickFirstLeafLoadBalancer.class.getName());
1✔
59
  @VisibleForTesting
60
  static final int CONNECTION_DELAY_INTERVAL_MS = 250;
61
  private final Helper helper;
62
  private final Map<SocketAddress, SubchannelData> subchannels = new HashMap<>();
1✔
63
  private final Index addressIndex = new Index(ImmutableList.of());
1✔
64
  private int numTf = 0;
1✔
65
  private boolean firstPass = true;
1✔
66
  @Nullable
1✔
67
  private ScheduledHandle scheduleConnectionTask = null;
68
  private ConnectivityState rawConnectivityState = IDLE;
1✔
69
  private ConnectivityState concludedState = IDLE;
1✔
70
  private final boolean enableHappyEyeballs = !isSerializingRetries()
1✔
71
      && PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
1✔
72
  private boolean notAPetiolePolicy = true; // means not under a petiole policy
1✔
73
  private final BackoffPolicy.Provider bkoffPolProvider = new ExponentialBackoffPolicy.Provider();
1✔
74
  private BackoffPolicy reconnectPolicy;
75
  @Nullable
1✔
76
  private ScheduledHandle reconnectTask = null;
77
  private final boolean serializingRetries = isSerializingRetries();
1✔
78

79
  PickFirstLeafLoadBalancer(Helper helper) {
1✔
80
    this.helper = checkNotNull(helper, "helper");
1✔
81
  }
1✔
82

83
  static boolean isSerializingRetries() {
84
    return GrpcUtil.getFlag("GRPC_SERIALIZE_RETRIES", false);
1✔
85
  }
86

87
  @Override
88
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
89
    if (rawConnectivityState == SHUTDOWN) {
1✔
90
      return Status.FAILED_PRECONDITION.withDescription("Already shut down");
1✔
91
    }
92

93
    // Cache whether or not this is a petiole policy, which is based off of an address attribute
94
    Boolean isPetiolePolicy = resolvedAddresses.getAttributes().get(IS_PETIOLE_POLICY);
1✔
95
    this.notAPetiolePolicy = isPetiolePolicy == null || !isPetiolePolicy;
1✔
96

97
    List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
1✔
98

99
    // Validate the address list
100
    if (servers.isEmpty()) {
1✔
101
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
102
              "NameResolver returned no usable address. addrs=" + resolvedAddresses.getAddresses()
1✔
103
                      + ", attrs=" + resolvedAddresses.getAttributes());
1✔
104
      handleNameResolutionError(unavailableStatus);
1✔
105
      return unavailableStatus;
1✔
106
    }
107
    for (EquivalentAddressGroup eag : servers) {
1✔
108
      if (eag == null) {
1✔
109
        Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
110
            "NameResolver returned address list with null endpoint. addrs="
111
                + resolvedAddresses.getAddresses() + ", attrs="
1✔
112
                + resolvedAddresses.getAttributes());
1✔
113
        handleNameResolutionError(unavailableStatus);
1✔
114
        return unavailableStatus;
1✔
115
      }
116
    }
1✔
117

118
    // Since we have a new set of addresses, we are again at first pass
119
    firstPass = true;
1✔
120

121
    List<EquivalentAddressGroup> cleanServers = deDupAddresses(servers);
1✔
122

123
    // We can optionally be configured to shuffle the address list. This can help better distribute
124
    // the load.
125
    if (resolvedAddresses.getLoadBalancingPolicyConfig()
1✔
126
        instanceof PickFirstLeafLoadBalancerConfig) {
127
      PickFirstLeafLoadBalancerConfig config
1✔
128
          = (PickFirstLeafLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
129
      if (config.shuffleAddressList != null && config.shuffleAddressList) {
1✔
130
        Collections.shuffle(cleanServers,
1✔
131
            config.randomSeed != null ? new Random(config.randomSeed) : new Random());
1✔
132
      }
133
    }
134

135
    final ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups =
136
        ImmutableList.<EquivalentAddressGroup>builder().addAll(cleanServers).build();
1✔
137

138
    if (rawConnectivityState == READY) {
1✔
139
      // If the previous ready subchannel exists in new address list,
140
      // keep this connection and don't create new subchannels
141
      SocketAddress previousAddress = addressIndex.getCurrentAddress();
1✔
142
      addressIndex.updateGroups(newImmutableAddressGroups);
1✔
143
      if (addressIndex.seekTo(previousAddress)) {
1✔
144
        SubchannelData subchannelData = subchannels.get(previousAddress);
1✔
145
        subchannelData.getSubchannel().updateAddresses(addressIndex.getCurrentEagAsList());
1✔
146
        return Status.OK;
1✔
147
      }
148
      // Previous ready subchannel not in the new list of addresses
149
    } else {
1✔
150
      addressIndex.updateGroups(newImmutableAddressGroups);
1✔
151
    }
152

153
    // remove old subchannels that were not in new address list
154
    Set<SocketAddress> oldAddrs = new HashSet<>(subchannels.keySet());
1✔
155

156
    // Flatten the new EAGs addresses
157
    Set<SocketAddress> newAddrs = new HashSet<>();
1✔
158
    for (EquivalentAddressGroup endpoint : newImmutableAddressGroups) {
1✔
159
      newAddrs.addAll(endpoint.getAddresses());
1✔
160
    }
1✔
161

162
    // Shut them down and remove them
163
    for (SocketAddress oldAddr : oldAddrs) {
1✔
164
      if (!newAddrs.contains(oldAddr)) {
1✔
165
        subchannels.remove(oldAddr).getSubchannel().shutdown();
1✔
166
      }
167
    }
1✔
168

169
    if (oldAddrs.size() == 0) {
1✔
170
      // Make tests happy; they don't properly assume starting in CONNECTING
171
      rawConnectivityState = CONNECTING;
1✔
172
      updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
1✔
173
    }
174

175
    if (rawConnectivityState == READY) {
1✔
176
      // connect from beginning when prompted
177
      rawConnectivityState = IDLE;
1✔
178
      updateBalancingState(IDLE, new RequestConnectionPicker(this));
1✔
179

180
    } else if (rawConnectivityState == CONNECTING || rawConnectivityState == TRANSIENT_FAILURE) {
1✔
181
      // start connection attempt at first address
182
      cancelScheduleTask();
1✔
183
      requestConnection();
1✔
184
    }
185

186
    return Status.OK;
1✔
187
  }
188

189
  private static List<EquivalentAddressGroup> deDupAddresses(List<EquivalentAddressGroup> groups) {
190
    Set<SocketAddress> seenAddresses = new HashSet<>();
1✔
191
    List<EquivalentAddressGroup> newGroups = new ArrayList<>();
1✔
192

193
    for (EquivalentAddressGroup group : groups) {
1✔
194
      List<SocketAddress> addrs = new ArrayList<>();
1✔
195
      for (SocketAddress addr : group.getAddresses()) {
1✔
196
        if (seenAddresses.add(addr)) {
1✔
197
          addrs.add(addr);
1✔
198
        }
199
      }
1✔
200
      if (!addrs.isEmpty()) {
1✔
201
        newGroups.add(new EquivalentAddressGroup(addrs, group.getAttributes()));
1✔
202
      }
203
    }
1✔
204

205
    return newGroups;
1✔
206
  }
207

208
  @Override
209
  public void handleNameResolutionError(Status error) {
210
    if (rawConnectivityState == SHUTDOWN) {
1✔
211
      return;
×
212
    }
213

214
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
215
      subchannelData.getSubchannel().shutdown();
1✔
216
    }
1✔
217
    subchannels.clear();
1✔
218
    addressIndex.updateGroups(ImmutableList.of());
1✔
219
    rawConnectivityState = TRANSIENT_FAILURE;
1✔
220
    updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
1✔
221
  }
1✔
222

223
  void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo stateInfo) {
224
    ConnectivityState newState = stateInfo.getState();
1✔
225

226
    // Shutdown channels/previously relevant subchannels can still callback with state updates.
227
    // To prevent pickers from returning these obsolete subchannels, this logic
228
    // is included to check if the current list of active subchannels includes this subchannel.
229
    if (subchannelData != subchannels.get(getAddress(subchannelData.subchannel))) {
1✔
230
      return;
1✔
231
    }
232

233
    if (newState == SHUTDOWN) {
1✔
234
      return;
1✔
235
    }
236

237
    if (newState == IDLE && subchannelData.state == READY) {
1✔
238
      helper.refreshNameResolution();
1✔
239
    }
240

241
    // If we are transitioning from a TRANSIENT_FAILURE to CONNECTING or IDLE we ignore this state
242
    // transition and still keep the LB in TRANSIENT_FAILURE state. This is referred to as "sticky
243
    // transient failure". Only a subchannel state change to READY will get the LB out of
244
    // TRANSIENT_FAILURE. If the state is IDLE we additionally request a new connection so that we
245
    // keep retrying for a connection.
246

247
    // With the new pick first implementation, individual subchannels will have their own backoff
248
    // on a per-address basis. Thus, iterative requests for connections will not be requested
249
    // once the first pass through is complete.
250
    // However, every time there is an address update, we will perform a pass through for the new
251
    // addresses in the updated list.
252
    subchannelData.updateState(newState);
1✔
253
    if (rawConnectivityState == TRANSIENT_FAILURE || concludedState == TRANSIENT_FAILURE)  {
1✔
254
      if (newState == CONNECTING) {
1✔
255
        // each subchannel is responsible for its own backoff
256
        return;
1✔
257
      } else if (newState == IDLE) {
1✔
258
        requestConnection();
1✔
259
        return;
1✔
260
      }
261
    }
262

263
    switch (newState) {
1✔
264
      case IDLE:
265
        // Shutdown when ready: connect from beginning when prompted
266
        addressIndex.reset();
1✔
267
        rawConnectivityState = IDLE;
1✔
268
        updateBalancingState(IDLE, new RequestConnectionPicker(this));
1✔
269
        break;
1✔
270

271
      case CONNECTING:
272
        rawConnectivityState = CONNECTING;
1✔
273
        updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
1✔
274
        break;
1✔
275

276
      case READY:
277
        shutdownRemaining(subchannelData);
1✔
278
        addressIndex.seekTo(getAddress(subchannelData.subchannel));
1✔
279
        rawConnectivityState = READY;
1✔
280
        updateHealthCheckedState(subchannelData);
1✔
281
        break;
1✔
282

283
      case TRANSIENT_FAILURE:
284
        // If we are looking at current channel, request a connection if possible
285
        if (addressIndex.isValid()
1✔
286
            && subchannels.get(addressIndex.getCurrentAddress()) == subchannelData) {
1✔
287
          if (addressIndex.increment()) {
1✔
288
            cancelScheduleTask();
1✔
289
            requestConnection(); // is recursive so might hit the end of the addresses
1✔
290
          } else {
291
            scheduleBackoff();
1✔
292
          }
293
        }
294

295
        if (isPassComplete()) {
1✔
296
          rawConnectivityState = TRANSIENT_FAILURE;
1✔
297
          updateBalancingState(TRANSIENT_FAILURE,
1✔
298
              new Picker(PickResult.withError(stateInfo.getStatus())));
1✔
299

300
          // Refresh Name Resolution, but only when all 3 conditions are met
301
          // * We are at the end of addressIndex
302
          // * have had status reported for all subchannels.
303
          // * And one of the following conditions:
304
          //    * Have had enough TF reported since we completed first pass
305
          //    * Just completed the first pass
306
          if (++numTf >= addressIndex.size() || firstPass) {
1✔
307
            firstPass = false;
1✔
308
            numTf = 0;
1✔
309
            helper.refreshNameResolution();
1✔
310
          }
311
        }
312
        break;
313

314
      default:
315
        throw new IllegalArgumentException("Unsupported state:" + newState);
×
316
    }
317
  }
1✔
318

319
  /**
320
   * Only called after all addresses attempted and failed (TRANSIENT_FAILURE).
321
   */
322
  private void scheduleBackoff() {
323
    if (!serializingRetries) {
1✔
324
      return;
1✔
325
    }
326

327
    class EndOfCurrentBackoff implements Runnable {
1✔
328
      @Override
329
      public void run() {
330
        reconnectTask = null;
1✔
331
        addressIndex.reset();
1✔
332
        requestConnection();
1✔
333
      }
1✔
334
    }
335

336
    // Just allow the previous one to trigger when ready if we're already in backoff
337
    if (reconnectTask != null) {
1✔
338
      return;
×
339
    }
340

341
    if (reconnectPolicy == null) {
1✔
342
      reconnectPolicy = bkoffPolProvider.get();
1✔
343
    }
344
    long delayNanos = reconnectPolicy.nextBackoffNanos();
1✔
345
    reconnectTask = helper.getSynchronizationContext().schedule(
1✔
346
        new EndOfCurrentBackoff(),
347
        delayNanos,
348
        TimeUnit.NANOSECONDS,
349
        helper.getScheduledExecutorService());
1✔
350
  }
1✔
351

352
  private void updateHealthCheckedState(SubchannelData subchannelData) {
353
    if (subchannelData.state != READY) {
1✔
354
      return;
1✔
355
    }
356

357
    if (notAPetiolePolicy || subchannelData.getHealthState() == READY) {
1✔
358
      updateBalancingState(READY,
1✔
359
          new FixedResultPicker(PickResult.withSubchannel(subchannelData.subchannel)));
1✔
360
    } else if (subchannelData.getHealthState() == TRANSIENT_FAILURE) {
1✔
361
      updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(
1✔
362
          subchannelData.healthStateInfo.getStatus())));
1✔
363
    } else if (concludedState != TRANSIENT_FAILURE) {
1✔
364
      updateBalancingState(subchannelData.getHealthState(),
1✔
365
          new Picker(PickResult.withNoResult()));
1✔
366
    }
367
  }
1✔
368

369
  private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) {
370
    // an optimization: de-dup IDLE or CONNECTING notification.
371
    if (state == concludedState && (state == IDLE || state == CONNECTING)) {
1✔
372
      return;
1✔
373
    }
374
    concludedState = state;
1✔
375
    helper.updateBalancingState(state, picker);
1✔
376
  }
1✔
377

378
  @Override
379
  public void shutdown() {
380
    log.log(Level.FINE,
1✔
381
        "Shutting down, currently have {} subchannels created", subchannels.size());
1✔
382
    rawConnectivityState = SHUTDOWN;
1✔
383
    concludedState = SHUTDOWN;
1✔
384
    cancelScheduleTask();
1✔
385
    if (reconnectTask != null) {
1✔
386
      reconnectTask.cancel();
1✔
387
      reconnectTask = null;
1✔
388
    }
389
    reconnectPolicy = null;
1✔
390

391
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
392
      subchannelData.getSubchannel().shutdown();
1✔
393
    }
1✔
394

395
    subchannels.clear();
1✔
396
  }
1✔
397

398
  /**
399
  * Shuts down remaining subchannels. Called when a subchannel becomes ready, which means
400
  * that all other subchannels must be shutdown.
401
  */
402
  private void shutdownRemaining(SubchannelData activeSubchannelData) {
403
    if (reconnectTask != null) {
1✔
404
      reconnectTask.cancel();
1✔
405
      reconnectTask = null;
1✔
406
    }
407
    reconnectPolicy = null;
1✔
408

409
    cancelScheduleTask();
1✔
410
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
411
      if (!subchannelData.getSubchannel().equals(activeSubchannelData.subchannel)) {
1✔
412
        subchannelData.getSubchannel().shutdown();
1✔
413
      }
414
    }
1✔
415
    subchannels.clear();
1✔
416
    activeSubchannelData.updateState(READY);
1✔
417
    subchannels.put(getAddress(activeSubchannelData.subchannel), activeSubchannelData);
1✔
418
  }
1✔
419

420
  /**
421
   * Requests a connection to the next applicable address' subchannel, creating one if necessary.
422
   * Schedules a connection to next address in list as well.
423
   * If the current channel has already attempted a connection, we attempt a connection
424
   * to the next address/subchannel in our list.  We assume that createNewSubchannel will never
425
   * return null.
426
   */
427
  @Override
428
  public void requestConnection() {
429
    if (!addressIndex.isValid() || rawConnectivityState == SHUTDOWN) {
1✔
430
      return;
1✔
431
    }
432

433
    SocketAddress currentAddress = addressIndex.getCurrentAddress();
1✔
434
    SubchannelData subchannelData = subchannels.get(currentAddress);
1✔
435
    if (subchannelData == null) {
1✔
436
      subchannelData = createNewSubchannel(currentAddress, addressIndex.getCurrentEagAttributes());
1✔
437
    }
438

439
    ConnectivityState subchannelState = subchannelData.getState();
1✔
440
    switch (subchannelState) {
1✔
441
      case IDLE:
442
        subchannelData.subchannel.requestConnection();
1✔
443
        subchannelData.updateState(CONNECTING);
1✔
444
        scheduleNextConnection();
1✔
445
        break;
1✔
446
      case CONNECTING:
447
        scheduleNextConnection();
1✔
448
        break;
1✔
449
      case TRANSIENT_FAILURE:
450
        if (!serializingRetries) {
1✔
451
          addressIndex.increment();
1✔
452
          requestConnection();
1✔
453
        } else {
454
          if (!addressIndex.isValid()) {
1✔
455
            scheduleBackoff();
×
456
          } else {
457
            subchannelData.subchannel.requestConnection();
1✔
458
            subchannelData.updateState(CONNECTING);
1✔
459
          }
460
        }
461
        break;
1✔
462
      default:
463
        // Wait for current subchannel to change state
464
    }
465
  }
1✔
466

467

468
  /**
469
  * Happy Eyeballs
470
  * Schedules connection attempt to happen after a delay to the next available address.
471
  */
472
  private void scheduleNextConnection() {
473
    if (!enableHappyEyeballs
1✔
474
        || (scheduleConnectionTask != null && scheduleConnectionTask.isPending())) {
1✔
475
      return;
1✔
476
    }
477

478
    class StartNextConnection implements Runnable {
1✔
479
      @Override
480
      public void run() {
481
        scheduleConnectionTask = null;
1✔
482
        if (addressIndex.increment()) {
1✔
483
          requestConnection();
1✔
484
        }
485
      }
1✔
486
    }
487

488
    scheduleConnectionTask = helper.getSynchronizationContext().schedule(
1✔
489
        new StartNextConnection(),
490
        CONNECTION_DELAY_INTERVAL_MS,
491
        TimeUnit.MILLISECONDS,
492
        helper.getScheduledExecutorService());
1✔
493
  }
1✔
494

495
  private void cancelScheduleTask() {
496
    if (scheduleConnectionTask != null) {
1✔
497
      scheduleConnectionTask.cancel();
1✔
498
      scheduleConnectionTask = null;
1✔
499
    }
500
  }
1✔
501

502
  private SubchannelData createNewSubchannel(SocketAddress addr, Attributes attrs) {
503
    HealthListener hcListener = new HealthListener();
1✔
504
    final Subchannel subchannel = helper.createSubchannel(
1✔
505
        CreateSubchannelArgs.newBuilder()
1✔
506
            .setAddresses(Lists.newArrayList(
1✔
507
                new EquivalentAddressGroup(addr, attrs)))
508
            .addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, hcListener)
1✔
509
            .addOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY, serializingRetries)
1✔
510
            .build());
1✔
511
    if (subchannel == null) {
1✔
512
      log.warning("Was not able to create subchannel for " + addr);
×
513
      throw new IllegalStateException("Can't create subchannel");
×
514
    }
515
    SubchannelData subchannelData = new SubchannelData(subchannel, IDLE);
1✔
516
    hcListener.subchannelData = subchannelData;
1✔
517
    subchannels.put(addr, subchannelData);
1✔
518
    Attributes scAttrs = subchannel.getAttributes();
1✔
519
    if (notAPetiolePolicy || scAttrs.get(LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY) == null) {
1✔
520
      subchannelData.healthStateInfo = ConnectivityStateInfo.forNonError(READY);
1✔
521
    }
522
    subchannel.start(stateInfo -> processSubchannelState(subchannelData, stateInfo));
1✔
523
    return subchannelData;
1✔
524
  }
525

526
  private boolean isPassComplete() {
527
    if (subchannels.size() < addressIndex.size()) {
1✔
528
      return false;
1✔
529
    }
530
    for (SubchannelData sc : subchannels.values()) {
1✔
531
      if (!sc.isCompletedConnectivityAttempt() ) {
1✔
532
        return false;
1✔
533
      }
534
    }
1✔
535
    return true;
1✔
536
  }
537

538
  private final class HealthListener implements SubchannelStateListener {
1✔
539
    private SubchannelData subchannelData;
540

541
    @Override
542
    public void onSubchannelState(ConnectivityStateInfo newState) {
543
      if (notAPetiolePolicy) {
1✔
544
        log.log(Level.WARNING,
1✔
545
            "Ignoring health status {0} for subchannel {1} as this is not under a petiole policy",
546
            new Object[]{newState, subchannelData.subchannel});
1✔
547
        return;
1✔
548
      }
549

550
      log.log(Level.FINE, "Received health status {0} for subchannel {1}",
1✔
551
          new Object[]{newState, subchannelData.subchannel});
1✔
552
      subchannelData.healthStateInfo = newState;
1✔
553
      if (addressIndex.isValid()
1✔
554
          && subchannelData == subchannels.get(addressIndex.getCurrentAddress())) {
1✔
555
        updateHealthCheckedState(subchannelData);
1✔
556
      }
557
    }
1✔
558
  }
559

560
  private SocketAddress getAddress(Subchannel subchannel) {
561
    return subchannel.getAddresses().getAddresses().get(0);
1✔
562
  }
563

564
  @VisibleForTesting
565
  ConnectivityState getConcludedConnectivityState() {
566
    return this.concludedState;
1✔
567
  }
568

569
  /**
570
   * No-op picker which doesn't add any custom picking logic. It just passes already known result
571
   * received in constructor.
572
   */
573
  private static final class Picker extends SubchannelPicker {
574
    private final PickResult result;
575

576
    Picker(PickResult result) {
1✔
577
      this.result = checkNotNull(result, "result");
1✔
578
    }
1✔
579

580
    @Override
581
    public PickResult pickSubchannel(PickSubchannelArgs args) {
582
      return result;
1✔
583
    }
584

585
    @Override
586
    public String toString() {
587
      return MoreObjects.toStringHelper(Picker.class).add("result", result).toString();
×
588
    }
589
  }
590

591
  /**
592
   * Picker that requests connection during the first pick, and returns noResult.
593
   */
594
  private final class RequestConnectionPicker extends SubchannelPicker {
595
    private final PickFirstLeafLoadBalancer pickFirstLeafLoadBalancer;
596
    private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
1✔
597

598
    RequestConnectionPicker(PickFirstLeafLoadBalancer pickFirstLeafLoadBalancer) {
1✔
599
      this.pickFirstLeafLoadBalancer =
1✔
600
          checkNotNull(pickFirstLeafLoadBalancer, "pickFirstLeafLoadBalancer");
1✔
601
    }
1✔
602

603
    @Override
604
    public PickResult pickSubchannel(PickSubchannelArgs args) {
605
      if (connectionRequested.compareAndSet(false, true)) {
1✔
606
        helper.getSynchronizationContext().execute(pickFirstLeafLoadBalancer::requestConnection);
1✔
607
      }
608
      return PickResult.withNoResult();
1✔
609
    }
610
  }
611

612
  /**
613
   * Index as in 'i', the pointer to an entry. Not a "search index."
614
   * All updates should be done in a synchronization context.
615
   */
616
  @VisibleForTesting
617
  static final class Index {
618
    private List<EquivalentAddressGroup> addressGroups;
619
    private int size;
620
    private int groupIndex;
621
    private int addressIndex;
622

623
    public Index(List<EquivalentAddressGroup> groups) {
1✔
624
      updateGroups(groups);
1✔
625
    }
1✔
626

627
    public boolean isValid() {
628
      // Is invalid if empty or has incremented off the end
629
      return groupIndex < addressGroups.size();
1✔
630
    }
631

632
    public boolean isAtBeginning() {
633
      return groupIndex == 0 && addressIndex == 0;
1✔
634
    }
635

636
    /**
637
     * Move to next address in group.  If last address in group move to first address of next group.
638
     * @return false if went off end of the list, otherwise true
639
     */
640
    public boolean increment() {
641
      if (!isValid()) {
1✔
642
        return false;
1✔
643
      }
644

645
      EquivalentAddressGroup group = addressGroups.get(groupIndex);
1✔
646
      addressIndex++;
1✔
647
      if (addressIndex >= group.getAddresses().size()) {
1✔
648
        groupIndex++;
1✔
649
        addressIndex = 0;
1✔
650
        return groupIndex < addressGroups.size();
1✔
651
      }
652

653
      return true;
1✔
654
    }
655

656
    public void reset() {
657
      groupIndex = 0;
1✔
658
      addressIndex = 0;
1✔
659
    }
1✔
660

661
    public SocketAddress getCurrentAddress() {
662
      if (!isValid()) {
1✔
663
        throw new IllegalStateException("Index is past the end of the address group list");
×
664
      }
665
      return addressGroups.get(groupIndex).getAddresses().get(addressIndex);
1✔
666
    }
667

668
    public Attributes getCurrentEagAttributes() {
669
      if (!isValid()) {
1✔
670
        throw new IllegalStateException("Index is off the end of the address group list");
×
671
      }
672
      return addressGroups.get(groupIndex).getAttributes();
1✔
673
    }
674

675
    public List<EquivalentAddressGroup> getCurrentEagAsList() {
676
      return Collections.singletonList(
1✔
677
          new EquivalentAddressGroup(getCurrentAddress(), getCurrentEagAttributes()));
1✔
678
    }
679

680
    /**
681
     * Update to new groups, resetting the current index.
682
     */
683
    public void updateGroups(List<EquivalentAddressGroup> newGroups) {
684
      addressGroups = checkNotNull(newGroups, "newGroups");
1✔
685
      reset();
1✔
686
      int size = 0;
1✔
687
      for (EquivalentAddressGroup eag : newGroups) {
1✔
688
        size += eag.getAddresses().size();
1✔
689
      }
1✔
690
      this.size = size;
1✔
691
    }
1✔
692

693
    /**
694
     * Returns false if the needle was not found and the current index was left unchanged.
695
     */
696
    public boolean seekTo(SocketAddress needle) {
697
      for (int i = 0; i < addressGroups.size(); i++) {
1✔
698
        EquivalentAddressGroup group = addressGroups.get(i);
1✔
699
        int j = group.getAddresses().indexOf(needle);
1✔
700
        if (j == -1) {
1✔
701
          continue;
1✔
702
        }
703
        this.groupIndex = i;
1✔
704
        this.addressIndex = j;
1✔
705
        return true;
1✔
706
      }
707
      return false;
1✔
708
    }
709

710
    public int size() {
711
      return size;
1✔
712
    }
713
  }
714

715
  @VisibleForTesting
716
  int getGroupIndex() {
717
    return addressIndex.groupIndex;
1✔
718
  }
719

720
  @VisibleForTesting
721
  boolean isIndexValid() {
722
    return addressIndex.isValid();
1✔
723
  }
724

725
  private static final class SubchannelData {
726
    private final Subchannel subchannel;
727
    private ConnectivityState state;
728
    private boolean completedConnectivityAttempt = false;
1✔
729
    private ConnectivityStateInfo healthStateInfo = ConnectivityStateInfo.forNonError(IDLE);
1✔
730

731
    public SubchannelData(Subchannel subchannel, ConnectivityState state) {
1✔
732
      this.subchannel = subchannel;
1✔
733
      this.state = state;
1✔
734
    }
1✔
735

736
    public Subchannel getSubchannel() {
737
      return this.subchannel;
1✔
738
    }
739

740
    public ConnectivityState getState() {
741
      return this.state;
1✔
742
    }
743

744
    public boolean isCompletedConnectivityAttempt() {
745
      return completedConnectivityAttempt;
1✔
746
    }
747

748
    private void updateState(ConnectivityState newState) {
749
      this.state = newState;
1✔
750
      if (newState == READY || newState == TRANSIENT_FAILURE) {
1✔
751
        completedConnectivityAttempt = true;
1✔
752
      } else if (newState == IDLE) {
1✔
753
        completedConnectivityAttempt = false;
1✔
754
      }
755
    }
1✔
756

757
    private ConnectivityState getHealthState() {
758
      return healthStateInfo.getState();
1✔
759
    }
760
  }
761

762
  public static final class PickFirstLeafLoadBalancerConfig {
763

764
    @Nullable
765
    public final Boolean shuffleAddressList;
766

767
    // For testing purposes only, not meant to be parsed from a real config.
768
    @Nullable
769
    final Long randomSeed;
770

771
    public PickFirstLeafLoadBalancerConfig(@Nullable Boolean shuffleAddressList) {
772
      this(shuffleAddressList, null);
1✔
773
    }
1✔
774

775
    PickFirstLeafLoadBalancerConfig(@Nullable Boolean shuffleAddressList,
776
        @Nullable Long randomSeed) {
1✔
777
      this.shuffleAddressList = shuffleAddressList;
1✔
778
      this.randomSeed = randomSeed;
1✔
779
    }
1✔
780
  }
781
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc