• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

brotherlogic / dstore / 21648687834

03 Feb 2026 09:32PM UTC coverage: 0.0%. First build
21648687834

Pull #4441

github

brotherlogic
Log read time
Pull Request #4441: Log read time

0 of 1 new or added line in 1 file covered. (0.0%)

0 of 213 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dstoreapi.go
1
package main
2

3
import (
4
        "crypto/sha256"
5
        "fmt"
6
        "strings"
7
        "time"
8

9
        "golang.org/x/net/context"
10
        "google.golang.org/grpc/codes"
11
        "google.golang.org/grpc/status"
12

13
        pb "github.com/brotherlogic/dstore/proto"
14
        "github.com/brotherlogic/goserver/utils"
15
        "github.com/prometheus/client_golang/prometheus"
16
        "github.com/prometheus/client_golang/prometheus/promauto"
17
)
18

19
var (
20
        write_consensus = promauto.NewGaugeVec(prometheus.GaugeOpts{
21
                Name: "dstore_write_consensus",
22
                Help: "The oldest physical record",
23
        }, []string{"key"})
24
        read_consensus = promauto.NewGaugeVec(prometheus.GaugeOpts{
25
                Name: "dstore_read_consensus",
26
                Help: "The oldest physical record",
27
        }, []string{"key"})
28
        mainLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
29
                Name:    "dstore_main_latency",
30
                Help:    "The latency of server requests",
31
                Buckets: []float64{5, 10, 25, 50, 100, 250, 500, 1000, 2000, 4000, 8000, 16000, 32000, 64000, 128000, 256000, 1024000},
32
        }, []string{"method"})
33
        subLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
34
                Name:    "dstore_sub_latency",
35
                Help:    "The latency of server requests",
36
                Buckets: []float64{5, 10, 25, 50, 100, 250, 500, 1000, 2000, 4000, 8000, 16000, 32000, 64000, 128000, 256000, 1024000},
37
        }, []string{"method", "client"})
38
)
39

40
func (s *Server) GetLatest(ctx context.Context, req *pb.GetLatestRequest) (*pb.GetLatestResponse, error) {
×
41
        key := "latest"
×
42
        if req.GetHash() != "" {
×
43
                key = req.GetHash()
×
44
        }
×
45
        resp, err := s.readFile(req.GetKey(), key)
×
46
        if err != nil {
×
47
                return nil, err
×
48
        }
×
49
        return &pb.GetLatestResponse{Hash: resp.GetHash(), Timestamp: resp.GetTimestamp()}, nil
×
50
}
51

52
// Read reads out some data
53
func (s *Server) Read(ctx context.Context, req *pb.ReadRequest) (*pb.ReadResponse, error) {
×
54
        t1 := time.Now()
×
55
        defer func(run bool) {
×
NEW
56
                s.CtxLog(ctx, fmt.Sprintf("Read %v in %v", req.GetKey(), time.Since(t1)))
×
57
                if run {
×
58
                        mainLatency.With(prometheus.Labels{"method": "READ"}).Observe(float64(time.Since(t1).Milliseconds()))
×
59
                }
×
60
        }(!req.GetNoFanout())
61
        //Get the latest item if we don't have hash
62
        if req.GetHash() == "" {
×
63
                req.Hash = "latest"
×
64
        }
×
65

66
        resp, err := s.readFile(req.GetKey(), req.GetHash())
×
67

×
68
        hashMap := make(map[string]*pb.ReadResponse)
×
69
        countMap := make(map[string]int)
×
70

×
71
        bestCount := 1
×
72
        bestHash := ""
×
73
        friends := []string{"me"}
×
74

×
75
        if err == nil {
×
76
                hashMap[resp.GetHash()] = resp
×
77
                countMap[resp.GetHash()] = 1
×
78
                bestHash = resp.GetHash()
×
79
        }
×
80

81
        fcount := float32(0)
×
82
        if !req.NoFanout {
×
83
                req.NoFanout = true
×
84
                friends, err = s.FFind(ctx, "dstore")
×
85
                if err == nil {
×
86
                        for _, friend := range friends {
×
87
                                if !strings.HasPrefix(friend, s.Registry.GetIdentifier()) {
×
88
                                        conn, err := s.FDial(friend)
×
89
                                        if err == nil {
×
90
                                                client := pb.NewDStoreServiceClient(conn)
×
91

×
92
                                                t2 := time.Now()
×
93
                                                read, err := client.Read(ctx, req)
×
94
                                                subLatency.With(prometheus.Labels{"method": "READ", "client": friend}).Observe(float64(time.Since(t2).Milliseconds()))
×
95

×
96
                                                // We only consider reads where we got something back
×
97
                                                if err == nil || status.Convert(err).Code() == codes.InvalidArgument {
×
98
                                                        fcount++
×
99
                                                }
×
100

101
                                                if err == nil {
×
102
                                                        if _, ok := hashMap[read.GetHash()]; !ok {
×
103
                                                                hashMap[read.GetHash()] = read
×
104
                                                        }
×
105

106
                                                        countMap[read.GetHash()]++
×
107
                                                        if countMap[read.GetHash()] > bestCount {
×
108
                                                                bestCount = countMap[read.GetHash()]
×
109
                                                                bestHash = read.GetHash()
×
110
                                                        }
×
111
                                                }
112
                                                conn.Close()
×
113
                                        }
114
                                }
115
                        }
116
                }
117
        }
118

119
        // If we've read nothing return not found
120
        if bestHash == "" {
×
121
                s.CtxLog(ctx, fmt.Sprintf("Unable to read %v -> %v", req, err))
×
122
                return nil, status.Errorf(codes.InvalidArgument, "Cannot locate %v", req.GetKey())
×
123
        }
×
124

125
        //Let's get a consensus on the latest
126
        retResp := hashMap[bestHash]
×
127
        retResp.Consensus = float32(bestCount) / fcount
×
128

×
129
        read_consensus.With(prometheus.Labels{"key": req.GetKey()}).Set(float64(retResp.GetConsensus()))
×
130

×
131
        return retResp, nil
×
132
}
133

134
// Write writes out a key
135
func (s *Server) Write(ctx context.Context, req *pb.WriteRequest) (*pb.WriteResponse, error) {
×
136
        if !req.GetNoFanout() {
×
137
                s.CtxLog(ctx, fmt.Sprintf("writing %v as main", req.GetKey()))
×
138
        }
×
139
        t1 := time.Now()
×
140
        defer func(run bool) {
×
141
                s.CtxLog(ctx, fmt.Sprintf("Took %v as %v", time.Since(t1), run))
×
142
                if run {
×
143
                        mainLatency.With(prometheus.Labels{"method": "WRITE"}).Observe(float64(time.Since(t1).Milliseconds()))
×
144
                }
×
145
        }(!req.GetNoFanout())
146
        if strings.HasPrefix(req.GetKey(), "/") {
×
147
                return nil, fmt.Errorf("keys should not start with a backslash: %v", req.GetKey())
×
148
        }
×
149

150
        s.writeLock.Lock()
×
151

×
152
        found := false
×
153
        for _, k := range s.cleans {
×
154
                if k == req.GetKey() {
×
155
                        found = true
×
156
                }
×
157
        }
158
        if !found {
×
159
                s.cleans = append(s.cleans, req.GetKey())
×
160
        }
×
161

162
        h := sha256.New()
×
163
        h.Write(req.GetValue().Value)
×
164
        hash := fmt.Sprintf("%x", h.Sum(nil))
×
165

×
166
        ts := time.Now().Unix()
×
167
        err := s.writeToDir(ctx, req.GetKey(), hash, &pb.ReadResponse{
×
168
                Hash:      hash,
×
169
                Value:     req.GetValue(),
×
170
                Timestamp: ts,
×
171
        }, "latest")
×
172
        if err != nil {
×
173
                s.writeLock.Unlock()
×
174
                return nil, err
×
175
        }
×
176

177
        s.writeLock.Unlock()
×
178
        count := 1
×
179
        if !req.NoFanout {
×
180
                times := make(map[string]time.Duration)
×
181
                friends, err := s.FFind(ctx, "dstore")
×
182
                if err == nil {
×
183
                        req.NoFanout = true
×
184
                        for _, friend := range friends {
×
185
                                if !strings.HasPrefix(friend, s.Registry.GetIdentifier()) {
×
186
                                        tt := time.Now()
×
187
                                        conn, err := s.FDial(friend)
×
188
                                        if err == nil {
×
189
                                                client := pb.NewDStoreServiceClient(conn)
×
190
                                                t2 := time.Now()
×
191
                                                s.CtxLog(ctx, fmt.Sprintf("writing %v as sub to %v", req.GetKey(), friend))
×
192
                                                _, err := client.Write(ctx, req)
×
193
                                                subLatency.With(prometheus.Labels{"method": "WRITE", "client": friend}).Observe(float64(time.Since(t2).Milliseconds()))
×
194
                                                if err == nil {
×
195
                                                        count++
×
196
                                                        times[friend] = time.Since(tt)
×
197
                                                }
×
198
                                                conn.Close()
×
199
                                        }
200
                                }
201
                        }
202
                }
203

204
                write_consensus.With(prometheus.Labels{"key": req.GetKey()}).Set(float64(float32(count) / float32(len(friends))))
×
205

×
206
                s.CtxLog(ctx, fmt.Sprintf("Written %v in %v (%v) -> %v", req.GetKey(), time.Since(t1), req.GetNoFanout(), times))
×
207
                if !req.GetNoFanout() && time.Since(t1) > time.Millisecond*500 {
×
208
                        key, err := utils.GetContextKey(ctx)
×
209
                        s.RaiseIssue("Slow D Write", fmt.Sprintf("%v (%v) was a slow write (%v)", key, err, time.Since(t1)))
×
210
                }
×
211
                return &pb.WriteResponse{
×
212
                        Consensus: float32(count) / float32(len(friends)),
×
213
                        Hash:      hash,
×
214
                        Timestamp: ts,
×
215
                }, nil
×
216
        }
217

218
        return &pb.WriteResponse{}, nil
×
219
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc