• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #10019

07 Sep 2023 04:50AM UTC coverage: 47.489% (-0.2%) from 47.655%
#10019

push

travis_ci

web-flow
Pipe: Fix ConcurrentModificationException caused by concurrently iterating through CachedSchemaPatternMatcher.extractors when an PipeHeartbeatEvent is being assigned (#11074)

* try to fix ConcurrentModificationException when assigning PipeHeartbeatEvent

* Update CachedSchemaPatternMatcher.java

---------

Co-authored-by: 马子坤 <55695098+DanielWang2035@users.noreply.github.com>

1 of 1 new or added line in 1 file covered. (100.0%)

80551 of 169622 relevant lines covered (47.49%)

0.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

7.53
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.protocol.client;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
24
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
25
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
26
import org.apache.iotdb.common.rpc.thrift.TSStatus;
27
import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
28
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
29
import org.apache.iotdb.common.rpc.thrift.TSetThrottleQuotaReq;
30
import org.apache.iotdb.commons.client.ClientManager;
31
import org.apache.iotdb.commons.client.ThriftClient;
32
import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
33
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
34
import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
35
import org.apache.iotdb.commons.consensus.ConfigRegionId;
36
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
37
import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
38
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
39
import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
40
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
41
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
42
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
43
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
44
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
45
import org.apache.iotdb.confignode.rpc.thrift.TCountDatabaseResp;
46
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
47
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListResp;
48
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
49
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
50
import org.apache.iotdb.confignode.rpc.thrift.TCreateModelReq;
51
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
52
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
53
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
54
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
55
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp;
56
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
57
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
58
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
59
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
60
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
61
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
62
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
63
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
64
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
65
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchemaResp;
66
import org.apache.iotdb.confignode.rpc.thrift.TDeactivateSchemaTemplateReq;
67
import org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabaseReq;
68
import org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabasesReq;
69
import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
70
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
71
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
72
import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
73
import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
74
import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
75
import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
76
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
77
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
78
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
79
import org.apache.iotdb.confignode.rpc.thrift.TGetDataNodeLocationsResp;
80
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
81
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
82
import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
83
import org.apache.iotdb.confignode.rpc.thrift.TGetModelInfoReq;
84
import org.apache.iotdb.confignode.rpc.thrift.TGetModelInfoResp;
85
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
86
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
87
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
88
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
89
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
90
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
91
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
92
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
93
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
94
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
95
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
96
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
97
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
98
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
99
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
100
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
101
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
102
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
103
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
104
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
105
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
106
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
107
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
108
import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
109
import org.apache.iotdb.confignode.rpc.thrift.TSetDataReplicationFactorReq;
110
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaReplicationFactorReq;
111
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
112
import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq;
113
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
114
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
115
import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
116
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
117
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
118
import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
119
import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
120
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
121
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
122
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
123
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
124
import org.apache.iotdb.confignode.rpc.thrift.TShowThrottleReq;
125
import org.apache.iotdb.confignode.rpc.thrift.TShowTrialReq;
126
import org.apache.iotdb.confignode.rpc.thrift.TShowTrialResp;
127
import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
128
import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
129
import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
130
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
131
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
132
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
133
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelStateReq;
134
import org.apache.iotdb.db.conf.IoTDBConfig;
135
import org.apache.iotdb.db.conf.IoTDBDescriptor;
136
import org.apache.iotdb.rpc.RpcTransportFactory;
137
import org.apache.iotdb.rpc.TSStatusCode;
138

139
import org.apache.commons.pool2.PooledObject;
140
import org.apache.commons.pool2.impl.DefaultPooledObject;
141
import org.apache.thrift.TException;
142
import org.apache.thrift.transport.TTransport;
143
import org.apache.thrift.transport.TTransportException;
144
import org.slf4j.Logger;
145
import org.slf4j.LoggerFactory;
146

147
import java.util.ArrayList;
148
import java.util.List;
149
import java.util.Optional;
150
import java.util.function.Predicate;
151

152
public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClient, AutoCloseable {
153

154
  private static final Logger logger = LoggerFactory.getLogger(ConfigNodeClient.class);
1✔
155

156
  private static final int RETRY_NUM = 5;
157

158
  public static final String MSG_RECONNECTION_FAIL =
159
      "Fail to connect to any config node. Please check status of ConfigNodes or logs of connected DataNode";
160

161
  private static final String MSG_RECONNECTION_DATANODE_FAIL =
162
      "Failed to connect to ConfigNode %s from DataNode %s when executing %s, Exception:";
163
  private static final int RETRY_INTERVAL_MS = 2000;
164

165
  private final ThriftClientProperty property;
166

167
  private IConfigNodeRPCService.Iface client;
168

169
  private TTransport transport;
170

171
  private TEndPoint configLeader;
172

173
  private List<TEndPoint> configNodes;
174

175
  private TEndPoint configNode;
176

177
  private int cursor = 0;
1✔
178

179
  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
1✔
180

181
  ClientManager<ConfigRegionId, ConfigNodeClient> clientManager;
182

183
  ConfigRegionId configRegionId = ConfigNodeInfo.CONFIG_REGION_ID;
1✔
184

185
  public ConfigNodeClient(
186
      List<TEndPoint> configNodes,
187
      ThriftClientProperty property,
188
      ClientManager<ConfigRegionId, ConfigNodeClient> clientManager)
189
      throws TException {
1✔
190
    this.configNodes = configNodes;
1✔
191
    this.property = property;
1✔
192
    this.clientManager = clientManager;
1✔
193

194
    init();
×
195
  }
×
196

197
  public void init() throws TException {
198
    try {
199
      tryToConnect();
×
200
    } catch (TException e) {
1✔
201
      // Can not connect to each config node
202
      syncLatestConfigNodeList();
1✔
203
      tryToConnect();
×
204
    }
×
205
  }
×
206

207
  public void connect(TEndPoint endpoint) throws TException {
208
    try {
209
      transport =
×
210
          RpcTransportFactory.INSTANCE.getTransport(
×
211
              // As there is a try-catch already, we do not need to use TSocket.wrap
212
              endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs());
×
213
      if (!transport.isOpen()) {
×
214
        transport.open();
×
215
      }
216
      configNode = endpoint;
×
217
    } catch (TTransportException e) {
×
218
      throw new TException(e);
×
219
    }
×
220

221
    client = new IConfigNodeRPCService.Client(property.getProtocolFactory().getProtocol(transport));
×
222
  }
×
223

224
  private void waitAndReconnect() throws TException {
225
    try {
226
      // Wait to start the next try
227
      Thread.sleep(RETRY_INTERVAL_MS);
×
228
    } catch (InterruptedException e) {
×
229
      Thread.currentThread().interrupt();
×
230
      throw new TException(
×
231
          "Unexpected interruption when waiting to retry to connect to ConfigNode");
232
    }
×
233

234
    try {
235
      tryToConnect();
×
236
    } catch (TException e) {
×
237
      // Can not connect to each config node
238
      syncLatestConfigNodeList();
×
239
      tryToConnect();
×
240
    }
×
241
  }
×
242

243
  private void tryToConnect() throws TException {
244
    if (configLeader != null) {
1✔
245
      try {
246
        connect(configLeader);
×
247
        return;
×
248
      } catch (TException e) {
×
249
        logger.warn("The current node may have been down {},try next node", configLeader);
×
250
        configLeader = null;
×
251
      }
252
    }
253

254
    if (transport != null) {
1✔
255
      transport.close();
×
256
    }
257

258
    for (int tryHostNum = 0; tryHostNum < configNodes.size(); tryHostNum++) {
1✔
259
      cursor = (cursor + 1) % configNodes.size();
×
260
      TEndPoint tryEndpoint = configNodes.get(cursor);
×
261

262
      try {
263
        connect(tryEndpoint);
×
264
        return;
×
265
      } catch (TException e) {
×
266
        logger.warn("The current node may have been down {},try next node", tryEndpoint);
×
267
      }
268
    }
269

270
    throw new TException(MSG_RECONNECTION_FAIL);
1✔
271
  }
272

273
  public TTransport getTransport() {
274
    return transport;
×
275
  }
276

277
  public void syncLatestConfigNodeList() {
278
    configNodes = ConfigNodeInfo.getInstance().getLatestConfigNodes();
1✔
279
    cursor = 0;
1✔
280
  }
1✔
281

282
  @Override
283
  public void close() {
284
    clientManager.returnClient(configRegionId, this);
×
285
  }
×
286

287
  @Override
288
  public void invalidate() {
289
    Optional.ofNullable(transport).ifPresent(TTransport::close);
1✔
290
  }
1✔
291

292
  @Override
293
  public void invalidateAll() {
294
    clientManager.clear(ConfigNodeInfo.CONFIG_REGION_ID);
×
295
  }
×
296

297
  @Override
298
  public boolean printLogWhenEncounterException() {
299
    return property.isPrintLogWhenEncounterException();
×
300
  }
301

302
  private boolean updateConfigNodeLeader(TSStatus status) {
303
    if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
×
304
      if (status.isSetRedirectNode()) {
×
305
        configLeader =
×
306
            new TEndPoint(status.getRedirectNode().getIp(), status.getRedirectNode().getPort());
×
307
      } else {
308
        configLeader = null;
×
309
      }
310
      logger.warn(
×
311
          "Failed to connect to ConfigNode {} from DataNode {}, because the current node is not "
312
              + "leader or not ready yet, will try again later",
313
          configNode,
314
          config.getAddressAndPort());
×
315
      return true;
×
316
    }
317
    return false;
×
318
  }
319

320
  /**
321
   * The frame of execute RPC, include logic of retry and exception handling.
322
   *
323
   * @param call which rpc should call
324
   * @param check check the rpc's result
325
   * @return rpc's result
326
   * @param <T> the type of rpc result
327
   * @throws TException if fails more than RETRY_NUM times, throw TException(MSG_RECONNECTION_FAIL)
328
   */
329
  private <T> T executeRemoteCallWithRetry(Operation<T> call, Predicate<T> check)
330
      throws TException {
331
    for (int i = 0; i < RETRY_NUM; i++) {
×
332
      try {
333
        T result = call.execute();
×
334
        if (check.test(result)) {
×
335
          return result;
×
336
        }
337
      } catch (TException e) {
×
338
        String message =
×
339
            String.format(
×
340
                MSG_RECONNECTION_DATANODE_FAIL,
341
                configNode,
342
                config.getAddressAndPort(),
×
343
                Thread.currentThread().getStackTrace()[2].getMethodName());
×
344
        logger.warn(message, e);
×
345
        configLeader = null;
×
346
      }
×
347
      waitAndReconnect();
×
348
    }
349
    throw new TException(MSG_RECONNECTION_FAIL);
×
350
  }
351

352
  @FunctionalInterface
353
  private interface Operation<T> {
354
    T execute() throws TException;
355
  }
356

357
  @Override
358
  public TSystemConfigurationResp getSystemConfiguration() throws TException {
359
    return executeRemoteCallWithRetry(
×
360
        () -> client.getSystemConfiguration(), resp -> !updateConfigNodeLeader(resp.status));
×
361
  }
362

363
  @Override
364
  public TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req) throws TException {
365
    for (int i = 0; i < RETRY_NUM; i++) {
×
366
      try {
367
        TDataNodeRegisterResp resp = client.registerDataNode(req);
×
368

369
        if (!updateConfigNodeLeader(resp.status)) {
×
370
          return resp;
×
371
        }
372

373
        // set latest config node list
374
        List<TEndPoint> newConfigNodes = new ArrayList<>();
×
375
        for (TConfigNodeLocation configNodeLocation : resp.getConfigNodeList()) {
×
376
          newConfigNodes.add(configNodeLocation.getInternalEndPoint());
×
377
        }
×
378
        configNodes = newConfigNodes;
×
379
      } catch (TException e) {
×
380
        String message =
×
381
            String.format(
×
382
                MSG_RECONNECTION_DATANODE_FAIL,
383
                configNode,
384
                config.getAddressAndPort(),
×
385
                Thread.currentThread().getStackTrace()[1].getMethodName());
×
386
        logger.warn(message, e);
×
387
        configLeader = null;
×
388
      }
×
389
      waitAndReconnect();
×
390
    }
391
    throw new TException(MSG_RECONNECTION_FAIL);
×
392
  }
393

394
  @Override
395
  public TDataNodeRestartResp restartDataNode(TDataNodeRestartReq req) throws TException {
396
    return executeRemoteCallWithRetry(
×
397
        () -> client.restartDataNode(req), resp -> !updateConfigNodeLeader(resp.status));
×
398
  }
399

400
  @Override
401
  public TDataNodeRemoveResp removeDataNode(TDataNodeRemoveReq req) throws TException {
402
    return executeRemoteCallWithRetry(
×
403
        () -> client.removeDataNode(req), resp -> !updateConfigNodeLeader(resp.status));
×
404
  }
405

406
  @Override
407
  public TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation) throws TException {
408
    return executeRemoteCallWithRetry(
×
409
        () -> client.reportDataNodeShutdown(dataNodeLocation),
×
410
        status -> !updateConfigNodeLeader(status));
×
411
  }
412

413
  @Override
414
  public TDataNodeConfigurationResp getDataNodeConfiguration(int dataNodeId) throws TException {
415
    return executeRemoteCallWithRetry(
×
416
        () -> client.getDataNodeConfiguration(dataNodeId),
×
417
        resp -> !updateConfigNodeLeader(resp.status));
×
418
  }
419

420
  @Override
421
  public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req) throws TException {
422
    return executeRemoteCallWithRetry(
×
423
        () -> client.reportRegionMigrateResult(req), status -> !updateConfigNodeLeader(status));
×
424
  }
425

426
  @Override
427
  public TShowClusterResp showCluster() throws TException {
428
    return executeRemoteCallWithRetry(
×
429
        () -> client.showCluster(), resp -> !updateConfigNodeLeader(resp.status));
×
430
  }
431

432
  @Override
433
  public TShowVariablesResp showVariables() throws TException {
434
    return executeRemoteCallWithRetry(
×
435
        () -> client.showVariables(), resp -> !updateConfigNodeLeader(resp.status));
×
436
  }
437

438
  @Override
439
  public TSStatus setDatabase(TDatabaseSchema databaseSchema) throws TException {
440
    return executeRemoteCallWithRetry(
×
441
        () -> client.setDatabase(databaseSchema), status -> !updateConfigNodeLeader(status));
×
442
  }
443

444
  @Override
445
  public TSStatus alterDatabase(TDatabaseSchema databaseSchema) throws TException {
446
    return executeRemoteCallWithRetry(
×
447
        () -> client.alterDatabase(databaseSchema), status -> !updateConfigNodeLeader(status));
×
448
  }
449

450
  @Override
451
  public TSStatus deleteDatabase(TDeleteDatabaseReq req) throws TException {
452
    return executeRemoteCallWithRetry(
×
453
        () -> client.deleteDatabase(req), status -> !updateConfigNodeLeader(status));
×
454
  }
455

456
  @Override
457
  public TSStatus deleteDatabases(TDeleteDatabasesReq req) throws TException {
458
    return executeRemoteCallWithRetry(
×
459
        () -> client.deleteDatabases(req), status -> !updateConfigNodeLeader(status));
×
460
  }
461

462
  @Override
463
  public TCountDatabaseResp countMatchedDatabases(List<String> storageGroupPathPattern)
464
      throws TException {
465
    return executeRemoteCallWithRetry(
×
466
        () -> client.countMatchedDatabases(storageGroupPathPattern),
×
467
        resp -> !updateConfigNodeLeader(resp.status));
×
468
  }
469

470
  @Override
471
  public TDatabaseSchemaResp getMatchedDatabaseSchemas(List<String> storageGroupPathPattern)
472
      throws TException {
473
    return executeRemoteCallWithRetry(
×
474
        () -> client.getMatchedDatabaseSchemas(storageGroupPathPattern),
×
475
        resp -> !updateConfigNodeLeader(resp.status));
×
476
  }
477

478
  @Override
479
  public TSStatus setTTL(TSetTTLReq setTTLReq) throws TException {
480
    return executeRemoteCallWithRetry(
×
481
        () -> client.setTTL(setTTLReq), status -> !updateConfigNodeLeader(status));
×
482
  }
483

484
  @Override
485
  public TSStatus setSchemaReplicationFactor(TSetSchemaReplicationFactorReq req) throws TException {
486
    return executeRemoteCallWithRetry(
×
487
        () -> client.setSchemaReplicationFactor(req), status -> !updateConfigNodeLeader(status));
×
488
  }
489

490
  @Override
491
  public TSStatus setDataReplicationFactor(TSetDataReplicationFactorReq req) throws TException {
492
    return executeRemoteCallWithRetry(
×
493
        () -> client.setDataReplicationFactor(req), status -> !updateConfigNodeLeader(status));
×
494
  }
495

496
  @Override
497
  public TSStatus setTimePartitionInterval(TSetTimePartitionIntervalReq req) throws TException {
498
    return executeRemoteCallWithRetry(
×
499
        () -> client.setTimePartitionInterval(req), status -> !updateConfigNodeLeader(status));
×
500
  }
501

502
  @Override
503
  public TSchemaPartitionTableResp getSchemaPartitionTable(TSchemaPartitionReq req)
504
      throws TException {
505
    return executeRemoteCallWithRetry(
×
506
        () -> client.getSchemaPartitionTable(req), resp -> !updateConfigNodeLeader(resp.status));
×
507
  }
508

509
  @Override
510
  public TSchemaPartitionTableResp getOrCreateSchemaPartitionTable(TSchemaPartitionReq req)
511
      throws TException {
512
    return executeRemoteCallWithRetry(
×
513
        () -> client.getOrCreateSchemaPartitionTable(req),
×
514
        resp -> !updateConfigNodeLeader(resp.status));
×
515
  }
516

517
  @Override
518
  public TSchemaNodeManagementResp getSchemaNodeManagementPartition(TSchemaNodeManagementReq req)
519
      throws TException {
520
    return executeRemoteCallWithRetry(
×
521
        () -> client.getSchemaNodeManagementPartition(req),
×
522
        resp -> !updateConfigNodeLeader(resp.status));
×
523
  }
524

525
  @Override
526
  public TDataPartitionTableResp getDataPartitionTable(TDataPartitionReq req) throws TException {
527
    return executeRemoteCallWithRetry(
×
528
        () -> client.getDataPartitionTable(req), resp -> !updateConfigNodeLeader(resp.status));
×
529
  }
530

531
  @Override
532
  public TDataPartitionTableResp getOrCreateDataPartitionTable(TDataPartitionReq req)
533
      throws TException {
534
    return executeRemoteCallWithRetry(
×
535
        () -> client.getOrCreateDataPartitionTable(req),
×
536
        resp -> !updateConfigNodeLeader(resp.status));
×
537
  }
538

539
  @Override
540
  public TSStatus operatePermission(TAuthorizerReq req) throws TException {
541
    return executeRemoteCallWithRetry(
×
542
        () -> client.operatePermission(req), status -> !updateConfigNodeLeader(status));
×
543
  }
544

545
  @Override
546
  public TAuthorizerResp queryPermission(TAuthorizerReq req) throws TException {
547
    return executeRemoteCallWithRetry(
×
548
        () -> client.queryPermission(req), resp -> !updateConfigNodeLeader(resp.status));
×
549
  }
550

551
  @Override
552
  public TPermissionInfoResp login(TLoginReq req) throws TException {
553
    return executeRemoteCallWithRetry(
×
554
        () -> client.login(req), resp -> !updateConfigNodeLeader(resp.status));
×
555
  }
556

557
  @Override
558
  public TPermissionInfoResp checkUserPrivileges(TCheckUserPrivilegesReq req) throws TException {
559
    return executeRemoteCallWithRetry(
×
560
        () -> client.checkUserPrivileges(req), resp -> !updateConfigNodeLeader(resp.status));
×
561
  }
562

563
  @Override
564
  public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) throws TException {
565
    throw new TException("DataNode to ConfigNode client doesn't support registerConfigNode.");
×
566
  }
567

568
  @Override
569
  public TSStatus addConsensusGroup(TAddConsensusGroupReq registerResp) throws TException {
570
    throw new TException("DataNode to ConfigNode client doesn't support addConsensusGroup.");
×
571
  }
572

573
  @Override
574
  public TSStatus notifyRegisterSuccess() throws TException {
575
    throw new TException("DataNode to ConfigNode client doesn't support notifyRegisterSuccess.");
×
576
  }
577

578
  @Override
579
  public TSStatus removeConfigNode(TConfigNodeLocation configNodeLocation) throws TException {
580
    throw new TException("DataNode to ConfigNode client doesn't support removeConfigNode.");
×
581
  }
582

583
  @Override
584
  public TSStatus deleteConfigNodePeer(TConfigNodeLocation configNodeLocation) throws TException {
585
    throw new TException("DataNode to ConfigNode client doesn't support removeConsensusGroup.");
×
586
  }
587

588
  @Override
589
  public TSStatus reportConfigNodeShutdown(TConfigNodeLocation configNodeLocation)
590
      throws TException {
591
    throw new TException("DataNode to ConfigNode client doesn't support reportConfigNodeShutdown.");
×
592
  }
593

594
  @Override
595
  public TSStatus stopConfigNode(TConfigNodeLocation configNodeLocation) throws TException {
596
    throw new TException("DataNode to ConfigNode client doesn't support stopConfigNode.");
×
597
  }
598

599
  @Override
600
  public TSStatus merge() throws TException {
601
    return executeRemoteCallWithRetry(
×
602
        () -> client.merge(), status -> !updateConfigNodeLeader(status));
×
603
  }
604

605
  @Override
606
  public TSStatus flush(TFlushReq req) throws TException {
607
    return executeRemoteCallWithRetry(
×
608
        () -> client.flush(req), status -> !updateConfigNodeLeader(status));
×
609
  }
610

611
  @Override
612
  public TSStatus clearCache() throws TException {
613
    return executeRemoteCallWithRetry(
×
614
        () -> client.clearCache(), status -> !updateConfigNodeLeader(status));
×
615
  }
616

617
  @Override
618
  public TSStatus loadConfiguration() throws TException {
619
    return executeRemoteCallWithRetry(
×
620
        () -> client.loadConfiguration(), status -> !updateConfigNodeLeader(status));
×
621
  }
622

623
  @Override
624
  public TSStatus setSystemStatus(String systemStatus) throws TException {
625
    return executeRemoteCallWithRetry(
×
626
        () -> client.setSystemStatus(systemStatus), status -> !updateConfigNodeLeader(status));
×
627
  }
628

629
  @Override
630
  public TSStatus setDataNodeStatus(TSetDataNodeStatusReq req) throws TException {
631
    throw new TException("DataNode to ConfigNode client doesn't support setDataNodeStatus.");
×
632
  }
633

634
  @Override
635
  public TSStatus killQuery(String queryId, int dataNodeId) throws TException {
636
    return executeRemoteCallWithRetry(
×
637
        () -> client.killQuery(queryId, dataNodeId), status -> !updateConfigNodeLeader(status));
×
638
  }
639

640
  @Override
641
  public TGetDataNodeLocationsResp getRunningDataNodeLocations() throws TException {
642
    return executeRemoteCallWithRetry(
×
643
        () -> client.getRunningDataNodeLocations(), resp -> !updateConfigNodeLeader(resp.status));
×
644
  }
645

646
  @Override
647
  public TShowRegionResp showRegion(TShowRegionReq req) throws TException {
648
    return executeRemoteCallWithRetry(
×
649
        () -> client.showRegion(req), resp -> !updateConfigNodeLeader(resp.status));
×
650
  }
651

652
  @Override
653
  public TShowDataNodesResp showDataNodes() throws TException {
654
    return executeRemoteCallWithRetry(
×
655
        () -> client.showDataNodes(), resp -> !updateConfigNodeLeader(resp.status));
×
656
  }
657

658
  @Override
659
  public TShowConfigNodesResp showConfigNodes() throws TException {
660
    return executeRemoteCallWithRetry(
×
661
        () -> client.showConfigNodes(), resp -> !updateConfigNodeLeader(resp.status));
×
662
  }
663

664
  @Override
665
  public TShowDatabaseResp showDatabase(List<String> storageGroupPathPattern) throws TException {
666
    return executeRemoteCallWithRetry(
×
667
        () -> client.showDatabase(storageGroupPathPattern),
×
668
        resp -> !updateConfigNodeLeader(resp.status));
×
669
  }
670

671
  @Override
672
  public TRegionRouteMapResp getLatestRegionRouteMap() throws TException {
673
    return executeRemoteCallWithRetry(
×
674
        () -> client.getLatestRegionRouteMap(), resp -> !updateConfigNodeLeader(resp.status));
×
675
  }
676

677
  @Override
678
  public long getConfigNodeHeartBeat(long timestamp) throws TException {
679
    throw new TException("DataNode to ConfigNode client doesn't support getConfigNodeHeartBeat.");
×
680
  }
681

682
  @Override
683
  public TSStatus createFunction(TCreateFunctionReq req) throws TException {
684
    return executeRemoteCallWithRetry(
×
685
        () -> client.createFunction(req), status -> !updateConfigNodeLeader(status));
×
686
  }
687

688
  @Override
689
  public TSStatus dropFunction(TDropFunctionReq req) throws TException {
690
    return executeRemoteCallWithRetry(
×
691
        () -> client.dropFunction(req), status -> !updateConfigNodeLeader(status));
×
692
  }
693

694
  @Override
695
  public TGetUDFTableResp getUDFTable() throws TException {
696
    return executeRemoteCallWithRetry(
×
697
        () -> client.getUDFTable(), resp -> !updateConfigNodeLeader(resp.status));
×
698
  }
699

700
  @Override
701
  public TGetJarInListResp getUDFJar(TGetJarInListReq req) throws TException {
702
    return executeRemoteCallWithRetry(
×
703
        () -> client.getUDFJar(req), resp -> !updateConfigNodeLeader(resp.status));
×
704
  }
705

706
  @Override
707
  public TSStatus createTrigger(TCreateTriggerReq req) throws TException {
708
    return executeRemoteCallWithRetry(
×
709
        () -> client.createTrigger(req), status -> !updateConfigNodeLeader(status));
×
710
  }
711

712
  @Override
713
  public TSStatus dropTrigger(TDropTriggerReq req) throws TException {
714
    return executeRemoteCallWithRetry(
×
715
        () -> client.dropTrigger(req), status -> !updateConfigNodeLeader(status));
×
716
  }
717

718
  @Override
719
  public TGetLocationForTriggerResp getLocationOfStatefulTrigger(String triggerName)
720
      throws TException {
721
    return executeRemoteCallWithRetry(
×
722
        () -> client.getLocationOfStatefulTrigger(triggerName),
×
723
        resp -> !updateConfigNodeLeader(resp.status));
×
724
  }
725

726
  public TGetTriggerTableResp getTriggerTable() throws TException {
727
    return executeRemoteCallWithRetry(
×
728
        () -> client.getTriggerTable(), resp -> !updateConfigNodeLeader(resp.status));
×
729
  }
730

731
  @Override
732
  public TGetTriggerTableResp getStatefulTriggerTable() throws TException {
733
    return executeRemoteCallWithRetry(
×
734
        () -> client.getStatefulTriggerTable(), resp -> !updateConfigNodeLeader(resp.status));
×
735
  }
736

737
  @Override
738
  public TGetJarInListResp getTriggerJar(TGetJarInListReq req) throws TException {
739
    return executeRemoteCallWithRetry(
×
740
        () -> client.getTriggerJar(req), resp -> !updateConfigNodeLeader(resp.status));
×
741
  }
742

743
  @Override
744
  public TSStatus createPipePlugin(TCreatePipePluginReq req) throws TException {
745
    return executeRemoteCallWithRetry(
×
746
        () -> client.createPipePlugin(req), status -> !updateConfigNodeLeader(status));
×
747
  }
748

749
  @Override
750
  public TSStatus dropPipePlugin(TDropPipePluginReq req) throws TException {
751
    return executeRemoteCallWithRetry(
×
752
        () -> client.dropPipePlugin(req), status -> !updateConfigNodeLeader(status));
×
753
  }
754

755
  @Override
756
  public TGetPipePluginTableResp getPipePluginTable() throws TException {
757
    return executeRemoteCallWithRetry(
×
758
        () -> client.getPipePluginTable(), resp -> !updateConfigNodeLeader(resp.status));
×
759
  }
760

761
  @Override
762
  public TGetJarInListResp getPipePluginJar(TGetJarInListReq req) throws TException {
763
    return executeRemoteCallWithRetry(
×
764
        () -> client.getPipePluginJar(req), resp -> !updateConfigNodeLeader(resp.status));
×
765
  }
766

767
  @Override
768
  public TSStatus createSchemaTemplate(TCreateSchemaTemplateReq req) throws TException {
769
    return executeRemoteCallWithRetry(
×
770
        () -> client.createSchemaTemplate(req), status -> !updateConfigNodeLeader(status));
×
771
  }
772

773
  @Override
774
  public TGetAllTemplatesResp getAllTemplates() throws TException {
775
    return executeRemoteCallWithRetry(
×
776
        () -> client.getAllTemplates(), resp -> !updateConfigNodeLeader(resp.status));
×
777
  }
778

779
  @Override
780
  public TGetTemplateResp getTemplate(String req) throws TException {
781
    return executeRemoteCallWithRetry(
×
782
        () -> client.getTemplate(req), resp -> !updateConfigNodeLeader(resp.status));
×
783
  }
784

785
  @Override
786
  public TSStatus setSchemaTemplate(TSetSchemaTemplateReq req) throws TException {
787
    return executeRemoteCallWithRetry(
×
788
        () -> client.setSchemaTemplate(req), status -> !updateConfigNodeLeader(status));
×
789
  }
790

791
  @Override
792
  public TGetPathsSetTemplatesResp getPathsSetTemplate(String req) throws TException {
793
    return executeRemoteCallWithRetry(
×
794
        () -> client.getPathsSetTemplate(req), resp -> !updateConfigNodeLeader(resp.status));
×
795
  }
796

797
  @Override
798
  public TSStatus deactivateSchemaTemplate(TDeactivateSchemaTemplateReq req) throws TException {
799
    return executeRemoteCallWithRetry(
×
800
        () -> client.deactivateSchemaTemplate(req), status -> !updateConfigNodeLeader(status));
×
801
  }
802

803
  @Override
804
  public TSStatus unsetSchemaTemplate(TUnsetSchemaTemplateReq req) throws TException {
805
    return executeRemoteCallWithRetry(
×
806
        () -> client.unsetSchemaTemplate(req), status -> !updateConfigNodeLeader(status));
×
807
  }
808

809
  @Override
810
  public TSStatus dropSchemaTemplate(String req) throws TException {
811
    return executeRemoteCallWithRetry(
×
812
        () -> client.dropSchemaTemplate(req), status -> !updateConfigNodeLeader(status));
×
813
  }
814

815
  @Override
816
  public TSStatus alterSchemaTemplate(TAlterSchemaTemplateReq req) throws TException {
817
    return executeRemoteCallWithRetry(
×
818
        () -> client.alterSchemaTemplate(req), status -> !updateConfigNodeLeader(status));
×
819
  }
820

821
  @Override
822
  public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) throws TException {
823
    return executeRemoteCallWithRetry(
×
824
        () -> client.deleteTimeSeries(req), status -> !updateConfigNodeLeader(status));
×
825
  }
826

827
  @Override
828
  public TSStatus deleteLogicalView(TDeleteLogicalViewReq req) throws TException {
829
    return executeRemoteCallWithRetry(
×
830
        () -> client.deleteLogicalView(req), status -> !updateConfigNodeLeader(status));
×
831
  }
832

833
  @Override
834
  public TSStatus alterLogicalView(TAlterLogicalViewReq req) throws TException {
835
    return executeRemoteCallWithRetry(
×
836
        () -> client.alterLogicalView(req), status -> !updateConfigNodeLeader(status));
×
837
  }
838

839
  @Override
840
  public TSStatus createPipeSink(TPipeSinkInfo req) throws TException {
841
    return executeRemoteCallWithRetry(
×
842
        () -> client.createPipeSink(req), status -> !updateConfigNodeLeader(status));
×
843
  }
844

845
  @Override
846
  public TSStatus dropPipeSink(TDropPipeSinkReq req) throws TException {
847
    return executeRemoteCallWithRetry(
×
848
        () -> client.dropPipeSink(req), status -> !updateConfigNodeLeader(status));
×
849
  }
850

851
  @Override
852
  public TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req) throws TException {
853
    return executeRemoteCallWithRetry(
×
854
        () -> client.getPipeSink(req), resp -> !updateConfigNodeLeader(resp.status));
×
855
  }
856

857
  @Override
858
  public TSStatus createPipe(TCreatePipeReq req) throws TException {
859
    return executeRemoteCallWithRetry(
×
860
        () -> client.createPipe(req), status -> !updateConfigNodeLeader(status));
×
861
  }
862

863
  @Override
864
  public TSStatus startPipe(String pipeName) throws TException {
865
    return executeRemoteCallWithRetry(
×
866
        () -> client.startPipe(pipeName), status -> !updateConfigNodeLeader(status));
×
867
  }
868

869
  @Override
870
  public TSStatus stopPipe(String pipeName) throws TException {
871
    return executeRemoteCallWithRetry(
×
872
        () -> client.stopPipe(pipeName), status -> !updateConfigNodeLeader(status));
×
873
  }
874

875
  @Override
876
  public TSStatus dropPipe(String pipeName) throws TException {
877
    return executeRemoteCallWithRetry(
×
878
        () -> client.dropPipe(pipeName), status -> !updateConfigNodeLeader(status));
×
879
  }
880

881
  @Override
882
  public TShowPipeResp showPipe(TShowPipeReq req) throws TException {
883
    return executeRemoteCallWithRetry(
×
884
        () -> client.showPipe(req), resp -> !updateConfigNodeLeader(resp.status));
×
885
  }
886

887
  @Override
888
  public TGetAllPipeInfoResp getAllPipeInfo() throws TException {
889
    return executeRemoteCallWithRetry(
×
890
        () -> client.getAllPipeInfo(), resp -> !updateConfigNodeLeader(resp.status));
×
891
  }
892

893
  @Override
894
  public TGetRegionIdResp getRegionId(TGetRegionIdReq req) throws TException {
895
    return executeRemoteCallWithRetry(
×
896
        () -> client.getRegionId(req), resp -> !updateConfigNodeLeader(resp.status));
×
897
  }
898

899
  @Override
900
  public TGetTimeSlotListResp getTimeSlotList(TGetTimeSlotListReq req) throws TException {
901
    return executeRemoteCallWithRetry(
×
902
        () -> client.getTimeSlotList(req), resp -> !updateConfigNodeLeader(resp.status));
×
903
  }
904

905
  public TCountTimeSlotListResp countTimeSlotList(TCountTimeSlotListReq req) throws TException {
906
    return executeRemoteCallWithRetry(
×
907
        () -> client.countTimeSlotList(req), resp -> !updateConfigNodeLeader(resp.status));
×
908
  }
909

910
  @Override
911
  public TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req) throws TException {
912
    return executeRemoteCallWithRetry(
×
913
        () -> client.getSeriesSlotList(req), resp -> !updateConfigNodeLeader(resp.status));
×
914
  }
915

916
  @Override
917
  public TSStatus migrateRegion(TMigrateRegionReq req) throws TException {
918
    return executeRemoteCallWithRetry(
×
919
        () -> client.migrateRegion(req), status -> !updateConfigNodeLeader(status));
×
920
  }
921

922
  @Override
923
  public TSStatus createCQ(TCreateCQReq req) throws TException {
924
    return executeRemoteCallWithRetry(
×
925
        () -> client.createCQ(req), status -> !updateConfigNodeLeader(status));
×
926
  }
927

928
  @Override
929
  public TSStatus dropCQ(TDropCQReq req) throws TException {
930
    return executeRemoteCallWithRetry(
×
931
        () -> client.dropCQ(req), status -> !updateConfigNodeLeader(status));
×
932
  }
933

934
  @Override
935
  public TShowCQResp showCQ() throws TException {
936
    return executeRemoteCallWithRetry(
×
937
        () -> client.showCQ(), resp -> !updateConfigNodeLeader(resp.status));
×
938
  }
939

940
  @Override
941
  public TSStatus createModel(TCreateModelReq req) throws TException {
942
    return executeRemoteCallWithRetry(
×
943
        () -> client.createModel(req), status -> !updateConfigNodeLeader(status));
×
944
  }
945

946
  @Override
947
  public TSStatus dropModel(TDropModelReq req) throws TException {
948
    return executeRemoteCallWithRetry(
×
949
        () -> client.dropModel(req), status -> !updateConfigNodeLeader(status));
×
950
  }
951

952
  @Override
953
  public TShowModelResp showModel(TShowModelReq req) throws TException {
954
    return executeRemoteCallWithRetry(
×
955
        () -> client.showModel(req), resp -> !updateConfigNodeLeader(resp.status));
×
956
  }
957

958
  @Override
959
  public TShowTrialResp showTrial(TShowTrialReq req) throws TException {
960
    return executeRemoteCallWithRetry(
×
961
        () -> client.showTrial(req), resp -> !updateConfigNodeLeader(resp.status));
×
962
  }
963

964
  @Override
965
  public TSStatus updateModelInfo(TUpdateModelInfoReq req) throws TException {
966
    throw new TException(new UnsupportedOperationException().getCause());
×
967
  }
968

969
  @Override
970
  public TSStatus updateModelState(TUpdateModelStateReq req) throws TException {
971
    throw new TException(new UnsupportedOperationException().getCause());
×
972
  }
973

974
  @Override
975
  public TGetModelInfoResp getModelInfo(TGetModelInfoReq req) throws TException {
976
    for (int i = 0; i < RETRY_NUM; i++) {
×
977
      try {
978
        TGetModelInfoResp resp = client.getModelInfo(req);
×
979
        if (!updateConfigNodeLeader(resp.getStatus())) {
×
980
          return resp;
×
981
        }
982
      } catch (TException e) {
×
983
        configLeader = null;
×
984
      }
×
985
      waitAndReconnect();
×
986
    }
987
    throw new TException(MSG_RECONNECTION_FAIL);
×
988
  }
989

990
  @Override
991
  public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) throws TException {
992
    return executeRemoteCallWithRetry(
×
993
        () -> client.setSpaceQuota(req), status -> !updateConfigNodeLeader(status));
×
994
  }
995

996
  @Override
997
  public TSpaceQuotaResp showSpaceQuota(List<String> databases) throws TException {
998
    return executeRemoteCallWithRetry(
×
999
        () -> client.showSpaceQuota(databases), resp -> !updateConfigNodeLeader(resp.status));
×
1000
  }
1001

1002
  @Override
1003
  public TSpaceQuotaResp getSpaceQuota() throws TException {
1004
    return executeRemoteCallWithRetry(
×
1005
        () -> client.getSpaceQuota(), resp -> !updateConfigNodeLeader(resp.status));
×
1006
  }
1007

1008
  @Override
1009
  public TSStatus setThrottleQuota(TSetThrottleQuotaReq req) throws TException {
1010
    return executeRemoteCallWithRetry(
×
1011
        () -> client.setThrottleQuota(req), status -> !updateConfigNodeLeader(status));
×
1012
  }
1013

1014
  @Override
1015
  public TThrottleQuotaResp showThrottleQuota(TShowThrottleReq req) throws TException {
1016
    return executeRemoteCallWithRetry(
×
1017
        () -> client.showThrottleQuota(req), resp -> !updateConfigNodeLeader(resp.status));
×
1018
  }
1019

1020
  @Override
1021
  public TThrottleQuotaResp getThrottleQuota() throws TException {
1022
    return executeRemoteCallWithRetry(
×
1023
        () -> client.getThrottleQuota(), resp -> !updateConfigNodeLeader(resp.status));
×
1024
  }
1025

1026
  public static class Factory extends ThriftClientFactory<ConfigRegionId, ConfigNodeClient> {
1027

1028
    public Factory(
1029
        ClientManager<ConfigRegionId, ConfigNodeClient> clientManager,
1030
        ThriftClientProperty thriftClientProperty) {
1031
      super(clientManager, thriftClientProperty);
1✔
1032
    }
1✔
1033

1034
    @Override
1035
    public void destroyObject(
1036
        ConfigRegionId configRegionId, PooledObject<ConfigNodeClient> pooledObject) {
1037
      pooledObject.getObject().invalidate();
×
1038
    }
×
1039

1040
    @Override
1041
    public PooledObject<ConfigNodeClient> makeObject(ConfigRegionId configRegionId)
1042
        throws Exception {
1043
      return new DefaultPooledObject<>(
1✔
1044
          SyncThriftClientWithErrorHandler.newErrorHandler(
×
1045
              ConfigNodeClient.class,
1046
              ConfigNodeClient.class.getConstructor(
1✔
1047
                  List.class, thriftClientProperty.getClass(), clientManager.getClass()),
1✔
1048
              ConfigNodeInfo.getInstance().getLatestConfigNodes(),
1✔
1049
              thriftClientProperty,
1050
              clientManager));
1051
    }
1052

1053
    @Override
1054
    public boolean validateObject(
1055
        ConfigRegionId configRegionId, PooledObject<ConfigNodeClient> pooledObject) {
1056
      return Optional.ofNullable(pooledObject.getObject().getTransport())
×
1057
          .map(TTransport::isOpen)
×
1058
          .orElse(false);
×
1059
    }
1060
  }
1061
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc