• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9703

pending completion
#9703

push

travis_ci

web-flow
[IOTDB-6067] Pipe: Improve the stability of iotdb-thrift-connector-v2 during fault tolerance (avoid OOM) (#10550) (#10719)

Co-authored-by: Steve Yurong Su <rong@apache.org>
(cherry picked from commit f280381be)

283 of 283 new or added lines in 14 files covered. (100.0%)

79217 of 165033 relevant lines covered (48.0%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.53
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *      http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.storageengine.dataregion;
21

22
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23
import org.apache.iotdb.commons.cluster.NodeStatus;
24
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
25
import org.apache.iotdb.commons.concurrent.ThreadName;
26
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
27
import org.apache.iotdb.commons.conf.CommonDescriptor;
28
import org.apache.iotdb.commons.consensus.DataRegionId;
29
import org.apache.iotdb.commons.exception.IllegalPathException;
30
import org.apache.iotdb.commons.exception.MetadataException;
31
import org.apache.iotdb.commons.file.SystemFileFactory;
32
import org.apache.iotdb.commons.path.PartialPath;
33
import org.apache.iotdb.commons.service.metric.MetricService;
34
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
35
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
36
import org.apache.iotdb.commons.utils.TestOnly;
37
import org.apache.iotdb.consensus.ConsensusFactory;
38
import org.apache.iotdb.db.conf.IoTDBConfig;
39
import org.apache.iotdb.db.conf.IoTDBDescriptor;
40
import org.apache.iotdb.db.exception.BatchProcessException;
41
import org.apache.iotdb.db.exception.DataRegionException;
42
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
43
import org.apache.iotdb.db.exception.LoadFileException;
44
import org.apache.iotdb.db.exception.TsFileProcessorException;
45
import org.apache.iotdb.db.exception.WriteProcessException;
46
import org.apache.iotdb.db.exception.WriteProcessRejectException;
47
import org.apache.iotdb.db.exception.query.OutOfTTLException;
48
import org.apache.iotdb.db.exception.query.QueryProcessException;
49
import org.apache.iotdb.db.exception.quota.ExceedQuotaException;
50
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
51
import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
52
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
53
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
54
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
55
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
56
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
57
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
58
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
59
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
60
import org.apache.iotdb.db.service.SettleService;
61
import org.apache.iotdb.db.service.metrics.FileMetrics;
62
import org.apache.iotdb.db.storageengine.StorageEngine;
63
import org.apache.iotdb.db.storageengine.buffer.BloomFilterCache;
64
import org.apache.iotdb.db.storageengine.buffer.ChunkCache;
65
import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
66
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.recover.CompactionRecoverManager;
67
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
68
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler;
69
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
70
import org.apache.iotdb.db.storageengine.dataregion.flush.CloseFileListener;
71
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushListener;
72
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus;
73
import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
74
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
75
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
76
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessorInfo;
77
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
78
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
79
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
80
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
81
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
82
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
83
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
84
import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
85
import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.VersionController;
86
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
87
import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode;
88
import org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
89
import org.apache.iotdb.db.storageengine.dataregion.wal.recover.file.SealedTsFileRecoverPerformer;
90
import org.apache.iotdb.db.storageengine.dataregion.wal.recover.file.UnsealedTsFileRecoverPerformer;
91
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
92
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
93
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALRecoverListener;
94
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
95
import org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionInfo;
96
import org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionManager;
97
import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
98
import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager;
99
import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool;
100
import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
101
import org.apache.iotdb.db.utils.DateTimeUtils;
102
import org.apache.iotdb.rpc.RpcUtils;
103
import org.apache.iotdb.rpc.TSStatusCode;
104
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
105
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
106
import org.apache.iotdb.tsfile.fileSystem.FSType;
107
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
108
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
109
import org.apache.iotdb.tsfile.utils.FSUtils;
110
import org.apache.iotdb.tsfile.utils.Pair;
111
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
112
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
113

114
import org.apache.commons.io.FileUtils;
115
import org.slf4j.Logger;
116
import org.slf4j.LoggerFactory;
117

118
import java.io.File;
119
import java.io.IOException;
120
import java.nio.file.Files;
121
import java.util.ArrayList;
122
import java.util.Arrays;
123
import java.util.Collection;
124
import java.util.Collections;
125
import java.util.Comparator;
126
import java.util.Date;
127
import java.util.HashMap;
128
import java.util.HashSet;
129
import java.util.Iterator;
130
import java.util.List;
131
import java.util.Map;
132
import java.util.Map.Entry;
133
import java.util.Set;
134
import java.util.TreeMap;
135
import java.util.concurrent.ScheduledExecutorService;
136
import java.util.concurrent.TimeUnit;
137
import java.util.concurrent.atomic.AtomicBoolean;
138
import java.util.concurrent.atomic.AtomicLong;
139
import java.util.concurrent.locks.Condition;
140
import java.util.concurrent.locks.ReadWriteLock;
141
import java.util.concurrent.locks.ReentrantReadWriteLock;
142

143
import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
144
import static org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.SEQUENCE_TSFILE;
145
import static org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.UNSEQUENCE_TSFILE;
146
import static org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource.TEMP_SUFFIX;
147
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
148

149
/**
150
 * For sequence data, a DataRegion has some TsFileProcessors, in which there is only one
151
 * TsFileProcessor in the working status. <br>
152
 *
153
 * <p>There are two situations to set the working TsFileProcessor to closing status:<br>
154
 *
155
 * <p>(1) when inserting data into the TsFileProcessor, and the TsFileProcessor shouldFlush() (or
156
 * shouldClose())<br>
157
 *
158
 * <p>(2) someone calls syncCloseAllWorkingTsFileProcessors(). (up to now, only flush command from
159
 * cli will call this method)<br>
160
 *
161
 * <p>Unsequence data goes through a similar process.
162
 *
163
 * <p>When a sequence TsFileProcessor is submitted to be flushed, the
164
 * updateLatestFlushTimeCallback() method will be called as a callback.<br>
165
 *
166
 * <p>When a TsFileProcessor is closed, the closeUnsealedTsFileProcessorCallBack() method will be
167
 * called as a callback.
168
 */
169
public class DataRegion implements IDataRegionForQuery {
170

171
  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
1✔
172
  private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
1✔
173

174
  /**
175
   * All newly generated chunks after merge have version number 0, so we set merged Modification
176
   * file version to 1 to take effect.
177
   */
178
  private static final int MERGE_MOD_START_VERSION_NUM = 1;
179

180
  private static final Logger logger = LoggerFactory.getLogger(DataRegion.class);
1✔
181

182
  private final boolean enableMemControl = config.isEnableMemControl();
1✔
183
  /**
184
   * a read write lock for guaranteeing concurrent safety when accessing all fields in this class
185
   * (i.e., schema, (un)sequenceFileList, work(un)SequenceTsFileProcessor,
186
   * closing(Un)SequenceTsFileProcessor, latestTimeForEachDevice, and
187
   * partitionLatestFlushedTimeForEachDevice)
188
   */
189
  private final ReadWriteLock insertLock = new ReentrantReadWriteLock();
1✔
190
  /** condition to safely delete data region. */
191
  private final Condition deletedCondition = insertLock.writeLock().newCondition();
1✔
192
  /** data region has been deleted or not. */
193
  private volatile boolean deleted = false;
1✔
194
  /** closeStorageGroupCondition is used to wait for all currently closing TsFiles to be done. */
195
  private final Object closeStorageGroupCondition = new Object();
1✔
196
  /**
197
   * prevent a tsfileResource from being changed (e.g., from unsealed to sealed) during a read.
198
   */
199
  private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock();
1✔
200
  /** time partition id in the database -> tsFileProcessor for this time partition. */
201
  private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors = new TreeMap<>();
1✔
202
  /** time partition id in the database -> tsFileProcessor for this time partition. */
203
  private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors = new TreeMap<>();
1✔
204

205
  /** sequence tsfile processors which are closing. */
206
  private final CopyOnReadLinkedList<TsFileProcessor> closingSequenceTsFileProcessor =
1✔
207
      new CopyOnReadLinkedList<>();
208

209
  /** unsequence tsfile processors which are closing. */
210
  private final CopyOnReadLinkedList<TsFileProcessor> closingUnSequenceTsFileProcessor =
1✔
211
      new CopyOnReadLinkedList<>();
212

213
  private final AtomicBoolean isSettling = new AtomicBoolean();
1✔
214

215
  /** data region id. */
216
  private final String dataRegionId;
217
  /** database name. */
218
  private final String databaseName;
219
  /** database system directory. */
220
  private File storageGroupSysDir;
221
  /** manage seqFileList and unSeqFileList. */
222
  private final TsFileManager tsFileManager;
223

224
  /** manage tsFileResource degrade. */
225
  private final TsFileResourceManager tsFileResourceManager = TsFileResourceManager.getInstance();
1✔
226

227
  /**
228
   * time partition id -> version controller which assigns a version for each MemTable and
229
   * deletion/update such that after they are persisted, the order of insertions, deletions and
230
   * updates can be re-determined. Will be empty if there are no MemTables in memory.
231
   */
232
  private final HashMap<Long, VersionController> timePartitionIdVersionControllerMap =
1✔
233
      new HashMap<>();
234
  /**
235
   * when the data in a database is older than dataTTL, it is considered invalid and will be
236
   * eventually removed.
237
   */
238
  private long dataTTL = Long.MAX_VALUE;
1✔
239
  /** file system factory (local or hdfs). */
240
  private final FSFactory fsFactory = FSFactoryProducer.getFSFactory();
1✔
241
  /** file flush policy. */
242
  private TsFileFlushPolicy fileFlushPolicy;
243
  /**
244
   * The max file versions in each partition. By recording this, if several IoTDB instances have the
245
   * same policy of closing file and their ingestion is identical, then files of the same version in
246
   * different IoTDB instance will have identical data, providing convenience for data comparison
247
   * across different instances. partition number -> max version number
248
   */
249
  private Map<Long, Long> partitionMaxFileVersions = new HashMap<>();
1✔
250
  /** database info for mem control. */
251
  private final DataRegionInfo dataRegionInfo = new DataRegionInfo(this);
1✔
252
  /** whether it's ready from recovery. */
253
  private boolean isReady = false;
1✔
254
  /** close file listeners. */
255
  private List<CloseFileListener> customCloseFileListeners = Collections.emptyList();
1✔
256
  /** flush listeners. */
257
  private List<FlushListener> customFlushListeners = Collections.emptyList();
1✔
258

259
  private ILastFlushTimeMap lastFlushTimeMap;
260

261
  /**
262
   * records which method currently holds the insertWriteLock in SG, it will be empty string if no
263
   * one holds the insertWriteLock.
264
   */
265
  private String insertWriteLockHolder = "";
1✔
266

267
  private ScheduledExecutorService timedCompactionScheduleTask;
268

269
  public static final long COMPACTION_TASK_SUBMIT_DELAY = 20L * 1000L;
270

271
  private static final QueryResourceMetricSet QUERY_RESOURCE_METRIC_SET =
272
      QueryResourceMetricSet.getInstance();
1✔
273

274
  private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
1✔
275
      PerformanceOverviewMetrics.getInstance();
1✔
276

277
  /**
278
   * construct a database processor.
279
   *
280
   * @param systemDir system dir path
281
   * @param dataRegionId data region id e.g. 1
282
   * @param fileFlushPolicy file flush policy
283
   * @param databaseName database name e.g. root.sg1
284
   */
285
  public DataRegion(
286
      String systemDir, String dataRegionId, TsFileFlushPolicy fileFlushPolicy, String databaseName)
287
      throws DataRegionException {
1✔
288
    this.dataRegionId = dataRegionId;
1✔
289
    this.databaseName = databaseName;
1✔
290
    this.fileFlushPolicy = fileFlushPolicy;
1✔
291

292
    storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionId);
1✔
293
    this.tsFileManager =
1✔
294
        new TsFileManager(databaseName, dataRegionId, storageGroupSysDir.getPath());
1✔
295
    if (storageGroupSysDir.mkdirs()) {
1✔
296
      logger.info(
1✔
297
          "Database system Directory {} doesn't exist, create it", storageGroupSysDir.getPath());
1✔
298
    } else if (!storageGroupSysDir.exists()) {
1✔
299
      logger.error("create database system Directory {} failed", storageGroupSysDir.getPath());
×
300
    }
301

302
    lastFlushTimeMap = new HashLastFlushTimeMap(tsFileManager);
1✔
303

304
    // recover tsfiles unless consensus protocol is ratis and storage storageengine is not ready
305
    if (config.isClusterMode()
1✔
306
        && config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)
×
307
        && !StorageEngine.getInstance().isAllSgReady()) {
×
308
      logger.debug(
×
309
          "Skip recovering data region {}[{}] when consensus protocol is ratis and storage storageengine is not ready.",
310
          databaseName,
311
          dataRegionId);
312
      for (String fileFolder : TierManager.getInstance().getAllFilesFolders()) {
×
313
        File dataRegionFolder =
×
314
            fsFactory.getFile(fileFolder, databaseName + File.separator + dataRegionId);
×
315
        try {
316
          fsFactory.deleteDirectory(dataRegionFolder.getPath());
×
317
        } catch (IOException e) {
×
318
          logger.error(
×
319
              "Exception occurs when deleting data region folder for {}-{}",
320
              databaseName,
321
              dataRegionId,
322
              e);
323
        }
×
324
        if (FSUtils.getFSType(dataRegionFolder) == FSType.LOCAL) {
×
325
          dataRegionFolder.mkdirs();
×
326
        }
327
      }
×
328
    } else {
329
      recover();
1✔
330
    }
331

332
    MetricService.getInstance().addMetricSet(new DataRegionMetrics(this));
1✔
333
  }
1✔
334

335
  @TestOnly
336
  public DataRegion(String databaseName, String id) {
1✔
337
    this.databaseName = databaseName;
1✔
338
    this.dataRegionId = id;
1✔
339
    this.tsFileManager = new TsFileManager(databaseName, id, "");
1✔
340
    this.partitionMaxFileVersions = new HashMap<>();
1✔
341
    partitionMaxFileVersions.put(0L, 0L);
1✔
342
  }
1✔
343

344
  @Override
345
  public String getDatabaseName() {
346
    return databaseName;
1✔
347
  }
348

349
  public boolean isReady() {
350
    return isReady;
×
351
  }
352

353
  public void setReady(boolean ready) {
354
    isReady = ready;
×
355
  }
×
356

357
  private Map<Long, List<TsFileResource>> splitResourcesByPartition(
358
      List<TsFileResource> resources) {
359
    Map<Long, List<TsFileResource>> ret = new TreeMap<>();
1✔
360
    for (TsFileResource resource : resources) {
1✔
361
      ret.computeIfAbsent(resource.getTimePartition(), l -> new ArrayList<>()).add(resource);
1✔
362
    }
1✔
363
    return ret;
1✔
364
  }
365

366
  public AtomicBoolean getIsSettling() {
367
    return isSettling;
×
368
  }
369

370
  public void setSettling(boolean isSettling) {
371
    this.isSettling.set(isSettling);
×
372
  }
×
373

374
  /** this class is used to store recovering context. */
375
  private class DataRegionRecoveryContext {
376
    /** number of files to be recovered. */
377
    private final long numOfFilesToRecover;
378
    /** when the change of recoveredFilesNum exceeds this, log check will be triggered. */
379
    private final long filesNumLogCheckTrigger;
380
    /** number of already recovered files. */
381
    private long recoveredFilesNum;
382
    /** last recovery log time. */
383
    private long lastLogTime;
384
    /** last recovery log files num. */
385
    private long lastLogCheckFilesNum;
386
    /** recover performers of unsealed TsFiles. */
387
    private final List<UnsealedTsFileRecoverPerformer> recoverPerformers = new ArrayList<>();
1✔
388

389
    public DataRegionRecoveryContext(long numOfFilesToRecover) {
1✔
390
      this.numOfFilesToRecover = numOfFilesToRecover;
1✔
391
      this.recoveredFilesNum = 0;
1✔
392
      this.filesNumLogCheckTrigger = this.numOfFilesToRecover / 100;
1✔
393
      this.lastLogTime = System.currentTimeMillis();
1✔
394
      this.lastLogCheckFilesNum = 0;
1✔
395
    }
1✔
396

397
    public void incrementRecoveredFilesNum() {
398
      recoveredFilesNum++;
1✔
399
      // check log only when 1% more files have been recovered
400
      if (lastLogCheckFilesNum + filesNumLogCheckTrigger < recoveredFilesNum) {
1✔
401
        lastLogCheckFilesNum = recoveredFilesNum;
1✔
402
        // log only when log interval exceeds recovery log interval
403
        if (lastLogTime + config.getRecoveryLogIntervalInMs() < System.currentTimeMillis()) {
1✔
404
          logger.info(
×
405
              "The data region {}[{}] has recovered {}%, please wait a moment.",
406
              databaseName, dataRegionId, recoveredFilesNum * 1.0 / numOfFilesToRecover);
×
407
          lastLogTime = System.currentTimeMillis();
×
408
        }
409
      }
410
    }
1✔
411
  }
412

413
  /** recover from file */
414
  @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive Complexity warning
415
  private void recover() throws DataRegionException {
416
    try {
417
      recoverCompaction();
1✔
418
    } catch (Exception e) {
×
419
      throw new DataRegionException(e);
×
420
    }
1✔
421

422
    try {
423
      // collect candidate TsFiles from sequential and unsequential data directory
424
      List<TsFileResource> tmpSeqTsFiles =
1✔
425
          getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders());
1✔
426
      List<TsFileResource> tmpUnseqTsFiles =
1✔
427
          getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders());
1✔
428

429
      // split by partition so that we can find the last file of each partition and decide to
430
      // close it or not
431
      DataRegionRecoveryContext dataRegionRecoveryContext =
1✔
432
          new DataRegionRecoveryContext((long) tmpSeqTsFiles.size() + tmpUnseqTsFiles.size());
1✔
433
      Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles =
1✔
434
          splitResourcesByPartition(tmpSeqTsFiles);
1✔
435
      Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles =
1✔
436
          splitResourcesByPartition(tmpUnseqTsFiles);
1✔
437
      // submit unsealed TsFiles to recover
438
      List<WALRecoverListener> recoverListeners = new ArrayList<>();
1✔
439
      for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
1✔
440
        // tsFiles without resource file are unsealed
441
        for (TsFileResource resource : value) {
1✔
442
          if (resource.resourceFileExists()) {
1✔
443
            FileMetrics.getInstance()
1✔
444
                .addFile(resource.getTsFile().length(), true, resource.getTsFile().getName());
1✔
445
            if (resource.getModFile().exists()) {
1✔
446
              FileMetrics.getInstance().increaseModFileNum(1);
×
447
              FileMetrics.getInstance().increaseModFileSize(resource.getModFile().getSize());
×
448
            }
449
          }
450
        }
1✔
451
        while (!value.isEmpty()) {
1✔
452
          TsFileResource tsFileResource = value.get(value.size() - 1);
1✔
453
          if (tsFileResource.resourceFileExists()) {
1✔
454
            break;
1✔
455
          } else {
456
            value.remove(value.size() - 1);
×
457
            WALRecoverListener recoverListener =
×
458
                recoverUnsealedTsFile(tsFileResource, dataRegionRecoveryContext, true);
×
459
            if (recoverListener != null) {
×
460
              recoverListeners.add(recoverListener);
×
461
            }
462
          }
463
        }
×
464
      }
1✔
465
      for (List<TsFileResource> value : partitionTmpUnseqTsFiles.values()) {
1✔
466
        // tsFiles without resource file are unsealed
467
        for (TsFileResource resource : value) {
×
468
          if (resource.resourceFileExists()) {
×
469
            FileMetrics.getInstance()
×
470
                .addFile(resource.getTsFile().length(), false, resource.getTsFile().getName());
×
471
          }
472
          if (resource.getModFile().exists()) {
×
473
            FileMetrics.getInstance().increaseModFileNum(1);
×
474
            FileMetrics.getInstance().increaseModFileSize(resource.getModFile().getSize());
×
475
          }
476
        }
×
477
        while (!value.isEmpty()) {
×
478
          TsFileResource tsFileResource = value.get(value.size() - 1);
×
479
          if (tsFileResource.resourceFileExists()) {
×
480
            break;
×
481
          } else {
482
            value.remove(value.size() - 1);
×
483
            WALRecoverListener recoverListener =
×
484
                recoverUnsealedTsFile(tsFileResource, dataRegionRecoveryContext, false);
×
485
            if (recoverListener != null) {
×
486
              recoverListeners.add(recoverListener);
×
487
            }
488
          }
489
        }
×
490
      }
×
491
      // signal wal recover manager to recover this region's files
492
      WALRecoverManager.getInstance().getAllDataRegionScannedLatch().countDown();
1✔
493
      // recover sealed TsFiles
494
      if (!partitionTmpSeqTsFiles.isEmpty()) {
1✔
495
        long latestPartitionId =
1✔
496
            ((TreeMap<Long, List<TsFileResource>>) partitionTmpSeqTsFiles).lastKey();
1✔
497
        for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpSeqTsFiles.entrySet()) {
1✔
498
          recoverFilesInPartition(
1✔
499
              partitionFiles.getKey(),
1✔
500
              dataRegionRecoveryContext,
501
              partitionFiles.getValue(),
1✔
502
              true,
503
              partitionFiles.getKey() == latestPartitionId);
1✔
504
        }
1✔
505
      }
506
      for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpUnseqTsFiles.entrySet()) {
1✔
507
        recoverFilesInPartition(
×
508
            partitionFiles.getKey(),
×
509
            dataRegionRecoveryContext,
510
            partitionFiles.getValue(),
×
511
            false,
512
            false);
513
      }
×
514
      // wait until all unsealed TsFiles have been recovered
515
      for (WALRecoverListener recoverListener : recoverListeners) {
1✔
516
        if (recoverListener.waitForResult() == WALRecoverListener.Status.FAILURE) {
×
517
          logger.error(
×
518
              "Fail to recover unsealed TsFile {}, skip it.",
519
              recoverListener.getFilePath(),
×
520
              recoverListener.getCause());
×
521
        }
522
        // update VSGRecoveryContext
523
        dataRegionRecoveryContext.incrementRecoveredFilesNum();
×
524
      }
×
525
      // recover unsealed TsFiles, sort make sure last flush time not be replaced by early files
526
      dataRegionRecoveryContext.recoverPerformers.sort(
1✔
527
          (p1, p2) ->
528
              compareFileName(
×
529
                  p1.getTsFileResource().getTsFile(), p2.getTsFileResource().getTsFile()));
×
530
      for (UnsealedTsFileRecoverPerformer recoverPerformer :
531
          dataRegionRecoveryContext.recoverPerformers) {
1✔
532
        recoverUnsealedTsFileCallBack(recoverPerformer);
×
533
      }
×
534
      for (TsFileResource resource : tsFileManager.getTsFileList(true)) {
1✔
535
        long partitionNum = resource.getTimePartition();
1✔
536
        updatePartitionFileVersion(partitionNum, resource.getVersion());
1✔
537
      }
1✔
538
      for (TsFileResource resource : tsFileManager.getTsFileList(false)) {
1✔
539
        long partitionNum = resource.getTimePartition();
×
540
        updatePartitionFileVersion(partitionNum, resource.getVersion());
×
541
      }
×
542
    } catch (IOException e) {
×
543
      throw new DataRegionException(e);
×
544
    }
1✔
545

546
    // recover and start timed compaction thread
547
    initCompaction();
1✔
548

549
    if (StorageEngine.getInstance().isAllSgReady()) {
1✔
550
      logger.info("The data region {}[{}] is created successfully", databaseName, dataRegionId);
1✔
551
    } else {
552
      logger.info("The data region {}[{}] is recovered successfully", databaseName, dataRegionId);
1✔
553
    }
554
  }
1✔
555

556
  private void updateLastFlushTime(TsFileResource resource, boolean isSeq) {
557
    //  only update flush time when it is a seq file
558
    if (isSeq) {
1✔
559
      long timePartitionId = resource.getTimePartition();
1✔
560
      Map<String, Long> endTimeMap = new HashMap<>();
1✔
561
      for (String deviceId : resource.getDevices()) {
1✔
562
        long endTime = resource.getEndTime(deviceId);
1✔
563
        endTimeMap.put(deviceId.intern(), endTime);
1✔
564
      }
1✔
565
      lastFlushTimeMap.setMultiDeviceFlushedTime(timePartitionId, endTimeMap);
1✔
566
      lastFlushTimeMap.setMultiDeviceGlobalFlushedTime(endTimeMap);
1✔
567
    }
568
  }
1✔
569

570
  private void initCompaction() {
571
    if (!config.isEnableSeqSpaceCompaction()
1✔
572
        && !config.isEnableUnseqSpaceCompaction()
×
573
        && !config.isEnableCrossSpaceCompaction()) {
×
574
      return;
×
575
    }
576
    timedCompactionScheduleTask =
1✔
577
        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
1✔
578
            ThreadName.COMPACTION_SCHEDULE.getName() + "-" + databaseName + "-" + dataRegionId);
1✔
579
    ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
1✔
580
        timedCompactionScheduleTask,
581
        this::executeCompaction,
582
        COMPACTION_TASK_SUBMIT_DELAY,
583
        IoTDBDescriptor.getInstance().getConfig().getCompactionScheduleIntervalInMs(),
1✔
584
        TimeUnit.MILLISECONDS);
585
  }
1✔
586

587
  private void recoverCompaction() {
588
    CompactionRecoverManager compactionRecoverManager =
1✔
589
        new CompactionRecoverManager(tsFileManager, databaseName, dataRegionId);
590
    compactionRecoverManager.recoverInnerSpaceCompaction(true);
1✔
591
    compactionRecoverManager.recoverInnerSpaceCompaction(false);
1✔
592
    compactionRecoverManager.recoverCrossSpaceCompaction();
1✔
593
  }
1✔
594

595
  private void updatePartitionFileVersion(long partitionNum, long fileVersion) {
596
    long oldVersion = partitionMaxFileVersions.getOrDefault(partitionNum, 0L);
1✔
597
    if (fileVersion > oldVersion) {
1✔
598
      partitionMaxFileVersions.put(partitionNum, fileVersion);
1✔
599
    }
600
  }
1✔
601

602
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
603
  private List<TsFileResource> getAllFiles(List<String> folders)
604
      throws IOException, DataRegionException {
605
    // "{partition id}/{tsfile name}" -> tsfile file, remove duplicate files in one time partition
606
    Map<String, File> tsFilePartitionPath2File = new HashMap<>();
1✔
607
    for (String baseDir : folders) {
1✔
608
      File fileFolder = fsFactory.getFile(baseDir + File.separator + databaseName, dataRegionId);
1✔
609
      if (!fileFolder.exists()) {
1✔
610
        continue;
1✔
611
      }
612
      // some TsFileResource may be being persisted when the system crashed, try recovering such
613
      // resources
614
      continueFailedRenames(fileFolder, TEMP_SUFFIX);
1✔
615

616
      File[] subFiles = fileFolder.listFiles();
1✔
617
      if (subFiles != null) {
1✔
618
        for (File partitionFolder : subFiles) {
1✔
619
          if (!partitionFolder.isDirectory()) {
1✔
620
            logger.warn("{} is not a directory.", partitionFolder.getAbsolutePath());
1✔
621
          } else {
622
            // some TsFileResource may be being persisted when the system crashed, try recovering
623
            // such resources
624
            continueFailedRenames(partitionFolder, TEMP_SUFFIX);
1✔
625
            String partitionName = partitionFolder.getName();
1✔
626
            File[] tsFilesInThisFolder =
1✔
627
                fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), TSFILE_SUFFIX);
1✔
628
            for (File f : tsFilesInThisFolder) {
1✔
629
              String tsFilePartitionPath = partitionName + File.separator + f.getName();
1✔
630
              tsFilePartitionPath2File.put(tsFilePartitionPath, f);
1✔
631
            }
632
          }
633
        }
634
      }
635
    }
1✔
636

637
    List<File> sortedFiles = new ArrayList<>(tsFilePartitionPath2File.values());
1✔
638
    sortedFiles.sort(this::compareFileName);
1✔
639

640
    long currentTime = System.currentTimeMillis();
1✔
641
    List<TsFileResource> ret = new ArrayList<>();
1✔
642
    for (File f : sortedFiles) {
1✔
643
      checkTsFileTime(f, currentTime);
1✔
644
      ret.add(new TsFileResource(f));
1✔
645
    }
1✔
646
    return ret;
1✔
647
  }
648

649
  private void continueFailedRenames(File fileFolder, String suffix) throws IOException {
650
    File[] files = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), suffix);
1✔
651
    if (files != null) {
1✔
652
      for (File tempResource : files) {
1✔
653
        File originResource = fsFactory.getFile(tempResource.getPath().replace(suffix, ""));
×
654
        if (originResource.exists()) {
×
655
          Files.delete(tempResource.toPath());
×
656
        } else {
657
          Files.move(tempResource.toPath(), originResource.toPath());
×
658
        }
659
      }
660
    }
661
  }
1✔
662

663
  /** check if the tsfile's time is smaller than system current time. */
664
  private void checkTsFileTime(File tsFile, long currentTime) throws DataRegionException {
665
    String[] items = tsFile.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
1✔
666
    long fileTime = Long.parseLong(items[0]);
1✔
667
    if (fileTime > currentTime) {
1✔
668
      throw new DataRegionException(
×
669
          String.format(
×
670
              "data region %s[%s] is down, because the time of tsfile %s is larger than system current time, "
671
                  + "file time is %d while system current time is %d, please check it.",
672
              databaseName, dataRegionId, tsFile.getAbsolutePath(), fileTime, currentTime));
×
673
    }
674
  }
1✔
675

676
  /** submit unsealed TsFile to WALRecoverManager. */
677
  private WALRecoverListener recoverUnsealedTsFile(
678
      TsFileResource unsealedTsFile, DataRegionRecoveryContext context, boolean isSeq) {
679
    UnsealedTsFileRecoverPerformer recoverPerformer =
×
680
        new UnsealedTsFileRecoverPerformer(unsealedTsFile, isSeq, context.recoverPerformers::add);
×
681
    // remember to close UnsealedTsFileRecoverPerformer
682
    return WALRecoverManager.getInstance().addRecoverPerformer(recoverPerformer);
×
683
  }
684

685
  private void recoverUnsealedTsFileCallBack(UnsealedTsFileRecoverPerformer recoverPerformer) {
686
    try {
687
      TsFileResource tsFileResource = recoverPerformer.getTsFileResource();
×
688
      boolean isSeq = recoverPerformer.isSequence();
×
689
      if (!recoverPerformer.canWrite()) {
×
690
        // cannot write, just close it
691
        try {
692
          tsFileResource.close();
×
693
        } catch (IOException e) {
×
694
          logger.error("Fail to close TsFile {} when recovering", tsFileResource.getTsFile(), e);
×
695
        }
×
696
        updateLastFlushTime(tsFileResource, isSeq);
×
697
        tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
×
698
        FileMetrics.getInstance()
×
699
            .addFile(
×
700
                tsFileResource.getTsFile().length(),
×
701
                recoverPerformer.isSequence(),
×
702
                tsFileResource.getTsFile().getName());
×
703
      } else {
704
        // the last file is not closed, continue writing to it
705
        RestorableTsFileIOWriter writer = recoverPerformer.getWriter();
×
706
        long timePartitionId = tsFileResource.getTimePartition();
×
707
        TimePartitionManager.getInstance()
×
708
            .updateAfterOpeningTsFileProcessor(
×
709
                new DataRegionId(Integer.parseInt(dataRegionId)), timePartitionId);
×
710
        TsFileProcessor tsFileProcessor =
×
711
            new TsFileProcessor(
712
                dataRegionId,
713
                dataRegionInfo,
714
                tsFileResource,
715
                this::closeUnsealedTsFileProcessorCallBack,
716
                isSeq ? this::sequenceFlushCallback : this::unsequenceFlushCallback,
×
717
                isSeq,
718
                writer);
719
        if (isSeq) {
×
720
          workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
×
721
        } else {
722
          workUnsequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
×
723
        }
724
        tsFileResource.setProcessor(tsFileProcessor);
×
725
        tsFileResource.removeResourceFile();
×
726
        tsFileProcessor.setTimeRangeId(timePartitionId);
×
727
        writer.makeMetadataVisible();
×
728
        if (enableMemControl) {
×
729
          TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(dataRegionInfo);
×
730
          tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
×
731
          this.dataRegionInfo.initTsFileProcessorInfo(tsFileProcessor);
×
732
          // get chunkMetadata size
733
          long chunkMetadataSize = 0;
×
734
          for (Map<String, List<ChunkMetadata>> metaMap : writer.getMetadatasForQuery().values()) {
×
735
            for (List<ChunkMetadata> metadatas : metaMap.values()) {
×
736
              for (ChunkMetadata chunkMetadata : metadatas) {
×
737
                chunkMetadataSize += chunkMetadata.calculateRamSize();
×
738
              }
×
739
            }
×
740
          }
×
741
          tsFileProcessorInfo.addTSPMemCost(chunkMetadataSize);
×
742
        }
743
      }
744
      tsFileManager.add(tsFileResource, recoverPerformer.isSequence());
×
745
    } catch (Throwable e) {
×
746
      logger.error(
×
747
          "Fail to recover unsealed TsFile {}, skip it.",
748
          recoverPerformer.getTsFileAbsolutePath(),
×
749
          e);
750
    }
×
751
  }
×
752

753
  /** recover sealed TsFile. */
754
  private void recoverSealedTsFiles(
755
      TsFileResource sealedTsFile, DataRegionRecoveryContext context, boolean isSeq) {
756
    try (SealedTsFileRecoverPerformer recoverPerformer =
1✔
757
        new SealedTsFileRecoverPerformer(sealedTsFile)) {
758
      recoverPerformer.recover();
1✔
759
      // pick up crashed compaction target files
760
      if (recoverPerformer.hasCrashed()) {
1✔
761
        if (TsFileResource.getInnerCompactionCount(sealedTsFile.getTsFile().getName()) > 0) {
×
762
          tsFileManager.addForRecover(sealedTsFile, isSeq);
×
763
          return;
×
764
        } else {
765
          logger.warn(
×
766
              "Sealed TsFile {} has crashed at zero level, truncate and recover it.",
767
              sealedTsFile.getTsFilePath());
×
768
        }
769
      }
770
      sealedTsFile.close();
1✔
771
      tsFileManager.add(sealedTsFile, isSeq);
1✔
772
      tsFileResourceManager.registerSealedTsFileResource(sealedTsFile);
1✔
773
    } catch (Throwable e) {
×
774
      logger.error("Fail to recover sealed TsFile {}, skip it.", sealedTsFile.getTsFilePath(), e);
×
775
    } finally {
776
      // update recovery context
777
      context.incrementRecoveredFilesNum();
1✔
778
    }
779
  }
1✔
780

781
  private void recoverFilesInPartition(
782
      long partitionId,
783
      DataRegionRecoveryContext context,
784
      List<TsFileResource> resourceList,
785
      boolean isSeq,
786
      boolean isLatestPartition) {
787
    for (TsFileResource tsFileResource : resourceList) {
1✔
788
      recoverSealedTsFiles(tsFileResource, context, isSeq);
1✔
789
    }
1✔
790
    if (isLatestPartition && isSeq) {
1✔
791
      lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId);
1✔
792
      for (TsFileResource tsFileResource : resourceList) {
1✔
793
        updateLastFlushTime(tsFileResource, true);
1✔
794
      }
1✔
795
      TimePartitionManager.getInstance()
1✔
796
          .registerTimePartitionInfo(
1✔
797
              new TimePartitionInfo(
798
                  new DataRegionId(Integer.parseInt(dataRegionId)),
1✔
799
                  partitionId,
800
                  false,
801
                  Long.MAX_VALUE,
802
                  lastFlushTimeMap.getMemSize(partitionId),
1✔
803
                  true));
804
    }
805
  }
1✔
806

807
  // ({systemTime}-{versionNum}-{mergeNum}.tsfile)
808
  private int compareFileName(File o1, File o2) {
809
    String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
1✔
810
    String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
1✔
811
    long ver1 = Long.parseLong(items1[0]);
1✔
812
    long ver2 = Long.parseLong(items2[0]);
1✔
813
    int cmp = Long.compare(ver1, ver2);
1✔
814
    if (cmp == 0) {
1✔
815
      return Long.compare(Long.parseLong(items1[1]), Long.parseLong(items2[1]));
×
816
    } else {
817
      return cmp;
1✔
818
    }
819
  }
820

821
  /**
822
   * insert one row of data.
823
   *
824
   * @param insertRowNode one row of data
825
   */
826
  public void insert(InsertRowNode insertRowNode) throws WriteProcessException {
827
    // reject insertions that are out of ttl
828
    if (!isAlive(insertRowNode.getTime())) {
1✔
829
      throw new OutOfTTLException(insertRowNode.getTime(), (DateTimeUtils.currentTime() - dataTTL));
1✔
830
    }
831
    if (enableMemControl) {
1✔
832
      StorageEngine.blockInsertionIfReject(null);
1✔
833
    }
834
    long startTime = System.nanoTime();
1✔
835
    writeLock("InsertRow");
1✔
836
    PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime);
1✔
837
    try {
838
      if (deleted) {
1✔
839
        return;
×
840
      }
841
      // init map
842
      long timePartitionId = StorageEngine.getTimePartition(insertRowNode.getTime());
1✔
843

844
      if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
1✔
845
        TimePartitionManager.getInstance()
1✔
846
            .registerTimePartitionInfo(
1✔
847
                new TimePartitionInfo(
848
                    new DataRegionId(Integer.parseInt(dataRegionId)),
1✔
849
                    timePartitionId,
850
                    true,
851
                    Long.MAX_VALUE,
852
                    0,
853
                    tsFileManager.isLatestTimePartition(timePartitionId)));
1✔
854
      }
855

856
      boolean isSequence =
1✔
857
          insertRowNode.getTime()
1✔
858
              > lastFlushTimeMap.getFlushedTime(
1✔
859
                  timePartitionId, insertRowNode.getDevicePath().getFullPath());
1✔
860

861
      // is unsequence and user set config to discard out of order data
862
      if (!isSequence
1✔
863
          && IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
1✔
864
        return;
1✔
865
      }
866

867
      // insert to sequence or unSequence file
868
      insertToTsFileProcessor(insertRowNode, isSequence, timePartitionId);
1✔
869
    } finally {
870
      writeUnlock();
1✔
871
    }
872
  }
1✔
873

874
  /**
875
   * Insert a tablet (rows belonging to the same devices) into this database.
876
   *
877
   * @throws BatchProcessException if some of the rows failed to be inserted
878
   */
879
  @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive Complexity warning
880
  public void insertTablet(InsertTabletNode insertTabletNode)
881
      throws BatchProcessException, WriteProcessException {
882
    if (enableMemControl) {
1✔
883
      StorageEngine.blockInsertionIfReject(null);
1✔
884
    }
885
    long startTime = System.nanoTime();
1✔
886
    writeLock("insertTablet");
1✔
887
    PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime);
1✔
888
    try {
889
      if (deleted) {
1✔
890
        return;
×
891
      }
892
      TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
1✔
893
      Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
1✔
894
      boolean noFailure = true;
1✔
895

896
      /*
897
       * assume that batch has been sorted by client
898
       */
899
      int loc = 0;
1✔
900
      while (loc < insertTabletNode.getRowCount()) {
1✔
901
        long currTime = insertTabletNode.getTimes()[loc];
1✔
902
        // skip points that do not satisfy TTL
903
        if (!isAlive(currTime)) {
1✔
904
          results[loc] =
×
905
              RpcUtils.getStatus(
×
906
                  TSStatusCode.OUT_OF_TTL,
907
                  String.format(
×
908
                      "Insertion time [%s] is less than ttl time bound [%s]",
909
                      DateTimeUtils.convertLongToDate(currTime),
×
910
                      DateTimeUtils.convertLongToDate(DateTimeUtils.currentTime() - dataTTL)));
×
911
          loc++;
×
912
          noFailure = false;
×
913
        } else {
914
          break;
915
        }
916
      }
×
917
      // loc pointing at first legal position
918
      if (loc == insertTabletNode.getRowCount()) {
1✔
919
        throw new OutOfTTLException(
×
920
            insertTabletNode.getTimes()[insertTabletNode.getTimes().length - 1],
×
921
            (DateTimeUtils.currentTime() - dataTTL));
×
922
      }
923
      // before is first start point
924
      int before = loc;
1✔
925
      // before time partition
926
      long beforeTimePartition =
1✔
927
          StorageEngine.getTimePartition(insertTabletNode.getTimes()[before]);
1✔
928
      // init map
929

930
      if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(beforeTimePartition)) {
1✔
931
        TimePartitionManager.getInstance()
1✔
932
            .registerTimePartitionInfo(
1✔
933
                new TimePartitionInfo(
934
                    new DataRegionId(Integer.parseInt(dataRegionId)),
1✔
935
                    beforeTimePartition,
936
                    true,
937
                    Long.MAX_VALUE,
938
                    0,
939
                    tsFileManager.isLatestTimePartition(beforeTimePartition)));
1✔
940
      }
941

942
      long lastFlushTime =
1✔
943
          lastFlushTimeMap.getFlushedTime(
1✔
944
              beforeTimePartition, insertTabletNode.getDevicePath().getFullPath());
1✔
945

946
      // if is sequence
947
      boolean isSequence = false;
1✔
948
      while (loc < insertTabletNode.getRowCount()) {
1✔
949
        long time = insertTabletNode.getTimes()[loc];
1✔
950
        // always in some time partition
951
        // judge if we should insert sequence
952
        if (!isSequence && time > lastFlushTime) {
1✔
953
          // insert into unsequence and then start sequence
954
          if (!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
1✔
955
            noFailure =
1✔
956
                insertTabletToTsFileProcessor(
1✔
957
                        insertTabletNode, before, loc, false, results, beforeTimePartition)
958
                    && noFailure;
959
          }
960
          before = loc;
1✔
961
          isSequence = true;
1✔
962
        }
963
        loc++;
1✔
964
      }
1✔
965

966
      // do not forget last part
967
      if (before < loc
1✔
968
          && (isSequence
969
              || !IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData())) {
×
970
        noFailure =
1✔
971
            insertTabletToTsFileProcessor(
1✔
972
                    insertTabletNode, before, loc, isSequence, results, beforeTimePartition)
973
                && noFailure;
974
      }
975
      long globalLatestFlushedTime =
1✔
976
          lastFlushTimeMap.getGlobalFlushedTime(insertTabletNode.getDevicePath().getFullPath());
1✔
977
      startTime = System.nanoTime();
1✔
978
      tryToUpdateBatchInsertLastCache(insertTabletNode, globalLatestFlushedTime);
1✔
979
      PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime() - startTime);
1✔
980

981
      if (!noFailure) {
1✔
982
        throw new BatchProcessException(results);
×
983
      }
984
    } finally {
985
      writeUnlock();
1✔
986
    }
987
  }
1✔
988

989
  /**
990
   * Check whether the time falls in TTL.
991
   *
992
   * @return whether the given time falls in ttl
993
   */
994
  private boolean isAlive(long time) {
995
    return dataTTL == Long.MAX_VALUE || (DateTimeUtils.currentTime() - time) <= dataTTL;
1✔
996
  }
997

998
  /**
999
   * insert batch to tsfile processor thread-safety that the caller need to guarantee The rows to be
1000
   * inserted are in the range [start, end) Null value in each column values will be replaced by the
1001
   * subsequent non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, null, 5}
1002
   *
1003
   * @param insertTabletNode insert a tablet of a device
1004
   * @param sequence whether is sequence
1005
   * @param start start index of rows to be inserted in insertTabletPlan
1006
   * @param end end index of rows to be inserted in insertTabletPlan
1007
   * @param results result array
1008
   * @param timePartitionId time partition id
1009
   * @return false if any failure occurs when inserting the tablet, true otherwise
1010
   */
1011
  private boolean insertTabletToTsFileProcessor(
1012
      InsertTabletNode insertTabletNode,
1013
      int start,
1014
      int end,
1015
      boolean sequence,
1016
      TSStatus[] results,
1017
      long timePartitionId) {
1018
    // return when start >= end
1019
    if (start >= end) {
1✔
1020
      return true;
1✔
1021
    }
1022

1023
    TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
1✔
1024
    if (tsFileProcessor == null) {
1✔
1025
      for (int i = start; i < end; i++) {
×
1026
        results[i] =
×
1027
            RpcUtils.getStatus(
×
1028
                TSStatusCode.INTERNAL_SERVER_ERROR,
1029
                "can not create TsFileProcessor, timePartitionId: " + timePartitionId);
1030
      }
1031
      return false;
×
1032
    }
1033

1034
    try {
1035
      tsFileProcessor.insertTablet(insertTabletNode, start, end, results);
1✔
1036
    } catch (WriteProcessRejectException e) {
×
1037
      logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage());
×
1038
      return false;
×
1039
    } catch (WriteProcessException e) {
×
1040
      logger.error("insert to TsFileProcessor error ", e);
×
1041
      return false;
×
1042
    }
1✔
1043

1044
    // check memtable size and may async try to flush the work memtable
1045
    if (tsFileProcessor.shouldFlush()) {
1✔
1046
      fileFlushPolicy.apply(this, tsFileProcessor, sequence);
×
1047
    }
1048
    return true;
1✔
1049
  }
1050

1051
  private void tryToUpdateBatchInsertLastCache(InsertTabletNode node, long latestFlushedTime) {
1052
    if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
1✔
1053
        || (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
1✔
1054
            && node.isSyncFromLeaderWhenUsingIoTConsensus())) {
×
1055
      // disable updating last cache on follower
1056
      return;
×
1057
    }
1058
    String[] measurements = node.getMeasurements();
1✔
1059
    MeasurementSchema[] measurementSchemas = node.getMeasurementSchemas();
1✔
1060
    String[] rawMeasurements = new String[measurements.length];
1✔
1061
    for (int i = 0; i < measurements.length; i++) {
1✔
1062
      if (measurementSchemas[i] != null) {
1✔
1063
        // get raw measurement rather than alias
1064
        rawMeasurements[i] = measurementSchemas[i].getMeasurementId();
1✔
1065
      } else {
1066
        rawMeasurements[i] = measurements[i];
×
1067
      }
1068
    }
1069
    DataNodeSchemaCache.getInstance()
1✔
1070
        .updateLastCache(
1✔
1071
            getDatabaseName(),
1✔
1072
            node.getDevicePath(),
1✔
1073
            rawMeasurements,
1074
            node.getMeasurementSchemas(),
1✔
1075
            node.isAligned(),
1✔
1076
            node::composeLastTimeValuePair,
1✔
1077
            index -> node.getColumns()[index] != null,
1✔
1078
            true,
1079
            latestFlushedTime);
1✔
1080
  }
1✔
1081

1082
  private void insertToTsFileProcessor(
1083
      InsertRowNode insertRowNode, boolean sequence, long timePartitionId)
1084
      throws WriteProcessException {
1085
    TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
1✔
1086
    if (tsFileProcessor == null) {
1✔
1087
      return;
×
1088
    }
1089

1090
    tsFileProcessor.insert(insertRowNode);
1✔
1091

1092
    long globalLatestFlushTime =
1✔
1093
        lastFlushTimeMap.getGlobalFlushedTime(insertRowNode.getDevicePath().getFullPath());
1✔
1094

1095
    long startTime = System.nanoTime();
1✔
1096
    tryToUpdateInsertLastCache(insertRowNode, globalLatestFlushTime);
1✔
1097
    PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime() - startTime);
1✔
1098

1099
    // check memtable size and may asyncTryToFlush the work memtable
1100
    if (tsFileProcessor.shouldFlush()) {
1✔
1101
      fileFlushPolicy.apply(this, tsFileProcessor, sequence);
×
1102
    }
1103
  }
1✔
1104

1105
  private void tryToUpdateInsertLastCache(InsertRowNode node, long latestFlushedTime) {
1106
    if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
1✔
1107
        || (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
1✔
1108
            && node.isSyncFromLeaderWhenUsingIoTConsensus())) {
×
1109
      // disable updating last cache on follower
1110
      return;
×
1111
    }
1112
    String[] measurements = node.getMeasurements();
1✔
1113
    MeasurementSchema[] measurementSchemas = node.getMeasurementSchemas();
1✔
1114
    String[] rawMeasurements = new String[measurements.length];
1✔
1115
    for (int i = 0; i < measurements.length; i++) {
1✔
1116
      if (measurementSchemas[i] != null) {
1✔
1117
        // get raw measurement rather than alias
1118
        rawMeasurements[i] = measurementSchemas[i].getMeasurementId();
1✔
1119
      } else {
1120
        rawMeasurements[i] = measurements[i];
×
1121
      }
1122
    }
1123
    DataNodeSchemaCache.getInstance()
1✔
1124
        .updateLastCache(
1✔
1125
            getDatabaseName(),
1✔
1126
            node.getDevicePath(),
1✔
1127
            rawMeasurements,
1128
            node.getMeasurementSchemas(),
1✔
1129
            node.isAligned(),
1✔
1130
            node::composeTimeValuePair,
1✔
1131
            index -> node.getValues()[index] != null,
1✔
1132
            true,
1133
            latestFlushedTime);
1✔
1134
  }
1✔
1135

1136
  /**
1137
   * WAL module uses this method to flush memTable
1138
   *
1139
   * @return True if flush task is submitted successfully
1140
   */
1141
  public boolean submitAFlushTask(long timeRangeId, boolean sequence, IMemTable memTable) {
1142
    writeLock("submitAFlushTask");
×
1143
    try {
1144
      if (memTable.getFlushStatus() != FlushStatus.WORKING) {
×
1145
        return false;
×
1146
      }
1147

1148
      TsFileProcessor tsFileProcessor;
1149
      if (sequence) {
×
1150
        tsFileProcessor = workSequenceTsFileProcessors.get(timeRangeId);
×
1151
      } else {
1152
        tsFileProcessor = workUnsequenceTsFileProcessors.get(timeRangeId);
×
1153
      }
1154
      // only submit when tsFileProcessor exists and memTables are same
1155
      boolean shouldSubmit =
×
1156
          tsFileProcessor != null && tsFileProcessor.getWorkMemTable() == memTable;
×
1157
      if (shouldSubmit) {
×
1158
        fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
×
1159
      }
1160
      return shouldSubmit;
×
1161
    } finally {
1162
      writeUnlock();
×
1163
    }
1164
  }
1165

1166
  /**
1167
   * mem control module uses this method to flush memTable
1168
   *
1169
   * @param tsFileProcessor tsfile processor in which memTable to be flushed
1170
   */
1171
  public void submitAFlushTaskWhenShouldFlush(TsFileProcessor tsFileProcessor) {
1172
    writeLock("submitAFlushTaskWhenShouldFlush");
×
1173
    try {
1174
      // check memtable size and may asyncTryToFlush the work memtable
1175
      if (tsFileProcessor.shouldFlush()) {
×
1176
        fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
×
1177
      }
1178
    } finally {
1179
      writeUnlock();
×
1180
    }
1181
  }
×
1182

1183
  private TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean sequence) {
1184
    TsFileProcessor tsFileProcessor = null;
1✔
1185
    int retryCnt = 0;
1✔
1186
    do {
1187
      try {
1188
        if (IoTDBDescriptor.getInstance().getConfig().isQuotaEnable()) {
1✔
1189
          if (!DataNodeSpaceQuotaManager.getInstance().checkRegionDisk(databaseName)) {
×
1190
            throw new ExceedQuotaException(
×
1191
                "Unable to continue writing data, because the space allocated to the database "
1192
                    + databaseName
1193
                    + " has already used the upper limit",
1194
                TSStatusCode.SPACE_QUOTA_EXCEEDED.getStatusCode());
×
1195
          }
1196
        }
1197
        if (sequence) {
1✔
1198
          tsFileProcessor =
1✔
1199
              getOrCreateTsFileProcessorIntern(timeRangeId, workSequenceTsFileProcessors, true);
1✔
1200
        } else {
1201
          tsFileProcessor =
1✔
1202
              getOrCreateTsFileProcessorIntern(timeRangeId, workUnsequenceTsFileProcessors, false);
1✔
1203
        }
1204
      } catch (DiskSpaceInsufficientException e) {
×
1205
        logger.error(
×
1206
            "disk space is insufficient when creating TsFile processor, change system mode to read-only",
1207
            e);
1208
        CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
×
1209
        break;
×
1210
      } catch (IOException e) {
×
1211
        if (retryCnt < 3) {
×
1212
          logger.warn("meet IOException when creating TsFileProcessor, retry it again", e);
×
1213
          retryCnt++;
×
1214
        } else {
1215
          logger.error(
×
1216
              "meet IOException when creating TsFileProcessor, change system mode to error", e);
1217
          CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
×
1218
          break;
×
1219
        }
1220
      } catch (ExceedQuotaException e) {
×
1221
        logger.error(e.getMessage());
×
1222
        break;
×
1223
      }
1✔
1224
    } while (tsFileProcessor == null);
1✔
1225
    return tsFileProcessor;
1✔
1226
  }
1227

1228
  /**
1229
   * get processor from hashmap, flush oldest processor if necessary
1230
   *
1231
   * @param timeRangeId time partition range
1232
   * @param tsFileProcessorTreeMap tsFileProcessorTreeMap
1233
   * @param sequence whether is sequence or not
1234
   */
1235
  private TsFileProcessor getOrCreateTsFileProcessorIntern(
1236
      long timeRangeId, TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap, boolean sequence)
1237
      throws IOException, DiskSpaceInsufficientException {
1238

1239
    TsFileProcessor res = tsFileProcessorTreeMap.get(timeRangeId);
1✔
1240

1241
    if (null == res) {
1✔
1242
      // build new processor, memory control module will control the number of memtables
1243
      TimePartitionManager.getInstance()
1✔
1244
          .updateAfterOpeningTsFileProcessor(
1✔
1245
              new DataRegionId(Integer.valueOf(dataRegionId)), timeRangeId);
1✔
1246
      res = newTsFileProcessor(sequence, timeRangeId);
1✔
1247
      tsFileProcessorTreeMap.put(timeRangeId, res);
1✔
1248
      tsFileManager.add(res.getTsFileResource(), sequence);
1✔
1249
    }
1250

1251
    return res;
1✔
1252
  }
1253

1254
  private TsFileProcessor newTsFileProcessor(boolean sequence, long timePartitionId)
1255
      throws IOException, DiskSpaceInsufficientException {
1256

1257
    long version = partitionMaxFileVersions.getOrDefault(timePartitionId, 0L) + 1;
1✔
1258
    partitionMaxFileVersions.put(timePartitionId, version);
1✔
1259
    String filePath =
1✔
1260
        TsFileNameGenerator.generateNewTsFilePathWithMkdir(
1✔
1261
            sequence,
1262
            databaseName,
1263
            dataRegionId,
1264
            timePartitionId,
1265
            System.currentTimeMillis(),
1✔
1266
            version,
1267
            0,
1268
            0);
1269

1270
    return getTsFileProcessor(sequence, filePath, timePartitionId);
1✔
1271
  }
1272

1273
  private TsFileProcessor getTsFileProcessor(
1274
      boolean sequence, String filePath, long timePartitionId) throws IOException {
1275
    TsFileProcessor tsFileProcessor;
1276
    if (sequence) {
1✔
1277
      tsFileProcessor =
1✔
1278
          new TsFileProcessor(
1279
              databaseName + FILE_NAME_SEPARATOR + dataRegionId,
1280
              fsFactory.getFileWithParent(filePath),
1✔
1281
              dataRegionInfo,
1282
              this::closeUnsealedTsFileProcessorCallBack,
1283
              this::sequenceFlushCallback,
1284
              true);
1285
    } else {
1286
      tsFileProcessor =
1✔
1287
          new TsFileProcessor(
1288
              databaseName + FILE_NAME_SEPARATOR + dataRegionId,
1289
              fsFactory.getFileWithParent(filePath),
1✔
1290
              dataRegionInfo,
1291
              this::closeUnsealedTsFileProcessorCallBack,
1292
              this::unsequenceFlushCallback,
1293
              false);
1294
    }
1295

1296
    if (enableMemControl) {
1✔
1297
      TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(dataRegionInfo);
1✔
1298
      tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
1✔
1299
      this.dataRegionInfo.initTsFileProcessorInfo(tsFileProcessor);
1✔
1300
    }
1301

1302
    tsFileProcessor.addCloseFileListeners(customCloseFileListeners);
1✔
1303
    tsFileProcessor.addFlushListeners(customFlushListeners);
1✔
1304
    tsFileProcessor.setTimeRangeId(timePartitionId);
1✔
1305

1306
    return tsFileProcessor;
1✔
1307
  }
1308

1309
  /**
1310
   * Create a new tsfile name
1311
   *
1312
   * @return file name
1313
   */
1314
  private String getNewTsFileName(long timePartitionId) {
1315
    long version = partitionMaxFileVersions.getOrDefault(timePartitionId, 0L) + 1;
×
1316
    partitionMaxFileVersions.put(timePartitionId, version);
×
1317
    return getNewTsFileName(System.currentTimeMillis(), version, 0, 0);
×
1318
  }
1319

1320
  private String getNewTsFileName(long time, long version, int mergeCnt, int unseqCompactionCnt) {
1321
    return TsFileNameGenerator.generateNewTsFileName(time, version, mergeCnt, unseqCompactionCnt);
×
1322
  }
1323

1324
  /**
1325
   * close one tsfile processor
1326
   *
1327
   * @param sequence whether this tsfile processor is sequence or not
1328
   * @param tsFileProcessor tsfile processor
1329
   */
1330
  public void syncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
1331
    synchronized (closeStorageGroupCondition) {
×
1332
      try {
1333
        asyncCloseOneTsFileProcessor(sequence, tsFileProcessor);
×
1334
        long startTime = System.currentTimeMillis();
×
1335
        while (closingSequenceTsFileProcessor.contains(tsFileProcessor)
×
1336
            || closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
×
1337
          closeStorageGroupCondition.wait(60_000);
×
1338
          if (System.currentTimeMillis() - startTime > 60_000) {
×
1339
            logger.warn(
×
1340
                "{} has spent {}s to wait for closing one tsfile.",
1341
                databaseName + "-" + this.dataRegionId,
1342
                (System.currentTimeMillis() - startTime) / 1000);
×
1343
          }
1344
        }
1345
      } catch (InterruptedException e) {
×
1346
        Thread.currentThread().interrupt();
×
1347
        logger.error(
×
1348
            "syncCloseOneTsFileProcessor error occurs while waiting for closing the storage "
1349
                + "group {}",
1350
            databaseName + "-" + dataRegionId,
1351
            e);
1352
      }
×
1353
    }
×
1354
  }
×
1355

1356
  /**
1357
   * close one tsfile processor, thread-safety should be ensured by caller
1358
   *
1359
   * @param sequence whether this tsfile processor is sequence or not
1360
   * @param tsFileProcessor tsfile processor
1361
   */
1362
  public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
1363
    // for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
1364
    // for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
1365
    if (closingSequenceTsFileProcessor.contains(tsFileProcessor)
1✔
1366
        || closingUnSequenceTsFileProcessor.contains(tsFileProcessor)
1✔
1367
        || tsFileProcessor.alreadyMarkedClosing()) {
1✔
1368
      return;
×
1369
    }
1370
    logger.info(
1✔
1371
        "Async close tsfile: {}",
1372
        tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
1✔
1373

1374
    if (sequence) {
1✔
1375
      closingSequenceTsFileProcessor.add(tsFileProcessor);
1✔
1376
      tsFileProcessor.asyncClose();
1✔
1377

1378
      workSequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
1✔
1379
      // if unsequence files don't contain this time range id, we should remove it's version
1380
      // controller
1381
      if (!workUnsequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId())) {
1✔
1382
        timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId());
1✔
1383
      }
1384
      logger.info("close a sequence tsfile processor {}", databaseName + "-" + dataRegionId);
1✔
1385
    } else {
1386
      closingUnSequenceTsFileProcessor.add(tsFileProcessor);
1✔
1387
      tsFileProcessor.asyncClose();
1✔
1388

1389
      workUnsequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
1✔
1390
      // if sequence files don't contain this time range id, we should remove it's version
1391
      // controller
1392
      if (!workSequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId())) {
1✔
1393
        timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId());
1✔
1394
      }
1395
    }
1396
  }
1✔
1397

1398
  /**
1399
   * delete the database's own folder in folder data/system/databases
1400
   *
1401
   * @param systemDir system dir
1402
   */
1403
  public void deleteFolder(String systemDir) {
1404
    logger.info(
1✔
1405
        "{} will close all files for deleting data folder {}",
1406
        databaseName + "-" + dataRegionId,
1407
        systemDir);
1408
    writeLock("deleteFolder");
1✔
1409
    try {
1410
      File dataRegionSystemFolder =
1✔
1411
          SystemFileFactory.INSTANCE.getFile(
1✔
1412
              systemDir + File.separator + databaseName, dataRegionId);
1413
      org.apache.iotdb.commons.utils.FileUtils.deleteDirectoryAndEmptyParent(
1✔
1414
          dataRegionSystemFolder);
1415
    } finally {
1416
      writeUnlock();
1✔
1417
    }
1418
  }
1✔
1419

1420
  /** close all tsfile resource */
1421
  public void closeAllResources() {
1422
    for (TsFileResource tsFileResource : tsFileManager.getTsFileList(false)) {
1✔
1423
      try {
1424
        tsFileResource.close();
1✔
1425
      } catch (IOException e) {
×
1426
        logger.error("Cannot close a TsFileResource {}", tsFileResource, e);
×
1427
      }
1✔
1428
    }
1✔
1429
    for (TsFileResource tsFileResource : tsFileManager.getTsFileList(true)) {
1✔
1430
      try {
1431
        tsFileResource.close();
1✔
1432
      } catch (IOException e) {
×
1433
        logger.error("Cannot close a TsFileResource {}", tsFileResource, e);
×
1434
      }
1✔
1435
    }
1✔
1436
  }
1✔
1437

1438
  /** delete tsfile */
1439
  public void syncDeleteDataFiles() {
1440
    logger.info(
1✔
1441
        "{} will close all files for deleting data files", databaseName + "-" + dataRegionId);
1442
    writeLock("syncDeleteDataFiles");
1✔
1443
    try {
1444

1445
      syncCloseAllWorkingTsFileProcessors();
1✔
1446
      // normally, mergingModification is just need to be closed by after a merge task is finished.
1447
      // we close it here just for IT test.
1448
      closeAllResources();
1✔
1449
      List<TsFileResource> tsFileResourceList = tsFileManager.getTsFileList(true);
1✔
1450
      tsFileResourceList.addAll(tsFileManager.getTsFileList(false));
1✔
1451
      tsFileResourceList.forEach(
1✔
1452
          x -> {
1453
            FileMetrics.getInstance()
1✔
1454
                .deleteFile(
1✔
1455
                    new long[] {x.getTsFileSize()},
1✔
1456
                    x.isSeq(),
1✔
1457
                    Collections.singletonList(x.getTsFile().getName()));
1✔
1458
            if (x.getModFile().exists()) {
1✔
1459
              FileMetrics.getInstance().decreaseModFileNum(1);
1✔
1460
              FileMetrics.getInstance().decreaseModFileSize(x.getModFile().getSize());
1✔
1461
            }
1462
          });
1✔
1463
      deleteAllSGFolders(TierManager.getInstance().getAllFilesFolders());
1✔
1464

1465
      this.workSequenceTsFileProcessors.clear();
1✔
1466
      this.workUnsequenceTsFileProcessors.clear();
1✔
1467
      this.tsFileManager.clear();
1✔
1468
      lastFlushTimeMap.clearFlushedTime();
1✔
1469
      lastFlushTimeMap.clearGlobalFlushedTime();
1✔
1470
    } finally {
1471
      writeUnlock();
1✔
1472
    }
1473
  }
1✔
1474

1475
  private void deleteAllSGFolders(List<String> folder) {
1476
    for (String tsfilePath : folder) {
1✔
1477
      File dataRegionDataFolder =
1✔
1478
          fsFactory.getFile(tsfilePath, databaseName + File.separator + dataRegionId);
1✔
1479
      if (FSUtils.getFSType(dataRegionDataFolder) != FSType.LOCAL) {
1✔
1480
        try {
1481
          fsFactory.deleteDirectory(dataRegionDataFolder.getPath());
×
1482
        } catch (IOException e) {
×
1483
          logger.error("Fail to delete data region folder {}", dataRegionDataFolder);
×
1484
        }
×
1485
      } else {
1486
        if (dataRegionDataFolder.exists()) {
1✔
1487
          org.apache.iotdb.commons.utils.FileUtils.deleteDirectoryAndEmptyParent(
1✔
1488
              dataRegionDataFolder);
1489
        }
1490
      }
1491
    }
1✔
1492
  }
1✔
1493

1494
  /** Iterate each TsFile and try to lock and remove those out of TTL. */
1495
  public synchronized void checkFilesTTL() {
1496
    if (dataTTL == Long.MAX_VALUE) {
1✔
1497
      logger.debug("{}: TTL not set, ignore the check", databaseName + "-" + dataRegionId);
×
1498
      return;
×
1499
    }
1500
    long ttlLowerBound = DateTimeUtils.currentTime() - dataTTL;
1✔
1501
    logger.debug(
1✔
1502
        "{}: TTL removing files before {}",
1503
        databaseName + "-" + dataRegionId,
1504
        new Date(ttlLowerBound));
1505

1506
    // copy to avoid concurrent modification of deletion
1507
    List<TsFileResource> seqFiles = new ArrayList<>(tsFileManager.getTsFileList(true));
1✔
1508
    List<TsFileResource> unseqFiles = new ArrayList<>(tsFileManager.getTsFileList(false));
1✔
1509

1510
    for (TsFileResource tsFileResource : seqFiles) {
1✔
1511
      checkFileTTL(tsFileResource, ttlLowerBound, true);
1✔
1512
    }
1✔
1513
    for (TsFileResource tsFileResource : unseqFiles) {
1✔
1514
      checkFileTTL(tsFileResource, ttlLowerBound, false);
1✔
1515
    }
1✔
1516
  }
1✔
1517

1518
  private void checkFileTTL(TsFileResource resource, long ttlLowerBound, boolean isSeq) {
1519
    if (!resource.isClosed() || !resource.isDeleted() && resource.stillLives(ttlLowerBound)) {
1✔
1520
      return;
×
1521
    }
1522
    // Try to set the resource to DELETED status and return if it failed
1523
    if (!resource.setStatus(TsFileResourceStatus.DELETED)) {
1✔
1524
      return;
×
1525
    }
1526
    tsFileManager.remove(resource, isSeq);
1✔
1527
    // ensure that the file is not used by any queries
1528
    resource.writeLock();
1✔
1529
    try {
1530
      // try to delete physical data file
1531
      resource.remove();
1✔
1532
      FileMetrics.getInstance()
1✔
1533
          .deleteFile(
1✔
1534
              new long[] {resource.getTsFileSize()},
1✔
1535
              isSeq,
1536
              Collections.singletonList(resource.getTsFile().getName()));
1✔
1537
      logger.info(
1✔
1538
          "Removed a file {} before {} by ttl ({} {})",
1539
          resource.getTsFilePath(),
1✔
1540
          new Date(ttlLowerBound),
1541
          dataTTL,
1✔
1542
          CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
1✔
1543
    } finally {
1544
      resource.writeUnlock();
1✔
1545
    }
1546
  }
1✔
1547

1548
  public void timedFlushSeqMemTable() {
1549
    writeLock("timedFlushSeqMemTable");
1✔
1550
    try {
1551
      // only check sequence tsfiles' memtables
1552
      List<TsFileProcessor> tsFileProcessors =
1✔
1553
          new ArrayList<>(workSequenceTsFileProcessors.values());
1✔
1554
      long timeLowerBound = System.currentTimeMillis() - config.getSeqMemtableFlushInterval();
1✔
1555

1556
      for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
1✔
1557
        if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) {
1✔
1558
          logger.info(
1✔
1559
              "Exceed sequence memtable flush interval, so flush working memtable of time partition {} in database {}[{}]",
1560
              tsFileProcessor.getTimeRangeId(),
1✔
1561
              databaseName,
1562
              dataRegionId);
1563
          fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
1✔
1564
        }
1565
      }
1✔
1566
    } finally {
1567
      writeUnlock();
1✔
1568
    }
1569
  }
1✔
1570

1571
  public void timedFlushUnseqMemTable() {
1572
    writeLock("timedFlushUnseqMemTable");
1✔
1573
    try {
1574
      // only check unsequence tsfiles' memtables
1575
      List<TsFileProcessor> tsFileProcessors =
1✔
1576
          new ArrayList<>(workUnsequenceTsFileProcessors.values());
1✔
1577
      long timeLowerBound = System.currentTimeMillis() - config.getUnseqMemtableFlushInterval();
1✔
1578

1579
      for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
1✔
1580
        if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) {
1✔
1581
          logger.info(
1✔
1582
              "Exceed unsequence memtable flush interval, so flush working memtable of time partition {} in database {}[{}]",
1583
              tsFileProcessor.getTimeRangeId(),
1✔
1584
              databaseName,
1585
              dataRegionId);
1586
          fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
1✔
1587
        }
1588
      }
1✔
1589
    } finally {
1590
      writeUnlock();
1✔
1591
    }
1592
  }
1✔
1593

1594
  /** This method will be blocked until all tsfile processors are closed. */
1595
  public void syncCloseAllWorkingTsFileProcessors() {
1596
    synchronized (closeStorageGroupCondition) {
1✔
1597
      try {
1598
        asyncCloseAllWorkingTsFileProcessors();
1✔
1599
        long startTime = System.currentTimeMillis();
1✔
1600
        while (!closingSequenceTsFileProcessor.isEmpty()
1✔
1601
            || !closingUnSequenceTsFileProcessor.isEmpty()) {
1✔
1602
          closeStorageGroupCondition.wait(60_000);
1✔
1603
          if (System.currentTimeMillis() - startTime > 60_000) {
1✔
1604
            logger.warn(
×
1605
                "{} has spent {}s to wait for closing all TsFiles.",
1606
                databaseName + "-" + this.dataRegionId,
1607
                (System.currentTimeMillis() - startTime) / 1000);
×
1608
          }
1609
        }
1610
      } catch (InterruptedException e) {
×
1611
        logger.error(
×
1612
            "CloseFileNodeCondition error occurs while waiting for closing the storage "
1613
                + "group {}",
1614
            databaseName + "-" + dataRegionId,
1615
            e);
1616
        Thread.currentThread().interrupt();
×
1617
      }
1✔
1618
    }
1✔
1619
  }
1✔
1620

1621
  /** close all working tsfile processors */
1622
  public void asyncCloseAllWorkingTsFileProcessors() {
1623
    writeLock("asyncCloseAllWorkingTsFileProcessors");
1✔
1624
    try {
1625
      logger.info("async force close all files in database: {}", databaseName + "-" + dataRegionId);
1✔
1626
      // to avoid concurrent modification problem, we need a new array list
1627
      for (TsFileProcessor tsFileProcessor :
1628
          new ArrayList<>(workSequenceTsFileProcessors.values())) {
1✔
1629
        asyncCloseOneTsFileProcessor(true, tsFileProcessor);
1✔
1630
      }
1✔
1631
      // to avoid concurrent modification problem, we need a new array list
1632
      for (TsFileProcessor tsFileProcessor :
1633
          new ArrayList<>(workUnsequenceTsFileProcessors.values())) {
1✔
1634
        asyncCloseOneTsFileProcessor(false, tsFileProcessor);
1✔
1635
      }
1✔
1636
    } finally {
1637
      writeUnlock();
1✔
1638
    }
1639
  }
1✔
1640

1641
  /** force close all working tsfile processors */
1642
  public void forceCloseAllWorkingTsFileProcessors() throws TsFileProcessorException {
1643
    writeLock("forceCloseAllWorkingTsFileProcessors");
×
1644
    try {
1645
      logger.info("force close all processors in database: {}", databaseName + "-" + dataRegionId);
×
1646
      // to avoid concurrent modification problem, we need a new array list
1647
      for (TsFileProcessor tsFileProcessor :
1648
          new ArrayList<>(workSequenceTsFileProcessors.values())) {
×
1649
        tsFileProcessor.putMemTableBackAndClose();
×
1650
      }
×
1651
      // to avoid concurrent modification problem, we need a new array list
1652
      for (TsFileProcessor tsFileProcessor :
1653
          new ArrayList<>(workUnsequenceTsFileProcessors.values())) {
×
1654
        tsFileProcessor.putMemTableBackAndClose();
×
1655
      }
×
1656
    } finally {
1657
      writeUnlock();
×
1658
    }
1659
  }
×
1660

1661
  /** used for queryengine */
1662
  @Override
1663
  public QueryDataSource query(
1664
      List<PartialPath> pathList, String singleDeviceId, QueryContext context, Filter timeFilter)
1665
      throws QueryProcessException {
1666
    try {
1667
      List<TsFileResource> seqResources =
1✔
1668
          getFileResourceListForQuery(
1✔
1669
              tsFileManager.getTsFileList(true),
1✔
1670
              pathList,
1671
              singleDeviceId,
1672
              context,
1673
              timeFilter,
1674
              true);
1675
      List<TsFileResource> unseqResources =
1✔
1676
          getFileResourceListForQuery(
1✔
1677
              tsFileManager.getTsFileList(false),
1✔
1678
              pathList,
1679
              singleDeviceId,
1680
              context,
1681
              timeFilter,
1682
              false);
1683

1684
      QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(SEQUENCE_TSFILE, seqResources.size());
1✔
1685
      QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(UNSEQUENCE_TSFILE, unseqResources.size());
1✔
1686

1687
      QueryDataSource dataSource = new QueryDataSource(seqResources, unseqResources);
1✔
1688
      dataSource.setDataTTL(dataTTL);
1✔
1689
      return dataSource;
1✔
1690
    } catch (MetadataException e) {
×
1691
      throw new QueryProcessException(e);
×
1692
    }
1693
  }
1694

1695
  /** lock the read lock of the insert lock */
1696
  @Override
1697
  public void readLock() {
1698
    // apply read lock for SG insert lock to prevent inconsistent with concurrently writing memtable
1699
    insertLock.readLock().lock();
×
1700
    // apply read lock for TsFileResource list
1701
    tsFileManager.readLock();
×
1702
  }
×
1703

1704
  /** unlock the read lock of insert lock */
1705
  @Override
1706
  public void readUnlock() {
1707
    tsFileManager.readUnlock();
×
1708
    insertLock.readLock().unlock();
×
1709
  }
×
1710

1711
  /** lock the write lock of the insert lock */
1712
  public void writeLock(String holder) {
1713
    insertLock.writeLock().lock();
1✔
1714
    insertWriteLockHolder = holder;
1✔
1715
  }
1✔
1716

1717
  /** unlock the write lock of the insert lock */
1718
  public void writeUnlock() {
1719
    insertWriteLockHolder = "";
1✔
1720
    insertLock.writeLock().unlock();
1✔
1721
  }
1✔
1722

1723
  /**
1724
   * @param tsFileResources includes sealed and unsealed tsfile resources
1725
   * @return fill unsealed tsfile resources with memory data and ChunkMetadataList of data in disk
1726
   */
1727
  private List<TsFileResource> getFileResourceListForQuery(
1728
      Collection<TsFileResource> tsFileResources,
1729
      List<PartialPath> pathList,
1730
      String singleDeviceId,
1731
      QueryContext context,
1732
      Filter timeFilter,
1733
      boolean isSeq)
1734
      throws MetadataException {
1735

1736
    if (context.isDebug()) {
1✔
1737
      DEBUG_LOGGER.info(
×
1738
          "Path: {}, get tsfile list: {} isSeq: {} timefilter: {}",
1739
          pathList,
1740
          tsFileResources,
1741
          isSeq,
×
1742
          (timeFilter == null ? "null" : timeFilter));
×
1743
    }
1744

1745
    List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
1✔
1746

1747
    long timeLowerBound =
1748
        dataTTL != Long.MAX_VALUE ? DateTimeUtils.currentTime() - dataTTL : Long.MIN_VALUE;
1✔
1749
    context.setQueryTimeLowerBound(timeLowerBound);
1✔
1750

1751
    for (TsFileResource tsFileResource : tsFileResources) {
1✔
1752
      if (!tsFileResource.isSatisfied(
1✔
1753
          singleDeviceId, timeFilter, isSeq, dataTTL, context.isDebug())) {
1✔
1754
        continue;
1✔
1755
      }
1756
      closeQueryLock.readLock().lock();
1✔
1757
      try {
1758
        if (tsFileResource.isClosed()) {
1✔
1759
          tsfileResourcesForQuery.add(tsFileResource);
1✔
1760
        } else {
1761
          tsFileResource.getProcessor().query(pathList, context, tsfileResourcesForQuery);
1✔
1762
        }
1763
      } catch (IOException e) {
×
1764
        throw new MetadataException(e);
×
1765
      } finally {
1766
        closeQueryLock.readLock().unlock();
1✔
1767
      }
1768
    }
1✔
1769
    return tsfileResourcesForQuery;
1✔
1770
  }
1771

1772
  /** Seperate tsfiles in TsFileManager to sealedList and unsealedList. */
1773
  private void separateTsFile(
1774
      List<TsFileResource> sealedResource, List<TsFileResource> unsealedResource) {
1775
    tsFileManager
1✔
1776
        .getTsFileList(true)
1✔
1777
        .forEach(
1✔
1778
            tsFileResource -> {
1779
              if (tsFileResource.isClosed()) {
1✔
1780
                sealedResource.add(tsFileResource);
1✔
1781
              } else {
1782
                unsealedResource.add(tsFileResource);
1✔
1783
              }
1784
            });
1✔
1785
    tsFileManager
1✔
1786
        .getTsFileList(false)
1✔
1787
        .forEach(
1✔
1788
            tsFileResource -> {
1789
              if (tsFileResource.isClosed()) {
1✔
1790
                sealedResource.add(tsFileResource);
1✔
1791
              } else {
1792
                unsealedResource.add(tsFileResource);
1✔
1793
              }
1794
            });
1✔
1795
  }
1✔
1796

1797
  /**
1798
   * @param pattern Must be a pattern start with a precise device path
1799
   * @param startTime
1800
   * @param endTime
1801
   * @param searchIndex
1802
   * @param timePartitionFilter
1803
   * @throws IOException
1804
   */
1805
  public void deleteByDevice(
1806
      PartialPath pattern,
1807
      long startTime,
1808
      long endTime,
1809
      long searchIndex,
1810
      TimePartitionFilter timePartitionFilter)
1811
      throws IOException {
1812
    if (SettleService.getINSTANCE().getFilesToBeSettledCount().get() != 0) {
1✔
1813
      throw new IOException(
×
1814
          "Delete failed. " + "Please do not delete until the old files settled.");
1815
    }
1816
    // TODO: how to avoid partial deletion?
1817
    // FIXME: notice that if we may remove a SGProcessor out of memory, we need to close all opened
1818
    // mod files in mergingModification, sequenceFileList, and unsequenceFileList
1819
    writeLock("delete");
1✔
1820

1821
    boolean hasReleasedLock = false;
1✔
1822

1823
    try {
1824

1825
      Set<PartialPath> devicePaths = new HashSet<>(pattern.getDevicePathPattern());
1✔
1826

1827
      // delete Last cache record if necessary
1828
      // todo implement more precise process
1829
      DataNodeSchemaCache.getInstance().takeWriteLock();
1✔
1830
      try {
1831
        DataNodeSchemaCache.getInstance().invalidateAll();
1✔
1832
      } finally {
1833
        DataNodeSchemaCache.getInstance().releaseWriteLock();
1✔
1834
      }
1835

1836
      // write log to impacted working TsFileProcessors
1837
      List<WALFlushListener> walListeners =
1✔
1838
          logDeletionInWAL(startTime, endTime, searchIndex, pattern, timePartitionFilter);
1✔
1839

1840
      for (WALFlushListener walFlushListener : walListeners) {
1✔
1841
        if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
1✔
1842
          logger.error("Fail to log delete to wal.", walFlushListener.getCause());
×
1843
          throw walFlushListener.getCause();
×
1844
        }
1845
      }
1✔
1846

1847
      Deletion deletion = new Deletion(pattern, MERGE_MOD_START_VERSION_NUM, startTime, endTime);
1✔
1848

1849
      List<TsFileResource> sealedTsFileResource = new ArrayList<>();
1✔
1850
      List<TsFileResource> unsealedTsFileResource = new ArrayList<>();
1✔
1851
      separateTsFile(sealedTsFileResource, unsealedTsFileResource);
1✔
1852
      // deviceMatchInfo is used for filter the matched deviceId in TsFileResource
1853
      // deviceMatchInfo contains the DeviceId means this device matched the pattern
1854
      Set<String> deviceMatchInfo = new HashSet<>();
1✔
1855
      deleteDataInFiles(
1✔
1856
          unsealedTsFileResource, deletion, devicePaths, timePartitionFilter, deviceMatchInfo);
1857
      writeUnlock();
1✔
1858
      hasReleasedLock = true;
1✔
1859

1860
      deleteDataInFiles(
1✔
1861
          sealedTsFileResource, deletion, devicePaths, timePartitionFilter, deviceMatchInfo);
1862

1863
    } catch (Exception e) {
×
1864
      throw new IOException(e);
×
1865
    } finally {
1866
      if (!hasReleasedLock) {
1✔
1867
        writeUnlock();
×
1868
      }
1869
    }
1870
  }
1✔
1871

1872
  private List<WALFlushListener> logDeletionInWAL(
1873
      long startTime,
1874
      long endTime,
1875
      long searchIndex,
1876
      PartialPath path,
1877
      TimePartitionFilter timePartitionFilter) {
1878
    long timePartitionStartId = StorageEngine.getTimePartition(startTime);
1✔
1879
    long timePartitionEndId = StorageEngine.getTimePartition(endTime);
1✔
1880
    List<WALFlushListener> walFlushListeners = new ArrayList<>();
1✔
1881
    if (config.getWalMode() == WALMode.DISABLE) {
1✔
1882
      return walFlushListeners;
×
1883
    }
1884
    DeleteDataNode deleteDataNode =
1✔
1885
        new DeleteDataNode(new PlanNodeId(""), Collections.singletonList(path), startTime, endTime);
1✔
1886
    deleteDataNode.setSearchIndex(searchIndex);
1✔
1887
    for (Map.Entry<Long, TsFileProcessor> entry : workSequenceTsFileProcessors.entrySet()) {
1✔
1888
      if (timePartitionStartId <= entry.getKey()
1✔
1889
          && entry.getKey() <= timePartitionEndId
1✔
1890
          && (timePartitionFilter == null
1891
              || timePartitionFilter.satisfy(databaseName, entry.getKey()))) {
×
1892
        WALFlushListener walFlushListener = entry.getValue().logDeleteDataNodeInWAL(deleteDataNode);
1✔
1893
        walFlushListeners.add(walFlushListener);
1✔
1894
      }
1895
    }
1✔
1896
    for (Map.Entry<Long, TsFileProcessor> entry : workUnsequenceTsFileProcessors.entrySet()) {
1✔
1897
      if (timePartitionStartId <= entry.getKey()
1✔
1898
          && entry.getKey() <= timePartitionEndId
1✔
1899
          && (timePartitionFilter == null
1900
              || timePartitionFilter.satisfy(databaseName, entry.getKey()))) {
×
1901
        WALFlushListener walFlushListener = entry.getValue().logDeleteDataNodeInWAL(deleteDataNode);
1✔
1902
        walFlushListeners.add(walFlushListener);
1✔
1903
      }
1904
    }
1✔
1905
    return walFlushListeners;
1✔
1906
  }
1907

1908
  private boolean canSkipDelete(
1909
      TsFileResource tsFileResource,
1910
      Set<PartialPath> devicePaths,
1911
      long deleteStart,
1912
      long deleteEnd,
1913
      TimePartitionFilter timePartitionFilter,
1914
      Set<String> deviceMatchInfo) {
1915
    if (timePartitionFilter != null
1✔
1916
        && !timePartitionFilter.satisfy(databaseName, tsFileResource.getTimePartition())) {
×
1917
      return true;
×
1918
    }
1919
    long fileStartTime = tsFileResource.getTimeIndex().getMinStartTime();
1✔
1920
    long fileEndTime = tsFileResource.getTimeIndex().getMaxEndTime();
1✔
1921

1922
    for (PartialPath device : devicePaths) {
1✔
1923
      long deviceStartTime, deviceEndTime;
1924
      if (device.hasWildcard()) {
1✔
1925
        if (!tsFileResource.isClosed() && fileEndTime == Long.MIN_VALUE) {
×
1926
          // unsealed seq file
1927
          if (deleteEnd < fileStartTime) {
×
1928
            // time range of file has not overlapped with the deletion
1929
            return true;
×
1930
          }
1931
        } else {
1932
          if (deleteEnd < fileStartTime || deleteStart > fileEndTime) {
×
1933
            // time range of file has not overlapped with the deletion
1934
            return true;
×
1935
          }
1936
        }
1937
        Pair<Long, Long> startAndEndTime =
×
1938
            tsFileResource.getPossibleStartTimeAndEndTime(device, deviceMatchInfo);
×
1939
        if (startAndEndTime == null) {
×
1940
          continue;
×
1941
        }
1942
        deviceStartTime = startAndEndTime.getLeft();
×
1943
        deviceEndTime = startAndEndTime.getRight();
×
1944
      } else {
×
1945
        String deviceId = device.getFullPath();
1✔
1946
        if (!tsFileResource.mayContainsDevice(deviceId)) {
1✔
1947
          // resource does not contain this device
1948
          continue;
1✔
1949
        }
1950
        deviceStartTime = tsFileResource.getStartTime(deviceId);
1✔
1951
        deviceEndTime = tsFileResource.getEndTime(deviceId);
1✔
1952
      }
1953

1954
      if (!tsFileResource.isClosed() && deviceEndTime == Long.MIN_VALUE) {
1✔
1955
        // unsealed seq file
1956
        if (deleteEnd >= deviceStartTime) {
1✔
1957
          return false;
1✔
1958
        }
1959
      } else {
1960
        // sealed file or unsealed unseq file
1961
        if (deleteEnd >= deviceStartTime && deleteStart <= deviceEndTime) {
1✔
1962
          // time range of device has overlap with the deletion
1963
          return false;
1✔
1964
        }
1965
      }
1966
    }
1✔
1967
    return true;
1✔
1968
  }
1969

1970
  // suppress warn of Throwable catch
1971
  @SuppressWarnings("java:S1181")
1972
  private void deleteDataInFiles(
1973
      Collection<TsFileResource> tsFileResourceList,
1974
      Deletion deletion,
1975
      Set<PartialPath> devicePaths,
1976
      TimePartitionFilter timePartitionFilter,
1977
      Set<String> deviceMatchInfo)
1978
      throws IOException {
1979
    for (TsFileResource tsFileResource : tsFileResourceList) {
1✔
1980
      if (canSkipDelete(
1✔
1981
          tsFileResource,
1982
          devicePaths,
1983
          deletion.getStartTime(),
1✔
1984
          deletion.getEndTime(),
1✔
1985
          timePartitionFilter,
1986
          deviceMatchInfo)) {
1987
        continue;
1✔
1988
      }
1989

1990
      ModificationFile modFile = tsFileResource.getModFile();
1✔
1991
      if (tsFileResource.isClosed()) {
1✔
1992
        long originSize = -1;
1✔
1993
        synchronized (modFile) {
1✔
1994
          try {
1995
            originSize = modFile.getSize();
1✔
1996
            // delete data in sealed file
1997
            if (tsFileResource.isCompacting()) {
1✔
1998
              // we have to set modification offset to MAX_VALUE, as the offset of source chunk may
1999
              // change after compaction
2000
              deletion.setFileOffset(Long.MAX_VALUE);
1✔
2001
              // write deletion into compaction modification file
2002
              tsFileResource.getCompactionModFile().write(deletion);
1✔
2003
              // write deletion into modification file to enable read during compaction
2004
              modFile.write(deletion);
1✔
2005
              // remember to close mod file
2006
              tsFileResource.getCompactionModFile().close();
1✔
2007
              modFile.close();
1✔
2008
            } else {
2009
              deletion.setFileOffset(tsFileResource.getTsFileSize());
1✔
2010
              // write deletion into modification file
2011
              boolean modFileExists = modFile.exists();
1✔
2012

2013
              modFile.write(deletion);
1✔
2014

2015
              // remember to close mod file
2016
              modFile.close();
1✔
2017

2018
              // if file length greater than 1M,execute compact.
2019
              modFile.compact();
1✔
2020

2021
              if (!modFileExists) {
1✔
2022
                FileMetrics.getInstance().increaseModFileNum(1);
1✔
2023
              }
2024

2025
              // The file size may be smaller than the original file, so the increment here may be
2026
              // negative
2027
              FileMetrics.getInstance().increaseModFileSize(modFile.getSize() - originSize);
1✔
2028
            }
2029
          } catch (Throwable t) {
×
2030
            if (originSize != -1) {
×
2031
              modFile.truncate(originSize);
×
2032
            }
2033
            throw t;
×
2034
          }
1✔
2035
          logger.info(
1✔
2036
              "[Deletion] Deletion with path:{}, time:{}-{} written into mods file:{}.",
2037
              deletion.getPath(),
1✔
2038
              deletion.getStartTime(),
1✔
2039
              deletion.getEndTime(),
1✔
2040
              modFile.getFilePath());
1✔
2041
        }
1✔
2042
      } else {
1✔
2043
        // delete data in memory of unsealed file
2044
        tsFileResource.getProcessor().deleteDataInMemory(deletion, devicePaths);
1✔
2045
      }
2046
    }
1✔
2047
  }
1✔
2048

2049
  private void unsequenceFlushCallback(
2050
      TsFileProcessor processor, Map<String, Long> updateMap, long systemFlushTime) {
2051
    TimePartitionManager.getInstance()
1✔
2052
        .updateAfterFlushing(
1✔
2053
            new DataRegionId(Integer.valueOf(dataRegionId)),
1✔
2054
            processor.getTimeRangeId(),
1✔
2055
            systemFlushTime,
2056
            lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
1✔
2057
            workSequenceTsFileProcessors.get(processor.getTimeRangeId()) != null);
1✔
2058
  }
1✔
2059

2060
  private void sequenceFlushCallback(
2061
      TsFileProcessor processor, Map<String, Long> updateMap, long systemFlushTime) {
2062
    lastFlushTimeMap.updateLatestFlushTime(processor.getTimeRangeId(), updateMap);
1✔
2063
    TimePartitionManager.getInstance()
1✔
2064
        .updateAfterFlushing(
1✔
2065
            new DataRegionId(Integer.valueOf(dataRegionId)),
1✔
2066
            processor.getTimeRangeId(),
1✔
2067
            systemFlushTime,
2068
            lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
1✔
2069
            workUnsequenceTsFileProcessors.get(processor.getTimeRangeId()) != null);
1✔
2070
  }
1✔
2071

2072
  /** put the memtable back to the MemTablePool and make the metadata in writer visible */
2073
  // TODO please consider concurrency with read and insert method.
2074
  private void closeUnsealedTsFileProcessorCallBack(TsFileProcessor tsFileProcessor)
2075
      throws TsFileProcessorException {
2076
    closeQueryLock.writeLock().lock();
1✔
2077
    try {
2078
      tsFileProcessor.close();
1✔
2079
      if (tsFileProcessor.isEmpty()) {
1✔
2080
        try {
2081
          fsFactory.deleteIfExists(tsFileProcessor.getTsFileResource().getTsFile());
1✔
2082
          tsFileManager.remove(tsFileProcessor.getTsFileResource(), tsFileProcessor.isSequence());
1✔
2083
        } catch (IOException e) {
×
2084
          logger.error(
×
2085
              "Remove empty file {} error",
2086
              tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
×
2087
        }
1✔
2088
      } else {
2089
        tsFileResourceManager.registerSealedTsFileResource(tsFileProcessor.getTsFileResource());
1✔
2090
      }
2091
    } finally {
2092
      closeQueryLock.writeLock().unlock();
1✔
2093
    }
2094
    // closingSequenceTsFileProcessor is a thread safety class.
2095
    if (closingSequenceTsFileProcessor.contains(tsFileProcessor)) {
1✔
2096
      closingSequenceTsFileProcessor.remove(tsFileProcessor);
1✔
2097
    } else {
2098
      closingUnSequenceTsFileProcessor.remove(tsFileProcessor);
1✔
2099
    }
2100
    synchronized (closeStorageGroupCondition) {
1✔
2101
      closeStorageGroupCondition.notifyAll();
1✔
2102
    }
1✔
2103
    FileMetrics.getInstance()
1✔
2104
        .addFile(
1✔
2105
            tsFileProcessor.getTsFileResource().getTsFileSize(),
1✔
2106
            tsFileProcessor.isSequence(),
1✔
2107
            tsFileProcessor.getTsFileResource().getTsFile().getName());
1✔
2108
    logger.info("signal closing database condition in {}", databaseName + "-" + dataRegionId);
1✔
2109
  }
1✔
2110

2111
  protected int executeCompaction() {
2112
    // the name of this variable is trySubmitCount, because the task submitted to the queue could be
2113
    // evicted due to the low priority of the task
2114
    int trySubmitCount = 0;
1✔
2115
    try {
2116
      List<Long> timePartitions = new ArrayList<>(tsFileManager.getTimePartitions());
1✔
2117
      // sort the time partition from largest to smallest
2118
      timePartitions.sort(Comparator.reverseOrder());
1✔
2119
      for (long timePartition : timePartitions) {
1✔
2120
        trySubmitCount += CompactionScheduler.scheduleCompaction(tsFileManager, timePartition);
1✔
2121
      }
1✔
2122
    } catch (Throwable e) {
×
2123
      logger.error("Meet error in compaction schedule.", e);
×
2124
    }
1✔
2125
    return trySubmitCount;
1✔
2126
  }
2127

2128
  /**
2129
   * After finishing settling tsfile, we need to do 2 things : (1) move the new tsfile to the
2130
   * correct folder, including deleting its old mods file (2) update the relevant data of this old
2131
   * tsFile in memory ,eg: FileSequenceReader, tsFileManager, cache, etc.
2132
   */
2133
  private void settleTsFileCallBack(
2134
      TsFileResource oldTsFileResource, List<TsFileResource> newTsFileResources)
2135
      throws WriteProcessException {
2136
    oldTsFileResource.readUnlock();
×
2137
    oldTsFileResource.writeLock();
×
2138
    try {
2139
      TsFileAndModSettleTool.moveNewTsFile(oldTsFileResource, newTsFileResources);
×
2140
      if (TsFileAndModSettleTool.getInstance().recoverSettleFileMap.size() != 0) {
×
2141
        TsFileAndModSettleTool.getInstance()
×
2142
            .recoverSettleFileMap
2143
            .remove(oldTsFileResource.getTsFile().getAbsolutePath());
×
2144
      }
2145
      // clear Cache , including chunk cache, timeseriesMetadata cache and bloom filter cache
2146
      operateClearCache();
×
2147

2148
      // if old tsfile is being deleted in the process due to its all data's being deleted.
2149
      if (!oldTsFileResource.getTsFile().exists()) {
×
2150
        tsFileManager.remove(oldTsFileResource, oldTsFileResource.isSeq());
×
2151
      }
2152
      FileReaderManager.getInstance().closeFileAndRemoveReader(oldTsFileResource.getTsFilePath());
×
2153
      oldTsFileResource.setSettleTsFileCallBack(null);
×
2154
      SettleService.getINSTANCE().getFilesToBeSettledCount().addAndGet(-1);
×
2155
    } catch (IOException e) {
×
2156
      logger.error("Exception to move new tsfile in settling", e);
×
2157
      throw new WriteProcessException(
×
2158
          "Meet error when settling file: " + oldTsFileResource.getTsFile().getAbsolutePath(), e);
×
2159
    } finally {
2160
      oldTsFileResource.writeUnlock();
×
2161
    }
2162
  }
×
2163

2164
  public static void operateClearCache() {
2165
    ChunkCache.getInstance().clear();
×
2166
    TimeSeriesMetadataCache.getInstance().clear();
×
2167
    BloomFilterCache.getInstance().clear();
×
2168
  }
×
2169

2170
  /** merge file under this database processor */
2171
  public int compact() {
2172
    writeLock("merge");
1✔
2173
    try {
2174
      return executeCompaction();
1✔
2175
    } finally {
2176
      writeUnlock();
1✔
2177
    }
2178
  }
2179

2180
  private void resetLastCacheWhenLoadingTsFile() throws IllegalPathException {
2181
    if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
×
2182
      return;
×
2183
    }
2184
    DataNodeSchemaCache.getInstance().takeWriteLock();
×
2185
    try {
2186
      DataNodeSchemaCache.getInstance().invalidateAll();
×
2187
    } finally {
2188
      DataNodeSchemaCache.getInstance().releaseWriteLock();
×
2189
    }
2190
  }
×
2191

2192
  /**
2193
   * Load a new tsfile to unsequence dir.
2194
   *
2195
   * <p>Then, update the latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice.
2196
   *
2197
   * @param newTsFileResource tsfile resource @UsedBy load external tsfile module
2198
   * @param deleteOriginFile whether to delete origin tsfile
2199
   */
2200
  public void loadNewTsFile(TsFileResource newTsFileResource, boolean deleteOriginFile)
2201
      throws LoadFileException {
2202
    File tsfileToBeInserted = newTsFileResource.getTsFile();
×
2203
    long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
×
2204
    writeLock("loadNewTsFile");
×
2205
    try {
2206
      newTsFileResource.setSeq(false);
×
2207
      String newFileName =
×
2208
          getNewTsFileName(
×
2209
              System.currentTimeMillis(),
×
2210
              getAndSetNewVersion(newFilePartitionId, newTsFileResource),
×
2211
              0,
2212
              0);
2213

2214
      if (!newFileName.equals(tsfileToBeInserted.getName())) {
×
2215
        logger.info(
×
2216
            "TsFile {} must be renamed to {} for loading into the unsequence list.",
2217
            tsfileToBeInserted.getName(),
×
2218
            newFileName);
2219
        newTsFileResource.setFile(
×
2220
            fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName));
×
2221
      }
2222
      loadTsFileToUnSequence(
×
2223
          tsfileToBeInserted, newTsFileResource, newFilePartitionId, deleteOriginFile);
2224
      FileMetrics.getInstance()
×
2225
          .addFile(
×
2226
              newTsFileResource.getTsFile().length(),
×
2227
              false,
2228
              newTsFileResource.getTsFile().getName());
×
2229

2230
      resetLastCacheWhenLoadingTsFile(); // update last cache
×
2231
      updateLastFlushTime(newTsFileResource); // update last flush time
×
2232
      long partitionNum = newTsFileResource.getTimePartition();
×
2233
      updatePartitionFileVersion(partitionNum, newTsFileResource.getVersion());
×
2234
      logger.info("TsFile {} is successfully loaded in unsequence list.", newFileName);
×
2235
    } catch (DiskSpaceInsufficientException e) {
×
2236
      logger.error(
×
2237
          "Failed to append the tsfile {} to database processor {} because the disk space is insufficient.",
2238
          tsfileToBeInserted.getAbsolutePath(),
×
2239
          tsfileToBeInserted.getParentFile().getName());
×
2240
      throw new LoadFileException(e);
×
2241
    } catch (IllegalPathException e) {
×
2242
      logger.error(
×
2243
          "Failed to reset last cache when loading file {}", newTsFileResource.getTsFilePath());
×
2244
      throw new LoadFileException(e);
×
2245
    } finally {
2246
      writeUnlock();
×
2247
    }
2248
  }
×
2249

2250
  /**
2251
   * Set the version in "partition" to "version" if "version" is larger than the current version.
2252
   */
2253
  public void setPartitionFileVersionToMax(long partition, long version) {
2254
    partitionMaxFileVersions.compute(
×
2255
        partition, (prt, oldVer) -> computeMaxVersion(oldVer, version));
×
2256
  }
×
2257

2258
  private long computeMaxVersion(Long oldVersion, Long newVersion) {
2259
    if (oldVersion == null) {
×
2260
      return newVersion;
×
2261
    }
2262
    return Math.max(oldVersion, newVersion);
×
2263
  }
2264

2265
  /**
2266
   * If the historical versions of a file is a sub-set of the given file's, (close and) remove it to
2267
   * reduce unnecessary merge. Only used when the file sender and the receiver share the same file
2268
   * close policy. Warning: DO NOT REMOVE
2269
   */
2270
  @SuppressWarnings("unused")
2271
  public void removeFullyOverlapFiles(TsFileResource resource) {
2272
    writeLock("removeFullyOverlapFiles");
×
2273
    try {
2274
      Iterator<TsFileResource> iterator = tsFileManager.getIterator(true);
×
2275
      removeFullyOverlapFiles(resource, iterator, true);
×
2276

2277
      iterator = tsFileManager.getIterator(false);
×
2278
      removeFullyOverlapFiles(resource, iterator, false);
×
2279
    } finally {
2280
      writeUnlock();
×
2281
    }
2282
  }
×
2283

2284
  private void removeFullyOverlapFiles(
2285
      TsFileResource newTsFile, Iterator<TsFileResource> iterator, boolean isSeq) {
2286
    while (iterator.hasNext()) {
×
2287
      TsFileResource existingTsFile = iterator.next();
×
2288
      if (newTsFile.isPlanRangeCovers(existingTsFile)
×
2289
          && !newTsFile.getTsFile().equals(existingTsFile.getTsFile())
×
2290
          && existingTsFile.tryWriteLock()) {
×
2291
        logger.info(
×
2292
            "{} is covered by {}: [{}, {}], [{}, {}], remove it",
2293
            existingTsFile,
2294
            newTsFile,
2295
            existingTsFile.minPlanIndex,
×
2296
            existingTsFile.maxPlanIndex,
×
2297
            newTsFile.minPlanIndex,
×
2298
            newTsFile.maxPlanIndex);
×
2299
        // if we fail to lock the file, it means it is being queried or merged and we will not
2300
        // wait until it is free, we will just leave it to the next merge
2301
        try {
2302
          removeFullyOverlapFile(existingTsFile, iterator, isSeq);
×
2303
        } catch (Exception e) {
×
2304
          logger.error(
×
2305
              "Something gets wrong while removing FullyOverlapFiles: {}",
2306
              existingTsFile.getTsFile().getAbsolutePath(),
×
2307
              e);
2308
        } finally {
2309
          existingTsFile.writeUnlock();
×
2310
        }
2311
      }
2312
    }
×
2313
  }
×
2314

2315
  /**
2316
   * remove the given tsFileResource. If the corresponding tsFileProcessor is in the working status,
2317
   * close it before remove the related resource files. maybe time-consuming for closing a tsfile.
2318
   */
2319
  private void removeFullyOverlapFile(
2320
      TsFileResource tsFileResource, Iterator<TsFileResource> iterator, boolean isSeq) {
2321
    logger.info(
×
2322
        "Removing a covered file {}, closed: {}", tsFileResource, tsFileResource.isClosed());
×
2323
    if (!tsFileResource.isClosed()) {
×
2324
      try {
2325
        // also remove the TsFileProcessor if the overlapped file is not closed
2326
        long timePartition = tsFileResource.getTimePartition();
×
2327
        Map<Long, TsFileProcessor> fileProcessorMap =
2328
            isSeq ? workSequenceTsFileProcessors : workUnsequenceTsFileProcessors;
×
2329
        TsFileProcessor tsFileProcessor = fileProcessorMap.get(timePartition);
×
2330
        if (tsFileProcessor != null && tsFileProcessor.getTsFileResource() == tsFileResource) {
×
2331
          // have to take some time to close the tsFileProcessor
2332
          tsFileProcessor.syncClose();
×
2333
          fileProcessorMap.remove(timePartition);
×
2334
        }
2335
      } catch (Exception e) {
×
2336
        logger.error("Cannot close {}", tsFileResource, e);
×
2337
      }
×
2338
    }
2339
    tsFileManager.remove(tsFileResource, isSeq);
×
2340
    iterator.remove();
×
2341
    tsFileResource.remove();
×
2342
  }
×
2343

2344
  private long getAndSetNewVersion(long timePartitionId, TsFileResource tsFileResource) {
2345
    long version = partitionMaxFileVersions.getOrDefault(timePartitionId, 0L) + 1;
×
2346
    partitionMaxFileVersions.put(timePartitionId, version);
×
2347
    tsFileResource.setVersion(version);
×
2348
    return version;
×
2349
  }
2350

2351
  /**
2352
   * Update latest time in latestTimeForEachDevice and
2353
   * partitionLatestFlushedTimeForEachDevice. @UsedBy sync module, load external tsfile module.
2354
   */
2355
  private void updateLastFlushTime(TsFileResource newTsFileResource) {
2356
    for (String device : newTsFileResource.getDevices()) {
×
2357
      long endTime = newTsFileResource.getEndTime(device);
×
2358
      long timePartitionId = StorageEngine.getTimePartition(endTime);
×
2359
      lastFlushTimeMap.updateFlushedTime(timePartitionId, device, endTime);
×
2360
      lastFlushTimeMap.updateGlobalFlushedTime(device, endTime);
×
2361
    }
×
2362
  }
×
2363

2364
  /**
2365
   * Execute the loading process by the type.
2366
   *
2367
   * @param tsFileResource tsfile resource to be loaded
2368
   * @param filePartitionId the partition id of the new file
2369
   * @param deleteOriginFile whether to delete the original file
2370
   * @return load the file successfully @UsedBy sync module, load external tsfile module.
2371
   */
2372
  private boolean loadTsFileToUnSequence(
2373
      File tsFileToLoad,
2374
      TsFileResource tsFileResource,
2375
      long filePartitionId,
2376
      boolean deleteOriginFile)
2377
      throws LoadFileException, DiskSpaceInsufficientException {
2378
    File targetFile;
2379
    targetFile =
×
2380
        fsFactory.getFile(
×
2381
            TierManager.getInstance().getNextFolderForTsFile(0, false),
×
2382
            databaseName
2383
                + File.separatorChar
2384
                + dataRegionId
2385
                + File.separatorChar
2386
                + filePartitionId
2387
                + File.separator
2388
                + tsFileResource.getTsFile().getName());
×
2389
    tsFileResource.setFile(targetFile);
×
2390
    if (tsFileManager.contains(tsFileResource, false)) {
×
2391
      logger.error("The file {} has already been loaded in unsequence list", tsFileResource);
×
2392
      return false;
×
2393
    }
2394
    tsFileManager.add(tsFileResource, false);
×
2395
    logger.info(
×
2396
        "Load tsfile in unsequence list, move file from {} to {}",
2397
        tsFileToLoad.getAbsolutePath(),
×
2398
        targetFile.getAbsolutePath());
×
2399

2400
    // move file from sync dir to data dir
2401
    if (!targetFile.getParentFile().exists()) {
×
2402
      targetFile.getParentFile().mkdirs();
×
2403
    }
2404
    try {
2405
      if (deleteOriginFile) {
×
2406
        FileUtils.moveFile(tsFileToLoad, targetFile);
×
2407
      } else {
2408
        Files.copy(tsFileToLoad.toPath(), targetFile.toPath());
×
2409
      }
2410
    } catch (IOException e) {
×
2411
      logger.error(
×
2412
          "File renaming failed when loading tsfile. Origin: {}, Target: {}",
2413
          tsFileToLoad.getAbsolutePath(),
×
2414
          targetFile.getAbsolutePath(),
×
2415
          e);
2416
      throw new LoadFileException(
×
2417
          String.format(
×
2418
              "File renaming failed when loading tsfile. Origin: %s, Target: %s, because %s",
2419
              tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
×
2420
    }
×
2421

2422
    File resourceFileToLoad =
×
2423
        fsFactory.getFile(tsFileToLoad.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
×
2424
    File targetResourceFile =
×
2425
        fsFactory.getFile(targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
×
2426
    try {
2427
      if (deleteOriginFile) {
×
2428
        FileUtils.moveFile(resourceFileToLoad, targetResourceFile);
×
2429
      } else {
2430
        Files.copy(resourceFileToLoad.toPath(), targetResourceFile.toPath());
×
2431
      }
2432
    } catch (IOException e) {
×
2433
      logger.error(
×
2434
          "File renaming failed when loading .resource file. Origin: {}, Target: {}",
2435
          resourceFileToLoad.getAbsolutePath(),
×
2436
          targetResourceFile.getAbsolutePath(),
×
2437
          e);
2438
      throw new LoadFileException(
×
2439
          String.format(
×
2440
              "File renaming failed when loading .resource file. Origin: %s, Target: %s, because %s",
2441
              resourceFileToLoad.getAbsolutePath(),
×
2442
              targetResourceFile.getAbsolutePath(),
×
2443
              e.getMessage()));
×
2444
    }
×
2445

2446
    File modFileToLoad =
×
2447
        fsFactory.getFile(tsFileToLoad.getAbsolutePath() + ModificationFile.FILE_SUFFIX);
×
2448
    if (modFileToLoad.exists()) {
×
2449
      // when successfully loaded, the filepath of the resource will be changed to the IoTDB data
2450
      // dir, so we can add a suffix to find the old modification file.
2451
      File targetModFile =
×
2452
          fsFactory.getFile(targetFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX);
×
2453
      try {
2454
        Files.deleteIfExists(targetModFile.toPath());
×
2455
      } catch (IOException e) {
×
2456
        logger.warn("Cannot delete localModFile {}", targetModFile, e);
×
2457
      }
×
2458
      try {
2459
        if (deleteOriginFile) {
×
2460
          FileUtils.moveFile(modFileToLoad, targetModFile);
×
2461
        } else {
2462
          Files.copy(modFileToLoad.toPath(), targetModFile.toPath());
×
2463
        }
2464
      } catch (IOException e) {
×
2465
        logger.error(
×
2466
            "File renaming failed when loading .mod file. Origin: {}, Target: {}",
2467
            modFileToLoad.getAbsolutePath(),
×
2468
            targetModFile.getAbsolutePath(),
×
2469
            e);
2470
        throw new LoadFileException(
×
2471
            String.format(
×
2472
                "File renaming failed when loading .mod file. Origin: %s, Target: %s, because %s",
2473
                modFileToLoad.getAbsolutePath(), targetModFile.getAbsolutePath(), e.getMessage()));
×
2474
      } finally {
2475
        // ModFile will be updated during the next call to `getModFile`
2476
        tsFileResource.setModFile(null);
×
2477
      }
2478
    }
2479

2480
    updatePartitionFileVersion(filePartitionId, tsFileResource.getVersion());
×
2481
    return true;
×
2482
  }
2483

2484
  /**
2485
   * get all working sequence tsfile processors
2486
   *
2487
   * @return all working sequence tsfile processors
2488
   */
2489
  public Collection<TsFileProcessor> getWorkSequenceTsFileProcessors() {
2490
    return workSequenceTsFileProcessors.values();
1✔
2491
  }
2492

2493
  /**
2494
   * Unload tsfile and move it to the target directory if it exists.
2495
   *
2496
   * <p>Firstly, unload the TsFileResource from sequenceFileList/unSequenceFileList.
2497
   *
2498
   * <p>Secondly, move the tsfile and .resource file to the target directory.
2499
   *
2500
   * @param fileToBeUnloaded tsfile to be unloaded
2501
   * @return whether the file to be unloaded exists. @UsedBy load external tsfile module.
2502
   */
2503
  public boolean unloadTsfile(File fileToBeUnloaded, File targetDir) throws IOException {
2504
    writeLock("unloadTsfile");
×
2505
    TsFileResource tsFileResourceToBeMoved = null;
×
2506
    try {
2507
      Iterator<TsFileResource> sequenceIterator = tsFileManager.getIterator(true);
×
2508
      while (sequenceIterator.hasNext()) {
×
2509
        TsFileResource sequenceResource = sequenceIterator.next();
×
2510
        if (sequenceResource.getTsFile().getName().equals(fileToBeUnloaded.getName())) {
×
2511
          tsFileResourceToBeMoved = sequenceResource;
×
2512
          tsFileManager.remove(tsFileResourceToBeMoved, true);
×
2513
          FileMetrics.getInstance()
×
2514
              .deleteFile(
×
2515
                  new long[] {tsFileResourceToBeMoved.getTsFileSize()},
×
2516
                  true,
2517
                  Collections.singletonList(tsFileResourceToBeMoved.getTsFile().getName()));
×
2518
          break;
×
2519
        }
2520
      }
×
2521
      if (tsFileResourceToBeMoved == null) {
×
2522
        Iterator<TsFileResource> unsequenceIterator = tsFileManager.getIterator(false);
×
2523
        while (unsequenceIterator.hasNext()) {
×
2524
          TsFileResource unsequenceResource = unsequenceIterator.next();
×
2525
          if (unsequenceResource.getTsFile().getName().equals(fileToBeUnloaded.getName())) {
×
2526
            tsFileResourceToBeMoved = unsequenceResource;
×
2527
            tsFileManager.remove(tsFileResourceToBeMoved, false);
×
2528
            FileMetrics.getInstance()
×
2529
                .deleteFile(
×
2530
                    new long[] {tsFileResourceToBeMoved.getTsFileSize()},
×
2531
                    false,
2532
                    Collections.singletonList(tsFileResourceToBeMoved.getTsFile().getName()));
×
2533
            break;
×
2534
          }
2535
        }
×
2536
      }
2537
    } finally {
2538
      writeUnlock();
×
2539
    }
2540
    if (tsFileResourceToBeMoved == null) {
×
2541
      return false;
×
2542
    }
2543
    tsFileResourceToBeMoved.writeLock();
×
2544
    try {
2545
      tsFileResourceToBeMoved.moveTo(targetDir);
×
2546
      logger.info(
×
2547
          "Move tsfile {} to target dir {} successfully.",
2548
          tsFileResourceToBeMoved.getTsFile(),
×
2549
          targetDir.getPath());
×
2550
    } finally {
2551
      tsFileResourceToBeMoved.writeUnlock();
×
2552
    }
2553
    return true;
×
2554
  }
2555

2556
  /**
2557
   * get all working unsequence tsfile processors
2558
   *
2559
   * @return all working unsequence tsfile processors
2560
   */
2561
  public Collection<TsFileProcessor> getWorkUnsequenceTsFileProcessors() {
2562
    return workUnsequenceTsFileProcessors.values();
1✔
2563
  }
2564

2565
  public void setDataTTLWithTimePrecisionCheck(long dataTTL) {
2566
    if (dataTTL != Long.MAX_VALUE) {
×
2567
      dataTTL =
×
2568
          CommonDateTimeUtils.convertMilliTimeWithPrecision(
×
2569
              dataTTL, CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
×
2570
    }
2571
    this.dataTTL = dataTTL;
×
2572
  }
×
2573

2574
  public void setDataTTL(long dataTTL) {
2575
    this.dataTTL = dataTTL;
1✔
2576
  }
1✔
2577

2578
  public List<TsFileResource> getSequenceFileList() {
2579
    return tsFileManager.getTsFileList(true);
1✔
2580
  }
2581

2582
  public List<TsFileResource> getUnSequenceFileList() {
2583
    return tsFileManager.getTsFileList(false);
1✔
2584
  }
2585

2586
  public String getDataRegionId() {
2587
    return dataRegionId;
1✔
2588
  }
2589

2590
  /**
2591
   * Get the storageGroupPath with dataRegionId.
2592
   *
2593
   * @return data region path, like root.sg1/0
2594
   */
2595
  public String getStorageGroupPath() {
2596
    return databaseName + File.separator + dataRegionId;
×
2597
  }
2598

2599
  /**
2600
   * Check if the data of "tsFileResource" all exist locally by comparing planIndexes in the
2601
   * partition of "partitionNumber". This is available only when the IoTDB instances which generated
2602
   * "tsFileResource" have the same plan indexes as the local one.
2603
   *
2604
   * @return true if any file contains plans with indexes no less than the max plan index of
2605
   *     "tsFileResource", otherwise false.
2606
   */
2607
  public boolean isFileAlreadyExist(TsFileResource tsFileResource, long partitionNum) {
2608
    // examine working processor first as they have the largest plan index
2609
    return isFileAlreadyExistInWorking(
×
2610
            tsFileResource, partitionNum, getWorkSequenceTsFileProcessors())
×
2611
        || isFileAlreadyExistInWorking(
×
2612
            tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors())
×
2613
        || isFileAlreadyExistInClosed(tsFileResource, partitionNum, getSequenceFileList())
×
2614
        || isFileAlreadyExistInClosed(tsFileResource, partitionNum, getUnSequenceFileList());
×
2615
  }
2616

2617
  private boolean isFileAlreadyExistInClosed(
2618
      TsFileResource tsFileResource, long partitionNum, Collection<TsFileResource> existingFiles) {
2619
    for (TsFileResource resource : existingFiles) {
×
2620
      if (resource.getTimePartition() == partitionNum
×
2621
          && resource.getMaxPlanIndex() > tsFileResource.getMaxPlanIndex()) {
×
2622
        logger.info(
×
2623
            "{} is covered by a closed file {}: [{}, {}] [{}, {}]",
2624
            tsFileResource,
2625
            resource,
2626
            tsFileResource.minPlanIndex,
×
2627
            tsFileResource.maxPlanIndex,
×
2628
            resource.minPlanIndex,
×
2629
            resource.maxPlanIndex);
×
2630
        return true;
×
2631
      }
2632
    }
×
2633
    return false;
×
2634
  }
2635

2636
  private boolean isFileAlreadyExistInWorking(
2637
      TsFileResource tsFileResource,
2638
      long partitionNum,
2639
      Collection<TsFileProcessor> workingProcessors) {
2640
    for (TsFileProcessor workingProcesssor : workingProcessors) {
×
2641
      if (workingProcesssor.getTimeRangeId() == partitionNum) {
×
2642
        TsFileResource workResource = workingProcesssor.getTsFileResource();
×
2643
        boolean isCovered = workResource.getMaxPlanIndex() > tsFileResource.getMaxPlanIndex();
×
2644
        if (isCovered) {
×
2645
          logger.info(
×
2646
              "{} is covered by a working file {}: [{}, {}] [{}, {}]",
2647
              tsFileResource,
2648
              workResource,
2649
              tsFileResource.minPlanIndex,
×
2650
              tsFileResource.maxPlanIndex,
×
2651
              workResource.minPlanIndex,
×
2652
              workResource.maxPlanIndex);
×
2653
        }
2654
        return isCovered;
×
2655
      }
2656
    }
×
2657
    return false;
×
2658
  }
2659

2660
  public void abortCompaction() {
2661
    tsFileManager.setAllowCompaction(false);
1✔
2662
    List<AbstractCompactionTask> runningTasks =
2663
        CompactionTaskManager.getInstance().abortCompaction(databaseName + "-" + dataRegionId);
1✔
2664
    while (CompactionTaskManager.getInstance().isAnyTaskInListStillRunning(runningTasks)) {
1✔
2665
      try {
2666
        TimeUnit.MILLISECONDS.sleep(10);
×
2667
      } catch (InterruptedException e) {
×
2668
        logger.error("Thread get interrupted when waiting compaction to finish", e);
×
2669
        Thread.currentThread().interrupt();
×
2670
      }
×
2671
    }
2672
    if (timedCompactionScheduleTask != null) {
1✔
2673
      timedCompactionScheduleTask.shutdownNow();
1✔
2674
    }
2675
  }
1✔
2676

2677
  public TsFileManager getTsFileResourceManager() {
2678
    return tsFileManager;
1✔
2679
  }
2680

2681
  /**
2682
   * insert batch of rows belongs to one device
2683
   *
2684
   * @param insertRowsOfOneDeviceNode batch of rows belongs to one device
2685
   */
2686
  public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode)
2687
      throws WriteProcessException, BatchProcessException {
2688
    if (enableMemControl) {
×
2689
      StorageEngine.blockInsertionIfReject(null);
×
2690
    }
2691
    long startTime = System.nanoTime();
×
2692
    writeLock("InsertRowsOfOneDevice");
×
2693
    PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime);
×
2694
    try {
2695
      if (deleted) {
×
2696
        return;
×
2697
      }
2698
      boolean isSequence = false;
×
2699
      for (int i = 0; i < insertRowsOfOneDeviceNode.getInsertRowNodeList().size(); i++) {
×
2700
        InsertRowNode insertRowNode = insertRowsOfOneDeviceNode.getInsertRowNodeList().get(i);
×
2701
        if (!isAlive(insertRowNode.getTime())) {
×
2702
          // we do not need to write these part of data, as they can not be queried
2703
          // or the sub-plan has already been executed, we are retrying other sub-plans
2704
          insertRowsOfOneDeviceNode
×
2705
              .getResults()
×
2706
              .put(
×
2707
                  i,
×
2708
                  RpcUtils.getStatus(
×
2709
                      TSStatusCode.OUT_OF_TTL.getStatusCode(),
×
2710
                      String.format(
×
2711
                          "Insertion time [%s] is less than ttl time bound [%s]",
2712
                          DateTimeUtils.convertLongToDate(insertRowNode.getTime()),
×
2713
                          DateTimeUtils.convertLongToDate(DateTimeUtils.currentTime() - dataTTL))));
×
2714
          continue;
×
2715
        }
2716
        // init map
2717
        long timePartitionId = StorageEngine.getTimePartition(insertRowNode.getTime());
×
2718

2719
        if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
×
2720
          TimePartitionManager.getInstance()
×
2721
              .registerTimePartitionInfo(
×
2722
                  new TimePartitionInfo(
2723
                      new DataRegionId(Integer.valueOf(dataRegionId)),
×
2724
                      timePartitionId,
2725
                      true,
2726
                      Long.MAX_VALUE,
2727
                      0,
2728
                      tsFileManager.isLatestTimePartition(timePartitionId)));
×
2729
        }
2730

2731
        // as the plans have been ordered, and we have get the write lock,
2732
        // So, if a plan is sequenced, then all the rest plans are sequenced.
2733
        //
2734
        if (!isSequence) {
×
2735
          isSequence =
×
2736
              insertRowNode.getTime()
×
2737
                  > lastFlushTimeMap.getFlushedTime(
×
2738
                      timePartitionId, insertRowNode.getDevicePath().getFullPath());
×
2739
        }
2740
        // is unsequence and user set config to discard out of order data
2741
        if (!isSequence
×
2742
            && IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
×
2743
          return;
×
2744
        }
2745

2746
        // insert to sequence or unSequence file
2747
        try {
2748
          insertToTsFileProcessor(insertRowNode, isSequence, timePartitionId);
×
2749
        } catch (WriteProcessException e) {
×
2750
          insertRowsOfOneDeviceNode
×
2751
              .getResults()
×
2752
              .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
×
2753
        }
×
2754
      }
2755
    } finally {
2756
      writeUnlock();
×
2757
    }
2758
    if (!insertRowsOfOneDeviceNode.getResults().isEmpty()) {
×
2759
      throw new BatchProcessException("Partial failed inserting rows of one device");
×
2760
    }
2761
  }
×
2762

2763
  /**
2764
   * insert batch of rows belongs to multiple devices
2765
   *
2766
   * @param insertRowsNode batch of rows belongs to multiple devices
2767
   */
2768
  public void insert(InsertRowsNode insertRowsNode) throws BatchProcessException {
2769
    for (int i = 0; i < insertRowsNode.getInsertRowNodeList().size(); i++) {
×
2770
      InsertRowNode insertRowNode = insertRowsNode.getInsertRowNodeList().get(i);
×
2771
      try {
2772
        insert(insertRowNode);
×
2773
      } catch (WriteProcessException e) {
×
2774
        insertRowsNode.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
×
2775
      }
×
2776
    }
2777

2778
    if (!insertRowsNode.getResults().isEmpty()) {
×
2779
      throw new BatchProcessException("Partial failed inserting rows");
×
2780
    }
2781
  }
×
2782

2783
  /**
2784
   * insert batch of tablets belongs to multiple devices
2785
   *
2786
   * @param insertMultiTabletsNode batch of tablets belongs to multiple devices
2787
   */
2788
  public void insertTablets(InsertMultiTabletsNode insertMultiTabletsNode)
2789
      throws BatchProcessException {
2790
    for (int i = 0; i < insertMultiTabletsNode.getInsertTabletNodeList().size(); i++) {
×
2791
      InsertTabletNode insertTabletNode = insertMultiTabletsNode.getInsertTabletNodeList().get(i);
×
2792
      try {
2793
        insertTablet(insertTabletNode);
×
2794
      } catch (WriteProcessException | BatchProcessException e) {
×
2795
        insertMultiTabletsNode
×
2796
            .getResults()
×
2797
            .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
×
2798
      }
×
2799
    }
2800

2801
    if (!insertMultiTabletsNode.getResults().isEmpty()) {
×
2802
      throw new BatchProcessException("Partial failed inserting multi tablets");
×
2803
    }
2804
  }
×
2805

2806
  /** @return the disk space occupied by this data region, unit is MB */
2807
  public long countRegionDiskSize() {
2808
    AtomicLong diskSize = new AtomicLong(0);
×
2809
    TierManager.getInstance()
×
2810
        .getAllLocalFilesFolders()
×
2811
        .forEach(
×
2812
            folder -> {
2813
              folder = folder + File.separator + databaseName + File.separator + dataRegionId;
×
2814
              countFolderDiskSize(folder, diskSize);
×
2815
            });
×
2816
    return diskSize.get() / 1024 / 1024;
×
2817
  }
2818

2819
  /**
2820
   * @param folder the folder's path
2821
   * @param diskSize the disk space occupied by this folder, unit is MB
2822
   */
2823
  private void countFolderDiskSize(String folder, AtomicLong diskSize) {
2824
    File file = FSFactoryProducer.getFSFactory().getFile(folder);
×
2825
    File[] allFile = file.listFiles();
×
2826
    if (allFile == null) {
×
2827
      return;
×
2828
    }
2829
    for (File f : allFile) {
×
2830
      if (f.isFile()) {
×
2831
        diskSize.addAndGet(f.length());
×
2832
      } else if (f.isDirectory()) {
×
2833
        countFolderDiskSize(f.getAbsolutePath(), diskSize);
×
2834
      }
2835
    }
2836
  }
×
2837

2838
  public void addSettleFilesToList(
2839
      List<TsFileResource> seqResourcesToBeSettled,
2840
      List<TsFileResource> unseqResourcesToBeSettled,
2841
      List<String> tsFilePaths) {
2842
    if (tsFilePaths.isEmpty()) {
×
2843
      for (TsFileResource resource : tsFileManager.getTsFileList(true)) {
×
2844
        if (!resource.isClosed()) {
×
2845
          continue;
×
2846
        }
2847
        resource.setSettleTsFileCallBack(this::settleTsFileCallBack);
×
2848
        seqResourcesToBeSettled.add(resource);
×
2849
      }
×
2850
      for (TsFileResource resource : tsFileManager.getTsFileList(false)) {
×
2851
        if (!resource.isClosed()) {
×
2852
          continue;
×
2853
        }
2854
        resource.setSettleTsFileCallBack(this::settleTsFileCallBack);
×
2855
        unseqResourcesToBeSettled.add(resource);
×
2856
      }
×
2857
    } else {
2858
      for (String tsFilePath : tsFilePaths) {
×
2859
        File fileToBeSettled = new File(tsFilePath);
×
2860
        if ("sequence"
×
2861
            .equals(
×
2862
                fileToBeSettled
2863
                    .getParentFile()
×
2864
                    .getParentFile()
×
2865
                    .getParentFile()
×
2866
                    .getParentFile()
×
2867
                    .getName())) {
×
2868
          for (TsFileResource resource : tsFileManager.getTsFileList(true)) {
×
2869
            if (resource.getTsFile().getAbsolutePath().equals(tsFilePath)) {
×
2870
              resource.setSettleTsFileCallBack(this::settleTsFileCallBack);
×
2871
              seqResourcesToBeSettled.add(resource);
×
2872
              break;
×
2873
            }
2874
          }
×
2875
        } else {
2876
          for (TsFileResource resource : tsFileManager.getTsFileList(false)) {
×
2877
            if (resource.getTsFile().getAbsolutePath().equals(tsFilePath)) {
×
2878
              unseqResourcesToBeSettled.add(resource);
×
2879
              break;
×
2880
            }
2881
          }
×
2882
        }
2883
      }
×
2884
    }
2885
  }
×
2886

2887
  public void setCustomCloseFileListeners(List<CloseFileListener> customCloseFileListeners) {
2888
    this.customCloseFileListeners = customCloseFileListeners;
×
2889
  }
×
2890

2891
  public void setCustomFlushListeners(List<FlushListener> customFlushListeners) {
2892
    this.customFlushListeners = customFlushListeners;
×
2893
  }
×
2894

2895
  public void setAllowCompaction(boolean allowCompaction) {
2896
    this.tsFileManager.setAllowCompaction(allowCompaction);
×
2897
  }
×
2898

2899
  @FunctionalInterface
2900
  public interface CloseTsFileCallBack {
2901

2902
    void call(TsFileProcessor caller) throws TsFileProcessorException, IOException;
2903
  }
2904

2905
  @FunctionalInterface
2906
  public interface UpdateEndTimeCallBack {
2907

2908
    void call(TsFileProcessor caller, Map<String, Long> updateMap, long systemFlushTime);
2909
  }
2910

2911
  /**
   * Argument-less callback. Judging by the name it is related to compaction recovery (confirm
   * against the callers).
   */
  @FunctionalInterface
  public interface CompactionRecoverCallBack {

    /** Runs the callback. */
    void call();
  }
2915

2916
  /** Predicate over a storage group name and a time partition id. */
  @FunctionalInterface
  public interface TimePartitionFilter {
    /**
     * @param storageGroupName name of the storage group being tested
     * @param timePartitionId id of the time partition being tested
     * @return whether the given pair satisfies this filter
     */
    boolean satisfy(String storageGroupName, long timePartitionId);
  }
2921

2922
  @FunctionalInterface
2923
  public interface SettleTsFileCallBack {
2924

2925
    void call(TsFileResource oldTsFileResource, List<TsFileResource> newTsFileResources)
2926
        throws WriteProcessException;
2927
  }
2928

2929
  public List<Long> getTimePartitions() {
2930
    return new ArrayList<>(partitionMaxFileVersions.keySet());
×
2931
  }
2932

2933
  public Long getLatestTimePartition() {
2934
    return getTimePartitions().stream().max(Long::compareTo).orElse(0L);
×
2935
  }
2936

2937
  public String getInsertWriteLockHolder() {
2938
    return insertWriteLockHolder;
×
2939
  }
2940

2941
  public ScheduledExecutorService getTimedCompactionScheduleTask() {
2942
    return timedCompactionScheduleTask;
×
2943
  }
2944

2945
  /** This method could only be used in iot consensus */
2946
  public IWALNode getWALNode() {
2947
    if (!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) {
×
2948
      throw new UnsupportedOperationException();
×
2949
    }
2950
    // identifier should be same with getTsFileProcessor method
2951
    return WALManager.getInstance()
×
2952
        .applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionId);
×
2953
  }
2954

2955
  /** Wait for this data region successfully deleted */
2956
  public void waitForDeleted() {
2957
    writeLock("waitForDeleted");
×
2958
    try {
2959
      if (!deleted) {
×
2960
        deletedCondition.await();
×
2961
      }
2962
    } catch (InterruptedException e) {
×
2963
      logger.error("Interrupted When waiting for data region deleted.");
×
2964
      Thread.currentThread().interrupt();
×
2965
    } finally {
2966
      writeUnlock();
×
2967
    }
2968
  }
×
2969

2970
  /** Release all threads waiting for this data region successfully deleted */
2971
  public void markDeleted() {
2972
    writeLock("markDeleted");
1✔
2973
    try {
2974
      deleted = true;
1✔
2975
      deletedCondition.signalAll();
1✔
2976
    } finally {
2977
      writeUnlock();
1✔
2978
    }
2979
  }
1✔
2980

2981
  public void releaseFlushTimeMap(long timePartitionId) {
2982
    lastFlushTimeMap.removePartition(timePartitionId);
×
2983
  }
×
2984

2985
  public long getMemCost() {
2986
    return dataRegionInfo.getMemCost();
×
2987
  }
2988

2989
  @Override
2990
  public long getDataTTL() {
2991
    return dataTTL;
×
2992
  }
2993

2994
  @TestOnly
2995
  public ILastFlushTimeMap getLastFlushTimeMap() {
2996
    return lastFlushTimeMap;
×
2997
  }
2998

2999
  @TestOnly
3000
  public TsFileManager getTsFileManager() {
3001
    return tsFileManager;
1✔
3002
  }
3003
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc