• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ankur-anand / unisondb / 15026438913

14 May 2025 04:48PM UTC coverage: 82.399% (-0.2%) from 82.642%
15026438913

Pull #111

github

ankur-anand
fix: memtable test case for temp dir.
Pull Request #111: fix: memtable test case for temp dir.

5777 of 7011 relevant lines covered (82.4%)

13875.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.59
/internal/services/streamer/grpc_streamer.go
1
package streamer
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "log/slog"
8
        "time"
9

10
        "github.com/ankur-anand/unisondb/dbkernel"
11
        "github.com/ankur-anand/unisondb/internal"
12
        "github.com/ankur-anand/unisondb/internal/grpcutils"
13
        "github.com/ankur-anand/unisondb/internal/services"
14
        "github.com/ankur-anand/unisondb/pkg/replicator"
15
        v1 "github.com/ankur-anand/unisondb/schemas/proto/gen/go/unisondb/streamer/v1"
16
        "golang.org/x/sync/errgroup"
17
        "google.golang.org/grpc"
18
        "google.golang.org/grpc/codes"
19
        "google.golang.org/grpc/status"
20
        "google.golang.org/protobuf/types/known/timestamppb"
21
)
22

23
// reqIDKey is the structured-log attribute key under which the request ID
// of an RPC is recorded.
const reqIDKey = "request_id"
26

27
var (
	// batchWaitTime is the flush timeout handed to the replicator: once it
	// elapses, whatever has been read so far is flushed onto the channel
	// even if the batch size threshold has not been reached.
	batchWaitTime = 100 * time.Millisecond

	// batchSize is the batch size handed to the replicator.
	batchSize = 20

	// grpcMaxMsgSize (1 MiB) is the accumulated record payload size, in
	// bytes, at which streamWalRecords flushes the pending batch to the client.
	grpcMaxMsgSize = 1 << 20
)
37

38
// GrpcStreamer implements gRPC-based WalStreamerService.
39
type GrpcStreamer struct {
40
        // namespace mapped engine
41
        storageEngines map[string]*dbkernel.Engine
42
        dynamicTimeout time.Duration
43
        v1.UnimplementedWalStreamerServiceServer
44
        errGrp   *errgroup.Group
45
        shutdown chan struct{}
46
}
47

48
// NewGrpcStreamer returns an initialized GrpcStreamer that implements  grpc-based WalStreamerService.
49
func NewGrpcStreamer(errGrp *errgroup.Group, storageEngines map[string]*dbkernel.Engine, dynamicTimeout time.Duration) *GrpcStreamer {
6✔
50
        return &GrpcStreamer{
6✔
51
                storageEngines: storageEngines,
6✔
52
                dynamicTimeout: dynamicTimeout,
6✔
53
                errGrp:         errGrp,
6✔
54
                shutdown:       make(chan struct{}),
6✔
55
        }
6✔
56
}
6✔
57

58
func (s *GrpcStreamer) Close() {
×
59
        close(s.shutdown)
×
60
}
×
61

62
// GetLatestOffset returns the latest wal offset of the wal for the provided namespace.
63
func (s *GrpcStreamer) GetLatestOffset(ctx context.Context, _ *v1.GetLatestOffsetRequest) (*v1.GetLatestOffsetResponse, error) {
5✔
64
        namespace, reqID, method := grpcutils.GetRequestInfo(ctx)
5✔
65

5✔
66
        if namespace == "" {
6✔
67
                return nil, services.ToGRPCError(namespace, reqID, method, services.ErrMissingNamespaceInMetadata)
1✔
68
        }
1✔
69

70
        engine, ok := s.storageEngines[namespace]
4✔
71
        if !ok {
5✔
72
                return nil, services.ToGRPCError(namespace, reqID, method, services.ErrNamespaceNotExists)
1✔
73
        }
1✔
74

75
        offset := engine.CurrentOffset()
3✔
76
        if offset == nil {
4✔
77
                return &v1.GetLatestOffsetResponse{}, nil
1✔
78
        }
1✔
79
        return &v1.GetLatestOffsetResponse{Offset: offset.Encode()}, nil
2✔
80
}
81

82
// StreamWalRecords stream the underlying WAL record on the connection stream.
83
func (s *GrpcStreamer) StreamWalRecords(request *v1.StreamWalRecordsRequest, g grpc.ServerStreamingServer[v1.StreamWalRecordsResponse]) error {
17✔
84
        namespace, reqID, method := grpcutils.GetRequestInfo(g.Context())
17✔
85

17✔
86
        if namespace == "" {
17✔
87
                return services.ToGRPCError(namespace, reqID, method, services.ErrMissingNamespaceInMetadata)
×
88
        }
×
89

90
        engine, ok := s.storageEngines[namespace]
17✔
91
        if !ok {
19✔
92
                return services.ToGRPCError(namespace, reqID, method, services.ErrNamespaceNotExists)
2✔
93
        }
2✔
94

95
        // it can contain terrible data
96
        meta, err := decodeMetadata(request.GetOffset())
15✔
97
        if err != nil {
15✔
98
                slog.Error("[unisondb.streamer.grpc]",
×
99
                        slog.String("event_type", "metadata.decoding.failed"),
×
100
                        slog.Any("error", err),
×
101
                        slog.Any("server_offset", engine.CurrentOffset()),
×
102
                        slog.Group("request",
×
103
                                slog.String("namespace", namespace),
×
104
                                slog.String("id", string(reqID)),
×
105
                                slog.Any("offset", request.GetOffset()),
×
106
                        ),
×
107
                )
×
108
                return services.ToGRPCError(namespace, reqID, method, services.ErrInvalidMetadata)
×
109
        }
×
110
        // create a new replicator instance.
111
        slog.Debug("[unisondb.streamer.grpc] streaming WAL",
15✔
112
                "method", method,
15✔
113
                reqIDKey, reqID,
15✔
114
                "namespace", namespace,
15✔
115
                "offset", meta,
15✔
116
        )
15✔
117

15✔
118
        walReceiver := make(chan []*v1.WALRecord, 2)
15✔
119
        replicatorErr := make(chan error, 1)
15✔
120

15✔
121
        ctx, cancel := context.WithCancel(g.Context())
15✔
122
        defer cancel()
15✔
123

15✔
124
        rpInstance := replicator.NewReplicator(engine,
15✔
125
                batchSize,
15✔
126
                batchWaitTime, meta, "grpc")
15✔
127

15✔
128
        // when server is closed, the goroutine would be closed upon
15✔
129
        // cancel of ctx.
15✔
130
        // walReceiver should be closed only when Replicate method returns.
15✔
131
        s.errGrp.Go(func() error {
30✔
132
                defer func() {
30✔
133
                        close(walReceiver)
15✔
134
                        close(replicatorErr)
15✔
135
                }()
15✔
136

137
                err := rpInstance.Replicate(ctx, walReceiver)
15✔
138
                select {
15✔
139
                case replicatorErr <- err:
10✔
140
                case <-ctx.Done():
5✔
141
                        return nil
5✔
142
                }
143

144
                return nil
10✔
145
        })
146

147
        metricsActiveStreamTotal.WithLabelValues(namespace, string(method), "grpc").Inc()
15✔
148
        defer metricsActiveStreamTotal.WithLabelValues(namespace, string(method), "grpc").Dec()
15✔
149
        return s.streamWalRecords(ctx, g, walReceiver, replicatorErr)
15✔
150
}
151

152
// streamWalRecords streams namespaced Write-Ahead Log (WAL) records to the client in batches
153
// .
154
// While it might be tempting to stream WAL records indefinitely until the client cancels the RPC,
155
// this approach has significant drawbacks and is avoided here using dynamic timeout mechanism.
156
// (Note:  GOAWAY settings handle underlying connection issues.)
157
// Continuously streaming also has several problems:
158
//
159
//   - Resource Exhaustion: A malfunctioning client stream could monopolize the server's buffer,
160
//   - Unnecessary Processing: The server might fetch and stream WAL records that the client has already
161
//     received or is no longer interested in.
162
//
163
//nolint:gocognit
164
func (s *GrpcStreamer) streamWalRecords(ctx context.Context,
165
        g grpc.ServerStreamingServer[v1.StreamWalRecordsResponse],
166
        walReceiver chan []*v1.WALRecord,
167
        replicatorErr chan error) error {
15✔
168
        namespace, reqID, method := grpcutils.GetRequestInfo(g.Context())
15✔
169

15✔
170
        var (
15✔
171
                batch                  []*v1.WALRecord
15✔
172
                totalBatchSize         int
15✔
173
                lastReceivedRecordTime = time.Now()
15✔
174
        )
15✔
175

15✔
176
        flusher := func() error {
35✔
177
                if err := s.flushBatch(batch, g); err != nil {
20✔
178
                        return err
×
179
                }
×
180
                batch = []*v1.WALRecord{}
20✔
181
                totalBatchSize = 0
20✔
182
                return nil
20✔
183
        }
184

185
        dynamicTimeoutTicker := time.NewTicker(s.dynamicTimeout)
15✔
186
        defer dynamicTimeoutTicker.Stop()
15✔
187

15✔
188
        for {
54✔
189
                select {
39✔
190
                case <-dynamicTimeoutTicker.C:
17✔
191
                        if time.Since(lastReceivedRecordTime) > s.dynamicTimeout {
30✔
192
                                return services.ToGRPCError(namespace, reqID, method, services.ErrStreamTimeout)
13✔
193
                        }
13✔
194
                case <-s.shutdown:
×
195
                        return status.Error(codes.Unavailable, internal.GracefulShutdownMsg)
×
196

197
                case <-ctx.Done():
1✔
198
                        err := ctx.Err()
1✔
199
                        if errors.Is(err, services.ErrStreamTimeout) {
1✔
200
                                return services.ToGRPCError(namespace, reqID, method, services.ErrStreamTimeout)
×
201
                        }
×
202
                        if errors.Is(err, context.Canceled) {
2✔
203
                                return nil
1✔
204
                        }
1✔
205
                        return ctx.Err()
×
206
                case walRecords := <-walReceiver:
20✔
207
                        for _, walRecord := range walRecords {
70✔
208
                                lastReceivedRecordTime = time.Now()
50✔
209
                                totalBatchSize += len(walRecord.Record)
50✔
210
                                batch = append(batch, walRecord)
50✔
211

50✔
212
                                if totalBatchSize >= grpcMaxMsgSize {
50✔
213
                                        if err := flusher(); err != nil {
×
214
                                                return services.ToGRPCError(namespace, reqID, method, err)
×
215
                                        }
×
216
                                }
217
                        }
218
                        // flush remaining
219
                        if err := flusher(); err != nil {
20✔
220
                                return services.ToGRPCError(namespace, reqID, method, err)
×
221
                        }
×
222
                case err := <-replicatorErr:
1✔
223
                        if errors.Is(err, dbkernel.ErrInvalidOffset) {
2✔
224
                                slog.Error("[unisondb.streamer.grpc]",
1✔
225
                                        slog.String("event_type", "replicator.offset.invalid"),
1✔
226
                                        slog.Any("error", err),
1✔
227
                                        slog.Group("request",
1✔
228
                                                slog.String("namespace", namespace),
1✔
229
                                                slog.String("id", string(reqID)),
1✔
230
                                        ),
1✔
231
                                )
1✔
232

1✔
233
                                return services.ToGRPCError(namespace, reqID, method, services.ErrInvalidMetadata)
1✔
234
                        }
1✔
235
                        return services.ToGRPCError(namespace, reqID, method, err)
×
236
                }
237
        }
238
}
239

240
func (s *GrpcStreamer) flushBatch(batch []*v1.WALRecord, g grpc.ServerStream) error {
20✔
241
        namespace, reqID, method := grpcutils.GetRequestInfo(g.Context())
20✔
242
        metricsStreamSendTotal.WithLabelValues(namespace, string(method), "grpc").Add(float64(len(batch)))
20✔
243
        if len(batch) == 0 {
20✔
244
                return nil
×
245
        }
×
246

247
        slog.Debug("[unisondb.streamer.grpc] Batch flushing", "size", len(batch))
20✔
248
        response := &v1.StreamWalRecordsResponse{Records: batch, ServerTimestamp: timestamppb.Now()}
20✔
249

20✔
250
        start := time.Now()
20✔
251
        defer func() {
40✔
252
                metricsStreamSendLatency.WithLabelValues(namespace, string(method), "grpc").Observe(time.Since(start).Seconds())
20✔
253
        }()
20✔
254

255
        if err := g.SendMsg(response); err != nil {
20✔
256
                slog.Error("[unisondb.streamer.grpc]",
×
257
                        slog.String("event_type", "send.wal.records.failed"),
×
258
                        slog.Any("error", err),
×
259
                        slog.Int("records_count", len(batch)),
×
260
                        slog.Group("request",
×
261
                                slog.String("namespace", namespace),
×
262
                                slog.String("id", string(reqID)),
×
263
                        ),
×
264
                )
×
265

×
266
                metricsStreamSendErrors.WithLabelValues(namespace, string(method), "grpc").Inc()
×
267
                return fmt.Errorf("failed to send WAL records: %w", err)
×
268
        }
×
269
        return nil
20✔
270
}
271

272
// decodeMetadata protects from panic and decodes.
273
func decodeMetadata(data []byte) (o *dbkernel.Offset, err error) {
15✔
274
        defer func() {
30✔
275
                if r := recover(); r != nil {
15✔
276
                        err = fmt.Errorf("decode ChunkPosition: Panic recovered %v", r)
×
277
                }
×
278
        }()
279
        if len(data) == 0 {
24✔
280
                return nil, nil
9✔
281
        }
9✔
282
        o = dbkernel.DecodeOffset(data)
6✔
283
        return o, err
6✔
284
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc