• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9972

31 Aug 2023 07:24AM UTC coverage: 47.709% (+0.02%) from 47.685%
#9972

push

travis_ci

web-flow
[To rel/1.2] fix wal npe when memTable has flushed. (#10900)

7 of 7 new or added lines in 1 file covered. (100.0%)

80186 of 168074 relevant lines covered (47.71%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.81
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.service;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
24
import org.apache.iotdb.common.rpc.thrift.TSStatus;
25
import org.apache.iotdb.commons.client.ClientManagerMetrics;
26
import org.apache.iotdb.commons.concurrent.ThreadModule;
27
import org.apache.iotdb.commons.concurrent.ThreadName;
28
import org.apache.iotdb.commons.concurrent.ThreadPoolMetrics;
29
import org.apache.iotdb.commons.conf.IoTDBConstant;
30
import org.apache.iotdb.commons.exception.StartupException;
31
import org.apache.iotdb.commons.service.JMXService;
32
import org.apache.iotdb.commons.service.RegisterManager;
33
import org.apache.iotdb.commons.service.metric.JvmGcMonitorMetrics;
34
import org.apache.iotdb.commons.service.metric.MetricService;
35
import org.apache.iotdb.commons.utils.TestOnly;
36
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
37
import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
38
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
39
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
40
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
41
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
42
import org.apache.iotdb.confignode.manager.ConfigManager;
43
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
44
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
45
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
46
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCService;
47
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor;
48
import org.apache.iotdb.db.service.metrics.ProcessMetrics;
49
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
50
import org.apache.iotdb.metrics.metricsets.UpTimeMetrics;
51
import org.apache.iotdb.metrics.metricsets.cpu.CpuUsageMetrics;
52
import org.apache.iotdb.metrics.metricsets.disk.DiskMetrics;
53
import org.apache.iotdb.metrics.metricsets.jvm.JvmMetrics;
54
import org.apache.iotdb.metrics.metricsets.logback.LogbackMetrics;
55
import org.apache.iotdb.metrics.metricsets.net.NetMetrics;
56
import org.apache.iotdb.metrics.metricsets.system.SystemMetrics;
57
import org.apache.iotdb.rpc.TSStatusCode;
58

59
import org.slf4j.Logger;
60
import org.slf4j.LoggerFactory;
61

62
import java.io.File;
63
import java.io.IOException;
64
import java.nio.charset.Charset;
65
import java.util.ArrayList;
66
import java.util.Arrays;
67
import java.util.List;
68
import java.util.concurrent.TimeUnit;
69

70
public class ConfigNode implements ConfigNodeMBean {
71

72
  private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNode.class);
1✔
73

74
  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
1✔
75

76
  private static final int STARTUP_RETRY_NUM = 10;
77
  private static final int SCHEDULE_WAITING_RETRY_NUM = 20;
78
  private static final long STARTUP_RETRY_INTERVAL_IN_MS = TimeUnit.SECONDS.toMillis(3);
1✔
79

80
  private static final int SEED_CONFIG_NODE_ID = 0;
81

82
  private static final int INIT_NON_SEED_CONFIG_NODE_ID = -1;
83

84
  private static final String CONFIGURATION = "IoTDB configuration: {}";
85

86
  private final String mbeanName =
1✔
87
      String.format(
1✔
88
          "%s:%s=%s",
89
          ConfigNodeConstant.CONFIGNODE_PACKAGE, ConfigNodeConstant.JMX_TYPE, "ConfigNode");
90
  private final RegisterManager registerManager = new RegisterManager();
1✔
91

92
  private ConfigManager configManager;
93

94
  private ConfigNode() {
1✔
95
    // We do not init anything here, so that we can re-initialize the instance in IT.
96
  }
1✔
97

98
  public static void main(String[] args) {
99
    LOGGER.info(
×
100
        "{} environment variables: {}",
101
        ConfigNodeConstant.GLOBAL_NAME,
102
        ConfigNodeConfig.getEnvironmentVariables());
×
103
    LOGGER.info(
×
104
        "{} default charset is: {}",
105
        ConfigNodeConstant.GLOBAL_NAME,
106
        Charset.defaultCharset().displayName());
×
107
    new ConfigNodeCommandLine().doMain(args);
×
108
  }
×
109

110
  public void active() {
111
    LOGGER.info("Activating {}...", ConfigNodeConstant.GLOBAL_NAME);
×
112

113
    try {
114
      processPid();
×
115
      // Add shutdown hook
116
      Runtime.getRuntime().addShutdownHook(new ConfigNodeShutdownHook());
×
117
      // Set up internal services
118
      setUpInternalServices();
×
119
      // Init ConfigManager
120
      initConfigManager();
×
121

122
      /* Restart */
123
      if (SystemPropertiesUtils.isRestarted()) {
×
124
        LOGGER.info("{} is in restarting process...", ConfigNodeConstant.GLOBAL_NAME);
×
125

126
        int configNodeId = CONF.getConfigNodeId();
×
127
        configManager.initConsensusManager();
×
128
        while (configManager.getConsensusManager().getLeader() == null) {
×
129
          LOGGER.info("Leader has not been elected yet, wait for 1 second");
×
130
          try {
131
            Thread.sleep(1000);
×
132
          } catch (InterruptedException e) {
×
133
            Thread.currentThread().interrupt();
×
134
            LOGGER.warn("Unexpected interruption during waiting for leader election.");
×
135
          }
×
136
        }
137
        setUpMetricService();
×
138
        // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
139
        // that the external service is not provided until ConfigNode is fully available
140
        setUpRPCService();
×
141
        LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
142
        LOGGER.info(
×
143
            "{} has successfully restarted and joined the cluster: {}.",
144
            ConfigNodeConstant.GLOBAL_NAME,
145
            CONF.getClusterName());
×
146

147
        // Update item during restart
148
        // This will always be executed until the consensus write succeeds
149
        while (true) {
150
          TSStatus status =
×
151
              configManager
152
                  .getNodeManager()
×
153
                  .updateConfigNodeIfNecessary(
×
154
                      configNodeId,
155
                      new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO));
156
          if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
157
            break;
×
158
          } else {
159
            startUpSleep("restart ConfigNode failed! ");
×
160
          }
161
        }
×
162
        return;
×
163
      }
164

165
      /* Initial startup of Seed-ConfigNode */
166
      if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
×
167
        LOGGER.info(
×
168
            "The current {} is now starting as the Seed-ConfigNode.",
169
            ConfigNodeConstant.GLOBAL_NAME);
170

171
        /* Always set ClusterId and ConfigNodeId before initConsensusManager */
172
        CONF.setConfigNodeId(SEED_CONFIG_NODE_ID);
×
173
        configManager.initConsensusManager();
×
174

175
        // Persistence system parameters after the consensusGroup is built,
176
        // or the consensusGroup will not be initialized successfully otherwise.
177
        SystemPropertiesUtils.storeSystemParameters();
×
178

179
        // Seed-ConfigNode should apply itself when first start
180
        configManager
×
181
            .getNodeManager()
×
182
            .applyConfigNode(
×
183
                generateConfigNodeLocation(SEED_CONFIG_NODE_ID),
×
184
                new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO));
185
        setUpMetricService();
×
186
        // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
187
        // that the external service is not provided until Seed-ConfigNode is fully initialized
188
        setUpRPCService();
×
189
        // The initial startup of Seed-ConfigNode finished
190
        LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
191
        LOGGER.info(
×
192
            "{} has successfully started and joined the cluster: {}.",
193
            ConfigNodeConstant.GLOBAL_NAME,
194
            CONF.getClusterName());
×
195
        return;
×
196
      }
197

198
      /* Initial startup of Non-Seed-ConfigNode */
199
      // We set up Non-Seed ConfigNode's RPC service before sending the register request
200
      // in order to facilitate the scheduling of capacity expansion process in ConfigNode-leader
201
      setUpRPCService();
×
202
      sendRegisterConfigNodeRequest();
×
203
      // The initial startup of Non-Seed-ConfigNode is not yet finished,
204
      // we should wait for leader's scheduling
205
      LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
206
      LOGGER.info(
×
207
          "{} {} has registered successfully. Waiting for the leader's scheduling to join the cluster: {}.",
208
          ConfigNodeConstant.GLOBAL_NAME,
209
          CONF.getConfigNodeId(),
×
210
          CONF.getClusterName());
×
211
      setUpMetricService();
×
212

213
      boolean isJoinedCluster = false;
×
214
      for (int retry = 0; retry < SCHEDULE_WAITING_RETRY_NUM; retry++) {
×
215
        if (!configManager
×
216
            .getConsensusManager()
×
217
            .getConsensusImpl()
×
218
            .getAllConsensusGroupIds()
×
219
            .isEmpty()) {
×
220
          isJoinedCluster = true;
×
221
          break;
×
222
        }
223
        startUpSleep("Waiting leader's scheduling is interrupted.");
×
224
      }
225

226
      if (!isJoinedCluster) {
×
227
        LOGGER.error(
×
228
            "The current ConfigNode can't joined the cluster because leader's scheduling failed. The possible cause is that the ip:port configuration is incorrect.");
229
        stop();
×
230
      }
231
    } catch (StartupException | IOException e) {
×
232
      LOGGER.error("Meet error while starting up.", e);
×
233
      stop();
×
234
    }
×
235
  }
×
236

237
  void processPid() {
238
    String pidFile = System.getProperty(ConfigNodeConstant.IOTDB_PIDFILE);
×
239
    if (pidFile != null) {
×
240
      new File(pidFile).deleteOnExit();
×
241
    }
242
  }
×
243

244
  private void setUpInternalServices() throws StartupException {
245
    // Setup JMXService
246
    registerManager.register(new JMXService());
×
247
    JMXService.registerMBean(this, mbeanName);
×
248

249
    LOGGER.info("Successfully setup internal services.");
×
250
  }
×
251

252
  private void setUpMetricService() throws StartupException {
253
    MetricConfigDescriptor.getInstance().getMetricConfig().setNodeId(CONF.getConfigNodeId());
×
254
    registerManager.register(MetricService.getInstance());
×
255
    // Bind predefined metric sets
256
    MetricService.getInstance().addMetricSet(new UpTimeMetrics());
×
257
    MetricService.getInstance().addMetricSet(new JvmMetrics());
×
258
    MetricService.getInstance().addMetricSet(new LogbackMetrics());
×
259
    MetricService.getInstance().addMetricSet(new ProcessMetrics());
×
260
    MetricService.getInstance().addMetricSet(new DiskMetrics(IoTDBConstant.CN_ROLE));
×
261
    MetricService.getInstance().addMetricSet(new NetMetrics(IoTDBConstant.CN_ROLE));
×
262
    MetricService.getInstance().addMetricSet(JvmGcMonitorMetrics.getInstance());
×
263
    MetricService.getInstance().addMetricSet(ClientManagerMetrics.getInstance());
×
264
    MetricService.getInstance().addMetricSet(ThreadPoolMetrics.getInstance());
×
265
    initCpuMetrics();
×
266
    initSystemMetrics();
×
267
  }
×
268

269
  private void initSystemMetrics() {
270
    ArrayList<String> diskDirs = new ArrayList<>();
×
271
    diskDirs.add(CONF.getSystemDir());
×
272
    diskDirs.add(CONF.getConsensusDir());
×
273
    SystemMetrics.getInstance().setDiskDirs(diskDirs);
×
274
    MetricService.getInstance().addMetricSet(SystemMetrics.getInstance());
×
275
  }
×
276

277
  private void initCpuMetrics() {
278
    List<String> threadModules = new ArrayList<>();
×
279
    Arrays.stream(ThreadModule.values()).forEach(x -> threadModules.add(x.toString()));
×
280
    List<String> pools = new ArrayList<>();
×
281
    Arrays.stream(ThreadName.values()).forEach(x -> pools.add(x.name()));
×
282
    MetricService.getInstance()
×
283
        .addMetricSet(
×
284
            new CpuUsageMetrics(
285
                threadModules,
286
                pools,
287
                x -> ThreadName.getModuleTheThreadBelongs(x).toString(),
×
288
                x -> ThreadName.getThreadPoolTheThreadBelongs(x).name()));
×
289
  }
×
290

291
  private void initConfigManager() {
292
    try {
293
      configManager = new ConfigManager();
×
294
    } catch (IOException e) {
×
295
      LOGGER.error("Can't start ConfigNode consensus group!", e);
×
296
      stop();
×
297
    }
×
298
    // Add some Metrics for configManager
299
    configManager.addMetrics();
×
300
    LOGGER.info("Successfully initialize ConfigManager.");
×
301
  }
×
302

303
  /**
304
   * Register Non-seed ConfigNode when first startup.
305
   *
306
   * @throws StartupException if register failed.
307
   * @throws IOException if consensus manager init failed.
308
   */
309
  private void sendRegisterConfigNodeRequest() throws StartupException, IOException {
310
    TConfigNodeRegisterReq req =
×
311
        new TConfigNodeRegisterReq(
312
            configManager.getClusterParameters(),
×
313
            generateConfigNodeLocation(INIT_NON_SEED_CONFIG_NODE_ID));
×
314

315
    req.setVersionInfo(new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO));
×
316

317
    TEndPoint targetConfigNode = CONF.getTargetConfigNode();
×
318
    if (targetConfigNode == null) {
×
319
      LOGGER.error(
×
320
          "Please set the cn_target_config_node_list parameter in iotdb-confignode.properties file.");
321
      throw new StartupException("The targetConfigNode setting in conf is empty");
×
322
    }
323

324
    for (int retry = 0; retry < STARTUP_RETRY_NUM; retry++) {
×
325
      TSStatus status;
326
      TConfigNodeRegisterResp resp = null;
×
327
      Object obj =
328
          SyncConfigNodeClientPool.getInstance()
×
329
              .sendSyncRequestToConfigNodeWithRetry(
×
330
                  targetConfigNode, req, ConfigNodeRequestType.REGISTER_CONFIG_NODE);
331

332
      if (obj instanceof TConfigNodeRegisterResp) {
×
333
        resp = (TConfigNodeRegisterResp) obj;
×
334
        status = resp.getStatus();
×
335
      } else {
336
        status = (TSStatus) obj;
×
337
      }
338

339
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
340
        if (resp == null) {
×
341
          LOGGER.error("The result of register ConfigNode is empty!");
×
342
          throw new StartupException("The result of register ConfigNode is empty!");
×
343
        }
344
        /* Always set ConfigNodeId before initConsensusManager */
345
        CONF.setConfigNodeId(resp.getConfigNodeId());
×
346
        configManager.initConsensusManager();
×
347
        return;
×
348
      } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
×
349
        targetConfigNode = status.getRedirectNode();
×
350
        LOGGER.info("ConfigNode need redirect to  {}.", targetConfigNode);
×
351
      } else {
352
        throw new StartupException(status.getMessage());
×
353
      }
354
      startUpSleep("Register ConfigNode failed!");
×
355
    }
356

357
    LOGGER.error(
×
358
        "The current ConfigNode can't send register request to the ConfigNode-leader after all retries!");
359
    stop();
×
360
  }
×
361

362
  private TConfigNodeLocation generateConfigNodeLocation(int configNodeId) {
363
    return new TConfigNodeLocation(
×
364
        configNodeId,
365
        new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
×
366
        new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort()));
×
367
  }
368

369
  private void startUpSleep(String errorMessage) throws StartupException {
370
    try {
371
      TimeUnit.MILLISECONDS.sleep(STARTUP_RETRY_INTERVAL_IN_MS);
×
372
    } catch (InterruptedException e) {
×
373
      Thread.currentThread().interrupt();
×
374
      throw new StartupException(errorMessage);
×
375
    }
×
376
  }
×
377

378
  private void setUpRPCService() throws StartupException {
379
    // Setup RPCService
380
    ConfigNodeRPCService configNodeRPCService = new ConfigNodeRPCService();
×
381
    ConfigNodeRPCServiceProcessor configNodeRPCServiceProcessor =
×
382
        new ConfigNodeRPCServiceProcessor(configManager);
383
    configNodeRPCService.initSyncedServiceImpl(configNodeRPCServiceProcessor);
×
384
    registerManager.register(configNodeRPCService);
×
385
  }
×
386

387
  /**
388
   * Deactivating ConfigNode internal services.
389
   *
390
   * @throws IOException if close configManager failed.
391
   */
392
  public void deactivate() throws IOException {
393
    LOGGER.info("Deactivating {}...", ConfigNodeConstant.GLOBAL_NAME);
×
394
    registerManager.deregisterAll();
×
395
    JMXService.deregisterMBean(mbeanName);
×
396
    if (configManager != null) {
×
397
      configManager.close();
×
398
    }
399
    LOGGER.info("{} is deactivated.", ConfigNodeConstant.GLOBAL_NAME);
×
400
  }
×
401

402
  public void stop() {
403
    try {
404
      deactivate();
×
405
    } catch (IOException e) {
×
406
      LOGGER.error("Meet error when deactivate ConfigNode", e);
×
407
    }
×
408
    System.exit(-1);
×
409
  }
×
410

411
  public ConfigManager getConfigManager() {
412
    return configManager;
1✔
413
  }
414

415
  @TestOnly
416
  public void setConfigManager(ConfigManager configManager) {
417
    this.configManager = configManager;
1✔
418
  }
1✔
419

420
  private static class ConfigNodeHolder {
421

422
    private static final ConfigNode INSTANCE = new ConfigNode();
1✔
423

424
    private ConfigNodeHolder() {
425
      // Empty constructor
426
    }
427
  }
428

429
  public static ConfigNode getInstance() {
430
    return ConfigNodeHolder.INSTANCE;
1✔
431
  }
432
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc