• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18760

pending completion
#18760

push

github-actions

ejona86
Avoid accidental Java 9+ dependency with --release

In d654707 we swapped compiling the uploaded artifacts to Java 11. This
caused ABI issues with ByteBuffer, like clear() returning ByteBuffer
instead of Buffer.

There are source-level approaches to avoid the accidental ABI dependency
on Java 11, but we have no tool able to detect such breakages.
We use Animalsniffer for similar cases, but it fails to detect these[1].
Since we have no tool, source-level approaches can't give us the necessary
confidence that all incompatibilities have been resolved.

Java has had javac-level ways to address this, but they used to require
setting bootclasspath. Since Java 9, though, they made it easier and we
can use --release, which does exactly what we need.

Fixes #10432

1. https://github.com/mojohaus/animal-sniffer/issues/77

30622 of 34701 relevant lines covered (88.25%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.18
/../core/src/main/java/io/grpc/internal/AbstractClientStream.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
22
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
23
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
24
import static java.lang.Math.max;
25

26
import com.google.common.annotations.VisibleForTesting;
27
import com.google.common.base.Preconditions;
28
import com.google.common.io.ByteStreams;
29
import io.grpc.Attributes;
30
import io.grpc.CallOptions;
31
import io.grpc.Codec;
32
import io.grpc.Compressor;
33
import io.grpc.Deadline;
34
import io.grpc.Decompressor;
35
import io.grpc.DecompressorRegistry;
36
import io.grpc.Grpc;
37
import io.grpc.Metadata;
38
import io.grpc.Status;
39
import io.grpc.internal.ClientStreamListener.RpcProgress;
40
import java.io.InputStream;
41
import java.util.concurrent.TimeUnit;
42
import java.util.logging.Level;
43
import java.util.logging.Logger;
44
import javax.annotation.Nullable;
45

46
/**
47
 * The abstract base class for {@link ClientStream} implementations. Extending classes only need to
48
 * implement {@link #transportState()} and {@link #abstractClientStreamSink()}. Must only be called
49
 * from the sending application thread.
50
 */
51
public abstract class AbstractClientStream extends AbstractStream
52
    implements ClientStream, MessageFramer.Sink {
53

54
  private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName());
1✔
55

56
  /**
57
   * A sink for outbound operations, separated from the stream simply to avoid name
58
   * collisions/confusion. Only called from application thread.
59
   */
60
  protected interface Sink {
61
    /** 
62
     * Sends the request headers to the remote end point.
63
     *
64
     * @param metadata the metadata to be sent
65
     * @param payload the payload needs to be sent in the headers if not null. Should only be used
66
     *     when sending an unary GET request
67
     */
68
    void writeHeaders(Metadata metadata, @Nullable byte[] payload);
69

70
    /**
71
     * Sends an outbound frame to the remote end point.
72
     *
73
     * @param frame a buffer containing the chunk of data to be sent, or {@code null} if {@code
74
     *     endOfStream} with no data to send
75
     * @param endOfStream {@code true} if this is the last frame; {@code flush} is guaranteed to be
76
     *     {@code true} if this is {@code true}
77
     * @param flush {@code true} if more data may not be arriving soon
78
     * @param numMessages the number of messages this series of frames represents, must be >= 0.
79
     */
80
    void writeFrame(
81
        @Nullable WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages);
82

83
    /**
84
     * Tears down the stream, typically in the event of a timeout. This method may be called
85
     * multiple times and from any thread.
86
     *
87
     * <p>This is a clone of {@link ClientStream#cancel(Status)};
88
     * {@link AbstractClientStream#cancel} delegates to this method.
89
     */
90
    void cancel(Status status);
91
  }
92

93
  private final TransportTracer transportTracer;
94
  private final Framer framer;
95
  private boolean shouldBeCountedForInUse;
96
  private boolean useGet;
97
  private Metadata headers;
98
  /**
99
   * Whether cancel() has been called. This is not strictly necessary, but removes the delay between
100
   * cancel() being called and isReady() beginning to return false, since cancel is commonly
101
   * processed asynchronously.
102
   */
103
  private volatile boolean cancelled;
104

105
  /**
   * Creates the stream and selects its framer.
   *
   * <p>A GET stream gets a {@link GetFramer} that owns the headers; any other stream gets a
   * {@link MessageFramer} and keeps the headers on the stream itself until {@link #start}.
   *
   * @param bufferAllocator allocator handed to the {@link MessageFramer} (unused for GET)
   * @param statsTraceCtx stats context handed to whichever framer is created
   * @param transportTracer tracer returned by {@link #getTransportTracer()}; must not be null
   * @param headers the request headers; must not be null
   * @param callOptions consulted to decide whether the stream counts towards in-use state
   * @param useGet whether the request is sent as a GET
   */
  protected AbstractClientStream(
      WritableBufferAllocator bufferAllocator,
      StatsTraceContext statsTraceCtx,
      TransportTracer transportTracer,
      Metadata headers,
      CallOptions callOptions,
      boolean useGet) {
    checkNotNull(headers, "headers");
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");
    this.shouldBeCountedForInUse = GrpcUtil.shouldBeCountedForInUse(callOptions);
    this.useGet = useGet;
    if (useGet) {
      // The GET framer carries the headers itself, so this.headers stays unset.
      this.framer = new GetFramer(headers, statsTraceCtx);
    } else {
      this.framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
      this.headers = headers;
    }
  }
1✔
123

124
  @Override
125
  public void setDeadline(Deadline deadline) {
126
    headers.discardAll(TIMEOUT_KEY);
1✔
127
    long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
1✔
128
    headers.put(TIMEOUT_KEY, effectiveTimeout);
1✔
129
  }
1✔
130

131
  @Override
132
  public void setMaxOutboundMessageSize(int maxSize) {
133
    framer.setMaxOutboundMessageSize(maxSize);
1✔
134
  }
1✔
135

136
  @Override
137
  public void setMaxInboundMessageSize(int maxSize) {
138
    transportState().setMaxInboundMessageSize(maxSize);
1✔
139
  }
1✔
140

141
  @Override
142
  public final void setFullStreamDecompression(boolean fullStreamDecompression) {
143
    transportState().setFullStreamDecompression(fullStreamDecompression);
1✔
144
  }
1✔
145

146
  @Override
147
  public final void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
148
    transportState().setDecompressorRegistry(decompressorRegistry);
1✔
149
  }
1✔
150

151
  /** {@inheritDoc} */
152
  @Override
153
  protected abstract TransportState transportState();
154

155
  @Override
156
  public final void start(ClientStreamListener listener) {
157
    transportState().setListener(listener);
1✔
158
    if (!useGet) {
1✔
159
      abstractClientStreamSink().writeHeaders(headers, null);
1✔
160
      headers = null;
1✔
161
    }
162
  }
1✔
163

164
  /**
165
   * Sink for transport to be called to perform outbound operations. Each stream must have its own
166
   * unique sink.
167
   */
168
  protected abstract Sink abstractClientStreamSink();
169

170
  @Override
171
  protected final Framer framer() {
172
    return framer;
1✔
173
  }
174

175
  /**
176
   * Returns true if this stream should be counted when determining the in-use state of the
177
   * transport.
178
   */
179
  public final boolean shouldBeCountedForInUse() {
180
    return shouldBeCountedForInUse;
1✔
181
  }
182

183
  @Override
184
  public final void deliverFrame(
185
      WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
186
    Preconditions.checkArgument(frame != null || endOfStream, "null frame before EOS");
1✔
187
    abstractClientStreamSink().writeFrame(frame, endOfStream, flush, numMessages);
1✔
188
  }
1✔
189

190
  @Override
191
  public final void halfClose() {
192
    if (!transportState().isOutboundClosed()) {
1✔
193
      transportState().setOutboundClosed();
1✔
194
      endOfMessages();
1✔
195
    }
196
  }
1✔
197

198
  @Override
199
  public final void cancel(Status reason) {
200
    Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status");
1✔
201
    cancelled = true;
1✔
202
    abstractClientStreamSink().cancel(reason);
1✔
203
  }
1✔
204

205
  @Override
206
  public final boolean isReady() {
207
    return super.isReady() && !cancelled;
1✔
208
  }
209

210
  @Override
211
  public final void appendTimeoutInsight(InsightBuilder insight) {
212
    Attributes attrs = getAttributes();
1✔
213
    insight.appendKeyValue("remote_addr", attrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
1✔
214
  }
1✔
215

216
  protected TransportTracer getTransportTracer() {
217
    return transportTracer;
1✔
218
  }
219

220
  /** This should only be called from the transport thread. */
221
  protected abstract static class TransportState extends AbstractStream.TransportState {
222
    /** Whether listener.closed() has been called. */
223
    private final StatsTraceContext statsTraceCtx;
224
    private boolean listenerClosed;
225
    private ClientStreamListener listener;
226
    private boolean fullStreamDecompression;
227
    private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
1✔
228

229
    private boolean deframerClosed = false;
1✔
230
    private Runnable deframerClosedTask;
231

232
    /** Whether the client has half-closed the stream. */
233
    private volatile boolean outboundClosed;
234

235
    /**
236
     * Whether the stream is closed from the transport's perspective. This can differ from {@link
237
     * #listenerClosed} because there may still be messages buffered to deliver to the application.
238
     */
239
    private boolean statusReported;
240
    /** True if the status reported (set via {@link #transportReportStatus}) is OK. */
241
    private boolean statusReportedIsOk;
242

243
    protected TransportState(
244
        int maxMessageSize,
245
        StatsTraceContext statsTraceCtx,
246
        TransportTracer transportTracer) {
247
      super(maxMessageSize, statsTraceCtx, transportTracer);
1✔
248
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
1✔
249
    }
1✔
250

251
    private void setFullStreamDecompression(boolean fullStreamDecompression) {
252
      this.fullStreamDecompression = fullStreamDecompression;
1✔
253
    }
1✔
254

255
    private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
256
      checkState(this.listener == null, "Already called start");
1✔
257
      this.decompressorRegistry =
1✔
258
          checkNotNull(decompressorRegistry, "decompressorRegistry");
1✔
259
    }
1✔
260

261
    @VisibleForTesting
262
    public final void setListener(ClientStreamListener listener) {
263
      checkState(this.listener == null, "Already called setListener");
1✔
264
      this.listener = checkNotNull(listener, "listener");
1✔
265
    }
1✔
266

267
    @Override
268
    public void deframerClosed(boolean hasPartialMessage) {
269
      checkState(statusReported, "status should have been reported on deframer closed");
1✔
270
      deframerClosed = true;
1✔
271
      if (statusReportedIsOk && hasPartialMessage) {
1✔
272
        transportReportStatus(
1✔
273
            Status.INTERNAL.withDescription("Encountered end-of-stream mid-frame"),
1✔
274
            true,
275
            new Metadata());
276
      }
277
      if (deframerClosedTask != null) {
1✔
278
        deframerClosedTask.run();
1✔
279
        deframerClosedTask = null;
1✔
280
      }
281
    }
1✔
282

283
    @Override
284
    protected final ClientStreamListener listener() {
285
      return listener;
1✔
286
    }
287

288
    private final void setOutboundClosed() {
289
      outboundClosed = true;
1✔
290
    }
1✔
291

292
    protected final boolean isOutboundClosed() {
293
      return outboundClosed;
1✔
294
    }
295

296
    /**
297
     * Called by transport implementations when they receive headers.
298
     *
299
     * @param headers the parsed headers
300
     */
301
    protected void inboundHeadersReceived(Metadata headers) {
302
      checkState(!statusReported, "Received headers on closed stream");
1✔
303
      statsTraceCtx.clientInboundHeaders();
1✔
304

305
      boolean compressedStream = false;
1✔
306
      String streamEncoding = headers.get(CONTENT_ENCODING_KEY);
1✔
307
      if (fullStreamDecompression && streamEncoding != null) {
1✔
308
        if (streamEncoding.equalsIgnoreCase("gzip")) {
1✔
309
          setFullStreamDecompressor(new GzipInflatingBuffer());
1✔
310
          compressedStream = true;
1✔
311
        } else if (!streamEncoding.equalsIgnoreCase("identity")) {
1✔
312
          deframeFailed(
1✔
313
              Status.INTERNAL
314
                  .withDescription(
1✔
315
                      String.format("Can't find full stream decompressor for %s", streamEncoding))
1✔
316
                  .asRuntimeException());
1✔
317
          return;
1✔
318
        }
319
      }
320

321
      String messageEncoding = headers.get(MESSAGE_ENCODING_KEY);
1✔
322
      if (messageEncoding != null) {
1✔
323
        Decompressor decompressor = decompressorRegistry.lookupDecompressor(messageEncoding);
1✔
324
        if (decompressor == null) {
1✔
325
          deframeFailed(
1✔
326
              Status.INTERNAL
327
                  .withDescription(String.format("Can't find decompressor for %s", messageEncoding))
1✔
328
                  .asRuntimeException());
1✔
329
          return;
1✔
330
        } else if (decompressor != Codec.Identity.NONE) {
1✔
331
          if (compressedStream) {
1✔
332
            deframeFailed(
1✔
333
                Status.INTERNAL
334
                    .withDescription("Full stream and gRPC message encoding cannot both be set")
1✔
335
                    .asRuntimeException());
1✔
336
            return;
1✔
337
          }
338
          setDecompressor(decompressor);
1✔
339
        }
340
      }
341

342
      listener().headersRead(headers);
1✔
343
    }
1✔
344

345
    /**
346
     * Processes the contents of a received data frame from the server.
347
     *
348
     * @param frame the received data frame. Its ownership is transferred to this method.
349
     */
350
    protected void inboundDataReceived(ReadableBuffer frame) {
351
      checkNotNull(frame, "frame");
1✔
352
      boolean needToCloseFrame = true;
1✔
353
      try {
354
        if (statusReported) {
1✔
355
          log.log(Level.INFO, "Received data on closed stream");
×
356
          return;
×
357
        }
358

359
        needToCloseFrame = false;
1✔
360
        deframe(frame);
1✔
361
      } finally {
362
        if (needToCloseFrame) {
1✔
363
          frame.close();
×
364
        }
365
      }
366
    }
1✔
367

368
    /**
369
     * Processes the trailers and status from the server.
370
     *
371
     * @param trailers the received trailers
372
     * @param status the status extracted from the trailers
373
     */
374
    protected void inboundTrailersReceived(Metadata trailers, Status status) {
375
      checkNotNull(status, "status");
1✔
376
      checkNotNull(trailers, "trailers");
1✔
377
      if (statusReported) {
1✔
378
        log.log(Level.INFO, "Received trailers on closed stream:\n {1}\n {2}",
1✔
379
            new Object[]{status, trailers});
380
        return;
1✔
381
      }
382
      statsTraceCtx.clientInboundTrailers(trailers);
1✔
383
      transportReportStatus(status, false, trailers);
1✔
384
    }
1✔
385

386
    /**
387
     * Report stream closure with status to the application layer if not already reported. This
388
     * method must be called from the transport thread.
389
     *
390
     * @param status the new status to set
391
     * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
392
     *        may already be queued up in the deframer. If {@code false}, the listener will be
393
     *        notified immediately after all currently completed messages in the deframer have been
394
     *        delivered to the application.
395
     * @param trailers new instance of {@code Trailers}, either empty or those returned by the
396
     *        server
397
     */
398
    public final void transportReportStatus(final Status status, boolean stopDelivery,
399
        final Metadata trailers) {
400
      transportReportStatus(status, RpcProgress.PROCESSED, stopDelivery, trailers);
1✔
401
    }
1✔
402

403
    /**
404
     * Report stream closure with status to the application layer if not already reported. This
405
     * method must be called from the transport thread.
406
     *
407
     * @param status the new status to set
408
     * @param rpcProgress RPC progress that the
409
     *        {@link ClientStreamListener#closed(Status, RpcProgress, Metadata)}
410
     *        will receive
411
     * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
412
     *        may already be queued up in the deframer and overrides any previously queued status.
413
     *        If {@code false}, the listener will be notified immediately after all currently
414
     *        completed messages in the deframer have been delivered to the application.
415
     * @param trailers new instance of {@code Trailers}, either empty or those returned by the
416
     *        server
417
     */
418
    public final void transportReportStatus(
419
        final Status status,
420
        final RpcProgress rpcProgress,
421
        boolean stopDelivery,
422
        final Metadata trailers) {
423
      checkNotNull(status, "status");
1✔
424
      checkNotNull(trailers, "trailers");
1✔
425
      // If stopDelivery, we continue in case previous invocation is waiting for stall
426
      if (statusReported && !stopDelivery) {
1✔
427
        return;
1✔
428
      }
429
      statusReported = true;
1✔
430
      statusReportedIsOk = status.isOk();
1✔
431
      onStreamDeallocated();
1✔
432

433
      if (deframerClosed) {
1✔
434
        deframerClosedTask = null;
1✔
435
        closeListener(status, rpcProgress, trailers);
1✔
436
      } else {
437
        deframerClosedTask =
1✔
438
            new Runnable() {
1✔
439
              @Override
440
              public void run() {
441
                closeListener(status, rpcProgress, trailers);
1✔
442
              }
1✔
443
            };
444
        closeDeframer(stopDelivery);
1✔
445
      }
446
    }
1✔
447

448
    /**
449
     * Closes the listener if not previously closed.
450
     *
451
     * @throws IllegalStateException if the call has not yet been started.
452
     */
453
    private void closeListener(
454
        Status status, RpcProgress rpcProgress, Metadata trailers) {
455
      if (!listenerClosed) {
1✔
456
        listenerClosed = true;
1✔
457
        statsTraceCtx.streamClosed(status);
1✔
458
        listener().closed(status, rpcProgress, trailers);
1✔
459
        if (getTransportTracer() != null) {
1✔
460
          getTransportTracer().reportStreamClosed(status.isOk());
1✔
461
        }
462
      }
463
    }
1✔
464
  }
465

466
  private class GetFramer implements Framer {
467
    private Metadata headers;
468
    private boolean closed;
469
    private final StatsTraceContext statsTraceCtx;
470
    private byte[] payload;
471

472
    public GetFramer(Metadata headers, StatsTraceContext statsTraceCtx) {
1✔
473
      this.headers = checkNotNull(headers, "headers");
1✔
474
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
1✔
475
    }
1✔
476

477
    @SuppressWarnings("BetaApi") // ByteStreams is not Beta in v27
478
    @Override
479
    public void writePayload(InputStream message) {
480
      checkState(payload == null, "writePayload should not be called multiple times");
1✔
481
      try {
482
        payload = ByteStreams.toByteArray(message);
1✔
483
      } catch (java.io.IOException ex) {
×
484
        throw new RuntimeException(ex);
×
485
      }
1✔
486
      statsTraceCtx.outboundMessage(0);
1✔
487
      statsTraceCtx.outboundMessageSent(0, payload.length, payload.length);
1✔
488
      statsTraceCtx.outboundUncompressedSize(payload.length);
1✔
489
      // NB(zhangkun83): this is not accurate, because the underlying transport will probably encode
490
      // it using e.g., base64.  However, we are not supposed to know such detail here.
491
      //
492
      // We don't want to move this line to where the encoding happens either, because we'd better
493
      // contain the message stats reporting in Framer as suggested in StatsTraceContext.
494
      // Scattering the reporting sites increases the risk of mis-counting or double-counting.
495
      //
496
      // Because the payload is usually very small, people shouldn't care about the size difference
497
      // caused by encoding.
498
      statsTraceCtx.outboundWireSize(payload.length);
1✔
499
    }
1✔
500

501
    @Override
502
    public void flush() {}
1✔
503

504
    @Override
505
    public boolean isClosed() {
506
      return closed;
1✔
507
    }
508

509
    /** Closes, with flush. */
510
    @Override
511
    public void close() {
512
      closed = true;
1✔
513
      checkState(payload != null,
1✔
514
          "Lack of request message. GET request is only supported for unary requests");
515
      abstractClientStreamSink().writeHeaders(headers, payload);
1✔
516
      payload = null;
1✔
517
      headers = null;
1✔
518
    }
1✔
519

520
    /** Closes, without flush. */
521
    @Override
522
    public void dispose() {
523
      closed = true;
×
524
      payload = null;
×
525
      headers = null;
×
526
    }
×
527

528
    // Compression is not supported for GET encoding.
529
    @Override
530
    public Framer setMessageCompression(boolean enable) {
531
      return this;
×
532
    }
533

534
    @Override
535
    public Framer setCompressor(Compressor compressor) {
536
      return this;
×
537
    }
538

539
    // TODO(zsurocking): support this
540
    @Override
541
    public void setMaxOutboundMessageSize(int maxSize) {}
×
542
  }
543
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc