• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9941

28 Aug 2023 04:15PM UTC coverage: 47.749% (+0.01%) from 47.736%
#9941

push

travis_ci

web-flow
Pipe: drop HeartbeatEvent when pipe is stopped to avoid OOM (#10977)

17 of 17 new or added lines in 5 files covered. (100.0%)

80250 of 168065 relevant lines covered (47.75%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.45
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *      http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.storageengine.dataregion;
21

22
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23
import org.apache.iotdb.commons.cluster.NodeStatus;
24
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
25
import org.apache.iotdb.commons.concurrent.ThreadName;
26
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
27
import org.apache.iotdb.commons.conf.CommonDescriptor;
28
import org.apache.iotdb.commons.consensus.DataRegionId;
29
import org.apache.iotdb.commons.exception.IllegalPathException;
30
import org.apache.iotdb.commons.exception.MetadataException;
31
import org.apache.iotdb.commons.file.SystemFileFactory;
32
import org.apache.iotdb.commons.path.PartialPath;
33
import org.apache.iotdb.commons.service.metric.MetricService;
34
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
35
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
36
import org.apache.iotdb.commons.utils.TestOnly;
37
import org.apache.iotdb.consensus.ConsensusFactory;
38
import org.apache.iotdb.db.conf.IoTDBConfig;
39
import org.apache.iotdb.db.conf.IoTDBDescriptor;
40
import org.apache.iotdb.db.exception.BatchProcessException;
41
import org.apache.iotdb.db.exception.DataRegionException;
42
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
43
import org.apache.iotdb.db.exception.LoadFileException;
44
import org.apache.iotdb.db.exception.TsFileProcessorException;
45
import org.apache.iotdb.db.exception.WriteProcessException;
46
import org.apache.iotdb.db.exception.WriteProcessRejectException;
47
import org.apache.iotdb.db.exception.query.OutOfTTLException;
48
import org.apache.iotdb.db.exception.query.QueryProcessException;
49
import org.apache.iotdb.db.exception.quota.ExceedQuotaException;
50
import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
51
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
52
import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
53
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
54
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
55
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
56
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
57
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
58
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
59
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
60
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
61
import org.apache.iotdb.db.service.SettleService;
62
import org.apache.iotdb.db.service.metrics.FileMetrics;
63
import org.apache.iotdb.db.storageengine.StorageEngine;
64
import org.apache.iotdb.db.storageengine.buffer.BloomFilterCache;
65
import org.apache.iotdb.db.storageengine.buffer.ChunkCache;
66
import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
67
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.recover.CompactionRecoverManager;
68
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
69
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler;
70
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
71
import org.apache.iotdb.db.storageengine.dataregion.flush.CloseFileListener;
72
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushListener;
73
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus;
74
import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
75
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
76
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
77
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessorInfo;
78
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
79
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
80
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
81
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
82
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
83
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
84
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
85
import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
86
import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.VersionController;
87
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
88
import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode;
89
import org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
90
import org.apache.iotdb.db.storageengine.dataregion.wal.recover.file.SealedTsFileRecoverPerformer;
91
import org.apache.iotdb.db.storageengine.dataregion.wal.recover.file.UnsealedTsFileRecoverPerformer;
92
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
93
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
94
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALRecoverListener;
95
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
96
import org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionInfo;
97
import org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionManager;
98
import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
99
import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager;
100
import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool;
101
import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
102
import org.apache.iotdb.db.utils.DateTimeUtils;
103
import org.apache.iotdb.rpc.RpcUtils;
104
import org.apache.iotdb.rpc.TSStatusCode;
105
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
106
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
107
import org.apache.iotdb.tsfile.fileSystem.FSType;
108
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
109
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
110
import org.apache.iotdb.tsfile.utils.FSUtils;
111
import org.apache.iotdb.tsfile.utils.Pair;
112
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
113
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
114

115
import org.apache.commons.io.FileUtils;
116
import org.slf4j.Logger;
117
import org.slf4j.LoggerFactory;
118

119
import java.io.File;
120
import java.io.IOException;
121
import java.nio.file.Files;
122
import java.util.ArrayList;
123
import java.util.Arrays;
124
import java.util.Collection;
125
import java.util.Collections;
126
import java.util.Comparator;
127
import java.util.Date;
128
import java.util.HashMap;
129
import java.util.HashSet;
130
import java.util.Iterator;
131
import java.util.List;
132
import java.util.Map;
133
import java.util.Map.Entry;
134
import java.util.Set;
135
import java.util.TreeMap;
136
import java.util.concurrent.ScheduledExecutorService;
137
import java.util.concurrent.TimeUnit;
138
import java.util.concurrent.atomic.AtomicBoolean;
139
import java.util.concurrent.atomic.AtomicLong;
140
import java.util.concurrent.locks.Condition;
141
import java.util.concurrent.locks.ReadWriteLock;
142
import java.util.concurrent.locks.ReentrantReadWriteLock;
143

144
import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
145
import static org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.SEQUENCE_TSFILE;
146
import static org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.UNSEQUENCE_TSFILE;
147
import static org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource.TEMP_SUFFIX;
148
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
149

150
/**
151
 * For sequence data, a DataRegion has some TsFileProcessors, in which there is only one
152
 * TsFileProcessor in the working status. <br>
153
 *
154
 * <p>There are two situations to set the working TsFileProcessor to closing status:<br>
155
 *
156
 * <p>(1) when inserting data into the TsFileProcessor, and the TsFileProcessor shouldFlush() (or
157
 * shouldClose())<br>
158
 *
159
 * <p>(2) someone calls syncCloseAllWorkingTsFileProcessors(). (up to now, only flush command from
160
 * cli will call this method)<br>
161
 *
162
 * <p>UnSequence data has the similar process as above.
163
 *
164
 * <p>When a sequence TsFileProcessor is submitted to be flushed, the
165
 * updateLatestFlushTimeCallback() method will be called as a callback.<br>
166
 *
167
 * <p>When a TsFileProcessor is closed, the closeUnsealedTsFileProcessorCallBack() method will be
168
 * called as a callback.
169
 */
170
public class DataRegion implements IDataRegionForQuery {
171

172
  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
1✔
173
  private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
1✔
174

175
  /**
176
   * All newly generated chunks after merge have version number 0, so we set merged Modification
177
   * file version to 1 to take effect.
178
   */
179
  private static final int MERGE_MOD_START_VERSION_NUM = 1;
180

181
  private static final Logger logger = LoggerFactory.getLogger(DataRegion.class);
1✔
182

183
  private final boolean enableMemControl = config.isEnableMemControl();
1✔
184
  /**
185
   * a read write lock for guaranteeing concurrent safety when accessing all fields in this class
186
   * (i.e., schema, (un)sequenceFileList, work(un)SequenceTsFileProcessor,
187
   * closing(Un)SequenceTsFileProcessor, latestTimeForEachDevice, and
188
   * partitionLatestFlushedTimeForEachDevice)
189
   */
190
  private final ReadWriteLock insertLock = new ReentrantReadWriteLock();
1✔
191
  /** condition to safely delete data region. */
192
  private final Condition deletedCondition = insertLock.writeLock().newCondition();
1✔
193
  /** data region has been deleted or not. */
194
  private volatile boolean deleted = false;
1✔
195
  /** closeStorageGroupCondition is used to wait for all currently closing TsFiles to be done. */
196
  private final Object closeStorageGroupCondition = new Object();
1✔
197
  /**
198
   * avoid some tsfileResource is changed (e.g., from unsealed to sealed) when a read is executed.
199
   */
200
  private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock();
1✔
201
  /** time partition id in the database -> tsFileProcessor for this time partition. */
202
  private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors = new TreeMap<>();
1✔
203
  /** time partition id in the database -> tsFileProcessor for this time partition. */
204
  private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors = new TreeMap<>();
1✔
205

206
  /** sequence tsfile processors which are closing. */
207
  private final CopyOnReadLinkedList<TsFileProcessor> closingSequenceTsFileProcessor =
1✔
208
      new CopyOnReadLinkedList<>();
209

210
  /** unsequence tsfile processors which are closing. */
211
  private final CopyOnReadLinkedList<TsFileProcessor> closingUnSequenceTsFileProcessor =
1✔
212
      new CopyOnReadLinkedList<>();
213

214
  private final AtomicBoolean isSettling = new AtomicBoolean();
1✔
215

216
  /** data region id. */
217
  private final String dataRegionId;
218
  /** database name. */
219
  private final String databaseName;
220
  /** database system directory. */
221
  private File storageGroupSysDir;
222
  /** manage seqFileList and unSeqFileList. */
223
  private final TsFileManager tsFileManager;
224

225
  /** manage tsFileResource degrade. */
226
  private final TsFileResourceManager tsFileResourceManager = TsFileResourceManager.getInstance();
1✔
227

228
  /**
229
   * time partition id -> version controller which assigns a version for each MemTable and
230
   * deletion/update such that after they are persisted, the order of insertions, deletions and
231
   * updates can be re-determined. Will be empty if there are not MemTables in memory.
232
   */
233
  private final HashMap<Long, VersionController> timePartitionIdVersionControllerMap =
1✔
234
      new HashMap<>();
235
  /**
236
   * when the data in a database is older than dataTTL, it is considered invalid and will be
237
   * eventually removed.
238
   */
239
  private long dataTTL = Long.MAX_VALUE;
1✔
240
  /** file system factory (local or hdfs). */
241
  private final FSFactory fsFactory = FSFactoryProducer.getFSFactory();
1✔
242
  /** file flush policy. */
243
  private TsFileFlushPolicy fileFlushPolicy;
244
  /**
245
   * The max file versions in each partition. By recording this, if several IoTDB instances have the
246
   * same policy of closing file and their ingestion is identical, then files of the same version in
247
   * different IoTDB instance will have identical data, providing convenience for data comparison
248
   * across different instances. partition number -> max version number
249
   */
250
  private Map<Long, Long> partitionMaxFileVersions = new HashMap<>();
1✔
251
  /** database info for mem control. */
252
  private final DataRegionInfo dataRegionInfo = new DataRegionInfo(this);
1✔
253
  /** whether it's ready from recovery. */
254
  private boolean isReady = false;
1✔
255
  /** close file listeners. */
256
  private List<CloseFileListener> customCloseFileListeners = Collections.emptyList();
1✔
257
  /** flush listeners. */
258
  private List<FlushListener> customFlushListeners = Collections.emptyList();
1✔
259

260
  private ILastFlushTimeMap lastFlushTimeMap;
261

262
  /**
263
   * record the insertWriteLock in SG is being hold by which method, it will be empty string if no
264
   * one holds the insertWriteLock.
265
   */
266
  private String insertWriteLockHolder = "";
1✔
267

268
  private ScheduledExecutorService timedCompactionScheduleTask;
269

270
  public static final long COMPACTION_TASK_SUBMIT_DELAY = 20L * 1000L;
271

272
  private static final QueryResourceMetricSet QUERY_RESOURCE_METRIC_SET =
273
      QueryResourceMetricSet.getInstance();
1✔
274

275
  private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
1✔
276
      PerformanceOverviewMetrics.getInstance();
1✔
277

278
  /**
279
   * construct a database processor.
280
   *
281
   * @param systemDir system dir path
282
   * @param dataRegionId data region id e.g. 1
283
   * @param fileFlushPolicy file flush policy
284
   * @param databaseName database name e.g. root.sg1
285
   */
286
  public DataRegion(
287
      String systemDir, String dataRegionId, TsFileFlushPolicy fileFlushPolicy, String databaseName)
288
      throws DataRegionException {
1✔
289
    this.dataRegionId = dataRegionId;
1✔
290
    this.databaseName = databaseName;
1✔
291
    this.fileFlushPolicy = fileFlushPolicy;
1✔
292

293
    storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionId);
1✔
294
    this.tsFileManager =
1✔
295
        new TsFileManager(databaseName, dataRegionId, storageGroupSysDir.getPath());
1✔
296
    if (storageGroupSysDir.mkdirs()) {
1✔
297
      logger.info(
1✔
298
          "Database system Directory {} doesn't exist, create it", storageGroupSysDir.getPath());
1✔
299
    } else if (!storageGroupSysDir.exists()) {
1✔
300
      logger.error("create database system Directory {} failed", storageGroupSysDir.getPath());
×
301
    }
302

303
    lastFlushTimeMap = new HashLastFlushTimeMap(tsFileManager);
1✔
304

305
    // recover tsfiles unless consensus protocol is ratis and storage storageengine is not ready
306
    if (config.isClusterMode()
1✔
307
        && config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)
×
308
        && !StorageEngine.getInstance().isAllSgReady()) {
×
309
      logger.debug(
×
310
          "Skip recovering data region {}[{}] when consensus protocol is ratis and storage storageengine is not ready.",
311
          databaseName,
312
          dataRegionId);
313
      for (String fileFolder : TierManager.getInstance().getAllFilesFolders()) {
×
314
        File dataRegionFolder =
×
315
            fsFactory.getFile(fileFolder, databaseName + File.separator + dataRegionId);
×
316
        try {
317
          fsFactory.deleteDirectory(dataRegionFolder.getPath());
×
318
        } catch (IOException e) {
×
319
          logger.error(
×
320
              "Exception occurs when deleting data region folder for {}-{}",
321
              databaseName,
322
              dataRegionId,
323
              e);
324
        }
×
325
        if (FSUtils.getFSType(dataRegionFolder) == FSType.LOCAL) {
×
326
          dataRegionFolder.mkdirs();
×
327
        }
328
      }
×
329
    } else {
330
      recover();
1✔
331
    }
332

333
    MetricService.getInstance().addMetricSet(new DataRegionMetrics(this));
1✔
334
  }
1✔
335

336
  @TestOnly
337
  public DataRegion(String databaseName, String id) {
1✔
338
    this.databaseName = databaseName;
1✔
339
    this.dataRegionId = id;
1✔
340
    this.tsFileManager = new TsFileManager(databaseName, id, "");
1✔
341
    this.partitionMaxFileVersions = new HashMap<>();
1✔
342
    partitionMaxFileVersions.put(0L, 0L);
1✔
343
  }
1✔
344

345
  @Override
346
  public String getDatabaseName() {
347
    return databaseName;
1✔
348
  }
349

350
  public boolean isReady() {
351
    return isReady;
×
352
  }
353

354
  public void setReady(boolean ready) {
355
    isReady = ready;
×
356
  }
×
357

358
  private Map<Long, List<TsFileResource>> splitResourcesByPartition(
359
      List<TsFileResource> resources) {
360
    Map<Long, List<TsFileResource>> ret = new TreeMap<>();
1✔
361
    for (TsFileResource resource : resources) {
1✔
362
      ret.computeIfAbsent(resource.getTimePartition(), l -> new ArrayList<>()).add(resource);
1✔
363
    }
1✔
364
    return ret;
1✔
365
  }
366

367
  public AtomicBoolean getIsSettling() {
368
    return isSettling;
×
369
  }
370

371
  public void setSettling(boolean isSettling) {
372
    this.isSettling.set(isSettling);
×
373
  }
×
374

375
  /** this class is used to store recovering context. */
376
  private class DataRegionRecoveryContext {
377
    /** number of files to be recovered. */
378
    private final long numOfFilesToRecover;
379
    /** when the change of recoveredFilesNum exceeds this, log check will be triggered. */
380
    private final long filesNumLogCheckTrigger;
381
    /** number of already recovered files. */
382
    private long recoveredFilesNum;
383
    /** last recovery log time. */
384
    private long lastLogTime;
385
    /** last recovery log files num. */
386
    private long lastLogCheckFilesNum;
387
    /** recover performers of unsealed TsFiles. */
388
    private final List<UnsealedTsFileRecoverPerformer> recoverPerformers = new ArrayList<>();
1✔
389

390
    public DataRegionRecoveryContext(long numOfFilesToRecover) {
1✔
391
      this.numOfFilesToRecover = numOfFilesToRecover;
1✔
392
      this.recoveredFilesNum = 0;
1✔
393
      this.filesNumLogCheckTrigger = this.numOfFilesToRecover / 100;
1✔
394
      this.lastLogTime = System.currentTimeMillis();
1✔
395
      this.lastLogCheckFilesNum = 0;
1✔
396
    }
1✔
397

398
    public void incrementRecoveredFilesNum() {
399
      recoveredFilesNum++;
1✔
400
      // check log only when 1% more files have been recovered
401
      if (lastLogCheckFilesNum + filesNumLogCheckTrigger < recoveredFilesNum) {
1✔
402
        lastLogCheckFilesNum = recoveredFilesNum;
1✔
403
        // log only when log interval exceeds recovery log interval
404
        if (lastLogTime + config.getRecoveryLogIntervalInMs() < System.currentTimeMillis()) {
1✔
405
          logger.info(
×
406
              "The data region {}[{}] has recovered {}%, please wait a moment.",
407
              databaseName, dataRegionId, recoveredFilesNum * 1.0 / numOfFilesToRecover);
×
408
          lastLogTime = System.currentTimeMillis();
×
409
        }
410
      }
411
    }
1✔
412
  }
413

414
  /** recover from file */
415
  @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive Complexity warning
416
  private void recover() throws DataRegionException {
417
    try {
418
      recoverCompaction();
1✔
419
    } catch (Exception e) {
×
420
      throw new DataRegionException(e);
×
421
    }
1✔
422

423
    try {
424
      // collect candidate TsFiles from sequential and unsequential data directory
425
      List<TsFileResource> tmpSeqTsFiles =
1✔
426
          getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders());
1✔
427
      List<TsFileResource> tmpUnseqTsFiles =
1✔
428
          getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders());
1✔
429

430
      // split by partition so that we can find the last file of each partition and decide to
431
      // close it or not
432
      DataRegionRecoveryContext dataRegionRecoveryContext =
1✔
433
          new DataRegionRecoveryContext((long) tmpSeqTsFiles.size() + tmpUnseqTsFiles.size());
1✔
434
      Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles =
1✔
435
          splitResourcesByPartition(tmpSeqTsFiles);
1✔
436
      Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles =
1✔
437
          splitResourcesByPartition(tmpUnseqTsFiles);
1✔
438
      // submit unsealed TsFiles to recover
439
      List<WALRecoverListener> recoverListeners = new ArrayList<>();
1✔
440
      for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
1✔
441
        // tsFiles without resource file are unsealed
442
        for (TsFileResource resource : value) {
1✔
443
          if (resource.resourceFileExists()) {
1✔
444
            FileMetrics.getInstance()
1✔
445
                .addFile(resource.getTsFile().length(), true, resource.getTsFile().getName());
1✔
446
            if (resource.getModFile().exists()) {
1✔
447
              FileMetrics.getInstance().increaseModFileNum(1);
×
448
              FileMetrics.getInstance().increaseModFileSize(resource.getModFile().getSize());
×
449
            }
450
          }
451
        }
1✔
452
        while (!value.isEmpty()) {
1✔
453
          TsFileResource tsFileResource = value.get(value.size() - 1);
1✔
454
          if (tsFileResource.resourceFileExists()) {
1✔
455
            break;
1✔
456
          } else {
457
            value.remove(value.size() - 1);
×
458
            WALRecoverListener recoverListener =
×
459
                recoverUnsealedTsFile(tsFileResource, dataRegionRecoveryContext, true);
×
460
            if (recoverListener != null) {
×
461
              recoverListeners.add(recoverListener);
×
462
            }
463
          }
464
        }
×
465
      }
1✔
466
      for (List<TsFileResource> value : partitionTmpUnseqTsFiles.values()) {
1✔
467
        // tsFiles without resource file are unsealed
468
        for (TsFileResource resource : value) {
×
469
          if (resource.resourceFileExists()) {
×
470
            FileMetrics.getInstance()
×
471
                .addFile(resource.getTsFile().length(), false, resource.getTsFile().getName());
×
472
          }
473
          if (resource.getModFile().exists()) {
×
474
            FileMetrics.getInstance().increaseModFileNum(1);
×
475
            FileMetrics.getInstance().increaseModFileSize(resource.getModFile().getSize());
×
476
          }
477
        }
×
478
        while (!value.isEmpty()) {
×
479
          TsFileResource tsFileResource = value.get(value.size() - 1);
×
480
          if (tsFileResource.resourceFileExists()) {
×
481
            break;
×
482
          } else {
483
            value.remove(value.size() - 1);
×
484
            WALRecoverListener recoverListener =
×
485
                recoverUnsealedTsFile(tsFileResource, dataRegionRecoveryContext, false);
×
486
            if (recoverListener != null) {
×
487
              recoverListeners.add(recoverListener);
×
488
            }
489
          }
490
        }
×
491
      }
×
492
      // signal wal recover manager to recover this region's files
493
      WALRecoverManager.getInstance().getAllDataRegionScannedLatch().countDown();
1✔
494
      // recover sealed TsFiles
495
      if (!partitionTmpSeqTsFiles.isEmpty()) {
1✔
496
        long latestPartitionId =
1✔
497
            ((TreeMap<Long, List<TsFileResource>>) partitionTmpSeqTsFiles).lastKey();
1✔
498
        for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpSeqTsFiles.entrySet()) {
1✔
499
          recoverFilesInPartition(
1✔
500
              partitionFiles.getKey(),
1✔
501
              dataRegionRecoveryContext,
502
              partitionFiles.getValue(),
1✔
503
              true,
504
              partitionFiles.getKey() == latestPartitionId);
1✔
505
        }
1✔
506
      }
507
      for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpUnseqTsFiles.entrySet()) {
1✔
508
        recoverFilesInPartition(
×
509
            partitionFiles.getKey(),
×
510
            dataRegionRecoveryContext,
511
            partitionFiles.getValue(),
×
512
            false,
513
            false);
514
      }
×
515
      // wait until all unsealed TsFiles have been recovered
516
      for (WALRecoverListener recoverListener : recoverListeners) {
1✔
517
        if (recoverListener.waitForResult() == WALRecoverListener.Status.FAILURE) {
×
518
          logger.error(
×
519
              "Fail to recover unsealed TsFile {}, skip it.",
520
              recoverListener.getFilePath(),
×
521
              recoverListener.getCause());
×
522
        }
523
        // update VSGRecoveryContext
524
        dataRegionRecoveryContext.incrementRecoveredFilesNum();
×
525
      }
×
526
      // recover unsealed TsFiles, sort make sure last flush time not be replaced by early files
527
      dataRegionRecoveryContext.recoverPerformers.sort(
1✔
528
          (p1, p2) ->
529
              compareFileName(
×
530
                  p1.getTsFileResource().getTsFile(), p2.getTsFileResource().getTsFile()));
×
531
      for (UnsealedTsFileRecoverPerformer recoverPerformer :
532
          dataRegionRecoveryContext.recoverPerformers) {
1✔
533
        recoverUnsealedTsFileCallBack(recoverPerformer);
×
534
      }
×
535
      for (TsFileResource resource : tsFileManager.getTsFileList(true)) {
1✔
536
        long partitionNum = resource.getTimePartition();
1✔
537
        updatePartitionFileVersion(partitionNum, resource.getVersion());
1✔
538
      }
1✔
539
      for (TsFileResource resource : tsFileManager.getTsFileList(false)) {
1✔
540
        long partitionNum = resource.getTimePartition();
×
541
        updatePartitionFileVersion(partitionNum, resource.getVersion());
×
542
      }
×
543
    } catch (IOException e) {
×
544
      throw new DataRegionException(e);
×
545
    }
1✔
546

547
    // recover and start timed compaction thread
548
    initCompaction();
1✔
549

550
    if (StorageEngine.getInstance().isAllSgReady()) {
1✔
551
      logger.info("The data region {}[{}] is created successfully", databaseName, dataRegionId);
1✔
552
    } else {
553
      logger.info("The data region {}[{}] is recovered successfully", databaseName, dataRegionId);
1✔
554
    }
555
  }
1✔
556

557
  private void updateLastFlushTime(TsFileResource resource, boolean isSeq) {
558
    //  only update flush time when it is a seq file
559
    if (isSeq) {
1✔
560
      long timePartitionId = resource.getTimePartition();
1✔
561
      Map<String, Long> endTimeMap = new HashMap<>();
1✔
562
      for (String deviceId : resource.getDevices()) {
1✔
563
        long endTime = resource.getEndTime(deviceId);
1✔
564
        endTimeMap.put(deviceId.intern(), endTime);
1✔
565
      }
1✔
566
      lastFlushTimeMap.setMultiDeviceFlushedTime(timePartitionId, endTimeMap);
1✔
567
      lastFlushTimeMap.setMultiDeviceGlobalFlushedTime(endTimeMap);
1✔
568
    }
569
  }
1✔
570

571
  private void initCompaction() {
572
    if (!config.isEnableSeqSpaceCompaction()
1✔
573
        && !config.isEnableUnseqSpaceCompaction()
×
574
        && !config.isEnableCrossSpaceCompaction()) {
×
575
      return;
×
576
    }
577
    timedCompactionScheduleTask =
1✔
578
        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
1✔
579
            ThreadName.COMPACTION_SCHEDULE.getName() + "-" + databaseName + "-" + dataRegionId);
1✔
580
    ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
1✔
581
        timedCompactionScheduleTask,
582
        this::executeCompaction,
583
        COMPACTION_TASK_SUBMIT_DELAY,
584
        IoTDBDescriptor.getInstance().getConfig().getCompactionScheduleIntervalInMs(),
1✔
585
        TimeUnit.MILLISECONDS);
586
  }
1✔
587

588
  private void recoverCompaction() {
589
    CompactionRecoverManager compactionRecoverManager =
1✔
590
        new CompactionRecoverManager(tsFileManager, databaseName, dataRegionId);
591
    compactionRecoverManager.recoverInnerSpaceCompaction(true);
1✔
592
    compactionRecoverManager.recoverInnerSpaceCompaction(false);
1✔
593
    compactionRecoverManager.recoverCrossSpaceCompaction();
1✔
594
  }
1✔
595

596
  private void updatePartitionFileVersion(long partitionNum, long fileVersion) {
597
    long oldVersion = partitionMaxFileVersions.getOrDefault(partitionNum, 0L);
1✔
598
    if (fileVersion > oldVersion) {
1✔
599
      partitionMaxFileVersions.put(partitionNum, fileVersion);
1✔
600
    }
601
  }
1✔
602

603
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
604
  private List<TsFileResource> getAllFiles(List<String> folders)
605
      throws IOException, DataRegionException {
606
    // "{partition id}/{tsfile name}" -> tsfile file, remove duplicate files in one time partition
607
    Map<String, File> tsFilePartitionPath2File = new HashMap<>();
1✔
608
    for (String baseDir : folders) {
1✔
609
      File fileFolder = fsFactory.getFile(baseDir + File.separator + databaseName, dataRegionId);
1✔
610
      if (!fileFolder.exists()) {
1✔
611
        continue;
1✔
612
      }
613
      // some TsFileResource may be being persisted when the system crashed, try recovering such
614
      // resources
615
      continueFailedRenames(fileFolder, TEMP_SUFFIX);
1✔
616

617
      File[] subFiles = fileFolder.listFiles();
1✔
618
      if (subFiles != null) {
1✔
619
        for (File partitionFolder : subFiles) {
1✔
620
          if (!partitionFolder.isDirectory()) {
1✔
621
            logger.warn("{} is not a directory.", partitionFolder.getAbsolutePath());
1✔
622
          } else {
623
            // some TsFileResource may be being persisted when the system crashed, try recovering
624
            // such resources
625
            continueFailedRenames(partitionFolder, TEMP_SUFFIX);
1✔
626
            String partitionName = partitionFolder.getName();
1✔
627
            File[] tsFilesInThisFolder =
1✔
628
                fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), TSFILE_SUFFIX);
1✔
629
            for (File f : tsFilesInThisFolder) {
1✔
630
              String tsFilePartitionPath = partitionName + File.separator + f.getName();
1✔
631
              tsFilePartitionPath2File.put(tsFilePartitionPath, f);
1✔
632
            }
633
          }
634
        }
635
      }
636
    }
1✔
637

638
    List<File> sortedFiles = new ArrayList<>(tsFilePartitionPath2File.values());
1✔
639
    sortedFiles.sort(this::compareFileName);
1✔
640

641
    long currentTime = System.currentTimeMillis();
1✔
642
    List<TsFileResource> ret = new ArrayList<>();
1✔
643
    for (File f : sortedFiles) {
1✔
644
      checkTsFileTime(f, currentTime);
1✔
645
      ret.add(new TsFileResource(f));
1✔
646
    }
1✔
647
    return ret;
1✔
648
  }
649

650
  private void continueFailedRenames(File fileFolder, String suffix) throws IOException {
651
    File[] files = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), suffix);
1✔
652
    if (files != null) {
1✔
653
      for (File tempResource : files) {
1✔
654
        File originResource = fsFactory.getFile(tempResource.getPath().replace(suffix, ""));
×
655
        if (originResource.exists()) {
×
656
          Files.delete(tempResource.toPath());
×
657
        } else {
658
          Files.move(tempResource.toPath(), originResource.toPath());
×
659
        }
660
      }
661
    }
662
  }
1✔
663

664
  /** check if the tsfile's time is smaller than system current time. */
665
  private void checkTsFileTime(File tsFile, long currentTime) throws DataRegionException {
666
    String[] items = tsFile.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
1✔
667
    long fileTime = Long.parseLong(items[0]);
1✔
668
    if (fileTime > currentTime) {
1✔
669
      throw new DataRegionException(
×
670
          String.format(
×
671
              "data region %s[%s] is down, because the time of tsfile %s is larger than system current time, "
672
                  + "file time is %d while system current time is %d, please check it.",
673
              databaseName, dataRegionId, tsFile.getAbsolutePath(), fileTime, currentTime));
×
674
    }
675
  }
1✔
676

677
  /** submit unsealed TsFile to WALRecoverManager. */
678
  private WALRecoverListener recoverUnsealedTsFile(
679
      TsFileResource unsealedTsFile, DataRegionRecoveryContext context, boolean isSeq) {
680
    UnsealedTsFileRecoverPerformer recoverPerformer =
×
681
        new UnsealedTsFileRecoverPerformer(unsealedTsFile, isSeq, context.recoverPerformers::add);
×
682
    // remember to close UnsealedTsFileRecoverPerformer
683
    return WALRecoverManager.getInstance().addRecoverPerformer(recoverPerformer);
×
684
  }
685

686
  private void recoverUnsealedTsFileCallBack(UnsealedTsFileRecoverPerformer recoverPerformer) {
687
    try {
688
      TsFileResource tsFileResource = recoverPerformer.getTsFileResource();
×
689
      boolean isSeq = recoverPerformer.isSequence();
×
690
      if (!recoverPerformer.canWrite()) {
×
691
        // cannot write, just close it
692
        try {
693
          tsFileResource.close();
×
694
        } catch (IOException e) {
×
695
          logger.error("Fail to close TsFile {} when recovering", tsFileResource.getTsFile(), e);
×
696
        }
×
697
        updateLastFlushTime(tsFileResource, isSeq);
×
698
        tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
×
699
        FileMetrics.getInstance()
×
700
            .addFile(
×
701
                tsFileResource.getTsFile().length(),
×
702
                recoverPerformer.isSequence(),
×
703
                tsFileResource.getTsFile().getName());
×
704
      } else {
705
        // the last file is not closed, continue writing to it
706
        RestorableTsFileIOWriter writer = recoverPerformer.getWriter();
×
707
        long timePartitionId = tsFileResource.getTimePartition();
×
708
        TimePartitionManager.getInstance()
×
709
            .updateAfterOpeningTsFileProcessor(
×
710
                new DataRegionId(Integer.parseInt(dataRegionId)), timePartitionId);
×
711
        TsFileProcessor tsFileProcessor =
×
712
            new TsFileProcessor(
713
                dataRegionId,
714
                dataRegionInfo,
715
                tsFileResource,
716
                this::closeUnsealedTsFileProcessorCallBack,
717
                isSeq ? this::sequenceFlushCallback : this::unsequenceFlushCallback,
×
718
                isSeq,
719
                writer);
720
        if (isSeq) {
×
721
          workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
×
722
        } else {
723
          workUnsequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
×
724
        }
725
        tsFileResource.setProcessor(tsFileProcessor);
×
726
        tsFileResource.removeResourceFile();
×
727
        tsFileProcessor.setTimeRangeId(timePartitionId);
×
728
        writer.makeMetadataVisible();
×
729
        if (enableMemControl) {
×
730
          TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(dataRegionInfo);
×
731
          tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
×
732
          this.dataRegionInfo.initTsFileProcessorInfo(tsFileProcessor);
×
733
          // get chunkMetadata size
734
          long chunkMetadataSize = 0;
×
735
          for (Map<String, List<ChunkMetadata>> metaMap : writer.getMetadatasForQuery().values()) {
×
736
            for (List<ChunkMetadata> metadatas : metaMap.values()) {
×
737
              for (ChunkMetadata chunkMetadata : metadatas) {
×
738
                chunkMetadataSize += chunkMetadata.calculateRamSize();
×
739
              }
×
740
            }
×
741
          }
×
742
          tsFileProcessorInfo.addTSPMemCost(chunkMetadataSize);
×
743
        }
744
      }
745
      tsFileManager.add(tsFileResource, recoverPerformer.isSequence());
×
746
    } catch (Throwable e) {
×
747
      logger.error(
×
748
          "Fail to recover unsealed TsFile {}, skip it.",
749
          recoverPerformer.getTsFileAbsolutePath(),
×
750
          e);
751
    }
×
752
  }
×
753

754
  /** recover sealed TsFile. */
755
  private void recoverSealedTsFiles(
756
      TsFileResource sealedTsFile, DataRegionRecoveryContext context, boolean isSeq) {
757
    try (SealedTsFileRecoverPerformer recoverPerformer =
1✔
758
        new SealedTsFileRecoverPerformer(sealedTsFile)) {
759
      recoverPerformer.recover();
1✔
760
      // pick up crashed compaction target files
761
      if (recoverPerformer.hasCrashed()) {
1✔
762
        if (TsFileResource.getInnerCompactionCount(sealedTsFile.getTsFile().getName()) > 0) {
×
763
          tsFileManager.addForRecover(sealedTsFile, isSeq);
×
764
          return;
×
765
        } else {
766
          logger.warn(
×
767
              "Sealed TsFile {} has crashed at zero level, truncate and recover it.",
768
              sealedTsFile.getTsFilePath());
×
769
        }
770
      }
771
      sealedTsFile.close();
1✔
772
      tsFileManager.add(sealedTsFile, isSeq);
1✔
773
      tsFileResourceManager.registerSealedTsFileResource(sealedTsFile);
1✔
774
    } catch (Throwable e) {
×
775
      logger.error("Fail to recover sealed TsFile {}, skip it.", sealedTsFile.getTsFilePath(), e);
×
776
    } finally {
777
      // update recovery context
778
      context.incrementRecoveredFilesNum();
1✔
779
    }
780
  }
1✔
781

782
  private void recoverFilesInPartition(
783
      long partitionId,
784
      DataRegionRecoveryContext context,
785
      List<TsFileResource> resourceList,
786
      boolean isSeq,
787
      boolean isLatestPartition) {
788
    for (TsFileResource tsFileResource : resourceList) {
1✔
789
      recoverSealedTsFiles(tsFileResource, context, isSeq);
1✔
790
    }
1✔
791
    if (isLatestPartition && isSeq) {
1✔
792
      lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId);
1✔
793
      for (TsFileResource tsFileResource : resourceList) {
1✔
794
        updateLastFlushTime(tsFileResource, true);
1✔
795
      }
1✔
796
      TimePartitionManager.getInstance()
1✔
797
          .registerTimePartitionInfo(
1✔
798
              new TimePartitionInfo(
799
                  new DataRegionId(Integer.parseInt(dataRegionId)),
1✔
800
                  partitionId,
801
                  false,
802
                  Long.MAX_VALUE,
803
                  lastFlushTimeMap.getMemSize(partitionId),
1✔
804
                  true));
805
    }
806
  }
1✔
807

808
  // ({systemTime}-{versionNum}-{mergeNum}.tsfile)
809
  private int compareFileName(File o1, File o2) {
810
    String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
1✔
811
    String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
1✔
812
    long ver1 = Long.parseLong(items1[0]);
1✔
813
    long ver2 = Long.parseLong(items2[0]);
1✔
814
    int cmp = Long.compare(ver1, ver2);
1✔
815
    if (cmp == 0) {
1✔
816
      return Long.compare(Long.parseLong(items1[1]), Long.parseLong(items2[1]));
×
817
    } else {
818
      return cmp;
1✔
819
    }
820
  }
821

822
  /**
823
   * insert one row of data.
824
   *
825
   * @param insertRowNode one row of data
826
   */
827
  public void insert(InsertRowNode insertRowNode) throws WriteProcessException {
828
    // reject insertions that are out of ttl
829
    if (!isAlive(insertRowNode.getTime())) {
1✔
830
      throw new OutOfTTLException(insertRowNode.getTime(), (DateTimeUtils.currentTime() - dataTTL));
1✔
831
    }
832
    if (enableMemControl) {
1✔
833
      StorageEngine.blockInsertionIfReject(null);
1✔
834
    }
835
    long startTime = System.nanoTime();
1✔
836
    writeLock("InsertRow");
1✔
837
    PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime);
1✔
838
    try {
839
      if (deleted) {
1✔
840
        return;
×
841
      }
842
      // init map
843
      long timePartitionId = StorageEngine.getTimePartition(insertRowNode.getTime());
1✔
844

845
      if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
1✔
846
        TimePartitionManager.getInstance()
1✔
847
            .registerTimePartitionInfo(
1✔
848
                new TimePartitionInfo(
849
                    new DataRegionId(Integer.parseInt(dataRegionId)),
1✔
850
                    timePartitionId,
851
                    true,
852
                    Long.MAX_VALUE,
853
                    0,
854
                    tsFileManager.isLatestTimePartition(timePartitionId)));
1✔
855
      }
856

857
      boolean isSequence =
1✔
858
          insertRowNode.getTime()
1✔
859
              > lastFlushTimeMap.getFlushedTime(
1✔
860
                  timePartitionId, insertRowNode.getDevicePath().getFullPath());
1✔
861

862
      // is unsequence and user set config to discard out of order data
863
      if (!isSequence
1✔
864
          && IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
1✔
865
        return;
1✔
866
      }
867

868
      // insert to sequence or unSequence file
869
      insertToTsFileProcessor(insertRowNode, isSequence, timePartitionId);
1✔
870
    } finally {
871
      writeUnlock();
1✔
872
    }
873
  }
1✔
874

875
  /**
876
   * Insert a tablet (rows belonging to the same devices) into this database.
877
   *
878
   * @throws BatchProcessException if some of the rows failed to be inserted
879
   */
880
  @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive Complexity warning
881
  public void insertTablet(InsertTabletNode insertTabletNode)
882
      throws BatchProcessException, WriteProcessException {
883
    if (enableMemControl) {
1✔
884
      StorageEngine.blockInsertionIfReject(null);
1✔
885
    }
886
    long startTime = System.nanoTime();
1✔
887
    writeLock("insertTablet");
1✔
888
    PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime);
1✔
889
    try {
890
      if (deleted) {
1✔
891
        return;
×
892
      }
893
      TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
1✔
894
      Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
1✔
895
      boolean noFailure = true;
1✔
896

897
      /*
898
       * assume that batch has been sorted by client
899
       */
900
      int loc = 0;
1✔
901
      while (loc < insertTabletNode.getRowCount()) {
1✔
902
        long currTime = insertTabletNode.getTimes()[loc];
1✔
903
        // skip points that do not satisfy TTL
904
        if (!isAlive(currTime)) {
1✔
905
          results[loc] =
×
906
              RpcUtils.getStatus(
×
907
                  TSStatusCode.OUT_OF_TTL,
908
                  String.format(
×
909
                      "Insertion time [%s] is less than ttl time bound [%s]",
910
                      DateTimeUtils.convertLongToDate(currTime),
×
911
                      DateTimeUtils.convertLongToDate(DateTimeUtils.currentTime() - dataTTL)));
×
912
          loc++;
×
913
          noFailure = false;
×
914
        } else {
915
          break;
916
        }
917
      }
×
918
      // loc pointing at first legal position
919
      if (loc == insertTabletNode.getRowCount()) {
1✔
920
        throw new OutOfTTLException(
×
921
            insertTabletNode.getTimes()[insertTabletNode.getTimes().length - 1],
×
922
            (DateTimeUtils.currentTime() - dataTTL));
×
923
      }
924
      // before is first start point
925
      int before = loc;
1✔
926
      // before time partition
927
      long beforeTimePartition =
1✔
928
          StorageEngine.getTimePartition(insertTabletNode.getTimes()[before]);
1✔
929
      // init map
930

931
      if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(beforeTimePartition)) {
1✔
932
        TimePartitionManager.getInstance()
1✔
933
            .registerTimePartitionInfo(
1✔
934
                new TimePartitionInfo(
935
                    new DataRegionId(Integer.parseInt(dataRegionId)),
1✔
936
                    beforeTimePartition,
937
                    true,
938
                    Long.MAX_VALUE,
939
                    0,
940
                    tsFileManager.isLatestTimePartition(beforeTimePartition)));
1✔
941
      }
942

943
      long lastFlushTime =
1✔
944
          lastFlushTimeMap.getFlushedTime(
1✔
945
              beforeTimePartition, insertTabletNode.getDevicePath().getFullPath());
1✔
946

947
      // if is sequence
948
      boolean isSequence = false;
1✔
949
      while (loc < insertTabletNode.getRowCount()) {
1✔
950
        long time = insertTabletNode.getTimes()[loc];
1✔
951
        // always in some time partition
952
        // judge if we should insert sequence
953
        if (!isSequence && time > lastFlushTime) {
1✔
954
          // insert into unsequence and then start sequence
955
          if (!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
1✔
956
            noFailure =
1✔
957
                insertTabletToTsFileProcessor(
1✔
958
                        insertTabletNode, before, loc, false, results, beforeTimePartition)
959
                    && noFailure;
960
          }
961
          before = loc;
1✔
962
          isSequence = true;
1✔
963
        }
964
        loc++;
1✔
965
      }
1✔
966

967
      // do not forget last part
968
      if (before < loc
1✔
969
          && (isSequence
970
              || !IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData())) {
×
971
        noFailure =
1✔
972
            insertTabletToTsFileProcessor(
1✔
973
                    insertTabletNode, before, loc, isSequence, results, beforeTimePartition)
974
                && noFailure;
975
      }
976
      long globalLatestFlushedTime =
1✔
977
          lastFlushTimeMap.getGlobalFlushedTime(insertTabletNode.getDevicePath().getFullPath());
1✔
978
      startTime = System.nanoTime();
1✔
979
      tryToUpdateBatchInsertLastCache(insertTabletNode, globalLatestFlushedTime);
1✔
980
      PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime() - startTime);
1✔
981

982
      if (!noFailure) {
1✔
983
        throw new BatchProcessException(results);
×
984
      }
985
    } finally {
986
      writeUnlock();
1✔
987
    }
988
  }
1✔
989

990
  /**
991
   * Check whether the time falls in TTL.
992
   *
993
   * @return whether the given time falls in ttl
994
   */
995
  private boolean isAlive(long time) {
996
    return dataTTL == Long.MAX_VALUE || (DateTimeUtils.currentTime() - time) <= dataTTL;
1✔
997
  }
998

999
  /**
1000
   * insert batch to tsfile processor thread-safety that the caller need to guarantee The rows to be
1001
   * inserted are in the range [start, end) Null value in each column values will be replaced by the
1002
   * subsequent non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, null, 5}
1003
   *
1004
   * @param insertTabletNode insert a tablet of a device
1005
   * @param sequence whether is sequence
1006
   * @param start start index of rows to be inserted in insertTabletPlan
1007
   * @param end end index of rows to be inserted in insertTabletPlan
1008
   * @param results result array
1009
   * @param timePartitionId time partition id
1010
   * @return false if any failure occurs when inserting the tablet, true otherwise
1011
   */
1012
  private boolean insertTabletToTsFileProcessor(
1013
      InsertTabletNode insertTabletNode,
1014
      int start,
1015
      int end,
1016
      boolean sequence,
1017
      TSStatus[] results,
1018
      long timePartitionId) {
1019
    // return when start >= end
1020
    if (start >= end) {
1✔
1021
      return true;
1✔
1022
    }
1023

1024
    TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
1✔
1025
    if (tsFileProcessor == null) {
1✔
1026
      for (int i = start; i < end; i++) {
×
1027
        results[i] =
×
1028
            RpcUtils.getStatus(
×
1029
                TSStatusCode.INTERNAL_SERVER_ERROR,
1030
                "can not create TsFileProcessor, timePartitionId: " + timePartitionId);
1031
      }
1032
      return false;
×
1033
    }
1034

1035
    try {
1036
      tsFileProcessor.insertTablet(insertTabletNode, start, end, results);
1✔
1037
    } catch (WriteProcessRejectException e) {
×
1038
      logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage());
×
1039
      return false;
×
1040
    } catch (WriteProcessException e) {
×
1041
      logger.error("insert to TsFileProcessor error ", e);
×
1042
      return false;
×
1043
    }
1✔
1044

1045
    // check memtable size and may async try to flush the work memtable
1046
    if (tsFileProcessor.shouldFlush()) {
1✔
1047
      fileFlushPolicy.apply(this, tsFileProcessor, sequence);
×
1048
    }
1049
    return true;
1✔
1050
  }
1051

1052
  private void tryToUpdateBatchInsertLastCache(InsertTabletNode node, long latestFlushedTime) {
1053
    if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
1✔
1054
        || (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
1✔
1055
            && node.isSyncFromLeaderWhenUsingIoTConsensus())) {
×
1056
      // disable updating last cache on follower
1057
      return;
×
1058
    }
1059
    String[] measurements = node.getMeasurements();
1✔
1060
    MeasurementSchema[] measurementSchemas = node.getMeasurementSchemas();
1✔
1061
    String[] rawMeasurements = new String[measurements.length];
1✔
1062
    for (int i = 0; i < measurements.length; i++) {
1✔
1063
      if (measurementSchemas[i] != null) {
1✔
1064
        // get raw measurement rather than alias
1065
        rawMeasurements[i] = measurementSchemas[i].getMeasurementId();
1✔
1066
      } else {
1067
        rawMeasurements[i] = measurements[i];
×
1068
      }
1069
    }
1070
    DataNodeSchemaCache.getInstance()
1✔
1071
        .updateLastCache(
1✔
1072
            getDatabaseName(),
1✔
1073
            node.getDevicePath(),
1✔
1074
            rawMeasurements,
1075
            node.getMeasurementSchemas(),
1✔
1076
            node.isAligned(),
1✔
1077
            node::composeLastTimeValuePair,
1✔
1078
            index -> node.getColumns()[index] != null,
1✔
1079
            true,
1080
            latestFlushedTime);
1✔
1081
  }
1✔
1082

1083
  private void insertToTsFileProcessor(
1084
      InsertRowNode insertRowNode, boolean sequence, long timePartitionId)
1085
      throws WriteProcessException {
1086
    TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
1✔
1087
    if (tsFileProcessor == null) {
1✔
1088
      return;
×
1089
    }
1090

1091
    tsFileProcessor.insert(insertRowNode);
1✔
1092

1093
    long globalLatestFlushTime =
1✔
1094
        lastFlushTimeMap.getGlobalFlushedTime(insertRowNode.getDevicePath().getFullPath());
1✔
1095

1096
    long startTime = System.nanoTime();
1✔
1097
    tryToUpdateInsertLastCache(insertRowNode, globalLatestFlushTime);
1✔
1098
    PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime() - startTime);
1✔
1099

1100
    // check memtable size and may asyncTryToFlush the work memtable
1101
    if (tsFileProcessor.shouldFlush()) {
1✔
1102
      fileFlushPolicy.apply(this, tsFileProcessor, sequence);
×
1103
    }
1104
  }
1✔
1105

1106
  private void tryToUpdateInsertLastCache(InsertRowNode node, long latestFlushedTime) {
1107
    if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
1✔
1108
        || (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
1✔
1109
            && node.isSyncFromLeaderWhenUsingIoTConsensus())) {
×
1110
      // disable updating last cache on follower
1111
      return;
×
1112
    }
1113
    String[] measurements = node.getMeasurements();
1✔
1114
    MeasurementSchema[] measurementSchemas = node.getMeasurementSchemas();
1✔
1115
    String[] rawMeasurements = new String[measurements.length];
1✔
1116
    for (int i = 0; i < measurements.length; i++) {
1✔
1117
      if (measurementSchemas[i] != null) {
1✔
1118
        // get raw measurement rather than alias
1119
        rawMeasurements[i] = measurementSchemas[i].getMeasurementId();
1✔
1120
      } else {
1121
        rawMeasurements[i] = measurements[i];
×
1122
      }
1123
    }
1124
    DataNodeSchemaCache.getInstance()
1✔
1125
        .updateLastCache(
1✔
1126
            getDatabaseName(),
1✔
1127
            node.getDevicePath(),
1✔
1128
            rawMeasurements,
1129
            node.getMeasurementSchemas(),
1✔
1130
            node.isAligned(),
1✔
1131
            node::composeTimeValuePair,
1✔
1132
            index -> node.getValues()[index] != null,
1✔
1133
            true,
1134
            latestFlushedTime);
1✔
1135
  }
1✔
1136

1137
  /**
1138
   * WAL module uses this method to flush memTable
1139
   *
1140
   * @return True if flush task is submitted successfully
1141
   */
1142
  public boolean submitAFlushTask(long timeRangeId, boolean sequence, IMemTable memTable) {
1143
    writeLock("submitAFlushTask");
×
1144
    try {
1145
      if (memTable.getFlushStatus() != FlushStatus.WORKING) {
×
1146
        return false;
×
1147
      }
1148

1149
      TsFileProcessor tsFileProcessor;
1150
      if (sequence) {
×
1151
        tsFileProcessor = workSequenceTsFileProcessors.get(timeRangeId);
×
1152
      } else {
1153
        tsFileProcessor = workUnsequenceTsFileProcessors.get(timeRangeId);
×
1154
      }
1155
      // only submit when tsFileProcessor exists and memTables are same
1156
      boolean shouldSubmit =
×
1157
          tsFileProcessor != null && tsFileProcessor.getWorkMemTable() == memTable;
×
1158
      if (shouldSubmit) {
×
1159
        fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
×
1160
      }
1161
      return shouldSubmit;
×
1162
    } finally {
1163
      writeUnlock();
×
1164
    }
1165
  }
1166

1167
  /**
1168
   * mem control module uses this method to flush memTable
1169
   *
1170
   * @param tsFileProcessor tsfile processor in which memTable to be flushed
1171
   */
1172
  public void submitAFlushTaskWhenShouldFlush(TsFileProcessor tsFileProcessor) {
1173
    writeLock("submitAFlushTaskWhenShouldFlush");
×
1174
    try {
1175
      // check memtable size and may asyncTryToFlush the work memtable
1176
      if (tsFileProcessor.shouldFlush()) {
×
1177
        fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
×
1178
      }
1179
    } finally {
1180
      writeUnlock();
×
1181
    }
1182
  }
×
1183

1184
  private TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean sequence) {
1185
    TsFileProcessor tsFileProcessor = null;
1✔
1186
    int retryCnt = 0;
1✔
1187
    do {
1188
      try {
1189
        if (IoTDBDescriptor.getInstance().getConfig().isQuotaEnable()) {
1✔
1190
          if (!DataNodeSpaceQuotaManager.getInstance().checkRegionDisk(databaseName)) {
×
1191
            throw new ExceedQuotaException(
×
1192
                "Unable to continue writing data, because the space allocated to the database "
1193
                    + databaseName
1194
                    + " has already used the upper limit",
1195
                TSStatusCode.SPACE_QUOTA_EXCEEDED.getStatusCode());
×
1196
          }
1197
        }
1198
        if (sequence) {
1✔
1199
          tsFileProcessor =
1✔
1200
              getOrCreateTsFileProcessorIntern(timeRangeId, workSequenceTsFileProcessors, true);
1✔
1201
        } else {
1202
          tsFileProcessor =
1✔
1203
              getOrCreateTsFileProcessorIntern(timeRangeId, workUnsequenceTsFileProcessors, false);
1✔
1204
        }
1205
      } catch (DiskSpaceInsufficientException e) {
×
1206
        logger.error(
×
1207
            "disk space is insufficient when creating TsFile processor, change system mode to read-only",
1208
            e);
1209
        CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
×
1210
        break;
×
1211
      } catch (IOException e) {
×
1212
        if (retryCnt < 3) {
×
1213
          logger.warn("meet IOException when creating TsFileProcessor, retry it again", e);
×
1214
          retryCnt++;
×
1215
        } else {
1216
          logger.error(
×
1217
              "meet IOException when creating TsFileProcessor, change system mode to error", e);
1218
          CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
×
1219
          break;
×
1220
        }
1221
      } catch (ExceedQuotaException e) {
×
1222
        logger.error(e.getMessage());
×
1223
        break;
×
1224
      }
1✔
1225
    } while (tsFileProcessor == null);
1✔
1226
    return tsFileProcessor;
1✔
1227
  }
1228

1229
  /**
1230
   * get processor from hashmap, flush oldest processor if necessary
1231
   *
1232
   * @param timeRangeId time partition range
1233
   * @param tsFileProcessorTreeMap tsFileProcessorTreeMap
1234
   * @param sequence whether is sequence or not
1235
   */
1236
  private TsFileProcessor getOrCreateTsFileProcessorIntern(
1237
      long timeRangeId, TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap, boolean sequence)
1238
      throws IOException, DiskSpaceInsufficientException {
1239

1240
    TsFileProcessor res = tsFileProcessorTreeMap.get(timeRangeId);
1✔
1241

1242
    if (null == res) {
1✔
1243
      // build new processor, memory control module will control the number of memtables
1244
      TimePartitionManager.getInstance()
1✔
1245
          .updateAfterOpeningTsFileProcessor(
1✔
1246
              new DataRegionId(Integer.valueOf(dataRegionId)), timeRangeId);
1✔
1247
      res = newTsFileProcessor(sequence, timeRangeId);
1✔
1248
      tsFileProcessorTreeMap.put(timeRangeId, res);
1✔
1249
      tsFileManager.add(res.getTsFileResource(), sequence);
1✔
1250
    }
1251

1252
    return res;
1✔
1253
  }
1254

1255
  private TsFileProcessor newTsFileProcessor(boolean sequence, long timePartitionId)
1256
      throws IOException, DiskSpaceInsufficientException {
1257

1258
    long version = partitionMaxFileVersions.getOrDefault(timePartitionId, 0L) + 1;
1✔
1259
    partitionMaxFileVersions.put(timePartitionId, version);
1✔
1260
    String filePath =
1✔
1261
        TsFileNameGenerator.generateNewTsFilePathWithMkdir(
1✔
1262
            sequence,
1263
            databaseName,
1264
            dataRegionId,
1265
            timePartitionId,
1266
            System.currentTimeMillis(),
1✔
1267
            version,
1268
            0,
1269
            0);
1270

1271
    return getTsFileProcessor(sequence, filePath, timePartitionId);
1✔
1272
  }
1273

1274
  private TsFileProcessor getTsFileProcessor(
1275
      boolean sequence, String filePath, long timePartitionId) throws IOException {
1276
    TsFileProcessor tsFileProcessor;
1277
    if (sequence) {
1✔
1278
      tsFileProcessor =
1✔
1279
          new TsFileProcessor(
1280
              databaseName + FILE_NAME_SEPARATOR + dataRegionId,
1281
              fsFactory.getFileWithParent(filePath),
1✔
1282
              dataRegionInfo,
1283
              this::closeUnsealedTsFileProcessorCallBack,
1284
              this::sequenceFlushCallback,
1285
              true);
1286
    } else {
1287
      tsFileProcessor =
1✔
1288
          new TsFileProcessor(
1289
              databaseName + FILE_NAME_SEPARATOR + dataRegionId,
1290
              fsFactory.getFileWithParent(filePath),
1✔
1291
              dataRegionInfo,
1292
              this::closeUnsealedTsFileProcessorCallBack,
1293
              this::unsequenceFlushCallback,
1294
              false);
1295
    }
1296

1297
    if (enableMemControl) {
1✔
1298
      TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(dataRegionInfo);
1✔
1299
      tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
1✔
1300
      this.dataRegionInfo.initTsFileProcessorInfo(tsFileProcessor);
1✔
1301
    }
1302

1303
    tsFileProcessor.addCloseFileListeners(customCloseFileListeners);
1✔
1304
    tsFileProcessor.addFlushListeners(customFlushListeners);
1✔
1305
    tsFileProcessor.setTimeRangeId(timePartitionId);
1✔
1306

1307
    return tsFileProcessor;
1✔
1308
  }
1309

1310
  /**
1311
   * Create a new tsfile name
1312
   *
1313
   * @return file name
1314
   */
1315
  private String getNewTsFileName(long timePartitionId) {
1316
    long version = partitionMaxFileVersions.getOrDefault(timePartitionId, 0L) + 1;
×
1317
    partitionMaxFileVersions.put(timePartitionId, version);
×
1318
    return getNewTsFileName(System.currentTimeMillis(), version, 0, 0);
×
1319
  }
1320

1321
  private String getNewTsFileName(long time, long version, int mergeCnt, int unseqCompactionCnt) {
1322
    return TsFileNameGenerator.generateNewTsFileName(time, version, mergeCnt, unseqCompactionCnt);
×
1323
  }
1324

1325
  /**
1326
   * close one tsfile processor
1327
   *
1328
   * @param sequence whether this tsfile processor is sequence or not
1329
   * @param tsFileProcessor tsfile processor
1330
   */
1331
  public void syncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
1332
    synchronized (closeStorageGroupCondition) {
×
1333
      try {
1334
        asyncCloseOneTsFileProcessor(sequence, tsFileProcessor);
×
1335
        long startTime = System.currentTimeMillis();
×
1336
        while (closingSequenceTsFileProcessor.contains(tsFileProcessor)
×
1337
            || closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
×
1338
          closeStorageGroupCondition.wait(60_000);
×
1339
          if (System.currentTimeMillis() - startTime > 60_000) {
×
1340
            logger.warn(
×
1341
                "{} has spent {}s to wait for closing one tsfile.",
1342
                databaseName + "-" + this.dataRegionId,
1343
                (System.currentTimeMillis() - startTime) / 1000);
×
1344
          }
1345
        }
1346
      } catch (InterruptedException e) {
×
1347
        Thread.currentThread().interrupt();
×
1348
        logger.error(
×
1349
            "syncCloseOneTsFileProcessor error occurs while waiting for closing the storage "
1350
                + "group {}",
1351
            databaseName + "-" + dataRegionId,
1352
            e);
1353
      }
×
1354
    }
×
1355
  }
×
1356

1357
  /**
1358
   * close one tsfile processor, thread-safety should be ensured by caller
1359
   *
1360
   * @param sequence whether this tsfile processor is sequence or not
1361
   * @param tsFileProcessor tsfile processor
1362
   */
1363
  public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
1364
    // for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
1365
    // for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
1366
    if (closingSequenceTsFileProcessor.contains(tsFileProcessor)
1✔
1367
        || closingUnSequenceTsFileProcessor.contains(tsFileProcessor)
1✔
1368
        || tsFileProcessor.alreadyMarkedClosing()) {
1✔
1369
      return;
×
1370
    }
1371
    logger.info(
1✔
1372
        "Async close tsfile: {}",
1373
        tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
1✔
1374

1375
    if (sequence) {
1✔
1376
      closingSequenceTsFileProcessor.add(tsFileProcessor);
1✔
1377
      tsFileProcessor.asyncClose();
1✔
1378

1379
      workSequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
1✔
1380
      // if unsequence files don't contain this time range id, we should remove it's version
1381
      // controller
1382
      if (!workUnsequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId())) {
1✔
1383
        timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId());
1✔
1384
      }
1385
      logger.info("close a sequence tsfile processor {}", databaseName + "-" + dataRegionId);
1✔
1386
    } else {
1387
      closingUnSequenceTsFileProcessor.add(tsFileProcessor);
1✔
1388
      tsFileProcessor.asyncClose();
1✔
1389

1390
      workUnsequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
1✔
1391
      // if sequence files don't contain this time range id, we should remove it's version
1392
      // controller
1393
      if (!workSequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId())) {
1✔
1394
        timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId());
1✔
1395
      }
1396
    }
1397
  }
1✔
1398

1399
  /**
1400
   * delete the database's own folder in folder data/system/databases
1401
   *
1402
   * @param systemDir system dir
1403
   */
1404
  public void deleteFolder(String systemDir) {
1405
    logger.info(
1✔
1406
        "{} will close all files for deleting data folder {}",
1407
        databaseName + "-" + dataRegionId,
1408
        systemDir);
1409
    writeLock("deleteFolder");
1✔
1410
    try {
1411
      File dataRegionSystemFolder =
1✔
1412
          SystemFileFactory.INSTANCE.getFile(
1✔
1413
              systemDir + File.separator + databaseName, dataRegionId);
1414
      org.apache.iotdb.commons.utils.FileUtils.deleteDirectoryAndEmptyParent(
1✔
1415
          dataRegionSystemFolder);
1416
    } finally {
1417
      writeUnlock();
1✔
1418
    }
1419
  }
1✔
1420

1421
  /** close all tsfile resource */
1422
  public void closeAllResources() {
1423
    for (TsFileResource tsFileResource : tsFileManager.getTsFileList(false)) {
1✔
1424
      try {
1425
        tsFileResource.close();
1✔
1426
      } catch (IOException e) {
×
1427
        logger.error("Cannot close a TsFileResource {}", tsFileResource, e);
×
1428
      }
1✔
1429
    }
1✔
1430
    for (TsFileResource tsFileResource : tsFileManager.getTsFileList(true)) {
1✔
1431
      try {
1432
        tsFileResource.close();
1✔
1433
      } catch (IOException e) {
×
1434
        logger.error("Cannot close a TsFileResource {}", tsFileResource, e);
×
1435
      }
1✔
1436
    }
1✔
1437
  }
1✔
1438

1439
  /** delete tsfile */
1440
  public void syncDeleteDataFiles() {
1441
    logger.info(
1✔
1442
        "{} will close all files for deleting data files", databaseName + "-" + dataRegionId);
1443
    writeLock("syncDeleteDataFiles");
1✔
1444
    try {
1445

1446
      syncCloseAllWorkingTsFileProcessors();
1✔
1447
      // normally, mergingModification is just need to be closed by after a merge task is finished.
1448
      // we close it here just for IT test.
1449
      closeAllResources();
1✔
1450
      List<TsFileResource> tsFileResourceList = tsFileManager.getTsFileList(true);
1✔
1451
      tsFileResourceList.addAll(tsFileManager.getTsFileList(false));
1✔
1452
      tsFileResourceList.forEach(
1✔
1453
          x -> {
1454
            FileMetrics.getInstance()
1✔
1455
                .deleteFile(
1✔
1456
                    new long[] {x.getTsFileSize()},
1✔
1457
                    x.isSeq(),
1✔
1458
                    Collections.singletonList(x.getTsFile().getName()));
1✔
1459
            if (x.getModFile().exists()) {
1✔
1460
              FileMetrics.getInstance().decreaseModFileNum(1);
1✔
1461
              FileMetrics.getInstance().decreaseModFileSize(x.getModFile().getSize());
1✔
1462
            }
1463
          });
1✔
1464
      deleteAllSGFolders(TierManager.getInstance().getAllFilesFolders());
1✔
1465

1466
      this.workSequenceTsFileProcessors.clear();
1✔
1467
      this.workUnsequenceTsFileProcessors.clear();
1✔
1468
      this.tsFileManager.clear();
1✔
1469
      lastFlushTimeMap.clearFlushedTime();
1✔
1470
      lastFlushTimeMap.clearGlobalFlushedTime();
1✔
1471
    } finally {
1472
      writeUnlock();
1✔
1473
    }
1474
  }
1✔
1475

1476
  private void deleteAllSGFolders(List<String> folder) {
1477
    for (String tsfilePath : folder) {
1✔
1478
      File dataRegionDataFolder =
1✔
1479
          fsFactory.getFile(tsfilePath, databaseName + File.separator + dataRegionId);
1✔
1480
      if (FSUtils.getFSType(dataRegionDataFolder) != FSType.LOCAL) {
1✔
1481
        try {
1482
          fsFactory.deleteDirectory(dataRegionDataFolder.getPath());
×
1483
        } catch (IOException e) {
×
1484
          logger.error("Fail to delete data region folder {}", dataRegionDataFolder);
×
1485
        }
×
1486
      } else {
1487
        if (dataRegionDataFolder.exists()) {
1✔
1488
          org.apache.iotdb.commons.utils.FileUtils.deleteDirectoryAndEmptyParent(
1✔
1489
              dataRegionDataFolder);
1490
        }
1491
      }
1492
    }
1✔
1493
  }
1✔
1494

1495
  /** Iterate each TsFile and try to lock and remove those out of TTL. */
1496
  public synchronized void checkFilesTTL() {
1497
    if (dataTTL == Long.MAX_VALUE) {
1✔
1498
      logger.debug("{}: TTL not set, ignore the check", databaseName + "-" + dataRegionId);
×
1499
      return;
×
1500
    }
1501
    long ttlLowerBound = DateTimeUtils.currentTime() - dataTTL;
1✔
1502
    logger.debug(
1✔
1503
        "{}: TTL removing files before {}",
1504
        databaseName + "-" + dataRegionId,
1505
        new Date(ttlLowerBound));
1506

1507
    // copy to avoid concurrent modification of deletion
1508
    List<TsFileResource> seqFiles = new ArrayList<>(tsFileManager.getTsFileList(true));
1✔
1509
    List<TsFileResource> unseqFiles = new ArrayList<>(tsFileManager.getTsFileList(false));
1✔
1510

1511
    for (TsFileResource tsFileResource : seqFiles) {
1✔
1512
      checkFileTTL(tsFileResource, ttlLowerBound, true);
1✔
1513
    }
1✔
1514
    for (TsFileResource tsFileResource : unseqFiles) {
1✔
1515
      checkFileTTL(tsFileResource, ttlLowerBound, false);
1✔
1516
    }
1✔
1517
  }
1✔
1518

1519
  private void checkFileTTL(TsFileResource resource, long ttlLowerBound, boolean isSeq) {
1520
    if (!resource.isClosed() || !resource.isDeleted() && resource.stillLives(ttlLowerBound)) {
1✔
1521
      return;
×
1522
    }
1523
    // Try to set the resource to DELETED status and return if it failed
1524
    if (!resource.setStatus(TsFileResourceStatus.DELETED)) {
1✔
1525
      return;
×
1526
    }
1527
    tsFileManager.remove(resource, isSeq);
1✔
1528
    // ensure that the file is not used by any queries
1529
    resource.writeLock();
1✔
1530
    try {
1531
      // try to delete physical data file
1532
      resource.remove();
1✔
1533
      FileMetrics.getInstance()
1✔
1534
          .deleteFile(
1✔
1535
              new long[] {resource.getTsFileSize()},
1✔
1536
              isSeq,
1537
              Collections.singletonList(resource.getTsFile().getName()));
1✔
1538
      logger.info(
1✔
1539
          "Removed a file {} before {} by ttl ({} {})",
1540
          resource.getTsFilePath(),
1✔
1541
          new Date(ttlLowerBound),
1542
          dataTTL,
1✔
1543
          CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
1✔
1544
    } finally {
1545
      resource.writeUnlock();
1✔
1546
    }
1547
  }
1✔
1548

1549
  public void timedFlushSeqMemTable() {
1550
    writeLock("timedFlushSeqMemTable");
1✔
1551
    try {
1552
      // only check sequence tsfiles' memtables
1553
      List<TsFileProcessor> tsFileProcessors =
1✔
1554
          new ArrayList<>(workSequenceTsFileProcessors.values());
1✔
1555
      long timeLowerBound = System.currentTimeMillis() - config.getSeqMemtableFlushInterval();
1✔
1556

1557
      for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
1✔
1558
        if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) {
1✔
1559
          logger.info(
1✔
1560
              "Exceed sequence memtable flush interval, so flush working memtable of time partition {} in database {}[{}]",
1561
              tsFileProcessor.getTimeRangeId(),
1✔
1562
              databaseName,
1563
              dataRegionId);
1564
          fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
1✔
1565
        }
1566
      }
1✔
1567
    } finally {
1568
      writeUnlock();
1✔
1569
    }
1570
  }
1✔
1571

1572
  public void timedFlushUnseqMemTable() {
1573
    writeLock("timedFlushUnseqMemTable");
1✔
1574
    try {
1575
      // only check unsequence tsfiles' memtables
1576
      List<TsFileProcessor> tsFileProcessors =
1✔
1577
          new ArrayList<>(workUnsequenceTsFileProcessors.values());
1✔
1578
      long timeLowerBound = System.currentTimeMillis() - config.getUnseqMemtableFlushInterval();
1✔
1579

1580
      for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
1✔
1581
        if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) {
1✔
1582
          logger.info(
1✔
1583
              "Exceed unsequence memtable flush interval, so flush working memtable of time partition {} in database {}[{}]",
1584
              tsFileProcessor.getTimeRangeId(),
1✔
1585
              databaseName,
1586
              dataRegionId);
1587
          fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
1✔
1588
        }
1589
      }
1✔
1590
    } finally {
1591
      writeUnlock();
1✔
1592
    }
1593
  }
1✔
1594

1595
  /** This method will be blocked until all tsfile processors are closed. */
1596
  public void syncCloseAllWorkingTsFileProcessors() {
1597
    synchronized (closeStorageGroupCondition) {
1✔
1598
      try {
1599
        asyncCloseAllWorkingTsFileProcessors();
1✔
1600
        long startTime = System.currentTimeMillis();
1✔
1601
        while (!closingSequenceTsFileProcessor.isEmpty()
1✔
1602
            || !closingUnSequenceTsFileProcessor.isEmpty()) {
1✔
1603
          closeStorageGroupCondition.wait(60_000);
1✔
1604
          if (System.currentTimeMillis() - startTime > 60_000) {
1✔
1605
            logger.warn(
×
1606
                "{} has spent {}s to wait for closing all TsFiles.",
1607
                databaseName + "-" + this.dataRegionId,
1608
                (System.currentTimeMillis() - startTime) / 1000);
×
1609
          }
1610
        }
1611
      } catch (InterruptedException e) {
×
1612
        logger.error(
×
1613
            "CloseFileNodeCondition error occurs while waiting for closing the storage "
1614
                + "group {}",
1615
            databaseName + "-" + dataRegionId,
1616
            e);
1617
        Thread.currentThread().interrupt();
×
1618
      }
1✔
1619
    }
1✔
1620
  }
1✔
1621

1622
  /** close all working tsfile processors */
1623
  public void asyncCloseAllWorkingTsFileProcessors() {
1624
    writeLock("asyncCloseAllWorkingTsFileProcessors");
1✔
1625
    try {
1626
      logger.info("async force close all files in database: {}", databaseName + "-" + dataRegionId);
1✔
1627
      // to avoid concurrent modification problem, we need a new array list
1628
      for (TsFileProcessor tsFileProcessor :
1629
          new ArrayList<>(workSequenceTsFileProcessors.values())) {
1✔
1630
        asyncCloseOneTsFileProcessor(true, tsFileProcessor);
1✔
1631
      }
1✔
1632
      // to avoid concurrent modification problem, we need a new array list
1633
      for (TsFileProcessor tsFileProcessor :
1634
          new ArrayList<>(workUnsequenceTsFileProcessors.values())) {
1✔
1635
        asyncCloseOneTsFileProcessor(false, tsFileProcessor);
1✔
1636
      }
1✔
1637
    } finally {
1638
      writeUnlock();
1✔
1639
    }
1640
  }
1✔
1641

1642
  /** force close all working tsfile processors */
1643
  public void forceCloseAllWorkingTsFileProcessors() throws TsFileProcessorException {
1644
    writeLock("forceCloseAllWorkingTsFileProcessors");
×
1645
    try {
1646
      logger.info("force close all processors in database: {}", databaseName + "-" + dataRegionId);
×
1647
      // to avoid concurrent modification problem, we need a new array list
1648
      for (TsFileProcessor tsFileProcessor :
1649
          new ArrayList<>(workSequenceTsFileProcessors.values())) {
×
1650
        tsFileProcessor.putMemTableBackAndClose();
×
1651
      }
×
1652
      // to avoid concurrent modification problem, we need a new array list
1653
      for (TsFileProcessor tsFileProcessor :
1654
          new ArrayList<>(workUnsequenceTsFileProcessors.values())) {
×
1655
        tsFileProcessor.putMemTableBackAndClose();
×
1656
      }
×
1657
    } finally {
1658
      writeUnlock();
×
1659
    }
1660
  }
×
1661

1662
  /** used for queryengine */
1663
  @Override
1664
  public QueryDataSource query(
1665
      List<PartialPath> pathList, String singleDeviceId, QueryContext context, Filter timeFilter)
1666
      throws QueryProcessException {
1667
    try {
1668
      List<TsFileResource> seqResources =
1✔
1669
          getFileResourceListForQuery(
1✔
1670
              tsFileManager.getTsFileList(true),
1✔
1671
              pathList,
1672
              singleDeviceId,
1673
              context,
1674
              timeFilter,
1675
              true);
1676
      List<TsFileResource> unseqResources =
1✔
1677
          getFileResourceListForQuery(
1✔
1678
              tsFileManager.getTsFileList(false),
1✔
1679
              pathList,
1680
              singleDeviceId,
1681
              context,
1682
              timeFilter,
1683
              false);
1684

1685
      QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(SEQUENCE_TSFILE, seqResources.size());
1✔
1686
      QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(UNSEQUENCE_TSFILE, unseqResources.size());
1✔
1687

1688
      QueryDataSource dataSource = new QueryDataSource(seqResources, unseqResources);
1✔
1689
      dataSource.setDataTTL(dataTTL);
1✔
1690
      return dataSource;
1✔
1691
    } catch (MetadataException e) {
×
1692
      throw new QueryProcessException(e);
×
1693
    }
1694
  }
1695

1696
  /** lock the read lock of the insert lock */
1697
  @Override
1698
  public void readLock() {
1699
    // apply read lock for SG insert lock to prevent inconsistent with concurrently writing memtable
1700
    insertLock.readLock().lock();
×
1701
    // apply read lock for TsFileResource list
1702
    tsFileManager.readLock();
×
1703
  }
×
1704

1705
  /** unlock the read lock of insert lock */
1706
  @Override
1707
  public void readUnlock() {
1708
    tsFileManager.readUnlock();
×
1709
    insertLock.readLock().unlock();
×
1710
  }
×
1711

1712
  /** lock the write lock of the insert lock */
1713
  public void writeLock(String holder) {
1714
    insertLock.writeLock().lock();
1✔
1715
    insertWriteLockHolder = holder;
1✔
1716
  }
1✔
1717

1718
  /** unlock the write lock of the insert lock */
1719
  public void writeUnlock() {
1720
    insertWriteLockHolder = "";
1✔
1721
    insertLock.writeLock().unlock();
1✔
1722
  }
1✔
1723

1724
  /**
1725
   * @param tsFileResources includes sealed and unsealed tsfile resources
1726
   * @return fill unsealed tsfile resources with memory data and ChunkMetadataList of data in disk
1727
   */
1728
  private List<TsFileResource> getFileResourceListForQuery(
1729
      Collection<TsFileResource> tsFileResources,
1730
      List<PartialPath> pathList,
1731
      String singleDeviceId,
1732
      QueryContext context,
1733
      Filter timeFilter,
1734
      boolean isSeq)
1735
      throws MetadataException {
1736

1737
    if (context.isDebug()) {
1✔
1738
      DEBUG_LOGGER.info(
×
1739
          "Path: {}, get tsfile list: {} isSeq: {} timefilter: {}",
1740
          pathList,
1741
          tsFileResources,
1742
          isSeq,
×
1743
          (timeFilter == null ? "null" : timeFilter));
×
1744
    }
1745

1746
    List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
1✔
1747

1748
    long timeLowerBound =
1749
        dataTTL != Long.MAX_VALUE ? DateTimeUtils.currentTime() - dataTTL : Long.MIN_VALUE;
1✔
1750
    context.setQueryTimeLowerBound(timeLowerBound);
1✔
1751

1752
    for (TsFileResource tsFileResource : tsFileResources) {
1✔
1753
      if (!tsFileResource.isSatisfied(
1✔
1754
          singleDeviceId, timeFilter, isSeq, dataTTL, context.isDebug())) {
1✔
1755
        continue;
1✔
1756
      }
1757
      closeQueryLock.readLock().lock();
1✔
1758
      try {
1759
        if (tsFileResource.isClosed()) {
1✔
1760
          tsfileResourcesForQuery.add(tsFileResource);
1✔
1761
        } else {
1762
          tsFileResource.getProcessor().query(pathList, context, tsfileResourcesForQuery);
1✔
1763
        }
1764
      } catch (IOException e) {
×
1765
        throw new MetadataException(e);
×
1766
      } finally {
1767
        closeQueryLock.readLock().unlock();
1✔
1768
      }
1769
    }
1✔
1770
    return tsfileResourcesForQuery;
1✔
1771
  }
1772

1773
  /** Seperate tsfiles in TsFileManager to sealedList and unsealedList. */
1774
  private void separateTsFile(
1775
      List<TsFileResource> sealedResource, List<TsFileResource> unsealedResource) {
1776
    tsFileManager
1✔
1777
        .getTsFileList(true)
1✔
1778
        .forEach(
1✔
1779
            tsFileResource -> {
1780
              if (tsFileResource.isClosed()) {
1✔
1781
                sealedResource.add(tsFileResource);
1✔
1782
              } else {
1783
                unsealedResource.add(tsFileResource);
1✔
1784
              }
1785
            });
1✔
1786
    tsFileManager
1✔
1787
        .getTsFileList(false)
1✔
1788
        .forEach(
1✔
1789
            tsFileResource -> {
1790
              if (tsFileResource.isClosed()) {
1✔
1791
                sealedResource.add(tsFileResource);
1✔
1792
              } else {
1793
                unsealedResource.add(tsFileResource);
1✔
1794
              }
1795
            });
1✔
1796
  }
1✔
1797

1798
  /**
1799
   * @param pattern Must be a pattern start with a precise device path
1800
   * @param startTime
1801
   * @param endTime
1802
   * @param searchIndex
1803
   * @param timePartitionFilter
1804
   * @throws IOException
1805
   */
1806
  public void deleteByDevice(
1807
      PartialPath pattern,
1808
      long startTime,
1809
      long endTime,
1810
      long searchIndex,
1811
      TimePartitionFilter timePartitionFilter)
1812
      throws IOException {
1813
    if (SettleService.getINSTANCE().getFilesToBeSettledCount().get() != 0) {
1✔
1814
      throw new IOException(
×
1815
          "Delete failed. " + "Please do not delete until the old files settled.");
1816
    }
1817
    // TODO: how to avoid partial deletion?
1818
    // FIXME: notice that if we may remove a SGProcessor out of memory, we need to close all opened
1819
    // mod files in mergingModification, sequenceFileList, and unsequenceFileList
1820
    writeLock("delete");
1✔
1821

1822
    boolean hasReleasedLock = false;
1✔
1823

1824
    try {
1825

1826
      Set<PartialPath> devicePaths = new HashSet<>(pattern.getDevicePathPattern());
1✔
1827

1828
      // delete Last cache record if necessary
1829
      // todo implement more precise process
1830
      DataNodeSchemaCache.getInstance().takeWriteLock();
1✔
1831
      try {
1832
        DataNodeSchemaCache.getInstance().invalidateAll();
1✔
1833
      } finally {
1834
        DataNodeSchemaCache.getInstance().releaseWriteLock();
1✔
1835
      }
1836

1837
      // write log to impacted working TsFileProcessors
1838
      List<WALFlushListener> walListeners =
1✔
1839
          logDeletionInWAL(startTime, endTime, searchIndex, pattern, timePartitionFilter);
1✔
1840

1841
      for (WALFlushListener walFlushListener : walListeners) {
1✔
1842
        if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
1✔
1843
          logger.error("Fail to log delete to wal.", walFlushListener.getCause());
×
1844
          throw walFlushListener.getCause();
×
1845
        }
1846
      }
1✔
1847

1848
      Deletion deletion = new Deletion(pattern, MERGE_MOD_START_VERSION_NUM, startTime, endTime);
1✔
1849

1850
      List<TsFileResource> sealedTsFileResource = new ArrayList<>();
1✔
1851
      List<TsFileResource> unsealedTsFileResource = new ArrayList<>();
1✔
1852
      separateTsFile(sealedTsFileResource, unsealedTsFileResource);
1✔
1853
      // deviceMatchInfo is used for filter the matched deviceId in TsFileResource
1854
      // deviceMatchInfo contains the DeviceId means this device matched the pattern
1855
      Set<String> deviceMatchInfo = new HashSet<>();
1✔
1856
      deleteDataInFiles(
1✔
1857
          unsealedTsFileResource, deletion, devicePaths, timePartitionFilter, deviceMatchInfo);
1858
      writeUnlock();
1✔
1859
      hasReleasedLock = true;
1✔
1860

1861
      deleteDataInFiles(
1✔
1862
          sealedTsFileResource, deletion, devicePaths, timePartitionFilter, deviceMatchInfo);
1863

1864
    } catch (Exception e) {
×
1865
      throw new IOException(e);
×
1866
    } finally {
1867
      if (!hasReleasedLock) {
1✔
1868
        writeUnlock();
×
1869
      }
1870
    }
1871
  }
1✔
1872

1873
  private List<WALFlushListener> logDeletionInWAL(
1874
      long startTime,
1875
      long endTime,
1876
      long searchIndex,
1877
      PartialPath path,
1878
      TimePartitionFilter timePartitionFilter) {
1879
    long timePartitionStartId = StorageEngine.getTimePartition(startTime);
1✔
1880
    long timePartitionEndId = StorageEngine.getTimePartition(endTime);
1✔
1881
    List<WALFlushListener> walFlushListeners = new ArrayList<>();
1✔
1882
    if (config.getWalMode() == WALMode.DISABLE) {
1✔
1883
      return walFlushListeners;
×
1884
    }
1885
    DeleteDataNode deleteDataNode =
1✔
1886
        new DeleteDataNode(new PlanNodeId(""), Collections.singletonList(path), startTime, endTime);
1✔
1887
    deleteDataNode.setSearchIndex(searchIndex);
1✔
1888
    for (Map.Entry<Long, TsFileProcessor> entry : workSequenceTsFileProcessors.entrySet()) {
1✔
1889
      if (timePartitionStartId <= entry.getKey()
1✔
1890
          && entry.getKey() <= timePartitionEndId
1✔
1891
          && (timePartitionFilter == null
1892
              || timePartitionFilter.satisfy(databaseName, entry.getKey()))) {
×
1893
        WALFlushListener walFlushListener = entry.getValue().logDeleteDataNodeInWAL(deleteDataNode);
1✔
1894
        walFlushListeners.add(walFlushListener);
1✔
1895
      }
1896
    }
1✔
1897
    for (Map.Entry<Long, TsFileProcessor> entry : workUnsequenceTsFileProcessors.entrySet()) {
1✔
1898
      if (timePartitionStartId <= entry.getKey()
1✔
1899
          && entry.getKey() <= timePartitionEndId
1✔
1900
          && (timePartitionFilter == null
1901
              || timePartitionFilter.satisfy(databaseName, entry.getKey()))) {
×
1902
        WALFlushListener walFlushListener = entry.getValue().logDeleteDataNodeInWAL(deleteDataNode);
1✔
1903
        walFlushListeners.add(walFlushListener);
1✔
1904
      }
1905
    }
1✔
1906
    return walFlushListeners;
1✔
1907
  }
1908

1909
  private boolean canSkipDelete(
1910
      TsFileResource tsFileResource,
1911
      Set<PartialPath> devicePaths,
1912
      long deleteStart,
1913
      long deleteEnd,
1914
      TimePartitionFilter timePartitionFilter,
1915
      Set<String> deviceMatchInfo) {
1916
    if (timePartitionFilter != null
1✔
1917
        && !timePartitionFilter.satisfy(databaseName, tsFileResource.getTimePartition())) {
×
1918
      return true;
×
1919
    }
1920
    long fileStartTime = tsFileResource.getTimeIndex().getMinStartTime();
1✔
1921
    long fileEndTime = tsFileResource.getTimeIndex().getMaxEndTime();
1✔
1922

1923
    for (PartialPath device : devicePaths) {
1✔
1924
      long deviceStartTime, deviceEndTime;
1925
      if (device.hasWildcard()) {
1✔
1926
        if (!tsFileResource.isClosed() && fileEndTime == Long.MIN_VALUE) {
×
1927
          // unsealed seq file
1928
          if (deleteEnd < fileStartTime) {
×
1929
            // time range of file has not overlapped with the deletion
1930
            return true;
×
1931
          }
1932
        } else {
1933
          if (deleteEnd < fileStartTime || deleteStart > fileEndTime) {
×
1934
            // time range of file has not overlapped with the deletion
1935
            return true;
×
1936
          }
1937
        }
1938
        if (databaseName.contentEquals(device.getDevice())) {
×
1939
          return false;
×
1940
        }
1941
        Pair<Long, Long> startAndEndTime =
×
1942
            tsFileResource.getPossibleStartTimeAndEndTime(device, deviceMatchInfo);
×
1943
        if (startAndEndTime == null) {
×
1944
          continue;
×
1945
        }
1946
        deviceStartTime = startAndEndTime.getLeft();
×
1947
        deviceEndTime = startAndEndTime.getRight();
×
1948
      } else {
×
1949
        String deviceId = device.getFullPath();
1✔
1950
        if (tsFileResource.definitelyNotContains(deviceId)) {
1✔
1951
          // resource does not contain this device
1952
          continue;
1✔
1953
        }
1954
        deviceStartTime = tsFileResource.getStartTime(deviceId);
1✔
1955
        deviceEndTime = tsFileResource.getEndTime(deviceId);
1✔
1956
      }
1957

1958
      if (!tsFileResource.isClosed() && deviceEndTime == Long.MIN_VALUE) {
1✔
1959
        // unsealed seq file
1960
        if (deleteEnd >= deviceStartTime) {
1✔
1961
          return false;
1✔
1962
        }
1963
      } else {
1964
        // sealed file or unsealed unseq file
1965
        if (deleteEnd >= deviceStartTime && deleteStart <= deviceEndTime) {
1✔
1966
          // time range of device has overlap with the deletion
1967
          return false;
1✔
1968
        }
1969
      }
1970
    }
1✔
1971
    return true;
1✔
1972
  }
1973

1974
  // suppress warn of Throwable catch
1975
  @SuppressWarnings("java:S1181")
1976
  private void deleteDataInFiles(
1977
      Collection<TsFileResource> tsFileResourceList,
1978
      Deletion deletion,
1979
      Set<PartialPath> devicePaths,
1980
      TimePartitionFilter timePartitionFilter,
1981
      Set<String> deviceMatchInfo)
1982
      throws IOException {
1983
    for (TsFileResource tsFileResource : tsFileResourceList) {
1✔
1984
      if (canSkipDelete(
1✔
1985
          tsFileResource,
1986
          devicePaths,
1987
          deletion.getStartTime(),
1✔
1988
          deletion.getEndTime(),
1✔
1989
          timePartitionFilter,
1990
          deviceMatchInfo)) {
1991
        continue;
1✔
1992
      }
1993

1994
      ModificationFile modFile = tsFileResource.getModFile();
1✔
1995
      if (tsFileResource.isClosed()) {
1✔
1996
        long originSize = -1;
1✔
1997
        synchronized (modFile) {
1✔
1998
          try {
1999
            originSize = modFile.getSize();
1✔
2000
            // delete data in sealed file
2001
            if (tsFileResource.isCompacting()) {
1✔
2002
              // we have to set modification offset to MAX_VALUE, as the offset of source chunk may
2003
              // change after compaction
2004
              deletion.setFileOffset(Long.MAX_VALUE);
1✔
2005
              // write deletion into compaction modification file
2006
              tsFileResource.getCompactionModFile().write(deletion);
1✔
2007
              // write deletion into modification file to enable read during compaction
2008
              modFile.write(deletion);
1✔
2009
              // remember to close mod file
2010
              tsFileResource.getCompactionModFile().close();
1✔
2011
              modFile.close();
1✔
2012
            } else {
2013
              deletion.setFileOffset(tsFileResource.getTsFileSize());
1✔
2014
              // write deletion into modification file
2015
              boolean modFileExists = modFile.exists();
1✔
2016

2017
              modFile.write(deletion);
1✔
2018

2019
              // remember to close mod file
2020
              modFile.close();
1✔
2021

2022
              // if file length greater than 1M,execute compact.
2023
              modFile.compact();
1✔
2024

2025
              if (!modFileExists) {
1✔
2026
                FileMetrics.getInstance().increaseModFileNum(1);
1✔
2027
              }
2028

2029
              // The file size may be smaller than the original file, so the increment here may be
2030
              // negative
2031
              FileMetrics.getInstance().increaseModFileSize(modFile.getSize() - originSize);
1✔
2032
            }
2033
          } catch (Throwable t) {
×
2034
            if (originSize != -1) {
×
2035
              modFile.truncate(originSize);
×
2036
            }
2037
            throw t;
×
2038
          }
1✔
2039
          logger.info(
1✔
2040
              "[Deletion] Deletion with path:{}, time:{}-{} written into mods file:{}.",
2041
              deletion.getPath(),
1✔
2042
              deletion.getStartTime(),
1✔
2043
              deletion.getEndTime(),
1✔
2044
              modFile.getFilePath());
1✔
2045
        }
1✔
2046
      } else {
1✔
2047
        // delete data in memory of unsealed file
2048
        tsFileResource.getProcessor().deleteDataInMemory(deletion, devicePaths);
1✔
2049
      }
2050
    }
1✔
2051
  }
1✔
2052

2053
  private void unsequenceFlushCallback(
2054
      TsFileProcessor processor, Map<String, Long> updateMap, long systemFlushTime) {
2055
    TimePartitionManager.getInstance()
1✔
2056
        .updateAfterFlushing(
1✔
2057
            new DataRegionId(Integer.valueOf(dataRegionId)),
1✔
2058
            processor.getTimeRangeId(),
1✔
2059
            systemFlushTime,
2060
            lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
1✔
2061
            workSequenceTsFileProcessors.get(processor.getTimeRangeId()) != null);
1✔
2062
  }
1✔
2063

2064
  private void sequenceFlushCallback(
2065
      TsFileProcessor processor, Map<String, Long> updateMap, long systemFlushTime) {
2066
    lastFlushTimeMap.updateLatestFlushTime(processor.getTimeRangeId(), updateMap);
1✔
2067
    TimePartitionManager.getInstance()
1✔
2068
        .updateAfterFlushing(
1✔
2069
            new DataRegionId(Integer.valueOf(dataRegionId)),
1✔
2070
            processor.getTimeRangeId(),
1✔
2071
            systemFlushTime,
2072
            lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
1✔
2073
            workUnsequenceTsFileProcessors.get(processor.getTimeRangeId()) != null);
1✔
2074
  }
1✔
2075

2076
  /** put the memtable back to the MemTablePool and make the metadata in writer visible */
2077
  // TODO please consider concurrency with read and insert method.
2078
  private void closeUnsealedTsFileProcessorCallBack(TsFileProcessor tsFileProcessor)
2079
      throws TsFileProcessorException {
2080
    closeQueryLock.writeLock().lock();
1✔
2081
    try {
2082
      tsFileProcessor.close();
1✔
2083
      if (tsFileProcessor.isEmpty()) {
1✔
2084
        try {
2085
          fsFactory.deleteIfExists(tsFileProcessor.getTsFileResource().getTsFile());
1✔
2086
          tsFileManager.remove(tsFileProcessor.getTsFileResource(), tsFileProcessor.isSequence());
1✔
2087
        } catch (IOException e) {
×
2088
          logger.error(
×
2089
              "Remove empty file {} error",
2090
              tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
×
2091
        }
1✔
2092
      } else {
2093
        tsFileResourceManager.registerSealedTsFileResource(tsFileProcessor.getTsFileResource());
1✔
2094
      }
2095
    } finally {
2096
      closeQueryLock.writeLock().unlock();
1✔
2097
    }
2098
    // closingSequenceTsFileProcessor is a thread safety class.
2099
    if (closingSequenceTsFileProcessor.contains(tsFileProcessor)) {
1✔
2100
      closingSequenceTsFileProcessor.remove(tsFileProcessor);
1✔
2101
    } else {
2102
      closingUnSequenceTsFileProcessor.remove(tsFileProcessor);
1✔
2103
    }
2104
    synchronized (closeStorageGroupCondition) {
1✔
2105
      closeStorageGroupCondition.notifyAll();
1✔
2106
    }
1✔
2107
    FileMetrics.getInstance()
1✔
2108
        .addFile(
1✔
2109
            tsFileProcessor.getTsFileResource().getTsFileSize(),
1✔
2110
            tsFileProcessor.isSequence(),
1✔
2111
            tsFileProcessor.getTsFileResource().getTsFile().getName());
1✔
2112
    logger.info("signal closing database condition in {}", databaseName + "-" + dataRegionId);
1✔
2113
  }
1✔
2114

2115
  protected int executeCompaction() {
2116
    // the name of this variable is trySubmitCount, because the task submitted to the queue could be
2117
    // evicted due to the low priority of the task
2118
    int trySubmitCount = 0;
1✔
2119
    try {
2120
      List<Long> timePartitions = new ArrayList<>(tsFileManager.getTimePartitions());
1✔
2121
      // sort the time partition from largest to smallest
2122
      timePartitions.sort(Comparator.reverseOrder());
1✔
2123
      for (long timePartition : timePartitions) {
1✔
2124
        trySubmitCount += CompactionScheduler.scheduleCompaction(tsFileManager, timePartition);
1✔
2125
      }
1✔
2126
    } catch (Throwable e) {
×
2127
      logger.error("Meet error in compaction schedule.", e);
×
2128
    }
1✔
2129
    return trySubmitCount;
1✔
2130
  }
2131

2132
  /**
2133
   * After finishing settling tsfile, we need to do 2 things : (1) move the new tsfile to the
2134
   * correct folder, including deleting its old mods file (2) update the relevant data of this old
2135
   * tsFile in memory ,eg: FileSequenceReader, tsFileManager, cache, etc.
2136
   */
2137
  private void settleTsFileCallBack(
2138
      TsFileResource oldTsFileResource, List<TsFileResource> newTsFileResources)
2139
      throws WriteProcessException {
2140
    oldTsFileResource.readUnlock();
×
2141
    oldTsFileResource.writeLock();
×
2142
    try {
2143
      TsFileAndModSettleTool.moveNewTsFile(oldTsFileResource, newTsFileResources);
×
2144
      if (TsFileAndModSettleTool.getInstance().recoverSettleFileMap.size() != 0) {
×
2145
        TsFileAndModSettleTool.getInstance()
×
2146
            .recoverSettleFileMap
2147
            .remove(oldTsFileResource.getTsFile().getAbsolutePath());
×
2148
      }
2149
      // clear Cache , including chunk cache, timeseriesMetadata cache and bloom filter cache
2150
      operateClearCache();
×
2151

2152
      // if old tsfile is being deleted in the process due to its all data's being deleted.
2153
      if (!oldTsFileResource.getTsFile().exists()) {
×
2154
        tsFileManager.remove(oldTsFileResource, oldTsFileResource.isSeq());
×
2155
      }
2156
      FileReaderManager.getInstance().closeFileAndRemoveReader(oldTsFileResource.getTsFilePath());
×
2157
      oldTsFileResource.setSettleTsFileCallBack(null);
×
2158
      SettleService.getINSTANCE().getFilesToBeSettledCount().addAndGet(-1);
×
2159
    } catch (IOException e) {
×
2160
      logger.error("Exception to move new tsfile in settling", e);
×
2161
      throw new WriteProcessException(
×
2162
          "Meet error when settling file: " + oldTsFileResource.getTsFile().getAbsolutePath(), e);
×
2163
    } finally {
2164
      oldTsFileResource.writeUnlock();
×
2165
    }
2166
  }
×
2167

2168
  public static void operateClearCache() {
2169
    ChunkCache.getInstance().clear();
×
2170
    TimeSeriesMetadataCache.getInstance().clear();
×
2171
    BloomFilterCache.getInstance().clear();
×
2172
  }
×
2173

2174
  /** merge file under this database processor */
2175
  public int compact() {
2176
    writeLock("merge");
1✔
2177
    try {
2178
      return executeCompaction();
1✔
2179
    } finally {
2180
      writeUnlock();
1✔
2181
    }
2182
  }
2183

2184
  private void resetLastCacheWhenLoadingTsFile() throws IllegalPathException {
2185
    if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
×
2186
      return;
×
2187
    }
2188
    DataNodeSchemaCache.getInstance().takeWriteLock();
×
2189
    try {
2190
      DataNodeSchemaCache.getInstance().invalidateAll();
×
2191
    } finally {
2192
      DataNodeSchemaCache.getInstance().releaseWriteLock();
×
2193
    }
2194
  }
×
2195

2196
  /**
2197
   * Load a new tsfile to unsequence dir.
2198
   *
2199
   * <p>Then, update the latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice.
2200
   *
2201
   * @param newTsFileResource tsfile resource @UsedBy load external tsfile module
2202
   * @param deleteOriginFile whether to delete origin tsfile
2203
   * @param isGeneratedByPipe whether the load tsfile request is generated by pipe
2204
   */
2205
  public void loadNewTsFile(
2206
      TsFileResource newTsFileResource, boolean deleteOriginFile, boolean isGeneratedByPipe)
2207
      throws LoadFileException {
2208
    File tsfileToBeInserted = newTsFileResource.getTsFile();
×
2209
    long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
×
2210
    writeLock("loadNewTsFile");
×
2211
    try {
2212
      newTsFileResource.setSeq(false);
×
2213
      String newFileName =
×
2214
          getNewTsFileName(
×
2215
              System.currentTimeMillis(),
×
2216
              getAndSetNewVersion(newFilePartitionId, newTsFileResource),
×
2217
              0,
2218
              0);
2219

2220
      if (!newFileName.equals(tsfileToBeInserted.getName())) {
×
2221
        logger.info(
×
2222
            "TsFile {} must be renamed to {} for loading into the unsequence list.",
2223
            tsfileToBeInserted.getName(),
×
2224
            newFileName);
2225
        newTsFileResource.setFile(
×
2226
            fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName));
×
2227
      }
2228
      loadTsFileToUnSequence(
×
2229
          tsfileToBeInserted, newTsFileResource, newFilePartitionId, deleteOriginFile);
2230

2231
      PipeInsertionDataNodeListener.getInstance()
×
2232
          .listenToTsFile(dataRegionId, newTsFileResource, isGeneratedByPipe);
×
2233

2234
      FileMetrics.getInstance()
×
2235
          .addFile(
×
2236
              newTsFileResource.getTsFile().length(),
×
2237
              false,
2238
              newTsFileResource.getTsFile().getName());
×
2239

2240
      resetLastCacheWhenLoadingTsFile(); // update last cache
×
2241
      updateLastFlushTime(newTsFileResource); // update last flush time
×
2242
      long partitionNum = newTsFileResource.getTimePartition();
×
2243
      updatePartitionFileVersion(partitionNum, newTsFileResource.getVersion());
×
2244
      logger.info("TsFile {} is successfully loaded in unsequence list.", newFileName);
×
2245
    } catch (DiskSpaceInsufficientException e) {
×
2246
      logger.error(
×
2247
          "Failed to append the tsfile {} to database processor {} because the disk space is insufficient.",
2248
          tsfileToBeInserted.getAbsolutePath(),
×
2249
          tsfileToBeInserted.getParentFile().getName());
×
2250
      throw new LoadFileException(e);
×
2251
    } catch (IllegalPathException e) {
×
2252
      logger.error(
×
2253
          "Failed to reset last cache when loading file {}", newTsFileResource.getTsFilePath());
×
2254
      throw new LoadFileException(e);
×
2255
    } finally {
2256
      writeUnlock();
×
2257
    }
2258
  }
×
2259

2260
  /**
2261
   * Set the version in "partition" to "version" if "version" is larger than the current version.
2262
   */
2263
  public void setPartitionFileVersionToMax(long partition, long version) {
2264
    partitionMaxFileVersions.compute(
×
2265
        partition, (prt, oldVer) -> computeMaxVersion(oldVer, version));
×
2266
  }
×
2267

2268
  private long computeMaxVersion(Long oldVersion, Long newVersion) {
2269
    if (oldVersion == null) {
×
2270
      return newVersion;
×
2271
    }
2272
    return Math.max(oldVersion, newVersion);
×
2273
  }
2274

2275
  /**
2276
   * If the historical versions of a file is a sub-set of the given file's, (close and) remove it to
2277
   * reduce unnecessary merge. Only used when the file sender and the receiver share the same file
2278
   * close policy. Warning: DO NOT REMOVE
2279
   */
2280
  @SuppressWarnings("unused")
2281
  public void removeFullyOverlapFiles(TsFileResource resource) {
2282
    writeLock("removeFullyOverlapFiles");
×
2283
    try {
2284
      Iterator<TsFileResource> iterator = tsFileManager.getIterator(true);
×
2285
      removeFullyOverlapFiles(resource, iterator, true);
×
2286

2287
      iterator = tsFileManager.getIterator(false);
×
2288
      removeFullyOverlapFiles(resource, iterator, false);
×
2289
    } finally {
2290
      writeUnlock();
×
2291
    }
2292
  }
×
2293

2294
  private void removeFullyOverlapFiles(
2295
      TsFileResource newTsFile, Iterator<TsFileResource> iterator, boolean isSeq) {
2296
    while (iterator.hasNext()) {
×
2297
      TsFileResource existingTsFile = iterator.next();
×
2298
      if (newTsFile.isPlanRangeCovers(existingTsFile)
×
2299
          && !newTsFile.getTsFile().equals(existingTsFile.getTsFile())
×
2300
          && existingTsFile.tryWriteLock()) {
×
2301
        logger.info(
×
2302
            "{} is covered by {}: [{}, {}], [{}, {}], remove it",
2303
            existingTsFile,
2304
            newTsFile,
2305
            existingTsFile.minPlanIndex,
×
2306
            existingTsFile.maxPlanIndex,
×
2307
            newTsFile.minPlanIndex,
×
2308
            newTsFile.maxPlanIndex);
×
2309
        // if we fail to lock the file, it means it is being queried or merged and we will not
2310
        // wait until it is free, we will just leave it to the next merge
2311
        try {
2312
          removeFullyOverlapFile(existingTsFile, iterator, isSeq);
×
2313
        } catch (Exception e) {
×
2314
          logger.error(
×
2315
              "Something gets wrong while removing FullyOverlapFiles: {}",
2316
              existingTsFile.getTsFile().getAbsolutePath(),
×
2317
              e);
2318
        } finally {
2319
          existingTsFile.writeUnlock();
×
2320
        }
2321
      }
2322
    }
×
2323
  }
×
2324

2325
  /**
2326
   * remove the given tsFileResource. If the corresponding tsFileProcessor is in the working status,
2327
   * close it before remove the related resource files. maybe time-consuming for closing a tsfile.
2328
   */
2329
  private void removeFullyOverlapFile(
2330
      TsFileResource tsFileResource, Iterator<TsFileResource> iterator, boolean isSeq) {
2331
    logger.info(
×
2332
        "Removing a covered file {}, closed: {}", tsFileResource, tsFileResource.isClosed());
×
2333
    if (!tsFileResource.isClosed()) {
×
2334
      try {
2335
        // also remove the TsFileProcessor if the overlapped file is not closed
2336
        long timePartition = tsFileResource.getTimePartition();
×
2337
        Map<Long, TsFileProcessor> fileProcessorMap =
2338
            isSeq ? workSequenceTsFileProcessors : workUnsequenceTsFileProcessors;
×
2339
        TsFileProcessor tsFileProcessor = fileProcessorMap.get(timePartition);
×
2340
        if (tsFileProcessor != null && tsFileProcessor.getTsFileResource() == tsFileResource) {
×
2341
          // have to take some time to close the tsFileProcessor
2342
          tsFileProcessor.syncClose();
×
2343
          fileProcessorMap.remove(timePartition);
×
2344
        }
2345
      } catch (Exception e) {
×
2346
        logger.error("Cannot close {}", tsFileResource, e);
×
2347
      }
×
2348
    }
2349
    tsFileManager.remove(tsFileResource, isSeq);
×
2350
    iterator.remove();
×
2351
    tsFileResource.remove();
×
2352
  }
×
2353

2354
  private long getAndSetNewVersion(long timePartitionId, TsFileResource tsFileResource) {
2355
    long version = partitionMaxFileVersions.getOrDefault(timePartitionId, 0L) + 1;
×
2356
    partitionMaxFileVersions.put(timePartitionId, version);
×
2357
    tsFileResource.setVersion(version);
×
2358
    return version;
×
2359
  }
2360

2361
  /**
2362
   * Update latest time in latestTimeForEachDevice and
2363
   * partitionLatestFlushedTimeForEachDevice. @UsedBy sync module, load external tsfile module.
2364
   */
2365
  private void updateLastFlushTime(TsFileResource newTsFileResource) {
2366
    for (String device : newTsFileResource.getDevices()) {
×
2367
      long endTime = newTsFileResource.getEndTime(device);
×
2368
      long timePartitionId = StorageEngine.getTimePartition(endTime);
×
2369
      lastFlushTimeMap.updateFlushedTime(timePartitionId, device, endTime);
×
2370
      lastFlushTimeMap.updateGlobalFlushedTime(device, endTime);
×
2371
    }
×
2372
  }
×
2373

2374
  /**
2375
   * Execute the loading process by the type.
2376
   *
2377
   * @param tsFileResource tsfile resource to be loaded
2378
   * @param filePartitionId the partition id of the new file
2379
   * @param deleteOriginFile whether to delete the original file
2380
   * @return load the file successfully @UsedBy sync module, load external tsfile module.
2381
   */
2382
  private boolean loadTsFileToUnSequence(
2383
      File tsFileToLoad,
2384
      TsFileResource tsFileResource,
2385
      long filePartitionId,
2386
      boolean deleteOriginFile)
2387
      throws LoadFileException, DiskSpaceInsufficientException {
2388
    File targetFile;
2389
    targetFile =
×
2390
        fsFactory.getFile(
×
2391
            TierManager.getInstance().getNextFolderForTsFile(0, false),
×
2392
            databaseName
2393
                + File.separatorChar
2394
                + dataRegionId
2395
                + File.separatorChar
2396
                + filePartitionId
2397
                + File.separator
2398
                + tsFileResource.getTsFile().getName());
×
2399
    tsFileResource.setFile(targetFile);
×
2400
    if (tsFileManager.contains(tsFileResource, false)) {
×
2401
      logger.error("The file {} has already been loaded in unsequence list", tsFileResource);
×
2402
      return false;
×
2403
    }
2404
    tsFileManager.add(tsFileResource, false);
×
2405
    logger.info(
×
2406
        "Load tsfile in unsequence list, move file from {} to {}",
2407
        tsFileToLoad.getAbsolutePath(),
×
2408
        targetFile.getAbsolutePath());
×
2409

2410
    // move file from sync dir to data dir
2411
    if (!targetFile.getParentFile().exists()) {
×
2412
      targetFile.getParentFile().mkdirs();
×
2413
    }
2414
    try {
2415
      if (deleteOriginFile) {
×
2416
        FileUtils.moveFile(tsFileToLoad, targetFile);
×
2417
      } else {
2418
        Files.copy(tsFileToLoad.toPath(), targetFile.toPath());
×
2419
      }
2420
    } catch (IOException e) {
×
2421
      logger.error(
×
2422
          "File renaming failed when loading tsfile. Origin: {}, Target: {}",
2423
          tsFileToLoad.getAbsolutePath(),
×
2424
          targetFile.getAbsolutePath(),
×
2425
          e);
2426
      throw new LoadFileException(
×
2427
          String.format(
×
2428
              "File renaming failed when loading tsfile. Origin: %s, Target: %s, because %s",
2429
              tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
×
2430
    }
×
2431

2432
    File resourceFileToLoad =
×
2433
        fsFactory.getFile(tsFileToLoad.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
×
2434
    File targetResourceFile =
×
2435
        fsFactory.getFile(targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
×
2436
    try {
2437
      if (deleteOriginFile) {
×
2438
        FileUtils.moveFile(resourceFileToLoad, targetResourceFile);
×
2439
      } else {
2440
        Files.copy(resourceFileToLoad.toPath(), targetResourceFile.toPath());
×
2441
      }
2442

2443
    } catch (IOException e) {
×
2444
      logger.error(
×
2445
          "File renaming failed when loading .resource file. Origin: {}, Target: {}",
2446
          resourceFileToLoad.getAbsolutePath(),
×
2447
          targetResourceFile.getAbsolutePath(),
×
2448
          e);
2449
      throw new LoadFileException(
×
2450
          String.format(
×
2451
              "File renaming failed when loading .resource file. Origin: %s, Target: %s, because %s",
2452
              resourceFileToLoad.getAbsolutePath(),
×
2453
              targetResourceFile.getAbsolutePath(),
×
2454
              e.getMessage()));
×
2455
    }
×
2456

2457
    File modFileToLoad =
×
2458
        fsFactory.getFile(tsFileToLoad.getAbsolutePath() + ModificationFile.FILE_SUFFIX);
×
2459
    if (modFileToLoad.exists()) {
×
2460
      // when successfully loaded, the filepath of the resource will be changed to the IoTDB data
2461
      // dir, so we can add a suffix to find the old modification file.
2462
      File targetModFile =
×
2463
          fsFactory.getFile(targetFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX);
×
2464
      try {
2465
        Files.deleteIfExists(targetModFile.toPath());
×
2466
      } catch (IOException e) {
×
2467
        logger.warn("Cannot delete localModFile {}", targetModFile, e);
×
2468
      }
×
2469
      try {
2470
        if (deleteOriginFile) {
×
2471
          FileUtils.moveFile(modFileToLoad, targetModFile);
×
2472
        } else {
2473
          Files.copy(modFileToLoad.toPath(), targetModFile.toPath());
×
2474
        }
2475
      } catch (IOException e) {
×
2476
        logger.error(
×
2477
            "File renaming failed when loading .mod file. Origin: {}, Target: {}",
2478
            modFileToLoad.getAbsolutePath(),
×
2479
            targetModFile.getAbsolutePath(),
×
2480
            e);
2481
        throw new LoadFileException(
×
2482
            String.format(
×
2483
                "File renaming failed when loading .mod file. Origin: %s, Target: %s, because %s",
2484
                modFileToLoad.getAbsolutePath(), targetModFile.getAbsolutePath(), e.getMessage()));
×
2485
      } finally {
2486
        // ModFile will be updated during the next call to `getModFile`
2487
        tsFileResource.setModFile(null);
×
2488
      }
2489
    }
2490

2491
    updatePartitionFileVersion(filePartitionId, tsFileResource.getVersion());
×
2492
    return true;
×
2493
  }
2494

2495
  /**
2496
   * get all working sequence tsfile processors
2497
   *
2498
   * @return all working sequence tsfile processors
2499
   */
2500
  public Collection<TsFileProcessor> getWorkSequenceTsFileProcessors() {
2501
    return workSequenceTsFileProcessors.values();
1✔
2502
  }
2503

2504
  /**
2505
   * Unload tsfile and move it to the target directory if it exists.
2506
   *
2507
   * <p>Firstly, unload the TsFileResource from sequenceFileList/unSequenceFileList.
2508
   *
2509
   * <p>Secondly, move the tsfile and .resource file to the target directory.
2510
   *
2511
   * @param fileToBeUnloaded tsfile to be unloaded
2512
   * @return whether the file to be unloaded exists. @UsedBy load external tsfile module.
2513
   */
2514
  public boolean unloadTsfile(File fileToBeUnloaded, File targetDir) throws IOException {
2515
    writeLock("unloadTsfile");
×
2516
    TsFileResource tsFileResourceToBeMoved = null;
×
2517
    try {
2518
      Iterator<TsFileResource> sequenceIterator = tsFileManager.getIterator(true);
×
2519
      while (sequenceIterator.hasNext()) {
×
2520
        TsFileResource sequenceResource = sequenceIterator.next();
×
2521
        if (sequenceResource.getTsFile().getName().equals(fileToBeUnloaded.getName())) {
×
2522
          tsFileResourceToBeMoved = sequenceResource;
×
2523
          tsFileManager.remove(tsFileResourceToBeMoved, true);
×
2524
          FileMetrics.getInstance()
×
2525
              .deleteFile(
×
2526
                  new long[] {tsFileResourceToBeMoved.getTsFileSize()},
×
2527
                  true,
2528
                  Collections.singletonList(tsFileResourceToBeMoved.getTsFile().getName()));
×
2529
          break;
×
2530
        }
2531
      }
×
2532
      if (tsFileResourceToBeMoved == null) {
×
2533
        Iterator<TsFileResource> unsequenceIterator = tsFileManager.getIterator(false);
×
2534
        while (unsequenceIterator.hasNext()) {
×
2535
          TsFileResource unsequenceResource = unsequenceIterator.next();
×
2536
          if (unsequenceResource.getTsFile().getName().equals(fileToBeUnloaded.getName())) {
×
2537
            tsFileResourceToBeMoved = unsequenceResource;
×
2538
            tsFileManager.remove(tsFileResourceToBeMoved, false);
×
2539
            FileMetrics.getInstance()
×
2540
                .deleteFile(
×
2541
                    new long[] {tsFileResourceToBeMoved.getTsFileSize()},
×
2542
                    false,
2543
                    Collections.singletonList(tsFileResourceToBeMoved.getTsFile().getName()));
×
2544
            break;
×
2545
          }
2546
        }
×
2547
      }
2548
    } finally {
2549
      writeUnlock();
×
2550
    }
2551
    if (tsFileResourceToBeMoved == null) {
×
2552
      return false;
×
2553
    }
2554
    tsFileResourceToBeMoved.writeLock();
×
2555
    try {
2556
      tsFileResourceToBeMoved.moveTo(targetDir);
×
2557
      logger.info(
×
2558
          "Move tsfile {} to target dir {} successfully.",
2559
          tsFileResourceToBeMoved.getTsFile(),
×
2560
          targetDir.getPath());
×
2561
    } finally {
2562
      tsFileResourceToBeMoved.writeUnlock();
×
2563
    }
2564
    return true;
×
2565
  }
2566

2567
  /**
2568
   * get all working unsequence tsfile processors
2569
   *
2570
   * @return all working unsequence tsfile processors
2571
   */
2572
  public Collection<TsFileProcessor> getWorkUnsequenceTsFileProcessors() {
2573
    return workUnsequenceTsFileProcessors.values();
1✔
2574
  }
2575

2576
  public void setDataTTLWithTimePrecisionCheck(long dataTTL) {
2577
    if (dataTTL != Long.MAX_VALUE) {
×
2578
      dataTTL =
×
2579
          CommonDateTimeUtils.convertMilliTimeWithPrecision(
×
2580
              dataTTL, CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
×
2581
    }
2582
    this.dataTTL = dataTTL;
×
2583
  }
×
2584

2585
  public void setDataTTL(long dataTTL) {
2586
    this.dataTTL = dataTTL;
1✔
2587
  }
1✔
2588

2589
  public List<TsFileResource> getSequenceFileList() {
2590
    return tsFileManager.getTsFileList(true);
1✔
2591
  }
2592

2593
  public List<TsFileResource> getUnSequenceFileList() {
2594
    return tsFileManager.getTsFileList(false);
1✔
2595
  }
2596

2597
  public String getDataRegionId() {
2598
    return dataRegionId;
1✔
2599
  }
2600

2601
  /**
2602
   * Get the storageGroupPath with dataRegionId.
2603
   *
2604
   * @return data region path, like root.sg1/0
2605
   */
2606
  public String getStorageGroupPath() {
2607
    return databaseName + File.separator + dataRegionId;
×
2608
  }
2609

2610
  /**
2611
   * Check if the data of "tsFileResource" all exist locally by comparing planIndexes in the
2612
   * partition of "partitionNumber". This is available only when the IoTDB instances which generated
2613
   * "tsFileResource" have the same plan indexes as the local one.
2614
   *
2615
   * @return true if any file contains plans with indexes no less than the max plan index of
2616
   *     "tsFileResource", otherwise false.
2617
   */
2618
  public boolean isFileAlreadyExist(TsFileResource tsFileResource, long partitionNum) {
2619
    // examine working processor first as they have the largest plan index
2620
    return isFileAlreadyExistInWorking(
×
2621
            tsFileResource, partitionNum, getWorkSequenceTsFileProcessors())
×
2622
        || isFileAlreadyExistInWorking(
×
2623
            tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors())
×
2624
        || isFileAlreadyExistInClosed(tsFileResource, partitionNum, getSequenceFileList())
×
2625
        || isFileAlreadyExistInClosed(tsFileResource, partitionNum, getUnSequenceFileList());
×
2626
  }
2627

2628
  private boolean isFileAlreadyExistInClosed(
2629
      TsFileResource tsFileResource, long partitionNum, Collection<TsFileResource> existingFiles) {
2630
    for (TsFileResource resource : existingFiles) {
×
2631
      if (resource.getTimePartition() == partitionNum
×
2632
          && resource.getMaxPlanIndex() > tsFileResource.getMaxPlanIndex()) {
×
2633
        logger.info(
×
2634
            "{} is covered by a closed file {}: [{}, {}] [{}, {}]",
2635
            tsFileResource,
2636
            resource,
2637
            tsFileResource.minPlanIndex,
×
2638
            tsFileResource.maxPlanIndex,
×
2639
            resource.minPlanIndex,
×
2640
            resource.maxPlanIndex);
×
2641
        return true;
×
2642
      }
2643
    }
×
2644
    return false;
×
2645
  }
2646

2647
  private boolean isFileAlreadyExistInWorking(
2648
      TsFileResource tsFileResource,
2649
      long partitionNum,
2650
      Collection<TsFileProcessor> workingProcessors) {
2651
    for (TsFileProcessor workingProcesssor : workingProcessors) {
×
2652
      if (workingProcesssor.getTimeRangeId() == partitionNum) {
×
2653
        TsFileResource workResource = workingProcesssor.getTsFileResource();
×
2654
        boolean isCovered = workResource.getMaxPlanIndex() > tsFileResource.getMaxPlanIndex();
×
2655
        if (isCovered) {
×
2656
          logger.info(
×
2657
              "{} is covered by a working file {}: [{}, {}] [{}, {}]",
2658
              tsFileResource,
2659
              workResource,
2660
              tsFileResource.minPlanIndex,
×
2661
              tsFileResource.maxPlanIndex,
×
2662
              workResource.minPlanIndex,
×
2663
              workResource.maxPlanIndex);
×
2664
        }
2665
        return isCovered;
×
2666
      }
2667
    }
×
2668
    return false;
×
2669
  }
2670

2671
  public void abortCompaction() {
2672
    tsFileManager.setAllowCompaction(false);
1✔
2673
    List<AbstractCompactionTask> runningTasks =
2674
        CompactionTaskManager.getInstance().abortCompaction(databaseName + "-" + dataRegionId);
1✔
2675
    while (CompactionTaskManager.getInstance().isAnyTaskInListStillRunning(runningTasks)) {
1✔
2676
      try {
2677
        TimeUnit.MILLISECONDS.sleep(10);
×
2678
      } catch (InterruptedException e) {
×
2679
        logger.error("Thread get interrupted when waiting compaction to finish", e);
×
2680
        Thread.currentThread().interrupt();
×
2681
      }
×
2682
    }
2683
    if (timedCompactionScheduleTask != null) {
1✔
2684
      timedCompactionScheduleTask.shutdownNow();
1✔
2685
    }
2686
  }
1✔
2687

2688
  public TsFileManager getTsFileResourceManager() {
2689
    return tsFileManager;
1✔
2690
  }
2691

2692
  /**
2693
   * insert batch of rows belongs to one device
2694
   *
2695
   * @param insertRowsOfOneDeviceNode batch of rows belongs to one device
2696
   */
2697
  public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode)
2698
      throws WriteProcessException, BatchProcessException {
2699
    if (enableMemControl) {
×
2700
      StorageEngine.blockInsertionIfReject(null);
×
2701
    }
2702
    long startTime = System.nanoTime();
×
2703
    writeLock("InsertRowsOfOneDevice");
×
2704
    PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime);
×
2705
    try {
2706
      if (deleted) {
×
2707
        return;
×
2708
      }
2709
      boolean isSequence = false;
×
2710
      for (int i = 0; i < insertRowsOfOneDeviceNode.getInsertRowNodeList().size(); i++) {
×
2711
        InsertRowNode insertRowNode = insertRowsOfOneDeviceNode.getInsertRowNodeList().get(i);
×
2712
        if (!isAlive(insertRowNode.getTime())) {
×
2713
          // we do not need to write these part of data, as they can not be queried
2714
          // or the sub-plan has already been executed, we are retrying other sub-plans
2715
          insertRowsOfOneDeviceNode
×
2716
              .getResults()
×
2717
              .put(
×
2718
                  i,
×
2719
                  RpcUtils.getStatus(
×
2720
                      TSStatusCode.OUT_OF_TTL.getStatusCode(),
×
2721
                      String.format(
×
2722
                          "Insertion time [%s] is less than ttl time bound [%s]",
2723
                          DateTimeUtils.convertLongToDate(insertRowNode.getTime()),
×
2724
                          DateTimeUtils.convertLongToDate(DateTimeUtils.currentTime() - dataTTL))));
×
2725
          continue;
×
2726
        }
2727
        // init map
2728
        long timePartitionId = StorageEngine.getTimePartition(insertRowNode.getTime());
×
2729

2730
        if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
×
2731
          TimePartitionManager.getInstance()
×
2732
              .registerTimePartitionInfo(
×
2733
                  new TimePartitionInfo(
2734
                      new DataRegionId(Integer.valueOf(dataRegionId)),
×
2735
                      timePartitionId,
2736
                      true,
2737
                      Long.MAX_VALUE,
2738
                      0,
2739
                      tsFileManager.isLatestTimePartition(timePartitionId)));
×
2740
        }
2741

2742
        // as the plans have been ordered, and we have get the write lock,
2743
        // So, if a plan is sequenced, then all the rest plans are sequenced.
2744
        //
2745
        if (!isSequence) {
×
2746
          isSequence =
×
2747
              insertRowNode.getTime()
×
2748
                  > lastFlushTimeMap.getFlushedTime(
×
2749
                      timePartitionId, insertRowNode.getDevicePath().getFullPath());
×
2750
        }
2751
        // is unsequence and user set config to discard out of order data
2752
        if (!isSequence
×
2753
            && IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
×
2754
          return;
×
2755
        }
2756

2757
        // insert to sequence or unSequence file
2758
        try {
2759
          insertToTsFileProcessor(insertRowNode, isSequence, timePartitionId);
×
2760
        } catch (WriteProcessException e) {
×
2761
          insertRowsOfOneDeviceNode
×
2762
              .getResults()
×
2763
              .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
×
2764
        }
×
2765
      }
2766
    } finally {
2767
      writeUnlock();
×
2768
    }
2769
    if (!insertRowsOfOneDeviceNode.getResults().isEmpty()) {
×
2770
      throw new BatchProcessException("Partial failed inserting rows of one device");
×
2771
    }
2772
  }
×
2773

2774
  /**
2775
   * insert batch of rows belongs to multiple devices
2776
   *
2777
   * @param insertRowsNode batch of rows belongs to multiple devices
2778
   */
2779
  public void insert(InsertRowsNode insertRowsNode) throws BatchProcessException {
2780
    for (int i = 0; i < insertRowsNode.getInsertRowNodeList().size(); i++) {
×
2781
      InsertRowNode insertRowNode = insertRowsNode.getInsertRowNodeList().get(i);
×
2782
      try {
2783
        insert(insertRowNode);
×
2784
      } catch (WriteProcessException e) {
×
2785
        insertRowsNode.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
×
2786
      }
×
2787
    }
2788

2789
    if (!insertRowsNode.getResults().isEmpty()) {
×
2790
      throw new BatchProcessException("Partial failed inserting rows");
×
2791
    }
2792
  }
×
2793

2794
  /**
2795
   * insert batch of tablets belongs to multiple devices
2796
   *
2797
   * @param insertMultiTabletsNode batch of tablets belongs to multiple devices
2798
   */
2799
  public void insertTablets(InsertMultiTabletsNode insertMultiTabletsNode)
2800
      throws BatchProcessException {
2801
    for (int i = 0; i < insertMultiTabletsNode.getInsertTabletNodeList().size(); i++) {
×
2802
      InsertTabletNode insertTabletNode = insertMultiTabletsNode.getInsertTabletNodeList().get(i);
×
2803
      try {
2804
        insertTablet(insertTabletNode);
×
2805
      } catch (WriteProcessException | BatchProcessException e) {
×
2806
        insertMultiTabletsNode
×
2807
            .getResults()
×
2808
            .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
×
2809
      }
×
2810
    }
2811

2812
    if (!insertMultiTabletsNode.getResults().isEmpty()) {
×
2813
      throw new BatchProcessException("Partial failed inserting multi tablets");
×
2814
    }
2815
  }
×
2816

2817
  /** @return the disk space occupied by this data region, unit is MB */
2818
  public long countRegionDiskSize() {
2819
    AtomicLong diskSize = new AtomicLong(0);
×
2820
    TierManager.getInstance()
×
2821
        .getAllLocalFilesFolders()
×
2822
        .forEach(
×
2823
            folder -> {
2824
              folder = folder + File.separator + databaseName + File.separator + dataRegionId;
×
2825
              countFolderDiskSize(folder, diskSize);
×
2826
            });
×
2827
    return diskSize.get() / 1024 / 1024;
×
2828
  }
2829

2830
  /**
2831
   * @param folder the folder's path
2832
   * @param diskSize the disk space occupied by this folder, unit is MB
2833
   */
2834
  private void countFolderDiskSize(String folder, AtomicLong diskSize) {
2835
    File file = FSFactoryProducer.getFSFactory().getFile(folder);
×
2836
    File[] allFile = file.listFiles();
×
2837
    if (allFile == null) {
×
2838
      return;
×
2839
    }
2840
    for (File f : allFile) {
×
2841
      if (f.isFile()) {
×
2842
        diskSize.addAndGet(f.length());
×
2843
      } else if (f.isDirectory()) {
×
2844
        countFolderDiskSize(f.getAbsolutePath(), diskSize);
×
2845
      }
2846
    }
2847
  }
×
2848

2849
  public void addSettleFilesToList(
2850
      List<TsFileResource> seqResourcesToBeSettled,
2851
      List<TsFileResource> unseqResourcesToBeSettled,
2852
      List<String> tsFilePaths) {
2853
    if (tsFilePaths.isEmpty()) {
×
2854
      for (TsFileResource resource : tsFileManager.getTsFileList(true)) {
×
2855
        if (!resource.isClosed()) {
×
2856
          continue;
×
2857
        }
2858
        resource.setSettleTsFileCallBack(this::settleTsFileCallBack);
×
2859
        seqResourcesToBeSettled.add(resource);
×
2860
      }
×
2861
      for (TsFileResource resource : tsFileManager.getTsFileList(false)) {
×
2862
        if (!resource.isClosed()) {
×
2863
          continue;
×
2864
        }
2865
        resource.setSettleTsFileCallBack(this::settleTsFileCallBack);
×
2866
        unseqResourcesToBeSettled.add(resource);
×
2867
      }
×
2868
    } else {
2869
      for (String tsFilePath : tsFilePaths) {
×
2870
        File fileToBeSettled = new File(tsFilePath);
×
2871
        if ("sequence"
×
2872
            .equals(
×
2873
                fileToBeSettled
2874
                    .getParentFile()
×
2875
                    .getParentFile()
×
2876
                    .getParentFile()
×
2877
                    .getParentFile()
×
2878
                    .getName())) {
×
2879
          for (TsFileResource resource : tsFileManager.getTsFileList(true)) {
×
2880
            if (resource.getTsFile().getAbsolutePath().equals(tsFilePath)) {
×
2881
              resource.setSettleTsFileCallBack(this::settleTsFileCallBack);
×
2882
              seqResourcesToBeSettled.add(resource);
×
2883
              break;
×
2884
            }
2885
          }
×
2886
        } else {
2887
          for (TsFileResource resource : tsFileManager.getTsFileList(false)) {
×
2888
            if (resource.getTsFile().getAbsolutePath().equals(tsFilePath)) {
×
2889
              unseqResourcesToBeSettled.add(resource);
×
2890
              break;
×
2891
            }
2892
          }
×
2893
        }
2894
      }
×
2895
    }
2896
  }
×
2897

2898
  public void setCustomCloseFileListeners(List<CloseFileListener> customCloseFileListeners) {
2899
    this.customCloseFileListeners = customCloseFileListeners;
×
2900
  }
×
2901

2902
  public void setCustomFlushListeners(List<FlushListener> customFlushListeners) {
2903
    this.customFlushListeners = customFlushListeners;
×
2904
  }
×
2905

2906
  public void setAllowCompaction(boolean allowCompaction) {
2907
    this.tsFileManager.setAllowCompaction(allowCompaction);
×
2908
  }
×
2909

2910
  @FunctionalInterface
2911
  public interface CloseTsFileCallBack {
2912

2913
    void call(TsFileProcessor caller) throws TsFileProcessorException, IOException;
2914
  }
2915

2916
  @FunctionalInterface
2917
  public interface UpdateEndTimeCallBack {
2918

2919
    void call(TsFileProcessor caller, Map<String, Long> updateMap, long systemFlushTime);
2920
  }
2921

2922
  @FunctionalInterface
2923
  public interface CompactionRecoverCallBack {
2924
    void call();
2925
  }
2926

2927
  @FunctionalInterface
2928
  public interface TimePartitionFilter {
2929

2930
    boolean satisfy(String storageGroupName, long timePartitionId);
2931
  }
2932

2933
  @FunctionalInterface
2934
  public interface SettleTsFileCallBack {
2935

2936
    void call(TsFileResource oldTsFileResource, List<TsFileResource> newTsFileResources)
2937
        throws WriteProcessException;
2938
  }
2939

2940
  public List<Long> getTimePartitions() {
2941
    return new ArrayList<>(partitionMaxFileVersions.keySet());
×
2942
  }
2943

2944
  public Long getLatestTimePartition() {
2945
    return getTimePartitions().stream().max(Long::compareTo).orElse(0L);
×
2946
  }
2947

2948
  public String getInsertWriteLockHolder() {
2949
    return insertWriteLockHolder;
×
2950
  }
2951

2952
  public ScheduledExecutorService getTimedCompactionScheduleTask() {
2953
    return timedCompactionScheduleTask;
×
2954
  }
2955

2956
  /** This method could only be used in iot consensus */
2957
  public IWALNode getWALNode() {
2958
    if (!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) {
×
2959
      throw new UnsupportedOperationException();
×
2960
    }
2961
    // identifier should be same with getTsFileProcessor method
2962
    return WALManager.getInstance()
×
2963
        .applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionId);
×
2964
  }
2965

2966
  /** Wait for this data region successfully deleted */
2967
  public void waitForDeleted() {
2968
    writeLock("waitForDeleted");
×
2969
    try {
2970
      if (!deleted) {
×
2971
        deletedCondition.await();
×
2972
      }
2973
    } catch (InterruptedException e) {
×
2974
      logger.error("Interrupted When waiting for data region deleted.");
×
2975
      Thread.currentThread().interrupt();
×
2976
    } finally {
2977
      writeUnlock();
×
2978
    }
2979
  }
×
2980

2981
  /** Release all threads waiting for this data region successfully deleted */
2982
  public void markDeleted() {
2983
    writeLock("markDeleted");
1✔
2984
    try {
2985
      deleted = true;
1✔
2986
      deletedCondition.signalAll();
1✔
2987
    } finally {
2988
      writeUnlock();
1✔
2989
    }
2990
  }
1✔
2991

2992
  public void releaseFlushTimeMap(long timePartitionId) {
2993
    lastFlushTimeMap.removePartition(timePartitionId);
×
2994
  }
×
2995

2996
  public long getMemCost() {
2997
    return dataRegionInfo.getMemCost();
×
2998
  }
2999

3000
  @Override
3001
  public long getDataTTL() {
3002
    return dataTTL;
×
3003
  }
3004

3005
  @TestOnly
3006
  public ILastFlushTimeMap getLastFlushTimeMap() {
3007
    return lastFlushTimeMap;
1✔
3008
  }
3009

3010
  @TestOnly
3011
  public TsFileManager getTsFileManager() {
3012
    return tsFileManager;
1✔
3013
  }
3014
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc