• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / iotdb / #10019

07 Sep 2023 04:50AM UTC coverage: 47.489% (-0.2%) from 47.655%
#10019

push

travis_ci

web-flow
Pipe: Fix ConcurrentModificationException caused by concurrently iterating through CachedSchemaPatternMatcher.extractors when a PipeHeartbeatEvent is being assigned (#11074)

* try to fix ConcurrentModificationException when assigning PipeHeartbeatEvent

* Update CachedSchemaPatternMatcher.java

---------

Co-authored-by: 马子坤 <55695098+DanielWang2035@users.noreply.github.com>

1 of 1 new or added line in 1 file covered. (100.0%)

80551 of 169622 relevant lines covered (47.49%)

0.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.89
/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *      http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.iotdb.tsfile.read.common.block;
21

22
import org.apache.iotdb.tsfile.read.TimeValuePair;
23
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
24
import org.apache.iotdb.tsfile.read.common.block.column.Column;
25
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
26
import org.apache.iotdb.tsfile.read.reader.IPointReader;
27
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
28

29
import org.openjdk.jol.info.ClassLayout;
30

31
import java.util.Arrays;
32
import java.util.Iterator;
33
import java.util.NoSuchElementException;
34

35
import static java.lang.String.format;
36
import static java.util.Objects.requireNonNull;
37

38
/**
39
 * Intermediate result for most of ExecOperators. The TsBlock contains data from one or more columns
40
 * and constructs them as a row based view The columns can be series, aggregation result for one
41
 * series or scalar value (such as deviceName).
42
 */
43
public class TsBlock {
44

45
  public static final int INSTANCE_SIZE = ClassLayout.parseClass(TsBlock.class).instanceSize();
1✔
46

47
  private static final Column[] EMPTY_COLUMNS = new Column[0];
1✔
48

49
  /**
50
   * Visible to give trusted classes like {@link TsBlockBuilder} access to a constructor that
51
   * doesn't defensively copy the valueColumns
52
   */
53
  public static TsBlock wrapBlocksWithoutCopy(
54
      int positionCount, TimeColumn timeColumn, Column[] valueColumns) {
55
    return new TsBlock(false, positionCount, timeColumn, valueColumns);
1✔
56
  }
57

58
  private final TimeColumn timeColumn;
59

60
  private final Column[] valueColumns;
61

62
  /** How many rows in current TsBlock */
63
  private final int positionCount;
64

65
  private volatile long retainedSizeInBytes = -1;
1✔
66

67
  public TsBlock(int positionCount) {
68
    this(false, positionCount, null, EMPTY_COLUMNS);
1✔
69
  }
1✔
70

71
  public TsBlock(TimeColumn timeColumn, Column... valueColumns) {
72
    this(true, determinePositionCount(valueColumns), timeColumn, valueColumns);
1✔
73
  }
1✔
74

75
  public TsBlock(int positionCount, TimeColumn timeColumn, Column... valueColumns) {
76
    this(true, positionCount, timeColumn, valueColumns);
1✔
77
  }
1✔
78

79
  private TsBlock(
80
      boolean columnsCopyRequired,
81
      int positionCount,
82
      TimeColumn timeColumn,
83
      Column[] valueColumns) {
1✔
84
    requireNonNull(valueColumns, "blocks is null");
1✔
85
    this.positionCount = positionCount;
1✔
86
    this.timeColumn = timeColumn;
1✔
87
    if (valueColumns.length == 0) {
1✔
88
      this.valueColumns = EMPTY_COLUMNS;
1✔
89
      // Empty blocks are not considered "retained" by any particular page
90
      this.retainedSizeInBytes = INSTANCE_SIZE;
1✔
91
    } else {
92
      this.valueColumns = columnsCopyRequired ? valueColumns.clone() : valueColumns;
1✔
93
    }
94
  }
1✔
95

96
  public int getPositionCount() {
97
    return positionCount;
1✔
98
  }
99

100
  public long getStartTime() {
101
    return timeColumn.getStartTime();
1✔
102
  }
103

104
  public long getEndTime() {
105
    return timeColumn.getEndTime();
1✔
106
  }
107

108
  public boolean isEmpty() {
109
    return positionCount == 0;
1✔
110
  }
111

112
  public long getRetainedSizeInBytes() {
113
    if (retainedSizeInBytes < 0) {
1✔
114
      return updateRetainedSize();
1✔
115
    }
116
    return retainedSizeInBytes;
1✔
117
  }
118

119
  /**
120
   * @param positionOffset start offset
121
   * @param length slice length
122
   * @return view of current TsBlock start from positionOffset to positionOffset + length
123
   */
124
  public TsBlock getRegion(int positionOffset, int length) {
125
    if (positionOffset < 0 || length < 0 || positionOffset + length > positionCount) {
1✔
126
      throw new IndexOutOfBoundsException(
×
127
          format(
×
128
              "Invalid position %s and length %s in page with %s positions",
129
              positionOffset, length, positionCount));
×
130
    }
131
    int channelCount = getValueColumnCount();
1✔
132
    Column[] slicedColumns = new Column[channelCount];
1✔
133
    for (int i = 0; i < channelCount; i++) {
1✔
134
      slicedColumns[i] = valueColumns[i].getRegion(positionOffset, length);
1✔
135
    }
136
    return wrapBlocksWithoutCopy(
1✔
137
        length, (TimeColumn) timeColumn.getRegion(positionOffset, length), slicedColumns);
1✔
138
  }
139

140
  public TsBlock appendValueColumns(Column[] columns) {
141
    Column[] newBlocks = Arrays.copyOf(valueColumns, valueColumns.length + columns.length);
×
142
    int newColumnIndex = valueColumns.length;
×
143
    for (Column column : columns) {
×
144
      requireNonNull(column, "Column is null");
×
145
      if (positionCount != column.getPositionCount()) {
×
146
        throw new IllegalArgumentException("Block does not have same position count");
×
147
      }
148
      newBlocks[newColumnIndex++] = column;
×
149
    }
150
    return wrapBlocksWithoutCopy(positionCount, timeColumn, newBlocks);
×
151
  }
152

153
  /**
154
   * Attention. This method uses System.arraycopy() to extend the valueColumn array, so its
155
   * performance is not ensured if you have many insert operations.
156
   */
157
  public TsBlock insertValueColumn(int index, Column column) {
158
    requireNonNull(column, "Column is null");
×
159
    if (positionCount != column.getPositionCount()) {
×
160
      throw new IllegalArgumentException("Block does not have same position count");
×
161
    }
162

163
    Column[] newBlocks = Arrays.copyOf(valueColumns, valueColumns.length + 1);
×
164
    System.arraycopy(newBlocks, index, newBlocks, index + 1, valueColumns.length - index);
×
165
    newBlocks[index] = column;
×
166
    return wrapBlocksWithoutCopy(positionCount, timeColumn, newBlocks);
×
167
  }
168

169
  /**
170
   * This method will create a temporary view of origin tsBlock, which will reuse the arrays of
171
   * columns but with different offset. It can be used where you want to skip some points when
172
   * getting iterator.
173
   */
174
  public TsBlock subTsBlock(int fromIndex) {
175
    if (fromIndex > positionCount) {
1✔
176
      throw new IllegalArgumentException("FromIndex of subTsBlock cannot over positionCount.");
×
177
    }
178
    TimeColumn subTimeColumn = (TimeColumn) timeColumn.subColumn(fromIndex);
1✔
179
    Column[] subValueColumns = new Column[valueColumns.length];
1✔
180
    for (int i = 0; i < subValueColumns.length; i++) {
1✔
181
      subValueColumns[i] = valueColumns[i].subColumn(fromIndex);
1✔
182
    }
183
    return new TsBlock(subTimeColumn, subValueColumns);
1✔
184
  }
185

186
  public TsBlock skipFirst() {
187
    return this.subTsBlock(1);
1✔
188
  }
189

190
  public long getTimeByIndex(int index) {
191
    return timeColumn.getLong(index);
1✔
192
  }
193

194
  public int getValueColumnCount() {
195
    return valueColumns.length;
1✔
196
  }
197

198
  public TimeColumn getTimeColumn() {
199
    return timeColumn;
1✔
200
  }
201

202
  public Column[] getValueColumns() {
203
    return valueColumns;
1✔
204
  }
205

206
  public Column getColumn(int columnIndex) {
207
    return valueColumns[columnIndex];
1✔
208
  }
209

210
  public Column[] getTimeAndValueColumn(int columnIndex) {
211
    Column[] columns = new Column[2];
×
212
    columns[0] = getTimeColumn();
×
213
    columns[1] = getColumn(columnIndex);
×
214
    return columns;
×
215
  }
216

217
  public Column[] getColumns(int[] columnIndexes) {
218
    Column[] columns = new Column[columnIndexes.length];
×
219
    for (int i = 0; i < columnIndexes.length; i++) {
×
220
      columns[i] = valueColumns[columnIndexes[i]];
×
221
    }
222
    return columns;
×
223
  }
224

225
  public TsBlockSingleColumnIterator getTsBlockSingleColumnIterator() {
226
    return new TsBlockSingleColumnIterator(0);
1✔
227
  }
228

229
  public TsBlockSingleColumnIterator getTsBlockSingleColumnIterator(int columnIndex) {
230
    return new TsBlockSingleColumnIterator(0, columnIndex);
×
231
  }
232

233
  public void reverse() {
234
    timeColumn.reverse();
1✔
235
    for (Column valueColumn : valueColumns) {
1✔
236
      valueColumn.reverse();
1✔
237
    }
238
  }
1✔
239

240
  public TsBlockRowIterator getTsBlockRowIterator() {
241
    return new TsBlockRowIterator(0);
×
242
  }
243

244
  /** Only used for the batch data of vector time series. */
245
  public TsBlockAlignedRowIterator getTsBlockAlignedRowIterator() {
246
    return new TsBlockAlignedRowIterator(0);
1✔
247
  }
248

249
  public class TsBlockSingleColumnIterator implements IPointReader, IBatchDataIterator {
250

251
    private int rowIndex;
252
    private final int columnIndex;
253

254
    public TsBlockSingleColumnIterator(int rowIndex) {
1✔
255
      this.rowIndex = rowIndex;
1✔
256
      this.columnIndex = 0;
1✔
257
    }
1✔
258

259
    public TsBlockSingleColumnIterator(int rowIndex, int columnIndex) {
×
260
      this.rowIndex = rowIndex;
×
261
      this.columnIndex = columnIndex;
×
262
    }
×
263

264
    @Override
265
    public boolean hasNext() {
266
      return rowIndex < positionCount;
1✔
267
    }
268

269
    @Override
270
    public boolean hasNext(long minBound, long maxBound) {
271
      return hasNext();
×
272
    }
273

274
    @Override
275
    public void next() {
276
      rowIndex++;
1✔
277
    }
1✔
278

279
    @Override
280
    public long currentTime() {
281
      return timeColumn.getLong(rowIndex);
1✔
282
    }
283

284
    @Override
285
    public Object currentValue() {
286
      return valueColumns[columnIndex].getTsPrimitiveType(rowIndex).getValue();
1✔
287
    }
288

289
    @Override
290
    public void reset() {
291
      rowIndex = 0;
×
292
    }
×
293

294
    @Override
295
    public int totalLength() {
296
      return positionCount;
×
297
    }
298

299
    @Override
300
    public boolean hasNextTimeValuePair() {
301
      return hasNext();
1✔
302
    }
303

304
    @Override
305
    public TimeValuePair nextTimeValuePair() {
306
      TimeValuePair res = currentTimeValuePair();
1✔
307
      next();
1✔
308
      return res;
1✔
309
    }
310

311
    @Override
312
    public TimeValuePair currentTimeValuePair() {
313
      return new TimeValuePair(
1✔
314
          timeColumn.getLong(rowIndex), valueColumns[columnIndex].getTsPrimitiveType(rowIndex));
1✔
315
    }
316

317
    @Override
318
    public void close() {
319
      // do nothing
320
    }
1✔
321

322
    public long getEndTime() {
323
      return TsBlock.this.getEndTime();
×
324
    }
325

326
    public long getStartTime() {
327
      return TsBlock.this.getStartTime();
×
328
    }
329

330
    public int getRowIndex() {
331
      return rowIndex;
×
332
    }
333

334
    public void setRowIndex(int rowIndex) {
335
      this.rowIndex = rowIndex;
×
336
    }
×
337
  }
338

339
  /** Mainly used for UDF framework. Note that the timestamps are at the last column. */
340
  public class TsBlockRowIterator implements Iterator<Object[]> {
341

342
    protected int rowIndex;
343
    protected int columnCount;
344

345
    public TsBlockRowIterator(int rowIndex) {
×
346
      this.rowIndex = rowIndex;
×
347
      columnCount = getValueColumnCount();
×
348
    }
×
349

350
    @Override
351
    public boolean hasNext() {
352
      return rowIndex < positionCount;
×
353
    }
354

355
    /** @return A row in the TsBlock. The timestamp is at the last column. */
356
    @Override
357
    public Object[] next() {
358
      if (!hasNext()) {
×
359
        throw new NoSuchElementException();
×
360
      }
361

362
      int curColumnCount = getValueColumnCount();
×
363
      Object[] row = new Object[curColumnCount + 1];
×
364
      for (int i = 0; i < curColumnCount; ++i) {
×
365
        final Column column = valueColumns[i];
×
366
        row[i] = column.isNull(rowIndex) ? null : column.getObject(rowIndex);
×
367
      }
368
      row[curColumnCount] = timeColumn.getObject(rowIndex);
×
369

370
      rowIndex++;
×
371

372
      return row;
×
373
    }
374
  }
375

376
  private class TsBlockAlignedRowIterator implements IPointReader, IBatchDataIterator {
377

378
    private int rowIndex;
379

380
    public TsBlockAlignedRowIterator(int rowIndex) {
1✔
381
      this.rowIndex = rowIndex;
1✔
382
    }
1✔
383

384
    @Override
385
    public boolean hasNext() {
386
      return rowIndex < positionCount;
1✔
387
    }
388

389
    @Override
390
    public boolean hasNext(long minBound, long maxBound) {
391
      while (hasNext()) {
×
392
        if (currentTime() < minBound || currentTime() >= maxBound) {
×
393
          break;
×
394
        }
395
        next();
×
396
      }
397
      return hasNext();
×
398
    }
399

400
    @Override
401
    public void next() {
402
      rowIndex++;
1✔
403
    }
1✔
404

405
    @Override
406
    public long currentTime() {
407
      return timeColumn.getLong(rowIndex);
1✔
408
    }
409

410
    @Override
411
    public TsPrimitiveType[] currentValue() {
412
      TsPrimitiveType[] tsPrimitiveTypes = new TsPrimitiveType[valueColumns.length];
1✔
413
      for (int i = 0; i < valueColumns.length; i++) {
1✔
414
        if (!valueColumns[i].isNull(rowIndex)) {
1✔
415
          tsPrimitiveTypes[i] = valueColumns[i].getTsPrimitiveType(rowIndex);
1✔
416
        }
417
      }
418
      return tsPrimitiveTypes;
1✔
419
    }
420

421
    @Override
422
    public void reset() {
423
      rowIndex = 0;
×
424
    }
×
425

426
    @Override
427
    public int totalLength() {
428
      return positionCount;
×
429
    }
430

431
    @Override
432
    public boolean hasNextTimeValuePair() {
433
      while (hasNext() && isCurrentValueAllNull()) {
1✔
434
        next();
1✔
435
      }
436
      return hasNext();
1✔
437
    }
438

439
    @Override
440
    public TimeValuePair nextTimeValuePair() {
441
      TimeValuePair res = currentTimeValuePair();
1✔
442
      next();
1✔
443
      return res;
1✔
444
    }
445

446
    @Override
447
    public TimeValuePair currentTimeValuePair() {
448
      return new TimeValuePair(
1✔
449
          timeColumn.getLong(rowIndex), new TsPrimitiveType.TsVector(currentValue()));
1✔
450
    }
451

452
    @Override
453
    public void close() {
454
      // do nothing
455
    }
1✔
456

457
    public long getEndTime() {
458
      return TsBlock.this.getEndTime();
×
459
    }
460

461
    public long getStartTime() {
462
      return TsBlock.this.getStartTime();
×
463
    }
464

465
    public int getRowIndex() {
466
      return rowIndex;
×
467
    }
468

469
    public void setRowIndex(int rowIndex) {
470
      this.rowIndex = rowIndex;
×
471
    }
×
472

473
    private boolean isCurrentValueAllNull() {
474
      for (Column valueColumn : valueColumns) {
1✔
475
        if (!valueColumn.isNull(rowIndex)) {
1✔
476
          return false;
1✔
477
        }
478
      }
479
      return true;
1✔
480
    }
481
  }
482

483
  private long updateRetainedSize() {
484
    long newRetainedSizeInBytes = INSTANCE_SIZE;
1✔
485
    newRetainedSizeInBytes += timeColumn.getRetainedSizeInBytes();
1✔
486
    for (Column column : valueColumns) {
1✔
487
      newRetainedSizeInBytes += column.getRetainedSizeInBytes();
1✔
488
    }
489
    this.retainedSizeInBytes = newRetainedSizeInBytes;
1✔
490
    return newRetainedSizeInBytes;
1✔
491
  }
492

493
  public int getTotalInstanceSize() {
494
    int totalInstanceSize = INSTANCE_SIZE;
1✔
495
    totalInstanceSize += timeColumn.getInstanceSize();
1✔
496
    for (Column column : valueColumns) {
1✔
497
      totalInstanceSize += column.getInstanceSize();
1✔
498
    }
499
    return totalInstanceSize;
1✔
500
  }
501

502
  private static int determinePositionCount(Column... columns) {
503
    requireNonNull(columns, "columns is null");
1✔
504
    if (columns.length == 0) {
1✔
505
      throw new IllegalArgumentException("columns is empty");
×
506
    }
507

508
    return columns[0].getPositionCount();
1✔
509
  }
510
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc