• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9972

31 Aug 2023 07:24AM UTC coverage: 47.709% (+0.02%) from 47.685%
#9972

push

travis_ci

web-flow
[To rel/1.2] fix wal npe when memTable has flushed. (#10900)

7 of 7 new or added lines in 1 file covered. (100.0%)

80186 of 168074 relevant lines covered (47.71%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

7.74
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.protocol.client;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
24
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
25
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
26
import org.apache.iotdb.common.rpc.thrift.TSStatus;
27
import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
28
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
29
import org.apache.iotdb.common.rpc.thrift.TSetThrottleQuotaReq;
30
import org.apache.iotdb.commons.client.ClientManager;
31
import org.apache.iotdb.commons.client.ThriftClient;
32
import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
33
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
34
import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
35
import org.apache.iotdb.commons.consensus.ConfigRegionId;
36
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
37
import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
38
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
39
import org.apache.iotdb.confignode.rpc.thrift.TAlterSchemaTemplateReq;
40
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
41
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
42
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
43
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
44
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
45
import org.apache.iotdb.confignode.rpc.thrift.TCountDatabaseResp;
46
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
47
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListResp;
48
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
49
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
50
import org.apache.iotdb.confignode.rpc.thrift.TCreateModelReq;
51
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
52
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
53
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
54
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
55
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp;
56
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
57
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
58
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
59
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
60
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
61
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
62
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
63
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
64
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
65
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchemaResp;
66
import org.apache.iotdb.confignode.rpc.thrift.TDeactivateSchemaTemplateReq;
67
import org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabaseReq;
68
import org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabasesReq;
69
import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
70
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
71
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
72
import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
73
import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
74
import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
75
import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
76
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
77
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
78
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
79
import org.apache.iotdb.confignode.rpc.thrift.TGetDataNodeLocationsResp;
80
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
81
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
82
import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
83
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
84
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
85
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
86
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
87
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
88
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
89
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
90
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
91
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
92
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
93
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
94
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
95
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
96
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
97
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
98
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
99
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
100
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
101
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
102
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
103
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
104
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
105
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
106
import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
107
import org.apache.iotdb.confignode.rpc.thrift.TSetDataReplicationFactorReq;
108
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaReplicationFactorReq;
109
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
110
import org.apache.iotdb.confignode.rpc.thrift.TSetTimePartitionIntervalReq;
111
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
112
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
113
import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
114
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
115
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
116
import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;
117
import org.apache.iotdb.confignode.rpc.thrift.TShowModelResp;
118
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
119
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
120
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
121
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
122
import org.apache.iotdb.confignode.rpc.thrift.TShowThrottleReq;
123
import org.apache.iotdb.confignode.rpc.thrift.TShowTrailReq;
124
import org.apache.iotdb.confignode.rpc.thrift.TShowTrailResp;
125
import org.apache.iotdb.confignode.rpc.thrift.TShowVariablesResp;
126
import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
127
import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
128
import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
129
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
130
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
131
import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelStateReq;
132
import org.apache.iotdb.db.conf.IoTDBConfig;
133
import org.apache.iotdb.db.conf.IoTDBDescriptor;
134
import org.apache.iotdb.rpc.RpcTransportFactory;
135
import org.apache.iotdb.rpc.TSStatusCode;
136

137
import org.apache.commons.pool2.PooledObject;
138
import org.apache.commons.pool2.impl.DefaultPooledObject;
139
import org.apache.thrift.TException;
140
import org.apache.thrift.transport.TTransport;
141
import org.apache.thrift.transport.TTransportException;
142
import org.slf4j.Logger;
143
import org.slf4j.LoggerFactory;
144

145
import java.util.ArrayList;
146
import java.util.List;
147
import java.util.Optional;
148
import java.util.function.Predicate;
149

150
public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClient, AutoCloseable {
151

152
  private static final Logger logger = LoggerFactory.getLogger(ConfigNodeClient.class);
1✔
153

154
  private static final int RETRY_NUM = 5;
155

156
  public static final String MSG_RECONNECTION_FAIL =
157
      "Fail to connect to any config node. Please check status of ConfigNodes or logs of connected DataNode";
158

159
  private static final String MSG_RECONNECTION_DATANODE_FAIL =
160
      "Failed to connect to ConfigNode %s from DataNode %s when executing %s, Exception:";
161
  private static final int RETRY_INTERVAL_MS = 2000;
162

163
  private final ThriftClientProperty property;
164

165
  private IConfigNodeRPCService.Iface client;
166

167
  private TTransport transport;
168

169
  private TEndPoint configLeader;
170

171
  private List<TEndPoint> configNodes;
172

173
  private TEndPoint configNode;
174

175
  private int cursor = 0;
1✔
176

177
  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
1✔
178

179
  ClientManager<ConfigRegionId, ConfigNodeClient> clientManager;
180

181
  ConfigRegionId configRegionId = ConfigNodeInfo.CONFIG_REGION_ID;
1✔
182

183
  /**
   * Creates a client and immediately connects it through {@link #init()}.
   *
   * @param configNodes candidate ConfigNode endpoints to connect to
   * @param property thrift settings (connection timeout, protocol factory, logging flag)
   * @param clientManager manager this client is handed back to in {@link #close()}
   * @throws TException if {@link #init()} cannot connect to any ConfigNode
   */
  public ConfigNodeClient(
      List<TEndPoint> configNodes,
      ThriftClientProperty property,
      ClientManager<ConfigRegionId, ConfigNodeClient> clientManager)
      throws TException {
    this.clientManager = clientManager;
    this.property = property;
    this.configNodes = configNodes;

    // All collaborators are stored before connecting because init() reads them.
    init();
  }
×
194

195
  public void init() throws TException {
196
    try {
197
      tryToConnect();
×
198
    } catch (TException e) {
1✔
199
      // Can not connect to each config node
200
      syncLatestConfigNodeList();
1✔
201
      tryToConnect();
×
202
    }
×
203
  }
×
204

205
  public void connect(TEndPoint endpoint) throws TException {
206
    try {
207
      transport =
×
208
          RpcTransportFactory.INSTANCE.getTransport(
×
209
              // As there is a try-catch already, we do not need to use TSocket.wrap
210
              endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs());
×
211
      if (!transport.isOpen()) {
×
212
        transport.open();
×
213
      }
214
      configNode = endpoint;
×
215
    } catch (TTransportException e) {
×
216
      throw new TException(e);
×
217
    }
×
218

219
    client = new IConfigNodeRPCService.Client(property.getProtocolFactory().getProtocol(transport));
×
220
  }
×
221

222
  private void waitAndReconnect() throws TException {
223
    try {
224
      // Wait to start the next try
225
      Thread.sleep(RETRY_INTERVAL_MS);
×
226
    } catch (InterruptedException e) {
×
227
      Thread.currentThread().interrupt();
×
228
      throw new TException(
×
229
          "Unexpected interruption when waiting to retry to connect to ConfigNode");
230
    }
×
231

232
    try {
233
      tryToConnect();
×
234
    } catch (TException e) {
×
235
      // Can not connect to each config node
236
      syncLatestConfigNodeList();
×
237
      tryToConnect();
×
238
    }
×
239
  }
×
240

241
  private void tryToConnect() throws TException {
242
    if (configLeader != null) {
1✔
243
      try {
244
        connect(configLeader);
×
245
        return;
×
246
      } catch (TException e) {
×
247
        logger.warn("The current node may have been down {},try next node", configLeader);
×
248
        configLeader = null;
×
249
      }
250
    }
251

252
    if (transport != null) {
1✔
253
      transport.close();
×
254
    }
255

256
    for (int tryHostNum = 0; tryHostNum < configNodes.size(); tryHostNum++) {
1✔
257
      cursor = (cursor + 1) % configNodes.size();
×
258
      TEndPoint tryEndpoint = configNodes.get(cursor);
×
259

260
      try {
261
        connect(tryEndpoint);
×
262
        return;
×
263
      } catch (TException e) {
×
264
        logger.warn("The current node may have been down {},try next node", tryEndpoint);
×
265
      }
266
    }
267

268
    throw new TException(MSG_RECONNECTION_FAIL);
1✔
269
  }
270

271
  public TTransport getTransport() {
272
    return transport;
×
273
  }
274

275
  public void syncLatestConfigNodeList() {
276
    configNodes = ConfigNodeInfo.getInstance().getLatestConfigNodes();
1✔
277
    cursor = 0;
1✔
278
  }
1✔
279

280
  @Override
281
  public void close() {
282
    clientManager.returnClient(configRegionId, this);
×
283
  }
×
284

285
  @Override
286
  public void invalidate() {
287
    Optional.ofNullable(transport).ifPresent(TTransport::close);
1✔
288
  }
1✔
289

290
  @Override
291
  public void invalidateAll() {
292
    clientManager.clear(ConfigNodeInfo.CONFIG_REGION_ID);
×
293
  }
×
294

295
  @Override
296
  public boolean printLogWhenEncounterException() {
297
    return property.isPrintLogWhenEncounterException();
×
298
  }
299

300
  private boolean updateConfigNodeLeader(TSStatus status) {
301
    if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
×
302
      if (status.isSetRedirectNode()) {
×
303
        configLeader =
×
304
            new TEndPoint(status.getRedirectNode().getIp(), status.getRedirectNode().getPort());
×
305
      } else {
306
        configLeader = null;
×
307
      }
308
      logger.warn(
×
309
          "Failed to connect to ConfigNode {} from DataNode {}, because the current node is not "
310
              + "leader or not ready yet, will try again later",
311
          configNode,
312
          config.getAddressAndPort());
×
313
      return true;
×
314
    }
315
    return false;
×
316
  }
317

318
  /**
319
   * The frame of execute RPC, include logic of retry and exception handling.
320
   *
321
   * @param call which rpc should call
322
   * @param check check the rpc's result
323
   * @return rpc's result
324
   * @param <T> the type of rpc result
325
   * @throws TException if fails more than RETRY_NUM times, throw TException(MSG_RECONNECTION_FAIL)
326
   */
327
  private <T> T executeRemoteCallWithRetry(Operation<T> call, Predicate<T> check)
328
      throws TException {
329
    for (int i = 0; i < RETRY_NUM; i++) {
×
330
      try {
331
        T result = call.execute();
×
332
        if (check.test(result)) {
×
333
          return result;
×
334
        }
335
      } catch (TException e) {
×
336
        String message =
×
337
            String.format(
×
338
                MSG_RECONNECTION_DATANODE_FAIL,
339
                configNode,
340
                config.getAddressAndPort(),
×
341
                Thread.currentThread().getStackTrace()[2].getMethodName());
×
342
        logger.warn(message, e);
×
343
        configLeader = null;
×
344
      }
×
345
      waitAndReconnect();
×
346
    }
347
    throw new TException(MSG_RECONNECTION_FAIL);
×
348
  }
349

350
  @FunctionalInterface
351
  private interface Operation<T> {
352
    T execute() throws TException;
353
  }
354

355
  @Override
356
  public TSystemConfigurationResp getSystemConfiguration() throws TException {
357
    return executeRemoteCallWithRetry(
×
358
        () -> client.getSystemConfiguration(), resp -> !updateConfigNodeLeader(resp.status));
×
359
  }
360

361
  @Override
362
  public TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req) throws TException {
363
    for (int i = 0; i < RETRY_NUM; i++) {
×
364
      try {
365
        TDataNodeRegisterResp resp = client.registerDataNode(req);
×
366

367
        if (!updateConfigNodeLeader(resp.status)) {
×
368
          return resp;
×
369
        }
370

371
        // set latest config node list
372
        List<TEndPoint> newConfigNodes = new ArrayList<>();
×
373
        for (TConfigNodeLocation configNodeLocation : resp.getConfigNodeList()) {
×
374
          newConfigNodes.add(configNodeLocation.getInternalEndPoint());
×
375
        }
×
376
        configNodes = newConfigNodes;
×
377
      } catch (TException e) {
×
378
        String message =
×
379
            String.format(
×
380
                MSG_RECONNECTION_DATANODE_FAIL,
381
                configNode,
382
                config.getAddressAndPort(),
×
383
                Thread.currentThread().getStackTrace()[1].getMethodName());
×
384
        logger.warn(message, e);
×
385
        configLeader = null;
×
386
      }
×
387
      waitAndReconnect();
×
388
    }
389
    throw new TException(MSG_RECONNECTION_FAIL);
×
390
  }
391

392
  @Override
393
  public TDataNodeRestartResp restartDataNode(TDataNodeRestartReq req) throws TException {
394
    return executeRemoteCallWithRetry(
×
395
        () -> client.restartDataNode(req), resp -> !updateConfigNodeLeader(resp.status));
×
396
  }
397

398
  @Override
399
  public TDataNodeRemoveResp removeDataNode(TDataNodeRemoveReq req) throws TException {
400
    return executeRemoteCallWithRetry(
×
401
        () -> client.removeDataNode(req), resp -> !updateConfigNodeLeader(resp.status));
×
402
  }
403

404
  @Override
405
  public TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation) throws TException {
406
    return executeRemoteCallWithRetry(
×
407
        () -> client.reportDataNodeShutdown(dataNodeLocation),
×
408
        status -> !updateConfigNodeLeader(status));
×
409
  }
410

411
  @Override
412
  public TDataNodeConfigurationResp getDataNodeConfiguration(int dataNodeId) throws TException {
413
    return executeRemoteCallWithRetry(
×
414
        () -> client.getDataNodeConfiguration(dataNodeId),
×
415
        resp -> !updateConfigNodeLeader(resp.status));
×
416
  }
417

418
  @Override
419
  public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req) throws TException {
420
    return executeRemoteCallWithRetry(
×
421
        () -> client.reportRegionMigrateResult(req), status -> !updateConfigNodeLeader(status));
×
422
  }
423

424
  @Override
425
  public TShowClusterResp showCluster() throws TException {
426
    return executeRemoteCallWithRetry(
×
427
        () -> client.showCluster(), resp -> !updateConfigNodeLeader(resp.status));
×
428
  }
429

430
  @Override
431
  public TShowVariablesResp showVariables() throws TException {
432
    return executeRemoteCallWithRetry(
×
433
        () -> client.showVariables(), resp -> !updateConfigNodeLeader(resp.status));
×
434
  }
435

436
  @Override
437
  public TSStatus setDatabase(TDatabaseSchema databaseSchema) throws TException {
438
    return executeRemoteCallWithRetry(
×
439
        () -> client.setDatabase(databaseSchema), status -> !updateConfigNodeLeader(status));
×
440
  }
441

442
  @Override
443
  public TSStatus alterDatabase(TDatabaseSchema databaseSchema) throws TException {
444
    return executeRemoteCallWithRetry(
×
445
        () -> client.alterDatabase(databaseSchema), status -> !updateConfigNodeLeader(status));
×
446
  }
447

448
  @Override
449
  public TSStatus deleteDatabase(TDeleteDatabaseReq req) throws TException {
450
    return executeRemoteCallWithRetry(
×
451
        () -> client.deleteDatabase(req), status -> !updateConfigNodeLeader(status));
×
452
  }
453

454
  @Override
455
  public TSStatus deleteDatabases(TDeleteDatabasesReq req) throws TException {
456
    return executeRemoteCallWithRetry(
×
457
        () -> client.deleteDatabases(req), status -> !updateConfigNodeLeader(status));
×
458
  }
459

460
  @Override
461
  public TCountDatabaseResp countMatchedDatabases(List<String> storageGroupPathPattern)
462
      throws TException {
463
    return executeRemoteCallWithRetry(
×
464
        () -> client.countMatchedDatabases(storageGroupPathPattern),
×
465
        resp -> !updateConfigNodeLeader(resp.status));
×
466
  }
467

468
  @Override
469
  public TDatabaseSchemaResp getMatchedDatabaseSchemas(List<String> storageGroupPathPattern)
470
      throws TException {
471
    return executeRemoteCallWithRetry(
×
472
        () -> client.getMatchedDatabaseSchemas(storageGroupPathPattern),
×
473
        resp -> !updateConfigNodeLeader(resp.status));
×
474
  }
475

476
  @Override
477
  public TSStatus setTTL(TSetTTLReq setTTLReq) throws TException {
478
    return executeRemoteCallWithRetry(
×
479
        () -> client.setTTL(setTTLReq), status -> !updateConfigNodeLeader(status));
×
480
  }
481

482
  @Override
483
  public TSStatus setSchemaReplicationFactor(TSetSchemaReplicationFactorReq req) throws TException {
484
    return executeRemoteCallWithRetry(
×
485
        () -> client.setSchemaReplicationFactor(req), status -> !updateConfigNodeLeader(status));
×
486
  }
487

488
  @Override
489
  public TSStatus setDataReplicationFactor(TSetDataReplicationFactorReq req) throws TException {
490
    return executeRemoteCallWithRetry(
×
491
        () -> client.setDataReplicationFactor(req), status -> !updateConfigNodeLeader(status));
×
492
  }
493

494
  @Override
495
  public TSStatus setTimePartitionInterval(TSetTimePartitionIntervalReq req) throws TException {
496
    return executeRemoteCallWithRetry(
×
497
        () -> client.setTimePartitionInterval(req), status -> !updateConfigNodeLeader(status));
×
498
  }
499

500
  @Override
501
  public TSchemaPartitionTableResp getSchemaPartitionTable(TSchemaPartitionReq req)
502
      throws TException {
503
    return executeRemoteCallWithRetry(
×
504
        () -> client.getSchemaPartitionTable(req), resp -> !updateConfigNodeLeader(resp.status));
×
505
  }
506

507
  @Override
508
  public TSchemaPartitionTableResp getOrCreateSchemaPartitionTable(TSchemaPartitionReq req)
509
      throws TException {
510
    return executeRemoteCallWithRetry(
×
511
        () -> client.getOrCreateSchemaPartitionTable(req),
×
512
        resp -> !updateConfigNodeLeader(resp.status));
×
513
  }
514

515
  @Override
516
  public TSchemaNodeManagementResp getSchemaNodeManagementPartition(TSchemaNodeManagementReq req)
517
      throws TException {
518
    return executeRemoteCallWithRetry(
×
519
        () -> client.getSchemaNodeManagementPartition(req),
×
520
        resp -> !updateConfigNodeLeader(resp.status));
×
521
  }
522

523
  @Override
524
  public TDataPartitionTableResp getDataPartitionTable(TDataPartitionReq req) throws TException {
525
    return executeRemoteCallWithRetry(
×
526
        () -> client.getDataPartitionTable(req), resp -> !updateConfigNodeLeader(resp.status));
×
527
  }
528

529
  @Override
530
  public TDataPartitionTableResp getOrCreateDataPartitionTable(TDataPartitionReq req)
531
      throws TException {
532
    return executeRemoteCallWithRetry(
×
533
        () -> client.getOrCreateDataPartitionTable(req),
×
534
        resp -> !updateConfigNodeLeader(resp.status));
×
535
  }
536

537
  @Override
538
  public TSStatus operatePermission(TAuthorizerReq req) throws TException {
539
    return executeRemoteCallWithRetry(
×
540
        () -> client.operatePermission(req), status -> !updateConfigNodeLeader(status));
×
541
  }
542

543
  @Override
544
  public TAuthorizerResp queryPermission(TAuthorizerReq req) throws TException {
545
    return executeRemoteCallWithRetry(
×
546
        () -> client.queryPermission(req), resp -> !updateConfigNodeLeader(resp.status));
×
547
  }
548

549
  @Override
550
  public TPermissionInfoResp login(TLoginReq req) throws TException {
551
    return executeRemoteCallWithRetry(
×
552
        () -> client.login(req), resp -> !updateConfigNodeLeader(resp.status));
×
553
  }
554

555
  @Override
556
  public TPermissionInfoResp checkUserPrivileges(TCheckUserPrivilegesReq req) throws TException {
557
    return executeRemoteCallWithRetry(
×
558
        () -> client.checkUserPrivileges(req), resp -> !updateConfigNodeLeader(resp.status));
×
559
  }
560

561
  @Override
562
  public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) throws TException {
563
    throw new TException("DataNode to ConfigNode client doesn't support registerConfigNode.");
×
564
  }
565

566
  @Override
567
  public TSStatus addConsensusGroup(TAddConsensusGroupReq registerResp) throws TException {
568
    throw new TException("DataNode to ConfigNode client doesn't support addConsensusGroup.");
×
569
  }
570

571
  @Override
572
  public TSStatus notifyRegisterSuccess() throws TException {
573
    throw new TException("DataNode to ConfigNode client doesn't support notifyRegisterSuccess.");
×
574
  }
575

576
  @Override
577
  public TSStatus removeConfigNode(TConfigNodeLocation configNodeLocation) throws TException {
578
    throw new TException("DataNode to ConfigNode client doesn't support removeConfigNode.");
×
579
  }
580

581
  @Override
582
  public TSStatus deleteConfigNodePeer(TConfigNodeLocation configNodeLocation) throws TException {
583
    throw new TException("DataNode to ConfigNode client doesn't support removeConsensusGroup.");
×
584
  }
585

586
  @Override
587
  public TSStatus reportConfigNodeShutdown(TConfigNodeLocation configNodeLocation)
588
      throws TException {
589
    throw new TException("DataNode to ConfigNode client doesn't support reportConfigNodeShutdown.");
×
590
  }
591

592
  @Override
593
  public TSStatus stopConfigNode(TConfigNodeLocation configNodeLocation) throws TException {
594
    throw new TException("DataNode to ConfigNode client doesn't support stopConfigNode.");
×
595
  }
596

597
  @Override
598
  public TSStatus merge() throws TException {
599
    return executeRemoteCallWithRetry(
×
600
        () -> client.merge(), status -> !updateConfigNodeLeader(status));
×
601
  }
602

603
  @Override
604
  public TSStatus flush(TFlushReq req) throws TException {
605
    return executeRemoteCallWithRetry(
×
606
        () -> client.flush(req), status -> !updateConfigNodeLeader(status));
×
607
  }
608

609
  @Override
610
  public TSStatus clearCache() throws TException {
611
    return executeRemoteCallWithRetry(
×
612
        () -> client.clearCache(), status -> !updateConfigNodeLeader(status));
×
613
  }
614

615
  @Override
616
  public TSStatus loadConfiguration() throws TException {
617
    return executeRemoteCallWithRetry(
×
618
        () -> client.loadConfiguration(), status -> !updateConfigNodeLeader(status));
×
619
  }
620

621
  @Override
622
  public TSStatus setSystemStatus(String systemStatus) throws TException {
623
    return executeRemoteCallWithRetry(
×
624
        () -> client.setSystemStatus(systemStatus), status -> !updateConfigNodeLeader(status));
×
625
  }
626

627
  @Override
628
  public TSStatus setDataNodeStatus(TSetDataNodeStatusReq req) throws TException {
629
    throw new TException("DataNode to ConfigNode client doesn't support setDataNodeStatus.");
×
630
  }
631

632
  @Override
633
  public TSStatus killQuery(String queryId, int dataNodeId) throws TException {
634
    return executeRemoteCallWithRetry(
×
635
        () -> client.killQuery(queryId, dataNodeId), status -> !updateConfigNodeLeader(status));
×
636
  }
637

638
  @Override
639
  public TGetDataNodeLocationsResp getRunningDataNodeLocations() throws TException {
640
    return executeRemoteCallWithRetry(
×
641
        () -> client.getRunningDataNodeLocations(), resp -> !updateConfigNodeLeader(resp.status));
×
642
  }
643

644
  @Override
645
  public TShowRegionResp showRegion(TShowRegionReq req) throws TException {
646
    return executeRemoteCallWithRetry(
×
647
        () -> client.showRegion(req), resp -> !updateConfigNodeLeader(resp.status));
×
648
  }
649

650
  @Override
651
  public TShowDataNodesResp showDataNodes() throws TException {
652
    return executeRemoteCallWithRetry(
×
653
        () -> client.showDataNodes(), resp -> !updateConfigNodeLeader(resp.status));
×
654
  }
655

656
  @Override
657
  public TShowConfigNodesResp showConfigNodes() throws TException {
658
    return executeRemoteCallWithRetry(
×
659
        () -> client.showConfigNodes(), resp -> !updateConfigNodeLeader(resp.status));
×
660
  }
661

662
  @Override
663
  public TShowDatabaseResp showDatabase(List<String> storageGroupPathPattern) throws TException {
664
    return executeRemoteCallWithRetry(
×
665
        () -> client.showDatabase(storageGroupPathPattern),
×
666
        resp -> !updateConfigNodeLeader(resp.status));
×
667
  }
668

669
  @Override
670
  public TRegionRouteMapResp getLatestRegionRouteMap() throws TException {
671
    return executeRemoteCallWithRetry(
×
672
        () -> client.getLatestRegionRouteMap(), resp -> !updateConfigNodeLeader(resp.status));
×
673
  }
674

675
  @Override
676
  public long getConfigNodeHeartBeat(long timestamp) throws TException {
677
    throw new TException("DataNode to ConfigNode client doesn't support getConfigNodeHeartBeat.");
×
678
  }
679

680
  @Override
681
  public TSStatus createFunction(TCreateFunctionReq req) throws TException {
682
    return executeRemoteCallWithRetry(
×
683
        () -> client.createFunction(req), status -> !updateConfigNodeLeader(status));
×
684
  }
685

686
  @Override
687
  public TSStatus dropFunction(TDropFunctionReq req) throws TException {
688
    return executeRemoteCallWithRetry(
×
689
        () -> client.dropFunction(req), status -> !updateConfigNodeLeader(status));
×
690
  }
691

692
  @Override
693
  public TGetUDFTableResp getUDFTable() throws TException {
694
    return executeRemoteCallWithRetry(
×
695
        () -> client.getUDFTable(), resp -> !updateConfigNodeLeader(resp.status));
×
696
  }
697

698
  @Override
699
  public TGetJarInListResp getUDFJar(TGetJarInListReq req) throws TException {
700
    return executeRemoteCallWithRetry(
×
701
        () -> client.getUDFJar(req), resp -> !updateConfigNodeLeader(resp.status));
×
702
  }
703

704
  @Override
705
  public TSStatus createTrigger(TCreateTriggerReq req) throws TException {
706
    return executeRemoteCallWithRetry(
×
707
        () -> client.createTrigger(req), status -> !updateConfigNodeLeader(status));
×
708
  }
709

710
  @Override
711
  public TSStatus dropTrigger(TDropTriggerReq req) throws TException {
712
    return executeRemoteCallWithRetry(
×
713
        () -> client.dropTrigger(req), status -> !updateConfigNodeLeader(status));
×
714
  }
715

716
  @Override
717
  public TGetLocationForTriggerResp getLocationOfStatefulTrigger(String triggerName)
718
      throws TException {
719
    return executeRemoteCallWithRetry(
×
720
        () -> client.getLocationOfStatefulTrigger(triggerName),
×
721
        resp -> !updateConfigNodeLeader(resp.status));
×
722
  }
723

724
  public TGetTriggerTableResp getTriggerTable() throws TException {
725
    return executeRemoteCallWithRetry(
×
726
        () -> client.getTriggerTable(), resp -> !updateConfigNodeLeader(resp.status));
×
727
  }
728

729
  @Override
730
  public TGetTriggerTableResp getStatefulTriggerTable() throws TException {
731
    return executeRemoteCallWithRetry(
×
732
        () -> client.getStatefulTriggerTable(), resp -> !updateConfigNodeLeader(resp.status));
×
733
  }
734

735
  @Override
736
  public TGetJarInListResp getTriggerJar(TGetJarInListReq req) throws TException {
737
    return executeRemoteCallWithRetry(
×
738
        () -> client.getTriggerJar(req), resp -> !updateConfigNodeLeader(resp.status));
×
739
  }
740

741
  @Override
742
  public TSStatus createPipePlugin(TCreatePipePluginReq req) throws TException {
743
    return executeRemoteCallWithRetry(
×
744
        () -> client.createPipePlugin(req), status -> !updateConfigNodeLeader(status));
×
745
  }
746

747
  @Override
748
  public TSStatus dropPipePlugin(TDropPipePluginReq req) throws TException {
749
    return executeRemoteCallWithRetry(
×
750
        () -> client.dropPipePlugin(req), status -> !updateConfigNodeLeader(status));
×
751
  }
752

753
  @Override
754
  public TGetPipePluginTableResp getPipePluginTable() throws TException {
755
    return executeRemoteCallWithRetry(
×
756
        () -> client.getPipePluginTable(), resp -> !updateConfigNodeLeader(resp.status));
×
757
  }
758

759
  @Override
760
  public TGetJarInListResp getPipePluginJar(TGetJarInListReq req) throws TException {
761
    return executeRemoteCallWithRetry(
×
762
        () -> client.getPipePluginJar(req), resp -> !updateConfigNodeLeader(resp.status));
×
763
  }
764

765
  @Override
766
  public TSStatus createSchemaTemplate(TCreateSchemaTemplateReq req) throws TException {
767
    return executeRemoteCallWithRetry(
×
768
        () -> client.createSchemaTemplate(req), status -> !updateConfigNodeLeader(status));
×
769
  }
770

771
  @Override
772
  public TGetAllTemplatesResp getAllTemplates() throws TException {
773
    return executeRemoteCallWithRetry(
×
774
        () -> client.getAllTemplates(), resp -> !updateConfigNodeLeader(resp.status));
×
775
  }
776

777
  @Override
778
  public TGetTemplateResp getTemplate(String req) throws TException {
779
    return executeRemoteCallWithRetry(
×
780
        () -> client.getTemplate(req), resp -> !updateConfigNodeLeader(resp.status));
×
781
  }
782

783
  @Override
784
  public TSStatus setSchemaTemplate(TSetSchemaTemplateReq req) throws TException {
785
    return executeRemoteCallWithRetry(
×
786
        () -> client.setSchemaTemplate(req), status -> !updateConfigNodeLeader(status));
×
787
  }
788

789
  @Override
790
  public TGetPathsSetTemplatesResp getPathsSetTemplate(String req) throws TException {
791
    return executeRemoteCallWithRetry(
×
792
        () -> client.getPathsSetTemplate(req), resp -> !updateConfigNodeLeader(resp.status));
×
793
  }
794

795
  @Override
796
  public TSStatus deactivateSchemaTemplate(TDeactivateSchemaTemplateReq req) throws TException {
797
    return executeRemoteCallWithRetry(
×
798
        () -> client.deactivateSchemaTemplate(req), status -> !updateConfigNodeLeader(status));
×
799
  }
800

801
  @Override
802
  public TSStatus unsetSchemaTemplate(TUnsetSchemaTemplateReq req) throws TException {
803
    return executeRemoteCallWithRetry(
×
804
        () -> client.unsetSchemaTemplate(req), status -> !updateConfigNodeLeader(status));
×
805
  }
806

807
  @Override
808
  public TSStatus dropSchemaTemplate(String req) throws TException {
809
    return executeRemoteCallWithRetry(
×
810
        () -> client.dropSchemaTemplate(req), status -> !updateConfigNodeLeader(status));
×
811
  }
812

813
  @Override
814
  public TSStatus alterSchemaTemplate(TAlterSchemaTemplateReq req) throws TException {
815
    return executeRemoteCallWithRetry(
×
816
        () -> client.alterSchemaTemplate(req), status -> !updateConfigNodeLeader(status));
×
817
  }
818

819
  @Override
820
  public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) throws TException {
821
    return executeRemoteCallWithRetry(
×
822
        () -> client.deleteTimeSeries(req), status -> !updateConfigNodeLeader(status));
×
823
  }
824

825
  @Override
826
  public TSStatus deleteLogicalView(TDeleteLogicalViewReq req) throws TException {
827
    return executeRemoteCallWithRetry(
×
828
        () -> client.deleteLogicalView(req), status -> !updateConfigNodeLeader(status));
×
829
  }
830

831
  @Override
832
  public TSStatus alterLogicalView(TAlterLogicalViewReq req) throws TException {
833
    return executeRemoteCallWithRetry(
×
834
        () -> client.alterLogicalView(req), status -> !updateConfigNodeLeader(status));
×
835
  }
836

837
  @Override
838
  public TSStatus createPipeSink(TPipeSinkInfo req) throws TException {
839
    return executeRemoteCallWithRetry(
×
840
        () -> client.createPipeSink(req), status -> !updateConfigNodeLeader(status));
×
841
  }
842

843
  @Override
844
  public TSStatus dropPipeSink(TDropPipeSinkReq req) throws TException {
845
    return executeRemoteCallWithRetry(
×
846
        () -> client.dropPipeSink(req), status -> !updateConfigNodeLeader(status));
×
847
  }
848

849
  @Override
850
  public TGetPipeSinkResp getPipeSink(TGetPipeSinkReq req) throws TException {
851
    return executeRemoteCallWithRetry(
×
852
        () -> client.getPipeSink(req), resp -> !updateConfigNodeLeader(resp.status));
×
853
  }
854

855
  @Override
856
  public TSStatus createPipe(TCreatePipeReq req) throws TException {
857
    return executeRemoteCallWithRetry(
×
858
        () -> client.createPipe(req), status -> !updateConfigNodeLeader(status));
×
859
  }
860

861
  @Override
862
  public TSStatus startPipe(String pipeName) throws TException {
863
    return executeRemoteCallWithRetry(
×
864
        () -> client.startPipe(pipeName), status -> !updateConfigNodeLeader(status));
×
865
  }
866

867
  @Override
868
  public TSStatus stopPipe(String pipeName) throws TException {
869
    return executeRemoteCallWithRetry(
×
870
        () -> client.stopPipe(pipeName), status -> !updateConfigNodeLeader(status));
×
871
  }
872

873
  @Override
874
  public TSStatus dropPipe(String pipeName) throws TException {
875
    return executeRemoteCallWithRetry(
×
876
        () -> client.dropPipe(pipeName), status -> !updateConfigNodeLeader(status));
×
877
  }
878

879
  @Override
880
  public TShowPipeResp showPipe(TShowPipeReq req) throws TException {
881
    return executeRemoteCallWithRetry(
×
882
        () -> client.showPipe(req), resp -> !updateConfigNodeLeader(resp.status));
×
883
  }
884

885
  @Override
886
  public TGetAllPipeInfoResp getAllPipeInfo() throws TException {
887
    return executeRemoteCallWithRetry(
×
888
        () -> client.getAllPipeInfo(), resp -> !updateConfigNodeLeader(resp.status));
×
889
  }
890

891
  @Override
892
  public TGetRegionIdResp getRegionId(TGetRegionIdReq req) throws TException {
893
    return executeRemoteCallWithRetry(
×
894
        () -> client.getRegionId(req), resp -> !updateConfigNodeLeader(resp.status));
×
895
  }
896

897
  @Override
898
  public TGetTimeSlotListResp getTimeSlotList(TGetTimeSlotListReq req) throws TException {
899
    return executeRemoteCallWithRetry(
×
900
        () -> client.getTimeSlotList(req), resp -> !updateConfigNodeLeader(resp.status));
×
901
  }
902

903
  public TCountTimeSlotListResp countTimeSlotList(TCountTimeSlotListReq req) throws TException {
904
    return executeRemoteCallWithRetry(
×
905
        () -> client.countTimeSlotList(req), resp -> !updateConfigNodeLeader(resp.status));
×
906
  }
907

908
  @Override
909
  public TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req) throws TException {
910
    return executeRemoteCallWithRetry(
×
911
        () -> client.getSeriesSlotList(req), resp -> !updateConfigNodeLeader(resp.status));
×
912
  }
913

914
  @Override
915
  public TSStatus migrateRegion(TMigrateRegionReq req) throws TException {
916
    return executeRemoteCallWithRetry(
×
917
        () -> client.migrateRegion(req), status -> !updateConfigNodeLeader(status));
×
918
  }
919

920
  @Override
921
  public TSStatus createCQ(TCreateCQReq req) throws TException {
922
    return executeRemoteCallWithRetry(
×
923
        () -> client.createCQ(req), status -> !updateConfigNodeLeader(status));
×
924
  }
925

926
  @Override
927
  public TSStatus dropCQ(TDropCQReq req) throws TException {
928
    return executeRemoteCallWithRetry(
×
929
        () -> client.dropCQ(req), status -> !updateConfigNodeLeader(status));
×
930
  }
931

932
  @Override
933
  public TShowCQResp showCQ() throws TException {
934
    return executeRemoteCallWithRetry(
×
935
        () -> client.showCQ(), resp -> !updateConfigNodeLeader(resp.status));
×
936
  }
937

938
  @Override
939
  public TSStatus createModel(TCreateModelReq req) throws TException {
940
    return executeRemoteCallWithRetry(
×
941
        () -> client.createModel(req), status -> !updateConfigNodeLeader(status));
×
942
  }
943

944
  @Override
945
  public TSStatus dropModel(TDropModelReq req) throws TException {
946
    return executeRemoteCallWithRetry(
×
947
        () -> client.dropModel(req), status -> !updateConfigNodeLeader(status));
×
948
  }
949

950
  @Override
951
  public TShowModelResp showModel(TShowModelReq req) throws TException {
952
    return executeRemoteCallWithRetry(
×
953
        () -> client.showModel(req), resp -> !updateConfigNodeLeader(resp.status));
×
954
  }
955

956
  @Override
957
  public TShowTrailResp showTrail(TShowTrailReq req) throws TException {
958
    return executeRemoteCallWithRetry(
×
959
        () -> client.showTrail(req), resp -> !updateConfigNodeLeader(resp.status));
×
960
  }
961

962
  @Override
963
  public TSStatus updateModelInfo(TUpdateModelInfoReq req) throws TException {
964
    throw new TException(new UnsupportedOperationException().getCause());
×
965
  }
966

967
  @Override
968
  public TSStatus updateModelState(TUpdateModelStateReq req) throws TException {
969
    throw new TException(new UnsupportedOperationException().getCause());
×
970
  }
971

972
  @Override
973
  public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) throws TException {
974
    return executeRemoteCallWithRetry(
×
975
        () -> client.setSpaceQuota(req), status -> !updateConfigNodeLeader(status));
×
976
  }
977

978
  @Override
979
  public TSpaceQuotaResp showSpaceQuota(List<String> databases) throws TException {
980
    return executeRemoteCallWithRetry(
×
981
        () -> client.showSpaceQuota(databases), resp -> !updateConfigNodeLeader(resp.status));
×
982
  }
983

984
  @Override
985
  public TSpaceQuotaResp getSpaceQuota() throws TException {
986
    return executeRemoteCallWithRetry(
×
987
        () -> client.getSpaceQuota(), resp -> !updateConfigNodeLeader(resp.status));
×
988
  }
989

990
  @Override
991
  public TSStatus setThrottleQuota(TSetThrottleQuotaReq req) throws TException {
992
    return executeRemoteCallWithRetry(
×
993
        () -> client.setThrottleQuota(req), status -> !updateConfigNodeLeader(status));
×
994
  }
995

996
  @Override
997
  public TThrottleQuotaResp showThrottleQuota(TShowThrottleReq req) throws TException {
998
    return executeRemoteCallWithRetry(
×
999
        () -> client.showThrottleQuota(req), resp -> !updateConfigNodeLeader(resp.status));
×
1000
  }
1001

1002
  @Override
1003
  public TThrottleQuotaResp getThrottleQuota() throws TException {
1004
    return executeRemoteCallWithRetry(
×
1005
        () -> client.getThrottleQuota(), resp -> !updateConfigNodeLeader(resp.status));
×
1006
  }
1007

1008
  public static class Factory extends ThriftClientFactory<ConfigRegionId, ConfigNodeClient> {
1009

1010
    public Factory(
1011
        ClientManager<ConfigRegionId, ConfigNodeClient> clientManager,
1012
        ThriftClientProperty thriftClientProperty) {
1013
      super(clientManager, thriftClientProperty);
1✔
1014
    }
1✔
1015

1016
    @Override
1017
    public void destroyObject(
1018
        ConfigRegionId configRegionId, PooledObject<ConfigNodeClient> pooledObject) {
1019
      pooledObject.getObject().invalidate();
×
1020
    }
×
1021

1022
    @Override
1023
    public PooledObject<ConfigNodeClient> makeObject(ConfigRegionId configRegionId)
1024
        throws Exception {
1025
      return new DefaultPooledObject<>(
1✔
1026
          SyncThriftClientWithErrorHandler.newErrorHandler(
×
1027
              ConfigNodeClient.class,
1028
              ConfigNodeClient.class.getConstructor(
1✔
1029
                  List.class, thriftClientProperty.getClass(), clientManager.getClass()),
1✔
1030
              ConfigNodeInfo.getInstance().getLatestConfigNodes(),
1✔
1031
              thriftClientProperty,
1032
              clientManager));
1033
    }
1034

1035
    @Override
1036
    public boolean validateObject(
1037
        ConfigRegionId configRegionId, PooledObject<ConfigNodeClient> pooledObject) {
1038
      return Optional.ofNullable(pooledObject.getObject().getTransport())
×
1039
          .map(TTransport::isOpen)
×
1040
          .orElse(false);
×
1041
    }
1042
  }
1043
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc