• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9673

pending completion
#9673

push

travis_ci

web-flow
Fixes #10676: Refactoring and warning messages improvement for ConfigNodeClient (#10665)

206 of 206 new or added lines in 1 file covered. (100.0%)

79125 of 164945 relevant lines covered (47.97%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

7.72
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.protocol.client;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
24
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
25
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
26
import org.apache.iotdb.common.rpc.thrift.TSStatus;
27
import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
28
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
29
import org.apache.iotdb.common.rpc.thrift.TSetThrottleQuotaReq;
30
import org.apache.iotdb.commons.client.ClientManager;
31
import org.apache.iotdb.commons.client.ThriftClient;
32
import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
33
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
34
import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
35
import org.apache.iotdb.commons.consensus.ConfigRegionId;
36
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
37
import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
38
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
39
import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
40
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
41
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
42
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
43
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
44
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
45
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
46
import org.apache.iotdb.confignode.rpc.thrift.TCountDatabaseResp;
47
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
48
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListResp;
49
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
50
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
51
import org.apache.iotdb.confignode.rpc.thrift.TCreateModelReq;
52
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
53
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
54
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
55
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
56
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp;
57
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
58
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
59
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
60
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
61
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
62
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
63
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
64
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
65
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
66
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchemaResp;
67
import org.apache.iotdb.confignode.rpc.thrift.TDeactivateSchemaTemplateReq;
68
import org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabaseReq;
69
import org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabasesReq;
70
import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
71
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
72
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
73
import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
74
import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
75
import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
76
import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
77
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
78
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
79
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
80
import org.apache.iotdb.confignode.rpc.thrift.TGetDataNodeLocationsResp;
81
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
82
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
83
import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
84
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
85
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
86
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
87
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
88
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
89
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
90
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
91
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
92
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
93
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
94
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
95
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
96
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
97
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
98
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
99
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
100
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
101
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
102
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
103
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
104
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
105
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
106
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
107
import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
108
import org.apache.iotdb.confignode.rpc.thrift.TSetDataReplicationFactorReq;
109
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaReplicationFactorReq;
110
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
111
import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq;
112
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
113
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
114
import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
115
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
116
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
117
import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
118
import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
119
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
120
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
121
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
122
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
123
import org.apache.iotdb.confignode.rpc.thrift.TShowThrottleReq;
124
import org.apache.iotdb.confignode.rpc.thrift.TShowTrailReq;
125
import org.apache.iotdb.confignode.rpc.thrift.TShowTrailResp;
126
import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
127
import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
128
import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
129
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
130
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
131
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
132
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelStateReq;
133
import org.apache.iotdb.db.conf.IoTDBConfig;
134
import org.apache.iotdb.db.conf.IoTDBDescriptor;
135
import org.apache.iotdb.rpc.RpcTransportFactory;
136
import org.apache.iotdb.rpc.TSStatusCode;
137

138
import org.apache.commons.pool2.PooledObject;
139
import org.apache.commons.pool2.impl.DefaultPooledObject;
140
import org.apache.thrift.TException;
141
import org.apache.thrift.transport.TTransport;
142
import org.apache.thrift.transport.TTransportException;
143
import org.slf4j.Logger;
144
import org.slf4j.LoggerFactory;
145

146
import java.util.ArrayList;
147
import java.util.List;
148
import java.util.Optional;
149
import java.util.function.Function;
150

151
public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClient, AutoCloseable {
152

153
  private static final Logger logger = LoggerFactory.getLogger(ConfigNodeClient.class);
1✔
154

155
  private static final int RETRY_NUM = 5;
156

157
  public static final String MSG_RECONNECTION_FAIL =
158
      "Fail to connect to any config node. Please check status of ConfigNodes";
159

160
  private static final String MSG_RECONNECTION_DATANODE_FAIL =
161
      "Failed to connect to ConfigNode %s from DataNode %s when executing %s, Exception:";
162
  private static final int RETRY_INTERVAL_MS = 1000;
163

164
  private final ThriftClientProperty property;
165

166
  private IConfigNodeRPCService.Iface client;
167

168
  private TTransport transport;
169

170
  private TEndPoint configLeader;
171

172
  private List<TEndPoint> configNodes;
173

174
  private TEndPoint configNode;
175

176
  private int cursor = 0;
1✔
177

178
  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
1✔
179

180
  ClientManager<ConfigRegionId, ConfigNodeClient> clientManager;
181

182
  ConfigRegionId configRegionId = ConfigNodeInfo.CONFIG_REGION_ID;
1✔
183

184
  /**
   * Stores the collaborators and connects right away through {@link #init()}.
   *
   * @param configNodes candidate ConfigNode endpoints
   * @param property thrift client settings
   * @param clientManager manager this client is returned to on {@link #close()}
   * @throws TException if no connection could be established
   */
  public ConfigNodeClient(
      List<TEndPoint> configNodes,
      ThriftClientProperty property,
      ClientManager<ConfigRegionId, ConfigNodeClient> clientManager)
      throws TException {
    this.clientManager = clientManager;
    this.property = property;
    this.configNodes = configNodes;

    // NOTE(review): init() is public and overridable; it runs before any subclass state exists.
    init();
  }
×
195

196
  public void init() throws TException {
197
    try {
198
      tryToConnect();
×
199
    } catch (TException e) {
1✔
200
      // Can not connect to each config node
201
      syncLatestConfigNodeList();
1✔
202
      tryToConnect();
×
203
    }
×
204
  }
×
205

206
  public void connect(TEndPoint endpoint) throws TException {
207
    try {
208
      transport =
×
209
          RpcTransportFactory.INSTANCE.getTransport(
×
210
              // As there is a try-catch already, we do not need to use TSocket.wrap
211
              endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs());
×
212
      if (!transport.isOpen()) {
×
213
        transport.open();
×
214
      }
215
      configNode = endpoint;
×
216
    } catch (TTransportException e) {
×
217
      throw new TException(e);
×
218
    }
×
219

220
    client = new IConfigNodeRPCService.Client(property.getProtocolFactory().getProtocol(transport));
×
221
  }
×
222

223
  private void waitAndReconnect() throws TException {
224
    try {
225
      // Wait to start the next try
226
      Thread.sleep(RETRY_INTERVAL_MS);
×
227
    } catch (InterruptedException e) {
×
228
      Thread.currentThread().interrupt();
×
229
      throw new TException(
×
230
          "Unexpected interruption when waiting to retry to connect to ConfigNode");
231
    }
×
232

233
    try {
234
      tryToConnect();
×
235
    } catch (TException e) {
×
236
      // Can not connect to each config node
237
      syncLatestConfigNodeList();
×
238
      tryToConnect();
×
239
    }
×
240
  }
×
241

242
  private void tryToConnect() throws TException {
243
    if (configLeader != null) {
1✔
244
      try {
245
        connect(configLeader);
×
246
        return;
×
247
      } catch (TException e) {
×
248
        logger.warn("The current node may have been down {},try next node", configLeader);
×
249
        configLeader = null;
×
250
      }
251
    }
252

253
    if (transport != null) {
1✔
254
      transport.close();
×
255
    }
256

257
    for (int tryHostNum = 0; tryHostNum < configNodes.size(); tryHostNum++) {
1✔
258
      cursor = (cursor + 1) % configNodes.size();
×
259
      TEndPoint tryEndpoint = configNodes.get(cursor);
×
260

261
      try {
262
        connect(tryEndpoint);
×
263
        return;
×
264
      } catch (TException e) {
×
265
        logger.warn("The current node may have been down {},try next node", tryEndpoint);
×
266
      }
267
    }
268

269
    throw new TException(MSG_RECONNECTION_FAIL);
1✔
270
  }
271

272
  public TTransport getTransport() {
273
    return transport;
×
274
  }
275

276
  public void syncLatestConfigNodeList() {
277
    configNodes = ConfigNodeInfo.getInstance().getLatestConfigNodes();
1✔
278
    cursor = 0;
1✔
279
  }
1✔
280

281
  @Override
282
  public void close() {
283
    clientManager.returnClient(configRegionId, this);
×
284
  }
×
285

286
  @Override
287
  public void invalidate() {
288
    Optional.ofNullable(transport).ifPresent(TTransport::close);
1✔
289
  }
1✔
290

291
  @Override
292
  public void invalidateAll() {
293
    clientManager.clear(ConfigNodeInfo.CONFIG_REGION_ID);
×
294
  }
×
295

296
  @Override
297
  public boolean printLogWhenEncounterException() {
298
    return property.isPrintLogWhenEncounterException();
×
299
  }
300

301
  private boolean updateConfigNodeLeader(TSStatus status) {
302
    if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
×
303
      if (status.isSetRedirectNode()) {
×
304
        configLeader =
×
305
            new TEndPoint(status.getRedirectNode().getIp(), status.getRedirectNode().getPort());
×
306
      } else {
307
        configLeader = null;
×
308
      }
309
      logger.warn(
×
310
          "Failed to connect to ConfigNode {} from DataNode {}, because the current node is not leader, try next node",
311
          configNode,
312
          config.getAddressAndPort());
×
313
      return true;
×
314
    }
315
    return false;
×
316
  }
317

318
  /**
319
   * The frame of execute RPC, include logic of retry and exception handling.
320
   *
321
   * @param call which rpc should call
322
   * @param check check the rpc's result
323
   * @return rpc's result
324
   * @param <T> the type of rpc result
325
   * @throws TException if fails more than RETRY_NUM times, throw TException(MSG_RECONNECTION_FAIL)
326
   */
327
  private <T> T executeRemoteCallWithRetry(Operation<T> call, Function<T, Boolean> check)
328
      throws TException {
329
    for (int i = 0; i < RETRY_NUM; i++) {
×
330
      try {
331
        T result = call.execute();
×
332
        if (check.apply(result)) {
×
333
          return result;
×
334
        }
335
      } catch (TException e) {
×
336
        String message =
×
337
            String.format(
×
338
                MSG_RECONNECTION_DATANODE_FAIL,
339
                configNode,
340
                config.getAddressAndPort(),
×
341
                Thread.currentThread().getStackTrace()[2].getMethodName());
×
342
        logger.warn(message, e);
×
343
        configLeader = null;
×
344
      }
×
345
      waitAndReconnect();
×
346
    }
347
    throw new TException(MSG_RECONNECTION_FAIL);
×
348
  }
349

350
  @FunctionalInterface
351
  private interface Operation<T> {
352
    T execute() throws TException;
353
  }
354

355
  @Override
356
  public TSystemConfigurationResp getSystemConfiguration() throws TException {
357
    return executeRemoteCallWithRetry(
×
358
        () -> client.getSystemConfiguration(), resp -> !updateConfigNodeLeader(resp.status));
×
359
  }
360

361
  @Override
362
  public TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req) throws TException {
363
    for (int i = 0; i < RETRY_NUM; i++) {
×
364
      try {
365
        TDataNodeRegisterResp resp = client.registerDataNode(req);
×
366

367
        if (!updateConfigNodeLeader(resp.status)) {
×
368
          return resp;
×
369
        }
370

371
        // set latest config node list
372
        List<TEndPoint> newConfigNodes = new ArrayList<>();
×
373
        for (TConfigNodeLocation configNodeLocation : resp.getConfigNodeList()) {
×
374
          newConfigNodes.add(configNodeLocation.getInternalEndPoint());
×
375
        }
×
376
        configNodes = newConfigNodes;
×
377
      } catch (TException e) {
×
378
        String message =
×
379
            String.format(
×
380
                MSG_RECONNECTION_DATANODE_FAIL,
381
                configNode,
382
                config.getAddressAndPort(),
×
383
                Thread.currentThread().getStackTrace()[1].getMethodName());
×
384
        logger.warn(message, e);
×
385
        configLeader = null;
×
386
      }
×
387
      waitAndReconnect();
×
388
    }
389
    throw new TException(MSG_RECONNECTION_FAIL);
×
390
  }
391

392
  @Override
393
  public TDataNodeRestartResp restartDataNode(TDataNodeRestartReq req) throws TException {
394
    return executeRemoteCallWithRetry(
×
395
        () -> client.restartDataNode(req), resp -> !updateConfigNodeLeader(resp.status));
×
396
  }
397

398
  @Override
399
  public TDataNodeRemoveResp removeDataNode(TDataNodeRemoveReq req) throws TException {
400
    return executeRemoteCallWithRetry(
×
401
        () -> client.removeDataNode(req), resp -> !updateConfigNodeLeader(resp.status));
×
402
  }
403

404
  @Override
405
  public TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation) throws TException {
406
    return executeRemoteCallWithRetry(
×
407
        () -> client.reportDataNodeShutdown(dataNodeLocation),
×
408
        status -> !updateConfigNodeLeader(status));
×
409
  }
410

411
  @Override
412
  public TDataNodeConfigurationResp getDataNodeConfiguration(int dataNodeId) throws TException {
413
    return executeRemoteCallWithRetry(
×
414
        () -> client.getDataNodeConfiguration(dataNodeId),
×
415
        resp -> !updateConfigNodeLeader(resp.status));
×
416
  }
417

418
  @Override
419
  public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req) throws TException {
420
    return executeRemoteCallWithRetry(
×
421
        () -> client.reportRegionMigrateResult(req), status -> !updateConfigNodeLeader(status));
×
422
  }
423

424
  @Override
425
  public TShowClusterResp showCluster() throws TException {
426
    return executeRemoteCallWithRetry(
×
427
        () -> client.showCluster(), resp -> !updateConfigNodeLeader(resp.status));
×
428
  }
429

430
  @Override
431
  public TShowVariablesResp showVariables() throws TException {
432
    return executeRemoteCallWithRetry(
×
433
        () -> client.showVariables(), resp -> !updateConfigNodeLeader(resp.status));
×
434
  }
435

436
  @Override
437
  public TSStatus setDatabase(TDatabaseSchema databaseSchema) throws TException {
438
    return executeRemoteCallWithRetry(
×
439
        () -> client.setDatabase(databaseSchema), status -> !updateConfigNodeLeader(status));
×
440
  }
441

442
  @Override
443
  public TSStatus alterDatabase(TDatabaseSchema databaseSchema) throws TException {
444
    return executeRemoteCallWithRetry(
×
445
        () -> client.alterDatabase(databaseSchema), status -> !updateConfigNodeLeader(status));
×
446
  }
447

448
  @Override
449
  public TSStatus deleteDatabase(TDeleteDatabaseReq req) throws TException {
450
    return executeRemoteCallWithRetry(
×
451
        () -> client.deleteDatabase(req), status -> !updateConfigNodeLeader(status));
×
452
  }
453

454
  @Override
455
  public TSStatus deleteDatabases(TDeleteDatabasesReq req) throws TException {
456
    return executeRemoteCallWithRetry(
×
457
        () -> client.deleteDatabases(req), status -> !updateConfigNodeLeader(status));
×
458
  }
459

460
  @Override
461
  public TCountDatabaseResp countMatchedDatabases(List<String> storageGroupPathPattern)
462
      throws TException {
463
    return executeRemoteCallWithRetry(
×
464
        () -> client.countMatchedDatabases(storageGroupPathPattern),
×
465
        resp -> !updateConfigNodeLeader(resp.status));
×
466
  }
467

468
  @Override
469
  public TDatabaseSchemaResp getMatchedDatabaseSchemas(List<String> storageGroupPathPattern)
470
      throws TException {
471
    return executeRemoteCallWithRetry(
×
472
        () -> client.getMatchedDatabaseSchemas(storageGroupPathPattern),
×
473
        resp -> !updateConfigNodeLeader(resp.status));
×
474
  }
475

476
  @Override
477
  public TSStatus setTTL(TSetTTLReq setTTLReq) throws TException {
478
    return executeRemoteCallWithRetry(
×
479
        () -> client.setTTL(setTTLReq), status -> !updateConfigNodeLeader(status));
×
480
  }
481

482
  @Override
483
  public TSStatus setSchemaReplicationFactor(TSetSchemaReplicationFactorReq req) throws TException {
484
    return executeRemoteCallWithRetry(
×
485
        () -> client.setSchemaReplicationFactor(req), status -> !updateConfigNodeLeader(status));
×
486
  }
487

488
  @Override
489
  public TSStatus setDataReplicationFactor(TSetDataReplicationFactorReq req) throws TException {
490
    return executeRemoteCallWithRetry(
×
491
        () -> client.setDataReplicationFactor(req), status -> !updateConfigNodeLeader(status));
×
492
  }
493

494
  @Override
495
  public TSStatus setTimePartitionInterval(TSetTimePartitionIntervalReq req) throws TException {
496
    return executeRemoteCallWithRetry(
×
497
        () -> client.setTimePartitionInterval(req), status -> !updateConfigNodeLeader(status));
×
498
  }
499

500
  @Override
501
  public TSchemaPartitionTableResp getSchemaPartitionTable(TSchemaPartitionReq req)
502
      throws TException {
503
    return executeRemoteCallWithRetry(
×
504
        () -> client.getSchemaPartitionTable(req), resp -> !updateConfigNodeLeader(resp.status));
×
505
  }
506

507
  @Override
508
  public TSchemaPartitionTableResp getOrCreateSchemaPartitionTable(TSchemaPartitionReq req)
509
      throws TException {
510
    return executeRemoteCallWithRetry(
×
511
        () -> client.getOrCreateSchemaPartitionTable(req),
×
512
        resp -> !updateConfigNodeLeader(resp.status));
×
513
  }
514

515
  @Override
516
  public TSchemaNodeManagementResp getSchemaNodeManagementPartition(TSchemaNodeManagementReq req)
517
      throws TException {
518
    return executeRemoteCallWithRetry(
×
519
        () -> client.getSchemaNodeManagementPartition(req),
×
520
        resp -> !updateConfigNodeLeader(resp.status));
×
521
  }
522

523
  @Override
524
  public TDataPartitionTableResp getDataPartitionTable(TDataPartitionReq req) throws TException {
525
    return executeRemoteCallWithRetry(
×
526
        () -> client.getDataPartitionTable(req), resp -> !updateConfigNodeLeader(resp.status));
×
527
  }
528

529
  @Override
530
  public TDataPartitionTableResp getOrCreateDataPartitionTable(TDataPartitionReq req)
531
      throws TException {
532
    return executeRemoteCallWithRetry(
×
533
        () -> client.getOrCreateDataPartitionTable(req),
×
534
        resp -> !updateConfigNodeLeader(resp.status));
×
535
  }
536

537
  @Override
538
  public TSStatus operatePermission(TAuthorizerReq req) throws TException {
539
    return executeRemoteCallWithRetry(
×
540
        () -> client.operatePermission(req), status -> !updateConfigNodeLeader(status));
×
541
  }
542

543
  @Override
544
  public TAuthorizerResp queryPermission(TAuthorizerReq req) throws TException {
545
    return executeRemoteCallWithRetry(
×
546
        () -> client.queryPermission(req), resp -> !updateConfigNodeLeader(resp.status));
×
547
  }
548

549
  @Override
550
  public TPermissionInfoResp login(TLoginReq req) throws TException {
551
    return executeRemoteCallWithRetry(
×
552
        () -> client.login(req), resp -> !updateConfigNodeLeader(resp.status));
×
553
  }
554

555
  @Override
556
  public TPermissionInfoResp checkUserPrivileges(TCheckUserPrivilegesReq req) throws TException {
557
    return executeRemoteCallWithRetry(
×
558
        () -> client.checkUserPrivileges(req), resp -> !updateConfigNodeLeader(resp.status));
×
559
  }
560

561
  @Override
562
  public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) throws TException {
563
    throw new TException("DataNode to ConfigNode client doesn't support registerConfigNode.");
×
564
  }
565

566
  @Override
567
  public TSStatus restartConfigNode(TConfigNodeRestartReq req) throws TException {
568
    throw new TException("DataNode to ConfigNode client doesn't support restartConfigNode.");
×
569
  }
570

571
  @Override
572
  public TSStatus addConsensusGroup(TAddConsensusGroupReq registerResp) throws TException {
573
    throw new TException("DataNode to ConfigNode client doesn't support addConsensusGroup.");
×
574
  }
575

576
  @Override
577
  public TSStatus notifyRegisterSuccess() throws TException {
578
    throw new TException("DataNode to ConfigNode client doesn't support notifyRegisterSuccess.");
×
579
  }
580

581
  @Override
582
  public TSStatus removeConfigNode(TConfigNodeLocation configNodeLocation) throws TException {
583
    throw new TException("DataNode to ConfigNode client doesn't support removeConfigNode.");
×
584
  }
585

586
  @Override
587
  public TSStatus deleteConfigNodePeer(TConfigNodeLocation configNodeLocation) throws TException {
588
    throw new TException("DataNode to ConfigNode client doesn't support removeConsensusGroup.");
×
589
  }
590

591
  @Override
592
  public TSStatus reportConfigNodeShutdown(TConfigNodeLocation configNodeLocation)
593
      throws TException {
594
    throw new TException("DataNode to ConfigNode client doesn't support reportConfigNodeShutdown.");
×
595
  }
596

597
  @Override
598
  public TSStatus stopConfigNode(TConfigNodeLocation configNodeLocation) throws TException {
599
    throw new TException("DataNode to ConfigNode client doesn't support stopConfigNode.");
×
600
  }
601

602
  @Override
603
  public TSStatus merge() throws TException {
604
    return executeRemoteCallWithRetry(
×
605
        () -> client.merge(), status -> !updateConfigNodeLeader(status));
×
606
  }
607

608
  @Override
609
  public TSStatus flush(TFlushReq req) throws TException {
610
    return executeRemoteCallWithRetry(
×
611
        () -> client.flush(req), status -> !updateConfigNodeLeader(status));
×
612
  }
613

614
  @Override
615
  public TSStatus clearCache() throws TException {
616
    return executeRemoteCallWithRetry(
×
617
        () -> client.clearCache(), status -> !updateConfigNodeLeader(status));
×
618
  }
619

620
  @Override
621
  public TSStatus loadConfiguration() throws TException {
622
    return executeRemoteCallWithRetry(
×
623
        () -> client.loadConfiguration(), status -> !updateConfigNodeLeader(status));
×
624
  }
625

626
  @Override
627
  public TSStatus setSystemStatus(String systemStatus) throws TException {
628
    return executeRemoteCallWithRetry(
×
629
        () -> client.setSystemStatus(systemStatus), status -> !updateConfigNodeLeader(status));
×
630
  }
631

632
  @Override
633
  public TSStatus setDataNodeStatus(TSetDataNodeStatusReq req) throws TException {
634
    throw new TException("DataNode to ConfigNode client doesn't support setDataNodeStatus.");
×
635
  }
636

637
  @Override
638
  public TSStatus killQuery(String queryId, int dataNodeId) throws TException {
639
    return executeRemoteCallWithRetry(
×
640
        () -> client.killQuery(queryId, dataNodeId), status -> !updateConfigNodeLeader(status));
×
641
  }
642

643
  @Override
644
  public TGetDataNodeLocationsResp getRunningDataNodeLocations() throws TException {
645
    return executeRemoteCallWithRetry(
×
646
        () -> client.getRunningDataNodeLocations(), resp -> !updateConfigNodeLeader(resp.status));
×
647
  }
648

649
  @Override
650
  public TShowRegionResp showRegion(TShowRegionReq req) throws TException {
651
    return executeRemoteCallWithRetry(
×
652
        () -> client.showRegion(req), resp -> !updateConfigNodeLeader(resp.status));
×
653
  }
654

655
  @Override
656
  public TShowDataNodesResp showDataNodes() throws TException {
657
    return executeRemoteCallWithRetry(
×
658
        () -> client.showDataNodes(), resp -> !updateConfigNodeLeader(resp.status));
×
659
  }
660

661
  @Override
662
  public TShowConfigNodesResp showConfigNodes() throws TException {
663
    return executeRemoteCallWithRetry(
×
664
        () -> client.showConfigNodes(), resp -> !updateConfigNodeLeader(resp.status));
×
665
  }
666

667
  @Override
668
  public TShowDatabaseResp showDatabase(List<String> storageGroupPathPattern) throws TException {
669
    return executeRemoteCallWithRetry(
×
670
        () -> client.showDatabase(storageGroupPathPattern),
×
671
        resp -> !updateConfigNodeLeader(resp.status));
×
672
  }
673

674
  @Override
675
  public TRegionRouteMapResp getLatestRegionRouteMap() throws TException {
676
    return executeRemoteCallWithRetry(
×
677
        () -> client.getLatestRegionRouteMap(), resp -> !updateConfigNodeLeader(resp.status));
×
678
  }
679

680
  @Override
681
  public long getConfigNodeHeartBeat(long timestamp) throws TException {
682
    throw new TException("DataNode to ConfigNode client doesn't support getConfigNodeHeartBeat.");
×
683
  }
684

685
  @Override
686
  public TSStatus createFunction(TCreateFunctionReq req) throws TException {
687
    return executeRemoteCallWithRetry(
×
688
        () -> client.createFunction(req), status -> !updateConfigNodeLeader(status));
×
689
  }
690

691
  @Override
692
  public TSStatus dropFunction(TDropFunctionReq req) throws TException {
693
    return executeRemoteCallWithRetry(
×
694
        () -> client.dropFunction(req), status -> !updateConfigNodeLeader(status));
×
695
  }
696

697
  @Override
698
  public TGetUDFTableResp getUDFTable() throws TException {
699
    return executeRemoteCallWithRetry(
×
700
        () -> client.getUDFTable(), resp -> !updateConfigNodeLeader(resp.status));
×
701
  }
702

703
  @Override
704
  public TGetJarInListResp getUDFJar(TGetJarInListReq req) throws TException {
705
    return executeRemoteCallWithRetry(
×
706
        () -> client.getUDFJar(req), resp -> !updateConfigNodeLeader(resp.status));
×
707
  }
708

709
  @Override
710
  public TSStatus createTrigger(TCreateTriggerReq req) throws TException {
711
    return executeRemoteCallWithRetry(
×
712
        () -> client.createTrigger(req), status -> !updateConfigNodeLeader(status));
×
713
  }
714

715
  @Override
716
  public TSStatus dropTrigger(TDropTriggerReq req) throws TException {
717
    return executeRemoteCallWithRetry(
×
718
        () -> client.dropTrigger(req), status -> !updateConfigNodeLeader(status));
×
719
  }
720

721
  @Override
722
  public TGetLocationForTriggerResp getLocationOfStatefulTrigger(String triggerName)
723
      throws TException {
724
    return executeRemoteCallWithRetry(
×
725
        () -> client.getLocationOfStatefulTrigger(triggerName),
×
726
        resp -> !updateConfigNodeLeader(resp.status));
×
727
  }
728

729
  public TGetTriggerTableResp getTriggerTable() throws TException {
730
    return executeRemoteCallWithRetry(
×
731
        () -> client.getTriggerTable(), resp -> !updateConfigNodeLeader(resp.status));
×
732
  }
733

734
  @Override
735
  public TGetTriggerTableResp getStatefulTriggerTable() throws TException {
736
    return executeRemoteCallWithRetry(
×
737
        () -> client.getStatefulTriggerTable(), resp -> !updateConfigNodeLeader(resp.status));
×
738
  }
739

740
  @Override
741
  public TGetJarInListResp getTriggerJar(TGetJarInListReq req) throws TException {
742
    return executeRemoteCallWithRetry(
×
743
        () -> client.getTriggerJar(req), resp -> !updateConfigNodeLeader(resp.status));
×
744
  }
745

746
  @Override
747
  public TSStatus createPipePlugin(TCreatePipePluginReq req) throws TException {
748
    return executeRemoteCallWithRetry(
×
749
        () -> client.createPipePlugin(req), status -> !updateConfigNodeLeader(status));
×
750
  }
751

752
  @Override
753
  public TSStatus dropPipePlugin(TDropPipePluginReq req) throws TException {
754
    return executeRemoteCallWithRetry(
×
755
        () -> client.dropPipePlugin(req), status -> !updateConfigNodeLeader(status));
×
756
  }
757

758
  @Override
759
  public TGetPipePluginTableResp getPipePluginTable() throws TException {
760
    return executeRemoteCallWithRetry(
×
761
        () -> client.getPipePluginTable(), resp -> !updateConfigNodeLeader(resp.status));
×
762
  }
763

764
  @Override
765
  public TGetJarInListResp getPipePluginJar(TGetJarInListReq req) throws TException {
766
    return executeRemoteCallWithRetry(
×
767
        () -> client.getPipePluginJar(req), resp -> !updateConfigNodeLeader(resp.status));
×
768
  }
769

770
  @Override
771
  public TSStatus createSchemaTemplate(TCreateSchemaTemplateReq req) throws TException {
772
    return executeRemoteCallWithRetry(
×
773
        () -> client.createSchemaTemplate(req), status -> !updateConfigNodeLeader(status));
×
774
  }
775

776
  @Override
777
  public TGetAllTemplatesResp getAllTemplates() throws TException {
778
    return executeRemoteCallWithRetry(
×
779
        () -> client.getAllTemplates(), resp -> !updateConfigNodeLeader(resp.status));
×
780
  }
781

782
  @Override
783
  public TGetTemplateResp getTemplate(String req) throws TException {
784
    return executeRemoteCallWithRetry(
×
785
        () -> client.getTemplate(req), resp -> !updateConfigNodeLeader(resp.status));
×
786
  }
787

788
  @Override
789
  public TSStatus setSchemaTemplate(TSetSchemaTemplateReq req) throws TException {
790
    return executeRemoteCallWithRetry(
×
791
        () -> client.setSchemaTemplate(req), status -> !updateConfigNodeLeader(status));
×
792
  }
793

794
  @Override
795
  public TGetPathsSetTemplatesResp getPathsSetTemplate(String req) throws TException {
796
    return executeRemoteCallWithRetry(
×
797
        () -> client.getPathsSetTemplate(req), resp -> !updateConfigNodeLeader(resp.status));
×
798
  }
799

800
  @Override
801
  public TSStatus deactivateSchemaTemplate(TDeactivateSchemaTemplateReq req) throws TException {
802
    return executeRemoteCallWithRetry(
×
803
        () -> client.deactivateSchemaTemplate(req), status -> !updateConfigNodeLeader(status));
×
804
  }
805

806
  @Override
807
  public TSStatus unsetSchemaTemplate(TUnsetSchemaTemplateReq req) throws TException {
808
    return executeRemoteCallWithRetry(
×
809
        () -> client.unsetSchemaTemplate(req), status -> !updateConfigNodeLeader(status));
×
810
  }
811

812
  @Override
813
  public TSStatus dropSchemaTemplate(String req) throws TException {
814
    return executeRemoteCallWithRetry(
×
815
        () -> client.dropSchemaTemplate(req), status -> !updateConfigNodeLeader(status));
×
816
  }
817

818
  @Override
819
  public TSStatus alterSchemaTemplate(TAlterSchemaTemplateReq req) throws TException {
820
    return executeRemoteCallWithRetry(
×
821
        () -> client.alterSchemaTemplate(req), status -> !updateConfigNodeLeader(status));
×
822
  }
823

824
  @Override
825
  public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) throws TException {
826
    return executeRemoteCallWithRetry(
×
827
        () -> client.deleteTimeSeries(req), status -> !updateConfigNodeLeader(status));
×
828
  }
829

830
  @Override
831
  public TSStatus deleteLogicalView(TDeleteLogicalViewReq req) throws TException {
832
    return executeRemoteCallWithRetry(
×
833
        () -> client.deleteLogicalView(req), status -> !updateConfigNodeLeader(status));
×
834
  }
835

836
  @Override
837
  public TSStatus alterLogicalView(TAlterLogicalViewReq req) throws TException {
838
    return executeRemoteCallWithRetry(
×
839
        () -> client.alterLogicalView(req), status -> !updateConfigNodeLeader(status));
×
840
  }
841

842
  @Override
843
  public TSStatus createPipeSink(TPipeSinkInfo req) throws TException {
844
    return executeRemoteCallWithRetry(
×
845
        () -> client.createPipeSink(req), status -> !updateConfigNodeLeader(status));
×
846
  }
847

848
  @Override
849
  public TSStatus dropPipeSink(TDropPipeSinkReq req) throws TException {
850
    return executeRemoteCallWithRetry(
×
851
        () -> client.dropPipeSink(req), status -> !updateConfigNodeLeader(status));
×
852
  }
853

854
  @Override
855
  public TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req) throws TException {
856
    return executeRemoteCallWithRetry(
×
857
        () -> client.getPipeSink(req), resp -> !updateConfigNodeLeader(resp.status));
×
858
  }
859

860
  @Override
861
  public TSStatus createPipe(TCreatePipeReq req) throws TException {
862
    return executeRemoteCallWithRetry(
×
863
        () -> client.createPipe(req), status -> !updateConfigNodeLeader(status));
×
864
  }
865

866
  @Override
867
  public TSStatus startPipe(String pipeName) throws TException {
868
    return executeRemoteCallWithRetry(
×
869
        () -> client.startPipe(pipeName), status -> !updateConfigNodeLeader(status));
×
870
  }
871

872
  @Override
873
  public TSStatus stopPipe(String pipeName) throws TException {
874
    return executeRemoteCallWithRetry(
×
875
        () -> client.stopPipe(pipeName), status -> !updateConfigNodeLeader(status));
×
876
  }
877

878
  @Override
879
  public TSStatus dropPipe(String pipeName) throws TException {
880
    return executeRemoteCallWithRetry(
×
881
        () -> client.dropPipe(pipeName), status -> !updateConfigNodeLeader(status));
×
882
  }
883

884
  @Override
885
  public TShowPipeResp showPipe(TShowPipeReq req) throws TException {
886
    return executeRemoteCallWithRetry(
×
887
        () -> client.showPipe(req), resp -> !updateConfigNodeLeader(resp.status));
×
888
  }
889

890
  @Override
891
  public TGetAllPipeInfoResp getAllPipeInfo() throws TException {
892
    return executeRemoteCallWithRetry(
×
893
        () -> client.getAllPipeInfo(), resp -> !updateConfigNodeLeader(resp.status));
×
894
  }
895

896
  @Override
897
  public TGetRegionIdResp getRegionId(TGetRegionIdReq req) throws TException {
898
    return executeRemoteCallWithRetry(
×
899
        () -> client.getRegionId(req), resp -> !updateConfigNodeLeader(resp.status));
×
900
  }
901

902
  @Override
903
  public TGetTimeSlotListResp getTimeSlotList(TGetTimeSlotListReq req) throws TException {
904
    return executeRemoteCallWithRetry(
×
905
        () -> client.getTimeSlotList(req), resp -> !updateConfigNodeLeader(resp.status));
×
906
  }
907

908
  public TCountTimeSlotListResp countTimeSlotList(TCountTimeSlotListReq req) throws TException {
909
    return executeRemoteCallWithRetry(
×
910
        () -> client.countTimeSlotList(req), resp -> !updateConfigNodeLeader(resp.status));
×
911
  }
912

913
  @Override
914
  public TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req) throws TException {
915
    return executeRemoteCallWithRetry(
×
916
        () -> client.getSeriesSlotList(req), resp -> !updateConfigNodeLeader(resp.status));
×
917
  }
918

919
  @Override
920
  public TSStatus migrateRegion(TMigrateRegionReq req) throws TException {
921
    return executeRemoteCallWithRetry(
×
922
        () -> client.migrateRegion(req), status -> !updateConfigNodeLeader(status));
×
923
  }
924

925
  @Override
926
  public TSStatus createCQ(TCreateCQReq req) throws TException {
927
    return executeRemoteCallWithRetry(
×
928
        () -> client.createCQ(req), status -> !updateConfigNodeLeader(status));
×
929
  }
930

931
  @Override
932
  public TSStatus dropCQ(TDropCQReq req) throws TException {
933
    return executeRemoteCallWithRetry(
×
934
        () -> client.dropCQ(req), status -> !updateConfigNodeLeader(status));
×
935
  }
936

937
  @Override
938
  public TShowCQResp showCQ() throws TException {
939
    return executeRemoteCallWithRetry(
×
940
        () -> client.showCQ(), resp -> !updateConfigNodeLeader(resp.status));
×
941
  }
942

943
  @Override
944
  public TSStatus createModel(TCreateModelReq req) throws TException {
945
    return executeRemoteCallWithRetry(
×
946
        () -> client.createModel(req), status -> !updateConfigNodeLeader(status));
×
947
  }
948

949
  @Override
950
  public TSStatus dropModel(TDropModelReq req) throws TException {
951
    return executeRemoteCallWithRetry(
×
952
        () -> client.dropModel(req), status -> !updateConfigNodeLeader(status));
×
953
  }
954

955
  @Override
956
  public TShowModelResp showModel(TShowModelReq req) throws TException {
957
    return executeRemoteCallWithRetry(
×
958
        () -> client.showModel(req), resp -> !updateConfigNodeLeader(resp.status));
×
959
  }
960

961
  @Override
962
  public TShowTrailResp showTrail(TShowTrailReq req) throws TException {
963
    return executeRemoteCallWithRetry(
×
964
        () -> client.showTrail(req), resp -> !updateConfigNodeLeader(resp.status));
×
965
  }
966

967
  @Override
968
  public TSStatus updateModelInfo(TUpdateModelInfoReq req) throws TException {
969
    throw new TException(new UnsupportedOperationException().getCause());
×
970
  }
971

972
  @Override
973
  public TSStatus updateModelState(TUpdateModelStateReq req) throws TException {
974
    throw new TException(new UnsupportedOperationException().getCause());
×
975
  }
976

977
  @Override
978
  public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) throws TException {
979
    return executeRemoteCallWithRetry(
×
980
        () -> client.setSpaceQuota(req), status -> !updateConfigNodeLeader(status));
×
981
  }
982

983
  @Override
984
  public TSpaceQuotaResp showSpaceQuota(List<String> databases) throws TException {
985
    return executeRemoteCallWithRetry(
×
986
        () -> client.showSpaceQuota(databases), resp -> !updateConfigNodeLeader(resp.status));
×
987
  }
988

989
  @Override
990
  public TSpaceQuotaResp getSpaceQuota() throws TException {
991
    return executeRemoteCallWithRetry(
×
992
        () -> client.getSpaceQuota(), resp -> !updateConfigNodeLeader(resp.status));
×
993
  }
994

995
  @Override
996
  public TSStatus setThrottleQuota(TSetThrottleQuotaReq req) throws TException {
997
    return executeRemoteCallWithRetry(
×
998
        () -> client.setThrottleQuota(req), status -> !updateConfigNodeLeader(status));
×
999
  }
1000

1001
  @Override
1002
  public TThrottleQuotaResp showThrottleQuota(TShowThrottleReq req) throws TException {
1003
    return executeRemoteCallWithRetry(
×
1004
        () -> client.showThrottleQuota(req), resp -> !updateConfigNodeLeader(resp.status));
×
1005
  }
1006

1007
  @Override
1008
  public TThrottleQuotaResp getThrottleQuota() throws TException {
1009
    return executeRemoteCallWithRetry(
×
1010
        () -> client.getThrottleQuota(), resp -> !updateConfigNodeLeader(resp.status));
×
1011
  }
1012

1013
  public static class Factory extends ThriftClientFactory<ConfigRegionId, ConfigNodeClient> {
1014

1015
    public Factory(
1016
        ClientManager<ConfigRegionId, ConfigNodeClient> clientManager,
1017
        ThriftClientProperty thriftClientProperty) {
1018
      super(clientManager, thriftClientProperty);
1✔
1019
    }
1✔
1020

1021
    @Override
1022
    public void destroyObject(
1023
        ConfigRegionId configRegionId, PooledObject<ConfigNodeClient> pooledObject) {
1024
      pooledObject.getObject().invalidate();
×
1025
    }
×
1026

1027
    @Override
1028
    public PooledObject<ConfigNodeClient> makeObject(ConfigRegionId configRegionId)
1029
        throws Exception {
1030
      return new DefaultPooledObject<>(
1✔
1031
          SyncThriftClientWithErrorHandler.newErrorHandler(
×
1032
              ConfigNodeClient.class,
1033
              ConfigNodeClient.class.getConstructor(
1✔
1034
                  List.class, thriftClientProperty.getClass(), clientManager.getClass()),
1✔
1035
              ConfigNodeInfo.getInstance().getLatestConfigNodes(),
1✔
1036
              thriftClientProperty,
1037
              clientManager));
1038
    }
1039

1040
    @Override
1041
    public boolean validateObject(
1042
        ConfigRegionId configRegionId, PooledObject<ConfigNodeClient> pooledObject) {
1043
      return Optional.ofNullable(pooledObject.getObject().getTransport())
×
1044
          .map(TTransport::isOpen)
×
1045
          .orElse(false);
×
1046
    }
1047
  }
1048
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc