• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9905

23 Aug 2023 06:20AM UTC coverage: 47.785% (-0.1%) from 47.922%
#9905

push

travis_ci

web-flow
[To rel/1.2][Metric] Fix flush point statistics (#10934)

23 of 23 new or added lines in 1 file covered. (100.0%)

79851 of 167106 relevant lines covered (47.78%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

4.51
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.service;
21

22
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
23
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
24
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
25
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
26
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
27
import org.apache.iotdb.common.rpc.thrift.TNodeResource;
28
import org.apache.iotdb.commons.client.exception.ClientManagerException;
29
import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
30
import org.apache.iotdb.commons.conf.CommonDescriptor;
31
import org.apache.iotdb.commons.conf.IoTDBConstant;
32
import org.apache.iotdb.commons.exception.StartupException;
33
import org.apache.iotdb.commons.file.SystemFileFactory;
34
import org.apache.iotdb.commons.pipe.config.PipeConfig;
35
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
36
import org.apache.iotdb.commons.service.JMXService;
37
import org.apache.iotdb.commons.service.RegisterManager;
38
import org.apache.iotdb.commons.service.metric.MetricService;
39
import org.apache.iotdb.commons.trigger.TriggerInformation;
40
import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
41
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
42
import org.apache.iotdb.commons.udf.UDFInformation;
43
import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
44
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
45
import org.apache.iotdb.commons.udf.service.UDFManagementService;
46
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
47
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
48
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
49
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
50
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
51
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
52
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
53
import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
54
import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
55
import org.apache.iotdb.consensus.ConsensusFactory;
56
import org.apache.iotdb.db.conf.DataNodeStartupCheck;
57
import org.apache.iotdb.db.conf.IoTDBConfig;
58
import org.apache.iotdb.db.conf.IoTDBDescriptor;
59
import org.apache.iotdb.db.conf.IoTDBStartCheck;
60
import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
61
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
62
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
63
import org.apache.iotdb.db.pipe.agent.PipeAgent;
64
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
65
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
66
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
67
import org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl;
68
import org.apache.iotdb.db.protocol.thrift.impl.DataNodeRegionManager;
69
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeService;
70
import org.apache.iotdb.db.queryengine.execution.schedule.DriverScheduler;
71
import org.apache.iotdb.db.schemaengine.SchemaEngine;
72
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
73
import org.apache.iotdb.db.service.metrics.DataNodeMetricsHelper;
74
import org.apache.iotdb.db.service.metrics.IoTDBInternalLocalReporter;
75
import org.apache.iotdb.db.storageengine.StorageEngine;
76
import org.apache.iotdb.db.storageengine.buffer.CacheHitRatioMonitor;
77
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
78
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushManager;
79
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
80
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
81
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
82
import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
83
import org.apache.iotdb.db.trigger.service.TriggerInformationUpdater;
84
import org.apache.iotdb.db.trigger.service.TriggerManagementService;
85
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
86
import org.apache.iotdb.metrics.utils.InternalReporterType;
87
import org.apache.iotdb.rpc.TSStatusCode;
88
import org.apache.iotdb.udf.api.exception.UDFManagementException;
89

90
import org.apache.thrift.TException;
91
import org.slf4j.Logger;
92
import org.slf4j.LoggerFactory;
93

94
import java.io.File;
95
import java.io.IOException;
96
import java.nio.ByteBuffer;
97
import java.nio.charset.Charset;
98
import java.util.ArrayList;
99
import java.util.List;
100
import java.util.concurrent.TimeUnit;
101
import java.util.stream.Collectors;
102

103
import static org.apache.iotdb.commons.conf.IoTDBConstant.DEFAULT_CLUSTER_NAME;
104

105
public class DataNode implements DataNodeMBean {
106

107
  private static final Logger logger = LoggerFactory.getLogger(DataNode.class);
1✔
108
  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
1✔
109

110
  private final String mbeanName =
1✔
111
      String.format(
1✔
112
          "%s:%s=%s", "org.apache.iotdb.datanode.service", IoTDBConstant.JMX_TYPE, "DataNode");
113

114
  private static final File SYSTEM_PROPERTIES =
1✔
115
      SystemFileFactory.INSTANCE.getFile(
1✔
116
          config.getSchemaDir() + File.separator + IoTDBStartCheck.PROPERTIES_FILE_NAME);
1✔
117

118
  /**
119
   * When joining a cluster or getting configuration this node will retry at most "DEFAULT_RETRY"
120
   * times before returning a failure to the client.
121
   */
122
  private static final int DEFAULT_RETRY = 10;
123

124
  private static final long DEFAULT_RETRY_INTERVAL_IN_MS = config.getJoinClusterRetryIntervalMs();
1✔
125

126
  private final TEndPoint thisNode = new TEndPoint();
1✔
127

128
  /** Hold the information of trigger, udf...... */
129
  private final ResourcesInformationHolder resourcesInformationHolder =
1✔
130
      new ResourcesInformationHolder();
131

132
  /** Responsible for keeping trigger information up to date. */
133
  private final TriggerInformationUpdater triggerInformationUpdater =
1✔
134
      new TriggerInformationUpdater();
135

136
  private static final String REGISTER_INTERRUPTION =
137
      "Unexpected interruption when waiting to register to the cluster";
138

139
  private DataNode() {
1✔
140
    // We do not init anything here, so that we can re-initialize the instance in IT.
141
  }
1✔
142

143
  private static final RegisterManager registerManager = new RegisterManager();
1✔
144

145
  public static DataNode getInstance() {
146
    return DataNodeHolder.INSTANCE;
1✔
147
  }
148

149
  public static void main(String[] args) {
150
    logger.info("IoTDB-DataNode environment variables: {}", IoTDBConfig.getEnvironmentVariables());
×
151
    logger.info("IoTDB-DataNode default charset is: {}", Charset.defaultCharset().displayName());
×
152
    new DataNodeServerCommandLine().doMain(args);
×
153
  }
×
154

155
  protected void doAddNode() {
156
    boolean isFirstStart = false;
×
157
    try {
158
      // Check if this DataNode is start for the first time and do other pre-checks
159
      isFirstStart = prepareDataNode();
×
160

161
      // Set target ConfigNodeList from iotdb-datanode.properties file
162
      ConfigNodeInfo.getInstance().updateConfigNodeList(config.getTargetConfigNodeList());
×
163

164
      // Pull and check system configurations from ConfigNode-leader
165
      pullAndCheckSystemConfigurations();
×
166

167
      if (isFirstStart) {
×
168
        // Register this DataNode to the cluster when first start
169
        sendRegisterRequestToConfigNode();
×
170
      } else {
171
        // Send restart request of this DataNode
172
        sendRestartRequestToConfigNode();
×
173
      }
174
      // TierManager need DataNodeId to do some operations so the reset method need to be invoked
175
      // after DataNode adding
176
      TierManager.getInstance().resetFolders();
×
177
      // Active DataNode
178
      active();
×
179

180
      // Setup metric service
181
      setUpMetricService();
×
182

183
      // Setup rpc service
184
      setUpRPCService();
×
185

186
      // Serialize mutable system properties
187
      IoTDBStartCheck.getInstance().serializeMutableSystemPropertiesIfNecessary();
×
188

189
      logger.info("IoTDB configuration: {}", config.getConfigMessage());
×
190
      logger.info("Congratulation, IoTDB DataNode is set up successfully. Now, enjoy yourself!");
×
191

192
    } catch (StartupException | IOException e) {
×
193
      logger.error("Fail to start server", e);
×
194
      if (isFirstStart) {
×
195
        // Delete the system.properties file when first start failed.
196
        // Therefore, the next time this DataNode is start will still be seen as the first time.
197
        SYSTEM_PROPERTIES.deleteOnExit();
×
198
      }
199
      stop();
×
200
    }
×
201
  }
×
202

203
  /** Prepare cluster IoTDB-DataNode */
204
  private boolean prepareDataNode() throws StartupException, IOException {
205
    // Set cluster mode
206
    config.setClusterMode(true);
×
207

208
    // Notice: Consider this DataNode as first start if the system.properties file doesn't exist
209
    boolean isFirstStart = IoTDBStartCheck.getInstance().checkIsFirstStart();
×
210

211
    // Set this node
212
    thisNode.setIp(config.getInternalAddress());
×
213
    thisNode.setPort(config.getInternalPort());
×
214

215
    // Startup checks
216
    DataNodeStartupCheck checks = new DataNodeStartupCheck(IoTDBConstant.DN_ROLE, config);
×
217
    checks.startUpCheck();
×
218
    return isFirstStart;
×
219
  }
220

221
  /**
222
   * Pull and check the following system configurations:
223
   *
224
   * <p>1. GlobalConfig
225
   *
226
   * <p>2. RatisConfig
227
   *
228
   * <p>3. CQConfig
229
   *
230
   * @throws StartupException When failed connect to ConfigNode-leader
231
   */
232
  private void pullAndCheckSystemConfigurations() throws StartupException {
233
    logger.info("Pulling system configurations from the ConfigNode-leader...");
×
234

235
    /* Pull system configurations */
236
    int retry = DEFAULT_RETRY;
×
237
    TSystemConfigurationResp configurationResp = null;
×
238
    while (retry > 0) {
×
239
      try (ConfigNodeClient configNodeClient =
240
          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
×
241
        configurationResp = configNodeClient.getSystemConfiguration();
×
242
        break;
243
      } catch (TException | ClientManagerException e) {
×
244
        // Read ConfigNodes from system.properties and retry
245
        logger.warn(
×
246
            "Cannot pull system configurations from ConfigNode-leader, because: {}",
247
            e.getMessage());
×
248
        ConfigNodeInfo.getInstance().loadConfigNodeList();
×
249
        retry--;
×
250
      }
251

252
      try {
253
        // wait to start the next try
254
        Thread.sleep(DEFAULT_RETRY_INTERVAL_IN_MS);
×
255
      } catch (InterruptedException e) {
×
256
        Thread.currentThread().interrupt();
×
257
        logger.warn(REGISTER_INTERRUPTION, e);
×
258
        retry = -1;
×
259
      }
×
260
    }
261
    if (configurationResp == null) {
×
262
      // All tries failed
263
      logger.error(
×
264
          "Cannot pull system configurations from ConfigNode-leader after {} retries",
265
          DEFAULT_RETRY);
×
266
      throw new StartupException("Cannot pull system configurations from ConfigNode-leader");
×
267
    }
268

269
    /* Load system configurations */
270
    IoTDBDescriptor.getInstance().loadGlobalConfig(configurationResp.globalConfig);
×
271
    IoTDBDescriptor.getInstance().loadRatisConfig(configurationResp.ratisConfig);
×
272
    IoTDBDescriptor.getInstance().loadCQConfig(configurationResp.cqConfig);
×
273
    CommonDescriptor.getInstance().loadGlobalConfig(configurationResp.globalConfig);
×
274

275
    /* Set cluster consensus protocol class */
276
    if (!IoTDBStartCheck.getInstance()
×
277
        .checkConsensusProtocolExists(TConsensusGroupType.DataRegion)) {
×
278
      config.setDataRegionConsensusProtocolClass(
×
279
          configurationResp.globalConfig.getDataRegionConsensusProtocolClass());
×
280
    }
281

282
    if (!IoTDBStartCheck.getInstance()
×
283
        .checkConsensusProtocolExists(TConsensusGroupType.SchemaRegion)) {
×
284
      config.setSchemaRegionConsensusProtocolClass(
×
285
          configurationResp.globalConfig.getSchemaRegionConsensusProtocolClass());
×
286
    }
287

288
    /* Check system configurations */
289
    try {
290
      IoTDBStartCheck.getInstance().checkSystemConfig();
×
291
      IoTDBStartCheck.getInstance().checkDirectory();
×
292
      if (!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) {
×
293
        // In current implementation, only IoTConsensus need separated memory from Consensus
294
        IoTDBDescriptor.getInstance().reclaimConsensusMemory();
×
295
      }
296
    } catch (Exception e) {
×
297
      throw new StartupException(e.getMessage());
×
298
    }
×
299

300
    logger.info("Successfully pull system configurations from ConfigNode-leader.");
×
301
  }
×
302

303
  /**
304
   * Store runtime configurations, which includes:
305
   *
306
   * <p>1. All ConfigNodes in cluster
307
   *
308
   * <p>2. All template information
309
   *
310
   * <p>3. All UDF information
311
   *
312
   * <p>4. All trigger information
313
   *
314
   * <p>5. All Pipe information
315
   *
316
   * <p>6. All TTL information
317
   */
318
  private void storeRuntimeConfigurations(
319
      List<TConfigNodeLocation> configNodeLocations, TRuntimeConfiguration runtimeConfiguration) {
320
    /* Store ConfigNodeList */
321
    List<TEndPoint> configNodeList = new ArrayList<>();
×
322
    for (TConfigNodeLocation configNodeLocation : configNodeLocations) {
×
323
      configNodeList.add(configNodeLocation.getInternalEndPoint());
×
324
    }
×
325
    ConfigNodeInfo.getInstance().updateConfigNodeList(configNodeList);
×
326

327
    /* Store templateSetInfo */
328
    ClusterTemplateManager.getInstance()
×
329
        .updateTemplateSetInfo(runtimeConfiguration.getTemplateInfo());
×
330

331
    /* Store udfInformationList */
332
    getUDFInformationList(runtimeConfiguration.getAllUDFInformation());
×
333

334
    /* Store triggerInformationList */
335
    getTriggerInformationList(runtimeConfiguration.getAllTriggerInformation());
×
336

337
    /* Store pipeInformationList */
338
    getPipeInformationList(runtimeConfiguration.getAllPipeInformation());
×
339

340
    /* Store ttl information */
341
    StorageEngine.getInstance().updateTTLInfo(runtimeConfiguration.getAllTTLInformation());
×
342
  }
×
343

344
  /**
345
   * Register this DataNode into cluster.
346
   *
347
   * @throws StartupException if register failed.
348
   * @throws IOException if serialize cluster name and dataNode Id failed.
349
   */
350
  private void sendRegisterRequestToConfigNode() throws StartupException, IOException {
351
    logger.info("Sending register request to ConfigNode-leader...");
×
352

353
    /* Send register request */
354
    int retry = DEFAULT_RETRY;
×
355
    TDataNodeRegisterReq req = new TDataNodeRegisterReq();
×
356
    req.setDataNodeConfiguration(generateDataNodeConfiguration());
×
357
    req.setClusterName(config.getClusterName());
×
358
    req.setVersionInfo(new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO));
×
359
    TDataNodeRegisterResp dataNodeRegisterResp = null;
×
360
    while (retry > 0) {
×
361
      try (ConfigNodeClient configNodeClient =
362
          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
×
363
        dataNodeRegisterResp = configNodeClient.registerDataNode(req);
×
364
        break;
365
      } catch (TException | ClientManagerException e) {
×
366
        // Read ConfigNodes from system.properties and retry
367
        logger.warn("Cannot register to the cluster, because: {}", e.getMessage());
×
368
        ConfigNodeInfo.getInstance().loadConfigNodeList();
×
369
        retry--;
×
370
      }
371

372
      try {
373
        // Wait to start the next try
374
        Thread.sleep(DEFAULT_RETRY_INTERVAL_IN_MS);
×
375
      } catch (InterruptedException e) {
×
376
        Thread.currentThread().interrupt();
×
377
        logger.warn(REGISTER_INTERRUPTION, e);
×
378
        retry = -1;
×
379
      }
×
380
    }
381
    if (dataNodeRegisterResp == null) {
×
382
      // All tries failed
383
      logger.error(
×
384
          "Cannot register into cluster after {} retries. "
385
              + "Please check dn_target_config_node_list in iotdb-datanode.properties.",
386
          DEFAULT_RETRY);
×
387
      throw new StartupException("Cannot register into the cluster.");
×
388
    }
389

390
    if (dataNodeRegisterResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
391

392
      /* Store runtime configurations when register success */
393
      int dataNodeID = dataNodeRegisterResp.getDataNodeId();
×
394
      config.setDataNodeId(dataNodeID);
×
395
      IoTDBStartCheck.getInstance()
×
396
          .serializeClusterNameAndDataNodeId(config.getClusterName(), dataNodeID);
×
397

398
      storeRuntimeConfigurations(
×
399
          dataNodeRegisterResp.getConfigNodeList(), dataNodeRegisterResp.getRuntimeConfiguration());
×
400

401
      logger.info("Successfully register to the cluster: {}", config.getClusterName());
×
402
    } else {
×
403
      /* Throw exception when register failed */
404
      logger.error(dataNodeRegisterResp.getStatus().getMessage());
×
405
      throw new StartupException("Cannot register to the cluster.");
×
406
    }
407
  }
×
408

409
  private void sendRestartRequestToConfigNode() throws StartupException {
410
    logger.info("Sending restart request to ConfigNode-leader...");
×
411

412
    /* Send restart request */
413
    int retry = DEFAULT_RETRY;
×
414
    TDataNodeRestartReq req = new TDataNodeRestartReq();
×
415
    req.setClusterName(
×
416
        config.getClusterName() == null ? DEFAULT_CLUSTER_NAME : config.getClusterName());
×
417
    req.setDataNodeConfiguration(generateDataNodeConfiguration());
×
418
    req.setVersionInfo(new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO));
×
419
    TDataNodeRestartResp dataNodeRestartResp = null;
×
420
    while (retry > 0) {
×
421
      try (ConfigNodeClient configNodeClient =
422
          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
×
423
        dataNodeRestartResp = configNodeClient.restartDataNode(req);
×
424
        break;
425
      } catch (TException | ClientManagerException e) {
×
426
        // Read ConfigNodes from system.properties and retry
427
        logger.warn(
×
428
            "Cannot send restart request to the ConfigNode-leader, because: {}", e.getMessage());
×
429
        ConfigNodeInfo.getInstance().loadConfigNodeList();
×
430
        retry--;
×
431
      }
432

433
      try {
434
        // wait to start the next try
435
        Thread.sleep(DEFAULT_RETRY_INTERVAL_IN_MS);
×
436
      } catch (InterruptedException e) {
×
437
        Thread.currentThread().interrupt();
×
438
        logger.warn(REGISTER_INTERRUPTION, e);
×
439
        retry = -1;
×
440
      }
×
441
    }
442
    if (dataNodeRestartResp == null) {
×
443
      // All tries failed
444
      logger.error(
×
445
          "Cannot send restart DataNode request to ConfigNode-leader after {} retries. "
446
              + "Please check dn_target_config_node_list in iotdb-datanode.properties.",
447
          DEFAULT_RETRY);
×
448
      throw new StartupException("Cannot send restart DataNode request to ConfigNode-leader.");
×
449
    }
450

451
    if (dataNodeRestartResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
×
452
      /* Store runtime configurations when restart request is accepted */
453
      storeRuntimeConfigurations(
×
454
          dataNodeRestartResp.getConfigNodeList(), dataNodeRestartResp.getRuntimeConfiguration());
×
455
      logger.info("Restart request to cluster: {} is accepted.", config.getClusterName());
×
456
    } else {
457
      /* Throw exception when restart is rejected */
458
      throw new StartupException(dataNodeRestartResp.getStatus().getMessage());
×
459
    }
460
  }
×
461

462
  private void prepareResources() throws StartupException {
463
    prepareUDFResources();
×
464
    prepareTriggerResources();
×
465
    preparePipeResources();
×
466
  }
×
467

468
  /**
469
   * Register services and set up DataNode.
470
   *
471
   * @throws StartupException if start up failed.
472
   */
473
  private void active() throws StartupException {
474
    try {
475
      processPid();
×
476
      setUp();
×
477
    } catch (StartupException e) {
×
478
      logger.error("Meet error while starting up.", e);
×
479
      throw new StartupException("Error in activating IoTDB DataNode.");
×
480
    }
×
481
    logger.info("IoTDB DataNode has started.");
×
482

483
    try {
484
      SchemaRegionConsensusImpl.getInstance().start();
×
485
      DataRegionConsensusImpl.getInstance().start();
×
486
    } catch (IOException e) {
×
487
      throw new StartupException(e);
×
488
    }
×
489
  }
×
490

491
  void processPid() {
492
    String pidFile = System.getProperty(IoTDBConstant.IOTDB_PIDFILE);
1✔
493
    if (pidFile != null) {
1✔
494
      new File(pidFile).deleteOnExit();
1✔
495
    }
496
  }
1✔
497

498
  private void setUp() throws StartupException {
499
    logger.info("Setting up IoTDB DataNode...");
×
500
    registerManager.register(new JMXService());
×
501
    JMXService.registerMBean(getInstance(), mbeanName);
×
502

503
    // Get resources for trigger,udf,pipe...
504
    prepareResources();
×
505

506
    Runtime.getRuntime().addShutdownHook(new IoTDBShutdownHook());
×
507
    setUncaughtExceptionHandler();
×
508

509
    logger.info("Recover the schema...");
×
510
    initSchemaEngine();
×
511
    registerManager.register(FlushManager.getInstance());
×
512
    registerManager.register(CacheHitRatioMonitor.getInstance());
×
513

514
    // Close wal when using ratis consensus
515
    if (config.isClusterMode()
×
516
        && config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) {
×
517
      config.setWalMode(WALMode.DISABLE);
×
518
    }
519
    registerManager.register(WALManager.getInstance());
×
520

521
    // In mpp mode we need to start some other services
522
    registerManager.register(StorageEngine.getInstance());
×
523
    registerManager.register(MPPDataExchangeService.getInstance());
×
524
    registerManager.register(DriverScheduler.getInstance());
×
525

526
    registerUdfServices();
×
527

528
    logger.info(
×
529
        "IoTDB DataNode is setting up, some databases may not be ready now, please wait several seconds...");
530

531
    while (!StorageEngine.getInstance().isAllSgReady()) {
×
532
      try {
533
        TimeUnit.MILLISECONDS.sleep(1000);
×
534
      } catch (InterruptedException e) {
×
535
        logger.warn("IoTDB DataNode failed to set up.", e);
×
536
        Thread.currentThread().interrupt();
×
537
        return;
×
538
      }
×
539
    }
540

541
    // Must init after SchemaEngine and StorageEngine prepared well
542
    DataNodeRegionManager.getInstance().init();
×
543

544
    // Start region migrate service
545
    registerManager.register(RegionMigrateService.getInstance());
×
546

547
    registerManager.register(CompactionTaskManager.getInstance());
×
548

549
    registerManager.register(PipeAgent.runtime());
×
550
  }
×
551

552
  /** Set up RPC and protocols after DataNode is available */
553
  private void setUpRPCService() throws StartupException {
554
    // Start InternalRPCService to indicate that the current DataNode can accept cluster scheduling
555
    registerManager.register(DataNodeInternalRPCService.getInstance());
×
556
    // Start InternalRPCService to indicate that the current DataNode can accept request from MLNode
557
    if (config.isEnableMLNodeService()) {
×
558
      registerManager.register(MLNodeRPCService.getInstance());
×
559
    }
560

561
    // Notice: During the period between starting the internal RPC service
562
    // and starting the client RPC service , some requests may fail because
563
    // DataNode is not marked as RUNNING by ConfigNode-leader yet.
564

565
    // Start client RPCService to indicate that the current DataNode provide external services
566
    IoTDBDescriptor.getInstance()
×
567
        .getConfig()
×
568
        .setRpcImplClassName(ClientRPCServiceImpl.class.getName());
×
569
    if (config.isEnableRpcService()) {
×
570
      registerManager.register(RPCService.getInstance());
×
571
    }
572
    // init service protocols
573
    initProtocols();
×
574
  }
×
575

576
  private void setUpMetricService() throws StartupException {
577
    MetricConfigDescriptor.getInstance().getMetricConfig().setNodeId(config.getDataNodeId());
×
578
    registerManager.register(MetricService.getInstance());
×
579

580
    // init metric service
581
    if (MetricConfigDescriptor.getInstance()
×
582
        .getMetricConfig()
×
583
        .getInternalReportType()
×
584
        .equals(InternalReporterType.IOTDB)) {
×
585
      MetricService.getInstance().updateInternalReporter(new IoTDBInternalLocalReporter());
×
586
    }
587
    MetricService.getInstance().startInternalReporter();
×
588
    // bind predefined metrics
589
    DataNodeMetricsHelper.bind();
×
590
  }
×
591

592
  public static TDataNodeLocation generateDataNodeLocation() {
593
    TDataNodeLocation location = new TDataNodeLocation();
×
594
    location.setDataNodeId(config.getDataNodeId());
×
595
    location.setClientRpcEndPoint(new TEndPoint(config.getRpcAddress(), config.getRpcPort()));
×
596
    location.setInternalEndPoint(
×
597
        new TEndPoint(config.getInternalAddress(), config.getInternalPort()));
×
598
    location.setMPPDataExchangeEndPoint(
×
599
        new TEndPoint(config.getInternalAddress(), config.getMppDataExchangePort()));
×
600
    location.setDataRegionConsensusEndPoint(
×
601
        new TEndPoint(config.getInternalAddress(), config.getDataRegionConsensusPort()));
×
602
    location.setSchemaRegionConsensusEndPoint(
×
603
        new TEndPoint(config.getInternalAddress(), config.getSchemaRegionConsensusPort()));
×
604
    return location;
×
605
  }
606

607
  /**
608
   * Generate dataNodeConfiguration.
609
   *
610
   * @return TDataNodeConfiguration
611
   */
612
  private TDataNodeConfiguration generateDataNodeConfiguration() {
613
    // Set DataNodeLocation
614
    TDataNodeLocation location = generateDataNodeLocation();
×
615

616
    // Set NodeResource
617
    TNodeResource resource = new TNodeResource();
×
618
    resource.setCpuCoreNum(Runtime.getRuntime().availableProcessors());
×
619
    resource.setMaxMemory(Runtime.getRuntime().totalMemory());
×
620

621
    return new TDataNodeConfiguration(location, resource);
×
622
  }
623

624
  private void registerUdfServices() throws StartupException {
625
    registerManager.register(TemporaryQueryDataFileService.getInstance());
×
626
    registerManager.register(UDFClassLoaderManager.setupAndGetInstance(config.getUdfDir()));
×
627
  }
×
628

629
  private void initUDFRelatedInstance() throws StartupException {
630
    try {
631
      UDFExecutableManager.setupAndGetInstance(config.getUdfTemporaryLibDir(), config.getUdfDir());
×
632
      UDFClassLoaderManager.setupAndGetInstance(config.getUdfDir());
×
633
    } catch (IOException e) {
×
634
      throw new StartupException(e);
×
635
    }
×
636
  }
×
637

638
  private void prepareUDFResources() throws StartupException {
639
    initUDFRelatedInstance();
×
640
    if (resourcesInformationHolder.getUDFInformationList() == null
×
641
        || resourcesInformationHolder.getUDFInformationList().isEmpty()) {
×
642
      return;
×
643
    }
644

645
    // Get jars from config node
646
    List<UDFInformation> udfNeedJarList = getJarListForUDF();
×
647
    int index = 0;
×
648
    while (index < udfNeedJarList.size()) {
×
649
      List<UDFInformation> curList = new ArrayList<>();
×
650
      int offset = 0;
×
651
      while (offset < ResourcesInformationHolder.getJarNumOfOneRpc()
×
652
          && index + offset < udfNeedJarList.size()) {
×
653
        curList.add(udfNeedJarList.get(index + offset));
×
654
        offset++;
×
655
      }
656
      index += (offset + 1);
×
657
      getJarOfUDFs(curList);
×
658
    }
×
659

660
    // Create instances of udf and do registration
661
    try {
662
      for (UDFInformation udfInformation : resourcesInformationHolder.getUDFInformationList()) {
×
663
        UDFManagementService.getInstance().doRegister(udfInformation);
×
664
      }
×
665
    } catch (Exception e) {
×
666
      throw new StartupException(e);
×
667
    }
×
668

669
    logger.debug("successfully registered all the UDFs");
×
670
    if (logger.isDebugEnabled()) {
×
671
      for (UDFInformation udfInformation :
672
          UDFManagementService.getInstance().getAllUDFInformation()) {
×
673
        logger.debug("get udf: {}", udfInformation.getFunctionName());
×
674
      }
675
    }
676
  }
×
677

678
  private void getJarOfUDFs(List<UDFInformation> udfInformationList) throws StartupException {
679
    try (ConfigNodeClient configNodeClient =
680
        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
×
681
      List<String> jarNameList =
×
682
          udfInformationList.stream().map(UDFInformation::getJarName).collect(Collectors.toList());
×
683
      TGetJarInListResp resp = configNodeClient.getUDFJar(new TGetJarInListReq(jarNameList));
×
684
      if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
×
685
        throw new StartupException("Failed to get UDF jar from config node.");
×
686
      }
687
      List<ByteBuffer> jarList = resp.getJarList();
×
688
      for (int i = 0; i < udfInformationList.size(); i++) {
×
689
        UDFExecutableManager.getInstance()
×
690
            .saveToInstallDir(jarList.get(i), udfInformationList.get(i).getJarName());
×
691
      }
692
    } catch (IOException | TException | ClientManagerException e) {
×
693
      throw new StartupException(e);
×
694
    }
×
695
  }
×
696

697
  /** Generate a list for UDFs that do not have jar on this node. */
698
  private List<UDFInformation> getJarListForUDF() {
699
    List<UDFInformation> res = new ArrayList<>();
×
700
    for (UDFInformation udfInformation : resourcesInformationHolder.getUDFInformationList()) {
×
701
      if (udfInformation.isUsingURI()) {
×
702
        // Jar does not exist, add current udfInformation to list
703
        if (!UDFExecutableManager.getInstance()
×
704
            .hasFileUnderInstallDir(udfInformation.getJarName())) {
×
705
          res.add(udfInformation);
×
706
        } else {
707
          try {
708
            // Local jar has conflicts with jar on config node, add current triggerInformation to
709
            // list
710
            if (UDFManagementService.getInstance().isLocalJarConflicted(udfInformation)) {
×
711
              res.add(udfInformation);
×
712
            }
713
          } catch (UDFManagementException e) {
×
714
            res.add(udfInformation);
×
715
          }
×
716
        }
717
      }
718
    }
×
719
    return res;
×
720
  }
721

722
  private void getUDFInformationList(List<ByteBuffer> allUDFInformation) {
723
    if (allUDFInformation != null && !allUDFInformation.isEmpty()) {
×
724
      List<UDFInformation> list = new ArrayList<>();
×
725
      for (ByteBuffer UDFInformationByteBuffer : allUDFInformation) {
×
726
        list.add(UDFInformation.deserialize(UDFInformationByteBuffer));
×
727
      }
×
728
      resourcesInformationHolder.setUDFInformationList(list);
×
729
    }
730
  }
×
731

732
  private void initTriggerRelatedInstance() throws StartupException {
733
    try {
734
      TriggerExecutableManager.setupAndGetInstance(
×
735
          config.getTriggerTemporaryLibDir(), config.getTriggerDir());
×
736
    } catch (IOException e) {
×
737
      throw new StartupException(e);
×
738
    }
×
739
  }
×
740

741
  private void prepareTriggerResources() throws StartupException {
742
    initTriggerRelatedInstance();
×
743
    if (resourcesInformationHolder.getTriggerInformationList() == null
×
744
        || resourcesInformationHolder.getTriggerInformationList().isEmpty()) {
×
745
      return;
×
746
    }
747

748
    // Get jars from config node
749
    List<TriggerInformation> triggerNeedJarList = getJarListForTrigger();
×
750
    int index = 0;
×
751
    while (index < triggerNeedJarList.size()) {
×
752
      List<TriggerInformation> curList = new ArrayList<>();
×
753
      int offset = 0;
×
754
      while (offset < ResourcesInformationHolder.getJarNumOfOneRpc()
×
755
          && index + offset < triggerNeedJarList.size()) {
×
756
        curList.add(triggerNeedJarList.get(index + offset));
×
757
        offset++;
×
758
      }
759
      index += (offset + 1);
×
760
      getJarOfTriggers(curList);
×
761
    }
×
762

763
    // Create instances of triggers and do registration
764
    try {
765
      for (TriggerInformation triggerInformation :
766
          resourcesInformationHolder.getTriggerInformationList()) {
×
767
        TriggerManagementService.getInstance().doRegister(triggerInformation, true);
×
768
      }
×
769
    } catch (Exception e) {
×
770
      throw new StartupException(e);
×
771
    }
×
772
    logger.debug("successfully registered all the triggers");
×
773
    if (logger.isDebugEnabled()) {
×
774
      for (TriggerInformation triggerInformation :
775
          TriggerManagementService.getInstance().getAllTriggerInformationInTriggerTable()) {
×
776
        logger.debug("get trigger: {}", triggerInformation.getTriggerName());
×
777
      }
×
778
      for (TriggerExecutor triggerExecutor :
779
          TriggerManagementService.getInstance().getAllTriggerExecutors()) {
×
780
        logger.debug(
×
781
            "get trigger executor: {}", triggerExecutor.getTriggerInformation().getTriggerName());
×
782
      }
×
783
    }
784
    // Start TriggerInformationUpdater
785
    triggerInformationUpdater.startTriggerInformationUpdater();
×
786
  }
×
787

788
  private void getJarOfTriggers(List<TriggerInformation> triggerInformationList)
789
      throws StartupException {
790
    try (ConfigNodeClient configNodeClient =
791
        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
×
792
      List<String> jarNameList =
×
793
          triggerInformationList.stream()
×
794
              .map(TriggerInformation::getJarName)
×
795
              .collect(Collectors.toList());
×
796
      TGetJarInListResp resp = configNodeClient.getTriggerJar(new TGetJarInListReq(jarNameList));
×
797
      if (resp.getStatus().getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) {
×
798
        throw new StartupException("Failed to get trigger jar from config node.");
×
799
      }
800
      List<ByteBuffer> jarList = resp.getJarList();
×
801
      for (int i = 0; i < triggerInformationList.size(); i++) {
×
802
        TriggerExecutableManager.getInstance()
×
803
            .saveToInstallDir(jarList.get(i), triggerInformationList.get(i).getJarName());
×
804
      }
805
    } catch (IOException | TException | ClientManagerException e) {
×
806
      throw new StartupException(e);
×
807
    }
×
808
  }
×
809

810
  /** Generate a list for triggers that do not have jar on this node. */
811
  private List<TriggerInformation> getJarListForTrigger() {
812
    List<TriggerInformation> res = new ArrayList<>();
×
813
    for (TriggerInformation triggerInformation :
814
        resourcesInformationHolder.getTriggerInformationList()) {
×
815
      if (triggerInformation.isUsingURI()) {
×
816
        // jar does not exist, add current triggerInformation to list
817
        if (!TriggerExecutableManager.getInstance()
×
818
            .hasFileUnderInstallDir(triggerInformation.getJarName())) {
×
819
          res.add(triggerInformation);
×
820
        } else {
821
          try {
822
            // local jar has conflicts with jar on config node, add current triggerInformation to
823
            // list
824
            if (TriggerManagementService.getInstance().isLocalJarConflicted(triggerInformation)) {
×
825
              res.add(triggerInformation);
×
826
            }
827
          } catch (TriggerManagementException e) {
×
828
            res.add(triggerInformation);
×
829
          }
×
830
        }
831
      }
832
    }
×
833
    return res;
×
834
  }
835

836
  private void getTriggerInformationList(List<ByteBuffer> allTriggerInformation) {
837
    if (allTriggerInformation != null && !allTriggerInformation.isEmpty()) {
×
838
      List<TriggerInformation> list = new ArrayList<>();
×
839
      for (ByteBuffer triggerInformationByteBuffer : allTriggerInformation) {
×
840
        list.add(TriggerInformation.deserialize(triggerInformationByteBuffer));
×
841
      }
×
842
      resourcesInformationHolder.setTriggerInformationList(list);
×
843
    }
844
  }
×
845

846
  private void preparePipeResources() throws StartupException {
847
    PipeAgent.runtime().preparePipeResources(resourcesInformationHolder);
×
848
  }
×
849

850
  private void getPipeInformationList(List<ByteBuffer> allPipeInformation) {
851
    final List<PipePluginMeta> list = new ArrayList<>();
×
852
    if (allPipeInformation != null) {
×
853
      for (ByteBuffer pipeInformationByteBuffer : allPipeInformation) {
×
854
        list.add(PipePluginMeta.deserialize(pipeInformationByteBuffer));
×
855
      }
×
856
    }
857
    resourcesInformationHolder.setPipePluginMetaList(list);
×
858
  }
×
859

860
  private void initSchemaEngine() {
861
    long time = System.currentTimeMillis();
×
862
    SchemaEngine.getInstance().init();
×
863
    long end = System.currentTimeMillis() - time;
×
864
    logger.info("Spent {}ms to recover schema.", end);
×
865
  }
×
866

867
  public void stop() {
868
    deactivate();
×
869

870
    try {
871
      MetricService.getInstance().stop();
×
872
      SchemaRegionConsensusImpl.getInstance().stop();
×
873
      DataRegionConsensusImpl.getInstance().stop();
×
874
    } catch (Exception e) {
×
875
      logger.error("Stop data node error", e);
×
876
    }
×
877
  }
×
878

879
  private void initProtocols() throws StartupException {
880
    if (config.isEnableMQTTService()) {
×
881
      registerManager.register(MQTTService.getInstance());
×
882
    }
883
    if (IoTDBRestServiceDescriptor.getInstance().getConfig().isEnableRestService()) {
×
884
      registerManager.register(RestService.getInstance());
×
885
    }
886
    if (PipeConfig.getInstance().getPipeAirGapReceiverEnabled()) {
×
887
      registerManager.register(PipeAgent.receiver().airGap());
×
888
    }
889
  }
×
890

891
  private void deactivate() {
892
    logger.info("Deactivating IoTDB DataNode...");
×
893
    stopTriggerRelatedServices();
×
894
    registerManager.deregisterAll();
×
895
    JMXService.deregisterMBean(mbeanName);
×
896
    logger.info("IoTDB DataNode is deactivated.");
×
897
  }
×
898

899
  private void stopTriggerRelatedServices() {
900
    triggerInformationUpdater.stopTriggerInformationUpdater();
×
901
  }
×
902

903
  private void setUncaughtExceptionHandler() {
904
    Thread.setDefaultUncaughtExceptionHandler(new IoTDBDefaultThreadExceptionHandler());
×
905
  }
×
906

907
  private static class DataNodeHolder {
908

909
    private static final DataNode INSTANCE = new DataNode();
1✔
910

911
    private DataNodeHolder() {
912
      // Empty constructor
913
    }
914
  }
915
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc