• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19156

08 Apr 2024 06:30PM CUT coverage: 88.262% (-0.009%) from 88.271%
#19156

push

github

web-flow
buildscripts: Migrate PSM Interop to Artifact Registry (#11079) (#11090)

From Container Registry (gcr.io) to Artifact Registry (pkg.dev).

30912 of 35023 relevant lines covered (88.26%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.07
/../inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.inprocess;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
21
import static java.lang.Math.max;
22

23
import com.google.common.base.MoreObjects;
24
import com.google.common.base.Optional;
25
import com.google.common.util.concurrent.ListenableFuture;
26
import com.google.common.util.concurrent.SettableFuture;
27
import io.grpc.Attributes;
28
import io.grpc.CallOptions;
29
import io.grpc.ClientStreamTracer;
30
import io.grpc.Compressor;
31
import io.grpc.Deadline;
32
import io.grpc.Decompressor;
33
import io.grpc.DecompressorRegistry;
34
import io.grpc.Grpc;
35
import io.grpc.InternalChannelz.SocketStats;
36
import io.grpc.InternalLogId;
37
import io.grpc.InternalMetadata;
38
import io.grpc.Metadata;
39
import io.grpc.MethodDescriptor;
40
import io.grpc.SecurityLevel;
41
import io.grpc.ServerStreamTracer;
42
import io.grpc.Status;
43
import io.grpc.SynchronizationContext;
44
import io.grpc.internal.ClientStream;
45
import io.grpc.internal.ClientStreamListener;
46
import io.grpc.internal.ClientStreamListener.RpcProgress;
47
import io.grpc.internal.ConnectionClientTransport;
48
import io.grpc.internal.GrpcAttributes;
49
import io.grpc.internal.GrpcUtil;
50
import io.grpc.internal.InUseStateAggregator;
51
import io.grpc.internal.InsightBuilder;
52
import io.grpc.internal.ManagedClientTransport;
53
import io.grpc.internal.NoopClientStream;
54
import io.grpc.internal.ObjectPool;
55
import io.grpc.internal.ServerListener;
56
import io.grpc.internal.ServerStream;
57
import io.grpc.internal.ServerStreamListener;
58
import io.grpc.internal.ServerTransport;
59
import io.grpc.internal.ServerTransportListener;
60
import io.grpc.internal.StatsTraceContext;
61
import io.grpc.internal.StreamListener;
62
import java.io.InputStream;
63
import java.net.SocketAddress;
64
import java.util.ArrayDeque;
65
import java.util.ArrayList;
66
import java.util.Collections;
67
import java.util.IdentityHashMap;
68
import java.util.List;
69
import java.util.Locale;
70
import java.util.Set;
71
import java.util.concurrent.Executor;
72
import java.util.concurrent.ScheduledExecutorService;
73
import java.util.concurrent.TimeUnit;
74
import java.util.logging.Level;
75
import java.util.logging.Logger;
76
import javax.annotation.CheckReturnValue;
77
import javax.annotation.Nullable;
78
import javax.annotation.concurrent.GuardedBy;
79
import javax.annotation.concurrent.ThreadSafe;
80

81
@ThreadSafe
82
final class InProcessTransport implements ServerTransport, ConnectionClientTransport {
83
  private static final Logger log = Logger.getLogger(InProcessTransport.class.getName());
1✔
84

85
  private final InternalLogId logId;
86
  private final SocketAddress address;
87
  private final int clientMaxInboundMetadataSize;
88
  private final String authority;
89
  private final String userAgent;
90
  private final Optional<ServerListener> optionalServerListener;
91
  private int serverMaxInboundMetadataSize;
92
  private final boolean includeCauseWithStatus;
93
  private ObjectPool<ScheduledExecutorService> serverSchedulerPool;
94
  private ScheduledExecutorService serverScheduler;
95
  private ServerTransportListener serverTransportListener;
96
  private Attributes serverStreamAttributes;
97
  private ManagedClientTransport.Listener clientTransportListener;
98
  @GuardedBy("this")
99
  private boolean shutdown;
100
  @GuardedBy("this")
101
  private boolean terminated;
102
  @GuardedBy("this")
103
  private Status shutdownStatus;
104
  @GuardedBy("this")
1✔
105
  private final Set<InProcessStream> streams = Collections.newSetFromMap(
1✔
106
          new IdentityHashMap<InProcessStream, Boolean>());
107
  @GuardedBy("this")
108
  private List<ServerStreamTracer.Factory> serverStreamTracerFactories;
109
  private final Attributes attributes;
110

111
  private Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
1✔
112
      new Thread.UncaughtExceptionHandler() {
1✔
113
        @Override
114
        public void uncaughtException(Thread t, Throwable e) {
115
          if (e instanceof Error) {
×
116
            throw new Error(e);
×
117
          }
118
          throw new RuntimeException(e);
×
119
        }
120
      };
121

122

123
  // Aggregates per-stream "in use" state (streams report themselves through updateObjectInUse)
  // and relays the aggregator's handleInUse/handleNotInUse callbacks to the client transport
  // listener as transportInUse(true) / transportInUse(false).
  @GuardedBy("this")
1✔
124
  private final InUseStateAggregator<InProcessStream> inUseState =
125
      new InUseStateAggregator<InProcessStream>() {
1✔
126
        @Override
127
        protected void handleInUse() {
128
          clientTransportListener.transportInUse(true);
1✔
129
        }
1✔
130

131
        @Override
132
        protected void handleNotInUse() {
133
          clientTransportListener.transportInUse(false);
1✔
134
        }
1✔
135
      };
136

137
  private InProcessTransport(SocketAddress address, int maxInboundMetadataSize, String authority,
138
      String userAgent, Attributes eagAttrs,
139
      Optional<ServerListener> optionalServerListener, boolean includeCauseWithStatus) {
1✔
140
    this.address = address;
1✔
141
    this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
1✔
142
    this.authority = authority;
1✔
143
    this.userAgent = GrpcUtil.getGrpcUserAgent("inprocess", userAgent);
1✔
144
    checkNotNull(eagAttrs, "eagAttrs");
1✔
145
    this.attributes = Attributes.newBuilder()
1✔
146
        .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
147
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs)
1✔
148
        .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
149
        .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
150
        .build();
1✔
151
    this.optionalServerListener = optionalServerListener;
1✔
152
    logId = InternalLogId.allocate(getClass(), address.toString());
1✔
153
    this.includeCauseWithStatus = includeCauseWithStatus;
1✔
154
  }
1✔
155

156
  /**
   * Creates a transport with no pre-bound server listener; the server for {@code address} is
   * looked up when {@code start} is called.
   */
  public InProcessTransport(
157
      SocketAddress address, int maxInboundMetadataSize, String authority, String userAgent,
158
      Attributes eagAttrs, boolean includeCauseWithStatus) {
159
    this(address, maxInboundMetadataSize, authority, userAgent, eagAttrs,
1✔
160
        Optional.<ServerListener>absent(), includeCauseWithStatus);
1✔
161
  }
1✔
162

163
  /**
   * Creates a transport bound directly to {@code serverListener}, addressed by a new
   * {@link InProcessSocketAddress} for {@code name}.
   *
   * <p>{@code maxInboundMetadataSize} is used as both the client-side and the server-side inbound
   * metadata limit. The scheduler pool and stream tracer factories are supplied by the caller
   * instead of being read from a registered server.
   */
  InProcessTransport(
      String name, int maxInboundMetadataSize, String authority, String userAgent,
      Attributes eagAttrs, ObjectPool<ScheduledExecutorService> serverSchedulerPool,
      List<ServerStreamTracer.Factory> serverStreamTracerFactories,
      ServerListener serverListener, boolean includeCauseWithStatus) {
    this(
        new InProcessSocketAddress(name),
        maxInboundMetadataSize,
        authority,
        userAgent,
        eagAttrs,
        Optional.of(serverListener),
        includeCauseWithStatus);
    this.serverStreamTracerFactories = serverStreamTracerFactories;
    this.serverSchedulerPool = serverSchedulerPool;
    this.serverMaxInboundMetadataSize = maxInboundMetadataSize;
  }
1✔
174

175
  @CheckReturnValue
176
  @Override
177
  public synchronized Runnable start(ManagedClientTransport.Listener listener) {
178
    this.clientTransportListener = listener;
1✔
179
    if (optionalServerListener.isPresent()) {
1✔
180
      serverScheduler = serverSchedulerPool.getObject();
1✔
181
      serverTransportListener = optionalServerListener.get().transportCreated(this);
1✔
182
    } else {
183
      InProcessServer server = InProcessServer.findServer(address);
1✔
184
      if (server != null) {
1✔
185
        serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize();
1✔
186
        serverSchedulerPool = server.getScheduledExecutorServicePool();
1✔
187
        serverScheduler = serverSchedulerPool.getObject();
1✔
188
        serverStreamTracerFactories = server.getStreamTracerFactories();
1✔
189
        // Must be semi-initialized; past this point, can begin receiving requests
190
        serverTransportListener = server.register(this);
1✔
191
      }
192
    }
193
    if (serverTransportListener == null) {
1✔
194
      shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + address);
1✔
195
      final Status localShutdownStatus = shutdownStatus;
1✔
196
      return new Runnable() {
1✔
197
        @Override
198
        public void run() {
199
          synchronized (InProcessTransport.this) {
1✔
200
            notifyShutdown(localShutdownStatus);
1✔
201
            notifyTerminated();
1✔
202
          }
1✔
203
        }
1✔
204
      };
205
    }
206
    return new Runnable() {
1✔
207
      @Override
208
      @SuppressWarnings("deprecation")
209
      public void run() {
210
        synchronized (InProcessTransport.this) {
1✔
211
          Attributes serverTransportAttrs = Attributes.newBuilder()
1✔
212
              .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
213
              .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
214
              .build();
1✔
215
          serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
1✔
216
          clientTransportListener.transportReady();
1✔
217
        }
1✔
218
      }
1✔
219
    };
220
  }
221

222
  @Override
223
  public synchronized ClientStream newStream(
224
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
225
      ClientStreamTracer[] tracers) {
226
    StatsTraceContext statsTraceContext =
1✔
227
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
228
    if (shutdownStatus != null) {
1✔
229
      return failedClientStream(statsTraceContext, shutdownStatus);
1✔
230
    }
231

232
    headers.put(GrpcUtil.USER_AGENT_KEY, userAgent);
1✔
233

234
    if (serverMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
235
      int metadataSize = metadataSize(headers);
1✔
236
      if (metadataSize > serverMaxInboundMetadataSize) {
1✔
237
        // Other transports would compute a status with:
238
        //   GrpcUtil.httpStatusToGrpcStatus(431 /* Request Header Fields Too Large */);
239
        // However, that isn't handled specially today, so we'd leak HTTP-isms even though we're
240
        // in-process. We go ahead and make a Status, which may need to be updated if
241
        // statuscodes.md is updated.
242
        Status status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
243
            String.format(
1✔
244
                Locale.US,
245
                "Request metadata larger than %d: %d",
246
                serverMaxInboundMetadataSize,
1✔
247
                metadataSize));
1✔
248
        return failedClientStream(statsTraceContext, status);
1✔
249
      }
250
    }
251

252
    return new InProcessStream(method, headers, callOptions, authority, statsTraceContext)
1✔
253
        .clientStream;
1✔
254
  }
255

256
  private ClientStream failedClientStream(
257
      final StatsTraceContext statsTraceCtx, final Status status) {
258
    return new NoopClientStream() {
1✔
259
        @Override
260
        public void start(ClientStreamListener listener) {
261
          statsTraceCtx.clientOutboundHeaders();
1✔
262
          statsTraceCtx.streamClosed(status);
1✔
263
          listener.closed(status, RpcProgress.PROCESSED, new Metadata());
1✔
264
        }
1✔
265
      };
266
  }
267

268
  @Override
269
  public synchronized void ping(final PingCallback callback, Executor executor) {
270
    if (terminated) {
1✔
271
      final Status shutdownStatus = this.shutdownStatus;
1✔
272
      executor.execute(new Runnable() {
1✔
273
        @Override
274
        public void run() {
275
          callback.onFailure(shutdownStatus.asRuntimeException());
1✔
276
        }
1✔
277
      });
278
    } else {
1✔
279
      executor.execute(new Runnable() {
1✔
280
        @Override
281
        public void run() {
282
          callback.onSuccess(0);
1✔
283
        }
1✔
284
      });
285
    }
286
  }
1✔
287

288
  @Override
289
  public synchronized void shutdown(Status reason) {
290
    // Can be called multiple times: once for ManagedClientTransport, once for ServerTransport.
291
    if (shutdown) {
1✔
292
      return;
1✔
293
    }
294
    shutdownStatus = reason;
1✔
295
    notifyShutdown(reason);
1✔
296
    if (streams.isEmpty()) {
1✔
297
      notifyTerminated();
1✔
298
    }
299
  }
1✔
300

301
  @Override
302
  public synchronized void shutdown() {
303
    shutdown(Status.UNAVAILABLE.withDescription("InProcessTransport shutdown by the server-side"));
1✔
304
  }
1✔
305

306
  @Override
307
  public void shutdownNow(Status reason) {
308
    checkNotNull(reason, "reason");
1✔
309
    List<InProcessStream> streamsCopy;
310
    synchronized (this) {
1✔
311
      shutdown(reason);
1✔
312
      if (terminated) {
1✔
313
        return;
1✔
314
      }
315
      streamsCopy = new ArrayList<>(streams);
1✔
316
    }
1✔
317
    for (InProcessStream stream : streamsCopy) {
1✔
318
      stream.clientStream.cancel(reason);
1✔
319
    }
1✔
320
  }
1✔
321

322
  @Override
323
  public String toString() {
324
    return MoreObjects.toStringHelper(this)
×
325
        .add("logId", logId.getId())
×
326
        .add("address", address)
×
327
        .toString();
×
328
  }
329

330
  /** Returns the log id allocated for this transport in the constructor. */
  @Override
331
  public InternalLogId getLogId() {
332
    return logId;
1✔
333
  }
334

335
  /** Returns the transport attributes built once in the constructor. */
  @Override
336
  public Attributes getAttributes() {
337
    return attributes;
1✔
338
  }
339

340
  /**
   * Returns the scheduler obtained from the server scheduler pool in {@code start}. Unsynchronized
   * read; the field is reassigned from the pool's {@code returnObject} result on termination.
   */
  @Override
341
  public ScheduledExecutorService getScheduledExecutorService() {
342
    return serverScheduler;
1✔
343
  }
344

345
  @Override
346
  public ListenableFuture<SocketStats> getStats() {
347
    SettableFuture<SocketStats> ret = SettableFuture.create();
×
348
    ret.set(null);
×
349
    return ret;
×
350
  }
351

352
  private synchronized void notifyShutdown(Status s) {
353
    if (shutdown) {
1✔
354
      return;
×
355
    }
356
    shutdown = true;
1✔
357
    clientTransportListener.transportShutdown(s);
1✔
358
  }
1✔
359

360
  private synchronized void notifyTerminated() {
361
    if (terminated) {
1✔
362
      return;
×
363
    }
364
    terminated = true;
1✔
365
    if (serverScheduler != null) {
1✔
366
      serverScheduler = serverSchedulerPool.returnObject(serverScheduler);
1✔
367
    }
368
    clientTransportListener.transportTerminated();
1✔
369
    if (serverTransportListener != null) {
1✔
370
      serverTransportListener.transportTerminated();
1✔
371
    }
372
  }
1✔
373

374
  private static int metadataSize(Metadata metadata) {
375
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
376
    if (serialized == null) {
1✔
377
      return 0;
×
378
    }
379
    // Calculate based on SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2. We could use something
380
    // different, but it's "sane."
381
    long size = 0;
1✔
382
    for (int i = 0; i < serialized.length; i += 2) {
1✔
383
      size += 32 + serialized[i].length + serialized[i + 1].length;
1✔
384
    }
385
    size = Math.min(size, Integer.MAX_VALUE);
1✔
386
    return (int) size;
1✔
387
  }
388

389
  private class InProcessStream {
390
    private final InProcessClientStream clientStream;
391
    private final InProcessServerStream serverStream;
392
    private final CallOptions callOptions;
393
    private final Metadata headers;
394
    private final MethodDescriptor<?, ?> method;
395
    private volatile String authority;
396

397
    /**
     * Creates the paired client and server halves of one in-process call.
     *
     * @param method must not be null
     * @param headers must not be null
     * @param callOptions must not be null
     */
    private InProcessStream(
        MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
        String authority , StatsTraceContext statsTraceContext) {
      this.authority = authority;
      this.method = checkNotNull(method, "method");
      this.headers = checkNotNull(headers, "headers");
      this.callOptions = checkNotNull(callOptions, "callOptions");
      // Client half first, then the server half, which builds its own server trace context.
      this.clientStream = new InProcessClientStream(callOptions, statsTraceContext);
      this.serverStream = new InProcessServerStream(method, headers);
    }
1✔
407

408
    // Can be called multiple times due to races on both client and server closing at same time.
409
    private void streamClosed() {
410
      synchronized (InProcessTransport.this) {
1✔
411
        boolean justRemovedAnElement = streams.remove(this);
1✔
412
        if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
413
          inUseState.updateObjectInUse(this, false);
1✔
414
        }
415
        if (streams.isEmpty() && justRemovedAnElement) {
1✔
416
          if (shutdown) {
1✔
417
            notifyTerminated();
1✔
418
          }
419
        }
420
      }
1✔
421
    }
1✔
422

423
    private class InProcessServerStream implements ServerStream {
424
      final StatsTraceContext statsTraceCtx;
425
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
426
      private ClientStreamListener clientStreamListener;
427
      private final SynchronizationContext syncContext =
1✔
428
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
429
      @GuardedBy("this")
430
      private int clientRequested;
431
      @GuardedBy("this")
1✔
432
      private ArrayDeque<StreamListener.MessageProducer> clientReceiveQueue =
433
          new ArrayDeque<>();
434
      @GuardedBy("this")
435
      private Status clientNotifyStatus;
436
      @GuardedBy("this")
437
      private Metadata clientNotifyTrailers;
438
      // Only is intended to prevent double-close when client cancels.
439
      @GuardedBy("this")
440
      private boolean closed;
441
      @GuardedBy("this")
442
      private int outboundSeqNo;
443

444
      InProcessServerStream(MethodDescriptor<?, ?> method, Metadata headers) {
1✔
445
        statsTraceCtx = StatsTraceContext.newServerContext(
1✔
446
            serverStreamTracerFactories, method.getFullMethodName(), headers);
1✔
447
      }
1✔
448

449
      private synchronized void setListener(ClientStreamListener listener) {
450
        clientStreamListener = listener;
1✔
451
      }
1✔
452

453
      @Override
454
      public void setListener(ServerStreamListener serverStreamListener) {
455
        clientStream.setListener(serverStreamListener);
1✔
456
      }
1✔
457

458
      @Override
459
      public void request(int numMessages) {
460
        boolean onReady = clientStream.serverRequested(numMessages);
1✔
461
        if (onReady) {
1✔
462
          synchronized (this) {
1✔
463
            if (!closed) {
1✔
464
              syncContext.executeLater(() -> clientStreamListener.onReady());
1✔
465
            }
466
          }
1✔
467
        }
468
        syncContext.drain();
1✔
469
      }
1✔
470

471
      // This method is the only reason we have to synchronize field accesses.
472
      /**
473
       * Client requested more messages.
474
       *
475
       * @return whether onReady should be called on the server
476
       */
477
      private boolean clientRequested(int numMessages) {
478
        boolean previouslyReady;
479
        boolean nowReady;
480
        synchronized (this) {
1✔
481
          if (closed) {
1✔
482
            return false;
1✔
483
          }
484

485
          previouslyReady = clientRequested > 0;
1✔
486
          clientRequested += numMessages;
1✔
487
          while (clientRequested > 0 && !clientReceiveQueue.isEmpty()) {
1✔
488
            clientRequested--;
1✔
489
            StreamListener.MessageProducer producer = clientReceiveQueue.poll();
1✔
490
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
491
          }
1✔
492

493
          if (clientReceiveQueue.isEmpty() && clientNotifyStatus != null) {
1✔
494
            closed = true;
1✔
495
            clientStream.statsTraceCtx.clientInboundTrailers(clientNotifyTrailers);
1✔
496
            clientStream.statsTraceCtx.streamClosed(clientNotifyStatus);
1✔
497
            Status notifyStatus = this.clientNotifyStatus;
1✔
498
            Metadata notifyTrailers = this.clientNotifyTrailers;
1✔
499
            syncContext.executeLater(() ->
1✔
500
                clientStreamListener.closed(notifyStatus, RpcProgress.PROCESSED, notifyTrailers));
1✔
501
          }
502

503
          nowReady = clientRequested > 0;
1✔
504
        }
1✔
505

506
        syncContext.drain();
1✔
507
        return !previouslyReady && nowReady;
1✔
508
      }
509

510
      private void clientCancelled(Status status) {
511
        internalCancel(status);
1✔
512
      }
1✔
513

514
      @Override
515
      public void writeMessage(InputStream message) {
516
        synchronized (this) {
1✔
517
          if (closed) {
1✔
518
            return;
1✔
519
          }
520
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
521
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
522
          clientStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
523
          clientStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
524
          outboundSeqNo++;
1✔
525
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
526
          if (clientRequested > 0) {
1✔
527
            clientRequested--;
1✔
528
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
529
          } else {
530
            clientReceiveQueue.add(producer);
1✔
531
          }
532
        }
1✔
533

534
        syncContext.drain();
1✔
535
      }
1✔
536

537
      @Override
538
      public void flush() {}
1✔
539

540
      @Override
541
      public synchronized boolean isReady() {
542
        if (closed) {
1✔
543
          return false;
×
544
        }
545
        return clientRequested > 0;
1✔
546
      }
547

548
      @Override
549
      public void writeHeaders(Metadata headers, boolean flush) {
550
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
551
          int metadataSize = metadataSize(headers);
1✔
552
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
553
            Status serverStatus = Status.CANCELLED.withDescription("Client cancelled the RPC");
1✔
554
            clientStream.serverClosed(serverStatus, serverStatus);
1✔
555
            // Other transports provide very little information in this case. We go ahead and make a
556
            // Status, which may need to be updated if statuscodes.md is updated.
557
            Status failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
558
                String.format(
1✔
559
                    Locale.US,
560
                    "Response header metadata larger than %d: %d",
561
                    clientMaxInboundMetadataSize,
1✔
562
                    metadataSize));
1✔
563
            notifyClientClose(failedStatus, new Metadata());
1✔
564
            return;
1✔
565
          }
566
        }
567

568
        synchronized (this) {
1✔
569
          if (closed) {
1✔
570
            return;
1✔
571
          }
572

573
          clientStream.statsTraceCtx.clientInboundHeaders();
1✔
574
          syncContext.executeLater(() -> clientStreamListener.headersRead(headers));
1✔
575
        }
1✔
576
        syncContext.drain();
1✔
577
      }
1✔
578

579
      @Override
580
      public void close(Status status, Metadata trailers) {
581
        // clientStream.serverClosed must happen before clientStreamListener.closed, otherwise
582
        // clientStreamListener.closed can trigger clientStream.cancel (see code in
583
        // ClientCalls.blockingUnaryCall), which may race with clientStream.serverClosed as both are
584
        // calling internalCancel().
585
        clientStream.serverClosed(Status.OK, status);
1✔
586

587
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
588
          int statusSize = status.getDescription() == null ? 0 : status.getDescription().length();
1✔
589
          // Go ahead and throw in the status description's length, since that could be very long.
590
          int metadataSize = metadataSize(trailers) + statusSize;
1✔
591
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
592
            // Override the status for the client, but not the server. Transports do not guarantee
593
            // notifying the server of the failure.
594

595
            // Other transports provide very little information in this case. We go ahead and make a
596
            // Status, which may need to be updated if statuscodes.md is updated.
597
            status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
598
                String.format(
1✔
599
                    Locale.US,
600
                    "Response header metadata larger than %d: %d",
601
                    clientMaxInboundMetadataSize,
1✔
602
                    metadataSize));
1✔
603
            trailers = new Metadata();
1✔
604
          }
605
        }
606

607
        notifyClientClose(status, trailers);
1✔
608
      }
1✔
609

610
      /** clientStream.serverClosed() must be called before this method */
611
      private void notifyClientClose(Status status, Metadata trailers) {
612
        Status clientStatus = cleanStatus(status, includeCauseWithStatus);
1✔
613
        synchronized (this) {
1✔
614
          if (closed) {
1✔
615
            return;
1✔
616
          }
617
          if (clientReceiveQueue.isEmpty()) {
1✔
618
            closed = true;
1✔
619
            clientStream.statsTraceCtx.clientInboundTrailers(trailers);
1✔
620
            clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
621
            syncContext.executeLater(
1✔
622
                () -> clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, trailers));
1✔
623
          } else {
624
            clientNotifyStatus = clientStatus;
1✔
625
            clientNotifyTrailers = trailers;
1✔
626
          }
627
        }
1✔
628
        syncContext.drain();
1✔
629
        streamClosed();
1✔
630
      }
1✔
631

632
      @Override
633
      public void cancel(Status status) {
634
        if (!internalCancel(Status.CANCELLED.withDescription("server cancelled stream"))) {
1✔
635
          return;
1✔
636
        }
637
        clientStream.serverClosed(status, status);
1✔
638
        streamClosed();
1✔
639
      }
1✔
640

641
      private boolean internalCancel(Status clientStatus) {
642
        synchronized (this) {
1✔
643
          if (closed) {
1✔
644
            return false;
1✔
645
          }
646
          closed = true;
1✔
647
          StreamListener.MessageProducer producer;
648
          while ((producer = clientReceiveQueue.poll()) != null) {
1✔
649
            InputStream message;
650
            while ((message = producer.next()) != null) {
×
651
              try {
652
                message.close();
×
653
              } catch (Throwable t) {
×
654
                log.log(Level.WARNING, "Exception closing stream", t);
×
655
              }
×
656
            }
657
          }
×
658
          clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
659
          syncContext.executeLater(
1✔
660
              () ->
661
                  clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, new Metadata()));
1✔
662
        }
1✔
663
        syncContext.drain();
1✔
664
        return true;
1✔
665
      }
666

667
      @Override
668
      public void setMessageCompression(boolean enable) {
669
        // noop
670
      }
×
671

672
      @Override
673
      public void optimizeForDirectExecutor() {}
1✔
674

675
      @Override
676
      public void setCompressor(Compressor compressor) {}
1✔
677

678
      @Override
679
      public void setDecompressor(Decompressor decompressor) {}
×
680

681
      @Override public Attributes getAttributes() {
682
        return serverStreamAttributes;
1✔
683
      }
684

685
      @Override
686
      public String getAuthority() {
687
        return InProcessStream.this.authority;
1✔
688
      }
689

690
      @Override
691
      public StatsTraceContext statsTraceContext() {
692
        return statsTraceCtx;
1✔
693
      }
694

695
      @Override
696
      public int streamId() {
697
        return -1;
1✔
698
      }
699
    }
700

701
    private class InProcessClientStream implements ClientStream {
702
      final StatsTraceContext statsTraceCtx;
703
      final CallOptions callOptions;
704
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
705
      private ServerStreamListener serverStreamListener;
706
      private final SynchronizationContext syncContext =
1✔
707
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
708
      @GuardedBy("this")
709
      private int serverRequested;
710
      @GuardedBy("this")
1✔
711
      private ArrayDeque<StreamListener.MessageProducer> serverReceiveQueue =
712
          new ArrayDeque<>();
713
      @GuardedBy("this")
714
      private boolean serverNotifyHalfClose;
715
      // Only is intended to prevent double-close when server closes.
716
      @GuardedBy("this")
717
      private boolean closed;
718
      @GuardedBy("this")
719
      private int outboundSeqNo;
720

721
      InProcessClientStream(
722
          CallOptions callOptions, StatsTraceContext statsTraceContext) {
1✔
723
        this.callOptions = callOptions;
1✔
724
        statsTraceCtx = statsTraceContext;
1✔
725
      }
1✔
726

727
      private synchronized void setListener(ServerStreamListener listener) {
728
        this.serverStreamListener = listener;
1✔
729
      }
1✔
730

731
      @Override
732
      public void request(int numMessages) {
733
        boolean onReady = serverStream.clientRequested(numMessages);
1✔
734
        if (onReady) {
1✔
735
          synchronized (this) {
1✔
736
            if (!closed) {
1✔
737
              syncContext.executeLater(() -> serverStreamListener.onReady());
1✔
738
            }
739
          }
1✔
740
          syncContext.drain();
1✔
741
        }
742
      }
1✔
743

744
      // This method is the only reason we have to synchronize field accesses.
745
      /**
746
       * Client requested more messages.
747
       *
748
       * @return whether onReady should be called on the server
749
       */
750
      private boolean serverRequested(int numMessages) {
751
        boolean previouslyReady;
752
        boolean nowReady;
753
        synchronized (this) {
1✔
754
          if (closed) {
1✔
755
            return false;
1✔
756
          }
757
          previouslyReady = serverRequested > 0;
1✔
758
          serverRequested += numMessages;
1✔
759

760
          while (serverRequested > 0 && !serverReceiveQueue.isEmpty()) {
1✔
761
            serverRequested--;
1✔
762
            StreamListener.MessageProducer producer = serverReceiveQueue.poll();
1✔
763
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
764
          }
1✔
765

766
          if (serverReceiveQueue.isEmpty() && serverNotifyHalfClose) {
1✔
767
            serverNotifyHalfClose = false;
1✔
768
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
769
          }
770
          nowReady = serverRequested > 0;
1✔
771
        }
1✔
772
        syncContext.drain();
1✔
773
        return !previouslyReady && nowReady;
1✔
774
      }
775

776
      private void serverClosed(Status serverListenerStatus, Status serverTracerStatus) {
777
        internalCancel(serverListenerStatus, serverTracerStatus);
1✔
778
      }
1✔
779

780
      @Override
781
      public void writeMessage(InputStream message) {
782
        synchronized (this) {
1✔
783
          if (closed) {
1✔
784
            return;
1✔
785
          }
786
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
787
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
788
          serverStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
789
          serverStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
790
          outboundSeqNo++;
1✔
791
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
792
          if (serverRequested > 0) {
1✔
793
            serverRequested--;
1✔
794
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
795
          } else {
796
            serverReceiveQueue.add(producer);
1✔
797
          }
798
        }
1✔
799
        syncContext.drain();
1✔
800
      }
1✔
801

802
      @Override
803
      public void flush() {}
1✔
804

805
      @Override
806
      public synchronized boolean isReady() {
807
        if (closed) {
1✔
808
          return false;
×
809
        }
810
        return serverRequested > 0;
1✔
811
      }
812

813
      // Must be thread-safe for shutdownNow()
814
      @Override
815
      public void cancel(Status reason) {
816
        Status serverStatus = cleanStatus(reason, includeCauseWithStatus);
1✔
817
        if (!internalCancel(serverStatus, serverStatus)) {
1✔
818
          return;
1✔
819
        }
820
        serverStream.clientCancelled(reason);
1✔
821
        streamClosed();
1✔
822
      }
1✔
823

824
      private boolean internalCancel(
825
          Status serverListenerStatus, Status serverTracerStatus) {
826
        synchronized (this) {
1✔
827
          if (closed) {
1✔
828
            return false;
1✔
829
          }
830
          closed = true;
1✔
831

832
          StreamListener.MessageProducer producer;
833
          while ((producer = serverReceiveQueue.poll()) != null) {
1✔
834
            InputStream message;
835
            while ((message = producer.next()) != null) {
1✔
836
              try {
837
                message.close();
1✔
838
              } catch (Throwable t) {
×
839
                log.log(Level.WARNING, "Exception closing stream", t);
×
840
              }
1✔
841
            }
842
          }
1✔
843
          serverStream.statsTraceCtx.streamClosed(serverTracerStatus);
1✔
844
          syncContext.executeLater(() -> serverStreamListener.closed(serverListenerStatus));
1✔
845
        }
1✔
846
        syncContext.drain();
1✔
847
        return true;
1✔
848
      }
849

850
      @Override
851
      public void halfClose() {
852
        synchronized (this) {
1✔
853
          if (closed) {
1✔
854
            return;
1✔
855
          }
856
          if (serverReceiveQueue.isEmpty()) {
1✔
857
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
858
          } else {
859
            serverNotifyHalfClose = true;
1✔
860
          }
861
        }
1✔
862
        syncContext.drain();
1✔
863
      }
1✔
864

865
      @Override
866
      public void setMessageCompression(boolean enable) {}
×
867

868
      @Override
869
      public void setAuthority(String string) {
870
        InProcessStream.this.authority = string;
×
871
      }
×
872

873
      @Override
874
      public void start(ClientStreamListener listener) {
875
        serverStream.setListener(listener);
1✔
876

877
        synchronized (InProcessTransport.this) {
1✔
878
          statsTraceCtx.clientOutboundHeaders();
1✔
879
          streams.add(InProcessTransport.InProcessStream.this);
1✔
880
          if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
881
            inUseState.updateObjectInUse(InProcessTransport.InProcessStream.this, true);
1✔
882
          }
883
          serverTransportListener.streamCreated(serverStream, method.getFullMethodName(), headers);
1✔
884
        }
1✔
885
      }
1✔
886

887
      @Override
888
      public Attributes getAttributes() {
889
        return attributes;
1✔
890
      }
891

892
      @Override
893
      public void optimizeForDirectExecutor() {}
1✔
894

895
      @Override
896
      public void setCompressor(Compressor compressor) {}
1✔
897

898
      @Override
899
      public void setFullStreamDecompression(boolean fullStreamDecompression) {}
×
900

901
      @Override
902
      public void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {}
1✔
903

904
      @Override
905
      public void setMaxInboundMessageSize(int maxSize) {}
1✔
906

907
      @Override
908
      public void setMaxOutboundMessageSize(int maxSize) {}
1✔
909

910
      @Override
911
      public void setDeadline(Deadline deadline) {
912
        headers.discardAll(TIMEOUT_KEY);
1✔
913
        long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
1✔
914
        headers.put(TIMEOUT_KEY, effectiveTimeout);
1✔
915
      }
1✔
916

917
      @Override
918
      public void appendTimeoutInsight(InsightBuilder insight) {
919
      }
1✔
920
    }
921
  }
922

923
  /**
924
   * Returns a new status with the same code and description.
925
   * If includeCauseWithStatus is true, cause is also included.
926
   *
927
   * <p>For InProcess transport to behave in the same way as the other transports,
928
   * when exchanging statuses between client and server and vice versa,
929
   * the cause should be excluded from the status.
930
   * For easier debugging, the status may be optionally included.
931
   */
932
  private static Status cleanStatus(Status status, boolean includeCauseWithStatus) {
933
    if (status == null) {
1✔
934
      return null;
×
935
    }
936
    Status clientStatus = Status
1✔
937
        .fromCodeValue(status.getCode().value())
1✔
938
        .withDescription(status.getDescription());
1✔
939
    if (includeCauseWithStatus) {
1✔
940
      clientStatus = clientStatus.withCause(status.getCause());
1✔
941
    }
942
    return clientStatus;
1✔
943
  }
944

945
  private static class SingleMessageProducer implements StreamListener.MessageProducer {
946
    private InputStream message;
947

948
    private SingleMessageProducer(InputStream message) {
1✔
949
      this.message = message;
1✔
950
    }
1✔
951

952
    @Nullable
953
    @Override
954
    public InputStream next() {
955
      InputStream messageToReturn = message;
1✔
956
      message = null;
1✔
957
      return messageToReturn;
1✔
958
    }
959
  }
960
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc