• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19750

25 Mar 2025 12:12PM UTC coverage: 88.589% (+0.01%) from 88.579%
#19750

push

github

web-flow
services: Avoid cancellation exceptions when notifying watchers that already have their connections cancelled (#11934)

Some clients watching health status cancel their watch. When `HealthService` then tried to notify these watchers, it got a CANCELLED exception because no cancellation handler was set on the `StreamObserver`. This change sets a cancellation handler that removes the watcher from the set of watchers to be notified of the health status.

34617 of 39076 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.8
/../services/src/main/java/io/grpc/protobuf/services/HealthServiceImpl.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.protobuf.services;
18

19
import com.google.common.annotations.VisibleForTesting;
20
import com.google.common.util.concurrent.MoreExecutors;
21
import com.google.errorprone.annotations.concurrent.GuardedBy;
22
import io.grpc.Context;
23
import io.grpc.Context.CancellationListener;
24
import io.grpc.Status;
25
import io.grpc.StatusException;
26
import io.grpc.health.v1.HealthCheckRequest;
27
import io.grpc.health.v1.HealthCheckResponse;
28
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
29
import io.grpc.health.v1.HealthGrpc;
30
import io.grpc.stub.ServerCallStreamObserver;
31
import io.grpc.stub.StreamObserver;
32
import java.util.HashMap;
33
import java.util.IdentityHashMap;
34
import java.util.Map;
35
import java.util.concurrent.ConcurrentHashMap;
36
import java.util.logging.Level;
37
import java.util.logging.Logger;
38
import javax.annotation.Nullable;
39

40
final class HealthServiceImpl extends HealthGrpc.HealthImplBase {
41

42
  private static final Logger logger = Logger.getLogger(HealthServiceImpl.class.getName());
1✔
43

44
  // Due to the latency of rpc calls, synchronization of the map does not help with consistency.
45
  // However, we need to use ConcurrentHashMap to allow concurrent reading by check().
46
  private final Map<String, ServingStatus> statusMap = new ConcurrentHashMap<>();
1✔
47

48
  private final Object watchLock = new Object();
1✔
49

50
  // Indicates if future status changes should be ignored.
51
  @GuardedBy("watchLock")
52
  private boolean terminal;
53

54
  // Technically a Multimap<String, StreamObserver<HealthCheckResponse>>.  The Boolean value is not
55
  // used.  The StreamObservers need to be kept in an identity-equality set, to make sure
56
  // user-defined equals() doesn't confuse our book-keeping of the StreamObservers.  Constructing
57
  // such Multimap would require extra lines and the end result is not significantly simpler, thus I
58
  // would rather not have the Guava collections dependency.
59
  @GuardedBy("watchLock")
1✔
60
  private final HashMap<String, IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean>>
61
      watchers = new HashMap<>();
62

63
  HealthServiceImpl() {
1✔
64
    // Copy of what Go and C++ do.
65
    statusMap.put(HealthStatusManager.SERVICE_NAME_ALL_SERVICES, ServingStatus.SERVING);
1✔
66
  }
1✔
67

68
  @Override
69
  public void check(HealthCheckRequest request,
70
      StreamObserver<HealthCheckResponse> responseObserver) {
71
    ServingStatus status = statusMap.get(request.getService());
1✔
72
    if (status == null) {
1✔
73
      responseObserver.onError(new StatusException(
1✔
74
          Status.NOT_FOUND.withDescription("unknown service " + request.getService())));
1✔
75
    } else {
76
      HealthCheckResponse response = HealthCheckResponse.newBuilder().setStatus(status).build();
1✔
77
      responseObserver.onNext(response);
1✔
78
      responseObserver.onCompleted();
1✔
79
    }
80
  }
1✔
81

82
  @Override
83
  public void watch(HealthCheckRequest request,
84
      final StreamObserver<HealthCheckResponse> responseObserver) {
85
    final String service = request.getService();
1✔
86
    synchronized (watchLock) {
1✔
87
      if (responseObserver instanceof ServerCallStreamObserver) {
1✔
88
        ((ServerCallStreamObserver) responseObserver).setOnCancelHandler(() -> {
1✔
89
          removeWatcher(service, responseObserver);
1✔
90
        });
1✔
91
      }
92
      ServingStatus status = statusMap.get(service);
1✔
93
      responseObserver.onNext(getResponseForWatch(status));
1✔
94
      IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
1✔
95
          watchers.get(service);
1✔
96
      if (serviceWatchers == null) {
1✔
97
        serviceWatchers = new IdentityHashMap<>();
1✔
98
        watchers.put(service, serviceWatchers);
1✔
99
      }
100
      serviceWatchers.put(responseObserver, Boolean.TRUE);
1✔
101
    }
1✔
102
    Context.current().addListener(
1✔
103
        new CancellationListener() {
1✔
104
          @Override
105
          // Called when the client has closed the stream
106
          public void cancelled(Context context) {
107
            removeWatcher(service, responseObserver);
1✔
108
          }
1✔
109
        },
110
        MoreExecutors.directExecutor());
1✔
111
  }
1✔
112

113
  void removeWatcher(String service, StreamObserver<HealthCheckResponse> responseObserver) {
114
    synchronized (watchLock) {
1✔
115
      IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
1✔
116
              watchers.get(service);
1✔
117
      if (serviceWatchers != null) {
1✔
118
        serviceWatchers.remove(responseObserver);
1✔
119
        if (serviceWatchers.isEmpty()) {
1✔
120
          watchers.remove(service);
1✔
121
        }
122
      }
123
    }
1✔
124
  }
1✔
125

126
  void setStatus(String service, ServingStatus status) {
127
    synchronized (watchLock) {
1✔
128
      if (terminal) {
1✔
129
        logger.log(Level.FINE, "Ignoring status {} for {}", new Object[]{status, service});
1✔
130
        return;
1✔
131
      }
132
      setStatusInternal(service, status);
1✔
133
    }
1✔
134
  }
1✔
135

136
  @GuardedBy("watchLock")
137
  private void setStatusInternal(String service, ServingStatus status) {
138
    ServingStatus prevStatus = statusMap.put(service, status);
1✔
139
    if (prevStatus != status) {
1✔
140
      notifyWatchers(service, status);
1✔
141
    }
142
  }
1✔
143

144
  void clearStatus(String service) {
145
    synchronized (watchLock) {
1✔
146
      if (terminal) {
1✔
147
        logger.log(Level.FINE, "Ignoring status clearing for {}", new Object[]{service});
1✔
148
        return;
1✔
149
      }
150
      ServingStatus prevStatus = statusMap.remove(service);
1✔
151
      if (prevStatus != null) {
1✔
152
        notifyWatchers(service, null);
1✔
153
      }
154
    }
1✔
155
  }
1✔
156

157
  void enterTerminalState() {
158
    synchronized (watchLock) {
1✔
159
      if (terminal) {
1✔
160
        logger.log(Level.WARNING, "Already terminating", new RuntimeException());
×
161
        return;
×
162
      }
163
      terminal = true;
1✔
164
      for (String service : statusMap.keySet()) {
1✔
165
        setStatusInternal(service, ServingStatus.NOT_SERVING);
1✔
166
      }
1✔
167
    }
1✔
168
  }
1✔
169

170
  @VisibleForTesting
171
  int numWatchersForTest(String service) {
172
    synchronized (watchLock) {
1✔
173
      IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
1✔
174
          watchers.get(service);
1✔
175
      if (serviceWatchers == null) {
1✔
176
        return 0;
1✔
177
      }
178
      return serviceWatchers.size();
1✔
179
    }
180
  }
181

182
  @GuardedBy("watchLock")
183
  private void notifyWatchers(String service, @Nullable ServingStatus status) {
184
    HealthCheckResponse response = getResponseForWatch(status);
1✔
185
    IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
1✔
186
        watchers.get(service);
1✔
187
    if (serviceWatchers != null) {
1✔
188
      for (StreamObserver<HealthCheckResponse> responseObserver : serviceWatchers.keySet()) {
1✔
189
        responseObserver.onNext(response);
1✔
190
      }
1✔
191
    }
192
  }
1✔
193

194
  private static HealthCheckResponse getResponseForWatch(@Nullable ServingStatus recordedStatus) {
195
    return HealthCheckResponse.newBuilder().setStatus(
1✔
196
        recordedStatus == null ? ServingStatus.SERVICE_UNKNOWN : recordedStatus).build();
1✔
197
  }
198
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc