• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19500

08 Oct 2024 05:57PM UTC coverage: 84.666% (-0.003%) from 84.669%
#19500

push

github

web-flow
Don't use Utils.pickUnusedPort. (#11601)

33776 of 39893 relevant lines covered (84.67%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.82
/../inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.inprocess;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
21
import static java.lang.Math.max;
22

23
import com.google.common.base.MoreObjects;
24
import com.google.common.base.Optional;
25
import com.google.common.util.concurrent.ListenableFuture;
26
import com.google.common.util.concurrent.SettableFuture;
27
import io.grpc.Attributes;
28
import io.grpc.CallOptions;
29
import io.grpc.ClientStreamTracer;
30
import io.grpc.Compressor;
31
import io.grpc.Deadline;
32
import io.grpc.Decompressor;
33
import io.grpc.DecompressorRegistry;
34
import io.grpc.Grpc;
35
import io.grpc.InternalChannelz.SocketStats;
36
import io.grpc.InternalLogId;
37
import io.grpc.InternalMetadata;
38
import io.grpc.Metadata;
39
import io.grpc.MethodDescriptor;
40
import io.grpc.SecurityLevel;
41
import io.grpc.ServerStreamTracer;
42
import io.grpc.Status;
43
import io.grpc.SynchronizationContext;
44
import io.grpc.internal.ClientStream;
45
import io.grpc.internal.ClientStreamListener;
46
import io.grpc.internal.ClientStreamListener.RpcProgress;
47
import io.grpc.internal.ConnectionClientTransport;
48
import io.grpc.internal.GrpcAttributes;
49
import io.grpc.internal.GrpcUtil;
50
import io.grpc.internal.InUseStateAggregator;
51
import io.grpc.internal.InsightBuilder;
52
import io.grpc.internal.ManagedClientTransport;
53
import io.grpc.internal.NoopClientStream;
54
import io.grpc.internal.ObjectPool;
55
import io.grpc.internal.ServerListener;
56
import io.grpc.internal.ServerStream;
57
import io.grpc.internal.ServerStreamListener;
58
import io.grpc.internal.ServerTransport;
59
import io.grpc.internal.ServerTransportListener;
60
import io.grpc.internal.StatsTraceContext;
61
import io.grpc.internal.StreamListener;
62
import java.io.InputStream;
63
import java.net.SocketAddress;
64
import java.util.ArrayDeque;
65
import java.util.ArrayList;
66
import java.util.Collections;
67
import java.util.IdentityHashMap;
68
import java.util.List;
69
import java.util.Locale;
70
import java.util.Set;
71
import java.util.concurrent.Executor;
72
import java.util.concurrent.ScheduledExecutorService;
73
import java.util.concurrent.TimeUnit;
74
import java.util.logging.Level;
75
import java.util.logging.Logger;
76
import javax.annotation.CheckReturnValue;
77
import javax.annotation.Nullable;
78
import javax.annotation.concurrent.GuardedBy;
79
import javax.annotation.concurrent.ThreadSafe;
80

81
@ThreadSafe
82
final class InProcessTransport implements ServerTransport, ConnectionClientTransport {
83
  private static final Logger log = Logger.getLogger(InProcessTransport.class.getName());
1✔
84

85
  private final InternalLogId logId;
86
  private final SocketAddress address;
87
  private final int clientMaxInboundMetadataSize;
88
  private final String authority;
89
  private final String userAgent;
90
  private final Optional<ServerListener> optionalServerListener;
91
  private int serverMaxInboundMetadataSize;
92
  private final boolean includeCauseWithStatus;
93
  private ObjectPool<ScheduledExecutorService> serverSchedulerPool;
94
  private ScheduledExecutorService serverScheduler;
95
  private ServerTransportListener serverTransportListener;
96
  private Attributes serverStreamAttributes;
97
  private ManagedClientTransport.Listener clientTransportListener;
98
  @GuardedBy("this")
99
  private boolean shutdown;
100
  @GuardedBy("this")
101
  private boolean terminated;
102
  @GuardedBy("this")
103
  private Status shutdownStatus;
104
  @GuardedBy("this")
1✔
105
  private final Set<InProcessStream> streams = Collections.newSetFromMap(
1✔
106
          new IdentityHashMap<InProcessStream, Boolean>());
107
  @GuardedBy("this")
108
  private List<ServerStreamTracer.Factory> serverStreamTracerFactories;
109
  private Attributes attributes;
110

111
  private Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
1✔
112
      new Thread.UncaughtExceptionHandler() {
1✔
113
        @Override
114
        public void uncaughtException(Thread t, Throwable e) {
115
          if (e instanceof Error) {
×
116
            throw new Error(e);
×
117
          }
118
          throw new RuntimeException(e);
×
119
        }
120
      };
121

122

123
  @GuardedBy("this")
1✔
124
  private final InUseStateAggregator<InProcessStream> inUseState =
125
      new InUseStateAggregator<InProcessStream>() {
1✔
126
        @Override
127
        protected void handleInUse() {
128
          clientTransportListener.transportInUse(true);
1✔
129
        }
1✔
130

131
        @Override
132
        protected void handleNotInUse() {
133
          clientTransportListener.transportInUse(false);
1✔
134
        }
1✔
135
      };
136

137
  private InProcessTransport(SocketAddress address, int maxInboundMetadataSize, String authority,
138
      String userAgent, Attributes eagAttrs,
139
      Optional<ServerListener> optionalServerListener, boolean includeCauseWithStatus) {
1✔
140
    this.address = address;
1✔
141
    this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
1✔
142
    this.authority = authority;
1✔
143
    this.userAgent = GrpcUtil.getGrpcUserAgent("inprocess", userAgent);
1✔
144
    checkNotNull(eagAttrs, "eagAttrs");
1✔
145
    this.attributes = Attributes.newBuilder()
1✔
146
        .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
147
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs)
1✔
148
        .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
149
        .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
150
        .build();
1✔
151
    this.optionalServerListener = optionalServerListener;
1✔
152
    logId = InternalLogId.allocate(getClass(), address.toString());
1✔
153
    this.includeCauseWithStatus = includeCauseWithStatus;
1✔
154
  }
1✔
155

156
  /**
   * Creates a transport with no server listener attached up front; {@link #start} then looks the
   * server up by {@code address}.
   */
  public InProcessTransport(
      SocketAddress address, int maxInboundMetadataSize, String authority, String userAgent,
      Attributes eagAttrs, boolean includeCauseWithStatus) {
    this(
        address,
        maxInboundMetadataSize,
        authority,
        userAgent,
        eagAttrs,
        Optional.<ServerListener>absent(),
        includeCauseWithStatus);
  }
1✔
162

163
  /**
   * Creates a transport bound directly to {@code serverListener}, addressed by a new
   * {@link InProcessSocketAddress} built from {@code name}.
   *
   * <p>{@code maxInboundMetadataSize} is used for both the client and the server inbound limit.
   */
  InProcessTransport(
      String name, int maxInboundMetadataSize, String authority, String userAgent,
      Attributes eagAttrs, ObjectPool<ScheduledExecutorService> serverSchedulerPool,
      List<ServerStreamTracer.Factory> serverStreamTracerFactories,
      ServerListener serverListener, boolean includeCauseWithStatus) {
    this(
        new InProcessSocketAddress(name),
        maxInboundMetadataSize,
        authority,
        userAgent,
        eagAttrs,
        Optional.of(serverListener),
        includeCauseWithStatus);
    this.serverMaxInboundMetadataSize = maxInboundMetadataSize;
    this.serverSchedulerPool = serverSchedulerPool;
    this.serverStreamTracerFactories = serverStreamTracerFactories;
  }
1✔
174

175
  @CheckReturnValue
176
  @Override
177
  public synchronized Runnable start(ManagedClientTransport.Listener listener) {
178
    this.clientTransportListener = listener;
1✔
179
    if (optionalServerListener.isPresent()) {
1✔
180
      serverScheduler = serverSchedulerPool.getObject();
1✔
181
      serverTransportListener = optionalServerListener.get().transportCreated(this);
1✔
182
    } else {
183
      InProcessServer server = InProcessServer.findServer(address);
1✔
184
      if (server != null) {
1✔
185
        serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize();
1✔
186
        serverSchedulerPool = server.getScheduledExecutorServicePool();
1✔
187
        serverScheduler = serverSchedulerPool.getObject();
1✔
188
        serverStreamTracerFactories = server.getStreamTracerFactories();
1✔
189
        // Must be semi-initialized; past this point, can begin receiving requests
190
        serverTransportListener = server.register(this);
1✔
191
      }
192
    }
193
    if (serverTransportListener == null) {
1✔
194
      shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + address);
1✔
195
      final Status localShutdownStatus = shutdownStatus;
1✔
196
      return new Runnable() {
1✔
197
        @Override
198
        public void run() {
199
          synchronized (InProcessTransport.this) {
1✔
200
            notifyShutdown(localShutdownStatus);
1✔
201
            notifyTerminated();
1✔
202
          }
1✔
203
        }
1✔
204
      };
205
    }
206
    Attributes serverTransportAttrs = Attributes.newBuilder()
1✔
207
        .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
208
        .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
209
        .build();
1✔
210
    serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
1✔
211
    attributes = clientTransportListener.filterTransport(attributes);
1✔
212
    clientTransportListener.transportReady();
1✔
213
    return null;
1✔
214
  }
215

216
  @Override
217
  public synchronized ClientStream newStream(
218
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
219
      ClientStreamTracer[] tracers) {
220
    StatsTraceContext statsTraceContext =
1✔
221
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
222
    if (shutdownStatus != null) {
1✔
223
      return failedClientStream(statsTraceContext, shutdownStatus);
1✔
224
    }
225

226
    headers.put(GrpcUtil.USER_AGENT_KEY, userAgent);
1✔
227

228
    if (serverMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
229
      int metadataSize = metadataSize(headers);
1✔
230
      if (metadataSize > serverMaxInboundMetadataSize) {
1✔
231
        // Other transports would compute a status with:
232
        //   GrpcUtil.httpStatusToGrpcStatus(431 /* Request Header Fields Too Large */);
233
        // However, that isn't handled specially today, so we'd leak HTTP-isms even though we're
234
        // in-process. We go ahead and make a Status, which may need to be updated if
235
        // statuscodes.md is updated.
236
        Status status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
237
            String.format(
1✔
238
                Locale.US,
239
                "Request metadata larger than %d: %d",
240
                serverMaxInboundMetadataSize,
1✔
241
                metadataSize));
1✔
242
        return failedClientStream(statsTraceContext, status);
1✔
243
      }
244
    }
245

246
    return new InProcessStream(method, headers, callOptions, authority, statsTraceContext)
1✔
247
        .clientStream;
1✔
248
  }
249

250
  private ClientStream failedClientStream(
251
      final StatsTraceContext statsTraceCtx, final Status status) {
252
    return new NoopClientStream() {
1✔
253
        @Override
254
        public void start(ClientStreamListener listener) {
255
          statsTraceCtx.clientOutboundHeaders();
1✔
256
          statsTraceCtx.streamClosed(status);
1✔
257
          listener.closed(status, RpcProgress.PROCESSED, new Metadata());
1✔
258
        }
1✔
259
      };
260
  }
261

262
  @Override
263
  public synchronized void ping(final PingCallback callback, Executor executor) {
264
    if (terminated) {
1✔
265
      final Status shutdownStatus = this.shutdownStatus;
1✔
266
      executor.execute(new Runnable() {
1✔
267
        @Override
268
        public void run() {
269
          callback.onFailure(shutdownStatus.asRuntimeException());
1✔
270
        }
1✔
271
      });
272
    } else {
1✔
273
      executor.execute(new Runnable() {
1✔
274
        @Override
275
        public void run() {
276
          callback.onSuccess(0);
1✔
277
        }
1✔
278
      });
279
    }
280
  }
1✔
281

282
  @Override
283
  public synchronized void shutdown(Status reason) {
284
    // Can be called multiple times: once for ManagedClientTransport, once for ServerTransport.
285
    if (shutdown) {
1✔
286
      return;
1✔
287
    }
288
    shutdownStatus = reason;
1✔
289
    notifyShutdown(reason);
1✔
290
    if (streams.isEmpty()) {
1✔
291
      notifyTerminated();
1✔
292
    }
293
  }
1✔
294

295
  @Override
296
  public synchronized void shutdown() {
297
    shutdown(Status.UNAVAILABLE.withDescription("InProcessTransport shutdown by the server-side"));
1✔
298
  }
1✔
299

300
  @Override
301
  public void shutdownNow(Status reason) {
302
    checkNotNull(reason, "reason");
1✔
303
    List<InProcessStream> streamsCopy;
304
    synchronized (this) {
1✔
305
      shutdown(reason);
1✔
306
      if (terminated) {
1✔
307
        return;
1✔
308
      }
309
      streamsCopy = new ArrayList<>(streams);
1✔
310
    }
1✔
311
    for (InProcessStream stream : streamsCopy) {
1✔
312
      stream.clientStream.cancel(reason);
1✔
313
    }
1✔
314
  }
1✔
315

316
  @Override
317
  public String toString() {
318
    return MoreObjects.toStringHelper(this)
×
319
        .add("logId", logId.getId())
×
320
        .add("address", address)
×
321
        .toString();
×
322
  }
323

324
  @Override
325
  public InternalLogId getLogId() {
326
    return logId;
1✔
327
  }
328

329
  @Override
330
  public Attributes getAttributes() {
331
    return attributes;
1✔
332
  }
333

334
  @Override
335
  public ScheduledExecutorService getScheduledExecutorService() {
336
    return serverScheduler;
1✔
337
  }
338

339
  @Override
340
  public ListenableFuture<SocketStats> getStats() {
341
    SettableFuture<SocketStats> ret = SettableFuture.create();
×
342
    ret.set(null);
×
343
    return ret;
×
344
  }
345

346
  private synchronized void notifyShutdown(Status s) {
347
    if (shutdown) {
1✔
348
      return;
×
349
    }
350
    shutdown = true;
1✔
351
    clientTransportListener.transportShutdown(s);
1✔
352
  }
1✔
353

354
  private synchronized void notifyTerminated() {
355
    if (terminated) {
1✔
356
      return;
×
357
    }
358
    terminated = true;
1✔
359
    if (serverScheduler != null) {
1✔
360
      serverScheduler = serverSchedulerPool.returnObject(serverScheduler);
1✔
361
    }
362
    clientTransportListener.transportTerminated();
1✔
363
    if (serverTransportListener != null) {
1✔
364
      serverTransportListener.transportTerminated();
1✔
365
    }
366
  }
1✔
367

368
  private static int metadataSize(Metadata metadata) {
369
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
370
    if (serialized == null) {
1✔
371
      return 0;
×
372
    }
373
    // Calculate based on SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2. We could use something
374
    // different, but it's "sane."
375
    long size = 0;
1✔
376
    for (int i = 0; i < serialized.length; i += 2) {
1✔
377
      size += 32 + serialized[i].length + serialized[i + 1].length;
1✔
378
    }
379
    size = Math.min(size, Integer.MAX_VALUE);
1✔
380
    return (int) size;
1✔
381
  }
382

383
  private class InProcessStream {
384
    private final InProcessClientStream clientStream;
385
    private final InProcessServerStream serverStream;
386
    private final CallOptions callOptions;
387
    private final Metadata headers;
388
    private final MethodDescriptor<?, ?> method;
389
    private volatile String authority;
390

391
    /**
     * Creates both halves of a stream. {@code method}, {@code headers} and {@code callOptions}
     * must not be null; the client half is constructed before the server half.
     */
    private InProcessStream(
        MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
        String authority , StatsTraceContext statsTraceContext) {
      checkNotNull(method, "method");
      this.method = method;
      checkNotNull(headers, "headers");
      this.headers = headers;
      checkNotNull(callOptions, "callOptions");
      this.callOptions = callOptions;
      this.authority = authority;
      this.clientStream = new InProcessClientStream(callOptions, statsTraceContext);
      this.serverStream = new InProcessServerStream(method, headers);
    }
1✔
401

402
    // Can be called multiple times due to races on both client and server closing at same time.
403
    private void streamClosed() {
404
      synchronized (InProcessTransport.this) {
1✔
405
        boolean justRemovedAnElement = streams.remove(this);
1✔
406
        if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
407
          inUseState.updateObjectInUse(this, false);
1✔
408
        }
409
        if (streams.isEmpty() && justRemovedAnElement) {
1✔
410
          if (shutdown) {
1✔
411
            notifyTerminated();
1✔
412
          }
413
        }
414
      }
1✔
415
    }
1✔
416

417
    private class InProcessServerStream implements ServerStream {
418
      final StatsTraceContext statsTraceCtx;
419
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
420
      private ClientStreamListener clientStreamListener;
421
      private final SynchronizationContext syncContext =
1✔
422
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
423
      @GuardedBy("this")
424
      private int clientRequested;
425
      @GuardedBy("this")
1✔
426
      private ArrayDeque<StreamListener.MessageProducer> clientReceiveQueue =
427
          new ArrayDeque<>();
428
      @GuardedBy("this")
429
      private Status clientNotifyStatus;
430
      @GuardedBy("this")
431
      private Metadata clientNotifyTrailers;
432
      // Only is intended to prevent double-close when client cancels.
433
      @GuardedBy("this")
434
      private boolean closed;
435
      @GuardedBy("this")
436
      private int outboundSeqNo;
437

438
      InProcessServerStream(MethodDescriptor<?, ?> method, Metadata headers) {
1✔
439
        statsTraceCtx = StatsTraceContext.newServerContext(
1✔
440
            serverStreamTracerFactories, method.getFullMethodName(), headers);
1✔
441
      }
1✔
442

443
      private synchronized void setListener(ClientStreamListener listener) {
444
        clientStreamListener = listener;
1✔
445
      }
1✔
446

447
      @Override
448
      public void setListener(ServerStreamListener serverStreamListener) {
449
        clientStream.setListener(serverStreamListener);
1✔
450
      }
1✔
451

452
      @Override
453
      public void request(int numMessages) {
454
        boolean onReady = clientStream.serverRequested(numMessages);
1✔
455
        if (onReady) {
1✔
456
          synchronized (this) {
1✔
457
            if (!closed) {
1✔
458
              syncContext.executeLater(() -> clientStreamListener.onReady());
1✔
459
            }
460
          }
1✔
461
        }
462
        syncContext.drain();
1✔
463
      }
1✔
464

465
      // This method is the only reason we have to synchronize field accesses.
466
      /**
467
       * Client requested more messages.
468
       *
469
       * @return whether onReady should be called on the server
470
       */
471
      private boolean clientRequested(int numMessages) {
472
        boolean previouslyReady;
473
        boolean nowReady;
474
        synchronized (this) {
1✔
475
          if (closed) {
1✔
476
            return false;
1✔
477
          }
478

479
          previouslyReady = clientRequested > 0;
1✔
480
          clientRequested += numMessages;
1✔
481
          while (clientRequested > 0 && !clientReceiveQueue.isEmpty()) {
1✔
482
            clientRequested--;
1✔
483
            StreamListener.MessageProducer producer = clientReceiveQueue.poll();
1✔
484
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
485
          }
1✔
486

487
          if (clientReceiveQueue.isEmpty() && clientNotifyStatus != null) {
1✔
488
            closed = true;
1✔
489
            clientStream.statsTraceCtx.clientInboundTrailers(clientNotifyTrailers);
1✔
490
            clientStream.statsTraceCtx.streamClosed(clientNotifyStatus);
1✔
491
            Status notifyStatus = this.clientNotifyStatus;
1✔
492
            Metadata notifyTrailers = this.clientNotifyTrailers;
1✔
493
            syncContext.executeLater(() ->
1✔
494
                clientStreamListener.closed(notifyStatus, RpcProgress.PROCESSED, notifyTrailers));
1✔
495
          }
496

497
          nowReady = clientRequested > 0;
1✔
498
        }
1✔
499

500
        syncContext.drain();
1✔
501
        return !previouslyReady && nowReady;
1✔
502
      }
503

504
      private void clientCancelled(Status status) {
505
        internalCancel(status);
1✔
506
      }
1✔
507

508
      @Override
509
      public void writeMessage(InputStream message) {
510
        synchronized (this) {
1✔
511
          if (closed) {
1✔
512
            return;
1✔
513
          }
514
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
515
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
516
          clientStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
517
          clientStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
518
          outboundSeqNo++;
1✔
519
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
520
          if (clientRequested > 0) {
1✔
521
            clientRequested--;
1✔
522
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
523
          } else {
524
            clientReceiveQueue.add(producer);
1✔
525
          }
526
        }
1✔
527

528
        syncContext.drain();
1✔
529
      }
1✔
530

531
      @Override
532
      public void flush() {}
1✔
533

534
      @Override
535
      public synchronized boolean isReady() {
536
        if (closed) {
1✔
537
          return false;
×
538
        }
539
        return clientRequested > 0;
1✔
540
      }
541

542
      @Override
543
      public void writeHeaders(Metadata headers, boolean flush) {
544
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
545
          int metadataSize = metadataSize(headers);
1✔
546
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
547
            Status serverStatus = Status.CANCELLED.withDescription("Client cancelled the RPC");
1✔
548
            clientStream.serverClosed(serverStatus, serverStatus);
1✔
549
            // Other transports provide very little information in this case. We go ahead and make a
550
            // Status, which may need to be updated if statuscodes.md is updated.
551
            Status failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
552
                String.format(
1✔
553
                    Locale.US,
554
                    "Response header metadata larger than %d: %d",
555
                    clientMaxInboundMetadataSize,
1✔
556
                    metadataSize));
1✔
557
            notifyClientClose(failedStatus, new Metadata());
1✔
558
            return;
1✔
559
          }
560
        }
561

562
        synchronized (this) {
1✔
563
          if (closed) {
1✔
564
            return;
1✔
565
          }
566

567
          clientStream.statsTraceCtx.clientInboundHeaders(headers);
1✔
568
          syncContext.executeLater(() -> clientStreamListener.headersRead(headers));
1✔
569
        }
1✔
570
        syncContext.drain();
1✔
571
      }
1✔
572

573
      @Override
574
      public void close(Status status, Metadata trailers) {
575
        // clientStream.serverClosed must happen before clientStreamListener.closed, otherwise
576
        // clientStreamListener.closed can trigger clientStream.cancel (see code in
577
        // ClientCalls.blockingUnaryCall), which may race with clientStream.serverClosed as both are
578
        // calling internalCancel().
579
        clientStream.serverClosed(Status.OK, status);
1✔
580

581
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
582
          int statusSize = status.getDescription() == null ? 0 : status.getDescription().length();
1✔
583
          // Go ahead and throw in the status description's length, since that could be very long.
584
          int metadataSize = metadataSize(trailers) + statusSize;
1✔
585
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
586
            // Override the status for the client, but not the server. Transports do not guarantee
587
            // notifying the server of the failure.
588

589
            // Other transports provide very little information in this case. We go ahead and make a
590
            // Status, which may need to be updated if statuscodes.md is updated.
591
            status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
592
                String.format(
1✔
593
                    Locale.US,
594
                    "Response header metadata larger than %d: %d",
595
                    clientMaxInboundMetadataSize,
1✔
596
                    metadataSize));
1✔
597
            trailers = new Metadata();
1✔
598
          }
599
        }
600

601
        notifyClientClose(status, trailers);
1✔
602
      }
1✔
603

604
      /** clientStream.serverClosed() must be called before this method */
605
      private void notifyClientClose(Status status, Metadata trailers) {
606
        Status clientStatus = cleanStatus(status, includeCauseWithStatus);
1✔
607
        synchronized (this) {
1✔
608
          if (closed) {
1✔
609
            return;
1✔
610
          }
611
          if (clientReceiveQueue.isEmpty()) {
1✔
612
            closed = true;
1✔
613
            clientStream.statsTraceCtx.clientInboundTrailers(trailers);
1✔
614
            clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
615
            syncContext.executeLater(
1✔
616
                () -> clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, trailers));
1✔
617
          } else {
618
            clientNotifyStatus = clientStatus;
1✔
619
            clientNotifyTrailers = trailers;
1✔
620
          }
621
        }
1✔
622
        syncContext.drain();
1✔
623
        streamClosed();
1✔
624
      }
1✔
625

626
      @Override
627
      public void cancel(Status status) {
628
        if (!internalCancel(Status.CANCELLED.withDescription("server cancelled stream"))) {
1✔
629
          return;
1✔
630
        }
631
        clientStream.serverClosed(status, status);
1✔
632
        streamClosed();
1✔
633
      }
1✔
634

635
      private boolean internalCancel(Status clientStatus) {
636
        synchronized (this) {
1✔
637
          if (closed) {
1✔
638
            return false;
1✔
639
          }
640
          closed = true;
1✔
641
          StreamListener.MessageProducer producer;
642
          while ((producer = clientReceiveQueue.poll()) != null) {
1✔
643
            InputStream message;
644
            while ((message = producer.next()) != null) {
×
645
              try {
646
                message.close();
×
647
              } catch (Throwable t) {
×
648
                log.log(Level.WARNING, "Exception closing stream", t);
×
649
              }
×
650
            }
651
          }
×
652
          clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
653
          syncContext.executeLater(
1✔
654
              () ->
655
                  clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, new Metadata()));
1✔
656
        }
1✔
657
        syncContext.drain();
1✔
658
        return true;
1✔
659
      }
660

661
      @Override
662
      public void setMessageCompression(boolean enable) {
663
        // noop
664
      }
×
665

666
      @Override
667
      public void optimizeForDirectExecutor() {}
1✔
668

669
      @Override
670
      public void setCompressor(Compressor compressor) {}
1✔
671

672
      @Override
673
      public void setDecompressor(Decompressor decompressor) {}
×
674

675
      @Override public Attributes getAttributes() {
676
        return serverStreamAttributes;
1✔
677
      }
678

679
      @Override
680
      public String getAuthority() {
681
        return InProcessStream.this.authority;
1✔
682
      }
683

684
      @Override
685
      public StatsTraceContext statsTraceContext() {
686
        return statsTraceCtx;
1✔
687
      }
688

689
      @Override
690
      public int streamId() {
691
        return -1;
1✔
692
      }
693

694
      @Override
695
      public void setOnReadyThreshold(int numBytes) {
696
        // noop
697
      }
×
698
    }
699

700
    private class InProcessClientStream implements ClientStream {
701
      final StatsTraceContext statsTraceCtx;
702
      final CallOptions callOptions;
703
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
704
      private ServerStreamListener serverStreamListener;
705
      private final SynchronizationContext syncContext =
1✔
706
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
707
      @GuardedBy("this")
708
      private int serverRequested;
709
      @GuardedBy("this")
1✔
710
      private ArrayDeque<StreamListener.MessageProducer> serverReceiveQueue =
711
          new ArrayDeque<>();
712
      @GuardedBy("this")
713
      private boolean serverNotifyHalfClose;
714
      // Only is intended to prevent double-close when server closes.
715
      @GuardedBy("this")
716
      private boolean closed;
717
      @GuardedBy("this")
718
      private int outboundSeqNo;
719

720
      /** Stores the call options and the client-side stats context for this stream. */
      InProcessClientStream(
          CallOptions callOptions, StatsTraceContext statsTraceContext) {
        this.statsTraceCtx = statsTraceContext;
        this.callOptions = callOptions;
      }
1✔
725

726
      private synchronized void setListener(ServerStreamListener listener) {
727
        this.serverStreamListener = listener;
1✔
728
      }
1✔
729

730
      @Override
731
      public void request(int numMessages) {
732
        boolean onReady = serverStream.clientRequested(numMessages);
1✔
733
        if (onReady) {
1✔
734
          synchronized (this) {
1✔
735
            if (!closed) {
1✔
736
              syncContext.executeLater(() -> serverStreamListener.onReady());
1✔
737
            }
738
          }
1✔
739
          syncContext.drain();
1✔
740
        }
741
      }
1✔
742

743
      // This method is the only reason we have to synchronize field accesses.
744
      /**
745
       * Client requested more messages.
746
       *
747
       * @return whether onReady should be called on the server
748
       */
749
      private boolean serverRequested(int numMessages) {
750
        boolean previouslyReady;
751
        boolean nowReady;
752
        synchronized (this) {
1✔
753
          if (closed) {
1✔
754
            return false;
1✔
755
          }
756
          previouslyReady = serverRequested > 0;
1✔
757
          serverRequested += numMessages;
1✔
758

759
          while (serverRequested > 0 && !serverReceiveQueue.isEmpty()) {
1✔
760
            serverRequested--;
1✔
761
            StreamListener.MessageProducer producer = serverReceiveQueue.poll();
1✔
762
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
763
          }
1✔
764

765
          if (serverReceiveQueue.isEmpty() && serverNotifyHalfClose) {
1✔
766
            serverNotifyHalfClose = false;
1✔
767
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
768
          }
769
          nowReady = serverRequested > 0;
1✔
770
        }
1✔
771
        syncContext.drain();
1✔
772
        return !previouslyReady && nowReady;
1✔
773
      }
774

775
      private void serverClosed(Status serverListenerStatus, Status serverTracerStatus) {
776
        internalCancel(serverListenerStatus, serverTracerStatus);
1✔
777
      }
1✔
778

779
      @Override
780
      public void writeMessage(InputStream message) {
781
        synchronized (this) {
1✔
782
          if (closed) {
1✔
783
            return;
1✔
784
          }
785
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
786
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
787
          serverStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
788
          serverStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
789
          outboundSeqNo++;
1✔
790
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
791
          if (serverRequested > 0) {
1✔
792
            serverRequested--;
1✔
793
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
794
          } else {
795
            serverReceiveQueue.add(producer);
1✔
796
          }
797
        }
1✔
798
        syncContext.drain();
1✔
799
      }
1✔
800

801
      @Override
802
      public void flush() {}
1✔
803

804
      @Override
805
      public synchronized boolean isReady() {
806
        if (closed) {
1✔
807
          return false;
×
808
        }
809
        return serverRequested > 0;
1✔
810
      }
811

812
      // Must be thread-safe for shutdownNow()
813
      @Override
814
      public void cancel(Status reason) {
815
        Status serverStatus = cleanStatus(reason, includeCauseWithStatus);
1✔
816
        if (!internalCancel(serverStatus, serverStatus)) {
1✔
817
          return;
1✔
818
        }
819
        serverStream.clientCancelled(reason);
1✔
820
        streamClosed();
1✔
821
      }
1✔
822

823
      private boolean internalCancel(
824
          Status serverListenerStatus, Status serverTracerStatus) {
825
        synchronized (this) {
1✔
826
          if (closed) {
1✔
827
            return false;
1✔
828
          }
829
          closed = true;
1✔
830

831
          StreamListener.MessageProducer producer;
832
          while ((producer = serverReceiveQueue.poll()) != null) {
1✔
833
            InputStream message;
834
            while ((message = producer.next()) != null) {
1✔
835
              try {
836
                message.close();
1✔
837
              } catch (Throwable t) {
×
838
                log.log(Level.WARNING, "Exception closing stream", t);
×
839
              }
1✔
840
            }
841
          }
1✔
842
          serverStream.statsTraceCtx.streamClosed(serverTracerStatus);
1✔
843
          syncContext.executeLater(() -> serverStreamListener.closed(serverListenerStatus));
1✔
844
        }
1✔
845
        syncContext.drain();
1✔
846
        return true;
1✔
847
      }
848

849
      @Override
850
      public void halfClose() {
851
        synchronized (this) {
1✔
852
          if (closed) {
1✔
853
            return;
1✔
854
          }
855
          if (serverReceiveQueue.isEmpty()) {
1✔
856
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
857
          } else {
858
            serverNotifyHalfClose = true;
1✔
859
          }
860
        }
1✔
861
        syncContext.drain();
1✔
862
      }
1✔
863

864
      @Override
865
      public void setMessageCompression(boolean enable) {}
×
866

867
      @Override
868
      public void setAuthority(String string) {
869
        InProcessStream.this.authority = string;
×
870
      }
×
871

872
      @Override
873
      public void start(ClientStreamListener listener) {
874
        serverStream.setListener(listener);
1✔
875

876
        synchronized (InProcessTransport.this) {
1✔
877
          statsTraceCtx.clientOutboundHeaders();
1✔
878
          streams.add(InProcessTransport.InProcessStream.this);
1✔
879
          if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
880
            inUseState.updateObjectInUse(InProcessTransport.InProcessStream.this, true);
1✔
881
          }
882
          serverTransportListener.streamCreated(serverStream, method.getFullMethodName(), headers);
1✔
883
        }
1✔
884
      }
1✔
885

886
      @Override
887
      public Attributes getAttributes() {
888
        return attributes;
1✔
889
      }
890

891
      @Override
892
      public void optimizeForDirectExecutor() {}
1✔
893

894
      @Override
895
      public void setCompressor(Compressor compressor) {}
1✔
896

897
      @Override
898
      public void setFullStreamDecompression(boolean fullStreamDecompression) {}
×
899

900
      @Override
901
      public void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {}
1✔
902

903
      @Override
904
      public void setMaxInboundMessageSize(int maxSize) {}
1✔
905

906
      @Override
907
      public void setMaxOutboundMessageSize(int maxSize) {}
1✔
908

909
      @Override
910
      public void setDeadline(Deadline deadline) {
911
        headers.discardAll(TIMEOUT_KEY);
1✔
912
        long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
1✔
913
        headers.put(TIMEOUT_KEY, effectiveTimeout);
1✔
914
      }
1✔
915

916
      @Override
917
      public void appendTimeoutInsight(InsightBuilder insight) {
918
      }
1✔
919
    }
920
  }
921

922
  /**
923
   * Returns a new status with the same code and description.
924
   * If includeCauseWithStatus is true, cause is also included.
925
   *
926
   * <p>For InProcess transport to behave in the same way as the other transports,
927
   * when exchanging statuses between client and server and vice versa,
928
   * the cause should be excluded from the status.
929
   * For easier debugging, the cause may optionally be included.
930
   */
931
  private static Status cleanStatus(Status status, boolean includeCauseWithStatus) {
932
    if (status == null) {
1✔
933
      return null;
×
934
    }
935
    Status clientStatus = Status
1✔
936
        .fromCodeValue(status.getCode().value())
1✔
937
        .withDescription(status.getDescription());
1✔
938
    if (includeCauseWithStatus) {
1✔
939
      clientStatus = clientStatus.withCause(status.getCause());
1✔
940
    }
941
    return clientStatus;
1✔
942
  }
943

944
  private static class SingleMessageProducer implements StreamListener.MessageProducer {
945
    private InputStream message;
946

947
    private SingleMessageProducer(InputStream message) {
1✔
948
      this.message = message;
1✔
949
    }
1✔
950

951
    @Nullable
952
    @Override
953
    public InputStream next() {
954
      InputStream messageToReturn = message;
1✔
955
      message = null;
1✔
956
      return messageToReturn;
1✔
957
    }
958
  }
959
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc