• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 5078820494

25 May 2023 11:19AM UTC coverage: 67.259% (-0.009%) from 67.268%
5078820494

push

GitHub
dgraphtest: print container logs if the test fails (#8829)

58396 of 86823 relevant lines covered (67.26%)

2270884.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.13
/dgraph/cmd/zero/oracle.go
1
/*
2
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package zero
18

19
import (
20
        "context"
21
        "math/rand"
22
        "strconv"
23
        "strings"
24
        "time"
25

26
        "github.com/golang/glog"
27
        "github.com/pkg/errors"
28
        otrace "go.opencensus.io/trace"
29

30
        "github.com/dgraph-io/badger/v4/y"
31
        "github.com/dgraph-io/dgo/v230/protos/api"
32
        "github.com/dgraph-io/dgraph/protos/pb"
33
        "github.com/dgraph-io/dgraph/x"
34
        "github.com/dgraph-io/ristretto/z"
35
)
36

37
// Oracle stores and manages the transaction state and conflict detection.
38
type Oracle struct {
39
        x.SafeMutex
40
        commits map[uint64]uint64 // startTs -> commitTs
41
        // TODO: Check if we need LRU.
42
        keyCommit   *z.Tree // fp(key) -> commitTs. Used to detect conflict.
43
        maxAssigned uint64  // max transaction assigned by us.
44

45
        // All transactions with startTs < startTxnTs return true for hasConflict.
46
        startTxnTs  uint64
47
        subscribers map[int]chan pb.OracleDelta
48
        updates     chan *pb.OracleDelta
49
        doneUntil   y.WaterMark
50
}
51

52
// Init initializes the oracle.
53
func (o *Oracle) Init() {
66✔
54
        o.commits = make(map[uint64]uint64)
66✔
55
        // Remove the older btree file, before creating NewTree, as it may contain stale data leading
66✔
56
        // to wrong results.
66✔
57
        o.keyCommit = z.NewTree("oracle")
66✔
58
        o.subscribers = make(map[int]chan pb.OracleDelta)
66✔
59
        o.updates = make(chan *pb.OracleDelta, 100000) // Keeping 1 second worth of updates.
66✔
60
        o.doneUntil.Init(nil)
66✔
61
        go o.sendDeltasToSubscribers()
66✔
62
}
66✔
63

64
// close releases the memory associated with btree used for keycommit.
65
func (o *Oracle) close() {
66✔
66
        if err := o.keyCommit.Close(); err != nil {
66✔
67
                glog.Warningf("error while closing tree: %v", err)
×
68
        }
×
69
}
70

71
func (o *Oracle) updateStartTxnTs(ts uint64) {
53✔
72
        o.Lock()
53✔
73
        defer o.Unlock()
53✔
74
        o.startTxnTs = ts
53✔
75
        o.keyCommit.Reset()
53✔
76
}
53✔
77

78
// TODO: This should be done during proposal application for Txn status.
79
func (o *Oracle) hasConflict(src *api.TxnContext) bool {
20,218✔
80
        // This transaction was started before I became leader.
20,218✔
81
        if src.StartTs < o.startTxnTs {
20,218✔
82
                return true
×
83
        }
×
84
        for _, k := range src.Keys {
1,075,303✔
85
                ki, err := strconv.ParseUint(k, 36, 64)
1,055,085✔
86
                if err != nil {
1,055,085✔
87
                        glog.Errorf("Got error while parsing conflict key %q: %v\n", k, err)
×
88
                        continue
×
89
                }
90
                if last := o.keyCommit.Get(ki); last > src.StartTs {
1,056,901✔
91
                        return true
1,816✔
92
                }
1,816✔
93
        }
94
        return false
18,402✔
95
}
96

97
func (o *Oracle) purgeBelow(minTs uint64) {
40✔
98
        var timer x.Timer
40✔
99
        timer.Start()
40✔
100

40✔
101
        o.Lock()
40✔
102
        defer o.Unlock()
40✔
103

40✔
104
        // Set startTxnTs so that every txn with start ts less than this, would be aborted.
40✔
105
        o.startTxnTs = minTs
40✔
106

40✔
107
        // Dropping would be cheaper if abort/commits map is sharded
40✔
108
        for ts := range o.commits {
33,364✔
109
                if ts < minTs {
50,996✔
110
                        delete(o.commits, ts)
17,672✔
111
                }
17,672✔
112
        }
113
        timer.Record("commits")
40✔
114

40✔
115
        // There is no transaction running with startTs less than minTs
40✔
116
        // So we can delete everything from rowCommit whose commitTs < minTs
40✔
117
        stats := o.keyCommit.Stats()
40✔
118
        if stats.Occupancy < 50.0 {
74✔
119
                return
34✔
120
        }
34✔
121
        o.keyCommit.DeleteBelow(minTs)
6✔
122
        timer.Record("deleteBelow")
6✔
123
        glog.V(2).Infof("Purged below ts:%d, len(o.commits):%d, keyCommit: [before: %+v, after: %+v].\n",
6✔
124
                minTs, len(o.commits), stats, o.keyCommit.Stats())
6✔
125
        if timer.Total() > time.Second {
6✔
126
                glog.V(2).Infof("Purge %s\n", timer.String())
×
127
        }
×
128
}
129

130
func (o *Oracle) commit(src *api.TxnContext) error {
9,219✔
131
        o.Lock()
9,219✔
132
        defer o.Unlock()
9,219✔
133

9,219✔
134
        if o.hasConflict(src) {
9,255✔
135
                return x.ErrConflict
36✔
136
        }
36✔
137
        // We store src.Keys as string to ensure compatibility with all the various language clients we
138
        // have. But, really they are just uint64s encoded as strings. We use base 36 during creation of
139
        // these keys in FillContext in posting/mvcc.go.
140
        for _, k := range src.Keys {
535,416✔
141
                ki, err := strconv.ParseUint(k, 36, 64)
526,233✔
142
                if err != nil {
526,233✔
143
                        glog.Errorf("Got error while parsing conflict key %q: %v\n", k, err)
×
144
                        continue
×
145
                }
146
                o.keyCommit.Set(ki, src.CommitTs) // CommitTs is handed out before calling this func.
526,233✔
147
        }
148
        return nil
9,183✔
149
}
150

151
func (o *Oracle) currentState() *pb.OracleDelta {
82✔
152
        o.AssertRLock()
82✔
153
        resp := &pb.OracleDelta{}
82✔
154
        for start, commit := range o.commits {
104✔
155
                resp.Txns = append(resp.Txns,
22✔
156
                        &pb.TxnStatus{StartTs: start, CommitTs: commit})
22✔
157
        }
22✔
158
        resp.MaxAssigned = o.maxAssigned
82✔
159
        return resp
82✔
160
}
161

162
func (o *Oracle) newSubscriber() (<-chan pb.OracleDelta, int) {
82✔
163
        o.Lock()
82✔
164
        defer o.Unlock()
82✔
165
        var id int
82✔
166
        for {
164✔
167
                //nolint:gosec // random generator for node id does not require cryptographic precision
82✔
168
                id = rand.Int()
82✔
169
                if _, has := o.subscribers[id]; !has {
164✔
170
                        break
82✔
171
                }
172
        }
173

174
        // The channel takes a delta instead of a pointer as the receiver needs to
175
        // modify it by setting the group checksums. Passing a pointer previously
176
        // resulted in a race condition.
177
        ch := make(chan pb.OracleDelta, 1000)
82✔
178
        ch <- *o.currentState() // Queue up the full state as the first entry.
82✔
179
        o.subscribers[id] = ch
82✔
180
        return ch, id
82✔
181
}
182

183
func (o *Oracle) removeSubscriber(id int) {
82✔
184
        o.Lock()
82✔
185
        defer o.Unlock()
82✔
186
        delete(o.subscribers, id)
82✔
187
}
82✔
188

189
// sendDeltasToSubscribers reads updates from the o.updates
190
// constructs a delta object containing transactions from one or more updates
191
// and sends the delta object to each subscriber's channel
192
func (o *Oracle) sendDeltasToSubscribers() {
66✔
193
        delta := &pb.OracleDelta{}
66✔
194
        ticker := time.NewTicker(time.Second)
66✔
195
        defer ticker.Stop()
66✔
196

66✔
197
        // waitFor calculates the maximum value of delta.MaxAssigned and all the CommitTs of delta.Txns
66✔
198
        waitFor := func() uint64 {
106,949✔
199
                w := delta.MaxAssigned
106,883✔
200
                for _, txn := range delta.Txns {
44,104,505✔
201
                        w = x.Max(w, txn.CommitTs)
43,997,622✔
202
                }
43,997,622✔
203
                return w
106,883✔
204
        }
205

206
        for {
94,436✔
207
        get_update:
94,370✔
208
                var update *pb.OracleDelta
106,905✔
209
                select {
106,905✔
210
                case update = <-o.updates:
94,260✔
211
                case <-ticker.C:
12,579✔
212
                        wait := waitFor()
12,579✔
213
                        if wait == 0 || o.doneUntil.DoneUntil() < wait {
25,114✔
214
                                goto get_update
12,535✔
215
                        }
216
                        // Send empty update.
217
                        update = &pb.OracleDelta{}
44✔
218
                }
219
        slurp_loop:
94,304✔
220
                for {
189,817✔
221
                        delta.MaxAssigned = x.Max(delta.MaxAssigned, update.MaxAssigned)
95,513✔
222
                        delta.Txns = append(delta.Txns, update.Txns...)
95,513✔
223
                        select {
95,513✔
224
                        case update = <-o.updates:
1,209✔
225
                        default:
94,304✔
226
                                break slurp_loop
94,304✔
227
                        }
228
                }
229
                // No need to sort the txn updates here. Alpha would sort them before
230
                // applying.
231

232
                // Let's ensure that we have all the commits up until the max here.
233
                // Otherwise, we'll be sending commit timestamps out of order, which
234
                // would cause Alphas to drop some of them, during writes to Badger.
235
                if o.doneUntil.DoneUntil() < waitFor() {
113,795✔
236
                        continue // The for loop doing blocking reads from o.updates.
19,491✔
237
                        // We need at least one entry from the updates channel to pick up a missing update.
238
                        // Don't goto slurp_loop, because it would break from select immediately.
239
                }
240

241
                if glog.V(3) {
74,813✔
242
                        glog.Infof("DoneUntil: %d. Sending delta: %+v\n", o.doneUntil.DoneUntil(), delta)
×
243
                }
×
244
                o.Lock()
74,813✔
245
                for id, ch := range o.subscribers {
227,575✔
246
                        select {
152,762✔
247
                        case ch <- *delta:
152,762✔
248
                        default:
×
249
                                close(ch)
×
250
                                delete(o.subscribers, id)
×
251
                        }
252
                }
253
                o.Unlock()
74,813✔
254
                delta = &pb.OracleDelta{}
74,813✔
255
        }
256
}
257

258
func (o *Oracle) updateCommitStatusHelper(index uint64, src *api.TxnContext) bool {
23,635✔
259
        o.Lock()
23,635✔
260
        defer o.Unlock()
23,635✔
261
        if _, ok := o.commits[src.StartTs]; ok {
24,279✔
262
                return false
644✔
263
        }
644✔
264
        if src.Aborted {
28,101✔
265
                o.commits[src.StartTs] = 0
5,110✔
266
        } else {
22,991✔
267
                o.commits[src.StartTs] = src.CommitTs
17,881✔
268
        }
17,881✔
269
        return true
22,991✔
270
}
271

272
func (o *Oracle) updateCommitStatus(index uint64, src *api.TxnContext) {
23,635✔
273
        // TODO: We should check if the tablet is in read-only status here.
23,635✔
274
        if o.updateCommitStatusHelper(index, src) {
46,626✔
275
                delta := new(pb.OracleDelta)
22,991✔
276
                delta.Txns = append(delta.Txns, &pb.TxnStatus{
22,991✔
277
                        StartTs:  src.StartTs,
22,991✔
278
                        CommitTs: o.commitTs(src.StartTs),
22,991✔
279
                })
22,991✔
280
                o.updates <- delta
22,991✔
281
        }
22,991✔
282
}
283

284
func (o *Oracle) commitTs(startTs uint64) uint64 {
34,422✔
285
        o.RLock()
34,422✔
286
        defer o.RUnlock()
34,422✔
287
        return o.commits[startTs]
34,422✔
288
}
34,422✔
289

290
func (o *Oracle) storePending(ids *pb.AssignedIds) {
65,121✔
291
        // Wait to finish up processing everything before start id.
65,121✔
292
        max := x.Max(ids.EndId, ids.ReadOnly)
65,121✔
293
        if err := o.doneUntil.WaitForMark(context.Background(), max); err != nil {
65,121✔
294
                glog.Errorf("Error while waiting for mark: %+v", err)
×
295
        }
×
296

297
        // Now send it out to updates.
298
        o.updates <- &pb.OracleDelta{MaxAssigned: max}
65,121✔
299

65,121✔
300
        o.Lock()
65,121✔
301
        defer o.Unlock()
65,121✔
302
        o.maxAssigned = x.Max(o.maxAssigned, max)
65,121✔
303
}
304

305
// MaxPending returns the maximum assigned timestamp.
306
func (o *Oracle) MaxPending() uint64 {
493✔
307
        o.RLock()
493✔
308
        defer o.RUnlock()
493✔
309
        return o.maxAssigned
493✔
310
}
493✔
311

312
// proposeTxn proposes a txn update, and then updates src to reflect the state
313
// of the commit after proposal is run.
314
func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error {
11,431✔
315
        var zp pb.ZeroProposal
11,431✔
316
        zp.Txn = &api.TxnContext{
11,431✔
317
                StartTs:  src.StartTs,
11,431✔
318
                CommitTs: src.CommitTs,
11,431✔
319
                Aborted:  src.Aborted,
11,431✔
320
        }
11,431✔
321

11,431✔
322
        // NOTE: It is important that we continue retrying proposeTxn until we succeed. This should
11,431✔
323
        // happen, irrespective of what the user context timeout might be. We check for it before
11,431✔
324
        // reaching this stage, but now that we're here, we have to ensure that the commit proposal goes
11,431✔
325
        // through. Otherwise, we should block here forever. If we don't do this, we'll see txn
11,431✔
326
        // violations in Jepsen, because we'll send out a MaxAssigned higher than a commit, which would
11,431✔
327
        // cause newer txns to see older data.
11,431✔
328

11,431✔
329
        // If this node stops being the leader, we want this proposal to not be forwarded to the leader,
11,431✔
330
        // and get aborted.
11,431✔
331
        if err := s.Node.proposeAndWait(ctx, &zp); err != nil {
11,431✔
332
                return err
×
333
        }
×
334

335
        // There might be race between this proposal trying to commit and predicate
336
        // move aborting it. A predicate move, triggered by Zero, would abort all
337
        // pending transactions.  At the same time, a client which has already done
338
        // mutations, can proceed to commit it. A race condition can happen here,
339
        // with both proposing their respective states, only one can succeed after
340
        // the proposal is done. So, check again to see the fate of the transaction
341
        // here.
342
        src.CommitTs = s.orc.commitTs(src.StartTs)
11,431✔
343
        if src.CommitTs == 0 {
13,678✔
344
                src.Aborted = true
2,247✔
345
        }
2,247✔
346
        return nil
11,431✔
347
}
348

349
func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
11,431✔
350
        span := otrace.FromContext(ctx)
11,431✔
351
        span.Annotate([]otrace.Attribute{otrace.Int64Attribute("startTs", int64(src.StartTs))}, "")
11,431✔
352
        if src.Aborted {
11,863✔
353
                return s.proposeTxn(ctx, src)
432✔
354
        }
432✔
355

356
        // Use the start timestamp to check if we have a conflict, before we need to assign a commit ts.
357
        s.orc.RLock()
10,999✔
358
        conflict := s.orc.hasConflict(src)
10,999✔
359
        s.orc.RUnlock()
10,999✔
360
        if conflict {
12,779✔
361
                span.Annotate([]otrace.Attribute{otrace.BoolAttribute("abort", true)},
1,780✔
362
                        "Oracle found conflict")
1,780✔
363
                src.Aborted = true
1,780✔
364
                return s.proposeTxn(ctx, src)
1,780✔
365
        }
1,780✔
366

367
        checkPreds := func() error {
18,438✔
368
                // Check if any of these tablets is being moved. If so, abort the transaction.
9,219✔
369
                for _, pkey := range src.Preds {
28,773✔
370
                        splits := strings.SplitN(pkey, "-", 2)
19,554✔
371
                        if len(splits) < 2 {
19,554✔
372
                                return errors.Errorf("Unable to find group id in %s", pkey)
×
373
                        }
×
374
                        gid, err := strconv.Atoi(splits[0])
19,554✔
375
                        if err != nil {
19,554✔
376
                                return errors.Wrapf(err, "unable to parse group id from %s", pkey)
×
377
                        }
×
378
                        pred := splits[1]
19,554✔
379
                        tablet := s.ServingTablet(pred)
19,554✔
380
                        if tablet == nil {
19,554✔
381
                                return errors.Errorf("Tablet for %s is nil", pred)
×
382
                        }
×
383
                        if tablet.GroupId != uint32(gid) {
19,554✔
384
                                return errors.Errorf("Mutation done in group: %d. Predicate %s assigned to %d",
×
385
                                        gid, pred, tablet.GroupId)
×
386
                        }
×
387
                        if s.isBlocked(pred) {
19,554✔
388
                                return errors.Errorf("Commits on predicate %s are blocked due to predicate move", pred)
×
389
                        }
×
390
                }
391
                return nil
9,219✔
392
        }
393
        if err := checkPreds(); err != nil {
9,219✔
394
                span.Annotate([]otrace.Attribute{otrace.BoolAttribute("abort", true)}, err.Error())
×
395
                src.Aborted = true
×
396
                return s.proposeTxn(ctx, src)
×
397
        }
×
398

399
        num := pb.Num{Val: 1, Type: pb.Num_TXN_TS}
9,219✔
400
        assigned, err := s.lease(ctx, &num)
9,219✔
401
        if err != nil {
9,219✔
402
                return err
×
403
        }
×
404
        src.CommitTs = assigned.StartId
9,219✔
405
        // Mark the transaction as done, irrespective of whether the proposal succeeded or not.
9,219✔
406
        defer s.orc.doneUntil.Done(src.CommitTs)
9,219✔
407
        span.Annotatef([]otrace.Attribute{otrace.Int64Attribute("commitTs", int64(src.CommitTs))},
9,219✔
408
                "Node Id: %d. Proposing TxnContext: %+v", s.Node.Id, src)
9,219✔
409

9,219✔
410
        if err := s.orc.commit(src); err != nil {
9,255✔
411
                span.Annotatef(nil, "Found a conflict. Aborting.")
36✔
412
                src.Aborted = true
36✔
413
        }
36✔
414
        if err := ctx.Err(); err != nil {
9,219✔
415
                span.Annotatef(nil, "Aborting txn due to context timing out.")
×
416
                src.Aborted = true
×
417
        }
×
418
        // Propose txn should be used to set watermark as done.
419
        return s.proposeTxn(ctx, src)
9,219✔
420
}
421

422
// CommitOrAbort either commits a transaction or aborts it.
423
// The abortion can happen under the following conditions
424
// 1) the api.TxnContext.Aborted flag is set in the src argument
425
// 2) if there's an error (e.g server is not the leader or there's a conflicting transaction)
426
func (s *Server) CommitOrAbort(ctx context.Context, src *api.TxnContext) (*api.TxnContext, error) {
11,431✔
427
        if ctx.Err() != nil {
11,431✔
428
                return nil, ctx.Err()
×
429
        }
×
430
        ctx, span := otrace.StartSpan(ctx, "Zero.CommitOrAbort")
11,431✔
431
        defer span.End()
11,431✔
432

11,431✔
433
        if !s.Node.AmLeader() {
11,431✔
434
                return nil, errors.Errorf("Only leader can decide to commit or abort")
×
435
        }
×
436
        err := s.commit(ctx, src)
11,431✔
437
        if err != nil {
11,431✔
438
                span.Annotate([]otrace.Attribute{otrace.BoolAttribute("error", true)}, err.Error())
×
439
        }
×
440
        return src, err
11,431✔
441
}
442

443
// Sentinel errors returned by the Oracle streaming RPC.
var (
	// errClosed is returned when the subscriber's delta channel has been closed.
	errClosed = errors.New("Streaming closed by oracle")
	// errNotLeader is returned when this node is not, or stops being, the leader.
	errNotLeader = errors.New("Node is no longer leader")
)
445

446
// Oracle streams the oracle state to the alphas.
447
// The first entry sent by Zero contains the entire state of transactions. Zero periodically
448
// confirms receipt from the group, and truncates its state. This 2-way acknowledgement is a
449
// safe way to get the status of all the transactions.
450
func (s *Server) Oracle(_ *api.Payload, server pb.Zero_OracleServer) error {
82✔
451
        if !s.Node.AmLeader() {
82✔
452
                return errNotLeader
×
453
        }
×
454
        ch, id := s.orc.newSubscriber()
82✔
455
        defer s.orc.removeSubscriber(id)
82✔
456

82✔
457
        ctx := server.Context()
82✔
458
        leaderChangeCh := s.leaderChangeChannel()
82✔
459
        for {
153,008✔
460
                select {
152,926✔
461
                case <-leaderChangeCh:
×
462
                        return errNotLeader
×
463
                case delta, open := <-ch:
152,844✔
464
                        if !open {
152,844✔
465
                                return errClosed
×
466
                        }
×
467
                        // Pass in the latest group checksum as well, so the Alpha can use that to determine
468
                        // when not to service a read.
469
                        delta.GroupChecksums = s.groupChecksums()
152,844✔
470
                        if err := server.Send(&delta); err != nil {
152,844✔
471
                                return err
×
472
                        }
×
473
                case <-ctx.Done():
43✔
474
                        return ctx.Err()
43✔
475
                case <-s.closer.HasBeenClosed():
39✔
476
                        return errServerShutDown
39✔
477
                }
478
        }
479
}
480

481
// TryAbort attempts to abort the given transactions which are not already committed..
482
func (s *Server) TryAbort(ctx context.Context,
483
        txns *pb.TxnTimestamps) (*pb.OracleDelta, error) {
×
484
        delta := &pb.OracleDelta{}
×
485
        for _, startTs := range txns.Ts {
×
486
                // Do via proposals to avoid race
×
487
                tctx := &api.TxnContext{StartTs: startTs, Aborted: true}
×
488
                if err := s.proposeTxn(ctx, tctx); err != nil {
×
489
                        return delta, err
×
490
                }
×
491
                // Txn should be aborted if not already committed.
492
                delta.Txns = append(delta.Txns, &pb.TxnStatus{
×
493
                        StartTs:  startTs,
×
494
                        CommitTs: s.orc.commitTs(startTs)})
×
495
        }
496
        return delta, nil
×
497
}
498

499
// Timestamps is used to assign startTs for a new transaction
500
func (s *Server) Timestamps(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
65,811✔
501
        ctx, span := otrace.StartSpan(ctx, "Zero.Timestamps")
65,811✔
502
        defer span.End()
65,811✔
503

65,811✔
504
        span.Annotatef(nil, "Zero id: %d. Timestamp request: %+v", s.Node.Id, num)
65,811✔
505
        if ctx.Err() != nil {
65,811✔
506
                return &emptyAssignedIds, ctx.Err()
×
507
        }
×
508

509
        num.Type = pb.Num_TXN_TS
65,811✔
510
        reply, err := s.lease(ctx, num)
65,811✔
511
        span.Annotatef(nil, "Response: %+v. Error: %v", reply, err)
65,811✔
512

65,811✔
513
        switch err {
65,811✔
514
        case nil:
65,121✔
515
                s.orc.doneUntil.Done(x.Max(reply.EndId, reply.ReadOnly))
65,121✔
516
                go s.orc.storePending(reply)
65,121✔
517
        case errServedFromMemory:
688✔
518
                // Avoid calling doneUntil.Done, and storePending.
688✔
519
                err = nil
688✔
520
        default:
2✔
521
                glog.Errorf("Got error: %v while leasing timestamps: %+v", err, num)
2✔
522
        }
523
        return reply, err
65,811✔
524
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc