• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20087

14 Nov 2025 06:46PM UTC coverage: 88.527% (+0.007%) from 88.52%
#20087

push

github

ejona86
Bump version to 1.76.2-SNAPSHOT

34646 of 39136 relevant lines covered (88.53%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.49
/../inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.inprocess;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
21

22
import com.google.common.base.MoreObjects;
23
import com.google.common.io.ByteStreams;
24
import com.google.common.util.concurrent.ListenableFuture;
25
import com.google.common.util.concurrent.SettableFuture;
26
import com.google.errorprone.annotations.CheckReturnValue;
27
import com.google.errorprone.annotations.concurrent.GuardedBy;
28
import io.grpc.Attributes;
29
import io.grpc.CallOptions;
30
import io.grpc.ClientStreamTracer;
31
import io.grpc.Compressor;
32
import io.grpc.Deadline;
33
import io.grpc.Decompressor;
34
import io.grpc.DecompressorRegistry;
35
import io.grpc.Grpc;
36
import io.grpc.InternalChannelz.SocketStats;
37
import io.grpc.InternalLogId;
38
import io.grpc.InternalMetadata;
39
import io.grpc.KnownLength;
40
import io.grpc.Metadata;
41
import io.grpc.MethodDescriptor;
42
import io.grpc.SecurityLevel;
43
import io.grpc.ServerStreamTracer;
44
import io.grpc.Status;
45
import io.grpc.SynchronizationContext;
46
import io.grpc.internal.ClientStream;
47
import io.grpc.internal.ClientStreamListener;
48
import io.grpc.internal.ClientStreamListener.RpcProgress;
49
import io.grpc.internal.ConnectionClientTransport;
50
import io.grpc.internal.GrpcAttributes;
51
import io.grpc.internal.GrpcUtil;
52
import io.grpc.internal.InUseStateAggregator;
53
import io.grpc.internal.InsightBuilder;
54
import io.grpc.internal.ManagedClientTransport;
55
import io.grpc.internal.NoopClientStream;
56
import io.grpc.internal.ObjectPool;
57
import io.grpc.internal.ServerStream;
58
import io.grpc.internal.ServerStreamListener;
59
import io.grpc.internal.ServerTransport;
60
import io.grpc.internal.ServerTransportListener;
61
import io.grpc.internal.StatsTraceContext;
62
import io.grpc.internal.StreamListener;
63
import java.io.ByteArrayInputStream;
64
import java.io.InputStream;
65
import java.net.SocketAddress;
66
import java.util.ArrayDeque;
67
import java.util.ArrayList;
68
import java.util.Collections;
69
import java.util.IdentityHashMap;
70
import java.util.List;
71
import java.util.Locale;
72
import java.util.Set;
73
import java.util.concurrent.Executor;
74
import java.util.concurrent.ScheduledExecutorService;
75
import java.util.concurrent.TimeUnit;
76
import java.util.logging.Level;
77
import java.util.logging.Logger;
78
import javax.annotation.Nullable;
79
import javax.annotation.concurrent.ThreadSafe;
80

81
@ThreadSafe
82
final class InProcessTransport implements ServerTransport, ConnectionClientTransport {
83
  private static final Logger log = Logger.getLogger(InProcessTransport.class.getName());
1✔
84
  static boolean isEnabledSupportTracingMessageSizes =
1✔
85
      GrpcUtil.getFlag("GRPC_EXPERIMENTAL_SUPPORT_TRACING_MESSAGE_SIZES", false);
1✔
86

87
  private final InternalLogId logId;
88
  private final SocketAddress address;
89
  private final int clientMaxInboundMetadataSize;
90
  private final String authority;
91
  private final String userAgent;
92
  private int serverMaxInboundMetadataSize;
93
  private final boolean includeCauseWithStatus;
94
  private ObjectPool<ScheduledExecutorService> serverSchedulerPool;
95
  private ScheduledExecutorService serverScheduler;
96
  private ServerTransportListener serverTransportListener;
97
  private Attributes serverStreamAttributes;
98
  private ManagedClientTransport.Listener clientTransportListener;
99
  // The size is assumed from the sender's side.
100
  private final long assumedMessageSize;
101
  @GuardedBy("this")
102
  private boolean shutdown;
103
  @GuardedBy("this")
104
  private boolean terminated;
105
  @GuardedBy("this")
106
  private Status shutdownStatus;
107
  @GuardedBy("this")
1✔
108
  private final Set<InProcessStream> streams = Collections.newSetFromMap(
1✔
109
          new IdentityHashMap<InProcessStream, Boolean>());
110
  @GuardedBy("this")
111
  private List<ServerStreamTracer.Factory> serverStreamTracerFactories;
112
  private Attributes attributes;
113

114
  private Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
1✔
115
      new Thread.UncaughtExceptionHandler() {
1✔
116
        @Override
117
        public void uncaughtException(Thread t, Throwable e) {
118
          if (e instanceof Error) {
×
119
            throw new Error(e);
×
120
          }
121
          throw new RuntimeException(e);
×
122
        }
123
      };
124

125

126
  @GuardedBy("this")
1✔
127
  private final InUseStateAggregator<InProcessStream> inUseState =
128
      new InUseStateAggregator<InProcessStream>() {
1✔
129
        @Override
130
        protected void handleInUse() {
131
          clientTransportListener.transportInUse(true);
1✔
132
        }
1✔
133

134
        @Override
135
        protected void handleNotInUse() {
136
          clientTransportListener.transportInUse(false);
1✔
137
        }
1✔
138
      };
139

140
  public InProcessTransport(SocketAddress address, int maxInboundMetadataSize, String authority,
141
      String userAgent, Attributes eagAttrs,
142
      boolean includeCauseWithStatus, long assumedMessageSize) {
1✔
143
    this.address = address;
1✔
144
    this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
1✔
145
    this.authority = authority;
1✔
146
    this.userAgent = GrpcUtil.getGrpcUserAgent("inprocess", userAgent);
1✔
147
    checkNotNull(eagAttrs, "eagAttrs");
1✔
148
    this.attributes = Attributes.newBuilder()
1✔
149
        .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
150
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs)
1✔
151
        .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
152
        .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
153
        .build();
1✔
154
    logId = InternalLogId.allocate(getClass(), address.toString());
1✔
155
    this.includeCauseWithStatus = includeCauseWithStatus;
1✔
156
    this.assumedMessageSize = assumedMessageSize;
1✔
157
  }
1✔
158

159
  @CheckReturnValue
160
  @Override
161
  public synchronized Runnable start(ManagedClientTransport.Listener listener) {
162
    this.clientTransportListener = listener;
1✔
163
    InProcessServer server = InProcessServer.findServer(address);
1✔
164
    if (server != null) {
1✔
165
      serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize();
1✔
166
      serverSchedulerPool = server.getScheduledExecutorServicePool();
1✔
167
      serverScheduler = serverSchedulerPool.getObject();
1✔
168
      serverStreamTracerFactories = server.getStreamTracerFactories();
1✔
169
      // Must be semi-initialized; past this point, can begin receiving requests
170
      serverTransportListener = server.register(this);
1✔
171
    }
172
    if (serverTransportListener == null) {
1✔
173
      shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + address);
1✔
174
      final Status localShutdownStatus = shutdownStatus;
1✔
175
      return new Runnable() {
1✔
176
        @Override
177
        public void run() {
178
          synchronized (InProcessTransport.this) {
1✔
179
            notifyShutdown(localShutdownStatus);
1✔
180
            notifyTerminated();
1✔
181
          }
1✔
182
        }
1✔
183
      };
184
    }
185
    Attributes serverTransportAttrs = Attributes.newBuilder()
1✔
186
        .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
187
        .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
188
        .build();
1✔
189
    serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
1✔
190
    attributes = clientTransportListener.filterTransport(attributes);
1✔
191
    clientTransportListener.transportReady();
1✔
192
    return null;
1✔
193
  }
194

195
  @Override
196
  public synchronized ClientStream newStream(
197
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
198
      ClientStreamTracer[] tracers) {
199
    StatsTraceContext statsTraceContext =
1✔
200
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
201
    if (shutdownStatus != null) {
1✔
202
      return failedClientStream(statsTraceContext, shutdownStatus);
1✔
203
    }
204

205
    headers.put(GrpcUtil.USER_AGENT_KEY, userAgent);
1✔
206

207
    if (serverMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
208
      int metadataSize = metadataSize(headers);
1✔
209
      if (metadataSize > serverMaxInboundMetadataSize) {
1✔
210
        // Other transports would compute a status with:
211
        //   GrpcUtil.httpStatusToGrpcStatus(431 /* Request Header Fields Too Large */);
212
        // However, that isn't handled specially today, so we'd leak HTTP-isms even though we're
213
        // in-process. We go ahead and make a Status, which may need to be updated if
214
        // statuscodes.md is updated.
215
        Status status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
216
            String.format(
1✔
217
                Locale.US,
218
                "Request metadata larger than %d: %d",
219
                serverMaxInboundMetadataSize,
1✔
220
                metadataSize));
1✔
221
        return failedClientStream(statsTraceContext, status);
1✔
222
      }
223
    }
224

225
    return new InProcessStream(method, headers, callOptions, authority, statsTraceContext)
1✔
226
        .clientStream;
1✔
227
  }
228

229
  private ClientStream failedClientStream(
230
      final StatsTraceContext statsTraceCtx, final Status status) {
231
    return new NoopClientStream() {
1✔
232
        @Override
233
        public void start(ClientStreamListener listener) {
234
          statsTraceCtx.clientOutboundHeaders();
1✔
235
          statsTraceCtx.streamClosed(status);
1✔
236
          listener.closed(status, RpcProgress.PROCESSED, new Metadata());
1✔
237
        }
1✔
238
      };
239
  }
240

241
  @Override
242
  public synchronized void ping(final PingCallback callback, Executor executor) {
243
    if (terminated) {
1✔
244
      final Status shutdownStatus = this.shutdownStatus;
1✔
245
      executor.execute(new Runnable() {
1✔
246
        @Override
247
        public void run() {
248
          callback.onFailure(shutdownStatus);
1✔
249
        }
1✔
250
      });
251
    } else {
1✔
252
      executor.execute(new Runnable() {
1✔
253
        @Override
254
        public void run() {
255
          callback.onSuccess(0);
1✔
256
        }
1✔
257
      });
258
    }
259
  }
1✔
260

261
  @Override
262
  public synchronized void shutdown(Status reason) {
263
    // Can be called multiple times: once for ManagedClientTransport, once for ServerTransport.
264
    if (shutdown) {
1✔
265
      return;
1✔
266
    }
267
    shutdownStatus = reason;
1✔
268
    notifyShutdown(reason);
1✔
269
    if (streams.isEmpty()) {
1✔
270
      notifyTerminated();
1✔
271
    }
272
  }
1✔
273

274
  @Override
275
  public synchronized void shutdown() {
276
    shutdown(Status.UNAVAILABLE.withDescription("InProcessTransport shutdown by the server-side"));
1✔
277
  }
1✔
278

279
  @Override
280
  public void shutdownNow(Status reason) {
281
    checkNotNull(reason, "reason");
1✔
282
    List<InProcessStream> streamsCopy;
283
    synchronized (this) {
1✔
284
      shutdown(reason);
1✔
285
      if (terminated) {
1✔
286
        return;
1✔
287
      }
288
      streamsCopy = new ArrayList<>(streams);
1✔
289
    }
1✔
290
    for (InProcessStream stream : streamsCopy) {
1✔
291
      stream.clientStream.cancel(reason);
1✔
292
    }
1✔
293
  }
1✔
294

295
  @Override
296
  public String toString() {
297
    return MoreObjects.toStringHelper(this)
×
298
        .add("logId", logId.getId())
×
299
        .add("address", address)
×
300
        .toString();
×
301
  }
302

303
  @Override
304
  public InternalLogId getLogId() {
305
    return logId;
1✔
306
  }
307

308
  @Override
309
  public Attributes getAttributes() {
310
    return attributes;
1✔
311
  }
312

313
  @Override
314
  public ScheduledExecutorService getScheduledExecutorService() {
315
    return serverScheduler;
1✔
316
  }
317

318
  @Override
319
  public ListenableFuture<SocketStats> getStats() {
320
    SettableFuture<SocketStats> ret = SettableFuture.create();
×
321
    ret.set(null);
×
322
    return ret;
×
323
  }
324

325
  private synchronized void notifyShutdown(Status s) {
326
    if (shutdown) {
1✔
327
      return;
×
328
    }
329
    shutdown = true;
1✔
330
    clientTransportListener.transportShutdown(s);
1✔
331
  }
1✔
332

333
  private synchronized void notifyTerminated() {
334
    if (terminated) {
1✔
335
      return;
×
336
    }
337
    terminated = true;
1✔
338
    if (serverScheduler != null) {
1✔
339
      serverScheduler = serverSchedulerPool.returnObject(serverScheduler);
1✔
340
    }
341
    clientTransportListener.transportTerminated();
1✔
342
    if (serverTransportListener != null) {
1✔
343
      serverTransportListener.transportTerminated();
1✔
344
    }
345
  }
1✔
346

347
  private static int metadataSize(Metadata metadata) {
348
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
349
    if (serialized == null) {
1✔
350
      return 0;
×
351
    }
352
    // Calculate based on SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2. We could use something
353
    // different, but it's "sane."
354
    long size = 0;
1✔
355
    for (int i = 0; i < serialized.length; i += 2) {
1✔
356
      size += 32 + serialized[i].length + serialized[i + 1].length;
1✔
357
    }
358
    size = Math.min(size, Integer.MAX_VALUE);
1✔
359
    return (int) size;
1✔
360
  }
361

362
  private class InProcessStream {
363
    private final InProcessClientStream clientStream;
364
    private final InProcessServerStream serverStream;
365
    private final CallOptions callOptions;
366
    private final Metadata headers;
367
    private final MethodDescriptor<?, ?> method;
368
    private volatile String authority;
369

370
    private InProcessStream(
371
        MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
372
        String authority , StatsTraceContext statsTraceContext) {
1✔
373
      this.method = checkNotNull(method, "method");
1✔
374
      this.headers = checkNotNull(headers, "headers");
1✔
375
      this.callOptions = checkNotNull(callOptions, "callOptions");
1✔
376
      this.authority = authority;
1✔
377
      this.clientStream = new InProcessClientStream(callOptions, statsTraceContext);
1✔
378
      this.serverStream = new InProcessServerStream(method, headers);
1✔
379
    }
1✔
380

381
    // Can be called multiple times due to races on both client and server closing at same time.
382
    private void streamClosed() {
383
      synchronized (InProcessTransport.this) {
1✔
384
        boolean justRemovedAnElement = streams.remove(this);
1✔
385
        if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
386
          inUseState.updateObjectInUse(this, false);
1✔
387
        }
388
        if (streams.isEmpty() && justRemovedAnElement) {
1✔
389
          if (shutdown) {
1✔
390
            notifyTerminated();
1✔
391
          }
392
        }
393
      }
1✔
394
    }
1✔
395

396
    private class InProcessServerStream implements ServerStream {
397
      final StatsTraceContext statsTraceCtx;
398
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
399
      private ClientStreamListener clientStreamListener;
400
      private final SynchronizationContext syncContext =
1✔
401
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
402
      @GuardedBy("this")
403
      private int clientRequested;
404
      @GuardedBy("this")
1✔
405
      private ArrayDeque<StreamListener.MessageProducer> clientReceiveQueue =
406
          new ArrayDeque<>();
407
      @GuardedBy("this")
408
      private Status clientNotifyStatus;
409
      @GuardedBy("this")
410
      private Metadata clientNotifyTrailers;
411
      // Only is intended to prevent double-close when client cancels.
412
      @GuardedBy("this")
413
      private boolean closed;
414
      @GuardedBy("this")
415
      private int outboundSeqNo;
416

417
      InProcessServerStream(MethodDescriptor<?, ?> method, Metadata headers) {
1✔
418
        statsTraceCtx = StatsTraceContext.newServerContext(
1✔
419
            serverStreamTracerFactories, method.getFullMethodName(), headers);
1✔
420
      }
1✔
421

422
      private synchronized void setListener(ClientStreamListener listener) {
423
        clientStreamListener = listener;
1✔
424
      }
1✔
425

426
      @Override
427
      public void setListener(ServerStreamListener serverStreamListener) {
428
        clientStream.setListener(serverStreamListener);
1✔
429
      }
1✔
430

431
      @Override
432
      public void request(int numMessages) {
433
        boolean onReady = clientStream.serverRequested(numMessages);
1✔
434
        if (onReady) {
1✔
435
          synchronized (this) {
1✔
436
            if (!closed) {
1✔
437
              syncContext.executeLater(() -> clientStreamListener.onReady());
1✔
438
            }
439
          }
1✔
440
        }
441
        syncContext.drain();
1✔
442
      }
1✔
443

444
      // This method is the only reason we have to synchronize field accesses.
445
      /**
446
       * Client requested more messages.
447
       *
448
       * @return whether onReady should be called on the server
449
       */
450
      private boolean clientRequested(int numMessages) {
451
        boolean previouslyReady;
452
        boolean nowReady;
453
        synchronized (this) {
1✔
454
          if (closed) {
1✔
455
            return false;
1✔
456
          }
457

458
          previouslyReady = clientRequested > 0;
1✔
459
          clientRequested += numMessages;
1✔
460
          while (clientRequested > 0 && !clientReceiveQueue.isEmpty()) {
1✔
461
            clientRequested--;
1✔
462
            StreamListener.MessageProducer producer = clientReceiveQueue.poll();
1✔
463
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
464
          }
1✔
465

466
          if (clientReceiveQueue.isEmpty() && clientNotifyStatus != null) {
1✔
467
            closed = true;
1✔
468
            clientStream.statsTraceCtx.clientInboundTrailers(clientNotifyTrailers);
1✔
469
            clientStream.statsTraceCtx.streamClosed(clientNotifyStatus);
1✔
470
            Status notifyStatus = this.clientNotifyStatus;
1✔
471
            Metadata notifyTrailers = this.clientNotifyTrailers;
1✔
472
            syncContext.executeLater(() ->
1✔
473
                clientStreamListener.closed(notifyStatus, RpcProgress.PROCESSED, notifyTrailers));
1✔
474
          }
475

476
          nowReady = clientRequested > 0;
1✔
477
        }
1✔
478

479
        syncContext.drain();
1✔
480
        return !previouslyReady && nowReady;
1✔
481
      }
482

483
      private void clientCancelled(Status status) {
484
        internalCancel(status);
1✔
485
      }
1✔
486

487
      @Override
488
      public void writeMessage(InputStream message) {
489
        long messageLength = 0;
1✔
490
        if (isEnabledSupportTracingMessageSizes) {
1✔
491
          try {
492
            if (assumedMessageSize != -1) {
×
493
              messageLength = assumedMessageSize;
×
494
            } else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
×
495
              messageLength = message.available();
×
496
            } else {
497
              InputStream oldMessage = message;
×
498
              byte[] payload = ByteStreams.toByteArray(message);
×
499
              messageLength = payload.length;
×
500
              message = new ByteArrayInputStream(payload);
×
501
              oldMessage.close();
×
502
            }
503
          } catch (Exception e) {
×
504
            throw new RuntimeException("Error processing the message length", e);
×
505
          }
×
506
        }
507

508
        synchronized (this) {
1✔
509
          if (closed) {
1✔
510
            return;
1✔
511
          }
512
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
513
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
514
          clientStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
515
          clientStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
516
          if (isEnabledSupportTracingMessageSizes) {
1✔
517
            statsTraceCtx.outboundUncompressedSize(messageLength);
×
518
            statsTraceCtx.outboundWireSize(messageLength);
×
519
            // messageLength should be same at receiver's end as no actual wire is involved.
520
            clientStream.statsTraceCtx.inboundUncompressedSize(messageLength);
×
521
            clientStream.statsTraceCtx.inboundWireSize(messageLength);
×
522
          }
523
          outboundSeqNo++;
1✔
524
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
525
          if (clientRequested > 0) {
1✔
526
            clientRequested--;
1✔
527
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
528
          } else {
529
            clientReceiveQueue.add(producer);
1✔
530
          }
531
        }
1✔
532
        syncContext.drain();
1✔
533
      }
1✔
534

535
      @Override
536
      public void flush() {}
1✔
537

538
      @Override
539
      public synchronized boolean isReady() {
540
        if (closed) {
1✔
541
          return false;
×
542
        }
543
        return clientRequested > 0;
1✔
544
      }
545

546
      @Override
547
      public void writeHeaders(Metadata headers, boolean flush) {
548
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
549
          int metadataSize = metadataSize(headers);
1✔
550
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
551
            Status serverStatus = Status.CANCELLED.withDescription("Client cancelled the RPC");
1✔
552
            clientStream.serverClosed(serverStatus, serverStatus);
1✔
553
            // Other transports provide very little information in this case. We go ahead and make a
554
            // Status, which may need to be updated if statuscodes.md is updated.
555
            Status failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
556
                String.format(
1✔
557
                    Locale.US,
558
                    "Response header metadata larger than %d: %d",
559
                    clientMaxInboundMetadataSize,
1✔
560
                    metadataSize));
1✔
561
            notifyClientClose(failedStatus, new Metadata());
1✔
562
            return;
1✔
563
          }
564
        }
565

566
        synchronized (this) {
1✔
567
          if (closed) {
1✔
568
            return;
1✔
569
          }
570

571
          clientStream.statsTraceCtx.clientInboundHeaders(headers);
1✔
572
          syncContext.executeLater(() -> clientStreamListener.headersRead(headers));
1✔
573
        }
1✔
574
        syncContext.drain();
1✔
575
      }
1✔
576

577
      @Override
578
      public void close(Status status, Metadata trailers) {
579
        // clientStream.serverClosed must happen before clientStreamListener.closed, otherwise
580
        // clientStreamListener.closed can trigger clientStream.cancel (see code in
581
        // ClientCalls.blockingUnaryCall), which may race with clientStream.serverClosed as both are
582
        // calling internalCancel().
583
        clientStream.serverClosed(Status.OK, status);
1✔
584

585
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
586
          int statusSize = status.getDescription() == null ? 0 : status.getDescription().length();
1✔
587
          // Go ahead and throw in the status description's length, since that could be very long.
588
          int metadataSize = metadataSize(trailers) + statusSize;
1✔
589
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
590
            // Override the status for the client, but not the server. Transports do not guarantee
591
            // notifying the server of the failure.
592

593
            // Other transports provide very little information in this case. We go ahead and make a
594
            // Status, which may need to be updated if statuscodes.md is updated.
595
            status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
596
                String.format(
1✔
597
                    Locale.US,
598
                    "Response header metadata larger than %d: %d",
599
                    clientMaxInboundMetadataSize,
1✔
600
                    metadataSize));
1✔
601
            trailers = new Metadata();
1✔
602
          }
603
        }
604

605
        notifyClientClose(status, trailers);
1✔
606
      }
1✔
607

608
      /** clientStream.serverClosed() must be called before this method. */
609
      private void notifyClientClose(Status status, Metadata trailers) {
610
        Status clientStatus = cleanStatus(status, includeCauseWithStatus);
1✔
611
        synchronized (this) {
1✔
612
          if (closed) {
1✔
613
            return;
1✔
614
          }
615
          if (clientReceiveQueue.isEmpty()) {
1✔
616
            closed = true;
1✔
617
            clientStream.statsTraceCtx.clientInboundTrailers(trailers);
1✔
618
            clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
619
            syncContext.executeLater(
1✔
620
                () -> clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, trailers));
1✔
621
          } else {
622
            clientNotifyStatus = clientStatus;
1✔
623
            clientNotifyTrailers = trailers;
1✔
624
          }
625
        }
1✔
626
        syncContext.drain();
1✔
627
        streamClosed();
1✔
628
      }
1✔
629

630
      @Override
631
      public void cancel(Status status) {
632
        if (!internalCancel(Status.CANCELLED.withDescription("server cancelled stream"))) {
1✔
633
          return;
1✔
634
        }
635
        clientStream.serverClosed(status, status);
1✔
636
        streamClosed();
1✔
637
      }
1✔
638

639
      private boolean internalCancel(Status clientStatus) {
640
        synchronized (this) {
1✔
641
          if (closed) {
1✔
642
            return false;
1✔
643
          }
644
          closed = true;
1✔
645
          StreamListener.MessageProducer producer;
646
          while ((producer = clientReceiveQueue.poll()) != null) {
1✔
647
            InputStream message;
648
            while ((message = producer.next()) != null) {
×
649
              try {
650
                message.close();
×
651
              } catch (Throwable t) {
×
652
                log.log(Level.WARNING, "Exception closing stream", t);
×
653
              }
×
654
            }
655
          }
×
656
          clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
657
          syncContext.executeLater(
1✔
658
              () ->
659
                  clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, new Metadata()));
1✔
660
        }
1✔
661
        syncContext.drain();
1✔
662
        return true;
1✔
663
      }
664

665
      @Override
666
      public void setMessageCompression(boolean enable) {
667
        // noop
668
      }
×
669

670
      @Override
671
      public void optimizeForDirectExecutor() {}
1✔
672

673
      @Override
674
      public void setCompressor(Compressor compressor) {}
1✔
675

676
      @Override
677
      public void setDecompressor(Decompressor decompressor) {}
×
678

679
      @Override public Attributes getAttributes() {
680
        return serverStreamAttributes;
1✔
681
      }
682

683
      @Override
684
      public String getAuthority() {
685
        return InProcessStream.this.authority;
1✔
686
      }
687

688
      @Override
689
      public StatsTraceContext statsTraceContext() {
690
        return statsTraceCtx;
1✔
691
      }
692

693
      @Override
694
      public int streamId() {
695
        return -1;
1✔
696
      }
697

698
      @Override
699
      public void setOnReadyThreshold(int numBytes) {
700
        // noop
701
      }
×
702
    }
703

704
    private class InProcessClientStream implements ClientStream {
705
      final StatsTraceContext statsTraceCtx;
706
      final CallOptions callOptions;
707
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
708
      private ServerStreamListener serverStreamListener;
709
      private final SynchronizationContext syncContext =
1✔
710
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
711
      @GuardedBy("this")
712
      private int serverRequested;
713
      @GuardedBy("this")
1✔
714
      private ArrayDeque<StreamListener.MessageProducer> serverReceiveQueue =
715
          new ArrayDeque<>();
716
      @GuardedBy("this")
717
      private boolean serverNotifyHalfClose;
718
      // Only is intended to prevent double-close when server closes.
719
      @GuardedBy("this")
720
      private boolean closed;
721
      @GuardedBy("this")
722
      private int outboundSeqNo;
723

724
      InProcessClientStream(
725
          CallOptions callOptions, StatsTraceContext statsTraceContext) {
1✔
726
        this.callOptions = callOptions;
1✔
727
        statsTraceCtx = statsTraceContext;
1✔
728
      }
1✔
729

730
      private synchronized void setListener(ServerStreamListener listener) {
731
        this.serverStreamListener = listener;
1✔
732
      }
1✔
733

734
      @Override
735
      public void request(int numMessages) {
736
        boolean onReady = serverStream.clientRequested(numMessages);
1✔
737
        if (onReady) {
1✔
738
          synchronized (this) {
1✔
739
            if (!closed) {
1✔
740
              syncContext.executeLater(() -> serverStreamListener.onReady());
1✔
741
            }
742
          }
1✔
743
          syncContext.drain();
1✔
744
        }
745
      }
1✔
746

747
      // This method is the only reason we have to synchronize field accesses.
748
      /**
749
       * Client requested more messages.
750
       *
751
       * @return whether onReady should be called on the server
752
       */
753
      private boolean serverRequested(int numMessages) {
754
        boolean previouslyReady;
755
        boolean nowReady;
756
        synchronized (this) {
1✔
757
          if (closed) {
1✔
758
            return false;
1✔
759
          }
760
          previouslyReady = serverRequested > 0;
1✔
761
          serverRequested += numMessages;
1✔
762

763
          while (serverRequested > 0 && !serverReceiveQueue.isEmpty()) {
1✔
764
            serverRequested--;
1✔
765
            StreamListener.MessageProducer producer = serverReceiveQueue.poll();
1✔
766
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
767
          }
1✔
768

769
          if (serverReceiveQueue.isEmpty() && serverNotifyHalfClose) {
1✔
770
            serverNotifyHalfClose = false;
1✔
771
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
772
          }
773
          nowReady = serverRequested > 0;
1✔
774
        }
1✔
775
        syncContext.drain();
1✔
776
        return !previouslyReady && nowReady;
1✔
777
      }
778

779
      private void serverClosed(Status serverListenerStatus, Status serverTracerStatus) {
780
        internalCancel(serverListenerStatus, serverTracerStatus);
1✔
781
      }
1✔
782

783
      @Override
784
      public void writeMessage(InputStream message) {
785
        long messageLength = 0;
1✔
786
        if (isEnabledSupportTracingMessageSizes) {
1✔
787
          try {
788
            if (assumedMessageSize != -1) {
×
789
              messageLength = assumedMessageSize;
×
790
            } else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
×
791
              messageLength = message.available();
×
792
            } else {
793
              InputStream oldMessage = message;
×
794
              byte[] payload = ByteStreams.toByteArray(message);
×
795
              messageLength = payload.length;
×
796
              message = new ByteArrayInputStream(payload);
×
797
              oldMessage.close();
×
798
            }
799
          } catch (Exception e) {
×
800
            throw new RuntimeException("Error processing the message length", e);
×
801
          }
×
802
        }
803
        synchronized (this) {
1✔
804
          if (closed) {
1✔
805
            return;
1✔
806
          }
807
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
808
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
809
          serverStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
810
          serverStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
811
          if (isEnabledSupportTracingMessageSizes) {
1✔
812
            statsTraceCtx.outboundUncompressedSize(messageLength);
×
813
            statsTraceCtx.outboundWireSize(messageLength);
×
814
            // messageLength should be same at receiver's end as no actual wire is involved.
815
            serverStream.statsTraceCtx.inboundUncompressedSize(messageLength);
×
816
            serverStream.statsTraceCtx.inboundWireSize(messageLength);
×
817
          }
818
          outboundSeqNo++;
1✔
819
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
820
          if (serverRequested > 0) {
1✔
821
            serverRequested--;
1✔
822
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
823
          } else {
824
            serverReceiveQueue.add(producer);
1✔
825
          }
826
        }
1✔
827
        syncContext.drain();
1✔
828
      }
1✔
829

830
      @Override
831
      public void flush() {}
1✔
832

833
      @Override
834
      public synchronized boolean isReady() {
835
        if (closed) {
1✔
836
          return false;
1✔
837
        }
838
        return serverRequested > 0;
1✔
839
      }
840

841
      // Must be thread-safe for shutdownNow()
842
      @Override
843
      public void cancel(Status reason) {
844
        Status serverStatus = cleanStatus(reason, includeCauseWithStatus);
1✔
845
        if (!internalCancel(serverStatus, serverStatus)) {
1✔
846
          return;
1✔
847
        }
848
        serverStream.clientCancelled(reason);
1✔
849
        streamClosed();
1✔
850
      }
1✔
851

852
      private boolean internalCancel(
853
          Status serverListenerStatus, Status serverTracerStatus) {
854
        synchronized (this) {
1✔
855
          if (closed) {
1✔
856
            return false;
1✔
857
          }
858
          closed = true;
1✔
859

860
          StreamListener.MessageProducer producer;
861
          while ((producer = serverReceiveQueue.poll()) != null) {
1✔
862
            InputStream message;
863
            while ((message = producer.next()) != null) {
1✔
864
              try {
865
                message.close();
1✔
866
              } catch (Throwable t) {
×
867
                log.log(Level.WARNING, "Exception closing stream", t);
×
868
              }
1✔
869
            }
870
          }
1✔
871
          serverStream.statsTraceCtx.streamClosed(serverTracerStatus);
1✔
872
          syncContext.executeLater(() -> serverStreamListener.closed(serverListenerStatus));
1✔
873
        }
1✔
874
        syncContext.drain();
1✔
875
        return true;
1✔
876
      }
877

878
      @Override
879
      public void halfClose() {
880
        synchronized (this) {
1✔
881
          if (closed) {
1✔
882
            return;
1✔
883
          }
884
          if (serverReceiveQueue.isEmpty()) {
1✔
885
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
886
          } else {
887
            serverNotifyHalfClose = true;
1✔
888
          }
889
        }
1✔
890
        syncContext.drain();
1✔
891
      }
1✔
892

893
      @Override
894
      public void setMessageCompression(boolean enable) {}
×
895

896
      @Override
897
      public void setAuthority(String string) {
898
        InProcessStream.this.authority = string;
×
899
      }
×
900

901
      @Override
902
      public void start(ClientStreamListener listener) {
903
        serverStream.setListener(listener);
1✔
904

905
        synchronized (InProcessTransport.this) {
1✔
906
          statsTraceCtx.clientOutboundHeaders();
1✔
907
          streams.add(InProcessTransport.InProcessStream.this);
1✔
908
          if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
909
            inUseState.updateObjectInUse(InProcessTransport.InProcessStream.this, true);
1✔
910
          }
911
          serverTransportListener.streamCreated(serverStream, method.getFullMethodName(), headers);
1✔
912
        }
1✔
913
      }
1✔
914

915
      @Override
916
      public Attributes getAttributes() {
917
        return attributes;
1✔
918
      }
919

920
      @Override
921
      public void optimizeForDirectExecutor() {}
1✔
922

923
      @Override
924
      public void setCompressor(Compressor compressor) {}
1✔
925

926
      @Override
927
      public void setFullStreamDecompression(boolean fullStreamDecompression) {}
×
928

929
      @Override
930
      public void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {}
1✔
931

932
      @Override
933
      public void setMaxInboundMessageSize(int maxSize) {}
1✔
934

935
      @Override
936
      public void setMaxOutboundMessageSize(int maxSize) {}
1✔
937

938
      @Override
939
      public void setDeadline(Deadline deadline) {
940
        headers.discardAll(TIMEOUT_KEY);
1✔
941
        headers.put(TIMEOUT_KEY, deadline.timeRemaining(TimeUnit.NANOSECONDS));
1✔
942
      }
1✔
943

944
      @Override
945
      public void appendTimeoutInsight(InsightBuilder insight) {
946
      }
1✔
947
    }
948
  }
949

950
  /**
951
   * Returns a new status with the same code and description.
952
   * If includeCauseWithStatus is true, cause is also included.
953
   *
954
   * <p>For InProcess transport to behave in the same way as the other transports,
955
   * when exchanging statuses between client and server and vice versa,
956
   * the cause should be excluded from the status.
957
   * For easier debugging, the status may be optionally included.
958
   */
959
  private static Status cleanStatus(Status status, boolean includeCauseWithStatus) {
960
    if (status == null) {
1✔
961
      return null;
×
962
    }
963
    Status clientStatus = Status
1✔
964
        .fromCodeValue(status.getCode().value())
1✔
965
        .withDescription(status.getDescription());
1✔
966
    if (includeCauseWithStatus) {
1✔
967
      clientStatus = clientStatus.withCause(status.getCause());
1✔
968
    }
969
    return clientStatus;
1✔
970
  }
971

972
  private static class SingleMessageProducer implements StreamListener.MessageProducer {
973
    private InputStream message;
974

975
    private SingleMessageProducer(InputStream message) {
1✔
976
      this.message = message;
1✔
977
    }
1✔
978

979
    @Nullable
980
    @Override
981
    public InputStream next() {
982
      InputStream messageToReturn = message;
1✔
983
      message = null;
1✔
984
      return messageToReturn;
1✔
985
    }
986
  }
987
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc