apache / iotdb / #9890

22 Aug 2023 09:07AM UTC · coverage: 47.922% (-0.07%) from 47.992%
Build #9890 · push · travis_ci · web-flow
[IOTDB-6114] Pipe: Support multi-cluster data sync (#10868)(#10926)

306 of 306 new or added lines in 33 files covered (100.0%)
79,862 of 166,649 relevant lines covered (47.92%)
0.48 hits per line

Source File (31.69% of lines covered):
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iotdb.db.storageengine;

import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.ShutdownException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.statemachine.dataregion.DataExecutionVisitor;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
import org.apache.iotdb.db.service.metrics.WritingMetrics;
import org.apache.iotdb.db.storageengine.buffer.BloomFilterCache;
import org.apache.iotdb.db.storageengine.buffer.ChunkCache;
import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.flush.CloseFileListener;
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushListener;
import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy.DirectFlushPolicy;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALException;
import org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.ThreadUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;

public class StorageEngine implements IService {
  private static final Logger logger = LoggerFactory.getLogger(StorageEngine.class);

  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
  private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
  private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance();

  /** Time range for dividing a database; the time unit is the same as IoTDB's TimestampPrecision. */
  private static long timePartitionInterval = -1;

  /**
   * A folder (system/databases/ by default) that persists system info. Each database will have a
   * subfolder under the systemDir.
   */
  private final String systemDir =
      FilePathUtils.regularizePath(config.getSystemDir()) + "databases";

  /** DataRegionId -> DataRegion */
  private final ConcurrentHashMap<DataRegionId, DataRegion> dataRegionMap =
      new ConcurrentHashMap<>();

  /** DataRegionId -> DataRegion which is being deleted */
  private final ConcurrentHashMap<DataRegionId, DataRegion> deletingDataRegionMap =
      new ConcurrentHashMap<>();

  /** Database name -> TTL, for region recovery only */
  private final Map<String, Long> ttlMapForRecover = new ConcurrentHashMap<>();

  /** Number of ready data regions */
  private AtomicInteger readyDataRegionNum;

  private AtomicBoolean isAllSgReady = new AtomicBoolean(false);

  private ScheduledExecutorService ttlCheckThread;
  private ScheduledExecutorService seqMemtableTimedFlushCheckThread;
  private ScheduledExecutorService unseqMemtableTimedFlushCheckThread;

  private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
  /** Used to run short-lived asynchronous tasks */
  private ExecutorService cachedThreadPool;
  // add customized listeners here for flush and close events
  private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
  private List<FlushListener> customFlushListeners = new ArrayList<>();
  private int recoverDataRegionNum = 0;

  private LoadTsFileManager loadTsFileManager;

  private StorageEngine() {}

  public static StorageEngine getInstance() {
    return InstanceHolder.INSTANCE;
  }

  private static void initTimePartition() {
    timePartitionInterval = CommonDescriptor.getInstance().getConfig().getTimePartitionInterval();
  }

  public static long getTimePartitionInterval() {
    if (timePartitionInterval == -1) {
      initTimePartition();
    }
    return timePartitionInterval;
  }

  public static long getTimePartition(long time) {
    if (timePartitionInterval == -1) {
      initTimePartition();
    }
    return time / timePartitionInterval;
  }

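  // Worked example: with millisecond timestamp precision and a partition interval
  // of 604_800_000 ms (one week; an illustrative value, not read from this build's
  // config), getTimePartition(1_000_000_000_000L) returns
  // 1_000_000_000_000 / 604_800_000 = 1653, i.e. that timestamp lands in time
  // partition 1653.
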
  /** block insertion if the insertion is rejected by memory control */
  public static void blockInsertionIfReject(TsFileProcessor tsFileProcessor)
      throws WriteProcessRejectException {
    long startTime = System.currentTimeMillis();
    while (SystemInfo.getInstance().isRejected()) {
      if (tsFileProcessor != null && tsFileProcessor.shouldFlush()) {
        break;
      }
      try {
        TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
        if (System.currentTimeMillis() - startTime > config.getMaxWaitingTimeWhenInsertBlocked()) {
          throw new WriteProcessRejectException(
              "System rejected over " + (System.currentTimeMillis() - startTime) + "ms");
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

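  // The TTL buffer parsed below is a length-prefixed map: an int entry count,
  // then (database name, TTL) pairs serialized as a string and a long via
  // ReadWriteIOUtils.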
  public void updateTTLInfo(byte[] allTTLInformation) {
    if (allTTLInformation == null) {
      return;
    }
    ByteBuffer buffer = ByteBuffer.wrap(allTTLInformation);
    int mapSize = ReadWriteIOUtils.readInt(buffer);
    for (int i = 0; i < mapSize; i++) {
      ttlMapForRecover.put(
          Objects.requireNonNull(ReadWriteIOUtils.readString(buffer)),
          ReadWriteIOUtils.readLong(buffer));
    }
  }

  public boolean isAllSgReady() {
    return isAllSgReady.get();
  }

  public void setAllSgReady(boolean allSgReady) {
    isAllSgReady.set(allSgReady);
  }

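  // Recovery flow: rebuild every local DataRegion on the cached thread pool
  // (asyncRecover), replay the WAL unless the cluster runs Ratis consensus for
  // data regions, then let a background trigger thread mark all regions ready
  // once every recovery future has completed.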
  public void recover() {
    setAllSgReady(false);
    cachedThreadPool =
        IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.STORAGE_ENGINE_CACHED_POOL.getName());

    List<Future<Void>> futures = new LinkedList<>();
    asyncRecover(futures);

    // wait until wal is recovered
    if (!config.isClusterMode()
        || !config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) {
      try {
        WALRecoverManager.getInstance().recover();
      } catch (WALException e) {
        logger.error("Fail to recover wal.", e);
      }
    }

    // operations after all data regions are recovered
    Thread recoverEndTrigger =
        new Thread(
            () -> {
              checkResults(futures, "StorageEngine failed to recover.");
              setAllSgReady(true);
              ttlMapForRecover.clear();
            },
            ThreadName.STORAGE_ENGINE_RECOVER_TRIGGER.getName());
    recoverEndTrigger.start();
  }

  private void asyncRecover(List<Future<Void>> futures) {
    Map<String, List<DataRegionId>> localDataRegionInfo = getLocalDataRegionInfo();
    localDataRegionInfo.values().forEach(list -> recoverDataRegionNum += list.size());
    readyDataRegionNum = new AtomicInteger(0);
    // init wal recover manager
    WALRecoverManager.getInstance()
        .setAllDataRegionScannedLatch(new CountDownLatch(recoverDataRegionNum));
    for (Map.Entry<String, List<DataRegionId>> entry : localDataRegionInfo.entrySet()) {
      String sgName = entry.getKey();
      for (DataRegionId dataRegionId : entry.getValue()) {
        Callable<Void> recoverDataRegionTask =
            () -> {
              DataRegion dataRegion = null;
              try {
                dataRegion =
                    buildNewDataRegion(
                        sgName,
                        dataRegionId,
                        ttlMapForRecover.getOrDefault(sgName, Long.MAX_VALUE));
              } catch (DataRegionException e) {
                logger.error(
                    "Failed to recover data region {}[{}]", sgName, dataRegionId.getId(), e);
              }
              dataRegionMap.put(dataRegionId, dataRegion);
              logger.info(
                  "Data regions have been recovered {}/{}",
                  readyDataRegionNum.incrementAndGet(),
                  recoverDataRegionNum);
              return null;
            };
        futures.add(cachedThreadPool.submit(recoverDataRegionTask));
      }
    }
  }

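  // Example of the layout scanned below (illustrative): systemDir/root.sg1/0 and
  // systemDir/root.sg1/1 produce the map {"root.sg1" -> [DataRegionId(0), DataRegionId(1)]}.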
  /** get StorageGroup -> DataRegionIdList map from data/system directory. */
  public Map<String, List<DataRegionId>> getLocalDataRegionInfo() {
    File system = SystemFileFactory.INSTANCE.getFile(systemDir);
    File[] sgDirs = system.listFiles();
    Map<String, List<DataRegionId>> localDataRegionInfo = new HashMap<>();
    if (sgDirs == null) {
      return localDataRegionInfo;
    }
    for (File sgDir : sgDirs) {
      if (!sgDir.isDirectory()) {
        continue;
      }
      String sgName = sgDir.getName();
      List<DataRegionId> dataRegionIdList = new ArrayList<>();
      for (File dataRegionDir : sgDir.listFiles()) {
        if (!dataRegionDir.isDirectory()) {
          continue;
        }
        dataRegionIdList.add(new DataRegionId(Integer.parseInt(dataRegionDir.getName())));
      }
      localDataRegionInfo.put(sgName, dataRegionIdList);
    }
    return localDataRegionInfo;
  }

  @Override
  public void start() {
    // build time Interval to divide time partition
    initTimePartition();
    // create systemDir
    try {
      FileUtils.forceMkdir(SystemFileFactory.INSTANCE.getFile(systemDir));
    } catch (IOException e) {
      throw new StorageEngineFailureException(e);
    }

    recover();

    ttlCheckThread =
        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(ThreadName.TTL_CHECK.getName());
    ScheduledExecutorUtil.safelyScheduleAtFixedRate(
        ttlCheckThread,
        this::checkTTL,
        TTL_CHECK_INTERVAL,
        TTL_CHECK_INTERVAL,
        TimeUnit.MILLISECONDS);
    logger.info("start ttl check thread successfully.");

    startTimedService();
  }

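  // checkTTL runs every TTL_CHECK_INTERVAL (60 s) and asks each live region to
  // evict files whose data has outlived its TTL; a ConcurrentModificationException
  // is tolerated because regions may be added or removed while the map is scanned.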
  private void checkTTL() {
    try {
      for (DataRegion dataRegion : dataRegionMap.values()) {
        if (dataRegion != null) {
          dataRegion.checkFilesTTL();
        }
      }
    } catch (ConcurrentModificationException e) {
      // ignore
    } catch (Exception e) {
      logger.error("An error occurred when checking TTL", e);
    }
  }

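  // Both timed-flush checkers below are optional single-thread schedulers, each
  // gated by its own config switch (sequence vs. unsequence memtables) and run at
  // the interval the corresponding config getter returns.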
  private void startTimedService() {
    // timed flush sequence memtable
    if (config.isEnableTimedFlushSeqMemtable()) {
      seqMemtableTimedFlushCheckThread =
          IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
              ThreadName.TIMED_FLUSH_SEQ_MEMTABLE.getName());
      ScheduledExecutorUtil.safelyScheduleAtFixedRate(
          seqMemtableTimedFlushCheckThread,
          this::timedFlushSeqMemTable,
          config.getSeqMemtableFlushCheckInterval(),
          config.getSeqMemtableFlushCheckInterval(),
          TimeUnit.MILLISECONDS);
      logger.info("start sequence memtable timed flush check thread successfully.");
    }
    // timed flush unsequence memtable
    if (config.isEnableTimedFlushUnseqMemtable()) {
      unseqMemtableTimedFlushCheckThread =
          IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
              ThreadName.TIMED_FLUSH_UNSEQ_MEMTABLE.getName());
      ScheduledExecutorUtil.safelyScheduleAtFixedRate(
          unseqMemtableTimedFlushCheckThread,
          this::timedFlushUnseqMemTable,
          config.getUnseqMemtableFlushCheckInterval(),
          config.getUnseqMemtableFlushCheckInterval(),
          TimeUnit.MILLISECONDS);
      logger.info("start unsequence memtable timed flush check thread successfully.");
    }
  }

  private void timedFlushSeqMemTable() {
    for (DataRegion dataRegion : dataRegionMap.values()) {
      if (dataRegion != null) {
        dataRegion.timedFlushSeqMemTable();
      }
    }
  }

  private void timedFlushUnseqMemTable() {
    for (DataRegion dataRegion : dataRegionMap.values()) {
      if (dataRegion != null) {
        dataRegion.timedFlushUnseqMemTable();
      }
    }
  }

  @Override
  public void stop() {
    for (DataRegion dataRegion : dataRegionMap.values()) {
      if (dataRegion != null) {
        ThreadUtils.stopThreadPool(
            dataRegion.getTimedCompactionScheduleTask(), ThreadName.COMPACTION_SCHEDULE);
      }
    }
    syncCloseAllProcessor();
    ThreadUtils.stopThreadPool(ttlCheckThread, ThreadName.TTL_CHECK);
    ThreadUtils.stopThreadPool(
        seqMemtableTimedFlushCheckThread, ThreadName.TIMED_FLUSH_SEQ_MEMTABLE);
    ThreadUtils.stopThreadPool(
        unseqMemtableTimedFlushCheckThread, ThreadName.TIMED_FLUSH_UNSEQ_MEMTABLE);
    if (cachedThreadPool != null) {
      cachedThreadPool.shutdownNow();
    }
    dataRegionMap.clear();
  }

  @Override
  public void shutdown(long milliseconds) throws ShutdownException {
    try {
      for (DataRegion dataRegion : dataRegionMap.values()) {
        ThreadUtils.stopThreadPool(
            dataRegion.getTimedCompactionScheduleTask(), ThreadName.COMPACTION_SCHEDULE);
      }
      forceCloseAllProcessor();
    } catch (TsFileProcessorException e) {
      throw new ShutdownException(e);
    }
    shutdownTimedService(ttlCheckThread, "TTlCheckThread");
    shutdownTimedService(seqMemtableTimedFlushCheckThread, "SeqMemtableTimedFlushCheckThread");
    shutdownTimedService(unseqMemtableTimedFlushCheckThread, "UnseqMemtableTimedFlushCheckThread");
    cachedThreadPool.shutdownNow();
    dataRegionMap.clear();
  }

  private void shutdownTimedService(ScheduledExecutorService pool, String poolName) {
    if (pool != null) {
      pool.shutdownNow();
      try {
        pool.awaitTermination(30, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        logger.warn("{} still doesn't exit after 30s", poolName);
        Thread.currentThread().interrupt();
      }
    }
  }

  @Override
  public ServiceType getID() {
    return ServiceType.STORAGE_ENGINE_SERVICE;
  }

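  // buildNewDataRegion wires a fresh DataRegion to the shared flush policy,
  // registers its memtable/memory metrics, applies the TTL with a time-precision
  // check, and attaches any custom flush/close listeners registered so far.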
  /**
   * build a new data region
   *
   * @param dataRegionId data region id e.g. 1
   * @param logicalStorageGroupName database name e.g. root.sg1
   */
  public DataRegion buildNewDataRegion(
      String logicalStorageGroupName, DataRegionId dataRegionId, long ttl)
      throws DataRegionException {
    DataRegion dataRegion;
    logger.info(
        "construct a data region instance, the database is {}, Thread is {}",
        logicalStorageGroupName,
        Thread.currentThread().getId());
    dataRegion =
        new DataRegion(
            systemDir + File.separator + logicalStorageGroupName,
            String.valueOf(dataRegionId.getId()),
            fileFlushPolicy,
            logicalStorageGroupName);
    WRITING_METRICS.createFlushingMemTableStatusMetrics(dataRegionId);
    WRITING_METRICS.createDataRegionMemoryCostMetrics(dataRegion);
    dataRegion.setDataTTLWithTimePrecisionCheck(ttl);
    dataRegion.setCustomFlushListeners(customFlushListeners);
    dataRegion.setCustomCloseFileListeners(customCloseFileListeners);
    return dataRegion;
  }

  /** Write data into DataRegion. For standalone mode only. */
  public TSStatus write(DataRegionId groupId, PlanNode planNode) {
    return planNode.accept(new DataExecutionVisitor(), dataRegionMap.get(groupId));
  }

  /** This function is just for unit test. */
  @TestOnly
  public synchronized void reset() {
    dataRegionMap.clear();
  }

  /** Serve the flush command: sync-close all working TsFile processors. */
  public void syncCloseAllProcessor() {
    logger.info("Start closing all database processor");
    List<Future<Void>> tasks = new ArrayList<>();
    for (DataRegion dataRegion : dataRegionMap.values()) {
      if (dataRegion != null) {
        tasks.add(
            cachedThreadPool.submit(
                () -> {
                  dataRegion.syncCloseAllWorkingTsFileProcessors();
                  return null;
                }));
      }
    }
    checkResults(tasks, "Failed to sync close processor.");
  }

  public void forceCloseAllProcessor() throws TsFileProcessorException {
    logger.info("Start force closing all database processor");
    List<Future<Void>> tasks = new ArrayList<>();
    for (DataRegion dataRegion : dataRegionMap.values()) {
      if (dataRegion != null) {
        tasks.add(
            cachedThreadPool.submit(
                () -> {
                  dataRegion.forceCloseAllWorkingTsFileProcessors();
                  return null;
                }));
      }
    }
    checkResults(tasks, "Failed to force close processor.");
  }

  public void closeStorageGroupProcessor(String storageGroupPath, boolean isSeq) {
    List<Future<Void>> tasks = new ArrayList<>();
    for (DataRegion dataRegion : dataRegionMap.values()) {
      if (dataRegion.getDatabaseName().equals(storageGroupPath)) {
        if (isSeq) {
          for (TsFileProcessor tsFileProcessor : dataRegion.getWorkSequenceTsFileProcessors()) {
            tasks.add(
                cachedThreadPool.submit(
                    () -> {
                      dataRegion.syncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
                      return null;
                    }));
          }
        } else {
          for (TsFileProcessor tsFileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
            tasks.add(
                cachedThreadPool.submit(
                    () -> {
                      dataRegion.syncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
                      return null;
                    }));
          }
        }
      }
    }
    checkResults(tasks, "Failed to close database processor.");
  }

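  // checkResults blocks on every submitted future and converts the first failure
  // into a StorageEngineFailureException, restoring the interrupt flag if needed.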
  private <V> void checkResults(List<Future<V>> tasks, String errorMsg) {
    for (Future<V> task : tasks) {
      try {
        task.get();
      } catch (ExecutionException e) {
        throw new StorageEngineFailureException(errorMsg, e);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new StorageEngineFailureException(errorMsg, e);
      }
    }
  }

  /**
   * merge all databases.
   *
   * @throws StorageEngineException if the current system mode is read-only
   */
  public void mergeAll() throws StorageEngineException {
    if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
      throw new StorageEngineException("Current system mode is read only, does not support merge");
    }
    dataRegionMap.values().forEach(DataRegion::compact);
  }

  public void operateFlush(TFlushReq req) {
    if (req.storageGroups == null) {
      StorageEngine.getInstance().syncCloseAllProcessor();
      WALManager.getInstance().deleteOutdatedWALFiles();
    } else {
      for (String storageGroup : req.storageGroups) {
        if (req.isSeq == null) {
          StorageEngine.getInstance().closeStorageGroupProcessor(storageGroup, true);
          StorageEngine.getInstance().closeStorageGroupProcessor(storageGroup, false);
        } else {
          StorageEngine.getInstance()
              .closeStorageGroupProcessor(storageGroup, Boolean.parseBoolean(req.isSeq));
        }
      }
    }
  }

  public void clearCache() {
    ChunkCache.getInstance().clear();
    TimeSeriesMetadataCache.getInstance().clear();
    BloomFilterCache.getInstance().clear();
  }

  public void setTTL(List<DataRegionId> dataRegionIdList, long dataTTL) {
    for (DataRegionId dataRegionId : dataRegionIdList) {
      DataRegion dataRegion = dataRegionMap.get(dataRegionId);
      if (dataRegion != null) {
        dataRegion.setDataTTLWithTimePrecisionCheck(dataTTL);
      }
    }
  }

  /**
   * Add a listener to listen flush start/end events. Notice that this addition only applies to
   * TsFileProcessors created afterwards.
   *
   * @param listener the flush listener to register
   */
  public void registerFlushListener(FlushListener listener) {
    customFlushListeners.add(listener);
  }

  /**
   * Add a listener to listen file close events. Notice that this addition only applies to
   * TsFileProcessors created afterwards.
   *
   * @param listener the close-file listener to register
   */
  public void registerCloseFileListener(CloseFileListener listener) {
    customCloseFileListeners.add(listener);
  }

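  // Region lifecycle: createDataRegion first waits out any in-flight deletion of
  // the same id (makeSureNoOldRegion), then builds and registers the region
  // atomically via computeIfAbsent so concurrent creators observe one instance.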
  private void makeSureNoOldRegion(DataRegionId regionId) {
    while (deletingDataRegionMap.containsKey(regionId)) {
      DataRegion oldRegion = deletingDataRegionMap.get(regionId);
      if (oldRegion != null) {
        oldRegion.waitForDeleted();
      }
    }
  }

  // When registering a new region, the coordinator needs to register the corresponding region with
  // the local storage engine before adding the corresponding consensusGroup to the consensus layer
  public DataRegion createDataRegion(DataRegionId regionId, String sg, long ttl)
      throws DataRegionException {
    makeSureNoOldRegion(regionId);
    AtomicReference<DataRegionException> exceptionAtomicReference = new AtomicReference<>(null);
    DataRegion dataRegion =
        dataRegionMap.computeIfAbsent(
            regionId,
            x -> {
              try {
                return buildNewDataRegion(sg, x, ttl);
              } catch (DataRegionException e) {
                exceptionAtomicReference.set(e);
              }
              return null;
            });
    if (exceptionAtomicReference.get() != null) {
      throw exceptionAtomicReference.get();
    }
    return dataRegion;
  }

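  // Deletion walks the reverse path: the region moves to deletingDataRegionMap,
  // is marked deleted, has compaction aborted and data files removed, and (under
  // IoTConsensus in cluster mode) its WAL node and snapshot directories are purged.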
  public void deleteDataRegion(DataRegionId regionId) {
    if (!dataRegionMap.containsKey(regionId) || deletingDataRegionMap.containsKey(regionId)) {
      return;
    }
    DataRegion region =
        deletingDataRegionMap.computeIfAbsent(regionId, k -> dataRegionMap.remove(regionId));
    if (region != null) {
      region.markDeleted();
      WRITING_METRICS.removeDataRegionMemoryCostMetrics(regionId);
      WRITING_METRICS.removeFlushingMemTableStatusMetrics(regionId);
      try {
        region.abortCompaction();
        region.syncDeleteDataFiles();
        region.deleteFolder(systemDir);
        if (config.isClusterMode()
            && config
                .getDataRegionConsensusProtocolClass()
                .equals(ConsensusFactory.IOT_CONSENSUS)) {
          // delete wal
          WALManager.getInstance()
              .deleteWALNode(
                  region.getDatabaseName() + FILE_NAME_SEPARATOR + region.getDataRegionId());
          // delete snapshot
          for (String dataDir : config.getLocalDataDirs()) {
            File regionSnapshotDir =
                new File(
                    dataDir + File.separator + IoTDBConstant.SNAPSHOT_FOLDER_NAME,
                    region.getDatabaseName() + FILE_NAME_SEPARATOR + regionId.getId());
            if (regionSnapshotDir.exists()) {
              try {
                FileUtils.deleteDirectory(regionSnapshotDir);
              } catch (IOException e) {
                logger.error("Failed to delete snapshot dir {}", regionSnapshotDir, e);
              }
            }
          }
        }
      } catch (Exception e) {
        logger.error(
            "Error occurs when deleting data region {}-{}",
            region.getDatabaseName(),
            region.getDataRegionId(),
            e);
      } finally {
        deletingDataRegionMap.remove(regionId);
      }
    }
  }

  /**
   * Run the runnable if the region is absent. If the region is present, do nothing.
   *
   * <p>We don't use computeIfAbsent because we don't want to create a new region if the region is
   * absent; we just want to run the runnable in a synchronized way.
   *
   * @return true if the region is absent and the runnable is run; false if the region is present.
   */
  public boolean runIfAbsent(DataRegionId regionId, Runnable runnable) {
    final AtomicBoolean result = new AtomicBoolean(false);
    dataRegionMap.computeIfAbsent(
        regionId,
        k -> {
          runnable.run();
          result.set(true);
          return null;
        });
    return result.get();
  }

  /**
   * Run the consumer if the region is present. If the region is absent, do nothing.
   *
   * <p>We don't use computeIfPresent because we don't want to remove the region if the consumer
   * returns null; we just want to run the consumer in a synchronized way.
   *
   * @return true if the region is present and the consumer is run; false if the region is absent.
   */
  public boolean runIfPresent(DataRegionId regionId, Consumer<DataRegion> consumer) {
    final AtomicBoolean result = new AtomicBoolean(false);
    dataRegionMap.computeIfPresent(
        regionId,
        (id, region) -> {
          consumer.accept(region);
          result.set(true);
          return region;
        });
    return result.get();
  }

  public DataRegion getDataRegion(DataRegionId regionId) {
    return dataRegionMap.get(regionId);
  }

  public List<DataRegion> getAllDataRegions() {
    return new ArrayList<>(dataRegionMap.values());
  }

  public List<DataRegionId> getAllDataRegionIds() {
    return new ArrayList<>(dataRegionMap.keySet());
  }

  /** This method is not thread-safe */
  public void setDataRegion(DataRegionId regionId, DataRegion newRegion) {
    if (dataRegionMap.containsKey(regionId)) {
      DataRegion oldRegion = dataRegionMap.get(regionId);
      oldRegion.syncCloseAllWorkingTsFileProcessors();
      oldRegion.abortCompaction();
    }
    dataRegionMap.put(regionId, newRegion);
  }

  public TSStatus setTTL(TSetTTLReq req) {
    Map<String, List<DataRegionId>> localDataRegionInfo =
        StorageEngine.getInstance().getLocalDataRegionInfo();
    List<DataRegionId> dataRegionIdList = new ArrayList<>();
    req.storageGroupPathPattern.forEach(
        storageGroup -> dataRegionIdList.addAll(localDataRegionInfo.get(storageGroup)));
    for (DataRegionId dataRegionId : dataRegionIdList) {
      DataRegion dataRegion = dataRegionMap.get(dataRegionId);
      if (dataRegion != null) {
        dataRegion.setDataTTLWithTimePrecisionCheck(req.TTL);
      }
    }
    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
  }

  public TsFileFlushPolicy getFileFlushPolicy() {
    return fileFlushPolicy;
  }

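  // Two-phase TsFile load: writeLoadTsFileNode streams each LoadTsFilePieceNode
  // into the target region under a uuid; executeLoadCommand then either commits
  // (EXECUTE -> loadAll) or aborts (ROLLBACK -> deleteAll) everything recorded
  // under that uuid.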
  public TSStatus writeLoadTsFileNode(
      DataRegionId dataRegionId, LoadTsFilePieceNode pieceNode, String uuid) {
    TSStatus status = new TSStatus();

    try {
      getLoadTsFileManager().writeToDataRegion(getDataRegion(dataRegionId), pieceNode, uuid);
    } catch (IOException e) {
      logger.error(
          String.format(
              "IO error when writing piece node of TsFile %s to DataRegion %s.",
              pieceNode.getTsFile(), dataRegionId),
          e);
      status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
      status.setMessage(e.getMessage());
      return status;
    }

    return RpcUtils.SUCCESS_STATUS;
  }

  public TSStatus executeLoadCommand(
      LoadTsFileScheduler.LoadCommand loadCommand, String uuid, boolean isGeneratedByPipe) {
    TSStatus status = new TSStatus();

    try {
      switch (loadCommand) {
        case EXECUTE:
          if (getLoadTsFileManager().loadAll(uuid, isGeneratedByPipe)) {
            status = RpcUtils.SUCCESS_STATUS;
          } else {
            status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
            status.setMessage(
                String.format(
                    "No load TsFile uuid %s recorded for execute load command %s.",
                    uuid, loadCommand));
          }
          break;
        case ROLLBACK:
          if (getLoadTsFileManager().deleteAll(uuid)) {
            status = RpcUtils.SUCCESS_STATUS;
          } else {
            status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
            status.setMessage(
                String.format(
                    "No load TsFile uuid %s recorded for execute load command %s.",
                    uuid, loadCommand));
          }
          break;
        default:
          status.setCode(TSStatusCode.ILLEGAL_PARAMETER.getStatusCode());
          status.setMessage(String.format("Wrong load command %s.", loadCommand));
      }
    } catch (IOException | LoadFileException e) {
      logger.error(String.format("Execute load command %s error.", loadCommand), e);
      status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
      status.setMessage(e.getMessage());
    }

    return status;
  }

  /** Reboot the timed flush sequence/unsequence memtable threads. */
  public void rebootTimedService() throws ShutdownException {
    logger.info("Start rebooting all timed service.");

    // exclude ttl check thread
    stopTimedServiceAndThrow(seqMemtableTimedFlushCheckThread, "SeqMemtableTimedFlushCheckThread");
    stopTimedServiceAndThrow(
        unseqMemtableTimedFlushCheckThread, "UnseqMemtableTimedFlushCheckThread");

    logger.info("Stop all timed service successfully, and now restart them.");

    startTimedService();

    logger.info("Reboot all timed service successfully");
  }

  private void stopTimedServiceAndThrow(ScheduledExecutorService pool, String poolName)
      throws ShutdownException {
    if (pool != null) {
      pool.shutdownNow();
      try {
        pool.awaitTermination(30, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        logger.warn("{} still doesn't exit after 30s", poolName);
        throw new ShutdownException(e);
      }
    }
  }

  public void getDiskSizeByDataRegion(
      Map<Integer, Long> dataRegionDisk, List<Integer> dataRegionIds) {
    dataRegionMap.forEach(
        (dataRegionId, dataRegion) -> {
          if (dataRegionIds.contains(dataRegionId.getId())) {
            dataRegionDisk.put(dataRegionId.getId(), dataRegion.countRegionDiskSize());
          }
        });
  }

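  // loadTsFileManager is lazily initialized with double-checked locking on
  // LoadTsFileManager.class, so the manager is only created on the first load request.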
  private LoadTsFileManager getLoadTsFileManager() {
    if (loadTsFileManager == null) {
      synchronized (LoadTsFileManager.class) {
        if (loadTsFileManager == null) {
          loadTsFileManager = new LoadTsFileManager();
        }
      }
    }
    return loadTsFileManager;
  }

  static class InstanceHolder {

    private static final StorageEngine INSTANCE = new StorageEngine();

    private InstanceHolder() {
      // forbidding instantiation
    }
  }
}
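
A minimal usage sketch of the lifecycle this class exposes, using only methods shown in the listing above. This is not part of StorageEngine.java or of the covered build; the class name UsageSketch, the region id, the database name root.sg1, and the TTL are illustrative assumptions.

// UsageSketch.java (hypothetical caller, for illustration only)
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;

class UsageSketch {
  static void lifecycle() throws DataRegionException {
    StorageEngine engine = StorageEngine.getInstance();
    engine.start(); // recovers local regions, starts TTL and timed-flush threads

    DataRegionId regionId = new DataRegionId(1); // illustrative id
    DataRegion region = engine.createDataRegion(regionId, "root.sg1", Long.MAX_VALUE);

    engine.syncCloseAllProcessor(); // flush every working TsFile processor
    engine.deleteDataRegion(regionId); // drops data files (and, under IoTConsensus, WAL and snapshots)
    engine.stop();
  }
}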