• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Permify / permify / 11803394053

12 Nov 2024 06:23PM UTC coverage: 79.994% (-0.02%) from 80.014%
11803394053

push

github

web-flow
Merge pull request #1780 from Permify/dependabot/go_modules/go.opentelemetry.io/otel/exporters/zipkin-1.32.0

build(deps): bump go.opentelemetry.io/otel/exporters/zipkin from 1.31.0 to 1.32.0

8169 of 10212 relevant lines covered (79.99%)

121.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.72
/internal/storage/postgres/dataWriter.go
1
package postgres
2

3
import (
4
        "context"
5
        "errors"
6
        "log/slog"
7

8
        "github.com/jackc/pgx/v5"
9

10
        "github.com/Masterminds/squirrel"
11
        "github.com/golang/protobuf/jsonpb"
12
        "github.com/jackc/pgx/v5/pgconn"
13

14
        "github.com/Permify/permify/internal"
15
        "github.com/Permify/permify/internal/storage/postgres/snapshot"
16
        "github.com/Permify/permify/internal/storage/postgres/types"
17
        "github.com/Permify/permify/internal/storage/postgres/utils"
18
        "github.com/Permify/permify/internal/validation"
19
        "github.com/Permify/permify/pkg/bundle"
20
        "github.com/Permify/permify/pkg/database"
21
        db "github.com/Permify/permify/pkg/database/postgres"
22
        base "github.com/Permify/permify/pkg/pb/base/v1"
23
        "github.com/Permify/permify/pkg/token"
24
        "github.com/Permify/permify/pkg/tuple"
25
)
26

27
// DataWriter - Structure for Data Writer
28
type DataWriter struct {
29
        database *db.Postgres
30
        // options
31
        txOptions pgx.TxOptions
32
}
33

34
func NewDataWriter(database *db.Postgres) *DataWriter {
14✔
35
        return &DataWriter{
14✔
36
                database:  database,
14✔
37
                txOptions: pgx.TxOptions{IsoLevel: pgx.Serializable, AccessMode: pgx.ReadWrite},
14✔
38
        }
14✔
39
}
14✔
40

41
// Write method writes a collection of tuples and attributes to the database for a specific tenant.
42
// It returns an EncodedSnapToken upon successful write or an error if the write fails.
43
func (w *DataWriter) Write(
44
        ctx context.Context,
45
        tenantID string,
46
        tupleCollection *database.TupleCollection,
47
        attributeCollection *database.AttributeCollection,
48
) (token token.EncodedSnapToken, err error) {
20✔
49
        // Start a new tracing span for this operation.
20✔
50
        ctx, span := internal.Tracer.Start(ctx, "data-writer.write")
20✔
51
        defer span.End() // Ensure that the span is ended when the function returns.
20✔
52

20✔
53
        // Log the start of a data write operation.
20✔
54
        slog.DebugContext(ctx, "writing data for tenant_id", slog.String("tenant_id", tenantID), "max retries", slog.Any("max_retries", w.database.GetMaxRetries()))
20✔
55

20✔
56
        // Check if the total number of tuples and attributes exceeds the maximum allowed per write.
20✔
57
        if len(tupleCollection.GetTuples())+len(attributeCollection.GetAttributes()) > w.database.GetMaxDataPerWrite() {
20✔
58
                return nil, errors.New(base.ErrorCode_ERROR_CODE_MAX_DATA_PER_WRITE_EXCEEDED.String())
×
59
        }
×
60

61
        // Retry loop for handling transient errors like serialization issues.
62
        for i := 0; i <= w.database.GetMaxRetries(); i++ {
40✔
63
                // Attempt to write the data to the database.
20✔
64
                tkn, err := w.write(ctx, tenantID, tupleCollection, attributeCollection)
20✔
65
                if err != nil {
20✔
66
                        // Check if the error is due to serialization, and if so, retry.
×
67
                        if utils.IsSerializationRelatedError(err) || pgconn.SafeToRetry(err) {
×
68
                                slog.WarnContext(ctx, "serialization error occurred", slog.String("tenant_id", tenantID), slog.Int("retry", i))
×
69
                                utils.WaitWithBackoff(ctx, tenantID, i)
×
70
                                continue // Retry the operation.
×
71
                        }
72
                        // If the error is not serialization-related, handle it and return.
73
                        return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_DATASTORE)
×
74
                }
75
                // If to write is successful, return the token.
76
                return tkn, nil
20✔
77
        }
78

79
        // Log an error if the operation failed after reaching the maximum number of retries.
80
        slog.ErrorContext(ctx, "max retries reached", slog.Any("error", errors.New(base.ErrorCode_ERROR_CODE_ERROR_MAX_RETRIES.String())))
×
81

×
82
        // Return an error indicating that the maximum number of retries has been reached.
×
83
        return nil, errors.New(base.ErrorCode_ERROR_CODE_ERROR_MAX_RETRIES.String())
×
84
}
85

86
// Delete method removes data from the database based on the provided tuple and attribute filters.
87
// It returns an EncodedSnapToken upon successful deletion or an error if the deletion fails.
88
func (w *DataWriter) Delete(
89
        ctx context.Context,
90
        tenantID string,
91
        tupleFilter *base.TupleFilter,
92
        attributeFilter *base.AttributeFilter,
93
) (token.EncodedSnapToken, error) {
5✔
94
        // Start a new tracing span for this delete operation.
5✔
95
        ctx, span := internal.Tracer.Start(ctx, "data-writer.delete")
5✔
96
        defer span.End() // Ensure that the span is ended when the function returns.
5✔
97

5✔
98
        // Log the start of a data deletion operation.
5✔
99
        slog.DebugContext(ctx, "deleting data for tenant_id", slog.String("tenant_id", tenantID), "max retries", slog.Any("max_retries", w.database.GetMaxRetries()))
5✔
100

5✔
101
        // Retry loop for handling transient errors like serialization issues.
5✔
102
        for i := 0; i <= w.database.GetMaxRetries(); i++ {
10✔
103
                // Attempt to delete the data from the database.
5✔
104
                tkn, err := w.delete(ctx, tenantID, tupleFilter, attributeFilter)
5✔
105
                if err != nil {
5✔
106
                        // Check if the error is due to serialization, and if so, retry.
×
107
                        if utils.IsSerializationRelatedError(err) || pgconn.SafeToRetry(err) {
×
108
                                slog.WarnContext(ctx, "serialization error occurred", slog.String("tenant_id", tenantID), slog.Int("retry", i))
×
109
                                utils.WaitWithBackoff(ctx, tenantID, i)
×
110
                                continue // Retry the operation.
×
111
                        }
112
                        // If the error is not serialization-related, handle it and return.
113
                        return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_DATASTORE)
×
114
                }
115
                // If the delete operation is successful, return the token.
116
                return tkn, nil
5✔
117
        }
118

119
        // Log an error if the operation failed after reaching the maximum number of retries.
120
        slog.DebugContext(ctx, "max retries reached", slog.Any("error", errors.New(base.ErrorCode_ERROR_CODE_ERROR_MAX_RETRIES.String())))
×
121

×
122
        // Return an error indicating that the maximum number of retries has been reached.
×
123
        return nil, errors.New(base.ErrorCode_ERROR_CODE_ERROR_MAX_RETRIES.String())
×
124
}
125

126
// RunBundle executes a bundle of operations in the context of a given tenant.
127
// It returns an EncodedSnapToken upon successful completion or an error if the operation fails.
128
func (w *DataWriter) RunBundle(
129
        ctx context.Context,
130
        tenantID string,
131
        arguments map[string]string,
132
        b *base.DataBundle,
133
) (token.EncodedSnapToken, error) {
1✔
134
        // Start a new tracing span for this operation.
1✔
135
        ctx, span := internal.Tracer.Start(ctx, "data-writer.run-bundle")
1✔
136
        defer span.End() // Ensure that the span is ended when the function returns.
1✔
137

1✔
138
        // Log the start of running a bundle operation.
1✔
139
        slog.DebugContext(ctx, "running bundle for tenant_id", slog.String("tenant_id", tenantID), "max retries", slog.Any("max_retries", w.database.GetMaxRetries()))
1✔
140

1✔
141
        // Retry loop for handling transient errors like serialization issues.
1✔
142
        for i := 0; i <= w.database.GetMaxRetries(); i++ {
2✔
143
                // Attempt to run the bundle operation.
1✔
144
                tkn, err := w.runBundle(ctx, tenantID, arguments, b)
1✔
145
                if err != nil {
1✔
146
                        // Check if the error is due to serialization, and if so, retry.
×
147
                        if utils.IsSerializationRelatedError(err) || pgconn.SafeToRetry(err) {
×
148
                                slog.WarnContext(ctx, "serialization error occurred", slog.String("tenant_id", tenantID), slog.Int("retry", i))
×
149
                                utils.WaitWithBackoff(ctx, tenantID, i)
×
150
                                continue // Retry the operation.
×
151
                        }
152
                        // If the error is not serialization-related, handle it and return.
153
                        return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_DATASTORE)
×
154
                }
155
                // If the operation is successful, return the token.
156
                return tkn, nil
1✔
157
        }
158

159
        // Log an error if the operation failed after reaching the maximum number of retries.
160
        slog.ErrorContext(ctx, "max retries reached", slog.Any("error", errors.New(base.ErrorCode_ERROR_CODE_ERROR_MAX_RETRIES.String())))
×
161

×
162
        // Return an error indicating that the maximum number of retries has been reached.
×
163
        return nil, errors.New(base.ErrorCode_ERROR_CODE_ERROR_MAX_RETRIES.String())
×
164
}
165

166
// write handles the database writing of tuple and attribute collections for a given tenant.
167
// It returns an EncodedSnapToken upon successful write or an error if the write fails.
168
func (w *DataWriter) write(
169
        ctx context.Context,
170
        tenantID string,
171
        tupleCollection *database.TupleCollection,
172
        attributeCollection *database.AttributeCollection,
173
) (token token.EncodedSnapToken, err error) {
20✔
174
        var tx pgx.Tx
20✔
175
        tx, err = w.database.WritePool.BeginTx(ctx, w.txOptions)
20✔
176
        if err != nil {
20✔
177
                return nil, err
×
178
        }
×
179

180
        defer func() {
40✔
181
                _ = tx.Rollback(ctx)
20✔
182
        }()
20✔
183

184
        var xid types.XID8
20✔
185
        err = tx.QueryRow(ctx, utils.TransactionTemplate, tenantID).Scan(&xid)
20✔
186
        if err != nil {
20✔
187
                return nil, err
×
188
        }
×
189

190
        slog.DebugContext(ctx, "retrieved transaction", slog.Any("xid", xid), "for tenant", slog.Any("tenant_id", tenantID))
20✔
191

20✔
192
        slog.DebugContext(ctx, "processing tuples and executing insert query")
20✔
193

20✔
194
        batch := &pgx.Batch{}
20✔
195

20✔
196
        if len(tupleCollection.GetTuples()) > 0 {
34✔
197
                err = w.batchUpdateRelationships(batch, xid, tenantID, buildDeleteClausesForRelationships(tupleCollection))
14✔
198
                if err != nil {
14✔
199
                        return nil, err
×
200
                }
×
201
                err = w.batchInsertRelationships(batch, xid, tenantID, tupleCollection)
14✔
202
                if err != nil {
14✔
203
                        return nil, err
×
204
                }
×
205
        }
206

207
        if len(attributeCollection.GetAttributes()) > 0 {
35✔
208
                err = w.batchUpdateAttributes(batch, xid, tenantID, buildDeleteClausesForAttributes(attributeCollection))
15✔
209
                if err != nil {
15✔
210
                        return nil, err
×
211
                }
×
212
                err = w.batchInsertAttributes(batch, xid, tenantID, attributeCollection)
15✔
213
                if err != nil {
15✔
214
                        return nil, err
×
215
                }
×
216
        }
217

218
        batchResult := tx.SendBatch(ctx, batch)
20✔
219
        for i := 0; i < batch.Len(); i++ {
182✔
220
                _, err = batchResult.Exec()
162✔
221
                if err != nil {
162✔
222
                        err = batchResult.Close()
×
223
                        if err != nil {
×
224
                                return nil, err
×
225
                        }
×
226
                        return nil, err
×
227
                }
228
        }
229

230
        err = batchResult.Close()
20✔
231
        if err != nil {
20✔
232
                return nil, err
×
233
        }
×
234

235
        if err = tx.Commit(ctx); err != nil {
20✔
236
                return nil, err
×
237
        }
×
238

239
        slog.DebugContext(ctx, "data successfully written to the database")
20✔
240

20✔
241
        return snapshot.NewToken(xid).Encode(), nil
20✔
242
}
243

244
// delete handles the deletion of tuples and attributes from the database based on provided filters.
245
// It returns an EncodedSnapToken upon successful deletion or an error if the deletion fails.
246
func (w *DataWriter) delete(
247
        ctx context.Context,
248
        tenantID string,
249
        tupleFilter *base.TupleFilter,
250
        attributeFilter *base.AttributeFilter,
251
) (token token.EncodedSnapToken, err error) {
5✔
252
        var tx pgx.Tx
5✔
253
        tx, err = w.database.WritePool.BeginTx(ctx, w.txOptions)
5✔
254
        if err != nil {
5✔
255
                return nil, err
×
256
        }
×
257

258
        defer func() {
10✔
259
                _ = tx.Rollback(ctx)
5✔
260
        }()
5✔
261

262
        var xid types.XID8
5✔
263
        err = tx.QueryRow(ctx, utils.TransactionTemplate, tenantID).Scan(&xid)
5✔
264
        if err != nil {
5✔
265
                return nil, err
×
266
        }
×
267

268
        slog.DebugContext(ctx, "retrieved transaction", slog.Any("xid", xid), "for tenant", slog.Any("tenant_id", tenantID))
5✔
269

5✔
270
        slog.DebugContext(ctx, "processing tuple and executing update query")
5✔
271

5✔
272
        if !validation.IsTupleFilterEmpty(tupleFilter) {
8✔
273
                tbuilder := w.database.Builder.Update(RelationTuplesTable).Set("expired_tx_id", xid).Where(squirrel.Eq{"expired_tx_id": "0", "tenant_id": tenantID})
3✔
274
                tbuilder = utils.TuplesFilterQueryForUpdateBuilder(tbuilder, tupleFilter)
3✔
275

3✔
276
                var tquery string
3✔
277
                var targs []interface{}
3✔
278

3✔
279
                tquery, targs, err = tbuilder.ToSql()
3✔
280
                if err != nil {
3✔
281
                        return nil, err
×
282
                }
×
283

284
                _, err = tx.Exec(ctx, tquery, targs...)
3✔
285
                if err != nil {
3✔
286
                        return nil, err
×
287
                }
×
288
        }
289

290
        slog.DebugContext(ctx, "processing attribute and executing update query")
5✔
291

5✔
292
        if !validation.IsAttributeFilterEmpty(attributeFilter) {
9✔
293
                abuilder := w.database.Builder.Update(AttributesTable).Set("expired_tx_id", xid).Where(squirrel.Eq{"expired_tx_id": "0", "tenant_id": tenantID})
4✔
294
                abuilder = utils.AttributesFilterQueryForUpdateBuilder(abuilder, attributeFilter)
4✔
295

4✔
296
                var aquery string
4✔
297
                var aargs []interface{}
4✔
298

4✔
299
                aquery, aargs, err = abuilder.ToSql()
4✔
300
                if err != nil {
4✔
301
                        return nil, err
×
302
                }
×
303

304
                _, err = tx.Exec(ctx, aquery, aargs...)
4✔
305
                if err != nil {
4✔
306
                        return nil, err
×
307
                }
×
308
        }
309

310
        if err = tx.Commit(ctx); err != nil {
5✔
311
                return nil, err
×
312
        }
×
313

314
        slog.DebugContext(ctx, "data successfully deleted from the database")
5✔
315

5✔
316
        return snapshot.NewToken(xid).Encode(), nil
5✔
317
}
318

319
// runBundle executes a series of operations defined in a DataBundle within a single database transaction.
320
// It returns an EncodedSnapToken upon successful execution of all operations or an error if any operation fails.
321
func (w *DataWriter) runBundle(
322
        ctx context.Context,
323
        tenantID string,
324
        arguments map[string]string,
325
        b *base.DataBundle,
326
) (token token.EncodedSnapToken, err error) {
1✔
327
        var tx pgx.Tx
1✔
328
        tx, err = w.database.WritePool.BeginTx(ctx, w.txOptions)
1✔
329
        if err != nil {
1✔
330
                return nil, err
×
331
        }
×
332

333
        defer func() {
2✔
334
                _ = tx.Rollback(ctx)
1✔
335
        }()
1✔
336

337
        var xid types.XID8
1✔
338
        err = tx.QueryRow(ctx, utils.TransactionTemplate, tenantID).Scan(&xid)
1✔
339
        if err != nil {
1✔
340
                return nil, err
×
341
        }
×
342

343
        slog.DebugContext(ctx, "retrieved transaction", slog.Any("xid", xid), "for tenant", slog.Any("tenant_id", tenantID))
1✔
344

1✔
345
        batch := &pgx.Batch{}
1✔
346

1✔
347
        for _, op := range b.GetOperations() {
2✔
348
                tb, ab, err := bundle.Operation(arguments, op)
1✔
349
                if err != nil {
1✔
350
                        return nil, err
×
351
                }
×
352

353
                err = w.runOperation(batch, xid, tenantID, tb, ab)
1✔
354
                if err != nil {
1✔
355
                        return nil, err
×
356
                }
×
357
        }
358

359
        batchResult := tx.SendBatch(ctx, batch)
1✔
360
        for i := 0; i < batch.Len(); i++ {
13✔
361
                _, err = batchResult.Exec()
12✔
362
                if err != nil {
12✔
363
                        err = batchResult.Close()
×
364
                        if err != nil {
×
365
                                return nil, err
×
366
                        }
×
367
                        return nil, err
×
368
                }
369
        }
370

371
        err = batchResult.Close()
1✔
372
        if err != nil {
1✔
373
                return nil, err
×
374
        }
×
375

376
        if err = tx.Commit(ctx); err != nil {
1✔
377
                return nil, err
×
378
        }
×
379

380
        return snapshot.NewToken(xid).Encode(), nil
1✔
381
}
382

383
// runOperation processes and executes database operations defined in TupleBundle and AttributeBundle within a given transaction.
384
func (w *DataWriter) runOperation(
385
        batch *pgx.Batch,
386
        xid types.XID8,
387
        tenantID string,
388
        tb database.TupleBundle,
389
        ab database.AttributeBundle,
390
) (err error) {
1✔
391
        slog.Debug("processing bundles queries")
1✔
392
        if len(tb.Write.GetTuples()) > 0 {
2✔
393
                deleteClauses := buildDeleteClausesForRelationships(&tb.Write)
1✔
394
                err = w.batchUpdateRelationships(batch, xid, tenantID, deleteClauses)
1✔
395
                if err != nil {
1✔
396
                        return err
×
397
                }
×
398
                err = w.batchInsertRelationships(batch, xid, tenantID, &tb.Write)
1✔
399
                if err != nil {
1✔
400
                        return err
×
401
                }
×
402
        }
403

404
        if len(ab.Write.GetAttributes()) > 0 {
2✔
405
                deleteClauses := buildDeleteClausesForAttributes(&ab.Write)
1✔
406
                err = w.batchUpdateAttributes(batch, xid, tenantID, deleteClauses)
1✔
407
                if err != nil {
1✔
408
                        return err
×
409
                }
×
410

411
                err = w.batchInsertAttributes(batch, xid, tenantID, &ab.Write)
1✔
412
                if err != nil {
1✔
413
                        return err
×
414
                }
×
415
        }
416

417
        if len(tb.Delete.GetTuples()) > 0 {
2✔
418
                deleteClauses := buildDeleteClausesForRelationships(&tb.Delete)
1✔
419
                err = w.batchUpdateRelationships(batch, xid, tenantID, deleteClauses)
1✔
420
                if err != nil {
1✔
421
                        return err
×
422
                }
×
423
        }
424

425
        if len(ab.Delete.GetAttributes()) > 0 {
2✔
426
                deleteClauses := buildDeleteClausesForAttributes(&ab.Delete)
1✔
427
                err = w.batchUpdateAttributes(batch, xid, tenantID, deleteClauses)
1✔
428
                if err != nil {
1✔
429
                        return err
×
430
                }
×
431
        }
432

433
        return nil
1✔
434
}
435

436
// batchInsertTuples function for batch inserting tuples
437
func (w *DataWriter) batchInsertRelationships(batch *pgx.Batch, xid types.XID8, tenantID string, tupleCollection *database.TupleCollection) error {
15✔
438
        titer := tupleCollection.CreateTupleIterator()
15✔
439
        for titer.HasNext() {
59✔
440
                t := titer.GetNext()
44✔
441
                srelation := t.GetSubject().GetRelation()
44✔
442
                if srelation == tuple.ELLIPSIS {
44✔
443
                        srelation = ""
×
444
                }
×
445
                batch.Queue(
44✔
446
                        "INSERT INTO relation_tuples (entity_type, entity_id, relation, subject_type, subject_id, subject_relation, created_tx_id, tenant_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
44✔
447
                        t.GetEntity().GetType(), t.GetEntity().GetId(), t.GetRelation(), t.GetSubject().GetType(), t.GetSubject().GetId(), srelation, xid, tenantID,
44✔
448
                )
44✔
449
        }
450
        return nil
15✔
451
}
452

453
// batchUpdateTuples function for batch updating tuples
454
func (w *DataWriter) batchUpdateRelationships(batch *pgx.Batch, xid types.XID8, tenantID string, deleteClauses []squirrel.Eq) error {
16✔
455
        for _, condition := range deleteClauses {
61✔
456
                query, args, err := w.database.Builder.Update(RelationTuplesTable).
45✔
457
                        Set("expired_tx_id", xid).
45✔
458
                        Where(squirrel.Eq{"expired_tx_id": "0", "tenant_id": tenantID}).
45✔
459
                        Where(condition).
45✔
460
                        ToSql()
45✔
461
                if err != nil {
45✔
462
                        return err
×
463
                }
×
464
                batch.Queue(query, args...)
45✔
465
        }
466
        return nil
16✔
467
}
468

469
// buildDeleteClauses function to build delete clauses for tuples
470
func buildDeleteClausesForRelationships(tupleCollection *database.TupleCollection) []squirrel.Eq {
16✔
471
        deleteClauses := make([]squirrel.Eq, 0)
16✔
472

16✔
473
        titer := tupleCollection.CreateTupleIterator()
16✔
474
        for titer.HasNext() {
61✔
475
                t := titer.GetNext()
45✔
476
                srelation := t.GetSubject().GetRelation()
45✔
477
                if srelation == tuple.ELLIPSIS {
45✔
478
                        srelation = ""
×
479
                }
×
480

481
                condition := squirrel.Eq{
45✔
482
                        "entity_type":      t.GetEntity().GetType(),
45✔
483
                        "entity_id":        t.GetEntity().GetId(),
45✔
484
                        "relation":         t.GetRelation(),
45✔
485
                        "subject_type":     t.GetSubject().GetType(),
45✔
486
                        "subject_id":       t.GetSubject().GetId(),
45✔
487
                        "subject_relation": srelation,
45✔
488
                }
45✔
489

45✔
490
                deleteClauses = append(deleteClauses, condition)
45✔
491
        }
492

493
        return deleteClauses
16✔
494
}
495

496
// batchInsertAttributes function for batch inserting attributes
497
func (w *DataWriter) batchInsertAttributes(batch *pgx.Batch, xid types.XID8, tenantID string, attributeCollection *database.AttributeCollection) error {
16✔
498
        m := jsonpb.Marshaler{}
16✔
499
        aiter := attributeCollection.CreateAttributeIterator()
16✔
500
        for aiter.HasNext() {
58✔
501
                a := aiter.GetNext()
42✔
502

42✔
503
                jsonStr, err := m.MarshalToString(a.GetValue())
42✔
504
                if err != nil {
42✔
505
                        return err
×
506
                }
×
507

508
                batch.Queue(
42✔
509
                        "INSERT INTO attributes (entity_type, entity_id, attribute, value, created_tx_id, tenant_id) VALUES ($1, $2, $3, $4, $5, $6)",
42✔
510
                        a.GetEntity().GetType(), a.GetEntity().GetId(), a.GetAttribute(), jsonStr, xid, tenantID,
42✔
511
                )
42✔
512
        }
513
        return nil
16✔
514
}
515

516
// batchUpdateAttributes function for batch updating attributes
517
func (w *DataWriter) batchUpdateAttributes(batch *pgx.Batch, xid types.XID8, tenantID string, deleteClauses []squirrel.Eq) error {
17✔
518
        for _, condition := range deleteClauses {
60✔
519
                query, args, err := w.database.Builder.Update(AttributesTable).
43✔
520
                        Set("expired_tx_id", xid).
43✔
521
                        Where(squirrel.Eq{"expired_tx_id": "0", "tenant_id": tenantID}).
43✔
522
                        Where(condition).
43✔
523
                        ToSql()
43✔
524
                if err != nil {
43✔
525
                        return err
×
526
                }
×
527

528
                batch.Queue(query, args...)
43✔
529
        }
530
        return nil
17✔
531
}
532

533
// buildDeleteClausesForAttributes function to build delete clauses for attributes
534
func buildDeleteClausesForAttributes(attributeCollection *database.AttributeCollection) []squirrel.Eq {
17✔
535
        deleteClauses := make([]squirrel.Eq, 0)
17✔
536

17✔
537
        aiter := attributeCollection.CreateAttributeIterator()
17✔
538
        for aiter.HasNext() {
60✔
539
                a := aiter.GetNext()
43✔
540

43✔
541
                condition := squirrel.Eq{
43✔
542
                        "entity_type": a.GetEntity().GetType(),
43✔
543
                        "entity_id":   a.GetEntity().GetId(),
43✔
544
                        "attribute":   a.GetAttribute(),
43✔
545
                }
43✔
546

43✔
547
                deleteClauses = append(deleteClauses, condition)
43✔
548
        }
43✔
549

550
        return deleteClauses
17✔
551
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc