• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / datasketches-go / 22620505955

03 Mar 2026 11:15AM UTC coverage: 86.407% (-0.009%) from 86.416%
22620505955

push

github

web-flow
Merge pull request #131 from proost/test-fix-flaky-test

test: fix flaky test

20576 of 23813 relevant lines covered (86.41%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.43
/tuple/intersection.go
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one or more
3
 * contributor license agreements.  See the NOTICE file distributed with
4
 * this work for additional information regarding copyright ownership.
5
 * The ASF licenses this file to You under the Apache License, Version 2.0
6
 * (the "License"); you may not use this file except in compliance with
7
 * the License.  You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17

18
package tuple
19

20
import (
21
        "errors"
22
        "slices"
23

24
        "github.com/apache/datasketches-go/internal"
25
        "github.com/apache/datasketches-go/theta"
26
)
27

28
// intersectionOptions collects the optional settings that the intersection
// constructors accept through IntersectionOptionFunc values.
type intersectionOptions struct {
	// seed is the hash seed handed to the intersection's internal hash table.
	seed uint64
}
31

32
// IntersectionOptionFunc is a functional option that adjusts the settings used
// when an Intersection is constructed.
type IntersectionOptionFunc func(*intersectionOptions)
33

34
// WithIntersectionSeed sets the seed for the hash function.
35
func WithIntersectionSeed(seed uint64) IntersectionOptionFunc {
1✔
36
        return func(i *intersectionOptions) {
2✔
37
                i.seed = seed
1✔
38
        }
1✔
39
}
40

41
// Intersection computes the intersection of sketches.
42
type Intersection[S Summary] struct {
43
        hashtable     *hashtable[S]
44
        policy        Policy[S]
45
        applyFunc     func(S, S) S
46
        entryLessFunc func(a, b entry[S]) int
47
        isValid       bool
48
}
49

50
// NewIntersection creates a new intersection.
51
func NewIntersection[S Summary](policy Policy[S], opts ...IntersectionOptionFunc) *Intersection[S] {
1✔
52
        options := &intersectionOptions{
1✔
53
                seed: theta.DefaultSeed,
1✔
54
        }
1✔
55
        for _, opt := range opts {
2✔
56
                opt(options)
1✔
57
        }
1✔
58

59
        return &Intersection[S]{
1✔
60
                hashtable: newHashtable[S](
1✔
61
                        0, 0, theta.ResizeX1, 1.0, theta.MaxTheta, options.seed, false,
1✔
62
                ),
1✔
63
                entryLessFunc: func(a, b entry[S]) int {
2✔
64
                        if a.Hash < b.Hash {
2✔
65
                                return -1
1✔
66
                        } else if a.Hash > b.Hash {
3✔
67
                                return 1
1✔
68
                        }
1✔
69
                        return 0
×
70
                },
71
                policy:  policy,
72
                isValid: false,
73
        }
74
}
75

76
// NewIntersectionWithSummaryMergeFunc creates a new intersection that uses a function to merge summaries.
77
// This is useful for value-type summaries where Policy.Apply cannot mutate the internal summary.
78
func NewIntersectionWithSummaryMergeFunc[S Summary](
79
        applyFunc func(S, S) S, opts ...IntersectionOptionFunc,
80
) *Intersection[S] {
1✔
81
        options := &intersectionOptions{
1✔
82
                seed: theta.DefaultSeed,
1✔
83
        }
1✔
84
        for _, opt := range opts {
1✔
85
                opt(options)
×
86
        }
×
87

88
        return &Intersection[S]{
1✔
89
                hashtable: newHashtable[S](
1✔
90
                        0, 0, theta.ResizeX1, 1.0, theta.MaxTheta, options.seed, false,
1✔
91
                ),
1✔
92
                entryLessFunc: func(a, b entry[S]) int {
2✔
93
                        if a.Hash < b.Hash {
2✔
94
                                return -1
1✔
95
                        } else if a.Hash > b.Hash {
3✔
96
                                return 1
1✔
97
                        }
1✔
98
                        return 0
×
99
                },
100
                applyFunc: applyFunc,
101
                isValid:   false,
102
        }
103
}
104

105
// Update updates the intersection with a given sketch.
106
func (i *Intersection[S]) Update(sketch Sketch[S]) error {
1✔
107
        if i.hashtable.isEmpty {
2✔
108
                return nil
1✔
109
        }
1✔
110

111
        seedHash, err := internal.ComputeSeedHash(int64(i.hashtable.seed))
1✔
112
        if err != nil {
1✔
113
                return err
×
114
        }
×
115
        sketchSeedHash, err := sketch.SeedHash()
1✔
116
        if err != nil {
1✔
117
                return err
×
118
        }
×
119
        if !sketch.IsEmpty() && sketchSeedHash != uint16(seedHash) {
2✔
120
                return errors.New("seed hash mismatch")
1✔
121
        }
1✔
122

123
        i.hashtable.isEmpty = i.hashtable.isEmpty || sketch.IsEmpty()
1✔
124
        if i.hashtable.isEmpty {
2✔
125
                i.hashtable.theta = theta.MaxTheta
1✔
126
        } else {
2✔
127
                i.hashtable.theta = min(i.hashtable.theta, sketch.Theta64())
1✔
128
        }
1✔
129

130
        if i.isValid && i.hashtable.numEntries == 0 {
2✔
131
                return nil
1✔
132
        }
1✔
133

134
        if sketch.NumRetained() == 0 {
2✔
135
                i.isValid = true
1✔
136
                i.hashtable = newHashtable[S](
1✔
137
                        0, 0, theta.ResizeX1, 1.0, i.hashtable.theta, i.hashtable.seed, i.hashtable.isEmpty,
1✔
138
                )
1✔
139
                return nil
1✔
140
        }
1✔
141

142
        if !i.isValid { // first update, copy or move incoming sketch
2✔
143
                i.isValid = true
1✔
144

1✔
145
                lgSize := internal.LgSizeFromCount(sketch.NumRetained(), rebuildThreshold)
1✔
146
                i.hashtable = newHashtable[S](lgSize, lgSize-1, theta.ResizeX1, 1.0, i.hashtable.theta, i.hashtable.seed, i.hashtable.isEmpty)
1✔
147

1✔
148
                for hash, summary := range sketch.All() {
2✔
149
                        idx, err := i.hashtable.Find(hash)
1✔
150
                        if err == nil {
1✔
151
                                return errors.New("duplicate key, possibly corrupted input sketch")
×
152
                        }
×
153

154
                        i.hashtable.Insert(idx, entry[S]{
1✔
155
                                Hash:    hash,
1✔
156
                                Summary: summary,
1✔
157
                        })
1✔
158
                }
159

160
                if i.hashtable.numEntries != sketch.NumRetained() {
1✔
161
                        return errors.New("num entries mismatch, possibly corrupted input sketch")
×
162
                }
×
163

164
                return nil
1✔
165
        }
166

167
        // intersection
168
        var (
1✔
169
                maxMatches     = min(i.hashtable.numEntries, sketch.NumRetained())
1✔
170
                matchesEntries = make([]entry[S], 0, maxMatches)
1✔
171
                matchCount     = 0
1✔
172
                count          = 0
1✔
173
        )
1✔
174
        for hash, summary := range sketch.All() {
2✔
175
                if hash < i.hashtable.theta {
2✔
176
                        key, err := i.hashtable.Find(hash)
1✔
177
                        if err == nil {
2✔
178
                                if uint32(matchCount) == maxMatches {
1✔
179
                                        return errors.New("max matches exceeded, possibly corrupted input sketch")
×
180
                                }
×
181

182
                                if i.applyFunc != nil {
2✔
183
                                        i.hashtable.entries[key].Summary = i.applyFunc(i.hashtable.entries[key].Summary, summary)
1✔
184
                                } else {
2✔
185
                                        i.policy.Apply(i.hashtable.entries[key].Summary, summary)
1✔
186
                                }
1✔
187

188
                                matchesEntries = append(matchesEntries, i.hashtable.entries[key])
1✔
189
                                matchCount++
1✔
190
                        }
191
                } else if sketch.IsOrdered() {
2✔
192
                        // early stop
1✔
193
                        break
1✔
194
                }
195

196
                count++
1✔
197
        }
198

199
        if count > int(sketch.NumRetained()) {
1✔
200
                return errors.New("more keys than expected, possibly corrupted input sketch")
×
201
        }
×
202
        if !sketch.IsOrdered() && count < int(sketch.NumRetained()) {
1✔
203
                return errors.New("fewer keys than expected, possibly corrupted input sketch")
×
204
        }
×
205

206
        if matchCount == 0 {
2✔
207
                i.hashtable = newHashtable[S](
1✔
208
                        0, 0, theta.ResizeX1, 1.0, i.hashtable.theta, i.hashtable.seed, i.hashtable.isEmpty,
1✔
209
                )
1✔
210
                if i.hashtable.theta == theta.MaxTheta {
2✔
211
                        i.hashtable.isEmpty = true
1✔
212
                }
1✔
213
        } else {
1✔
214
                lgSize := internal.LgSizeFromCount(uint32(matchCount), rebuildThreshold)
1✔
215
                i.hashtable = newHashtable[S](
1✔
216
                        lgSize, lgSize-1, theta.ResizeX1, 1.0, i.hashtable.theta, i.hashtable.seed, i.hashtable.isEmpty,
1✔
217
                )
1✔
218
                for j := 0; j < matchCount; j++ {
2✔
219
                        key, err := i.hashtable.Find(matchesEntries[j].Hash)
1✔
220
                        if err != nil && err == ErrKeyNotFoundAndNoEmptySlots {
1✔
221
                                return err
×
222
                        }
×
223

224
                        i.hashtable.Insert(key, matchesEntries[j])
1✔
225
                }
226
        }
227
        return nil
1✔
228
}
229

230
// Result produces a copy of the current state of the intersection.
231
func (i *Intersection[S]) Result(ordered bool) (*CompactSketch[S], error) {
1✔
232
        if !i.isValid {
2✔
233
                return nil, errors.New("calling Result() before calling Update() is undefined")
1✔
234
        }
1✔
235

236
        entries := make([]entry[S], 0, i.hashtable.numEntries)
1✔
237
        if i.hashtable.numEntries > 0 {
2✔
238
                for _, e := range i.hashtable.entries {
2✔
239
                        if e.Hash != 0 {
2✔
240
                                entries = append(entries, entry[S]{
1✔
241
                                        Hash:    e.Hash,
1✔
242
                                        Summary: e.Summary.Clone().(S),
1✔
243
                                })
1✔
244
                        }
1✔
245
                }
246

247
                if ordered {
2✔
248
                        slices.SortFunc(entries, i.entryLessFunc)
1✔
249
                }
1✔
250
        }
251

252
        seedHash, err := internal.ComputeSeedHash(int64(i.hashtable.seed))
1✔
253
        if err != nil {
1✔
254
                return nil, err
×
255
        }
×
256

257
        return newCompactSketch[S](
1✔
258
                i.hashtable.isEmpty,
1✔
259
                ordered,
1✔
260
                uint16(seedHash),
1✔
261
                i.hashtable.theta,
1✔
262
                entries,
1✔
263
        ), nil
1✔
264
}
265

266
// OrderedResult produces a copy of the current state of the intersection.
267
func (i *Intersection[S]) OrderedResult() (*CompactSketch[S], error) {
1✔
268
        return i.Result(true)
1✔
269
}
1✔
270

271
// HasResult returns true if the state of the intersection is defined.
272
func (i *Intersection[S]) HasResult() bool {
1✔
273
        return i.isValid
1✔
274
}
1✔
275

276
// Policy returns the policy for processing matched summary during intersection.
277
func (i *Intersection[S]) Policy() Policy[S] {
1✔
278
        return i.policy
1✔
279
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc