
apache / iotdb — build #9708 (pending completion)

push · travis_ci · web-flow

[To rel/1.2] Enhance warning messages for ConfigNodeClient (#10722)

Signed-off-by: OneSizeFitQuorum <tanxinyu@apache.org>

1 of 1 new or added line in 1 file covered. (100.0%)

79235 of 165170 relevant lines covered (47.97%)

0.48 hits per line

Source File

78.18% — /iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 * <p>
10
 * http://www.apache.org/licenses/LICENSE-2.0
11
 * <p>
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.tsfile.read;
21

22
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
23
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
24
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
25
import org.apache.iotdb.tsfile.compress.IUnCompressor;
26
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
27
import org.apache.iotdb.tsfile.exception.TsFileRuntimeException;
28
import org.apache.iotdb.tsfile.exception.TsFileStatisticsMistakesException;
29
import org.apache.iotdb.tsfile.file.MetaMarker;
30
import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
31
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
32
import org.apache.iotdb.tsfile.file.header.PageHeader;
33
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
34
import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
35
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
36
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
37
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
38
import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
39
import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
40
import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
41
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
42
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
43
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
44
import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
45
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
46
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
47
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
48
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
49
import org.apache.iotdb.tsfile.read.common.BatchData;
50
import org.apache.iotdb.tsfile.read.common.Chunk;
51
import org.apache.iotdb.tsfile.read.common.Path;
52
import org.apache.iotdb.tsfile.read.controller.CachedChunkLoaderImpl;
53
import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
54
import org.apache.iotdb.tsfile.read.reader.TsFileInput;
55
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
56
import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
57
import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
58
import org.apache.iotdb.tsfile.utils.BloomFilter;
59
import org.apache.iotdb.tsfile.utils.Pair;
60
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
61
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
62
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
63
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
64
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
65

66
import org.slf4j.Logger;
67
import org.slf4j.LoggerFactory;
68

69
import java.io.File;
70
import java.io.IOException;
71
import java.io.Serializable;
72
import java.nio.ByteBuffer;
73
import java.util.ArrayList;
74
import java.util.Collections;
75
import java.util.Comparator;
76
import java.util.HashMap;
77
import java.util.HashSet;
78
import java.util.Iterator;
79
import java.util.LinkedHashMap;
80
import java.util.LinkedList;
81
import java.util.List;
82
import java.util.Map;
83
import java.util.NoSuchElementException;
84
import java.util.Objects;
85
import java.util.Queue;
86
import java.util.Set;
87
import java.util.TreeMap;
88
import java.util.concurrent.ConcurrentHashMap;
89
import java.util.concurrent.locks.ReadWriteLock;
90
import java.util.concurrent.locks.ReentrantReadWriteLock;
91
import java.util.stream.Collectors;
92

93
public class TsFileSequenceReader implements AutoCloseable {
94

95
  private static final Logger logger = LoggerFactory.getLogger(TsFileSequenceReader.class);
1✔
96
  private static final Logger resourceLogger = LoggerFactory.getLogger("FileMonitor");
1✔
97
  protected static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
1✔
98
  private static final String METADATA_INDEX_NODE_DESERIALIZE_ERROR =
99
      "Something error happened while deserializing MetadataIndexNode of file {}";
100
  private static final int MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024;
101
  protected String file;
102
  protected TsFileInput tsFileInput;
103
  protected long fileMetadataPos;
104
  protected int fileMetadataSize;
105
  private ByteBuffer markerBuffer = ByteBuffer.allocate(Byte.BYTES);
1✔
106

107
  @SuppressWarnings("squid:S3077")
108
  protected volatile TsFileMetadata tsFileMetaData;
109

110
  // device -> measurement -> TimeseriesMetadata
111
  private Map<String, Map<String, TimeseriesMetadata>> cachedDeviceMetadata =
1✔
112
      new ConcurrentHashMap<>();
113
  private static final ReadWriteLock cacheLock = new ReentrantReadWriteLock();
1✔
114
  private boolean cacheDeviceMetadata;
115
  private long minPlanIndex = Long.MAX_VALUE;
1✔
116
  private long maxPlanIndex = Long.MIN_VALUE;
1✔
117

118
  /**
119
   * Create a file reader of the given file. The reader will read the tail of the file to get the
120
   * file metadata size. Then the reader will skip the first
121
   * TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.NUMBER_VERSION.getBytes().length
122
   * bytes of the file in preparation for reading the real data.
123
   *
124
   * @param file the data file
125
   * @throws IOException If some I/O error occurs
126
   */
127
  public TsFileSequenceReader(String file) throws IOException {
128
    this(file, true);
1✔
129
  }
1✔
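
A minimal usage sketch ("demo.tsfile" is a hypothetical path): the reader is AutoCloseable, so try-with-resources releases the underlying TsFileInput.

try (TsFileSequenceReader reader = new TsFileSequenceReader("demo.tsfile")) {
  // the magic string and version number live at the head of the file
  System.out.println("head magic: " + reader.readHeadMagic()); // "TsFile" for a valid file
  System.out.println("version: " + reader.readVersionNumber());
}
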
130

131
  /**
132
   * Constructor of TsFileSequenceReader.
133
   *
134
   * @param file the given file name
135
   * @param loadMetadataSize whether to load the metadata size
136
   */
137
  public TsFileSequenceReader(String file, boolean loadMetadataSize) throws IOException {
1✔
138
    if (resourceLogger.isDebugEnabled()) {
1✔
139
      resourceLogger.debug("{} reader is opened. {}", file, getClass().getName());
×
140
    }
141
    this.file = file;
1✔
142
    tsFileInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file);
1✔
143
    try {
144
      if (loadMetadataSize) {
1✔
145
        loadMetadataSize();
1✔
146
      }
147
    } catch (Throwable e) {
×
148
      tsFileInput.close();
×
149
      throw e;
×
150
    }
1✔
151
  }
1✔
152

153
  // used in merge resource
154
  public TsFileSequenceReader(String file, boolean loadMetadata, boolean cacheDeviceMetadata)
155
      throws IOException {
156
    this(file, loadMetadata);
1✔
157
    this.cacheDeviceMetadata = cacheDeviceMetadata;
1✔
158
  }
1✔
159

160
  /**
161
   * Create a file reader of the given file. The reader will read the tail of the file to get the
162
   * file metadata size.Then the reader will skip the first
163
   * TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.NUMBER_VERSION.getBytes().length
164
   * bytes of the file for preparing reading real data.
165
   *
166
   * @param input the given input
167
   */
168
  public TsFileSequenceReader(TsFileInput input) throws IOException {
169
    this(input, true);
×
170
  }
×
171

172
  /**
173
   * Constructor of TsFileSequenceReader.
174
   *
175
   * @param input the given input
176
   * @param loadMetadataSize whether to load the metadata size
177
   */
178
  public TsFileSequenceReader(TsFileInput input, boolean loadMetadataSize) throws IOException {
×
179
    this.tsFileInput = input;
×
180
    this.file = input.getFilePath();
×
181
    try {
182
      if (loadMetadataSize) { // NOTE no autoRepair here
×
183
        loadMetadataSize();
×
184
      }
185
    } catch (Throwable e) {
×
186
      tsFileInput.close();
×
187
      throw e;
×
188
    }
×
189
  }
×
190

191
  /**
192
   * construct function for TsFileSequenceReader.
193
   *
194
   * @param input the input of a tsfile. The current position should be a marker and then a chunk
195
   *     header, rather than the magic number
196
   * @param fileMetadataPos the position of the file metadata in the TsFileInput from the beginning
197
   *     of the input to the current position
198
   * @param fileMetadataSize the byte size of the file metadata in the input
199
   */
200
  public TsFileSequenceReader(TsFileInput input, long fileMetadataPos, int fileMetadataSize) {
×
201
    this.tsFileInput = input;
×
202
    this.fileMetadataPos = fileMetadataPos;
×
203
    this.fileMetadataSize = fileMetadataSize;
×
204
  }
×
205

206
  public void loadMetadataSize() throws IOException {
207
    ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES);
1✔
208
    if (readTailMagic().equals(TSFileConfig.MAGIC_STRING)) {
1✔
209
      tsFileInput.read(
1✔
210
          metadataSize,
211
          tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES);
1✔
212
      metadataSize.flip();
1✔
213
      // read file metadata size and position
214
      fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize);
1✔
215
      fileMetadataPos =
1✔
216
          tsFileInput.size()
1✔
217
              - TSFileConfig.MAGIC_STRING.getBytes().length
1✔
218
              - Integer.BYTES
219
              - fileMetadataSize;
220
    }
221
  }
1✔
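
The tail layout that loadMetadataSize() assumes, worked through as standalone arithmetic (the concrete sizes here are hypothetical):

// sealed TsFile tail: ... | TsFileMetadata | 4-byte metadata size | magic string
long fileSize = 1_024L;                     // hypothetical total file size
int magicLen = "TsFile".getBytes().length;  // TSFileConfig.MAGIC_STRING, 6 bytes
int fileMetadataSize = 100;                 // hypothetical value of the 4-byte size field
long fileMetadataPos = fileSize - magicLen - Integer.BYTES - fileMetadataSize; // 914
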
222

223
  public long getFileMetadataPos() {
224
    return fileMetadataPos;
1✔
225
  }
226

227
  public int getTsFileMetadataSize() {
228
    return fileMetadataSize;
×
229
  }
230

231
  /**
232
   * Return the whole metadata size of this tsfile, including ChunkMetadata, TimeseriesMetadata,
233
   * etc.
234
   */
235
  public long getFileMetadataSize() throws IOException {
236
    return tsFileInput.size() - getFileMetadataPos();
1✔
237
  }
238

239
  /** this function does not modify the position of the file reader. */
240
  public String readTailMagic() throws IOException {
241
    long totalSize = tsFileInput.size();
1✔
242
    ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.getBytes().length);
1✔
243
    tsFileInput.read(magicStringBytes, totalSize - TSFileConfig.MAGIC_STRING.getBytes().length);
1✔
244
    magicStringBytes.flip();
1✔
245
    return new String(magicStringBytes.array());
1✔
246
  }
247

248
  /** Whether the file is a complete TsFile: true only if both the head and tail magic strings exist. */
249
  public boolean isComplete() throws IOException {
250
    long size = tsFileInput.size();
1✔
251
    // TSFileConfig.MAGIC_STRING.getBytes().length * 2 for two magic string
252
    // Byte.BYTES for the file version number
253
    if (size >= TSFileConfig.MAGIC_STRING.getBytes().length * 2 + Byte.BYTES) {
1✔
254
      String tailMagic = readTailMagic();
1✔
255
      String headMagic = readHeadMagic();
1✔
256
      return tailMagic.equals(headMagic);
1✔
257
    } else {
258
      return false;
1✔
259
    }
260
  }
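
A sketch of the intended guard: reject truncated or still-being-written files before loading any metadata (path and message are hypothetical).

try (TsFileSequenceReader reader = new TsFileSequenceReader("demo.tsfile", false)) {
  if (!reader.isComplete()) {
    throw new IOException("demo.tsfile is not a sealed TsFile");
  }
  reader.loadMetadataSize(); // safe to load the metadata now
}
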
261

262
  /** this function does not modify the position of the file reader. */
263
  public String readHeadMagic() throws IOException {
264
    ByteBuffer magicStringBytes = ByteBuffer.allocate(TSFileConfig.MAGIC_STRING.getBytes().length);
1✔
265
    tsFileInput.read(magicStringBytes, 0);
1✔
266
    magicStringBytes.flip();
1✔
267
    return new String(magicStringBytes.array());
1✔
268
  }
269

270
  /** this function reads version number and checks compatibility of TsFile. */
271
  public byte readVersionNumber() throws IOException {
272
    ByteBuffer versionNumberByte = ByteBuffer.allocate(Byte.BYTES);
1✔
273
    tsFileInput.read(versionNumberByte, TSFileConfig.MAGIC_STRING.getBytes().length);
1✔
274
    versionNumberByte.flip();
1✔
275
    return versionNumberByte.get();
1✔
276
  }
277

278
  /**
279
   * this function does not modify the position of the file reader.
280
   *
281
   * @throws IOException io error
282
   */
283
  public TsFileMetadata readFileMetadata() throws IOException {
284
    try {
285
      if (tsFileMetaData == null) {
1✔
286
        synchronized (this) {
1✔
287
          if (tsFileMetaData == null) {
1✔
288
            tsFileMetaData =
1✔
289
                TsFileMetadata.deserializeFrom(readData(fileMetadataPos, fileMetadataSize));
1✔
290
          }
291
        }
1✔
292
      }
293
    } catch (Exception e) {
1✔
294
      logger.error("Something error happened while reading file metadata of file {}", file);
1✔
295
      throw e;
1✔
296
    }
1✔
297
    return tsFileMetaData;
1✔
298
  }
299

300
  /**
301
   * this function does not modify the position of the file reader.
302
   *
303
   * @throws IOException io error
304
   */
305
  public BloomFilter readBloomFilter() throws IOException {
306
    readFileMetadata();
1✔
307
    return tsFileMetaData.getBloomFilter();
1✔
308
  }
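
A sketch of the usual bloom-filter fast path, assuming an open reader and a hypothetical series path; a null filter means the check has to be skipped.

BloomFilter filter = reader.readBloomFilter();
String fullPath = "root.sg1.d1.s1"; // hypothetical
if (filter == null || filter.contains(fullPath)) {
  // possibly present: continue to the metadata index
} else {
  // definitely absent: skip this file entirely
}
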
309

310
  /**
311
   * This function reads measurements and TimeseriesMetadatas of the given device. Thread safe.
312
   *
313
   * @param device the device name
314
   * @return the map measurementId -> TimeseriesMetaData in one device
315
   * @throws IOException io error
316
   */
317
  public Map<String, TimeseriesMetadata> readDeviceMetadata(String device) throws IOException {
318
    if (!cacheDeviceMetadata) {
1✔
319
      return readDeviceMetadataFromDisk(device);
1✔
320
    }
321

322
    cacheLock.readLock().lock();
×
323
    try {
324
      if (cachedDeviceMetadata.containsKey(device)) {
×
325
        return cachedDeviceMetadata.get(device);
×
326
      }
327
    } finally {
328
      cacheLock.readLock().unlock();
×
329
    }
330

331
    cacheLock.writeLock().lock();
×
332
    try {
333
      if (cachedDeviceMetadata.containsKey(device)) {
×
334
        return cachedDeviceMetadata.get(device);
×
335
      }
336
      readFileMetadata();
×
337
      Map<String, TimeseriesMetadata> deviceMetadata = readDeviceMetadataFromDisk(device);
×
338
      cachedDeviceMetadata.put(device, deviceMetadata);
×
339
      return deviceMetadata;
×
340
    } finally {
341
      cacheLock.writeLock().unlock();
×
342
    }
343
  }
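
Usage sketch, assuming an open reader and a hypothetical device "root.sg1.d1":

Map<String, TimeseriesMetadata> deviceMetadata = reader.readDeviceMetadata("root.sg1.d1");
for (Map.Entry<String, TimeseriesMetadata> entry : deviceMetadata.entrySet()) {
  System.out.println(entry.getKey() + " -> " + entry.getValue().getStatistics());
}
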
344

345
  private Map<String, TimeseriesMetadata> readDeviceMetadataFromDisk(String device)
346
      throws IOException {
347
    readFileMetadata();
1✔
348
    List<TimeseriesMetadata> timeseriesMetadataList =
1✔
349
        getDeviceTimeseriesMetadataWithoutChunkMetadata(device);
1✔
350
    Map<String, TimeseriesMetadata> deviceMetadata = new HashMap<>();
1✔
351
    for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
1✔
352
      deviceMetadata.put(timeseriesMetadata.getMeasurementId(), timeseriesMetadata);
1✔
353
    }
1✔
354
    return deviceMetadata;
1✔
355
  }
356

357
  /** @deprecated Use {@link #readTimeseriesMetadata(String, String, boolean)} instead. */
358
  @Deprecated
359
  @SuppressWarnings("java:S1133") // suppress warn of deprecation
360
  public TimeseriesMetadata readTimeseriesMetadata(Path path, boolean ignoreNotExists)
361
      throws IOException {
362
    return readTimeseriesMetadata(path.getDevice(), path.getMeasurement(), ignoreNotExists);
×
363
  }
364

365
  public TimeseriesMetadata readTimeseriesMetadata(
366
      String device, String measurement, boolean ignoreNotExists) throws IOException {
367
    readFileMetadata();
1✔
368
    MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex();
1✔
369
    Pair<MetadataIndexEntry, Long> metadataIndexPair =
1✔
370
        getMetadataAndEndOffset(deviceMetadataIndexNode, device, true, true);
1✔
371
    if (metadataIndexPair == null) {
1✔
372
      if (ignoreNotExists) {
×
373
        return null;
×
374
      }
375
      throw new IOException("Device {" + device + "} is not in tsFileMetaData");
×
376
    }
377
    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
1✔
378
    MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
1✔
379
    if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
1✔
380
      try {
381
        metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
1✔
382
      } catch (Exception e) {
×
383
        logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
×
384
        throw e;
×
385
      }
1✔
386
      metadataIndexPair = getMetadataAndEndOffset(metadataIndexNode, measurement, false, false);
1✔
387
    }
388
    if (metadataIndexPair == null) {
1✔
389
      return null;
×
390
    }
391
    List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
1✔
392
    buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
1✔
393
    while (buffer.hasRemaining()) {
1✔
394
      try {
395
        timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true));
1✔
396
      } catch (Exception e) {
×
397
        logger.error(
×
398
            "Something error happened while deserializing TimeseriesMetadata of file {}", file);
399
        throw e;
×
400
      }
1✔
401
    }
402
    // return null if path does not exist in the TsFile
403
    int searchResult = binarySearchInTimeseriesMetadataList(timeseriesMetadataList, measurement);
1✔
404
    return searchResult >= 0 ? timeseriesMetadataList.get(searchResult) : null;
1✔
405
  }
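
Point-lookup sketch (device and measurement names are hypothetical); with ignoreNotExists set, a missing device yields null instead of an IOException.

TimeseriesMetadata tsMeta =
    reader.readTimeseriesMetadata("root.sg1.d1", "s1", /* ignoreNotExists */ true);
if (tsMeta != null) {
  System.out.println("points: " + tsMeta.getStatistics().getCount());
}
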
406

407
  // This method is only used for TsFile
408
  public ITimeSeriesMetadata readITimeseriesMetadata(Path path, boolean ignoreNotExists)
409
      throws IOException {
410
    readFileMetadata();
1✔
411
    MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex();
1✔
412
    Pair<MetadataIndexEntry, Long> metadataIndexPair =
1✔
413
        getMetadataAndEndOffset(deviceMetadataIndexNode, path.getDevice(), true, true);
1✔
414
    if (metadataIndexPair == null) {
1✔
415
      if (ignoreNotExists) {
×
416
        return null;
×
417
      }
418
      throw new IOException("Device {" + path.getDevice() + "} is not in tsFileMetaData");
×
419
    }
420
    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
1✔
421
    MetadataIndexNode metadataIndexNode;
422
    TimeseriesMetadata firstTimeseriesMetadata;
423
    try {
424
      // next layer MeasurementNode of the specific DeviceNode
425
      metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
1✔
426
    } catch (Exception e) {
×
427
      logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
×
428
      throw e;
×
429
    }
1✔
430
    firstTimeseriesMetadata = tryToGetFirstTimeseriesMetadata(metadataIndexNode);
1✔
431
    metadataIndexPair =
1✔
432
        getMetadataAndEndOffset(metadataIndexNode, path.getMeasurement(), false, false);
1✔
433

434
    if (metadataIndexPair == null) {
1✔
435
      return null;
×
436
    }
437
    List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
1✔
438
    buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
1✔
439
    while (buffer.hasRemaining()) {
1✔
440
      try {
441
        timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true));
1✔
442
      } catch (Exception e) {
×
443
        logger.error(
×
444
            "Something error happened while deserializing TimeseriesMetadata of file {}", file);
445
        throw e;
×
446
      }
1✔
447
    }
448
    // return null if path does not exist in the TsFile
449
    int searchResult =
1✔
450
        binarySearchInTimeseriesMetadataList(timeseriesMetadataList, path.getMeasurement());
1✔
451
    if (searchResult >= 0) {
1✔
452
      if (firstTimeseriesMetadata != null) {
1✔
453
        List<TimeseriesMetadata> valueTimeseriesMetadataList = new ArrayList<>();
1✔
454
        valueTimeseriesMetadataList.add(timeseriesMetadataList.get(searchResult));
1✔
455
        return new AlignedTimeSeriesMetadata(firstTimeseriesMetadata, valueTimeseriesMetadataList);
1✔
456
      } else {
457
        return timeseriesMetadataList.get(searchResult);
1✔
458
      }
459
    } else {
460
      return null;
1✔
461
    }
462
  }
463

464
  /* Find the leaf node that contains the path, and return all sensors in that leaf node that are also in the allSensors set */
465
  public List<TimeseriesMetadata> readTimeseriesMetadata(
466
      String device, String measurement, Set<String> allSensors) throws IOException {
467
    Pair<MetadataIndexEntry, Long> metadataIndexPair =
1✔
468
        getLeafMetadataIndexPair(device, measurement);
1✔
469
    if (metadataIndexPair == null) {
1✔
470
      return Collections.emptyList();
×
471
    }
472
    List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
1✔
473

474
    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
1✔
475
    while (buffer.hasRemaining()) {
1✔
476
      TimeseriesMetadata timeseriesMetadata;
477
      try {
478
        timeseriesMetadata = TimeseriesMetadata.deserializeFrom(buffer, true);
1✔
479
      } catch (Exception e) {
×
480
        logger.error(
×
481
            "Something error happened while deserializing TimeseriesMetadata of file {}", file);
482
        throw e;
×
483
      }
1✔
484
      if (allSensors.contains(timeseriesMetadata.getMeasurementId())) {
1✔
485
        timeseriesMetadataList.add(timeseriesMetadata);
1✔
486
      }
487
    }
1✔
488
    return timeseriesMetadataList;
1✔
489
  }
490

491
  /* Get leaf MetadataIndexPair which contains path */
492
  private Pair<MetadataIndexEntry, Long> getLeafMetadataIndexPair(String device, String measurement)
493
      throws IOException {
494
    readFileMetadata();
1✔
495
    MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex();
1✔
496
    Pair<MetadataIndexEntry, Long> metadataIndexPair =
1✔
497
        getMetadataAndEndOffset(deviceMetadataIndexNode, device, true, true);
1✔
498
    if (metadataIndexPair == null) {
1✔
499
      return null;
×
500
    }
501
    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
1✔
502
    MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode;
1✔
503
    if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
1✔
504
      try {
505
        metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
1✔
506
      } catch (Exception e) {
×
507
        logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
×
508
        throw e;
×
509
      }
1✔
510
      metadataIndexPair = getMetadataAndEndOffset(metadataIndexNode, measurement, false, false);
1✔
511
    }
512
    return metadataIndexPair;
1✔
513
  }
514

515
  // This method is only used for TsFile
516
  public List<ITimeSeriesMetadata> readITimeseriesMetadata(String device, Set<String> measurements)
517
      throws IOException {
518
    readFileMetadata();
1✔
519
    MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex();
1✔
520
    Pair<MetadataIndexEntry, Long> metadataIndexPair =
1✔
521
        getMetadataAndEndOffset(deviceMetadataIndexNode, device, true, false);
1✔
522
    if (metadataIndexPair == null) {
1✔
523
      return Collections.emptyList();
×
524
    }
525
    List<ITimeSeriesMetadata> resultTimeseriesMetadataList = new ArrayList<>();
1✔
526
    List<String> measurementList = new ArrayList<>(measurements);
1✔
527
    Set<String> measurementsHadFound = new HashSet<>();
1✔
528
    // the content of the next-layer MeasurementNode of the specific device's DeviceNode
529
    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
1✔
530
    Pair<MetadataIndexEntry, Long> measurementMetadataIndexPair = metadataIndexPair;
1✔
531
    List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
1✔
532

533
    // next layer MeasurementNode of the specific DeviceNode
534
    MetadataIndexNode measurementMetadataIndexNode;
535
    try {
536
      measurementMetadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
1✔
537
    } catch (Exception e) {
×
538
      logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
×
539
      throw e;
×
540
    }
1✔
541
    // Get the first timeseriesMetadata of the device
542
    TimeseriesMetadata firstTimeseriesMetadata =
1✔
543
        tryToGetFirstTimeseriesMetadata(measurementMetadataIndexNode);
1✔
544

545
    for (int i = 0; i < measurementList.size(); i++) {
1✔
546
      if (measurementsHadFound.contains(measurementList.get(i))) {
1✔
547
        continue;
×
548
      }
549
      timeseriesMetadataList.clear();
1✔
550
      measurementMetadataIndexPair =
1✔
551
          getMetadataAndEndOffset(
1✔
552
              measurementMetadataIndexNode, measurementList.get(i), false, false);
1✔
553

554
      if (measurementMetadataIndexPair == null) {
1✔
555
        continue;
×
556
      }
557
      // the content of TimeseriesNode of the specific MeasurementLeafNode
558
      buffer =
1✔
559
          readData(
1✔
560
              measurementMetadataIndexPair.left.getOffset(), measurementMetadataIndexPair.right);
1✔
561
      while (buffer.hasRemaining()) {
1✔
562
        try {
563
          timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true));
1✔
564
        } catch (Exception e) {
×
565
          logger.error(
×
566
              "Something error happened while deserializing TimeseriesMetadata of file {}", file);
567
          throw e;
×
568
        }
1✔
569
      }
570
      for (int j = i; j < measurementList.size(); j++) {
1✔
571
        String current = measurementList.get(j);
1✔
572
        if (!measurementsHadFound.contains(current)) {
1✔
573
          int searchResult = binarySearchInTimeseriesMetadataList(timeseriesMetadataList, current);
1✔
574
          if (searchResult >= 0) {
1✔
575
            if (firstTimeseriesMetadata != null) {
1✔
576
              List<TimeseriesMetadata> valueTimeseriesMetadataList = new ArrayList<>();
1✔
577
              valueTimeseriesMetadataList.add(timeseriesMetadataList.get(searchResult));
1✔
578
              resultTimeseriesMetadataList.add(
1✔
579
                  new AlignedTimeSeriesMetadata(
580
                      firstTimeseriesMetadata, valueTimeseriesMetadataList));
581
            } else {
1✔
582
              resultTimeseriesMetadataList.add(timeseriesMetadataList.get(searchResult));
1✔
583
            }
584
            measurementsHadFound.add(current);
1✔
585
          }
586
        }
587
        if (measurementsHadFound.size() == measurements.size()) {
1✔
588
          return resultTimeseriesMetadataList;
1✔
589
        }
590
      }
591
    }
592
    return resultTimeseriesMetadataList;
×
593
  }
594

595
  protected int binarySearchInTimeseriesMetadataList(
596
      List<TimeseriesMetadata> timeseriesMetadataList, String key) {
597
    int low = 0;
1✔
598
    int high = timeseriesMetadataList.size() - 1;
1✔
599

600
    while (low <= high) {
1✔
601
      int mid = (low + high) >>> 1;
1✔
602
      TimeseriesMetadata midVal = timeseriesMetadataList.get(mid);
1✔
603
      int cmp = midVal.getMeasurementId().compareTo(key);
1✔
604

605
      if (cmp < 0) {
1✔
606
        low = mid + 1;
1✔
607
      } else if (cmp > 0) {
1✔
608
        high = mid - 1;
1✔
609
      } else {
610
        return mid; // key found
1✔
611
      }
612
    }
1✔
613
    return -1; // key not found
1✔
614
  }
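
The search above is only correct because TsFile stores a leaf node's TimeseriesMetadata entries in dictionary order of measurementId; the same invariant, illustrated on plain strings:

List<String> measurementIds = Arrays.asList("s1", "s2", "s3"); // sorted, as in a leaf node
int idx = Collections.binarySearch(measurementIds, "s2");      // >= 0 iff the id is present
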
615

616
  public List<String> getAllDevices() throws IOException {
617
    if (tsFileMetaData == null) {
1✔
618
      readFileMetadata();
1✔
619
    }
620
    return getAllDevices(tsFileMetaData.getMetadataIndex());
1✔
621
  }
622

623
  private List<String> getAllDevices(MetadataIndexNode metadataIndexNode) throws IOException {
624
    List<String> deviceList = new ArrayList<>();
1✔
625
    // if metadataIndexNode is LEAF_DEVICE, put all devices in the node's entries into the list
626
    if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
1✔
627
      deviceList.addAll(
1✔
628
          metadataIndexNode.getChildren().stream()
1✔
629
              .map(x -> x.getName().intern())
1✔
630
              .collect(Collectors.toList()));
1✔
631
      return deviceList;
1✔
632
    }
633

634
    int metadataIndexListSize = metadataIndexNode.getChildren().size();
1✔
635
    for (int i = 0; i < metadataIndexListSize; i++) {
1✔
636
      long endOffset = metadataIndexNode.getEndOffset();
1✔
637
      if (i != metadataIndexListSize - 1) {
1✔
638
        endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset();
1✔
639
      }
640
      ByteBuffer buffer = readData(metadataIndexNode.getChildren().get(i).getOffset(), endOffset);
1✔
641
      MetadataIndexNode node = MetadataIndexNode.deserializeFrom(buffer);
1✔
642
      deviceList.addAll(getAllDevices(node));
1✔
643
    }
644
    return deviceList;
1✔
645
  }
646

647
  /**
648
   * @return an iterator of "device, isAligned" list, in which names of devices are ordered in
649
   *     dictionary order, and isAligned represents whether the device is aligned. Only the devices
650
   *     on one device leaf node are read at a time, to save memory.
651
   */
652
  public TsFileDeviceIterator getAllDevicesIteratorWithIsAligned() throws IOException {
653
    readFileMetadata();
1✔
654
    Queue<Pair<String, long[]>> queue = new LinkedList<>();
1✔
655
    List<long[]> leafDeviceNodeOffsets = new ArrayList<>();
1✔
656
    MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
1✔
657
    if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
1✔
658
      // the first node of the index tree is a device leaf node, so get the devices directly
659
      getDevicesOfLeafNode(metadataIndexNode, queue);
1✔
660
    } else {
661
      // get all device leaf node offset
662
      getAllDeviceLeafNodeOffset(metadataIndexNode, leafDeviceNodeOffsets);
1✔
663
    }
664

665
    return new TsFileDeviceIterator(this, leafDeviceNodeOffsets, queue);
1✔
666
  }
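
Iteration sketch, assuming an open reader and that TsFileDeviceIterator yields Pair<deviceName, isAligned> as the javadoc above describes:

TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned();
while (deviceIterator.hasNext()) {
  Pair<String, Boolean> device = deviceIterator.next();
  System.out.println(device.left + " (aligned=" + device.right + ")");
}
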
667

668
  /**
669
   * Get devices and first measurement node offset.
670
   *
671
   * @param startOffset start offset of device leaf node
672
   * @param endOffset end offset of device leaf node
673
   * @param measurementNodeOffsetQueue device -> first measurement node offset
674
   */
675
  public void getDevicesAndEntriesOfOneLeafNode(
676
      Long startOffset, Long endOffset, Queue<Pair<String, long[]>> measurementNodeOffsetQueue)
677
      throws IOException {
678
    try {
679
      ByteBuffer nextBuffer = readData(startOffset, endOffset);
1✔
680
      MetadataIndexNode deviceLeafNode = MetadataIndexNode.deserializeFrom(nextBuffer);
1✔
681
      getDevicesOfLeafNode(deviceLeafNode, measurementNodeOffsetQueue);
1✔
682
    } catch (Exception e) {
×
683
      logger.error("Something error happened while getting all devices of file {}", file);
×
684
      throw e;
×
685
    }
1✔
686
  }
1✔
687

688
  /**
689
   * Get all devices and their corresponding entries on the specific device leaf node.
690
   *
691
   * @param deviceLeafNode this node must be device leaf node
692
   */
693
  private void getDevicesOfLeafNode(
694
      MetadataIndexNode deviceLeafNode, Queue<Pair<String, long[]>> measurementNodeOffsetQueue) {
695
    if (!deviceLeafNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
1✔
696
      throw new IllegalStateException("the first param should be device leaf node.");
×
697
    }
698
    List<MetadataIndexEntry> childrenEntries = deviceLeafNode.getChildren();
1✔
699
    for (int i = 0; i < childrenEntries.size(); i++) {
1✔
700
      MetadataIndexEntry deviceEntry = childrenEntries.get(i);
1✔
701
      long childStartOffset = deviceEntry.getOffset();
1✔
702
      long childEndOffset =
703
          i == childrenEntries.size() - 1
1✔
704
              ? deviceLeafNode.getEndOffset()
1✔
705
              : childrenEntries.get(i + 1).getOffset();
1✔
706
      long[] offset = {childStartOffset, childEndOffset};
1✔
707
      measurementNodeOffsetQueue.add(new Pair<>(deviceEntry.getName(), offset));
1✔
708
    }
709
  }
1✔
710

711
  /**
712
   * Get the device leaf node offset under the specific device internal node.
713
   *
714
   * @param deviceInternalNode this node must be device internal node
715
   */
716
  private void getAllDeviceLeafNodeOffset(
717
      MetadataIndexNode deviceInternalNode, List<long[]> leafDeviceNodeOffsets) throws IOException {
718
    if (!deviceInternalNode.getNodeType().equals(MetadataIndexNodeType.INTERNAL_DEVICE)) {
1✔
719
      throw new IllegalStateException("the first param should be device internal node.");
×
720
    }
721
    try {
722
      int metadataIndexListSize = deviceInternalNode.getChildren().size();
1✔
723
      boolean isCurrentLayerLeafNode = false;
1✔
724
      for (int i = 0; i < metadataIndexListSize; i++) {
1✔
725
        MetadataIndexEntry entry = deviceInternalNode.getChildren().get(i);
1✔
726
        long startOffset = entry.getOffset();
1✔
727
        long endOffset = deviceInternalNode.getEndOffset();
1✔
728
        if (i != metadataIndexListSize - 1) {
1✔
729
          endOffset = deviceInternalNode.getChildren().get(i + 1).getOffset();
1✔
730
        }
731
        if (i == 0) {
1✔
732
          // check whether the current layer is a device leaf node or an internal node; checking the
733
          // first entry is enough, because the rest are the same
734
          MetadataIndexNodeType nodeType =
1✔
735
              MetadataIndexNodeType.deserialize(
1✔
736
                  ReadWriteIOUtils.readByte(readData(endOffset - 1, endOffset)));
1✔
737
          isCurrentLayerLeafNode = nodeType.equals(MetadataIndexNodeType.LEAF_DEVICE);
1✔
738
        }
739
        if (isCurrentLayerLeafNode) {
1✔
740
          // is device leaf node
741
          long[] offset = {startOffset, endOffset};
1✔
742
          leafDeviceNodeOffsets.add(offset);
1✔
743
          continue;
1✔
744
        }
745
        ByteBuffer nextBuffer = readData(startOffset, endOffset);
1✔
746
        getAllDeviceLeafNodeOffset(
1✔
747
            MetadataIndexNode.deserializeFrom(nextBuffer), leafDeviceNodeOffsets);
1✔
748
      }
749
    } catch (Exception e) {
×
750
      logger.error("Something error happened while getting all devices of file {}", file);
×
751
      throw e;
×
752
    }
1✔
753
  }
1✔
754

755
  /**
756
   * Read all ChunkMetadatas of the given device.
757
   *
758
   * @param device the device name
759
   * @return measurement -> ChunkMetadata list
760
   * @throws IOException io error
761
   */
762
  public Map<String, List<ChunkMetadata>> readChunkMetadataInDevice(String device)
763
      throws IOException {
764
    readFileMetadata();
1✔
765
    List<TimeseriesMetadata> timeseriesMetadataMap = getDeviceTimeseriesMetadata(device);
1✔
766
    if (timeseriesMetadataMap.isEmpty()) {
1✔
767
      return new HashMap<>();
1✔
768
    }
769
    Map<String, List<ChunkMetadata>> seriesMetadata = new LinkedHashMap<>();
1✔
770
    for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataMap) {
1✔
771
      seriesMetadata.put(
1✔
772
          timeseriesMetadata.getMeasurementId(),
1✔
773
          timeseriesMetadata.getChunkMetadataList().stream()
1✔
774
              .map(chunkMetadata -> ((ChunkMetadata) chunkMetadata))
1✔
775
              .collect(Collectors.toList()));
1✔
776
    }
1✔
777
    return seriesMetadata;
1✔
778
  }
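
Usage sketch (hypothetical device): count how many chunks each measurement has.

Map<String, List<ChunkMetadata>> chunksByMeasurement =
    reader.readChunkMetadataInDevice("root.sg1.d1");
chunksByMeasurement.forEach(
    (measurement, chunks) -> System.out.println(measurement + ": " + chunks.size() + " chunks"));
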
779

780
  /**
781
   * this function returns all timeseries names
782
   *
783
   * @return list of Paths
784
   * @throws IOException io error
785
   */
786
  public List<Path> getAllPaths() throws IOException {
787
    List<Path> paths = new ArrayList<>();
1✔
788
    for (String device : getAllDevices()) {
1✔
789
      Map<String, TimeseriesMetadata> timeseriesMetadataMap = readDeviceMetadata(device);
1✔
790
      for (String measurementId : timeseriesMetadataMap.keySet()) {
1✔
791
        paths.add(new Path(device, measurementId, true));
1✔
792
      }
1✔
793
    }
1✔
794
    return paths;
1✔
795
  }
796

797
  /**
798
   * @return an iterator of timeseries list, in which names of timeseries are ordered in dictionary
799
   *     order
800
   * @throws IOException io error
801
   */
802
  public Iterator<List<Path>> getPathsIterator() throws IOException {
803
    readFileMetadata();
1✔
804

805
    MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
1✔
806
    List<MetadataIndexEntry> metadataIndexEntryList = metadataIndexNode.getChildren();
1✔
807
    Queue<Pair<String, Pair<Long, Long>>> queue = new LinkedList<>();
1✔
808
    for (int i = 0; i < metadataIndexEntryList.size(); i++) {
1✔
809
      MetadataIndexEntry metadataIndexEntry = metadataIndexEntryList.get(i);
1✔
810
      long endOffset = metadataIndexNode.getEndOffset();
1✔
811
      if (i != metadataIndexEntryList.size() - 1) {
1✔
812
        endOffset = metadataIndexEntryList.get(i + 1).getOffset();
1✔
813
      }
814
      ByteBuffer buffer = readData(metadataIndexEntry.getOffset(), endOffset);
1✔
815
      getAllPaths(metadataIndexEntry, buffer, null, metadataIndexNode.getNodeType(), queue);
1✔
816
    }
817
    return new Iterator<List<Path>>() {
1✔
818
      @Override
819
      public boolean hasNext() {
820
        return !queue.isEmpty();
1✔
821
      }
822

823
      @Override
824
      public List<Path> next() {
825
        if (!hasNext()) {
1✔
826
          throw new NoSuchElementException();
×
827
        }
828
        Pair<String, Pair<Long, Long>> startEndPair = queue.remove();
1✔
829
        List<Path> paths = new ArrayList<>();
1✔
830
        try {
831
          ByteBuffer nextBuffer = readData(startEndPair.right.left, startEndPair.right.right);
1✔
832
          while (nextBuffer.hasRemaining()) {
1✔
833
            paths.add(
1✔
834
                new Path(
835
                    startEndPair.left,
836
                    TimeseriesMetadata.deserializeFrom(nextBuffer, false).getMeasurementId(),
1✔
837
                    true));
838
          }
839
          return paths;
1✔
840
        } catch (IOException e) {
×
841
          throw new TsFileRuntimeException(
×
842
              "Error occurred while reading a time series metadata block.");
843
        }
844
      }
845
    };
846
  }
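
Streaming sketch: unlike getAllPaths(), this reads one measurement leaf node per next() call, so memory stays bounded by a single node.

Iterator<List<Path>> pathsIterator = reader.getPathsIterator();
while (pathsIterator.hasNext()) {
  for (Path path : pathsIterator.next()) {
    System.out.println(path.getFullPath());
  }
}
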
847

848
  private void getAllPaths(
849
      MetadataIndexEntry metadataIndex,
850
      ByteBuffer buffer,
851
      String deviceId,
852
      MetadataIndexNodeType type,
853
      Queue<Pair<String, Pair<Long, Long>>> queue)
854
      throws IOException {
855
    try {
856
      if (type.equals(MetadataIndexNodeType.LEAF_DEVICE)) {
1✔
857
        deviceId = metadataIndex.getName();
1✔
858
      }
859
      MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
1✔
860
      int metadataIndexListSize = metadataIndexNode.getChildren().size();
1✔
861
      for (int i = 0; i < metadataIndexListSize; i++) {
1✔
862
        long startOffset = metadataIndexNode.getChildren().get(i).getOffset();
1✔
863
        long endOffset = metadataIndexNode.getEndOffset();
1✔
864
        if (i != metadataIndexListSize - 1) {
1✔
865
          endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset();
1✔
866
        }
867
        if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
1✔
868
          queue.add(new Pair<>(deviceId, new Pair<>(startOffset, endOffset)));
1✔
869
          continue;
1✔
870
        }
871
        ByteBuffer nextBuffer =
1✔
872
            readData(metadataIndexNode.getChildren().get(i).getOffset(), endOffset);
1✔
873
        getAllPaths(
1✔
874
            metadataIndexNode.getChildren().get(i),
1✔
875
            nextBuffer,
876
            deviceId,
877
            metadataIndexNode.getNodeType(),
1✔
878
            queue);
879
      }
880
    } catch (Exception e) {
×
881
      logger.error("Something error happened while getting all paths of file {}", file);
×
882
      throw e;
×
883
    }
1✔
884
  }
1✔
885

886
  /**
887
   * Check whether the device is aligned or not.
888
   *
889
   * @param measurementNode the next measurement layer node of specific device node
890
   */
891
  public boolean isAlignedDevice(MetadataIndexNode measurementNode) {
892
    return "".equals(measurementNode.getChildren().get(0).getName());
1✔
893
  }
894

895
  TimeseriesMetadata tryToGetFirstTimeseriesMetadata(MetadataIndexNode measurementNode)
896
      throws IOException {
897
    // Not aligned timeseries
898
    if (!"".equals(measurementNode.getChildren().get(0).getName())) {
1✔
899
      return null;
1✔
900
    }
901

902
    // Aligned timeseries
903
    if (measurementNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
1✔
904
      ByteBuffer buffer;
905
      if (measurementNode.getChildren().size() > 1) {
1✔
906
        buffer =
1✔
907
            readData(
1✔
908
                measurementNode.getChildren().get(0).getOffset(),
1✔
909
                measurementNode.getChildren().get(1).getOffset());
1✔
910
      } else {
911
        buffer =
1✔
912
            readData(
1✔
913
                measurementNode.getChildren().get(0).getOffset(), measurementNode.getEndOffset());
1✔
914
      }
915
      return TimeseriesMetadata.deserializeFrom(buffer, true);
1✔
916
    } else if (measurementNode.getNodeType().equals(MetadataIndexNodeType.INTERNAL_MEASUREMENT)) {
1✔
917
      ByteBuffer buffer =
1✔
918
          readData(
1✔
919
              measurementNode.getChildren().get(0).getOffset(),
1✔
920
              measurementNode.getChildren().get(1).getOffset());
1✔
921
      MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
1✔
922
      return tryToGetFirstTimeseriesMetadata(metadataIndexNode);
1✔
923
    }
924
    return null;
×
925
  }
926

927
  /**
928
   * Get the measurements of the current device by its first measurement node. Also get the chunk
929
   * metadata list and timeseries metadata offset.
930
   *
931
   * @param measurementNode first measurement node of the device
932
   * @param excludedMeasurementIds do not deserialize chunk metadatas whose measurementId is in the
933
   *     set. Notice: It only takes effect when the needChunkMetadata parameter is true.
934
   * @param needChunkMetadata need to deserialize chunk metadatas or not
935
   * @return measurement -> chunk metadata list -> timeseries metadata <startOffset, endOffset>
936
   */
937
  public Map<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>>
938
      getTimeseriesMetadataOffsetByDevice(
939
          MetadataIndexNode measurementNode,
940
          Set<String> excludedMeasurementIds,
941
          boolean needChunkMetadata)
942
          throws IOException {
943
    Map<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>> timeseriesMetadataOffsetMap =
1✔
944
        new LinkedHashMap<>();
945
    List<MetadataIndexEntry> childrenEntryList = measurementNode.getChildren();
1✔
946
    for (int i = 0; i < childrenEntryList.size(); i++) {
1✔
947
      long startOffset = childrenEntryList.get(i).getOffset();
1✔
948
      long endOffset =
949
          i == childrenEntryList.size() - 1
1✔
950
              ? measurementNode.getEndOffset()
1✔
951
              : childrenEntryList.get(i + 1).getOffset();
1✔
952
      ByteBuffer nextBuffer = readData(startOffset, endOffset);
1✔
953
      if (measurementNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
1✔
954
        // leaf measurement node
955
        while (nextBuffer.hasRemaining()) {
1✔
956
          int metadataStartOffset = nextBuffer.position();
1✔
957
          TimeseriesMetadata timeseriesMetadata =
1✔
958
              TimeseriesMetadata.deserializeFrom(
1✔
959
                  nextBuffer, excludedMeasurementIds, needChunkMetadata);
960
          timeseriesMetadataOffsetMap.put(
1✔
961
              timeseriesMetadata.getMeasurementId(),
1✔
962
              new Pair<>(
963
                  timeseriesMetadata.getChunkMetadataList(),
1✔
964
                  new Pair<>(
965
                      startOffset + metadataStartOffset, startOffset + nextBuffer.position())));
1✔
966
        }
1✔
967

968
      } else {
969
        // internal measurement node
970
        MetadataIndexNode nextLayerMeasurementNode = MetadataIndexNode.deserializeFrom(nextBuffer);
1✔
971
        timeseriesMetadataOffsetMap.putAll(
1✔
972
            getTimeseriesMetadataOffsetByDevice(
1✔
973
                nextLayerMeasurementNode, excludedMeasurementIds, needChunkMetadata));
974
      }
975
    }
976
    return timeseriesMetadataOffsetMap;
1✔
977
  }
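
Usage sketch, assuming measurementNode was obtained elsewhere (for example from the device iterator's offset queue); with needChunkMetadata == false only the offset pairs are materialized.

Map<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>> offsetMap =
    reader.getTimeseriesMetadataOffsetByDevice(
        measurementNode, Collections.emptySet(), /* needChunkMetadata */ false);
offsetMap.forEach(
    (measurement, pair) ->
        System.out.println(measurement + " @ [" + pair.right.left + ", " + pair.right.right + ")"));
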
978

979
  /**
980
   * Get chunk metadata list by the start offset and end offset of the timeseries metadata.
981
   *
982
   * @param startOffset the start offset of timeseries metadata
983
   * @param endOffset the end offset of timeseries metadata
984
   */
985
  public List<IChunkMetadata> getChunkMetadataListByTimeseriesMetadataOffset(
986
      long startOffset, long endOffset) throws IOException {
987
    ByteBuffer timeseriesMetadataBuffer = readData(startOffset, endOffset);
1✔
988

989
    TimeseriesMetadata timeseriesMetadata =
1✔
990
        TimeseriesMetadata.deserializeFrom(timeseriesMetadataBuffer, true);
1✔
991
    return timeseriesMetadata.getChunkMetadataList();
1✔
992
  }
993

994
  /**
995
   * Get timeseries metadata under the measurementNode and put them into timeseriesMetadataList.
996
   * Skip timeseries whose measurementId is in the excludedMeasurementIds.
997
   *
998
   * @param measurementNode next layer measurement node of specific device leaf node
999
   * @param excludedMeasurementIds skip timeseries whose measurementId is in the set
1000
   */
1001
  public void getDeviceTimeseriesMetadata(
1002
      List<TimeseriesMetadata> timeseriesMetadataList,
1003
      MetadataIndexNode measurementNode,
1004
      Set<String> excludedMeasurementIds,
1005
      boolean needChunkMetadata)
1006
      throws IOException {
1007
    int metadataIndexListSize = measurementNode.getChildren().size();
1✔
1008
    for (int i = 0; i < metadataIndexListSize; i++) {
1✔
1009
      long endOffset = measurementNode.getEndOffset();
1✔
1010
      if (i != metadataIndexListSize - 1) {
1✔
1011
        endOffset = measurementNode.getChildren().get(i + 1).getOffset();
1✔
1012
      }
1013
      ByteBuffer nextBuffer = readData(measurementNode.getChildren().get(i).getOffset(), endOffset);
1✔
1014
      if (measurementNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
1✔
1015
        // leaf measurement node
1016
        while (nextBuffer.hasRemaining()) {
1✔
1017
          TimeseriesMetadata timeseriesMetadata =
1✔
1018
              TimeseriesMetadata.deserializeFrom(
1✔
1019
                  nextBuffer, excludedMeasurementIds, needChunkMetadata);
1020
          if (!excludedMeasurementIds.contains(timeseriesMetadata.getMeasurementId())) {
1✔
1021
            timeseriesMetadataList.add(timeseriesMetadata);
1✔
1022
          }
1023
        }
1✔
1024
      } else {
1025
        // internal measurement node
1026
        MetadataIndexNode nextLayerMeasurementNode = MetadataIndexNode.deserializeFrom(nextBuffer);
1✔
1027
        getDeviceTimeseriesMetadata(
1✔
1028
            timeseriesMetadataList,
1029
            nextLayerMeasurementNode,
1030
            excludedMeasurementIds,
1031
            needChunkMetadata);
1032
      }
1033
    }
1034
  }
1✔
1035

1036
  /**
1037
   * Traverse the metadata index from MetadataIndexEntry to get TimeseriesMetadatas
1038
   *
1039
   * @param metadataIndex MetadataIndexEntry
1040
   * @param buffer byte buffer
1041
   * @param deviceId String
1042
   * @param timeseriesMetadataMap map: deviceId -> timeseriesMetadata list
1043
   * @param needChunkMetadata deserialize chunk metadata list or not
1044
   */
1045
  private void generateMetadataIndex(
1046
      MetadataIndexEntry metadataIndex,
1047
      ByteBuffer buffer,
1048
      String deviceId,
1049
      MetadataIndexNodeType type,
1050
      Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap,
1051
      boolean needChunkMetadata)
1052
      throws IOException {
1053
    try {
1054
      if (type.equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
1✔
1055
        List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
1✔
1056
        while (buffer.hasRemaining()) {
1✔
1057
          timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, needChunkMetadata));
1✔
1058
        }
1059
        timeseriesMetadataMap
1✔
1060
            .computeIfAbsent(deviceId, k -> new ArrayList<>())
1✔
1061
            .addAll(timeseriesMetadataList);
1✔
1062
      } else {
1✔
1063
        // deviceId should be determined by LEAF_DEVICE node
1064
        if (type.equals(MetadataIndexNodeType.LEAF_DEVICE)) {
1✔
1065
          deviceId = metadataIndex.getName();
1✔
1066
        }
1067
        MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
1✔
1068
        int metadataIndexListSize = metadataIndexNode.getChildren().size();
1✔
1069
        for (int i = 0; i < metadataIndexListSize; i++) {
1✔
1070
          long endOffset = metadataIndexNode.getEndOffset();
1✔
1071
          if (i != metadataIndexListSize - 1) {
1✔
1072
            endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset();
1✔
1073
          }
1074
          ByteBuffer nextBuffer =
1✔
1075
              readData(metadataIndexNode.getChildren().get(i).getOffset(), endOffset);
1✔
1076
          generateMetadataIndex(
1✔
1077
              metadataIndexNode.getChildren().get(i),
1✔
1078
              nextBuffer,
1079
              deviceId,
1080
              metadataIndexNode.getNodeType(),
1✔
1081
              timeseriesMetadataMap,
1082
              needChunkMetadata);
1083
        }
1084
      }
1085
    } catch (Exception e) {
1✔
1086
      logger.error("Something error happened while generating MetadataIndex of file {}", file);
1✔
1087
      throw e;
1✔
1088
    }
1✔
1089
  }
1✔
1090

1091
  /* TimeseriesMetadata doesn't need to deserialize the chunk metadata list */
1092
  public Map<String, List<TimeseriesMetadata>> getAllTimeseriesMetadata(boolean needChunkMetadata)
1093
      throws IOException {
1094
    if (tsFileMetaData == null) {
1✔
1095
      readFileMetadata();
1✔
1096
    }
1097
    Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new HashMap<>();
1✔
1098
    MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
1✔
1099
    List<MetadataIndexEntry> metadataIndexEntryList = metadataIndexNode.getChildren();
1✔
1100
    for (int i = 0; i < metadataIndexEntryList.size(); i++) {
1✔
1101
      MetadataIndexEntry metadataIndexEntry = metadataIndexEntryList.get(i);
1✔
1102
      long endOffset = metadataIndexNode.getEndOffset();
1✔
1103
      if (i != metadataIndexEntryList.size() - 1) {
1✔
1104
        endOffset = metadataIndexEntryList.get(i + 1).getOffset();
1✔
1105
      }
1106
      ByteBuffer buffer = readData(metadataIndexEntry.getOffset(), endOffset);
1✔
1107
      generateMetadataIndex(
1✔
1108
          metadataIndexEntry,
1109
          buffer,
1110
          null,
1111
          metadataIndexNode.getNodeType(),
1✔
1112
          timeseriesMetadataMap,
1113
          needChunkMetadata);
1114
    }
1115
    return timeseriesMetadataMap;
1✔
1116
  }
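
Whole-file index scan sketch, assuming an open reader: passing false skips chunk-metadata deserialization, keeping memory proportional to the number of series rather than the number of chunks.

Map<String, List<TimeseriesMetadata>> allSeries = reader.getAllTimeseriesMetadata(false);
allSeries.forEach(
    (device, seriesList) -> System.out.println(device + ": " + seriesList.size() + " series"));
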
1117

1118
  /* This method will only deserialize the TimeseriesMetadata, not including the chunk metadata lists */
1119
  private List<TimeseriesMetadata> getDeviceTimeseriesMetadataWithoutChunkMetadata(String device)
1120
      throws IOException {
1121
    MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
1✔
1122
    Pair<MetadataIndexEntry, Long> metadataIndexPair =
1✔
1123
        getMetadataAndEndOffset(metadataIndexNode, device, true, true);
1✔
1124
    if (metadataIndexPair == null) {
1✔
1125
      return Collections.emptyList();
1✔
1126
    }
1127
    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
1✔
1128
    Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new TreeMap<>();
1✔
1129
    generateMetadataIndex(
1✔
1130
        metadataIndexPair.left,
1131
        buffer,
1132
        device,
1133
        MetadataIndexNodeType.INTERNAL_MEASUREMENT,
1134
        timeseriesMetadataMap,
1135
        false);
1136
    List<TimeseriesMetadata> deviceTimeseriesMetadata = new ArrayList<>();
1✔
1137
    for (List<TimeseriesMetadata> timeseriesMetadataList : timeseriesMetadataMap.values()) {
1✔
1138
      deviceTimeseriesMetadata.addAll(timeseriesMetadataList);
1✔
1139
    }
1✔
1140
    return deviceTimeseriesMetadata;
1✔
1141
  }
1142

1143
  /* This method will not only deserialize the TimeseriesMetadata, but also all the chunk metadata list meanwhile. */
1144
  private List<TimeseriesMetadata> getDeviceTimeseriesMetadata(String device) throws IOException {
1145
    MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
1✔
1146
    Pair<MetadataIndexEntry, Long> metadataIndexPair =
1✔
1147
        getMetadataAndEndOffset(metadataIndexNode, device, true, true);
1✔
1148
    if (metadataIndexPair == null) {
1✔
1149
      return Collections.emptyList();
1✔
1150
    }
1151
    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
1✔
1152
    Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new TreeMap<>();
1✔
1153
    generateMetadataIndex(
1✔
1154
        metadataIndexPair.left,
1155
        buffer,
1156
        device,
1157
        MetadataIndexNodeType.INTERNAL_MEASUREMENT,
1158
        timeseriesMetadataMap,
1159
        true);
1160
    List<TimeseriesMetadata> deviceTimeseriesMetadata = new ArrayList<>();
1✔
1161
    for (List<TimeseriesMetadata> timeseriesMetadataList : timeseriesMetadataMap.values()) {
1✔
1162
      deviceTimeseriesMetadata.addAll(timeseriesMetadataList);
1✔
1163
    }
1✔
1164
    return deviceTimeseriesMetadata;
1✔
1165
  }
1166

1167
  /**
1168
   * Get target MetadataIndexEntry and its end offset
1169
   *
1170
   * @param metadataIndex given MetadataIndexNode
1171
   * @param name target device / measurement name
1172
   * @param isDeviceLevel whether target MetadataIndexNode is device level
1173
   * @param exactSearch whether to search in exact mode: return null when there is no entry with the
1174
   *     name; or else return the nearest MetadataIndexEntry before it (for deeper search)
1175
   * @return target MetadataIndexEntry, endOffset pair
1176
   */
1177
  protected Pair<MetadataIndexEntry, Long> getMetadataAndEndOffset(
1178
      MetadataIndexNode metadataIndex, String name, boolean isDeviceLevel, boolean exactSearch)
1179
      throws IOException {
1180
    try {
1181
      // When searching for a device node, return when it is not INTERNAL_DEVICE
1182
      // When searching for a measurement node, return when it is not INTERNAL_MEASUREMENT
1183
      if ((isDeviceLevel
1✔
1184
              && !metadataIndex.getNodeType().equals(MetadataIndexNodeType.INTERNAL_DEVICE))
1✔
1185
          || (!isDeviceLevel
1186
              && !metadataIndex.getNodeType().equals(MetadataIndexNodeType.INTERNAL_MEASUREMENT))) {
1✔
1187
        return metadataIndex.getChildIndexEntry(name, exactSearch);
1✔
1188
      } else {
1189
        Pair<MetadataIndexEntry, Long> childIndexEntry =
1✔
1190
            metadataIndex.getChildIndexEntry(name, false);
1✔
1191
        ByteBuffer buffer = readData(childIndexEntry.left.getOffset(), childIndexEntry.right);
1✔
1192
        return getMetadataAndEndOffset(
1✔
1193
            MetadataIndexNode.deserializeFrom(buffer), name, isDeviceLevel, exactSearch);
1✔
1194
      }
1195
    } catch (Exception e) {
×
1196
      logger.error("Something error happened while deserializing MetadataIndex of file {}", file);
×
1197
      throw e;
×
1198
    }
1199
  }
1200

1201
  /**
1202
   * read data from the current position of the input, and deserialize it to a CHUNK_GROUP_HEADER. <br>
1203
   * This method is not threadsafe.
1204
   *
1205
   * @return a CHUNK_GROUP_FOOTER
1206
   * @throws IOException io error
1207
   */
1208
  public ChunkGroupHeader readChunkGroupHeader() throws IOException {
1209
    return ChunkGroupHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), true);
1✔
1210
  }
1211

1212
  /**
1213
   * read data from current position of the input, and deserialize it to a CHUNK_GROUP_FOOTER.
1214
   *
1215
   * @param position the offset of the chunk group footer in the file
1216
   * @param markerRead true if the offset does not contains the marker , otherwise false
1217
   * @return a CHUNK_GROUP_FOOTER
1218
   * @throws IOException io error
1219
   */
1220
  public ChunkGroupHeader readChunkGroupHeader(long position, boolean markerRead)
1221
      throws IOException {
1222
    return ChunkGroupHeader.deserializeFrom(tsFileInput, position, markerRead);
×
1223
  }
1224

1225
  public void readPlanIndex() throws IOException {
1226
    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
1✔
1227
    if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) == 0) {
1✔
1228
      throw new IOException("reach the end of the file.");
×
1229
    }
1230
    buffer.flip();
1✔
1231
    minPlanIndex = buffer.getLong();
1✔
1232
    buffer.clear();
1✔
1233
    if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) == 0) {
1✔
1234
      throw new IOException("reach the end of the file.");
×
1235
    }
1236
    buffer.flip();
1✔
1237
    maxPlanIndex = buffer.getLong();
1✔
1238
  }
1✔
1239

1240
  /**
1241
   * read data from current position of the input, and deserialize it to a CHUNK_HEADER. <br>
1242
   * This method is not threadsafe.
1243
   *
1244
   * @return a CHUNK_HEADER
1245
   * @throws IOException io error
1246
   */
1247
  public ChunkHeader readChunkHeader(byte chunkType) throws IOException {
1248
    try {
1249
      return ChunkHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), chunkType);
1✔
1250
    } catch (Throwable t) {
1✔
1251
      logger.warn("Exception {} happened while reading chunk header of {}", t.getMessage(), file);
1✔
1252
      throw t;
1✔
1253
    }
1254
  }
1255

1256
  /**
1257
   * read the chunk's header.
1258
   *
1259
   * @param position the file offset of this chunk's header
1260
   * @param chunkHeaderSize the size of chunk's header
1261
   */
1262
  private ChunkHeader readChunkHeader(long position, int chunkHeaderSize) throws IOException {
1263
    try {
1264
      return ChunkHeader.deserializeFrom(tsFileInput, position, chunkHeaderSize);
1✔
1265
    } catch (Throwable t) {
×
1266
      logger.warn("Exception {} happened while reading chunk header of {}", t.getMessage(), file);
×
1267
      throw t;
×
1268
    }
1269
  }
1270

1271
  /**
1272
   * notice, this function will modify channel's position.
1273
   *
1274
   * @param dataSize the size of chunkdata
1275
   * @param position the offset of the chunk data
1276
   * @return the pages of this chunk
1277
   */
1278
  public ByteBuffer readChunk(long position, int dataSize) throws IOException {
1279
    try {
1280
      return readData(position, dataSize);
1✔
1281
    } catch (Throwable t) {
×
1282
      logger.warn("Exception {} happened while reading chunk of {}", t.getMessage(), file);
×
1283
      throw t;
×
1284
    }
1285
  }
1286

1287
  /**
1288
   * read memory chunk.
1289
   *
1290
   * @param metaData -given chunk meta data
1291
   * @return -chunk
1292
   */
1293
  public Chunk readMemChunk(ChunkMetadata metaData) throws IOException {
1294
    try {
1295
      int chunkHeadSize = ChunkHeader.getSerializedSize(metaData.getMeasurementUid());
1✔
1296
      ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader(), chunkHeadSize);
1✔
1297
      ByteBuffer buffer =
1✔
1298
          readChunk(
1✔
1299
              metaData.getOffsetOfChunkHeader() + header.getSerializedSize(), header.getDataSize());
1✔
1300
      return new Chunk(header, buffer, metaData.getDeleteIntervalList(), metaData.getStatistics());
1✔
1301
    } catch (Throwable t) {
×
1302
      logger.warn("Exception {} happened while reading chunk of {}", t.getMessage(), file);
×
1303
      throw t;
×
1304
    }
1305
  }
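
  // Illustrative sketch (assumptions: a ChunkMetadata obtained from getChunkMetadataList, and the
  // ChunkReader from org.apache.iotdb.tsfile.read.reader.chunk): once a Chunk has been loaded
  // into memory, its pages can be decoded one by one.
  //
  //   Chunk chunk = reader.readMemChunk(chunkMetadata);
  //   ChunkReader chunkReader = new ChunkReader(chunk, null); // null filter: read everything
  //   while (chunkReader.hasNextSatisfiedPage()) {
  //     BatchData batch = chunkReader.nextPageData();
  //     // consume batch ...
  //   }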

  /**
   * Read memory chunk.
   *
   * @param chunkCacheKey given key of chunk LRUCache
   * @return chunk
   */
  public Chunk readMemChunk(CachedChunkLoaderImpl.ChunkCacheKey chunkCacheKey) throws IOException {
    int chunkHeadSize = ChunkHeader.getSerializedSize(chunkCacheKey.getMeasurementUid());
    ChunkHeader header = readChunkHeader(chunkCacheKey.getOffsetOfChunkHeader(), chunkHeadSize);
    ByteBuffer buffer =
        readChunk(
            chunkCacheKey.getOffsetOfChunkHeader() + header.getSerializedSize(),
            header.getDataSize());
    return new Chunk(
        header, buffer, chunkCacheKey.getDeleteIntervalList(), chunkCacheKey.getStatistics());
  }

  /**
   * Read the {@link CompressionType} and {@link TSEncoding} of a timeseries. This method will skip
   * the measurement id and data type. This method will change the position of this reader.
   *
   * @param timeseriesMetadata timeseries' metadata
   * @return a pair of {@link CompressionType} and {@link TSEncoding} of the given timeseries.
   * @throws IOException io error
   */
  public Pair<CompressionType, TSEncoding> readTimeseriesCompressionTypeAndEncoding(
      TimeseriesMetadata timeseriesMetadata) throws IOException {

    String measurementId = timeseriesMetadata.getMeasurementId();
    int measurementIdLength = measurementId.getBytes(TSFileConfig.STRING_CHARSET).length;
    position(
        timeseriesMetadata.getChunkMetadataList().get(0).getOffsetOfChunkHeader()
            + Byte.BYTES // chunkType
            + ReadWriteForEncodingUtils.varIntSize(measurementIdLength) // measurementID length
            + measurementIdLength); // measurementID
    return ChunkHeader.deserializeCompressionTypeAndEncoding(tsFileInput.wrapAsInputStream());
  }

  /** Get measurement schema by chunkMetadatas. */
  public MeasurementSchema getMeasurementSchema(List<IChunkMetadata> chunkMetadataList)
      throws IOException {
    if (chunkMetadataList.isEmpty()) {
      return null;
    }
    IChunkMetadata lastChunkMetadata = chunkMetadataList.get(chunkMetadataList.size() - 1);
    int chunkHeadSize = ChunkHeader.getSerializedSize(lastChunkMetadata.getMeasurementUid());
    ChunkHeader header = readChunkHeader(lastChunkMetadata.getOffsetOfChunkHeader(), chunkHeadSize);
    return new MeasurementSchema(
        lastChunkMetadata.getMeasurementUid(),
        header.getDataType(),
        header.getEncodingType(),
        header.getCompressionType());
  }

  /**
   * Not thread safe.
   *
   * @param type given tsfile data type
   */
  public PageHeader readPageHeader(TSDataType type, boolean hasStatistic) throws IOException {
    try {
      return PageHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), type, hasStatistic);
    } catch (Throwable t) {
      logger.warn("Exception {} happened while reading page header of {}", t.getMessage(), file);
      throw t;
    }
  }

  public long position() throws IOException {
    return tsFileInput.position();
  }

  public void position(long offset) throws IOException {
    tsFileInput.position(offset);
  }

  public void skipPageData(PageHeader header) throws IOException {
    tsFileInput.position(tsFileInput.position() + header.getCompressedSize());
  }

  public ByteBuffer readCompressedPage(PageHeader header) throws IOException {
    return readData(-1, header.getCompressedSize());
  }

  public ByteBuffer readPage(PageHeader header, CompressionType type) throws IOException {
    ByteBuffer buffer = readData(-1, header.getCompressedSize());
    if (header.getUncompressedSize() == 0 || type == CompressionType.UNCOMPRESSED) {
      return buffer;
    } // FIXME if the buffer is not array-implemented.
    IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type);
    ByteBuffer uncompressedBuffer = ByteBuffer.allocate(header.getUncompressedSize());
    unCompressor.uncompress(
        buffer.array(), buffer.position(), buffer.remaining(), uncompressedBuffer.array(), 0);
    return uncompressedBuffer;
  }
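
  // Note on the FIXME above: readData allocates heap buffers via ByteBuffer.allocate, so
  // buffer.array() is safe here today. If readData were ever changed to return direct (off-heap)
  // buffers, array() would throw, and this path would need a copy into a byte[] (or an
  // uncompressor overload taking ByteBuffers) before decompressing.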

  /**
   * Read one byte from the input. <br>
   * This method is not thread safe.
   */
  public byte readMarker() throws IOException {
    markerBuffer.clear();
    if (ReadWriteIOUtils.readAsPossible(tsFileInput, markerBuffer) == 0) {
      throw new IOException("reach the end of the file.");
    }
    markerBuffer.flip();
    return markerBuffer.get();
  }

  @Override
  public void close() throws IOException {
    if (resourceLogger.isDebugEnabled()) {
      resourceLogger.debug("{} reader is closed.", file);
    }
    this.tsFileInput.close();
  }

  public String getFileName() {
    return this.file;
  }

  public long fileSize() throws IOException {
    return tsFileInput.size();
  }

  /**
   * Read data from tsFileInput, from the current position (if position = -1), or from the given
   * position. <br>
   * If position = -1, the tsFileInput's position will be advanced by the real data size that has
   * been read. Otherwise, the tsFileInput's position is not changed.
   *
   * @param position the start position of data in the tsFileInput, or the current position if
   *     position = -1
   * @param totalSize the size of data to read
   * @return the data that has been read.
   */
  protected ByteBuffer readData(long position, int totalSize) throws IOException {
    int allocateSize = Math.min(MAX_READ_BUFFER_SIZE, totalSize);
    int allocateNum = (int) Math.ceil((double) totalSize / allocateSize);
    ByteBuffer buffer = ByteBuffer.allocate(totalSize);
    int bufferLimit = 0;
    for (int i = 0; i < allocateNum; i++) {
      if (i == allocateNum - 1) {
        allocateSize = totalSize - allocateSize * (allocateNum - 1);
      }
      bufferLimit += allocateSize;
      buffer.limit(bufferLimit);
      if (position < 0) {
        if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) != allocateSize) {
          throw new IOException("reach the end of the data");
        }
      } else {
        long actualReadSize =
            ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, allocateSize);
        if (actualReadSize != allocateSize) {
          throw new IOException(
              String.format(
                  "reach the end of the data. Size of data that want to read: %s,"
                      + " actual read size: %s, position: %s",
                  allocateSize, actualReadSize, position));
        }
        position += allocateSize;
      }
    }
    buffer.flip();
    return buffer;
  }
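
  // Worked example of the chunked-read arithmetic above (the 4 MiB limit is illustrative of
  // MAX_READ_BUFFER_SIZE): with totalSize = 10 MiB, allocateSize starts at min(4, 10) = 4 MiB and
  // allocateNum = ceil(10 / 4) = 3, so the loop issues reads of 4 MiB, 4 MiB, and finally
  // 10 - 4 * 2 = 2 MiB, raising the buffer's limit before each read so that all bytes land in a
  // single ByteBuffer.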

  /**
   * Read data from tsFileInput, from the current position (if position = -1), or from the given
   * position.
   *
   * @param start the start position of data in the tsFileInput, or the current position if position
   *     = -1
   * @param end the end position of the data to read
   * @return the data that has been read.
   */
  protected ByteBuffer readData(long start, long end) throws IOException {
    try {
      return readData(start, (int) (end - start));
    } catch (Throwable t) {
      logger.warn("Exception {} happened while reading data of {}", t.getMessage(), file);
      throw t;
    }
  }

  /** Notice: the target bytebuffer is not flipped. */
  public int readRaw(long position, int length, ByteBuffer target) throws IOException {
    return ReadWriteIOUtils.readAsPossible(tsFileInput, target, position, length);
  }

  /**
   * Self-check the file and return the position before which the data is safe.
   *
   * @param newSchema the schema of each time series in the file
   * @param chunkGroupMetadataList ChunkGroupMetadata list
   * @param fastFinish if true and the file is complete, the newSchema and chunkGroupMetadataList
   *     parameters will not be modified.
   * @return the position of the file that is fine. All data after that position in the file should
   *     be truncated.
   */
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
  public long selfCheck(
      Map<Path, IMeasurementSchema> newSchema,
      List<ChunkGroupMetadata> chunkGroupMetadataList,
      boolean fastFinish)
      throws IOException {
    File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file);
    long fileSize;
    if (!checkFile.exists()) {
      return TsFileCheckStatus.FILE_NOT_FOUND;
    } else {
      fileSize = checkFile.length();
    }
    ChunkMetadata currentChunk;
    String measurementID;
    TSDataType dataType;
    long fileOffsetOfChunk;

    // ChunkMetadata of current ChunkGroup
    List<ChunkMetadata> chunkMetadataList = new ArrayList<>();

    int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES;
    if (fileSize < headerLength) {
      return TsFileCheckStatus.INCOMPATIBLE_FILE;
    }
    if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic())
        || (TSFileConfig.VERSION_NUMBER != readVersionNumber())) {
      return TsFileCheckStatus.INCOMPATIBLE_FILE;
    }

    tsFileInput.position(headerLength);
    boolean isComplete = isComplete();
    if (fileSize == headerLength) {
      return headerLength;
    } else if (isComplete) {
      loadMetadataSize();
      if (fastFinish) {
        return TsFileCheckStatus.COMPLETE_FILE;
      }
    }
    // if not a complete file, we will recover it...
    long truncatedSize = headerLength;
    byte marker;
    List<long[]> timeBatch = new ArrayList<>();
    String lastDeviceId = null;
    List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
    try {
      while ((marker = this.readMarker()) != MetaMarker.SEPARATOR) {
        switch (marker) {
          case MetaMarker.CHUNK_HEADER:
          case MetaMarker.TIME_CHUNK_HEADER:
          case MetaMarker.VALUE_CHUNK_HEADER:
          case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
          case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
          case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
            fileOffsetOfChunk = this.position() - 1;
            // if there is something wrong with a chunk, we will drop the whole ChunkGroup,
            // as different chunks may be created by the same insertions (SQLs), and partial
            // insertion is not tolerable
            ChunkHeader chunkHeader = this.readChunkHeader(marker);
            measurementID = chunkHeader.getMeasurementID();
            IMeasurementSchema measurementSchema =
                new MeasurementSchema(
                    measurementID,
                    chunkHeader.getDataType(),
                    chunkHeader.getEncodingType(),
                    chunkHeader.getCompressionType());
            measurementSchemaList.add(measurementSchema);
            dataType = chunkHeader.getDataType();
            if (chunkHeader.getDataType() == TSDataType.VECTOR) {
              timeBatch.clear();
            }
            Statistics<? extends Serializable> chunkStatistics =
                Statistics.getStatsByType(dataType);
            int dataSize = chunkHeader.getDataSize();

            if (dataSize > 0) {
              if (((byte) (chunkHeader.getChunkType() & 0x3F)) == MetaMarker.CHUNK_HEADER) {
                // more than one page: we can use page statistics to generate the chunk statistics
                while (dataSize > 0) {
                  // a new Page
                  PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType(), true);
                  if (pageHeader.getUncompressedSize() != 0) {
                    // not an empty page
                    chunkStatistics.mergeStatistics(pageHeader.getStatistics());
                  }
                  this.skipPageData(pageHeader);
                  dataSize -= pageHeader.getSerializedPageSize();
                  chunkHeader.increasePageNums(1);
                }
              } else {
                // only one page without statistics: we need to iterate each point to generate
                // the chunk statistics
                PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType(), false);
                Decoder valueDecoder =
                    Decoder.getDecoderByType(
                        chunkHeader.getEncodingType(), chunkHeader.getDataType());
                ByteBuffer pageData = readPage(pageHeader, chunkHeader.getCompressionType());
                Decoder timeDecoder =
                    Decoder.getDecoderByType(
                        TSEncoding.valueOf(
                            TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
                        TSDataType.INT64);

                if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
                    == TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk with only one page

                  TimePageReader timePageReader =
                      new TimePageReader(pageHeader, pageData, timeDecoder);
                  long[] currentTimeBatch = timePageReader.getNextTimeBatch();
                  timeBatch.add(currentTimeBatch);
                  for (long currentTime : currentTimeBatch) {
                    chunkStatistics.update(currentTime);
                  }
                } else if ((chunkHeader.getChunkType() & TsFileConstant.VALUE_COLUMN_MASK)
                    == TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk with only one page

                  ValuePageReader valuePageReader =
                      new ValuePageReader(
                          pageHeader, pageData, chunkHeader.getDataType(), valueDecoder);
                  TsPrimitiveType[] valueBatch = valuePageReader.nextValueBatch(timeBatch.get(0));

                  if (valueBatch != null && valueBatch.length != 0) {
                    for (int i = 0; i < valueBatch.length; i++) {
                      TsPrimitiveType value = valueBatch[i];
                      if (value == null) {
                        continue;
                      }
                      long timeStamp = timeBatch.get(0)[i];
                      switch (dataType) {
                        case INT32:
                          chunkStatistics.update(timeStamp, value.getInt());
                          break;
                        case INT64:
                          chunkStatistics.update(timeStamp, value.getLong());
                          break;
                        case FLOAT:
                          chunkStatistics.update(timeStamp, value.getFloat());
                          break;
                        case DOUBLE:
                          chunkStatistics.update(timeStamp, value.getDouble());
                          break;
                        case BOOLEAN:
                          chunkStatistics.update(timeStamp, value.getBoolean());
                          break;
                        case TEXT:
                          chunkStatistics.update(timeStamp, value.getBinary());
                          break;
                        default:
                          throw new IOException("Unexpected type " + dataType);
                      }
                    }
                  }

                } else { // NonAligned Chunk with only one page
                  PageReader reader =
                      new PageReader(
                          pageHeader,
                          pageData,
                          chunkHeader.getDataType(),
                          valueDecoder,
                          timeDecoder,
                          null);
                  BatchData batchData = reader.getAllSatisfiedPageData();
                  while (batchData.hasCurrent()) {
                    switch (dataType) {
                      case INT32:
                        chunkStatistics.update(batchData.currentTime(), batchData.getInt());
                        break;
                      case INT64:
                        chunkStatistics.update(batchData.currentTime(), batchData.getLong());
                        break;
                      case FLOAT:
                        chunkStatistics.update(batchData.currentTime(), batchData.getFloat());
                        break;
                      case DOUBLE:
                        chunkStatistics.update(batchData.currentTime(), batchData.getDouble());
                        break;
                      case BOOLEAN:
                        chunkStatistics.update(batchData.currentTime(), batchData.getBoolean());
                        break;
                      case TEXT:
                        chunkStatistics.update(batchData.currentTime(), batchData.getBinary());
                        break;
                      default:
                        throw new IOException("Unexpected type " + dataType);
                    }
                    batchData.next();
                  }
                }
                chunkHeader.increasePageNums(1);
              }
            }
            currentChunk =
                new ChunkMetadata(measurementID, dataType, fileOffsetOfChunk, chunkStatistics);
            chunkMetadataList.add(currentChunk);
            break;
          case MetaMarker.CHUNK_GROUP_HEADER:
            // if there is something wrong with the ChunkGroup Header, we will drop this ChunkGroup
            // because we can not guarantee the correctness of the deviceId.
            truncatedSize = this.position() - 1;
            if (lastDeviceId != null) {
              // schema of the last chunk group
              if (newSchema != null) {
                for (IMeasurementSchema tsSchema : measurementSchemaList) {
                  newSchema.putIfAbsent(
                      new Path(lastDeviceId, tsSchema.getMeasurementId(), true), tsSchema);
                }
              }
              measurementSchemaList = new ArrayList<>();
              // metadata of the last chunk group
              chunkGroupMetadataList.add(new ChunkGroupMetadata(lastDeviceId, chunkMetadataList));
            }
            // this is a chunk group
            chunkMetadataList = new ArrayList<>();
            ChunkGroupHeader chunkGroupHeader = this.readChunkGroupHeader();
            lastDeviceId = chunkGroupHeader.getDeviceID();
            break;
          case MetaMarker.OPERATION_INDEX_RANGE:
            truncatedSize = this.position() - 1;
            if (lastDeviceId != null) {
              // schema of the last chunk group
              if (newSchema != null) {
                for (IMeasurementSchema tsSchema : measurementSchemaList) {
                  newSchema.putIfAbsent(
                      new Path(lastDeviceId, tsSchema.getMeasurementId(), true), tsSchema);
                }
              }
              measurementSchemaList = new ArrayList<>();
              // metadata of the last chunk group
              chunkGroupMetadataList.add(new ChunkGroupMetadata(lastDeviceId, chunkMetadataList));
              lastDeviceId = null;
            }
            readPlanIndex();
            truncatedSize = this.position();
            break;
          default:
            // the disk file is corrupted, using this file may be dangerous
            throw new IOException("Unexpected marker " + marker);
        }
      }
      // now we have read the tail of the data section, so we are sure that the last
      // ChunkGroupFooter is complete.
      if (lastDeviceId != null) {
        // schema of the last chunk group
        if (newSchema != null) {
          for (IMeasurementSchema tsSchema : measurementSchemaList) {
            newSchema.putIfAbsent(
                new Path(lastDeviceId, tsSchema.getMeasurementId(), true), tsSchema);
          }
        }
        // metadata of the last chunk group
        chunkGroupMetadataList.add(new ChunkGroupMetadata(lastDeviceId, chunkMetadataList));
      }
      if (isComplete) {
        truncatedSize = TsFileCheckStatus.COMPLETE_FILE;
      } else {
        truncatedSize = this.position() - 1;
      }
    } catch (Exception e) {
      logger.warn(
          "TsFile {} self-check cannot proceed at position {}, because: {}",
          file,
          this.position(),
          e.getMessage());
    }
    // Regardless of the completeness of the data section, we will discard the current FileMetadata
    // so that we can continue to write data into this tsfile.
    return truncatedSize;
  }
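
  // Illustrative recovery sketch (inside IoTDB the RestorableTsFileIOWriter flow is the usual
  // entry point; the standalone truncation below is a simplified, hypothetical variant):
  //
  //   Map<Path, IMeasurementSchema> schema = new HashMap<>();
  //   List<ChunkGroupMetadata> chunkGroups = new ArrayList<>();
  //   long safePos = reader.selfCheck(schema, chunkGroups, false);
  //   if (safePos > 0) { // a real offset, not COMPLETE_FILE / FILE_NOT_FOUND / INCOMPATIBLE_FILE
  //     try (FileChannel channel =
  //         FileChannel.open(Paths.get(reader.getFileName()), StandardOpenOption.WRITE)) {
  //       channel.truncate(safePos); // drop the torn tail so writing can resume
  //     }
  //   }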

  /**
   * Self-check the file and return whether the file is safe.
   *
   * @param filename the path of the file
   * @param fastFinish if true, the method will only check the format of the head (magic string
   *     TsFile, version number) and the tail (magic string TsFile) of the TsFile.
   * @return the status of the TsFile
   */
  public long selfCheckWithInfo(
      String filename,
      boolean fastFinish,
      Map<Long, Pair<Path, TimeseriesMetadata>> timeseriesMetadataMap)
      throws IOException, TsFileStatisticsMistakesException {
    String message = " exists statistics mistakes at position ";
    File checkFile = FSFactoryProducer.getFSFactory().getFile(filename);
    if (!checkFile.exists()) {
      return TsFileCheckStatus.FILE_NOT_FOUND;
    }
    long fileSize = checkFile.length();
    logger.info("file length: " + fileSize);

    int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES;
    if (fileSize < headerLength) {
      return TsFileCheckStatus.INCOMPATIBLE_FILE;
    }
    try {
      if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic())
          || (TSFileConfig.VERSION_NUMBER != readVersionNumber())) {
        return TsFileCheckStatus.INCOMPATIBLE_FILE;
      }
      tsFileInput.position(headerLength);
      if (isComplete()) {
        loadMetadataSize();
        if (fastFinish) {
          return TsFileCheckStatus.COMPLETE_FILE;
        }
      }
    } catch (IOException e) {
      logger.error("Error occurred while fast checking TsFile.");
      throw e;
    }
    for (Map.Entry<Long, Pair<Path, TimeseriesMetadata>> entry : timeseriesMetadataMap.entrySet()) {
      TimeseriesMetadata timeseriesMetadata = entry.getValue().right;
      TSDataType dataType = timeseriesMetadata.getTsDataType();
      Statistics<? extends Serializable> timeseriesMetadataSta = timeseriesMetadata.getStatistics();
      Statistics<? extends Serializable> chunkMetadatasSta = Statistics.getStatsByType(dataType);
      for (IChunkMetadata chunkMetadata : getChunkMetadataList(entry.getValue().left)) {
        long tscheckStatus = TsFileCheckStatus.COMPLETE_FILE;
        try {
          tscheckStatus = checkChunkAndPagesStatistics(chunkMetadata);
        } catch (IOException e) {
          logger.error("Error occurred while checking the statistics of a chunk and its pages");
          throw e;
        }
        if (tscheckStatus == TsFileCheckStatus.FILE_EXISTS_MISTAKES) {
          throw new TsFileStatisticsMistakesException(
              "Chunk" + message + chunkMetadata.getOffsetOfChunkHeader());
        }
        chunkMetadatasSta.mergeStatistics(chunkMetadata.getStatistics());
      }
      if (!timeseriesMetadataSta.equals(chunkMetadatasSta)) {
        long timeseriesMetadataPos = entry.getKey();
        throw new TsFileStatisticsMistakesException(
            "TimeseriesMetadata" + message + timeseriesMetadataPos);
      }
    }
    return TsFileCheckStatus.COMPLETE_FILE;
  }
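
  // Illustrative sketch (how timeseriesMetadataMap is built is assumed, e.g. from a prior metadata
  // scan; the variable names are made up): callers distinguish corrupt statistics from plain I/O
  // failures by the exception type.
  //
  //   try {
  //     long status = reader.selfCheckWithInfo(reader.getFileName(), false, timeseriesMetadataMap);
  //     // status == TsFileCheckStatus.COMPLETE_FILE means every statistic matched
  //   } catch (TsFileStatisticsMistakesException e) {
  //     // a chunk- or timeseries-level statistic disagrees with the data; the message carries
  //     // the offending file position
  //   }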

  public long checkChunkAndPagesStatistics(IChunkMetadata chunkMetadata) throws IOException {
    long offsetOfChunkHeader = chunkMetadata.getOffsetOfChunkHeader();
    tsFileInput.position(offsetOfChunkHeader);
    byte marker = this.readMarker();
    ChunkHeader chunkHeader = this.readChunkHeader(marker);
    TSDataType dataType = chunkHeader.getDataType();
    Statistics<? extends Serializable> chunkStatistics = Statistics.getStatsByType(dataType);
    int dataSize = chunkHeader.getDataSize();
    if (((byte) (chunkHeader.getChunkType() & 0x3F)) == MetaMarker.CHUNK_HEADER) {
      while (dataSize > 0) {
        // a new Page
        PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType(), true);
        chunkStatistics.mergeStatistics(pageHeader.getStatistics());
        this.skipPageData(pageHeader);
        dataSize -= pageHeader.getSerializedPageSize();
        chunkHeader.increasePageNums(1);
      }
    } else {
      // only one page without statistics: we need to iterate each point to generate
      // the statistics
      PageHeader pageHeader = this.readPageHeader(chunkHeader.getDataType(), false);
      Decoder valueDecoder =
          Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
      ByteBuffer pageData = readPage(pageHeader, chunkHeader.getCompressionType());
      Decoder timeDecoder =
          Decoder.getDecoderByType(
              TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
              TSDataType.INT64);
      PageReader reader =
          new PageReader(
              pageHeader, pageData, chunkHeader.getDataType(), valueDecoder, timeDecoder, null);
      BatchData batchData = reader.getAllSatisfiedPageData();
      while (batchData.hasCurrent()) {
        switch (dataType) {
          case INT32:
            chunkStatistics.update(batchData.currentTime(), batchData.getInt());
            break;
          case INT64:
            chunkStatistics.update(batchData.currentTime(), batchData.getLong());
            break;
          case FLOAT:
            chunkStatistics.update(batchData.currentTime(), batchData.getFloat());
            break;
          case DOUBLE:
            chunkStatistics.update(batchData.currentTime(), batchData.getDouble());
            break;
          case BOOLEAN:
            chunkStatistics.update(batchData.currentTime(), batchData.getBoolean());
            break;
          case TEXT:
            chunkStatistics.update(batchData.currentTime(), batchData.getBinary());
            break;
          default:
            throw new IOException("Unexpected type " + dataType);
        }
        batchData.next();
      }
      chunkHeader.increasePageNums(1);
    }
    if (chunkMetadata.getStatistics().equals(chunkStatistics)) {
      return TsFileCheckStatus.COMPLETE_FILE;
    }
    return TsFileCheckStatus.FILE_EXISTS_MISTAKES;
  }

  /**
   * Get ChunkMetaDatas of the given path, and throw an exception if the path does not exist
   * (unless ignoreNotExists is true).
   *
   * @param path timeseries path
   * @return List of ChunkMetaData
   */
  public List<ChunkMetadata> getChunkMetadataList(Path path, boolean ignoreNotExists)
      throws IOException {
    TimeseriesMetadata timeseriesMetaData =
        readTimeseriesMetadata(path.getDevice(), path.getMeasurement(), ignoreNotExists);
    if (timeseriesMetaData == null) {
      return Collections.emptyList();
    }
    List<ChunkMetadata> chunkMetadataList = readChunkMetaDataList(timeseriesMetaData);
    chunkMetadataList.sort(Comparator.comparingLong(IChunkMetadata::getStartTime));
    return chunkMetadataList;
  }

  // This method is only used for TsFile
  public List<IChunkMetadata> getIChunkMetadataList(Path path) throws IOException {
    ITimeSeriesMetadata timeseriesMetaData = readITimeseriesMetadata(path, true);
    if (timeseriesMetaData == null) {
      return Collections.emptyList();
    }
    List<IChunkMetadata> chunkMetadataList = readIChunkMetaDataList(timeseriesMetaData);
    chunkMetadataList.sort(Comparator.comparingLong(IChunkMetadata::getStartTime));
    return chunkMetadataList;
  }

  public List<ChunkMetadata> getChunkMetadataList(Path path) throws IOException {
    return getChunkMetadataList(path, false);
  }
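
  // Illustrative sketch (the series path "root.sg1.d1"/"s1" is a made-up example): locating every
  // chunk of one series, then loading each chunk through readMemChunk. The list is sorted by
  // chunk start time, as guaranteed above.
  //
  //   Path seriesPath = new Path("root.sg1.d1", "s1", true);
  //   for (ChunkMetadata cm : reader.getChunkMetadataList(seriesPath)) {
  //     Chunk chunk = reader.readMemChunk(cm);
  //     // decode chunk ...
  //   }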

  /**
   * Get AlignedChunkMetadata of sensors under one device.
   *
   * @param device device name
   */
  public List<AlignedChunkMetadata> getAlignedChunkMetadata(String device) throws IOException {
    readFileMetadata();
    MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex();
    Pair<MetadataIndexEntry, Long> metadataIndexPair =
        getMetadataAndEndOffset(deviceMetadataIndexNode, device, true, true);
    if (metadataIndexPair == null) {
      throw new IOException("Device {" + device + "} is not in tsFileMetaData");
    }
    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
    MetadataIndexNode metadataIndexNode;
    TimeseriesMetadata firstTimeseriesMetadata;
    try {
      // next layer MeasurementNode of the specific DeviceNode
      metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
    } catch (Exception e) {
      logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file);
      throw e;
    }
    firstTimeseriesMetadata = tryToGetFirstTimeseriesMetadata(metadataIndexNode);
    if (firstTimeseriesMetadata == null) {
      throw new IOException("Timeseries of device {" + device + "} are not aligned");
    }

    Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new TreeMap<>();
    List<MetadataIndexEntry> metadataIndexEntryList = metadataIndexNode.getChildren();

    for (int i = 0; i < metadataIndexEntryList.size(); i++) {
      MetadataIndexEntry metadataIndexEntry = metadataIndexEntryList.get(i);
      long endOffset = metadataIndexNode.getEndOffset();
      if (i != metadataIndexEntryList.size() - 1) {
        endOffset = metadataIndexEntryList.get(i + 1).getOffset();
      }
      buffer = readData(metadataIndexEntry.getOffset(), endOffset);
      if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
        List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
        while (buffer.hasRemaining()) {
          timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true));
        }
        timeseriesMetadataMap
            .computeIfAbsent(device, k -> new ArrayList<>())
            .addAll(timeseriesMetadataList);
      } else {
        generateMetadataIndex(
            metadataIndexEntry,
            buffer,
            device,
            metadataIndexNode.getNodeType(),
            timeseriesMetadataMap,
            true);
      }
    }

    if (timeseriesMetadataMap.values().size() != 1) {
      throw new IOException(
          String.format(
              "Error when reading timeseriesMetadata of device %s in file %s: there should be only one timeseriesMetadataList in one device, actual: %d",
              device, file, timeseriesMetadataMap.values().size()));
    }

    List<TimeseriesMetadata> timeseriesMetadataList =
        timeseriesMetadataMap.values().iterator().next();
    TimeseriesMetadata timeseriesMetadata = timeseriesMetadataList.get(0);
    List<TimeseriesMetadata> valueTimeseriesMetadataList = new ArrayList<>();

    for (int i = 1; i < timeseriesMetadataList.size(); i++) {
      valueTimeseriesMetadataList.add(timeseriesMetadataList.get(i));
    }

    AlignedTimeSeriesMetadata alignedTimeSeriesMetadata =
        new AlignedTimeSeriesMetadata(timeseriesMetadata, valueTimeseriesMetadataList);
    List<AlignedChunkMetadata> chunkMetadataList = new ArrayList<>();
    for (IChunkMetadata chunkMetadata : readIChunkMetaDataList(alignedTimeSeriesMetadata)) {
      chunkMetadataList.add((AlignedChunkMetadata) chunkMetadata);
    }
    return chunkMetadataList;
  }
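
  // For an aligned device, the single timeseriesMetadataList above is laid out as
  // [time column, value column 1, value column 2, ...]; the time column's measurement id is the
  // empty string, so it sorts first, which is why index 0 becomes the time TimeseriesMetadata and
  // the rest become value columns. A minimal call sketch (the device name is a made-up example):
  //
  //   List<AlignedChunkMetadata> aligned = reader.getAlignedChunkMetadata("root.sg1.alignedDev");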

  /**
   * Get ChunkMetaDatas in the given TimeseriesMetaData.
   *
   * @return List of ChunkMetaData
   */
  public List<ChunkMetadata> readChunkMetaDataList(TimeseriesMetadata timeseriesMetaData)
      throws IOException {
    return timeseriesMetaData.getChunkMetadataList().stream()
        .map(chunkMetadata -> (ChunkMetadata) chunkMetadata)
        .collect(Collectors.toList());
  }

  // This method is only used for TsFile
  public List<IChunkMetadata> readIChunkMetaDataList(ITimeSeriesMetadata timeseriesMetaData) {
    if (timeseriesMetaData instanceof AlignedTimeSeriesMetadata) {
      return new ArrayList<>(
          ((AlignedTimeSeriesMetadata) timeseriesMetaData).getChunkMetadataList());
    } else {
      return new ArrayList<>(((TimeseriesMetadata) timeseriesMetaData).getChunkMetadataList());
    }
  }

  /**
   * Get all measurements in this file.
   *
   * @return measurement -> datatype
   */
  public Map<String, TSDataType> getAllMeasurements() throws IOException {
    Map<String, TSDataType> result = new HashMap<>();
    for (String device : getAllDevices()) {
      Map<String, TimeseriesMetadata> timeseriesMetadataMap = readDeviceMetadata(device);
      for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataMap.values()) {
        result.put(timeseriesMetadata.getMeasurementId(), timeseriesMetadata.getTsDataType());
      }
    }
    return result;
  }

  /**
   * Get the data types of all measurements in this file.
   *
   * @return full path -> datatype
   */
  public Map<String, TSDataType> getFullPathDataTypeMap() throws IOException {
    final Map<String, TSDataType> result = new HashMap<>();
    for (final String device : getAllDevices()) {
      Map<String, TimeseriesMetadata> timeseriesMetadataMap = readDeviceMetadata(device);
      for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataMap.values()) {
        result.put(
            device + TsFileConstant.PATH_SEPARATOR + timeseriesMetadata.getMeasurementId(),
            timeseriesMetadata.getTsDataType());
      }
    }
    return result;
  }

  public Map<String, List<String>> getDeviceMeasurementsMap() throws IOException {
    Map<String, List<String>> result = new HashMap<>();
    for (String device : getAllDevices()) {
      Map<String, TimeseriesMetadata> timeseriesMetadataMap = readDeviceMetadata(device);
      for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataMap.values()) {
        result
            .computeIfAbsent(device, d -> new ArrayList<>())
            .add(timeseriesMetadata.getMeasurementId());
      }
    }
    return result;
  }
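
  // Illustrative sketch: dumping the schema of a whole file. Note that getAllMeasurements keys by
  // bare measurement name, so same-named measurements under different devices collapse into one
  // entry; getDeviceMeasurementsMap and getFullPathDataTypeMap keep them separate.
  //
  //   for (Map.Entry<String, List<String>> e : reader.getDeviceMeasurementsMap().entrySet()) {
  //     System.out.println(e.getKey() + " -> " + e.getValue());
  //   }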

  /**
   * Get the names of devices which have valid chunks in [start, end).
   *
   * @param start start of the partition
   * @param end end of the partition
   * @return device names in range
   */
  public List<String> getDeviceNameInRange(long start, long end) throws IOException {
    List<String> res = new ArrayList<>();
    for (String device : getAllDevices()) {
      Map<String, List<ChunkMetadata>> seriesMetadataMap = readChunkMetadataInDevice(device);
      if (hasDataInPartition(seriesMetadataMap, start, end)) {
        res.add(device);
      }
    }
    return res;
  }

  /**
   * Get the metadata index node.
   *
   * @param startOffset start read offset
   * @param endOffset end read offset
   * @return MetadataIndexNode
   */
  public MetadataIndexNode getMetadataIndexNode(long startOffset, long endOffset)
      throws IOException {
    return MetadataIndexNode.deserializeFrom(readData(startOffset, endOffset));
  }

  /**
   * Check if the device has at least one chunk in this partition.
   *
   * @param seriesMetadataMap chunkMetaDataList of each measurement
   * @param start the start position of the space partition
   * @param end the end position of the space partition
   */
  private boolean hasDataInPartition(
      Map<String, List<ChunkMetadata>> seriesMetadataMap, long start, long end) {
    for (List<ChunkMetadata> chunkMetadataList : seriesMetadataMap.values()) {
      for (ChunkMetadata chunkMetadata : chunkMetadataList) {
        LocateStatus location =
            MetadataQuerierByFileImpl.checkLocateStatus(chunkMetadata, start, end);
        if (location == LocateStatus.IN) {
          return true;
        }
      }
    }
    return false;
  }

  /**
   * The location of a chunkGroupMetaData with respect to a space partition constraint.
   *
   * <p>IN - the middle point of the chunkGroupMetaData is located in the current space partition.
   * BEFORE - the middle point of the chunkGroupMetaData is located before the current space
   * partition. AFTER - the middle point of the chunkGroupMetaData is located after the current
   * space partition.
   */
  public enum LocateStatus {
    IN,
    BEFORE,
    AFTER
  }

  public long getMinPlanIndex() {
    return minPlanIndex;
  }

  public long getMaxPlanIndex() {
    return maxPlanIndex;
  }

  /**
   * @return An iterator of linked hashmaps ( measurement -> chunk metadata list ). When traversing
   *     the linked hashmap, you will get chunk metadata lists according to the lexicographic order
   *     of the measurements. The first measurement of the linked hashmap of each iteration is
   *     always larger than the last measurement of the linked hashmap of the previous iteration in
   *     lexicographic order.
   */
  public Iterator<Map<String, List<ChunkMetadata>>> getMeasurementChunkMetadataListMapIterator(
      String device) throws IOException {
    readFileMetadata();

    MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
    Pair<MetadataIndexEntry, Long> metadataIndexPair =
        getMetadataAndEndOffset(metadataIndexNode, device, true, true);

    if (metadataIndexPair == null) {
      return new Iterator<Map<String, List<ChunkMetadata>>>() {

        @Override
        public boolean hasNext() {
          return false;
        }

        @Override
        public LinkedHashMap<String, List<ChunkMetadata>> next() {
          throw new NoSuchElementException();
        }
      };
    }

    Queue<Pair<Long, Long>> queue = new LinkedList<>();
    ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
    collectEachLeafMeasurementNodeOffsetRange(buffer, queue);

    return new Iterator<Map<String, List<ChunkMetadata>>>() {

      @Override
      public boolean hasNext() {
        return !queue.isEmpty();
      }

      @Override
      public LinkedHashMap<String, List<ChunkMetadata>> next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        Pair<Long, Long> startEndPair = queue.remove();
        LinkedHashMap<String, List<ChunkMetadata>> measurementChunkMetadataList =
            new LinkedHashMap<>();
        try {
          List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
          ByteBuffer nextBuffer = readData(startEndPair.left, startEndPair.right);
          while (nextBuffer.hasRemaining()) {
            timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(nextBuffer, true));
          }
          for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
            List<ChunkMetadata> list =
                measurementChunkMetadataList.computeIfAbsent(
                    timeseriesMetadata.getMeasurementId(), m -> new ArrayList<>());
            for (IChunkMetadata chunkMetadata : timeseriesMetadata.getChunkMetadataList()) {
              list.add((ChunkMetadata) chunkMetadata);
            }
          }
          return measurementChunkMetadataList;
        } catch (IOException e) {
          throw new TsFileRuntimeException(
              "Error occurred while reading a time series metadata block.");
        }
      }
    };
  }
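
  // Illustrative sketch (the device name is a made-up example): each call to next() materializes
  // the chunk metadata of one leaf measurement node only, so memory stays bounded by the leaf
  // size rather than the whole device.
  //
  //   Iterator<Map<String, List<ChunkMetadata>>> it =
  //       reader.getMeasurementChunkMetadataListMapIterator("root.sg1.d1");
  //   while (it.hasNext()) {
  //     for (Map.Entry<String, List<ChunkMetadata>> e : it.next().entrySet()) {
  //       // e.getKey() is the measurement, e.getValue() its chunk metadata in file order
  //     }
  //   }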
2242

2243
  private void collectEachLeafMeasurementNodeOffsetRange(
2244
      ByteBuffer buffer, Queue<Pair<Long, Long>> queue) throws IOException {
2245
    try {
2246
      final MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
1✔
2247
      final MetadataIndexNodeType metadataIndexNodeType = metadataIndexNode.getNodeType();
1✔
2248
      final int metadataIndexListSize = metadataIndexNode.getChildren().size();
1✔
2249
      for (int i = 0; i < metadataIndexListSize; ++i) {
1✔
2250
        long startOffset = metadataIndexNode.getChildren().get(i).getOffset();
1✔
2251
        long endOffset = metadataIndexNode.getEndOffset();
1✔
2252
        if (i != metadataIndexListSize - 1) {
1✔
2253
          endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset();
1✔
2254
        }
2255
        if (metadataIndexNodeType.equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
1✔
2256
          queue.add(new Pair<>(startOffset, endOffset));
1✔
2257
          continue;
1✔
2258
        }
2259
        collectEachLeafMeasurementNodeOffsetRange(readData(startOffset, endOffset), queue);
1✔
2260
      }
2261
    } catch (Exception e) {
×
2262
      logger.error(
×
2263
          "Error occurred while collecting offset ranges of measurement nodes of file {}", file);
2264
      throw e;
×
2265
    }
1✔
2266
  }
1✔
2267

2268
  /**
2269
   * Read MetadataIndexNode by start and end offset.
2270
   *
2271
   * @param start the start offset of the MetadataIndexNode
2272
   * @param end the end offset of the MetadataIndexNode
2273
   * @return MetadataIndexNode
2274
   * @throws IOException IOException
2275
   */
2276
  public MetadataIndexNode readMetadataIndexNode(long start, long end) throws IOException {
2277
    return MetadataIndexNode.deserializeFrom(readData(start, end));
1✔
2278
  }
2279

2280
  @Override
2281
  public boolean equals(Object o) {
2282
    if (this == o) {
×
2283
      return true;
×
2284
    }
2285
    if (o == null || getClass() != o.getClass()) {
×
2286
      return false;
×
2287
    }
2288
    TsFileSequenceReader reader = (TsFileSequenceReader) o;
×
2289
    return file.equals(reader.file);
×
2290
  }
2291

2292
  @Override
2293
  public int hashCode() {
2294
    return Objects.hash(file);
1✔
2295
  }
2296
}