• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dgraph-io / dgraph / 5262836423

14 Jun 2023 04:36AM UTC coverage: 66.963% (-0.3%) from 67.236%
5262836423

push

web-flow
Merge 9d86c52b0 into 2787cfc58

58133 of 86814 relevant lines covered (66.96%)

2264443.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.13
/dgraph/cmd/zero/oracle.go
1
/*
2
 * Copyright 2017-2023 Dgraph Labs, Inc. and Contributors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package zero
18

19
import (
20
        "context"
21
        "math/rand"
22
        "strconv"
23
        "strings"
24
        "time"
25

26
        "github.com/golang/glog"
27
        "github.com/pkg/errors"
28
        otrace "go.opencensus.io/trace"
29

30
        "github.com/dgraph-io/badger/v4/y"
31
        "github.com/dgraph-io/dgo/v230/protos/api"
32
        "github.com/dgraph-io/dgraph/protos/pb"
33
        "github.com/dgraph-io/dgraph/x"
34
        "github.com/dgraph-io/ristretto/z"
35
)
36

37
// Oracle stores and manages the transaction state and conflict detection.
38
type Oracle struct {
39
        x.SafeMutex
40
        commits map[uint64]uint64 // startTs -> commitTs
41
        // TODO: Check if we need LRU.
42
        keyCommit   *z.Tree // fp(key) -> commitTs. Used to detect conflict.
43
        maxAssigned uint64  // max transaction assigned by us.
44

45
        // All transactions with startTs < startTxnTs return true for hasConflict.
46
        startTxnTs  uint64
47
        subscribers map[int]chan pb.OracleDelta
48
        updates     chan *pb.OracleDelta
49
        doneUntil   y.WaterMark
50
}
51

52
// Init initializes the oracle.
53
func (o *Oracle) Init() {
64✔
54
        o.commits = make(map[uint64]uint64)
64✔
55
        // Remove the older btree file, before creating NewTree, as it may contain stale data leading
64✔
56
        // to wrong results.
64✔
57
        o.keyCommit = z.NewTree("oracle")
64✔
58
        o.subscribers = make(map[int]chan pb.OracleDelta)
64✔
59
        o.updates = make(chan *pb.OracleDelta, 100000) // Keeping 1 second worth of updates.
64✔
60
        o.doneUntil.Init(nil)
64✔
61
        go o.sendDeltasToSubscribers()
64✔
62
}
64✔
63

64
// close releases the memory associated with btree used for keycommit.
65
func (o *Oracle) close() {
64✔
66
        if err := o.keyCommit.Close(); err != nil {
64✔
67
                glog.Warningf("error while closing tree: %v", err)
×
68
        }
×
69
}
70

71
func (o *Oracle) updateStartTxnTs(ts uint64) {
52✔
72
        o.Lock()
52✔
73
        defer o.Unlock()
52✔
74
        o.startTxnTs = ts
52✔
75
        o.keyCommit.Reset()
52✔
76
}
52✔
77

78
// TODO: This should be done during proposal application for Txn status.
79
func (o *Oracle) hasConflict(src *api.TxnContext) bool {
19,257✔
80
        // This transaction was started before I became leader.
19,257✔
81
        if src.StartTs < o.startTxnTs {
19,257✔
82
                return true
×
83
        }
×
84
        for _, k := range src.Keys {
1,073,362✔
85
                ki, err := strconv.ParseUint(k, 36, 64)
1,054,105✔
86
                if err != nil {
1,054,105✔
87
                        glog.Errorf("Got error while parsing conflict key %q: %v\n", k, err)
×
88
                        continue
×
89
                }
90
                if last := o.keyCommit.Get(ki); last > src.StartTs {
1,055,958✔
91
                        return true
1,853✔
92
                }
1,853✔
93
        }
94
        return false
17,404✔
95
}
96

97
func (o *Oracle) purgeBelow(minTs uint64) {
41✔
98
        var timer x.Timer
41✔
99
        timer.Start()
41✔
100

41✔
101
        o.Lock()
41✔
102
        defer o.Unlock()
41✔
103

41✔
104
        // Set startTxnTs so that every txn with start ts less than this, would be aborted.
41✔
105
        o.startTxnTs = minTs
41✔
106

41✔
107
        // Dropping would be cheaper if abort/commits map is sharded
41✔
108
        for ts := range o.commits {
28,606✔
109
                if ts < minTs {
45,760✔
110
                        delete(o.commits, ts)
17,195✔
111
                }
17,195✔
112
        }
113
        timer.Record("commits")
41✔
114

41✔
115
        // There is no transaction running with startTs less than minTs
41✔
116
        // So we can delete everything from rowCommit whose commitTs < minTs
41✔
117
        stats := o.keyCommit.Stats()
41✔
118
        if stats.Occupancy < 50.0 {
75✔
119
                return
34✔
120
        }
34✔
121
        o.keyCommit.DeleteBelow(minTs)
7✔
122
        timer.Record("deleteBelow")
7✔
123
        glog.V(2).Infof("Purged below ts:%d, len(o.commits):%d, keyCommit: [before: %+v, after: %+v].\n",
7✔
124
                minTs, len(o.commits), stats, o.keyCommit.Stats())
7✔
125
        if timer.Total() > time.Second {
7✔
126
                glog.V(2).Infof("Purge %s\n", timer.String())
×
127
        }
×
128
}
129

130
func (o *Oracle) commit(src *api.TxnContext) error {
8,720✔
131
        o.Lock()
8,720✔
132
        defer o.Unlock()
8,720✔
133

8,720✔
134
        if o.hasConflict(src) {
8,756✔
135
                return x.ErrConflict
36✔
136
        }
36✔
137
        // We store src.Keys as string to ensure compatibility with all the various language clients we
138
        // have. But, really they are just uint64s encoded as strings. We use base 36 during creation of
139
        // these keys in FillContext in posting/mvcc.go.
140
        for _, k := range src.Keys {
534,431✔
141
                ki, err := strconv.ParseUint(k, 36, 64)
525,747✔
142
                if err != nil {
525,747✔
143
                        glog.Errorf("Got error while parsing conflict key %q: %v\n", k, err)
×
144
                        continue
×
145
                }
146
                o.keyCommit.Set(ki, src.CommitTs) // CommitTs is handed out before calling this func.
525,747✔
147
        }
148
        return nil
8,684✔
149
}
150

151
func (o *Oracle) currentState() *pb.OracleDelta {
78✔
152
        o.AssertRLock()
78✔
153
        resp := &pb.OracleDelta{}
78✔
154
        for start, commit := range o.commits {
96✔
155
                resp.Txns = append(resp.Txns,
18✔
156
                        &pb.TxnStatus{StartTs: start, CommitTs: commit})
18✔
157
        }
18✔
158
        resp.MaxAssigned = o.maxAssigned
78✔
159
        return resp
78✔
160
}
161

162
func (o *Oracle) newSubscriber() (<-chan pb.OracleDelta, int) {
78✔
163
        o.Lock()
78✔
164
        defer o.Unlock()
78✔
165
        var id int
78✔
166
        for {
156✔
167
                //nolint:gosec // random generator for node id does not require cryptographic precision
78✔
168
                id = rand.Int()
78✔
169
                if _, has := o.subscribers[id]; !has {
156✔
170
                        break
78✔
171
                }
172
        }
173

174
        // The channel takes a delta instead of a pointer as the receiver needs to
175
        // modify it by setting the group checksums. Passing a pointer previously
176
        // resulted in a race condition.
177
        ch := make(chan pb.OracleDelta, 1000)
78✔
178
        ch <- *o.currentState() // Queue up the full state as the first entry.
78✔
179
        o.subscribers[id] = ch
78✔
180
        return ch, id
78✔
181
}
182

183
func (o *Oracle) removeSubscriber(id int) {
78✔
184
        o.Lock()
78✔
185
        defer o.Unlock()
78✔
186
        delete(o.subscribers, id)
78✔
187
}
78✔
188

189
// sendDeltasToSubscribers reads updates from the o.updates
190
// constructs a delta object containing transactions from one or more updates
191
// and sends the delta object to each subscriber's channel
192
func (o *Oracle) sendDeltasToSubscribers() {
64✔
193
        delta := &pb.OracleDelta{}
64✔
194
        ticker := time.NewTicker(time.Second)
64✔
195
        defer ticker.Stop()
64✔
196

64✔
197
        // waitFor calculates the maximum value of delta.MaxAssigned and all the CommitTs of delta.Txns
64✔
198
        waitFor := func() uint64 {
103,541✔
199
                w := delta.MaxAssigned
103,477✔
200
                for _, txn := range delta.Txns {
43,910,545✔
201
                        w = x.Max(w, txn.CommitTs)
43,807,068✔
202
                }
43,807,068✔
203
                return w
103,477✔
204
        }
205

206
        for {
91,153✔
207
        get_update:
91,089✔
208
                var update *pb.OracleDelta
103,499✔
209
                select {
103,499✔
210
                case update = <-o.updates:
90,983✔
211
                case <-ticker.C:
12,452✔
212
                        wait := waitFor()
12,452✔
213
                        if wait == 0 || o.doneUntil.DoneUntil() < wait {
24,862✔
214
                                goto get_update
12,410✔
215
                        }
216
                        // Send empty update.
217
                        update = &pb.OracleDelta{}
42✔
218
                }
219
        slurp_loop:
91,025✔
220
                for {
183,308✔
221
                        delta.MaxAssigned = x.Max(delta.MaxAssigned, update.MaxAssigned)
92,283✔
222
                        delta.Txns = append(delta.Txns, update.Txns...)
92,283✔
223
                        select {
92,283✔
224
                        case update = <-o.updates:
1,258✔
225
                        default:
91,025✔
226
                                break slurp_loop
91,025✔
227
                        }
228
                }
229
                // No need to sort the txn updates here. Alpha would sort them before
230
                // applying.
231

232
                // Let's ensure that we have all the commits up until the max here.
233
                // Otherwise, we'll be sending commit timestamps out of order, which
234
                // would cause Alphas to drop some of them, during writes to Badger.
235
                if o.doneUntil.DoneUntil() < waitFor() {
110,000✔
236
                        continue // The for loop doing blocking reads from o.updates.
18,975✔
237
                        // We need at least one entry from the updates channel to pick up a missing update.
238
                        // Don't goto slurp_loop, because it would break from select immediately.
239
                }
240

241
                if glog.V(3) {
72,050✔
242
                        glog.Infof("DoneUntil: %d. Sending delta: %+v\n", o.doneUntil.DoneUntil(), delta)
×
243
                }
×
244
                o.Lock()
72,050✔
245
                for id, ch := range o.subscribers {
219,023✔
246
                        select {
146,973✔
247
                        case ch <- *delta:
146,973✔
248
                        default:
×
249
                                close(ch)
×
250
                                delete(o.subscribers, id)
×
251
                        }
252
                }
253
                o.Unlock()
72,050✔
254
                delta = &pb.OracleDelta{}
72,050✔
255
        }
256
}
257

258
func (o *Oracle) updateCommitStatusHelper(index uint64, src *api.TxnContext) bool {
23,131✔
259
        o.Lock()
23,131✔
260
        defer o.Unlock()
23,131✔
261
        if _, ok := o.commits[src.StartTs]; ok {
23,777✔
262
                return false
646✔
263
        }
646✔
264
        if src.Aborted {
27,722✔
265
                o.commits[src.StartTs] = 0
5,237✔
266
        } else {
22,485✔
267
                o.commits[src.StartTs] = src.CommitTs
17,248✔
268
        }
17,248✔
269
        return true
22,485✔
270
}
271

272
func (o *Oracle) updateCommitStatus(index uint64, src *api.TxnContext) {
23,131✔
273
        // TODO: We should check if the tablet is in read-only status here.
23,131✔
274
        if o.updateCommitStatusHelper(index, src) {
45,616✔
275
                delta := new(pb.OracleDelta)
22,485✔
276
                delta.Txns = append(delta.Txns, &pb.TxnStatus{
22,485✔
277
                        StartTs:  src.StartTs,
22,485✔
278
                        CommitTs: o.commitTs(src.StartTs),
22,485✔
279
                })
22,485✔
280
                o.updates <- delta
22,485✔
281
        }
22,485✔
282
}
283

284
func (o *Oracle) commitTs(startTs uint64) uint64 {
33,458✔
285
        o.RLock()
33,458✔
286
        defer o.RUnlock()
33,458✔
287
        return o.commits[startTs]
33,458✔
288
}
33,458✔
289

290
func (o *Oracle) storePending(ids *pb.AssignedIds) {
62,352✔
291
        // Wait to finish up processing everything before start id.
62,352✔
292
        max := x.Max(ids.EndId, ids.ReadOnly)
62,352✔
293
        if err := o.doneUntil.WaitForMark(context.Background(), max); err != nil {
62,352✔
294
                glog.Errorf("Error while waiting for mark: %+v", err)
×
295
        }
×
296

297
        // Now send it out to updates.
298
        o.updates <- &pb.OracleDelta{MaxAssigned: max}
62,352✔
299

62,352✔
300
        o.Lock()
62,352✔
301
        defer o.Unlock()
62,352✔
302
        o.maxAssigned = x.Max(o.maxAssigned, max)
62,352✔
303
}
304

305
// MaxPending returns the maximum assigned timestamp.
306
func (o *Oracle) MaxPending() uint64 {
478✔
307
        o.RLock()
478✔
308
        defer o.RUnlock()
478✔
309
        return o.maxAssigned
478✔
310
}
478✔
311

312
// proposeTxn proposes a txn update, and then updates src to reflect the state
313
// of the commit after proposal is run.
314
func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error {
10,973✔
315
        var zp pb.ZeroProposal
10,973✔
316
        zp.Txn = &api.TxnContext{
10,973✔
317
                StartTs:  src.StartTs,
10,973✔
318
                CommitTs: src.CommitTs,
10,973✔
319
                Aborted:  src.Aborted,
10,973✔
320
        }
10,973✔
321

10,973✔
322
        // NOTE: It is important that we continue retrying proposeTxn until we succeed. This should
10,973✔
323
        // happen, irrespective of what the user context timeout might be. We check for it before
10,973✔
324
        // reaching this stage, but now that we're here, we have to ensure that the commit proposal goes
10,973✔
325
        // through. Otherwise, we should block here forever. If we don't do this, we'll see txn
10,973✔
326
        // violations in Jepsen, because we'll send out a MaxAssigned higher than a commit, which would
10,973✔
327
        // cause newer txns to see older data.
10,973✔
328

10,973✔
329
        // If this node stops being the leader, we want this proposal to not be forwarded to the leader,
10,973✔
330
        // and get aborted.
10,973✔
331
        if err := s.Node.proposeAndWait(ctx, &zp); err != nil {
10,973✔
332
                return err
×
333
        }
×
334

335
        // There might be race between this proposal trying to commit and predicate
336
        // move aborting it. A predicate move, triggered by Zero, would abort all
337
        // pending transactions.  At the same time, a client which has already done
338
        // mutations, can proceed to commit it. A race condition can happen here,
339
        // with both proposing their respective states, only one can succeed after
340
        // the proposal is done. So, check again to see the fate of the transaction
341
        // here.
342
        src.CommitTs = s.orc.commitTs(src.StartTs)
10,973✔
343
        if src.CommitTs == 0 {
13,261✔
344
                src.Aborted = true
2,288✔
345
        }
2,288✔
346
        return nil
10,973✔
347
}
348

349
func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
10,973✔
350
        span := otrace.FromContext(ctx)
10,973✔
351
        span.Annotate([]otrace.Attribute{otrace.Int64Attribute("startTs", int64(src.StartTs))}, "")
10,973✔
352
        if src.Aborted {
11,409✔
353
                return s.proposeTxn(ctx, src)
436✔
354
        }
436✔
355

356
        // Use the start timestamp to check if we have a conflict, before we need to assign a commit ts.
357
        s.orc.RLock()
10,537✔
358
        conflict := s.orc.hasConflict(src)
10,537✔
359
        s.orc.RUnlock()
10,537✔
360
        if conflict {
12,354✔
361
                span.Annotate([]otrace.Attribute{otrace.BoolAttribute("abort", true)},
1,817✔
362
                        "Oracle found conflict")
1,817✔
363
                src.Aborted = true
1,817✔
364
                return s.proposeTxn(ctx, src)
1,817✔
365
        }
1,817✔
366

367
        checkPreds := func() error {
17,440✔
368
                // Check if any of these tablets is being moved. If so, abort the transaction.
8,720✔
369
                for _, pkey := range src.Preds {
27,776✔
370
                        splits := strings.SplitN(pkey, "-", 2)
19,056✔
371
                        if len(splits) < 2 {
19,056✔
372
                                return errors.Errorf("Unable to find group id in %s", pkey)
×
373
                        }
×
374
                        gid, err := strconv.Atoi(splits[0])
19,056✔
375
                        if err != nil {
19,056✔
376
                                return errors.Wrapf(err, "unable to parse group id from %s", pkey)
×
377
                        }
×
378
                        pred := splits[1]
19,056✔
379
                        tablet := s.ServingTablet(pred)
19,056✔
380
                        if tablet == nil {
19,056✔
381
                                return errors.Errorf("Tablet for %s is nil", pred)
×
382
                        }
×
383
                        if tablet.GroupId != uint32(gid) {
19,056✔
384
                                return errors.Errorf("Mutation done in group: %d. Predicate %s assigned to %d",
×
385
                                        gid, pred, tablet.GroupId)
×
386
                        }
×
387
                        if s.isBlocked(pred) {
19,056✔
388
                                return errors.Errorf("Commits on predicate %s are blocked due to predicate move", pred)
×
389
                        }
×
390
                }
391
                return nil
8,720✔
392
        }
393
        if err := checkPreds(); err != nil {
8,720✔
394
                span.Annotate([]otrace.Attribute{otrace.BoolAttribute("abort", true)}, err.Error())
×
395
                src.Aborted = true
×
396
                return s.proposeTxn(ctx, src)
×
397
        }
×
398

399
        num := pb.Num{Val: 1, Type: pb.Num_TXN_TS}
8,720✔
400
        assigned, err := s.lease(ctx, &num)
8,720✔
401
        if err != nil {
8,720✔
402
                return err
×
403
        }
×
404
        src.CommitTs = assigned.StartId
8,720✔
405
        // Mark the transaction as done, irrespective of whether the proposal succeeded or not.
8,720✔
406
        defer s.orc.doneUntil.Done(src.CommitTs)
8,720✔
407
        span.Annotatef([]otrace.Attribute{otrace.Int64Attribute("commitTs", int64(src.CommitTs))},
8,720✔
408
                "Node Id: %d. Proposing TxnContext: %+v", s.Node.Id, src)
8,720✔
409

8,720✔
410
        if err := s.orc.commit(src); err != nil {
8,756✔
411
                span.Annotatef(nil, "Found a conflict. Aborting.")
36✔
412
                src.Aborted = true
36✔
413
        }
36✔
414
        if err := ctx.Err(); err != nil {
8,720✔
415
                span.Annotatef(nil, "Aborting txn due to context timing out.")
×
416
                src.Aborted = true
×
417
        }
×
418
        // Propose txn should be used to set watermark as done.
419
        return s.proposeTxn(ctx, src)
8,720✔
420
}
421

422
// CommitOrAbort either commits a transaction or aborts it.
423
// The abortion can happen under the following conditions
424
// 1) the api.TxnContext.Aborted flag is set in the src argument
425
// 2) if there's an error (e.g server is not the leader or there's a conflicting transaction)
426
func (s *Server) CommitOrAbort(ctx context.Context, src *api.TxnContext) (*api.TxnContext, error) {
10,973✔
427
        if ctx.Err() != nil {
10,973✔
428
                return nil, ctx.Err()
×
429
        }
×
430
        ctx, span := otrace.StartSpan(ctx, "Zero.CommitOrAbort")
10,973✔
431
        defer span.End()
10,973✔
432

10,973✔
433
        if !s.Node.AmLeader() {
10,973✔
434
                return nil, errors.Errorf("Only leader can decide to commit or abort")
×
435
        }
×
436
        err := s.commit(ctx, src)
10,973✔
437
        if err != nil {
10,973✔
438
                span.Annotate([]otrace.Attribute{otrace.BoolAttribute("error", true)}, err.Error())
×
439
        }
×
440
        return src, err
10,973✔
441
}
442

443
var errClosed = errors.New("Streaming closed by oracle")
444
var errNotLeader = errors.New("Node is no longer leader")
445

446
// Oracle streams the oracle state to the alphas.
447
// The first entry sent by Zero contains the entire state of transactions. Zero periodically
448
// confirms receipt from the group, and truncates its state. This 2-way acknowledgement is a
449
// safe way to get the status of all the transactions.
450
func (s *Server) Oracle(_ *api.Payload, server pb.Zero_OracleServer) error {
78✔
451
        if !s.Node.AmLeader() {
78✔
452
                return errNotLeader
×
453
        }
×
454
        ch, id := s.orc.newSubscriber()
78✔
455
        defer s.orc.removeSubscriber(id)
78✔
456

78✔
457
        ctx := server.Context()
78✔
458
        leaderChangeCh := s.leaderChangeChannel()
78✔
459
        for {
147,207✔
460
                select {
147,129✔
461
                case <-leaderChangeCh:
×
462
                        return errNotLeader
×
463
                case delta, open := <-ch:
147,051✔
464
                        if !open {
147,051✔
465
                                return errClosed
×
466
                        }
×
467
                        // Pass in the latest group checksum as well, so the Alpha can use that to determine
468
                        // when not to service a read.
469
                        delta.GroupChecksums = s.groupChecksums()
147,051✔
470
                        if err := server.Send(&delta); err != nil {
147,051✔
471
                                return err
×
472
                        }
×
473
                case <-ctx.Done():
34✔
474
                        return ctx.Err()
34✔
475
                case <-s.closer.HasBeenClosed():
44✔
476
                        return errServerShutDown
44✔
477
                }
478
        }
479
}
480

481
// TryAbort attempts to abort the given transactions which are not already committed..
482
func (s *Server) TryAbort(ctx context.Context,
483
        txns *pb.TxnTimestamps) (*pb.OracleDelta, error) {
×
484
        delta := &pb.OracleDelta{}
×
485
        for _, startTs := range txns.Ts {
×
486
                // Do via proposals to avoid race
×
487
                tctx := &api.TxnContext{StartTs: startTs, Aborted: true}
×
488
                if err := s.proposeTxn(ctx, tctx); err != nil {
×
489
                        return delta, err
×
490
                }
×
491
                // Txn should be aborted if not already committed.
492
                delta.Txns = append(delta.Txns, &pb.TxnStatus{
×
493
                        StartTs:  startTs,
×
494
                        CommitTs: s.orc.commitTs(startTs)})
×
495
        }
496
        return delta, nil
×
497
}
498

499
// Timestamps is used to assign startTs for a new transaction
500
func (s *Server) Timestamps(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
63,042✔
501
        ctx, span := otrace.StartSpan(ctx, "Zero.Timestamps")
63,042✔
502
        defer span.End()
63,042✔
503

63,042✔
504
        span.Annotatef(nil, "Zero id: %d. Timestamp request: %+v", s.Node.Id, num)
63,042✔
505
        if ctx.Err() != nil {
63,042✔
506
                return &emptyAssignedIds, ctx.Err()
×
507
        }
×
508

509
        num.Type = pb.Num_TXN_TS
63,042✔
510
        reply, err := s.lease(ctx, num)
63,042✔
511
        span.Annotatef(nil, "Response: %+v. Error: %v", reply, err)
63,042✔
512

63,042✔
513
        switch err {
63,042✔
514
        case nil:
62,352✔
515
                s.orc.doneUntil.Done(x.Max(reply.EndId, reply.ReadOnly))
62,352✔
516
                go s.orc.storePending(reply)
62,352✔
517
        case errServedFromMemory:
688✔
518
                // Avoid calling doneUntil.Done, and storePending.
688✔
519
                err = nil
688✔
520
        default:
2✔
521
                glog.Errorf("Got error: %v while leasing timestamps: %+v", err, num)
2✔
522
        }
523
        return reply, err
63,042✔
524
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc