• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19548

11 Nov 2024 09:16PM UTC coverage: 84.61% (+0.005%) from 84.605%
#19548

push

github

ejona86
util: Remove EAG conveniences from MultiChildLb

This is a step toward removing ResolvedAddresses from ChildLbState,
which isn't actually used by MultiChildLb. Most usages of the EAG usages
can be served more directly without peering into MultiChildLb's
internals or even accessing ChildLbStates, which make the tests less
sensitive to implementation changes. Some changes do leverage the new
behavior of MultiChildLb where it preserves the order of the entries.

This does fix an important bug in shutdown tests. The tests looped over
the ChildLbStates after shutdown, but shutdown deleted all the children
so it looped over an entry collection. Fixing that exposed that
deliverSubchannelState() didn't function after shutdown, as the listener
was removed from the map when the subchannel was shut down. Moving the
listener onto the TestSubchannel allowed having access to the listener
even after shutdown.

A few places in LeastRequestLb lines were just deleted, but that's
because an existing assertion already provided the same check but
without digging into MultiChildLb.

34091 of 40292 relevant lines covered (84.61%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.89
/../xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java
1
/*
2
 * Copyright 2021 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.ConnectivityState.CONNECTING;
22
import static io.grpc.ConnectivityState.IDLE;
23
import static io.grpc.ConnectivityState.READY;
24
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
25
import static io.grpc.xds.LeastRequestLoadBalancerProvider.DEFAULT_CHOICE_COUNT;
26
import static io.grpc.xds.LeastRequestLoadBalancerProvider.MAX_CHOICE_COUNT;
27
import static io.grpc.xds.LeastRequestLoadBalancerProvider.MIN_CHOICE_COUNT;
28

29
import com.google.common.annotations.VisibleForTesting;
30
import com.google.common.base.MoreObjects;
31
import io.grpc.Attributes;
32
import io.grpc.ClientStreamTracer;
33
import io.grpc.ClientStreamTracer.StreamInfo;
34
import io.grpc.ConnectivityState;
35
import io.grpc.LoadBalancer;
36
import io.grpc.LoadBalancerProvider;
37
import io.grpc.Metadata;
38
import io.grpc.Status;
39
import io.grpc.util.MultiChildLoadBalancer;
40
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
41
import java.util.ArrayList;
42
import java.util.HashSet;
43
import java.util.List;
44
import java.util.concurrent.atomic.AtomicInteger;
45

46
/**
47
 * A {@link LoadBalancer} that provides least request load balancing based on
48
 * outstanding request counters.
49
 * It works by sampling a number of subchannels and picking the one with the
50
 * fewest amount of outstanding requests.
51
 * The default sampling amount of two is also known as
52
 * the "power of two choices" (P2C).
53
 */
54
final class LeastRequestLoadBalancer extends MultiChildLoadBalancer {
55
  private final ThreadSafeRandom random;
56

57
  private SubchannelPicker currentPicker = new EmptyPicker();
1✔
58
  private int choiceCount = DEFAULT_CHOICE_COUNT;
1✔
59

60
  LeastRequestLoadBalancer(Helper helper) {
61
    this(helper, ThreadSafeRandomImpl.instance);
1✔
62
  }
1✔
63

64
  @VisibleForTesting
65
  LeastRequestLoadBalancer(Helper helper, ThreadSafeRandom random) {
66
    super(helper);
1✔
67
    this.random = checkNotNull(random, "random");
1✔
68
  }
1✔
69

70
  @Override
71
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
72
    // Need to update choiceCount before calling super so that the updateBalancingState call has the
73
    // new value.  However, if the update fails we need to revert it.
74
    int oldChoiceCount = choiceCount;
1✔
75
    LeastRequestConfig config =
1✔
76
        (LeastRequestConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
77
    if (config != null) {
1✔
78
      choiceCount = config.choiceCount;
1✔
79
    }
80

81
    Status addressAcceptanceStatus = super.acceptResolvedAddresses(resolvedAddresses);
1✔
82

83
    if (!addressAcceptanceStatus.isOk()) {
1✔
84
      choiceCount = oldChoiceCount;
1✔
85
    }
86

87
    return addressAcceptanceStatus;
1✔
88
  }
89

90
  /**
91
   * Updates picker with the list of active subchannels (state == READY).
92
   *
93
   *  <p>
94
   * If no active subchannels exist, but some are in TRANSIENT_FAILURE then returns a picker
95
   * with all of the children in TF so that the application code will get an error from a varying
96
   * random one when it tries to get a subchannel.
97
   * </p>
98
   */
99
  @SuppressWarnings("ReferenceEquality")
100
  @Override
101
  protected void updateOverallBalancingState() {
102
    List<ChildLbState> activeList = getReadyChildren();
1✔
103
    if (activeList.isEmpty()) {
1✔
104
      // No READY subchannels, determine aggregate state and error status
105
      boolean isConnecting = false;
1✔
106
      List<ChildLbState> childrenInTf = new ArrayList<>();
1✔
107
      for (ChildLbState childLbState : getChildLbStates()) {
1✔
108
        ConnectivityState state = childLbState.getCurrentState();
1✔
109
        if (state == CONNECTING || state == IDLE) {
1✔
110
          isConnecting = true;
1✔
111
        } else if (state == TRANSIENT_FAILURE) {
1✔
112
          childrenInTf.add(childLbState);
1✔
113
        }
114
      }
1✔
115
      if (isConnecting) {
1✔
116
        updateBalancingState(CONNECTING, new EmptyPicker());
1✔
117
      } else {
118
        // Give it all the failing children and let it randomly pick among them
119
        updateBalancingState(TRANSIENT_FAILURE,
1✔
120
            new ReadyPicker(childrenInTf, choiceCount, random));
121
      }
122
    } else {
1✔
123
      updateBalancingState(READY, new ReadyPicker(activeList, choiceCount, random));
1✔
124
    }
125
  }
1✔
126

127
  @Override
128
  protected ChildLbState createChildLbState(Object key) {
129
    return new LeastRequestLbState(key, pickFirstLbProvider);
1✔
130
  }
131

132
  private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) {
133
    if (state != currentConnectivityState || !picker.equals(currentPicker)) {
1✔
134
      getHelper().updateBalancingState(state, picker);
1✔
135
      currentConnectivityState = state;
1✔
136
      currentPicker = picker;
1✔
137
    }
138
  }
1✔
139

140
  /**
141
   * This should ONLY be used by tests.
142
   */
143
  @VisibleForTesting
144
  void setResolvingAddresses(boolean newValue) {
145
    super.resolvingAddresses = newValue;
1✔
146
  }
1✔
147

148
  // Expose for tests in this package.
149
  private static AtomicInteger getInFlights(ChildLbState childLbState) {
150
    return ((LeastRequestLbState)childLbState).activeRequests;
1✔
151
  }
152

153
  @VisibleForTesting
154
  static final class ReadyPicker extends SubchannelPicker {
155
    private final List<SubchannelPicker> childPickers; // non-empty
156
    private final List<AtomicInteger> childInFlights; // 1:1 with childPickers
157
    private final int choiceCount;
158
    private final ThreadSafeRandom random;
159
    private final int hashCode;
160

161
    ReadyPicker(List<ChildLbState> childLbStates, int choiceCount, ThreadSafeRandom random) {
1✔
162
      checkArgument(!childLbStates.isEmpty(), "empty list");
1✔
163
      this.childPickers = new ArrayList<>(childLbStates.size());
1✔
164
      this.childInFlights = new ArrayList<>(childLbStates.size());
1✔
165
      for (ChildLbState state : childLbStates) {
1✔
166
        childPickers.add(state.getCurrentPicker());
1✔
167
        childInFlights.add(getInFlights(state));
1✔
168
      }
1✔
169
      this.choiceCount = choiceCount;
1✔
170
      this.random = checkNotNull(random, "random");
1✔
171

172
      int sum = 0;
1✔
173
      for (SubchannelPicker child : childPickers) {
1✔
174
        sum += child.hashCode();
1✔
175
      }
1✔
176
      this.hashCode = sum ^ choiceCount;
1✔
177
    }
1✔
178

179
    @Override
180
    public PickResult pickSubchannel(PickSubchannelArgs args) {
181
      int child = nextChildToUse();
1✔
182
      PickResult childResult = childPickers.get(child).pickSubchannel(args);
1✔
183

184
      if (!childResult.getStatus().isOk() || childResult.getSubchannel() == null) {
1✔
185
        return childResult;
×
186
      }
187

188
      if (childResult.getStreamTracerFactory() != null) {
1✔
189
        // Already wrapped, so just use the current picker for selected child
190
        return childResult;
×
191
      } else {
192
        // Wrap the subchannel
193
        OutstandingRequestsTracingFactory factory =
1✔
194
            new OutstandingRequestsTracingFactory(childInFlights.get(child));
1✔
195
        return PickResult.withSubchannel(childResult.getSubchannel(), factory);
1✔
196
      }
197
    }
198

199
    @Override
200
    public String toString() {
201
      return MoreObjects.toStringHelper(ReadyPicker.class)
×
202
                        .add("list", childPickers)
×
203
                        .add("choiceCount", choiceCount)
×
204
                        .toString();
×
205
    }
206

207
    private int nextChildToUse() {
208
      int candidate = random.nextInt(childPickers.size());
1✔
209
      for (int i = 0; i < choiceCount - 1; ++i) {
1✔
210
        int sampled = random.nextInt(childPickers.size());
1✔
211
        if (childInFlights.get(sampled).get() < childInFlights.get(candidate).get()) {
1✔
212
          candidate = sampled;
1✔
213
        }
214
      }
215
      return candidate;
1✔
216
    }
217

218
    @VisibleForTesting
219
    List<SubchannelPicker> getChildPickers() {
220
      return childPickers;
1✔
221
    }
222

223
    @Override
224
    public int hashCode() {
225
      return hashCode;
×
226
    }
227

228
    @Override
229
    public boolean equals(Object o) {
230
      if (!(o instanceof ReadyPicker)) {
1✔
231
        return false;
1✔
232
      }
233
      ReadyPicker other = (ReadyPicker) o;
1✔
234
      if (other == this) {
1✔
235
        return true;
×
236
      }
237
      // the lists cannot contain duplicate children
238
      return hashCode == other.hashCode
1✔
239
          && choiceCount == other.choiceCount
240
          && childPickers.size() == other.childPickers.size()
1✔
241
          && new HashSet<>(childPickers).containsAll(other.childPickers);
1✔
242
    }
243
  }
244

245
  @VisibleForTesting
246
  static final class EmptyPicker extends SubchannelPicker {
1✔
247
    @Override
248
    public PickResult pickSubchannel(PickSubchannelArgs args) {
249
      return PickResult.withNoResult();
1✔
250
    }
251

252
    @Override
253
    public int hashCode() {
254
      return getClass().hashCode();
×
255
    }
256

257
    @Override
258
    public boolean equals(Object o) {
259
      return o instanceof EmptyPicker;
1✔
260
    }
261

262
    @Override
263
    public String toString() {
264
      return MoreObjects.toStringHelper(EmptyPicker.class).toString();
×
265
    }
266
  }
267

268
  private static final class OutstandingRequestsTracingFactory extends
269
      ClientStreamTracer.Factory {
270
    private final AtomicInteger inFlights;
271

272
    private OutstandingRequestsTracingFactory(AtomicInteger inFlights) {
1✔
273
      this.inFlights = checkNotNull(inFlights, "inFlights");
1✔
274
    }
1✔
275

276
    @Override
277
    public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
278
      return new ClientStreamTracer() {
1✔
279
        @Override
280
        public void streamCreated(Attributes transportAttrs, Metadata headers) {
281
          inFlights.incrementAndGet();
1✔
282
        }
1✔
283

284
        @Override
285
        public void streamClosed(Status status) {
286
          inFlights.decrementAndGet();
1✔
287
        }
1✔
288
      };
289
    }
290
  }
291

292
  static final class LeastRequestConfig {
293
    final int choiceCount;
294

295
    LeastRequestConfig(int choiceCount) {
1✔
296
      checkArgument(choiceCount >= MIN_CHOICE_COUNT, "choiceCount <= 1");
1✔
297
      // Even though a choiceCount value larger than 2 is currently considered valid in xDS
298
      // we restrict it to 10 here as specified in "A48: xDS Least Request LB Policy".
299
      this.choiceCount = Math.min(choiceCount, MAX_CHOICE_COUNT);
1✔
300
    }
1✔
301

302
    @Override
303
    public String toString() {
304
      return MoreObjects.toStringHelper(this)
×
305
          .add("choiceCount", choiceCount)
×
306
          .toString();
×
307
    }
308
  }
309

310
  protected class LeastRequestLbState extends ChildLbState {
311
    private final AtomicInteger activeRequests = new AtomicInteger(0);
1✔
312

313
    public LeastRequestLbState(Object key, LoadBalancerProvider policyProvider) {
1✔
314
      super(key, policyProvider);
1✔
315
    }
1✔
316

317
    int getActiveRequests() {
318
      return activeRequests.get();
1✔
319
    }
320

321
    @Override
322
    protected ChildLbStateHelper createChildHelper() {
323
      return new ChildLbStateHelper() {
1✔
324
        @Override
325
        public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
326
          super.updateBalancingState(newState, newPicker);
1✔
327
          if (!resolvingAddresses && newState == IDLE) {
1✔
328
            getLb().requestConnection();
1✔
329
          }
330
        }
1✔
331
      };
332
    }
333
  }
334
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc