• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9733

pending completion
#9733

push

travis_ci

web-flow
[To rel/1.2] Add compression and encoding type check for FastCompactionPerformer (#10712)

32 of 32 new or added lines in 4 files covered. (100.0%)

79232 of 165563 relevant lines covered (47.86%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.09
/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
package org.apache.iotdb.tsfile.write.writer;
20

21
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
22
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
23
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
24
import org.apache.iotdb.tsfile.file.MetaMarker;
25
import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
26
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
27
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
28
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
29
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
30
import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry;
31
import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
32
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
33
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
34
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
35
import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
36
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
37
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
38
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
39
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
40
import org.apache.iotdb.tsfile.read.common.Chunk;
41
import org.apache.iotdb.tsfile.read.common.Path;
42
import org.apache.iotdb.tsfile.utils.BloomFilter;
43
import org.apache.iotdb.tsfile.utils.BytesUtils;
44
import org.apache.iotdb.tsfile.utils.Pair;
45
import org.apache.iotdb.tsfile.utils.PublicBAOS;
46
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
47
import org.apache.iotdb.tsfile.write.writer.tsmiterator.TSMIterator;
48

49
import org.apache.commons.io.FileUtils;
50
import org.slf4j.Logger;
51
import org.slf4j.LoggerFactory;
52

53
import java.io.File;
54
import java.io.FileOutputStream;
55
import java.io.IOException;
56
import java.io.Serializable;
57
import java.util.ArrayDeque;
58
import java.util.ArrayList;
59
import java.util.HashMap;
60
import java.util.Iterator;
61
import java.util.LinkedList;
62
import java.util.List;
63
import java.util.Map;
64
import java.util.Queue;
65
import java.util.TreeMap;
66

67
import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.addCurrentIndexNodeToQueue;
68
import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.checkAndBuildLevelIndex;
69
import static org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor.generateRootNode;
70

71
/**
72
 * TsFileIOWriter is used to construct metadata and write data stored in memory to output stream.
73
 */
74
public class TsFileIOWriter implements AutoCloseable {

  // File magic, serialized at file start and again immediately before the footer.
  protected static final byte[] MAGIC_STRING_BYTES;
  // TsFile format version byte, written right after the leading magic string.
  public static final byte VERSION_NUMBER_BYTE;
  protected static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
  private static final Logger logger = LoggerFactory.getLogger(TsFileIOWriter.class);
  // Dedicated logger tracing writer open/close lifecycle events.
  private static final Logger resourceLogger = LoggerFactory.getLogger("FileMonitor");

  static {
    MAGIC_STRING_BYTES = BytesUtils.stringToBytes(TSFileConfig.MAGIC_STRING);
    VERSION_NUMBER_BYTE = TSFileConfig.VERSION_NUMBER;
  }

  protected TsFileOutput out;
  protected boolean canWrite = true;
  protected File file;

  // current flushed Chunk
  protected ChunkMetadata currentChunkMetadata;
  // current flushed ChunkGroup
  protected List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
  // all flushed ChunkGroups
  protected List<ChunkGroupMetadata> chunkGroupMetadataList = new ArrayList<>();

  // position remembered by mark() and restored (via truncate) by reset()
  private long markedPosition;
  private String currentChunkGroupDeviceId;

  // the two longs marks the index range of operations in current MemTable
  // and are serialized after MetaMarker.OPERATION_INDEX_RANGE to recover file-level range
  private long minPlanIndex;
  private long maxPlanIndex;

  // the following variable is used for memory control
  protected long maxMetadataSize;
  protected long currentChunkMetadataSize = 0L;
  // temp file that chunk metadata is spilled to when memory control is enabled
  protected File chunkMetadataTempFile;
  protected LocalTsFileOutput tempOutput;
  protected volatile boolean hasChunkMetadataInDisk = false;
  // record the total num of path in order to make bloom filter
  protected int pathCount = 0;
  protected boolean enableMemoryControl = false;
  // last series path spilled to the temp file; used to detect device/series boundaries
  private Path lastSerializePath = null;
  // end offsets (within the temp metadata file) of each device's chunk-metadata section
  protected LinkedList<Long> endPosInCMTForDevice = new LinkedList<>();
  private volatile int chunkMetadataCount = 0;
  public static final String CHUNK_METADATA_TEMP_FILE_SUFFIX = ".meta";

  /** empty construct function. */
  protected TsFileIOWriter() {}
122

123
  /**
   * for writing a new tsfile.
   *
   * @param file be used to output written data
   * @throws IOException if I/O error occurs
   */
  public TsFileIOWriter(File file) throws IOException {
    this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), false);
    this.file = file;
    if (resourceLogger.isDebugEnabled()) {
      resourceLogger.debug("{} writer is opened.", file.getName());
    }
    // write the leading magic string and version number immediately
    startFile();
  }

  /**
   * for writing a new tsfile.
   *
   * @param output be used to output written data
   * @throws IOException if I/O error occurs
   */
  public TsFileIOWriter(TsFileOutput output) throws IOException {
    this.out = output;
    startFile();
  }

  /** for test only. Note: does NOT write the file header. */
  public TsFileIOWriter(TsFileOutput output, boolean test) {
    this.out = output;
  }

  /**
   * for write with memory control.
   *
   * @param file be used to output written data
   * @param enableMemoryControl whether to spill chunk metadata to a temp file when it grows large
   * @param maxMetadataSize in-memory chunk metadata size threshold that triggers spilling
   * @throws IOException if I/O error occurs
   */
  public TsFileIOWriter(File file, boolean enableMemoryControl, long maxMetadataSize)
      throws IOException {
    this(file);
    this.enableMemoryControl = enableMemoryControl;
    this.maxMetadataSize = maxMetadataSize;
    chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
  }
161

162
  /**
   * Writes given bytes to output stream. This method is called when total memory size exceeds the
   * chunk group size threshold.
   *
   * @param bytes - data of several pages which has been packed
   * @throws IOException if an I/O error occurs.
   */
  public void writeBytesToStream(PublicBAOS bytes) throws IOException {
    bytes.writeTo(out.wrapAsStream());
  }

  /** Write the file header: magic string followed by the format version byte. */
  protected void startFile() throws IOException {
    out.write(MAGIC_STRING_BYTES);
    out.write(VERSION_NUMBER_BYTE);
  }
177

178
  /**
   * Begin a new chunk group for the given device by serializing a ChunkGroupHeader.
   *
   * @param deviceId device of the chunk group about to be written
   * @return serialized size of the chunk group header
   * @throws IOException if I/O error occurs
   */
  public int startChunkGroup(String deviceId) throws IOException {
    this.currentChunkGroupDeviceId = deviceId;
    if (logger.isDebugEnabled()) {
      logger.debug("start chunk group:{}, file position {}", deviceId, out.getPosition());
    }
    chunkMetadataList = new ArrayList<>();
    ChunkGroupHeader chunkGroupHeader = new ChunkGroupHeader(currentChunkGroupDeviceId);
    return chunkGroupHeader.serializeTo(out.wrapAsStream());
  }

  /**
   * end chunk and write some log. If there is no data in the chunk group, nothing will be flushed.
   */
  public void endChunkGroup() throws IOException {
    if (currentChunkGroupDeviceId == null || chunkMetadataList.isEmpty()) {
      return;
    }
    chunkGroupMetadataList.add(
        new ChunkGroupMetadata(currentChunkGroupDeviceId, chunkMetadataList));
    // reset per-chunk-group state; the list is owned by the ChunkGroupMetadata added above
    currentChunkGroupDeviceId = null;
    chunkMetadataList = null;
    out.flush();
  }

  /**
   * For TsFileReWriteTool / UpgradeTool. Use this method to determine if needs to start a
   * ChunkGroup.
   *
   * @return isWritingChunkGroup
   */
  public boolean isWritingChunkGroup() {
    return currentChunkGroupDeviceId != null;
  }
211

212
  /**
   * start a {@linkplain ChunkMetadata ChunkMetaData}.
   *
   * @param measurementId - measurementId of this time series
   * @param compressionCodecName - compression name of this time series
   * @param tsDataType - data type
   * @param encodingType - encoding of this time series
   * @param statistics - Chunk statistics
   * @param dataSize - the serialized size of all pages
   * @param numOfPages - number of pages in this chunk
   * @param mask - 0x80 for time chunk, 0x40 for value chunk, 0x00 for common chunk
   * @throws IOException if I/O error occurs
   */
  public void startFlushChunk(
      String measurementId,
      CompressionType compressionCodecName,
      TSDataType tsDataType,
      TSEncoding encodingType,
      Statistics<? extends Serializable> statistics,
      int dataSize,
      int numOfPages,
      int mask)
      throws IOException {

    // the metadata records where this chunk starts in the file
    currentChunkMetadata =
        new ChunkMetadata(measurementId, tsDataType, out.getPosition(), statistics);
    currentChunkMetadata.setMask((byte) mask);

    ChunkHeader header =
        new ChunkHeader(
            measurementId,
            dataSize,
            tsDataType,
            compressionCodecName,
            encodingType,
            numOfPages,
            mask);
    header.serializeTo(out.wrapAsStream());
  }
249

250
  /** Write a whole chunk in another file into this file. Providing fast merge for IoTDB. */
  public void writeChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws IOException {
    ChunkHeader chunkHeader = chunk.getHeader();
    // rebuild the metadata so its offset points into THIS file, reusing the given statistics
    currentChunkMetadata =
        new ChunkMetadata(
            chunkHeader.getMeasurementID(),
            chunkHeader.getDataType(),
            out.getPosition(),
            chunkMetadata.getStatistics());
    chunkHeader.serializeTo(out.wrapAsStream());
    out.write(chunk.getData());
    endCurrentChunk();
    if (logger.isDebugEnabled()) {
      logger.debug(
          "end flushing a chunk:{}, totalvalue:{}",
          chunkHeader.getMeasurementID(),
          chunkMetadata.getNumOfPoints());
    }
  }

  /** Write an empty value chunk into file directly. Only used for aligned timeseries. */
  public void writeEmptyValueChunk(
      String measurementId,
      CompressionType compressionType,
      TSDataType tsDataType,
      TSEncoding encodingType,
      Statistics<? extends Serializable> statistics)
      throws IOException {
    currentChunkMetadata =
        new ChunkMetadata(measurementId, tsDataType, out.getPosition(), statistics);
    currentChunkMetadata.setMask(TsFileConstant.VALUE_COLUMN_MASK);
    // a header with zero data size and zero pages represents the empty value column
    ChunkHeader emptyChunkHeader =
        new ChunkHeader(
            measurementId,
            0,
            tsDataType,
            compressionType,
            encodingType,
            0,
            TsFileConstant.VALUE_COLUMN_MASK);
    emptyChunkHeader.serializeTo(out.wrapAsStream());
    endCurrentChunk();
  }

  /** Write a whole chunk (carrying its own statistics) from another file into this file. */
  public void writeChunk(Chunk chunk) throws IOException {
    ChunkHeader chunkHeader = chunk.getHeader();
    currentChunkMetadata =
        new ChunkMetadata(
            chunkHeader.getMeasurementID(),
            chunkHeader.getDataType(),
            out.getPosition(),
            chunk.getChunkStatistic());
    chunkHeader.serializeTo(out.wrapAsStream());
    out.write(chunk.getData());
    endCurrentChunk();
  }
306

307
  /** end chunk and write some log. */
  public void endCurrentChunk() {
    if (enableMemoryControl) {
      // track in-memory metadata footprint so checkMetadataSizeAndMayFlush can spill to disk
      this.currentChunkMetadataSize += currentChunkMetadata.calculateRamSize();
    }
    chunkMetadataCount++;
    chunkMetadataList.add(currentChunkMetadata);
    currentChunkMetadata = null;
  }
316

317
  /**
   * write {@linkplain TsFileMetadata TSFileMetaData} to output stream and close it.
   *
   * @throws IOException if I/O error occurs
   */
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
  public void endFile() throws IOException {
    long startTime = System.currentTimeMillis();
    checkInMemoryPathCount();
    // serialize all timeseries metadata, the index tree, bloom filter and footer
    readChunkMetadataAndConstructIndexTree();

    long footerIndex = out.getPosition();
    if (logger.isDebugEnabled()) {
      logger.debug("start to flush the footer,file pos:{}", footerIndex);
    }

    // write magic string
    out.write(MAGIC_STRING_BYTES);

    // close file
    out.close();
    if (resourceLogger.isDebugEnabled() && file != null) {
      resourceLogger.debug("{} writer is closed.", file.getName());
    }
    if (file != null) {
      // clean up the chunk-metadata temp file used by memory control, if it was created
      File chunkMetadataFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
      if (chunkMetadataFile.exists()) {
        FileUtils.delete(chunkMetadataFile);
      }
    }
    canWrite = false;
    long cost = System.currentTimeMillis() - startTime;
    logger.info("Time for flushing metadata is {} ms", cost);
  }
351

352
  private void checkInMemoryPathCount() {
353
    for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
1✔
354
      pathCount += chunkGroupMetadata.getChunkMetadataList().size();
1✔
355
    }
1✔
356
  }
1✔
357

358
  /**
   * Read back all chunk metadata (from memory, or from the temp file when memory control spilled
   * it to disk), serialize each series' TimeseriesMetadata, build the metadata index tree plus the
   * bloom filter, and finally serialize the TsFileMetadata footer and its size.
   *
   * @throws IOException if I/O error occurs
   */
  private void readChunkMetadataAndConstructIndexTree() throws IOException {
    if (tempOutput != null) {
      tempOutput.close();
    }
    long metaOffset = out.getPosition();

    // serialize the SEPARATOR of MetaData
    ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());

    // iterate series in sorted order, from the spill file or from memory
    TSMIterator tsmIterator =
        hasChunkMetadataInDisk
            ? TSMIterator.getTSMIteratorInDisk(
                chunkMetadataTempFile, chunkGroupMetadataList, endPosInCMTForDevice)
            : TSMIterator.getTSMIteratorInMemory(chunkGroupMetadataList);
    Map<String, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>();
    Queue<MetadataIndexNode> measurementMetadataIndexQueue = new ArrayDeque<>();
    String currentDevice = null;
    String prevDevice = null;
    Path currentPath = null;
    MetadataIndexNode currentIndexNode =
        new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
    // NOTE(review): this local shadows the static `config` field of the same name
    TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
    int seriesIdxForCurrDevice = 0;
    BloomFilter filter =
        BloomFilter.getEmptyBloomFilter(
            TSFileDescriptor.getInstance().getConfig().getBloomFilterErrorRate(), pathCount);

    // NOTE(review): indexCount is only ever incremented below, never read — candidate for removal
    int indexCount = 0;
    while (tsmIterator.hasNext()) {
      // read in all chunk metadata of one series
      // construct the timeseries metadata for this series
      Pair<Path, TimeseriesMetadata> timeseriesMetadataPair = tsmIterator.next();
      TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
      currentPath = timeseriesMetadataPair.left;

      indexCount++;
      // build bloom filter
      filter.add(currentPath.getFullPath());
      // construct the index tree node for the series

      currentDevice = currentPath.getDevice();
      if (!currentDevice.equals(prevDevice)) {
        if (prevDevice != null) {
          // finish off the measurement index of the previous device
          addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
          deviceMetadataIndexMap.put(
              prevDevice,
              generateRootNode(
                  measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
          currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
        }
        measurementMetadataIndexQueue = new ArrayDeque<>();
        seriesIdxForCurrDevice = 0;
      }

      // add an index entry at every maxDegreeOfIndexNode-th series of the device
      if (seriesIdxForCurrDevice % config.getMaxDegreeOfIndexNode() == 0) {
        if (currentIndexNode.isFull()) {
          addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
          currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
        }
        if (timeseriesMetadata.getTsDataType() != TSDataType.VECTOR) {
          currentIndexNode.addEntry(
              new MetadataIndexEntry(currentPath.getMeasurement(), out.getPosition()));
        } else {
          // VECTOR (aligned time column) is indexed under an empty measurement name
          currentIndexNode.addEntry(new MetadataIndexEntry("", out.getPosition()));
        }
      }

      prevDevice = currentDevice;
      seriesIdxForCurrDevice++;
      // serialize the timeseries metadata to file
      timeseriesMetadata.serializeTo(out.wrapAsStream());
    }

    // close out the index of the last device
    addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
    if (prevDevice != null) {
      deviceMetadataIndexMap.put(
          prevDevice,
          generateRootNode(
              measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
    }

    MetadataIndexNode metadataIndex = checkAndBuildLevelIndex(deviceMetadataIndexMap, out);

    TsFileMetadata tsFileMetadata = new TsFileMetadata();
    tsFileMetadata.setMetadataIndex(metadataIndex);
    tsFileMetadata.setMetaOffset(metaOffset);

    int size = tsFileMetadata.serializeTo(out.wrapAsStream());
    size += tsFileMetadata.serializeBloomFilter(out.wrapAsStream(), filter);

    // write TsFileMetaData size
    ReadWriteIOUtils.write(size, out.wrapAsStream());
  }
451

452
  /**
   * get the length of normal OutputStream.
   *
   * @return - length of normal OutputStream
   * @throws IOException if I/O error occurs
   */
  public long getPos() throws IOException {
    return out.getPosition();
  }

  // device -> ChunkMetadataList
  /**
   * Group all flushed chunk metadata by device id.
   *
   * @return device id -> all of its flushed ChunkMetadata
   */
  public Map<String, List<ChunkMetadata>> getDeviceChunkMetadataMap() {
    Map<String, List<ChunkMetadata>> deviceChunkMetadataMap = new HashMap<>();

    for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
      deviceChunkMetadataMap
          .computeIfAbsent(chunkGroupMetadata.getDevice(), k -> new ArrayList<>())
          .addAll(chunkGroupMetadata.getChunkMetadataList());
    }
    return deviceChunkMetadataMap;
  }

  /** @return whether this writer can still accept data (false once closed or ended) */
  public boolean canWrite() {
    return canWrite;
  }

  /** Remember the current output position; reset() truncates back to it. */
  public void mark() throws IOException {
    markedPosition = getPos();
  }

  /** Truncate the output back to the position recorded by mark(). */
  public void reset() throws IOException {
    out.truncate(markedPosition);
  }

  /**
   * close the outputStream or file channel without writing FileMetadata. This is just used for
   * Testing.
   */
  public void close() throws IOException {
    canWrite = false;
    out.close();
    if (tempOutput != null) {
      this.tempOutput.close();
    }
  }

  /** Test helper: write a bare SEPARATOR marker byte. */
  void writeSeparatorMaskForTest() throws IOException {
    out.write(new byte[] {MetaMarker.SEPARATOR});
  }

  /** Test helper: write a bare CHUNK_GROUP_HEADER marker byte. */
  void writeChunkGroupMarkerForTest() throws IOException {
    out.write(new byte[] {MetaMarker.CHUNK_GROUP_HEADER});
  }

  /** @return the file being written, or null when constructed from a raw TsFileOutput */
  public File getFile() {
    return file;
  }

  public void setFile(File file) {
    this.file = file;
  }
513

514
  /** Remove such ChunkMetadata that its startTime is not in chunkStartTimes */
  public void filterChunks(Map<Path, List<Long>> chunkStartTimes) {
    // per-path cursor into that path's expected start-time list
    Map<Path, Integer> startTimeIdxes = new HashMap<>();
    chunkStartTimes.forEach((p, t) -> startTimeIdxes.put(p, 0));

    Iterator<ChunkGroupMetadata> chunkGroupMetaDataIterator = chunkGroupMetadataList.iterator();
    while (chunkGroupMetaDataIterator.hasNext()) {
      ChunkGroupMetadata chunkGroupMetaData = chunkGroupMetaDataIterator.next();
      String deviceId = chunkGroupMetaData.getDevice();
      int chunkNum = chunkGroupMetaData.getChunkMetadataList().size();
      Iterator<ChunkMetadata> chunkMetaDataIterator =
          chunkGroupMetaData.getChunkMetadataList().iterator();
      while (chunkMetaDataIterator.hasNext()) {
        IChunkMetadata chunkMetaData = chunkMetaDataIterator.next();
        Path path = new Path(deviceId, chunkMetaData.getMeasurementUid(), true);
        // NOTE(review): assumes every written path has an entry in chunkStartTimes; a missing
        // key would unbox null here and throw NPE — confirm against callers.
        int startTimeIdx = startTimeIdxes.get(path);

        List<Long> pathChunkStartTimes = chunkStartTimes.get(path);
        // keep the chunk only if its start time matches the next expected start time
        boolean chunkValid =
            startTimeIdx < pathChunkStartTimes.size()
                && pathChunkStartTimes.get(startTimeIdx) == chunkMetaData.getStartTime();
        if (!chunkValid) {
          chunkMetaDataIterator.remove();
          chunkNum--;
        } else {
          startTimeIdxes.put(path, startTimeIdx + 1);
        }
      }
      // drop the chunk group entirely if all of its chunks were filtered out
      if (chunkNum == 0) {
        chunkGroupMetaDataIterator.remove();
      }
    }
  }
547

548
  /**
   * Serialize the min/max plan index range after an OPERATION_INDEX_RANGE marker, then flush.
   *
   * @throws IOException if I/O error occurs
   */
  public void writePlanIndices() throws IOException {
    ReadWriteIOUtils.write(MetaMarker.OPERATION_INDEX_RANGE, out.wrapAsStream());
    ReadWriteIOUtils.write(minPlanIndex, out.wrapAsStream());
    ReadWriteIOUtils.write(maxPlanIndex, out.wrapAsStream());
    out.flush();
  }

  /**
   * Truncate the underlying output to the given length.
   *
   * @param offset new length of the output
   * @throws IOException if I/O error occurs
   */
  public void truncate(long offset) throws IOException {
    out.truncate(offset);
  }

  /**
   * this function is only for Test.
   *
   * @return TsFileOutput
   */
  public TsFileOutput getIOWriterOut() {
    return out;
  }

  /**
   * This method should be called before flushing chunk group metadata list, otherwise, it will
   * return null.
   */
  public List<ChunkMetadata> getChunkMetadataListOfCurrentDeviceInMemory() {
    return chunkMetadataList;
  }
575

576
  /**
577
   * this function is for Upgrade Tool and Split Tool.
578
   *
579
   * @return DeviceTimeseriesMetadataMap
580
   */
581
  public Map<String, List<TimeseriesMetadata>> getDeviceTimeseriesMetadataMap() {
582
    Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap = new TreeMap<>();
1✔
583
    Map<String, Map<String, List<IChunkMetadata>>> chunkMetadataMap = new TreeMap<>();
1✔
584
    for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
1✔
585
      for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
1✔
586
        chunkMetadataMap
1✔
587
            .computeIfAbsent(chunkGroupMetadata.getDevice(), x -> new TreeMap<>())
1✔
588
            .computeIfAbsent(chunkMetadata.getMeasurementUid(), x -> new ArrayList<>())
1✔
589
            .add(chunkMetadata);
1✔
590
      }
1✔
591
    }
1✔
592
    for (String device : chunkMetadataMap.keySet()) {
1✔
593
      Map<String, List<IChunkMetadata>> seriesToChunkMetadataMap = chunkMetadataMap.get(device);
1✔
594
      for (Map.Entry<String, List<IChunkMetadata>> entry : seriesToChunkMetadataMap.entrySet()) {
1✔
595
        try {
596
          deviceTimeseriesMetadataMap
1✔
597
              .computeIfAbsent(device, x -> new ArrayList<>())
1✔
598
              .add(TSMIterator.constructOneTimeseriesMetadata(entry.getKey(), entry.getValue()));
1✔
599
        } catch (IOException e) {
×
600
          logger.error("Failed to get device timeseries metadata map", e);
×
601
          return null;
×
602
        }
1✔
603
      }
1✔
604
    }
1✔
605

606
    return deviceTimeseriesMetadataMap;
1✔
607
  }
608

609
  // plan-index accessors; the [min, max] range is serialized by writePlanIndices()

  public long getMinPlanIndex() {
    return minPlanIndex;
  }

  public void setMinPlanIndex(long minPlanIndex) {
    this.minPlanIndex = minPlanIndex;
  }

  public long getMaxPlanIndex() {
    return maxPlanIndex;
  }

  public void setMaxPlanIndex(long maxPlanIndex) {
    this.maxPlanIndex = maxPlanIndex;
  }
624

625
  /**
626
   * Check if the size of chunk metadata in memory is greater than the given threshold. If so, the
627
   * chunk metadata will be written to a temp files. <b>Notice! If you are writing a aligned device
628
   * in row, you should make sure all data of current writing device has been written before this
629
   * method is called. For writing not aligned series or writing aligned series in column, you
630
   * should make sure that all data of one series is written before you call this function.</b>
631
   *
632
   * @throws IOException
633
   */
634
  public int checkMetadataSizeAndMayFlush() throws IOException {
635
    // This function should be called after all data of an aligned device has been written
636
    if (enableMemoryControl && currentChunkMetadataSize > maxMetadataSize) {
1✔
637
      try {
638
        if (logger.isDebugEnabled()) {
1✔
639
          logger.debug(
×
640
              "Flushing chunk metadata, total size is {}, count is {}, avg size is {}",
641
              currentChunkMetadataSize,
×
642
              chunkMetadataCount,
×
643
              currentChunkMetadataSize / chunkMetadataCount);
×
644
        }
645
        return sortAndFlushChunkMetadata();
1✔
646
      } catch (IOException e) {
×
647
        logger.error("Meets exception when flushing metadata to temp file for {}", file, e);
×
648
        throw e;
×
649
      }
650
    } else {
651
      return 0;
1✔
652
    }
653
  }
654

655
  /**
   * Sort the chunk metadata by the lexicographical order and the start time of the chunk, then
   * flush them to a temp file.
   *
   * @return the number of bytes written to the temp file
   * @throws IOException if I/O error occurs
   */
  protected int sortAndFlushChunkMetadata() throws IOException {
    int writtenSize = 0;
    // group by series
    List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
        TSMIterator.sortChunkMetadata(
            chunkGroupMetadataList, currentChunkGroupDeviceId, chunkMetadataList);
    // lazily open the temp file on first spill
    if (tempOutput == null) {
      tempOutput = new LocalTsFileOutput(new FileOutputStream(chunkMetadataTempFile));
    }
    hasChunkMetadataInDisk = true;
    for (Pair<Path, List<IChunkMetadata>> pair : sortedChunkMetadataList) {
      Path seriesPath = pair.left;
      boolean isNewPath = !seriesPath.equals(lastSerializePath);
      if (isNewPath) {
        // record the count of path to construct bloom filter later
        pathCount++;
      }
      List<IChunkMetadata> iChunkMetadataList = pair.right;
      writtenSize += writeChunkMetadataToTempFile(iChunkMetadataList, seriesPath, isNewPath);
      lastSerializePath = seriesPath;
      logger.debug("Flushing {}", seriesPath);
    }
    // clear the cache metadata to release the memory
    chunkGroupMetadataList.clear();
    if (chunkMetadataList != null) {
      chunkMetadataList.clear();
    }
    chunkMetadataCount = 0;
    currentChunkMetadataSize = 0;
    return writtenSize;
  }
692

693
  private int writeChunkMetadataToTempFile(
694
      List<IChunkMetadata> iChunkMetadataList, Path seriesPath, boolean isNewPath)
695
      throws IOException {
696
    int writtenSize = 0;
1✔
697
    // [DeviceId] measurementId datatype size chunkMetadataBuffer
698
    if (lastSerializePath == null
1✔
699
        || !seriesPath.getDevice().equals(lastSerializePath.getDevice())) {
1✔
700
      // mark the end position of last device
701
      endPosInCMTForDevice.add(tempOutput.getPosition());
1✔
702
      // serialize the device
703
      // for each device, we only serialize it once, in order to save io
704
      writtenSize += ReadWriteIOUtils.write(seriesPath.getDevice(), tempOutput.wrapAsStream());
1✔
705
    }
706
    if (isNewPath && iChunkMetadataList.size() > 0) {
1✔
707
      // serialize the public info of this measurement
708
      writtenSize +=
1✔
709
          ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(), tempOutput.wrapAsStream());
1✔
710
      writtenSize +=
1✔
711
          ReadWriteIOUtils.write(
1✔
712
              iChunkMetadataList.get(0).getDataType(), tempOutput.wrapAsStream());
1✔
713
    }
714
    PublicBAOS buffer = new PublicBAOS();
1✔
715
    int totalSize = 0;
1✔
716
    for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
1✔
717
      totalSize += chunkMetadata.serializeTo(buffer, true);
1✔
718
    }
1✔
719
    writtenSize += ReadWriteIOUtils.write(totalSize, tempOutput.wrapAsStream());
1✔
720
    buffer.writeTo(tempOutput);
1✔
721
    writtenSize += buffer.size();
1✔
722
    return writtenSize;
1✔
723
  }
724

725
  /** @return device id of the chunk group currently being written, or null if none is open */
  public String getCurrentChunkGroupDeviceId() {
    return currentChunkGroupDeviceId;
  }

  /** @return metadata of all flushed chunk groups (the live list, not a copy) */
  public List<ChunkGroupMetadata> getChunkGroupMetadataList() {
    return chunkGroupMetadataList;
  }

  /** Flush the underlying output without closing it. */
  public void flush() throws IOException {
    out.flush();
  }
736
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc