• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19108

21 Mar 2024 10:37PM UTC coverage: 88.277% (-0.002%) from 88.279%
#19108

push

github

web-flow
Allow configuration of the queued byte threshold at which a Stream is considered not ready (#10977)

* Allow the queued byte threshold for a Stream to be ready to be configurable

- on clients this is exposed by setting a CallOption
- on servers this is configured by calling a method on ServerCall or ServerStreamListener

31190 of 35332 relevant lines covered (88.28%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.27
/../core/src/main/java/io/grpc/internal/ServerCallImpl.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.internal.GrpcAttributes.ATTR_SECURITY_LEVEL;
23
import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITTER;
24
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
25
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
26
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
27

28
import com.google.common.annotations.VisibleForTesting;
29
import com.google.common.base.Throwables;
30
import com.google.common.util.concurrent.MoreExecutors;
31
import io.grpc.Attributes;
32
import io.grpc.Codec;
33
import io.grpc.Compressor;
34
import io.grpc.CompressorRegistry;
35
import io.grpc.Context;
36
import io.grpc.DecompressorRegistry;
37
import io.grpc.InternalDecompressorRegistry;
38
import io.grpc.InternalStatus;
39
import io.grpc.Metadata;
40
import io.grpc.MethodDescriptor;
41
import io.grpc.SecurityLevel;
42
import io.grpc.ServerCall;
43
import io.grpc.Status;
44
import io.grpc.StatusRuntimeException;
45
import io.perfmark.PerfMark;
46
import io.perfmark.Tag;
47
import io.perfmark.TaskCloseable;
48
import java.io.InputStream;
49
import java.util.logging.Level;
50
import java.util.logging.Logger;
51

52
final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
53

54
  private static final Logger log = Logger.getLogger(ServerCallImpl.class.getName());
1✔
55

56
  @VisibleForTesting
57
  static final String TOO_MANY_RESPONSES = "Too many responses";
58
  @VisibleForTesting
59
  static final String MISSING_RESPONSE = "Completed without a response";
60

61
  private final ServerStream stream;
62
  private final MethodDescriptor<ReqT, RespT> method;
63
  private final Tag tag;
64
  private final Context.CancellableContext context;
65
  private final byte[] messageAcceptEncoding;
66
  private final DecompressorRegistry decompressorRegistry;
67
  private final CompressorRegistry compressorRegistry;
68
  private CallTracer serverCallTracer;
69

70
  // state
71
  private volatile boolean cancelled;
72
  private boolean sendHeadersCalled;
73
  private boolean closeCalled;
74
  private Compressor compressor;
75
  private boolean messageSent;
76

77
  ServerCallImpl(ServerStream stream, MethodDescriptor<ReqT, RespT> method,
78
      Metadata inboundHeaders, Context.CancellableContext context,
79
      DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
80
      CallTracer serverCallTracer, Tag tag) {
1✔
81
    this.stream = stream;
1✔
82
    this.method = method;
1✔
83
    this.context = context;
1✔
84
    this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY);
1✔
85
    this.decompressorRegistry = decompressorRegistry;
1✔
86
    this.compressorRegistry = compressorRegistry;
1✔
87
    this.serverCallTracer = serverCallTracer;
1✔
88
    this.serverCallTracer.reportCallStarted();
1✔
89
    this.tag = tag;
1✔
90
  }
1✔
91

92
  @Override
93
  public void request(int numMessages) {
94
    try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.request")) {
1✔
95
      PerfMark.attachTag(tag);
1✔
96
      stream.request(numMessages);
1✔
97
    }
98
  }
1✔
99

100
  @Override
101
  public void sendHeaders(Metadata headers) {
102
    try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.sendHeaders")) {
1✔
103
      PerfMark.attachTag(tag);
1✔
104
      sendHeadersInternal(headers);
1✔
105
    }
106
  }
1✔
107

108
  private void sendHeadersInternal(Metadata headers) {
109
    checkState(!sendHeadersCalled, "sendHeaders has already been called");
1✔
110
    checkState(!closeCalled, "call is closed");
1✔
111

112
    headers.discardAll(CONTENT_LENGTH_KEY);
1✔
113
    headers.discardAll(MESSAGE_ENCODING_KEY);
1✔
114
    if (compressor == null) {
1✔
115
      compressor = Codec.Identity.NONE;
1✔
116
    } else {
117
      if (messageAcceptEncoding != null) {
1✔
118
        // TODO(carl-mastrangelo): remove the string allocation.
119
        if (!GrpcUtil.iterableContains(
1✔
120
            ACCEPT_ENCODING_SPLITTER.split(new String(messageAcceptEncoding, GrpcUtil.US_ASCII)),
1✔
121
            compressor.getMessageEncoding())) {
1✔
122
          // resort to using no compression.
123
          compressor = Codec.Identity.NONE;
1✔
124
        }
125
      } else {
126
        compressor = Codec.Identity.NONE;
1✔
127
      }
128
    }
129

130
    // Always put compressor, even if it's identity.
131
    headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
1✔
132

133
    stream.setCompressor(compressor);
1✔
134

135
    headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
1✔
136
    byte[] advertisedEncodings =
1✔
137
        InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
1✔
138
    if (advertisedEncodings.length != 0) {
1✔
139
      headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings);
1✔
140
    }
141

142
    // Don't check if sendMessage has been called, since it requires that sendHeaders was already
143
    // called.
144
    sendHeadersCalled = true;
1✔
145
    stream.writeHeaders(headers, !getMethodDescriptor().getType().serverSendsOneMessage());
1✔
146
  }
1✔
147

148
  @Override
149
  public void sendMessage(RespT message) {
150
    try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.sendMessage")) {
1✔
151
      PerfMark.attachTag(tag);
1✔
152
      sendMessageInternal(message);
1✔
153
    }
154
  }
1✔
155

156
  private void sendMessageInternal(RespT message) {
157
    checkState(sendHeadersCalled, "sendHeaders has not been called");
1✔
158
    checkState(!closeCalled, "call is closed");
1✔
159

160
    if (method.getType().serverSendsOneMessage() && messageSent) {
1✔
161
      handleInternalError(Status.INTERNAL.withDescription(TOO_MANY_RESPONSES).asRuntimeException());
1✔
162
      return;
1✔
163
    }
164

165
    messageSent = true;
1✔
166
    try {
167
      InputStream resp = method.streamResponse(message);
1✔
168
      stream.writeMessage(resp);
1✔
169
      if (!getMethodDescriptor().getType().serverSendsOneMessage()) {
1✔
170
        stream.flush();
1✔
171
      }
172
    } catch (RuntimeException e) {
1✔
173
      handleInternalError(e);
1✔
174
    } catch (Error e) {
×
175
      close(
×
176
          Status.CANCELLED.withDescription("Server sendMessage() failed with Error"),
×
177
          new Metadata());
178
      throw e;
×
179
    }
1✔
180
  }
1✔
181

182
  @Override
183
  public void setMessageCompression(boolean enable) {
184
    stream.setMessageCompression(enable);
1✔
185
  }
1✔
186

187
  @Override
188
  public void setOnReadyThreshold(int numBytes) {
189
    stream.setOnReadyThreshold(numBytes);
1✔
190
  }
1✔
191

192
  @Override
193
  public void setCompression(String compressorName) {
194
    // Added here to give a better error message.
195
    checkState(!sendHeadersCalled, "sendHeaders has been called");
1✔
196

197
    compressor = compressorRegistry.lookupCompressor(compressorName);
1✔
198
    checkArgument(compressor != null, "Unable to find compressor by name %s", compressorName);
1✔
199
  }
1✔
200

201
  @Override
202
  public boolean isReady() {
203
    if (closeCalled) {
1✔
204
      return false;
1✔
205
    }
206
    return stream.isReady();
1✔
207
  }
208

209
  @Override
210
  public void close(Status status, Metadata trailers) {
211
    try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.close")) {
1✔
212
      PerfMark.attachTag(tag);
1✔
213
      closeInternal(status, trailers);
1✔
214
    }
215
  }
1✔
216

217
  private void closeInternal(Status status, Metadata trailers) {
218
    checkState(!closeCalled, "call already closed");
1✔
219
    try {
220
      closeCalled = true;
1✔
221

222
      if (status.isOk() && method.getType().serverSendsOneMessage() && !messageSent) {
1✔
223
        handleInternalError(Status.INTERNAL.withDescription(MISSING_RESPONSE).asRuntimeException());
1✔
224
        return;
1✔
225
      }
226

227
      stream.close(status, trailers);
1✔
228
    } finally {
229
      serverCallTracer.reportCallEnded(status.isOk());
1✔
230
    }
231
  }
1✔
232

233
  @Override
234
  public boolean isCancelled() {
235
    return cancelled;
1✔
236
  }
237

238
  ServerStreamListener newServerStreamListener(ServerCall.Listener<ReqT> listener) {
239
    return new ServerStreamListenerImpl<>(this, listener, context);
1✔
240
  }
241

242
  @Override
243
  public Attributes getAttributes() {
244
    return stream.getAttributes();
1✔
245
  }
246

247
  @Override
248
  public String getAuthority() {
249
    return stream.getAuthority();
1✔
250
  }
251

252
  @Override
253
  public MethodDescriptor<ReqT, RespT> getMethodDescriptor() {
254
    return method;
1✔
255
  }
256

257
  @Override
258
  public SecurityLevel getSecurityLevel() {
259
    final Attributes attributes = getAttributes();
1✔
260
    if (attributes == null) {
1✔
261
      return super.getSecurityLevel();
1✔
262
    }
263
    final SecurityLevel securityLevel = attributes.get(ATTR_SECURITY_LEVEL);
1✔
264
    return securityLevel == null ? super.getSecurityLevel() : securityLevel;
1✔
265
  }
266

267
  /**
268
   * Close the {@link ServerStream} because an internal error occurred. Allow the application to
269
   * run until completion, but silently ignore interactions with the {@link ServerStream} from now
270
   * on.
271
   */
272
  private void handleInternalError(Throwable internalError) {
273
    log.log(Level.WARNING, "Cancelling the stream because of internal error", internalError);
1✔
274
    Status status = (internalError instanceof StatusRuntimeException)
1✔
275
        ? ((StatusRuntimeException) internalError).getStatus()
1✔
276
        : Status.INTERNAL.withCause(internalError)
1✔
277
            .withDescription("Internal error so cancelling stream.");
1✔
278
    stream.cancel(status);
1✔
279
    serverCallTracer.reportCallEnded(false); // error so always false
1✔
280
  }
1✔
281

282
  /**
283
   * All of these callbacks are assumed to called on an application thread, and the caller is
284
   * responsible for handling thrown exceptions.
285
   */
286
  @VisibleForTesting
287
  static final class ServerStreamListenerImpl<ReqT> implements ServerStreamListener {
288
    private final ServerCallImpl<ReqT, ?> call;
289
    private final ServerCall.Listener<ReqT> listener;
290
    private final Context.CancellableContext context;
291

292
    public ServerStreamListenerImpl(
293
        ServerCallImpl<ReqT, ?> call, ServerCall.Listener<ReqT> listener,
294
        Context.CancellableContext context) {
1✔
295
      this.call = checkNotNull(call, "call");
1✔
296
      this.listener = checkNotNull(listener, "listener must not be null");
1✔
297
      this.context = checkNotNull(context, "context");
1✔
298
      // Wire ourselves up so that if the context is cancelled, our flag call.cancelled also
299
      // reflects the new state. Use a DirectExecutor so that it happens in the same thread
300
      // as the caller of {@link Context#cancel}.
301
      this.context.addListener(
1✔
302
          new Context.CancellationListener() {
1✔
303
            @Override
304
            public void cancelled(Context context) {
305
              // If the context has a cancellation cause then something exceptional happened
306
              // and we should also mark the call as cancelled.
307
              if (context.cancellationCause() != null) {
1✔
308
                ServerStreamListenerImpl.this.call.cancelled = true;
1✔
309
              }
310
            }
1✔
311
          },
312
          MoreExecutors.directExecutor());
1✔
313
    }
1✔
314

315
    @Override
316
    public void messagesAvailable(MessageProducer producer) {
317
      try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.messagesAvailable")) {
1✔
318
        PerfMark.attachTag(call.tag);
1✔
319
        messagesAvailableInternal(producer);
1✔
320
      }
321
    }
1✔
322

323
    @SuppressWarnings("Finally") // The code avoids suppressing the exception thrown from try
324
    private void messagesAvailableInternal(final MessageProducer producer) {
325
      if (call.cancelled) {
1✔
326
        GrpcUtil.closeQuietly(producer);
1✔
327
        return;
1✔
328
      }
329

330
      InputStream message;
331
      try {
332
        while ((message = producer.next()) != null) {
1✔
333
          try {
334
            listener.onMessage(call.method.parseRequest(message));
1✔
335
          } catch (Throwable t) {
1✔
336
            GrpcUtil.closeQuietly(message);
1✔
337
            throw t;
1✔
338
          }
1✔
339
          message.close();
1✔
340
        }
341
      } catch (Throwable t) {
1✔
342
        GrpcUtil.closeQuietly(producer);
1✔
343
        Throwables.throwIfUnchecked(t);
×
344
        throw new RuntimeException(t);
×
345
      }
1✔
346
    }
1✔
347

348
    @Override
349
    public void halfClosed() {
350
      try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.halfClosed")) {
1✔
351
        PerfMark.attachTag(call.tag);
1✔
352
        if (call.cancelled) {
1✔
353
          return;
1✔
354
        }
355

356
        listener.onHalfClose();
1✔
357
      }
358
    }
1✔
359

360
    @Override
361
    public void closed(Status status) {
362
      try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.closed")) {
1✔
363
        PerfMark.attachTag(call.tag);
1✔
364
        closedInternal(status);
1✔
365
      }
366
    }
1✔
367

368
    private void closedInternal(Status status) {
369
      Throwable cancelCause = null;
1✔
370
      try {
371
        if (status.isOk()) {
1✔
372
          listener.onComplete();
1✔
373
        } else {
374
          call.cancelled = true;
1✔
375
          listener.onCancel();
1✔
376
          // The status will not have a cause in all failure scenarios but we want to make sure
377
          // we always cancel the context with one to keep the context cancelled state consistent.
378
          cancelCause = InternalStatus.asRuntimeException(
1✔
379
              Status.CANCELLED.withDescription("RPC cancelled"), null, false);
1✔
380
        }
381
      } finally {
382
        // Cancel context after delivering RPC closure notification to allow the application to
383
        // clean up and update any state based on whether onComplete or onCancel was called.
384
        context.cancel(cancelCause);
1✔
385
      }
386
    }
1✔
387

388
    @Override
389
    public void onReady() {
390
      try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.onReady")) {
1✔
391
        PerfMark.attachTag(call.tag);
1✔
392
        if (call.cancelled) {
1✔
393
          return;
1✔
394
        }
395
        listener.onReady();
1✔
396
      }
397
    }
1✔
398
  }
399
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc