• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19674

31 Jan 2025 12:10AM CUT coverage: 88.56% (-0.03%) from 88.586%
#19674

push

github

ejona86
alts: Add ClientCall support to AltsContextUtil

This adds a createFrom(Attributes) to mirror the check(Attributes) added
in ba8ab79. It also adds conveniences for ClientCall for both
createFrom() and check(). This allows getting peer information from
ClientCall and CallCredentials.RequestInfo, as was already available
from ServerCall.

The tests were reworked to test the Attribute-based methods and then
only basic tests for client/server.

Fixes #11042

33752 of 38112 relevant lines covered (88.56%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.26
/../core/src/main/java/io/grpc/internal/AbstractClientStream.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
22
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
23
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
24
import static java.lang.Math.max;
25

26
import com.google.common.annotations.VisibleForTesting;
27
import com.google.common.base.Preconditions;
28
import com.google.common.io.ByteStreams;
29
import io.grpc.Attributes;
30
import io.grpc.CallOptions;
31
import io.grpc.Codec;
32
import io.grpc.Compressor;
33
import io.grpc.Deadline;
34
import io.grpc.Decompressor;
35
import io.grpc.DecompressorRegistry;
36
import io.grpc.Grpc;
37
import io.grpc.Metadata;
38
import io.grpc.Status;
39
import io.grpc.internal.ClientStreamListener.RpcProgress;
40
import java.io.InputStream;
41
import java.util.concurrent.TimeUnit;
42
import java.util.logging.Level;
43
import java.util.logging.Logger;
44
import javax.annotation.Nullable;
45

46
/**
47
 * The abstract base class for {@link ClientStream} implementations. Extending classes only need to
48
 * implement {@link #transportState()} and {@link #abstractClientStreamSink()}. Must only be called
49
 * from the sending application thread.
50
 */
51
public abstract class AbstractClientStream extends AbstractStream
52
    implements ClientStream, MessageFramer.Sink {
53

54
  private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName());
1✔
55

56
  /**
57
   * A sink for outbound operations, separated from the stream simply to avoid name
58
   * collisions/confusion. Only called from application thread.
59
   */
60
  protected interface Sink {
61
    /** 
62
     * Sends the request headers to the remote end point.
63
     *
64
     * @param metadata the metadata to be sent
65
     * @param payload the payload needs to be sent in the headers if not null. Should only be used
66
     *     when sending an unary GET request
67
     */
68
    void writeHeaders(Metadata metadata, @Nullable byte[] payload);
69

70
    /**
71
     * Sends an outbound frame to the remote end point.
72
     *
73
     * @param frame a buffer containing the chunk of data to be sent, or {@code null} if {@code
74
     *     endOfStream} with no data to send
75
     * @param endOfStream {@code true} if this is the last frame; {@code flush} is guaranteed to be
76
     *     {@code true} if this is {@code true}
77
     * @param flush {@code true} if more data may not be arriving soon
78
     * @param numMessages the number of messages this series of frames represents, must be >= 0.
79
     */
80
    void writeFrame(
81
        @Nullable WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages);
82

83
    /**
84
     * Tears down the stream, typically in the event of a timeout. This method may be called
85
     * multiple times and from any thread.
86
     *
87
     * <p>This is a clone of {@link ClientStream#cancel(Status)};
88
     * {@link AbstractClientStream#cancel} delegates to this method.
89
     */
90
    void cancel(Status status);
91
  }
92

93
  private final TransportTracer transportTracer;
94
  private final Framer framer;
95
  private final boolean shouldBeCountedForInUse;
96
  private final boolean useGet;
97
  private Metadata headers;
98
  /**
99
   * Whether cancel() has been called. This is not strictly necessary, but removes the delay between
100
   * cancel() being called and isReady() beginning to return false, since cancel is commonly
101
   * processed asynchronously.
102
   */
103
  private volatile boolean cancelled;
104

105
  protected AbstractClientStream(
106
      WritableBufferAllocator bufferAllocator,
107
      StatsTraceContext statsTraceCtx,
108
      TransportTracer transportTracer,
109
      Metadata headers,
110
      CallOptions callOptions,
111
      boolean useGet) {
1✔
112
    checkNotNull(headers, "headers");
1✔
113
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");
1✔
114
    this.shouldBeCountedForInUse = GrpcUtil.shouldBeCountedForInUse(callOptions);
1✔
115
    this.useGet = useGet;
1✔
116
    if (!useGet) {
1✔
117
      framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
1✔
118
      this.headers = headers;
1✔
119
    } else {
120
      framer = new GetFramer(headers, statsTraceCtx);
1✔
121
    }
122
  }
1✔
123

124
  @Override
125
  public void setDeadline(Deadline deadline) {
126
    headers.discardAll(TIMEOUT_KEY);
1✔
127
    long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
1✔
128
    headers.put(TIMEOUT_KEY, effectiveTimeout);
1✔
129
  }
1✔
130

131
  @Override
132
  public void setMaxOutboundMessageSize(int maxSize) {
133
    framer.setMaxOutboundMessageSize(maxSize);
1✔
134
  }
1✔
135

136
  @Override
137
  public void setMaxInboundMessageSize(int maxSize) {
138
    transportState().setMaxInboundMessageSize(maxSize);
1✔
139
  }
1✔
140

141
  @Override
142
  public final void setFullStreamDecompression(boolean fullStreamDecompression) {
143
    transportState().setFullStreamDecompression(fullStreamDecompression);
1✔
144
  }
1✔
145

146
  @Override
147
  public final void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
148
    transportState().setDecompressorRegistry(decompressorRegistry);
1✔
149
  }
1✔
150

151
  /** {@inheritDoc} */
152
  @Override
153
  protected abstract TransportState transportState();
154

155
  @Override
156
  public final void start(ClientStreamListener listener) {
157
    transportState().setListener(listener);
1✔
158
    if (!useGet) {
1✔
159
      abstractClientStreamSink().writeHeaders(headers, null);
1✔
160
      headers = null;
1✔
161
    }
162
  }
1✔
163

164
  /**
165
   * Sink for transport to be called to perform outbound operations. Each stream must have its own
166
   * unique sink.
167
   */
168
  protected abstract Sink abstractClientStreamSink();
169

170
  @Override
171
  protected final Framer framer() {
172
    return framer;
1✔
173
  }
174

175
  /**
176
   * Returns true if this stream should be counted when determining the in-use state of the
177
   * transport.
178
   */
179
  public final boolean shouldBeCountedForInUse() {
180
    return shouldBeCountedForInUse;
1✔
181
  }
182

183
  @Override
184
  public final void deliverFrame(
185
      WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
186
    Preconditions.checkArgument(frame != null || endOfStream, "null frame before EOS");
1✔
187
    abstractClientStreamSink().writeFrame(frame, endOfStream, flush, numMessages);
1✔
188
  }
1✔
189

190
  @Override
191
  public final void halfClose() {
192
    if (!transportState().isOutboundClosed()) {
1✔
193
      transportState().setOutboundClosed();
1✔
194
      endOfMessages();
1✔
195
    }
196
  }
1✔
197

198
  @Override
199
  public final void cancel(Status reason) {
200
    Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status");
1✔
201
    cancelled = true;
1✔
202
    abstractClientStreamSink().cancel(reason);
1✔
203
  }
1✔
204

205
  @Override
206
  public final boolean isReady() {
207
    return super.isReady() && !cancelled;
1✔
208
  }
209

210
  @Override
211
  public final void appendTimeoutInsight(InsightBuilder insight) {
212
    Attributes attrs = getAttributes();
1✔
213
    insight.appendKeyValue("remote_addr", attrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
1✔
214
  }
1✔
215

216
  protected TransportTracer getTransportTracer() {
217
    return transportTracer;
1✔
218
  }
219

220
  /** This should only be called from the transport thread. */
221
  protected abstract static class TransportState extends AbstractStream.TransportState {
222
    /** Whether listener.closed() has been called. */
223
    private final StatsTraceContext statsTraceCtx;
224
    private boolean listenerClosed;
225
    private ClientStreamListener listener;
226
    private boolean fullStreamDecompression;
227
    private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
1✔
228

229
    private boolean deframerClosed = false;
1✔
230
    private Runnable deframerClosedTask;
231

232
    /** Whether the client has half-closed the stream. */
233
    private volatile boolean outboundClosed;
234

235
    /**
236
     * Whether the stream is closed from the transport's perspective. This can differ from {@link
237
     * #listenerClosed} because there may still be messages buffered to deliver to the application.
238
     */
239
    private boolean statusReported;
240
    /** True if the status reported (set via {@link #transportReportStatus}) is OK. */
241
    private boolean statusReportedIsOk;
242

243
    protected TransportState(
244
        int maxMessageSize,
245
        StatsTraceContext statsTraceCtx,
246
        TransportTracer transportTracer,
247
        CallOptions options) {
248
      super(maxMessageSize, statsTraceCtx, transportTracer);
1✔
249
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
1✔
250
      if (options.getOnReadyThreshold() != null) {
1✔
251
        this.setOnReadyThreshold(options.getOnReadyThreshold());
1✔
252
      }
253
    }
1✔
254

255
    private void setFullStreamDecompression(boolean fullStreamDecompression) {
256
      this.fullStreamDecompression = fullStreamDecompression;
1✔
257
    }
1✔
258

259
    private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
260
      checkState(this.listener == null, "Already called start");
1✔
261
      this.decompressorRegistry =
1✔
262
          checkNotNull(decompressorRegistry, "decompressorRegistry");
1✔
263
    }
1✔
264

265
    @VisibleForTesting
266
    public final void setListener(ClientStreamListener listener) {
267
      checkState(this.listener == null, "Already called setListener");
1✔
268
      this.listener = checkNotNull(listener, "listener");
1✔
269
    }
1✔
270

271
    @Override
272
    public void deframerClosed(boolean hasPartialMessage) {
273
      checkState(statusReported, "status should have been reported on deframer closed");
1✔
274
      deframerClosed = true;
1✔
275
      if (statusReportedIsOk && hasPartialMessage) {
1✔
276
        transportReportStatus(
1✔
277
            Status.INTERNAL.withDescription("Encountered end-of-stream mid-frame"),
1✔
278
            true,
279
            new Metadata());
280
      }
281
      if (deframerClosedTask != null) {
1✔
282
        deframerClosedTask.run();
1✔
283
        deframerClosedTask = null;
1✔
284
      }
285
    }
1✔
286

287
    @Override
288
    protected final ClientStreamListener listener() {
289
      return listener;
1✔
290
    }
291

292
    private final void setOutboundClosed() {
293
      outboundClosed = true;
1✔
294
    }
1✔
295

296
    protected final boolean isOutboundClosed() {
297
      return outboundClosed;
1✔
298
    }
299

300
    /**
301
     * Called by transport implementations when they receive headers.
302
     *
303
     * @param headers the parsed headers
304
     */
305
    protected void inboundHeadersReceived(Metadata headers) {
306
      checkState(!statusReported, "Received headers on closed stream");
1✔
307
      statsTraceCtx.clientInboundHeaders(headers);
1✔
308

309
      boolean compressedStream = false;
1✔
310
      String streamEncoding = headers.get(CONTENT_ENCODING_KEY);
1✔
311
      if (fullStreamDecompression && streamEncoding != null) {
1✔
312
        if (streamEncoding.equalsIgnoreCase("gzip")) {
1✔
313
          setFullStreamDecompressor(new GzipInflatingBuffer());
1✔
314
          compressedStream = true;
1✔
315
        } else if (!streamEncoding.equalsIgnoreCase("identity")) {
1✔
316
          deframeFailed(
1✔
317
              Status.INTERNAL
318
                  .withDescription(
1✔
319
                      String.format("Can't find full stream decompressor for %s", streamEncoding))
1✔
320
                  .asRuntimeException());
1✔
321
          return;
1✔
322
        }
323
      }
324

325
      String messageEncoding = headers.get(MESSAGE_ENCODING_KEY);
1✔
326
      if (messageEncoding != null) {
1✔
327
        Decompressor decompressor = decompressorRegistry.lookupDecompressor(messageEncoding);
1✔
328
        if (decompressor == null) {
1✔
329
          deframeFailed(
1✔
330
              Status.INTERNAL
331
                  .withDescription(String.format("Can't find decompressor for %s", messageEncoding))
1✔
332
                  .asRuntimeException());
1✔
333
          return;
1✔
334
        } else if (decompressor != Codec.Identity.NONE) {
1✔
335
          if (compressedStream) {
1✔
336
            deframeFailed(
1✔
337
                Status.INTERNAL
338
                    .withDescription("Full stream and gRPC message encoding cannot both be set")
1✔
339
                    .asRuntimeException());
1✔
340
            return;
1✔
341
          }
342
          setDecompressor(decompressor);
1✔
343
        }
344
      }
345

346
      listener().headersRead(headers);
1✔
347
    }
1✔
348

349
    /**
350
     * Processes the contents of a received data frame from the server.
351
     *
352
     * @param frame the received data frame. Its ownership is transferred to this method.
353
     */
354
    protected void inboundDataReceived(ReadableBuffer frame) {
355
      checkNotNull(frame, "frame");
1✔
356
      boolean needToCloseFrame = true;
1✔
357
      try {
358
        if (statusReported) {
1✔
359
          log.log(Level.INFO, "Received data on closed stream");
×
360
          return;
×
361
        }
362

363
        needToCloseFrame = false;
1✔
364
        deframe(frame);
1✔
365
      } finally {
366
        if (needToCloseFrame) {
1✔
367
          frame.close();
×
368
        }
369
      }
370
    }
1✔
371

372
    /**
373
     * Processes the trailers and status from the server.
374
     *
375
     * @param trailers the received trailers
376
     * @param status the status extracted from the trailers
377
     */
378
    protected void inboundTrailersReceived(Metadata trailers, Status status) {
379
      checkNotNull(status, "status");
1✔
380
      checkNotNull(trailers, "trailers");
1✔
381
      if (statusReported) {
1✔
382
        log.log(Level.INFO, "Received trailers on closed stream:\n {1}\n {2}",
1✔
383
            new Object[]{status, trailers});
384
        return;
1✔
385
      }
386
      statsTraceCtx.clientInboundTrailers(trailers);
1✔
387
      transportReportStatus(status, false, trailers);
1✔
388
    }
1✔
389

390
    /**
391
     * Report stream closure with status to the application layer if not already reported. This
392
     * method must be called from the transport thread.
393
     *
394
     * @param status the new status to set
395
     * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
396
     *        may already be queued up in the deframer. If {@code false}, the listener will be
397
     *        notified immediately after all currently completed messages in the deframer have been
398
     *        delivered to the application.
399
     * @param trailers new instance of {@code Trailers}, either empty or those returned by the
400
     *        server
401
     */
402
    public final void transportReportStatus(final Status status, boolean stopDelivery,
403
        final Metadata trailers) {
404
      transportReportStatus(status, RpcProgress.PROCESSED, stopDelivery, trailers);
1✔
405
    }
1✔
406

407
    /**
408
     * Report stream closure with status to the application layer if not already reported. This
409
     * method must be called from the transport thread.
410
     *
411
     * @param status the new status to set
412
     * @param rpcProgress RPC progress that the
413
     *        {@link ClientStreamListener#closed(Status, RpcProgress, Metadata)}
414
     *        will receive
415
     * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
416
     *        may already be queued up in the deframer and overrides any previously queued status.
417
     *        If {@code false}, the listener will be notified immediately after all currently
418
     *        completed messages in the deframer have been delivered to the application.
419
     * @param trailers new instance of {@code Trailers}, either empty or those returned by the
420
     *        server
421
     */
422
    public final void transportReportStatus(
423
        final Status status,
424
        final RpcProgress rpcProgress,
425
        boolean stopDelivery,
426
        final Metadata trailers) {
427
      checkNotNull(status, "status");
1✔
428
      checkNotNull(trailers, "trailers");
1✔
429
      // If stopDelivery, we continue in case previous invocation is waiting for stall
430
      if (statusReported && !stopDelivery) {
1✔
431
        return;
1✔
432
      }
433
      statusReported = true;
1✔
434
      statusReportedIsOk = status.isOk();
1✔
435
      onStreamDeallocated();
1✔
436

437
      if (deframerClosed) {
1✔
438
        deframerClosedTask = null;
1✔
439
        closeListener(status, rpcProgress, trailers);
1✔
440
      } else {
441
        deframerClosedTask =
1✔
442
            new Runnable() {
1✔
443
              @Override
444
              public void run() {
445
                closeListener(status, rpcProgress, trailers);
1✔
446
              }
1✔
447
            };
448
        closeDeframer(stopDelivery);
1✔
449
      }
450
    }
1✔
451

452
    /**
453
     * Closes the listener if not previously closed.
454
     *
455
     * @throws IllegalStateException if the call has not yet been started.
456
     */
457
    private void closeListener(
458
        Status status, RpcProgress rpcProgress, Metadata trailers) {
459
      if (!listenerClosed) {
1✔
460
        listenerClosed = true;
1✔
461
        statsTraceCtx.streamClosed(status);
1✔
462
        if (getTransportTracer() != null) {
1✔
463
          getTransportTracer().reportStreamClosed(status.isOk());
1✔
464
        }
465
        listener().closed(status, rpcProgress, trailers);
1✔
466
      }
467
    }
1✔
468
  }
469

470
  private class GetFramer implements Framer {
471
    private Metadata headers;
472
    private boolean closed;
473
    private final StatsTraceContext statsTraceCtx;
474
    private byte[] payload;
475

476
    public GetFramer(Metadata headers, StatsTraceContext statsTraceCtx) {
1✔
477
      this.headers = checkNotNull(headers, "headers");
1✔
478
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
1✔
479
    }
1✔
480

481
    @SuppressWarnings("BetaApi") // ByteStreams is not Beta in v27
482
    @Override
483
    public void writePayload(InputStream message) {
484
      checkState(payload == null, "writePayload should not be called multiple times");
1✔
485
      try {
486
        payload = ByteStreams.toByteArray(message);
1✔
487
      } catch (java.io.IOException ex) {
×
488
        throw new RuntimeException(ex);
×
489
      }
1✔
490
      statsTraceCtx.outboundMessage(0);
1✔
491
      statsTraceCtx.outboundMessageSent(0, payload.length, payload.length);
1✔
492
      statsTraceCtx.outboundUncompressedSize(payload.length);
1✔
493
      // NB(zhangkun83): this is not accurate, because the underlying transport will probably encode
494
      // it using e.g., base64.  However, we are not supposed to know such detail here.
495
      //
496
      // We don't want to move this line to where the encoding happens either, because we'd better
497
      // contain the message stats reporting in Framer as suggested in StatsTraceContext.
498
      // Scattering the reporting sites increases the risk of mis-counting or double-counting.
499
      //
500
      // Because the payload is usually very small, people shouldn't care about the size difference
501
      // caused by encoding.
502
      statsTraceCtx.outboundWireSize(payload.length);
1✔
503
    }
1✔
504

505
    @Override
506
    public void flush() {}
1✔
507

508
    @Override
509
    public boolean isClosed() {
510
      return closed;
1✔
511
    }
512

513
    /** Closes, with flush. */
514
    @Override
515
    public void close() {
516
      closed = true;
1✔
517
      checkState(payload != null,
1✔
518
          "Lack of request message. GET request is only supported for unary requests");
519
      abstractClientStreamSink().writeHeaders(headers, payload);
1✔
520
      payload = null;
1✔
521
      headers = null;
1✔
522
    }
1✔
523

524
    /** Closes, without flush. */
525
    @Override
526
    public void dispose() {
527
      closed = true;
×
528
      payload = null;
×
529
      headers = null;
×
530
    }
×
531

532
    // Compression is not supported for GET encoding.
533
    @Override
534
    public Framer setMessageCompression(boolean enable) {
535
      return this;
×
536
    }
537

538
    @Override
539
    public Framer setCompressor(Compressor compressor) {
540
      return this;
×
541
    }
542

543
    // TODO(zsurocking): support this
544
    @Override
545
    public void setMaxOutboundMessageSize(int maxSize) {}
×
546
  }
547
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc