• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19512

17 Oct 2024 04:47PM UTC coverage: 84.657% (-0.01%) from 84.668%
#19512

push

github

ejona86
inprocess: Delete "standalone" internal transport

This had been used for a time with a combined inprocess+binder server.
However, just having multiple servers worked fine and this is no longer
used/needed.

33791 of 39915 relevant lines covered (84.66%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.27
/../inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.inprocess;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
21
import static java.lang.Math.max;
22

23
import com.google.common.base.MoreObjects;
24
import com.google.common.io.ByteStreams;
25
import com.google.common.util.concurrent.ListenableFuture;
26
import com.google.common.util.concurrent.SettableFuture;
27
import io.grpc.Attributes;
28
import io.grpc.CallOptions;
29
import io.grpc.ClientStreamTracer;
30
import io.grpc.Compressor;
31
import io.grpc.Deadline;
32
import io.grpc.Decompressor;
33
import io.grpc.DecompressorRegistry;
34
import io.grpc.Grpc;
35
import io.grpc.InternalChannelz.SocketStats;
36
import io.grpc.InternalLogId;
37
import io.grpc.InternalMetadata;
38
import io.grpc.KnownLength;
39
import io.grpc.Metadata;
40
import io.grpc.MethodDescriptor;
41
import io.grpc.SecurityLevel;
42
import io.grpc.ServerStreamTracer;
43
import io.grpc.Status;
44
import io.grpc.SynchronizationContext;
45
import io.grpc.internal.ClientStream;
46
import io.grpc.internal.ClientStreamListener;
47
import io.grpc.internal.ClientStreamListener.RpcProgress;
48
import io.grpc.internal.ConnectionClientTransport;
49
import io.grpc.internal.GrpcAttributes;
50
import io.grpc.internal.GrpcUtil;
51
import io.grpc.internal.InUseStateAggregator;
52
import io.grpc.internal.InsightBuilder;
53
import io.grpc.internal.ManagedClientTransport;
54
import io.grpc.internal.NoopClientStream;
55
import io.grpc.internal.ObjectPool;
56
import io.grpc.internal.ServerStream;
57
import io.grpc.internal.ServerStreamListener;
58
import io.grpc.internal.ServerTransport;
59
import io.grpc.internal.ServerTransportListener;
60
import io.grpc.internal.StatsTraceContext;
61
import io.grpc.internal.StreamListener;
62
import java.io.ByteArrayInputStream;
63
import java.io.InputStream;
64
import java.net.SocketAddress;
65
import java.util.ArrayDeque;
66
import java.util.ArrayList;
67
import java.util.Collections;
68
import java.util.IdentityHashMap;
69
import java.util.List;
70
import java.util.Locale;
71
import java.util.Set;
72
import java.util.concurrent.Executor;
73
import java.util.concurrent.ScheduledExecutorService;
74
import java.util.concurrent.TimeUnit;
75
import java.util.logging.Level;
76
import java.util.logging.Logger;
77
import javax.annotation.CheckReturnValue;
78
import javax.annotation.Nullable;
79
import javax.annotation.concurrent.GuardedBy;
80
import javax.annotation.concurrent.ThreadSafe;
81

82
@ThreadSafe
83
final class InProcessTransport implements ServerTransport, ConnectionClientTransport {
84
  private static final Logger log = Logger.getLogger(InProcessTransport.class.getName());
1✔
85

86
  private final InternalLogId logId;
87
  private final SocketAddress address;
88
  private final int clientMaxInboundMetadataSize;
89
  private final String authority;
90
  private final String userAgent;
91
  private int serverMaxInboundMetadataSize;
92
  private final boolean includeCauseWithStatus;
93
  private ObjectPool<ScheduledExecutorService> serverSchedulerPool;
94
  private ScheduledExecutorService serverScheduler;
95
  private ServerTransportListener serverTransportListener;
96
  private Attributes serverStreamAttributes;
97
  private ManagedClientTransport.Listener clientTransportListener;
98
  // The size is assumed from the sender's side.
99
  private final long assumedMessageSize;
100
  @GuardedBy("this")
101
  private boolean shutdown;
102
  @GuardedBy("this")
103
  private boolean terminated;
104
  @GuardedBy("this")
105
  private Status shutdownStatus;
106
  @GuardedBy("this")
1✔
107
  private final Set<InProcessStream> streams = Collections.newSetFromMap(
1✔
108
          new IdentityHashMap<InProcessStream, Boolean>());
109
  @GuardedBy("this")
110
  private List<ServerStreamTracer.Factory> serverStreamTracerFactories;
111
  private Attributes attributes;
112

113
  private Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
1✔
114
      new Thread.UncaughtExceptionHandler() {
1✔
115
        @Override
116
        public void uncaughtException(Thread t, Throwable e) {
117
          if (e instanceof Error) {
×
118
            throw new Error(e);
×
119
          }
120
          throw new RuntimeException(e);
×
121
        }
122
      };
123

124

125
  @GuardedBy("this")
1✔
126
  private final InUseStateAggregator<InProcessStream> inUseState =
127
      new InUseStateAggregator<InProcessStream>() {
1✔
128
        @Override
129
        protected void handleInUse() {
130
          clientTransportListener.transportInUse(true);
1✔
131
        }
1✔
132

133
        @Override
134
        protected void handleNotInUse() {
135
          clientTransportListener.transportInUse(false);
1✔
136
        }
1✔
137
      };
138

139
  /**
   * Initializes the transport's fields for {@code address}. The server lookup is not done here;
   * it happens in {@link #start}.
   *
   * @param address in-process address, also published as both the local and the remote
   *     transport address attribute
   * @param maxInboundMetadataSize limit applied to metadata delivered to the client
   * @param authority authority given to each stream created by this transport
   * @param userAgent application user agent, combined with the "inprocess" transport name
   * @param eagAttrs attributes stored under {@code ATTR_CLIENT_EAG_ATTRS}; must not be null
   * @param includeCauseWithStatus forwarded to {@code cleanStatus} when a status is handed to
   *     the client
   * @param assumedMessageSize size reported to tracers for every message, or -1 to measure each
   *     message instead
   */
  public InProcessTransport(SocketAddress address, int maxInboundMetadataSize, String authority,
      String userAgent, Attributes eagAttrs,
      boolean includeCauseWithStatus, long assumedMessageSize) {
    checkNotNull(eagAttrs, "eagAttrs");

    this.address = address;
    this.authority = authority;
    this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
    this.includeCauseWithStatus = includeCauseWithStatus;
    this.assumedMessageSize = assumedMessageSize;
    this.userAgent = GrpcUtil.getGrpcUserAgent("inprocess", userAgent);

    this.attributes = Attributes.newBuilder()
        .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs)
        .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
        .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
        .build();
    this.logId = InternalLogId.allocate(getClass(), address.toString());
  }
1✔
157

158
  @CheckReturnValue
159
  @Override
160
  public synchronized Runnable start(ManagedClientTransport.Listener listener) {
161
    this.clientTransportListener = listener;
1✔
162
    InProcessServer server = InProcessServer.findServer(address);
1✔
163
    if (server != null) {
1✔
164
      serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize();
1✔
165
      serverSchedulerPool = server.getScheduledExecutorServicePool();
1✔
166
      serverScheduler = serverSchedulerPool.getObject();
1✔
167
      serverStreamTracerFactories = server.getStreamTracerFactories();
1✔
168
      // Must be semi-initialized; past this point, can begin receiving requests
169
      serverTransportListener = server.register(this);
1✔
170
    }
171
    if (serverTransportListener == null) {
1✔
172
      shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + address);
1✔
173
      final Status localShutdownStatus = shutdownStatus;
1✔
174
      return new Runnable() {
1✔
175
        @Override
176
        public void run() {
177
          synchronized (InProcessTransport.this) {
1✔
178
            notifyShutdown(localShutdownStatus);
1✔
179
            notifyTerminated();
1✔
180
          }
1✔
181
        }
1✔
182
      };
183
    }
184
    Attributes serverTransportAttrs = Attributes.newBuilder()
1✔
185
        .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
186
        .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
187
        .build();
1✔
188
    serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
1✔
189
    attributes = clientTransportListener.filterTransport(attributes);
1✔
190
    clientTransportListener.transportReady();
1✔
191
    return null;
1✔
192
  }
193

194
  @Override
195
  public synchronized ClientStream newStream(
196
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
197
      ClientStreamTracer[] tracers) {
198
    StatsTraceContext statsTraceContext =
1✔
199
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
200
    if (shutdownStatus != null) {
1✔
201
      return failedClientStream(statsTraceContext, shutdownStatus);
1✔
202
    }
203

204
    headers.put(GrpcUtil.USER_AGENT_KEY, userAgent);
1✔
205

206
    if (serverMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
207
      int metadataSize = metadataSize(headers);
1✔
208
      if (metadataSize > serverMaxInboundMetadataSize) {
1✔
209
        // Other transports would compute a status with:
210
        //   GrpcUtil.httpStatusToGrpcStatus(431 /* Request Header Fields Too Large */);
211
        // However, that isn't handled specially today, so we'd leak HTTP-isms even though we're
212
        // in-process. We go ahead and make a Status, which may need to be updated if
213
        // statuscodes.md is updated.
214
        Status status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
215
            String.format(
1✔
216
                Locale.US,
217
                "Request metadata larger than %d: %d",
218
                serverMaxInboundMetadataSize,
1✔
219
                metadataSize));
1✔
220
        return failedClientStream(statsTraceContext, status);
1✔
221
      }
222
    }
223

224
    return new InProcessStream(method, headers, callOptions, authority, statsTraceContext)
1✔
225
        .clientStream;
1✔
226
  }
227

228
  private ClientStream failedClientStream(
229
      final StatsTraceContext statsTraceCtx, final Status status) {
230
    return new NoopClientStream() {
1✔
231
        @Override
232
        public void start(ClientStreamListener listener) {
233
          statsTraceCtx.clientOutboundHeaders();
1✔
234
          statsTraceCtx.streamClosed(status);
1✔
235
          listener.closed(status, RpcProgress.PROCESSED, new Metadata());
1✔
236
        }
1✔
237
      };
238
  }
239

240
  @Override
241
  public synchronized void ping(final PingCallback callback, Executor executor) {
242
    if (terminated) {
1✔
243
      final Status shutdownStatus = this.shutdownStatus;
1✔
244
      executor.execute(new Runnable() {
1✔
245
        @Override
246
        public void run() {
247
          callback.onFailure(shutdownStatus.asRuntimeException());
1✔
248
        }
1✔
249
      });
250
    } else {
1✔
251
      executor.execute(new Runnable() {
1✔
252
        @Override
253
        public void run() {
254
          callback.onSuccess(0);
1✔
255
        }
1✔
256
      });
257
    }
258
  }
1✔
259

260
  @Override
261
  public synchronized void shutdown(Status reason) {
262
    // Can be called multiple times: once for ManagedClientTransport, once for ServerTransport.
263
    if (shutdown) {
1✔
264
      return;
1✔
265
    }
266
    shutdownStatus = reason;
1✔
267
    notifyShutdown(reason);
1✔
268
    if (streams.isEmpty()) {
1✔
269
      notifyTerminated();
1✔
270
    }
271
  }
1✔
272

273
  @Override
274
  public synchronized void shutdown() {
275
    shutdown(Status.UNAVAILABLE.withDescription("InProcessTransport shutdown by the server-side"));
1✔
276
  }
1✔
277

278
  @Override
279
  public void shutdownNow(Status reason) {
280
    checkNotNull(reason, "reason");
1✔
281
    List<InProcessStream> streamsCopy;
282
    synchronized (this) {
1✔
283
      shutdown(reason);
1✔
284
      if (terminated) {
1✔
285
        return;
1✔
286
      }
287
      streamsCopy = new ArrayList<>(streams);
1✔
288
    }
1✔
289
    for (InProcessStream stream : streamsCopy) {
1✔
290
      stream.clientStream.cancel(reason);
1✔
291
    }
1✔
292
  }
1✔
293

294
  @Override
295
  public String toString() {
296
    return MoreObjects.toStringHelper(this)
×
297
        .add("logId", logId.getId())
×
298
        .add("address", address)
×
299
        .toString();
×
300
  }
301

302
  @Override
303
  public InternalLogId getLogId() {
304
    return logId;
1✔
305
  }
306

307
  @Override
308
  public Attributes getAttributes() {
309
    return attributes;
1✔
310
  }
311

312
  @Override
313
  public ScheduledExecutorService getScheduledExecutorService() {
314
    return serverScheduler;
1✔
315
  }
316

317
  @Override
318
  public ListenableFuture<SocketStats> getStats() {
319
    SettableFuture<SocketStats> ret = SettableFuture.create();
×
320
    ret.set(null);
×
321
    return ret;
×
322
  }
323

324
  private synchronized void notifyShutdown(Status s) {
325
    if (shutdown) {
1✔
326
      return;
×
327
    }
328
    shutdown = true;
1✔
329
    clientTransportListener.transportShutdown(s);
1✔
330
  }
1✔
331

332
  private synchronized void notifyTerminated() {
333
    if (terminated) {
1✔
334
      return;
×
335
    }
336
    terminated = true;
1✔
337
    if (serverScheduler != null) {
1✔
338
      serverScheduler = serverSchedulerPool.returnObject(serverScheduler);
1✔
339
    }
340
    clientTransportListener.transportTerminated();
1✔
341
    if (serverTransportListener != null) {
1✔
342
      serverTransportListener.transportTerminated();
1✔
343
    }
344
  }
1✔
345

346
  private static int metadataSize(Metadata metadata) {
347
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
348
    if (serialized == null) {
1✔
349
      return 0;
×
350
    }
351
    // Calculate based on SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2. We could use something
352
    // different, but it's "sane."
353
    long size = 0;
1✔
354
    for (int i = 0; i < serialized.length; i += 2) {
1✔
355
      size += 32 + serialized[i].length + serialized[i + 1].length;
1✔
356
    }
357
    size = Math.min(size, Integer.MAX_VALUE);
1✔
358
    return (int) size;
1✔
359
  }
360

361
  private class InProcessStream {
362
    private final InProcessClientStream clientStream;
363
    private final InProcessServerStream serverStream;
364
    private final CallOptions callOptions;
365
    private final Metadata headers;
366
    private final MethodDescriptor<?, ?> method;
367
    private volatile String authority;
368

369
    private InProcessStream(
370
        MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
371
        String authority , StatsTraceContext statsTraceContext) {
1✔
372
      this.method = checkNotNull(method, "method");
1✔
373
      this.headers = checkNotNull(headers, "headers");
1✔
374
      this.callOptions = checkNotNull(callOptions, "callOptions");
1✔
375
      this.authority = authority;
1✔
376
      this.clientStream = new InProcessClientStream(callOptions, statsTraceContext);
1✔
377
      this.serverStream = new InProcessServerStream(method, headers);
1✔
378
    }
1✔
379

380
    // Can be called multiple times due to races on both client and server closing at same time.
381
    private void streamClosed() {
382
      synchronized (InProcessTransport.this) {
1✔
383
        boolean justRemovedAnElement = streams.remove(this);
1✔
384
        if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
385
          inUseState.updateObjectInUse(this, false);
1✔
386
        }
387
        if (streams.isEmpty() && justRemovedAnElement) {
1✔
388
          if (shutdown) {
1✔
389
            notifyTerminated();
1✔
390
          }
391
        }
392
      }
1✔
393
    }
1✔
394

395
    private class InProcessServerStream implements ServerStream {
396
      final StatsTraceContext statsTraceCtx;
397
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
398
      private ClientStreamListener clientStreamListener;
399
      private final SynchronizationContext syncContext =
1✔
400
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
401
      @GuardedBy("this")
402
      private int clientRequested;
403
      @GuardedBy("this")
1✔
404
      private ArrayDeque<StreamListener.MessageProducer> clientReceiveQueue =
405
          new ArrayDeque<>();
406
      @GuardedBy("this")
407
      private Status clientNotifyStatus;
408
      @GuardedBy("this")
409
      private Metadata clientNotifyTrailers;
410
      // Only is intended to prevent double-close when client cancels.
411
      @GuardedBy("this")
412
      private boolean closed;
413
      @GuardedBy("this")
414
      private int outboundSeqNo;
415

416
      /**
       * Creates the server-side stats context from the transport's server tracer factories, the
       * method's full name and the request headers.
       */
      InProcessServerStream(MethodDescriptor<?, ?> method, Metadata headers) {
        String fullMethodName = method.getFullMethodName();
        this.statsTraceCtx = StatsTraceContext.newServerContext(
            serverStreamTracerFactories, fullMethodName, headers);
      }
1✔
420

421
      private synchronized void setListener(ClientStreamListener listener) {
422
        clientStreamListener = listener;
1✔
423
      }
1✔
424

425
      @Override
426
      public void setListener(ServerStreamListener serverStreamListener) {
427
        clientStream.setListener(serverStreamListener);
1✔
428
      }
1✔
429

430
      @Override
431
      public void request(int numMessages) {
432
        boolean onReady = clientStream.serverRequested(numMessages);
1✔
433
        if (onReady) {
1✔
434
          synchronized (this) {
1✔
435
            if (!closed) {
1✔
436
              syncContext.executeLater(() -> clientStreamListener.onReady());
1✔
437
            }
438
          }
1✔
439
        }
440
        syncContext.drain();
1✔
441
      }
1✔
442

443
      // This method is the only reason we have to synchronize field accesses.
444
      /**
445
       * Client requested more messages.
446
       *
447
       * @return whether onReady should be called on the server
448
       */
449
      private boolean clientRequested(int numMessages) {
450
        boolean previouslyReady;
451
        boolean nowReady;
452
        synchronized (this) {
1✔
453
          if (closed) {
1✔
454
            return false;
1✔
455
          }
456

457
          previouslyReady = clientRequested > 0;
1✔
458
          clientRequested += numMessages;
1✔
459
          while (clientRequested > 0 && !clientReceiveQueue.isEmpty()) {
1✔
460
            clientRequested--;
1✔
461
            StreamListener.MessageProducer producer = clientReceiveQueue.poll();
1✔
462
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
463
          }
1✔
464

465
          if (clientReceiveQueue.isEmpty() && clientNotifyStatus != null) {
1✔
466
            closed = true;
1✔
467
            clientStream.statsTraceCtx.clientInboundTrailers(clientNotifyTrailers);
1✔
468
            clientStream.statsTraceCtx.streamClosed(clientNotifyStatus);
1✔
469
            Status notifyStatus = this.clientNotifyStatus;
1✔
470
            Metadata notifyTrailers = this.clientNotifyTrailers;
1✔
471
            syncContext.executeLater(() ->
1✔
472
                clientStreamListener.closed(notifyStatus, RpcProgress.PROCESSED, notifyTrailers));
1✔
473
          }
474

475
          nowReady = clientRequested > 0;
1✔
476
        }
1✔
477

478
        syncContext.drain();
1✔
479
        return !previouslyReady && nowReady;
1✔
480
      }
481

482
      private void clientCancelled(Status status) {
483
        internalCancel(status);
1✔
484
      }
1✔
485

486
      @Override
487
      public void writeMessage(InputStream message) {
488
        long messageLength;
489
        try {
490
          if (assumedMessageSize != -1) {
1✔
491
            messageLength = assumedMessageSize;
1✔
492
          } else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
1✔
493
            messageLength = message.available();
1✔
494
          } else {
495
            InputStream oldMessage = message;
×
496
            byte[] payload = ByteStreams.toByteArray(message);
×
497
            messageLength = payload.length;
×
498
            message = new ByteArrayInputStream(payload);
×
499
            oldMessage.close();
×
500
          }
501
        } catch (Exception e) {
×
502
          throw new RuntimeException("Error processing the message length", e);
×
503
        }
1✔
504
        synchronized (this) {
1✔
505
          if (closed) {
1✔
506
            return;
1✔
507
          }
508
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
509
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
510
          clientStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
511
          clientStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
512
          statsTraceCtx.outboundUncompressedSize(messageLength);
1✔
513
          statsTraceCtx.outboundWireSize(messageLength);
1✔
514
          // messageLength should be same at receiver's end as no actual wire is involved.
515
          clientStream.statsTraceCtx.inboundUncompressedSize(messageLength);
1✔
516
          clientStream.statsTraceCtx.inboundWireSize(messageLength);
1✔
517
          outboundSeqNo++;
1✔
518
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
519
          if (clientRequested > 0) {
1✔
520
            clientRequested--;
1✔
521
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
522
          } else {
523
            clientReceiveQueue.add(producer);
1✔
524
          }
525
        }
1✔
526

527
        syncContext.drain();
1✔
528
      }
1✔
529

530
      @Override
531
      public void flush() {}
1✔
532

533
      @Override
534
      public synchronized boolean isReady() {
535
        if (closed) {
1✔
536
          return false;
×
537
        }
538
        return clientRequested > 0;
1✔
539
      }
540

541
      @Override
542
      public void writeHeaders(Metadata headers, boolean flush) {
543
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
544
          int metadataSize = metadataSize(headers);
1✔
545
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
546
            Status serverStatus = Status.CANCELLED.withDescription("Client cancelled the RPC");
1✔
547
            clientStream.serverClosed(serverStatus, serverStatus);
1✔
548
            // Other transports provide very little information in this case. We go ahead and make a
549
            // Status, which may need to be updated if statuscodes.md is updated.
550
            Status failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
551
                String.format(
1✔
552
                    Locale.US,
553
                    "Response header metadata larger than %d: %d",
554
                    clientMaxInboundMetadataSize,
1✔
555
                    metadataSize));
1✔
556
            notifyClientClose(failedStatus, new Metadata());
1✔
557
            return;
1✔
558
          }
559
        }
560

561
        synchronized (this) {
1✔
562
          if (closed) {
1✔
563
            return;
1✔
564
          }
565

566
          clientStream.statsTraceCtx.clientInboundHeaders(headers);
1✔
567
          syncContext.executeLater(() -> clientStreamListener.headersRead(headers));
1✔
568
        }
1✔
569
        syncContext.drain();
1✔
570
      }
1✔
571

572
      @Override
573
      public void close(Status status, Metadata trailers) {
574
        // clientStream.serverClosed must happen before clientStreamListener.closed, otherwise
575
        // clientStreamListener.closed can trigger clientStream.cancel (see code in
576
        // ClientCalls.blockingUnaryCall), which may race with clientStream.serverClosed as both are
577
        // calling internalCancel().
578
        clientStream.serverClosed(Status.OK, status);
1✔
579

580
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
581
          int statusSize = status.getDescription() == null ? 0 : status.getDescription().length();
1✔
582
          // Go ahead and throw in the status description's length, since that could be very long.
583
          int metadataSize = metadataSize(trailers) + statusSize;
1✔
584
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
585
            // Override the status for the client, but not the server. Transports do not guarantee
586
            // notifying the server of the failure.
587

588
            // Other transports provide very little information in this case. We go ahead and make a
589
            // Status, which may need to be updated if statuscodes.md is updated.
590
            status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
591
                String.format(
1✔
592
                    Locale.US,
593
                    "Response header metadata larger than %d: %d",
594
                    clientMaxInboundMetadataSize,
1✔
595
                    metadataSize));
1✔
596
            trailers = new Metadata();
1✔
597
          }
598
        }
599

600
        notifyClientClose(status, trailers);
1✔
601
      }
1✔
602

603
      /** clientStream.serverClosed() must be called before this method */
604
      private void notifyClientClose(Status status, Metadata trailers) {
605
        Status clientStatus = cleanStatus(status, includeCauseWithStatus);
1✔
606
        synchronized (this) {
1✔
607
          if (closed) {
1✔
608
            return;
1✔
609
          }
610
          if (clientReceiveQueue.isEmpty()) {
1✔
611
            closed = true;
1✔
612
            clientStream.statsTraceCtx.clientInboundTrailers(trailers);
1✔
613
            clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
614
            syncContext.executeLater(
1✔
615
                () -> clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, trailers));
1✔
616
          } else {
617
            clientNotifyStatus = clientStatus;
1✔
618
            clientNotifyTrailers = trailers;
1✔
619
          }
620
        }
1✔
621
        syncContext.drain();
1✔
622
        streamClosed();
1✔
623
      }
1✔
624

625
      @Override
626
      public void cancel(Status status) {
627
        if (!internalCancel(Status.CANCELLED.withDescription("server cancelled stream"))) {
1✔
628
          return;
1✔
629
        }
630
        clientStream.serverClosed(status, status);
1✔
631
        streamClosed();
1✔
632
      }
1✔
633

634
      private boolean internalCancel(Status clientStatus) {
635
        synchronized (this) {
1✔
636
          if (closed) {
1✔
637
            return false;
1✔
638
          }
639
          closed = true;
1✔
640
          StreamListener.MessageProducer producer;
641
          while ((producer = clientReceiveQueue.poll()) != null) {
1✔
642
            InputStream message;
643
            while ((message = producer.next()) != null) {
×
644
              try {
645
                message.close();
×
646
              } catch (Throwable t) {
×
647
                log.log(Level.WARNING, "Exception closing stream", t);
×
648
              }
×
649
            }
650
          }
×
651
          clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
652
          syncContext.executeLater(
1✔
653
              () ->
654
                  clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, new Metadata()));
1✔
655
        }
1✔
656
        syncContext.drain();
1✔
657
        return true;
1✔
658
      }
659

660
      @Override
661
      public void setMessageCompression(boolean enable) {
662
        // noop
663
      }
×
664

665
      @Override
666
      public void optimizeForDirectExecutor() {}
1✔
667

668
      @Override
669
      public void setCompressor(Compressor compressor) {}
1✔
670

671
      @Override
672
      public void setDecompressor(Decompressor decompressor) {}
×
673

674
      @Override public Attributes getAttributes() {
675
        return serverStreamAttributes;
1✔
676
      }
677

678
      @Override
679
      public String getAuthority() {
680
        return InProcessStream.this.authority;
1✔
681
      }
682

683
      @Override
684
      public StatsTraceContext statsTraceContext() {
685
        return statsTraceCtx;
1✔
686
      }
687

688
      @Override
689
      public int streamId() {
690
        return -1;
1✔
691
      }
692

693
      @Override
694
      public void setOnReadyThreshold(int numBytes) {
695
        // noop
696
      }
×
697
    }
698

699
    private class InProcessClientStream implements ClientStream {
700
      final StatsTraceContext statsTraceCtx;
701
      final CallOptions callOptions;
702
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
703
      private ServerStreamListener serverStreamListener;
704
      private final SynchronizationContext syncContext =
1✔
705
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
706
      @GuardedBy("this")
707
      private int serverRequested;
708
      @GuardedBy("this")
1✔
709
      private ArrayDeque<StreamListener.MessageProducer> serverReceiveQueue =
710
          new ArrayDeque<>();
711
      @GuardedBy("this")
712
      private boolean serverNotifyHalfClose;
713
      // Only is intended to prevent double-close when server closes.
714
      @GuardedBy("this")
715
      private boolean closed;
716
      @GuardedBy("this")
717
      private int outboundSeqNo;
718

719
      /** Stores the call options and the client-side stats context for this stream. */
      InProcessClientStream(
          CallOptions callOptions, StatsTraceContext statsTraceContext) {
        this.statsTraceCtx = statsTraceContext;
        this.callOptions = callOptions;
      }
1✔
724

725
      private synchronized void setListener(ServerStreamListener listener) {
726
        this.serverStreamListener = listener;
1✔
727
      }
1✔
728

729
      @Override
730
      public void request(int numMessages) {
731
        boolean onReady = serverStream.clientRequested(numMessages);
1✔
732
        if (onReady) {
1✔
733
          synchronized (this) {
1✔
734
            if (!closed) {
1✔
735
              syncContext.executeLater(() -> serverStreamListener.onReady());
1✔
736
            }
737
          }
1✔
738
          syncContext.drain();
1✔
739
        }
740
      }
1✔
741

742
      // This method is the only reason we have to synchronize field accesses.
743
      /**
744
       * Server requested more messages.
745
       *
746
       * @return whether onReady should be called on the client
747
       */
748
      private boolean serverRequested(int numMessages) {
749
        boolean previouslyReady;
750
        boolean nowReady;
751
        synchronized (this) {
1✔
752
          if (closed) {
1✔
753
            return false;
1✔
754
          }
755
          previouslyReady = serverRequested > 0;
1✔
756
          serverRequested += numMessages;
1✔
757

758
          while (serverRequested > 0 && !serverReceiveQueue.isEmpty()) {
1✔
759
            serverRequested--;
1✔
760
            StreamListener.MessageProducer producer = serverReceiveQueue.poll();
1✔
761
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
762
          }
1✔
763

764
          if (serverReceiveQueue.isEmpty() && serverNotifyHalfClose) {
1✔
765
            serverNotifyHalfClose = false;
1✔
766
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
767
          }
768
          nowReady = serverRequested > 0;
1✔
769
        }
1✔
770
        syncContext.drain();
1✔
771
        return !previouslyReady && nowReady;
1✔
772
      }
773

774
      private void serverClosed(Status serverListenerStatus, Status serverTracerStatus) {
775
        internalCancel(serverListenerStatus, serverTracerStatus);
1✔
776
      }
1✔
777

778
      @Override
779
      public void writeMessage(InputStream message) {
780
        long messageLength;
781
        try {
782
          if (assumedMessageSize != -1) {
1✔
783
            messageLength = assumedMessageSize;
1✔
784
          } else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
1✔
785
            messageLength = message.available();
1✔
786
          } else {
787
            InputStream oldMessage = message;
×
788
            byte[] payload = ByteStreams.toByteArray(message);
×
789
            messageLength = payload.length;
×
790
            message = new ByteArrayInputStream(payload);
×
791
            oldMessage.close();
×
792
          }
793
        } catch (Exception e) {
×
794
          throw new RuntimeException("Error processing the message length", e);
×
795
        }
1✔
796
        synchronized (this) {
1✔
797
          if (closed) {
1✔
798
            return;
1✔
799
          }
800
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
801
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
802
          serverStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
803
          serverStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
804
          statsTraceCtx.outboundUncompressedSize(messageLength);
1✔
805
          statsTraceCtx.outboundWireSize(messageLength);
1✔
806
          // messageLength should be same at receiver's end as no actual wire is involved.
807
          serverStream.statsTraceCtx.inboundUncompressedSize(messageLength);
1✔
808
          serverStream.statsTraceCtx.inboundWireSize(messageLength);
1✔
809
          outboundSeqNo++;
1✔
810
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
811
          if (serverRequested > 0) {
1✔
812
            serverRequested--;
1✔
813
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
814
          } else {
815
            serverReceiveQueue.add(producer);
1✔
816
          }
817
        }
1✔
818
        syncContext.drain();
1✔
819
      }
1✔
820

821
      @Override
822
      // Intentionally a no-op: writeMessage() already delivers or queues each message and drains
      // the sync context itself, so nothing is buffered here that a flush could push out.
      public void flush() {}
1✔
823

824
      @Override
825
      public synchronized boolean isReady() {
826
        if (closed) {
1✔
827
          return false;
1✔
828
        }
829
        return serverRequested > 0;
1✔
830
      }
831

832
      // Must be thread-safe for shutdownNow()
833
      @Override
834
      public void cancel(Status reason) {
835
        Status serverStatus = cleanStatus(reason, includeCauseWithStatus);
1✔
836
        if (!internalCancel(serverStatus, serverStatus)) {
1✔
837
          return;
1✔
838
        }
839
        serverStream.clientCancelled(reason);
1✔
840
        streamClosed();
1✔
841
      }
1✔
842

843
      private boolean internalCancel(
844
          Status serverListenerStatus, Status serverTracerStatus) {
845
        synchronized (this) {
1✔
846
          if (closed) {
1✔
847
            return false;
1✔
848
          }
849
          closed = true;
1✔
850

851
          StreamListener.MessageProducer producer;
852
          while ((producer = serverReceiveQueue.poll()) != null) {
1✔
853
            InputStream message;
854
            while ((message = producer.next()) != null) {
1✔
855
              try {
856
                message.close();
1✔
857
              } catch (Throwable t) {
×
858
                log.log(Level.WARNING, "Exception closing stream", t);
×
859
              }
1✔
860
            }
861
          }
1✔
862
          serverStream.statsTraceCtx.streamClosed(serverTracerStatus);
1✔
863
          syncContext.executeLater(() -> serverStreamListener.closed(serverListenerStatus));
1✔
864
        }
1✔
865
        syncContext.drain();
1✔
866
        return true;
1✔
867
      }
868

869
      @Override
870
      public void halfClose() {
871
        synchronized (this) {
1✔
872
          if (closed) {
1✔
873
            return;
1✔
874
          }
875
          if (serverReceiveQueue.isEmpty()) {
1✔
876
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
877
          } else {
878
            serverNotifyHalfClose = true;
1✔
879
          }
880
        }
1✔
881
        syncContext.drain();
1✔
882
      }
1✔
883

884
      @Override
885
      // Intentionally a no-op: the flag is ignored. writeMessage() passes each message along as
      // an InputStream and never compresses it.
      public void setMessageCompression(boolean enable) {}
×
886

887
      @Override
888
      public void setAuthority(String string) {
889
        InProcessStream.this.authority = string;
×
890
      }
×
891

892
      @Override
893
      public void start(ClientStreamListener listener) {
894
        serverStream.setListener(listener);
1✔
895

896
        synchronized (InProcessTransport.this) {
1✔
897
          statsTraceCtx.clientOutboundHeaders();
1✔
898
          streams.add(InProcessTransport.InProcessStream.this);
1✔
899
          if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
900
            inUseState.updateObjectInUse(InProcessTransport.InProcessStream.this, true);
1✔
901
          }
902
          serverTransportListener.streamCreated(serverStream, method.getFullMethodName(), headers);
1✔
903
        }
1✔
904
      }
1✔
905

906
      @Override
907
      public Attributes getAttributes() {
908
        return attributes;
1✔
909
      }
910

911
      @Override
912
      // Intentionally a no-op: this stream changes nothing when asked to optimize for a direct
      // executor.
      public void optimizeForDirectExecutor() {}
1✔
913

914
      @Override
915
      // Intentionally a no-op: the compressor is ignored by this stream.
      public void setCompressor(Compressor compressor) {}
1✔
916

917
      @Override
918
      // Intentionally a no-op: the full-stream decompression setting is ignored by this stream.
      public void setFullStreamDecompression(boolean fullStreamDecompression) {}
×
919

920
      @Override
921
      // Intentionally a no-op: the decompressor registry is ignored by this stream.
      public void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {}
1✔
922

923
      @Override
924
      // Intentionally a no-op: maxSize is ignored by this method.
      public void setMaxInboundMessageSize(int maxSize) {}
1✔
925

926
      @Override
927
      // Intentionally a no-op: maxSize is ignored by this method.
      public void setMaxOutboundMessageSize(int maxSize) {}
1✔
928

929
      @Override
930
      public void setDeadline(Deadline deadline) {
931
        headers.discardAll(TIMEOUT_KEY);
1✔
932
        long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
1✔
933
        headers.put(TIMEOUT_KEY, effectiveTimeout);
1✔
934
      }
1✔
935

936
      @Override
937
      public void appendTimeoutInsight(InsightBuilder insight) {
938
      }
1✔
939
    }
940
  }
941

942
  /**
943
   * Returns a new status with the same code and description.
944
   * If includeCauseWithStatus is true, cause is also included.
945
   *
946
   * <p>For InProcess transport to behave in the same way as the other transports,
947
   * when exchanging statuses between client and server and vice versa,
948
   * the cause should be excluded from the status.
949
   * For easier debugging, the status may be optionally included.
950
   */
951
  private static Status cleanStatus(Status status, boolean includeCauseWithStatus) {
952
    if (status == null) {
1✔
953
      return null;
×
954
    }
955
    Status clientStatus = Status
1✔
956
        .fromCodeValue(status.getCode().value())
1✔
957
        .withDescription(status.getDescription());
1✔
958
    if (includeCauseWithStatus) {
1✔
959
      clientStatus = clientStatus.withCause(status.getCause());
1✔
960
    }
961
    return clientStatus;
1✔
962
  }
963

964
  private static class SingleMessageProducer implements StreamListener.MessageProducer {
965
    private InputStream message;
966

967
    private SingleMessageProducer(InputStream message) {
1✔
968
      this.message = message;
1✔
969
    }
1✔
970

971
    @Nullable
972
    @Override
973
    public InputStream next() {
974
      InputStream messageToReturn = message;
1✔
975
      message = null;
1✔
976
      return messageToReturn;
1✔
977
    }
978
  }
979
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc