• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 5929719277

21 Aug 2023 05:48PM UTC coverage: 67.075% (-0.03%) from 67.105%
5929719277

push

web-flow
fix(debug): fix debug tool for schema keys (#7939)

Co-authored-by: Aman Mangal <aman@dgraph.io>

31 of 31 new or added lines in 1 file covered. (100.0%)

58449 of 87140 relevant lines covered (67.07%)

2236473.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

4.36
/dgraph/cmd/debug/run.go
1
/*
2
 * Copyright 2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package debug
18

19
import (
20
        "bufio"
21
        "bytes"
22
        "context"
23
        "encoding/hex"
24
        "fmt"
25
        "io"
26
        "log"
27
        "math"
28
        "net/http"
29
        _ "net/http/pprof" // http profiler
30
        "os"
31
        "sort"
32
        "strconv"
33
        "strings"
34
        "sync/atomic"
35

36
        "github.com/dustin/go-humanize"
37
        "github.com/spf13/cobra"
38

39
        "github.com/dgraph-io/badger/v4"
40
        bpb "github.com/dgraph-io/badger/v4/pb"
41
        "github.com/dgraph-io/dgraph/codec"
42
        "github.com/dgraph-io/dgraph/ee"
43
        "github.com/dgraph-io/dgraph/posting"
44
        "github.com/dgraph-io/dgraph/protos/pb"
45
        "github.com/dgraph-io/dgraph/raftwal"
46
        "github.com/dgraph-io/dgraph/types"
47
        "github.com/dgraph-io/dgraph/x"
48
        "github.com/dgraph-io/ristretto/z"
49
)
50

51
var (
52
        // Debug is the sub-command invoked when calling "dgraph debug"
53
        Debug x.SubCommand
54
        opt   flagOptions
55
)
56

57
type flagOptions struct {
58
        vals          bool
59
        keyLookup     string
60
        rollupKey     string
61
        keyHistory    bool
62
        predicate     string
63
        prefix        string
64
        readOnly      bool
65
        pdir          string
66
        itemMeta      bool
67
        jepsen        string
68
        readTs        uint64
69
        sizeHistogram bool
70
        noKeys        bool
71
        key           x.Sensitive
72
        onlySummary   bool
73

74
        // Options related to the WAL.
75
        wdir           string
76
        wtruncateUntil uint64
77
        wsetSnapshot   string
78
}
79

80
func init() {
158✔
81
        Debug.Cmd = &cobra.Command{
158✔
82
                Use:   "debug",
158✔
83
                Short: "Debug Dgraph instance",
158✔
84
                Run: func(cmd *cobra.Command, args []string) {
158✔
85
                        run()
×
86
                },
×
87
                Annotations: map[string]string{"group": "debug"},
88
        }
89
        Debug.Cmd.SetHelpTemplate(x.NonRootTemplate)
158✔
90

158✔
91
        flag := Debug.Cmd.Flags()
158✔
92
        flag.BoolVar(&opt.itemMeta, "item", true, "Output item meta as well. Set to false for diffs.")
158✔
93
        flag.BoolVar(&opt.vals, "vals", false, "Output values along with keys.")
158✔
94
        flag.BoolVar(&opt.noKeys, "nokeys", false,
158✔
95
                "Ignore key_. Only consider amount when calculating total.")
158✔
96
        flag.StringVar(&opt.jepsen, "jepsen", "", "Disect Jepsen output. Can be linear/binary.")
158✔
97
        flag.Uint64Var(&opt.readTs, "at", math.MaxUint64, "Set read timestamp for all txns.")
158✔
98
        flag.BoolVarP(&opt.readOnly, "readonly", "o", true, "Open in read only mode.")
158✔
99
        flag.StringVarP(&opt.predicate, "pred", "r", "", "Only output specified predicate.")
158✔
100
        flag.StringVarP(&opt.prefix, "prefix", "", "", "Uses a hex prefix.")
158✔
101
        flag.StringVarP(&opt.keyLookup, "lookup", "l", "", "Hex of key to lookup.")
158✔
102
        flag.StringVar(&opt.rollupKey, "rollup", "", "Hex of key to rollup.")
158✔
103
        flag.BoolVarP(&opt.keyHistory, "history", "y", false, "Show all versions of a key.")
158✔
104
        flag.StringVarP(&opt.pdir, "postings", "p", "", "Directory where posting lists are stored.")
158✔
105
        flag.BoolVar(&opt.sizeHistogram, "histogram", false,
158✔
106
                "Show a histogram of the key and value sizes.")
158✔
107
        flag.BoolVar(&opt.onlySummary, "only-summary", false,
158✔
108
                "If true, only show the summary of the p directory.")
158✔
109

158✔
110
        // Flags related to WAL.
158✔
111
        flag.StringVarP(&opt.wdir, "wal", "w", "", "Directory where Raft write-ahead logs are stored.")
158✔
112
        flag.Uint64VarP(&opt.wtruncateUntil, "truncate", "t", 0,
158✔
113
                "Remove data from Raft entries until but not including this index.")
158✔
114
        flag.StringVarP(&opt.wsetSnapshot, "snap", "s", "",
158✔
115
                "Set snapshot term,index,readts to this. Value must be comma-separated list containing"+
158✔
116
                        " the value for these vars in that order.")
158✔
117
        ee.RegisterEncFlag(flag)
158✔
118
}
119

120
func toInt(o *pb.Posting) int {
×
121
        from := types.Val{
×
122
                Tid:   types.TypeID(o.ValType),
×
123
                Value: o.Value,
×
124
        }
×
125
        out, err := types.Convert(from, types.StringID)
×
126
        x.Check(err)
×
127
        val := out.Value.(string)
×
128
        a, err := strconv.Atoi(val)
×
129
        if err != nil {
×
130
                return 0
×
131
        }
×
132
        return a
×
133
}
134

135
func uidToVal(itr *badger.Iterator, prefix string) map[uint64]int {
×
136
        keys := make(map[uint64]int)
×
137
        var lastKey []byte
×
138
        for itr.Rewind(); itr.Valid(); {
×
139
                item := itr.Item()
×
140
                if bytes.Equal(lastKey, item.Key()) {
×
141
                        itr.Next()
×
142
                        continue
×
143
                }
144
                lastKey = append(lastKey[:0], item.Key()...)
×
145
                pk, err := x.Parse(item.Key())
×
146
                x.Check(err)
×
147
                if !pk.IsData() || !strings.HasPrefix(x.ParseAttr(pk.Attr), prefix) {
×
148
                        continue
×
149
                }
150
                if pk.IsSchema() {
×
151
                        continue
×
152
                }
153
                if pk.StartUid > 0 {
×
154
                        // This key is part of a multi-part posting list. Skip it and only read
×
155
                        // the main key, which is the entry point to read the whole list.
×
156
                        continue
×
157
                }
158

159
                pl, err := posting.ReadPostingList(item.KeyCopy(nil), itr)
×
160
                if err != nil {
×
161
                        log.Fatalf("Unable to read posting list: %v", err)
×
162
                }
×
163
                err = pl.Iterate(math.MaxUint64, 0, func(o *pb.Posting) error {
×
164
                        from := types.Val{
×
165
                                Tid:   types.TypeID(o.ValType),
×
166
                                Value: o.Value,
×
167
                        }
×
168
                        out, err := types.Convert(from, types.StringID)
×
169
                        x.Check(err)
×
170
                        key := out.Value.(string)
×
171
                        k, err := strconv.Atoi(key)
×
172
                        x.Check(err)
×
173
                        keys[pk.Uid] = k
×
174
                        // fmt.Printf("Type: %v Uid=%d key=%s. commit=%d hex %x\n",
×
175
                        //         o.ValType, pk.Uid, key, o.CommitTs, lastKey)
×
176
                        return nil
×
177
                })
×
178
                x.Checkf(err, "during iterate")
×
179
        }
180
        return keys
×
181
}
182

183
func seekTotal(db *badger.DB, readTs uint64) int {
×
184
        txn := db.NewTransactionAt(readTs, false)
×
185
        defer txn.Discard()
×
186

×
187
        iopt := badger.DefaultIteratorOptions
×
188
        iopt.AllVersions = true
×
189
        iopt.PrefetchValues = false
×
190
        itr := txn.NewIterator(iopt)
×
191
        defer itr.Close()
×
192

×
193
        keys := uidToVal(itr, "key_")
×
194
        fmt.Printf("Got keys: %+v\n", keys)
×
195
        vals := uidToVal(itr, "amount_")
×
196
        var total int
×
197
        for _, val := range vals {
×
198
                total += val
×
199
        }
×
200
        fmt.Printf("Got vals: %+v. Total: %d\n", vals, total)
×
201
        if opt.noKeys {
×
202
                // Ignore the key_ predicate. Only consider the amount_ predicate. Useful when tablets are
×
203
                // being moved around.
×
204
                keys = vals
×
205
        }
×
206

207
        total = 0
×
208
        for uid, key := range keys {
×
209
                a := vals[uid]
×
210
                fmt.Printf("uid: %-5d %x key: %d amount: %d\n", uid, uid, key, a)
×
211
                total += a
×
212
        }
×
213
        fmt.Printf("Total @ %d = %d\n", readTs, total)
×
214
        return total
×
215
}
216

217
func findFirstValidTxn(db *badger.DB) uint64 {
×
218
        readTs := opt.readTs
×
219
        var wrong uint64
×
220
        for {
×
221
                min, max := getMinMax(db, readTs-1)
×
222
                if max <= min {
×
223
                        fmt.Printf("Can't find it. Max: %d\n", max)
×
224
                        return 0
×
225
                }
×
226
                readTs = max
×
227
                if total := seekTotal(db, readTs); total != 100 {
×
228
                        fmt.Printf("===> VIOLATION at ts: %d\n", readTs)
×
229
                        showAllPostingsAt(db, readTs)
×
230
                        wrong = readTs
×
231
                } else {
×
232
                        fmt.Printf("===> Found first correct version at %d\n", readTs)
×
233
                        showAllPostingsAt(db, readTs)
×
234
                        return wrong
×
235
                }
×
236
        }
237
}
238

239
func findFirstInvalidTxn(db *badger.DB, lowTs, highTs uint64) uint64 {
×
240
        fmt.Println()
×
241
        if highTs-lowTs < 1 {
×
242
                fmt.Printf("Checking at lowTs: %d\n", lowTs)
×
243
                if total := seekTotal(db, lowTs); total != 100 {
×
244
                        fmt.Printf("==> VIOLATION at ts: %d\n", lowTs)
×
245
                        return lowTs
×
246
                }
×
247
                fmt.Printf("No violation found at ts: %d\n", lowTs)
×
248
                return 0
×
249
        }
250

251
        midTs := (lowTs + highTs) / 2
×
252
        fmt.Printf("Checking. low=%d. high=%d. mid=%d\n", lowTs, highTs, midTs)
×
253
        if total := seekTotal(db, midTs); total == 100 {
×
254
                // If no failure, move to higher ts.
×
255
                return findFirstInvalidTxn(db, midTs+1, highTs)
×
256
        }
×
257
        // Found an error.
258
        return findFirstInvalidTxn(db, lowTs, midTs)
×
259
}
260

261
func showAllPostingsAt(db *badger.DB, readTs uint64) {
×
262
        txn := db.NewTransactionAt(readTs, false)
×
263
        defer txn.Discard()
×
264

×
265
        itr := txn.NewIterator(badger.DefaultIteratorOptions)
×
266
        defer itr.Close()
×
267

×
268
        type account struct {
×
269
                Key int
×
270
                Amt int
×
271
        }
×
272
        keys := make(map[uint64]*account)
×
273

×
274
        var buf bytes.Buffer
×
275
        fmt.Fprintf(&buf, "SHOWING all postings at %d\n", readTs)
×
276
        for itr.Rewind(); itr.Valid(); itr.Next() {
×
277
                item := itr.Item()
×
278
                if item.Version() != readTs {
×
279
                        continue
×
280
                }
281

282
                pk, err := x.Parse(item.Key())
×
283
                x.Check(err)
×
284
                if !pk.IsData() {
×
285
                        continue
×
286
                }
287

288
                var acc *account
×
289
                attr := x.ParseAttr(pk.Attr)
×
290
                if strings.HasPrefix(attr, "key_") || strings.HasPrefix(attr, "amount_") {
×
291
                        var has bool
×
292
                        acc, has = keys[pk.Uid]
×
293
                        if !has {
×
294
                                acc = &account{}
×
295
                                keys[pk.Uid] = acc
×
296
                        }
×
297
                }
298
                fmt.Fprintf(&buf, "  key: %+v hex: %x\n", pk, item.Key())
×
299
                val, err := item.ValueCopy(nil)
×
300
                x.Check(err)
×
301
                var plist pb.PostingList
×
302
                x.Check(plist.Unmarshal(val))
×
303

×
304
                x.AssertTrue(len(plist.Postings) <= 1)
×
305
                var num int
×
306
                for _, p := range plist.Postings {
×
307
                        num = toInt(p)
×
308
                        appendPosting(&buf, p)
×
309
                }
×
310
                if num > 0 && acc != nil {
×
311
                        switch {
×
312
                        case strings.HasPrefix(attr, "key_"):
×
313
                                acc.Key = num
×
314
                        case strings.HasPrefix(attr, "amount_"):
×
315
                                acc.Amt = num
×
316
                        }
317
                }
318
        }
319
        for uid, acc := range keys {
×
320
                fmt.Fprintf(&buf, "Uid: %d %x Key: %d Amount: %d\n", uid, uid, acc.Key, acc.Amt)
×
321
        }
×
322
        fmt.Println(buf.String())
×
323
}
324

325
func getMinMax(db *badger.DB, readTs uint64) (uint64, uint64) {
×
326
        var min, max uint64 = math.MaxUint64, 0
×
327
        txn := db.NewTransactionAt(readTs, false)
×
328
        defer txn.Discard()
×
329

×
330
        iopt := badger.DefaultIteratorOptions
×
331
        iopt.AllVersions = true
×
332
        itr := txn.NewIterator(iopt)
×
333
        defer itr.Close()
×
334

×
335
        for itr.Rewind(); itr.Valid(); itr.Next() {
×
336
                item := itr.Item()
×
337
                if min > item.Version() {
×
338
                        min = item.Version()
×
339
                }
×
340
                if max < item.Version() {
×
341
                        max = item.Version()
×
342
                }
×
343
        }
344
        return min, max
×
345
}
346

347
func jepsen(db *badger.DB) {
×
348
        min, max := getMinMax(db, opt.readTs)
×
349
        fmt.Printf("min=%d. max=%d\n", min, max)
×
350

×
351
        var ts uint64
×
352
        switch opt.jepsen {
×
353
        case "binary":
×
354
                ts = findFirstInvalidTxn(db, min, max)
×
355
        case "linear":
×
356
                ts = findFirstValidTxn(db)
×
357
        }
358
        fmt.Println()
×
359
        if ts == 0 {
×
360
                fmt.Println("Nothing found. Exiting.")
×
361
                return
×
362
        }
×
363
        showAllPostingsAt(db, ts)
×
364
        seekTotal(db, ts-1)
×
365

×
366
        for i := 0; i < 5; i++ {
×
367
                // Get a few previous commits.
×
368
                _, ts = getMinMax(db, ts-1)
×
369
                showAllPostingsAt(db, ts)
×
370
                seekTotal(db, ts-1)
×
371
        }
×
372
}
373

374
func history(lookup []byte, itr *badger.Iterator) {
×
375
        var buf bytes.Buffer
×
376
        pk, err := x.Parse(lookup)
×
377
        x.Check(err)
×
378
        fmt.Fprintf(&buf, "==> key: %x. PK: %+v\n", lookup, pk)
×
379
        for ; itr.Valid(); itr.Next() {
×
380
                item := itr.Item()
×
381
                if !bytes.Equal(item.Key(), lookup) {
×
382
                        break
×
383
                }
384

385
                fmt.Fprintf(&buf, "ts: %d", item.Version())
×
386
                x.Check2(buf.WriteString(" {item}"))
×
387
                if item.IsDeletedOrExpired() {
×
388
                        x.Check2(buf.WriteString("{deleted}"))
×
389
                }
×
390
                if item.DiscardEarlierVersions() {
×
391
                        x.Check2(buf.WriteString("{discard}"))
×
392
                }
×
393
                val, err := item.ValueCopy(nil)
×
394
                x.Check(err)
×
395

×
396
                meta := item.UserMeta()
×
397
                if meta&posting.BitCompletePosting > 0 {
×
398
                        x.Check2(buf.WriteString("{complete}"))
×
399
                }
×
400
                if meta&posting.BitDeltaPosting > 0 {
×
401
                        x.Check2(buf.WriteString("{delta}"))
×
402
                }
×
403
                if meta&posting.BitEmptyPosting > 0 {
×
404
                        x.Check2(buf.WriteString("{empty}"))
×
405
                }
×
406
                fmt.Fprintln(&buf)
×
407
                if meta&posting.BitDeltaPosting > 0 {
×
408
                        plist := &pb.PostingList{}
×
409
                        x.Check(plist.Unmarshal(val))
×
410
                        for _, p := range plist.Postings {
×
411
                                appendPosting(&buf, p)
×
412
                        }
×
413
                }
414
                if meta&posting.BitCompletePosting > 0 {
×
415
                        var plist pb.PostingList
×
416
                        x.Check(plist.Unmarshal(val))
×
417

×
418
                        for _, p := range plist.Postings {
×
419
                                appendPosting(&buf, p)
×
420
                        }
×
421

422
                        fmt.Fprintf(&buf, " Num uids = %d. Size = %d\n",
×
423
                                codec.ExactLen(plist.Pack), plist.Pack.Size())
×
424
                        dec := codec.Decoder{Pack: plist.Pack}
×
425
                        for uids := dec.Seek(0, codec.SeekStart); len(uids) > 0; uids = dec.Next() {
×
426
                                for _, uid := range uids {
×
427
                                        fmt.Fprintf(&buf, " Uid = %d\n", uid)
×
428
                                }
×
429
                        }
430
                }
431
                x.Check2(buf.WriteString("\n"))
×
432
        }
433
        fmt.Println(buf.String())
×
434
}
435

436
func appendPosting(w io.Writer, o *pb.Posting) {
×
437
        fmt.Fprintf(w, " Uid: %d Op: %d ", o.Uid, o.Op)
×
438

×
439
        if len(o.Value) > 0 {
×
440
                fmt.Fprintf(w, " Type: %v. ", o.ValType)
×
441
                from := types.Val{
×
442
                        Tid:   types.TypeID(o.ValType),
×
443
                        Value: o.Value,
×
444
                }
×
445
                out, err := types.Convert(from, types.StringID)
×
446
                if err != nil {
×
447
                        fmt.Fprintf(w, " Value: %q Error: %v", o.Value, err)
×
448
                } else {
×
449
                        fmt.Fprintf(w, " String Value: %q", out.Value)
×
450
                }
×
451
        }
452
        fmt.Fprintln(w, "")
×
453
}
454
func rollupKey(db *badger.DB) {
×
455
        txn := db.NewTransactionAt(opt.readTs, false)
×
456
        defer txn.Discard()
×
457

×
458
        key, err := hex.DecodeString(opt.rollupKey)
×
459
        x.Check(err)
×
460

×
461
        iopts := badger.DefaultIteratorOptions
×
462
        iopts.AllVersions = true
×
463
        iopts.PrefetchValues = false
×
464
        itr := txn.NewKeyIterator(key, iopts)
×
465
        defer itr.Close()
×
466

×
467
        itr.Rewind()
×
468
        if !itr.Valid() {
×
469
                log.Fatalf("Unable to seek to key: %s", hex.Dump(key))
×
470
        }
×
471

472
        item := itr.Item()
×
473
        // Don't need to do anything if the bitdelta is not set.
×
474
        if item.UserMeta()&posting.BitDeltaPosting == 0 {
×
475
                fmt.Printf("First item has UserMeta:[b%04b]. Nothing to do\n", item.UserMeta())
×
476
                return
×
477
        }
×
478
        pl, err := posting.ReadPostingList(item.KeyCopy(nil), itr)
×
479
        x.Check(err)
×
480

×
481
        alloc := z.NewAllocator(32<<20, "Debug.RollupKey")
×
482
        defer alloc.Release()
×
483

×
484
        // Setting kvs at their original value as we can't give a new timestamp in debug mode.
×
485
        kvs, err := pl.Rollup(alloc, math.MaxUint64)
×
486
        x.Check(err)
×
487

×
488
        wb := db.NewManagedWriteBatch()
×
489
        x.Check(wb.WriteList(&bpb.KVList{Kv: kvs}))
×
490
        x.Check(wb.Flush())
×
491
}
492

493
func lookup(db *badger.DB) {
×
494
        txn := db.NewTransactionAt(opt.readTs, false)
×
495
        defer txn.Discard()
×
496

×
497
        iopts := badger.DefaultIteratorOptions
×
498
        iopts.AllVersions = true
×
499
        iopts.PrefetchValues = false
×
500
        itr := txn.NewIterator(iopts)
×
501
        defer itr.Close()
×
502

×
503
        key, err := hex.DecodeString(opt.keyLookup)
×
504
        if err != nil {
×
505
                log.Fatal(err)
×
506
        }
×
507
        itr.Seek(key)
×
508
        if !itr.Valid() {
×
509
                log.Fatalf("Unable to seek to key: %s", hex.Dump(key))
×
510
        }
×
511

512
        if opt.keyHistory {
×
513
                history(key, itr)
×
514
                return
×
515
        }
×
516

517
        var buf bytes.Buffer
×
518
        item := itr.Item()
×
519
        if item.UserMeta()&posting.BitSchemaPosting > 0 {
×
520
                // Schema is stored as pb.SchemaUpdate, we should not try to read it as a posting list
×
521
                fmt.Fprintf(&buf, "Key: %x\n", item.Key())
×
522
                schemaBytes, err := item.ValueCopy(nil)
×
523
                x.Check(err)
×
524

×
525
                var s pb.SchemaUpdate
×
526
                x.Check(s.Unmarshal(schemaBytes))
×
527
                fmt.Fprintf(&buf, "Value: %+v\n", s)
×
528
        } else {
×
529
                fmt.Fprintf(&buf, "Key: %x", item.Key())
×
530
                pl, err := posting.ReadPostingList(item.KeyCopy(nil), itr)
×
531
                if err != nil {
×
532
                        log.Fatal(err)
×
533
                }
×
534
                fmt.Fprintf(&buf, " Length: %d", pl.Length(math.MaxUint64, 0))
×
535

×
536
                splits := pl.PartSplits()
×
537
                isMultiPart := len(splits) > 0
×
538
                fmt.Fprintf(&buf, " Is multi-part list? %v", isMultiPart)
×
539
                if isMultiPart {
×
540
                        fmt.Fprintf(&buf, " Start UID of parts: %v\n", splits)
×
541
                }
×
542

543
                err = pl.Iterate(math.MaxUint64, 0, func(o *pb.Posting) error {
×
544
                        appendPosting(&buf, o)
×
545
                        return nil
×
546
                })
×
547
                if err != nil {
×
548
                        log.Fatal(err)
×
549
                }
×
550
        }
551
        fmt.Println(buf.String())
×
552
}
553

554
// Current format is like:
555
// {i} attr: name term: [8] woods  ts: 535 item: [28, b0100] sz: 81 dcnt: 3 key: 00000...6f6f6473
556
// Fix the TestBulkLoadMultiShard accordingly, if the format changes.
557
func printKeys(db *badger.DB) {
×
558
        var prefix []byte
×
559
        if len(opt.predicate) > 0 {
×
560
                prefix = x.PredicatePrefix(opt.predicate)
×
561
        } else if len(opt.prefix) > 0 {
×
562
                p, err := hex.DecodeString(opt.prefix)
×
563
                x.Check(err)
×
564
                prefix = p
×
565
        }
×
566
        fmt.Printf("prefix = %s\n", hex.Dump(prefix))
×
567
        stream := db.NewStreamAt(opt.readTs)
×
568
        stream.Prefix = prefix
×
569
        var total uint64
×
570
        stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
×
571
                item := itr.Item()
×
572
                pk, err := x.Parse(key)
×
573
                x.Check(err)
×
574

×
575
                var buf bytes.Buffer
×
576
                // Don't use a switch case here. Because multiple of these can be true. In particular,
×
577
                // IsSchema can be true alongside IsData.
×
578
                if pk.IsData() {
×
579
                        x.Check2(buf.WriteString("{d}"))
×
580
                }
×
581
                if pk.IsIndex() {
×
582
                        x.Check2(buf.WriteString("{i}"))
×
583
                }
×
584
                if pk.IsCountOrCountRev() {
×
585
                        x.Check2(buf.WriteString("{c}"))
×
586
                }
×
587
                if pk.IsSchema() {
×
588
                        x.Check2(buf.WriteString("{s}"))
×
589
                }
×
590
                if pk.IsReverse() {
×
591
                        x.Check2(buf.WriteString("{r}"))
×
592
                }
×
593
                ns, attr := x.ParseNamespaceAttr(pk.Attr)
×
594
                x.Check2(buf.WriteString(fmt.Sprintf(" ns: %#x ", ns)))
×
595
                x.Check2(buf.WriteString(" attr: " + attr))
×
596
                if len(pk.Term) > 0 {
×
597
                        fmt.Fprintf(&buf, " term: [%d] [%v] ", pk.Term[0], pk.Term[1:])
×
598
                }
×
599
                if pk.Uid > 0 {
×
600
                        fmt.Fprintf(&buf, " uid: %d ", pk.Uid)
×
601
                }
×
602
                if pk.StartUid > 0 {
×
603
                        fmt.Fprintf(&buf, " startUid: %d ", pk.StartUid)
×
604
                }
×
605

606
                if opt.itemMeta {
×
607
                        fmt.Fprintf(&buf, " ts: %d", item.Version())
×
608
                        fmt.Fprintf(&buf, " item: [%d, b%04b]", item.EstimatedSize(), item.UserMeta())
×
609
                }
×
610

611
                var sz, deltaCount int64
×
612
        LOOP:
×
613
                for ; itr.ValidForPrefix(prefix); itr.Next() {
×
614
                        item := itr.Item()
×
615
                        if !bytes.Equal(item.Key(), key) {
×
616
                                break
×
617
                        }
618
                        if item.IsDeletedOrExpired() {
×
619
                                x.Check2(buf.WriteString(" {v.del}"))
×
620
                                break
×
621
                        }
622
                        switch item.UserMeta() {
×
623
                        // This is rather a default case as one of the 4 bit must be set.
624
                        case posting.BitCompletePosting, posting.BitEmptyPosting, posting.BitSchemaPosting:
×
625
                                sz += item.EstimatedSize()
×
626
                                break LOOP
×
627
                        case posting.BitDeltaPosting:
×
628
                                sz += item.EstimatedSize()
×
629
                                deltaCount++
×
630
                        default:
×
631
                                fmt.Printf("No user meta found for key: %s\n", hex.EncodeToString(key))
×
632
                        }
633
                        if item.DiscardEarlierVersions() {
×
634
                                x.Check2(buf.WriteString(" {v.las}"))
×
635
                                break
×
636
                        }
637
                }
638
                var invalidSz, invalidCount uint64
×
639
                // skip all the versions of key
×
640
                for ; itr.ValidForPrefix(prefix); itr.Next() {
×
641
                        item := itr.Item()
×
642
                        if !bytes.Equal(item.Key(), key) {
×
643
                                break
×
644
                        }
645
                        invalidSz += uint64(item.EstimatedSize())
×
646
                        invalidCount++
×
647
                }
648

649
                fmt.Fprintf(&buf, " sz: %d dcnt: %d", sz, deltaCount)
×
650
                if invalidCount > 0 {
×
651
                        fmt.Fprintf(&buf, " isz: %d icount: %d", invalidSz, invalidCount)
×
652
                }
×
653
                fmt.Fprintf(&buf, " key: %s", hex.EncodeToString(key))
×
654
                // If total size is more than 1 GB or we have more than 1 million keys, flag this key.
×
655
                if uint64(sz)+invalidSz > (1<<30) || uint64(deltaCount)+invalidCount > 10e6 {
×
656
                        fmt.Fprintf(&buf, " [HEAVY]")
×
657
                }
×
658
                buf.WriteRune('\n')
×
659
                list := &bpb.KVList{}
×
660
                list.Kv = append(list.Kv, &bpb.KV{
×
661
                        Value: buf.Bytes(),
×
662
                })
×
663
                // Don't call fmt.Println here. It is much slower.
×
664
                return list, nil
×
665
        }
666

667
        w := bufio.NewWriterSize(os.Stdout, 16<<20)
×
668
        stream.Send = func(buf *z.Buffer) error {
×
669
                var count int
×
670
                err := buf.SliceIterate(func(s []byte) error {
×
671
                        var kv bpb.KV
×
672
                        if err := kv.Unmarshal(s); err != nil {
×
673
                                return err
×
674
                        }
×
675
                        x.Check2(w.Write(kv.Value))
×
676
                        count++
×
677
                        return nil
×
678
                })
679
                atomic.AddUint64(&total, uint64(count))
×
680
                return err
×
681
        }
682
        x.Check(stream.Orchestrate(context.Background()))
×
683
        x.Check(w.Flush())
×
684
        fmt.Println()
×
685
        fmt.Printf("Found %d keys\n", atomic.LoadUint64(&total))
×
686
}
687

688
// getHistogramBounds creates the bounds for a histogram. The bounds are powers of two
// of the form [2^minExponent, ..., 2^maxExponent]. It returns nil when minExponent is
// greater than maxExponent.
//
// Each bound is computed with math.Ldexp, so it stays an exact power of two for
// exponents of 63 and above, where an integer shift would overflow.
func getHistogramBounds(minExponent, maxExponent uint32) []float64 {
	if minExponent > maxExponent {
		return nil
	}

	bounds := make([]float64, 0, int(maxExponent-minExponent)+1)
	// The loop ends on equality rather than on i <= maxExponent so that it also
	// terminates when maxExponent is the largest uint32 and i++ would wrap around.
	for i := minExponent; ; i++ {
		bounds = append(bounds, math.Ldexp(1, int(i)))
		if i == maxExponent {
			break
		}
	}
	return bounds
}
697

698
// HistogramData stores the information needed to represent the sizes of the keys and values
// as a histogram.
type HistogramData struct {
	// Bounds holds the exclusive upper bound of every bucket except the last one.
	Bounds []float64
	// Count is the number of values recorded through Update.
	Count int64
	// CountPerBucket holds the number of values per bucket. It has one more entry
	// than Bounds; the extra, last bucket takes the values that are not below any bound.
	CountPerBucket []int64
	// Min is the smallest value recorded (math.MaxInt64 until the first Update).
	Min int64
	// Max is the largest value recorded (0 until the first Update).
	Max int64
	// Sum is the sum of all recorded values.
	Sum int64
}

// NewHistogramData returns a new instance of HistogramData with properly initialized fields.
func NewHistogramData(bounds []float64) *HistogramData {
	return &HistogramData{
		Bounds:         bounds,
		CountPerBucket: make([]int64, len(bounds)+1),
		Max:            0,
		Min:            math.MaxInt64,
	}
}

// Update records value: it adjusts Min and Max if needed, adds value to Sum, increments
// Count and increments the first bucket whose bound is greater than value. A value
// that is not below any bound goes into the last bucket.
func (h *HistogramData) Update(value int64) {
	if value > h.Max {
		h.Max = value
	}
	if value < h.Min {
		h.Min = value
	}

	h.Sum += value
	h.Count++

	// Start from the last (overflow) bucket and move to the first bucket whose bound
	// exceeds value, if any.
	bucket := len(h.Bounds)
	for i, bound := range h.Bounds {
		if value < int64(bound) {
			bucket = i
			break
		}
	}
	h.CountPerBucket[bucket]++
}

// PrintHistogram prints the histogram data in a human-readable format. It prints the
// minimum, maximum and mean, then one line per non-empty bucket. Calling it on a nil
// receiver prints nothing. The mean of a histogram without values is printed as 0.00.
func (h *HistogramData) PrintHistogram() {
	if h == nil {
		return
	}

	// Avoid dividing by zero (which would print NaN) when nothing was recorded.
	var mean float64
	if h.Count > 0 {
		mean = float64(h.Sum) / float64(h.Count)
	}

	fmt.Printf("Min value: %d\n", h.Min)
	fmt.Printf("Max value: %d\n", h.Max)
	fmt.Printf("Mean: %.2f\n", mean)
	fmt.Printf("%24s %9s\n", "Range", "Count")

	last := len(h.CountPerBucket) - 1
	for index, count := range h.CountPerBucket {
		if count == 0 {
			continue
		}

		// The lower bound of a bucket is the previous bucket's bound, or 0 for the
		// first bucket. The range check keeps a histogram without any bounds from
		// indexing Bounds at -1.
		lowerBound := 0
		if index > 0 && index-1 < len(h.Bounds) {
			lowerBound = int(h.Bounds[index-1])
		}

		// The last bucket represents the bucket that contains the range from
		// the last bound up to infinity so it's processed differently than the
		// other buckets.
		if index == last {
			fmt.Printf("[%10d, %10s) %9d\n", lowerBound, "infinity", count)
			continue
		}

		fmt.Printf("[%10d, %10d) %9d\n", lowerBound, int(h.Bounds[index]), count)
	}
}
780

781
func sizeHistogram(db *badger.DB) {
×
782
        txn := db.NewTransactionAt(opt.readTs, false)
×
783
        defer txn.Discard()
×
784

×
785
        iopts := badger.DefaultIteratorOptions
×
786
        iopts.PrefetchValues = false
×
787
        itr := txn.NewIterator(iopts)
×
788
        defer itr.Close()
×
789

×
790
        // Generate distribution bounds. Key sizes are not greater than 2^16 while
×
791
        // value sizes are not greater than 1GB (2^30).
×
792
        keyBounds := getHistogramBounds(5, 16)
×
793
        valueBounds := getHistogramBounds(5, 30)
×
794

×
795
        // Initialize exporter.
×
796
        keySizeHistogram := NewHistogramData(keyBounds)
×
797
        valueSizeHistogram := NewHistogramData(valueBounds)
×
798

×
799
        // Collect key and value sizes.
×
800
        var prefix []byte
×
801
        if len(opt.predicate) > 0 {
×
802
                prefix = x.PredicatePrefix(opt.predicate)
×
803
        }
×
804
        var loop int
×
805
        for itr.Seek(prefix); itr.ValidForPrefix(prefix); itr.Next() {
×
806
                item := itr.Item()
×
807

×
808
                keySizeHistogram.Update(int64(len(item.Key())))
×
809
                valueSizeHistogram.Update(item.ValueSize())
×
810

×
811
                loop++
×
812
        }
×
813

814
        fmt.Printf("prefix = %s\n", hex.Dump(prefix))
×
815
        fmt.Printf("Found %d keys\n", loop)
×
816
        fmt.Printf("\nHistogram of key sizes (in bytes)\n")
×
817
        keySizeHistogram.PrintHistogram()
×
818
        fmt.Printf("\nHistogram of value sizes (in bytes)\n")
×
819
        valueSizeHistogram.PrintHistogram()
×
820
}
821

822
func printAlphaProposal(buf *bytes.Buffer, pr *pb.Proposal, pending map[uint64]bool) {
×
823
        if pr == nil {
×
824
                return
×
825
        }
×
826

827
        switch {
×
828
        case pr.Mutations != nil:
×
829
                fmt.Fprintf(buf, " Mutation . StartTs: %d . Edges: %d .",
×
830
                        pr.Mutations.StartTs, len(pr.Mutations.Edges))
×
831
                if len(pr.Mutations.Edges) > 0 {
×
832
                        pending[pr.Mutations.StartTs] = true
×
833
                } else {
×
834
                        fmt.Fprintf(buf, " Mutation: %+v .", pr.Mutations)
×
835
                }
×
836
                fmt.Fprintf(buf, " Pending txns: %d .", len(pending))
×
837
        case len(pr.Kv) > 0:
×
838
                fmt.Fprintf(buf, " KV . Size: %d ", len(pr.Kv))
×
839
        case pr.State != nil:
×
840
                fmt.Fprintf(buf, " State . %+v ", pr.State)
×
841
        case pr.Delta != nil:
×
842
                fmt.Fprintf(buf, " Delta .")
×
843
                sort.Slice(pr.Delta.Txns, func(i, j int) bool {
×
844
                        ti := pr.Delta.Txns[i]
×
845
                        tj := pr.Delta.Txns[j]
×
846
                        return ti.StartTs < tj.StartTs
×
847
                })
×
848
                fmt.Fprintf(buf, " Max: %d .", pr.Delta.GetMaxAssigned())
×
849
                for _, txn := range pr.Delta.Txns {
×
850
                        delete(pending, txn.StartTs)
×
851
                }
×
852
                // There could be many thousands of txns within a single delta. We
853
                // don't need to print out every single entry, so just show the
854
                // first 10.
855
                if len(pr.Delta.Txns) >= 10 {
×
856
                        fmt.Fprintf(buf, " Num txns: %d .", len(pr.Delta.Txns))
×
857
                        pr.Delta.Txns = pr.Delta.Txns[:10]
×
858
                }
×
859
                for _, txn := range pr.Delta.Txns {
×
860
                        fmt.Fprintf(buf, " %d → %d .", txn.StartTs, txn.CommitTs)
×
861
                }
×
862
                fmt.Fprintf(buf, " Pending txns: %d .", len(pending))
×
863
        case pr.Snapshot != nil:
×
864
                fmt.Fprintf(buf, " Snapshot . %+v ", pr.Snapshot)
×
865
        }
866
}
867

868
func printZeroProposal(buf *bytes.Buffer, zpr *pb.ZeroProposal) {
×
869
        if zpr == nil {
×
870
                return
×
871
        }
×
872

873
        switch {
×
874
        case len(zpr.SnapshotTs) > 0:
×
875
                fmt.Fprintf(buf, " Snapshot: %+v .", zpr.SnapshotTs)
×
876
        case zpr.Member != nil:
×
877
                fmt.Fprintf(buf, " Member: %+v .", zpr.Member)
×
878
        case zpr.Tablet != nil:
×
879
                fmt.Fprintf(buf, " Tablet: %+v .", zpr.Tablet)
×
880
        case zpr.MaxUID > 0:
×
881
                fmt.Fprintf(buf, " MaxUID: %d .", zpr.MaxUID)
×
882
        case zpr.MaxNsID > 0:
×
883
                fmt.Fprintf(buf, " MaxNsID: %d .", zpr.MaxNsID)
×
884
        case zpr.MaxRaftId > 0:
×
885
                fmt.Fprintf(buf, " MaxRaftId: %d .", zpr.MaxRaftId)
×
886
        case zpr.MaxTxnTs > 0:
×
887
                fmt.Fprintf(buf, " MaxTxnTs: %d .", zpr.MaxTxnTs)
×
888
        case zpr.Txn != nil:
×
889
                txn := zpr.Txn
×
890
                fmt.Fprintf(buf, " Txn %d → %d .", txn.StartTs, txn.CommitTs)
×
891
        default:
×
892
                fmt.Fprintf(buf, " Proposal: %+v .", zpr)
×
893
        }
894
}
895

896
func printSummary(db *badger.DB) {
×
897
        nsFromKey := func(key []byte) uint64 {
×
898
                pk, err := x.Parse(key)
×
899
                if err != nil {
×
900
                        // Some of the keys are badger's internal and couldn't be parsed.
×
901
                        // Hence, the error is expected in that case.
×
902
                        fmt.Printf("Unable to parse key: %#x\n", key)
×
903
                        return x.GalaxyNamespace
×
904
                }
×
905
                return x.ParseNamespace(pk.Attr)
×
906
        }
907
        banned := db.BannedNamespaces()
×
908
        bannedNs := make(map[uint64]struct{})
×
909
        for _, ns := range banned {
×
910
                bannedNs[ns] = struct{}{}
×
911
        }
×
912

913
        tables := db.Tables()
×
914
        levelSizes := make([]uint64, len(db.Levels()))
×
915
        nsSize := make(map[uint64]uint64)
×
916
        for _, tab := range tables {
×
917
                levelSizes[tab.Level] += uint64(tab.OnDiskSize)
×
918
                if nsFromKey(tab.Left) == nsFromKey(tab.Right) {
×
919
                        nsSize[nsFromKey(tab.Left)] += uint64(tab.OnDiskSize)
×
920
                }
×
921
        }
922

923
        fmt.Println("[SUMMARY]")
×
924
        totalSize := uint64(0)
×
925
        for i, sz := range levelSizes {
×
926
                fmt.Printf("Level %d size: %12s\n", i, humanize.IBytes(sz))
×
927
                totalSize += sz
×
928
        }
×
929
        fmt.Printf("Total SST size: %12s\n", humanize.IBytes(totalSize))
×
930
        fmt.Println()
×
931
        for ns, sz := range nsSize {
×
932
                fmt.Printf("Namespace %#x size: %12s", ns, humanize.IBytes(sz))
×
933
                if _, ok := bannedNs[ns]; ok {
×
934
                        fmt.Printf(" (banned)")
×
935
                }
×
936
                fmt.Println()
×
937
        }
938
        fmt.Println()
×
939
}
940

941
func run() {
×
942
        go func() {
×
943
                for i := 8080; i < 9080; i++ {
×
944
                        fmt.Printf("Listening for /debug HTTP requests at port: %d\n", i)
×
945
                        if err := http.ListenAndServe(fmt.Sprintf("localhost:%d", i), nil); err != nil {
×
946
                                fmt.Println("Port busy. Trying another one...")
×
947
                                continue
×
948
                        }
949
                }
950
        }()
951

952
        dir := opt.pdir
×
953
        isWal := false
×
954
        if len(dir) == 0 {
×
955
                dir = opt.wdir
×
956
                isWal = true
×
957
        }
×
958
        keys, err := ee.GetKeys(Debug.Conf)
×
959
        x.Check(err)
×
960
        opt.key = keys.EncKey
×
961

×
962
        if isWal {
×
963
                store, err := raftwal.InitEncrypted(dir, opt.key)
×
964
                x.Check(err)
×
965
                if err := handleWal(store); err != nil {
×
966
                        fmt.Printf("\nGot error while handling WAL: %v\n", err)
×
967
                }
×
968
                return
×
969
        }
970

971
        bopts := badger.DefaultOptions(dir).
×
972
                WithReadOnly(opt.readOnly).
×
973
                WithEncryptionKey(opt.key).
×
974
                WithBlockCacheSize(1 << 30).
×
975
                WithIndexCacheSize(1 << 30).
×
976
                WithNamespaceOffset(x.NamespaceOffset) // We don't want to see the banned data.
×
977

×
978
        x.AssertTruef(len(bopts.Dir) > 0, "No posting or wal dir specified.")
×
979
        fmt.Printf("Opening DB: %s\n", bopts.Dir)
×
980

×
981
        db, err := badger.OpenManaged(bopts)
×
982
        x.Check(err)
×
983
        // Not using posting list cache
×
984
        posting.Init(db, 0)
×
985
        defer db.Close()
×
986

×
987
        printSummary(db)
×
988
        if opt.onlySummary {
×
989
                return
×
990
        }
×
991

992
        // Commenting the following out because on large Badger DBs, this can take a LONG time.
993
        // min, max := getMinMax(db, opt.readTs)
994
        // fmt.Printf("Min commit: %d. Max commit: %d, w.r.t %d\n", min, max, opt.readTs)
995

996
        switch {
×
997
        case len(opt.rollupKey) > 0:
×
998
                rollupKey(db)
×
999
        case len(opt.keyLookup) > 0:
×
1000
                lookup(db)
×
1001
        case len(opt.jepsen) > 0:
×
1002
                jepsen(db)
×
1003
        case opt.vals:
×
1004
                total := seekTotal(db, opt.readTs)
×
1005
                fmt.Printf("Total: %d\n", total)
×
1006
        case opt.sizeHistogram:
×
1007
                sizeHistogram(db)
×
1008
        default:
×
1009
                printKeys(db)
×
1010
        }
1011
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc