• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19438

28 Aug 2024 09:34PM UTC coverage: 84.493% (-0.02%) from 84.516%
#19438

push

github

ejona86
xds: ClusterManagerLB must update child configuration

While child LB policies are unlikely to change for each cluster name (RLS
returns regular cluster names, so should be unique), and the
configuration for CDS policies won't change, RLS configuration can
definitely change.

33407 of 39538 relevant lines covered (84.49%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.82
/../inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.inprocess;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
21
import static java.lang.Math.max;
22

23
import com.google.common.base.MoreObjects;
24
import com.google.common.base.Optional;
25
import com.google.common.util.concurrent.ListenableFuture;
26
import com.google.common.util.concurrent.SettableFuture;
27
import io.grpc.Attributes;
28
import io.grpc.CallOptions;
29
import io.grpc.ClientStreamTracer;
30
import io.grpc.Compressor;
31
import io.grpc.Deadline;
32
import io.grpc.Decompressor;
33
import io.grpc.DecompressorRegistry;
34
import io.grpc.Grpc;
35
import io.grpc.InternalChannelz.SocketStats;
36
import io.grpc.InternalLogId;
37
import io.grpc.InternalMetadata;
38
import io.grpc.Metadata;
39
import io.grpc.MethodDescriptor;
40
import io.grpc.SecurityLevel;
41
import io.grpc.ServerStreamTracer;
42
import io.grpc.Status;
43
import io.grpc.SynchronizationContext;
44
import io.grpc.internal.ClientStream;
45
import io.grpc.internal.ClientStreamListener;
46
import io.grpc.internal.ClientStreamListener.RpcProgress;
47
import io.grpc.internal.ConnectionClientTransport;
48
import io.grpc.internal.GrpcAttributes;
49
import io.grpc.internal.GrpcUtil;
50
import io.grpc.internal.InUseStateAggregator;
51
import io.grpc.internal.InsightBuilder;
52
import io.grpc.internal.ManagedClientTransport;
53
import io.grpc.internal.NoopClientStream;
54
import io.grpc.internal.ObjectPool;
55
import io.grpc.internal.ServerListener;
56
import io.grpc.internal.ServerStream;
57
import io.grpc.internal.ServerStreamListener;
58
import io.grpc.internal.ServerTransport;
59
import io.grpc.internal.ServerTransportListener;
60
import io.grpc.internal.StatsTraceContext;
61
import io.grpc.internal.StreamListener;
62
import java.io.InputStream;
63
import java.net.SocketAddress;
64
import java.util.ArrayDeque;
65
import java.util.ArrayList;
66
import java.util.Collections;
67
import java.util.IdentityHashMap;
68
import java.util.List;
69
import java.util.Locale;
70
import java.util.Set;
71
import java.util.concurrent.Executor;
72
import java.util.concurrent.ScheduledExecutorService;
73
import java.util.concurrent.TimeUnit;
74
import java.util.logging.Level;
75
import java.util.logging.Logger;
76
import javax.annotation.CheckReturnValue;
77
import javax.annotation.Nullable;
78
import javax.annotation.concurrent.GuardedBy;
79
import javax.annotation.concurrent.ThreadSafe;
80

81
@ThreadSafe
82
final class InProcessTransport implements ServerTransport, ConnectionClientTransport {
83
  private static final Logger log = Logger.getLogger(InProcessTransport.class.getName());
1✔
84

85
  private final InternalLogId logId;
86
  private final SocketAddress address;
87
  private final int clientMaxInboundMetadataSize;
88
  private final String authority;
89
  private final String userAgent;
90
  private final Optional<ServerListener> optionalServerListener;
91
  private int serverMaxInboundMetadataSize;
92
  private final boolean includeCauseWithStatus;
93
  private ObjectPool<ScheduledExecutorService> serverSchedulerPool;
94
  private ScheduledExecutorService serverScheduler;
95
  private ServerTransportListener serverTransportListener;
96
  private Attributes serverStreamAttributes;
97
  private ManagedClientTransport.Listener clientTransportListener;
98
  @GuardedBy("this")
99
  private boolean shutdown;
100
  @GuardedBy("this")
101
  private boolean terminated;
102
  @GuardedBy("this")
103
  private Status shutdownStatus;
104
  @GuardedBy("this")
1✔
105
  private final Set<InProcessStream> streams = Collections.newSetFromMap(
1✔
106
          new IdentityHashMap<InProcessStream, Boolean>());
107
  @GuardedBy("this")
108
  private List<ServerStreamTracer.Factory> serverStreamTracerFactories;
109
  private Attributes attributes;
110

111
  private Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
1✔
112
      new Thread.UncaughtExceptionHandler() {
1✔
113
        @Override
114
        public void uncaughtException(Thread t, Throwable e) {
115
          if (e instanceof Error) {
×
116
            throw new Error(e);
×
117
          }
118
          throw new RuntimeException(e);
×
119
        }
120
      };
121

122

123
  @GuardedBy("this")
1✔
124
  private final InUseStateAggregator<InProcessStream> inUseState =
125
      new InUseStateAggregator<InProcessStream>() {
1✔
126
        @Override
127
        protected void handleInUse() {
128
          clientTransportListener.transportInUse(true);
1✔
129
        }
1✔
130

131
        @Override
132
        protected void handleNotInUse() {
133
          clientTransportListener.transportInUse(false);
1✔
134
        }
1✔
135
      };
136

137
  private InProcessTransport(SocketAddress address, int maxInboundMetadataSize, String authority,
138
      String userAgent, Attributes eagAttrs,
139
      Optional<ServerListener> optionalServerListener, boolean includeCauseWithStatus) {
1✔
140
    this.address = address;
1✔
141
    this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
1✔
142
    this.authority = authority;
1✔
143
    this.userAgent = GrpcUtil.getGrpcUserAgent("inprocess", userAgent);
1✔
144
    checkNotNull(eagAttrs, "eagAttrs");
1✔
145
    this.attributes = Attributes.newBuilder()
1✔
146
        .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
147
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs)
1✔
148
        .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
149
        .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
150
        .build();
1✔
151
    this.optionalServerListener = optionalServerListener;
1✔
152
    logId = InternalLogId.allocate(getClass(), address.toString());
1✔
153
    this.includeCauseWithStatus = includeCauseWithStatus;
1✔
154
  }
1✔
155

156
  /**
   * Creates a transport with no pre-supplied server listener; the server is looked up by
   * {@code address} when {@link #start} runs.
   */
  public InProcessTransport(
      SocketAddress address,
      int maxInboundMetadataSize,
      String authority,
      String userAgent,
      Attributes eagAttrs,
      boolean includeCauseWithStatus) {
    this(
        address,
        maxInboundMetadataSize,
        authority,
        userAgent,
        eagAttrs,
        Optional.<ServerListener>absent(),
        includeCauseWithStatus);
  }
1✔
162

163
  /**
   * Creates a transport bound directly to {@code serverListener}, addressed by a new
   * {@link InProcessSocketAddress} built from {@code name}.
   *
   * <p>{@code maxInboundMetadataSize} is used as the metadata limit for both the client and the
   * server side.
   */
  InProcessTransport(
      String name,
      int maxInboundMetadataSize,
      String authority,
      String userAgent,
      Attributes eagAttrs,
      ObjectPool<ScheduledExecutorService> serverSchedulerPool,
      List<ServerStreamTracer.Factory> serverStreamTracerFactories,
      ServerListener serverListener,
      boolean includeCauseWithStatus) {
    this(
        new InProcessSocketAddress(name),
        maxInboundMetadataSize,
        authority,
        userAgent,
        eagAttrs,
        Optional.of(serverListener),
        includeCauseWithStatus);
    // Server-side settings are known up front here, so start() does not need a server lookup.
    this.serverStreamTracerFactories = serverStreamTracerFactories;
    this.serverSchedulerPool = serverSchedulerPool;
    this.serverMaxInboundMetadataSize = maxInboundMetadataSize;
  }
1✔
174

175
  @CheckReturnValue
176
  @Override
177
  public synchronized Runnable start(ManagedClientTransport.Listener listener) {
178
    this.clientTransportListener = listener;
1✔
179
    if (optionalServerListener.isPresent()) {
1✔
180
      serverScheduler = serverSchedulerPool.getObject();
1✔
181
      serverTransportListener = optionalServerListener.get().transportCreated(this);
1✔
182
    } else {
183
      InProcessServer server = InProcessServer.findServer(address);
1✔
184
      if (server != null) {
1✔
185
        serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize();
1✔
186
        serverSchedulerPool = server.getScheduledExecutorServicePool();
1✔
187
        serverScheduler = serverSchedulerPool.getObject();
1✔
188
        serverStreamTracerFactories = server.getStreamTracerFactories();
1✔
189
        // Must be semi-initialized; past this point, can begin receiving requests
190
        serverTransportListener = server.register(this);
1✔
191
      }
192
    }
193
    if (serverTransportListener == null) {
1✔
194
      shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + address);
1✔
195
      final Status localShutdownStatus = shutdownStatus;
1✔
196
      return new Runnable() {
1✔
197
        @Override
198
        public void run() {
199
          synchronized (InProcessTransport.this) {
1✔
200
            notifyShutdown(localShutdownStatus);
1✔
201
            notifyTerminated();
1✔
202
          }
1✔
203
        }
1✔
204
      };
205
    }
206
    Attributes serverTransportAttrs = Attributes.newBuilder()
1✔
207
        .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
208
        .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
209
        .build();
1✔
210
    serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
1✔
211
    attributes = clientTransportListener.filterTransport(attributes);
1✔
212
    clientTransportListener.transportReady();
1✔
213
    return null;
1✔
214
  }
215

216
  @Override
217
  public synchronized ClientStream newStream(
218
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
219
      ClientStreamTracer[] tracers) {
220
    StatsTraceContext statsTraceContext =
1✔
221
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
222
    if (shutdownStatus != null) {
1✔
223
      return failedClientStream(statsTraceContext, shutdownStatus);
1✔
224
    }
225

226
    headers.put(GrpcUtil.USER_AGENT_KEY, userAgent);
1✔
227

228
    if (serverMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
229
      int metadataSize = metadataSize(headers);
1✔
230
      if (metadataSize > serverMaxInboundMetadataSize) {
1✔
231
        // Other transports would compute a status with:
232
        //   GrpcUtil.httpStatusToGrpcStatus(431 /* Request Header Fields Too Large */);
233
        // However, that isn't handled specially today, so we'd leak HTTP-isms even though we're
234
        // in-process. We go ahead and make a Status, which may need to be updated if
235
        // statuscodes.md is updated.
236
        Status status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
237
            String.format(
1✔
238
                Locale.US,
239
                "Request metadata larger than %d: %d",
240
                serverMaxInboundMetadataSize,
1✔
241
                metadataSize));
1✔
242
        return failedClientStream(statsTraceContext, status);
1✔
243
      }
244
    }
245

246
    return new InProcessStream(method, headers, callOptions, authority, statsTraceContext)
1✔
247
        .clientStream;
1✔
248
  }
249

250
  private ClientStream failedClientStream(
251
      final StatsTraceContext statsTraceCtx, final Status status) {
252
    return new NoopClientStream() {
1✔
253
        @Override
254
        public void start(ClientStreamListener listener) {
255
          statsTraceCtx.clientOutboundHeaders();
1✔
256
          statsTraceCtx.streamClosed(status);
1✔
257
          listener.closed(status, RpcProgress.PROCESSED, new Metadata());
1✔
258
        }
1✔
259
      };
260
  }
261

262
  @Override
263
  public synchronized void ping(final PingCallback callback, Executor executor) {
264
    if (terminated) {
1✔
265
      final Status shutdownStatus = this.shutdownStatus;
1✔
266
      executor.execute(new Runnable() {
1✔
267
        @Override
268
        public void run() {
269
          callback.onFailure(shutdownStatus.asRuntimeException());
1✔
270
        }
1✔
271
      });
272
    } else {
1✔
273
      executor.execute(new Runnable() {
1✔
274
        @Override
275
        public void run() {
276
          callback.onSuccess(0);
1✔
277
        }
1✔
278
      });
279
    }
280
  }
1✔
281

282
  @Override
283
  public synchronized void shutdown(Status reason) {
284
    // Can be called multiple times: once for ManagedClientTransport, once for ServerTransport.
285
    if (shutdown) {
1✔
286
      return;
1✔
287
    }
288
    shutdownStatus = reason;
1✔
289
    notifyShutdown(reason);
1✔
290
    if (streams.isEmpty()) {
1✔
291
      notifyTerminated();
1✔
292
    }
293
  }
1✔
294

295
  @Override
296
  public synchronized void shutdown() {
297
    shutdown(Status.UNAVAILABLE.withDescription("InProcessTransport shutdown by the server-side"));
1✔
298
  }
1✔
299

300
  @Override
301
  public void shutdownNow(Status reason) {
302
    checkNotNull(reason, "reason");
1✔
303
    List<InProcessStream> streamsCopy;
304
    synchronized (this) {
1✔
305
      shutdown(reason);
1✔
306
      if (terminated) {
1✔
307
        return;
1✔
308
      }
309
      streamsCopy = new ArrayList<>(streams);
1✔
310
    }
1✔
311
    for (InProcessStream stream : streamsCopy) {
1✔
312
      stream.clientStream.cancel(reason);
1✔
313
    }
1✔
314
  }
1✔
315

316
  @Override
317
  public String toString() {
318
    return MoreObjects.toStringHelper(this)
×
319
        .add("logId", logId.getId())
×
320
        .add("address", address)
×
321
        .toString();
×
322
  }
323

324
  @Override
325
  public InternalLogId getLogId() {
326
    return logId;
1✔
327
  }
328

329
  @Override
330
  public Attributes getAttributes() {
331
    return attributes;
1✔
332
  }
333

334
  @Override
335
  public ScheduledExecutorService getScheduledExecutorService() {
336
    return serverScheduler;
1✔
337
  }
338

339
  @Override
340
  public ListenableFuture<SocketStats> getStats() {
341
    SettableFuture<SocketStats> ret = SettableFuture.create();
×
342
    ret.set(null);
×
343
    return ret;
×
344
  }
345

346
  private synchronized void notifyShutdown(Status s) {
347
    if (shutdown) {
1✔
348
      return;
×
349
    }
350
    shutdown = true;
1✔
351
    clientTransportListener.transportShutdown(s);
1✔
352
  }
1✔
353

354
  private synchronized void notifyTerminated() {
355
    if (terminated) {
1✔
356
      return;
×
357
    }
358
    terminated = true;
1✔
359
    if (serverScheduler != null) {
1✔
360
      serverScheduler = serverSchedulerPool.returnObject(serverScheduler);
1✔
361
    }
362
    clientTransportListener.transportTerminated();
1✔
363
    if (serverTransportListener != null) {
1✔
364
      serverTransportListener.transportTerminated();
1✔
365
    }
366
  }
1✔
367

368
  private static int metadataSize(Metadata metadata) {
369
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
370
    if (serialized == null) {
1✔
371
      return 0;
×
372
    }
373
    // Calculate based on SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2. We could use something
374
    // different, but it's "sane."
375
    long size = 0;
1✔
376
    for (int i = 0; i < serialized.length; i += 2) {
1✔
377
      size += 32 + serialized[i].length + serialized[i + 1].length;
1✔
378
    }
379
    size = Math.min(size, Integer.MAX_VALUE);
1✔
380
    return (int) size;
1✔
381
  }
382

383
  private class InProcessStream {
384
    private final InProcessClientStream clientStream;
385
    private final InProcessServerStream serverStream;
386
    private final CallOptions callOptions;
387
    private final Metadata headers;
388
    private final MethodDescriptor<?, ?> method;
389
    private volatile String authority;
390

391
    /**
     * Creates the paired client and server halves of one RPC. {@code method}, {@code headers} and
     * {@code callOptions} must not be null.
     */
    private InProcessStream(
        MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
        String authority, StatsTraceContext statsTraceContext) {
      // Validate in declaration order so the first missing argument is the one reported.
      checkNotNull(method, "method");
      checkNotNull(headers, "headers");
      checkNotNull(callOptions, "callOptions");

      this.method = method;
      this.headers = headers;
      this.callOptions = callOptions;
      this.authority = authority;
      this.clientStream = new InProcessClientStream(callOptions, statsTraceContext);
      this.serverStream = new InProcessServerStream(method, headers);
    }
1✔
401

402
    // Can be called multiple times due to races on both client and server closing at same time.
403
    private void streamClosed() {
404
      synchronized (InProcessTransport.this) {
1✔
405
        boolean justRemovedAnElement = streams.remove(this);
1✔
406
        if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
407
          inUseState.updateObjectInUse(this, false);
1✔
408
        }
409
        if (streams.isEmpty() && justRemovedAnElement) {
1✔
410
          if (shutdown) {
1✔
411
            notifyTerminated();
1✔
412
          }
413
        }
414
      }
1✔
415
    }
1✔
416

417
    private class InProcessServerStream implements ServerStream {
418
      final StatsTraceContext statsTraceCtx;
419
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
420
      private ClientStreamListener clientStreamListener;
421
      private final SynchronizationContext syncContext =
1✔
422
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
423
      @GuardedBy("this")
424
      private int clientRequested;
425
      @GuardedBy("this")
1✔
426
      private ArrayDeque<StreamListener.MessageProducer> clientReceiveQueue =
427
          new ArrayDeque<>();
428
      @GuardedBy("this")
429
      private Status clientNotifyStatus;
430
      @GuardedBy("this")
431
      private Metadata clientNotifyTrailers;
432
      // Only is intended to prevent double-close when client cancels.
433
      @GuardedBy("this")
434
      private boolean closed;
435
      @GuardedBy("this")
436
      private int outboundSeqNo;
437

438
      /** Creates the server half and its stats context from the transport's tracer factories. */
      InProcessServerStream(MethodDescriptor<?, ?> method, Metadata headers) {
        String fullMethodName = method.getFullMethodName();
        statsTraceCtx =
            StatsTraceContext.newServerContext(serverStreamTracerFactories, fullMethodName, headers);
      }
1✔
442

443
      private synchronized void setListener(ClientStreamListener listener) {
444
        clientStreamListener = listener;
1✔
445
      }
1✔
446

447
      @Override
448
      public void setListener(ServerStreamListener serverStreamListener) {
449
        clientStream.setListener(serverStreamListener);
1✔
450
      }
1✔
451

452
      @Override
453
      public void request(int numMessages) {
454
        boolean onReady = clientStream.serverRequested(numMessages);
1✔
455
        if (onReady) {
1✔
456
          synchronized (this) {
1✔
457
            if (!closed) {
1✔
458
              syncContext.executeLater(() -> clientStreamListener.onReady());
1✔
459
            }
460
          }
1✔
461
        }
462
        syncContext.drain();
1✔
463
      }
1✔
464

465
      // This method is the only reason we have to synchronize field accesses.
466
      /**
467
       * Client requested more messages.
468
       *
469
       * @return whether onReady should be called on the server
470
       */
471
      private boolean clientRequested(int numMessages) {
472
        boolean previouslyReady;
473
        boolean nowReady;
474
        synchronized (this) {
1✔
475
          if (closed) {
1✔
476
            return false;
1✔
477
          }
478

479
          previouslyReady = clientRequested > 0;
1✔
480
          clientRequested += numMessages;
1✔
481
          while (clientRequested > 0 && !clientReceiveQueue.isEmpty()) {
1✔
482
            clientRequested--;
1✔
483
            StreamListener.MessageProducer producer = clientReceiveQueue.poll();
1✔
484
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
485
          }
1✔
486

487
          if (clientReceiveQueue.isEmpty() && clientNotifyStatus != null) {
1✔
488
            closed = true;
1✔
489
            clientStream.statsTraceCtx.clientInboundTrailers(clientNotifyTrailers);
1✔
490
            clientStream.statsTraceCtx.streamClosed(clientNotifyStatus);
1✔
491
            Status notifyStatus = this.clientNotifyStatus;
1✔
492
            Metadata notifyTrailers = this.clientNotifyTrailers;
1✔
493
            syncContext.executeLater(() ->
1✔
494
                clientStreamListener.closed(notifyStatus, RpcProgress.PROCESSED, notifyTrailers));
1✔
495
          }
496

497
          nowReady = clientRequested > 0;
1✔
498
        }
1✔
499

500
        syncContext.drain();
1✔
501
        return !previouslyReady && nowReady;
1✔
502
      }
503

504
      private void clientCancelled(Status status) {
505
        internalCancel(status);
1✔
506
      }
1✔
507

508
      @Override
509
      public void writeMessage(InputStream message) {
510
        synchronized (this) {
1✔
511
          if (closed) {
1✔
512
            return;
1✔
513
          }
514
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
515
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
516
          clientStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
517
          clientStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
518
          outboundSeqNo++;
1✔
519
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
520
          if (clientRequested > 0) {
1✔
521
            clientRequested--;
1✔
522
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
523
          } else {
524
            clientReceiveQueue.add(producer);
1✔
525
          }
526
        }
1✔
527

528
        syncContext.drain();
1✔
529
      }
1✔
530

531
      @Override
532
      public void flush() {}
1✔
533

534
      @Override
535
      public synchronized boolean isReady() {
536
        if (closed) {
1✔
537
          return false;
×
538
        }
539
        return clientRequested > 0;
1✔
540
      }
541

542
      @Override
543
      public void writeHeaders(Metadata headers, boolean flush) {
544
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
545
          int metadataSize = metadataSize(headers);
1✔
546
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
547
            Status serverStatus = Status.CANCELLED.withDescription("Client cancelled the RPC");
1✔
548
            clientStream.serverClosed(serverStatus, serverStatus);
1✔
549
            // Other transports provide very little information in this case. We go ahead and make a
550
            // Status, which may need to be updated if statuscodes.md is updated.
551
            Status failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
552
                String.format(
1✔
553
                    Locale.US,
554
                    "Response header metadata larger than %d: %d",
555
                    clientMaxInboundMetadataSize,
1✔
556
                    metadataSize));
1✔
557
            notifyClientClose(failedStatus, new Metadata());
1✔
558
            return;
1✔
559
          }
560
        }
561

562
        synchronized (this) {
1✔
563
          if (closed) {
1✔
564
            return;
1✔
565
          }
566

567
          clientStream.statsTraceCtx.clientInboundHeaders(headers);
1✔
568
          syncContext.executeLater(() -> clientStreamListener.headersRead(headers));
1✔
569
        }
1✔
570
        syncContext.drain();
1✔
571
      }
1✔
572

573
      @Override
574
      public void close(Status status, Metadata trailers) {
575
        // clientStream.serverClosed must happen before clientStreamListener.closed, otherwise
576
        // clientStreamListener.closed can trigger clientStream.cancel (see code in
577
        // ClientCalls.blockingUnaryCall), which may race with clientStream.serverClosed as both are
578
        // calling internalCancel().
579
        clientStream.serverClosed(Status.OK, status);
1✔
580

581
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
582
          int statusSize = status.getDescription() == null ? 0 : status.getDescription().length();
1✔
583
          // Go ahead and throw in the status description's length, since that could be very long.
584
          int metadataSize = metadataSize(trailers) + statusSize;
1✔
585
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
586
            // Override the status for the client, but not the server. Transports do not guarantee
587
            // notifying the server of the failure.
588

589
            // Other transports provide very little information in this case. We go ahead and make a
590
            // Status, which may need to be updated if statuscodes.md is updated.
591
            status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
592
                String.format(
1✔
593
                    Locale.US,
594
                    "Response header metadata larger than %d: %d",
595
                    clientMaxInboundMetadataSize,
1✔
596
                    metadataSize));
1✔
597
            trailers = new Metadata();
1✔
598
          }
599
        }
600

601
        notifyClientClose(status, trailers);
1✔
602
      }
1✔
603

604
      /** clientStream.serverClosed() must be called before this method */
605
      private void notifyClientClose(Status status, Metadata trailers) {
606
        Status clientStatus = cleanStatus(status, includeCauseWithStatus);
1✔
607
        synchronized (this) {
1✔
608
          if (closed) {
1✔
609
            return;
1✔
610
          }
611
          if (clientReceiveQueue.isEmpty()) {
1✔
612
            closed = true;
1✔
613
            clientStream.statsTraceCtx.clientInboundTrailers(trailers);
1✔
614
            clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
615
            syncContext.executeLater(
1✔
616
                () -> clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, trailers));
1✔
617
          } else {
618
            clientNotifyStatus = clientStatus;
1✔
619
            clientNotifyTrailers = trailers;
1✔
620
          }
621
        }
1✔
622
        syncContext.drain();
1✔
623
        streamClosed();
1✔
624
      }
1✔
625

626
      @Override
627
      public void cancel(Status status) {
628
        if (!internalCancel(Status.CANCELLED.withDescription("server cancelled stream"))) {
1✔
629
          return;
1✔
630
        }
631
        clientStream.serverClosed(status, status);
1✔
632
        streamClosed();
1✔
633
      }
1✔
634

635
      private boolean internalCancel(Status clientStatus) {
636
        synchronized (this) {
1✔
637
          if (closed) {
1✔
638
            return false;
1✔
639
          }
640
          closed = true;
1✔
641
          StreamListener.MessageProducer producer;
642
          while ((producer = clientReceiveQueue.poll()) != null) {
1✔
643
            InputStream message;
644
            while ((message = producer.next()) != null) {
×
645
              try {
646
                message.close();
×
647
              } catch (Throwable t) {
×
648
                log.log(Level.WARNING, "Exception closing stream", t);
×
649
              }
×
650
            }
651
          }
×
652
          clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
653
          syncContext.executeLater(
1✔
654
              () ->
655
                  clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, new Metadata()));
1✔
656
        }
1✔
657
        syncContext.drain();
1✔
658
        return true;
1✔
659
      }
660

661
      @Override
662
      public void setMessageCompression(boolean enable) {
663
        // noop
664
      }
×
665

666
      @Override
667
      public void optimizeForDirectExecutor() {}
1✔
668

669
      @Override
670
      public void setCompressor(Compressor compressor) {}
1✔
671

672
      @Override
673
      public void setDecompressor(Decompressor decompressor) {}
×
674

675
      @Override public Attributes getAttributes() {
676
        return serverStreamAttributes;
1✔
677
      }
678

679
      @Override
680
      public String getAuthority() {
681
        return InProcessStream.this.authority;
1✔
682
      }
683

684
      @Override
685
      public StatsTraceContext statsTraceContext() {
686
        return statsTraceCtx;
1✔
687
      }
688

689
      @Override
690
      public int streamId() {
691
        return -1;
1✔
692
      }
693

694
      @Override
695
      public void setOnReadyThreshold(int numBytes) {
696
        // noop
697
      }
×
698
    }
699

700
    private class InProcessClientStream implements ClientStream {
701
      final StatsTraceContext statsTraceCtx;
702
      final CallOptions callOptions;
703
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
704
      private ServerStreamListener serverStreamListener;
705
      private final SynchronizationContext syncContext =
1✔
706
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
707
      @GuardedBy("this")
708
      private int serverRequested;
709
      @GuardedBy("this")
1✔
710
      private ArrayDeque<StreamListener.MessageProducer> serverReceiveQueue =
711
          new ArrayDeque<>();
712
      @GuardedBy("this")
713
      private boolean serverNotifyHalfClose;
714
      // Only is intended to prevent double-close when server closes.
715
      @GuardedBy("this")
716
      private boolean closed;
717
      @GuardedBy("this")
718
      private int outboundSeqNo;
719

720
      InProcessClientStream(
721
          CallOptions callOptions, StatsTraceContext statsTraceContext) {
1✔
722
        this.callOptions = callOptions;
1✔
723
        statsTraceCtx = statsTraceContext;
1✔
724
      }
1✔
725

726
      private synchronized void setListener(ServerStreamListener listener) {
727
        this.serverStreamListener = listener;
1✔
728
      }
1✔
729

730
      @Override
731
      public void request(int numMessages) {
732
        boolean onReady = serverStream.clientRequested(numMessages);
1✔
733
        if (onReady) {
1✔
734
          synchronized (this) {
1✔
735
            if (!closed) {
1✔
736
              syncContext.executeLater(() -> serverStreamListener.onReady());
1✔
737
            }
738
          }
1✔
739
          syncContext.drain();
1✔
740
        }
741
      }
1✔
742

743
      // This method is the only reason we have to synchronize field accesses.
744
      /**
745
       * Server requested more messages.
746
       *
747
       * @return whether onReady should be called on the client
748
       */
749
      private boolean serverRequested(int numMessages) {
750
        boolean previouslyReady;
751
        boolean nowReady;
752
        synchronized (this) {
1✔
753
          if (closed) {
1✔
754
            return false;
1✔
755
          }
756
          previouslyReady = serverRequested > 0;
1✔
757
          serverRequested += numMessages;
1✔
758

759
          while (serverRequested > 0 && !serverReceiveQueue.isEmpty()) {
1✔
760
            serverRequested--;
1✔
761
            StreamListener.MessageProducer producer = serverReceiveQueue.poll();
1✔
762
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
763
          }
1✔
764

765
          if (serverReceiveQueue.isEmpty() && serverNotifyHalfClose) {
1✔
766
            serverNotifyHalfClose = false;
1✔
767
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
768
          }
769
          nowReady = serverRequested > 0;
1✔
770
        }
1✔
771
        syncContext.drain();
1✔
772
        return !previouslyReady && nowReady;
1✔
773
      }
774

775
      /**
       * Closes this client stream by delegating to {@link #internalCancel}; the boolean result
       * (whether this call performed the close) is ignored.
       *
       * @param serverListenerStatus status passed to the server stream listener's {@code closed}
       * @param serverTracerStatus status passed to the server-side stats context
       */
      private void serverClosed(Status serverListenerStatus, Status serverTracerStatus) {
776
        internalCancel(serverListenerStatus, serverTracerStatus);
1✔
777
      }
1✔
778

779
      @Override
780
      public void writeMessage(InputStream message) {
781
        synchronized (this) {
1✔
782
          if (closed) {
1✔
783
            return;
1✔
784
          }
785
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
786
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
787
          serverStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
788
          serverStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
789
          outboundSeqNo++;
1✔
790
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
791
          if (serverRequested > 0) {
1✔
792
            serverRequested--;
1✔
793
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
794
          } else {
795
            serverReceiveQueue.add(producer);
1✔
796
          }
797
        }
1✔
798
        syncContext.drain();
1✔
799
      }
1✔
800

801
      // No-op: writeMessage already hands each message to the listener or to serverReceiveQueue,
      // so this stream keeps no separate send buffer to flush.
      @Override
802
      public void flush() {}
1✔
803

804
      @Override
805
      public synchronized boolean isReady() {
806
        if (closed) {
1✔
807
          return false;
×
808
        }
809
        return serverRequested > 0;
1✔
810
      }
811

812
      // Must be thread-safe for shutdownNow()
813
      @Override
814
      public void cancel(Status reason) {
815
        Status serverStatus = cleanStatus(reason, includeCauseWithStatus);
1✔
816
        if (!internalCancel(serverStatus, serverStatus)) {
1✔
817
          return;
1✔
818
        }
819
        serverStream.clientCancelled(reason);
1✔
820
        streamClosed();
1✔
821
      }
1✔
822

823
      private boolean internalCancel(
824
          Status serverListenerStatus, Status serverTracerStatus) {
825
        synchronized (this) {
1✔
826
          if (closed) {
1✔
827
            return false;
1✔
828
          }
829
          closed = true;
1✔
830

831
          StreamListener.MessageProducer producer;
832
          while ((producer = serverReceiveQueue.poll()) != null) {
1✔
833
            InputStream message;
834
            while ((message = producer.next()) != null) {
1✔
835
              try {
836
                message.close();
1✔
837
              } catch (Throwable t) {
×
838
                log.log(Level.WARNING, "Exception closing stream", t);
×
839
              }
1✔
840
            }
841
          }
1✔
842
          serverStream.statsTraceCtx.streamClosed(serverTracerStatus);
1✔
843
          syncContext.executeLater(() -> serverStreamListener.closed(serverListenerStatus));
1✔
844
        }
1✔
845
        syncContext.drain();
1✔
846
        return true;
1✔
847
      }
848

849
      @Override
850
      public void halfClose() {
851
        synchronized (this) {
1✔
852
          if (closed) {
1✔
853
            return;
1✔
854
          }
855
          if (serverReceiveQueue.isEmpty()) {
1✔
856
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
857
          } else {
858
            serverNotifyHalfClose = true;
1✔
859
          }
860
        }
1✔
861
        syncContext.drain();
1✔
862
      }
1✔
863

864
      // No-op: the enable flag is ignored by this stream.
      @Override
865
      public void setMessageCompression(boolean enable) {}
×
866

867
      // Stores the authority on the enclosing InProcessStream rather than on this client stream.
      @Override
868
      public void setAuthority(String string) {
869
        InProcessStream.this.authority = string;
×
870
      }
×
871

872
      @Override
873
      public void start(ClientStreamListener listener) {
874
        serverStream.setListener(listener);
1✔
875

876
        synchronized (InProcessTransport.this) {
1✔
877
          statsTraceCtx.clientOutboundHeaders();
1✔
878
          streams.add(InProcessTransport.InProcessStream.this);
1✔
879
          if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
880
            inUseState.updateObjectInUse(InProcessTransport.InProcessStream.this, true);
1✔
881
          }
882
          serverTransportListener.streamCreated(serverStream, method.getFullMethodName(), headers);
1✔
883
        }
1✔
884
      }
1✔
885

886
      // Returns the `attributes` value visible from the enclosing scope, unchanged.
      @Override
887
      public Attributes getAttributes() {
888
        return attributes;
1✔
889
      }
890

891
      // No-op: this stream does nothing in response to the direct-executor hint.
      @Override
892
      public void optimizeForDirectExecutor() {}
1✔
893

894
      // No-op: the supplied compressor is ignored by this stream.
      @Override
895
      public void setCompressor(Compressor compressor) {}
1✔
896

897
      // No-op: the full-stream decompression flag is ignored by this stream.
      @Override
898
      public void setFullStreamDecompression(boolean fullStreamDecompression) {}
×
899

900
      // No-op: the supplied decompressor registry is ignored by this stream.
      @Override
901
      public void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {}
1✔
902

903
      // No-op: the inbound size limit is ignored by this stream.
      @Override
904
      public void setMaxInboundMessageSize(int maxSize) {}
1✔
905

906
      // No-op: the outbound size limit is ignored by this stream.
      @Override
907
      public void setMaxOutboundMessageSize(int maxSize) {}
1✔
908

909
      @Override
910
      public void setDeadline(Deadline deadline) {
911
        headers.discardAll(TIMEOUT_KEY);
1✔
912
        long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
1✔
913
        headers.put(TIMEOUT_KEY, effectiveTimeout);
1✔
914
      }
1✔
915

916
      // No-op: this stream adds nothing to the supplied insight builder.
      @Override
917
      public void appendTimeoutInsight(InsightBuilder insight) {
918
      }
1✔
919
    }
920
  }
921

922
  /**
923
   * Returns a new status with the same code and description.
924
   * If includeCauseWithStatus is true, cause is also included.
925
   *
926
   * <p>For InProcess transport to behave in the same way as the other transports,
927
   * when exchanging statuses between client and server and vice versa,
928
   * the cause should be excluded from the status.
929
   * For easier debugging, the status may be optionally included.
930
   */
931
  private static Status cleanStatus(Status status, boolean includeCauseWithStatus) {
932
    if (status == null) {
1✔
933
      return null;
×
934
    }
935
    Status clientStatus = Status
1✔
936
        .fromCodeValue(status.getCode().value())
1✔
937
        .withDescription(status.getDescription());
1✔
938
    if (includeCauseWithStatus) {
1✔
939
      clientStatus = clientStatus.withCause(status.getCause());
1✔
940
    }
941
    return clientStatus;
1✔
942
  }
943

944
  private static class SingleMessageProducer implements StreamListener.MessageProducer {
945
    private InputStream message;
946

947
    private SingleMessageProducer(InputStream message) {
1✔
948
      this.message = message;
1✔
949
    }
1✔
950

951
    @Nullable
952
    @Override
953
    public InputStream next() {
954
      InputStream messageToReturn = message;
1✔
955
      message = null;
1✔
956
      return messageToReturn;
1✔
957
    }
958
  }
959
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc