• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Permify / permify / 6945401364

21 Nov 2023 02:42PM UTC coverage: 70.338%. Remained the same
6945401364

push

github

web-flow
Merge pull request #856 from Permify/next

Next

6047 of 8597 relevant lines covered (70.34%)

52.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.4
/internal/storage/postgres/dataReader.go
1
package postgres
2

3
import (
4
        "context"
5
        "database/sql"
6
        "errors"
7
        "fmt"
8
        "log/slog"
9
        "strconv"
10
        "strings"
11

12
        "github.com/Masterminds/squirrel"
13
        "github.com/golang/protobuf/jsonpb"
14
        "google.golang.org/protobuf/types/known/anypb"
15

16
        "go.opentelemetry.io/otel/codes"
17

18
        "github.com/Permify/permify/internal/storage"
19
        "github.com/Permify/permify/internal/storage/postgres/snapshot"
20
        "github.com/Permify/permify/internal/storage/postgres/types"
21
        "github.com/Permify/permify/internal/storage/postgres/utils"
22
        "github.com/Permify/permify/pkg/database"
23
        db "github.com/Permify/permify/pkg/database/postgres"
24
        base "github.com/Permify/permify/pkg/pb/base/v1"
25
        "github.com/Permify/permify/pkg/token"
26
)
27

28
// DataReader is a struct which holds a reference to the database, transaction options and a logger.
29
// It is responsible for reading data from the database.
30
type DataReader struct {
31
        database  *db.Postgres  // database is an instance of the PostgreSQL database
32
        txOptions sql.TxOptions // txOptions specifies the isolation level for database transaction and sets it as read only
33
}
34

35
// NewDataReader is a constructor function for DataReader.
36
// It initializes a new DataReader with a given database, a logger, and sets transaction options to be read-only with Repeatable Read isolation level.
37
func NewDataReader(database *db.Postgres) *DataReader {
13✔
38
        return &DataReader{
13✔
39
                database:  database,                                                          // Set the database to the passed in PostgreSQL instance
13✔
40
                txOptions: sql.TxOptions{Isolation: sql.LevelRepeatableRead, ReadOnly: true}, // Set the transaction options
13✔
41
        }
13✔
42
}
13✔
43

44
// QueryRelationships reads relation tuples from the storage based on the given filter.
45
func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string) (it *database.TupleIterator, err error) {
2✔
46
        // Start a new trace span and end it when the function exits.
2✔
47
        ctx, span := tracer.Start(ctx, "data-reader.query-relationships")
2✔
48
        defer span.End()
2✔
49

2✔
50
        slog.Info("Querying relationships for tenantID: ", slog.String("tenant_id", tenantID))
2✔
51

2✔
52
        // Decode the snapshot value.
2✔
53
        var st token.SnapToken
2✔
54
        st, err = snapshot.EncodedToken{Value: snap}.Decode()
2✔
55
        if err != nil {
2✔
56
                span.RecordError(err)
×
57
                span.SetStatus(codes.Error, err.Error())
×
58

×
59
                slog.Error("Failed to decode snapshot value: ", slog.Any("error", err))
×
60

×
61
                return nil, err
×
62
        }
×
63

64
        // Begin a new read-only transaction with the specified isolation level.
65
        var tx *sql.Tx
2✔
66
        tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
2✔
67
        if err != nil {
2✔
68
                span.RecordError(err)
×
69
                span.SetStatus(codes.Error, err.Error())
×
70

×
71
                slog.Error("Failed to begin transaction: ", slog.Any("error", err))
×
72

×
73
                return nil, err
×
74
        }
×
75

76
        // Rollback the transaction in case of any error.
77
        defer utils.Rollback(tx)
2✔
78

2✔
79
        // Build the relationships query based on the provided filter and snapshot value.
2✔
80
        var args []interface{}
2✔
81
        builder := r.database.Builder.Select("entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
2✔
82
        builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
2✔
83
        builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
2✔
84

2✔
85
        // Generate the SQL query and arguments.
2✔
86
        var query string
2✔
87
        query, args, err = builder.ToSql()
2✔
88

2✔
89
        if err != nil {
2✔
90
                span.RecordError(err)
×
91
                span.SetStatus(codes.Error, err.Error())
×
92

×
93
                slog.Error("Failed to generate SQL query: ", slog.Any("error", err))
×
94

×
95
                return nil, errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String())
×
96
        }
×
97

98
        slog.Debug("Generated SQL query: ", slog.String("query", query), "with args", slog.Any("arguments", args))
2✔
99

2✔
100
        // Execute the SQL query and retrieve the result rows.
2✔
101
        var rows *sql.Rows
2✔
102
        rows, err = tx.QueryContext(ctx, query, args...)
2✔
103
        if err != nil {
2✔
104
                span.RecordError(err)
×
105
                span.SetStatus(codes.Error, err.Error())
×
106

×
107
                slog.Error("Failed to execute SQL query: ", slog.Any("error", err))
×
108

×
109
                return nil, errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
×
110
        }
×
111
        defer rows.Close()
2✔
112

2✔
113
        // Process the result rows and store the relationships in a TupleCollection.
2✔
114
        collection := database.NewTupleCollection()
2✔
115
        for rows.Next() {
5✔
116
                rt := storage.RelationTuple{}
3✔
117
                err = rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
3✔
118
                if err != nil {
3✔
119
                        span.RecordError(err)
×
120
                        span.SetStatus(codes.Error, err.Error())
×
121

×
122
                        slog.Error("Failed to scan result rows: ", slog.Any("error", err))
×
123

×
124
                        return nil, err
×
125
                }
×
126
                collection.Add(rt.ToTuple())
3✔
127
        }
128
        if err = rows.Err(); err != nil {
2✔
129
                span.RecordError(err)
×
130
                span.SetStatus(codes.Error, err.Error())
×
131

×
132
                slog.Error("Failed to process result rows: ", slog.Any("error", err))
×
133

×
134
                return nil, err
×
135
        }
×
136

137
        // Commit the transaction.
138
        err = tx.Commit()
2✔
139
        if err != nil {
2✔
140
                span.RecordError(err)
×
141
                span.SetStatus(codes.Error, err.Error())
×
142

×
143
                slog.Error("Failed to commit transaction: ", slog.Any("error", err))
×
144

×
145
                return nil, err
×
146
        }
×
147

148
        slog.Info("Successfully retrieved relationship tuples from the database.")
2✔
149

2✔
150
        // Return a TupleIterator created from the TupleCollection.
2✔
151
        return collection.CreateTupleIterator(), nil
2✔
152
}
153

154
// ReadRelationships reads relation tuples from the storage based on the given filter and pagination.
155
func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.Pagination) (collection *database.TupleCollection, ct database.EncodedContinuousToken, err error) {
9✔
156
        // Start a new trace span and end it when the function exits.
9✔
157
        ctx, span := tracer.Start(ctx, "data-reader.read-relationships")
9✔
158
        defer span.End()
9✔
159

9✔
160
        slog.Info("Reading relationships for tenantID: ", slog.String("tenant_id", tenantID))
9✔
161

9✔
162
        // Decode the snapshot value.
9✔
163
        var st token.SnapToken
9✔
164
        st, err = snapshot.EncodedToken{Value: snap}.Decode()
9✔
165
        if err != nil {
9✔
166
                span.RecordError(err)
×
167
                span.SetStatus(codes.Error, err.Error())
×
168

×
169
                slog.Error("Failed to decode snapshot value: ", slog.Any("error", err))
×
170

×
171
                return nil, nil, err
×
172
        }
×
173

174
        // Begin a new read-only transaction with the specified isolation level.
175
        var tx *sql.Tx
9✔
176
        tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
9✔
177
        if err != nil {
9✔
178
                span.RecordError(err)
×
179
                span.SetStatus(codes.Error, err.Error())
×
180

×
181
                slog.Error("Failed to begin transaction: ", slog.Any("error", err))
×
182

×
183
                return nil, nil, err
×
184
        }
×
185

186
        // Rollback the transaction in case of any error.
187
        defer utils.Rollback(tx)
9✔
188

9✔
189
        // Build the relationships query based on the provided filter, snapshot value, and pagination settings.
9✔
190
        builder := r.database.Builder.Select("id, entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
9✔
191
        builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
9✔
192
        builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
9✔
193

9✔
194
        // Apply the pagination token and limit to the query.
9✔
195
        if pagination.Token() != "" {
10✔
196
                var t database.ContinuousToken
1✔
197
                t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
1✔
198
                if err != nil {
1✔
199
                        span.RecordError(err)
×
200
                        span.SetStatus(codes.Error, err.Error())
×
201

×
202
                        slog.Error("Failed to apply pagination token: ", slog.Any("error", err))
×
203

×
204
                        return nil, nil, err
×
205
                }
×
206
                var v uint64
1✔
207
                v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
1✔
208
                if err != nil {
1✔
209
                        span.RecordError(err)
×
210
                        span.SetStatus(codes.Error, err.Error())
×
211

×
212
                        slog.Error("Failed to apply limit to the query: ", slog.Any("error", err))
×
213

×
214
                        return nil, nil, errors.New(base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN.String())
×
215
                }
×
216
                builder = builder.Where(squirrel.GtOrEq{"id": v})
1✔
217
        }
218

219
        builder = builder.OrderBy("id").Limit(uint64(pagination.PageSize() + 1))
9✔
220

9✔
221
        // Generate the SQL query and arguments.
9✔
222
        var query string
9✔
223
        var args []interface{}
9✔
224
        query, args, err = builder.ToSql()
9✔
225
        if err != nil {
9✔
226
                span.RecordError(err)
×
227
                span.SetStatus(codes.Error, err.Error())
×
228

×
229
                slog.Error("Failed to generate SQL query: ", slog.Any("error", err))
×
230

×
231
                return nil, database.NewNoopContinuousToken().Encode(), errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String())
×
232
        }
×
233

234
        slog.Debug("Generated SQL query: ", slog.String("query", query), "with args", slog.Any("arguments", args))
9✔
235

9✔
236
        // Execute the query and retrieve the rows.
9✔
237
        var rows *sql.Rows
9✔
238
        rows, err = tx.QueryContext(ctx, query, args...)
9✔
239
        if err != nil {
9✔
240
                span.RecordError(err)
×
241
                span.SetStatus(codes.Error, err.Error())
×
242

×
243
                slog.Error("Failed to execute SQL query and retrieve the rows: ", slog.Any("error", err))
×
244

×
245
                return nil, database.NewNoopContinuousToken().Encode(), errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
×
246
        }
×
247
        defer rows.Close()
9✔
248

9✔
249
        var lastID uint64
9✔
250

9✔
251
        // Iterate through the rows and scan the result into a RelationTuple struct.
9✔
252
        tuples := make([]*base.Tuple, 0, pagination.PageSize()+1)
9✔
253
        for rows.Next() {
35✔
254
                rt := storage.RelationTuple{}
26✔
255
                err = rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
26✔
256
                if err != nil {
26✔
257
                        span.RecordError(err)
×
258
                        span.SetStatus(codes.Error, err.Error())
×
259

×
260
                        slog.Error("Failed to scan result rows: ", slog.Any("error", err))
×
261

×
262
                        return nil, nil, err
×
263
                }
×
264
                lastID = rt.ID
26✔
265
                tuples = append(tuples, rt.ToTuple())
26✔
266
        }
267
        // Check for any errors during iteration.
268
        if err = rows.Err(); err != nil {
9✔
269
                span.RecordError(err)
×
270
                span.SetStatus(codes.Error, err.Error())
×
271

×
272
                slog.Error("Failed to process result rows: ", slog.Any("error", err))
×
273

×
274
                return nil, nil, err
×
275
        }
×
276

277
        // Commit the transaction.
278
        err = tx.Commit()
9✔
279
        if err != nil {
9✔
280
                span.RecordError(err)
×
281
                span.SetStatus(codes.Error, err.Error())
×
282

×
283
                slog.Error("Failed to commit transaction: ", slog.Any("error", err))
×
284

×
285
                return nil, nil, err
×
286
        }
×
287

288
        slog.Info("Successfully read relationships from database.")
9✔
289

9✔
290
        // Return the results and encoded continuous token for pagination.
9✔
291
        if len(tuples) > int(pagination.PageSize()) {
10✔
292
                return database.NewTupleCollection(tuples[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
1✔
293
        }
1✔
294

295
        return database.NewTupleCollection(tuples...), database.NewNoopContinuousToken().Encode(), nil
8✔
296
}
297

298
// QuerySingleAttribute retrieves a single attribute from the storage based on the given filter.
299
func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string) (attribute *base.Attribute, err error) {
4✔
300
        // Start a new trace span and end it when the function exits.
4✔
301
        ctx, span := tracer.Start(ctx, "data-reader.query-single-attribute")
4✔
302
        defer span.End()
4✔
303
        slog.Info("Querying single attribute for tenantID: ", slog.String("tenant_id", tenantID))
4✔
304

4✔
305
        // Decode the snapshot value.
4✔
306
        var st token.SnapToken
4✔
307
        st, err = snapshot.EncodedToken{Value: snap}.Decode()
4✔
308
        if err != nil {
4✔
309
                span.RecordError(err)
×
310
                span.SetStatus(codes.Error, err.Error())
×
311

×
312
                slog.Error("Failed to decode snapshot value: ", slog.Any("error", err))
×
313

×
314
                return nil, err
×
315
        }
×
316

317
        // Begin a new read-only transaction with the specified isolation level.
318
        var tx *sql.Tx
4✔
319
        tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
4✔
320
        if err != nil {
4✔
321
                span.RecordError(err)
×
322
                span.SetStatus(codes.Error, err.Error())
×
323

×
324
                slog.Error("Failed to begin transaction: ", slog.Any("error", err))
×
325

×
326
                return nil, err
×
327
        }
×
328

329
        // Rollback the transaction in case of any error.
330
        defer utils.Rollback(tx)
4✔
331

4✔
332
        // Build the relationships query based on the provided filter and snapshot value.
4✔
333
        var args []interface{}
4✔
334
        builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
4✔
335
        builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
4✔
336
        builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
4✔
337

4✔
338
        // Generate the SQL query and arguments.
4✔
339
        var query string
4✔
340
        query, args, err = builder.ToSql()
4✔
341
        if err != nil {
4✔
342
                span.RecordError(err)
×
343
                span.SetStatus(codes.Error, err.Error())
×
344

×
345
                slog.Error("Failed to generate SQL query: ", slog.Any("error", err))
×
346

×
347
                return nil, errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String())
×
348
        }
×
349

350
        slog.Debug("Generated SQL query: ", slog.String("query", query), "with args", slog.Any("arguments", args))
4✔
351

4✔
352
        row := tx.QueryRowContext(ctx, query, args...)
4✔
353

4✔
354
        rt := storage.Attribute{}
4✔
355

4✔
356
        // Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
4✔
357
        var valueStr string
4✔
358

4✔
359
        // Scan the row from the database into the fields of `rt` and `valueStr`.
4✔
360
        err = row.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
4✔
361
        if err != nil {
5✔
362
                if errors.Is(err, sql.ErrNoRows) {
2✔
363
                        return nil, nil
1✔
364
                } else {
1✔
365
                        span.RecordError(err)
×
366
                        span.SetStatus(codes.Error, err.Error())
×
367

×
368
                        slog.Error("Failed to scan result rows: ", slog.Any("error", err))
×
369

×
370
                        return nil, err
×
371
                }
×
372
        }
373

374
        // Unmarshal the JSON data from `valueStr` into `rt.Value`.
375
        rt.Value = &anypb.Any{}
3✔
376
        unmarshaler := &jsonpb.Unmarshaler{}
3✔
377
        err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
3✔
378
        if err != nil {
3✔
379

×
380
                slog.Error("Failed unmarshal the value: ", slog.Any("error", err))
×
381

×
382
                return nil, err
×
383
        }
×
384

385
        // Commit the transaction.
386
        err = tx.Commit()
3✔
387
        if err != nil {
3✔
388
                span.RecordError(err)
×
389
                span.SetStatus(codes.Error, err.Error())
×
390

×
391
                slog.Error("Failed to commit transaction: ", slog.Any("error", err))
×
392

×
393
                return nil, err
×
394
        }
×
395

396
        slog.Info("Successfully retrieved Single attribute from the database.")
3✔
397

3✔
398
        return rt.ToAttribute(), nil
3✔
399
}
400

401
// QueryAttributes reads multiple attributes from the storage based on the given filter.
402
func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string) (it *database.AttributeIterator, err error) {
2✔
403
        // Start a new trace span and end it when the function exits.
2✔
404
        ctx, span := tracer.Start(ctx, "data-reader.query-attributes")
2✔
405
        defer span.End()
2✔
406

2✔
407
        slog.Info("Querying Attributes for tenantID: ", slog.String("tenant_id", tenantID))
2✔
408

2✔
409
        // Decode the snapshot value.
2✔
410
        var st token.SnapToken
2✔
411
        st, err = snapshot.EncodedToken{Value: snap}.Decode()
2✔
412
        if err != nil {
2✔
413
                span.RecordError(err)
×
414
                span.SetStatus(codes.Error, err.Error())
×
415

×
416
                slog.Error("Failed to decode snapshot value: ", slog.Any("error", err))
×
417

×
418
                return nil, err
×
419
        }
×
420

421
        // Begin a new read-only transaction with the specified isolation level.
422
        var tx *sql.Tx
2✔
423
        tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
2✔
424
        if err != nil {
2✔
425
                span.RecordError(err)
×
426
                span.SetStatus(codes.Error, err.Error())
×
427

×
428
                slog.Error("Failed to begin transaction: ", slog.Any("error", err))
×
429

×
430
                return nil, err
×
431
        }
×
432

433
        // Rollback the transaction in case of any error.
434
        defer utils.Rollback(tx)
2✔
435

2✔
436
        // Build the relationships query based on the provided filter and snapshot value.
2✔
437
        var args []interface{}
2✔
438
        builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
2✔
439
        builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
2✔
440
        builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
2✔
441

2✔
442
        // Generate the SQL query and arguments.
2✔
443
        var query string
2✔
444
        query, args, err = builder.ToSql()
2✔
445

2✔
446
        if err != nil {
2✔
447
                span.RecordError(err)
×
448
                span.SetStatus(codes.Error, err.Error())
×
449

×
450
                slog.Error("Failed to generate SQL query: ", slog.Any("error", err))
×
451

×
452
                return nil, errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String())
×
453
        }
×
454

455
        slog.Debug("Generated SQL query: ", slog.String("query", query), "with args", slog.Any("arguments", args))
2✔
456

2✔
457
        // Execute the SQL query and retrieve the result rows.
2✔
458
        var rows *sql.Rows
2✔
459
        rows, err = tx.QueryContext(ctx, query, args...)
2✔
460
        if err != nil {
2✔
461
                span.RecordError(err)
×
462
                span.SetStatus(codes.Error, err.Error())
×
463

×
464
                slog.Error("Failed to execute SQL query: ", slog.Any("error", err))
×
465

×
466
                return nil, errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
×
467
        }
×
468
        defer rows.Close()
2✔
469

2✔
470
        // Process the result rows and store the relationships in a TupleCollection.
2✔
471
        collection := database.NewAttributeCollection()
2✔
472
        for rows.Next() {
5✔
473
                rt := storage.Attribute{}
3✔
474

3✔
475
                // Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
3✔
476
                var valueStr string
3✔
477

3✔
478
                // Scan the row from the database into the fields of `rt` and `valueStr`.
3✔
479
                err := rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
3✔
480
                if err != nil {
3✔
481
                        span.RecordError(err)
×
482
                        span.SetStatus(codes.Error, err.Error())
×
483

×
484
                        slog.Error("Failed to scan result rows: ", slog.Any("error", err))
×
485

×
486
                        return nil, err
×
487
                }
×
488

489
                // Unmarshal the JSON data from `valueStr` into `rt.Value`.
490
                rt.Value = &anypb.Any{}
3✔
491
                unmarshaler := &jsonpb.Unmarshaler{}
3✔
492
                err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
3✔
493
                if err != nil {
3✔
494

×
495
                        slog.Error("Failed unmarshal the value: ", slog.Any("error", err))
×
496

×
497
                        return nil, err
×
498
                }
×
499

500
                collection.Add(rt.ToAttribute())
3✔
501
        }
502
        if err = rows.Err(); err != nil {
2✔
503
                span.RecordError(err)
×
504
                span.SetStatus(codes.Error, err.Error())
×
505

×
506
                slog.Error("Failed to process result rows: ", slog.Any("error", err))
×
507

×
508
                return nil, err
×
509
        }
×
510

511
        // Commit the transaction.
512
        err = tx.Commit()
2✔
513
        if err != nil {
2✔
514
                span.RecordError(err)
×
515
                span.SetStatus(codes.Error, err.Error())
×
516

×
517
                slog.Error("Failed to commit transaction: ", slog.Any("error", err))
×
518

×
519
                return nil, err
×
520
        }
×
521

522
        slog.Info("Successfully retrieved attributes tuples from the database.")
2✔
523

2✔
524
        // Return a TupleIterator created from the TupleCollection.
2✔
525
        return collection.CreateAttributeIterator(), nil
2✔
526
}
527

528
// ReadAttributes reads multiple attributes from the storage based on the given filter and pagination.
529
func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string, pagination database.Pagination) (collection *database.AttributeCollection, ct database.EncodedContinuousToken, err error) {
9✔
530
        // Start a new trace span and end it when the function exits.
9✔
531
        ctx, span := tracer.Start(ctx, "data-reader.read-attributes")
9✔
532
        defer span.End()
9✔
533

9✔
534
        slog.Info("Reading attributes for tenantID: ", slog.String("tenant_id", tenantID))
9✔
535

9✔
536
        // Decode the snapshot value.
9✔
537
        var st token.SnapToken
9✔
538
        st, err = snapshot.EncodedToken{Value: snap}.Decode()
9✔
539
        if err != nil {
9✔
540
                span.RecordError(err)
×
541
                span.SetStatus(codes.Error, err.Error())
×
542

×
543
                slog.Error("Failed to decode snapshot value: ", slog.Any("error", err))
×
544

×
545
                return nil, nil, err
×
546
        }
×
547

548
        // Begin a new read-only transaction with the specified isolation level.
549
        var tx *sql.Tx
9✔
550
        tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
9✔
551
        if err != nil {
9✔
552
                span.RecordError(err)
×
553
                span.SetStatus(codes.Error, err.Error())
×
554

×
555
                slog.Error("Failed to begin transaction: ", slog.Any("error", err))
×
556

×
557
                return nil, nil, err
×
558
        }
×
559

560
        // Rollback the transaction in case of any error.
561
        defer utils.Rollback(tx)
9✔
562

9✔
563
        // Build the relationships query based on the provided filter, snapshot value, and pagination settings.
9✔
564
        builder := r.database.Builder.Select("id, entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
9✔
565
        builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
9✔
566
        builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
9✔
567

9✔
568
        // Apply the pagination token and limit to the query.
9✔
569
        if pagination.Token() != "" {
10✔
570
                var t database.ContinuousToken
1✔
571
                t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
1✔
572
                if err != nil {
1✔
573
                        span.RecordError(err)
×
574
                        span.SetStatus(codes.Error, err.Error())
×
575

×
576
                        slog.Error("Failed to apply pagination token: ", slog.Any("error", err))
×
577

×
578
                        return nil, nil, err
×
579
                }
×
580
                var v uint64
1✔
581
                v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
1✔
582
                if err != nil {
1✔
583
                        span.RecordError(err)
×
584
                        span.SetStatus(codes.Error, err.Error())
×
585

×
586
                        slog.Error("Failed to apply limit on query: ", slog.Any("error", err))
×
587

×
588
                        return nil, nil, errors.New(base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN.String())
×
589
                }
×
590
                builder = builder.Where(squirrel.GtOrEq{"id": v})
1✔
591
        }
592

593
        builder = builder.OrderBy("id").Limit(uint64(pagination.PageSize() + 1))
9✔
594

9✔
595
        // Generate the SQL query and arguments.
9✔
596
        var query string
9✔
597
        var args []interface{}
9✔
598
        query, args, err = builder.ToSql()
9✔
599
        if err != nil {
9✔
600
                span.RecordError(err)
×
601
                span.SetStatus(codes.Error, err.Error())
×
602

×
603
                slog.Error("Failed to generate SQL query: ", slog.Any("error", err))
×
604

×
605
                return nil, database.NewNoopContinuousToken().Encode(), errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String())
×
606
        }
×
607

608
        slog.Debug("Generated SQL query: ", slog.String("query", query), "with args", slog.Any("arguments", args))
9✔
609

9✔
610
        // Execute the query and retrieve the rows.
9✔
611
        var rows *sql.Rows
9✔
612
        rows, err = tx.QueryContext(ctx, query, args...)
9✔
613
        if err != nil {
9✔
614
                span.RecordError(err)
×
615
                span.SetStatus(codes.Error, err.Error())
×
616

×
617
                slog.Error("Failed to execute SQL query: ", slog.Any("error", err))
×
618

×
619
                return nil, database.NewNoopContinuousToken().Encode(), errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
×
620
        }
×
621
        defer rows.Close()
9✔
622

9✔
623
        var lastID uint64
9✔
624

9✔
625
        // Iterate through the rows and scan the result into a RelationTuple struct.
9✔
626
        attributes := make([]*base.Attribute, 0, pagination.PageSize()+1)
9✔
627
        for rows.Next() {
29✔
628
                rt := storage.Attribute{}
20✔
629

20✔
630
                // Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
20✔
631
                var valueStr string
20✔
632

20✔
633
                // Scan the row from the database into the fields of `rt` and `valueStr`.
20✔
634
                err := rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
20✔
635
                if err != nil {
20✔
636
                        span.RecordError(err)
×
637
                        span.SetStatus(codes.Error, err.Error())
×
638

×
639
                        slog.Error("Failed to scan result rows: ", slog.Any("error", err))
×
640

×
641
                        return nil, nil, err
×
642
                }
×
643
                lastID = rt.ID
20✔
644

20✔
645
                // Unmarshal the JSON data from `valueStr` into `rt.Value`.
20✔
646
                rt.Value = &anypb.Any{}
20✔
647
                unmarshaler := &jsonpb.Unmarshaler{}
20✔
648
                err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
20✔
649
                if err != nil {
20✔
650
                        slog.Error("Failed to unmarshall the value: ", slog.Any("error", err))
×
651

×
652
                        return nil, nil, err
×
653
                }
×
654

655
                attributes = append(attributes, rt.ToAttribute())
20✔
656
        }
657
        // Check for any errors during iteration.
658
        if err = rows.Err(); err != nil {
9✔
659
                span.RecordError(err)
×
660
                span.SetStatus(codes.Error, err.Error())
×
661

×
662
                slog.Error("Failed to process result rows: ", slog.Any("error", err))
×
663

×
664
                return nil, nil, err
×
665
        }
×
666

667
        // Commit the transaction.
668
        err = tx.Commit()
9✔
669
        if err != nil {
9✔
670
                span.RecordError(err)
×
671
                span.SetStatus(codes.Error, err.Error())
×
672

×
673
                slog.Error("Failed to commit transaction: ", slog.Any("error", err))
×
674

×
675
                return nil, nil, err
×
676
        }
×
677

678
        slog.Info("Successfully read attributes from the database.")
9✔
679

9✔
680
        // Return the results and encoded continuous token for pagination.
9✔
681
        if len(attributes) > int(pagination.PageSize()) {
10✔
682
                return database.NewAttributeCollection(attributes[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
1✔
683
        }
1✔
684

685
        return database.NewAttributeCollection(attributes...), database.NewNoopContinuousToken().Encode(), nil
8✔
686
}
687

688
// QueryUniqueEntities reads unique entities from the storage based on the given filter and pagination.
689
func (r *DataReader) QueryUniqueEntities(ctx context.Context, tenantID, name, snap string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
4✔
690
        // Start a new trace span and end it when the function exits.
4✔
691
        ctx, span := tracer.Start(ctx, "data-reader.query-unique-entities")
4✔
692
        defer span.End()
4✔
693

4✔
694
        // Decode the snapshot value.
4✔
695
        var st token.SnapToken
4✔
696
        st, err = snapshot.EncodedToken{Value: snap}.Decode()
4✔
697
        if err != nil {
4✔
698
                span.RecordError(err)
×
699
                span.SetStatus(codes.Error, err.Error())
×
700
                return nil, nil, err
×
701
        }
×
702

703
        // Begin a new read-only transaction with the specified isolation level.
704
        var tx *sql.Tx
4✔
705
        tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
4✔
706
        if err != nil {
4✔
707
                span.RecordError(err)
×
708
                span.SetStatus(codes.Error, err.Error())
×
709
                return nil, nil, err
×
710
        }
×
711

712
        // Rollback the transaction in case of any error.
713
        defer utils.Rollback(tx)
4✔
714

4✔
715
        query := utils.BulkEntityFilterQuery(tenantID, name, st.(snapshot.Token).Value.Uint)
4✔
716

4✔
717
        // Apply the pagination token and limit to the subQuery.
4✔
718
        if pagination.Token() != "" {
5✔
719
                var t database.ContinuousToken
1✔
720
                t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
1✔
721
                if err != nil {
1✔
722
                        span.RecordError(err)
×
723
                        span.SetStatus(codes.Error, err.Error())
×
724
                        return nil, nil, err
×
725
                }
×
726
                var v uint64
1✔
727
                v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
1✔
728
                if err != nil {
1✔
729
                        span.RecordError(err)
×
730
                        span.SetStatus(codes.Error, err.Error())
×
731
                        return nil, nil, errors.New(base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN.String())
×
732
                }
×
733

734
                query = fmt.Sprintf("%s WHERE id >= %s", query, strconv.FormatUint(v, 10))
1✔
735
        }
736

737
        // Append ORDER BY and LIMIT clauses.
738
        query = fmt.Sprintf("%s ORDER BY id LIMIT %d", query, pagination.PageSize()+1)
4✔
739

4✔
740
        // Execute the query and retrieve the rows.
4✔
741
        var rows *sql.Rows
4✔
742
        rows, err = tx.QueryContext(ctx, query)
4✔
743
        if err != nil {
4✔
744
                span.RecordError(err)
×
745
                span.SetStatus(codes.Error, err.Error())
×
746
                return nil, database.NewNoopContinuousToken().Encode(), errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
×
747
        }
×
748
        defer rows.Close()
4✔
749

4✔
750
        var lastID uint64
4✔
751

4✔
752
        // Iterate through the rows and scan the result into a RelationTuple struct.
4✔
753
        entityIDs := make([]string, 0, pagination.PageSize()+1)
4✔
754
        for rows.Next() {
31✔
755
                var entityId string
27✔
756
                err = rows.Scan(&lastID, &entityId)
27✔
757
                if err != nil {
27✔
758
                        span.RecordError(err)
×
759
                        span.SetStatus(codes.Error, err.Error())
×
760
                        return nil, nil, err
×
761
                }
×
762

763
                entityIDs = append(entityIDs, entityId)
27✔
764
        }
765

766
        // Check for any errors during iteration.
767
        if err = rows.Err(); err != nil {
4✔
768
                span.RecordError(err)
×
769
                span.SetStatus(codes.Error, err.Error())
×
770
                return nil, nil, err
×
771
        }
×
772

773
        // Commit the transaction.
774
        err = tx.Commit()
4✔
775
        if err != nil {
4✔
776
                span.RecordError(err)
×
777
                span.SetStatus(codes.Error, err.Error())
×
778
                return nil, nil, err
×
779
        }
×
780

781
        // Return the results and encoded continuous token for pagination.
782
        if len(entityIDs) > int(pagination.PageSize()) {
5✔
783
                return entityIDs[:pagination.PageSize()], utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
1✔
784
        }
1✔
785

786
        return entityIDs, database.NewNoopContinuousToken().Encode(), nil
3✔
787
}
788

789
// QueryUniqueSubjectReferences reads unique subject references from the storage based on the given filter and pagination.
790
func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID string, subjectReference *base.RelationReference, snap string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
4✔
791
        // Start a new trace span and end it when the function exits.
4✔
792
        ctx, span := tracer.Start(ctx, "data-reader.query-unique-subject-reference")
4✔
793
        defer span.End()
4✔
794

4✔
795
        slog.Info("Querying unique subject references for tenantID: ", slog.String("tenant_id", tenantID))
4✔
796

4✔
797
        // Decode the snapshot value.
4✔
798
        var st token.SnapToken
4✔
799
        st, err = snapshot.EncodedToken{Value: snap}.Decode()
4✔
800
        if err != nil {
4✔
801
                span.RecordError(err)
×
802
                span.SetStatus(codes.Error, err.Error())
×
803

×
804
                slog.Error("Failed to decode snapshot value: ", slog.Any("error", err))
×
805

×
806
                return nil, nil, err
×
807
        }
×
808

809
        // Begin a new read-only transaction with the specified isolation level.
810
        var tx *sql.Tx
4✔
811
        tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
4✔
812
        if err != nil {
4✔
813
                span.RecordError(err)
×
814
                span.SetStatus(codes.Error, err.Error())
×
815

×
816
                slog.Error("Failed to begin transaction: ", slog.Any("error", err))
×
817

×
818
                return nil, nil, err
×
819
        }
×
820

821
        // Rollback the transaction in case of any error.
822
        defer utils.Rollback(tx)
4✔
823

4✔
824
        // Build the relationships query based on the provided filter, snapshot value, and pagination settings.
4✔
825
        builder := r.database.Builder.
4✔
826
                Select("MIN(id) as id, subject_id"). // This will pick the smallest `id` for each unique `subject_id`.
4✔
827
                From(RelationTuplesTable).
4✔
828
                Where(squirrel.Eq{"tenant_id": tenantID}).
4✔
829
                GroupBy("subject_id")
4✔
830
        builder = utils.TuplesFilterQueryForSelectBuilder(builder, &base.TupleFilter{Subject: &base.SubjectFilter{Type: subjectReference.GetType(), Relation: subjectReference.GetRelation()}})
4✔
831
        builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
4✔
832

4✔
833
        // Apply the pagination token and limit to the query.
4✔
834
        if pagination.Token() != "" {
5✔
835
                var t database.ContinuousToken
1✔
836
                t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
1✔
837
                if err != nil {
1✔
838
                        span.RecordError(err)
×
839
                        span.SetStatus(codes.Error, err.Error())
×
840

×
841
                        slog.Error("Failed to apply pagination token: ", slog.Any("error", err))
×
842

×
843
                        return nil, nil, err
×
844
                }
×
845
                var v uint64
1✔
846
                v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
1✔
847
                if err != nil {
1✔
848
                        span.RecordError(err)
×
849
                        span.SetStatus(codes.Error, err.Error())
×
850

×
851
                        slog.Error("Failed to apply limit on query: ", slog.Any("error", err))
×
852

×
853
                        return nil, nil, errors.New(base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN.String())
×
854
                }
×
855
                builder = builder.Where(squirrel.GtOrEq{"id": v})
1✔
856
        }
857

858
        builder = builder.OrderBy("id").Limit(uint64(pagination.PageSize() + 1))
4✔
859

4✔
860
        // Generate the SQL query and arguments.
4✔
861
        var query string
4✔
862
        var args []interface{}
4✔
863
        query, args, err = builder.ToSql()
4✔
864
        if err != nil {
4✔
865
                span.RecordError(err)
×
866
                span.SetStatus(codes.Error, err.Error())
×
867

×
868
                slog.Error("Failed to generate SQL query: ", slog.Any("error", err))
×
869

×
870
                return nil, database.NewNoopContinuousToken().Encode(), errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String())
×
871
        }
×
872

873
        slog.Debug("Generated SQL query: ", slog.String("query", query), "with args", slog.Any("arguments", args))
4✔
874

4✔
875
        // Execute the query and retrieve the rows.
4✔
876
        var rows *sql.Rows
4✔
877
        rows, err = tx.QueryContext(ctx, query, args...)
4✔
878
        if err != nil {
4✔
879
                span.RecordError(err)
×
880
                span.SetStatus(codes.Error, err.Error())
×
881

×
882
                slog.Error("Failed to execute SQL query: ", slog.Any("error", err))
×
883

×
884
                return nil, database.NewNoopContinuousToken().Encode(), errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
×
885
        }
×
886
        defer rows.Close()
4✔
887

4✔
888
        var lastID uint64
4✔
889

4✔
890
        // Iterate through the rows and scan the result into a RelationTuple struct.
4✔
891
        subjectIDs := make([]string, 0, pagination.PageSize()+1)
4✔
892
        for rows.Next() {
14✔
893
                var subjectID string
10✔
894
                err = rows.Scan(&lastID, &subjectID)
10✔
895
                if err != nil {
10✔
896
                        span.RecordError(err)
×
897
                        span.SetStatus(codes.Error, err.Error())
×
898

×
899
                        slog.Error("Failed to scan result rows: ", slog.Any("error", err))
×
900

×
901
                        return nil, nil, err
×
902
                }
×
903
                subjectIDs = append(subjectIDs, subjectID)
10✔
904
        }
905
        // Check for any errors during iteration.
906
        if err = rows.Err(); err != nil {
4✔
907
                span.RecordError(err)
×
908
                span.SetStatus(codes.Error, err.Error())
×
909

×
910
                slog.Error("Failed to process result rows: ", slog.Any("error", err))
×
911

×
912
                return nil, nil, err
×
913
        }
×
914

915
        // Commit the transaction.
916
        err = tx.Commit()
4✔
917
        if err != nil {
4✔
918
                span.RecordError(err)
×
919
                span.SetStatus(codes.Error, err.Error())
×
920

×
921
                slog.Error("Failed to commit transaction: ", slog.Any("error", err))
×
922

×
923
                return nil, nil, err
×
924
        }
×
925

926
        slog.Info("Successfully retrieved unique subject references from the database.")
4✔
927

4✔
928
        // Return the results and encoded continuous token for pagination.
4✔
929
        if len(subjectIDs) > int(pagination.PageSize()) {
5✔
930
                return subjectIDs[:pagination.PageSize()], utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
1✔
931
        }
1✔
932

933
        return subjectIDs, database.NewNoopContinuousToken().Encode(), nil
3✔
934
}
935

936
// HeadSnapshot retrieves the latest snapshot token associated with the tenant.
937
func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.SnapToken, error) {
1✔
938
        // Start a new trace span and end it when the function exits.
1✔
939
        ctx, span := tracer.Start(ctx, "data-reader.head-snapshot")
1✔
940
        defer span.End()
1✔
941

1✔
942
        slog.Info("Getting headsnapshot for tenantID: ", slog.String("tenant_id", tenantID))
1✔
943

1✔
944
        var xid types.XID8
1✔
945

1✔
946
        // Build the query to find the highest transaction ID associated with the tenant.
1✔
947
        builder := r.database.Builder.Select("id").From(TransactionsTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("id DESC").Limit(1)
1✔
948
        query, args, err := builder.ToSql()
1✔
949
        if err != nil {
1✔
950
                span.RecordError(err)
×
951
                span.SetStatus(codes.Error, err.Error())
×
952

×
953
                slog.Error("Failed to build the query: ", slog.Any("error", err))
×
954

×
955
                return nil, errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String())
×
956
        }
×
957

958
        // Execute the query and retrieve the highest transaction ID.
959
        row := r.database.DB.QueryRowContext(ctx, query, args...)
1✔
960
        err = row.Scan(&xid)
1✔
961
        if err != nil {
1✔
962
                span.RecordError(err)
×
963
                span.SetStatus(codes.Error, err.Error())
×
964
                // If no rows are found, return a snapshot token with a value of 0.
×
965
                if errors.Is(err, sql.ErrNoRows) {
×
966
                        return snapshot.Token{Value: types.XID8{Uint: 0}}, nil
×
967
                }
×
968

969
                slog.Error("Failed to execute query: ", slog.Any("error", err))
×
970

×
971
                return nil, err
×
972
        }
973

974
        slog.Info("Successfully retrieved latest snapshot token")
1✔
975

1✔
976
        // Return the latest snapshot token associated with the tenant.
1✔
977
        return snapshot.Token{Value: xid}, nil
1✔
978
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc