• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / datasketches-java / #306

30 Apr 2024 10:01PM UTC coverage: 97.645% (-0.5%) from 98.139%
#306

push

web-flow
Merge pull request #555 from apache/fix_pom_xml_header

Fix pom xml header

26865 of 27513 relevant lines covered (97.64%)

0.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.67
/src/main/java/org/apache/datasketches/theta/ConcurrentDirectQuickSelectSketch.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.datasketches.theta;
21

22
import static org.apache.datasketches.theta.PreambleUtil.THETA_LONG;
23

24
import java.util.concurrent.ExecutorService;
25
import java.util.concurrent.TimeUnit;
26
import java.util.concurrent.atomic.AtomicBoolean;
27

28
import org.apache.datasketches.common.ResizeFactor;
29
import org.apache.datasketches.common.SuppressFBWarnings;
30
import org.apache.datasketches.memory.WritableMemory;
31

32
/**
33
 * A concurrent shared sketch that is based on DirectQuickSelectSketch.
34
 * It reflects all data processed by a single or multiple update threads, and can serve queries at
35
 * any time.
36
 * Background propagation threads are used to propagate data from thread local buffers into this
37
 * sketch which stores the most up-to-date estimation of number of unique items.
38
 *
39
 * @author eshcar
40
 * @author Lee Rhodes
41
 */
42
final class ConcurrentDirectQuickSelectSketch extends DirectQuickSelectSketch
43
    implements ConcurrentSharedThetaSketch {
44

45
  // Executor that runs background propagation jobs; not final because
  // initBgPropagationService() (re)assigns it
46
  private ExecutorService executorService_;
47

48
  // A flag to coordinate between several eager propagation threads; it is acquired by spinning
  // on compareAndSet(false, true) and released by set(false)
49
  private final AtomicBoolean sharedPropagationInProgress_;
50

51
  // Theta value of concurrent sketch, as last published by updateVolatileTheta()
52
  private volatile long volatileThetaLong_;
53

54
  // A snapshot of the estimated number of unique entries, refreshed by
  // updateEstimationSnapshot() and returned by getEstimate()
55
  private volatile double volatileEstimate_;
56

57
  // Number of retained entries at which the sketch toggles from sync (exact) mode to async
58
  // propagation mode; computed once in the constructors
59
  private final long exactLimit_;
60

61
  // An epoch defines an interval between two resets. A propagation invoked at epoch i cannot
62
  // affect the sketch at epoch j > i. Incremented only in advanceEpoch().
63
  private volatile long epoch_;
64

65
  /**
66
   * Construct a new sketch instance and initialize the given Memory as its backing store.
67
   *
68
   * @param lgNomLongs <a href="{@docRoot}/resources/dictionary.html#lgNomLongs">See lgNomLongs</a>.
69
   * @param seed       <a href="{@docRoot}/resources/dictionary.html#seed">See Update Hash Seed</a>.
70
   * @param maxConcurrencyError the max error value including error induced by concurrency.
71
   * @param dstMem     the given Memory object destination. It cannot be null.
72
   */
73
  ConcurrentDirectQuickSelectSketch(final int lgNomLongs, final long seed,
74
      final double maxConcurrencyError, final WritableMemory dstMem) {
75
    super(lgNomLongs, seed, 1.0F, //p
1✔
76
      ResizeFactor.X1, //rf,
77
      null, dstMem, false); //unionGadget
78

79
    volatileThetaLong_ = Long.MAX_VALUE;
1✔
80
    volatileEstimate_ = 0;
1✔
81
    exactLimit_ = ConcurrentSharedThetaSketch.computeExactLimit(1L << getLgNomLongs(),
1✔
82
        maxConcurrencyError);
83
    sharedPropagationInProgress_ = new AtomicBoolean(false);
1✔
84
    epoch_ = 0;
1✔
85
    initBgPropagationService();
1✔
86
  }
1✔
87

88
  ConcurrentDirectQuickSelectSketch(final UpdateSketch sketch, final long seed,
89
      final double maxConcurrencyError, final WritableMemory dstMem) {
90
    super(sketch.getLgNomLongs(), seed, 1.0F, //p
1✔
91
        ResizeFactor.X1, //rf,
92
        null, //mem Req Svr
93
        dstMem,
94
        false); //unionGadget
95

96
    exactLimit_ = ConcurrentSharedThetaSketch.computeExactLimit(1L << getLgNomLongs(),
1✔
97
        maxConcurrencyError);
98
    sharedPropagationInProgress_ = new AtomicBoolean(false);
1✔
99
    epoch_ = 0;
1✔
100
    initBgPropagationService();
1✔
101
    for (final long hashIn : sketch.getCache()) {
1✔
102
      propagate(hashIn);
1✔
103
    }
104
    wmem_.putLong(THETA_LONG, sketch.getThetaLong());
1✔
105
    updateVolatileTheta();
1✔
106
    updateEstimationSnapshot();
1✔
107
  }
1✔
108

109
  //Sketch overrides
110

111
  @Override
112
  public double getEstimate() {
113
    return volatileEstimate_;
1✔
114
  }
115

116
  @Override
117
  public boolean isEstimationMode() {
118
    return (getRetainedEntries(false) > exactLimit_) || super.isEstimationMode();
1✔
119
  }
120

121
  @Override
122
  public byte[] toByteArray() {
123
    while (!sharedPropagationInProgress_.compareAndSet(false, true)) { } //busy wait till free
1✔
124
    final byte[] res = super.toByteArray();
1✔
125
    sharedPropagationInProgress_.set(false);
1✔
126
    return res;
1✔
127
  }
128

129
  //UpdateSketch overrides
130

131
  @Override
132
  public UpdateSketch rebuild() {
133
    super.rebuild();
1✔
134
    updateEstimationSnapshot();
1✔
135
    return this;
1✔
136
  }
137

138
  /**
139
   * {@inheritDoc}
140
   * Takes care of mutual exclusion with propagation thread.
141
   */
142
  @Override
143
  public void reset() {
144
    advanceEpoch();
1✔
145
    super.reset();
1✔
146
    volatileThetaLong_ = Long.MAX_VALUE;
1✔
147
    volatileEstimate_ = 0;
1✔
148
  }
1✔
149

150
  @Override
151
  UpdateReturnState hashUpdate(final long hash) {
152
    final String msg = "No update method should be called directly to a shared theta sketch."
1✔
153
        + " Updating the shared sketch is only permitted through propagation from local sketches.";
154
    throw new UnsupportedOperationException(msg);
1✔
155
  }
156

157
  //ConcurrentSharedThetaSketch declarations
158

159
  @Override
160
  public long getExactLimit() {
161
    return exactLimit_;
1✔
162
  }
163

164
  @Override
165
  public boolean startEagerPropagation() {
166
    while (!sharedPropagationInProgress_.compareAndSet(false, true)) { } //busy wait till free
1✔
167
    return (!isEstimationMode());// no eager propagation is allowed in estimation mode
1✔
168
  }
169

170
  @Override
171
  public void endPropagation(final AtomicBoolean localPropagationInProgress, final boolean isEager) {
172
    //update volatile theta, uniques estimate and propagation flag
173
    updateVolatileTheta();
1✔
174
    updateEstimationSnapshot();
1✔
175
    if (isEager) {
1✔
176
      sharedPropagationInProgress_.set(false);
1✔
177
    }
178
    if (localPropagationInProgress != null) {
1✔
179
      localPropagationInProgress.set(false); //clear local propagation flag
1✔
180
    }
181
  }
1✔
182

183
  @Override
184
  public long getVolatileTheta() {
185
    return volatileThetaLong_;
1✔
186
  }
187

188
  @Override
189
  public void awaitBgPropagationTermination() {
190
    try {
191
      executorService_.shutdown();
1✔
192
      while (!executorService_.awaitTermination(1, TimeUnit.MILLISECONDS)) {
1✔
193
        Thread.sleep(1);
×
194
      }
195
    } catch (final InterruptedException e) {
×
196
      e.printStackTrace();
×
197
    }
1✔
198
  }
1✔
199

200
  @Override
201
  public final void initBgPropagationService() {
202
    executorService_ = ConcurrentPropagationService.getExecutorService(Thread.currentThread().getId());
1✔
203
  }
1✔
204

205
  @Override
206
  public boolean propagate(final AtomicBoolean localPropagationInProgress,
207
                           final Sketch sketchIn, final long singleHash) {
208
    final long epoch = epoch_;
1✔
209
    if ((singleHash != NOT_SINGLE_HASH)                   // namely, is a single hash and
1✔
210
        && (getRetainedEntries(false) < exactLimit_)) {   // a small sketch then propagate myself (blocking)
1✔
211
      if (!startEagerPropagation()) {
1✔
212
        endPropagation(localPropagationInProgress, true);
×
213
        return false;
×
214
      }
215
      if (!validateEpoch(epoch)) {
1✔
216
        endPropagation(null, true); // do not change local flag
×
217
        return true;
×
218
      }
219
      propagate(singleHash);
1✔
220
      endPropagation(localPropagationInProgress, true);
1✔
221
      return true;
1✔
222
    }
223
    // otherwise, be nonblocking, let background thread do the work
224
    final ConcurrentBackgroundThetaPropagation job = new ConcurrentBackgroundThetaPropagation(
1✔
225
        this, localPropagationInProgress, sketchIn, singleHash, epoch);
226
    executorService_.execute(job);
1✔
227
    return true;
1✔
228
  }
229

230
  @Override
231
  public void propagate(final long singleHash) {
232
    super.hashUpdate(singleHash);
1✔
233
  }
1✔
234

235
  @Override
236
  public void updateEstimationSnapshot() {
237
    volatileEstimate_ = super.getEstimate();
1✔
238
  }
1✔
239

240
  @Override
241
  public void updateVolatileTheta() {
242
    volatileThetaLong_ = getThetaLong();
1✔
243
  }
1✔
244

245
  @Override
246
  public boolean validateEpoch(final long epoch) {
247
    return epoch_ == epoch;
1✔
248
  }
249

250
  //Restricted
251

252
  /**
253
   * Advances the epoch while there is no background propagation
254
   * This ensures a propagation invoked before the reset cannot affect the sketch after the reset
255
   * is completed. Ignore VO_VOLATILE_INCREMENT findbugs warning, it is False Positive.
256
   */
257
  @SuppressFBWarnings(value = "VO_VOLATILE_INCREMENT", justification = "Likely False Positive, Fix Later")
258
  private void advanceEpoch() {
259
    awaitBgPropagationTermination();
1✔
260
    startEagerPropagation();
1✔
261
    ConcurrentPropagationService.resetExecutorService(Thread.currentThread().getId());
1✔
262
    //no inspection NonAtomicOperationOnVolatileField
263
    // this increment of a volatile field is done within the scope of the propagation
264
    // synchronization and hence is done by a single thread.
265
    epoch_++;
1✔
266
    endPropagation(null, true);
1✔
267
    initBgPropagationService();
1✔
268
  }
1✔
269

270
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc