• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #10019

07 Sep 2023 04:50AM UTC coverage: 47.489% (-0.2%) from 47.655%
#10019

push

travis_ci

web-flow
Pipe: Fix ConcurrentModificationException caused by concurrently iterating through CachedSchemaPatternMatcher.extractors when an PipeHeartbeatEvent is being assigned (#11074)

* try to fix ConcurrentModificationException when assigning PipeHeartbeatEvent

* Update CachedSchemaPatternMatcher.java

---------

Co-authored-by: 马子坤 <55695098+DanielWang2035@users.noreply.github.com>

1 of 1 new or added line in 1 file covered. (100.0%)

80551 of 169622 relevant lines covered (47.49%)

0.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.15
/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.commons.client;
21

22
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23
import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient;
24
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
25
import org.apache.iotdb.commons.client.async.AsyncDataNodeMPPDataExchangeServiceClient;
26
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
27
import org.apache.iotdb.commons.client.mlnode.MLNodeClient;
28
import org.apache.iotdb.commons.client.property.ClientPoolProperty;
29
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
30
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
31
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
32
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
33
import org.apache.iotdb.commons.concurrent.ThreadName;
34
import org.apache.iotdb.commons.conf.CommonConfig;
35
import org.apache.iotdb.commons.conf.CommonDescriptor;
36

37
import org.apache.commons.pool2.KeyedObjectPool;
38
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
39

40
public class ClientPoolFactory {
41

42
  private static final CommonConfig conf = CommonDescriptor.getInstance().getConfig();
1✔
43

44
  private ClientPoolFactory() {}
45

46
  public static class SyncConfigNodeIServiceClientPoolFactory
×
47
      implements IClientPoolFactory<TEndPoint, SyncConfigNodeIServiceClient> {
48

49
    @Override
50
    public KeyedObjectPool<TEndPoint, SyncConfigNodeIServiceClient> createClientPool(
51
        ClientManager<TEndPoint, SyncConfigNodeIServiceClient> manager) {
52
      GenericKeyedObjectPool<TEndPoint, SyncConfigNodeIServiceClient> clientPool =
×
53
          new GenericKeyedObjectPool<>(
54
              new SyncConfigNodeIServiceClient.Factory(
55
                  manager,
56
                  new ThriftClientProperty.Builder()
57
                      .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
×
58
                      .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
×
59
                      .build()),
×
60
              new ClientPoolProperty.Builder<SyncConfigNodeIServiceClient>()
61
                  .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
×
62
                  .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
×
63
                  .build()
×
64
                  .getConfig());
×
65
      ClientManagerMetrics.getInstance()
×
66
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
×
67
      return clientPool;
×
68
    }
69
  }
70

71
  public static class AsyncConfigNodeIServiceClientPoolFactory
×
72
      implements IClientPoolFactory<TEndPoint, AsyncConfigNodeIServiceClient> {
73

74
    @Override
75
    public KeyedObjectPool<TEndPoint, AsyncConfigNodeIServiceClient> createClientPool(
76
        ClientManager<TEndPoint, AsyncConfigNodeIServiceClient> manager) {
77
      GenericKeyedObjectPool<TEndPoint, AsyncConfigNodeIServiceClient> clientPool =
×
78
          new GenericKeyedObjectPool<>(
79
              new AsyncConfigNodeIServiceClient.Factory(
80
                  manager,
81
                  new ThriftClientProperty.Builder()
82
                      .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
×
83
                      .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
×
84
                      .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
×
85
                      .build(),
×
86
                  ThreadName.ASYNC_CONFIGNODE_CLIENT_POOL.getName()),
×
87
              new ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>()
88
                  .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
×
89
                  .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
×
90
                  .build()
×
91
                  .getConfig());
×
92
      ClientManagerMetrics.getInstance()
×
93
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
×
94
      return clientPool;
×
95
    }
96
  }
97

98
  public static class SyncDataNodeInternalServiceClientPoolFactory
1✔
99
      implements IClientPoolFactory<TEndPoint, SyncDataNodeInternalServiceClient> {
100

101
    @Override
102
    public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> createClientPool(
103
        ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
104
      GenericKeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> clientPool =
1✔
105
          new GenericKeyedObjectPool<>(
106
              new SyncDataNodeInternalServiceClient.Factory(
107
                  manager,
108
                  new ThriftClientProperty.Builder()
109
                      .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
1✔
110
                      .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
1✔
111
                      .build()),
1✔
112
              new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
113
                  .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
1✔
114
                  .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
1✔
115
                  .build()
1✔
116
                  .getConfig());
1✔
117
      ClientManagerMetrics.getInstance()
1✔
118
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
1✔
119
      return clientPool;
1✔
120
    }
121
  }
122

123
  public static class AsyncDataNodeInternalServiceClientPoolFactory
1✔
124
      implements IClientPoolFactory<TEndPoint, AsyncDataNodeInternalServiceClient> {
125

126
    @Override
127
    public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
128
        ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
129
      GenericKeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> clientPool =
1✔
130
          new GenericKeyedObjectPool<>(
131
              new AsyncDataNodeInternalServiceClient.Factory(
132
                  manager,
133
                  new ThriftClientProperty.Builder()
134
                      .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
1✔
135
                      .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
1✔
136
                      .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
1✔
137
                      .build(),
1✔
138
                  ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
1✔
139
              new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>()
140
                  .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
1✔
141
                  .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
1✔
142
                  .build()
1✔
143
                  .getConfig());
1✔
144
      ClientManagerMetrics.getInstance()
1✔
145
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
1✔
146
      return clientPool;
1✔
147
    }
148
  }
149

150
  public static class AsyncConfigNodeHeartbeatServiceClientPoolFactory
×
151
      implements IClientPoolFactory<TEndPoint, AsyncConfigNodeIServiceClient> {
152

153
    @Override
154
    public KeyedObjectPool<TEndPoint, AsyncConfigNodeIServiceClient> createClientPool(
155
        ClientManager<TEndPoint, AsyncConfigNodeIServiceClient> manager) {
156

157
      GenericKeyedObjectPool<TEndPoint, AsyncConfigNodeIServiceClient> clientPool =
×
158
          new GenericKeyedObjectPool<>(
159
              new AsyncConfigNodeIServiceClient.Factory(
160
                  manager,
161
                  new ThriftClientProperty.Builder()
162
                      .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
×
163
                      .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
×
164
                      .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
×
165
                      .setPrintLogWhenEncounterException(false)
×
166
                      .build(),
×
167
                  ThreadName.ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL.getName()),
×
168
              new ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>()
169
                  .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
×
170
                  .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
×
171
                  .build()
×
172
                  .getConfig());
×
173
      ClientManagerMetrics.getInstance()
×
174
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
×
175
      return clientPool;
×
176
    }
177
  }
178

179
  public static class AsyncDataNodeHeartbeatServiceClientPoolFactory
×
180
      implements IClientPoolFactory<TEndPoint, AsyncDataNodeInternalServiceClient> {
181
    @Override
182
    public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
183
        ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
184
      GenericKeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> clientPool =
×
185
          new GenericKeyedObjectPool<>(
186
              new AsyncDataNodeInternalServiceClient.Factory(
187
                  manager,
188
                  new ThriftClientProperty.Builder()
189
                      .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
×
190
                      .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
×
191
                      .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
×
192
                      .setPrintLogWhenEncounterException(false)
×
193
                      .build(),
×
194
                  ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()),
×
195
              new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>()
196
                  .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
×
197
                  .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
×
198
                  .build()
×
199
                  .getConfig());
×
200
      ClientManagerMetrics.getInstance()
×
201
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
×
202
      return clientPool;
×
203
    }
204
  }
205

206
  public static class SyncDataNodeMPPDataExchangeServiceClientPoolFactory
1✔
207
      implements IClientPoolFactory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> {
208

209
    @Override
210
    public KeyedObjectPool<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> createClientPool(
211
        ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> manager) {
212
      GenericKeyedObjectPool<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> clientPool =
1✔
213
          new GenericKeyedObjectPool<>(
214
              new SyncDataNodeMPPDataExchangeServiceClient.Factory(
215
                  manager,
216
                  new ThriftClientProperty.Builder()
217
                      .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
1✔
218
                      .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
1✔
219
                      .build()),
1✔
220
              new ClientPoolProperty.Builder<SyncDataNodeMPPDataExchangeServiceClient>()
221
                  .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
1✔
222
                  .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
1✔
223
                  .build()
1✔
224
                  .getConfig());
1✔
225
      ClientManagerMetrics.getInstance()
1✔
226
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
1✔
227
      return clientPool;
1✔
228
    }
229
  }
230

231
  public static class AsyncDataNodeMPPDataExchangeServiceClientPoolFactory
×
232
      implements IClientPoolFactory<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> {
233

234
    @Override
235
    public KeyedObjectPool<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> createClientPool(
236
        ClientManager<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> manager) {
237
      GenericKeyedObjectPool<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> clientPool =
×
238
          new GenericKeyedObjectPool<>(
239
              new AsyncDataNodeMPPDataExchangeServiceClient.Factory(
240
                  manager,
241
                  new ThriftClientProperty.Builder()
242
                      .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
×
243
                      .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
×
244
                      .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
×
245
                      .build(),
×
246
                  ThreadName.ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL.getName()),
×
247
              new ClientPoolProperty.Builder<AsyncDataNodeMPPDataExchangeServiceClient>()
248
                  .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
×
249
                  .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
×
250
                  .build()
×
251
                  .getConfig());
×
252
      ClientManagerMetrics.getInstance()
×
253
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
×
254
      return clientPool;
×
255
    }
256
  }
257

258
  public static class AsyncPipeDataTransferServiceClientPoolFactory
1✔
259
      implements IClientPoolFactory<TEndPoint, AsyncPipeDataTransferServiceClient> {
260

261
    @Override
262
    public KeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient> createClientPool(
263
        ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> manager) {
264
      final GenericKeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient> clientPool =
1✔
265
          new GenericKeyedObjectPool<>(
266
              new AsyncPipeDataTransferServiceClient.Factory(
267
                  manager,
268
                  new ThriftClientProperty.Builder()
269
                      .setConnectionTimeoutMs((int) conf.getPipeConnectorTimeoutMs())
1✔
270
                      .setRpcThriftCompressionEnabled(
1✔
271
                          conf.isPipeConnectorRPCThriftCompressionEnabled())
1✔
272
                      .setSelectorNumOfAsyncClientManager(
1✔
273
                          conf.getPipeAsyncConnectorSelectorNumber())
1✔
274
                      .build(),
1✔
275
                  ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
1✔
276
              new ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
277
                  .setCoreClientNumForEachNode(conf.getPipeAsyncConnectorCoreClientNumber())
1✔
278
                  .setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxClientNumber())
1✔
279
                  .build()
1✔
280
                  .getConfig());
1✔
281
      ClientManagerMetrics.getInstance()
1✔
282
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
1✔
283
      return clientPool;
1✔
284
    }
285
  }
286

287
  public static class MLNodeClientPoolFactory
×
288
      implements IClientPoolFactory<TEndPoint, MLNodeClient> {
289

290
    @Override
291
    public KeyedObjectPool<TEndPoint, MLNodeClient> createClientPool(
292
        ClientManager<TEndPoint, MLNodeClient> manager) {
293
      GenericKeyedObjectPool<TEndPoint, MLNodeClient> clientPool =
×
294
          new GenericKeyedObjectPool<>(
295
              new MLNodeClient.Factory(
296
                  manager,
297
                  new ThriftClientProperty.Builder()
298
                      .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
×
299
                      .setRpcThriftCompressionEnabled(true)
×
300
                      .build()),
×
301
              new ClientPoolProperty.Builder<MLNodeClient>()
302
                  .setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
×
303
                  .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
×
304
                  .build()
×
305
                  .getConfig());
×
306
      ClientManagerMetrics.getInstance()
×
307
          .registerClientManager(this.getClass().getSimpleName(), clientPool);
×
308
      return clientPool;
×
309
    }
310
  }
311
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc