• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19211

08 May 2024 10:50PM UTC coverage: 88.309% (-0.02%) from 88.328%
#19211

push

github

ejona86
xds: Plumb locality in xds_cluster_impl and weighted_target

As part of gRFC A78:

> To support the locality label in the WRR metrics, we will extend the
> `weighted_target` LB policy (see A28) to define a resolver attribute
> that indicates the name of its child. This attribute will be passed
> down to each of its children with the appropriate value, so that any
> LB policy that sits underneath the `weighted_target` policy will be
> able to use it.

xds_cluster_impl is involved because it uses the child names in the
AddressFilter, which must match the names used by weighted_target.
Instead of using Locality.toString() in multiple policies and assuming
the policies agree, we now have xds_cluster_impl decide the locality's
name and pass it down explicitly. This allows us to change the name
format to match gRFC A78:

> If locality information is available, the value of this label will be
> of the form `{region="${REGION}", zone="${ZONE}",
> sub_zone="${SUB_ZONE}"}`, where `${REGION}`, `${ZONE}`, and
> `${SUB_ZONE}` are replaced with the actual values. If no locality
> information is available, the label will be set to the empty string.

31515 of 35687 relevant lines covered (88.31%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.26
/../xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.CONNECTING;
21
import static io.grpc.ConnectivityState.IDLE;
22
import static io.grpc.ConnectivityState.READY;
23
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
24

25
import com.google.common.collect.ImmutableMap;
26
import io.grpc.Attributes;
27
import io.grpc.ConnectivityState;
28
import io.grpc.InternalLogId;
29
import io.grpc.LoadBalancer;
30
import io.grpc.Status;
31
import io.grpc.util.ForwardingLoadBalancerHelper;
32
import io.grpc.util.GracefulSwitchLoadBalancer;
33
import io.grpc.xds.WeightedRandomPicker.WeightedChildPicker;
34
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection;
35
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig;
36
import io.grpc.xds.client.XdsLogger;
37
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
38
import java.util.ArrayList;
39
import java.util.HashMap;
40
import java.util.List;
41
import java.util.Map;
42
import javax.annotation.Nullable;
43

44
/** Load balancer for weighted_target policy. */
45
final class WeightedTargetLoadBalancer extends LoadBalancer {
46
  public static final Attributes.Key<String> CHILD_NAME =
1✔
47
      Attributes.Key.create("io.grpc.xds.WeightedTargetLoadBalancer.CHILD_NAME");
1✔
48

49
  private final XdsLogger logger;
50
  private final Map<String, GracefulSwitchLoadBalancer> childBalancers = new HashMap<>();
1✔
51
  private final Map<String, ChildHelper> childHelpers = new HashMap<>();
1✔
52
  private final Helper helper;
53

54
  private Map<String, WeightedPolicySelection> targets = ImmutableMap.of();
1✔
55
  // Set to true if currently in the process of handling resolved addresses.
56
  private boolean resolvingAddresses;
57

58
  WeightedTargetLoadBalancer(Helper helper) {
1✔
59
    this.helper = checkNotNull(helper, "helper");
1✔
60
    logger = XdsLogger.withLogId(
1✔
61
        InternalLogId.allocate("weighted-target-lb", helper.getAuthority()));
1✔
62
    logger.log(XdsLogLevel.INFO, "Created");
1✔
63
  }
1✔
64

65
  @Override
66
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
67
    try {
68
      resolvingAddresses = true;
1✔
69
      return acceptResolvedAddressesInternal(resolvedAddresses);
1✔
70
    } finally {
71
      resolvingAddresses = false;
1✔
72
    }
73
  }
74

75
  public Status acceptResolvedAddressesInternal(ResolvedAddresses resolvedAddresses) {
76
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
77
    Object lbConfig = resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
78
    checkNotNull(lbConfig, "missing weighted_target lb config");
1✔
79
    WeightedTargetConfig weightedTargetConfig = (WeightedTargetConfig) lbConfig;
1✔
80
    Map<String, WeightedPolicySelection> newTargets = weightedTargetConfig.targets;
1✔
81
    for (String targetName : newTargets.keySet()) {
1✔
82
      WeightedPolicySelection weightedChildLbConfig = newTargets.get(targetName);
1✔
83
      if (!targets.containsKey(targetName)) {
1✔
84
        ChildHelper childHelper = new ChildHelper(targetName);
1✔
85
        GracefulSwitchLoadBalancer childBalancer = new GracefulSwitchLoadBalancer(childHelper);
1✔
86
        childBalancer.switchTo(weightedChildLbConfig.policySelection.getProvider());
1✔
87
        childHelpers.put(targetName, childHelper);
1✔
88
        childBalancers.put(targetName, childBalancer);
1✔
89
      } else if (!weightedChildLbConfig.policySelection.getProvider().equals(
1✔
90
          targets.get(targetName).policySelection.getProvider())) {
1✔
91
        childBalancers.get(targetName)
×
92
            .switchTo(weightedChildLbConfig.policySelection.getProvider());
×
93
      }
94
    }
1✔
95
    targets = newTargets;
1✔
96
    for (String targetName : targets.keySet()) {
1✔
97
      childBalancers.get(targetName).handleResolvedAddresses(
1✔
98
          resolvedAddresses.toBuilder()
1✔
99
              .setAddresses(AddressFilter.filter(resolvedAddresses.getAddresses(), targetName))
1✔
100
              .setLoadBalancingPolicyConfig(targets.get(targetName).policySelection.getConfig())
1✔
101
              .setAttributes(resolvedAddresses.getAttributes().toBuilder()
1✔
102
                .set(CHILD_NAME, targetName)
1✔
103
                .build())
1✔
104
              .build());
1✔
105
    }
1✔
106

107
    // Cleanup removed targets.
108
    // TODO(zdapeng): cache removed target for 15 minutes.
109
    for (String targetName : childBalancers.keySet()) {
1✔
110
      if (!targets.containsKey(targetName)) {
1✔
111
        childBalancers.get(targetName).shutdown();
1✔
112
      }
113
    }
1✔
114
    childBalancers.keySet().retainAll(targets.keySet());
1✔
115
    childHelpers.keySet().retainAll(targets.keySet());
1✔
116
    updateOverallBalancingState();
1✔
117
    return Status.OK;
1✔
118
  }
119

120
  @Override
121
  public void handleNameResolutionError(Status error) {
122
    logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
1✔
123
    if (childBalancers.isEmpty()) {
1✔
124
      helper.updateBalancingState(
1✔
125
          TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
126
    }
127
    for (LoadBalancer childBalancer : childBalancers.values()) {
1✔
128
      childBalancer.handleNameResolutionError(error);
1✔
129
    }
1✔
130
  }
1✔
131

132
  @Override
133
  public boolean canHandleEmptyAddressListFromNameResolution() {
134
    return true;
×
135
  }
136

137
  @Override
138
  public void shutdown() {
139
    logger.log(XdsLogLevel.INFO, "Shutdown");
1✔
140
    for (LoadBalancer childBalancer : childBalancers.values()) {
1✔
141
      childBalancer.shutdown();
1✔
142
    }
1✔
143
    childBalancers.clear();
1✔
144
  }
1✔
145

146
  private void updateOverallBalancingState() {
147
    List<WeightedChildPicker> childPickers = new ArrayList<>();
1✔
148

149
    ConnectivityState overallState = null;
1✔
150
    List<WeightedChildPicker> errorPickers = new ArrayList<>();
1✔
151
    for (String name : targets.keySet()) {
1✔
152
      ChildHelper childHelper = childHelpers.get(name);
1✔
153
      ConnectivityState childState = childHelper.currentState;
1✔
154
      overallState = aggregateState(overallState, childState);
1✔
155
      int weight = targets.get(name).weight;
1✔
156
      if (READY == childState) {
1✔
157
        childPickers.add(new WeightedChildPicker(weight, childHelper.currentPicker));
1✔
158
      } else if (TRANSIENT_FAILURE == childState) {
1✔
159
        errorPickers.add(new WeightedChildPicker(weight, childHelper.currentPicker));
1✔
160
      }
161
    }
1✔
162

163
    SubchannelPicker picker;
164
    if (childPickers.isEmpty()) {
1✔
165
      if (overallState == TRANSIENT_FAILURE) {
1✔
166
        picker = new WeightedRandomPicker(errorPickers);
1✔
167
      } else {
168
        picker = new FixedResultPicker(PickResult.withNoResult());
1✔
169
      }
170
    } else {
171
      picker = new WeightedRandomPicker(childPickers);
1✔
172
    }
173

174
    if (overallState != null) {
1✔
175
      helper.updateBalancingState(overallState, picker);
1✔
176
    }
177
  }
1✔
178

179
  @Nullable
180
  private static ConnectivityState aggregateState(
181
      @Nullable ConnectivityState overallState, ConnectivityState childState) {
182
    if (overallState == null) {
1✔
183
      return childState;
1✔
184
    }
185
    if (overallState == READY || childState == READY) {
1✔
186
      return READY;
1✔
187
    }
188
    if (overallState == CONNECTING || childState == CONNECTING) {
1✔
189
      return CONNECTING;
1✔
190
    }
191
    if (overallState == IDLE || childState == IDLE) {
1✔
192
      return IDLE;
×
193
    }
194
    return overallState;
1✔
195
  }
196

197
  private final class ChildHelper extends ForwardingLoadBalancerHelper {
198
    String name;
199
    ConnectivityState currentState = CONNECTING;
1✔
200
    SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
1✔
201

202
    private ChildHelper(String name) {
1✔
203
      this.name = name;
1✔
204
    }
1✔
205

206
    @Override
207
    public void updateBalancingState(final ConnectivityState newState,
208
        final SubchannelPicker newPicker) {
209
      currentState = newState;
1✔
210
      currentPicker = newPicker;
1✔
211

212
      // If we are already in the process of resolving addresses, the overall balancing state
213
      // will be updated at the end of it, and we don't need to trigger that update here.
214
      if (!resolvingAddresses && childBalancers.containsKey(name)) {
1✔
215
        updateOverallBalancingState();
1✔
216
      }
217
    }
1✔
218

219
    @Override
220
    protected Helper delegate() {
221
      return helper;
1✔
222
    }
223
  }
224
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc