• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19506

11 Oct 2024 04:58AM UTC coverage: 84.648% (-0.02%) from 84.666%
#19506

push

github

web-flow
inprocess: Support tracing message sizes (#11406)

33810 of 39942 relevant lines covered (84.65%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.75
/../inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.inprocess;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
21
import static java.lang.Math.max;
22

23
import com.google.common.base.MoreObjects;
24
import com.google.common.base.Optional;
25
import com.google.common.io.ByteStreams;
26
import com.google.common.util.concurrent.ListenableFuture;
27
import com.google.common.util.concurrent.SettableFuture;
28
import io.grpc.Attributes;
29
import io.grpc.CallOptions;
30
import io.grpc.ClientStreamTracer;
31
import io.grpc.Compressor;
32
import io.grpc.Deadline;
33
import io.grpc.Decompressor;
34
import io.grpc.DecompressorRegistry;
35
import io.grpc.Grpc;
36
import io.grpc.InternalChannelz.SocketStats;
37
import io.grpc.InternalLogId;
38
import io.grpc.InternalMetadata;
39
import io.grpc.KnownLength;
40
import io.grpc.Metadata;
41
import io.grpc.MethodDescriptor;
42
import io.grpc.SecurityLevel;
43
import io.grpc.ServerStreamTracer;
44
import io.grpc.Status;
45
import io.grpc.SynchronizationContext;
46
import io.grpc.internal.ClientStream;
47
import io.grpc.internal.ClientStreamListener;
48
import io.grpc.internal.ClientStreamListener.RpcProgress;
49
import io.grpc.internal.ConnectionClientTransport;
50
import io.grpc.internal.GrpcAttributes;
51
import io.grpc.internal.GrpcUtil;
52
import io.grpc.internal.InUseStateAggregator;
53
import io.grpc.internal.InsightBuilder;
54
import io.grpc.internal.ManagedClientTransport;
55
import io.grpc.internal.NoopClientStream;
56
import io.grpc.internal.ObjectPool;
57
import io.grpc.internal.ServerListener;
58
import io.grpc.internal.ServerStream;
59
import io.grpc.internal.ServerStreamListener;
60
import io.grpc.internal.ServerTransport;
61
import io.grpc.internal.ServerTransportListener;
62
import io.grpc.internal.StatsTraceContext;
63
import io.grpc.internal.StreamListener;
64
import java.io.ByteArrayInputStream;
65
import java.io.InputStream;
66
import java.net.SocketAddress;
67
import java.util.ArrayDeque;
68
import java.util.ArrayList;
69
import java.util.Collections;
70
import java.util.IdentityHashMap;
71
import java.util.List;
72
import java.util.Locale;
73
import java.util.Set;
74
import java.util.concurrent.Executor;
75
import java.util.concurrent.ScheduledExecutorService;
76
import java.util.concurrent.TimeUnit;
77
import java.util.logging.Level;
78
import java.util.logging.Logger;
79
import javax.annotation.CheckReturnValue;
80
import javax.annotation.Nullable;
81
import javax.annotation.concurrent.GuardedBy;
82
import javax.annotation.concurrent.ThreadSafe;
83

84
@ThreadSafe
85
final class InProcessTransport implements ServerTransport, ConnectionClientTransport {
86
  private static final Logger log = Logger.getLogger(InProcessTransport.class.getName());
1✔
87

88
  private final InternalLogId logId;
89
  private final SocketAddress address;
90
  private final int clientMaxInboundMetadataSize;
91
  private final String authority;
92
  private final String userAgent;
93
  private final Optional<ServerListener> optionalServerListener;
94
  private int serverMaxInboundMetadataSize;
95
  private final boolean includeCauseWithStatus;
96
  private ObjectPool<ScheduledExecutorService> serverSchedulerPool;
97
  private ScheduledExecutorService serverScheduler;
98
  private ServerTransportListener serverTransportListener;
99
  private Attributes serverStreamAttributes;
100
  private ManagedClientTransport.Listener clientTransportListener;
101
  // The size is assumed from the sender's side.
102
  private final long assumedMessageSize;
103
  @GuardedBy("this")
104
  private boolean shutdown;
105
  @GuardedBy("this")
106
  private boolean terminated;
107
  @GuardedBy("this")
108
  private Status shutdownStatus;
109
  @GuardedBy("this")
1✔
110
  private final Set<InProcessStream> streams = Collections.newSetFromMap(
1✔
111
          new IdentityHashMap<InProcessStream, Boolean>());
112
  @GuardedBy("this")
113
  private List<ServerStreamTracer.Factory> serverStreamTracerFactories;
114
  private Attributes attributes;
115

116
  private Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
1✔
117
      new Thread.UncaughtExceptionHandler() {
1✔
118
        @Override
119
        public void uncaughtException(Thread t, Throwable e) {
120
          if (e instanceof Error) {
×
121
            throw new Error(e);
×
122
          }
123
          throw new RuntimeException(e);
×
124
        }
125
      };
126

127

128
  @GuardedBy("this")
1✔
129
  private final InUseStateAggregator<InProcessStream> inUseState =
130
      new InUseStateAggregator<InProcessStream>() {
1✔
131
        @Override
132
        protected void handleInUse() {
133
          clientTransportListener.transportInUse(true);
1✔
134
        }
1✔
135

136
        @Override
137
        protected void handleNotInUse() {
138
          clientTransportListener.transportInUse(false);
1✔
139
        }
1✔
140
      };
141

142
  private InProcessTransport(SocketAddress address, int maxInboundMetadataSize, String authority,
143
      String userAgent, Attributes eagAttrs, Optional<ServerListener> optionalServerListener,
144
      boolean includeCauseWithStatus, long assumedMessageSize) {
1✔
145
    this.address = address;
1✔
146
    this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
1✔
147
    this.authority = authority;
1✔
148
    this.userAgent = GrpcUtil.getGrpcUserAgent("inprocess", userAgent);
1✔
149
    checkNotNull(eagAttrs, "eagAttrs");
1✔
150
    this.attributes = Attributes.newBuilder()
1✔
151
        .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
152
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs)
1✔
153
        .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
154
        .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
155
        .build();
1✔
156
    this.optionalServerListener = optionalServerListener;
1✔
157
    logId = InternalLogId.allocate(getClass(), address.toString());
1✔
158
    this.includeCauseWithStatus = includeCauseWithStatus;
1✔
159
    this.assumedMessageSize = assumedMessageSize;
1✔
160
  }
1✔
161

162
  public InProcessTransport(
163
      SocketAddress address, int maxInboundMetadataSize, String authority, String userAgent,
164
      Attributes eagAttrs, boolean includeCauseWithStatus, long assumedMessageSize) {
165
    this(address, maxInboundMetadataSize, authority, userAgent, eagAttrs,
1✔
166
        Optional.<ServerListener>absent(), includeCauseWithStatus, assumedMessageSize);
1✔
167
  }
1✔
168

169
  InProcessTransport(
170
      String name, int maxInboundMetadataSize, String authority, String userAgent,
171
      Attributes eagAttrs, ObjectPool<ScheduledExecutorService> serverSchedulerPool,
172
      List<ServerStreamTracer.Factory> serverStreamTracerFactories,
173
      ServerListener serverListener, boolean includeCauseWithStatus, long assumedMessageSize) {
174
    this(new InProcessSocketAddress(name), maxInboundMetadataSize, authority, userAgent, eagAttrs,
1✔
175
        Optional.of(serverListener), includeCauseWithStatus, assumedMessageSize);
1✔
176
    this.serverMaxInboundMetadataSize = maxInboundMetadataSize;
1✔
177
    this.serverSchedulerPool = serverSchedulerPool;
1✔
178
    this.serverStreamTracerFactories = serverStreamTracerFactories;
1✔
179
  }
1✔
180

181
  @CheckReturnValue
182
  @Override
183
  public synchronized Runnable start(ManagedClientTransport.Listener listener) {
184
    this.clientTransportListener = listener;
1✔
185
    if (optionalServerListener.isPresent()) {
1✔
186
      serverScheduler = serverSchedulerPool.getObject();
1✔
187
      serverTransportListener = optionalServerListener.get().transportCreated(this);
1✔
188
    } else {
189
      InProcessServer server = InProcessServer.findServer(address);
1✔
190
      if (server != null) {
1✔
191
        serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize();
1✔
192
        serverSchedulerPool = server.getScheduledExecutorServicePool();
1✔
193
        serverScheduler = serverSchedulerPool.getObject();
1✔
194
        serverStreamTracerFactories = server.getStreamTracerFactories();
1✔
195
        // Must be semi-initialized; past this point, can begin receiving requests
196
        serverTransportListener = server.register(this);
1✔
197
      }
198
    }
199
    if (serverTransportListener == null) {
1✔
200
      shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + address);
1✔
201
      final Status localShutdownStatus = shutdownStatus;
1✔
202
      return new Runnable() {
1✔
203
        @Override
204
        public void run() {
205
          synchronized (InProcessTransport.this) {
1✔
206
            notifyShutdown(localShutdownStatus);
1✔
207
            notifyTerminated();
1✔
208
          }
1✔
209
        }
1✔
210
      };
211
    }
212
    Attributes serverTransportAttrs = Attributes.newBuilder()
1✔
213
        .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
214
        .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
215
        .build();
1✔
216
    serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
1✔
217
    attributes = clientTransportListener.filterTransport(attributes);
1✔
218
    clientTransportListener.transportReady();
1✔
219
    return null;
1✔
220
  }
221

222
  @Override
223
  public synchronized ClientStream newStream(
224
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
225
      ClientStreamTracer[] tracers) {
226
    StatsTraceContext statsTraceContext =
1✔
227
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
228
    if (shutdownStatus != null) {
1✔
229
      return failedClientStream(statsTraceContext, shutdownStatus);
1✔
230
    }
231

232
    headers.put(GrpcUtil.USER_AGENT_KEY, userAgent);
1✔
233

234
    if (serverMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
235
      int metadataSize = metadataSize(headers);
1✔
236
      if (metadataSize > serverMaxInboundMetadataSize) {
1✔
237
        // Other transports would compute a status with:
238
        //   GrpcUtil.httpStatusToGrpcStatus(431 /* Request Header Fields Too Large */);
239
        // However, that isn't handled specially today, so we'd leak HTTP-isms even though we're
240
        // in-process. We go ahead and make a Status, which may need to be updated if
241
        // statuscodes.md is updated.
242
        Status status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
243
            String.format(
1✔
244
                Locale.US,
245
                "Request metadata larger than %d: %d",
246
                serverMaxInboundMetadataSize,
1✔
247
                metadataSize));
1✔
248
        return failedClientStream(statsTraceContext, status);
1✔
249
      }
250
    }
251

252
    return new InProcessStream(method, headers, callOptions, authority, statsTraceContext)
1✔
253
        .clientStream;
1✔
254
  }
255

256
  private ClientStream failedClientStream(
257
      final StatsTraceContext statsTraceCtx, final Status status) {
258
    return new NoopClientStream() {
1✔
259
        @Override
260
        public void start(ClientStreamListener listener) {
261
          statsTraceCtx.clientOutboundHeaders();
1✔
262
          statsTraceCtx.streamClosed(status);
1✔
263
          listener.closed(status, RpcProgress.PROCESSED, new Metadata());
1✔
264
        }
1✔
265
      };
266
  }
267

268
  @Override
269
  public synchronized void ping(final PingCallback callback, Executor executor) {
270
    if (terminated) {
1✔
271
      final Status shutdownStatus = this.shutdownStatus;
1✔
272
      executor.execute(new Runnable() {
1✔
273
        @Override
274
        public void run() {
275
          callback.onFailure(shutdownStatus.asRuntimeException());
1✔
276
        }
1✔
277
      });
278
    } else {
1✔
279
      executor.execute(new Runnable() {
1✔
280
        @Override
281
        public void run() {
282
          callback.onSuccess(0);
1✔
283
        }
1✔
284
      });
285
    }
286
  }
1✔
287

288
  @Override
289
  public synchronized void shutdown(Status reason) {
290
    // Can be called multiple times: once for ManagedClientTransport, once for ServerTransport.
291
    if (shutdown) {
1✔
292
      return;
1✔
293
    }
294
    shutdownStatus = reason;
1✔
295
    notifyShutdown(reason);
1✔
296
    if (streams.isEmpty()) {
1✔
297
      notifyTerminated();
1✔
298
    }
299
  }
1✔
300

301
  @Override
302
  public synchronized void shutdown() {
303
    shutdown(Status.UNAVAILABLE.withDescription("InProcessTransport shutdown by the server-side"));
1✔
304
  }
1✔
305

306
  @Override
307
  public void shutdownNow(Status reason) {
308
    checkNotNull(reason, "reason");
1✔
309
    List<InProcessStream> streamsCopy;
310
    synchronized (this) {
1✔
311
      shutdown(reason);
1✔
312
      if (terminated) {
1✔
313
        return;
1✔
314
      }
315
      streamsCopy = new ArrayList<>(streams);
1✔
316
    }
1✔
317
    for (InProcessStream stream : streamsCopy) {
1✔
318
      stream.clientStream.cancel(reason);
1✔
319
    }
1✔
320
  }
1✔
321

322
  @Override
323
  public String toString() {
324
    return MoreObjects.toStringHelper(this)
×
325
        .add("logId", logId.getId())
×
326
        .add("address", address)
×
327
        .toString();
×
328
  }
329

330
  @Override
331
  public InternalLogId getLogId() {
332
    return logId;
1✔
333
  }
334

335
  @Override
336
  public Attributes getAttributes() {
337
    return attributes;
1✔
338
  }
339

340
  @Override
341
  public ScheduledExecutorService getScheduledExecutorService() {
342
    return serverScheduler;
1✔
343
  }
344

345
  @Override
346
  public ListenableFuture<SocketStats> getStats() {
347
    SettableFuture<SocketStats> ret = SettableFuture.create();
×
348
    ret.set(null);
×
349
    return ret;
×
350
  }
351

352
  private synchronized void notifyShutdown(Status s) {
353
    if (shutdown) {
1✔
354
      return;
×
355
    }
356
    shutdown = true;
1✔
357
    clientTransportListener.transportShutdown(s);
1✔
358
  }
1✔
359

360
  private synchronized void notifyTerminated() {
361
    if (terminated) {
1✔
362
      return;
1✔
363
    }
364
    terminated = true;
1✔
365
    if (serverScheduler != null) {
1✔
366
      serverScheduler = serverSchedulerPool.returnObject(serverScheduler);
1✔
367
    }
368
    clientTransportListener.transportTerminated();
1✔
369
    if (serverTransportListener != null) {
1✔
370
      serverTransportListener.transportTerminated();
1✔
371
    }
372
  }
1✔
373

374
  private static int metadataSize(Metadata metadata) {
375
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
376
    if (serialized == null) {
1✔
377
      return 0;
×
378
    }
379
    // Calculate based on SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2. We could use something
380
    // different, but it's "sane."
381
    long size = 0;
1✔
382
    for (int i = 0; i < serialized.length; i += 2) {
1✔
383
      size += 32 + serialized[i].length + serialized[i + 1].length;
1✔
384
    }
385
    size = Math.min(size, Integer.MAX_VALUE);
1✔
386
    return (int) size;
1✔
387
  }
388

389
  private class InProcessStream {
390
    private final InProcessClientStream clientStream;
391
    private final InProcessServerStream serverStream;
392
    private final CallOptions callOptions;
393
    private final Metadata headers;
394
    private final MethodDescriptor<?, ?> method;
395
    private volatile String authority;
396

397
    private InProcessStream(
398
        MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
399
        String authority , StatsTraceContext statsTraceContext) {
1✔
400
      this.method = checkNotNull(method, "method");
1✔
401
      this.headers = checkNotNull(headers, "headers");
1✔
402
      this.callOptions = checkNotNull(callOptions, "callOptions");
1✔
403
      this.authority = authority;
1✔
404
      this.clientStream = new InProcessClientStream(callOptions, statsTraceContext);
1✔
405
      this.serverStream = new InProcessServerStream(method, headers);
1✔
406
    }
1✔
407

408
    // Can be called multiple times due to races on both client and server closing at same time.
409
    private void streamClosed() {
410
      synchronized (InProcessTransport.this) {
1✔
411
        boolean justRemovedAnElement = streams.remove(this);
1✔
412
        if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
413
          inUseState.updateObjectInUse(this, false);
1✔
414
        }
415
        if (streams.isEmpty() && justRemovedAnElement) {
1✔
416
          if (shutdown) {
1✔
417
            notifyTerminated();
1✔
418
          }
419
        }
420
      }
1✔
421
    }
1✔
422

423
    private class InProcessServerStream implements ServerStream {
424
      final StatsTraceContext statsTraceCtx;
425
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
426
      private ClientStreamListener clientStreamListener;
427
      private final SynchronizationContext syncContext =
1✔
428
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
429
      @GuardedBy("this")
430
      private int clientRequested;
431
      @GuardedBy("this")
1✔
432
      private ArrayDeque<StreamListener.MessageProducer> clientReceiveQueue =
433
          new ArrayDeque<>();
434
      @GuardedBy("this")
435
      private Status clientNotifyStatus;
436
      @GuardedBy("this")
437
      private Metadata clientNotifyTrailers;
438
      // Only is intended to prevent double-close when client cancels.
439
      @GuardedBy("this")
440
      private boolean closed;
441
      @GuardedBy("this")
442
      private int outboundSeqNo;
443

444
      InProcessServerStream(MethodDescriptor<?, ?> method, Metadata headers) {
1✔
445
        statsTraceCtx = StatsTraceContext.newServerContext(
1✔
446
            serverStreamTracerFactories, method.getFullMethodName(), headers);
1✔
447
      }
1✔
448

449
      private synchronized void setListener(ClientStreamListener listener) {
450
        clientStreamListener = listener;
1✔
451
      }
1✔
452

453
      @Override
454
      public void setListener(ServerStreamListener serverStreamListener) {
455
        clientStream.setListener(serverStreamListener);
1✔
456
      }
1✔
457

458
      @Override
459
      public void request(int numMessages) {
460
        boolean onReady = clientStream.serverRequested(numMessages);
1✔
461
        if (onReady) {
1✔
462
          synchronized (this) {
1✔
463
            if (!closed) {
1✔
464
              syncContext.executeLater(() -> clientStreamListener.onReady());
1✔
465
            }
466
          }
1✔
467
        }
468
        syncContext.drain();
1✔
469
      }
1✔
470

471
      // This method is the only reason we have to synchronize field accesses.
472
      /**
473
       * Client requested more messages.
474
       *
475
       * @return whether onReady should be called on the server
476
       */
477
      private boolean clientRequested(int numMessages) {
478
        boolean previouslyReady;
479
        boolean nowReady;
480
        synchronized (this) {
1✔
481
          if (closed) {
1✔
482
            return false;
1✔
483
          }
484

485
          previouslyReady = clientRequested > 0;
1✔
486
          clientRequested += numMessages;
1✔
487
          while (clientRequested > 0 && !clientReceiveQueue.isEmpty()) {
1✔
488
            clientRequested--;
1✔
489
            StreamListener.MessageProducer producer = clientReceiveQueue.poll();
1✔
490
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
491
          }
1✔
492

493
          if (clientReceiveQueue.isEmpty() && clientNotifyStatus != null) {
1✔
494
            closed = true;
1✔
495
            clientStream.statsTraceCtx.clientInboundTrailers(clientNotifyTrailers);
1✔
496
            clientStream.statsTraceCtx.streamClosed(clientNotifyStatus);
1✔
497
            Status notifyStatus = this.clientNotifyStatus;
1✔
498
            Metadata notifyTrailers = this.clientNotifyTrailers;
1✔
499
            syncContext.executeLater(() ->
1✔
500
                clientStreamListener.closed(notifyStatus, RpcProgress.PROCESSED, notifyTrailers));
1✔
501
          }
502

503
          nowReady = clientRequested > 0;
1✔
504
        }
1✔
505

506
        syncContext.drain();
1✔
507
        return !previouslyReady && nowReady;
1✔
508
      }
509

510
      private void clientCancelled(Status status) {
511
        internalCancel(status);
1✔
512
      }
1✔
513

514
      @Override
515
      public void writeMessage(InputStream message) {
516
        long messageLength;
517
        try {
518
          if (assumedMessageSize != -1) {
1✔
519
            messageLength = assumedMessageSize;
1✔
520
          } else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
1✔
521
            messageLength = message.available();
1✔
522
          } else {
523
            InputStream oldMessage = message;
×
524
            byte[] payload = ByteStreams.toByteArray(message);
×
525
            messageLength = payload.length;
×
526
            message = new ByteArrayInputStream(payload);
×
527
            oldMessage.close();
×
528
          }
529
        } catch (Exception e) {
×
530
          throw new RuntimeException("Error processing the message length", e);
×
531
        }
1✔
532
        synchronized (this) {
1✔
533
          if (closed) {
1✔
534
            return;
1✔
535
          }
536
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
537
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
538
          clientStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
539
          clientStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
540
          statsTraceCtx.outboundUncompressedSize(messageLength);
1✔
541
          statsTraceCtx.outboundWireSize(messageLength);
1✔
542
          // messageLength should be same at receiver's end as no actual wire is involved.
543
          clientStream.statsTraceCtx.inboundUncompressedSize(messageLength);
1✔
544
          clientStream.statsTraceCtx.inboundWireSize(messageLength);
1✔
545
          outboundSeqNo++;
1✔
546
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
547
          if (clientRequested > 0) {
1✔
548
            clientRequested--;
1✔
549
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
550
          } else {
551
            clientReceiveQueue.add(producer);
1✔
552
          }
553
        }
1✔
554

555
        syncContext.drain();
1✔
556
      }
1✔
557

558
      @Override
559
      public void flush() {}
1✔
560

561
      @Override
562
      public synchronized boolean isReady() {
563
        if (closed) {
1✔
564
          return false;
×
565
        }
566
        return clientRequested > 0;
1✔
567
      }
568

569
      @Override
570
      public void writeHeaders(Metadata headers, boolean flush) {
571
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
572
          int metadataSize = metadataSize(headers);
1✔
573
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
574
            Status serverStatus = Status.CANCELLED.withDescription("Client cancelled the RPC");
1✔
575
            clientStream.serverClosed(serverStatus, serverStatus);
1✔
576
            // Other transports provide very little information in this case. We go ahead and make a
577
            // Status, which may need to be updated if statuscodes.md is updated.
578
            Status failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
579
                String.format(
1✔
580
                    Locale.US,
581
                    "Response header metadata larger than %d: %d",
582
                    clientMaxInboundMetadataSize,
1✔
583
                    metadataSize));
1✔
584
            notifyClientClose(failedStatus, new Metadata());
1✔
585
            return;
1✔
586
          }
587
        }
588

589
        synchronized (this) {
1✔
590
          if (closed) {
1✔
591
            return;
1✔
592
          }
593

594
          clientStream.statsTraceCtx.clientInboundHeaders(headers);
1✔
595
          syncContext.executeLater(() -> clientStreamListener.headersRead(headers));
1✔
596
        }
1✔
597
        syncContext.drain();
1✔
598
      }
1✔
599

600
      @Override
601
      public void close(Status status, Metadata trailers) {
602
        // clientStream.serverClosed must happen before clientStreamListener.closed, otherwise
603
        // clientStreamListener.closed can trigger clientStream.cancel (see code in
604
        // ClientCalls.blockingUnaryCall), which may race with clientStream.serverClosed as both are
605
        // calling internalCancel().
606
        clientStream.serverClosed(Status.OK, status);
1✔
607

608
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
609
          int statusSize = status.getDescription() == null ? 0 : status.getDescription().length();
1✔
610
          // Go ahead and throw in the status description's length, since that could be very long.
611
          int metadataSize = metadataSize(trailers) + statusSize;
1✔
612
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
613
            // Override the status for the client, but not the server. Transports do not guarantee
614
            // notifying the server of the failure.
615

616
            // Other transports provide very little information in this case. We go ahead and make a
617
            // Status, which may need to be updated if statuscodes.md is updated.
618
            status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
619
                String.format(
1✔
620
                    Locale.US,
621
                    "Response header metadata larger than %d: %d",
622
                    clientMaxInboundMetadataSize,
1✔
623
                    metadataSize));
1✔
624
            trailers = new Metadata();
1✔
625
          }
626
        }
627

628
        notifyClientClose(status, trailers);
1✔
629
      }
1✔
630

631
      /** clientStream.serverClosed() must be called before this method */
632
      private void notifyClientClose(Status status, Metadata trailers) {
633
        Status clientStatus = cleanStatus(status, includeCauseWithStatus);
1✔
634
        synchronized (this) {
1✔
635
          if (closed) {
1✔
636
            return;
1✔
637
          }
638
          if (clientReceiveQueue.isEmpty()) {
1✔
639
            closed = true;
1✔
640
            clientStream.statsTraceCtx.clientInboundTrailers(trailers);
1✔
641
            clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
642
            syncContext.executeLater(
1✔
643
                () -> clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, trailers));
1✔
644
          } else {
645
            clientNotifyStatus = clientStatus;
1✔
646
            clientNotifyTrailers = trailers;
1✔
647
          }
648
        }
1✔
649
        syncContext.drain();
1✔
650
        streamClosed();
1✔
651
      }
1✔
652

653
      @Override
654
      public void cancel(Status status) {
655
        if (!internalCancel(Status.CANCELLED.withDescription("server cancelled stream"))) {
1✔
656
          return;
1✔
657
        }
658
        clientStream.serverClosed(status, status);
1✔
659
        streamClosed();
1✔
660
      }
1✔
661

662
      private boolean internalCancel(Status clientStatus) {
663
        synchronized (this) {
1✔
664
          if (closed) {
1✔
665
            return false;
1✔
666
          }
667
          closed = true;
1✔
668
          StreamListener.MessageProducer producer;
669
          while ((producer = clientReceiveQueue.poll()) != null) {
1✔
670
            InputStream message;
671
            while ((message = producer.next()) != null) {
×
672
              try {
673
                message.close();
×
674
              } catch (Throwable t) {
×
675
                log.log(Level.WARNING, "Exception closing stream", t);
×
676
              }
×
677
            }
678
          }
×
679
          clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
680
          syncContext.executeLater(
1✔
681
              () ->
682
                  clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, new Metadata()));
1✔
683
        }
1✔
684
        syncContext.drain();
1✔
685
        return true;
1✔
686
      }
687

688
      @Override
689
      public void setMessageCompression(boolean enable) {
690
        // noop
691
      }
×
692

693
      @Override
694
      public void optimizeForDirectExecutor() {}
1✔
695

696
      @Override
697
      public void setCompressor(Compressor compressor) {}
1✔
698

699
      @Override
700
      public void setDecompressor(Decompressor decompressor) {}
×
701

702
      @Override public Attributes getAttributes() {
703
        return serverStreamAttributes;
1✔
704
      }
705

706
      @Override
707
      public String getAuthority() {
708
        return InProcessStream.this.authority;
1✔
709
      }
710

711
      @Override
712
      public StatsTraceContext statsTraceContext() {
713
        return statsTraceCtx;
1✔
714
      }
715

716
      @Override
717
      public int streamId() {
718
        return -1;
1✔
719
      }
720

721
      @Override
722
      public void setOnReadyThreshold(int numBytes) {
723
        // noop
724
      }
×
725
    }
726

727
    private class InProcessClientStream implements ClientStream {
728
      final StatsTraceContext statsTraceCtx;
729
      final CallOptions callOptions;
730
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
731
      private ServerStreamListener serverStreamListener;
732
      private final SynchronizationContext syncContext =
1✔
733
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
734
      @GuardedBy("this")
735
      private int serverRequested;
736
      @GuardedBy("this")
1✔
737
      private ArrayDeque<StreamListener.MessageProducer> serverReceiveQueue =
738
          new ArrayDeque<>();
739
      @GuardedBy("this")
740
      private boolean serverNotifyHalfClose;
741
      // Only is intended to prevent double-close when server closes.
742
      @GuardedBy("this")
743
      private boolean closed;
744
      @GuardedBy("this")
745
      private int outboundSeqNo;
746

747
      InProcessClientStream(
748
          CallOptions callOptions, StatsTraceContext statsTraceContext) {
1✔
749
        this.callOptions = callOptions;
1✔
750
        statsTraceCtx = statsTraceContext;
1✔
751
      }
1✔
752

753
      private synchronized void setListener(ServerStreamListener listener) {
754
        this.serverStreamListener = listener;
1✔
755
      }
1✔
756

757
      @Override
758
      public void request(int numMessages) {
759
        boolean onReady = serverStream.clientRequested(numMessages);
1✔
760
        if (onReady) {
1✔
761
          synchronized (this) {
1✔
762
            if (!closed) {
1✔
763
              syncContext.executeLater(() -> serverStreamListener.onReady());
1✔
764
            }
765
          }
1✔
766
          syncContext.drain();
1✔
767
        }
768
      }
1✔
769

770
      // This method is the only reason we have to synchronize field accesses.
771
      /**
772
       * Client requested more messages.
773
       *
774
       * @return whether onReady should be called on the server
775
       */
776
      private boolean serverRequested(int numMessages) {
777
        boolean previouslyReady;
778
        boolean nowReady;
779
        synchronized (this) {
1✔
780
          if (closed) {
1✔
781
            return false;
1✔
782
          }
783
          previouslyReady = serverRequested > 0;
1✔
784
          serverRequested += numMessages;
1✔
785

786
          while (serverRequested > 0 && !serverReceiveQueue.isEmpty()) {
1✔
787
            serverRequested--;
1✔
788
            StreamListener.MessageProducer producer = serverReceiveQueue.poll();
1✔
789
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
790
          }
1✔
791

792
          if (serverReceiveQueue.isEmpty() && serverNotifyHalfClose) {
1✔
793
            serverNotifyHalfClose = false;
1✔
794
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
795
          }
796
          nowReady = serverRequested > 0;
1✔
797
        }
1✔
798
        syncContext.drain();
1✔
799
        return !previouslyReady && nowReady;
1✔
800
      }
801

802
      private void serverClosed(Status serverListenerStatus, Status serverTracerStatus) {
803
        internalCancel(serverListenerStatus, serverTracerStatus);
1✔
804
      }
1✔
805

806
      @Override
807
      public void writeMessage(InputStream message) {
808
        long messageLength;
809
        try {
810
          if (assumedMessageSize != -1) {
1✔
811
            messageLength = assumedMessageSize;
1✔
812
          } else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
1✔
813
            messageLength = message.available();
1✔
814
          } else {
815
            InputStream oldMessage = message;
×
816
            byte[] payload = ByteStreams.toByteArray(message);
×
817
            messageLength = payload.length;
×
818
            message = new ByteArrayInputStream(payload);
×
819
            oldMessage.close();
×
820
          }
821
        } catch (Exception e) {
×
822
          throw new RuntimeException("Error processing the message length", e);
×
823
        }
1✔
824
        synchronized (this) {
1✔
825
          if (closed) {
1✔
826
            return;
1✔
827
          }
828
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
829
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
830
          serverStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
831
          serverStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
832
          statsTraceCtx.outboundUncompressedSize(messageLength);
1✔
833
          statsTraceCtx.outboundWireSize(messageLength);
1✔
834
          // messageLength should be same at receiver's end as no actual wire is involved.
835
          serverStream.statsTraceCtx.inboundUncompressedSize(messageLength);
1✔
836
          serverStream.statsTraceCtx.inboundWireSize(messageLength);
1✔
837
          outboundSeqNo++;
1✔
838
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
839
          if (serverRequested > 0) {
1✔
840
            serverRequested--;
1✔
841
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
842
          } else {
843
            serverReceiveQueue.add(producer);
1✔
844
          }
845
        }
1✔
846
        syncContext.drain();
1✔
847
      }
1✔
848

849
      @Override
850
      public void flush() {}
1✔
851

852
      @Override
853
      public synchronized boolean isReady() {
854
        if (closed) {
1✔
855
          return false;
1✔
856
        }
857
        return serverRequested > 0;
1✔
858
      }
859

860
      // Must be thread-safe for shutdownNow()
861
      @Override
862
      public void cancel(Status reason) {
863
        Status serverStatus = cleanStatus(reason, includeCauseWithStatus);
1✔
864
        if (!internalCancel(serverStatus, serverStatus)) {
1✔
865
          return;
1✔
866
        }
867
        serverStream.clientCancelled(reason);
1✔
868
        streamClosed();
1✔
869
      }
1✔
870

871
      private boolean internalCancel(
872
          Status serverListenerStatus, Status serverTracerStatus) {
873
        synchronized (this) {
1✔
874
          if (closed) {
1✔
875
            return false;
1✔
876
          }
877
          closed = true;
1✔
878

879
          StreamListener.MessageProducer producer;
880
          while ((producer = serverReceiveQueue.poll()) != null) {
1✔
881
            InputStream message;
882
            while ((message = producer.next()) != null) {
1✔
883
              try {
884
                message.close();
1✔
885
              } catch (Throwable t) {
×
886
                log.log(Level.WARNING, "Exception closing stream", t);
×
887
              }
1✔
888
            }
889
          }
1✔
890
          serverStream.statsTraceCtx.streamClosed(serverTracerStatus);
1✔
891
          syncContext.executeLater(() -> serverStreamListener.closed(serverListenerStatus));
1✔
892
        }
1✔
893
        syncContext.drain();
1✔
894
        return true;
1✔
895
      }
896

897
      @Override
898
      public void halfClose() {
899
        synchronized (this) {
1✔
900
          if (closed) {
1✔
901
            return;
1✔
902
          }
903
          if (serverReceiveQueue.isEmpty()) {
1✔
904
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
905
          } else {
906
            serverNotifyHalfClose = true;
1✔
907
          }
908
        }
1✔
909
        syncContext.drain();
1✔
910
      }
1✔
911

912
      @Override
913
      public void setMessageCompression(boolean enable) {}
×
914

915
      @Override
916
      public void setAuthority(String string) {
917
        InProcessStream.this.authority = string;
×
918
      }
×
919

920
      @Override
921
      public void start(ClientStreamListener listener) {
922
        serverStream.setListener(listener);
1✔
923

924
        synchronized (InProcessTransport.this) {
1✔
925
          statsTraceCtx.clientOutboundHeaders();
1✔
926
          streams.add(InProcessTransport.InProcessStream.this);
1✔
927
          if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
928
            inUseState.updateObjectInUse(InProcessTransport.InProcessStream.this, true);
1✔
929
          }
930
          serverTransportListener.streamCreated(serverStream, method.getFullMethodName(), headers);
1✔
931
        }
1✔
932
      }
1✔
933

934
      @Override
935
      public Attributes getAttributes() {
936
        return attributes;
1✔
937
      }
938

939
      @Override
940
      public void optimizeForDirectExecutor() {}
1✔
941

942
      @Override
943
      public void setCompressor(Compressor compressor) {}
1✔
944

945
      @Override
946
      public void setFullStreamDecompression(boolean fullStreamDecompression) {}
×
947

948
      @Override
949
      public void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {}
1✔
950

951
      @Override
952
      public void setMaxInboundMessageSize(int maxSize) {}
1✔
953

954
      @Override
955
      public void setMaxOutboundMessageSize(int maxSize) {}
1✔
956

957
      @Override
958
      public void setDeadline(Deadline deadline) {
959
        headers.discardAll(TIMEOUT_KEY);
1✔
960
        long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
1✔
961
        headers.put(TIMEOUT_KEY, effectiveTimeout);
1✔
962
      }
1✔
963

964
      @Override
965
      public void appendTimeoutInsight(InsightBuilder insight) {
966
      }
1✔
967
    }
968
  }
969

970
  /**
971
   * Returns a new status with the same code and description.
972
   * If includeCauseWithStatus is true, cause is also included.
973
   *
974
   * <p>For InProcess transport to behave in the same way as the other transports,
975
   * when exchanging statuses between client and server and vice versa,
976
   * the cause should be excluded from the status.
977
   * For easier debugging, the status may be optionally included.
978
   */
979
  private static Status cleanStatus(Status status, boolean includeCauseWithStatus) {
980
    if (status == null) {
1✔
981
      return null;
×
982
    }
983
    Status clientStatus = Status
1✔
984
        .fromCodeValue(status.getCode().value())
1✔
985
        .withDescription(status.getDescription());
1✔
986
    if (includeCauseWithStatus) {
1✔
987
      clientStatus = clientStatus.withCause(status.getCause());
1✔
988
    }
989
    return clientStatus;
1✔
990
  }
991

992
  private static class SingleMessageProducer implements StreamListener.MessageProducer {
993
    private InputStream message;
994

995
    private SingleMessageProducer(InputStream message) {
1✔
996
      this.message = message;
1✔
997
    }
1✔
998

999
    @Nullable
1000
    @Override
1001
    public InputStream next() {
1002
      InputStream messageToReturn = message;
1✔
1003
      message = null;
1✔
1004
      return messageToReturn;
1✔
1005
    }
1006
  }
1007
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc