• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / servicecomb-pack / #2765

25 Apr 2024 05:19AM UTC coverage: 81.783% (-0.04%) from 81.824%
#2765

push

web-flow
Merge pull request #784 from apache/dependabot/maven/pack-dependencies/org.apache.zookeeper-zookeeper-3.8.4

Bump org.apache.zookeeper:zookeeper from 3.6.2 to 3.8.4 in /pack-dependencies

4054 of 4957 relevant lines covered (81.78%)

1.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.38
/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/GrpcStartable.java
1
/*
2
 *
3
 * Licensed to the Apache Software Foundation (ASF) under one or more
4
 * contributor license agreements.  See the NOTICE file distributed with
5
 * this work for additional information regarding copyright ownership.
6
 * The ASF licenses this file to You under the Apache License, Version 2.0
7
 * (the "License"); you may not use this file except in compliance with
8
 * the License.  You may obtain a copy of the License at
9
 *
10
 *      http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 *
18
 *
19
 */
20

21
package org.apache.servicecomb.pack.alpha.server;
22

23
import io.netty.channel.EventLoopGroup;
24
import io.netty.channel.ServerChannel;
25
import io.netty.channel.epoll.EpollEventLoopGroup;
26
import io.netty.channel.epoll.EpollServerSocketChannel;
27
import io.netty.channel.kqueue.KQueueEventLoopGroup;
28
import io.netty.channel.kqueue.KQueueServerSocketChannel;
29
import io.netty.channel.nio.NioEventLoopGroup;
30
import io.netty.channel.socket.nio.NioServerSocketChannel;
31
import java.io.IOException;
32
import java.io.InputStream;
33
import java.lang.invoke.MethodHandles;
34
import java.net.InetSocketAddress;
35
import java.net.ServerSocket;
36
import java.nio.channels.ServerSocketChannel;
37
import java.util.*;
38
import java.util.concurrent.TimeUnit;
39
import java.util.stream.IntStream;
40

41
import javax.net.ssl.SSLException;
42

43
import com.google.common.eventbus.EventBus;
44
import org.apache.servicecomb.pack.alpha.core.event.GrpcStartableStartedEvent;
45
import org.slf4j.Logger;
46
import org.slf4j.LoggerFactory;
47

48
import io.grpc.BindableService;
49
import io.grpc.Server;
50
import io.grpc.ServerBuilder;
51
import io.grpc.netty.GrpcSslContexts;
52
import io.grpc.netty.NettyServerBuilder;
53
import io.netty.handler.ssl.ClientAuth;
54
import io.netty.handler.ssl.SslContextBuilder;
55
import io.netty.handler.ssl.SslProvider;
56

57
public class GrpcStartable implements ServerStartable {
58

59
  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
2✔
60
  private Server server;
61
  private final GrpcServerConfig serverConfig;
62
  private final EventBus eventBus;
63

64
  public GrpcStartable(GrpcServerConfig serverConfig, EventBus eventBus, BindableService... services) throws IOException {
2✔
65
    this.serverConfig = serverConfig;
2✔
66
    this.eventBus = eventBus;
2✔
67
    ServerBuilder<?> serverBuilder;
68
    try {
69
      OptionalInt unusedPort = findUnusedPort(serverConfig);
2✔
70
      if(unusedPort.isPresent()){
2✔
71
        serverBuilder = getServerBuilder(unusedPort.getAsInt());
2✔
72
        if (serverConfig.isSslEnable()){
2✔
73
          try {
74
            ((NettyServerBuilder) serverBuilder).sslContext(getSslContextBuilder(serverConfig).build());
2✔
75
          } catch (SSLException e) {
×
76
            throw new IllegalStateException("Unable to setup grpc to use SSL.", e);
×
77
          }
2✔
78
        }
79
        Arrays.stream(services).forEach(serverBuilder::addService);
2✔
80
        server = serverBuilder.build();
2✔
81
        serverConfig.setPort(unusedPort.getAsInt());
2✔
82
      }
83
    } catch (IOException e) {
×
84
      throw e;
×
85
    }
2✔
86
  }
2✔
87

88
  @Override
89
  public void start() {
90
    Runtime.getRuntime().addShutdownHook(new Thread(server::shutdown));
2✔
91

92
    try {
93
      eventBus.post(new GrpcStartableStartedEvent(serverConfig.getPort()));
2✔
94
      server.start();
2✔
95
      server.awaitTermination();
×
96
    } catch (IOException e) {
×
97
      throw new IllegalStateException("Unable to start grpc server.", e);
×
98
    } catch (InterruptedException e) {
×
99
      LOG.error("grpc server was interrupted.", e);
×
100
      Thread.currentThread().interrupt();
×
101
    }
×
102
  }
×
103

104
  @Override
105
  public GrpcServerConfig getGrpcServerConfig() {
106
    return this.serverConfig;
2✔
107
  }
108

109
  private ServerBuilder getServerBuilder(int port) {
110
    return NettyServerBuilder.forAddress(
2✔
111
        new InetSocketAddress(serverConfig.getHost(), port))
2✔
112
        .channelType(selectorServerChannel())
2✔
113
        .bossEventLoopGroup(selectorEventLoopGroup(1))
2✔
114
        .workerEventLoopGroup(selectorEventLoopGroup(0));
2✔
115
  }
116

117
  private SslContextBuilder getSslContextBuilder(GrpcServerConfig config) {
118

119
    Properties prop = new Properties();
2✔
120
    ClassLoader classLoader = getClass().getClassLoader();
2✔
121
    try {
122
      prop.load(classLoader.getResourceAsStream("ssl.properties"));
2✔
123
    } catch (IOException e) {
×
124
      throw new IllegalStateException("Unable to read ssl.properties.", e);
×
125
    }
2✔
126

127
    InputStream cert = getInputStream(classLoader, config.getCert(), "Server Cert");
2✔
128
    InputStream key = getInputStream(classLoader, config.getKey(), "Server Key");
2✔
129

130
    SslContextBuilder sslClientContextBuilder = SslContextBuilder.forServer(cert, key)
2✔
131
        .protocols(prop.getProperty("protocols"))
2✔
132
        .ciphers(Arrays.asList(prop.getProperty("ciphers").split(",")));
2✔
133
    if (config.isMutualAuth()) {
2✔
134
      InputStream clientCert = getInputStream(classLoader, config.getClientCert(), "Client Cert");
2✔
135
      sslClientContextBuilder.trustManager(clientCert);
2✔
136
      sslClientContextBuilder.clientAuth(ClientAuth.REQUIRE);
2✔
137
    }
138
    return GrpcSslContexts.configure(sslClientContextBuilder,
2✔
139
        SslProvider.OPENSSL);
140
  }
141

142
  private InputStream getInputStream(ClassLoader classLoader, String resource, String config) {
143
    InputStream is = classLoader.getResourceAsStream(resource);
2✔
144
    if (is == null) {
2✔
145
      throw new IllegalStateException("Cannot load the " + config + " from " + resource);
×
146
    }
147
    return is;
2✔
148

149
  }
150

151
  private OptionalInt findUnusedPort(GrpcServerConfig serverConfig) throws IOException{
152
    IntStream trialPorts;
153
    if(serverConfig.getPort()==0){
2✔
154
      LOG.info("No explicit port is given, system will pick up an ephemeral port.");
2✔
155
      if(serverConfig.isPortAutoIncrement() && serverConfig.getPortCount()>0){
2✔
156
        LOG.info("Port trial count must be positive: {}",serverConfig.getPortCount());
2✔
157
        trialPorts = IntStream.range(serverConfig.getInitialPort(),serverConfig.getInitialPort()+serverConfig.getPortCount());
2✔
158
      }else{
159
        trialPorts = IntStream.range(serverConfig.getInitialPort(),serverConfig.getInitialPort()+1);
×
160
      }
161
    }else{
162
      trialPorts = IntStream.range(serverConfig.getPort(),serverConfig.getPort()+1);
2✔
163
    }
164

165
    IOException[] error = new IOException[1];
2✔
166
    OptionalInt bindPort =  trialPorts.filter(port -> {
2✔
167
      try{
168
        ServerSocketChannel preBindServerSocketChannel = null;
2✔
169
        ServerSocket preBindServerSocket = null;
2✔
170
        InetSocketAddress inetSocketAddress = new InetSocketAddress(serverConfig.getHost(), port);
2✔
171
        try {
172
          preBindServerSocketChannel = ServerSocketChannel.open();
2✔
173
          preBindServerSocket = preBindServerSocketChannel.socket();
2✔
174
          preBindServerSocket.setReuseAddress(true);
2✔
175
          preBindServerSocket.setSoTimeout((int)TimeUnit.SECONDS.toMillis(1));
2✔
176
          preBindServerSocket.bind(inetSocketAddress, 100);
2✔
177
          LOG.info("Bind successful to inet socket address {}", inetSocketAddress);
2✔
178
          preBindServerSocketChannel.configureBlocking(false);
2✔
179
          return true;
2✔
180
        } catch (IOException e) {
×
181
          LOG.info("Bind failed to inet socket address {}", inetSocketAddress);
×
182
          throw e;
×
183
        }finally {
184
          if (preBindServerSocket != null) {
2✔
185
            try {
186
              preBindServerSocket.close();
2✔
187
            } catch (IOException ex) {
×
188
              LOG.error("closeResource failed", ex);
×
189
            }
2✔
190
          }
191
          if(preBindServerSocketChannel != null){
2✔
192
            try {
193
              preBindServerSocketChannel.close();
2✔
194
            } catch (IOException ex) {
×
195
              LOG.error("closeResource failed", ex);
×
196
            }
2✔
197
          }
198
        }
199
      }catch (IOException e){
×
200
        error[0] = e;
×
201
      }
202
      return false;
×
203
    }).findAny();
2✔
204

205
    if(bindPort.isPresent()){
2✔
206
      return bindPort;
2✔
207
    }else{
208
      throw error[0];
×
209
    }
210
  }
211

212
  /**
213
   * https://netty.io/wiki/native-transports.html
214
   *
215
   * RHEL/CentOS/Fedora:
216
   * sudo yum install autoconf automake libtool make tar \
217
   *                  glibc-devel libaio-devel \
218
   *                  libgcc.i686 glibc-devel.i686
219
   * Debian/Ubuntu:
220
   * sudo apt-get install autoconf automake libtool make tar \
221
   *                      gcc-multilib libaio-dev
222
   *
223
   * brew install autoconf automake libtool
224
   * */
225
  private Class<? extends ServerChannel> selectorServerChannel() {
226
    Class<? extends ServerChannel> channel = NioServerSocketChannel.class;
2✔
227
    if (serverConfig.isNativeTransport()) {
2✔
228
      if (OSInfo.isLinux()) {
×
229
        channel = EpollServerSocketChannel.class;
×
230
      } else if (OSInfo.isMacOS()) {
×
231
        channel = KQueueServerSocketChannel.class;
×
232
      }
233
    }
234
    LOG.info("Netty channel type is " + channel.getSimpleName());
2✔
235
    return channel;
2✔
236
  }
237

238
  private EventLoopGroup selectorEventLoopGroup(int nThreads) {
239
    EventLoopGroup group = new NioEventLoopGroup(nThreads);
2✔
240
    if (serverConfig.isNativeTransport()) {
2✔
241
      if (OSInfo.isLinux()) {
×
242
        group = new EpollEventLoopGroup(nThreads);
×
243
      } else if (OSInfo.isMacOS()) {
×
244
        group = new KQueueEventLoopGroup(nThreads);
×
245
      }
246
    }
247
    LOG.info("Netty event loop group is " + group.getClass().getSimpleName());
2✔
248
    return group;
2✔
249
  }
250

251
  static class OSInfo {
×
252
    private static String OS = System.getProperty("os.name").toLowerCase();
×
253

254
    public static boolean isLinux() {
255
      return OS.indexOf("linux") >= 0;
×
256
    }
257

258
    public static boolean isMacOS() {
259
      return OS.indexOf("mac") >= 0;
×
260
    }
261
  }
262
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc