• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / datasketches-go / 23520345713

25 Mar 2026 01:23AM UTC coverage: 86.404% (-0.1%) from 86.509%
23520345713

push

github

web-flow
Merge pull request #140 from proost/ci-upload-coverage-directly

ci: upload coverage directly

20743 of 24007 relevant lines covered (86.4%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.43
/tuple/intersection.go
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one or more
3
 * contributor license agreements.  See the NOTICE file distributed with
4
 * this work for additional information regarding copyright ownership.
5
 * The ASF licenses this file to You under the Apache License, Version 2.0
6
 * (the "License"); you may not use this file except in compliance with
7
 * the License.  You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17

18
package tuple
19

20
import (
21
        "errors"
22
        "slices"
23

24
        "github.com/apache/datasketches-go/internal"
25
        "github.com/apache/datasketches-go/theta"
26
)
27

28
// intersectionOptions collects the settings that IntersectionOptionFunc
// values may adjust before an intersection is constructed.
type intersectionOptions struct {
	// seed is the hash seed passed to the intersection's internal hash table.
	seed uint64
}
31

32
// IntersectionOptionFunc is a functional option that adjusts the settings
// applied when an intersection is constructed.
type IntersectionOptionFunc func(*intersectionOptions)
33

34
// WithIntersectionSeed sets the seed for the hash function.
35
func WithIntersectionSeed(seed uint64) IntersectionOptionFunc {
1✔
36
        return func(i *intersectionOptions) {
2✔
37
                i.seed = seed
1✔
38
        }
1✔
39
}
40

41
// Intersection computes the intersection of sketches.
42
type Intersection[S Summary] struct {
43
        hashtable     *hashtable[S]
44
        policy        Policy[S]
45
        applyFunc     func(S, S) S
46
        entryLessFunc func(a, b entry[S]) int
47
        isValid       bool
48
}
49

50
// NewIntersection creates a new intersection.
51
func NewIntersection[S Summary](policy Policy[S], opts ...IntersectionOptionFunc) *Intersection[S] {
1✔
52
        options := &intersectionOptions{
1✔
53
                seed: theta.DefaultSeed,
1✔
54
        }
1✔
55
        for _, opt := range opts {
2✔
56
                opt(options)
1✔
57
        }
1✔
58

59
        return &Intersection[S]{
1✔
60
                hashtable: newHashtable[S](
1✔
61
                        0, 0, theta.ResizeX1, 1.0, theta.MaxTheta, options.seed, false,
1✔
62
                ),
1✔
63
                entryLessFunc: func(a, b entry[S]) int {
2✔
64
                        if a.Hash < b.Hash {
2✔
65
                                return -1
1✔
66
                        } else if a.Hash > b.Hash {
3✔
67
                                return 1
1✔
68
                        }
1✔
69
                        return 0
×
70
                },
71
                policy:  policy,
72
                isValid: false,
73
        }
74
}
75

76
// NewIntersectionWithSummaryMergeFunc creates a new intersection that uses a function to merge summaries.
77
// This is useful for value-type summaries where Policy.Apply cannot mutate the internal summary.
78
func NewIntersectionWithSummaryMergeFunc[S Summary](
79
        applyFunc func(S, S) S, opts ...IntersectionOptionFunc,
80
) *Intersection[S] {
1✔
81
        options := &intersectionOptions{
1✔
82
                seed: theta.DefaultSeed,
1✔
83
        }
1✔
84
        for _, opt := range opts {
1✔
85
                opt(options)
×
86
        }
×
87

88
        return &Intersection[S]{
1✔
89
                hashtable: newHashtable[S](
1✔
90
                        0, 0, theta.ResizeX1, 1.0, theta.MaxTheta, options.seed, false,
1✔
91
                ),
1✔
92
                entryLessFunc: func(a, b entry[S]) int {
2✔
93
                        if a.Hash < b.Hash {
2✔
94
                                return -1
1✔
95
                        } else if a.Hash > b.Hash {
3✔
96
                                return 1
1✔
97
                        }
1✔
98
                        return 0
×
99
                },
100
                applyFunc: applyFunc,
101
                isValid:   false,
102
        }
103
}
104

105
// Update updates the intersection with a given sketch.
106
//
107
// If the summary contains string values and the caller cares about
108
// cross-language compatibility, it is the caller's responsibility to ensure
109
// that both sketches use string values encoded as valid UTF-8.
110
func (i *Intersection[S]) Update(sketch Sketch[S]) error {
1✔
111
        if i.hashtable.isEmpty {
2✔
112
                return nil
1✔
113
        }
1✔
114

115
        seedHash, err := internal.ComputeSeedHash(int64(i.hashtable.seed))
1✔
116
        if err != nil {
1✔
117
                return err
×
118
        }
×
119
        sketchSeedHash, err := sketch.SeedHash()
1✔
120
        if err != nil {
1✔
121
                return err
×
122
        }
×
123
        if !sketch.IsEmpty() && sketchSeedHash != uint16(seedHash) {
2✔
124
                return errors.New("seed hash mismatch")
1✔
125
        }
1✔
126

127
        i.hashtable.isEmpty = i.hashtable.isEmpty || sketch.IsEmpty()
1✔
128
        if i.hashtable.isEmpty {
2✔
129
                i.hashtable.theta = theta.MaxTheta
1✔
130
        } else {
2✔
131
                i.hashtable.theta = min(i.hashtable.theta, sketch.Theta64())
1✔
132
        }
1✔
133

134
        if i.isValid && i.hashtable.numEntries == 0 {
2✔
135
                return nil
1✔
136
        }
1✔
137

138
        if sketch.NumRetained() == 0 {
2✔
139
                i.isValid = true
1✔
140
                i.hashtable = newHashtable[S](
1✔
141
                        0, 0, theta.ResizeX1, 1.0, i.hashtable.theta, i.hashtable.seed, i.hashtable.isEmpty,
1✔
142
                )
1✔
143
                return nil
1✔
144
        }
1✔
145

146
        if !i.isValid { // first update, copy or move incoming sketch
2✔
147
                i.isValid = true
1✔
148

1✔
149
                lgSize := internal.LgSizeFromCount(sketch.NumRetained(), rebuildThreshold)
1✔
150
                i.hashtable = newHashtable[S](lgSize, lgSize-1, theta.ResizeX1, 1.0, i.hashtable.theta, i.hashtable.seed, i.hashtable.isEmpty)
1✔
151

1✔
152
                for hash, summary := range sketch.All() {
2✔
153
                        idx, err := i.hashtable.Find(hash)
1✔
154
                        if err == nil {
1✔
155
                                return errors.New("duplicate key, possibly corrupted input sketch")
×
156
                        }
×
157

158
                        i.hashtable.Insert(idx, entry[S]{
1✔
159
                                Hash:    hash,
1✔
160
                                Summary: summary,
1✔
161
                        })
1✔
162
                }
163

164
                if i.hashtable.numEntries != sketch.NumRetained() {
1✔
165
                        return errors.New("num entries mismatch, possibly corrupted input sketch")
×
166
                }
×
167

168
                return nil
1✔
169
        }
170

171
        // intersection
172
        var (
1✔
173
                maxMatches     = min(i.hashtable.numEntries, sketch.NumRetained())
1✔
174
                matchesEntries = make([]entry[S], 0, maxMatches)
1✔
175
                matchCount     = 0
1✔
176
                count          = 0
1✔
177
        )
1✔
178
        for hash, summary := range sketch.All() {
2✔
179
                if hash < i.hashtable.theta {
2✔
180
                        key, err := i.hashtable.Find(hash)
1✔
181
                        if err == nil {
2✔
182
                                if uint32(matchCount) == maxMatches {
1✔
183
                                        return errors.New("max matches exceeded, possibly corrupted input sketch")
×
184
                                }
×
185

186
                                if i.applyFunc != nil {
2✔
187
                                        i.hashtable.entries[key].Summary = i.applyFunc(i.hashtable.entries[key].Summary, summary)
1✔
188
                                } else {
2✔
189
                                        i.policy.Apply(i.hashtable.entries[key].Summary, summary)
1✔
190
                                }
1✔
191

192
                                matchesEntries = append(matchesEntries, i.hashtable.entries[key])
1✔
193
                                matchCount++
1✔
194
                        }
195
                } else if sketch.IsOrdered() {
2✔
196
                        // early stop
1✔
197
                        break
1✔
198
                }
199

200
                count++
1✔
201
        }
202

203
        if count > int(sketch.NumRetained()) {
1✔
204
                return errors.New("more keys than expected, possibly corrupted input sketch")
×
205
        }
×
206
        if !sketch.IsOrdered() && count < int(sketch.NumRetained()) {
1✔
207
                return errors.New("fewer keys than expected, possibly corrupted input sketch")
×
208
        }
×
209

210
        if matchCount == 0 {
2✔
211
                i.hashtable = newHashtable[S](
1✔
212
                        0, 0, theta.ResizeX1, 1.0, i.hashtable.theta, i.hashtable.seed, i.hashtable.isEmpty,
1✔
213
                )
1✔
214
                if i.hashtable.theta == theta.MaxTheta {
2✔
215
                        i.hashtable.isEmpty = true
1✔
216
                }
1✔
217
        } else {
1✔
218
                lgSize := internal.LgSizeFromCount(uint32(matchCount), rebuildThreshold)
1✔
219
                i.hashtable = newHashtable[S](
1✔
220
                        lgSize, lgSize-1, theta.ResizeX1, 1.0, i.hashtable.theta, i.hashtable.seed, i.hashtable.isEmpty,
1✔
221
                )
1✔
222
                for j := 0; j < matchCount; j++ {
2✔
223
                        key, err := i.hashtable.Find(matchesEntries[j].Hash)
1✔
224
                        if err != nil && err == ErrKeyNotFoundAndNoEmptySlots {
1✔
225
                                return err
×
226
                        }
×
227

228
                        i.hashtable.Insert(key, matchesEntries[j])
1✔
229
                }
230
        }
231
        return nil
1✔
232
}
233

234
// Result produces a copy of the current state of the intersection.
235
func (i *Intersection[S]) Result(ordered bool) (*CompactSketch[S], error) {
1✔
236
        if !i.isValid {
2✔
237
                return nil, errors.New("calling Result() before calling Update() is undefined")
1✔
238
        }
1✔
239

240
        entries := make([]entry[S], 0, i.hashtable.numEntries)
1✔
241
        if i.hashtable.numEntries > 0 {
2✔
242
                for _, e := range i.hashtable.entries {
2✔
243
                        if e.Hash != 0 {
2✔
244
                                entries = append(entries, entry[S]{
1✔
245
                                        Hash:    e.Hash,
1✔
246
                                        Summary: e.Summary.Clone().(S),
1✔
247
                                })
1✔
248
                        }
1✔
249
                }
250

251
                if ordered {
2✔
252
                        slices.SortFunc(entries, i.entryLessFunc)
1✔
253
                }
1✔
254
        }
255

256
        seedHash, err := internal.ComputeSeedHash(int64(i.hashtable.seed))
1✔
257
        if err != nil {
1✔
258
                return nil, err
×
259
        }
×
260

261
        return newCompactSketch[S](
1✔
262
                i.hashtable.isEmpty,
1✔
263
                ordered,
1✔
264
                uint16(seedHash),
1✔
265
                i.hashtable.theta,
1✔
266
                entries,
1✔
267
        ), nil
1✔
268
}
269

270
// OrderedResult produces a copy of the current state of the intersection.
271
func (i *Intersection[S]) OrderedResult() (*CompactSketch[S], error) {
1✔
272
        return i.Result(true)
1✔
273
}
1✔
274

275
// HasResult returns true if the state of the intersection is defined.
276
func (i *Intersection[S]) HasResult() bool {
1✔
277
        return i.isValid
1✔
278
}
1✔
279

280
// Policy returns the policy for processing matched summary during intersection.
281
func (i *Intersection[S]) Policy() Policy[S] {
1✔
282
        return i.policy
1✔
283
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc