• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9965

30 Aug 2023 11:08AM UTC coverage: 47.773% (+0.01%) from 47.759%
#9965

push

travis_ci

web-flow
[IOTDB-6129] ConfigNode restarts without relying on Seed-ConfigNode  (#10988)

9 of 9 new or added lines in 2 files covered. (100.0%)

80390 of 168274 relevant lines covered (47.77%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

7.29
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.service;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
24
import org.apache.iotdb.common.rpc.thrift.TSStatus;
25
import org.apache.iotdb.commons.client.ClientManagerMetrics;
26
import org.apache.iotdb.commons.concurrent.ThreadModule;
27
import org.apache.iotdb.commons.concurrent.ThreadName;
28
import org.apache.iotdb.commons.concurrent.ThreadPoolMetrics;
29
import org.apache.iotdb.commons.conf.IoTDBConstant;
30
import org.apache.iotdb.commons.exception.StartupException;
31
import org.apache.iotdb.commons.service.JMXService;
32
import org.apache.iotdb.commons.service.RegisterManager;
33
import org.apache.iotdb.commons.service.ServiceType;
34
import org.apache.iotdb.commons.service.metric.JvmGcMonitorMetrics;
35
import org.apache.iotdb.commons.service.metric.MetricService;
36
import org.apache.iotdb.commons.utils.TestOnly;
37
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
38
import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
39
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
40
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
41
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
42
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
43
import org.apache.iotdb.confignode.manager.ConfigManager;
44
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
45
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
46
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
47
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCService;
48
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor;
49
import org.apache.iotdb.db.service.metrics.ProcessMetrics;
50
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
51
import org.apache.iotdb.metrics.metricsets.UpTimeMetrics;
52
import org.apache.iotdb.metrics.metricsets.cpu.CpuUsageMetrics;
53
import org.apache.iotdb.metrics.metricsets.disk.DiskMetrics;
54
import org.apache.iotdb.metrics.metricsets.jvm.JvmMetrics;
55
import org.apache.iotdb.metrics.metricsets.logback.LogbackMetrics;
56
import org.apache.iotdb.metrics.metricsets.net.NetMetrics;
57
import org.apache.iotdb.metrics.metricsets.system.SystemMetrics;
58
import org.apache.iotdb.rpc.TSStatusCode;
59

60
import org.slf4j.Logger;
61
import org.slf4j.LoggerFactory;
62

63
import java.io.File;
64
import java.io.IOException;
65
import java.nio.charset.Charset;
66
import java.util.ArrayList;
67
import java.util.Arrays;
68
import java.util.List;
69
import java.util.concurrent.TimeUnit;
70

71
public class ConfigNode implements ConfigNodeMBean {
72

73
  // Class-wide logger.
  private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNode.class);
1✔
74

75
  // Configuration object obtained from the ConfigNodeDescriptor instance.
  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
1✔
76

77
  // Upper bound of attempts in the register-request retry loop of
  // sendRegisterConfigNodeRequest().
  private static final int STARTUP_RETRY_NUM = 10;
78
  // Upper bound of polls in active() while a registered Non-Seed-ConfigNode waits for a
  // consensus group to show up.
  private static final int SCHEDULE_WAITING_RETRY_NUM = 20;
79
  // Pause used by startUpSleep() between two startup retries (3 seconds, in milliseconds).
  private static final long STARTUP_RETRY_INTERVAL_IN_MS = TimeUnit.SECONDS.toMillis(3);
1✔
80

81
  // ConfigNodeId that active() assigns to the Seed-ConfigNode on its initial startup.
  private static final int SEED_CONFIG_NODE_ID = 0;
82

83
  // Placeholder id carried by the register request of a Non-Seed-ConfigNode; the id returned
  // in the register response is stored in CONF afterwards.
  private static final int INIT_NON_SEED_CONFIG_NODE_ID = -1;
84

85
  // Log template used to print CONF.getConfigMessage().
  private static final String CONFIGURATION = "IoTDB configuration: {}";
86

87
  // Name under which this instance is registered as an MBean (see setUpInternalServices())
  // and deregistered again (see deactivate()).
  private final String mbeanName =
1✔
88
      String.format(
1✔
89
          "%s:%s=%s",
90
          IoTDBConstant.IOTDB_SERVICE_JMX_NAME,
91
          ConfigNodeConstant.JMX_TYPE,
92
          ServiceType.CONFIG_NODE.getJmxName());
1✔
93
  // Collects the services started by this class; deactivate() calls deregisterAll() on it.
  private final RegisterManager registerManager = new RegisterManager();
1✔
94

95
  // Created by initConfigManager(); may also be injected through setConfigManager().
  private ConfigManager configManager;
96

97
  private ConfigNode() {
1✔
98
    // We do not init anything here, so that we can re-initialize the instance in IT.
99
  }
1✔
100

101
  public static void main(String[] args) {
102
    LOGGER.info(
×
103
        "{} environment variables: {}",
104
        ConfigNodeConstant.GLOBAL_NAME,
105
        ConfigNodeConfig.getEnvironmentVariables());
×
106
    LOGGER.info(
×
107
        "{} default charset is: {}",
108
        ConfigNodeConstant.GLOBAL_NAME,
109
        Charset.defaultCharset().displayName());
×
110
    new ConfigNodeCommandLine().doMain(args);
×
111
  }
×
112

113
  public void active() {
114
    LOGGER.info("Activating {}...", ConfigNodeConstant.GLOBAL_NAME);
×
115

116
    try {
117
      processPid();
×
118
      // Add shutdown hook
119
      Runtime.getRuntime().addShutdownHook(new ConfigNodeShutdownHook());
×
120
      // Set up internal services
121
      setUpInternalServices();
×
122
      // Init ConfigManager
123
      initConfigManager();
×
124

125
      /* Restart */
126
      if (SystemPropertiesUtils.isRestarted()) {
×
127
        LOGGER.info("{} is in restarting process...", ConfigNodeConstant.GLOBAL_NAME);
×
128

129
        int configNodeId = CONF.getConfigNodeId();
×
130
        configManager.initConsensusManager();
×
131
        while (configManager.getConsensusManager().getLeader() == null) {
×
132
          LOGGER.info("Leader has not been elected yet, wait for 1 second");
×
133
          try {
134
            Thread.sleep(1000);
×
135
          } catch (InterruptedException e) {
×
136
            Thread.currentThread().interrupt();
×
137
            LOGGER.warn("Unexpected interruption during waiting for leader election.");
×
138
          }
×
139
        }
140
        setUpMetricService();
×
141
        // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
142
        // that the external service is not provided until ConfigNode is fully available
143
        setUpRPCService();
×
144
        LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
145
        LOGGER.info(
×
146
            "{} has successfully restarted and joined the cluster: {}.",
147
            ConfigNodeConstant.GLOBAL_NAME,
148
            CONF.getClusterName());
×
149

150
        // Update item during restart
151
        // This will always be executed until the consensus write succeeds
152
        while (true) {
153
          TSStatus status =
×
154
              configManager
155
                  .getNodeManager()
×
156
                  .updateConfigNodeIfNecessary(
×
157
                      configNodeId,
158
                      new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO));
159
          if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
160
            break;
×
161
          } else {
162
            startUpSleep("restart ConfigNode failed! ");
×
163
          }
164
        }
×
165
        return;
×
166
      }
167

168
      /* Initial startup of Seed-ConfigNode */
169
      if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
×
170
        LOGGER.info(
×
171
            "The current {} is now starting as the Seed-ConfigNode.",
172
            ConfigNodeConstant.GLOBAL_NAME);
173

174
        /* Always set ClusterId and ConfigNodeId before initConsensusManager */
175
        CONF.setConfigNodeId(SEED_CONFIG_NODE_ID);
×
176
        configManager.initConsensusManager();
×
177

178
        // Persistence system parameters after the consensusGroup is built,
179
        // or the consensusGroup will not be initialized successfully otherwise.
180
        SystemPropertiesUtils.storeSystemParameters();
×
181

182
        // Seed-ConfigNode should apply itself when first start
183
        configManager
×
184
            .getNodeManager()
×
185
            .applyConfigNode(
×
186
                generateConfigNodeLocation(SEED_CONFIG_NODE_ID),
×
187
                new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO));
188
        setUpMetricService();
×
189
        // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
190
        // that the external service is not provided until Seed-ConfigNode is fully initialized
191
        setUpRPCService();
×
192
        // The initial startup of Seed-ConfigNode finished
193
        LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
194
        LOGGER.info(
×
195
            "{} has successfully started and joined the cluster: {}.",
196
            ConfigNodeConstant.GLOBAL_NAME,
197
            CONF.getClusterName());
×
198
        return;
×
199
      }
200

201
      /* Initial startup of Non-Seed-ConfigNode */
202
      // We set up Non-Seed ConfigNode's RPC service before sending the register request
203
      // in order to facilitate the scheduling of capacity expansion process in ConfigNode-leader
204
      setUpRPCService();
×
205
      sendRegisterConfigNodeRequest();
×
206
      // The initial startup of Non-Seed-ConfigNode is not yet finished,
207
      // we should wait for leader's scheduling
208
      LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
209
      LOGGER.info(
×
210
          "{} {} has registered successfully. Waiting for the leader's scheduling to join the cluster: {}.",
211
          ConfigNodeConstant.GLOBAL_NAME,
212
          CONF.getConfigNodeId(),
×
213
          CONF.getClusterName());
×
214
      setUpMetricService();
×
215

216
      boolean isJoinedCluster = false;
×
217
      for (int retry = 0; retry < SCHEDULE_WAITING_RETRY_NUM; retry++) {
×
218
        if (!configManager
×
219
            .getConsensusManager()
×
220
            .getConsensusImpl()
×
221
            .getAllConsensusGroupIds()
×
222
            .isEmpty()) {
×
223
          isJoinedCluster = true;
×
224
          break;
×
225
        }
226
        startUpSleep("Waiting leader's scheduling is interrupted.");
×
227
      }
228

229
      if (!isJoinedCluster) {
×
230
        LOGGER.error(
×
231
            "The current ConfigNode can't joined the cluster because leader's scheduling failed. The possible cause is that the ip:port configuration is incorrect.");
232
        stop();
×
233
      }
234
    } catch (StartupException | IOException e) {
×
235
      LOGGER.error("Meet error while starting up.", e);
×
236
      stop();
×
237
    }
×
238
  }
×
239

240
  void processPid() {
241
    String pidFile = System.getProperty(ConfigNodeConstant.IOTDB_PIDFILE);
×
242
    if (pidFile != null) {
×
243
      new File(pidFile).deleteOnExit();
×
244
    }
245
  }
×
246

247
  private void setUpInternalServices() throws StartupException {
248
    // Setup JMXService
249
    registerManager.register(new JMXService());
×
250
    JMXService.registerMBean(this, mbeanName);
×
251

252
    LOGGER.info("Successfully setup internal services.");
×
253
  }
×
254

255
  private void setUpMetricService() throws StartupException {
256
    MetricConfigDescriptor.getInstance().getMetricConfig().setNodeId(CONF.getConfigNodeId());
×
257
    registerManager.register(MetricService.getInstance());
×
258
    // Bind predefined metric sets
259
    MetricService.getInstance().addMetricSet(new UpTimeMetrics());
×
260
    MetricService.getInstance().addMetricSet(new JvmMetrics());
×
261
    MetricService.getInstance().addMetricSet(new LogbackMetrics());
×
262
    MetricService.getInstance().addMetricSet(new ProcessMetrics());
×
263
    MetricService.getInstance().addMetricSet(new DiskMetrics(IoTDBConstant.CN_ROLE));
×
264
    MetricService.getInstance().addMetricSet(new NetMetrics(IoTDBConstant.CN_ROLE));
×
265
    MetricService.getInstance().addMetricSet(JvmGcMonitorMetrics.getInstance());
×
266
    MetricService.getInstance().addMetricSet(ClientManagerMetrics.getInstance());
×
267
    MetricService.getInstance().addMetricSet(ThreadPoolMetrics.getInstance());
×
268
    initCpuMetrics();
×
269
    initSystemMetrics();
×
270
  }
×
271

272
  private void initSystemMetrics() {
273
    ArrayList<String> diskDirs = new ArrayList<>();
×
274
    diskDirs.add(CONF.getSystemDir());
×
275
    diskDirs.add(CONF.getConsensusDir());
×
276
    SystemMetrics.getInstance().setDiskDirs(diskDirs);
×
277
    MetricService.getInstance().addMetricSet(SystemMetrics.getInstance());
×
278
  }
×
279

280
  private void initCpuMetrics() {
281
    List<String> threadModules = new ArrayList<>();
×
282
    Arrays.stream(ThreadModule.values()).forEach(x -> threadModules.add(x.toString()));
×
283
    List<String> pools = new ArrayList<>();
×
284
    Arrays.stream(ThreadName.values()).forEach(x -> pools.add(x.name()));
×
285
    MetricService.getInstance()
×
286
        .addMetricSet(
×
287
            new CpuUsageMetrics(
288
                threadModules,
289
                pools,
290
                x -> ThreadName.getModuleTheThreadBelongs(x).toString(),
×
291
                x -> ThreadName.getThreadPoolTheThreadBelongs(x).name()));
×
292
  }
×
293

294
  private void initConfigManager() {
295
    try {
296
      configManager = new ConfigManager();
×
297
    } catch (IOException e) {
×
298
      LOGGER.error("Can't start ConfigNode consensus group!", e);
×
299
      stop();
×
300
    }
×
301
    // Add some Metrics for configManager
302
    configManager.addMetrics();
×
303
    LOGGER.info("Successfully initialize ConfigManager.");
×
304
  }
×
305

306
  /**
307
   * Register Non-seed ConfigNode when first startup.
308
   *
309
   * @throws StartupException if register failed.
310
   * @throws IOException if consensus manager init failed.
311
   */
312
  private void sendRegisterConfigNodeRequest() throws StartupException, IOException {
313
    TConfigNodeRegisterReq req =
×
314
        new TConfigNodeRegisterReq(
315
            configManager.getClusterParameters(),
×
316
            generateConfigNodeLocation(INIT_NON_SEED_CONFIG_NODE_ID));
×
317

318
    req.setVersionInfo(new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO));
×
319

320
    TEndPoint targetConfigNode = CONF.getTargetConfigNode();
×
321
    if (targetConfigNode == null) {
×
322
      LOGGER.error(
×
323
          "Please set the cn_target_config_node_list parameter in iotdb-confignode.properties file.");
324
      throw new StartupException("The targetConfigNode setting in conf is empty");
×
325
    }
326

327
    for (int retry = 0; retry < STARTUP_RETRY_NUM; retry++) {
×
328
      TSStatus status;
329
      TConfigNodeRegisterResp resp = null;
×
330
      Object obj =
331
          SyncConfigNodeClientPool.getInstance()
×
332
              .sendSyncRequestToConfigNodeWithRetry(
×
333
                  targetConfigNode, req, ConfigNodeRequestType.REGISTER_CONFIG_NODE);
334

335
      if (obj instanceof TConfigNodeRegisterResp) {
×
336
        resp = (TConfigNodeRegisterResp) obj;
×
337
        status = resp.getStatus();
×
338
      } else {
339
        status = (TSStatus) obj;
×
340
      }
341

342
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
343
        if (resp == null) {
×
344
          LOGGER.error("The result of register ConfigNode is empty!");
×
345
          throw new StartupException("The result of register ConfigNode is empty!");
×
346
        }
347
        /* Always set ConfigNodeId before initConsensusManager */
348
        CONF.setConfigNodeId(resp.getConfigNodeId());
×
349
        configManager.initConsensusManager();
×
350
        return;
×
351
      } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
×
352
        targetConfigNode = status.getRedirectNode();
×
353
        LOGGER.info("ConfigNode need redirect to  {}.", targetConfigNode);
×
354
      } else {
355
        throw new StartupException(status.getMessage());
×
356
      }
357
      startUpSleep("Register ConfigNode failed!");
×
358
    }
359

360
    LOGGER.error(
×
361
        "The current ConfigNode can't send register request to the ConfigNode-leader after all retries!");
362
    stop();
×
363
  }
×
364

365
  private TConfigNodeLocation generateConfigNodeLocation(int configNodeId) {
366
    return new TConfigNodeLocation(
×
367
        configNodeId,
368
        new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
×
369
        new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort()));
×
370
  }
371

372
  private void startUpSleep(String errorMessage) throws StartupException {
373
    try {
374
      TimeUnit.MILLISECONDS.sleep(STARTUP_RETRY_INTERVAL_IN_MS);
×
375
    } catch (InterruptedException e) {
×
376
      Thread.currentThread().interrupt();
×
377
      throw new StartupException(errorMessage);
×
378
    }
×
379
  }
×
380

381
  private void setUpRPCService() throws StartupException {
382
    // Setup RPCService
383
    ConfigNodeRPCService configNodeRPCService = new ConfigNodeRPCService();
×
384
    ConfigNodeRPCServiceProcessor configNodeRPCServiceProcessor =
×
385
        new ConfigNodeRPCServiceProcessor(configManager);
386
    configNodeRPCService.initSyncedServiceImpl(configNodeRPCServiceProcessor);
×
387
    registerManager.register(configNodeRPCService);
×
388
  }
×
389

390
  /**
391
   * Deactivating ConfigNode internal services.
392
   *
393
   * @throws IOException if close configManager failed.
394
   */
395
  public void deactivate() throws IOException {
396
    LOGGER.info("Deactivating {}...", ConfigNodeConstant.GLOBAL_NAME);
×
397
    registerManager.deregisterAll();
×
398
    JMXService.deregisterMBean(mbeanName);
×
399
    if (configManager != null) {
×
400
      configManager.close();
×
401
    }
402
    LOGGER.info("{} is deactivated.", ConfigNodeConstant.GLOBAL_NAME);
×
403
  }
×
404

405
  public void stop() {
406
    try {
407
      deactivate();
×
408
    } catch (IOException e) {
×
409
      LOGGER.error("Meet error when deactivate ConfigNode", e);
×
410
    }
×
411
    System.exit(-1);
×
412
  }
×
413

414
  public ConfigManager getConfigManager() {
415
    return configManager;
1✔
416
  }
417

418
  @TestOnly
419
  public void setConfigManager(ConfigManager configManager) {
420
    this.configManager = configManager;
1✔
421
  }
1✔
422

423
  private static class ConfigNodeHolder {
424

425
    private static final ConfigNode INSTANCE = new ConfigNode();
1✔
426

427
    private ConfigNodeHolder() {
428
      // Empty constructor
429
    }
430
  }
431

432
  public static ConfigNode getInstance() {
433
    return ConfigNodeHolder.INSTANCE;
1✔
434
  }
435
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc