• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9678

pending completion
#9678

push

travis_ci

web-flow
add iotdb-doap.rdf back (#10685)

79139 of 164854 relevant lines covered (48.01%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.47
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.service;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
24
import org.apache.iotdb.common.rpc.thrift.TSStatus;
25
import org.apache.iotdb.commons.client.ClientManagerMetrics;
26
import org.apache.iotdb.commons.concurrent.ThreadModule;
27
import org.apache.iotdb.commons.concurrent.ThreadName;
28
import org.apache.iotdb.commons.concurrent.ThreadPoolMetrics;
29
import org.apache.iotdb.commons.conf.IoTDBConstant;
30
import org.apache.iotdb.commons.exception.StartupException;
31
import org.apache.iotdb.commons.service.JMXService;
32
import org.apache.iotdb.commons.service.RegisterManager;
33
import org.apache.iotdb.commons.service.metric.MetricService;
34
import org.apache.iotdb.commons.utils.TestOnly;
35
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
36
import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
37
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
38
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
39
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
40
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
41
import org.apache.iotdb.confignode.manager.ConfigManager;
42
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
43
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
44
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
45
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCService;
46
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor;
47
import org.apache.iotdb.db.service.metrics.ProcessMetrics;
48
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
49
import org.apache.iotdb.metrics.metricsets.UpTimeMetrics;
50
import org.apache.iotdb.metrics.metricsets.cpu.CpuUsageMetrics;
51
import org.apache.iotdb.metrics.metricsets.disk.DiskMetrics;
52
import org.apache.iotdb.metrics.metricsets.jvm.JvmMetrics;
53
import org.apache.iotdb.metrics.metricsets.logback.LogbackMetrics;
54
import org.apache.iotdb.metrics.metricsets.net.NetMetrics;
55
import org.apache.iotdb.metrics.metricsets.system.SystemMetrics;
56
import org.apache.iotdb.rpc.TSStatusCode;
57

58
import org.slf4j.Logger;
59
import org.slf4j.LoggerFactory;
60

61
import java.io.File;
62
import java.io.IOException;
63
import java.nio.charset.Charset;
64
import java.util.ArrayList;
65
import java.util.Arrays;
66
import java.util.List;
67
import java.util.concurrent.TimeUnit;
68

69
public class ConfigNode implements ConfigNodeMBean {
70

71
  private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNode.class);
1✔
72

73
  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
1✔
74

75
  private static final int STARTUP_RETRY_NUM = 10;
76
  private static final int SCHEDULE_WAITING_RETRY_NUM = 20;
77
  private static final long STARTUP_RETRY_INTERVAL_IN_MS = TimeUnit.SECONDS.toMillis(3);
1✔
78

79
  private static final int SEED_CONFIG_NODE_ID = 0;
80

81
  private static final int INIT_NON_SEED_CONFIG_NODE_ID = -1;
82

83
  private static final String CONFIGURATION = "IoTDB configuration: {}";
84

85
  private final String mbeanName =
1✔
86
      String.format(
1✔
87
          "%s:%s=%s",
88
          ConfigNodeConstant.CONFIGNODE_PACKAGE, ConfigNodeConstant.JMX_TYPE, "ConfigNode");
89
  private final RegisterManager registerManager = new RegisterManager();
1✔
90

91
  private ConfigManager configManager;
92

93
  private ConfigNode() {
1✔
94
    // We do not init anything here, so that we can re-initialize the instance in IT.
95
  }
1✔
96

97
  public static void main(String[] args) {
98
    LOGGER.info(
×
99
        "{} environment variables: {}",
100
        ConfigNodeConstant.GLOBAL_NAME,
101
        ConfigNodeConfig.getEnvironmentVariables());
×
102
    LOGGER.info(
×
103
        "{} default charset is: {}",
104
        ConfigNodeConstant.GLOBAL_NAME,
105
        Charset.defaultCharset().displayName());
×
106
    new ConfigNodeCommandLine().doMain(args);
×
107
  }
×
108

109
  public void active() {
110
    LOGGER.info("Activating {}...", ConfigNodeConstant.GLOBAL_NAME);
×
111

112
    try {
113
      processPid();
×
114
      // Add shutdown hook
115
      Runtime.getRuntime().addShutdownHook(new ConfigNodeShutdownHook());
×
116
      // Set up internal services
117
      setUpInternalServices();
×
118
      // Init ConfigManager
119
      initConfigManager();
×
120

121
      /* Restart */
122
      if (SystemPropertiesUtils.isRestarted()) {
×
123
        LOGGER.info("{} is in restarting process...", ConfigNodeConstant.GLOBAL_NAME);
×
124

125
        if (!SystemPropertiesUtils.isSeedConfigNode()) {
×
126
          // The non-seed-ConfigNodes should send restart request
127
          sendRestartConfigNodeRequest();
×
128
        }
129

130
        configManager.initConsensusManager();
×
131
        setUpMetricService();
×
132
        // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
133
        // that the external service is not provided until ConfigNode is fully available
134
        setUpRPCService();
×
135
        LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
136
        LOGGER.info(
×
137
            "{} has successfully restarted and joined the cluster: {}.",
138
            ConfigNodeConstant.GLOBAL_NAME,
139
            CONF.getClusterName());
×
140
        return;
×
141
      }
142

143
      /* Initial startup of Seed-ConfigNode */
144
      if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
×
145
        LOGGER.info(
×
146
            "The current {} is now starting as the Seed-ConfigNode.",
147
            ConfigNodeConstant.GLOBAL_NAME);
148

149
        /* Always set ClusterId and ConfigNodeId before initConsensusManager */
150
        CONF.setConfigNodeId(SEED_CONFIG_NODE_ID);
×
151
        configManager.initConsensusManager();
×
152

153
        // Persistence system parameters after the consensusGroup is built,
154
        // or the consensusGroup will not be initialized successfully otherwise.
155
        SystemPropertiesUtils.storeSystemParameters();
×
156

157
        // Seed-ConfigNode should apply itself when first start
158
        configManager
×
159
            .getNodeManager()
×
160
            .applyConfigNode(
×
161
                new TConfigNodeLocation(
162
                    SEED_CONFIG_NODE_ID,
163
                    new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
×
164
                    new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())));
×
165
        setUpMetricService();
×
166
        // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
167
        // that the external service is not provided until Seed-ConfigNode is fully initialized
168
        setUpRPCService();
×
169
        // The initial startup of Seed-ConfigNode finished
170
        LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
171
        LOGGER.info(
×
172
            "{} has successfully started and joined the cluster: {}.",
173
            ConfigNodeConstant.GLOBAL_NAME,
174
            CONF.getClusterName());
×
175
        return;
×
176
      }
177

178
      /* Initial startup of Non-Seed-ConfigNode */
179
      // We set up Non-Seed ConfigNode's RPC service before sending the register request
180
      // in order to facilitate the scheduling of capacity expansion process in ConfigNode-leader
181
      setUpRPCService();
×
182
      sendRegisterConfigNodeRequest();
×
183
      // The initial startup of Non-Seed-ConfigNode is not yet finished,
184
      // we should wait for leader's scheduling
185
      LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
186
      LOGGER.info(
×
187
          "{} {} has registered successfully. Waiting for the leader's scheduling to join the cluster: {}.",
188
          ConfigNodeConstant.GLOBAL_NAME,
189
          CONF.getConfigNodeId(),
×
190
          CONF.getClusterName());
×
191
      setUpMetricService();
×
192

193
      boolean isJoinedCluster = false;
×
194
      for (int retry = 0; retry < SCHEDULE_WAITING_RETRY_NUM; retry++) {
×
195
        if (!configManager
×
196
            .getConsensusManager()
×
197
            .getConsensusImpl()
×
198
            .getAllConsensusGroupIds()
×
199
            .isEmpty()) {
×
200
          isJoinedCluster = true;
×
201
          break;
×
202
        }
203

204
        try {
205
          TimeUnit.MILLISECONDS.sleep(STARTUP_RETRY_INTERVAL_IN_MS);
×
206
        } catch (InterruptedException e) {
×
207
          LOGGER.warn("Waiting leader's scheduling is interrupted.");
×
208
          Thread.currentThread().interrupt();
×
209
        }
×
210
      }
211

212
      if (!isJoinedCluster) {
×
213
        LOGGER.error(
×
214
            "The current ConfigNode can't joined the cluster because leader's scheduling failed. The possible cause is that the ip:port configuration is incorrect.");
215
        stop();
×
216
      }
217
    } catch (StartupException | IOException e) {
×
218
      LOGGER.error("Meet error while starting up.", e);
×
219
      stop();
×
220
    }
×
221
  }
×
222

223
  void processPid() {
224
    String pidFile = System.getProperty(ConfigNodeConstant.IOTDB_PIDFILE);
×
225
    if (pidFile != null) {
×
226
      new File(pidFile).deleteOnExit();
×
227
    }
228
  }
×
229

230
  private void setUpInternalServices() throws StartupException {
231
    // Setup JMXService
232
    registerManager.register(new JMXService());
×
233
    JMXService.registerMBean(this, mbeanName);
×
234

235
    LOGGER.info("Successfully setup internal services.");
×
236
  }
×
237

238
  private void setUpMetricService() throws StartupException {
239
    MetricConfigDescriptor.getInstance().getMetricConfig().setNodeId(CONF.getConfigNodeId());
×
240
    registerManager.register(MetricService.getInstance());
×
241
    // Bind predefined metric sets
242
    MetricService.getInstance().addMetricSet(new UpTimeMetrics());
×
243
    MetricService.getInstance().addMetricSet(new JvmMetrics());
×
244
    MetricService.getInstance().addMetricSet(new LogbackMetrics());
×
245
    MetricService.getInstance().addMetricSet(new ProcessMetrics());
×
246
    MetricService.getInstance().addMetricSet(new DiskMetrics(IoTDBConstant.CN_ROLE));
×
247
    MetricService.getInstance().addMetricSet(new NetMetrics(IoTDBConstant.CN_ROLE));
×
248
    MetricService.getInstance().addMetricSet(ClientManagerMetrics.getInstance());
×
249
    MetricService.getInstance().addMetricSet(ThreadPoolMetrics.getInstance());
×
250
    initCpuMetrics();
×
251
    initSystemMetrics();
×
252
  }
×
253

254
  private void initSystemMetrics() {
255
    ArrayList<String> diskDirs = new ArrayList<>();
×
256
    diskDirs.add(CONF.getSystemDir());
×
257
    diskDirs.add(CONF.getConsensusDir());
×
258
    MetricService.getInstance().addMetricSet(new SystemMetrics(diskDirs));
×
259
  }
×
260

261
  private void initCpuMetrics() {
262
    List<String> threadModules = new ArrayList<>();
×
263
    Arrays.stream(ThreadModule.values()).forEach(x -> threadModules.add(x.toString()));
×
264
    List<String> pools = new ArrayList<>();
×
265
    Arrays.stream(ThreadName.values()).forEach(x -> pools.add(x.name()));
×
266
    MetricService.getInstance()
×
267
        .addMetricSet(
×
268
            new CpuUsageMetrics(
269
                threadModules,
270
                pools,
271
                x -> ThreadName.getModuleTheThreadBelongs(x).toString(),
×
272
                x -> ThreadName.getThreadPoolTheThreadBelongs(x).name()));
×
273
  }
×
274

275
  private void initConfigManager() {
276
    try {
277
      configManager = new ConfigManager();
×
278
    } catch (IOException e) {
×
279
      LOGGER.error("Can't start ConfigNode consensus group!", e);
×
280
      stop();
×
281
    }
×
282
    // Add some Metrics for configManager
283
    configManager.addMetrics();
×
284
    LOGGER.info("Successfully initialize ConfigManager.");
×
285
  }
×
286

287
  /**
288
   * Register Non-seed ConfigNode when first startup.
289
   *
290
   * @throws StartupException if register failed.
291
   * @throws IOException if consensus manager init failed.
292
   */
293
  private void sendRegisterConfigNodeRequest() throws StartupException, IOException {
294
    TConfigNodeRegisterReq req =
×
295
        new TConfigNodeRegisterReq(
296
            configManager.getClusterParameters(),
×
297
            new TConfigNodeLocation(
298
                INIT_NON_SEED_CONFIG_NODE_ID,
299
                new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
×
300
                new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())));
×
301

302
    TEndPoint targetConfigNode = CONF.getTargetConfigNode();
×
303
    if (targetConfigNode == null) {
×
304
      LOGGER.error(
×
305
          "Please set the cn_target_config_node_list parameter in iotdb-confignode.properties file.");
306
      throw new StartupException("The targetConfigNode setting in conf is empty");
×
307
    }
308

309
    for (int retry = 0; retry < STARTUP_RETRY_NUM; retry++) {
×
310
      TSStatus status;
311
      TConfigNodeRegisterResp resp = null;
×
312
      Object obj =
313
          SyncConfigNodeClientPool.getInstance()
×
314
              .sendSyncRequestToConfigNodeWithRetry(
×
315
                  targetConfigNode, req, ConfigNodeRequestType.REGISTER_CONFIG_NODE);
316

317
      if (obj instanceof TConfigNodeRegisterResp) {
×
318
        resp = (TConfigNodeRegisterResp) obj;
×
319
        status = resp.getStatus();
×
320
      } else {
321
        status = (TSStatus) obj;
×
322
      }
323

324
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
325
        if (resp == null) {
×
326
          LOGGER.error("The result of register ConfigNode is empty!");
×
327
          throw new StartupException("The result of register ConfigNode is empty!");
×
328
        }
329
        /* Always set ConfigNodeId before initConsensusManager */
330
        CONF.setConfigNodeId(resp.getConfigNodeId());
×
331
        configManager.initConsensusManager();
×
332
        return;
×
333
      } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
×
334
        targetConfigNode = status.getRedirectNode();
×
335
        LOGGER.info("ConfigNode need redirect to  {}.", targetConfigNode);
×
336
      } else {
337
        throw new StartupException(status.getMessage());
×
338
      }
339

340
      try {
341
        TimeUnit.MILLISECONDS.sleep(STARTUP_RETRY_INTERVAL_IN_MS);
×
342
      } catch (InterruptedException e) {
×
343
        Thread.currentThread().interrupt();
×
344
        throw new StartupException("Register ConfigNode failed!");
×
345
      }
×
346
    }
347

348
    LOGGER.error(
×
349
        "The current ConfigNode can't send register request to the ConfigNode-leader after all retries!");
350
    stop();
×
351
  }
×
352

353
  private void sendRestartConfigNodeRequest() throws StartupException {
354
    TConfigNodeRestartReq req =
×
355
        new TConfigNodeRestartReq(
356
            CONF.getClusterName(),
×
357
            new TConfigNodeLocation(
358
                CONF.getConfigNodeId(),
×
359
                new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
×
360
                new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())));
×
361

362
    TEndPoint targetConfigNode = CONF.getTargetConfigNode();
×
363
    if (targetConfigNode == null) {
×
364
      LOGGER.error(
×
365
          "Please set the cn_target_config_node_list parameter in iotdb-confignode.properties file.");
366
      throw new StartupException("The targetConfigNode setting in conf is empty");
×
367
    }
368

369
    for (int retry = 0; retry < STARTUP_RETRY_NUM; retry++) {
×
370
      TSStatus status =
371
          (TSStatus)
372
              SyncConfigNodeClientPool.getInstance()
×
373
                  .sendSyncRequestToConfigNodeWithRetry(
×
374
                      targetConfigNode, req, ConfigNodeRequestType.RESTART_CONFIG_NODE);
375

376
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
377
        LOGGER.info("Registration request of current ConfigNode is accepted.");
×
378
        return;
×
379
      } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
×
380
        targetConfigNode = status.getRedirectNode();
×
381
        LOGGER.info("ConfigNode need redirect to  {}.", targetConfigNode);
×
382
      } else {
383
        throw new StartupException(status.getMessage());
×
384
      }
385

386
      try {
387
        TimeUnit.MILLISECONDS.sleep(STARTUP_RETRY_INTERVAL_IN_MS);
×
388
      } catch (InterruptedException e) {
×
389
        Thread.currentThread().interrupt();
×
390
        throw new StartupException("Register ConfigNode failed! ");
×
391
      }
×
392
    }
393
  }
×
394

395
  private void setUpRPCService() throws StartupException {
396
    // Setup RPCService
397
    ConfigNodeRPCService configNodeRPCService = new ConfigNodeRPCService();
×
398
    ConfigNodeRPCServiceProcessor configNodeRPCServiceProcessor =
×
399
        new ConfigNodeRPCServiceProcessor(configManager);
400
    configNodeRPCService.initSyncedServiceImpl(configNodeRPCServiceProcessor);
×
401
    registerManager.register(configNodeRPCService);
×
402
  }
×
403

404
  /**
405
   * Deactivating ConfigNode internal services.
406
   *
407
   * @throws IOException if close configManager failed.
408
   */
409
  public void deactivate() throws IOException {
410
    LOGGER.info("Deactivating {}...", ConfigNodeConstant.GLOBAL_NAME);
×
411
    registerManager.deregisterAll();
×
412
    JMXService.deregisterMBean(mbeanName);
×
413
    if (configManager != null) {
×
414
      configManager.close();
×
415
    }
416
    LOGGER.info("{} is deactivated.", ConfigNodeConstant.GLOBAL_NAME);
×
417
  }
×
418

419
  public void stop() {
420
    try {
421
      deactivate();
×
422
    } catch (IOException e) {
×
423
      LOGGER.error("Meet error when deactivate ConfigNode", e);
×
424
    }
×
425
    System.exit(-1);
×
426
  }
×
427

428
  public ConfigManager getConfigManager() {
429
    return configManager;
1✔
430
  }
431

432
  @TestOnly
433
  public void setConfigManager(ConfigManager configManager) {
434
    this.configManager = configManager;
1✔
435
  }
1✔
436

437
  private static class ConfigNodeHolder {
438

439
    private static final ConfigNode INSTANCE = new ConfigNode();
1✔
440

441
    private ConfigNodeHolder() {
442
      // Empty constructor
443
    }
444
  }
445

446
  public static ConfigNode getInstance() {
447
    return ConfigNodeHolder.INSTANCE;
1✔
448
  }
449
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc