• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

brotherlogic / gobuildslave / 13597327478

28 Feb 2025 10:07PM UTC coverage: 29.895%. First build
13597327478

Pull #4754

github

brotherlogic
Merge remote-tracking branch 'origin/main'
Pull Request #4754: Merge remote-tracking branch 'origin/main'

0 of 11 new or added lines in 1 file covered. (0.0%)

371 of 1241 relevant lines covered (29.9%)

0.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

24.17
/gobuildslaveutils.go
1
package main
2

3
import (
	"context"
	"fmt"
	"os"
	"os/exec"
	"strings"
	"time"

	"github.com/brotherlogic/goserver/utils"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	pbb "github.com/brotherlogic/buildserver/proto"
	pbfc "github.com/brotherlogic/filecopier/proto"
	pb "github.com/brotherlogic/gobuildslave/proto"
	pbvt "github.com/brotherlogic/versiontracker/proto"
)
21

22
// pendWait is a one-minute interval. NOTE(review): presumably the time a job
// is left in PENDING before it is checked — confirm against its uses.
const pendWait = 60 * time.Second
25

26
var (
27
        ackQueueLen = promauto.NewGauge(prometheus.GaugeOpts{
28
                Name: "gobuildslave_ackqueuelen",
29
                Help: "The size of the ack queue",
30
        })
31
)
32

33
func (s *Server) procAcks() {
×
34
        for job := range s.ackChan {
×
35
                done := false
×
36
                for !done {
×
37
                        ackQueueLen.Set(float64(len(s.ackChan)))
×
38
                        ctx, cancel := utils.ManualContext("gobuildslaveack", time.Minute)
×
39
                        conn, err := s.FDialSpecificServer(ctx, "versiontracker", s.Registry.GetIdentifier())
×
40
                        if err != nil {
×
41
                                s.DLog(ctx, fmt.Sprintf("Dial error: (%v), %v\n", job.GetJob(), err))
×
42
                        } else {
×
43
                                client := pbvt.NewVersionTrackerServiceClient(conn)
×
44
                                _, err = client.NewJob(ctx, &pbvt.NewJobRequest{Version: &pbb.Version{Version: job.GetRunningVersion(), Job: job.GetJob()}})
×
45

×
46
                                //Slave can block if version tracker is unavailable - ignore this failure
×
47
                                if err == nil || status.Convert(err).Code() == codes.Unavailable {
×
48
                                        done = true
×
49
                                }
×
50
                                conn.Close()
×
51
                        }
52
                        cancel()
×
53

×
54
                        // Don't rush the system
×
55
                        time.Sleep(time.Second)
×
56
                }
57
        }
58
}
59

60
func (s *Server) runTransition(ctx context.Context, job *pb.JobAssignment) {
1✔
61
        // Stop all the tranistions if we're shutting down
1✔
62
        if s.shuttingDown {
1✔
63
                return
×
64
        }
×
65
        s.DLog(ctx, fmt.Sprintf("TRANS: %v\n", job))
1✔
66
        startState := job.State
1✔
67
        job.LastUpdateTime = time.Now().Unix()
1✔
68
        switch job.State {
1✔
69
        case pb.State_WARMUP:
1✔
70
                res, err := exec.Command("md5sum", fmt.Sprintf("/home/simon/gobuild/bin/%v", job.GetJob().GetName())).Output()
1✔
71
                if err != nil {
2✔
72
                        s.CtxLog(ctx, fmt.Sprintf("Error reading md5sum: %v", err))
1✔
73
                } else {
1✔
74
                        elems := strings.Fields(string(res))
×
75
                        job.RunningVersion = elems[0]
×
76
                }
×
77

78
                // Need to ack this job to get a version
79
                s.ackChan <- job
1✔
80

1✔
81
                job.State = pb.State_ACKNOWLEDGED
1✔
82
        case pb.State_ACKNOWLEDGED:
1✔
83
                key := s.scheduleBuild(ctx, job)
1✔
84
                job.SubState = fmt.Sprintf("SCHED: %v @ %v", key, time.Now())
1✔
85
                if !job.Job.Bootstrap {
2✔
86
                        if key != "" {
2✔
87
                                job.State = pb.State_BUILT
1✔
88
                                job.RunningVersion = key
1✔
89
                                s.ackChan <- job
1✔
90
                        } else {
2✔
91
                                // Bootstrap this job since we don't have an initial version
1✔
92
                                if job.Job.PartialBootstrap {
2✔
93
                                        job.Job.Bootstrap = true
1✔
94
                                }
1✔
95
                        }
96
                } else {
×
97
                        job.CommandKey = key
×
98
                        job.State = pb.State_BUILDING
×
99
                }
×
100
        case pb.State_BUILDING:
×
101
                s.CtxLog(ctx, fmt.Sprintf("Locking %v", job.Job.Name))
×
102
                s.stateMutex.Lock()
×
103
                s.stateMap[job.Job.Name] = fmt.Sprintf("BUILD(%v): %v", job.CommandKey, s.scheduler.getState(job.CommandKey))
×
104
                s.stateMutex.Unlock()
×
105
                s.CtxLog(ctx, fmt.Sprintf("UnLocking %v", job.Job.Name))
×
106
                s.scheduler.wait(job.CommandKey)
×
107
                job.State = pb.State_BUILT
×
108
        case pb.State_BUILT:
1✔
109
                job.SubState = "Getting Output"
1✔
110
                output, _ := s.scheduler.getOutput(job.CommandKey)
1✔
111
                job.SubState = "Entering Lock"
1✔
112
                s.CtxLog(ctx, fmt.Sprintf("Built Job %v -> %v", job.Job.Name, output))
1✔
113
                s.stateMutex.Lock()
1✔
114
                s.stateMap[job.Job.Name] = fmt.Sprintf("BUILT(%v): (%v): %v", job.CommandKey, len(output), output)
1✔
115
                s.stateMutex.Unlock()
1✔
116
                s.CtxLog(ctx, fmt.Sprintf("UnLocking %v", job.Job.Name))
1✔
117
                if job.Job.Bootstrap && len(output) > 0 {
2✔
118
                        if job.BuildFail == 5 {
2✔
119
                                job.SubState = "Sending Report"
1✔
120
                                s.deliverCrashReport(ctx, job, output)
1✔
121
                                job.BuildFail = 0
1✔
122
                        }
1✔
123
                        job.BuildFail++
1✔
124
                        job.State = pb.State_DIED
1✔
125
                } else {
×
126
                        job.State = pb.State_VERSION_CHECK
×
127
                        // Don't check version on pb jobs
×
128
                        if job.GetJob().GetPartialBootstrap() {
×
129
                                s.doCopy(job)
×
130
                                job.BuildFail = 0
×
131
                                job.SubState = "Scheduling A Run post copy"
×
132
                                key := s.scheduleRun(job)
×
133
                                job.CommandKey = key
×
134
                                job.StartTime = time.Now().Unix()
×
135
                                job.State = pb.State_PENDING
×
136
                                if _, ok := s.pendingMap[time.Now().Weekday()]; !ok {
×
137
                                        s.pendingMap[time.Now().Weekday()] = make(map[string]int)
×
138
                                }
×
139
                                s.pendingMap[time.Now().Weekday()][job.Job.Name]++
×
140
                        }
141
                }
142
                job.SubState = "Out of case"
1✔
143
        case pb.State_VERSION_CHECK:
×
144
                s.doCopy(job)
×
145
                s.loadCurrentVersions()
×
146
                version, err := s.getLatestVersion(ctx, job.GetJob().Name, job.GetJob().GetGoPath())
×
147
                if err != nil {
×
148
                        s.CtxLog(ctx, fmt.Sprintf("Error getting version: %v", err))
×
149
                        break
×
150
                }
151

152
                res, err := exec.Command("md5sum", fmt.Sprintf("/home/simon/gobuild/bin/%v", job.GetJob().GetName())).Output()
×
153
                if err != nil {
×
154
                        s.CtxLog(ctx, fmt.Sprintf("Error reading md5sum: %v", err))
×
155
                }
×
156
                elems := strings.Fields(string(res))
×
157
                job.RunningVersion = elems[0]
×
158
                s.ackChan <- job
×
159

×
160
                if elems[0] != version.GetVersion() {
×
161
                        s.versionsMutex.Lock()
×
162
                        s.CtxLog(ctx, fmt.Sprintf("Bad version on %v for %v -> %v vs %v", s.Registry.Identifier, job.GetJob().GetName(), elems[0], s.versions[job.GetJob().GetName()].Version))
×
163
                        s.versionsMutex.Unlock()
×
164

×
165
                        job.SubState = fmt.Sprintf("Dealing With Version Mismatch: %v", job.BuildFail)
×
166

×
167
                        if job.BuildFail > 10 {
×
NEW
168
                                // Do a fire and forget build request
×
NEW
169
                                conn, err := s.FDialServer(ctx, "buildserver")
×
NEW
170
                                if err == nil {
×
NEW
171
                                        bclient := pbb.NewBuildServiceClient(conn)
×
NEW
172
                                        bclient.Build(ctx, &pbb.BuildRequest{
×
NEW
173
                                                Job:     job.Job,
×
NEW
174
                                                BitSize: int32(s.Bits),
×
NEW
175
                                        })
×
NEW
176
                                        conn.Close()
×
NEW
177

×
NEW
178
                                }
×
179
                                s.RaiseIssue(fmt.Sprintf("Error running %v", job.Job.Name), fmt.Sprintf("Running on %v", s.Registry.Identifier))
×
180
                        }
181

182
                        conn, err := s.FDialSpecificServer(ctx, "versiontracker", s.Registry.Identifier)
×
183
                        if err != nil {
×
184
                                s.CtxLog(ctx, fmt.Sprintf("Unable to dial vt: %v", err))
×
185
                        } else {
×
186
                                defer conn.Close()
×
187
                                vtc := pbvt.NewVersionTrackerServiceClient(conn)
×
188
                                _, err = vtc.NewVersion(ctx, &pbvt.NewVersionRequest{
×
189
                                        Version: version,
×
190
                                })
×
191
                                s.CtxLog(ctx, fmt.Sprintf("Requested new version: %v", err))
×
192
                        }
×
193

194
                        // Don't let the job sit here
195
                        job.BuildFail++
×
196
                        if job.BuildFail > 10 {
×
197
                                job.State = pb.State_ACKNOWLEDGED
×
198

×
199
                        }
×
200
                        break
×
201
                }
202

203
                job.BuildFail = 0
×
204
                job.SubState = "Scheduling of the Run"
×
205
                key := s.scheduleRun(job)
×
206
                job.CommandKey = key
×
207
                job.StartTime = time.Now().Unix()
×
208
                job.State = pb.State_PENDING
×
209
                if _, ok := s.pendingMap[time.Now().Weekday()]; !ok {
×
210
                        s.pendingMap[time.Now().Weekday()] = make(map[string]int)
×
211
                }
×
212
                s.pendingMap[time.Now().Weekday()][job.Job.Name]++
×
213

214
        case pb.State_PENDING:
×
215
                res, err := exec.Command("md5sum", fmt.Sprintf("/home/simon/gobuild/bin/%v", job.GetJob().GetName())).Output()
×
216
                if err != nil {
×
217
                        s.CtxLog(ctx, fmt.Sprintf("Error reading md5sum: %v", err))
×
218
                }
×
219
                elems := strings.Fields(string(res))
×
220
                job.RunningVersion = elems[0]
×
221
                s.CtxLog(ctx, fmt.Sprintf("Sending to ack chan %v -> %v", job.GetJob().GetName(), len(s.ackChan)))
×
222
                s.ackChan <- job
×
223

×
224
                if job.Job.PartialBootstrap && job.Job.Bootstrap {
×
225
                        job.Job.Bootstrap = false
×
226
                }
×
227
                s.CtxLog(ctx, fmt.Sprintf("Locking %v", job.Job.Name))
×
228
                s.stateMutex.Lock()
×
229
                out, _ := s.scheduler.getOutput(job.CommandKey)
×
230
                s.stateMap[job.Job.Name] = fmt.Sprintf("OUTPUT = %v", out)
×
231
                s.stateMutex.Unlock()
×
232
                s.CtxLog(ctx, fmt.Sprintf("UnLocking %v", job.Job.Name))
×
233
                if time.Now().Add(-time.Minute).Unix() > job.StartTime {
×
234
                        var err error
×
235
                        if job.Job.Name == "discovery" {
×
236
                                err = s.runOnChange()
×
237
                        }
×
238
                        code := status.Convert(err).Code()
×
239

×
240
                        //Unavailable allows the job to die here
×
241
                        if code == codes.OK || code == codes.Unavailable {
×
242
                                // Validate that the job is alive
×
243
                                if !s.isJobAlive(ctx, job) {
×
244
                                        s.CtxLog(ctx, fmt.Sprintf("Job %v is not alive", job))
×
245
                                }
×
246
                                job.State = pb.State_RUNNING
×
247
                        } else {
×
248
                                if len(s.scheduler.getState(job.CommandKey)) > 0 {
×
249
                                        job.State = pb.State_DIED
×
250
                                        s.CtxLog(ctx, fmt.Sprintf("Recording job as dead: '%v'", s.scheduler.getState(job.CommandKey)))
×
251
                                }
×
252
                                s.CtxLog(ctx, fmt.Sprintf("Cannot reregister: %v", err))
×
253
                        }
254
                }
255
        case pb.State_RUNNING:
×
256
                output, errout := s.scheduler.getOutput(job.CommandKey)
×
257
                output2, _ := s.scheduler.getErrOutput(job.CommandKey)
×
258
                s.CtxLog(ctx, fmt.Sprintf("Locking %v", job.Job.Name))
×
259
                s.stateMutex.Lock()
×
260
                s.stateMap[job.Job.Name] = fmt.Sprintf("ROUTPUT = %v, %v", output, s.scheduler.getStatus(job.CommandKey))
×
261
                job.Status = s.scheduler.getStatus(job.CommandKey)
×
262
                s.stateMutex.Unlock()
×
263
                s.CtxLog(ctx, fmt.Sprintf("UnLocking %v", job.Job.Name))
×
264
                if len(job.CommandKey) > 0 {
×
265
                        s.scheduler.wait(job.CommandKey)
×
266
                        s.stateMap[job.Job.Name] = fmt.Sprintf("ONLOCk = (%v, %v)", job, output)
×
267
                        s.CtxLog(ctx, fmt.Sprintf("Locking %v", job.Job.Name))
×
268
                        s.stateMutex.Lock()
×
269
                        s.stateMap[job.Job.Name] = fmt.Sprintf("COMPLETE = (%v, %v)", job, output)
×
270
                        s.stateMutex.Unlock()
×
271
                        s.CtxLog(ctx, fmt.Sprintf("UnLocking %v", job.Job.Name))
×
272
                        s.deliverCrashReport(ctx, job, fmt.Sprintf("%v%v", output, output2))
×
273
                        job.State = pb.State_DIED
×
274
                }
×
275

276
                if s.Registry != nil {
×
277
                        entry, err := s.FFindSpecificServer(ctx, job.Job.Name, s.Registry.Identifier)
×
278
                        if err != nil {
×
279
                                if job.DiscoverCount > 30 {
×
280
                                        output2, errout2 := s.scheduler.getErrOutput(job.CommandKey)
×
281
                                        s.RaiseIssue("Cannot Discover Running Server", fmt.Sprintf("%v on %v is not discoverable, despite running (%v) the output says %v (%v), %v, %v", job.Job.Name, s.Registry.Identifier, err, output, errout, output2, errout2))
×
282
                                }
×
283
                                job.DiscoverCount++
×
284
                                s.CtxLog(ctx, fmt.Sprintf("Missing discover for %+v, %v -> %v", job, entry, err))
×
285
                        } else {
×
286
                                job.Port = entry.GetPort()
×
287
                                job.DiscoverCount = 0
×
288
                        }
×
289
                }
290

291
                // Restart this job if we need to
292
                if !job.Job.Bootstrap {
×
293
                        if time.Now().Sub(time.Unix(job.LastVersionPull, 0)) > time.Minute*5 {
×
294
                                version, err := s.getVersion(ctx, job.Job)
×
295
                                job.LastVersionPull = time.Now().Unix()
×
296

×
297
                                if err == nil && version.Version != job.RunningVersion {
×
298
                                        s.CtxLog(ctx, fmt.Sprintf("Locking %v", job.Job.Name))
×
299
                                        s.stateMutex.Lock()
×
300
                                        s.stateMap[job.Job.Name] = fmt.Sprintf("VERSION_MISMATCH = %v,%v", version, job.RunningVersion)
×
301
                                        s.stateMutex.Unlock()
×
302
                                        s.CtxLog(ctx, fmt.Sprintf("UnLocking %v", job.Job.Name))
×
303
                                        s.scheduler.killJob(job.CommandKey)
×
304
                                }
×
305
                        }
306
                }
307
        case pb.State_BRINK_OF_DEATH:
1✔
308
                if s.version.confirm(ctx, job.Job.Name) {
2✔
309
                        s.scheduler.killJob(job.CommandKey)
1✔
310
                        job.State = pb.State_ACKNOWLEDGED
1✔
311
                }
1✔
312
        case pb.State_DIED:
×
313
                s.CtxLog(ctx, fmt.Sprintf("Locking %v", job.Job.Name))
×
314
                s.stateMutex.Lock()
×
315
                s.stateMap[job.Job.Name] = fmt.Sprintf("DIED %v", job.CommandKey)
×
316
                s.stateMutex.Unlock()
×
317
                s.CtxLog(ctx, fmt.Sprintf("UnLocking %v", job.Job.Name))
×
318
                job.State = pb.State_ACKNOWLEDGED
×
319
        }
320

321
        time.Sleep(time.Second * 5)
1✔
322

1✔
323
        if job.State != startState {
2✔
324
                job.LastTransitionTime = time.Now().Unix()
1✔
325
        }
1✔
326
}
327

328
type translator interface {
329
        build(job *pb.Job) *exec.Cmd
330
        run(job *pb.Job) *exec.Cmd
331
}
332

333
type checker interface {
334
        isAlive(ctx context.Context, job *pb.JobAssignment) bool
335
}
336

337
func (s *Server) getVersion(ctx context.Context, job *pb.Job) (*pbb.Version, error) {
1✔
338
        version, err := s.builder.build(ctx, job)
1✔
339
        if err != nil {
2✔
340
                return &pbb.Version{}, err
1✔
341
        }
1✔
342

343
        return version, nil
×
344
}
345

346
func updateJob(err error, job *pb.JobAssignment, resp *pbfc.CopyResponse) {
1✔
347
        if err == nil {
2✔
348
                job.QueuePos = resp.IndexInQueue
1✔
349
        }
1✔
350

351
}
352

353
// scheduleBuild builds out the job, returning the current version
354
func (s *Server) scheduleBuild(ctx context.Context, job *pb.JobAssignment) string {
1✔
355
        if job.Job.Bootstrap {
1✔
356
                c := s.translator.build(job.Job)
×
357
                // Block builds to prevent clashes
×
358
                return s.scheduler.Schedule(&rCommand{command: c, base: job.Job.Name, block: true})
×
359
        }
×
360

361
        val, err := s.builder.build(ctx, job.Job)
1✔
362
        if err != nil {
2✔
363
                s.DLog(ctx, fmt.Sprintf("BUILD Error: %v\n", err))
1✔
364
                return ""
1✔
365
        }
1✔
366
        return val.Version
1✔
367
}
368

369
func (s *Server) doCopy(job *pb.JobAssignment) {
×
370
        //Copy over any existing new versions
×
371

×
372
        key := s.scheduler.Schedule(&rCommand{command: exec.Command("mv", "$GOPATH/bin/"+job.GetJob().GetName()+".new", "$GOPATH/bin/"+job.GetJob().GetName()), base: job.GetJob().GetName()})
×
373
        s.scheduler.wait(key)
×
374

×
375
        key = s.scheduler.Schedule(&rCommand{command: exec.Command("mv", "$GOPATH/bin/"+job.GetJob().GetName()+".nversion", "$GOPATH/bin/"+job.GetJob().GetName()+".version"), base: job.GetJob().GetName() + ".version"})
×
376
        s.scheduler.wait(key)
×
377
}
×
378

379
func (s *Server) scheduleRun(job *pb.JobAssignment) string {
×
380
        // Wait a while before starting the job JIC
×
381
        time.Sleep(time.Second * 2)
×
382

×
383
        c := s.translator.run(job.GetJob())
×
384
        return s.scheduler.Schedule(&rCommand{command: c, base: job.GetJob().GetName()})
×
385
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc