• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ankur-anand / unisondb / 14106509768

27 Mar 2025 12:24PM UTC coverage: 75.941% (+0.08%) from 75.866%
14106509768

Pull #41

github

ankur-anand
fix: benchtest go module rename from kvalchemy to unisondb.
Pull Request #41: fix: benchtest go module rename from kvalchemy to unisondb.

3068 of 4040 relevant lines covered (75.94%)

281.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.09
/internal/services/streamer/grpc_streamer.go
1
package streamer
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "log/slog"
8
        "time"
9

10
        "github.com/ankur-anand/unisondb/dbkernel"
11
        "github.com/ankur-anand/unisondb/internal/middleware"
12
        "github.com/ankur-anand/unisondb/internal/services"
13
        "github.com/ankur-anand/unisondb/pkg/replicator"
14
        v1 "github.com/ankur-anand/unisondb/schemas/proto/gen/go/unisondb/streamer/v1"
15
        "golang.org/x/sync/errgroup"
16
        "google.golang.org/grpc"
17
        "google.golang.org/protobuf/types/known/timestamppb"
18
)
19

20
const (
21
        // reqIDKey is the structured-log attribute key under which the request ID is logged.
        reqIDKey = "request_id"
22
)
23

24
var (
25
        // batchWaitTime is the wait time handed to the replicator: once it elapses, all the reads
26
        // from the replicator are flushed onto the channel even if the batch size threshold is not met.
27
        batchWaitTime = time.Millisecond * 100
28

29
        // batchSize is the batch size handed to the replicator.
30
        batchSize = 20
31

32
        // grpcMaxMsgSize (1 MiB) is the accumulated record payload size, in bytes, at which
        // streamWalRecords flushes the pending batch to the client.
        grpcMaxMsgSize = 1 << 20
33
)
34

35
// GrpcStreamer implements gRPC-based WalStreamerService.
// It streams WAL records of the namespaces registered in storageEngines.
36
type GrpcStreamer struct {
37
        // storageEngines maps a namespace to the engine whose WAL is streamed for it.
38
        storageEngines map[string]*dbkernel.Engine
39
        // dynamicTimeout is the idle window of a stream: streamWalRecords ends the stream with
        // ErrStreamTimeout once no record has been received for longer than this duration.
        dynamicTimeout time.Duration
40
        // NOTE(review): embedded generated server stub; presumably supplies default
        // implementations for RPCs not overridden here - confirm against the generated code.
        v1.UnimplementedWalStreamerServiceServer
41
        // errGrp runs the replicator goroutine started for every stream.
        errGrp *errgroup.Group
42
}
43

44
// NewGrpcStreamer returns an initialized GrpcStreamer that implements  grpc-based WalStreamerService.
45
func NewGrpcStreamer(errGrp *errgroup.Group, storageEngines map[string]*dbkernel.Engine, dynamicTimeout time.Duration) *GrpcStreamer {
4✔
46
        return &GrpcStreamer{
4✔
47
                storageEngines: storageEngines,
4✔
48
                dynamicTimeout: dynamicTimeout,
4✔
49
                errGrp:         errGrp,
4✔
50
        }
4✔
51
}
4✔
52

53
// StreamWalRecords stream the underlying WAL record on the connection stream.
54
func (s *GrpcStreamer) StreamWalRecords(request *v1.StreamWalRecordsRequest, g grpc.ServerStreamingServer[v1.StreamWalRecordsResponse]) error {
10✔
55
        namespace, reqID, method := middleware.GetRequestInfo(g.Context())
10✔
56

10✔
57
        if namespace == "" {
10✔
58
                return services.ToGRPCError(namespace, reqID, method, services.ErrMissingNamespaceInMetadata)
×
59
        }
×
60

61
        engine, ok := s.storageEngines[namespace]
10✔
62
        if !ok {
11✔
63
                return services.ToGRPCError(namespace, reqID, method, services.ErrNamespaceNotExists)
1✔
64
        }
1✔
65

66
        // it can contain terrible data
67
        meta, err := decodeMetadata(request.GetOffset())
9✔
68
        if err != nil {
9✔
69
                return services.ToGRPCError(namespace, reqID, method, services.ErrInvalidMetadata)
×
70
        }
×
71
        // create a new replicator instance.
72
        slog.Debug("[unisondb.streamer.grpc] streaming WAL",
9✔
73
                "method", method,
9✔
74
                reqIDKey, reqID,
9✔
75
                "namespace", namespace,
9✔
76
                "offset", meta,
9✔
77
        )
9✔
78

9✔
79
        walReceiver := make(chan []*v1.WALRecord, 2)
9✔
80
        replicatorErr := make(chan error, 1)
9✔
81
        defer close(walReceiver)
9✔
82

9✔
83
        ctx, cancel := context.WithCancel(g.Context())
9✔
84
        defer cancel()
9✔
85

9✔
86
        rpInstance := replicator.NewReplicator(engine,
9✔
87
                batchSize,
9✔
88
                batchWaitTime, meta, "grpc")
9✔
89

9✔
90
        s.errGrp.Go(func() error {
18✔
91
                defer close(replicatorErr)
9✔
92
                err := rpInstance.Replicate(ctx, walReceiver)
9✔
93
                select {
9✔
94
                case replicatorErr <- err:
7✔
95
                case <-ctx.Done():
2✔
96
                        return nil
2✔
97
                }
98

99
                return nil
7✔
100
        })
101

102
        metricsActiveStreamTotal.WithLabelValues(namespace, string(method), "grpc").Inc()
9✔
103
        defer metricsActiveStreamTotal.WithLabelValues(namespace, string(method), "grpc").Dec()
9✔
104
        return s.streamWalRecords(ctx, g, walReceiver, replicatorErr)
9✔
105
}
106

107
// streamWalRecords streams namespaced Write-Ahead Log (WAL) records to the client in batches
108
// .
109
// While it might be tempting to stream WAL records indefinitely until the client cancels the RPC,
110
// this approach has significant drawbacks and is avoided here using dynamic timeout mechanism.
111
// (Note:  GOAWAY settings handle underlying connection issues.)
112
// Continuously streaming also has several problems:
113
//
114
//   - Resource Exhaustion: A malfunctioning client stream could monopolize the server's buffer,
115
//   - Unnecessary Processing: The server might fetch and stream WAL records that the client has already
116
//     received or is no longer interested in.
117
//
118
//nolint:gocognit
119
func (s *GrpcStreamer) streamWalRecords(ctx context.Context,
120
        g grpc.ServerStreamingServer[v1.StreamWalRecordsResponse],
121
        walReceiver chan []*v1.WALRecord,
122
        replicatorErr chan error) error {
9✔
123
        namespace, reqID, method := middleware.GetRequestInfo(g.Context())
9✔
124

9✔
125
        var (
9✔
126
                batch                  []*v1.WALRecord
9✔
127
                totalBatchSize         int
9✔
128
                lastReceivedRecordTime = time.Now()
9✔
129
        )
9✔
130

9✔
131
        flusher := func() error {
27✔
132
                if err := s.flushBatch(batch, g); err != nil {
19✔
133
                        return err
1✔
134
                }
1✔
135
                batch = []*v1.WALRecord{}
17✔
136
                totalBatchSize = 0
17✔
137
                return nil
17✔
138
        }
139

140
        dynamicTimeoutTicker := time.NewTicker(s.dynamicTimeout)
9✔
141
        defer dynamicTimeoutTicker.Stop()
9✔
142

9✔
143
        for {
37✔
144
                select {
28✔
145
                case <-dynamicTimeoutTicker.C:
9✔
146
                        if time.Since(lastReceivedRecordTime) > s.dynamicTimeout {
16✔
147
                                return services.ToGRPCError(namespace, reqID, method, services.ErrStreamTimeout)
7✔
148
                        }
7✔
149

150
                case <-ctx.Done():
×
151
                        err := ctx.Err()
×
152
                        if errors.Is(err, services.ErrStreamTimeout) {
×
153
                                return services.ToGRPCError(namespace, reqID, method, services.ErrStreamTimeout)
×
154
                        }
×
155
                        if errors.Is(err, context.Canceled) {
×
156
                                return nil
×
157
                        }
×
158
                        return ctx.Err()
×
159
                case walRecords := <-walReceiver:
18✔
160
                        for _, walRecord := range walRecords {
71✔
161
                                lastReceivedRecordTime = time.Now()
53✔
162
                                totalBatchSize += len(walRecord.Record)
53✔
163
                                batch = append(batch, walRecord)
53✔
164

53✔
165
                                if totalBatchSize >= grpcMaxMsgSize {
53✔
166
                                        if err := flusher(); err != nil {
×
167
                                                return services.ToGRPCError(namespace, reqID, method, err)
×
168
                                        }
×
169
                                }
170
                        }
171
                        // flush remaining
172
                        if err := flusher(); err != nil {
19✔
173
                                return services.ToGRPCError(namespace, reqID, method, err)
1✔
174
                        }
1✔
175
                case err := <-replicatorErr:
1✔
176
                        if errors.Is(err, dbkernel.ErrInvalidOffset) {
2✔
177
                                return services.ToGRPCError(namespace, reqID, method, services.ErrInvalidMetadata)
1✔
178
                        }
1✔
179
                        return services.ToGRPCError(namespace, reqID, method, err)
×
180
                }
181
        }
182
}
183

184
func (s *GrpcStreamer) flushBatch(batch []*v1.WALRecord, g grpc.ServerStream) error {
18✔
185
        namespace, reqID, method := middleware.GetRequestInfo(g.Context())
18✔
186
        metricsStreamSendTotal.WithLabelValues(namespace, string(method), "grpc").Add(float64(len(batch)))
18✔
187
        if len(batch) == 0 {
18✔
188
                return nil
×
189
        }
×
190

191
        slog.Debug("[unisondb.streamer.grpc] Batch flushing", "size", len(batch))
18✔
192
        response := &v1.StreamWalRecordsResponse{Records: batch, ServerTimestamp: timestamppb.Now()}
18✔
193

18✔
194
        start := time.Now()
18✔
195
        defer func() {
36✔
196
                metricsStreamSendLatency.WithLabelValues(namespace, string(method), "grpc").Observe(time.Since(start).Seconds())
18✔
197
        }()
18✔
198

199
        if err := g.SendMsg(response); err != nil {
19✔
200
                slog.Error("[unisondb.streamer.grpc] Stream: failed to send WAL records", "err", err,
1✔
201
                        "records_count", len(batch), "namespace", namespace, reqIDKey, reqID)
1✔
202
                metricsStreamSendErrors.WithLabelValues(namespace, string(method), "grpc").Inc()
1✔
203
                return fmt.Errorf("failed to send WAL records: %w", err)
1✔
204
        }
1✔
205
        return nil
17✔
206
}
207

208
// decodeMetadata protects from panic and decodes.
209
func decodeMetadata(data []byte) (o *dbkernel.Offset, err error) {
9✔
210
        defer func() {
18✔
211
                if r := recover(); r != nil {
9✔
212
                        err = fmt.Errorf("decode ChunkPosition: Panic recovered %v", r)
×
213
                }
×
214
        }()
215
        if len(data) == 0 {
12✔
216
                return nil, nil
3✔
217
        }
3✔
218
        o = dbkernel.DecodeOffset(data)
6✔
219
        return o, err
6✔
220
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc