• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9654

pending completion
#9654

push

travis_ci

web-flow
[IOTDB-6073] Add ClientManager metrics (#10617)

279 of 279 new or added lines in 8 files covered. (100.0%)

79121 of 165733 relevant lines covered (47.74%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.09
/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.commons.client;
21

22
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23
import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient;
24
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
25
import org.apache.iotdb.commons.client.async.AsyncDataNodeMPPDataExchangeServiceClient;
26
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
27
import org.apache.iotdb.commons.client.property.ClientPoolProperty;
28
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
29
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
30
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
31
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
32
import org.apache.iotdb.commons.concurrent.ThreadName;
33
import org.apache.iotdb.commons.conf.CommonConfig;
34
import org.apache.iotdb.commons.conf.CommonDescriptor;
35

36
import org.apache.commons.pool2.KeyedObjectPool;
37
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
38

39
public class ClientPoolFactory {
40

41
  private static final CommonConfig conf = CommonDescriptor.getInstance().getConfig();
1✔
42

43
  private ClientPoolFactory() {}
44

45
  public static class SyncConfigNodeIServiceClientPoolFactory
×
46
      implements IClientPoolFactory<TEndPoint, SyncConfigNodeIServiceClient> {
47

48
    @Override
49
    public KeyedObjectPool<TEndPoint, SyncConfigNodeIServiceClient> createClientPool(
50
        ClientManager<TEndPoint, SyncConfigNodeIServiceClient> manager) {
51
      GenericKeyedObjectPool<TEndPoint, SyncConfigNodeIServiceClient> clientPool =
×
52
          new GenericKeyedObjectPool<>(
53
              new SyncConfigNodeIServiceClient.Factory(
54
                  manager,
55
                  new ThriftClientProperty.Builder()
56
                      .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
×
57
                      .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
×
58
                      .build()),
×
59
              new ClientPoolProperty.Builder<SyncConfigNodeIServiceClient>()
60
                  .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
×
61
                  .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
×
62
                  .build()
×
63
                  .getConfig());
×
64
      ClientManagerMetrics.getInstance()
×
65
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
×
66
      return clientPool;
×
67
    }
68
  }
69

70
  public static class AsyncConfigNodeIServiceClientPoolFactory
×
71
      implements IClientPoolFactory<TEndPoint, AsyncConfigNodeIServiceClient> {
72

73
    @Override
74
    public KeyedObjectPool<TEndPoint, AsyncConfigNodeIServiceClient> createClientPool(
75
        ClientManager<TEndPoint, AsyncConfigNodeIServiceClient> manager) {
76
      GenericKeyedObjectPool<TEndPoint, AsyncConfigNodeIServiceClient> clientPool =
×
77
          new GenericKeyedObjectPool<>(
78
              new AsyncConfigNodeIServiceClient.Factory(
79
                  manager,
80
                  new ThriftClientProperty.Builder()
81
                      .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
×
82
                      .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
×
83
                      .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
×
84
                      .build(),
×
85
                  ThreadName.ASYNC_CONFIGNODE_CLIENT_POOL.getName()),
×
86
              new ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>()
87
                  .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
×
88
                  .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
×
89
                  .build()
×
90
                  .getConfig());
×
91
      ClientManagerMetrics.getInstance()
×
92
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
×
93
      return clientPool;
×
94
    }
95
  }
96

97
  public static class SyncDataNodeInternalServiceClientPoolFactory
1✔
98
      implements IClientPoolFactory<TEndPoint, SyncDataNodeInternalServiceClient> {
99

100
    @Override
101
    public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> createClientPool(
102
        ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
103
      GenericKeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> clientPool =
1✔
104
          new GenericKeyedObjectPool<>(
105
              new SyncDataNodeInternalServiceClient.Factory(
106
                  manager,
107
                  new ThriftClientProperty.Builder()
108
                      .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
1✔
109
                      .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
1✔
110
                      .build()),
1✔
111
              new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
112
                  .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
1✔
113
                  .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
1✔
114
                  .build()
1✔
115
                  .getConfig());
1✔
116
      ClientManagerMetrics.getInstance()
1✔
117
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
1✔
118
      return clientPool;
1✔
119
    }
120
  }
121

122
  public static class AsyncDataNodeInternalServiceClientPoolFactory
1✔
123
      implements IClientPoolFactory<TEndPoint, AsyncDataNodeInternalServiceClient> {
124

125
    @Override
126
    public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
127
        ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
128
      GenericKeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> clientPool =
1✔
129
          new GenericKeyedObjectPool<>(
130
              new AsyncDataNodeInternalServiceClient.Factory(
131
                  manager,
132
                  new ThriftClientProperty.Builder()
133
                      .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
1✔
134
                      .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
1✔
135
                      .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
1✔
136
                      .build(),
1✔
137
                  ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
1✔
138
              new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>()
139
                  .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
1✔
140
                  .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
1✔
141
                  .build()
1✔
142
                  .getConfig());
1✔
143
      ClientManagerMetrics.getInstance()
1✔
144
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
1✔
145
      return clientPool;
1✔
146
    }
147
  }
148

149
  public static class AsyncConfigNodeHeartbeatServiceClientPoolFactory
×
150
      implements IClientPoolFactory<TEndPoint, AsyncConfigNodeIServiceClient> {
151

152
    @Override
153
    public KeyedObjectPool<TEndPoint, AsyncConfigNodeIServiceClient> createClientPool(
154
        ClientManager<TEndPoint, AsyncConfigNodeIServiceClient> manager) {
155

156
      GenericKeyedObjectPool<TEndPoint, AsyncConfigNodeIServiceClient> clientPool =
×
157
          new GenericKeyedObjectPool<>(
158
              new AsyncConfigNodeIServiceClient.Factory(
159
                  manager,
160
                  new ThriftClientProperty.Builder()
161
                      .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
×
162
                      .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
×
163
                      .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
×
164
                      .setPrintLogWhenEncounterException(false)
×
165
                      .build(),
×
166
                  ThreadName.ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL.getName()),
×
167
              new ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>()
168
                  .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
×
169
                  .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
×
170
                  .build()
×
171
                  .getConfig());
×
172
      ClientManagerMetrics.getInstance()
×
173
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
×
174
      return clientPool;
×
175
    }
176
  }
177

178
  public static class AsyncDataNodeHeartbeatServiceClientPoolFactory
×
179
      implements IClientPoolFactory<TEndPoint, AsyncDataNodeInternalServiceClient> {
180
    @Override
181
    public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
182
        ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
183
      GenericKeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> clientPool =
×
184
          new GenericKeyedObjectPool<>(
185
              new AsyncDataNodeInternalServiceClient.Factory(
186
                  manager,
187
                  new ThriftClientProperty.Builder()
188
                      .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
×
189
                      .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
×
190
                      .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
×
191
                      .setPrintLogWhenEncounterException(false)
×
192
                      .build(),
×
193
                  ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()),
×
194
              new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>()
195
                  .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
×
196
                  .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
×
197
                  .build()
×
198
                  .getConfig());
×
199
      ClientManagerMetrics.getInstance()
×
200
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
×
201
      return clientPool;
×
202
    }
203
  }
204

205
  public static class SyncDataNodeMPPDataExchangeServiceClientPoolFactory
1✔
206
      implements IClientPoolFactory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> {
207

208
    @Override
209
    public KeyedObjectPool<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> createClientPool(
210
        ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> manager) {
211
      GenericKeyedObjectPool<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> clientPool =
1✔
212
          new GenericKeyedObjectPool<>(
213
              new SyncDataNodeMPPDataExchangeServiceClient.Factory(
214
                  manager,
215
                  new ThriftClientProperty.Builder()
216
                      .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
1✔
217
                      .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
1✔
218
                      .build()),
1✔
219
              new ClientPoolProperty.Builder<SyncDataNodeMPPDataExchangeServiceClient>()
220
                  .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
1✔
221
                  .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
1✔
222
                  .build()
1✔
223
                  .getConfig());
1✔
224
      ClientManagerMetrics.getInstance()
1✔
225
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
1✔
226
      return clientPool;
1✔
227
    }
228
  }
229

230
  public static class AsyncDataNodeMPPDataExchangeServiceClientPoolFactory
×
231
      implements IClientPoolFactory<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> {
232

233
    @Override
234
    public KeyedObjectPool<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> createClientPool(
235
        ClientManager<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> manager) {
236
      GenericKeyedObjectPool<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> clientPool =
×
237
          new GenericKeyedObjectPool<>(
238
              new AsyncDataNodeMPPDataExchangeServiceClient.Factory(
239
                  manager,
240
                  new ThriftClientProperty.Builder()
241
                      .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
×
242
                      .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
×
243
                      .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
×
244
                      .build(),
×
245
                  ThreadName.ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL.getName()),
×
246
              new ClientPoolProperty.Builder<AsyncDataNodeMPPDataExchangeServiceClient>()
247
                  .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
×
248
                  .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
×
249
                  .build()
×
250
                  .getConfig());
×
251
      ClientManagerMetrics.getInstance()
×
252
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
×
253
      return clientPool;
×
254
    }
255
  }
256

257
  public static class AsyncPipeDataTransferServiceClientPoolFactory
1✔
258
      implements IClientPoolFactory<TEndPoint, AsyncPipeDataTransferServiceClient> {
259

260
    @Override
261
    public KeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient> createClientPool(
262
        ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> manager) {
263
      GenericKeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient> clientPool =
1✔
264
          new GenericKeyedObjectPool<>(
265
              new AsyncPipeDataTransferServiceClient.Factory(
266
                  manager,
267
                  new ThriftClientProperty.Builder()
268
                      .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
1✔
269
                      .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
1✔
270
                      .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
1✔
271
                      .build(),
1✔
272
                  ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
1✔
273
              new ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
274
                  .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
1✔
275
                  .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
1✔
276
                  .build()
1✔
277
                  .getConfig());
1✔
278
      ClientManagerMetrics.getInstance()
1✔
279
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
1✔
280
      return clientPool;
1✔
281
    }
282
  }
283
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc