• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HotelsDotCom / waggle-dance / #328

pending completion
#328

push

web-flow
add `TServerEventHandler` to clean threadlocal  token in `TokenWrappingHMSHandler` (#276)

* add TServerEventHandler (for cleaning up tokens)

8 of 8 new or added lines in 1 file covered. (100.0%)

132 of 3226 relevant lines covered (4.09%)

0.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/MetaStoreProxyServer.java
1
/**
×
2
 * Copyright (C) 2015-2021 The Apache Software Foundation and Expedia, Inc.
3
 *
4
 * This code is based on Hive's HiveMetaStore:
5
 *
6
 * https://github.com/apache/hive/blob/rel/release-2.3.0/metastore/src/java/org/apache/hadoop/hive/metastore/
7
 * HiveMetaStore.java
8
 *
9
 * Licensed to the Apache Software Foundation (ASF) under one
10
 * or more contributor license agreements.  See the NOTICE file
11
 * distributed with this work for additional information
12
 * regarding copyright ownership.  The ASF licenses this file
13
 * to you under the Apache License, Version 2.0 (the
14
 * "License"); you may not use this file except in compliance
15
 * with the License.  You may obtain a copy of the License at
16
 *
17
 *     http://www.apache.org/licenses/LICENSE-2.0
18
 *
19
 * Unless required by applicable law or agreed to in writing, software
20
 * distributed under the License is distributed on an "AS IS" BASIS,
21
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22
 * See the License for the specific language governing permissions and
23
 * limitations under the License.
24
 */
25
package com.hotels.bdp.waggledance.server;
26

27
import java.io.IOException;
28
import java.util.ArrayList;
29
import java.util.Arrays;
30
import java.util.List;
31
import java.util.concurrent.TimeUnit;
32
import java.util.concurrent.atomic.AtomicBoolean;
33
import java.util.concurrent.locks.Condition;
34
import java.util.concurrent.locks.Lock;
35
import java.util.concurrent.locks.ReentrantLock;
36

37
import javax.annotation.PreDestroy;
38
import javax.security.auth.login.LoginException;
39

40
import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
41
import org.apache.hadoop.hive.conf.HiveConf;
42
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
43
import org.apache.hadoop.hive.metastore.TServerSocketKeepAlive;
44
import org.apache.hadoop.hive.shims.ShimLoader;
45
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
46
import org.apache.hadoop.security.UserGroupInformation;
47
import org.apache.hadoop.util.StringUtils;
48
import org.apache.thrift.TProcessorFactory;
49
import org.apache.thrift.protocol.TBinaryProtocol;
50
import org.apache.thrift.protocol.TProtocol;
51
import org.apache.thrift.server.ServerContext;
52
import org.apache.thrift.server.TServer;
53
import org.apache.thrift.server.TServerEventHandler;
54
import org.apache.thrift.server.TThreadPoolServer;
55
import org.apache.thrift.transport.TFramedTransport;
56
import org.apache.thrift.transport.TServerSocket;
57
import org.apache.thrift.transport.TTransport;
58
import org.apache.thrift.transport.TTransportException;
59
import org.apache.thrift.transport.TTransportFactory;
60
import org.slf4j.Logger;
61
import org.slf4j.LoggerFactory;
62
import org.springframework.beans.factory.annotation.Autowired;
63
import org.springframework.boot.ApplicationArguments;
64
import org.springframework.boot.ApplicationRunner;
65
import org.springframework.core.Ordered;
66
import org.springframework.core.annotation.Order;
67
import org.springframework.stereotype.Component;
68

69
import com.hotels.bdp.waggledance.conf.WaggleDanceConfiguration;
70
import com.hotels.bdp.waggledance.util.SaslHelper;
71

72
@Component
73
@Order(Ordered.HIGHEST_PRECEDENCE)
74
public class MetaStoreProxyServer implements ApplicationRunner {
75

76
  private static final Logger LOG = LoggerFactory.getLogger(MetaStoreProxyServer.class);
×
77

78
  /**
79
   * default port on which to start the server (48869)
80
   */
81
  public static final int DEFAULT_WAGGLEDANCE_PORT = 0xBEE5;
82
  public static final String ADMIN = "admin";
83
  public static final String PUBLIC = "public";
×
84

85
  private final HiveConf hiveConf;
86
  private final WaggleDanceConfiguration waggleDanceConfiguration;
87
  private final TProcessorFactory tProcessorFactory;
88
  private final Lock startLock;
89
  private final Condition startCondition;
90
  private TServer tServer;
91

92
  /**
   * Stores the collaborators and prepares the lock/condition pair used to announce server start-up.
   * Nothing is started here; the server is only created when {@code run(ApplicationArguments)} is invoked.
   *
   * @param hiveConf Hive configuration used when building the Thrift server
   * @param waggleDanceConfiguration Waggle Dance settings (port, verbosity, Thrift timeouts)
   * @param tProcessorFactory processor factory served by the Thrift server
   */
  @Autowired
  public MetaStoreProxyServer(
      HiveConf hiveConf,
      WaggleDanceConfiguration waggleDanceConfiguration,
      TProcessorFactory tProcessorFactory) {
    this.startLock = new ReentrantLock();
    this.startCondition = this.startLock.newCondition();
    this.tProcessorFactory = tProcessorFactory;
    this.waggleDanceConfiguration = waggleDanceConfiguration;
    this.hiveConf = hiveConf;
  }
103

104
  private boolean isRunning() {
105
    if (tServer == null) {
×
106
      return false;
×
107
    }
108
    return tServer.isServing();
×
109
  }
110

111
  @Override
112
  public void run(ApplicationArguments args) throws Exception {
113
    if (isRunning()) {
×
114
      throw new RuntimeException("Can't run more than one instance");
×
115
    }
116

117
    final boolean isCliVerbose = waggleDanceConfiguration.isVerbose();
×
118

119
    try {
120
      String msg = "Starting WaggleDance on port " + waggleDanceConfiguration.getPort();
×
121
      LOG.info(msg);
×
122
      if (waggleDanceConfiguration.isVerbose()) {
×
123
        System.err.println(msg);
×
124
      }
125

126
      // Add shutdown hook.
127
      Runtime.getRuntime().addShutdownHook(new Thread(() -> {
×
128
        String shutdownMsg = "Shutting down WaggleDance.";
×
129
        LOG.info(shutdownMsg);
×
130
        if (isCliVerbose) {
×
131
          System.err.println(shutdownMsg);
×
132
        }
133
      }));
×
134

135
      AtomicBoolean startedServing = new AtomicBoolean();
×
136
      startWaggleDance(startLock, startCondition, startedServing);
×
137
    } catch (Throwable t) {
×
138
      // Catch the exception, log it and rethrow it.
139
      LOG.error("WaggleDance Thrift Server threw an exception...", t);
×
140
      throw new Exception(t);
×
141
    }
142
  }
×
143

144
  /**
145
   * Start Metastore based on a passed {@link HadoopThriftAuthBridge}
146
   *
147
   * @param startLock
148
   * @param startCondition
149
   * @param startedServing
150
   * @throws Throwable
151
   */
152
  private void startWaggleDance(
153
      Lock startLock,
154
      Condition startCondition,
155
      AtomicBoolean startedServing)
156
    throws Throwable {
157
    try {
158
      // Server will create new threads up to max as necessary. After an idle
159
      // period, it will destory threads to keep the number of threads in the
160
      // pool to min.
161
      int minWorkerThreads = hiveConf.getIntVar(ConfVars.METASTORESERVERMINTHREADS);
×
162
      int maxWorkerThreads = hiveConf.getIntVar(ConfVars.METASTORESERVERMAXTHREADS);
×
163
      boolean tcpKeepAlive = hiveConf.getBoolVar(ConfVars.METASTORE_TCP_KEEP_ALIVE);
×
164
      boolean useFramedTransport = hiveConf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
×
165
      boolean useSSL = hiveConf.getBoolVar(ConfVars.HIVE_METASTORE_USE_SSL);
×
166
      boolean useSASL = hiveConf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
×
167

168
      TServerSocket serverSocket = createServerSocket(useSSL, waggleDanceConfiguration.getPort());
×
169

170
      if (tcpKeepAlive) {
×
171
        serverSocket = new TServerSocketKeepAlive(serverSocket);
×
172
      }
173

174
      HadoopThriftAuthBridge.Server saslServer = null;
×
175

176
      if (useSASL) {
×
177
        UserGroupInformation.setConfiguration(hiveConf);
×
178
        saslServer = SaslHelper.createSaslServer(hiveConf);
×
179
      }
180
      TTransportFactory transFactory = createTTransportFactory(useFramedTransport, useSASL, saslServer);
×
181
      TProcessorFactory tProcessorFactory = getTProcessorFactory(useSASL, saslServer);
×
182
      LOG.info("Starting WaggleDance Server");
×
183

184
      TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverSocket)
×
185
          .processorFactory(tProcessorFactory)
×
186
          .transportFactory(transFactory)
×
187
          .protocolFactory(new TBinaryProtocol.Factory())
×
188
          .minWorkerThreads(minWorkerThreads)
×
189
          .maxWorkerThreads(maxWorkerThreads)
×
190
          .stopTimeoutVal(waggleDanceConfiguration.getThriftServerStopTimeoutValInSeconds())
×
191
          .requestTimeout(waggleDanceConfiguration.getThriftServerRequestTimeout())
×
192
          .requestTimeoutUnit(waggleDanceConfiguration.getThriftServerRequestTimeoutUnit());
×
193

194
      tServer = new TThreadPoolServer(args);
×
195
      if (useSASL){
×
196
        TServerEventHandler tServerEventHandler = new TServerEventHandler() {
×
197
          @Override
198
          public void preServe() {
199
          }
×
200

201
          @Override
202
          public ServerContext createContext(TProtocol tProtocol, TProtocol tProtocol1) {
203
            return null;
×
204
          }
205

206
          @Override
207
          public void deleteContext(ServerContext serverContext, TProtocol tProtocol, TProtocol tProtocol1) {
208
            TokenWrappingHMSHandler.removeToken();
×
209
          }
×
210

211
          @Override
212
          public void processContext(ServerContext serverContext, TTransport tTransport, TTransport tTransport1) {
213
          }
×
214
        };
215
        tServer.setServerEventHandler(tServerEventHandler);
×
216
      }
217
      LOG.info("Started the new WaggleDance on port [" + waggleDanceConfiguration.getPort() + "]...");
×
218
      LOG.info("Options.minWorkerThreads = " + minWorkerThreads);
×
219
      LOG.info("Options.maxWorkerThreads = " + maxWorkerThreads);
×
220
      LOG.info("TCP keepalive = " + tcpKeepAlive);
×
221

222
      if (startLock != null) {
×
223
        signalOtherThreadsToStart(tServer, startLock, startCondition, startedServing);
×
224
      }
225
      tServer.serve();
×
226
    } catch (Throwable x) {
×
227
      LOG.error(StringUtils.stringifyException(x));
×
228
      throw x;
×
229
    }
230
    LOG.info("Waggle Dance has stopped");
×
231
  }
×
232

233
  private TProcessorFactory getTProcessorFactory(boolean useSASL,
234
                                                 HadoopThriftAuthBridge.Server server) throws TTransportException {
235
    if (useSASL) {
×
236
      return new TProcessorFactorySaslDecorator(tProcessorFactory, server);
×
237
    } else {
238
      return tProcessorFactory;
×
239
    }
240
  }
241

242
  private TTransportFactory createTTransportFactory(boolean useFramedTransport, boolean useSASL,
243
                                                    HadoopThriftAuthBridge.Server server)
244
          throws LoginException {
245
    if (useSASL) {
×
246
      return SaslHelper.getAuthTransFactory(server, hiveConf);
×
247
    }
248
    if (useFramedTransport) {
×
249
      return new TFramedTransport.Factory();
×
250
    }
251
    return new TTransportFactory();
×
252

253
  }
254

255

256
  private TServerSocket createServerSocket(boolean useSSL, int port) throws IOException, TTransportException {
257
    TServerSocket serverSocket = null;
×
258
    // enable SSL support for HMS
259
    List<String> sslVersionBlacklist = new ArrayList<>(Arrays.asList(hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")));
×
260
    if (!useSSL) {
×
261
      serverSocket = HiveAuthUtils.getServerSocket(null, port);
×
262
    } else {
×
263
      String keyStorePath = hiveConf.getVar(ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PATH).trim();
×
264
      if (keyStorePath.isEmpty()) {
×
265
        throw new IllegalArgumentException(
×
266
            ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PASSWORD.varname + " Not configured for SSL connection");
×
267
      }
268
      String keyStorePassword = ShimLoader
×
269
          .getHadoopShims()
×
270
          .getPassword(hiveConf, HiveConf.ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PASSWORD.varname);
×
271
      serverSocket = HiveAuthUtils.getServerSSLSocket(null, port, keyStorePath, keyStorePassword, sslVersionBlacklist);
×
272
    }
273
    return serverSocket;
×
274
  }
275

276
  private void signalOtherThreadsToStart(
277
      final TServer server,
278
      final Lock startLock,
279
      final Condition startCondition,
280
      final AtomicBoolean startedServing) {
281
    // A simple thread to wait until the server has started and then signal the other threads to
282
    // begin
283
    Thread t = new Thread(() -> {
×
284
      do {
285
        try {
286
          Thread.sleep(1000);
×
287
        } catch (InterruptedException e) {
×
288
          LOG.warn("Signalling thread was interuppted: " + e.getMessage());
×
289
        }
290
      } while (!server.isServing());
×
291
      startLock.lock();
×
292
      try {
293
        startedServing.set(true);
×
294
        startCondition.signalAll();
×
295
      } finally {
×
296
        startLock.unlock();
×
297
      }
298
    });
×
299
    t.start();
×
300
  }
×
301

302
  @PreDestroy
303
  public void stop() {
304
    if (tServer == null) {
×
305
      return;
×
306
    }
307
    tServer.stop();
×
308
    tServer = null;
×
309
  }
×
310

311
  public void waitUntilStarted() throws InterruptedException {
312
    waitUntilStarted(3, 1, TimeUnit.MINUTES);
×
313
  }
×
314

315
  public void waitUntilStarted(int retries, long waitDelay, TimeUnit waitDelayTimeUnit) throws InterruptedException {
316
    if (isRunning()) {
×
317
      return;
×
318
    }
319
    int i = 0;
×
320
    while (i < retries) {
×
321
      i++;
×
322
      startLock.lock();
×
323
      try {
324
        if (startCondition.await(waitDelay, waitDelayTimeUnit)) {
×
325
          break;
326
        }
327
      } finally {
328
        startLock.unlock();
×
329
      }
330
      if (i == retries) {
×
331
        throw new RuntimeException("Maximum number of tries reached whilst waiting for Thrift server to be ready");
×
332
      }
333
    }
334
  }
×
335

336
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc