• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / datasketches-java / #306

30 Apr 2024 10:01PM UTC coverage: 97.645% (-0.5%) from 98.139%
#306

push

web-flow
Merge pull request #555 from apache/fix_pom_xml_header

Fix pom xml header

26865 of 27513 relevant lines covered (97.64%)

0.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.67
/src/main/java/org/apache/datasketches/theta/ConcurrentHeapQuickSelectSketch.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied.  See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19

20
package org.apache.datasketches.theta;
21

22
import java.util.concurrent.ExecutorService;
23
import java.util.concurrent.TimeUnit;
24
import java.util.concurrent.atomic.AtomicBoolean;
25

26
import org.apache.datasketches.common.ResizeFactor;
27
import org.apache.datasketches.common.SuppressFBWarnings;
28

29
/**
30
 * A concurrent shared sketch that is based on HeapQuickSelectSketch.
31
 * It reflects all data processed by a single or multiple update threads, and can serve queries at
32
 * any time.
33
 * Background propagation threads are used to propagate data from thread local buffers into this
34
 * sketch which stores the most up-to-date estimation of number of unique items.
35
 *
36
 * @author eshcar
37
 * @author Lee Rhodes
38
 */
39
final class ConcurrentHeapQuickSelectSketch extends HeapQuickSelectSketch
40
    implements ConcurrentSharedThetaSketch {
41

42
  // The propagation thread
43
  private volatile ExecutorService executorService_;
44

45
  //A flag to coordinate between several eager propagation threads
46
  private final AtomicBoolean sharedPropagationInProgress_;
47

48
  // Theta value of concurrent sketch
49
  private volatile long volatileThetaLong_;
50

51
  // A snapshot of the estimated number of unique entries
52
  private volatile double volatileEstimate_;
53

54
  // Num of retained entries in which the sketch toggles from sync (exact) mode to async
55
  //  propagation mode
56
  private final long exactLimit_;
57

58
  // An epoch defines an interval between two resets. A propagation invoked at epoch i cannot
59
  // affect the sketch at epoch j > i.
60
  private volatile long epoch_;
61

62
  /**
63
   * Construct a new sketch instance on the java heap.
64
   *
65
   * @param lgNomLongs <a href="{@docRoot}/resources/dictionary.html#lgNomLogs">See lgNomLongs</a>.
66
   * @param seed       <a href="{@docRoot}/resources/dictionary.html#seed">See seed</a>
67
   * @param maxConcurrencyError the max error value including error induced by concurrency
68
   *
69
   */
70
  ConcurrentHeapQuickSelectSketch(final int lgNomLongs, final long seed,
71
      final double maxConcurrencyError) {
72
    super(lgNomLongs, seed, 1.0F, //p
1✔
73
        ResizeFactor.X1, //rf,
74
        false); //unionGadget
75

76
    volatileThetaLong_ = Long.MAX_VALUE;
1✔
77
    volatileEstimate_ = 0;
1✔
78
    exactLimit_ = ConcurrentSharedThetaSketch.computeExactLimit(1L << getLgNomLongs(),
1✔
79
        maxConcurrencyError);
80
    sharedPropagationInProgress_ = new AtomicBoolean(false);
1✔
81
    epoch_ = 0;
1✔
82
    initBgPropagationService();
1✔
83
  }
1✔
84

85
  ConcurrentHeapQuickSelectSketch(final UpdateSketch sketch, final long seed,
86
      final double maxConcurrencyError) {
87
    super(sketch.getLgNomLongs(), seed, 1.0F, //p
1✔
88
        ResizeFactor.X1, //rf,
89
        false); //unionGadget
90

91
    exactLimit_ = ConcurrentSharedThetaSketch.computeExactLimit(1L << getLgNomLongs(),
1✔
92
        maxConcurrencyError);
93
    sharedPropagationInProgress_ = new AtomicBoolean(false);
1✔
94
    epoch_ = 0;
1✔
95
    initBgPropagationService();
1✔
96
    for (final long hashIn : sketch.getCache()) {
1✔
97
      propagate(hashIn);
1✔
98
    }
99
    thetaLong_ = sketch.getThetaLong();
1✔
100
    updateVolatileTheta();
1✔
101
    updateEstimationSnapshot();
1✔
102
  }
1✔
103

104
  //Sketch overrides
105

106
  @Override
107
  public double getEstimate() {
108
    return volatileEstimate_;
1✔
109
  }
110

111
  @Override
112
  public boolean isEstimationMode() {
113
    return (getRetainedEntries(false) > exactLimit_) || super.isEstimationMode();
1✔
114
  }
115

116
  @Override
117
  public byte[] toByteArray() {
118
    while (!sharedPropagationInProgress_.compareAndSet(false, true)) { } //busy wait till free
1✔
119
    final byte[] res = super.toByteArray();
1✔
120
    sharedPropagationInProgress_.set(false);
1✔
121
    return res;
1✔
122
  }
123

124
  //UpdateSketch overrides
125

126
  @Override
127
  public UpdateSketch rebuild() {
128
    super.rebuild();
1✔
129
    updateEstimationSnapshot();
1✔
130
    return this;
1✔
131
  }
132

133
  /**
134
   * {@inheritDoc}
135
   * Takes care of mutual exclusion with propagation thread.
136
   */
137
  @Override
138
  public void reset() {
139
    advanceEpoch();
1✔
140
    super.reset();
1✔
141
    volatileThetaLong_ = Long.MAX_VALUE;
1✔
142
    volatileEstimate_ = 0;
1✔
143
  }
1✔
144

145
  @Override
146
  UpdateReturnState hashUpdate(final long hash) {
147
    final String msg = "No update method should be called directly to a shared theta sketch."
1✔
148
        + " Updating the shared sketch is only permitted through propagation from local sketches.";
149
    throw new UnsupportedOperationException(msg);
1✔
150
  }
151

152
  //ConcurrentSharedThetaSketch declarations
153

154
  @Override
155
  public long getExactLimit() {
156
    return exactLimit_;
1✔
157
  }
158

159
  @Override
160
  public boolean startEagerPropagation() {
161
    while (!sharedPropagationInProgress_.compareAndSet(false, true)) { } //busy wait till free
1✔
162
    return (!isEstimationMode());// no eager propagation is allowed in estimation mode
1✔
163
  }
164

165
  @Override
166
  public void endPropagation(final AtomicBoolean localPropagationInProgress, final boolean isEager) {
167
    //update volatile theta, uniques estimate and propagation flag
168
    updateVolatileTheta();
1✔
169
    updateEstimationSnapshot();
1✔
170
    if (isEager) {
1✔
171
      sharedPropagationInProgress_.set(false);
1✔
172
    }
173
    if (localPropagationInProgress != null) {
1✔
174
      localPropagationInProgress.set(false); //clear local propagation flag
1✔
175
    }
176
  }
1✔
177

178
  @Override
179
  public long getVolatileTheta() {
180
    return volatileThetaLong_;
1✔
181
  }
182

183
  @Override
184
  public void awaitBgPropagationTermination() {
185
    try {
186
      executorService_.shutdown();
1✔
187
      while (!executorService_.awaitTermination(1, TimeUnit.MILLISECONDS)) {
1✔
188
        Thread.sleep(1);
×
189
      }
190
    } catch (final InterruptedException e) {
×
191
      e.printStackTrace();
×
192
    }
1✔
193
  }
1✔
194

195
  @Override
196
  public void initBgPropagationService() {
197
    executorService_ = ConcurrentPropagationService.getExecutorService(Thread.currentThread().getId());
1✔
198
  }
1✔
199

200
  @Override
201
  public boolean propagate(final AtomicBoolean localPropagationInProgress,
202
                           final Sketch sketchIn, final long singleHash) {
203
    final long epoch = epoch_;
1✔
204
    if ((singleHash != NOT_SINGLE_HASH)                 //namely, is a single hash and
1✔
205
        && (getRetainedEntries(false) < exactLimit_)) { //a small sketch then propagate myself (blocking)
1✔
206
      if (!startEagerPropagation()) {
1✔
207
        endPropagation(localPropagationInProgress, true);
×
208
        return false;
×
209
      }
210
      if (!validateEpoch(epoch)) {
1✔
211
        endPropagation(null, true); // do not change local flag
×
212
        return true;
×
213
      }
214
      propagate(singleHash);
1✔
215
      endPropagation(localPropagationInProgress, true);
1✔
216
      return true;
1✔
217
    }
218
    // otherwise, be nonblocking, let background thread do the work
219
    final ConcurrentBackgroundThetaPropagation job = new ConcurrentBackgroundThetaPropagation(
1✔
220
        this, localPropagationInProgress, sketchIn, singleHash, epoch);
221
    executorService_.execute(job);
1✔
222
    return true;
1✔
223
  }
224

225
  @Override
226
  public void propagate(final long singleHash) {
227
    super.hashUpdate(singleHash);
1✔
228
  }
1✔
229

230
  @Override
231
  public void updateEstimationSnapshot() {
232
    volatileEstimate_ = super.getEstimate();
1✔
233
  }
1✔
234

235
  @Override
236
  public void updateVolatileTheta() {
237
    volatileThetaLong_ = getThetaLong();
1✔
238
  }
1✔
239

240
  @Override
241
  public boolean validateEpoch(final long epoch) {
242
    return epoch_ == epoch;
1✔
243
  }
244

245
  //Restricted
246

247
  /**
248
   * Advances the epoch while there is no background propagation
249
   * This ensures a propagation invoked before the reset cannot affect the sketch after the reset
250
   * is completed.
251
   */
252
  @SuppressFBWarnings(value = "VO_VOLATILE_INCREMENT", justification = "Likely False Positive, Fix Later")
253
  private void advanceEpoch() {
254
    awaitBgPropagationTermination();
1✔
255
    startEagerPropagation();
1✔
256
    ConcurrentPropagationService.resetExecutorService(Thread.currentThread().getId());
1✔
257
    //no inspection NonAtomicOperationOnVolatileField
258
    // this increment of a volatile field is done within the scope of the propagation
259
    // synchronization and hence is done by a single thread
260
    // Ignore a FindBugs warning
261
    epoch_++;
1✔
262
    endPropagation(null, true);
1✔
263
    initBgPropagationService();
1✔
264
  }
1✔
265

266
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc