• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18689

pending completion
#18689

push

github-actions

web-flow
buildscripts: Build Android with main build in linux_artifacts

Note that this changes the JDK used to compile releases to Java 11. That
should only impact the appearance of the Javadoc.

This adds the Android SDK to the build container, removing the
dependency on the Android SDK being available on the CI host. This
allows running on newer Kokoro images. 'Android' and 'Android interop'
CIs still depend on the Android SDK being available on the host, but
since they aren't used as part of the release process, they can more
easily migrate off Kokoro as part of future work.

This also causes Android components to now be built with -Werror, as we
use -PfailOnWarnings=true in unix.sh but were missing it from the
Android build invocations.

Gradle will auto-download the necessary version of build-tools. We don't
want to download it ourselves because the version we specify might not
even be used. Looking at logs, we were previously downloading a version
that was unused.

We now fork javac to avoid OOM. The build fails 2/3 times before the
forking, and 0/3 after.

31011 of 35117 relevant lines covered (88.31%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.07
/../core/src/main/java/io/grpc/inprocess/InProcessTransport.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.inprocess;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
21
import static java.lang.Math.max;
22

23
import com.google.common.base.MoreObjects;
24
import com.google.common.base.Optional;
25
import com.google.common.util.concurrent.ListenableFuture;
26
import com.google.common.util.concurrent.SettableFuture;
27
import io.grpc.Attributes;
28
import io.grpc.CallOptions;
29
import io.grpc.ClientStreamTracer;
30
import io.grpc.Compressor;
31
import io.grpc.Deadline;
32
import io.grpc.Decompressor;
33
import io.grpc.DecompressorRegistry;
34
import io.grpc.Grpc;
35
import io.grpc.InternalChannelz.SocketStats;
36
import io.grpc.InternalLogId;
37
import io.grpc.InternalMetadata;
38
import io.grpc.Metadata;
39
import io.grpc.MethodDescriptor;
40
import io.grpc.SecurityLevel;
41
import io.grpc.ServerStreamTracer;
42
import io.grpc.Status;
43
import io.grpc.SynchronizationContext;
44
import io.grpc.internal.ClientStream;
45
import io.grpc.internal.ClientStreamListener;
46
import io.grpc.internal.ClientStreamListener.RpcProgress;
47
import io.grpc.internal.ConnectionClientTransport;
48
import io.grpc.internal.GrpcAttributes;
49
import io.grpc.internal.GrpcUtil;
50
import io.grpc.internal.InUseStateAggregator;
51
import io.grpc.internal.InsightBuilder;
52
import io.grpc.internal.ManagedClientTransport;
53
import io.grpc.internal.NoopClientStream;
54
import io.grpc.internal.ObjectPool;
55
import io.grpc.internal.ServerListener;
56
import io.grpc.internal.ServerStream;
57
import io.grpc.internal.ServerStreamListener;
58
import io.grpc.internal.ServerTransport;
59
import io.grpc.internal.ServerTransportListener;
60
import io.grpc.internal.StatsTraceContext;
61
import io.grpc.internal.StreamListener;
62
import java.io.InputStream;
63
import java.net.SocketAddress;
64
import java.util.ArrayDeque;
65
import java.util.ArrayList;
66
import java.util.Collections;
67
import java.util.IdentityHashMap;
68
import java.util.List;
69
import java.util.Locale;
70
import java.util.Set;
71
import java.util.concurrent.Executor;
72
import java.util.concurrent.ScheduledExecutorService;
73
import java.util.concurrent.TimeUnit;
74
import java.util.logging.Level;
75
import java.util.logging.Logger;
76
import javax.annotation.CheckReturnValue;
77
import javax.annotation.Nullable;
78
import javax.annotation.concurrent.GuardedBy;
79
import javax.annotation.concurrent.ThreadSafe;
80

81
@ThreadSafe
82
final class InProcessTransport implements ServerTransport, ConnectionClientTransport {
83
  private static final Logger log = Logger.getLogger(InProcessTransport.class.getName());
1✔
84

85
  private final InternalLogId logId;
86
  private final SocketAddress address;
87
  private final int clientMaxInboundMetadataSize;
88
  private final String authority;
89
  private final String userAgent;
90
  private final Optional<ServerListener> optionalServerListener;
91
  private int serverMaxInboundMetadataSize;
92
  private final boolean includeCauseWithStatus;
93
  private ObjectPool<ScheduledExecutorService> serverSchedulerPool;
94
  private ScheduledExecutorService serverScheduler;
95
  private ServerTransportListener serverTransportListener;
96
  private Attributes serverStreamAttributes;
97
  private ManagedClientTransport.Listener clientTransportListener;
98
  @GuardedBy("this")
99
  private boolean shutdown;
100
  @GuardedBy("this")
101
  private boolean terminated;
102
  @GuardedBy("this")
103
  private Status shutdownStatus;
104
  @GuardedBy("this")
1✔
105
  private final Set<InProcessStream> streams = Collections.newSetFromMap(
1✔
106
          new IdentityHashMap<InProcessStream, Boolean>());
107
  @GuardedBy("this")
108
  private List<ServerStreamTracer.Factory> serverStreamTracerFactories;
109
  private final Attributes attributes;
110

111
  private Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
1✔
112
      new Thread.UncaughtExceptionHandler() {
1✔
113
        @Override
114
        public void uncaughtException(Thread t, Throwable e) {
115
          if (e instanceof Error) {
×
116
            throw new Error(e);
×
117
          }
118
          throw new RuntimeException(e);
×
119
        }
120
  };
121

122

123
  @GuardedBy("this")
1✔
124
  private final InUseStateAggregator<InProcessStream> inUseState =
125
      new InUseStateAggregator<InProcessStream>() {
1✔
126
        @Override
127
        protected void handleInUse() {
128
          clientTransportListener.transportInUse(true);
1✔
129
        }
1✔
130

131
        @Override
132
        protected void handleNotInUse() {
133
          clientTransportListener.transportInUse(false);
1✔
134
        }
1✔
135
      };
136

137
  private InProcessTransport(SocketAddress address, int maxInboundMetadataSize, String authority,
138
      String userAgent, Attributes eagAttrs,
139
      Optional<ServerListener> optionalServerListener, boolean includeCauseWithStatus) {
1✔
140
    this.address = address;
1✔
141
    this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
1✔
142
    this.authority = authority;
1✔
143
    this.userAgent = GrpcUtil.getGrpcUserAgent("inprocess", userAgent);
1✔
144
    checkNotNull(eagAttrs, "eagAttrs");
1✔
145
    this.attributes = Attributes.newBuilder()
1✔
146
        .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
147
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs)
1✔
148
        .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
149
        .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
150
        .build();
1✔
151
    this.optionalServerListener = optionalServerListener;
1✔
152
    logId = InternalLogId.allocate(getClass(), address.toString());
1✔
153
    this.includeCauseWithStatus = includeCauseWithStatus;
1✔
154
  }
1✔
155

156
  /**
   * Creates a transport with no explicit server listener; {@link #start} looks the server up by
   * {@code address} instead.
   */
  public InProcessTransport(
      SocketAddress address,
      int maxInboundMetadataSize,
      String authority,
      String userAgent,
      Attributes eagAttrs,
      boolean includeCauseWithStatus) {
    this(
        address,
        maxInboundMetadataSize,
        authority,
        userAgent,
        eagAttrs,
        Optional.<ServerListener>absent(),
        includeCauseWithStatus);
  }
1✔
162

163
  /**
   * Creates a transport bound to an explicitly supplied server listener. The address is an
   * {@link InProcessSocketAddress} built from {@code name}, and {@code maxInboundMetadataSize}
   * is applied to the server side as well as the client side.
   */
  InProcessTransport(
      String name,
      int maxInboundMetadataSize,
      String authority,
      String userAgent,
      Attributes eagAttrs,
      ObjectPool<ScheduledExecutorService> serverSchedulerPool,
      List<ServerStreamTracer.Factory> serverStreamTracerFactories,
      ServerListener serverListener,
      boolean includeCauseWithStatus) {
    this(
        new InProcessSocketAddress(name),
        maxInboundMetadataSize,
        authority,
        userAgent,
        eagAttrs,
        Optional.of(serverListener),
        includeCauseWithStatus);
    this.serverSchedulerPool = serverSchedulerPool;
    this.serverStreamTracerFactories = serverStreamTracerFactories;
    this.serverMaxInboundMetadataSize = maxInboundMetadataSize;
  }
1✔
174

175
  @CheckReturnValue
176
  @Override
177
  public synchronized Runnable start(ManagedClientTransport.Listener listener) {
178
    this.clientTransportListener = listener;
1✔
179
    if (optionalServerListener.isPresent()) {
1✔
180
      serverScheduler = serverSchedulerPool.getObject();
1✔
181
      serverTransportListener = optionalServerListener.get().transportCreated(this);
1✔
182
    } else {
183
      InProcessServer server = InProcessServer.findServer(address);
1✔
184
      if (server != null) {
1✔
185
        serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize();
1✔
186
        serverSchedulerPool = server.getScheduledExecutorServicePool();
1✔
187
        serverScheduler = serverSchedulerPool.getObject();
1✔
188
        serverStreamTracerFactories = server.getStreamTracerFactories();
1✔
189
        // Must be semi-initialized; past this point, can begin receiving requests
190
        serverTransportListener = server.register(this);
1✔
191
      }
192
    }
193
    if (serverTransportListener == null) {
1✔
194
      shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + address);
1✔
195
      final Status localShutdownStatus = shutdownStatus;
1✔
196
      return new Runnable() {
1✔
197
        @Override
198
        public void run() {
199
          synchronized (InProcessTransport.this) {
1✔
200
            notifyShutdown(localShutdownStatus);
1✔
201
            notifyTerminated();
1✔
202
          }
1✔
203
        }
1✔
204
      };
205
    }
206
    return new Runnable() {
1✔
207
      @Override
208
      @SuppressWarnings("deprecation")
209
      public void run() {
210
        synchronized (InProcessTransport.this) {
1✔
211
          Attributes serverTransportAttrs = Attributes.newBuilder()
1✔
212
              .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
213
              .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
214
              .build();
1✔
215
          serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
1✔
216
          clientTransportListener.transportReady();
1✔
217
        }
1✔
218
      }
1✔
219
    };
220
  }
221

222
  @Override
223
  public synchronized ClientStream newStream(
224
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
225
      ClientStreamTracer[] tracers) {
226
    StatsTraceContext statsTraceContext =
1✔
227
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
228
    if (shutdownStatus != null) {
1✔
229
      return failedClientStream(statsTraceContext, shutdownStatus);
1✔
230
    }
231

232
    headers.put(GrpcUtil.USER_AGENT_KEY, userAgent);
1✔
233

234
    if (serverMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
235
      int metadataSize = metadataSize(headers);
1✔
236
      if (metadataSize > serverMaxInboundMetadataSize) {
1✔
237
        // Other transports would compute a status with:
238
        //   GrpcUtil.httpStatusToGrpcStatus(431 /* Request Header Fields Too Large */);
239
        // However, that isn't handled specially today, so we'd leak HTTP-isms even though we're
240
        // in-process. We go ahead and make a Status, which may need to be updated if
241
        // statuscodes.md is updated.
242
        Status status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
243
            String.format(
1✔
244
                Locale.US,
245
                "Request metadata larger than %d: %d",
246
                serverMaxInboundMetadataSize,
1✔
247
                metadataSize));
1✔
248
        return failedClientStream(statsTraceContext, status);
1✔
249
      }
250
    }
251

252
    return new InProcessStream(method, headers, callOptions, authority, statsTraceContext)
1✔
253
        .clientStream;
1✔
254
  }
255

256
  private ClientStream failedClientStream(
257
      final StatsTraceContext statsTraceCtx, final Status status) {
258
    return new NoopClientStream() {
1✔
259
        @Override
260
        public void start(ClientStreamListener listener) {
261
          statsTraceCtx.clientOutboundHeaders();
1✔
262
          statsTraceCtx.streamClosed(status);
1✔
263
          listener.closed(status, RpcProgress.PROCESSED, new Metadata());
1✔
264
        }
1✔
265
      };
266
  }
267

268
  @Override
269
  public synchronized void ping(final PingCallback callback, Executor executor) {
270
    if (terminated) {
1✔
271
      final Status shutdownStatus = this.shutdownStatus;
1✔
272
      executor.execute(new Runnable() {
1✔
273
        @Override
274
        public void run() {
275
          callback.onFailure(shutdownStatus.asRuntimeException());
1✔
276
        }
1✔
277
      });
278
    } else {
1✔
279
      executor.execute(new Runnable() {
1✔
280
        @Override
281
        public void run() {
282
          callback.onSuccess(0);
1✔
283
        }
1✔
284
      });
285
    }
286
  }
1✔
287

288
  @Override
289
  public synchronized void shutdown(Status reason) {
290
    // Can be called multiple times: once for ManagedClientTransport, once for ServerTransport.
291
    if (shutdown) {
1✔
292
      return;
1✔
293
    }
294
    shutdownStatus = reason;
1✔
295
    notifyShutdown(reason);
1✔
296
    if (streams.isEmpty()) {
1✔
297
      notifyTerminated();
1✔
298
    }
299
  }
1✔
300

301
  @Override
302
  public synchronized void shutdown() {
303
    shutdown(Status.UNAVAILABLE.withDescription("InProcessTransport shutdown by the server-side"));
1✔
304
  }
1✔
305

306
  @Override
307
  public void shutdownNow(Status reason) {
308
    checkNotNull(reason, "reason");
1✔
309
    List<InProcessStream> streamsCopy;
310
    synchronized (this) {
1✔
311
      shutdown(reason);
1✔
312
      if (terminated) {
1✔
313
        return;
1✔
314
      }
315
      streamsCopy = new ArrayList<>(streams);
1✔
316
    }
1✔
317
    for (InProcessStream stream : streamsCopy) {
1✔
318
      stream.clientStream.cancel(reason);
1✔
319
    }
1✔
320
  }
1✔
321

322
  @Override
323
  public String toString() {
324
    return MoreObjects.toStringHelper(this)
×
325
        .add("logId", logId.getId())
×
326
        .add("address", address)
×
327
        .toString();
×
328
  }
329

330
  @Override
331
  public InternalLogId getLogId() {
332
    return logId;
1✔
333
  }
334

335
  @Override
336
  public Attributes getAttributes() {
337
    return attributes;
1✔
338
  }
339

340
  @Override
341
  public ScheduledExecutorService getScheduledExecutorService() {
342
    return serverScheduler;
1✔
343
  }
344

345
  @Override
346
  public ListenableFuture<SocketStats> getStats() {
347
    SettableFuture<SocketStats> ret = SettableFuture.create();
×
348
    ret.set(null);
×
349
    return ret;
×
350
  }
351

352
  private synchronized void notifyShutdown(Status s) {
353
    if (shutdown) {
1✔
354
      return;
×
355
    }
356
    shutdown = true;
1✔
357
    clientTransportListener.transportShutdown(s);
1✔
358
  }
1✔
359

360
  private synchronized void notifyTerminated() {
361
    if (terminated) {
1✔
362
      return;
×
363
    }
364
    terminated = true;
1✔
365
    if (serverScheduler != null) {
1✔
366
      serverScheduler = serverSchedulerPool.returnObject(serverScheduler);
1✔
367
    }
368
    clientTransportListener.transportTerminated();
1✔
369
    if (serverTransportListener != null) {
1✔
370
      serverTransportListener.transportTerminated();
1✔
371
    }
372
  }
1✔
373

374
  private static int metadataSize(Metadata metadata) {
375
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
376
    if (serialized == null) {
1✔
377
      return 0;
×
378
    }
379
    // Calculate based on SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2. We could use something
380
    // different, but it's "sane."
381
    long size = 0;
1✔
382
    for (int i = 0; i < serialized.length; i += 2) {
1✔
383
      size += 32 + serialized[i].length + serialized[i + 1].length;
1✔
384
    }
385
    size = Math.min(size, Integer.MAX_VALUE);
1✔
386
    return (int) size;
1✔
387
  }
388

389
  private class InProcessStream {
390
    private final InProcessClientStream clientStream;
391
    private final InProcessServerStream serverStream;
392
    private final CallOptions callOptions;
393
    private final Metadata headers;
394
    private final MethodDescriptor<?, ?> method;
395
    private volatile String authority;
396

397
    private InProcessStream(
398
        MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
399
        String authority , StatsTraceContext statsTraceContext) {
1✔
400
      this.method = checkNotNull(method, "method");
1✔
401
      this.headers = checkNotNull(headers, "headers");
1✔
402
      this.callOptions = checkNotNull(callOptions, "callOptions");
1✔
403
      this.authority = authority;
1✔
404
      this.clientStream = new InProcessClientStream(callOptions, statsTraceContext);
1✔
405
      this.serverStream = new InProcessServerStream(method, headers);
1✔
406
    }
1✔
407

408
    // Can be called multiple times due to races on both client and server closing at same time.
409
    private void streamClosed() {
410
      synchronized (InProcessTransport.this) {
1✔
411
        boolean justRemovedAnElement = streams.remove(this);
1✔
412
        if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
413
          inUseState.updateObjectInUse(this, false);
1✔
414
        }
415
        if (streams.isEmpty() && justRemovedAnElement) {
1✔
416
          if (shutdown) {
1✔
417
            notifyTerminated();
1✔
418
          }
419
        }
420
      }
1✔
421
    }
1✔
422

423
    private class InProcessServerStream implements ServerStream {
424
      final StatsTraceContext statsTraceCtx;
425
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
426
      private ClientStreamListener clientStreamListener;
427
      private final SynchronizationContext syncContext =
1✔
428
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
429
      @GuardedBy("this")
430
      private int clientRequested;
431
      @GuardedBy("this")
1✔
432
      private ArrayDeque<StreamListener.MessageProducer> clientReceiveQueue =
433
          new ArrayDeque<>();
434
      @GuardedBy("this")
435
      private Status clientNotifyStatus;
436
      @GuardedBy("this")
437
      private Metadata clientNotifyTrailers;
438
      // Intended only to prevent a double-close when the client cancels.
439
      @GuardedBy("this")
440
      private boolean closed;
441
      @GuardedBy("this")
442
      private int outboundSeqNo;
443

444
      InProcessServerStream(MethodDescriptor<?, ?> method, Metadata headers) {
1✔
445
        statsTraceCtx = StatsTraceContext.newServerContext(
1✔
446
            serverStreamTracerFactories, method.getFullMethodName(), headers);
1✔
447
      }
1✔
448

449
      private synchronized void setListener(ClientStreamListener listener) {
450
        clientStreamListener = listener;
1✔
451
      }
1✔
452

453
      @Override
454
      public void setListener(ServerStreamListener serverStreamListener) {
455
        clientStream.setListener(serverStreamListener);
1✔
456
      }
1✔
457

458
      @Override
459
      public void request(int numMessages) {
460
        boolean onReady = clientStream.serverRequested(numMessages);
1✔
461
        if (onReady) {
1✔
462
          synchronized (this) {
1✔
463
            if (!closed) {
1✔
464
              syncContext.executeLater(() -> clientStreamListener.onReady());
1✔
465
            }
466
          }
1✔
467
        }
468
        syncContext.drain();
1✔
469
      }
1✔
470

471
      // This method is the only reason we have to synchronize field accesses.
472
      /**
473
       * Client requested more messages.
474
       *
475
       * @return whether onReady should be called on the server
476
       */
477
      private boolean clientRequested(int numMessages) {
478
        boolean previouslyReady;
479
        boolean nowReady;
480
        synchronized (this) {
1✔
481
          if (closed) {
1✔
482
            return false;
1✔
483
          }
484

485
          previouslyReady = clientRequested > 0;
1✔
486
          clientRequested += numMessages;
1✔
487
          while (clientRequested > 0 && !clientReceiveQueue.isEmpty()) {
1✔
488
            clientRequested--;
1✔
489
            StreamListener.MessageProducer producer = clientReceiveQueue.poll();
1✔
490
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
491
          }
1✔
492

493
          if (clientReceiveQueue.isEmpty() && clientNotifyStatus != null) {
1✔
494
            closed = true;
1✔
495
            clientStream.statsTraceCtx.clientInboundTrailers(clientNotifyTrailers);
1✔
496
            clientStream.statsTraceCtx.streamClosed(clientNotifyStatus);
1✔
497
            Status notifyStatus = this.clientNotifyStatus;
1✔
498
            Metadata notifyTrailers = this.clientNotifyTrailers;
1✔
499
            syncContext.executeLater(() ->
1✔
500
                clientStreamListener.closed(notifyStatus, RpcProgress.PROCESSED, notifyTrailers));
1✔
501
          }
502

503
          nowReady = clientRequested > 0;
1✔
504
        }
1✔
505

506
        syncContext.drain();
1✔
507
        return !previouslyReady && nowReady;
1✔
508
      }
509

510
      private void clientCancelled(Status status) {
511
        internalCancel(status);
1✔
512
      }
1✔
513

514
      @Override
515
      public void writeMessage(InputStream message) {
516
        synchronized (this) {
1✔
517
          if (closed) {
1✔
518
            return;
1✔
519
          }
520
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
521
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
522
          clientStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
523
          clientStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
524
          outboundSeqNo++;
1✔
525
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
526
          if (clientRequested > 0) {
1✔
527
            clientRequested--;
1✔
528
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
529
          } else {
530
            clientReceiveQueue.add(producer);
1✔
531
          }
532
        }
1✔
533

534
        syncContext.drain();
1✔
535
      }
1✔
536

537
      @Override
538
      public void flush() {}
1✔
539

540
      @Override
541
      public synchronized boolean isReady() {
542
        if (closed) {
1✔
543
          return false;
×
544
        }
545
        return clientRequested > 0;
1✔
546
      }
547

548
      @Override
549
      public void writeHeaders(Metadata headers) {
550
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
551
          int metadataSize = metadataSize(headers);
1✔
552
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
553
            Status serverStatus = Status.CANCELLED.withDescription("Client cancelled the RPC");
1✔
554
            clientStream.serverClosed(serverStatus, serverStatus);
1✔
555
            // Other transports provide very little information in this case. We go ahead and make a
556
            // Status, which may need to be updated if statuscodes.md is updated.
557
            Status failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
558
                String.format(
1✔
559
                    Locale.US,
560
                    "Response header metadata larger than %d: %d",
561
                    clientMaxInboundMetadataSize,
1✔
562
                    metadataSize));
1✔
563
            notifyClientClose(failedStatus, new Metadata());
1✔
564
            return;
1✔
565
          }
566
        }
567

568
        synchronized (this) {
1✔
569
          if (closed) {
1✔
570
            return;
1✔
571
          }
572

573
          clientStream.statsTraceCtx.clientInboundHeaders();
1✔
574
          syncContext.executeLater(() -> clientStreamListener.headersRead(headers));
1✔
575
        }
1✔
576
        syncContext.drain();
1✔
577
      }
1✔
578

579
      @Override
580
      public void close(Status status, Metadata trailers) {
581
        // clientStream.serverClosed must happen before clientStreamListener.closed, otherwise
582
        // clientStreamListener.closed can trigger clientStream.cancel (see code in
583
        // ClientCalls.blockingUnaryCall), which may race with clientStream.serverClosed as both are
584
        // calling internalCancel().
585
        clientStream.serverClosed(Status.OK, status);
1✔
586

587
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
588
          int statusSize = status.getDescription() == null ? 0 : status.getDescription().length();
1✔
589
          // Go ahead and throw in the status description's length, since that could be very long.
590
          int metadataSize = metadataSize(trailers) + statusSize;
1✔
591
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
592
            // Override the status for the client, but not the server. Transports do not guarantee
593
            // notifying the server of the failure.
594

595
            // Other transports provide very little information in this case. We go ahead and make a
596
            // Status, which may need to be updated if statuscodes.md is updated.
597
            status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
598
                String.format(
1✔
599
                    Locale.US,
600
                    "Response header metadata larger than %d: %d",
601
                    clientMaxInboundMetadataSize,
1✔
602
                    metadataSize));
1✔
603
            trailers = new Metadata();
1✔
604
          }
605
        }
606

607
        notifyClientClose(status, trailers);
1✔
608
      }
1✔
609

610
      /** clientStream.serverClosed() must be called before this method */
611
      private void notifyClientClose(Status status, Metadata trailers) {
612
        Status clientStatus = cleanStatus(status, includeCauseWithStatus);
1✔
613
        synchronized (this) {
1✔
614
          if (closed) {
1✔
615
            return;
1✔
616
          }
617
          if (clientReceiveQueue.isEmpty()) {
1✔
618
            closed = true;
1✔
619
            clientStream.statsTraceCtx.clientInboundTrailers(trailers);
1✔
620
            clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
621
            syncContext.executeLater(
1✔
622
                () -> clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, trailers));
1✔
623
          } else {
624
            clientNotifyStatus = clientStatus;
1✔
625
            clientNotifyTrailers = trailers;
1✔
626
          }
627
        }
1✔
628
        syncContext.drain();
1✔
629
        streamClosed();
1✔
630
      }
1✔
631

632
      @Override
633
      public void cancel(Status status) {
634
        if (!internalCancel(Status.CANCELLED.withDescription("server cancelled stream"))) {
1✔
635
          return;
1✔
636
        }
637
        clientStream.serverClosed(status, status);
1✔
638
        streamClosed();
1✔
639
      }
1✔
640

641
      private boolean internalCancel(Status clientStatus) {
642
        synchronized (this) {
1✔
643
          if (closed) {
1✔
644
            return false;
1✔
645
          }
646
          closed = true;
1✔
647
          StreamListener.MessageProducer producer;
648
          while ((producer = clientReceiveQueue.poll()) != null) {
1✔
649
            InputStream message;
650
            while ((message = producer.next()) != null) {
×
651
              try {
652
                message.close();
×
653
              } catch (Throwable t) {
×
654
                log.log(Level.WARNING, "Exception closing stream", t);
×
655
              }
×
656
            }
657
          }
×
658
          clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
659
          syncContext.executeLater(
1✔
660
              () ->
661
                  clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, new Metadata()));
1✔
662
        }
1✔
663
        syncContext.drain();
1✔
664
        return true;
1✔
665
      }
666

667
      @Override
668
      public void setMessageCompression(boolean enable) {
669
        // noop
670
      }
×
671

672
      @Override
673
      public void optimizeForDirectExecutor() {}
1✔
674

675
      @Override
676
      public void setCompressor(Compressor compressor) {}
1✔
677

678
      @Override
679
      public void setDecompressor(Decompressor decompressor) {}
×
680

681
      @Override public Attributes getAttributes() {
682
        return serverStreamAttributes;
1✔
683
      }
684

685
      @Override
686
      public String getAuthority() {
687
        return InProcessStream.this.authority;
1✔
688
      }
689

690
      @Override
691
      public StatsTraceContext statsTraceContext() {
692
        return statsTraceCtx;
1✔
693
      }
694

695
      @Override
696
      public int streamId() {
697
        return -1;
1✔
698
      }
699
    }
700

701
    private class InProcessClientStream implements ClientStream {
702
      final StatsTraceContext statsTraceCtx;
703
      final CallOptions callOptions;
704
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
705
      private ServerStreamListener serverStreamListener;
706
      private final SynchronizationContext syncContext =
1✔
707
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
708
      @GuardedBy("this")
709
      private int serverRequested;
710
      @GuardedBy("this")
1✔
711
      private ArrayDeque<StreamListener.MessageProducer> serverReceiveQueue =
712
          new ArrayDeque<>();
713
      @GuardedBy("this")
714
      private boolean serverNotifyHalfClose;
715
      // Intended only to prevent a double-close when the server closes.
716
      @GuardedBy("this")
717
      private boolean closed;
718
      @GuardedBy("this")
719
      private int outboundSeqNo;
720

721
      InProcessClientStream(
722
          CallOptions callOptions, StatsTraceContext statsTraceContext) {
1✔
723
        this.callOptions = callOptions;
1✔
724
        statsTraceCtx = statsTraceContext;
1✔
725
      }
1✔
726

727
      private synchronized void setListener(ServerStreamListener listener) {
728
        this.serverStreamListener = listener;
1✔
729
      }
1✔
730

731
      @Override
732
      public void request(int numMessages) {
733
        boolean onReady = serverStream.clientRequested(numMessages);
1✔
734
        if (onReady) {
1✔
735
          synchronized (this) {
1✔
736
            if (!closed) {
1✔
737
              syncContext.executeLater(() -> serverStreamListener.onReady());
1✔
738
            }
739
          }
1✔
740
          syncContext.drain();
1✔
741
        }
742
      }
1✔
743

744
      // This method is the only reason we have to synchronize field accesses.
745
      /**
746
       * Client requested more messages.
747
       *
748
       * @return whether onReady should be called on the client
749
       */
750
      private boolean serverRequested(int numMessages) {
751
        boolean previouslyReady;
752
        boolean nowReady;
753
        synchronized (this) {
1✔
754
          if (closed) {
1✔
755
            return false;
1✔
756
          }
757
          previouslyReady = serverRequested > 0;
1✔
758
          serverRequested += numMessages;
1✔
759

760
          while (serverRequested > 0 && !serverReceiveQueue.isEmpty()) {
1✔
761
            serverRequested--;
1✔
762
            StreamListener.MessageProducer producer = serverReceiveQueue.poll();
1✔
763
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
764
          }
1✔
765

766
          if (serverReceiveQueue.isEmpty() && serverNotifyHalfClose) {
1✔
767
            serverNotifyHalfClose = false;
1✔
768
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
769
          }
770
          nowReady = serverRequested > 0;
1✔
771
        }
1✔
772
        syncContext.drain();
1✔
773
        return !previouslyReady && nowReady;
1✔
774
      }
775

776
      private void serverClosed(Status serverListenerStatus, Status serverTracerStatus) {
777
        internalCancel(serverListenerStatus, serverTracerStatus);
1✔
778
      }
1✔
779

780
      @Override
781
      public void writeMessage(InputStream message) {
782
        synchronized (this) {
1✔
783
          if (closed) {
1✔
784
            return;
1✔
785
          }
786
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
787
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
788
          serverStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
789
          serverStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
790
          outboundSeqNo++;
1✔
791
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
792
          if (serverRequested > 0) {
1✔
793
            serverRequested--;
1✔
794
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
795
          } else {
796
            serverReceiveQueue.add(producer);
1✔
797
          }
798
        }
1✔
799
        syncContext.drain();
1✔
800
      }
1✔
801

802
      @Override
803
      public void flush() {}
1✔
804

805
      @Override
806
      public synchronized boolean isReady() {
807
        if (closed) {
1✔
808
          return false;
×
809
        }
810
        return serverRequested > 0;
1✔
811
      }
812

813
      // Must be thread-safe for shutdownNow()
814
      @Override
815
      public void cancel(Status reason) {
816
        Status serverStatus = cleanStatus(reason, includeCauseWithStatus);
1✔
817
        if (!internalCancel(serverStatus, serverStatus)) {
1✔
818
          return;
1✔
819
        }
820
        serverStream.clientCancelled(reason);
1✔
821
        streamClosed();
1✔
822
      }
1✔
823

824
      private boolean internalCancel(
825
          Status serverListenerStatus, Status serverTracerStatus) {
826
        synchronized (this) {
1✔
827
          if (closed) {
1✔
828
            return false;
1✔
829
          }
830
          closed = true;
1✔
831

832
          StreamListener.MessageProducer producer;
833
          while ((producer = serverReceiveQueue.poll()) != null) {
1✔
834
            InputStream message;
835
            while ((message = producer.next()) != null) {
1✔
836
              try {
837
                message.close();
1✔
838
              } catch (Throwable t) {
×
839
                log.log(Level.WARNING, "Exception closing stream", t);
×
840
              }
1✔
841
            }
842
          }
1✔
843
          serverStream.statsTraceCtx.streamClosed(serverTracerStatus);
1✔
844
          syncContext.executeLater(() -> serverStreamListener.closed(serverListenerStatus));
1✔
845
        }
1✔
846
        syncContext.drain();
1✔
847
        return true;
1✔
848
      }
849

850
      @Override
851
      public void halfClose() {
852
        synchronized (this) {
1✔
853
          if (closed) {
1✔
854
            return;
1✔
855
          }
856
          if (serverReceiveQueue.isEmpty()) {
1✔
857
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
858
          } else {
859
            serverNotifyHalfClose = true;
1✔
860
          }
861
        }
1✔
862
        syncContext.drain();
1✔
863
      }
1✔
864

865
      @Override
866
      public void setMessageCompression(boolean enable) {}
×
867

868
      @Override
869
      public void setAuthority(String string) {
870
        InProcessStream.this.authority = string;
×
871
      }
×
872

873
      @Override
874
      public void start(ClientStreamListener listener) {
875
        serverStream.setListener(listener);
1✔
876

877
        synchronized (InProcessTransport.this) {
1✔
878
          statsTraceCtx.clientOutboundHeaders();
1✔
879
          streams.add(InProcessTransport.InProcessStream.this);
1✔
880
          if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
881
            inUseState.updateObjectInUse(InProcessTransport.InProcessStream.this, true);
1✔
882
          }
883
          serverTransportListener.streamCreated(serverStream, method.getFullMethodName(), headers);
1✔
884
        }
1✔
885
      }
1✔
886

887
      @Override
888
      public Attributes getAttributes() {
889
        return attributes;
1✔
890
      }
891

892
      @Override
893
      public void optimizeForDirectExecutor() {}
1✔
894

895
      @Override
896
      public void setCompressor(Compressor compressor) {}
1✔
897

898
      @Override
899
      public void setFullStreamDecompression(boolean fullStreamDecompression) {}
×
900

901
      @Override
902
      public void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {}
1✔
903

904
      @Override
905
      public void setMaxInboundMessageSize(int maxSize) {}
1✔
906

907
      @Override
908
      public void setMaxOutboundMessageSize(int maxSize) {}
1✔
909

910
      @Override
911
      public void setDeadline(Deadline deadline) {
912
        headers.discardAll(TIMEOUT_KEY);
1✔
913
        long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
1✔
914
        headers.put(TIMEOUT_KEY, effectiveTimeout);
1✔
915
      }
1✔
916

917
      @Override
918
      public void appendTimeoutInsight(InsightBuilder insight) {
919
      }
1✔
920
    }
921
  }
922

923
  /**
924
   * Returns a new status with the same code and description.
925
   * If includeCauseWithStatus is true, cause is also included.
926
   *
927
   * <p>For InProcess transport to behave in the same way as the other transports,
928
   * when exchanging statuses between client and server and vice versa,
929
   * the cause should be excluded from the status.
930
   * For easier debugging, the status may be optionally included.
931
   */
932
  private static Status cleanStatus(Status status, boolean includeCauseWithStatus) {
933
    if (status == null) {
1✔
934
      return null;
×
935
    }
936
    Status clientStatus = Status
1✔
937
        .fromCodeValue(status.getCode().value())
1✔
938
        .withDescription(status.getDescription());
1✔
939
    if (includeCauseWithStatus) {
1✔
940
      clientStatus = clientStatus.withCause(status.getCause());
1✔
941
    }
942
    return clientStatus;
1✔
943
  }
944

945
  private static class SingleMessageProducer implements StreamListener.MessageProducer {
946
    private InputStream message;
947

948
    private SingleMessageProducer(InputStream message) {
1✔
949
      this.message = message;
1✔
950
    }
1✔
951

952
    @Nullable
953
    @Override
954
    public InputStream next() {
955
      InputStream messageToReturn = message;
1✔
956
      message = null;
1✔
957
      return messageToReturn;
1✔
958
    }
959
  }
960
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc