• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9723

pending completion
#9723

push

travis_ci

web-flow
[To rel/1.2][IOTDB-6079] Cluster computing resource balance (#10746)

264 of 264 new or added lines in 11 files covered. (100.0%)

79280 of 165370 relevant lines covered (47.94%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

4.47
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.service;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
24
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
25
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
26
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
27
import org.apache.iotdb.common.rpc.thrift.TNodeResource;
28
import org.apache.iotdb.commons.client.exception.ClientManagerException;
29
import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
30
import org.apache.iotdb.commons.conf.CommonDescriptor;
31
import org.apache.iotdb.commons.conf.IoTDBConstant;
32
import org.apache.iotdb.commons.exception.StartupException;
33
import org.apache.iotdb.commons.file.SystemFileFactory;
34
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
35
import org.apache.iotdb.commons.service.JMXService;
36
import org.apache.iotdb.commons.service.RegisterManager;
37
import org.apache.iotdb.commons.service.metric.MetricService;
38
import org.apache.iotdb.commons.trigger.TriggerInformation;
39
import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
40
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
41
import org.apache.iotdb.commons.udf.UDFInformation;
42
import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
43
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
44
import org.apache.iotdb.commons.udf.service.UDFManagementService;
45
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
46
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
47
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
48
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
49
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
50
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
51
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
52
import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
53
import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
54
import org.apache.iotdb.consensus.ConsensusFactory;
55
import org.apache.iotdb.db.conf.DataNodeStartupCheck;
56
import org.apache.iotdb.db.conf.IoTDBConfig;
57
import org.apache.iotdb.db.conf.IoTDBDescriptor;
58
import org.apache.iotdb.db.conf.IoTDBStartCheck;
59
import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
60
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
61
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
62
import org.apache.iotdb.db.pipe.agent.PipeAgent;
63
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
64
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
65
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
66
import org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl;
67
import org.apache.iotdb.db.protocol.thrift.impl.DataNodeRegionManager;
68
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeService;
69
import org.apache.iotdb.db.queryengine.execution.schedule.DriverScheduler;
70
import org.apache.iotdb.db.schemaengine.SchemaEngine;
71
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
72
import org.apache.iotdb.db.service.metrics.DataNodeMetricsHelper;
73
import org.apache.iotdb.db.service.metrics.IoTDBInternalLocalReporter;
74
import org.apache.iotdb.db.storageengine.StorageEngine;
75
import org.apache.iotdb.db.storageengine.buffer.CacheHitRatioMonitor;
76
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
77
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushManager;
78
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
79
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
80
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
81
import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
82
import org.apache.iotdb.db.trigger.service.TriggerInformationUpdater;
83
import org.apache.iotdb.db.trigger.service.TriggerManagementService;
84
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
85
import org.apache.iotdb.metrics.utils.InternalReporterType;
86
import org.apache.iotdb.rpc.TSStatusCode;
87
import org.apache.iotdb.udf.api.exception.UDFManagementException;
88

89
import org.apache.thrift.TException;
90
import org.slf4j.Logger;
91
import org.slf4j.LoggerFactory;
92

93
import java.io.File;
94
import java.io.IOException;
95
import java.nio.ByteBuffer;
96
import java.nio.charset.Charset;
97
import java.util.ArrayList;
98
import java.util.List;
99
import java.util.concurrent.TimeUnit;
100
import java.util.stream.Collectors;
101

102
import static org.apache.iotdb.commons.conf.IoTDBConstant.DEFAULT_CLUSTER_NAME;
103

104
public class DataNode implements DataNodeMBean {
105

106
  private static final Logger logger = LoggerFactory.getLogger(DataNode.class);
1✔
107
  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
1✔
108

109
  private final String mbeanName =
1✔
110
      String.format(
1✔
111
          "%s:%s=%s", "org.apache.iotdb.datanode.service", IoTDBConstant.JMX_TYPE, "DataNode");
112

113
  private static final File SYSTEM_PROPERTIES =
1✔
114
      SystemFileFactory.INSTANCE.getFile(
1✔
115
          config.getSchemaDir() + File.separator + IoTDBStartCheck.PROPERTIES_FILE_NAME);
1✔
116

117
  /**
118
   * When joining a cluster or getting configuration this node will retry at most "DEFAULT_RETRY"
119
   * times before returning a failure to the client.
120
   */
121
  private static final int DEFAULT_RETRY = 10;
122

123
  private static final long DEFAULT_RETRY_INTERVAL_IN_MS = config.getJoinClusterRetryIntervalMs();
1✔
124

125
  private final TEndPoint thisNode = new TEndPoint();
1✔
126

127
  /** Hold the information of trigger, udf...... */
128
  private final ResourcesInformationHolder resourcesInformationHolder =
1✔
129
      new ResourcesInformationHolder();
130

131
  /** Responsible for keeping trigger information up to date. */
132
  private final TriggerInformationUpdater triggerInformationUpdater =
1✔
133
      new TriggerInformationUpdater();
134

135
  private static final String REGISTER_INTERRUPTION =
136
      "Unexpected interruption when waiting to register to the cluster";
137

138
  private DataNode() {
1✔
139
    // We do not init anything here, so that we can re-initialize the instance in IT.
140
  }
1✔
141

142
  private static final RegisterManager registerManager = new RegisterManager();
1✔
143

144
  public static DataNode getInstance() {
145
    return DataNodeHolder.INSTANCE;
1✔
146
  }
147

148
  public static void main(String[] args) {
149
    logger.info("IoTDB-DataNode environment variables: {}", IoTDBConfig.getEnvironmentVariables());
×
150
    logger.info("IoTDB-DataNode default charset is: {}", Charset.defaultCharset().displayName());
×
151
    new DataNodeServerCommandLine().doMain(args);
×
152
  }
×
153

154
  protected void doAddNode() {
155
    boolean isFirstStart = false;
×
156
    try {
157
      // Check if this DataNode is start for the first time and do other pre-checks
158
      isFirstStart = prepareDataNode();
×
159

160
      // Set target ConfigNodeList from iotdb-datanode.properties file
161
      ConfigNodeInfo.getInstance().updateConfigNodeList(config.getTargetConfigNodeList());
×
162

163
      // Pull and check system configurations from ConfigNode-leader
164
      pullAndCheckSystemConfigurations();
×
165

166
      if (isFirstStart) {
×
167
        // Register this DataNode to the cluster when first start
168
        sendRegisterRequestToConfigNode();
×
169
      } else {
170
        // Send restart request of this DataNode
171
        sendRestartRequestToConfigNode();
×
172
      }
173
      // TierManager need DataNodeId to do some operations so the reset method need to be invoked
174
      // after DataNode adding
175
      TierManager.getInstance().resetFolders();
×
176
      // Active DataNode
177
      active();
×
178

179
      // Setup metric service
180
      setUpMetricService();
×
181

182
      // Setup rpc service
183
      setUpRPCService();
×
184

185
      // Serialize mutable system properties
186
      IoTDBStartCheck.getInstance().serializeMutableSystemPropertiesIfNecessary();
×
187

188
      logger.info("IoTDB configuration: {}", config.getConfigMessage());
×
189
      logger.info("Congratulation, IoTDB DataNode is set up successfully. Now, enjoy yourself!");
×
190

191
    } catch (StartupException | IOException e) {
×
192
      logger.error("Fail to start server", e);
×
193
      if (isFirstStart) {
×
194
        // Delete the system.properties file when first start failed.
195
        // Therefore, the next time this DataNode is start will still be seen as the first time.
196
        SYSTEM_PROPERTIES.deleteOnExit();
×
197
      }
198
      stop();
×
199
    }
×
200
  }
×
201

202
  /** Prepare cluster IoTDB-DataNode */
203
  private boolean prepareDataNode() throws StartupException, IOException {
204
    // Set cluster mode
205
    config.setClusterMode(true);
×
206

207
    // Notice: Consider this DataNode as first start if the system.properties file doesn't exist
208
    boolean isFirstStart = IoTDBStartCheck.getInstance().checkIsFirstStart();
×
209

210
    // Check target ConfigNodes
211
    for (TEndPoint endPoint : config.getTargetConfigNodeList()) {
×
212
      if (endPoint.getIp().equals("0.0.0.0")) {
×
213
        throw new StartupException(
×
214
            "The ip address of any target_config_node_list couldn't be 0.0.0.0");
215
      }
216
    }
×
217

218
    // Set this node
219
    thisNode.setIp(config.getInternalAddress());
×
220
    thisNode.setPort(config.getInternalPort());
×
221

222
    // Startup checks
223
    DataNodeStartupCheck checks = new DataNodeStartupCheck(IoTDBConstant.DN_ROLE, config);
×
224
    checks.startUpCheck();
×
225
    return isFirstStart;
×
226
  }
227

228
  /**
229
   * Pull and check the following system configurations:
230
   *
231
   * <p>1. GlobalConfig
232
   *
233
   * <p>2. RatisConfig
234
   *
235
   * <p>3. CQConfig
236
   *
237
   * @throws StartupException When failed connect to ConfigNode-leader
238
   */
239
  private void pullAndCheckSystemConfigurations() throws StartupException {
240
    logger.info("Pulling system configurations from the ConfigNode-leader...");
×
241

242
    /* Pull system configurations */
243
    int retry = DEFAULT_RETRY;
×
244
    TSystemConfigurationResp configurationResp = null;
×
245
    while (retry > 0) {
×
246
      try (ConfigNodeClient configNodeClient =
247
          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
×
248
        configurationResp = configNodeClient.getSystemConfiguration();
×
249
        break;
250
      } catch (TException | ClientManagerException e) {
×
251
        // Read ConfigNodes from system.properties and retry
252
        logger.warn(
×
253
            "Cannot pull system configurations from ConfigNode-leader, because: {}",
254
            e.getMessage());
×
255
        ConfigNodeInfo.getInstance().loadConfigNodeList();
×
256
        retry--;
×
257
      }
258

259
      try {
260
        // wait to start the next try
261
        Thread.sleep(DEFAULT_RETRY_INTERVAL_IN_MS);
×
262
      } catch (InterruptedException e) {
×
263
        Thread.currentThread().interrupt();
×
264
        logger.warn(REGISTER_INTERRUPTION, e);
×
265
        retry = -1;
×
266
      }
×
267
    }
268
    if (configurationResp == null) {
×
269
      // All tries failed
270
      logger.error(
×
271
          "Cannot pull system configurations from ConfigNode-leader after {} retries",
272
          DEFAULT_RETRY);
×
273
      throw new StartupException("Cannot pull system configurations from ConfigNode-leader");
×
274
    }
275

276
    /* Load system configurations */
277
    IoTDBDescriptor.getInstance().loadGlobalConfig(configurationResp.globalConfig);
×
278
    IoTDBDescriptor.getInstance().loadRatisConfig(configurationResp.ratisConfig);
×
279
    IoTDBDescriptor.getInstance().loadCQConfig(configurationResp.cqConfig);
×
280
    CommonDescriptor.getInstance().loadGlobalConfig(configurationResp.globalConfig);
×
281

282
    /* Set cluster consensus protocol class */
283
    if (!IoTDBStartCheck.getInstance()
×
284
        .checkConsensusProtocolExists(TConsensusGroupType.DataRegion)) {
×
285
      config.setDataRegionConsensusProtocolClass(
×
286
          configurationResp.globalConfig.getDataRegionConsensusProtocolClass());
×
287
    }
288

289
    if (!IoTDBStartCheck.getInstance()
×
290
        .checkConsensusProtocolExists(TConsensusGroupType.SchemaRegion)) {
×
291
      config.setSchemaRegionConsensusProtocolClass(
×
292
          configurationResp.globalConfig.getSchemaRegionConsensusProtocolClass());
×
293
    }
294

295
    /* Check system configurations */
296
    try {
297
      IoTDBStartCheck.getInstance().checkSystemConfig();
×
298
      IoTDBStartCheck.getInstance().checkDirectory();
×
299
      if (!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) {
×
300
        // In current implementation, only IoTConsensus need separated memory from Consensus
301
        IoTDBDescriptor.getInstance().reclaimConsensusMemory();
×
302
      }
303
    } catch (Exception e) {
×
304
      throw new StartupException(e.getMessage());
×
305
    }
×
306

307
    logger.info("Successfully pull system configurations from ConfigNode-leader.");
×
308
  }
×
309

310
  /**
311
   * Store runtime configurations, which includes:
312
   *
313
   * <p>1. All ConfigNodes in cluster
314
   *
315
   * <p>2. All template information
316
   *
317
   * <p>3. All UDF information
318
   *
319
   * <p>4. All trigger information
320
   *
321
   * <p>5. All Pipe information
322
   *
323
   * <p>6. All TTL information
324
   */
325
  private void storeRuntimeConfigurations(
326
      List<TConfigNodeLocation> configNodeLocations, TRuntimeConfiguration runtimeConfiguration) {
327
    /* Store ConfigNodeList */
328
    List<TEndPoint> configNodeList = new ArrayList<>();
×
329
    for (TConfigNodeLocation configNodeLocation : configNodeLocations) {
×
330
      configNodeList.add(configNodeLocation.getInternalEndPoint());
×
331
    }
×
332
    ConfigNodeInfo.getInstance().updateConfigNodeList(configNodeList);
×
333

334
    /* Store templateSetInfo */
335
    ClusterTemplateManager.getInstance()
×
336
        .updateTemplateSetInfo(runtimeConfiguration.getTemplateInfo());
×
337

338
    /* Store udfInformationList */
339
    getUDFInformationList(runtimeConfiguration.getAllUDFInformation());
×
340

341
    /* Store triggerInformationList */
342
    getTriggerInformationList(runtimeConfiguration.getAllTriggerInformation());
×
343

344
    /* Store pipeInformationList */
345
    getPipeInformationList(runtimeConfiguration.getAllPipeInformation());
×
346

347
    /* Store ttl information */
348
    StorageEngine.getInstance().updateTTLInfo(runtimeConfiguration.getAllTTLInformation());
×
349
  }
×
350

351
  /**
352
   * Register this DataNode into cluster.
353
   *
354
   * @throws StartupException if register failed.
355
   * @throws IOException if serialize cluster name and dataNode Id failed.
356
   */
357
  private void sendRegisterRequestToConfigNode() throws StartupException, IOException {
358
    logger.info("Sending register request to ConfigNode-leader...");
×
359

360
    /* Send register request */
361
    int retry = DEFAULT_RETRY;
×
362
    TDataNodeRegisterReq req = new TDataNodeRegisterReq();
×
363
    req.setDataNodeConfiguration(generateDataNodeConfiguration());
×
364
    req.setClusterName(config.getClusterName());
×
365
    req.setVersionInfo(new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO));
×
366
    TDataNodeRegisterResp dataNodeRegisterResp = null;
×
367
    while (retry > 0) {
×
368
      try (ConfigNodeClient configNodeClient =
369
          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
×
370
        dataNodeRegisterResp = configNodeClient.registerDataNode(req);
×
371
        break;
372
      } catch (TException | ClientManagerException e) {
×
373
        // Read ConfigNodes from system.properties and retry
374
        logger.warn("Cannot register to the cluster, because: {}", e.getMessage());
×
375
        ConfigNodeInfo.getInstance().loadConfigNodeList();
×
376
        retry--;
×
377
      }
378

379
      try {
380
        // Wait to start the next try
381
        Thread.sleep(DEFAULT_RETRY_INTERVAL_IN_MS);
×
382
      } catch (InterruptedException e) {
×
383
        Thread.currentThread().interrupt();
×
384
        logger.warn(REGISTER_INTERRUPTION, e);
×
385
        retry = -1;
×
386
      }
×
387
    }
388
    if (dataNodeRegisterResp == null) {
×
389
      // All tries failed
390
      logger.error(
×
391
          "Cannot register into cluster after {} retries. "
392
              + "Please check dn_target_config_node_list in iotdb-datanode.properties.",
393
          DEFAULT_RETRY);
×
394
      throw new StartupException("Cannot register into the cluster.");
×
395
    }
396

397
    if (dataNodeRegisterResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
398

399
      /* Store runtime configurations when register success */
400
      int dataNodeID = dataNodeRegisterResp.getDataNodeId();
×
401
      config.setDataNodeId(dataNodeID);
×
402
      IoTDBStartCheck.getInstance()
×
403
          .serializeClusterNameAndDataNodeId(config.getClusterName(), dataNodeID);
×
404

405
      storeRuntimeConfigurations(
×
406
          dataNodeRegisterResp.getConfigNodeList(), dataNodeRegisterResp.getRuntimeConfiguration());
×
407

408
      logger.info("Successfully register to the cluster: {}", config.getClusterName());
×
409
    } else {
×
410
      /* Throw exception when register failed */
411
      logger.error(dataNodeRegisterResp.getStatus().getMessage());
×
412
      throw new StartupException("Cannot register to the cluster.");
×
413
    }
414
  }
×
415

416
  private void sendRestartRequestToConfigNode() throws StartupException {
417
    logger.info("Sending restart request to ConfigNode-leader...");
×
418

419
    /* Send restart request */
420
    int retry = DEFAULT_RETRY;
×
421
    TDataNodeRestartReq req = new TDataNodeRestartReq();
×
422
    req.setClusterName(
×
423
        config.getClusterName() == null ? DEFAULT_CLUSTER_NAME : config.getClusterName());
×
424
    req.setDataNodeConfiguration(generateDataNodeConfiguration());
×
425
    req.setVersionInfo(new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO));
×
426
    TDataNodeRestartResp dataNodeRestartResp = null;
×
427
    while (retry > 0) {
×
428
      try (ConfigNodeClient configNodeClient =
429
          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
×
430
        dataNodeRestartResp = configNodeClient.restartDataNode(req);
×
431
        break;
432
      } catch (TException | ClientManagerException e) {
×
433
        // Read ConfigNodes from system.properties and retry
434
        logger.warn(
×
435
            "Cannot send restart request to the ConfigNode-leader, because: {}", e.getMessage());
×
436
        ConfigNodeInfo.getInstance().loadConfigNodeList();
×
437
        retry--;
×
438
      }
439

440
      try {
441
        // wait to start the next try
442
        Thread.sleep(DEFAULT_RETRY_INTERVAL_IN_MS);
×
443
      } catch (InterruptedException e) {
×
444
        Thread.currentThread().interrupt();
×
445
        logger.warn(REGISTER_INTERRUPTION, e);
×
446
        retry = -1;
×
447
      }
×
448
    }
449
    if (dataNodeRestartResp == null) {
×
450
      // All tries failed
451
      logger.error(
×
452
          "Cannot send restart DataNode request to ConfigNode-leader after {} retries. "
453
              + "Please check dn_target_config_node_list in iotdb-datanode.properties.",
454
          DEFAULT_RETRY);
×
455
      throw new StartupException("Cannot send restart DataNode request to ConfigNode-leader.");
×
456
    }
457

458
    if (dataNodeRestartResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
459
      /* Store runtime configurations when restart request is accepted */
460
      storeRuntimeConfigurations(
×
461
          dataNodeRestartResp.getConfigNodeList(), dataNodeRestartResp.getRuntimeConfiguration());
×
462
      logger.info("Restart request to cluster: {} is accepted.", config.getClusterName());
×
463
    } else {
464
      /* Throw exception when restart is rejected */
465
      throw new StartupException(dataNodeRestartResp.getStatus().getMessage());
×
466
    }
467
  }
×
468

469
  private void prepareResources() throws StartupException {
470
    prepareUDFResources();
×
471
    prepareTriggerResources();
×
472
    preparePipeResources();
×
473
  }
×
474

475
  /**
476
   * Register services and set up DataNode.
477
   *
478
   * @throws StartupException if start up failed.
479
   */
480
  private void active() throws StartupException {
481
    try {
482
      processPid();
×
483
      setUp();
×
484
    } catch (StartupException e) {
×
485
      logger.error("Meet error while starting up.", e);
×
486
      throw new StartupException("Error in activating IoTDB DataNode.");
×
487
    }
×
488
    logger.info("IoTDB DataNode has started.");
×
489

490
    try {
491
      SchemaRegionConsensusImpl.setupAndGetInstance().start();
×
492
      DataRegionConsensusImpl.setupAndGetInstance().start();
×
493
    } catch (IOException e) {
×
494
      throw new StartupException(e);
×
495
    }
×
496
  }
×
497

498
  void processPid() {
499
    String pidFile = System.getProperty(IoTDBConstant.IOTDB_PIDFILE);
1✔
500
    if (pidFile != null) {
1✔
501
      new File(pidFile).deleteOnExit();
1✔
502
    }
503
  }
1✔
504

505
  private void setUp() throws StartupException {
506
    logger.info("Setting up IoTDB DataNode...");
×
507
    registerManager.register(new JMXService());
×
508
    JMXService.registerMBean(getInstance(), mbeanName);
×
509

510
    // Get resources for trigger,udf,pipe...
511
    prepareResources();
×
512

513
    Runtime.getRuntime().addShutdownHook(new IoTDBShutdownHook());
×
514
    setUncaughtExceptionHandler();
×
515

516
    logger.info("Recover the schema...");
×
517
    initSchemaEngine();
×
518
    registerManager.register(FlushManager.getInstance());
×
519
    registerManager.register(CacheHitRatioMonitor.getInstance());
×
520

521
    // Close wal when using ratis consensus
522
    if (config.isClusterMode()
×
523
        && config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) {
×
524
      config.setWalMode(WALMode.DISABLE);
×
525
    }
526
    registerManager.register(WALManager.getInstance());
×
527

528
    // In mpp mode we need to start some other services
529
    registerManager.register(StorageEngine.getInstance());
×
530
    registerManager.register(MPPDataExchangeService.getInstance());
×
531
    registerManager.register(DriverScheduler.getInstance());
×
532

533
    registerUdfServices();
×
534

535
    logger.info(
×
536
        "IoTDB DataNode is setting up, some databases may not be ready now, please wait several seconds...");
537

538
    while (!StorageEngine.getInstance().isAllSgReady()) {
×
539
      try {
540
        TimeUnit.MILLISECONDS.sleep(1000);
×
541
      } catch (InterruptedException e) {
×
542
        logger.warn("IoTDB DataNode failed to set up.", e);
×
543
        Thread.currentThread().interrupt();
×
544
        return;
×
545
      }
×
546
    }
547

548
    // Must init after SchemaEngine and StorageEngine prepared well
549
    DataNodeRegionManager.getInstance().init();
×
550

551
    // Start region migrate service
552
    registerManager.register(RegionMigrateService.getInstance());
×
553

554
    registerManager.register(CompactionTaskManager.getInstance());
×
555

556
    registerManager.register(PipeAgent.runtime());
×
557
  }
×
558

559
  /** Set up RPC and protocols after DataNode is available */
560
  private void setUpRPCService() throws StartupException {
561
    // Start InternalRPCService to indicate that the current DataNode can accept cluster scheduling
562
    registerManager.register(DataNodeInternalRPCService.getInstance());
×
563
    // Start InternalRPCService to indicate that the current DataNode can accept request from MLNode
564
    if (config.isEnableMLNodeService()) {
×
565
      registerManager.register(MLNodeRPCService.getInstance());
×
566
    }
567

568
    // Notice: During the period between starting the internal RPC service
569
    // and starting the client RPC service , some requests may fail because
570
    // DataNode is not marked as RUNNING by ConfigNode-leader yet.
571

572
    // Start client RPCService to indicate that the current DataNode provide external services
573
    IoTDBDescriptor.getInstance()
×
574
        .getConfig()
×
575
        .setRpcImplClassName(ClientRPCServiceImpl.class.getName());
×
576
    if (config.isEnableRpcService()) {
×
577
      registerManager.register(RPCService.getInstance());
×
578
    }
579
    // init service protocols
580
    initProtocols();
×
581
  }
×
582

583
  private void setUpMetricService() throws StartupException {
584
    MetricConfigDescriptor.getInstance().getMetricConfig().setNodeId(config.getDataNodeId());
×
585
    registerManager.register(MetricService.getInstance());
×
586

587
    // init metric service
588
    if (MetricConfigDescriptor.getInstance()
×
589
        .getMetricConfig()
×
590
        .getInternalReportType()
×
591
        .equals(InternalReporterType.IOTDB)) {
×
592
      MetricService.getInstance().updateInternalReporter(new IoTDBInternalLocalReporter());
×
593
    }
594
    MetricService.getInstance().startInternalReporter();
×
595
    // bind predefined metrics
596
    DataNodeMetricsHelper.bind();
×
597
  }
×
598

599
  public static TDataNodeLocation generateDataNodeLocation() {
600
    TDataNodeLocation location = new TDataNodeLocation();
×
601
    location.setDataNodeId(config.getDataNodeId());
×
602
    location.setClientRpcEndPoint(new TEndPoint(config.getRpcAddress(), config.getRpcPort()));
×
603
    location.setInternalEndPoint(
×
604
        new TEndPoint(config.getInternalAddress(), config.getInternalPort()));
×
605
    location.setMPPDataExchangeEndPoint(
×
606
        new TEndPoint(config.getInternalAddress(), config.getMppDataExchangePort()));
×
607
    location.setDataRegionConsensusEndPoint(
×
608
        new TEndPoint(config.getInternalAddress(), config.getDataRegionConsensusPort()));
×
609
    location.setSchemaRegionConsensusEndPoint(
×
610
        new TEndPoint(config.getInternalAddress(), config.getSchemaRegionConsensusPort()));
×
611
    return location;
×
612
  }
613

614
  /**
615
   * Generate dataNodeConfiguration.
616
   *
617
   * @return TDataNodeConfiguration
618
   */
619
  private TDataNodeConfiguration generateDataNodeConfiguration() {
620
    // Set DataNodeLocation
621
    TDataNodeLocation location = generateDataNodeLocation();
×
622

623
    // Set NodeResource
624
    TNodeResource resource = new TNodeResource();
×
625
    resource.setCpuCoreNum(Runtime.getRuntime().availableProcessors());
×
626
    resource.setMaxMemory(Runtime.getRuntime().totalMemory());
×
627

628
    return new TDataNodeConfiguration(location, resource);
×
629
  }
630

631
  private void registerUdfServices() throws StartupException {
632
    registerManager.register(TemporaryQueryDataFileService.getInstance());
×
633
    registerManager.register(UDFClassLoaderManager.setupAndGetInstance(config.getUdfDir()));
×
634
  }
×
635

636
  private void initUDFRelatedInstance() throws StartupException {
637
    try {
638
      UDFExecutableManager.setupAndGetInstance(config.getUdfTemporaryLibDir(), config.getUdfDir());
×
639
      UDFClassLoaderManager.setupAndGetInstance(config.getUdfDir());
×
640
    } catch (IOException e) {
×
641
      throw new StartupException(e);
×
642
    }
×
643
  }
×
644

645
  private void prepareUDFResources() throws StartupException {
646
    initUDFRelatedInstance();
×
647
    if (resourcesInformationHolder.getUDFInformationList() == null
×
648
        || resourcesInformationHolder.getUDFInformationList().isEmpty()) {
×
649
      return;
×
650
    }
651

652
    // Get jars from config node
653
    List<UDFInformation> udfNeedJarList = getJarListForUDF();
×
654
    int index = 0;
×
655
    while (index < udfNeedJarList.size()) {
×
656
      List<UDFInformation> curList = new ArrayList<>();
×
657
      int offset = 0;
×
658
      while (offset < ResourcesInformationHolder.getJarNumOfOneRpc()
×
659
          && index + offset < udfNeedJarList.size()) {
×
660
        curList.add(udfNeedJarList.get(index + offset));
×
661
        offset++;
×
662
      }
663
      index += (offset + 1);
×
664
      getJarOfUDFs(curList);
×
665
    }
×
666

667
    // Create instances of udf and do registration
668
    try {
669
      for (UDFInformation udfInformation : resourcesInformationHolder.getUDFInformationList()) {
×
670
        UDFManagementService.getInstance().doRegister(udfInformation);
×
671
      }
×
672
    } catch (Exception e) {
×
673
      throw new StartupException(e);
×
674
    }
×
675

676
    logger.debug("successfully registered all the UDFs");
×
677
    if (logger.isDebugEnabled()) {
×
678
      for (UDFInformation udfInformation :
679
          UDFManagementService.getInstance().getAllUDFInformation()) {
×
680
        logger.debug("get udf: {}", udfInformation.getFunctionName());
×
681
      }
682
    }
683
  }
×
684

685
  private void getJarOfUDFs(List<UDFInformation> udfInformationList) throws StartupException {
686
    try (ConfigNodeClient configNodeClient =
687
        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
×
688
      List<String> jarNameList =
×
689
          udfInformationList.stream().map(UDFInformation::getJarName).collect(Collectors.toList());
×
690
      TGetJarInListResp resp = configNodeClient.getUDFJar(new TGetJarInListReq(jarNameList));
×
691
      if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
×
692
        throw new StartupException("Failed to get UDF jar from config node.");
×
693
      }
694
      List<ByteBuffer> jarList = resp.getJarList();
×
695
      for (int i = 0; i < udfInformationList.size(); i++) {
×
696
        UDFExecutableManager.getInstance()
×
697
            .saveToInstallDir(jarList.get(i), udfInformationList.get(i).getJarName());
×
698
      }
699
    } catch (IOException | TException | ClientManagerException e) {
×
700
      throw new StartupException(e);
×
701
    }
×
702
  }
×
703

704
  /** Generate a list for UDFs that do not have jar on this node. */
705
  private List<UDFInformation> getJarListForUDF() {
706
    List<UDFInformation> res = new ArrayList<>();
×
707
    for (UDFInformation udfInformation : resourcesInformationHolder.getUDFInformationList()) {
×
708
      if (udfInformation.isUsingURI()) {
×
709
        // Jar does not exist, add current udfInformation to list
710
        if (!UDFExecutableManager.getInstance()
×
711
            .hasFileUnderInstallDir(udfInformation.getJarName())) {
×
712
          res.add(udfInformation);
×
713
        } else {
714
          try {
715
            // Local jar has conflicts with jar on config node, add current triggerInformation to
716
            // list
717
            if (UDFManagementService.getInstance().isLocalJarConflicted(udfInformation)) {
×
718
              res.add(udfInformation);
×
719
            }
720
          } catch (UDFManagementException e) {
×
721
            res.add(udfInformation);
×
722
          }
×
723
        }
724
      }
725
    }
×
726
    return res;
×
727
  }
728

729
  private void getUDFInformationList(List<ByteBuffer> allUDFInformation) {
730
    if (allUDFInformation != null && !allUDFInformation.isEmpty()) {
×
731
      List<UDFInformation> list = new ArrayList<>();
×
732
      for (ByteBuffer UDFInformationByteBuffer : allUDFInformation) {
×
733
        list.add(UDFInformation.deserialize(UDFInformationByteBuffer));
×
734
      }
×
735
      resourcesInformationHolder.setUDFInformationList(list);
×
736
    }
737
  }
×
738

739
  private void initTriggerRelatedInstance() throws StartupException {
740
    try {
741
      TriggerExecutableManager.setupAndGetInstance(
×
742
          config.getTriggerTemporaryLibDir(), config.getTriggerDir());
×
743
    } catch (IOException e) {
×
744
      throw new StartupException(e);
×
745
    }
×
746
  }
×
747

748
  private void prepareTriggerResources() throws StartupException {
749
    initTriggerRelatedInstance();
×
750
    if (resourcesInformationHolder.getTriggerInformationList() == null
×
751
        || resourcesInformationHolder.getTriggerInformationList().isEmpty()) {
×
752
      return;
×
753
    }
754

755
    // Get jars from config node
756
    List<TriggerInformation> triggerNeedJarList = getJarListForTrigger();
×
757
    int index = 0;
×
758
    while (index < triggerNeedJarList.size()) {
×
759
      List<TriggerInformation> curList = new ArrayList<>();
×
760
      int offset = 0;
×
761
      while (offset < ResourcesInformationHolder.getJarNumOfOneRpc()
×
762
          && index + offset < triggerNeedJarList.size()) {
×
763
        curList.add(triggerNeedJarList.get(index + offset));
×
764
        offset++;
×
765
      }
766
      index += (offset + 1);
×
767
      getJarOfTriggers(curList);
×
768
    }
×
769

770
    // Create instances of triggers and do registration
771
    try {
772
      for (TriggerInformation triggerInformation :
773
          resourcesInformationHolder.getTriggerInformationList()) {
×
774
        TriggerManagementService.getInstance().doRegister(triggerInformation, true);
×
775
      }
×
776
    } catch (Exception e) {
×
777
      throw new StartupException(e);
×
778
    }
×
779
    logger.debug("successfully registered all the triggers");
×
780
    if (logger.isDebugEnabled()) {
×
781
      for (TriggerInformation triggerInformation :
782
          TriggerManagementService.getInstance().getAllTriggerInformationInTriggerTable()) {
×
783
        logger.debug("get trigger: {}", triggerInformation.getTriggerName());
×
784
      }
×
785
      for (TriggerExecutor triggerExecutor :
786
          TriggerManagementService.getInstance().getAllTriggerExecutors()) {
×
787
        logger.debug(
×
788
            "get trigger executor: {}", triggerExecutor.getTriggerInformation().getTriggerName());
×
789
      }
×
790
    }
791
    // Start TriggerInformationUpdater
792
    triggerInformationUpdater.startTriggerInformationUpdater();
×
793
  }
×
794

795
  private void getJarOfTriggers(List<TriggerInformation> triggerInformationList)
796
      throws StartupException {
797
    try (ConfigNodeClient configNodeClient =
798
        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
×
799
      List<String> jarNameList =
×
800
          triggerInformationList.stream()
×
801
              .map(TriggerInformation::getJarName)
×
802
              .collect(Collectors.toList());
×
803
      TGetJarInListResp resp = configNodeClient.getTriggerJar(new TGetJarInListReq(jarNameList));
×
804
      if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
×
805
        throw new StartupException("Failed to get trigger jar from config node.");
×
806
      }
807
      List<ByteBuffer> jarList = resp.getJarList();
×
808
      for (int i = 0; i < triggerInformationList.size(); i++) {
×
809
        TriggerExecutableManager.getInstance()
×
810
            .saveToInstallDir(jarList.get(i), triggerInformationList.get(i).getJarName());
×
811
      }
812
    } catch (IOException | TException | ClientManagerException e) {
×
813
      throw new StartupException(e);
×
814
    }
×
815
  }
×
816

817
  /** Generate a list for triggers that do not have jar on this node. */
818
  private List<TriggerInformation> getJarListForTrigger() {
819
    List<TriggerInformation> res = new ArrayList<>();
×
820
    for (TriggerInformation triggerInformation :
821
        resourcesInformationHolder.getTriggerInformationList()) {
×
822
      if (triggerInformation.isUsingURI()) {
×
823
        // jar does not exist, add current triggerInformation to list
824
        if (!TriggerExecutableManager.getInstance()
×
825
            .hasFileUnderInstallDir(triggerInformation.getJarName())) {
×
826
          res.add(triggerInformation);
×
827
        } else {
828
          try {
829
            // local jar has conflicts with jar on config node, add current triggerInformation to
830
            // list
831
            if (TriggerManagementService.getInstance().isLocalJarConflicted(triggerInformation)) {
×
832
              res.add(triggerInformation);
×
833
            }
834
          } catch (TriggerManagementException e) {
×
835
            res.add(triggerInformation);
×
836
          }
×
837
        }
838
      }
839
    }
×
840
    return res;
×
841
  }
842

843
  private void getTriggerInformationList(List<ByteBuffer> allTriggerInformation) {
844
    if (allTriggerInformation != null && !allTriggerInformation.isEmpty()) {
×
845
      List<TriggerInformation> list = new ArrayList<>();
×
846
      for (ByteBuffer triggerInformationByteBuffer : allTriggerInformation) {
×
847
        list.add(TriggerInformation.deserialize(triggerInformationByteBuffer));
×
848
      }
×
849
      resourcesInformationHolder.setTriggerInformationList(list);
×
850
    }
851
  }
×
852

853
  private void preparePipeResources() throws StartupException {
854
    PipeAgent.runtime().preparePipeResources(resourcesInformationHolder);
×
855
  }
×
856

857
  private void getPipeInformationList(List<ByteBuffer> allPipeInformation) {
858
    final List<PipePluginMeta> list = new ArrayList<>();
×
859
    if (allPipeInformation != null) {
×
860
      for (ByteBuffer pipeInformationByteBuffer : allPipeInformation) {
×
861
        list.add(PipePluginMeta.deserialize(pipeInformationByteBuffer));
×
862
      }
×
863
    }
864
    resourcesInformationHolder.setPipePluginMetaList(list);
×
865
  }
×
866

867
  private void initSchemaEngine() {
868
    long time = System.currentTimeMillis();
×
869
    SchemaEngine.getInstance().init();
×
870
    long end = System.currentTimeMillis() - time;
×
871
    logger.info("Spent {}ms to recover schema.", end);
×
872
  }
×
873

874
  public void stop() {
875
    deactivate();
×
876

877
    try {
878
      MetricService.getInstance().stop();
×
879
      if (SchemaRegionConsensusImpl.getInstance() != null) {
×
880
        SchemaRegionConsensusImpl.getInstance().stop();
×
881
      }
882
      if (DataRegionConsensusImpl.getInstance() != null) {
×
883
        DataRegionConsensusImpl.getInstance().stop();
×
884
      }
885
    } catch (Exception e) {
×
886
      logger.error("Stop data node error", e);
×
887
    }
×
888
  }
×
889

890
  private void initProtocols() throws StartupException {
891
    if (config.isEnableMQTTService()) {
×
892
      registerManager.register(MQTTService.getInstance());
×
893
    }
894
    if (IoTDBRestServiceDescriptor.getInstance().getConfig().isEnableRestService()) {
×
895
      registerManager.register(RestService.getInstance());
×
896
    }
897
  }
×
898

899
  private void deactivate() {
900
    logger.info("Deactivating IoTDB DataNode...");
×
901
    stopTriggerRelatedServices();
×
902
    registerManager.deregisterAll();
×
903
    JMXService.deregisterMBean(mbeanName);
×
904
    logger.info("IoTDB DataNode is deactivated.");
×
905
  }
×
906

907
  private void stopTriggerRelatedServices() {
908
    triggerInformationUpdater.stopTriggerInformationUpdater();
×
909
  }
×
910

911
  private void setUncaughtExceptionHandler() {
912
    Thread.setDefaultUncaughtExceptionHandler(new IoTDBDefaultThreadExceptionHandler());
×
913
  }
×
914

915
  private static class DataNodeHolder {
916

917
    private static final DataNode INSTANCE = new DataNode();
1✔
918

919
    private DataNodeHolder() {
920
      // Empty constructor
921
    }
922
  }
923
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc