• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ankur-anand / unisondb / 15026438913

14 May 2025 04:48PM UTC coverage: 82.399% (-0.2%) from 82.642%
15026438913

Pull #111

github

ankur-anand
fix: memtable test case for temp dir.
Pull Request #111: fix: memtable test case for temp dir.

5777 of 7011 relevant lines covered (82.4%)

13875.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.47
/dbkernel/internal/wal/wal.go
1
package wal
2

3
import (
4
        "context"
5
        "io"
6
        "log"
7
        "log/slog"
8
        "math/rand"
9
        "slices"
10
        "sync/atomic"
11
        "time"
12

13
        "github.com/ankur-anand/unisondb/pkg/umetrics"
14
        "github.com/ankur-anand/unisondb/pkg/walfs"
15
        "github.com/ankur-anand/unisondb/schemas/logrecord"
16
        "github.com/pkg/errors"
17
)
18

19
var (
20
        metricWalWriteTotal        = "write_total"
21
        metricWalWriteError        = "write_errors_total"
22
        metricWalWriteDuration     = "write_duration_seconds"
23
        metricsSegmentRotatedTotal = "segment_rotated_total"
24
        metricsWalReadTotal        = "read_total"
25
        metricsWalReadErrorTotal   = "read_error_total"
26
        metricsWalReadDuration     = "read_durations_second"
27
        metricsFSyncTotal          = "fsync_total"
28
        metricsFSyncErrors         = "fsync_errors_total"
29
        metricsFSyncDurations      = "fsync_durations_seconds"
30
)
31

32
// Offset is a type alias to underlying wal implementation.
33
type Offset = walfs.RecordPosition
34

35
type SegID = walfs.SegmentID
36

37
func DecodeOffset(b []byte) *Offset {
1✔
38
        pos, _ := walfs.DecodeRecordPosition(b)
1✔
39
        return &pos
1✔
40
}
1✔
41

42
// WalIO provides a write and read to underlying file based wal store.
43
type WalIO struct {
44
        appendLog   *walfs.WALog
45
        namespace   string
46
        taggedScope umetrics.Scope
47
}
48

49
func NewWalIO(dirname, namespace string, config *Config) (*WalIO, error) {
3✔
50
        config.applyDefaults()
3✔
51
        taggedScope := umetrics.AutoScope().Tagged(map[string]string{
3✔
52
                "namespace": namespace,
3✔
53
        })
3✔
54
        callbackOnRotate := func() {
3✔
55
                taggedScope.Counter(metricsSegmentRotatedTotal).Inc(1)
×
56
        }
×
57
        wLog, err := walfs.NewWALog(dirname, ".seg", walfs.WithMaxSegmentSize(config.SegmentSize),
3✔
58
                walfs.WithMSyncEveryWrite(config.FSync), walfs.WithBytesPerSync(int64(config.BytesPerSync)),
3✔
59
                walfs.WithOnSegmentRotated(callbackOnRotate),
3✔
60
                walfs.WithAutoCleanupPolicy(config.MaxAge, config.MinSegment, config.MaxSegment, config.AutoCleanup))
3✔
61
        if err != nil {
3✔
62
                return nil, err
×
63
        }
×
64

65
        return &WalIO{
3✔
66
                        appendLog:   wLog,
3✔
67
                        namespace:   namespace,
3✔
68
                        taggedScope: taggedScope,
3✔
69
                },
3✔
70
                err
3✔
71
}
72

73
func (w *WalIO) RunWalCleanup(ctx context.Context, interval time.Duration, predicate walfs.DeletionPredicate) {
×
74
        go func() {
×
75
                ticker := time.NewTicker(interval)
×
76
                defer ticker.Stop()
×
77
                for {
×
78
                        select {
×
79
                        case <-ctx.Done():
×
80
                                return
×
81
                        case <-ticker.C:
×
82
                                w.appendLog.MarkSegmentsForDeletion()
×
83
                        }
84
                }
85
        }()
86
        jitter := rand.Intn(10)
×
87
        dur := time.Duration(jitter)*time.Second + interval
×
88
        w.appendLog.StartPendingSegmentCleaner(ctx, dur, predicate)
×
89
}
90

91
// Sync Flushes the wal using fsync.
92
func (w *WalIO) Sync() error {
4✔
93
        w.taggedScope.Counter(metricsFSyncTotal).Inc(1)
4✔
94
        startTime := time.Now()
4✔
95
        defer func() {
8✔
96
                w.taggedScope.Histogram(metricsFSyncDurations, fsyncLatencyBuckets).RecordDuration(time.Since(startTime))
4✔
97
        }()
4✔
98
        err := w.appendLog.Sync()
4✔
99
        if err != nil {
4✔
100
                w.taggedScope.Counter(metricsFSyncErrors).Inc(1)
×
101
        }
×
102
        return err
4✔
103
}
104

105
func (w *WalIO) Close() error {
3✔
106
        err := w.Sync()
3✔
107
        if err != nil {
3✔
108
                slog.Error("[unisondb.wal] Fsync to log file failed]", "error", err)
×
109
                w.taggedScope.Counter(metricsFSyncErrors).Inc(1)
×
110
        }
×
111
        return w.appendLog.Close()
3✔
112
}
113

114
func (w *WalIO) Read(pos *Offset) ([]byte, error) {
4✔
115
        w.taggedScope.Counter(metricsWalReadTotal).Inc(1)
4✔
116
        startTime := time.Now()
4✔
117
        defer func() {
8✔
118
                w.taggedScope.Histogram(metricsWalReadDuration, readLatencyBuckets).RecordDuration(time.Since(startTime))
4✔
119
        }()
4✔
120
        value, err := w.appendLog.Read(*pos)
4✔
121
        if err != nil {
5✔
122
                w.taggedScope.Counter(metricsWalReadErrorTotal).Inc(1)
1✔
123
        }
1✔
124
        return value, err
4✔
125
}
126

127
func (w *WalIO) Append(data []byte) (*Offset, error) {
420✔
128
        w.taggedScope.Counter(metricWalWriteTotal).Inc(1)
420✔
129
        startTime := time.Now()
420✔
130
        defer func() {
840✔
131
                duration := time.Since(startTime)
420✔
132
                if rand.Float64() < 0.1 || duration > 10*time.Millisecond {
462✔
133
                        w.taggedScope.Histogram(metricWalWriteDuration, writeLatencyBuckets).RecordDuration(duration)
42✔
134
                }
42✔
135
        }()
136

137
        off, err := w.appendLog.Write(data)
420✔
138
        if errors.Is(err, walfs.ErrFsync) {
420✔
139
                log.Fatalf("[unisondb.wal] write to log file failed: %v", err)
×
140
        }
×
141
        if err != nil {
420✔
142
                w.taggedScope.Counter(metricWalWriteError).Inc(1)
×
143
        }
×
144
        return &off, err
420✔
145
}
146

147
type Reader struct {
148
        appendReader *walfs.Reader
149
        closed       atomic.Bool
150
        namespace    string
151
        taggedScope  umetrics.Scope
152
}
153

154
// Next returns the next chunk data and its position in the WAL.
155
// If there is no data, io. EOF will be returned.
156
// The position can be used to read the data from the segment file.
157
func (r *Reader) Next() ([]byte, *Offset, error) {
9✔
158
        if r.closed.Load() {
11✔
159
                return nil, nil, io.EOF
2✔
160
        }
2✔
161
        startTime := time.Now()
7✔
162
        value, off, err := r.appendReader.Next()
7✔
163
        if err != nil && !errors.Is(err, io.EOF) {
7✔
164
                r.taggedScope.Counter(metricsWalReadTotal).Inc(1)
×
165
                r.taggedScope.Counter(metricsWalReadErrorTotal).Inc(1)
×
166
        }
×
167
        if errors.Is(err, io.EOF) {
10✔
168
                r.Close()
3✔
169
        }
3✔
170
        if err == nil {
11✔
171
                duration := time.Since(startTime)
4✔
172
                // we are sampling only 10% of fast-path reads, but always capture slow reads
4✔
173
                if rand.Float64() < 0.1 || duration > 10*time.Millisecond {
4✔
174
                        r.taggedScope.Histogram(metricsWalReadDuration, readLatencyBuckets).RecordDuration(duration)
×
175
                }
×
176
                r.taggedScope.Counter(metricsWalReadTotal).Inc(1)
4✔
177
        }
178
        return value, off, err
7✔
179
}
180

181
func (r *Reader) Close() {
4✔
182
        if r.closed.CompareAndSwap(false, true) {
8✔
183
                r.appendReader.Close()
4✔
184
        }
4✔
185
}
186

187
// NewReader returns a new instance of WIOReader, allowing the caller to
188
// access WAL logs for replication, recovery, or log processing.
189
func (w *WalIO) NewReader() (*Reader, error) {
3✔
190
        return &Reader{
3✔
191
                appendReader: w.appendLog.NewReader(),
3✔
192
                namespace:    w.namespace,
3✔
193
                taggedScope:  w.taggedScope,
3✔
194
        }, nil
3✔
195
}
3✔
196

197
// NewReaderWithStart returns a new instance of WIOReader from the provided Offset.
198
func (w *WalIO) NewReaderWithStart(offset *Offset) (*Reader, error) {
1✔
199
        reader, err := w.appendLog.NewReaderWithStart(*offset)
1✔
200
        return &Reader{
1✔
201
                appendReader: reader,
1✔
202
                namespace:    w.namespace,
1✔
203
                taggedScope:  w.taggedScope,
1✔
204
        }, err
1✔
205
}
1✔
206

207
// GetTransactionRecords returns all the WalRecord that is part of the particular Txn.
208
func (w *WalIO) GetTransactionRecords(startOffset *Offset) ([]*logrecord.LogRecord, error) {
4✔
209
        if startOffset == nil {
6✔
210
                return nil, nil
2✔
211
        }
2✔
212

213
        var records []*logrecord.LogRecord
2✔
214
        nextOffset := startOffset
2✔
215

2✔
216
        for {
5✔
217
                walEntry, err := w.Read(nextOffset)
3✔
218

3✔
219
                if err != nil {
4✔
220
                        return nil, errors.Wrapf(ErrWalNextOffset, "failed to read WAL at offset %+v: %s", nextOffset, err)
1✔
221
                }
1✔
222

223
                record := logrecord.GetRootAsLogRecord(walEntry, 0)
2✔
224
                records = append(records, record)
2✔
225

2✔
226
                if record.PrevTxnWalIndexLength() == 0 {
3✔
227
                        break
1✔
228
                }
229

230
                nextOffset = DecodeOffset(record.PrevTxnWalIndexBytes())
1✔
231
        }
232

233
        slices.Reverse(records)
1✔
234
        return records, nil
1✔
235
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc