• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

brotherlogic / recordadder / 21019644258

15 Jan 2026 04:21AM UTC coverage: 54.064%. First build
21019644258

Pull #3040

github

brotherlogic
Faster tapes
Pull Request #3040: Faster tapes

0 of 1 new or added line in 1 file covered. (0.0%)

153 of 283 relevant lines covered (54.06%)

0.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

38.68
/recordadderapi.go
1
package main
2

3
import (
4
        "fmt"
5
        "sort"
6
        "time"
7

8
        "github.com/brotherlogic/goserver/utils"
9
        pb "github.com/brotherlogic/recordadder/proto"
10
        google_protobuf "github.com/golang/protobuf/ptypes/any"
11
        "github.com/prometheus/client_golang/prometheus"
12
        "golang.org/x/net/context"
13
        "google.golang.org/grpc/codes"
14
        "google.golang.org/grpc/status"
15
        "google.golang.org/protobuf/proto"
16

17
        pbgd "github.com/brotherlogic/godiscogs/proto"
18
        qpb "github.com/brotherlogic/queue/proto"
19
        pbrc "github.com/brotherlogic/recordcollection/proto"
20
)
21

22
const (
23
        // QUEUE - Where we store incoming requests
24
        QUEUE = "/github.com/brotherlogic/recordadder/queue"
25
)
26

27
// AddRecord adds a record into the system
28
func (s *Server) AddRecord(ctx context.Context, req *pb.AddRecordRequest) (*pb.AddRecordResponse, error) {
1✔
29
        if req.GetPurchaseLocation() != "amoeba" &&
1✔
30
                req.GetPurchaseLocation() != "stranded" &&
1✔
31
                req.GetPurchaseLocation() != "hercules" &&
1✔
32
                req.GetPurchaseLocation() != "discogs" &&
1✔
33
                req.GetPurchaseLocation() != "gift" &&
1✔
34
                req.GetPurchaseLocation() != "bandcamp" &&
1✔
35
                req.GetPurchaseLocation() != "download" &&
1✔
36
                req.GetPurchaseLocation() != "cherry" &&
1✔
37
                req.GetPurchaseLocation() != "bleep" &&
1✔
38
                req.GetPurchaseLocation() != "groovemerchant" &&
1✔
39
                req.GetPurchaseLocation() != "sacredbones" &&
1✔
40
                req.GetPurchaseLocation() != "direct" {
2✔
41
                return nil, fmt.Errorf("Bad purchase location: %v", req.GetPurchaseLocation())
1✔
42
        }
1✔
43

44
        data, _, err := s.KSclient.Read(ctx, QUEUE, &pb.Queue{})
1✔
45
        if err != nil {
1✔
46
                return nil, err
×
47
        }
×
48
        queue := data.(*pb.Queue)
1✔
49

1✔
50
        for _, entry := range queue.Requests {
2✔
51
                if entry.Id == req.Id {
2✔
52
                        return nil, fmt.Errorf("This record is already in the queue")
1✔
53
                }
1✔
54
        }
55

56
        req.DateAdded = time.Now().Unix()
1✔
57

1✔
58
        queue.Requests = append(queue.Requests, req)
1✔
59

1✔
60
        conf, err := s.loadConfig(ctx)
1✔
61
        if err != nil {
1✔
62
                return nil, err
×
63
        }
×
64

65
        s.CtxLog(ctx, fmt.Sprintf("Found %v", conf.GetTodayFolders()))
1✔
66

1✔
67
        if conf.GetTodayFolders()[req.GetFolder()] > 0 {
1✔
68
                s.DeleteIssue(ctx, conf.GetTodayFolders()[req.GetFolder()])
×
69
        }
×
70

71
        // Run the fanout
72
        for _, server := range s.fanout {
2✔
73
                // Use a new context for fanout
1✔
74
                ctxfinner, cancelfinner := utils.ManualContext("rasave", time.Minute)
1✔
75
                err := s.runFanout(ctxfinner, server, req.GetId())
1✔
76
                code := status.Convert(err)
1✔
77
                if code.Code() != codes.OK && code.Code() != codes.Unavailable {
2✔
78
                        s.RaiseIssue(fmt.Sprintf("Fanout for %v failed", server), fmt.Sprintf("Error was %v (%v)", err, code.Code()))
1✔
79
                }
1✔
80
                cancelfinner()
1✔
81
        }
82

83
        err = s.KSclient.Save(ctx, QUEUE, queue)
1✔
84
        return &pb.AddRecordResponse{ExpectedAdditionDate: time.Now().Add(time.Hour * time.Duration((24 * len(queue.Requests)))).Unix()}, err
1✔
85
}
86

87
// ListQueue lists the entries in the queue
88
func (s *Server) ListQueue(ctx context.Context, req *pb.ListQueueRequest) (*pb.ListQueueResponse, error) {
1✔
89
        data, _, err := s.KSclient.Read(ctx, QUEUE, &pb.Queue{})
1✔
90
        if err != nil {
2✔
91
                return nil, err
1✔
92
        }
1✔
93
        queue := data.(*pb.Queue)
1✔
94

1✔
95
        return &pb.ListQueueResponse{Requests: queue.GetRequests()}, nil
1✔
96
}
97

98
// UpdateRecord updates a record
99
func (s *Server) UpdateRecord(ctx context.Context, req *pb.UpdateRecordRequest) (*pb.UpdateRecordResponse, error) {
1✔
100
        data, _, err := s.KSclient.Read(ctx, QUEUE, &pb.Queue{})
1✔
101
        if err != nil {
2✔
102
                return nil, err
1✔
103
        }
1✔
104
        queue := data.(*pb.Queue)
1✔
105

1✔
106
        updated := false
1✔
107
        for _, entry := range queue.Requests {
2✔
108
                if entry.Id == req.Id {
2✔
109
                        if req.GetAvailable() {
2✔
110
                                entry.Arrived = true
1✔
111
                                updated = true
1✔
112
                        }
1✔
113
                }
114
        }
115

116
        if !updated {
1✔
117
                return nil, status.Errorf(codes.NotFound, "Unable to locate %v for update", req.GetId())
×
118
        }
×
119

120
        return &pb.UpdateRecordResponse{}, s.KSclient.Save(ctx, QUEUE, queue)
1✔
121
}
122

123
// DeleteRecord remove a record from the queue
124
func (s *Server) DeleteRecord(ctx context.Context, req *pb.DeleteRecordRequest) (*pb.DeleteRecordResponse, error) {
1✔
125
        data, _, err := s.KSclient.Read(ctx, QUEUE, &pb.Queue{})
1✔
126
        if err != nil {
2✔
127
                return nil, err
1✔
128
        }
1✔
129
        queue := data.(*pb.Queue)
1✔
130

1✔
131
        nqueue := []*pb.AddRecordRequest{}
1✔
132
        for _, entry := range queue.Requests {
2✔
133
                if entry.Id != req.Id {
2✔
134
                        nqueue = append(nqueue, entry)
1✔
135
                }
1✔
136
        }
137
        queue.Requests = nqueue
1✔
138

1✔
139
        return &pb.DeleteRecordResponse{}, s.KSclient.Save(ctx, QUEUE, queue)
1✔
140

141
}
142

143
// ProcAdded processes the added queue
144
func (s *Server) ProcAdded(ctx context.Context, req *pb.ProcAddedRequest) (*pb.ProcAddedResponse, error) {
×
145
        conf, err3 := s.loadConfig(ctx)
×
146
        if err3 != nil {
×
147
                return nil, err3
×
148
        }
×
149

150
        if time.Now().YearDay() != int(conf.GetCurrentDay()) {
×
151
                conf.TodayFolders = make(map[int32]int32)
×
152
                conf.CurrentDay = int32(time.Now().YearDay())
×
153
                err := s.saveConfig(ctx, conf)
×
154
                if err != nil {
×
155
                        return nil, err
×
156
                }
×
157
        }
158

159
        s.CtxLog(ctx, fmt.Sprintf("Found %v", conf.GetTodayFolders()))
×
160

×
161
        if conf.GetTodayFolders()[242018] == 0 {
×
162

×
163
                ulcount, err := s.getUnlistenedCDs(ctx)
×
164
                if err != nil {
×
165
                        return nil, err
×
166
                }
×
167
                if ulcount < 2 {
×
168
                        s.CtxLog(ctx, fmt.Sprintf("Adding more because current count is %v", ulcount))
×
169
                        issue, err := s.ImmediateIssue(ctx, "Add a CD", "Do this", true, true)
×
170
                        if err != nil {
×
171
                                return nil, err
×
172
                        }
×
173
                        conf.TodayFolders[242018] = issue.GetNumber()
×
174
                        err = s.saveConfig(ctx, conf)
×
175
                        if err != nil {
×
176
                                return nil, err
×
177
                        }
×
178
                }
179
        }
180

181
        s.CtxLog(ctx, fmt.Sprintf("Found %v", conf.GetTodayFolders()))
×
182

×
183
        val, ok := conf.GetAddedMap()[req.GetType()]
×
184
        //s.CtxLog(ctx,fmt.Sprintf("ADDED the MAP: %v (%v)", time.Since(time.Unix(val, 0)), time.Unix(val, 0)))
×
185
        if !ok || time.Since(time.Unix(val, 0)) > time.Hour*24 ||
×
186
                (time.Since(time.Unix(val, 0)) > time.Minute && req.GetType() == "FILE_7_INCH") ||
×
NEW
187
                (time.Since(time.Unix(val, 0)) > time.Minute && req.GetType() == "FILE_TAPE") ||
×
188
                (time.Since(time.Unix(val, 0)) > time.Minute && req.GetType() == "FILE_CD") ||
×
189
                (time.Since(time.Unix(val, 0)) > time.Minute && req.GetType() == "FILE_DIGITAL") ||
×
190
                (time.Since(time.Unix(val, 0)) > time.Hour*6 && req.GetType() == "FILE_12_INCH") {
×
191
                //s.CtxLog(ctx,"Adding!")
×
192

×
193
                dones.With(prometheus.Labels{"dest": req.GetType()}).Inc()
×
194
                conn, err := s.FDialServer(ctx, "recordcollection")
×
195
                if err != nil {
×
196
                        return nil, err
×
197
                }
×
198
                defer conn.Close()
×
199

×
200
                client := pbrc.NewRecordCollectionServiceClient(conn)
×
201
                res, err := client.QueryRecords(ctx, &pbrc.QueryRecordsRequest{Query: &pbrc.QueryRecordsRequest_Category{Category: pbrc.ReleaseMetadata_ARRIVED}})
×
202
                if err != nil {
×
203
                        return nil, err
×
204
                }
×
205

206
                var recs []*pbrc.Record
×
207
                for _, id := range res.GetInstanceIds() {
×
208
                        rec, err := client.GetRecord(ctx, &pbrc.GetRecordRequest{InstanceId: id})
×
209
                        if err != nil {
×
210
                                return nil, err
×
211
                        }
×
212

213
                        if rec.GetRecord().GetMetadata().GetFiledUnder().String() == req.GetType() {
×
214
                                recs = append(recs, rec.GetRecord())
×
215
                        }
×
216
                }
217

218
                sort.SliceStable(recs, func(i, j int) bool {
×
219
                        return recs[i].GetMetadata().GetDateAdded() < recs[j].GetMetadata().GetDateAdded()
×
220
                })
×
221

222
                if len(recs) > 0 {
×
223
                        _, err = client.UpdateRecord(ctx, &pbrc.UpdateRecordRequest{
×
224
                                Reason: "Updating for addition",
×
225
                                Update: &pbrc.Record{Release: &pbgd.Release{InstanceId: recs[0].GetRelease().GetInstanceId()},
×
226
                                        Metadata: &pbrc.ReleaseMetadata{Category: pbrc.ReleaseMetadata_UNLISTENED}},
×
227
                        })
×
228
                        s.CtxLog(ctx, fmt.Sprintf("RFOUND: %v with %v: %v", len(recs), req.GetType(), err))
×
229

×
230
                        if err != nil {
×
231
                                return nil, err
×
232
                        }
×
233

234
                        conf.AddedMap[req.GetType()] = time.Now().Unix()
×
235
                        err = s.saveConfig(ctx, conf)
×
236
                        if err != nil {
×
237
                                return nil, err
×
238
                        }
×
239
                        val = time.Now().Add(time.Minute).Unix()
×
240
                }
241

242
                if len(recs) <= 1 {
×
243
                        val = time.Now().Add(time.Minute).Unix()
×
244
                }
×
245

246
                runTime := val
×
247

×
248
                conn2, err2 := s.FDialServer(ctx, "queue")
×
249
                if err2 != nil {
×
250
                        return nil, err2
×
251
                }
×
252
                defer conn2.Close()
×
253
                qclient := qpb.NewQueueServiceClient(conn2)
×
254
                upup := &pb.ProcAddedRequest{
×
255
                        Type: req.GetType(),
×
256
                }
×
257
                data, _ := proto.Marshal(upup)
×
258
                _, err3 = qclient.AddQueueItem(ctx, &qpb.AddQueueItemRequest{
×
259
                        QueueName:     "record_adder",
×
260
                        RunTime:       runTime,
×
261
                        Payload:       &google_protobuf.Any{Value: data},
×
262
                        Key:           fmt.Sprintf("%v", req.GetType()),
×
263
                        RequireUnique: true,
×
264
                })
×
265
                return &pb.ProcAddedResponse{}, err3
×
266
        }
267

268
        return nil, status.Errorf(codes.FailedPrecondition, "there is nothing to add here (%v) until %v", req.GetType(), time.Since(time.Unix(val, 0)))
×
269
}
270

271
func (s *Server) ClientUpdate(ctx context.Context, req *pbrc.ClientUpdateRequest) (*pbrc.ClientUpdateResponse, error) {
×
272
        rec, err := s.rc.getRecord(ctx, req.GetInstanceId())
×
273
        if err != nil {
×
274
                return nil, err
×
275
        }
×
276
        conf, err := s.loadConfig(ctx)
×
277
        if err != nil {
×
278
                return nil, err
×
279
        }
×
280

281
        if time.Since(time.Unix(rec.GetMetadata().GetDateAdded(), 0)) < time.Hour*24 {
×
282
                if conf.GetTodayFolders()[rec.GetMetadata().GetGoalFolder()] > 0 {
×
283
                        s.DeleteIssue(ctx, conf.GetTodayFolders()[rec.GetMetadata().GetGoalFolder()])
×
284
                }
×
285
        }
286

287
        return &pbrc.ClientUpdateResponse{}, s.saveConfig(ctx, conf)
×
288
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc