• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / datasketches-java / #306

30 Apr 2024 10:01PM UTC coverage: 97.645% (-0.5%) from 98.139%
#306

push

web-flow
Merge pull request #555 from apache/fix_pom_xml_header

Fix pom xml header

26865 of 27513 relevant lines covered (97.64%)

0.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.38
/src/main/java/org/apache/datasketches/quantiles/HeapUpdateDoublesSketch.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.datasketches.quantiles;
21

22
import static org.apache.datasketches.quantiles.ClassicUtil.MIN_K;
23
import static org.apache.datasketches.quantiles.ClassicUtil.checkFamilyID;
24
import static org.apache.datasketches.quantiles.ClassicUtil.checkHeapFlags;
25
import static org.apache.datasketches.quantiles.ClassicUtil.computeBaseBufferItems;
26
import static org.apache.datasketches.quantiles.ClassicUtil.computeBitPattern;
27
import static org.apache.datasketches.quantiles.ClassicUtil.computeCombinedBufferItemCapacity;
28
import static org.apache.datasketches.quantiles.ClassicUtil.computeNumLevelsNeeded;
29
import static org.apache.datasketches.quantiles.ClassicUtil.computeRetainedItems;
30
import static org.apache.datasketches.quantiles.PreambleUtil.COMPACT_FLAG_MASK;
31
import static org.apache.datasketches.quantiles.PreambleUtil.EMPTY_FLAG_MASK;
32
import static org.apache.datasketches.quantiles.PreambleUtil.MAX_DOUBLE;
33
import static org.apache.datasketches.quantiles.PreambleUtil.MIN_DOUBLE;
34
import static org.apache.datasketches.quantiles.PreambleUtil.extractFamilyID;
35
import static org.apache.datasketches.quantiles.PreambleUtil.extractFlags;
36
import static org.apache.datasketches.quantiles.PreambleUtil.extractK;
37
import static org.apache.datasketches.quantiles.PreambleUtil.extractN;
38
import static org.apache.datasketches.quantiles.PreambleUtil.extractPreLongs;
39
import static org.apache.datasketches.quantiles.PreambleUtil.extractSerVer;
40

41
import java.util.Arrays;
42

43
import org.apache.datasketches.common.Family;
44
import org.apache.datasketches.common.SketchesArgumentException;
45
import org.apache.datasketches.memory.Memory;
46
import org.apache.datasketches.memory.WritableMemory;
47
import org.apache.datasketches.quantilescommon.QuantilesAPI;
48

49
/**
50
 * Implements the DoublesSketch on the Java heap.
51
 *
52
 * @author Lee Rhodes
53
 * @author Jon Malkin
54
 */
55
final class HeapUpdateDoublesSketch extends UpdateDoublesSketch {
1✔
56
  static final int MIN_HEAP_DOUBLES_SER_VER = 1;
57

58
  /**
59
   * The smallest item ever seen in the stream.
60
   */
61
  private double minItem_;
62

63
  /**
64
   * The largest item ever seen in the stream.
65
   */
66
  private double maxItem_;
67

68
  /**
69
   * The total count of items seen.
70
   */
71
  private long n_;
72

73
  /**
74
   * Number of items currently in base buffer.
75
   *
76
   * <p>Count = N % (2*K)</p>
77
   */
78
  private int baseBufferCount_;
79

80
  /**
81
   * Active levels expressed as a bit pattern.
82
   *
83
   * <p>Pattern = N / (2 * K)</p>
84
   */
85
  private long bitPattern_;
86

87
  /**
88
   * This single array contains the base buffer plus all levels some of which may not be used,
89
   * i.e, is in non-compact form.
90
   * A level is of size K and is either full and sorted, or not used. A "not used" buffer may have
91
   * garbage. Whether a level buffer used or not is indicated by the bitPattern_.
92
   * The base buffer has length 2*K but might not be full and isn't necessarily sorted.
93
   * The base buffer precedes the level buffers. This buffer does not include the min, max items.
94
   *
95
   * <p>The levels arrays require quite a bit of explanation, which we defer until later.</p>
96
   */
97
  private double[] combinedBuffer_;
98

99
  //**CONSTRUCTORS**********************************************************
100
  private HeapUpdateDoublesSketch(final int k) {
101
    super(k); //Checks k
1✔
102
  }
1✔
103

104
  /**
105
   * Obtains a new on-heap instance of a DoublesSketch.
106
   *
107
   * @param k Parameter that controls space usage of sketch and accuracy of estimates.
108
   * Must be greater than 1 and less than 65536 and a power of 2.
109
   * @return a HeapUpdateDoublesSketch
110
   */
111
  static HeapUpdateDoublesSketch newInstance(final int k) {
112
    final HeapUpdateDoublesSketch hqs = new HeapUpdateDoublesSketch(k);
1✔
113
    final int baseBufAlloc = 2 * Math.min(MIN_K, k); //the min is important
1✔
114
    hqs.n_ = 0;
1✔
115
    hqs.combinedBuffer_ = new double[baseBufAlloc];
1✔
116
    hqs.baseBufferCount_ = 0;
1✔
117
    hqs.bitPattern_ = 0;
1✔
118
    hqs.minItem_ = Double.NaN;
1✔
119
    hqs.maxItem_ = Double.NaN;
1✔
120
    return hqs;
1✔
121
  }
122

123
  /**
124
   * Heapifies the given srcMem, which must be a Memory image of a DoublesSketch and may have data.
125
   *
126
   * @param srcMem a Memory image of a sketch, which may be in compact or not compact form.
127
   * <a href="{@docRoot}/resources/dictionary.html#mem">See Memory</a>
128
   * @return a DoublesSketch on the Java heap.
129
   */
130
  static HeapUpdateDoublesSketch heapifyInstance(final Memory srcMem) {
131
    final long memCapBytes = srcMem.getCapacity();
1✔
132
    if (memCapBytes < 8) {
1✔
133
      throw new SketchesArgumentException("Source Memory too small: " + memCapBytes + " < 8");
1✔
134
    }
135

136
    final int preLongs = extractPreLongs(srcMem);
1✔
137
    final int serVer = extractSerVer(srcMem);
1✔
138
    final int familyID = extractFamilyID(srcMem);
1✔
139
    final int flags = extractFlags(srcMem);
1✔
140
    final int k = extractK(srcMem);
1✔
141

142
    final boolean empty = (flags & EMPTY_FLAG_MASK) > 0; //Preamble flags empty state
1✔
143
    final long n = empty ? 0 : extractN(srcMem);
1✔
144

145
    //VALIDITY CHECKS
146
    DoublesUtil.checkDoublesSerVer(serVer, MIN_HEAP_DOUBLES_SER_VER);
1✔
147
    checkHeapFlags(flags);
1✔
148
    checkPreLongsFlagsSerVer(flags, serVer, preLongs);
1✔
149
    checkFamilyID(familyID);
1✔
150

151
    final HeapUpdateDoublesSketch hds = newInstance(k); //checks k
1✔
152
    if (empty) { return hds; }
1✔
153

154
    //Not empty, must have valid preamble + min, max, n.
155
    //Forward compatibility from SerVer = 1 :
156
    final boolean srcIsCompact = (serVer == 2) | ((flags & COMPACT_FLAG_MASK) > 0);
1✔
157

158
    checkHeapMemCapacity(k, n, srcIsCompact, serVer, memCapBytes);
1✔
159

160
    //set class members by computing them
161
    hds.n_ = n;
1✔
162
    final int combBufCap = computeCombinedBufferItemCapacity(k, n);
1✔
163
    hds.baseBufferCount_ = computeBaseBufferItems(k, n);
1✔
164
    hds.bitPattern_ = computeBitPattern(k, n);
1✔
165
    //Extract min, max, data from srcMem into Combined Buffer
166
    hds.srcMemoryToCombinedBuffer(srcMem, serVer, srcIsCompact, combBufCap);
1✔
167
    return hds;
1✔
168
  }
169

170
  @Override
171
  public double getMaxItem() {
172
    if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
1✔
173
    return maxItem_;
1✔
174
  }
175

176
  @Override
177
  public double getMinItem() {
178
    if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
1✔
179
    return minItem_;
1✔
180
  }
181

182
  @Override
183
  public long getN() {
184
    return n_;
1✔
185
  }
186

187
  @Override
188
  public boolean hasMemory() {
189
    return false;
1✔
190
  }
191

192
  @Override
193
  public boolean isDirect() {
194
    return false;
1✔
195
  }
196

197
  @Override
198
  public boolean isReadOnly() {
199
    return false;
×
200
  }
201

202
  @Override
203
  public void reset() {
204
    n_ = 0;
1✔
205
    final int combinedBufferItemCapacity = 2 * Math.min(MIN_K, k_); //min is important
1✔
206
    combinedBuffer_ = new double[combinedBufferItemCapacity];
1✔
207
    baseBufferCount_ = 0;
1✔
208
    bitPattern_ = 0;
1✔
209
    minItem_ = Double.NaN;
1✔
210
    maxItem_ = Double.NaN;
1✔
211
  }
1✔
212

213
  @Override
214
  public void update(final double dataItem) {
215
    if (Double.isNaN(dataItem)) { return; }
1✔
216

217
    if (n_ == 0) {
1✔
218
      putMaxItem(dataItem);
1✔
219
      putMinItem(dataItem);
1✔
220
    } else {
221
      if (dataItem > getMaxItem()) { putMaxItem(dataItem); }
1✔
222
      if (dataItem < getMinItem()) { putMinItem(dataItem); }
1✔
223
    }
224

225
    //don't increment n_ and baseBufferCount_ yet
226
    final int curBBCount = baseBufferCount_;
1✔
227
    final int newBBCount = curBBCount + 1;
1✔
228
    final long newN = n_ + 1;
1✔
229

230
    final int combBufItemCap = combinedBuffer_.length;
1✔
231
    if (newBBCount > combBufItemCap) {
1✔
232
      growBaseBuffer(); //only changes combinedBuffer when it is only a base buffer
1✔
233
    }
234

235
    //put the new item in the base buffer
236
    combinedBuffer_[curBBCount] = dataItem;
1✔
237

238
    if (newBBCount == (k_ << 1)) { //Propagate
1✔
239

240
      // make sure there will be enough space (levels) for the propagation
241
      final int spaceNeeded = DoublesUpdateImpl.getRequiredItemCapacity(k_, newN);
1✔
242

243
      if (spaceNeeded > combBufItemCap) {
1✔
244
        // copies base buffer plus old levels, adds space for new level
245
        growCombinedBuffer(combBufItemCap, spaceNeeded);
1✔
246
      }
247

248
      // sort only the (full) base buffer via accessor which modifies the underlying base buffer,
249
      // then use as one of the inputs to propagate-carry
250
      final DoublesSketchAccessor bbAccessor = DoublesSketchAccessor.wrap(this, true);
1✔
251
      bbAccessor.sort();
1✔
252

253
      final long newBitPattern = DoublesUpdateImpl.inPlacePropagateCarry(
1✔
254
              0, // starting level
255
              null,
256
              bbAccessor,
257
              true,
258
              k_,
259
              DoublesSketchAccessor.wrap(this, true),
1✔
260
              bitPattern_
261
      );
262

263
      assert newBitPattern == computeBitPattern(k_, newN); // internal consistency check
1✔
264
      assert newBitPattern == (bitPattern_ + 1);
1✔
265

266
      bitPattern_ = newBitPattern;
1✔
267
      baseBufferCount_ = 0;
1✔
268
    } else {
1✔
269
      //bitPattern unchanged
270
      baseBufferCount_ = newBBCount;
1✔
271
    }
272
    n_ = newN;
1✔
273
    doublesSV = null;
1✔
274
  }
1✔
275

276
  /**
277
   * Loads the Combined Buffer, min and max from the given source Memory.
278
   * The resulting Combined Buffer is always in non-compact form and must be pre-allocated.
279
   * @param srcMem the given source Memory
280
   * @param serVer the serialization version of the source
281
   * @param srcIsCompact true if the given source Memory is in compact form
282
   * @param combBufCap total items for the combined buffer (size in doubles)
283
   */
284
  private void srcMemoryToCombinedBuffer(final Memory srcMem, final int serVer,
285
                                         final boolean srcIsCompact, final int combBufCap) {
286
    final int preLongs = 2;
1✔
287
    final int extra = (serVer == 1) ? 3 : 2; // space for min and max quantiles, buf alloc (SerVer 1)
1✔
288
    final int preBytes = (preLongs + extra) << 3;
1✔
289
    final int bbCnt = baseBufferCount_;
1✔
290
    final int k = getK();
1✔
291
    final long n = getN();
1✔
292
    final double[] combinedBuffer = new double[combBufCap]; //always non-compact
1✔
293
    //Load min, max
294
    putMinItem(srcMem.getDouble(MIN_DOUBLE));
1✔
295
    putMaxItem(srcMem.getDouble(MAX_DOUBLE));
1✔
296

297
    if (srcIsCompact) {
1✔
298
      //Load base buffer
299
      srcMem.getDoubleArray(preBytes, combinedBuffer, 0, bbCnt);
1✔
300

301
      //Load levels from compact srcMem
302
      long bitPattern = bitPattern_;
1✔
303
      if (bitPattern != 0) {
1✔
304
        long memOffset = preBytes + (bbCnt << 3);
1✔
305
        int combBufOffset = 2 * k;
1✔
306
        while (bitPattern != 0L) {
1✔
307
          if ((bitPattern & 1L) > 0L) {
1✔
308
            srcMem.getDoubleArray(memOffset, combinedBuffer, combBufOffset, k);
1✔
309
            memOffset += (k << 3); //bytes, increment compactly
1✔
310
          }
311
          combBufOffset += k; //doubles, increment every level
1✔
312
          bitPattern >>>= 1;
1✔
313
        }
314

315
      }
316
    } else { //srcMem not compact
1✔
317
      final int levels = computeNumLevelsNeeded(k, n);
1✔
318
      final int totItems = (levels == 0) ? bbCnt : (2 + levels) * k;
1✔
319
      srcMem.getDoubleArray(preBytes, combinedBuffer, 0, totItems);
1✔
320
    }
321
    putCombinedBuffer(combinedBuffer);
1✔
322
  }
1✔
323

324
  //Restricted overrides
325
  //Gets
326

327
  @Override
328
  int getBaseBufferCount() {
329
    return baseBufferCount_;
1✔
330
  }
331

332
  @Override
333
  int getCombinedBufferItemCapacity() {
334
    return combinedBuffer_.length;
1✔
335
  }
336

337
  @Override
338
  double[] getCombinedBuffer() {
339
    return combinedBuffer_;
1✔
340
  }
341

342
  @Override
343
  long getBitPattern() {
344
    return bitPattern_;
1✔
345
  }
346

347
  @Override
348
  WritableMemory getMemory() {
349
    return null;
1✔
350
  }
351

352
  //Puts
353

354
  @Override
355
  void putMinItem(final double minItem) {
356
    minItem_ = minItem;
1✔
357
  }
1✔
358

359
  @Override
360
  void putMaxItem(final double maxItem) {
361
    maxItem_ = maxItem;
1✔
362
  }
1✔
363

364
  @Override
365
  void putN(final long n) {
366
    n_ = n;
1✔
367
  }
1✔
368

369
  @Override
370
  void putCombinedBuffer(final double[] combinedBuffer) {
371
    combinedBuffer_ = combinedBuffer;
1✔
372
  }
1✔
373

374
  @Override
375
  void putBaseBufferCount(final int baseBufferCount) {
376
    baseBufferCount_ = baseBufferCount;
1✔
377
  }
1✔
378

379
  @Override
380
  void putBitPattern(final long bitPattern) {
381
    bitPattern_ = bitPattern;
1✔
382
  }
1✔
383

384
  @Override //the returned array is not always used
385
  double[] growCombinedBuffer(final int currentSpace, final int spaceNeeded) {
386
    combinedBuffer_ = Arrays.copyOf(combinedBuffer_, spaceNeeded);
1✔
387
    return combinedBuffer_;
1✔
388
  }
389

390
  /**
391
   * This is only used for on-heap sketches, and grows the Base Buffer by factors of 2 until it
392
   * reaches the maximum size of 2 * k. It is only called when there are no levels above the
393
   * Base Buffer.
394
   */
395
  //important: n has not been incremented yet
396
  private void growBaseBuffer() {
397
    final int oldSize = combinedBuffer_.length;
1✔
398
    assert oldSize < (2 * k_);
1✔
399
    final double[] baseBuffer = combinedBuffer_;
1✔
400
    final int newSize = 2 * Math.max(Math.min(k_, oldSize), MIN_K);
1✔
401
    combinedBuffer_ = Arrays.copyOf(baseBuffer, newSize);
1✔
402
  }
1✔
403

404
  static void checkPreLongsFlagsSerVer(final int flags, final int serVer, final int preLongs) {
405
    final boolean empty = (flags & EMPTY_FLAG_MASK) > 0;
1✔
406
    final boolean compact = (flags & COMPACT_FLAG_MASK) > 0;
1✔
407

408
    final int sw = (compact ? 1 : 0) + (2 * (empty ? 1 : 0)) + (4 * (serVer & 0xF))
1✔
409
        + (32 * (preLongs & 0x3F));
410
    boolean valid = true;
1✔
411
    switch (sw) { //These are the valid cases.
1✔
412
      case 38  : break; //!compact,  empty, serVer = 1, preLongs = 1; always stored as not compact
1✔
413
      case 164 : break; //!compact, !empty, serVer = 1, preLongs = 5; always stored as not compact
1✔
414
      case 42  : break; //!compact,  empty, serVer = 2, preLongs = 1; always stored as compact
1✔
415
      case 72  : break; //!compact, !empty, serVer = 2, preLongs = 2; always stored as compact
1✔
416
      case 47  : break; // compact,  empty, serVer = 3, preLongs = 1;
1✔
417
      case 46  : break; //!compact,  empty, serVer = 3, preLongs = 1;
1✔
418
      case 79  : break; // compact,  empty, serVer = 3, preLongs = 2;
1✔
419
      case 78  : break; //!compact,  empty, serVer = 3, preLongs = 2;
1✔
420
      case 77  : break; // compact, !empty, serVer = 3, preLongs = 2;
1✔
421
      case 76  : break; //!compact, !empty, serVer = 3, preLongs = 2;
1✔
422
      default : //all other cases are invalid
423
        valid = false;
1✔
424
    }
425

426
    if (!valid) {
1✔
427
      throw new SketchesArgumentException("Possible corruption. Inconsistent state: "
1✔
428
          + "PreambleLongs = " + preLongs + ", empty = " + empty + ", SerVer = " + serVer
429
          + ", Compact = " + compact);
430
    }
431
  }
1✔
432

433
  /**
434
   * Checks the validity of the heap memory capacity assuming n, k and the compact state.
435
   * @param k the given k
436
   * @param n the given n
437
   * @param compact true if memory is in compact form
438
   * @param serVer serialization version of the source
439
   * @param memCapBytes the current memory capacity in bytes
440
   */
441
  static void checkHeapMemCapacity(final int k, final long n, final boolean compact,
442
                                   final int serVer, final long memCapBytes) {
443
    final int metaPre = Family.QUANTILES.getMaxPreLongs() + ((serVer == 1) ? 3 : 2);
1✔
444
    final int retainedItems = computeRetainedItems(k, n);
1✔
445
    final int reqBufBytes;
446
    if (compact) {
1✔
447
      reqBufBytes = (metaPre + retainedItems) << 3;
1✔
448
    } else { //not compact
449
      final int totLevels = computeNumLevelsNeeded(k, n);
1✔
450
      reqBufBytes = (totLevels == 0)
1✔
451
          ? (metaPre + retainedItems) << 3
452
          : (metaPre + ((2 + totLevels) * k)) << 3;
453
    }
454
    if (memCapBytes < reqBufBytes) {
1✔
455
      throw new SketchesArgumentException("Possible corruption: Memory capacity too small: "
1✔
456
          + memCapBytes + " < " + reqBufBytes);
457
    }
458
  }
1✔
459

460
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc