• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 5262646808

14 Jun 2023 03:10AM UTC coverage: 67.209% (+0.004%) from 67.205%
5262646808

push

web-flow
Merge 3e9114f07 into 2787cfc58

58347 of 86814 relevant lines covered (67.21%)

2247229.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.46
/dgraph/cmd/bulk/loader.go
1
/*
2
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package bulk
18

19
import (
20
        "bytes"
21
        "compress/gzip"
22
        "context"
23
        "encoding/json"
24
        "fmt"
25
        "hash/adler32"
26
        "io"
27
        "log"
28
        "math"
29
        "os"
30
        "path/filepath"
31
        "strconv"
32
        "sync"
33
        "time"
34

35
        "github.com/golang/glog"
36
        "google.golang.org/grpc"
37
        "google.golang.org/grpc/credentials"
38
        "google.golang.org/grpc/credentials/insecure"
39

40
        "github.com/dgraph-io/badger/v4"
41
        "github.com/dgraph-io/badger/v4/y"
42
        "github.com/dgraph-io/dgraph/chunker"
43
        "github.com/dgraph-io/dgraph/ee/enc"
44
        "github.com/dgraph-io/dgraph/filestore"
45
        "github.com/dgraph-io/dgraph/protos/pb"
46
        "github.com/dgraph-io/dgraph/schema"
47
        "github.com/dgraph-io/dgraph/x"
48
        "github.com/dgraph-io/dgraph/xidmap"
49
)
50

51
// options holds all configuration for one bulk-loader run.
type options struct {
	// DataFiles is the input path; it is scanned for .rdf, .rdf.gz, .json
	// and .json.gz files during the map stage.
	DataFiles string
	// DataFormat optionally forces the input format ("rdf" or "json"); when
	// empty, the format is detected from the first data file (see mapStage).
	DataFormat string
	// SchemaFile is the DQL schema file loaded by readSchema.
	SchemaFile string
	// GqlSchemaFile, when non-empty, names a GraphQL schema file injected as
	// extra chunks at the end of the map phase (see processGqlSchema).
	GqlSchemaFile string
	OutDir        string
	ReplaceOutDir bool
	// TmpDir holds scratch data; the xidmap buffer dir lives under it.
	TmpDir string
	// NumGoroutines sets the number of mappers, the reader chunk channel
	// buffer, and the file-reading throttle (see newLoader and mapStage).
	NumGoroutines    int
	MapBufSize       uint64
	PartitionBufSize int64
	SkipMapPhase     bool
	CleanupTmp       bool
	NumReducers      int
	Version          bool
	StoreXids        bool
	// ZeroAddr is the address of the dgraph zero server dialed in newLoader.
	ZeroAddr         string
	HttpAddr         string
	IgnoreErrors     bool
	CustomTokenizers string
	NewUids          bool
	// ClientDir, when set, backs the xid->uid map with a badger DB opened
	// at that path (see mapStage).
	ClientDir string
	// Encrypted indicates the input files are encrypted with EncryptionKey;
	// when false, readers are given a nil key.
	Encrypted    bool
	EncryptedOut bool

	MapShards    int
	ReduceShards int

	// Namespace is the target namespace; math.MaxUint64 means "preserve the
	// namespaces found in the data" (see processGqlSchema).
	Namespace uint64

	shardOutputDirs []string

	// ........... Badger options ..........
	// EncryptionKey is the key used for encryption. Enterprise only feature.
	EncryptionKey x.Sensitive
	// Badger options.
	Badger badger.Options
}
89

90
// state is shared between the loader, its mappers, and the reducer.
type state struct {
	opt           *options
	prog          *progress
	xids          *xidmap.XidMap     // xid -> uid assignment; flushed and dropped after the map phase.
	schema        *schemaStore
	shards        *shardMap
	readerChunkCh chan *bytes.Buffer // Input chunks flowing from file readers to mappers.
	mapFileId     uint32 // Used atomically to name the output files of the mappers.
	dbs           []*badger.DB
	tmpDbs        []*badger.DB // Temporary DB to write the split lists to avoid ordering issues.
	writeTs       uint64       // All badger writes use this timestamp
	namespaces    *sync.Map    // To store the encountered namespaces.
}
103

104
// loader drives the whole bulk load: it owns the shared state, the mapper
// workers, and the gRPC connection to the zero server.
type loader struct {
	*state
	mappers []*mapper
	zero    *grpc.ClientConn
}
109

110
func newLoader(opt *options) *loader {
2✔
111
        if opt == nil {
2✔
112
                log.Fatalf("Cannot create loader with nil options.")
×
113
        }
×
114

115
        fmt.Printf("Connecting to zero at %s\n", opt.ZeroAddr)
2✔
116
        ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
2✔
117
        defer cancel()
2✔
118

2✔
119
        tlsConf, err := x.LoadClientTLSConfigForInternalPort(Bulk.Conf)
2✔
120
        x.Check(err)
2✔
121
        dialOpts := []grpc.DialOption{
2✔
122
                grpc.WithBlock(),
2✔
123
        }
2✔
124
        if tlsConf != nil {
2✔
125
                dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConf)))
×
126
        } else {
2✔
127
                dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
2✔
128
        }
2✔
129
        zero, err := grpc.DialContext(ctx, opt.ZeroAddr, dialOpts...)
2✔
130
        x.Checkf(err, "Unable to connect to zero, Is it running at %s?", opt.ZeroAddr)
2✔
131
        st := &state{
2✔
132
                opt:    opt,
2✔
133
                prog:   newProgress(),
2✔
134
                shards: newShardMap(opt.MapShards),
2✔
135
                // Lots of gz readers, so not much channel buffer needed.
2✔
136
                readerChunkCh: make(chan *bytes.Buffer, opt.NumGoroutines),
2✔
137
                writeTs:       getWriteTimestamp(zero),
2✔
138
                namespaces:    &sync.Map{},
2✔
139
        }
2✔
140
        st.schema = newSchemaStore(readSchema(opt), opt, st)
2✔
141
        ld := &loader{
2✔
142
                state:   st,
2✔
143
                mappers: make([]*mapper, opt.NumGoroutines),
2✔
144
                zero:    zero,
2✔
145
        }
2✔
146
        for i := 0; i < opt.NumGoroutines; i++ {
10✔
147
                ld.mappers[i] = newMapper(st)
8✔
148
        }
8✔
149
        go ld.prog.report()
2✔
150
        return ld
2✔
151
}
152

153
func getWriteTimestamp(zero *grpc.ClientConn) uint64 {
2✔
154
        client := pb.NewZeroClient(zero)
2✔
155
        for {
4✔
156
                ctx, cancel := context.WithTimeout(context.Background(), time.Second)
2✔
157
                ts, err := client.Timestamps(ctx, &pb.Num{Val: 1})
2✔
158
                cancel()
2✔
159
                if err == nil {
4✔
160
                        return ts.GetStartId()
2✔
161
                }
2✔
162
                fmt.Printf("Error communicating with dgraph zero, retrying: %v", err)
×
163
                time.Sleep(time.Second)
×
164
        }
165
}
166

167
// leaseNamespace is called at the end of map phase. It leases the namespace ids till the maximum
168
// seen namespace id.
169
func (ld *loader) leaseNamespaces() {
2✔
170
        var maxNs uint64
2✔
171
        ld.namespaces.Range(func(key, value interface{}) bool {
4✔
172
                if ns := key.(uint64); ns > maxNs {
2✔
173
                        maxNs = ns
×
174
                }
×
175
                return true
2✔
176
        })
177

178
        // If only the default namespace is seen, do nothing.
179
        if maxNs == 0 {
4✔
180
                return
2✔
181
        }
2✔
182

183
        client := pb.NewZeroClient(ld.zero)
×
184
        for {
×
185
                ctx, cancel := context.WithTimeout(context.Background(), time.Second)
×
186
                ns, err := client.AssignIds(ctx, &pb.Num{Val: maxNs, Type: pb.Num_NS_ID})
×
187
                cancel()
×
188
                if err == nil {
×
189
                        fmt.Printf("Assigned namespaces till %d", ns.GetEndId())
×
190
                        return
×
191
                }
×
192
                fmt.Printf("Error communicating with dgraph zero, retrying: %v", err)
×
193
                time.Sleep(time.Second)
×
194
        }
195
}
196

197
func readSchema(opt *options) *schema.ParsedSchema {
2✔
198
        f, err := filestore.Open(opt.SchemaFile)
2✔
199
        x.Check(err)
2✔
200
        defer func() {
4✔
201
                if err := f.Close(); err != nil {
2✔
202
                        glog.Warningf("error while closing fd: %v", err)
×
203
                }
×
204
        }()
205

206
        key := opt.EncryptionKey
2✔
207
        if !opt.Encrypted {
4✔
208
                key = nil
2✔
209
        }
2✔
210
        r, err := enc.GetReader(key, f)
2✔
211
        x.Check(err)
2✔
212
        if filepath.Ext(opt.SchemaFile) == ".gz" {
2✔
213
                r, err = gzip.NewReader(r)
×
214
                x.Check(err)
×
215
        }
×
216

217
        buf, err := io.ReadAll(r)
2✔
218
        x.Check(err)
2✔
219

2✔
220
        result, err := schema.ParseWithNamespace(string(buf), opt.Namespace)
2✔
221
        x.Check(err)
2✔
222
        return result
2✔
223
}
224

225
// mapStage runs the map phase: it sets up the xid->uid map, discovers the
// input files, starts the mapper goroutines, streams file chunks into
// readerChunkCh (bounded by a throttle of NumGoroutines concurrent readers),
// injects the GraphQL schema chunks, then waits for the mappers to drain the
// channel and releases map-phase memory.
func (ld *loader) mapStage() {
	ld.prog.setPhase(mapPhase)
	var db *badger.DB
	if len(ld.opt.ClientDir) > 0 {
		// Persist the xid->uid assignments in a badger DB under ClientDir.
		x.Check(os.MkdirAll(ld.opt.ClientDir, 0700))

		var err error
		db, err = badger.Open(badger.DefaultOptions(ld.opt.ClientDir))
		x.Checkf(err, "Error while creating badger KV posting store")
	}
	ld.xids = xidmap.New(xidmap.XidMapOptions{
		UidAssigner: ld.zero,
		DB:          db,
		Dir:         filepath.Join(ld.opt.TmpDir, bufferDir),
	})

	fs := filestore.NewFileStore(ld.opt.DataFiles)

	files := fs.FindDataFiles(ld.opt.DataFiles, []string{".rdf", ".rdf.gz", ".json", ".json.gz"})
	if len(files) == 0 {
		fmt.Printf("No data files found in %s.\n", ld.opt.DataFiles)
		os.Exit(1)
	}

	// Because mappers must handle chunks that may be from different input files, they must all
	// assume the same data format, either RDF or JSON. Use the one specified by the user or by
	// the first load file.
	loadType := chunker.DataFormat(files[0], ld.opt.DataFormat)
	if loadType == chunker.UnknownFormat {
		// Don't try to detect JSON input in bulk loader.
		fmt.Printf("Need --format=rdf or --format=json to load %s", files[0])
		os.Exit(1)
	}

	// Start the mapper workers; they consume from readerChunkCh until it is
	// closed below.
	var mapperWg sync.WaitGroup
	mapperWg.Add(len(ld.mappers))
	for _, m := range ld.mappers {
		go func(m *mapper) {
			m.run(loadType)
			mapperWg.Done()
		}(m)
	}

	// This is the main map loop.
	thr := y.NewThrottle(ld.opt.NumGoroutines)
	for i, file := range files {
		x.Check(thr.Do())
		fmt.Printf("Processing file (%d out of %d): %s\n", i+1, len(files), file)

		// One goroutine per file, bounded by the throttle; `file` is passed
		// as an argument to avoid loop-variable capture.
		go func(file string) {
			defer thr.Done(nil)

			key := ld.opt.EncryptionKey
			if !ld.opt.Encrypted {
				key = nil
			}
			r, cleanup := fs.ChunkReader(file, key)
			defer cleanup()

			chunk := chunker.NewChunker(loadType, 1000)
			for {
				chunkBuf, err := chunk.Chunk(r)
				if chunkBuf != nil && chunkBuf.Len() > 0 {
					ld.readerChunkCh <- chunkBuf
				}
				if err == io.EOF {
					break
				} else if err != nil {
					x.Check(err)
				}
			}
		}(file)
	}
	x.Check(thr.Finish())

	// Send the graphql triples. This must happen before readerChunkCh is
	// closed, since it sends on that channel.
	ld.processGqlSchema(loadType)

	close(ld.readerChunkCh)
	mapperWg.Wait()

	// Allow memory to GC before the reduce phase.
	for i := range ld.mappers {
		ld.mappers[i] = nil
	}
	x.Check(ld.xids.Flush())
	if db != nil {
		x.Check(db.Close())
	}
	ld.xids = nil
}
316

317
func parseGqlSchema(s string) map[uint64]string {
×
318
        var schemas []x.ExportedGQLSchema
×
319
        if err := json.Unmarshal([]byte(s), &schemas); err != nil {
×
320
                fmt.Println("Error while decoding the graphql schema. Assuming it to be in format < 21.03.")
×
321
                return map[uint64]string{x.GalaxyNamespace: s}
×
322
        }
×
323

324
        schemaMap := make(map[uint64]string)
×
325
        for _, schema := range schemas {
×
326
                if _, ok := schemaMap[schema.Namespace]; ok {
×
327
                        fmt.Printf("Found multiple GraphQL schema for namespace %d.", schema.Namespace)
×
328
                        continue
×
329
                }
330
                schemaMap[schema.Namespace] = schema.Schema
×
331
        }
332
        return schemaMap
×
333
}
334

335
// processGqlSchema reads the GraphQL schema file (if configured), renders it
// as RDF triples or a JSON object matching loadType, and feeds the result
// into readerChunkCh so the mappers ingest it like regular data. Must be
// called before readerChunkCh is closed.
func (ld *loader) processGqlSchema(loadType chunker.InputFormat) {
	if ld.opt.GqlSchemaFile == "" {
		return
	}

	f, err := filestore.Open(ld.opt.GqlSchemaFile)
	x.Check(err)
	defer func() {
		if err := f.Close(); err != nil {
			glog.Warningf("error while closing fd: %v", err)
		}
	}()

	// Same decrypt/gunzip handling as readSchema.
	key := ld.opt.EncryptionKey
	if !ld.opt.Encrypted {
		key = nil
	}
	r, err := enc.GetReader(key, f)
	x.Check(err)
	if filepath.Ext(ld.opt.GqlSchemaFile) == ".gz" {
		r, err = gzip.NewReader(r)
		x.Check(err)
	}

	buf, err := io.ReadAll(r)
	x.Check(err)

	// Template for RDF input: three triples tagged with the namespace.
	rdfSchema := `_:gqlschema <dgraph.type> "dgraph.graphql" <%#x> .
	_:gqlschema <dgraph.graphql.xid> "dgraph.graphql.schema" <%#x> .
	_:gqlschema <dgraph.graphql.schema> %s <%#x> .
	`

	// Template for JSON input: one object carrying the same fields.
	jsonSchema := `{
		"namespace": "%#x",
		"dgraph.type": "dgraph.graphql",
		"dgraph.graphql.xid": "dgraph.graphql.schema",
		"dgraph.graphql.schema": %s
	}`

	// process renders one namespace's schema and sends it to the mappers.
	process := func(ns uint64, schema string) {
		// Ignore the schema if the namespace is not already seen.
		if _, ok := ld.schema.namespaces.Load(ns); !ok {
			fmt.Printf("No data exist for namespace: %d. Cannot load the graphql schema.", ns)
			return
		}
		gqlBuf := &bytes.Buffer{}
		schema = strconv.Quote(schema)
		switch loadType {
		case chunker.RdfFormat:
			x.Check2(gqlBuf.Write([]byte(fmt.Sprintf(rdfSchema, ns, ns, schema, ns))))
		case chunker.JsonFormat:
			x.Check2(gqlBuf.Write([]byte(fmt.Sprintf(jsonSchema, ns, schema))))
		}
		ld.readerChunkCh <- gqlBuf
	}

	schemas := parseGqlSchema(string(buf))
	if ld.opt.Namespace == math.MaxUint64 {
		// Preserve the namespace.
		for ns, schema := range schemas {
			process(ns, schema)
		}
		return
	}

	switch len(schemas) {
	case 1:
		// User might have exported from a different namespace. So, schema.Namespace will not be
		// having the correct value.
		for _, schema := range schemas {
			process(ld.opt.Namespace, schema)
		}
	default:
		if _, ok := schemas[ld.opt.Namespace]; !ok {
			// We expect only a single GraphQL schema when loading into specific namespace.
			fmt.Printf("Didn't find GraphQL schema for namespace %d. Not loading GraphQL schema.",
				ld.opt.Namespace)
			return
		}
		process(ld.opt.Namespace, schemas[ld.opt.Namespace])
	}
}
417

418
func (ld *loader) reduceStage() {
2✔
419
        ld.prog.setPhase(reducePhase)
2✔
420

2✔
421
        r := reducer{
2✔
422
                state:     ld.state,
2✔
423
                streamIds: make(map[string]uint32),
2✔
424
        }
2✔
425
        x.Check(r.run())
2✔
426
}
2✔
427

428
func (ld *loader) writeSchema() {
2✔
429
        numDBs := uint32(len(ld.dbs))
2✔
430
        preds := make([][]string, numDBs)
2✔
431

2✔
432
        // Get all predicates that have data in some DB.
2✔
433
        m := make(map[string]struct{})
2✔
434
        for i, db := range ld.dbs {
4✔
435
                preds[i] = ld.schema.getPredicates(db)
2✔
436
                for _, p := range preds[i] {
70✔
437
                        m[p] = struct{}{}
68✔
438
                }
68✔
439
        }
440

441
        // Find any predicates that don't have data in any DB
442
        // and distribute them among all the DBs.
443
        for p := range ld.schema.schemaMap {
92✔
444
                if _, ok := m[p]; !ok {
112✔
445
                        i := adler32.Checksum([]byte(p)) % numDBs
22✔
446
                        preds[i] = append(preds[i], p)
22✔
447
                }
22✔
448
        }
449

450
        // Write out each DB's final predicate list.
451
        for i, db := range ld.dbs {
4✔
452
                ld.schema.write(db, preds[i])
2✔
453
        }
2✔
454
}
455

456
func (ld *loader) cleanup() {
2✔
457
        for _, db := range ld.dbs {
4✔
458
                x.Check(db.Close())
2✔
459
        }
2✔
460
        for _, db := range ld.tmpDbs {
4✔
461
                opts := db.Opts()
2✔
462
                x.Check(db.Close())
2✔
463
                x.Check(os.RemoveAll(opts.Dir))
2✔
464
        }
2✔
465
        ld.prog.endSummary()
2✔
466
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc