• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19400

02 Aug 2024 09:23PM CUT coverage: 84.48% (+0.001%) from 84.479%
#19400

push

github

ejona86
core: In PF, pass around SubchannelData instead of Subchannel

Each usage of the subchannel immediately looked up the SubchannelData.

33276 of 39389 relevant lines covered (84.48%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.98
/../core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.CONNECTING;
21
import static io.grpc.ConnectivityState.IDLE;
22
import static io.grpc.ConnectivityState.READY;
23
import static io.grpc.ConnectivityState.SHUTDOWN;
24
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
25

26
import com.google.common.annotations.VisibleForTesting;
27
import com.google.common.base.MoreObjects;
28
import com.google.common.collect.ImmutableList;
29
import com.google.common.collect.Lists;
30
import io.grpc.Attributes;
31
import io.grpc.ConnectivityState;
32
import io.grpc.ConnectivityStateInfo;
33
import io.grpc.EquivalentAddressGroup;
34
import io.grpc.LoadBalancer;
35
import io.grpc.Status;
36
import io.grpc.SynchronizationContext;
37
import io.grpc.SynchronizationContext.ScheduledHandle;
38
import java.net.SocketAddress;
39
import java.util.ArrayList;
40
import java.util.Collections;
41
import java.util.HashMap;
42
import java.util.HashSet;
43
import java.util.List;
44
import java.util.Map;
45
import java.util.Random;
46
import java.util.Set;
47
import java.util.concurrent.TimeUnit;
48
import java.util.concurrent.atomic.AtomicBoolean;
49
import java.util.logging.Level;
50
import java.util.logging.Logger;
51
import javax.annotation.Nullable;
52

53
/**
54
 * A {@link LoadBalancer} that provides no load-balancing over the addresses from the {@link
55
 * io.grpc.NameResolver}. The channel's default behavior is used, which is walking down the address
56
 * list and sticking to the first that works.
57
 */
58
final class PickFirstLeafLoadBalancer extends LoadBalancer {
59
  private static final Logger log = Logger.getLogger(PickFirstLeafLoadBalancer.class.getName());
1✔
60
  @VisibleForTesting
61
  static final int CONNECTION_DELAY_INTERVAL_MS = 250;
62
  private final Helper helper;
63
  private final Map<SocketAddress, SubchannelData> subchannels = new HashMap<>();
1✔
64
  private Index addressIndex;
65
  private int numTf = 0;
1✔
66
  private boolean firstPass = true;
1✔
67
  @Nullable
68
  private ScheduledHandle scheduleConnectionTask;
69
  private ConnectivityState rawConnectivityState = IDLE;
1✔
70
  private ConnectivityState concludedState = IDLE;
1✔
71
  private final boolean enableHappyEyeballs =
1✔
72
      PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
1✔
73

74
  PickFirstLeafLoadBalancer(Helper helper) {
1✔
75
    this.helper = checkNotNull(helper, "helper");
1✔
76
  }
1✔
77

78
  @Override
79
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
80
    if (rawConnectivityState == SHUTDOWN) {
1✔
81
      return Status.FAILED_PRECONDITION.withDescription("Already shut down");
1✔
82
    }
83

84
    List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
1✔
85

86
    // Validate the address list
87
    if (servers.isEmpty()) {
1✔
88
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
89
              "NameResolver returned no usable address. addrs=" + resolvedAddresses.getAddresses()
1✔
90
                      + ", attrs=" + resolvedAddresses.getAttributes());
1✔
91
      handleNameResolutionError(unavailableStatus);
1✔
92
      return unavailableStatus;
1✔
93
    }
94
    for (EquivalentAddressGroup eag : servers) {
1✔
95
      if (eag == null) {
1✔
96
        Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
97
            "NameResolver returned address list with null endpoint. addrs="
98
                + resolvedAddresses.getAddresses() + ", attrs="
1✔
99
                + resolvedAddresses.getAttributes());
1✔
100
        handleNameResolutionError(unavailableStatus);
1✔
101
        return unavailableStatus;
1✔
102
      }
103
    }
1✔
104

105
    // Since we have a new set of addresses, we are again at first pass
106
    firstPass = true;
1✔
107

108
    List<EquivalentAddressGroup> cleanServers = deDupAddresses(servers);
1✔
109

110
    // We can optionally be configured to shuffle the address list. This can help better distribute
111
    // the load.
112
    if (resolvedAddresses.getLoadBalancingPolicyConfig()
1✔
113
        instanceof PickFirstLeafLoadBalancerConfig) {
114
      PickFirstLeafLoadBalancerConfig config
1✔
115
          = (PickFirstLeafLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
116
      if (config.shuffleAddressList != null && config.shuffleAddressList) {
1✔
117
        Collections.shuffle(cleanServers,
1✔
118
            config.randomSeed != null ? new Random(config.randomSeed) : new Random());
1✔
119
      }
120
    }
121

122
    final ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups =
123
        ImmutableList.<EquivalentAddressGroup>builder().addAll(cleanServers).build();
1✔
124

125
    if (addressIndex == null) {
1✔
126
      addressIndex = new Index(newImmutableAddressGroups);
1✔
127
    } else if (rawConnectivityState == READY) {
1✔
128
      // If the previous ready subchannel exists in new address list,
129
      // keep this connection and don't create new subchannels
130
      SocketAddress previousAddress = addressIndex.getCurrentAddress();
1✔
131
      addressIndex.updateGroups(newImmutableAddressGroups);
1✔
132
      if (addressIndex.seekTo(previousAddress)) {
1✔
133
        SubchannelData subchannelData = subchannels.get(previousAddress);
1✔
134
        subchannelData.getSubchannel().updateAddresses(addressIndex.getCurrentEagAsList());
1✔
135
        return Status.OK;
1✔
136
      } else {
137
        addressIndex.reset(); // Previous ready subchannel not in the new list of addresses
1✔
138
      }
139
    } else {
1✔
140
      addressIndex.updateGroups(newImmutableAddressGroups);
1✔
141
    }
142

143
    // remove old subchannels that were not in new address list
144
    Set<SocketAddress> oldAddrs = new HashSet<>(subchannels.keySet());
1✔
145

146
    // Flatten the new EAGs addresses
147
    Set<SocketAddress> newAddrs = new HashSet<>();
1✔
148
    for (EquivalentAddressGroup endpoint : newImmutableAddressGroups) {
1✔
149
      newAddrs.addAll(endpoint.getAddresses());
1✔
150
    }
1✔
151

152
    // Shut them down and remove them
153
    for (SocketAddress oldAddr : oldAddrs) {
1✔
154
      if (!newAddrs.contains(oldAddr)) {
1✔
155
        subchannels.remove(oldAddr).getSubchannel().shutdown();
1✔
156
      }
157
    }
1✔
158

159
    if (oldAddrs.size() == 0 || rawConnectivityState == CONNECTING
1✔
160
        || rawConnectivityState == READY) {
161
      // start connection attempt at first address
162
      rawConnectivityState = CONNECTING;
1✔
163
      updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
1✔
164
      cancelScheduleTask();
1✔
165
      requestConnection();
1✔
166

167
    } else if (rawConnectivityState == IDLE) {
1✔
168
      // start connection attempt at first address when requested
169
      SubchannelPicker picker = new RequestConnectionPicker(this);
1✔
170
      updateBalancingState(IDLE, picker);
1✔
171

172
    } else if (rawConnectivityState == TRANSIENT_FAILURE) {
1✔
173
      // start connection attempt at first address
174
      cancelScheduleTask();
1✔
175
      requestConnection();
1✔
176
    }
177

178
    return Status.OK;
1✔
179
  }
180

181
  private static List<EquivalentAddressGroup> deDupAddresses(List<EquivalentAddressGroup> groups) {
182
    Set<SocketAddress> seenAddresses = new HashSet<>();
1✔
183
    List<EquivalentAddressGroup> newGroups = new ArrayList<>();
1✔
184

185
    for (EquivalentAddressGroup group : groups) {
1✔
186
      List<SocketAddress> addrs = new ArrayList<>();
1✔
187
      for (SocketAddress addr : group.getAddresses()) {
1✔
188
        if (seenAddresses.add(addr)) {
1✔
189
          addrs.add(addr);
1✔
190
        }
191
      }
1✔
192
      if (!addrs.isEmpty()) {
1✔
193
        newGroups.add(new EquivalentAddressGroup(addrs, group.getAttributes()));
1✔
194
      }
195
    }
1✔
196

197
    return newGroups;
1✔
198
  }
199

200
  @Override
201
  public void handleNameResolutionError(Status error) {
202
    if (rawConnectivityState == SHUTDOWN) {
1✔
203
      return;
×
204
    }
205

206
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
207
      subchannelData.getSubchannel().shutdown();
1✔
208
    }
1✔
209
    subchannels.clear();
1✔
210
    if (addressIndex != null) {
1✔
211
      addressIndex.updateGroups(null);
1✔
212
    }
213
    rawConnectivityState = TRANSIENT_FAILURE;
1✔
214
    updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
1✔
215
  }
1✔
216

217
  void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo stateInfo) {
218
    ConnectivityState newState = stateInfo.getState();
1✔
219

220
    // Shutdown channels/previously relevant subchannels can still callback with state updates.
221
    // To prevent pickers from returning these obsolete subchannels, this logic
222
    // is included to check if the current list of active subchannels includes this subchannel.
223
    if (subchannelData != subchannels.get(getAddress(subchannelData.subchannel))) {
1✔
224
      return;
1✔
225
    }
226

227
    if (newState == SHUTDOWN) {
1✔
228
      return;
1✔
229
    }
230

231
    if (newState == IDLE) {
1✔
232
      helper.refreshNameResolution();
1✔
233
    }
234
    // If we are transitioning from a TRANSIENT_FAILURE to CONNECTING or IDLE we ignore this state
235
    // transition and still keep the LB in TRANSIENT_FAILURE state. This is referred to as "sticky
236
    // transient failure". Only a subchannel state change to READY will get the LB out of
237
    // TRANSIENT_FAILURE. If the state is IDLE we additionally request a new connection so that we
238
    // keep retrying for a connection.
239

240
    // With the new pick first implementation, individual subchannels will have their own backoff
241
    // on a per-address basis. Thus, iterative requests for connections will not be requested
242
    // once the first pass through is complete.
243
    // However, every time there is an address update, we will perform a pass through for the new
244
    // addresses in the updated list.
245
    subchannelData.updateState(newState);
1✔
246
    if (rawConnectivityState == TRANSIENT_FAILURE || concludedState == TRANSIENT_FAILURE)  {
1✔
247
      if (newState == CONNECTING) {
1✔
248
        // each subchannel is responsible for its own backoff
249
        return;
1✔
250
      } else if (newState == IDLE) {
1✔
251
        requestConnection();
1✔
252
        return;
1✔
253
      }
254
    }
255

256
    switch (newState) {
1✔
257
      case IDLE:
258
        // Shutdown when ready: connect from beginning when prompted
259
        addressIndex.reset();
1✔
260
        rawConnectivityState = IDLE;
1✔
261
        updateBalancingState(IDLE, new RequestConnectionPicker(this));
1✔
262
        break;
1✔
263

264
      case CONNECTING:
265
        rawConnectivityState = CONNECTING;
1✔
266
        updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
1✔
267
        break;
1✔
268

269
      case READY:
270
        shutdownRemaining(subchannelData);
1✔
271
        addressIndex.seekTo(getAddress(subchannelData.subchannel));
1✔
272
        rawConnectivityState = READY;
1✔
273
        updateHealthCheckedState(subchannelData);
1✔
274
        break;
1✔
275

276
      case TRANSIENT_FAILURE:
277
        // If we are looking at current channel, request a connection if possible
278
        if (addressIndex.isValid()
1✔
279
            && subchannels.get(addressIndex.getCurrentAddress()) == subchannelData) {
1✔
280
          if (addressIndex.increment()) {
1✔
281
            cancelScheduleTask();
1✔
282
            requestConnection(); // is recursive so might hit the end of the addresses
1✔
283
          }
284
        }
285

286
        if (isPassComplete()) {
1✔
287
          rawConnectivityState = TRANSIENT_FAILURE;
1✔
288
          updateBalancingState(TRANSIENT_FAILURE,
1✔
289
              new Picker(PickResult.withError(stateInfo.getStatus())));
1✔
290

291
          // Refresh Name Resolution, but only when all 3 conditions are met
292
          // * We are at the end of addressIndex
293
          // * have had status reported for all subchannels.
294
          // * And one of the following conditions:
295
          //    * Have had enough TF reported since we completed first pass
296
          //    * Just completed the first pass
297
          if (++numTf >= addressIndex.size() || firstPass) {
1✔
298
            firstPass = false;
1✔
299
            numTf = 0;
1✔
300
            helper.refreshNameResolution();
1✔
301
          }
302
        }
303
        break;
304

305
      default:
306
        throw new IllegalArgumentException("Unsupported state:" + newState);
×
307
    }
308
  }
1✔
309

310
  private void updateHealthCheckedState(SubchannelData subchannelData) {
311
    if (subchannelData.state != READY) {
1✔
312
      return;
1✔
313
    }
314
    if (subchannelData.getHealthState() == READY) {
1✔
315
      updateBalancingState(READY,
1✔
316
          new FixedResultPicker(PickResult.withSubchannel(subchannelData.subchannel)));
1✔
317
    } else if (subchannelData.getHealthState() == TRANSIENT_FAILURE) {
1✔
318
      updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(
1✔
319
          subchannelData.healthStateInfo.getStatus())));
1✔
320
    } else if (concludedState != TRANSIENT_FAILURE) {
1✔
321
      updateBalancingState(subchannelData.getHealthState(),
1✔
322
          new Picker(PickResult.withNoResult()));
1✔
323
    }
324
  }
1✔
325

326
  private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) {
327
    // an optimization: de-dup IDLE or CONNECTING notification.
328
    if (state == concludedState && (state == IDLE || state == CONNECTING)) {
1✔
329
      return;
1✔
330
    }
331
    concludedState = state;
1✔
332
    helper.updateBalancingState(state, picker);
1✔
333
  }
1✔
334

335
  @Override
336
  public void shutdown() {
337
    log.log(Level.FINE,
1✔
338
        "Shutting down, currently have {} subchannels created", subchannels.size());
1✔
339
    rawConnectivityState = SHUTDOWN;
1✔
340
    concludedState = SHUTDOWN;
1✔
341
    cancelScheduleTask();
1✔
342

343
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
344
      subchannelData.getSubchannel().shutdown();
1✔
345
    }
1✔
346

347
    subchannels.clear();
1✔
348
  }
1✔
349

350
  /**
351
  * Shuts down remaining subchannels. Called when a subchannel becomes ready, which means
352
  * that all other subchannels must be shutdown.
353
  */
354
  private void shutdownRemaining(SubchannelData activeSubchannelData) {
355
    cancelScheduleTask();
1✔
356
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
357
      if (!subchannelData.getSubchannel().equals(activeSubchannelData.subchannel)) {
1✔
358
        subchannelData.getSubchannel().shutdown();
1✔
359
      }
360
    }
1✔
361
    subchannels.clear();
1✔
362
    activeSubchannelData.updateState(READY);
1✔
363
    subchannels.put(getAddress(activeSubchannelData.subchannel), activeSubchannelData);
1✔
364
  }
1✔
365

366
  /**
367
   * Requests a connection to the next applicable address' subchannel, creating one if necessary.
368
   * Schedules a connection to next address in list as well.
369
   * If the current channel has already attempted a connection, we attempt a connection
370
   * to the next address/subchannel in our list.  We assume that createNewSubchannel will never
371
   * return null.
372
   */
373
  @Override
374
  public void requestConnection() {
375
    if (addressIndex == null || !addressIndex.isValid() || rawConnectivityState == SHUTDOWN ) {
1✔
376
      return;
1✔
377
    }
378

379
    SocketAddress currentAddress = addressIndex.getCurrentAddress();
1✔
380
    SubchannelData subchannelData = subchannels.get(currentAddress);
1✔
381
    if (subchannelData == null) {
1✔
382
      subchannelData = createNewSubchannel(currentAddress, addressIndex.getCurrentEagAttributes());
1✔
383
    }
384

385
    ConnectivityState subchannelState = subchannelData.getState();
1✔
386
    switch (subchannelState) {
1✔
387
      case IDLE:
388
        subchannelData.subchannel.requestConnection();
1✔
389
        subchannelData.updateState(CONNECTING);
1✔
390
        scheduleNextConnection();
1✔
391
        break;
1✔
392
      case CONNECTING:
393
        if (enableHappyEyeballs) {
1✔
394
          scheduleNextConnection();
1✔
395
        } else {
396
          subchannelData.subchannel.requestConnection();
1✔
397
        }
398
        break;
1✔
399
      case TRANSIENT_FAILURE:
400
        addressIndex.increment();
1✔
401
        requestConnection();
1✔
402
        break;
1✔
403
      case READY: // Shouldn't ever happen
404
        log.warning("Requesting a connection even though we have a READY subchannel");
×
405
        break;
×
406
      case SHUTDOWN:
407
      default:
408
        // Makes checkstyle happy
409
    }
410
  }
1✔
411

412

413
  /**
414
  * Happy Eyeballs
415
  * Schedules connection attempt to happen after a delay to the next available address.
416
  */
417
  private void scheduleNextConnection() {
418
    if (!enableHappyEyeballs
1✔
419
        || (scheduleConnectionTask != null && scheduleConnectionTask.isPending())) {
1✔
420
      return;
1✔
421
    }
422

423
    class StartNextConnection implements Runnable {
1✔
424
      @Override
425
      public void run() {
426
        scheduleConnectionTask = null;
1✔
427
        if (addressIndex.increment()) {
1✔
428
          requestConnection();
1✔
429
        }
430
      }
1✔
431
    }
432

433
    SynchronizationContext synchronizationContext = null;
1✔
434
    try {
435
      synchronizationContext = helper.getSynchronizationContext();
1✔
436
    } catch (NullPointerException e) {
×
437
      // All helpers should have a sync context, but if one doesn't (ex. user had a custom test)
438
      // we don't want to break previously working functionality.
439
      return;
×
440
    }
1✔
441

442
    scheduleConnectionTask = synchronizationContext.schedule(
1✔
443
        new StartNextConnection(),
444
        CONNECTION_DELAY_INTERVAL_MS,
445
        TimeUnit.MILLISECONDS,
446
        helper.getScheduledExecutorService());
1✔
447
  }
1✔
448

449
  private void cancelScheduleTask() {
450
    if (scheduleConnectionTask != null) {
1✔
451
      scheduleConnectionTask.cancel();
1✔
452
      scheduleConnectionTask = null;
1✔
453
    }
454
  }
1✔
455

456
  private SubchannelData createNewSubchannel(SocketAddress addr, Attributes attrs) {
457
    HealthListener hcListener = new HealthListener();
1✔
458
    final Subchannel subchannel = helper.createSubchannel(
1✔
459
        CreateSubchannelArgs.newBuilder()
1✔
460
        .setAddresses(Lists.newArrayList(
1✔
461
            new EquivalentAddressGroup(addr, attrs)))
462
        .addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, hcListener)
1✔
463
            .build());
1✔
464
    if (subchannel == null) {
1✔
465
      log.warning("Was not able to create subchannel for " + addr);
×
466
      throw new IllegalStateException("Can't create subchannel");
×
467
    }
468
    SubchannelData subchannelData = new SubchannelData(subchannel, IDLE);
1✔
469
    hcListener.subchannelData = subchannelData;
1✔
470
    subchannels.put(addr, subchannelData);
1✔
471
    Attributes scAttrs = subchannel.getAttributes();
1✔
472
    if (scAttrs.get(LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY) == null) {
1✔
473
      subchannelData.healthStateInfo = ConnectivityStateInfo.forNonError(READY);
1✔
474
    }
475
    subchannel.start(stateInfo -> processSubchannelState(subchannelData, stateInfo));
1✔
476
    return subchannelData;
1✔
477
  }
478

479
  private boolean isPassComplete() {
480
    if (addressIndex == null || addressIndex.isValid()
1✔
481
        || subchannels.size() < addressIndex.size()) {
1✔
482
      return false;
1✔
483
    }
484
    for (SubchannelData sc : subchannels.values()) {
1✔
485
      if (!sc.isCompletedConnectivityAttempt() ) {
1✔
486
        return false;
1✔
487
      }
488
    }
1✔
489
    return true;
1✔
490
  }
491

492
  private final class HealthListener implements SubchannelStateListener {
1✔
493
    private SubchannelData subchannelData;
494

495
    @Override
496
    public void onSubchannelState(ConnectivityStateInfo newState) {
497
      log.log(Level.FINE, "Received health status {0} for subchannel {1}",
1✔
498
          new Object[]{newState, subchannelData.subchannel});
1✔
499
      subchannelData.healthStateInfo = newState;
1✔
500
      try {
501
        if (subchannelData == subchannels.get(addressIndex.getCurrentAddress())) {
1✔
502
          updateHealthCheckedState(subchannelData);
1✔
503
        }
504
      } catch (IllegalStateException e) {
×
505
        log.fine("Health listener received state change after subchannel was removed");
×
506
      }
1✔
507
    }
1✔
508
  }
509

510
  private SocketAddress getAddress(Subchannel subchannel) {
511
    return subchannel.getAddresses().getAddresses().get(0);
1✔
512
  }
513

514
  @VisibleForTesting
515
  ConnectivityState getConcludedConnectivityState() {
516
    return this.concludedState;
1✔
517
  }
518

519
  /**
520
   * No-op picker which doesn't add any custom picking logic. It just passes already known result
521
   * received in constructor.
522
   */
523
  private static final class Picker extends SubchannelPicker {
524
    private final PickResult result;
525

526
    Picker(PickResult result) {
1✔
527
      this.result = checkNotNull(result, "result");
1✔
528
    }
1✔
529

530
    @Override
531
    public PickResult pickSubchannel(PickSubchannelArgs args) {
532
      return result;
1✔
533
    }
534

535
    @Override
536
    public String toString() {
537
      return MoreObjects.toStringHelper(Picker.class).add("result", result).toString();
×
538
    }
539
  }
540

541
  /**
542
   * Picker that requests connection during the first pick, and returns noResult.
543
   */
544
  private final class RequestConnectionPicker extends SubchannelPicker {
545
    private final PickFirstLeafLoadBalancer pickFirstLeafLoadBalancer;
546
    private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
1✔
547

548
    RequestConnectionPicker(PickFirstLeafLoadBalancer pickFirstLeafLoadBalancer) {
1✔
549
      this.pickFirstLeafLoadBalancer =
1✔
550
          checkNotNull(pickFirstLeafLoadBalancer, "pickFirstLeafLoadBalancer");
1✔
551
    }
1✔
552

553
    @Override
554
    public PickResult pickSubchannel(PickSubchannelArgs args) {
555
      if (connectionRequested.compareAndSet(false, true)) {
1✔
556
        helper.getSynchronizationContext().execute(pickFirstLeafLoadBalancer::requestConnection);
1✔
557
      }
558
      return PickResult.withNoResult();
1✔
559
    }
560
  }
561

562
  /**
563
   * Index as in 'i', the pointer to an entry. Not a "search index."
564
   * All updates should be done in a synchronization context.
565
   */
566
  @VisibleForTesting
567
  static final class Index {
568
    private List<EquivalentAddressGroup> addressGroups;
569
    private int groupIndex;
570
    private int addressIndex;
571

572
    public Index(List<EquivalentAddressGroup> groups) {
1✔
573
      this.addressGroups = groups != null ? groups : Collections.emptyList();
1✔
574
    }
1✔
575

576
    public boolean isValid() {
577
      // Is invalid if empty or has incremented off the end
578
      return groupIndex < addressGroups.size();
1✔
579
    }
580

581
    public boolean isAtBeginning() {
582
      return groupIndex == 0 && addressIndex == 0;
1✔
583
    }
584

585
    /**
586
     * Move to next address in group.  If last address in group move to first address of next group.
587
     * @return false if went off end of the list, otherwise true
588
     */
589
    public boolean increment() {
590
      if (!isValid()) {
1✔
591
        return false;
1✔
592
      }
593

594
      EquivalentAddressGroup group = addressGroups.get(groupIndex);
1✔
595
      addressIndex++;
1✔
596
      if (addressIndex >= group.getAddresses().size()) {
1✔
597
        groupIndex++;
1✔
598
        addressIndex = 0;
1✔
599
        return groupIndex < addressGroups.size();
1✔
600
      }
601

602
      return true;
1✔
603
    }
604

605
    public void reset() {
606
      groupIndex = 0;
1✔
607
      addressIndex = 0;
1✔
608
    }
1✔
609

610
    public SocketAddress getCurrentAddress() {
611
      if (!isValid()) {
1✔
612
        throw new IllegalStateException("Index is past the end of the address group list");
×
613
      }
614
      return addressGroups.get(groupIndex).getAddresses().get(addressIndex);
1✔
615
    }
616

617
    public Attributes getCurrentEagAttributes() {
618
      if (!isValid()) {
1✔
619
        throw new IllegalStateException("Index is off the end of the address group list");
×
620
      }
621
      return addressGroups.get(groupIndex).getAttributes();
1✔
622
    }
623

624
    public List<EquivalentAddressGroup> getCurrentEagAsList() {
625
      return Collections.singletonList(
1✔
626
          new EquivalentAddressGroup(getCurrentAddress(), getCurrentEagAttributes()));
1✔
627
    }
628

629
    /**
630
     * Update to new groups, resetting the current index.
631
     */
632
    public void updateGroups(ImmutableList<EquivalentAddressGroup> newGroups) {
633
      addressGroups = newGroups != null ? newGroups : Collections.emptyList();
1✔
634
      reset();
1✔
635
    }
1✔
636

637
    /**
638
     * Returns false if the needle was not found and the current index was left unchanged.
639
     */
640
    public boolean seekTo(SocketAddress needle) {
641
      for (int i = 0; i < addressGroups.size(); i++) {
1✔
642
        EquivalentAddressGroup group = addressGroups.get(i);
1✔
643
        int j = group.getAddresses().indexOf(needle);
1✔
644
        if (j == -1) {
1✔
645
          continue;
1✔
646
        }
647
        this.groupIndex = i;
1✔
648
        this.addressIndex = j;
1✔
649
        return true;
1✔
650
      }
651
      return false;
1✔
652
    }
653

654
    public int size() {
655
      return (addressGroups != null) ? addressGroups.size() : 0;
1✔
656
    }
657
  }
658

659
  private static final class SubchannelData {
660
    private final Subchannel subchannel;
661
    private ConnectivityState state;
662
    private boolean completedConnectivityAttempt = false;
1✔
663
    private ConnectivityStateInfo healthStateInfo = ConnectivityStateInfo.forNonError(IDLE);
1✔
664

665
    public SubchannelData(Subchannel subchannel, ConnectivityState state) {
1✔
666
      this.subchannel = subchannel;
1✔
667
      this.state = state;
1✔
668
    }
1✔
669

670
    public Subchannel getSubchannel() {
671
      return this.subchannel;
1✔
672
    }
673

674
    public ConnectivityState getState() {
675
      return this.state;
1✔
676
    }
677

678
    public boolean isCompletedConnectivityAttempt() {
679
      return completedConnectivityAttempt;
1✔
680
    }
681

682
    private void updateState(ConnectivityState newState) {
683
      this.state = newState;
1✔
684
      if (newState == READY || newState == TRANSIENT_FAILURE) {
1✔
685
        completedConnectivityAttempt = true;
1✔
686
      } else if (newState == IDLE) {
1✔
687
        completedConnectivityAttempt = false;
1✔
688
      }
689
    }
1✔
690

691
    private ConnectivityState getHealthState() {
692
      return healthStateInfo.getState();
1✔
693
    }
694
  }
695

696
  public static final class PickFirstLeafLoadBalancerConfig {
697

698
    @Nullable
699
    public final Boolean shuffleAddressList;
700

701
    // For testing purposes only, not meant to be parsed from a real config.
702
    @Nullable
703
    final Long randomSeed;
704

705
    public PickFirstLeafLoadBalancerConfig(@Nullable Boolean shuffleAddressList) {
706
      this(shuffleAddressList, null);
1✔
707
    }
1✔
708

709
    PickFirstLeafLoadBalancerConfig(@Nullable Boolean shuffleAddressList,
710
        @Nullable Long randomSeed) {
1✔
711
      this.shuffleAddressList = shuffleAddressList;
1✔
712
      this.randomSeed = randomSeed;
1✔
713
    }
1✔
714
  }
715
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc