• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19802

08 May 2025 06:04AM UTC coverage: 88.611% (-0.005%) from 88.616%
#19802

push

github

web-flow
xds: Use acceptResolvedAddresses() for WeightedTarget children (#12053)

Convert the tests to use acceptResolvedAddresses() as well.

34780 of 39250 relevant lines covered (88.61%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.11
/../services/src/main/java/io/grpc/protobuf/services/HealthCheckingLoadBalancerFactory.java
1
/*
2
 * Copyright 2018 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.protobuf.services;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static io.grpc.ConnectivityState.CONNECTING;
22
import static io.grpc.ConnectivityState.IDLE;
23
import static io.grpc.ConnectivityState.READY;
24
import static io.grpc.ConnectivityState.SHUTDOWN;
25
import static io.grpc.LoadBalancer.HEALTH_CONSUMER_LISTENER_ARG_KEY;
26

27
import com.google.common.annotations.VisibleForTesting;
28
import com.google.common.base.MoreObjects;
29
import com.google.common.base.Objects;
30
import com.google.common.base.Stopwatch;
31
import com.google.common.base.Supplier;
32
import io.grpc.CallOptions;
33
import io.grpc.ChannelLogger;
34
import io.grpc.ChannelLogger.ChannelLogLevel;
35
import io.grpc.ClientCall;
36
import io.grpc.ConnectivityStateInfo;
37
import io.grpc.LoadBalancer;
38
import io.grpc.LoadBalancer.CreateSubchannelArgs;
39
import io.grpc.LoadBalancer.Helper;
40
import io.grpc.LoadBalancer.Subchannel;
41
import io.grpc.LoadBalancer.SubchannelStateListener;
42
import io.grpc.Metadata;
43
import io.grpc.Status;
44
import io.grpc.Status.Code;
45
import io.grpc.SynchronizationContext;
46
import io.grpc.SynchronizationContext.ScheduledHandle;
47
import io.grpc.health.v1.HealthCheckRequest;
48
import io.grpc.health.v1.HealthCheckResponse;
49
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
50
import io.grpc.health.v1.HealthGrpc;
51
import io.grpc.internal.BackoffPolicy;
52
import io.grpc.internal.ServiceConfigUtil;
53
import io.grpc.util.ForwardingLoadBalancer;
54
import io.grpc.util.ForwardingLoadBalancerHelper;
55
import io.grpc.util.ForwardingSubchannel;
56
import io.grpc.util.HealthProducerHelper;
57
import java.util.HashSet;
58
import java.util.Map;
59
import java.util.concurrent.ScheduledExecutorService;
60
import java.util.concurrent.TimeUnit;
61
import java.util.logging.Level;
62
import java.util.logging.Logger;
63
import javax.annotation.Nullable;
64

65
/**
66
 * Wraps a {@link LoadBalancer} and implements the client-side health-checking
67
 * (https://github.com/grpc/proposal/blob/master/A17-client-side-health-checking.md).  The
68
 * Subchannel states received by the wrapped LoadBalancer will be determined by health-checking.
69
 *
70
 * <p>Note the original LoadBalancer must call {@code Helper.createSubchannel()} from the
71
 * SynchronizationContext, or it will throw.
72
 */
73
final class HealthCheckingLoadBalancerFactory extends LoadBalancer.Factory {
74
  private static final Logger logger =
1✔
75
      Logger.getLogger(HealthCheckingLoadBalancerFactory.class.getName());
1✔
76

77
  private final LoadBalancer.Factory delegateFactory;
78
  private final BackoffPolicy.Provider backoffPolicyProvider;
79
  private final Supplier<Stopwatch> stopwatchSupplier;
80

81
  /**
   * Creates a factory that layers client-side health checking on top of the balancers produced
   * by {@code delegateFactory}.
   *
   * @param delegateFactory factory of the wrapped balancer; must not be null
   * @param backoffPolicyProvider provides the backoff policy consulted before retrying a
   *     health-check stream that closed without any response; must not be null
   * @param stopwatchSupplier supplies the stopwatch started for each health-check stream; must
   *     not be null
   */
  public HealthCheckingLoadBalancerFactory(
      LoadBalancer.Factory delegateFactory,
      BackoffPolicy.Provider backoffPolicyProvider,
      Supplier<Stopwatch> stopwatchSupplier) {
    this.delegateFactory = checkNotNull(delegateFactory, "delegateFactory");
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
  }
1✔
88

89
  @Override
90
  public LoadBalancer newLoadBalancer(Helper helper) {
91
    HelperImpl wrappedHelper = new HelperImpl(helper);
1✔
92
    LoadBalancer delegateBalancer = delegateFactory.newLoadBalancer(wrappedHelper);
1✔
93
    return new HealthCheckingLoadBalancer(wrappedHelper, delegateBalancer);
1✔
94
  }
95

96
  private final class HelperImpl extends ForwardingLoadBalancerHelper {
97
    private final Helper delegate;
98
    private final SynchronizationContext syncContext;
99

100
    @Nullable String healthCheckedService;
101

102
    final HashSet<HealthCheckState> hcStates = new HashSet<>();
1✔
103

104
    HelperImpl(Helper delegate) {
1✔
105
      this.delegate = new HealthProducerHelper(checkNotNull(delegate, "delegate"));
1✔
106
      this.syncContext = checkNotNull(delegate.getSynchronizationContext(), "syncContext");
1✔
107
    }
1✔
108

109
    @Override
110
    protected Helper delegate() {
111
      return delegate;
1✔
112
    }
113

114
    @Override
115
    public Subchannel createSubchannel(CreateSubchannelArgs args) {
116
      // HealthCheckState is not thread-safe, we are requiring the original LoadBalancer calls
117
      // createSubchannel() from the SynchronizationContext.
118
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
119
      LoadBalancer.SubchannelStateListener healthConsumerListener =
1✔
120
          args.getOption(HEALTH_CONSUMER_LISTENER_ARG_KEY);
1✔
121
      HealthCheckState hcState = new HealthCheckState(
1✔
122
          this, syncContext, delegate.getScheduledExecutorService(), healthConsumerListener);
1✔
123
      if (healthConsumerListener != null) {
1✔
124
        args = args.toBuilder().addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, hcState).build();
1✔
125
      }
126
      Subchannel delegate = super.createSubchannel(args);
1✔
127
      hcState.setSubchannel(delegate);
1✔
128
      hcStates.add(hcState);
1✔
129
      Subchannel subchannel = new SubchannelImpl(delegate, hcState);
1✔
130
      if (healthCheckedService != null) {
1✔
131
        hcState.setServiceName(healthCheckedService);
1✔
132
      }
133
      return subchannel;
1✔
134
    }
135

136
    void setHealthCheckedService(@Nullable String service) {
137
      healthCheckedService = service;
1✔
138
      for (HealthCheckState hcState : hcStates) {
1✔
139
        hcState.setServiceName(service);
1✔
140
      }
1✔
141
    }
1✔
142

143
    @Override
144
    public String toString() {
145
      return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
×
146
    }
147
  }
148

149
  @VisibleForTesting
150
  static final class SubchannelImpl extends ForwardingSubchannel {
151
    final Subchannel delegate;
152
    final HealthCheckState hcState;
153

154
    SubchannelImpl(Subchannel delegate, HealthCheckState hcState) {
1✔
155
      this.delegate = checkNotNull(delegate, "delegate");
1✔
156
      this.hcState = checkNotNull(hcState, "hcState");
1✔
157
    }
1✔
158

159
    @Override
160
    protected Subchannel delegate() {
161
      return delegate;
1✔
162
    }
163

164
    @Override
165
    public void start(final SubchannelStateListener listener) {
166
      if (hcState.stateListener == null) {
1✔
167
        hcState.init(listener);
1✔
168
        delegate().start(hcState);
1✔
169
      } else {
170
        delegate().start(listener);
1✔
171
      }
172
    }
1✔
173
  }
174

175
  private static final class HealthCheckingLoadBalancer extends ForwardingLoadBalancer {
176
    final LoadBalancer delegate;
177
    final HelperImpl helper;
178

179
    HealthCheckingLoadBalancer(HelperImpl helper, LoadBalancer delegate) {
1✔
180
      this.helper = checkNotNull(helper, "helper");
1✔
181
      this.delegate = checkNotNull(delegate, "delegate");
1✔
182
    }
1✔
183

184
    @Override
185
    protected LoadBalancer delegate() {
186
      return delegate;
1✔
187
    }
188

189
    @Override
190
    public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
191
      Map<String, ?> healthCheckingConfig =
×
192
          resolvedAddresses
193
              .getAttributes()
×
194
              .get(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG);
×
195
      String serviceName = ServiceConfigUtil.getHealthCheckedServiceName(healthCheckingConfig);
×
196
      helper.setHealthCheckedService(serviceName);
×
197
      delegate.handleResolvedAddresses(resolvedAddresses);
×
198
    }
×
199

200
    @Override
201
    public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
202
      Map<String, ?> healthCheckingConfig =
1✔
203
          resolvedAddresses
204
              .getAttributes()
1✔
205
              .get(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG);
1✔
206
      String serviceName = ServiceConfigUtil.getHealthCheckedServiceName(healthCheckingConfig);
1✔
207
      helper.setHealthCheckedService(serviceName);
1✔
208
      return delegate.acceptResolvedAddresses(resolvedAddresses);
1✔
209
    }
210

211
    @Override
212
    public String toString() {
213
      return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
×
214
    }
215
  }
216

217
  
218
  // All methods are run from syncContext
219
  private final class HealthCheckState implements SubchannelStateListener {
220
    private final Runnable retryTask = new Runnable() {
1✔
221
        @Override
222
        public void run() {
223
          startRpc();
1✔
224
        }
1✔
225
      };
226

227
    private final SynchronizationContext syncContext;
228
    private final ScheduledExecutorService timerService;
229
    private final HelperImpl helperImpl;
230
    private Subchannel subchannel;
231
    private ChannelLogger subchannelLogger;
232

233
    // In dual stack new pick first, this becomes health listener from create subchannel args.
234
    private SubchannelStateListener stateListener;
235

236
    // Set when RPC started. Cleared when the RPC has closed or abandoned.
237
    @Nullable
238
    private HcStream activeRpc;
239

240
    // The service name that should be used for health checking
241
    private String serviceName;
242
    private BackoffPolicy backoffPolicy;
243
    // The state from the underlying Subchannel
244
    private ConnectivityStateInfo rawState = ConnectivityStateInfo.forNonError(IDLE);
1✔
245
    // The state concluded from health checking
246
    private ConnectivityStateInfo concludedState = ConnectivityStateInfo.forNonError(IDLE);
1✔
247
    // true if a health check stream should be kept.  When true, either there is an active RPC, or a
248
    // retry is pending.
249
    private boolean running;
250
    // true if server returned UNIMPLEMENTED
251
    private boolean disabled;
252
    private ScheduledHandle retryTimer;
253

254
    HealthCheckState(
255
        HelperImpl helperImpl, SynchronizationContext syncContext,
256
        ScheduledExecutorService timerService,
257
        @Nullable SubchannelStateListener healthListener) {
1✔
258
      this.helperImpl = checkNotNull(helperImpl, "helperImpl");
1✔
259
      this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
260
      this.timerService = checkNotNull(timerService, "timerService");
1✔
261
      this.stateListener = healthListener;
1✔
262
    }
1✔
263

264
    void setSubchannel(Subchannel subchannel) {
265
      this.subchannel = checkNotNull(subchannel, "subchannel");
1✔
266
      this.subchannelLogger = checkNotNull(subchannel.getChannelLogger(), "subchannelLogger");
1✔
267
    }
1✔
268

269
    // Only called in old pick first. Can be removed after migration.
270
    void init(SubchannelStateListener listener) {
271
      checkState(this.stateListener == null, "init() already called");
1✔
272
      this.stateListener = checkNotNull(listener, "listener");
1✔
273
    }
1✔
274

275
    void setServiceName(@Nullable String newServiceName) {
276
      if (Objects.equal(newServiceName, serviceName)) {
1✔
277
        return;
1✔
278
      }
279
      serviceName = newServiceName;
1✔
280
      // If service name has changed while there is active RPC, cancel it so that
281
      // a new call will be made with the new name.
282
      String cancelMsg =
283
          serviceName == null ? "Health check disabled by service config"
1✔
284
          : "Switching to new service name: " + newServiceName;
1✔
285
      stopRpc(cancelMsg);
1✔
286
      adjustHealthCheck();
1✔
287
    }
1✔
288

289
    @Override
290
    public void onSubchannelState(ConnectivityStateInfo rawState) {
291
      if (Objects.equal(this.rawState.getState(), READY)
1✔
292
          && !Objects.equal(rawState.getState(), READY)) {
1✔
293
        // A connection was lost.  We will reset disabled flag because health check
294
        // may be available on the new connection.
295
        disabled = false;
1✔
296
      }
297
      if (Objects.equal(rawState.getState(), SHUTDOWN)) {
1✔
298
        helperImpl.hcStates.remove(this);
1✔
299
      }
300
      this.rawState = rawState;
1✔
301
      adjustHealthCheck();
1✔
302
    }
1✔
303

304
    private boolean isRetryTimerPending() {
305
      return retryTimer != null && retryTimer.isPending();
1✔
306
    }
307

308
    // Start or stop health check according to the current states.
309
    private void adjustHealthCheck() {
310
      if (!disabled && serviceName != null && Objects.equal(rawState.getState(), READY)) {
1✔
311
        running = true;
1✔
312
        if (activeRpc == null && !isRetryTimerPending()) {
1✔
313
          startRpc();
1✔
314
        }
315
      } else {
316
        running = false;
1✔
317
        // Prerequisites for health checking not met.
318
        // Make sure it's stopped.
319
        stopRpc("Client stops health check");
1✔
320
        backoffPolicy = null;
1✔
321
        gotoState(rawState);
1✔
322
      }
323
    }
1✔
324

325
    private void startRpc() {
326
      checkState(serviceName != null, "serviceName is null");
1✔
327
      checkState(activeRpc == null, "previous health-checking RPC has not been cleaned up");
1✔
328
      checkState(subchannel != null, "init() not called");
1✔
329
      // Optimization suggested by @markroth: if we are already READY and starting the health
330
      // checking RPC, either because health check is just enabled or has switched to a new service
331
      // name, we don't go to CONNECTING, otherwise there will be artificial delays on RPCs
332
      // waiting for the health check to respond.
333
      if (!Objects.equal(concludedState.getState(), READY)) {
1✔
334
        subchannelLogger.log(
1✔
335
            ChannelLogLevel.INFO, "CONNECTING: Starting health-check for \"{0}\"", serviceName);
336
        gotoState(ConnectivityStateInfo.forNonError(CONNECTING));
1✔
337
      }
338
      activeRpc = new HcStream();
1✔
339
      activeRpc.start();
1✔
340
    }
1✔
341

342
    private void stopRpc(String msg) {
343
      if (activeRpc != null) {
1✔
344
        activeRpc.cancel(msg);
1✔
345
        // Abandon this RPC.  We are not interested in anything from this RPC any more.
346
        activeRpc = null;
1✔
347
      }
348
      if (retryTimer != null) {
1✔
349
        retryTimer.cancel();
1✔
350
        retryTimer = null;
1✔
351
      }
352
    }
1✔
353

354
    private void gotoState(ConnectivityStateInfo newState) {
355
      checkState(subchannel != null, "init() not called");
1✔
356
      if (!Objects.equal(concludedState, newState)) {
1✔
357
        concludedState = newState;
1✔
358
        stateListener.onSubchannelState(concludedState);
1✔
359
      }
360
    }
1✔
361

362
    @Override
363
    public String toString() {
364
      return MoreObjects.toStringHelper(this)
×
365
          .add("running", running)
×
366
          .add("disabled", disabled)
×
367
          .add("activeRpc", activeRpc)
×
368
          .add("serviceName", serviceName)
×
369
          .add("rawState", rawState)
×
370
          .add("concludedState", concludedState)
×
371
          .toString();
×
372
    }
373

374
    private class HcStream extends ClientCall.Listener<HealthCheckResponse> {
375
      private final ClientCall<HealthCheckRequest, HealthCheckResponse> call;
376
      private final String callServiceName;
377
      private final Stopwatch stopwatch;
378
      private boolean callHasResponded;
379

380
      HcStream() {
1✔
381
        stopwatch = stopwatchSupplier.get().start();
1✔
382
        callServiceName = serviceName;
1✔
383
        call = subchannel.asChannel().newCall(HealthGrpc.getWatchMethod(), CallOptions.DEFAULT);
1✔
384
      }
1✔
385

386
      void start() {
387
        call.start(this, new Metadata());
1✔
388
        call.sendMessage(HealthCheckRequest.newBuilder().setService(serviceName).build());
1✔
389
        call.halfClose();
1✔
390
        call.request(1);
1✔
391
      }
1✔
392

393
      void cancel(String msg) {
394
        call.cancel(msg, null);
1✔
395
      }
1✔
396

397
      @Override
398
      public void onMessage(final HealthCheckResponse response) {
399
        syncContext.execute(new Runnable() {
1✔
400
            @Override
401
            public void run() {
402
              if (activeRpc == HcStream.this) {
1✔
403
                handleResponse(response);
1✔
404
              }
405
            }
1✔
406
          });
407
      }
1✔
408

409
      @Override
410
      public void onClose(final Status status, Metadata trailers) {
411
        syncContext.execute(new Runnable() {
1✔
412
            @Override
413
            public void run() {
414
              if (activeRpc == HcStream.this) {
1✔
415
                activeRpc = null;
1✔
416
                handleStreamClosed(status);
1✔
417
              }
418
            }
1✔
419
          });
420
      }
1✔
421

422
      void handleResponse(HealthCheckResponse response) {
423
        callHasResponded = true;
1✔
424
        backoffPolicy = null;
1✔
425
        ServingStatus status = response.getStatus();
1✔
426
        // running == true means the Subchannel's state (rawState) is READY
427
        if (Objects.equal(status, ServingStatus.SERVING)) {
1✔
428
          subchannelLogger.log(ChannelLogLevel.INFO, "READY: health-check responded SERVING");
1✔
429
          gotoState(ConnectivityStateInfo.forNonError(READY));
1✔
430
        } else {
431
          subchannelLogger.log(
1✔
432
              ChannelLogLevel.INFO, "TRANSIENT_FAILURE: health-check responded {0}", status);
433
          String errorDescription =  "Health-check service responded "
1✔
434
              + status + " for '" + callServiceName + "'";
435
          gotoState(ConnectivityStateInfo.forTransientFailure(
1✔
436
              Status.UNAVAILABLE.withDescription(errorDescription)));
1✔
437
        }
438
        call.request(1);
1✔
439
      }
1✔
440

441
      void handleStreamClosed(Status status) {
442
        if (Objects.equal(status.getCode(), Code.UNIMPLEMENTED)) {
1✔
443
          disabled = true;
1✔
444
          logger.log(
1✔
445
              Level.SEVERE, "Health-check with {0} is disabled. Server returned: {1}",
446
              new Object[] {subchannel.getAllAddresses(), status});
1✔
447
          subchannelLogger.log(ChannelLogLevel.ERROR, "Health-check disabled: {0}", status);
1✔
448
          subchannelLogger.log(ChannelLogLevel.INFO, "{0} (no health-check)", rawState);
1✔
449
          gotoState(rawState);
1✔
450
          return;
1✔
451
        }
452
        long delayNanos = 0;
1✔
453
        subchannelLogger.log(
1✔
454
            ChannelLogLevel.INFO, "TRANSIENT_FAILURE: health-check stream closed with {0}", status);
455
        String errorDescription = "Health-check stream unexpectedly closed with "
1✔
456
            + status + " for '" + callServiceName + "'";
457
        gotoState(ConnectivityStateInfo.forTransientFailure(
1✔
458
            Status.UNAVAILABLE.withDescription(errorDescription)));
1✔
459
        // Use backoff only when server has not responded for the previous call
460
        if (!callHasResponded) {
1✔
461
          if (backoffPolicy == null) {
1✔
462
            backoffPolicy = backoffPolicyProvider.get();
1✔
463
          }
464
          delayNanos = backoffPolicy.nextBackoffNanos() - stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
465
        }
466
        if (delayNanos <= 0) {
1✔
467
          startRpc();
1✔
468
        } else {
469
          checkState(!isRetryTimerPending(), "Retry double scheduled");
1✔
470
          subchannelLogger.log(
1✔
471
              ChannelLogLevel.DEBUG, "Will retry health-check after {0} ns", delayNanos);
1✔
472
          retryTimer = syncContext.schedule(
1✔
473
              retryTask, delayNanos, TimeUnit.NANOSECONDS, timerService);
1✔
474
        }
475
      }
1✔
476

477
      @Override
478
      public String toString() {
479
        return MoreObjects.toStringHelper(this)
×
480
            .add("callStarted", call != null)
×
481
            .add("serviceName", callServiceName)
×
482
            .add("hasResponded", callHasResponded)
×
483
            .toString();
×
484
      }
485
    }
486
  }
487
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc