• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / datasketches-java / #306

30 Apr 2024 10:01PM UTC coverage: 97.645% (-0.5%) from 98.139%
#306

push

web-flow
Merge pull request #555 from apache/fix_pom_xml_header

Fix pom xml header

26865 of 27513 relevant lines covered (97.64%)

0.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.92
/src/main/java/org/apache/datasketches/theta/UpdateSketchBuilder.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.datasketches.theta;
21

22
import static org.apache.datasketches.common.Util.LS;
23
import static org.apache.datasketches.common.Util.TAB;
24
import static org.apache.datasketches.common.Util.ceilingPowerOf2;
25

26
import org.apache.datasketches.common.Family;
27
import org.apache.datasketches.common.ResizeFactor;
28
import org.apache.datasketches.common.SketchesArgumentException;
29
import org.apache.datasketches.common.SketchesStateException;
30
import org.apache.datasketches.common.SuppressFBWarnings;
31
import org.apache.datasketches.memory.DefaultMemoryRequestServer;
32
import org.apache.datasketches.memory.MemoryRequestServer;
33
import org.apache.datasketches.memory.WritableMemory;
34
import org.apache.datasketches.thetacommon.ThetaUtil;
35

36
/**
37
 * For building a new UpdateSketch.
38
 *
39
 * @author Lee Rhodes
40
 */
41
public class UpdateSketchBuilder {
42
  private int bLgNomLongs;
43
  private long bSeed;
44
  private ResizeFactor bRF;
45
  private Family bFam;
46
  private float bP;
47
  private MemoryRequestServer bMemReqSvr;
48

49
  //Fields for concurrent theta sketch
50
  private int bNumPoolThreads;
51
  private int bLocalLgNomLongs;
52
  private boolean bPropagateOrderedCompact;
53
  private double bMaxConcurrencyError;
54
  private int bMaxNumLocalThreads;
55

56
  /**
57
   * Constructor for building a new UpdateSketch. The default configuration is
58
   * <ul>
59
   * <li>Nominal Entries: {@value org.apache.datasketches.thetacommon.ThetaUtil#DEFAULT_NOMINAL_ENTRIES}</li>
60
   * <li>Seed: {@value org.apache.datasketches.thetacommon.ThetaUtil#DEFAULT_UPDATE_SEED}</li>
61
   * <li>Input Sampling Probability: 1.0</li>
62
   * <li>Family: {@link org.apache.datasketches.common.Family#QUICKSELECT}</li>
63
   * <li>Resize Factor: The default for sketches on the Java heap is {@link ResizeFactor#X8}.
64
   * For direct sketches, which are targeted for native memory off the Java heap, this value will
65
   * be fixed at either {@link ResizeFactor#X1} or {@link ResizeFactor#X2}.</li>
66
   * <li>MemoryRequestServer (Direct only):
67
   * {@link org.apache.datasketches.memory.DefaultMemoryRequestServer}.</li>
68
   * </ul>
69
   * Parameters unique to the concurrent sketches only:
70
   * <ul>
71
   * <li>Number of local Nominal Entries: 4</li>
72
   * <li>Concurrent NumPoolThreads: 3</li>
73
   * <li>Concurrent PropagateOrderedCompact: true</li>
74
   * <li>Concurrent MaxConcurrencyError: 0</li>
75
   * </ul>
76
   */
77
  public UpdateSketchBuilder() {
1✔
78
    bLgNomLongs = Integer.numberOfTrailingZeros(ThetaUtil.DEFAULT_NOMINAL_ENTRIES);
1✔
79
    bSeed = ThetaUtil.DEFAULT_UPDATE_SEED;
1✔
80
    bP = (float) 1.0;
1✔
81
    bRF = ResizeFactor.X8;
1✔
82
    bFam = Family.QUICKSELECT;
1✔
83
    bMemReqSvr = new DefaultMemoryRequestServer();
1✔
84
    // Default values for concurrent sketch
85
    bNumPoolThreads = ConcurrentPropagationService.NUM_POOL_THREADS;
1✔
86
    bLocalLgNomLongs = 4; //default is smallest legal QS sketch
1✔
87
    bPropagateOrderedCompact = true;
1✔
88
    bMaxConcurrencyError = 0;
1✔
89
    bMaxNumLocalThreads = 1;
1✔
90
  }
1✔
91

92
  /**
93
   * Sets the Nominal Entries for this sketch.
94
   * This value is also used for building a shared concurrent sketch.
95
   * The minimum value is 16 (2^4) and the maximum value is 67,108,864 (2^26).
96
   * Be aware that sketches as large as this maximum value may not have been
97
   * thoroughly tested or characterized for performance.
98
   *
99
   * @param nomEntries <a href="{@docRoot}/resources/dictionary.html#nomEntries">Nominal Entries</a>
100
   * This will become the ceiling power of 2 if the given value is not.
101
   * @return this UpdateSketchBuilder
102
   */
103
  public UpdateSketchBuilder setNominalEntries(final int nomEntries) {
104
    bLgNomLongs = ThetaUtil.checkNomLongs(nomEntries);
1✔
105
    return this;
1✔
106
  }
107

108
  /**
109
   * Alternative method of setting the Nominal Entries for this sketch from the log_base2 value.
110
   * This value is also used for building a shared concurrent sketch.
111
   * The minimum value is 4 and the maximum value is 26.
112
   * Be aware that sketches as large as this maximum value may not have been
113
   * thoroughly characterized for performance.
114
   *
115
   * @param lgNomEntries the Log Nominal Entries. Also for the concurrent shared sketch
116
   * @return this UpdateSketchBuilder
117
   */
118
  public UpdateSketchBuilder setLogNominalEntries(final int lgNomEntries) {
119
    bLgNomLongs = ThetaUtil.checkNomLongs(1 << lgNomEntries);
1✔
120
    return this;
1✔
121
  }
122

123
  /**
124
   * Returns Log-base 2 Nominal Entries
125
   * @return Log-base 2 Nominal Entries
126
   */
127
  public int getLgNominalEntries() {
128
    return bLgNomLongs;
1✔
129
  }
130

131
  /**
132
   * Sets the Nominal Entries for the concurrent local sketch. The minimum value is 16 and the
133
   * maximum value is 67,108,864, which is 2^26.
134
   * Be aware that sketches as large as this maximum
135
   * value have not been thoroughly tested or characterized for performance.
136
   *
137
   * @param nomEntries <a href="{@docRoot}/resources/dictionary.html#nomEntries">Nominal Entries</a>
138
   *                   This will become the ceiling power of 2 if it is not.
139
   * @return this UpdateSketchBuilder
140
   */
141
  public UpdateSketchBuilder setLocalNominalEntries(final int nomEntries) {
142
    bLocalLgNomLongs = Integer.numberOfTrailingZeros(ceilingPowerOf2(nomEntries));
1✔
143
    if ((bLocalLgNomLongs > ThetaUtil.MAX_LG_NOM_LONGS) || (bLocalLgNomLongs < ThetaUtil.MIN_LG_NOM_LONGS)) {
1✔
144
      throw new SketchesArgumentException(
1✔
145
          "Nominal Entries must be >= 16 and <= 67108864: " + nomEntries);
146
    }
147
    return this;
×
148
  }
149

150
  /**
151
   * Alternative method of setting the Nominal Entries for a local concurrent sketch from the
152
   * log_base2 value.
153
   * The minimum value is 4 and the maximum value is 26.
154
   * Be aware that sketches as large as this maximum
155
   * value have not been thoroughly tested or characterized for performance.
156
   *
157
   * @param lgNomEntries the Log Nominal Entries for a concurrent local sketch
158
   * @return this UpdateSketchBuilder
159
   */
160
  public UpdateSketchBuilder setLocalLogNominalEntries(final int lgNomEntries) {
161
    bLocalLgNomLongs = lgNomEntries;
1✔
162
    if ((bLocalLgNomLongs > ThetaUtil.MAX_LG_NOM_LONGS) || (bLocalLgNomLongs < ThetaUtil.MIN_LG_NOM_LONGS)) {
1✔
163
      throw new SketchesArgumentException(
1✔
164
          "Log Nominal Entries must be >= 4 and <= 26: " + lgNomEntries);
165
    }
166
    return this;
1✔
167
  }
168

169
  /**
170
   * Returns Log-base 2 Nominal Entries for the concurrent local sketch
171
   * @return Log-base 2 Nominal Entries for the concurrent local sketch
172
   */
173
  public int getLocalLgNominalEntries() {
174
    return bLocalLgNomLongs;
1✔
175
  }
176

177
  /**
178
   * Sets the long seed value that is required by the hashing function.
179
   * @param seed <a href="{@docRoot}/resources/dictionary.html#seed">See seed</a>
180
   * @return this UpdateSketchBuilder
181
   */
182
  public UpdateSketchBuilder setSeed(final long seed) {
183
    bSeed = seed;
1✔
184
    return this;
1✔
185
  }
186

187
  /**
188
   * Returns the seed
189
   * @return the seed
190
   */
191
  public long getSeed() {
192
    return bSeed;
1✔
193
  }
194

195
  /**
196
   * Sets the upfront uniform sampling probability, <i>p</i>
197
   * @param p <a href="{@docRoot}/resources/dictionary.html#p">See Sampling Probability, <i>p</i></a>
198
   * @return this UpdateSketchBuilder
199
   */
200
  public UpdateSketchBuilder setP(final float p) {
201
    if ((p <= 0.0) || (p > 1.0)) {
1✔
202
      throw new SketchesArgumentException("p must be > 0 and <= 1.0: " + p);
1✔
203
    }
204
    bP = p;
1✔
205
    return this;
1✔
206
  }
207

208
  /**
209
   * Returns the pre-sampling probability <i>p</i>
210
   * @return the pre-sampling probability <i>p</i>
211
   */
212
  public float getP() {
213
    return bP;
1✔
214
  }
215

216
  /**
217
   * Sets the cache Resize Factor.
218
   * @param rf <a href="{@docRoot}/resources/dictionary.html#resizeFactor">See Resize Factor</a>
219
   * @return this UpdateSketchBuilder
220
   */
221
  public UpdateSketchBuilder setResizeFactor(final ResizeFactor rf) {
222
    bRF = rf;
1✔
223
    return this;
1✔
224
  }
225

226
  /**
227
   * Returns the Resize Factor
228
   * @return the Resize Factor
229
   */
230
  public ResizeFactor getResizeFactor() {
231
    return bRF;
1✔
232
  }
233

234
  /**
235
   * Set the Family.
236
   * @param family the family for this builder
237
   * @return this UpdateSketchBuilder
238
   */
239
  public UpdateSketchBuilder setFamily(final Family family) {
240
    bFam = family;
1✔
241
    return this;
1✔
242
  }
243

244
  /**
245
   * Returns the Family
246
   * @return the Family
247
   */
248
  public Family getFamily() {
249
    return bFam;
1✔
250
  }
251

252
  /**
253
   * Set the MemoryRequestServer
254
   * @param memReqSvr the given MemoryRequestServer
255
   * @return this UpdateSketchBuilder
256
   */
257
  public UpdateSketchBuilder setMemoryRequestServer(final MemoryRequestServer memReqSvr) {
258
    bMemReqSvr = memReqSvr;
1✔
259
    return this;
1✔
260
  }
261

262
  /**
263
   * Returns the MemoryRequestServer
264
   * @return the MemoryRequestServer
265
   */
266
  public MemoryRequestServer getMemoryRequestServer() {
267
    return bMemReqSvr;
1✔
268
  }
269

270
  /**
271
   * Sets the number of pool threads used for background propagation in the concurrent sketches.
272
   * @param numPoolThreads the given number of pool threads
273
   */
274
  public void setNumPoolThreads(final int numPoolThreads) {
275
    bNumPoolThreads = numPoolThreads;
1✔
276
  }
1✔
277

278
  /**
279
   * Gets the number of background pool threads used for propagation in the concurrent sketches.
280
   * @return the number of background pool threads
281
   */
282
  public int getNumPoolThreads() {
283
    return bNumPoolThreads;
1✔
284
  }
285

286
  /**
287
   * Sets the Propagate Ordered Compact flag to the given value. Used with concurrent sketches.
288
   *
289
   * @param prop the given value
290
   * @return this UpdateSketchBuilder
291
   */
292
  public UpdateSketchBuilder setPropagateOrderedCompact(final boolean prop) {
293
    bPropagateOrderedCompact = prop;
1✔
294
    return this;
1✔
295
  }
296

297
  /**
298
   * Gets the Propagate Ordered Compact flag used with concurrent sketches.
299
   * @return the Propagate Ordered Compact flag
300
   */
301
  public boolean getPropagateOrderedCompact() {
302
    return bPropagateOrderedCompact;
×
303
  }
304

305
  /**
306
   * Sets the Maximum Concurrency Error.
307
   * @param maxConcurrencyError the given Maximum Concurrency Error.
308
   */
309
  public void setMaxConcurrencyError(final double maxConcurrencyError) {
310
    bMaxConcurrencyError = maxConcurrencyError;
1✔
311
  }
1✔
312

313
  /**
314
   * Gets the Maximum Concurrency Error
315
   * @return the Maximum Concurrency Error
316
   */
317
  public double getMaxConcurrencyError() {
318
    return bMaxConcurrencyError;
1✔
319
  }
320

321
  /**
322
   * Sets the Maximum Number of Local Threads.
323
   * This is used to set the size of the local concurrent buffers.
324
   * @param maxNumLocalThreads the given Maximum Number of Local Threads
325
   */
326
  public void setMaxNumLocalThreads(final int maxNumLocalThreads) {
327
    bMaxNumLocalThreads = maxNumLocalThreads;
1✔
328
  }
1✔
329

330
  /**
331
   * Gets the Maximum Number of Local Threads.
332
   * @return the Maximum Number of Local Threads.
333
   */
334
  public int getMaxNumLocalThreads() {
335
    return bMaxNumLocalThreads;
1✔
336
  }
337

338
  // BUILD FUNCTIONS
339

340
  /**
341
   * Returns an UpdateSketch with the current configuration of this Builder.
342
   * @return an UpdateSketch
343
   */
344
  public UpdateSketch build() {
345
    return build(null);
1✔
346
  }
347

348
  /**
349
   * Returns an UpdateSketch with the current configuration of this Builder
350
   * with the specified backing destination Memory store.
351
   * Note: this cannot be used with the Alpha Family of sketches.
352
   * @param dstMem The destination Memory.
353
   * @return an UpdateSketch
354
   */
355
  public UpdateSketch build(final WritableMemory dstMem) {
356
    UpdateSketch sketch = null;
1✔
357
    switch (bFam) {
1✔
358
      case ALPHA: {
359
        if (dstMem == null) {
1✔
360
          sketch = HeapAlphaSketch.newHeapInstance(bLgNomLongs, bSeed, bP, bRF);
1✔
361
        }
362
        else {
363
          throw new SketchesArgumentException("AlphaSketch cannot be made Direct to Memory.");
1✔
364
        }
365
        break;
366
      }
367
      case QUICKSELECT: {
368
        if (dstMem == null) {
1✔
369
          sketch =  new HeapQuickSelectSketch(bLgNomLongs, bSeed, bP, bRF, false);
1✔
370
        }
371
        else {
372
          sketch = new DirectQuickSelectSketch(
1✔
373
              bLgNomLongs, bSeed, bP, bRF, bMemReqSvr, dstMem, false);
374
        }
375
        break;
1✔
376
      }
377
      default: {
378
        throw new SketchesArgumentException(
1✔
379
          "Given Family cannot be built as a Theta Sketch: " + bFam.toString());
1✔
380
      }
381
    }
382
    return sketch;
1✔
383
  }
384

385
  /**
386
   * Returns an on-heap concurrent shared UpdateSketch with the current configuration of the
387
   * Builder.
388
   *
389
   * <p>The parameters unique to the shared concurrent sketch are:
390
   * <ul>
391
   * <li>Number of Pool Threads (default is 3)</li>
392
   * <li>Maximum Concurrency Error</li>
393
   * </ul>
394
   *
395
   * <p>Key parameters that are in common with other <i>Theta</i> sketches:
396
   * <ul>
397
   * <li>Nominal Entries or Log Nominal Entries (for the shared concurrent sketch)</li>
398
   * </ul>
399
   *
400
   * @return an on-heap concurrent UpdateSketch with the current configuration of the Builder.
401
   */
402
  public UpdateSketch buildShared() {
403
    return buildShared(null);
×
404
  }
405

406
  /**
407
   * Returns a direct (potentially off-heap) concurrent shared UpdateSketch with the current
408
   * configuration of the Builder and the given destination WritableMemory. If the destination
409
   * WritableMemory is null, this defaults to an on-heap concurrent shared UpdateSketch.
410
   *
411
   * <p>The parameters unique to the shared concurrent sketch are:
412
   * <ul>
413
   * <li>Number of Pool Threads (default is 3)</li>
414
   * <li>Maximum Concurrency Error</li>
415
   * </ul>
416
   *
417
   * <p>Key parameters that are in common with other <i>Theta</i> sketches:
418
   * <ul>
419
   * <li>Nominal Entries or Log Nominal Entries (for the shared concurrent sketch)</li>
420
   * <li>Destination Writable Memory (if not null, returned sketch is Direct. Default is null.)</li>
421
   * </ul>
422
   *
423
   * @param dstMem the given WritableMemory for Direct, otherwise <i>null</i>.
424
   * @return a concurrent UpdateSketch with the current configuration of the Builder
425
   * and the given destination WritableMemory.
426
   */
427
  @SuppressFBWarnings(value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
428
      justification = "Harmless in Builder, fix later")
429
  public UpdateSketch buildShared(final WritableMemory dstMem) {
430
    ConcurrentPropagationService.NUM_POOL_THREADS = bNumPoolThreads;
1✔
431
    if (dstMem == null) {
1✔
432
      return new ConcurrentHeapQuickSelectSketch(bLgNomLongs, bSeed, bMaxConcurrencyError);
1✔
433
    } else {
434
      return new ConcurrentDirectQuickSelectSketch(bLgNomLongs, bSeed, bMaxConcurrencyError, dstMem);
1✔
435
    }
436
  }
437

438
  /**
439
   * Returns a direct (potentially off-heap) concurrent shared UpdateSketch with the current
440
   * configuration of the Builder, the data from the given sketch, and the given destination
441
   * WritableMemory. If the destination WritableMemory is null, this defaults to an on-heap
442
   * concurrent shared UpdateSketch.
443
   *
444
   * <p>The parameters unique to the shared concurrent sketch are:
445
   * <ul>
446
   * <li>Number of Pool Threads (default is 3)</li>
447
   * <li>Maximum Concurrency Error</li>
448
   * </ul>
449
   *
450
   * <p>Key parameters that are in common with other <i>Theta</i> sketches:
451
   * <ul>
452
   * <li>Nominal Entries or Log Nominal Entries (for the shared concurrent sketch)</li>
453
   * <li>Destination Writable Memory (if not null, returned sketch is Direct. Default is null.)</li>
454
   * </ul>
455
   *
456
   * @param sketch a given UpdateSketch from which the data is used to initialize the returned
457
   * shared sketch.
458
   * @param dstMem the given WritableMemory for Direct, otherwise <i>null</i>.
459
   * @return a concurrent UpdateSketch with the current configuration of the Builder
460
   * and the given destination WritableMemory.
461
   */
462
  @SuppressFBWarnings(value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
463
      justification = "Harmless in Builder, fix later")
464
  public UpdateSketch buildSharedFromSketch(final UpdateSketch sketch, final WritableMemory dstMem) {
465
    ConcurrentPropagationService.NUM_POOL_THREADS = bNumPoolThreads;
1✔
466
    if (dstMem == null) {
1✔
467
      return new ConcurrentHeapQuickSelectSketch(sketch, bSeed, bMaxConcurrencyError);
1✔
468
    } else {
469
      return new ConcurrentDirectQuickSelectSketch(sketch, bSeed, bMaxConcurrencyError, dstMem);
1✔
470
    }
471
  }
472

473
  /**
474
   * Returns a local, on-heap, concurrent UpdateSketch to be used as a per-thread local buffer
475
   * along with the given concurrent shared UpdateSketch and the current configuration of this
476
   * Builder.
477
   *
478
   * <p>The parameters unique to the local concurrent sketch are:
479
   * <ul>
480
   * <li>Local Nominal Entries or Local Log Nominal Entries</li>
481
   * <li>Propagate Ordered Compact flag</li>
482
   * </ul>
483
   *
484
   * @param shared the concurrent shared sketch to be accessed via the concurrent local sketch.
485
   * @return an UpdateSketch to be used as a per-thread local buffer.
486
   */
487
  public UpdateSketch buildLocal(final UpdateSketch shared) {
488
    if ((shared == null) || !(shared instanceof ConcurrentSharedThetaSketch)) {
1✔
489
      throw new SketchesStateException("The concurrent shared sketch must be built first.");
×
490
    }
491
    return new ConcurrentHeapThetaBuffer(bLocalLgNomLongs, bSeed,
1✔
492
        (ConcurrentSharedThetaSketch) shared, bPropagateOrderedCompact, bMaxNumLocalThreads);
493
  }
494

495
  @Override
496
  public String toString() {
497
    final StringBuilder sb = new StringBuilder();
1✔
498
    sb.append("UpdateSketchBuilder configuration:").append(LS);
1✔
499
    sb.append("LgK:").append(TAB).append(bLgNomLongs).append(LS);
1✔
500
    sb.append("K:").append(TAB).append(1 << bLgNomLongs).append(LS);
1✔
501
    sb.append("LgLocalK:").append(TAB).append(bLocalLgNomLongs).append(LS);
1✔
502
    sb.append("LocalK:").append(TAB).append(1 << bLocalLgNomLongs).append(LS);
1✔
503
    sb.append("Seed:").append(TAB).append(bSeed).append(LS);
1✔
504
    sb.append("p:").append(TAB).append(bP).append(LS);
1✔
505
    sb.append("ResizeFactor:").append(TAB).append(bRF).append(LS);
1✔
506
    sb.append("Family:").append(TAB).append(bFam).append(LS);
1✔
507
    final String mrsStr = bMemReqSvr.getClass().getSimpleName();
1✔
508
    sb.append("MemoryRequestServer:").append(TAB).append(mrsStr).append(LS);
1✔
509
    sb.append("Propagate Ordered Compact").append(TAB).append(bPropagateOrderedCompact).append(LS);
1✔
510
    sb.append("NumPoolThreads").append(TAB).append(bNumPoolThreads).append(LS);
1✔
511
    sb.append("MaxConcurrencyError").append(TAB).append(bMaxConcurrencyError).append(LS);
1✔
512
    sb.append("MaxNumLocalThreads").append(TAB).append(bMaxNumLocalThreads).append(LS);
1✔
513
    return sb.toString();
1✔
514
  }
515

516
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc