• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19211

08 May 2024 10:50PM UTC coverage: 88.309% (-0.02%) from 88.328%
#19211

push

github

ejona86
xds: Plumb locality in xds_cluster_impl and weighted_target

As part of gRFC A78:

> To support the locality label in the WRR metrics, we will extend the
> `weighted_target` LB policy (see A28) to define a resolver attribute
> that indicates the name of its child. This attribute will be passed
> down to each of its children with the appropriate value, so that any
> LB policy that sits underneath the `weighted_target` policy will be
> able to use it.

xds_cluster_impl is involved because it uses the child names in the
AddressFilter, which must match the names used by weighted_target.
Instead of using Locality.toString() in multiple policies and assuming
the policies agree, we now have xds_cluster_impl decide the locality's
name and pass it down explicitly. This allows us to change the name
format to match gRFC A78:

> If locality information is available, the value of this label will be
> of the form `{region="${REGION}", zone="${ZONE}",
> sub_zone="${SUB_ZONE}"}`, where `${REGION}`, `${ZONE}`, and
> `${SUB_ZONE}` are replaced with the actual values. If no locality
> information is available, the label will be set to the empty string.

31515 of 35687 relevant lines covered (88.31%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.74
/../core/src/main/java/io/grpc/internal/ServerImplBuilder.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.util.concurrent.MoreExecutors;
24
import com.google.errorprone.annotations.DoNotCall;
25
import io.grpc.BinaryLog;
26
import io.grpc.BindableService;
27
import io.grpc.CompressorRegistry;
28
import io.grpc.Configurator;
29
import io.grpc.ConfiguratorRegistry;
30
import io.grpc.Context;
31
import io.grpc.Deadline;
32
import io.grpc.DecompressorRegistry;
33
import io.grpc.HandlerRegistry;
34
import io.grpc.InternalChannelz;
35
import io.grpc.InternalGlobalInterceptors;
36
import io.grpc.Server;
37
import io.grpc.ServerBuilder;
38
import io.grpc.ServerCallExecutorSupplier;
39
import io.grpc.ServerInterceptor;
40
import io.grpc.ServerMethodDefinition;
41
import io.grpc.ServerServiceDefinition;
42
import io.grpc.ServerStreamTracer;
43
import io.grpc.ServerTransportFilter;
44
import java.io.File;
45
import java.lang.reflect.InvocationTargetException;
46
import java.lang.reflect.Method;
47
import java.util.ArrayList;
48
import java.util.Collections;
49
import java.util.List;
50
import java.util.concurrent.Executor;
51
import java.util.concurrent.TimeUnit;
52
import java.util.logging.Level;
53
import java.util.logging.Logger;
54
import javax.annotation.Nullable;
55

56
/**
57
 * Default builder for {@link io.grpc.Server} instances, for usage in Transport implementations.
58
 */
59
public final class ServerImplBuilder extends ServerBuilder<ServerImplBuilder> {
60

61
  private static final Logger log = Logger.getLogger(ServerImplBuilder.class.getName());
1✔
62

63
  @DoNotCall("ClientTransportServersBuilder is required, use a constructor")
64
  public static ServerBuilder<?> forPort(int port) {
65
    throw new UnsupportedOperationException(
×
66
        "ClientTransportServersBuilder is required, use a constructor");
67
  }
68

69
  // defaults
70
  private static final ObjectPool<? extends Executor> DEFAULT_EXECUTOR_POOL =
1✔
71
      SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
1✔
72
  private static final HandlerRegistry DEFAULT_FALLBACK_REGISTRY = new DefaultFallbackRegistry();
1✔
73
  private static final DecompressorRegistry DEFAULT_DECOMPRESSOR_REGISTRY =
74
      DecompressorRegistry.getDefaultInstance();
1✔
75
  private static final CompressorRegistry DEFAULT_COMPRESSOR_REGISTRY =
76
      CompressorRegistry.getDefaultInstance();
1✔
77
  private static final long DEFAULT_HANDSHAKE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(120);
1✔
78

79
  // mutable state
80
  final InternalHandlerRegistry.Builder registryBuilder =
1✔
81
      new InternalHandlerRegistry.Builder();
82
  final List<ServerTransportFilter> transportFilters = new ArrayList<>();
1✔
83
  final List<ServerInterceptor> interceptors = new ArrayList<>();
1✔
84
  private final List<ServerStreamTracer.Factory> streamTracerFactories = new ArrayList<>();
1✔
85
  private final ClientTransportServersBuilder clientTransportServersBuilder;
86
  HandlerRegistry fallbackRegistry = DEFAULT_FALLBACK_REGISTRY;
1✔
87
  ObjectPool<? extends Executor> executorPool = DEFAULT_EXECUTOR_POOL;
1✔
88
  DecompressorRegistry decompressorRegistry = DEFAULT_DECOMPRESSOR_REGISTRY;
1✔
89
  CompressorRegistry compressorRegistry = DEFAULT_COMPRESSOR_REGISTRY;
1✔
90
  long handshakeTimeoutMillis = DEFAULT_HANDSHAKE_TIMEOUT_MILLIS;
1✔
91
  Deadline.Ticker ticker = Deadline.getSystemTicker();
1✔
92
  private boolean statsEnabled = true;
1✔
93
  private boolean recordStartedRpcs = true;
1✔
94
  private boolean recordFinishedRpcs = true;
1✔
95
  private boolean recordRealTimeMetrics = false;
1✔
96
  private boolean tracingEnabled = true;
1✔
97
  @Nullable BinaryLog binlog;
98
  InternalChannelz channelz = InternalChannelz.instance();
1✔
99
  CallTracer.Factory callTracerFactory = CallTracer.getDefaultFactory();
1✔
100
  @Nullable
101
  ServerCallExecutorSupplier executorSupplier;
102

103
  /**
104
   * An interface to provide to provide transport specific information for the server. This method
105
   * is meant for Transport implementors and should not be used by normal users.
106
   */
107
  public interface ClientTransportServersBuilder {
108
    InternalServer buildClientTransportServers(
109
        List<? extends ServerStreamTracer.Factory> streamTracerFactories);
110
  }
111

112
  /**
113
   * Creates a new server builder with given transport servers provider.
114
   */
115
  public ServerImplBuilder(ClientTransportServersBuilder clientTransportServersBuilder) {
1✔
116
    this.clientTransportServersBuilder = checkNotNull(clientTransportServersBuilder,
1✔
117
        "clientTransportServersBuilder");
118
    // TODO(dnvindhya): Move configurator to all the individual builders
119
    for (Configurator configurator : ConfiguratorRegistry.getDefaultRegistry().getConfigurators()) {
1✔
120
      configurator.configureServerBuilder(this);
×
121
    }
×
122
  }
1✔
123

124
  @Override
125
  public ServerImplBuilder directExecutor() {
126
    return executor(MoreExecutors.directExecutor());
1✔
127
  }
128

129
  @Override
130
  public ServerImplBuilder executor(@Nullable Executor executor) {
131
    this.executorPool = executor != null ? new FixedObjectPool<>(executor) : DEFAULT_EXECUTOR_POOL;
1✔
132
    return this;
1✔
133
  }
134

135
  @Override
136
  public ServerImplBuilder callExecutor(ServerCallExecutorSupplier executorSupplier) {
137
    this.executorSupplier = checkNotNull(executorSupplier);
×
138
    return this;
×
139
  }
140

141
  @Override
142
  public ServerImplBuilder addService(ServerServiceDefinition service) {
143
    registryBuilder.addService(checkNotNull(service, "service"));
1✔
144
    return this;
1✔
145
  }
146

147
  @Override
148
  public ServerImplBuilder addService(BindableService bindableService) {
149
    return addService(checkNotNull(bindableService, "bindableService").bindService());
1✔
150
  }
151

152
  @Override
153
  public ServerImplBuilder addTransportFilter(ServerTransportFilter filter) {
154
    transportFilters.add(checkNotNull(filter, "filter"));
1✔
155
    return this;
1✔
156
  }
157

158
  @Override
159
  public ServerImplBuilder intercept(ServerInterceptor interceptor) {
160
    interceptors.add(checkNotNull(interceptor, "interceptor"));
1✔
161
    return this;
1✔
162
  }
163

164
  @Override
165
  public ServerImplBuilder addStreamTracerFactory(ServerStreamTracer.Factory factory) {
166
    streamTracerFactories.add(checkNotNull(factory, "factory"));
1✔
167
    return this;
1✔
168
  }
169

170
  @Override
171
  public ServerImplBuilder fallbackHandlerRegistry(@Nullable HandlerRegistry registry) {
172
    this.fallbackRegistry = registry != null ? registry : DEFAULT_FALLBACK_REGISTRY;
1✔
173
    return this;
1✔
174
  }
175

176
  @Override
177
  public ServerImplBuilder decompressorRegistry(@Nullable DecompressorRegistry registry) {
178
    this.decompressorRegistry = registry != null ? registry : DEFAULT_DECOMPRESSOR_REGISTRY;
1✔
179
    return this;
1✔
180
  }
181

182
  @Override
183
  public ServerImplBuilder compressorRegistry(@Nullable CompressorRegistry registry) {
184
    this.compressorRegistry = registry != null ? registry : DEFAULT_COMPRESSOR_REGISTRY;
1✔
185
    return this;
1✔
186
  }
187

188
  @Override
189
  public ServerImplBuilder handshakeTimeout(long timeout, TimeUnit unit) {
190
    checkArgument(timeout > 0, "handshake timeout is %s, but must be positive", timeout);
1✔
191
    this.handshakeTimeoutMillis = checkNotNull(unit, "unit").toMillis(timeout);
1✔
192
    return this;
1✔
193
  }
194

195
  @Override
196
  public ServerImplBuilder setBinaryLog(@Nullable BinaryLog binaryLog) {
197
    this.binlog = binaryLog;
×
198
    return this;
×
199
  }
200

201
  /**
202
   * Disable or enable stats features.  Enabled by default.
203
   */
204
  public void setStatsEnabled(boolean value) {
205
    this.statsEnabled = value;
1✔
206
  }
1✔
207

208
  /**
209
   * Disable or enable stats recording for RPC upstarts.  Effective only if {@link
210
   * #setStatsEnabled} is set to true.  Enabled by default.
211
   */
212
  public void setStatsRecordStartedRpcs(boolean value) {
213
    recordStartedRpcs = value;
1✔
214
  }
1✔
215

216
  /**
217
   * Disable or enable stats recording for RPC completions.  Effective only if {@link
218
   * #setStatsEnabled} is set to true.  Enabled by default.
219
   */
220
  public void setStatsRecordFinishedRpcs(boolean value) {
221
    recordFinishedRpcs = value;
1✔
222
  }
1✔
223

224
  /**
225
   * Disable or enable real-time metrics recording.  Effective only if {@link #setStatsEnabled} is
226
   * set to true.  Disabled by default.
227
   */
228
  public void setStatsRecordRealTimeMetrics(boolean value) {
229
    recordRealTimeMetrics = value;
×
230
  }
×
231

232
  /**
233
   * Disable or enable tracing features.  Enabled by default.
234
   */
235
  public void setTracingEnabled(boolean value) {
236
    tracingEnabled = value;
1✔
237
  }
1✔
238

239
  /**
240
   * Sets a custom deadline ticker.  This should only be called from InProcessServerBuilder.
241
   */
242
  public void setDeadlineTicker(Deadline.Ticker ticker) {
243
    this.ticker = checkNotNull(ticker, "ticker");
×
244
  }
×
245

246
  @Override
247
  public Server build() {
248
    return new ServerImpl(this,
1✔
249
        clientTransportServersBuilder.buildClientTransportServers(getTracerFactories()),
1✔
250
        Context.ROOT);
251
  }
252

253
  @VisibleForTesting
254
  List<? extends ServerStreamTracer.Factory> getTracerFactories() {
255
    ArrayList<ServerStreamTracer.Factory> tracerFactories = new ArrayList<>();
1✔
256
    boolean isGlobalInterceptorsTracersSet = false;
1✔
257
    List<ServerInterceptor> globalServerInterceptors
258
        = InternalGlobalInterceptors.getServerInterceptors();
1✔
259
    List<ServerStreamTracer.Factory> globalServerStreamTracerFactories
260
        = InternalGlobalInterceptors.getServerStreamTracerFactories();
1✔
261
    if (globalServerInterceptors != null) {
1✔
262
      tracerFactories.addAll(globalServerStreamTracerFactories);
×
263
      interceptors.addAll(globalServerInterceptors);
×
264
      isGlobalInterceptorsTracersSet = true;
×
265
    }
266
    if (!isGlobalInterceptorsTracersSet && statsEnabled) {
1✔
267
      ServerStreamTracer.Factory censusStatsTracerFactory = null;
1✔
268
      try {
269
        Class<?> censusStatsAccessor =
1✔
270
            Class.forName("io.grpc.census.InternalCensusStatsAccessor");
1✔
271
        Method getServerStreamTracerFactoryMethod =
1✔
272
            censusStatsAccessor.getDeclaredMethod(
1✔
273
                "getServerStreamTracerFactory",
274
                boolean.class,
275
                boolean.class,
276
                boolean.class);
277
        censusStatsTracerFactory =
1✔
278
            (ServerStreamTracer.Factory) getServerStreamTracerFactoryMethod
279
                .invoke(
1✔
280
                    null,
281
                    recordStartedRpcs,
1✔
282
                    recordFinishedRpcs,
1✔
283
                    recordRealTimeMetrics);
1✔
284
      } catch (ClassNotFoundException e) {
1✔
285
        // Replace these separate catch statements with multicatch when Android min-API >= 19
286
        log.log(Level.FINE, "Unable to apply census stats", e);
1✔
287
      } catch (NoSuchMethodException e) {
×
288
        log.log(Level.FINE, "Unable to apply census stats", e);
×
289
      } catch (IllegalAccessException e) {
×
290
        log.log(Level.FINE, "Unable to apply census stats", e);
×
291
      } catch (InvocationTargetException e) {
×
292
        log.log(Level.FINE, "Unable to apply census stats", e);
×
293
      }
1✔
294
      if (censusStatsTracerFactory != null) {
1✔
295
        tracerFactories.add(censusStatsTracerFactory);
1✔
296
      }
297
    }
298
    if (!isGlobalInterceptorsTracersSet && tracingEnabled) {
1✔
299
      ServerStreamTracer.Factory tracingStreamTracerFactory = null;
1✔
300
      try {
301
        Class<?> censusTracingAccessor =
1✔
302
            Class.forName("io.grpc.census.InternalCensusTracingAccessor");
1✔
303
        Method getServerStreamTracerFactoryMethod =
1✔
304
            censusTracingAccessor.getDeclaredMethod("getServerStreamTracerFactory");
1✔
305
        tracingStreamTracerFactory =
1✔
306
            (ServerStreamTracer.Factory) getServerStreamTracerFactoryMethod.invoke(null);
1✔
307
      } catch (ClassNotFoundException e) {
1✔
308
        // Replace these separate catch statements with multicatch when Android min-API >= 19
309
        log.log(Level.FINE, "Unable to apply census stats", e);
1✔
310
      } catch (NoSuchMethodException e) {
×
311
        log.log(Level.FINE, "Unable to apply census stats", e);
×
312
      } catch (IllegalAccessException e) {
×
313
        log.log(Level.FINE, "Unable to apply census stats", e);
×
314
      } catch (InvocationTargetException e) {
×
315
        log.log(Level.FINE, "Unable to apply census stats", e);
×
316
      }
1✔
317
      if (tracingStreamTracerFactory != null) {
1✔
318
        tracerFactories.add(tracingStreamTracerFactory);
1✔
319
      }
320
    }
321
    tracerFactories.addAll(streamTracerFactories);
1✔
322
    tracerFactories.trimToSize();
1✔
323
    return Collections.unmodifiableList(tracerFactories);
1✔
324
  }
325

326
  public InternalChannelz getChannelz() {
327
    return channelz;
1✔
328
  }
329

330
  private static final class DefaultFallbackRegistry extends HandlerRegistry {
331
    @Override
332
    public List<ServerServiceDefinition> getServices() {
333
      return Collections.emptyList();
1✔
334
    }
335

336
    @Nullable
337
    @Override
338
    public ServerMethodDefinition<?, ?> lookupMethod(
339
        String methodName, @Nullable String authority) {
340
      return null;
1✔
341
    }
342
  }
343

344
  /**
345
   * Returns the internal ExecutorPool for offloading tasks.
346
   */
347
  public ObjectPool<? extends Executor> getExecutorPool() {
348
    return this.executorPool;
1✔
349
  }
350

351
  @Override
352
  public ServerImplBuilder useTransportSecurity(File certChain, File privateKey) {
353
    throw new UnsupportedOperationException("TLS not supported in ServerImplBuilder");
×
354
  }
355
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc