• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Permify / permify / 10549558409

25 Aug 2024 07:29PM UTC coverage: 79.849%. Remained the same
10549558409

push

github

web-flow
Merge pull request #1495 from Permify/dependabot/go_modules/github.com/rs/xid-1.6.0

8020 of 10044 relevant lines covered (79.85%)

114.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.72
/internal/storage/postgres/dataWriter.go
1
package postgres
2

3
import (
4
        "context"
5
        "errors"
6
        "log/slog"
7

8
        "github.com/jackc/pgx/v5"
9

10
        "github.com/Masterminds/squirrel"
11
        "github.com/golang/protobuf/jsonpb"
12
        "github.com/jackc/pgx/v5/pgconn"
13

14
        "github.com/Permify/permify/internal/storage/postgres/snapshot"
15
        "github.com/Permify/permify/internal/storage/postgres/types"
16
        "github.com/Permify/permify/internal/storage/postgres/utils"
17
        "github.com/Permify/permify/internal/validation"
18
        "github.com/Permify/permify/pkg/bundle"
19
        "github.com/Permify/permify/pkg/database"
20
        db "github.com/Permify/permify/pkg/database/postgres"
21
        base "github.com/Permify/permify/pkg/pb/base/v1"
22
        "github.com/Permify/permify/pkg/token"
23
        "github.com/Permify/permify/pkg/tuple"
24
)
25

26
// DataWriter - Structure for Data Writer
27
type DataWriter struct {
28
        database *db.Postgres
29
        // options
30
        txOptions pgx.TxOptions
31
}
32

33
func NewDataWriter(database *db.Postgres) *DataWriter {
15✔
34
        return &DataWriter{
15✔
35
                database:  database,
15✔
36
                txOptions: pgx.TxOptions{IsoLevel: pgx.Serializable, AccessMode: pgx.ReadWrite},
15✔
37
        }
15✔
38
}
15✔
39

40
// Write method writes a collection of tuples and attributes to the database for a specific tenant.
41
// It returns an EncodedSnapToken upon successful write or an error if the write fails.
42
func (w *DataWriter) Write(
43
        ctx context.Context,
44
        tenantID string,
45
        tupleCollection *database.TupleCollection,
46
        attributeCollection *database.AttributeCollection,
47
) (token token.EncodedSnapToken, err error) {
21✔
48
        // Start a new tracing span for this operation.
21✔
49
        ctx, span := tracer.Start(ctx, "data-writer.write")
21✔
50
        defer span.End() // Ensure that the span is ended when the function returns.
21✔
51

21✔
52
        // Log the start of a data write operation.
21✔
53
        slog.DebugContext(ctx, "writing data for tenant_id", slog.String("tenant_id", tenantID), "max retries", slog.Any("max_retries", w.database.GetMaxRetries()))
21✔
54

21✔
55
        // Check if the total number of tuples and attributes exceeds the maximum allowed per write.
21✔
56
        if len(tupleCollection.GetTuples())+len(attributeCollection.GetAttributes()) > w.database.GetMaxDataPerWrite() {
21✔
57
                return nil, errors.New(base.ErrorCode_ERROR_CODE_MAX_DATA_PER_WRITE_EXCEEDED.String())
×
58
        }
×
59

60
        // Retry loop for handling transient errors like serialization issues.
61
        for i := 0; i <= w.database.GetMaxRetries(); i++ {
42✔
62
                // Attempt to write the data to the database.
21✔
63
                tkn, err := w.write(ctx, tenantID, tupleCollection, attributeCollection)
21✔
64
                if err != nil {
21✔
65
                        // Check if the error is due to serialization, and if so, retry.
×
66
                        if utils.IsSerializationRelatedError(err) || pgconn.SafeToRetry(err) {
×
67
                                slog.WarnContext(ctx, "serialization error occurred", slog.String("tenant_id", tenantID), slog.Int("retry", i))
×
68
                                utils.WaitWithBackoff(ctx, tenantID, i)
×
69
                                continue // Retry the operation.
×
70
                        }
71
                        // If the error is not serialization-related, handle it and return.
72
                        return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_DATASTORE)
×
73
                }
74
                // If to write is successful, return the token.
75
                return tkn, nil
21✔
76
        }
77

78
        // Log an error if the operation failed after reaching the maximum number of retries.
79
        slog.ErrorContext(ctx, "max retries reached", slog.Any("error", errors.New(base.ErrorCode_ERROR_CODE_ERROR_MAX_RETRIES.String())))
×
80

×
81
        // Return an error indicating that the maximum number of retries has been reached.
×
82
        return nil, errors.New(base.ErrorCode_ERROR_CODE_ERROR_MAX_RETRIES.String())
×
83
}
84

85
// Delete method removes data from the database based on the provided tuple and attribute filters.
86
// It returns an EncodedSnapToken upon successful deletion or an error if the deletion fails.
87
func (w *DataWriter) Delete(
88
        ctx context.Context,
89
        tenantID string,
90
        tupleFilter *base.TupleFilter,
91
        attributeFilter *base.AttributeFilter,
92
) (token.EncodedSnapToken, error) {
6✔
93
        // Start a new tracing span for this delete operation.
6✔
94
        ctx, span := tracer.Start(ctx, "data-writer.delete")
6✔
95
        defer span.End() // Ensure that the span is ended when the function returns.
6✔
96

6✔
97
        // Log the start of a data deletion operation.
6✔
98
        slog.DebugContext(ctx, "deleting data for tenant_id", slog.String("tenant_id", tenantID), "max retries", slog.Any("max_retries", w.database.GetMaxRetries()))
6✔
99

6✔
100
        // Retry loop for handling transient errors like serialization issues.
6✔
101
        for i := 0; i <= w.database.GetMaxRetries(); i++ {
12✔
102
                // Attempt to delete the data from the database.
6✔
103
                tkn, err := w.delete(ctx, tenantID, tupleFilter, attributeFilter)
6✔
104
                if err != nil {
6✔
105
                        // Check if the error is due to serialization, and if so, retry.
×
106
                        if utils.IsSerializationRelatedError(err) || pgconn.SafeToRetry(err) {
×
107
                                slog.WarnContext(ctx, "serialization error occurred", slog.String("tenant_id", tenantID), slog.Int("retry", i))
×
108
                                utils.WaitWithBackoff(ctx, tenantID, i)
×
109
                                continue // Retry the operation.
×
110
                        }
111
                        // If the error is not serialization-related, handle it and return.
112
                        return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_DATASTORE)
×
113
                }
114
                // If the delete operation is successful, return the token.
115
                return tkn, nil
6✔
116
        }
117

118
        // Log an error if the operation failed after reaching the maximum number of retries.
119
        slog.DebugContext(ctx, "max retries reached", slog.Any("error", errors.New(base.ErrorCode_ERROR_CODE_ERROR_MAX_RETRIES.String())))
×
120

×
121
        // Return an error indicating that the maximum number of retries has been reached.
×
122
        return nil, errors.New(base.ErrorCode_ERROR_CODE_ERROR_MAX_RETRIES.String())
×
123
}
124

125
// RunBundle executes a bundle of operations in the context of a given tenant.
126
// It returns an EncodedSnapToken upon successful completion or an error if the operation fails.
127
func (w *DataWriter) RunBundle(
128
        ctx context.Context,
129
        tenantID string,
130
        arguments map[string]string,
131
        b *base.DataBundle,
132
) (token.EncodedSnapToken, error) {
1✔
133
        // Start a new tracing span for this operation.
1✔
134
        ctx, span := tracer.Start(ctx, "data-writer.run-bundle")
1✔
135
        defer span.End() // Ensure that the span is ended when the function returns.
1✔
136

1✔
137
        // Log the start of running a bundle operation.
1✔
138
        slog.DebugContext(ctx, "running bundle for tenant_id", slog.String("tenant_id", tenantID), "max retries", slog.Any("max_retries", w.database.GetMaxRetries()))
1✔
139

1✔
140
        // Retry loop for handling transient errors like serialization issues.
1✔
141
        for i := 0; i <= w.database.GetMaxRetries(); i++ {
2✔
142
                // Attempt to run the bundle operation.
1✔
143
                tkn, err := w.runBundle(ctx, tenantID, arguments, b)
1✔
144
                if err != nil {
1✔
145
                        // Check if the error is due to serialization, and if so, retry.
×
146
                        if utils.IsSerializationRelatedError(err) || pgconn.SafeToRetry(err) {
×
147
                                slog.WarnContext(ctx, "serialization error occurred", slog.String("tenant_id", tenantID), slog.Int("retry", i))
×
148
                                utils.WaitWithBackoff(ctx, tenantID, i)
×
149
                                continue // Retry the operation.
×
150
                        }
151
                        // If the error is not serialization-related, handle it and return.
152
                        return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_DATASTORE)
×
153
                }
154
                // If the operation is successful, return the token.
155
                return tkn, nil
1✔
156
        }
157

158
        // Log an error if the operation failed after reaching the maximum number of retries.
159
        slog.ErrorContext(ctx, "max retries reached", slog.Any("error", errors.New(base.ErrorCode_ERROR_CODE_ERROR_MAX_RETRIES.String())))
×
160

×
161
        // Return an error indicating that the maximum number of retries has been reached.
×
162
        return nil, errors.New(base.ErrorCode_ERROR_CODE_ERROR_MAX_RETRIES.String())
×
163
}
164

165
// write handles the database writing of tuple and attribute collections for a given tenant.
166
// It returns an EncodedSnapToken upon successful write or an error if the write fails.
167
func (w *DataWriter) write(
168
        ctx context.Context,
169
        tenantID string,
170
        tupleCollection *database.TupleCollection,
171
        attributeCollection *database.AttributeCollection,
172
) (token token.EncodedSnapToken, err error) {
21✔
173
        var tx pgx.Tx
21✔
174
        tx, err = w.database.WritePool.BeginTx(ctx, w.txOptions)
21✔
175
        if err != nil {
21✔
176
                return nil, err
×
177
        }
×
178

179
        defer func() {
42✔
180
                _ = tx.Rollback(ctx)
21✔
181
        }()
21✔
182

183
        var xid types.XID8
21✔
184
        err = tx.QueryRow(ctx, utils.TransactionTemplate, tenantID).Scan(&xid)
21✔
185
        if err != nil {
21✔
186
                return nil, err
×
187
        }
×
188

189
        slog.DebugContext(ctx, "retrieved transaction", slog.Any("xid", xid), "for tenant", slog.Any("tenant_id", tenantID))
21✔
190

21✔
191
        slog.DebugContext(ctx, "processing tuples and executing insert query")
21✔
192

21✔
193
        batch := &pgx.Batch{}
21✔
194

21✔
195
        if len(tupleCollection.GetTuples()) > 0 {
36✔
196
                err = w.batchUpdateRelationships(batch, xid, tenantID, buildDeleteClausesForRelationships(tupleCollection))
15✔
197
                if err != nil {
15✔
198
                        return nil, err
×
199
                }
×
200
                err = w.batchInsertRelationships(batch, xid, tenantID, tupleCollection)
15✔
201
                if err != nil {
15✔
202
                        return nil, err
×
203
                }
×
204
        }
205

206
        if len(attributeCollection.GetAttributes()) > 0 {
37✔
207
                err = w.batchUpdateAttributes(batch, xid, tenantID, buildDeleteClausesForAttributes(attributeCollection))
16✔
208
                if err != nil {
16✔
209
                        return nil, err
×
210
                }
×
211
                err = w.batchInsertAttributes(batch, xid, tenantID, attributeCollection)
16✔
212
                if err != nil {
16✔
213
                        return nil, err
×
214
                }
×
215
        }
216

217
        batchResult := tx.SendBatch(ctx, batch)
21✔
218
        for i := 0; i < batch.Len(); i++ {
209✔
219
                _, err = batchResult.Exec()
188✔
220
                if err != nil {
188✔
221
                        err = batchResult.Close()
×
222
                        if err != nil {
×
223
                                return nil, err
×
224
                        }
×
225
                        return nil, err
×
226
                }
227
        }
228

229
        err = batchResult.Close()
21✔
230
        if err != nil {
21✔
231
                return nil, err
×
232
        }
×
233

234
        if err = tx.Commit(ctx); err != nil {
21✔
235
                return nil, err
×
236
        }
×
237

238
        slog.DebugContext(ctx, "data successfully written to the database")
21✔
239

21✔
240
        return snapshot.NewToken(xid).Encode(), nil
21✔
241
}
242

243
// delete handles the deletion of tuples and attributes from the database based on provided filters.
244
// It returns an EncodedSnapToken upon successful deletion or an error if the deletion fails.
245
func (w *DataWriter) delete(
246
        ctx context.Context,
247
        tenantID string,
248
        tupleFilter *base.TupleFilter,
249
        attributeFilter *base.AttributeFilter,
250
) (token token.EncodedSnapToken, err error) {
6✔
251
        var tx pgx.Tx
6✔
252
        tx, err = w.database.WritePool.BeginTx(ctx, w.txOptions)
6✔
253
        if err != nil {
6✔
254
                return nil, err
×
255
        }
×
256

257
        defer func() {
12✔
258
                _ = tx.Rollback(ctx)
6✔
259
        }()
6✔
260

261
        var xid types.XID8
6✔
262
        err = tx.QueryRow(ctx, utils.TransactionTemplate, tenantID).Scan(&xid)
6✔
263
        if err != nil {
6✔
264
                return nil, err
×
265
        }
×
266

267
        slog.DebugContext(ctx, "retrieved transaction", slog.Any("xid", xid), "for tenant", slog.Any("tenant_id", tenantID))
6✔
268

6✔
269
        slog.DebugContext(ctx, "processing tuple and executing update query")
6✔
270

6✔
271
        if !validation.IsTupleFilterEmpty(tupleFilter) {
9✔
272
                tbuilder := w.database.Builder.Update(RelationTuplesTable).Set("expired_tx_id", xid).Where(squirrel.Eq{"expired_tx_id": "0", "tenant_id": tenantID})
3✔
273
                tbuilder = utils.TuplesFilterQueryForUpdateBuilder(tbuilder, tupleFilter)
3✔
274

3✔
275
                var tquery string
3✔
276
                var targs []interface{}
3✔
277

3✔
278
                tquery, targs, err = tbuilder.ToSql()
3✔
279
                if err != nil {
3✔
280
                        return nil, err
×
281
                }
×
282

283
                _, err = tx.Exec(ctx, tquery, targs...)
3✔
284
                if err != nil {
3✔
285
                        return nil, err
×
286
                }
×
287
        }
288

289
        slog.DebugContext(ctx, "processing attribute and executing update query")
6✔
290

6✔
291
        if !validation.IsAttributeFilterEmpty(attributeFilter) {
11✔
292
                abuilder := w.database.Builder.Update(AttributesTable).Set("expired_tx_id", xid).Where(squirrel.Eq{"expired_tx_id": "0", "tenant_id": tenantID})
5✔
293
                abuilder = utils.AttributesFilterQueryForUpdateBuilder(abuilder, attributeFilter)
5✔
294

5✔
295
                var aquery string
5✔
296
                var aargs []interface{}
5✔
297

5✔
298
                aquery, aargs, err = abuilder.ToSql()
5✔
299
                if err != nil {
5✔
300
                        return nil, err
×
301
                }
×
302

303
                _, err = tx.Exec(ctx, aquery, aargs...)
5✔
304
                if err != nil {
5✔
305
                        return nil, err
×
306
                }
×
307
        }
308

309
        if err = tx.Commit(ctx); err != nil {
6✔
310
                return nil, err
×
311
        }
×
312

313
        slog.DebugContext(ctx, "data successfully deleted from the database")
6✔
314

6✔
315
        return snapshot.NewToken(xid).Encode(), nil
6✔
316
}
317

318
// runBundle executes a series of operations defined in a DataBundle within a single database transaction.
319
// It returns an EncodedSnapToken upon successful execution of all operations or an error if any operation fails.
320
func (w *DataWriter) runBundle(
321
        ctx context.Context,
322
        tenantID string,
323
        arguments map[string]string,
324
        b *base.DataBundle,
325
) (token token.EncodedSnapToken, err error) {
1✔
326
        var tx pgx.Tx
1✔
327
        tx, err = w.database.WritePool.BeginTx(ctx, w.txOptions)
1✔
328
        if err != nil {
1✔
329
                return nil, err
×
330
        }
×
331

332
        defer func() {
2✔
333
                _ = tx.Rollback(ctx)
1✔
334
        }()
1✔
335

336
        var xid types.XID8
1✔
337
        err = tx.QueryRow(ctx, utils.TransactionTemplate, tenantID).Scan(&xid)
1✔
338
        if err != nil {
1✔
339
                return nil, err
×
340
        }
×
341

342
        slog.DebugContext(ctx, "retrieved transaction", slog.Any("xid", xid), "for tenant", slog.Any("tenant_id", tenantID))
1✔
343

1✔
344
        batch := &pgx.Batch{}
1✔
345

1✔
346
        for _, op := range b.GetOperations() {
2✔
347
                tb, ab, err := bundle.Operation(arguments, op)
1✔
348
                if err != nil {
1✔
349
                        return nil, err
×
350
                }
×
351

352
                err = w.runOperation(batch, xid, tenantID, tb, ab)
1✔
353
                if err != nil {
1✔
354
                        return nil, err
×
355
                }
×
356
        }
357

358
        batchResult := tx.SendBatch(ctx, batch)
1✔
359
        for i := 0; i < batch.Len(); i++ {
13✔
360
                _, err = batchResult.Exec()
12✔
361
                if err != nil {
12✔
362
                        err = batchResult.Close()
×
363
                        if err != nil {
×
364
                                return nil, err
×
365
                        }
×
366
                        return nil, err
×
367
                }
368
        }
369

370
        err = batchResult.Close()
1✔
371
        if err != nil {
1✔
372
                return nil, err
×
373
        }
×
374

375
        if err = tx.Commit(ctx); err != nil {
1✔
376
                return nil, err
×
377
        }
×
378

379
        return snapshot.NewToken(xid).Encode(), nil
1✔
380
}
381

382
// runOperation processes and executes database operations defined in TupleBundle and AttributeBundle within a given transaction.
383
func (w *DataWriter) runOperation(
384
        batch *pgx.Batch,
385
        xid types.XID8,
386
        tenantID string,
387
        tb database.TupleBundle,
388
        ab database.AttributeBundle,
389
) (err error) {
1✔
390
        slog.Debug("processing bundles queries")
1✔
391
        if len(tb.Write.GetTuples()) > 0 {
2✔
392
                deleteClauses := buildDeleteClausesForRelationships(&tb.Write)
1✔
393
                err = w.batchUpdateRelationships(batch, xid, tenantID, deleteClauses)
1✔
394
                if err != nil {
1✔
395
                        return err
×
396
                }
×
397
                err = w.batchInsertRelationships(batch, xid, tenantID, &tb.Write)
1✔
398
                if err != nil {
1✔
399
                        return err
×
400
                }
×
401
        }
402

403
        if len(ab.Write.GetAttributes()) > 0 {
2✔
404
                deleteClauses := buildDeleteClausesForAttributes(&ab.Write)
1✔
405
                err = w.batchUpdateAttributes(batch, xid, tenantID, deleteClauses)
1✔
406
                if err != nil {
1✔
407
                        return err
×
408
                }
×
409

410
                err = w.batchInsertAttributes(batch, xid, tenantID, &ab.Write)
1✔
411
                if err != nil {
1✔
412
                        return err
×
413
                }
×
414
        }
415

416
        if len(tb.Delete.GetTuples()) > 0 {
2✔
417
                deleteClauses := buildDeleteClausesForRelationships(&tb.Delete)
1✔
418
                err = w.batchUpdateRelationships(batch, xid, tenantID, deleteClauses)
1✔
419
                if err != nil {
1✔
420
                        return err
×
421
                }
×
422
        }
423

424
        if len(ab.Delete.GetAttributes()) > 0 {
2✔
425
                deleteClauses := buildDeleteClausesForAttributes(&ab.Delete)
1✔
426
                err = w.batchUpdateAttributes(batch, xid, tenantID, deleteClauses)
1✔
427
                if err != nil {
1✔
428
                        return err
×
429
                }
×
430
        }
431

432
        return nil
1✔
433
}
434

435
// batchInsertTuples function for batch inserting tuples
436
func (w *DataWriter) batchInsertRelationships(batch *pgx.Batch, xid types.XID8, tenantID string, tupleCollection *database.TupleCollection) error {
16✔
437
        titer := tupleCollection.CreateTupleIterator()
16✔
438
        for titer.HasNext() {
66✔
439
                t := titer.GetNext()
50✔
440
                srelation := t.GetSubject().GetRelation()
50✔
441
                if srelation == tuple.ELLIPSIS {
50✔
442
                        srelation = ""
×
443
                }
×
444
                batch.Queue(
50✔
445
                        "INSERT INTO relation_tuples (entity_type, entity_id, relation, subject_type, subject_id, subject_relation, created_tx_id, tenant_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
50✔
446
                        t.GetEntity().GetType(), t.GetEntity().GetId(), t.GetRelation(), t.GetSubject().GetType(), t.GetSubject().GetId(), srelation, xid, tenantID,
50✔
447
                )
50✔
448
        }
449
        return nil
16✔
450
}
451

452
// batchUpdateTuples function for batch updating tuples
453
func (w *DataWriter) batchUpdateRelationships(batch *pgx.Batch, xid types.XID8, tenantID string, deleteClauses []squirrel.Eq) error {
17✔
454
        for _, condition := range deleteClauses {
68✔
455
                query, args, err := w.database.Builder.Update(RelationTuplesTable).
51✔
456
                        Set("expired_tx_id", xid).
51✔
457
                        Where(squirrel.Eq{"expired_tx_id": "0", "tenant_id": tenantID}).
51✔
458
                        Where(condition).
51✔
459
                        ToSql()
51✔
460
                if err != nil {
51✔
461
                        return err
×
462
                }
×
463
                batch.Queue(query, args...)
51✔
464
        }
465
        return nil
17✔
466
}
467

468
// buildDeleteClauses function to build delete clauses for tuples
469
func buildDeleteClausesForRelationships(tupleCollection *database.TupleCollection) []squirrel.Eq {
17✔
470
        deleteClauses := make([]squirrel.Eq, 0)
17✔
471

17✔
472
        titer := tupleCollection.CreateTupleIterator()
17✔
473
        for titer.HasNext() {
68✔
474
                t := titer.GetNext()
51✔
475
                srelation := t.GetSubject().GetRelation()
51✔
476
                if srelation == tuple.ELLIPSIS {
51✔
477
                        srelation = ""
×
478
                }
×
479

480
                condition := squirrel.Eq{
51✔
481
                        "entity_type":      t.GetEntity().GetType(),
51✔
482
                        "entity_id":        t.GetEntity().GetId(),
51✔
483
                        "relation":         t.GetRelation(),
51✔
484
                        "subject_type":     t.GetSubject().GetType(),
51✔
485
                        "subject_id":       t.GetSubject().GetId(),
51✔
486
                        "subject_relation": srelation,
51✔
487
                }
51✔
488

51✔
489
                deleteClauses = append(deleteClauses, condition)
51✔
490
        }
491

492
        return deleteClauses
17✔
493
}
494

495
// batchInsertAttributes function for batch inserting attributes
496
func (w *DataWriter) batchInsertAttributes(batch *pgx.Batch, xid types.XID8, tenantID string, attributeCollection *database.AttributeCollection) error {
17✔
497
        m := jsonpb.Marshaler{}
17✔
498
        aiter := attributeCollection.CreateAttributeIterator()
17✔
499
        for aiter.HasNext() {
66✔
500
                a := aiter.GetNext()
49✔
501

49✔
502
                jsonStr, err := m.MarshalToString(a.GetValue())
49✔
503
                if err != nil {
49✔
504
                        return err
×
505
                }
×
506

507
                batch.Queue(
49✔
508
                        "INSERT INTO attributes (entity_type, entity_id, attribute, value, created_tx_id, tenant_id) VALUES ($1, $2, $3, $4, $5, $6)",
49✔
509
                        a.GetEntity().GetType(), a.GetEntity().GetId(), a.GetAttribute(), jsonStr, xid, tenantID,
49✔
510
                )
49✔
511
        }
512
        return nil
17✔
513
}
514

515
// batchUpdateAttributes function for batch updating attributes
516
func (w *DataWriter) batchUpdateAttributes(batch *pgx.Batch, xid types.XID8, tenantID string, deleteClauses []squirrel.Eq) error {
18✔
517
        for _, condition := range deleteClauses {
68✔
518
                query, args, err := w.database.Builder.Update(AttributesTable).
50✔
519
                        Set("expired_tx_id", xid).
50✔
520
                        Where(squirrel.Eq{"expired_tx_id": "0", "tenant_id": tenantID}).
50✔
521
                        Where(condition).
50✔
522
                        ToSql()
50✔
523
                if err != nil {
50✔
524
                        return err
×
525
                }
×
526

527
                batch.Queue(query, args...)
50✔
528
        }
529
        return nil
18✔
530
}
531

532
// buildDeleteClausesForAttributes function to build delete clauses for attributes
533
func buildDeleteClausesForAttributes(attributeCollection *database.AttributeCollection) []squirrel.Eq {
18✔
534
        deleteClauses := make([]squirrel.Eq, 0)
18✔
535

18✔
536
        aiter := attributeCollection.CreateAttributeIterator()
18✔
537
        for aiter.HasNext() {
68✔
538
                a := aiter.GetNext()
50✔
539

50✔
540
                condition := squirrel.Eq{
50✔
541
                        "entity_type": a.GetEntity().GetType(),
50✔
542
                        "entity_id":   a.GetEntity().GetId(),
50✔
543
                        "attribute":   a.GetAttribute(),
50✔
544
                }
50✔
545

50✔
546
                deleteClauses = append(deleteClauses, condition)
50✔
547
        }
50✔
548

549
        return deleteClauses
18✔
550
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc