• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9654

pending completion
#9654

push

travis_ci

web-flow
[IOTDB-6073] Add ClientManager metrics (#10617)

279 of 279 new or added lines in 8 files covered. (100.0%)

79121 of 165733 relevant lines covered (47.74%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.67
/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.confignode.service;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
24
import org.apache.iotdb.common.rpc.thrift.TSStatus;
25
import org.apache.iotdb.commons.client.ClientManagerMetrics;
26
import org.apache.iotdb.commons.concurrent.ThreadModule;
27
import org.apache.iotdb.commons.concurrent.ThreadName;
28
import org.apache.iotdb.commons.conf.IoTDBConstant;
29
import org.apache.iotdb.commons.exception.StartupException;
30
import org.apache.iotdb.commons.service.JMXService;
31
import org.apache.iotdb.commons.service.RegisterManager;
32
import org.apache.iotdb.commons.service.metric.MetricService;
33
import org.apache.iotdb.commons.utils.TestOnly;
34
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
35
import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
36
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
37
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
38
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
39
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
40
import org.apache.iotdb.confignode.manager.ConfigManager;
41
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
42
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
43
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
44
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCService;
45
import org.apache.iotdb.confignode.service.thrift.ConfigNodeRPCServiceProcessor;
46
import org.apache.iotdb.db.service.metrics.ProcessMetrics;
47
import org.apache.iotdb.db.service.metrics.SystemMetrics;
48
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
49
import org.apache.iotdb.metrics.metricsets.UpTimeMetrics;
50
import org.apache.iotdb.metrics.metricsets.cpu.CpuUsageMetrics;
51
import org.apache.iotdb.metrics.metricsets.disk.DiskMetrics;
52
import org.apache.iotdb.metrics.metricsets.jvm.JvmMetrics;
53
import org.apache.iotdb.metrics.metricsets.logback.LogbackMetrics;
54
import org.apache.iotdb.metrics.metricsets.net.NetMetrics;
55
import org.apache.iotdb.rpc.TSStatusCode;
56

57
import org.slf4j.Logger;
58
import org.slf4j.LoggerFactory;
59

60
import java.io.File;
61
import java.io.IOException;
62
import java.nio.charset.Charset;
63
import java.util.ArrayList;
64
import java.util.Arrays;
65
import java.util.List;
66
import java.util.concurrent.TimeUnit;
67

68
public class ConfigNode implements ConfigNodeMBean {
69

70
  private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNode.class);
1✔
71

72
  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
1✔
73

74
  private static final int STARTUP_RETRY_NUM = 10;
75
  private static final int SCHEDULE_WAITING_RETRY_NUM = 20;
76
  private static final long STARTUP_RETRY_INTERVAL_IN_MS = TimeUnit.SECONDS.toMillis(3);
1✔
77

78
  private static final int SEED_CONFIG_NODE_ID = 0;
79

80
  private static final int INIT_NON_SEED_CONFIG_NODE_ID = -1;
81

82
  private static final String CONFIGURATION = "IoTDB configuration: {}";
83

84
  private final String mbeanName =
1✔
85
      String.format(
1✔
86
          "%s:%s=%s",
87
          ConfigNodeConstant.CONFIGNODE_PACKAGE, ConfigNodeConstant.JMX_TYPE, "ConfigNode");
88
  private final RegisterManager registerManager = new RegisterManager();
1✔
89

90
  private ConfigManager configManager;
91

92
  private ConfigNode() {
1✔
93
    // We do not init anything here, so that we can re-initialize the instance in IT.
94
  }
1✔
95

96
  public static void main(String[] args) {
97
    LOGGER.info(
×
98
        "{} environment variables: {}",
99
        ConfigNodeConstant.GLOBAL_NAME,
100
        ConfigNodeConfig.getEnvironmentVariables());
×
101
    LOGGER.info(
×
102
        "{} default charset is: {}",
103
        ConfigNodeConstant.GLOBAL_NAME,
104
        Charset.defaultCharset().displayName());
×
105
    new ConfigNodeCommandLine().doMain(args);
×
106
  }
×
107

108
  public void active() {
109
    LOGGER.info("Activating {}...", ConfigNodeConstant.GLOBAL_NAME);
×
110

111
    try {
112
      processPid();
×
113
      // Add shutdown hook
114
      Runtime.getRuntime().addShutdownHook(new ConfigNodeShutdownHook());
×
115
      // Set up internal services
116
      setUpInternalServices();
×
117
      // Init ConfigManager
118
      initConfigManager();
×
119

120
      /* Restart */
121
      if (SystemPropertiesUtils.isRestarted()) {
×
122
        LOGGER.info("{} is in restarting process...", ConfigNodeConstant.GLOBAL_NAME);
×
123

124
        if (!SystemPropertiesUtils.isSeedConfigNode()) {
×
125
          // The non-seed-ConfigNodes should send restart request
126
          sendRestartConfigNodeRequest();
×
127
        }
128

129
        configManager.initConsensusManager();
×
130
        setUpMetricService();
×
131
        // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
132
        // that the external service is not provided until ConfigNode is fully available
133
        setUpRPCService();
×
134
        LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
135
        LOGGER.info(
×
136
            "{} has successfully restarted and joined the cluster: {}.",
137
            ConfigNodeConstant.GLOBAL_NAME,
138
            CONF.getClusterName());
×
139
        return;
×
140
      }
141

142
      /* Initial startup of Seed-ConfigNode */
143
      if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) {
×
144
        LOGGER.info(
×
145
            "The current {} is now starting as the Seed-ConfigNode.",
146
            ConfigNodeConstant.GLOBAL_NAME);
147

148
        /* Always set ClusterId and ConfigNodeId before initConsensusManager */
149
        CONF.setConfigNodeId(SEED_CONFIG_NODE_ID);
×
150
        configManager.initConsensusManager();
×
151

152
        // Persistence system parameters after the consensusGroup is built,
153
        // or the consensusGroup will not be initialized successfully otherwise.
154
        SystemPropertiesUtils.storeSystemParameters();
×
155

156
        // Seed-ConfigNode should apply itself when first start
157
        configManager
×
158
            .getNodeManager()
×
159
            .applyConfigNode(
×
160
                new TConfigNodeLocation(
161
                    SEED_CONFIG_NODE_ID,
162
                    new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
×
163
                    new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())));
×
164
        setUpMetricService();
×
165
        // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
166
        // that the external service is not provided until Seed-ConfigNode is fully initialized
167
        setUpRPCService();
×
168
        // The initial startup of Seed-ConfigNode finished
169
        LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
170
        LOGGER.info(
×
171
            "{} has successfully started and joined the cluster: {}.",
172
            ConfigNodeConstant.GLOBAL_NAME,
173
            CONF.getClusterName());
×
174
        return;
×
175
      }
176

177
      /* Initial startup of Non-Seed-ConfigNode */
178
      // We set up Non-Seed ConfigNode's RPC service before sending the register request
179
      // in order to facilitate the scheduling of capacity expansion process in ConfigNode-leader
180
      setUpRPCService();
×
181
      sendRegisterConfigNodeRequest();
×
182
      // The initial startup of Non-Seed-ConfigNode is not yet finished,
183
      // we should wait for leader's scheduling
184
      LOGGER.info(CONFIGURATION, CONF.getConfigMessage());
×
185
      LOGGER.info(
×
186
          "{} {} has registered successfully. Waiting for the leader's scheduling to join the cluster: {}.",
187
          ConfigNodeConstant.GLOBAL_NAME,
188
          CONF.getConfigNodeId(),
×
189
          CONF.getClusterName());
×
190
      setUpMetricService();
×
191

192
      boolean isJoinedCluster = false;
×
193
      for (int retry = 0; retry < SCHEDULE_WAITING_RETRY_NUM; retry++) {
×
194
        if (!configManager
×
195
            .getConsensusManager()
×
196
            .getConsensusImpl()
×
197
            .getAllConsensusGroupIds()
×
198
            .isEmpty()) {
×
199
          isJoinedCluster = true;
×
200
          break;
×
201
        }
202

203
        try {
204
          TimeUnit.MILLISECONDS.sleep(STARTUP_RETRY_INTERVAL_IN_MS);
×
205
        } catch (InterruptedException e) {
×
206
          LOGGER.warn("Waiting leader's scheduling is interrupted.");
×
207
          Thread.currentThread().interrupt();
×
208
        }
×
209
      }
210

211
      if (!isJoinedCluster) {
×
212
        LOGGER.error(
×
213
            "The current ConfigNode can't joined the cluster because leader's scheduling failed. The possible cause is that the ip:port configuration is incorrect.");
214
        stop();
×
215
      }
216
    } catch (StartupException | IOException e) {
×
217
      LOGGER.error("Meet error while starting up.", e);
×
218
      stop();
×
219
    }
×
220
  }
×
221

222
  void processPid() {
223
    String pidFile = System.getProperty(ConfigNodeConstant.IOTDB_PIDFILE);
×
224
    if (pidFile != null) {
×
225
      new File(pidFile).deleteOnExit();
×
226
    }
227
  }
×
228

229
  private void setUpInternalServices() throws StartupException {
230
    // Setup JMXService
231
    registerManager.register(new JMXService());
×
232
    JMXService.registerMBean(this, mbeanName);
×
233

234
    LOGGER.info("Successfully setup internal services.");
×
235
  }
×
236

237
  private void setUpMetricService() throws StartupException {
238
    MetricConfigDescriptor.getInstance().getMetricConfig().setNodeId(CONF.getConfigNodeId());
×
239
    registerManager.register(MetricService.getInstance());
×
240
    // Bind predefined metric sets
241
    MetricService.getInstance().addMetricSet(new UpTimeMetrics());
×
242
    MetricService.getInstance().addMetricSet(new JvmMetrics());
×
243
    MetricService.getInstance().addMetricSet(new LogbackMetrics());
×
244
    MetricService.getInstance().addMetricSet(new ProcessMetrics());
×
245
    MetricService.getInstance().addMetricSet(new SystemMetrics(false));
×
246
    MetricService.getInstance().addMetricSet(new DiskMetrics(IoTDBConstant.CN_ROLE));
×
247
    MetricService.getInstance().addMetricSet(new NetMetrics(IoTDBConstant.CN_ROLE));
×
248
    MetricService.getInstance().addMetricSet(ClientManagerMetrics.getInstance());
×
249
    initCpuMetrics();
×
250
  }
×
251

252
  private void initCpuMetrics() {
253
    List<String> threadModules = new ArrayList<>();
×
254
    Arrays.stream(ThreadModule.values()).forEach(x -> threadModules.add(x.toString()));
×
255
    List<String> pools = new ArrayList<>();
×
256
    Arrays.stream(ThreadName.values()).forEach(x -> pools.add(x.name()));
×
257
    MetricService.getInstance()
×
258
        .addMetricSet(
×
259
            new CpuUsageMetrics(
260
                threadModules,
261
                pools,
262
                x -> ThreadName.getModuleTheThreadBelongs(x).toString(),
×
263
                x -> ThreadName.getThreadPoolTheThreadBelongs(x).name()));
×
264
  }
×
265

266
  private void initConfigManager() {
267
    try {
268
      configManager = new ConfigManager();
×
269
    } catch (IOException e) {
×
270
      LOGGER.error("Can't start ConfigNode consensus group!", e);
×
271
      stop();
×
272
    }
×
273
    // Add some Metrics for configManager
274
    configManager.addMetrics();
×
275
    LOGGER.info("Successfully initialize ConfigManager.");
×
276
  }
×
277

278
  /**
279
   * Register Non-seed ConfigNode when first startup.
280
   *
281
   * @throws StartupException if register failed.
282
   * @throws IOException if consensus manager init failed.
283
   */
284
  private void sendRegisterConfigNodeRequest() throws StartupException, IOException {
285
    TConfigNodeRegisterReq req =
×
286
        new TConfigNodeRegisterReq(
287
            configManager.getClusterParameters(),
×
288
            new TConfigNodeLocation(
289
                INIT_NON_SEED_CONFIG_NODE_ID,
290
                new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
×
291
                new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())));
×
292

293
    TEndPoint targetConfigNode = CONF.getTargetConfigNode();
×
294
    if (targetConfigNode == null) {
×
295
      LOGGER.error(
×
296
          "Please set the cn_target_config_node_list parameter in iotdb-confignode.properties file.");
297
      throw new StartupException("The targetConfigNode setting in conf is empty");
×
298
    }
299

300
    for (int retry = 0; retry < STARTUP_RETRY_NUM; retry++) {
×
301
      TSStatus status;
302
      TConfigNodeRegisterResp resp = null;
×
303
      Object obj =
304
          SyncConfigNodeClientPool.getInstance()
×
305
              .sendSyncRequestToConfigNodeWithRetry(
×
306
                  targetConfigNode, req, ConfigNodeRequestType.REGISTER_CONFIG_NODE);
307

308
      if (obj instanceof TConfigNodeRegisterResp) {
×
309
        resp = (TConfigNodeRegisterResp) obj;
×
310
        status = resp.getStatus();
×
311
      } else {
312
        status = (TSStatus) obj;
×
313
      }
314

315
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
316
        if (resp == null) {
×
317
          LOGGER.error("The result of register ConfigNode is empty!");
×
318
          throw new StartupException("The result of register ConfigNode is empty!");
×
319
        }
320
        /* Always set ConfigNodeId before initConsensusManager */
321
        CONF.setConfigNodeId(resp.getConfigNodeId());
×
322
        configManager.initConsensusManager();
×
323
        return;
×
324
      } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
×
325
        targetConfigNode = status.getRedirectNode();
×
326
        LOGGER.info("ConfigNode need redirect to  {}.", targetConfigNode);
×
327
      } else {
328
        throw new StartupException(status.getMessage());
×
329
      }
330

331
      try {
332
        TimeUnit.MILLISECONDS.sleep(STARTUP_RETRY_INTERVAL_IN_MS);
×
333
      } catch (InterruptedException e) {
×
334
        Thread.currentThread().interrupt();
×
335
        throw new StartupException("Register ConfigNode failed!");
×
336
      }
×
337
    }
338

339
    LOGGER.error(
×
340
        "The current ConfigNode can't send register request to the ConfigNode-leader after all retries!");
341
    stop();
×
342
  }
×
343

344
  private void sendRestartConfigNodeRequest() throws StartupException {
345
    TConfigNodeRestartReq req =
×
346
        new TConfigNodeRestartReq(
347
            CONF.getClusterName(),
×
348
            new TConfigNodeLocation(
349
                CONF.getConfigNodeId(),
×
350
                new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()),
×
351
                new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())));
×
352

353
    TEndPoint targetConfigNode = CONF.getTargetConfigNode();
×
354
    if (targetConfigNode == null) {
×
355
      LOGGER.error(
×
356
          "Please set the cn_target_config_node_list parameter in iotdb-confignode.properties file.");
357
      throw new StartupException("The targetConfigNode setting in conf is empty");
×
358
    }
359

360
    for (int retry = 0; retry < STARTUP_RETRY_NUM; retry++) {
×
361
      TSStatus status =
362
          (TSStatus)
363
              SyncConfigNodeClientPool.getInstance()
×
364
                  .sendSyncRequestToConfigNodeWithRetry(
×
365
                      targetConfigNode, req, ConfigNodeRequestType.RESTART_CONFIG_NODE);
366

367
      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
368
        LOGGER.info("Registration request of current ConfigNode is accepted.");
×
369
        return;
×
370
      } else if (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
×
371
        targetConfigNode = status.getRedirectNode();
×
372
        LOGGER.info("ConfigNode need redirect to  {}.", targetConfigNode);
×
373
      } else {
374
        throw new StartupException(status.getMessage());
×
375
      }
376

377
      try {
378
        TimeUnit.MILLISECONDS.sleep(STARTUP_RETRY_INTERVAL_IN_MS);
×
379
      } catch (InterruptedException e) {
×
380
        Thread.currentThread().interrupt();
×
381
        throw new StartupException("Register ConfigNode failed! ");
×
382
      }
×
383
    }
384
  }
×
385

386
  private void setUpRPCService() throws StartupException {
387
    // Setup RPCService
388
    ConfigNodeRPCService configNodeRPCService = new ConfigNodeRPCService();
×
389
    ConfigNodeRPCServiceProcessor configNodeRPCServiceProcessor =
×
390
        new ConfigNodeRPCServiceProcessor(configManager);
391
    configNodeRPCService.initSyncedServiceImpl(configNodeRPCServiceProcessor);
×
392
    registerManager.register(configNodeRPCService);
×
393
  }
×
394

395
  /**
396
   * Deactivating ConfigNode internal services.
397
   *
398
   * @throws IOException if close configManager failed.
399
   */
400
  public void deactivate() throws IOException {
401
    LOGGER.info("Deactivating {}...", ConfigNodeConstant.GLOBAL_NAME);
×
402
    registerManager.deregisterAll();
×
403
    JMXService.deregisterMBean(mbeanName);
×
404
    if (configManager != null) {
×
405
      configManager.close();
×
406
    }
407
    LOGGER.info("{} is deactivated.", ConfigNodeConstant.GLOBAL_NAME);
×
408
  }
×
409

410
  public void stop() {
411
    try {
412
      deactivate();
×
413
    } catch (IOException e) {
×
414
      LOGGER.error("Meet error when deactivate ConfigNode", e);
×
415
    }
×
416
    System.exit(-1);
×
417
  }
×
418

419
  public ConfigManager getConfigManager() {
420
    return configManager;
1✔
421
  }
422

423
  @TestOnly
424
  public void setConfigManager(ConfigManager configManager) {
425
    this.configManager = configManager;
1✔
426
  }
1✔
427

428
  private static class ConfigNodeHolder {
429

430
    private static final ConfigNode INSTANCE = new ConfigNode();
1✔
431

432
    private ConfigNodeHolder() {
433
      // Empty constructor
434
    }
435
  }
436

437
  public static ConfigNode getInstance() {
438
    return ConfigNodeHolder.INSTANCE;
1✔
439
  }
440
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc