• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 5071841203

24 May 2023 06:48PM UTC coverage: 67.305% (+0.5%) from 66.817%
5071841203

push

GitHub
dgraphtest: print container logs if the test fails (#8829)

58436 of 86823 relevant lines covered (67.3%)

2242581.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.78
/worker/queue.go
1
/*
2
 * Copyright 2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package worker
18

19
import (
20
        "context"
21
        "fmt"
22
        "math"
23
        "math/rand"
24
        "path/filepath"
25
        "reflect"
26
        "sync"
27
        "time"
28

29
        "github.com/golang/glog"
30
        "github.com/pkg/errors"
31

32
        "github.com/dgraph-io/dgraph/conn"
33
        "github.com/dgraph-io/dgraph/protos/pb"
34
        "github.com/dgraph-io/dgraph/raftwal"
35
        "github.com/dgraph-io/dgraph/x"
36
        "github.com/dgraph-io/ristretto/z"
37
)
38

39
// TaskStatusOverNetwork fetches the status of a task over the network. Alphas only know about the
40
// tasks created by them, but this function would fetch the task from the correct Alpha.
41
func TaskStatusOverNetwork(ctx context.Context, req *pb.TaskStatusRequest,
42
) (*pb.TaskStatusResponse, error) {
33✔
43
        // Extract Raft ID from Task ID.
33✔
44
        taskId := req.GetTaskId()
33✔
45
        if taskId == 0 {
33✔
46
                return nil, fmt.Errorf("invalid task ID: %#x", taskId)
×
47
        }
×
48
        raftId := taskId >> 32
33✔
49

33✔
50
        // Skip the network call if the required Alpha is me.
33✔
51
        myRaftId := State.WALstore.Uint(raftwal.RaftId)
33✔
52
        if raftId == myRaftId {
66✔
53
                worker := (*grpcWorker)(nil)
33✔
54
                return worker.TaskStatus(ctx, req)
33✔
55
        }
33✔
56

57
        // Find the Alpha with the required Raft ID.
58
        var addr string
×
59
        for _, group := range groups().state.GetGroups() {
×
60
                for _, member := range group.GetMembers() {
×
61
                        if member.GetId() == raftId {
×
62
                                addr = member.GetAddr()
×
63
                        }
×
64
                }
65
        }
66
        if addr == "" {
×
67
                return nil, fmt.Errorf("the Alpha that served that task is not available")
×
68
        }
×
69

70
        // Send the request to the Alpha.
71
        pool, err := conn.GetPools().Get(addr)
×
72
        if err != nil {
×
73
                return nil, errors.Wrapf(err, "unable to reach the Alpha that served that task")
×
74
        }
×
75
        client := pb.NewWorkerClient(pool.Get())
×
76
        return client.TaskStatus(ctx, req)
×
77
}
78

79
// TaskStatus retrieves metadata for a given task ID.
80
func (*grpcWorker) TaskStatus(ctx context.Context, req *pb.TaskStatusRequest,
81
) (*pb.TaskStatusResponse, error) {
33✔
82
        taskId := req.GetTaskId()
33✔
83
        meta, err := Tasks.get(taskId)
33✔
84
        if err != nil {
33✔
85
                return nil, err
×
86
        }
×
87

88
        resp := &pb.TaskStatusResponse{TaskMeta: meta.uint64()}
33✔
89
        return resp, nil
33✔
90
}
91

92
var (
93
        // Tasks is a global persistent task queue.
94
        // Do not use this before calling InitTasks.
95
        Tasks *tasks
96
)
97

98
// InitTasks initializes the global Tasks variable.
99
func InitTasks() {
91✔
100
        path := filepath.Join(x.WorkerConfig.TmpDir, "tasks.buf")
91✔
101
        log, err := z.NewTreePersistent(path)
91✔
102
        x.Check(err)
91✔
103

91✔
104
        // #nosec G404: weak RNG
91✔
105
        Tasks = &tasks{
91✔
106
                queue: make(chan taskRequest, 16),
91✔
107
                log:   log,
91✔
108
                logMu: new(sync.Mutex),
91✔
109
                rng:   rand.New(rand.NewSource(time.Now().UnixNano())),
91✔
110
        }
91✔
111

91✔
112
        // Mark all pending tasks as failed.
91✔
113
        Tasks.logMu.Lock()
91✔
114
        Tasks.log.IterateKV(func(id, val uint64) uint64 {
91✔
115
                meta := TaskMeta(val)
×
116
                if status := meta.Status(); status == TaskStatusQueued || status == TaskStatusRunning {
×
117
                        return uint64(newTaskMeta(meta.Kind(), TaskStatusFailed))
×
118
                }
×
119
                return 0
×
120
        })
121
        Tasks.logMu.Unlock()
91✔
122

91✔
123
        // Start the task runner.
91✔
124
        go Tasks.worker()
91✔
125
}
126

127
// tasks is a persistent task queue.
128
type tasks struct {
129
        // queue stores the full Protobuf request.
130
        queue chan taskRequest
131
        // log stores the timestamp, TaskKind, and TaskStatus.
132
        log   *z.Tree
133
        logMu *sync.Mutex
134

135
        rng *rand.Rand
136
}
137

138
// Enqueue adds a new task to the queue, waits for 3 seconds, and returns any errors that
139
// may have happened in that span of time. The request must be of type:
140
// - *pb.BackupRequest
141
// - *pb.ExportRequest
142
func (t *tasks) Enqueue(req interface{}) (uint64, error) {
42✔
143
        if t == nil {
42✔
144
                return 0, fmt.Errorf("task queue hasn't been initialized yet")
×
145
        }
×
146

147
        id, err := t.enqueue(req)
42✔
148
        if err != nil {
42✔
149
                return 0, err
×
150
        }
×
151

152
        // Wait for upto 3 seconds to check for errors.
153
        for i := 0; i < 3; i++ {
84✔
154
                time.Sleep(time.Second)
42✔
155

42✔
156
                t.logMu.Lock()
42✔
157
                meta := TaskMeta(t.log.Get(id))
42✔
158
                t.logMu.Unlock()
42✔
159

42✔
160
                // Early return
42✔
161
                switch meta.Status() {
42✔
162
                case TaskStatusFailed:
×
163
                        return 0, fmt.Errorf("task failed")
×
164
                case TaskStatusSuccess:
42✔
165
                        return id, nil
42✔
166
                }
167
        }
168

169
        return id, nil
×
170
}
171

172
// enqueue adds a new task to the queue. This must be of type:
173
// - *pb.BackupRequest
174
// - *pb.ExportRequest
175
func (t *tasks) enqueue(req interface{}) (uint64, error) {
42✔
176
        var kind TaskKind
42✔
177
        switch req.(type) {
42✔
178
        case *pb.BackupRequest:
35✔
179
                kind = TaskKindBackup
35✔
180
        case *pb.ExportRequest:
7✔
181
                kind = TaskKindExport
7✔
182
        default:
×
183
                panic(fmt.Sprintf("invalid TaskKind: %d", kind))
×
184
        }
185

186
        t.logMu.Lock()
42✔
187
        defer t.logMu.Unlock()
42✔
188

42✔
189
        task := taskRequest{
42✔
190
                id:  t.newId(),
42✔
191
                req: req,
42✔
192
        }
42✔
193
        select {
42✔
194
        // t.logMu must be acquired before pushing to t.queue, otherwise the worker might start the
195
        // task, and won't be able to find it in t.log.
196
        case t.queue <- task:
42✔
197
                t.log.Set(task.id, newTaskMeta(kind, TaskStatusQueued).uint64())
42✔
198
                return task.id, nil
42✔
199
        default:
×
200
                return 0, fmt.Errorf("too many pending tasks, please try again later")
×
201
        }
202
}
203

204
// get retrieves metadata for a given task ID.
205
func (t *tasks) get(id uint64) (TaskMeta, error) {
33✔
206
        if t == nil {
33✔
207
                return 0, fmt.Errorf("task queue hasn't been initialized yet")
×
208
        }
×
209

210
        if id == 0 || id == math.MaxUint64 {
33✔
211
                return 0, fmt.Errorf("task ID is invalid: %d", id)
×
212
        }
×
213
        t.logMu.Lock()
33✔
214
        defer t.logMu.Unlock()
33✔
215
        meta := TaskMeta(t.log.Get(id))
33✔
216
        if meta == 0 {
33✔
217
                return 0, fmt.Errorf("task does not exist or has expired")
×
218
        }
×
219
        return meta, nil
33✔
220
}
221

222
// worker loops forever, running queued tasks one at a time. Any returned errors are logged.
223
func (t *tasks) worker() {
91✔
224
        shouldCleanup := time.NewTicker(time.Hour)
91✔
225
        defer shouldCleanup.Stop()
91✔
226
        for {
224✔
227
                // If the server is shutting down, return immediately. Else, fetch a task from the queue.
133✔
228
                var task taskRequest
133✔
229
                select {
133✔
230
                case <-x.ServerCloser.HasBeenClosed():
91✔
231
                        if err := t.log.Close(); err != nil {
91✔
232
                                glog.Warningf("error closing log file: %v", err)
×
233
                        }
×
234
                        return
91✔
235
                case <-shouldCleanup.C:
×
236
                        t.cleanup()
×
237
                case task = <-t.queue:
42✔
238
                        if err := t.run(task); err != nil {
42✔
239
                                glog.Errorf("task %#x: failed: %s", task.id, err)
×
240
                        } else {
42✔
241
                                glog.Infof("task %#x: completed successfully", task.id)
42✔
242
                        }
42✔
243
                }
244
        }
245
}
246

247
func (t *tasks) run(task taskRequest) error {
42✔
248
        // Fetch the task from the log. If the task isn't found, this means it has expired (older than
42✔
249
        // taskTtl).
42✔
250
        t.logMu.Lock()
42✔
251
        meta := TaskMeta(t.log.Get(task.id))
42✔
252
        t.logMu.Unlock()
42✔
253
        if meta == 0 {
42✔
254
                return fmt.Errorf("is expired, skipping")
×
255
        }
×
256

257
        // Only proceed if the task is still queued. It's possible that the task got canceled before we
258
        // were able to run it.
259
        if status := meta.Status(); status != TaskStatusQueued {
42✔
260
                return fmt.Errorf("status is set to %s, skipping", status)
×
261
        }
×
262

263
        // Change the task status to Running.
264
        t.logMu.Lock()
42✔
265
        t.log.Set(task.id, newTaskMeta(meta.Kind(), TaskStatusRunning).uint64())
42✔
266
        t.logMu.Unlock()
42✔
267

42✔
268
        // Run the task.
42✔
269
        var status TaskStatus
42✔
270
        err := task.run()
42✔
271
        if err != nil {
42✔
272
                status = TaskStatusFailed
×
273
        } else {
42✔
274
                status = TaskStatusSuccess
42✔
275
        }
42✔
276

277
        // Change the task status to Success / Failed.
278
        t.logMu.Lock()
42✔
279
        t.log.Set(task.id, newTaskMeta(meta.Kind(), status).uint64())
42✔
280
        t.logMu.Unlock()
42✔
281

42✔
282
        // Return the error from the task.
42✔
283
        return err
42✔
284
}
285

286
// cleanup deletes all expired tasks.
287
func (t *tasks) cleanup() {
×
288
        const taskTtl = 7 * 24 * time.Hour // 1 week
×
289
        minTs := time.Now().UTC().Add(-taskTtl).Unix()
×
290
        minMeta := uint64(minTs) << 32
×
291

×
292
        t.logMu.Lock()
×
293
        defer t.logMu.Unlock()
×
294
        t.log.DeleteBelow(minMeta)
×
295
}
×
296

297
// newId generates a random unique task ID. logMu must be acquired before calling this function.
298
//
299
// The format of this is:
300
// 32 bits: raft ID
301
// 32 bits: random number
302
func (t *tasks) newId() uint64 {
42✔
303
        myRaftId := State.WALstore.Uint(raftwal.RaftId)
42✔
304
        for {
84✔
305
                id := myRaftId<<32 | uint64(t.rng.Intn(math.MaxUint32))
42✔
306
                // z.Tree cannot store 0 or math.MaxUint64. Check that id is unique.
42✔
307
                if id != 0 && id != math.MaxUint64 && t.log.Get(id) == 0 {
84✔
308
                        return id
42✔
309
                }
42✔
310
        }
311
}
312

313
// taskRequest pairs a task ID with the request that describes the work to be done.
type taskRequest struct {
	// id is the task ID under which the task is tracked in the log.
	id uint64
	// req is the original request: *pb.BackupRequest or *pb.ExportRequest.
	req interface{}
}
317

318
// run starts a task and blocks till it completes.
319
func (t *taskRequest) run() error {
42✔
320
        switch req := t.req.(type) {
42✔
321
        case *pb.BackupRequest:
35✔
322
                if err := ProcessBackupRequest(context.Background(), req); err != nil {
35✔
323
                        return err
×
324
                }
×
325
        case *pb.ExportRequest:
7✔
326
                files, err := ExportOverNetwork(context.Background(), req)
7✔
327
                if err != nil {
7✔
328
                        return err
×
329
                }
×
330
                glog.Infof("task %#x: exported files: %v", t.id, files)
7✔
331
        default:
×
332
                glog.Errorf(
×
333
                        "task %#x: received request of unknown type (%T)", t.id, reflect.TypeOf(t.req))
×
334
        }
335
        return nil
42✔
336
}
337

338
// TaskMeta stores a timestamp, a TaskKind and a Status.
339
//
340
// The format of this is:
341
// 32 bits: UNIX timestamp (overflows on 2106-02-07)
342
// 16 bits: TaskKind
343
// 16 bits: TaskStatus
344
type TaskMeta uint64
345

346
func newTaskMeta(kind TaskKind, status TaskStatus) TaskMeta {
126✔
347
        now := time.Now().UTC().Unix()
126✔
348
        return TaskMeta(now)<<32 | TaskMeta(kind)<<16 | TaskMeta(status)
126✔
349
}
126✔
350

351
// Timestamp returns the timestamp of the last status change of the task.
352
func (t TaskMeta) Timestamp() time.Time {
33✔
353
        return time.Unix(int64(t>>32), 0)
33✔
354
}
33✔
355

356
// Kind returns the type of the task.
357
func (t TaskMeta) Kind() TaskKind {
117✔
358
        return TaskKind((t >> 16) & math.MaxUint16)
117✔
359
}
117✔
360

361
// Status returns the current status of the task.
362
func (t TaskMeta) Status() TaskStatus {
117✔
363
        return TaskStatus(t & math.MaxUint16)
117✔
364
}
117✔
365

366
// uint64 represents the TaskMeta as a uint64.
367
func (t TaskMeta) uint64() uint64 {
159✔
368
        return uint64(t)
159✔
369
}
159✔
370

371
// TaskKind identifies the type of work a task performs.
type TaskKind uint64

const (
	// The zero value is reserved for errors, so the kinds start at 1.
	TaskKindBackup TaskKind = iota + 1
	TaskKindExport
)

// String returns the name of the task kind, or "Unknown" for any unrecognized value.
func (k TaskKind) String() string {
	if k == TaskKindBackup {
		return "Backup"
	}
	if k == TaskKindExport {
		return "Export"
	}
	return "Unknown"
}
389

390
// TaskStatus describes where a task is in its lifecycle.
type TaskStatus uint64

const (
	// The zero value is reserved for errors, so the statuses start at 1.
	TaskStatusQueued TaskStatus = iota + 1
	TaskStatusRunning
	TaskStatusFailed
	TaskStatusSuccess
)

// String returns the name of the status, or "Unknown" for any unrecognized value.
func (status TaskStatus) String() string {
	// Index 0 is left empty because the zero value is not a valid status.
	names := [...]string{
		TaskStatusQueued:  "Queued",
		TaskStatusRunning: "Running",
		TaskStatusFailed:  "Failed",
		TaskStatusSuccess: "Success",
	}
	if status == 0 || status >= TaskStatus(len(names)) {
		return "Unknown"
	}
	return names[status]
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc