• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9708

pending completion
#9708

push

travis_ci

web-flow
[To rel/1.2] Enhance warning messages for ConfigNodeClient (#10722)

Signed-off-by: OneSizeFitQuorum <tanxinyu@apache.org>

1 of 1 new or added line in 1 file covered. (100.0%)

79235 of 165170 relevant lines covered (47.97%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.37
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.service;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
24
import org.apache.iotdb.common.rpc.thrift.TSStatus;
25
import org.apache.iotdb.commons.client.ClientManagerMetrics;
26
import org.apache.iotdb.commons.concurrent.ThreadModule;
27
import org.apache.iotdb.commons.concurrent.ThreadName;
28
import org.apache.iotdb.commons.concurrent.ThreadPoolMetrics;
29
import org.apache.iotdb.commons.conf.IoTDBConstant;
30
import org.apache.iotdb.commons.exception.StartupException;
31
import org.apache.iotdb.commons.service.JMXService;
32
import org.apache.iotdb.commons.service.RegisterManager;
33
import org.apache.iotdb.commons.service.metric.JvmGcMonitorMetrics;
34
import org.apache.iotdb.commons.service.metric.MetricService;
35
import org.apache.iotdb.commons.utils.TestOnly;
36
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
37
import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
38
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
39
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
40
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
41
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
42
import org.apache.iotdb.confignode.manager.ConfigManager;
43
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
44
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
45
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
46
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCService;
47
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor;
48
import org.apache.iotdb.db.service.metrics.ProcessMetrics;
49
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
50
import org.apache.iotdb.metrics.metricsets.UpTimeMetrics;
51
import org.apache.iotdb.metrics.metricsets.cpu.CpuUsageMetrics;
52
import org.apache.iotdb.metrics.metricsets.disk.DiskMetrics;
53
import org.apache.iotdb.metrics.metricsets.jvm.JvmMetrics;
54
import org.apache.iotdb.metrics.metricsets.logback.LogbackMetrics;
55
import org.apache.iotdb.metrics.metricsets.net.NetMetrics;
56
import org.apache.iotdb.metrics.metricsets.system.SystemMetrics;
57
import org.apache.iotdb.rpc.TSStatusCode;
58

59
import org.slf4j.Logger;
60
import org.slf4j.LoggerFactory;
61

62
import java.io.File;
63
import java.io.IOException;
64
import java.nio.charset.Charset;
65
import java.util.ArrayList;
66
import java.util.Arrays;
67
import java.util.List;
68
import java.util.concurrent.TimeUnit;
69

70
public class ConfigNode implements ConfigNodeMBean {
71

72
  /** Logger shared by every method of this class. */
  private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNode.class);

  /** ConfigNode configuration, obtained once from the descriptor. */
  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();

  /** Upper bound on the attempts made when sending a register/restart request. */
  private static final int STARTUP_RETRY_NUM = 10;

  /** Upper bound on the polls made while waiting for the leader's scheduling. */
  private static final int SCHEDULE_WAITING_RETRY_NUM = 20;

  /** Pause between two startup retries: 3 seconds, expressed in milliseconds. */
  private static final long STARTUP_RETRY_INTERVAL_IN_MS = TimeUnit.SECONDS.toMillis(3);

  /** ConfigNode id used by the Seed-ConfigNode. */
  private static final int SEED_CONFIG_NODE_ID = 0;

  /** Placeholder id a Non-Seed-ConfigNode puts in its register request. */
  private static final int INIT_NON_SEED_CONFIG_NODE_ID = -1;

  /** Log pattern used to print the effective configuration. */
  private static final String CONFIGURATION = "IoTDB configuration: {}";

  /** JMX object name under which this instance is registered. */
  private final String mbeanName =
      ConfigNodeConstant.CONFIGNODE_PACKAGE + ":" + ConfigNodeConstant.JMX_TYPE + "=ConfigNode";

  /** Keeps track of the services registered by this instance. */
  private final RegisterManager registerManager = new RegisterManager();

  /** Assigned by {@code initConfigManager()} or, in tests, by {@code setConfigManager}. */
  private ConfigManager configManager;

  private ConfigNode() {
    // Intentionally empty: nothing is initialized here so that the instance can be
    // re-initialized in IT.
  }
1✔
97

98
  public static void main(String[] args) {
99
    LOGGER.info(
×
100
        "{} environment variables: {}",
101
        ConfigNodeConstant.GLOBAL_NAME,
102
        ConfigNodeConfig.getEnvironmentVariables());
×
103
    LOGGER.info(
×
104
        "{} default charset is: {}",
105
        ConfigNodeConstant.GLOBAL_NAME,
106
        Charset.defaultCharset().displayName());
×
107
    new ConfigNodeCommandLine().doMain(args);
×
108
  }
×
109

110
  public void active() {
111
    LOGGER.info("Activating {}...", ConfigNodeConstant.GLOBAL_NAME);
×
112

113
    try {
114
      processPid();
×
115
      // Add shutdown hook
116
      Runtime.getRuntime().addShutdownHook(new ConfigNodeShutdownHook());
×
117
      // Set up internal services
118
      setUpInternalServices();
×
119
      // Init ConfigManager
120
      initConfigManager();
×
121

122
      /* Restart */
123
      if (SystemPropertiesUtils.isRestarted()) {
×
124
        LOGGER.info("{} is in restarting process...", ConfigNodeConstant.GLOBAL_NAME);
×
125

126
        int configNodeId;
127
        if (!SystemPropertiesUtils.isSeedConfigNode()) {
×
128
          // The non-seed-ConfigNodes should send restart request and be checked (ip and port) by
129
          // leader before initConsensusManager
130
          sendRestartConfigNodeRequest();
×
131
          configNodeId = CONF.getConfigNodeId();
×
132
        } else {
133
          configNodeId = SEED_CONFIG_NODE_ID;
×
134
        }
135
        configManager.initConsensusManager();
×
136

137
        setUpMetricService();
×
138
        // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
139
        // that the external service is not provided until ConfigNode is fully available
140
        setUpRPCService();
×
141
        LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
142
        LOGGER.info(
×
143
            "{} has successfully restarted and joined the cluster: {}.",
144
            ConfigNodeConstant.GLOBAL_NAME,
145
            CONF.getClusterName());
×
146

147
        // Update item during restart
148
        // This will always be executed until the consensus write succeeds
149
        while (true) {
150
          TSStatus status =
×
151
              configManager
152
                  .getNodeManager()
×
153
                  .updateConfigNodeIfNecessary(configNodeId, IoTDBConstant.BUILD_INFO);
×
154
          if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
155
            break;
×
156
          } else {
157
            startUpSleep("restart ConfigNode failed! ");
×
158
          }
159
        }
×
160
        return;
×
161
      }
162

163
      /* Initial startup of Seed-ConfigNode */
164
      if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
×
165
        LOGGER.info(
×
166
            "The current {} is now starting as the Seed-ConfigNode.",
167
            ConfigNodeConstant.GLOBAL_NAME);
168

169
        /* Always set ClusterId and ConfigNodeId before initConsensusManager */
170
        CONF.setConfigNodeId(SEED_CONFIG_NODE_ID);
×
171
        configManager.initConsensusManager();
×
172

173
        // Persistence system parameters after the consensusGroup is built,
174
        // or the consensusGroup will not be initialized successfully otherwise.
175
        SystemPropertiesUtils.storeSystemParameters();
×
176

177
        // Seed-ConfigNode should apply itself when first start
178
        configManager
×
179
            .getNodeManager()
×
180
            .applyConfigNode(
×
181
                generateConfigNodeLocation(SEED_CONFIG_NODE_ID), IoTDBConstant.BUILD_INFO);
×
182
        setUpMetricService();
×
183
        // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
184
        // that the external service is not provided until Seed-ConfigNode is fully initialized
185
        setUpRPCService();
×
186
        // The initial startup of Seed-ConfigNode finished
187
        LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
188
        LOGGER.info(
×
189
            "{} has successfully started and joined the cluster: {}.",
190
            ConfigNodeConstant.GLOBAL_NAME,
191
            CONF.getClusterName());
×
192
        return;
×
193
      }
194

195
      /* Initial startup of Non-Seed-ConfigNode */
196
      // We set up Non-Seed ConfigNode's RPC service before sending the register request
197
      // in order to facilitate the scheduling of capacity expansion process in ConfigNode-leader
198
      setUpRPCService();
×
199
      sendRegisterConfigNodeRequest();
×
200
      // The initial startup of Non-Seed-ConfigNode is not yet finished,
201
      // we should wait for leader's scheduling
202
      LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
203
      LOGGER.info(
×
204
          "{} {} has registered successfully. Waiting for the leader's scheduling to join the cluster: {}.",
205
          ConfigNodeConstant.GLOBAL_NAME,
206
          CONF.getConfigNodeId(),
×
207
          CONF.getClusterName());
×
208
      setUpMetricService();
×
209

210
      boolean isJoinedCluster = false;
×
211
      for (int retry = 0; retry < SCHEDULE_WAITING_RETRY_NUM; retry++) {
×
212
        if (!configManager
×
213
            .getConsensusManager()
×
214
            .getConsensusImpl()
×
215
            .getAllConsensusGroupIds()
×
216
            .isEmpty()) {
×
217
          isJoinedCluster = true;
×
218
          break;
×
219
        }
220
        startUpSleep("Waiting leader's scheduling is interrupted.");
×
221
      }
222

223
      if (!isJoinedCluster) {
×
224
        LOGGER.error(
×
225
            "The current ConfigNode can't joined the cluster because leader's scheduling failed. The possible cause is that the ip:port configuration is incorrect.");
226
        stop();
×
227
      }
228
    } catch (StartupException | IOException e) {
×
229
      LOGGER.error("Meet error while starting up.", e);
×
230
      stop();
×
231
    }
×
232
  }
×
233

234
  void processPid() {
235
    String pidFile = System.getProperty(ConfigNodeConstant.IOTDB_PIDFILE);
×
236
    if (pidFile != null) {
×
237
      new File(pidFile).deleteOnExit();
×
238
    }
239
  }
×
240

241
  private void setUpInternalServices() throws StartupException {
242
    // Setup JMXService
243
    registerManager.register(new JMXService());
×
244
    JMXService.registerMBean(this, mbeanName);
×
245

246
    LOGGER.info("Successfully setup internal services.");
×
247
  }
×
248

249
  private void setUpMetricService() throws StartupException {
250
    MetricConfigDescriptor.getInstance().getMetricConfig().setNodeId(CONF.getConfigNodeId());
×
251
    registerManager.register(MetricService.getInstance());
×
252
    // Bind predefined metric sets
253
    MetricService.getInstance().addMetricSet(new UpTimeMetrics());
×
254
    MetricService.getInstance().addMetricSet(new JvmMetrics());
×
255
    MetricService.getInstance().addMetricSet(new LogbackMetrics());
×
256
    MetricService.getInstance().addMetricSet(new ProcessMetrics());
×
257
    MetricService.getInstance().addMetricSet(new DiskMetrics(IoTDBConstant.CN_ROLE));
×
258
    MetricService.getInstance().addMetricSet(new NetMetrics(IoTDBConstant.CN_ROLE));
×
259
    MetricService.getInstance().addMetricSet(JvmGcMonitorMetrics.getInstance());
×
260
    MetricService.getInstance().addMetricSet(ClientManagerMetrics.getInstance());
×
261
    MetricService.getInstance().addMetricSet(ThreadPoolMetrics.getInstance());
×
262
    initCpuMetrics();
×
263
    initSystemMetrics();
×
264
  }
×
265

266
  private void initSystemMetrics() {
267
    ArrayList<String> diskDirs = new ArrayList<>();
×
268
    diskDirs.add(CONF.getSystemDir());
×
269
    diskDirs.add(CONF.getConsensusDir());
×
270
    MetricService.getInstance().addMetricSet(new SystemMetrics(diskDirs));
×
271
  }
×
272

273
  private void initCpuMetrics() {
274
    List<String> threadModules = new ArrayList<>();
×
275
    Arrays.stream(ThreadModule.values()).forEach(x -> threadModules.add(x.toString()));
×
276
    List<String> pools = new ArrayList<>();
×
277
    Arrays.stream(ThreadName.values()).forEach(x -> pools.add(x.name()));
×
278
    MetricService.getInstance()
×
279
        .addMetricSet(
×
280
            new CpuUsageMetrics(
281
                threadModules,
282
                pools,
283
                x -> ThreadName.getModuleTheThreadBelongs(x).toString(),
×
284
                x -> ThreadName.getThreadPoolTheThreadBelongs(x).name()));
×
285
  }
×
286

287
  private void initConfigManager() {
288
    try {
289
      configManager = new ConfigManager();
×
290
    } catch (IOException e) {
×
291
      LOGGER.error("Can't start ConfigNode consensus group!", e);
×
292
      stop();
×
293
    }
×
294
    // Add some Metrics for configManager
295
    configManager.addMetrics();
×
296
    LOGGER.info("Successfully initialize ConfigManager.");
×
297
  }
×
298

299
  /**
300
   * Register Non-seed ConfigNode when first startup.
301
   *
302
   * @throws StartupException if register failed.
303
   * @throws IOException if consensus manager init failed.
304
   */
305
  private void sendRegisterConfigNodeRequest() throws StartupException, IOException {
306
    TConfigNodeRegisterReq req =
×
307
        new TConfigNodeRegisterReq(
308
            configManager.getClusterParameters(),
×
309
            generateConfigNodeLocation(INIT_NON_SEED_CONFIG_NODE_ID));
×
310

311
    req.setBuildInfo(IoTDBConstant.BUILD_INFO);
×
312

313
    TEndPoint targetConfigNode = CONF.getTargetConfigNode();
×
314
    if (targetConfigNode == null) {
×
315
      LOGGER.error(
×
316
          "Please set the cn_target_config_node_list parameter in iotdb-confignode.properties file.");
317
      throw new StartupException("The targetConfigNode setting in conf is empty");
×
318
    }
319

320
    for (int retry = 0; retry < STARTUP_RETRY_NUM; retry++) {
×
321
      TSStatus status;
322
      TConfigNodeRegisterResp resp = null;
×
323
      Object obj =
324
          SyncConfigNodeClientPool.getInstance()
×
325
              .sendSyncRequestToConfigNodeWithRetry(
×
326
                  targetConfigNode, req, ConfigNodeRequestType.REGISTER_CONFIG_NODE);
327

328
      if (obj instanceof TConfigNodeRegisterResp) {
×
329
        resp = (TConfigNodeRegisterResp) obj;
×
330
        status = resp.getStatus();
×
331
      } else {
332
        status = (TSStatus) obj;
×
333
      }
334

335
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
336
        if (resp == null) {
×
337
          LOGGER.error("The result of register ConfigNode is empty!");
×
338
          throw new StartupException("The result of register ConfigNode is empty!");
×
339
        }
340
        /* Always set ConfigNodeId before initConsensusManager */
341
        CONF.setConfigNodeId(resp.getConfigNodeId());
×
342
        configManager.initConsensusManager();
×
343
        return;
×
344
      } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
×
345
        targetConfigNode = status.getRedirectNode();
×
346
        LOGGER.info("ConfigNode need redirect to  {}.", targetConfigNode);
×
347
      } else {
348
        throw new StartupException(status.getMessage());
×
349
      }
350
      startUpSleep("Register ConfigNode failed!");
×
351
    }
352

353
    LOGGER.error(
×
354
        "The current ConfigNode can't send register request to the ConfigNode-leader after all retries!");
355
    stop();
×
356
  }
×
357

358
  private void sendRestartConfigNodeRequest() throws StartupException {
359

360
    TConfigNodeRestartReq req =
×
361
        new TConfigNodeRestartReq(
362
            CONF.getClusterName(), generateConfigNodeLocation(CONF.getConfigNodeId()));
×
363

364
    TEndPoint targetConfigNode = CONF.getTargetConfigNode();
×
365
    if (targetConfigNode == null) {
×
366
      LOGGER.error(
×
367
          "Please set the cn_target_config_node_list parameter in iotdb-confignode.properties file.");
368
      throw new StartupException("The targetConfigNode setting in conf is empty");
×
369
    }
370

371
    for (int retry = 0; retry < STARTUP_RETRY_NUM; retry++) {
×
372
      TSStatus status =
373
          (TSStatus)
374
              SyncConfigNodeClientPool.getInstance()
×
375
                  .sendSyncRequestToConfigNodeWithRetry(
×
376
                      targetConfigNode, req, ConfigNodeRequestType.RESTART_CONFIG_NODE);
377

378
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
379
        LOGGER.info("Registration request of current ConfigNode is accepted.");
×
380
        return;
×
381
      } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
×
382
        targetConfigNode = status.getRedirectNode();
×
383
        LOGGER.info("ConfigNode need redirect to  {}.", targetConfigNode);
×
384
      } else {
385
        throw new StartupException(status.getMessage());
×
386
      }
387
      startUpSleep("Register ConfigNode failed! ");
×
388
    }
389
  }
×
390

391
  private TConfigNodeLocation generateConfigNodeLocation(int configNodeId) {
392
    return new TConfigNodeLocation(
×
393
        configNodeId,
394
        new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
×
395
        new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort()));
×
396
  }
397

398
  private void startUpSleep(String errorMessage) throws StartupException {
399
    try {
400
      TimeUnit.MILLISECONDS.sleep(STARTUP_RETRY_INTERVAL_IN_MS);
×
401
    } catch (InterruptedException e) {
×
402
      Thread.currentThread().interrupt();
×
403
      throw new StartupException(errorMessage);
×
404
    }
×
405
  }
×
406

407
  private void setUpRPCService() throws StartupException {
408
    // Setup RPCService
409
    ConfigNodeRPCService configNodeRPCService = new ConfigNodeRPCService();
×
410
    ConfigNodeRPCServiceProcessor configNodeRPCServiceProcessor =
×
411
        new ConfigNodeRPCServiceProcessor(configManager);
412
    configNodeRPCService.initSyncedServiceImpl(configNodeRPCServiceProcessor);
×
413
    registerManager.register(configNodeRPCService);
×
414
  }
×
415

416
  /**
417
   * Deactivating ConfigNode internal services.
418
   *
419
   * @throws IOException if close configManager failed.
420
   */
421
  public void deactivate() throws IOException {
422
    LOGGER.info("Deactivating {}...", ConfigNodeConstant.GLOBAL_NAME);
×
423
    registerManager.deregisterAll();
×
424
    JMXService.deregisterMBean(mbeanName);
×
425
    if (configManager != null) {
×
426
      configManager.close();
×
427
    }
428
    LOGGER.info("{} is deactivated.", ConfigNodeConstant.GLOBAL_NAME);
×
429
  }
×
430

431
  public void stop() {
432
    try {
433
      deactivate();
×
434
    } catch (IOException e) {
×
435
      LOGGER.error("Meet error when deactivate ConfigNode", e);
×
436
    }
×
437
    System.exit(-1);
×
438
  }
×
439

440
  public ConfigManager getConfigManager() {
441
    return configManager;
1✔
442
  }
443

444
  @TestOnly
445
  public void setConfigManager(ConfigManager configManager) {
446
    this.configManager = configManager;
1✔
447
  }
1✔
448

449
  private static class ConfigNodeHolder {
450

451
    private static final ConfigNode INSTANCE = new ConfigNode();
1✔
452

453
    private ConfigNodeHolder() {
454
      // Empty constructor
455
    }
456
  }
457

458
  public static ConfigNode getInstance() {
459
    return ConfigNodeHolder.INSTANCE;
1✔
460
  }
461
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc