• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19108

21 Mar 2024 10:37PM UTC coverage: 88.277% (-0.002%) from 88.279%
#19108

push

github

web-flow
Allow configuration of the queued byte threshold at which a Stream is considered not ready (#10977)

* Allow the queued byte threshold for a Stream to be ready to be configurable

- on clients this is exposed by setting a CallOption
- on servers this is configured by calling a method on ServerCall or ServerStreamListener

31190 of 35332 relevant lines covered (88.28%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

33.33
/../stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.stub;
18

19
import io.grpc.ExperimentalApi;
20

21
/**
22
 * A refinement of {@link CallStreamObserver} to allows for interaction with call
23
 * cancellation events on the server side. An instance of this class is obtained by casting the
24
 * {@code StreamObserver} passed as an argument to service implementations.
25
 *
26
 * <p>Like {@code StreamObserver}, implementations are not required to be thread-safe; if multiple
27
 * threads will be writing to an instance concurrently, the application must synchronize its calls.
28
 *
29
 * <p>DO NOT MOCK: The API is too complex to reliably mock. Use InProcessChannelBuilder to create
30
 * "real" RPCs suitable for testing and interact with the server using a normal client stub.
31
 */
32
public abstract class ServerCallStreamObserver<RespT> extends CallStreamObserver<RespT> {
1✔
33

34
  /**
35
   * Returns {@code true} when the call is cancelled and the server is encouraged to abort
36
   * processing to save resources, since the client will not be processing any further methods.
37
   * Cancellations can be caused by timeouts, explicit cancellation by client, network errors, and
38
   * similar.
39
   *
40
   * <p>This method may safely be called concurrently from multiple threads.
41
   */
42
  public abstract boolean isCancelled();
43

44
  /**
45
   * Sets a {@link Runnable} to be called if the call is cancelled and the server is encouraged to
46
   * abort processing to save resources, since the client will not process any further messages.
47
   * Cancellations can be caused by timeouts, explicit cancellation by the client, network errors,
48
   * etc.
49
   *
50
   * <p>It is guaranteed that execution of the {@link Runnable} is serialized with calls to the
51
   * 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if other
52
   * callbacks are running; if one of those other callbacks runs for a significant amount of time
53
   * it can poll {@link #isCancelled()}, which is not delayed.
54
   *
55
   * <p>This method may only be called during the initial call to the application, before the
56
   * service returns its {@code StreamObserver}.
57
   *
58
   * <p>Setting the onCancelHandler will suppress the on-cancel exception thrown by
59
   * {@link #onNext}. If the caller is already handling cancellation via polling or cannot
60
   * substantially benefit from observing cancellation, using a no-op {@code onCancelHandler} is
61
   * useful just to suppress the {@code onNext()} exception.
62
   *
63
   * @param onCancelHandler to call when client has cancelled the call.
64
   */
65
  public abstract void setOnCancelHandler(Runnable onCancelHandler);
66

67

68
  /**
69
   * A hint to the call that specifies how many bytes must be queued before
70
   * {@link #isReady()} will return false. A call may ignore this property if
71
   * unsupported. This may only be set during stream initialization before
72
   * any messages are set.
73
   *
74
   * @param numBytes The number of bytes that must be queued. Must be a
75
   *                 positive integer.
76
   */
77
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021")
78
  public abstract void setOnReadyThreshold(int numBytes);
79

80
  /**
81
   * Sets the compression algorithm to use for the call. May only be called before sending any
82
   * messages. Default gRPC servers support the "gzip" compressor.
83
   *
84
   * <p>It is safe to call this even if the client does not support the compression format chosen.
85
   * The implementation will handle negotiation with the client and may fall back to no compression.
86
   *
87
   * @param compression the compression algorithm to use.
88
   * @throws IllegalArgumentException if the compressor name can not be found.
89
   */
90
  public abstract void setCompression(String compression);
91

92
  /**
93
   * Swaps to manual flow control where no message will be delivered to {@link
94
   * StreamObserver#onNext(Object)} unless it is {@link #request request()}ed.
95
   *
96
   * <p>It may only be called during the initial call to the application, before the service returns
97
   * its {@code StreamObserver}.
98
   *
99
   * <p>Note that for cases where the message is received before the service handler is invoked,
100
   * this method will have no effect. This is true for:
101
   *
102
   * <ul>
103
   *   <li>{@link io.grpc.MethodDescriptor.MethodType#UNARY} operations.</li>
104
   *   <li>{@link io.grpc.MethodDescriptor.MethodType#SERVER_STREAMING} operations.</li>
105
   * </ul>
106
   * </p>
107
   */
108
  public void disableAutoRequest() {
109
    throw new UnsupportedOperationException();
×
110
  }
111

112

113
  /**
114
   * If {@code true}, indicates that the observer is capable of sending additional messages
115
   * without requiring excessive buffering internally. This value is just a suggestion and the
116
   * application is free to ignore it, however doing so may result in excessive buffering within the
117
   * observer.
118
   *
119
   * <p>If {@code false}, the runnable passed to {@link #setOnReadyHandler} will be called after
120
   * {@code isReady()} transitions to {@code true}.
121
   */
122
  @Override
123
  public abstract boolean isReady();
124

125
  /**
126
   * Set a {@link Runnable} that will be executed every time the stream {@link #isReady()} state
127
   * changes from {@code false} to {@code true}.  While it is not guaranteed that the same
128
   * thread will always be used to execute the {@link Runnable}, it is guaranteed that executions
129
   * are serialized with calls to the 'inbound' {@link StreamObserver}.
130
   *
131
   * <p>May only be called during the initial call to the application, before the service returns
132
   * its {@code StreamObserver}.
133
   *
134
   * <p>Because there is a processing delay to deliver this notification, it is possible for
135
   * concurrent writes to cause {@code isReady() == false} within this callback. Handle "spurious"
136
   * notifications by checking {@code isReady()}'s current value instead of assuming it is now
137
   * {@code true}. If {@code isReady() == false} the normal expectations apply, so there would be
138
   * <em>another</em> {@code onReadyHandler} callback.
139
   *
140
   * @param onReadyHandler to call when peer is ready to receive more messages.
141
   */
142
  @Override
143
  public abstract void setOnReadyHandler(Runnable onReadyHandler);
144

145
  /**
146
   * Requests the peer to produce {@code count} more messages to be delivered to the 'inbound'
147
   * {@link StreamObserver}.
148
   *
149
   * <p>This method is safe to call from multiple threads without external synchronization.
150
   *
151
   * @param count more messages
152
   */
153
  @Override
154
  public abstract void request(int count);
155

156
  /**
157
   * Sets message compression for subsequent calls to {@link #onNext}.
158
   *
159
   * @param enable whether to enable compression.
160
   */
161
  @Override
162
  public abstract void setMessageCompression(boolean enable);
163

164
  /**
165
   * Sets a {@link Runnable} to be executed when the call is closed cleanly from the server's
166
   * point of view: either {@link #onCompleted()} or {@link #onError(Throwable)} has been called,
167
   * all the messages and trailing metadata have been sent and the stream has been closed. Note
168
   * however that the client still may have not received all the messages due to network delay,
169
   * client crashes, and cancellation races.
170
   *
171
   * <p>Exactly one of {@code onCloseHandler} and {@code onCancelHandler} is guaranteed to be called
172
   * when the RPC terminates.</p>
173
   *
174
   * <p>It is guaranteed that execution of {@code onCloseHandler} is serialized with calls to
175
   * the 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if
176
   * other callbacks are running.</p>
177
   *
178
   * <p>This method may only be called during the initial call to the application, before the
179
   * service returns its {@link StreamObserver request observer}.</p>
180
   *
181
   * @param onCloseHandler to execute when the call has been closed cleanly.
182
   */
183
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/8467")
184
  public void setOnCloseHandler(Runnable onCloseHandler) {
185
    throw new UnsupportedOperationException();
×
186
  }
187
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc