
apache / iotdb / #9968 (push, travis_ci, web-flow)
31 Aug 2023 02:59AM UTC, coverage: 47.703% (+0.003%) from 47.7%
[To rel/1.2] Refactoring DeleteOutdatedFileTask in WalNode (#10992)

99 of 99 new or added lines in 1 file covered. (100.0%)
80192 of 168108 relevant lines covered (47.7%)
0.48 hits per line

Source File (78.05% covered)
/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
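
A minimal usage sketch of the reader shown below, assuming an existing TsFile at the hypothetical path "data.tsfile"; it uses only the constructor, getAllDevices(), and readDeviceMetadata() that appear in this source:

    // Hypothetical example; "data.tsfile" is a placeholder path.
    try (TsFileSequenceReader reader = new TsFileSequenceReader("data.tsfile")) {
      for (String device : reader.getAllDevices()) {
        // measurementId -> TimeseriesMetadata for this device
        Map<String, TimeseriesMetadata> measurements = reader.readDeviceMetadata(device);
      }
    }
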
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.tsfile.read;

import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.compress.IUnCompressor;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.exception.TsFileRuntimeException;
import org.apache.iotdb.tsfile.exception.TsFileStatisticsMistakesException;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.controller.CachedChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
import org.apache.iotdb.tsfile.read.reader.TsFileInput;
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
import org.apache.iotdb.tsfile.utils.BloomFilter;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

public class TsFileSequenceReader implements AutoCloseable {

  private static final Logger logger = LoggerFactory.getLogger(TsFileSequenceReader.class);
  private static final Logger resourceLogger = LoggerFactory.getLogger("FileMonitor");
  protected static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
  private static final String METADATA_INDEX_NODE_DESERIALIZE_ERROR =
      "Something error happened while deserializing MetadataIndexNode of file {}";
  private static final int MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024;
  protected String file;
  protected TsFileInput tsFileInput;
  protected long fileMetadataPos;
  protected int fileMetadataSize;
  private ByteBuffer markerBuffer = ByteBuffer.allocate(Byte.BYTES);

  @SuppressWarnings("squid:S3077")
  protected volatile TsFileMetadata tsFileMetaData;

  // device -> measurement -> TimeseriesMetadata
  private Map<String, Map<String, TimeseriesMetadata>> cachedDeviceMetadata =
      new ConcurrentHashMap<>();
  private static final ReadWriteLock cacheLock = new ReentrantReadWriteLock();
  private boolean cacheDeviceMetadata;
  private long minPlanIndex = Long.MAX_VALUE;
  private long maxPlanIndex = Long.MIN_VALUE;

  /**
   * Create a file reader of the given file. The reader will read the tail of the file to get the
   * file metadata size. Then the reader will skip the first
   * TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.NUMBER_VERSION.getBytes().length
   * bytes of the file to prepare for reading the real data.
   *
   * @param file the data file
   * @throws IOException If some I/O error occurs
   */
  public TsFileSequenceReader(String file) throws IOException {
    this(file, true);
  }

  /**
   * Constructor for TsFileSequenceReader.
   *
   * @param file the given file name
   * @param loadMetadataSize whether to load the metadata size
   */
  public TsFileSequenceReader(String file, boolean loadMetadataSize) throws IOException {
    if (resourceLogger.isDebugEnabled()) {
      resourceLogger.debug("{} reader is opened. {}", file, getClass().getName());
    }
    this.file = file;
    tsFileInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file);
    try {
      if (loadMetadataSize) {
        loadMetadataSize();
      }
    } catch (Throwable e) {
      tsFileInput.close();
      throw e;
    }
  }

  // used in merge resource
  public TsFileSequenceReader(String file, boolean loadMetadata, boolean cacheDeviceMetadata)
      throws IOException {
    this(file, loadMetadata);
    this.cacheDeviceMetadata = cacheDeviceMetadata;
  }

  /**
   * Create a file reader of the given file. The reader will read the tail of the file to get the
   * file metadata size. Then the reader will skip the first
   * TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.NUMBER_VERSION.getBytes().length
   * bytes of the file to prepare for reading the real data.
   *
   * @param input given input
   */
  public TsFileSequenceReader(TsFileInput input) throws IOException {
    this(input, true);
  }

  /**
   * Constructor for TsFileSequenceReader.
   *
   * @param input the given input
   * @param loadMetadataSize whether to load the metadata size
   */
  public TsFileSequenceReader(TsFileInput input, boolean loadMetadataSize) throws IOException {
    this.tsFileInput = input;
    this.file = input.getFilePath();
    try {
      if (loadMetadataSize) { // NOTE no autoRepair here
        loadMetadataSize();
      }
    } catch (Throwable e) {
      tsFileInput.close();
      throw e;
    }
  }

  /**
   * Constructor for TsFileSequenceReader.
   *
   * @param input the input of a tsfile. The current position should be a marker and then a chunk
   *     header, rather than the magic number
   * @param fileMetadataPos the position of the file metadata in the TsFileInput from the beginning
   *     of the input to the current position
   * @param fileMetadataSize the byte size of the file metadata in the input
   */
  public TsFileSequenceReader(TsFileInput input, long fileMetadataPos, int fileMetadataSize) {
    this.tsFileInput = input;
    this.fileMetadataPos = fileMetadataPos;
    this.fileMetadataSize = fileMetadataSize;
  }

  public void loadMetadataSize() throws IOException {
    ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES);
    if (readTailMagic().equals(TSFileConfig.MAGIC_STRING)) {
      tsFileInput.read(
          metadataSize,
          tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES);
      metadataSize.flip();
      // read file metadata size and position
      fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize);
      fileMetadataPos =
          tsFileInput.size()
              - TSFileConfig.MAGIC_STRING.getBytes().length
              - Integer.BYTES
              - fileMetadataSize;
    }
  }
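
  // Tail layout of a completed TsFile, as implied by loadMetadataSize() above:
  //   ... | file metadata (fileMetadataSize bytes) | metadata size (Integer.BYTES) | tail MAGIC_STRING
  // so fileMetadataPos = fileSize - MAGIC_STRING bytes - Integer.BYTES - fileMetadataSize.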

  public long getFileMetadataPos() {
    return fileMetadataPos;
  }

  public int getTsFileMetadataSize() {
    return fileMetadataSize;
  }

  /** Return the tsfile metadata size of this tsfile. */
  public long getFileMetadataSize() throws IOException {
    return tsFileInput.size() - getFileMetadataPos();
  }

  /**
   * Return the whole metadata size of this tsfile, including ChunkMetadata, TimeseriesMetadata,
   * etc.
   */
  public long getAllMetadataSize() throws IOException {
    if (tsFileMetaData == null) {
      readFileMetadata();
    }
    return tsFileInput.size() - tsFileMetaData.getMetaOffset();
  }

  /** this function does not modify the position of the file reader. */
  public String readTailMagic() throws IOException {
    long totalSize = tsFileInput.size();
    ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.getBytes().length);
    tsFileInput.read(magicStringBytes, totalSize - TSFileConfig.MAGIC_STRING.getBytes().length);
    magicStringBytes.flip();
    return new String(magicStringBytes.array());
  }

  /** whether the file is a complete TsFile: true only if both the head magic and the tail magic string exist. */
  public boolean isComplete() throws IOException {
    long size = tsFileInput.size();
    // TSFileConfig.MAGIC_STRING.getBytes().length * 2 for the two magic strings
    // Byte.BYTES for the file version number
    if (size >= TSFileConfig.MAGIC_STRING.getBytes().length * 2 + Byte.BYTES) {
      String tailMagic = readTailMagic();
      String headMagic = readHeadMagic();
      return tailMagic.equals(headMagic);
    } else {
      return false;
    }
  }

  /** this function does not modify the position of the file reader. */
  public String readHeadMagic() throws IOException {
    ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.getBytes().length);
    tsFileInput.read(magicStringBytes, 0);
    magicStringBytes.flip();
    return new String(magicStringBytes.array());
  }

  /** this function reads the version number and checks the compatibility of the TsFile. */
  public byte readVersionNumber() throws IOException {
    ByteBuffer versionNumberByte = ByteBuffer.allocate(Byte.BYTES);
    tsFileInput.read(versionNumberByte, TSFileConfig.MAGIC_STRING.getBytes().length);
    versionNumberByte.flip();
    return versionNumberByte.get();
  }

  /**
   * this function does not modify the position of the file reader.
   *
   * @throws IOException io error
   */
  public TsFileMetadata readFileMetadata() throws IOException {
    try {
      if (tsFileMetaData == null) {
        synchronized (this) {
          if (tsFileMetaData == null) {
            tsFileMetaData =
                TsFileMetadata.deserializeFrom(readData(fileMetadataPos, fileMetadataSize));
          }
        }
      }
    } catch (Exception e) {
      logger.error("Something error happened while reading file metadata of file {}", file);
      throw e;
    }
    return tsFileMetaData;
  }
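
  // Note on readFileMetadata() above: it uses double-checked locking, which is
  // safe here because tsFileMetaData is declared volatile in the field section.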

  /**
   * this function does not modify the position of the file reader.
   *
   * @throws IOException io error
   */
  public BloomFilter readBloomFilter() throws IOException {
    readFileMetadata();
    return tsFileMetaData.getBloomFilter();
  }

  /**
   * this function reads measurements and TimeseriesMetadata in the given device. Thread safe.
   *
   * @param device the device name
   * @return the map measurementId -> TimeseriesMetaData in one device
   * @throws IOException io error
   */
  public Map<String, TimeseriesMetadata> readDeviceMetadata(String device) throws IOException {
    if (!cacheDeviceMetadata) {
      return readDeviceMetadataFromDisk(device);
    }

    cacheLock.readLock().lock();
    try {
      if (cachedDeviceMetadata.containsKey(device)) {
        return cachedDeviceMetadata.get(device);
      }
    } finally {
      cacheLock.readLock().unlock();
    }

    cacheLock.writeLock().lock();
    try {
      if (cachedDeviceMetadata.containsKey(device)) {
        return cachedDeviceMetadata.get(device);
      }
      readFileMetadata();
      Map<String, TimeseriesMetadata> deviceMetadata = readDeviceMetadataFromDisk(device);
      cachedDeviceMetadata.put(device, deviceMetadata);
      return deviceMetadata;
    } finally {
      cacheLock.writeLock().unlock();
    }
  }

  private Map<String, TimeseriesMetadata> readDeviceMetadataFromDisk(String device)
      throws IOException {
    readFileMetadata();
    List<TimeseriesMetadata> timeseriesMetadataList =
        getDeviceTimeseriesMetadataWithoutChunkMetadata(device);
    Map<String, TimeseriesMetadata> deviceMetadata = new HashMap<>();
    for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
      deviceMetadata.put(timeseriesMetadata.getMeasurementId(), timeseriesMetadata);
    }
    return deviceMetadata;
  }

  /** @deprecated Use {@link #readTimeseriesMetadata(String, String, boolean)} instead. */
  @Deprecated
  @SuppressWarnings("java:S1133") // suppress warn of deprecation
  public TimeseriesMetadata readTimeseriesMetadata(Path path, boolean ignoreNotExists)
      throws IOException {
    return readTimeseriesMetadata(path.getDevice(), path.getMeasurement(), ignoreNotExists);
  }

  public TimeseriesMetadata readTimeseriesMetadata(
      String device, String measurement, boolean ignoreNotExists) throws IOException {
    readFileMetadata();
    MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex();
    Pair<MetadataIndexEntry, Long> metadataIndexPair =
        getMetadataAndEndOffset(deviceMetadataIndexNode, device, true, true);
    if (metadataIndexPair == null) {
      if (ignoreNotExists) {
        return null;
      }
      throw new IOException("Device {" + device + "} is not in tsFileMetaData");
    }
    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
    MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
    if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
      try {
        metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
      } catch (Exception e) {
        logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
        throw e;
      }
      metadataIndexPair = getMetadataAndEndOffset(metadataIndexNode, measurement, false, false);
    }
    if (metadataIndexPair == null) {
      return null;
    }
    List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
    buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
    while (buffer.hasRemaining()) {
      try {
        timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true));
      } catch (Exception e) {
        logger.error(
            "Something error happened while deserializing TimeseriesMetadata of file {}", file);
        throw e;
      }
    }
    // return null if path does not exist in the TsFile
    int searchResult = binarySearchInTimeseriesMetadataList(timeseriesMetadataList, measurement);
    return searchResult >= 0 ? timeseriesMetadataList.get(searchResult) : null;
  }

  // This method is only used for TsFile
  public ITimeSeriesMetadata readITimeseriesMetadata(Path path, boolean ignoreNotExists)
      throws IOException {
    readFileMetadata();
    MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex();
    Pair<MetadataIndexEntry, Long> metadataIndexPair =
        getMetadataAndEndOffset(deviceMetadataIndexNode, path.getDevice(), true, true);
    if (metadataIndexPair == null) {
      if (ignoreNotExists) {
        return null;
      }
      throw new IOException("Device {" + path.getDevice() + "} is not in tsFileMetaData");
    }
    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
    MetadataIndexNode metadataIndexNode;
    TimeseriesMetadata firstTimeseriesMetadata;
    try {
      // next layer MeasurementNode of the specific DeviceNode
      metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
    } catch (Exception e) {
      logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
      throw e;
    }
    firstTimeseriesMetadata = tryToGetFirstTimeseriesMetadata(metadataIndexNode);
    metadataIndexPair =
        getMetadataAndEndOffset(metadataIndexNode, path.getMeasurement(), false, false);

    if (metadataIndexPair == null) {
      return null;
    }
    List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
    buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
    while (buffer.hasRemaining()) {
      try {
        timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true));
      } catch (Exception e) {
        logger.error(
            "Something error happened while deserializing TimeseriesMetadata of file {}", file);
        throw e;
      }
    }
    // return null if path does not exist in the TsFile
    int searchResult =
        binarySearchInTimeseriesMetadataList(timeseriesMetadataList, path.getMeasurement());
    if (searchResult >= 0) {
      if (firstTimeseriesMetadata != null) {
        List<TimeseriesMetadata> valueTimeseriesMetadataList = new ArrayList<>();
        valueTimeseriesMetadataList.add(timeseriesMetadataList.get(searchResult));
        return new AlignedTimeSeriesMetadata(firstTimeseriesMetadata, valueTimeseriesMetadataList);
      } else {
        return timeseriesMetadataList.get(searchResult);
      }
    } else {
      return null;
    }
  }

  /* Find the leaf node that contains the path; return all the sensors in that leaf node which are also in the allSensors set */
  public List<TimeseriesMetadata> readTimeseriesMetadata(
      String device, String measurement, Set<String> allSensors) throws IOException {
    Pair<MetadataIndexEntry, Long> metadataIndexPair =
        getLeafMetadataIndexPair(device, measurement);
    if (metadataIndexPair == null) {
      return Collections.emptyList();
    }
    List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();

    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
    while (buffer.hasRemaining()) {
      TimeseriesMetadata timeseriesMetadata;
      try {
        timeseriesMetadata = TimeseriesMetadata.deserializeFrom(buffer, true);
      } catch (Exception e) {
        logger.error(
            "Something error happened while deserializing TimeseriesMetadata of file {}", file);
        throw e;
      }
      if (allSensors.contains(timeseriesMetadata.getMeasurementId())) {
        timeseriesMetadataList.add(timeseriesMetadata);
      }
    }
    return timeseriesMetadataList;
  }

  /* Get leaf MetadataIndexPair which contains path */
  private Pair<MetadataIndexEntry, Long> getLeafMetadataIndexPair(String device, String measurement)
      throws IOException {
    readFileMetadata();
    MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex();
    Pair<MetadataIndexEntry, Long> metadataIndexPair =
        getMetadataAndEndOffset(deviceMetadataIndexNode, device, true, true);
    if (metadataIndexPair == null) {
      return null;
    }
    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
    MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
    if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
      try {
        metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
      } catch (Exception e) {
        logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
        throw e;
      }
      metadataIndexPair = getMetadataAndEndOffset(metadataIndexNode, measurement, false, false);
    }
    return metadataIndexPair;
  }

  // This method is only used for TsFile
  public List<ITimeSeriesMetadata> readITimeseriesMetadata(String device, Set<String> measurements)
      throws IOException {
    readFileMetadata();
    MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex();
    Pair<MetadataIndexEntry, Long> metadataIndexPair =
        getMetadataAndEndOffset(deviceMetadataIndexNode, device, true, false);
    if (metadataIndexPair == null) {
      return Collections.emptyList();
    }
    List<ITimeSeriesMetadata> resultTimeseriesMetadataList = new ArrayList<>();
    List<String> measurementList = new ArrayList<>(measurements);
    Set<String> measurementsHadFound = new HashSet<>();
    // the content of next Layer MeasurementNode of the specific device's DeviceNode
    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
    Pair<MetadataIndexEntry, Long> measurementMetadataIndexPair = metadataIndexPair;
    List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();

    // next layer MeasurementNode of the specific DeviceNode
    MetadataIndexNode measurementMetadataIndexNode;
    try {
      measurementMetadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
    } catch (Exception e) {
      logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
      throw e;
    }
    // Get the first timeseriesMetadata of the device
    TimeseriesMetadata firstTimeseriesMetadata =
        tryToGetFirstTimeseriesMetadata(measurementMetadataIndexNode);

    for (int i = 0; i < measurementList.size(); i++) {
      if (measurementsHadFound.contains(measurementList.get(i))) {
        continue;
      }
      timeseriesMetadataList.clear();
      measurementMetadataIndexPair =
          getMetadataAndEndOffset(
              measurementMetadataIndexNode, measurementList.get(i), false, false);

      if (measurementMetadataIndexPair == null) {
        continue;
      }
      // the content of TimeseriesNode of the specific MeasurementLeafNode
      buffer =
          readData(
              measurementMetadataIndexPair.left.getOffset(), measurementMetadataIndexPair.right);
      while (buffer.hasRemaining()) {
        try {
          timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true));
        } catch (Exception e) {
          logger.error(
              "Something error happened while deserializing TimeseriesMetadata of file {}", file);
          throw e;
        }
      }
      for (int j = i; j < measurementList.size(); j++) {
        String current = measurementList.get(j);
        if (!measurementsHadFound.contains(current)) {
          int searchResult = binarySearchInTimeseriesMetadataList(timeseriesMetadataList, current);
          if (searchResult >= 0) {
            if (firstTimeseriesMetadata != null) {
              List<TimeseriesMetadata> valueTimeseriesMetadataList = new ArrayList<>();
              valueTimeseriesMetadataList.add(timeseriesMetadataList.get(searchResult));
              resultTimeseriesMetadataList.add(
                  new AlignedTimeSeriesMetadata(
                      firstTimeseriesMetadata, valueTimeseriesMetadataList));
            } else {
              resultTimeseriesMetadataList.add(timeseriesMetadataList.get(searchResult));
            }
            measurementsHadFound.add(current);
          }
        }
        if (measurementsHadFound.size() == measurements.size()) {
          return resultTimeseriesMetadataList;
        }
      }
    }
    return resultTimeseriesMetadataList;
  }

  protected int binarySearchInTimeseriesMetadataList(
      List<TimeseriesMetadata> timeseriesMetadataList, String key) {
    int low = 0;
    int high = timeseriesMetadataList.size() - 1;

    while (low <= high) {
      int mid = (low + high) >>> 1;
      TimeseriesMetadata midVal = timeseriesMetadataList.get(mid);
      int cmp = midVal.getMeasurementId().compareTo(key);

      if (cmp < 0) {
        low = mid + 1;
      } else if (cmp > 0) {
        high = mid - 1;
      } else {
        return mid; // key found
      }
    }
    return -1; // key not found
  }
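
  // The binary search above relies on timeseriesMetadataList being sorted by
  // measurementId in dictionary order, which is how the entries are laid out
  // under a measurement leaf node.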

  public List<String> getAllDevices() throws IOException {
    if (tsFileMetaData == null) {
      readFileMetadata();
    }
    return getAllDevices(tsFileMetaData.getMetadataIndex());
  }

  private List<String> getAllDevices(MetadataIndexNode metadataIndexNode) throws IOException {
    List<String> deviceList = new ArrayList<>();
    // if metadataIndexNode is LEAF_DEVICE, put all devices in node entry into the list
    if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
      deviceList.addAll(
          metadataIndexNode.getChildren().stream()
              .map(x -> x.getName().intern())
              .collect(Collectors.toList()));
      return deviceList;
    }

    int metadataIndexListSize = metadataIndexNode.getChildren().size();
    for (int i = 0; i < metadataIndexListSize; i++) {
      long endOffset = metadataIndexNode.getEndOffset();
      if (i != metadataIndexListSize - 1) {
        endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset();
      }
      ByteBuffer buffer = readData(metadataIndexNode.getChildren().get(i).getOffset(), endOffset);
      MetadataIndexNode node = MetadataIndexNode.deserializeFrom(buffer);
      deviceList.addAll(getAllDevices(node));
    }
    return deviceList;
  }

  /**
   * @return an iterator of "device, isAligned" pairs, in which the names of devices are ordered in
   *     dictionary order, and isAligned represents whether the device is aligned. Only the devices
   *     on one device leaf node are read at a time, to save memory.
   */
  public TsFileDeviceIterator getAllDevicesIteratorWithIsAligned() throws IOException {
    readFileMetadata();
    Queue<Pair<String, long[]>> queue = new LinkedList<>();
    List<long[]> leafDeviceNodeOffsets = new ArrayList<>();
    MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
    if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
      // the first node of the index tree is a device leaf node, so get the devices directly
      getDevicesOfLeafNode(metadataIndexNode, queue);
    } else {
      // get all device leaf node offsets
      getAllDeviceLeafNodeOffset(metadataIndexNode, leafDeviceNodeOffsets);
    }

    return new TsFileDeviceIterator(this, leafDeviceNodeOffsets, queue);
  }

  /**
   * Get devices and first measurement node offset.
   *
   * @param startOffset start offset of device leaf node
   * @param endOffset end offset of device leaf node
   * @param measurementNodeOffsetQueue device -> first measurement node offset
   */
  public void getDevicesAndEntriesOfOneLeafNode(
      Long startOffset, Long endOffset, Queue<Pair<String, long[]>> measurementNodeOffsetQueue)
      throws IOException {
    try {
      ByteBuffer nextBuffer = readData(startOffset, endOffset);
      MetadataIndexNode deviceLeafNode = MetadataIndexNode.deserializeFrom(nextBuffer);
      getDevicesOfLeafNode(deviceLeafNode, measurementNodeOffsetQueue);
    } catch (Exception e) {
      logger.error("Something error happened while getting all devices of file {}", file);
      throw e;
    }
  }

  /**
   * Get all devices and their corresponding entries in the specific device leaf node.
   *
   * @param deviceLeafNode this node must be a device leaf node
   */
  private void getDevicesOfLeafNode(
      MetadataIndexNode deviceLeafNode, Queue<Pair<String, long[]>> measurementNodeOffsetQueue) {
    if (!deviceLeafNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
      throw new IllegalStateException("the first param should be device leaf node.");
    }
    List<MetadataIndexEntry> childrenEntries = deviceLeafNode.getChildren();
    for (int i = 0; i < childrenEntries.size(); i++) {
      MetadataIndexEntry deviceEntry = childrenEntries.get(i);
      long childStartOffset = deviceEntry.getOffset();
      long childEndOffset =
          i == childrenEntries.size() - 1
              ? deviceLeafNode.getEndOffset()
              : childrenEntries.get(i + 1).getOffset();
      long[] offset = {childStartOffset, childEndOffset};
      measurementNodeOffsetQueue.add(new Pair<>(deviceEntry.getName(), offset));
    }
  }

  /**
   * Get the device leaf node offsets under the specific device internal node.
   *
   * @param deviceInternalNode this node must be a device internal node
   */
  private void getAllDeviceLeafNodeOffset(
      MetadataIndexNode deviceInternalNode, List<long[]> leafDeviceNodeOffsets) throws IOException {
    if (!deviceInternalNode.getNodeType().equals(MetadataIndexNodeType.INTERNAL_DEVICE)) {
      throw new IllegalStateException("the first param should be device internal node.");
    }
    try {
      int metadataIndexListSize = deviceInternalNode.getChildren().size();
      boolean isCurrentLayerLeafNode = false;
      for (int i = 0; i < metadataIndexListSize; i++) {
        MetadataIndexEntry entry = deviceInternalNode.getChildren().get(i);
        long startOffset = entry.getOffset();
        long endOffset = deviceInternalNode.getEndOffset();
        if (i != metadataIndexListSize - 1) {
          endOffset = deviceInternalNode.getChildren().get(i + 1).getOffset();
        }
        if (i == 0) {
          // check whether the current layer is a device leaf node or a device internal node; we
          // only need to check the first entry, because the rest are of the same type
          MetadataIndexNodeType nodeType =
              MetadataIndexNodeType.deserialize(
                  ReadWriteIOUtils.readByte(readData(endOffset - 1, endOffset)));
          isCurrentLayerLeafNode = nodeType.equals(MetadataIndexNodeType.LEAF_DEVICE);
        }
        if (isCurrentLayerLeafNode) {
          // is a device leaf node
          long[] offset = {startOffset, endOffset};
          leafDeviceNodeOffsets.add(offset);
          continue;
        }
        ByteBuffer nextBuffer = readData(startOffset, endOffset);
        getAllDeviceLeafNodeOffset(
            MetadataIndexNode.deserializeFrom(nextBuffer), leafDeviceNodeOffsets);
      }
    } catch (Exception e) {
      logger.error("Something error happened while getting all devices of file {}", file);
      throw e;
    }
  }

  /**
   * read all ChunkMetadata of the given device
   *
   * @param device the device name
   * @return measurement -> ChunkMetadata list
   * @throws IOException io error
   */
  public Map<String, List<ChunkMetadata>> readChunkMetadataInDevice(String device)
      throws IOException {
    readFileMetadata();
    List<TimeseriesMetadata> timeseriesMetadataMap = getDeviceTimeseriesMetadata(device);
    if (timeseriesMetadataMap.isEmpty()) {
      return new HashMap<>();
    }
    Map<String, List<ChunkMetadata>> seriesMetadata = new LinkedHashMap<>();
    for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataMap) {
      seriesMetadata.put(
          timeseriesMetadata.getMeasurementId(),
          timeseriesMetadata.getChunkMetadataList().stream()
              .map(chunkMetadata -> ((ChunkMetadata) chunkMetadata))
              .collect(Collectors.toList()));
    }
    return seriesMetadata;
  }

  /**
   * this function returns all timeseries names
   *
   * @return list of Paths
   * @throws IOException io error
   */
  public List<Path> getAllPaths() throws IOException {
    List<Path> paths = new ArrayList<>();
    for (String device : getAllDevices()) {
      Map<String, TimeseriesMetadata> timeseriesMetadataMap = readDeviceMetadata(device);
      for (String measurementId : timeseriesMetadataMap.keySet()) {
        paths.add(new Path(device, measurementId, true));
      }
    }
    return paths;
  }

  /**
   * @return an iterator of timeseries lists, in which the names of timeseries are ordered in
   *     dictionary order
   * @throws IOException io error
   */
  public Iterator<List<Path>> getPathsIterator() throws IOException {
    readFileMetadata();

    MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
    List<MetadataIndexEntry> metadataIndexEntryList = metadataIndexNode.getChildren();
    Queue<Pair<String, Pair<Long, Long>>> queue = new LinkedList<>();
    for (int i = 0; i < metadataIndexEntryList.size(); i++) {
      MetadataIndexEntry metadataIndexEntry = metadataIndexEntryList.get(i);
      long endOffset = metadataIndexNode.getEndOffset();
      if (i != metadataIndexEntryList.size() - 1) {
        endOffset = metadataIndexEntryList.get(i + 1).getOffset();
      }
      ByteBuffer buffer = readData(metadataIndexEntry.getOffset(), endOffset);
      getAllPaths(metadataIndexEntry, buffer, null, metadataIndexNode.getNodeType(), queue);
    }
    return new Iterator<List<Path>>() {
      @Override
      public boolean hasNext() {
        return !queue.isEmpty();
      }

      @Override
      public List<Path> next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        Pair<String, Pair<Long, Long>> startEndPair = queue.remove();
        List<Path> paths = new ArrayList<>();
        try {
          ByteBuffer nextBuffer = readData(startEndPair.right.left, startEndPair.right.right);
          while (nextBuffer.hasRemaining()) {
            paths.add(
                new Path(
                    startEndPair.left,
                    TimeseriesMetadata.deserializeFrom(nextBuffer, false).getMeasurementId(),
                    true));
          }
          return paths;
        } catch (IOException e) {
          throw new TsFileRuntimeException(
              "Error occurred while reading a time series metadata block.");
        }
      }
    };
  }

  private void getAllPaths(
      MetadataIndexEntry metadataIndex,
      ByteBuffer buffer,
      String deviceId,
      MetadataIndexNodeType type,
      Queue<Pair<String, Pair<Long, Long>>> queue)
      throws IOException {
    try {
      if (type.equals(MetadataIndexNodeType.LEAF_DEVICE)) {
        deviceId = metadataIndex.getName();
      }
      MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
      int metadataIndexListSize = metadataIndexNode.getChildren().size();
      for (int i = 0; i < metadataIndexListSize; i++) {
        long startOffset = metadataIndexNode.getChildren().get(i).getOffset();
        long endOffset = metadataIndexNode.getEndOffset();
        if (i != metadataIndexListSize - 1) {
          endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset();
        }
        if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
          queue.add(new Pair<>(deviceId, new Pair<>(startOffset, endOffset)));
          continue;
        }
        ByteBuffer nextBuffer =
            readData(metadataIndexNode.getChildren().get(i).getOffset(), endOffset);
        getAllPaths(
            metadataIndexNode.getChildren().get(i),
            nextBuffer,
            deviceId,
            metadataIndexNode.getNodeType(),
            queue);
      }
    } catch (Exception e) {
      logger.error("Something error happened while getting all paths of file {}", file);
      throw e;
    }
  }

  /**
   * Check whether the device is aligned or not.
   *
   * @param measurementNode the next measurement layer node of the specific device node
   */
  public boolean isAlignedDevice(MetadataIndexNode measurementNode) {
    return "".equals(measurementNode.getChildren().get(0).getName());
  }
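
  // Convention implied by the check above: an aligned device stores its time
  // column under the empty measurement name "", so the first child entry of its
  // measurement node has an empty name.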

  TimeseriesMetadata tryToGetFirstTimeseriesMetadata(MetadataIndexNode measurementNode)
      throws IOException {
    // Not aligned timeseries
    if (!"".equals(measurementNode.getChildren().get(0).getName())) {
      return null;
    }

    // Aligned timeseries
    if (measurementNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
      ByteBuffer buffer;
      if (measurementNode.getChildren().size() > 1) {
        buffer =
            readData(
                measurementNode.getChildren().get(0).getOffset(),
                measurementNode.getChildren().get(1).getOffset());
      } else {
        buffer =
            readData(
                measurementNode.getChildren().get(0).getOffset(), measurementNode.getEndOffset());
      }
      return TimeseriesMetadata.deserializeFrom(buffer, true);
    } else if (measurementNode.getNodeType().equals(MetadataIndexNodeType.INTERNAL_MEASUREMENT)) {
      ByteBuffer buffer =
          readData(
              measurementNode.getChildren().get(0).getOffset(),
              measurementNode.getChildren().get(1).getOffset());
      MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
      return tryToGetFirstTimeseriesMetadata(metadataIndexNode);
    }
    return null;
  }

  /**
   * Get the measurements of the current device by its first measurement node. Also get the chunk
   * metadata list and the timeseries metadata offset.
   *
   * @param measurementNode first measurement node of the device
   * @param excludedMeasurementIds do not deserialize chunk metadata whose measurementId is in the
   *     set. Notice: it only takes effect when the needChunkMetadata parameter is true.
   * @param needChunkMetadata whether to deserialize chunk metadata
   * @return measurement -> (chunk metadata list, timeseries metadata <startOffset, endOffset>)
   */
  public Map<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>>
      getTimeseriesMetadataOffsetByDevice(
          MetadataIndexNode measurementNode,
          Set<String> excludedMeasurementIds,
          boolean needChunkMetadata)
          throws IOException {
    Map<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>> timeseriesMetadataOffsetMap =
        new LinkedHashMap<>();
    List<MetadataIndexEntry> childrenEntryList = measurementNode.getChildren();
    for (int i = 0; i < childrenEntryList.size(); i++) {
      long startOffset = childrenEntryList.get(i).getOffset();
      long endOffset =
          i == childrenEntryList.size() - 1
              ? measurementNode.getEndOffset()
              : childrenEntryList.get(i + 1).getOffset();
      ByteBuffer nextBuffer = readData(startOffset, endOffset);
      if (measurementNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
        // leaf measurement node
        while (nextBuffer.hasRemaining()) {
          int metadataStartOffset = nextBuffer.position();
          TimeseriesMetadata timeseriesMetadata =
              TimeseriesMetadata.deserializeFrom(
                  nextBuffer, excludedMeasurementIds, needChunkMetadata);
          timeseriesMetadataOffsetMap.put(
              timeseriesMetadata.getMeasurementId(),
              new Pair<>(
                  timeseriesMetadata.getChunkMetadataList(),
                  new Pair<>(
                      startOffset + metadataStartOffset, startOffset + nextBuffer.position())));
        }

      } else {
        // internal measurement node
        MetadataIndexNode nextLayerMeasurementNode = MetadataIndexNode.deserializeFrom(nextBuffer);
        timeseriesMetadataOffsetMap.putAll(
            getTimeseriesMetadataOffsetByDevice(
                nextLayerMeasurementNode, excludedMeasurementIds, needChunkMetadata));
      }
    }
    return timeseriesMetadataOffsetMap;
  }

  /**
   * Get the chunk metadata list by the start offset and end offset of the timeseries metadata.
   *
   * @param startOffset the start offset of the timeseries metadata
   * @param endOffset the end offset of the timeseries metadata
   */
  public List<IChunkMetadata> getChunkMetadataListByTimeseriesMetadataOffset(
      long startOffset, long endOffset) throws IOException {
    ByteBuffer timeseriesMetadataBuffer = readData(startOffset, endOffset);

    TimeseriesMetadata timeseriesMetadata =
        TimeseriesMetadata.deserializeFrom(timeseriesMetadataBuffer, true);
    return timeseriesMetadata.getChunkMetadataList();
  }

  /**
   * Get timeseries metadata under the measurementNode and put them into timeseriesMetadataList.
   * Skip timeseries whose measurementId is in the excludedMeasurementIds.
   *
   * @param measurementNode next layer measurement node of the specific device leaf node
   * @param excludedMeasurementIds skip timeseries whose measurementId is in the set
   */
  public void getDeviceTimeseriesMetadata(
      List<TimeseriesMetadata> timeseriesMetadataList,
      MetadataIndexNode measurementNode,
      Set<String> excludedMeasurementIds,
      boolean needChunkMetadata)
      throws IOException {
    int metadataIndexListSize = measurementNode.getChildren().size();
    for (int i = 0; i < metadataIndexListSize; i++) {
      long endOffset = measurementNode.getEndOffset();
      if (i != metadataIndexListSize - 1) {
        endOffset = measurementNode.getChildren().get(i + 1).getOffset();
      }
      ByteBuffer nextBuffer = readData(measurementNode.getChildren().get(i).getOffset(), endOffset);
      if (measurementNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
        // leaf measurement node
        while (nextBuffer.hasRemaining()) {
          TimeseriesMetadata timeseriesMetadata =
              TimeseriesMetadata.deserializeFrom(
                  nextBuffer, excludedMeasurementIds, needChunkMetadata);
          if (!excludedMeasurementIds.contains(timeseriesMetadata.getMeasurementId())) {
            timeseriesMetadataList.add(timeseriesMetadata);
          }
        }
      } else {
        // internal measurement node
        MetadataIndexNode nextLayerMeasurementNode = MetadataIndexNode.deserializeFrom(nextBuffer);
        getDeviceTimeseriesMetadata(
            timeseriesMetadataList,
            nextLayerMeasurementNode,
            excludedMeasurementIds,
            needChunkMetadata);
      }
    }
  }

  /**
   * Traverse the metadata index from MetadataIndexEntry to get TimeseriesMetadatas
   *
   * @param metadataIndex MetadataIndexEntry
   * @param buffer byte buffer
   * @param deviceId String
   * @param timeseriesMetadataMap map: deviceId -> timeseriesMetadata list
   * @param needChunkMetadata deserialize chunk metadata list or not
   */
  private void generateMetadataIndex(
      MetadataIndexEntry metadataIndex,
      ByteBuffer buffer,
      String deviceId,
      MetadataIndexNodeType type,
      Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap,
      boolean needChunkMetadata)
      throws IOException {
    try {
      if (type.equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
        List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
        while (buffer.hasRemaining()) {
          timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, needChunkMetadata));
        }
        timeseriesMetadataMap
            .computeIfAbsent(deviceId, k -> new ArrayList<>())
            .addAll(timeseriesMetadataList);
      } else {
        // deviceId should be determined by the LEAF_DEVICE node
        if (type.equals(MetadataIndexNodeType.LEAF_DEVICE)) {
          deviceId = metadataIndex.getName();
        }
        MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
        int metadataIndexListSize = metadataIndexNode.getChildren().size();
        for (int i = 0; i < metadataIndexListSize; i++) {
          long endOffset = metadataIndexNode.getEndOffset();
          if (i != metadataIndexListSize - 1) {
            endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset();
          }
          ByteBuffer nextBuffer =
              readData(metadataIndexNode.getChildren().get(i).getOffset(), endOffset);
          generateMetadataIndex(
              metadataIndexNode.getChildren().get(i),
              nextBuffer,
              deviceId,
              metadataIndexNode.getNodeType(),
              timeseriesMetadataMap,
              needChunkMetadata);
        }
      }
    } catch (Exception e) {
      logger.error("Something error happened while generating MetadataIndex of file {}", file);
      throw e;
    }
  }

  /* TimeseriesMetadata doesn't need to deserialize the chunk metadata list */
  public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata(boolean needChunkMetadata)
      throws IOException {
    if (tsFileMetaData == null) {
      readFileMetadata();
    }
    Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new HashMap<>();
    MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
    List<MetadataIndexEntry> metadataIndexEntryList = metadataIndexNode.getChildren();
    for (int i = 0; i < metadataIndexEntryList.size(); i++) {
      MetadataIndexEntry metadataIndexEntry = metadataIndexEntryList.get(i);
      long endOffset = metadataIndexNode.getEndOffset();
      if (i != metadataIndexEntryList.size() - 1) {
        endOffset = metadataIndexEntryList.get(i + 1).getOffset();
      }
      ByteBuffer buffer = readData(metadataIndexEntry.getOffset(), endOffset);
      generateMetadataIndex(
          metadataIndexEntry,
          buffer,
          null,
          metadataIndexNode.getNodeType(),
          timeseriesMetadataMap,
          needChunkMetadata);
    }
    return timeseriesMetadataMap;
  }

  /* This method will only deserialize the TimeseriesMetadata, not including chunk metadata list */
  private List<TimeseriesMetadata> getDeviceTimeseriesMetadataWithoutChunkMetadata(String device)
      throws IOException {
    MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
    Pair<MetadataIndexEntry, Long> metadataIndexPair =
        getMetadataAndEndOffset(metadataIndexNode, device, true, true);
    if (metadataIndexPair == null) {
      return Collections.emptyList();
    }
    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
    Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new TreeMap<>();
    generateMetadataIndex(
        metadataIndexPair.left,
        buffer,
        device,
        MetadataIndexNodeType.INTERNAL_MEASUREMENT,
        timeseriesMetadataMap,
        false);
    List<TimeseriesMetadata> deviceTimeseriesMetadata = new ArrayList<>();
    for (List<TimeseriesMetadata> timeseriesMetadataList : timeseriesMetadataMap.values()) {
      deviceTimeseriesMetadata.addAll(timeseriesMetadataList);
    }
    return deviceTimeseriesMetadata;
  }

  /* This method deserializes not only the TimeseriesMetadata but also all of the chunk metadata lists. */
  private List<TimeseriesMetadata> getDeviceTimeseriesMetadata(String device) throws IOException {
    MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
    Pair<MetadataIndexEntry, Long> metadataIndexPair =
        getMetadataAndEndOffset(metadataIndexNode, device, true, true);
    if (metadataIndexPair == null) {
      return Collections.emptyList();
    }
    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
    Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new TreeMap<>();
    generateMetadataIndex(
        metadataIndexPair.left,
        buffer,
        device,
        MetadataIndexNodeType.INTERNAL_MEASUREMENT,
        timeseriesMetadataMap,
        true);
    List<TimeseriesMetadata> deviceTimeseriesMetadata = new ArrayList<>();
    for (List<TimeseriesMetadata> timeseriesMetadataList : timeseriesMetadataMap.values()) {
      deviceTimeseriesMetadata.addAll(timeseriesMetadataList);
    }
    return deviceTimeseriesMetadata;
  }

  /**
   * Get the target MetadataIndexEntry and its end offset
   *
   * @param metadataIndex given MetadataIndexNode
   * @param name target device / measurement name
   * @param isDeviceLevel whether the target MetadataIndexNode is at the device level
   * @param exactSearch whether to use exact search mode: return null when there is no entry with
   *     the given name; otherwise return the nearest MetadataIndexEntry before it (for deeper
   *     search)
   * @return target MetadataIndexEntry, endOffset pair
   */
  protected Pair<MetadataIndexEntry, Long> getMetadataAndEndOffset(
      MetadataIndexNode metadataIndex, String name, boolean isDeviceLevel, boolean exactSearch)
      throws IOException {
    try {
      // When searching for a device node, return when it is not INTERNAL_DEVICE
      // When searching for a measurement node, return when it is not INTERNAL_MEASUREMENT
      if ((isDeviceLevel
              && !metadataIndex.getNodeType().equals(MetadataIndexNodeType.INTERNAL_DEVICE))
          || (!isDeviceLevel
              && !metadataIndex.getNodeType().equals(MetadataIndexNodeType.INTERNAL_MEASUREMENT))) {
        return metadataIndex.getChildIndexEntry(name, exactSearch);
      } else {
        Pair<MetadataIndexEntry, Long> childIndexEntry =
            metadataIndex.getChildIndexEntry(name, false);
        ByteBuffer buffer = readData(childIndexEntry.left.getOffset(), childIndexEntry.right);
        return getMetadataAndEndOffset(
            MetadataIndexNode.deserializeFrom(buffer), name, isDeviceLevel, exactSearch);
      }
    } catch (Exception e) {
      logger.error("Something error happened while deserializing MetadataIndex of file {}", file);
      throw e;
    }
  }

1209
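  // Illustrative sketch (not part of this class; getMetadataAndEndOffset is protected, so
  // imagine calling from a subclass): locating the index entry of one device from the root
  // of the metadata index. "device" is a hypothetical device id.
  //
  //   MetadataIndexNode root = readFileMetadata().getMetadataIndex();
  //   Pair<MetadataIndexEntry, Long> entryAndEndOffset =
  //       getMetadataAndEndOffset(root, device, true, true);
  //   if (entryAndEndOffset != null) {
  //     // entryAndEndOffset.left.getOffset() .. entryAndEndOffset.right is the byte range
  //     // holding the device's next-level index node
  //     ByteBuffer bytes = readData(entryAndEndOffset.left.getOffset(), entryAndEndOffset.right);
  //   }
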
  /**
   * read data from the current position of the input, and deserialize it to a
   * CHUNK_GROUP_HEADER. <br>
   * This method is not threadsafe.
   *
   * @return a CHUNK_GROUP_HEADER
   * @throws IOException io error
   */
  public ChunkGroupHeader readChunkGroupHeader() throws IOException {
    return ChunkGroupHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), true);
  }

  /**
   * read data from the given position of the input, and deserialize it to a CHUNK_GROUP_HEADER.
   *
   * @param position the offset of the chunk group header in the file
   * @param markerRead true if the offset does not contain the marker, otherwise false
   * @return a CHUNK_GROUP_HEADER
   * @throws IOException io error
   */
  public ChunkGroupHeader readChunkGroupHeader(long position, boolean markerRead)
      throws IOException {
    return ChunkGroupHeader.deserializeFrom(tsFileInput, position, markerRead);
  }

  public void readPlanIndex() throws IOException {
    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
    if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) == 0) {
      throw new IOException("reach the end of the file.");
    }
    buffer.flip();
    minPlanIndex = buffer.getLong();
    buffer.clear();
    if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) == 0) {
      throw new IOException("reach the end of the file.");
    }
    buffer.flip();
    maxPlanIndex = buffer.getLong();
  }

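  // Illustrative sketch (not part of this class): after an OPERATION_INDEX_RANGE marker is
  // consumed (see selfCheck below), readPlanIndex() parses two consecutive longs from the
  // current position; they are then exposed through getMinPlanIndex()/getMaxPlanIndex().
  // "reader" is a hypothetical TsFileSequenceReader positioned right after the marker.
  //
  //   reader.readPlanIndex();
  //   long min = reader.getMinPlanIndex();
  //   long max = reader.getMaxPlanIndex();
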
  /**
   * read data from current position of the input, and deserialize it to a CHUNK_HEADER. <br>
   * This method is not threadsafe.
   *
   * @return a CHUNK_HEADER
   * @throws IOException io error
   */
  public ChunkHeader readChunkHeader(byte chunkType) throws IOException {
    try {
      return ChunkHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), chunkType);
    } catch (Throwable t) {
      logger.warn("Exception {} happened while reading chunk header of {}", t.getMessage(), file);
      throw t;
    }
  }

  /**
   * read the chunk's header.
   *
   * @param position the file offset of this chunk's header
   * @param chunkHeaderSize the size of the chunk's header
   */
  private ChunkHeader readChunkHeader(long position, int chunkHeaderSize) throws IOException {
    try {
      return ChunkHeader.deserializeFrom(tsFileInput, position, chunkHeaderSize);
    } catch (Throwable t) {
      logger.warn("Exception {} happened while reading chunk header of {}", t.getMessage(), file);
      throw t;
    }
  }

  /**
   * Notice: this function will modify the channel's position.
   *
   * @param dataSize the size of the chunk data
   * @param position the offset of the chunk data
   * @return the pages of this chunk
   */
  public ByteBuffer readChunk(long position, int dataSize) throws IOException {
    try {
      return readData(position, dataSize);
    } catch (Throwable t) {
      logger.warn("Exception {} happened while reading chunk of {}", t.getMessage(), file);
      throw t;
    }
  }

  /**
   * read memory chunk.
   *
   * @param metaData given chunk metadata
   * @return chunk
   */
  public Chunk readMemChunk(ChunkMetadata metaData) throws IOException {
    try {
      int chunkHeadSize = ChunkHeader.getSerializedSize(metaData.getMeasurementUid());
      ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader(), chunkHeadSize);
      ByteBuffer buffer =
          readChunk(
              metaData.getOffsetOfChunkHeader() + header.getSerializedSize(), header.getDataSize());
      return new Chunk(header, buffer, metaData.getDeleteIntervalList(), metaData.getStatistics());
    } catch (Throwable t) {
      logger.warn("Exception {} happened while reading chunk of {}", t.getMessage(), file);
      throw t;
    }
  }

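  // Illustrative sketch (not part of this class): once a ChunkMetadata is known (e.g. from
  // getChunkMetadataList further below), the whole chunk can be materialized in one call.
  // "reader" and "chunkMetadata" are hypothetical.
  //
  //   Chunk chunk = reader.readMemChunk(chunkMetadata);
  //   ChunkHeader header = chunk.getHeader();
  //   ByteBuffer data = chunk.getData(); // page headers followed by page payloads
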
  /**
   * read memory chunk.
   *
   * @param chunkCacheKey given key of chunk LRUCache
   * @return chunk
   */
  public Chunk readMemChunk(CachedChunkLoaderImpl.ChunkCacheKey chunkCacheKey) throws IOException {
    int chunkHeadSize = ChunkHeader.getSerializedSize(chunkCacheKey.getMeasurementUid());
    ChunkHeader header = readChunkHeader(chunkCacheKey.getOffsetOfChunkHeader(), chunkHeadSize);
    ByteBuffer buffer =
        readChunk(
            chunkCacheKey.getOffsetOfChunkHeader() + header.getSerializedSize(),
            header.getDataSize());
    return new Chunk(
        header, buffer, chunkCacheKey.getDeleteIntervalList(), chunkCacheKey.getStatistics());
  }

  /**
   * read the {@link CompressionType} and {@link TSEncoding} of a timeseries. This method will skip
   * the measurement id and data type. This method will change the position of this reader.
   *
   * @param timeseriesMetadata timeseries' metadata
   * @return a pair of {@link CompressionType} and {@link TSEncoding} of the given timeseries.
   * @throws IOException io error
   */
  public Pair<CompressionType, TSEncoding> readTimeseriesCompressionTypeAndEncoding(
      TimeseriesMetadata timeseriesMetadata) throws IOException {

    String measurementId = timeseriesMetadata.getMeasurementId();
    int measurementIdLength = measurementId.getBytes(TSFileConfig.STRING_CHARSET).length;
    position(
        timeseriesMetadata.getChunkMetadataList().get(0).getOffsetOfChunkHeader()
            + Byte.BYTES // chunkType
            + ReadWriteForEncodingUtils.varIntSize(measurementIdLength) // measurementID length
            + measurementIdLength); // measurementID
    return ChunkHeader.deserializeCompressionTypeAndEncoding(tsFileInput.wrapAsInputStream());
  }

  /** Get measurement schema by chunkMetadatas. */
  public MeasurementSchema getMeasurementSchema(List<IChunkMetadata> chunkMetadataList)
      throws IOException {
    if (chunkMetadataList.isEmpty()) {
      return null;
    }
    IChunkMetadata lastChunkMetadata = chunkMetadataList.get(chunkMetadataList.size() - 1);
    int chunkHeadSize = ChunkHeader.getSerializedSize(lastChunkMetadata.getMeasurementUid());
    ChunkHeader header = readChunkHeader(lastChunkMetadata.getOffsetOfChunkHeader(), chunkHeadSize);
    return new MeasurementSchema(
        lastChunkMetadata.getMeasurementUid(),
        header.getDataType(),
        header.getEncodingType(),
        header.getCompressionType());
  }

  /**
   * not thread safe.
   *
   * @param type given tsfile data type
   */
  public PageHeader readPageHeader(TSDataType type, boolean hasStatistic) throws IOException {
    try {
      return PageHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), type, hasStatistic);
    } catch (Throwable t) {
      logger.warn("Exception {} happened while reading page header of {}", t.getMessage(), file);
      throw t;
    }
  }

  public long position() throws IOException {
    return tsFileInput.position();
  }

  public void position(long offset) throws IOException {
    tsFileInput.position(offset);
  }

  public void skipPageData(PageHeader header) throws IOException {
    tsFileInput.position(tsFileInput.position() + header.getCompressedSize());
  }

  public ByteBuffer readCompressedPage(PageHeader header) throws IOException {
    return readData(-1, header.getCompressedSize());
  }

  public ByteBuffer readPage(PageHeader header, CompressionType type) throws IOException {
    ByteBuffer buffer = readData(-1, header.getCompressedSize());
    if (header.getUncompressedSize() == 0 || type == CompressionType.UNCOMPRESSED) {
      return buffer;
    } // FIXME if the buffer is not array-implemented.
    IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type);
    ByteBuffer uncompressedBuffer = ByteBuffer.allocate(header.getUncompressedSize());
    unCompressor.uncompress(
        buffer.array(), buffer.position(), buffer.remaining(), uncompressedBuffer.array(), 0);
    return uncompressedBuffer;
  }

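  // Illustrative sketch (not part of this class): walking the pages of a multi-page chunk
  // whose header has already been read; page headers of such chunks carry statistics, hence
  // the "true" flag. This mirrors the page loop in selfCheck below. "chunkHeader" is a
  // hypothetical ChunkHeader, with the reader positioned at the first page header.
  //
  //   int remaining = chunkHeader.getDataSize();
  //   while (remaining > 0) {
  //     PageHeader pageHeader = readPageHeader(chunkHeader.getDataType(), true);
  //     ByteBuffer pageData = readPage(pageHeader, chunkHeader.getCompressionType());
  //     remaining -= pageHeader.getSerializedPageSize();
  //   }
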
  /**
   * read one byte from the input. <br>
   * this method is not thread safe
   */
  public byte readMarker() throws IOException {
    markerBuffer.clear();
    if (ReadWriteIOUtils.readAsPossible(tsFileInput, markerBuffer) == 0) {
      throw new IOException("reach the end of the file.");
    }
    markerBuffer.flip();
    return markerBuffer.get();
  }

  @Override
  public void close() throws IOException {
    if (resourceLogger.isDebugEnabled()) {
      resourceLogger.debug("{} reader is closed.", file);
    }
    this.tsFileInput.close();
  }

  public String getFileName() {
    return this.file;
  }

  public long fileSize() throws IOException {
    return tsFileInput.size();
  }

  /**
   * read data from tsFileInput, from the current position (if position = -1), or the given
   * position. <br>
   * if position = -1, the tsFileInput's position will be changed to the current position plus the
   * real data size that has been read. Otherwise, the tsFileInput's position is not changed.
   *
   * @param position the start position of data in the tsFileInput, or the current position if
   *     position = -1
   * @param totalSize the size of the data to read
   * @return the data that has been read.
   */
  protected ByteBuffer readData(long position, int totalSize) throws IOException {
    int allocateSize = Math.min(MAX_READ_BUFFER_SIZE, totalSize);
    int allocateNum = (int) Math.ceil((double) totalSize / allocateSize);
    ByteBuffer buffer = ByteBuffer.allocate(totalSize);
    int bufferLimit = 0;
    for (int i = 0; i < allocateNum; i++) {
      if (i == allocateNum - 1) {
        allocateSize = totalSize - allocateSize * (allocateNum - 1);
      }
      bufferLimit += allocateSize;
      buffer.limit(bufferLimit);
      if (position < 0) {
        if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) != allocateSize) {
          throw new IOException("reach the end of the data");
        }
      } else {
        long actualReadSize =
            ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, allocateSize);
        if (actualReadSize != allocateSize) {
          throw new IOException(
              String.format(
                  "reach the end of the data. Size of data that want to read: %s,"
                      + " actual read size: %s, position: %s",
                  allocateSize, actualReadSize, position));
        }
        position += allocateSize;
      }
    }
    buffer.flip();
    return buffer;
  }

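  // Worked example (illustrative): readData above splits a large request into reads of at
  // most MAX_READ_BUFFER_SIZE bytes. Assuming MAX_READ_BUFFER_SIZE is 4 MiB and totalSize
  // is 10 MiB:
  //   allocateSize = min(4 MiB, 10 MiB) = 4 MiB
  //   allocateNum  = ceil(10 / 4)       = 3 reads
  //   read sizes   = 4 MiB, 4 MiB, then 10 - 4 * 2 = 2 MiB for the last iteration
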
  /**
   * read data from tsFileInput, from the current position (if start = -1), or the given
   * position.
   *
   * @param start the start position of data in the tsFileInput, or the current position if start
   *     = -1
   * @param end the end position of the data to read
   * @return the data that has been read.
   */
  protected ByteBuffer readData(long start, long end) throws IOException {
    try {
      return readData(start, (int) (end - start));
    } catch (Throwable t) {
      logger.warn("Exception {} happened while reading data of {}", t.getMessage(), file);
      throw t;
    }
  }

  /** Notice: the target ByteBuffer is not flipped. */
  public int readRaw(long position, int length, ByteBuffer target) throws IOException {
    return ReadWriteIOUtils.readAsPossible(tsFileInput, target, position, length);
  }

  /**
   * Self-check the file and return the position up to which the data is safe.
   *
   * @param newSchema the schema of each time series in the file
   * @param chunkGroupMetadataList ChunkGroupMetadata list
   * @param fastFinish if true and the file is complete, the newSchema and
   *     chunkGroupMetadataList parameters will not be modified.
   * @return the position of the file that is fine. All data after that position in the file
   *     should be truncated.
   */
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
  public long selfCheck(
      Map<Path, IMeasurementSchema> newSchema,
      List<ChunkGroupMetadata> chunkGroupMetadataList,
      boolean fastFinish)
      throws IOException {
    File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file);
    long fileSize;
    if (!checkFile.exists()) {
      return TsFileCheckStatus.FILE_NOT_FOUND;
    } else {
      fileSize = checkFile.length();
    }
    ChunkMetadata currentChunk;
    String measurementID;
    TSDataType dataType;
    long fileOffsetOfChunk;

    // ChunkMetadata of current ChunkGroup
    List<ChunkMetadata> chunkMetadataList = new ArrayList<>();

    int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES;
    if (fileSize < headerLength) {
      return TsFileCheckStatus.INCOMPATIBLE_FILE;
    }
    if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic())
        || (TSFileConfig.VERSION_NUMBER != readVersionNumber())) {
      return TsFileCheckStatus.INCOMPATIBLE_FILE;
    }

    tsFileInput.position(headerLength);
    boolean isComplete = isComplete();
    if (fileSize == headerLength) {
      return headerLength;
    } else if (isComplete) {
      loadMetadataSize();
      if (fastFinish) {
        return TsFileCheckStatus.COMPLETE_FILE;
      }
    }
    // if the file is not complete, we will recover it...
    long truncatedSize = headerLength;
    byte marker;
    List<long[]> timeBatch = new ArrayList<>();
    String lastDeviceId = null;
    List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
    try {
      while ((marker = this.readMarker()) != MetaMarker.SEPARATOR) {
        switch (marker) {
          case MetaMarker.CHUNK_HEADER:
          case MetaMarker.TIME_CHUNK_HEADER:
          case MetaMarker.VALUE_CHUNK_HEADER:
          case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
          case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
          case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
            fileOffsetOfChunk = this.position() - 1;
            // If there is something wrong with a chunk, we will drop the whole ChunkGroup,
            // as different chunks may be created by the same insertions (SQLs), and partial
            // insertion is not tolerable.
            ChunkHeader chunkHeader = this.readChunkHeader(marker);
            measurementID = chunkHeader.getMeasurementID();
            IMeasurementSchema measurementSchema =
                new MeasurementSchema(
                    measurementID,
                    chunkHeader.getDataType(),
                    chunkHeader.getEncodingType(),
                    chunkHeader.getCompressionType());
            measurementSchemaList.add(measurementSchema);
            dataType = chunkHeader.getDataType();
            if (chunkHeader.getDataType() == TSDataType.VECTOR) {
              timeBatch.clear();
            }
            Statistics<? extends Serializable> chunkStatistics =
                Statistics.getStatsByType(dataType);
            int dataSize = chunkHeader.getDataSize();

            if (dataSize > 0) {
              if (((byte) (chunkHeader.getChunkType() & 0x3F)) == MetaMarker.CHUNK_HEADER) {
                // More than one page: we can use the page statistics to generate the chunk
                // statistics.
                while (dataSize > 0) {
                  // a new Page
                  PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType(), true);
                  if (pageHeader.getUncompressedSize() != 0) {
                    // not an empty page
                    chunkStatistics.mergeStatistics(pageHeader.getStatistics());
                  }
                  this.skipPageData(pageHeader);
                  dataSize -= pageHeader.getSerializedPageSize();
                  chunkHeader.increasePageNums(1);
                }
              } else {
                // Only one page without statistics, so we need to iterate each point to
                // generate the chunk statistics.
                PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType(), false);
                Decoder valueDecoder =
                    Decoder.getDecoderByType(
                        chunkHeader.getEncodingType(), chunkHeader.getDataType());
                ByteBuffer pageData = readPage(pageHeader, chunkHeader.getCompressionType());
                Decoder timeDecoder =
                    Decoder.getDecoderByType(
                        TSEncoding.valueOf(
                            TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
                        TSDataType.INT64);

                if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
                    == TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk with only one page

                  TimePageReader timePageReader =
                      new TimePageReader(pageHeader, pageData, timeDecoder);
                  long[] currentTimeBatch = timePageReader.getNextTimeBatch();
                  timeBatch.add(currentTimeBatch);
                  for (long currentTime : currentTimeBatch) {
                    chunkStatistics.update(currentTime);
                  }
                } else if ((chunkHeader.getChunkType() & TsFileConstant.VALUE_COLUMN_MASK)
                    == TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk with only one page

                  ValuePageReader valuePageReader =
                      new ValuePageReader(
                          pageHeader, pageData, chunkHeader.getDataType(), valueDecoder);
                  TsPrimitiveType[] valueBatch = valuePageReader.nextValueBatch(timeBatch.get(0));

                  if (valueBatch != null && valueBatch.length != 0) {
                    for (int i = 0; i < valueBatch.length; i++) {
                      TsPrimitiveType value = valueBatch[i];
                      if (value == null) {
                        continue;
                      }
                      long timeStamp = timeBatch.get(0)[i];
                      switch (dataType) {
                        case INT32:
                          chunkStatistics.update(timeStamp, value.getInt());
                          break;
                        case INT64:
                          chunkStatistics.update(timeStamp, value.getLong());
                          break;
                        case FLOAT:
                          chunkStatistics.update(timeStamp, value.getFloat());
                          break;
                        case DOUBLE:
                          chunkStatistics.update(timeStamp, value.getDouble());
                          break;
                        case BOOLEAN:
                          chunkStatistics.update(timeStamp, value.getBoolean());
                          break;
                        case TEXT:
                          chunkStatistics.update(timeStamp, value.getBinary());
                          break;
                        default:
                          throw new IOException("Unexpected type " + dataType);
                      }
                    }
                  }

                } else { // NonAligned Chunk with only one page
                  PageReader reader =
                      new PageReader(
                          pageHeader,
                          pageData,
                          chunkHeader.getDataType(),
                          valueDecoder,
                          timeDecoder,
                          null);
                  BatchData batchData = reader.getAllSatisfiedPageData();
                  while (batchData.hasCurrent()) {
                    switch (dataType) {
                      case INT32:
                        chunkStatistics.update(batchData.currentTime(), batchData.getInt());
                        break;
                      case INT64:
                        chunkStatistics.update(batchData.currentTime(), batchData.getLong());
                        break;
                      case FLOAT:
                        chunkStatistics.update(batchData.currentTime(), batchData.getFloat());
                        break;
                      case DOUBLE:
                        chunkStatistics.update(batchData.currentTime(), batchData.getDouble());
                        break;
                      case BOOLEAN:
                        chunkStatistics.update(batchData.currentTime(), batchData.getBoolean());
                        break;
                      case TEXT:
                        chunkStatistics.update(batchData.currentTime(), batchData.getBinary());
                        break;
                      default:
                        throw new IOException("Unexpected type " + dataType);
                    }
                    batchData.next();
                  }
                }
                chunkHeader.increasePageNums(1);
              }
            }
            currentChunk =
                new ChunkMetadata(measurementID, dataType, fileOffsetOfChunk, chunkStatistics);
            chunkMetadataList.add(currentChunk);
            break;
          case MetaMarker.CHUNK_GROUP_HEADER:
            // If there is something wrong with the ChunkGroup header, we will drop this
            // ChunkGroup, because we cannot guarantee the correctness of its deviceId.
            truncatedSize = this.position() - 1;
            if (lastDeviceId != null) {
              // schema of the last chunk group
              if (newSchema != null) {
                for (IMeasurementSchema tsSchema : measurementSchemaList) {
                  newSchema.putIfAbsent(
                      new Path(lastDeviceId, tsSchema.getMeasurementId(), true), tsSchema);
                }
              }
              measurementSchemaList = new ArrayList<>();
              // metadata of the last chunk group
              chunkGroupMetadataList.add(new ChunkGroupMetadata(lastDeviceId, chunkMetadataList));
            }
            // this is a chunk group
            chunkMetadataList = new ArrayList<>();
            ChunkGroupHeader chunkGroupHeader = this.readChunkGroupHeader();
            lastDeviceId = chunkGroupHeader.getDeviceID();
            break;
          case MetaMarker.OPERATION_INDEX_RANGE:
            truncatedSize = this.position() - 1;
            if (lastDeviceId != null) {
              // schema of the last chunk group
              if (newSchema != null) {
                for (IMeasurementSchema tsSchema : measurementSchemaList) {
                  newSchema.putIfAbsent(
                      new Path(lastDeviceId, tsSchema.getMeasurementId(), true), tsSchema);
                }
              }
              measurementSchemaList = new ArrayList<>();
              // metadata of the last chunk group
              chunkGroupMetadataList.add(new ChunkGroupMetadata(lastDeviceId, chunkMetadataList));
              lastDeviceId = null;
            }
            readPlanIndex();
            truncatedSize = this.position();
            break;
          default:
            // the disk file is corrupted; using this file may be dangerous
            throw new IOException("Unexpected marker " + marker);
        }
      }
      // Now we have read the tail of the data section, so we are sure that the last
      // ChunkGroupFooter is complete.
      if (lastDeviceId != null) {
        // schema of the last chunk group
        if (newSchema != null) {
          for (IMeasurementSchema tsSchema : measurementSchemaList) {
            newSchema.putIfAbsent(
                new Path(lastDeviceId, tsSchema.getMeasurementId(), true), tsSchema);
          }
        }
        // metadata of the last chunk group
        chunkGroupMetadataList.add(new ChunkGroupMetadata(lastDeviceId, chunkMetadataList));
      }
      if (isComplete) {
        truncatedSize = TsFileCheckStatus.COMPLETE_FILE;
      } else {
        truncatedSize = this.position() - 1;
      }
    } catch (Exception e) {
      logger.warn(
          "TsFile {} self-check cannot proceed at position {}, because: {}",
          file,
          this.position(),
          e.getMessage());
    }
    // Regardless of the completeness of the data section, we will discard the current
    // FileMetadata so that we can continue to write data into this tsfile.
    return truncatedSize;
  }

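  // Illustrative sketch (not part of this class): how recovery code might use selfCheck to
  // truncate a crashed writer's file back to its last safe position. The names ("reader",
  // "tsFilePath") are hypothetical, and the truncation step is an assumption about the
  // caller, not something this class performs itself.
  //
  //   Map<Path, IMeasurementSchema> schema = new HashMap<>();
  //   List<ChunkGroupMetadata> chunkGroups = new ArrayList<>();
  //   long safePosition = reader.selfCheck(schema, chunkGroups, false);
  //   if (safePosition != TsFileCheckStatus.COMPLETE_FILE && safePosition > 0) {
  //     try (FileChannel channel =
  //         FileChannel.open(Paths.get(tsFilePath), StandardOpenOption.WRITE)) {
  //       channel.truncate(safePosition); // drop everything after the safe position
  //     }
  //   }
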
  /**
   * Self-check the file and return whether the file is safe.
   *
   * @param filename the path of the file
   * @param fastFinish if true, the method will only check the format of the head (Magic String
   *     TsFile, Version Number) and the tail (Magic String TsFile) of the TsFile.
   * @return the status of the TsFile
   */
  public long selfCheckWithInfo(
      String filename,
      boolean fastFinish,
      Map<Long, Pair<Path, TimeseriesMetadata>> timeseriesMetadataMap)
      throws IOException, TsFileStatisticsMistakesException {
    String message = " exists statistics mistakes at position ";
    File checkFile = FSFactoryProducer.getFSFactory().getFile(filename);
    if (!checkFile.exists()) {
      return TsFileCheckStatus.FILE_NOT_FOUND;
    }
    long fileSize = checkFile.length();
    logger.info("file length: " + fileSize);

    int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES;
    if (fileSize < headerLength) {
      return TsFileCheckStatus.INCOMPATIBLE_FILE;
    }
    try {
      if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic())
          || (TSFileConfig.VERSION_NUMBER != readVersionNumber())) {
        return TsFileCheckStatus.INCOMPATIBLE_FILE;
      }
      tsFileInput.position(headerLength);
      if (isComplete()) {
        loadMetadataSize();
        if (fastFinish) {
          return TsFileCheckStatus.COMPLETE_FILE;
        }
      }
    } catch (IOException e) {
      logger.error("Error occurred while fast checking TsFile.");
      throw e;
    }
    for (Map.Entry<Long, Pair<Path, TimeseriesMetadata>> entry : timeseriesMetadataMap.entrySet()) {
      TimeseriesMetadata timeseriesMetadata = entry.getValue().right;
      TSDataType dataType = timeseriesMetadata.getTsDataType();
      Statistics<? extends Serializable> timeseriesMetadataSta = timeseriesMetadata.getStatistics();
      Statistics<? extends Serializable> chunkMetadatasSta = Statistics.getStatsByType(dataType);
      for (IChunkMetadata chunkMetadata : getChunkMetadataList(entry.getValue().left)) {
        long tscheckStatus = TsFileCheckStatus.COMPLETE_FILE;
        try {
          tscheckStatus = checkChunkAndPagesStatistics(chunkMetadata);
        } catch (IOException e) {
          logger.error("Error occurred while checking the statistics of a chunk and its pages");
          throw e;
        }
        if (tscheckStatus == TsFileCheckStatus.FILE_EXISTS_MISTAKES) {
          throw new TsFileStatisticsMistakesException(
              "Chunk" + message + chunkMetadata.getOffsetOfChunkHeader());
        }
        chunkMetadatasSta.mergeStatistics(chunkMetadata.getStatistics());
      }
      if (!timeseriesMetadataSta.equals(chunkMetadatasSta)) {
        long timeseriesMetadataPos = entry.getKey();
        throw new TsFileStatisticsMistakesException(
            "TimeseriesMetadata" + message + timeseriesMetadataPos);
      }
    }
    return TsFileCheckStatus.COMPLETE_FILE;
  }

  public long checkChunkAndPagesStatistics(IChunkMetadata chunkMetadata) throws IOException {
    long offsetOfChunkHeader = chunkMetadata.getOffsetOfChunkHeader();
    tsFileInput.position(offsetOfChunkHeader);
    byte marker = this.readMarker();
    ChunkHeader chunkHeader = this.readChunkHeader(marker);
    TSDataType dataType = chunkHeader.getDataType();
    Statistics<? extends Serializable> chunkStatistics = Statistics.getStatsByType(dataType);
    int dataSize = chunkHeader.getDataSize();
    if (((byte) (chunkHeader.getChunkType() & 0x3F)) == MetaMarker.CHUNK_HEADER) {
      while (dataSize > 0) {
        // a new Page
        PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType(), true);
        chunkStatistics.mergeStatistics(pageHeader.getStatistics());
        this.skipPageData(pageHeader);
        dataSize -= pageHeader.getSerializedPageSize();
        chunkHeader.increasePageNums(1);
      }
    } else {
      // Only one page without statistics, so we need to iterate each point to generate the
      // statistics.
      PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType(), false);
      Decoder valueDecoder =
          Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
      ByteBuffer pageData = readPage(pageHeader, chunkHeader.getCompressionType());
      Decoder timeDecoder =
          Decoder.getDecoderByType(
              TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
              TSDataType.INT64);
      PageReader reader =
          new PageReader(
              pageHeader, pageData, chunkHeader.getDataType(), valueDecoder, timeDecoder, null);
      BatchData batchData = reader.getAllSatisfiedPageData();
      while (batchData.hasCurrent()) {
        switch (dataType) {
          case INT32:
            chunkStatistics.update(batchData.currentTime(), batchData.getInt());
            break;
          case INT64:
            chunkStatistics.update(batchData.currentTime(), batchData.getLong());
            break;
          case FLOAT:
            chunkStatistics.update(batchData.currentTime(), batchData.getFloat());
            break;
          case DOUBLE:
            chunkStatistics.update(batchData.currentTime(), batchData.getDouble());
            break;
          case BOOLEAN:
            chunkStatistics.update(batchData.currentTime(), batchData.getBoolean());
            break;
          case TEXT:
            chunkStatistics.update(batchData.currentTime(), batchData.getBinary());
            break;
          default:
            throw new IOException("Unexpected type " + dataType);
        }
        batchData.next();
      }
      chunkHeader.increasePageNums(1);
    }
    if (chunkMetadata.getStatistics().equals(chunkStatistics)) {
      return TsFileCheckStatus.COMPLETE_FILE;
    }
    return TsFileCheckStatus.FILE_EXISTS_MISTAKES;
  }

  /**
   * Get ChunkMetadatas of the given path, and throw an exception if the path does not exist (and
   * ignoreNotExists is false).
   *
   * @param path timeseries path
   * @return list of ChunkMetadata
   */
  public List<ChunkMetadata> getChunkMetadataList(Path path, boolean ignoreNotExists)
      throws IOException {
    TimeseriesMetadata timeseriesMetaData =
        readTimeseriesMetadata(path.getDevice(), path.getMeasurement(), ignoreNotExists);
    if (timeseriesMetaData == null) {
      return Collections.emptyList();
    }
    List<ChunkMetadata> chunkMetadataList = readChunkMetaDataList(timeseriesMetaData);
    chunkMetadataList.sort(Comparator.comparingLong(IChunkMetadata::getStartTime));
    return chunkMetadataList;
  }

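  // Illustrative sketch (not part of this class): chunk metadata alone is enough to prune
  // by time range before any chunk data is read. "reader", "path", "queryStart" and
  // "queryEnd" are hypothetical.
  //
  //   for (ChunkMetadata m : reader.getChunkMetadataList(path)) {
  //     if (m.getEndTime() < queryStart || m.getStartTime() > queryEnd) {
  //       continue; // skip chunks whose [startTime, endTime] misses the query range
  //     }
  //     Chunk chunk = reader.readMemChunk(m);
  //   }
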
  // This method is only used for TsFile
  public List<IChunkMetadata> getIChunkMetadataList(Path path) throws IOException {
    ITimeSeriesMetadata timeseriesMetaData = readITimeseriesMetadata(path, true);
    if (timeseriesMetaData == null) {
      return Collections.emptyList();
    }
    List<IChunkMetadata> chunkMetadataList = readIChunkMetaDataList(timeseriesMetaData);
    chunkMetadataList.sort(Comparator.comparingLong(IChunkMetadata::getStartTime));
    return chunkMetadataList;
  }

  public List<ChunkMetadata> getChunkMetadataList(Path path) throws IOException {
    return getChunkMetadataList(path, false);
  }

  /**
   * Get AlignedChunkMetadata of sensors under one device.
   *
   * @param device device name
   */
  public List<AlignedChunkMetadata> getAlignedChunkMetadata(String device) throws IOException {
    readFileMetadata();
    MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex();
    Pair<MetadataIndexEntry, Long> metadataIndexPair =
        getMetadataAndEndOffset(deviceMetadataIndexNode, device, true, true);
    if (metadataIndexPair == null) {
      throw new IOException("Device {" + device + "} is not in tsFileMetaData");
    }
    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
    MetadataIndexNode metadataIndexNode;
    TimeseriesMetadata firstTimeseriesMetadata;
    try {
      // next-layer MeasurementNode of the specific DeviceNode
      metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
    } catch (Exception e) {
      logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
      throw e;
    }
    firstTimeseriesMetadata = tryToGetFirstTimeseriesMetadata(metadataIndexNode);
    if (firstTimeseriesMetadata == null) {
      throw new IOException("Timeseries of device {" + device + "} are not aligned");
    }

    Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new TreeMap<>();
    List<MetadataIndexEntry> metadataIndexEntryList = metadataIndexNode.getChildren();

    for (int i = 0; i < metadataIndexEntryList.size(); i++) {
      MetadataIndexEntry metadataIndexEntry = metadataIndexEntryList.get(i);
      long endOffset = metadataIndexNode.getEndOffset();
      if (i != metadataIndexEntryList.size() - 1) {
        endOffset = metadataIndexEntryList.get(i + 1).getOffset();
      }
      buffer = readData(metadataIndexEntry.getOffset(), endOffset);
      if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
        List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
        while (buffer.hasRemaining()) {
          timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true));
        }
        timeseriesMetadataMap
            .computeIfAbsent(device, k -> new ArrayList<>())
            .addAll(timeseriesMetadataList);
      } else {
        generateMetadataIndex(
            metadataIndexEntry,
            buffer,
            device,
            metadataIndexNode.getNodeType(),
            timeseriesMetadataMap,
            true);
      }
    }

    if (timeseriesMetadataMap.values().size() != 1) {
      throw new IOException(
          String.format(
              "Error when reading timeseriesMetadata of device %s in file %s: there should be only one timeseriesMetadataList in one device, actual: %d",
              device, file, timeseriesMetadataMap.values().size()));
    }

    List<TimeseriesMetadata> timeseriesMetadataList =
        timeseriesMetadataMap.values().iterator().next();
    TimeseriesMetadata timeseriesMetadata = timeseriesMetadataList.get(0);
    List<TimeseriesMetadata> valueTimeseriesMetadataList = new ArrayList<>();

    for (int i = 1; i < timeseriesMetadataList.size(); i++) {
      valueTimeseriesMetadataList.add(timeseriesMetadataList.get(i));
    }

    AlignedTimeSeriesMetadata alignedTimeSeriesMetadata =
        new AlignedTimeSeriesMetadata(timeseriesMetadata, valueTimeseriesMetadataList);
    List<AlignedChunkMetadata> chunkMetadataList = new ArrayList<>();
    for (IChunkMetadata chunkMetadata : readIChunkMetaDataList(alignedTimeSeriesMetadata)) {
      chunkMetadataList.add((AlignedChunkMetadata) chunkMetadata);
    }
    return chunkMetadataList;
  }

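  // Illustrative sketch (not part of this class): for an aligned device, each returned
  // AlignedChunkMetadata pairs one time-chunk metadata with the metadata of its value
  // chunks. "reader" is a hypothetical TsFileSequenceReader; the device path is an example.
  //
  //   for (AlignedChunkMetadata aligned : reader.getAlignedChunkMetadata("root.sg.d1")) {
  //     IChunkMetadata timeChunk = aligned.getTimeChunkMetadata();
  //     List<IChunkMetadata> valueChunks = aligned.getValueChunkMetadataList();
  //   }
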
  /**
   * get ChunkMetadatas in the given TimeseriesMetadata
   *
   * @return List of ChunkMetaData
   */
  public List<ChunkMetadata> readChunkMetaDataList(TimeseriesMetadata timeseriesMetaData)
      throws IOException {
    return timeseriesMetaData.getChunkMetadataList().stream()
        .map(chunkMetadata -> (ChunkMetadata) chunkMetadata)
        .collect(Collectors.toList());
  }

  // This method is only used for TsFile
  public List<IChunkMetadata> readIChunkMetaDataList(ITimeSeriesMetadata timeseriesMetaData) {
    if (timeseriesMetaData instanceof AlignedTimeSeriesMetadata) {
      return new ArrayList<>(
          ((AlignedTimeSeriesMetadata) timeseriesMetaData).getChunkMetadataList());
    } else {
      return new ArrayList<>(((TimeseriesMetadata) timeseriesMetaData).getChunkMetadataList());
    }
  }

  /**
   * get all measurements in this file
   *
   * @return measurement -> datatype
   */
  public Map<String, TSDataType> getAllMeasurements() throws IOException {
    Map<String, TSDataType> result = new HashMap<>();
    for (String device : getAllDevices()) {
      Map<String, TimeseriesMetadata> timeseriesMetadataMap = readDeviceMetadata(device);
      for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataMap.values()) {
        result.put(timeseriesMetadata.getMeasurementId(), timeseriesMetadata.getTsDataType());
      }
    }
    return result;
  }

  /**
   * get all types of measurements in this file
   *
   * @return full path -> datatype
   */
  public Map<String, TSDataType> getFullPathDataTypeMap() throws IOException {
    final Map<String, TSDataType> result = new HashMap<>();
    for (final String device : getAllDevices()) {
      Map<String, TimeseriesMetadata> timeseriesMetadataMap = readDeviceMetadata(device);
      for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataMap.values()) {
        result.put(
            device + TsFileConstant.PATH_SEPARATOR + timeseriesMetadata.getMeasurementId(),
            timeseriesMetadata.getTsDataType());
      }
    }
    return result;
  }

  public Map<String, List<String>> getDeviceMeasurementsMap() throws IOException {
    Map<String, List<String>> result = new HashMap<>();
    for (String device : getAllDevices()) {
      Map<String, TimeseriesMetadata> timeseriesMetadataMap = readDeviceMetadata(device);
      for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataMap.values()) {
        result
            .computeIfAbsent(device, d -> new ArrayList<>())
            .add(timeseriesMetadata.getMeasurementId());
      }
    }
    return result;
  }

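  // Illustrative sketch (not part of this class): dumping every timeseries in a file.
  // "reader" is a hypothetical TsFileSequenceReader.
  //
  //   for (Map.Entry<String, List<String>> e : reader.getDeviceMeasurementsMap().entrySet()) {
  //     for (String measurement : e.getValue()) {
  //       System.out.println(e.getKey() + TsFileConstant.PATH_SEPARATOR + measurement);
  //     }
  //   }
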
  /**
   * get device names which have valid chunks in [start, end)
   *
   * @param start start of the partition
   * @param end end of the partition
   * @return device names in range
   */
  public List<String> getDeviceNameInRange(long start, long end) throws IOException {
    List<String> res = new ArrayList<>();
    for (String device : getAllDevices()) {
      Map<String, List<ChunkMetadata>> seriesMetadataMap = readChunkMetadataInDevice(device);
      if (hasDataInPartition(seriesMetadataMap, start, end)) {
        res.add(device);
      }
    }
    return res;
  }

  /**
   * get metadata index node
   *
   * @param startOffset start read offset
   * @param endOffset end read offset
   * @return MetadataIndexNode
   */
  public MetadataIndexNode getMetadataIndexNode(long startOffset, long endOffset)
      throws IOException {
    return MetadataIndexNode.deserializeFrom(readData(startOffset, endOffset));
  }

  /**
   * Check if the device has at least one Chunk in this partition
   *
   * @param seriesMetadataMap chunkMetaDataList of each measurement
   * @param start the start position of the space partition
   * @param end the end position of the space partition
   */
  private boolean hasDataInPartition(
      Map<String, List<ChunkMetadata>> seriesMetadataMap, long start, long end) {
    for (List<ChunkMetadata> chunkMetadataList : seriesMetadataMap.values()) {
      for (ChunkMetadata chunkMetadata : chunkMetadataList) {
        LocateStatus location =
            MetadataQuerierByFileImpl.checkLocateStatus(chunkMetadata, start, end);
        if (location == LocateStatus.IN) {
          return true;
        }
      }
    }
    return false;
  }

  /**
   * The location of a chunkGroupMetaData with respect to a space partition constraint.
   *
   * <p>IN - the middle point of the chunkGroupMetaData is located in the current space partition.
   * <br>
   * BEFORE - the middle point of the chunkGroupMetaData is located before the current space
   * partition. <br>
   * AFTER - the middle point of the chunkGroupMetaData is located after the current space
   * partition.
   */
  public enum LocateStatus {
    IN,
    BEFORE,
    AFTER
  }

  public long getMinPlanIndex() {
    return minPlanIndex;
  }

  public long getMaxPlanIndex() {
    return maxPlanIndex;
  }

  /**
   * @return An iterator of linked hashmaps ( measurement -> chunk metadata list ). When traversing
   *     the linked hashmap, you will get chunk metadata lists according to the lexicographic order
   *     of the measurements. The first measurement of the linked hashmap of each iteration is
   *     always larger than the last measurement of the linked hashmap of the previous iteration in
   *     lexicographic order.
   */
  public Iterator<Map<String, List<ChunkMetadata>>> getMeasurementChunkMetadataListMapIterator(
      String device) throws IOException {
    readFileMetadata();

    MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
    Pair<MetadataIndexEntry, Long> metadataIndexPair =
        getMetadataAndEndOffset(metadataIndexNode, device, true, true);

    if (metadataIndexPair == null) {
      return new Iterator<Map<String, List<ChunkMetadata>>>() {

        @Override
        public boolean hasNext() {
          return false;
        }

        @Override
        public LinkedHashMap<String, List<ChunkMetadata>> next() {
          throw new NoSuchElementException();
        }
      };
    }

    Queue<Pair<Long, Long>> queue = new LinkedList<>();
    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
    collectEachLeafMeasurementNodeOffsetRange(buffer, queue);

    return new Iterator<Map<String, List<ChunkMetadata>>>() {

      @Override
      public boolean hasNext() {
        return !queue.isEmpty();
      }

      @Override
      public LinkedHashMap<String, List<ChunkMetadata>> next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        Pair<Long, Long> startEndPair = queue.remove();
        LinkedHashMap<String, List<ChunkMetadata>> measurementChunkMetadataList =
            new LinkedHashMap<>();
        try {
          List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
          ByteBuffer nextBuffer = readData(startEndPair.left, startEndPair.right);
          while (nextBuffer.hasRemaining()) {
            timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(nextBuffer, true));
          }
          for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
            List<ChunkMetadata> list =
                measurementChunkMetadataList.computeIfAbsent(
                    timeseriesMetadata.getMeasurementId(), m -> new ArrayList<>());
            for (IChunkMetadata chunkMetadata : timeseriesMetadata.getChunkMetadataList()) {
              list.add((ChunkMetadata) chunkMetadata);
            }
          }
          return measurementChunkMetadataList;
        } catch (IOException e) {
          throw new TsFileRuntimeException(
              "Error occurred while reading a time series metadata block.");
        }
      }
    };
  }

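  // Illustrative sketch (not part of this class): streaming the chunk metadata of a device
  // leaf node by leaf node, without loading the whole device index at once. "reader" is a
  // hypothetical TsFileSequenceReader; the device path is an example.
  //
  //   Iterator<Map<String, List<ChunkMetadata>>> it =
  //       reader.getMeasurementChunkMetadataListMapIterator("root.sg.d1");
  //   while (it.hasNext()) {
  //     for (Map.Entry<String, List<ChunkMetadata>> e : it.next().entrySet()) {
  //       // measurements arrive in lexicographic order across iterations
  //     }
  //   }
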
  private void collectEachLeafMeasurementNodeOffsetRange(
      ByteBuffer buffer, Queue<Pair<Long, Long>> queue) throws IOException {
    try {
      final MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
      final MetadataIndexNodeType metadataIndexNodeType = metadataIndexNode.getNodeType();
      final int metadataIndexListSize = metadataIndexNode.getChildren().size();
      for (int i = 0; i < metadataIndexListSize; ++i) {
        long startOffset = metadataIndexNode.getChildren().get(i).getOffset();
        long endOffset = metadataIndexNode.getEndOffset();
        if (i != metadataIndexListSize - 1) {
          endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset();
        }
        if (metadataIndexNodeType.equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
          queue.add(new Pair<>(startOffset, endOffset));
          continue;
        }
        collectEachLeafMeasurementNodeOffsetRange(readData(startOffset, endOffset), queue);
      }
    } catch (Exception e) {
      logger.error(
          "Error occurred while collecting offset ranges of measurement nodes of file {}", file);
      throw e;
    }
  }

  /**
   * Read MetadataIndexNode by start and end offset.
   *
   * @param start the start offset of the MetadataIndexNode
   * @param end the end offset of the MetadataIndexNode
   * @return MetadataIndexNode
   * @throws IOException IOException
   */
  public MetadataIndexNode readMetadataIndexNode(long start, long end) throws IOException {
    return MetadataIndexNode.deserializeFrom(readData(start, end));
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    TsFileSequenceReader reader = (TsFileSequenceReader) o;
    return file.equals(reader.file);
  }

  @Override
  public int hashCode() {
    return Objects.hash(file);
  }
}