• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20244

17 Apr 2026 02:24PM UTC coverage: 88.793% (-0.02%) from 88.811%
#20244

push

github

web-flow
core: Reduce per-stream idle memory by 20%

Metadata was accidentally being retained after the start of the call.
That can be an overwhelming percentage of memory for an idle RPC; don't
do that. The other changes are considerably smaller, but I happened to
notice them and the changes are straightforward without magic numbers
(e.g., there are many arrays that could be tuned).

The regular interop server uses 4600 bytes per full duplex stream while
idle, but much of that is Census recorded events hanging around. Keeping
the Census integration but removing the Census impl (so a noop is used)
drops that to 3000 bytes. This change brings that down to ~2450 bytes
(which still includes state from TestServiceImpl). But there's very
little Metadata in the interop tests, so absolute real-life savings
would be much higher (but relative real-life savings may be lower,
because the application will often have more state).

The measurements were captured using a modified
timeout_on_sleeping_server client that had 100,000 concurrent full
duplex calls on one connection.

36017 of 40563 relevant lines covered (88.79%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.23
/../core/src/main/java/io/grpc/internal/ServerImpl.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
22
import static io.grpc.Contexts.statusFromCancelled;
23
import static io.grpc.Status.DEADLINE_EXCEEDED;
24
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
25
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
26
import static java.util.concurrent.TimeUnit.NANOSECONDS;
27

28
import com.google.common.annotations.VisibleForTesting;
29
import com.google.common.base.MoreObjects;
30
import com.google.common.base.Preconditions;
31
import com.google.common.util.concurrent.Futures;
32
import com.google.common.util.concurrent.ListenableFuture;
33
import com.google.common.util.concurrent.SettableFuture;
34
import com.google.errorprone.annotations.concurrent.GuardedBy;
35
import io.grpc.Attributes;
36
import io.grpc.BinaryLog;
37
import io.grpc.CompressorRegistry;
38
import io.grpc.Context;
39
import io.grpc.Deadline;
40
import io.grpc.Decompressor;
41
import io.grpc.DecompressorRegistry;
42
import io.grpc.HandlerRegistry;
43
import io.grpc.InternalChannelz;
44
import io.grpc.InternalChannelz.ServerStats;
45
import io.grpc.InternalChannelz.SocketStats;
46
import io.grpc.InternalInstrumented;
47
import io.grpc.InternalLogId;
48
import io.grpc.InternalServerInterceptors;
49
import io.grpc.InternalStatus;
50
import io.grpc.Metadata;
51
import io.grpc.ServerCall;
52
import io.grpc.ServerCallExecutorSupplier;
53
import io.grpc.ServerCallHandler;
54
import io.grpc.ServerInterceptor;
55
import io.grpc.ServerMethodDefinition;
56
import io.grpc.ServerServiceDefinition;
57
import io.grpc.ServerTransportFilter;
58
import io.grpc.Status;
59
import io.perfmark.Link;
60
import io.perfmark.PerfMark;
61
import io.perfmark.Tag;
62
import io.perfmark.TaskCloseable;
63
import java.io.IOException;
64
import java.io.InputStream;
65
import java.net.InetSocketAddress;
66
import java.net.SocketAddress;
67
import java.util.ArrayList;
68
import java.util.Collection;
69
import java.util.Collections;
70
import java.util.HashSet;
71
import java.util.List;
72
import java.util.Set;
73
import java.util.concurrent.Executor;
74
import java.util.concurrent.Future;
75
import java.util.concurrent.FutureTask;
76
import java.util.concurrent.TimeUnit;
77
import java.util.logging.Level;
78
import java.util.logging.Logger;
79

80
/**
81
 * Default implementation of {@link io.grpc.Server}, for creation by transports.
82
 *
83
 * <p>Expected usage (by a theoretical TCP transport):
84
 * <pre><code>public class TcpTransportServerFactory {
85
 *   public static Server newServer(Executor executor, HandlerRegistry registry,
86
 *       String configuration) {
87
 *     return new ServerImpl(executor, registry, new TcpTransportServer(configuration));
88
 *   }
89
 * }</code></pre>
90
 *
91
 * <p>Starting the server starts the underlying transport for servicing requests. Stopping the
92
 * server stops servicing new requests and waits for all connections to terminate.
93
 */
94
public final class ServerImpl extends io.grpc.Server implements InternalInstrumented<ServerStats> {
95
  private static final Logger log = Logger.getLogger(ServerImpl.class.getName());
1✔
96
  private static final ServerStreamListener NOOP_LISTENER = new NoopListener();
1✔
97

98
  private final InternalLogId logId;
99
  private final ObjectPool<? extends Executor> executorPool;
100
  /** Executor for application processing. Safe to read after {@link #start()}. */
101
  private Executor executor;
102
  private final HandlerRegistry registry;
103
  private final HandlerRegistry fallbackRegistry;
104
  private final List<ServerTransportFilter> transportFilters;
105
  // This is iterated on a per-call basis.  Use an array instead of a Collection to avoid iterator
106
  // creations.
107
  private final ServerInterceptor[] interceptors;
108
  private final long handshakeTimeoutMillis;
109
  @GuardedBy("lock") private boolean started;
110
  @GuardedBy("lock") private boolean shutdown;
111
  /** non-{@code null} if immediate shutdown has been requested. */
112
  @GuardedBy("lock") private Status shutdownNowStatus;
113
  /** {@code true} if ServerListenerImpl.serverShutdown() was called. */
114
  @GuardedBy("lock") private boolean serverShutdownCallbackInvoked;
115
  @GuardedBy("lock") private boolean terminated;
116
  /** Service encapsulating something similar to an accept() socket. */
117
  private final InternalServer transportServer;
118
  private final Object lock = new Object();
1✔
119
  @GuardedBy("lock") private boolean transportServersTerminated;
120
  /** {@code transportServer} and services encapsulating something similar to a TCP connection. */
121
  @GuardedBy("lock") private final Set<ServerTransport> transports = new HashSet<>();
1✔
122

123
  private final Context rootContext;
124

125
  private final DecompressorRegistry decompressorRegistry;
126
  private final CompressorRegistry compressorRegistry;
127
  private final BinaryLog binlog;
128

129
  private final InternalChannelz channelz;
130
  private final CallTracer serverCallTracer;
131
  private final Deadline.Ticker ticker;
132
  private final ServerCallExecutorSupplier executorSupplier;
133

134
  /**
135
   * Construct a server.
136
   *
137
   * @param builder builder with configuration for server
138
   * @param transportServer transport servers that will create new incoming transports
139
   * @param rootContext context that callbacks for new RPCs should be derived from
140
   */
141
  ServerImpl(
142
      ServerImplBuilder builder,
143
      InternalServer transportServer,
144
      Context rootContext) {
1✔
145
    this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool");
1✔
146
    this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder");
1✔
147
    this.fallbackRegistry =
1✔
148
        Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry");
1✔
149
    this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer");
1✔
150
    this.logId =
1✔
151
        InternalLogId.allocate("Server", String.valueOf(getListenSocketsIgnoringLifecycle()));
1✔
152
    // Fork from the passed in context so that it does not propagate cancellation, it only
153
    // inherits values.
154
    this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext")
1✔
155
        .fork()
1✔
156
        .withValue(io.grpc.InternalServer.SERVER_CONTEXT_KEY, ServerImpl.this);
1✔
157
    this.decompressorRegistry = builder.decompressorRegistry;
1✔
158
    this.compressorRegistry = builder.compressorRegistry;
1✔
159
    this.transportFilters = Collections.unmodifiableList(
1✔
160
        new ArrayList<>(builder.transportFilters));
161
    this.interceptors =
1✔
162
        builder.interceptors.toArray(new ServerInterceptor[builder.interceptors.size()]);
1✔
163
    this.handshakeTimeoutMillis = builder.handshakeTimeoutMillis;
1✔
164
    this.binlog = builder.binlog;
1✔
165
    this.channelz = builder.channelz;
1✔
166
    this.serverCallTracer = builder.callTracerFactory.create();
1✔
167
    this.ticker = checkNotNull(builder.ticker, "ticker");
1✔
168
    channelz.addServer(this);
1✔
169
    this.executorSupplier = builder.executorSupplier;
1✔
170
  }
1✔
171

172
  /**
173
   * Bind and start the server.
174
   *
175
   * @return {@code this} object
176
   * @throws IllegalStateException if already started
177
   * @throws IOException if unable to bind
178
   */
179
  @Override
180
  public ServerImpl start() throws IOException {
181
    synchronized (lock) {
1✔
182
      checkState(!started, "Already started");
1✔
183
      checkState(!shutdown, "Shutting down");
1✔
184
      // Start and wait for any ports to actually be bound.
185

186
      ServerListenerImpl listener = new ServerListenerImpl();
1✔
187
      transportServer.start(listener);
1✔
188
      executor = Preconditions.checkNotNull(executorPool.getObject(), "executor");
1✔
189
      started = true;
1✔
190
      return this;
1✔
191
    }
192
  }
193

194

195
  @Override
196
  public int getPort() {
197
    synchronized (lock) {
1✔
198
      checkState(started, "Not started");
1✔
199
      checkState(!terminated, "Already terminated");
1✔
200
      for (SocketAddress addr: transportServer.getListenSocketAddresses()) {
1✔
201
        if (addr instanceof InetSocketAddress) {
1✔
202
          return ((InetSocketAddress) addr).getPort();
1✔
203
        }
204
      }
×
205
      return -1;
×
206
    }
207
  }
208

209
  @Override
210
  public List<SocketAddress> getListenSockets() {
211
    synchronized (lock) {
1✔
212
      checkState(started, "Not started");
1✔
213
      checkState(!terminated, "Already terminated");
1✔
214
      return getListenSocketsIgnoringLifecycle();
1✔
215
    }
216
  }
217

218
  private List<SocketAddress> getListenSocketsIgnoringLifecycle() {
219
    synchronized (lock) {
1✔
220
      return Collections.unmodifiableList(transportServer.getListenSocketAddresses());
1✔
221
    }
222
  }
223

224
  @Override
225
  public List<ServerServiceDefinition> getServices() {
226
    List<ServerServiceDefinition> fallbackServices = fallbackRegistry.getServices();
×
227
    if (fallbackServices.isEmpty()) {
×
228
      return registry.getServices();
×
229
    } else {
230
      List<ServerServiceDefinition> registryServices = registry.getServices();
×
231
      int servicesCount = registryServices.size() + fallbackServices.size();
×
232
      List<ServerServiceDefinition> services =
×
233
          new ArrayList<>(servicesCount);
234
      services.addAll(registryServices);
×
235
      services.addAll(fallbackServices);
×
236
      return Collections.unmodifiableList(services);
×
237
    }
238
  }
239

240
  @Override
241
  public List<ServerServiceDefinition> getImmutableServices() {
242
    return registry.getServices();
1✔
243
  }
244

245
  @Override
246
  public List<ServerServiceDefinition> getMutableServices() {
247
    return Collections.unmodifiableList(fallbackRegistry.getServices());
1✔
248
  }
249

250
  /**
251
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are rejected.
252
   */
253
  @Override
254
  public ServerImpl shutdown() {
255
    boolean shutdownTransportServers;
256
    synchronized (lock) {
1✔
257
      if (shutdown) {
1✔
258
        return this;
1✔
259
      }
260
      shutdown = true;
1✔
261
      shutdownTransportServers = started;
1✔
262
      if (!shutdownTransportServers) {
1✔
263
        transportServersTerminated = true;
1✔
264
        checkForTermination();
1✔
265
      }
266
    }
1✔
267
    if (shutdownTransportServers) {
1✔
268
      transportServer.shutdown();
1✔
269
    }
270
    return this;
1✔
271
  }
272

273
  @Override
274
  public ServerImpl shutdownNow() {
275
    shutdown();
1✔
276
    Collection<ServerTransport> transportsCopy;
277
    Status nowStatus = Status.UNAVAILABLE.withDescription("Server shutdownNow invoked");
1✔
278
    boolean savedServerShutdownCallbackInvoked;
279
    synchronized (lock) {
1✔
280
      // Short-circuiting not strictly necessary, but prevents transports from needing to handle
281
      // multiple shutdownNow invocations if shutdownNow is called multiple times.
282
      if (shutdownNowStatus != null) {
1✔
283
        return this;
1✔
284
      }
285
      shutdownNowStatus = nowStatus;
1✔
286
      transportsCopy = new ArrayList<>(transports);
1✔
287
      savedServerShutdownCallbackInvoked = serverShutdownCallbackInvoked;
1✔
288
    }
1✔
289
    // Short-circuiting not strictly necessary, but prevents transports from needing to handle
290
    // multiple shutdownNow invocations, between here and the serverShutdown callback.
291
    if (savedServerShutdownCallbackInvoked) {
1✔
292
      // Have to call shutdownNow, because serverShutdown callback only called shutdown, not
293
      // shutdownNow
294
      for (ServerTransport transport : transportsCopy) {
1✔
295
        transport.shutdownNow(nowStatus);
1✔
296
      }
1✔
297
    }
298
    return this;
1✔
299
  }
300

301
  @Override
302
  public boolean isShutdown() {
303
    synchronized (lock) {
1✔
304
      return shutdown;
1✔
305
    }
306
  }
307

308
  @Override
309
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
310
    synchronized (lock) {
1✔
311
      long timeoutNanos = unit.toNanos(timeout);
1✔
312
      long endTimeNanos = System.nanoTime() + timeoutNanos;
1✔
313
      while (!terminated && (timeoutNanos = endTimeNanos - System.nanoTime()) > 0) {
1✔
314
        NANOSECONDS.timedWait(lock, timeoutNanos);
1✔
315
      }
316
      return terminated;
1✔
317
    }
318
  }
319

320
  @Override
321
  public void awaitTermination() throws InterruptedException {
322
    synchronized (lock) {
1✔
323
      while (!terminated) {
1✔
324
        lock.wait();
1✔
325
      }
326
    }
1✔
327
  }
1✔
328

329
  @Override
330
  public boolean isTerminated() {
331
    synchronized (lock) {
1✔
332
      return terminated;
1✔
333
    }
334
  }
335

336
  /**
337
   * Remove transport service from accounting collection and notify of complete shutdown if
338
   * necessary.
339
   *
340
   * @param transport service to remove
341
   */
342
  private void transportClosed(ServerTransport transport) {
343
    synchronized (lock) {
1✔
344
      if (!transports.remove(transport)) {
1✔
345
        throw new AssertionError("Transport already removed");
×
346
      }
347
      channelz.removeServerSocket(ServerImpl.this, transport);
1✔
348
      checkForTermination();
1✔
349
    }
1✔
350
  }
1✔
351

352
  /** Notify of complete shutdown if necessary. */
353
  private void checkForTermination() {
354
    synchronized (lock) {
1✔
355
      if (shutdown && transports.isEmpty() && transportServersTerminated) {
1✔
356
        if (terminated) {
1✔
357
          throw new AssertionError("Server already terminated");
×
358
        }
359
        terminated = true;
1✔
360
        channelz.removeServer(this);
1✔
361
        if (executor != null) {
1✔
362
          executor = executorPool.returnObject(executor);
1✔
363
        }
364
        lock.notifyAll();
1✔
365
      }
366
    }
1✔
367
  }
1✔
368

369
  private final class ServerListenerImpl implements ServerListener {
1✔
370

371
    @Override
372
    public ServerTransportListener transportCreated(ServerTransport transport) {
373
      synchronized (lock) {
1✔
374
        transports.add(transport);
1✔
375
      }
1✔
376
      ServerTransportListenerImpl stli = new ServerTransportListenerImpl(transport);
1✔
377
      stli.init();
1✔
378
      return stli;
1✔
379
    }
380

381
    @Override
382
    public void serverShutdown() {
383
      ArrayList<ServerTransport> copiedTransports;
384
      Status shutdownNowStatusCopy;
385
      synchronized (lock) {
1✔
386
        if (serverShutdownCallbackInvoked) {
1✔
387
          return;
×
388
        }
389

390
        // transports collection can be modified during shutdown(), even if we hold the lock, due
391
        // to reentrancy.
392
        copiedTransports = new ArrayList<>(transports);
1✔
393
        shutdownNowStatusCopy = shutdownNowStatus;
1✔
394
        serverShutdownCallbackInvoked = true;
1✔
395
      }
1✔
396
      for (ServerTransport transport : copiedTransports) {
1✔
397
        if (shutdownNowStatusCopy == null) {
1✔
398
          transport.shutdown();
1✔
399
        } else {
400
          transport.shutdownNow(shutdownNowStatusCopy);
1✔
401
        }
402
      }
1✔
403
      synchronized (lock) {
1✔
404
        transportServersTerminated = true;
1✔
405
        checkForTermination();
1✔
406
      }
1✔
407
    }
1✔
408
  }
409

410
  private final class ServerTransportListenerImpl implements ServerTransportListener {
411
    private final ServerTransport transport;
412
    private Future<?> handshakeTimeoutFuture;
413
    private Attributes attributes;
414

415
    ServerTransportListenerImpl(ServerTransport transport) {
1✔
416
      this.transport = transport;
1✔
417
    }
1✔
418

419
    public void init() {
420
      class TransportShutdownNow implements Runnable {
1✔
421
        @Override public void run() {
422
          transport.shutdownNow(Status.CANCELLED.withDescription("Handshake timeout exceeded"));
1✔
423
        }
1✔
424
      }
425

426
      if (handshakeTimeoutMillis != Long.MAX_VALUE) {
1✔
427
        handshakeTimeoutFuture = transport.getScheduledExecutorService()
1✔
428
            .schedule(new TransportShutdownNow(), handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
429
      } else {
430
        // Noop, to avoid triggering Thread creation in InProcessServer
431
        handshakeTimeoutFuture = new FutureTask<Void>(new Runnable() {
1✔
432
          @Override public void run() {}
×
433
        }, null);
434
      }
435
      channelz.addServerSocket(ServerImpl.this, transport);
1✔
436
    }
1✔
437

438
    @Override
439
    public Attributes transportReady(Attributes attributes) {
440
      handshakeTimeoutFuture.cancel(false);
1✔
441
      handshakeTimeoutFuture = null;
1✔
442

443
      for (ServerTransportFilter filter : transportFilters) {
1✔
444
        attributes = Preconditions.checkNotNull(filter.transportReady(attributes),
1✔
445
            "Filter %s returned null", filter);
446
      }
1✔
447
      this.attributes = attributes;
1✔
448
      return attributes;
1✔
449
    }
450

451
    @Override
452
    public void transportTerminated() {
453
      if (handshakeTimeoutFuture != null) {
1✔
454
        handshakeTimeoutFuture.cancel(false);
1✔
455
        handshakeTimeoutFuture = null;
1✔
456
      }
457
      for (ServerTransportFilter filter : transportFilters) {
1✔
458
        filter.transportTerminated(attributes);
1✔
459
      }
1✔
460
      transportClosed(transport);
1✔
461
    }
1✔
462

463

464
    @Override
465
    public void streamCreated(ServerStream stream, String methodName, Metadata headers) {
466
      Tag tag = PerfMark.createTag(methodName, stream.streamId());
1✔
467
      try (TaskCloseable ignore = PerfMark.traceTask("ServerTransportListener.streamCreated")) {
1✔
468
        PerfMark.attachTag(tag);
1✔
469
        streamCreatedInternal(stream, methodName, headers, tag);
1✔
470
      }
471
    }
1✔
472

473
    private void streamCreatedInternal(
474
        final ServerStream stream, final String methodName, final Metadata headers, final Tag tag) {
475
      final Executor wrappedExecutor;
476
      // This is a performance optimization that avoids the synchronization and queuing overhead
477
      // that comes with SerializingExecutor.
478
      if (executorSupplier != null || executor != directExecutor()) {
1✔
479
        wrappedExecutor = new SerializingExecutor(executor);
1✔
480
      } else {
481
        wrappedExecutor = new SerializeReentrantCallsDirectExecutor();
1✔
482
        stream.optimizeForDirectExecutor();
1✔
483
      }
484

485
      if (headers.containsKey(MESSAGE_ENCODING_KEY)) {
1✔
486
        String encoding = headers.get(MESSAGE_ENCODING_KEY);
1✔
487
        Decompressor decompressor = decompressorRegistry.lookupDecompressor(encoding);
1✔
488
        if (decompressor == null) {
1✔
489
          stream.setListener(NOOP_LISTENER);
1✔
490
          stream.close(
1✔
491
              Status.UNIMPLEMENTED.withDescription(
1✔
492
                  String.format("Can't find decompressor for %s", encoding)),
1✔
493
              new Metadata());
494
          return;
1✔
495
        }
496
        stream.setDecompressor(decompressor);
1✔
497
      }
498

499
      final StatsTraceContext statsTraceCtx = Preconditions.checkNotNull(
1✔
500
          stream.statsTraceContext(), "statsTraceCtx not present from stream");
1✔
501

502
      final Context.CancellableContext context = createContext(headers, statsTraceCtx);
1✔
503

504
      final Link link = PerfMark.linkOut();
1✔
505

506
      final JumpToApplicationThreadServerStreamListener jumpListener
1✔
507
          = new JumpToApplicationThreadServerStreamListener(
508
                  wrappedExecutor, executor, stream, context, tag);
1✔
509
      stream.setListener(jumpListener);
1✔
510
      final SettableFuture<ServerCallParameters<?,?>> future = SettableFuture.create();
1✔
511
      // Run in serializing executor so jumpListener.setListener() is called before any callbacks
512
      // are delivered, including any errors. MethodLookup() and HandleServerCall() are proactively
513
      // queued before any callbacks are queued at serializing executor.
514
      // MethodLookup() runs on the default executor.
515
      // When executorSupplier is enabled, MethodLookup() may set/change the executor in the
516
      // SerializingExecutor before it finishes running.
517
      // Then HandleServerCall() and callbacks would switch to the executorSupplier executor.
518
      // Otherwise, they all run on the default executor.
519

520
      final class MethodLookup extends ContextRunnable {
521
        MethodLookup() {
1✔
522
          super(context);
1✔
523
        }
1✔
524

525
        @Override
526
        public void runInContext() {
527
          try (TaskCloseable ignore =
1✔
528
                   PerfMark.traceTask("ServerTransportListener$MethodLookup.startCall")) {
1✔
529
            PerfMark.attachTag(tag);
1✔
530
            PerfMark.linkIn(link);
1✔
531
            runInternal();
1✔
532
          }
533
        }
1✔
534

535
        private void runInternal() {
536
          ServerMethodDefinition<?, ?> wrapMethod;
537
          ServerCallParameters<?, ?> callParams;
538
          try {
539
            ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
1✔
540
            if (method == null) {
1✔
541
              method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority());
1✔
542
            }
543
            if (method == null) {
1✔
544
              Status status = Status.UNIMPLEMENTED.withDescription(
1✔
545
                      "Method not found: " + methodName);
546
              // TODO(zhangkun83): this error may be recorded by the tracer, and if it's kept in
547
              // memory as a map whose key is the method name, this would allow a misbehaving
548
              // client to blow up the server in-memory stats storage by sending large number of
549
              // distinct unimplemented method
550
              // names. (https://github.com/grpc/grpc-java/issues/2285)
551
              jumpListener.setListener(NOOP_LISTENER);
1✔
552
              stream.close(status, new Metadata());
1✔
553
              context.cancel(null);
1✔
554
              future.cancel(false);
1✔
555
              return;
1✔
556
            }
557
            wrapMethod = wrapMethod(stream, method, statsTraceCtx);
1✔
558
            callParams = maySwitchExecutor(wrapMethod, stream, headers, context, tag);
1✔
559
            future.set(callParams);
1✔
560
          } catch (Throwable t) {
1✔
561
            jumpListener.setListener(NOOP_LISTENER);
1✔
562
            stream.close(Status.fromThrowable(t), new Metadata());
×
563
            context.cancel(null);
×
564
            future.cancel(false);
×
565
            throw t;
×
566
          }
1✔
567
        }
1✔
568

569
        private <ReqT, RespT> ServerCallParameters<ReqT, RespT> maySwitchExecutor(
570
            final ServerMethodDefinition<ReqT, RespT> methodDef,
571
            final ServerStream stream,
572
            final Metadata headers,
573
            final Context.CancellableContext context,
574
            final Tag tag) {
575
          final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<>(
1✔
576
                  stream,
577
                  methodDef.getMethodDescriptor(),
1✔
578
                  headers,
579
                  context,
580
                  decompressorRegistry,
1✔
581
                  compressorRegistry,
1✔
582
                  serverCallTracer,
1✔
583
                  tag);
584
          if (executorSupplier != null) {
1✔
585
            Executor switchingExecutor = executorSupplier.getExecutor(call, headers);
1✔
586
            if (switchingExecutor != null) {
1✔
587
              ((SerializingExecutor)wrappedExecutor).setExecutor(switchingExecutor);
1✔
588
            }
589
          }
590
          return new ServerCallParameters<>(call, methodDef.getServerCallHandler());
1✔
591
        }
592
      }
593

594
      final class HandleServerCall extends ContextRunnable {
595
        HandleServerCall() {
1✔
596
          super(context);
1✔
597
        }
1✔
598

599
        @Override
600
        public void runInContext() {
601
          try (TaskCloseable ignore =
1✔
602
                   PerfMark.traceTask("ServerTransportListener$HandleServerCall.startCall")) {
1✔
603
            PerfMark.linkIn(link);
1✔
604
            PerfMark.attachTag(tag);
1✔
605
            runInternal();
1✔
606
          }
607
        }
1✔
608

609
        private void runInternal() {
610
          ServerStreamListener listener = NOOP_LISTENER;
1✔
611
          if (future.isCancelled()) {
1✔
612
            return;
1✔
613
          }
614
          try {
615
            listener = startWrappedCall(methodName, Futures.getDone(future), headers);
1✔
616
          } catch (Throwable ex) {
1✔
617
            stream.close(Status.fromThrowable(ex), new Metadata());
1✔
618
            context.cancel(null);
1✔
619
            throw new IllegalStateException(ex);
1✔
620
          } finally {
621
            jumpListener.setListener(listener);
1✔
622
          }
623

624
          // An extremely short deadline may expire before stream.setListener(jumpListener).
625
          // This causes NPE as in issue: https://github.com/grpc/grpc-java/issues/6300
626
          // Delay of setting cancellationListener to context will fix the issue.
627
          context.addListener(new ServerStreamCancellationListener(stream), directExecutor());
1✔
628
        }
1✔
629
      }
630

631
      wrappedExecutor.execute(new MethodLookup());
1✔
632
      wrappedExecutor.execute(new HandleServerCall());
1✔
633
    }
1✔
634

635
    private Context.CancellableContext createContext(
636
        Metadata headers, StatsTraceContext statsTraceCtx) {
637
      Long timeoutNanos = headers.get(TIMEOUT_KEY);
1✔
638

639
      Context baseContext =
1✔
640
          statsTraceCtx
641
              .serverFilterContext(rootContext);
1✔
642

643
      if (timeoutNanos == null) {
1✔
644
        return baseContext.withCancellation();
1✔
645
      }
646

647
      Context.CancellableContext context =
1✔
648
          baseContext.withDeadline(
1✔
649
              Deadline.after(timeoutNanos, NANOSECONDS, ticker),
1✔
650
              transport.getScheduledExecutorService());
1✔
651

652
      return context;
1✔
653
    }
654

655
    /** Never returns {@code null}. */
656
    private <ReqT, RespT> ServerMethodDefinition<?,?> wrapMethod(ServerStream stream,
657
        ServerMethodDefinition<ReqT, RespT> methodDef, StatsTraceContext statsTraceCtx) {
658
      // TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
659
      statsTraceCtx.serverCallStarted(
1✔
660
          new ServerCallInfoImpl<>(
661
              methodDef.getMethodDescriptor(), // notify with original method descriptor
1✔
662
              stream.getAttributes(),
1✔
663
              stream.getAuthority()));
1✔
664
      ServerCallHandler<ReqT, RespT> handler = methodDef.getServerCallHandler();
1✔
665
      for (ServerInterceptor interceptor : interceptors) {
1✔
666
        handler = InternalServerInterceptors.interceptCallHandlerCreate(interceptor, handler);
1✔
667
      }
668
      ServerMethodDefinition<ReqT, RespT> interceptedDef = methodDef.withServerCallHandler(handler);
1✔
669
      ServerMethodDefinition<?, ?> wMethodDef = binlog == null
1✔
670
          ? interceptedDef : binlog.wrapMethodDefinition(interceptedDef);
1✔
671
      return wMethodDef;
1✔
672
    }
673

674
    private final class ServerCallParameters<ReqT, RespT> {
675
      ServerCallImpl<ReqT, RespT> call;
676
      ServerCallHandler<ReqT, RespT> callHandler;
677

678
      public ServerCallParameters(ServerCallImpl<ReqT, RespT> call,
679
                                  ServerCallHandler<ReqT, RespT> callHandler) {
1✔
680
        this.call = call;
1✔
681
        this.callHandler = callHandler;
1✔
682
      }
1✔
683
    }
684

685
    private <WReqT, WRespT> ServerStreamListener startWrappedCall(
686
        String fullMethodName,
687
        ServerCallParameters<WReqT, WRespT> params,
688
        Metadata headers) {
689
      ServerCall.Listener<WReqT> callListener =
1✔
690
              params.callHandler.startCall(params.call, headers);
1✔
691
      if (callListener == null) {
1✔
692
        throw new NullPointerException(
1✔
693
                "startCall() returned a null listener for method " + fullMethodName);
694
      }
695
      return params.call.newServerStreamListener(callListener);
1✔
696
    }
697
  }
698

699
  /**
700
   * Propagates context cancellation to the ServerStream.
701
   *
702
   * <p>This is outside of HandleServerCall because that class holds Metadata and other state needed
703
   * only when starting the RPC. The cancellation listener will live for the life of the call, so we
704
   * avoid that useless state being retained.
705
   */
706
  static final class ServerStreamCancellationListener implements Context.CancellationListener {
707
    private final ServerStream stream;
708

709
    ServerStreamCancellationListener(ServerStream stream) {
1✔
710
      this.stream = checkNotNull(stream, "stream");
1✔
711
    }
1✔
712

713
    @Override
714
    public void cancelled(Context context) {
715
      Status status = statusFromCancelled(context);
1✔
716
      if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) {
1✔
717
        // This should rarely get run, since the client will likely cancel the stream
718
        // before the timeout is reached.
719
        stream.cancel(status);
1✔
720
      }
721
    }
1✔
722
  }
723

724
  @Override
725
  public InternalLogId getLogId() {
726
    return logId;
1✔
727
  }
728

729
  @Override
730
  public ListenableFuture<ServerStats> getStats() {
731
    ServerStats.Builder builder = new ServerStats.Builder();
1✔
732
    List<InternalInstrumented<SocketStats>> stats = transportServer.getListenSocketStatsList();
1✔
733
    if (stats != null ) {
1✔
734
      builder.addListenSockets(stats);
×
735
    }
736
    serverCallTracer.updateBuilder(builder);
1✔
737
    SettableFuture<ServerStats> ret = SettableFuture.create();
1✔
738
    ret.set(builder.build());
1✔
739
    return ret;
1✔
740
  }
741

742
  @Override
743
  public String toString() {
744
    return MoreObjects.toStringHelper(this)
×
745
        .add("logId", logId.getId())
×
746
        .add("transportServer", transportServer)
×
747
        .toString();
×
748
  }
749

750
  private static final class NoopListener implements ServerStreamListener {
751
    @Override
752
    public void messagesAvailable(MessageProducer producer) {
753
      InputStream message;
754
      while ((message = producer.next()) != null) {
×
755
        try {
756
          message.close();
×
757
        } catch (IOException e) {
×
758
          // Close any remaining messages
759
          while ((message = producer.next()) != null) {
×
760
            try {
761
              message.close();
×
762
            } catch (IOException ioException) {
×
763
              // just log additional exceptions as we are already going to throw
764
              log.log(Level.WARNING, "Exception closing stream", ioException);
×
765
            }
×
766
          }
767
          throw new RuntimeException(e);
×
768
        }
×
769
      }
770
    }
×
771

772
    @Override
773
    public void halfClosed() {}
×
774

775
    @Override
776
    public void closed(Status status) {}
1✔
777

778
    @Override
779
    public void onReady() {}
1✔
780
  }
781

782
  /**
783
   * Dispatches callbacks onto an application-provided executor and correctly propagates
784
   * exceptions.
785
   */
786
  @VisibleForTesting
787
  static final class JumpToApplicationThreadServerStreamListener implements ServerStreamListener {
788
    private final Executor callExecutor;
789
    private final Executor cancelExecutor;
790
    private final Context.CancellableContext context;
791
    private final ServerStream stream;
792
    private final Tag tag;
793
    // Only accessed from callExecutor.
794
    private ServerStreamListener listener;
795

796
    public JumpToApplicationThreadServerStreamListener(Executor executor,
797
        Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) {
1✔
798
      this.callExecutor = executor;
1✔
799
      this.cancelExecutor = cancelExecutor;
1✔
800
      this.stream = stream;
1✔
801
      this.context = context;
1✔
802
      this.tag = tag;
1✔
803
    }
1✔
804

805
    /**
806
     * This call MUST be serialized on callExecutor to avoid races.
807
     */
808
    private ServerStreamListener getListener() {
809
      if (listener == null) {
1✔
810
        throw new IllegalStateException("listener unset");
×
811
      }
812
      return listener;
1✔
813
    }
814

815
    @VisibleForTesting
816
    void setListener(ServerStreamListener listener) {
817
      Preconditions.checkNotNull(listener, "listener must not be null");
1✔
818
      Preconditions.checkState(this.listener == null, "Listener already set");
1✔
819
      this.listener = listener;
1✔
820
    }
1✔
821

822
    /**
823
     * Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use.
824
     */
825
    private void internalClose(Throwable t) {
826
      // TODO(ejona86): this is not thread-safe :)
827
      String description = "Application error processing RPC";
1✔
828
      stream.close(Status.UNKNOWN.withDescription(description).withCause(t), new Metadata());
1✔
829
    }
1✔
830

831
    @Override
832
    public void messagesAvailable(final MessageProducer producer) {
833
      try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.messagesAvailable")) {
1✔
834
        PerfMark.attachTag(tag);
1✔
835
        final Link link = PerfMark.linkOut();
1✔
836
        final class MessagesAvailable extends ContextRunnable {
837

838
          MessagesAvailable() {
1✔
839
            super(context);
1✔
840
          }
1✔
841

842
          @Override
843
          public void runInContext() {
844
            try (TaskCloseable ignore =
1✔
845
                     PerfMark.traceTask("ServerCallListener(app).messagesAvailable")) {
1✔
846
              PerfMark.attachTag(tag);
1✔
847
              PerfMark.linkIn(link);
1✔
848
              getListener().messagesAvailable(producer);
1✔
849
            } catch (Throwable t) {
1✔
850
              internalClose(t);
1✔
851
              throw t;
1✔
852
            }
1✔
853
          }
1✔
854
        }
855

856
        callExecutor.execute(new MessagesAvailable());
1✔
857
      }
858
    }
1✔
859

860
    @Override
861
    public void halfClosed() {
862
      try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.halfClosed")) {
1✔
863
        PerfMark.attachTag(tag);
1✔
864
        final Link link = PerfMark.linkOut();
1✔
865
        final class HalfClosed extends ContextRunnable {
866
          HalfClosed() {
1✔
867
            super(context);
1✔
868
          }
1✔
869

870
          @Override
871
          public void runInContext() {
872
            try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).halfClosed")) {
1✔
873
              PerfMark.attachTag(tag);
1✔
874
              PerfMark.linkIn(link);
1✔
875
              getListener().halfClosed();
1✔
876
            } catch (Throwable t) {
1✔
877
              internalClose(t);
1✔
878
              throw t;
1✔
879
            }
1✔
880
          }
1✔
881
        }
882

883
        callExecutor.execute(new HalfClosed());
1✔
884
      }
885
    }
1✔
886

887
    @Override
888
    public void closed(final Status status) {
889
      try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.closed")) {
1✔
890
        PerfMark.attachTag(tag);
1✔
891
        closedInternal(status);
1✔
892
      }
893
    }
1✔
894

895
    private void closedInternal(final Status status) {
896
      // For cancellations, promptly inform any users of the context that their work should be
897
      // aborted. Otherwise, we can wait until pending work is done.
898
      if (!status.isOk()) {
1✔
899
        // Since status was not OK we know that the call did not complete and got cancelled. To
900
        // reflect this on the context we need to close it with a cause exception. Since not every
901
        // failed status has an exception we will create one here if needed.
902
        Throwable cancelCause = status.getCause();
1✔
903
        if (cancelCause == null) {
1✔
904
          cancelCause = InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
905
              Status.CANCELLED.withDescription("RPC cancelled"), null);
1✔
906
        }
907

908
        // The callExecutor might be busy doing user work. To avoid waiting, use an executor that
909
        // is not serializing.
910
        cancelExecutor.execute(new ContextCloser(context, cancelCause));
1✔
911
      }
912
      final Link link = PerfMark.linkOut();
1✔
913

914
      final class Closed extends ContextRunnable {
915
        Closed() {
1✔
916
          super(context);
1✔
917
        }
1✔
918

919
        @Override
920
        public void runInContext() {
921
          try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).closed")) {
1✔
922
            PerfMark.attachTag(tag);
1✔
923
            PerfMark.linkIn(link);
1✔
924
            getListener().closed(status);
1✔
925
          }
926
        }
1✔
927
      }
928

929
      callExecutor.execute(new Closed());
1✔
930
    }
1✔
931

932
    @Override
933
    public void onReady() {
934
      try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.onReady")) {
1✔
935
        PerfMark.attachTag(tag);
1✔
936
        final Link link = PerfMark.linkOut();
1✔
937

938
        final class OnReady extends ContextRunnable {
939
          OnReady() {
1✔
940
            super(context);
1✔
941
          }
1✔
942

943
          @Override
944
          public void runInContext() {
945
            try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).onReady")) {
1✔
946
              PerfMark.attachTag(tag);
1✔
947
              PerfMark.linkIn(link);
1✔
948
              getListener().onReady();
1✔
949
            } catch (Throwable t) {
1✔
950
              internalClose(t);
1✔
951
              throw t;
1✔
952
            }
1✔
953
          }
1✔
954
        }
955

956
        callExecutor.execute(new OnReady());
1✔
957
      }
958
    }
1✔
959
  }
960

961
  @VisibleForTesting
962
  static final class ContextCloser implements Runnable {
963
    private final Context.CancellableContext context;
964
    private final Throwable cause;
965

966
    ContextCloser(Context.CancellableContext context, Throwable cause) {
1✔
967
      this.context = context;
1✔
968
      this.cause = cause;
1✔
969
    }
1✔
970

971
    @Override
972
    public void run() {
973
      context.cancel(cause);
1✔
974
    }
1✔
975
  }
976
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc