• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9910

23 Aug 2023 10:24AM UTC coverage: 47.829% (-0.03%) from 47.856%
#9910

push

travis_ci

web-flow
[IOTDB-6103] Adding count_time aggregation feature (#10756)

403 of 403 new or added lines in 33 files covered. (100.0%)

80118 of 167508 relevant lines covered (47.83%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.8
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.service;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
24
import org.apache.iotdb.common.rpc.thrift.TSStatus;
25
import org.apache.iotdb.commons.client.ClientManagerMetrics;
26
import org.apache.iotdb.commons.concurrent.ThreadModule;
27
import org.apache.iotdb.commons.concurrent.ThreadName;
28
import org.apache.iotdb.commons.concurrent.ThreadPoolMetrics;
29
import org.apache.iotdb.commons.conf.IoTDBConstant;
30
import org.apache.iotdb.commons.exception.StartupException;
31
import org.apache.iotdb.commons.service.JMXService;
32
import org.apache.iotdb.commons.service.RegisterManager;
33
import org.apache.iotdb.commons.service.ServiceType;
34
import org.apache.iotdb.commons.service.metric.JvmGcMonitorMetrics;
35
import org.apache.iotdb.commons.service.metric.MetricService;
36
import org.apache.iotdb.commons.utils.TestOnly;
37
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
38
import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
39
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
40
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
41
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
42
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
43
import org.apache.iotdb.confignode.manager.ConfigManager;
44
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
45
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
46
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
47
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
48
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCService;
49
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor;
50
import org.apache.iotdb.db.service.metrics.ProcessMetrics;
51
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
52
import org.apache.iotdb.metrics.metricsets.UpTimeMetrics;
53
import org.apache.iotdb.metrics.metricsets.cpu.CpuUsageMetrics;
54
import org.apache.iotdb.metrics.metricsets.disk.DiskMetrics;
55
import org.apache.iotdb.metrics.metricsets.jvm.JvmMetrics;
56
import org.apache.iotdb.metrics.metricsets.logback.LogbackMetrics;
57
import org.apache.iotdb.metrics.metricsets.net.NetMetrics;
58
import org.apache.iotdb.metrics.metricsets.system.SystemMetrics;
59
import org.apache.iotdb.rpc.TSStatusCode;
60

61
import org.slf4j.Logger;
62
import org.slf4j.LoggerFactory;
63

64
import java.io.File;
65
import java.io.IOException;
66
import java.nio.charset.Charset;
67
import java.util.ArrayList;
68
import java.util.Arrays;
69
import java.util.List;
70
import java.util.concurrent.TimeUnit;
71

72
public class ConfigNode implements ConfigNodeMBean {
73

74
  private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNode.class);
1✔
75

76
  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
1✔
77

78
  private static final int STARTUP_RETRY_NUM = 10;
79
  private static final int SCHEDULE_WAITING_RETRY_NUM = 20;
80
  private static final long STARTUP_RETRY_INTERVAL_IN_MS = TimeUnit.SECONDS.toMillis(3);
1✔
81

82
  private static final int SEED_CONFIG_NODE_ID = 0;
83

84
  private static final int INIT_NON_SEED_CONFIG_NODE_ID = -1;
85

86
  private static final String CONFIGURATION = "IoTDB configuration: {}";
87

88
  private final String mbeanName =
1✔
89
      String.format(
1✔
90
          "%s:%s=%s",
91
          IoTDBConstant.IOTDB_SERVICE_JMX_NAME,
92
          ConfigNodeConstant.JMX_TYPE,
93
          ServiceType.CONFIG_NODE.getJmxName());
1✔
94
  private final RegisterManager registerManager = new RegisterManager();
1✔
95

96
  private ConfigManager configManager;
97

98
  private ConfigNode() {
1✔
99
    // We do not init anything here, so that we can re-initialize the instance in IT.
100
  }
1✔
101

102
  public static void main(String[] args) {
103
    LOGGER.info(
×
104
        "{} environment variables: {}",
105
        ConfigNodeConstant.GLOBAL_NAME,
106
        ConfigNodeConfig.getEnvironmentVariables());
×
107
    LOGGER.info(
×
108
        "{} default charset is: {}",
109
        ConfigNodeConstant.GLOBAL_NAME,
110
        Charset.defaultCharset().displayName());
×
111
    new ConfigNodeCommandLine().doMain(args);
×
112
  }
×
113

114
  public void active() {
115
    LOGGER.info("Activating {}...", ConfigNodeConstant.GLOBAL_NAME);
×
116

117
    try {
118
      processPid();
×
119
      // Add shutdown hook
120
      Runtime.getRuntime().addShutdownHook(new ConfigNodeShutdownHook());
×
121
      // Set up internal services
122
      setUpInternalServices();
×
123
      // Init ConfigManager
124
      initConfigManager();
×
125

126
      /* Restart */
127
      if (SystemPropertiesUtils.isRestarted()) {
×
128
        LOGGER.info("{} is in restarting process...", ConfigNodeConstant.GLOBAL_NAME);
×
129

130
        int configNodeId;
131
        if (!SystemPropertiesUtils.isSeedConfigNode()) {
×
132
          // The non-seed-ConfigNodes should send restart request and be checked (ip and port) by
133
          // leader before initConsensusManager
134
          sendRestartConfigNodeRequest();
×
135
          configNodeId = CONF.getConfigNodeId();
×
136
        } else {
137
          configNodeId = SEED_CONFIG_NODE_ID;
×
138
        }
139
        configManager.initConsensusManager();
×
140

141
        setUpMetricService();
×
142
        // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
143
        // that the external service is not provided until ConfigNode is fully available
144
        setUpRPCService();
×
145
        LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
146
        LOGGER.info(
×
147
            "{} has successfully restarted and joined the cluster: {}.",
148
            ConfigNodeConstant.GLOBAL_NAME,
149
            CONF.getClusterName());
×
150

151
        // Update item during restart
152
        // This will always be executed until the consensus write succeeds
153
        while (true) {
154
          TSStatus status =
×
155
              configManager
156
                  .getNodeManager()
×
157
                  .updateConfigNodeIfNecessary(
×
158
                      configNodeId,
159
                      new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO));
160
          if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
161
            break;
×
162
          } else {
163
            startUpSleep("restart ConfigNode failed! ");
×
164
          }
165
        }
×
166
        return;
×
167
      }
168

169
      /* Initial startup of Seed-ConfigNode */
170
      if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
×
171
        LOGGER.info(
×
172
            "The current {} is now starting as the Seed-ConfigNode.",
173
            ConfigNodeConstant.GLOBAL_NAME);
174

175
        /* Always set ClusterId and ConfigNodeId before initConsensusManager */
176
        CONF.setConfigNodeId(SEED_CONFIG_NODE_ID);
×
177
        configManager.initConsensusManager();
×
178

179
        // Persistence system parameters after the consensusGroup is built,
180
        // or the consensusGroup will not be initialized successfully otherwise.
181
        SystemPropertiesUtils.storeSystemParameters();
×
182

183
        // Seed-ConfigNode should apply itself when first start
184
        configManager
×
185
            .getNodeManager()
×
186
            .applyConfigNode(
×
187
                generateConfigNodeLocation(SEED_CONFIG_NODE_ID),
×
188
                new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO));
189
        setUpMetricService();
×
190
        // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
191
        // that the external service is not provided until Seed-ConfigNode is fully initialized
192
        setUpRPCService();
×
193
        // The initial startup of Seed-ConfigNode finished
194
        LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
195
        LOGGER.info(
×
196
            "{} has successfully started and joined the cluster: {}.",
197
            ConfigNodeConstant.GLOBAL_NAME,
198
            CONF.getClusterName());
×
199
        return;
×
200
      }
201

202
      /* Initial startup of Non-Seed-ConfigNode */
203
      // We set up Non-Seed ConfigNode's RPC service before sending the register request
204
      // in order to facilitate the scheduling of capacity expansion process in ConfigNode-leader
205
      setUpRPCService();
×
206
      sendRegisterConfigNodeRequest();
×
207
      // The initial startup of Non-Seed-ConfigNode is not yet finished,
208
      // we should wait for leader's scheduling
209
      LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
210
      LOGGER.info(
×
211
          "{} {} has registered successfully. Waiting for the leader's scheduling to join the cluster: {}.",
212
          ConfigNodeConstant.GLOBAL_NAME,
213
          CONF.getConfigNodeId(),
×
214
          CONF.getClusterName());
×
215
      setUpMetricService();
×
216

217
      boolean isJoinedCluster = false;
×
218
      for (int retry = 0; retry < SCHEDULE_WAITING_RETRY_NUM; retry++) {
×
219
        if (!configManager
×
220
            .getConsensusManager()
×
221
            .getConsensusImpl()
×
222
            .getAllConsensusGroupIds()
×
223
            .isEmpty()) {
×
224
          isJoinedCluster = true;
×
225
          break;
×
226
        }
227
        startUpSleep("Waiting leader's scheduling is interrupted.");
×
228
      }
229

230
      if (!isJoinedCluster) {
×
231
        LOGGER.error(
×
232
            "The current ConfigNode can't joined the cluster because leader's scheduling failed. The possible cause is that the ip:port configuration is incorrect.");
233
        stop();
×
234
      }
235
    } catch (StartupException | IOException e) {
×
236
      LOGGER.error("Meet error while starting up.", e);
×
237
      stop();
×
238
    }
×
239
  }
×
240

241
  void processPid() {
242
    String pidFile = System.getProperty(ConfigNodeConstant.IOTDB_PIDFILE);
×
243
    if (pidFile != null) {
×
244
      new File(pidFile).deleteOnExit();
×
245
    }
246
  }
×
247

248
  private void setUpInternalServices() throws StartupException {
249
    // Setup JMXService
250
    registerManager.register(new JMXService());
×
251
    JMXService.registerMBean(this, mbeanName);
×
252

253
    LOGGER.info("Successfully setup internal services.");
×
254
  }
×
255

256
  private void setUpMetricService() throws StartupException {
257
    MetricConfigDescriptor.getInstance().getMetricConfig().setNodeId(CONF.getConfigNodeId());
×
258
    registerManager.register(MetricService.getInstance());
×
259
    // Bind predefined metric sets
260
    MetricService.getInstance().addMetricSet(new UpTimeMetrics());
×
261
    MetricService.getInstance().addMetricSet(new JvmMetrics());
×
262
    MetricService.getInstance().addMetricSet(new LogbackMetrics());
×
263
    MetricService.getInstance().addMetricSet(new ProcessMetrics());
×
264
    MetricService.getInstance().addMetricSet(new DiskMetrics(IoTDBConstant.CN_ROLE));
×
265
    MetricService.getInstance().addMetricSet(new NetMetrics(IoTDBConstant.CN_ROLE));
×
266
    MetricService.getInstance().addMetricSet(JvmGcMonitorMetrics.getInstance());
×
267
    MetricService.getInstance().addMetricSet(ClientManagerMetrics.getInstance());
×
268
    MetricService.getInstance().addMetricSet(ThreadPoolMetrics.getInstance());
×
269
    initCpuMetrics();
×
270
    initSystemMetrics();
×
271
  }
×
272

273
  private void initSystemMetrics() {
274
    ArrayList<String> diskDirs = new ArrayList<>();
×
275
    diskDirs.add(CONF.getSystemDir());
×
276
    diskDirs.add(CONF.getConsensusDir());
×
277
    SystemMetrics.getInstance().setDiskDirs(diskDirs);
×
278
    MetricService.getInstance().addMetricSet(SystemMetrics.getInstance());
×
279
  }
×
280

281
  private void initCpuMetrics() {
282
    List<String> threadModules = new ArrayList<>();
×
283
    Arrays.stream(ThreadModule.values()).forEach(x -> threadModules.add(x.toString()));
×
284
    List<String> pools = new ArrayList<>();
×
285
    Arrays.stream(ThreadName.values()).forEach(x -> pools.add(x.name()));
×
286
    MetricService.getInstance()
×
287
        .addMetricSet(
×
288
            new CpuUsageMetrics(
289
                threadModules,
290
                pools,
291
                x -> ThreadName.getModuleTheThreadBelongs(x).toString(),
×
292
                x -> ThreadName.getThreadPoolTheThreadBelongs(x).name()));
×
293
  }
×
294

295
  private void initConfigManager() {
296
    try {
297
      configManager = new ConfigManager();
×
298
    } catch (IOException e) {
×
299
      LOGGER.error("Can't start ConfigNode consensus group!", e);
×
300
      stop();
×
301
    }
×
302
    // Add some Metrics for configManager
303
    configManager.addMetrics();
×
304
    LOGGER.info("Successfully initialize ConfigManager.");
×
305
  }
×
306

307
  /**
308
   * Register Non-seed ConfigNode when first startup.
309
   *
310
   * @throws StartupException if register failed.
311
   * @throws IOException if consensus manager init failed.
312
   */
313
  private void sendRegisterConfigNodeRequest() throws StartupException, IOException {
314
    TConfigNodeRegisterReq req =
×
315
        new TConfigNodeRegisterReq(
316
            configManager.getClusterParameters(),
×
317
            generateConfigNodeLocation(INIT_NON_SEED_CONFIG_NODE_ID));
×
318

319
    req.setVersionInfo(new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO));
×
320

321
    TEndPoint targetConfigNode = CONF.getTargetConfigNode();
×
322
    if (targetConfigNode == null) {
×
323
      LOGGER.error(
×
324
          "Please set the cn_target_config_node_list parameter in iotdb-confignode.properties file.");
325
      throw new StartupException("The targetConfigNode setting in conf is empty");
×
326
    }
327

328
    for (int retry = 0; retry < STARTUP_RETRY_NUM; retry++) {
×
329
      TSStatus status;
330
      TConfigNodeRegisterResp resp = null;
×
331
      Object obj =
332
          SyncConfigNodeClientPool.getInstance()
×
333
              .sendSyncRequestToConfigNodeWithRetry(
×
334
                  targetConfigNode, req, ConfigNodeRequestType.REGISTER_CONFIG_NODE);
335

336
      if (obj instanceof TConfigNodeRegisterResp) {
×
337
        resp = (TConfigNodeRegisterResp) obj;
×
338
        status = resp.getStatus();
×
339
      } else {
340
        status = (TSStatus) obj;
×
341
      }
342

343
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
344
        if (resp == null) {
×
345
          LOGGER.error("The result of register ConfigNode is empty!");
×
346
          throw new StartupException("The result of register ConfigNode is empty!");
×
347
        }
348
        /* Always set ConfigNodeId before initConsensusManager */
349
        CONF.setConfigNodeId(resp.getConfigNodeId());
×
350
        configManager.initConsensusManager();
×
351
        return;
×
352
      } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
×
353
        targetConfigNode = status.getRedirectNode();
×
354
        LOGGER.info("ConfigNode need redirect to  {}.", targetConfigNode);
×
355
      } else {
356
        throw new StartupException(status.getMessage());
×
357
      }
358
      startUpSleep("Register ConfigNode failed!");
×
359
    }
360

361
    LOGGER.error(
×
362
        "The current ConfigNode can't send register request to the ConfigNode-leader after all retries!");
363
    stop();
×
364
  }
×
365

366
  private void sendRestartConfigNodeRequest() throws StartupException {
367

368
    TConfigNodeRestartReq req =
×
369
        new TConfigNodeRestartReq(
370
            CONF.getClusterName(), generateConfigNodeLocation(CONF.getConfigNodeId()));
×
371

372
    TEndPoint targetConfigNode = CONF.getTargetConfigNode();
×
373
    if (targetConfigNode == null) {
×
374
      LOGGER.error(
×
375
          "Please set the cn_target_config_node_list parameter in iotdb-confignode.properties file.");
376
      throw new StartupException("The targetConfigNode setting in conf is empty");
×
377
    }
378

379
    for (int retry = 0; retry < STARTUP_RETRY_NUM; retry++) {
×
380
      TSStatus status =
381
          (TSStatus)
382
              SyncConfigNodeClientPool.getInstance()
×
383
                  .sendSyncRequestToConfigNodeWithRetry(
×
384
                      targetConfigNode, req, ConfigNodeRequestType.RESTART_CONFIG_NODE);
385

386
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
387
        LOGGER.info("Registration request of current ConfigNode is accepted.");
×
388
        return;
×
389
      } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
×
390
        targetConfigNode = status.getRedirectNode();
×
391
        LOGGER.info("ConfigNode need redirect to  {}.", targetConfigNode);
×
392
      } else {
393
        throw new StartupException(status.getMessage());
×
394
      }
395
      startUpSleep("Register ConfigNode failed! ");
×
396
    }
397
  }
×
398

399
  private TConfigNodeLocation generateConfigNodeLocation(int configNodeId) {
400
    return new TConfigNodeLocation(
×
401
        configNodeId,
402
        new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
×
403
        new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort()));
×
404
  }
405

406
  private void startUpSleep(String errorMessage) throws StartupException {
407
    try {
408
      TimeUnit.MILLISECONDS.sleep(STARTUP_RETRY_INTERVAL_IN_MS);
×
409
    } catch (InterruptedException e) {
×
410
      Thread.currentThread().interrupt();
×
411
      throw new StartupException(errorMessage);
×
412
    }
×
413
  }
×
414

415
  private void setUpRPCService() throws StartupException {
416
    // Setup RPCService
417
    ConfigNodeRPCService configNodeRPCService = new ConfigNodeRPCService();
×
418
    ConfigNodeRPCServiceProcessor configNodeRPCServiceProcessor =
×
419
        new ConfigNodeRPCServiceProcessor(configManager);
420
    configNodeRPCService.initSyncedServiceImpl(configNodeRPCServiceProcessor);
×
421
    registerManager.register(configNodeRPCService);
×
422
  }
×
423

424
  /**
425
   * Deactivating ConfigNode internal services.
426
   *
427
   * @throws IOException if close configManager failed.
428
   */
429
  public void deactivate() throws IOException {
430
    LOGGER.info("Deactivating {}...", ConfigNodeConstant.GLOBAL_NAME);
×
431
    registerManager.deregisterAll();
×
432
    JMXService.deregisterMBean(mbeanName);
×
433
    if (configManager != null) {
×
434
      configManager.close();
×
435
    }
436
    LOGGER.info("{} is deactivated.", ConfigNodeConstant.GLOBAL_NAME);
×
437
  }
×
438

439
  public void stop() {
440
    try {
441
      deactivate();
×
442
    } catch (IOException e) {
×
443
      LOGGER.error("Meet error when deactivate ConfigNode", e);
×
444
    }
×
445
    System.exit(-1);
×
446
  }
×
447

448
  public ConfigManager getConfigManager() {
449
    return configManager;
1✔
450
  }
451

452
  @TestOnly
453
  public void setConfigManager(ConfigManager configManager) {
454
    this.configManager = configManager;
1✔
455
  }
1✔
456

457
  private static class ConfigNodeHolder {
458

459
    private static final ConfigNode INSTANCE = new ConfigNode();
1✔
460

461
    private ConfigNodeHolder() {
462
      // Empty constructor
463
    }
464
  }
465

466
  public static ConfigNode getInstance() {
467
    return ConfigNodeHolder.INSTANCE;
1✔
468
  }
469
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc