• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 5961268406

24 Aug 2023 08:21AM UTC coverage: 66.76% (-0.3%) from 67.079%
5961268406

push

web-flow
feat(debug): add parse_key to debug tool (#7640)

This adds a `dgraph debug --parse_key` flag that returns the ParsedKey
struct of a hex key. This is useful if there's a lone key that you want to
parse without needing a p directory around. This flag does not need a p
directory, just the hex key string.

Example:

$ dgraph debug --parse_key 000000000000000000000b6467726170682e74797065000000000000000001
     {d} Key: UID: 1, Attr: 0-dgraph.type, Data key

This tells you that the key
000000000000000000000b6467726170682e74797065000000000000000001
is for the predicate `0-dgraph.type` and the UID `1`.

Docs PR: https://github.com/dgraph-io/dgraph-docs/pull/636

Co-authored-by: Aman Mangal <aman@dgraph.io>

28 of 28 new or added lines in 1 file covered. (100.0%)

58199 of 87176 relevant lines covered (66.76%)

2220636.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

4.34
/dgraph/cmd/debug/run.go
1
/*
2
 * Copyright 2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package debug
18

19
import (
20
        "bufio"
21
        "bytes"
22
        "context"
23
        "encoding/hex"
24
        "fmt"
25
        "io"
26
        "log"
27
        "math"
28
        "net/http"
29
        _ "net/http/pprof" // http profiler
30
        "os"
31
        "sort"
32
        "strconv"
33
        "strings"
34
        "sync/atomic"
35

36
        "github.com/dustin/go-humanize"
37
        "github.com/spf13/cobra"
38

39
        "github.com/dgraph-io/badger/v4"
40
        bpb "github.com/dgraph-io/badger/v4/pb"
41
        "github.com/dgraph-io/dgraph/codec"
42
        "github.com/dgraph-io/dgraph/ee"
43
        "github.com/dgraph-io/dgraph/posting"
44
        "github.com/dgraph-io/dgraph/protos/pb"
45
        "github.com/dgraph-io/dgraph/raftwal"
46
        "github.com/dgraph-io/dgraph/types"
47
        "github.com/dgraph-io/dgraph/x"
48
        "github.com/dgraph-io/ristretto/z"
49
)
50

51
var (
52
        // Debug is the sub-command invoked when calling "dgraph debug"
53
        Debug x.SubCommand
54
        opt   flagOptions
55
)
56

57
type flagOptions struct {
58
        vals          bool
59
        keyLookup     string
60
        rollupKey     string
61
        keyHistory    bool
62
        predicate     string
63
        prefix        string
64
        readOnly      bool
65
        pdir          string
66
        itemMeta      bool
67
        jepsen        string
68
        readTs        uint64
69
        sizeHistogram bool
70
        noKeys        bool
71
        key           x.Sensitive
72
        onlySummary   bool
73
        parseKey      string
74

75
        // Options related to the WAL.
76
        wdir           string
77
        wtruncateUntil uint64
78
        wsetSnapshot   string
79
}
80

81
func init() {
159✔
82
        Debug.Cmd = &cobra.Command{
159✔
83
                Use:   "debug",
159✔
84
                Short: "Debug Dgraph instance",
159✔
85
                Run: func(cmd *cobra.Command, args []string) {
159✔
86
                        run()
×
87
                },
×
88
                Annotations: map[string]string{"group": "debug"},
89
        }
90
        Debug.Cmd.SetHelpTemplate(x.NonRootTemplate)
159✔
91

159✔
92
        flag := Debug.Cmd.Flags()
159✔
93
        flag.BoolVar(&opt.itemMeta, "item", true, "Output item meta as well. Set to false for diffs.")
159✔
94
        flag.BoolVar(&opt.vals, "vals", false, "Output values along with keys.")
159✔
95
        flag.BoolVar(&opt.noKeys, "nokeys", false,
159✔
96
                "Ignore key_. Only consider amount when calculating total.")
159✔
97
        flag.StringVar(&opt.jepsen, "jepsen", "", "Disect Jepsen output. Can be linear/binary.")
159✔
98
        flag.Uint64Var(&opt.readTs, "at", math.MaxUint64, "Set read timestamp for all txns.")
159✔
99
        flag.BoolVarP(&opt.readOnly, "readonly", "o", true, "Open in read only mode.")
159✔
100
        flag.StringVarP(&opt.predicate, "pred", "r", "", "Only output specified predicate.")
159✔
101
        flag.StringVarP(&opt.prefix, "prefix", "", "", "Uses a hex prefix.")
159✔
102
        flag.StringVarP(&opt.keyLookup, "lookup", "l", "", "Hex of key to lookup.")
159✔
103
        flag.StringVar(&opt.rollupKey, "rollup", "", "Hex of key to rollup.")
159✔
104
        flag.BoolVarP(&opt.keyHistory, "history", "y", false, "Show all versions of a key.")
159✔
105
        flag.StringVarP(&opt.pdir, "postings", "p", "", "Directory where posting lists are stored.")
159✔
106
        flag.BoolVar(&opt.sizeHistogram, "histogram", false,
159✔
107
                "Show a histogram of the key and value sizes.")
159✔
108
        flag.BoolVar(&opt.onlySummary, "only-summary", false,
159✔
109
                "If true, only show the summary of the p directory.")
159✔
110

159✔
111
        // Flags related to WAL.
159✔
112
        flag.StringVarP(&opt.wdir, "wal", "w", "", "Directory where Raft write-ahead logs are stored.")
159✔
113
        flag.Uint64VarP(&opt.wtruncateUntil, "truncate", "t", 0,
159✔
114
                "Remove data from Raft entries until but not including this index.")
159✔
115
        flag.StringVarP(&opt.wsetSnapshot, "snap", "s", "",
159✔
116
                "Set snapshot term,index,readts to this. Value must be comma-separated list containing"+
159✔
117
                        " the value for these vars in that order.")
159✔
118
        flag.StringVar(&opt.parseKey, "parse_key", "", "Parse hex key.")
159✔
119
        ee.RegisterEncFlag(flag)
159✔
120
}
121

122
func toInt(o *pb.Posting) int {
×
123
        from := types.Val{
×
124
                Tid:   types.TypeID(o.ValType),
×
125
                Value: o.Value,
×
126
        }
×
127
        out, err := types.Convert(from, types.StringID)
×
128
        x.Check(err)
×
129
        val := out.Value.(string)
×
130
        a, err := strconv.Atoi(val)
×
131
        if err != nil {
×
132
                return 0
×
133
        }
×
134
        return a
×
135
}
136

137
func uidToVal(itr *badger.Iterator, prefix string) map[uint64]int {
×
138
        keys := make(map[uint64]int)
×
139
        var lastKey []byte
×
140
        for itr.Rewind(); itr.Valid(); {
×
141
                item := itr.Item()
×
142
                if bytes.Equal(lastKey, item.Key()) {
×
143
                        itr.Next()
×
144
                        continue
×
145
                }
146
                lastKey = append(lastKey[:0], item.Key()...)
×
147
                pk, err := x.Parse(item.Key())
×
148
                x.Check(err)
×
149
                if !pk.IsData() || !strings.HasPrefix(x.ParseAttr(pk.Attr), prefix) {
×
150
                        continue
×
151
                }
152
                if pk.IsSchema() {
×
153
                        continue
×
154
                }
155
                if pk.StartUid > 0 {
×
156
                        // This key is part of a multi-part posting list. Skip it and only read
×
157
                        // the main key, which is the entry point to read the whole list.
×
158
                        continue
×
159
                }
160

161
                pl, err := posting.ReadPostingList(item.KeyCopy(nil), itr)
×
162
                if err != nil {
×
163
                        log.Fatalf("Unable to read posting list: %v", err)
×
164
                }
×
165
                err = pl.Iterate(math.MaxUint64, 0, func(o *pb.Posting) error {
×
166
                        from := types.Val{
×
167
                                Tid:   types.TypeID(o.ValType),
×
168
                                Value: o.Value,
×
169
                        }
×
170
                        out, err := types.Convert(from, types.StringID)
×
171
                        x.Check(err)
×
172
                        key := out.Value.(string)
×
173
                        k, err := strconv.Atoi(key)
×
174
                        x.Check(err)
×
175
                        keys[pk.Uid] = k
×
176
                        // fmt.Printf("Type: %v Uid=%d key=%s. commit=%d hex %x\n",
×
177
                        //         o.ValType, pk.Uid, key, o.CommitTs, lastKey)
×
178
                        return nil
×
179
                })
×
180
                x.Checkf(err, "during iterate")
×
181
        }
182
        return keys
×
183
}
184

185
func seekTotal(db *badger.DB, readTs uint64) int {
×
186
        txn := db.NewTransactionAt(readTs, false)
×
187
        defer txn.Discard()
×
188

×
189
        iopt := badger.DefaultIteratorOptions
×
190
        iopt.AllVersions = true
×
191
        iopt.PrefetchValues = false
×
192
        itr := txn.NewIterator(iopt)
×
193
        defer itr.Close()
×
194

×
195
        keys := uidToVal(itr, "key_")
×
196
        fmt.Printf("Got keys: %+v\n", keys)
×
197
        vals := uidToVal(itr, "amount_")
×
198
        var total int
×
199
        for _, val := range vals {
×
200
                total += val
×
201
        }
×
202
        fmt.Printf("Got vals: %+v. Total: %d\n", vals, total)
×
203
        if opt.noKeys {
×
204
                // Ignore the key_ predicate. Only consider the amount_ predicate. Useful when tablets are
×
205
                // being moved around.
×
206
                keys = vals
×
207
        }
×
208

209
        total = 0
×
210
        for uid, key := range keys {
×
211
                a := vals[uid]
×
212
                fmt.Printf("uid: %-5d %x key: %d amount: %d\n", uid, uid, key, a)
×
213
                total += a
×
214
        }
×
215
        fmt.Printf("Total @ %d = %d\n", readTs, total)
×
216
        return total
×
217
}
218

219
func findFirstValidTxn(db *badger.DB) uint64 {
×
220
        readTs := opt.readTs
×
221
        var wrong uint64
×
222
        for {
×
223
                min, max := getMinMax(db, readTs-1)
×
224
                if max <= min {
×
225
                        fmt.Printf("Can't find it. Max: %d\n", max)
×
226
                        return 0
×
227
                }
×
228
                readTs = max
×
229
                if total := seekTotal(db, readTs); total != 100 {
×
230
                        fmt.Printf("===> VIOLATION at ts: %d\n", readTs)
×
231
                        showAllPostingsAt(db, readTs)
×
232
                        wrong = readTs
×
233
                } else {
×
234
                        fmt.Printf("===> Found first correct version at %d\n", readTs)
×
235
                        showAllPostingsAt(db, readTs)
×
236
                        return wrong
×
237
                }
×
238
        }
239
}
240

241
func findFirstInvalidTxn(db *badger.DB, lowTs, highTs uint64) uint64 {
×
242
        fmt.Println()
×
243
        if highTs-lowTs < 1 {
×
244
                fmt.Printf("Checking at lowTs: %d\n", lowTs)
×
245
                if total := seekTotal(db, lowTs); total != 100 {
×
246
                        fmt.Printf("==> VIOLATION at ts: %d\n", lowTs)
×
247
                        return lowTs
×
248
                }
×
249
                fmt.Printf("No violation found at ts: %d\n", lowTs)
×
250
                return 0
×
251
        }
252

253
        midTs := (lowTs + highTs) / 2
×
254
        fmt.Printf("Checking. low=%d. high=%d. mid=%d\n", lowTs, highTs, midTs)
×
255
        if total := seekTotal(db, midTs); total == 100 {
×
256
                // If no failure, move to higher ts.
×
257
                return findFirstInvalidTxn(db, midTs+1, highTs)
×
258
        }
×
259
        // Found an error.
260
        return findFirstInvalidTxn(db, lowTs, midTs)
×
261
}
262

263
func showAllPostingsAt(db *badger.DB, readTs uint64) {
×
264
        txn := db.NewTransactionAt(readTs, false)
×
265
        defer txn.Discard()
×
266

×
267
        itr := txn.NewIterator(badger.DefaultIteratorOptions)
×
268
        defer itr.Close()
×
269

×
270
        type account struct {
×
271
                Key int
×
272
                Amt int
×
273
        }
×
274
        keys := make(map[uint64]*account)
×
275

×
276
        var buf bytes.Buffer
×
277
        fmt.Fprintf(&buf, "SHOWING all postings at %d\n", readTs)
×
278
        for itr.Rewind(); itr.Valid(); itr.Next() {
×
279
                item := itr.Item()
×
280
                if item.Version() != readTs {
×
281
                        continue
×
282
                }
283

284
                pk, err := x.Parse(item.Key())
×
285
                x.Check(err)
×
286
                if !pk.IsData() {
×
287
                        continue
×
288
                }
289

290
                var acc *account
×
291
                attr := x.ParseAttr(pk.Attr)
×
292
                if strings.HasPrefix(attr, "key_") || strings.HasPrefix(attr, "amount_") {
×
293
                        var has bool
×
294
                        acc, has = keys[pk.Uid]
×
295
                        if !has {
×
296
                                acc = &account{}
×
297
                                keys[pk.Uid] = acc
×
298
                        }
×
299
                }
300
                fmt.Fprintf(&buf, "  key: %+v hex: %x\n", pk, item.Key())
×
301
                val, err := item.ValueCopy(nil)
×
302
                x.Check(err)
×
303
                var plist pb.PostingList
×
304
                x.Check(plist.Unmarshal(val))
×
305

×
306
                x.AssertTrue(len(plist.Postings) <= 1)
×
307
                var num int
×
308
                for _, p := range plist.Postings {
×
309
                        num = toInt(p)
×
310
                        appendPosting(&buf, p)
×
311
                }
×
312
                if num > 0 && acc != nil {
×
313
                        switch {
×
314
                        case strings.HasPrefix(attr, "key_"):
×
315
                                acc.Key = num
×
316
                        case strings.HasPrefix(attr, "amount_"):
×
317
                                acc.Amt = num
×
318
                        }
319
                }
320
        }
321
        for uid, acc := range keys {
×
322
                fmt.Fprintf(&buf, "Uid: %d %x Key: %d Amount: %d\n", uid, uid, acc.Key, acc.Amt)
×
323
        }
×
324
        fmt.Println(buf.String())
×
325
}
326

327
func getMinMax(db *badger.DB, readTs uint64) (uint64, uint64) {
×
328
        var min, max uint64 = math.MaxUint64, 0
×
329
        txn := db.NewTransactionAt(readTs, false)
×
330
        defer txn.Discard()
×
331

×
332
        iopt := badger.DefaultIteratorOptions
×
333
        iopt.AllVersions = true
×
334
        itr := txn.NewIterator(iopt)
×
335
        defer itr.Close()
×
336

×
337
        for itr.Rewind(); itr.Valid(); itr.Next() {
×
338
                item := itr.Item()
×
339
                if min > item.Version() {
×
340
                        min = item.Version()
×
341
                }
×
342
                if max < item.Version() {
×
343
                        max = item.Version()
×
344
                }
×
345
        }
346
        return min, max
×
347
}
348

349
func jepsen(db *badger.DB) {
×
350
        min, max := getMinMax(db, opt.readTs)
×
351
        fmt.Printf("min=%d. max=%d\n", min, max)
×
352

×
353
        var ts uint64
×
354
        switch opt.jepsen {
×
355
        case "binary":
×
356
                ts = findFirstInvalidTxn(db, min, max)
×
357
        case "linear":
×
358
                ts = findFirstValidTxn(db)
×
359
        }
360
        fmt.Println()
×
361
        if ts == 0 {
×
362
                fmt.Println("Nothing found. Exiting.")
×
363
                return
×
364
        }
×
365
        showAllPostingsAt(db, ts)
×
366
        seekTotal(db, ts-1)
×
367

×
368
        for i := 0; i < 5; i++ {
×
369
                // Get a few previous commits.
×
370
                _, ts = getMinMax(db, ts-1)
×
371
                showAllPostingsAt(db, ts)
×
372
                seekTotal(db, ts-1)
×
373
        }
×
374
}
375

376
func history(lookup []byte, itr *badger.Iterator) {
×
377
        var buf bytes.Buffer
×
378
        pk, err := x.Parse(lookup)
×
379
        x.Check(err)
×
380
        fmt.Fprintf(&buf, "==> key: %x. PK: %+v\n", lookup, pk)
×
381
        for ; itr.Valid(); itr.Next() {
×
382
                item := itr.Item()
×
383
                if !bytes.Equal(item.Key(), lookup) {
×
384
                        break
×
385
                }
386

387
                fmt.Fprintf(&buf, "ts: %d", item.Version())
×
388
                x.Check2(buf.WriteString(" {item}"))
×
389
                if item.IsDeletedOrExpired() {
×
390
                        x.Check2(buf.WriteString("{deleted}"))
×
391
                }
×
392
                if item.DiscardEarlierVersions() {
×
393
                        x.Check2(buf.WriteString("{discard}"))
×
394
                }
×
395
                val, err := item.ValueCopy(nil)
×
396
                x.Check(err)
×
397

×
398
                meta := item.UserMeta()
×
399
                if meta&posting.BitCompletePosting > 0 {
×
400
                        x.Check2(buf.WriteString("{complete}"))
×
401
                }
×
402
                if meta&posting.BitDeltaPosting > 0 {
×
403
                        x.Check2(buf.WriteString("{delta}"))
×
404
                }
×
405
                if meta&posting.BitEmptyPosting > 0 {
×
406
                        x.Check2(buf.WriteString("{empty}"))
×
407
                }
×
408
                fmt.Fprintln(&buf)
×
409
                if meta&posting.BitDeltaPosting > 0 {
×
410
                        plist := &pb.PostingList{}
×
411
                        x.Check(plist.Unmarshal(val))
×
412
                        for _, p := range plist.Postings {
×
413
                                appendPosting(&buf, p)
×
414
                        }
×
415
                }
416
                if meta&posting.BitCompletePosting > 0 {
×
417
                        var plist pb.PostingList
×
418
                        x.Check(plist.Unmarshal(val))
×
419

×
420
                        for _, p := range plist.Postings {
×
421
                                appendPosting(&buf, p)
×
422
                        }
×
423

424
                        fmt.Fprintf(&buf, " Num uids = %d. Size = %d\n",
×
425
                                codec.ExactLen(plist.Pack), plist.Pack.Size())
×
426
                        dec := codec.Decoder{Pack: plist.Pack}
×
427
                        for uids := dec.Seek(0, codec.SeekStart); len(uids) > 0; uids = dec.Next() {
×
428
                                for _, uid := range uids {
×
429
                                        fmt.Fprintf(&buf, " Uid = %d\n", uid)
×
430
                                }
×
431
                        }
432
                }
433
                x.Check2(buf.WriteString("\n"))
×
434
        }
435
        fmt.Println(buf.String())
×
436
}
437

438
func appendPosting(w io.Writer, o *pb.Posting) {
×
439
        fmt.Fprintf(w, " Uid: %d Op: %d ", o.Uid, o.Op)
×
440

×
441
        if len(o.Value) > 0 {
×
442
                fmt.Fprintf(w, " Type: %v. ", o.ValType)
×
443
                from := types.Val{
×
444
                        Tid:   types.TypeID(o.ValType),
×
445
                        Value: o.Value,
×
446
                }
×
447
                out, err := types.Convert(from, types.StringID)
×
448
                if err != nil {
×
449
                        fmt.Fprintf(w, " Value: %q Error: %v", o.Value, err)
×
450
                } else {
×
451
                        fmt.Fprintf(w, " String Value: %q", out.Value)
×
452
                }
×
453
        }
454
        fmt.Fprintln(w, "")
×
455
}
456
func rollupKey(db *badger.DB) {
×
457
        txn := db.NewTransactionAt(opt.readTs, false)
×
458
        defer txn.Discard()
×
459

×
460
        key, err := hex.DecodeString(opt.rollupKey)
×
461
        x.Check(err)
×
462

×
463
        iopts := badger.DefaultIteratorOptions
×
464
        iopts.AllVersions = true
×
465
        iopts.PrefetchValues = false
×
466
        itr := txn.NewKeyIterator(key, iopts)
×
467
        defer itr.Close()
×
468

×
469
        itr.Rewind()
×
470
        if !itr.Valid() {
×
471
                log.Fatalf("Unable to seek to key: %s", hex.Dump(key))
×
472
        }
×
473

474
        item := itr.Item()
×
475
        // Don't need to do anything if the bitdelta is not set.
×
476
        if item.UserMeta()&posting.BitDeltaPosting == 0 {
×
477
                fmt.Printf("First item has UserMeta:[b%04b]. Nothing to do\n", item.UserMeta())
×
478
                return
×
479
        }
×
480
        pl, err := posting.ReadPostingList(item.KeyCopy(nil), itr)
×
481
        x.Check(err)
×
482

×
483
        alloc := z.NewAllocator(32<<20, "Debug.RollupKey")
×
484
        defer alloc.Release()
×
485

×
486
        // Setting kvs at their original value as we can't give a new timestamp in debug mode.
×
487
        kvs, err := pl.Rollup(alloc, math.MaxUint64)
×
488
        x.Check(err)
×
489

×
490
        wb := db.NewManagedWriteBatch()
×
491
        x.Check(wb.WriteList(&bpb.KVList{Kv: kvs}))
×
492
        x.Check(wb.Flush())
×
493
}
494

495
func lookup(db *badger.DB) {
×
496
        txn := db.NewTransactionAt(opt.readTs, false)
×
497
        defer txn.Discard()
×
498

×
499
        iopts := badger.DefaultIteratorOptions
×
500
        iopts.AllVersions = true
×
501
        iopts.PrefetchValues = false
×
502
        itr := txn.NewIterator(iopts)
×
503
        defer itr.Close()
×
504

×
505
        key, err := hex.DecodeString(opt.keyLookup)
×
506
        if err != nil {
×
507
                log.Fatal(err)
×
508
        }
×
509
        itr.Seek(key)
×
510
        if !itr.Valid() {
×
511
                log.Fatalf("Unable to seek to key: %s", hex.Dump(key))
×
512
        }
×
513

514
        if opt.keyHistory {
×
515
                history(key, itr)
×
516
                return
×
517
        }
×
518

519
        var buf bytes.Buffer
×
520
        item := itr.Item()
×
521
        if item.UserMeta()&posting.BitSchemaPosting > 0 {
×
522
                // Schema is stored as pb.SchemaUpdate, we should not try to read it as a posting list
×
523
                fmt.Fprintf(&buf, "Key: %x\n", item.Key())
×
524
                schemaBytes, err := item.ValueCopy(nil)
×
525
                x.Check(err)
×
526

×
527
                var s pb.SchemaUpdate
×
528
                x.Check(s.Unmarshal(schemaBytes))
×
529
                fmt.Fprintf(&buf, "Value: %+v\n", s)
×
530
        } else {
×
531
                fmt.Fprintf(&buf, "Key: %x", item.Key())
×
532
                pl, err := posting.ReadPostingList(item.KeyCopy(nil), itr)
×
533
                if err != nil {
×
534
                        log.Fatal(err)
×
535
                }
×
536
                fmt.Fprintf(&buf, " Length: %d", pl.Length(math.MaxUint64, 0))
×
537

×
538
                splits := pl.PartSplits()
×
539
                isMultiPart := len(splits) > 0
×
540
                fmt.Fprintf(&buf, " Is multi-part list? %v", isMultiPart)
×
541
                if isMultiPart {
×
542
                        fmt.Fprintf(&buf, " Start UID of parts: %v\n", splits)
×
543
                }
×
544

545
                err = pl.Iterate(math.MaxUint64, 0, func(o *pb.Posting) error {
×
546
                        appendPosting(&buf, o)
×
547
                        return nil
×
548
                })
×
549
                if err != nil {
×
550
                        log.Fatal(err)
×
551
                }
×
552
        }
553
        fmt.Println(buf.String())
×
554
}
555

556
// Current format is like:
557
// {i} attr: name term: [8] woods  ts: 535 item: [28, b0100] sz: 81 dcnt: 3 key: 00000...6f6f6473
558
// Fix the TestBulkLoadMultiShard accordingly, if the format changes.
559
func printKeys(db *badger.DB) {
×
560
        var prefix []byte
×
561
        if len(opt.predicate) > 0 {
×
562
                prefix = x.PredicatePrefix(opt.predicate)
×
563
        } else if len(opt.prefix) > 0 {
×
564
                p, err := hex.DecodeString(opt.prefix)
×
565
                x.Check(err)
×
566
                prefix = p
×
567
        }
×
568
        fmt.Printf("prefix = %s\n", hex.Dump(prefix))
×
569
        stream := db.NewStreamAt(opt.readTs)
×
570
        stream.Prefix = prefix
×
571
        var total uint64
×
572
        stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
×
573
                item := itr.Item()
×
574
                pk, err := x.Parse(key)
×
575
                x.Check(err)
×
576

×
577
                var buf bytes.Buffer
×
578
                // Don't use a switch case here. Because multiple of these can be true. In particular,
×
579
                // IsSchema can be true alongside IsData.
×
580
                if pk.IsData() {
×
581
                        x.Check2(buf.WriteString("{d}"))
×
582
                }
×
583
                if pk.IsIndex() {
×
584
                        x.Check2(buf.WriteString("{i}"))
×
585
                }
×
586
                if pk.IsCountOrCountRev() {
×
587
                        x.Check2(buf.WriteString("{c}"))
×
588
                }
×
589
                if pk.IsSchema() {
×
590
                        x.Check2(buf.WriteString("{s}"))
×
591
                }
×
592
                if pk.IsReverse() {
×
593
                        x.Check2(buf.WriteString("{r}"))
×
594
                }
×
595
                ns, attr := x.ParseNamespaceAttr(pk.Attr)
×
596
                x.Check2(buf.WriteString(fmt.Sprintf(" ns: %#x ", ns)))
×
597
                x.Check2(buf.WriteString(" attr: " + attr))
×
598
                if len(pk.Term) > 0 {
×
599
                        fmt.Fprintf(&buf, " term: [%d] [%v] ", pk.Term[0], pk.Term[1:])
×
600
                }
×
601
                if pk.Uid > 0 {
×
602
                        fmt.Fprintf(&buf, " uid: %d ", pk.Uid)
×
603
                }
×
604
                if pk.StartUid > 0 {
×
605
                        fmt.Fprintf(&buf, " startUid: %d ", pk.StartUid)
×
606
                }
×
607

608
                if opt.itemMeta {
×
609
                        fmt.Fprintf(&buf, " ts: %d", item.Version())
×
610
                        fmt.Fprintf(&buf, " item: [%d, b%04b]", item.EstimatedSize(), item.UserMeta())
×
611
                }
×
612

613
                var sz, deltaCount int64
×
614
        LOOP:
×
615
                for ; itr.ValidForPrefix(prefix); itr.Next() {
×
616
                        item := itr.Item()
×
617
                        if !bytes.Equal(item.Key(), key) {
×
618
                                break
×
619
                        }
620
                        if item.IsDeletedOrExpired() {
×
621
                                x.Check2(buf.WriteString(" {v.del}"))
×
622
                                break
×
623
                        }
624
                        switch item.UserMeta() {
×
625
                        // This is rather a default case as one of the 4 bit must be set.
626
                        case posting.BitCompletePosting, posting.BitEmptyPosting, posting.BitSchemaPosting:
×
627
                                sz += item.EstimatedSize()
×
628
                                break LOOP
×
629
                        case posting.BitDeltaPosting:
×
630
                                sz += item.EstimatedSize()
×
631
                                deltaCount++
×
632
                        default:
×
633
                                fmt.Printf("No user meta found for key: %s\n", hex.EncodeToString(key))
×
634
                        }
635
                        if item.DiscardEarlierVersions() {
×
636
                                x.Check2(buf.WriteString(" {v.las}"))
×
637
                                break
×
638
                        }
639
                }
640
                var invalidSz, invalidCount uint64
×
641
                // skip all the versions of key
×
642
                for ; itr.ValidForPrefix(prefix); itr.Next() {
×
643
                        item := itr.Item()
×
644
                        if !bytes.Equal(item.Key(), key) {
×
645
                                break
×
646
                        }
647
                        invalidSz += uint64(item.EstimatedSize())
×
648
                        invalidCount++
×
649
                }
650

651
                fmt.Fprintf(&buf, " sz: %d dcnt: %d", sz, deltaCount)
×
652
                if invalidCount > 0 {
×
653
                        fmt.Fprintf(&buf, " isz: %d icount: %d", invalidSz, invalidCount)
×
654
                }
×
655
                fmt.Fprintf(&buf, " key: %s", hex.EncodeToString(key))
×
656
                // If total size is more than 1 GB or the key has more than 10e6 (10 million) versions, flag this key.
×
657
                if uint64(sz)+invalidSz > (1<<30) || uint64(deltaCount)+invalidCount > 10e6 {
×
658
                        fmt.Fprintf(&buf, " [HEAVY]")
×
659
                }
×
660
                buf.WriteRune('\n')
×
661
                list := &bpb.KVList{}
×
662
                list.Kv = append(list.Kv, &bpb.KV{
×
663
                        Value: buf.Bytes(),
×
664
                })
×
665
                // Don't call fmt.Println here. It is much slower.
×
666
                return list, nil
×
667
        }
668

669
        w := bufio.NewWriterSize(os.Stdout, 16<<20)
×
670
        stream.Send = func(buf *z.Buffer) error {
×
671
                var count int
×
672
                err := buf.SliceIterate(func(s []byte) error {
×
673
                        var kv bpb.KV
×
674
                        if err := kv.Unmarshal(s); err != nil {
×
675
                                return err
×
676
                        }
×
677
                        x.Check2(w.Write(kv.Value))
×
678
                        count++
×
679
                        return nil
×
680
                })
681
                atomic.AddUint64(&total, uint64(count))
×
682
                return err
×
683
        }
684
        x.Check(stream.Orchestrate(context.Background()))
×
685
        x.Check(w.Flush())
×
686
        fmt.Println()
×
687
        fmt.Printf("Found %d keys\n", atomic.LoadUint64(&total))
×
688
}
689

690
// getHistogramBounds creates the bounds for a histogram. The bounds are powers
// of two of the form [2^minExponent, ..., 2^maxExponent], one entry per
// exponent in ascending order. It returns nil when minExponent > maxExponent.
func getHistogramBounds(minExponent, maxExponent uint32) []float64 {
	if minExponent > maxExponent {
		return nil
	}

	bounds := make([]float64, 0, maxExponent-minExponent+1)
	for i := minExponent; i <= maxExponent; i++ {
		// Compute 2^i directly in floating point. Shifting an int overflows
		// into a negative value at exponent 63 and into 0 beyond that.
		bounds = append(bounds, math.Ldexp(1, int(i)))
	}
	return bounds
}
699

700
// HistogramData accumulates the statistics needed to render a histogram of
// observed sizes: per-bucket counts plus the running count, sum, minimum and
// maximum of the values passed to Update.
type HistogramData struct {
	// Bounds holds the exclusive upper bound of each bucket, in order.
	Bounds []float64
	// Count is the number of values recorded.
	Count int64
	// CountPerBucket has len(Bounds)+1 entries when built by
	// NewHistogramData; the extra last entry collects values that are not
	// below any bound.
	CountPerBucket []int64
	// Min and Max are the smallest and largest values recorded.
	Min int64
	Max int64
	// Sum is the total of all values recorded.
	Sum int64
}

// NewHistogramData returns a HistogramData for the given bounds, with one
// counter per bound plus one overflow counter. Min starts at math.MaxInt64 and
// Max at 0.
func NewHistogramData(bounds []float64) *HistogramData {
	data := new(HistogramData)
	data.Bounds = bounds
	data.CountPerBucket = make([]int64, len(bounds)+1)
	data.Min = math.MaxInt64
	return data
}

// Update records value: it adjusts Min and Max when value falls outside them,
// adds value to Sum, increments Count, and increments the counter of the first
// bucket whose bound (truncated to int64) is greater than value, or the last
// counter when no bound is.
func (histogram *HistogramData) Update(value int64) {
	if histogram.Max < value {
		histogram.Max = value
	}
	if histogram.Min > value {
		histogram.Min = value
	}

	histogram.Count++
	histogram.Sum += value

	// Default to the overflow bucket, then look for the first bound above value.
	bucket := len(histogram.Bounds)
	for i, bound := range histogram.Bounds {
		if value < int64(bound) {
			bucket = i
			break
		}
	}
	histogram.CountPerBucket[bucket]++
}
746

747
// PrintHistogram prints the histogram data in a human-readable format.
748
func (histogram *HistogramData) PrintHistogram() {
×
749
        if histogram == nil {
×
750
                return
×
751
        }
×
752

753
        fmt.Printf("Min value: %d\n", histogram.Min)
×
754
        fmt.Printf("Max value: %d\n", histogram.Max)
×
755
        fmt.Printf("Mean: %.2f\n", float64(histogram.Sum)/float64(histogram.Count))
×
756
        fmt.Printf("%24s %9s\n", "Range", "Count")
×
757

×
758
        numBounds := len(histogram.Bounds)
×
759
        for index, count := range histogram.CountPerBucket {
×
760
                if count == 0 {
×
761
                        continue
×
762
                }
763

764
                // The last bucket represents the bucket that contains the range from
765
                // the last bound up to infinity so it's processed differently than the
766
                // other buckets.
767
                if index == len(histogram.CountPerBucket)-1 {
×
768
                        lowerBound := int(histogram.Bounds[numBounds-1])
×
769
                        fmt.Printf("[%10d, %10s) %9d\n", lowerBound, "infinity", count)
×
770
                        continue
×
771
                }
772

773
                upperBound := int(histogram.Bounds[index])
×
774
                lowerBound := 0
×
775
                if index > 0 {
×
776
                        lowerBound = int(histogram.Bounds[index-1])
×
777
                }
×
778

779
                fmt.Printf("[%10d, %10d) %9d\n", lowerBound, upperBound, count)
×
780
        }
781
}
782

783
func sizeHistogram(db *badger.DB) {
×
784
        txn := db.NewTransactionAt(opt.readTs, false)
×
785
        defer txn.Discard()
×
786

×
787
        iopts := badger.DefaultIteratorOptions
×
788
        iopts.PrefetchValues = false
×
789
        itr := txn.NewIterator(iopts)
×
790
        defer itr.Close()
×
791

×
792
        // Generate distribution bounds. Key sizes are not greater than 2^16 while
×
793
        // value sizes are not greater than 1GB (2^30).
×
794
        keyBounds := getHistogramBounds(5, 16)
×
795
        valueBounds := getHistogramBounds(5, 30)
×
796

×
797
        // Initialize exporter.
×
798
        keySizeHistogram := NewHistogramData(keyBounds)
×
799
        valueSizeHistogram := NewHistogramData(valueBounds)
×
800

×
801
        // Collect key and value sizes.
×
802
        var prefix []byte
×
803
        if len(opt.predicate) > 0 {
×
804
                prefix = x.PredicatePrefix(opt.predicate)
×
805
        }
×
806
        var loop int
×
807
        for itr.Seek(prefix); itr.ValidForPrefix(prefix); itr.Next() {
×
808
                item := itr.Item()
×
809

×
810
                keySizeHistogram.Update(int64(len(item.Key())))
×
811
                valueSizeHistogram.Update(item.ValueSize())
×
812

×
813
                loop++
×
814
        }
×
815

816
        fmt.Printf("prefix = %s\n", hex.Dump(prefix))
×
817
        fmt.Printf("Found %d keys\n", loop)
×
818
        fmt.Printf("\nHistogram of key sizes (in bytes)\n")
×
819
        keySizeHistogram.PrintHistogram()
×
820
        fmt.Printf("\nHistogram of value sizes (in bytes)\n")
×
821
        valueSizeHistogram.PrintHistogram()
×
822
}
823

824
func printAlphaProposal(buf *bytes.Buffer, pr *pb.Proposal, pending map[uint64]bool) {
×
825
        if pr == nil {
×
826
                return
×
827
        }
×
828

829
        switch {
×
830
        case pr.Mutations != nil:
×
831
                fmt.Fprintf(buf, " Mutation . StartTs: %d . Edges: %d .",
×
832
                        pr.Mutations.StartTs, len(pr.Mutations.Edges))
×
833
                if len(pr.Mutations.Edges) > 0 {
×
834
                        pending[pr.Mutations.StartTs] = true
×
835
                } else {
×
836
                        fmt.Fprintf(buf, " Mutation: %+v .", pr.Mutations)
×
837
                }
×
838
                fmt.Fprintf(buf, " Pending txns: %d .", len(pending))
×
839
        case len(pr.Kv) > 0:
×
840
                fmt.Fprintf(buf, " KV . Size: %d ", len(pr.Kv))
×
841
        case pr.State != nil:
×
842
                fmt.Fprintf(buf, " State . %+v ", pr.State)
×
843
        case pr.Delta != nil:
×
844
                fmt.Fprintf(buf, " Delta .")
×
845
                sort.Slice(pr.Delta.Txns, func(i, j int) bool {
×
846
                        ti := pr.Delta.Txns[i]
×
847
                        tj := pr.Delta.Txns[j]
×
848
                        return ti.StartTs < tj.StartTs
×
849
                })
×
850
                fmt.Fprintf(buf, " Max: %d .", pr.Delta.GetMaxAssigned())
×
851
                for _, txn := range pr.Delta.Txns {
×
852
                        delete(pending, txn.StartTs)
×
853
                }
×
854
                // There could be many thousands of txns within a single delta. We
855
                // don't need to print out every single entry, so just show the
856
                // first 10.
857
                if len(pr.Delta.Txns) >= 10 {
×
858
                        fmt.Fprintf(buf, " Num txns: %d .", len(pr.Delta.Txns))
×
859
                        pr.Delta.Txns = pr.Delta.Txns[:10]
×
860
                }
×
861
                for _, txn := range pr.Delta.Txns {
×
862
                        fmt.Fprintf(buf, " %d → %d .", txn.StartTs, txn.CommitTs)
×
863
                }
×
864
                fmt.Fprintf(buf, " Pending txns: %d .", len(pending))
×
865
        case pr.Snapshot != nil:
×
866
                fmt.Fprintf(buf, " Snapshot . %+v ", pr.Snapshot)
×
867
        }
868
}
869

870
func printZeroProposal(buf *bytes.Buffer, zpr *pb.ZeroProposal) {
×
871
        if zpr == nil {
×
872
                return
×
873
        }
×
874

875
        switch {
×
876
        case len(zpr.SnapshotTs) > 0:
×
877
                fmt.Fprintf(buf, " Snapshot: %+v .", zpr.SnapshotTs)
×
878
        case zpr.Member != nil:
×
879
                fmt.Fprintf(buf, " Member: %+v .", zpr.Member)
×
880
        case zpr.Tablet != nil:
×
881
                fmt.Fprintf(buf, " Tablet: %+v .", zpr.Tablet)
×
882
        case zpr.MaxUID > 0:
×
883
                fmt.Fprintf(buf, " MaxUID: %d .", zpr.MaxUID)
×
884
        case zpr.MaxNsID > 0:
×
885
                fmt.Fprintf(buf, " MaxNsID: %d .", zpr.MaxNsID)
×
886
        case zpr.MaxRaftId > 0:
×
887
                fmt.Fprintf(buf, " MaxRaftId: %d .", zpr.MaxRaftId)
×
888
        case zpr.MaxTxnTs > 0:
×
889
                fmt.Fprintf(buf, " MaxTxnTs: %d .", zpr.MaxTxnTs)
×
890
        case zpr.Txn != nil:
×
891
                txn := zpr.Txn
×
892
                fmt.Fprintf(buf, " Txn %d → %d .", txn.StartTs, txn.CommitTs)
×
893
        default:
×
894
                fmt.Fprintf(buf, " Proposal: %+v .", zpr)
×
895
        }
896
}
897

898
func printSummary(db *badger.DB) {
×
899
        nsFromKey := func(key []byte) uint64 {
×
900
                pk, err := x.Parse(key)
×
901
                if err != nil {
×
902
                        // Some of the keys are badger's internal and couldn't be parsed.
×
903
                        // Hence, the error is expected in that case.
×
904
                        fmt.Printf("Unable to parse key: %#x\n", key)
×
905
                        return x.GalaxyNamespace
×
906
                }
×
907
                return x.ParseNamespace(pk.Attr)
×
908
        }
909
        banned := db.BannedNamespaces()
×
910
        bannedNs := make(map[uint64]struct{})
×
911
        for _, ns := range banned {
×
912
                bannedNs[ns] = struct{}{}
×
913
        }
×
914

915
        tables := db.Tables()
×
916
        levelSizes := make([]uint64, len(db.Levels()))
×
917
        nsSize := make(map[uint64]uint64)
×
918
        for _, tab := range tables {
×
919
                levelSizes[tab.Level] += uint64(tab.OnDiskSize)
×
920
                if nsFromKey(tab.Left) == nsFromKey(tab.Right) {
×
921
                        nsSize[nsFromKey(tab.Left)] += uint64(tab.OnDiskSize)
×
922
                }
×
923
        }
924

925
        fmt.Println("[SUMMARY]")
×
926
        totalSize := uint64(0)
×
927
        for i, sz := range levelSizes {
×
928
                fmt.Printf("Level %d size: %12s\n", i, humanize.IBytes(sz))
×
929
                totalSize += sz
×
930
        }
×
931
        fmt.Printf("Total SST size: %12s\n", humanize.IBytes(totalSize))
×
932
        fmt.Println()
×
933
        for ns, sz := range nsSize {
×
934
                fmt.Printf("Namespace %#x size: %12s", ns, humanize.IBytes(sz))
×
935
                if _, ok := bannedNs[ns]; ok {
×
936
                        fmt.Printf(" (banned)")
×
937
                }
×
938
                fmt.Println()
×
939
        }
940
        fmt.Println()
×
941
}
942

943
func run() {
×
944
        go func() {
×
945
                for i := 8080; i < 9080; i++ {
×
946
                        fmt.Printf("Listening for /debug HTTP requests at port: %d\n", i)
×
947
                        if err := http.ListenAndServe(fmt.Sprintf("localhost:%d", i), nil); err != nil {
×
948
                                fmt.Println("Port busy. Trying another one...")
×
949
                                continue
×
950
                        }
951
                }
952
        }()
953

954
        if opt.parseKey != "" {
×
955
                k, err := hex.DecodeString(opt.parseKey)
×
956
                if err != nil {
×
957
                        log.Fatalf("error while decoding hex key: %v\n", err)
×
958
                }
×
959
                pk, err := x.Parse(k)
×
960
                if err != nil {
×
961
                        log.Fatalf("error while parsing key: %v\n", err)
×
962
                }
×
963
                if pk.IsData() {
×
964
                        fmt.Printf("{d}")
×
965
                }
×
966
                if pk.IsIndex() {
×
967
                        fmt.Printf("{i}")
×
968
                }
×
969
                if pk.IsCountOrCountRev() {
×
970
                        fmt.Printf("{c}")
×
971
                }
×
972
                if pk.IsSchema() {
×
973
                        fmt.Printf("{s}")
×
974
                }
×
975
                if pk.IsReverse() {
×
976
                        fmt.Printf("{r}")
×
977
                }
×
978
                fmt.Printf(" Key: %+v\n", pk)
×
979
                return
×
980
        }
981

982
        var err error
×
983
        dir := opt.pdir
×
984
        isWal := false
×
985
        if len(dir) == 0 {
×
986
                dir = opt.wdir
×
987
                isWal = true
×
988
        }
×
989
        keys, err := ee.GetKeys(Debug.Conf)
×
990
        x.Check(err)
×
991
        opt.key = keys.EncKey
×
992

×
993
        if isWal {
×
994
                store, err := raftwal.InitEncrypted(dir, opt.key)
×
995
                x.Check(err)
×
996
                if err := handleWal(store); err != nil {
×
997
                        fmt.Printf("\nGot error while handling WAL: %v\n", err)
×
998
                }
×
999
                return
×
1000
        }
1001

1002
        bopts := badger.DefaultOptions(dir).
×
1003
                WithReadOnly(opt.readOnly).
×
1004
                WithEncryptionKey(opt.key).
×
1005
                WithBlockCacheSize(1 << 30).
×
1006
                WithIndexCacheSize(1 << 30).
×
1007
                WithNamespaceOffset(x.NamespaceOffset) // We don't want to see the banned data.
×
1008

×
1009
        x.AssertTruef(len(bopts.Dir) > 0, "No posting or wal dir specified.")
×
1010
        fmt.Printf("Opening DB: %s\n", bopts.Dir)
×
1011

×
1012
        db, err := badger.OpenManaged(bopts)
×
1013
        x.Check(err)
×
1014
        // Not using posting list cache
×
1015
        posting.Init(db, 0)
×
1016
        defer db.Close()
×
1017

×
1018
        printSummary(db)
×
1019
        if opt.onlySummary {
×
1020
                return
×
1021
        }
×
1022

1023
        // Commenting the following out because on large Badger DBs, this can take a LONG time.
1024
        // min, max := getMinMax(db, opt.readTs)
1025
        // fmt.Printf("Min commit: %d. Max commit: %d, w.r.t %d\n", min, max, opt.readTs)
1026

1027
        switch {
×
1028
        case len(opt.rollupKey) > 0:
×
1029
                rollupKey(db)
×
1030
        case len(opt.keyLookup) > 0:
×
1031
                lookup(db)
×
1032
        case len(opt.jepsen) > 0:
×
1033
                jepsen(db)
×
1034
        case opt.vals:
×
1035
                total := seekTotal(db, opt.readTs)
×
1036
                fmt.Printf("Total: %d\n", total)
×
1037
        case opt.sizeHistogram:
×
1038
                sizeHistogram(db)
×
1039
        default:
×
1040
                printKeys(db)
×
1041
        }
1042
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc