• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Permify / permify / 12882779193

21 Jan 2025 08:19AM UTC coverage: 79.891% (-0.06%) from 79.953%
12882779193

push

github

web-flow
Merge pull request #1928 from Permify/feat-add-cursor-pagination-limit

refactor: update dataReader.go for improved pagination and limit hand…

24 of 43 new or added lines in 3 files covered. (55.81%)

2 existing lines in 1 file now uncovered.

8184 of 10244 relevant lines covered (79.89%)

122.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.14
/internal/storage/postgres/dataReader.go
1
package postgres
2

3
import (
4
        "context"
5
        "errors"
6
        "log/slog"
7
        "strconv"
8
        "strings"
9

10
        "github.com/jackc/pgx/v5"
11

12
        "github.com/Masterminds/squirrel"
13
        "github.com/golang/protobuf/jsonpb"
14
        "google.golang.org/protobuf/types/known/anypb"
15

16
        "github.com/Permify/permify/internal"
17
        "github.com/Permify/permify/internal/storage"
18
        "github.com/Permify/permify/internal/storage/postgres/snapshot"
19
        "github.com/Permify/permify/internal/storage/postgres/types"
20
        "github.com/Permify/permify/internal/storage/postgres/utils"
21
        "github.com/Permify/permify/pkg/database"
22
        db "github.com/Permify/permify/pkg/database/postgres"
23
        base "github.com/Permify/permify/pkg/pb/base/v1"
24
        "github.com/Permify/permify/pkg/token"
25
)
26

27
// DataReader is a struct which holds a reference to the database, transaction options and a logger.
28
// It is responsible for reading data from the database.
29
type DataReader struct {
30
        database  *db.Postgres  // database is an instance of the PostgreSQL database
31
        txOptions pgx.TxOptions // txOptions specifies the isolation level for database transaction and sets it as read only
32
}
33

34
// NewDataReader is a constructor function for DataReader.
35
// It initializes a new DataReader with a given database, a logger, and sets transaction options to be read-only with Repeatable Read isolation level.
36
func NewDataReader(database *db.Postgres) *DataReader {
13✔
37
        return &DataReader{
13✔
38
                database:  database,                                                             // Set the database to the passed in PostgreSQL instance
13✔
39
                txOptions: pgx.TxOptions{IsoLevel: pgx.ReadCommitted, AccessMode: pgx.ReadOnly}, // Set the transaction options
13✔
40
        }
13✔
41
}
13✔
42

43
// QueryRelationships reads relation tuples from the storage based on the given filter.
44
func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.CursorPagination) (it *database.TupleIterator, err error) {
2✔
45
        // Start a new trace span and end it when the function exits.
2✔
46
        ctx, span := internal.Tracer.Start(ctx, "data-reader.query-relationships")
2✔
47
        defer span.End()
2✔
48

2✔
49
        slog.DebugContext(ctx, "querying relationships for tenant_id", slog.String("tenant_id", tenantID))
2✔
50

2✔
51
        // Decode the snapshot value.
2✔
52
        var st token.SnapToken
2✔
53
        st, err = snapshot.EncodedToken{Value: snap}.Decode()
2✔
54
        if err != nil {
2✔
55
                return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
×
56
        }
×
57

58
        // Build the relationships query based on the provided filter and snapshot value.
59
        var args []interface{}
2✔
60
        builder := r.database.Builder.Select("entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
2✔
61
        builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
2✔
62
        builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
2✔
63

2✔
64
        if pagination.Cursor() != "" {
2✔
65
                var t database.ContinuousToken
×
66
                t, err = utils.EncodedContinuousToken{Value: pagination.Cursor()}.Decode()
×
67
                if err != nil {
×
68
                        return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
×
69
                }
×
70
                builder = builder.Where(squirrel.GtOrEq{pagination.Sort(): t.(utils.ContinuousToken).Value})
×
71
        }
72

73
        if pagination.Sort() != "" {
2✔
74
                builder = builder.OrderBy(pagination.Sort())
×
75
        }
×
76

77
        // Apply limit if specified in pagination
78
        limit := pagination.Limit()
2✔
79
        if limit > 0 {
2✔
NEW
80
                builder = builder.Limit(uint64(limit))
×
NEW
81
        }
×
82

83
        // Generate the SQL query and arguments.
84
        var query string
2✔
85
        query, args, err = builder.ToSql()
2✔
86
        if err != nil {
2✔
87
                return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
×
88
        }
×
89

90
        slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
2✔
91

2✔
92
        // Execute the SQL query and retrieve the result rows.
2✔
93
        var rows pgx.Rows
2✔
94
        rows, err = r.database.ReadPool.Query(ctx, query, args...)
2✔
95
        if err != nil {
2✔
96
                return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
×
97
        }
×
98
        defer rows.Close()
2✔
99

2✔
100
        // Process the result rows and store the relationships in a TupleCollection.
2✔
101
        collection := database.NewTupleCollection()
2✔
102
        for rows.Next() {
5✔
103
                rt := storage.RelationTuple{}
3✔
104
                err = rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
3✔
105
                if err != nil {
3✔
106
                        return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
×
107
                }
×
108
                collection.Add(rt.ToTuple())
3✔
109
        }
110
        if err = rows.Err(); err != nil {
2✔
111
                return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
×
112
        }
×
113

114
        slog.DebugContext(ctx, "successfully retrieved relation tuples from the database")
2✔
115

2✔
116
        // Return a TupleIterator created from the TupleCollection.
2✔
117
        return collection.CreateTupleIterator(), nil
2✔
118
}
119

120
// ReadRelationships reads relation tuples from the storage based on the given filter and pagination.
121
func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.Pagination) (collection *database.TupleCollection, ct database.EncodedContinuousToken, err error) {
10✔
122
        // Start a new trace span and end it when the function exits.
10✔
123
        ctx, span := internal.Tracer.Start(ctx, "data-reader.read-relationships")
10✔
124
        defer span.End()
10✔
125

10✔
126
        slog.DebugContext(ctx, "reading relationships for tenant_id", slog.String("tenant_id", tenantID))
10✔
127

10✔
128
        // Decode the snapshot value.
10✔
129
        var st token.SnapToken
10✔
130
        st, err = snapshot.EncodedToken{Value: snap}.Decode()
10✔
131
        if err != nil {
10✔
132
                return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
×
133
        }
×
134

135
        // Build the relationships query based on the provided filter, snapshot value, and pagination settings.
136
        builder := r.database.Builder.Select("id, entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
10✔
137
        builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
10✔
138
        builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
10✔
139

10✔
140
        // Apply the pagination token and limit to the query.
10✔
141
        if pagination.Token() != "" {
11✔
142
                var t database.ContinuousToken
1✔
143
                t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
1✔
144
                if err != nil {
1✔
145
                        return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
×
146
                }
×
147
                var v uint64
1✔
148
                v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
1✔
149
                if err != nil {
1✔
150
                        return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
×
151
                }
×
152
                builder = builder.Where(squirrel.GtOrEq{"id": v})
1✔
153
        }
154

155
        builder = builder.OrderBy("id")
10✔
156

10✔
157
        if pagination.PageSize() != 0 {
20✔
158
                builder = builder.Limit(uint64(pagination.PageSize() + 1))
10✔
159
        }
10✔
160

161
        // Generate the SQL query and arguments.
162
        var query string
10✔
163
        var args []interface{}
10✔
164
        query, args, err = builder.ToSql()
10✔
165
        if err != nil {
10✔
166
                return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
×
167
        }
×
168

169
        slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
10✔
170

10✔
171
        // Execute the query and retrieve the rows.
10✔
172
        var rows pgx.Rows
10✔
173
        rows, err = r.database.ReadPool.Query(ctx, query, args...)
10✔
174
        if err != nil {
10✔
175
                return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
×
176
        }
×
177
        defer rows.Close()
10✔
178

10✔
179
        var lastID uint64
10✔
180

10✔
181
        // Iterate through the rows and scan the result into a RelationTuple struct.
10✔
182
        tuples := make([]*base.Tuple, 0, pagination.PageSize()+1)
10✔
183
        for rows.Next() {
38✔
184
                rt := storage.RelationTuple{}
28✔
185
                err = rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
28✔
186
                if err != nil {
28✔
187
                        return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
×
188
                }
×
189
                lastID = rt.ID
28✔
190
                tuples = append(tuples, rt.ToTuple())
28✔
191
        }
192
        // Check for any errors during iteration.
193
        if err = rows.Err(); err != nil {
10✔
194
                return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
×
195
        }
×
196

197
        slog.DebugContext(ctx, "successfully read relation tuples from database")
10✔
198

10✔
199
        // Return the results and encoded continuous token for pagination.
10✔
200
        if pagination.PageSize() != 0 && len(tuples) > int(pagination.PageSize()) {
11✔
201
                return database.NewTupleCollection(tuples[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
1✔
202
        }
1✔
203

204
        return database.NewTupleCollection(tuples...), database.NewNoopContinuousToken().Encode(), nil
9✔
205
}
206

207
// QuerySingleAttribute retrieves a single attribute from the storage based on the given filter.
208
func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string) (attribute *base.Attribute, err error) {
4✔
209
        // Start a new trace span and end it when the function exits.
4✔
210
        ctx, span := internal.Tracer.Start(ctx, "data-reader.query-single-attribute")
4✔
211
        defer span.End()
4✔
212

4✔
213
        slog.DebugContext(ctx, "querying single attribute for tenant_id", slog.String("tenant_id", tenantID))
4✔
214

4✔
215
        // Decode the snapshot value.
4✔
216
        var st token.SnapToken
4✔
217
        st, err = snapshot.EncodedToken{Value: snap}.Decode()
4✔
218
        if err != nil {
4✔
219
                return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
×
220
        }
×
221

222
        // Build the relationships query based on the provided filter and snapshot value.
223
        var args []interface{}
4✔
224
        builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
4✔
225
        builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
4✔
226
        builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
4✔
227

4✔
228
        // Generate the SQL query and arguments.
4✔
229
        var query string
4✔
230
        query, args, err = builder.ToSql()
4✔
231
        if err != nil {
4✔
232
                return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
×
233
        }
×
234

235
        slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
4✔
236

4✔
237
        row := r.database.ReadPool.QueryRow(ctx, query, args...)
4✔
238

4✔
239
        rt := storage.Attribute{}
4✔
240

4✔
241
        // Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
4✔
242
        var valueStr string
4✔
243

4✔
244
        // Scan the row from the database into the fields of `rt` and `valueStr`.
4✔
245
        err = row.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
4✔
246
        if err != nil {
5✔
247
                if errors.Is(err, pgx.ErrNoRows) {
2✔
248
                        return nil, nil
1✔
249
                } else {
1✔
250
                        return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
×
251
                }
×
252
        }
253

254
        // Unmarshal the JSON data from `valueStr` into `rt.Value`.
255
        rt.Value = &anypb.Any{}
3✔
256
        unmarshaler := &jsonpb.Unmarshaler{}
3✔
257
        err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
3✔
258
        if err != nil {
3✔
259
                return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
×
260
        }
×
261

262
        slog.DebugContext(ctx, "successfully retrieved Single attribute from the database")
3✔
263

3✔
264
        return rt.ToAttribute(), nil
3✔
265
}
266

267
// QueryAttributes reads multiple attributes from the storage based on the given filter.
268
func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string, pagination database.CursorPagination) (it *database.AttributeIterator, err error) {
2✔
269
        // Start a new trace span and end it when the function exits.
2✔
270
        ctx, span := internal.Tracer.Start(ctx, "data-reader.query-attributes")
2✔
271
        defer span.End()
2✔
272

2✔
273
        slog.DebugContext(ctx, "querying Attributes for tenant_id", slog.String("tenant_id", tenantID))
2✔
274

2✔
275
        // Decode the snapshot value.
2✔
276
        var st token.SnapToken
2✔
277
        st, err = snapshot.EncodedToken{Value: snap}.Decode()
2✔
278
        if err != nil {
2✔
279
                return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
×
280
        }
×
281

282
        // Build the attributes query based on the provided filter and snapshot value.
283
        var args []interface{}
2✔
284
        builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
2✔
285
        builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
2✔
286
        builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
2✔
287

2✔
288
        if pagination.Cursor() != "" {
2✔
289
                var t database.ContinuousToken
×
290
                t, err = utils.EncodedContinuousToken{Value: pagination.Cursor()}.Decode()
×
291
                if err != nil {
×
292
                        return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
×
293
                }
×
294
                builder = builder.Where(squirrel.GtOrEq{pagination.Sort(): t.(utils.ContinuousToken).Value})
×
295
        }
296

297
        if pagination.Sort() != "" {
2✔
298
                builder = builder.OrderBy(pagination.Sort())
×
299
        }
×
300

301
        // Apply limit if specified in pagination
302
        limit := pagination.Limit()
2✔
303
        if limit > 0 {
2✔
NEW
304
                builder = builder.Limit(uint64(limit))
×
NEW
305
        }
×
306

307
        // Generate the SQL query and arguments.
308
        var query string
2✔
309
        query, args, err = builder.ToSql()
2✔
310
        if err != nil {
2✔
311
                return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
×
312
        }
×
313

314
        slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
2✔
315

2✔
316
        // Execute the SQL query and retrieve the result rows.
2✔
317
        var rows pgx.Rows
2✔
318
        rows, err = r.database.ReadPool.Query(ctx, query, args...)
2✔
319
        if err != nil {
2✔
320
                return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
×
321
        }
×
322
        defer rows.Close()
2✔
323

2✔
324
        // Process the result rows and store the attributes in an AttributeCollection.
2✔
325
        collection := database.NewAttributeCollection()
2✔
326
        for rows.Next() {
5✔
327
                rt := storage.Attribute{}
3✔
328

3✔
329
                // Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
3✔
330
                var valueStr string
3✔
331

3✔
332
                // Scan the row from the database into the fields of `rt` and `valueStr`.
3✔
333
                err := rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
3✔
334
                if err != nil {
3✔
335
                        return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
×
336
                }
×
337

338
                // Unmarshal the JSON data from `valueStr` into `rt.Value`.
339
                rt.Value = &anypb.Any{}
3✔
340
                unmarshaler := &jsonpb.Unmarshaler{}
3✔
341
                err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
3✔
342
                if err != nil {
3✔
343
                        return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
×
344
                }
×
345

346
                collection.Add(rt.ToAttribute())
3✔
347
        }
348
        if err = rows.Err(); err != nil {
2✔
349
                return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
×
350
        }
×
351

352
        slog.DebugContext(ctx, "successfully retrieved attributes tuples from the database")
2✔
353

2✔
354
        // Return an AttributeIterator created from the AttributeCollection.
2✔
355
        return collection.CreateAttributeIterator(), nil
2✔
356
}
357

358
// ReadAttributes reads multiple attributes from the storage based on the given filter and pagination.
359
func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string, pagination database.Pagination) (collection *database.AttributeCollection, ct database.EncodedContinuousToken, err error) {
10✔
360
        // Start a new trace span and end it when the function exits.
10✔
361
        ctx, span := internal.Tracer.Start(ctx, "data-reader.read-attributes")
10✔
362
        defer span.End()
10✔
363

10✔
364
        slog.DebugContext(ctx, "reading attributes for tenant_id", slog.String("tenant_id", tenantID))
10✔
365

10✔
366
        // Decode the snapshot value.
10✔
367
        var st token.SnapToken
10✔
368
        st, err = snapshot.EncodedToken{Value: snap}.Decode()
10✔
369
        if err != nil {
10✔
370
                return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
×
371
        }
×
372

373
        // Build the relationships query based on the provided filter, snapshot value, and pagination settings.
374
        builder := r.database.Builder.Select("id, entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
10✔
375
        builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
10✔
376
        builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
10✔
377

10✔
378
        // Apply the pagination token and limit to the query.
10✔
379
        if pagination.Token() != "" {
11✔
380
                var t database.ContinuousToken
1✔
381
                t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
1✔
382
                if err != nil {
1✔
383
                        return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
×
384
                }
×
385
                var v uint64
1✔
386
                v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
1✔
387
                if err != nil {
1✔
388
                        return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
×
389
                }
×
390
                builder = builder.Where(squirrel.GtOrEq{"id": v})
1✔
391
        }
392

393
        builder = builder.OrderBy("id")
10✔
394

10✔
395
        if pagination.PageSize() != 0 {
20✔
396
                builder = builder.Limit(uint64(pagination.PageSize() + 1))
10✔
397
        }
10✔
398

399
        // Generate the SQL query and arguments.
400
        var query string
10✔
401
        var args []interface{}
10✔
402
        query, args, err = builder.ToSql()
10✔
403
        if err != nil {
10✔
404
                return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
×
405
        }
×
406

407
        slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
10✔
408

10✔
409
        // Execute the query and retrieve the rows.
10✔
410
        var rows pgx.Rows
10✔
411
        rows, err = r.database.ReadPool.Query(ctx, query, args...)
10✔
412
        if err != nil {
10✔
413
                return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
×
414
        }
×
415
        defer rows.Close()
10✔
416

10✔
417
        var lastID uint64
10✔
418

10✔
419
        // Iterate through the rows and scan the result into a RelationTuple struct.
10✔
420
        attributes := make([]*base.Attribute, 0, pagination.PageSize()+1)
10✔
421
        for rows.Next() {
31✔
422
                rt := storage.Attribute{}
21✔
423

21✔
424
                // Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
21✔
425
                var valueStr string
21✔
426

21✔
427
                // Scan the row from the database into the fields of `rt` and `valueStr`.
21✔
428
                err := rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
21✔
429
                if err != nil {
21✔
430
                        return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
×
431
                }
×
432
                lastID = rt.ID
21✔
433

21✔
434
                // Unmarshal the JSON data from `valueStr` into `rt.Value`.
21✔
435
                rt.Value = &anypb.Any{}
21✔
436
                unmarshaler := &jsonpb.Unmarshaler{}
21✔
437
                err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
21✔
438
                if err != nil {
21✔
439
                        return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
×
440
                }
×
441

442
                attributes = append(attributes, rt.ToAttribute())
21✔
443
        }
444
        // Check for any errors during iteration.
445
        if err = rows.Err(); err != nil {
10✔
446
                return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
×
447
        }
×
448

449
        slog.DebugContext(ctx, "successfully read attributes from the database")
10✔
450

10✔
451
        // Return the results and encoded continuous token for pagination.
10✔
452
        if len(attributes) > int(pagination.PageSize()) {
11✔
453
                return database.NewAttributeCollection(attributes[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
1✔
454
        }
1✔
455

456
        return database.NewAttributeCollection(attributes...), database.NewNoopContinuousToken().Encode(), nil
9✔
457
}
458

459
// QueryUniqueSubjectReferences reads unique subject references from the storage based on the given filter and pagination.
460
func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID string, subjectReference *base.RelationReference, excluded []string, snap string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
4✔
461
        // Start a new trace span and end it when the function exits.
4✔
462
        ctx, span := internal.Tracer.Start(ctx, "data-reader.query-unique-subject-reference")
4✔
463
        defer span.End()
4✔
464

4✔
465
        slog.DebugContext(ctx, "querying unique subject references for tenant_id", slog.String("tenant_id", tenantID))
4✔
466

4✔
467
        // Decode the snapshot value.
4✔
468
        var st token.SnapToken
4✔
469
        st, err = snapshot.EncodedToken{Value: snap}.Decode()
4✔
470
        if err != nil {
4✔
471
                return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
×
472
        }
×
473

474
        // Build the relationships query based on the provided filter, snapshot value, and pagination settings.
475
        builder := r.database.Builder.
4✔
476
                Select("subject_id"). // This will pick the smallest `id` for each unique `subject_id`.
4✔
477
                From(RelationTuplesTable).
4✔
478
                Where(squirrel.Eq{"tenant_id": tenantID}).
4✔
479
                GroupBy("subject_id")
4✔
480

4✔
481
        // Apply subject filter
4✔
482
        builder = utils.TuplesFilterQueryForSelectBuilder(builder, &base.TupleFilter{
4✔
483
                Subject: &base.SubjectFilter{
4✔
484
                        Type:     subjectReference.GetType(),
4✔
485
                        Relation: subjectReference.GetRelation(),
4✔
486
                },
4✔
487
        })
4✔
488

4✔
489
        // Apply snapshot filter
4✔
490
        builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
4✔
491

4✔
492
        // Apply exclusion if the list is not empty
4✔
493
        if len(excluded) > 0 {
4✔
494
                builder = builder.Where(squirrel.NotEq{"subject_id": excluded})
×
495
        }
×
496

497
        // Apply the pagination token and limit to the query.
498
        if pagination.Token() != "" {
5✔
499
                var t database.ContinuousToken
1✔
500
                t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
1✔
501
                if err != nil {
1✔
502
                        return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
×
503
                }
×
504
                builder = builder.Where(squirrel.GtOrEq{"subject_id": t.(utils.ContinuousToken).Value})
1✔
505
        }
506

507
        builder = builder.OrderBy("subject_id")
4✔
508

4✔
509
        if pagination.PageSize() != 0 {
8✔
510
                builder = builder.Limit(uint64(pagination.PageSize() + 1))
4✔
511
        }
4✔
512

513
        // Generate the SQL query and arguments.
514
        var query string
4✔
515
        var args []interface{}
4✔
516
        query, args, err = builder.ToSql()
4✔
517
        if err != nil {
4✔
518
                return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
×
519
        }
×
520

521
        slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
4✔
522

4✔
523
        // Execute the query and retrieve the rows.
4✔
524
        var rows pgx.Rows
4✔
525
        rows, err = r.database.ReadPool.Query(ctx, query, args...)
4✔
526
        if err != nil {
4✔
527
                return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
×
528
        }
×
529
        defer rows.Close()
4✔
530

4✔
531
        var lastID string
4✔
532

4✔
533
        // Iterate through the rows and scan the result into a RelationTuple struct.
4✔
534
        subjectIDs := make([]string, 0, pagination.PageSize()+1)
4✔
535
        for rows.Next() {
14✔
536
                var subjectID string
10✔
537
                err = rows.Scan(&subjectID)
10✔
538
                if err != nil {
10✔
539
                        return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
×
540
                }
×
541

542
                subjectIDs = append(subjectIDs, subjectID)
10✔
543
                lastID = subjectID
10✔
544
        }
545
        // Check for any errors during iteration.
546
        if err = rows.Err(); err != nil {
4✔
547
                return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
×
548
        }
×
549

550
        slog.DebugContext(ctx, "successfully retrieved unique subject references from the database")
4✔
551

4✔
552
        // Return the results and encoded continuous token for pagination.
4✔
553
        if pagination.PageSize() != 0 && len(subjectIDs) > int(pagination.PageSize()) {
5✔
554
                return subjectIDs[:pagination.PageSize()], utils.NewContinuousToken(lastID).Encode(), nil
1✔
555
        }
1✔
556

557
        return subjectIDs, database.NewNoopContinuousToken().Encode(), nil
3✔
558
}
559

560
// HeadSnapshot retrieves the latest snapshot token associated with the tenant.
561
func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.SnapToken, error) {
1✔
562
        // Start a new trace span and end it when the function exits.
1✔
563
        ctx, span := internal.Tracer.Start(ctx, "data-reader.head-snapshot")
1✔
564
        defer span.End()
1✔
565

1✔
566
        slog.DebugContext(ctx, "getting head snapshot for tenant_id", slog.String("tenant_id", tenantID))
1✔
567

1✔
568
        var xid types.XID8
1✔
569

1✔
570
        // Build the query to find the highest transaction ID associated with the tenant.
1✔
571
        builder := r.database.Builder.Select("id").From(TransactionsTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("id DESC").Limit(1)
1✔
572
        query, args, err := builder.ToSql()
1✔
573
        if err != nil {
1✔
574
                return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
×
575
        }
×
576

577
        // TODO: To optimize this query, create the following index concurrently to avoid table locks:
578
        // CREATE INDEX CONCURRENTLY idx_transactions_tenant_id_id ON transactions(tenant_id, id DESC);
579

580
        // Execute the query and retrieve the highest transaction ID.
581
        err = r.database.ReadPool.QueryRow(ctx, query, args...).Scan(&xid)
1✔
582
        if err != nil {
1✔
583
                // If no rows are found, return a snapshot token with a value of 0.
×
584
                if errors.Is(err, pgx.ErrNoRows) {
×
585
                        return snapshot.Token{Value: types.XID8{Uint: 0}}, nil
×
586
                }
×
587
                return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
×
588
        }
589

590
        slog.DebugContext(ctx, "successfully retrieved latest snapshot token")
1✔
591

1✔
592
        // Return the latest snapshot token associated with the tenant.
1✔
593
        return snapshot.Token{Value: xid}, nil
1✔
594
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc