• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Permify / permify / 7231443282

16 Dec 2023 10:44AM UTC coverage: 77.237% (+0.02%) from 77.214%
7231443282

push

github

web-flow
Merge pull request #940 from Permify/next

Next

6664 of 8628 relevant lines covered (77.24%)

53.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.59
/internal/storage/postgres/dataWriter.go
1
package postgres
2

3
import (
4
        "context"
5
        "database/sql"
6
        "errors"
7
        "log/slog"
8
        "strings"
9

10
        "github.com/golang/protobuf/jsonpb"
11

12
        "github.com/Masterminds/squirrel"
13

14
        "github.com/Permify/permify/internal/storage/postgres/snapshot"
15
        "github.com/Permify/permify/internal/storage/postgres/types"
16
        "github.com/Permify/permify/internal/storage/postgres/utils"
17
        "github.com/Permify/permify/internal/validation"
18
        "github.com/Permify/permify/pkg/bundle"
19
        "github.com/Permify/permify/pkg/database"
20
        db "github.com/Permify/permify/pkg/database/postgres"
21
        base "github.com/Permify/permify/pkg/pb/base/v1"
22
        "github.com/Permify/permify/pkg/token"
23
        "github.com/Permify/permify/pkg/tuple"
24
)
25

26
// DataWriter - Structure for Data Writer
27
type DataWriter struct {
28
        database *db.Postgres
29
        // options
30
        txOptions       sql.TxOptions
31
        maxDataPerWrite int
32
        maxRetries      int
33
}
34

35
func NewDataWriter(database *db.Postgres) *DataWriter {
14✔
36
        return &DataWriter{
14✔
37
                database:        database,
14✔
38
                txOptions:       sql.TxOptions{Isolation: sql.LevelSerializable, ReadOnly: false},
14✔
39
                maxDataPerWrite: _defaultMaxDataPerWrite,
14✔
40
                maxRetries:      _defaultMaxRetries,
14✔
41
        }
14✔
42
}
14✔
43

44
// Write method writes a collection of tuples and attributes to the database for a specific tenant.
45
// It returns an EncodedSnapToken upon successful write or an error if the write fails.
46
func (w *DataWriter) Write(
47
        ctx context.Context,
48
        tenantID string,
49
        tupleCollection *database.TupleCollection,
50
        attributeCollection *database.AttributeCollection,
51
) (token token.EncodedSnapToken, err error) {
19✔
52
        // Start a new tracing span for this operation.
19✔
53
        ctx, span := tracer.Start(ctx, "data-writer.write")
19✔
54
        defer span.End() // Ensure that the span is ended when the function returns.
19✔
55

19✔
56
        // Log the start of a data write operation.
19✔
57
        slog.Info("Writing data to the database. TenantID: ", slog.String("tenant_id", tenantID), "Max Retries: ", slog.Any("max_retries", w.maxRetries))
19✔
58

19✔
59
        // Check if the total number of tuples and attributes exceeds the maximum allowed per write.
19✔
60
        if len(tupleCollection.GetTuples())+len(attributeCollection.GetAttributes()) > w.maxDataPerWrite {
19✔
61
                return nil, errors.New("max data per write exceeded")
×
62
        }
×
63

64
        // Retry loop for handling transient errors like serialization issues.
65
        for i := 0; i <= w.maxRetries; i++ {
38✔
66
                // Attempt to write the data to the database.
19✔
67
                tkn, err := w.write(ctx, tenantID, tupleCollection, attributeCollection)
19✔
68
                if err != nil {
19✔
69
                        // Check if the error is due to serialization, and if so, retry.
×
70
                        if strings.Contains(err.Error(), "could not serialize") {
×
71
                                slog.Warn("Serialization error occurred. Retrying...", slog.String("tenant_id", tenantID), slog.Int("retry", i))
×
72
                                continue // Retry the operation.
×
73
                        }
74
                        // If the error is not serialization-related, handle it and return.
75
                        return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_DATASTORE)
×
76
                }
77
                // If the write is successful, return the token.
78
                return tkn, err
19✔
79
        }
80

81
        // Log an error if the operation failed after reaching the maximum number of retries.
82
        slog.Error("Failed to write data to the database. Max retries reached. Aborting operation. ", slog.Any("error", errors.New(base.ErrorCode_ERROR_CODE_ERROR_MAX_RETRIES.String())))
×
83

×
84
        // Return an error indicating that the maximum number of retries has been reached.
×
85
        return nil, errors.New(base.ErrorCode_ERROR_CODE_ERROR_MAX_RETRIES.String())
×
86
}
87

88
// Delete method removes data from the database based on the provided tuple and attribute filters.
89
// It returns an EncodedSnapToken upon successful deletion or an error if the deletion fails.
90
func (w *DataWriter) Delete(
91
        ctx context.Context,
92
        tenantID string,
93
        tupleFilter *base.TupleFilter,
94
        attributeFilter *base.AttributeFilter,
95
) (token.EncodedSnapToken, error) {
6✔
96
        // Start a new tracing span for this delete operation.
6✔
97
        ctx, span := tracer.Start(ctx, "data-writer.delete")
6✔
98
        defer span.End() // Ensure that the span is ended when the function returns.
6✔
99

6✔
100
        // Log the start of a data deletion operation.
6✔
101
        slog.Info("Deleting data from the database. TenantID: ", slog.String("tenant_id", tenantID), "Max Retries: ", slog.Any("max_retries", w.maxRetries))
6✔
102

6✔
103
        // Retry loop for handling transient errors like serialization issues.
6✔
104
        for i := 0; i <= w.maxRetries; i++ {
12✔
105
                // Attempt to delete the data from the database.
6✔
106
                tkn, err := w.delete(ctx, tenantID, tupleFilter, attributeFilter)
6✔
107
                if err != nil {
6✔
108
                        // Check if the error is due to serialization, and if so, retry.
×
109
                        if strings.Contains(err.Error(), "could not serialize") {
×
110
                                slog.Warn("Serialization error occurred. Retrying...", slog.String("tenant_id", tenantID), slog.Int("retry", i))
×
111
                                continue // Retry the operation.
×
112
                        }
113
                        // If the error is not serialization-related, handle it and return.
114
                        return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_DATASTORE)
×
115
                }
116
                // If the delete operation is successful, return the token.
117
                return tkn, err
6✔
118
        }
119

120
        // Log an error if the operation failed after reaching the maximum number of retries.
121
        slog.Error("Failed to delete data from the database. Max retries reached. Aborting operation. ", slog.Any("error", errors.New(base.ErrorCode_ERROR_CODE_ERROR_MAX_RETRIES.String())))
×
122

×
123
        // Return an error indicating that the maximum number of retries has been reached.
×
124
        return nil, errors.New(base.ErrorCode_ERROR_CODE_ERROR_MAX_RETRIES.String())
×
125
}
126

127
// RunBundle executes a bundle of operations in the context of a given tenant.
128
// It returns an EncodedSnapToken upon successful completion or an error if the operation fails.
129
func (w *DataWriter) RunBundle(
130
        ctx context.Context,
131
        tenantID string,
132
        arguments map[string]string,
133
        b *base.DataBundle,
134
) (token.EncodedSnapToken, error) {
1✔
135
        // Start a new tracing span for this operation.
1✔
136
        ctx, span := tracer.Start(ctx, "data-writer.run-bundle")
1✔
137
        defer span.End() // Ensure that the span is ended when the function returns.
1✔
138

1✔
139
        // Log the start of running a bundle operation.
1✔
140
        slog.Info("Running bundle from the database. TenantID: ", slog.String("tenant_id", tenantID), "Max Retries: ", slog.Any("max_retries", w.maxRetries))
1✔
141

1✔
142
        // Retry loop for handling transient errors like serialization issues.
1✔
143
        for i := 0; i <= w.maxRetries; i++ {
2✔
144
                // Attempt to run the bundle operation.
1✔
145
                tkn, err := w.runBundle(ctx, tenantID, arguments, b)
1✔
146
                if err != nil {
1✔
147
                        // Check if the error is due to serialization, and if so, retry.
×
148
                        if strings.Contains(err.Error(), "could not serialize") {
×
149
                                slog.Warn("Serialization error occurred. Retrying...", slog.String("tenant_id", tenantID), slog.Int("retry", i))
×
150
                                continue // Retry the operation.
×
151
                        }
152
                        // If the error is not serialization-related, handle it and return.
153
                        return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_DATASTORE)
×
154
                }
155
                // If the operation is successful, return the token.
156
                return tkn, err
1✔
157
        }
158

159
        // Log an error if the operation failed after reaching the maximum number of retries.
160
        slog.Error("Failed to run bundle from the database. Max retries reached. Aborting operation. ", slog.Any("error", errors.New(base.ErrorCode_ERROR_CODE_ERROR_MAX_RETRIES.String())))
×
161

×
162
        // Return an error indicating that the maximum number of retries has been reached.
×
163
        return nil, errors.New(base.ErrorCode_ERROR_CODE_ERROR_MAX_RETRIES.String())
×
164
}
165

166
// write handles the database writing of tuple and attribute collections for a given tenant.
167
// It returns an EncodedSnapToken upon successful write or an error if the write fails.
168
func (w *DataWriter) write(
169
        ctx context.Context,
170
        tenantID string,
171
        tupleCollection *database.TupleCollection,
172
        attributeCollection *database.AttributeCollection,
173
) (token token.EncodedSnapToken, err error) {
19✔
174
        var tx *sql.Tx
19✔
175
        tx, err = w.database.DB.BeginTx(ctx, &w.txOptions)
19✔
176
        if err != nil {
19✔
177
                return nil, err
×
178
        }
×
179

180
        defer func() {
38✔
181
                _ = tx.Rollback()
19✔
182
        }()
19✔
183

184
        transaction := w.database.Builder.Insert("transactions").
19✔
185
                Columns("tenant_id").
19✔
186
                Values(tenantID).
19✔
187
                Suffix("RETURNING id").RunWith(tx)
19✔
188
        if err != nil {
19✔
189
                return nil, err
×
190
        }
×
191

192
        var xid types.XID8
19✔
193
        err = transaction.QueryRowContext(ctx).Scan(&xid)
19✔
194
        if err != nil {
19✔
195
                return nil, err
×
196
        }
×
197

198
        slog.Debug("Retrieved transaction: ", slog.Any("transaction", transaction), "for tenant: ", slog.Any("tenant_id", tenantID))
19✔
199

19✔
200
        slog.Debug("Processing tuples and executing insert query. ")
19✔
201

19✔
202
        if len(tupleCollection.GetTuples()) > 0 {
32✔
203

13✔
204
                tuplesInsertBuilder := w.database.Builder.Insert(RelationTuplesTable).Columns("entity_type, entity_id, relation, subject_type, subject_id, subject_relation, created_tx_id, tenant_id")
13✔
205

13✔
206
                deleteClauses := squirrel.Or{}
13✔
207

13✔
208
                titer := tupleCollection.CreateTupleIterator()
13✔
209
                for titer.HasNext() {
56✔
210
                        t := titer.GetNext()
43✔
211
                        srelation := t.GetSubject().GetRelation()
43✔
212
                        if srelation == tuple.ELLIPSIS {
43✔
213
                                srelation = ""
×
214
                        }
×
215

216
                        // Build the condition for this tuple.
217
                        condition := squirrel.Eq{
43✔
218
                                "entity_type":      t.GetEntity().GetType(),
43✔
219
                                "entity_id":        t.GetEntity().GetId(),
43✔
220
                                "relation":         t.GetRelation(),
43✔
221
                                "subject_type":     t.GetSubject().GetType(),
43✔
222
                                "subject_id":       t.GetSubject().GetId(),
43✔
223
                                "subject_relation": srelation,
43✔
224
                        }
43✔
225

43✔
226
                        // Add the condition to the OR slice.
43✔
227
                        deleteClauses = append(deleteClauses, condition)
43✔
228

43✔
229
                        tuplesInsertBuilder = tuplesInsertBuilder.Values(t.GetEntity().GetType(), t.GetEntity().GetId(), t.GetRelation(), t.GetSubject().GetType(), t.GetSubject().GetId(), srelation, xid, tenantID)
43✔
230
                }
231

232
                tDeleteBuilder := w.database.Builder.Update(RelationTuplesTable).Set("expired_tx_id", xid).Where(squirrel.Eq{
13✔
233
                        "expired_tx_id": "0",
13✔
234
                        "tenant_id":     tenantID,
13✔
235
                }).Where(deleteClauses)
13✔
236

13✔
237
                var tdquery string
13✔
238
                var tdargs []interface{}
13✔
239

13✔
240
                tdquery, tdargs, err = tDeleteBuilder.ToSql()
13✔
241
                if err != nil {
13✔
242
                        return nil, err
×
243
                }
×
244

245
                _, err = tx.ExecContext(ctx, tdquery, tdargs...)
13✔
246
                if err != nil {
13✔
247
                        return nil, err
×
248
                }
×
249

250
                var tiquery string
13✔
251
                var tiargs []interface{}
13✔
252

13✔
253
                tiquery, tiargs, err = tuplesInsertBuilder.ToSql()
13✔
254
                if err != nil {
13✔
255
                        return nil, err
×
256
                }
×
257

258
                _, err = tx.ExecContext(ctx, tiquery, tiargs...)
13✔
259
                if err != nil {
13✔
260
                        return nil, err
×
261
                }
×
262
        }
263

264
        if len(attributeCollection.GetAttributes()) > 0 {
34✔
265

15✔
266
                attributesInsertBuilder := w.database.Builder.Insert(AttributesTable).Columns("entity_type, entity_id, attribute, value, created_tx_id, tenant_id")
15✔
267

15✔
268
                deleteClauses := squirrel.Or{}
15✔
269

15✔
270
                aiter := attributeCollection.CreateAttributeIterator()
15✔
271
                for aiter.HasNext() {
59✔
272
                        a := aiter.GetNext()
44✔
273

44✔
274
                        m := jsonpb.Marshaler{}
44✔
275
                        jsonStr, err := m.MarshalToString(a.GetValue())
44✔
276
                        if err != nil {
44✔
277
                                return nil, err
×
278
                        }
×
279

280
                        // Build the condition for this attribute.
281
                        condition := squirrel.Eq{
44✔
282
                                "entity_type": a.GetEntity().GetType(),
44✔
283
                                "entity_id":   a.GetEntity().GetId(),
44✔
284
                                "attribute":   a.GetAttribute(),
44✔
285
                        }
44✔
286

44✔
287
                        // Add the condition to the OR slice.
44✔
288
                        deleteClauses = append(deleteClauses, condition)
44✔
289

44✔
290
                        attributesInsertBuilder = attributesInsertBuilder.Values(a.GetEntity().GetType(), a.GetEntity().GetId(), a.GetAttribute(), jsonStr, xid, tenantID)
44✔
291
                }
292

293
                aDeleteBuilder := w.database.Builder.Update(AttributesTable).Set("expired_tx_id", xid).Where(squirrel.Eq{
15✔
294
                        "expired_tx_id": "0",
15✔
295
                        "tenant_id":     tenantID,
15✔
296
                }).Where(deleteClauses)
15✔
297

15✔
298
                var adquery string
15✔
299
                var adargs []interface{}
15✔
300

15✔
301
                adquery, adargs, err = aDeleteBuilder.ToSql()
15✔
302
                if err != nil {
15✔
303
                        return nil, err
×
304
                }
×
305

306
                _, err = tx.ExecContext(ctx, adquery, adargs...)
15✔
307
                if err != nil {
15✔
308
                        return nil, err
×
309
                }
×
310

311
                var aquery string
15✔
312
                var aargs []interface{}
15✔
313

15✔
314
                aquery, aargs, err = attributesInsertBuilder.ToSql()
15✔
315
                if err != nil {
15✔
316
                        return nil, err
×
317
                }
×
318

319
                _, err = tx.ExecContext(ctx, aquery, aargs...)
15✔
320
                if err != nil {
15✔
321
                        return nil, err
×
322
                }
×
323
        }
324

325
        if err = tx.Commit(); err != nil {
19✔
326
                return nil, err
×
327
        }
×
328

329
        slog.Info("Data successfully written to the database.")
19✔
330

19✔
331
        return snapshot.NewToken(xid).Encode(), nil
19✔
332
}
333

334
// delete handles the deletion of tuples and attributes from the database based on provided filters.
335
// It returns an EncodedSnapToken upon successful deletion or an error if the deletion fails.
336
func (w *DataWriter) delete(
337
        ctx context.Context,
338
        tenantID string,
339
        tupleFilter *base.TupleFilter,
340
        attributeFilter *base.AttributeFilter,
341
) (token token.EncodedSnapToken, err error) {
6✔
342
        var tx *sql.Tx
6✔
343
        tx, err = w.database.DB.BeginTx(ctx, &w.txOptions)
6✔
344
        if err != nil {
6✔
345
                return nil, err
×
346
        }
×
347

348
        defer func() {
12✔
349
                _ = tx.Rollback()
6✔
350
        }()
6✔
351

352
        transaction := w.database.Builder.Insert("transactions").
6✔
353
                Columns("tenant_id").
6✔
354
                Values(tenantID).
6✔
355
                Suffix("RETURNING id").RunWith(tx)
6✔
356
        if err != nil {
6✔
357
                return nil, err
×
358
        }
×
359

360
        var xid types.XID8
6✔
361
        err = transaction.QueryRowContext(ctx).Scan(&xid)
6✔
362
        if err != nil {
6✔
363
                return nil, err
×
364
        }
×
365

366
        slog.Debug("Retrieved transaction: ", slog.Any("transaction", transaction), "for tenant: ", slog.Any("tenant_id", tenantID))
6✔
367

6✔
368
        slog.Debug("Processing tuple and executing update query. ")
6✔
369

6✔
370
        if !validation.IsTupleFilterEmpty(tupleFilter) {
9✔
371
                tbuilder := w.database.Builder.Update(RelationTuplesTable).Set("expired_tx_id", xid).Where(squirrel.Eq{"expired_tx_id": "0", "tenant_id": tenantID})
3✔
372
                tbuilder = utils.TuplesFilterQueryForUpdateBuilder(tbuilder, tupleFilter)
3✔
373

3✔
374
                var tquery string
3✔
375
                var targs []interface{}
3✔
376

3✔
377
                tquery, targs, err = tbuilder.ToSql()
3✔
378
                if err != nil {
3✔
379
                        return nil, err
×
380
                }
×
381

382
                _, err = tx.ExecContext(ctx, tquery, targs...)
3✔
383
                if err != nil {
3✔
384
                        return nil, err
×
385
                }
×
386
        }
387

388
        slog.Debug("Processing attribute and executing update query.")
6✔
389

6✔
390
        if !validation.IsAttributeFilterEmpty(attributeFilter) {
11✔
391
                abuilder := w.database.Builder.Update(AttributesTable).Set("expired_tx_id", xid).Where(squirrel.Eq{"expired_tx_id": "0", "tenant_id": tenantID})
5✔
392
                abuilder = utils.AttributesFilterQueryForUpdateBuilder(abuilder, attributeFilter)
5✔
393

5✔
394
                var aquery string
5✔
395
                var aargs []interface{}
5✔
396

5✔
397
                aquery, aargs, err = abuilder.ToSql()
5✔
398
                if err != nil {
5✔
399
                        return nil, err
×
400
                }
×
401

402
                _, err = tx.ExecContext(ctx, aquery, aargs...)
5✔
403
                if err != nil {
5✔
404
                        return nil, err
×
405
                }
×
406
        }
407

408
        if err = tx.Commit(); err != nil {
6✔
409
                return nil, err
×
410
        }
×
411

412
        slog.Info("Data successfully deleted from the database.")
6✔
413

6✔
414
        return snapshot.NewToken(xid).Encode(), nil
6✔
415
}
416

417
// runBundle executes a series of operations defined in a DataBundle within a single database transaction.
418
// It returns an EncodedSnapToken upon successful execution of all operations or an error if any operation fails.
419
func (w *DataWriter) runBundle(
420
        ctx context.Context,
421
        tenantID string,
422
        arguments map[string]string,
423
        b *base.DataBundle,
424
) (token token.EncodedSnapToken, err error) {
1✔
425
        var tx *sql.Tx
1✔
426
        tx, err = w.database.DB.BeginTx(ctx, &w.txOptions)
1✔
427
        if err != nil {
1✔
428
                return nil, err
×
429
        }
×
430

431
        defer func() {
2✔
432
                _ = tx.Rollback()
1✔
433
        }()
1✔
434

435
        transaction := w.database.Builder.Insert("transactions").
1✔
436
                Columns("tenant_id").
1✔
437
                Values(tenantID).
1✔
438
                Suffix("RETURNING id").RunWith(tx)
1✔
439
        if err != nil {
1✔
440
                return nil, err
×
441
        }
×
442

443
        var xid types.XID8
1✔
444
        err = transaction.QueryRowContext(ctx).Scan(&xid)
1✔
445
        if err != nil {
1✔
446
                return nil, err
×
447
        }
×
448

449
        slog.Debug("Retrieved transaction: ", slog.Any("transaction", transaction), "for tenant: ", slog.Any("tenant_id", tenantID))
1✔
450

1✔
451
        for _, op := range b.GetOperations() {
2✔
452
                tb, ab, err := bundle.Operation(arguments, op)
1✔
453
                if err != nil {
1✔
454
                        return nil, err
×
455
                }
×
456

457
                err = w.runOperation(ctx, tx, xid, tenantID, tb, ab)
1✔
458
                if err != nil {
1✔
459
                        return nil, err
×
460
                }
×
461
        }
462

463
        if err := tx.Commit(); err != nil {
1✔
464
                return nil, err
×
465
        }
×
466

467
        return snapshot.NewToken(xid).Encode(), nil
1✔
468
}
469

470
// runOperation processes and executes database operations defined in TupleBundle and AttributeBundle within a given transaction.
471
func (w *DataWriter) runOperation(
472
        ctx context.Context,
473
        tx *sql.Tx,
474
        xid types.XID8,
475
        tenantID string,
476
        tb database.TupleBundle,
477
        ab database.AttributeBundle,
478
) (err error) {
1✔
479
        slog.Debug("Processing bundles queries. ")
1✔
480

1✔
481
        if len(tb.Write.GetTuples()) > 0 {
2✔
482

1✔
483
                tuplesInsertBuilder := w.database.Builder.Insert(RelationTuplesTable).Columns("entity_type, entity_id, relation, subject_type, subject_id, subject_relation, created_tx_id, tenant_id")
1✔
484

1✔
485
                deleteClauses := squirrel.Or{}
1✔
486

1✔
487
                titer := tb.Write.CreateTupleIterator()
1✔
488
                for titer.HasNext() {
3✔
489
                        t := titer.GetNext()
2✔
490
                        srelation := t.GetSubject().GetRelation()
2✔
491
                        if srelation == tuple.ELLIPSIS {
2✔
492
                                srelation = ""
×
493
                        }
×
494

495
                        // Build the condition for this tuple.
496
                        condition := squirrel.Eq{
2✔
497
                                "entity_type":      t.GetEntity().GetType(),
2✔
498
                                "entity_id":        t.GetEntity().GetId(),
2✔
499
                                "relation":         t.GetRelation(),
2✔
500
                                "subject_type":     t.GetSubject().GetType(),
2✔
501
                                "subject_id":       t.GetSubject().GetId(),
2✔
502
                                "subject_relation": srelation,
2✔
503
                        }
2✔
504

2✔
505
                        // Add the condition to the OR slice.
2✔
506
                        deleteClauses = append(deleteClauses, condition)
2✔
507

2✔
508
                        tuplesInsertBuilder = tuplesInsertBuilder.Values(t.GetEntity().GetType(), t.GetEntity().GetId(), t.GetRelation(), t.GetSubject().GetType(), t.GetSubject().GetId(), srelation, xid, tenantID)
2✔
509
                }
510

511
                tDeleteBuilder := w.database.Builder.Update(RelationTuplesTable).Set("expired_tx_id", xid).Where(squirrel.Eq{
1✔
512
                        "expired_tx_id": "0",
1✔
513
                        "tenant_id":     tenantID,
1✔
514
                }).Where(deleteClauses)
1✔
515

1✔
516
                var tdquery string
1✔
517
                var tdargs []interface{}
1✔
518

1✔
519
                tdquery, tdargs, err = tDeleteBuilder.ToSql()
1✔
520
                if err != nil {
1✔
521
                        return err
×
522
                }
×
523

524
                _, err = tx.ExecContext(ctx, tdquery, tdargs...)
1✔
525
                if err != nil {
1✔
526
                        return err
×
527
                }
×
528

529
                var tiquery string
1✔
530
                var tiargs []interface{}
1✔
531

1✔
532
                tiquery, tiargs, err = tuplesInsertBuilder.ToSql()
1✔
533
                if err != nil {
1✔
534
                        return err
×
535
                }
×
536

537
                _, err = tx.ExecContext(ctx, tiquery, tiargs...)
1✔
538
                if err != nil {
1✔
539
                        return err
×
540
                }
×
541
        }
542

543
        if len(ab.Write.GetAttributes()) > 0 {
2✔
544

1✔
545
                attributesInsertBuilder := w.database.Builder.Insert(AttributesTable).Columns("entity_type, entity_id, attribute, value, created_tx_id, tenant_id")
1✔
546

1✔
547
                deleteClauses := squirrel.Or{}
1✔
548

1✔
549
                aiter := ab.Write.CreateAttributeIterator()
1✔
550
                for aiter.HasNext() {
2✔
551
                        a := aiter.GetNext()
1✔
552

1✔
553
                        m := jsonpb.Marshaler{}
1✔
554
                        jsonStr, err := m.MarshalToString(a.GetValue())
1✔
555
                        if err != nil {
1✔
556
                                return err
×
557
                        }
×
558

559
                        // Build the condition for this tuple.
560
                        condition := squirrel.Eq{
1✔
561
                                "entity_type": a.GetEntity().GetType(),
1✔
562
                                "entity_id":   a.GetEntity().GetId(),
1✔
563
                                "attribute":   a.GetAttribute(),
1✔
564
                        }
1✔
565

1✔
566
                        // Add the condition to the OR slice.
1✔
567
                        deleteClauses = append(deleteClauses, condition)
1✔
568

1✔
569
                        attributesInsertBuilder = attributesInsertBuilder.Values(a.GetEntity().GetType(), a.GetEntity().GetId(), a.GetAttribute(), jsonStr, xid, tenantID)
1✔
570
                }
571

572
                tDeleteBuilder := w.database.Builder.Update(AttributesTable).Set("expired_tx_id", xid).Where(squirrel.Eq{
1✔
573
                        "expired_tx_id": "0",
1✔
574
                        "tenant_id":     tenantID,
1✔
575
                }).Where(deleteClauses)
1✔
576

1✔
577
                var adquery string
1✔
578
                var adargs []interface{}
1✔
579

1✔
580
                adquery, adargs, err = tDeleteBuilder.ToSql()
1✔
581
                if err != nil {
1✔
582
                        return err
×
583
                }
×
584

585
                _, err = tx.ExecContext(ctx, adquery, adargs...)
1✔
586
                if err != nil {
1✔
587
                        return err
×
588
                }
×
589

590
                var aquery string
1✔
591
                var aargs []interface{}
1✔
592

1✔
593
                aquery, aargs, err = attributesInsertBuilder.ToSql()
1✔
594
                if err != nil {
1✔
595
                        return err
×
596
                }
×
597

598
                _, err = tx.ExecContext(ctx, aquery, aargs...)
1✔
599
                if err != nil {
1✔
600
                        return err
×
601
                }
×
602
        }
603

604
        if len(tb.Delete.GetTuples()) > 0 {
1✔
605

×
606
                deleteClauses := squirrel.Or{}
×
607

×
608
                titer := tb.Delete.CreateTupleIterator()
×
609
                for titer.HasNext() {
×
610
                        t := titer.GetNext()
×
611
                        srelation := t.GetSubject().GetRelation()
×
612
                        if srelation == tuple.ELLIPSIS {
×
613
                                srelation = ""
×
614
                        }
×
615

616
                        // Build the condition for this tuple.
617
                        condition := squirrel.Eq{
×
618
                                "entity_type":      t.GetEntity().GetType(),
×
619
                                "entity_id":        t.GetEntity().GetId(),
×
620
                                "relation":         t.GetRelation(),
×
621
                                "subject_type":     t.GetSubject().GetType(),
×
622
                                "subject_id":       t.GetSubject().GetId(),
×
623
                                "subject_relation": srelation,
×
624
                        }
×
625

×
626
                        // Add the condition to the OR slice.
×
627
                        deleteClauses = append(deleteClauses, condition)
×
628
                }
629

630
                tDeleteBuilder := w.database.Builder.Update(RelationTuplesTable).Set("expired_tx_id", xid).Where(squirrel.Eq{
×
631
                        "expired_tx_id": "0",
×
632
                        "tenant_id":     tenantID,
×
633
                }).Where(deleteClauses)
×
634

×
635
                var tquery string
×
636
                var targs []interface{}
×
637

×
638
                tquery, targs, err = tDeleteBuilder.ToSql()
×
639
                if err != nil {
×
640
                        return err
×
641
                }
×
642

643
                _, err = tx.ExecContext(ctx, tquery, targs...)
×
644
                if err != nil {
×
645
                        return err
×
646
                }
×
647
        }
648

649
        if len(ab.Delete.GetAttributes()) > 0 {
1✔
650

×
651
                deleteClauses := squirrel.Or{}
×
652

×
653
                aiter := ab.Delete.CreateAttributeIterator()
×
654
                for aiter.HasNext() {
×
655
                        a := aiter.GetNext()
×
656

×
657
                        // Build the condition for this tuple.
×
658
                        condition := squirrel.Eq{
×
659
                                "entity_type": a.GetEntity().GetType(),
×
660
                                "entity_id":   a.GetEntity().GetId(),
×
661
                                "attribute":   a.GetAttribute(),
×
662
                        }
×
663

×
664
                        // Add the condition to the OR slice.
×
665
                        deleteClauses = append(deleteClauses, condition)
×
666

×
667
                }
×
668

669
                aDeleteBuilder := w.database.Builder.Update(AttributesTable).Set("expired_tx_id", xid).Where(squirrel.Eq{
×
670
                        "expired_tx_id": "0",
×
671
                        "tenant_id":     tenantID,
×
672
                }).Where(deleteClauses)
×
673

×
674
                var tquery string
×
675
                var targs []interface{}
×
676

×
677
                tquery, targs, err = aDeleteBuilder.ToSql()
×
678
                if err != nil {
×
679
                        return err
×
680
                }
×
681

682
                _, err = tx.ExecContext(ctx, tquery, targs...)
×
683
                if err != nil {
×
684
                        return err
×
685
                }
×
686
        }
687

688
        return nil
1✔
689
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc