• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9919

25 Aug 2023 07:08AM UTC coverage: 47.802% (+0.007%) from 47.795%
#9919

push

travis_ci

web-flow
Remove some useless configs (#10950)

80023 of 167404 relevant lines covered (47.8%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.34
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.service;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
24
import org.apache.iotdb.common.rpc.thrift.TSStatus;
25
import org.apache.iotdb.commons.client.ClientManagerMetrics;
26
import org.apache.iotdb.commons.concurrent.ThreadModule;
27
import org.apache.iotdb.commons.concurrent.ThreadName;
28
import org.apache.iotdb.commons.concurrent.ThreadPoolMetrics;
29
import org.apache.iotdb.commons.conf.IoTDBConstant;
30
import org.apache.iotdb.commons.exception.StartupException;
31
import org.apache.iotdb.commons.service.JMXService;
32
import org.apache.iotdb.commons.service.RegisterManager;
33
import org.apache.iotdb.commons.service.metric.JvmGcMonitorMetrics;
34
import org.apache.iotdb.commons.service.metric.MetricService;
35
import org.apache.iotdb.commons.utils.TestOnly;
36
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
37
import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
38
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
39
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
40
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
41
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
42
import org.apache.iotdb.confignode.manager.ConfigManager;
43
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
44
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
45
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
46
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
47
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCService;
48
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor;
49
import org.apache.iotdb.db.service.metrics.ProcessMetrics;
50
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
51
import org.apache.iotdb.metrics.metricsets.UpTimeMetrics;
52
import org.apache.iotdb.metrics.metricsets.cpu.CpuUsageMetrics;
53
import org.apache.iotdb.metrics.metricsets.disk.DiskMetrics;
54
import org.apache.iotdb.metrics.metricsets.jvm.JvmMetrics;
55
import org.apache.iotdb.metrics.metricsets.logback.LogbackMetrics;
56
import org.apache.iotdb.metrics.metricsets.net.NetMetrics;
57
import org.apache.iotdb.metrics.metricsets.system.SystemMetrics;
58
import org.apache.iotdb.rpc.TSStatusCode;
59

60
import org.slf4j.Logger;
61
import org.slf4j.LoggerFactory;
62

63
import java.io.File;
64
import java.io.IOException;
65
import java.nio.charset.Charset;
66
import java.util.ArrayList;
67
import java.util.Arrays;
68
import java.util.List;
69
import java.util.concurrent.TimeUnit;
70

71
public class ConfigNode implements ConfigNodeMBean {
72

73
  /** Logger used by every startup and shutdown step of this class. */
  private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNode.class);

  /** ConfigNode configuration obtained from the {@link ConfigNodeDescriptor} singleton. */
  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();

  /** Upper bound on the attempts made by the register and restart request loops. */
  private static final int STARTUP_RETRY_NUM = 10;

  /** Upper bound on the polls made while waiting for a consensus group to show up. */
  private static final int SCHEDULE_WAITING_RETRY_NUM = 20;

  /** Pause between two startup attempts, in milliseconds. */
  private static final long STARTUP_RETRY_INTERVAL_IN_MS = TimeUnit.SECONDS.toMillis(3);

  /** ConfigNode id used for the Seed-ConfigNode. */
  private static final int SEED_CONFIG_NODE_ID = 0;

  /** Placeholder id put into the register request of a Non-Seed-ConfigNode. */
  private static final int INIT_NON_SEED_CONFIG_NODE_ID = -1;

  /** Log template used to print the effective configuration. */
  private static final String CONFIGURATION = "IoTDB configuration: {}";

  /** Name under which this instance is registered to and deregistered from JMX. */
  private final String mbeanName =
      String.format(
          "%s:%s=%s",
          ConfigNodeConstant.CONFIGNODE_PACKAGE, ConfigNodeConstant.JMX_TYPE, "ConfigNode");

  /** Keeps track of the registered services so that they can be deregistered together. */
  private final RegisterManager registerManager = new RegisterManager();

  /** Created by {@code initConfigManager()} or injected through {@code setConfigManager}. */
  private ConfigManager configManager;

  private ConfigNode() {
    // Intentionally empty: nothing is initialized here so that the instance can be
    // re-initialized in IT.
  }
1✔
98

99
  public static void main(String[] args) {
100
    LOGGER.info(
×
101
        "{} environment variables: {}",
102
        ConfigNodeConstant.GLOBAL_NAME,
103
        ConfigNodeConfig.getEnvironmentVariables());
×
104
    LOGGER.info(
×
105
        "{} default charset is: {}",
106
        ConfigNodeConstant.GLOBAL_NAME,
107
        Charset.defaultCharset().displayName());
×
108
    new ConfigNodeCommandLine().doMain(args);
×
109
  }
×
110

111
  public void active() {
112
    LOGGER.info("Activating {}...", ConfigNodeConstant.GLOBAL_NAME);
×
113

114
    try {
115
      processPid();
×
116
      // Add shutdown hook
117
      Runtime.getRuntime().addShutdownHook(new ConfigNodeShutdownHook());
×
118
      // Set up internal services
119
      setUpInternalServices();
×
120
      // Init ConfigManager
121
      initConfigManager();
×
122

123
      /* Restart */
124
      if (SystemPropertiesUtils.isRestarted()) {
×
125
        LOGGER.info("{} is in restarting process...", ConfigNodeConstant.GLOBAL_NAME);
×
126

127
        int configNodeId;
128
        if (!SystemPropertiesUtils.isSeedConfigNode()) {
×
129
          // The non-seed-ConfigNodes should send restart request and be checked (ip and port) by
130
          // leader before initConsensusManager
131
          sendRestartConfigNodeRequest();
×
132
          configNodeId = CONF.getConfigNodeId();
×
133
        } else {
134
          configNodeId = SEED_CONFIG_NODE_ID;
×
135
        }
136
        configManager.initConsensusManager();
×
137

138
        setUpMetricService();
×
139
        // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
140
        // that the external service is not provided until ConfigNode is fully available
141
        setUpRPCService();
×
142
        LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
143
        LOGGER.info(
×
144
            "{} has successfully restarted and joined the cluster: {}.",
145
            ConfigNodeConstant.GLOBAL_NAME,
146
            CONF.getClusterName());
×
147

148
        // Update item during restart
149
        // This will always be executed until the consensus write succeeds
150
        while (true) {
151
          TSStatus status =
×
152
              configManager
153
                  .getNodeManager()
×
154
                  .updateConfigNodeIfNecessary(
×
155
                      configNodeId,
156
                      new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO));
157
          if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
158
            break;
×
159
          } else {
160
            startUpSleep("restart ConfigNode failed! ");
×
161
          }
162
        }
×
163
        return;
×
164
      }
165

166
      /* Initial startup of Seed-ConfigNode */
167
      if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
×
168
        LOGGER.info(
×
169
            "The current {} is now starting as the Seed-ConfigNode.",
170
            ConfigNodeConstant.GLOBAL_NAME);
171

172
        /* Always set ClusterId and ConfigNodeId before initConsensusManager */
173
        CONF.setConfigNodeId(SEED_CONFIG_NODE_ID);
×
174
        configManager.initConsensusManager();
×
175

176
        // Persistence system parameters after the consensusGroup is built,
177
        // or the consensusGroup will not be initialized successfully otherwise.
178
        SystemPropertiesUtils.storeSystemParameters();
×
179

180
        // Seed-ConfigNode should apply itself when first start
181
        configManager
×
182
            .getNodeManager()
×
183
            .applyConfigNode(
×
184
                generateConfigNodeLocation(SEED_CONFIG_NODE_ID),
×
185
                new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO));
186
        setUpMetricService();
×
187
        // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
188
        // that the external service is not provided until Seed-ConfigNode is fully initialized
189
        setUpRPCService();
×
190
        // The initial startup of Seed-ConfigNode finished
191
        LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
192
        LOGGER.info(
×
193
            "{} has successfully started and joined the cluster: {}.",
194
            ConfigNodeConstant.GLOBAL_NAME,
195
            CONF.getClusterName());
×
196
        return;
×
197
      }
198

199
      /* Initial startup of Non-Seed-ConfigNode */
200
      // We set up Non-Seed ConfigNode's RPC service before sending the register request
201
      // in order to facilitate the scheduling of capacity expansion process in ConfigNode-leader
202
      setUpRPCService();
×
203
      sendRegisterConfigNodeRequest();
×
204
      // The initial startup of Non-Seed-ConfigNode is not yet finished,
205
      // we should wait for leader's scheduling
206
      LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
207
      LOGGER.info(
×
208
          "{} {} has registered successfully. Waiting for the leader's scheduling to join the cluster: {}.",
209
          ConfigNodeConstant.GLOBAL_NAME,
210
          CONF.getConfigNodeId(),
×
211
          CONF.getClusterName());
×
212
      setUpMetricService();
×
213

214
      boolean isJoinedCluster = false;
×
215
      for (int retry = 0; retry < SCHEDULE_WAITING_RETRY_NUM; retry++) {
×
216
        if (!configManager
×
217
            .getConsensusManager()
×
218
            .getConsensusImpl()
×
219
            .getAllConsensusGroupIds()
×
220
            .isEmpty()) {
×
221
          isJoinedCluster = true;
×
222
          break;
×
223
        }
224
        startUpSleep("Waiting leader's scheduling is interrupted.");
×
225
      }
226

227
      if (!isJoinedCluster) {
×
228
        LOGGER.error(
×
229
            "The current ConfigNode can't joined the cluster because leader's scheduling failed. The possible cause is that the ip:port configuration is incorrect.");
230
        stop();
×
231
      }
232
    } catch (StartupException | IOException e) {
×
233
      LOGGER.error("Meet error while starting up.", e);
×
234
      stop();
×
235
    }
×
236
  }
×
237

238
  void processPid() {
239
    String pidFile = System.getProperty(ConfigNodeConstant.IOTDB_PIDFILE);
×
240
    if (pidFile != null) {
×
241
      new File(pidFile).deleteOnExit();
×
242
    }
243
  }
×
244

245
  private void setUpInternalServices() throws StartupException {
246
    // Setup JMXService
247
    registerManager.register(new JMXService());
×
248
    JMXService.registerMBean(this, mbeanName);
×
249

250
    LOGGER.info("Successfully setup internal services.");
×
251
  }
×
252

253
  private void setUpMetricService() throws StartupException {
254
    MetricConfigDescriptor.getInstance().getMetricConfig().setNodeId(CONF.getConfigNodeId());
×
255
    registerManager.register(MetricService.getInstance());
×
256
    // Bind predefined metric sets
257
    MetricService.getInstance().addMetricSet(new UpTimeMetrics());
×
258
    MetricService.getInstance().addMetricSet(new JvmMetrics());
×
259
    MetricService.getInstance().addMetricSet(new LogbackMetrics());
×
260
    MetricService.getInstance().addMetricSet(new ProcessMetrics());
×
261
    MetricService.getInstance().addMetricSet(new DiskMetrics(IoTDBConstant.CN_ROLE));
×
262
    MetricService.getInstance().addMetricSet(new NetMetrics(IoTDBConstant.CN_ROLE));
×
263
    MetricService.getInstance().addMetricSet(JvmGcMonitorMetrics.getInstance());
×
264
    MetricService.getInstance().addMetricSet(ClientManagerMetrics.getInstance());
×
265
    MetricService.getInstance().addMetricSet(ThreadPoolMetrics.getInstance());
×
266
    initCpuMetrics();
×
267
    initSystemMetrics();
×
268
  }
×
269

270
  private void initSystemMetrics() {
271
    ArrayList<String> diskDirs = new ArrayList<>();
×
272
    diskDirs.add(CONF.getSystemDir());
×
273
    diskDirs.add(CONF.getConsensusDir());
×
274
    SystemMetrics.getInstance().setDiskDirs(diskDirs);
×
275
    MetricService.getInstance().addMetricSet(SystemMetrics.getInstance());
×
276
  }
×
277

278
  private void initCpuMetrics() {
279
    List<String> threadModules = new ArrayList<>();
×
280
    Arrays.stream(ThreadModule.values()).forEach(x -> threadModules.add(x.toString()));
×
281
    List<String> pools = new ArrayList<>();
×
282
    Arrays.stream(ThreadName.values()).forEach(x -> pools.add(x.name()));
×
283
    MetricService.getInstance()
×
284
        .addMetricSet(
×
285
            new CpuUsageMetrics(
286
                threadModules,
287
                pools,
288
                x -> ThreadName.getModuleTheThreadBelongs(x).toString(),
×
289
                x -> ThreadName.getThreadPoolTheThreadBelongs(x).name()));
×
290
  }
×
291

292
  private void initConfigManager() {
293
    try {
294
      configManager = new ConfigManager();
×
295
    } catch (IOException e) {
×
296
      LOGGER.error("Can't start ConfigNode consensus group!", e);
×
297
      stop();
×
298
    }
×
299
    // Add some Metrics for configManager
300
    configManager.addMetrics();
×
301
    LOGGER.info("Successfully initialize ConfigManager.");
×
302
  }
×
303

304
  /**
305
   * Register Non-seed ConfigNode when first startup.
306
   *
307
   * @throws StartupException if register failed.
308
   * @throws IOException if consensus manager init failed.
309
   */
310
  private void sendRegisterConfigNodeRequest() throws StartupException, IOException {
311
    TConfigNodeRegisterReq req =
×
312
        new TConfigNodeRegisterReq(
313
            configManager.getClusterParameters(),
×
314
            generateConfigNodeLocation(INIT_NON_SEED_CONFIG_NODE_ID));
×
315

316
    req.setVersionInfo(new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO));
×
317

318
    TEndPoint targetConfigNode = CONF.getTargetConfigNode();
×
319
    if (targetConfigNode == null) {
×
320
      LOGGER.error(
×
321
          "Please set the cn_target_config_node_list parameter in iotdb-confignode.properties file.");
322
      throw new StartupException("The targetConfigNode setting in conf is empty");
×
323
    }
324

325
    for (int retry = 0; retry < STARTUP_RETRY_NUM; retry++) {
×
326
      TSStatus status;
327
      TConfigNodeRegisterResp resp = null;
×
328
      Object obj =
329
          SyncConfigNodeClientPool.getInstance()
×
330
              .sendSyncRequestToConfigNodeWithRetry(
×
331
                  targetConfigNode, req, ConfigNodeRequestType.REGISTER_CONFIG_NODE);
332

333
      if (obj instanceof TConfigNodeRegisterResp) {
×
334
        resp = (TConfigNodeRegisterResp) obj;
×
335
        status = resp.getStatus();
×
336
      } else {
337
        status = (TSStatus) obj;
×
338
      }
339

340
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
341
        if (resp == null) {
×
342
          LOGGER.error("The result of register ConfigNode is empty!");
×
343
          throw new StartupException("The result of register ConfigNode is empty!");
×
344
        }
345
        /* Always set ConfigNodeId before initConsensusManager */
346
        CONF.setConfigNodeId(resp.getConfigNodeId());
×
347
        configManager.initConsensusManager();
×
348
        return;
×
349
      } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
×
350
        targetConfigNode = status.getRedirectNode();
×
351
        LOGGER.info("ConfigNode need redirect to  {}.", targetConfigNode);
×
352
      } else {
353
        throw new StartupException(status.getMessage());
×
354
      }
355
      startUpSleep("Register ConfigNode failed!");
×
356
    }
357

358
    LOGGER.error(
×
359
        "The current ConfigNode can't send register request to the ConfigNode-leader after all retries!");
360
    stop();
×
361
  }
×
362

363
  private void sendRestartConfigNodeRequest() throws StartupException {
364

365
    TConfigNodeRestartReq req =
×
366
        new TConfigNodeRestartReq(
367
            CONF.getClusterName(), generateConfigNodeLocation(CONF.getConfigNodeId()));
×
368

369
    TEndPoint targetConfigNode = CONF.getTargetConfigNode();
×
370
    if (targetConfigNode == null) {
×
371
      LOGGER.error(
×
372
          "Please set the cn_target_config_node_list parameter in iotdb-confignode.properties file.");
373
      throw new StartupException("The targetConfigNode setting in conf is empty");
×
374
    }
375

376
    for (int retry = 0; retry < STARTUP_RETRY_NUM; retry++) {
×
377
      TSStatus status =
378
          (TSStatus)
379
              SyncConfigNodeClientPool.getInstance()
×
380
                  .sendSyncRequestToConfigNodeWithRetry(
×
381
                      targetConfigNode, req, ConfigNodeRequestType.RESTART_CONFIG_NODE);
382

383
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
384
        LOGGER.info("Registration request of current ConfigNode is accepted.");
×
385
        return;
×
386
      } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
×
387
        targetConfigNode = status.getRedirectNode();
×
388
        LOGGER.info("ConfigNode need redirect to  {}.", targetConfigNode);
×
389
      } else {
390
        throw new StartupException(status.getMessage());
×
391
      }
392
      startUpSleep("Register ConfigNode failed! ");
×
393
    }
394
  }
×
395

396
  private TConfigNodeLocation generateConfigNodeLocation(int configNodeId) {
397
    return new TConfigNodeLocation(
×
398
        configNodeId,
399
        new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
×
400
        new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort()));
×
401
  }
402

403
  private void startUpSleep(String errorMessage) throws StartupException {
404
    try {
405
      TimeUnit.MILLISECONDS.sleep(STARTUP_RETRY_INTERVAL_IN_MS);
×
406
    } catch (InterruptedException e) {
×
407
      Thread.currentThread().interrupt();
×
408
      throw new StartupException(errorMessage);
×
409
    }
×
410
  }
×
411

412
  private void setUpRPCService() throws StartupException {
413
    // Setup RPCService
414
    ConfigNodeRPCService configNodeRPCService = new ConfigNodeRPCService();
×
415
    ConfigNodeRPCServiceProcessor configNodeRPCServiceProcessor =
×
416
        new ConfigNodeRPCServiceProcessor(configManager);
417
    configNodeRPCService.initSyncedServiceImpl(configNodeRPCServiceProcessor);
×
418
    registerManager.register(configNodeRPCService);
×
419
  }
×
420

421
  /**
422
   * Deactivating ConfigNode internal services.
423
   *
424
   * @throws IOException if close configManager failed.
425
   */
426
  public void deactivate() throws IOException {
427
    LOGGER.info("Deactivating {}...", ConfigNodeConstant.GLOBAL_NAME);
×
428
    registerManager.deregisterAll();
×
429
    JMXService.deregisterMBean(mbeanName);
×
430
    if (configManager != null) {
×
431
      configManager.close();
×
432
    }
433
    LOGGER.info("{} is deactivated.", ConfigNodeConstant.GLOBAL_NAME);
×
434
  }
×
435

436
  public void stop() {
437
    try {
438
      deactivate();
×
439
    } catch (IOException e) {
×
440
      LOGGER.error("Meet error when deactivate ConfigNode", e);
×
441
    }
×
442
    System.exit(-1);
×
443
  }
×
444

445
  public ConfigManager getConfigManager() {
446
    return configManager;
1✔
447
  }
448

449
  @TestOnly
450
  public void setConfigManager(ConfigManager configManager) {
451
    this.configManager = configManager;
1✔
452
  }
1✔
453

454
  private static class ConfigNodeHolder {
455

456
    private static final ConfigNode INSTANCE = new ConfigNode();
1✔
457

458
    private ConfigNodeHolder() {
459
      // Empty constructor
460
    }
461
  }
462

463
  public static ConfigNode getInstance() {
464
    return ConfigNodeHolder.INSTANCE;
1✔
465
  }
466
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc