• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19628

07 Jan 2025 08:35PM UTC coverage: 88.561% (-0.008%) from 88.569%
#19628

push

github

ejona86
xds: Remember nonces for unknown types

If the control plane sends a resource type the client doesn't understand
at-the-moment, the control plane will still expect the client to include
the nonce if the client subscribes to the type in the future.

This most easily happens when unsubscribing the last resource of a type.
Which meant 1cf1927d1 was insufficient.

33369 of 37679 relevant lines covered (88.56%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.3
/../inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.inprocess;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
21
import static java.lang.Math.max;
22

23
import com.google.common.base.MoreObjects;
24
import com.google.common.io.ByteStreams;
25
import com.google.common.util.concurrent.ListenableFuture;
26
import com.google.common.util.concurrent.SettableFuture;
27
import io.grpc.Attributes;
28
import io.grpc.CallOptions;
29
import io.grpc.ClientStreamTracer;
30
import io.grpc.Compressor;
31
import io.grpc.Deadline;
32
import io.grpc.Decompressor;
33
import io.grpc.DecompressorRegistry;
34
import io.grpc.Grpc;
35
import io.grpc.InternalChannelz.SocketStats;
36
import io.grpc.InternalLogId;
37
import io.grpc.InternalMetadata;
38
import io.grpc.KnownLength;
39
import io.grpc.Metadata;
40
import io.grpc.MethodDescriptor;
41
import io.grpc.SecurityLevel;
42
import io.grpc.ServerStreamTracer;
43
import io.grpc.Status;
44
import io.grpc.SynchronizationContext;
45
import io.grpc.internal.ClientStream;
46
import io.grpc.internal.ClientStreamListener;
47
import io.grpc.internal.ClientStreamListener.RpcProgress;
48
import io.grpc.internal.ConnectionClientTransport;
49
import io.grpc.internal.GrpcAttributes;
50
import io.grpc.internal.GrpcUtil;
51
import io.grpc.internal.InUseStateAggregator;
52
import io.grpc.internal.InsightBuilder;
53
import io.grpc.internal.ManagedClientTransport;
54
import io.grpc.internal.NoopClientStream;
55
import io.grpc.internal.ObjectPool;
56
import io.grpc.internal.ServerStream;
57
import io.grpc.internal.ServerStreamListener;
58
import io.grpc.internal.ServerTransport;
59
import io.grpc.internal.ServerTransportListener;
60
import io.grpc.internal.StatsTraceContext;
61
import io.grpc.internal.StreamListener;
62
import java.io.ByteArrayInputStream;
63
import java.io.InputStream;
64
import java.net.SocketAddress;
65
import java.util.ArrayDeque;
66
import java.util.ArrayList;
67
import java.util.Collections;
68
import java.util.IdentityHashMap;
69
import java.util.List;
70
import java.util.Locale;
71
import java.util.Set;
72
import java.util.concurrent.Executor;
73
import java.util.concurrent.ScheduledExecutorService;
74
import java.util.concurrent.TimeUnit;
75
import java.util.logging.Level;
76
import java.util.logging.Logger;
77
import javax.annotation.CheckReturnValue;
78
import javax.annotation.Nullable;
79
import javax.annotation.concurrent.GuardedBy;
80
import javax.annotation.concurrent.ThreadSafe;
81

82
@ThreadSafe
83
final class InProcessTransport implements ServerTransport, ConnectionClientTransport {
84
  private static final Logger log = Logger.getLogger(InProcessTransport.class.getName());
1✔
85
  static boolean isEnabledSupportTracingMessageSizes =
1✔
86
      GrpcUtil.getFlag("GRPC_EXPERIMENTAL_SUPPORT_TRACING_MESSAGE_SIZES", false);
1✔
87

88
  private final InternalLogId logId;
89
  private final SocketAddress address;
90
  private final int clientMaxInboundMetadataSize;
91
  private final String authority;
92
  private final String userAgent;
93
  private int serverMaxInboundMetadataSize;
94
  private final boolean includeCauseWithStatus;
95
  private ObjectPool<ScheduledExecutorService> serverSchedulerPool;
96
  private ScheduledExecutorService serverScheduler;
97
  private ServerTransportListener serverTransportListener;
98
  private Attributes serverStreamAttributes;
99
  private ManagedClientTransport.Listener clientTransportListener;
100
  // The size is assumed from the sender's side.
101
  private final long assumedMessageSize;
102
  @GuardedBy("this")
103
  private boolean shutdown;
104
  @GuardedBy("this")
105
  private boolean terminated;
106
  @GuardedBy("this")
107
  private Status shutdownStatus;
108
  @GuardedBy("this")
1✔
109
  private final Set<InProcessStream> streams = Collections.newSetFromMap(
1✔
110
          new IdentityHashMap<InProcessStream, Boolean>());
111
  @GuardedBy("this")
112
  private List<ServerStreamTracer.Factory> serverStreamTracerFactories;
113
  private Attributes attributes;
114

115
  private Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
1✔
116
      new Thread.UncaughtExceptionHandler() {
1✔
117
        @Override
118
        public void uncaughtException(Thread t, Throwable e) {
119
          if (e instanceof Error) {
×
120
            throw new Error(e);
×
121
          }
122
          throw new RuntimeException(e);
×
123
        }
124
      };
125

126

127
  @GuardedBy("this")
1✔
128
  private final InUseStateAggregator<InProcessStream> inUseState =
129
      new InUseStateAggregator<InProcessStream>() {
1✔
130
        @Override
131
        protected void handleInUse() {
132
          clientTransportListener.transportInUse(true);
1✔
133
        }
1✔
134

135
        @Override
136
        protected void handleNotInUse() {
137
          clientTransportListener.transportInUse(false);
1✔
138
        }
1✔
139
      };
140

141
  public InProcessTransport(SocketAddress address, int maxInboundMetadataSize, String authority,
142
      String userAgent, Attributes eagAttrs,
143
      boolean includeCauseWithStatus, long assumedMessageSize) {
1✔
144
    this.address = address;
1✔
145
    this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
1✔
146
    this.authority = authority;
1✔
147
    this.userAgent = GrpcUtil.getGrpcUserAgent("inprocess", userAgent);
1✔
148
    checkNotNull(eagAttrs, "eagAttrs");
1✔
149
    this.attributes = Attributes.newBuilder()
1✔
150
        .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
151
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs)
1✔
152
        .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
153
        .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
154
        .build();
1✔
155
    logId = InternalLogId.allocate(getClass(), address.toString());
1✔
156
    this.includeCauseWithStatus = includeCauseWithStatus;
1✔
157
    this.assumedMessageSize = assumedMessageSize;
1✔
158
  }
1✔
159

160
  @CheckReturnValue
161
  @Override
162
  public synchronized Runnable start(ManagedClientTransport.Listener listener) {
163
    this.clientTransportListener = listener;
1✔
164
    InProcessServer server = InProcessServer.findServer(address);
1✔
165
    if (server != null) {
1✔
166
      serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize();
1✔
167
      serverSchedulerPool = server.getScheduledExecutorServicePool();
1✔
168
      serverScheduler = serverSchedulerPool.getObject();
1✔
169
      serverStreamTracerFactories = server.getStreamTracerFactories();
1✔
170
      // Must be semi-initialized; past this point, can begin receiving requests
171
      serverTransportListener = server.register(this);
1✔
172
    }
173
    if (serverTransportListener == null) {
1✔
174
      shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + address);
1✔
175
      final Status localShutdownStatus = shutdownStatus;
1✔
176
      return new Runnable() {
1✔
177
        @Override
178
        public void run() {
179
          synchronized (InProcessTransport.this) {
1✔
180
            notifyShutdown(localShutdownStatus);
1✔
181
            notifyTerminated();
1✔
182
          }
1✔
183
        }
1✔
184
      };
185
    }
186
    Attributes serverTransportAttrs = Attributes.newBuilder()
1✔
187
        .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
1✔
188
        .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
1✔
189
        .build();
1✔
190
    serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
1✔
191
    attributes = clientTransportListener.filterTransport(attributes);
1✔
192
    clientTransportListener.transportReady();
1✔
193
    return null;
1✔
194
  }
195

196
  @Override
197
  public synchronized ClientStream newStream(
198
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
199
      ClientStreamTracer[] tracers) {
200
    StatsTraceContext statsTraceContext =
1✔
201
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
202
    if (shutdownStatus != null) {
1✔
203
      return failedClientStream(statsTraceContext, shutdownStatus);
1✔
204
    }
205

206
    headers.put(GrpcUtil.USER_AGENT_KEY, userAgent);
1✔
207

208
    if (serverMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
209
      int metadataSize = metadataSize(headers);
1✔
210
      if (metadataSize > serverMaxInboundMetadataSize) {
1✔
211
        // Other transports would compute a status with:
212
        //   GrpcUtil.httpStatusToGrpcStatus(431 /* Request Header Fields Too Large */);
213
        // However, that isn't handled specially today, so we'd leak HTTP-isms even though we're
214
        // in-process. We go ahead and make a Status, which may need to be updated if
215
        // statuscodes.md is updated.
216
        Status status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
217
            String.format(
1✔
218
                Locale.US,
219
                "Request metadata larger than %d: %d",
220
                serverMaxInboundMetadataSize,
1✔
221
                metadataSize));
1✔
222
        return failedClientStream(statsTraceContext, status);
1✔
223
      }
224
    }
225

226
    return new InProcessStream(method, headers, callOptions, authority, statsTraceContext)
1✔
227
        .clientStream;
1✔
228
  }
229

230
  private ClientStream failedClientStream(
231
      final StatsTraceContext statsTraceCtx, final Status status) {
232
    return new NoopClientStream() {
1✔
233
        @Override
234
        public void start(ClientStreamListener listener) {
235
          statsTraceCtx.clientOutboundHeaders();
1✔
236
          statsTraceCtx.streamClosed(status);
1✔
237
          listener.closed(status, RpcProgress.PROCESSED, new Metadata());
1✔
238
        }
1✔
239
      };
240
  }
241

242
  @Override
243
  public synchronized void ping(final PingCallback callback, Executor executor) {
244
    if (terminated) {
1✔
245
      final Status shutdownStatus = this.shutdownStatus;
1✔
246
      executor.execute(new Runnable() {
1✔
247
        @Override
248
        public void run() {
249
          callback.onFailure(shutdownStatus.asRuntimeException());
1✔
250
        }
1✔
251
      });
252
    } else {
1✔
253
      executor.execute(new Runnable() {
1✔
254
        @Override
255
        public void run() {
256
          callback.onSuccess(0);
1✔
257
        }
1✔
258
      });
259
    }
260
  }
1✔
261

262
  @Override
263
  public synchronized void shutdown(Status reason) {
264
    // Can be called multiple times: once for ManagedClientTransport, once for ServerTransport.
265
    if (shutdown) {
1✔
266
      return;
1✔
267
    }
268
    shutdownStatus = reason;
1✔
269
    notifyShutdown(reason);
1✔
270
    if (streams.isEmpty()) {
1✔
271
      notifyTerminated();
1✔
272
    }
273
  }
1✔
274

275
  @Override
276
  public synchronized void shutdown() {
277
    shutdown(Status.UNAVAILABLE.withDescription("InProcessTransport shutdown by the server-side"));
1✔
278
  }
1✔
279

280
  @Override
281
  public void shutdownNow(Status reason) {
282
    checkNotNull(reason, "reason");
1✔
283
    List<InProcessStream> streamsCopy;
284
    synchronized (this) {
1✔
285
      shutdown(reason);
1✔
286
      if (terminated) {
1✔
287
        return;
1✔
288
      }
289
      streamsCopy = new ArrayList<>(streams);
1✔
290
    }
1✔
291
    for (InProcessStream stream : streamsCopy) {
1✔
292
      stream.clientStream.cancel(reason);
1✔
293
    }
1✔
294
  }
1✔
295

296
  @Override
297
  public String toString() {
298
    return MoreObjects.toStringHelper(this)
×
299
        .add("logId", logId.getId())
×
300
        .add("address", address)
×
301
        .toString();
×
302
  }
303

304
  @Override
305
  public InternalLogId getLogId() {
306
    return logId;
1✔
307
  }
308

309
  @Override
310
  public Attributes getAttributes() {
311
    return attributes;
1✔
312
  }
313

314
  @Override
315
  public ScheduledExecutorService getScheduledExecutorService() {
316
    return serverScheduler;
1✔
317
  }
318

319
  @Override
320
  public ListenableFuture<SocketStats> getStats() {
321
    SettableFuture<SocketStats> ret = SettableFuture.create();
×
322
    ret.set(null);
×
323
    return ret;
×
324
  }
325

326
  private synchronized void notifyShutdown(Status s) {
327
    if (shutdown) {
1✔
328
      return;
×
329
    }
330
    shutdown = true;
1✔
331
    clientTransportListener.transportShutdown(s);
1✔
332
  }
1✔
333

334
  private synchronized void notifyTerminated() {
335
    if (terminated) {
1✔
336
      return;
×
337
    }
338
    terminated = true;
1✔
339
    if (serverScheduler != null) {
1✔
340
      serverScheduler = serverSchedulerPool.returnObject(serverScheduler);
1✔
341
    }
342
    clientTransportListener.transportTerminated();
1✔
343
    if (serverTransportListener != null) {
1✔
344
      serverTransportListener.transportTerminated();
1✔
345
    }
346
  }
1✔
347

348
  private static int metadataSize(Metadata metadata) {
349
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
350
    if (serialized == null) {
1✔
351
      return 0;
×
352
    }
353
    // Calculate based on SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2. We could use something
354
    // different, but it's "sane."
355
    long size = 0;
1✔
356
    for (int i = 0; i < serialized.length; i += 2) {
1✔
357
      size += 32 + serialized[i].length + serialized[i + 1].length;
1✔
358
    }
359
    size = Math.min(size, Integer.MAX_VALUE);
1✔
360
    return (int) size;
1✔
361
  }
362

363
  private class InProcessStream {
364
    private final InProcessClientStream clientStream;
365
    private final InProcessServerStream serverStream;
366
    private final CallOptions callOptions;
367
    private final Metadata headers;
368
    private final MethodDescriptor<?, ?> method;
369
    private volatile String authority;
370

371
    private InProcessStream(
372
        MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
373
        String authority , StatsTraceContext statsTraceContext) {
1✔
374
      this.method = checkNotNull(method, "method");
1✔
375
      this.headers = checkNotNull(headers, "headers");
1✔
376
      this.callOptions = checkNotNull(callOptions, "callOptions");
1✔
377
      this.authority = authority;
1✔
378
      this.clientStream = new InProcessClientStream(callOptions, statsTraceContext);
1✔
379
      this.serverStream = new InProcessServerStream(method, headers);
1✔
380
    }
1✔
381

382
    // Can be called multiple times due to races on both client and server closing at same time.
383
    private void streamClosed() {
384
      synchronized (InProcessTransport.this) {
1✔
385
        boolean justRemovedAnElement = streams.remove(this);
1✔
386
        if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
387
          inUseState.updateObjectInUse(this, false);
1✔
388
        }
389
        if (streams.isEmpty() && justRemovedAnElement) {
1✔
390
          if (shutdown) {
1✔
391
            notifyTerminated();
1✔
392
          }
393
        }
394
      }
1✔
395
    }
1✔
396

397
    private class InProcessServerStream implements ServerStream {
398
      final StatsTraceContext statsTraceCtx;
399
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
400
      private ClientStreamListener clientStreamListener;
401
      private final SynchronizationContext syncContext =
1✔
402
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
403
      @GuardedBy("this")
404
      private int clientRequested;
405
      @GuardedBy("this")
1✔
406
      private ArrayDeque<StreamListener.MessageProducer> clientReceiveQueue =
407
          new ArrayDeque<>();
408
      @GuardedBy("this")
409
      private Status clientNotifyStatus;
410
      @GuardedBy("this")
411
      private Metadata clientNotifyTrailers;
412
      // Only is intended to prevent double-close when client cancels.
413
      @GuardedBy("this")
414
      private boolean closed;
415
      @GuardedBy("this")
416
      private int outboundSeqNo;
417

418
      InProcessServerStream(MethodDescriptor<?, ?> method, Metadata headers) {
1✔
419
        statsTraceCtx = StatsTraceContext.newServerContext(
1✔
420
            serverStreamTracerFactories, method.getFullMethodName(), headers);
1✔
421
      }
1✔
422

423
      private synchronized void setListener(ClientStreamListener listener) {
424
        clientStreamListener = listener;
1✔
425
      }
1✔
426

427
      @Override
428
      public void setListener(ServerStreamListener serverStreamListener) {
429
        clientStream.setListener(serverStreamListener);
1✔
430
      }
1✔
431

432
      @Override
433
      public void request(int numMessages) {
434
        boolean onReady = clientStream.serverRequested(numMessages);
1✔
435
        if (onReady) {
1✔
436
          synchronized (this) {
1✔
437
            if (!closed) {
1✔
438
              syncContext.executeLater(() -> clientStreamListener.onReady());
1✔
439
            }
440
          }
1✔
441
        }
442
        syncContext.drain();
1✔
443
      }
1✔
444

445
      // This method is the only reason we have to synchronize field accesses.
446
      /**
447
       * Client requested more messages.
448
       *
449
       * @return whether onReady should be called on the server
450
       */
451
      private boolean clientRequested(int numMessages) {
452
        boolean previouslyReady;
453
        boolean nowReady;
454
        synchronized (this) {
1✔
455
          if (closed) {
1✔
456
            return false;
1✔
457
          }
458

459
          previouslyReady = clientRequested > 0;
1✔
460
          clientRequested += numMessages;
1✔
461
          while (clientRequested > 0 && !clientReceiveQueue.isEmpty()) {
1✔
462
            clientRequested--;
1✔
463
            StreamListener.MessageProducer producer = clientReceiveQueue.poll();
1✔
464
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
465
          }
1✔
466

467
          if (clientReceiveQueue.isEmpty() && clientNotifyStatus != null) {
1✔
468
            closed = true;
1✔
469
            clientStream.statsTraceCtx.clientInboundTrailers(clientNotifyTrailers);
1✔
470
            clientStream.statsTraceCtx.streamClosed(clientNotifyStatus);
1✔
471
            Status notifyStatus = this.clientNotifyStatus;
1✔
472
            Metadata notifyTrailers = this.clientNotifyTrailers;
1✔
473
            syncContext.executeLater(() ->
1✔
474
                clientStreamListener.closed(notifyStatus, RpcProgress.PROCESSED, notifyTrailers));
1✔
475
          }
476

477
          nowReady = clientRequested > 0;
1✔
478
        }
1✔
479

480
        syncContext.drain();
1✔
481
        return !previouslyReady && nowReady;
1✔
482
      }
483

484
      private void clientCancelled(Status status) {
485
        internalCancel(status);
1✔
486
      }
1✔
487

488
      @Override
489
      public void writeMessage(InputStream message) {
490
        long messageLength = 0;
1✔
491
        if (isEnabledSupportTracingMessageSizes) {
1✔
492
          try {
493
            if (assumedMessageSize != -1) {
×
494
              messageLength = assumedMessageSize;
×
495
            } else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
×
496
              messageLength = message.available();
×
497
            } else {
498
              InputStream oldMessage = message;
×
499
              byte[] payload = ByteStreams.toByteArray(message);
×
500
              messageLength = payload.length;
×
501
              message = new ByteArrayInputStream(payload);
×
502
              oldMessage.close();
×
503
            }
504
          } catch (Exception e) {
×
505
            throw new RuntimeException("Error processing the message length", e);
×
506
          }
×
507
        }
508

509
        synchronized (this) {
1✔
510
          if (closed) {
1✔
511
            return;
1✔
512
          }
513
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
514
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
515
          clientStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
516
          clientStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
517
          if (isEnabledSupportTracingMessageSizes) {
1✔
518
            statsTraceCtx.outboundUncompressedSize(messageLength);
×
519
            statsTraceCtx.outboundWireSize(messageLength);
×
520
            // messageLength should be same at receiver's end as no actual wire is involved.
521
            clientStream.statsTraceCtx.inboundUncompressedSize(messageLength);
×
522
            clientStream.statsTraceCtx.inboundWireSize(messageLength);
×
523
          }
524
          outboundSeqNo++;
1✔
525
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
526
          if (clientRequested > 0) {
1✔
527
            clientRequested--;
1✔
528
            syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
1✔
529
          } else {
530
            clientReceiveQueue.add(producer);
1✔
531
          }
532
        }
1✔
533
        syncContext.drain();
1✔
534
      }
1✔
535

536
      @Override
537
      public void flush() {}
1✔
538

539
      @Override
540
      public synchronized boolean isReady() {
541
        if (closed) {
1✔
542
          return false;
×
543
        }
544
        return clientRequested > 0;
1✔
545
      }
546

547
      @Override
548
      public void writeHeaders(Metadata headers, boolean flush) {
549
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
550
          int metadataSize = metadataSize(headers);
1✔
551
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
552
            Status serverStatus = Status.CANCELLED.withDescription("Client cancelled the RPC");
1✔
553
            clientStream.serverClosed(serverStatus, serverStatus);
1✔
554
            // Other transports provide very little information in this case. We go ahead and make a
555
            // Status, which may need to be updated if statuscodes.md is updated.
556
            Status failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
557
                String.format(
1✔
558
                    Locale.US,
559
                    "Response header metadata larger than %d: %d",
560
                    clientMaxInboundMetadataSize,
1✔
561
                    metadataSize));
1✔
562
            notifyClientClose(failedStatus, new Metadata());
1✔
563
            return;
1✔
564
          }
565
        }
566

567
        synchronized (this) {
1✔
568
          if (closed) {
1✔
569
            return;
1✔
570
          }
571

572
          clientStream.statsTraceCtx.clientInboundHeaders(headers);
1✔
573
          syncContext.executeLater(() -> clientStreamListener.headersRead(headers));
1✔
574
        }
1✔
575
        syncContext.drain();
1✔
576
      }
1✔
577

578
      @Override
579
      public void close(Status status, Metadata trailers) {
580
        // clientStream.serverClosed must happen before clientStreamListener.closed, otherwise
581
        // clientStreamListener.closed can trigger clientStream.cancel (see code in
582
        // ClientCalls.blockingUnaryCall), which may race with clientStream.serverClosed as both are
583
        // calling internalCancel().
584
        clientStream.serverClosed(Status.OK, status);
1✔
585

586
        if (clientMaxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
587
          int statusSize = status.getDescription() == null ? 0 : status.getDescription().length();
1✔
588
          // Go ahead and throw in the status description's length, since that could be very long.
589
          int metadataSize = metadataSize(trailers) + statusSize;
1✔
590
          if (metadataSize > clientMaxInboundMetadataSize) {
1✔
591
            // Override the status for the client, but not the server. Transports do not guarantee
592
            // notifying the server of the failure.
593

594
            // Other transports provide very little information in this case. We go ahead and make a
595
            // Status, which may need to be updated if statuscodes.md is updated.
596
            status = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
597
                String.format(
1✔
598
                    Locale.US,
599
                    "Response header metadata larger than %d: %d",
600
                    clientMaxInboundMetadataSize,
1✔
601
                    metadataSize));
1✔
602
            trailers = new Metadata();
1✔
603
          }
604
        }
605

606
        notifyClientClose(status, trailers);
1✔
607
      }
1✔
608

609
      /** clientStream.serverClosed() must be called before this method. */
610
      private void notifyClientClose(Status status, Metadata trailers) {
611
        Status clientStatus = cleanStatus(status, includeCauseWithStatus);
1✔
612
        synchronized (this) {
1✔
613
          if (closed) {
1✔
614
            return;
1✔
615
          }
616
          if (clientReceiveQueue.isEmpty()) {
1✔
617
            closed = true;
1✔
618
            clientStream.statsTraceCtx.clientInboundTrailers(trailers);
1✔
619
            clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
620
            syncContext.executeLater(
1✔
621
                () -> clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, trailers));
1✔
622
          } else {
623
            clientNotifyStatus = clientStatus;
1✔
624
            clientNotifyTrailers = trailers;
1✔
625
          }
626
        }
1✔
627
        syncContext.drain();
1✔
628
        streamClosed();
1✔
629
      }
1✔
630

631
      @Override
632
      public void cancel(Status status) {
633
        if (!internalCancel(Status.CANCELLED.withDescription("server cancelled stream"))) {
1✔
634
          return;
1✔
635
        }
636
        clientStream.serverClosed(status, status);
1✔
637
        streamClosed();
1✔
638
      }
1✔
639

640
      private boolean internalCancel(Status clientStatus) {
641
        synchronized (this) {
1✔
642
          if (closed) {
1✔
643
            return false;
1✔
644
          }
645
          closed = true;
1✔
646
          StreamListener.MessageProducer producer;
647
          while ((producer = clientReceiveQueue.poll()) != null) {
1✔
648
            InputStream message;
649
            while ((message = producer.next()) != null) {
×
650
              try {
651
                message.close();
×
652
              } catch (Throwable t) {
×
653
                log.log(Level.WARNING, "Exception closing stream", t);
×
654
              }
×
655
            }
656
          }
×
657
          clientStream.statsTraceCtx.streamClosed(clientStatus);
1✔
658
          syncContext.executeLater(
1✔
659
              () ->
660
                  clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, new Metadata()));
1✔
661
        }
1✔
662
        syncContext.drain();
1✔
663
        return true;
1✔
664
      }
665

666
      @Override
667
      public void setMessageCompression(boolean enable) {
668
        // noop
669
      }
×
670

671
      @Override
672
      public void optimizeForDirectExecutor() {}
1✔
673

674
      @Override
675
      public void setCompressor(Compressor compressor) {}
1✔
676

677
      @Override
678
      public void setDecompressor(Decompressor decompressor) {}
×
679

680
      @Override public Attributes getAttributes() {
681
        return serverStreamAttributes;
1✔
682
      }
683

684
      @Override
685
      public String getAuthority() {
686
        return InProcessStream.this.authority;
1✔
687
      }
688

689
      @Override
690
      public StatsTraceContext statsTraceContext() {
691
        return statsTraceCtx;
1✔
692
      }
693

694
      @Override
695
      public int streamId() {
696
        return -1;
1✔
697
      }
698

699
      @Override
700
      public void setOnReadyThreshold(int numBytes) {
701
        // noop
702
      }
×
703
    }
704

705
    private class InProcessClientStream implements ClientStream {
706
      final StatsTraceContext statsTraceCtx;
707
      final CallOptions callOptions;
708
      // All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
709
      private ServerStreamListener serverStreamListener;
710
      private final SynchronizationContext syncContext =
1✔
711
          new SynchronizationContext(uncaughtExceptionHandler);
1✔
712
      @GuardedBy("this")
713
      private int serverRequested;
714
      @GuardedBy("this")
1✔
715
      private ArrayDeque<StreamListener.MessageProducer> serverReceiveQueue =
716
          new ArrayDeque<>();
717
      @GuardedBy("this")
718
      private boolean serverNotifyHalfClose;
719
      // Only is intended to prevent double-close when server closes.
720
      @GuardedBy("this")
721
      private boolean closed;
722
      @GuardedBy("this")
723
      private int outboundSeqNo;
724

725
      InProcessClientStream(
726
          CallOptions callOptions, StatsTraceContext statsTraceContext) {
1✔
727
        this.callOptions = callOptions;
1✔
728
        statsTraceCtx = statsTraceContext;
1✔
729
      }
1✔
730

731
      private synchronized void setListener(ServerStreamListener listener) {
732
        this.serverStreamListener = listener;
1✔
733
      }
1✔
734

735
      @Override
736
      public void request(int numMessages) {
737
        boolean onReady = serverStream.clientRequested(numMessages);
1✔
738
        if (onReady) {
1✔
739
          synchronized (this) {
1✔
740
            if (!closed) {
1✔
741
              syncContext.executeLater(() -> serverStreamListener.onReady());
1✔
742
            }
743
          }
1✔
744
          syncContext.drain();
1✔
745
        }
746
      }
1✔
747

748
      // This method is the only reason we have to synchronize field accesses.
749
      /**
750
       * Client requested more messages.
751
       *
752
       * @return whether onReady should be called on the server
753
       */
754
      private boolean serverRequested(int numMessages) {
755
        boolean previouslyReady;
756
        boolean nowReady;
757
        synchronized (this) {
1✔
758
          if (closed) {
1✔
759
            return false;
1✔
760
          }
761
          previouslyReady = serverRequested > 0;
1✔
762
          serverRequested += numMessages;
1✔
763

764
          while (serverRequested > 0 && !serverReceiveQueue.isEmpty()) {
1✔
765
            serverRequested--;
1✔
766
            StreamListener.MessageProducer producer = serverReceiveQueue.poll();
1✔
767
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
768
          }
1✔
769

770
          if (serverReceiveQueue.isEmpty() && serverNotifyHalfClose) {
1✔
771
            serverNotifyHalfClose = false;
1✔
772
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
773
          }
774
          nowReady = serverRequested > 0;
1✔
775
        }
1✔
776
        syncContext.drain();
1✔
777
        return !previouslyReady && nowReady;
1✔
778
      }
779

780
      private void serverClosed(Status serverListenerStatus, Status serverTracerStatus) {
781
        internalCancel(serverListenerStatus, serverTracerStatus);
1✔
782
      }
1✔
783

784
      @Override
785
      public void writeMessage(InputStream message) {
786
        long messageLength = 0;
1✔
787
        if (isEnabledSupportTracingMessageSizes) {
1✔
788
          try {
789
            if (assumedMessageSize != -1) {
×
790
              messageLength = assumedMessageSize;
×
791
            } else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
×
792
              messageLength = message.available();
×
793
            } else {
794
              InputStream oldMessage = message;
×
795
              byte[] payload = ByteStreams.toByteArray(message);
×
796
              messageLength = payload.length;
×
797
              message = new ByteArrayInputStream(payload);
×
798
              oldMessage.close();
×
799
            }
800
          } catch (Exception e) {
×
801
            throw new RuntimeException("Error processing the message length", e);
×
802
          }
×
803
        }
804
        synchronized (this) {
1✔
805
          if (closed) {
1✔
806
            return;
1✔
807
          }
808
          statsTraceCtx.outboundMessage(outboundSeqNo);
1✔
809
          statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
1✔
810
          serverStream.statsTraceCtx.inboundMessage(outboundSeqNo);
1✔
811
          serverStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
1✔
812
          if (isEnabledSupportTracingMessageSizes) {
1✔
813
            statsTraceCtx.outboundUncompressedSize(messageLength);
×
814
            statsTraceCtx.outboundWireSize(messageLength);
×
815
            // messageLength should be same at receiver's end as no actual wire is involved.
816
            serverStream.statsTraceCtx.inboundUncompressedSize(messageLength);
×
817
            serverStream.statsTraceCtx.inboundWireSize(messageLength);
×
818
          }
819
          outboundSeqNo++;
1✔
820
          StreamListener.MessageProducer producer = new SingleMessageProducer(message);
1✔
821
          if (serverRequested > 0) {
1✔
822
            serverRequested--;
1✔
823
            syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
1✔
824
          } else {
825
            serverReceiveQueue.add(producer);
1✔
826
          }
827
        }
1✔
828
        syncContext.drain();
1✔
829
      }
1✔
830

831
      @Override
832
      public void flush() {}
1✔
833

834
      @Override
835
      public synchronized boolean isReady() {
836
        if (closed) {
1✔
837
          return false;
×
838
        }
839
        return serverRequested > 0;
1✔
840
      }
841

842
      // Must be thread-safe for shutdownNow()
843
      @Override
844
      public void cancel(Status reason) {
845
        Status serverStatus = cleanStatus(reason, includeCauseWithStatus);
1✔
846
        if (!internalCancel(serverStatus, serverStatus)) {
1✔
847
          return;
1✔
848
        }
849
        serverStream.clientCancelled(reason);
1✔
850
        streamClosed();
1✔
851
      }
1✔
852

853
      private boolean internalCancel(
854
          Status serverListenerStatus, Status serverTracerStatus) {
855
        synchronized (this) {
1✔
856
          if (closed) {
1✔
857
            return false;
1✔
858
          }
859
          closed = true;
1✔
860

861
          StreamListener.MessageProducer producer;
862
          while ((producer = serverReceiveQueue.poll()) != null) {
1✔
863
            InputStream message;
864
            while ((message = producer.next()) != null) {
1✔
865
              try {
866
                message.close();
1✔
867
              } catch (Throwable t) {
×
868
                log.log(Level.WARNING, "Exception closing stream", t);
×
869
              }
1✔
870
            }
871
          }
1✔
872
          serverStream.statsTraceCtx.streamClosed(serverTracerStatus);
1✔
873
          syncContext.executeLater(() -> serverStreamListener.closed(serverListenerStatus));
1✔
874
        }
1✔
875
        syncContext.drain();
1✔
876
        return true;
1✔
877
      }
878

879
      @Override
880
      public void halfClose() {
881
        synchronized (this) {
1✔
882
          if (closed) {
1✔
883
            return;
1✔
884
          }
885
          if (serverReceiveQueue.isEmpty()) {
1✔
886
            syncContext.executeLater(() -> serverStreamListener.halfClosed());
1✔
887
          } else {
888
            serverNotifyHalfClose = true;
1✔
889
          }
890
        }
1✔
891
        syncContext.drain();
1✔
892
      }
1✔
893

894
      @Override
895
      public void setMessageCompression(boolean enable) {}
×
896

897
      @Override
898
      public void setAuthority(String string) {
899
        InProcessStream.this.authority = string;
×
900
      }
×
901

902
      @Override
903
      public void start(ClientStreamListener listener) {
904
        serverStream.setListener(listener);
1✔
905

906
        synchronized (InProcessTransport.this) {
1✔
907
          statsTraceCtx.clientOutboundHeaders();
1✔
908
          streams.add(InProcessTransport.InProcessStream.this);
1✔
909
          if (GrpcUtil.shouldBeCountedForInUse(callOptions)) {
1✔
910
            inUseState.updateObjectInUse(InProcessTransport.InProcessStream.this, true);
1✔
911
          }
912
          serverTransportListener.streamCreated(serverStream, method.getFullMethodName(), headers);
1✔
913
        }
1✔
914
      }
1✔
915

916
      @Override
917
      public Attributes getAttributes() {
918
        return attributes;
1✔
919
      }
920

921
      @Override
922
      public void optimizeForDirectExecutor() {}
1✔
923

924
      @Override
925
      public void setCompressor(Compressor compressor) {}
1✔
926

927
      @Override
928
      public void setFullStreamDecompression(boolean fullStreamDecompression) {}
×
929

930
      @Override
931
      public void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {}
1✔
932

933
      @Override
934
      public void setMaxInboundMessageSize(int maxSize) {}
1✔
935

936
      @Override
937
      public void setMaxOutboundMessageSize(int maxSize) {}
1✔
938

939
      @Override
940
      public void setDeadline(Deadline deadline) {
941
        headers.discardAll(TIMEOUT_KEY);
1✔
942
        long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
1✔
943
        headers.put(TIMEOUT_KEY, effectiveTimeout);
1✔
944
      }
1✔
945

946
      @Override
947
      public void appendTimeoutInsight(InsightBuilder insight) {
948
      }
1✔
949
    }
950
  }
951

952
  /**
953
   * Returns a new status with the same code and description.
954
   * If includeCauseWithStatus is true, cause is also included.
955
   *
956
   * <p>For InProcess transport to behave in the same way as the other transports,
957
   * when exchanging statuses between client and server and vice versa,
958
   * the cause should be excluded from the status.
959
   * For easier debugging, the status may be optionally included.
960
   */
961
  private static Status cleanStatus(Status status, boolean includeCauseWithStatus) {
962
    if (status == null) {
1✔
963
      return null;
×
964
    }
965
    Status clientStatus = Status
1✔
966
        .fromCodeValue(status.getCode().value())
1✔
967
        .withDescription(status.getDescription());
1✔
968
    if (includeCauseWithStatus) {
1✔
969
      clientStatus = clientStatus.withCause(status.getCause());
1✔
970
    }
971
    return clientStatus;
1✔
972
  }
973

974
  private static class SingleMessageProducer implements StreamListener.MessageProducer {
975
    private InputStream message;
976

977
    private SingleMessageProducer(InputStream message) {
1✔
978
      this.message = message;
1✔
979
    }
1✔
980

981
    @Nullable
982
    @Override
983
    public InputStream next() {
984
      InputStream messageToReturn = message;
1✔
985
      message = null;
1✔
986
      return messageToReturn;
1✔
987
    }
988
  }
989
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc