• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #10011

06 Sep 2023 11:55AM UTC coverage: 47.651% (-0.003%) from 47.654%
#10011

Pull #11068

travis_ci

web-flow
Merge 42d83cb9e into 720ad0d5b
Pull Request #11068: try opt

97 of 97 new or added lines in 3 files covered. (100.0%)

80187 of 168279 relevant lines covered (47.65%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.49
/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.tsfile.read.reader.page;
21

22
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
23
import org.apache.iotdb.tsfile.file.header.PageHeader;
24
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
25
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
26
import org.apache.iotdb.tsfile.read.common.BatchData;
27
import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
28
import org.apache.iotdb.tsfile.read.common.TimeRange;
29
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
30
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
31
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
32
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
33
import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
34
import org.apache.iotdb.tsfile.read.reader.IPageReader;
35
import org.apache.iotdb.tsfile.read.reader.IPointReader;
36
import org.apache.iotdb.tsfile.read.reader.series.PaginationController;
37
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
38

39
import java.io.IOException;
40
import java.io.Serializable;
41
import java.nio.ByteBuffer;
42
import java.util.ArrayList;
43
import java.util.Arrays;
44
import java.util.List;
45

46
import static org.apache.iotdb.tsfile.read.reader.series.PaginationController.UNLIMITED_PAGINATION_CONTROLLER;
47

48
public class AlignedPageReader implements IPageReader, IAlignedPageReader {
49

50
  private final TimePageReader timePageReader;
51
  private final List<ValuePageReader> valuePageReaderList;
52
  private final int valueCount;
53

54
  // only used for limit and offset push down optimizer, if we select all columns from aligned
55
  // device, we
56
  // can use statistics to skip.
57
  // it's only exact while using limit & offset push down
58
  private final boolean queryAllSensors;
59

60
  private Filter filter;
61
  private PaginationController paginationController = UNLIMITED_PAGINATION_CONTROLLER;
1✔
62

63
  private boolean isModified;
64
  private TsBlockBuilder builder;
65

66
  private static final int MASK = 0x80;
67

68
  @SuppressWarnings("squid:S107")
69
  public AlignedPageReader(
70
      PageHeader timePageHeader,
71
      ByteBuffer timePageData,
72
      Decoder timeDecoder,
73
      List<PageHeader> valuePageHeaderList,
74
      List<ByteBuffer> valuePageDataList,
75
      List<TSDataType> valueDataTypeList,
76
      List<Decoder> valueDecoderList,
77
      Filter filter,
78
      boolean queryAllSensors) {
1✔
79
    timePageReader = new TimePageReader(timePageHeader, timePageData, timeDecoder);
1✔
80
    isModified = timePageReader.isModified();
1✔
81
    valuePageReaderList = new ArrayList<>(valuePageHeaderList.size());
1✔
82
    for (int i = 0; i < valuePageHeaderList.size(); i++) {
1✔
83
      if (valuePageHeaderList.get(i) != null) {
1✔
84
        ValuePageReader valuePageReader =
1✔
85
            new ValuePageReader(
86
                valuePageHeaderList.get(i),
1✔
87
                valuePageDataList.get(i),
1✔
88
                valueDataTypeList.get(i),
1✔
89
                valueDecoderList.get(i));
1✔
90
        valuePageReaderList.add(valuePageReader);
1✔
91
        isModified = isModified || valuePageReader.isModified();
1✔
92
      } else {
1✔
93
        valuePageReaderList.add(null);
1✔
94
      }
95
    }
96
    this.filter = filter;
1✔
97
    this.valueCount = valuePageReaderList.size();
1✔
98
    this.queryAllSensors = queryAllSensors;
1✔
99
  }
1✔
100

101
  @Override
102
  public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {
103
    BatchData pageData = BatchDataFactory.createBatchData(TSDataType.VECTOR, ascending, false);
1✔
104
    int timeIndex = -1;
1✔
105
    while (timePageReader.hasNextTime()) {
1✔
106
      long timestamp = timePageReader.nextTime();
1✔
107
      timeIndex++;
1✔
108
      // if all the sub sensors' value are null in current row, just discard it
109
      boolean isNull = true;
1✔
110
      Object notNullObject = null;
1✔
111
      TsPrimitiveType[] v = new TsPrimitiveType[valueCount];
1✔
112
      for (int i = 0; i < v.length; i++) {
1✔
113
        ValuePageReader pageReader = valuePageReaderList.get(i);
1✔
114
        v[i] = pageReader == null ? null : pageReader.nextValue(timestamp, timeIndex);
1✔
115
        if (v[i] != null) {
1✔
116
          isNull = false;
1✔
117
          notNullObject = v[i].getValue();
1✔
118
        }
119
      }
120
      // Currently, if it's a value filter, it will only accept AlignedPath with only one sub
121
      // sensor
122
      if (!isNull && (filter == null || filter.satisfy(timestamp, notNullObject))) {
1✔
123
        pageData.putVector(timestamp, v);
1✔
124
      }
125
    }
1✔
126
    return pageData.flip();
1✔
127
  }
128

129
  private boolean pageSatisfy() {
130
    Statistics statistics = getStatistics();
1✔
131
    if (filter == null || filter.allSatisfy(statistics)) {
1✔
132
      // For aligned series, When we only query some measurements under an aligned device, if any
133
      // values of these queried measurements has the same value count as the time column, the
134
      // timestamp will be selected.
135
      // NOTE: if we change the query semantic in the future for aligned series, we need to remove
136
      // this check here.
137
      long rowCount = getTimeStatistics().getCount();
1✔
138
      boolean canUse = queryAllSensors || getValueStatisticsList().isEmpty();
1✔
139
      if (!canUse) {
1✔
140
        for (Statistics vStatistics : getValueStatisticsList()) {
1✔
141
          if (vStatistics != null && !vStatistics.hasNullValue(rowCount)) {
1✔
142
            canUse = true;
1✔
143
            break;
1✔
144
          }
145
        }
1✔
146
      }
147
      if (!canUse) {
1✔
148
        return true;
1✔
149
      }
150
      // When the number of points in all value pages is the same as that in the time page, it means
151
      // that there is no null value, and all timestamps will be selected.
152
      if (paginationController.hasCurOffset(rowCount)) {
1✔
153
        paginationController.consumeOffset(rowCount);
×
154
        return false;
×
155
      } else {
156
        return true;
1✔
157
      }
158
    } else {
159
      // TODO accept valueStatisticsList to filter
160
      return filter.satisfy(statistics);
×
161
    }
162
  }
163

164
  public IPointReader getLazyPointReader() throws IOException {
165
    return new LazyLoadAlignedPagePointReader(timePageReader, valuePageReaderList);
1✔
166
  }
167

168
  @Override
169
  public TsBlock getAllSatisfiedData() throws IOException {
170
    if (!pageSatisfy()) {
1✔
171
      return builder.build();
×
172
    }
173

174
    long[] timeBatch = timePageReader.getNextTimeBatch();
1✔
175

176
    if (queryAllSensors && !isModified) {
1✔
177
      // skip all the page
178
      if (paginationController.hasCurOffset(timeBatch.length)) {
×
179
        paginationController.consumeOffset(timeBatch.length);
×
180
      } else {
181
        // consume the remaining offset
182
        if (paginationController.hasCurOffset()) {
×
183
          paginationController.consumeOffset(paginationController.getCurOffset());
×
184
        }
185
        int readStartIndex =
186
            paginationController.hasCurOffset() ? (int) paginationController.getCurOffset() : 0;
×
187
        // not included
188
        int readEndIndex =
189
            (paginationController.hasCurLimit()
×
190
                    && (paginationController.getCurLimit() < timeBatch.length - readStartIndex + 1))
×
191
                ? readStartIndex + (int) paginationController.getCurLimit()
×
192
                : timeBatch.length;
×
193
        if (paginationController.hasCurLimit()) {
×
194
          paginationController.consumeLimit(readEndIndex - readStartIndex);
×
195
        }
196
        // construct time column
197
        for (int i = readStartIndex; i < readEndIndex; i++) {
×
198
          builder.getTimeColumnBuilder().writeLong(timeBatch[i]);
×
199
          builder.declarePosition();
×
200
        }
201
        // construct value columns
202
        for (int i = 0; i < valueCount; i++) {
×
203
          ValuePageReader pageReader = valuePageReaderList.get(i);
×
204
          if (pageReader != null) {
×
205
            pageReader.writeColumnBuilderWithNextBatch(
×
206
                readStartIndex, readEndIndex, builder.getColumnBuilder(i));
×
207
          } else {
208
            for (int j = readStartIndex; j < readEndIndex; j++) {
×
209
              builder.getColumnBuilder(i).appendNull();
×
210
            }
211
          }
212
        }
213
      }
×
214
    } else {
215
      // if all the sub sensors' value are null in current row, just discard it
216
      // if !filter.satisfy, discard this row
217
      boolean[] keepCurrentRow = new boolean[timeBatch.length];
1✔
218
      if (filter == null) {
1✔
219
        Arrays.fill(keepCurrentRow, true);
1✔
220
      } else {
221
        for (int i = 0, n = timeBatch.length; i < n; i++) {
1✔
222
          keepCurrentRow[i] = filter.satisfy(timeBatch[i], null);
1✔
223
        }
224
      }
225

226
      boolean[][] isDeleted = null;
1✔
227
      if (valueCount != 0) {
1✔
228
        // using bitMap in valuePageReaders to indicate whether columns of current row are all null.
229
        byte[] bitmask = new byte[(timeBatch.length - 1) / 8 + 1];
1✔
230
        Arrays.fill(bitmask, (byte) 0x00);
1✔
231
        isDeleted = new boolean[valueCount][timeBatch.length];
1✔
232
        for (int columnIndex = 0; columnIndex < valueCount; columnIndex++) {
1✔
233
          ValuePageReader pageReader = valuePageReaderList.get(columnIndex);
1✔
234
          if (pageReader != null) {
1✔
235
            byte[] bitmap = pageReader.getBitmap();
1✔
236
            pageReader.fillIsDeleted(timeBatch, isDeleted[columnIndex]);
1✔
237

238
            for (int i = 0, n = isDeleted[columnIndex].length; i < n; i++) {
1✔
239
              if (isDeleted[columnIndex][i]) {
1✔
240
                int shift = i % 8;
1✔
241
                bitmap[i / 8] = (byte) (bitmap[i / 8] & (~(MASK >>> shift)));
1✔
242
              }
243
            }
244
            for (int i = 0, n = bitmask.length; i < n; i++) {
1✔
245
              bitmask[i] = (byte) (bitmap[i] | bitmask[i]);
1✔
246
            }
247
          }
248
        }
249

250
        for (int i = 0, n = bitmask.length; i < n; i++) {
1✔
251
          if (bitmask[i] == (byte) 0xFF) {
1✔
252
            // 8 rows are not all null, do nothing
253
          } else if (bitmask[i] == (byte) 0x00) {
1✔
254
            for (int j = 0; j < 8 && (i * 8 + j < keepCurrentRow.length); j++) {
1✔
255
              keepCurrentRow[i * 8 + j] = false;
1✔
256
            }
257
          } else {
258
            for (int j = 0; j < 8 && (i * 8 + j < keepCurrentRow.length); j++) {
1✔
259
              if (((bitmask[i] & 0xFF) & (MASK >>> j)) == 0) {
1✔
260
                keepCurrentRow[i * 8 + j] = false;
1✔
261
              }
262
            }
263
          }
264
        }
265
      }
266

267
      // construct time column
268
      int readEndIndex = timeBatch.length;
1✔
269
      for (int i = 0; i < timeBatch.length; i++) {
1✔
270
        if (keepCurrentRow[i]) {
1✔
271
          if (paginationController.hasCurOffset()) {
1✔
272
            paginationController.consumeOffset();
1✔
273
            keepCurrentRow[i] = false;
1✔
274
          } else if (paginationController.hasCurLimit()) {
1✔
275
            builder.getTimeColumnBuilder().writeLong(timeBatch[i]);
1✔
276
            builder.declarePosition();
1✔
277
            paginationController.consumeLimit();
1✔
278
          } else {
279
            readEndIndex = i;
1✔
280
            break;
1✔
281
          }
282
        }
283
      }
284

285
      // construct value columns
286
      for (int i = 0; i < valueCount; i++) {
1✔
287
        ValuePageReader pageReader = valuePageReaderList.get(i);
1✔
288
        if (pageReader != null) {
1✔
289
          pageReader.writeColumnBuilderWithNextBatch(
1✔
290
              readEndIndex, builder.getColumnBuilder(i), keepCurrentRow, isDeleted[i]);
1✔
291
        } else {
292
          for (int j = 0; j < readEndIndex; j++) {
1✔
293
            if (keepCurrentRow[j]) {
1✔
294
              builder.getColumnBuilder(i).appendNull();
1✔
295
            }
296
          }
297
        }
298
      }
299
    }
300
    return builder.build();
1✔
301
  }
302

303
  public void setDeleteIntervalList(List<List<TimeRange>> list) {
304
    for (int i = 0; i < valueCount; i++) {
1✔
305
      if (valuePageReaderList.get(i) != null) {
1✔
306
        valuePageReaderList.get(i).setDeleteIntervalList(list.get(i));
1✔
307
      }
308
    }
309
  }
1✔
310

311
  @Override
312
  public Statistics getStatistics() {
313
    return valuePageReaderList.size() == 1 && valuePageReaderList.get(0) != null
1✔
314
        ? valuePageReaderList.get(0).getStatistics()
1✔
315
        : timePageReader.getStatistics();
1✔
316
  }
317

318
  @Override
319
  public Statistics getStatistics(int index) {
320
    ValuePageReader valuePageReader = valuePageReaderList.get(index);
×
321
    return valuePageReader == null ? null : valuePageReader.getStatistics();
×
322
  }
323

324
  @Override
325
  public Statistics getTimeStatistics() {
326
    return timePageReader.getStatistics();
1✔
327
  }
328

329
  private List<Statistics<Serializable>> getValueStatisticsList() {
330
    List<Statistics<Serializable>> valueStatisticsList = new ArrayList<>();
1✔
331
    for (ValuePageReader v : valuePageReaderList) {
1✔
332
      valueStatisticsList.add(v == null ? null : v.getStatistics());
1✔
333
    }
1✔
334
    return valueStatisticsList;
1✔
335
  }
336

337
  @Override
338
  public void setFilter(Filter filter) {
339
    if (this.filter == null) {
×
340
      this.filter = filter;
×
341
    } else {
342
      this.filter = new AndFilter(this.filter, filter);
×
343
    }
344
  }
×
345

346
  @Override
347
  public void setLimitOffset(PaginationController paginationController) {
348
    this.paginationController = paginationController;
1✔
349
  }
1✔
350

351
  @Override
352
  public boolean isModified() {
353
    return isModified;
1✔
354
  }
355

356
  @Override
357
  public void initTsBlockBuilder(List<TSDataType> dataTypes) {
358
    builder = new TsBlockBuilder((int) timePageReader.getStatistics().getCount(), dataTypes);
1✔
359
  }
1✔
360
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc