• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 4736860513

18 Apr 2023 09:46PM UTC coverage: 67.279% (+0.008%) from 67.271%
4736860513

push

GitHub
fix(changelog): lower case naming convention (#8795)

58414 of 86823 relevant lines covered (67.28%)

2233274.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

4.43
/dgraph/cmd/debug/run.go
1
/*
2
 * Copyright 2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package debug
18

19
import (
20
        "bufio"
21
        "bytes"
22
        "context"
23
        "encoding/hex"
24
        "fmt"
25
        "io"
26
        "log"
27
        "math"
28
        "net/http"
29
        _ "net/http/pprof" // http profiler
30
        "os"
31
        "sort"
32
        "strconv"
33
        "strings"
34
        "sync/atomic"
35

36
        "github.com/dustin/go-humanize"
37
        "github.com/spf13/cobra"
38

39
        "github.com/dgraph-io/badger/v4"
40
        bpb "github.com/dgraph-io/badger/v4/pb"
41
        "github.com/dgraph-io/dgraph/codec"
42
        "github.com/dgraph-io/dgraph/ee"
43
        "github.com/dgraph-io/dgraph/posting"
44
        "github.com/dgraph-io/dgraph/protos/pb"
45
        "github.com/dgraph-io/dgraph/raftwal"
46
        "github.com/dgraph-io/dgraph/types"
47
        "github.com/dgraph-io/dgraph/x"
48
        "github.com/dgraph-io/ristretto/z"
49
)
50

51
var (
52
        // Debug is the sub-command invoked when calling "dgraph debug"
53
        Debug x.SubCommand
54
        opt   flagOptions
55
)
56

57
type flagOptions struct {
58
        vals          bool
59
        keyLookup     string
60
        rollupKey     string
61
        keyHistory    bool
62
        predicate     string
63
        prefix        string
64
        readOnly      bool
65
        pdir          string
66
        itemMeta      bool
67
        jepsen        string
68
        readTs        uint64
69
        sizeHistogram bool
70
        noKeys        bool
71
        key           x.Sensitive
72
        onlySummary   bool
73

74
        // Options related to the WAL.
75
        wdir           string
76
        wtruncateUntil uint64
77
        wsetSnapshot   string
78
}
79

80
func init() {
155✔
81
        Debug.Cmd = &cobra.Command{
155✔
82
                Use:   "debug",
155✔
83
                Short: "Debug Dgraph instance",
155✔
84
                Run: func(cmd *cobra.Command, args []string) {
155✔
85
                        run()
×
86
                },
×
87
                Annotations: map[string]string{"group": "debug"},
88
        }
89
        Debug.Cmd.SetHelpTemplate(x.NonRootTemplate)
155✔
90

155✔
91
        flag := Debug.Cmd.Flags()
155✔
92
        flag.BoolVar(&opt.itemMeta, "item", true, "Output item meta as well. Set to false for diffs.")
155✔
93
        flag.BoolVar(&opt.vals, "vals", false, "Output values along with keys.")
155✔
94
        flag.BoolVar(&opt.noKeys, "nokeys", false,
155✔
95
                "Ignore key_. Only consider amount when calculating total.")
155✔
96
        flag.StringVar(&opt.jepsen, "jepsen", "", "Disect Jepsen output. Can be linear/binary.")
155✔
97
        flag.Uint64Var(&opt.readTs, "at", math.MaxUint64, "Set read timestamp for all txns.")
155✔
98
        flag.BoolVarP(&opt.readOnly, "readonly", "o", true, "Open in read only mode.")
155✔
99
        flag.StringVarP(&opt.predicate, "pred", "r", "", "Only output specified predicate.")
155✔
100
        flag.StringVarP(&opt.prefix, "prefix", "", "", "Uses a hex prefix.")
155✔
101
        flag.StringVarP(&opt.keyLookup, "lookup", "l", "", "Hex of key to lookup.")
155✔
102
        flag.StringVar(&opt.rollupKey, "rollup", "", "Hex of key to rollup.")
155✔
103
        flag.BoolVarP(&opt.keyHistory, "history", "y", false, "Show all versions of a key.")
155✔
104
        flag.StringVarP(&opt.pdir, "postings", "p", "", "Directory where posting lists are stored.")
155✔
105
        flag.BoolVar(&opt.sizeHistogram, "histogram", false,
155✔
106
                "Show a histogram of the key and value sizes.")
155✔
107
        flag.BoolVar(&opt.onlySummary, "only-summary", false,
155✔
108
                "If true, only show the summary of the p directory.")
155✔
109

155✔
110
        // Flags related to WAL.
155✔
111
        flag.StringVarP(&opt.wdir, "wal", "w", "", "Directory where Raft write-ahead logs are stored.")
155✔
112
        flag.Uint64VarP(&opt.wtruncateUntil, "truncate", "t", 0,
155✔
113
                "Remove data from Raft entries until but not including this index.")
155✔
114
        flag.StringVarP(&opt.wsetSnapshot, "snap", "s", "",
155✔
115
                "Set snapshot term,index,readts to this. Value must be comma-separated list containing"+
155✔
116
                        " the value for these vars in that order.")
155✔
117
        ee.RegisterEncFlag(flag)
155✔
118
}
119

120
func toInt(o *pb.Posting) int {
×
121
        from := types.Val{
×
122
                Tid:   types.TypeID(o.ValType),
×
123
                Value: o.Value,
×
124
        }
×
125
        out, err := types.Convert(from, types.StringID)
×
126
        x.Check(err)
×
127
        val := out.Value.(string)
×
128
        a, err := strconv.Atoi(val)
×
129
        if err != nil {
×
130
                return 0
×
131
        }
×
132
        return a
×
133
}
134

135
func uidToVal(itr *badger.Iterator, prefix string) map[uint64]int {
×
136
        keys := make(map[uint64]int)
×
137
        var lastKey []byte
×
138
        for itr.Rewind(); itr.Valid(); {
×
139
                item := itr.Item()
×
140
                if bytes.Equal(lastKey, item.Key()) {
×
141
                        itr.Next()
×
142
                        continue
×
143
                }
144
                lastKey = append(lastKey[:0], item.Key()...)
×
145
                pk, err := x.Parse(item.Key())
×
146
                x.Check(err)
×
147
                if !pk.IsData() || !strings.HasPrefix(x.ParseAttr(pk.Attr), prefix) {
×
148
                        continue
×
149
                }
150
                if pk.IsSchema() {
×
151
                        continue
×
152
                }
153
                if pk.StartUid > 0 {
×
154
                        // This key is part of a multi-part posting list. Skip it and only read
×
155
                        // the main key, which is the entry point to read the whole list.
×
156
                        continue
×
157
                }
158

159
                pl, err := posting.ReadPostingList(item.KeyCopy(nil), itr)
×
160
                if err != nil {
×
161
                        log.Fatalf("Unable to read posting list: %v", err)
×
162
                }
×
163
                err = pl.Iterate(math.MaxUint64, 0, func(o *pb.Posting) error {
×
164
                        from := types.Val{
×
165
                                Tid:   types.TypeID(o.ValType),
×
166
                                Value: o.Value,
×
167
                        }
×
168
                        out, err := types.Convert(from, types.StringID)
×
169
                        x.Check(err)
×
170
                        key := out.Value.(string)
×
171
                        k, err := strconv.Atoi(key)
×
172
                        x.Check(err)
×
173
                        keys[pk.Uid] = k
×
174
                        // fmt.Printf("Type: %v Uid=%d key=%s. commit=%d hex %x\n",
×
175
                        //         o.ValType, pk.Uid, key, o.CommitTs, lastKey)
×
176
                        return nil
×
177
                })
×
178
                x.Checkf(err, "during iterate")
×
179
        }
180
        return keys
×
181
}
182

183
func seekTotal(db *badger.DB, readTs uint64) int {
×
184
        txn := db.NewTransactionAt(readTs, false)
×
185
        defer txn.Discard()
×
186

×
187
        iopt := badger.DefaultIteratorOptions
×
188
        iopt.AllVersions = true
×
189
        iopt.PrefetchValues = false
×
190
        itr := txn.NewIterator(iopt)
×
191
        defer itr.Close()
×
192

×
193
        keys := uidToVal(itr, "key_")
×
194
        fmt.Printf("Got keys: %+v\n", keys)
×
195
        vals := uidToVal(itr, "amount_")
×
196
        var total int
×
197
        for _, val := range vals {
×
198
                total += val
×
199
        }
×
200
        fmt.Printf("Got vals: %+v. Total: %d\n", vals, total)
×
201
        if opt.noKeys {
×
202
                // Ignore the key_ predicate. Only consider the amount_ predicate. Useful when tablets are
×
203
                // being moved around.
×
204
                keys = vals
×
205
        }
×
206

207
        total = 0
×
208
        for uid, key := range keys {
×
209
                a := vals[uid]
×
210
                fmt.Printf("uid: %-5d %x key: %d amount: %d\n", uid, uid, key, a)
×
211
                total += a
×
212
        }
×
213
        fmt.Printf("Total @ %d = %d\n", readTs, total)
×
214
        return total
×
215
}
216

217
func findFirstValidTxn(db *badger.DB) uint64 {
×
218
        readTs := opt.readTs
×
219
        var wrong uint64
×
220
        for {
×
221
                min, max := getMinMax(db, readTs-1)
×
222
                if max <= min {
×
223
                        fmt.Printf("Can't find it. Max: %d\n", max)
×
224
                        return 0
×
225
                }
×
226
                readTs = max
×
227
                if total := seekTotal(db, readTs); total != 100 {
×
228
                        fmt.Printf("===> VIOLATION at ts: %d\n", readTs)
×
229
                        showAllPostingsAt(db, readTs)
×
230
                        wrong = readTs
×
231
                } else {
×
232
                        fmt.Printf("===> Found first correct version at %d\n", readTs)
×
233
                        showAllPostingsAt(db, readTs)
×
234
                        return wrong
×
235
                }
×
236
        }
237
}
238

239
func findFirstInvalidTxn(db *badger.DB, lowTs, highTs uint64) uint64 {
×
240
        fmt.Println()
×
241
        if highTs-lowTs < 1 {
×
242
                fmt.Printf("Checking at lowTs: %d\n", lowTs)
×
243
                if total := seekTotal(db, lowTs); total != 100 {
×
244
                        fmt.Printf("==> VIOLATION at ts: %d\n", lowTs)
×
245
                        return lowTs
×
246
                }
×
247
                fmt.Printf("No violation found at ts: %d\n", lowTs)
×
248
                return 0
×
249
        }
250

251
        midTs := (lowTs + highTs) / 2
×
252
        fmt.Printf("Checking. low=%d. high=%d. mid=%d\n", lowTs, highTs, midTs)
×
253
        if total := seekTotal(db, midTs); total == 100 {
×
254
                // If no failure, move to higher ts.
×
255
                return findFirstInvalidTxn(db, midTs+1, highTs)
×
256
        }
×
257
        // Found an error.
258
        return findFirstInvalidTxn(db, lowTs, midTs)
×
259
}
260

261
func showAllPostingsAt(db *badger.DB, readTs uint64) {
×
262
        txn := db.NewTransactionAt(readTs, false)
×
263
        defer txn.Discard()
×
264

×
265
        itr := txn.NewIterator(badger.DefaultIteratorOptions)
×
266
        defer itr.Close()
×
267

×
268
        type account struct {
×
269
                Key int
×
270
                Amt int
×
271
        }
×
272
        keys := make(map[uint64]*account)
×
273

×
274
        var buf bytes.Buffer
×
275
        fmt.Fprintf(&buf, "SHOWING all postings at %d\n", readTs)
×
276
        for itr.Rewind(); itr.Valid(); itr.Next() {
×
277
                item := itr.Item()
×
278
                if item.Version() != readTs {
×
279
                        continue
×
280
                }
281

282
                pk, err := x.Parse(item.Key())
×
283
                x.Check(err)
×
284
                if !pk.IsData() {
×
285
                        continue
×
286
                }
287

288
                var acc *account
×
289
                attr := x.ParseAttr(pk.Attr)
×
290
                if strings.HasPrefix(attr, "key_") || strings.HasPrefix(attr, "amount_") {
×
291
                        var has bool
×
292
                        acc, has = keys[pk.Uid]
×
293
                        if !has {
×
294
                                acc = &account{}
×
295
                                keys[pk.Uid] = acc
×
296
                        }
×
297
                }
298
                fmt.Fprintf(&buf, "  key: %+v hex: %x\n", pk, item.Key())
×
299
                val, err := item.ValueCopy(nil)
×
300
                x.Check(err)
×
301
                var plist pb.PostingList
×
302
                x.Check(plist.Unmarshal(val))
×
303

×
304
                x.AssertTrue(len(plist.Postings) <= 1)
×
305
                var num int
×
306
                for _, p := range plist.Postings {
×
307
                        num = toInt(p)
×
308
                        appendPosting(&buf, p)
×
309
                }
×
310
                if num > 0 && acc != nil {
×
311
                        switch {
×
312
                        case strings.HasPrefix(attr, "key_"):
×
313
                                acc.Key = num
×
314
                        case strings.HasPrefix(attr, "amount_"):
×
315
                                acc.Amt = num
×
316
                        }
317
                }
318
        }
319
        for uid, acc := range keys {
×
320
                fmt.Fprintf(&buf, "Uid: %d %x Key: %d Amount: %d\n", uid, uid, acc.Key, acc.Amt)
×
321
        }
×
322
        fmt.Println(buf.String())
×
323
}
324

325
func getMinMax(db *badger.DB, readTs uint64) (uint64, uint64) {
×
326
        var min, max uint64 = math.MaxUint64, 0
×
327
        txn := db.NewTransactionAt(readTs, false)
×
328
        defer txn.Discard()
×
329

×
330
        iopt := badger.DefaultIteratorOptions
×
331
        iopt.AllVersions = true
×
332
        itr := txn.NewIterator(iopt)
×
333
        defer itr.Close()
×
334

×
335
        for itr.Rewind(); itr.Valid(); itr.Next() {
×
336
                item := itr.Item()
×
337
                if min > item.Version() {
×
338
                        min = item.Version()
×
339
                }
×
340
                if max < item.Version() {
×
341
                        max = item.Version()
×
342
                }
×
343
        }
344
        return min, max
×
345
}
346

347
func jepsen(db *badger.DB) {
×
348
        min, max := getMinMax(db, opt.readTs)
×
349
        fmt.Printf("min=%d. max=%d\n", min, max)
×
350

×
351
        var ts uint64
×
352
        switch opt.jepsen {
×
353
        case "binary":
×
354
                ts = findFirstInvalidTxn(db, min, max)
×
355
        case "linear":
×
356
                ts = findFirstValidTxn(db)
×
357
        }
358
        fmt.Println()
×
359
        if ts == 0 {
×
360
                fmt.Println("Nothing found. Exiting.")
×
361
                return
×
362
        }
×
363
        showAllPostingsAt(db, ts)
×
364
        seekTotal(db, ts-1)
×
365

×
366
        for i := 0; i < 5; i++ {
×
367
                // Get a few previous commits.
×
368
                _, ts = getMinMax(db, ts-1)
×
369
                showAllPostingsAt(db, ts)
×
370
                seekTotal(db, ts-1)
×
371
        }
×
372
}
373

374
func history(lookup []byte, itr *badger.Iterator) {
×
375
        var buf bytes.Buffer
×
376
        pk, err := x.Parse(lookup)
×
377
        x.Check(err)
×
378
        fmt.Fprintf(&buf, "==> key: %x. PK: %+v\n", lookup, pk)
×
379
        for ; itr.Valid(); itr.Next() {
×
380
                item := itr.Item()
×
381
                if !bytes.Equal(item.Key(), lookup) {
×
382
                        break
×
383
                }
384

385
                fmt.Fprintf(&buf, "ts: %d", item.Version())
×
386
                x.Check2(buf.WriteString(" {item}"))
×
387
                if item.IsDeletedOrExpired() {
×
388
                        x.Check2(buf.WriteString("{deleted}"))
×
389
                }
×
390
                if item.DiscardEarlierVersions() {
×
391
                        x.Check2(buf.WriteString("{discard}"))
×
392
                }
×
393
                val, err := item.ValueCopy(nil)
×
394
                x.Check(err)
×
395

×
396
                meta := item.UserMeta()
×
397
                if meta&posting.BitCompletePosting > 0 {
×
398
                        x.Check2(buf.WriteString("{complete}"))
×
399
                }
×
400
                if meta&posting.BitDeltaPosting > 0 {
×
401
                        x.Check2(buf.WriteString("{delta}"))
×
402
                }
×
403
                if meta&posting.BitEmptyPosting > 0 {
×
404
                        x.Check2(buf.WriteString("{empty}"))
×
405
                }
×
406
                fmt.Fprintln(&buf)
×
407
                if meta&posting.BitDeltaPosting > 0 {
×
408
                        plist := &pb.PostingList{}
×
409
                        x.Check(plist.Unmarshal(val))
×
410
                        for _, p := range plist.Postings {
×
411
                                appendPosting(&buf, p)
×
412
                        }
×
413
                }
414
                if meta&posting.BitCompletePosting > 0 {
×
415
                        var plist pb.PostingList
×
416
                        x.Check(plist.Unmarshal(val))
×
417

×
418
                        for _, p := range plist.Postings {
×
419
                                appendPosting(&buf, p)
×
420
                        }
×
421

422
                        fmt.Fprintf(&buf, " Num uids = %d. Size = %d\n",
×
423
                                codec.ExactLen(plist.Pack), plist.Pack.Size())
×
424
                        dec := codec.Decoder{Pack: plist.Pack}
×
425
                        for uids := dec.Seek(0, codec.SeekStart); len(uids) > 0; uids = dec.Next() {
×
426
                                for _, uid := range uids {
×
427
                                        fmt.Fprintf(&buf, " Uid = %d\n", uid)
×
428
                                }
×
429
                        }
430
                }
431
                x.Check2(buf.WriteString("\n"))
×
432
        }
433
        fmt.Println(buf.String())
×
434
}
435

436
func appendPosting(w io.Writer, o *pb.Posting) {
×
437
        fmt.Fprintf(w, " Uid: %d Op: %d ", o.Uid, o.Op)
×
438

×
439
        if len(o.Value) > 0 {
×
440
                fmt.Fprintf(w, " Type: %v. ", o.ValType)
×
441
                from := types.Val{
×
442
                        Tid:   types.TypeID(o.ValType),
×
443
                        Value: o.Value,
×
444
                }
×
445
                out, err := types.Convert(from, types.StringID)
×
446
                if err != nil {
×
447
                        fmt.Fprintf(w, " Value: %q Error: %v", o.Value, err)
×
448
                } else {
×
449
                        fmt.Fprintf(w, " String Value: %q", out.Value)
×
450
                }
×
451
        }
452
        fmt.Fprintln(w, "")
×
453
}
454
func rollupKey(db *badger.DB) {
×
455
        txn := db.NewTransactionAt(opt.readTs, false)
×
456
        defer txn.Discard()
×
457

×
458
        key, err := hex.DecodeString(opt.rollupKey)
×
459
        x.Check(err)
×
460

×
461
        iopts := badger.DefaultIteratorOptions
×
462
        iopts.AllVersions = true
×
463
        iopts.PrefetchValues = false
×
464
        itr := txn.NewKeyIterator(key, iopts)
×
465
        defer itr.Close()
×
466

×
467
        itr.Rewind()
×
468
        if !itr.Valid() {
×
469
                log.Fatalf("Unable to seek to key: %s", hex.Dump(key))
×
470
        }
×
471

472
        item := itr.Item()
×
473
        // Don't need to do anything if the bitdelta is not set.
×
474
        if item.UserMeta()&posting.BitDeltaPosting == 0 {
×
475
                fmt.Printf("First item has UserMeta:[b%04b]. Nothing to do\n", item.UserMeta())
×
476
                return
×
477
        }
×
478
        pl, err := posting.ReadPostingList(item.KeyCopy(nil), itr)
×
479
        x.Check(err)
×
480

×
481
        alloc := z.NewAllocator(32<<20, "Debug.RollupKey")
×
482
        defer alloc.Release()
×
483

×
484
        // Setting kvs at their original value as we can't give a new timestamp in debug mode.
×
485
        kvs, err := pl.Rollup(alloc, math.MaxUint64)
×
486
        x.Check(err)
×
487

×
488
        wb := db.NewManagedWriteBatch()
×
489
        x.Check(wb.WriteList(&bpb.KVList{Kv: kvs}))
×
490
        x.Check(wb.Flush())
×
491
}
492

493
func lookup(db *badger.DB) {
×
494
        txn := db.NewTransactionAt(opt.readTs, false)
×
495
        defer txn.Discard()
×
496

×
497
        iopts := badger.DefaultIteratorOptions
×
498
        iopts.AllVersions = true
×
499
        iopts.PrefetchValues = false
×
500
        itr := txn.NewIterator(iopts)
×
501
        defer itr.Close()
×
502

×
503
        key, err := hex.DecodeString(opt.keyLookup)
×
504
        if err != nil {
×
505
                log.Fatal(err)
×
506
        }
×
507
        itr.Seek(key)
×
508
        if !itr.Valid() {
×
509
                log.Fatalf("Unable to seek to key: %s", hex.Dump(key))
×
510
        }
×
511

512
        if opt.keyHistory {
×
513
                history(key, itr)
×
514
                return
×
515
        }
×
516

517
        item := itr.Item()
×
518
        pl, err := posting.ReadPostingList(item.KeyCopy(nil), itr)
×
519
        if err != nil {
×
520
                log.Fatal(err)
×
521
        }
×
522
        var buf bytes.Buffer
×
523
        fmt.Fprintf(&buf, " Key: %x", item.Key())
×
524
        fmt.Fprintf(&buf, " Length: %d", pl.Length(math.MaxUint64, 0))
×
525

×
526
        splits := pl.PartSplits()
×
527
        isMultiPart := len(splits) > 0
×
528
        fmt.Fprintf(&buf, " Is multi-part list? %v", isMultiPart)
×
529
        if isMultiPart {
×
530
                fmt.Fprintf(&buf, " Start UID of parts: %v\n", splits)
×
531
        }
×
532

533
        err = pl.Iterate(math.MaxUint64, 0, func(o *pb.Posting) error {
×
534
                appendPosting(&buf, o)
×
535
                return nil
×
536
        })
×
537
        if err != nil {
×
538
                log.Fatal(err)
×
539
        }
×
540
        fmt.Println(buf.String())
×
541
}
542

543
// Current format is like:
544
// {i} attr: name term: [8] woods  ts: 535 item: [28, b0100] sz: 81 dcnt: 3 key: 00000...6f6f6473
545
// Fix the TestBulkLoadMultiShard accordingly, if the format changes.
546
func printKeys(db *badger.DB) {
×
547
        var prefix []byte
×
548
        if len(opt.predicate) > 0 {
×
549
                prefix = x.PredicatePrefix(opt.predicate)
×
550
        } else if len(opt.prefix) > 0 {
×
551
                p, err := hex.DecodeString(opt.prefix)
×
552
                x.Check(err)
×
553
                prefix = p
×
554
        }
×
555
        fmt.Printf("prefix = %s\n", hex.Dump(prefix))
×
556
        stream := db.NewStreamAt(opt.readTs)
×
557
        stream.Prefix = prefix
×
558
        var total uint64
×
559
        stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
×
560
                item := itr.Item()
×
561
                pk, err := x.Parse(key)
×
562
                x.Check(err)
×
563
                var buf bytes.Buffer
×
564
                // Don't use a switch case here. Because multiple of these can be true. In particular,
×
565
                // IsSchema can be true alongside IsData.
×
566
                if pk.IsData() {
×
567
                        x.Check2(buf.WriteString("{d}"))
×
568
                }
×
569
                if pk.IsIndex() {
×
570
                        x.Check2(buf.WriteString("{i}"))
×
571
                }
×
572
                if pk.IsCountOrCountRev() {
×
573
                        x.Check2(buf.WriteString("{c}"))
×
574
                }
×
575
                if pk.IsSchema() {
×
576
                        x.Check2(buf.WriteString("{s}"))
×
577
                }
×
578
                if pk.IsReverse() {
×
579
                        x.Check2(buf.WriteString("{r}"))
×
580
                }
×
581
                ns, attr := x.ParseNamespaceAttr(pk.Attr)
×
582
                x.Check2(buf.WriteString(fmt.Sprintf(" ns: %#x ", ns)))
×
583
                x.Check2(buf.WriteString(" attr: " + attr))
×
584
                if len(pk.Term) > 0 {
×
585
                        fmt.Fprintf(&buf, " term: [%d] %s ", pk.Term[0], pk.Term[1:])
×
586
                }
×
587
                if pk.Uid > 0 {
×
588
                        fmt.Fprintf(&buf, " uid: %d ", pk.Uid)
×
589
                }
×
590
                if pk.StartUid > 0 {
×
591
                        fmt.Fprintf(&buf, " startUid: %d ", pk.StartUid)
×
592
                }
×
593

594
                if opt.itemMeta {
×
595
                        fmt.Fprintf(&buf, " ts: %d", item.Version())
×
596
                        fmt.Fprintf(&buf, " item: [%d, b%04b]", item.EstimatedSize(), item.UserMeta())
×
597
                }
×
598

599
                var sz, deltaCount int64
×
600
        LOOP:
×
601
                for ; itr.ValidForPrefix(prefix); itr.Next() {
×
602
                        item := itr.Item()
×
603
                        if !bytes.Equal(item.Key(), key) {
×
604
                                break
×
605
                        }
606
                        if item.IsDeletedOrExpired() {
×
607
                                x.Check2(buf.WriteString(" {v.del}"))
×
608
                                break
×
609
                        }
610
                        switch item.UserMeta() {
×
611
                        // This is rather a default case as one of the 4 bit must be set.
612
                        case posting.BitCompletePosting, posting.BitEmptyPosting, posting.BitSchemaPosting:
×
613
                                sz += item.EstimatedSize()
×
614
                                break LOOP
×
615
                        case posting.BitDeltaPosting:
×
616
                                sz += item.EstimatedSize()
×
617
                                deltaCount++
×
618
                        default:
×
619
                                fmt.Printf("No user meta found for key: %s\n", hex.EncodeToString(key))
×
620
                        }
621
                        if item.DiscardEarlierVersions() {
×
622
                                x.Check2(buf.WriteString(" {v.las}"))
×
623
                                break
×
624
                        }
625
                }
626
                var invalidSz, invalidCount uint64
×
627
                // skip all the versions of key
×
628
                for ; itr.ValidForPrefix(prefix); itr.Next() {
×
629
                        item := itr.Item()
×
630
                        if !bytes.Equal(item.Key(), key) {
×
631
                                break
×
632
                        }
633
                        invalidSz += uint64(item.EstimatedSize())
×
634
                        invalidCount++
×
635
                }
636

637
                fmt.Fprintf(&buf, " sz: %d dcnt: %d", sz, deltaCount)
×
638
                if invalidCount > 0 {
×
639
                        fmt.Fprintf(&buf, " isz: %d icount: %d", invalidSz, invalidCount)
×
640
                }
×
641
                fmt.Fprintf(&buf, " key: %s", hex.EncodeToString(key))
×
642
                // If total size is more than 1 GB or we have more than 1 million keys, flag this key.
×
643
                if uint64(sz)+invalidSz > (1<<30) || uint64(deltaCount)+invalidCount > 10e6 {
×
644
                        fmt.Fprintf(&buf, " [HEAVY]")
×
645
                }
×
646
                buf.WriteRune('\n')
×
647
                list := &bpb.KVList{}
×
648
                list.Kv = append(list.Kv, &bpb.KV{
×
649
                        Value: buf.Bytes(),
×
650
                })
×
651
                // Don't call fmt.Println here. It is much slower.
×
652
                return list, nil
×
653
        }
654

655
        w := bufio.NewWriterSize(os.Stdout, 16<<20)
×
656
        stream.Send = func(buf *z.Buffer) error {
×
657
                var count int
×
658
                err := buf.SliceIterate(func(s []byte) error {
×
659
                        var kv bpb.KV
×
660
                        if err := kv.Unmarshal(s); err != nil {
×
661
                                return err
×
662
                        }
×
663
                        x.Check2(w.Write(kv.Value))
×
664
                        count++
×
665
                        return nil
×
666
                })
667
                atomic.AddUint64(&total, uint64(count))
×
668
                return err
×
669
        }
670
        x.Check(stream.Orchestrate(context.Background()))
×
671
        x.Check(w.Flush())
×
672
        fmt.Println()
×
673
        fmt.Printf("Found %d keys\n", atomic.LoadUint64(&total))
×
674
}
675

676
// Creates bounds for an histogram. The bounds are powers of two of the form
677
// [2^min_exponent, ..., 2^max_exponent].
678
func getHistogramBounds(minExponent, maxExponent uint32) []float64 {
×
679
        var bounds []float64
×
680
        for i := minExponent; i <= maxExponent; i++ {
×
681
                bounds = append(bounds, float64(int(1)<<i))
×
682
        }
×
683
        return bounds
×
684
}
685

686
// HistogramData stores the information needed to represent the sizes of the keys and values
687
// as a histogram.
688
type HistogramData struct {
689
        Bounds         []float64
690
        Count          int64
691
        CountPerBucket []int64
692
        Min            int64
693
        Max            int64
694
        Sum            int64
695
}
696

697
// NewHistogramData returns a new instance of HistogramData with properly initialized fields.
698
func NewHistogramData(bounds []float64) *HistogramData {
×
699
        return &HistogramData{
×
700
                Bounds:         bounds,
×
701
                CountPerBucket: make([]int64, len(bounds)+1),
×
702
                Max:            0,
×
703
                Min:            math.MaxInt64,
×
704
        }
×
705
}
×
706

707
// Update changes the Min and Max fields if value is less than or greater than the current values.
708
func (histogram *HistogramData) Update(value int64) {
×
709
        if value > histogram.Max {
×
710
                histogram.Max = value
×
711
        }
×
712
        if value < histogram.Min {
×
713
                histogram.Min = value
×
714
        }
×
715

716
        histogram.Sum += value
×
717
        histogram.Count++
×
718

×
719
        for index := 0; index <= len(histogram.Bounds); index++ {
×
720
                // Allocate value in the last buckets if we reached the end of the Bounds array.
×
721
                if index == len(histogram.Bounds) {
×
722
                        histogram.CountPerBucket[index]++
×
723
                        break
×
724
                }
725

726
                if value < int64(histogram.Bounds[index]) {
×
727
                        histogram.CountPerBucket[index]++
×
728
                        break
×
729
                }
730
        }
731
}
732

733
// PrintHistogram prints the histogram data in a human-readable format.
734
func (histogram *HistogramData) PrintHistogram() {
×
735
        if histogram == nil {
×
736
                return
×
737
        }
×
738

739
        fmt.Printf("Min value: %d\n", histogram.Min)
×
740
        fmt.Printf("Max value: %d\n", histogram.Max)
×
741
        fmt.Printf("Mean: %.2f\n", float64(histogram.Sum)/float64(histogram.Count))
×
742
        fmt.Printf("%24s %9s\n", "Range", "Count")
×
743

×
744
        numBounds := len(histogram.Bounds)
×
745
        for index, count := range histogram.CountPerBucket {
×
746
                if count == 0 {
×
747
                        continue
×
748
                }
749

750
                // The last bucket represents the bucket that contains the range from
751
                // the last bound up to infinity so it's processed differently than the
752
                // other buckets.
753
                if index == len(histogram.CountPerBucket)-1 {
×
754
                        lowerBound := int(histogram.Bounds[numBounds-1])
×
755
                        fmt.Printf("[%10d, %10s) %9d\n", lowerBound, "infinity", count)
×
756
                        continue
×
757
                }
758

759
                upperBound := int(histogram.Bounds[index])
×
760
                lowerBound := 0
×
761
                if index > 0 {
×
762
                        lowerBound = int(histogram.Bounds[index-1])
×
763
                }
×
764

765
                fmt.Printf("[%10d, %10d) %9d\n", lowerBound, upperBound, count)
×
766
        }
767
}
768

769
func sizeHistogram(db *badger.DB) {
×
770
        txn := db.NewTransactionAt(opt.readTs, false)
×
771
        defer txn.Discard()
×
772

×
773
        iopts := badger.DefaultIteratorOptions
×
774
        iopts.PrefetchValues = false
×
775
        itr := txn.NewIterator(iopts)
×
776
        defer itr.Close()
×
777

×
778
        // Generate distribution bounds. Key sizes are not greater than 2^16 while
×
779
        // value sizes are not greater than 1GB (2^30).
×
780
        keyBounds := getHistogramBounds(5, 16)
×
781
        valueBounds := getHistogramBounds(5, 30)
×
782

×
783
        // Initialize exporter.
×
784
        keySizeHistogram := NewHistogramData(keyBounds)
×
785
        valueSizeHistogram := NewHistogramData(valueBounds)
×
786

×
787
        // Collect key and value sizes.
×
788
        var prefix []byte
×
789
        if len(opt.predicate) > 0 {
×
790
                prefix = x.PredicatePrefix(opt.predicate)
×
791
        }
×
792
        var loop int
×
793
        for itr.Seek(prefix); itr.ValidForPrefix(prefix); itr.Next() {
×
794
                item := itr.Item()
×
795

×
796
                keySizeHistogram.Update(int64(len(item.Key())))
×
797
                valueSizeHistogram.Update(item.ValueSize())
×
798

×
799
                loop++
×
800
        }
×
801

802
        fmt.Printf("prefix = %s\n", hex.Dump(prefix))
×
803
        fmt.Printf("Found %d keys\n", loop)
×
804
        fmt.Printf("\nHistogram of key sizes (in bytes)\n")
×
805
        keySizeHistogram.PrintHistogram()
×
806
        fmt.Printf("\nHistogram of value sizes (in bytes)\n")
×
807
        valueSizeHistogram.PrintHistogram()
×
808
}
809

810
func printAlphaProposal(buf *bytes.Buffer, pr *pb.Proposal, pending map[uint64]bool) {
×
811
        if pr == nil {
×
812
                return
×
813
        }
×
814

815
        switch {
×
816
        case pr.Mutations != nil:
×
817
                fmt.Fprintf(buf, " Mutation . StartTs: %d . Edges: %d .",
×
818
                        pr.Mutations.StartTs, len(pr.Mutations.Edges))
×
819
                if len(pr.Mutations.Edges) > 0 {
×
820
                        pending[pr.Mutations.StartTs] = true
×
821
                } else {
×
822
                        fmt.Fprintf(buf, " Mutation: %+v .", pr.Mutations)
×
823
                }
×
824
                fmt.Fprintf(buf, " Pending txns: %d .", len(pending))
×
825
        case len(pr.Kv) > 0:
×
826
                fmt.Fprintf(buf, " KV . Size: %d ", len(pr.Kv))
×
827
        case pr.State != nil:
×
828
                fmt.Fprintf(buf, " State . %+v ", pr.State)
×
829
        case pr.Delta != nil:
×
830
                fmt.Fprintf(buf, " Delta .")
×
831
                sort.Slice(pr.Delta.Txns, func(i, j int) bool {
×
832
                        ti := pr.Delta.Txns[i]
×
833
                        tj := pr.Delta.Txns[j]
×
834
                        return ti.StartTs < tj.StartTs
×
835
                })
×
836
                fmt.Fprintf(buf, " Max: %d .", pr.Delta.GetMaxAssigned())
×
837
                for _, txn := range pr.Delta.Txns {
×
838
                        delete(pending, txn.StartTs)
×
839
                }
×
840
                // There could be many thousands of txns within a single delta. We
841
                // don't need to print out every single entry, so just show the
842
                // first 10.
843
                if len(pr.Delta.Txns) >= 10 {
×
844
                        fmt.Fprintf(buf, " Num txns: %d .", len(pr.Delta.Txns))
×
845
                        pr.Delta.Txns = pr.Delta.Txns[:10]
×
846
                }
×
847
                for _, txn := range pr.Delta.Txns {
×
848
                        fmt.Fprintf(buf, " %d → %d .", txn.StartTs, txn.CommitTs)
×
849
                }
×
850
                fmt.Fprintf(buf, " Pending txns: %d .", len(pending))
×
851
        case pr.Snapshot != nil:
×
852
                fmt.Fprintf(buf, " Snapshot . %+v ", pr.Snapshot)
×
853
        }
854
}
855

856
func printZeroProposal(buf *bytes.Buffer, zpr *pb.ZeroProposal) {
×
857
        if zpr == nil {
×
858
                return
×
859
        }
×
860

861
        switch {
×
862
        case len(zpr.SnapshotTs) > 0:
×
863
                fmt.Fprintf(buf, " Snapshot: %+v .", zpr.SnapshotTs)
×
864
        case zpr.Member != nil:
×
865
                fmt.Fprintf(buf, " Member: %+v .", zpr.Member)
×
866
        case zpr.Tablet != nil:
×
867
                fmt.Fprintf(buf, " Tablet: %+v .", zpr.Tablet)
×
868
        case zpr.MaxUID > 0:
×
869
                fmt.Fprintf(buf, " MaxUID: %d .", zpr.MaxUID)
×
870
        case zpr.MaxNsID > 0:
×
871
                fmt.Fprintf(buf, " MaxNsID: %d .", zpr.MaxNsID)
×
872
        case zpr.MaxRaftId > 0:
×
873
                fmt.Fprintf(buf, " MaxRaftId: %d .", zpr.MaxRaftId)
×
874
        case zpr.MaxTxnTs > 0:
×
875
                fmt.Fprintf(buf, " MaxTxnTs: %d .", zpr.MaxTxnTs)
×
876
        case zpr.Txn != nil:
×
877
                txn := zpr.Txn
×
878
                fmt.Fprintf(buf, " Txn %d → %d .", txn.StartTs, txn.CommitTs)
×
879
        default:
×
880
                fmt.Fprintf(buf, " Proposal: %+v .", zpr)
×
881
        }
882
}
883

884
func printSummary(db *badger.DB) {
×
885
        nsFromKey := func(key []byte) uint64 {
×
886
                pk, err := x.Parse(key)
×
887
                if err != nil {
×
888
                        // Some of the keys are badger's internal and couldn't be parsed.
×
889
                        // Hence, the error is expected in that case.
×
890
                        fmt.Printf("Unable to parse key: %#x\n", key)
×
891
                        return x.GalaxyNamespace
×
892
                }
×
893
                return x.ParseNamespace(pk.Attr)
×
894
        }
895
        banned := db.BannedNamespaces()
×
896
        bannedNs := make(map[uint64]struct{})
×
897
        for _, ns := range banned {
×
898
                bannedNs[ns] = struct{}{}
×
899
        }
×
900

901
        tables := db.Tables()
×
902
        levelSizes := make([]uint64, len(db.Levels()))
×
903
        nsSize := make(map[uint64]uint64)
×
904
        for _, tab := range tables {
×
905
                levelSizes[tab.Level] += uint64(tab.OnDiskSize)
×
906
                if nsFromKey(tab.Left) == nsFromKey(tab.Right) {
×
907
                        nsSize[nsFromKey(tab.Left)] += uint64(tab.OnDiskSize)
×
908
                }
×
909
        }
910

911
        fmt.Println("[SUMMARY]")
×
912
        totalSize := uint64(0)
×
913
        for i, sz := range levelSizes {
×
914
                fmt.Printf("Level %d size: %12s\n", i, humanize.IBytes(sz))
×
915
                totalSize += sz
×
916
        }
×
917
        fmt.Printf("Total SST size: %12s\n", humanize.IBytes(totalSize))
×
918
        fmt.Println()
×
919
        for ns, sz := range nsSize {
×
920
                fmt.Printf("Namespace %#x size: %12s", ns, humanize.IBytes(sz))
×
921
                if _, ok := bannedNs[ns]; ok {
×
922
                        fmt.Printf(" (banned)")
×
923
                }
×
924
                fmt.Println()
×
925
        }
926
        fmt.Println()
×
927
}
928

929
func run() {
×
930
        go func() {
×
931
                for i := 8080; i < 9080; i++ {
×
932
                        fmt.Printf("Listening for /debug HTTP requests at port: %d\n", i)
×
933
                        if err := http.ListenAndServe(fmt.Sprintf("localhost:%d", i), nil); err != nil {
×
934
                                fmt.Println("Port busy. Trying another one...")
×
935
                                continue
×
936
                        }
937
                }
938
        }()
939

940
        dir := opt.pdir
×
941
        isWal := false
×
942
        if len(dir) == 0 {
×
943
                dir = opt.wdir
×
944
                isWal = true
×
945
        }
×
946
        keys, err := ee.GetKeys(Debug.Conf)
×
947
        x.Check(err)
×
948
        opt.key = keys.EncKey
×
949

×
950
        if isWal {
×
951
                store, err := raftwal.InitEncrypted(dir, opt.key)
×
952
                x.Check(err)
×
953
                if err := handleWal(store); err != nil {
×
954
                        fmt.Printf("\nGot error while handling WAL: %v\n", err)
×
955
                }
×
956
                return
×
957
        }
958

959
        bopts := badger.DefaultOptions(dir).
×
960
                WithReadOnly(opt.readOnly).
×
961
                WithEncryptionKey(opt.key).
×
962
                WithBlockCacheSize(1 << 30).
×
963
                WithIndexCacheSize(1 << 30).
×
964
                WithNamespaceOffset(x.NamespaceOffset) // We don't want to see the banned data.
×
965

×
966
        x.AssertTruef(len(bopts.Dir) > 0, "No posting or wal dir specified.")
×
967
        fmt.Printf("Opening DB: %s\n", bopts.Dir)
×
968

×
969
        db, err := badger.OpenManaged(bopts)
×
970
        x.Check(err)
×
971
        // Not using posting list cache
×
972
        posting.Init(db, 0)
×
973
        defer db.Close()
×
974

×
975
        printSummary(db)
×
976
        if opt.onlySummary {
×
977
                return
×
978
        }
×
979

980
        // Commenting the following out because on large Badger DBs, this can take a LONG time.
981
        // min, max := getMinMax(db, opt.readTs)
982
        // fmt.Printf("Min commit: %d. Max commit: %d, w.r.t %d\n", min, max, opt.readTs)
983

984
        switch {
×
985
        case len(opt.rollupKey) > 0:
×
986
                rollupKey(db)
×
987
        case len(opt.keyLookup) > 0:
×
988
                lookup(db)
×
989
        case len(opt.jepsen) > 0:
×
990
                jepsen(db)
×
991
        case opt.vals:
×
992
                total := seekTotal(db, opt.readTs)
×
993
                fmt.Printf("Total: %d\n", total)
×
994
        case opt.sizeHistogram:
×
995
                sizeHistogram(db)
×
996
        default:
×
997
                printKeys(db)
×
998
        }
999
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc