• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ankur-anand / unisondb / 15591886029

11 Jun 2025 05:44PM UTC coverage: 81.998% (-0.2%) from 82.152%
15591886029

push

github

web-flow
chore: add doc and log for wal (#130)

* chore: add doc and log for wal

* structure error msg.

* remove app name from log

17 of 29 new or added lines in 3 files covered. (58.62%)

17 existing lines in 1 file now uncovered.

5812 of 7088 relevant lines covered (82.0%)

13526.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.74
/internal/services/streamer/grpc_streamer.go
1
package streamer
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "log/slog"
8
        "time"
9

10
        "github.com/ankur-anand/unisondb/dbkernel"
11
        "github.com/ankur-anand/unisondb/internal"
12
        "github.com/ankur-anand/unisondb/internal/grpcutils"
13
        "github.com/ankur-anand/unisondb/internal/services"
14
        "github.com/ankur-anand/unisondb/pkg/replicator"
15
        v1 "github.com/ankur-anand/unisondb/schemas/proto/gen/go/unisondb/streamer/v1"
16
        "golang.org/x/sync/errgroup"
17
        "google.golang.org/grpc"
18
        "google.golang.org/grpc/codes"
19
        "google.golang.org/grpc/status"
20
        "google.golang.org/protobuf/types/known/timestamppb"
21
)
22

23
// reqIDKey is the structured-log attribute key under which the request
// identifier is emitted.
const reqIDKey = "request_id"
26

27
var (
	// batchWaitTime bounds how long reads from the replicator may sit in a
	// partial batch: once it elapses, everything read so far is flushed onto
	// the channel even if batchSize has not been reached.
	batchWaitTime = 100 * time.Millisecond

	// batchSize is the number of WAL records accumulated before a flush.
	batchSize = 20

	// grpcMaxMsgSize caps the payload bytes of one outgoing batch (1 MiB).
	grpcMaxMsgSize = 1 << 20
)
37

38
// GrpcStreamer implements gRPC-based WalStreamerService.
39
type GrpcStreamer struct {
40
        // namespace mapped engine
41
        storageEngines map[string]*dbkernel.Engine
42
        dynamicTimeout time.Duration
43
        v1.UnimplementedWalStreamerServiceServer
44
        errGrp   *errgroup.Group
45
        shutdown chan struct{}
46
}
47

48
// NewGrpcStreamer returns an initialized GrpcStreamer that implements  grpc-based WalStreamerService.
49
func NewGrpcStreamer(errGrp *errgroup.Group, storageEngines map[string]*dbkernel.Engine, dynamicTimeout time.Duration) *GrpcStreamer {
6✔
50
        return &GrpcStreamer{
6✔
51
                storageEngines: storageEngines,
6✔
52
                dynamicTimeout: dynamicTimeout,
6✔
53
                errGrp:         errGrp,
6✔
54
                shutdown:       make(chan struct{}),
6✔
55
        }
6✔
56
}
6✔
57

58
func (s *GrpcStreamer) Close() {
×
59
        close(s.shutdown)
×
60
}
×
61

62
// GetLatestOffset returns the latest wal offset of the wal for the provided namespace.
63
func (s *GrpcStreamer) GetLatestOffset(ctx context.Context, _ *v1.GetLatestOffsetRequest) (*v1.GetLatestOffsetResponse, error) {
5✔
64
        namespace, reqID, method := grpcutils.GetRequestInfo(ctx)
5✔
65

5✔
66
        if namespace == "" {
6✔
67
                return nil, services.ToGRPCError(namespace, reqID, method, services.ErrMissingNamespaceInMetadata)
1✔
68
        }
1✔
69

70
        engine, ok := s.storageEngines[namespace]
4✔
71
        if !ok {
5✔
72
                return nil, services.ToGRPCError(namespace, reqID, method, services.ErrNamespaceNotExists)
1✔
73
        }
1✔
74

75
        offset := engine.CurrentOffset()
3✔
76
        if offset == nil {
4✔
77
                return &v1.GetLatestOffsetResponse{}, nil
1✔
78
        }
1✔
79
        return &v1.GetLatestOffsetResponse{Offset: offset.Encode()}, nil
2✔
80
}
81

82
// StreamWalRecords stream the underlying WAL record on the connection stream.
83
func (s *GrpcStreamer) StreamWalRecords(request *v1.StreamWalRecordsRequest, g grpc.ServerStreamingServer[v1.StreamWalRecordsResponse]) error {
17✔
84
        namespace, reqID, method := grpcutils.GetRequestInfo(g.Context())
17✔
85

17✔
86
        if namespace == "" {
17✔
87
                return services.ToGRPCError(namespace, reqID, method, services.ErrMissingNamespaceInMetadata)
×
88
        }
×
89

90
        engine, ok := s.storageEngines[namespace]
17✔
91
        if !ok {
19✔
92
                return services.ToGRPCError(namespace, reqID, method, services.ErrNamespaceNotExists)
2✔
93
        }
2✔
94

95
        // it can contain terrible data
96
        meta, err := decodeMetadata(request.GetOffset())
15✔
97
        if err != nil {
15✔
98
                slog.Error("[unisondb.streamer.grpc]",
×
99
                        slog.String("event_type", "metadata.decoding.failed"),
×
100
                        slog.Any("error", err),
×
101
                        slog.Any("server_offset", engine.CurrentOffset()),
×
102
                        slog.Group("request",
×
103
                                slog.String("namespace", namespace),
×
104
                                slog.String("id", string(reqID)),
×
105
                                slog.Any("offset", request.GetOffset()),
×
106
                        ),
×
107
                )
×
108
                return services.ToGRPCError(namespace, reqID, method, services.ErrInvalidMetadata)
×
109
        }
×
110
        // create a new replicator instance.
111
        slog.Debug("[unisondb.streamer.grpc] streaming WAL",
15✔
112
                "method", method,
15✔
113
                reqIDKey, reqID,
15✔
114
                "namespace", namespace,
15✔
115
                "offset", meta,
15✔
116
        )
15✔
117

15✔
118
        walReceiver := make(chan []*v1.WALRecord, 2)
15✔
119
        replicatorErr := make(chan error, 1)
15✔
120

15✔
121
        ctx, cancel := context.WithCancel(g.Context())
15✔
122
        defer cancel()
15✔
123

15✔
124
        rpInstance := replicator.NewReplicator(engine,
15✔
125
                batchSize,
15✔
126
                batchWaitTime, meta, "grpc")
15✔
127

15✔
128
        currentOffset := engine.CurrentOffset()
15✔
129
        if meta != nil && currentOffset == nil {
16✔
130
                return services.ToGRPCError(namespace, reqID, method, services.ErrInvalidMetadata)
1✔
131
        }
1✔
132

133
        // when server is closed, the goroutine would be closed upon
134
        // cancel of ctx.
135
        // walReceiver should be closed only when Replicate method returns.
136
        s.errGrp.Go(func() error {
28✔
137
                defer func() {
28✔
138
                        close(walReceiver)
14✔
139
                        close(replicatorErr)
14✔
140
                }()
14✔
141

142
                err := rpInstance.Replicate(ctx, walReceiver)
14✔
143
                select {
14✔
144
                case replicatorErr <- err:
7✔
145
                case <-ctx.Done():
7✔
146
                        return nil
7✔
147
                }
148

149
                return nil
7✔
150
        })
151

152
        metricsActiveStreamTotal.WithLabelValues(namespace, string(method), "grpc").Inc()
14✔
153
        defer metricsActiveStreamTotal.WithLabelValues(namespace, string(method), "grpc").Dec()
14✔
154
        return s.streamWalRecords(ctx, g, walReceiver, replicatorErr)
14✔
155
}
156

157
// streamWalRecords streams namespaced Write-Ahead Log (WAL) records to the client in batches
158
// .
159
// While it might be tempting to stream WAL records indefinitely until the client cancels the RPC,
160
// this approach has significant drawbacks and is avoided here using dynamic timeout mechanism.
161
// (Note:  GOAWAY settings handle underlying connection issues.)
162
// Continuously streaming also has several problems:
163
//
164
//   - Resource Exhaustion: A malfunctioning client stream could monopolize the server's buffer,
165
//   - Unnecessary Processing: The server might fetch and stream WAL records that the client has already
166
//     received or is no longer interested in.
167
//
168
//nolint:gocognit
169
func (s *GrpcStreamer) streamWalRecords(ctx context.Context,
170
        g grpc.ServerStreamingServer[v1.StreamWalRecordsResponse],
171
        walReceiver chan []*v1.WALRecord,
172
        replicatorErr chan error) error {
14✔
173
        namespace, reqID, method := grpcutils.GetRequestInfo(g.Context())
14✔
174

14✔
175
        var (
14✔
176
                batch                  []*v1.WALRecord
14✔
177
                totalBatchSize         int
14✔
178
                lastReceivedRecordTime = time.Now()
14✔
179
        )
14✔
180

14✔
181
        flusher := func() error {
35✔
182
                if err := s.flushBatch(batch, g); err != nil {
21✔
UNCOV
183
                        return err
×
UNCOV
184
                }
×
185
                batch = []*v1.WALRecord{}
21✔
186
                totalBatchSize = 0
21✔
187
                return nil
21✔
188
        }
189

190
        dynamicTimeoutTicker := time.NewTicker(s.dynamicTimeout)
14✔
191
        defer dynamicTimeoutTicker.Stop()
14✔
192

14✔
193
        for {
52✔
194
                select {
38✔
195
                case <-dynamicTimeoutTicker.C:
16✔
196
                        if time.Since(lastReceivedRecordTime) > s.dynamicTimeout {
29✔
197
                                return services.ToGRPCError(namespace, reqID, method, services.ErrStreamTimeout)
13✔
198
                        }
13✔
199
                case <-s.shutdown:
×
200
                        return status.Error(codes.Unavailable, internal.GracefulShutdownMsg)
×
201

202
                case <-ctx.Done():
1✔
203
                        err := ctx.Err()
1✔
204
                        if errors.Is(err, services.ErrStreamTimeout) {
1✔
205
                                return services.ToGRPCError(namespace, reqID, method, services.ErrStreamTimeout)
×
206
                        }
×
207
                        if errors.Is(err, context.Canceled) {
2✔
208
                                return nil
1✔
209
                        }
1✔
210
                        return ctx.Err()
×
211
                case walRecords := <-walReceiver:
21✔
212
                        for _, walRecord := range walRecords {
71✔
213
                                lastReceivedRecordTime = time.Now()
50✔
214
                                totalBatchSize += len(walRecord.Record)
50✔
215
                                batch = append(batch, walRecord)
50✔
216

50✔
217
                                if totalBatchSize >= grpcMaxMsgSize {
50✔
218
                                        if err := flusher(); err != nil {
×
219
                                                return services.ToGRPCError(namespace, reqID, method, err)
×
220
                                        }
×
221
                                }
222
                        }
223
                        // flush remaining
224
                        if err := flusher(); err != nil {
21✔
UNCOV
225
                                return services.ToGRPCError(namespace, reqID, method, err)
×
UNCOV
226
                        }
×
227
                case err := <-replicatorErr:
×
228
                        if errors.Is(err, dbkernel.ErrInvalidOffset) {
×
229
                                slog.Error("[unisondb.streamer.grpc]",
×
230
                                        slog.String("event_type", "replicator.offset.invalid"),
×
231
                                        slog.Any("error", err),
×
232
                                        slog.Group("request",
×
233
                                                slog.String("namespace", namespace),
×
234
                                                slog.String("id", string(reqID)),
×
235
                                        ),
×
236
                                )
×
237

×
238
                                return services.ToGRPCError(namespace, reqID, method, services.ErrInvalidMetadata)
×
239
                        }
×
240
                        return services.ToGRPCError(namespace, reqID, method, err)
×
241
                }
242
        }
243
}
244

245
func (s *GrpcStreamer) flushBatch(batch []*v1.WALRecord, g grpc.ServerStream) error {
21✔
246
        namespace, reqID, method := grpcutils.GetRequestInfo(g.Context())
21✔
247
        metricsStreamSendTotal.WithLabelValues(namespace, string(method), "grpc").Add(float64(len(batch)))
21✔
248
        if len(batch) == 0 {
21✔
249
                return nil
×
250
        }
×
251

252
        slog.Debug("[unisondb.streamer.grpc] Batch flushing", "size", len(batch))
21✔
253
        response := &v1.StreamWalRecordsResponse{Records: batch, ServerTimestamp: timestamppb.Now()}
21✔
254

21✔
255
        start := time.Now()
21✔
256
        defer func() {
42✔
257
                metricsStreamSendLatency.WithLabelValues(namespace, string(method), "grpc").Observe(time.Since(start).Seconds())
21✔
258
        }()
21✔
259

260
        if err := g.SendMsg(response); err != nil {
21✔
UNCOV
261
                slog.Error("[unisondb.streamer.grpc]",
×
UNCOV
262
                        slog.String("event_type", "send.wal.records.failed"),
×
UNCOV
263
                        slog.Any("error", err),
×
UNCOV
264
                        slog.Int("records_count", len(batch)),
×
UNCOV
265
                        slog.Group("request",
×
UNCOV
266
                                slog.String("namespace", namespace),
×
UNCOV
267
                                slog.String("id", string(reqID)),
×
UNCOV
268
                        ),
×
UNCOV
269
                )
×
UNCOV
270

×
UNCOV
271
                metricsStreamSendErrors.WithLabelValues(namespace, string(method), "grpc").Inc()
×
UNCOV
272
                return fmt.Errorf("failed to send WAL records: %w", err)
×
UNCOV
273
        }
×
274
        return nil
21✔
275
}
276

277
// decodeMetadata protects from panic and decodes.
278
func decodeMetadata(data []byte) (o *dbkernel.Offset, err error) {
15✔
279
        defer func() {
30✔
280
                if r := recover(); r != nil {
15✔
281
                        err = fmt.Errorf("decode ChunkPosition: Panic recovered %v", r)
×
282
                }
×
283
        }()
284
        if len(data) == 0 {
24✔
285
                return nil, nil
9✔
286
        }
9✔
287
        o = dbkernel.DecodeOffset(data)
6✔
288
        return o, err
6✔
289
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc