• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19108

21 Mar 2024 10:37PM UTC coverage: 88.277% (-0.002%) from 88.279%
#19108

push

github

web-flow
Allow configuration of the queued byte threshold at which a Stream is considered not ready (#10977)

* Allow the queued byte threshold for a Stream to be ready to be configurable

- on clients this is exposed by setting a CallOption
- on servers this is configured by calling a method on ServerCall or ServerStreamListener

31190 of 35332 relevant lines covered (88.28%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.69
/../api/src/main/java/io/grpc/CallOptions.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20

21
import com.google.common.base.MoreObjects;
22
import com.google.common.base.Preconditions;
23
import java.util.ArrayList;
24
import java.util.Arrays;
25
import java.util.Collections;
26
import java.util.List;
27
import java.util.concurrent.Executor;
28
import java.util.concurrent.TimeUnit;
29
import javax.annotation.CheckReturnValue;
30
import javax.annotation.Nullable;
31
import javax.annotation.concurrent.Immutable;
32

33
/**
34
 * The collection of runtime options for a new RPC call.
35
 *
36
 * <p>A field that is not set is {@code null}.
37
 */
38
@Immutable
39
@CheckReturnValue
40
public final class CallOptions {
41
  /**
42
   * A blank {@code CallOptions} in which no fields are set.
43
   */
44
  public static final CallOptions DEFAULT;
45

46
  static {
47
    Builder b = new Builder();
1✔
48
    b.customOptions = new Object[0][2];
1✔
49
    b.streamTracerFactories = Collections.emptyList();
1✔
50
    DEFAULT = b.build();
1✔
51
  }
1✔
52

53
  @Nullable
54
  private final Deadline deadline;
55

56
  @Nullable
57
  private final Executor executor;
58

59
  @Nullable
60
  private final String authority;
61

62
  @Nullable
63
  private final CallCredentials credentials;
64

65
  @Nullable
66
  private final String compressorName;
67

68
  private final Object[][] customOptions;
69

70
  private final List<ClientStreamTracer.Factory> streamTracerFactories;
71

72
  /**
73
   * Opposite to fail fast.
74
   */
75
  @Nullable
76
  private final Boolean waitForReady;
77

78
  @Nullable
79
  private final Integer maxInboundMessageSize;
80
  @Nullable
81
  private final Integer maxOutboundMessageSize;
82
  @Nullable
83
  private final Integer onReadyThreshold;
84

85
  private CallOptions(Builder builder) {
1✔
86
    this.deadline = builder.deadline;
1✔
87
    this.executor = builder.executor;
1✔
88
    this.authority = builder.authority;
1✔
89
    this.credentials = builder.credentials;
1✔
90
    this.compressorName = builder.compressorName;
1✔
91
    this.customOptions = builder.customOptions;
1✔
92
    this.streamTracerFactories = builder.streamTracerFactories;
1✔
93
    this.waitForReady = builder.waitForReady;
1✔
94
    this.maxInboundMessageSize = builder.maxInboundMessageSize;
1✔
95
    this.maxOutboundMessageSize = builder.maxOutboundMessageSize;
1✔
96
    this.onReadyThreshold = builder.onReadyThreshold;
1✔
97
  }
1✔
98

99
  static class Builder {
1✔
100
    Deadline deadline;
101
    Executor executor;
102
    String authority;
103
    CallCredentials credentials;
104
    String compressorName;
105
    Object[][] customOptions;
106
    // Unmodifiable list
107
    List<ClientStreamTracer.Factory> streamTracerFactories;
108
    Boolean waitForReady;
109
    Integer maxInboundMessageSize;
110
    Integer maxOutboundMessageSize;
111
    Integer onReadyThreshold;
112

113
    private CallOptions build() {
114
      return new CallOptions(this);
1✔
115
    }
116
  }
117

118
  /**
119
   * Override the HTTP/2 authority the channel claims to be connecting to. <em>This is not
120
   * generally safe.</em> Overriding allows advanced users to re-use a single Channel for multiple
121
   * services, even if those services are hosted on different domain names. That assumes the
122
   * server is virtually hosting multiple domains and is guaranteed to continue doing so. It is
123
   * rare for a service provider to make such a guarantee. <em>At this time, there is no security
124
   * verification of the overridden value, such as making sure the authority matches the server's
125
   * TLS certificate.</em>
126
   */
127
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1767")
128
  public CallOptions withAuthority(@Nullable String authority) {
129
    Builder builder = toBuilder(this);
1✔
130
    builder.authority = authority;
1✔
131
    return builder.build();
1✔
132
  }
133

134
  /**
135
   * Returns a new {@code CallOptions} with the given call credentials.
136
   */
137
  public CallOptions withCallCredentials(@Nullable CallCredentials credentials) {
138
    Builder builder = toBuilder(this);
1✔
139
    builder.credentials = credentials;
1✔
140
    return builder.build();
1✔
141
  }
142

143
  /**
144
   * Sets the compression to use for the call.  The compressor must be a valid name known in the
145
   * {@link CompressorRegistry}.  By default, the "gzip" compressor will be available.
146
   *
147
   * <p>It is only safe to call this if the server supports the compression format chosen. There is
148
   * no negotiation performed; if the server does not support the compression chosen, the call will
149
   * fail.
150
   */
151
  public CallOptions withCompression(@Nullable String compressorName) {
152
    Builder builder = toBuilder(this);
1✔
153
    builder.compressorName = compressorName;
1✔
154
    return builder.build();
1✔
155
  }
156

157
  /**
158
   * Returns a new {@code CallOptions} with the given absolute deadline.
159
   *
160
   * <p>This is mostly used for propagating an existing deadline. {@link #withDeadlineAfter} is the
161
   * recommended way of setting a new deadline.
162
   *
163
   * @param deadline the deadline or {@code null} for unsetting the deadline.
164
   */
165
  public CallOptions withDeadline(@Nullable Deadline deadline) {
166
    Builder builder = toBuilder(this);
1✔
167
    builder.deadline = deadline;
1✔
168
    return builder.build();
1✔
169
  }
170

171
  /**
172
   * Returns a new {@code CallOptions} with a deadline that is after the given {@code duration} from
173
   * now.
174
   */
175
  public CallOptions withDeadlineAfter(long duration, TimeUnit unit) {
176
    return withDeadline(Deadline.after(duration, unit));
1✔
177
  }
178

179
  /**
180
   * Returns the deadline or {@code null} if the deadline is not set.
181
   */
182
  @Nullable
183
  public Deadline getDeadline() {
184
    return deadline;
1✔
185
  }
186

187
  /**
188
   * Enables <a href="https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md">
189
   * 'wait for ready'</a> for the call. Wait-for-ready queues the RPC until a connection is
190
   * available. This may dramatically increase the latency of the RPC, but avoids failing
191
   * "unnecessarily." The default queues the RPC until an attempt to connect has completed, but
192
   * fails RPCs without sending them if unable to connect.
193
   */
194
  public CallOptions withWaitForReady() {
195
    Builder builder = toBuilder(this);
1✔
196
    builder.waitForReady = Boolean.TRUE;
1✔
197
    return builder.build();
1✔
198
  }
199

200
  /**
201
   * Disables 'wait for ready' feature for the call.
202
   * This method should be rarely used because the default is without 'wait for ready'.
203
   */
204
  public CallOptions withoutWaitForReady() {
205
    Builder builder = toBuilder(this);
1✔
206
    builder.waitForReady = Boolean.FALSE;
1✔
207
    return builder.build();
1✔
208
  }
209

210
  /**
211
   * Specifies how many bytes must be queued before the call is
212
   * considered not ready to send more messages.
213
   *
214
   * @param numBytes The number of bytes that must be queued. Must be a
215
   *                 positive integer.
216
   */
217
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021")
218
  public CallOptions withOnReadyThreshold(int numBytes) {
219
    checkArgument(numBytes > 0, "numBytes must be positive: %s", numBytes);
1✔
220
    Builder builder = toBuilder(this);
1✔
221
    builder.onReadyThreshold = numBytes;
1✔
222
    return builder.build();
1✔
223
  }
224

225
  /**
226
   * Resets to the default number of bytes that must be queued before the
227
   * call is considered not ready to send more messages, discarding any
228
   * value set by {@link #withOnReadyThreshold}.
229
   */
230
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021")
231
  public CallOptions clearOnReadyThreshold() {
232
    Builder builder = toBuilder(this);
1✔
233
    builder.onReadyThreshold = null;
1✔
234
    return builder.build();
1✔
235
  }
236

237
  /**
238
   * Returns the number of bytes that must be queued before the call is
239
   * considered not ready to send more messages, as set by
240
   * {@link #withOnReadyThreshold}.
241
   *
242
   * @return null if the default threshold is used.
243
   */
244
  @Nullable
245
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11021")
246
  public Integer getOnReadyThreshold() {
247
    return onReadyThreshold;
1✔
248
  }
249

250
  /**
251
   * Returns the compressor's name.
252
   */
253
  @Nullable
254
  public String getCompressor() {
255
    return compressorName;
1✔
256
  }
257

258
  /**
259
   * Override the HTTP/2 authority the channel claims to be connecting to. <em>This is not
260
   * generally safe.</em> Overriding allows advanced users to re-use a single Channel for multiple
261
   * services, even if those services are hosted on different domain names. That assumes the
262
   * server is virtually hosting multiple domains and is guaranteed to continue doing so. It is
263
   * rare for a service provider to make such a guarantee. <em>At this time, there is no security
264
   * verification of the overridden value, such as making sure the authority matches the server's
265
   * TLS certificate.</em>
266
   */
267
  @Nullable
268
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1767")
269
  public String getAuthority() {
270
    return authority;
1✔
271
  }
272

273
  /**
274
   * Returns the call credentials.
275
   */
276
  @Nullable
277
  public CallCredentials getCredentials() {
278
    return credentials;
1✔
279
  }
280

281
  /**
282
   * Returns a new {@code CallOptions} with {@code executor} to be used instead of the default
283
   * executor specified with {@link ManagedChannelBuilder#executor}.
284
   */
285
  public CallOptions withExecutor(@Nullable Executor executor) {
286
    Builder builder = toBuilder(this);
1✔
287
    builder.executor = executor;
1✔
288
    return builder.build();
1✔
289
  }
290

291
  /**
292
   * Returns a new {@code CallOptions} with a {@code ClientStreamTracerFactory} in addition to
293
   * the existing factories.
294
   *
295
   * <p>This method doesn't replace existing factories, or try to de-duplicate factories.
296
   */
297
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/2861")
298
  public CallOptions withStreamTracerFactory(ClientStreamTracer.Factory factory) {
299
    ArrayList<ClientStreamTracer.Factory> newList =
1✔
300
        new ArrayList<>(streamTracerFactories.size() + 1);
1✔
301
    newList.addAll(streamTracerFactories);
1✔
302
    newList.add(factory);
1✔
303
    Builder builder = toBuilder(this);
1✔
304
    builder.streamTracerFactories = Collections.unmodifiableList(newList);
1✔
305
    return builder.build();
1✔
306
  }
307

308
  /**
309
   * Returns an immutable list of {@code ClientStreamTracerFactory}s.
310
   */
311
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/2861")
312
  public List<ClientStreamTracer.Factory> getStreamTracerFactories() {
313
    return streamTracerFactories;
1✔
314
  }
315

316
  /**
317
   * Key for a key-value pair. Uses reference equality.
318
   */
319
  public static final class Key<T> {
320
    private final String debugString;
321
    private final T defaultValue;
322

323
    private Key(String debugString, T defaultValue) {
1✔
324
      this.debugString = debugString;
1✔
325
      this.defaultValue = defaultValue;
1✔
326
    }
1✔
327

328
    /**
329
     * Returns the user supplied default value for this key.
330
     */
331
    public T getDefault() {
332
      return defaultValue;
×
333
    }
334

335
    @Override
336
    public String toString() {
337
      return debugString;
1✔
338
    }
339

340
    /**
341
     * Factory method for creating instances of {@link Key}.
342
     *
343
     * @param debugString a string used to describe this key, used for debugging.
344
     * @param defaultValue default value to return when value for key not set
345
     * @param <T> Key type
346
     * @return Key object
347
     * @deprecated Use {@link #create} or {@link #createWithDefault} instead. This method will
348
     *     be removed.
349
     */
350
    @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1869")
351
    @Deprecated
352
    public static <T> Key<T> of(String debugString, T defaultValue) {
353
      Preconditions.checkNotNull(debugString, "debugString");
×
354
      return new Key<>(debugString, defaultValue);
×
355
    }
356

357
    /**
358
     * Factory method for creating instances of {@link Key}. The default value of the
359
     * key is {@code null}.
360
     *
361
     * @param debugString a debug string that describes this key.
362
     * @param <T> Key type
363
     * @return Key object
364
     * @since 1.13.0
365
     */
366
    public static <T> Key<T> create(String debugString) {
367
      Preconditions.checkNotNull(debugString, "debugString");
1✔
368
      return new Key<>(debugString, /*defaultValue=*/ null);
1✔
369
    }
370

371
    /**
372
     * Factory method for creating instances of {@link Key}.
373
     *
374
     * @param debugString a debug string that describes this key.
375
     * @param defaultValue default value to return when value for key not set
376
     * @param <T> Key type
377
     * @return Key object
378
     * @since 1.13.0
379
     */
380
    public static <T> Key<T> createWithDefault(String debugString, T defaultValue) {
381
      Preconditions.checkNotNull(debugString, "debugString");
1✔
382
      return new Key<>(debugString, defaultValue);
1✔
383
    }
384
  }
385

386
  /**
387
   * Sets a custom option. Any existing value for the key is overwritten.
388
   *
389
   * @param key The option key
390
   * @param value The option value.
391
   * @since 1.13.0
392
   */
393
  public <T> CallOptions withOption(Key<T> key, T value) {
394
    Preconditions.checkNotNull(key, "key");
1✔
395
    Preconditions.checkNotNull(value, "value");
1✔
396

397
    Builder builder = toBuilder(this);
1✔
398
    int existingIdx = -1;
1✔
399
    for (int i = 0; i < customOptions.length; i++) {
1✔
400
      if (key.equals(customOptions[i][0])) {
1✔
401
        existingIdx = i;
1✔
402
        break;
1✔
403
      }
404
    }
405

406
    builder.customOptions = new Object[customOptions.length + (existingIdx == -1 ? 1 : 0)][2];
1✔
407
    System.arraycopy(customOptions, 0, builder.customOptions, 0, customOptions.length);
1✔
408

409
    if (existingIdx == -1) {
1✔
410
      // Add a new option
411
      builder.customOptions[customOptions.length] = new Object[] {key, value};
1✔
412
    } else {
413
      // Replace an existing option
414
      builder.customOptions[existingIdx] = new Object[] {key, value};
1✔
415
    }
416

417
    return builder.build();
1✔
418
  }
419

420
  /**
421
   * Get the value for a custom option or its inherent default.
422
   * @param key Key identifying option
423
   */
424
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1869")
425
  @SuppressWarnings("unchecked")
426
  public <T> T getOption(Key<T> key) {
427
    Preconditions.checkNotNull(key, "key");
1✔
428
    for (int i = 0; i < customOptions.length; i++) {
1✔
429
      if (key.equals(customOptions[i][0])) {
1✔
430
        return (T) customOptions[i][1];
1✔
431
      }
432
    }
433
    return key.defaultValue;
1✔
434
  }
435

436
  /**
437
   * Returns the executor override to use for this specific call, or {@code null} if there is no
438
   * override. The executor is only for servicing this one call, so is not safe to use after
439
   * {@link ClientCall.Listener#onClose}.
440
   */
441
  @Nullable
442
  public Executor getExecutor() {
443
    return executor;
1✔
444
  }
445

446
  /**
447
   * Returns whether <a href="https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md">
448
   * 'wait for ready'</a> option is enabled for the call. 'Fail fast' is the default option for gRPC
449
   * calls and 'wait for ready' is the opposite to it.
450
   */
451
  public boolean isWaitForReady() {
452
    return Boolean.TRUE.equals(waitForReady);
1✔
453
  }
454

455
  Boolean getWaitForReady() {
456
    return waitForReady;
1✔
457
  }
458

459
  /**
460
   * Sets the maximum allowed message size acceptable from the remote peer.  If unset, this will
461
   * default to the value set on the {@link ManagedChannelBuilder#maxInboundMessageSize(int)}.
462
   */
463
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563")
464
  public CallOptions withMaxInboundMessageSize(int maxSize) {
465
    checkArgument(maxSize >= 0, "invalid maxsize %s", maxSize);
1✔
466
    Builder builder = toBuilder(this);
1✔
467
    builder.maxInboundMessageSize = maxSize;
1✔
468
    return builder.build();
1✔
469
  }
470

471
  /**
472
   * Sets the maximum allowed message size that may be sent to the remote peer.
473
   */
474
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563")
475
  public CallOptions withMaxOutboundMessageSize(int maxSize) {
476
    checkArgument(maxSize >= 0, "invalid maxsize %s", maxSize);
1✔
477
    Builder builder = toBuilder(this);
1✔
478
    builder.maxOutboundMessageSize = maxSize;
1✔
479
    return builder.build();
1✔
480
  }
481

482
  /**
483
   * Gets the maximum allowed message size acceptable from the remote peer.
484
   */
485
  @Nullable
486
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563")
487
  public Integer getMaxInboundMessageSize() {
488
    return maxInboundMessageSize;
1✔
489
  }
490

491
  /**
492
   * Gets the maximum allowed message size acceptable to send to the remote peer.
493
   */
494
  @Nullable
495
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563")
496
  public Integer getMaxOutboundMessageSize() {
497
    return maxOutboundMessageSize;
1✔
498
  }
499

500
  /**
501
   * Copy CallOptions.
502
   */
503
  private static Builder toBuilder(CallOptions other) {
504
    Builder builder = new Builder();
1✔
505
    builder.deadline = other.deadline;
1✔
506
    builder.executor = other.executor;
1✔
507
    builder.authority = other.authority;
1✔
508
    builder.credentials = other.credentials;
1✔
509
    builder.compressorName = other.compressorName;
1✔
510
    builder.customOptions = other.customOptions;
1✔
511
    builder.streamTracerFactories = other.streamTracerFactories;
1✔
512
    builder.waitForReady = other.waitForReady;
1✔
513
    builder.maxInboundMessageSize = other.maxInboundMessageSize;
1✔
514
    builder.maxOutboundMessageSize = other.maxOutboundMessageSize;
1✔
515
    return builder;
1✔
516
  }
517

518
  @Override
519
  public String toString() {
520
    return MoreObjects.toStringHelper(this)
1✔
521
        .add("deadline", deadline)
1✔
522
        .add("authority", authority)
1✔
523
        .add("callCredentials", credentials)
1✔
524
        .add("executor", executor != null ? executor.getClass() : null)
1✔
525
        .add("compressorName", compressorName)
1✔
526
        .add("customOptions", Arrays.deepToString(customOptions))
1✔
527
        .add("waitForReady", isWaitForReady())
1✔
528
        .add("maxInboundMessageSize", maxInboundMessageSize)
1✔
529
        .add("maxOutboundMessageSize", maxOutboundMessageSize)
1✔
530
        .add("streamTracerFactories", streamTracerFactories)
1✔
531
        .toString();
1✔
532
  }
533
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc