• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / datasketches-java / #306

30 Apr 2024 10:01PM UTC coverage: 97.645% (-0.5%) from 98.139%
#306

push

web-flow
Merge pull request #555 from apache/fix_pom_xml_header

Fix pom xml header

26865 of 27513 relevant lines covered (97.64%)

0.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.57
/src/main/java/org/apache/datasketches/cpc/CpcUnion.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.datasketches.cpc;
21

22
import static org.apache.datasketches.common.Util.INVERSE_GOLDEN;
23
import static org.apache.datasketches.cpc.CpcUtil.countBitsSetInMatrix;
24
import static org.apache.datasketches.cpc.Flavor.EMPTY;
25
import static org.apache.datasketches.cpc.Flavor.SPARSE;
26

27
import org.apache.datasketches.common.Family;
28
import org.apache.datasketches.common.SketchesArgumentException;
29
import org.apache.datasketches.common.SketchesStateException;
30
import org.apache.datasketches.thetacommon.ThetaUtil;
31

32
/*
33
 * The merging logic is somewhat involved, so it will be summarized here.
34
 *
35
 * <p>First, we compare the K values of the union and the source sketch.
36
 *
37
 * <p>If (source.K &lt; union.K), we reduce the union's K to match, which
38
 * requires downsampling the union's internal sketch.
39
 *
40
 * <p>Here is how to perform the downsampling.
41
 *
42
 * <p>If the union contains a bitMatrix, downsample it by row-wise OR'ing.
43
 *
44
 * <p>If the union contains a sparse sketch, then create a new empty
45
 * sketch, and walk the old target sketch updating the new one (with modulo).
46
 * At the end, check whether the new target
47
 * sketch is still in sparse mode (it might not be, because downsampling
48
 * densifies the set of collected coupons). If it is NOT in sparse mode,
49
 * immediately convert it to a bitmatrix.
50
 *
51
 * <p>At this point, we have source.K &ge; union.K.
52
 * [We won't keep mentioning this, but in all of the following the
53
 * source's row indices are used mod union.K while updating the union's sketch.
54
 * That takes care of the situation where source.K &gt; union.K.]
55
 *
56
 * <p>Case A: union is Sparse and source is Sparse. We walk the source sketch
57
 * updating the union's sketch. At the end, if the union's sketch
58
 * is no longer in sparse mode, we convert it to a bitmatrix.
59
 *
60
 * <p>Case B: union is bitmatrix and source is Sparse. We walk the source sketch,
61
 * setting bits in the bitmatrix.
62
 *
63
 * <p>In the remaining cases, we have flavor(source) &gt; Sparse, so we immediately convert the
64
 * union's sketch to a bitmatrix (even if the union contains very few coupons). Then:
65
 *
66
 * <p>Case C: union is bitmatrix and source is Hybrid or Pinned. Then we OR the source's
67
 * sliding window into the bitmatrix, and walk the source's table, setting bits in the bitmatrix.
68
 *
69
 * <p>Case D: union is bitmatrix, and source is Sliding. Then we convert the source into
70
 * a bitmatrix, and OR it into the union's bitmatrix. [Important note; merely walking the source
71
 * wouldn't work because of the partially inverted Logic in the Sliding flavor, where the presence of
72
 * coupons is sometimes indicated by the ABSENCE of rowCol pairs in the surprises table.]
73
 *
74
 * <p>How does getResult work?
75
 *
76
 * <p>If the union is using its accumulator field, make a copy of that sketch.
77
 *
78
 * <p>If the union is using its bitMatrix field, then we have to convert the
79
 * bitMatrix back into a sketch, which requires doing some extra work to
80
 * figure out the values of numCoupons, offset, fiCol, and KxQ.
81
 *
82
 */
83
/**
84
 * The union (merge) operation for the CPC sketches.
85
 *
86
 * @author Lee Rhodes
87
 * @author Kevin Lang
88
 */
89
public class CpcUnion {
1✔
90
  private final long seed;
91
  private int lgK;
92

93
  // Note: at most one of bitMatrix and accumulator will be non-null at any given moment.
94
  // accumulator is a sketch object that is employed until it graduates out of Sparse mode.
95
  // At that point, it is converted into a full-sized bitMatrix, which is mathematically a sketch,
96
  // but doesn't maintain any of the "extra" fields of our sketch objects, so some additional work
97
  // is required when getResult is called at the end.
98
  private long[] bitMatrix;
99
  private CpcSketch accumulator; //can only be empty or sparse Flavor
100

101
  /**
102
   * Construct this unioning object with the default LgK and the default update seed.
103
   */
104
  public CpcUnion() {
105
    this(CpcSketch.DEFAULT_LG_K, ThetaUtil.DEFAULT_UPDATE_SEED);
1✔
106
  }
1✔
107

108
  /**
109
   * Construct this unioning object with LgK and the default update seed.
110
   * @param lgK The given log2 of K.
111
   */
112
  public CpcUnion(final int lgK) {
113
    this(lgK, ThetaUtil.DEFAULT_UPDATE_SEED);
1✔
114
  }
1✔
115

116
  /**
117
   * Construct this unioning object with LgK and a given seed.
118
   * @param lgK The given log2 of K.
119
   * @param seed The given seed.
120
   */
121
  public CpcUnion(final int lgK, final long seed) {
1✔
122
    this.seed = seed;
1✔
123
    this.lgK = lgK;
1✔
124
    bitMatrix = null;
1✔
125
    // We begin with the accumulator holding an EMPTY_MERGED sketch object.
126
    // As an optimization the accumulator could start as NULL, but that would require changes elsewhere.
127
    accumulator = new CpcSketch(lgK);
1✔
128
  }
1✔
129

130
  /**
131
   * Update this union with a CpcSketch.
132
   * @param sketch the given CpcSketch.
133
   */
134
  public void update(final CpcSketch sketch) {
135
    mergeInto(this, sketch);
1✔
136
  }
1✔
137

138
  /**
139
   * Returns the result of union operations as a CPC sketch.
140
   * @return the result of union operations as a CPC sketch.
141
   */
142
  public CpcSketch getResult() {
143
    return getResult(this);
1✔
144
  }
145

146
  /**
147
   * Returns the current value of Log_base2 of K.  Note that due to merging with source sketches that
148
   * may have a lower value of LgK, this value can be less than what the union object was configured
149
   * with.
150
   *
151
   * @return the current value of Log_base2 of K.
152
   */
153
  public int getLgK() {
154
    return lgK;
1✔
155
  }
156

157
  /**
158
   * Return the DataSketches identifier for this CPC family of sketches.
159
   * @return the DataSketches identifier for this CPC family of sketches.
160
   */
161
  public static Family getFamily() {
162
    return Family.CPC;
1✔
163
  }
164

165
  //used for testing only
166
  long getNumCoupons() {
167
    if (bitMatrix != null) {
1✔
168
      return countBitsSetInMatrix(bitMatrix);
1✔
169
    }
170
    return accumulator.numCoupons;
1✔
171
  }
172

173
  //used for testing only
174
  static long[] getBitMatrix(final CpcUnion union) {
175
    checkUnionState(union);
1✔
176
    return (union.bitMatrix != null)
1✔
177
        ? union.bitMatrix
178
        : CpcUtil.bitMatrixOfSketch(union.accumulator);
1✔
179
  }
180

181
  private static void walkTableUpdatingSketch(final CpcSketch dest, final PairTable table) {
182
    final int[] slots = table.getSlotsArr();
1✔
183
    final int numSlots = (1 << table.getLgSizeInts());
1✔
184
    assert dest.lgK <= 26;
1✔
185
    final int destMask = (((1 << dest.lgK) - 1) << 6) | 63; //downsamples when destlgK < srcLgK
1✔
186

187
    /* Using the inverse golden ratio stride fixes the
188
     * <a href="{@docRoot}/resources/dictionary.html#SnowPlow">Snow Plow Effect</a>.
189
     */
190
    int stride =  (int) (INVERSE_GOLDEN * numSlots);
1✔
191
    assert stride >= 2;
1✔
192
    if (stride == ((stride >>> 1) << 1)) { stride += 1; } //force the stride to be odd
1✔
193
    assert (stride >= 3) && (stride < numSlots);
1✔
194

195
    for (int i = 0, j = 0; i < numSlots; i++, j += stride) {
1✔
196
      j &= (numSlots - 1);
1✔
197
      final int rowCol = slots[j];
1✔
198
      if (rowCol != -1) {
1✔
199
        dest.rowColUpdate(rowCol & destMask);
1✔
200
      }
201
    }
202
  }
1✔
203

204
  private static void orTableIntoMatrix(final long[] bitMatrix, final int destLgK, final PairTable table) {
205
    final int[] slots = table.getSlotsArr();
1✔
206
    final int numSlots = 1 << table.getLgSizeInts();
1✔
207
    final int destMask = (1 << destLgK) - 1;  // downsamples when destlgK < srcLgK
1✔
208
    for (int i = 0; i < numSlots; i++) {
1✔
209
      final int rowCol = slots[i];
1✔
210
      if (rowCol != -1) {
1✔
211
        final int col = rowCol & 63;
1✔
212
        final int row = rowCol >>> 6;
1✔
213
        bitMatrix[row & destMask] |= (1L << col); // Set the bit.
1✔
214
      }
215
    }
216
  }
1✔
217

218
  private static void orWindowIntoMatrix(final long[] destMatrix, final int destLgK,
219
      final byte[] srcWindow, final int srcOffset, final int srcLgK) {
220
    assert (destLgK <= srcLgK);
1✔
221
    final int destMask = (1 << destLgK) - 1;  // downsamples when destlgK < srcLgK
1✔
222
    final int srcK = 1 << srcLgK;
1✔
223
    for (int srcRow = 0; srcRow < srcK; srcRow++) {
1✔
224
      destMatrix[srcRow & destMask] |= ((srcWindow[srcRow] & 0XFFL) << srcOffset);
1✔
225
    }
226
  }
1✔
227

228
  private static void orMatrixIntoMatrix(final long[] destMatrix, final int destLgK,
229
      final long[] srcMatrix, final int srcLgK) {
230
    assert (destLgK <= srcLgK);
1✔
231
    final int destMask = (1 << destLgK) - 1; // downsamples when destlgK < srcLgK
1✔
232
    final int srcK = 1 << srcLgK;
1✔
233
    for (int srcRow = 0; srcRow < srcK; srcRow++) {
1✔
234
      destMatrix[srcRow & destMask] |= srcMatrix[srcRow];
1✔
235
    }
236
  }
1✔
237

238
  private static void reduceUnionK(final CpcUnion union, final int newLgK) {
239
    assert (newLgK < union.lgK);
1✔
240

241
    if (union.bitMatrix != null) { // downsample the union's bit matrix
1✔
242
      final int newK = 1 << newLgK;
1✔
243
      final long[] newMatrix = new long[newK];
1✔
244

245
      orMatrixIntoMatrix(newMatrix, newLgK, union.bitMatrix, union.lgK);
1✔
246
      union.bitMatrix = newMatrix;
1✔
247
      union.lgK = newLgK;
1✔
248
    }
1✔
249

250
    else { // downsample the union's accumulator
251
      final CpcSketch oldSketch = union.accumulator;
1✔
252

253
      if (oldSketch.numCoupons == 0) {
1✔
254
        union.accumulator = new CpcSketch(newLgK, oldSketch.seed);
1✔
255
        union.lgK = newLgK;
1✔
256
        return;
1✔
257
      }
258

259
      final CpcSketch newSketch = new CpcSketch(newLgK, oldSketch.seed);
1✔
260
      walkTableUpdatingSketch(newSketch, oldSketch.pairTable);
1✔
261

262
      final Flavor finalNewFlavor = newSketch.getFlavor();
1✔
263
      assert (finalNewFlavor != EMPTY); //SV table had to have something in it
1✔
264

265
      if (finalNewFlavor == SPARSE) {
1✔
266
        union.accumulator = newSketch;
1✔
267
        union.lgK = newLgK;
1✔
268
        return;
1✔
269
      }
270

271
      // the new sketch has graduated beyond sparse, so convert to bitMatrix
272
      union.accumulator = null;
1✔
273
      union.bitMatrix = CpcUtil.bitMatrixOfSketch(newSketch);
1✔
274
      union.lgK = newLgK;
1✔
275
    }
276
  }
1✔
277

278
  private static void mergeInto(final CpcUnion union, final CpcSketch source) {
279
    if (source == null) { return; }
1✔
280
    checkSeeds(union.seed, source.seed);
1✔
281

282
    final int sourceFlavorOrd = source.getFlavor().ordinal();
1✔
283
    if (sourceFlavorOrd == 0) { return; } //EMPTY
1✔
284

285
    //Accumulator and bitMatrix must be mutually exclusive,
286
    //so bitMatrix != null => accumulator == null and visa versa
287
    //if (Accumulator != null) union must be EMPTY or SPARSE,
288
    checkUnionState(union);
1✔
289

290
    if (source.lgK < union.lgK) { reduceUnionK(union, source.lgK); }
1✔
291

292
    // if source is past SPARSE mode, make sure that union is a bitMatrix.
293
    if ((sourceFlavorOrd > 1) && (union.accumulator != null)) {
1✔
294
      union.bitMatrix = CpcUtil.bitMatrixOfSketch(union.accumulator);
1✔
295
      union.accumulator = null;
1✔
296
    }
297

298
    final int state = ((sourceFlavorOrd - 1) << 1) | ((union.bitMatrix != null) ? 1 : 0);
1✔
299
    switch (state) {
1✔
300
      case 0 : { //A: Sparse, bitMatrix == null, accumulator valid
301
        if (union.accumulator == null) {
1✔
302
          //CodeQL could not figure this out so I have to insert this.
303
          throw new SketchesStateException("union.accumulator can never be null here.");
×
304
        }
305
        if ((union.accumulator.getFlavor() == EMPTY)
1✔
306
            && (union.lgK == source.lgK)) {
307
          union.accumulator = source.copy();
1✔
308
          break;
1✔
309
        }
310
        walkTableUpdatingSketch(union.accumulator, source.pairTable);
1✔
311
        // if the accumulator has graduated beyond sparse, switch union to a bitMatrix
312
        if (union.accumulator.getFlavor().ordinal() > 1) {
1✔
313
          union.bitMatrix = CpcUtil.bitMatrixOfSketch(union.accumulator);
1✔
314
          union.accumulator = null;
1✔
315
        }
316
        break;
317
      }
318
      case 1 : { //B: Sparse, bitMatrix valid, accumulator == null
319
        orTableIntoMatrix(union.bitMatrix, union.lgK, source.pairTable);
1✔
320
        break;
1✔
321
      }
322
      case 3 :   //C: Hybrid, bitMatrix valid, accumulator == null
323
      case 5 : { //C: Pinned, bitMatrix valid, accumulator == null
324
        orWindowIntoMatrix(union.bitMatrix, union.lgK, source.slidingWindow,
1✔
325
            source.windowOffset, source.lgK);
326
        orTableIntoMatrix(union.bitMatrix, union.lgK, source.pairTable);
1✔
327
        break;
1✔
328
      }
329
      case 7 : { //D: Sliding, bitMatrix valid, accumulator == null
330
        // SLIDING mode involves inverted logic, so we can't just walk the source sketch.
331
        // Instead, we convert it to a bitMatrix that can be OR'ed into the destination.
332
        final long[] sourceMatrix = CpcUtil.bitMatrixOfSketch(source);
1✔
333
        orMatrixIntoMatrix(union.bitMatrix, union.lgK, sourceMatrix, source.lgK);
1✔
334
        break;
1✔
335
      }
336
      default: throw new SketchesStateException("Illegal Union state: " + state);
×
337
    }
338
  }
1✔
339

340
  private static CpcSketch getResult(final CpcUnion union) {
341
    checkUnionState(union);
1✔
342

343
    if (union.accumulator != null) { // start of case where union contains a sketch
1✔
344
      if (union.accumulator.numCoupons == 0) {
1✔
345
        final CpcSketch result = new CpcSketch(union.lgK, union.accumulator.seed);
1✔
346
        result.mergeFlag = true;
1✔
347
        return (result);
1✔
348
      }
349
      assert (SPARSE == union.accumulator.getFlavor());
1✔
350
      final CpcSketch result = union.accumulator.copy();
1✔
351
      result.mergeFlag = true;
1✔
352
      return (result);
1✔
353
    } // end of case where union contains a sketch
354

355
    // start of case where union contains a bitMatrix
356
    final long[] matrix = union.bitMatrix;
1✔
357
    final int lgK = union.lgK;
1✔
358
    final CpcSketch result = new CpcSketch(union.lgK, union.seed);
1✔
359

360
    final long numCoupons = countBitsSetInMatrix(matrix);
1✔
361
    result.numCoupons = numCoupons;
1✔
362

363
    final Flavor flavor = CpcUtil.determineFlavor(lgK, numCoupons);
1✔
364
    assert ((flavor.ordinal() > SPARSE.ordinal()) );
1✔
365

366
    final int offset = CpcUtil.determineCorrectOffset(lgK, numCoupons);
1✔
367
    result.windowOffset = offset;
1✔
368

369
    //Build the window and pair table
370

371
    final int k = 1 << lgK;
1✔
372
    final byte[] window = new byte[k];
1✔
373
    result.slidingWindow = window;
1✔
374

375
    // LgSize = K/16; in some cases this will end up being oversized
376
    final int newTableLgSize = Math.max(lgK - 4, 2);
1✔
377
    final PairTable table = new PairTable(newTableLgSize, 6 + lgK);
1✔
378
    result.pairTable = table;
1✔
379

380
    // The following works even when the offset is zero.
381
    final long maskForClearingWindow = (0XFFL << offset) ^ -1L;
1✔
382
    final long maskForFlippingEarlyZone = (1L << offset) - 1L;
1✔
383
    long allSurprisesORed = 0;
1✔
384

385
    /* using a sufficiently large hash table avoids the
386
     * <a href="{@docRoot}/resources/dictionary.html#SnowPlow">Snow Plow Effect</a>
387
     */
388
    for (int i = 0; i < k; i++) {
1✔
389
      long pattern = matrix[i];
1✔
390
      window[i] = (byte) ((pattern >>> offset) & 0XFFL);
1✔
391
      pattern &= maskForClearingWindow;
1✔
392
      pattern ^= maskForFlippingEarlyZone; // This flipping converts surprising 0's to 1's.
1✔
393
      allSurprisesORed |= pattern;
1✔
394
      while (pattern != 0) {
1✔
395
        final int col = Long.numberOfTrailingZeros(pattern);
1✔
396
        pattern = pattern ^ (1L << col); // erase the 1.
1✔
397
        final int rowCol = (i << 6) | col;
1✔
398
        final boolean isNovel = PairTable.maybeInsert(table, rowCol);
1✔
399
        assert isNovel;
1✔
400
      }
1✔
401
    }
402

403
    // At this point we could shrink an oversize hash table, but the relative waste isn't very big.
404
    result.fiCol = Long.numberOfTrailingZeros(allSurprisesORed);
1✔
405
    if (result.fiCol > offset) {
1✔
406
      result.fiCol = offset;
1✔
407
    } // corner case
408

409
    // NB: the HIP-related fields will contain bogus values, but that is okay.
410

411
    result.mergeFlag = true;
1✔
412
    return result;
1✔
413
    // end of case where union contains a bitMatrix
414
  }
415

416
  private static void checkSeeds(final long seedA, final long seedB) {
417
    if (seedA != seedB) {
1✔
418
      throw new SketchesArgumentException("Hash Seeds do not match.");
1✔
419
    }
420
  }
1✔
421

422
  private static void checkUnionState(final CpcUnion union) {
423
    if (union == null) {
1✔
424
      throw new SketchesStateException("union cannot be null");
1✔
425
    }
426
    final CpcSketch accumulator = union.accumulator;
1✔
427
    if ( !((accumulator != null) ^ (union.bitMatrix != null)) ) {
1✔
428
      throw new SketchesStateException(
×
429
        "accumulator and bitMatrix cannot be both valid or both null: "
430
        + "accumValid = " + (accumulator != null)
431
        + ", bitMatrixValid = " + (union.bitMatrix != null));
432
    }
433
    if (accumulator != null) { //must be SPARSE or EMPTY
1✔
434
      if (accumulator.numCoupons > 0) { //SPARSE
1✔
435
        if ( !((accumulator.slidingWindow == null) && (accumulator.pairTable != null)) ) {
1✔
436
          throw new SketchesStateException(
×
437
              "Non-empty union accumulator must be SPARSE: " + accumulator.getFlavor());
×
438
        }
439
      } //else EMPTY
440
      if (union.lgK != accumulator.lgK) {
1✔
441
        throw new SketchesStateException("union LgK must equal accumulator LgK");
×
442
      }
443
    }
444
  }
1✔
445

446
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc