• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ory / keto / 14309294778

07 Apr 2025 12:33PM UTC coverage: 51.121% (-26.7%) from 77.849%
14309294778

push

github

ory-bot
autogen: update license overview

3854 of 7539 relevant lines covered (51.12%)

0.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.78
/internal/persistence/sql/relationtuples.go
1
// Copyright © 2023 Ory Corp
2
// SPDX-License-Identifier: Apache-2.0
3

4
package sql
5

6
import (
7
        "context"
8
        "database/sql"
9
        "fmt"
10
        "slices"
11
        "strings"
12
        "time"
13

14
        "github.com/gobuffalo/pop/v6"
15
        "github.com/gofrs/uuid"
16
        "github.com/ory/x/otelx"
17
        "github.com/ory/x/sqlcon"
18
        "github.com/pkg/errors"
19
        "go.opentelemetry.io/otel/attribute"
20
        "go.opentelemetry.io/otel/trace"
21

22
        "github.com/ory/keto/internal/relationtuple"
23
        "github.com/ory/keto/internal/x"
24
        "github.com/ory/keto/ketoapi"
25
)
26

27
// Typical database limits for placeholders/bind vars are 1<<15 (32k, MySQL, SQLite) and 1<<16 (64k, PostgreSQL, CockroachDB).
28
const (
29
        chunkSizeInsertUUIDMappings = 15000 // two placeholders per mapping
30
        chunkSizeInsertTuple        = 3000  // ten placeholders per tuple
31
        chunkSizeDeleteTuple        = 100   // the database must build an expression tree for each chunk, so we must limit more aggressively
32
)
33

34
type (
35
        RelationTuple struct {
36
                // An ID field is required to make pop happy. The actual ID is a composite primary key.
37
                ID                  uuid.UUID      `db:"shard_id"`
38
                NetworkID           uuid.UUID      `db:"nid"`
39
                Namespace           string         `db:"namespace"`
40
                Object              uuid.UUID      `db:"object"`
41
                Relation            string         `db:"relation"`
42
                SubjectID           uuid.NullUUID  `db:"subject_id"`
43
                SubjectSetNamespace sql.NullString `db:"subject_set_namespace"`
44
                SubjectSetObject    uuid.NullUUID  `db:"subject_set_object"`
45
                SubjectSetRelation  sql.NullString `db:"subject_set_relation"`
46
                CommitTime          time.Time      `db:"commit_time"`
47
        }
48
        relationTuples []*RelationTuple
49
)
50

51
func (relationTuples) TableName() string {
1✔
52
        return "keto_relation_tuples"
1✔
53
}
1✔
54

55
func (*RelationTuple) TableName() string {
1✔
56
        return "keto_relation_tuples"
1✔
57
}
1✔
58

59
func (r *RelationTuple) ToInternal() (*relationtuple.RelationTuple, error) {
1✔
60
        if r == nil {
1✔
61
                return nil, nil
×
62
        }
×
63

64
        rt := &relationtuple.RelationTuple{
1✔
65
                Relation:  r.Relation,
1✔
66
                Object:    r.Object,
1✔
67
                Namespace: r.Namespace,
1✔
68
        }
1✔
69

1✔
70
        if r.SubjectID.Valid {
2✔
71
                rt.Subject = &relationtuple.SubjectID{
1✔
72
                        ID: r.SubjectID.UUID,
1✔
73
                }
1✔
74
        } else {
2✔
75
                rt.Subject = &relationtuple.SubjectSet{
1✔
76
                        Namespace: r.SubjectSetNamespace.String,
1✔
77
                        Object:    r.SubjectSetObject.UUID,
1✔
78
                        Relation:  r.SubjectSetRelation.String,
1✔
79
                }
1✔
80
        }
1✔
81

82
        return rt, nil
1✔
83
}
84

85
func (r *RelationTuple) insertSubject(s relationtuple.Subject) error {
1✔
86
        switch st := s.(type) {
1✔
87
        case *relationtuple.SubjectID:
1✔
88
                r.SubjectID = uuid.NullUUID{
1✔
89
                        UUID:  st.ID,
1✔
90
                        Valid: true,
1✔
91
                }
1✔
92
                r.SubjectSetNamespace = sql.NullString{}
1✔
93
                r.SubjectSetObject = uuid.NullUUID{}
1✔
94
                r.SubjectSetRelation = sql.NullString{}
1✔
95
        case *relationtuple.SubjectSet:
1✔
96
                r.SubjectID = uuid.NullUUID{}
1✔
97
                _ = r.SubjectSetNamespace.Scan(st.Namespace)
1✔
98
                _ = r.SubjectSetObject.Scan(st.Object)
1✔
99
                _ = r.SubjectSetRelation.Scan(st.Relation)
1✔
100
        }
101
        return nil
1✔
102
}
103

104
func (r *RelationTuple) FromInternal(rt *relationtuple.RelationTuple) (err error) {
1✔
105
        r.Namespace = rt.Namespace
1✔
106
        r.Object = rt.Object
1✔
107
        r.Relation = rt.Relation
1✔
108

1✔
109
        return r.insertSubject(rt.Subject)
1✔
110
}
1✔
111

112
func (p *Persister) whereSubject(_ context.Context, q *pop.Query, sub relationtuple.Subject) error {
1✔
113
        switch s := sub.(type) {
1✔
114
        case *relationtuple.SubjectID:
1✔
115
                q.
1✔
116
                        Where("subject_id = ?", s.ID).
1✔
117
                        // NULL checks to leverage partial indexes
1✔
118
                        Where("subject_set_namespace IS NULL").
1✔
119
                        Where("subject_set_object IS NULL").
1✔
120
                        Where("subject_set_relation IS NULL")
1✔
121
        case *relationtuple.SubjectSet:
×
122
                q.
×
123
                        Where("subject_set_namespace = ?", s.Namespace).
×
124
                        Where("subject_set_object = ?", s.Object).
×
125
                        Where("subject_set_relation = ?", s.Relation).
×
126
                        // NULL checks to leverage partial indexes
×
127
                        Where("subject_id IS NULL")
×
128
        case nil:
×
129
                return errors.WithStack(ketoapi.ErrNilSubject)
×
130
        }
131
        return nil
1✔
132
}
133

134
func (p *Persister) whereQuery(ctx context.Context, q *pop.Query, rq *relationtuple.RelationQuery) error {
1✔
135
        if rq.Namespace != nil {
2✔
136
                q.Where("namespace = ?", rq.Namespace)
1✔
137
        }
1✔
138
        if rq.Object != nil {
2✔
139
                q.Where("object = ?", rq.Object)
1✔
140
        }
1✔
141
        if rq.Relation != nil {
2✔
142
                q.Where("relation = ?", rq.Relation)
1✔
143
        }
1✔
144
        if s := rq.Subject; s != nil {
2✔
145
                if err := p.whereSubject(ctx, q, s); err != nil {
1✔
146
                        return err
×
147
                }
×
148
        }
149
        return nil
1✔
150
}
151

152
func buildDelete(nid uuid.UUID, rs []*relationtuple.RelationTuple) (query string, args []any, err error) {
1✔
153
        if len(rs) == 0 {
2✔
154
                return "", nil, errors.WithStack(ketoapi.ErrMalformedInput)
1✔
155
        }
1✔
156

157
        args = make([]any, 0, 6*len(rs)+1)
1✔
158
        ors := make([]string, 0, len(rs))
1✔
159
        for _, rt := range rs {
2✔
160
                switch s := rt.Subject.(type) {
1✔
161
                case *relationtuple.SubjectID:
1✔
162
                        ors = append(ors, "(namespace = ? AND object = ? AND relation = ? AND subject_id = ? AND subject_set_namespace IS NULL AND subject_set_object IS NULL AND subject_set_relation IS NULL)")
1✔
163
                        args = append(args, rt.Namespace, rt.Object, rt.Relation, s.ID)
1✔
164
                case *relationtuple.SubjectSet:
1✔
165
                        ors = append(ors, "(namespace = ? AND object = ? AND relation = ? AND subject_id IS NULL AND subject_set_namespace = ? AND subject_set_object = ? AND subject_set_relation = ?)")
1✔
166
                        args = append(args, rt.Namespace, rt.Object, rt.Relation, s.Namespace, s.Object, s.Relation)
1✔
167
                case nil:
1✔
168
                        return "", nil, errors.WithStack(ketoapi.ErrNilSubject)
1✔
169
                }
170
        }
171

172
        query = fmt.Sprintf("DELETE FROM %s WHERE (%s) AND nid = ?", (&RelationTuple{}).TableName(), strings.Join(ors, " OR "))
1✔
173
        args = append(args, nid)
1✔
174
        return query, args, nil
1✔
175
}
176

177
func (p *Persister) DeleteRelationTuples(ctx context.Context, rs ...*relationtuple.RelationTuple) (err error) {
1✔
178
        if len(rs) == 0 {
2✔
179
                return nil
1✔
180
        }
1✔
181

182
        ctx, span := p.d.Tracer(ctx).Tracer().Start(ctx, "persistence.sql.DeleteRelationTuples",
1✔
183
                trace.WithAttributes(attribute.Int("count", len(rs))))
1✔
184
        defer otelx.End(span, &err)
1✔
185

1✔
186
        return p.Transaction(ctx, func(ctx context.Context) error {
2✔
187
                for chunk := range slices.Chunk(rs, chunkSizeDeleteTuple) {
2✔
188
                        q, args, err := buildDelete(p.NetworkID(ctx), chunk)
1✔
189
                        if err != nil {
2✔
190
                                return err
1✔
191
                        }
1✔
192
                        if q == "" {
1✔
193
                                continue
×
194
                        }
195
                        if err := p.Connection(ctx).RawQuery(q, args...).Exec(); err != nil {
1✔
196
                                return sqlcon.HandleError(err)
×
197
                        }
×
198
                }
199
                return nil
1✔
200
        })
201
}
202

203
func (p *Persister) DeleteAllRelationTuples(ctx context.Context, query *relationtuple.RelationQuery) (err error) {
×
204
        ctx, span := p.d.Tracer(ctx).Tracer().Start(ctx, "persistence.sql.DeleteAllRelationTuples")
×
205
        defer otelx.End(span, &err)
×
206

×
207
        return p.Transaction(ctx, func(ctx context.Context) error {
×
208
                sqlQuery := p.queryWithNetwork(ctx)
×
209
                err := p.whereQuery(ctx, sqlQuery, query)
×
210
                if err != nil {
×
211
                        return err
×
212
                }
×
213

214
                var res relationTuples
×
215
                return sqlQuery.Delete(&res)
×
216
        })
217
}
218

219
func (p *Persister) GetRelationTuples(ctx context.Context, query *relationtuple.RelationQuery, options ...x.PaginationOptionSetter) (_ []*relationtuple.RelationTuple, nextPageToken string, err error) {
1✔
220
        ctx, span := p.d.Tracer(ctx).Tracer().Start(ctx, "persistence.sql.GetRelationTuples")
1✔
221
        defer otelx.End(span, &err)
1✔
222

1✔
223
        pagination, err := internalPaginationFromOptions(options...)
1✔
224
        if err != nil {
1✔
225
                return nil, "", err
×
226
        }
×
227

228
        sqlQuery := p.queryWithNetwork(ctx).
1✔
229
                Order("shard_id").
1✔
230
                Where("shard_id > ?", pagination.LastID).
1✔
231
                Limit(pagination.PerPage + 1)
1✔
232

1✔
233
        err = p.whereQuery(ctx, sqlQuery, query)
1✔
234
        if err != nil {
1✔
235
                return nil, "", err
×
236
        }
×
237
        var res relationTuples
1✔
238
        if err := sqlQuery.All(&res); err != nil {
2✔
239
                return nil, "", sqlcon.HandleError(err)
1✔
240
        }
1✔
241
        if len(res) == 0 {
2✔
242
                return make([]*relationtuple.RelationTuple, 0), "", nil
1✔
243
        }
1✔
244

245
        if len(res) > pagination.PerPage {
2✔
246
                res = res[:len(res)-1]
1✔
247
                nextPageToken = pagination.encodeNextPageToken(res[len(res)-1].ID)
1✔
248
        }
1✔
249

250
        internalRes := make([]*relationtuple.RelationTuple, 0, len(res))
1✔
251
        for _, r := range res {
2✔
252
                if rt, err := r.ToInternal(); err == nil {
2✔
253
                        // Ignore error here, which stems from a deleted namespace.
1✔
254
                        internalRes = append(internalRes, rt)
1✔
255
                }
1✔
256
        }
257

258
        return internalRes, nextPageToken, nil
1✔
259
}
260

261
func (p *Persister) ExistsRelationTuples(ctx context.Context, query *relationtuple.RelationQuery) (_ bool, err error) {
×
262
        ctx, span := p.d.Tracer(ctx).Tracer().Start(ctx, "persistence.sql.ExistsRelationTuples")
×
263
        defer otelx.End(span, &err)
×
264

×
265
        sqlQuery := p.queryWithNetwork(ctx)
×
266

×
267
        err = p.whereQuery(ctx, sqlQuery, query)
×
268
        if err != nil {
×
269
                return false, err
×
270
        }
×
271
        exists, err := sqlQuery.Exists(&RelationTuple{})
×
272
        return exists, sqlcon.HandleError(err)
×
273
}
274

275
func buildInsert(commitTime time.Time, nid uuid.UUID, rs []*relationtuple.RelationTuple) (query string, args []any, err error) {
1✔
276
        if len(rs) == 0 {
2✔
277
                return "", nil, errors.WithStack(ketoapi.ErrMalformedInput)
1✔
278
        }
1✔
279

280
        var q strings.Builder
1✔
281
        fmt.Fprintf(&q, "INSERT INTO %s (shard_id, nid, namespace, object, relation, subject_id, subject_set_namespace, subject_set_object, subject_set_relation, commit_time) VALUES ", (&RelationTuple{}).TableName())
1✔
282
        const placeholders = "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
1✔
283
        const separator = ", "
1✔
284
        q.Grow(len(rs) * (len(placeholders) + len(separator)))
1✔
285
        args = make([]any, 0, 10*len(rs))
1✔
286

1✔
287
        for i, r := range rs {
2✔
288
                if r.Subject == nil {
2✔
289
                        return "", nil, errors.WithStack(ketoapi.ErrNilSubject)
1✔
290
                }
1✔
291

292
                rt := &RelationTuple{
1✔
293
                        ID:         uuid.Must(uuid.NewV4()),
1✔
294
                        NetworkID:  nid,
1✔
295
                        CommitTime: commitTime,
1✔
296
                }
1✔
297
                if err := rt.FromInternal(r); err != nil {
1✔
298
                        return "", nil, err
×
299
                }
×
300

301
                if i > 0 {
2✔
302
                        q.WriteString(separator)
1✔
303
                }
1✔
304
                q.WriteString(placeholders)
1✔
305
                args = append(args, rt.ID, rt.NetworkID, rt.Namespace, rt.Object, rt.Relation, rt.SubjectID, rt.SubjectSetNamespace, rt.SubjectSetObject, rt.SubjectSetRelation, rt.CommitTime)
1✔
306
        }
307

308
        query = q.String()
1✔
309
        return query, args, nil
1✔
310
}
311

312
func (p *Persister) WriteRelationTuples(ctx context.Context, rs ...*relationtuple.RelationTuple) (err error) {
1✔
313
        if len(rs) == 0 {
1✔
314
                return nil
×
315
        }
×
316

317
        ctx, span := p.d.Tracer(ctx).Tracer().Start(ctx, "persistence.sql.WriteRelationTuples",
1✔
318
                trace.WithAttributes(attribute.Int("count", len(rs))))
1✔
319
        defer otelx.End(span, &err)
1✔
320

1✔
321
        commitTime := time.Now()
1✔
322

1✔
323
        return p.Transaction(ctx, func(ctx context.Context) error {
2✔
324
                for chunk := range slices.Chunk(rs, chunkSizeInsertTuple) {
2✔
325
                        q, args, err := buildInsert(commitTime, p.NetworkID(ctx), chunk)
1✔
326
                        if err != nil {
2✔
327
                                return err
1✔
328
                        }
1✔
329
                        if err := p.Connection(ctx).RawQuery(q, args...).Exec(); err != nil {
1✔
330
                                return sqlcon.HandleError(err)
×
331
                        }
×
332
                }
333
                return nil
1✔
334
        })
335
}
336

337
func (p *Persister) TransactRelationTuples(ctx context.Context, ins []*relationtuple.RelationTuple, del []*relationtuple.RelationTuple) (err error) {
1✔
338
        ctx, span := p.d.Tracer(ctx).Tracer().Start(ctx, "persistence.sql.TransactRelationTuples")
1✔
339
        defer otelx.End(span, &err)
1✔
340

1✔
341
        if len(ins)+len(del) == 0 {
1✔
342
                return nil
×
343
        }
×
344

345
        return p.Transaction(ctx, func(ctx context.Context) error {
2✔
346
                if err := p.WriteRelationTuples(ctx, ins...); err != nil {
2✔
347
                        return err
1✔
348
                }
1✔
349
                return p.DeleteRelationTuples(ctx, del...)
1✔
350
        })
351
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc