• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ankur-anand / unisondb / 15591886029

11 Jun 2025 05:44PM UTC coverage: 81.998% (-0.2%) from 82.152%
15591886029

push

github

web-flow
chore: add doc and log for wal (#130)

* chore: add doc and log for wal

* structure error msg.

* remove app name from log

17 of 29 new or added lines in 3 files covered. (58.62%)

17 existing lines in 1 file now uncovered.

5812 of 7088 relevant lines covered (82.0%)

13526.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.73
/dbkernel/internal/wal/wal.go
1
package wal
2

3
import (
4
        "context"
5
        "io"
6
        "log"
7
        "log/slog"
8
        "math/rand"
9
        "slices"
10
        "sync/atomic"
11
        "time"
12

13
        "github.com/ankur-anand/unisondb/pkg/umetrics"
14
        "github.com/ankur-anand/unisondb/pkg/walfs"
15
        "github.com/ankur-anand/unisondb/schemas/logrecord"
16
        "github.com/pkg/errors"
17
        "github.com/uber-go/tally/v4"
18
)
19

20
// segmentEntryBuckets holds the histogram buckets used when recording how many
// entries a segment contained at rotation time. It is built from
// tally.MustMakeExponentialValueBuckets(500, 2, 10), i.e. ten exponentially
// growing buckets starting at 500 with a growth factor of 2.
var segmentEntryBuckets = tally.MustMakeExponentialValueBuckets(500, 2, 10)
21

22
// Metric names emitted by the WAL layer. They are declared as constants so
// that no code path can reassign them at runtime.
//
// NOTE(review): metricsWalReadDuration ("read_durations_second") does not
// follow the "*_seconds" suffix used by the other duration metrics. The value
// is kept as-is because renaming a metric would break existing dashboards.
const (
	// Write path.
	metricWalWriteTotal        = "write_total"
	metricWalWriteError        = "write_errors_total"
	metricWalWriteDuration     = "write_duration_seconds"
	metricWalBytesWrittenTotal = "bytes_written_total"

	// Segment lifecycle.
	metricsSegmentRotatedTotal = "segment_rotated_total"
	metricWalEntriesPerSegment = "entries_per_segment"

	// Read path.
	metricsReaderCreatedTotal = "reader_created_total"
	metricsWalReadErrorTotal  = "read_error_total"
	metricsWalReadDuration    = "read_durations_second"

	// Fsync.
	metricsFSyncTotal     = "fsync_total"
	metricsFSyncErrors    = "fsync_errors_total"
	metricsFSyncDurations = "fsync_durations_seconds"
)
36

37
// Offset is a type alias to underlying wal implementation.
38
type Offset = walfs.RecordPosition
39

40
type SegID = walfs.SegmentID
41

42
func DecodeOffset(b []byte) *Offset {
1✔
43
        pos, _ := walfs.DecodeRecordPosition(b)
1✔
44
        return &pos
1✔
45
}
1✔
46

47
const (
	// samplingMask selects which successful reads in the Reader hot path get
	// their duration recorded. A read is sampled when readCount&samplingMask
	// is zero, i.e. once every 16,384 reads (about 0.006%) per reader.
	// A power-of-two bit mask is used rather than a random draw because the
	// mask check is cheaper on the CPU.
	samplingMask = 1<<14 - 1 // 16383
)
54

55
// WalIO provides a write and read to underlying file based wal store.
56
type WalIO struct {
57
        appendLog        *walfs.WALog
58
        namespace        string
59
        taggedScope      umetrics.Scope
60
        entriesInSegment atomic.Int64
61
}
62

63
// NewWalIO initializes and returns a new instance of WalIO.
64
func NewWalIO(dirname, namespace string, config *Config) (*WalIO, error) {
3✔
65
        config.applyDefaults()
3✔
66
        taggedScope := umetrics.AutoScope().Tagged(map[string]string{
3✔
67
                "namespace": namespace,
3✔
68
        })
3✔
69

3✔
70
        w := &WalIO{
3✔
71
                namespace:   namespace,
3✔
72
                taggedScope: taggedScope,
3✔
73
        }
3✔
74

3✔
75
        callbackOnRotate := func() {
3✔
76
                old := w.entriesInSegment.Swap(0)
×
77
                taggedScope.Counter(metricsSegmentRotatedTotal).Inc(1)
×
78
                taggedScope.Histogram(metricWalEntriesPerSegment, segmentEntryBuckets).RecordValue(float64(old))
×
79
        }
×
80

81
        wLog, err := walfs.NewWALog(dirname, ".seg", walfs.WithMaxSegmentSize(config.SegmentSize),
3✔
82
                walfs.WithMSyncEveryWrite(config.FSync), walfs.WithBytesPerSync(int64(config.BytesPerSync)),
3✔
83
                walfs.WithOnSegmentRotated(callbackOnRotate),
3✔
84
                walfs.WithAutoCleanupPolicy(config.MaxAge, config.MinSegment, config.MaxSegment, config.AutoCleanup))
3✔
85

3✔
86
        if err != nil {
3✔
87
                return nil, err
×
88
        }
×
89
        w.appendLog = wLog
3✔
90
        return w, err
3✔
91
}
92

93
// RunWalCleanup starts background cleanup routines for old WAL segments.
94
func (w *WalIO) RunWalCleanup(ctx context.Context, interval time.Duration, predicate walfs.DeletionPredicate) {
×
95
        go func() {
×
96
                ticker := time.NewTicker(interval)
×
97
                defer ticker.Stop()
×
98
                for {
×
99
                        select {
×
100
                        case <-ctx.Done():
×
101
                                return
×
102
                        case <-ticker.C:
×
103
                                w.appendLog.MarkSegmentsForDeletion()
×
104
                        }
105
                }
106
        }()
107
        jitter := rand.Intn(10)
×
108
        dur := time.Duration(jitter)*time.Second + interval
×
109
        w.appendLog.StartPendingSegmentCleaner(ctx, dur, predicate)
×
110
}
111

112
// Sync Flushes the wal using fsync.
113
func (w *WalIO) Sync() error {
4✔
114
        w.taggedScope.Counter(metricsFSyncTotal).Inc(1)
4✔
115
        startTime := time.Now()
4✔
116
        defer func() {
8✔
117
                w.taggedScope.Histogram(metricsFSyncDurations, fsyncLatencyBuckets).RecordDuration(time.Since(startTime))
4✔
118
        }()
4✔
119
        err := w.appendLog.Sync()
4✔
120
        if err != nil {
4✔
121
                w.taggedScope.Counter(metricsFSyncErrors).Inc(1)
×
122
        }
×
123
        return err
4✔
124
}
125

126
// Close flushes WAL data to disk using fsync and then closes all underlying WAL segments.
127
func (w *WalIO) Close() error {
3✔
128
        err := w.Sync()
3✔
129
        if err != nil {
3✔
NEW
130
                slog.Error("[wal]", slog.String("message", "Failed to fsync WAL segment"),
×
NEW
131
                        slog.Any("error", err))
×
132
                w.taggedScope.Counter(metricsFSyncErrors).Inc(1)
×
133
        }
×
134
        return w.appendLog.Close()
3✔
135
}
136

137
// Read retrieves the WAL record at the given position.
138
func (w *WalIO) Read(pos *Offset) ([]byte, error) {
126✔
139
        startTime := time.Now()
126✔
140
        defer func() {
252✔
141
                w.taggedScope.Histogram(metricsWalReadDuration, readLatencyBuckets).RecordDuration(time.Since(startTime))
126✔
142
        }()
126✔
143
        value, err := w.appendLog.Read(*pos)
126✔
144
        if err != nil {
127✔
145
                w.taggedScope.Counter(metricsWalReadErrorTotal).Inc(1)
1✔
146
        }
1✔
147
        return value, err
126✔
148
}
149

150
// Append writes the provided data as a new record to the active WAL segment.
151
func (w *WalIO) Append(data []byte) (*Offset, error) {
420✔
152
        w.taggedScope.Counter(metricWalWriteTotal).Inc(1)
420✔
153
        startTime := time.Now()
420✔
154
        defer func() {
840✔
155
                duration := time.Since(startTime)
420✔
156
                w.taggedScope.Histogram(metricWalWriteDuration, writeLatencyBuckets).RecordDuration(duration)
420✔
157
        }()
420✔
158
        w.taggedScope.Counter(metricWalBytesWrittenTotal).Inc(int64(len(data)))
420✔
159
        off, err := w.appendLog.Write(data)
420✔
160
        if errors.Is(err, walfs.ErrFsync) {
420✔
NEW
161
                log.Fatalf("[wal] write to log file failed: %v", err)
×
162
        }
×
163
        if err != nil {
420✔
164
                w.taggedScope.Counter(metricWalWriteError).Inc(1)
×
165
        }
×
166
        if err == nil {
840✔
167
                w.entriesInSegment.Add(1)
420✔
168
        }
420✔
169
        return &off, err
420✔
170
}
171

172
// ReaderOption defines a functional option for configuring a Reader instance.
173
type ReaderOption func(*Reader)
174

175
// WithActiveTail enables or disables active tail reading mode for the WAL reader.
176
func WithActiveTail(enabled bool) ReaderOption {
×
177
        return func(r *Reader) {
×
178
                r.withActiveTail = enabled
×
179
        }
×
180
}
181

182
// Reader provides a forward-only iterator over WAL records and is not concurrent safe.
183
type Reader struct {
184
        appendReader   *walfs.Reader
185
        closed         atomic.Bool
186
        namespace      string
187
        taggedScope    umetrics.Scope
188
        withActiveTail bool
189
        readCount      int
190
}
191

192
// Next returns the next chunk data and its position in the WAL.
193
// If there is no data, io. EOF will be returned.
194
// The position can be used to read the data from the segment file.
195
// The returned data is a memory-mapped slice — user must copy the data if retention of data is needed.
196
func (r *Reader) Next() ([]byte, Offset, error) {
9✔
197
        if r.closed.Load() {
11✔
198
                return nil, walfs.NilRecordPosition, io.EOF
2✔
199
        }
2✔
200

201
        start := time.Now()
7✔
202
        data, pos, err := r.appendReader.Next()
7✔
203

7✔
204
        if err != nil {
10✔
205
                switch {
3✔
206
                case errors.Is(err, walfs.ErrNoNewData):
3✔
207
                        if !r.withActiveTail {
6✔
208
                                r.Close()
3✔
209
                                return nil, walfs.NilRecordPosition, io.EOF
3✔
210
                        }
3✔
NEW
211
                        return nil, walfs.NilRecordPosition, err
×
212

NEW
213
                case errors.Is(err, io.EOF):
×
NEW
214
                        r.Close()
×
NEW
215
                        return nil, walfs.NilRecordPosition, err
×
216

NEW
217
                case err != nil:
×
NEW
218
                        r.taggedScope.Counter(metricsWalReadErrorTotal).Inc(1)
×
NEW
219
                        return nil, walfs.NilRecordPosition, err
×
220
                }
221
        }
222

223
        r.readCount++
4✔
224
        // Successful read
4✔
225
        dur := time.Since(start)
4✔
226
        if r.readCount&samplingMask == 0 || dur > 10*time.Millisecond {
4✔
227
                r.taggedScope.Histogram(metricsWalReadDuration, readLatencyBuckets).RecordDuration(dur)
×
228
        }
×
229

230
        return data, pos, nil
4✔
231
}
232

233
// Close releases the underlying segment reader and marks the Reader as closed.
234
func (r *Reader) Close() {
4✔
235
        if r.closed.CompareAndSwap(false, true) {
8✔
236
                r.appendReader.Close()
4✔
237
        }
4✔
238
}
239

240
// NewReader returns a new instance of WIOReader, allowing the caller to
241
// access WAL logs for replication, recovery, or log processing.
242
func (w *WalIO) NewReader(options ...ReaderOption) (*Reader, error) {
3✔
243
        reader := &Reader{
3✔
244
                appendReader: w.appendLog.NewReader(),
3✔
245
                namespace:    w.namespace,
3✔
246
                taggedScope:  w.taggedScope,
3✔
247
        }
3✔
248

3✔
249
        for _, opt := range options {
3✔
250
                opt(reader)
×
251
        }
×
252

253
        w.taggedScope.Counter(metricsReaderCreatedTotal).Inc(1)
3✔
254
        return reader, nil
3✔
255
}
256

257
// NewReaderWithStart returns a new instance of WIOReader from the provided Offset.
258
func (w *WalIO) NewReaderWithStart(offset *Offset, options ...ReaderOption) (*Reader, error) {
1✔
259
        underlyingReader, err := w.appendLog.NewReaderWithStart(*offset)
1✔
260
        if err != nil {
1✔
261
                return nil, err
×
262
        }
×
263
        reader := &Reader{
1✔
264
                appendReader: underlyingReader,
1✔
265
                namespace:    w.namespace,
1✔
266
                taggedScope:  w.taggedScope,
1✔
267
        }
1✔
268

1✔
269
        for _, opt := range options {
1✔
270
                opt(reader)
×
271
        }
×
272

273
        w.taggedScope.Counter(metricsReaderCreatedTotal).Inc(1)
1✔
274
        return reader, nil
1✔
275
}
276

277
// GetTransactionRecords returns all the WalRecord that is part of the particular Txn.
278
func (w *WalIO) GetTransactionRecords(startOffset *Offset) ([]*logrecord.LogRecord, error) {
4✔
279
        if startOffset == nil {
6✔
280
                return nil, nil
2✔
281
        }
2✔
282

283
        var records []*logrecord.LogRecord
2✔
284
        nextOffset := startOffset
2✔
285

2✔
286
        for {
5✔
287
                walEntry, err := w.Read(nextOffset)
3✔
288

3✔
289
                if err != nil {
4✔
290
                        return nil, errors.Wrapf(ErrWalNextOffset, "failed to read WAL at offset %+v: %s", nextOffset, err)
1✔
291
                }
1✔
292

293
                record := logrecord.GetRootAsLogRecord(walEntry, 0)
2✔
294
                records = append(records, record)
2✔
295

2✔
296
                if record.PrevTxnWalIndexLength() == 0 {
3✔
297
                        break
1✔
298
                }
299

300
                nextOffset = DecodeOffset(record.PrevTxnWalIndexBytes())
1✔
301
        }
302

303
        slices.Reverse(records)
1✔
304
        return records, nil
1✔
305
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc