• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Permify / permify / 10479058678

20 Aug 2024 08:54PM UTC coverage: 80.175% (+0.07%) from 80.108%
10479058678

push

github

web-flow
Merge pull request #1470 from Permify/feat/pagination-sync-bulkchecker

feat(bulkchecker): add pagination limit and improve synchronization

464 of 554 new or added lines in 16 files covered. (83.75%)

4 existing lines in 2 files now uncovered.

7959 of 9927 relevant lines covered (80.18%)

115.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.36
/internal/storage/memory/dataReader.go
1
package memory
2

3
import (
4
        "context"
5
        "errors"
6
        "sort"
7
        "strconv"
8
        "time"
9

10
        "github.com/hashicorp/go-memdb"
11

12
        "github.com/Permify/permify/internal/storage/memory/constants"
13

14
        "github.com/Permify/permify/internal/storage"
15
        "github.com/Permify/permify/internal/storage/memory/snapshot"
16
        "github.com/Permify/permify/internal/storage/memory/utils"
17
        "github.com/Permify/permify/pkg/database"
18
        db "github.com/Permify/permify/pkg/database/memory"
19
        base "github.com/Permify/permify/pkg/pb/base/v1"
20
        "github.com/Permify/permify/pkg/token"
21
)
22

23
// DataReader -
24
type DataReader struct {
25
        database *db.Memory
26
}
27

28
// NewDataReader - Creates a new DataReader
29
func NewDataReader(database *db.Memory) *DataReader {
8✔
30
        return &DataReader{
8✔
31
                database: database,
8✔
32
        }
8✔
33
}
8✔
34

35
// QueryRelationships queries the database for relationships based on the provided filter.
36
func (r *DataReader) QueryRelationships(_ context.Context, tenantID string, filter *base.TupleFilter, _ string, pagination database.CursorPagination) (it *database.TupleIterator, err error) {
1✔
37
        txn := r.database.DB.Txn(false)
1✔
38
        defer txn.Abort()
1✔
39

1✔
40
        var lowerBound string
1✔
41

1✔
42
        if pagination.Cursor() != "" {
1✔
NEW
43
                var t database.ContinuousToken
×
NEW
44
                t, err = utils.EncodedContinuousToken{Value: pagination.Cursor()}.Decode()
×
NEW
45
                if err != nil {
×
NEW
46
                        return nil, err
×
NEW
47
                }
×
NEW
48
                lowerBound = t.(utils.ContinuousToken).Value
×
49
        }
50

51
        // Get the index and arguments based on the filter.
52
        index, args := utils.GetRelationTuplesIndexNameAndArgsByFilters(tenantID, filter)
1✔
53

1✔
54
        // Get the result iterator based on the index and arguments.
1✔
55
        var result memdb.ResultIterator
1✔
56
        result, err = txn.LowerBound(constants.RelationTuplesTable, index, args...)
1✔
57
        if err != nil {
1✔
58
                return nil, errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
×
59
        }
×
60

61
        // Filter the result iterator and add the tuples to the collection.
62
        tup := make([]storage.RelationTuple, 0, 10)
1✔
63
        fit := memdb.NewFilterIterator(result, utils.FilterRelationTuplesQuery(tenantID, filter))
1✔
64
        for obj := fit.Next(); obj != nil; obj = fit.Next() {
3✔
65
                t, ok := obj.(storage.RelationTuple)
2✔
66
                if !ok {
2✔
67
                        return nil, errors.New(base.ErrorCode_ERROR_CODE_TYPE_CONVERSATION.String())
×
68
                }
×
69
                tup = append(tup, t)
2✔
70
        }
71

72
        // Sort tuples based on the provided order field
73
        sort.Slice(tup, func(i, j int) bool {
2✔
74
                switch pagination.Sort() {
1✔
NEW
75
                case "entity_id":
×
NEW
76
                        return tup[i].EntityID < tup[j].EntityID
×
NEW
77
                case "subject_id":
×
NEW
78
                        return tup[i].SubjectID < tup[j].SubjectID
×
79
                default:
1✔
80
                        return false // No sorting if order field is invalid
1✔
81
                }
82
        })
83

84
        var tuples []*base.Tuple
1✔
85
        for _, t := range tup {
3✔
86
                switch pagination.Sort() {
2✔
NEW
87
                case "entity_id":
×
NEW
88
                        if t.EntityID >= lowerBound {
×
NEW
89
                                tuples = append(tuples, t.ToTuple())
×
NEW
90
                        }
×
NEW
91
                case "subject_id":
×
NEW
92
                        if t.SubjectID >= lowerBound {
×
NEW
93
                                tuples = append(tuples, t.ToTuple())
×
NEW
94
                        }
×
95
                default:
2✔
96
                        tuples = append(tuples, t.ToTuple())
2✔
97
                }
98
        }
99

100
        return database.NewTupleCollection(tuples...).CreateTupleIterator(), nil
1✔
101
}
102

103
// ReadRelationships reads relationships from the database taking into account the pagination.
104
func (r *DataReader) ReadRelationships(_ context.Context, tenantID string, filter *base.TupleFilter, _ string, pagination database.Pagination) (collection *database.TupleCollection, ct database.EncodedContinuousToken, err error) {
4✔
105
        txn := r.database.DB.Txn(false)
4✔
106
        defer txn.Abort()
4✔
107

4✔
108
        var lowerBound uint64
4✔
109
        if pagination.Token() != "" {
5✔
110
                var t database.ContinuousToken
1✔
111
                t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
1✔
112
                if err != nil {
1✔
113
                        return nil, database.NewNoopContinuousToken().Encode(), err
×
114
                }
×
115
                lowerBound, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
1✔
116
                if err != nil {
1✔
117
                        return nil, database.NewNoopContinuousToken().Encode(), errors.New(base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN.String())
×
118
                }
×
119
        }
120

121
        index, args := utils.GetRelationTuplesIndexNameAndArgsByFilters(tenantID, filter)
4✔
122

4✔
123
        // Get the result iterator using lower bound.
4✔
124
        var result memdb.ResultIterator
4✔
125
        result, err = txn.LowerBound(constants.RelationTuplesTable, index, args...)
4✔
126
        if err != nil {
4✔
127
                return nil, database.NewNoopContinuousToken().Encode(), errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
×
128
        }
×
129

130
        // Filter the result iterator and add the tuples to the array.
131
        tup := make([]storage.RelationTuple, 0, 10)
4✔
132
        fit := memdb.NewFilterIterator(result, utils.FilterRelationTuplesQuery(tenantID, filter))
4✔
133
        for obj := fit.Next(); obj != nil; obj = fit.Next() {
20✔
134
                t, ok := obj.(storage.RelationTuple)
16✔
135
                if !ok {
16✔
136
                        return nil, database.NewNoopContinuousToken().Encode(), errors.New(base.ErrorCode_ERROR_CODE_TYPE_CONVERSATION.String())
×
137
                }
×
138
                tup = append(tup, t)
16✔
139
        }
140

141
        // Sort the tuples and append them to the collection.
142
        sort.Slice(tup, func(i, j int) bool {
16✔
143
                return tup[i].ID < tup[j].ID
12✔
144
        })
12✔
145

146
        tuples := make([]*base.Tuple, 0, pagination.PageSize()+1)
4✔
147
        for _, t := range tup {
18✔
148
                if t.ID >= lowerBound {
26✔
149
                        tuples = append(tuples, t.ToTuple())
12✔
150
                        if pagination.PageSize() != 0 && len(tuples) > int(pagination.PageSize()) {
13✔
151
                                return database.NewTupleCollection(tuples[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(t.ID, 10)).Encode(), nil
1✔
152
                        }
1✔
153
                }
154
        }
155

156
        return database.NewTupleCollection(tuples...), database.NewNoopContinuousToken().Encode(), nil
3✔
157
}
158

159
// QuerySingleAttribute queries the database for a single attribute based on the provided filter.
160
func (r *DataReader) QuerySingleAttribute(_ context.Context, tenantID string, filter *base.AttributeFilter, _ string) (attribute *base.Attribute, err error) {
2✔
161
        txn := r.database.DB.Txn(false)
2✔
162
        defer txn.Abort()
2✔
163

2✔
164
        // Get the index and arguments based on the filter.
2✔
165
        index, args := utils.GetAttributesIndexNameAndArgsByFilters(tenantID, filter)
2✔
166

2✔
167
        // Get the result iterator based on the index and arguments.
2✔
168
        var result memdb.ResultIterator
2✔
169
        result, err = txn.Get(constants.AttributesTable, index, args...)
2✔
170
        if err != nil {
2✔
171
                return nil, errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
×
172
        }
×
173

174
        // Filter the result iterator and add the attributes to the collection.
175
        fit := memdb.NewFilterIterator(result, utils.FilterAttributesQuery(tenantID, filter))
2✔
176
        for obj := fit.Next(); obj != nil; {
3✔
177
                t, ok := obj.(storage.Attribute)
1✔
178
                if !ok {
1✔
179
                        return nil, errors.New(base.ErrorCode_ERROR_CODE_TYPE_CONVERSATION.String())
×
180
                }
×
181
                return t.ToAttribute(), nil
1✔
182
        }
183

184
        return nil, nil
1✔
185
}
186

187
// QueryAttributes queries the database for attributes based on the provided filter.
188
func (r *DataReader) QueryAttributes(_ context.Context, tenantID string, filter *base.AttributeFilter, _ string) (iterator *database.AttributeIterator, err error) {
1✔
189
        txn := r.database.DB.Txn(false)
1✔
190
        defer txn.Abort()
1✔
191

1✔
192
        collection := database.NewAttributeCollection()
1✔
193

1✔
194
        // Get the index and arguments based on the filter.
1✔
195
        index, args := utils.GetAttributesIndexNameAndArgsByFilters(tenantID, filter)
1✔
196

1✔
197
        // Get the result iterator based on the index and arguments.
1✔
198
        var result memdb.ResultIterator
1✔
199
        result, err = txn.Get(constants.AttributesTable, index, args...)
1✔
200
        if err != nil {
1✔
201
                return nil, errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
×
202
        }
×
203

204
        // Filter the result iterator and add the attributes to the collection.
205
        fit := memdb.NewFilterIterator(result, utils.FilterAttributesQuery(tenantID, filter))
1✔
206
        for obj := fit.Next(); obj != nil; obj = fit.Next() {
3✔
207
                t, ok := obj.(storage.Attribute)
2✔
208
                if !ok {
2✔
209
                        return nil, errors.New(base.ErrorCode_ERROR_CODE_TYPE_CONVERSATION.String())
×
210
                }
×
211
                collection.Add(t.ToAttribute())
2✔
212
        }
213

214
        return collection.CreateAttributeIterator(), nil
1✔
215
}
216

217
// ReadAttributes reads attributes from the database taking into account the pagination.
218
func (r *DataReader) ReadAttributes(_ context.Context, tenantID string, filter *base.AttributeFilter, _ string, pagination database.Pagination) (collection *database.AttributeCollection, ct database.EncodedContinuousToken, err error) {
4✔
219
        txn := r.database.DB.Txn(false)
4✔
220
        defer txn.Abort()
4✔
221

4✔
222
        var lowerBound uint64
4✔
223
        if pagination.Token() != "" {
5✔
224
                var t database.ContinuousToken
1✔
225
                t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
1✔
226
                if err != nil {
1✔
227
                        return nil, database.NewNoopContinuousToken().Encode(), err
×
228
                }
×
229
                lowerBound, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
1✔
230
                if err != nil {
1✔
231
                        return nil, database.NewNoopContinuousToken().Encode(), errors.New(base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN.String())
×
232
                }
×
233
        }
234

235
        // Get the index and arguments based on the filter.
236
        index, args := utils.GetAttributesIndexNameAndArgsByFilters(tenantID, filter)
4✔
237

4✔
238
        // Get the result iterator using lower bound.
4✔
239
        var result memdb.ResultIterator
4✔
240
        result, err = txn.LowerBound(constants.AttributesTable, index, args...)
4✔
241
        if err != nil {
4✔
242
                return nil, database.NewNoopContinuousToken().Encode(), errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
×
243
        }
×
244

245
        // Filter the result iterator and add the attributes to the array.
246
        attr := make([]storage.Attribute, 0, 10)
4✔
247
        fit := memdb.NewFilterIterator(result, utils.FilterAttributesQuery(tenantID, filter))
4✔
248
        for obj := fit.Next(); obj != nil; obj = fit.Next() {
19✔
249
                a, ok := obj.(storage.Attribute)
15✔
250
                if !ok {
15✔
251
                        return nil, database.NewNoopContinuousToken().Encode(), errors.New(base.ErrorCode_ERROR_CODE_TYPE_CONVERSATION.String())
×
252
                }
×
253
                attr = append(attr, a)
15✔
254
        }
255

256
        // Sort the attributes and append them to the collection.
257
        sort.Slice(attr, func(i, j int) bool {
25✔
258
                return attr[i].ID < attr[j].ID
21✔
259
        })
21✔
260

261
        attributes := make([]*base.Attribute, 0, pagination.PageSize()+1)
4✔
262
        for _, t := range attr {
17✔
263
                if t.ID >= lowerBound {
24✔
264
                        attributes = append(attributes, t.ToAttribute())
11✔
265
                        if pagination.PageSize() != 0 && len(attributes) > int(pagination.PageSize()) {
12✔
266
                                return database.NewAttributeCollection(attributes[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(t.ID, 10)).Encode(), nil
1✔
267
                        }
1✔
268
                }
269
        }
270

271
        return database.NewAttributeCollection(attributes...), database.NewNoopContinuousToken().Encode(), nil
3✔
272
}
273

274
// QueryUniqueEntities is a function that searches for unique entities in a given database.
275
func (r *DataReader) QueryUniqueEntities(_ context.Context, tenantID, name, _ string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
2✔
276
        // Starts a new read-only transaction
2✔
277
        txn := r.database.DB.Txn(false)
2✔
278
        defer txn.Abort()
2✔
279

2✔
280
        var lowerBound string
2✔
281
        if pagination.Token() != "" {
2✔
NEW
282
                var t database.ContinuousToken
×
NEW
283
                t, err := utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
×
NEW
284
                if err != nil {
×
NEW
285
                        return nil, database.NewNoopContinuousToken().Encode(), err
×
NEW
286
                }
×
NEW
287
                lowerBound = t.(utils.ContinuousToken).Value
×
288
        }
289

290
        var tupleIds []string
2✔
291

2✔
292
        // Query the database for entities matching the given tenant ID and name
2✔
293
        var entityResult memdb.ResultIterator
2✔
294
        entityResult, err = txn.LowerBound(constants.RelationTuplesTable, "entity-type-index", tenantID, name)
2✔
295
        if err != nil {
2✔
296
                // Returns an error if execution fails
×
297
                return nil, database.NewNoopContinuousToken().Encode(), errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
×
298
        }
×
299

300
        // Iterates over the resulting entities and append their IDs to the tupleIds slice
301
        for obj := entityResult.Next(); obj != nil; obj = entityResult.Next() {
14✔
302
                t, ok := obj.(storage.RelationTuple)
12✔
303
                if !ok {
12✔
304
                        // Returns an error if type conversion fails
×
305
                        return nil, database.NewNoopContinuousToken().Encode(), errors.New(base.ErrorCode_ERROR_CODE_TYPE_CONVERSATION.String())
×
306
                }
×
307
                tupleIds = append(tupleIds, t.EntityID)
12✔
308
        }
309

310
        var attributeIds []string
2✔
311

2✔
312
        // Query the database for attributes matching the given tenant ID and name
2✔
313
        var attributeResult memdb.ResultIterator
2✔
314
        attributeResult, err = txn.LowerBound(constants.AttributesTable, "entity-type-index", tenantID, name)
2✔
315
        if err != nil {
2✔
316
                // Returns an error if execution fails
×
317
                return nil, database.NewNoopContinuousToken().Encode(), errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
×
318
        }
×
319

320
        // Iterates over the resulting attributes and append their IDs to the tupleIds slice
321
        for obj := attributeResult.Next(); obj != nil; obj = attributeResult.Next() {
15✔
322
                t, ok := obj.(storage.Attribute)
13✔
323
                if !ok {
13✔
324
                        // Returns an error if type conversion fails
×
325
                        return nil, database.NewNoopContinuousToken().Encode(), errors.New(base.ErrorCode_ERROR_CODE_TYPE_CONVERSATION.String())
×
326
                }
×
327
                attributeIds = append(attributeIds, t.EntityID)
13✔
328
        }
329

330
        all := append(tupleIds, attributeIds...)
2✔
331

2✔
332
        // Sort the combined slice
2✔
333
        sort.Slice(all, func(i, j int) bool {
65✔
334
                return all[i] < all[j]
63✔
335
        })
63✔
336

337
        mp := make(map[string]bool)
2✔
338
        var lastID string
2✔
339

2✔
340
        for _, b := range all {
27✔
341
                if _, exists := mp[b]; !exists && b >= lowerBound {
44✔
342
                        ids = append(ids, b)
19✔
343
                        mp[b] = true
19✔
344

19✔
345
                        // Capture the last ID after adding pagesize + 1 elements
19✔
346
                        if len(ids) == int(pagination.PageSize())+1 {
21✔
347
                                lastID = b
2✔
348
                        }
2✔
349

350
                        // Stop appending if we've reached the page size
351
                        if pagination.PageSize() != 0 && len(ids) > int(pagination.PageSize()) {
19✔
NEW
352
                                return ids[:pagination.PageSize()], utils.NewContinuousToken(lastID).Encode(), nil
×
NEW
353
                        }
×
354
                }
355
        }
356

357
        // If page size is not exceeded, return the entire list with a noop token
358
        return ids, database.NewNoopContinuousToken().Encode(), nil
2✔
359
}
360

361
// QueryUniqueSubjectReferences is a function that searches for unique subject references in a given database.
362
func (r *DataReader) QueryUniqueSubjectReferences(_ context.Context, tenantID string, subjectReference *base.RelationReference, _ string, pagination database.Pagination) (ids []string, _ database.EncodedContinuousToken, err error) {
2✔
363
        txn := r.database.DB.Txn(false)
2✔
364
        defer txn.Abort()
2✔
365

2✔
366
        var lowerBound string
2✔
367
        if pagination.Token() != "" {
2✔
NEW
368
                var t database.ContinuousToken
×
NEW
369
                t, err := utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
×
NEW
370
                if err != nil {
×
NEW
371
                        return nil, database.NewNoopContinuousToken().Encode(), err
×
NEW
372
                }
×
NEW
373
                lowerBound = t.(utils.ContinuousToken).Value
×
374
        }
375

376
        // Get the result iterator based on the index and arguments.
377
        result, err := txn.LowerBound(constants.RelationTuplesTable, "id")
2✔
378
        if err != nil {
2✔
379
                return nil, database.NewNoopContinuousToken().Encode(), errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
×
380
        }
×
381

382
        var subjectIDs []string
2✔
383

2✔
384
        // Filter the result iterator and add the tuples to the collection.
2✔
385
        fit := memdb.NewFilterIterator(result, utils.FilterRelationTuplesQuery(tenantID, &base.TupleFilter{
2✔
386
                Subject: &base.SubjectFilter{
2✔
387
                        Type:     subjectReference.GetType(),
2✔
388
                        Relation: subjectReference.GetRelation(),
2✔
389
                },
2✔
390
        }))
2✔
391
        for obj := fit.Next(); obj != nil; obj = fit.Next() {
8✔
392
                t, ok := obj.(storage.RelationTuple)
6✔
393
                if !ok {
6✔
394
                        return nil, database.NewNoopContinuousToken().Encode(), errors.New(base.ErrorCode_ERROR_CODE_TYPE_CONVERSATION.String())
×
395
                }
×
396
                subjectIDs = append(subjectIDs, t.SubjectID)
6✔
397
        }
398

399
        // Sort the tuples and append them to the collection.
400
        sort.Slice(subjectIDs, func(i, j int) bool {
9✔
401
                return subjectIDs[i] < subjectIDs[j]
7✔
402
        })
7✔
403

404
        mp := make(map[string]bool)
2✔
405
        var lastID string
2✔
406

2✔
407
        for _, b := range subjectIDs {
8✔
408
                if _, exists := mp[b]; !exists && b >= lowerBound {
11✔
409
                        ids = append(ids, b)
5✔
410
                        mp[b] = true
5✔
411

5✔
412
                        // Capture the last ID after adding pagesize + 1 elements
5✔
413
                        if len(ids) == int(pagination.PageSize())+1 {
7✔
414
                                lastID = b
2✔
415
                        }
2✔
416

417
                        // Stop appending if we've reached the page size
418
                        if pagination.PageSize() != 0 && len(ids) > int(pagination.PageSize()) {
5✔
NEW
419
                                return ids[:pagination.PageSize()], utils.NewContinuousToken(lastID).Encode(), nil
×
NEW
420
                        }
×
421
                }
422
        }
423

424
        return ids, database.NewNoopContinuousToken().Encode(), nil
2✔
425
}
426

427
// HeadSnapshot - Reads the latest version of the snapshot from the repository.
428
func (r *DataReader) HeadSnapshot(_ context.Context, _ string) (token.SnapToken, error) {
×
429
        return snapshot.NewToken(time.Now()), nil
×
430
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc