• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19535

30 Oct 2024 03:41PM UTC coverage: 84.572% (-0.005%) from 84.577%
#19535

push

github

web-flow
xds: Per-rpc rewriting of the authority header based on the selected route. (#11631)

Implementation of A81.

33970 of 40167 relevant lines covered (84.57%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.16
/../core/src/main/java/io/grpc/internal/DelayedStream.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import io.grpc.Attributes;
24
import io.grpc.Compressor;
25
import io.grpc.Deadline;
26
import io.grpc.DecompressorRegistry;
27
import io.grpc.Metadata;
28
import io.grpc.Status;
29
import io.grpc.internal.ClientStreamListener.RpcProgress;
30
import java.io.InputStream;
31
import java.util.ArrayList;
32
import java.util.List;
33
import javax.annotation.CheckReturnValue;
34
import javax.annotation.concurrent.GuardedBy;
35

36
/**
37
 * A stream that queues requests before the transport is available, and delegates to a real stream
38
 * implementation when the transport is available.
39
 *
40
 * <p>{@code ClientStream} itself doesn't require thread-safety. However, the state of {@code
41
 * DelayedStream} may be internally altered by different threads, thus internal synchronization is
42
 * necessary.
43
 */
44
class DelayedStream implements ClientStream {
1✔
45
  /** {@code true} once realStream is valid and all pending calls have been drained. */
46
  private volatile boolean passThrough;
47
  /**
48
   * Non-{@code null} iff start has been called. Used to assert methods are called in appropriate
49
   * order, but also used if an error occurs before {@code realStream} is set.
50
   */
51
  private ClientStreamListener listener;
52
  /** Must hold {@code this} lock when setting. */
53
  private ClientStream realStream;
54
  @GuardedBy("this")
55
  private Status error;
56
  @GuardedBy("this")
1✔
57
  private List<Runnable> pendingCalls = new ArrayList<>();
58
  @GuardedBy("this")
59
  private DelayedStreamListener delayedListener;
60
  @GuardedBy("this")
61
  private long startTimeNanos;
62
  @GuardedBy("this")
63
  private long streamSetTimeNanos;
64
  // No need to synchronize; start() synchronization provides a happens-before
65
  private List<Runnable> preStartPendingCalls = new ArrayList<>();
1✔
66

67
  @Override
68
  public void setMaxInboundMessageSize(final int maxSize) {
69
    checkState(listener == null, "May only be called before start");
1✔
70
    preStartPendingCalls.add(new Runnable() {
1✔
71
      @Override
72
      public void run() {
73
        realStream.setMaxInboundMessageSize(maxSize);
1✔
74
      }
1✔
75
    });
76
  }
1✔
77

78
  @Override
79
  public void setMaxOutboundMessageSize(final int maxSize) {
80
    checkState(listener == null, "May only be called before start");
1✔
81
    preStartPendingCalls.add(new Runnable() {
1✔
82
      @Override
83
      public void run() {
84
        realStream.setMaxOutboundMessageSize(maxSize);
1✔
85
      }
1✔
86
    });
87
  }
1✔
88

89
  @Override
90
  public void setDeadline(final Deadline deadline) {
91
    checkState(listener == null, "May only be called before start");
1✔
92
    preStartPendingCalls.add(new Runnable() {
1✔
93
      @Override
94
      public void run() {
95
        realStream.setDeadline(deadline);
1✔
96
      }
1✔
97
    });
98
  }
1✔
99

100
  @Override
101
  public void appendTimeoutInsight(InsightBuilder insight) {
102
    synchronized (this) {
1✔
103
      if (listener == null) {
1✔
104
        return;
1✔
105
      }
106
      if (realStream != null) {
1✔
107
        insight.appendKeyValue("buffered_nanos", streamSetTimeNanos - startTimeNanos);
1✔
108
        realStream.appendTimeoutInsight(insight);
1✔
109
      } else {
110
        insight.appendKeyValue("buffered_nanos", System.nanoTime() - startTimeNanos);
1✔
111
        insight.append("waiting_for_connection");
1✔
112
      }
113
    }
1✔
114
  }
1✔
115

116
  /**
117
   * Transfers all pending and future requests and mutations to the given stream. Method will return
118
   * quickly, but if the returned Runnable is non-null it must be called to complete the process.
119
   * The Runnable may take a while to execute.
120
   *
121
   * <p>No-op if either this method or {@link #cancel} have already been called.
122
   */
123
  // When this method returns, start() has been called on realStream or passThrough is guaranteed to
124
  // be true
125
  @CheckReturnValue
126
  final Runnable setStream(ClientStream stream) {
127
    ClientStreamListener savedListener;
128
    synchronized (this) {
1✔
129
      // If realStream != null, then either setStream() or cancel() has been called.
130
      if (realStream != null) {
1✔
131
        return null;
1✔
132
      }
133
      setRealStream(checkNotNull(stream, "stream"));
1✔
134
      savedListener = listener;
1✔
135
      if (savedListener == null) {
1✔
136
        assert pendingCalls.isEmpty();
1✔
137
        pendingCalls = null;
1✔
138
        passThrough = true;
1✔
139
      }
140
    }
1✔
141
    if (savedListener == null) {
1✔
142
      return null;
1✔
143
    } else {
144
      internalStart(savedListener);
1✔
145
      return new Runnable() {
1✔
146
        @Override
147
        public void run() {
148
          drainPendingCalls();
1✔
149
        }
1✔
150
      };
151
    }
152
  }
153

154
  /**
155
   * Called to transition {@code passThrough} to {@code true}. This method is not safe to be called
156
   * multiple times; the caller must ensure it will only be called once, ever. {@code this} lock
157
   * should not be held when calling this method.
158
   */
159
  private void drainPendingCalls() {
160
    assert realStream != null;
1✔
161
    assert !passThrough;
1✔
162
    List<Runnable> toRun = new ArrayList<>();
1✔
163
    DelayedStreamListener delayedListener = null;
1✔
164
    while (true) {
165
      synchronized (this) {
1✔
166
        if (pendingCalls.isEmpty()) {
1✔
167
          pendingCalls = null;
1✔
168
          passThrough = true;
1✔
169
          delayedListener = this.delayedListener;
1✔
170
          break;
1✔
171
        }
172
        // Since there were pendingCalls, we need to process them. To maintain ordering we can't set
173
        // passThrough=true until we run all pendingCalls, but new Runnables may be added after we
174
        // drop the lock. So we will have to re-check pendingCalls.
175
        List<Runnable> tmp = toRun;
1✔
176
        toRun = pendingCalls;
1✔
177
        pendingCalls = tmp;
1✔
178
      }
1✔
179
      for (Runnable runnable : toRun) {
1✔
180
        // Must not call transport while lock is held to prevent deadlocks.
181
        // TODO(ejona): exception handling
182
        runnable.run();
1✔
183
      }
1✔
184
      toRun.clear();
1✔
185
    }
186
    if (delayedListener != null) {
1✔
187
      delayedListener.drainPendingCallbacks();
1✔
188
    }
189
  }
1✔
190

191
  /**
192
   * Enqueue the runnable or execute it now. Call sites that may be called many times may want avoid
193
   * this method if {@code passThrough == true}.
194
   *
195
   * <p>Note that this method is no more thread-safe than {@code runnable}. It is thread-safe if and
196
   * only if {@code runnable} is thread-safe.
197
   */
198
  private void delayOrExecute(Runnable runnable) {
199
    checkState(listener != null, "May only be called after start");
1✔
200
    synchronized (this) {
1✔
201
      if (!passThrough) {
1✔
202
        pendingCalls.add(runnable);
1✔
203
        return;
1✔
204
      }
205
    }
1✔
206
    runnable.run();
1✔
207
  }
1✔
208

209
  @Override
210
  public void setAuthority(final String authority) {
211
    checkNotNull(authority, "authority");
1✔
212
    preStartPendingCalls.add(new Runnable() {
1✔
213
      @Override
214
      public void run() {
215
        realStream.setAuthority(authority);
1✔
216
      }
1✔
217
    });
218
  }
1✔
219

220
  @Override
221
  public void start(ClientStreamListener listener) {
222
    checkNotNull(listener, "listener");
1✔
223
    checkState(this.listener == null, "already started");
1✔
224

225
    Status savedError;
226
    boolean savedPassThrough;
227
    synchronized (this) {
1✔
228
      // If error != null, then cancel() has been called and was unable to close the listener
229
      savedError = error;
1✔
230
      savedPassThrough = passThrough;
1✔
231
      if (!savedPassThrough) {
1✔
232
        listener = delayedListener = new DelayedStreamListener(listener);
1✔
233
      }
234
      this.listener = listener;
1✔
235
      startTimeNanos = System.nanoTime();
1✔
236
    }
1✔
237
    if (savedError != null) {
1✔
238
      listener.closed(savedError, RpcProgress.PROCESSED, new Metadata());
×
239
      return;
×
240
    }
241

242
    if (savedPassThrough) {
1✔
243
      internalStart(listener);
1✔
244
    } // else internalStart() will be called by setStream
245
  }
1✔
246

247
  /**
248
   * Starts stream without synchronization. {@code listener} should be same instance as {@link
249
   * #listener}.
250
   */
251
  private void internalStart(ClientStreamListener listener) {
252
    for (Runnable runnable : preStartPendingCalls) {
1✔
253
      runnable.run();
1✔
254
    }
1✔
255
    preStartPendingCalls = null;
1✔
256
    realStream.start(listener);
1✔
257
  }
1✔
258

259
  @Override
260
  public Attributes getAttributes() {
261
    ClientStream savedRealStream;
262
    synchronized (this) {
1✔
263
      savedRealStream = realStream;
1✔
264
    }
1✔
265
    if (savedRealStream != null) {
1✔
266
      return savedRealStream.getAttributes();
1✔
267
    } else {
268
      return Attributes.EMPTY;
1✔
269
    }
270
  }
271

272
  @Override
273
  public void writeMessage(final InputStream message) {
274
    checkState(listener != null, "May only be called after start");
1✔
275
    checkNotNull(message, "message");
1✔
276
    if (passThrough) {
1✔
277
      realStream.writeMessage(message);
1✔
278
    } else {
279
      delayOrExecute(new Runnable() {
1✔
280
        @Override
281
        public void run() {
282
          realStream.writeMessage(message);
1✔
283
        }
1✔
284
      });
285
    }
286
  }
1✔
287

288
  @Override
289
  public void flush() {
290
    checkState(listener != null, "May only be called after start");
1✔
291
    if (passThrough) {
1✔
292
      realStream.flush();
1✔
293
    } else {
294
      delayOrExecute(new Runnable() {
1✔
295
        @Override
296
        public void run() {
297
          realStream.flush();
1✔
298
        }
1✔
299
      });
300
    }
301
  }
1✔
302

303
  // When this method returns, passThrough is guaranteed to be true
304
  @Override
305
  public void cancel(final Status reason) {
306
    checkState(listener != null, "May only be called after start");
1✔
307
    checkNotNull(reason, "reason");
1✔
308
    boolean delegateToRealStream = true;
1✔
309
    synchronized (this) {
1✔
310
      // If realStream != null, then either setStream() or cancel() has been called
311
      if (realStream == null) {
1✔
312
        setRealStream(NoopClientStream.INSTANCE);
1✔
313
        delegateToRealStream = false;
1✔
314
        error = reason;
1✔
315
      }
316
    }
1✔
317
    if (delegateToRealStream) {
1✔
318
      delayOrExecute(new Runnable() {
1✔
319
        @Override
320
        public void run() {
321
          realStream.cancel(reason);
1✔
322
        }
1✔
323
      });
324
    } else {
325
      drainPendingCalls();
1✔
326
      onEarlyCancellation(reason);
1✔
327
      // Note that listener is a DelayedStreamListener
328
      listener.closed(reason, RpcProgress.PROCESSED, new Metadata());
1✔
329
    }
330
  }
1✔
331

332
  protected void onEarlyCancellation(Status reason) {
333
  }
1✔
334

335
  @GuardedBy("this")
336
  private void setRealStream(ClientStream realStream) {
337
    checkState(this.realStream == null, "realStream already set to %s", this.realStream);
1✔
338
    this.realStream = realStream;
1✔
339
    streamSetTimeNanos = System.nanoTime();
1✔
340
  }
1✔
341

342
  @Override
343
  public void halfClose() {
344
    checkState(listener != null, "May only be called after start");
1✔
345
    delayOrExecute(new Runnable() {
1✔
346
      @Override
347
      public void run() {
348
        realStream.halfClose();
1✔
349
      }
1✔
350
    });
351
  }
1✔
352

353
  @Override
354
  public void request(final int numMessages) {
355
    checkState(listener != null, "May only be called after start");
1✔
356
    if (passThrough) {
1✔
357
      realStream.request(numMessages);
1✔
358
    } else {
359
      delayOrExecute(new Runnable() {
1✔
360
        @Override
361
        public void run() {
362
          realStream.request(numMessages);
1✔
363
        }
1✔
364
      });
365
    }
366
  }
1✔
367

368
  @Override
369
  public void optimizeForDirectExecutor() {
370
    checkState(listener == null, "May only be called before start");
1✔
371
    preStartPendingCalls.add(new Runnable() {
1✔
372
      @Override
373
      public void run() {
374
        realStream.optimizeForDirectExecutor();
1✔
375
      }
1✔
376
    });
377
  }
1✔
378

379
  @Override
380
  public void setCompressor(final Compressor compressor) {
381
    checkState(listener == null, "May only be called before start");
1✔
382
    checkNotNull(compressor, "compressor");
1✔
383
    preStartPendingCalls.add(new Runnable() {
1✔
384
      @Override
385
      public void run() {
386
        realStream.setCompressor(compressor);
1✔
387
      }
1✔
388
    });
389
  }
1✔
390

391
  @Override
392
  public void setFullStreamDecompression(final boolean fullStreamDecompression) {
393
    checkState(listener == null, "May only be called before start");
1✔
394
    preStartPendingCalls.add(
1✔
395
        new Runnable() {
1✔
396
          @Override
397
          public void run() {
398
            realStream.setFullStreamDecompression(fullStreamDecompression);
1✔
399
          }
1✔
400
        });
401
  }
1✔
402

403
  @Override
404
  public void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
405
    checkState(listener == null, "May only be called before start");
1✔
406
    checkNotNull(decompressorRegistry, "decompressorRegistry");
1✔
407
    preStartPendingCalls.add(new Runnable() {
1✔
408
      @Override
409
      public void run() {
410
        realStream.setDecompressorRegistry(decompressorRegistry);
1✔
411
      }
1✔
412
    });
413
  }
1✔
414

415
  @Override
416
  public boolean isReady() {
417
    if (passThrough) {
1✔
418
      return realStream.isReady();
1✔
419
    } else {
420
      return false;
1✔
421
    }
422
  }
423

424
  @Override
425
  public void setMessageCompression(final boolean enable) {
426
    checkState(listener != null, "May only be called after start");
1✔
427
    if (passThrough) {
1✔
428
      realStream.setMessageCompression(enable);
1✔
429
    } else {
430
      delayOrExecute(new Runnable() {
1✔
431
        @Override
432
        public void run() {
433
          realStream.setMessageCompression(enable);
1✔
434
        }
1✔
435
      });
436
    }
437
  }
1✔
438

439
  @VisibleForTesting
440
  ClientStream getRealStream() {
441
    return realStream;
1✔
442
  }
443

444
  private static class DelayedStreamListener implements ClientStreamListener {
1✔
445
    private final ClientStreamListener realListener;
446
    private volatile boolean passThrough;
447
    @GuardedBy("this")
1✔
448
    private List<Runnable> pendingCallbacks = new ArrayList<>();
449

450
    public DelayedStreamListener(ClientStreamListener listener) {
1✔
451
      this.realListener = listener;
1✔
452
    }
1✔
453

454
    private void delayOrExecute(Runnable runnable) {
455
      synchronized (this) {
1✔
456
        if (!passThrough) {
1✔
457
          pendingCallbacks.add(runnable);
1✔
458
          return;
1✔
459
        }
460
      }
1✔
461
      runnable.run();
1✔
462
    }
1✔
463

464
    @Override
465
    public void messagesAvailable(final MessageProducer producer) {
466
      if (passThrough) {
1✔
467
        realListener.messagesAvailable(producer);
1✔
468
      } else {
469
        delayOrExecute(new Runnable() {
1✔
470
          @Override
471
          public void run() {
472
            realListener.messagesAvailable(producer);
1✔
473
          }
1✔
474
        });
475
      }
476
    }
1✔
477

478
    @Override
479
    public void onReady() {
480
      if (passThrough) {
1✔
481
        realListener.onReady();
1✔
482
      } else {
483
        delayOrExecute(new Runnable() {
1✔
484
          @Override
485
          public void run() {
486
            realListener.onReady();
1✔
487
          }
1✔
488
        });
489
      }
490
    }
1✔
491

492
    @Override
493
    public void headersRead(final Metadata headers) {
494
      delayOrExecute(new Runnable() {
1✔
495
        @Override
496
        public void run() {
497
          realListener.headersRead(headers);
1✔
498
        }
1✔
499
      });
500
    }
1✔
501

502
    @Override
503
    public void closed(
504
        final Status status, final RpcProgress rpcProgress,
505
        final Metadata trailers) {
506
      delayOrExecute(new Runnable() {
1✔
507
        @Override
508
        public void run() {
509
          realListener.closed(status, rpcProgress, trailers);
1✔
510
        }
1✔
511
      });
512
    }
1✔
513

514
    public void drainPendingCallbacks() {
515
      assert !passThrough;
1✔
516
      List<Runnable> toRun = new ArrayList<>();
1✔
517
      while (true) {
518
        synchronized (this) {
1✔
519
          if (pendingCallbacks.isEmpty()) {
1✔
520
            pendingCallbacks = null;
1✔
521
            passThrough = true;
1✔
522
            break;
1✔
523
          }
524
          // Since there were pendingCallbacks, we need to process them. To maintain ordering we
525
          // can't set passThrough=true until we run all pendingCallbacks, but new Runnables may be
526
          // added after we drop the lock. So we will have to re-check pendingCallbacks.
527
          List<Runnable> tmp = toRun;
1✔
528
          toRun = pendingCallbacks;
1✔
529
          pendingCallbacks = tmp;
1✔
530
        }
1✔
531
        for (Runnable runnable : toRun) {
1✔
532
          // Avoid calling listener while lock is held to prevent deadlocks.
533
          // TODO(ejona): exception handling
534
          runnable.run();
1✔
535
        }
1✔
536
        toRun.clear();
1✔
537
      }
538
    }
1✔
539
  }
540
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc