• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

brotherlogic / recordadder / 12591244555

03 Jan 2025 01:24AM UTC coverage: 53.873%. First build
12591244555

Pull #2310

github

brotherlogic
Tracks CDs also
Pull Request #2310: Tracks CDs also

0 of 17 new or added lines in 1 file covered. (0.0%)

153 of 284 relevant lines covered (53.87%)

0.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

38.5
/recordadderapi.go
1
package main
2

3
import (
4
        "fmt"
5
        "sort"
6
        "time"
7

8
        "github.com/brotherlogic/goserver/utils"
9
        pb "github.com/brotherlogic/recordadder/proto"
10
        google_protobuf "github.com/golang/protobuf/ptypes/any"
11
        "github.com/prometheus/client_golang/prometheus"
12
        "golang.org/x/net/context"
13
        "google.golang.org/grpc/codes"
14
        "google.golang.org/grpc/status"
15
        "google.golang.org/protobuf/proto"
16

17
        pbgd "github.com/brotherlogic/godiscogs/proto"
18
        qpb "github.com/brotherlogic/queue/proto"
19
        pbrc "github.com/brotherlogic/recordcollection/proto"
20
)
21

22
const (
23
        // QUEUE - Where we store incoming requests
24
        QUEUE = "/github.com/brotherlogic/recordadder/queue"
25
)
26

27
// AddRecord adds a record into the system
28
func (s *Server) AddRecord(ctx context.Context, req *pb.AddRecordRequest) (*pb.AddRecordResponse, error) {
1✔
29
        if req.GetPurchaseLocation() != "amoeba" &&
1✔
30
                req.GetPurchaseLocation() != "stranded" &&
1✔
31
                req.GetPurchaseLocation() != "hercules" &&
1✔
32
                req.GetPurchaseLocation() != "discogs" &&
1✔
33
                req.GetPurchaseLocation() != "gift" &&
1✔
34
                req.GetPurchaseLocation() != "bandcamp" &&
1✔
35
                req.GetPurchaseLocation() != "download" &&
1✔
36
                req.GetPurchaseLocation() != "cherry" &&
1✔
37
                req.GetPurchaseLocation() != "bleep" &&
1✔
38
                req.GetPurchaseLocation() != "groovemerchant" &&
1✔
39
                req.GetPurchaseLocation() != "sacredbones" &&
1✔
40
                req.GetPurchaseLocation() != "direct" {
2✔
41
                return nil, fmt.Errorf("Bad purchase location: %v", req.GetPurchaseLocation())
1✔
42
        }
1✔
43

44
        data, _, err := s.KSclient.Read(ctx, QUEUE, &pb.Queue{})
1✔
45
        if err != nil {
1✔
46
                return nil, err
×
47
        }
×
48
        queue := data.(*pb.Queue)
1✔
49

1✔
50
        for _, entry := range queue.Requests {
2✔
51
                if entry.Id == req.Id {
2✔
52
                        return nil, fmt.Errorf("This record is already in the queue")
1✔
53
                }
1✔
54
        }
55

56
        req.DateAdded = time.Now().Unix()
1✔
57

1✔
58
        queue.Requests = append(queue.Requests, req)
1✔
59

1✔
60
        conf, err := s.loadConfig(ctx)
1✔
61
        if err != nil {
1✔
62
                return nil, err
×
63
        }
×
64

65
        s.CtxLog(ctx, fmt.Sprintf("Found %v", conf.GetTodayFolders()))
1✔
66

1✔
67
        if conf.GetTodayFolders()[req.GetFolder()] > 0 {
1✔
68
                s.DeleteIssue(ctx, conf.GetTodayFolders()[req.GetFolder()])
×
69
        }
×
70

71
        // Run the fanout
72
        for _, server := range s.fanout {
2✔
73
                // Use a new context for fanout
1✔
74
                ctxfinner, cancelfinner := utils.ManualContext("rasave", time.Minute)
1✔
75
                err := s.runFanout(ctxfinner, server, req.GetId())
1✔
76
                code := status.Convert(err)
1✔
77
                if code.Code() != codes.OK && code.Code() != codes.Unavailable {
2✔
78
                        s.RaiseIssue(fmt.Sprintf("Fanout for %v failed", server), fmt.Sprintf("Error was %v (%v)", err, code.Code()))
1✔
79
                }
1✔
80
                cancelfinner()
1✔
81
        }
82

83
        err = s.KSclient.Save(ctx, QUEUE, queue)
1✔
84
        return &pb.AddRecordResponse{ExpectedAdditionDate: time.Now().Add(time.Hour * time.Duration((24 * len(queue.Requests)))).Unix()}, err
1✔
85
}
86

87
// ListQueue lists the entries in the queue
88
func (s *Server) ListQueue(ctx context.Context, req *pb.ListQueueRequest) (*pb.ListQueueResponse, error) {
1✔
89
        data, _, err := s.KSclient.Read(ctx, QUEUE, &pb.Queue{})
1✔
90
        if err != nil {
2✔
91
                return nil, err
1✔
92
        }
1✔
93
        queue := data.(*pb.Queue)
1✔
94

1✔
95
        return &pb.ListQueueResponse{Requests: queue.GetRequests()}, nil
1✔
96
}
97

98
// UpdateRecord updates a record
99
func (s *Server) UpdateRecord(ctx context.Context, req *pb.UpdateRecordRequest) (*pb.UpdateRecordResponse, error) {
1✔
100
        data, _, err := s.KSclient.Read(ctx, QUEUE, &pb.Queue{})
1✔
101
        if err != nil {
2✔
102
                return nil, err
1✔
103
        }
1✔
104
        queue := data.(*pb.Queue)
1✔
105

1✔
106
        updated := false
1✔
107
        for _, entry := range queue.Requests {
2✔
108
                if entry.Id == req.Id {
2✔
109
                        if req.GetAvailable() {
2✔
110
                                entry.Arrived = true
1✔
111
                                updated = true
1✔
112
                        }
1✔
113
                }
114
        }
115

116
        if !updated {
1✔
117
                return nil, status.Errorf(codes.NotFound, "Unable to locate %v for update", req.GetId())
×
118
        }
×
119

120
        return &pb.UpdateRecordResponse{}, s.KSclient.Save(ctx, QUEUE, queue)
1✔
121
}
122

123
// DeleteRecord remove a record from the queue
124
func (s *Server) DeleteRecord(ctx context.Context, req *pb.DeleteRecordRequest) (*pb.DeleteRecordResponse, error) {
1✔
125
        data, _, err := s.KSclient.Read(ctx, QUEUE, &pb.Queue{})
1✔
126
        if err != nil {
2✔
127
                return nil, err
1✔
128
        }
1✔
129
        queue := data.(*pb.Queue)
1✔
130

1✔
131
        nqueue := []*pb.AddRecordRequest{}
1✔
132
        for _, entry := range queue.Requests {
2✔
133
                if entry.Id != req.Id {
2✔
134
                        nqueue = append(nqueue, entry)
1✔
135
                }
1✔
136
        }
137
        queue.Requests = nqueue
1✔
138

1✔
139
        return &pb.DeleteRecordResponse{}, s.KSclient.Save(ctx, QUEUE, queue)
1✔
140

141
}
142

143
// ProcAdded processes the added queue
144
func (s *Server) ProcAdded(ctx context.Context, req *pb.ProcAddedRequest) (*pb.ProcAddedResponse, error) {
×
145
        conf, err3 := s.loadConfig(ctx)
×
146
        if err3 != nil {
×
147
                return nil, err3
×
148
        }
×
149

150
        if time.Now().YearDay() != int(conf.GetCurrentDay()) {
×
151
                conf.TodayFolders = make(map[int32]int32)
×
152
                conf.CurrentDay = int32(time.Now().YearDay())
×
153
                err := s.saveConfig(ctx, conf)
×
154
                if err != nil {
×
155
                        return nil, err
×
156
                }
×
157
        }
158

159
        s.CtxLog(ctx, fmt.Sprintf("Found %v", conf.GetTodayFolders()))
×
160

×
161
        if conf.GetTodayFolders()[267116] == 0 {
×
162

×
163
                ulcount, err := s.getUnlistenedSevens(ctx)
×
164
                if err != nil {
×
165
                        return nil, err
×
166
                }
×
167
                if ulcount < 3 {
×
168
                        s.CtxLog(ctx, fmt.Sprintf("Adding more because current count is %v", ulcount))
×
169
                        issue, err := s.ImmediateIssue(ctx, "Add 3 7 inches", "Do this", true, true)
×
170
                        if err != nil {
×
171
                                return nil, err
×
172
                        }
×
173
                        conf.TodayFolders[267116] = issue.GetNumber()
×
174
                        err = s.saveConfig(ctx, conf)
×
175
                        if err != nil {
×
176
                                return nil, err
×
177
                        }
×
178
                }
179
        }
180

NEW
181
        if conf.GetTodayFolders()[242018] == 0 {
×
NEW
182

×
NEW
183
                ulcount, err := s.getUnlistenedCDs(ctx)
×
NEW
184
                if err != nil {
×
NEW
185
                        return nil, err
×
NEW
186
                }
×
NEW
187
                if ulcount < 2 {
×
NEW
188
                        s.CtxLog(ctx, fmt.Sprintf("Adding more because current count is %v", ulcount))
×
NEW
189
                        issue, err := s.ImmediateIssue(ctx, "Add a CD", "Do this", true, true)
×
NEW
190
                        if err != nil {
×
NEW
191
                                return nil, err
×
NEW
192
                        }
×
NEW
193
                        conf.TodayFolders[242018] = issue.GetNumber()
×
NEW
194
                        err = s.saveConfig(ctx, conf)
×
NEW
195
                        if err != nil {
×
NEW
196
                                return nil, err
×
NEW
197
                        }
×
198
                }
199
        }
200

201
        s.CtxLog(ctx, fmt.Sprintf("Found %v", conf.GetTodayFolders()))
×
202

×
203
        val, ok := conf.GetAddedMap()[req.GetType()]
×
204
        //s.CtxLog(ctx,fmt.Sprintf("ADDED the MAP: %v (%v)", time.Since(time.Unix(val, 0)), time.Unix(val, 0)))
×
205
        if !ok || time.Since(time.Unix(val, 0)) > time.Hour*24 ||
×
206
                (time.Since(time.Unix(val, 0)) > time.Minute && req.GetType() == "FILE_7_INCH") ||
×
207
                (time.Since(time.Unix(val, 0)) > time.Minute && req.GetType() == "FILE_CD") ||
×
208
                (time.Since(time.Unix(val, 0)) > time.Hour*6 && req.GetType() == "FILE_12_INCH") {
×
209
                //s.CtxLog(ctx,"Adding!")
×
210

×
211
                dones.With(prometheus.Labels{"dest": req.GetType()}).Inc()
×
212
                conn, err := s.FDialServer(ctx, "recordcollection")
×
213
                if err != nil {
×
214
                        return nil, err
×
215
                }
×
216
                defer conn.Close()
×
217

×
218
                client := pbrc.NewRecordCollectionServiceClient(conn)
×
219
                res, err := client.QueryRecords(ctx, &pbrc.QueryRecordsRequest{Query: &pbrc.QueryRecordsRequest_Category{Category: pbrc.ReleaseMetadata_ARRIVED}})
×
220
                if err != nil {
×
221
                        return nil, err
×
222
                }
×
223

224
                var recs []*pbrc.Record
×
225
                for _, id := range res.GetInstanceIds() {
×
226
                        rec, err := client.GetRecord(ctx, &pbrc.GetRecordRequest{InstanceId: id})
×
227
                        if err != nil {
×
228
                                return nil, err
×
229
                        }
×
230

231
                        if rec.GetRecord().GetMetadata().GetFiledUnder().String() == req.GetType() {
×
232
                                recs = append(recs, rec.GetRecord())
×
233
                        }
×
234
                }
235

236
                sort.SliceStable(recs, func(i, j int) bool {
×
237
                        return recs[i].GetMetadata().GetDateAdded() < recs[j].GetMetadata().GetDateAdded()
×
238
                })
×
239

240
                if len(recs) > 0 {
×
241
                        _, err = client.UpdateRecord(ctx, &pbrc.UpdateRecordRequest{
×
242
                                Reason: "Updating for addition",
×
243
                                Update: &pbrc.Record{Release: &pbgd.Release{InstanceId: recs[0].GetRelease().GetInstanceId()},
×
244
                                        Metadata: &pbrc.ReleaseMetadata{Category: pbrc.ReleaseMetadata_UNLISTENED}},
×
245
                        })
×
246
                        s.CtxLog(ctx, fmt.Sprintf("RFOUND: %v with %v: %v", len(recs), req.GetType(), err))
×
247

×
248
                        if err != nil {
×
249
                                return nil, err
×
250
                        }
×
251

252
                        conf.AddedMap[req.GetType()] = time.Now().Unix()
×
253
                        err = s.saveConfig(ctx, conf)
×
254
                        if err != nil {
×
255
                                return nil, err
×
256
                        }
×
257
                        val = time.Now().Add(time.Hour).Unix()
×
258
                }
259

260
                if len(recs) <= 1 {
×
261
                        val = time.Now().Add(time.Hour).Unix()
×
262
                }
×
263

264
                runTime := val
×
265

×
266
                conn2, err2 := s.FDialServer(ctx, "queue")
×
267
                if err2 != nil {
×
268
                        return nil, err2
×
269
                }
×
270
                defer conn2.Close()
×
271
                qclient := qpb.NewQueueServiceClient(conn2)
×
272
                upup := &pb.ProcAddedRequest{
×
273
                        Type: req.GetType(),
×
274
                }
×
275
                data, _ := proto.Marshal(upup)
×
276
                _, err3 = qclient.AddQueueItem(ctx, &qpb.AddQueueItemRequest{
×
277
                        QueueName:     "record_adder",
×
278
                        RunTime:       runTime,
×
279
                        Payload:       &google_protobuf.Any{Value: data},
×
280
                        Key:           fmt.Sprintf("%v", req.GetType()),
×
281
                        RequireUnique: true,
×
282
                })
×
283
                return &pb.ProcAddedResponse{}, err3
×
284
        }
285

286
        return nil, status.Errorf(codes.FailedPrecondition, "there is nothing to add here (%v) until %v", req.GetType(), time.Since(time.Unix(val, 0)))
×
287
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc