• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20272

07 May 2026 12:34PM UTC coverage: 88.836% (+0.008%) from 88.828%
#20272

push

github

web-flow
core,opentelemetry: Fix server metric labels on early close (#12774)

This addresses the server-side OpenTelemetry metric labeling bug from
#12117 where a generated method can be recorded as `grpc.method="other"`
if `streamClosed()` happens before `serverCallStarted()`.

### What changed

- add an internal `StatsTraceContext.ServerCallMethodListener` hook so
tracers can consume an already-resolved primary-registry
`MethodDescriptor`
- resolve the immutable internal primary registry on the transport path
and seed method classification before the async `MethodLookup` path runs
- keep fallback registry lookup on the existing async path
- update the OpenTelemetry server tracer to use the early-resolved
method classification for close metrics

### Why this shape

- avoids tracer-side `HandlerRegistry` lookup
- uses only the immutable internal primary registry for early
transport-path lookup
- keeps fallback registry lookup on the existing async path

### Tests

- primary generated method: early close preserves the generated method
name
- primary non-generated method: early close still records `other`
- fallback generated method: fallback lookup remains on the existing
async path and does not introduce early transport-path classification
- tracer-level regression: `serverCallMethodResolved()` +
`streamClosed()` records the generated method name without waiting for
`serverCallStarted()`

### Notes

- `ServerCallMethodListener` is an internal hook that carries the
resolved `MethodDescriptor`; tracers consume the resolved result instead
of performing registry lookup themselves
- `ServerImpl` uses `InternalHandlerRegistry` explicitly for the primary
registry to make it clear that the early transport- path lookup is
limited to the immutable internal primary registry
- this PR intentionally does not widen transport-path lookup to the
fallback registry

Ref #12117

36260 of 40817 relevant lines covered (88.84%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.29
/../core/src/main/java/io/grpc/internal/ServerImpl.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
22
import static io.grpc.Contexts.statusFromCancelled;
23
import static io.grpc.Status.DEADLINE_EXCEEDED;
24
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
25
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
26
import static java.util.concurrent.TimeUnit.NANOSECONDS;
27

28
import com.google.common.annotations.VisibleForTesting;
29
import com.google.common.base.MoreObjects;
30
import com.google.common.base.Preconditions;
31
import com.google.common.util.concurrent.Futures;
32
import com.google.common.util.concurrent.ListenableFuture;
33
import com.google.common.util.concurrent.SettableFuture;
34
import com.google.errorprone.annotations.concurrent.GuardedBy;
35
import io.grpc.Attributes;
36
import io.grpc.BinaryLog;
37
import io.grpc.CompressorRegistry;
38
import io.grpc.Context;
39
import io.grpc.Deadline;
40
import io.grpc.Decompressor;
41
import io.grpc.DecompressorRegistry;
42
import io.grpc.HandlerRegistry;
43
import io.grpc.InternalChannelz;
44
import io.grpc.InternalChannelz.ServerStats;
45
import io.grpc.InternalChannelz.SocketStats;
46
import io.grpc.InternalInstrumented;
47
import io.grpc.InternalLogId;
48
import io.grpc.InternalServerInterceptors;
49
import io.grpc.InternalStatus;
50
import io.grpc.Metadata;
51
import io.grpc.ServerCall;
52
import io.grpc.ServerCallExecutorSupplier;
53
import io.grpc.ServerCallHandler;
54
import io.grpc.ServerInterceptor;
55
import io.grpc.ServerMethodDefinition;
56
import io.grpc.ServerServiceDefinition;
57
import io.grpc.ServerTransportFilter;
58
import io.grpc.Status;
59
import io.perfmark.Link;
60
import io.perfmark.PerfMark;
61
import io.perfmark.Tag;
62
import io.perfmark.TaskCloseable;
63
import java.io.IOException;
64
import java.io.InputStream;
65
import java.net.InetSocketAddress;
66
import java.net.SocketAddress;
67
import java.util.ArrayList;
68
import java.util.Collection;
69
import java.util.Collections;
70
import java.util.HashSet;
71
import java.util.List;
72
import java.util.Set;
73
import java.util.concurrent.Executor;
74
import java.util.concurrent.Future;
75
import java.util.concurrent.FutureTask;
76
import java.util.concurrent.TimeUnit;
77
import java.util.logging.Level;
78
import java.util.logging.Logger;
79

80
/**
81
 * Default implementation of {@link io.grpc.Server}, for creation by transports.
82
 *
83
 * <p>Expected usage (by a theoretical TCP transport):
84
 * <pre><code>public class TcpTransportServerFactory {
85
 *   public static Server newServer(Executor executor, HandlerRegistry registry,
86
 *       String configuration) {
87
 *     return new ServerImpl(executor, registry, new TcpTransportServer(configuration));
88
 *   }
89
 * }</code></pre>
90
 *
91
 * <p>Starting the server starts the underlying transport for servicing requests. Stopping the
92
 * server stops servicing new requests and waits for all connections to terminate.
93
 */
94
public final class ServerImpl extends io.grpc.Server implements InternalInstrumented<ServerStats> {
95
  private static final Logger log = Logger.getLogger(ServerImpl.class.getName());
1✔
96
  private static final ServerStreamListener NOOP_LISTENER = new NoopListener();
1✔
97

98
  private final InternalLogId logId;
99
  private final ObjectPool<? extends Executor> executorPool;
100
  /** Executor for application processing. Safe to read after {@link #start()}. */
101
  private Executor executor;
102
  private final InternalHandlerRegistry registry;
103
  private final HandlerRegistry fallbackRegistry;
104
  private final List<ServerTransportFilter> transportFilters;
105
  // This is iterated on a per-call basis.  Use an array instead of a Collection to avoid iterator
106
  // creations.
107
  private final ServerInterceptor[] interceptors;
108
  private final long handshakeTimeoutMillis;
109
  @GuardedBy("lock") private boolean started;
110
  @GuardedBy("lock") private boolean shutdown;
111
  /** non-{@code null} if immediate shutdown has been requested. */
112
  @GuardedBy("lock") private Status shutdownNowStatus;
113
  /** {@code true} if ServerListenerImpl.serverShutdown() was called. */
114
  @GuardedBy("lock") private boolean serverShutdownCallbackInvoked;
115
  @GuardedBy("lock") private boolean terminated;
116
  /** Service encapsulating something similar to an accept() socket. */
117
  private final InternalServer transportServer;
118
  private final Object lock = new Object();
1✔
119
  @GuardedBy("lock") private boolean transportServersTerminated;
120
  /** {@code transportServer} and services encapsulating something similar to a TCP connection. */
121
  @GuardedBy("lock") private final Set<ServerTransport> transports = new HashSet<>();
1✔
122

123
  private final Context rootContext;
124

125
  private final DecompressorRegistry decompressorRegistry;
126
  private final CompressorRegistry compressorRegistry;
127
  private final BinaryLog binlog;
128

129
  private final InternalChannelz channelz;
130
  private final CallTracer serverCallTracer;
131
  private final Deadline.Ticker ticker;
132
  private final ServerCallExecutorSupplier executorSupplier;
133

134
  /**
135
   * Construct a server.
136
   *
137
   * @param builder builder with configuration for server
138
   * @param transportServer transport servers that will create new incoming transports
139
   * @param rootContext context that callbacks for new RPCs should be derived from
140
   */
141
  ServerImpl(
142
      ServerImplBuilder builder,
143
      InternalServer transportServer,
144
      Context rootContext) {
1✔
145
    this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool");
1✔
146
    this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder");
1✔
147
    this.fallbackRegistry =
1✔
148
        Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry");
1✔
149
    this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer");
1✔
150
    this.logId =
1✔
151
        InternalLogId.allocate("Server", String.valueOf(getListenSocketsIgnoringLifecycle()));
1✔
152
    // Fork from the passed in context so that it does not propagate cancellation, it only
153
    // inherits values.
154
    this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext")
1✔
155
        .fork()
1✔
156
        .withValue(io.grpc.InternalServer.SERVER_CONTEXT_KEY, ServerImpl.this);
1✔
157
    this.decompressorRegistry = builder.decompressorRegistry;
1✔
158
    this.compressorRegistry = builder.compressorRegistry;
1✔
159
    this.transportFilters = Collections.unmodifiableList(
1✔
160
        new ArrayList<>(builder.transportFilters));
161
    this.interceptors =
1✔
162
        builder.interceptors.toArray(new ServerInterceptor[builder.interceptors.size()]);
1✔
163
    this.handshakeTimeoutMillis = builder.handshakeTimeoutMillis;
1✔
164
    this.binlog = builder.binlog;
1✔
165
    this.channelz = builder.channelz;
1✔
166
    this.serverCallTracer = builder.callTracerFactory.create();
1✔
167
    this.ticker = checkNotNull(builder.ticker, "ticker");
1✔
168
    channelz.addServer(this);
1✔
169
    this.executorSupplier = builder.executorSupplier;
1✔
170
  }
1✔
171

172
  /**
173
   * Bind and start the server.
174
   *
175
   * @return {@code this} object
176
   * @throws IllegalStateException if already started
177
   * @throws IOException if unable to bind
178
   */
179
  @Override
180
  public ServerImpl start() throws IOException {
181
    synchronized (lock) {
1✔
182
      checkState(!started, "Already started");
1✔
183
      checkState(!shutdown, "Shutting down");
1✔
184
      // Start and wait for any ports to actually be bound.
185

186
      ServerListenerImpl listener = new ServerListenerImpl();
1✔
187
      transportServer.start(listener);
1✔
188
      executor = Preconditions.checkNotNull(executorPool.getObject(), "executor");
1✔
189
      started = true;
1✔
190
      return this;
1✔
191
    }
192
  }
193

194

195
  @Override
196
  public int getPort() {
197
    synchronized (lock) {
1✔
198
      checkState(started, "Not started");
1✔
199
      checkState(!terminated, "Already terminated");
1✔
200
      for (SocketAddress addr: transportServer.getListenSocketAddresses()) {
1✔
201
        if (addr instanceof InetSocketAddress) {
1✔
202
          return ((InetSocketAddress) addr).getPort();
1✔
203
        }
204
      }
×
205
      return -1;
×
206
    }
207
  }
208

209
  @Override
210
  public List<SocketAddress> getListenSockets() {
211
    synchronized (lock) {
1✔
212
      checkState(started, "Not started");
1✔
213
      checkState(!terminated, "Already terminated");
1✔
214
      return getListenSocketsIgnoringLifecycle();
1✔
215
    }
216
  }
217

218
  private List<SocketAddress> getListenSocketsIgnoringLifecycle() {
219
    synchronized (lock) {
1✔
220
      return Collections.unmodifiableList(transportServer.getListenSocketAddresses());
1✔
221
    }
222
  }
223

224
  @Override
225
  public List<ServerServiceDefinition> getServices() {
226
    List<ServerServiceDefinition> fallbackServices = fallbackRegistry.getServices();
×
227
    if (fallbackServices.isEmpty()) {
×
228
      return registry.getServices();
×
229
    } else {
230
      List<ServerServiceDefinition> registryServices = registry.getServices();
×
231
      int servicesCount = registryServices.size() + fallbackServices.size();
×
232
      List<ServerServiceDefinition> services =
×
233
          new ArrayList<>(servicesCount);
234
      services.addAll(registryServices);
×
235
      services.addAll(fallbackServices);
×
236
      return Collections.unmodifiableList(services);
×
237
    }
238
  }
239

240
  @Override
241
  public List<ServerServiceDefinition> getImmutableServices() {
242
    return registry.getServices();
1✔
243
  }
244

245
  @Override
246
  public List<ServerServiceDefinition> getMutableServices() {
247
    return Collections.unmodifiableList(fallbackRegistry.getServices());
1✔
248
  }
249

250
  /**
251
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are rejected.
252
   */
253
  @Override
254
  public ServerImpl shutdown() {
255
    boolean shutdownTransportServers;
256
    synchronized (lock) {
1✔
257
      if (shutdown) {
1✔
258
        return this;
1✔
259
      }
260
      shutdown = true;
1✔
261
      shutdownTransportServers = started;
1✔
262
      if (!shutdownTransportServers) {
1✔
263
        transportServersTerminated = true;
1✔
264
        checkForTermination();
1✔
265
      }
266
    }
1✔
267
    if (shutdownTransportServers) {
1✔
268
      transportServer.shutdown();
1✔
269
    }
270
    return this;
1✔
271
  }
272

273
  @Override
274
  public ServerImpl shutdownNow() {
275
    shutdown();
1✔
276
    Collection<ServerTransport> transportsCopy;
277
    Status nowStatus = Status.UNAVAILABLE.withDescription("Server shutdownNow invoked");
1✔
278
    boolean savedServerShutdownCallbackInvoked;
279
    synchronized (lock) {
1✔
280
      // Short-circuiting not strictly necessary, but prevents transports from needing to handle
281
      // multiple shutdownNow invocations if shutdownNow is called multiple times.
282
      if (shutdownNowStatus != null) {
1✔
283
        return this;
1✔
284
      }
285
      shutdownNowStatus = nowStatus;
1✔
286
      transportsCopy = new ArrayList<>(transports);
1✔
287
      savedServerShutdownCallbackInvoked = serverShutdownCallbackInvoked;
1✔
288
    }
1✔
289
    // Short-circuiting not strictly necessary, but prevents transports from needing to handle
290
    // multiple shutdownNow invocations, between here and the serverShutdown callback.
291
    if (savedServerShutdownCallbackInvoked) {
1✔
292
      // Have to call shutdownNow, because serverShutdown callback only called shutdown, not
293
      // shutdownNow
294
      for (ServerTransport transport : transportsCopy) {
1✔
295
        transport.shutdownNow(nowStatus);
1✔
296
      }
1✔
297
    }
298
    return this;
1✔
299
  }
300

301
  @Override
302
  public boolean isShutdown() {
303
    synchronized (lock) {
1✔
304
      return shutdown;
1✔
305
    }
306
  }
307

308
  @Override
309
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
310
    synchronized (lock) {
1✔
311
      long timeoutNanos = unit.toNanos(timeout);
1✔
312
      long endTimeNanos = System.nanoTime() + timeoutNanos;
1✔
313
      while (!terminated && (timeoutNanos = endTimeNanos - System.nanoTime()) > 0) {
1✔
314
        NANOSECONDS.timedWait(lock, timeoutNanos);
1✔
315
      }
316
      return terminated;
1✔
317
    }
318
  }
319

320
  @Override
321
  public void awaitTermination() throws InterruptedException {
322
    synchronized (lock) {
1✔
323
      while (!terminated) {
1✔
324
        lock.wait();
1✔
325
      }
326
    }
1✔
327
  }
1✔
328

329
  @Override
330
  public boolean isTerminated() {
331
    synchronized (lock) {
1✔
332
      return terminated;
1✔
333
    }
334
  }
335

336
  /**
337
   * Remove transport service from accounting collection and notify of complete shutdown if
338
   * necessary.
339
   *
340
   * @param transport service to remove
341
   */
342
  private void transportClosed(ServerTransport transport) {
343
    synchronized (lock) {
1✔
344
      if (!transports.remove(transport)) {
1✔
345
        throw new AssertionError("Transport already removed");
×
346
      }
347
      channelz.removeServerSocket(ServerImpl.this, transport);
1✔
348
      checkForTermination();
1✔
349
    }
1✔
350
  }
1✔
351

352
  /** Notify of complete shutdown if necessary. */
353
  private void checkForTermination() {
354
    synchronized (lock) {
1✔
355
      if (shutdown && transports.isEmpty() && transportServersTerminated) {
1✔
356
        if (terminated) {
1✔
357
          throw new AssertionError("Server already terminated");
×
358
        }
359
        terminated = true;
1✔
360
        channelz.removeServer(this);
1✔
361
        if (executor != null) {
1✔
362
          executor = executorPool.returnObject(executor);
1✔
363
        }
364
        lock.notifyAll();
1✔
365
      }
366
    }
1✔
367
  }
1✔
368

369
  private final class ServerListenerImpl implements ServerListener {
1✔
370

371
    @Override
372
    public ServerTransportListener transportCreated(ServerTransport transport) {
373
      synchronized (lock) {
1✔
374
        transports.add(transport);
1✔
375
      }
1✔
376
      ServerTransportListenerImpl stli = new ServerTransportListenerImpl(transport);
1✔
377
      stli.init();
1✔
378
      return stli;
1✔
379
    }
380

381
    @Override
382
    public void serverShutdown() {
383
      ArrayList<ServerTransport> copiedTransports;
384
      Status shutdownNowStatusCopy;
385
      synchronized (lock) {
1✔
386
        if (serverShutdownCallbackInvoked) {
1✔
387
          return;
×
388
        }
389

390
        // transports collection can be modified during shutdown(), even if we hold the lock, due
391
        // to reentrancy.
392
        copiedTransports = new ArrayList<>(transports);
1✔
393
        shutdownNowStatusCopy = shutdownNowStatus;
1✔
394
        serverShutdownCallbackInvoked = true;
1✔
395
      }
1✔
396
      for (ServerTransport transport : copiedTransports) {
1✔
397
        if (shutdownNowStatusCopy == null) {
1✔
398
          transport.shutdown();
1✔
399
        } else {
400
          transport.shutdownNow(shutdownNowStatusCopy);
1✔
401
        }
402
      }
1✔
403
      synchronized (lock) {
1✔
404
        transportServersTerminated = true;
1✔
405
        checkForTermination();
1✔
406
      }
1✔
407
    }
1✔
408
  }
409

410
  private final class ServerTransportListenerImpl implements ServerTransportListener {
411
    private final ServerTransport transport;
412
    private Future<?> handshakeTimeoutFuture;
413
    private Attributes attributes;
414

415
    ServerTransportListenerImpl(ServerTransport transport) {
1✔
416
      this.transport = transport;
1✔
417
    }
1✔
418

419
    public void init() {
420
      class TransportShutdownNow implements Runnable {
1✔
421
        @Override public void run() {
422
          transport.shutdownNow(Status.CANCELLED.withDescription("Handshake timeout exceeded"));
1✔
423
        }
1✔
424
      }
425

426
      if (handshakeTimeoutMillis != Long.MAX_VALUE) {
1✔
427
        handshakeTimeoutFuture = transport.getScheduledExecutorService()
1✔
428
            .schedule(new TransportShutdownNow(), handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
429
      } else {
430
        // Noop, to avoid triggering Thread creation in InProcessServer
431
        handshakeTimeoutFuture = new FutureTask<Void>(new Runnable() {
1✔
432
          @Override public void run() {}
×
433
        }, null);
434
      }
435
      channelz.addServerSocket(ServerImpl.this, transport);
1✔
436
    }
1✔
437

438
    @Override
439
    public Attributes transportReady(Attributes attributes) {
440
      handshakeTimeoutFuture.cancel(false);
1✔
441
      handshakeTimeoutFuture = null;
1✔
442

443
      for (ServerTransportFilter filter : transportFilters) {
1✔
444
        attributes = Preconditions.checkNotNull(filter.transportReady(attributes),
1✔
445
            "Filter %s returned null", filter);
446
      }
1✔
447
      this.attributes = attributes;
1✔
448
      return attributes;
1✔
449
    }
450

451
    @Override
452
    public void transportTerminated() {
453
      if (handshakeTimeoutFuture != null) {
1✔
454
        handshakeTimeoutFuture.cancel(false);
1✔
455
        handshakeTimeoutFuture = null;
1✔
456
      }
457
      for (ServerTransportFilter filter : transportFilters) {
1✔
458
        filter.transportTerminated(attributes);
1✔
459
      }
1✔
460
      transportClosed(transport);
1✔
461
    }
1✔
462

463

464
    @Override
465
    public void streamCreated(ServerStream stream, String methodName, Metadata headers) {
466
      Tag tag = PerfMark.createTag(methodName, stream.streamId());
1✔
467
      try (TaskCloseable ignore = PerfMark.traceTask("ServerTransportListener.streamCreated")) {
1✔
468
        PerfMark.attachTag(tag);
1✔
469
        streamCreatedInternal(stream, methodName, headers, tag);
1✔
470
      }
471
    }
1✔
472

473
    private void streamCreatedInternal(
474
        final ServerStream stream, final String methodName, final Metadata headers, final Tag tag) {
475
      final Executor wrappedExecutor;
476
      // This is a performance optimization that avoids the synchronization and queuing overhead
477
      // that comes with SerializingExecutor.
478
      if (executorSupplier != null || executor != directExecutor()) {
1✔
479
        wrappedExecutor = new SerializingExecutor(executor);
1✔
480
      } else {
481
        wrappedExecutor = new SerializeReentrantCallsDirectExecutor();
1✔
482
        stream.optimizeForDirectExecutor();
1✔
483
      }
484

485
      if (headers.containsKey(MESSAGE_ENCODING_KEY)) {
1✔
486
        String encoding = headers.get(MESSAGE_ENCODING_KEY);
1✔
487
        Decompressor decompressor = decompressorRegistry.lookupDecompressor(encoding);
1✔
488
        if (decompressor == null) {
1✔
489
          stream.setListener(NOOP_LISTENER);
1✔
490
          stream.close(
1✔
491
              Status.UNIMPLEMENTED.withDescription(
1✔
492
                  String.format("Can't find decompressor for %s", encoding)),
1✔
493
              new Metadata());
494
          return;
1✔
495
        }
496
        stream.setDecompressor(decompressor);
1✔
497
      }
498

499
      final StatsTraceContext statsTraceCtx = Preconditions.checkNotNull(
1✔
500
          stream.statsTraceContext(), "statsTraceCtx not present from stream");
1✔
501
      final ServerMethodDefinition<?, ?> primaryMethod = registry.lookupMethod(methodName, null);
1✔
502

503
      final Context.CancellableContext context = createContext(headers, statsTraceCtx);
1✔
504
      if (primaryMethod != null) {
1✔
505
        statsTraceCtx.serverCallMethodResolved(primaryMethod.getMethodDescriptor());
1✔
506
      }
507

508
      final Link link = PerfMark.linkOut();
1✔
509

510
      final JumpToApplicationThreadServerStreamListener jumpListener
1✔
511
          = new JumpToApplicationThreadServerStreamListener(
512
                  wrappedExecutor, executor, stream, context, tag);
1✔
513
      stream.setListener(jumpListener);
1✔
514
      final SettableFuture<ServerCallParameters<?,?>> future = SettableFuture.create();
1✔
515
      // Run in serializing executor so jumpListener.setListener() is called before any callbacks
516
      // are delivered, including any errors. MethodLookup() and HandleServerCall() are proactively
517
      // queued before any callbacks are queued at serializing executor.
518
      // MethodLookup() runs on the default executor.
519
      // When executorSupplier is enabled, MethodLookup() may set/change the executor in the
520
      // SerializingExecutor before it finishes running.
521
      // Then HandleServerCall() and callbacks would switch to the executorSupplier executor.
522
      // Otherwise, they all run on the default executor.
523

524
      final class MethodLookup extends ContextRunnable {
525
        MethodLookup() {
1✔
526
          super(context);
1✔
527
        }
1✔
528

529
        @Override
530
        public void runInContext() {
531
          try (TaskCloseable ignore =
1✔
532
                   PerfMark.traceTask("ServerTransportListener$MethodLookup.startCall")) {
1✔
533
            PerfMark.attachTag(tag);
1✔
534
            PerfMark.linkIn(link);
1✔
535
            runInternal();
1✔
536
          }
537
        }
1✔
538

539
        private void runInternal() {
540
          ServerMethodDefinition<?, ?> wrapMethod;
541
          ServerCallParameters<?, ?> callParams;
542
          try {
543
            ServerMethodDefinition<?, ?> method = primaryMethod;
1✔
544
            if (method == null) {
1✔
545
              method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority());
1✔
546
            }
547
            if (method == null) {
1✔
548
              Status status = Status.UNIMPLEMENTED.withDescription(
1✔
549
                      "Method not found: " + methodName);
550
              // TODO(zhangkun83): this error may be recorded by the tracer, and if it's kept in
551
              // memory as a map whose key is the method name, this would allow a misbehaving
552
              // client to blow up the server in-memory stats storage by sending large number of
553
              // distinct unimplemented method
554
              // names. (https://github.com/grpc/grpc-java/issues/2285)
555
              jumpListener.setListener(NOOP_LISTENER);
1✔
556
              stream.close(status, new Metadata());
1✔
557
              context.cancel(null);
1✔
558
              future.cancel(false);
1✔
559
              return;
1✔
560
            }
561
            wrapMethod = wrapMethod(stream, method, statsTraceCtx);
1✔
562
            callParams = maySwitchExecutor(wrapMethod, stream, headers, context, tag);
1✔
563
            future.set(callParams);
1✔
564
          } catch (Throwable t) {
1✔
565
            jumpListener.setListener(NOOP_LISTENER);
1✔
566
            stream.close(Status.fromThrowable(t), new Metadata());
×
567
            context.cancel(null);
×
568
            future.cancel(false);
×
569
            throw t;
×
570
          }
1✔
571
        }
1✔
572

573
        private <ReqT, RespT> ServerCallParameters<ReqT, RespT> maySwitchExecutor(
574
            final ServerMethodDefinition<ReqT, RespT> methodDef,
575
            final ServerStream stream,
576
            final Metadata headers,
577
            final Context.CancellableContext context,
578
            final Tag tag) {
579
          final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<>(
1✔
580
                  stream,
581
                  methodDef.getMethodDescriptor(),
1✔
582
                  headers,
583
                  context,
584
                  decompressorRegistry,
1✔
585
                  compressorRegistry,
1✔
586
                  serverCallTracer,
1✔
587
                  tag);
588
          if (executorSupplier != null) {
1✔
589
            Executor switchingExecutor = executorSupplier.getExecutor(call, headers);
1✔
590
            if (switchingExecutor != null) {
1✔
591
              ((SerializingExecutor)wrappedExecutor).setExecutor(switchingExecutor);
1✔
592
            }
593
          }
594
          return new ServerCallParameters<>(call, methodDef.getServerCallHandler());
1✔
595
        }
596
      }
597

598
      final class HandleServerCall extends ContextRunnable {
599
        HandleServerCall() {
1✔
600
          super(context);
1✔
601
        }
1✔
602

603
        @Override
604
        public void runInContext() {
605
          try (TaskCloseable ignore =
1✔
606
                   PerfMark.traceTask("ServerTransportListener$HandleServerCall.startCall")) {
1✔
607
            PerfMark.linkIn(link);
1✔
608
            PerfMark.attachTag(tag);
1✔
609
            runInternal();
1✔
610
          }
611
        }
1✔
612

613
        private void runInternal() {
614
          ServerStreamListener listener = NOOP_LISTENER;
1✔
615
          if (future.isCancelled()) {
1✔
616
            return;
1✔
617
          }
618
          try {
619
            listener = startWrappedCall(methodName, Futures.getDone(future), headers);
1✔
620
          } catch (Throwable ex) {
1✔
621
            stream.close(Status.fromThrowable(ex), new Metadata());
1✔
622
            context.cancel(null);
1✔
623
            throw new IllegalStateException(ex);
1✔
624
          } finally {
625
            jumpListener.setListener(listener);
1✔
626
          }
627

628
          // An extremely short deadline may expire before stream.setListener(jumpListener).
629
          // This causes NPE as in issue: https://github.com/grpc/grpc-java/issues/6300
630
          // Delay of setting cancellationListener to context will fix the issue.
631
          context.addListener(new ServerStreamCancellationListener(stream), directExecutor());
1✔
632
        }
1✔
633
      }
634

635
      wrappedExecutor.execute(new MethodLookup());
1✔
636
      wrappedExecutor.execute(new HandleServerCall());
1✔
637
    }
1✔
638

639
    private Context.CancellableContext createContext(
640
        Metadata headers, StatsTraceContext statsTraceCtx) {
641
      Long timeoutNanos = headers.get(TIMEOUT_KEY);
1✔
642

643
      Context baseContext =
1✔
644
          statsTraceCtx
645
              .serverFilterContext(rootContext);
1✔
646

647
      if (timeoutNanos == null) {
1✔
648
        return baseContext.withCancellation();
1✔
649
      }
650

651
      Context.CancellableContext context =
1✔
652
          baseContext.withDeadline(
1✔
653
              Deadline.after(timeoutNanos, NANOSECONDS, ticker),
1✔
654
              transport.getScheduledExecutorService());
1✔
655

656
      return context;
1✔
657
    }
658

659
    /** Never returns {@code null}. */
660
    private <ReqT, RespT> ServerMethodDefinition<?,?> wrapMethod(ServerStream stream,
661
        ServerMethodDefinition<ReqT, RespT> methodDef, StatsTraceContext statsTraceCtx) {
662
      // TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
663
      statsTraceCtx.serverCallStarted(
1✔
664
          new ServerCallInfoImpl<>(
665
              methodDef.getMethodDescriptor(), // notify with original method descriptor
1✔
666
              stream.getAttributes(),
1✔
667
              stream.getAuthority()));
1✔
668
      ServerCallHandler<ReqT, RespT> handler = methodDef.getServerCallHandler();
1✔
669
      for (ServerInterceptor interceptor : interceptors) {
1✔
670
        handler = InternalServerInterceptors.interceptCallHandlerCreate(interceptor, handler);
1✔
671
      }
672
      ServerMethodDefinition<ReqT, RespT> interceptedDef = methodDef.withServerCallHandler(handler);
1✔
673
      ServerMethodDefinition<?, ?> wMethodDef = binlog == null
1✔
674
          ? interceptedDef : binlog.wrapMethodDefinition(interceptedDef);
1✔
675
      return wMethodDef;
1✔
676
    }
677

678
    private final class ServerCallParameters<ReqT, RespT> {
679
      ServerCallImpl<ReqT, RespT> call;
680
      ServerCallHandler<ReqT, RespT> callHandler;
681

682
      public ServerCallParameters(ServerCallImpl<ReqT, RespT> call,
683
                                  ServerCallHandler<ReqT, RespT> callHandler) {
1✔
684
        this.call = call;
1✔
685
        this.callHandler = callHandler;
1✔
686
      }
1✔
687
    }
688

689
    private <WReqT, WRespT> ServerStreamListener startWrappedCall(
690
        String fullMethodName,
691
        ServerCallParameters<WReqT, WRespT> params,
692
        Metadata headers) {
693
      ServerCall.Listener<WReqT> callListener =
1✔
694
              params.callHandler.startCall(params.call, headers);
1✔
695
      if (callListener == null) {
1✔
696
        throw new NullPointerException(
1✔
697
                "startCall() returned a null listener for method " + fullMethodName);
698
      }
699
      return params.call.newServerStreamListener(callListener);
1✔
700
    }
701
  }
702

703
  /**
704
   * Propagates context cancellation to the ServerStream.
705
   *
706
   * <p>This is outside of HandleServerCall because that class holds Metadata and other state needed
707
   * only when starting the RPC. The cancellation listener will live for the life of the call, so we
708
   * avoid that useless state being retained.
709
   */
710
  static final class ServerStreamCancellationListener implements Context.CancellationListener {
711
    private final ServerStream stream;
712

713
    ServerStreamCancellationListener(ServerStream stream) {
1✔
714
      this.stream = checkNotNull(stream, "stream");
1✔
715
    }
1✔
716

717
    @Override
718
    public void cancelled(Context context) {
719
      Status status = statusFromCancelled(context);
1✔
720
      if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) {
1✔
721
        // This should rarely get run, since the client will likely cancel the stream
722
        // before the timeout is reached.
723
        stream.cancel(status);
1✔
724
      }
725
    }
1✔
726
  }
727

728
  @Override
729
  public InternalLogId getLogId() {
730
    return logId;
1✔
731
  }
732

733
  @Override
734
  public ListenableFuture<ServerStats> getStats() {
735
    ServerStats.Builder builder = new ServerStats.Builder();
1✔
736
    List<InternalInstrumented<SocketStats>> stats = transportServer.getListenSocketStatsList();
1✔
737
    if (stats != null ) {
1✔
738
      builder.addListenSockets(stats);
×
739
    }
740
    serverCallTracer.updateBuilder(builder);
1✔
741
    SettableFuture<ServerStats> ret = SettableFuture.create();
1✔
742
    ret.set(builder.build());
1✔
743
    return ret;
1✔
744
  }
745

746
  @Override
747
  public String toString() {
748
    return MoreObjects.toStringHelper(this)
×
749
        .add("logId", logId.getId())
×
750
        .add("transportServer", transportServer)
×
751
        .toString();
×
752
  }
753

754
  private static final class NoopListener implements ServerStreamListener {
755
    @Override
756
    public void messagesAvailable(MessageProducer producer) {
757
      InputStream message;
758
      while ((message = producer.next()) != null) {
×
759
        try {
760
          message.close();
×
761
        } catch (IOException e) {
×
762
          // Close any remaining messages
763
          while ((message = producer.next()) != null) {
×
764
            try {
765
              message.close();
×
766
            } catch (IOException ioException) {
×
767
              // just log additional exceptions as we are already going to throw
768
              log.log(Level.WARNING, "Exception closing stream", ioException);
×
769
            }
×
770
          }
771
          throw new RuntimeException(e);
×
772
        }
×
773
      }
774
    }
×
775

776
    @Override
777
    public void halfClosed() {}
×
778

779
    @Override
780
    public void closed(Status status) {}
1✔
781

782
    @Override
783
    public void onReady() {}
1✔
784
  }
785

786
  /**
787
   * Dispatches callbacks onto an application-provided executor and correctly propagates
788
   * exceptions.
789
   */
790
  @VisibleForTesting
791
  static final class JumpToApplicationThreadServerStreamListener implements ServerStreamListener {
792
    private final Executor callExecutor;
793
    private final Executor cancelExecutor;
794
    private final Context.CancellableContext context;
795
    private final ServerStream stream;
796
    private final Tag tag;
797
    // Only accessed from callExecutor.
798
    private ServerStreamListener listener;
799

800
    public JumpToApplicationThreadServerStreamListener(Executor executor,
801
        Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) {
1✔
802
      this.callExecutor = executor;
1✔
803
      this.cancelExecutor = cancelExecutor;
1✔
804
      this.stream = stream;
1✔
805
      this.context = context;
1✔
806
      this.tag = tag;
1✔
807
    }
1✔
808

809
    /**
810
     * This call MUST be serialized on callExecutor to avoid races.
811
     */
812
    private ServerStreamListener getListener() {
813
      if (listener == null) {
1✔
814
        throw new IllegalStateException("listener unset");
×
815
      }
816
      return listener;
1✔
817
    }
818

819
    @VisibleForTesting
820
    void setListener(ServerStreamListener listener) {
821
      Preconditions.checkNotNull(listener, "listener must not be null");
1✔
822
      Preconditions.checkState(this.listener == null, "Listener already set");
1✔
823
      this.listener = listener;
1✔
824
    }
1✔
825

826
    /**
827
     * Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use.
828
     */
829
    private void internalClose(Throwable t) {
830
      // TODO(ejona86): this is not thread-safe :)
831
      String description = "Application error processing RPC";
1✔
832
      stream.close(Status.UNKNOWN.withDescription(description).withCause(t), new Metadata());
1✔
833
    }
1✔
834

835
    @Override
836
    public void messagesAvailable(final MessageProducer producer) {
837
      try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.messagesAvailable")) {
1✔
838
        PerfMark.attachTag(tag);
1✔
839
        final Link link = PerfMark.linkOut();
1✔
840
        final class MessagesAvailable extends ContextRunnable {
841

842
          MessagesAvailable() {
1✔
843
            super(context);
1✔
844
          }
1✔
845

846
          @Override
847
          public void runInContext() {
848
            try (TaskCloseable ignore =
1✔
849
                     PerfMark.traceTask("ServerCallListener(app).messagesAvailable")) {
1✔
850
              PerfMark.attachTag(tag);
1✔
851
              PerfMark.linkIn(link);
1✔
852
              getListener().messagesAvailable(producer);
1✔
853
            } catch (Throwable t) {
1✔
854
              internalClose(t);
1✔
855
              throw t;
1✔
856
            }
1✔
857
          }
1✔
858
        }
859

860
        callExecutor.execute(new MessagesAvailable());
1✔
861
      }
862
    }
1✔
863

864
    @Override
865
    public void halfClosed() {
866
      try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.halfClosed")) {
1✔
867
        PerfMark.attachTag(tag);
1✔
868
        final Link link = PerfMark.linkOut();
1✔
869
        final class HalfClosed extends ContextRunnable {
870
          HalfClosed() {
1✔
871
            super(context);
1✔
872
          }
1✔
873

874
          @Override
875
          public void runInContext() {
876
            try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).halfClosed")) {
1✔
877
              PerfMark.attachTag(tag);
1✔
878
              PerfMark.linkIn(link);
1✔
879
              getListener().halfClosed();
1✔
880
            } catch (Throwable t) {
1✔
881
              internalClose(t);
1✔
882
              throw t;
1✔
883
            }
1✔
884
          }
1✔
885
        }
886

887
        callExecutor.execute(new HalfClosed());
1✔
888
      }
889
    }
1✔
890

891
    @Override
892
    public void closed(final Status status) {
893
      try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.closed")) {
1✔
894
        PerfMark.attachTag(tag);
1✔
895
        closedInternal(status);
1✔
896
      }
897
    }
1✔
898

899
    private void closedInternal(final Status status) {
900
      // For cancellations, promptly inform any users of the context that their work should be
901
      // aborted. Otherwise, we can wait until pending work is done.
902
      if (!status.isOk()) {
1✔
903
        // Since status was not OK we know that the call did not complete and got cancelled. To
904
        // reflect this on the context we need to close it with a cause exception. Since not every
905
        // failed status has an exception we will create one here if needed.
906
        Throwable cancelCause = status.getCause();
1✔
907
        if (cancelCause == null) {
1✔
908
          cancelCause = InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
909
              Status.CANCELLED.withDescription("RPC cancelled"), null);
1✔
910
        }
911

912
        // The callExecutor might be busy doing user work. To avoid waiting, use an executor that
913
        // is not serializing.
914
        cancelExecutor.execute(new ContextCloser(context, cancelCause));
1✔
915
      }
916
      final Link link = PerfMark.linkOut();
1✔
917

918
      final class Closed extends ContextRunnable {
919
        Closed() {
1✔
920
          super(context);
1✔
921
        }
1✔
922

923
        @Override
924
        public void runInContext() {
925
          try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).closed")) {
1✔
926
            PerfMark.attachTag(tag);
1✔
927
            PerfMark.linkIn(link);
1✔
928
            getListener().closed(status);
1✔
929
          }
930
        }
1✔
931
      }
932

933
      callExecutor.execute(new Closed());
1✔
934
    }
1✔
935

936
    @Override
937
    public void onReady() {
938
      try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.onReady")) {
1✔
939
        PerfMark.attachTag(tag);
1✔
940
        final Link link = PerfMark.linkOut();
1✔
941

942
        final class OnReady extends ContextRunnable {
943
          OnReady() {
1✔
944
            super(context);
1✔
945
          }
1✔
946

947
          @Override
948
          public void runInContext() {
949
            try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).onReady")) {
1✔
950
              PerfMark.attachTag(tag);
1✔
951
              PerfMark.linkIn(link);
1✔
952
              getListener().onReady();
1✔
953
            } catch (Throwable t) {
1✔
954
              internalClose(t);
1✔
955
              throw t;
1✔
956
            }
1✔
957
          }
1✔
958
        }
959

960
        callExecutor.execute(new OnReady());
1✔
961
      }
962
    }
1✔
963
  }
964

965
  @VisibleForTesting
966
  static final class ContextCloser implements Runnable {
967
    private final Context.CancellableContext context;
968
    private final Throwable cause;
969

970
    ContextCloser(Context.CancellableContext context, Throwable cause) {
1✔
971
      this.context = context;
1✔
972
      this.cause = cause;
1✔
973
    }
1✔
974

975
    @Override
976
    public void run() {
977
      context.cancel(cause);
1✔
978
    }
1✔
979
  }
980
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc