• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 4763653586

21 Apr 2023 10:46AM UTC coverage: 66.83% (-0.09%) from 66.924%
4763653586

push

GitHub
fix(vscode): fixed Jaeger parameters (#8801)

58024 of 86823 relevant lines covered (66.83%)

2239173.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.61
/edgraph/server.go
1
/*
2
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package edgraph
18

19
import (
20
        "bytes"
21
        "context"
22
        "crypto/sha256"
23
        "encoding/hex"
24
        "encoding/json"
25
        "fmt"
26
        "math"
27
        "net"
28
        "sort"
29
        "strconv"
30
        "strings"
31
        "sync/atomic"
32
        "time"
33
        "unicode"
34

35
        "github.com/gogo/protobuf/jsonpb"
36
        "github.com/golang/glog"
37
        "github.com/pkg/errors"
38
        ostats "go.opencensus.io/stats"
39
        "go.opencensus.io/tag"
40
        otrace "go.opencensus.io/trace"
41
        "google.golang.org/grpc"
42
        "google.golang.org/grpc/codes"
43
        "google.golang.org/grpc/metadata"
44
        "google.golang.org/grpc/status"
45

46
        "github.com/dgraph-io/dgo/v210"
47
        "github.com/dgraph-io/dgo/v210/protos/api"
48
        "github.com/dgraph-io/dgraph/chunker"
49
        "github.com/dgraph-io/dgraph/conn"
50
        "github.com/dgraph-io/dgraph/dql"
51
        gqlSchema "github.com/dgraph-io/dgraph/graphql/schema"
52
        "github.com/dgraph-io/dgraph/posting"
53
        "github.com/dgraph-io/dgraph/protos/pb"
54
        "github.com/dgraph-io/dgraph/query"
55
        "github.com/dgraph-io/dgraph/schema"
56
        "github.com/dgraph-io/dgraph/telemetry"
57
        "github.com/dgraph-io/dgraph/tok"
58
        "github.com/dgraph-io/dgraph/types"
59
        "github.com/dgraph-io/dgraph/types/facets"
60
        "github.com/dgraph-io/dgraph/worker"
61
        "github.com/dgraph-io/dgraph/x"
62
)
63

64
const (
65
        methodMutate = "Server.Mutate"
66
        methodQuery  = "Server.Query"
67
)
68

69
type GraphqlContextKey int
70

71
const (
72
        // IsGraphql is used to validate requests which are allowed to mutate GraphQL reserved
73
        // predicates, like dgraph.graphql.schema and dgraph.graphql.xid.
74
        IsGraphql GraphqlContextKey = iota
75
        // Authorize is used to set if the request requires validation.
76
        Authorize
77
)
78

79
type AuthMode int
80

81
const (
82
        // NeedAuthorize is used to indicate that the request needs to be authorized.
83
        NeedAuthorize AuthMode = iota
84
        // NoAuthorize is used to indicate that authorization needs to be skipped.
85
        // Used when ACL needs to query information for performing the authorization check.
86
        NoAuthorize
87
)
88

89
var (
90
        numGraphQLPM uint64
91
        numGraphQL   uint64
92
)
93

94
var (
95
        errIndexingInProgress = errors.New("errIndexingInProgress. Please retry")
96
)
97

98
// Server implements protos.DgraphServer
99
type Server struct{}
100

101
// graphQLSchemaNode represents the node which contains GraphQL schema
102
type graphQLSchemaNode struct {
103
        Uid    string `json:"uid"`
104
        UidInt uint64
105
        Schema string `json:"dgraph.graphql.schema"`
106
}
107

108
type existingGQLSchemaQryResp struct {
109
        ExistingGQLSchema []graphQLSchemaNode `json:"ExistingGQLSchema"`
110
}
111

112
// PeriodicallyPostTelemetry periodically reports telemetry data for alpha.
113
func PeriodicallyPostTelemetry() {
96✔
114
        glog.V(2).Infof("Starting telemetry data collection for alpha...")
96✔
115

96✔
116
        start := time.Now()
96✔
117
        ticker := time.NewTicker(time.Minute * 10)
96✔
118
        defer ticker.Stop()
96✔
119

96✔
120
        var lastPostedAt time.Time
96✔
121
        for range ticker.C {
120✔
122
                if time.Since(lastPostedAt) < time.Hour {
24✔
123
                        continue
×
124
                }
125
                ms := worker.GetMembershipState()
24✔
126
                t := telemetry.NewAlpha(ms)
24✔
127
                t.NumGraphQLPM = atomic.SwapUint64(&numGraphQLPM, 0)
24✔
128
                t.NumGraphQL = atomic.SwapUint64(&numGraphQL, 0)
24✔
129
                t.SinceHours = int(time.Since(start).Hours())
24✔
130
                glog.V(2).Infof("Posting Telemetry data: %+v", t)
24✔
131

24✔
132
                err := t.Post()
24✔
133
                if err == nil {
24✔
134
                        lastPostedAt = time.Now()
×
135
                } else {
24✔
136
                        atomic.AddUint64(&numGraphQLPM, t.NumGraphQLPM)
24✔
137
                        atomic.AddUint64(&numGraphQL, t.NumGraphQL)
24✔
138
                        glog.V(2).Infof("Telemetry couldn't be posted. Error: %v", err)
24✔
139
                }
24✔
140
        }
141
}
142

143
// GetGQLSchema queries for the GraphQL schema node, and returns the uid and the GraphQL schema.
144
// If multiple schema nodes were found, it returns an error.
145
func GetGQLSchema(namespace uint64) (uid, graphQLSchema string, err error) {
226✔
146
        ctx := context.WithValue(context.Background(), Authorize, false)
226✔
147
        ctx = x.AttachNamespace(ctx, namespace)
226✔
148
        resp, err := (&Server{}).QueryNoGrpc(ctx,
226✔
149
                &api.Request{
226✔
150
                        Query: `
226✔
151
                        query {
226✔
152
                                ExistingGQLSchema(func: has(dgraph.graphql.schema)) {
226✔
153
                                        uid
226✔
154
                                        dgraph.graphql.schema
226✔
155
                                  }
226✔
156
                                }`})
226✔
157
        if err != nil {
276✔
158
                return "", "", err
50✔
159
        }
50✔
160

161
        var result existingGQLSchemaQryResp
176✔
162
        if err := json.Unmarshal(resp.GetJson(), &result); err != nil {
176✔
163
                return "", "", errors.Wrap(err, "Couldn't unmarshal response from Dgraph query")
×
164
        }
×
165
        res := result.ExistingGQLSchema
176✔
166
        if len(res) == 0 {
297✔
167
                // no schema has been stored yet in Dgraph
121✔
168
                return "", "", nil
121✔
169
        } else if len(res) == 1 {
231✔
170
                // we found an existing GraphQL schema
55✔
171
                gqlSchemaNode := res[0]
55✔
172
                return gqlSchemaNode.Uid, gqlSchemaNode.Schema, nil
55✔
173
        }
55✔
174

175
        // found multiple GraphQL schema nodes, this should never happen
176
        // returning the schema node which is added last
177
        for i := range res {
×
178
                iUid, err := dql.ParseUid(res[i].Uid)
×
179
                if err != nil {
×
180
                        return "", "", err
×
181
                }
×
182
                res[i].UidInt = iUid
×
183
        }
184

185
        sort.Slice(res, func(i, j int) bool {
×
186
                return res[i].UidInt < res[j].UidInt
×
187
        })
×
188
        glog.Errorf("namespace: %d. Multiple schema nodes found, using the last one", namespace)
×
189
        resLast := res[len(res)-1]
×
190
        return resLast.Uid, resLast.Schema, nil
×
191
}
192

193
// UpdateGQLSchema updates the GraphQL and Dgraph schemas using the given inputs.
194
// It first validates and parses the dgraphSchema given in input. If that fails,
195
// it returns an error. All this is done on the alpha on which the update request is received.
196
// Then it sends an update request to the worker, which is executed only on Group-1 leader.
197
func UpdateGQLSchema(ctx context.Context, gqlSchema,
198
        dgraphSchema string) (*pb.UpdateGraphQLSchemaResponse, error) {
517✔
199
        var err error
517✔
200
        parsedDgraphSchema := &schema.ParsedSchema{}
517✔
201

517✔
202
        if !x.WorkerConfig.AclEnabled {
767✔
203
                ctx = x.AttachNamespace(ctx, x.GalaxyNamespace)
250✔
204
        }
250✔
205
        // The schema could be empty if it only has custom types/queries/mutations.
206
        if dgraphSchema != "" {
703✔
207
                op := &api.Operation{Schema: dgraphSchema}
186✔
208
                if err = validateAlterOperation(ctx, op); err != nil {
186✔
209
                        return nil, err
×
210
                }
×
211
                if parsedDgraphSchema, err = parseSchemaFromAlterOperation(ctx, op); err != nil {
186✔
212
                        return nil, err
×
213
                }
×
214
        }
215

216
        return worker.UpdateGQLSchemaOverNetwork(ctx, &pb.UpdateGraphQLSchemaRequest{
517✔
217
                StartTs:       worker.State.GetTimestamp(false),
517✔
218
                GraphqlSchema: gqlSchema,
517✔
219
                DgraphPreds:   parsedDgraphSchema.Preds,
517✔
220
                DgraphTypes:   parsedDgraphSchema.Types,
517✔
221
        })
517✔
222
}
223

224
// validateAlterOperation validates the given operation for alter.
225
func validateAlterOperation(ctx context.Context, op *api.Operation) error {
1,259✔
226
        // The following code block checks if the operation should run or not.
1,259✔
227
        if op.Schema == "" && op.DropAttr == "" && !op.DropAll && op.DropOp == api.Operation_NONE {
1,259✔
228
                // Must have at least one field set. This helps users if they attempt
×
229
                // to set a field but use the wrong name (could be decoded from JSON).
×
230
                return errors.Errorf("Operation must have at least one field set")
×
231
        }
×
232
        if err := x.HealthCheck(); err != nil {
1,260✔
233
                return err
1✔
234
        }
1✔
235

236
        if isDropAll(op) && op.DropOp == api.Operation_DATA {
1,259✔
237
                return errors.Errorf("Only one of DropAll and DropData can be true")
1✔
238
        }
1✔
239

240
        if !isMutationAllowed(ctx) {
1,259✔
241
                return errors.Errorf("No mutations allowed by server.")
2✔
242
        }
2✔
243
        if _, err := hasAdminAuth(ctx, "Alter"); err != nil {
1,255✔
244
                glog.Warningf("Alter denied with error: %v\n", err)
×
245
                return err
×
246
        }
×
247

248
        if err := authorizeAlter(ctx, op); err != nil {
1,279✔
249
                glog.Warningf("Alter denied with error: %v\n", err)
24✔
250
                return err
24✔
251
        }
24✔
252

253
        return nil
1,231✔
254
}
255

256
// parseSchemaFromAlterOperation parses the string schema given in input operation to a Go
257
// struct, and performs some checks to make sure that the schema is valid.
258
func parseSchemaFromAlterOperation(ctx context.Context, op *api.Operation) (
259
        *schema.ParsedSchema, error) {
907✔
260

907✔
261
        // If a background task is already running, we should reject all the new alter requests.
907✔
262
        if schema.State().IndexingInProgress() {
907✔
263
                return nil, errIndexingInProgress
×
264
        }
×
265

266
        namespace, err := x.ExtractNamespace(ctx)
907✔
267
        if err != nil {
907✔
268
                return nil, errors.Wrapf(err, "While parsing schema")
×
269
        }
×
270

271
        if x.IsGalaxyOperation(ctx) {
910✔
272
                // Only the guardian of the galaxy can do a galaxy wide query/mutation. This operation is
3✔
273
                // needed by live loader.
3✔
274
                if err := AuthGuardianOfTheGalaxy(ctx); err != nil {
3✔
275
                        s := status.Convert(err)
×
276
                        return nil, status.Error(s.Code(),
×
277
                                "Non guardian of galaxy user cannot bypass namespaces. "+s.Message())
×
278
                }
×
279
                var err error
3✔
280
                namespace, err = strconv.ParseUint(x.GetForceNamespace(ctx), 0, 64)
3✔
281
                if err != nil {
3✔
282
                        return nil, errors.Wrapf(err, "Valid force namespace not found in metadata")
×
283
                }
×
284
        }
285

286
        result, err := schema.ParseWithNamespace(op.Schema, namespace)
907✔
287
        if err != nil {
908✔
288
                return nil, err
1✔
289
        }
1✔
290

291
        preds := make(map[string]struct{})
906✔
292

906✔
293
        for _, update := range result.Preds {
4,636✔
294
                if _, ok := preds[update.Predicate]; ok {
3,731✔
295
                        return nil, errors.Errorf("predicate %s defined multiple times",
1✔
296
                                x.ParseAttr(update.Predicate))
1✔
297
                }
1✔
298
                preds[update.Predicate] = struct{}{}
3,729✔
299

3,729✔
300
                // Pre-defined predicates cannot be altered but let the update go through
3,729✔
301
                // if the update is equal to the existing one.
3,729✔
302
                if schema.IsPreDefPredChanged(update) {
3,731✔
303
                        return nil, errors.Errorf("predicate %s is pre-defined and is not allowed to be"+
2✔
304
                                " modified", x.ParseAttr(update.Predicate))
2✔
305
                }
2✔
306

307
                if err := validatePredName(update.Predicate); err != nil {
3,728✔
308
                        return nil, err
1✔
309
                }
1✔
310
                // Users are not allowed to create a predicate under the reserved `dgraph.` namespace. But,
311
                // there are pre-defined predicates (subset of reserved predicates), and for them we allow
312
                // the schema update to go through if the update is equal to the existing one.
313
                // So, here we check if the predicate is reserved but not pre-defined to block users from
314
                // creating predicates in reserved namespace.
315
                if x.IsReservedPredicate(update.Predicate) && !x.IsPreDefinedPredicate(update.Predicate) {
3,727✔
316
                        return nil, errors.Errorf("Can't alter predicate `%s` as it is prefixed with `dgraph.`"+
1✔
317
                                " which is reserved as the namespace for dgraph's internal types/predicates.",
1✔
318
                                x.ParseAttr(update.Predicate))
1✔
319
                }
1✔
320
        }
321

322
        types := make(map[string]struct{})
901✔
323

901✔
324
        for _, typ := range result.Types {
2,107✔
325
                if _, ok := types[typ.TypeName]; ok {
1,207✔
326
                        return nil, errors.Errorf("type %s defined multiple times", x.ParseAttr(typ.TypeName))
1✔
327
                }
1✔
328
                types[typ.TypeName] = struct{}{}
1,205✔
329

1,205✔
330
                // Pre-defined types cannot be altered but let the update go through
1,205✔
331
                // if the update is equal to the existing one.
1,205✔
332
                if schema.IsPreDefTypeChanged(typ) {
1,206✔
333
                        return nil, errors.Errorf("type %s is pre-defined and is not allowed to be modified",
1✔
334
                                x.ParseAttr(typ.TypeName))
1✔
335
                }
1✔
336

337
                // Users are not allowed to create types in reserved namespace. But, there are pre-defined
338
                // types for which the update should go through if the update is equal to the existing one.
339
                if x.IsReservedType(typ.TypeName) && !x.IsPreDefinedType(typ.TypeName) {
1,205✔
340
                        return nil, errors.Errorf("Can't alter type `%s` as it is prefixed with `dgraph.` "+
1✔
341
                                "which is reserved as the namespace for dgraph's internal types/predicates.",
1✔
342
                                x.ParseAttr(typ.TypeName))
1✔
343
                }
1✔
344
        }
345

346
        return result, nil
898✔
347
}
348

349
// InsertDropRecord is used to insert a helper record when a DROP operation is performed.
350
// This helper record lets us know during backup that a DROP operation was performed and that we
351
// need to write this information in backup manifest. So that while restoring from a backup series,
352
// we can create an exact replica of the system which existed at the time the last backup was taken.
353
// Note that if the server crashes after the DROP operation & before this helper record is inserted,
354
// then restoring from the incremental backup of such a DB would restore even the dropped
355
// data back. This is also used to capture the delete namespace operation during backup.
356
func InsertDropRecord(ctx context.Context, dropOp string) error {
332✔
357
        _, err := (&Server{}).doQuery(context.WithValue(ctx, IsGraphql, true), &Request{
332✔
358
                req: &api.Request{
332✔
359
                        Mutations: []*api.Mutation{{
332✔
360
                                Set: []*api.NQuad{{
332✔
361
                                        Subject:     "_:r",
332✔
362
                                        Predicate:   "dgraph.drop.op",
332✔
363
                                        ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: dropOp}},
332✔
364
                                }},
332✔
365
                        }},
332✔
366
                        CommitNow: true,
332✔
367
                }, doAuth: NoAuthorize})
332✔
368
        return err
332✔
369
}
332✔
370

371
// Alter handles requests to change the schema or remove parts or all of the data.
372
func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, error) {
1,073✔
373
        ctx, span := otrace.StartSpan(ctx, "Server.Alter")
1,073✔
374
        defer span.End()
1,073✔
375

1,073✔
376
        ctx = x.AttachJWTNamespace(ctx)
1,073✔
377
        span.Annotatef(nil, "Alter operation: %+v", op)
1,073✔
378

1,073✔
379
        // Always print out Alter operations because they are important and rare.
1,073✔
380
        glog.Infof("Received ALTER op: %+v", op)
1,073✔
381

1,073✔
382
        // check if the operation is valid
1,073✔
383
        if err := validateAlterOperation(ctx, op); err != nil {
1,101✔
384
                return nil, err
28✔
385
        }
28✔
386

387
        defer glog.Infof("ALTER op: %+v done", op)
1,045✔
388

1,045✔
389
        empty := &api.Payload{}
1,045✔
390
        namespace, err := x.ExtractNamespace(ctx)
1,045✔
391
        if err != nil {
1,045✔
392
                return nil, errors.Wrapf(err, "While altering")
×
393
        }
×
394

395
        // StartTs is not needed if the predicate to be dropped lies on this server but is required
396
        // if it lies on some other machine. Let's get it for safety.
397
        m := &pb.Mutations{StartTs: worker.State.GetTimestamp(false)}
1,045✔
398
        if isDropAll(op) {
1,345✔
399
                if x.Config.BlockClusterWideDrop {
300✔
400
                        glog.V(2).Info("Blocked drop-all because it is not permitted.")
×
401
                        return empty, errors.New("Drop all operation is not permitted.")
×
402
                }
×
403
                if err := AuthGuardianOfTheGalaxy(ctx); err != nil {
300✔
404
                        s := status.Convert(err)
×
405
                        return empty, status.Error(s.Code(),
×
406
                                "Drop all can only be called by the guardian of the galaxy. "+s.Message())
×
407
                }
×
408
                if len(op.DropValue) > 0 {
300✔
409
                        return empty, errors.Errorf("If DropOp is set to ALL, DropValue must be empty")
×
410
                }
×
411

412
                m.DropOp = pb.Mutations_ALL
300✔
413
                _, err := query.ApplyMutations(ctx, m)
300✔
414
                if err != nil {
300✔
415
                        return empty, err
×
416
                }
×
417

418
                // insert a helper record for backup & restore, indicating that drop_all was done
419
                err = InsertDropRecord(ctx, "DROP_ALL;")
300✔
420
                if err != nil {
300✔
421
                        return empty, err
×
422
                }
×
423

424
                // insert empty GraphQL schema, so all alphas get notified to
425
                // reset their in-memory GraphQL schema
426
                _, err = UpdateGQLSchema(ctx, "", "")
300✔
427
                // recreate the admin account after a drop all operation
300✔
428
                InitializeAcl(nil)
300✔
429
                return empty, err
300✔
430
        }
431

432
        if op.DropOp == api.Operation_DATA {
755✔
433
                if len(op.DropValue) > 0 {
10✔
434
                        return empty, errors.Errorf("If DropOp is set to DATA, DropValue must be empty")
×
435
                }
×
436

437
                // query the GraphQL schema and keep it in memory, so it can be inserted again
438
                _, graphQLSchema, err := GetGQLSchema(namespace)
10✔
439
                if err != nil {
10✔
440
                        return empty, err
×
441
                }
×
442

443
                m.DropOp = pb.Mutations_DATA
10✔
444
                m.DropValue = fmt.Sprintf("%#x", namespace)
10✔
445
                _, err = query.ApplyMutations(ctx, m)
10✔
446
                if err != nil {
10✔
447
                        return empty, err
×
448
                }
×
449

450
                // insert a helper record for backup & restore, indicating that drop_data was done
451
                err = InsertDropRecord(ctx, fmt.Sprintf("DROP_DATA;%#x", namespace))
10✔
452
                if err != nil {
10✔
453
                        return empty, err
×
454
                }
×
455

456
                // just reinsert the GraphQL schema, no need to alter dgraph schema as this was drop_data
457
                _, err = UpdateGQLSchema(ctx, graphQLSchema, "")
10✔
458
                // recreate the admin account after a drop data operation
10✔
459
                InitializeAcl(nil)
10✔
460
                return empty, err
10✔
461
        }
462

463
        if len(op.DropAttr) > 0 || op.DropOp == api.Operation_ATTR {
749✔
464
                if op.DropOp == api.Operation_ATTR && op.DropValue == "" {
14✔
465
                        return empty, errors.Errorf("If DropOp is set to ATTR, DropValue must not be empty")
×
466
                }
×
467

468
                var attr string
14✔
469
                if len(op.DropAttr) > 0 {
26✔
470
                        attr = op.DropAttr
12✔
471
                } else {
14✔
472
                        attr = op.DropValue
2✔
473
                }
2✔
474
                attr = x.NamespaceAttr(namespace, attr)
14✔
475
                // Pre-defined predicates cannot be dropped.
14✔
476
                if x.IsPreDefinedPredicate(attr) {
15✔
477
                        return empty, errors.Errorf("predicate %s is pre-defined and is not allowed to be"+
1✔
478
                                " dropped", x.ParseAttr(attr))
1✔
479
                }
1✔
480

481
                nq := &api.NQuad{
13✔
482
                        Subject:     x.Star,
13✔
483
                        Predicate:   x.ParseAttr(attr),
13✔
484
                        ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: x.Star}},
13✔
485
                }
13✔
486
                wnq := &dql.NQuad{NQuad: nq}
13✔
487
                edge, err := wnq.ToDeletePredEdge()
13✔
488
                if err != nil {
13✔
489
                        return empty, err
×
490
                }
×
491
                edges := []*pb.DirectedEdge{edge}
13✔
492
                m.Edges = edges
13✔
493
                _, err = query.ApplyMutations(ctx, m)
13✔
494
                if err != nil {
13✔
495
                        return empty, err
×
496
                }
×
497

498
                // insert a helper record for backup & restore, indicating that drop_attr was done
499
                err = InsertDropRecord(ctx, "DROP_ATTR;"+attr)
13✔
500
                return empty, err
13✔
501
        }
502

503
        if op.DropOp == api.Operation_TYPE {
725✔
504
                if op.DropValue == "" {
5✔
505
                        return empty, errors.Errorf("If DropOp is set to TYPE, DropValue must not be empty")
1✔
506
                }
1✔
507

508
                // Pre-defined types cannot be dropped.
509
                dropPred := x.NamespaceAttr(namespace, op.DropValue)
3✔
510
                if x.IsPreDefinedType(dropPred) {
4✔
511
                        return empty, errors.Errorf("type %s is pre-defined and is not allowed to be dropped",
1✔
512
                                op.DropValue)
1✔
513
                }
1✔
514

515
                m.DropOp = pb.Mutations_TYPE
2✔
516
                m.DropValue = dropPred
2✔
517
                _, err := query.ApplyMutations(ctx, m)
2✔
518
                return empty, err
2✔
519
        }
520
        result, err := parseSchemaFromAlterOperation(ctx, op)
717✔
521
        if err == errIndexingInProgress {
717✔
522
                // Make the client wait a bit.
×
523
                time.Sleep(time.Second)
×
524
                return nil, err
×
525
        } else if err != nil {
723✔
526
                return nil, err
6✔
527
        }
6✔
528

529
        glog.Infof("Got schema: %+v\n", result)
711✔
530
        // TODO: Maybe add some checks about the schema.
711✔
531
        m.Schema = result.Preds
711✔
532
        m.Types = result.Types
711✔
533
        _, err = query.ApplyMutations(ctx, m)
711✔
534
        if err != nil {
718✔
535
                return empty, err
7✔
536
        }
7✔
537

538
        // wait for indexing to complete or context to be canceled.
539
        if err = worker.WaitForIndexing(ctx, !op.RunInBackground); err != nil {
704✔
540
                return empty, err
×
541
        }
×
542

543
        return empty, nil
704✔
544
}
545

546
func annotateNamespace(span *otrace.Span, ns uint64) {
40,700✔
547
        span.AddAttributes(otrace.Int64Attribute("ns", int64(ns)))
40,700✔
548
}
40,700✔
549

550
func annotateStartTs(span *otrace.Span, ts uint64) {
42,652✔
551
        span.AddAttributes(otrace.Int64Attribute("startTs", int64(ts)))
42,652✔
552
}
42,652✔
553

554
func (s *Server) doMutate(ctx context.Context, qc *queryContext, resp *api.Response) error {
39,689✔
555
        if len(qc.gmuList) == 0 {
63,965✔
556
                return nil
24,276✔
557
        }
24,276✔
558
        if ctx.Err() != nil {
15,413✔
559
                return ctx.Err()
×
560
        }
×
561

562
        start := time.Now()
15,413✔
563
        defer func() {
30,826✔
564
                qc.latency.Processing += time.Since(start)
15,413✔
565
        }()
15,413✔
566

567
        if !isMutationAllowed(ctx) {
15,415✔
568
                return errors.Errorf("no mutations allowed")
2✔
569
        }
2✔
570

571
        // update mutations from the query results before assigning UIDs
572
        if err := updateMutations(qc); err != nil {
15,411✔
573
                return err
×
574
        }
×
575

576
        newUids, err := query.AssignUids(ctx, qc.gmuList)
15,411✔
577
        if err != nil {
15,414✔
578
                return err
3✔
579
        }
3✔
580

581
        // resp.Uids contains a map of the node name to the uid.
582
        // 1. For a blank node, like _:foo, the key would be foo.
583
        // 2. For a uid variable that is part of an upsert query,
584
        //    like uid(foo), the key would be uid(foo).
585
        resp.Uids = query.UidsToHex(query.StripBlankNode(newUids))
15,408✔
586
        edges, err := query.ToDirectedEdges(qc.gmuList, newUids)
15,408✔
587
        if err != nil {
15,408✔
588
                return err
×
589
        }
×
590
        ns, err := x.ExtractNamespace(ctx)
15,408✔
591
        if err != nil {
15,408✔
592
                return errors.Wrapf(err, "While doing mutations:")
×
593
        }
×
594
        predHints := make(map[string]pb.Metadata_HintType)
15,408✔
595
        for _, gmu := range qc.gmuList {
31,066✔
596
                for pred, hint := range gmu.Metadata.GetPredHints() {
27,910✔
597
                        pred = x.NamespaceAttr(ns, pred)
12,252✔
598
                        if oldHint := predHints[pred]; oldHint == pb.Metadata_LIST {
12,505✔
599
                                continue
253✔
600
                        }
601
                        predHints[pred] = hint
11,999✔
602
                }
603
        }
604
        m := &pb.Mutations{
15,408✔
605
                Edges:   edges,
15,408✔
606
                StartTs: qc.req.StartTs,
15,408✔
607
                Metadata: &pb.Metadata{
15,408✔
608
                        PredHints: predHints,
15,408✔
609
                },
15,408✔
610
        }
15,408✔
611

15,408✔
612
        // ensure that we do not insert very large (> 64 KB) value
15,408✔
613
        if err := validateMutation(ctx, edges); err != nil {
15,412✔
614
                return err
4✔
615
        }
4✔
616

617
        qc.span.Annotatef(nil, "Applying mutations: %+v", m)
15,404✔
618
        resp.Txn, err = query.ApplyMutations(ctx, m)
15,404✔
619
        qc.span.Annotatef(nil, "Txn Context: %+v. Err=%v", resp.Txn, err)
15,404✔
620

15,404✔
621
        // calculateMutationMetrics calculate cost for the mutation.
15,404✔
622
        calculateMutationMetrics := func() {
29,188✔
623
                cost := uint64(len(newUids) + len(edges))
13,784✔
624
                resp.Metrics.NumUids["mutation_cost"] = cost
13,784✔
625
                resp.Metrics.NumUids["_total"] = resp.Metrics.NumUids["_total"] + cost
13,784✔
626
        }
13,784✔
627
        if !qc.req.CommitNow {
22,659✔
628
                calculateMutationMetrics()
7,255✔
629
                if err == x.ErrConflict {
7,255✔
630
                        err = status.Error(codes.FailedPrecondition, err.Error())
×
631
                }
×
632

633
                return err
7,255✔
634
        }
635

636
        // The following logic is for committing immediately.
637
        if err != nil {
8,160✔
638
                // ApplyMutations failed. We now want to abort the transaction,
11✔
639
                // ignoring any error that might occur during the abort (the user would
11✔
640
                // care more about the previous error).
11✔
641
                if resp.Txn == nil {
13✔
642
                        resp.Txn = &api.TxnContext{StartTs: qc.req.StartTs}
2✔
643
                }
2✔
644

645
                resp.Txn.Aborted = true
11✔
646
                _, _ = worker.CommitOverNetwork(ctx, resp.Txn)
11✔
647

11✔
648
                if err == x.ErrConflict {
11✔
649
                        // We have already aborted the transaction, so the error message should reflect that.
×
650
                        return dgo.ErrAborted
×
651
                }
×
652

653
                return err
11✔
654
        }
655

656
        qc.span.Annotatef(nil, "Prewrites err: %v. Attempting to commit/abort immediately.", err)
8,138✔
657
        ctxn := resp.Txn
8,138✔
658
        // zero would assign the CommitTs
8,138✔
659
        cts, err := worker.CommitOverNetwork(ctx, ctxn)
8,138✔
660
        qc.span.Annotatef(nil, "Status of commit at ts: %d: %v", ctxn.StartTs, err)
8,138✔
661
        if err != nil {
9,747✔
662
                if err == dgo.ErrAborted {
3,218✔
663
                        err = status.Errorf(codes.Aborted, err.Error())
1,609✔
664
                        resp.Txn.Aborted = true
1,609✔
665
                }
1,609✔
666

667
                return err
1,609✔
668
        }
669

670
        // CommitNow was true, no need to send keys.
671
        resp.Txn.Keys = resp.Txn.Keys[:0]
6,529✔
672
        resp.Txn.CommitTs = cts
6,529✔
673
        calculateMutationMetrics()
6,529✔
674
        return nil
6,529✔
675
}
676

677
// validateMutation ensures that the value in the edge is not too big.
678
// The challange here is that the keys in badger have a limitation on their size (< 2<<16).
679
// We need to ensure that no key, either primary or secondary index key is bigger than that.
680
// See here for more details: https://github.com/dgraph-io/projects/issues/73
681
func validateMutation(ctx context.Context, edges []*pb.DirectedEdge) error {
15,408✔
682
        errValueTooBigForIndex := errors.New("value in the mutation is too large for the index")
15,408✔
683

15,408✔
684
        // key = meta data + predicate + actual key, this all needs to fit into 64 KB
15,408✔
685
        // we are keeping 536 bytes aside for meta information we put into the key and we
15,408✔
686
        // use 65000 bytes for the rest, that is predicate and the actual key.
15,408✔
687
        const maxKeySize = 65000
15,408✔
688

15,408✔
689
        for _, e := range edges {
271,340✔
690
                maxSizeForDataKey := maxKeySize - len(e.Attr)
255,932✔
691

255,932✔
692
                // seems reasonable to assume, the tokens for indexes won't be bigger than the value itself
255,932✔
693
                if len(e.Value) <= maxSizeForDataKey {
511,857✔
694
                        continue
255,925✔
695
                }
696
                pred := x.NamespaceAttr(e.Namespace, e.Attr)
7✔
697
                update, ok := schema.State().Get(ctx, pred)
7✔
698
                if !ok {
7✔
699
                        continue
×
700
                }
701
                // only string type can have large values that could cause us issues later
702
                if update.GetValueType() != pb.Posting_STRING {
7✔
703
                        continue
×
704
                }
705

706
                storageVal := types.Val{Tid: types.TypeID(e.GetValueType()), Value: e.GetValue()}
7✔
707
                schemaVal, err := types.Convert(storageVal, types.TypeID(update.GetValueType()))
7✔
708
                if err != nil {
7✔
709
                        return err
×
710
                }
×
711

712
                for _, tokenizer := range schema.State().Tokenizer(ctx, pred) {
14✔
713
                        toks, err := tok.BuildTokens(schemaVal.Value, tok.GetTokenizerForLang(tokenizer, e.Lang))
7✔
714
                        if err != nil {
7✔
715
                                return fmt.Errorf("error while building index tokens: %w", err)
×
716
                        }
×
717

718
                        for _, tok := range toks {
25✔
719
                                if len(tok) > maxSizeForDataKey {
22✔
720
                                        return errValueTooBigForIndex
4✔
721
                                }
4✔
722
                        }
723
                }
724
        }
725

726
        return nil
15,404✔
727
}
728

729
// buildUpsertQuery modifies the query to evaluate the
730
// @if condition defined in Conditional Upsert.
731
func buildUpsertQuery(qc *queryContext) string {
15,528✔
732
        if qc.req.Query == "" || len(qc.gmuList) == 0 {
28,271✔
733
                return qc.req.Query
12,743✔
734
        }
12,743✔
735

736
        qc.condVars = make([]string, len(qc.req.Mutations))
2,785✔
737
        upsertQuery := strings.TrimSuffix(qc.req.Query, "}")
2,785✔
738
        for i, gmu := range qc.gmuList {
5,684✔
739
                isCondUpsert := strings.TrimSpace(gmu.Cond) != ""
2,899✔
740
                if isCondUpsert {
4,205✔
741
                        qc.condVars[i] = "__dgraph__" + strconv.Itoa(i)
1,306✔
742
                        qc.uidRes[qc.condVars[i]] = nil
1,306✔
743
                        // @if in upsert is same as @filter in the query
1,306✔
744
                        cond := strings.Replace(gmu.Cond, "@if", "@filter", 1)
1,306✔
745

1,306✔
746
                        // Add dummy query to evaluate the @if directive, ok to use uid(0) because
1,306✔
747
                        // dgraph doesn't check for existence of UIDs until we query for other predicates.
1,306✔
748
                        // Here, we are only querying for uid predicate in the dummy query.
1,306✔
749
                        //
1,306✔
750
                        // For example if - mu.Query = {
1,306✔
751
                        //      me(...) {...}
1,306✔
752
                        //   }
1,306✔
753
                        //
1,306✔
754
                        // Then, upsertQuery = {
1,306✔
755
                        //      me(...) {...}
1,306✔
756
                        //      __dgraph_0__ as var(func: uid(0)) @filter(...)
1,306✔
757
                        //   }
1,306✔
758
                        //
1,306✔
759
                        // The variable __dgraph_0__ will -
1,306✔
760
                        //      * be empty if the condition is true
1,306✔
761
                        //      * have 1 UID (the 0 UID) if the condition is false
1,306✔
762
                        upsertQuery += qc.condVars[i] + ` as var(func: uid(0)) ` + cond + `
1,306✔
763
                         `
1,306✔
764
                }
1,306✔
765
        }
766
        upsertQuery += `}`
2,785✔
767

2,785✔
768
        return upsertQuery
2,785✔
769
}
770

771
// updateMutations updates the mutation and replaces uid(var) and val(var) with
772
// their values or a blank node, in case of an upsert.
773
// We use the values stored in qc.uidRes and qc.valRes to update the mutation.
774
func updateMutations(qc *queryContext) error {
15,411✔
775
        for i, condVar := range qc.condVars {
18,212✔
776
                gmu := qc.gmuList[i]
2,801✔
777
                if condVar != "" {
4,011✔
778
                        uids, ok := qc.uidRes[condVar]
1,210✔
779
                        if !(ok && len(uids) == 1) {
1,353✔
780
                                gmu.Set = nil
143✔
781
                                gmu.Del = nil
143✔
782
                                continue
143✔
783
                        }
784
                }
785

786
                if err := updateUIDInMutations(gmu, qc); err != nil {
2,658✔
787
                        return err
×
788
                }
×
789
                if err := updateValInMutations(gmu, qc); err != nil {
2,658✔
790
                        return err
×
791
                }
×
792
        }
793

794
        return nil
15,411✔
795
}
796

797
// findMutationVars finds all the variables used in mutation block and stores them
798
// qc.uidRes and qc.valRes so that we only look for these variables in query results.
799
func findMutationVars(qc *queryContext) []string {
15,528✔
800
        updateVars := func(s string) {
532,154✔
801
                if strings.HasPrefix(s, "uid(") {
526,780✔
802
                        varName := s[4 : len(s)-1]
10,154✔
803
                        qc.uidRes[varName] = nil
10,154✔
804
                } else if strings.HasPrefix(s, "val(") {
516,658✔
805
                        varName := s[4 : len(s)-1]
32✔
806
                        qc.valRes[varName] = nil
32✔
807
                }
32✔
808
        }
809

810
        for _, gmu := range qc.gmuList {
31,306✔
811
                for _, nq := range gmu.Set {
269,595✔
812
                        updateVars(nq.Subject)
253,817✔
813
                        updateVars(nq.ObjectId)
253,817✔
814
                }
253,817✔
815
                for _, nq := range gmu.Del {
20,274✔
816
                        updateVars(nq.Subject)
4,496✔
817
                        updateVars(nq.ObjectId)
4,496✔
818
                }
4,496✔
819
        }
820

821
        varsList := make([]string, 0, len(qc.uidRes)+len(qc.valRes))
15,528✔
822
        for v := range qc.uidRes {
21,006✔
823
                varsList = append(varsList, v)
5,478✔
824
        }
5,478✔
825
        for v := range qc.valRes {
15,560✔
826
                varsList = append(varsList, v)
32✔
827
        }
32✔
828

829
        return varsList
15,528✔
830
}
831

832
// updateValInNQuads picks the val() from object and replaces it with its value
833
// Assumption is that Subject can contain UID, whereas Object can contain Val
834
// If val(variable) exists in a query, but the values are not there for the variable,
835
// it will ignore the mutation silently.
836
func updateValInNQuads(nquads []*api.NQuad, qc *queryContext, isSet bool) []*api.NQuad {
5,316✔
837
        getNewVals := func(s string) (map[uint64]types.Val, bool) {
13,350✔
838
                if strings.HasPrefix(s, "val(") {
8,092✔
839
                        varName := s[4 : len(s)-1]
58✔
840
                        if v, ok := qc.valRes[varName]; ok && v != nil {
115✔
841
                                return v, true
57✔
842
                        }
57✔
843
                        return nil, true
1✔
844
                }
845
                return nil, false
7,976✔
846
        }
847

848
        getValue := func(key uint64, uidToVal map[uint64]types.Val) (types.Val, bool) {
5,374✔
849
                val, ok := uidToVal[key]
58✔
850
                if ok {
105✔
851
                        return val, true
47✔
852
                }
47✔
853

854
                // Check if the variable is aggregate variable
855
                // Only 0 key would exist for aggregate variable
856
                val, ok = uidToVal[0]
11✔
857
                return val, ok
11✔
858
        }
859

860
        newNQuads := nquads[:0]
5,316✔
861
        for _, nq := range nquads {
13,350✔
862
                // Check if the nquad contains a val() in Object or not.
8,034✔
863
                // If not then, keep the mutation and continue
8,034✔
864
                uidToVal, found := getNewVals(nq.ObjectId)
8,034✔
865
                if !found {
16,010✔
866
                        newNQuads = append(newNQuads, nq)
7,976✔
867
                        continue
7,976✔
868
                }
869

870
                // uid(u) <amount> val(amt)
871
                // For each NQuad, we need to convert the val(variable_name)
872
                // to *api.Value before applying the mutation. For that, first
873
                // we convert key to uint64 and get the UID to Value map from
874
                // the result of the query.
875
                var key uint64
58✔
876
                var err error
58✔
877
                switch {
58✔
878
                case nq.Subject[0] == '_' && isSet:
11✔
879
                        // in case aggregate val(var) is there, that should work with blank node.
11✔
880
                        key = 0
11✔
881
                case nq.Subject[0] == '_' && !isSet:
×
882
                        // UID is of format "_:uid(u)". Ignore the delete silently
×
883
                        continue
×
884
                default:
47✔
885
                        key, err = strconv.ParseUint(nq.Subject, 0, 64)
47✔
886
                        if err != nil {
47✔
887
                                // Key conversion failed, ignoring the nquad. Ideally,
×
888
                                // it shouldn't happen as this is the result of a query.
×
889
                                glog.Errorf("Conversion of subject %s failed. Error: %s",
×
890
                                        nq.Subject, err.Error())
×
891
                                continue
×
892
                        }
893
                }
894

895
                // Get the value to the corresponding UID(key) from the query result
896
                nq.ObjectId = ""
58✔
897
                val, ok := getValue(key, uidToVal)
58✔
898
                if !ok {
66✔
899
                        continue
8✔
900
                }
901

902
                // Convert the value from types.Val to *api.Value
903
                nq.ObjectValue, err = types.ObjectValue(val.Tid, val.Value)
50✔
904
                if err != nil {
50✔
905
                        // Value conversion failed, ignoring the nquad. Ideally,
×
906
                        // it shouldn't happen as this is the result of a query.
×
907
                        glog.Errorf("Conversion of %s failed for %d subject. Error: %s",
×
908
                                nq.ObjectId, key, err.Error())
×
909
                        continue
×
910
                }
911

912
                newNQuads = append(newNQuads, nq)
50✔
913
        }
914
        qc.nquadsCount += len(newNQuads)
5,316✔
915
        return newNQuads
5,316✔
916
}
917

918
// updateValInMutations does following transformations:
919
// 0x123 <amount> val(v) -> 0x123 <amount> 13.0
920
func updateValInMutations(gmu *dql.Mutation, qc *queryContext) error {
2,658✔
921
        gmu.Del = updateValInNQuads(gmu.Del, qc, false)
2,658✔
922
        gmu.Set = updateValInNQuads(gmu.Set, qc, true)
2,658✔
923
        if qc.nquadsCount > x.Config.LimitMutationsNquad {
2,658✔
924
                return errors.Errorf("NQuad count in the request: %d, is more that threshold: %d",
×
925
                        qc.nquadsCount, x.Config.LimitMutationsNquad)
×
926
        }
×
927
        return nil
2,658✔
928
}
929

930
// updateUIDInMutations does following transformations:
931
//   - uid(v) -> 0x123     -- If v is defined in query block
932
//   - uid(v) -> _:uid(v)  -- Otherwise
933

934
func updateUIDInMutations(gmu *dql.Mutation, qc *queryContext) error {
2,658✔
935
        // usedMutationVars keeps track of variables that are used in mutations.
2,658✔
936
        getNewVals := func(s string) []string {
22,320✔
937
                if strings.HasPrefix(s, "uid(") {
29,687✔
938
                        varName := s[4 : len(s)-1]
10,025✔
939
                        if uids, ok := qc.uidRes[varName]; ok && len(uids) != 0 {
17,495✔
940
                                return uids
7,470✔
941
                        }
7,470✔
942

943
                        return []string{"_:" + s}
2,555✔
944
                }
945

946
                return []string{s}
9,637✔
947
        }
948

949
        getNewNQuad := func(nq *api.NQuad, s, o string) *api.NQuad {
10,692✔
950
                // The following copy is fine because we only modify Subject and ObjectId.
8,034✔
951
                // The pointer values are not modified across different copies of NQuad.
8,034✔
952
                n := *nq
8,034✔
953

8,034✔
954
                n.Subject = s
8,034✔
955
                n.ObjectId = o
8,034✔
956
                return &n
8,034✔
957
        }
8,034✔
958

959
        // Remove the mutations from gmu.Del when no UID was found.
960
        gmuDel := make([]*api.NQuad, 0, len(gmu.Del))
2,658✔
961
        for _, nq := range gmu.Del {
6,098✔
962
                // if Subject or/and Object are variables, each NQuad can result
3,440✔
963
                // in multiple NQuads if any variable stores more than one UIDs.
3,440✔
964
                newSubs := getNewVals(nq.Subject)
3,440✔
965
                newObs := getNewVals(nq.ObjectId)
3,440✔
966

3,440✔
967
                for _, s := range newSubs {
7,206✔
968
                        for _, o := range newObs {
7,909✔
969
                                // Blank node has no meaning in case of deletion.
4,143✔
970
                                if strings.HasPrefix(s, "_:uid(") ||
4,143✔
971
                                        strings.HasPrefix(o, "_:uid(") {
6,735✔
972
                                        continue
2,592✔
973
                                }
974

975
                                gmuDel = append(gmuDel, getNewNQuad(nq, s, o))
1,551✔
976
                                qc.nquadsCount++
1,551✔
977
                        }
978
                        if qc.nquadsCount > x.Config.LimitMutationsNquad {
3,766✔
979
                                return errors.Errorf("NQuad count in the request: %d, is more that threshold: %d",
×
980
                                        qc.nquadsCount, x.Config.LimitMutationsNquad)
×
981
                        }
×
982
                }
983
        }
984

985
        gmu.Del = gmuDel
2,658✔
986

2,658✔
987
        // Update the values in mutation block from the query block.
2,658✔
988
        gmuSet := make([]*api.NQuad, 0, len(gmu.Set))
2,658✔
989
        for _, nq := range gmu.Set {
9,049✔
990
                newSubs := getNewVals(nq.Subject)
6,391✔
991
                newObs := getNewVals(nq.ObjectId)
6,391✔
992

6,391✔
993
                qc.nquadsCount += len(newSubs) * len(newObs)
6,391✔
994
                if qc.nquadsCount > int(x.Config.LimitQueryEdge) {
6,391✔
995
                        return errors.Errorf("NQuad count in the request: %d, is more that threshold: %d",
×
996
                                qc.nquadsCount, int(x.Config.LimitQueryEdge))
×
997
                }
×
998

999
                for _, s := range newSubs {
12,858✔
1000
                        for _, o := range newObs {
12,950✔
1001
                                gmuSet = append(gmuSet, getNewNQuad(nq, s, o))
6,483✔
1002
                        }
6,483✔
1003
                }
1004
        }
1005
        gmu.Set = gmuSet
2,658✔
1006
        return nil
2,658✔
1007
}
1008

1009
// queryContext is used to pass around all the variables needed
1010
// to process a request for query, mutation or upsert.
1011
type queryContext struct {
1012
        // req is the incoming, not yet parsed request containing
1013
        // a query or more than one mutations or both (in case of upsert)
1014
        req *api.Request
1015
        // gmuList is the list of mutations after parsing req.Mutations
1016
        gmuList []*dql.Mutation
1017
        // dqlRes contains result of parsing the req.Query
1018
        dqlRes dql.Result
1019
        // condVars are conditional variables used in the (modified) query to figure out
1020
        // whether the condition in Conditional Upsert is true. The string would be empty
1021
        // if the corresponding mutation is not a conditional upsert.
1022
        // Note that, len(condVars) == len(gmuList).
1023
        condVars []string
1024
        // uidRes stores mapping from variable names to UIDs for UID variables.
1025
        // These variables are either dummy variables used for Conditional
1026
        // Upsert or variables used in the mutation block in the incoming request.
1027
        uidRes map[string][]string
1028
        // valRes stores mapping from variable names to values for value
1029
        // variables used in the mutation block of incoming request.
1030
        valRes map[string]map[uint64]types.Val
1031
        // l stores latency numbers
1032
        latency *query.Latency
1033
        // span stores a opencensus span used throughout the query processing
1034
        span *otrace.Span
1035
        // graphql indicates whether the given request is from graphql admin or not.
1036
        graphql bool
1037
        // gqlField stores the GraphQL field for which the query is being processed.
1038
        // This would be set only if the request is a query from GraphQL layer,
1039
        // otherwise it would be nil. (Eg. nil cases: in case of a DQL query,
1040
        // a mutation being executed from GraphQL layer).
1041
        gqlField gqlSchema.Field
1042
        // nquadsCount maintains numbers of nquads which would be inserted as part of this request.
1043
        // In some cases(mostly upserts), numbers of nquads to be inserted can to huge(we have seen upto
1044
        // 1B) and resulting in OOM. We are limiting number of nquads which can be inserted in
1045
        // a single request.
1046
        nquadsCount int
1047
}
1048

1049
// Request represents a query request sent to the doQuery() method on the Server.
1050
// It contains all the metadata required to execute a query.
1051
type Request struct {
1052
        // req is the incoming gRPC request
1053
        req *api.Request
1054
        // gqlField is the GraphQL field for which the request is being sent
1055
        gqlField gqlSchema.Field
1056
        // doAuth tells whether this request needs ACL authorization or not
1057
        doAuth AuthMode
1058
}
1059

1060
// Health handles /health and /health?all requests.
1061
func (s *Server) Health(ctx context.Context, all bool) (*api.Response, error) {
151✔
1062
        if ctx.Err() != nil {
151✔
1063
                return nil, ctx.Err()
×
1064
        }
×
1065

1066
        var healthAll []pb.HealthInfo
151✔
1067
        if all {
162✔
1068
                if err := AuthorizeGuardians(ctx); err != nil {
12✔
1069
                        return nil, err
1✔
1070
                }
1✔
1071
                pool := conn.GetPools().GetAll()
10✔
1072
                for _, p := range pool {
37✔
1073
                        if p.Addr == x.WorkerConfig.MyAddr {
37✔
1074
                                continue
10✔
1075
                        }
1076
                        healthAll = append(healthAll, p.HealthInfo())
17✔
1077
                }
1078
        }
1079

1080
        // Append self.
1081
        healthAll = append(healthAll, pb.HealthInfo{
150✔
1082
                Instance:    "alpha",
150✔
1083
                Address:     x.WorkerConfig.MyAddr,
150✔
1084
                Status:      "healthy",
150✔
1085
                Group:       strconv.Itoa(int(worker.GroupId())),
150✔
1086
                Version:     x.Version(),
150✔
1087
                Uptime:      int64(time.Since(x.WorkerConfig.StartTime) / time.Second),
150✔
1088
                LastEcho:    time.Now().Unix(),
150✔
1089
                Ongoing:     worker.GetOngoingTasks(),
150✔
1090
                Indexing:    schema.GetIndexingPredicates(),
150✔
1091
                EeFeatures:  worker.GetEEFeaturesList(),
150✔
1092
                MaxAssigned: posting.Oracle().MaxAssigned(),
150✔
1093
        })
150✔
1094

150✔
1095
        var err error
150✔
1096
        var jsonOut []byte
150✔
1097
        if jsonOut, err = json.Marshal(healthAll); err != nil {
150✔
1098
                return nil, errors.Errorf("Unable to Marshal. Err %v", err)
×
1099
        }
×
1100
        return &api.Response{Json: jsonOut}, nil
150✔
1101
}
1102

1103
// Filter out the tablets that do not belong to the requestor's namespace.
1104
func filterTablets(ctx context.Context, ms *pb.MembershipState) error {
10✔
1105
        if !x.WorkerConfig.AclEnabled {
16✔
1106
                return nil
6✔
1107
        }
6✔
1108
        namespace, err := x.ExtractNamespaceFrom(ctx)
4✔
1109
        if err != nil {
4✔
1110
                return errors.Errorf("Namespace not found in JWT.")
×
1111
        }
×
1112
        if namespace == x.GalaxyNamespace {
7✔
1113
                // For galaxy namespace, we don't want to filter out the predicates.
3✔
1114
                return nil
3✔
1115
        }
3✔
1116
        for _, group := range ms.GetGroups() {
4✔
1117
                tablets := make(map[string]*pb.Tablet)
3✔
1118
                for pred, tablet := range group.GetTablets() {
40✔
1119
                        if ns, attr := x.ParseNamespaceAttr(pred); namespace == ns {
48✔
1120
                                tablets[attr] = tablet
11✔
1121
                                tablets[attr].Predicate = attr
11✔
1122
                        }
11✔
1123
                }
1124
                group.Tablets = tablets
3✔
1125
        }
1126
        return nil
1✔
1127
}
1128

1129
// State handles state requests
1130
func (s *Server) State(ctx context.Context) (*api.Response, error) {
10✔
1131
        if ctx.Err() != nil {
10✔
1132
                return nil, ctx.Err()
×
1133
        }
×
1134

1135
        if err := AuthorizeGuardians(ctx); err != nil {
10✔
1136
                return nil, err
×
1137
        }
×
1138

1139
        ms := worker.GetMembershipState()
10✔
1140
        if ms == nil {
10✔
1141
                return nil, errors.Errorf("No membership state found")
×
1142
        }
×
1143

1144
        if err := filterTablets(ctx, ms); err != nil {
10✔
1145
                return nil, err
×
1146
        }
×
1147

1148
        m := jsonpb.Marshaler{EmitDefaults: true}
10✔
1149
        var jsonState bytes.Buffer
10✔
1150
        if err := m.Marshal(&jsonState, ms); err != nil {
10✔
1151
                return nil, errors.Errorf("Error marshalling state information to JSON")
×
1152
        }
×
1153

1154
        return &api.Response{Json: jsonState.Bytes()}, nil
10✔
1155
}
1156

1157
func getAuthMode(ctx context.Context) AuthMode {
32,645✔
1158
        if auth := ctx.Value(Authorize); auth == nil || auth.(bool) {
65,064✔
1159
                return NeedAuthorize
32,419✔
1160
        }
32,419✔
1161
        return NoAuthorize
226✔
1162
}
1163

1164
// QueryGraphQL handles only GraphQL queries, neither mutations nor DQL.
1165
func (s *Server) QueryGraphQL(ctx context.Context, req *api.Request,
1166
        field gqlSchema.Field) (*api.Response, error) {
5,013✔
1167
        // Add a timeout for queries which don't have a deadline set. We don't want to
5,013✔
1168
        // apply a timeout if it's a mutation, that's currently handled by flag
5,013✔
1169
        // "txn-abort-after".
5,013✔
1170
        if req.GetMutations() == nil && x.Config.QueryTimeout != 0 {
5,013✔
1171
                if d, _ := ctx.Deadline(); d.IsZero() {
×
1172
                        var cancel context.CancelFunc
×
1173
                        ctx, cancel = context.WithTimeout(ctx, x.Config.QueryTimeout)
×
1174
                        defer cancel()
×
1175
                }
×
1176
        }
1177
        // no need to attach namespace here, it is already done by GraphQL layer
1178
        return s.doQuery(ctx, &Request{req: req, gqlField: field, doAuth: getAuthMode(ctx)})
5,013✔
1179
}
1180

1181
func (s *Server) Query(ctx context.Context, req *api.Request) (*api.Response, error) {
26,397✔
1182
        resp, err := s.QueryNoGrpc(ctx, req)
26,397✔
1183
        if err != nil {
27,603✔
1184
                return resp, err
1,206✔
1185
        }
1,206✔
1186
        md := metadata.Pairs(x.DgraphCostHeader, fmt.Sprint(resp.Metrics.NumUids["_total"]))
25,191✔
1187
        if err := grpc.SendHeader(ctx, md); err != nil {
25,191✔
1188
                glog.Warningf("error in sending grpc headers: %v", err)
×
1189
        }
×
1190
        return resp, nil
25,191✔
1191
}
1192

1193
// Query handles queries or mutations
1194
func (s *Server) QueryNoGrpc(ctx context.Context, req *api.Request) (*api.Response, error) {
27,632✔
1195
        ctx = x.AttachJWTNamespace(ctx)
27,632✔
1196
        if x.WorkerConfig.AclEnabled && req.GetStartTs() != 0 {
44,905✔
1197
                // A fresh StartTs is assigned if it is 0.
17,273✔
1198
                ns, err := x.ExtractNamespace(ctx)
17,273✔
1199
                if err != nil {
17,273✔
1200
                        return nil, err
×
1201
                }
×
1202
                if req.GetHash() != getHash(ns, req.GetStartTs()) {
17,273✔
1203
                        return nil, x.ErrHashMismatch
×
1204
                }
×
1205
        }
1206
        // Add a timeout for queries which don't have a deadline set. We don't want to
1207
        // apply a timeout if it's a mutation, that's currently handled by flag
1208
        // "txn-abort-after".
1209
        if req.GetMutations() == nil && x.Config.QueryTimeout != 0 {
27,632✔
1210
                if d, _ := ctx.Deadline(); d.IsZero() {
×
1211
                        var cancel context.CancelFunc
×
1212
                        ctx, cancel = context.WithTimeout(ctx, x.Config.QueryTimeout)
×
1213
                        defer cancel()
×
1214
                }
×
1215
        }
1216
        return s.doQuery(ctx, &Request{req: req, doAuth: getAuthMode(ctx)})
27,632✔
1217
}
1218

1219
var pendingQueries int64
1220
var maxPendingQueries int64
1221
var serverOverloadErr = errors.New("429 Too Many Requests. Please throttle your requests")
1222

1223
func Init() {
96✔
1224
        maxPendingQueries = x.Config.Limit.GetInt64("max-pending-queries")
96✔
1225
}
96✔
1226

1227
func (s *Server) doQuery(ctx context.Context, req *Request) (resp *api.Response, rerr error) {
40,090✔
1228
        if ctx.Err() != nil {
40,090✔
1229
                return nil, ctx.Err()
×
1230
        }
×
1231
        defer atomic.AddInt64(&pendingQueries, -1)
40,090✔
1232
        if val := atomic.AddInt64(&pendingQueries, 1); val > maxPendingQueries {
40,090✔
1233
                return nil, serverOverloadErr
×
1234
        }
×
1235

1236
        isGraphQL, _ := ctx.Value(IsGraphql).(bool)
40,090✔
1237
        if isGraphQL {
45,440✔
1238
                atomic.AddUint64(&numGraphQL, 1)
5,350✔
1239
        } else {
40,090✔
1240
                atomic.AddUint64(&numGraphQLPM, 1)
34,740✔
1241
        }
34,740✔
1242
        l := &query.Latency{}
40,090✔
1243
        l.Start = time.Now()
40,090✔
1244

40,090✔
1245
        if bool(glog.V(3)) || worker.LogDQLRequestEnabled() {
41,561✔
1246
                glog.Infof("Got a query, DQL form: %+v at %+v", req.req, l.Start.Format(time.RFC3339))
1,471✔
1247
        }
1,471✔
1248

1249
        isMutation := len(req.req.Mutations) > 0
40,090✔
1250
        methodRequest := methodQuery
40,090✔
1251
        if isMutation {
55,627✔
1252
                methodRequest = methodMutate
15,537✔
1253
        }
15,537✔
1254

1255
        var measurements []ostats.Measurement
40,090✔
1256
        ctx, span := otrace.StartSpan(ctx, methodRequest)
40,090✔
1257
        if ns, err := x.ExtractNamespace(ctx); err == nil {
80,172✔
1258
                annotateNamespace(span, ns)
40,082✔
1259
        }
40,082✔
1260

1261
        ctx = x.WithMethod(ctx, methodRequest)
40,090✔
1262
        defer func() {
80,179✔
1263
                span.End()
40,089✔
1264
                v := x.TagValueStatusOK
40,089✔
1265
                if rerr != nil {
42,139✔
1266
                        v = x.TagValueStatusError
2,050✔
1267
                }
2,050✔
1268
                ctx, _ = tag.New(ctx, tag.Upsert(x.KeyStatus, v))
40,089✔
1269
                timeSpentMs := x.SinceMs(l.Start)
40,089✔
1270
                measurements = append(measurements, x.LatencyMs.M(timeSpentMs))
40,089✔
1271
                ostats.Record(ctx, measurements...)
40,089✔
1272
        }()
1273

1274
        if rerr = x.HealthCheck(); rerr != nil {
40,175✔
1275
                return
85✔
1276
        }
85✔
1277

1278
        req.req.Query = strings.TrimSpace(req.req.Query)
40,005✔
1279
        isQuery := len(req.req.Query) != 0
40,005✔
1280
        if !isQuery && !isMutation {
40,006✔
1281
                span.Annotate(nil, "empty request")
1✔
1282
                return nil, errors.Errorf("empty request")
1✔
1283
        }
1✔
1284

1285
        span.AddAttributes(otrace.StringAttribute("Query", req.req.Query))
40,004✔
1286
        span.Annotatef(nil, "Request received: %v", req.req)
40,004✔
1287
        if isQuery {
67,262✔
1288
                ostats.Record(ctx, x.PendingQueries.M(1), x.NumQueries.M(1))
27,258✔
1289
                defer func() {
54,515✔
1290
                        measurements = append(measurements, x.PendingQueries.M(-1))
27,257✔
1291
                }()
27,257✔
1292
        }
1293
        if isMutation {
55,540✔
1294
                ostats.Record(ctx, x.NumMutations.M(1))
15,536✔
1295
        }
15,536✔
1296

1297
        if req.doAuth == NeedAuthorize && x.IsGalaxyOperation(ctx) {
40,014✔
1298
                // Only the guardian of the galaxy can do a galaxy wide query/mutation. This operation is
10✔
1299
                // needed by live loader.
10✔
1300
                if err := AuthGuardianOfTheGalaxy(ctx); err != nil {
10✔
1301
                        s := status.Convert(err)
×
1302
                        return nil, status.Error(s.Code(),
×
1303
                                "Non guardian of galaxy user cannot bypass namespaces. "+s.Message())
×
1304
                }
×
1305
        }
1306

1307
        qc := &queryContext{
40,004✔
1308
                req:      req.req,
40,004✔
1309
                latency:  l,
40,004✔
1310
                span:     span,
40,004✔
1311
                graphql:  isGraphQL,
40,004✔
1312
                gqlField: req.gqlField,
40,004✔
1313
        }
40,004✔
1314
        if rerr = parseRequest(qc); rerr != nil {
40,033✔
1315
                return
29✔
1316
        }
29✔
1317

1318
        if req.doAuth == NeedAuthorize {
72,332✔
1319
                if rerr = authorizeRequest(ctx, qc); rerr != nil {
32,378✔
1320
                        return
21✔
1321
                }
21✔
1322
        }
1323

1324
        // We use defer here because for queries, startTs will be
1325
        // assigned in the processQuery function called below.
1326
        defer annotateStartTs(qc.span, qc.req.StartTs)
39,954✔
1327
        // For mutations, we update the startTs if necessary.
39,954✔
1328
        if isMutation && req.req.StartTs == 0 {
48,858✔
1329
                start := time.Now()
8,904✔
1330
                req.req.StartTs = worker.State.GetTimestamp(false)
8,904✔
1331
                qc.latency.AssignTimestamp = time.Since(start)
8,904✔
1332
        }
8,904✔
1333
        if x.WorkerConfig.AclEnabled {
72,428✔
1334
                ns, err := x.ExtractNamespace(ctx)
32,475✔
1335
                if err != nil {
32,475✔
1336
                        return nil, err
×
1337
                }
×
1338
                defer func() {
64,950✔
1339
                        if resp != nil && resp.Txn != nil {
64,936✔
1340
                                // attach the hash, user must send this hash when further operating on this startTs.
32,461✔
1341
                                resp.Txn.Hash = getHash(ns, resp.Txn.StartTs)
32,461✔
1342
                        }
32,461✔
1343
                }()
1344
        }
1345

1346
        var gqlErrs error
39,953✔
1347
        if resp, rerr = processQuery(ctx, qc); rerr != nil {
40,228✔
1348
                // if rerr is just some error from GraphQL encoding, then we need to continue the normal
275✔
1349
                // execution ignoring the error as we still need to assign latency info to resp. If we can
275✔
1350
                // change the api.Response proto to have a field to contain GraphQL errors, that would be
275✔
1351
                // great. Otherwise, we will have to do such checks a lot and that would make code ugly.
275✔
1352
                if qc.gqlField != nil && x.IsGqlErrorList(rerr) {
286✔
1353
                        gqlErrs = rerr
11✔
1354
                } else {
275✔
1355
                        return
264✔
1356
                }
264✔
1357
        }
1358
        // if it were a mutation, simple or upsert, in any case gqlErrs would be empty as GraphQL JSON
1359
        // is formed only for queries. So, gqlErrs can have something only in the case of a pure query.
1360
        // So, safe to ignore gqlErrs and not return that here.
1361
        if rerr = s.doMutate(ctx, qc, resp); rerr != nil {
41,328✔
1362
                return
1,639✔
1363
        }
1,639✔
1364

1365
        // TODO(Ahsan): resp.Txn.Preds contain predicates of form gid-namespace|attr.
1366
        // Remove the namespace from the response.
1367
        // resp.Txn.Preds = x.ParseAttrList(resp.Txn.Preds)
1368

1369
        // TODO(martinmr): Include Transport as part of the latency. Need to do
1370
        // this separately since it involves modifying the API protos.
1371
        resp.Latency = &api.Latency{
38,050✔
1372
                AssignTimestampNs: uint64(l.AssignTimestamp.Nanoseconds()),
38,050✔
1373
                ParsingNs:         uint64(l.Parsing.Nanoseconds()),
38,050✔
1374
                ProcessingNs:      uint64(l.Processing.Nanoseconds()),
38,050✔
1375
                EncodingNs:        uint64(l.Json.Nanoseconds()),
38,050✔
1376
                TotalNs:           uint64((time.Since(l.Start)).Nanoseconds()),
38,050✔
1377
        }
38,050✔
1378
        return resp, gqlErrs
38,050✔
1379
}
1380

1381
func processQuery(ctx context.Context, qc *queryContext) (*api.Response, error) {
39,953✔
1382
        resp := &api.Response{}
39,953✔
1383
        if qc.req.Query == "" {
52,679✔
1384
                // No query, so make the query cost 0.
12,726✔
1385
                resp.Metrics = &api.Metrics{
12,726✔
1386
                        NumUids: map[string]uint64{"_total": 0},
12,726✔
1387
                }
12,726✔
1388
                return resp, nil
12,726✔
1389
        }
12,726✔
1390
        if ctx.Err() != nil {
27,227✔
1391
                return resp, ctx.Err()
×
1392
        }
×
1393
        qr := query.Request{
27,227✔
1394
                Latency:  qc.latency,
27,227✔
1395
                GqlQuery: &qc.dqlRes,
27,227✔
1396
        }
27,227✔
1397

27,227✔
1398
        // Here we try our best effort to not contact Zero for a timestamp. If we succeed,
27,227✔
1399
        // then we use the max known transaction ts value (from ProcessDelta) for a read-only query.
27,227✔
1400
        // If we haven't processed any updates yet then fall back to getting TS from Zero.
27,227✔
1401
        switch {
27,227✔
1402
        case qc.req.BestEffort:
1,014✔
1403
                qc.span.Annotate([]otrace.Attribute{otrace.BoolAttribute("be", true)}, "")
1,014✔
1404
        case qc.req.ReadOnly:
7,990✔
1405
                qc.span.Annotate([]otrace.Attribute{otrace.BoolAttribute("ro", true)}, "")
7,990✔
1406
        default:
18,223✔
1407
                qc.span.Annotate([]otrace.Attribute{otrace.BoolAttribute("no", true)}, "")
18,223✔
1408
        }
1409

1410
        if qc.req.BestEffort {
28,241✔
1411
                // Sanity: check that request is read-only too.
1,014✔
1412
                if !qc.req.ReadOnly {
1,014✔
1413
                        return resp, errors.Errorf("A best effort query must be read-only.")
×
1414
                }
×
1415
                if qc.req.StartTs == 0 {
2,027✔
1416
                        qc.req.StartTs = posting.Oracle().MaxAssigned()
1,013✔
1417
                }
1,013✔
1418
                qr.Cache = worker.NoCache
1,014✔
1419
        }
1420

1421
        if qc.req.StartTs == 0 {
33,697✔
1422
                assignTimestampStart := time.Now()
6,470✔
1423
                qc.req.StartTs = worker.State.GetTimestamp(qc.req.ReadOnly)
6,470✔
1424
                qc.latency.AssignTimestamp = time.Since(assignTimestampStart)
6,470✔
1425
        }
6,470✔
1426

1427
        qr.ReadTs = qc.req.StartTs
27,227✔
1428
        resp.Txn = &api.TxnContext{StartTs: qc.req.StartTs}
27,227✔
1429

27,227✔
1430
        // Core processing happens here.
27,227✔
1431
        er, err := qr.Process(ctx)
27,227✔
1432

27,227✔
1433
        if bool(glog.V(3)) || worker.LogDQLRequestEnabled() {
28,350✔
1434
                glog.Infof("Finished a query that started at: %+v",
1,123✔
1435
                        qr.Latency.Start.Format(time.RFC3339))
1,123✔
1436
        }
1,123✔
1437

1438
        if err != nil {
27,483✔
1439
                if bool(glog.V(3)) {
256✔
1440
                        glog.Infof("Error processing query: %+v\n", err.Error())
×
1441
                }
×
1442
                return resp, errors.Wrap(err, "")
256✔
1443
        }
1444

1445
        if len(er.SchemaNode) > 0 || len(er.Types) > 0 {
27,064✔
1446
                if err = authorizeSchemaQuery(ctx, &er); err != nil {
93✔
1447
                        return resp, err
×
1448
                }
×
1449
                sort.Slice(er.SchemaNode, func(i, j int) bool {
4,573✔
1450
                        return er.SchemaNode[i].Predicate < er.SchemaNode[j].Predicate
4,480✔
1451
                })
4,480✔
1452
                sort.Slice(er.Types, func(i, j int) bool {
1,154✔
1453
                        return er.Types[i].TypeName < er.Types[j].TypeName
1,061✔
1454
                })
1,061✔
1455

1456
                respMap := make(map[string]interface{})
93✔
1457
                if len(er.SchemaNode) > 0 {
181✔
1458
                        respMap["schema"] = er.SchemaNode
88✔
1459
                }
88✔
1460
                if len(er.Types) > 0 {
159✔
1461
                        respMap["types"] = formatTypes(er.Types)
66✔
1462
                }
66✔
1463
                resp.Json, err = json.Marshal(respMap)
93✔
1464
        } else if qc.req.RespFormat == api.Request_RDF {
26,889✔
1465
                resp.Rdf, err = query.ToRDF(qc.latency, er.Subgraphs)
11✔
1466
        } else {
26,878✔
1467
                resp.Json, err = query.ToJson(ctx, qc.latency, er.Subgraphs, qc.gqlField)
26,867✔
1468
        }
26,867✔
1469
        // if err is just some error from GraphQL encoding, then we need to continue the normal
1470
        // execution ignoring the error as we still need to assign metrics and latency info to resp.
1471
        if err != nil && (qc.gqlField == nil || !x.IsGqlErrorList(err)) {
26,979✔
1472
                return resp, err
8✔
1473
        }
8✔
1474
        qc.span.Annotatef(nil, "Response = %s", resp.Json)
26,963✔
1475

26,963✔
1476
        // varToUID contains a map of variable name to the uids corresponding to it.
26,963✔
1477
        // It is used later for constructing set and delete mutations by replacing
26,963✔
1478
        // variables with the actual uids they correspond to.
26,963✔
1479
        // If a variable doesn't have any UID, we generate one ourselves later.
26,963✔
1480
        for name := range qc.uidRes {
32,339✔
1481
                v := qr.Vars[name]
5,376✔
1482

5,376✔
1483
                // If the list of UIDs is empty but the map of values is not,
5,376✔
1484
                // we need to get the UIDs from the keys in the map.
5,376✔
1485
                var uidList []uint64
5,376✔
1486
                if v.Uids != nil && len(v.Uids.Uids) > 0 {
9,409✔
1487
                        uidList = v.Uids.Uids
4,033✔
1488
                } else {
5,376✔
1489
                        uidList = make([]uint64, 0, len(v.Vals))
1,343✔
1490
                        for uid := range v.Vals {
1,345✔
1491
                                uidList = append(uidList, uid)
2✔
1492
                        }
2✔
1493
                }
1494
                if len(uidList) == 0 {
6,717✔
1495
                        continue
1,341✔
1496
                }
1497

1498
                // We support maximum 1 million UIDs per variable to ensure that we
1499
                // don't do bad things to alpha and mutation doesn't become too big.
1500
                if len(uidList) > 1e6 {
4,035✔
1501
                        return resp, errors.Errorf("var [%v] has over million UIDs", name)
×
1502
                }
×
1503

1504
                uids := make([]string, len(uidList))
4,035✔
1505
                for i, u := range uidList {
8,470✔
1506
                        // We use base 10 here because the RDF mutations expect the uid to be in base 10.
4,435✔
1507
                        uids[i] = strconv.FormatUint(u, 10)
4,435✔
1508
                }
4,435✔
1509
                qc.uidRes[name] = uids
4,035✔
1510
        }
1511

1512
        // look for values for value variables
1513
        for name := range qc.valRes {
26,995✔
1514
                v := qr.Vars[name]
32✔
1515
                qc.valRes[name] = v.Vals
32✔
1516
        }
32✔
1517

1518
        resp.Metrics = &api.Metrics{
26,963✔
1519
                NumUids: er.Metrics,
26,963✔
1520
        }
26,963✔
1521
        var total uint64
26,963✔
1522
        for _, num := range resp.Metrics.NumUids {
118,783✔
1523
                total += num
91,820✔
1524
        }
91,820✔
1525
        resp.Metrics.NumUids["_total"] = total
26,963✔
1526

26,963✔
1527
        return resp, err
26,963✔
1528
}
1529

1530
// parseRequest parses the incoming request
1531
func parseRequest(qc *queryContext) error {
40,004✔
1532
        start := time.Now()
40,004✔
1533
        defer func() {
80,008✔
1534
                qc.latency.Parsing = time.Since(start)
40,004✔
1535
        }()
40,004✔
1536

1537
        var needVars []string
40,004✔
1538
        upsertQuery := qc.req.Query
40,004✔
1539
        if len(qc.req.Mutations) > 0 {
55,540✔
1540
                // parsing mutations
15,536✔
1541
                qc.gmuList = make([]*dql.Mutation, 0, len(qc.req.Mutations))
15,536✔
1542
                for _, mu := range qc.req.Mutations {
31,322✔
1543
                        gmu, err := parseMutationObject(mu, qc)
15,786✔
1544
                        if err != nil {
15,794✔
1545
                                return err
8✔
1546
                        }
8✔
1547

1548
                        qc.gmuList = append(qc.gmuList, gmu)
15,778✔
1549
                }
1550

1551
                qc.uidRes = make(map[string][]string)
15,528✔
1552
                qc.valRes = make(map[string]map[uint64]types.Val)
15,528✔
1553
                upsertQuery = buildUpsertQuery(qc)
15,528✔
1554
                needVars = findMutationVars(qc)
15,528✔
1555
                if upsertQuery == "" {
28,271✔
1556
                        if len(needVars) > 0 {
12,744✔
1557
                                return errors.Errorf("variables %v not defined", needVars)
1✔
1558
                        }
1✔
1559

1560
                        return nil
12,742✔
1561
                }
1562
        }
1563

1564
        // parsing the updated query
1565
        var err error
27,253✔
1566
        qc.dqlRes, err = dql.ParseWithNeedVars(dql.Request{
27,253✔
1567
                Str:       upsertQuery,
27,253✔
1568
                Variables: qc.req.Vars,
27,253✔
1569
        }, needVars)
27,253✔
1570
        if err != nil {
27,272✔
1571
                return err
19✔
1572
        }
19✔
1573
        return validateQuery(qc.dqlRes.Query)
27,234✔
1574
}
1575

1576
func authorizeRequest(ctx context.Context, qc *queryContext) error {
32,357✔
1577
        if err := authorizeQuery(ctx, &qc.dqlRes, qc.graphql); err != nil {
32,365✔
1578
                return err
8✔
1579
        }
8✔
1580

1581
        // TODO(Aman): can be optimized to do the authorization in just one func call
1582
        for _, gmu := range qc.gmuList {
46,799✔
1583
                if err := authorizeMutation(ctx, gmu); err != nil {
14,463✔
1584
                        return err
13✔
1585
                }
13✔
1586
        }
1587

1588
        return nil
32,336✔
1589
}
1590

1591
func getHash(ns, startTs uint64) string {
50,352✔
1592
        h := sha256.New()
50,352✔
1593
        h.Write([]byte(fmt.Sprintf("%#x%#x%s", ns, startTs, x.WorkerConfig.HmacSecret)))
50,352✔
1594
        return hex.EncodeToString(h.Sum(nil))
50,352✔
1595
}
50,352✔
1596

1597
func validateNamespace(ctx context.Context, tc *api.TxnContext) error {
2,699✔
1598
        if !x.WorkerConfig.AclEnabled {
4,780✔
1599
                return nil
2,081✔
1600
        }
2,081✔
1601

1602
        ns, err := x.ExtractNamespaceFrom(ctx)
618✔
1603
        if err != nil {
618✔
1604
                return err
×
1605
        }
×
1606
        if tc.Hash != getHash(ns, tc.StartTs) {
618✔
1607
                return x.ErrHashMismatch
×
1608
        }
×
1609
        return nil
618✔
1610
}
1611

1612
// CommitOrAbort commits or aborts a transaction.
1613
func (s *Server) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.TxnContext, error) {
3,588✔
1614
        ctx, span := otrace.StartSpan(ctx, "Server.CommitOrAbort")
3,588✔
1615
        defer span.End()
3,588✔
1616

3,588✔
1617
        if err := x.HealthCheck(); err != nil {
3,588✔
1618
                return &api.TxnContext{}, err
×
1619
        }
×
1620

1621
        tctx := &api.TxnContext{}
3,588✔
1622
        if tc.StartTs == 0 {
4,477✔
1623
                return &api.TxnContext{}, errors.Errorf(
889✔
1624
                        "StartTs cannot be zero while committing a transaction")
889✔
1625
        }
889✔
1626
        if ns, err := x.ExtractNamespaceFrom(ctx); err == nil {
3,317✔
1627
                annotateNamespace(span, ns)
618✔
1628
        }
618✔
1629
        annotateStartTs(span, tc.StartTs)
2,699✔
1630

2,699✔
1631
        if err := validateNamespace(ctx, tc); err != nil {
2,699✔
1632
                return &api.TxnContext{}, err
×
1633
        }
×
1634

1635
        span.Annotatef(nil, "Txn Context received: %+v", tc)
2,699✔
1636
        commitTs, err := worker.CommitOverNetwork(ctx, tc)
2,699✔
1637
        if err == dgo.ErrAborted {
3,260✔
1638
                // If err returned is dgo.ErrAborted and tc.Aborted was set, that means the client has
561✔
1639
                // aborted the transaction by calling txn.Discard(). Hence return a nil error.
561✔
1640
                tctx.Aborted = true
561✔
1641
                if tc.Aborted {
983✔
1642
                        return tctx, nil
422✔
1643
                }
422✔
1644

1645
                return tctx, status.Errorf(codes.Aborted, err.Error())
139✔
1646
        }
1647
        tctx.StartTs = tc.StartTs
2,138✔
1648
        tctx.CommitTs = commitTs
2,138✔
1649
        return tctx, err
2,138✔
1650
}
1651

1652
// CheckVersion returns the version of this Dgraph instance.
1653
func (s *Server) CheckVersion(ctx context.Context, c *api.Check) (v *api.Version, err error) {
×
1654
        if err := x.HealthCheck(); err != nil {
×
1655
                return v, err
×
1656
        }
×
1657

1658
        v = new(api.Version)
×
1659
        v.Tag = x.Version()
×
1660
        return v, nil
×
1661
}
1662

1663
// -------------------------------------------------------------------------------------------------
1664
// HELPER FUNCTIONS
1665
// -------------------------------------------------------------------------------------------------
1666
func isMutationAllowed(ctx context.Context) bool {
16,670✔
1667
        if worker.Config.MutationsMode != worker.DisallowMutations {
33,336✔
1668
                return true
16,666✔
1669
        }
16,666✔
1670
        shareAllowed, ok := ctx.Value("_share_").(bool)
4✔
1671
        if !ok || !shareAllowed {
8✔
1672
                return false
4✔
1673
        }
4✔
1674
        return true
×
1675
}
1676

1677
var errNoAuth = errors.Errorf("No Auth Token found. Token needed for Admin operations.")
1678

1679
func hasAdminAuth(ctx context.Context, tag string) (net.Addr, error) {
1,697✔
1680
        ipAddr, err := x.HasWhitelistedIP(ctx)
1,697✔
1681
        if err != nil {
1,697✔
1682
                return nil, err
×
1683
        }
×
1684
        glog.Infof("Got %s request from: %q\n", tag, ipAddr)
1,697✔
1685
        if err = hasPoormansAuth(ctx); err != nil {
1,697✔
1686
                return nil, err
×
1687
        }
×
1688
        return ipAddr, nil
1,697✔
1689
}
1690

1691
func hasPoormansAuth(ctx context.Context) error {
1,697✔
1692
        if worker.Config.AuthToken == "" {
3,389✔
1693
                return nil
1,692✔
1694
        }
1,692✔
1695
        md, ok := metadata.FromIncomingContext(ctx)
5✔
1696
        if !ok {
5✔
1697
                return errNoAuth
×
1698
        }
×
1699
        tokens := md.Get("auth-token")
5✔
1700
        if len(tokens) == 0 {
5✔
1701
                return errNoAuth
×
1702
        }
×
1703
        if tokens[0] != worker.Config.AuthToken {
5✔
1704
                return errors.Errorf("Provided auth token [%s] does not match. Permission denied.", tokens[0])
×
1705
        }
×
1706
        return nil
5✔
1707
}
1708

1709
// parseMutationObject tries to consolidate fields of the api.Mutation into the
1710
// corresponding field of the returned dql.Mutation. For example, the 3 fields,
1711
// api.Mutation#SetJson, api.Mutation#SetNquads and api.Mutation#Set are consolidated into the
1712
// dql.Mutation.Set field. Similarly the 3 fields api.Mutation#DeleteJson, api.Mutation#DelNquads
1713
// and api.Mutation#Del are merged into the dql.Mutation#Del field.
1714
func parseMutationObject(mu *api.Mutation, qc *queryContext) (*dql.Mutation, error) {
15,786✔
1715
        res := &dql.Mutation{Cond: mu.Cond}
15,786✔
1716

15,786✔
1717
        if len(mu.SetJson) > 0 {
22,424✔
1718
                nqs, md, err := chunker.ParseJSON(mu.SetJson, chunker.SetNquads)
6,638✔
1719
                if err != nil {
6,639✔
1720
                        return nil, err
1✔
1721
                }
1✔
1722
                res.Set = append(res.Set, nqs...)
6,637✔
1723
                res.Metadata = md
6,637✔
1724
        }
1725
        if len(mu.DeleteJson) > 0 {
16,682✔
1726
                // The metadata is not currently needed for delete operations so it can be safely ignored.
897✔
1727
                nqs, _, err := chunker.ParseJSON(mu.DeleteJson, chunker.DeleteNquads)
897✔
1728
                if err != nil {
897✔
1729
                        return nil, err
×
1730
                }
×
1731
                res.Del = append(res.Del, nqs...)
897✔
1732
        }
1733
        if len(mu.SetNquads) > 0 {
21,611✔
1734
                nqs, md, err := chunker.ParseRDFs(mu.SetNquads)
5,826✔
1735
                if err != nil {
5,831✔
1736
                        return nil, err
5✔
1737
                }
5✔
1738
                res.Set = append(res.Set, nqs...)
5,821✔
1739
                res.Metadata = md
5,821✔
1740
        }
1741
        if len(mu.DelNquads) > 0 {
17,505✔
1742
                nqs, _, err := chunker.ParseRDFs(mu.DelNquads)
1,725✔
1743
                if err != nil {
1,725✔
1744
                        return nil, err
×
1745
                }
×
1746
                res.Del = append(res.Del, nqs...)
1,725✔
1747
        }
1748

1749
        res.Set = append(res.Set, mu.Set...)
15,780✔
1750
        res.Del = append(res.Del, mu.Del...)
15,780✔
1751
        // parse facets and convert to the binary format so that
15,780✔
1752
        // a field of type datetime like "2017-01-01" can be correctly encoded in the
15,780✔
1753
        // marshaled binary format as done in the time.Marshal method
15,780✔
1754
        if err := validateAndConvertFacets(res.Set); err != nil {
15,780✔
1755
                return nil, err
×
1756
        }
×
1757

1758
        if err := validateNQuads(res.Set, res.Del, qc); err != nil {
15,782✔
1759
                return nil, err
2✔
1760
        }
2✔
1761
        return res, nil
15,778✔
1762
}
1763

1764
func validateAndConvertFacets(nquads []*api.NQuad) error {
15,780✔
1765
        for _, m := range nquads {
269,598✔
1766
                encodedFacets := make([]*api.Facet, 0, len(m.Facets))
253,818✔
1767
                for _, f := range m.Facets {
254,041✔
1768
                        // try to interpret the value as binary first
223✔
1769
                        if _, err := facets.ValFor(f); err == nil {
442✔
1770
                                encodedFacets = append(encodedFacets, f)
219✔
1771
                        } else {
223✔
1772
                                encodedFacet, err := facets.FacetFor(f.Key, string(f.Value))
4✔
1773
                                if err != nil {
4✔
1774
                                        return err
×
1775
                                }
×
1776
                                encodedFacets = append(encodedFacets, encodedFacet)
4✔
1777
                        }
1778
                }
1779

1780
                m.Facets = encodedFacets
253,818✔
1781
        }
1782
        return nil
15,780✔
1783
}
1784

1785
// validateForGraphql validate nquads for graphql
1786
func validateForGraphql(nq *api.NQuad, isGraphql bool) error {
258,313✔
1787
        // Check whether the incoming predicate is graphql reserved predicate or not.
258,313✔
1788
        if !isGraphql && x.IsGraphqlReservedPredicate(nq.Predicate) {
258,313✔
1789
                return errors.Errorf("Cannot mutate graphql reserved predicate %s", nq.Predicate)
×
1790
        }
×
1791
        return nil
258,313✔
1792
}
1793

1794
func validateNQuads(set, del []*api.NQuad, qc *queryContext) error {
15,780✔
1795

15,780✔
1796
        for _, nq := range set {
269,598✔
1797
                if err := validatePredName(nq.Predicate); err != nil {
253,819✔
1798
                        return err
1✔
1799
                }
1✔
1800
                var ostar bool
253,817✔
1801
                if o, ok := nq.ObjectValue.GetVal().(*api.Value_DefaultVal); ok {
363,984✔
1802
                        ostar = o.DefaultVal == x.Star
110,167✔
1803
                }
110,167✔
1804
                if nq.Subject == x.Star || nq.Predicate == x.Star || ostar {
253,817✔
1805
                        return errors.Errorf("Cannot use star in set n-quad: %+v", nq)
×
1806
                }
×
1807
                if err := validateKeys(nq); err != nil {
253,817✔
1808
                        return errors.Wrapf(err, "key error: %+v", nq)
×
1809
                }
×
1810
                if err := validateForGraphql(nq, qc.graphql); err != nil {
253,817✔
1811
                        return err
×
1812
                }
×
1813
        }
1814
        for _, nq := range del {
20,276✔
1815
                if err := validatePredName(nq.Predicate); err != nil {
4,498✔
1816
                        return err
1✔
1817
                }
1✔
1818
                var ostar bool
4,496✔
1819
                if o, ok := nq.ObjectValue.GetVal().(*api.Value_DefaultVal); ok {
6,091✔
1820
                        ostar = o.DefaultVal == x.Star
1,595✔
1821
                }
1,595✔
1822
                if nq.Subject == x.Star || (nq.Predicate == x.Star && !ostar) {
4,496✔
1823
                        return errors.Errorf("Only valid wildcard delete patterns are 'S * *' and 'S P *': %v", nq)
×
1824
                }
×
1825
                if err := validateForGraphql(nq, qc.graphql); err != nil {
4,496✔
1826
                        return err
×
1827
                }
×
1828
                // NOTE: we dont validateKeys() with delete to let users fix existing mistakes
1829
                // with bad predicate forms. ex: foo@bar ~something
1830
        }
1831
        return nil
15,778✔
1832
}
1833

1834
func validateKey(key string) error {
254,065✔
1835
        switch {
254,065✔
1836
        case key == "":
×
1837
                return errors.Errorf("Has zero length")
×
1838
        case strings.ContainsAny(key, "~@"):
9✔
1839
                return errors.Errorf("Has invalid characters")
9✔
1840
        case strings.IndexFunc(key, unicode.IsSpace) != -1:
4✔
1841
                return errors.Errorf("Must not contain spaces")
4✔
1842
        }
1843
        return nil
254,052✔
1844
}
1845

1846
// validateKeys checks predicate and facet keys in N-Quad for syntax errors.
1847
func validateKeys(nq *api.NQuad) error {
253,832✔
1848
        if err := validateKey(nq.Predicate); err != nil {
253,839✔
1849
                return errors.Wrapf(err, "predicate %q", nq.Predicate)
7✔
1850
        }
7✔
1851
        for i := range nq.Facets {
254,058✔
1852
                if nq.Facets[i] == nil {
233✔
1853
                        continue
×
1854
                }
1855
                if err := validateKey(nq.Facets[i].Key); err != nil {
239✔
1856
                        return errors.Errorf("Facet %q, %s", nq.Facets[i].Key, err)
6✔
1857
                }
6✔
1858
        }
1859
        return nil
253,819✔
1860
}
1861

1862
// validateQuery verifies that the query does not contain any preds that
1863
// are longer than the limit (2^16).
1864
func validateQuery(queries []*dql.GraphQuery) error {
137,046✔
1865
        for _, q := range queries {
246,859✔
1866
                if err := validatePredName(q.Attr); err != nil {
109,814✔
1867
                        return err
1✔
1868
                }
1✔
1869

1870
                if err := validateQuery(q.Children); err != nil {
109,813✔
1871
                        return err
1✔
1872
                }
1✔
1873
        }
1874

1875
        return nil
137,044✔
1876
}
1877

1878
func validatePredName(name string) error {
371,855✔
1879
        if len(name) > math.MaxUint16 {
371,859✔
1880
                return errors.Errorf("Predicate name length cannot be bigger than 2^16. Predicate: %v",
4✔
1881
                        name[:80])
4✔
1882
        }
4✔
1883
        return nil
371,851✔
1884
}
1885

1886
// formatTypes takes a list of TypeUpdates and converts them in to a list of
1887
// maps in a format that is human-readable to be marshaled into JSON.
1888
func formatTypes(typeList []*pb.TypeUpdate) []map[string]interface{} {
66✔
1889
        var res []map[string]interface{}
66✔
1890
        for _, typ := range typeList {
480✔
1891
                typeMap := make(map[string]interface{})
414✔
1892
                typeMap["name"] = typ.TypeName
414✔
1893
                fields := make([]map[string]string, len(typ.Fields))
414✔
1894

414✔
1895
                for i, field := range typ.Fields {
1,321✔
1896
                        m := make(map[string]string, 1)
907✔
1897
                        m["name"] = field.Predicate
907✔
1898
                        fields[i] = m
907✔
1899
                }
907✔
1900
                typeMap["fields"] = fields
414✔
1901

414✔
1902
                res = append(res, typeMap)
414✔
1903
        }
1904
        return res
66✔
1905
}
1906

1907
func isDropAll(op *api.Operation) bool {
2,328✔
1908
        if op.DropAll || op.DropOp == api.Operation_ALL {
2,930✔
1909
                return true
602✔
1910
        }
602✔
1911
        return false
1,726✔
1912
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc