grpc / grpc-java / #19108

21 Mar 2024 10:37PM UTC coverage: 88.277% (-0.002%) from 88.279%
push

github

web-flow
Allow configuration of the queued byte threshold at which a Stream is considered not ready (#10977)

* Allow the queued byte threshold for a Stream to be ready to be configurable

- on clients this is exposed by setting a CallOption
- on servers this is configured by calling a method on ServerCall or ServerStreamListener
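The effect of a configurable threshold can be pictured with a toy model. The sketch below is not grpc-java's implementation: `ThresholdStream`, `ThresholdDemo`, and their methods are hypothetical names, and the only assumptions are that a stream counts its queued bytes and reports not ready once that count reaches the threshold.

```java
// Toy model only: these classes are hypothetical and are not grpc-java internals.
final class ThresholdStream {
  private final int onReadyThreshold; // bytes that must be queued before isReady() turns false
  private int queuedBytes;

  ThresholdStream(int onReadyThreshold) {
    // Same positivity rule that ServerCall.setOnReadyThreshold enforces on numBytes.
    if (onReadyThreshold <= 0) {
      throw new IllegalArgumentException("numBytes must be positive: " + onReadyThreshold);
    }
    this.onReadyThreshold = onReadyThreshold;
  }

  /** Ready while fewer than the threshold number of bytes are queued. */
  boolean isReady() {
    return queuedBytes < onReadyThreshold;
  }

  /** Queues an outbound message of the given size. */
  void enqueue(int numBytes) {
    queuedBytes += numBytes;
  }

  /** Models the transport writing up to numBytes to the wire. */
  void drain(int numBytes) {
    queuedBytes = Math.max(0, queuedBytes - numBytes);
  }
}

final class ThresholdDemo {
  /** Counts how many fixed-size messages are queued before the stream reports not ready. */
  static int messagesUntilNotReady(ThresholdStream stream, int messageSize) {
    int sent = 0;
    while (stream.isReady()) {
      stream.enqueue(messageSize);
      sent++;
    }
    return sent;
  }

  public static void main(String[] args) {
    System.out.println(messagesUntilNotReady(new ThresholdStream(4096), 1024));  // prints 4
    System.out.println(messagesUntilNotReady(new ThresholdStream(16384), 1024)); // prints 16
  }
}
```

In this model a smaller threshold makes a sender that respects `isReady()` pause sooner and buffer less, while a larger one lets it queue more before pausing.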

31190 of 35332 relevant lines covered (88.28%)

0.88 hits per line

Source File

80.0
/../api/src/main/java/io/grpc/ServerCall.java
/*
 * Copyright 2014 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc;

import static com.google.common.base.Preconditions.checkArgument;

import javax.annotation.Nullable;

/**
 * Encapsulates a single call received from a remote client. Calls may not simply be unary
 * request-response even though this is the most common pattern. Calls may stream any number of
 * requests and responses. This API is generally intended for use by generated handlers,
 * but applications may use it directly if they need to.
 *
 * <p>Headers must be sent before any messages, which must be sent before closing.
 *
 * <p>No generic method for determining message receipt or providing acknowledgement is provided.
 * Applications are expected to utilize normal messages for such signals, as a response
 * naturally acknowledges its request.
 *
 * <p>Methods are guaranteed to be non-blocking. Implementations are not required to be thread-safe.
 *
 * <p>DO NOT MOCK: Use InProcessTransport and make a fake server instead.
 *
 * @param <ReqT> parsed type of request message.
 * @param <RespT> parsed type of response message.
 */
public abstract class ServerCall<ReqT, RespT> { // covered (1 hit)

  /**
   * Callbacks for consuming incoming RPC messages.
   *
   * <p>Any contexts are guaranteed to arrive before any messages, which are guaranteed before half
   * close, which is guaranteed before completion.
   *
   * <p>Implementations are free to block for extended periods of time. Implementations are not
   * required to be thread-safe, but they must not be thread-hostile. The caller is free to call
   * an instance from multiple threads, but only one call simultaneously. A single thread may
   * interleave calls to multiple instances, so implementations using ThreadLocals must be careful
   * to avoid leaking inappropriate state (e.g., clearing the ThreadLocal before returning).
   */
  // TODO(ejona86): We need to decide what to do in the case of server closing with non-cancellation
  // before client half closes. It may be that we treat such a case as an error. If we permit such
  // a case then we either get to generate a half close or purposefully omit it.
  public abstract static class Listener<ReqT> { // covered (1 hit)
    /**
     * A request message has been received. For streaming calls, there may be zero or more request
     * messages.
     *
     * @param message a received request message.
     */
    public void onMessage(ReqT message) {} // covered (1 hit)

    /**
     * The client completed all message sending. However, the call may still be cancelled.
     */
    public void onHalfClose() {} // covered (1 hit)

    /**
     * The call was cancelled and the server is encouraged to abort processing to save resources,
     * since the client will not process any further messages. Cancellations can be caused by
     * timeouts, explicit cancellation by the client, network errors, etc.
     *
     * <p>There will be no further callbacks for the call.
     */
    public void onCancel() {} // covered (1 hit)

    /**
     * The call is considered complete and {@link #onCancel} is guaranteed not to be called.
     * However, the client is not guaranteed to have received all messages.
     *
     * <p>There will be no further callbacks for the call.
     */
    public void onComplete() {} // covered (1 hit)

    /**
     * This indicates that the call may now be capable of sending additional messages (via
     * {@link #sendMessage}) without requiring excessive buffering internally. This event is
     * just a suggestion and the application is free to ignore it, however doing so may
     * result in excessive buffering within the call.
     *
     * <p>Because there is a processing delay to deliver this notification, it is possible for
     * concurrent writes to cause {@code isReady() == false} within this callback. Handle "spurious"
     * notifications by checking {@code isReady()}'s current value instead of assuming it is now
     * {@code true}. If {@code isReady() == false} the normal expectations apply, so there would be
     * <em>another</em> {@code onReady()} callback.
     */
    public void onReady() {} // covered (1 hit)
  }

  /**
   * Requests up to the given number of messages from the call to be delivered to
   * {@link Listener#onMessage(Object)}. Once {@code numMessages} have been delivered
   * no further request messages will be delivered until more messages are requested by
   * calling this method again.
   *
   * <p>Servers use this mechanism to provide back-pressure to the client for flow-control.
   *
   * <p>This method is safe to call from multiple threads without external synchronization.
   *
   * @param numMessages the requested number of messages to be delivered to the listener.
   */
  public abstract void request(int numMessages);

  /**
   * Send response header metadata prior to sending a response message. This method may
   * only be called once and cannot be called after calls to {@link #sendMessage} or {@link #close}.
   *
   * <p>Since {@link Metadata} is not thread-safe, the caller must not access (read or write) {@code
   * headers} after this point.
   *
   * @param headers metadata to send prior to any response body.
   * @throws IllegalStateException if {@code close} has been called, a message has been sent, or
   *     headers have already been sent
   */
  public abstract void sendHeaders(Metadata headers);

  /**
   * Send a response message. Messages are the primary form of communication associated with
   * RPCs. Multiple response messages may exist for streaming calls.
   *
   * @param message response message.
   * @throws IllegalStateException if headers not sent or call is {@link #close}d
   */
  public abstract void sendMessage(RespT message);

  /**
   * If {@code true}, indicates that the call is capable of sending additional messages
   * without requiring excessive buffering internally. This event is
   * just a suggestion and the application is free to ignore it, however doing so may
   * result in excessive buffering within the call.
   *
   * <p>If {@code false}, {@link Listener#onReady()} will be called after {@code isReady()}
   * transitions to {@code true}.
   *
   * <p>This abstract class's implementation always returns {@code true}. Implementations generally
   * override the method.
   */
  public boolean isReady() {
    return true; // uncovered
  }

  /**
   * Close the call with the provided status. No further sending or receiving will occur. If {@link
   * Status#isOk} is {@code false}, then the call is said to have failed.
   *
   * <p>If no errors or cancellations are known to have occurred, then a {@link Listener#onComplete}
   * notification should be expected, independent of {@code status}. Otherwise {@link
   * Listener#onCancel} has been or will be called.
   *
   * <p>Since {@link Metadata} is not thread-safe, the caller must not access (read or write) {@code
   * trailers} after this point.
   *
   * <p>This method implies the caller completed processing the RPC, but it does not imply the RPC
   * is complete. The call implementation will need additional time to complete the RPC and during
   * this time the client is still able to cancel the request or a network error might cause the
   * RPC to fail. If you wish to know when the call is actually completed/closed, you have to use
   * {@link Listener#onComplete} or {@link Listener#onCancel} instead. This method is not
   * necessarily invoked when Listener.onCancel() is called.
   *
   * @throws IllegalStateException if call is already {@code close}d
   */
  public abstract void close(Status status, Metadata trailers);

  /**
   * Returns {@code true} when the call is cancelled and the server is encouraged to abort
   * processing to save resources, since the client will not be processing any further methods.
   * Cancellations can be caused by timeouts, explicit cancel by client, network errors, and
   * similar.
   *
   * <p>This method may safely be called concurrently from multiple threads.
   */
  public abstract boolean isCancelled();

  /**
   * Enables per-message compression, if an encoding type has been negotiated.  If no message
   * encoding has been negotiated, this is a no-op. By default per-message compression is enabled,
   * but may not have any effect if compression is not enabled on the call.
   */
  public void setMessageCompression(boolean enabled) {
    // noop
  } // uncovered

  /**
   * Sets the compression algorithm for this call.  This compression is utilized for sending.  If
   * the server does not support the compression algorithm, the call will fail.  This method may
   * only be called before {@link #sendHeaders}.  The compressor to use will be looked up in the
   * {@link CompressorRegistry}.  Default gRPC servers support the "gzip" compressor.
   *
   * <p>It is safe to call this even if the client does not support the compression format chosen.
   * The implementation will handle negotiation with the client and may fall back to no compression.
   *
   * @param compressor the name of the compressor to use.
   * @throws IllegalArgumentException if the compressor name can not be found.
   */
  public void setCompression(String compressor) {
    // noop
  } // uncovered

  /**
   * A hint to the call that specifies how many bytes must be queued before
   * {@link #isReady()} will return false. A call may ignore this property if
   * unsupported. This may only be set before any messages are sent.
   *
   * @param numBytes The number of bytes that must be queued. Must be a
   *                 positive integer.
   */
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021")
  public void setOnReadyThreshold(int numBytes) {
    checkArgument(numBytes > 0, "numBytes must be positive: %s", numBytes); // covered (1 hit)
  } // covered (1 hit)

  /**
   * Returns the level of security guarantee in communications
   *
   * <p>Determining the level of security offered by the transport for RPCs on server-side.
   * This can be approximated by looking for the SSLSession, but that doesn't work for ALTS and
   * maybe some future TLS approaches. May return a lower security level when it cannot be
   * determined precisely.
   *
   * @return non-{@code null} SecurityLevel enum
   */
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4692")
  public SecurityLevel getSecurityLevel() {
    return SecurityLevel.NONE; // covered (1 hit)
  }

  /**
   * Returns properties of a single call.
   *
   * <p>Attributes originate from the transport and can be altered by {@link ServerTransportFilter}.
   *
   * @return non-{@code null} Attributes container
   */
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1779")
  @Grpc.TransportAttr
  public Attributes getAttributes() {
    return Attributes.EMPTY; // covered (1 hit)
  }

  /**
   * Gets the authority this call is addressed to.
   *
   * @return the authority string. {@code null} if not available.
   */
  @Nullable
  public String getAuthority() {
    return null; // covered (1 hit)
  }

  /**
   * The {@link MethodDescriptor} for the call.
   */
  public abstract MethodDescriptor<ReqT, RespT> getMethodDescriptor();
}
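
The `onReady()` Javadoc above asks handlers to re-check `isReady()` instead of assuming it is `true` when the callback fires. The sketch below illustrates that pattern with stand-in types only: `FakeCall`, `PendingResponses`, and `OnReadyDemo` are hypothetical and do not extend or use the real `ServerCall`; the fake call simply reports not ready once a fixed number of messages is buffered.

```java
// Hypothetical stand-in for a call: ready while fewer than `capacity` messages are buffered.
final class FakeCall {
  private final int capacity;
  private int buffered;
  int sent;

  FakeCall(int capacity) {
    this.capacity = capacity;
  }

  boolean isReady() {
    return buffered < capacity;
  }

  void sendMessage(String message) {
    buffered++;
    sent++;
  }

  /** Models the transport flushing its buffer; a real call would then deliver onReady(). */
  void flush() {
    buffered = 0;
  }
}

// Hypothetical handler state: a count of responses still waiting to be sent.
final class PendingResponses {
  private final FakeCall call;
  private int remaining;

  PendingResponses(FakeCall call, int count) {
    this.call = call;
    this.remaining = count;
  }

  /** Follows the Javadoc advice: check isReady() before every send, never assume it. */
  void onReady() {
    while (remaining > 0 && call.isReady()) {
      call.sendMessage("response");
      remaining--;
    }
  }

  int remaining() {
    return remaining;
  }
}

final class OnReadyDemo {
  public static void main(String[] args) {
    FakeCall call = new FakeCall(2);
    PendingResponses responses = new PendingResponses(call, 5);

    responses.onReady(); // sends 2, then the call stops being ready
    responses.onReady(); // "spurious" notification: isReady() is false, nothing is sent
    System.out.println(call.sent + " sent, " + responses.remaining() + " pending");

    while (responses.remaining() > 0) {
      call.flush();
      responses.onReady();
    }
    System.out.println(call.sent + " sent, " + responses.remaining() + " pending");
  }
}
```

Because the loop condition consults `isReady()` on every iteration, a notification that arrives while the call is not ready becomes a no-op, and the remaining responses wait for the next `onReady()`.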