• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19876

23 Jun 2025 07:51PM UTC coverage: 88.52% (-0.01%) from 88.532%
#19876

push

github

web-flow
binder: Cancel checkAuthorization() request if still pending upon termination (#12167)

34628 of 39119 relevant lines covered (88.52%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.93
/../core/src/main/java/io/grpc/internal/ServerImpl.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
22
import static io.grpc.Contexts.statusFromCancelled;
23
import static io.grpc.Status.DEADLINE_EXCEEDED;
24
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
25
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
26
import static java.util.concurrent.TimeUnit.NANOSECONDS;
27

28
import com.google.common.annotations.VisibleForTesting;
29
import com.google.common.base.MoreObjects;
30
import com.google.common.base.Preconditions;
31
import com.google.common.util.concurrent.Futures;
32
import com.google.common.util.concurrent.ListenableFuture;
33
import com.google.common.util.concurrent.SettableFuture;
34
import com.google.errorprone.annotations.concurrent.GuardedBy;
35
import io.grpc.Attributes;
36
import io.grpc.BinaryLog;
37
import io.grpc.CompressorRegistry;
38
import io.grpc.Context;
39
import io.grpc.Deadline;
40
import io.grpc.Decompressor;
41
import io.grpc.DecompressorRegistry;
42
import io.grpc.HandlerRegistry;
43
import io.grpc.InternalChannelz;
44
import io.grpc.InternalChannelz.ServerStats;
45
import io.grpc.InternalChannelz.SocketStats;
46
import io.grpc.InternalInstrumented;
47
import io.grpc.InternalLogId;
48
import io.grpc.InternalServerInterceptors;
49
import io.grpc.InternalStatus;
50
import io.grpc.Metadata;
51
import io.grpc.ServerCall;
52
import io.grpc.ServerCallExecutorSupplier;
53
import io.grpc.ServerCallHandler;
54
import io.grpc.ServerInterceptor;
55
import io.grpc.ServerMethodDefinition;
56
import io.grpc.ServerServiceDefinition;
57
import io.grpc.ServerTransportFilter;
58
import io.grpc.Status;
59
import io.perfmark.Link;
60
import io.perfmark.PerfMark;
61
import io.perfmark.Tag;
62
import io.perfmark.TaskCloseable;
63
import java.io.IOException;
64
import java.io.InputStream;
65
import java.net.InetSocketAddress;
66
import java.net.SocketAddress;
67
import java.util.ArrayList;
68
import java.util.Collection;
69
import java.util.Collections;
70
import java.util.HashSet;
71
import java.util.List;
72
import java.util.Set;
73
import java.util.concurrent.Executor;
74
import java.util.concurrent.Future;
75
import java.util.concurrent.FutureTask;
76
import java.util.concurrent.TimeUnit;
77
import java.util.logging.Level;
78
import java.util.logging.Logger;
79

80
/**
81
 * Default implementation of {@link io.grpc.Server}, for creation by transports.
82
 *
83
 * <p>Expected usage (by a theoretical TCP transport):
84
 * <pre><code>public class TcpTransportServerFactory {
85
 *   public static Server newServer(Executor executor, HandlerRegistry registry,
86
 *       String configuration) {
87
 *     return new ServerImpl(executor, registry, new TcpTransportServer(configuration));
88
 *   }
89
 * }</code></pre>
90
 *
91
 * <p>Starting the server starts the underlying transport for servicing requests. Stopping the
92
 * server stops servicing new requests and waits for all connections to terminate.
93
 */
94
public final class ServerImpl extends io.grpc.Server implements InternalInstrumented<ServerStats> {
95
  private static final Logger log = Logger.getLogger(ServerImpl.class.getName());
1✔
96
  private static final ServerStreamListener NOOP_LISTENER = new NoopListener();
1✔
97

98
  private final InternalLogId logId;
99
  private final ObjectPool<? extends Executor> executorPool;
100
  /** Executor for application processing. Safe to read after {@link #start()}. */
101
  private Executor executor;
102
  private final HandlerRegistry registry;
103
  private final HandlerRegistry fallbackRegistry;
104
  private final List<ServerTransportFilter> transportFilters;
105
  // This is iterated on a per-call basis.  Use an array instead of a Collection to avoid iterator
106
  // creations.
107
  private final ServerInterceptor[] interceptors;
108
  private final long handshakeTimeoutMillis;
109
  @GuardedBy("lock") private boolean started;
110
  @GuardedBy("lock") private boolean shutdown;
111
  /** non-{@code null} if immediate shutdown has been requested. */
112
  @GuardedBy("lock") private Status shutdownNowStatus;
113
  /** {@code true} if ServerListenerImpl.serverShutdown() was called. */
114
  @GuardedBy("lock") private boolean serverShutdownCallbackInvoked;
115
  @GuardedBy("lock") private boolean terminated;
116
  /** Service encapsulating something similar to an accept() socket. */
117
  private final InternalServer transportServer;
118
  private final Object lock = new Object();
1✔
119
  @GuardedBy("lock") private boolean transportServersTerminated;
120
  /** {@code transportServer} and services encapsulating something similar to a TCP connection. */
121
  @GuardedBy("lock") private final Set<ServerTransport> transports = new HashSet<>();
1✔
122

123
  private final Context rootContext;
124

125
  private final DecompressorRegistry decompressorRegistry;
126
  private final CompressorRegistry compressorRegistry;
127
  private final BinaryLog binlog;
128

129
  private final InternalChannelz channelz;
130
  private final CallTracer serverCallTracer;
131
  private final Deadline.Ticker ticker;
132
  private final ServerCallExecutorSupplier executorSupplier;
133

134
  /**
135
   * Construct a server.
136
   *
137
   * @param builder builder with configuration for server
138
   * @param transportServer transport servers that will create new incoming transports
139
   * @param rootContext context that callbacks for new RPCs should be derived from
140
   */
141
  ServerImpl(
142
      ServerImplBuilder builder,
143
      InternalServer transportServer,
144
      Context rootContext) {
1✔
145
    this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool");
1✔
146
    this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder");
1✔
147
    this.fallbackRegistry =
1✔
148
        Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry");
1✔
149
    this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer");
1✔
150
    this.logId =
1✔
151
        InternalLogId.allocate("Server", String.valueOf(getListenSocketsIgnoringLifecycle()));
1✔
152
    // Fork from the passed in context so that it does not propagate cancellation, it only
153
    // inherits values.
154
    this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext").fork();
1✔
155
    this.decompressorRegistry = builder.decompressorRegistry;
1✔
156
    this.compressorRegistry = builder.compressorRegistry;
1✔
157
    this.transportFilters = Collections.unmodifiableList(
1✔
158
        new ArrayList<>(builder.transportFilters));
159
    this.interceptors =
1✔
160
        builder.interceptors.toArray(new ServerInterceptor[builder.interceptors.size()]);
1✔
161
    this.handshakeTimeoutMillis = builder.handshakeTimeoutMillis;
1✔
162
    this.binlog = builder.binlog;
1✔
163
    this.channelz = builder.channelz;
1✔
164
    this.serverCallTracer = builder.callTracerFactory.create();
1✔
165
    this.ticker = checkNotNull(builder.ticker, "ticker");
1✔
166
    channelz.addServer(this);
1✔
167
    this.executorSupplier = builder.executorSupplier;
1✔
168
  }
1✔
169

170
  /**
171
   * Bind and start the server.
172
   *
173
   * @return {@code this} object
174
   * @throws IllegalStateException if already started
175
   * @throws IOException if unable to bind
176
   */
177
  @Override
178
  public ServerImpl start() throws IOException {
179
    synchronized (lock) {
1✔
180
      checkState(!started, "Already started");
1✔
181
      checkState(!shutdown, "Shutting down");
1✔
182
      // Start and wait for any ports to actually be bound.
183

184
      ServerListenerImpl listener = new ServerListenerImpl();
1✔
185
      transportServer.start(listener);
1✔
186
      executor = Preconditions.checkNotNull(executorPool.getObject(), "executor");
1✔
187
      started = true;
1✔
188
      return this;
1✔
189
    }
190
  }
191

192

193
  @Override
194
  public int getPort() {
195
    synchronized (lock) {
1✔
196
      checkState(started, "Not started");
1✔
197
      checkState(!terminated, "Already terminated");
1✔
198
      for (SocketAddress addr: transportServer.getListenSocketAddresses()) {
1✔
199
        if (addr instanceof InetSocketAddress) {
1✔
200
          return ((InetSocketAddress) addr).getPort();
1✔
201
        }
202
      }
×
203
      return -1;
×
204
    }
205
  }
206

207
  @Override
208
  public List<SocketAddress> getListenSockets() {
209
    synchronized (lock) {
1✔
210
      checkState(started, "Not started");
1✔
211
      checkState(!terminated, "Already terminated");
1✔
212
      return getListenSocketsIgnoringLifecycle();
1✔
213
    }
214
  }
215

216
  private List<SocketAddress> getListenSocketsIgnoringLifecycle() {
217
    synchronized (lock) {
1✔
218
      return Collections.unmodifiableList(transportServer.getListenSocketAddresses());
1✔
219
    }
220
  }
221

222
  @Override
223
  public List<ServerServiceDefinition> getServices() {
224
    List<ServerServiceDefinition> fallbackServices = fallbackRegistry.getServices();
×
225
    if (fallbackServices.isEmpty()) {
×
226
      return registry.getServices();
×
227
    } else {
228
      List<ServerServiceDefinition> registryServices = registry.getServices();
×
229
      int servicesCount = registryServices.size() + fallbackServices.size();
×
230
      List<ServerServiceDefinition> services =
×
231
          new ArrayList<>(servicesCount);
232
      services.addAll(registryServices);
×
233
      services.addAll(fallbackServices);
×
234
      return Collections.unmodifiableList(services);
×
235
    }
236
  }
237

238
  @Override
239
  public List<ServerServiceDefinition> getImmutableServices() {
240
    return registry.getServices();
1✔
241
  }
242

243
  @Override
244
  public List<ServerServiceDefinition> getMutableServices() {
245
    return Collections.unmodifiableList(fallbackRegistry.getServices());
1✔
246
  }
247

248
  /**
249
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are rejected.
250
   */
251
  @Override
252
  public ServerImpl shutdown() {
253
    boolean shutdownTransportServers;
254
    synchronized (lock) {
1✔
255
      if (shutdown) {
1✔
256
        return this;
1✔
257
      }
258
      shutdown = true;
1✔
259
      shutdownTransportServers = started;
1✔
260
      if (!shutdownTransportServers) {
1✔
261
        transportServersTerminated = true;
1✔
262
        checkForTermination();
1✔
263
      }
264
    }
1✔
265
    if (shutdownTransportServers) {
1✔
266
      transportServer.shutdown();
1✔
267
    }
268
    return this;
1✔
269
  }
270

271
  @Override
272
  public ServerImpl shutdownNow() {
273
    shutdown();
1✔
274
    Collection<ServerTransport> transportsCopy;
275
    Status nowStatus = Status.UNAVAILABLE.withDescription("Server shutdownNow invoked");
1✔
276
    boolean savedServerShutdownCallbackInvoked;
277
    synchronized (lock) {
1✔
278
      // Short-circuiting not strictly necessary, but prevents transports from needing to handle
279
      // multiple shutdownNow invocations if shutdownNow is called multiple times.
280
      if (shutdownNowStatus != null) {
1✔
281
        return this;
1✔
282
      }
283
      shutdownNowStatus = nowStatus;
1✔
284
      transportsCopy = new ArrayList<>(transports);
1✔
285
      savedServerShutdownCallbackInvoked = serverShutdownCallbackInvoked;
1✔
286
    }
1✔
287
    // Short-circuiting not strictly necessary, but prevents transports from needing to handle
288
    // multiple shutdownNow invocations, between here and the serverShutdown callback.
289
    if (savedServerShutdownCallbackInvoked) {
1✔
290
      // Have to call shutdownNow, because serverShutdown callback only called shutdown, not
291
      // shutdownNow
292
      for (ServerTransport transport : transportsCopy) {
1✔
293
        transport.shutdownNow(nowStatus);
1✔
294
      }
1✔
295
    }
296
    return this;
1✔
297
  }
298

299
  @Override
300
  public boolean isShutdown() {
301
    synchronized (lock) {
1✔
302
      return shutdown;
1✔
303
    }
304
  }
305

306
  @Override
307
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
308
    synchronized (lock) {
1✔
309
      long timeoutNanos = unit.toNanos(timeout);
1✔
310
      long endTimeNanos = System.nanoTime() + timeoutNanos;
1✔
311
      while (!terminated && (timeoutNanos = endTimeNanos - System.nanoTime()) > 0) {
1✔
312
        NANOSECONDS.timedWait(lock, timeoutNanos);
1✔
313
      }
314
      return terminated;
1✔
315
    }
316
  }
317

318
  @Override
319
  public void awaitTermination() throws InterruptedException {
320
    synchronized (lock) {
1✔
321
      while (!terminated) {
1✔
322
        lock.wait();
×
323
      }
324
    }
1✔
325
  }
1✔
326

327
  @Override
328
  public boolean isTerminated() {
329
    synchronized (lock) {
1✔
330
      return terminated;
1✔
331
    }
332
  }
333

334
  /**
335
   * Remove transport service from accounting collection and notify of complete shutdown if
336
   * necessary.
337
   *
338
   * @param transport service to remove
339
   */
340
  private void transportClosed(ServerTransport transport) {
341
    synchronized (lock) {
1✔
342
      if (!transports.remove(transport)) {
1✔
343
        throw new AssertionError("Transport already removed");
×
344
      }
345
      channelz.removeServerSocket(ServerImpl.this, transport);
1✔
346
      checkForTermination();
1✔
347
    }
1✔
348
  }
1✔
349

350
  /** Notify of complete shutdown if necessary. */
351
  private void checkForTermination() {
352
    synchronized (lock) {
1✔
353
      if (shutdown && transports.isEmpty() && transportServersTerminated) {
1✔
354
        if (terminated) {
1✔
355
          throw new AssertionError("Server already terminated");
×
356
        }
357
        terminated = true;
1✔
358
        channelz.removeServer(this);
1✔
359
        if (executor != null) {
1✔
360
          executor = executorPool.returnObject(executor);
1✔
361
        }
362
        lock.notifyAll();
1✔
363
      }
364
    }
1✔
365
  }
1✔
366

367
  private final class ServerListenerImpl implements ServerListener {
1✔
368

369
    @Override
370
    public ServerTransportListener transportCreated(ServerTransport transport) {
371
      synchronized (lock) {
1✔
372
        transports.add(transport);
1✔
373
      }
1✔
374
      ServerTransportListenerImpl stli = new ServerTransportListenerImpl(transport);
1✔
375
      stli.init();
1✔
376
      return stli;
1✔
377
    }
378

379
    @Override
380
    public void serverShutdown() {
381
      ArrayList<ServerTransport> copiedTransports;
382
      Status shutdownNowStatusCopy;
383
      synchronized (lock) {
1✔
384
        if (serverShutdownCallbackInvoked) {
1✔
385
          return;
×
386
        }
387

388
        // transports collection can be modified during shutdown(), even if we hold the lock, due
389
        // to reentrancy.
390
        copiedTransports = new ArrayList<>(transports);
1✔
391
        shutdownNowStatusCopy = shutdownNowStatus;
1✔
392
        serverShutdownCallbackInvoked = true;
1✔
393
      }
1✔
394
      for (ServerTransport transport : copiedTransports) {
1✔
395
        if (shutdownNowStatusCopy == null) {
1✔
396
          transport.shutdown();
1✔
397
        } else {
398
          transport.shutdownNow(shutdownNowStatusCopy);
1✔
399
        }
400
      }
1✔
401
      synchronized (lock) {
1✔
402
        transportServersTerminated = true;
1✔
403
        checkForTermination();
1✔
404
      }
1✔
405
    }
1✔
406
  }
407

408
  private final class ServerTransportListenerImpl implements ServerTransportListener {
409
    private final ServerTransport transport;
410
    private Future<?> handshakeTimeoutFuture;
411
    private Attributes attributes;
412

413
    ServerTransportListenerImpl(ServerTransport transport) {
1✔
414
      this.transport = transport;
1✔
415
    }
1✔
416

417
    public void init() {
418
      class TransportShutdownNow implements Runnable {
1✔
419
        @Override public void run() {
420
          transport.shutdownNow(Status.CANCELLED.withDescription("Handshake timeout exceeded"));
1✔
421
        }
1✔
422
      }
423

424
      if (handshakeTimeoutMillis != Long.MAX_VALUE) {
1✔
425
        handshakeTimeoutFuture = transport.getScheduledExecutorService()
1✔
426
            .schedule(new TransportShutdownNow(), handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
427
      } else {
428
        // Noop, to avoid triggering Thread creation in InProcessServer
429
        handshakeTimeoutFuture = new FutureTask<Void>(new Runnable() {
1✔
430
          @Override public void run() {}
×
431
        }, null);
432
      }
433
      channelz.addServerSocket(ServerImpl.this, transport);
1✔
434
    }
1✔
435

436
    @Override
437
    public Attributes transportReady(Attributes attributes) {
438
      handshakeTimeoutFuture.cancel(false);
1✔
439
      handshakeTimeoutFuture = null;
1✔
440

441
      for (ServerTransportFilter filter : transportFilters) {
1✔
442
        attributes = Preconditions.checkNotNull(filter.transportReady(attributes),
1✔
443
            "Filter %s returned null", filter);
444
      }
1✔
445
      this.attributes = attributes;
1✔
446
      return attributes;
1✔
447
    }
448

449
    @Override
450
    public void transportTerminated() {
451
      if (handshakeTimeoutFuture != null) {
1✔
452
        handshakeTimeoutFuture.cancel(false);
1✔
453
        handshakeTimeoutFuture = null;
1✔
454
      }
455
      for (ServerTransportFilter filter : transportFilters) {
1✔
456
        filter.transportTerminated(attributes);
1✔
457
      }
1✔
458
      transportClosed(transport);
1✔
459
    }
1✔
460

461

462
    @Override
463
    public void streamCreated(ServerStream stream, String methodName, Metadata headers) {
464
      Tag tag = PerfMark.createTag(methodName, stream.streamId());
1✔
465
      try (TaskCloseable ignore = PerfMark.traceTask("ServerTransportListener.streamCreated")) {
1✔
466
        PerfMark.attachTag(tag);
1✔
467
        streamCreatedInternal(stream, methodName, headers, tag);
1✔
468
      }
469
    }
1✔
470

471
    private void streamCreatedInternal(
472
        final ServerStream stream, final String methodName, final Metadata headers, final Tag tag) {
473
      final Executor wrappedExecutor;
474
      // This is a performance optimization that avoids the synchronization and queuing overhead
475
      // that comes with SerializingExecutor.
476
      if (executorSupplier != null || executor != directExecutor()) {
1✔
477
        wrappedExecutor = new SerializingExecutor(executor);
1✔
478
      } else {
479
        wrappedExecutor = new SerializeReentrantCallsDirectExecutor();
1✔
480
        stream.optimizeForDirectExecutor();
1✔
481
      }
482

483
      if (headers.containsKey(MESSAGE_ENCODING_KEY)) {
1✔
484
        String encoding = headers.get(MESSAGE_ENCODING_KEY);
1✔
485
        Decompressor decompressor = decompressorRegistry.lookupDecompressor(encoding);
1✔
486
        if (decompressor == null) {
1✔
487
          stream.setListener(NOOP_LISTENER);
1✔
488
          stream.close(
1✔
489
              Status.UNIMPLEMENTED.withDescription(
1✔
490
                  String.format("Can't find decompressor for %s", encoding)),
1✔
491
              new Metadata());
492
          return;
1✔
493
        }
494
        stream.setDecompressor(decompressor);
1✔
495
      }
496

497
      final StatsTraceContext statsTraceCtx = Preconditions.checkNotNull(
1✔
498
          stream.statsTraceContext(), "statsTraceCtx not present from stream");
1✔
499

500
      final Context.CancellableContext context = createContext(headers, statsTraceCtx);
1✔
501

502
      final Link link = PerfMark.linkOut();
1✔
503

504
      final JumpToApplicationThreadServerStreamListener jumpListener
1✔
505
          = new JumpToApplicationThreadServerStreamListener(
506
                  wrappedExecutor, executor, stream, context, tag);
1✔
507
      stream.setListener(jumpListener);
1✔
508
      final SettableFuture<ServerCallParameters<?,?>> future = SettableFuture.create();
1✔
509
      // Run in serializing executor so jumpListener.setListener() is called before any callbacks
510
      // are delivered, including any errors. MethodLookup() and HandleServerCall() are proactively
511
      // queued before any callbacks are queued at serializing executor.
512
      // MethodLookup() runs on the default executor.
513
      // When executorSupplier is enabled, MethodLookup() may set/change the executor in the
514
      // SerializingExecutor before it finishes running.
515
      // Then HandleServerCall() and callbacks would switch to the executorSupplier executor.
516
      // Otherwise, they all run on the default executor.
517

518
      final class MethodLookup extends ContextRunnable {
519
        MethodLookup() {
1✔
520
          super(context);
1✔
521
        }
1✔
522

523
        @Override
524
        public void runInContext() {
525
          try (TaskCloseable ignore =
1✔
526
                   PerfMark.traceTask("ServerTransportListener$MethodLookup.startCall")) {
1✔
527
            PerfMark.attachTag(tag);
1✔
528
            PerfMark.linkIn(link);
1✔
529
            runInternal();
1✔
530
          }
531
        }
1✔
532

533
        private void runInternal() {
534
          ServerMethodDefinition<?, ?> wrapMethod;
535
          ServerCallParameters<?, ?> callParams;
536
          try {
537
            ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
1✔
538
            if (method == null) {
1✔
539
              method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority());
1✔
540
            }
541
            if (method == null) {
1✔
542
              Status status = Status.UNIMPLEMENTED.withDescription(
1✔
543
                      "Method not found: " + methodName);
544
              // TODO(zhangkun83): this error may be recorded by the tracer, and if it's kept in
545
              // memory as a map whose key is the method name, this would allow a misbehaving
546
              // client to blow up the server in-memory stats storage by sending large number of
547
              // distinct unimplemented method
548
              // names. (https://github.com/grpc/grpc-java/issues/2285)
549
              jumpListener.setListener(NOOP_LISTENER);
1✔
550
              stream.close(status, new Metadata());
1✔
551
              context.cancel(null);
1✔
552
              future.cancel(false);
1✔
553
              return;
1✔
554
            }
555
            wrapMethod = wrapMethod(stream, method, statsTraceCtx);
1✔
556
            callParams = maySwitchExecutor(wrapMethod, stream, headers, context, tag);
1✔
557
            future.set(callParams);
1✔
558
          } catch (Throwable t) {
1✔
559
            jumpListener.setListener(NOOP_LISTENER);
1✔
560
            stream.close(Status.fromThrowable(t), new Metadata());
×
561
            context.cancel(null);
×
562
            future.cancel(false);
×
563
            throw t;
×
564
          }
1✔
565
        }
1✔
566

567
        private <ReqT, RespT> ServerCallParameters<ReqT, RespT> maySwitchExecutor(
568
            final ServerMethodDefinition<ReqT, RespT> methodDef,
569
            final ServerStream stream,
570
            final Metadata headers,
571
            final Context.CancellableContext context,
572
            final Tag tag) {
573
          final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<>(
1✔
574
                  stream,
575
                  methodDef.getMethodDescriptor(),
1✔
576
                  headers,
577
                  context,
578
                  decompressorRegistry,
1✔
579
                  compressorRegistry,
1✔
580
                  serverCallTracer,
1✔
581
                  tag);
582
          if (executorSupplier != null) {
1✔
583
            Executor switchingExecutor = executorSupplier.getExecutor(call, headers);
1✔
584
            if (switchingExecutor != null) {
1✔
585
              ((SerializingExecutor)wrappedExecutor).setExecutor(switchingExecutor);
1✔
586
            }
587
          }
588
          return new ServerCallParameters<>(call, methodDef.getServerCallHandler());
1✔
589
        }
590
      }
591

592
      final class HandleServerCall extends ContextRunnable {
593
        HandleServerCall() {
1✔
594
          super(context);
1✔
595
        }
1✔
596

597
        @Override
598
        public void runInContext() {
599
          try (TaskCloseable ignore =
1✔
600
                   PerfMark.traceTask("ServerTransportListener$HandleServerCall.startCall")) {
1✔
601
            PerfMark.linkIn(link);
1✔
602
            PerfMark.attachTag(tag);
1✔
603
            runInternal();
1✔
604
          }
605
        }
1✔
606

607
        private void runInternal() {
608
          ServerStreamListener listener = NOOP_LISTENER;
1✔
609
          if (future.isCancelled()) {
1✔
610
            return;
1✔
611
          }
612
          try {
613
            listener = startWrappedCall(methodName, Futures.getDone(future), headers);
1✔
614
          } catch (Throwable ex) {
1✔
615
            stream.close(Status.fromThrowable(ex), new Metadata());
1✔
616
            context.cancel(null);
1✔
617
            throw new IllegalStateException(ex);
1✔
618
          } finally {
619
            jumpListener.setListener(listener);
1✔
620
          }
621

622
          // An extremely short deadline may expire before stream.setListener(jumpListener).
623
          // This causes NPE as in issue: https://github.com/grpc/grpc-java/issues/6300
624
          // Delay of setting cancellationListener to context will fix the issue.
625
          final class ServerStreamCancellationListener implements Context.CancellationListener {
1✔
626
            @Override
627
            public void cancelled(Context context) {
628
              Status status = statusFromCancelled(context);
1✔
629
              if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) {
1✔
630
                // This should rarely get run, since the client will likely cancel the stream
631
                // before the timeout is reached.
632
                stream.cancel(status);
1✔
633
              }
634
            }
1✔
635
          }
636

637
          context.addListener(new ServerStreamCancellationListener(), directExecutor());
1✔
638
        }
1✔
639
      }
640

641
      wrappedExecutor.execute(new MethodLookup());
1✔
642
      wrappedExecutor.execute(new HandleServerCall());
1✔
643
    }
1✔
644

645
    private Context.CancellableContext createContext(
646
        Metadata headers, StatsTraceContext statsTraceCtx) {
647
      Long timeoutNanos = headers.get(TIMEOUT_KEY);
1✔
648

649
      Context baseContext =
1✔
650
          statsTraceCtx
651
              .serverFilterContext(rootContext)
1✔
652
              .withValue(io.grpc.InternalServer.SERVER_CONTEXT_KEY, ServerImpl.this);
1✔
653

654
      if (timeoutNanos == null) {
1✔
655
        return baseContext.withCancellation();
1✔
656
      }
657

658
      Context.CancellableContext context =
1✔
659
          baseContext.withDeadline(
1✔
660
              Deadline.after(timeoutNanos, NANOSECONDS, ticker),
1✔
661
              transport.getScheduledExecutorService());
1✔
662

663
      return context;
1✔
664
    }
665

666
    /** Never returns {@code null}. */
667
    private <ReqT, RespT> ServerMethodDefinition<?,?> wrapMethod(ServerStream stream,
668
        ServerMethodDefinition<ReqT, RespT> methodDef, StatsTraceContext statsTraceCtx) {
669
      // TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
670
      statsTraceCtx.serverCallStarted(
1✔
671
          new ServerCallInfoImpl<>(
672
              methodDef.getMethodDescriptor(), // notify with original method descriptor
1✔
673
              stream.getAttributes(),
1✔
674
              stream.getAuthority()));
1✔
675
      ServerCallHandler<ReqT, RespT> handler = methodDef.getServerCallHandler();
1✔
676
      for (ServerInterceptor interceptor : interceptors) {
1✔
677
        handler = InternalServerInterceptors.interceptCallHandlerCreate(interceptor, handler);
1✔
678
      }
679
      ServerMethodDefinition<ReqT, RespT> interceptedDef = methodDef.withServerCallHandler(handler);
1✔
680
      ServerMethodDefinition<?, ?> wMethodDef = binlog == null
1✔
681
          ? interceptedDef : binlog.wrapMethodDefinition(interceptedDef);
1✔
682
      return wMethodDef;
1✔
683
    }
684

685
    private final class ServerCallParameters<ReqT, RespT> {
686
      ServerCallImpl<ReqT, RespT> call;
687
      ServerCallHandler<ReqT, RespT> callHandler;
688

689
      public ServerCallParameters(ServerCallImpl<ReqT, RespT> call,
690
                                  ServerCallHandler<ReqT, RespT> callHandler) {
1✔
691
        this.call = call;
1✔
692
        this.callHandler = callHandler;
1✔
693
      }
1✔
694
    }
695

696
    private <WReqT, WRespT> ServerStreamListener startWrappedCall(
697
        String fullMethodName,
698
        ServerCallParameters<WReqT, WRespT> params,
699
        Metadata headers) {
700
      ServerCall.Listener<WReqT> callListener =
1✔
701
              params.callHandler.startCall(params.call, headers);
1✔
702
      if (callListener == null) {
1✔
703
        throw new NullPointerException(
1✔
704
                "startCall() returned a null listener for method " + fullMethodName);
705
      }
706
      return params.call.newServerStreamListener(callListener);
1✔
707
    }
708
  }
709

710
  @Override
711
  public InternalLogId getLogId() {
712
    return logId;
1✔
713
  }
714

715
  @Override
716
  public ListenableFuture<ServerStats> getStats() {
717
    ServerStats.Builder builder = new ServerStats.Builder();
1✔
718
    List<InternalInstrumented<SocketStats>> stats = transportServer.getListenSocketStatsList();
1✔
719
    if (stats != null ) {
1✔
720
      builder.addListenSockets(stats);
×
721
    }
722
    serverCallTracer.updateBuilder(builder);
1✔
723
    SettableFuture<ServerStats> ret = SettableFuture.create();
1✔
724
    ret.set(builder.build());
1✔
725
    return ret;
1✔
726
  }
727

728
  @Override
729
  public String toString() {
730
    return MoreObjects.toStringHelper(this)
×
731
        .add("logId", logId.getId())
×
732
        .add("transportServer", transportServer)
×
733
        .toString();
×
734
  }
735

736
  private static final class NoopListener implements ServerStreamListener {
737
    @Override
738
    public void messagesAvailable(MessageProducer producer) {
739
      InputStream message;
740
      while ((message = producer.next()) != null) {
×
741
        try {
742
          message.close();
×
743
        } catch (IOException e) {
×
744
          // Close any remaining messages
745
          while ((message = producer.next()) != null) {
×
746
            try {
747
              message.close();
×
748
            } catch (IOException ioException) {
×
749
              // just log additional exceptions as we are already going to throw
750
              log.log(Level.WARNING, "Exception closing stream", ioException);
×
751
            }
×
752
          }
753
          throw new RuntimeException(e);
×
754
        }
×
755
      }
756
    }
×
757

758
    @Override
759
    public void halfClosed() {}
×
760

761
    @Override
762
    public void closed(Status status) {}
1✔
763

764
    @Override
765
    public void onReady() {}
1✔
766
  }
767

768
  /**
769
   * Dispatches callbacks onto an application-provided executor and correctly propagates
770
   * exceptions.
771
   */
772
  @VisibleForTesting
773
  static final class JumpToApplicationThreadServerStreamListener implements ServerStreamListener {
774
    private final Executor callExecutor;
775
    private final Executor cancelExecutor;
776
    private final Context.CancellableContext context;
777
    private final ServerStream stream;
778
    private final Tag tag;
779
    // Only accessed from callExecutor.
780
    private ServerStreamListener listener;
781

782
    public JumpToApplicationThreadServerStreamListener(Executor executor,
783
        Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) {
1✔
784
      this.callExecutor = executor;
1✔
785
      this.cancelExecutor = cancelExecutor;
1✔
786
      this.stream = stream;
1✔
787
      this.context = context;
1✔
788
      this.tag = tag;
1✔
789
    }
1✔
790

791
    /**
792
     * This call MUST be serialized on callExecutor to avoid races.
793
     */
794
    private ServerStreamListener getListener() {
795
      if (listener == null) {
1✔
796
        throw new IllegalStateException("listener unset");
×
797
      }
798
      return listener;
1✔
799
    }
800

801
    @VisibleForTesting
802
    void setListener(ServerStreamListener listener) {
803
      Preconditions.checkNotNull(listener, "listener must not be null");
1✔
804
      Preconditions.checkState(this.listener == null, "Listener already set");
1✔
805
      this.listener = listener;
1✔
806
    }
1✔
807

808
    /**
809
     * Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use.
810
     */
811
    private void internalClose(Throwable t) {
812
      // TODO(ejona86): this is not thread-safe :)
813
      String description = "Application error processing RPC";
1✔
814
      stream.close(Status.UNKNOWN.withDescription(description).withCause(t), new Metadata());
1✔
815
    }
1✔
816

817
    @Override
818
    public void messagesAvailable(final MessageProducer producer) {
819
      try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.messagesAvailable")) {
1✔
820
        PerfMark.attachTag(tag);
1✔
821
        final Link link = PerfMark.linkOut();
1✔
822
        final class MessagesAvailable extends ContextRunnable {
823

824
          MessagesAvailable() {
1✔
825
            super(context);
1✔
826
          }
1✔
827

828
          @Override
829
          public void runInContext() {
830
            try (TaskCloseable ignore =
1✔
831
                     PerfMark.traceTask("ServerCallListener(app).messagesAvailable")) {
1✔
832
              PerfMark.attachTag(tag);
1✔
833
              PerfMark.linkIn(link);
1✔
834
              getListener().messagesAvailable(producer);
1✔
835
            } catch (Throwable t) {
1✔
836
              internalClose(t);
1✔
837
              throw t;
1✔
838
            }
1✔
839
          }
1✔
840
        }
841

842
        callExecutor.execute(new MessagesAvailable());
1✔
843
      }
844
    }
1✔
845

846
    @Override
847
    public void halfClosed() {
848
      try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.halfClosed")) {
1✔
849
        PerfMark.attachTag(tag);
1✔
850
        final Link link = PerfMark.linkOut();
1✔
851
        final class HalfClosed extends ContextRunnable {
852
          HalfClosed() {
1✔
853
            super(context);
1✔
854
          }
1✔
855

856
          @Override
857
          public void runInContext() {
858
            try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).halfClosed")) {
1✔
859
              PerfMark.attachTag(tag);
1✔
860
              PerfMark.linkIn(link);
1✔
861
              getListener().halfClosed();
1✔
862
            } catch (Throwable t) {
1✔
863
              internalClose(t);
1✔
864
              throw t;
1✔
865
            }
1✔
866
          }
1✔
867
        }
868

869
        callExecutor.execute(new HalfClosed());
1✔
870
      }
871
    }
1✔
872

873
    @Override
874
    public void closed(final Status status) {
875
      try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.closed")) {
1✔
876
        PerfMark.attachTag(tag);
1✔
877
        closedInternal(status);
1✔
878
      }
879
    }
1✔
880

881
    private void closedInternal(final Status status) {
882
      // For cancellations, promptly inform any users of the context that their work should be
883
      // aborted. Otherwise, we can wait until pending work is done.
884
      if (!status.isOk()) {
1✔
885
        // Since status was not OK we know that the call did not complete and got cancelled. To
886
        // reflect this on the context we need to close it with a cause exception. Since not every
887
        // failed status has an exception we will create one here if needed.
888
        Throwable cancelCause = status.getCause();
1✔
889
        if (cancelCause == null) {
1✔
890
          cancelCause = InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
891
              Status.CANCELLED.withDescription("RPC cancelled"), null);
1✔
892
        }
893

894
        // The callExecutor might be busy doing user work. To avoid waiting, use an executor that
895
        // is not serializing.
896
        cancelExecutor.execute(new ContextCloser(context, cancelCause));
1✔
897
      }
898
      final Link link = PerfMark.linkOut();
1✔
899

900
      final class Closed extends ContextRunnable {
901
        Closed() {
1✔
902
          super(context);
1✔
903
        }
1✔
904

905
        @Override
906
        public void runInContext() {
907
          try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).closed")) {
1✔
908
            PerfMark.attachTag(tag);
1✔
909
            PerfMark.linkIn(link);
1✔
910
            getListener().closed(status);
1✔
911
          }
912
        }
1✔
913
      }
914

915
      callExecutor.execute(new Closed());
1✔
916
    }
1✔
917

918
    @Override
919
    public void onReady() {
920
      try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.onReady")) {
1✔
921
        PerfMark.attachTag(tag);
1✔
922
        final Link link = PerfMark.linkOut();
1✔
923

924
        final class OnReady extends ContextRunnable {
925
          OnReady() {
1✔
926
            super(context);
1✔
927
          }
1✔
928

929
          @Override
930
          public void runInContext() {
931
            try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).onReady")) {
1✔
932
              PerfMark.attachTag(tag);
1✔
933
              PerfMark.linkIn(link);
1✔
934
              getListener().onReady();
1✔
935
            } catch (Throwable t) {
1✔
936
              internalClose(t);
1✔
937
              throw t;
1✔
938
            }
1✔
939
          }
1✔
940
        }
941

942
        callExecutor.execute(new OnReady());
1✔
943
      }
944
    }
1✔
945
  }
946

947
  @VisibleForTesting
948
  static final class ContextCloser implements Runnable {
949
    private final Context.CancellableContext context;
950
    private final Throwable cause;
951

952
    ContextCloser(Context.CancellableContext context, Throwable cause) {
1✔
953
      this.context = context;
1✔
954
      this.cause = cause;
1✔
955
    }
1✔
956

957
    @Override
958
    public void run() {
959
      context.cancel(cause);
1✔
960
    }
1✔
961
  }
962
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc