
apache / iotdb · build #10018 (push, run on travis_ci, committed by web-flow)

07 Sep 2023 05:00AM UTC coverage: 47.717% (+0.03%) from 47.691%
Pipe: Fix ConcurrentModificationException caused by concurrently iterating through CachedSchemaPatternMatcher.extractors when a PipeHeartbeatEvent is being assigned (#11074) (#11075)

* try to fix ConcurrentModificationException when assigning PipeHeartbeatEvent

* Update CachedSchemaPatternMatcher.java

---------

Co-authored-by: 马子坤 <55695098+DanielWang2035@users.noreply.github.com>
(cherry picked from commit ac0dd9d31)
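
The bug class being fixed is worth a short sketch. Java's non-concurrent collections (e.g., HashMap) use fail-fast iterators that throw ConcurrentModificationException when the collection is structurally modified mid-iteration, while ConcurrentHashMap iterators are weakly consistent and never throw it. The snippet below is a minimal, self-contained illustration of that difference; it is not the actual CachedSchemaPatternMatcher patch, and the class name CmeSketch is made up:

import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CmeSketch {
  public static void main(String[] args) {
    // Fail-fast: structurally modifying a HashMap while iterating it
    // makes the iterator throw on its next access.
    Map<String, Integer> plain = new HashMap<>();
    plain.put("a", 1);
    plain.put("b", 2);
    try {
      for (String key : plain.keySet()) {
        plain.put("c", 3); // structural modification during iteration
      }
    } catch (ConcurrentModificationException e) {
      System.out.println("HashMap iterator is fail-fast: " + e);
    }

    // Weakly consistent: ConcurrentHashMap tolerates mutation during
    // iteration; the iterator may or may not observe the new entry.
    Map<String, Integer> concurrent = new ConcurrentHashMap<>(plain);
    for (String key : concurrent.keySet()) {
      concurrent.put("d", 4); // no exception
    }
    System.out.println("ConcurrentHashMap iterated safely: " + concurrent.keySet());
  }
}

In the real bug the mutation came from another thread assigning a PipeHeartbeatEvent while the extractors collection was being iterated, but the fail-fast mechanism is the same.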

1 of 1 new or added line in 1 file covered. (100.0%)

80262 of 168204 relevant lines covered (47.72%)

0.48 hits per line
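(For the arithmetic behind those figures: 80262 / 168204 ≈ 0.4772, which rounds to the 47.72% shown; the 0.48 hits-per-line average is essentially the same ratio, suggesting most covered lines in this run were hit exactly once.)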

Source File

/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java (32.71% of lines covered)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iotdb.db.storageengine;

import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.ShutdownException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.statemachine.dataregion.DataExecutionVisitor;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
import org.apache.iotdb.db.queryengine.execution.load.LoadTsFileManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
import org.apache.iotdb.db.service.metrics.WritingMetrics;
import org.apache.iotdb.db.storageengine.buffer.BloomFilterCache;
import org.apache.iotdb.db.storageengine.buffer.ChunkCache;
import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.flush.CloseFileListener;
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushListener;
import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy.DirectFlushPolicy;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALException;
import org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.ThreadUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;

public class StorageEngine implements IService {
  private static final Logger logger = LoggerFactory.getLogger(StorageEngine.class);

  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
  private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
  private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance();

  /** Time range for dividing a database; the time unit is the same as IoTDB's TimestampPrecision. */
  private static long timePartitionInterval = -1;

  /**
   * A folder (system/databases/ by default) that persists system info. Each database has a
   * subfolder under the systemDir.
   */
  private final String systemDir =
      FilePathUtils.regularizePath(config.getSystemDir()) + "databases";

  /** DataRegionId -> DataRegion */
  private final ConcurrentHashMap<DataRegionId, DataRegion> dataRegionMap =
      new ConcurrentHashMap<>();

  /** DataRegionId -> DataRegion which is being deleted */
  private final ConcurrentHashMap<DataRegionId, DataRegion> deletingDataRegionMap =
      new ConcurrentHashMap<>();

  /** Database name -> TTL, for region recovery only. */
  private final Map<String, Long> ttlMapForRecover = new ConcurrentHashMap<>();

  /** Number of ready data regions. */
  private AtomicInteger readyDataRegionNum;

  private AtomicBoolean isAllSgReady = new AtomicBoolean(false);

  private ScheduledExecutorService ttlCheckThread;
  private ScheduledExecutorService seqMemtableTimedFlushCheckThread;
  private ScheduledExecutorService unseqMemtableTimedFlushCheckThread;

  private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
  /** Used for short-lived asynchronous tasks. */
  private ExecutorService cachedThreadPool;
  // add customized listeners here for flush and close events
  private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
  private List<FlushListener> customFlushListeners = new ArrayList<>();
  private int recoverDataRegionNum = 0;

  private LoadTsFileManager loadTsFileManager;

  private StorageEngine() {}

  public static StorageEngine getInstance() {
    return InstanceHolder.INSTANCE;
  }

  private static void initTimePartition() {
    timePartitionInterval = CommonDescriptor.getInstance().getConfig().getTimePartitionInterval();
  }

  public static long getTimePartitionInterval() {
    if (timePartitionInterval == -1) {
      initTimePartition();
    }
    return timePartitionInterval;
  }

  public static long getTimePartition(long time) {
    if (timePartitionInterval == -1) {
      initTimePartition();
    }
    return time > 0 || time % timePartitionInterval == 0
        ? time / timePartitionInterval
        : time / timePartitionInterval - 1;
  }

  /** Block insertion if the insertion is rejected by memory control. */
  public static void blockInsertionIfReject(TsFileProcessor tsFileProcessor)
      throws WriteProcessRejectException {
    long startTime = System.currentTimeMillis();
    while (SystemInfo.getInstance().isRejected()) {
      if (tsFileProcessor != null && tsFileProcessor.shouldFlush()) {
        break;
      }
      try {
        TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
        if (System.currentTimeMillis() - startTime > config.getMaxWaitingTimeWhenInsertBlocked()) {
          throw new WriteProcessRejectException(
              "System rejected over " + (System.currentTimeMillis() - startTime) + "ms");
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  public void updateTTLInfo(byte[] allTTLInformation) {
    if (allTTLInformation == null) {
      return;
    }
    ByteBuffer buffer = ByteBuffer.wrap(allTTLInformation);
    int mapSize = ReadWriteIOUtils.readInt(buffer);
    for (int i = 0; i < mapSize; i++) {
      ttlMapForRecover.put(
          Objects.requireNonNull(ReadWriteIOUtils.readString(buffer)),
          ReadWriteIOUtils.readLong(buffer));
    }
  }

  public boolean isAllSgReady() {
    return isAllSgReady.get();
  }

  public void setAllSgReady(boolean allSgReady) {
    isAllSgReady.set(allSgReady);
  }

  public void recover() {
    setAllSgReady(false);
    cachedThreadPool =
        IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.STORAGE_ENGINE_CACHED_POOL.getName());

    List<Future<Void>> futures = new LinkedList<>();
    asyncRecover(futures);

    // wait until the WAL is recovered
    if (!config.isClusterMode()
        || !config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) {
      try {
        WALRecoverManager.getInstance().recover();
      } catch (WALException e) {
        logger.error("Fail to recover wal.", e);
      }
    }

    // operations after all data regions are recovered
    Thread recoverEndTrigger =
        new Thread(
            () -> {
              checkResults(futures, "StorageEngine failed to recover.");
              setAllSgReady(true);
              ttlMapForRecover.clear();
            },
            ThreadName.STORAGE_ENGINE_RECOVER_TRIGGER.getName());
    recoverEndTrigger.start();
  }

  private void asyncRecover(List<Future<Void>> futures) {
    Map<String, List<DataRegionId>> localDataRegionInfo = getLocalDataRegionInfo();
    localDataRegionInfo.values().forEach(list -> recoverDataRegionNum += list.size());
    readyDataRegionNum = new AtomicInteger(0);
    // init the WAL recover manager
    WALRecoverManager.getInstance()
        .setAllDataRegionScannedLatch(new CountDownLatch(recoverDataRegionNum));
    for (Map.Entry<String, List<DataRegionId>> entry : localDataRegionInfo.entrySet()) {
      String sgName = entry.getKey();
      for (DataRegionId dataRegionId : entry.getValue()) {
        Callable<Void> recoverDataRegionTask =
            () -> {
              DataRegion dataRegion = null;
              try {
                dataRegion =
                    buildNewDataRegion(
                        sgName,
                        dataRegionId,
                        ttlMapForRecover.getOrDefault(sgName, Long.MAX_VALUE));
              } catch (DataRegionException e) {
                logger.error(
                    "Failed to recover data region {}[{}]", sgName, dataRegionId.getId(), e);
              }
              dataRegionMap.put(dataRegionId, dataRegion);
              logger.info(
                  "Data regions have been recovered {}/{}",
                  readyDataRegionNum.incrementAndGet(),
                  recoverDataRegionNum);
              return null;
            };
        futures.add(cachedThreadPool.submit(recoverDataRegionTask));
      }
    }
  }

  /** Get the StorageGroup -> DataRegionIdList map from the data/system directory. */
  public Map<String, List<DataRegionId>> getLocalDataRegionInfo() {
    File system = SystemFileFactory.INSTANCE.getFile(systemDir);
    File[] sgDirs = system.listFiles();
    Map<String, List<DataRegionId>> localDataRegionInfo = new HashMap<>();
    if (sgDirs == null) {
      return localDataRegionInfo;
    }
    for (File sgDir : sgDirs) {
      if (!sgDir.isDirectory()) {
        continue;
      }
      String sgName = sgDir.getName();
      List<DataRegionId> dataRegionIdList = new ArrayList<>();
      for (File dataRegionDir : sgDir.listFiles()) {
        if (!dataRegionDir.isDirectory()) {
          continue;
        }
        dataRegionIdList.add(new DataRegionId(Integer.parseInt(dataRegionDir.getName())));
      }
      localDataRegionInfo.put(sgName, dataRegionIdList);
    }
    return localDataRegionInfo;
  }

  @Override
  public void start() {
    // build the time interval used to divide time partitions
    initTimePartition();
    // create systemDir
    try {
      FileUtils.forceMkdir(SystemFileFactory.INSTANCE.getFile(systemDir));
    } catch (IOException e) {
      throw new StorageEngineFailureException(e);
    }

    recover();

    ttlCheckThread =
        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(ThreadName.TTL_CHECK.getName());
    ScheduledExecutorUtil.safelyScheduleAtFixedRate(
        ttlCheckThread,
        this::checkTTL,
        TTL_CHECK_INTERVAL,
        TTL_CHECK_INTERVAL,
        TimeUnit.MILLISECONDS);
    logger.info("start ttl check thread successfully.");

    startTimedService();
  }

  private void checkTTL() {
    try {
      for (DataRegion dataRegion : dataRegionMap.values()) {
        if (dataRegion != null) {
          dataRegion.checkFilesTTL();
        }
      }
    } catch (ConcurrentModificationException e) {
      // ignore
    } catch (Exception e) {
      logger.error("An error occurred when checking TTL", e);
    }
  }

  private void startTimedService() {
    // timed flush of sequence memtables
    if (config.isEnableTimedFlushSeqMemtable()) {
      seqMemtableTimedFlushCheckThread =
          IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
              ThreadName.TIMED_FLUSH_SEQ_MEMTABLE.getName());
      ScheduledExecutorUtil.safelyScheduleAtFixedRate(
          seqMemtableTimedFlushCheckThread,
          this::timedFlushSeqMemTable,
          config.getSeqMemtableFlushCheckInterval(),
          config.getSeqMemtableFlushCheckInterval(),
          TimeUnit.MILLISECONDS);
      logger.info("start sequence memtable timed flush check thread successfully.");
    }
    // timed flush of unsequence memtables
    if (config.isEnableTimedFlushUnseqMemtable()) {
      unseqMemtableTimedFlushCheckThread =
          IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
              ThreadName.TIMED_FLUSH_UNSEQ_MEMTABLE.getName());
      ScheduledExecutorUtil.safelyScheduleAtFixedRate(
          unseqMemtableTimedFlushCheckThread,
          this::timedFlushUnseqMemTable,
          config.getUnseqMemtableFlushCheckInterval(),
          config.getUnseqMemtableFlushCheckInterval(),
          TimeUnit.MILLISECONDS);
      logger.info("start unsequence memtable timed flush check thread successfully.");
    }
  }

  private void timedFlushSeqMemTable() {
    for (DataRegion dataRegion : dataRegionMap.values()) {
      if (dataRegion != null) {
        dataRegion.timedFlushSeqMemTable();
      }
    }
  }

  private void timedFlushUnseqMemTable() {
    for (DataRegion dataRegion : dataRegionMap.values()) {
      if (dataRegion != null) {
        dataRegion.timedFlushUnseqMemTable();
      }
    }
  }

  @Override
  public void stop() {
    for (DataRegion dataRegion : dataRegionMap.values()) {
      if (dataRegion != null) {
        ThreadUtils.stopThreadPool(
            dataRegion.getTimedCompactionScheduleTask(), ThreadName.COMPACTION_SCHEDULE);
      }
    }
    syncCloseAllProcessor();
    ThreadUtils.stopThreadPool(ttlCheckThread, ThreadName.TTL_CHECK);
    ThreadUtils.stopThreadPool(
        seqMemtableTimedFlushCheckThread, ThreadName.TIMED_FLUSH_SEQ_MEMTABLE);
    ThreadUtils.stopThreadPool(
        unseqMemtableTimedFlushCheckThread, ThreadName.TIMED_FLUSH_UNSEQ_MEMTABLE);
    if (cachedThreadPool != null) {
      cachedThreadPool.shutdownNow();
    }
    dataRegionMap.clear();
  }

  @Override
  public void shutdown(long milliseconds) throws ShutdownException {
    try {
      for (DataRegion dataRegion : dataRegionMap.values()) {
        ThreadUtils.stopThreadPool(
            dataRegion.getTimedCompactionScheduleTask(), ThreadName.COMPACTION_SCHEDULE);
      }
      forceCloseAllProcessor();
    } catch (TsFileProcessorException e) {
      throw new ShutdownException(e);
    }
    shutdownTimedService(ttlCheckThread, "TTLCheckThread");
    shutdownTimedService(seqMemtableTimedFlushCheckThread, "SeqMemtableTimedFlushCheckThread");
    shutdownTimedService(unseqMemtableTimedFlushCheckThread, "UnseqMemtableTimedFlushCheckThread");
    cachedThreadPool.shutdownNow();
    dataRegionMap.clear();
  }

  private void shutdownTimedService(ScheduledExecutorService pool, String poolName) {
    if (pool != null) {
      pool.shutdownNow();
      try {
        pool.awaitTermination(30, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        logger.warn("{} still doesn't exit after 30s", poolName);
        Thread.currentThread().interrupt();
      }
    }
  }

  @Override
  public ServiceType getID() {
    return ServiceType.STORAGE_ENGINE_SERVICE;
  }

  /**
   * Build a new data region.
   *
   * @param dataRegionId data region id, e.g. 1
   * @param logicalStorageGroupName database name, e.g. root.sg1
   */
  public DataRegion buildNewDataRegion(
      String logicalStorageGroupName, DataRegionId dataRegionId, long ttl)
      throws DataRegionException {
    DataRegion dataRegion;
    logger.info(
        "construct a data region instance, the database is {}, Thread is {}",
        logicalStorageGroupName,
        Thread.currentThread().getId());
    dataRegion =
        new DataRegion(
            systemDir + File.separator + logicalStorageGroupName,
            String.valueOf(dataRegionId.getId()),
            fileFlushPolicy,
            logicalStorageGroupName);
    WRITING_METRICS.createFlushingMemTableStatusMetrics(dataRegionId);
    WRITING_METRICS.createDataRegionMemoryCostMetrics(dataRegion);
    dataRegion.setDataTTLWithTimePrecisionCheck(ttl);
    dataRegion.setCustomFlushListeners(customFlushListeners);
    dataRegion.setCustomCloseFileListeners(customCloseFileListeners);
    return dataRegion;
  }

  /** Write data into a DataRegion. For standalone mode only. */
  public TSStatus write(DataRegionId groupId, PlanNode planNode) {
    return planNode.accept(new DataExecutionVisitor(), dataRegionMap.get(groupId));
  }

  /** This function is just for unit tests. */
  @TestOnly
  public synchronized void reset() {
    dataRegionMap.clear();
  }

  /** Sync close all working TsFile processors. Used by the flush command. */
  public void syncCloseAllProcessor() {
    logger.info("Start closing all database processor");
    List<Future<Void>> tasks = new ArrayList<>();
    for (DataRegion dataRegion : dataRegionMap.values()) {
      if (dataRegion != null) {
        tasks.add(
            cachedThreadPool.submit(
                () -> {
                  dataRegion.syncCloseAllWorkingTsFileProcessors();
                  return null;
                }));
      }
    }
    checkResults(tasks, "Failed to sync close processor.");
  }

  public void forceCloseAllProcessor() throws TsFileProcessorException {
    logger.info("Start force closing all database processor");
    List<Future<Void>> tasks = new ArrayList<>();
    for (DataRegion dataRegion : dataRegionMap.values()) {
      if (dataRegion != null) {
        tasks.add(
            cachedThreadPool.submit(
                () -> {
                  dataRegion.forceCloseAllWorkingTsFileProcessors();
                  return null;
                }));
      }
    }
    checkResults(tasks, "Failed to force close processor.");
  }

  public void closeStorageGroupProcessor(String storageGroupPath, boolean isSeq) {
    List<Future<Void>> tasks = new ArrayList<>();
    for (DataRegion dataRegion : dataRegionMap.values()) {
      if (dataRegion.getDatabaseName().equals(storageGroupPath)) {
        if (isSeq) {
          for (TsFileProcessor tsFileProcessor : dataRegion.getWorkSequenceTsFileProcessors()) {
            tasks.add(
                cachedThreadPool.submit(
                    () -> {
                      dataRegion.syncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
                      return null;
                    }));
          }
        } else {
          for (TsFileProcessor tsFileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
            tasks.add(
                cachedThreadPool.submit(
                    () -> {
                      dataRegion.syncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
                      return null;
                    }));
          }
        }
      }
    }
    checkResults(tasks, "Failed to close database processor.");
  }

  private <V> void checkResults(List<Future<V>> tasks, String errorMsg) {
    for (Future<V> task : tasks) {
      try {
        task.get();
      } catch (ExecutionException e) {
        throw new StorageEngineFailureException(errorMsg, e);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new StorageEngineFailureException(errorMsg, e);
      }
    }
  }

  /**
   * Merge all databases.
   *
   * @throws StorageEngineException StorageEngineException
   */
  public void mergeAll() throws StorageEngineException {
    if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
      throw new StorageEngineException("Current system mode is read only, does not support merge");
    }
    dataRegionMap.values().forEach(DataRegion::compact);
  }

  public void operateFlush(TFlushReq req) {
    if (req.storageGroups == null) {
      StorageEngine.getInstance().syncCloseAllProcessor();
      WALManager.getInstance().deleteOutdatedWALFiles();
    } else {
      for (String storageGroup : req.storageGroups) {
        if (req.isSeq == null) {
          StorageEngine.getInstance().closeStorageGroupProcessor(storageGroup, true);
          StorageEngine.getInstance().closeStorageGroupProcessor(storageGroup, false);
        } else {
          StorageEngine.getInstance()
              .closeStorageGroupProcessor(storageGroup, Boolean.parseBoolean(req.isSeq));
        }
      }
    }
  }

  public void clearCache() {
    ChunkCache.getInstance().clear();
    TimeSeriesMetadataCache.getInstance().clear();
    BloomFilterCache.getInstance().clear();
  }

  public void setTTL(List<DataRegionId> dataRegionIdList, long dataTTL) {
    for (DataRegionId dataRegionId : dataRegionIdList) {
      DataRegion dataRegion = dataRegionMap.get(dataRegionId);
      if (dataRegion != null) {
        dataRegion.setDataTTLWithTimePrecisionCheck(dataTTL);
      }
    }
  }

  /**
   * Add a listener for flush start/end events. Notice that this addition only applies to
   * TsFileProcessors created afterwards.
   *
   * @param listener the flush listener to register
   */
  public void registerFlushListener(FlushListener listener) {
    customFlushListeners.add(listener);
  }

  /**
   * Add a listener for file close events. Notice that this addition only applies to
   * TsFileProcessors created afterwards.
   *
   * @param listener the close-file listener to register
   */
  public void registerCloseFileListener(CloseFileListener listener) {
    customCloseFileListeners.add(listener);
  }

  private void makeSureNoOldRegion(DataRegionId regionId) {
    while (deletingDataRegionMap.containsKey(regionId)) {
      DataRegion oldRegion = deletingDataRegionMap.get(regionId);
      if (oldRegion != null) {
        oldRegion.waitForDeleted();
      }
    }
  }

  // When registering a new region, the coordinator needs to register the corresponding region with
  // the local storage engine before adding the corresponding consensusGroup to the consensus layer.
  public DataRegion createDataRegion(DataRegionId regionId, String sg, long ttl)
      throws DataRegionException {
    makeSureNoOldRegion(regionId);
    AtomicReference<DataRegionException> exceptionAtomicReference = new AtomicReference<>(null);
    DataRegion dataRegion =
        dataRegionMap.computeIfAbsent(
            regionId,
            x -> {
              try {
                return buildNewDataRegion(sg, x, ttl);
              } catch (DataRegionException e) {
                exceptionAtomicReference.set(e);
              }
              return null;
            });
    if (exceptionAtomicReference.get() != null) {
      throw exceptionAtomicReference.get();
    }
    return dataRegion;
  }

  public void deleteDataRegion(DataRegionId regionId) {
    if (!dataRegionMap.containsKey(regionId) || deletingDataRegionMap.containsKey(regionId)) {
      return;
    }
    DataRegion region =
        deletingDataRegionMap.computeIfAbsent(regionId, k -> dataRegionMap.remove(regionId));
    if (region != null) {
      region.markDeleted();
      WRITING_METRICS.removeDataRegionMemoryCostMetrics(regionId);
      WRITING_METRICS.removeFlushingMemTableStatusMetrics(regionId);
      try {
        region.abortCompaction();
        region.syncDeleteDataFiles();
        region.deleteFolder(systemDir);
        if (config.isClusterMode()
            && config
                .getDataRegionConsensusProtocolClass()
                .equals(ConsensusFactory.IOT_CONSENSUS)) {
          // delete wal
          WALManager.getInstance()
              .deleteWALNode(
                  region.getDatabaseName() + FILE_NAME_SEPARATOR + region.getDataRegionId());
          // delete snapshot
          for (String dataDir : config.getLocalDataDirs()) {
            File regionSnapshotDir =
                new File(
                    dataDir + File.separator + IoTDBConstant.SNAPSHOT_FOLDER_NAME,
                    region.getDatabaseName() + FILE_NAME_SEPARATOR + regionId.getId());
            if (regionSnapshotDir.exists()) {
              try {
                FileUtils.deleteDirectory(regionSnapshotDir);
              } catch (IOException e) {
                logger.error("Failed to delete snapshot dir {}", regionSnapshotDir, e);
              }
            }
          }
        }
      } catch (Exception e) {
        logger.error(
            "Error occurs when deleting data region {}-{}",
            region.getDatabaseName(),
            region.getDataRegionId(),
            e);
      } finally {
        deletingDataRegionMap.remove(regionId);
      }
    }
  }

  /**
   * Run the runnable if the region is absent; if the region is present, do nothing.
   *
   * <p>We use computeIfAbsent only for its per-key locking: the mapping function returns null, so
   * no region is actually created; we just want to run the runnable in a synchronized way.
   *
   * @return true if the region is absent and the runnable is run; false if the region is present.
   */
  public boolean runIfAbsent(DataRegionId regionId, Runnable runnable) {
    final AtomicBoolean result = new AtomicBoolean(false);
    dataRegionMap.computeIfAbsent(
        regionId,
        k -> {
          runnable.run();
          result.set(true);
          return null;
        });
    return result.get();
  }

  /**
   * Run the consumer if the region is present; if the region is absent, do nothing.
   *
   * <p>We use computeIfPresent only for its per-key locking: the remapping function returns the
   * region unchanged, so the mapping is never removed; we just want to run the consumer in a
   * synchronized way.
   *
   * @return true if the region is present and the consumer is run; false if the region is absent.
   */
  public boolean runIfPresent(DataRegionId regionId, Consumer<DataRegion> consumer) {
    final AtomicBoolean result = new AtomicBoolean(false);
    dataRegionMap.computeIfPresent(
        regionId,
        (id, region) -> {
          consumer.accept(region);
          result.set(true);
          return region;
        });
    return result.get();
  }

  public DataRegion getDataRegion(DataRegionId regionId) {
    return dataRegionMap.get(regionId);
  }

  public List<DataRegion> getAllDataRegions() {
    return new ArrayList<>(dataRegionMap.values());
  }

  public List<DataRegionId> getAllDataRegionIds() {
    return new ArrayList<>(dataRegionMap.keySet());
  }

  /** This method is not thread-safe. */
  public void setDataRegion(DataRegionId regionId, DataRegion newRegion) {
    if (dataRegionMap.containsKey(regionId)) {
      DataRegion oldRegion = dataRegionMap.get(regionId);
      oldRegion.syncCloseAllWorkingTsFileProcessors();
      oldRegion.abortCompaction();
    }
    dataRegionMap.put(regionId, newRegion);
  }

  public TSStatus setTTL(TSetTTLReq req) {
    Map<String, List<DataRegionId>> localDataRegionInfo =
        StorageEngine.getInstance().getLocalDataRegionInfo();
    List<DataRegionId> dataRegionIdList = new ArrayList<>();
    req.storageGroupPathPattern.forEach(
        storageGroup -> dataRegionIdList.addAll(localDataRegionInfo.get(storageGroup)));
    for (DataRegionId dataRegionId : dataRegionIdList) {
      DataRegion dataRegion = dataRegionMap.get(dataRegionId);
      if (dataRegion != null) {
        dataRegion.setDataTTLWithTimePrecisionCheck(req.TTL);
      }
    }
    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
  }

  public TsFileFlushPolicy getFileFlushPolicy() {
    return fileFlushPolicy;
  }

  public TSStatus writeLoadTsFileNode(
      DataRegionId dataRegionId, LoadTsFilePieceNode pieceNode, String uuid) {
    TSStatus status = new TSStatus();

    try {
      getLoadTsFileManager().writeToDataRegion(getDataRegion(dataRegionId), pieceNode, uuid);
    } catch (IOException e) {
      logger.error(
          String.format(
              "IO error when writing piece node of TsFile %s to DataRegion %s.",
              pieceNode.getTsFile(), dataRegionId),
          e);
      status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
      status.setMessage(e.getMessage());
      return status;
    }

    return RpcUtils.SUCCESS_STATUS;
  }

  public TSStatus executeLoadCommand(
      LoadTsFileScheduler.LoadCommand loadCommand, String uuid, boolean isGeneratedByPipe) {
    TSStatus status = new TSStatus();

    try {
      switch (loadCommand) {
        case EXECUTE:
          if (getLoadTsFileManager().loadAll(uuid, isGeneratedByPipe)) {
            status = RpcUtils.SUCCESS_STATUS;
          } else {
            status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
            status.setMessage(
                String.format(
                    "No load TsFile uuid %s recorded for execute load command %s.",
                    uuid, loadCommand));
          }
          break;
        case ROLLBACK:
          if (getLoadTsFileManager().deleteAll(uuid)) {
            status = RpcUtils.SUCCESS_STATUS;
          } else {
            status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
            status.setMessage(
                String.format(
                    "No load TsFile uuid %s recorded for execute load command %s.",
                    uuid, loadCommand));
          }
          break;
        default:
          status.setCode(TSStatusCode.ILLEGAL_PARAMETER.getStatusCode());
          status.setMessage(String.format("Wrong load command %s.", loadCommand));
      }
    } catch (IOException | LoadFileException e) {
      logger.error(String.format("Execute load command %s error.", loadCommand), e);
      status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
      status.setMessage(e.getMessage());
    }

    return status;
  }

  /** Reboot the timed flush threads for sequence/unsequence memtables. */
  public void rebootTimedService() throws ShutdownException {
    logger.info("Start rebooting all timed service.");

    // exclude the TTL check thread
    stopTimedServiceAndThrow(seqMemtableTimedFlushCheckThread, "SeqMemtableTimedFlushCheckThread");
    stopTimedServiceAndThrow(
        unseqMemtableTimedFlushCheckThread, "UnseqMemtableTimedFlushCheckThread");

    logger.info("Stop all timed service successfully, and now restart them.");

    startTimedService();

    logger.info("Reboot all timed service successfully");
  }

  private void stopTimedServiceAndThrow(ScheduledExecutorService pool, String poolName)
      throws ShutdownException {
    if (pool != null) {
      pool.shutdownNow();
      try {
        pool.awaitTermination(30, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        logger.warn("{} still doesn't exit after 30s", poolName);
        throw new ShutdownException(e);
      }
    }
  }

  public void getDiskSizeByDataRegion(
      Map<Integer, Long> dataRegionDisk, List<Integer> dataRegionIds) {
    dataRegionMap.forEach(
        (dataRegionId, dataRegion) -> {
          if (dataRegionIds.contains(dataRegionId.getId())) {
            dataRegionDisk.put(dataRegionId.getId(), dataRegion.countRegionDiskSize());
          }
        });
  }

  private LoadTsFileManager getLoadTsFileManager() {
    if (loadTsFileManager == null) {
      synchronized (LoadTsFileManager.class) {
        if (loadTsFileManager == null) {
          loadTsFileManager = new LoadTsFileManager();
        }
      }
    }
    return loadTsFileManager;
  }

  static class InstanceHolder {

    private static final StorageEngine INSTANCE = new StorageEngine();

    private InstanceHolder() {
      // forbidding instantiation
    }
  }
}
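
For orientation, here is a minimal caller sketch against the API above. It is hypothetical wiring (a real IoTDB DataNode boots StorageEngine through its service registry, and the class name StorageEngineSketch and the region id are made up); it assumes the IoTDB server modules are on the classpath:

import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.db.storageengine.StorageEngine;

public class StorageEngineSketch {
  public static void main(String[] args) {
    // Initialization-on-demand holder singleton, as defined above.
    StorageEngine engine = StorageEngine.getInstance();
    engine.start(); // creates systemDir, recovers regions, starts TTL/flush check threads

    // runIfPresent executes the consumer inside the ConcurrentHashMap compute
    // lock for that key, so it is atomic with respect to region deletion.
    DataRegionId regionId = new DataRegionId(1); // hypothetical region id
    boolean present =
        engine.runIfPresent(regionId, region -> System.out.println(region.getDatabaseName()));
    System.out.println("region 1 present: " + present);

    engine.stop(); // sync-closes processors and stops the scheduled executors
  }
}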