• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19438

28 Aug 2024 09:34PM UTC coverage: 84.493% (-0.02%) from 84.516%
#19438

push

github

ejona86
xds: ClusterManagerLB must update child configuration

While child LB policies are unlikely to change for each cluster name (RLS
returns regular cluster names, so should be unique), and the
configuration for CDS policies won't change, RLS configuration can
definitely change.

33407 of 39538 relevant lines covered (84.49%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.17
/../xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.MoreObjects;
24
import io.grpc.ConnectivityState;
25
import io.grpc.InternalLogId;
26
import io.grpc.LoadBalancer;
27
import io.grpc.Status;
28
import io.grpc.SynchronizationContext;
29
import io.grpc.SynchronizationContext.ScheduledHandle;
30
import io.grpc.util.GracefulSwitchLoadBalancer;
31
import io.grpc.util.MultiChildLoadBalancer;
32
import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig;
33
import io.grpc.xds.client.XdsLogger;
34
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
35
import java.util.HashMap;
36
import java.util.Map;
37
import java.util.Map.Entry;
38
import java.util.concurrent.ScheduledExecutorService;
39
import java.util.concurrent.TimeUnit;
40
import javax.annotation.Nullable;
41

42
/**
43
 * The top-level load balancing policy for use in XDS.
44
 * This policy does not immediately delete its children.  Instead, it marks them deactivated
45
 * and starts a timer for deletion.  If a subsequent address update restores the child, then it is
46
 * simply reactivated instead of built from scratch.  This is necessary because XDS can frequently
47
 * remove and then add back a server as machines are rebooted or repurposed for load management.
48
 *
49
 * <p>Note that this LB does not automatically reconnect children who go into IDLE status
50
 */
51
class ClusterManagerLoadBalancer extends MultiChildLoadBalancer {
52

53
  // 15 minutes is long enough for a reboot and the services to restart while not so long that
54
  // many children are waiting for cleanup.
55
  @VisibleForTesting
56
  public static final int DELAYED_CHILD_DELETION_TIME_MINUTES = 15;
57
  protected final SynchronizationContext syncContext;
58
  private final ScheduledExecutorService timeService;
59
  private final XdsLogger logger;
60
  private ResolvedAddresses lastResolvedAddresses;
61

62
  ClusterManagerLoadBalancer(Helper helper) {
63
    super(helper);
1✔
64
    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
1✔
65
    this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
1✔
66
    logger = XdsLogger.withLogId(
1✔
67
        InternalLogId.allocate("cluster_manager-lb", helper.getAuthority()));
1✔
68

69
    logger.log(XdsLogLevel.INFO, "Created");
1✔
70
  }
1✔
71

72
  @Override
73
  protected ResolvedAddresses getChildAddresses(Object key, ResolvedAddresses resolvedAddresses,
74
      Object unusedChildConfig) {
75
    ClusterManagerConfig config = (ClusterManagerConfig)
1✔
76
        resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
77
    Object childConfig = config.childPolicies.get(key);
1✔
78
    return resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(childConfig).build();
1✔
79
  }
80

81
  @Override
82
  protected Map<Object, ChildLbState> createChildLbMap(ResolvedAddresses resolvedAddresses) {
83
    ClusterManagerConfig config = (ClusterManagerConfig)
1✔
84
        resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
85
    Map<Object, ChildLbState> newChildPolicies = new HashMap<>();
1✔
86
    if (config != null) {
1✔
87
      for (String key : config.childPolicies.keySet()) {
1✔
88
        ChildLbState child = getChildLbState(key);
1✔
89
        if (child == null) {
1✔
90
          child = new ClusterManagerLbState(key, GracefulSwitchLoadBalancerFactory.INSTANCE, null);
1✔
91
        }
92
        newChildPolicies.put(key, child);
1✔
93
      }
1✔
94
    }
95
    logger.log(
1✔
96
        XdsLogLevel.INFO,
97
        "Received cluster_manager lb config: child names={0}", newChildPolicies.keySet());
1✔
98
    return newChildPolicies;
1✔
99
  }
100

101
  /**
102
   * This is like the parent except that it doesn't shutdown the removed children since we want that
103
   * to be done by the timer.
104
   */
105
  @Override
106
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
107
    if (lastResolvedAddresses != null) {
1✔
108
      // Handle deactivated children
109
      ClusterManagerConfig config = (ClusterManagerConfig)
1✔
110
          resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
111
      ClusterManagerConfig lastConfig = (ClusterManagerConfig)
1✔
112
          lastResolvedAddresses.getLoadBalancingPolicyConfig();
1✔
113
      Map<String, Object> adjChildPolicies = new HashMap<>(config.childPolicies);
1✔
114
      for (Entry<String, Object> entry : lastConfig.childPolicies.entrySet()) {
1✔
115
        ClusterManagerLbState state = (ClusterManagerLbState) getChildLbState(entry.getKey());
1✔
116
        if (adjChildPolicies.containsKey(entry.getKey())) {
1✔
117
          if (state.deletionTimer != null) {
1✔
118
            state.reactivateChild();
1✔
119
          }
120
        } else if (state != null) {
1✔
121
          adjChildPolicies.put(entry.getKey(), entry.getValue());
1✔
122
          if (state.deletionTimer == null) {
1✔
123
            state.deactivateChild();
1✔
124
          }
125
        }
126
      }
1✔
127
      config = new ClusterManagerConfig(adjChildPolicies);
1✔
128
      resolvedAddresses =
1✔
129
          resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build();
1✔
130
    }
131
    lastResolvedAddresses = resolvedAddresses;
1✔
132
    return super.acceptResolvedAddresses(resolvedAddresses);
1✔
133
  }
134

135
  /**
136
   * Using the state of all children will calculate the current connectivity state,
137
   * update currentConnectivityState, generate a picker and then call
138
   * {@link Helper#updateBalancingState(ConnectivityState, SubchannelPicker)}.
139
   */
140
  @Override
141
  protected void updateOverallBalancingState() {
142
    ConnectivityState overallState = null;
1✔
143
    final Map<Object, SubchannelPicker> childPickers = new HashMap<>();
1✔
144
    for (ChildLbState childLbState : getChildLbStates()) {
1✔
145
      if (((ClusterManagerLbState) childLbState).deletionTimer != null) {
1✔
146
        continue;
1✔
147
      }
148
      childPickers.put(childLbState.getKey(), childLbState.getCurrentPicker());
1✔
149
      overallState = aggregateState(overallState, childLbState.getCurrentState());
1✔
150
    }
1✔
151

152
    if (overallState != null) {
1✔
153
      getHelper().updateBalancingState(overallState, getSubchannelPicker(childPickers));
1✔
154
      currentConnectivityState = overallState;
1✔
155
    }
156
  }
1✔
157

158
  protected SubchannelPicker getSubchannelPicker(Map<Object, SubchannelPicker> childPickers) {
159
    return new SubchannelPicker() {
1✔
160
      @Override
161
      public PickResult pickSubchannel(PickSubchannelArgs args) {
162
        String clusterName =
1✔
163
            args.getCallOptions().getOption(XdsNameResolver.CLUSTER_SELECTION_KEY);
1✔
164
        SubchannelPicker childPicker = childPickers.get(clusterName);
1✔
165
        if (childPicker == null) {
1✔
166
          return
1✔
167
              PickResult.withError(
1✔
168
                  Status.UNAVAILABLE.withDescription("CDS encountered error: unable to find "
1✔
169
                      + "available subchannel for cluster " + clusterName));
170
        }
171
        return childPicker.pickSubchannel(args);
1✔
172
      }
173

174
      @Override
175
      public String toString() {
176
        return MoreObjects.toStringHelper(this).add("pickers", childPickers).toString();
×
177
      }
178
    };
179
  }
180

181
  @Override
182
  public void handleNameResolutionError(Status error) {
183
    logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
1✔
184
    boolean gotoTransientFailure = true;
1✔
185
    for (ChildLbState state : getChildLbStates()) {
1✔
186
      if (((ClusterManagerLbState) state).deletionTimer == null) {
1✔
187
        gotoTransientFailure = false;
1✔
188
        state.getLb().handleNameResolutionError(error);
1✔
189
      }
190
    }
1✔
191
    if (gotoTransientFailure) {
1✔
192
      getHelper().updateBalancingState(
1✔
193
          TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
194
    }
195
  }
1✔
196

197
  /**
198
   * This differs from the base class in the use of the deletion timer.  When it is deactivated,
199
   * rather than immediately calling shutdown it starts a timer.  If shutdown or reactivate
200
   * are called before the timer fires, the timer is canceled.  Otherwise, time timer calls shutdown
201
   * and removes the child from the petiole policy when it is triggered.
202
   */
203
  private class ClusterManagerLbState extends ChildLbState {
1✔
204
    @Nullable
205
    ScheduledHandle deletionTimer;
206

207
    public ClusterManagerLbState(Object key, LoadBalancer.Factory policyFactory,
208
        Object childConfig) {
1✔
209
      super(key, policyFactory, childConfig);
1✔
210
    }
1✔
211

212
    @Override
213
    protected ChildLbStateHelper createChildHelper() {
214
      return new ClusterManagerChildHelper();
1✔
215
    }
216

217
    @Override
218
    protected void shutdown() {
219
      if (deletionTimer != null) {
1✔
220
        deletionTimer.cancel();
1✔
221
        deletionTimer = null;
1✔
222
      }
223
      super.shutdown();
1✔
224
    }
1✔
225

226
    void reactivateChild() {
227
      assert deletionTimer != null;
1✔
228
      deletionTimer.cancel();
1✔
229
      deletionTimer = null;
1✔
230
      logger.log(XdsLogLevel.DEBUG, "Child balancer {0} reactivated", getKey());
1✔
231
    }
1✔
232

233
    void deactivateChild() {
234
      assert deletionTimer == null;
1✔
235

236
      class DeletionTask implements Runnable {
1✔
237

238
        @Override
239
        public void run() {
240
          ClusterManagerConfig config = (ClusterManagerConfig)
1✔
241
              lastResolvedAddresses.getLoadBalancingPolicyConfig();
1✔
242
          Map<String, Object> childPolicies = new HashMap<>(config.childPolicies);
1✔
243
          Object removed = childPolicies.remove(getKey());
1✔
244
          assert removed != null;
1✔
245
          config = new ClusterManagerConfig(childPolicies);
1✔
246
          lastResolvedAddresses =
1✔
247
              lastResolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build();
1✔
248
          acceptResolvedAddresses(lastResolvedAddresses);
1✔
249
        }
1✔
250
      }
251

252
      deletionTimer =
1✔
253
          syncContext.schedule(
1✔
254
              new DeletionTask(),
255
              DELAYED_CHILD_DELETION_TIME_MINUTES,
256
              TimeUnit.MINUTES,
257
              timeService);
1✔
258
      logger.log(XdsLogLevel.DEBUG, "Child balancer {0} deactivated", getKey());
1✔
259
    }
1✔
260

261
    private class ClusterManagerChildHelper extends ChildLbStateHelper {
1✔
262
      @Override
263
      public void updateBalancingState(final ConnectivityState newState,
264
                                       final SubchannelPicker newPicker) {
265
        // If we are already in the process of resolving addresses, the overall balancing state
266
        // will be updated at the end of it, and we don't need to trigger that update here.
267
        if (getChildLbState(getKey()) == null) {
1✔
268
          return;
1✔
269
        }
270

271
        // Subchannel picker and state are saved, but will only be propagated to the channel
272
        // when the child instance exits deactivated state.
273
        setCurrentState(newState);
1✔
274
        setCurrentPicker(newPicker);
1✔
275
        if (deletionTimer == null && !resolvingAddresses) {
1✔
276
          updateOverallBalancingState();
1✔
277
        }
278
      }
1✔
279
    }
280
  }
281

282
  static final class GracefulSwitchLoadBalancerFactory extends LoadBalancer.Factory {
1✔
283
    static final LoadBalancer.Factory INSTANCE = new GracefulSwitchLoadBalancerFactory();
1✔
284

285
    @Override
286
    public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
287
      return new GracefulSwitchLoadBalancer(helper);
1✔
288
    }
289
  }
290
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc