• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ankur-anand / unisondb / 15026438913

14 May 2025 04:48PM UTC coverage: 82.399% (-0.2%) from 82.642%
15026438913

Pull #111

github

ankur-anand
fix: memtable test case for temp dir.
Pull Request #111: fix: memtable test case for temp dir.

5777 of 7011 relevant lines covered (82.4%)

13875.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.57
/internal/services/relayer/local.go
1
package relayer
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "io"
8
        "log/slog"
9
        "strconv"
10
        "time"
11

12
        "github.com/ankur-anand/unisondb/dbkernel"
13
        "github.com/ankur-anand/unisondb/pkg/replicator"
14
        "github.com/ankur-anand/unisondb/schemas/logrecord"
15
        v1 "github.com/ankur-anand/unisondb/schemas/proto/gen/go/unisondb/streamer/v1"
16
        "github.com/prometheus/client_golang/prometheus"
17
        "github.com/prometheus/client_golang/prometheus/promauto"
18
)
19

20
const segmentLagEmitThreshold = 3
21

22
var (
23
        localSegmentLagGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
24
                Namespace: "unisondb",
25
                Subsystem: "local_relayer",
26
                Name:      "wal_segment_lag",
27
                Help:      "Difference in segment IDs between upstream and noop replica",
28
                // id can really increment cardinality of the metrics, should only be used in test or controlled
29
                // env.
30
        }, []string{"namespace", "id"})
31

32
        localWALReplicatedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
33
                Namespace: "unisondb",
34
                Subsystem: "local_relayer",
35
                Name:      "wal_records_replicated_total",
36
                Help:      "Cumulative count of WAL records successfully replicated locally.",
37
        }, []string{"namespace"})
38
)
39

40
// LocalWalRelayer encodes all the parameter needed to start local relayer and used for testing purpose only.
41
type LocalWalRelayer struct {
42
        id              string
43
        lastOffset      *dbkernel.Offset
44
        replicatedCount int
45
        lsn             uint64
46
}
47

48
func NewLocalWalRelayer(id int) *LocalWalRelayer {
10✔
49
        return &LocalWalRelayer{
10✔
50
                id: strconv.Itoa(id),
10✔
51
        }
10✔
52
}
10✔
53

54
// Run starts the relayer which continuously pulls WAL records and lag emits metrics.
55
func (n *LocalWalRelayer) Run(ctx context.Context, engine *dbkernel.Engine, metricsTickInterval time.Duration) error {
10✔
56
        rpInstance := replicator.NewReplicator(engine,
10✔
57
                20,
10✔
58
                100*time.Millisecond, n.lastOffset, "local")
10✔
59

10✔
60
        walReceiver := make(chan []*v1.WALRecord, 2)
10✔
61
        replicatorErrors := make(chan error, 2)
10✔
62
        go func() {
20✔
63
                err := rpInstance.Replicate(ctx, walReceiver)
10✔
64
                replicatorErrors <- err
10✔
65
        }()
10✔
66

67
        ticker := time.NewTicker(metricsTickInterval)
10✔
68
        defer ticker.Stop()
10✔
69
        namespace := engine.Namespace()
10✔
70
        for {
4,010✔
71
                select {
4,000✔
72
                case <-ctx.Done():
10✔
73
                        var segmentLag int
10✔
74
                        segment := -1
10✔
75
                        if n.lastOffset != nil {
20✔
76
                                segmentLag = int(engine.CurrentOffset().SegmentID) - int(n.lastOffset.SegmentID)
10✔
77
                                segment = int(n.lastOffset.SegmentID)
10✔
78
                        }
10✔
79
                        slog.Debug("[unisondb.relayer]",
10✔
80
                                slog.String("event_type", "local.relayer.sync.stats"),
10✔
81
                                slog.String("namespace", namespace),
10✔
82
                                slog.Int("segment", segment),
10✔
83
                                slog.Int("segment_lag", segmentLag),
10✔
84
                                slog.Int("replicated", n.replicatedCount),
10✔
85
                        )
10✔
86
                        return nil
10✔
87
                case records := <-walReceiver:
1,990✔
88
                        if len(records) == 0 {
1,990✔
89
                                continue
×
90
                        }
91
                        for _, record := range records {
3,980✔
92
                                localWALReplicatedTotal.WithLabelValues(namespace).Inc()
1,990✔
93
                                fbRecord := logrecord.GetRootAsLogRecord(record.Record, 0)
1,990✔
94
                                receivedLSN := fbRecord.Lsn()
1,990✔
95
                                if receivedLSN != n.lsn+1 {
1,990✔
96
                                        panic(fmt.Sprintf("received wrong LSN %d, want %d", receivedLSN, n.lsn+1))
×
97
                                }
98
                                n.lsn++
1,990✔
99
                        }
100
                        n.replicatedCount += len(records)
1,990✔
101
                        n.lastOffset = dbkernel.DecodeOffset(records[len(records)-1].Offset)
1,990✔
102
                case err := <-replicatorErrors:
×
103
                        if errors.Is(err, context.Canceled) || errors.Is(err, io.EOF) {
×
104
                                return nil
×
105
                        }
×
106
                        panic(err)
×
107
                case <-ticker.C:
2,000✔
108
                        if n.lastOffset != nil {
3,997✔
109
                                segmentLag := int(engine.CurrentOffset().SegmentID) - int(n.lastOffset.SegmentID)
1,997✔
110
                                if segmentLag >= segmentLagEmitThreshold {
1,997✔
111
                                        localSegmentLagGauge.WithLabelValues(namespace, n.id).Set(float64(segmentLag))
×
112
                                        slog.Info("[unisondb.relayer]",
×
113
                                                slog.String("event_type", "local.relayer.sync.stats"),
×
114
                                                slog.String("namespace", namespace),
×
115
                                                slog.Int("segment", int(n.lastOffset.SegmentID)),
×
116
                                                slog.Int("segment_lag", segmentLag),
×
117
                                                slog.Int("replicated", n.replicatedCount),
×
118
                                        )
×
119
                                } else {
1,997✔
120
                                        localSegmentLagGauge.DeleteLabelValues(namespace, n.id)
1,997✔
121
                                }
1,997✔
122
                        }
123
                }
124
        }
125
}
126

127
// StartNLocalRelayer launches multiple local relayers for a given engine.
128
func StartNLocalRelayer(ctx context.Context, engine *dbkernel.Engine, num int, metricsTickInterval time.Duration) error {
1✔
129
        if num <= 0 {
1✔
130
                return nil
×
131
        }
×
132
        slog.Info("[unisondb.relayer]",
1✔
133
                slog.String("event_type", "starting.local.relayer"),
1✔
134
                slog.String("namespace", engine.Namespace()),
1✔
135
                slog.Int("num", num),
1✔
136
        )
1✔
137
        for i := 0; i < num; i++ {
11✔
138
                rep := NewLocalWalRelayer(i)
10✔
139

10✔
140
                go func(r *LocalWalRelayer) {
20✔
141
                        if err := r.Run(ctx, engine, metricsTickInterval); err != nil && !errors.Is(err, context.Canceled) {
10✔
142
                                panic(err)
×
143
                        }
144
                }(rep)
145
        }
146

147
        return nil
1✔
148
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc