• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20002

29 Sep 2025 04:21PM UTC coverage: 88.592% (+0.02%) from 88.575%
#20002

push

github

web-flow
xds: xDS based SNI setting and SAN validation (#12378)

When using xDS credentials make SNI for the Tls handshake to be
configured via xDS, rather than use the channel authority as the SNI,
and make SAN validation to be able to use the SNI sent when so
instructed via xDS.

Implements A101.

34877 of 39368 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.49
/../inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.inprocess;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
21

22
import com.google.common.base.MoreObjects;
23
import com.google.common.io.ByteStreams;
24
import com.google.common.util.concurrent.ListenableFuture;
25
import com.google.common.util.concurrent.SettableFuture;
26
import com.google.errorprone.annotations.CheckReturnValue;
27
import com.google.errorprone.annotations.concurrent.GuardedBy;
28
import io.grpc.Attributes;
29
import io.grpc.CallOptions;
30
import io.grpc.ClientStreamTracer;
31
import io.grpc.Compressor;
32
import io.grpc.Deadline;
33
import io.grpc.Decompressor;
34
import io.grpc.DecompressorRegistry;
35
import io.grpc.Grpc;
36
import io.grpc.InternalChannelz.SocketStats;
37
import io.grpc.InternalLogId;
38
import io.grpc.InternalMetadata;
39
import io.grpc.KnownLength;
40
import io.grpc.Metadata;
41
import io.grpc.MethodDescriptor;
42
import io.grpc.SecurityLevel;
43
import io.grpc.ServerStreamTracer;
44
import io.grpc.Status;
45
import io.grpc.SynchronizationContext;
46
import io.grpc.internal.ClientStream;
47
import io.grpc.internal.ClientStreamListener;
48
import io.grpc.internal.ClientStreamListener.RpcProgress;
49
import io.grpc.internal.ConnectionClientTransport;
50
import io.grpc.internal.GrpcAttributes;
51
import io.grpc.internal.GrpcUtil;
52
import io.grpc.internal.InUseStateAggregator;
53
import io.grpc.internal.InsightBuilder;
54
import io.grpc.internal.ManagedClientTransport;
55
import io.grpc.internal.NoopClientStream;
56
import io.grpc.internal.ObjectPool;
57
import io.grpc.internal.ServerStream;
58
import io.grpc.internal.ServerStreamListener;
59
import io.grpc.internal.ServerTransport;
60
import io.grpc.internal.ServerTransportListener;
61
import io.grpc.internal.StatsTraceContext;
62
import io.grpc.internal.StreamListener;
63
import java.io.ByteArrayInputStream;
64
import java.io.InputStream;
65
import java.net.SocketAddress;
66
import java.util.ArrayDeque;
67
import java.util.ArrayList;
68
import java.util.Collections;
69
import java.util.IdentityHashMap;
70
import java.util.List;
71
import java.util.Locale;
72
import java.util.Set;
73
import java.util.concurrent.Executor;
74
import java.util.concurrent.ScheduledExecutorService;
75
import java.util.concurrent.TimeUnit;
76
import java.util.logging.Level;
77
import java.util.logging.Logger;
78
import javax.annotation.Nullable;
79
import javax.annotation.concurrent.ThreadSafe;
80

81
@ThreadSafe
82
final class InProcessTransport implements ServerTransport, ConnectionClientTransport {
83
  private static final Logger log = Logger.getLogger(InProcessTransport.class.getName());
1✔
84
  static boolean isEnabledSupportTracingMessageSizes =
1✔
85
      GrpcUtil.getFlag("GRPC_EXPERIMENTAL_SUPPORT_TRACING_MESSAGE_SIZES", false);
1✔
86

87
  private final InternalLogId logId;
88
  private final SocketAddress address;
89
  private final int clientMaxInboundMetadataSize;
90
  private final String authority;
91
  private final String userAgent;
92
  private int serverMaxInboundMetadataSize;
93
  private final boolean includeCauseWithStatus;
94
  private ObjectPool<ScheduledExecutorService> serverSchedulerPool;
95
  private ScheduledExecutorService serverScheduler;
96
  private ServerTransportListener serverTransportListener;
97
  private Attributes serverStreamAttributes;
98
  private ManagedClientTransport.Listener clientTransportListener;
99
  // The size is assumed from the sender's side.
100
  private final long assumedMessageSize;
101
  @GuardedBy("this")
102
  private boolean shutdown;
103
  @GuardedBy("this")
104
  private boolean terminated;
105
  @GuardedBy("this")
106
  private Status shutdownStatus;
107
  @GuardedBy("this")
1✔
108
  private final Set<InProcessStream> streams = Collections.newSetFromMap(
1✔
109
          new IdentityHashMap<InProcessStream, Boolean>());
110
  @GuardedBy("this")
111
  private List<ServerStreamTracer.Factory> serverStreamTracerFactories;
112
  private Attributes attributes;
113

114
  private Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
1✔
115
      new Thread.UncaughtExceptionHandler() {
1✔
116
        @Override
117
        public void uncaughtException(Thread t, Throwable e) {
118
          if (e instanceof Error) {
×
119
            throw new Error(e);
×
120
          }
121
          throw new RuntimeException(e);
×
122
        }
123
      };
124

125

126
  @GuardedBy("this")
1✔
127
  private final InUseStateAggregator<InProcessStream> inUseState =
128
      new InUseStateAggregator<InProcessStream>() {
1✔
129
        @Override
130
        protected void handleInUse() {
131
          clientTransportListener.transportInUse(true);
1✔
132
        }
1✔
133

134
        @Override
135
        protected void handleNotInUse() {
136
          clientTransportListener.transportInUse(false);
1✔
137
        }
1✔
138
      };
139

140
  public InProcessTransport(SocketAddress address, int maxInboundMetadataSize, String authority,
141
      String userAgent, Attributes eagAttrs,
142
      boolean includeCauseWithStatus, long assumedMessageSize) {
1✔
143
    this.address = address;
1✔
144
    this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
1✔
145
    this.authority = authority;
1✔
146
    this.userAgent = GrpcUtil.getGrpcUserAgent("inprocess", userAgent);
1✔
147
    checkNotNull(eagAttrs, "eagAttrs");
1✔
148
    this.attributes = Attributes.newBuilder()
1✔
149
        .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
150
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs)
1✔
151
        .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
152
        .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
153
        .build();
1✔
154
    logId = InternalLogId.allocate(getClass(), address.toString());
1✔
155
    this.includeCauseWithStatus = includeCauseWithStatus;
1✔
156
    this.assumedMessageSize = assumedMessageSize;
1✔
157
  }
1✔
158

159
  @CheckReturnValue
160
  @Override
161
  public synchronized Runnable start(ManagedClientTransport.Listener listener) {
162
    this.clientTransportListener = listener;
1✔
163
    InProcessServer server = InProcessServer.findServer(address);
1✔
164
    if (server != null) {
1✔
165
      serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize();
1✔
166
      serverSchedulerPool = server.getScheduledExecutorServicePool();
1✔
167
      serverScheduler = serverSchedulerPool.getObject();
1✔
168
      serverStreamTracerFactories = server.getStreamTracerFactories();
1✔
169
      // Must be semi-initialized; past this point, can begin receiving requests
170
      serverTransportListener = server.register(this);
1✔
171
    }
172
    if (serverTransportListener == null) {
1✔
173
      shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + address);
1✔
174
      final Status localShutdownStatus = shutdownStatus;
1✔
175
      return new Runnable() {
1✔
176
        @Override
177
        public void run() {
178
          synchronized (InProcessTransport.this) {
1✔
179
            notifyShutdown(localShutdownStatus);
1✔
180
            notifyTerminated();
1✔
181
          }
1✔
182
        }
1✔
183
      };
184
    }
185
    Attributes serverTransportAttrs = Attributes.newBuilder()
1✔
186
        .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
187
        .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
188
        .build();
1✔
189
    serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
1✔
190
    attributes = clientTransportListener.filterTransport(attributes);
1✔
191
    clientTransportListener.transportReady();
1✔
192
    return null;
1✔
193
  }
194

195
  @Override
196
  public synchronized ClientStream newStream(
197
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
198
      ClientStreamTracer[] tracers) {
199
    StatsTraceContext statsTraceContext =
1✔
200
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
201
    if (shutdownStatus != null) {
1✔
202
      return failedClientStream(statsTraceContext, shutdownStatus);
1✔
203
    }
204

205
    headers.put(GrpcUtil.USER_AGENT_KEY, userAgent);
1✔
206

207
    if (serverMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
208
      int metadataSize = metadataSize(headers);
1✔
209
      if (metadataSize > serverMaxInboundMetadataSize) {
1✔
210
        // Other transports would compute a status with:
211
        //   GrpcUtil.httpStatusToGrpcStatus(431 /* Request Header Fields Too Large */);
212
        // However, that isn't handled specially today, so we'd leak HTTP-isms even though we're
213
        // in-process. We go ahead and make a Status, which may need to be updated if
214
        // statuscodes.md is updated.
215
        Status status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
216
            String.format(
1✔
217
                Locale.US,
218
                "Request metadata larger than %d: %d",
219
                serverMaxInboundMetadataSize,
1✔
220
                metadataSize));
1✔
221
        return failedClientStream(statsTraceContext, status);
1✔
222
      }
223
    }
224

225
    return new InProcessStream(method, headers, callOptions, authority, statsTraceContext)
1✔
226
        .clientStream;
1✔
227
  }
228

229
  private ClientStream failedClientStream(
230
      final StatsTraceContext statsTraceCtx, final Status status) {
231
    return new NoopClientStream() {
1✔
232
        @Override
233
        public void start(ClientStreamListener listener) {
234
          statsTraceCtx.clientOutboundHeaders();
1✔
235
          statsTraceCtx.streamClosed(status);
1✔
236
          listener.closed(status, RpcProgress.PROCESSED, new Metadata());
1✔
237
        }
1✔
238
      };
239
  }
240

241
  @Override
242
  public synchronized void ping(final PingCallback callback, Executor executor) {
243
    if (terminated) {
1✔
244
      final Status shutdownStatus = this.shutdownStatus;
1✔
245
      executor.execute(new Runnable() {
1✔
246
        @Override
247
        public void run() {
248
          callback.onFailure(shutdownStatus);
1✔
249
        }
1✔
250
      });
251
    } else {
1✔
252
      executor.execute(new Runnable() {
1✔
253
        @Override
254
        public void run() {
255
          callback.onSuccess(0);
1✔
256
        }
1✔
257
      });
258
    }
259
  }
1✔
260

261
  @Override
262
  public synchronized void shutdown(Status reason) {
263
    // Can be called multiple times: once for ManagedClientTransport, once for ServerTransport.
264
    if (shutdown) {
1✔
265
      return;
1✔
266
    }
267
    shutdownStatus = reason;
1✔
268
    notifyShutdown(reason);
1✔
269
    if (streams.isEmpty()) {
1✔
270
      notifyTerminated();
1✔
271
    }
272
  }
1✔
273

274
  @Override
275
  public synchronized void shutdown() {
276
    shutdown(Status.UNAVAILABLE.withDescription("InProcessTransport shutdown by the server-side"));
1✔
277
  }
1✔
278

279
  @Override
280
  public void shutdownNow(Status reason) {
281
    checkNotNull(reason, "reason");
1✔
282
    List<InProcessStream> streamsCopy;
283
    synchronized (this) {
1✔
284
      shutdown(reason);
1✔
285
      if (terminated) {
1✔
286
        return;
1✔
287
      }
288
      streamsCopy = new ArrayList<>(streams);
1✔
289
    }
1✔
290
    for (InProcessStream stream : streamsCopy) {
1✔
291
      stream.clientStream.cancel(reason);
1✔
292
    }
1✔
293
  }
1✔
294

295
  @Override
296
  public String toString() {
297
    return MoreObjects.toStringHelper(this)
×
298
        .add("logId", logId.getId())
×
299
        .add("address", address)
×
300
        .toString();
×
301
  }
302

303
  @Override
304
  public InternalLogId getLogId() {
305
    return logId;
1✔
306
  }
307

308
  @Override
309
  public Attributes getAttributes() {
310
    return attributes;
1✔
311
  }
312

313
  @Override
314
  public ScheduledExecutorService getScheduledExecutorService() {
315
    return serverScheduler;
1✔
316
  }
317

318
  @Override
319
  public ListenableFuture<SocketStats> getStats() {
320
    SettableFuture<SocketStats> ret = SettableFuture.create();
×
321
    ret.set(null);
×
322
    return ret;
×
323
  }
324

325
  private synchronized void notifyShutdown(Status s) {
326
    if (shutdown) {
1✔
327
      return;
×
328
    }
329
    shutdown = true;
1✔
330
    clientTransportListener.transportShutdown(s);
1✔
331
  }
1✔
332

333
  private synchronized void notifyTerminated() {
334
    if (terminated) {
1✔
335
      return;
×
336
    }
337
    terminated = true;
1✔
338
    if (serverScheduler != null) {
1✔
339
      serverScheduler = serverSchedulerPool.returnObject(serverScheduler);
1✔
340
    }
341
    clientTransportListener.transportTerminated();
1✔
342
    if (serverTransportListener != null) {
1✔
343
      serverTransportListener.transportTerminated();
1✔
344
    }
345
  }
1✔
346

347
  private static int metadataSize(Metadata metadata) {
348
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
349
    if (serialized == null) {
1✔
350
      return 0;
×
351
    }
352
    // Calculate based on SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2. We could use something
353
    // different, but it's "sane."
354
    long size = 0;
1✔
355
    for (int i = 0; i < serialized.length; i += 2) {
1✔
356
      size += 32 + serialized[i].length + serialized[i + 1].length;
1✔
357
    }
358
    size = Math.min(size, Integer.MAX_VALUE);
1✔
359
    return (int) size;
1✔
360
  }
361

362
  private class InProcessStream {
363
    private final InProcessClientStream clientStream;
364
    private final InProcessServerStream serverStream;
365
    private final CallOptions callOptions;
366
    private final Metadata headers;
367
    private final MethodDescriptor<?, ?> method;
368
    private volatile String authority;
369

370
    private InProcessStream(
371
        MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
372
        String authority , StatsTraceContext statsTraceContext) {
1✔
373
      this.method = checkNotNull(method, "method");
1✔
374
      this.headers = checkNotNull(headers, "headers");
1✔
375
      this.callOptions = checkNotNull(callOptions, "callOptions");
1✔
376
      this.authority = authority;
1✔
377
      this.clientStream = new InProcessClientStream(callOptions, statsTraceContext);
1✔
378
      this.serverStream = new InProcessServerStream(method, headers);
1✔
379
    }
1✔
380

381
    // Can be called multiple times due to races on both client and server closing at same time.
382
    private void streamClosed() {
383
      synchronized (InProcessTransport.this) {
1✔
384
        boolean justRemovedAnElement = streams.remove(this);
1✔
385
        if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
386
          inUseState.updateObjectInUse(this, false);
1✔
387
        }
388
        if (streams.isEmpty() && justRemovedAnElement) {
1✔
389
          if (shutdown) {
1✔
390
            notifyTerminated();
1✔
391
          }
392
        }
393
      }
1✔
394
    }
1✔
395

396
    private class InProcessServerStream implements ServerStream {
397
      final StatsTraceContext statsTraceCtx;
398
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
399
      private ClientStreamListener clientStreamListener;
400
      private final SynchronizationContext syncContext =
1✔
401
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
402
      @GuardedBy("this")
403
      private int clientRequested;
404
      @GuardedBy("this")
1✔
405
      private ArrayDeque<StreamListener.MessageProducer> clientReceiveQueue =
406
          new ArrayDeque<>();
407
      @GuardedBy("this")
408
      private Status clientNotifyStatus;
409
      @GuardedBy("this")
410
      private Metadata clientNotifyTrailers;
411
      // Only is intended to prevent double-close when client cancels.
412
      @GuardedBy("this")
413
      private boolean closed;
414
      @GuardedBy("this")
415
      private int outboundSeqNo;
416

417
      InProcessServerStream(MethodDescriptor<?, ?> method, Metadata headers) {
1✔
418
        statsTraceCtx = StatsTraceContext.newServerContext(
1✔
419
            serverStreamTracerFactories, method.getFullMethodName(), headers);
1✔
420
      }
1✔
421

422
      private synchronized void setListener(ClientStreamListener listener) {
423
        clientStreamListener = listener;
1✔
424
      }
1✔
425

426
      @Override
427
      public void setListener(ServerStreamListener serverStreamListener) {
428
        clientStream.setListener(serverStreamListener);
1✔
429
      }
1✔
430

431
      @Override
432
      public void request(int numMessages) {
433
        boolean onReady = clientStream.serverRequested(numMessages);
1✔
434
        if (onReady) {
1✔
435
          synchronized (this) {
1✔
436
            if (!closed) {
1✔
437
              syncContext.executeLater(() -> clientStreamListener.onReady());
1✔
438
            }
439
          }
1✔
440
        }
441
        syncContext.drain();
1✔
442
      }
1✔
443

444
      // This method is the only reason we have to synchronize field accesses.
445
      /**
446
       * Client requested more messages.
447
       *
448
       * @return whether onReady should be called on the server
449
       */
450
      private boolean clientRequested(int numMessages) {
451
        boolean previouslyReady;
452
        boolean nowReady;
453
        synchronized (this) {
1✔
454
          if (closed) {
1✔
455
            return false;
1✔
456
          }
457

458
          previouslyReady = clientRequested > 0;
1✔
459
          clientRequested += numMessages;
1✔
460
          while (clientRequested > 0 && !clientReceiveQueue.isEmpty()) {
1✔
461
            clientRequested--;
1✔
462
            StreamListener.MessageProducer producer = clientReceiveQueue.poll();
1✔
463
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
464
          }
1✔
465

466
          if (clientReceiveQueue.isEmpty() && clientNotifyStatus != null) {
1✔
467
            closed = true;
1✔
468
            clientStream.statsTraceCtx.clientInboundTrailers(clientNotifyTrailers);
1✔
469
            clientStream.statsTraceCtx.streamClosed(clientNotifyStatus);
1✔
470
            Status notifyStatus = this.clientNotifyStatus;
1✔
471
            Metadata notifyTrailers = this.clientNotifyTrailers;
1✔
472
            syncContext.executeLater(() ->
1✔
473
                clientStreamListener.closed(notifyStatus, RpcProgress.PROCESSED, notifyTrailers));
1✔
474
          }
475

476
          nowReady = clientRequested > 0;
1✔
477
        }
1✔
478

479
        syncContext.drain();
1✔
480
        return !previouslyReady && nowReady;
1✔
481
      }
482

483
      private void clientCancelled(Status status) {
484
        internalCancel(status);
1✔
485
      }
1✔
486

487
      @Override
488
      public void writeMessage(InputStream message) {
489
        long messageLength = 0;
1✔
490
        if (isEnabledSupportTracingMessageSizes) {
1✔
491
          try {
492
            if (assumedMessageSize != -1) {
×
493
              messageLength = assumedMessageSize;
×
494
            } else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
×
495
              messageLength = message.available();
×
496
            } else {
497
              InputStream oldMessage = message;
×
498
              byte[] payload = ByteStreams.toByteArray(message);
×
499
              messageLength = payload.length;
×
500
              message = new ByteArrayInputStream(payload);
×
501
              oldMessage.close();
×
502
            }
503
          } catch (Exception e) {
×
504
            throw new RuntimeException("Error processing the message length", e);
×
505
          }
×
506
        }
507

508
        synchronized (this) {
1✔
509
          if (closed) {
1✔
510
            return;
1✔
511
          }
512
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
513
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
514
          clientStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
515
          clientStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
516
          if (isEnabledSupportTracingMessageSizes) {
1✔
517
            statsTraceCtx.outboundUncompressedSize(messageLength);
×
518
            statsTraceCtx.outboundWireSize(messageLength);
×
519
            // messageLength should be same at receiver's end as no actual wire is involved.
520
            clientStream.statsTraceCtx.inboundUncompressedSize(messageLength);
×
521
            clientStream.statsTraceCtx.inboundWireSize(messageLength);
×
522
          }
523
          outboundSeqNo++;
1✔
524
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
525
          if (clientRequested > 0) {
1✔
526
            clientRequested--;
1✔
527
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
528
          } else {
529
            clientReceiveQueue.add(producer);
1✔
530
          }
531
        }
1✔
532
        syncContext.drain();
1✔
533
      }
1✔
534

535
      @Override
536
      public void flush() {}
1✔
537

538
      @Override
539
      public synchronized boolean isReady() {
540
        if (closed) {
1✔
541
          return false;
×
542
        }
543
        return clientRequested > 0;
1✔
544
      }
545

546
      @Override
547
      public void writeHeaders(Metadata headers, boolean flush) {
548
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
549
          int metadataSize = metadataSize(headers);
1✔
550
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
551
            Status serverStatus = Status.CANCELLED.withDescription("Client cancelled the RPC");
1✔
552
            clientStream.serverClosed(serverStatus, serverStatus);
1✔
553
            // Other transports provide very little information in this case. We go ahead and make a
554
            // Status, which may need to be updated if statuscodes.md is updated.
555
            Status failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
556
                String.format(
1✔
557
                    Locale.US,
558
                    "Response header metadata larger than %d: %d",
559
                    clientMaxInboundMetadataSize,
1✔
560
                    metadataSize));
1✔
561
            notifyClientClose(failedStatus, new Metadata());
1✔
562
            return;
1✔
563
          }
564
        }
565

566
        synchronized (this) {
1✔
567
          if (closed) {
1✔
568
            return;
1✔
569
          }
570

571
          clientStream.statsTraceCtx.clientInboundHeaders(headers);
1✔
572
          syncContext.executeLater(() -> clientStreamListener.headersRead(headers));
1✔
573
        }
1✔
574
        syncContext.drain();
1✔
575
      }
1✔
576

577
      @Override
578
      public void close(Status status, Metadata trailers) {
579
        // clientStream.serverClosed must happen before clientStreamListener.closed, otherwise
580
        // clientStreamListener.closed can trigger clientStream.cancel (see code in
581
        // ClientCalls.blockingUnaryCall), which may race with clientStream.serverClosed as both are
582
        // calling internalCancel().
583
        clientStream.serverClosed(Status.OK, status);
1✔
584

585
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
586
          int statusSize = status.getDescription() == null ? 0 : status.getDescription().length();
1✔
587
          // Go ahead and throw in the status description's length, since that could be very long.
588
          int metadataSize = metadataSize(trailers) + statusSize;
1✔
589
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
590
            // Override the status for the client, but not the server. Transports do not guarantee
591
            // notifying the server of the failure.
592

593
            // Other transports provide very little information in this case. We go ahead and make a
594
            // Status, which may need to be updated if statuscodes.md is updated.
595
            status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
596
                String.format(
1✔
597
                    Locale.US,
598
                    "Response header metadata larger than %d: %d",
599
                    clientMaxInboundMetadataSize,
1✔
600
                    metadataSize));
1✔
601
            trailers = new Metadata();
1✔
602
          }
603
        }
604

605
        notifyClientClose(status, trailers);
1✔
606
      }
1✔
607

608
      /** clientStream.serverClosed() must be called before this method. */
609
      private void notifyClientClose(Status status, Metadata trailers) {
610
        Status clientStatus = cleanStatus(status, includeCauseWithStatus);
1✔
611
        synchronized (this) {
1✔
612
          if (closed) {
1✔
613
            return;
1✔
614
          }
615
          if (clientReceiveQueue.isEmpty()) {
1✔
616
            closed = true;
1✔
617
            clientStream.statsTraceCtx.clientInboundTrailers(trailers);
1✔
618
            clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
619
            syncContext.executeLater(
1✔
620
                () -> clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, trailers));
1✔
621
          } else {
622
            clientNotifyStatus = clientStatus;
1✔
623
            clientNotifyTrailers = trailers;
1✔
624
          }
625
        }
1✔
626
        syncContext.drain();
1✔
627
        streamClosed();
1✔
628
      }
1✔
629

630
      @Override
631
      public void cancel(Status status) {
632
        if (!internalCancel(Status.CANCELLED.withDescription("server cancelled stream"))) {
1✔
633
          return;
1✔
634
        }
635
        clientStream.serverClosed(status, status);
1✔
636
        streamClosed();
1✔
637
      }
1✔
638

639
      private boolean internalCancel(Status clientStatus) {
640
        synchronized (this) {
1✔
641
          if (closed) {
1✔
642
            return false;
1✔
643
          }
644
          closed = true;
1✔
645
          StreamListener.MessageProducer producer;
646
          while ((producer = clientReceiveQueue.poll()) != null) {
1✔
647
            InputStream message;
648
            while ((message = producer.next()) != null) {
×
649
              try {
650
                message.close();
×
651
              } catch (Throwable t) {
×
652
                log.log(Level.WARNING, "Exception closing stream", t);
×
653
              }
×
654
            }
655
          }
×
656
          clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
657
          syncContext.executeLater(
1✔
658
              () ->
659
                  clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, new Metadata()));
1✔
660
        }
1✔
661
        syncContext.drain();
1✔
662
        return true;
1✔
663
      }
664

665
      @Override
666
      public void setMessageCompression(boolean enable) {
667
        // noop
668
      }
×
669

670
      @Override
671
      public void optimizeForDirectExecutor() {}
1✔
672

673
      @Override
674
      public void setCompressor(Compressor compressor) {}
1✔
675

676
      @Override
677
      public void setDecompressor(Decompressor decompressor) {}
×
678

679
      @Override public Attributes getAttributes() {
680
        return serverStreamAttributes;
1✔
681
      }
682

683
      @Override
684
      public String getAuthority() {
685
        return InProcessStream.this.authority;
1✔
686
      }
687

688
      @Override
689
      public StatsTraceContext statsTraceContext() {
690
        return statsTraceCtx;
1✔
691
      }
692

693
      @Override
694
      public int streamId() {
695
        return -1;
1✔
696
      }
697

698
      @Override
699
      public void setOnReadyThreshold(int numBytes) {
700
        // noop
701
      }
×
702
    }
703

704
    private class InProcessClientStream implements ClientStream {
705
      final StatsTraceContext statsTraceCtx;
706
      final CallOptions callOptions;
707
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
708
      private ServerStreamListener serverStreamListener;
709
      private final SynchronizationContext syncContext =
1✔
710
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
711
      @GuardedBy("this")
712
      private int serverRequested;
713
      @GuardedBy("this")
1✔
714
      private ArrayDeque<StreamListener.MessageProducer> serverReceiveQueue =
715
          new ArrayDeque<>();
716
      @GuardedBy("this")
717
      private boolean serverNotifyHalfClose;
718
      // Only is intended to prevent double-close when server closes.
719
      @GuardedBy("this")
720
      private boolean closed;
721
      @GuardedBy("this")
722
      private int outboundSeqNo;
723

724
      InProcessClientStream(
725
          CallOptions callOptions, StatsTraceContext statsTraceContext) {
1✔
726
        this.callOptions = callOptions;
1✔
727
        statsTraceCtx = statsTraceContext;
1✔
728
      }
1✔
729

730
      private synchronized void setListener(ServerStreamListener listener) {
731
        this.serverStreamListener = listener;
1✔
732
      }
1✔
733

734
      @Override
735
      public void request(int numMessages) {
736
        boolean onReady = serverStream.clientRequested(numMessages);
1✔
737
        if (onReady) {
1✔
738
          synchronized (this) {
1✔
739
            if (!closed) {
1✔
740
              syncContext.executeLater(() -> serverStreamListener.onReady());
1✔
741
            }
742
          }
1✔
743
          syncContext.drain();
1✔
744
        }
745
      }
1✔
746

747
      // This method is the only reason we have to synchronize field accesses.
748
      /**
749
       * Client requested more messages.
750
       *
751
       * @return whether onReady should be called on the server
752
       */
753
      private boolean serverRequested(int numMessages) {
754
        boolean previouslyReady;
755
        boolean nowReady;
756
        synchronized (this) {
1✔
757
          if (closed) {
1✔
758
            return false;
1✔
759
          }
760
          previouslyReady = serverRequested > 0;
1✔
761
          serverRequested += numMessages;
1✔
762

763
          while (serverRequested > 0 && !serverReceiveQueue.isEmpty()) {
1✔
764
            serverRequested--;
1✔
765
            StreamListener.MessageProducer producer = serverReceiveQueue.poll();
1✔
766
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
767
          }
1✔
768

769
          if (serverReceiveQueue.isEmpty() && serverNotifyHalfClose) {
1✔
770
            serverNotifyHalfClose = false;
1✔
771
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
772
          }
773
          nowReady = serverRequested > 0;
1✔
774
        }
1✔
775
        syncContext.drain();
1✔
776
        return !previouslyReady && nowReady;
1✔
777
      }
778

779
      private void serverClosed(Status serverListenerStatus, Status serverTracerStatus) {
780
        internalCancel(serverListenerStatus, serverTracerStatus);
1✔
781
      }
1✔
782

783
      @Override
784
      public void writeMessage(InputStream message) {
785
        long messageLength = 0;
1✔
786
        if (isEnabledSupportTracingMessageSizes) {
1✔
787
          try {
788
            if (assumedMessageSize != -1) {
×
789
              messageLength = assumedMessageSize;
×
790
            } else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
×
791
              messageLength = message.available();
×
792
            } else {
793
              InputStream oldMessage = message;
×
794
              byte[] payload = ByteStreams.toByteArray(message);
×
795
              messageLength = payload.length;
×
796
              message = new ByteArrayInputStream(payload);
×
797
              oldMessage.close();
×
798
            }
799
          } catch (Exception e) {
×
800
            throw new RuntimeException("Error processing the message length", e);
×
801
          }
×
802
        }
803
        synchronized (this) {
1✔
804
          if (closed) {
1✔
805
            return;
1✔
806
          }
807
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
808
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
809
          serverStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
810
          serverStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
811
          if (isEnabledSupportTracingMessageSizes) {
1✔
812
            statsTraceCtx.outboundUncompressedSize(messageLength);
×
813
            statsTraceCtx.outboundWireSize(messageLength);
×
814
            // messageLength should be same at receiver's end as no actual wire is involved.
815
            serverStream.statsTraceCtx.inboundUncompressedSize(messageLength);
×
816
            serverStream.statsTraceCtx.inboundWireSize(messageLength);
×
817
          }
818
          outboundSeqNo++;
1✔
819
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
820
          if (serverRequested > 0) {
1✔
821
            serverRequested--;
1✔
822
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
823
          } else {
824
            serverReceiveQueue.add(producer);
1✔
825
          }
826
        }
1✔
827
        syncContext.drain();
1✔
828
      }
1✔
829

830
      @Override
831
      public void flush() {}
1✔
832

833
      @Override
834
      public synchronized boolean isReady() {
835
        if (closed) {
1✔
836
          return false;
1✔
837
        }
838
        return serverRequested > 0;
1✔
839
      }
840

841
      // Must be thread-safe for shutdownNow()
842
      @Override
843
      public void cancel(Status reason) {
844
        Status serverStatus = cleanStatus(reason, includeCauseWithStatus);
1✔
845
        if (!internalCancel(serverStatus, serverStatus)) {
1✔
846
          return;
1✔
847
        }
848
        serverStream.clientCancelled(reason);
1✔
849
        streamClosed();
1✔
850
      }
1✔
851

852
      private boolean internalCancel(
853
          Status serverListenerStatus, Status serverTracerStatus) {
854
        synchronized (this) {
1✔
855
          if (closed) {
1✔
856
            return false;
1✔
857
          }
858
          closed = true;
1✔
859

860
          StreamListener.MessageProducer producer;
861
          while ((producer = serverReceiveQueue.poll()) != null) {
1✔
862
            InputStream message;
863
            while ((message = producer.next()) != null) {
1✔
864
              try {
865
                message.close();
1✔
866
              } catch (Throwable t) {
×
867
                log.log(Level.WARNING, "Exception closing stream", t);
×
868
              }
1✔
869
            }
870
          }
1✔
871
          serverStream.statsTraceCtx.streamClosed(serverTracerStatus);
1✔
872
          syncContext.executeLater(() -> serverStreamListener.closed(serverListenerStatus));
1✔
873
        }
1✔
874
        syncContext.drain();
1✔
875
        return true;
1✔
876
      }
877

878
      @Override
879
      public void halfClose() {
880
        synchronized (this) {
1✔
881
          if (closed) {
1✔
882
            return;
1✔
883
          }
884
          if (serverReceiveQueue.isEmpty()) {
1✔
885
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
886
          } else {
887
            serverNotifyHalfClose = true;
1✔
888
          }
889
        }
1✔
890
        syncContext.drain();
1✔
891
      }
1✔
892

893
      @Override
894
      public void setMessageCompression(boolean enable) {}
×
895

896
      @Override
897
      public void setAuthority(String string) {
898
        InProcessStream.this.authority = string;
×
899
      }
×
900

901
      @Override
902
      public void start(ClientStreamListener listener) {
903
        serverStream.setListener(listener);
1✔
904

905
        synchronized (InProcessTransport.this) {
1✔
906
          statsTraceCtx.clientOutboundHeaders();
1✔
907
          streams.add(InProcessTransport.InProcessStream.this);
1✔
908
          if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
909
            inUseState.updateObjectInUse(InProcessTransport.InProcessStream.this, true);
1✔
910
          }
911
          serverTransportListener.streamCreated(serverStream, method.getFullMethodName(), headers);
1✔
912
        }
1✔
913
      }
1✔
914

915
      @Override
916
      public Attributes getAttributes() {
917
        return attributes;
1✔
918
      }
919

920
      @Override
921
      public void optimizeForDirectExecutor() {}
1✔
922

923
      @Override
924
      public void setCompressor(Compressor compressor) {}
1✔
925

926
      @Override
927
      public void setFullStreamDecompression(boolean fullStreamDecompression) {}
×
928

929
      @Override
930
      public void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {}
1✔
931

932
      @Override
933
      public void setMaxInboundMessageSize(int maxSize) {}
1✔
934

935
      @Override
936
      public void setMaxOutboundMessageSize(int maxSize) {}
1✔
937

938
      @Override
939
      public void setDeadline(Deadline deadline) {
940
        headers.discardAll(TIMEOUT_KEY);
1✔
941
        headers.put(TIMEOUT_KEY, deadline.timeRemaining(TimeUnit.NANOSECONDS));
1✔
942
      }
1✔
943

944
      @Override
945
      public void appendTimeoutInsight(InsightBuilder insight) {
946
      }
1✔
947
    }
948
  }
949

950
  /**
951
   * Returns a new status with the same code and description.
952
   * If includeCauseWithStatus is true, cause is also included.
953
   *
954
   * <p>For InProcess transport to behave in the same way as the other transports,
955
   * when exchanging statuses between client and server and vice versa,
956
   * the cause should be excluded from the status.
957
   * For easier debugging, the status may be optionally included.
958
   */
959
  private static Status cleanStatus(Status status, boolean includeCauseWithStatus) {
960
    if (status == null) {
1✔
961
      return null;
×
962
    }
963
    Status clientStatus = Status
1✔
964
        .fromCodeValue(status.getCode().value())
1✔
965
        .withDescription(status.getDescription());
1✔
966
    if (includeCauseWithStatus) {
1✔
967
      clientStatus = clientStatus.withCause(status.getCause());
1✔
968
    }
969
    return clientStatus;
1✔
970
  }
971

972
  private static class SingleMessageProducer implements StreamListener.MessageProducer {
973
    private InputStream message;
974

975
    private SingleMessageProducer(InputStream message) {
1✔
976
      this.message = message;
1✔
977
    }
1✔
978

979
    @Nullable
980
    @Override
981
    public InputStream next() {
982
      InputStream messageToReturn = message;
1✔
983
      message = null;
1✔
984
      return messageToReturn;
1✔
985
    }
986
  }
987
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc