• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 5262788795

14 Jun 2023 04:29AM UTC coverage: 66.997% (+0.2%) from 66.8%
5262788795

push

web-flow
Merge e61abef06 into 2787cfc58

58163 of 86814 relevant lines covered (67.0%)

2270813.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.36
/worker/task.go
1
/*
2
 * Copyright 2016-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package worker
18

19
import (
20
        "bytes"
21
        "context"
22
        "sort"
23
        "strconv"
24
        "strings"
25
        "time"
26

27
        "github.com/golang/glog"
28
        "github.com/golang/protobuf/proto"
29
        cindex "github.com/google/codesearch/index"
30
        cregexp "github.com/google/codesearch/regexp"
31
        "github.com/pkg/errors"
32
        otrace "go.opencensus.io/trace"
33
        "golang.org/x/sync/errgroup"
34

35
        "github.com/dgraph-io/badger/v4"
36
        "github.com/dgraph-io/dgo/v230/protos/api"
37
        "github.com/dgraph-io/dgraph/algo"
38
        "github.com/dgraph-io/dgraph/conn"
39
        "github.com/dgraph-io/dgraph/posting"
40
        "github.com/dgraph-io/dgraph/protos/pb"
41
        "github.com/dgraph-io/dgraph/schema"
42
        ctask "github.com/dgraph-io/dgraph/task"
43
        "github.com/dgraph-io/dgraph/tok"
44
        "github.com/dgraph-io/dgraph/types"
45
        "github.com/dgraph-io/dgraph/types/facets"
46
        "github.com/dgraph-io/dgraph/x"
47
)
48

49
func invokeNetworkRequest(ctx context.Context, addr string,
50
        f func(context.Context, pb.WorkerClient) (interface{}, error)) (interface{}, error) {
18,913✔
51
        pl, err := conn.GetPools().Get(addr)
18,913✔
52
        if err != nil {
18,913✔
53
                return nil, errors.Wrapf(err, "dispatchTaskOverNetwork: while retrieving connection.")
×
54
        }
×
55

56
        if span := otrace.FromContext(ctx); span != nil {
37,826✔
57
                span.Annotatef(nil, "invokeNetworkRequest: Sending request to %v", addr)
18,913✔
58
        }
18,913✔
59
        c := pb.NewWorkerClient(pl.Get())
18,913✔
60
        return f(ctx, c)
18,913✔
61
}
62

63
// backupRequestGracePeriod is how long processWithBackupRequest waits for the
// first server to answer before it also sends the request to a second server.
const backupRequestGracePeriod = time.Second

// processWithBackupRequest runs f against a server of group gid, sending a
// "backup" request to a second server when the first one is slow or fails.
//
//   - If no server address is known it returns an error; with exactly one
//     address it just calls that server.
//   - Otherwise f is sent to addrs[0]. If that reply arrives within
//     backupRequestGracePeriod it is returned on success; on error the request
//     is retried synchronously against addrs[1] and that outcome is returned.
//   - If the grace period expires first, f is also sent to addrs[1]. The first
//     reply to arrive is returned if it succeeded; if it failed, the function
//     waits for the other reply and returns it (success or error).
//
// Cancellation of ctx aborts any wait with ctx.Err(). Requests launched in the
// background use ctx0, which is cancelled when this function returns.
//
// TODO: Cross-server cancellation as described in Jeff Dean's talk.
func processWithBackupRequest(
	ctx context.Context,
	gid uint32,
	f func(context.Context, pb.WorkerClient) (interface{}, error)) (interface{}, error) {
	addrs := groups().AnyTwoServers(gid)
	if len(addrs) == 0 {
		return nil, errors.New("No network connection")
	}
	if len(addrs) == 1 {
		reply, err := invokeNetworkRequest(ctx, addrs[0], f)
		return reply, err
	}
	type taskresult struct {
		reply interface{}
		err   error
	}

	// Buffered with len(addrs) (>= 2 here) so that neither of the (at most two)
	// request goroutines blocks on send after this function has returned.
	chResults := make(chan taskresult, len(addrs))
	ctx0, cancel := context.WithCancel(ctx)
	defer cancel()

	go func() {
		reply, err := invokeNetworkRequest(ctx0, addrs[0], f)
		chResults <- taskresult{reply, err}
	}()

	timer := time.NewTimer(backupRequestGracePeriod)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-timer.C:
		// The first server is slow: race a second request against it.
		go func() {
			reply, err := invokeNetworkRequest(ctx0, addrs[1], f)
			chResults <- taskresult{reply, err}
		}()
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case result := <-chResults:
			if result.err != nil {
				// The first reply failed; fall back to whichever request is
				// still outstanding.
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
				case result := <-chResults:
					return result.reply, result.err
				}
			} else {
				return result.reply, nil
			}
		}
	case result := <-chResults:
		if result.err != nil {
			cancel() // Might as well cleanup resources ASAP
			timer.Stop()
			// Retry on the second server with the parent ctx, because ctx0
			// has just been cancelled.
			return invokeNetworkRequest(ctx, addrs[1], f)
		}
		return result.reply, nil
	}
}
127

128
// ProcessTaskOverNetwork is used to process the query and get the result from
129
// the instance which stores posting list corresponding to the predicate in the
130
// query.
131
func ProcessTaskOverNetwork(ctx context.Context, q *pb.Query) (*pb.Result, error) {
85,127✔
132
        attr := q.Attr
85,127✔
133
        gid, err := groups().BelongsToReadOnly(attr, q.ReadTs)
85,127✔
134
        switch {
85,127✔
135
        case err != nil:
×
136
                return nil, err
×
137
        case gid == 0:
972✔
138
                return nil, errNonExistentTablet
972✔
139
        }
140

141
        span := otrace.FromContext(ctx)
84,155✔
142
        if span != nil {
168,307✔
143
                span.Annotatef(nil, "ProcessTaskOverNetwork. attr: %v gid: %v, readTs: %d, node id: %d",
84,152✔
144
                        attr, gid, q.ReadTs, groups().Node.Id)
84,152✔
145
        }
84,152✔
146

147
        if groups().ServesGroup(gid) {
149,444✔
148
                // No need for a network call, as this should be run from within this instance.
65,289✔
149
                return processTask(ctx, q, gid)
65,289✔
150
        }
65,289✔
151

152
        result, err := processWithBackupRequest(ctx, gid,
18,866✔
153
                func(ctx context.Context, c pb.WorkerClient) (interface{}, error) {
37,779✔
154
                        return c.ServeTask(ctx, q)
18,913✔
155
                })
18,913✔
156
        if err != nil {
18,906✔
157
                return nil, err
40✔
158
        }
40✔
159

160
        reply := result.(*pb.Result)
18,826✔
161
        if span != nil {
37,652✔
162
                span.Annotatef(nil, "Reply from server. len: %v gid: %v Attr: %v",
18,826✔
163
                        len(reply.UidMatrix), gid, attr)
18,826✔
164
        }
18,826✔
165
        return reply, nil
18,826✔
166
}
167

168
// convertValue converts the data to the schema.State() type of predicate.
169
func convertValue(attr, data string) (types.Val, error) {
16,816✔
170
        // Parse given value and get token. There should be only one token.
16,816✔
171
        t, err := schema.State().TypeOf(attr)
16,816✔
172
        if err != nil {
16,930✔
173
                return types.Val{}, err
114✔
174
        }
114✔
175
        if !t.IsScalar() {
16,702✔
176
                return types.Val{}, errors.Errorf("Attribute %s is not valid scalar type",
×
177
                        x.ParseAttr(attr))
×
178
        }
×
179
        src := types.Val{Tid: types.StringID, Value: []byte(data)}
16,702✔
180
        dst, err := types.Convert(src, t)
16,702✔
181
        return dst, err
16,702✔
182
}
183

184
// Returns nil byte on error
185
func convertToType(v types.Val, typ types.TypeID) (*pb.TaskValue, error) {
422,602✔
186
        result := &pb.TaskValue{ValType: typ.Enum(), Val: x.Nilbyte}
422,602✔
187
        if v.Tid == typ {
845,191✔
188
                result.Val = v.Value.([]byte)
422,589✔
189
                return result, nil
422,589✔
190
        }
422,589✔
191

192
        // convert data from binary to appropriate format
193
        val, err := types.Convert(v, typ)
13✔
194
        if err != nil {
13✔
195
                return result, err
×
196
        }
×
197
        // Marshal
198
        data := types.ValueForType(types.BinaryID)
13✔
199
        err = types.Marshal(val, &data)
13✔
200
        if err != nil {
13✔
201
                return result, errors.Errorf("Failed convertToType during Marshal")
×
202
        }
×
203
        result.Val = data.Value.([]byte)
13✔
204
        return result, nil
13✔
205
}
206

207
// FuncType represents the type of a query function (aggregation, has, etc).
// parseFuncTypeHelper maps lower-cased function names onto these values.
// notAFunction through matchFn take consecutive values from 0 via iota, so
// their order is significant.
type FuncType int

const (
	// notAFunction: no function (nil source function or empty name).
	notAFunction FuncType = iota
	// aggregatorFn: min, max, sum, avg.
	aggregatorFn
	// compareAttrFn: le, ge, lt, gt, eq, between applied to an attribute.
	compareAttrFn
	// compareScalarFn: a comparison applied to count(attr).
	compareScalarFn
	// geoFn: names accepted by types.IsGeoFunc.
	geoFn
	// passwordFn: checkpwd.
	passwordFn
	// regexFn: regexp.
	regexFn
	// fullTextSearchFn: alloftext, anyoftext.
	fullTextSearchFn
	// hasFn: has.
	hasFn
	// uidInFn: uid_in.
	uidInFn
	// customIndexFn: anyof, allof.
	customIndexFn
	// matchFn: match.
	matchFn
	// standardFn: any other function name.
	// NOTE(review): because it has its own expression without a type, this is
	// an untyped constant rather than a FuncType. It converts implicitly
	// wherever a FuncType is expected; consider `standardFn FuncType = 100`.
	standardFn = 100
)
225

226
func parseFuncType(srcFunc *pb.SrcFunction) (FuncType, string) {
84,172✔
227
        if srcFunc == nil {
147,585✔
228
                return notAFunction, ""
63,413✔
229
        }
63,413✔
230
        ftype, fname := parseFuncTypeHelper(srcFunc.Name)
20,759✔
231
        if srcFunc.IsCount && ftype == compareAttrFn {
20,804✔
232
                // gt(release_date, "1990") is 'CompareAttr' which
45✔
233
                //    takes advantage of indexed-attr
45✔
234
                // gt(count(films), 0) is 'CompareScalar', we first do
45✔
235
                //    counting on attr, then compare the result as scalar with int
45✔
236
                return compareScalarFn, fname
45✔
237
        }
45✔
238
        return ftype, fname
20,714✔
239
}
240

241
func parseFuncTypeHelper(name string) (FuncType, string) {
20,822✔
242
        if len(name) == 0 {
20,822✔
243
                return notAFunction, ""
×
244
        }
×
245
        f := strings.ToLower(name)
20,822✔
246
        switch f {
20,822✔
247
        case "le", "ge", "lt", "gt", "eq", "between":
16,710✔
248
                return compareAttrFn, f
16,710✔
249
        case "min", "max", "sum", "avg":
5✔
250
                return aggregatorFn, f
5✔
251
        case "checkpwd":
473✔
252
                return passwordFn, f
473✔
253
        case "regexp":
110✔
254
                return regexFn, f
110✔
255
        case "alloftext", "anyoftext":
34✔
256
                return fullTextSearchFn, f
34✔
257
        case "has":
3,141✔
258
                return hasFn, f
3,141✔
259
        case "uid_in":
29✔
260
                return uidInFn, f
29✔
261
        case "anyof", "allof":
18✔
262
                return customIndexFn, f
18✔
263
        case "match":
19✔
264
                return matchFn, f
19✔
265
        default:
283✔
266
                if types.IsGeoFunc(f) {
306✔
267
                        return geoFn, f
23✔
268
                }
23✔
269
                return standardFn, f
260✔
270
        }
271
}
272

273
func needsIndex(fnType FuncType, uidList *pb.List) bool {
83,980✔
274
        switch fnType {
83,980✔
275
        case compareAttrFn:
16,493✔
276
                if uidList != nil {
20,448✔
277
                        // UidList is not nil means this is a filter. Filter predicate is not indexed, so
3,955✔
278
                        // instead of fetching values by index key, we will fetch value by data key
3,955✔
279
                        // (from uid and predicate) and apply filter on values.
3,955✔
280
                        return false
3,955✔
281
                }
3,955✔
282
                return true
12,538✔
283
        case geoFn, fullTextSearchFn, standardFn, matchFn:
306✔
284
                return true
306✔
285
        }
286
        return false
67,181✔
287
}
288

289
// needsIntersect reports whether the results collected for fnName must be
// combined with algo.IntersectSorted(), i.e. whether the function requires all
// values to match. This holds for names that start or end with "allof", such
// as "allofterms", "alloftext" and custom functions with "allof".
func needsIntersect(fnName string) bool {
	const marker = "allof"
	if strings.HasPrefix(fnName, marker) {
		return true
	}
	return strings.HasSuffix(fnName, marker)
}
305✔
296

297
// funcArgs bundles what a single function evaluation works on, as consumed by
// helpers such as handleValuePostings.
type funcArgs struct {
	// q is the query being processed.
	q *pb.Query
	// gid is a group id. NOTE(review): not read in the code visible here;
	// presumably the group serving q.Attr — confirm.
	gid uint32
	// srcFn is the parsed function context for q.
	srcFn *functionContext
	// out is the result that handlers append their per-uid entries to.
	out *pb.Result
}
303

304
// The function tells us whether we want to fetch value posting lists or uid posting lists.
305
func (srcFn *functionContext) needsValuePostings(typ types.TypeID) (bool, error) {
83,975✔
306
        switch srcFn.fnType {
83,975✔
307
        case aggregatorFn, passwordFn:
478✔
308
                return true, nil
478✔
309
        case compareAttrFn:
16,488✔
310
                if len(srcFn.tokens) > 0 {
32,920✔
311
                        return false, nil
16,432✔
312
                }
16,432✔
313
                return true, nil
56✔
314
        case geoFn, regexFn, fullTextSearchFn, standardFn, hasFn, customIndexFn, matchFn:
3,573✔
315
                // All of these require an index, hence would require fetching uid postings.
3,573✔
316
                return false, nil
3,573✔
317
        case uidInFn, compareScalarFn:
67✔
318
                // Operate on uid postings
67✔
319
                return false, nil
67✔
320
        case notAFunction:
63,369✔
321
                return typ.IsScalar(), nil
63,369✔
322
        }
323
        return false, errors.Errorf("Unhandled case in fetchValuePostings for fn: %s", srcFn.fname)
×
324
}
325

326
// handleValuePostings fetches the value posting list of q.Attr for every uid
// in q.UidList and evaluates srcFn over it. Results are appended to args.out
// in uid order. Only notAFunction, aggregatorFn, passwordFn and compareAttrFn
// are accepted; any other function type is an error.
//
// Per uid, one entry is appended to UidMatrix (see the NOTE in the passwordFn
// case). For count queries, a Counts entry is appended as well. Otherwise an
// entry is appended to ValueMatrix and FacetMatrix, plus LangMatrix when
// q.ExpandAll is set. For compareAttrFn, the UidMatrix entry holds the uid
// only if one of its values satisfies the comparison.
//
// Attributes of type password are silently skipped unless the function is
// checkpwd, and checkpwd on a non-password attribute is an error.
func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) error {
	srcFn := args.srcFn
	q := args.q

	facetsTree, err := preprocessFilter(q.FacetsFilter)
	if err != nil {
		return err
	}

	span := otrace.FromContext(ctx)
	stop := x.SpanTimer(span, "handleValuePostings")
	defer stop()
	if span != nil {
		span.Annotatef(nil, "Number of uids: %d. args.srcFn: %+v", srcFn.n, args.srcFn)
	}

	switch srcFn.fnType {
	case notAFunction, aggregatorFn, passwordFn, compareAttrFn:
	default:
		return errors.Errorf("Unhandled function in handleValuePostings: %s", srcFn.fname)
	}

	if srcFn.atype == types.PasswordID && srcFn.fnType != passwordFn {
		// Silently skip if the user is trying to fetch an attribute of type password.
		return nil
	}
	if srcFn.fnType == passwordFn && srcFn.atype != types.PasswordID {
		return errors.Errorf("checkpwd fn can only be used on attr: [%s] with schema type "+
			"password. Got type: %s", x.ParseAttr(q.Attr), srcFn.atype.Name())
	}
	if srcFn.n == 0 {
		return nil
	}

	// srcFn.n should be equal to len(q.UidList.Uids) for below implementation(DivideAndRule and
	// calculate) to work correctly. But we have seen some panics while forming DataKey in
	// calculate(). panic is of the form "index out of range [4] with length 1". Hence return error
	// from here when srcFn.n != len(q.UidList.Uids).
	if srcFn.n != len(q.UidList.Uids) {
		return errors.Errorf("srcFn.n: %d is not equal to len(q.UidList.Uids): %d, srcFn: %+v in "+
			"handleValuePostings", srcFn.n, len(q.UidList.GetUids()), srcFn)
	}

	// This function has small boilerplate as handleUidPostings, around how the code gets
	// concurrently executed. I didn't see much value in trying to separate it out, because the core
	// logic constitutes most of the code volume here.
	// Presumably DivideAndRule splits srcFn.n items into numGo chunks of size
	// width (the last chunk is clamped below) — confirm against x.DivideAndRule.
	numGo, width := x.DivideAndRule(srcFn.n)
	x.AssertTrue(width > 0)
	// NOTE(review): unlike the call above, this is not guarded by span != nil;
	// it relies on Annotatef tolerating a nil span — confirm.
	span.Annotatef(nil, "Width: %d. NumGo: %d", width, numGo)

	// One result slot per chunk. Each goroutine writes only its own index, so
	// no locking is needed, and merging by index preserves uid order.
	outputs := make([]*pb.Result, numGo)
	listType := schema.State().IsList(q.Attr)

	// calculate evaluates srcFn for the uids q.UidList.Uids[start:end] and
	// stores the partial result in outputs[start/width].
	calculate := func(start, end int) error {
		x.AssertTrue(start%width == 0)
		out := &pb.Result{}
		outputs[start/width] = out

		for i := start; i < end; i++ {
			// Non-blocking cancellation check between uids.
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}
			key := x.DataKey(q.Attr, q.UidList.Uids[i])

			// Get or create the posting list for an entity, attribute combination.
			pl, err := qs.cache.Get(key)
			if err != nil {
				return err
			}

			// If count is being requested, there is no need to populate value and facets matrix.
			if q.DoCount {
				count, err := countForValuePostings(args, pl, facetsTree, listType)
				if err != nil && err != posting.ErrNoValue {
					return err
				}
				out.Counts = append(out.Counts, uint32(count))
				// Add an empty UID list to make later processing consistent.
				out.UidMatrix = append(out.UidMatrix, &pb.List{})
				continue
			}

			vals, fcs, err := retrieveValuesAndFacets(args, pl, facetsTree, listType)
			switch {
			case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
				// This branch is taken when the value does not exist in the pl or
				// the number of values retrieved is zero (there could still be facets).
				// We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and
				// LangMatrix so that all these data structure have predictable layouts.
				out.UidMatrix = append(out.UidMatrix, &pb.List{})
				out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
				out.ValueMatrix = append(out.ValueMatrix,
					&pb.ValueList{Values: []*pb.TaskValue{}})
				if q.ExpandAll {
					// To keep the cardinality same as that of ValueMatrix.
					out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
				}
				continue
			case err != nil:
				return err
			}

			if q.ExpandAll {
				langTags, err := pl.GetLangTags(args.q.ReadTs)
				if err != nil {
					return err
				}
				out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})
			}

			uidList := new(pb.List)
			var vl pb.ValueList
			for _, val := range vals {
				newValue, err := convertToType(val, srcFn.atype)
				if err != nil {
					return err
				}

				// This means we fetched the value directly instead of fetching index key and
				// intersecting. Compare the value and keep the uid if it matches.
				if srcFn.fnType == compareAttrFn {
					// Convert the val to the attribute's type before comparing.
					if val, err = types.Convert(val, srcFn.atype); err != nil {
						return err
					}
					switch srcFn.fname {
					case "eq":
						for _, eqToken := range srcFn.eqTokens {
							if types.CompareVals(srcFn.fname, val, eqToken) {
								uidList.Uids = append(uidList.Uids, q.UidList.Uids[i])
								// Exits the eqTokens loop, so this value adds the uid at most once.
								break
							}
						}
					case "between":
						if types.CompareBetween(val, srcFn.eqTokens[0], srcFn.eqTokens[1]) {
							uidList.Uids = append(uidList.Uids, q.UidList.Uids[i])
						}
					default:
						if types.CompareVals(srcFn.fname, val, srcFn.eqTokens[0]) {
							uidList.Uids = append(uidList.Uids, q.UidList.Uids[i])
						}
					}

				} else {
					vl.Values = append(vl.Values, newValue)
				}
			}
			// For compareAttrFn, vl stays empty; the outcome is carried by uidList.
			out.ValueMatrix = append(out.ValueMatrix, &vl)

			// Add facets to result.
			out.FacetMatrix = append(out.FacetMatrix, fcs)

			switch {
			case srcFn.fnType == aggregatorFn:
				// Add an empty UID list to make later processing consistent
				out.UidMatrix = append(out.UidMatrix, &pb.List{})
			case srcFn.fnType == passwordFn:
				// Replace the stored (hashed) value with TrueVal/FalseVal
				// depending on whether q.SrcFunc.Args[0] verifies against it.
				lastPos := len(out.ValueMatrix) - 1
				// NOTE(review): this appears unreachable, because vals is
				// non-empty here and every val is appended to vl for passwordFn.
				// If it ever ran, no UidMatrix entry would be added for this uid.
				if len(out.ValueMatrix[lastPos].Values) == 0 {
					continue
				}
				newValue := out.ValueMatrix[lastPos].Values[0]
				// NOTE(review): the FalseVal set here is always overwritten by
				// the VerifyPassword outcome below.
				if len(newValue.Val) == 0 {
					out.ValueMatrix[lastPos].Values[0] = ctask.FalseVal
				}
				pwd := q.SrcFunc.Args[0]
				err = types.VerifyPassword(pwd, string(newValue.Val))
				if err != nil {
					out.ValueMatrix[lastPos].Values[0] = ctask.FalseVal
				} else {
					out.ValueMatrix[lastPos].Values[0] = ctask.TrueVal
				}
				// Add an empty UID list to make later processing consistent
				out.UidMatrix = append(out.UidMatrix, &pb.List{})
			default:
				out.UidMatrix = append(out.UidMatrix, uidList)
			}
		}
		return nil
	} // End of calculate function.

	var g errgroup.Group
	for i := 0; i < numGo; i++ {
		start := i * width
		end := start + width
		if end > srcFn.n {
			end = srcFn.n
		}
		g.Go(func() error {
			return calculate(start, end)
		})
	}
	if err := g.Wait(); err != nil {
		return err
	}

	// All goroutines are done. Now attach their results.
	out := args.out
	for _, chunk := range outputs {
		out.UidMatrix = append(out.UidMatrix, chunk.UidMatrix...)
		out.Counts = append(out.Counts, chunk.Counts...)
		out.ValueMatrix = append(out.ValueMatrix, chunk.ValueMatrix...)
		out.FacetMatrix = append(out.FacetMatrix, chunk.FacetMatrix...)
		out.LangMatrix = append(out.LangMatrix, chunk.LangMatrix...)
	}
	return nil
}
536

537
// facetsFilterValuePostingList iterates the value postings of pl at q.ReadTs
// and calls fn for each posting that is selected and whose facets pass
// facetsTree.
//
// Selection rules:
//   - q.ExpandAll: every posting, regardless of language.
//   - listType with no q.Langs: every posting without a language tag.
//   - otherwise: only the single posting returned by pl.PostingFor for
//     q.Langs; iteration stops after it has been seen.
//
// Errors from pl.PostingFor (other than posting.ErrNoValue) and from the facet
// check are returned.
func facetsFilterValuePostingList(args funcArgs, pl *posting.List, facetsTree *facetsTree,
	listType bool, fn func(p *pb.Posting)) error {
	q := args.q

	var langMatch *pb.Posting
	var err error

	// We need to pick multiple postings only in two cases:
	// 1. ExpandAll is true.
	// 2. Attribute type is of list type and no lang tag is specified in query.
	pickMultiplePostings := q.ExpandAll || (listType && len(q.Langs) == 0)

	if !pickMultiplePostings {
		// Retrieve the posting that matches the language preferences.
		// On posting.ErrNoValue langMatch stays nil; presumably proto.Equal
		// then never matches, so no posting is selected — confirm.
		langMatch, err = pl.PostingFor(q.ReadTs, q.Langs)
		if err != nil && err != posting.ErrNoValue {
			return err
		}
	}

	// TODO(Ashish): This function starts iteration from start(afterUID is always 0). This can be
	// optimized in some cases. For example when we know lang tag to fetch, we can directly jump
	// to posting starting with that UID(check list.ValueFor()).
	return pl.Iterate(q.ReadTs, 0, func(p *pb.Posting) error {
		if q.ExpandAll {
			// If q.ExpandAll is true we need to consider all postings irrespective of langs.
		} else if listType && len(q.Langs) == 0 {
			// Don't retrieve tagged values unless explicitly asked.
			if len(p.LangTag) > 0 {
				return nil
			}
		} else {
			// Only consider the posting that matches our language preferences.
			if !proto.Equal(p, langMatch) {
				return nil
			}
		}

		// If filterTree is nil, applyFacetsTree returns true and nil error.
		picked, err := applyFacetsTree(p.Facets, facetsTree)
		if err != nil {
			return err
		}
		if picked {
			fn(p)
		}

		if pickMultiplePostings {
			return nil // Continue iteration.
		}

		// We have picked the right posting, we can stop iteration now.
		// This is returned even if its facets were rejected above.
		return posting.ErrStopIteration
	})
}
592

593
func countForValuePostings(args funcArgs, pl *posting.List, facetsTree *facetsTree,
594
        listType bool) (int, error) {
16✔
595
        var filteredCount int
16✔
596
        err := facetsFilterValuePostingList(args, pl, facetsTree, listType, func(p *pb.Posting) {
30✔
597
                filteredCount++
14✔
598
        })
14✔
599
        if err != nil {
16✔
600
                return 0, err
×
601
        }
×
602

603
        return filteredCount, nil
16✔
604
}
605

606
func retrieveValuesAndFacets(args funcArgs, pl *posting.List, facetsTree *facetsTree,
607
        listType bool) ([]types.Val, *pb.FacetsList, error) {
422,658✔
608
        q := args.q
422,658✔
609
        var vals []types.Val
422,658✔
610
        var fcs []*pb.Facets
422,658✔
611

422,658✔
612
        err := facetsFilterValuePostingList(args, pl, facetsTree, listType, func(p *pb.Posting) {
845,260✔
613
                vals = append(vals, types.Val{
422,602✔
614
                        Tid:   types.TypeID(p.ValType),
422,602✔
615
                        Value: p.Value,
422,602✔
616
                })
422,602✔
617
                if q.FacetParam != nil {
422,768✔
618
                        fcs = append(fcs, &pb.Facets{Facets: facets.CopyFacets(p.Facets, q.FacetParam)})
166✔
619
                }
166✔
620
        })
621
        if err != nil {
422,658✔
622
                return nil, nil, err
×
623
        }
×
624

625
        return vals, &pb.FacetsList{FacetsList: fcs}, nil
422,658✔
626
}
627

628
func facetsFilterUidPostingList(pl *posting.List, facetsTree *facetsTree, opts posting.ListOptions,
629
        fn func(*pb.Posting)) error {
482✔
630

482✔
631
        return pl.Postings(opts, func(p *pb.Posting) error {
9,425✔
632
                // If filterTree is nil, applyFacetsTree returns true and nil error.
8,943✔
633
                pick, err := applyFacetsTree(p.Facets, facetsTree)
8,943✔
634
                if err != nil {
8,943✔
635
                        return err
×
636
                }
×
637
                if pick {
17,754✔
638
                        fn(p)
8,811✔
639
                }
8,811✔
640
                return nil
8,943✔
641
        })
642
}
643

644
func countForUidPostings(args funcArgs, pl *posting.List, facetsTree *facetsTree,
645
        opts posting.ListOptions) (int, error) {
104✔
646

104✔
647
        var filteredCount int
104✔
648
        err := facetsFilterUidPostingList(pl, facetsTree, opts, func(p *pb.Posting) {
287✔
649
                filteredCount++
183✔
650
        })
183✔
651
        if err != nil {
104✔
652
                return 0, err
×
653
        }
×
654

655
        return filteredCount, nil
104✔
656
}
657

658
func retrieveUidsAndFacets(args funcArgs, pl *posting.List, facetsTree *facetsTree,
659
        opts posting.ListOptions) (*pb.List, []*pb.Facets, error) {
378✔
660
        q := args.q
378✔
661

378✔
662
        var fcsList []*pb.Facets
378✔
663
        uidList := &pb.List{
378✔
664
                Uids: make([]uint64, 0, pl.ApproxLen()), // preallocate uid slice.
378✔
665
        }
378✔
666

378✔
667
        err := facetsFilterUidPostingList(pl, facetsTree, opts, func(p *pb.Posting) {
9,006✔
668
                uidList.Uids = append(uidList.Uids, p.Uid)
8,628✔
669
                if q.FacetParam != nil {
17,218✔
670
                        fcsList = append(fcsList, &pb.Facets{
8,590✔
671
                                Facets: facets.CopyFacets(p.Facets, q.FacetParam),
8,590✔
672
                        })
8,590✔
673
                }
8,590✔
674
        })
675
        if err != nil {
378✔
676
                return nil, nil, err
×
677
        }
×
678

679
        return uidList, fcsList, nil
378✔
680
}
681

682
// This function handles operations on uid posting lists. Index keys, reverse keys and some data
683
// keys store uid posting lists.
684
func (qs *queryState) handleUidPostings(
685
        ctx context.Context, args funcArgs, opts posting.ListOptions) error {
36,303✔
686
        srcFn := args.srcFn
36,303✔
687
        q := args.q
36,303✔
688

36,303✔
689
        facetsTree, err := preprocessFilter(q.FacetsFilter)
36,303✔
690
        if err != nil {
36,303✔
691
                return err
×
692
        }
×
693

694
        span := otrace.FromContext(ctx)
36,303✔
695
        stop := x.SpanTimer(span, "handleUidPostings")
36,303✔
696
        defer stop()
36,303✔
697
        if span != nil {
72,606✔
698
                span.Annotatef(nil, "Number of uids: %d. args.srcFn: %+v", srcFn.n, args.srcFn)
36,303✔
699
        }
36,303✔
700
        if srcFn.n == 0 {
39,868✔
701
                return nil
3,565✔
702
        }
3,565✔
703

704
        // srcFn.n should be equal to len(q.UidList.Uids) for below implementation(DivideAndRule and
705
        // calculate) to work correctly. But we have seen some panics while forming DataKey in
706
        // calculate(). panic is of the form "index out of range [4] with length 1". Hence return error
707
        // from here when srcFn.n != len(q.UidList.Uids).
708
        switch srcFn.fnType {
32,738✔
709
        case notAFunction, compareScalarFn, hasFn, uidInFn:
15,982✔
710
                if srcFn.n != len(q.UidList.GetUids()) {
15,982✔
711
                        return errors.Errorf("srcFn.n: %d is not equal to len(q.UidList.Uids): %d, srcFn: %+v in "+
×
712
                                "handleUidPostings", srcFn.n, len(q.UidList.GetUids()), srcFn)
×
713
                }
×
714
        }
715

716
        // Divide the task into many goroutines.
717
        numGo, width := x.DivideAndRule(srcFn.n)
32,738✔
718
        x.AssertTrue(width > 0)
32,738✔
719
        span.Annotatef(nil, "Width: %d. NumGo: %d", width, numGo)
32,738✔
720

32,738✔
721
        errCh := make(chan error, numGo)
32,738✔
722
        outputs := make([]*pb.Result, numGo)
32,738✔
723

32,738✔
724
        calculate := func(start, end int) error {
66,158✔
725
                x.AssertTrue(start%width == 0)
33,420✔
726
                out := &pb.Result{}
33,420✔
727
                outputs[start/width] = out
33,420✔
728

33,420✔
729
                for i := start; i < end; i++ {
1,137,569✔
730
                        if i%100 == 0 {
1,147,474✔
731
                                select {
43,325✔
732
                                case <-ctx.Done():
×
733
                                        return ctx.Err()
×
734
                                default:
43,325✔
735
                                }
736
                        }
737
                        var key []byte
1,104,149✔
738
                        switch srcFn.fnType {
1,104,149✔
739
                        case notAFunction, compareScalarFn, hasFn, uidInFn:
1,057,968✔
740
                                if q.Reverse {
1,126,855✔
741
                                        key = x.ReverseKey(q.Attr, q.UidList.Uids[i])
68,887✔
742
                                } else {
1,057,968✔
743
                                        key = x.DataKey(q.Attr, q.UidList.Uids[i])
989,081✔
744
                                }
989,081✔
745
                        case geoFn, regexFn, fullTextSearchFn, standardFn, customIndexFn, matchFn,
746
                                compareAttrFn:
46,181✔
747
                                key = x.IndexKey(q.Attr, srcFn.tokens[i])
46,181✔
748
                        default:
×
749
                                return errors.Errorf("Unhandled function in handleUidPostings: %s", srcFn.fname)
×
750
                        }
751

752
                        // Get or create the posting list for an entity, attribute combination.
753
                        pl, err := qs.cache.Get(key)
1,104,149✔
754
                        if err != nil {
1,104,149✔
755
                                return err
×
756
                        }
×
757

758
                        switch {
1,104,149✔
759
                        case q.DoCount:
104✔
760
                                if i == 0 {
154✔
761
                                        span.Annotate(nil, "DoCount")
50✔
762
                                }
50✔
763
                                count, err := countForUidPostings(args, pl, facetsTree, opts)
104✔
764
                                if err != nil {
104✔
765
                                        return err
×
766
                                }
×
767
                                out.Counts = append(out.Counts, uint32(count))
104✔
768
                                // Add an empty UID list to make later processing consistent.
104✔
769
                                out.UidMatrix = append(out.UidMatrix, &pb.List{})
104✔
770
                        case srcFn.fnType == compareScalarFn:
21✔
771
                                if i == 0 {
30✔
772
                                        span.Annotate(nil, "CompareScalarFn")
9✔
773
                                }
9✔
774
                                len := pl.Length(args.q.ReadTs, 0)
21✔
775
                                if len == -1 {
21✔
776
                                        return posting.ErrTsTooOld
×
777
                                }
×
778
                                count := int64(len)
21✔
779
                                if evalCompare(srcFn.fname, count, srcFn.threshold[0]) {
35✔
780
                                        tlist := &pb.List{Uids: []uint64{q.UidList.Uids[i]}}
14✔
781
                                        out.UidMatrix = append(out.UidMatrix, tlist)
14✔
782
                                }
14✔
783
                        case srcFn.fnType == hasFn:
88✔
784
                                if i == 0 {
140✔
785
                                        span.Annotate(nil, "HasFn")
52✔
786
                                }
52✔
787
                                empty, err := pl.IsEmpty(args.q.ReadTs, 0)
88✔
788
                                if err != nil {
88✔
789
                                        return err
×
790
                                }
×
791
                                if !empty {
150✔
792
                                        tlist := &pb.List{Uids: []uint64{q.UidList.Uids[i]}}
62✔
793
                                        out.UidMatrix = append(out.UidMatrix, tlist)
62✔
794
                                }
62✔
795
                        case srcFn.fnType == uidInFn:
957,681✔
796
                                if i == 0 {
957,706✔
797
                                        span.Annotate(nil, "UidInFn")
25✔
798
                                }
25✔
799
                                reqList := &pb.List{Uids: srcFn.uidsPresent}
957,681✔
800
                                topts := posting.ListOptions{
957,681✔
801
                                        ReadTs:    args.q.ReadTs,
957,681✔
802
                                        AfterUid:  0,
957,681✔
803
                                        Intersect: reqList,
957,681✔
804
                                        First:     int(args.q.First + args.q.Offset),
957,681✔
805
                                }
957,681✔
806
                                plist, err := pl.Uids(topts)
957,681✔
807
                                if err != nil {
957,681✔
808
                                        return err
×
809
                                }
×
810
                                if len(plist.Uids) > 0 {
1,025,026✔
811
                                        tlist := &pb.List{Uids: []uint64{q.UidList.Uids[i]}}
67,345✔
812
                                        out.UidMatrix = append(out.UidMatrix, tlist)
67,345✔
813
                                }
67,345✔
814
                        case q.FacetParam != nil || facetsTree != nil:
378✔
815
                                if i == 0 {
602✔
816
                                        span.Annotate(nil, "default with facets")
224✔
817
                                }
224✔
818
                                uidList, fcsList, err := retrieveUidsAndFacets(args, pl, facetsTree, opts)
378✔
819
                                if err != nil {
378✔
820
                                        return err
×
821
                                }
×
822
                                out.UidMatrix = append(out.UidMatrix, uidList)
378✔
823
                                if q.FacetParam != nil {
718✔
824
                                        out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{FacetsList: fcsList})
340✔
825
                                }
340✔
826
                        default:
145,877✔
827
                                if i == 0 {
178,255✔
828
                                        span.Annotate(nil, "default no facets")
32,378✔
829
                                }
32,378✔
830
                                uidList, err := pl.Uids(opts)
145,877✔
831
                                if err != nil {
145,877✔
832
                                        return err
×
833
                                }
×
834
                                out.UidMatrix = append(out.UidMatrix, uidList)
145,877✔
835
                        }
836
                }
837
                return nil
33,420✔
838
        } // End of calculate function.
839

840
        for i := 0; i < numGo; i++ {
66,158✔
841
                start := i * width
33,420✔
842
                end := start + width
33,420✔
843
                if end > srcFn.n {
33,444✔
844
                        end = srcFn.n
24✔
845
                }
24✔
846
                go func(start, end int) {
66,840✔
847
                        errCh <- calculate(start, end)
33,420✔
848
                }(start, end)
33,420✔
849
        }
850
        for i := 0; i < numGo; i++ {
66,158✔
851
                if err := <-errCh; err != nil {
33,420✔
852
                        return err
×
853
                }
×
854
        }
855
        // All goroutines are done. Now attach their results.
856
        out := args.out
32,738✔
857
        for _, chunk := range outputs {
66,158✔
858
                out.FacetMatrix = append(out.FacetMatrix, chunk.FacetMatrix...)
33,420✔
859
                out.Counts = append(out.Counts, chunk.Counts...)
33,420✔
860
                out.UidMatrix = append(out.UidMatrix, chunk.UidMatrix...)
33,420✔
861
        }
33,420✔
862
        var total int
32,738✔
863
        for _, list := range out.UidMatrix {
246,518✔
864
                total += len(list.Uids)
213,780✔
865
        }
213,780✔
866
        span.Annotatef(nil, "Total number of elements in matrix: %d", total)
32,738✔
867
        return nil
32,738✔
868
}
869

870
// Cache modes compared against pb.Query.Cache in processTask. The values are spelled out
// explicitly because they are carried in the query message.
const (
	// UseTxnCache indicates the transaction cache should be used.
	UseTxnCache = 0
	// NoCache indicates no caches should be used.
	NoCache = 1
)
876

877
// processTask processes the query, accumulates and returns the result.
878
func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, error) {
84,176✔
879
        ctx, span := otrace.StartSpan(ctx, "processTask."+q.Attr)
84,176✔
880
        defer span.End()
84,176✔
881

84,176✔
882
        stop := x.SpanTimer(span, "processTask"+q.Attr)
84,176✔
883
        defer stop()
84,176✔
884

84,176✔
885
        span.Annotatef(nil, "Waiting for startTs: %d at node: %d, gid: %d",
84,176✔
886
                q.ReadTs, groups().Node.Id, gid)
84,176✔
887
        if err := posting.Oracle().WaitForTs(ctx, q.ReadTs); err != nil {
84,178✔
888
                return nil, err
2✔
889
        }
2✔
890
        if span != nil {
168,348✔
891
                maxAssigned := posting.Oracle().MaxAssigned()
84,174✔
892
                span.Annotatef(nil, "Done waiting for maxAssigned. Attr: %q ReadTs: %d Max: %d",
84,174✔
893
                        q.Attr, q.ReadTs, maxAssigned)
84,174✔
894
        }
84,174✔
895
        if err := groups().ChecksumsMatch(ctx); err != nil {
84,176✔
896
                return nil, err
2✔
897
        }
2✔
898
        span.Annotatef(nil, "Done waiting for checksum match")
84,172✔
899

84,172✔
900
        // If a group stops serving tablet and it gets partitioned away from group
84,172✔
901
        // zero, then it wouldn't know that this group is no longer serving this
84,172✔
902
        // predicate. There's no issue if a we are serving a particular tablet and
84,172✔
903
        // we get partitioned away from group zero as long as it's not removed.
84,172✔
904
        // BelongsToReadOnly is called instead of BelongsTo to prevent this alpha
84,172✔
905
        // from requesting to serve this tablet.
84,172✔
906
        knownGid, err := groups().BelongsToReadOnly(q.Attr, q.ReadTs)
84,172✔
907
        switch {
84,172✔
908
        case err != nil:
×
909
                return nil, err
×
910
        case knownGid == 0:
×
911
                return nil, errNonExistentTablet
×
912
        case knownGid != groups().groupId():
×
913
                return nil, errUnservedTablet
×
914
        }
915

916
        var qs queryState
84,172✔
917
        if q.Cache == UseTxnCache {
166,338✔
918
                qs.cache = posting.Oracle().CacheAt(q.ReadTs)
82,166✔
919
        }
82,166✔
920
        if qs.cache == nil {
156,570✔
921
                qs.cache = posting.NoCache(q.ReadTs)
72,398✔
922
        }
72,398✔
923
        // For now, remove the query level cache. It is causing contention for queries with high
924
        // fan-out.
925
        out, err := qs.helpProcessTask(ctx, q, gid)
84,172✔
926
        if err != nil {
84,375✔
927
                return nil, err
203✔
928
        }
203✔
929
        return out, nil
83,969✔
930
}
931

932
// queryState holds the per-task state used by the handlers that process a single query.
type queryState struct {
	// cache serves posting-list reads (qs.cache.Get) for the task. processTask sets it to the
	// transaction cache at q.ReadTs when q.Cache == UseTxnCache and one is available, and
	// otherwise to posting.NoCache(q.ReadTs).
	cache *posting.LocalCache
}
935

936
func (qs *queryState) helpProcessTask(ctx context.Context, q *pb.Query, gid uint32) (
937
        *pb.Result, error) {
84,172✔
938

84,172✔
939
        span := otrace.FromContext(ctx)
84,172✔
940
        out := new(pb.Result)
84,172✔
941
        attr := q.Attr
84,172✔
942

84,172✔
943
        srcFn, err := parseSrcFn(ctx, q)
84,172✔
944
        if err != nil {
84,320✔
945
                return nil, err
148✔
946
        }
148✔
947

948
        if q.Reverse && !schema.State().IsReversed(ctx, attr) {
84,068✔
949
                return nil, errors.Errorf("Predicate %s doesn't have reverse edge", x.ParseAttr(attr))
44✔
950
        }
44✔
951

952
        if needsIndex(srcFn.fnType, q.UidList) && !schema.State().IsIndexed(ctx, q.Attr) {
83,985✔
953
                return nil, errors.Errorf("Predicate %s is not indexed", x.ParseAttr(q.Attr))
5✔
954
        }
5✔
955

956
        if len(q.Langs) > 0 && !schema.State().HasLang(attr) {
83,975✔
957
                return nil, errors.Errorf("Language tags can only be used with predicates of string type"+
×
958
                        " having @lang directive in schema. Got: [%v]", x.ParseAttr(attr))
×
959
        }
×
960
        if len(q.Langs) == 1 && q.Langs[0] == "*" {
83,978✔
961
                // Reset the Langs fields. The ExpandAll field is set to true already so there's no
3✔
962
                // more need to store the star value in this field.
3✔
963
                q.Langs = nil
3✔
964
        }
3✔
965

966
        typ, err := schema.State().TypeOf(attr)
83,975✔
967
        if err != nil {
84,110✔
968
                // All schema checks are done before this, this type is only used to
135✔
969
                // convert it to schema type before returning.
135✔
970
                // Schema type won't be present only if there is no data for that predicate
135✔
971
                // or if we load through bulk loader.
135✔
972
                typ = types.DefaultID
135✔
973
        }
135✔
974
        out.List = schema.State().IsList(attr)
83,975✔
975
        srcFn.atype = typ
83,975✔
976

83,975✔
977
        // Reverse attributes might have more than 1 results even if the original attribute
83,975✔
978
        // is not a list.
83,975✔
979
        if q.Reverse {
89,837✔
980
                out.List = true
5,862✔
981
        }
5,862✔
982

983
        opts := posting.ListOptions{
83,975✔
984
                ReadTs:   q.ReadTs,
83,975✔
985
                AfterUid: q.AfterUid,
83,975✔
986
                First:    int(q.First + q.Offset),
83,975✔
987
        }
83,975✔
988
        // If we have srcFunc and Uids, it means its a filter. So we intersect.
83,975✔
989
        if srcFn.fnType != notAFunction && q.UidList != nil && len(q.UidList.Uids) > 0 {
88,697✔
990
                opts.Intersect = q.UidList
4,722✔
991
        }
4,722✔
992

993
        args := funcArgs{q, gid, srcFn, out}
83,975✔
994
        needsValPostings, err := srcFn.needsValuePostings(typ)
83,975✔
995
        if err != nil {
83,975✔
996
                return nil, err
×
997
        }
×
998
        if needsValPostings {
131,647✔
999
                span.Annotate(nil, "handleValuePostings")
47,672✔
1000
                if err = qs.handleValuePostings(ctx, args); err != nil {
47,673✔
1001
                        return nil, err
1✔
1002
                }
1✔
1003
        } else {
36,303✔
1004
                span.Annotate(nil, "handleUidPostings")
36,303✔
1005
                if err = qs.handleUidPostings(ctx, args, opts); err != nil {
36,303✔
1006
                        return nil, err
×
1007
                }
×
1008
        }
1009

1010
        if srcFn.fnType == hasFn && srcFn.isFuncAtRoot {
87,063✔
1011
                span.Annotate(nil, "handleHasFunction")
3,089✔
1012
                if err := qs.handleHasFunction(ctx, q, out, srcFn); err != nil {
3,089✔
1013
                        return nil, err
×
1014
                }
×
1015
        }
1016

1017
        if srcFn.fnType == compareScalarFn && srcFn.isFuncAtRoot {
84,007✔
1018
                span.Annotate(nil, "handleCompareScalarFunction")
33✔
1019
                if err := qs.handleCompareScalarFunction(ctx, args); err != nil {
35✔
1020
                        return nil, err
2✔
1021
                }
2✔
1022
        }
1023

1024
        if srcFn.fnType == regexFn {
84,080✔
1025
                span.Annotate(nil, "handleRegexFunction")
108✔
1026
                if err := qs.handleRegexFunction(ctx, args); err != nil {
110✔
1027
                        return nil, err
2✔
1028
                }
2✔
1029
        }
1030

1031
        if srcFn.fnType == matchFn {
83,988✔
1032
                span.Annotate(nil, "handleMatchFunction")
18✔
1033
                if err := qs.handleMatchFunction(ctx, args); err != nil {
18✔
1034
                        return nil, err
×
1035
                }
×
1036
        }
1037

1038
        // We fetch the actual value for the uids, compare them to the value in the
1039
        // request and filter the uids only if the tokenizer IsLossy.
1040
        if srcFn.fnType == compareAttrFn && len(srcFn.tokens) > 0 {
100,402✔
1041
                span.Annotate(nil, "handleCompareFunction")
16,432✔
1042
                if err := qs.handleCompareFunction(ctx, args); err != nil {
16,433✔
1043
                        return nil, err
1✔
1044
                }
1✔
1045
        }
1046

1047
        // If geo filter, do value check for correctness.
1048
        if srcFn.geoQuery != nil {
83,988✔
1049
                span.Annotate(nil, "handleGeoFunction")
19✔
1050
                if err := qs.filterGeoFunction(ctx, args); err != nil {
19✔
1051
                        return nil, err
×
1052
                }
×
1053
        }
1054

1055
        // For string matching functions, check the language. We are not checking here
1056
        // for hasFn as filtering for it has already been done in handleHasFunction.
1057
        if srcFn.fnType != hasFn && needsStringFiltering(srcFn, q.Langs, attr) {
84,271✔
1058
                span.Annotate(nil, "filterStringFunction")
302✔
1059
                if err := qs.filterStringFunction(args); err != nil {
302✔
1060
                        return nil, err
×
1061
                }
×
1062
        }
1063

1064
        out.IntersectDest = srcFn.intersectDest
83,969✔
1065
        return out, nil
83,969✔
1066
}
1067

1068
func needsStringFiltering(srcFn *functionContext, langs []string, attr string) bool {
83,917✔
1069
        if !srcFn.isStringFn {
150,888✔
1070
                return false
66,971✔
1071
        }
66,971✔
1072

1073
        // If a predicate doesn't have @lang directive in schema, we don't need to do any string
1074
        // filtering.
1075
        if !schema.State().HasLang(attr) {
33,515✔
1076
                return false
16,569✔
1077
        }
16,569✔
1078

1079
        return langForFunc(langs) != "." &&
377✔
1080
                (srcFn.fnType == standardFn || srcFn.fnType == hasFn ||
377✔
1081
                        srcFn.fnType == fullTextSearchFn || srcFn.fnType == compareAttrFn ||
377✔
1082
                        srcFn.fnType == customIndexFn)
377✔
1083
}
1084

1085
func (qs *queryState) handleCompareScalarFunction(ctx context.Context, arg funcArgs) error {
33✔
1086
        attr := arg.q.Attr
33✔
1087
        if ok := schema.State().HasCount(ctx, attr); !ok {
34✔
1088
                return errors.Errorf("Need @count directive in schema for attr: %s for fn: %s at root",
1✔
1089
                        x.ParseAttr(attr), arg.srcFn.fname)
1✔
1090
        }
1✔
1091
        counts := arg.srcFn.threshold
32✔
1092
        cp := countParams{
32✔
1093
                fn:      arg.srcFn.fname,
32✔
1094
                counts:  counts,
32✔
1095
                attr:    attr,
32✔
1096
                gid:     arg.gid,
32✔
1097
                readTs:  arg.q.ReadTs,
32✔
1098
                reverse: arg.q.Reverse,
32✔
1099
        }
32✔
1100
        return qs.evaluate(cp, arg.out)
32✔
1101
}
1102

1103
func (qs *queryState) handleRegexFunction(ctx context.Context, arg funcArgs) error {
108✔
1104
        span := otrace.FromContext(ctx)
108✔
1105
        stop := x.SpanTimer(span, "handleRegexFunction")
108✔
1106
        defer stop()
108✔
1107
        if span != nil {
216✔
1108
                span.Annotatef(nil, "Number of uids: %d. args.srcFn: %+v", arg.srcFn.n, arg.srcFn)
108✔
1109
        }
108✔
1110

1111
        attr := arg.q.Attr
108✔
1112
        typ, err := schema.State().TypeOf(attr)
108✔
1113
        span.Annotatef(nil, "Attr: %s. Type: %s", attr, typ.Name())
108✔
1114
        if err != nil || !typ.IsScalar() {
108✔
1115
                return errors.Errorf("Attribute not scalar: %s %v", x.ParseAttr(attr), typ)
×
1116
        }
×
1117
        if typ != types.StringID {
109✔
1118
                return errors.Errorf("Got non-string type. Regex match is allowed only on string type.")
1✔
1119
        }
1✔
1120
        useIndex := schema.State().HasTokenizer(ctx, tok.IdentTrigram, attr)
107✔
1121
        span.Annotatef(nil, "Trigram index found: %t, func at root: %t",
107✔
1122
                useIndex, arg.srcFn.isFuncAtRoot)
107✔
1123

107✔
1124
        query := cindex.RegexpQuery(arg.srcFn.regex.Syntax)
107✔
1125
        empty := pb.List{}
107✔
1126
        var uids *pb.List
107✔
1127

107✔
1128
        // Here we determine the list of uids to match.
107✔
1129
        switch {
107✔
1130
        // If this is a filter eval, use the given uid list (good)
1131
        case arg.q.UidList != nil:
97✔
1132
                // These UIDs are copied into arg.out.UidMatrix which is later updated while
97✔
1133
                // processing the query. The below trick makes a copy of the list to avoid the
97✔
1134
                // race conditions later. I (Aman) did a race condition tests to ensure that we
97✔
1135
                // do not have more race condition in similar code in the rest of the file.
97✔
1136
                // The race condition was found only here because in filter condition, even when
97✔
1137
                // predicates do not have indexes, we allow regexp queries (for example, we do
97✔
1138
                // not support eq/gt/lt/le in @filter, see #4077), and this was new code that
97✔
1139
                // was added just to support the aforementioned case, the race condition is only
97✔
1140
                // in this part of the code.
97✔
1141
                uids = &pb.List{}
97✔
1142
                uids.Uids = append(arg.q.UidList.Uids[:0:0], arg.q.UidList.Uids...)
97✔
1143

1144
        // Prefer to use an index (fast)
1145
        case useIndex:
9✔
1146
                uids, err = uidsForRegex(attr, arg, query, &empty)
9✔
1147
                if err != nil {
9✔
1148
                        return err
×
1149
                }
×
1150

1151
        // No index and at root, return error instructing user to use `has` or index.
1152
        default:
1✔
1153
                return errors.Errorf(
1✔
1154
                        "Attribute %v does not have trigram index for regex matching. "+
1✔
1155
                                "Please add a trigram index or use has/uid function with regexp() as filter.",
1✔
1156
                        x.ParseAttr(attr))
1✔
1157
        }
1158

1159
        arg.out.UidMatrix = append(arg.out.UidMatrix, uids)
106✔
1160
        isList := schema.State().IsList(attr)
106✔
1161
        lang := langForFunc(arg.q.Langs)
106✔
1162

106✔
1163
        span.Annotatef(nil, "Total uids: %d, list: %t lang: %v", len(uids.Uids), isList, lang)
106✔
1164

106✔
1165
        filtered := &pb.List{}
106✔
1166
        for _, uid := range uids.Uids {
765✔
1167
                select {
659✔
1168
                case <-ctx.Done():
×
1169
                        return ctx.Err()
×
1170
                default:
659✔
1171
                }
1172
                pl, err := qs.cache.Get(x.DataKey(attr, uid))
659✔
1173
                if err != nil {
659✔
1174
                        return err
×
1175
                }
×
1176

1177
                vals := make([]types.Val, 1)
659✔
1178
                switch {
659✔
1179
                case lang != "":
10✔
1180
                        vals[0], err = pl.ValueForTag(arg.q.ReadTs, lang)
10✔
1181

1182
                case isList:
6✔
1183
                        vals, err = pl.AllUntaggedValues(arg.q.ReadTs)
6✔
1184

1185
                default:
643✔
1186
                        vals[0], err = pl.Value(arg.q.ReadTs)
643✔
1187
                }
1188
                if err != nil {
668✔
1189
                        if err == posting.ErrNoValue {
18✔
1190
                                continue
9✔
1191
                        }
1192
                        return err
×
1193
                }
1194

1195
                for _, val := range vals {
1,300✔
1196
                        // convert data from binary to appropriate format
650✔
1197
                        strVal, err := types.Convert(val, types.StringID)
650✔
1198
                        if err == nil && matchRegex(strVal, arg.srcFn.regex) {
756✔
1199
                                filtered.Uids = append(filtered.Uids, uid)
106✔
1200
                                // NOTE: We only add the uid once.
106✔
1201
                                break
106✔
1202
                        }
1203
                }
1204
        }
1205

1206
        for i := 0; i < len(arg.out.UidMatrix); i++ {
212✔
1207
                algo.IntersectWith(arg.out.UidMatrix[i], filtered, arg.out.UidMatrix[i])
106✔
1208
        }
106✔
1209

1210
        return nil
106✔
1211
}
1212

1213
func (qs *queryState) handleCompareFunction(ctx context.Context, arg funcArgs) error {
16,432✔
1214
        span := otrace.FromContext(ctx)
16,432✔
1215
        stop := x.SpanTimer(span, "handleCompareFunction")
16,432✔
1216
        defer stop()
16,432✔
1217
        if span != nil {
32,864✔
1218
                span.Annotatef(nil, "Number of uids: %d. args.srcFn: %+v", arg.srcFn.n, arg.srcFn)
16,432✔
1219
        }
16,432✔
1220

1221
        attr := arg.q.Attr
16,432✔
1222
        span.Annotatef(nil, "Attr: %s. Fname: %s", attr, arg.srcFn.fname)
16,432✔
1223
        tokenizer, err := pickTokenizer(ctx, attr, arg.srcFn.fname)
16,432✔
1224
        if err != nil {
16,433✔
1225
                return err
1✔
1226
        }
1✔
1227

1228
        // Only if the tokenizer that we used IsLossy
1229
        // then we need to fetch and compare the actual values.
1230
        span.Annotatef(nil, "Tokenizer: %s, Lossy: %t", tokenizer.Name(), tokenizer.IsLossy())
16,431✔
1231

16,431✔
1232
        if !tokenizer.IsLossy() {
32,743✔
1233
                return nil
16,312✔
1234
        }
16,312✔
1235

1236
        // Need to evaluate inequality for entries in the first bucket.
1237
        typ, err := schema.State().TypeOf(attr)
119✔
1238
        if err != nil || !typ.IsScalar() {
119✔
1239
                return errors.Errorf("Attribute not scalar: %s %v", x.ParseAttr(attr), typ)
×
1240
        }
×
1241

1242
        x.AssertTrue(len(arg.out.UidMatrix) > 0)
119✔
1243
        isList := schema.State().IsList(attr)
119✔
1244
        lang := langForFunc(arg.q.Langs)
119✔
1245

119✔
1246
        filterRow := func(row int, compareFunc func(types.Val) bool) error {
251✔
1247
                select {
132✔
1248
                case <-ctx.Done():
×
1249
                        return ctx.Err()
×
1250
                default:
132✔
1251
                }
1252

1253
                var filterErr error
132✔
1254
                algo.ApplyFilter(arg.out.UidMatrix[row], func(uid uint64, i int) bool {
1,088✔
1255
                        switch lang {
956✔
1256
                        case "":
954✔
1257
                                if isList {
967✔
1258
                                        pl, err := posting.GetNoStore(x.DataKey(attr, uid), arg.q.ReadTs)
13✔
1259
                                        if err != nil {
13✔
1260
                                                filterErr = err
×
1261
                                                return false
×
1262
                                        }
×
1263
                                        svs, err := pl.AllUntaggedValues(arg.q.ReadTs)
13✔
1264
                                        if err != nil {
13✔
1265
                                                if err != posting.ErrNoValue {
×
1266
                                                        filterErr = err
×
1267
                                                }
×
1268
                                                return false
×
1269
                                        }
1270
                                        for _, sv := range svs {
28✔
1271
                                                dst, err := types.Convert(sv, typ)
15✔
1272
                                                if err == nil && compareFunc(dst) {
23✔
1273
                                                        return true
8✔
1274
                                                }
8✔
1275
                                        }
1276

1277
                                        return false
5✔
1278
                                }
1279

1280
                                pl, err := posting.GetNoStore(x.DataKey(attr, uid), arg.q.ReadTs)
941✔
1281
                                if err != nil {
941✔
1282
                                        filterErr = err
×
1283
                                        return false
×
1284
                                }
×
1285
                                sv, err := pl.Value(arg.q.ReadTs)
941✔
1286
                                if err != nil {
942✔
1287
                                        if err != posting.ErrNoValue {
1✔
1288
                                                filterErr = err
×
1289
                                        }
×
1290
                                        return false
1✔
1291
                                }
1292
                                dst, err := types.Convert(sv, typ)
940✔
1293
                                return err == nil && compareFunc(dst)
940✔
1294
                        case ".":
×
1295
                                pl, err := posting.GetNoStore(x.DataKey(attr, uid), arg.q.ReadTs)
×
1296
                                if err != nil {
×
1297
                                        filterErr = err
×
1298
                                        return false
×
1299
                                }
×
1300
                                values, err := pl.AllValues(arg.q.ReadTs) // does not return ErrNoValue
×
1301
                                if err != nil {
×
1302
                                        filterErr = err
×
1303
                                        return false
×
1304
                                }
×
1305
                                for _, sv := range values {
×
1306
                                        dst, err := types.Convert(sv, typ)
×
1307
                                        if err == nil && compareFunc(dst) {
×
1308
                                                return true
×
1309
                                        }
×
1310
                                }
1311
                                return false
×
1312
                        default:
2✔
1313
                                sv, err := fetchValue(uid, attr, arg.q.Langs, typ, arg.q.ReadTs)
2✔
1314
                                if err != nil {
2✔
1315
                                        if err != posting.ErrNoValue {
×
1316
                                                filterErr = err
×
1317
                                        }
×
1318
                                        return false
×
1319
                                }
1320
                                if sv.Value == nil {
2✔
1321
                                        return false
×
1322
                                }
×
1323
                                return compareFunc(sv)
2✔
1324
                        }
1325
                })
1326
                if filterErr != nil {
132✔
1327
                        return err
×
1328
                }
×
1329

1330
                return nil
132✔
1331
        }
1332

1333
        switch {
119✔
1334
        case arg.srcFn.fname == eq:
59✔
1335
                // If fn is eq, we could have multiple arguments and hence multiple rows to filter.
59✔
1336
                for row := 0; row < len(arg.srcFn.tokens); row++ {
128✔
1337
                        compareFunc := func(dst types.Val) bool {
179✔
1338
                                return types.CompareVals(arg.srcFn.fname, dst, arg.srcFn.eqTokens[row])
110✔
1339
                        }
110✔
1340
                        if err := filterRow(row, compareFunc); err != nil {
69✔
1341
                                return err
×
1342
                        }
×
1343
                }
1344
        case arg.srcFn.fname == between:
9✔
1345
                compareFunc := func(dst types.Val) bool {
26✔
1346
                        return types.CompareBetween(dst, arg.srcFn.eqTokens[0], arg.srcFn.eqTokens[1])
17✔
1347
                }
17✔
1348
                if err := filterRow(0, compareFunc); err != nil {
9✔
1349
                        return err
×
1350
                }
×
1351
                if err := filterRow(len(arg.out.UidMatrix)-1, compareFunc); err != nil {
9✔
1352
                        return err
×
1353
                }
×
1354
        case arg.srcFn.tokens[0] == arg.srcFn.ineqValueToken[0]:
45✔
1355
                // If operation is not eq and ineqValueToken equals first token,
45✔
1356
                // then we need to filter first row.
45✔
1357
                compareFunc := func(dst types.Val) bool {
875✔
1358
                        return types.CompareVals(arg.q.SrcFunc.Name, dst, arg.srcFn.eqTokens[0])
830✔
1359
                }
830✔
1360
                if err := filterRow(0, compareFunc); err != nil {
45✔
1361
                        return err
×
1362
                }
×
1363
        }
1364

1365
        return nil
119✔
1366
}
1367

1368
func (qs *queryState) handleMatchFunction(ctx context.Context, arg funcArgs) error {
18✔
1369
        span := otrace.FromContext(ctx)
18✔
1370
        stop := x.SpanTimer(span, "handleMatchFunction")
18✔
1371
        defer stop()
18✔
1372
        if span != nil {
36✔
1373
                span.Annotatef(nil, "Number of uids: %d. args.srcFn: %+v", arg.srcFn.n, arg.srcFn)
18✔
1374
        }
18✔
1375

1376
        attr := arg.q.Attr
18✔
1377
        typ := arg.srcFn.atype
18✔
1378
        span.Annotatef(nil, "Attr: %s. Type: %s", attr, typ.Name())
18✔
1379
        var uids *pb.List
18✔
1380
        switch {
18✔
1381
        case !typ.IsScalar():
×
1382
                return errors.Errorf("Attribute not scalar: %s %v", attr, typ)
×
1383

1384
        case typ != types.StringID:
×
1385
                return errors.Errorf("Got non-string type. Fuzzy match is allowed only on string type.")
×
1386

1387
        case arg.q.UidList != nil && len(arg.q.UidList.Uids) != 0:
1✔
1388
                uids = arg.q.UidList
1✔
1389

1390
        case schema.State().HasTokenizer(ctx, tok.IdentTrigram, attr):
17✔
1391
                var err error
17✔
1392
                uids, err = uidsForMatch(attr, arg)
17✔
1393
                if err != nil {
17✔
1394
                        return err
×
1395
                }
×
1396

1397
        default:
×
1398
                return errors.Errorf(
×
1399
                        "Attribute %v does not have trigram index for fuzzy matching. "+
×
1400
                                "Please add a trigram index or use has/uid function with match() as filter.",
×
1401
                        x.ParseAttr(attr))
×
1402
        }
1403

1404
        isList := schema.State().IsList(attr)
18✔
1405
        lang := langForFunc(arg.q.Langs)
18✔
1406
        span.Annotatef(nil, "Total uids: %d, list: %t lang: %v", len(uids.Uids), isList, lang)
18✔
1407
        arg.out.UidMatrix = append(arg.out.UidMatrix, uids)
18✔
1408

18✔
1409
        matchQuery := strings.Join(arg.srcFn.tokens, "")
18✔
1410
        filtered := &pb.List{}
18✔
1411
        for _, uid := range uids.Uids {
124✔
1412
                select {
106✔
1413
                case <-ctx.Done():
×
1414
                        return ctx.Err()
×
1415
                default:
106✔
1416
                }
1417
                pl, err := qs.cache.Get(x.DataKey(attr, uid))
106✔
1418
                if err != nil {
106✔
1419
                        return err
×
1420
                }
×
1421

1422
                vals := make([]types.Val, 1)
106✔
1423
                switch {
106✔
1424
                case lang != "":
×
1425
                        vals[0], err = pl.ValueForTag(arg.q.ReadTs, lang)
×
1426

1427
                case isList:
×
1428
                        vals, err = pl.AllUntaggedValues(arg.q.ReadTs)
×
1429

1430
                default:
106✔
1431
                        vals[0], err = pl.Value(arg.q.ReadTs)
106✔
1432
                }
1433
                if err != nil {
106✔
1434
                        if err == posting.ErrNoValue {
×
1435
                                continue
×
1436
                        }
1437
                        return err
×
1438
                }
1439

1440
                max := int(arg.srcFn.threshold[0])
106✔
1441
                for _, val := range vals {
212✔
1442
                        // convert data from binary to appropriate format
106✔
1443
                        strVal, err := types.Convert(val, types.StringID)
106✔
1444
                        if err == nil && matchFuzzy(matchQuery, strVal.Value.(string), max) {
177✔
1445
                                filtered.Uids = append(filtered.Uids, uid)
71✔
1446
                                // NOTE: We only add the uid once.
71✔
1447
                                break
71✔
1448
                        }
1449
                }
1450
        }
1451

1452
        for i := 0; i < len(arg.out.UidMatrix); i++ {
54✔
1453
                algo.IntersectWith(arg.out.UidMatrix[i], filtered, arg.out.UidMatrix[i])
36✔
1454
        }
36✔
1455

1456
        return nil
18✔
1457
}
1458

1459
func (qs *queryState) filterGeoFunction(ctx context.Context, arg funcArgs) error {
19✔
1460
        span := otrace.FromContext(ctx)
19✔
1461
        stop := x.SpanTimer(span, "filterGeoFunction")
19✔
1462
        defer stop()
19✔
1463

19✔
1464
        attr := arg.q.Attr
19✔
1465
        uids := algo.MergeSorted(arg.out.UidMatrix)
19✔
1466
        numGo, width := x.DivideAndRule(len(uids.Uids))
19✔
1467
        if span != nil && numGo > 1 {
19✔
1468
                span.Annotatef(nil, "Number of uids: %d. NumGo: %d. Width: %d\n",
×
1469
                        len(uids.Uids), numGo, width)
×
1470
        }
×
1471

1472
        filtered := make([]*pb.List, numGo)
19✔
1473
        filter := func(idx, start, end int) error {
38✔
1474
                filtered[idx] = &pb.List{}
19✔
1475
                out := filtered[idx]
19✔
1476
                for _, uid := range uids.Uids[start:end] {
93✔
1477
                        pl, err := qs.cache.Get(x.DataKey(attr, uid))
74✔
1478
                        if err != nil {
74✔
1479
                                return err
×
1480
                        }
×
1481
                        var tv pb.TaskValue
74✔
1482
                        err = pl.Iterate(arg.q.ReadTs, 0, func(p *pb.Posting) error {
149✔
1483
                                tv.ValType = p.ValType
75✔
1484
                                tv.Val = p.Value
75✔
1485
                                if types.MatchGeo(&tv, arg.srcFn.geoQuery) {
132✔
1486
                                        out.Uids = append(out.Uids, uid)
57✔
1487
                                        return posting.ErrStopIteration
57✔
1488
                                }
57✔
1489
                                return nil
18✔
1490
                        })
1491
                        if err != nil {
74✔
1492
                                return err
×
1493
                        }
×
1494
                }
1495
                return nil
19✔
1496
        }
1497

1498
        errCh := make(chan error, numGo)
19✔
1499
        for i := 0; i < numGo; i++ {
38✔
1500
                start := i * width
19✔
1501
                end := start + width
19✔
1502
                if end > len(uids.Uids) {
19✔
1503
                        end = len(uids.Uids)
×
1504
                }
×
1505
                go func(idx, start, end int) {
38✔
1506
                        errCh <- filter(idx, start, end)
19✔
1507
                }(i, start, end)
19✔
1508
        }
1509
        for i := 0; i < numGo; i++ {
38✔
1510
                if err := <-errCh; err != nil {
19✔
1511
                        return err
×
1512
                }
×
1513
        }
1514
        final := &pb.List{}
19✔
1515
        for _, out := range filtered {
38✔
1516
                final.Uids = append(final.Uids, out.Uids...)
19✔
1517
        }
19✔
1518
        if span != nil && numGo > 1 {
19✔
1519
                span.Annotatef(nil, "Total uids after filtering geo: %d", len(final.Uids))
×
1520
        }
×
1521
        for i := 0; i < len(arg.out.UidMatrix); i++ {
25,510✔
1522
                algo.IntersectWith(arg.out.UidMatrix[i], final, arg.out.UidMatrix[i])
25,491✔
1523
        }
25,491✔
1524
        return nil
19✔
1525
}
1526

1527
// TODO: This function is really slow when there are a lot of UIDs to filter, for e.g. when used in
1528
// `has(name)`. We could potentially have a query level cache, which can be used to speed things up
1529
// a bit. Or, try to reduce the number of UIDs which make it here.
1530
func (qs *queryState) filterStringFunction(arg funcArgs) error {
302✔
1531
        if glog.V(3) {
302✔
1532
                glog.Infof("filterStringFunction. arg: %+v\n", arg.q)
×
1533
                defer glog.Infof("Done filterStringFunction")
×
1534
        }
×
1535
        attr := arg.q.Attr
302✔
1536
        uids := algo.MergeSorted(arg.out.UidMatrix)
302✔
1537
        var values [][]types.Val
302✔
1538
        filteredUids := make([]uint64, 0, len(uids.Uids))
302✔
1539
        lang := langForFunc(arg.q.Langs)
302✔
1540

302✔
1541
        // This iteration must be done in a serial order, because we're also storing the values in a
302✔
1542
        // matrix, to check it later.
302✔
1543
        // TODO: This function can be optimized by having a query specific cache, which can be populated
302✔
1544
        // by the handleHasFunction for e.g. for a `has(name)` query.
302✔
1545
        for _, uid := range uids.Uids {
1,188✔
1546
                vals, err := qs.getValsForUID(attr, lang, uid, arg.q.ReadTs)
886✔
1547
                switch {
886✔
1548
                case err == posting.ErrNoValue:
18✔
1549
                        continue
18✔
1550
                case err != nil:
×
1551
                        return err
×
1552
                }
1553

1554
                var strVals []types.Val
868✔
1555
                for _, v := range vals {
1,736✔
1556
                        // convert data from binary to appropriate format
868✔
1557
                        strVal, err := types.Convert(v, types.StringID)
868✔
1558
                        if err != nil {
868✔
1559
                                continue
×
1560
                        }
1561
                        strVals = append(strVals, strVal)
868✔
1562
                }
1563
                if len(strVals) > 0 {
1,736✔
1564
                        values = append(values, strVals)
868✔
1565
                        filteredUids = append(filteredUids, uid)
868✔
1566
                }
868✔
1567
        }
1568

1569
        filtered := &pb.List{Uids: filteredUids}
302✔
1570
        filter := stringFilter{
302✔
1571
                funcName: arg.srcFn.fname,
302✔
1572
                funcType: arg.srcFn.fnType,
302✔
1573
                lang:     lang,
302✔
1574
        }
302✔
1575

302✔
1576
        switch arg.srcFn.fnType {
302✔
1577
        case hasFn:
×
1578
                // Dont do anything, as filtering based on lang is already
1579
                // done above.
1580
        case fullTextSearchFn:
×
1581
                filter.tokens = arg.srcFn.tokens
×
1582
                filter.match = defaultMatch
×
1583
                filter.tokName = "fulltext"
×
1584
                filtered = matchStrings(filtered, values, &filter)
×
1585
        case standardFn:
144✔
1586
                filter.tokens = arg.srcFn.tokens
144✔
1587
                filter.match = defaultMatch
144✔
1588
                filter.tokName = "term"
144✔
1589
                filtered = matchStrings(filtered, values, &filter)
144✔
1590
        case customIndexFn:
1✔
1591
                filter.tokens = arg.srcFn.tokens
1✔
1592
                filter.match = defaultMatch
1✔
1593
                filter.tokName = arg.q.SrcFunc.Args[0]
1✔
1594
                filtered = matchStrings(filtered, values, &filter)
1✔
1595
        case compareAttrFn:
157✔
1596
                // filter.ineqValue = arg.srcFn.ineqValue
157✔
1597
                filter.eqVals = arg.srcFn.eqTokens
157✔
1598
                filter.match = ineqMatch
157✔
1599
                filtered = matchStrings(filtered, values, &filter)
157✔
1600
        }
1601

1602
        for i := 0; i < len(arg.out.UidMatrix); i++ {
1,017✔
1603
                algo.IntersectWith(arg.out.UidMatrix[i], filtered, arg.out.UidMatrix[i])
715✔
1604
        }
715✔
1605
        return nil
302✔
1606
}
1607

1608
func (qs *queryState) getValsForUID(attr, lang string, uid, ReadTs uint64) ([]types.Val, error) {
3,046✔
1609
        key := x.DataKey(attr, uid)
3,046✔
1610
        pl, err := qs.cache.Get(key)
3,046✔
1611
        if err != nil {
3,046✔
1612
                return nil, err
×
1613
        }
×
1614

1615
        var vals []types.Val
3,046✔
1616
        var val types.Val
3,046✔
1617
        if lang == "" {
5,624✔
1618
                if schema.State().IsList(attr) {
2,578✔
1619
                        // NOTE: we will never reach here if this function is called from handleHasFunction, as
×
1620
                        // @lang is not allowed for list predicates.
×
1621
                        vals, err = pl.AllValues(ReadTs)
×
1622
                } else {
2,578✔
1623
                        val, err = pl.Value(ReadTs)
2,578✔
1624
                        vals = append(vals, val)
2,578✔
1625
                }
2,578✔
1626
        } else {
468✔
1627
                val, err = pl.ValueForTag(ReadTs, lang)
468✔
1628
                vals = append(vals, val)
468✔
1629
        }
468✔
1630

1631
        return vals, err
3,046✔
1632
}
1633

1634
func matchRegex(value types.Val, regex *cregexp.Regexp) bool {
650✔
1635
        return len(value.Value.(string)) > 0 && regex.MatchString(value.Value.(string), true, true) > 0
650✔
1636
}
650✔
1637

1638
// functionContext is the parsed form of a query's source function. It is built by
// parseSrcFn and read by the handlers that look up and filter UIDs for that function.
type functionContext struct {
	// tokens are the index tokens to look up. They come, for example, from
	// getInequalityTokens (compare functions), getStringTokens (term/fulltext
	// functions) or types.GetGeoTokens (geo functions). For match() this holds the
	// query argument itself.
	tokens []string
	// geoQuery is the parsed geo query from types.GetGeoTokens. filterGeoFunction
	// checks stored values against it with types.MatchGeo.
	geoQuery *types.GeoQueryData
	// intersectDest is set from needsIntersect(fname) for string and match functions.
	// NOTE(review): presumably tells the caller to intersect the results with the
	// destination UIDs; confirm against needsIntersect.
	intersectDest bool
	// eqTokens is used by compareAttr functions. It stores values corresponding to each
	// function argument. There could be multiple arguments to `eq` function but only one for
	// other compareAttr functions.
	// TODO(@Animesh): change field names which could explain their uses better. Check if we
	// really need all of ineqValue, eqTokens, tokens
	eqTokens []types.Val
	// ineqValueToken holds the index token(s) for the inequality value(s), as returned by
	// getInequalityTokens. handleCompareFunction compares ineqValueToken[0] with
	// tokens[0] to decide whether the first row needs value-based filtering.
	ineqValueToken []string
	// n is the number of items the function works on: len(tokens) or the size of
	// q.UidList, depending on the function (see parseSrcFn and checkRoot).
	n int
	// threshold holds integer arguments. For compareScalar functions these are the
	// parsed thresholds; for match() it is the single maximum Levenshtein distance.
	threshold []int64
	// uidsPresent is neither set nor read in this part of the file.
	// NOTE(review): document its purpose.
	uidsPresent []uint64
	// fname is the function name returned by parseFuncType.
	fname string
	// fnType is the function category returned by parseFuncType.
	fnType FuncType
	// regex is the compiled pattern that matchRegex tests string values against.
	regex *cregexp.Regexp
	// isFuncAtRoot is set by checkRoot when the query carries no UidList.
	isFuncAtRoot bool
	// isStringFn is set by parseSrcFn when fnType is not notAFunction and the
	// attribute's schema type is string.
	isStringFn bool
	// atype is the attribute's type. handleMatchFunction requires it to be a scalar
	// string type.
	atype types.TypeID
}
1659

1660
// eq is the equality compare function. It is the only compare function that accepts
// more than one argument.
const eq = "eq"

// between is the range compare function. It takes exactly two arguments.
const between = "between"
1664

1665
func ensureArgsCount(srcFunc *pb.SrcFunction, expected int) error {
4,089✔
1666
        if len(srcFunc.Args) != expected {
4,092✔
1667
                return errors.Errorf("Function '%s' requires %d arguments, but got %d (%v)",
3✔
1668
                        srcFunc.Name, expected, len(srcFunc.Args), srcFunc.Args)
3✔
1669
        }
3✔
1670
        return nil
4,086✔
1671
}
1672

1673
func checkRoot(q *pb.Query, fc *functionContext) {
3,210✔
1674
        if q.UidList == nil {
6,334✔
1675
                // Fetch Uids from Store and populate in q.UidList.
3,124✔
1676
                fc.n = 0
3,124✔
1677
                fc.isFuncAtRoot = true
3,124✔
1678
        } else {
3,210✔
1679
                fc.n = len(q.UidList.Uids)
86✔
1680
        }
86✔
1681
}
1682

1683
// We allow atmost one lang in functions. We can inline in 1.9.
1684
func langForFunc(langs []string) string {
4,298✔
1685
        x.AssertTrue(len(langs) <= 1)
4,298✔
1686
        if len(langs) == 0 {
8,455✔
1687
                return ""
4,157✔
1688
        }
4,157✔
1689
        return langs[0]
141✔
1690
}
1691

1692
func parseSrcFn(ctx context.Context, q *pb.Query) (*functionContext, error) {
84,172✔
1693
        fnType, f := parseFuncType(q.SrcFunc)
84,172✔
1694
        attr := q.Attr
84,172✔
1695
        fc := &functionContext{fnType: fnType, fname: f}
84,172✔
1696
        isIndexedAttr := schema.State().IsIndexed(ctx, attr)
84,172✔
1697
        var err error
84,172✔
1698

84,172✔
1699
        t, err := schema.State().TypeOf(attr)
84,172✔
1700
        if err == nil && fnType != notAFunction && t.Name() == types.StringID.Name() {
101,150✔
1701
                fc.isStringFn = true
16,978✔
1702
        }
16,978✔
1703

1704
        switch fnType {
84,172✔
1705
        case notAFunction:
63,413✔
1706
                fc.n = len(q.UidList.Uids)
63,413✔
1707
        case aggregatorFn:
5✔
1708
                // confirm aggregator could apply on the attributes
5✔
1709
                typ, err := schema.State().TypeOf(attr)
5✔
1710
                if err != nil {
5✔
1711
                        return nil, errors.Errorf("Attribute %q is not scalar-type", x.ParseAttr(attr))
×
1712
                }
×
1713
                if !couldApplyAggregatorOn(f, typ) {
5✔
1714
                        return nil, errors.Errorf("Aggregator %q could not apply on %v",
×
1715
                                f, x.ParseAttr(attr))
×
1716
                }
×
1717
                fc.n = len(q.UidList.Uids)
5✔
1718
        case compareAttrFn:
16,613✔
1719
                args := q.SrcFunc.Args
16,613✔
1720
                if fc.fname == eq { // Only eq can have multiple args. It should have atleast one.
33,048✔
1721
                        if len(args) < 1 {
16,435✔
1722
                                return nil, errors.Errorf("eq expects atleast 1 argument.")
×
1723
                        }
×
1724
                } else if fc.fname == between { // between should have exactly 2 arguments.
210✔
1725
                        if len(args) != 2 {
32✔
1726
                                return nil, errors.Errorf("between expects exactly 2 argument.")
×
1727
                        }
×
1728
                } else { // Others can have only 1 arg.
146✔
1729
                        if len(args) != 1 {
147✔
1730
                                return nil, errors.Errorf("%+v expects only 1 argument. Got: %+v",
1✔
1731
                                        fc.fname, args)
1✔
1732
                        }
1✔
1733
                }
1734

1735
                var tokens []string
16,612✔
1736
                var ineqValues []types.Val
16,612✔
1737
                // eq can have multiple args.
16,612✔
1738
                for idx := 0; idx < len(args); idx++ {
33,378✔
1739
                        arg := args[idx]
16,766✔
1740
                        ineqValues = ineqValues[:0]
16,766✔
1741
                        ineqValue1, err := convertValue(attr, arg)
16,766✔
1742
                        if err != nil {
16,881✔
1743
                                return nil, errors.Errorf("Got error: %v while running: %v", err, q.SrcFunc)
115✔
1744
                        }
115✔
1745
                        ineqValues = append(ineqValues, ineqValue1)
16,651✔
1746
                        fc.eqTokens = append(fc.eqTokens, ineqValue1)
16,651✔
1747

16,651✔
1748
                        // in case of between also pass other value.
16,651✔
1749
                        if fc.fname == between {
16,683✔
1750
                                ineqValue2, err := convertValue(attr, args[idx+1])
32✔
1751
                                if err != nil {
32✔
1752
                                        return nil, errors.Errorf("Got error: %v while running: %v", err, q.SrcFunc)
×
1753
                                }
×
1754
                                idx++
32✔
1755
                                ineqValues = append(ineqValues, ineqValue2)
32✔
1756
                                fc.eqTokens = append(fc.eqTokens, ineqValue2)
32✔
1757
                        }
1758

1759
                        if !isIndexedAttr {
16,683✔
1760
                                // In case of non-indexed predicate we won't have any tokens.
32✔
1761
                                continue
32✔
1762
                        }
1763

1764
                        var lang string
16,619✔
1765
                        if len(q.Langs) > 0 {
16,668✔
1766
                                // Only one language is allowed.
49✔
1767
                                lang = q.Langs[0]
49✔
1768
                        }
49✔
1769

1770
                        // Get tokens ge/le ineqValueToken.
1771
                        if tokens, fc.ineqValueToken, err = getInequalityTokens(ctx, q.ReadTs, attr, f, lang,
16,619✔
1772
                                ineqValues); err != nil {
16,623✔
1773
                                return nil, err
4✔
1774
                        }
4✔
1775
                        if len(tokens) == 0 {
16,629✔
1776
                                continue
14✔
1777
                        }
1778
                        fc.tokens = append(fc.tokens, tokens...)
16,601✔
1779
                }
1780

1781
                // In case of non-indexed predicate, there won't be any tokens. We will fetch value
1782
                // from data keys.
1783
                // If number of index keys is more than no. of uids to filter, so its better to fetch values
1784
                // from data keys directly and compare. Lets make tokens empty.
1785
                // We don't do this for eq because eq could have multiple arguments and we would have to
1786
                // compare the value with all of them. Also eq would usually have less arguments, hence we
1787
                // won't be fetching many index keys.
1788
                switch {
16,493✔
1789
                case q.UidList != nil && !isIndexedAttr:
26✔
1790
                        fc.n = len(q.UidList.Uids)
26✔
1791
                case q.UidList != nil && len(fc.tokens) > len(q.UidList.Uids) && fc.fname != eq:
16✔
1792
                        fc.tokens = fc.tokens[:0]
16✔
1793
                        fc.n = len(q.UidList.Uids)
16✔
1794
                default:
16,451✔
1795
                        fc.n = len(fc.tokens)
16,451✔
1796
                }
1797
        case compareScalarFn:
45✔
1798
                argCount := 1
45✔
1799
                if q.SrcFunc.Name == between {
48✔
1800
                        argCount = 2
3✔
1801
                }
3✔
1802
                if err = ensureArgsCount(q.SrcFunc, argCount); err != nil {
47✔
1803
                        return nil, err
2✔
1804
                }
2✔
1805
                var thresholds []int64
43✔
1806
                for _, arg := range q.SrcFunc.Args {
89✔
1807
                        threshold, err := strconv.ParseInt(arg, 0, 64)
46✔
1808
                        if err != nil {
47✔
1809
                                return nil, errors.Wrapf(err, "Compare %v(%v) require digits, but got invalid num",
1✔
1810
                                        q.SrcFunc.Name, q.SrcFunc.Args[0])
1✔
1811
                        }
1✔
1812
                        thresholds = append(thresholds, threshold)
45✔
1813
                }
1814
                fc.threshold = thresholds
42✔
1815
                checkRoot(q, fc)
42✔
1816
        case geoFn:
23✔
1817
                // For geo functions, we get extra information used for filtering.
23✔
1818
                fc.tokens, fc.geoQuery, err = types.GetGeoTokens(q.SrcFunc)
23✔
1819
                tok.EncodeGeoTokens(fc.tokens)
23✔
1820
                if err != nil {
27✔
1821
                        return nil, err
4✔
1822
                }
4✔
1823
                fc.n = len(fc.tokens)
19✔
1824
        case passwordFn:
473✔
1825
                if err = ensureArgsCount(q.SrcFunc, 2); err != nil {
473✔
1826
                        return nil, err
×
1827
                }
×
1828
                fc.n = len(q.UidList.Uids)
473✔
1829
        case standardFn, fullTextSearchFn:
283✔
1830
                // srcfunc 0th val is func name and [2:] are args.
283✔
1831
                // we tokenize the arguments of the query.
283✔
1832
                if err = ensureArgsCount(q.SrcFunc, 1); err != nil {
283✔
1833
                        return nil, err
×
1834
                }
×
1835
                required, found := verifyStringIndex(ctx, attr, fnType)
283✔
1836
                if !found {
297✔
1837
                        return nil, errors.Errorf("Attribute %s is not indexed with type %s", x.ParseAttr(attr),
14✔
1838
                                required)
14✔
1839
                }
14✔
1840
                if fc.tokens, err = getStringTokens(q.SrcFunc.Args, langForFunc(q.Langs), fnType); err != nil {
269✔
1841
                        return nil, err
×
1842
                }
×
1843
                fc.intersectDest = needsIntersect(f)
269✔
1844
                fc.n = len(fc.tokens)
269✔
1845
        case matchFn:
19✔
1846
                if err = ensureArgsCount(q.SrcFunc, 2); err != nil {
19✔
1847
                        return nil, err
×
1848
                }
×
1849
                required, found := verifyStringIndex(ctx, attr, fnType)
19✔
1850
                if !found {
20✔
1851
                        return nil, errors.Errorf("Attribute %s is not indexed with type %s", x.ParseAttr(attr),
1✔
1852
                                required)
1✔
1853
                }
1✔
1854
                fc.intersectDest = needsIntersect(f)
18✔
1855
                // Max Levenshtein distance
18✔
1856
                var s string
18✔
1857
                s, q.SrcFunc.Args = q.SrcFunc.Args[1], q.SrcFunc.Args[:1]
18✔
1858
                max, err := strconv.ParseInt(s, 10, 32)
18✔
1859
                if err != nil {
18✔
1860
                        return nil, errors.Errorf("Levenshtein distance value must be an int, got %v", s)
×
1861
                }
×
1862
                if max < 0 {
18✔
1863
                        return nil, errors.Errorf("Levenshtein distance value must be greater than 0, got %v", s)
×
1864
                }
×
1865
                fc.threshold = []int64{max}
18✔
1866
                fc.tokens = q.SrcFunc.Args
18✔
1867
                fc.n = len(fc.tokens)
18✔
1868
        case customIndexFn:
18✔
1869
                if err = ensureArgsCount(q.SrcFunc, 2); err != nil {
18✔
1870
                        return nil, err
×
1871
                }
×
1872
                tokerName := q.SrcFunc.Args[0]
18✔
1873
                if !verifyCustomIndex(ctx, q.Attr, tokerName) {
18✔
1874
                        return nil, errors.Errorf("Attribute %s is not indexed with custom tokenizer %s",
×
1875
                                x.ParseAttr(q.Attr), tokerName)
×
1876
                }
×
1877
                valToTok, err := convertValue(q.Attr, q.SrcFunc.Args[1])
18✔
1878
                if err != nil {
18✔
1879
                        return nil, err
×
1880
                }
×
1881
                tokenizer, ok := tok.GetTokenizer(tokerName)
18✔
1882
                if !ok {
18✔
1883
                        return nil, errors.Errorf("Could not find tokenizer with name %q", tokerName)
×
1884
                }
×
1885
                fc.tokens, _ = tok.BuildTokens(valToTok.Value,
18✔
1886
                        tok.GetTokenizerForLang(tokenizer, langForFunc(q.Langs)))
18✔
1887
                fc.intersectDest = needsIntersect(f)
18✔
1888
                fc.n = len(fc.tokens)
18✔
1889
        case regexFn:
110✔
1890
                if err = ensureArgsCount(q.SrcFunc, 2); err != nil {
111✔
1891
                        return nil, err
1✔
1892
                }
1✔
1893
                ignoreCase := false
109✔
1894
                modifiers := q.SrcFunc.Args[1]
109✔
1895
                if len(modifiers) > 0 {
156✔
1896
                        if modifiers == "i" {
93✔
1897
                                ignoreCase = true
46✔
1898
                        } else {
47✔
1899
                                return nil, errors.Errorf("Invalid regexp modifier: %s", modifiers)
1✔
1900
                        }
1✔
1901
                }
1902
                matchType := "(?m)" // this is cregexp library specific
108✔
1903
                if ignoreCase {
154✔
1904
                        matchType = "(?i)" + matchType
46✔
1905
                }
46✔
1906
                if fc.regex, err = cregexp.Compile(matchType + q.SrcFunc.Args[0]); err != nil {
108✔
1907
                        return nil, err
×
1908
                }
×
1909
                fc.n = 0
108✔
1910
        case hasFn:
3,141✔
1911
                if err = ensureArgsCount(q.SrcFunc, 0); err != nil {
3,141✔
1912
                        return nil, err
×
1913
                }
×
1914
                checkRoot(q, fc)
3,141✔
1915
        case uidInFn:
29✔
1916
                for _, arg := range q.SrcFunc.Args {
50,993✔
1917
                        uidParsed, err := strconv.ParseUint(arg, 0, 64)
50,964✔
1918
                        if err != nil {
50,966✔
1919
                                if e, ok := err.(*strconv.NumError); ok && e.Err == strconv.ErrSyntax {
4✔
1920
                                        return nil, errors.Errorf("Value %q in %s is not a number",
2✔
1921
                                                arg, q.SrcFunc.Name)
2✔
1922
                                }
2✔
1923
                                return nil, err
×
1924
                        }
1925
                        fc.uidsPresent = append(fc.uidsPresent, uidParsed)
50,962✔
1926
                }
1927
                sort.Slice(fc.uidsPresent, func(i, j int) bool {
51,023✔
1928
                        return fc.uidsPresent[i] < fc.uidsPresent[j]
50,996✔
1929
                })
50,996✔
1930
                checkRoot(q, fc)
27✔
1931
                if fc.isFuncAtRoot {
29✔
1932
                        return nil, errors.Errorf("uid_in function not allowed at root")
2✔
1933
                }
2✔
1934
        default:
×
1935
                return nil, errors.Errorf("FnType %d not handled in numFnAttrs.", fnType)
×
1936
        }
1937
        return fc, nil
84,024✔
1938
}
1939

1940
// ServeTask is used to respond to a query.
1941
func (w *grpcWorker) ServeTask(ctx context.Context, q *pb.Query) (*pb.Result, error) {
18,914✔
1942
        ctx, span := otrace.StartSpan(ctx, "worker.ServeTask")
18,914✔
1943
        defer span.End()
18,914✔
1944

18,914✔
1945
        if ctx.Err() != nil {
18,916✔
1946
                return nil, ctx.Err()
2✔
1947
        }
2✔
1948

1949
        // It could be possible that the server isn't ready but a peer sends a
1950
        // request. In that case we should check for the health here.
1951
        if err := x.HealthCheck(); err != nil {
18,937✔
1952
                return nil, err
25✔
1953
        }
25✔
1954

1955
        gid, err := groups().BelongsToReadOnly(q.Attr, q.ReadTs)
18,887✔
1956
        switch {
18,887✔
1957
        case err != nil:
×
1958
                return nil, err
×
1959
        case gid == 0:
×
1960
                return nil, errNonExistentTablet
×
1961
        case gid != groups().groupId():
×
1962
                return nil, errUnservedTablet
×
1963
        }
1964

1965
        var numUids int
18,887✔
1966
        if q.UidList != nil {
34,951✔
1967
                numUids = len(q.UidList.Uids)
16,064✔
1968
        }
16,064✔
1969
        span.Annotatef(nil, "Attribute: %q NumUids: %v groupId: %v ServeTask", q.Attr, numUids, gid)
18,887✔
1970

18,887✔
1971
        if !groups().ServesGroup(gid) {
18,887✔
1972
                return nil, errors.Errorf(
×
1973
                        "Temporary error, attr: %q groupId: %v Request sent to wrong server",
×
1974
                        x.ParseAttr(q.Attr), gid)
×
1975
        }
×
1976

1977
        type reply struct {
18,887✔
1978
                result *pb.Result
18,887✔
1979
                err    error
18,887✔
1980
        }
18,887✔
1981
        c := make(chan reply, 1)
18,887✔
1982
        go func() {
37,774✔
1983
                result, err := processTask(ctx, q, gid)
18,887✔
1984
                c <- reply{result, err}
18,887✔
1985
        }()
18,887✔
1986

1987
        select {
18,887✔
1988
        case <-ctx.Done():
3✔
1989
                return nil, ctx.Err()
3✔
1990
        case reply := <-c:
18,884✔
1991
                return reply.result, reply.err
18,884✔
1992
        }
1993
}
1994

1995
// applyFacetsTree : we return error only when query has some problems.
1996
// like Or has 3 arguments, argument facet val overflows integer.
1997
// returns true if postingFacets can be included.
1998
func applyFacetsTree(postingFacets []*api.Facet, ftree *facetsTree) (bool, error) {
432,220✔
1999
        if ftree == nil {
863,546✔
2000
                return true, nil
431,326✔
2001
        }
431,326✔
2002
        if ftree.function != nil {
1,606✔
2003
                var fc *api.Facet
712✔
2004
                for _, fci := range postingFacets {
1,192✔
2005
                        if fci.Key == ftree.function.key {
707✔
2006
                                fc = fci
227✔
2007
                                break
227✔
2008
                        }
2009
                }
2010
                if fc == nil { // facet is not there
1,197✔
2011
                        return false, nil
485✔
2012
                }
485✔
2013

2014
                switch ftree.function.fnType {
227✔
2015
                case compareAttrFn: // lt, gt, le, ge, eq
206✔
2016
                        fVal, err := facets.ValFor(fc)
206✔
2017
                        if err != nil {
206✔
2018
                                return false, err
×
2019
                        }
×
2020

2021
                        v, ok := ftree.function.typesToVal[fVal.Tid]
206✔
2022
                        if !ok {
207✔
2023
                                // Not found in map and hence convert it here.
1✔
2024
                                v, err = types.Convert(ftree.function.val, fVal.Tid)
1✔
2025
                                if err != nil {
2✔
2026
                                        // ignore facet if not of appropriate type.
1✔
2027
                                        return false, nil
1✔
2028
                                }
1✔
2029
                        }
2030

2031
                        return types.CompareVals(ftree.function.name, fVal, v), nil
205✔
2032

2033
                case standardFn: // allofterms, anyofterms
21✔
2034
                        facetType, err := facets.TypeIDFor(fc)
21✔
2035
                        if err != nil {
21✔
2036
                                return false, err
×
2037
                        }
×
2038
                        if facetType != types.StringID {
21✔
2039
                                return false, nil
×
2040
                        }
×
2041
                        return filterOnStandardFn(ftree.function.name, fc.Tokens, ftree.function.tokens)
21✔
2042
                }
2043
                return false, errors.Errorf("Fn %s not supported in facets filtering.", ftree.function.name)
×
2044
        }
2045

2046
        res := make([]bool, 0, 2) // We can have max two children for a node.
182✔
2047
        for _, c := range ftree.children {
541✔
2048
                r, err := applyFacetsTree(postingFacets, c)
359✔
2049
                if err != nil {
359✔
2050
                        return false, err
×
2051
                }
×
2052
                res = append(res, r)
359✔
2053
        }
2054

2055
        // we have already checked for number of children in preprocessFilter
2056
        switch ftree.op {
182✔
2057
        case "not":
5✔
2058
                return !res[0], nil
5✔
2059
        case "and":
143✔
2060
                return res[0] && res[1], nil
143✔
2061
        case "or":
34✔
2062
                return res[0] || res[1], nil
34✔
2063
        }
2064
        return false, errors.Errorf("Unexpected behavior in applyFacetsTree.")
×
2065
}
2066

2067
// filterOnStandardFn : tells whether facet corresponding to fcTokens can be taken or not.
2068
// fcTokens and argTokens should be sorted.
2069
func filterOnStandardFn(fname string, fcTokens []string, argTokens []string) (bool, error) {
21✔
2070
        switch fname {
21✔
2071
        case "allofterms":
12✔
2072
                // allofterms argTokens should be in fcTokens
12✔
2073
                if len(argTokens) > len(fcTokens) {
13✔
2074
                        return false, nil
1✔
2075
                }
1✔
2076
                aidx := 0
11✔
2077
        loop:
11✔
2078
                for fidx := 0; aidx < len(argTokens) && fidx < len(fcTokens); {
46✔
2079
                        switch {
35✔
2080
                        case fcTokens[fidx] < argTokens[aidx]:
13✔
2081
                                fidx++
13✔
2082
                        case fcTokens[fidx] == argTokens[aidx]:
17✔
2083
                                fidx++
17✔
2084
                                aidx++
17✔
2085
                        default:
5✔
2086
                                // as all of argTokens should match
5✔
2087
                                // which is not possible now.
5✔
2088
                                break loop
5✔
2089
                        }
2090
                }
2091
                return aidx == len(argTokens), nil
11✔
2092
        case "anyofterms":
9✔
2093
                for aidx, fidx := 0, 0; aidx < len(argTokens) && fidx < len(fcTokens); {
31✔
2094
                        switch {
22✔
2095
                        case fcTokens[fidx] < argTokens[aidx]:
11✔
2096
                                fidx++
11✔
2097
                        case fcTokens[fidx] == argTokens[aidx]:
5✔
2098
                                return true, nil
5✔
2099
                        default:
6✔
2100
                                aidx++
6✔
2101
                        }
2102
                }
2103
                return false, nil
4✔
2104
        }
2105
        return false, errors.Errorf("Fn %s not supported in facets filtering.", fname)
×
2106
}
2107

2108
type facetsFunc struct {
2109
        name   string
2110
        key    string
2111
        args   []string
2112
        tokens []string
2113
        val    types.Val
2114
        fnType FuncType
2115
        // typesToVal stores converted vals of the function val for all common types. Converting
2116
        // function val to particular type val(check applyFacetsTree()) consumes significant amount of
2117
        // time. This maps helps in doing conversion only once(check preprocessFilter()).
2118
        typesToVal map[types.TypeID]types.Val
2119
}
2120
type facetsTree struct {
2121
        op       string
2122
        children []*facetsTree
2123
        function *facetsFunc
2124
}
2125

2126
// commonTypeIDs is list of type ids which are more common. In preprocessFilter() we keep converted
2127
// values for these typeIDs at every function node.
2128
var commonTypeIDs = [...]types.TypeID{types.StringID, types.IntID, types.FloatID,
2129
        types.DateTimeID, types.BoolID, types.DefaultID}
2130

2131
func preprocessFilter(tree *pb.FilterTree) (*facetsTree, error) {
84,006✔
2132
        if tree == nil {
167,933✔
2133
                return nil, nil
83,927✔
2134
        }
83,927✔
2135
        ftree := &facetsTree{}
79✔
2136
        ftree.op = strings.ToLower(tree.Op)
79✔
2137
        if tree.Func != nil {
142✔
2138
                ftree.function = &facetsFunc{}
63✔
2139
                ftree.function.key = tree.Func.Key
63✔
2140
                ftree.function.args = tree.Func.Args
63✔
2141

63✔
2142
                fnType, fname := parseFuncTypeHelper(tree.Func.Name)
63✔
2143
                if len(tree.Func.Args) != 1 {
63✔
2144
                        return nil, errors.Errorf("One argument expected in %s, but got %d.",
×
2145
                                fname, len(tree.Func.Args))
×
2146
                }
×
2147

2148
                ftree.function.name = fname
63✔
2149
                ftree.function.fnType = fnType
63✔
2150

63✔
2151
                switch fnType {
63✔
2152
                case compareAttrFn:
52✔
2153
                        ftree.function.val = types.Val{Tid: types.StringID, Value: []byte(tree.Func.Args[0])}
52✔
2154
                        ftree.function.typesToVal = make(map[types.TypeID]types.Val, len(commonTypeIDs))
52✔
2155
                        for _, typeID := range commonTypeIDs {
364✔
2156
                                // TODO: if conversion is not possible we are not putting anything to map. In
312✔
2157
                                // applyFacetsTree we check if entry for a type is not present, we try to convert
312✔
2158
                                // it. This double conversion can be avoided.
312✔
2159
                                cv, err := types.Convert(ftree.function.val, typeID)
312✔
2160
                                if err != nil {
476✔
2161
                                        continue
164✔
2162
                                }
2163
                                ftree.function.typesToVal[typeID] = cv
148✔
2164
                        }
2165
                case standardFn:
11✔
2166
                        argTokens, aerr := tok.GetTermTokens(tree.Func.Args)
11✔
2167
                        if aerr != nil { // query error ; stop processing.
11✔
2168
                                return nil, aerr
×
2169
                        }
×
2170
                        sort.Strings(argTokens)
11✔
2171
                        ftree.function.tokens = argTokens
11✔
2172
                default:
×
2173
                        return nil, errors.Errorf("Fn %s not supported in preprocessFilter.", fname)
×
2174
                }
2175
                return ftree, nil
63✔
2176
        }
2177

2178
        for _, c := range tree.Children {
47✔
2179
                ftreec, err := preprocessFilter(c)
31✔
2180
                if err != nil {
31✔
2181
                        return nil, err
×
2182
                }
×
2183
                ftree.children = append(ftree.children, ftreec)
31✔
2184
        }
2185

2186
        numChild := len(tree.Children)
16✔
2187
        switch ftree.op {
16✔
2188
        case "not":
1✔
2189
                if numChild != 1 {
1✔
2190
                        return nil, errors.Errorf("Expected 1 child for not but got %d.", numChild)
×
2191
                }
×
2192
        case "and":
7✔
2193
                if numChild != 2 {
7✔
2194
                        return nil, errors.Errorf("Expected 2 child for not but got %d.", numChild)
×
2195
                }
×
2196
        case "or":
8✔
2197
                if numChild != 2 {
8✔
2198
                        return nil, errors.Errorf("Expected 2 child for not but got %d.", numChild)
×
2199
                }
×
2200
        default:
×
2201
                return nil, errors.Errorf("Unsupported operation in facet filtering: %s.", tree.Op)
×
2202
        }
2203
        return ftree, nil
16✔
2204
}
2205

2206
type countParams struct {
2207
        readTs  uint64
2208
        counts  []int64
2209
        attr    string
2210
        gid     uint32
2211
        reverse bool   // If query is asking for ~pred
2212
        fn      string // function name
2213
}
2214

2215
func (qs *queryState) evaluate(cp countParams, out *pb.Result) error {
32✔
2216
        countl := cp.counts[0]
32✔
2217
        var counth int64
32✔
2218
        if cp.fn == between {
35✔
2219
                counth = cp.counts[1]
3✔
2220
        }
3✔
2221
        var illegal bool
32✔
2222
        switch cp.fn {
32✔
2223
        case "eq":
21✔
2224
                illegal = countl <= 0
21✔
2225
        case "lt":
×
2226
                illegal = countl <= 1
×
2227
        case "le":
1✔
2228
                illegal = countl <= 0
1✔
2229
        case "gt":
5✔
2230
                illegal = countl < 0
5✔
2231
        case "ge":
2✔
2232
                illegal = countl <= 0
2✔
2233
        case "between":
3✔
2234
                illegal = countl <= 0 || counth <= 0
3✔
2235
        default:
×
2236
                x.AssertTruef(false, "unhandled count comparison fn: %v", cp.fn)
×
2237
        }
2238
        if illegal {
33✔
2239
                return errors.Errorf("count(predicate) cannot be used to search for " +
1✔
2240
                        "negative counts (nonsensical) or zero counts (not tracked).")
1✔
2241
        }
1✔
2242

2243
        countKey := x.CountKey(cp.attr, uint32(countl), cp.reverse)
31✔
2244
        if cp.fn == "eq" {
51✔
2245
                pl, err := qs.cache.Get(countKey)
20✔
2246
                if err != nil {
20✔
2247
                        return err
×
2248
                }
×
2249
                uids, err := pl.Uids(posting.ListOptions{ReadTs: cp.readTs})
20✔
2250
                if err != nil {
20✔
2251
                        return err
×
2252
                }
×
2253
                out.UidMatrix = append(out.UidMatrix, uids)
20✔
2254
                return nil
20✔
2255
        }
2256

2257
        switch cp.fn {
11✔
2258
        case "lt":
×
2259
                countl--
×
2260
        case "gt":
5✔
2261
                countl++
5✔
2262
        }
2263

2264
        x.AssertTrue(countl >= 1)
11✔
2265
        countKey = x.CountKey(cp.attr, uint32(countl), cp.reverse)
11✔
2266

11✔
2267
        txn := pstore.NewTransactionAt(cp.readTs, false)
11✔
2268
        defer txn.Discard()
11✔
2269

11✔
2270
        pk := x.ParsedKey{Attr: cp.attr}
11✔
2271
        itOpt := badger.DefaultIteratorOptions
11✔
2272
        itOpt.PrefetchValues = false
11✔
2273
        itOpt.Reverse = cp.fn == "le" || cp.fn == "lt"
11✔
2274
        itOpt.Prefix = pk.CountPrefix(cp.reverse)
11✔
2275

11✔
2276
        itr := txn.NewIterator(itOpt)
11✔
2277
        defer itr.Close()
11✔
2278

11✔
2279
        for itr.Seek(countKey); itr.Valid(); itr.Next() {
39✔
2280
                item := itr.Item()
28✔
2281
                var key []byte
28✔
2282
                key = item.KeyCopy(key)
28✔
2283
                k, err := x.Parse(key)
28✔
2284
                if err != nil {
28✔
2285
                        return err
×
2286
                }
×
2287
                if cp.fn == between && int64(k.Count) > counth {
30✔
2288
                        break
2✔
2289
                }
2290

2291
                pl, err := qs.cache.Get(item.KeyCopy(key))
26✔
2292
                if err != nil {
26✔
2293
                        return err
×
2294
                }
×
2295
                uids, err := pl.Uids(posting.ListOptions{ReadTs: cp.readTs})
26✔
2296
                if err != nil {
26✔
2297
                        return err
×
2298
                }
×
2299
                out.UidMatrix = append(out.UidMatrix, uids)
26✔
2300
        }
2301
        return nil
11✔
2302
}
2303

2304
func (qs *queryState) handleHasFunction(ctx context.Context, q *pb.Query, out *pb.Result,
2305
        srcFn *functionContext) error {
3,089✔
2306
        span := otrace.FromContext(ctx)
3,089✔
2307
        stop := x.SpanTimer(span, "handleHasFunction")
3,089✔
2308
        defer stop()
3,089✔
2309
        if glog.V(3) {
3,100✔
2310
                glog.Infof("handleHasFunction query: %+v\n", q)
11✔
2311
        }
11✔
2312

2313
        txn := pstore.NewTransactionAt(q.ReadTs, false)
3,089✔
2314
        defer txn.Discard()
3,089✔
2315

3,089✔
2316
        initKey := x.ParsedKey{
3,089✔
2317
                Attr: q.Attr,
3,089✔
2318
        }
3,089✔
2319
        startKey := x.DataKey(q.Attr, q.AfterUid+1)
3,089✔
2320
        prefix := initKey.DataPrefix()
3,089✔
2321
        if q.Reverse {
3,093✔
2322
                // Reverse does not mean reverse iteration. It means we're looking for
4✔
2323
                // the reverse index.
4✔
2324
                startKey = x.ReverseKey(q.Attr, q.AfterUid+1)
4✔
2325
                prefix = initKey.ReversePrefix()
4✔
2326
        }
4✔
2327

2328
        result := &pb.List{}
3,089✔
2329
        var prevKey []byte
3,089✔
2330
        itOpt := badger.DefaultIteratorOptions
3,089✔
2331
        itOpt.PrefetchValues = false
3,089✔
2332
        itOpt.AllVersions = true
3,089✔
2333
        itOpt.Prefix = prefix
3,089✔
2334
        it := txn.NewIterator(itOpt)
3,089✔
2335
        defer it.Close()
3,089✔
2336

3,089✔
2337
        lang := langForFunc(q.Langs)
3,089✔
2338
        needFiltering := needsStringFiltering(srcFn, q.Langs, q.Attr)
3,089✔
2339

3,089✔
2340
        // This function checks if we should include uid in result or not when has is queried with
3,089✔
2341
        // @lang(eg: has(name@en)). We need to do this inside this function to return correct result
3,089✔
2342
        // for first.
3,089✔
2343
        checkInclusion := func(uid uint64) error {
8,755✔
2344
                if !needFiltering {
9,172✔
2345
                        return nil
3,506✔
2346
                }
3,506✔
2347

2348
                _, err := qs.getValsForUID(q.Attr, lang, uid, q.ReadTs)
2,160✔
2349
                return err
2,160✔
2350
        }
2351

2352
        cnt := int32(0)
3,089✔
2353
loop:
3,089✔
2354
        // This function could be switched to the stream.Lists framework, but after the change to use
3,089✔
2355
        // BitCompletePosting, the speed here is already pretty fast. The slowdown for @lang predicates
3,089✔
2356
        // occurs in filterStringFunction (like has(name) queries).
3,089✔
2357
        for it.Seek(startKey); it.Valid(); {
22,367✔
2358
                item := it.Item()
19,278✔
2359
                if bytes.Equal(item.Key(), prevKey) {
32,810✔
2360
                        it.Next()
13,532✔
2361
                        continue
13,532✔
2362
                }
2363
                prevKey = append(prevKey[:0], item.Key()...)
5,746✔
2364

5,746✔
2365
                // Parse the key upfront, otherwise ReadPostingList would advance the
5,746✔
2366
                // iterator.
5,746✔
2367
                pk, err := x.Parse(item.Key())
5,746✔
2368
                if err != nil {
5,746✔
2369
                        return err
×
2370
                }
×
2371

2372
                if pk.HasStartUid {
5,746✔
2373
                        // The keys holding parts of a split key should not be accessed here because
×
2374
                        // they have a different prefix. However, the check is being added to guard
×
2375
                        // against future bugs.
×
2376
                        continue
×
2377
                }
2378

2379
                // The following optimization speeds up this iteration considerably, because it avoids
2380
                // the need to run ReadPostingList.
2381
                if item.UserMeta()&posting.BitEmptyPosting > 0 {
5,813✔
2382
                        // This is an empty posting list. So, it should not be included.
67✔
2383
                        continue
67✔
2384
                }
2385
                if item.UserMeta()&posting.BitCompletePosting > 0 {
7,978✔
2386
                        // This bit would only be set if there are valid uids in UidPack.
2,299✔
2387
                        err := checkInclusion(pk.Uid)
2,299✔
2388
                        switch {
2,299✔
2389
                        case err == posting.ErrNoValue:
286✔
2390
                                continue
286✔
2391
                        case err != nil:
×
2392
                                return err
×
2393
                        }
2394
                        // skip entries upto Offset and do not store in the result.
2395
                        if cnt < q.Offset {
2,023✔
2396
                                cnt++
10✔
2397
                                continue
10✔
2398
                        }
2399
                        result.Uids = append(result.Uids, pk.Uid)
2,003✔
2400

2,003✔
2401
                        // We'll stop fetching if we fetch the required count.
2,003✔
2402
                        if len(result.Uids) >= int(q.First) {
2,014✔
2403
                                break
11✔
2404
                        }
2405
                        continue
1,992✔
2406
                }
2407

2408
                // We do need to copy over the key for ReadPostingList.
2409
                l, err := posting.ReadPostingList(item.KeyCopy(nil), it)
3,380✔
2410
                if err != nil {
3,380✔
2411
                        return err
×
2412
                }
×
2413
                empty, err := l.IsEmpty(q.ReadTs, 0)
3,380✔
2414
                switch {
3,380✔
2415
                case err != nil:
×
2416
                        return err
×
2417
                case !empty:
3,367✔
2418
                        err := checkInclusion(pk.Uid)
3,367✔
2419
                        switch {
3,367✔
2420
                        case err == posting.ErrNoValue:
132✔
2421
                                continue
132✔
2422
                        case err != nil:
×
2423
                                return err
×
2424
                        }
2425
                        // skip entries upto Offset and do not store in the result.
2426
                        if cnt < q.Offset {
3,235✔
2427
                                cnt++
×
2428
                                continue
×
2429
                        }
2430
                        result.Uids = append(result.Uids, pk.Uid)
3,235✔
2431

3,235✔
2432
                        // We'll stop fetching if we fetch the required count.
3,235✔
2433
                        if len(result.Uids) >= int(q.First) {
3,242✔
2434
                                break loop
7✔
2435
                        }
2436
                }
2437

2438
                if len(result.Uids)%100000 == 0 {
3,249✔
2439
                        select {
8✔
2440
                        case <-ctx.Done():
×
2441
                                return ctx.Err()
×
2442
                        default:
8✔
2443
                        }
2444
                }
2445
        }
2446
        if span != nil {
6,178✔
2447
                span.Annotatef(nil, "handleHasFunction found %d uids", len(result.Uids))
3,089✔
2448
        }
3,089✔
2449
        out.UidMatrix = append(out.UidMatrix, result)
3,089✔
2450
        return nil
3,089✔
2451
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc