• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HotelsDotCom / waggle-dance / #445

24 Jul 2025 02:27PM UTC coverage: 69.257% (-7.1%) from 76.395%
#445

push

patduin
Merge branch 'hive-3.x' of github.com:ExpediaGroup/waggle-dance into hive-3.x

2620 of 3783 relevant lines covered (69.26%)

0.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.57
/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TTransportMonitor.java
1
/**
2
 * Copyright (C) 2016-2025 Expedia, Inc.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 * http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package com.hotels.bdp.waggledance.server;
17

18
import java.io.Closeable;
19
import java.util.Iterator;
20
import java.util.concurrent.ConcurrentLinkedQueue;
21
import java.util.concurrent.Executors;
22
import java.util.concurrent.ScheduledExecutorService;
23

24
import javax.annotation.PreDestroy;
25
import javax.annotation.WillClose;
26

27
import org.apache.thrift.transport.TTransport;
28
import org.slf4j.Logger;
29
import org.slf4j.LoggerFactory;
30
import org.springframework.beans.factory.annotation.Autowired;
31
import org.springframework.stereotype.Component;
32

33
import io.micrometer.core.instrument.Gauge;
34
import io.micrometer.core.instrument.MeterRegistry;
35
import lombok.AllArgsConstructor;
36

37
import com.google.common.annotations.VisibleForTesting;
38

39
import com.hotels.bdp.waggledance.conf.WaggleDanceConfiguration;
40

41
@Component
42
public class TTransportMonitor {
43
  private static Logger log = LoggerFactory.getLogger(TTransportMonitor.class);
1✔
44
  static final String METRIC_NAME_OPEN_TRANSPORTS = "com_hotels_bdp_waggledance_open_transports_gauge";
45
  private static final Logger LOG = LoggerFactory.getLogger(TTransportMonitor.class);
1✔
46

47
  @AllArgsConstructor
48
  private static class ActionContainer {
49
    private final TTransport transport;
50
    private final Closeable action;
51
  }
52

53
  private final ScheduledExecutorService scheduler;
54
  private final ConcurrentLinkedQueue<ActionContainer> transports = new ConcurrentLinkedQueue<>();
1✔
55

56
  @Autowired
57
  public TTransportMonitor(WaggleDanceConfiguration waggleDanceConfiguration, MeterRegistry meterRegistry) {
58
    this(waggleDanceConfiguration, Executors.newScheduledThreadPool(1), meterRegistry);
×
59
  }
×
60

61
  @VisibleForTesting
62
  TTransportMonitor(
63
      WaggleDanceConfiguration waggleDanceConfiguration,
64
      ScheduledExecutorService scheduler,
65
      MeterRegistry meterRegistry) {
1✔
66
    this.scheduler = scheduler;
1✔
67
    Gauge.builder(METRIC_NAME_OPEN_TRANSPORTS, transports, ConcurrentLinkedQueue::size).register(meterRegistry);
1✔
68
    Runnable monitor = () -> {
1✔
69
      log.debug("Releasing disconnected sessions");
1✔
70
      Iterator<ActionContainer> iterator = transports.iterator();
1✔
71
      while (iterator.hasNext()) {
1✔
72
        ActionContainer actionContainer = iterator.next();
1✔
73
        if (actionContainer.transport.peek()) {
1✔
74
          continue;
1✔
75
        }
76
        try {
77
          actionContainer.action.close();
1✔
78
        } catch (Exception e) {
1✔
79
          log.warn("Error closing action", e);
1✔
80
        }
1✔
81
        try {
82
          actionContainer.transport.close();
1✔
83
        } catch (Exception e) {
1✔
84
          log.warn("Error closing transport", e);
1✔
85
        }
1✔
86
        iterator.remove();
1✔
87
      }
1✔
88
      LOG.info("Number of open transports (#connections clients -> WD ): {}", transports.size());
1✔
89
    };
1✔
90
    this.scheduler
1✔
91
        .scheduleAtFixedRate(monitor, waggleDanceConfiguration.getDisconnectConnectionDelay(),
1✔
92
            waggleDanceConfiguration.getDisconnectConnectionDelay(), waggleDanceConfiguration.getDisconnectTimeUnit());
1✔
93
  }
1✔
94

95
  @PreDestroy
96
  public void shutdown() {
97
    scheduler.shutdown();
×
98
  }
×
99

100
  public void monitor(@WillClose TTransport transport, @WillClose Closeable action) {
101
    transports.offer(new ActionContainer(transport, action));
1✔
102
  }
1✔
103

104
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc