• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19531

29 Oct 2024 09:09PM UTC coverage: 84.591% (+0.01%) from 84.581%
#19531

push

github

ejona86
Update README etc to reference 1.68.1

33931 of 40112 relevant lines covered (84.59%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.3
/../inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.inprocess;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
21
import static java.lang.Math.max;
22

23
import com.google.common.base.MoreObjects;
24
import com.google.common.io.ByteStreams;
25
import com.google.common.util.concurrent.ListenableFuture;
26
import com.google.common.util.concurrent.SettableFuture;
27
import io.grpc.Attributes;
28
import io.grpc.CallOptions;
29
import io.grpc.ClientStreamTracer;
30
import io.grpc.Compressor;
31
import io.grpc.Deadline;
32
import io.grpc.Decompressor;
33
import io.grpc.DecompressorRegistry;
34
import io.grpc.Grpc;
35
import io.grpc.InternalChannelz.SocketStats;
36
import io.grpc.InternalLogId;
37
import io.grpc.InternalMetadata;
38
import io.grpc.KnownLength;
39
import io.grpc.Metadata;
40
import io.grpc.MethodDescriptor;
41
import io.grpc.SecurityLevel;
42
import io.grpc.ServerStreamTracer;
43
import io.grpc.Status;
44
import io.grpc.SynchronizationContext;
45
import io.grpc.internal.ClientStream;
46
import io.grpc.internal.ClientStreamListener;
47
import io.grpc.internal.ClientStreamListener.RpcProgress;
48
import io.grpc.internal.ConnectionClientTransport;
49
import io.grpc.internal.GrpcAttributes;
50
import io.grpc.internal.GrpcUtil;
51
import io.grpc.internal.InUseStateAggregator;
52
import io.grpc.internal.InsightBuilder;
53
import io.grpc.internal.ManagedClientTransport;
54
import io.grpc.internal.NoopClientStream;
55
import io.grpc.internal.ObjectPool;
56
import io.grpc.internal.ServerStream;
57
import io.grpc.internal.ServerStreamListener;
58
import io.grpc.internal.ServerTransport;
59
import io.grpc.internal.ServerTransportListener;
60
import io.grpc.internal.StatsTraceContext;
61
import io.grpc.internal.StreamListener;
62
import java.io.ByteArrayInputStream;
63
import java.io.InputStream;
64
import java.net.SocketAddress;
65
import java.util.ArrayDeque;
66
import java.util.ArrayList;
67
import java.util.Collections;
68
import java.util.IdentityHashMap;
69
import java.util.List;
70
import java.util.Locale;
71
import java.util.Set;
72
import java.util.concurrent.Executor;
73
import java.util.concurrent.ScheduledExecutorService;
74
import java.util.concurrent.TimeUnit;
75
import java.util.logging.Level;
76
import java.util.logging.Logger;
77
import javax.annotation.CheckReturnValue;
78
import javax.annotation.Nullable;
79
import javax.annotation.concurrent.GuardedBy;
80
import javax.annotation.concurrent.ThreadSafe;
81

82
@ThreadSafe
83
final class InProcessTransport implements ServerTransport, ConnectionClientTransport {
84
  private static final Logger log = Logger.getLogger(InProcessTransport.class.getName());
1✔
85
  static boolean isEnabledSupportTracingMessageSizes =
1✔
86
      GrpcUtil.getFlag("GRPC_EXPERIMENTAL_SUPPORT_TRACING_MESSAGE_SIZES", false);
1✔
87

88
  private final InternalLogId logId;
89
  private final SocketAddress address;
90
  private final int clientMaxInboundMetadataSize;
91
  private final String authority;
92
  private final String userAgent;
93
  private int serverMaxInboundMetadataSize;
94
  private final boolean includeCauseWithStatus;
95
  private ObjectPool<ScheduledExecutorService> serverSchedulerPool;
96
  private ScheduledExecutorService serverScheduler;
97
  private ServerTransportListener serverTransportListener;
98
  private Attributes serverStreamAttributes;
99
  private ManagedClientTransport.Listener clientTransportListener;
100
  // The size is assumed from the sender's side.
101
  private final long assumedMessageSize;
102
  @GuardedBy("this")
103
  private boolean shutdown;
104
  @GuardedBy("this")
105
  private boolean terminated;
106
  @GuardedBy("this")
107
  private Status shutdownStatus;
108
  @GuardedBy("this")
1✔
109
  private final Set<InProcessStream> streams = Collections.newSetFromMap(
1✔
110
          new IdentityHashMap<InProcessStream, Boolean>());
111
  @GuardedBy("this")
112
  private List<ServerStreamTracer.Factory> serverStreamTracerFactories;
113
  private Attributes attributes;
114

115
  private Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
1✔
116
      new Thread.UncaughtExceptionHandler() {
1✔
117
        @Override
118
        public void uncaughtException(Thread t, Throwable e) {
119
          if (e instanceof Error) {
×
120
            throw new Error(e);
×
121
          }
122
          throw new RuntimeException(e);
×
123
        }
124
      };
125

126

127
  @GuardedBy("this")
1✔
128
  private final InUseStateAggregator<InProcessStream> inUseState =
129
      new InUseStateAggregator<InProcessStream>() {
1✔
130
        @Override
131
        protected void handleInUse() {
132
          clientTransportListener.transportInUse(true);
1✔
133
        }
1✔
134

135
        @Override
136
        protected void handleNotInUse() {
137
          clientTransportListener.transportInUse(false);
1✔
138
        }
1✔
139
      };
140

141
  public InProcessTransport(SocketAddress address, int maxInboundMetadataSize, String authority,
142
      String userAgent, Attributes eagAttrs,
143
      boolean includeCauseWithStatus, long assumedMessageSize) {
1✔
144
    this.address = address;
1✔
145
    this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
1✔
146
    this.authority = authority;
1✔
147
    this.userAgent = GrpcUtil.getGrpcUserAgent("inprocess", userAgent);
1✔
148
    checkNotNull(eagAttrs, "eagAttrs");
1✔
149
    this.attributes = Attributes.newBuilder()
1✔
150
        .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
151
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs)
1✔
152
        .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
153
        .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
154
        .build();
1✔
155
    logId = InternalLogId.allocate(getClass(), address.toString());
1✔
156
    this.includeCauseWithStatus = includeCauseWithStatus;
1✔
157
    this.assumedMessageSize = assumedMessageSize;
1✔
158
  }
1✔
159

160
  @CheckReturnValue
161
  @Override
162
  public synchronized Runnable start(ManagedClientTransport.Listener listener) {
163
    this.clientTransportListener = listener;
1✔
164
    InProcessServer server = InProcessServer.findServer(address);
1✔
165
    if (server != null) {
1✔
166
      serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize();
1✔
167
      serverSchedulerPool = server.getScheduledExecutorServicePool();
1✔
168
      serverScheduler = serverSchedulerPool.getObject();
1✔
169
      serverStreamTracerFactories = server.getStreamTracerFactories();
1✔
170
      // Must be semi-initialized; past this point, can begin receiving requests
171
      serverTransportListener = server.register(this);
1✔
172
    }
173
    if (serverTransportListener == null) {
1✔
174
      shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + address);
1✔
175
      final Status localShutdownStatus = shutdownStatus;
1✔
176
      return new Runnable() {
1✔
177
        @Override
178
        public void run() {
179
          synchronized (InProcessTransport.this) {
1✔
180
            notifyShutdown(localShutdownStatus);
1✔
181
            notifyTerminated();
1✔
182
          }
1✔
183
        }
1✔
184
      };
185
    }
186
    Attributes serverTransportAttrs = Attributes.newBuilder()
1✔
187
        .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
188
        .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
189
        .build();
1✔
190
    serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
1✔
191
    attributes = clientTransportListener.filterTransport(attributes);
1✔
192
    clientTransportListener.transportReady();
1✔
193
    return null;
1✔
194
  }
195

196
  @Override
197
  public synchronized ClientStream newStream(
198
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
199
      ClientStreamTracer[] tracers) {
200
    StatsTraceContext statsTraceContext =
1✔
201
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
202
    if (shutdownStatus != null) {
1✔
203
      return failedClientStream(statsTraceContext, shutdownStatus);
1✔
204
    }
205

206
    headers.put(GrpcUtil.USER_AGENT_KEY, userAgent);
1✔
207

208
    if (serverMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
209
      int metadataSize = metadataSize(headers);
1✔
210
      if (metadataSize > serverMaxInboundMetadataSize) {
1✔
211
        // Other transports would compute a status with:
212
        //   GrpcUtil.httpStatusToGrpcStatus(431 /* Request Header Fields Too Large */);
213
        // However, that isn't handled specially today, so we'd leak HTTP-isms even though we're
214
        // in-process. We go ahead and make a Status, which may need to be updated if
215
        // statuscodes.md is updated.
216
        Status status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
217
            String.format(
1✔
218
                Locale.US,
219
                "Request metadata larger than %d: %d",
220
                serverMaxInboundMetadataSize,
1✔
221
                metadataSize));
1✔
222
        return failedClientStream(statsTraceContext, status);
1✔
223
      }
224
    }
225

226
    return new InProcessStream(method, headers, callOptions, authority, statsTraceContext)
1✔
227
        .clientStream;
1✔
228
  }
229

230
  private ClientStream failedClientStream(
231
      final StatsTraceContext statsTraceCtx, final Status status) {
232
    return new NoopClientStream() {
1✔
233
        @Override
234
        public void start(ClientStreamListener listener) {
235
          statsTraceCtx.clientOutboundHeaders();
1✔
236
          statsTraceCtx.streamClosed(status);
1✔
237
          listener.closed(status, RpcProgress.PROCESSED, new Metadata());
1✔
238
        }
1✔
239
      };
240
  }
241

242
  @Override
243
  public synchronized void ping(final PingCallback callback, Executor executor) {
244
    if (terminated) {
1✔
245
      final Status shutdownStatus = this.shutdownStatus;
1✔
246
      executor.execute(new Runnable() {
1✔
247
        @Override
248
        public void run() {
249
          callback.onFailure(shutdownStatus.asRuntimeException());
1✔
250
        }
1✔
251
      });
252
    } else {
1✔
253
      executor.execute(new Runnable() {
1✔
254
        @Override
255
        public void run() {
256
          callback.onSuccess(0);
1✔
257
        }
1✔
258
      });
259
    }
260
  }
1✔
261

262
  @Override
263
  public synchronized void shutdown(Status reason) {
264
    // Can be called multiple times: once for ManagedClientTransport, once for ServerTransport.
265
    if (shutdown) {
1✔
266
      return;
1✔
267
    }
268
    shutdownStatus = reason;
1✔
269
    notifyShutdown(reason);
1✔
270
    if (streams.isEmpty()) {
1✔
271
      notifyTerminated();
1✔
272
    }
273
  }
1✔
274

275
  @Override
276
  public synchronized void shutdown() {
277
    shutdown(Status.UNAVAILABLE.withDescription("InProcessTransport shutdown by the server-side"));
1✔
278
  }
1✔
279

280
  @Override
281
  public void shutdownNow(Status reason) {
282
    checkNotNull(reason, "reason");
1✔
283
    List<InProcessStream> streamsCopy;
284
    synchronized (this) {
1✔
285
      shutdown(reason);
1✔
286
      if (terminated) {
1✔
287
        return;
1✔
288
      }
289
      streamsCopy = new ArrayList<>(streams);
1✔
290
    }
1✔
291
    for (InProcessStream stream : streamsCopy) {
1✔
292
      stream.clientStream.cancel(reason);
1✔
293
    }
1✔
294
  }
1✔
295

296
  @Override
297
  public String toString() {
298
    return MoreObjects.toStringHelper(this)
×
299
        .add("logId", logId.getId())
×
300
        .add("address", address)
×
301
        .toString();
×
302
  }
303

304
  @Override
305
  public InternalLogId getLogId() {
306
    return logId;
1✔
307
  }
308

309
  @Override
310
  public Attributes getAttributes() {
311
    return attributes;
1✔
312
  }
313

314
  @Override
315
  public ScheduledExecutorService getScheduledExecutorService() {
316
    return serverScheduler;
1✔
317
  }
318

319
  @Override
320
  public ListenableFuture<SocketStats> getStats() {
321
    SettableFuture<SocketStats> ret = SettableFuture.create();
×
322
    ret.set(null);
×
323
    return ret;
×
324
  }
325

326
  private synchronized void notifyShutdown(Status s) {
327
    if (shutdown) {
1✔
328
      return;
×
329
    }
330
    shutdown = true;
1✔
331
    clientTransportListener.transportShutdown(s);
1✔
332
  }
1✔
333

334
  private synchronized void notifyTerminated() {
335
    if (terminated) {
1✔
336
      return;
×
337
    }
338
    terminated = true;
1✔
339
    if (serverScheduler != null) {
1✔
340
      serverScheduler = serverSchedulerPool.returnObject(serverScheduler);
1✔
341
    }
342
    clientTransportListener.transportTerminated();
1✔
343
    if (serverTransportListener != null) {
1✔
344
      serverTransportListener.transportTerminated();
1✔
345
    }
346
  }
1✔
347

348
  private static int metadataSize(Metadata metadata) {
349
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
350
    if (serialized == null) {
1✔
351
      return 0;
×
352
    }
353
    // Calculate based on SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2. We could use something
354
    // different, but it's "sane."
355
    long size = 0;
1✔
356
    for (int i = 0; i < serialized.length; i += 2) {
1✔
357
      size += 32 + serialized[i].length + serialized[i + 1].length;
1✔
358
    }
359
    size = Math.min(size, Integer.MAX_VALUE);
1✔
360
    return (int) size;
1✔
361
  }
362

363
  private class InProcessStream {
364
    private final InProcessClientStream clientStream;
365
    private final InProcessServerStream serverStream;
366
    private final CallOptions callOptions;
367
    private final Metadata headers;
368
    private final MethodDescriptor<?, ?> method;
369
    private volatile String authority;
370

371
    private InProcessStream(
372
        MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
373
        String authority , StatsTraceContext statsTraceContext) {
1✔
374
      this.method = checkNotNull(method, "method");
1✔
375
      this.headers = checkNotNull(headers, "headers");
1✔
376
      this.callOptions = checkNotNull(callOptions, "callOptions");
1✔
377
      this.authority = authority;
1✔
378
      this.clientStream = new InProcessClientStream(callOptions, statsTraceContext);
1✔
379
      this.serverStream = new InProcessServerStream(method, headers);
1✔
380
    }
1✔
381

382
    // Can be called multiple times due to races on both client and server closing at same time.
383
    private void streamClosed() {
384
      synchronized (InProcessTransport.this) {
1✔
385
        boolean justRemovedAnElement = streams.remove(this);
1✔
386
        if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
387
          inUseState.updateObjectInUse(this, false);
1✔
388
        }
389
        if (streams.isEmpty() && justRemovedAnElement) {
1✔
390
          if (shutdown) {
1✔
391
            notifyTerminated();
1✔
392
          }
393
        }
394
      }
1✔
395
    }
1✔
396

397
    private class InProcessServerStream implements ServerStream {
398
      final StatsTraceContext statsTraceCtx;
399
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
400
      private ClientStreamListener clientStreamListener;
401
      private final SynchronizationContext syncContext =
1✔
402
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
403
      @GuardedBy("this")
404
      private int clientRequested;
405
      @GuardedBy("this")
1✔
406
      private ArrayDeque<StreamListener.MessageProducer> clientReceiveQueue =
407
          new ArrayDeque<>();
408
      @GuardedBy("this")
409
      private Status clientNotifyStatus;
410
      @GuardedBy("this")
411
      private Metadata clientNotifyTrailers;
412
      // Only is intended to prevent double-close when client cancels.
413
      @GuardedBy("this")
414
      private boolean closed;
415
      @GuardedBy("this")
416
      private int outboundSeqNo;
417

418
      InProcessServerStream(MethodDescriptor<?, ?> method, Metadata headers) {
1✔
419
        statsTraceCtx = StatsTraceContext.newServerContext(
1✔
420
            serverStreamTracerFactories, method.getFullMethodName(), headers);
1✔
421
      }
1✔
422

423
      private synchronized void setListener(ClientStreamListener listener) {
424
        clientStreamListener = listener;
1✔
425
      }
1✔
426

427
      @Override
428
      public void setListener(ServerStreamListener serverStreamListener) {
429
        clientStream.setListener(serverStreamListener);
1✔
430
      }
1✔
431

432
      @Override
433
      public void request(int numMessages) {
434
        boolean onReady = clientStream.serverRequested(numMessages);
1✔
435
        if (onReady) {
1✔
436
          synchronized (this) {
1✔
437
            if (!closed) {
1✔
438
              syncContext.executeLater(() -> clientStreamListener.onReady());
1✔
439
            }
440
          }
1✔
441
        }
442
        syncContext.drain();
1✔
443
      }
1✔
444

445
      // This method is the only reason we have to synchronize field accesses.
446
      /**
447
       * Client requested more messages.
448
       *
449
       * @return whether onReady should be called on the server
450
       */
451
      private boolean clientRequested(int numMessages) {
452
        boolean previouslyReady;
453
        boolean nowReady;
454
        synchronized (this) {
1✔
455
          if (closed) {
1✔
456
            return false;
1✔
457
          }
458

459
          previouslyReady = clientRequested > 0;
1✔
460
          clientRequested += numMessages;
1✔
461
          while (clientRequested > 0 && !clientReceiveQueue.isEmpty()) {
1✔
462
            clientRequested--;
1✔
463
            StreamListener.MessageProducer producer = clientReceiveQueue.poll();
1✔
464
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
465
          }
1✔
466

467
          if (clientReceiveQueue.isEmpty() && clientNotifyStatus != null) {
1✔
468
            closed = true;
1✔
469
            clientStream.statsTraceCtx.clientInboundTrailers(clientNotifyTrailers);
1✔
470
            clientStream.statsTraceCtx.streamClosed(clientNotifyStatus);
1✔
471
            Status notifyStatus = this.clientNotifyStatus;
1✔
472
            Metadata notifyTrailers = this.clientNotifyTrailers;
1✔
473
            syncContext.executeLater(() ->
1✔
474
                clientStreamListener.closed(notifyStatus, RpcProgress.PROCESSED, notifyTrailers));
1✔
475
          }
476

477
          nowReady = clientRequested > 0;
1✔
478
        }
1✔
479

480
        syncContext.drain();
1✔
481
        return !previouslyReady && nowReady;
1✔
482
      }
483

484
      private void clientCancelled(Status status) {
485
        internalCancel(status);
1✔
486
      }
1✔
487

488
      @Override
489
      public void writeMessage(InputStream message) {
490
        long messageLength = 0;
1✔
491
        if (isEnabledSupportTracingMessageSizes) {
1✔
492
          try {
493
            if (assumedMessageSize != -1) {
×
494
              messageLength = assumedMessageSize;
×
495
            } else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
×
496
              messageLength = message.available();
×
497
            } else {
498
              InputStream oldMessage = message;
×
499
              byte[] payload = ByteStreams.toByteArray(message);
×
500
              messageLength = payload.length;
×
501
              message = new ByteArrayInputStream(payload);
×
502
              oldMessage.close();
×
503
            }
504
          } catch (Exception e) {
×
505
            throw new RuntimeException("Error processing the message length", e);
×
506
          }
×
507
        }
508

509
        synchronized (this) {
1✔
510
          if (closed) {
1✔
511
            return;
1✔
512
          }
513
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
514
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
515
          clientStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
516
          clientStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
517
          if (isEnabledSupportTracingMessageSizes) {
1✔
518
            statsTraceCtx.outboundUncompressedSize(messageLength);
×
519
            statsTraceCtx.outboundWireSize(messageLength);
×
520
            // messageLength should be same at receiver's end as no actual wire is involved.
521
            clientStream.statsTraceCtx.inboundUncompressedSize(messageLength);
×
522
            clientStream.statsTraceCtx.inboundWireSize(messageLength);
×
523
          }
524
          outboundSeqNo++;
1✔
525
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
526
          if (clientRequested > 0) {
1✔
527
            clientRequested--;
1✔
528
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
529
          } else {
530
            clientReceiveQueue.add(producer);
1✔
531
          }
532
        }
1✔
533
        syncContext.drain();
1✔
534
      }
1✔
535

536
      @Override
537
      public void flush() {}
1✔
538

539
      @Override
540
      public synchronized boolean isReady() {
541
        if (closed) {
1✔
542
          return false;
×
543
        }
544
        return clientRequested > 0;
1✔
545
      }
546

547
      @Override
548
      public void writeHeaders(Metadata headers, boolean flush) {
549
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
550
          int metadataSize = metadataSize(headers);
1✔
551
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
552
            Status serverStatus = Status.CANCELLED.withDescription("Client cancelled the RPC");
1✔
553
            clientStream.serverClosed(serverStatus, serverStatus);
1✔
554
            // Other transports provide very little information in this case. We go ahead and make a
555
            // Status, which may need to be updated if statuscodes.md is updated.
556
            Status failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
557
                String.format(
1✔
558
                    Locale.US,
559
                    "Response header metadata larger than %d: %d",
560
                    clientMaxInboundMetadataSize,
1✔
561
                    metadataSize));
1✔
562
            notifyClientClose(failedStatus, new Metadata());
1✔
563
            return;
1✔
564
          }
565
        }
566

567
        synchronized (this) {
1✔
568
          if (closed) {
1✔
569
            return;
1✔
570
          }
571

572
          clientStream.statsTraceCtx.clientInboundHeaders(headers);
1✔
573
          syncContext.executeLater(() -> clientStreamListener.headersRead(headers));
1✔
574
        }
1✔
575
        syncContext.drain();
1✔
576
      }
1✔
577

578
      @Override
579
      public void close(Status status, Metadata trailers) {
580
        // clientStream.serverClosed must happen before clientStreamListener.closed, otherwise
581
        // clientStreamListener.closed can trigger clientStream.cancel (see code in
582
        // ClientCalls.blockingUnaryCall), which may race with clientStream.serverClosed as both are
583
        // calling internalCancel().
584
        clientStream.serverClosed(Status.OK, status);
1✔
585

586
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
587
          int statusSize = status.getDescription() == null ? 0 : status.getDescription().length();
1✔
588
          // Go ahead and throw in the status description's length, since that could be very long.
589
          int metadataSize = metadataSize(trailers) + statusSize;
1✔
590
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
591
            // Override the status for the client, but not the server. Transports do not guarantee
592
            // notifying the server of the failure.
593

594
            // Other transports provide very little information in this case. We go ahead and make a
595
            // Status, which may need to be updated if statuscodes.md is updated.
596
            status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
597
                String.format(
1✔
598
                    Locale.US,
599
                    "Response header metadata larger than %d: %d",
600
                    clientMaxInboundMetadataSize,
1✔
601
                    metadataSize));
1✔
602
            trailers = new Metadata();
1✔
603
          }
604
        }
605

606
        notifyClientClose(status, trailers);
1✔
607
      }
1✔
608

609
      /** clientStream.serverClosed() must be called before this method */
610
      private void notifyClientClose(Status status, Metadata trailers) {
611
        Status clientStatus = cleanStatus(status, includeCauseWithStatus);
1✔
612
        synchronized (this) {
1✔
613
          if (closed) {
1✔
614
            return;
1✔
615
          }
616
          if (clientReceiveQueue.isEmpty()) {
1✔
617
            closed = true;
1✔
618
            clientStream.statsTraceCtx.clientInboundTrailers(trailers);
1✔
619
            clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
620
            syncContext.executeLater(
1✔
621
                () -> clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, trailers));
1✔
622
          } else {
623
            clientNotifyStatus = clientStatus;
1✔
624
            clientNotifyTrailers = trailers;
1✔
625
          }
626
        }
1✔
627
        syncContext.drain();
1✔
628
        streamClosed();
1✔
629
      }
1✔
630

631
      @Override
632
      public void cancel(Status status) {
633
        if (!internalCancel(Status.CANCELLED.withDescription("server cancelled stream"))) {
1✔
634
          return;
1✔
635
        }
636
        clientStream.serverClosed(status, status);
1✔
637
        streamClosed();
1✔
638
      }
1✔
639

640
      private boolean internalCancel(Status clientStatus) {
641
        synchronized (this) {
1✔
642
          if (closed) {
1✔
643
            return false;
1✔
644
          }
645
          closed = true;
1✔
646
          StreamListener.MessageProducer producer;
647
          while ((producer = clientReceiveQueue.poll()) != null) {
1✔
648
            InputStream message;
649
            while ((message = producer.next()) != null) {
×
650
              try {
651
                message.close();
×
652
              } catch (Throwable t) {
×
653
                log.log(Level.WARNING, "Exception closing stream", t);
×
654
              }
×
655
            }
656
          }
×
657
          clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
658
          syncContext.executeLater(
1✔
659
              () ->
660
                  clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, new Metadata()));
1✔
661
        }
1✔
662
        syncContext.drain();
1✔
663
        return true;
1✔
664
      }
665

666
      @Override
667
      public void setMessageCompression(boolean enable) {
668
        // noop
669
      }
×
670

671
      @Override
672
      public void optimizeForDirectExecutor() {}
1✔
673

674
      @Override
675
      public void setCompressor(Compressor compressor) {}
1✔
676

677
      @Override
678
      public void setDecompressor(Decompressor decompressor) {}
×
679

680
      @Override public Attributes getAttributes() {
681
        return serverStreamAttributes;
1✔
682
      }
683

684
      @Override
685
      public String getAuthority() {
686
        return InProcessStream.this.authority;
1✔
687
      }
688

689
      @Override
690
      public StatsTraceContext statsTraceContext() {
691
        return statsTraceCtx;
1✔
692
      }
693

694
      @Override
695
      public int streamId() {
696
        return -1;
1✔
697
      }
698

699
      @Override
700
      public void setOnReadyThreshold(int numBytes) {
701
        // noop
702
      }
×
703
    }
704

705
    private class InProcessClientStream implements ClientStream {
706
      final StatsTraceContext statsTraceCtx;
707
      final CallOptions callOptions;
708
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
709
      private ServerStreamListener serverStreamListener;
710
      private final SynchronizationContext syncContext =
1✔
711
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
712
      @GuardedBy("this")
713
      private int serverRequested;
714
      @GuardedBy("this")
1✔
715
      private ArrayDeque<StreamListener.MessageProducer> serverReceiveQueue =
716
          new ArrayDeque<>();
717
      @GuardedBy("this")
718
      private boolean serverNotifyHalfClose;
719
      // Only is intended to prevent double-close when server closes.
720
      @GuardedBy("this")
721
      private boolean closed;
722
      @GuardedBy("this")
723
      private int outboundSeqNo;
724

725
      InProcessClientStream(
726
          CallOptions callOptions, StatsTraceContext statsTraceContext) {
1✔
727
        this.callOptions = callOptions;
1✔
728
        statsTraceCtx = statsTraceContext;
1✔
729
      }
1✔
730

731
      private synchronized void setListener(ServerStreamListener listener) {
732
        this.serverStreamListener = listener;
1✔
733
      }
1✔
734

735
      @Override
736
      public void request(int numMessages) {
737
        boolean onReady = serverStream.clientRequested(numMessages);
1✔
738
        if (onReady) {
1✔
739
          synchronized (this) {
1✔
740
            if (!closed) {
1✔
741
              syncContext.executeLater(() -> serverStreamListener.onReady());
1✔
742
            }
743
          }
1✔
744
          syncContext.drain();
1✔
745
        }
746
      }
1✔
747

748
      // This method is the only reason we have to synchronize field accesses.
749
      /**
750
       * Client requested more messages.
751
       *
752
       * @return whether onReady should be called on the server
753
       */
754
      private boolean serverRequested(int numMessages) {
755
        boolean previouslyReady;
756
        boolean nowReady;
757
        synchronized (this) {
1✔
758
          if (closed) {
1✔
759
            return false;
1✔
760
          }
761
          previouslyReady = serverRequested > 0;
1✔
762
          serverRequested += numMessages;
1✔
763

764
          while (serverRequested > 0 && !serverReceiveQueue.isEmpty()) {
1✔
765
            serverRequested--;
1✔
766
            StreamListener.MessageProducer producer = serverReceiveQueue.poll();
1✔
767
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
768
          }
1✔
769

770
          if (serverReceiveQueue.isEmpty() && serverNotifyHalfClose) {
1✔
771
            serverNotifyHalfClose = false;
1✔
772
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
773
          }
774
          nowReady = serverRequested > 0;
1✔
775
        }
1✔
776
        syncContext.drain();
1✔
777
        return !previouslyReady && nowReady;
1✔
778
      }
779

780
      private void serverClosed(Status serverListenerStatus, Status serverTracerStatus) {
781
        internalCancel(serverListenerStatus, serverTracerStatus);
1✔
782
      }
1✔
783

784
      @Override
785
      public void writeMessage(InputStream message) {
786
        long messageLength = 0;
1✔
787
        if (isEnabledSupportTracingMessageSizes) {
1✔
788
          try {
789
            if (assumedMessageSize != -1) {
×
790
              messageLength = assumedMessageSize;
×
791
            } else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
×
792
              messageLength = message.available();
×
793
            } else {
794
              InputStream oldMessage = message;
×
795
              byte[] payload = ByteStreams.toByteArray(message);
×
796
              messageLength = payload.length;
×
797
              message = new ByteArrayInputStream(payload);
×
798
              oldMessage.close();
×
799
            }
800
          } catch (Exception e) {
×
801
            throw new RuntimeException("Error processing the message length", e);
×
802
          }
×
803
        }
804
        synchronized (this) {
1✔
805
          if (closed) {
1✔
806
            return;
1✔
807
          }
808
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
809
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
810
          serverStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
811
          serverStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
812
          if (isEnabledSupportTracingMessageSizes) {
1✔
813
            statsTraceCtx.outboundUncompressedSize(messageLength);
×
814
            statsTraceCtx.outboundWireSize(messageLength);
×
815
            // messageLength should be same at receiver's end as no actual wire is involved.
816
            serverStream.statsTraceCtx.inboundUncompressedSize(messageLength);
×
817
            serverStream.statsTraceCtx.inboundWireSize(messageLength);
×
818
          }
819
          outboundSeqNo++;
1✔
820
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
821
          if (serverRequested > 0) {
1✔
822
            serverRequested--;
1✔
823
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
824
          } else {
825
            serverReceiveQueue.add(producer);
1✔
826
          }
827
        }
1✔
828
        syncContext.drain();
1✔
829
      }
1✔
830

831
      @Override
832
      public void flush() {}
1✔
833

834
      @Override
835
      public synchronized boolean isReady() {
836
        if (closed) {
1✔
837
          return false;
×
838
        }
839
        return serverRequested > 0;
1✔
840
      }
841

842
      // Must be thread-safe for shutdownNow()
843
      @Override
844
      public void cancel(Status reason) {
845
        Status serverStatus = cleanStatus(reason, includeCauseWithStatus);
1✔
846
        if (!internalCancel(serverStatus, serverStatus)) {
1✔
847
          return;
1✔
848
        }
849
        serverStream.clientCancelled(reason);
1✔
850
        streamClosed();
1✔
851
      }
1✔
852

853
      private boolean internalCancel(
854
          Status serverListenerStatus, Status serverTracerStatus) {
855
        synchronized (this) {
1✔
856
          if (closed) {
1✔
857
            return false;
1✔
858
          }
859
          closed = true;
1✔
860

861
          StreamListener.MessageProducer producer;
862
          while ((producer = serverReceiveQueue.poll()) != null) {
1✔
863
            InputStream message;
864
            while ((message = producer.next()) != null) {
1✔
865
              try {
866
                message.close();
1✔
867
              } catch (Throwable t) {
×
868
                log.log(Level.WARNING, "Exception closing stream", t);
×
869
              }
1✔
870
            }
871
          }
1✔
872
          serverStream.statsTraceCtx.streamClosed(serverTracerStatus);
1✔
873
          syncContext.executeLater(() -> serverStreamListener.closed(serverListenerStatus));
1✔
874
        }
1✔
875
        syncContext.drain();
1✔
876
        return true;
1✔
877
      }
878

879
      @Override
880
      public void halfClose() {
881
        synchronized (this) {
1✔
882
          if (closed) {
1✔
883
            return;
1✔
884
          }
885
          if (serverReceiveQueue.isEmpty()) {
1✔
886
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
887
          } else {
888
            serverNotifyHalfClose = true;
1✔
889
          }
890
        }
1✔
891
        syncContext.drain();
1✔
892
      }
1✔
893

894
      @Override
895
      public void setMessageCompression(boolean enable) {}
×
896

897
      @Override
898
      public void setAuthority(String string) {
899
        InProcessStream.this.authority = string;
×
900
      }
×
901

902
      @Override
903
      public void start(ClientStreamListener listener) {
904
        serverStream.setListener(listener);
1✔
905

906
        synchronized (InProcessTransport.this) {
1✔
907
          statsTraceCtx.clientOutboundHeaders();
1✔
908
          streams.add(InProcessTransport.InProcessStream.this);
1✔
909
          if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
910
            inUseState.updateObjectInUse(InProcessTransport.InProcessStream.this, true);
1✔
911
          }
912
          serverTransportListener.streamCreated(serverStream, method.getFullMethodName(), headers);
1✔
913
        }
1✔
914
      }
1✔
915

916
      @Override
917
      public Attributes getAttributes() {
918
        return attributes;
1✔
919
      }
920

921
      @Override
922
      public void optimizeForDirectExecutor() {}
1✔
923

924
      @Override
925
      public void setCompressor(Compressor compressor) {}
1✔
926

927
      @Override
928
      public void setFullStreamDecompression(boolean fullStreamDecompression) {}
×
929

930
      @Override
931
      public void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {}
1✔
932

933
      @Override
934
      public void setMaxInboundMessageSize(int maxSize) {}
1✔
935

936
      @Override
937
      public void setMaxOutboundMessageSize(int maxSize) {}
1✔
938

939
      @Override
940
      public void setDeadline(Deadline deadline) {
941
        headers.discardAll(TIMEOUT_KEY);
1✔
942
        long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
1✔
943
        headers.put(TIMEOUT_KEY, effectiveTimeout);
1✔
944
      }
1✔
945

946
      @Override
947
      public void appendTimeoutInsight(InsightBuilder insight) {
948
      }
1✔
949
    }
950
  }
951

952
  /**
953
   * Returns a new status with the same code and description.
954
   * If includeCauseWithStatus is true, cause is also included.
955
   *
956
   * <p>For InProcess transport to behave in the same way as the other transports,
957
   * when exchanging statuses between client and server and vice versa,
958
   * the cause should be excluded from the status.
959
   * For easier debugging, the status may be optionally included.
960
   */
961
  private static Status cleanStatus(Status status, boolean includeCauseWithStatus) {
962
    if (status == null) {
1✔
963
      return null;
×
964
    }
965
    Status clientStatus = Status
1✔
966
        .fromCodeValue(status.getCode().value())
1✔
967
        .withDescription(status.getDescription());
1✔
968
    if (includeCauseWithStatus) {
1✔
969
      clientStatus = clientStatus.withCause(status.getCause());
1✔
970
    }
971
    return clientStatus;
1✔
972
  }
973

974
  private static class SingleMessageProducer implements StreamListener.MessageProducer {
975
    private InputStream message;
976

977
    private SingleMessageProducer(InputStream message) {
1✔
978
      this.message = message;
1✔
979
    }
1✔
980

981
    @Nullable
982
    @Override
983
    public InputStream next() {
984
      InputStream messageToReturn = message;
1✔
985
      message = null;
1✔
986
      return messageToReturn;
1✔
987
    }
988
  }
989
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc