• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9686

pending completion
#9686

push

travis_ci

web-flow
add build info in show cluster (#10595)

146 of 146 new or added lines in 13 files covered. (100.0%)

79232 of 165062 relevant lines covered (48.0%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.4
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.service;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
24
import org.apache.iotdb.common.rpc.thrift.TSStatus;
25
import org.apache.iotdb.commons.client.ClientManagerMetrics;
26
import org.apache.iotdb.commons.concurrent.ThreadModule;
27
import org.apache.iotdb.commons.concurrent.ThreadName;
28
import org.apache.iotdb.commons.concurrent.ThreadPoolMetrics;
29
import org.apache.iotdb.commons.conf.IoTDBConstant;
30
import org.apache.iotdb.commons.exception.StartupException;
31
import org.apache.iotdb.commons.service.JMXService;
32
import org.apache.iotdb.commons.service.RegisterManager;
33
import org.apache.iotdb.commons.service.metric.MetricService;
34
import org.apache.iotdb.commons.utils.TestOnly;
35
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
36
import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
37
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
38
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
39
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
40
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
41
import org.apache.iotdb.confignode.manager.ConfigManager;
42
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
43
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
44
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
45
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCService;
46
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor;
47
import org.apache.iotdb.db.service.metrics.ProcessMetrics;
48
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
49
import org.apache.iotdb.metrics.metricsets.UpTimeMetrics;
50
import org.apache.iotdb.metrics.metricsets.cpu.CpuUsageMetrics;
51
import org.apache.iotdb.metrics.metricsets.disk.DiskMetrics;
52
import org.apache.iotdb.metrics.metricsets.jvm.JvmMetrics;
53
import org.apache.iotdb.metrics.metricsets.logback.LogbackMetrics;
54
import org.apache.iotdb.metrics.metricsets.net.NetMetrics;
55
import org.apache.iotdb.metrics.metricsets.system.SystemMetrics;
56
import org.apache.iotdb.rpc.TSStatusCode;
57

58
import org.slf4j.Logger;
59
import org.slf4j.LoggerFactory;
60

61
import java.io.File;
62
import java.io.IOException;
63
import java.nio.charset.Charset;
64
import java.util.ArrayList;
65
import java.util.Arrays;
66
import java.util.List;
67
import java.util.concurrent.TimeUnit;
68

69
public class ConfigNode implements ConfigNodeMBean {
70

71
  private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNode.class);
1✔
72

73
  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
1✔
74

75
  private static final int STARTUP_RETRY_NUM = 10;
76
  private static final int SCHEDULE_WAITING_RETRY_NUM = 20;
77
  private static final long STARTUP_RETRY_INTERVAL_IN_MS = TimeUnit.SECONDS.toMillis(3);
1✔
78

79
  private static final int SEED_CONFIG_NODE_ID = 0;
80

81
  private static final int INIT_NON_SEED_CONFIG_NODE_ID = -1;
82

83
  private static final String CONFIGURATION = "IoTDB configuration: {}";
84

85
  private final String mbeanName =
1✔
86
      String.format(
1✔
87
          "%s:%s=%s",
88
          ConfigNodeConstant.CONFIGNODE_PACKAGE, ConfigNodeConstant.JMX_TYPE, "ConfigNode");
89
  private final RegisterManager registerManager = new RegisterManager();
1✔
90

91
  private ConfigManager configManager;
92

93
  private ConfigNode() {
1✔
94
    // We do not init anything here, so that we can re-initialize the instance in IT.
95
  }
1✔
96

97
  public static void main(String[] args) {
98
    LOGGER.info(
×
99
        "{} environment variables: {}",
100
        ConfigNodeConstant.GLOBAL_NAME,
101
        ConfigNodeConfig.getEnvironmentVariables());
×
102
    LOGGER.info(
×
103
        "{} default charset is: {}",
104
        ConfigNodeConstant.GLOBAL_NAME,
105
        Charset.defaultCharset().displayName());
×
106
    new ConfigNodeCommandLine().doMain(args);
×
107
  }
×
108

109
  public void active() {
110
    LOGGER.info("Activating {}...", ConfigNodeConstant.GLOBAL_NAME);
×
111

112
    try {
113
      processPid();
×
114
      // Add shutdown hook
115
      Runtime.getRuntime().addShutdownHook(new ConfigNodeShutdownHook());
×
116
      // Set up internal services
117
      setUpInternalServices();
×
118
      // Init ConfigManager
119
      initConfigManager();
×
120

121
      /* Restart */
122
      if (SystemPropertiesUtils.isRestarted()) {
×
123
        LOGGER.info("{} is in restarting process...", ConfigNodeConstant.GLOBAL_NAME);
×
124

125
        int configNodeId;
126
        if (!SystemPropertiesUtils.isSeedConfigNode()) {
×
127
          // The non-seed-ConfigNodes should send restart request and be checked (ip and port) by
128
          // leader before initConsensusManager
129
          sendRestartConfigNodeRequest();
×
130
          configNodeId = CONF.getConfigNodeId();
×
131
        } else {
132
          configNodeId = SEED_CONFIG_NODE_ID;
×
133
        }
134
        configManager.initConsensusManager();
×
135

136
        setUpMetricService();
×
137
        // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
138
        // that the external service is not provided until ConfigNode is fully available
139
        setUpRPCService();
×
140
        LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
141
        LOGGER.info(
×
142
            "{} has successfully restarted and joined the cluster: {}.",
143
            ConfigNodeConstant.GLOBAL_NAME,
144
            CONF.getClusterName());
×
145

146
        // Update item during restart
147
        // This will always be executed until the consensus write succeeds
148
        while (true) {
149
          TSStatus status =
×
150
              configManager
151
                  .getNodeManager()
×
152
                  .updateConfigNodeIfNecessary(configNodeId, IoTDBConstant.BUILD_INFO);
×
153
          if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
154
            break;
×
155
          } else {
156
            startUpSleep("restart ConfigNode failed! ");
×
157
          }
158
        }
×
159
        return;
×
160
      }
161

162
      /* Initial startup of Seed-ConfigNode */
163
      if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
×
164
        LOGGER.info(
×
165
            "The current {} is now starting as the Seed-ConfigNode.",
166
            ConfigNodeConstant.GLOBAL_NAME);
167

168
        /* Always set ClusterId and ConfigNodeId before initConsensusManager */
169
        CONF.setConfigNodeId(SEED_CONFIG_NODE_ID);
×
170
        configManager.initConsensusManager();
×
171

172
        // Persistence system parameters after the consensusGroup is built,
173
        // or the consensusGroup will not be initialized successfully otherwise.
174
        SystemPropertiesUtils.storeSystemParameters();
×
175

176
        // Seed-ConfigNode should apply itself when first start
177
        configManager
×
178
            .getNodeManager()
×
179
            .applyConfigNode(
×
180
                generateConfigNodeLocation(SEED_CONFIG_NODE_ID), IoTDBConstant.BUILD_INFO);
×
181
        setUpMetricService();
×
182
        // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
183
        // that the external service is not provided until Seed-ConfigNode is fully initialized
184
        setUpRPCService();
×
185
        // The initial startup of Seed-ConfigNode finished
186
        LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
187
        LOGGER.info(
×
188
            "{} has successfully started and joined the cluster: {}.",
189
            ConfigNodeConstant.GLOBAL_NAME,
190
            CONF.getClusterName());
×
191
        return;
×
192
      }
193

194
      /* Initial startup of Non-Seed-ConfigNode */
195
      // We set up Non-Seed ConfigNode's RPC service before sending the register request
196
      // in order to facilitate the scheduling of capacity expansion process in ConfigNode-leader
197
      setUpRPCService();
×
198
      sendRegisterConfigNodeRequest();
×
199
      // The initial startup of Non-Seed-ConfigNode is not yet finished,
200
      // we should wait for leader's scheduling
201
      LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
202
      LOGGER.info(
×
203
          "{} {} has registered successfully. Waiting for the leader's scheduling to join the cluster: {}.",
204
          ConfigNodeConstant.GLOBAL_NAME,
205
          CONF.getConfigNodeId(),
×
206
          CONF.getClusterName());
×
207
      setUpMetricService();
×
208

209
      boolean isJoinedCluster = false;
×
210
      for (int retry = 0; retry < SCHEDULE_WAITING_RETRY_NUM; retry++) {
×
211
        if (!configManager
×
212
            .getConsensusManager()
×
213
            .getConsensusImpl()
×
214
            .getAllConsensusGroupIds()
×
215
            .isEmpty()) {
×
216
          isJoinedCluster = true;
×
217
          break;
×
218
        }
219
        startUpSleep("Waiting leader's scheduling is interrupted.");
×
220
      }
221

222
      if (!isJoinedCluster) {
×
223
        LOGGER.error(
×
224
            "The current ConfigNode can't joined the cluster because leader's scheduling failed. The possible cause is that the ip:port configuration is incorrect.");
225
        stop();
×
226
      }
227
    } catch (StartupException | IOException e) {
×
228
      LOGGER.error("Meet error while starting up.", e);
×
229
      stop();
×
230
    }
×
231
  }
×
232

233
  void processPid() {
234
    String pidFile = System.getProperty(ConfigNodeConstant.IOTDB_PIDFILE);
×
235
    if (pidFile != null) {
×
236
      new File(pidFile).deleteOnExit();
×
237
    }
238
  }
×
239

240
  private void setUpInternalServices() throws StartupException {
241
    // Setup JMXService
242
    registerManager.register(new JMXService());
×
243
    JMXService.registerMBean(this, mbeanName);
×
244

245
    LOGGER.info("Successfully setup internal services.");
×
246
  }
×
247

248
  private void setUpMetricService() throws StartupException {
249
    MetricConfigDescriptor.getInstance().getMetricConfig().setNodeId(CONF.getConfigNodeId());
×
250
    registerManager.register(MetricService.getInstance());
×
251
    // Bind predefined metric sets
252
    MetricService.getInstance().addMetricSet(new UpTimeMetrics());
×
253
    MetricService.getInstance().addMetricSet(new JvmMetrics());
×
254
    MetricService.getInstance().addMetricSet(new LogbackMetrics());
×
255
    MetricService.getInstance().addMetricSet(new ProcessMetrics());
×
256
    MetricService.getInstance().addMetricSet(new DiskMetrics(IoTDBConstant.CN_ROLE));
×
257
    MetricService.getInstance().addMetricSet(new NetMetrics(IoTDBConstant.CN_ROLE));
×
258
    MetricService.getInstance().addMetricSet(ClientManagerMetrics.getInstance());
×
259
    MetricService.getInstance().addMetricSet(ThreadPoolMetrics.getInstance());
×
260
    initCpuMetrics();
×
261
    initSystemMetrics();
×
262
  }
×
263

264
  private void initSystemMetrics() {
265
    ArrayList<String> diskDirs = new ArrayList<>();
×
266
    diskDirs.add(CONF.getSystemDir());
×
267
    diskDirs.add(CONF.getConsensusDir());
×
268
    MetricService.getInstance().addMetricSet(new SystemMetrics(diskDirs));
×
269
  }
×
270

271
  private void initCpuMetrics() {
272
    List<String> threadModules = new ArrayList<>();
×
273
    Arrays.stream(ThreadModule.values()).forEach(x -> threadModules.add(x.toString()));
×
274
    List<String> pools = new ArrayList<>();
×
275
    Arrays.stream(ThreadName.values()).forEach(x -> pools.add(x.name()));
×
276
    MetricService.getInstance()
×
277
        .addMetricSet(
×
278
            new CpuUsageMetrics(
279
                threadModules,
280
                pools,
281
                x -> ThreadName.getModuleTheThreadBelongs(x).toString(),
×
282
                x -> ThreadName.getThreadPoolTheThreadBelongs(x).name()));
×
283
  }
×
284

285
  private void initConfigManager() {
286
    try {
287
      configManager = new ConfigManager();
×
288
    } catch (IOException e) {
×
289
      LOGGER.error("Can't start ConfigNode consensus group!", e);
×
290
      stop();
×
291
    }
×
292
    // Add some Metrics for configManager
293
    configManager.addMetrics();
×
294
    LOGGER.info("Successfully initialize ConfigManager.");
×
295
  }
×
296

297
  /**
298
   * Register Non-seed ConfigNode when first startup.
299
   *
300
   * @throws StartupException if register failed.
301
   * @throws IOException if consensus manager init failed.
302
   */
303
  private void sendRegisterConfigNodeRequest() throws StartupException, IOException {
304
    TConfigNodeRegisterReq req =
×
305
        new TConfigNodeRegisterReq(
306
            configManager.getClusterParameters(),
×
307
            generateConfigNodeLocation(INIT_NON_SEED_CONFIG_NODE_ID));
×
308

309
    req.setBuildInfo(IoTDBConstant.BUILD_INFO);
×
310

311
    TEndPoint targetConfigNode = CONF.getTargetConfigNode();
×
312
    if (targetConfigNode == null) {
×
313
      LOGGER.error(
×
314
          "Please set the cn_target_config_node_list parameter in iotdb-confignode.properties file.");
315
      throw new StartupException("The targetConfigNode setting in conf is empty");
×
316
    }
317

318
    for (int retry = 0; retry < STARTUP_RETRY_NUM; retry++) {
×
319
      TSStatus status;
320
      TConfigNodeRegisterResp resp = null;
×
321
      Object obj =
322
          SyncConfigNodeClientPool.getInstance()
×
323
              .sendSyncRequestToConfigNodeWithRetry(
×
324
                  targetConfigNode, req, ConfigNodeRequestType.REGISTER_CONFIG_NODE);
325

326
      if (obj instanceof TConfigNodeRegisterResp) {
×
327
        resp = (TConfigNodeRegisterResp) obj;
×
328
        status = resp.getStatus();
×
329
      } else {
330
        status = (TSStatus) obj;
×
331
      }
332

333
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
334
        if (resp == null) {
×
335
          LOGGER.error("The result of register ConfigNode is empty!");
×
336
          throw new StartupException("The result of register ConfigNode is empty!");
×
337
        }
338
        /* Always set ConfigNodeId before initConsensusManager */
339
        CONF.setConfigNodeId(resp.getConfigNodeId());
×
340
        configManager.initConsensusManager();
×
341
        return;
×
342
      } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
×
343
        targetConfigNode = status.getRedirectNode();
×
344
        LOGGER.info("ConfigNode need redirect to  {}.", targetConfigNode);
×
345
      } else {
346
        throw new StartupException(status.getMessage());
×
347
      }
348
      startUpSleep("Register ConfigNode failed!");
×
349
    }
350

351
    LOGGER.error(
×
352
        "The current ConfigNode can't send register request to the ConfigNode-leader after all retries!");
353
    stop();
×
354
  }
×
355

356
  private void sendRestartConfigNodeRequest() throws StartupException {
357

358
    TConfigNodeRestartReq req =
×
359
        new TConfigNodeRestartReq(
360
            CONF.getClusterName(), generateConfigNodeLocation(CONF.getConfigNodeId()));
×
361

362
    TEndPoint targetConfigNode = CONF.getTargetConfigNode();
×
363
    if (targetConfigNode == null) {
×
364
      LOGGER.error(
×
365
          "Please set the cn_target_config_node_list parameter in iotdb-confignode.properties file.");
366
      throw new StartupException("The targetConfigNode setting in conf is empty");
×
367
    }
368

369
    for (int retry = 0; retry < STARTUP_RETRY_NUM; retry++) {
×
370
      TSStatus status =
371
          (TSStatus)
372
              SyncConfigNodeClientPool.getInstance()
×
373
                  .sendSyncRequestToConfigNodeWithRetry(
×
374
                      targetConfigNode, req, ConfigNodeRequestType.RESTART_CONFIG_NODE);
375

376
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
377
        LOGGER.info("Registration request of current ConfigNode is accepted.");
×
378
        return;
×
379
      } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
×
380
        targetConfigNode = status.getRedirectNode();
×
381
        LOGGER.info("ConfigNode need redirect to  {}.", targetConfigNode);
×
382
      } else {
383
        throw new StartupException(status.getMessage());
×
384
      }
385
      startUpSleep("Register ConfigNode failed! ");
×
386
    }
387
  }
×
388

389
  private TConfigNodeLocation generateConfigNodeLocation(int configNodeId) {
390
    return new TConfigNodeLocation(
×
391
        configNodeId,
392
        new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
×
393
        new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort()));
×
394
  }
395

396
  private void startUpSleep(String errorMessage) throws StartupException {
397
    try {
398
      TimeUnit.MILLISECONDS.sleep(STARTUP_RETRY_INTERVAL_IN_MS);
×
399
    } catch (InterruptedException e) {
×
400
      Thread.currentThread().interrupt();
×
401
      throw new StartupException(errorMessage);
×
402
    }
×
403
  }
×
404

405
  private void setUpRPCService() throws StartupException {
406
    // Setup RPCService
407
    ConfigNodeRPCService configNodeRPCService = new ConfigNodeRPCService();
×
408
    ConfigNodeRPCServiceProcessor configNodeRPCServiceProcessor =
×
409
        new ConfigNodeRPCServiceProcessor(configManager);
410
    configNodeRPCService.initSyncedServiceImpl(configNodeRPCServiceProcessor);
×
411
    registerManager.register(configNodeRPCService);
×
412
  }
×
413

414
  /**
415
   * Deactivating ConfigNode internal services.
416
   *
417
   * @throws IOException if close configManager failed.
418
   */
419
  public void deactivate() throws IOException {
420
    LOGGER.info("Deactivating {}...", ConfigNodeConstant.GLOBAL_NAME);
×
421
    registerManager.deregisterAll();
×
422
    JMXService.deregisterMBean(mbeanName);
×
423
    if (configManager != null) {
×
424
      configManager.close();
×
425
    }
426
    LOGGER.info("{} is deactivated.", ConfigNodeConstant.GLOBAL_NAME);
×
427
  }
×
428

429
  public void stop() {
430
    try {
431
      deactivate();
×
432
    } catch (IOException e) {
×
433
      LOGGER.error("Meet error when deactivate ConfigNode", e);
×
434
    }
×
435
    System.exit(-1);
×
436
  }
×
437

438
  public ConfigManager getConfigManager() {
439
    return configManager;
1✔
440
  }
441

442
  @TestOnly
443
  public void setConfigManager(ConfigManager configManager) {
444
    this.configManager = configManager;
1✔
445
  }
1✔
446

447
  private static class ConfigNodeHolder {
448

449
    private static final ConfigNode INSTANCE = new ConfigNode();
1✔
450

451
    private ConfigNodeHolder() {
452
      // Empty constructor
453
    }
454
  }
455

456
  public static ConfigNode getInstance() {
457
    return ConfigNodeHolder.INSTANCE;
1✔
458
  }
459
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc