• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Permify / permify / 6945401364

21 Nov 2023 02:42PM UTC coverage: 70.338%. Remained the same
6945401364

push

github

web-flow
Merge pull request #856 from Permify/next

Next

6047 of 8597 relevant lines covered (70.34%)

52.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.94
/internal/storage/postgres/schemaReader.go
1
package postgres
2

3
import (
4
        "context"
5
        "database/sql"
6
        "errors"
7
        "log/slog"
8

9
        "github.com/Masterminds/squirrel"
10
        "go.opentelemetry.io/otel/codes"
11

12
        "github.com/Permify/permify/internal/schema"
13
        "github.com/Permify/permify/internal/storage"
14
        db "github.com/Permify/permify/pkg/database/postgres"
15
        base "github.com/Permify/permify/pkg/pb/base/v1"
16
)
17

18
// SchemaReader - Structure for SchemaReader
19
type SchemaReader struct {
20
        database *db.Postgres
21
        // options
22
        txOptions sql.TxOptions
23
}
24

25
// NewSchemaReader - Creates a new SchemaReader
26
func NewSchemaReader(database *db.Postgres) *SchemaReader {
5✔
27
        return &SchemaReader{
5✔
28
                database:  database,
5✔
29
                txOptions: sql.TxOptions{Isolation: sql.LevelReadCommitted, ReadOnly: true},
5✔
30
        }
5✔
31
}
5✔
32

33
// ReadSchema - Reads entity config from the repository.
34
func (r *SchemaReader) ReadSchema(ctx context.Context, tenantID, version string) (sch *base.SchemaDefinition, err error) {
2✔
35
        ctx, span := tracer.Start(ctx, "schema-reader.read-schema")
2✔
36
        defer span.End()
2✔
37

2✔
38
        slog.Info("Reading schema: ", slog.Any("tenant_id", tenantID), slog.Any("version", version))
2✔
39

2✔
40
        builder := r.database.Builder.Select("name, serialized_definition, version").From(SchemaDefinitionTable).Where(squirrel.Eq{"version": version, "tenant_id": tenantID})
2✔
41

2✔
42
        var query string
2✔
43
        var args []interface{}
2✔
44

2✔
45
        query, args, err = builder.ToSql()
2✔
46
        if err != nil {
2✔
47
                span.RecordError(err)
×
48
                span.SetStatus(codes.Error, err.Error())
×
49

×
50
                slog.Error("Error in building SQL query: ", slog.Any("error", err))
×
51

×
52
                return nil, errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String())
×
53
        }
×
54

55
        slog.Debug("Executing SQL query: ", slog.Any("query", query), slog.Any("arguments", args))
2✔
56

2✔
57
        var rows *sql.Rows
2✔
58
        rows, err = r.database.DB.QueryContext(ctx, query, args...)
2✔
59
        if err != nil {
2✔
60
                span.RecordError(err)
×
61
                span.SetStatus(codes.Error, err.Error())
×
62

×
63
                slog.Error("Error in executing query: ", slog.Any("error", err))
×
64

×
65
                return nil, errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
×
66
        }
×
67
        defer rows.Close()
2✔
68

2✔
69
        var definitions []string
2✔
70
        for rows.Next() {
6✔
71
                sd := storage.SchemaDefinition{}
4✔
72
                err = rows.Scan(&sd.Name, &sd.SerializedDefinition, &sd.Version)
4✔
73
                if err != nil {
4✔
74
                        span.RecordError(err)
×
75
                        span.SetStatus(codes.Error, err.Error())
×
76

×
77
                        slog.Error("Error scanning rows: ", slog.Any("error", err))
×
78

×
79
                        return nil, err
×
80
                }
×
81
                definitions = append(definitions, sd.Serialized())
4✔
82
        }
83
        if err = rows.Err(); err != nil {
2✔
84
                span.RecordError(err)
×
85
                span.SetStatus(codes.Error, err.Error())
×
86

×
87
                slog.Error("Error iterating over rows: ", slog.Any("error", err))
×
88

×
89
                return nil, err
×
90
        }
×
91

92
        slog.Info("Successfully retrieved", slog.Any("schema definitions", len(definitions)))
2✔
93

2✔
94
        sch, err = schema.NewSchemaFromStringDefinitions(false, definitions...)
2✔
95
        if err != nil {
2✔
96
                span.RecordError(err)
×
97
                span.SetStatus(codes.Error, err.Error())
×
98

×
99
                slog.Error("Failed while creating schema from definitions: ", slog.Any("error", err))
×
100

×
101
                return nil, err
×
102
        }
×
103

104
        slog.Info("Successfully created schema.")
2✔
105

2✔
106
        return sch, err
2✔
107
}
108

109
// ReadEntityDefinition - Reads entity config from the repository.
110
func (r *SchemaReader) ReadEntityDefinition(ctx context.Context, tenantID, name, version string) (definition *base.EntityDefinition, v string, err error) {
1✔
111
        ctx, span := tracer.Start(ctx, "schema-reader.read-entity-definition")
1✔
112
        defer span.End()
1✔
113

1✔
114
        slog.Info("Reading entity definition: ", slog.Any("tenant_id", tenantID), slog.Any("version", version))
1✔
115

1✔
116
        builder := r.database.Builder.Select("name, serialized_definition, version").Where(squirrel.Eq{"name": name, "version": version, "tenant_id": tenantID}).From(SchemaDefinitionTable).Limit(1)
1✔
117

1✔
118
        var query string
1✔
119
        var args []interface{}
1✔
120

1✔
121
        query, args, err = builder.ToSql()
1✔
122
        if err != nil {
1✔
123
                span.RecordError(err)
×
124
                span.SetStatus(codes.Error, err.Error())
×
125

×
126
                slog.Error("Error building SQL query: ", slog.Any("error", err))
×
127

×
128
                return nil, "", errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String())
×
129
        }
×
130

131
        slog.Debug("Executing SQL query: ", slog.Any("query", query), slog.Any("arguments", args))
1✔
132

1✔
133
        var def storage.SchemaDefinition
1✔
134
        row := r.database.DB.QueryRowContext(ctx, query, args...)
1✔
135
        if err = row.Err(); err != nil {
1✔
136
                span.RecordError(err)
×
137
                span.SetStatus(codes.Error, err.Error())
×
138

×
139
                slog.Error("Error executing query: ", slog.Any("error", err))
×
140

×
141
                return nil, "", errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
×
142
        }
×
143

144
        if err = row.Scan(&def.Name, &def.SerializedDefinition, &def.Version); err != nil {
1✔
145
                span.RecordError(err)
×
146
                span.SetStatus(codes.Error, err.Error())
×
147
                if errors.Is(err, sql.ErrNoRows) {
×
148
                        return nil, "", errors.New(base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND.String())
×
149
                }
×
150

151
                slog.Error("Error scanning rows: ", slog.Any("error", err))
×
152

×
153
                return nil, "", errors.New(base.ErrorCode_ERROR_CODE_SCAN.String())
×
154
        }
155

156
        var sch *base.SchemaDefinition
1✔
157
        sch, err = schema.NewSchemaFromStringDefinitions(false, def.Serialized())
1✔
158
        if err != nil {
1✔
159
                span.RecordError(err)
×
160
                span.SetStatus(codes.Error, err.Error())
×
161

×
162
                slog.Error("Failed while creating schema from definitions: ", slog.Any("error", err))
×
163

×
164
                return nil, "", err
×
165
        }
×
166

167
        definition, err = schema.GetEntityByName(sch, name)
1✔
168

1✔
169
        slog.Info("Successfully retrieved", slog.Any("schema definition", definition))
1✔
170

1✔
171
        return definition, def.Version, err
1✔
172
}
173

174
// ReadRuleDefinition - Reads rule config from the repository.
175
func (r *SchemaReader) ReadRuleDefinition(ctx context.Context, tenantID, name, version string) (definition *base.RuleDefinition, v string, err error) {
1✔
176
        ctx, span := tracer.Start(ctx, "schema-reader.read-rule-definition")
1✔
177
        defer span.End()
1✔
178

1✔
179
        slog.Info("Reading rule definition: ", slog.Any("tenant_id", tenantID), slog.Any("name", name), slog.Any("version", version))
1✔
180

1✔
181
        builder := r.database.Builder.Select("name, serialized_definition, version").Where(squirrel.Eq{"name": name, "version": version, "tenant_id": tenantID}).From(SchemaDefinitionTable).Limit(1)
1✔
182

1✔
183
        var query string
1✔
184
        var args []interface{}
1✔
185

1✔
186
        query, args, err = builder.ToSql()
1✔
187
        if err != nil {
1✔
188
                span.RecordError(err)
×
189
                span.SetStatus(codes.Error, err.Error())
×
190

×
191
                slog.Error("Error building SQL query: ", slog.Any("error", err))
×
192

×
193
                return nil, "", errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String())
×
194
        }
×
195

196
        slog.Debug("Executing SQL query: ", slog.Any("query", query), slog.Any("arguments", args))
1✔
197

1✔
198
        var def storage.SchemaDefinition
1✔
199
        row := r.database.DB.QueryRowContext(ctx, query, args...)
1✔
200
        if err = row.Err(); err != nil {
1✔
201
                span.RecordError(err)
×
202
                span.SetStatus(codes.Error, err.Error())
×
203

×
204
                slog.Error("Error executing query: ", slog.Any("error", err))
×
205

×
206
                return nil, "", errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
×
207
        }
×
208

209
        if err = row.Scan(&def.Name, &def.SerializedDefinition, &def.Version); err != nil {
1✔
210

×
211
                slog.Error("Error scanning row: ", slog.Any("error", err))
×
212

×
213
                span.RecordError(err)
×
214
                span.SetStatus(codes.Error, err.Error())
×
215
                if errors.Is(err, sql.ErrNoRows) {
×
216

×
217
                        slog.Error("Rule not found in the database")
×
218

×
219
                        return nil, "", errors.New(base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND.String())
×
220
                }
×
221

222
                slog.Error("Error scanning row values: ", slog.Any("error", err))
×
223

×
224
                return nil, "", errors.New(base.ErrorCode_ERROR_CODE_SCAN.String())
×
225
        }
226

227
        slog.Info("Successfully retrieved rule definition for: ", slog.Any("name", name))
1✔
228

1✔
229
        var sch *base.SchemaDefinition
1✔
230
        sch, err = schema.NewSchemaFromStringDefinitions(false, def.Serialized())
1✔
231
        if err != nil {
1✔
232
                span.RecordError(err)
×
233
                span.SetStatus(codes.Error, err.Error())
×
234

×
235
                slog.Error("Error creating schema from definition: ", slog.Any("error", err))
×
236

×
237
                return nil, "", err
×
238
        }
×
239

240
        definition, err = schema.GetRuleByName(sch, name)
1✔
241
        slog.Info("Successfully created rule definition")
1✔
242

1✔
243
        return definition, def.Version, err
1✔
244
}
245

246
// HeadVersion - Finds the latest version of the schema.
247
func (r *SchemaReader) HeadVersion(ctx context.Context, tenantID string) (version string, err error) {
1✔
248
        ctx, span := tracer.Start(ctx, "schema-reader.head-version")
1✔
249
        defer span.End()
1✔
250

1✔
251
        slog.Info("Finding the latest version fo the schema for: ", slog.String("tenant_id", tenantID))
1✔
252

1✔
253
        var query string
1✔
254
        var args []interface{}
1✔
255
        query, args, err = r.database.Builder.
1✔
256
                Select("version").From(SchemaDefinitionTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("version DESC").Limit(1).
1✔
257
                ToSql()
1✔
258
        if err != nil {
1✔
259

×
260
                slog.Error("Failed to build SQL query: ", slog.Any("error", err))
×
261

×
262
                span.RecordError(err)
×
263
                span.SetStatus(codes.Error, err.Error())
×
264
                return "", errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String())
×
265
        }
×
266

267
        slog.Debug("Executing SQL query: ", slog.Any("query", query), slog.Any("arguments", args))
1✔
268

1✔
269
        row := r.database.DB.QueryRowContext(ctx, query, args...)
1✔
270
        err = row.Scan(&version)
1✔
271
        if err != nil {
1✔
272

×
273
                slog.Error("Error while scanning row: ", slog.Any("error", err))
×
274

×
275
                span.RecordError(err)
×
276
                span.SetStatus(codes.Error, err.Error())
×
277
                if errors.Is(err, sql.ErrNoRows) {
×
278

×
279
                        slog.Error("Schema not found in the database.")
×
280

×
281
                        return "", errors.New(base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND.String())
×
282
                }
×
283

284
                slog.Error("Error while scanning row values: ", slog.Any("error", err))
×
285

×
286
                return "", err
×
287
        }
288

289
        slog.Info("Successfully found the latest schema version: ", slog.Any("version", version))
1✔
290

1✔
291
        return version, nil
1✔
292
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc