• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9686

pending completion
#9686

push

travis_ci

web-flow
add build info in show cluster (#10595)

146 of 146 new or added lines in 13 files covered. (100.0%)

79232 of 165062 relevant lines covered (48.0%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

4.47
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.service;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
24
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
25
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
26
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
27
import org.apache.iotdb.common.rpc.thrift.TNodeResource;
28
import org.apache.iotdb.commons.client.exception.ClientManagerException;
29
import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
30
import org.apache.iotdb.commons.conf.CommonDescriptor;
31
import org.apache.iotdb.commons.conf.IoTDBConstant;
32
import org.apache.iotdb.commons.exception.StartupException;
33
import org.apache.iotdb.commons.file.SystemFileFactory;
34
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
35
import org.apache.iotdb.commons.service.JMXService;
36
import org.apache.iotdb.commons.service.RegisterManager;
37
import org.apache.iotdb.commons.service.metric.MetricService;
38
import org.apache.iotdb.commons.trigger.TriggerInformation;
39
import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
40
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
41
import org.apache.iotdb.commons.udf.UDFInformation;
42
import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
43
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
44
import org.apache.iotdb.commons.udf.service.UDFManagementService;
45
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
46
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
47
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
48
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
49
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
50
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
51
import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
52
import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
53
import org.apache.iotdb.consensus.ConsensusFactory;
54
import org.apache.iotdb.db.conf.DataNodeStartupCheck;
55
import org.apache.iotdb.db.conf.IoTDBConfig;
56
import org.apache.iotdb.db.conf.IoTDBDescriptor;
57
import org.apache.iotdb.db.conf.IoTDBStartCheck;
58
import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
59
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
60
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
61
import org.apache.iotdb.db.pipe.agent.PipeAgent;
62
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
63
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
64
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
65
import org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl;
66
import org.apache.iotdb.db.protocol.thrift.impl.DataNodeRegionManager;
67
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeService;
68
import org.apache.iotdb.db.queryengine.execution.schedule.DriverScheduler;
69
import org.apache.iotdb.db.schemaengine.SchemaEngine;
70
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
71
import org.apache.iotdb.db.service.metrics.DataNodeMetricsHelper;
72
import org.apache.iotdb.db.service.metrics.IoTDBInternalLocalReporter;
73
import org.apache.iotdb.db.storageengine.StorageEngine;
74
import org.apache.iotdb.db.storageengine.buffer.CacheHitRatioMonitor;
75
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
76
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushManager;
77
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
78
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
79
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
80
import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
81
import org.apache.iotdb.db.trigger.service.TriggerInformationUpdater;
82
import org.apache.iotdb.db.trigger.service.TriggerManagementService;
83
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
84
import org.apache.iotdb.metrics.utils.InternalReporterType;
85
import org.apache.iotdb.rpc.TSStatusCode;
86
import org.apache.iotdb.udf.api.exception.UDFManagementException;
87

88
import org.apache.thrift.TException;
89
import org.slf4j.Logger;
90
import org.slf4j.LoggerFactory;
91

92
import java.io.File;
93
import java.io.IOException;
94
import java.nio.ByteBuffer;
95
import java.nio.charset.Charset;
96
import java.util.ArrayList;
97
import java.util.List;
98
import java.util.concurrent.TimeUnit;
99
import java.util.stream.Collectors;
100

101
import static org.apache.iotdb.commons.conf.IoTDBConstant.DEFAULT_CLUSTER_NAME;
102

103
public class DataNode implements DataNodeMBean {
104

105
  private static final Logger logger = LoggerFactory.getLogger(DataNode.class);
1✔
106
  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
1✔
107

108
  private final String mbeanName =
1✔
109
      String.format(
1✔
110
          "%s:%s=%s", "org.apache.iotdb.datanode.service", IoTDBConstant.JMX_TYPE, "DataNode");
111

112
  private static final File SYSTEM_PROPERTIES =
1✔
113
      SystemFileFactory.INSTANCE.getFile(
1✔
114
          config.getSchemaDir() + File.separator + IoTDBStartCheck.PROPERTIES_FILE_NAME);
1✔
115

116
  /**
117
   * When joining a cluster or getting configuration this node will retry at most "DEFAULT_RETRY"
118
   * times before returning a failure to the client.
119
   */
120
  private static final int DEFAULT_RETRY = 10;
121

122
  private static final long DEFAULT_RETRY_INTERVAL_IN_MS = config.getJoinClusterRetryIntervalMs();
1✔
123

124
  private final TEndPoint thisNode = new TEndPoint();
1✔
125

126
  /** Hold the information of trigger, udf...... */
127
  private final ResourcesInformationHolder resourcesInformationHolder =
1✔
128
      new ResourcesInformationHolder();
129

130
  /** Responsible for keeping trigger information up to date. */
131
  private final TriggerInformationUpdater triggerInformationUpdater =
1✔
132
      new TriggerInformationUpdater();
133

134
  private static final String REGISTER_INTERRUPTION =
135
      "Unexpected interruption when waiting to register to the cluster";
136

137
  private DataNode() {
1✔
138
    // We do not init anything here, so that we can re-initialize the instance in IT.
139
  }
1✔
140

141
  private static final RegisterManager registerManager = new RegisterManager();
1✔
142

143
  public static DataNode getInstance() {
144
    return DataNodeHolder.INSTANCE;
1✔
145
  }
146

147
  public static void main(String[] args) {
148
    logger.info("IoTDB-DataNode environment variables: {}", IoTDBConfig.getEnvironmentVariables());
×
149
    logger.info("IoTDB-DataNode default charset is: {}", Charset.defaultCharset().displayName());
×
150
    new DataNodeServerCommandLine().doMain(args);
×
151
  }
×
152

153
  protected void doAddNode() {
154
    boolean isFirstStart = false;
×
155
    try {
156
      // Check if this DataNode is start for the first time and do other pre-checks
157
      isFirstStart = prepareDataNode();
×
158

159
      // Set target ConfigNodeList from iotdb-datanode.properties file
160
      ConfigNodeInfo.getInstance().updateConfigNodeList(config.getTargetConfigNodeList());
×
161

162
      // Pull and check system configurations from ConfigNode-leader
163
      pullAndCheckSystemConfigurations();
×
164

165
      if (isFirstStart) {
×
166
        // Register this DataNode to the cluster when first start
167
        sendRegisterRequestToConfigNode();
×
168
      } else {
169
        // Send restart request of this DataNode
170
        sendRestartRequestToConfigNode();
×
171
      }
172
      // TierManager need DataNodeId to do some operations so the reset method need to be invoked
173
      // after DataNode adding
174
      TierManager.getInstance().resetFolders();
×
175
      // Active DataNode
176
      active();
×
177

178
      // Setup metric service
179
      setUpMetricService();
×
180

181
      // Setup rpc service
182
      setUpRPCService();
×
183

184
      // Serialize mutable system properties
185
      IoTDBStartCheck.getInstance().serializeMutableSystemPropertiesIfNecessary();
×
186

187
      logger.info("IoTDB configuration: {}", config.getConfigMessage());
×
188
      logger.info("Congratulation, IoTDB DataNode is set up successfully. Now, enjoy yourself!");
×
189

190
    } catch (StartupException | IOException e) {
×
191
      logger.error("Fail to start server", e);
×
192
      if (isFirstStart) {
×
193
        // Delete the system.properties file when first start failed.
194
        // Therefore, the next time this DataNode is start will still be seen as the first time.
195
        SYSTEM_PROPERTIES.deleteOnExit();
×
196
      }
197
      stop();
×
198
    }
×
199
  }
×
200

201
  /** Prepare cluster IoTDB-DataNode */
202
  private boolean prepareDataNode() throws StartupException, IOException {
203
    // Set cluster mode
204
    config.setClusterMode(true);
×
205

206
    // Notice: Consider this DataNode as first start if the system.properties file doesn't exist
207
    boolean isFirstStart = IoTDBStartCheck.getInstance().checkIsFirstStart();
×
208

209
    // Check target ConfigNodes
210
    for (TEndPoint endPoint : config.getTargetConfigNodeList()) {
×
211
      if (endPoint.getIp().equals("0.0.0.0")) {
×
212
        throw new StartupException(
×
213
            "The ip address of any target_config_node_list couldn't be 0.0.0.0");
214
      }
215
    }
×
216

217
    // Set this node
218
    thisNode.setIp(config.getInternalAddress());
×
219
    thisNode.setPort(config.getInternalPort());
×
220

221
    // Startup checks
222
    DataNodeStartupCheck checks = new DataNodeStartupCheck(IoTDBConstant.DN_ROLE, config);
×
223
    checks.startUpCheck();
×
224
    return isFirstStart;
×
225
  }
226

227
  /**
228
   * Pull and check the following system configurations:
229
   *
230
   * <p>1. GlobalConfig
231
   *
232
   * <p>2. RatisConfig
233
   *
234
   * <p>3. CQConfig
235
   *
236
   * @throws StartupException When failed connect to ConfigNode-leader
237
   */
238
  private void pullAndCheckSystemConfigurations() throws StartupException {
239
    logger.info("Pulling system configurations from the ConfigNode-leader...");
×
240

241
    /* Pull system configurations */
242
    int retry = DEFAULT_RETRY;
×
243
    TSystemConfigurationResp configurationResp = null;
×
244
    while (retry > 0) {
×
245
      try (ConfigNodeClient configNodeClient =
246
          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
×
247
        configurationResp = configNodeClient.getSystemConfiguration();
×
248
        break;
249
      } catch (TException | ClientManagerException e) {
×
250
        // Read ConfigNodes from system.properties and retry
251
        logger.warn(
×
252
            "Cannot pull system configurations from ConfigNode-leader, because: {}",
253
            e.getMessage());
×
254
        ConfigNodeInfo.getInstance().loadConfigNodeList();
×
255
        retry--;
×
256
      }
257

258
      try {
259
        // wait to start the next try
260
        Thread.sleep(DEFAULT_RETRY_INTERVAL_IN_MS);
×
261
      } catch (InterruptedException e) {
×
262
        Thread.currentThread().interrupt();
×
263
        logger.warn(REGISTER_INTERRUPTION, e);
×
264
        retry = -1;
×
265
      }
×
266
    }
267
    if (configurationResp == null) {
×
268
      // All tries failed
269
      logger.error(
×
270
          "Cannot pull system configurations from ConfigNode-leader after {} retries",
271
          DEFAULT_RETRY);
×
272
      throw new StartupException("Cannot pull system configurations from ConfigNode-leader");
×
273
    }
274

275
    /* Load system configurations */
276
    IoTDBDescriptor.getInstance().loadGlobalConfig(configurationResp.globalConfig);
×
277
    IoTDBDescriptor.getInstance().loadRatisConfig(configurationResp.ratisConfig);
×
278
    IoTDBDescriptor.getInstance().loadCQConfig(configurationResp.cqConfig);
×
279
    CommonDescriptor.getInstance().loadGlobalConfig(configurationResp.globalConfig);
×
280

281
    /* Set cluster consensus protocol class */
282
    if (!IoTDBStartCheck.getInstance()
×
283
        .checkConsensusProtocolExists(TConsensusGroupType.DataRegion)) {
×
284
      config.setDataRegionConsensusProtocolClass(
×
285
          configurationResp.globalConfig.getDataRegionConsensusProtocolClass());
×
286
    }
287

288
    if (!IoTDBStartCheck.getInstance()
×
289
        .checkConsensusProtocolExists(TConsensusGroupType.SchemaRegion)) {
×
290
      config.setSchemaRegionConsensusProtocolClass(
×
291
          configurationResp.globalConfig.getSchemaRegionConsensusProtocolClass());
×
292
    }
293

294
    /* Check system configurations */
295
    try {
296
      IoTDBStartCheck.getInstance().checkSystemConfig();
×
297
      IoTDBStartCheck.getInstance().checkDirectory();
×
298
      if (!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) {
×
299
        // In current implementation, only IoTConsensus need separated memory from Consensus
300
        IoTDBDescriptor.getInstance().reclaimConsensusMemory();
×
301
      }
302
    } catch (Exception e) {
×
303
      throw new StartupException(e.getMessage());
×
304
    }
×
305

306
    logger.info("Successfully pull system configurations from ConfigNode-leader.");
×
307
  }
×
308

309
  /**
310
   * Store runtime configurations, which includes:
311
   *
312
   * <p>1. All ConfigNodes in cluster
313
   *
314
   * <p>2. All template information
315
   *
316
   * <p>3. All UDF information
317
   *
318
   * <p>4. All trigger information
319
   *
320
   * <p>5. All Pipe information
321
   *
322
   * <p>6. All TTL information
323
   */
324
  private void storeRuntimeConfigurations(
325
      List<TConfigNodeLocation> configNodeLocations, TRuntimeConfiguration runtimeConfiguration) {
326
    /* Store ConfigNodeList */
327
    List<TEndPoint> configNodeList = new ArrayList<>();
×
328
    for (TConfigNodeLocation configNodeLocation : configNodeLocations) {
×
329
      configNodeList.add(configNodeLocation.getInternalEndPoint());
×
330
    }
×
331
    ConfigNodeInfo.getInstance().updateConfigNodeList(configNodeList);
×
332

333
    /* Store templateSetInfo */
334
    ClusterTemplateManager.getInstance()
×
335
        .updateTemplateSetInfo(runtimeConfiguration.getTemplateInfo());
×
336

337
    /* Store udfInformationList */
338
    getUDFInformationList(runtimeConfiguration.getAllUDFInformation());
×
339

340
    /* Store triggerInformationList */
341
    getTriggerInformationList(runtimeConfiguration.getAllTriggerInformation());
×
342

343
    /* Store pipeInformationList */
344
    getPipeInformationList(runtimeConfiguration.getAllPipeInformation());
×
345

346
    /* Store ttl information */
347
    StorageEngine.getInstance().updateTTLInfo(runtimeConfiguration.getAllTTLInformation());
×
348
  }
×
349

350
  /**
351
   * Register this DataNode into cluster.
352
   *
353
   * @throws StartupException if register failed.
354
   * @throws IOException if serialize cluster name and dataNode Id failed.
355
   */
356
  private void sendRegisterRequestToConfigNode() throws StartupException, IOException {
357
    logger.info("Sending register request to ConfigNode-leader...");
×
358

359
    /* Send register request */
360
    int retry = DEFAULT_RETRY;
×
361
    TDataNodeRegisterReq req = new TDataNodeRegisterReq();
×
362
    req.setDataNodeConfiguration(generateDataNodeConfiguration());
×
363
    req.setClusterName(config.getClusterName());
×
364
    req.setBuildInfo(IoTDBConstant.BUILD_INFO);
×
365
    TDataNodeRegisterResp dataNodeRegisterResp = null;
×
366
    while (retry > 0) {
×
367
      try (ConfigNodeClient configNodeClient =
368
          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
×
369
        dataNodeRegisterResp = configNodeClient.registerDataNode(req);
×
370
        break;
371
      } catch (TException | ClientManagerException e) {
×
372
        // Read ConfigNodes from system.properties and retry
373
        logger.warn("Cannot register to the cluster, because: {}", e.getMessage());
×
374
        ConfigNodeInfo.getInstance().loadConfigNodeList();
×
375
        retry--;
×
376
      }
377

378
      try {
379
        // Wait to start the next try
380
        Thread.sleep(DEFAULT_RETRY_INTERVAL_IN_MS);
×
381
      } catch (InterruptedException e) {
×
382
        Thread.currentThread().interrupt();
×
383
        logger.warn(REGISTER_INTERRUPTION, e);
×
384
        retry = -1;
×
385
      }
×
386
    }
387
    if (dataNodeRegisterResp == null) {
×
388
      // All tries failed
389
      logger.error(
×
390
          "Cannot register into cluster after {} retries. "
391
              + "Please check dn_target_config_node_list in iotdb-datanode.properties.",
392
          DEFAULT_RETRY);
×
393
      throw new StartupException("Cannot register into the cluster.");
×
394
    }
395

396
    if (dataNodeRegisterResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
397

398
      /* Store runtime configurations when register success */
399
      int dataNodeID = dataNodeRegisterResp.getDataNodeId();
×
400
      config.setDataNodeId(dataNodeID);
×
401
      IoTDBStartCheck.getInstance()
×
402
          .serializeClusterNameAndDataNodeId(config.getClusterName(), dataNodeID);
×
403

404
      storeRuntimeConfigurations(
×
405
          dataNodeRegisterResp.getConfigNodeList(), dataNodeRegisterResp.getRuntimeConfiguration());
×
406

407
      logger.info("Successfully register to the cluster: {}", config.getClusterName());
×
408
    } else {
×
409
      /* Throw exception when register failed */
410
      logger.error(dataNodeRegisterResp.getStatus().getMessage());
×
411
      throw new StartupException("Cannot register to the cluster.");
×
412
    }
413
  }
×
414

415
  private void sendRestartRequestToConfigNode() throws StartupException {
416
    logger.info("Sending restart request to ConfigNode-leader...");
×
417

418
    /* Send restart request */
419
    int retry = DEFAULT_RETRY;
×
420
    TDataNodeRestartReq req = new TDataNodeRestartReq();
×
421
    req.setClusterName(
×
422
        config.getClusterName() == null ? DEFAULT_CLUSTER_NAME : config.getClusterName());
×
423
    req.setDataNodeConfiguration(generateDataNodeConfiguration());
×
424
    req.setBuildInfo(IoTDBConstant.BUILD_INFO);
×
425
    TDataNodeRestartResp dataNodeRestartResp = null;
×
426
    while (retry > 0) {
×
427
      try (ConfigNodeClient configNodeClient =
428
          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
×
429
        dataNodeRestartResp = configNodeClient.restartDataNode(req);
×
430
        break;
431
      } catch (TException | ClientManagerException e) {
×
432
        // Read ConfigNodes from system.properties and retry
433
        logger.warn(
×
434
            "Cannot send restart request to the ConfigNode-leader, because: {}", e.getMessage());
×
435
        ConfigNodeInfo.getInstance().loadConfigNodeList();
×
436
        retry--;
×
437
      }
438

439
      try {
440
        // wait to start the next try
441
        Thread.sleep(DEFAULT_RETRY_INTERVAL_IN_MS);
×
442
      } catch (InterruptedException e) {
×
443
        Thread.currentThread().interrupt();
×
444
        logger.warn(REGISTER_INTERRUPTION, e);
×
445
        retry = -1;
×
446
      }
×
447
    }
448
    if (dataNodeRestartResp == null) {
×
449
      // All tries failed
450
      logger.error(
×
451
          "Cannot send restart DataNode request to ConfigNode-leader after {} retries. "
452
              + "Please check dn_target_config_node_list in iotdb-datanode.properties.",
453
          DEFAULT_RETRY);
×
454
      throw new StartupException("Cannot send restart DataNode request to ConfigNode-leader.");
×
455
    }
456

457
    if (dataNodeRestartResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
458
      /* Store runtime configurations when restart request is accepted */
459
      storeRuntimeConfigurations(
×
460
          dataNodeRestartResp.getConfigNodeList(), dataNodeRestartResp.getRuntimeConfiguration());
×
461
      logger.info("Restart request to cluster: {} is accepted.", config.getClusterName());
×
462
    } else {
463
      /* Throw exception when restart is rejected */
464
      throw new StartupException(dataNodeRestartResp.getStatus().getMessage());
×
465
    }
466
  }
×
467

468
  private void prepareResources() throws StartupException {
469
    prepareUDFResources();
×
470
    prepareTriggerResources();
×
471
    preparePipeResources();
×
472
  }
×
473

474
  /**
475
   * Register services and set up DataNode.
476
   *
477
   * @throws StartupException if start up failed.
478
   */
479
  private void active() throws StartupException {
480
    try {
481
      processPid();
×
482
      setUp();
×
483
    } catch (StartupException e) {
×
484
      logger.error("Meet error while starting up.", e);
×
485
      throw new StartupException("Error in activating IoTDB DataNode.");
×
486
    }
×
487
    logger.info("IoTDB DataNode has started.");
×
488

489
    try {
490
      SchemaRegionConsensusImpl.setupAndGetInstance().start();
×
491
      DataRegionConsensusImpl.setupAndGetInstance().start();
×
492
    } catch (IOException e) {
×
493
      throw new StartupException(e);
×
494
    }
×
495
  }
×
496

497
  void processPid() {
498
    String pidFile = System.getProperty(IoTDBConstant.IOTDB_PIDFILE);
1✔
499
    if (pidFile != null) {
1✔
500
      new File(pidFile).deleteOnExit();
1✔
501
    }
502
  }
1✔
503

504
  private void setUp() throws StartupException {
505
    logger.info("Setting up IoTDB DataNode...");
×
506
    registerManager.register(new JMXService());
×
507
    JMXService.registerMBean(getInstance(), mbeanName);
×
508

509
    // Get resources for trigger,udf,pipe...
510
    prepareResources();
×
511

512
    Runtime.getRuntime().addShutdownHook(new IoTDBShutdownHook());
×
513
    setUncaughtExceptionHandler();
×
514

515
    logger.info("Recover the schema...");
×
516
    initSchemaEngine();
×
517
    registerManager.register(FlushManager.getInstance());
×
518
    registerManager.register(CacheHitRatioMonitor.getInstance());
×
519

520
    // Close wal when using ratis consensus
521
    if (config.isClusterMode()
×
522
        && config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) {
×
523
      config.setWalMode(WALMode.DISABLE);
×
524
    }
525
    registerManager.register(WALManager.getInstance());
×
526

527
    // In mpp mode we need to start some other services
528
    registerManager.register(StorageEngine.getInstance());
×
529
    registerManager.register(MPPDataExchangeService.getInstance());
×
530
    registerManager.register(DriverScheduler.getInstance());
×
531

532
    registerUdfServices();
×
533

534
    logger.info(
×
535
        "IoTDB DataNode is setting up, some databases may not be ready now, please wait several seconds...");
536

537
    while (!StorageEngine.getInstance().isAllSgReady()) {
×
538
      try {
539
        TimeUnit.MILLISECONDS.sleep(1000);
×
540
      } catch (InterruptedException e) {
×
541
        logger.warn("IoTDB DataNode failed to set up.", e);
×
542
        Thread.currentThread().interrupt();
×
543
        return;
×
544
      }
×
545
    }
546

547
    // Must init after SchemaEngine and StorageEngine prepared well
548
    DataNodeRegionManager.getInstance().init();
×
549

550
    // Start region migrate service
551
    registerManager.register(RegionMigrateService.getInstance());
×
552

553
    registerManager.register(CompactionTaskManager.getInstance());
×
554

555
    registerManager.register(PipeAgent.runtime());
×
556
  }
×
557

558
  /** Set up RPC and protocols after DataNode is available */
559
  private void setUpRPCService() throws StartupException {
560
    // Start InternalRPCService to indicate that the current DataNode can accept cluster scheduling
561
    registerManager.register(DataNodeInternalRPCService.getInstance());
×
562
    // Start InternalRPCService to indicate that the current DataNode can accept request from MLNode
563
    if (config.isEnableMLNodeService()) {
×
564
      registerManager.register(MLNodeRPCService.getInstance());
×
565
    }
566

567
    // Notice: During the period between starting the internal RPC service
568
    // and starting the client RPC service , some requests may fail because
569
    // DataNode is not marked as RUNNING by ConfigNode-leader yet.
570

571
    // Start client RPCService to indicate that the current DataNode provide external services
572
    IoTDBDescriptor.getInstance()
×
573
        .getConfig()
×
574
        .setRpcImplClassName(ClientRPCServiceImpl.class.getName());
×
575
    if (config.isEnableRpcService()) {
×
576
      registerManager.register(RPCService.getInstance());
×
577
    }
578
    // init service protocols
579
    initProtocols();
×
580
  }
×
581

582
  private void setUpMetricService() throws StartupException {
583
    MetricConfigDescriptor.getInstance().getMetricConfig().setNodeId(config.getDataNodeId());
×
584
    registerManager.register(MetricService.getInstance());
×
585

586
    // init metric service
587
    if (MetricConfigDescriptor.getInstance()
×
588
        .getMetricConfig()
×
589
        .getInternalReportType()
×
590
        .equals(InternalReporterType.IOTDB)) {
×
591
      MetricService.getInstance().updateInternalReporter(new IoTDBInternalLocalReporter());
×
592
    }
593
    MetricService.getInstance().startInternalReporter();
×
594
    // bind predefined metrics
595
    DataNodeMetricsHelper.bind();
×
596
  }
×
597

598
  public static TDataNodeLocation generateDataNodeLocation() {
599
    TDataNodeLocation location = new TDataNodeLocation();
×
600
    location.setDataNodeId(config.getDataNodeId());
×
601
    location.setClientRpcEndPoint(new TEndPoint(config.getRpcAddress(), config.getRpcPort()));
×
602
    location.setInternalEndPoint(
×
603
        new TEndPoint(config.getInternalAddress(), config.getInternalPort()));
×
604
    location.setMPPDataExchangeEndPoint(
×
605
        new TEndPoint(config.getInternalAddress(), config.getMppDataExchangePort()));
×
606
    location.setDataRegionConsensusEndPoint(
×
607
        new TEndPoint(config.getInternalAddress(), config.getDataRegionConsensusPort()));
×
608
    location.setSchemaRegionConsensusEndPoint(
×
609
        new TEndPoint(config.getInternalAddress(), config.getSchemaRegionConsensusPort()));
×
610
    return location;
×
611
  }
612

613
  /**
614
   * Generate dataNodeConfiguration.
615
   *
616
   * @return TDataNodeConfiguration
617
   */
618
  private TDataNodeConfiguration generateDataNodeConfiguration() {
619
    // Set DataNodeLocation
620
    TDataNodeLocation location = generateDataNodeLocation();
×
621

622
    // Set NodeResource
623
    TNodeResource resource = new TNodeResource();
×
624
    resource.setCpuCoreNum(Runtime.getRuntime().availableProcessors());
×
625
    resource.setMaxMemory(Runtime.getRuntime().totalMemory());
×
626

627
    return new TDataNodeConfiguration(location, resource);
×
628
  }
629

630
  private void registerUdfServices() throws StartupException {
631
    registerManager.register(TemporaryQueryDataFileService.getInstance());
×
632
    registerManager.register(UDFClassLoaderManager.setupAndGetInstance(config.getUdfDir()));
×
633
  }
×
634

635
  private void initUDFRelatedInstance() throws StartupException {
636
    try {
637
      UDFExecutableManager.setupAndGetInstance(config.getUdfTemporaryLibDir(), config.getUdfDir());
×
638
      UDFClassLoaderManager.setupAndGetInstance(config.getUdfDir());
×
639
    } catch (IOException e) {
×
640
      throw new StartupException(e);
×
641
    }
×
642
  }
×
643

644
  private void prepareUDFResources() throws StartupException {
645
    initUDFRelatedInstance();
×
646
    if (resourcesInformationHolder.getUDFInformationList() == null
×
647
        || resourcesInformationHolder.getUDFInformationList().isEmpty()) {
×
648
      return;
×
649
    }
650

651
    // Get jars from config node
652
    List<UDFInformation> udfNeedJarList = getJarListForUDF();
×
653
    int index = 0;
×
654
    while (index < udfNeedJarList.size()) {
×
655
      List<UDFInformation> curList = new ArrayList<>();
×
656
      int offset = 0;
×
657
      while (offset < ResourcesInformationHolder.getJarNumOfOneRpc()
×
658
          && index + offset < udfNeedJarList.size()) {
×
659
        curList.add(udfNeedJarList.get(index + offset));
×
660
        offset++;
×
661
      }
662
      index += (offset + 1);
×
663
      getJarOfUDFs(curList);
×
664
    }
×
665

666
    // Create instances of udf and do registration
667
    try {
668
      for (UDFInformation udfInformation : resourcesInformationHolder.getUDFInformationList()) {
×
669
        UDFManagementService.getInstance().doRegister(udfInformation);
×
670
      }
×
671
    } catch (Exception e) {
×
672
      throw new StartupException(e);
×
673
    }
×
674

675
    logger.debug("successfully registered all the UDFs");
×
676
    if (logger.isDebugEnabled()) {
×
677
      for (UDFInformation udfInformation :
678
          UDFManagementService.getInstance().getAllUDFInformation()) {
×
679
        logger.debug("get udf: {}", udfInformation.getFunctionName());
×
680
      }
681
    }
682
  }
×
683

684
  private void getJarOfUDFs(List<UDFInformation> udfInformationList) throws StartupException {
685
    try (ConfigNodeClient configNodeClient =
686
        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
×
687
      List<String> jarNameList =
×
688
          udfInformationList.stream().map(UDFInformation::getJarName).collect(Collectors.toList());
×
689
      TGetJarInListResp resp = configNodeClient.getUDFJar(new TGetJarInListReq(jarNameList));
×
690
      if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
×
691
        throw new StartupException("Failed to get UDF jar from config node.");
×
692
      }
693
      List<ByteBuffer> jarList = resp.getJarList();
×
694
      for (int i = 0; i < udfInformationList.size(); i++) {
×
695
        UDFExecutableManager.getInstance()
×
696
            .saveToInstallDir(jarList.get(i), udfInformationList.get(i).getJarName());
×
697
      }
698
    } catch (IOException | TException | ClientManagerException e) {
×
699
      throw new StartupException(e);
×
700
    }
×
701
  }
×
702

703
  /** Generate a list for UDFs that do not have jar on this node. */
704
  private List<UDFInformation> getJarListForUDF() {
705
    List<UDFInformation> res = new ArrayList<>();
×
706
    for (UDFInformation udfInformation : resourcesInformationHolder.getUDFInformationList()) {
×
707
      if (udfInformation.isUsingURI()) {
×
708
        // Jar does not exist, add current udfInformation to list
709
        if (!UDFExecutableManager.getInstance()
×
710
            .hasFileUnderInstallDir(udfInformation.getJarName())) {
×
711
          res.add(udfInformation);
×
712
        } else {
713
          try {
714
            // Local jar has conflicts with jar on config node, add current triggerInformation to
715
            // list
716
            if (UDFManagementService.getInstance().isLocalJarConflicted(udfInformation)) {
×
717
              res.add(udfInformation);
×
718
            }
719
          } catch (UDFManagementException e) {
×
720
            res.add(udfInformation);
×
721
          }
×
722
        }
723
      }
724
    }
×
725
    return res;
×
726
  }
727

728
  private void getUDFInformationList(List<ByteBuffer> allUDFInformation) {
729
    if (allUDFInformation != null && !allUDFInformation.isEmpty()) {
×
730
      List<UDFInformation> list = new ArrayList<>();
×
731
      for (ByteBuffer UDFInformationByteBuffer : allUDFInformation) {
×
732
        list.add(UDFInformation.deserialize(UDFInformationByteBuffer));
×
733
      }
×
734
      resourcesInformationHolder.setUDFInformationList(list);
×
735
    }
736
  }
×
737

738
  private void initTriggerRelatedInstance() throws StartupException {
739
    try {
740
      TriggerExecutableManager.setupAndGetInstance(
×
741
          config.getTriggerTemporaryLibDir(), config.getTriggerDir());
×
742
    } catch (IOException e) {
×
743
      throw new StartupException(e);
×
744
    }
×
745
  }
×
746

747
  private void prepareTriggerResources() throws StartupException {
748
    initTriggerRelatedInstance();
×
749
    if (resourcesInformationHolder.getTriggerInformationList() == null
×
750
        || resourcesInformationHolder.getTriggerInformationList().isEmpty()) {
×
751
      return;
×
752
    }
753

754
    // Get jars from config node
755
    List<TriggerInformation> triggerNeedJarList = getJarListForTrigger();
×
756
    int index = 0;
×
757
    while (index < triggerNeedJarList.size()) {
×
758
      List<TriggerInformation> curList = new ArrayList<>();
×
759
      int offset = 0;
×
760
      while (offset < ResourcesInformationHolder.getJarNumOfOneRpc()
×
761
          && index + offset < triggerNeedJarList.size()) {
×
762
        curList.add(triggerNeedJarList.get(index + offset));
×
763
        offset++;
×
764
      }
765
      index += (offset + 1);
×
766
      getJarOfTriggers(curList);
×
767
    }
×
768

769
    // Create instances of triggers and do registration
770
    try {
771
      for (TriggerInformation triggerInformation :
772
          resourcesInformationHolder.getTriggerInformationList()) {
×
773
        TriggerManagementService.getInstance().doRegister(triggerInformation, true);
×
774
      }
×
775
    } catch (Exception e) {
×
776
      throw new StartupException(e);
×
777
    }
×
778
    logger.debug("successfully registered all the triggers");
×
779
    if (logger.isDebugEnabled()) {
×
780
      for (TriggerInformation triggerInformation :
781
          TriggerManagementService.getInstance().getAllTriggerInformationInTriggerTable()) {
×
782
        logger.debug("get trigger: {}", triggerInformation.getTriggerName());
×
783
      }
×
784
      for (TriggerExecutor triggerExecutor :
785
          TriggerManagementService.getInstance().getAllTriggerExecutors()) {
×
786
        logger.debug(
×
787
            "get trigger executor: {}", triggerExecutor.getTriggerInformation().getTriggerName());
×
788
      }
×
789
    }
790
    // Start TriggerInformationUpdater
791
    triggerInformationUpdater.startTriggerInformationUpdater();
×
792
  }
×
793

794
  private void getJarOfTriggers(List<TriggerInformation> triggerInformationList)
795
      throws StartupException {
796
    try (ConfigNodeClient configNodeClient =
797
        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
×
798
      List<String> jarNameList =
×
799
          triggerInformationList.stream()
×
800
              .map(TriggerInformation::getJarName)
×
801
              .collect(Collectors.toList());
×
802
      TGetJarInListResp resp = configNodeClient.getTriggerJar(new TGetJarInListReq(jarNameList));
×
803
      if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
×
804
        throw new StartupException("Failed to get trigger jar from config node.");
×
805
      }
806
      List<ByteBuffer> jarList = resp.getJarList();
×
807
      for (int i = 0; i < triggerInformationList.size(); i++) {
×
808
        TriggerExecutableManager.getInstance()
×
809
            .saveToInstallDir(jarList.get(i), triggerInformationList.get(i).getJarName());
×
810
      }
811
    } catch (IOException | TException | ClientManagerException e) {
×
812
      throw new StartupException(e);
×
813
    }
×
814
  }
×
815

816
  /** Generate a list for triggers that do not have jar on this node. */
817
  private List<TriggerInformation> getJarListForTrigger() {
818
    List<TriggerInformation> res = new ArrayList<>();
×
819
    for (TriggerInformation triggerInformation :
820
        resourcesInformationHolder.getTriggerInformationList()) {
×
821
      if (triggerInformation.isUsingURI()) {
×
822
        // jar does not exist, add current triggerInformation to list
823
        if (!TriggerExecutableManager.getInstance()
×
824
            .hasFileUnderInstallDir(triggerInformation.getJarName())) {
×
825
          res.add(triggerInformation);
×
826
        } else {
827
          try {
828
            // local jar has conflicts with jar on config node, add current triggerInformation to
829
            // list
830
            if (TriggerManagementService.getInstance().isLocalJarConflicted(triggerInformation)) {
×
831
              res.add(triggerInformation);
×
832
            }
833
          } catch (TriggerManagementException e) {
×
834
            res.add(triggerInformation);
×
835
          }
×
836
        }
837
      }
838
    }
×
839
    return res;
×
840
  }
841

842
  private void getTriggerInformationList(List<ByteBuffer> allTriggerInformation) {
843
    if (allTriggerInformation != null && !allTriggerInformation.isEmpty()) {
×
844
      List<TriggerInformation> list = new ArrayList<>();
×
845
      for (ByteBuffer triggerInformationByteBuffer : allTriggerInformation) {
×
846
        list.add(TriggerInformation.deserialize(triggerInformationByteBuffer));
×
847
      }
×
848
      resourcesInformationHolder.setTriggerInformationList(list);
×
849
    }
850
  }
×
851

852
  private void preparePipeResources() throws StartupException {
853
    PipeAgent.runtime().preparePipeResources(resourcesInformationHolder);
×
854
  }
×
855

856
  private void getPipeInformationList(List<ByteBuffer> allPipeInformation) {
857
    final List<PipePluginMeta> list = new ArrayList<>();
×
858
    if (allPipeInformation != null) {
×
859
      for (ByteBuffer pipeInformationByteBuffer : allPipeInformation) {
×
860
        list.add(PipePluginMeta.deserialize(pipeInformationByteBuffer));
×
861
      }
×
862
    }
863
    resourcesInformationHolder.setPipePluginMetaList(list);
×
864
  }
×
865

866
  private void initSchemaEngine() {
867
    long time = System.currentTimeMillis();
×
868
    SchemaEngine.getInstance().init();
×
869
    long end = System.currentTimeMillis() - time;
×
870
    logger.info("Spent {}ms to recover schema.", end);
×
871
  }
×
872

873
  public void stop() {
874
    deactivate();
×
875

876
    try {
877
      MetricService.getInstance().stop();
×
878
      if (SchemaRegionConsensusImpl.getInstance() != null) {
×
879
        SchemaRegionConsensusImpl.getInstance().stop();
×
880
      }
881
      if (DataRegionConsensusImpl.getInstance() != null) {
×
882
        DataRegionConsensusImpl.getInstance().stop();
×
883
      }
884
    } catch (Exception e) {
×
885
      logger.error("Stop data node error", e);
×
886
    }
×
887
  }
×
888

889
  private void initProtocols() throws StartupException {
890
    if (config.isEnableMQTTService()) {
×
891
      registerManager.register(MQTTService.getInstance());
×
892
    }
893
    if (IoTDBRestServiceDescriptor.getInstance().getConfig().isEnableRestService()) {
×
894
      registerManager.register(RestService.getInstance());
×
895
    }
896
  }
×
897

898
  private void deactivate() {
899
    logger.info("Deactivating IoTDB DataNode...");
×
900
    stopTriggerRelatedServices();
×
901
    registerManager.deregisterAll();
×
902
    JMXService.deregisterMBean(mbeanName);
×
903
    logger.info("IoTDB DataNode is deactivated.");
×
904
  }
×
905

906
  private void stopTriggerRelatedServices() {
907
    triggerInformationUpdater.stopTriggerInformationUpdater();
×
908
  }
×
909

910
  private void setUncaughtExceptionHandler() {
911
    Thread.setDefaultUncaughtExceptionHandler(new IoTDBDefaultThreadExceptionHandler());
×
912
  }
×
913

914
  private static class DataNodeHolder {
915

916
    private static final DataNode INSTANCE = new DataNode();
1✔
917

918
    private DataNodeHolder() {
919
      // Empty constructor
920
    }
921
  }
922
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc