• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #9833

pending completion
#9833

push

travis_ci

web-flow
[To rel/1.2] [IOTDB-6112] Fix Limit & Offset push down doesn't take effect while there exist time filter

20 of 20 new or added lines in 3 files covered. (100.0%)

79714 of 165800 relevant lines covered (48.08%)

0.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.13
/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.tsfile.read.reader.page;
21

22
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
23
import org.apache.iotdb.tsfile.file.header.PageHeader;
24
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
25
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
26
import org.apache.iotdb.tsfile.read.common.BatchData;
27
import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
28
import org.apache.iotdb.tsfile.read.common.TimeRange;
29
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
30
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
31
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
32
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
33
import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
34
import org.apache.iotdb.tsfile.read.reader.IPageReader;
35
import org.apache.iotdb.tsfile.read.reader.series.PaginationController;
36
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
37

38
import java.io.IOException;
39
import java.io.Serializable;
40
import java.nio.ByteBuffer;
41
import java.util.ArrayList;
42
import java.util.Arrays;
43
import java.util.List;
44

45
import static org.apache.iotdb.tsfile.read.reader.series.PaginationController.UNLIMITED_PAGINATION_CONTROLLER;
46

47
public class AlignedPageReader implements IPageReader, IAlignedPageReader {
48

49
  // Reader over the time page; built in the constructor from the time page header, data and
  // decoder.
  private final TimePageReader timePageReader;
50
  // One entry per value column, in the order of the header list given to the constructor; an entry
  // is null when the corresponding header was null.
  private final List<ValuePageReader> valuePageReaderList;
51
  // Size of valuePageReaderList (null entries included).
  private final int valueCount;
52

53
  // Row filter; may be null, in which case no row is rejected by filtering. setFilter() either
  // installs a filter or ANDs a new one onto the existing one.
  private Filter filter;
54
  // Limit/offset state consulted while reading; defaults to the unlimited controller until
  // setLimitOffset() replaces it.
  private PaginationController paginationController = UNLIMITED_PAGINATION_CONTROLLER;
1✔
55

56
  // True when the time page reader or at least one value page reader reported isModified() at
  // construction time.
  private boolean isModified;
57
  // Block builder reused by getAllSatisfiedData(); only assigned in initTsBlockBuilder().
  private TsBlockBuilder builder;
58

59
  // Highest bit of a byte; shifted right by (rowIndex % 8) to select a row's bit in a bitmap byte.
  private static final int MASK = 0x80;
60

61
  @SuppressWarnings("squid:S107")
62
  public AlignedPageReader(
63
      PageHeader timePageHeader,
64
      ByteBuffer timePageData,
65
      Decoder timeDecoder,
66
      List<PageHeader> valuePageHeaderList,
67
      List<ByteBuffer> valuePageDataList,
68
      List<TSDataType> valueDataTypeList,
69
      List<Decoder> valueDecoderList,
70
      Filter filter) {
1✔
71
    timePageReader = new TimePageReader(timePageHeader, timePageData, timeDecoder);
1✔
72
    isModified = timePageReader.isModified();
1✔
73
    valuePageReaderList = new ArrayList<>(valuePageHeaderList.size());
1✔
74
    for (int i = 0; i < valuePageHeaderList.size(); i++) {
1✔
75
      if (valuePageHeaderList.get(i) != null) {
1✔
76
        ValuePageReader valuePageReader =
1✔
77
            new ValuePageReader(
78
                valuePageHeaderList.get(i),
1✔
79
                valuePageDataList.get(i),
1✔
80
                valueDataTypeList.get(i),
1✔
81
                valueDecoderList.get(i));
1✔
82
        valuePageReaderList.add(valuePageReader);
1✔
83
        isModified = isModified || valuePageReader.isModified();
1✔
84
      } else {
1✔
85
        valuePageReaderList.add(null);
1✔
86
      }
87
    }
88
    this.filter = filter;
1✔
89
    this.valueCount = valuePageReaderList.size();
1✔
90
  }
1✔
91

92
  @Override
93
  public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {
94
    BatchData pageData = BatchDataFactory.createBatchData(TSDataType.VECTOR, ascending, false);
1✔
95
    int timeIndex = -1;
1✔
96
    while (timePageReader.hasNextTime()) {
1✔
97
      long timestamp = timePageReader.nextTime();
1✔
98
      timeIndex++;
1✔
99
      // if all the sub sensors' value are null in current row, just discard it
100
      boolean isNull = true;
1✔
101
      Object notNullObject = null;
1✔
102
      TsPrimitiveType[] v = new TsPrimitiveType[valueCount];
1✔
103
      for (int i = 0; i < v.length; i++) {
1✔
104
        ValuePageReader pageReader = valuePageReaderList.get(i);
1✔
105
        v[i] = pageReader == null ? null : pageReader.nextValue(timestamp, timeIndex);
1✔
106
        if (v[i] != null) {
1✔
107
          isNull = false;
1✔
108
          notNullObject = v[i].getValue();
1✔
109
        }
110
      }
111
      // Currently, if it's a value filter, it will only accept AlignedPath with only one sub
112
      // sensor
113
      if (!isNull && (filter == null || filter.satisfy(timestamp, notNullObject))) {
1✔
114
        pageData.putVector(timestamp, v);
1✔
115
      }
116
    }
1✔
117
    return pageData.flip();
1✔
118
  }
119

120
  private boolean pageSatisfy() {
121
    Statistics statistics = getStatistics();
1✔
122
    if (filter == null || filter.allSatisfy(statistics)) {
1✔
123
      // For aligned series, When we only query some measurements under an aligned device, if the
124
      // values of these queried measurements at a timestamp are all null, the timestamp will not be
125
      // selected.
126
      // NOTE: if we change the query semantic in the future for aligned series, we need to remove
127
      // this check here.
128
      long rowCount = getTimeStatistics().getCount();
1✔
129
      for (Statistics vStatistics : getValueStatisticsList()) {
1✔
130
        if (vStatistics == null || vStatistics.hasNullValue(rowCount)) {
1✔
131
          return true;
1✔
132
        }
133
      }
1✔
134
      // When the number of points in all value pages is the same as that in the time page, it means
135
      // that there is no null value, and all timestamps will be selected.
136
      if (paginationController.hasCurOffset(rowCount)) {
1✔
137
        paginationController.consumeOffset(rowCount);
×
138
        return false;
×
139
      } else {
140
        return true;
1✔
141
      }
142
    } else {
143
      // TODO accept valueStatisticsList to filter
144
      return filter.satisfy(statistics);
×
145
    }
146
  }
147

148
  @Override
149
  public TsBlock getAllSatisfiedData() throws IOException {
150
    builder.reset();
1✔
151
    if (!pageSatisfy()) {
1✔
152
      return builder.build();
×
153
    }
154

155
    long[] timeBatch = timePageReader.getNextTimeBatch();
1✔
156

157
    // if all the sub sensors' value are null in current row, just discard it
158
    // if !filter.satisfy, discard this row
159
    boolean[] keepCurrentRow = new boolean[timeBatch.length];
1✔
160
    if (filter == null) {
1✔
161
      Arrays.fill(keepCurrentRow, true);
1✔
162
    } else {
163
      for (int i = 0, n = timeBatch.length; i < n; i++) {
1✔
164
        keepCurrentRow[i] = filter.satisfy(timeBatch[i], null);
1✔
165
      }
166
    }
167

168
    // using bitMap in valuePageReaders to indicate whether columns of current row are all null.
169
    byte[] bitmask = new byte[(timeBatch.length - 1) / 8 + 1];
1✔
170
    Arrays.fill(bitmask, (byte) 0x00);
1✔
171
    boolean[][] isDeleted = new boolean[valueCount][timeBatch.length];
1✔
172
    for (int columnIndex = 0; columnIndex < valueCount; columnIndex++) {
1✔
173
      ValuePageReader pageReader = valuePageReaderList.get(columnIndex);
1✔
174
      if (pageReader != null) {
1✔
175
        byte[] bitmap = pageReader.getBitmap();
1✔
176
        pageReader.fillIsDeleted(timeBatch, isDeleted[columnIndex]);
1✔
177

178
        for (int i = 0, n = isDeleted[columnIndex].length; i < n; i++) {
1✔
179
          if (isDeleted[columnIndex][i]) {
1✔
180
            int shift = i % 8;
1✔
181
            bitmap[i / 8] = (byte) (bitmap[i / 8] & (~(MASK >>> shift)));
1✔
182
          }
183
        }
184
        for (int i = 0, n = bitmask.length; i < n; i++) {
1✔
185
          bitmask[i] = (byte) (bitmap[i] | bitmask[i]);
1✔
186
        }
187
      }
188
    }
189

190
    for (int i = 0, n = bitmask.length; i < n; i++) {
1✔
191
      if (bitmask[i] == (byte) 0xFF) {
1✔
192
        // 8 rows are not all null, do nothing
193
      } else if (bitmask[i] == (byte) 0x00) {
1✔
194
        for (int j = 0; j < 8 && (i * 8 + j < keepCurrentRow.length); j++) {
1✔
195
          keepCurrentRow[i * 8 + j] = false;
1✔
196
        }
197
      } else {
198
        for (int j = 0; j < 8 && (i * 8 + j < keepCurrentRow.length); j++) {
1✔
199
          if (((bitmask[i] & 0xFF) & (MASK >>> j)) == 0) {
1✔
200
            keepCurrentRow[i * 8 + j] = false;
1✔
201
          }
202
        }
203
      }
204
    }
205

206
    // construct time column
207
    int readEndIndex = timeBatch.length;
1✔
208
    for (int i = 0; i < timeBatch.length; i++) {
1✔
209
      if (!keepCurrentRow[i]) {
1✔
210
        continue;
1✔
211
      }
212
      if (paginationController.hasCurOffset()) {
1✔
213
        paginationController.consumeOffset();
1✔
214
        keepCurrentRow[i] = false;
1✔
215
        continue;
1✔
216
      }
217
      if (paginationController.hasCurLimit()) {
1✔
218
        builder.getTimeColumnBuilder().writeLong(timeBatch[i]);
1✔
219
        builder.declarePosition();
1✔
220
        paginationController.consumeLimit();
1✔
221
      } else {
222
        readEndIndex = i;
1✔
223
        break;
1✔
224
      }
225
    }
226

227
    // construct value columns
228
    for (int i = 0; i < valueCount; i++) {
1✔
229
      ValuePageReader pageReader = valuePageReaderList.get(i);
1✔
230
      if (pageReader != null) {
1✔
231
        pageReader.writeColumnBuilderWithNextBatch(
1✔
232
            readEndIndex, builder.getColumnBuilder(i), keepCurrentRow, isDeleted[i]);
1✔
233
      } else {
234
        for (int j = 0; j < readEndIndex; j++) {
1✔
235
          if (keepCurrentRow[j]) {
1✔
236
            builder.getColumnBuilder(i).appendNull();
1✔
237
          }
238
        }
239
      }
240
    }
241
    return builder.build();
1✔
242
  }
243

244
  public void setDeleteIntervalList(List<List<TimeRange>> list) {
245
    for (int i = 0; i < valueCount; i++) {
1✔
246
      if (valuePageReaderList.get(i) != null) {
1✔
247
        valuePageReaderList.get(i).setDeleteIntervalList(list.get(i));
1✔
248
      }
249
    }
250
  }
1✔
251

252
  @Override
253
  public Statistics getStatistics() {
254
    return valuePageReaderList.size() == 1 && valuePageReaderList.get(0) != null
1✔
255
        ? valuePageReaderList.get(0).getStatistics()
1✔
256
        : timePageReader.getStatistics();
1✔
257
  }
258

259
  @Override
260
  public Statistics getStatistics(int index) {
261
    ValuePageReader valuePageReader = valuePageReaderList.get(index);
×
262
    return valuePageReader == null ? null : valuePageReader.getStatistics();
×
263
  }
264

265
  @Override
266
  public Statistics getTimeStatistics() {
267
    return timePageReader.getStatistics();
1✔
268
  }
269

270
  private List<Statistics<Serializable>> getValueStatisticsList() {
271
    List<Statistics<Serializable>> valueStatisticsList = new ArrayList<>();
1✔
272
    for (ValuePageReader v : valuePageReaderList) {
1✔
273
      valueStatisticsList.add(v == null ? null : v.getStatistics());
1✔
274
    }
1✔
275
    return valueStatisticsList;
1✔
276
  }
277

278
  @Override
279
  public void setFilter(Filter filter) {
280
    if (this.filter == null) {
×
281
      this.filter = filter;
×
282
    } else {
283
      this.filter = new AndFilter(this.filter, filter);
×
284
    }
285
  }
×
286

287
  @Override
288
  public void setLimitOffset(PaginationController paginationController) {
289
    this.paginationController = paginationController;
1✔
290
  }
1✔
291

292
  @Override
293
  public boolean isModified() {
294
    return isModified;
1✔
295
  }
296

297
  @Override
298
  public void initTsBlockBuilder(List<TSDataType> dataTypes) {
299
    builder = new TsBlockBuilder(dataTypes);
1✔
300
  }
1✔
301
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc