• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9770

pending completion
#9770

push

travis_ci

web-flow
[IOTDB-6101] Pipe: Support tsfile cascade transport  (#10795) (#10796)

Support TsFile cascade transport. For example, given three IoTDB clusters A, B and C, a pipe can now transport TsFiles from A to C via B (A -> B -> C).

(cherry picked from commit b3a4bdf81)

5 of 5 new or added lines in 4 files covered. (100.0%)

79456 of 165675 relevant lines covered (47.96%)

0.48 hits per line

Source File
Press 'n' to go to the next uncovered line, 'b' for the previous one

80.0
/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.db.utils;
21

22
import org.apache.iotdb.commons.path.AlignedPath;
23
import org.apache.iotdb.commons.path.PartialPath;
24
import org.apache.iotdb.db.pipe.agent.PipeAgent;
25
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
26
import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
27
import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
28
import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache.TimeSeriesMetadataCacheKey;
29
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
30
import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.metadata.DiskAlignedChunkMetadataLoader;
31
import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.metadata.DiskChunkMetadataLoader;
32
import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.metadata.MemAlignedChunkMetadataLoader;
33
import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.metadata.MemChunkMetadataLoader;
34
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
35
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
36
import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
37
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
38
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
39
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
40
import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
41
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
42
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
43
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
44
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
45
import org.apache.iotdb.tsfile.read.reader.IChunkReader;
46
import org.apache.iotdb.tsfile.read.reader.IPageReader;
47
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
48

49
import java.io.IOException;
50
import java.util.ArrayList;
51
import java.util.HashSet;
52
import java.util.List;
53
import java.util.Map;
54
import java.util.Map.Entry;
55
import java.util.Set;
56

57
import static org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.LOAD_TIMESERIES_METADATA_ALIGNED_DISK;
58
import static org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.LOAD_TIMESERIES_METADATA_ALIGNED_MEM;
59
import static org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.LOAD_TIMESERIES_METADATA_NONALIGNED_DISK;
60
import static org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.LOAD_TIMESERIES_METADATA_NONALIGNED_MEM;
61
import static org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.TIMESERIES_METADATA_MODIFICATION_ALIGNED;
62
import static org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.TIMESERIES_METADATA_MODIFICATION_NONALIGNED;
63

64
public class FileLoaderUtils {
65

66
  private static final SeriesScanCostMetricSet SERIES_SCAN_COST_METRIC_SET =
1✔
67
      SeriesScanCostMetricSet.getInstance();
1✔
68

69
  private FileLoaderUtils() {
70
    // empty constructor
71
  }
72

73
  public static void updateTsFileResource(
74
      TsFileSequenceReader reader, TsFileResource tsFileResource) throws IOException {
75
    updateTsFileResource(reader.getAllTimeseriesMetadata(false), tsFileResource);
1✔
76
    tsFileResource.updatePlanIndexes(reader.getMinPlanIndex());
1✔
77
    tsFileResource.updatePlanIndexes(reader.getMaxPlanIndex());
1✔
78
  }
1✔
79

80
  public static void updateTsFileResource(
81
      Map<String, List<TimeseriesMetadata>> device2Metadata, TsFileResource tsFileResource) {
82
    for (Entry<String, List<TimeseriesMetadata>> entry : device2Metadata.entrySet()) {
1✔
83
      for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) {
1✔
84
        tsFileResource.updateStartTime(
1✔
85
            entry.getKey(), timeseriesMetaData.getStatistics().getStartTime());
1✔
86
        tsFileResource.updateEndTime(
1✔
87
            entry.getKey(), timeseriesMetaData.getStatistics().getEndTime());
1✔
88
      }
1✔
89
    }
1✔
90
  }
1✔
91

92
  /**
93
   * Generate {@link TsFileResource} from a closed {@link TsFileIOWriter}. Notice that the writer
94
   * should have executed {@link TsFileIOWriter#endFile()}. And this method will not record plan
95
   * Index of this writer.
96
   *
97
   * @param writer a {@link TsFileIOWriter}
98
   * @return a updated {@link TsFileResource}
99
   */
100
  public static TsFileResource generateTsFileResource(TsFileIOWriter writer) {
101
    TsFileResource resource = new TsFileResource(writer.getFile());
×
102
    for (ChunkGroupMetadata chunkGroupMetadata : writer.getChunkGroupMetadataList()) {
×
103
      String device = chunkGroupMetadata.getDevice();
×
104
      for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
×
105
        resource.updateStartTime(device, chunkMetadata.getStartTime());
×
106
        resource.updateEndTime(device, chunkMetadata.getEndTime());
×
107
      }
×
108
    }
×
109
    resource.setStatus(TsFileResourceStatus.NORMAL);
×
110
    PipeAgent.runtime().assignRecoverProgressIndexForTsFileRecovery(resource);
×
111
    return resource;
×
112
  }
113

114
  /**
115
   * Load TimeSeriesMetadata for non-aligned time series
116
   *
117
   * @param resource TsFile
118
   * @param seriesPath Timeseries path
119
   * @param allSensors measurements queried at the same time of this device
120
   * @param filter any filter, only used to check time range
121
   * @throws IOException IOException may be thrown while reading it from disk.
122
   */
123
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
124
  public static TimeseriesMetadata loadTimeSeriesMetadata(
125
      TsFileResource resource,
126
      PartialPath seriesPath,
127
      QueryContext context,
128
      Filter filter,
129
      Set<String> allSensors)
130
      throws IOException {
131
    long t1 = System.nanoTime();
1✔
132
    boolean loadFromMem = false;
1✔
133
    try {
134
      // common path
135
      TimeseriesMetadata timeSeriesMetadata;
136
      // If the tsfile is closed, we need to load from tsfile
137
      if (resource.isClosed()) {
1✔
138
        // when resource.getTimeIndexType() == 1, TsFileResource.timeIndexType is deviceTimeIndex
139
        // we should not ignore the non-exist of device in TsFileMetadata
140
        timeSeriesMetadata =
141
            TimeSeriesMetadataCache.getInstance()
1✔
142
                .get(
1✔
143
                    new TimeSeriesMetadataCache.TimeSeriesMetadataCacheKey(
144
                        resource.getTsFilePath(),
1✔
145
                        seriesPath.getDevice(),
1✔
146
                        seriesPath.getMeasurement()),
1✔
147
                    allSensors,
148
                    resource.getTimeIndexType() != 1,
1✔
149
                    context.isDebug());
1✔
150
        if (timeSeriesMetadata != null) {
1✔
151
          timeSeriesMetadata.setChunkMetadataLoader(
1✔
152
              new DiskChunkMetadataLoader(resource, seriesPath, context, filter));
153
        }
154
      } else { // if the tsfile is unclosed, we just get it directly from TsFileResource
155
        loadFromMem = true;
×
156

157
        timeSeriesMetadata = (TimeseriesMetadata) resource.getTimeSeriesMetadata(seriesPath);
×
158
        if (timeSeriesMetadata != null) {
×
159
          timeSeriesMetadata.setChunkMetadataLoader(
×
160
              new MemChunkMetadataLoader(resource, seriesPath, context, filter));
161
        }
162
      }
163

164
      if (timeSeriesMetadata != null) {
1✔
165
        long t2 = System.nanoTime();
1✔
166
        try {
167
          List<Modification> pathModifications =
1✔
168
              context.getPathModifications(resource.getModFile(), seriesPath);
1✔
169
          timeSeriesMetadata.setModified(!pathModifications.isEmpty());
1✔
170
          if (timeSeriesMetadata.getStatistics().getStartTime()
1✔
171
              > timeSeriesMetadata.getStatistics().getEndTime()) {
1✔
172
            return null;
×
173
          }
174
          if (filter != null
1✔
175
              && !filter.satisfyStartEndTime(
1✔
176
                  timeSeriesMetadata.getStatistics().getStartTime(),
1✔
177
                  timeSeriesMetadata.getStatistics().getEndTime())) {
1✔
178
            return null;
×
179
          }
180
        } finally {
181
          SERIES_SCAN_COST_METRIC_SET.recordSeriesScanCost(
1✔
182
              TIMESERIES_METADATA_MODIFICATION_NONALIGNED, System.nanoTime() - t2);
1✔
183
        }
184
      }
185
      return timeSeriesMetadata;
1✔
186
    } finally {
187
      SERIES_SCAN_COST_METRIC_SET.recordSeriesScanCost(
1✔
188
          loadFromMem
1✔
189
              ? LOAD_TIMESERIES_METADATA_NONALIGNED_MEM
×
190
              : LOAD_TIMESERIES_METADATA_NONALIGNED_DISK,
1✔
191
          System.nanoTime() - t1);
1✔
192
    }
193
  }
194

195
  /**
196
   * Load AlignedTimeSeriesMetadata for aligned time series.
197
   *
198
   * @param resource corresponding TsFileResource
199
   * @param alignedPath instance of VectorPartialPath, vector's full path, e.g. (root.sg1.d1.vector,
200
   *     [root.sg1.d1.vector.s1, root.sg1.d1.vector.s2])
201
   * @throws IOException IOException may be thrown while reading it from disk.
202
   */
203
  public static AlignedTimeSeriesMetadata loadTimeSeriesMetadata(
204
      TsFileResource resource, AlignedPath alignedPath, QueryContext context, Filter filter)
205
      throws IOException {
206
    final long t1 = System.nanoTime();
1✔
207
    boolean loadFromMem = false;
1✔
208
    try {
209
      AlignedTimeSeriesMetadata alignedTimeSeriesMetadata;
210
      // If the tsfile is closed, we need to load from tsfile
211
      if (resource.isClosed()) {
1✔
212
        alignedTimeSeriesMetadata = loadFromDisk(resource, alignedPath, context, filter);
1✔
213
      } else { // if the tsfile is unclosed, we just get it directly from TsFileResource
214
        loadFromMem = true;
×
215
        alignedTimeSeriesMetadata =
×
216
            (AlignedTimeSeriesMetadata) resource.getTimeSeriesMetadata(alignedPath);
×
217
        if (alignedTimeSeriesMetadata != null) {
×
218
          alignedTimeSeriesMetadata.setChunkMetadataLoader(
×
219
              new MemAlignedChunkMetadataLoader(resource, alignedPath, context, filter));
220
        }
221
      }
222

223
      if (alignedTimeSeriesMetadata != null) {
1✔
224
        final long t2 = System.nanoTime();
1✔
225
        try {
226
          if (alignedTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getStartTime()
1✔
227
              > alignedTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getEndTime()) {
1✔
228
            return null;
×
229
          }
230
          if (filter != null
1✔
231
              && !filter.satisfyStartEndTime(
1✔
232
                  alignedTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getStartTime(),
1✔
233
                  alignedTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getEndTime())) {
1✔
234
            return null;
×
235
          }
236

237
          // set modifications to each aligned path
238
          setModifications(resource, alignedTimeSeriesMetadata, alignedPath, context);
1✔
239
        } finally {
240
          SERIES_SCAN_COST_METRIC_SET.recordSeriesScanCost(
1✔
241
              TIMESERIES_METADATA_MODIFICATION_ALIGNED, System.nanoTime() - t2);
1✔
242
        }
243
      }
244
      return alignedTimeSeriesMetadata;
1✔
245
    } finally {
246
      SERIES_SCAN_COST_METRIC_SET.recordSeriesScanCost(
1✔
247
          loadFromMem
1✔
248
              ? LOAD_TIMESERIES_METADATA_ALIGNED_MEM
×
249
              : LOAD_TIMESERIES_METADATA_ALIGNED_DISK,
1✔
250
          System.nanoTime() - t1);
1✔
251
    }
252
  }
253

254
  private static AlignedTimeSeriesMetadata loadFromDisk(
255
      TsFileResource resource, AlignedPath alignedPath, QueryContext context, Filter filter)
256
      throws IOException {
257
    AlignedTimeSeriesMetadata alignedTimeSeriesMetadata = null;
1✔
258
    // load all the TimeseriesMetadata of vector, the first one is for time column and the
259
    // remaining is for sub sensors
260
    // the order of timeSeriesMetadata list is same as subSensorList's order
261
    TimeSeriesMetadataCache cache = TimeSeriesMetadataCache.getInstance();
1✔
262
    List<String> valueMeasurementList = alignedPath.getMeasurementList();
1✔
263
    Set<String> allSensors = new HashSet<>(valueMeasurementList);
1✔
264
    allSensors.add("");
1✔
265
    boolean isDebug = context.isDebug();
1✔
266
    String filePath = resource.getTsFilePath();
1✔
267
    String deviceId = alignedPath.getDevice();
1✔
268

269
    // when resource.getTimeIndexType() == 1, TsFileResource.timeIndexType is deviceTimeIndex
270
    // we should not ignore the non-exist of device in TsFileMetadata
271
    TimeseriesMetadata timeColumn =
1✔
272
        cache.get(
1✔
273
            new TimeSeriesMetadataCacheKey(filePath, deviceId, ""),
274
            allSensors,
275
            resource.getTimeIndexType() != 1,
1✔
276
            isDebug);
277
    if (timeColumn != null) {
1✔
278
      List<TimeseriesMetadata> valueTimeSeriesMetadataList =
1✔
279
          new ArrayList<>(valueMeasurementList.size());
1✔
280
      // if all the queried aligned sensors does not exist, we will return null
281
      boolean exist = false;
1✔
282
      for (String valueMeasurement : valueMeasurementList) {
1✔
283
        TimeseriesMetadata valueColumn =
1✔
284
            cache.get(
1✔
285
                new TimeSeriesMetadataCacheKey(filePath, deviceId, valueMeasurement),
286
                allSensors,
287
                resource.getTimeIndexType() != 1,
1✔
288
                isDebug);
289
        exist = (exist || (valueColumn != null));
1✔
290
        valueTimeSeriesMetadataList.add(valueColumn);
1✔
291
      }
1✔
292
      if (exist) {
1✔
293
        alignedTimeSeriesMetadata =
1✔
294
            new AlignedTimeSeriesMetadata(timeColumn, valueTimeSeriesMetadataList);
295
        alignedTimeSeriesMetadata.setChunkMetadataLoader(
1✔
296
            new DiskAlignedChunkMetadataLoader(resource, alignedPath, context, filter));
297
      }
298
    }
299
    return alignedTimeSeriesMetadata;
1✔
300
  }
301

302
  private static void setModifications(
303
      TsFileResource resource,
304
      AlignedTimeSeriesMetadata alignedTimeSeriesMetadata,
305
      AlignedPath alignedPath,
306
      QueryContext context) {
307
    List<TimeseriesMetadata> valueTimeSeriesMetadataList =
1✔
308
        alignedTimeSeriesMetadata.getValueTimeseriesMetadataList();
1✔
309
    boolean modified = false;
1✔
310
    for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) {
1✔
311
      if (valueTimeSeriesMetadataList.get(i) != null) {
1✔
312
        List<Modification> pathModifications =
1✔
313
            context.getPathModifications(
1✔
314
                resource.getModFile(), alignedPath.getPathWithMeasurement(i));
1✔
315
        valueTimeSeriesMetadataList.get(i).setModified(!pathModifications.isEmpty());
1✔
316
        modified = (modified || !pathModifications.isEmpty());
1✔
317
      }
318
    }
319
    alignedTimeSeriesMetadata.getTimeseriesMetadata().setModified(modified);
1✔
320
  }
1✔
321

322
  /**
323
   * load all chunk metadata of one time series in one file.
324
   *
325
   * @param timeSeriesMetadata the corresponding TimeSeriesMetadata in that file.
326
   */
327
  public static List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeSeriesMetadata) {
328
    return timeSeriesMetadata.loadChunkMetadataList();
1✔
329
  }
330

331
  /**
332
   * load all page readers in one chunk that satisfying the timeFilter.
333
   *
334
   * @param chunkMetaData the corresponding chunk metadata
335
   * @param timeFilter it should be a TimeFilter instead of a ValueFilter
336
   * @throws IOException if chunkMetaData is null or errors happened while loading page readers,
337
   *     IOException will be thrown
338
   */
339
  public static List<IPageReader> loadPageReaderList(
340
      IChunkMetadata chunkMetaData, Filter timeFilter) throws IOException {
341
    if (chunkMetaData == null) {
1✔
342
      throw new IOException("Can't init null chunkMeta");
×
343
    }
344
    IChunkLoader chunkLoader = chunkMetaData.getChunkLoader();
1✔
345
    IChunkReader chunkReader = chunkLoader.getChunkReader(chunkMetaData, timeFilter);
1✔
346
    return chunkReader.loadPageReaderList();
1✔
347
  }
348
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc