• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HotelsDotCom / waggle-dance / #326

pending completion
#326

push

web-flow
update (#274)

* update for release

132 of 3217 relevant lines covered (4.1%)

0.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/MetaStoreProxyServer.java
1
/**
2
 * Copyright (C) 2015-2021 The Apache Software Foundation and Expedia, Inc.
3
 *
4
 * This code is based on Hive's HiveMetaStore:
5
 *
6
 * https://github.com/apache/hive/blob/rel/release-2.3.0/metastore/src/java/org/apache/hadoop/hive/metastore/
7
 * HiveMetaStore.java
8
 *
9
 * Licensed to the Apache Software Foundation (ASF) under one
10
 * or more contributor license agreements.  See the NOTICE file
11
 * distributed with this work for additional information
12
 * regarding copyright ownership.  The ASF licenses this file
13
 * to you under the Apache License, Version 2.0 (the
14
 * "License"); you may not use this file except in compliance
15
 * with the License.  You may obtain a copy of the License at
16
 *
17
 *     http://www.apache.org/licenses/LICENSE-2.0
18
 *
19
 * Unless required by applicable law or agreed to in writing, software
20
 * distributed under the License is distributed on an "AS IS" BASIS,
21
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22
 * See the License for the specific language governing permissions and
23
 * limitations under the License.
24
 */
25
package com.hotels.bdp.waggledance.server;
26

27
import java.io.IOException;
28
import java.util.ArrayList;
29
import java.util.Arrays;
30
import java.util.List;
31
import java.util.concurrent.TimeUnit;
32
import java.util.concurrent.atomic.AtomicBoolean;
33
import java.util.concurrent.locks.Condition;
34
import java.util.concurrent.locks.Lock;
35
import java.util.concurrent.locks.ReentrantLock;
36

37
import javax.annotation.PreDestroy;
38
import javax.security.auth.login.LoginException;
39

40
import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
41
import org.apache.hadoop.hive.conf.HiveConf;
42
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
43
import org.apache.hadoop.hive.metastore.TServerSocketKeepAlive;
44
import org.apache.hadoop.hive.shims.ShimLoader;
45
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
46
import org.apache.hadoop.security.UserGroupInformation;
47
import org.apache.hadoop.util.StringUtils;
48
import org.apache.thrift.TProcessorFactory;
49
import org.apache.thrift.protocol.TBinaryProtocol;
50
import org.apache.thrift.server.TServer;
51
import org.apache.thrift.server.TThreadPoolServer;
52
import org.apache.thrift.transport.TFramedTransport;
53
import org.apache.thrift.transport.TServerSocket;
54
import org.apache.thrift.transport.TTransportException;
55
import org.apache.thrift.transport.TTransportFactory;
56
import org.slf4j.Logger;
57
import org.slf4j.LoggerFactory;
58
import org.springframework.beans.factory.annotation.Autowired;
59
import org.springframework.boot.ApplicationArguments;
60
import org.springframework.boot.ApplicationRunner;
61
import org.springframework.core.Ordered;
62
import org.springframework.core.annotation.Order;
63
import org.springframework.stereotype.Component;
64

65
import com.hotels.bdp.waggledance.conf.WaggleDanceConfiguration;
66
import com.hotels.bdp.waggledance.util.SaslHelper;
67

68
@Component
69
@Order(Ordered.HIGHEST_PRECEDENCE)
70
public class MetaStoreProxyServer implements ApplicationRunner {
71

72
  private static final Logger LOG = LoggerFactory.getLogger(MetaStoreProxyServer.class);
×
73

74
  /**
75
   * default port on which to start the server (48869)
76
   */
77
  public static final int DEFAULT_WAGGLEDANCE_PORT = 0xBEE5;
78
  public static final String ADMIN = "admin";
79
  public static final String PUBLIC = "public";
×
80

81
  private final HiveConf hiveConf;
82
  private final WaggleDanceConfiguration waggleDanceConfiguration;
83
  private final TProcessorFactory tProcessorFactory;
84
  private final Lock startLock;
85
  private final Condition startCondition;
86
  private TServer tServer;
87

88
  @Autowired
89
  public MetaStoreProxyServer(
×
90
      HiveConf hiveConf,
91
      WaggleDanceConfiguration waggleDanceConfiguration,
92
      TProcessorFactory tProcessorFactory) {
93
    this.hiveConf = hiveConf;
×
94
    this.waggleDanceConfiguration = waggleDanceConfiguration;
×
95
    this.tProcessorFactory = tProcessorFactory;
×
96
    startLock = new ReentrantLock();
×
97
    startCondition = startLock.newCondition();
×
98
  }
×
99

100
  private boolean isRunning() {
101
    if (tServer == null) {
×
102
      return false;
×
103
    }
104
    return tServer.isServing();
×
105
  }
106

107
  @Override
108
  public void run(ApplicationArguments args) throws Exception {
109
    if (isRunning()) {
×
110
      throw new RuntimeException("Can't run more than one instance");
×
111
    }
112

113
    final boolean isCliVerbose = waggleDanceConfiguration.isVerbose();
×
114

115
    try {
116
      String msg = "Starting WaggleDance on port " + waggleDanceConfiguration.getPort();
×
117
      LOG.info(msg);
×
118
      if (waggleDanceConfiguration.isVerbose()) {
×
119
        System.err.println(msg);
×
120
      }
121

122
      // Add shutdown hook.
123
      Runtime.getRuntime().addShutdownHook(new Thread(() -> {
×
124
        String shutdownMsg = "Shutting down WaggleDance.";
×
125
        LOG.info(shutdownMsg);
×
126
        if (isCliVerbose) {
×
127
          System.err.println(shutdownMsg);
×
128
        }
129
      }));
×
130

131
      AtomicBoolean startedServing = new AtomicBoolean();
×
132
      startWaggleDance(startLock, startCondition, startedServing);
×
133
    } catch (Throwable t) {
×
134
      // Catch the exception, log it and rethrow it.
135
      LOG.error("WaggleDance Thrift Server threw an exception...", t);
×
136
      throw new Exception(t);
×
137
    }
138
  }
×
139

140
  /**
141
   * Start Metastore based on a passed {@link HadoopThriftAuthBridge}
142
   *
143
   * @param startLock
144
   * @param startCondition
145
   * @param startedServing
146
   * @throws Throwable
147
   */
148
  private void startWaggleDance(
149
      Lock startLock,
150
      Condition startCondition,
151
      AtomicBoolean startedServing)
152
    throws Throwable {
153
    try {
154
      // Server will create new threads up to max as necessary. After an idle
155
      // period, it will destory threads to keep the number of threads in the
156
      // pool to min.
157
      int minWorkerThreads = hiveConf.getIntVar(ConfVars.METASTORESERVERMINTHREADS);
×
158
      int maxWorkerThreads = hiveConf.getIntVar(ConfVars.METASTORESERVERMAXTHREADS);
×
159
      boolean tcpKeepAlive = hiveConf.getBoolVar(ConfVars.METASTORE_TCP_KEEP_ALIVE);
×
160
      boolean useFramedTransport = hiveConf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
×
161
      boolean useSSL = hiveConf.getBoolVar(ConfVars.HIVE_METASTORE_USE_SSL);
×
162
      boolean useSASL = hiveConf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
×
163

164
      TServerSocket serverSocket = createServerSocket(useSSL, waggleDanceConfiguration.getPort());
×
165

166
      if (tcpKeepAlive) {
×
167
        serverSocket = new TServerSocketKeepAlive(serverSocket);
×
168
      }
169

170
      HadoopThriftAuthBridge.Server saslServer = null;
×
171

172
      if (useSASL) {
×
173
        UserGroupInformation.setConfiguration(hiveConf);
×
174
        saslServer = SaslHelper.createSaslServer(hiveConf);
×
175
      }
176
      TTransportFactory transFactory = createTTransportFactory(useFramedTransport, useSASL, saslServer);
×
177
      TProcessorFactory tProcessorFactory = getTProcessorFactory(useSASL, saslServer);
×
178
      LOG.info("Starting WaggleDance Server");
×
179

180
      TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverSocket)
×
181
          .processorFactory(tProcessorFactory)
×
182
          .transportFactory(transFactory)
×
183
          .protocolFactory(new TBinaryProtocol.Factory())
×
184
          .minWorkerThreads(minWorkerThreads)
×
185
          .maxWorkerThreads(maxWorkerThreads)
×
186
          .stopTimeoutVal(waggleDanceConfiguration.getThriftServerStopTimeoutValInSeconds())
×
187
          .requestTimeout(waggleDanceConfiguration.getThriftServerRequestTimeout())
×
188
          .requestTimeoutUnit(waggleDanceConfiguration.getThriftServerRequestTimeoutUnit());
×
189

190
      tServer = new TThreadPoolServer(args);
×
191
      LOG.info("Started the new WaggleDance on port [" + waggleDanceConfiguration.getPort() + "]...");
×
192
      LOG.info("Options.minWorkerThreads = " + minWorkerThreads);
×
193
      LOG.info("Options.maxWorkerThreads = " + maxWorkerThreads);
×
194
      LOG.info("TCP keepalive = " + tcpKeepAlive);
×
195

196
      if (startLock != null) {
×
197
        signalOtherThreadsToStart(tServer, startLock, startCondition, startedServing);
×
198
      }
199
      tServer.serve();
×
200
    } catch (Throwable x) {
×
201
      LOG.error(StringUtils.stringifyException(x));
×
202
      throw x;
×
203
    }
204
    LOG.info("Waggle Dance has stopped");
×
205
  }
×
206

207
  private TProcessorFactory getTProcessorFactory(boolean useSASL,
208
                                                 HadoopThriftAuthBridge.Server server) throws TTransportException {
209
    if (useSASL) {
×
210
      return new TProcessorFactorySaslDecorator(tProcessorFactory, server);
×
211
    } else {
212
      return tProcessorFactory;
×
213
    }
214
  }
215

216
  private TTransportFactory createTTransportFactory(boolean useFramedTransport, boolean useSASL,
217
                                                    HadoopThriftAuthBridge.Server server)
218
          throws LoginException {
219
    if (useSASL) {
×
220
      return SaslHelper.getAuthTransFactory(server, hiveConf);
×
221
    }
222
    if (useFramedTransport) {
×
223
      return new TFramedTransport.Factory();
×
224
    }
225
    return new TTransportFactory();
×
226

227
  }
228

229

230
  private TServerSocket createServerSocket(boolean useSSL, int port) throws IOException, TTransportException {
231
    TServerSocket serverSocket = null;
×
232
    // enable SSL support for HMS
233
    List<String> sslVersionBlacklist = new ArrayList<>(Arrays.asList(hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")));
×
234
    if (!useSSL) {
×
235
      serverSocket = HiveAuthUtils.getServerSocket(null, port);
×
236
    } else {
×
237
      String keyStorePath = hiveConf.getVar(ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PATH).trim();
×
238
      if (keyStorePath.isEmpty()) {
×
239
        throw new IllegalArgumentException(
×
240
            ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PASSWORD.varname + " Not configured for SSL connection");
×
241
      }
242
      String keyStorePassword = ShimLoader
×
243
          .getHadoopShims()
×
244
          .getPassword(hiveConf, HiveConf.ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PASSWORD.varname);
×
245
      serverSocket = HiveAuthUtils.getServerSSLSocket(null, port, keyStorePath, keyStorePassword, sslVersionBlacklist);
×
246
    }
247
    return serverSocket;
×
248
  }
249

250
  private void signalOtherThreadsToStart(
251
      final TServer server,
252
      final Lock startLock,
253
      final Condition startCondition,
254
      final AtomicBoolean startedServing) {
255
    // A simple thread to wait until the server has started and then signal the other threads to
256
    // begin
257
    Thread t = new Thread(() -> {
×
258
      do {
259
        try {
260
          Thread.sleep(1000);
×
261
        } catch (InterruptedException e) {
×
262
          LOG.warn("Signalling thread was interuppted: " + e.getMessage());
×
263
        }
264
      } while (!server.isServing());
×
265
      startLock.lock();
×
266
      try {
267
        startedServing.set(true);
×
268
        startCondition.signalAll();
×
269
      } finally {
×
270
        startLock.unlock();
×
271
      }
272
    });
×
273
    t.start();
×
274
  }
×
275

276
  @PreDestroy
277
  public void stop() {
278
    if (tServer == null) {
×
279
      return;
×
280
    }
281
    tServer.stop();
×
282
    tServer = null;
×
283
  }
×
284

285
  public void waitUntilStarted() throws InterruptedException {
286
    waitUntilStarted(3, 1, TimeUnit.MINUTES);
×
287
  }
×
288

289
  public void waitUntilStarted(int retries, long waitDelay, TimeUnit waitDelayTimeUnit) throws InterruptedException {
290
    if (isRunning()) {
×
291
      return;
×
292
    }
293
    int i = 0;
×
294
    while (i < retries) {
×
295
      i++;
×
296
      startLock.lock();
×
297
      try {
298
        if (startCondition.await(waitDelay, waitDelayTimeUnit)) {
×
299
          break;
300
        }
301
      } finally {
302
        startLock.unlock();
×
303
      }
304
      if (i == retries) {
×
305
        throw new RuntimeException("Maximum number of tries reached whilst waiting for Thrift server to be ready");
×
306
      }
307
    }
308
  }
×
309

310
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc