• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Permify / permify / 10528103096

23 Aug 2024 03:13PM UTC coverage: 66.726%. Remained the same
10528103096

Pull #1488

github

tolgaOzen
feat: update API requests and client-side calls with improved formatting
Pull Request #1488: feat: update API requests and client-side calls with improved formatting

6700 of 10041 relevant lines covered (66.73%)

113.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/internal/storage/postgres/schemaReader.go
1
package postgres
2

3
import (
4
        "context"
5
        "errors"
6
        "log/slog"
7

8
        "github.com/jackc/pgx/v5"
9

10
        "github.com/Masterminds/squirrel"
11
        "github.com/rs/xid"
12

13
        "github.com/Permify/permify/internal/schema"
14
        "github.com/Permify/permify/internal/storage"
15
        "github.com/Permify/permify/internal/storage/postgres/utils"
16
        "github.com/Permify/permify/pkg/database"
17
        db "github.com/Permify/permify/pkg/database/postgres"
18
        base "github.com/Permify/permify/pkg/pb/base/v1"
19
)
20

21
// SchemaReader - Structure for SchemaReader
22
type SchemaReader struct {
23
        database *db.Postgres
24
        // options
25
        txOptions pgx.TxOptions
26
}
27

28
// NewSchemaReader - Creates a new SchemaReader
29
func NewSchemaReader(database *db.Postgres) *SchemaReader {
×
30
        return &SchemaReader{
×
31
                database:  database,
×
32
                txOptions: pgx.TxOptions{IsoLevel: pgx.ReadCommitted, AccessMode: pgx.ReadOnly},
×
33
        }
×
34
}
×
35

36
// ReadSchema returns the schema definition for a specific tenant and version as a structured object.
37
func (r *SchemaReader) ReadSchema(ctx context.Context, tenantID, version string) (sch *base.SchemaDefinition, err error) {
×
38
        ctx, span := tracer.Start(ctx, "schema-reader.read-schema")
×
39
        defer span.End()
×
40

×
41
        slog.DebugContext(ctx, "reading schema", slog.Any("tenant_id", tenantID), slog.Any("version", version))
×
42

×
43
        builder := r.database.Builder.Select("name, serialized_definition, version").From(SchemaDefinitionTable).Where(squirrel.Eq{"version": version, "tenant_id": tenantID})
×
44

×
45
        var query string
×
46
        var args []interface{}
×
47

×
48
        query, args, err = builder.ToSql()
×
49
        if err != nil {
×
50
                return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
×
51
        }
×
52

53
        slog.DebugContext(ctx, "executing sql query", slog.Any("query", query), slog.Any("arguments", args))
×
54

×
55
        var rows pgx.Rows
×
56
        rows, err = r.database.ReadPool.Query(ctx, query, args...)
×
57
        if err != nil {
×
58
                return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
×
59
        }
×
60
        defer rows.Close()
×
61

×
62
        var definitions []string
×
63
        for rows.Next() {
×
64
                sd := storage.SchemaDefinition{}
×
65
                err = rows.Scan(&sd.Name, &sd.SerializedDefinition, &sd.Version)
×
66
                if err != nil {
×
67
                        return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
×
68
                }
×
69
                definitions = append(definitions, sd.Serialized())
×
70
        }
71
        if err = rows.Err(); err != nil {
×
72
                return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
×
73
        }
×
74

75
        slog.DebugContext(ctx, "successfully retrieved", slog.Any("schema definitions", len(definitions)))
×
76

×
77
        sch, err = schema.NewSchemaFromStringDefinitions(false, definitions...)
×
78
        if err != nil {
×
79
                return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
×
80
        }
×
81

82
        return sch, err
×
83
}
84

85
// ReadSchemaString returns the schema definition for a specific tenant and version as a string.
86
func (r *SchemaReader) ReadSchemaString(ctx context.Context, tenantID, version string) (definitions []string, err error) {
×
87
        ctx, span := tracer.Start(ctx, "schema-reader.read-schema-string")
×
88
        defer span.End()
×
89

×
90
        slog.DebugContext(ctx, "reading schema", slog.Any("tenant_id", tenantID), slog.Any("version", version))
×
91

×
92
        builder := r.database.Builder.Select("name, serialized_definition, version").From(SchemaDefinitionTable).Where(squirrel.Eq{"version": version, "tenant_id": tenantID})
×
93

×
94
        var query string
×
95
        var args []interface{}
×
96

×
97
        query, args, err = builder.ToSql()
×
98
        if err != nil {
×
99
                return []string{}, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
×
100
        }
×
101

102
        slog.DebugContext(ctx, "executing sql query", slog.Any("query", query), slog.Any("arguments", args))
×
103

×
104
        var rows pgx.Rows
×
105
        rows, err = r.database.ReadPool.Query(ctx, query, args...)
×
106
        if err != nil {
×
107
                return []string{}, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
×
108
        }
×
109
        defer rows.Close()
×
110

×
111
        for rows.Next() {
×
112
                sd := storage.SchemaDefinition{}
×
113
                err = rows.Scan(&sd.Name, &sd.SerializedDefinition, &sd.Version)
×
114
                if err != nil {
×
115
                        return []string{}, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
×
116
                }
×
117
                definitions = append(definitions, sd.Serialized())
×
118
        }
119
        if err = rows.Err(); err != nil {
×
120
                return []string{}, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
×
121
        }
×
122

123
        slog.DebugContext(ctx, "successfully retrieved", slog.Any("schema definitions", len(definitions)))
×
124

×
125
        return definitions, err
×
126
}
127

128
// ReadEntityDefinition - Reads entity config from the repository.
129
func (r *SchemaReader) ReadEntityDefinition(ctx context.Context, tenantID, name, version string) (definition *base.EntityDefinition, v string, err error) {
×
130
        ctx, span := tracer.Start(ctx, "schema-reader.read-entity-definition")
×
131
        defer span.End()
×
132

×
133
        slog.DebugContext(ctx, "reading entity definition", slog.Any("tenant_id", tenantID), slog.Any("version", version))
×
134

×
135
        builder := r.database.Builder.Select("name, serialized_definition, version").Where(squirrel.Eq{"name": name, "version": version, "tenant_id": tenantID}).From(SchemaDefinitionTable).Limit(1)
×
136

×
137
        var query string
×
138
        var args []interface{}
×
139

×
140
        query, args, err = builder.ToSql()
×
141
        if err != nil {
×
142
                return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
×
143
        }
×
144

145
        slog.DebugContext(ctx, "executing sql query", slog.Any("query", query), slog.Any("arguments", args))
×
146

×
147
        var def storage.SchemaDefinition
×
148
        row := r.database.ReadPool.QueryRow(ctx, query, args...)
×
149
        if err = row.Scan(&def.Name, &def.SerializedDefinition, &def.Version); err != nil {
×
150
                if errors.Is(err, pgx.ErrNoRows) {
×
151
                        return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND)
×
152
                }
×
153
                return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
×
154
        }
155

156
        var sch *base.SchemaDefinition
×
157
        sch, err = schema.NewSchemaFromStringDefinitions(false, def.Serialized())
×
158
        if err != nil {
×
159
                return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
×
160
        }
×
161

162
        definition, err = schema.GetEntityByName(sch, name)
×
163

×
164
        slog.DebugContext(ctx, "successfully retrieved", slog.Any("schema definition", definition))
×
165

×
166
        return definition, def.Version, err
×
167
}
168

169
// ReadRuleDefinition - Reads rule config from the repository.
170
func (r *SchemaReader) ReadRuleDefinition(ctx context.Context, tenantID, name, version string) (definition *base.RuleDefinition, v string, err error) {
×
171
        ctx, span := tracer.Start(ctx, "schema-reader.read-rule-definition")
×
172
        defer span.End()
×
173

×
174
        slog.DebugContext(ctx, "reading rule definition", slog.Any("tenant_id", tenantID), slog.Any("name", name), slog.Any("version", version))
×
175

×
176
        builder := r.database.Builder.Select("name, serialized_definition, version").Where(squirrel.Eq{"name": name, "version": version, "tenant_id": tenantID}).From(SchemaDefinitionTable).Limit(1)
×
177

×
178
        var query string
×
179
        var args []interface{}
×
180

×
181
        query, args, err = builder.ToSql()
×
182
        if err != nil {
×
183
                return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
×
184
        }
×
185

186
        slog.DebugContext(ctx, "executing sql query", slog.Any("query", query), slog.Any("arguments", args))
×
187

×
188
        var def storage.SchemaDefinition
×
189
        row := r.database.ReadPool.QueryRow(ctx, query, args...)
×
190
        if err = row.Scan(&def.Name, &def.SerializedDefinition, &def.Version); err != nil {
×
191
                if errors.Is(err, pgx.ErrNoRows) {
×
192
                        return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND)
×
193
                }
×
194
                return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
×
195
        }
196

197
        slog.DebugContext(ctx, "successfully retrieved rule definition for", slog.Any("name", name))
×
198

×
199
        var sch *base.SchemaDefinition
×
200
        sch, err = schema.NewSchemaFromStringDefinitions(false, def.Serialized())
×
201
        if err != nil {
×
202
                return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
×
203
        }
×
204

205
        definition, err = schema.GetRuleByName(sch, name)
×
206

×
207
        slog.DebugContext(ctx, "successfully created rule definition")
×
208

×
209
        return definition, def.Version, err
×
210
}
211

212
// HeadVersion - Finds the latest version of the schema.
213
func (r *SchemaReader) HeadVersion(ctx context.Context, tenantID string) (version string, err error) {
×
214
        ctx, span := tracer.Start(ctx, "schema-reader.head-version")
×
215
        defer span.End()
×
216

×
217
        slog.DebugContext(ctx, "finding the latest version fo the schema for", slog.String("tenant_id", tenantID))
×
218

×
219
        var query string
×
220
        var args []interface{}
×
221
        query, args, err = r.database.Builder.
×
222
                Select("version").From(SchemaDefinitionTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("version DESC").Limit(1).
×
223
                ToSql()
×
224
        if err != nil {
×
225
                return "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
×
226
        }
×
227

228
        slog.DebugContext(ctx, "executing sql query", slog.Any("query", query), slog.Any("arguments", args))
×
229

×
230
        row := r.database.ReadPool.QueryRow(ctx, query, args...)
×
231
        err = row.Scan(&version)
×
232
        if err != nil {
×
233
                if errors.Is(err, pgx.ErrNoRows) {
×
234
                        return "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND)
×
235
                }
×
236
                return "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
×
237
        }
238

239
        slog.DebugContext(ctx, "successfully found the latest schema version", slog.Any("version", version))
×
240

×
241
        return version, nil
×
242
}
243

244
// ListSchemas - List all Schemas
245
func (r *SchemaReader) ListSchemas(ctx context.Context, tenantID string, pagination database.Pagination) (schemas []*base.SchemaList, ct database.EncodedContinuousToken, err error) {
×
246
        ctx, span := tracer.Start(ctx, "tenant-reader.list-tenants")
×
247
        defer span.End()
×
248

×
249
        slog.DebugContext(ctx, "listing schemas with pagination", slog.Any("pagination", pagination))
×
250

×
251
        builder := r.database.Builder.Select("DISTINCT version").From(SchemaDefinitionTable).Where(squirrel.Eq{"tenant_id": tenantID})
×
252
        if pagination.Token() != "" {
×
253
                var t database.ContinuousToken
×
254
                t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
×
255
                if err != nil {
×
256
                        return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
×
257
                }
×
258
                builder = builder.Where(squirrel.LtOrEq{"version": t.(utils.ContinuousToken).Value})
×
259
        }
260

261
        builder = builder.OrderBy("version DESC").Limit(uint64(pagination.PageSize() + 1))
×
262

×
263
        var query string
×
264
        var args []interface{}
×
265

×
266
        query, args, err = builder.ToSql()
×
267
        if err != nil {
×
268
                return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
×
269
        }
×
270

271
        slog.DebugContext(ctx, "executing sql query", slog.Any("query", query), slog.Any("arguments", args))
×
272

×
273
        var rows pgx.Rows
×
274
        rows, err = r.database.ReadPool.Query(ctx, query, args...)
×
275
        if err != nil {
×
276
                return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
×
277
        }
×
278
        defer rows.Close()
×
279

×
280
        var lastVersion string
×
281
        schemas = make([]*base.SchemaList, 0, pagination.PageSize()+1)
×
282
        for rows.Next() {
×
283
                sch := &base.SchemaList{}
×
284
                err = rows.Scan(&sch.Version)
×
285
                if err != nil {
×
286
                        return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
×
287
                }
×
288
                id, err := xid.FromString(sch.Version)
×
289
                if err != nil {
×
290
                        return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
×
291
                }
×
292
                sch.CreatedAt = id.Time().String()
×
293
                lastVersion = sch.Version
×
294
                schemas = append(schemas, sch)
×
295
        }
296
        if err = rows.Err(); err != nil {
×
297
                return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
×
298
        }
×
299

300
        slog.DebugContext(ctx, "successfully listed schemas", slog.Any("number_of_schemas", len(schemas)))
×
301

×
302
        if len(schemas) > int(pagination.PageSize()) {
×
303
                return schemas[:pagination.PageSize()], utils.NewContinuousToken(lastVersion).Encode(), nil
×
304
        }
×
305
        return schemas, database.NewNoopContinuousToken().Encode(), nil
×
306
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc