• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19298

21 Jun 2024 11:31PM UTC coverage: 88.456% (-0.002%) from 88.458%
#19298

push

github

web-flow
netty: Fix Netty composite buffer merging to be compatible with Netty 4.1.111 (#11294) (#11303)

* Use addComponent instead of addFlattenedComponent and do not append to components that are composites.

32060 of 36244 relevant lines covered (88.46%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.87
/../inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.inprocess;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
21
import static java.lang.Math.max;
22

23
import com.google.common.base.MoreObjects;
24
import com.google.common.base.Optional;
25
import com.google.common.util.concurrent.ListenableFuture;
26
import com.google.common.util.concurrent.SettableFuture;
27
import io.grpc.Attributes;
28
import io.grpc.CallOptions;
29
import io.grpc.ClientStreamTracer;
30
import io.grpc.Compressor;
31
import io.grpc.Deadline;
32
import io.grpc.Decompressor;
33
import io.grpc.DecompressorRegistry;
34
import io.grpc.Grpc;
35
import io.grpc.InternalChannelz.SocketStats;
36
import io.grpc.InternalLogId;
37
import io.grpc.InternalMetadata;
38
import io.grpc.Metadata;
39
import io.grpc.MethodDescriptor;
40
import io.grpc.SecurityLevel;
41
import io.grpc.ServerStreamTracer;
42
import io.grpc.Status;
43
import io.grpc.SynchronizationContext;
44
import io.grpc.internal.ClientStream;
45
import io.grpc.internal.ClientStreamListener;
46
import io.grpc.internal.ClientStreamListener.RpcProgress;
47
import io.grpc.internal.ConnectionClientTransport;
48
import io.grpc.internal.GrpcAttributes;
49
import io.grpc.internal.GrpcUtil;
50
import io.grpc.internal.InUseStateAggregator;
51
import io.grpc.internal.InsightBuilder;
52
import io.grpc.internal.ManagedClientTransport;
53
import io.grpc.internal.NoopClientStream;
54
import io.grpc.internal.ObjectPool;
55
import io.grpc.internal.ServerListener;
56
import io.grpc.internal.ServerStream;
57
import io.grpc.internal.ServerStreamListener;
58
import io.grpc.internal.ServerTransport;
59
import io.grpc.internal.ServerTransportListener;
60
import io.grpc.internal.StatsTraceContext;
61
import io.grpc.internal.StreamListener;
62
import java.io.InputStream;
63
import java.net.SocketAddress;
64
import java.util.ArrayDeque;
65
import java.util.ArrayList;
66
import java.util.Collections;
67
import java.util.IdentityHashMap;
68
import java.util.List;
69
import java.util.Locale;
70
import java.util.Set;
71
import java.util.concurrent.Executor;
72
import java.util.concurrent.ScheduledExecutorService;
73
import java.util.concurrent.TimeUnit;
74
import java.util.logging.Level;
75
import java.util.logging.Logger;
76
import javax.annotation.CheckReturnValue;
77
import javax.annotation.Nullable;
78
import javax.annotation.concurrent.GuardedBy;
79
import javax.annotation.concurrent.ThreadSafe;
80

81
@ThreadSafe
82
final class InProcessTransport implements ServerTransport, ConnectionClientTransport {
83
  private static final Logger log = Logger.getLogger(InProcessTransport.class.getName());
1✔
84

85
  private final InternalLogId logId;
86
  private final SocketAddress address;
87
  private final int clientMaxInboundMetadataSize;
88
  private final String authority;
89
  private final String userAgent;
90
  private final Optional<ServerListener> optionalServerListener;
91
  private int serverMaxInboundMetadataSize;
92
  private final boolean includeCauseWithStatus;
93
  private ObjectPool<ScheduledExecutorService> serverSchedulerPool;
94
  private ScheduledExecutorService serverScheduler;
95
  private ServerTransportListener serverTransportListener;
96
  private Attributes serverStreamAttributes;
97
  private ManagedClientTransport.Listener clientTransportListener;
98
  @GuardedBy("this")
99
  private boolean shutdown;
100
  @GuardedBy("this")
101
  private boolean terminated;
102
  @GuardedBy("this")
103
  private Status shutdownStatus;
104
  @GuardedBy("this")
1✔
105
  private final Set<InProcessStream> streams = Collections.newSetFromMap(
1✔
106
          new IdentityHashMap<InProcessStream, Boolean>());
107
  @GuardedBy("this")
108
  private List<ServerStreamTracer.Factory> serverStreamTracerFactories;
109
  private Attributes attributes;
110

111
  private Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
1✔
112
      new Thread.UncaughtExceptionHandler() {
1✔
113
        @Override
114
        public void uncaughtException(Thread t, Throwable e) {
115
          if (e instanceof Error) {
×
116
            throw new Error(e);
×
117
          }
118
          throw new RuntimeException(e);
×
119
        }
120
      };
121

122

123
  @GuardedBy("this")
1✔
124
  private final InUseStateAggregator<InProcessStream> inUseState =
125
      new InUseStateAggregator<InProcessStream>() {
1✔
126
        @Override
127
        protected void handleInUse() {
128
          clientTransportListener.transportInUse(true);
1✔
129
        }
1✔
130

131
        @Override
132
        protected void handleNotInUse() {
133
          clientTransportListener.transportInUse(false);
1✔
134
        }
1✔
135
      };
136

137
  private InProcessTransport(SocketAddress address, int maxInboundMetadataSize, String authority,
138
      String userAgent, Attributes eagAttrs,
139
      Optional<ServerListener> optionalServerListener, boolean includeCauseWithStatus) {
1✔
140
    this.address = address;
1✔
141
    this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
1✔
142
    this.authority = authority;
1✔
143
    this.userAgent = GrpcUtil.getGrpcUserAgent("inprocess", userAgent);
1✔
144
    checkNotNull(eagAttrs, "eagAttrs");
1✔
145
    this.attributes = Attributes.newBuilder()
1✔
146
        .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
147
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs)
1✔
148
        .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
149
        .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
150
        .build();
1✔
151
    this.optionalServerListener = optionalServerListener;
1✔
152
    logId = InternalLogId.allocate(getClass(), address.toString());
1✔
153
    this.includeCauseWithStatus = includeCauseWithStatus;
1✔
154
  }
1✔
155

156
  /**
   * Creates a transport without a pre-supplied server listener; {@link #start} will look the
   * server up by {@code address}.
   */
  public InProcessTransport(
      SocketAddress address, int maxInboundMetadataSize, String authority, String userAgent,
      Attributes eagAttrs, boolean includeCauseWithStatus) {
    this(
        address,
        maxInboundMetadataSize,
        authority,
        userAgent,
        eagAttrs,
        Optional.<ServerListener>absent(),
        includeCauseWithStatus);
  }
1✔
162

163
  /**
   * Creates a transport bound directly to {@code serverListener}. The address is a new
   * {@code InProcessSocketAddress} built from {@code name}, and the same
   * {@code maxInboundMetadataSize} is used as both the client and the server metadata limit.
   */
  InProcessTransport(
      String name, int maxInboundMetadataSize, String authority, String userAgent,
      Attributes eagAttrs, ObjectPool<ScheduledExecutorService> serverSchedulerPool,
      List<ServerStreamTracer.Factory> serverStreamTracerFactories,
      ServerListener serverListener, boolean includeCauseWithStatus) {
    this(
        new InProcessSocketAddress(name),
        maxInboundMetadataSize,
        authority,
        userAgent,
        eagAttrs,
        Optional.of(serverListener),
        includeCauseWithStatus);
    // Server-side settings that the lookup path would otherwise read from the InProcessServer.
    this.serverStreamTracerFactories = serverStreamTracerFactories;
    this.serverSchedulerPool = serverSchedulerPool;
    this.serverMaxInboundMetadataSize = maxInboundMetadataSize;
  }
1✔
174

175
  @CheckReturnValue
176
  @Override
177
  public synchronized Runnable start(ManagedClientTransport.Listener listener) {
178
    this.clientTransportListener = listener;
1✔
179
    if (optionalServerListener.isPresent()) {
1✔
180
      serverScheduler = serverSchedulerPool.getObject();
1✔
181
      serverTransportListener = optionalServerListener.get().transportCreated(this);
1✔
182
    } else {
183
      InProcessServer server = InProcessServer.findServer(address);
1✔
184
      if (server != null) {
1✔
185
        serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize();
1✔
186
        serverSchedulerPool = server.getScheduledExecutorServicePool();
1✔
187
        serverScheduler = serverSchedulerPool.getObject();
1✔
188
        serverStreamTracerFactories = server.getStreamTracerFactories();
1✔
189
        // Must be semi-initialized; past this point, can begin receiving requests
190
        serverTransportListener = server.register(this);
1✔
191
      }
192
    }
193
    if (serverTransportListener == null) {
1✔
194
      shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + address);
1✔
195
      final Status localShutdownStatus = shutdownStatus;
1✔
196
      return new Runnable() {
1✔
197
        @Override
198
        public void run() {
199
          synchronized (InProcessTransport.this) {
1✔
200
            notifyShutdown(localShutdownStatus);
1✔
201
            notifyTerminated();
1✔
202
          }
1✔
203
        }
1✔
204
      };
205
    }
206
    return new Runnable() {
1✔
207
      @Override
208
      @SuppressWarnings("deprecation")
209
      public void run() {
210
        synchronized (InProcessTransport.this) {
1✔
211
          Attributes serverTransportAttrs = Attributes.newBuilder()
1✔
212
              .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
213
              .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
214
              .build();
1✔
215
          serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
1✔
216
          attributes = clientTransportListener.filterTransport(attributes);
1✔
217
          clientTransportListener.transportReady();
1✔
218
        }
1✔
219
      }
1✔
220
    };
221
  }
222

223
  @Override
224
  public synchronized ClientStream newStream(
225
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
226
      ClientStreamTracer[] tracers) {
227
    StatsTraceContext statsTraceContext =
1✔
228
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
229
    if (shutdownStatus != null) {
1✔
230
      return failedClientStream(statsTraceContext, shutdownStatus);
1✔
231
    }
232

233
    headers.put(GrpcUtil.USER_AGENT_KEY, userAgent);
1✔
234

235
    if (serverMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
236
      int metadataSize = metadataSize(headers);
1✔
237
      if (metadataSize > serverMaxInboundMetadataSize) {
1✔
238
        // Other transports would compute a status with:
239
        //   GrpcUtil.httpStatusToGrpcStatus(431 /* Request Header Fields Too Large */);
240
        // However, that isn't handled specially today, so we'd leak HTTP-isms even though we're
241
        // in-process. We go ahead and make a Status, which may need to be updated if
242
        // statuscodes.md is updated.
243
        Status status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
244
            String.format(
1✔
245
                Locale.US,
246
                "Request metadata larger than %d: %d",
247
                serverMaxInboundMetadataSize,
1✔
248
                metadataSize));
1✔
249
        return failedClientStream(statsTraceContext, status);
1✔
250
      }
251
    }
252

253
    return new InProcessStream(method, headers, callOptions, authority, statsTraceContext)
1✔
254
        .clientStream;
1✔
255
  }
256

257
  private ClientStream failedClientStream(
258
      final StatsTraceContext statsTraceCtx, final Status status) {
259
    return new NoopClientStream() {
1✔
260
        @Override
261
        public void start(ClientStreamListener listener) {
262
          statsTraceCtx.clientOutboundHeaders();
1✔
263
          statsTraceCtx.streamClosed(status);
1✔
264
          listener.closed(status, RpcProgress.PROCESSED, new Metadata());
1✔
265
        }
1✔
266
      };
267
  }
268

269
  @Override
270
  public synchronized void ping(final PingCallback callback, Executor executor) {
271
    if (terminated) {
1✔
272
      final Status shutdownStatus = this.shutdownStatus;
1✔
273
      executor.execute(new Runnable() {
1✔
274
        @Override
275
        public void run() {
276
          callback.onFailure(shutdownStatus.asRuntimeException());
1✔
277
        }
1✔
278
      });
279
    } else {
1✔
280
      executor.execute(new Runnable() {
1✔
281
        @Override
282
        public void run() {
283
          callback.onSuccess(0);
1✔
284
        }
1✔
285
      });
286
    }
287
  }
1✔
288

289
  @Override
290
  public synchronized void shutdown(Status reason) {
291
    // Can be called multiple times: once for ManagedClientTransport, once for ServerTransport.
292
    if (shutdown) {
1✔
293
      return;
1✔
294
    }
295
    shutdownStatus = reason;
1✔
296
    notifyShutdown(reason);
1✔
297
    if (streams.isEmpty()) {
1✔
298
      notifyTerminated();
1✔
299
    }
300
  }
1✔
301

302
  @Override
303
  public synchronized void shutdown() {
304
    shutdown(Status.UNAVAILABLE.withDescription("InProcessTransport shutdown by the server-side"));
1✔
305
  }
1✔
306

307
  @Override
308
  public void shutdownNow(Status reason) {
309
    checkNotNull(reason, "reason");
1✔
310
    List<InProcessStream> streamsCopy;
311
    synchronized (this) {
1✔
312
      shutdown(reason);
1✔
313
      if (terminated) {
1✔
314
        return;
1✔
315
      }
316
      streamsCopy = new ArrayList<>(streams);
1✔
317
    }
1✔
318
    for (InProcessStream stream : streamsCopy) {
1✔
319
      stream.clientStream.cancel(reason);
1✔
320
    }
1✔
321
  }
1✔
322

323
  @Override
324
  public String toString() {
325
    return MoreObjects.toStringHelper(this)
×
326
        .add("logId", logId.getId())
×
327
        .add("address", address)
×
328
        .toString();
×
329
  }
330

331
  @Override
332
  public InternalLogId getLogId() {
333
    return logId;
1✔
334
  }
335

336
  @Override
337
  public Attributes getAttributes() {
338
    return attributes;
1✔
339
  }
340

341
  @Override
342
  public ScheduledExecutorService getScheduledExecutorService() {
343
    return serverScheduler;
1✔
344
  }
345

346
  @Override
347
  public ListenableFuture<SocketStats> getStats() {
348
    SettableFuture<SocketStats> ret = SettableFuture.create();
×
349
    ret.set(null);
×
350
    return ret;
×
351
  }
352

353
  private synchronized void notifyShutdown(Status s) {
354
    if (shutdown) {
1✔
355
      return;
×
356
    }
357
    shutdown = true;
1✔
358
    clientTransportListener.transportShutdown(s);
1✔
359
  }
1✔
360

361
  private synchronized void notifyTerminated() {
362
    if (terminated) {
1✔
363
      return;
×
364
    }
365
    terminated = true;
1✔
366
    if (serverScheduler != null) {
1✔
367
      serverScheduler = serverSchedulerPool.returnObject(serverScheduler);
1✔
368
    }
369
    clientTransportListener.transportTerminated();
1✔
370
    if (serverTransportListener != null) {
1✔
371
      serverTransportListener.transportTerminated();
1✔
372
    }
373
  }
1✔
374

375
  private static int metadataSize(Metadata metadata) {
376
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
377
    if (serialized == null) {
1✔
378
      return 0;
×
379
    }
380
    // Calculate based on SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2. We could use something
381
    // different, but it's "sane."
382
    long size = 0;
1✔
383
    for (int i = 0; i < serialized.length; i += 2) {
1✔
384
      size += 32 + serialized[i].length + serialized[i + 1].length;
1✔
385
    }
386
    size = Math.min(size, Integer.MAX_VALUE);
1✔
387
    return (int) size;
1✔
388
  }
389

390
  private class InProcessStream {
391
    private final InProcessClientStream clientStream;
392
    private final InProcessServerStream serverStream;
393
    private final CallOptions callOptions;
394
    private final Metadata headers;
395
    private final MethodDescriptor<?, ?> method;
396
    private volatile String authority;
397

398
    /**
     * Creates the paired client and server halves of one call.
     *
     * @param method method being called; must not be null
     * @param headers request headers; must not be null
     * @param callOptions options of the call; must not be null
     * @param authority authority reported by the server stream
     * @param statsTraceContext client-side stats context, handed to the client stream
     */
    private InProcessStream(
        MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
        String authority, StatsTraceContext statsTraceContext) {
      this.method = checkNotNull(method, "method");
      this.headers = checkNotNull(headers, "headers");
      this.callOptions = checkNotNull(callOptions, "callOptions");
      this.authority = authority;

      // Client half first, then the server half.
      this.clientStream = new InProcessClientStream(callOptions, statsTraceContext);
      this.serverStream = new InProcessServerStream(method, headers);
    }
1✔
408

409
    // Can be called multiple times due to races on both client and server closing at same time.
410
    private void streamClosed() {
411
      synchronized (InProcessTransport.this) {
1✔
412
        boolean justRemovedAnElement = streams.remove(this);
1✔
413
        if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
414
          inUseState.updateObjectInUse(this, false);
1✔
415
        }
416
        if (streams.isEmpty() && justRemovedAnElement) {
1✔
417
          if (shutdown) {
1✔
418
            notifyTerminated();
1✔
419
          }
420
        }
421
      }
1✔
422
    }
1✔
423

424
    private class InProcessServerStream implements ServerStream {
425
      final StatsTraceContext statsTraceCtx;
426
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
427
      private ClientStreamListener clientStreamListener;
428
      private final SynchronizationContext syncContext =
1✔
429
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
430
      @GuardedBy("this")
431
      private int clientRequested;
432
      @GuardedBy("this")
1✔
433
      private ArrayDeque<StreamListener.MessageProducer> clientReceiveQueue =
434
          new ArrayDeque<>();
435
      @GuardedBy("this")
436
      private Status clientNotifyStatus;
437
      @GuardedBy("this")
438
      private Metadata clientNotifyTrailers;
439
      // Only is intended to prevent double-close when client cancels.
440
      @GuardedBy("this")
441
      private boolean closed;
442
      @GuardedBy("this")
443
      private int outboundSeqNo;
444

445
      /**
       * Creates the server half of the call and its server-side stats context, built from the
       * transport's tracer factories, the full method name and the request headers.
       */
      InProcessServerStream(MethodDescriptor<?, ?> method, Metadata headers) {
        String fullMethodName = method.getFullMethodName();
        statsTraceCtx =
            StatsTraceContext.newServerContext(
                serverStreamTracerFactories, fullMethodName, headers);
      }
1✔
449

450
      private synchronized void setListener(ClientStreamListener listener) {
451
        clientStreamListener = listener;
1✔
452
      }
1✔
453

454
      @Override
455
      public void setListener(ServerStreamListener serverStreamListener) {
456
        clientStream.setListener(serverStreamListener);
1✔
457
      }
1✔
458

459
      @Override
460
      public void request(int numMessages) {
461
        boolean onReady = clientStream.serverRequested(numMessages);
1✔
462
        if (onReady) {
1✔
463
          synchronized (this) {
1✔
464
            if (!closed) {
1✔
465
              syncContext.executeLater(() -> clientStreamListener.onReady());
1✔
466
            }
467
          }
1✔
468
        }
469
        syncContext.drain();
1✔
470
      }
1✔
471

472
      // This method is the only reason we have to synchronize field accesses.
473
      /**
474
       * Client requested more messages.
475
       *
476
       * @return whether onReady should be called on the server
477
       */
478
      private boolean clientRequested(int numMessages) {
479
        boolean previouslyReady;
480
        boolean nowReady;
481
        synchronized (this) {
1✔
482
          if (closed) {
1✔
483
            return false;
1✔
484
          }
485

486
          previouslyReady = clientRequested > 0;
1✔
487
          clientRequested += numMessages;
1✔
488
          while (clientRequested > 0 && !clientReceiveQueue.isEmpty()) {
1✔
489
            clientRequested--;
1✔
490
            StreamListener.MessageProducer producer = clientReceiveQueue.poll();
1✔
491
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
492
          }
1✔
493

494
          if (clientReceiveQueue.isEmpty() && clientNotifyStatus != null) {
1✔
495
            closed = true;
1✔
496
            clientStream.statsTraceCtx.clientInboundTrailers(clientNotifyTrailers);
1✔
497
            clientStream.statsTraceCtx.streamClosed(clientNotifyStatus);
1✔
498
            Status notifyStatus = this.clientNotifyStatus;
1✔
499
            Metadata notifyTrailers = this.clientNotifyTrailers;
1✔
500
            syncContext.executeLater(() ->
1✔
501
                clientStreamListener.closed(notifyStatus, RpcProgress.PROCESSED, notifyTrailers));
1✔
502
          }
503

504
          nowReady = clientRequested > 0;
1✔
505
        }
1✔
506

507
        syncContext.drain();
1✔
508
        return !previouslyReady && nowReady;
1✔
509
      }
510

511
      private void clientCancelled(Status status) {
512
        internalCancel(status);
1✔
513
      }
1✔
514

515
      @Override
516
      public void writeMessage(InputStream message) {
517
        synchronized (this) {
1✔
518
          if (closed) {
1✔
519
            return;
1✔
520
          }
521
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
522
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
523
          clientStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
524
          clientStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
525
          outboundSeqNo++;
1✔
526
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
527
          if (clientRequested > 0) {
1✔
528
            clientRequested--;
1✔
529
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
530
          } else {
531
            clientReceiveQueue.add(producer);
1✔
532
          }
533
        }
1✔
534

535
        syncContext.drain();
1✔
536
      }
1✔
537

538
      @Override
539
      public void flush() {}
1✔
540

541
      @Override
542
      public synchronized boolean isReady() {
543
        if (closed) {
1✔
544
          return false;
×
545
        }
546
        return clientRequested > 0;
1✔
547
      }
548

549
      @Override
550
      public void writeHeaders(Metadata headers, boolean flush) {
551
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
552
          int metadataSize = metadataSize(headers);
1✔
553
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
554
            Status serverStatus = Status.CANCELLED.withDescription("Client cancelled the RPC");
1✔
555
            clientStream.serverClosed(serverStatus, serverStatus);
1✔
556
            // Other transports provide very little information in this case. We go ahead and make a
557
            // Status, which may need to be updated if statuscodes.md is updated.
558
            Status failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
559
                String.format(
1✔
560
                    Locale.US,
561
                    "Response header metadata larger than %d: %d",
562
                    clientMaxInboundMetadataSize,
1✔
563
                    metadataSize));
1✔
564
            notifyClientClose(failedStatus, new Metadata());
1✔
565
            return;
1✔
566
          }
567
        }
568

569
        synchronized (this) {
1✔
570
          if (closed) {
1✔
571
            return;
1✔
572
          }
573

574
          clientStream.statsTraceCtx.clientInboundHeaders();
1✔
575
          syncContext.executeLater(() -> clientStreamListener.headersRead(headers));
1✔
576
        }
1✔
577
        syncContext.drain();
1✔
578
      }
1✔
579

580
      @Override
581
      public void close(Status status, Metadata trailers) {
582
        // clientStream.serverClosed must happen before clientStreamListener.closed, otherwise
583
        // clientStreamListener.closed can trigger clientStream.cancel (see code in
584
        // ClientCalls.blockingUnaryCall), which may race with clientStream.serverClosed as both are
585
        // calling internalCancel().
586
        clientStream.serverClosed(Status.OK, status);
1✔
587

588
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
589
          int statusSize = status.getDescription() == null ? 0 : status.getDescription().length();
1✔
590
          // Go ahead and throw in the status description's length, since that could be very long.
591
          int metadataSize = metadataSize(trailers) + statusSize;
1✔
592
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
593
            // Override the status for the client, but not the server. Transports do not guarantee
594
            // notifying the server of the failure.
595

596
            // Other transports provide very little information in this case. We go ahead and make a
597
            // Status, which may need to be updated if statuscodes.md is updated.
598
            status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
599
                String.format(
1✔
600
                    Locale.US,
601
                    "Response header metadata larger than %d: %d",
602
                    clientMaxInboundMetadataSize,
1✔
603
                    metadataSize));
1✔
604
            trailers = new Metadata();
1✔
605
          }
606
        }
607

608
        notifyClientClose(status, trailers);
1✔
609
      }
1✔
610

611
      /** clientStream.serverClosed() must be called before this method */
612
      private void notifyClientClose(Status status, Metadata trailers) {
613
        Status clientStatus = cleanStatus(status, includeCauseWithStatus);
1✔
614
        synchronized (this) {
1✔
615
          if (closed) {
1✔
616
            return;
1✔
617
          }
618
          if (clientReceiveQueue.isEmpty()) {
1✔
619
            closed = true;
1✔
620
            clientStream.statsTraceCtx.clientInboundTrailers(trailers);
1✔
621
            clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
622
            syncContext.executeLater(
1✔
623
                () -> clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, trailers));
1✔
624
          } else {
625
            clientNotifyStatus = clientStatus;
1✔
626
            clientNotifyTrailers = trailers;
1✔
627
          }
628
        }
1✔
629
        syncContext.drain();
1✔
630
        streamClosed();
1✔
631
      }
1✔
632

633
      @Override
634
      public void cancel(Status status) {
635
        if (!internalCancel(Status.CANCELLED.withDescription("server cancelled stream"))) {
1✔
636
          return;
1✔
637
        }
638
        clientStream.serverClosed(status, status);
1✔
639
        streamClosed();
1✔
640
      }
1✔
641

642
      private boolean internalCancel(Status clientStatus) {
643
        synchronized (this) {
1✔
644
          if (closed) {
1✔
645
            return false;
1✔
646
          }
647
          closed = true;
1✔
648
          StreamListener.MessageProducer producer;
649
          while ((producer = clientReceiveQueue.poll()) != null) {
1✔
650
            InputStream message;
651
            while ((message = producer.next()) != null) {
×
652
              try {
653
                message.close();
×
654
              } catch (Throwable t) {
×
655
                log.log(Level.WARNING, "Exception closing stream", t);
×
656
              }
×
657
            }
658
          }
×
659
          clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
660
          syncContext.executeLater(
1✔
661
              () ->
662
                  clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, new Metadata()));
1✔
663
        }
1✔
664
        syncContext.drain();
1✔
665
        return true;
1✔
666
      }
667

668
      @Override
669
      public void setMessageCompression(boolean enable) {
670
        // noop
671
      }
×
672

673
      @Override
674
      public void optimizeForDirectExecutor() {}
1✔
675

676
      @Override
677
      public void setCompressor(Compressor compressor) {}
1✔
678

679
      @Override
680
      public void setDecompressor(Decompressor decompressor) {}
×
681

682
      @Override public Attributes getAttributes() {
683
        return serverStreamAttributes;
1✔
684
      }
685

686
      @Override
687
      public String getAuthority() {
688
        return InProcessStream.this.authority;
1✔
689
      }
690

691
      @Override
692
      public StatsTraceContext statsTraceContext() {
693
        return statsTraceCtx;
1✔
694
      }
695

696
      @Override
697
      public int streamId() {
698
        return -1;
1✔
699
      }
700

701
      @Override
702
      public void setOnReadyThreshold(int numBytes) {
703
        // noop
704
      }
×
705
    }
706

707
    private class InProcessClientStream implements ClientStream {
708
      final StatsTraceContext statsTraceCtx;
709
      final CallOptions callOptions;
710
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
711
      private ServerStreamListener serverStreamListener;
712
      private final SynchronizationContext syncContext =
1✔
713
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
714
      @GuardedBy("this")
715
      private int serverRequested;
716
      @GuardedBy("this")
1✔
717
      private ArrayDeque<StreamListener.MessageProducer> serverReceiveQueue =
718
          new ArrayDeque<>();
719
      @GuardedBy("this")
720
      private boolean serverNotifyHalfClose;
721
      // Only is intended to prevent double-close when server closes.
722
      @GuardedBy("this")
723
      private boolean closed;
724
      @GuardedBy("this")
725
      private int outboundSeqNo;
726

727
      /** Creates the client half of the call, keeping its call options and stats context. */
      InProcessClientStream(
          CallOptions callOptions, StatsTraceContext statsTraceContext) {
        this.statsTraceCtx = statsTraceContext;
        this.callOptions = callOptions;
      }
1✔
732

733
      private synchronized void setListener(ServerStreamListener listener) {
734
        this.serverStreamListener = listener;
1✔
735
      }
1✔
736

737
      @Override
738
      public void request(int numMessages) {
739
        boolean onReady = serverStream.clientRequested(numMessages);
1✔
740
        if (onReady) {
1✔
741
          synchronized (this) {
1✔
742
            if (!closed) {
1✔
743
              syncContext.executeLater(() -> serverStreamListener.onReady());
1✔
744
            }
745
          }
1✔
746
          syncContext.drain();
1✔
747
        }
748
      }
1✔
749

750
      // This method is the only reason we have to synchronize field accesses.
751
      /**
752
       * Client requested more messages.
753
       *
754
       * @return whether onReady should be called on the server
755
       */
756
      private boolean serverRequested(int numMessages) {
757
        boolean previouslyReady;
758
        boolean nowReady;
759
        synchronized (this) {
1✔
760
          if (closed) {
1✔
761
            return false;
1✔
762
          }
763
          previouslyReady = serverRequested > 0;
1✔
764
          serverRequested += numMessages;
1✔
765

766
          while (serverRequested > 0 && !serverReceiveQueue.isEmpty()) {
1✔
767
            serverRequested--;
1✔
768
            StreamListener.MessageProducer producer = serverReceiveQueue.poll();
1✔
769
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
770
          }
1✔
771

772
          if (serverReceiveQueue.isEmpty() && serverNotifyHalfClose) {
1✔
773
            serverNotifyHalfClose = false;
1✔
774
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
775
          }
776
          nowReady = serverRequested > 0;
1✔
777
        }
1✔
778
        syncContext.drain();
1✔
779
        return !previouslyReady && nowReady;
1✔
780
      }
781

782
      private void serverClosed(Status serverListenerStatus, Status serverTracerStatus) {
783
        internalCancel(serverListenerStatus, serverTracerStatus);
1✔
784
      }
1✔
785

786
      @Override
787
      public void writeMessage(InputStream message) {
788
        synchronized (this) {
1✔
789
          if (closed) {
1✔
790
            return;
1✔
791
          }
792
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
793
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
794
          serverStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
795
          serverStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
796
          outboundSeqNo++;
1✔
797
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
798
          if (serverRequested > 0) {
1✔
799
            serverRequested--;
1✔
800
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
801
          } else {
802
            serverReceiveQueue.add(producer);
1✔
803
          }
804
        }
1✔
805
        syncContext.drain();
1✔
806
      }
1✔
807

808
      @Override
809
      // No-op: writeMessage() itself either queues each message or schedules its delivery, so
      // nothing is left buffered in this stream for flush() to push out.
      public void flush() {}
1✔
810

811
      @Override
812
      public synchronized boolean isReady() {
813
        if (closed) {
1✔
814
          return false;
×
815
        }
816
        return serverRequested > 0;
1✔
817
      }
818

819
      // Must be thread-safe for shutdownNow()
820
      @Override
821
      public void cancel(Status reason) {
822
        Status serverStatus = cleanStatus(reason, includeCauseWithStatus);
1✔
823
        if (!internalCancel(serverStatus, serverStatus)) {
1✔
824
          return;
1✔
825
        }
826
        serverStream.clientCancelled(reason);
1✔
827
        streamClosed();
1✔
828
      }
1✔
829

830
      private boolean internalCancel(
831
          Status serverListenerStatus, Status serverTracerStatus) {
832
        synchronized (this) {
1✔
833
          if (closed) {
1✔
834
            return false;
1✔
835
          }
836
          closed = true;
1✔
837

838
          StreamListener.MessageProducer producer;
839
          while ((producer = serverReceiveQueue.poll()) != null) {
1✔
840
            InputStream message;
841
            while ((message = producer.next()) != null) {
1✔
842
              try {
843
                message.close();
1✔
844
              } catch (Throwable t) {
×
845
                log.log(Level.WARNING, "Exception closing stream", t);
×
846
              }
1✔
847
            }
848
          }
1✔
849
          serverStream.statsTraceCtx.streamClosed(serverTracerStatus);
1✔
850
          syncContext.executeLater(() -> serverStreamListener.closed(serverListenerStatus));
1✔
851
        }
1✔
852
        syncContext.drain();
1✔
853
        return true;
1✔
854
      }
855

856
      @Override
857
      public void halfClose() {
858
        synchronized (this) {
1✔
859
          if (closed) {
1✔
860
            return;
1✔
861
          }
862
          if (serverReceiveQueue.isEmpty()) {
1✔
863
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
864
          } else {
865
            serverNotifyHalfClose = true;
1✔
866
          }
867
        }
1✔
868
        syncContext.drain();
1✔
869
      }
1✔
870

871
      @Override
872
      // No-op: the enable flag is ignored by this stream.
      public void setMessageCompression(boolean enable) {}
×
873

874
      @Override
875
      public void setAuthority(String string) {
876
        InProcessStream.this.authority = string;
×
877
      }
×
878

879
      @Override
880
      public void start(ClientStreamListener listener) {
881
        serverStream.setListener(listener);
1✔
882

883
        synchronized (InProcessTransport.this) {
1✔
884
          statsTraceCtx.clientOutboundHeaders();
1✔
885
          streams.add(InProcessTransport.InProcessStream.this);
1✔
886
          if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
887
            inUseState.updateObjectInUse(InProcessTransport.InProcessStream.this, true);
1✔
888
          }
889
          serverTransportListener.streamCreated(serverStream, method.getFullMethodName(), headers);
1✔
890
        }
1✔
891
      }
1✔
892

893
      @Override
894
      public Attributes getAttributes() {
895
        return attributes;
1✔
896
      }
897

898
      @Override
899
      // No-op: this stream does nothing in response to the direct-executor hint.
      public void optimizeForDirectExecutor() {}
1✔
900

901
      @Override
902
      // No-op: the supplied compressor is ignored by this stream.
      public void setCompressor(Compressor compressor) {}
1✔
903

904
      @Override
905
      // No-op: the full-stream decompression flag is ignored by this stream.
      public void setFullStreamDecompression(boolean fullStreamDecompression) {}
×
906

907
      @Override
908
      // No-op: the supplied decompressor registry is ignored by this stream.
      public void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {}
1✔
909

910
      @Override
911
      // No-op: no inbound message size limit is recorded or enforced by this method.
      public void setMaxInboundMessageSize(int maxSize) {}
1✔
912

913
      @Override
914
      // No-op: no outbound message size limit is recorded or enforced by this method.
      public void setMaxOutboundMessageSize(int maxSize) {}
1✔
915

916
      @Override
917
      public void setDeadline(Deadline deadline) {
918
        headers.discardAll(TIMEOUT_KEY);
1✔
919
        long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
1✔
920
        headers.put(TIMEOUT_KEY, effectiveTimeout);
1✔
921
      }
1✔
922

923
      @Override
924
      public void appendTimeoutInsight(InsightBuilder insight) {
925
      }
1✔
926
    }
927
  }
928

929
  /**
930
   * Returns a new status with the same code and description.
931
   * If includeCauseWithStatus is true, cause is also included.
932
   *
933
   * <p>For InProcess transport to behave in the same way as the other transports,
934
   * when exchanging statuses between client and server and vice versa,
935
   * the cause should be excluded from the status.
936
   * For easier debugging, the cause may optionally be included.
937
   */
938
  private static Status cleanStatus(Status status, boolean includeCauseWithStatus) {
939
    if (status == null) {
1✔
940
      return null;
×
941
    }
942
    Status clientStatus = Status
1✔
943
        .fromCodeValue(status.getCode().value())
1✔
944
        .withDescription(status.getDescription());
1✔
945
    if (includeCauseWithStatus) {
1✔
946
      clientStatus = clientStatus.withCause(status.getCause());
1✔
947
    }
948
    return clientStatus;
1✔
949
  }
950

951
  private static class SingleMessageProducer implements StreamListener.MessageProducer {
952
    private InputStream message;
953

954
    private SingleMessageProducer(InputStream message) {
1✔
955
      this.message = message;
1✔
956
    }
1✔
957

958
    @Nullable
959
    @Override
960
    public InputStream next() {
961
      InputStream messageToReturn = message;
1✔
962
      message = null;
1✔
963
      return messageToReturn;
1✔
964
    }
965
  }
966
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc